#lang racket/base
(require racket/contract
racket/match
racket/tcp)
;; Handle describing one Redis server.
;;   in, out -- ports of the currently held connection; #f when no
;;              connection has been established yet
;;   host    -- host name passed to tcp-connect
;;   port    -- TCP port number (1..65535) passed to tcp-connect
;; Every field is mutable so the ports can be swapped in place when
;; the connection is (re)established.
(define-struct/contract redis-server
  ([in   (or/c input-port? #f)]
   [out  (or/c output-port? #f)]
   [host string?]
   [port (integer-in 1 65535)])
  #:mutable)
;; Exception type raised by this module for Redis-level failures.
;; It is a subtype of exn and adds no fields, so its constructor
;; make-exn:redis takes the two exn fields: a message string and a
;; continuation-mark set.
(define-struct (exn:redis exn) ())
(provide redis-server?
redis-connect
redis-query
redis-reply/c
redis-disconnect!
exn:redis?)
;; redis-connect : [#:host string?] [#:port (integer-in 1 65535)]
;;                 -> redis-server?
;; Creates a server handle for host:port (defaults "localhost" and
;; 6379).  No socket is opened here: both port fields start as #f.
(define/contract (redis-connect #:host [host "localhost"]
                                #:port [port 6379])
  (->* ()
       (#:host string?
        #:port (integer-in 1 65535))
       redis-server?)
  (let ([unopened #f])
    (make-redis-server unopened unopened host port)))
;; redis-disconnect! : redis-server? -> void?
;; Closes the connection held by SERVER, if any, and marks the handle
;; as disconnected by resetting both port fields to #f.
;;
;; Resetting the fields matters: redis-query decides whether it has to
;; connect by testing the stored port for #f.  Leaving closed ports in
;; the struct would make a later query write to a closed port instead
;; of opening a fresh connection.  Calling this on an already
;; disconnected handle is a no-op.
(define/contract (redis-disconnect! server)
  (-> redis-server? void?)
  (let ([in  (redis-server-in server)]
        [out (redis-server-out server)])
    ;; Clear the fields first so the handle is consistently
    ;; "disconnected" even if closing a port raises.
    (set-redis-server-in! server #f)
    (set-redis-server-out! server #f)
    (when in
      (close-input-port in))
    (when out
      (close-output-port out))
    (void)))
;; null-char? : any/c -> boolean?
;; True exactly when VALUE is the character #\null; any other value,
;; including non-characters, yields #f.
(define (null-char? value)
  (eqv? value #\null))
;; Contract describing every value read-reply can produce:
;;   integer? -- integer replies
;;   bytes?   -- bulk replies
;;   string?  -- status replies
;;   #\null   -- nil bulk / nil multi-bulk replies
;;   list     -- multi-bulk replies
;; The list case is recursive because the elements of a multi-bulk
;; reply are themselves read with read-reply and may be multi-bulk
;; replies too; a flat (listof atom) contract would reject such nested
;; replies.  recursive-contract delays the self-reference until the
;; contract is actually applied.
(define redis-reply/c
  (or/c integer?
        bytes?
        string?
        null-char?
        (listof (recursive-contract redis-reply/c #:flat))))
;; read-reply : input-port? -> redis-reply/c
;; Reads one complete reply from IN, dispatching on its first byte:
;;   +  status line       -> string
;;   -  error line        -> raises exn:redis carrying the error text
;;   :  integer line      -> number
;;   $  bulk reply        -> delegated to read-bulk-reply
;;   *  multi-bulk reply  -> delegated to read-multi-bulk-reply
;; Anything else (including eof when the stream has ended) raises
;; exn:redis with an "unexpected server response" message.
(define/contract (read-reply in)
  (-> input-port? redis-reply/c)
  (let ((b (read-bytes 1 in)))
    (match b
      (#"+" (bytes->string/utf-8 (read-bytes-line in 'return-linefeed)))
      (#"-" (raise (make-exn:redis (bytes->string/utf-8
                                    (read-bytes-line in 'return-linefeed))
                                   (current-continuation-marks))))
      (#":" (string->number (read-line in 'return-linefeed)))
      (#"$" (read-bulk-reply in))
      (#"*" (read-multi-bulk-reply in))
      ;; make-exn:redis needs both exn fields; omitting the
      ;; continuation marks would raise an arity error instead of the
      ;; intended exn:redis.
      (_ (raise (make-exn:redis
                 (format "unexpected server response: ~a" b)
                 (current-continuation-marks)))))))
;; read-multi-bulk-reply : input-port? -> redis-reply/c
;; Called after the leading "*" has been consumed.  Reads the element
;; count from the rest of the line; a count of -1 yields #\null,
;; otherwise that many replies are read in order with read-reply and
;; returned as a list.
(define/contract (read-multi-bulk-reply in)
  (-> input-port? redis-reply/c)
  (define count (string->number (read-line in 'return-linefeed)))
  (cond
    [(= -1 count) #\null]
    [else
     (for/list ([k (in-range 0 count)])
       (read-reply in))]))
;; read-bulk-reply : input-port? -> redis-reply/c
;; Called after the leading "$" has been consumed.  Reads the payload
;; length from the rest of the line; a length of -1 yields #\null,
;; otherwise exactly that many bytes are read and returned, and the
;; line terminator that follows the payload is consumed and discarded.
(define/contract (read-bulk-reply in)
  (-> input-port? redis-reply/c)
  (define size (string->number (read-line in 'return-linefeed)))
  (cond
    [(= -1 size) #\null]
    [else
     (let ([payload (read-bytes size in)])
       ;; swallow the CRLF trailing the payload
       (read-bytes-line in 'return-linefeed)
       payload)]))
;; format-request : (or/c string? bytes? symbol?)
;;                  (listof (or/c string? bytes? symbol? integer?))
;;                  -> bytes?
;; Serialises COMMAND and ARGS as one request: a "*<count>\r\n" header
;; where count is the number of arguments plus one for the command,
;; followed by the command and each argument encoded by format-value.
(define/contract (format-request command args)
  (-> (or/c string? bytes? symbol?)
      (listof (or/c string? bytes? symbol? integer?))
      bytes?)
  (let* ([item-count (add1 (length args))]
         [header     (bytes-append
                      #"*"
                      (string->bytes/utf-8 (number->string item-count))
                      #"\r\n")]
         [encoded    (map format-value (cons command args))])
    (apply bytes-append header encoded)))
;; format-value : (or/c string? bytes? symbol? integer?) -> bytes?
;; Encodes a single request item as "$<length>\r\n<payload>\r\n".
;; The payload is VALUE itself for byte strings, the UTF-8 encoding of
;; a string or of a symbol's name, or the UTF-8 encoding of an
;; integer's printed form; <length> is the payload's size in bytes.
(define/contract (format-value value)
  (-> (or/c string? bytes? symbol? integer?) bytes?)
  ;; Step 1: reduce every accepted kind of value to its payload bytes.
  (define payload
    (cond
      [(symbol? value)  (string->bytes/utf-8 (symbol->string value))]
      [(string? value)  (string->bytes/utf-8 value)]
      [(bytes? value)   value]
      [(integer? value) (string->bytes/utf-8 (number->string value))]))
  ;; Step 2: frame the payload with its byte length.
  (define size-field
    (string->bytes/utf-8 (number->string (bytes-length payload))))
  (bytes-append #"$" size-field #"\r\n" payload #"\r\n"))
;; redis-query : redis-server? (or/c string? bytes? symbol?)
;;               (or/c string? bytes? symbol? integer?) ...
;;               -> redis-reply/c
;; Sends COMMAND with ARGS to SERVER and returns the decoded reply.
;; A connection is opened on demand when the handle holds none.  If
;; the attempt fails with exn:fail:network, the connection is
;; re-established and the query is sent once more; a failure of that
;; second attempt propagates to the caller.  Error replies from the
;; server surface as exn:redis (raised by read-reply).
;;
;; NOTE(review): the retry re-sends the command even if the first
;; write reached the server before the failure -- confirm this is
;; acceptable for non-idempotent commands.
(define/contract (redis-query server command . args)
  (->* (redis-server?
        (or/c string? bytes? symbol?))
       ()
       #:rest (listof (or/c string? bytes? symbol? integer?))
       redis-reply/c)
  ;; Close and forget whatever ports the handle currently holds, so a
  ;; reconnect does not leak the previous connection's ports.  Errors
  ;; while closing are ignored on purpose: the connection is being
  ;; thrown away anyway.
  (define (drop-connection!)
    (let ((old-in (redis-server-in server))
          (old-out (redis-server-out server)))
      (set-redis-server-in! server #f)
      (set-redis-server-out! server #f)
      (with-handlers ((exn:fail? void))
        (when old-in (close-input-port old-in)))
      (with-handlers ((exn:fail? void))
        (when old-out (close-output-port old-out)))))
  (define (reconnect)
    (drop-connection!)
    (let ((host (redis-server-host server))
          (port (redis-server-port server)))
      (let-values (((in out) (tcp-connect host port)))
        (set-redis-server-in! server in)
        (set-redis-server-out! server out))))
  (define (try-query)
    (write-bytes (format-request command args) (redis-server-out server))
    (flush-output (redis-server-out server))
    (read-reply (redis-server-in server)))
  ;; Connect lazily; both ports are needed for a round trip.
  (unless (and (redis-server-in server) (redis-server-out server))
    (reconnect))
  (with-handlers ((exn:fail:network? (lambda (e)
                                       (reconnect)
                                       (try-query))))
    (try-query)))