#lang scheme
(require (prefix-in channel: "channel.ss"))
(require (prefix-in parse: "parse.ss"))
(require (prefix-in connection: "connection.ss"))
(require (prefix-in log: (planet synx/log)))
(require net/url)
;; wrap-chunked-input-port : input-port -> input-port
;; Returns an input port that yields the decoded body of an HTTP
;; "chunked" transfer coding read from `input`.
;;
;; Each chunk is introduced by a line holding its size in hexadecimal
;; (optionally followed by ";extension" text, which is ignored), then the
;; chunk data, then a CRLF.  A chunk of size 0 ends the body; any trailer
;; lines and the final blank line after it are consumed so that `input`
;; is left positioned just past the body.
;;
;; Raises (via `error`) when a chunk-size line cannot be parsed, and
;; (via `raise-user-error`) when `input` ends in the middle of a chunk.
;; Closing the returned port before the body has been read to its end
;; closes `input`.
(define (wrap-chunked-input-port input)
  (let* ([finished? (port-closed? input)]
         ;; Undelivered remainder of the current chunk, or #f when the
         ;; next chunk header still has to be read from `input`.
         [chunk #f]
         [read-chunk
          (λ (bytes)
            (let must-return-something ()
              (if chunk
                  ;; `<=` (not `<`): when the remainder exactly fills the
                  ;; buffer the chunk is used up, so drop it rather than
                  ;; keep an empty chunk that would produce a 0-byte read.
                  (if (<= (bytes-length chunk) (bytes-length bytes))
                      (begin0
                        (bytes-length chunk)
                        (bytes-copy! bytes 0 chunk)
                        (set! chunk #f))
                      (begin0
                        (bytes-length bytes)
                        (bytes-copy! bytes 0 chunk 0 (bytes-length bytes))
                        (set! chunk (subbytes chunk (bytes-length bytes)))))
                  (let* ([line (read-net-line input)]
                         ;; Leading hex digits are the size; anything after
                         ;; a ';' is a chunk extension and is ignored.
                         [size-match
                          (regexp-match #rx"^[ \t]*([0-9a-fA-F]+)[ \t]*(;.*)?$" line)]
                         [chunk-size
                          (and size-match
                               (string->number (cadr size-match) #x10))])
                    (when (not chunk-size)
                      (error (format "Um... what is ~s?" line)))
                    (if (= chunk-size 0)
                        (begin
                          ;; Skip trailer lines up to and including the
                          ;; terminating blank line (or end of stream).
                          (let skip-trailer ()
                            (let ([trailer (read-line input 'return-linefeed)])
                              (unless (or (eof-object? trailer)
                                          (equal? trailer ""))
                                (skip-trailer))))
                          eof)
                        (begin
                          (set! chunk (make-bytes chunk-size))
                          ;; `read-bytes!` reports a short count when the
                          ;; stream ends mid-chunk; treat that as a closed
                          ;; connection instead of delivering zero padding.
                          (let ([got (read-bytes! chunk input)])
                            (when (or (eof-object? got) (< got chunk-size))
                              (raise-user-error "Connection closed on chunk read")))
                          ;; Every chunk's data is followed by a CRLF.
                          (when (eof-object? (read-bytes 2 input))
                            (raise-user-error "Connection closed on chunk tail"))
                          (must-return-something)))))))])
    (make-input-port/read-to-peek
     "chunked input"
     (λ (bytes)
       (if finished? eof
           (let ([result (read-chunk bytes)])
             (when (eof-object? result)
               (set! finished? #t))
             result)))
     #f
     (λ ()
       ;; Only tear down the underlying port when the body was abandoned
       ;; before its end.
       (when (not finished?)
         (close-input-port input))))))
;; wrap-long-input-port : number input-port -> input-port
;; Returns an input port that delivers at most `limit` items read from
;; `port` and then reports eof.  Closing the returned port while part of
;; the limit is still unread closes `port`.
(define (wrap-long-input-port limit port)
  ;; Running total of what has been delivered so far.
  (define got 0)
  (define (remaining) (- limit got))
  (define (read-some! str)
    (let ([count (min (remaining) (bytes-length str))])
      (cond
        [(zero? count) eof]
        [else
         (call-with-exception-handler
          ;; Log the state at the time of the failure, then hand the
          ;; exception value back unchanged.
          (λ (e)
            (log:log "Erm... count is ~s str is ~s left is ~s" count (bytes-length str) (- limit got))
            e)
          (λ ()
            (let ([n (read-bytes-avail!* str port 0 count)])
              (cond
                ;; Nothing available yet: return an event on the
                ;; underlying port whose result is 0.
                [(eqv? n 0) (wrap-evt port (λ (ignored) 0))]
                [(number? n) (set! got (+ got n)) n]
                ;; A procedure result is counted as one item.
                [(procedure? n) (set! got (add1 got)) n]
                [else n]))))])))
  (define (close-underlying)
    (when (positive? (remaining))
      (close-input-port port)))
  (make-input-port/read-to-peek
   (format "~a-~a" (object-name port) limit)
   read-some!
   #f
   close-underlying))
;; wrap-chunked-output-port : output-port -> output-port
;; Returns an output port that frames everything written to it as HTTP
;; "chunked" transfer coding on `output`: each write becomes
;; "<hex size>\r\n<data>\r\n".  Closing the returned port writes the
;; terminating "0\r\n\r\n" and flushes `output`; it does not close
;; `output`.
(define (wrap-chunked-output-port output)
  (let ([closed? #f])
    (make-output-port
     "chunked output"
     output
     (λ (bytes start end buffer? breaks?)
       (cond
         [closed? 0]
         ;; An empty range is a flush request.  Encoding it as a chunk
         ;; would emit "0\r\n\r\n", which is the end-of-body marker, so
         ;; pass the flush on to the underlying port instead.
         [(= start end)
          (flush-output output)
          0]
         [else
          (display (format "~x\r\n" (- end start)) output)
          ;; Report the number of payload bytes written, not the framing.
          (begin0
            (write-bytes bytes output start end)
            (write-bytes #"\r\n" output))]))
     (λ ()
       (write-bytes #"0\r\n\r\n" output)
       ;; Push the terminator out so it does not sit in a buffer.
       (flush-output output)
       (set! closed? #t)))))
;; wrap-long-output-port : integer -> (output-port -> output-port)
;; Curried constructor.  The returned procedure wraps an output port so
;; that at most `length` bytes are passed through to it.  The byte budget
;; is shared by every port built from the same returned procedure.
;; Closing a wrapper while budget remains closes the wrapped port; in
;; every case closing sets the budget to 0.
(define (wrap-long-output-port length)
  (let ([remaining length])
    (λ (output)
      (define (write-limited bytes start end buffer? breaks?)
        (cond
          ;; Budget used up: report that nothing was written.
          [(<= remaining 0) 0]
          [else
           (let* ([stop (min (+ remaining start) end)]
                  [written (write-bytes bytes output start stop)])
             (set! remaining (- remaining written))
             written)]))
      (define (close-limited)
        (when (> remaining 0)
          (close-output-port output))
        (set! remaining 0))
      (make-output-port
       "limited output length"
       output
       write-limited
       close-limited))))
;; read-net-line : input-port -> string
;; Reads one CRLF-terminated line from `input` and returns it without the
;; terminator.  Raises a user error when the port is already at eof.
(define (read-net-line input)
  (define line (read-line input 'return-linefeed))
  (if (eof-object? line)
      (raise-user-error "Connection unexpectedly closed")
      line))
;; examine : number headers -> (values chunked? length close-it?)
;; Derives body-framing information from `headers`:
;;   chunked?  - #t when the version is at least 1.1 and the
;;               Transfer-Encoding header value contains "chunked";
;;               always #f for versions below 1.1.
;;   length    - the Content-Length header value, or 0 when absent.
;;   close-it? - for versions below 1.1, #t when there is no Keep-Alive
;;               header; otherwise #t when the Connection header value
;;               contains "close".
(define (examine version headers)
  ;; #t when header `name` is present and its value list contains `token`.
  (define (lists-token? name token)
    (let ([value (parse:get-header headers name)])
      (and value
           (findf (λ (item) (equal? item token)) value)
           #t)))
  (define body-length
    (or (parse:get-header headers "Content-Length") 0))
  (cond
    [(< version 1.1)
     (values #f
             body-length
             (false? (parse:get-header headers "Keep-Alive")))]
    [else
     (let* ([chunked? (lists-token? "Transfer-Encoding" "chunked")]
            [close? (lists-token? "Connection" "close")])
       (values chunked? body-length close?))]))
;; wrap-output : boolean (or/c number #f) boolean output-port -> output-port
;; Chooses the output port a message body should be written to:
;;   - chunked?             : a chunk-encoding wrapper around `port`
;;   - close-it?            : `port` itself
;;   - no length / length 0 : a port that discards everything
;;   - otherwise            : a wrapper passing at most `length` bytes
(define (wrap-output chunked? length close-it? port)
  (cond
    [chunked? (wrap-chunked-output-port port)]
    [close-it? port]
    [(or (not length) (= length 0)) (open-output-nowhere)]
    ;; `wrap-long-output-port` is curried: it takes the length and
    ;; returns a procedure that takes the port.  Calling it with both
    ;; arguments at once is an arity error.
    [else ((wrap-long-output-port length) port)]))
;; wrap-input : boolean number boolean input-port -> input-port
;; Chooses the input port a message body should be read from: a
;; chunk-decoding wrapper when `chunked?`, `port` itself when
;; `close-it?`, and otherwise a wrapper limited to `length`.
(define (wrap-input chunked? length close-it? port)
  (cond
    [chunked? (wrap-chunked-input-port port)]
    [close-it? port]
    [else (wrap-long-input-port length port)]))
;; url->proper-string : url -> string
;; Renders `uri` as a string, substituting "/" when the rendering is
;; empty.
(define (url->proper-string uri)
  (let ([text (url->string uri)])
    (if (equal? text "")
        "/"
        text)))
;; handle-request : symbol url headers channel output-port -> void
;; Writes an HTTP/1.1 request line and the raw request headers to
;; `output`, flushes, and then, for POST and PUT, hands a body port to
;; the `respond` procedure and waits until that body port is closed.
;; Closes `output` afterwards when the headers call for closing the
;; connection.
;;
;; `respond` is supplied by `channel:putting`.
;; NOTE(review): its exact contract is defined in channel.ss; this code
;; assumes it accepts the body output port as its single argument and
;; that whoever writes the body closes that port when done -- confirm.
(define (handle-request method uri request-headers data-channel output)
  (channel:putting
   data-channel
   (λ (respond)
     (write-string (format "~a ~a HTTP/1.1\r\n" (string-upcase (symbol->string method)) (url->proper-string uri)) output)
     (parse:for-each-raw-header
      request-headers
      (λ (name raw)
        (write-string (parse:join-header name raw) output)
        (write-bytes #"\r\n" output)))
     ;; Blank line ends the header section.
     (write-bytes #"\r\n" output)
     (flush-output output)
     (let-values ([(chunked? length close-it?) (examine 1.1 request-headers)])
       (when (or (eq? method 'post) (eq? method 'put))
         (sync
          (let ([output (wrap-output chunked? length close-it? output)])
            (respond output)
            ;; `eof-evt` only accepts input ports; for an output port the
            ;; matching "writer is done" signal is the port being closed.
            (port-closed-evt output))))
       (when close-it?
         (close-output-port output))))))
;; null-input-port : input-port
;; An input port that is always at eof: both reading and peeking report
;; eof, and closing it does nothing.
(define null-input-port
  (let ([always-eof-read (λ (buffer) eof)]
        [always-eof-peek (λ (buffer skip progress-evt) eof)])
    (make-input-port
     "null"
     always-eof-read
     always-eof-peek
     void)))
;; handle-response : symbol procedure channel input-port -> void
;; Reads an HTTP status line and header lines from `input`, reports them
;; through `respond`, and then, unless `method` is 'head, passes a body
;; input port to `respond` and waits until that port reaches eof.
;; Closes `input` afterwards when the response calls for closing the
;; connection.  The `channel` argument is not used in this procedure.
(define (handle-response method respond channel input)
  (define-values (version code message)
    (parse:status-line (read-net-line input)))
  ;; Accumulate headers line by line until the blank line that ends the
  ;; header section.
  (define response-headers
    (let collect ([so-far #f])
      (let ([line (read-net-line input)])
        (cond
          [(eof-object? line) so-far]
          [(equal? line "") so-far]
          [else (collect (parse:parse-header line so-far))]))))
  (respond version code message response-headers)
  (let-values ([(chunked? length close-it?) (examine version response-headers)])
    (unless (eq? method 'head)
      (let ([body (wrap-input chunked? length close-it? input)])
        (respond body)
        ;; Block until the body port has been read to its end.
        (sync (eof-evt body))))
    (when close-it?
      (close-input-port input))))
;; Public interface of this module, with contracts.
;; NOTE(review): `handle-request` uses `parse:headers/c` while `examine`
;; uses `parse:headers?` for the same kind of argument -- confirm that
;; parse.ss exports both names.
;; NOTE(review): `channel?` is written without the `channel:` prefix this
;; file gives to channel.ss, so it refers to whatever `channel?` the
;; language itself provides -- confirm this matches the value that
;; `handle-request` passes to `channel:putting`.
(provide/contract
;; Writes the request line, headers and (for POST/PUT) body to a port.
[handle-request (-> symbol? url? parse:headers/c channel? output-port? void?)]
;; Reads the status line, headers and (except for HEAD) body from a port.
[handle-response (-> symbol? procedure? channel? input-port? void?)]
;; Returns three values: chunked?, content length, close-connection?.
[examine (-> number? parse:headers? (values boolean? (or/c integer? #f) boolean?))]
;; Wraps a port so that writes are framed as chunks.
[wrap-chunked-output-port (-> output-port? output-port?)]
;; Curried: takes the byte limit and returns the port-wrapping procedure.
[wrap-long-output-port (-> integer? (-> output-port? output-port?))])