Version: 5.1.1
STOMP
If you find that this library lacks some feature you need, or you have
a suggestion for improving it, please don’t hesitate to
get in touch with me!
1 Introduction
This library implements the
STOMP protocol, both the
codec for STOMP frames and the session protocol used by clients.
The STOMP specification includes a lot of important information that’s
needed to make sense of the definitions below, including what a
"destination" is, how to correctly acknowledge messages received from
the server, how to arrange for acknowledgements-of-receipt from the
server, and how to manage transactions.
1.1 Destinations
When using RabbitMQ with its STOMP plugin as your STOMP server, the
available destinations are:
(string-append "/queue/" queue-name) — When used
with stomp-send, delivers a message directly to the named
queue. When used with stomp-subscribe, consumes from the
named queue (at which point it’s interesting to think about which
ack-mode you want to use).
(string-append "/exchange/" exchange-name "/" routing-key)
— When used with stomp-send, delivers a
message via the named exchange, using the given
routing-key. When used with stomp-subscribe, creates
an anonymous queue, binds it to the named exchange using
routing-key as a binding pattern, and starts consuming from
the anonymous queue.
ActiveMQ and other STOMP message
brokers have different destination name schemata and routing
behaviours.
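The two RabbitMQ destination forms above can be wrapped in small helper procedures. The following is only a minimal sketch: the names queue-destination and exchange-destination are hypothetical and are not part of this library.

```racket
#lang racket/base

;; Hypothetical helpers (not part of the library) that build RabbitMQ
;; STOMP destination strings of the two forms described above.

;; "/queue/<queue-name>"
(define (queue-destination queue-name)
  (string-append "/queue/" queue-name))

;; "/exchange/<exchange-name>/<routing-key>"
(define (exchange-destination exchange-name routing-key)
  (string-append "/exchange/" exchange-name "/" routing-key))

(queue-destination "work")                        ; "/queue/work"
(exchange-destination "amq.rabbitmq.log" "info")  ; "/exchange/amq.rabbitmq.log/info"
```

The resulting strings can be passed as the destination argument of stomp-send or stomp-subscribe.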
2 Examples
In the examples below, I’ll make use of the
RabbitMQ
demonstration broker service.
2.1 Sending a message
(require (planet tonyg/stomp))
(define s (stomp-connect "dev.rabbitmq.com" "guest" "guest" "/"))
(stomp-send s "/exchange/amq.rabbitmq.log/info"
            (string->bytes/utf-8 "Hello world, from Racket!"))
(stomp-disconnect s)
2.2 Sending a message, with a receipt
(require (planet tonyg/stomp))
(define s (stomp-connect "dev.rabbitmq.com" "guest" "guest" "/"))
(call-with-receipt s
  (lambda (receipt)
    (stomp-send s "/exchange/amq.rabbitmq.log/info"
                (string->bytes/utf-8 "Hello world, from Racket!")
                `((receipt ,receipt)))))
; At this point, we know the server has received our
; SEND, because we have received its RECEIPT frame. See
; the STOMP specification for details of what exact
; implications this has for whether the server has
; processed the SEND or not.
(stomp-disconnect s)
2.3 Subscribing to an exchange
This example uses RabbitMQ’s AMQP "wildcards" to subscribe to all
messages travelling through the "amq.rabbitmq.log" exchange.
(require (planet tonyg/stomp))
(define s (stomp-connect "dev.rabbitmq.com" "guest" "guest" "/"))
; ack-mode is a required argument of stomp-subscribe; 'auto means the
; server expects no ACK frames for the messages it delivers.
(stomp-subscribe s "/exchange/amq.rabbitmq.log/#" "my-subscription" 'auto)
(let loop ()
  (let ((m (stomp-next-message s "my-subscription")))
    (pretty-print m)
    (loop)))
3 API
Represents a STOMP error. The frame is either an ERROR frame
received from the server, or a frame used locally that is problematic
in some way.
A stomp-frame structure represents a STOMP frame, with its three parts:
command — The STOMP command part of the frame. For
frames received from the server, this will usually be
"MESSAGE". Frames sent to the server are usually constructed
using the procedures described below (in particular
stomp-send).
headers — The STOMP headers sent or received with
the frame. See the
STOMP
specification for details of the meaning of various headers.
body — The body sent or received with the
frame. This is entirely application-specific: STOMP makes no
restrictions on the length or format of the body part of a frame. Note
that it is a byte vector, however: make sure to use
string->bytes/utf-8 and bytes->string/utf-8 as
appropriate.
(stomp-frame-header frame header [default-value]) → any?
  frame : stomp-frame?
  header : symbol?
  default-value : any? = #f
Convenience function for extracting a single header from a STOMP
frame. If a header named header is present, its (string)
value is returned; otherwise, default-value is returned.
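The lookup behaviour described above can be illustrated without a broker. The sketch below uses a local stand-in structure named frame and a hypothetical procedure frame-header; neither is the library's own definition, and the header representation (a list of two-element lists, each a symbol and a string) is an assumption based on the headers contract given for stomp-subscribe.

```racket
#lang racket/base

;; Local stand-in for the library's frame structure, used only so this
;; sketch runs on its own. Field names follow the description above.
(struct frame (command headers body))

;; Sketch of the lookup described for stomp-frame-header: return the
;; header's string value, or default-value if the header is absent.
(define (frame-header f header [default-value #f])
  (define entry (assq header (frame-headers f)))
  (if entry (cadr entry) default-value))

(define m
  (frame "MESSAGE"
         '((destination "/queue/a") (message-id "42"))
         (string->bytes/utf-8 "hello")))

(frame-header m 'message-id)          ; "42"
(frame-header m 'receipt)             ; #f
(frame-header m 'receipt "none")      ; "none"
(bytes->string/utf-8 (frame-body m))  ; "hello"
```

The last line shows the body being converted from bytes back to a string, as the description of the body field recommends.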
(stomp-connect hostname [login passcode virtual-host port-number]) → stomp-session?
  hostname : string?
  login : (or string? #f) = #f
  passcode : (or string? #f) = #f
  virtual-host : string? = hostname
Opens a STOMP connection and session to the given hostname at TCP port
port-number. If login and/or passcode are supplied, they are sent
during connection negotiation. (Note: they are sent in the clear!) If
virtual-host is not specified, the value of hostname is used. Note that
RabbitMQ’s default virtual host is "/", so to make a connection to the
demo STOMP server hosted at dev.rabbitmq.com, you would use:
> (stomp-connect "dev.rabbitmq.com" "guest" "guest" "/")
(stomp-disconnect session) → void?
  session : stomp-session?
Cleanly disconnects a STOMP session, sending a DISCONNECT frame and
waiting for receipt of an acknowledgement before closing the session’s
underlying socket.
(stomp-disconnect/abrupt session) → void?
  session : stomp-session?
Abruptly disconnects a STOMP session, closing the socket without
sending a DISCONNECT (or any other) frame.
(stomp-flush session) → void?
  session : stomp-session?
Outbound frames will frequently be buffered to improve
performance. Automatic buffer flushes happen every time a frame is
read from the connection, so in many cases manual flushing is not
required, but every now and then (especially during development and
testing) it can be important to force all queued frames to be sent
down the socket. This is what stomp-flush is for.
(stomp-message-id frame) → (or string? #f)
  frame : stomp-frame?
Extracts the message-id header from the given frame (usually
a MESSAGE frame), if any. Returns #f if no such header exists
within the frame.
(wait-for-receipt session receipt) → void?
  session : stomp-session?
  receipt : string?
Waits for the server to send a RECEIPT frame with a
receipt-id header matching the given receipt
value. Returns an end-of-file object if the connection was closed
before such a receipt was received.
(call-with-receipt session proc) → any?
  session : stomp-session?
  proc : (-> string? any?)
Creates a fresh receipt string (of the form "R1234") and
passes it to proc, which should use it in a receipt
header of a frame sent to the server. After proc returns,
this function calls wait-for-receipt to wait for the server’s
acknowledgement. The result of proc is finally returned as
the result of the whole call.
The following example demonstrates the use of a receipt request with a
SEND operation:
> (call-with-receipt session
    (lambda (receipt)
      (stomp-send session
                  "/queue/a"
                  (string->bytes/utf-8 "some message body")
                  `((receipt ,receipt)))))
Sends a STOMP frame with the given command, headers
and optional body to the server. See
call-with-receipt for information on how to wait for an
acknowledgement-of-receipt from the server.
Note that this is a low-level way of sending commands to the server:
it is usually better to use stomp-send, stomp-subscribe,
stomp-ack-message and so on.
(stomp-next-frame session [block?]) → (or stomp-frame? eof-object? #f)
  session : stomp-session?
  block? : boolean? = #t
Retrieves the next frame from the server. If the session’s buffer of
already-received frames is empty, returns #f when block? is #f, and
otherwise waits for a frame to arrive. If the connection closes before
a frame arrives, returns an end-of-file object. Otherwise, returns a
stomp-frame structure.
(stomp-next-frame/filter session predicate [block?]) → (or stomp-frame? eof-object? #f)
  session : stomp-session?
  predicate : (-> stomp-frame? boolean?)
  block? : boolean? = #t
As stomp-next-frame, except that it returns the first frame that
matches predicate, if any. If no match is found in the buffer
and block? is #f, returns #f; otherwise,
waits for a match to arrive, buffering non-matching frames as it goes.
Never reorders frames in the buffer.
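The "first match, no reordering" rule can be pictured with a plain list standing in for the session's buffer. This is only an illustrative sketch under that assumption: take-first-match is a hypothetical name, and the library's actual buffer representation is not described here.

```racket
#lang racket/base

;; Hypothetical helper, not part of the library. Given a predicate and a
;; buffer (a list, oldest item first), returns two values: the first
;; matching item (or #f if there is none) and the buffer without that
;; item, with every other item left in its original order.
(define (take-first-match pred buffer)
  (let loop ([seen '()] [rest buffer])
    (cond [(null? rest) (values #f buffer)]
          [(pred (car rest))
           (values (car rest) (append (reverse seen) (cdr rest)))]
          [else (loop (cons (car rest) seen) (cdr rest))])))

(define-values (found remaining)
  (take-first-match even? '(1 3 4 5 6)))
found      ; 4
remaining  ; '(1 3 5 6)
```

Items that do not match stay in the buffer in arrival order, so a later call with a different predicate still sees them in the order in which they were received.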
(stomp-next-message session subscription-id [block?]) → (or stomp-frame? eof-object? #f)
  session : stomp-session?
  subscription-id : string?
  block? : boolean? = #t
Uses stomp-next-frame/filter to retrieve the next available
MESSAGE frame that has a subscription header matching
subscription-id. The block? argument acts as for
stomp-next-frame/filter. Returns the first matching MESSAGE
frame, end-of-file if the connection closed, or #f if block?
was #f and no matching MESSAGE was available in the session’s
buffer.
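Because the result may be a frame, #f, or an end-of-file object, a consuming loop usually needs to check for the last two before processing. The sketch below shows one way to structure such a loop. The name consume-messages is hypothetical, and the message source is passed in as a procedure so that the example runs without a broker.

```racket
#lang racket/base

;; Hypothetical helper, not part of the library.
;; next-message : (-> any/c)      returns a message, #f, or eof
;; handle       : (-> any/c any)  called once per message
;; Stops at the first #f or end-of-file result and returns the number
;; of messages handled.
(define (consume-messages next-message handle)
  (let loop ([count 0])
    (define m (next-message))
    (cond [(or (eof-object? m) (not m)) count]
          [else (handle m)
                (loop (add1 count))])))

;; Stub standing in for a session: yields two messages, then eof.
(define pending (list "first" "second"))
(define (stub-next-message)
  (if (null? pending)
      eof
      (let ([m (car pending)])
        (set! pending (cdr pending))
        m)))

(consume-messages stub-next-message
                  (lambda (m) (printf "got ~a\n" m)))
```

With a live session s, the first argument could instead be (lambda () (stomp-next-message s "my-subscription")), in which case the loop ends when the connection closes.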
stomp-send sends a SEND frame to the server with the given destination,
other headers, and optional body. See
call-with-receipt for information on getting acknowledgements
back from the server.
This is the procedure you will want to use to actually publish
messages to the STOMP server.
Just like stomp-send, but calls stomp-flush as it
returns.
(stomp-subscribe session destination subscription-id ack-mode [headers]) → void?
  session : stomp-session?
  destination : string?
  subscription-id : string?
  ack-mode : (or 'auto 'client 'client-individual)
  headers : (listof (list symbol? string?)) = '()
Sends a SUBSCRIBE frame to the server. The destination is the
name of the message source to subscribe to (called "destination"
because that’s how message senders think of it, presumably). The
subscription-id is a session-unique string identifying this
subscription. The subscription-id will be sent by the server
in MESSAGE frames that result from this SUBSCRIBE operation. The
ack-mode parameter must be one of the following:
'auto — The server will not expect any ACK frames
in response to MESSAGEs it sends.
'client — The server will expect ACK frames, and
will interpret an acknowledgement of message ID m to mean
that message and all preceding messages.
'client-individual — The server will expect ACK
frames, but will interpret each such frame as acknowledging only the
message ID mentioned within it.
Proceeds without waiting for a reply. To wait for a reply, supply a receipt
header; see call-with-receipt.
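The practical difference between the three ack-mode values is which messages a single ACK settles. The following sketch models that interpretation as described above. It is an illustration only, not code from the library or from any broker, and ids-settled-by-ack is a hypothetical name.

```racket
#lang racket/base

;; Hypothetical model, not part of the library.
;; delivered : message IDs in delivery order, oldest first
;; Returns the IDs that one ACK naming ack-id settles under ack-mode.
(define (ids-settled-by-ack ack-mode delivered ack-id)
  (case ack-mode
    ;; 'auto: the server expects no ACK frames, so an ACK settles nothing.
    [(auto) '()]
    ;; 'client-individual: only the named message is acknowledged.
    [(client-individual)
     (if (member ack-id delivered) (list ack-id) '())]
    ;; 'client: the named message and all messages delivered before it.
    [(client)
     (let loop ([rest delivered] [acc '()])
       (cond [(null? rest) '()]  ; ack-id was never delivered
             [(equal? (car rest) ack-id) (reverse (cons ack-id acc))]
             [else (loop (cdr rest) (cons (car rest) acc))]))]
    [else (error 'ids-settled-by-ack "unknown ack-mode: ~e" ack-mode)]))

(ids-settled-by-ack 'client '("m1" "m2" "m3") "m2")             ; '("m1" "m2")
(ids-settled-by-ack 'client-individual '("m1" "m2" "m3") "m2")  ; '("m2")
```

Under 'client, acknowledging only the most recent message is therefore enough to settle everything delivered so far, while 'client-individual requires one ACK per message.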
stomp-ack sends an ACK frame in response to some previous MESSAGE
received from the server. The subscription-id and message-id
should match the subscription and message-id headers
from the MESSAGE being responded to, respectively.
Use this procedure or stomp-ack-message to acknowledge
messages received via a call to stomp-subscribe where
ack-mode was either 'client or
'client-individual.
(stomp-ack-message session message [headers]) → void?
  session : stomp-session?
  message : stomp-frame?
  headers : (listof (list symbol? string?)) = '()
Convenience function that extracts the subscription and
message-id headers from the message and delegates to
stomp-ack.
Start, commit, or abort a transaction, respectively. Transaction names
are managed by the client. See
the
STOMP specification for BEGIN, COMMIT and ABORT for more information
on how transaction names are used.
The procedure call-with-stomp-transaction abstracts away from
some of the detail of managing transactions for you.
(call-with-stomp-transaction session proc) → any?
  session : stomp-session?
  proc : (-> string? any)
Creates a fresh transaction name (of the form "Tx1234"),
sends a BEGIN frame, calls proc with the transaction name as
its argument, and sends a COMMIT frame. If proc terminates
normally, the result of proc becomes the result of the whole
call. If it terminates with an exception, an ABORT frame is sent to
the server before the exception propagates out of the call to
call-with-stomp-transaction.
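The control flow described above (BEGIN, then proc, then COMMIT on normal return or ABORT on an exception) can be sketched as follows. This is not the library's implementation: call-with-transaction-sketch is a hypothetical name, the three frame-sending steps are passed in as stand-in procedures so the example runs without a broker, and it assumes COMMIT is sent only when proc returns normally.

```racket
#lang racket/base

;; Hypothetical sketch, not part of the library.
;; begin!, commit!, abort! : (-> string? any) stand in for sending the
;; BEGIN, COMMIT and ABORT frames for the named transaction.
(define (call-with-transaction-sketch tx-name begin! commit! abort! proc)
  (begin! tx-name)
  (define result
    (with-handlers ([(lambda (e) #t)       ; catch any raised value
                     (lambda (e)
                       (abort! tx-name)    ; ABORT before propagating
                       (raise e))])
      (proc tx-name)))
  (commit! tx-name)                        ; reached only on normal return
  result)

;; Record which steps ran, in order, instead of talking to a server.
(define events '())
(define (record tag)
  (lambda (tx) (set! events (cons (list tag tx) events))))

(call-with-transaction-sketch "Tx1"
                              (record 'begin) (record 'commit) (record 'abort)
                              (lambda (tx) 'done))
(reverse events)  ; '((begin "Tx1") (commit "Tx1"))
```

If proc raises instead of returning, the recorded steps are begin followed by abort, and the exception continues to the caller.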
A stomp-session structure represents a STOMP client session. The input
and output represent the socket connection to the server. The
id is the session ID, as decided by the server. The
server-info is ad-hoc server information, if any was sent
during connection setup. The buffer is a queue of received
frames that have not yet been processed.