;; network/event-loop.rkt
#lang racket
(require
 srfi/2
 srfi/31
 scheme/nest
 "message.rkt"
 "../exception.rkt"
 "../errno.rkt")

;; start-processor : (or/c procedure? #f) (-> any) -> thread?
;;
;; Spawns the event-loop thread for one connection and returns it.  The loop
;; multiplexes two sources:
;;   * messages read from (current-input-port) via read-message, and
;;   * values posted to the loop thread's mailbox with thread-send.
;;
;; t-handler -- when non-#f, called as (t-handler request) in a fresh thread
;;              for every incoming T-message; its R-message result is posted
;;              back to the loop, which writes it out.  When #f, incoming
;;              T-messages are answered with an ENOSYS error.
;; cleanup   -- thunk run (as the dynamic-wind post thunk) when the loop ends.
;;
;; (current-tags) and (current-fids) are sampled once, when start-processor is
;; called; a #f value disables the corresponding outgoing-request bookkeeping.
(define (start-processor t-handler cleanup)
  (let ([t-threads (and t-handler (make-hasheqv))]   ; tag -> worker thread
        [r-channels (current-tags)]                  ; tag -> reply channel
        [r-fids (current-fids)])
    (thread
     (λ ()
       (dynamic-wind
        void
        (λ ()
          ;; The escape continuation is installed once, outside the loop:
          ;; call/ec does not call its procedure in tail position, so
          ;; re-entering let/ec on every iteration would grow the
          ;; continuation by one frame per processed message.
          (let/ec break
            (let loop ()
              (let ([ready (sync (current-input-port) (thread-receive-evt))])
                (cond
                  [(input-port? ready)
                   (match (read-message)
                     [(struct message:t:flush (tag-box old-tag))
                      (cond
                        [(and t-threads (hash-ref t-threads old-tag #f))
                         => (λ (handler)
                              ;; Queue the flush tag BEFORE breaking the
                              ;; worker so its break handler is guaranteed to
                              ;; find it in the mailbox.  A #f fail-thunk makes
                              ;; thread-send return #f (instead of raising in
                              ;; the loop thread) when the worker has already
                              ;; finished; the flush is then acknowledged here.
                              (if (thread-send handler tag-box #f)
                                  (break-thread handler)
                                  (write-message (make-message:r:flush tag-box))))]
                        [t-threads
                         ;; Nothing pending under old-tag: acknowledge the
                         ;; flush anyway so the peer is not left waiting.
                         (write-message (make-message:r:flush tag-box))])]
                     [(and (struct message:t ((and (box tag) tag-box))) request)
                      (if t-handler
                          (let ([reply-to (current-thread)])
                            (thread
                             (λ ()
                               ;; Register this worker so a later flush of
                               ;; `tag` can find and interrupt it.
                               (hash-set! t-threads tag (current-thread))
                               (with-handlers ([exn:break?
                                                (λ (exn)
                                                  ;; Interrupted by a flush:
                                                  ;; answer every queued flush
                                                  ;; tag with an Rflush.
                                                  (let more ()
                                                    (and-let* ([flush-box (thread-try-receive)])
                                                      (thread-send
                                                       reply-to (make-message:r:flush flush-box))
                                                      (more))))]
                                               [exn:fail:filesystem:9p?
                                                (λ (exn)
                                                  (thread-send
                                                   reply-to
                                                   (make-message:r:error tag-box (exn-message exn))))])
                                 (match (t-handler request)
                                   [(and (struct message:r (tag-box)) response)
                                    ;; Force the reply to carry the request's tag.
                                    (unless (eqv? (unbox tag-box) tag)
                                      (set-box! tag-box tag))
                                    (thread-send reply-to response)]))
                               (hash-remove! t-threads tag))))
                          (write-message (make-message:r:error tag-box ENOSYS)))]
                     [(and (struct message:r ((box tag))) response)
                      (cond
                        [(and r-channels (hash-ref r-channels tag #f))
                         => (λ (reply-to)
                              (hash-remove! r-channels tag)
                              (channel-put reply-to response))]
                        [else
                         (log-warning (format "Dropped message: ~e" response))])]
                     [(? eof-object?)
                      ;; End of input: leave the loop so cleanup runs.
                      (break)])]
                  [else
                   (match (thread-receive)
                     ;; Outgoing request: a (request . reply-channel) pair.
                     [(cons (and (struct message:t (tag-box)) request) (? channel? reply-to))
                      (cond
                        [(and r-fids (message:t:clunk? request))
                         (hash-remove! r-fids (message:t:clunk-fid request))]
                        [(and r-fids (message:t:remove? request))
                         (hash-remove! r-fids (message:t:remove-fid request))])
                      (if r-channels
                          (with-handlers ([exn:fail:filesystem:9p?
                                           (λ (exn)
                                             (channel-put
                                              reply-to
                                              (make-message:r:error tag-box (exn-message exn))))])
                            (write-message request)
                            ;; Remember where the reply for this tag must go.
                            (hash-set! r-channels (unbox tag-box) reply-to))
                          (channel-put reply-to (make-message:r:error tag-box ENOSYS)))]
                     ;; Reply produced by a worker thread.
                     [(? message:r? response)
                      (if t-handler
                          (write-message response)
                          (log-warning (format "Suppressed message: ~e" response)))])])
                (flush-output)
                (loop)))))
        cleanup)))))
  
;; make-send-request : thread? -> (request -> response)
;;
;; Curried constructor for the procedure that submits a request to
;; `processor` (an event-loop thread) and blocks until a reply arrives on a
;; private channel.
;;   * An error reply is re-raised through raise-9p-error with its name.
;;   * If the caller is broken while waiting, a flush message for the
;;     request's tag is posted to the processor and the wait resumes; a flush
;;     reply is then turned back into an exn:break carrying EINTR.
;;   * Any other reply is returned as-is.
(define ((make-send-request processor) request)
  (define inbox (make-channel))

  ;; Block until the processor delivers something on our private channel.
  (define (await-reply)
    (channel-get inbox))

  ;; Break handler: ask the processor to flush our request, then keep waiting.
  (define (flush-then-wait _exn)
    (thread-send processor
                 (cons (make-message:t:flush (box #f) (unbox (message-tag request)))
                       inbox))
    (await-reply))

  (thread-send processor (cons request inbox))
  (match (with-handlers ([exn:break? flush-then-wait])
           (await-reply))
    [(struct message:r:error (_ name))
     (raise-9p-error name)]
    [(? message:r:flush?)
     (let ([marks (current-continuation-marks)])
       (let/ec continuation
         (raise (make-exn:break EINTR marks continuation))))]
    [response
     response]))

;; start-client : hostname [port-no] [local-hostname] [local-port-no]
;;                -> (values send-request forget-fid custodian)
;;
;; Connects to a server over TCP under a fresh custodian, negotiates the
;; "9P2000" protocol version and, on success, starts the event loop.
;;
;; Returns three values:
;;   1. a procedure that sends a request and returns its reply,
;;   2. a procedure that removes a key from this connection's fid table,
;;   3. the custodian that owns the connection's resources.
;;
;; Raises (via raise-9p-error) EPROTONOSUPPORT when the server's version reply
;; is not acceptable and EPROTO when the reply is not a version message.  In
;; both cases the TCP ports are closed before raising, so a failed negotiation
;; does not leak the connection.
(define (start-client hostname [port-no 564] [local-hostname #f] [local-port-no #f])
  (nest [(parameterize ([current-custodian (make-custodian)]))
         (let-values ([(in out) (tcp-connect hostname port-no local-hostname local-port-no)]))
         (parameterize ([current-input-port in] [current-output-port out]
                        [current-tags (make-hasheqv)] [current-fids (make-hasheqv)]))
         ;; Single failure path: release the connection, then raise.
         (let ([fail (λ (errno)
                       (close-input-port in)
                       (close-output-port out)
                       (raise-9p-error errno))]))]
    (write-message (make-message:t:version #&0 (max-message-size) "9P2000"))
    (flush-output)
    (match (read-message)
      [(struct message:r:version (_ negotiated-size negotiated-version))
       (if (and (<= negotiated-size (max-message-size)) (string=? negotiated-version "9P2000"))
           (values (make-send-request (start-processor #f void))
                   (curry hash-remove! (current-fids))
                   (current-custodian))
           (fail EPROTONOSUPPORT))]
      [_
       (fail EPROTO)])))

;; start-server : t-handler fid-cleanup [port-no] [max-allow-wait] [reuse?] [hostname]
;;                -> custodian
;;
;; Listens for TCP connections under a fresh custodian and returns that
;; custodian.  An accept thread handles the version handshake of each
;; connection in turn; for a connection whose first message is a version
;; request matching "9P2000", an event loop is started that calls t-handler
;; for every request.
;;
;; t-handler   -- called as (t-handler request) with the current input/output
;;                ports restored to the ones in effect when start-server was
;;                called, and current-fids set to the connection's fid table.
;; fid-cleanup -- called on every non-#f value left in the connection's fid
;;                table when its event loop ends.
;;
;; The ports of a connection are closed when its handshake is rejected, when
;; the handshake raises, and when its event loop ends.
(define (start-server t-handler fid-cleanup [port-no 564] [max-allow-wait 4] [reuse? #f] [hostname #f])
  ;; Best-effort close of both ends of a connection.  Errors while closing
  ;; (e.g. flushing to a dead peer) are ignored so they cannot take down the
  ;; thread that is hanging up.
  (define (hang-up in out)
    (with-handlers ([exn:fail? void]) (close-input-port in))
    (with-handlers ([exn:fail? void]) (close-output-port out)))
  (nest [(parameterize ([current-custodian (make-custodian)]))
         (let ([original-in (current-input-port)] [original-out (current-output-port)]
               [ear (tcp-listen port-no max-allow-wait reuse? hostname)]))]
    (thread
     (rec (loop)
       (nest [(let-values ([(in out) (tcp-accept ear)]))
              (parameterize ([current-input-port in] [current-output-port out]
                             [current-tags #f] [current-fids #f]))
              ;; A failure while talking to one client (malformed message,
              ;; connection reset, ...) must not kill the accept thread:
              ;; log it, drop that connection and keep accepting.
              (with-handlers ([exn:fail?
                               (λ (exn)
                                 (log-error (exn-message exn))
                                 (hang-up in out))]))]
         (match (read-message)
           [(struct message:t:version (tag-box suggested-size suggested-version))
            (cond
              [(regexp-match? #rx"^9P2000(\\.|$)" suggested-version)
               (parameterize ([max-message-size (min suggested-size (max-message-size))])
                 (write-message (make-message:r:version tag-box (max-message-size) "9P2000"))
                 (let ([t-fids (make-hasheqv)])
                   (hash-set! t-fids #xffffffff #f)
                   (start-processor
                    (λ (request)
                      (parameterize ([current-input-port original-in]
                                     [current-output-port original-out]
                                     [current-fids t-fids])
                        (t-handler request)))
                    (λ ()
                      ;; Release the files still referenced by the session,
                      ;; then always close the connection's ports.
                      (dynamic-wind
                       void
                       (λ ()
                         (for ([file (in-hash-values t-fids)])
                           (when file (fid-cleanup file))))
                       (λ () (hang-up in out)))))))
               (flush-output)]
              [else
               ;; Unsupported version: report it, then drop the connection
               ;; instead of leaving it open with nobody serving it.
               (write-message (make-message:r:error tag-box EPROTONOSUPPORT))
               (flush-output)
               (hang-up in out)])]
           [(struct message:t (tag-box))
            (write-message (make-message:r:error tag-box EPROTO))
            (hang-up in out)]
           [_
            (log-error EPROTO)
            (hang-up in out)]))
       (loop)))
    (current-custodian)))

;; Exported API.  Only the two entry points are provided, each guarded by a
;; contract; start-processor and make-send-request stay private to the module.
(provide/contract
 ;; start-client: required hostname string; optional remote port (1-65535),
 ;; local hostname (or #f) and local port (1-65535 or #f).  Returns three
 ;; values: a request-sending procedure, a one-argument procedure, and a
 ;; custodian.
 [start-client (->* (string?)
                    ((integer-in 1 65535) (or/c string? #f) (or/c (integer-in 1 65535) #f))
                    (values (-> message:t? message:r?)
                            (-> any/c any)
                            custodian?))]
 ;; start-server: required request handler and one-argument cleanup procedure;
 ;; optional port (0-65535), listen backlog, reuse flag and hostname (or #f).
 ;; Returns a custodian.
 [start-server (->* ((-> message:t? message:r?) (-> any/c any))
                    ((integer-in 0 65535) natural-number/c any/c (or/c string? #f))
                    custodian?)])