#lang scheme/base
(require (prefix-in sqlite: (planet jaymccarthy/sqlite:4))
(prefix-in log: (planet synx/log))
(planet synx/util/unwind-protect)
(prefix-in finalize: (planet synx/util/finalize))
scheme/class
scheme/contract
scheme/match)
(define (remote-caller)
(define channel (make-channel))
(thread
(λ ()
(let loop ()
(with-handlers
((exn:fail?
(λ (e)
(log:error (exn-message e)))))
(let ((result (thread-receive)))
(match result
((list return func args ...)
(call-with-values
(λ () (apply func args))
return))
(else
(log:error "Can't call remote ~s?" result)))))
(loop)))))
(define-syntax define/remote
(syntax-rules ()
((define/remote define/how remote (name arg args ... . rest) body ...)
(begin
(define (direct arg args ... . rest)
body ...)
(define return-channel (make-channel))
(define (return . ret-args)
(channel-put return-channel ret-args))
(define/how (name arg args ... . rest)
(if (eq? (current-thread) remote)
(apply direct arg args ... rest)
(begin
(thread-send remote (list* return direct arg args ... rest))
(apply values (channel-get return-channel)))))))
((define/remote define/how remote (name) body ...)
(begin
(define direct
(λ () body ...))
(define return-channel (make-channel))
(define (return . ret-args)
(channel-put return-channel ret-args))
(define/how (name)
(if (eq? (current-thread) remote)
(direct)
(begin
(thread-send remote (list return direct))
(apply values (channel-get return-channel)))))))
((define/remote define/how remote (name . rest) body ...)
(begin
(define (direct . rest)
body ...)
(define return-channel (make-channel))
(define (return . ret-args)
(channel-put return-channel ret-args))
(define/how (name . rest)
(if (eq? (current-thread) remote)
(apply direct rest)
(begin
(thread-send remote (list* return direct rest))
(apply values (channel-get return-channel)))))))))
(define statement%
(class object%
(init-field get-context sql remote)
(define statement #f)
(define params #f)
(define (get-statement)
(or statement
(begin
(set! statement (sqlite:prepare (get-context) sql))
(when params
(log:info "Loading params ~s ~s" sql params)
(apply sqlite:load-params (get-statement) params))
statement)))
(define (set-params! new-params)
(set! params new-params)
(when statement
(log:info "Preloading params ~s" params)
(apply sqlite:load-params statement params)))
(super-new)
(define/remote define/public remote (load . params)
(set-params! params))
(define/remote define/public remote (finalize)
(when statement
(when (sqlite:open-statement? statement)
(sqlite:finalize statement))
(set! statement #f)))
(define (with-resetting params next)
(set-params! params)
(dynamic-wind
do-reset
next
do-reset))
(define/remote define remote (do-reset)
(when statement
(sqlite:reset statement)))
(define/public (reset)
(do-reset))
(define/remote define/public remote (for-each proc . params)
(with-resetting
params
(λ ()
(let loop ()
(log:info "foreach Stepping")
(let ([row (sqlite:step (get-statement))])
(when row
(apply proc (vector->list row)))
(loop))))))
(define/remote define/public remote (fold proc init . params)
(with-resetting
params
(λ ()
(let loop ([result init])
(let ([row (sqlite:step (get-statement))])
(if row
(loop (proc row result))
result))))))
(define/remote define/public remote (once . params)
(with-resetting
params
(λ () (sqlite:step (get-statement)))))
(define/public (map proc)
(begin0
(reverse
(fold (λ (row result) (cons (apply proc (vector->list row)) result)) null))))))
(define (hash-for-each-value h proc)
(let loop ((i (hash-iterate-first h)))
(when i
(proc (hash-iterate-value h i))
(loop (hash-iterate-next h i)))))
(define connection%
(class object%
(init-field (path ':memory:))
(init (close-delay 5))
(super-new)
(define context #f)
(when (string? path)
(set! path (string->path path)))
(define (get-context)
(or context
(begin
(set! context (sqlite:open path))
context)))
(define remote (remote-caller))
(define statements (make-immutable-hash null))
(define/remote define/public remote (close)
(when context
(hash-for-each
statements
(λ (sql statement)
(send statement finalize)))
(sqlite:close context)
(set! context #f)))
(define/public (clear)
(close)
(set! statements (make-immutable-hash null)))
(when close-delay
(thread
(λ ()
(let loop ()
(sleep close-delay)
(let retry ((retries 0))
(when
(with-handlers
(((λ (e)
(and (< retries 10)
(exn:fail? e)))
(λ (e) #t)))
(close)
#f)
(log:info "Retrying close ~s" retries)
(sleep 1)
(retry (+ retries 1))))
(collect-garbage)
(loop)))))
(define/remote define/public remote (reset)
(when context
(hash-for-each-value
statements
(λ (statement)
(send statement reset)))))
(define/public (prepare sql)
(define (new-statement)
(let ((stmt (make-object statement% get-context sql remote)))
(set! statements (hash-set statements sql stmt))
stmt))
(hash-ref statements sql new-statement))
(define transaction-level 0)
(define transaction-lock (make-semaphore 1))
(define (adjust-transaction type)
(semaphore-wait transaction-lock)
(when (not (eq? type 'begin))
(set! transaction-level (- transaction-level 1)))
(when (= transaction-level 0)
(send (prepare
(case type
((begin) "BEGIN")
((rollback) "ROLLBACK")
(else "END")))
once))
(when (eq? type 'begin)
(set! transaction-level (+ transaction-level 1)))
(semaphore-post transaction-lock))
(define/public (with-transaction body)
(adjust-transaction 'begin)
(begin0
(call-with-exception-handler
(λ (e)
(log:info "rolling back boo ~s" (exn-message e))
(send this reset)
(adjust-transaction 'rollback)
(log:info "rolled")
e)
body)
(adjust-transaction 'end)))
(define (do-clear (myself #f))
(clear))
(finalize:register this do-clear)
(define/public (with-clearing body)
(rewind-protect
body
do-clear))
(define/public (errmsg)
(sqlite:errmsg (get-context)))
(define/public (changes-count)
(sqlite:changes-count context))
(define/public (last-insert)
(sqlite:last-insert-rowid context))
(define/public (map proc sql . params)
(let ([stmt (prepare sql)])
(send/apply stmt load params)
(send stmt map proc)))
(define/public (for-each proc sql . params)
(let ([stmt (prepare sql)])
(send/apply stmt load params)
(send stmt for-each proc)))
(define/public (fold proc init sql . params)
(let ([stmt (prepare sql)])
(send/apply stmt load params)
(send stmt fold proc init)))
(define/public (once sql . params)
(let ([stmt (prepare sql)])
(send/apply stmt once params)))
))
(define-syntax-rule (with-transaction c body ...)
(send c with-transaction (λ () body ...)))
(define-syntax-rule (with-resetting stmt body ...)
(send stmt with-resetting null (λ () body ...)))
(define-syntax-rule (with-clearing c body ...)
(send c with-clearing (λ () body ...)))
(provide connection% with-transaction with-resetting with-clearing)
(define (test (path ':memory:))
(define c (new connection% (path path)))
(send c clear)
(with-clearing
c
(send c once "CREATE TABLE IF NOT EXISTS foo (id INTEGER PRIMARY KEY, bar TEXT)"))
(send (send c prepare "INSERT INTO foo (bar) VALUES (?)") once "42")
(send (send c prepare "SELECT id,bar FROM foo") fold cons null))