#lang scheme/base
(require (planet bzlib/base)
"memcached.ss"
)
;; A distributed memcached client.  Its single field, `clients', holds a
;; vector of underlying connections (memcached/dht-connect builds it with
;; list->vector over the results of memcached-connect).
(define-struct memcached/dht-client (clients))
;; djb2-hash : (or/c string? bytes? symbol?) -> exact-nonnegative-integer?
;; Computes the djb2 hash of KEY: start from 5381 and, for every byte b of
;; the key, replace the hash with (hash * 33) + b.  Strings are hashed over
;; their UTF-8 encoding and symbols over the UTF-8 encoding of their name.
;; The result is not truncated to a machine word; it is an exact integer of
;; whatever size the key produces.
;; Raises exn:fail:contract when KEY is not a string, byte string or symbol.
(define (djb2-hash key)
  ;; Normalize every supported key type to a byte string.
  (define (key->bytes k)
    (cond ((bytes? k) k)
          ((string? k) (string->bytes/utf-8 k))
          ((symbol? k) (key->bytes (symbol->string k)))
          ;; Without this branch an unsupported key fell through the cond and
          ;; only failed later with an unrelated `cdr' error.
          (else (raise-type-error 'djb2-hash "string, bytes, or symbol" k))))
  (for/fold ((hash 5381))
            ((b (in-bytes (key->bytes key))))
    (+ (* hash 33) b)))
;; memcached/dht-hash : memcached/dht-client? key -> exact-nonnegative-integer?
;; Maps KEY to an index into CLIENT's vector of connections by taking the
;; remainder of its djb2 hash divided by the number of connections.
(define (memcached/dht-hash client key)
  (let* ((digest (djb2-hash key))
         (slot-count (vector-length (memcached/dht-client-clients client))))
    (remainder digest slot-count)))
;; memcached/dht-target : memcached/dht-client? key -> connection
;; Returns the element of CLIENT's connection vector selected for KEY by
;; memcached/dht-hash.
(define (memcached/dht-target client key)
  (let* ((nodes (memcached/dht-client-clients client))
         (index (memcached/dht-hash client key)))
    (vector-ref nodes index)))
;; memcached/dht-set! : calls memcached-set! on the connection selected for
;; KEY.  VALUE and the #:flags, #:exp-time and #:noreply? options (defaults
;; 0, 0 and #f) are passed through unchanged; the callee's result is returned.
(define (memcached/dht-set! client key value
                            #:flags (flags 0)
                            #:exp-time (exp-time 0)
                            #:noreply? (noreply? #f))
  (let ((node (memcached/dht-target client key)))
    (memcached-set! node key value
                    #:flags flags
                    #:exp-time exp-time
                    #:noreply? noreply?)))
;; memcached/dht-add! : calls memcached-add! on the connection selected for
;; KEY.  VALUE and the #:flags, #:exp-time and #:noreply? options (defaults
;; 0, 0 and #f) are passed through unchanged; the callee's result is returned.
(define (memcached/dht-add! client key value
                            #:flags (flags 0)
                            #:exp-time (exp-time 0)
                            #:noreply? (noreply? #f))
  (let ((node (memcached/dht-target client key)))
    (memcached-add! node key value
                    #:flags flags
                    #:exp-time exp-time
                    #:noreply? noreply?)))
;; memcached/dht-replace! : calls memcached-replace! on the connection
;; selected for KEY.  VALUE and the #:flags, #:exp-time and #:noreply? options
;; (defaults 0, 0 and #f) are passed through unchanged; the callee's result
;; is returned.
(define (memcached/dht-replace! client key value
                                #:flags (flags 0)
                                #:exp-time (exp-time 0)
                                #:noreply? (noreply? #f))
  (let ((node (memcached/dht-target client key)))
    (memcached-replace! node key value
                        #:flags flags
                        #:exp-time exp-time
                        #:noreply? noreply?)))
;; memcached/dht-append! : calls memcached-append! on the connection selected
;; for KEY.  VALUE and the #:flags, #:exp-time and #:noreply? options
;; (defaults 0, 0 and #f) are passed through unchanged; the callee's result
;; is returned.
(define (memcached/dht-append! client key value
                               #:flags (flags 0)
                               #:exp-time (exp-time 0)
                               #:noreply? (noreply? #f))
  (let ((node (memcached/dht-target client key)))
    (memcached-append! node key value
                       #:flags flags
                       #:exp-time exp-time
                       #:noreply? noreply?)))
;; memcached/dht-prepend! : calls memcached-prepend! on the connection
;; selected for KEY.  VALUE and the #:flags, #:exp-time and #:noreply? options
;; (defaults 0, 0 and #f) are passed through unchanged; the callee's result
;; is returned.
(define (memcached/dht-prepend! client key value
                                #:flags (flags 0)
                                #:exp-time (exp-time 0)
                                #:noreply? (noreply? #f))
  (let ((node (memcached/dht-target client key)))
    (memcached-prepend! node key value
                        #:flags flags
                        #:exp-time exp-time
                        #:noreply? noreply?)))
;; memcached/dht-cas! : calls memcached-cas! on the connection selected for
;; KEY, passing VALUE, the CAS token and the #:flags, #:exp-time and
;; #:noreply? options (defaults 0, 0 and #f); the callee's result is returned.
;;
;; The CAS argument used to be accepted here but never forwarded, so the
;; underlying call was made without it.
;; NOTE(review): assumes memcached-cas! takes the token as the positional
;; argument right after the value -- confirm against "memcached.ss".
(define (memcached/dht-cas! client key value cas
                            #:flags (flags 0)
                            #:exp-time (exp-time 0)
                            #:noreply? (noreply? #f))
  (memcached-cas! (memcached/dht-target client key) key value cas
                  #:flags flags
                  #:exp-time exp-time
                  #:noreply? noreply?))
;; group : (listof (cons/c K V)) -> (listof (cons/c K (listof V)))
;; Collects the values of ALIST by key (keys compared with equal?, via assoc).
;; Each result entry is a key followed by all of its values, most recently
;; seen value first; the entry touched last appears first in the result.
(define (group alist)
  (let loop ((pending alist) (buckets '()))
    (if (null? pending)
        buckets
        (let ((k (car (car pending)))
              (v (cdr (car pending))))
          (loop (cdr pending)
                (cond ((assoc k buckets)
                       ;; Key already has a bucket: rebuild it with the new
                       ;; value in front and move it to the head of the list.
                       => (lambda (hit)
                            (cons (cons (car hit) (cons v (cdr hit)))
                                  (remove hit buckets))))
                      ;; First time this key is seen: start a new bucket.
                      (else (cons (list k v) buckets))))))))
;; memcached/dht-retrieve* : procedure memcached/dht-client? (listof key) -> list
;; Pairs every key with the connection chosen for it, groups the keys per
;; connection, applies TYPE once per connection as
;; (TYPE connection key ...), and appends the per-connection results.
;; Because the results are appended, TYPE is expected to return a list.
(define (memcached/dht-retrieve* type client keys)
  (let* ((routed (for/list ((key (in-list keys)))
                   (cons (memcached/dht-target client key) key)))
         (batches (group routed))
         (replies (for/list ((batch (in-list batches)))
                    (apply type (car batch) (cdr batch)))))
    (apply append replies)))
;; memcached/dht-get : memcached/dht-client? key key ... -> list
;; Runs memcached-get for one or more keys through memcached/dht-retrieve*
;; and returns the combined results.
(define (memcached/dht-get client key . keys)
  (let ((all-keys (list* key keys)))
    (memcached/dht-retrieve* memcached-get client all-keys)))
;; memcached/dht-gets : memcached/dht-client? key key ... -> list
;; Runs memcached-gets for one or more keys through memcached/dht-retrieve*
;; and returns the combined results.
(define (memcached/dht-gets client key . keys)
  (let ((all-keys (list* key keys)))
    (memcached/dht-retrieve* memcached-gets client all-keys)))
;; memcached/dht-delete! : calls memcached-delete! on the connection selected
;; for KEY, passing DELAY (default 0) and NOREPLY? (default #f) through
;; unchanged; the callee's result is returned.
(define (memcached/dht-delete! client key (delay 0) (noreply? #f))
  (let ((node (memcached/dht-target client key)))
    (memcached-delete! node key delay noreply?)))
;; memcached/dht-incr! : calls memcached-incr! on the connection selected for
;; KEY, passing VAL and NOREPLY? (default #f) through unchanged; the callee's
;; result is returned.
(define (memcached/dht-incr! client key val (noreply? #f))
  (let ((node (memcached/dht-target client key)))
    (memcached-incr! node key val noreply?)))
;; memcached/dht-decr! : calls memcached-decr! on the connection selected for
;; KEY, passing VAL and NOREPLY? (default #f) through unchanged; the callee's
;; result is returned.
(define (memcached/dht-decr! client key val (noreply? #f))
  (let ((node (memcached/dht-target client key)))
    (memcached-decr! node key val noreply?)))
;; memcached/dht-flush-all! : memcached/dht-client? [integer] [boolean] -> void
;; Calls memcached-flush-all! on every connection of CLIENT in vector order.
;; The i-th connection (counting from 1) is given INTERVAL * i as its second
;; argument, so each connection receives a different multiple of INTERVAL
;; (default 10).  NOREPLY? (default #f) is passed through unchanged.
(define (memcached/dht-flush-all! client (interval 10) (noreply? #f))
  (for ((node (in-vector (memcached/dht-client-clients client)))
        (step (in-naturals 1)))
    (memcached-flush-all! node (* interval step) noreply?)))
;; memcached/dht-connect : spec spec ... -> memcached/dht-client?
;; Opens one connection per spec with memcached-connect and wraps the
;; connections, in argument order, in a memcached/dht-client.  A spec is:
;;   (host . port)  -- connect to HOST on PORT
;;   "host"         -- connect to HOST on port 11211
;;   port           -- connect to "localhost" on PORT
;; Anything else raises an error.
(define (memcached/dht-connect host/port . rest)
  ;; Turn one spec into the (host port) argument list for memcached-connect.
  (define (spec->args h/p)
    (cond ((pair? h/p) (list (car h/p) (cdr h/p)))
          ((string? h/p) (list h/p 11211))
          ;; A bare number is the port; it used to be discarded in favour of
          ;; the hard-coded 11211.
          ((number? h/p) (list "localhost" h/p))
          (else (error 'memcached-connect "Invalid host/port ~a" h/p))))
  (make-memcached/dht-client
   (list->vector
    (map (lambda (h/p) (apply memcached-connect (spec->args h/p)))
         (cons host/port rest)))))
;; memcached/dht-disconnect : memcached/dht-client? -> void
;; Calls memcached-disconnect on every connection held by CLIENT, in vector
;; order.
(define (memcached/dht-disconnect client)
  (for ((node (in-vector (memcached/dht-client-clients client))))
    (memcached-disconnect node)))
;; Contract shared by the five storage operations exported in the
;; provide/contract form (set!, add!, replace!, append!, prepend!).
;; Required: a dht client, a key and a bytes value.  Optional keywords:
;; #:flags, #:exp-time and #:noreply?.  The result is unconstrained (`any').
;; NOTE(review): key? and flags? are not defined in this file -- presumably
;; they are provided by "memcached.ss"; confirm.
(define storage-api/c
(->* (memcached/dht-client? key? bytes?)
(#:flags flags? #:exp-time exact-nonnegative-integer?
#:noreply? boolean?)
any))
;; Public API of this module.  Every export is wrapped in a contract; all
;; operations except connect leave their result unconstrained (`any').
(provide/contract
;; connect: one or more specs, each a host string, a number, or a
;; (host . number) pair; returns a dht client.
(memcached/dht-connect (->* ((or/c string? number? (cons/c string? number?)))
()
#:rest (listof (or/c string? number? (cons/c string? number?)))
memcached/dht-client?))
(memcached/dht-disconnect (-> memcached/dht-client? any))
;; Storage operations share storage-api/c.
(memcached/dht-set! storage-api/c)
(memcached/dht-add! storage-api/c)
(memcached/dht-replace! storage-api/c)
(memcached/dht-append! storage-api/c)
(memcached/dht-prepend! storage-api/c)
;; cas!: like the storage operations, plus a required non-negative integer
;; as the fourth positional argument.
(memcached/dht-cas! (->* (memcached/dht-client? key? bytes? exact-nonnegative-integer?)
(#:flags flags? #:exp-time exact-nonnegative-integer?
#:noreply? boolean?)
any))
;; get / gets: a dht client and at least one key.
(memcached/dht-get (->* (memcached/dht-client? key?)
()
#:rest (listof key?)
any))
(memcached/dht-gets (->* (memcached/dht-client? key?)
()
#:rest (listof key?)
any))
;; delete!: optional non-negative integer and boolean positional arguments.
(memcached/dht-delete! (->* (memcached/dht-client? key?)
(exact-nonnegative-integer? boolean?)
any))
;; incr! / decr!: the amount is contracted as bytes?, with an optional
;; boolean positional argument.
(memcached/dht-incr! (->* (memcached/dht-client? key? bytes?)
(boolean?)
any))
(memcached/dht-decr! (->* (memcached/dht-client? key? bytes?)
(boolean?)
any))
;; flush-all!: optional non-negative integer and boolean positional arguments.
(memcached/dht-flush-all! (->* (memcached/dht-client?)
(exact-nonnegative-integer? boolean?)
any))
)