base.ss
#lang scheme/base
;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; THREAD.plt
;;
;; microthread-based toolkit.  Inspired by Erlang.
;;
;; Bonzai Lab, LLC.  All rights reserved.
;;
;; released under LGPL.
;;
;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; base.ss
;; implementation of the erlang selective receive pattern.
;; yc 8/31/2009
(require mzlib/trace
         scheme/contract
         scheme/match
         (for-syntax scheme/base)
         scheme/stxparam
         )
;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; implementation of the erlang selective receive pattern.

;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; non-match is a private thread-cell (which holds a different value for
;; each thread) implementing the holding queue for non-matched messages.
;; the queue is a plain list and starts out empty.
(define non-match
  (make-thread-cell null))

;; helper functions that make non-match easier to operate.
;; get the list of held (non-matched) messages for the current thread
(define ref
  (lambda ()
    (thread-cell-ref non-match)))

;; push onto the queue: v becomes the new head of the current thread's
;; list of held messages.
(define (push! v)
  (thread-cell-set! non-match
                    (cons v (thread-cell-ref non-match))))

;; put all of the held messages back onto the thread mailbox.
;; the holding queue is emptied first, then the messages that were in it
;; are handed to thread-rewind-receive.
(define (rewind!)
  (let ((held (ref)))
    (thread-cell-set! non-match null)
    (thread-rewind-receive held)))
;; TODO: once in a while we ought to *clear* the mailbox queues!!... hmm...

