(module p3 mzscheme
(require "p3-msg.ss")
(provide protocol3:new
protocol3:reset
protocol3:close
protocol3:encode
protocol3:flush)
(provide stream:cons
stream:current
stream:next
stream:current+next
stream:done?
stream->list)
;; protocol3 : connection state shared by the functions in this module.
;;   inport  - port that incoming messages are parsed from (see parse-message)
;;   outport - port that outgoing messages are written to and flushed on
;;   stream  - the current incoming-message stream; #f until protocol3:reset
;;             installs one (protocol3:new initialises it to #f)
(define-struct protocol3 (inport outport stream))
;; protocol3:new : input-port output-port -> protocol3
;; Builds a protocol record around the two ports.  No message stream exists
;; yet; the stream field starts out as #f.
(define (protocol3:new inport outport)
  (let ([no-stream-yet #f])
    (make-protocol3 inport outport no-stream-yet)))
;; protocol3:reset : protocol3 -> stream
;; Discards whatever is left of the protocol's current message stream and
;; replaces it with a fresh one, which is also returned.
;;
;; If a previous stream exists and is not finished, every remaining element
;; is forced (via stream:next) until a done stream is reached, so that the
;; messages it would have produced are consumed before the new stream starts.
(define (protocol3:reset protocol)
  ;; Step 1: drain the old stream, if there is one (the field may be #f).
  (let drain ([old (protocol3-stream protocol)])
    (when (and old (not (stream:done? old)))
      (drain (stream:next old))))
  ;; Step 2: install and hand back a fresh lazy stream.
  (let ([fresh (stream:new protocol)])
    (set-protocol3-stream! protocol fresh)
    fresh))
;; protocol3:close : protocol3 -> void
;; Closes the output port, then the input port.
;;
;; The input port is closed from the exit thunk of a dynamic-wind so that it
;; is released even when closing the output port raises; any such exception
;; still propagates to the caller.
(define (protocol3:close protocol)
  (let ([out (protocol3-outport protocol)]
        [in (protocol3-inport protocol)])
    (dynamic-wind
      void
      (lambda () (close-output-port out))
      (lambda () (close-input-port in)))))
;; protocol3:encode : protocol3 message -> any
;; Hands `message` to write-message together with the protocol's output
;; port.  Flushing is a separate step (see protocol3:flush).
(define (protocol3:encode protocol message)
  (let ([out (protocol3-outport protocol)])
    (write-message message out)))
;; protocol3:flush : protocol3 -> void
;; Flushes any buffered data on the protocol's output port.
(define (protocol3:flush protocol)
  (let ([out (protocol3-outport protocol)])
    (flush-output out)))
;; parse-message : protocol3 -> any
;; Calls parse-server-message on the protocol's input port and returns its
;; result.  stream:new treats an eof object or an end-of-exchange message
;; from this call as the end of the stream.
(define (parse-message protocol)
  (let ([in (protocol3-inport protocol)])
    (parse-server-message in)))
;; stream : one cell of a lazy sequence of messages.
;;   p3      - the protocol3 record this stream belongs to
;;   promise - a promise that, when forced, yields two values: the current
;;             element and the stream holding the rest; #f in the terminal
;;             cell built by stream:new
;;   done?   - #t only for the terminal cell, #f otherwise
(define-struct stream (p3 promise done?))
;; stream:current+next : stream -> (values element stream)
;; Forces the cell's promise and returns both of its values: the current
;; element and the stream for the remainder.
(define (stream:current+next mg)
  (let ([pending (stream-promise mg)])
    (force pending)))
;; stream:current : stream -> element
;; Forces the cell and returns only its first value, the current element.
(define (stream:current mg)
  (let-values ([(head tail) (stream:current+next mg)])
    head))
;; stream:next : stream -> stream
;; Forces the cell and returns only its second value, the rest of the stream.
(define (stream:next mg)
  (let-values ([(head tail) (stream:current+next mg)])
    tail))
;; stream:done? : stream -> boolean
;; Public name for the struct's done? field: true for the terminal cell.
(define stream:done?
  (lambda (mg)
    (stream-done? mg)))
;; stream:new : protocol3 -> stream
;; Builds a lazy stream over the protocol's incoming messages.  Nothing is
;; parsed until the cell is forced.  Forcing it parses one message and
;; yields that message plus the following cell:
;;   - a terminal (done) cell when the message is an eof object or an
;;     end-of-exchange message -- the message itself is still delivered;
;;   - otherwise another lazy cell built the same way.
(define (stream:new protocol)
  (let ([pending
         (delay
           (let* ([msg (parse-message protocol)]
                  [last? (or (eof-object? msg)
                             (end-of-exchange-message? msg))]
                  [tail (if last?
                            (make-stream protocol #f #t)
                            (stream:new protocol))])
             (values msg tail)))])
    (make-stream protocol pending #f)))
;; stream:cons : element stream -> stream
;; Returns a new, not-done cell whose current element is `r` and whose rest
;; is `mg`.  The new cell shares `mg`'s protocol.
(define (stream:cons r mg)
  (let* ([owner (stream-p3 mg)]
         [pending (delay (values r mg))])
    (make-stream owner pending #f)))
;; stream->list : stream -> list
;; Forces the stream cell by cell until a done cell is reached and returns
;; the elements in stream order.  A stream that is already done gives null.
(define (stream->list mg)
  (let collect ([remaining mg] [acc null])
    (if (stream:done? remaining)
        ;; Elements were pushed newest-first; restore stream order.
        (reverse acc)
        (let-values ([(item tail) (stream:current+next remaining)])
          (collect tail (cons item acc))))))
;; end-of-exchange-message? : message -> boolean
;; A message ends the current exchange exactly when it satisfies
;; ReadyForQuery?.
(define end-of-exchange-message?
  (lambda (msg)
    (ReadyForQuery? msg)))
)