STOMP
1 Introduction
1.1 Destinations
2 Changes
3 Examples
3.1 Sending a message
3.2 Sending a message, with a receipt
3.3 Subscribing to an exchange
4 API
exn:  stomp
stomp-frame
stomp-frame-header
stomp-connect
stomp-disconnect
stomp-disconnect/  abrupt
stomp-flush
stomp-message-id
wait-for-receipt
call-with-receipt
stomp-send-command
stomp-next-frame
stomp-next-frame/  filter
stomp-next-message
stomp-send
stomp-send/  flush
stomp-subscribe
stomp-unsubscribe
stomp-ack
stomp-ack-message
stomp-nack
stomp-begin
stomp-commit
stomp-abort
call-with-stomp-transaction
stomp-session
6.0.0.4

STOMP

Tony Garnock-Jones <tonygarnockjones@gmail.com>

    1 Introduction

      1.1 Destinations

    2 Changes

    3 Examples

      3.1 Sending a message

      3.2 Sending a message, with a receipt

      3.3 Subscribing to an exchange

    4 API

If you find that this library lacks some feature you need, or you have a suggestion for improving it, please don’t hesitate to get in touch with me!

1 Introduction

This library implements the STOMP protocol, both the codec for STOMP frames and the session protocol used by clients.

The STOMP specification includes a lot of important information that’s needed to make sense of the definitions below, including such things as

1.1 Destinations

When using RabbitMQ with its STOMP plugin as your STOMP server, the available destinations are

ActiveMQ and other STOMP message brokers have different destination name schemata and routing behaviours.

2 Changes

Version 3.0 of this library changes the interface to many of the main API procedures, generally relying more on keyword arguments and less on positional arguments.

3 Examples

In the examples below, I’ll make use of the RabbitMQ demonstration broker service.

3.1 Sending a message

