gash/sh/pipe.scm

155 lines
4.6 KiB
Scheme

(define-module (sh pipe)
:use-module (ice-9 popen)
:use-module (ice-9 pretty-print)
:use-module (srfi srfi-1)
:use-module (srfi srfi-8)
:use-module (srfi srfi-9)
:use-module (srfi srfi-26)
:export (pipeline job-control-init))
;; Display every argument in O on the current output port, left to right,
;; then terminate the line.
(define (stdout . o)
  ;; for-each, not map: this is done purely for the side effect, and map
  ;; neither guarantees application order nor avoids building a result list.
  (for-each display o)
  (newline))
;; Display every argument in O on the current error port, left to right,
;; then terminate the line on that same port.
(define (stderr . o)
  (let ((port (current-error-port)))
    (for-each (lambda (x) (display x port)) o)
    ;; The newline must go to the error port too; a bare (newline) would
    ;; write to the current output port and split the message across streams.
    (newline port)))
;; One child process of a job.
;;   pid     - process id as returned by primitive-fork in the parent
;;   command - the command that was spawned (a list of strings, see exec*)
;;   status  - #f when the record is created; later replaced by the wait
;;             status reported by waitpid (see job-update)
(define-record-type <process>
(make-process pid command status)
process?
(pid process-pid)
(command process-command)
(status process-status set-process-status!))
;; A pipeline of one or more processes managed as a unit.
;;   id        - number assigned at creation (pipeline uses the length of
;;               job-table at that moment)
;;   pgid      - process group id shared by the job's processes; #f until
;;               the first process has been added
;;   processes - list of <process>; job-add-process conses new entries onto
;;               the front, so the most recently spawned process comes first
(define-record-type <job>
(make-job id pgid processes)
job?
(id job-id)
(pgid job-pgid set-job-pgid!)
(processes job-processes set-job-processes!))
;; All jobs created so far; pipeline conses each new job onto the front.
(define job-table '()) ;; list of <job>
;; Map a process status to a symbolic state.
;;
;; STATUS is either #f (no wait status recorded yet, which is how process
;; records are created) or an integer wait status as returned by waitpid.
;; Returns one of 'running, 'completed, 'terminated or 'stopped.
(define (status->state status)
  (cond
   ;; The status:* accessors only accept integers, so a process that has
   ;; not been waited for yet must be handled before calling them.
   ((not status) 'running)
   ((status:exit-val status) 'completed)
   ((status:term-sig status) 'terminated)
   ((status:stop-sig status) 'stopped)
   (else 'running)))
;; Return the status stored in the first entry of JOB's process list.
;; Processes are consed onto the front as they are spawned, so this is the
;; most recently added process (the sink of a pipeline).
(define (job-status job)
  (let* ((procs (job-processes job))
         (newest (car procs)))
    (process-status newest)))
;; Record STATUS for the process with id PID in JOB.
;;
;; A PID of 0 means "nothing was reaped" and is ignored.  A PID that does
;; not belong to JOB is ignored as well: the caller waits with WAIT_ANY, so
;; it may be handed a child that is part of some other job.
(define (job-update job pid status)
  (unless (= 0 pid)
    (let ((proc (find (lambda (p) (eqv? pid (process-pid p)))
                      (job-processes job))))
      ;; find yields #f for an unknown pid; calling the record setter on #f
      ;; would raise a wrong-type error.
      (when proc
        (set-process-status! proc status)))))
;; Return the first process of JOB that has no status recorded yet, or #f
;; when every process has one.
(define (job-running? job)
  (find (lambda (proc)
          (not (process-status proc)))
        (job-processes job)))
;; Put process PID into JOB's process group and return the group id.
;;
;; When JOB has no process group yet, PID becomes the group leader (its own
;; pid is used as the group id).
(define (add-to-process-group job pid)
  (let ((pgid (or (job-pgid job) pid)))
    ;; Both the parent and the child call this for the same process so that
    ;; the group is set up no matter which of them runs first.  If the child
    ;; has already exec'd by the time the parent gets here, setpgid fails
    ;; with EACCES; the child has set the group itself in that case, so the
    ;; error is harmless.  Anything else is re-raised.
    (catch 'system-error
      (lambda ()
        (setpgid pid pgid))
      (lambda args
        (unless (eqv? (system-error-errno args) EACCES)
          (apply throw args))))
    pgid))
;; Parent-side bookkeeping after a fork: register child PID, which runs
;; COMMAND, as a new process of JOB.
;;
;; When the error port is a terminal, the child is also placed in JOB's
;; process group and that group is made the terminal's foreground group.
(define (job-add-process job pid command)
  (let ((tty (current-error-port)))
    ;; Job control only makes sense with a controlling terminal; tcsetpgrp
    ;; raises a system-error (ENOTTY) when the port is not a tty, e.g. when
    ;; stderr is redirected to a file.
    (when (isatty? tty)
      (let ((pgid (add-to-process-group job pid)))
        (set-job-pgid! job pgid)
        (tcsetpgrp tty pgid))))
  ;; The new process starts without a status (#f) until waitpid reports one.
  (set-job-processes! job
                      (cons (make-process pid command #f)
                            (job-processes job))))
;; Prepare the shell for job control.  Does nothing unless the current
;; error port is a terminal.
(define (job-control-init)
  (define tty (current-error-port))
  (define shell-pgid (getpgrp))
  (define shell-pid (getpid))
  (when (isatty? tty)
    ;; Not in the foreground yet: keep sending SIGTTIN to our own process
    ;; group until the terminal's foreground group is ours.
    (let wait-for-foreground ()
      (unless (eqv? (tcgetpgrp tty) shell-pgid)
        (kill (- shell-pgid) SIGTTIN)
        (wait-for-foreground)))
    ;; Ignore the interactive and job-control signals in the shell itself.
    (for-each (lambda (sig) (sigaction sig SIG_IGN))
              (list SIGINT SIGQUIT SIGTSTP SIGTTIN SIGTTOU))
    (sigaction SIGCHLD SIG_DFL)
    ;; Create a new process group for ourselves and take over the terminal.
    (setpgid shell-pid shell-pid)
    (tcsetpgrp tty shell-pid)))
;; Like pipe, but return the two ends as multiple values (read end first,
;; write end second) instead of as a pair.
(define (pipe*)
  (let* ((ends (pipe))
         (read-end (car ends))
         (write-end (cdr ends)))
    (values read-end write-end)))
;; lhs rhs
;; [source] w -> r [filter] w -> r [sink]
;; Replace the current process image with COMMAND, a non-empty list of
;; strings: the program name followed by its arguments.  The program is
;; looked up via execlp.
;;
;; This never returns.  It is meant to be called in a freshly forked child;
;; if the exec fails, a message is written to the error port and the
;; process exits immediately with status 127 (program not found) or 126
;; (any other failure).
(define (exec* command) ;; list of strings
  (catch #t
    (lambda ()
      (apply execlp (car command) command))
    (lambda (key . args)
      ;; Without this handler the exception would propagate out of the
      ;; forked child, which would then carry on running as a second copy
      ;; of the shell.
      (let* ((port (current-error-port))
             (errno (system-error-errno (cons key args))))
        (display "exec failed: " port)
        (write command port)
        (display ": " port)
        (display (if errno (strerror errno) key) port)
        (newline port)
        (force-output port)
        ;; _exit rather than exit: do not run the parent's exit handlers
        ;; or flush its inherited buffers from the child.
        (primitive-_exit (if (eqv? errno ENOENT) 127 126))))))
;; Child-side setup after a fork, run before the command is exec'd:
;; join JOB's process group and take the terminal (only when the error port
;; is a terminal), restore default signal handling, and re-create ports for
;; file descriptors 0, 1 and 2.
(define (setup-process job)
  (let ((tty (current-error-port)))
    ;; tcsetpgrp raises ENOTTY on a non-terminal, so skip job control when
    ;; not attached to one.  This must happen before the signal
    ;; dispositions are reset below, while SIGTTOU is still as inherited
    ;; from the shell.
    (when (isatty? tty)
      (tcsetpgrp tty (add-to-process-group job (getpid)))))
  ;; Undo whatever dispositions the shell installed for itself.
  (for-each (lambda (sig) (sigaction sig SIG_DFL))
            (list SIGINT SIGQUIT SIGTSTP SIGTTIN SIGTTOU SIGCHLD))
  (fdes->inport 0) (map fdes->outport '(1 2))) ;; reset stdin/stdout/stderr
;; Fork the first stage of a pipeline.
;;
;; The child runs COMMAND with its standard output connected to a new pipe.
;; The parent registers the child with JOB and returns the pipe's read end,
;; to be used as the input of the next stage.
(define (spawn-source job interactive? command)
  (call-with-values pipe*
    (lambda (read-end write-end)
      (let ((child (primitive-fork)))
        (if (zero? child)
            ;; Child: only the write end is needed, as fd 1.
            (begin
              (close read-end)
              (setup-process job)
              (move->fdes write-end 1)
              (exec* command))
            ;; Parent: keep the read end for the next stage.
            (begin
              (job-add-process job child command)
              (close write-end)
              read-end))))))
;; Fork a middle stage of a pipeline.
;;
;; The child runs COMMAND with standard input taken from SRC (the read end
;; returned by the previous stage) and standard output connected to a new
;; pipe.  The parent registers the child with JOB, closes its own copies of
;; the descriptors it no longer needs, and returns the new pipe's read end
;; for the next stage.
(define (spawn-filter job interactive? src command)
  (receive (r w) (pipe*)
    (let ((pid (primitive-fork)))
      (cond ((= 0 pid)
             (setup-process job)
             (move->fdes src 0)
             (close r)
             (move->fdes w 1)
             (exec* command))
            (#t
             (job-add-process job pid command)
             (close w)
             ;; The child owns SRC now.  If the parent kept it open, the
             ;; descriptor would leak and the upstream pipe would still have
             ;; a reader after this filter exits, so an upstream writer
             ;; would block on a full pipe instead of getting SIGPIPE.
             (close src)
             r)))))
;; Fork the last stage of a pipeline (or the only stage of a single
;; command).
;;
;; SRC is the read end produced by the previous stage, or #f when there is
;; none; when present it becomes the child's standard input.  The parent
;; registers the child with JOB and closes its copy of SRC.
(define (spawn-sink job interactive? src command)
  (let ((child (primitive-fork)))
    (if (zero? child)
        (begin
          (setup-process job)
          (when src
            (move->fdes src 0))
          (exec* command))
        (begin
          (job-add-process job child command)
          (and src (close src))))))
;; Run COMMANDS as a pipeline and wait for it.
;;
;; Each element of COMMANDS is a list of strings (program name followed by
;; its arguments); at least one command is required.  The standard output
;; of every command is connected to the standard input of the next.  A new
;; job is created and pushed onto job-table, then this procedure blocks
;; until every process of the job has reported a status (exited, killed, or
;; stopped, since WUNTRACED is used).  Returns the wait status of the last
;; command.
(define (pipeline . commands)
  (let ((interactive? (isatty? (current-error-port)))
        (job (make-job (length job-table) #f '())))
    (set! job-table (cons job job-table))
    (if (> (length commands) 1)
        ;; source | filter ... | sink: thread each stage's read end into
        ;; the next stage.
        (let loop ((src (spawn-source job interactive? (car commands)))
                   (commands (cdr commands)))
          (if (null? (cdr commands))
              (spawn-sink job interactive? src (car commands))
              (loop (spawn-filter job interactive? src (car commands))
                    (cdr commands))))
        (spawn-sink job interactive? #f (car commands)))
    (let loop ()
      (let* ((pid-status (waitpid WAIT_ANY WUNTRACED))
             (pid (car pid-status))
             (status (cdr pid-status)))
        (job-update job pid status)
        (when (job-running? job)
          (loop))))
    ;; Take the terminal back for the shell.  Only valid when the error
    ;; port is a terminal; tcsetpgrp raises ENOTTY otherwise.
    (when interactive?
      (tcsetpgrp (current-error-port) (getpid)))
    ;;(pretty-print job-table)
    (job-status job)))
;;(pipeline (list "ls" "/"))
;;(pipeline (list "ls" "/") (list "grep" "o") (list "tr" "o" "e"))