rpc-server.scm
(module rpc-server mzscheme
        (require (planet "roos.scm" ("oesterholt" "roos.plt" 1 0)))
        (require "rpc-function-definer.scm")
        (require "rpc-io.scm")
        (require "rpc-marshal.scm")
        (require "rpc-log.scm")
        (provide rpc-server
                 (all-from (planet "roos.scm" ("oesterholt" "roos.plt" 1 0))))
        
        
        ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
        ;; Constants/Variables
        ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
        
        (define WELCOME (list "mzrpc" 0 1))
        
        ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
        ;; RPC Handler
        ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
        
        (def-class
         (this (client-handler rpc-server from-client to-client))
         (supers)
         (private
          (define _nr               #f)
          (define _connection-ended #f)
          (define _logged-in        #f)
          (define _last-chalenge    0)
          (define _autorized?       (->> rpc-server autorized?))
          (define _login-level      #f)
          )
         (public
          
          (define (notify-client argument)
            (let ((R (write* (list '%rpc-notify% (rpc-marshal argument)) to-client )))
              (not (io-error? R))))

          (define (rpc-chalenge)
            (set! _last-chalenge (+ (random 2000000000) 10000))
            _last-chalenge)

          (define (end-connection)
            (if _connection-ended
                'done
                (begin
                  (set! _connection-ended #t)
                  (close-input-port  from-client)
                  (close-output-port to-client)
                  'done)))
          
          (define (server)
            rpc-server)

          (define (handle-wrong-input)
            (if _connection-ended
                'connection-ended
                (let ((R (write* (list 'error (rpc-marshal "Wrong input")) to-client)))
                  (if (io-error? R)
                      (begin
                        (rpc-log "client: " _nr ":call-function: i/o error writing result to client.")
                        R)
                      'continue))))

          (define (login user pass)
            (let ((L (-> rpc-server login user pass _last-chalenge)))
              (set! _logged-in (if (eq? L #f) #f L))
              (rpc-log-info "client: " _nr ":login:user=" user ", result: " L)
              L))

          (define (my-rpc-call f-sym _args)
            (rpc-log-debug "my-rpc-call:" f-sym _args)
            (cond
             ((eq? f-sym 'rpc-connected)      (-> rpc-server connected))
             ((eq? f-sym 'rpc-shutdown)       (begin
                                                (rpc-log-info "client: " _nr " _logged-in: " _logged-in " rpc-shutdown")
                                                (if (eq? _logged-in 'admin)
                                                  (-> rpc-server shutdown)
                                                  'forbidden)))
             ((eq? f-sym 'rpc-force-shutdown) (begin
                                                (rpc-log-info "client: " _nr " _logged-in: " _logged-in " rpc-shutdown")
                                                (if (eq? _logged-in 'admin)
                                                  (-> rpc-server force-shutdown)
                                                  'forbidden)))
             ((eq? f-sym 'rpc-login)          (let ((result (apply login _args)))
                                                (set! _login-level result)
                                                result))
             ((eq? f-sym 'rpc-chalenge)       (rpc-chalenge))
             (else (rpc-fcall this f-sym _args))))
          
          (define (autorized? f-sym level)
            (or (eq? f-sym 'rpc-login) (eq? f-sym 'rpc-end)
                (eq? f-sym 'rpc-chalenge) (_autorized? f-sym level)))

          (define (call-function rpc)
            ;(rpc-log-info "call-function: " rpc)
            (let ((f-sym (if (and (list? rpc) (not (null? rpc))) (car rpc) '%rpc-wrong-input%))
                  (args  (if (and (list? rpc) (not (null? rpc))) (cdr rpc) (list))))
              (let ((_error #f)
                    (_result #f))
                (let ((_args
                       (with-handlers ((exn:fail? (lambda (exn)
                                                    (set! _error (format "~a" (exn-message exn)))
                                                    #f)))
                                      (map rpc-de-marshal args))))
                  (if (eq? _args #f)
                      (begin
                        (rpc-log-warn "client: " _nr ":call: " rpc " has marshaling problem: " _error)
                        (set! _result (list 'error (rpc-marshal "Marshaling problem."))))
                      (begin
                        (if (or (eq? f-sym 'rpc-login) (eq? f-sym 'rpc-end)
                                (eq? f-sym 'rpc-chalenge) (not (eq? _logged-in #f)) )
                            (if (eq? f-sym '%rpc-wrong-input%)
                                (begin
                                  (rpc-log-warn "client: " _nr ": this is no function call: " rpc)
                                  (set! _result (list 'error (rpc-marshal "This is no function call."))))
                                (if (autorized? f-sym _login-level)
                                    (let ((C (rpc-check f-sym _args)))
                                      (let ((result (if (eq? C #t)
                                                        (list 'rpc   (rpc-marshal (my-rpc-call f-sym _args)))
                                                        (begin
                                                          (rpc-log-warn (format "~s" rpc) " : error : " C)
                                                          (list 'error (rpc-marshal C))))))
                                        (set! _result result)))
                                    (begin
                                      (rpc-log-warn rpc " : client not autorized to call this function.")
                                      (set! _result (list 'error (rpc-marshal "You are not autorized to call this function."))))
                                    ))
                            (if (eq? f-sym 'rpc-login)
                                (let ((C (rpc-check f-sym _args)))
                                  (let ((result (if (eq? C #t)
                                                    (list 'rpc   (rpc-marshal (my-rpc-call f-sym _args)))
                                                    (begin
                                                      (rpc-log-warn rpc " : error : " C)
                                                      (list 'error (rpc-marshal C))))))
                                    (set! _result result)))
                                (begin
                                  (rpc-log-warn "client: " _nr " not logged in:trying to execute function: " rpc)
                                  (set! _result (list 'error (rpc-marshal "Login first")))))))))
                (let ((R (write* _result to-client)))
                  (if (io-error? R)
                      (begin
                        (rpc-log "client: " _nr ":call-function: i/o error writing result to client.")
                        R)
                      (if (eq? f-sym 'rpc-end)
                          'stop
                          'continue))))))

          (define (handle-rpc)
            (let ((rpc (read* from-client)))
              ;(rpc-log-info "handle-rpc:" rpc)
              (if (eof-object? rpc)
                  (end-connection)
                  (if (wrong-input? rpc)
                      (if (eq? (handle-wrong-input) 'continue)
                          (handle-rpc)
                          (end-connection))
                      (if (eq? (call-function rpc) 'continue)
                          (handle-rpc)
                          (end-connection))))))

          (define (number)
            _nr)
          
          (define (id)
            _nr)

          (define (run)
            (set! _nr (-> rpc-server start-client this))
            (write* WELCOME to-client)
            (newline* to-client)
            (-> rpc-server client-handler-started this)
            (handle-rpc)
            (-> rpc-server client-handler-ended this)
            (-> rpc-server end-client this)
            
            )
          
          )
         (constructor)
         )

        ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
        ;; Exported functions/macros
        ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

        ;;;; rpc-server
        (def-class
         (roos-doc (sp "This class provides a rpc server. It is initialized with a port number <port> "
                       "and a login provider <_login-provider>. The login provider is a function of 3 arguments: "
                       (s% "(lambda (user pass chalenge) ...)") ". The login provider must determine if the given "
                       "user <user> can login with the given password <pass>, given the given <chalenge>. It must "
                       "return #f, if the user cannot login and a symbol indicating the autorisation context otherwise.")
                   (s== "Example")
                   (sverb "(require (planet \"roos.scm\" (\"oesterholt\" \"roos.plt\" 1 0)))"
                          "(require \"mzrpc.scm\")"
                          ""
                          ";; define an function that can becalled over rpc"
                          "(rpc-define (plus (a number?) (b number?)) (+ a b))"
                          ""
                          ";; define an overridden class for rpc-server."
                          "(def-class"
                          "  (this (my-rpc-server . args))"
                          "  (supers (apply rpc-server args))"
                          "  (private"
                          "    (define _clients (make-hash-table))"
                          "    (define (notifier c i)"
                          "      (sleep 1)"
                          "      (-> c notify-client i)"
                          "      (notifier c (+ i 1)))"
                          "   )"
                          "  (public"
                          "    (define (client-handler-started client)"
                          "      (display \"client-handler-started called\\n\")"
                          "      (hash-table-put! _clients client (thread (lambda ()"
                          "                                                  (notifier client 0)))))"
                          ""
                          "     (define (client-handler-ended client)"
                          "       (display \"client-handler-ended called\\n\")"
                          "       (let ((t (hash-table-get _clients client)))"
                          "         (kill-thread t)))"
                          "  )"
                          "  (constructor)"
                          " )"
                          ""
                          ";; Define and call the server."
                          "(define S (my-rpc-server 4002 (lambda (user pass chalenge) 'admin)))"
                          "(-> S add '(admin) plus)"
                          "(-> S run)"
                          )
          )
         (this (rpc-server port _login-provider))
         (supers)
         (private
          (define _rpc-functions (make-hash-table))
          (define _stop-server   #f)
          (define _clients       (make-hash-table))
          (define _connected     0)
          (define _nclients      0)
          (define _listener      #f)

          (define (cleanup)
            (hash-table-for-each _clients
                                 (lambda (K C)
                                   (-> C end-connection)))
            (tcp-close _listener)
            #t)

          (define (connect-to-server)
            (call-with-values
             (lambda () (tcp-connect "localhost" port))
             (lambda (from-server to-server)
               (close-input-port from-server)
               (close-output-port to-server))))

          )
         (public
          (define (start-client C)
            (hash-table-put! _clients C C)
            (set! _connected (+ 1 _connected))
            (set! _nclients  (+ 1 _nclients))
            (rpc-log-info "Client rpc handler " _nclients " started, currently handling " _connected " clients.")
            _nclients)

          (define (end-client C)
            (hash-table-remove! _clients C)
            (set! _connected (- _connected 1))
            (rpc-log-info "Client rpc handler, ended client " (-> C number) "  currently handling "  _connected " clients.")
            )

          (define (connected)
            _connected)

          (define (shutdown)
            (if (= _connected 1)
                (begin
                  (set! _stop-server #t)
                  (connect-to-server)
                  'ok)
                'clients-connected))

          (define (force-shutdown)
            (set! _stop-server #t)
            (connect-to-server)
            'ok)
          
          (define (autorized? fsym type)
            (let ((C (hash-table-get _rpc-functions fsym (lambda () #f))))
              (if (eq? C #f)
                  #f
                  (let ((levels (cadr C)))
                    (if (eq? (memq type levels) #f)
                        #f
                        #t)))))

          (define (login user pass chalenge)
            (_login-provider user pass chalenge))

          ((define (sp "Adds the given rpc-functions, which have been defined using " (s% "rpc-define") 
                       " to this server."))
           (add levels . rpc-functions)
             (for-each (lambda (rpc-f)
                         (let ((f-symbol (rpc-get-sym rpc-f)))
                           (if (eq? f-symbol #f)
                               (error (format "Function ~s is not defined as an RPC function." rpc-f))
                               (hash-table-put! _rpc-functions f-symbol (list rpc-f levels)))))
                       rpc-functions))

          (define (do-run)
            (if _stop-server
                (cleanup)
                (begin
                  (call-with-values
                   (lambda () (tcp-accept _listener))
                   (lambda (from-client to-client)
                     (if (not _stop-server)
                         (thread (lambda ()
                                   (let ((obj (client-handler this from-client to-client)))
                                     (-> obj run)))))))
                  (do-run))))

          ((define (sp "Run this server. It will start handling clients."))
           (run)
            (set! _listener (tcp-listen port 1000 #t))
            (do-run))
          
          ((define (sp "This function can be overridden. It is called when a client connects with "
                       "the client object that handles the client."))
           (client-handler-started client)
            #t)
          
          ((define (sp "This function can be overridden. It is called when a client disconnects with "
                       "the client object that handles the client."))
           (client-handler-ended client)
            #t)
          )
         (constructor

          )
         )
        
        );;;; module-end