#lang scheme (require "unwind-protect.ss") (require (planet synx/displayz)) (require (prefix-in queue: "queue.ss")) (require (prefix-in filedb: "filedb.ss")) (require (prefix-in connect: "connect.ss")) (require net/url net/url-structs) (require web-server/web-server) (require web-server/private/connection-manager) (require web-server/http/request-structs) (define upstream-proxy (list "localhost" 3128)) (define *connection-pool* null) (define (remove-connection input output) (thread (λ () (sync (eof-evt input)) (close-input-port input) (close-output-port output))) (list input output)) (define (get-connection) (sleep 0.001) ; to allow the connection watchers to remove connections (set! *connection-pool* (filter (λ (c) (ormap (λ (p) (not (port-closed? p))) c)) *connection-pool*)) (if (null? *connection-pool*) (let ([c (apply (compose remove-connection tcp-connect) upstream-proxy)]) (set! *connection-pool* (cons c *connection-pool*)) (apply values c)) (apply values (car *connection-pool*)))) (define (call-with-connection proc) (let ([input #f] [output #f]) (set!-values (input output) (get-connection)) (proc input output))) (define (read-net-line input output) (let ([line (read-line input 'return-linefeed)]) (when (eof-object? line) (raise-user-error "Connection unexpectedly closed")) (display line output) (display "\r\n" output) ; read-line...strips it...ohohoho... (flush-output output) line)) (define (copy-close-port input . outputs) (unwind-protect (λ () (apply copy-port input outputs)) (λ () (close-input-port input) (apply close-output-port outputs)))) (define (copy-chunked-port input output . spies) (let loop () (let* ([chunk-size (string->number (read-net-line input output))]) (if (= chunk-size 0) (for-each (λ (spy) (close-output-port spy)) spies) (let ([chunk (make-bytes chunk-size)]) (when (eof-object? (read-bytes! chunk input)) (raise-user-error "Connection closed on chunk read")) (write-bytes chunk output) (for-each (λ (spy) (write-bytes chunk spy)) spies) (when (eof-object? (read-bytes 2 input)) ; pass CRLF (raise-user-error "Connection closed on chunk tail")) (write-bytes "\r\n" output) (flush-output output) (loop)))))) (define (copy-length-port input length output . spies) (display (format "Length starting ~s~n" length)) (when (and length (> length 0)) (let ([buffer (make-bytes 4096)]) (let loop ([left length]) (when (> left 0) (sync input) (display "Input is ready\n") (let ([size (read-bytes-avail! buffer input)]) (display (format "length writing ~s~n" size)) (when (not (eof-object? size)) (for-each (λ (guy) (write-bytes buffer guy 0 size) (flush-output guy)) (list* output spies)) (loop (- left size))))))) (flush-output output)) (display "don\n") (for-each (λ (guy) (close-output-port guy)) spies)) (define parse-header-line (let ([splitter #px"([^:]+)\\s*:\\s*(.*)"]) (λ (line) (display (format "Got header ~s~n" line)) (cdr (regexp-match splitter line))))) (define (headers-hash) (make-immutable-custom-hash (λ (a b) (equal? (string-downcase a) (string-downcase b))) (λ (a) (equal-hash-code (string-downcase a))) (λ (a) (equal-secondary-hash-code (string-downcase a))))) ; headers hash as caseless, but still have case eh. ; maybe just downcase 'em for processing. whatever. (define (parse-headers input output) (let loop ([hash (headers-hash)]) (let ([line (read-net-line input output)]) (if (equal? line "") hash (loop (apply dict-set hash (parse-header-line line))))))) (define parse-status-line (let ([split #rx" "] [front #rx"HTTP/([0-9]\\.[0-9])"]) (λ (line) (let ([bits (regexp-split split line)]) (when (not (>= (length bits) 3)) (raise-user-error "Bad status line ~s must have at least 3 bits!" line)) (let ([version (regexp-match front (car bits))]) (when (not version) (raise-user-error "Bad version ~s" (car bits))) (let ([version (string->number (cadr version))] [code (string->number (cadr bits))]) (values version code (string-join (cddr bits) " ")))))))) (define *redirects* (make-immutable-hash null)) (define (check-for-redirection? code uri headers) (if (and (> code 300) (< code 304)) (let ([location (hash-ref headers "Location" (λ () #f))]) (if location (begin (set! *redirects* (hash-set *redirects* location (url->string uri))) #t) #f)) #f)) (define reset-redirects-later (let ([waiter #f]) (λ () (if waiter #f (set! waiter (thread (λ () (sleep 10) (set! *redirects* (make-immutable-hash null)) (set! waiter #f)))))))) (define (get-redirects queued uri) (let loop ([uri (url->string uri)]) (when uri (queue:add-source queued uri) (loop (hash-ref *redirects* uri (λ () #f))))) (reset-redirects-later)) (define (image-type? headers) (call/cc (λ (return) (let ([type (dict-ref headers "Content-Type" (λ () (return #f)))]) (when type (when (regexp-match #rx"^image/" type) (let ([size (dict-ref headers "Content-Length" (λ () #f))]) (when (not size) (return #t)) ; can't tell (when (< (string->number size) 4096) ; arbitrary (return #f)) (return #t))))) (return #f)))) (define (send-request input request output) (display (format "~a ~a HTTP/1.1\r\n" (string-upcase (symbol->string (request-method request))) (url->string (request-uri request))) output) (let ([chunked? #f] [close-it? #f]) (for-each (λ (header) (when (and (equal? (header-field header) "Content-Transfer-Encoding") (regexp-match #rx"chunked" (header-value header))) (set! chunked? #t)) (when (and (equal? (header-field header) "Connection") (regexp-match #rx"close" (header-value header))) (set! close-it? #t)) (when (equal? (header-field header) "Keep-alive") (set! close-it? #f)) (display (format "~a: ~a\r\n" (header-field header) (header-value header)) output)) (request-headers/raw request)) (display "\r\n" output) (flush-output output) (if chunked? (copy-chunked-port input output) (if close-it? (copy-close-port input output) ; otherwise it would hang if we tried (copy-port input output) ; we need to check if there's a size, and if not, copy NOTHING (let ([value (headers-assq* #"Content-Length" (request-headers/raw request))]) (when value (copy-length-port input (string->number value) output))))) ; otherwise the content length must be 0. (flush-output output))) (define (handle-response code uri response-headers input output chunked? close-it?) (let ([copy (if close-it? copy-close-port (if chunked? copy-chunked-port (let ([length (call/cc (λ (return) (string->number (dict-ref response-headers "Content-Length" (λ () (return #f))))))]) (if (and length (> length 0)) (λ (input . outputs) (apply copy-length-port input length outputs)) (λ (input . outputs) ; this better not have any data... (void))))))]) (if (check-for-redirection? code uri response-headers) (copy input output) (if (not (image-type? response-headers)) (copy input output) (let ([id (queue:find-by-source uri (λ () #f))]) (if id (begin (when (close-it?) (close-input-port input)) (call-with-input-file (filedb:queue id) (λ (input) (copy input output))) (get-redirects id uri)) (connect:with-transaction (let ([id (queue:create)]) (filedb:with-uncertain-file (λ (file) (copy input output (open-output-file file #:exists 'truncate)) (rename-file-or-directory file (filedb:queue id)))) (get-redirects id uri))))))))) (define (dispatch conn request) (with-handlers ([exn:fail:user? (λ (e) ((error-display-handler) (exn-message e) e)(display "\n") (raise e))]) (call-with-connection (λ (input output) (display "Connected!\n") (send-request (connection-i-port conn) request output) (display "Sent boo.\n") (receive-response input request (connection-o-port conn)))))) (define (receive-response input request output) (let-values ([(version code message) (parse-status-line (read-net-line input output))]) (let ([response-headers (parse-headers input output)] [uri (request-uri request)]) ; (write-bytes #"\r\n" output) ; we have to ask, is the response chunked or close? already checked the request. (let ([chunked? (call/cc (λ (return) (regexp-match #rx"chunked" (dict-ref response-headers "Content-Transfer-Encoding" (λ () (return #f))))))] [close-it? (call/cc (λ (return) (regexp-match #rx"close" (dict-ref response-headers "Connection" (λ () (return #f))))))]) (handle-response code uri response-headers input output chunked? close-it?))))) (define (startup) (let ([shutdown (serve #:dispatch dispatch #:port 12323)]) (unwind-protect (λ () (let loop () (sleep 9001) (loop))) shutdown))) (startup) (startup)