5.3.2.2

STOMP

Tony Garnock-Jones <tonygarnockjones@gmail.com>

    1 Introduction

      1.1 Destinations

    2 Examples

      2.1 Sending a message

      2.2 Sending a message, with a receipt

      2.3 Subscribing to an exchange

    3 API

If you find that this library lacks some feature you need, or you have a suggestion for improving it, please don’t hesitate to get in touch with me!

1 Introduction

This library implements the STOMP protocol, both the codec for STOMP frames and the session protocol used by clients.

The STOMP specification includes a lot of important information that’s needed to make sense of the definitions below.

1.1 Destinations

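When using RabbitMQ with its STOMP plugin as your STOMP server, the available destinations are determined by the plugin. The examples in this document use the forms "/exchange/amq.rabbitmq.log/info" (an exchange name followed by a routing key) and "/queue/a".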

ActiveMQ and other STOMP message brokers have different destination name schemata and routing behaviours.

2 Examples

In the examples below, I’ll make use of the RabbitMQ demonstration broker service.

2.1 Sending a message

(require (planet tonyg/stomp))
(define s (stomp-connect "dev.rabbitmq.com" "guest" "guest" "/"))
(stomp-send s "/exchange/amq.rabbitmq.log/info"
            (string->bytes/utf-8 "Hello world, from Racket!"))
(stomp-disconnect s)

2.2 Sending a message, with a receipt

(require (planet tonyg/stomp))
(define s (stomp-connect "dev.rabbitmq.com" "guest" "guest" "/"))
(call-with-receipt s
 (lambda (receipt)
   (stomp-send s "/exchange/amq.rabbitmq.log/info"
               (string->bytes/utf-8 "Hello world, from Racket!")
               `((receipt ,receipt)))))
; At this point, we know the server has received our
; SEND, because we have received its RECEIPT frame. See
; the STOMP specification for details of what exact
; implications this has for whether the server has
; processed the SEND or not.
(stomp-disconnect s)

2.3 Subscribing to an exchange

This example uses RabbitMQ’s AMQP "wildcards" to subscribe to all messages travelling through the "amq.rabbitmq.log" exchange.

(require (planet tonyg/stomp))
(define s (stomp-connect "dev.rabbitmq.com" "guest" "guest" "/"))
(stomp-subscribe s "/exchange/amq.rabbitmq.log/#" "my-subscription" 'auto)
(let loop ()
  (let ((m (stomp-next-message s "my-subscription")))
    (pretty-print m)
    (loop)))

3 API

struct

(struct exn:stomp exn:fail (frame)
  #:transparent)
  frame : stomp-frame?
Represents a STOMP error. The frame is either an ERROR frame received from the server, or a frame used locally that is problematic in some way.

struct

(struct stomp-frame (command headers body)
  #:transparent)
  command : string?
  headers : (listof (list symbol? string?))
  body : (or bytes? #f)
Represents a STOMP frame, with its three parts: the command, the headers, and an optional body.
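To show how the three parts fit together, here is a minimal sketch of the basic STOMP wire layout: a command line, one name:value line per header, a blank line, the body, and a terminating NUL octet. The struct below is a local stand-in that mirrors the documented fields, and encode-frame is a hypothetical helper, not this library's codec. It ignores header escaping and content-length handling.

```racket
#lang racket

;; Local stand-in mirroring the documented struct (an assumption for
;; illustration; the real definition lives in the library).
(struct stomp-frame (command headers body) #:transparent)

;; encode-frame : stomp-frame? -> bytes?
;; Hypothetical helper: renders a frame in the basic STOMP wire layout.
(define (encode-frame f)
  (define header-lines
    (for/list ([h (in-list (stomp-frame-headers f))])
      (format "~a:~a\n" (first h) (second h))))
  (bytes-append
   (string->bytes/utf-8
    (string-append (stomp-frame-command f) "\n"
                   (apply string-append header-lines)
                   "\n"))              ; a blank line ends the headers
   (or (stomp-frame-body f) #"")       ; the body may be absent (#f)
   (bytes 0)))                         ; a NUL octet terminates the frame

(define example
  (stomp-frame "SEND"
               '((destination "/queue/a"))
               (string->bytes/utf-8 "hello")))

(define encoded (encode-frame example))
```

Here encoded holds the command line, the single destination header, a blank line, the five body bytes, and one trailing zero byte.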

procedure

(stomp-frame-header frame    
  header    
  [default-value])  any?
  frame : stomp-frame?
  header : symbol?
  default-value : any? = #f
Convenience function for extracting a single header from a STOMP frame. If a header named header is present, its (string) value is returned; otherwise, default-value is returned.
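The lookup described above can be sketched as follows. This is an illustration only: the struct is a local stand-in for the documented one, and frame-header is a hypothetical name chosen to avoid suggesting that this is the library's own implementation.

```racket
#lang racket

;; Local stand-in mirroring the documented struct.
(struct stomp-frame (command headers body) #:transparent)

;; frame-header : stomp-frame? symbol? [any/c] -> any/c
;; Hypothetical equivalent of the documented lookup: headers are a list
;; of (list symbol string), so assq finds the entry and second picks
;; out its value.
(define (frame-header frame header [default-value #f])
  (cond
    [(assq header (stomp-frame-headers frame)) => second]
    [else default-value]))

(define msg
  (stomp-frame "MESSAGE"
               '((subscription "my-subscription")
                 (message-id "42"))
               #"payload"))

(frame-header msg 'message-id)         ; => "42"
(frame-header msg 'receipt-id)         ; => #f
(frame-header msg 'receipt-id "none")  ; => "none"
```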

procedure

(stomp-connect hostname    
  [login    
  passcode    
  virtual-host    
  port-number])  stomp-session?
  hostname : string?
  login : (or string? #f) = #f
  passcode : (or string? #f) = #f
  virtual-host : string? = hostname
  port-number : (and/c exact-nonnegative-integer?
                       (integer-in 0 65535))
              = 61613
Opens a STOMP connection and session to the given hostname at TCP port port-number. If login and/or passcode are supplied, they are sent during connection negotiation. (Note: they are sent in the clear!) If virtual-host is not specified, the value of hostname is used. Note that RabbitMQ’s default virtual host is "/", so to make a connection to the demo STOMP server hosted at dev.rabbitmq.com, you would use:

> (stomp-connect "dev.rabbitmq.com" "guest" "guest" "/")

procedure

(stomp-disconnect session)  void?

  session : stomp-session?
Cleanly disconnects a STOMP session, sending a DISCONNECT frame and waiting for receipt of an acknowledgement before closing the session’s underlying socket.

procedure

(stomp-disconnect/abrupt session)  void?

  session : stomp-session?
Abruptly disconnects a STOMP session, closing the socket without sending a DISCONNECT (or any other) frame.

procedure

(stomp-flush session)  void?

  session : stomp-session?
Outbound frames will frequently be buffered to improve performance. Automatic buffer flushes happen every time a frame is read from the connection, so in many cases manual flushing is not required, but every now and then (especially during development and testing) it can be important to force all queued frames to be sent down the socket. This is what stomp-flush is for.

procedure

(stomp-message-id frame)  (or string? #f)

  frame : stomp-frame?
Extracts the message-id header from the given frame (usually a MESSAGE frame), if any. Returns #f if no such header exists within the frame.

procedure

(wait-for-receipt session receipt)  void?

  session : stomp-session?
  receipt : string?
Waits for the server to send a RECEIPT frame with a receipt-id header matching the given receipt value. Returns an end-of-file object if the connection was closed before such a receipt was received.

procedure

(call-with-receipt session proc)  any?

  session : stomp-session?
  proc : (-> string? any?)
Creates a fresh receipt string (of the form "R1234") and passes it to proc, which should use it in a receipt header of a frame sent to the server. After proc returns, this function calls wait-for-receipt to wait for the server’s acknowledgement. The result of proc is finally returned as the result of the whole call.

The following example demonstrates the use of a receipt request with a SEND operation:

> (call-with-receipt session
   (lambda (receipt)
     (stomp-send session
                 "/queue/a"
                 (string->bytes/utf-8 "some message body")
                 `((receipt ,receipt)))))
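The control flow described for call-with-receipt can be sketched without a server. In this illustration, call-with-receipt/sketch and fresh-receipt are hypothetical names, and the wait argument stands in for whatever actually blocks until the matching RECEIPT frame arrives. Events are recorded in a list so the ordering is visible.

```racket
#lang racket

;; Counter used to generate fresh receipt strings such as "R1", "R2".
(define receipt-counter 0)

(define (fresh-receipt)
  (set! receipt-counter (add1 receipt-counter))
  (format "R~a" receipt-counter))

;; call-with-receipt/sketch : (-> string? any) (-> string? any) -> any
;; `wait` is a hypothetical stand-in for waiting on the RECEIPT frame.
(define (call-with-receipt/sketch wait proc)
  (define receipt (fresh-receipt))
  (define result (proc receipt)) ; proc sends a frame carrying the receipt
  (wait receipt)                 ; then we wait for the acknowledgement
  result)                        ; proc's result is the overall result

;; Record events in order instead of talking to a server.
(define events '())
(define (log! e) (set! events (append events (list e))))

(define answer
  (call-with-receipt/sketch
   (lambda (r) (log! (list 'waited r)))
   (lambda (r) (log! (list 'sent r)) 'done)))
```

After this runs, answer is 'done and events is '((sent "R1") (waited "R1")), which shows that the wait happens only after proc has returned.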

procedure

(stomp-send-command session    
  command    
  [headers    
  body])  void?
  session : stomp-session?
  command : string?
  headers : (listof (list symbol? string?)) = '()
  body : (or bytes? #f) = #f
Sends a STOMP frame with the given command, headers and optional body to the server. See call-with-receipt for information on how to wait for an acknowledgement-of-receipt from the server.

Note that this is a low-level way of sending commands to the server; it is usually better to use stomp-send, stomp-subscribe, stomp-ack-message, etc.

procedure

(stomp-next-frame session [block?])

  (or stomp-frame? eof-object? #f)
  session : stomp-session?
  block? : boolean? = #t
Retrieves the next frame from the server. If no frames are in the session’s buffer of waiting frames already received, returns #f if block? is #f and waits for a frame to arrive otherwise. If the connection closes before a frame arrives, returns an end-of-file object. Otherwise, returns a stomp-frame structure.
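Because there are three possible kinds of result, callers generally need to check for all of them. A minimal sketch of such a dispatch, using a local stand-in struct and a hypothetical describe-result helper:

```racket
#lang racket

;; Local stand-in mirroring the documented struct.
(struct stomp-frame (command headers body) #:transparent)

;; describe-result : (or/c stomp-frame? eof-object? #f) -> string?
;; Hypothetical helper covering each documented kind of result.
(define (describe-result r)
  (cond
    [(eof-object? r) "connection closed"]
    [(not r) "no frame buffered"]
    [else (format "got ~a frame" (stomp-frame-command r))]))

(describe-result eof)                                ; => "connection closed"
(describe-result #f)                                 ; => "no frame buffered"
(describe-result (stomp-frame "MESSAGE" '() #"hi"))  ; => "got MESSAGE frame"
```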

procedure

(stomp-next-frame/filter session 
  predicate 
  [block?]) 
  (or stomp-frame? eof-object? #f)
  session : stomp-session?
  predicate : (-> stomp-frame? boolean?)
  block? : boolean? = #t
As stomp-next-frame, except returns the first frame that matches predicate, if any. If no match is found in the buffer and block? is #f, returns #f; otherwise, waits for a match to arrive, buffering non-matching frames as it goes. Never reorders frames in the buffer.
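The "never reorders" property can be illustrated with a small pure function. This sketch assumes the buffer is a plain list rather than the queue used by the session, and take-first-match is a hypothetical name. Frames are represented by their command strings to keep the example short.

```racket
#lang racket

;; take-first-match : (-> any/c any/c) list? -> (values any/c list?)
;; Returns the first element satisfying pred (or #f if there is none),
;; together with the remaining elements in their original order.
(define (take-first-match pred buffer)
  (let loop ([seen '()] [todo buffer])
    (cond
      [(null? todo) (values #f buffer)]
      [(pred (car todo))
       (values (car todo) (append (reverse seen) (cdr todo)))]
      [else (loop (cons (car todo) seen) (cdr todo))])))

(define-values (hit remaining)
  (take-first-match (lambda (command) (equal? command "MESSAGE"))
                    '("RECEIPT" "MESSAGE" "ERROR" "MESSAGE")))
```

Here hit is "MESSAGE" and remaining is '("RECEIPT" "ERROR" "MESSAGE"): the skipped frames stay ahead of later ones.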

procedure

(stomp-next-message session 
  subscription-id 
  [block?]) 
  (or stomp-frame? eof-object? #f)
  session : stomp-session?
  subscription-id : string?
  block? : boolean? = #t
Uses stomp-next-frame/filter to retrieve the next available MESSAGE frame that has a subscription header matching subscription-id. The block? argument acts as for stomp-next-frame/filter. Returns the first matching MESSAGE frame, end-of-file if the connection closed, or #f if block? was #f and no matching MESSAGE was available in the session’s buffer.
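A predicate of the kind described here might look like the following sketch. The struct is a local stand-in, and message-for? is a hypothetical name; the library's internal predicate may differ.

```racket
#lang racket

;; Local stand-in mirroring the documented struct.
(struct stomp-frame (command headers body) #:transparent)

;; message-for? : string? -> (-> stomp-frame? boolean?)
;; Builds a predicate accepting MESSAGE frames whose subscription
;; header equals subscription-id.
(define ((message-for? subscription-id) frame)
  (and (equal? (stomp-frame-command frame) "MESSAGE")
       (equal? (cond [(assq 'subscription (stomp-frame-headers frame))
                      => second]
                     [else #f])
               subscription-id)))

(define frames
  (list (stomp-frame "RECEIPT" '((receipt-id "R1")) #f)
        (stomp-frame "MESSAGE"
                     '((subscription "other") (message-id "1"))
                     #"a")
        (stomp-frame "MESSAGE"
                     '((subscription "my-subscription") (message-id "2"))
                     #"b")))

;; The first frame accepted by the predicate is the one with body #"b".
(define first-match (findf (message-for? "my-subscription") frames))
```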

procedure

(stomp-send session destination body [headers])  void?

  session : stomp-session?
  destination : string?
  body : (or bytes? #f)
  headers : (listof (list symbol? string?)) = '()
Sends a SEND frame to the server with the given destination, other headers, and optional body. See call-with-receipt for information on getting acknowledgements back from the server.

This is the procedure you will want to use to actually publish messages to the STOMP server.

procedure

(stomp-send/flush session    
  destination    
  body    
  [headers])  void?
  session : stomp-session?
  destination : string?
  body : (or bytes? #f)
  headers : (listof (list symbol? string?)) = '()
Just like stomp-send, but calls stomp-flush as it returns.

procedure

(stomp-subscribe session    
  destination    
  subscription-id    
  ack-mode    
  [headers])  void?
  session : stomp-session?
  destination : string?
  subscription-id : string?
  ack-mode : (or 'auto 'client 'client-individual)
  headers : (listof (list symbol? string?)) = '()
Sends a SUBSCRIBE frame to the server. The destination is the name of the message source to subscribe to (called "destination" because that’s how message senders think of it, presumably). The subscription-id is a session-unique string identifying this subscription. The subscription-id will be sent by the server in MESSAGE frames that result from this SUBSCRIBE operation. The ack-mode parameter must be one of 'auto, 'client or 'client-individual; see the STOMP specification of SUBSCRIBE for what each mode means.

This procedure proceeds without waiting for a reply. To wait for a reply, supply a receipt header; see call-with-receipt.

procedure

(stomp-ack session    
  subscription-id    
  message-id    
  [headers])  void?
  session : stomp-session?
  subscription-id : string?
  message-id : string?
  headers : (listof (list symbol? string?)) = '()
Sends an ACK frame in response to some previous MESSAGE received from the server. The subscription-id and message-id should match the subscription and message-id headers from the MESSAGE being responded to, respectively.

Use this procedure or stomp-ack-message to acknowledge messages received via a call to stomp-subscribe where ack-mode was either 'client or 'client-individual.

procedure

(stomp-ack-message session message [headers])  void?

  session : stomp-session?
  message : stomp-frame?
  headers : (listof (list symbol? string?)) = '()
Convenience function that extracts the subscription and message-id headers from the message and delegates to stomp-ack.
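The delegation can be sketched like this. The struct is a local stand-in, and ack-message/sketch is a hypothetical name whose ack argument stands in for a procedure that would send the ACK frame. Passing list as ack simply shows which values would be handed over.

```racket
#lang racket

;; Local stand-in mirroring the documented struct.
(struct stomp-frame (command headers body) #:transparent)

;; header-value : stomp-frame? symbol? -> (or/c string? #f)
(define (header-value frame name)
  (cond [(assq name (stomp-frame-headers frame)) => second]
        [else #f]))

;; ack-message/sketch : (-> any/c any/c any) stomp-frame? -> any
;; `ack` is a hypothetical stand-in for the procedure sending the ACK.
(define (ack-message/sketch ack message)
  (ack (header-value message 'subscription)
       (header-value message 'message-id)))

(define msg
  (stomp-frame "MESSAGE"
               '((subscription "my-subscription") (message-id "007"))
               #"payload"))

(define acked (ack-message/sketch list msg))
```

Here acked is '("my-subscription" "007"), the subscription and message-id taken from the MESSAGE frame.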

procedure

(stomp-nack session    
  subscription-id    
  message-id    
  [headers])  void?
  session : stomp-session?
  subscription-id : string?
  message-id : string?
  headers : (listof (list symbol? string?)) = '()
Sends a NACK frame to the server. See the STOMP specification of NACK for more information. Use with caution!

procedure

(stomp-begin session transaction [headers])  void?

  session : stomp-session?
  transaction : string?
  headers : (listof (list symbol? string?)) = '()

procedure

(stomp-commit session transaction [headers])  void?

  session : stomp-session?
  transaction : string?
  headers : (listof (list symbol? string?)) = '()

procedure

(stomp-abort session transaction [headers])  void?

  session : stomp-session?
  transaction : string?
  headers : (listof (list symbol? string?)) = '()

Start, commit, or abort a transaction, respectively. Transaction names are managed by the client. See the STOMP specification for BEGIN, COMMIT and ABORT for more information on how transaction names are used.

The procedure call-with-stomp-transaction takes care of some of the details of managing transactions for you.

procedure

(call-with-stomp-transaction session proc)  any?

  session : stomp-session?
  proc : (-> string? any)
Creates a fresh transaction name (of the form "Tx1234"), sends a BEGIN frame, and calls proc with the transaction name as its argument. If proc terminates normally, a COMMIT frame is sent and the result of proc becomes the result of the whole call. If it terminates with an exception, an ABORT frame is sent to the server before the exception propagates out of the call to call-with-stomp-transaction.
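The BEGIN, COMMIT and ABORT sequencing can be sketched without a server. Everything named here (send!, fresh-transaction-name, call-with-transaction/sketch) is hypothetical: send! just records which command would be sent, and the sketch is an illustration of the control flow rather than the library's code.

```racket
#lang racket

;; Record (command transaction) pairs instead of sending real frames.
(define sent '())
(define (send! command tx)
  (set! sent (append sent (list (list command tx)))))

(define tx-counter 0)
(define (fresh-transaction-name)
  (set! tx-counter (add1 tx-counter))
  (format "Tx~a" tx-counter))

;; call-with-transaction/sketch : (-> string? any) -> any
(define (call-with-transaction/sketch proc)
  (define tx (fresh-transaction-name))
  (send! "BEGIN" tx)
  (define result
    (with-handlers ([(lambda (e) #t)       ; catch any raised value
                     (lambda (e)
                       (send! "ABORT" tx)  ; abort first,
                       (raise e))])        ; then let it propagate
      (proc tx)))
  (send! "COMMIT" tx)                      ; reached only on normal return
  result)

(define ok (call-with-transaction/sketch (lambda (tx) 'fine)))

(define failed
  (with-handlers ([exn:fail? (lambda (e) 'aborted)])
    (call-with-transaction/sketch (lambda (tx) (error "boom")))))
```

After both calls, sent is '(("BEGIN" "Tx1") ("COMMIT" "Tx1") ("BEGIN" "Tx2") ("ABORT" "Tx2")), ok is 'fine, and failed is 'aborted.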

struct

(struct stomp-session (input output id server-info buffer)
  #:transparent)
  input : input-port?
  output : output-port?
  id : (or string? #f)
  server-info : (or string? #f)
  buffer : queue?
Represents a STOMP client session. The input and output represent the socket connection to the server. The id is the session ID, as decided by the server. The server-info is ad-hoc server information, if any was sent during connection setup. The buffer is a queue of received frames that have not yet been processed.