(require (planet tonyg/stomp))
(define s (stomp-connect "dev.rabbitmq.com"
                         #:login "guest"
                         #:passcode "guest"
                         #:virtual-host "/"))
(stomp-send s "/exchange/amq.rabbitmq.log/info"
            (string->bytes/utf-8 "Hello world, from Racket!"))
(stomp-disconnect s)

3.2 Sending a message, with a receipt

(require (planet tonyg/stomp))
(define s (stomp-connect "dev.rabbitmq.com"
                         #:login "guest"
                         #:passcode "guest"
                         #:virtual-host "/"))
(call-with-receipt s
 (lambda (receipt)
   (stomp-send s "/exchange/amq.rabbitmq.log/info"
               (string->bytes/utf-8 "Hello world, from Racket!")
               #:headers `((receipt ,receipt)))))
; At this point, we know the server has received our
; SEND, because we have received its RECEIPT frame. See
; the STOMP specification for details of what exact
; implications this has for whether the server has
; processed the SEND or not.
(stomp-disconnect s)

3.3 Subscribing to an exchange

This example uses RabbitMQ’s AMQP "wildcards" to subscribe to all messages travelling through the "amq.rabbitmq.log" exchange.

(require (planet tonyg/stomp))
(define s (stomp-connect "dev.rabbitmq.com"
                         #:login "guest"
                         #:passcode "guest"
                         #:virtual-host "/"))
(stomp-subscribe s "/exchange/amq.rabbitmq.log/#" "my-subscription")
(let loop ()
  (let ((m (stomp-next-message s "my-subscription")))
    (pretty-print m)
    (loop)))

4 API

 (require (planet tonyg/stomp:3:=0)) package: base

struct

(struct exn:stomp exn:fail (frame)
    #:transparent)
  frame : stomp-frame?
Represents a STOMP error. The frame is either an ERROR frame received from the server, or a frame used locally that is problematic in some way.

struct

(struct stomp-frame (command headers body)
    #:transparent)
  command : string?
  headers : (listof (list symbol? string?))
  body : (or bytes? #f)
Represents a STOMP frame, with its three parts:

procedure

(stomp-frame-header frame    
  header    
  [default-value])  any?
  frame : stomp-frame?
  header : symbol?
  default-value : any? = #f
Convenience function for extracting a single header from a STOMP frame. If a header named header is present, its (string) value is returned; otherwise, default-value is returned.

procedure

(stomp-connect hostname 
  [#:login login 
  #:passcode passcode 
  #:virtual-host virtual-host 
  #:port-number port-number 
  #:headers headers 
  #:request-versions request-versions]) 
  stomp-session?
  hostname : string?
  login : (or string? #f) = #f
  passcode : (or string? #f) = #f
  virtual-host : string? = hostname
  port-number : 
(and/c exact-nonnegative-integer?
       (integer-in 0 65535))
 = 61613
  headers : (listof (list symbol? string?)) = '()
  request-versions : (listof string?) = '("1.1")
Opens a STOMP connection and session to the given hostname at TCP port port-number. If login and/or passcode are supplied, they are sent during connection negotiation. (Note: they are sent in the clear!) If virtual-host is not specified, the value of hostname is used. Note that RabbitMQ’s default virtual host is "/", so to make a connection to the demo STOMP server hosted at dev.rabbitmq.com, you would use:

> (stomp-connect "dev.rabbitmq.com"
                 #:login "guest"
                 #:passcode "guest"
                 #:virtual-host "/")

Any headers given in headers are included in the CONNECT frame.

The optional request-versions argument should be a list of strings indicating the STOMP protocol versions to ask for when negotiating with the server. If omitted, it defaults to asking for STOMP version 1.1. The server will choose a protocol variant that it supports from this list. You can check the version that the server chose using stomp-session-version. If the server supports none of the requested versions, it should fall back to STOMP version 1.0. To force the use of STOMP version 1.0, pass in '() as request-versions.

procedure

(stomp-disconnect session    
  [#:headers headers])  void?
  session : stomp-session?
  headers : (listof (list symbol? string?)) = '()
Cleanly disconnects a STOMP session, sending a DISCONNECT frame and waiting for receipt of an acknowledgement before closing the session’s underlying socket.

procedure

(stomp-disconnect/abrupt session)  void?

  session : stomp-session?
Abruptly disconnects a STOMP session, closing the socket without sending a DISCONNECT (or any other) frame.

procedure

(stomp-flush session)  void?

  session : stomp-session?
Outbound frames will frequently be buffered to improve performance. Automatic buffer flushes happen every time a frame is read from the connection, so in many cases manual flushing is not required, but every now and then (especially during development and testing) it can be important to force all queued frames to be sent down the socket. This is what stomp-flush is for.

procedure

(stomp-message-id frame)  (or string? #f)

  frame : stomp-frame?
Extracts the message-id header from the given frame (usually a MESSAGE frame), if any. Returns #f if no such header exists within the frame.

procedure

(wait-for-receipt session receipt)  void?

  session : stomp-session?
  receipt : string?
Waits for the server to send a RECEIPT frame with a receipt-id header matching the given receipt value. Returns an end-of-file object if the connection was closed before such a receipt was received.

procedure

(call-with-receipt session proc)  any?

  session : stomp-session?
  proc : (-> string? any?)
Creates a fresh receipt string (of the form "R1234") and passes it to proc, which should use it in a receipt header of a frame sent to the server. After proc returns, this function calls wait-for-receipt to wait for the server’s acknowledgement. The result of proc is finally returned as the result of the whole call.

The following example demonstrates the use of a receipt request with a SEND operation:

> (call-with-receipt session
   (lambda (receipt)
     (stomp-send session
                 "/queue/a"
                 (string->bytes/utf-8 "some message body")
                 #:headers `((receipt ,receipt)))))

procedure

(stomp-send-command session 
  command 
  [#:headers headers 
  #:body body 
  #:use-content-length use-content-length]) 
  void?
  session : stomp-session?
  command : string?
  headers : (listof (list symbol? string?)) = '()
  body : (or bytes? #f) = #f
  use-content-length : (or 'default 'always 'never) = 'default
Sends a STOMP frame with the given command, headers and optional body to the server. See call-with-receipt for information on how to wait for an acknowledgement-of-receipt from the server.

Note that this is a low-level way of sending commands to the server: better to use stomp-send, stomp-subscribe, stomp-ack-message etc.

If you are working with a STOMP server such as ActiveMQ that interprets the presence or absence of a content-length header as an indicator of the content type, you can use the use-content-length parameter to override the default behaviour. Setting it to 'default causes the header to be generated whenever body is non-empty. Setting it to 'always causes it to always be generated, and setting it to 'never causes it to never be generated. You should omit use-content-length unless you are certain you need to work with a broken server.

procedure

(stomp-next-frame session [block?])

  (or stomp-frame? eof-object? #f)
  session : stomp-session?
  block? : boolean? = #t
Retrieves the next frame from the server. If no frames are in the session’s buffer of waiting frames already received, returns #f if block? is #f and waits for a frame to arrive otherwise. If the connection closes before a frame arrives, returns an end-of-file object. Otherwise, returns a stomp-frame structure.

procedure

(stomp-next-frame/filter session 
  predicate 
  [block?]) 
  (or stomp-frame? eof-object? #f)
  session : stomp-session?
  predicate : (-> stomp-frame? boolean?)
  block? : boolean? = #t
As stomp-next-frame, except returns the first frame that matches predicate, if any. If no match is found in the buffer and block? is #f, returns #f ; otherwise, waits for a match to arrive, buffering non-matching frames as it goes. Never reorders frames in the buffer.

procedure

(stomp-next-message session 
  subscription-id 
  [block?]) 
  (or stomp-frame? eof-object? #f)
  session : stomp-session?
  subscription-id : string?
  block? : boolean? = #t
Uses stomp-next-frame/filter to retrieve the next available MESSAGE frame that has a subscription header matching subscription-id. The block? argument acts as for stomp-next-frame/filter. Returns the first matching MESSAGE frame, end-of-file if the connection closed, or #f if block? was #f and no matching MESSAGE was available in the session’s buffer.

procedure

(stomp-send session    
  destination    
  body    
  [#:headers headers    
  #:use-content-length use-content-length])  void?
  session : stomp-session?
  destination : string?
  body : (or bytes? #f)
  headers : (listof (list symbol? string?)) = '()
  use-content-length : (or 'default 'always 'never) = 'default
Sends a SEND frame to the server with the given destination, other headers, and optional body. See call-with-receipt for information on getting acknowledgements back from the server.

This is the procedure you will want to use to actually publish messages to the STOMP server.

In some cases, you may need to use the use-content-length parameter. See stomp-send-command for details. You should omit this parameter unless you are certain that you need to set it.

procedure

(stomp-send/flush session    
  destination    
  body    
  [#:headers headers])  void?
  session : stomp-session?
  destination : string?
  body : (or bytes? #f)
  headers : (listof (list symbol? string?)) = '()
Just like stomp-send, but calls stomp-flush as it returns.

procedure

(stomp-subscribe session    
  destination    
  subscription-id    
  #:ack-mode ack-mode    
  [#:headers headers])  void?
  session : stomp-session?
  destination : string?
  subscription-id : string?
  ack-mode : (or 'auto 'client 'client-individual)
  headers : (listof (list symbol? string?)) = '()
Sends a SUBSCRIBE frame to the server. The destination is the name of the message source to subscribe to (called "destination" because that’s how message senders think of it, presumably). The subscription-id is a session-unique string identifying this subscription. The subscription-id will be sent by the server in MESSAGE frames that result from this SUBSCRIBE operation. The ack-mode parameter must be one of the following:

Proceeds without waiting for a reply. To wait for a reply, supply a receipt header; see call-with-receipt.

procedure

(stomp-unsubscribe session    
  subscription-id    
  [#:headers headers])  void?
  session : stomp-session?
  subscription-id : string?
  headers : (listof (list symbol? string?)) = '()
Sends an UNSUBSCRIBE frame to the server, to cancel an earlier subscription.

procedure

(stomp-ack session    
  subscription-id    
  message-id    
  [#:headers headers])  void?
  session : stomp-session?
  subscription-id : string?
  message-id : string?
  headers : (listof (list symbol? string?)) = '()
Sends an ACK frame in response to some previous MESSAGE received from the server. The subscription-id and message-id should match the subscription and message-id headers from the MESSAGE being responded to, respectively.

Use this procedure or stomp-ack-message to acknowledge messages received via a call to stomp-subscribe where ack-mode was either 'client or 'client-individual.

procedure

(stomp-ack-message session    
  message    
  [#:headers headers])  void?
  session : stomp-session?
  message : stomp-frame?
  headers : (listof (list symbol? string?)) = '()
Convenience function that extracts the subscription and message-id headers from the message and delegates to stomp-ack.

procedure

(stomp-nack session    
  subscription-id    
  message-id    
  [#:headers headers])  void?
  session : stomp-session?
  subscription-id : string?
  message-id : string?
  headers : (listof (list symbol? string?)) = '()
Sends a NACK frame to the server. See the STOMP specification of NACK for more information. Use with caution!

procedure

(stomp-begin session    
  transaction    
  [#:headers headers])  void?
  session : stomp-session?
  transaction : string?
  headers : (listof (list symbol? string?)) = '()

procedure

(stomp-commit session    
  transaction    
  [#:headers headers])  void?
  session : stomp-session?
  transaction : string?
  headers : (listof (list symbol? string?)) = '()

procedure

(stomp-abort session    
  transaction    
  [#:headers headers])  void?
  session : stomp-session?
  transaction : string?
  headers : (listof (list symbol? string?)) = '()

Start, commit, or abort a transaction, respectively. Transaction names are managed by the client. See the STOMP specification for BEGIN, COMMIT and ABORT for more information on how transaction names are used.

The procedure call-with-stomp-transaction abstracts away from some of the detail of managing transactions for you.

procedure

(call-with-stomp-transaction session proc)  any?

  session : stomp-session?
  proc : (-> string? any)
Creates a fresh transaction name (of the form "Tx1234"), sends a BEGIN frame, calls proc with the transaction name as its argument, and sends a COMMIT frame. If proc terminates normally, the result of proc becomes the result of the whole call. If it terminates with an exception, an ABORT frame is sent to the server before the exception propagates out of the call to call-with-stomp-transaction.

struct

(struct stomp-session (input output id server-info version buffer)
    #:transparent)
  input : input-port?
  output : output-port?
  id : (or string? #f)
  server-info : (or string? #f)
  version : string?
  buffer : queue?
Represents a STOMP client session. The input and output represent the socket connection to the server. The id is the session ID, as decided by the server. The server-info is ad-hoc server information, if any was sent during connection setup. The version is the protocol version number, as decided by the server. The buffer is a queue of received frames that have not yet been processed.