STOMP
If you find that this library lacks some feature you need, or you have a suggestion for improving it, please don’t hesitate to get in touch with me!
1 Introduction
This library implements the STOMP protocol, both the codec for STOMP frames and the session protocol used by clients.
The STOMP specification includes a lot of important information that’s needed to make sense of the definitions below, including such things as
what a "destination" is
how to correctly acknowledge messages received from the server
how to arrange for acknowledgements-of-receipt from the server
how to manage transactions
1.1 Destinations
When using RabbitMQ with its STOMP plugin as your STOMP server, the available destinations are
(string-append "/queue/" queue-name):
When used with stomp-send, delivers a message directly to the named queue. When used with stomp-subscribe, consumes from the named queue; this is the point at which to decide which ack-mode you want to use.
(string-append "/exchange/" exchange-name "/" routing-key):
When used with stomp-send, delivers a message via the named exchange, using the given routing-key. When used with stomp-subscribe, creates an anonymous queue, binds it to the named exchange using routing-key as a binding pattern, and starts consuming from the anonymous queue.
ActiveMQ and other STOMP message brokers have different destination name schemata and routing behaviours.
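The two RabbitMQ destination forms above can be wrapped in small helpers. This is only a sketch: queue-destination and exchange-destination are hypothetical names introduced here for illustration and are not part of the library.

```racket
#lang racket/base

;; Hypothetical helpers (not provided by the library) that build
;; RabbitMQ-style STOMP destination strings.

;; Destination for delivering to, or consuming from, a named queue.
(define (queue-destination queue-name)
  (string-append "/queue/" queue-name))

;; Destination for publishing via, or binding to, a named exchange.
(define (exchange-destination exchange-name routing-key)
  (string-append "/exchange/" exchange-name "/" routing-key))

(queue-destination "a")                           ; "/queue/a"
(exchange-destination "amq.rabbitmq.log" "info")  ; "/exchange/amq.rabbitmq.log/info"
```

The resulting strings are what would be passed as the destination argument of stomp-send or stomp-subscribe.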
2 Examples
In the examples below, I’ll make use of the RabbitMQ demonstration broker service.
2.1 Sending a message
(require (planet tonyg/stomp))
(define s (stomp-connect "dev.rabbitmq.com" "guest" "guest" "/"))
(stomp-send s "/exchange/amq.rabbitmq.log/info"
            (string->bytes/utf-8 "Hello world, from Racket!"))
(stomp-disconnect s)
2.2 Sending a message, with a receipt
(require (planet tonyg/stomp))
(define s (stomp-connect "dev.rabbitmq.com" "guest" "guest" "/"))
(call-with-receipt s
  (lambda (receipt)
    (stomp-send s "/exchange/amq.rabbitmq.log/info"
                (string->bytes/utf-8 "Hello world, from Racket!")
                `((receipt ,receipt)))))
; At this point, we know the server has received our
; SEND, because we have received its RECEIPT frame. See
; the STOMP specification for details of what exact
; implications this has for whether the server has
; processed the SEND or not.
(stomp-disconnect s)
2.3 Subscribing to an exchange
This example uses RabbitMQ’s AMQP "wildcards" to subscribe to all messages travelling through the "amq.rabbitmq.log" exchange.
(require (planet tonyg/stomp))
(define s (stomp-connect "dev.rabbitmq.com" "guest" "guest" "/"))
; The documented signature of stomp-subscribe takes an ack-mode;
; 'auto means the server expects no ACK frames.
(stomp-subscribe s "/exchange/amq.rabbitmq.log/#" "my-subscription" 'auto)
(let loop ()
  (let ((m (stomp-next-message s "my-subscription")))
    (pretty-print m)
    (loop)))
3 API
(require (planet tonyg/stomp:2:=1))
struct
(struct stomp-frame (command headers body) #:transparent)
command : string? headers : (listof (list symbol? string?)) body : (or bytes? #f)
command:
The STOMP command part of the frame. For frames received from the server, this will usually be "MESSAGE". Frames sent to the server are usually constructed using the procedures described below (in particular stomp-send).
headers:
The STOMP headers sent or received with the frame. See the STOMP specification for details of the meaning of various headers.
body:
The body sent or received with the frame. This is entirely application-specific: STOMP makes no restrictions on the length or format of the body part of a frame. Note, however, that it is a byte string rather than a string: use string->bytes/utf-8 and bytes->string/utf-8 to convert as appropriate.
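To make the three fields concrete, the sketch below renders a frame in the STOMP wire layout: a command line, one name:value line per header, a blank line, the body, and a terminating NUL octet. It is a simplified illustration, not the library's codec. The frame struct is a local stand-in with the same fields as stomp-frame so that the code runs without the library, frame->bytes is a hypothetical name, and header escaping and content-length are deliberately left out.

```racket
#lang racket/base

;; Local stand-in with the same fields as the documented stomp-frame,
;; so this sketch runs without the library installed.
(struct frame (command headers body) #:transparent)

;; Render a frame as bytes: command line, "name:value" header lines,
;; a blank line, the body, then a NUL octet.
;; Simplified: no header escaping and no content-length header.
(define (frame->bytes f)
  (define header-lines
    (for/list ([h (in-list (frame-headers f))])
      (string-append (symbol->string (car h)) ":" (cadr h) "\n")))
  (bytes-append
   (string->bytes/utf-8
    (string-append (frame-command f) "\n"
                   (apply string-append header-lines)
                   "\n"))
   (or (frame-body f) #"")   ; a body of #f is sent as an empty body
   (bytes 0)))

;; The body is a byte string, so text is converted on the way in...
(define f
  (frame "SEND"
         '((destination "/queue/a"))
         (string->bytes/utf-8 "hello")))

(frame->bytes f)                        ; #"SEND\ndestination:/queue/a\n\nhello\0"
;; ...and converted back on the way out.
(bytes->string/utf-8 (frame-body f))    ; "hello"
```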
procedure
(stomp-frame-header frame header [default-value]) → any?
frame : stomp-frame? header : symbol? default-value : any? = #f
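The signature suggests a lookup by header name with a fallback value. The sketch below shows one way such a lookup could work over a header list of the documented shape. It is not the library's implementation: lookup-header is a hypothetical name, and returning the first matching entry is an assumption.

```racket
#lang racket/base

;; Hypothetical lookup over headers shaped like
;; (listof (list symbol? string?)); not the library's own code.
;; Assumption: the first entry with a matching name wins.
(define (lookup-header headers name [default-value #f])
  (define entry (assq name headers))
  (if entry (cadr entry) default-value))

(define headers '((destination "/queue/a") (message-id "42")))

(lookup-header headers 'message-id)          ; "42"
(lookup-header headers 'receipt)             ; #f
(lookup-header headers 'receipt "none")      ; "none"
```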
procedure
(stomp-connect hostname [login passcode virtual-host port-number]) → stomp-session?
hostname : string? login : (or string? #f) = #f passcode : (or string? #f) = #f virtual-host : string? = hostname port-number : (and/c exact-nonnegative-integer? (integer-in 0 65535)) = 61613
> (stomp-connect "dev.rabbitmq.com" "guest" "guest" "/")
procedure
(stomp-disconnect session) → void?
session : stomp-session?
procedure
(stomp-disconnect/abrupt session) → void?
session : stomp-session?
procedure
(stomp-flush session) → void?
session : stomp-session?
The following example demonstrates the use of a receipt request with a SEND operation:
> (call-with-receipt session (lambda (receipt) (stomp-send session "/queue/a" (string->bytes/utf-8 "some message body") `((receipt ,receipt)))))
procedure
(stomp-send-command session command [headers body]) → void?
session : stomp-session? command : string? headers : (listof (list symbol? string?)) = '() body : (or bytes? #f) = #f
Note that this is a low-level way of sending commands to the server; where possible, prefer stomp-send, stomp-subscribe, stomp-ack-message, etc.
procedure
→ (or stomp-frame? eof-object? #f) session : stomp-session? block? : boolean? = #t
procedure
(stomp-next-frame/filter session predicate [block?]) → (or stomp-frame? eof-object? #f)
session : stomp-session? predicate : (-> stomp-frame? boolean?) block? : boolean? = #t
procedure
(stomp-next-message session subscription-id [block?]) → (or stomp-frame? eof-object? #f)
session : stomp-session? subscription-id : string? block? : boolean? = #t
procedure
(stomp-send session destination body [headers]) → void?
session : stomp-session? destination : string? body : (or bytes? #f) headers : (listof (list symbol? string?)) = '()
This is the procedure you will want to use to actually publish messages to the STOMP server.
procedure
(stomp-send/flush session destination body [headers]) → void?
session : stomp-session? destination : string? body : (or bytes? #f) headers : (listof (list symbol? string?)) = '()
procedure
(stomp-subscribe session destination subscription-id ack-mode [headers]) → void?
session : stomp-session? destination : string? subscription-id : string? ack-mode : (or 'auto 'client 'client-individual) headers : (listof (list symbol? string?)) = '()
'auto:
The server will not expect any ACK frames in response to the MESSAGE frames it sends.
'client:
The server will expect ACK frames, and will interpret an acknowledgement of message ID m as acknowledging that message and all preceding messages.
'client-individual:
The server will expect ACK frames, but will interpret each such frame as acknowledging only the message ID mentioned within it.
Proceeds without waiting for a reply. To wait for a reply, supply a receipt header; see call-with-receipt.
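The difference between the three ack-modes can be illustrated with a small model of which messages remain unacknowledged after a single ACK. This is a hypothetical simulation of the bookkeeping described above, not library code and not a description of any particular broker's internals; pending-after-ack is a name introduced for this sketch.

```racket
#lang racket/base

;; Hypothetical model, not library code.
;; `pending` lists unacknowledged message IDs, oldest first.
;; Returns the IDs still unacknowledged after an ACK for `acked-id`.
(define (pending-after-ack pending acked-id ack-mode)
  (case ack-mode
    ;; 'auto: no ACKs are expected, so nothing stays pending.
    [(auto) '()]
    ;; 'client: cumulative; drops acked-id and everything before it.
    [(client)
     (let ([tail (member acked-id pending)])
       (if tail (cdr tail) pending))]
    ;; 'client-individual: drops only acked-id.
    [(client-individual) (remove acked-id pending)]
    [else (error 'pending-after-ack "unknown ack-mode: ~e" ack-mode)]))

(pending-after-ack '("m1" "m2" "m3") "m2" 'client)             ; '("m3")
(pending-after-ack '("m1" "m2" "m3") "m2" 'client-individual)  ; '("m1" "m3")
```

With 'client, a consumer can therefore acknowledge a batch by acknowledging only its last message, whereas 'client-individual requires one ACK per message.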
procedure
(stomp-ack session subscription-id message-id [headers]) → void?
session : stomp-session? subscription-id : string? message-id : string? headers : (listof (list symbol? string?)) = '()
Use this procedure or stomp-ack-message to acknowledge messages received via a call to stomp-subscribe where ack-mode was either 'client or 'client-individual.
procedure
(stomp-ack-message session message [headers]) → void?
session : stomp-session? message : stomp-frame? headers : (listof (list symbol? string?)) = '()
procedure
(stomp-nack session subscription-id message-id [headers]) → void?
session : stomp-session? subscription-id : string? message-id : string? headers : (listof (list symbol? string?)) = '()
procedure
(stomp-begin session transaction [headers]) → void?
session : stomp-session? transaction : string? headers : (listof (list symbol? string?)) = '()
procedure
(stomp-commit session transaction [headers]) → void?
session : stomp-session? transaction : string? headers : (listof (list symbol? string?)) = '()
procedure
(stomp-abort session transaction [headers]) → void?
session : stomp-session? transaction : string? headers : (listof (list symbol? string?)) = '()
Start, commit, or abort a transaction, respectively. Transaction names are managed by the client. See the STOMP specification for BEGIN, COMMIT and ABORT for more information on how transaction names are used.
The procedure call-with-stomp-transaction abstracts away from some of the detail of managing transactions for you.
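The begin, commit, and abort steps follow a common pattern: begin, run some work, then commit on success or abort on failure. The sketch below shows that pattern in isolation. It is not the library's call-with-stomp-transaction, whose exact behaviour is not described here; call-in-transaction is a hypothetical name, and the three transaction operations are passed in as procedures so that the code runs without a broker.

```racket
#lang racket/base

;; Hypothetical wrapper, not the library's call-with-stomp-transaction.
;; begin!, commit! and abort! each take the transaction name;
;; body-proc receives the name so it can tag the frames it sends.
(define (call-in-transaction name begin! commit! abort! body-proc)
  (begin! name)
  (define result
    (with-handlers ([exn:fail?
                     (lambda (e)
                       (abort! name)   ; roll back, then re-raise
                       (raise e))])
      (body-proc name)))
  (commit! name)
  result)

;; Stub operations that record what happened, in place of a real session.
(define log '())
(define (note tag)
  (lambda (tx) (set! log (cons (list tag tx) log))))

(call-in-transaction "tx1" (note 'begin) (note 'commit) (note 'abort)
                     (lambda (tx) 'ok))
(reverse log)   ; '((begin "tx1") (commit "tx1"))
```

With a real session, the three stubs would presumably be replaced by procedures such as (lambda (tx) (stomp-begin session tx)), and the body would pass a `((transaction ,tx)) header to stomp-send, following the header convention in the STOMP specification.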
struct
(struct stomp-session (input output id server-info buffer) #:transparent)
input : input-port? output : output-port? id : (or string? #f) server-info : (or string? #f) buffer : queue?