#lang scheme
(require (prefix-in channel: "channel.ss"))
(require (prefix-in connection: "connection.ss"))
(require (prefix-in session: "session.ss"))
(require (prefix-in dest: "destination.ss"))
(require (only-in "parse.ss" headers/c))
(require (prefix-in log: (planet synx/log)))
(require (prefix-in uri: "uri.ss"))
(require net/url)
;; Number of seconds a connection's worker threads wait for new work
;; (passed to sync/timeout in manage-connection) before giving up.
(define connection-idle 10)

;; Immutable table from a destination to the list of sessions opened for it.
;; The variable is rebound with set! whenever the table changes.
(define dest-sessions (make-immutable-hash '()))

;; Parameter holding the proxy settings as a two-element list; a false value
;; makes connect talk to the destination directly.  The list is handed to
;; dest:make via apply (presumably host name and port).
(define proxy (make-parameter '("localhost" 3128)))

;; A session bundles what a caller needs to talk to one managed connection:
;;   request  - channel on which requests are submitted
;;   response - channel on which responses are delivered
;;   dead     - event that becomes ready once the session's response thread
;;              has terminated
(define-struct session
  (request
   response
   dead))
;; not-dead? : session -> boolean
;; True while the session's response thread is still running.
;; Polling the dead event with a zero timeout yields the event itself once the
;; thread has terminated and #f otherwise, so the poll result must be negated
;; (the previous version returned it as-is and therefore kept only DEAD
;; sessions when used with filter).
(define (not-dead? session)
  (not (sync/timeout 0 (session-dead session))))

;; connect : symbol url headers -> (values channel channel)
;; Submits the request (method uri headers) to a session for the destination
;; of `uri` and returns that session's request and response channels.
;;
;; The destination is the configured proxy unless the target is SSL or no
;; proxy is set, in which case the connection goes directly to the target and
;; only the path part of the URI is sent.
;;
;; Live sessions already known for the destination are tried first; each is
;; given 1ms to accept the request.  If none accepts, a new connection is
;; opened, registered in dest-sessions, and handed the request.
(define (connect method uri headers)
  (let* ([maybe-dest (dest:parse-uri uri)]
         ;; One decision drives both the destination and the request target so
         ;; a direct (SSL) connection is never sent the proxy-style full URI.
         [via-proxy? (and (not (dest:ssl? maybe-dest)) (proxy))]
         [dest (if via-proxy? (apply dest:make (proxy)) maybe-dest)]
         [uri (if via-proxy? uri (uri:extract-path uri))]
         [sessions (filter not-dead? (dict-ref dest-sessions dest (λ () null)))]
         [value (list method uri headers)])
    ;; Forget sessions whose threads have terminated.
    (set! dest-sessions (dict-set dest-sessions dest sessions))
    (let/ec return
      (for-each
       (λ (session)
         (channel:putting
          (session-request session)
          (λ (request)
            (let* ([put (channel-put-evt (session-request session) value)]
                   [result (sync/timeout 0.001 put (session-dead session))])
              ;; Reuse the session only when the put itself completed.  A #f
              ;; result means the session was busy (timeout) and the dead
              ;; event means it just terminated; in both cases the request
              ;; was NOT delivered, so keep looking.
              (when (eq? result put)
                (return (session-request session) (session-response session)))))))
       sessions)
      ;; No existing session took the request: open a new connection.
      (let ([session (manage-connection (connection:new dest))])
        (set! dest-sessions
              (dict-set dest-sessions dest (cons session sessions)))
        (channel:putting
         (session-request session)
         (λ (request)
           (apply request value)))
        (values (session-request session) (session-response session))))))
;; manage-connection : connection -> session
;; Creates a fresh request channel and response channel for `connection`,
;; starts a request thread and a response thread that service them, and
;; returns a session holding both channels plus the thread-dead event of the
;; response thread.
;;
;; NOTE(review): channel:putting comes from channel.ss and is not visible
;; here; it is called with a channel and a one-argument procedure.  Its exact
;; error-handling behaviour should be confirmed there.
(define (manage-connection connection)
(local
[(define request-channel (make-channel))
(define response-channel (make-channel))
;; Request thread: repeatedly takes a request from request-channel, writes it
;; to the connection's output, and tells the response thread to expect a reply.
;; NOTE(review): `thread` starts this body immediately, before
;; response-thread below is defined; response-thread is only referenced after
;; a request has been received.
(define request-thread
(thread
(λ ()
(channel:putting
response-channel
;; `respond` is not used inside this procedure.
(λ (respond)
(let next-session ()
;; Wait up to connection-idle seconds; #f on timeout ends the loop and the thread.
(let ([value (sync/timeout connection-idle request-channel)])
(when value
;; An exception object received on the channel is re-raised as an error.
(when (exn? value) (error "Could not get request" value))
;; Otherwise the value is a list of (method uri headers).
(let-values ([(method uri headers) (apply values value)])
;; Re-establish the connection if it was closed since the last request.
(when (connection:closed? connection)
(set! connection (connection:reconnect connection)))
(session:handle-request method uri headers request-channel (connection:output connection))
;; Hand the method and the connection's input port to the response thread's mailbox.
(thread-send response-thread (list method (connection:input connection)))
(next-session))))))))))
;; Response thread: for each message from the request thread, reads the reply
;; from the given input port via session:handle-response.
(define response-thread
(thread
(λ ()
(call-with-exception-handler
;; Any exception raised in the body kills the request thread.
(λ (e) (kill-thread request-thread))
(λ ()
(let ([ready (thread-receive-evt)])
(channel:putting
response-channel
(λ (respond)
(let next-session ()
;; Wait up to connection-idle seconds for a mailbox message; stop on timeout.
(when (sync/timeout connection-idle ready)
(let-values ([(method input) (apply values (thread-receive))])
;; A reply cannot be read from an already-closed port.
(when (port-closed? input)
(error "AAAAAAAAA"))
(session:handle-response method respond response-channel input)
;; Keep serving only while the input port is still open after the response.
(let ([continue? (not (port-closed? input))])
(when continue?
(next-session)))))))))))
;; The response loop has ended: stop the request thread as well.
(kill-thread request-thread))))]
;; The session counts as dead once the response thread has terminated.
(make-session request-channel response-channel (thread-dead-evt response-thread))))
;; Exported interface.
;;
;; connect takes the request method, the target URL and the request headers,
;; and returns two channels: the session's request channel and its response
;; channel.
;;
;; proxy is exported as a parameter.  The previous function contract required
;; a list result even when proxy was called with an argument, but setting a
;; parameter returns void, so every (proxy new-value) call through this export
;; raised a contract violation.  parameter/c checks the stored value instead
;; and keeps the export usable with parameterize.  A false value is allowed
;; because connect treats a false (proxy) as "connect directly".
(provide/contract
 [connect (->
           symbol?
           url?
           headers/c
           (values channel? channel?))]
 [proxy (parameter/c (or/c false/c (list/c string? integer?)))])