#lang scheme (require srfi/2 srfi/31 "message.ss" "../exception.ss" "../errno.ss") (define (start-processor t-handler cleanup) (let ([t-threads (and t-handler (make-hasheqv))] [r-channels (current-tags)] [r-fids (current-fids)]) (thread (λ () (dynamic-wind void (rec (loop) (nest [(let/ec break) (let ([ready (sync (current-input-port) (thread-receive-evt))]))] (cond [(input-port? ready) (match (read-message) [(struct message:t:flush (tag-box old-tag)) (cond [(and t-threads (hash-ref t-threads old-tag #f)) => (λ (handler) (break-thread handler) (thread-send handler tag-box))])] [(and (struct message:t ((and (box tag) tag-box))) request) (if t-handler (let ([reply-to (current-thread)]) (thread (λ () (hash-set! t-threads tag (current-thread)) (with-handlers ([exn:break? (λ (exn) (let more () (and-let* ([flush-box (thread-try-receive)]) (thread-send reply-to (make-message:r:flush flush-box)) (more))))] [exn:fail:filesystem:9p? (λ (exn) (thread-send reply-to (make-message:r:error tag-box (exn-message exn))))]) (match (t-handler request) [(and (struct message:r (tag-box)) response) (unless (eqv? (unbox tag-box) tag) (set-box! tag-box tag)) (thread-send reply-to response)])) (hash-remove! t-threads tag)))) (write-message (make-message:r:error tag-box ENOSYS)))] [(and (struct message:r ((box tag))) response) (cond [(and r-channels (hash-ref r-channels tag #f)) => (λ (reply-to) (hash-remove! r-channels tag) (channel-put reply-to response))] [else (log-warning (format "Dropped message: ~e" response))])] [(? eof-object?) (break)])] [else (match (thread-receive) [(cons (and (struct message:t (tag-box)) request) (? channel? reply-to)) (cond [(and r-fids (message:t:clunk? request)) (hash-remove! r-fids (message:t:clunk-fid request))] [(and r-fids (message:t:remove? request)) (hash-remove! r-fids (message:t:remove-fid request))]) (if r-channels (with-handlers ([exn:fail:filesystem:9p? (λ (exn) (channel-put reply-to (make-message:r:error tag-box (exn-message exn))))]) (write-message request) (hash-set! r-channels (unbox tag-box) reply-to)) (channel-put reply-to (make-message:r:error tag-box ENOSYS)))] [(? message:r? response) (if t-handler (write-message response) (log-warning (format "Suppressed message: ~e" response)))])]) (flush-output) (loop))) cleanup))))) (define ((make-send-request processor) request) (let ([reply-to (make-channel)]) (thread-send processor (cons request reply-to)) (match (with-handlers ([exn:break? (λ (exn) (thread-send processor (cons (make-message:t:flush (box #f) (unbox (message-tag request))) reply-to)) (channel-get reply-to))]) (channel-get reply-to)) [(struct message:r:error (_ name)) (raise-9p-error name)] [(? message:r:flush?) (nest [(let ([marks (current-continuation-marks)])) (let/ec continuation)] (raise (make-exn:break EINTR marks continuation)))] [response response]))) (define (start-client hostname [port-no 564] [local-hostname #f] [local-port-no #f]) (nest [(parameterize ([current-custodian (make-custodian)])) (let-values ([(in out) (tcp-connect hostname port-no local-hostname local-port-no)])) (parameterize ([current-input-port in] [current-output-port out] [current-tags (make-hasheqv)] [current-fids (make-hasheqv)]))] (write-message (make-message:t:version #&0 (max-message-size) "9P2000")) (flush-output) (match (read-message) [(struct message:r:version (_ negotiated-size negotiated-version)) (if (and (<= negotiated-size (max-message-size)) (string=? negotiated-version "9P2000")) (values (make-send-request (start-processor #f void)) (curry hash-remove! (current-fids)) (current-custodian)) (raise-9p-error EPROTONOSUPPORT))] [_ (close-input-port in) (close-output-port out) (raise-9p-error EPROTO)]))) (define (start-server t-handler fid-cleanup [port-no 564] [max-allow-wait 4] [reuse? #f] [hostname #f]) (nest [(parameterize ([current-custodian (make-custodian)])) (let ([original-in (current-input-port)] [original-out (current-output-port)] [ear (tcp-listen port-no max-allow-wait reuse? hostname)]))] (thread (rec (loop) (nest [(let-values ([(in out) (tcp-accept ear)])) (parameterize ([current-input-port in] [current-output-port out] [current-tags #f] [current-fids #f]))] (match (read-message) [(struct message:t:version (tag-box suggested-size suggested-version)) (if (regexp-match? #rx"^9P2000(\\.|$)" suggested-version) (parameterize ([max-message-size (min suggested-size (max-message-size))]) (write-message (make-message:r:version tag-box (max-message-size) "9P2000")) (let ([t-fids (make-hasheqv)]) (hash-set! t-fids #xffffffff #f) (start-processor (λ (request) (parameterize ([current-input-port original-in] [current-output-port original-out] [current-fids t-fids]) (t-handler request))) (λ () (for ([file (in-hash-values t-fids)]) (cond [file => fid-cleanup])))))) (write-message (make-message:r:error tag-box EPROTONOSUPPORT))) (flush-output)] [(struct message:t (tag-box)) (write-message (make-message:r:error tag-box EPROTO)) (close-input-port in) (close-output-port out)] [_ (log-error EPROTO) (close-input-port in) (close-output-port out)])) (loop))) (current-custodian))) (provide/contract [start-client (->* (string?) ((integer-in 1 65535) (or/c string? #f) (or/c (integer-in 1 65535) #f)) (values (-> message:t? message:r?) (-> any/c any) custodian?))] [start-server (->* ((-> message:t? message:r?) (-> any/c any)) ((integer-in 0 65535) natural-number/c any/c (or/c string? #f)) custodian?)])