pool.ss
#lang scheme

(require (prefix-in channel: "channel.ss"))

(require (prefix-in connection: "connection.ss"))
(require (prefix-in session: "session.ss"))
(require (prefix-in dest: "destination.ss"))
(require (only-in "parse.ss" headers/c))

(require (prefix-in log: (planet synx/log)))

(require (prefix-in uri: "uri.ss"))

(require net/url)

(define connection-idle 10)

(define dest-sessions (make-immutable-hash null))
(define proxy (make-parameter (list "localhost" 3128)))
;(define proxy (make-parameter #f))

(define-struct session (request response dead))

(define (not-dead? session)
  (sync/timeout 0 (session-dead session)))

(define (connect method uri headers)
  (let* ([maybe-dest (dest:parse-uri uri)]
         [dest (if (and
                    ; for now, SSL can't be proxied
                    ; really, with end-to-end encryption,
                    ; can SSL ever be proxied?
                    (not (dest:ssl? maybe-dest))
                    (proxy))
                   (apply dest:make (proxy))
                   maybe-dest)]
        [uri (if (proxy) uri (uri:extract-path uri))])
    (let ([sessions (filter not-dead? (dict-ref dest-sessions dest (λ () null)))]
          [value (list method uri headers)])
      (set! dest-sessions (dict-set dest-sessions dest sessions))
      (call-with-escape-continuation
       (λ (return)
         (for-each 
          (λ (session)
            (channel:putting 
             (session-request session)
             (λ (request)
               (let ([result (sync/timeout 0.001 (channel-put-evt (session-request session) value) (session-dead session))])
                 (when (not (eq? result (session-dead session)))
                   (return (session-request session) (session-response session)))))))
           sessions)
         ; make a new one if that fails...
;         (log:log "Connecting to ~s" dest)
         (let ([session (manage-connection (connection:new dest))])
           (set! dest-sessions
                 (dict-set dest-sessions dest (cons session sessions)))
           (channel:putting
            (session-request session)
            (λ (request)
              (apply request value)))
           (return (session-request session) (session-response session))))))))

; pipeline... need two threads, one piping requests up, one feeding responses down
; request -> thread A -> send request -> thread B -> receive response -> response

(define (manage-connection connection)
  (local
    [(define request-channel (make-channel))
     (define response-channel (make-channel))
     (define request-thread
       (thread
        (λ ()
          (channel:putting
           response-channel
           (λ (respond)
             (let next-session ()
               (let ([value (sync/timeout connection-idle request-channel)])
                 (when value
                   (when (exn? value) (error "Could not get request" value))
                   (let-values ([(method uri headers) (apply values value)])
;                     (log:log "Got ~s ~s ~s c:~s" method uri headers (connection:closed? connection))
                     (when (connection:closed? connection)
                       (set! connection (connection:reconnect connection)))
                     (session:handle-request method uri headers request-channel (connection:output connection))
                     (thread-send response-thread (list method (connection:input connection)))
                     (next-session))))))))))
     (define response-thread
       (thread
        (λ ()
          (call-with-exception-handler
           (λ (e) (kill-thread request-thread))
           (λ ()
             (let ([ready (thread-receive-evt)])
               (channel:putting
                response-channel
                (λ (respond)
                  (let next-session ()
                    (when (sync/timeout connection-idle ready)
                      (let-values ([(method input) (apply values (thread-receive))])
                        (when (port-closed? input)
                          (error "AAAAAAAAA"))
                        (session:handle-response method respond response-channel input)
                        (let ([continue? (not (port-closed? input))])
                          (when continue?
                            (next-session)))))))))))
          (kill-thread request-thread))))]
    (make-session request-channel response-channel (thread-dead-evt response-thread))))

(provide/contract
 [connect (->
           symbol? 
           url?
           headers/c 
           (values channel? channel?))]
 [proxy (->* () ((list/c string? integer?)) (list/c string? integer?))])