;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; `it` is a syntax parameter.  with this default transformer any use of
;; `it` is reported as a syntax error ("illegal use").
(define-syntax-parameter it
  (lambda (form)
    (raise-syntax-error #f "illegal use" form)))

;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; receive/match (or shall it be thread-receive/match ??)
;; this is the selective receive pattern based on erlang.
;;
;; supported shapes (time is in seconds; lst is an expression producing a
;; list of extra events that are synced on alongside the thread mailbox):
;;   (receive/match (pat exp ...) ...)
;;   (receive/match (pat exp ...) ... (after time exp ...))
;;   (receive/match (after time exp ...))
;;   (receive/match lst (sync (pat exp ...) ...))
;;   (receive/match lst (pat exp ...) ... (sync (pat exp ...) ...))
;;   (receive/match lst (pat exp ...) ... (after time exp ...)
;;                  (sync (pat exp ...) ...))
;;
;; a mailbox message that matches no pat is parked with push! and the
;; wait continues.  every way out of the wait -- a matching message, the
;; timeout, or one of the lst events firing -- first calls rewind!, so
;; parked messages always go back to the mailbox instead of staying
;; hidden in the holding queue where a later receive could not see them.
;;
;; (after 0 ...) is ready as soon as it is synced on; a message already
;; sitting in the mailbox is ready at the same moment, so either may win.
;; else clauses are not supported (the error is raised when the form runs).
;; time and lst are each evaluated exactly once per use of the form.
(define-syntax receive/match
  (syntax-rules (after else sync)
    ;; else is rejected, with or without a trailing after clause.
    ((~ (pat exp exp2 ...) ... (else e1 e2 ...) (after time a1 a2 ...))
     (raise-syntax-error 'receive/match "else keyword is not supported"))
    ((~ (pat exp exp2 ...) ... (else eexp eexp2 ...))
     (raise-syntax-error 'receive/match "else keyword is not supported"))
    ;; events only: the mailbox is not looked at.
    ((~ lst (sync (pat1 e1 e2 ...) ...))
     (let ((evt (apply sync lst)))
       (match evt (pat1 e1 e2 ...) ...)))
    ;; mailbox + timeout + extra events.
    ((~ lst (pat exp exp2 ...) ... (after time a1 a2 ...) (sync (pat1 e1 e2 ...) ...))
     (let ((alarm (alarm-evt (+ (current-inexact-milliseconds) (* 1000 time))))
           (thd-evt (thread-receive-evt))
           (evts lst))
       (let loop ()
         (let ((evt (apply sync alarm thd-evt evts)))
           (cond ((eq? evt thd-evt)
                  (let ((v (thread-receive)))
                    (match v
                      (pat (rewind!) exp exp2 ...)
                      ...
                      ;; no pat matched: park the message, keep waiting
                      (_ (push! v)
                         (loop)))))
                 ((eq? evt alarm)
                  ;; timed out: give parked messages back before leaving
                  (rewind!)
                  a1 a2 ...)
                 (else
                  ;; one of the extra events fired
                  (rewind!)
                  (match evt (pat1 e1 e2 ...) ...)))))))
    ;; timeout only: just wait, the mailbox is left alone.  this rule has
    ;; to come before the (pat ...) ... (after ...) rule, which would
    ;; otherwise match the same form with zero pats.
    ((~ (after time exp exp2 ...))
     (begin
       (sync/timeout time)
       exp exp2 ...))
    ;; mailbox + timeout.
    ((~ (pat exp exp2 ...) ... (after time aexp aexp2 ...))
     (let ((alarm (alarm-evt (+ (current-inexact-milliseconds) (* 1000 time))))
           (thd-evt (thread-receive-evt)))
       (let loop ()
         (cond ((eq? (sync alarm thd-evt) thd-evt)
                (let ((v (thread-receive)))
                  (match v
                    (pat (rewind!) exp exp2 ...)
                    ...
                    (_ (push! v)
                       (loop)))))
               (else
                ;; timed out: give parked messages back before leaving
                (rewind!)
                aexp aexp2 ...)))))
    ;; mailbox + extra events, no timeout.
    ((~ lst (pat exp exp2 ...) ... (sync (pat1 e1 e2 ...) ...))
     (let ((thd-evt (thread-receive-evt))
           (evts lst))
       (let loop ()
         (let ((evt (apply sync thd-evt evts)))
           (cond ((eq? evt thd-evt)
                  (let ((v (thread-receive)))
                    (match v
                      (pat (rewind!) exp exp2 ...)
                      ...
                      (_ (push! v)
                         (loop)))))
                 (else
                  ;; one of the extra events fired
                  (rewind!)
                  (match evt (pat1 e1 e2 ...) ...)))))))
    ;; mailbox only: block until some message matches.
    ((~ (pat exp exp2 ...) ...)
     (let loop ((v (thread-receive)))
       (match v
         (pat (rewind!) exp exp2 ...)
         ...
         (_ (push! v)
            (loop (thread-receive))))))
    ))


;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; utility functions...
;; self? - is thd the very thread that is running this call?
(define (self? thd)
  (eq? thd (current-thread)))

;; ==? - curried eq?: (==? x) is a one-argument predicate that is true
;; only for a value eq? to x.
(define ((==? x) y)
  (eq? x y))


;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; send-exn-to
;; send the exn to the originator thread.
;; the message put in thd's mailbox is the pair (exn . on-behalf), where
;; on-behalf defaults to the thread making this call.  thread-send is
;; given #f as its failure thunk.
(define (send-exn-to exn thd (on-behalf (current-thread)))
  (let ((notice (cons exn on-behalf)))
    (thread-send thd notice #f)))

;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; curry the send-exn-to: returns a one-argument procedure that passes
;; its argument on to thd via send-exn-to.
(define make-send-exn-to
  (lambda (thd)
    (lambda (raised)
      (send-exn-to raised thd))))

;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; thread/link
;; a one-way link.... this might not be made available in the future!!
;; runs proc in a new thread and returns that thread.  anything
;; satisfying exn? that is raised out of proc is handed to
;; (make-send-exn-to thd) instead of escaping; thd defaults to the
;; thread that called thread/link.
(define (thread/link proc (thd (current-thread)))
  (define (linked-body)
    (with-handlers ((exn? (make-send-exn-to thd)))
      (proc)))
  (thread linked-body))

;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; call & cast
;; these are specific patterns

;; cast* - send the args in a list format.
;; thd is first resumed (the calling thread is passed along to
;; thread-resume), then the list of all the remaining arguments is sent
;; to it as a single message.
(define (thread-cast* thd . args)
  (let ((caller (current-thread)))
    (thread-resume thd caller)
    (thread-send thd args)))

;; cast - call without waiting for the return of the value...
;; thd is resumed, then arg is sent to it wrapped in a one-element list.
(define (thread-cast thd arg)
  (let ((message (list arg)))
    (thread-resume thd (current-thread))
    (thread-send thd message)))
;; (trace thread-cast)

;; call - cast + wait for the return
;; sends (list <calling thread> arg) to thd, then uses receive/match to
;; wait for one of:
;;   (list thd v)  - the reply; v becomes the result of thread-call
;;   (exn . thd)   - an exn paired with thd; it is raised in the caller
;;   the after clause, given timeout (default +inf.0) - raises a
;;                   "time-out" error
;; only messages whose thread slot is eq? to thd are accepted.
(define (thread-call thd arg (timeout +inf.0))
  (define from-callee? (==? thd))
  (thread-resume thd (current-thread))
  (thread-send thd (list (current-thread) arg))
  (receive/match
   ((list (? from-callee?) result) result)
   ((cons (? exn? failure) (? from-callee?))
    (raise failure))
   (after timeout
          (error 'thread-call "time-out"))))

;; thread-reply ought to be used to return the value
;; back to a thread-call.  the message sent to thd is (list on-behalf v),
;; which is the reply shape thread-call matches on; on-behalf defaults to
;; the thread making this call.  thread-send is given #f as its failure
;; thunk.
(define (thread-reply thd v (on-behalf (current-thread)))
  (let ((reply (list on-behalf v)))
    (thread-send thd reply #f)))
;; (trace thread-reply)

;; contracted exports.
(provide/contract
 ;; exception forwarding / linking
 (send-exn-to (->* (exn? thread?) (thread?) any))
 (make-send-exn-to (-> thread? (-> exn? any)))
 (thread/link (->* (procedure?) (thread?) thread?))
 ;; small predicates
 (self? (-> any/c any))
 (==? (-> any/c any))
 ;; call / cast / reply.
 ;; the optional thread-call argument is the timeout; it ends up in the
 ;; time computation of an after clause, so it has to be a real number
 ;; (+inf.0, the default, is one) -- number? would also let non-real
 ;; complex numbers through.
 (thread-call (->* (thread? any/c)
                   (real?)
                   any))
 (thread-cast (-> thread? any/c any))
 (thread-cast* (->* (thread?)
                    ()
                    #:rest (listof any/c)
                    any))
 (thread-reply (->* (thread? any/c)
                    (thread?)
                    any))
 )

;; syntax export - receive/match is a macro and is provided as-is,
;; without a contract.
(provide receive/match)