dht.ss
#lang scheme/base
;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; DBD-MEMCACHED - DBI interface to memcached in both single & multi-instance mode.
;;
;; Bonzai Lab, LLC.  All rights reserved.
;;
;; Licensed under LGPL.
;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; dht.ss - implementation of a multi-instance memcached (disributed hashtable) client
;; yc 9/23/2009 - first version
(require (planet bzlib/base)
         "memcached.ss"
         )
;; distributed memcached.
;; there can be many different ways of using memcached.
;; one way is to have it built as a huge

(define-struct memcached/dht-client (clients)) 

;; hash algorithm from http://www.cse.yorku.ca/~oz/hash.html
(define (djb2-hash key)
  (define (convert key)
    (cond ((string? key) (convert (string->bytes/utf-8 key)))
          ((bytes? key) (bytes->list key))
          ((symbol? key) (convert (symbol->string key)))))
  (define (helper bytes hash)
    (cond ((null? bytes) hash)
          (else
           (helper (cdr bytes) (+ (* hash 33) (car bytes))))))
  (helper (convert key) 5381))

(define (memcached/dht-hash client key)
  (remainder (djb2-hash key) (vector-length (memcached/dht-client-clients client))))

(define (memcached/dht-target client key)
  (vector-ref (memcached/dht-client-clients client)
              (memcached/dht-hash client key)))

(define (memcached/dht-set! client key value #:flags (flags 0) #:exp-time (exp-time 0) #:noreply? (noreply? #f))
  (memcached-set! (memcached/dht-target client key) key value #:flags flags #:exp-time exp-time #:noreply? noreply?))

(define (memcached/dht-add! client key value #:flags (flags 0) #:exp-time (exp-time 0) #:noreply? (noreply? #f))
  (memcached-add! (memcached/dht-target client key) key value #:flags flags #:exp-time exp-time #:noreply? noreply?))

(define (memcached/dht-replace! client key value #:flags (flags 0) #:exp-time (exp-time 0) #:noreply? (noreply? #f))
  (memcached-replace! (memcached/dht-target client key) key value #:flags flags #:exp-time exp-time #:noreply? noreply?))

(define (memcached/dht-append! client key value #:flags (flags 0) #:exp-time (exp-time 0) #:noreply? (noreply? #f))
  (memcached-append! (memcached/dht-target client key) key value #:flags flags #:exp-time exp-time #:noreply? noreply?))

(define (memcached/dht-prepend! client key value #:flags (flags 0) #:exp-time (exp-time 0) #:noreply? (noreply? #f))
  (memcached-prepend! (memcached/dht-target client key) key value #:flags flags #:exp-time exp-time #:noreply? noreply?))

(define (memcached/dht-cas! client key value cas #:flags (flags 0) #:exp-time (exp-time 0) #:noreply? (noreply? #f))
  (memcached-cas! (memcached/dht-target client key) key value #:flags flags #:exp-time exp-time #:noreply? noreply?))

(define (group alist)
  ;; for each alist with the same key - group them together!!
  (foldl (lambda (kv interim)
           (if-it (assoc (car kv) interim) ;; the key already exists...
               (cons (cons (car it) (cons (cdr kv) (cdr it)))
                     (filter (lambda (kv)
                               (not (equal? it kv))) interim))
               (cons (list (car kv) (cdr kv)) interim)))
         '()
         alist))

;; this would be interesting... hmmm....
(define (memcached/dht-retrieve* type client keys)
  ;; first to separate out all of the keys.
  ;; then I want to group them together so I can access them together, rather than separately...
  (apply append (map (lambda (kv)
                       (apply type (car kv) (cdr kv)))
                     (group (map cons  
                                 (map (lambda (key)
                                        (memcached/dht-target client key))
                                      keys)
                                 keys)))))

(define (memcached/dht-get client key . keys)
  (memcached/dht-retrieve* memcached-get client (cons key keys)))

(define (memcached/dht-gets client key . keys)
  (memcached/dht-retrieve* memcached-gets client (cons key keys)))

(define (memcached/dht-delete! client key (delay 0) (noreply? #f))
  (memcached-delete! (memcached/dht-target client key) key delay noreply?))

(define (memcached/dht-incr! client key val (noreply? #f))
  (memcached-incr! (memcached/dht-target client key) key val noreply?))

(define (memcached/dht-decr! client key val (noreply? #f))
  (memcached-decr! (memcached/dht-target client key) key val noreply?))

(define (memcached/dht-flush-all! client (interval 10) (noreply? #f))
  (for-each (lambda (client i)
              (memcached-flush-all! client (* interval i) noreply?))
            (vector->list (memcached/dht-client-clients client))
            (build-list (vector-length (memcached/dht-client-clients client)) add1)))

(define (memcached/dht-connect host/port . rest)
  (make-memcached/dht-client (list->vector (map (lambda (h/p)
                                                  (apply memcached-connect 
                                                         (cond ((pair? h/p)
                                                                (list (car h/p) (cdr h/p)))
                                                               ((string? h/p)
                                                                (list h/p 11211))
                                                               ((number? h/p)
                                                                (list "localhost" 11211))
                                                                (else (error 'memcached-connect "Invalid host/port ~a" h/p)))))
                                                (cons host/port rest)))))

(define (memcached/dht-disconnect client)
  (for-each memcached-disconnect (vector->list (memcached/dht-client-clients client))))

(define storage-api/c
  (->* (memcached/dht-client? key? bytes?)
       (#:flags flags? #:exp-time exact-nonnegative-integer?
                #:noreply? boolean?)
       any))

(provide/contract 
 (memcached/dht-connect (->* ((or/c string? number? (cons/c string? number?)))
                             ()
                             #:rest (listof (or/c string? number? (cons/c string? number?)))
                             memcached/dht-client?))
 (memcached/dht-disconnect (-> memcached/dht-client? any))
 (memcached/dht-set! storage-api/c)
 (memcached/dht-add! storage-api/c)
 (memcached/dht-replace! storage-api/c)
 (memcached/dht-append! storage-api/c)
 (memcached/dht-prepend! storage-api/c)
 (memcached/dht-cas! (->* (memcached/dht-client? key? bytes? exact-nonnegative-integer?)
                          (#:flags flags? #:exp-time exact-nonnegative-integer?
                                   #:noreply? boolean?)
                          any))
 (memcached/dht-get (->* (memcached/dht-client? key?)
                     ()
                     #:rest (listof key?)
                     any))
 (memcached/dht-gets (->* (memcached/dht-client? key?)
                      ()
                      #:rest (listof key?)
                      any))
 (memcached/dht-delete! (->* (memcached/dht-client? key?)
                         (exact-nonnegative-integer? boolean?)
                         any))
 (memcached/dht-incr! (->* (memcached/dht-client? key? bytes?)
                       (boolean?)
                       any))
 (memcached/dht-decr! (->* (memcached/dht-client? key? bytes?)
                       (boolean?)
                       any))
 (memcached/dht-flush-all! (->* (memcached/dht-client?) 
                            (exact-nonnegative-integer? boolean?)
                            any))
 )