filter.ss
#lang scheme/base
;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; PORT.plt - port utilities 
;;
;;
;; Bonzai Lab, LLC.  All rights reserved.
;;
;; Licensed under LGPL.
;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; filter.ss - utilities for creating and building port filters.
;; yc 9/21/2009 - first version
;; yc 10/3/2009 - fix the flush-output bug (returns 0 instead of void now)
;; yc 1/26/2010 - add a #:close? arg to determine whether to close the inner ports
;; (the default is to close... this is used for situation where you do not read
;; all of the values from the inner port)
;; yc 2/28/2010 - added channel for the filter threads to pass the exceptions to
;; the port, so the port can close down properly when there are errors...



;; A filter abstracts the pre-processing of port data.
;; An example is a gzipped file: instead of reading the raw, gzipped file, one can wrap
;; the file with a filter and then read it as if it had never been gzipped.
;;
;; there are 2 types of filters:
;; input-filter: it takes an input-port and a filter (which is a function that takes
;; both input-port & output-port) and produces an input-port
;; output-filter: takes an output-port and a filter to produce an output-port
;;
;; if one wants to chain multiple filters together, remember for input-filter, the
;; inner filter gets to process the data first, and for output-filter it is reversed
;; (so the outer filter gets to process the data first).
;;
;; Once in a while you might reuse code that already provides custom pipes (and handles
;; the filtering at the custom pipe level).  In those cases, pass in copy-port
;; for the filter and write a custom make-pipe-helper function, which takes in a port and
;; returns an input-port and an output-port.  The best example is the crypto-file...
(require "depend.ss" "port.ss")

;; make-pipe-helper : port -> (values input-port output-port)
;; Default pipe constructor used when the caller supplies no make-pipe
;; procedure.  Creates a pipe limited to 2^15 (32768) bytes whose input and
;; output ends are both named after PORT.
(define (make-pipe-helper port)
  (let ((name (object-name port)))
    (make-pipe 32768 name name)))

;; make-threaded-read : input-port output-port channel -> (bytes -> any)
;; Builds the read procedure for a filtered input port.
;;   in  - the port the reads are served from (wrapped with make-default-read)
;;   out - accepted for symmetry with the other helpers; not used here
;;   ch  - channel on which the filter thread reports a failure
;; Each call waits until either IN is ready or a value arrives on CH.  When IN
;; wins, the read is delegated to the default reader; otherwise the value
;; received from CH is raised.
(define (make-threaded-read in out ch)
  (let ((do-read (make-default-read in)))
    (lambda (buf)
      (let ((ready (sync in ch)))
        (cond ((eq? ready in) (do-read buf))
              (else (raise ready)))))))

;; make-threaded-peek : input-port output-port channel
;;                      -> (bytes skip progress-evt -> any)
;; Builds the peek procedure for a filtered input port.
;;   in  - the port the peeks are served from (wrapped with make-default-peek)
;;   out - accepted for symmetry with the other helpers; not used here
;;   ch  - channel on which the filter thread reports a failure
;; Each call waits until either IN is ready or a value arrives on CH.  When IN
;; wins, the peek is delegated to the default peeker; otherwise the value
;; received from CH is raised.
(define (make-threaded-peek in out ch)
  (let ((do-peek (make-default-peek in)))
    (lambda (buf skip progress-evt)
      (let ((ready (sync in ch)))
        (cond ((eq? ready in) (do-peek buf skip progress-evt))
              (else (raise ready)))))))

;; make-input-filter-port/1 : input-port (or #f filter) (or #f make-pipe) boolean
;;                            -> input-port
;; Wraps INPUT-PORT with a single filter and returns the filtered input port.
;;   input-port - the port the filter reads from
;;   filter     - procedure of (input-port output-port); copy-port when #f
;;   make-pipe  - procedure of (port) returning an input-port and an
;;                output-port; make-pipe-helper when #f
;;   close?     - when true, INPUT-PORT is handed to call-with-input-port for
;;                the filter run and is closed again when the returned port
;;                is closed; when #f this function never closes INPUT-PORT
;; A background thread runs the filter from INPUT-PORT into the write end of
;; the pipe and closes that end when the filter returns.  The returned port
;; reads and peeks from the pipe's read end.  An exn raised inside the filter
;; is sent over a channel and re-raised by the returned port's read/peek
;; procedures.
(define (make-input-filter-port/1 input-port filter make-pipe close?)
  (define (filter-helper)
    (if (not filter) copy-port filter))
  (define (pipe-helper)
    (if (not make-pipe) make-pipe-helper make-pipe))
  (let-values (((in out)
                ((pipe-helper) input-port)))
    (let* ((ch (make-channel))
           ;; run the filter from SOURCE into the pipe, then close the write
           ;; end so the reader sees eof.
           (run-filter (lambda (source)
                         ((filter-helper) source out)
                         (close-output-port out)))
           (thd (thread
                 (lambda ()
                   (with-handlers ((exn? (lambda (e)
                                           ;; Hand the exception over BEFORE closing OUT.
                                           ;; Once OUT is closed the read end is ready
                                           ;; (eof) at the same time as the channel, and
                                           ;; a reader in (sync in ch) could pick the eof
                                           ;; and never learn that the filter failed.
                                           (channel-put ch e)
                                           (close-output-port out))))
                     (if close?
                         (call-with-input-port input-port run-filter)
                         (run-filter input-port)))))))
      (make-input-port (object-name in)
                       (make-threaded-read in out ch)
                       (make-threaded-peek in out ch)
                       (lambda ()
                         ;; the filter thread may still be running, or be
                         ;; blocked waiting to deliver an exception.
                         (kill-thread thd)
                         (close-output-port out)
                         (when close?
                           (close-input-port input-port))
                         (close-input-port in))))))

;; make-threaded-write : output-port input-port channel
;;                       -> (bytes start end block? break? -> any)
;; Builds the write procedure for a filtered output port.
;;   out - the port the writes are sent to (wrapped with make-default-write)
;;   in  - the port closed when a failure is reported
;;   ch  - channel on which the filter thread reports a failure
;; Each call waits until either OUT is ready or a value arrives on CH.  When
;; OUT wins, the write is delegated to the default writer; otherwise IN is
;; closed and the value received from CH is raised.
(define (make-threaded-write out in ch)
  (let ((do-write (make-default-write out)))
    (lambda (bytes-out start end block? break?)
      (let ((ready (sync out ch)))
        (cond ((eq? ready out)
               (do-write bytes-out start end block? break?))
              (else
               (close-input-port in)
               (raise ready)))))))

;; make-output-filter-port/1 : output-port (or #f filter) (or #f make-pipe)
;;                             -> output-port
;; Wraps OUTPUT-PORT with a single filter and returns the filtered output port.
;;   output-port - the port the filter writes to; it is handed to
;;                 call-with-output-port for the filter run
;;   filter      - procedure of (input-port output-port); copy-port when #f
;;   make-pipe   - procedure of (port) returning an input-port and an
;;                 output-port; make-pipe-helper when #f
;; Data written to the returned port goes into the write end of the pipe; a
;; background thread runs the filter from the pipe's read end into
;; OUTPUT-PORT.  An exn raised inside the filter is sent over a channel and
;; re-raised by the next write, or by close if no write picks it up first.
(define (make-output-filter-port/1 output-port filter make-pipe)
  (define (filter-helper)
    (if (not filter) copy-port filter))
  (define (pipe-helper)
    (if (not make-pipe) make-pipe-helper make-pipe))
  (let-values (((in out)
                ((pipe-helper) output-port)))
    (let* ((ch (make-channel))
           (thd (thread (lambda ()
                          (with-handlers ((exn? (lambda (e)
                                                  (close-input-port in)
                                                  ;; blocks until a writer or the
                                                  ;; close procedure receives E.
                                                  (channel-put ch e))))
                            (call-with-output-port output-port
                                                   (lambda (sink)
                                                     ((filter-helper) in sink)
                                                     (close-input-port in)))))))
           (thd-dead? (thread-dead-evt thd)))
      (make-output-port (object-name in)
                        always-evt
                        (make-threaded-write out in ch)
                        (lambda ()
                          ;; closing OUT lets the filter see eof and finish.
                          (close-output-port out)
                          ;; Wait for the filter thread to finish, but also
                          ;; listen on CH: a filter that fails (e.g. while
                          ;; finalizing after eof) sits in channel-put and
                          ;; never dies, so waiting on THD-DEAD? alone would
                          ;; hang forever.  A delivered exception is re-raised.
                          (let ((v (sync thd-dead? ch)))
                            (unless (eq? v thd-dead?)
                              (raise v))))))))

;; make-input-filter-port : input-port filter make-pipe [#:close? boolean]
;;                          filter make-pipe ... -> input-port
;; Stacks one or more filter/make-pipe pairs on top of IN.  The leading
;; FILTER/MAKE-PIPE pair becomes the outermost port; each following pair in
;; REST wraps closer to IN, so the last pair given reads from IN directly and
;; therefore processes the data first.  CLOSE? (default #t) is passed to every
;; layer.
;; Raises an error when REST does not consist of complete filter/make-pipe
;; pairs; that check runs before any layer is constructed.
(define (make-input-filter-port in filter make-pipe #:close? (close? #t) . rest)
  (let wrap ((stage filter) (stage-pipe make-pipe) (pending rest))
    (make-input-filter-port/1
     ;; the port this layer reads from: IN itself for the innermost layer,
     ;; otherwise the port built from the remaining pairs.
     (cond ((null? pending) in)
           ((null? (cdr pending))
            (error 'make-input-filter-port "uneven filter/pipe pairs: ~a" pending))
           (else
            (wrap (car pending) (cadr pending) (cddr pending))))
     stage stage-pipe close?)))

;; make-output-filter-port : output-port filter make-pipe
;;                           filter make-pipe ... -> output-port
;; Stacks one or more filter/make-pipe pairs on top of OUT.  The leading
;; FILTER/MAKE-PIPE pair becomes the outermost port, so it sees written data
;; first; each following pair in REST wraps closer to OUT, and the last pair
;; given writes to OUT directly.
;; Raises an error when REST does not consist of complete filter/make-pipe
;; pairs; that check runs before any layer is constructed.
(define (make-output-filter-port out filter make-pipe . rest)
  (let wrap ((stage filter) (stage-pipe make-pipe) (pending rest))
    (make-output-filter-port/1
     ;; the port this layer writes to: OUT itself for the innermost layer,
     ;; otherwise the port built from the remaining pairs.
     (cond ((null? pending) out)
           ((null? (cdr pending))
            (error 'make-output-filter-port "uneven filter/pipe pairs: ~a" pending))
           (else
            (wrap (car pending) (cadr pending) (cddr pending))))
     stage stage-pipe)))

;; make-pipe/c - contract for a pipe constructor: a procedure that takes a port
;; and returns two values, an input-port and an output-port.
(define make-pipe/c 
  (-> port? (values input-port? output-port?)))

;; port-filter/c - contract for a filter: a procedure that takes an input-port
;; and an output-port; its result is unconstrained.
(define port-filter/c
  (-> input-port? output-port? any))

;; Exported constructors and their contracts.  Each takes the port to wrap, a
;; filter (or #f) and a pipe constructor (or #f), followed by any number of
;; further arguments, each of which must be #f or a procedure.
(provide/contract 
 ;; additionally accepts the optional keyword #:close? (a boolean)
 (make-input-filter-port (->* (input-port? (or/c #f port-filter/c) (or/c #f make-pipe/c))
                              (#:close? boolean?)
                              #:rest (listof (or/c #f procedure?))
                              input-port?))
 ;; no optional arguments
 (make-output-filter-port (->* (output-port? (or/c #f port-filter/c) (or/c #f make-pipe/c))
                               ()
                               #:rest (listof (or/c #f procedure?))
                               output-port?))
 )

;; The two contracts are exported as well, without contracts of their own.
(provide make-pipe/c port-filter/c)