updates from verum/gaiag.

This commit is contained in:
Jan Nieuwenhuizen 2018-06-26 20:34:07 +02:00
parent ca01e904d3
commit df4f7971aa
5 changed files with 222 additions and 208 deletions

View File

@ -1,24 +1,24 @@
(define-module (gash gash) (define-module (gash gash)
:use-module (srfi srfi-1) #:use-module (srfi srfi-1)
:use-module (srfi srfi-26) #:use-module (srfi srfi-26)
:use-module (ice-9 ftw) #:use-module (ice-9 ftw)
:use-module (ice-9 getopt-long) #:use-module (ice-9 getopt-long)
:use-module (ice-9 local-eval) #:use-module (ice-9 local-eval)
:use-module (ice-9 match) #:use-module (ice-9 match)
:use-module (ice-9 rdelim) #:use-module (ice-9 rdelim)
:use-module (ice-9 readline) #:use-module (ice-9 readline)
:use-module (ice-9 buffered-input) #:use-module (ice-9 buffered-input)
:use-module (ice-9 regex) #:use-module (ice-9 regex)
:use-module (gash job) #:use-module (gash job)
:use-module (gash pipe) #:use-module (gash pipe)
:use-module (gash peg) #:use-module (gash peg)
:use-module (gash io) #:use-module (gash io)
:use-module (gash util) #:use-module (gash util)
:export (main)) #:export (main))
(define (remove-shell-comments s) (define (remove-shell-comments s)
(string-join (map (string-join (map

View File

@ -1,6 +1,6 @@
(define-module (gash io) (define-module (gash io)
:export (stdout stderr)) #:export (stdout stderr))
(define (output port o) (define (output port o)
(map (lambda (o) (display o port)) o) (map (lambda (o) (display o port)) o)

View File

@ -1,21 +1,22 @@
(define-module (gash job) (define-module (gash job)
:use-module (srfi srfi-1) #:use-module (srfi srfi-1)
:use-module (srfi srfi-8) #:use-module (srfi srfi-8)
:use-module (srfi srfi-9) #:use-module (srfi srfi-9)
:use-module (srfi srfi-26) #:use-module (srfi srfi-26)
:use-module (gash io) #:use-module (gash io)
:use-module (gash util) #:use-module (gash util)
:export (job-control-init :export (bg
jobs report-jobs fg
new-job job-add-process
job-add-process job-control-init
add-to-process-group job-debug-id
wait job-setup-process
fg jobs
bg new-job
setup-process)) report-jobs
wait))
(define-record-type <process> (define-record-type <process>
(make-process pid command status) (make-process pid command status)
@ -25,14 +26,21 @@
(status process-status set-process-status!)) (status process-status set-process-status!))
(define-record-type <job> (define-record-type <job>
(make-job id pgid processes) (make-job id pgid processes debug-id)
job? job?
(id job-id) (id job-id)
(pgid job-pgid set-job-pgid!) (pgid job-pgid set-job-pgid!)
(processes job-processes set-job-processes!)) (processes job-processes set-job-processes!)
(debug-id job-debug-id))
(define debug-id
(let ((id -1))
(lambda ()
(set! id (1+ id))
(number->string id))))
(define (new-job) (define (new-job)
(let ((job (make-job (+ 1 (length job-table)) #f '()))) (let ((job (make-job (+ 1 (length job-table)) #f '() (debug-id))))
(set! job-table (cons job job-table)) (set! job-table (cons job job-table))
job)) job))
@ -82,27 +90,37 @@
(every (cut member <> '(Done Terminated)) state))) (every (cut member <> '(Done Terminated)) state)))
(define (add-to-process-group job pid) (define (add-to-process-group job pid)
(let* ((pgid (job-pgid job)) (let* ((interactive? (isatty? (current-error-port)))
(pgid (or pgid pid))) (pgid (if interactive?
(setpgid pid pgid) (or (job-pgid job) pid)
(getpgrp))))
(set-job-pgid! job pgid)
(when interactive? (setpgid pid pgid))
pgid)) pgid))
(define (job-add-process fg? job pid command) (define (job-add-process fg? job pid command)
(let ((pgid (add-to-process-group job pid))) (let ((pgid (add-to-process-group job pid)))
(set-job-pgid! job pgid) (set-job-pgid! job pgid)
(when fg? (tcsetpgrp (current-error-port) pgid)) (stderr "job-add-process fg?=~a\n" fg?)
(when (and #f fg?) ;; FIXME
(tcsetpgrp (current-error-port) pgid))
(set-job-processes! job (cons (make-process pid command #f) (job-processes job))))) (set-job-processes! job (cons (make-process pid command #f) (job-processes job)))))
(define (job-setup-process fg? job)
(when (isatty? (current-error-port))
(when (and #f fg?)
(tcsetpgrp (current-error-port) (add-to-process-group job (getpid))))
(map (cut sigaction <> SIG_DFL)
(list SIGINT SIGQUIT SIGTSTP SIGTTIN SIGTTOU SIGCHLD))))
(define (job-control-init) (define (job-control-init)
(let* ((interactive? (isatty? (current-error-port))) (when (isatty? (current-error-port))
(pgid (getpgrp)) (let ((pgid (getpgrp)))
(pid (getpid)))
(when interactive?
(while (not (eqv? (tcgetpgrp (current-error-port)) pgid)) (while (not (eqv? (tcgetpgrp (current-error-port)) pgid))
(kill (- pgid) SIGTTIN)) ;; oops we are not in the foreground (kill (- pgid) SIGTTIN))) ;; oops we are not in the foreground
(map (cut sigaction <> SIG_IGN) (map (cut sigaction <> SIG_IGN) (list SIGINT SIGQUIT SIGTSTP SIGTTIN SIGTTOU))
(list SIGINT SIGQUIT SIGTSTP SIGTTIN SIGTTOU)) (sigaction SIGCHLD SIG_DFL)
(sigaction SIGCHLD SIG_DFL) (let ((pid (getpid)))
(setpgid pid pid) ;; create new process group for ourself (setpgid pid pid) ;; create new process group for ourself
(tcsetpgrp (current-error-port) pid)))) (tcsetpgrp (current-error-port) pid))))
@ -120,24 +138,24 @@
(reap-jobs))))) (reap-jobs)))))
(define (wait job) (define (wait job)
(let loop () (when (job-running? job)
(let* ((pid-status (waitpid (- (job-pgid job)) WUNTRACED)) (let loop ()
(pid (car pid-status)) (let* ((pid-status (waitpid (- (job-pgid job)) WUNTRACED))
(status (cdr pid-status))) (pid (car pid-status))
(job-update job pid status) (status (cdr pid-status)))
(if (job-running? job) (loop)))) (job-update job pid status)
(tcsetpgrp (current-error-port) (getpid)) (if (job-running? job) (loop)))))
(unless (job-completed? job) (unless (job-completed? job)
(newline) (display-job job)) (newline) (display-job job))
(reap-jobs) (reap-jobs)
(last (job-status job))) (or (and (every zero? (job-status job)) 0) 1))
(define (fg index) (define (fg index)
(let ((job (job-index index))) (let ((job (job-index index)))
(cond (job (cond (job
(let ((pgid (job-pgid job))) (let ((pgid (job-pgid job)))
(tcsetpgrp (current-error-port) pgid) (tcsetpgrp (current-error-port) pgid)
(kill (- (job-pgid job)) SIGCONT)) (kill (- (job-pgid job)) SIGCONT))
(stdout (job-command job)) (stdout (job-command job))
(wait job)) (wait job))
(#t (#t
@ -146,14 +164,7 @@
(define (bg index) (define (bg index)
(let ((job (job-index index))) (let ((job (job-index index)))
(cond (job (cond (job
(map (cut set-process-status! <> #f) (job-processes job)) (map (cut set-process-status! <> #f) (job-processes job))
(kill (- (job-pgid job)) SIGCONT)) (kill (- (job-pgid job)) SIGCONT))
(#t (#t
(stderr "fg: no such job " index))))) (stderr "fg: no such job " index)))))
(define (setup-process fg? job)
(when (isatty? (current-error-port))
(when fg? (tcsetpgrp (current-error-port) (add-to-process-group job (getpid))))
(map (cut sigaction <> SIG_DFL)
(list SIGINT SIGQUIT SIGTSTP SIGTTIN SIGTTOU SIGCHLD)))
(fdes->inport 0) (map fdes->outport '(1 2))) ;; reset stdin/stdout/stderr

View File

@ -1,173 +1,176 @@
(define-module (gash pipe) (define-module (gash pipe)
:use-module (ice-9 popen) #:use-module (ice-9 curried-definitions)
:use-module (ice-9 rdelim) #:use-module (ice-9 popen)
#:use-module (ice-9 rdelim)
:use-module (srfi srfi-1) #:use-module (srfi srfi-1)
:use-module (srfi srfi-8) #:use-module (srfi srfi-8)
:use-module (srfi srfi-9) #:use-module (srfi srfi-9)
:use-module (srfi srfi-26) #:use-module (srfi srfi-26)
:use-module (gash job) #:use-module (gash job)
#:use-module (gash io)
:export (pipeline substitute)) #:export (handle-error pipeline pipeline->string substitute))
;; TODO
(define %debug-level 0)
(define (handle-error job error)
(let ((status (wait job)))
(when (not (zero? status))
(format (current-error-port) "ERROR: exit: ~a: ~s" status error)
(exit status))
status))
(define (pipe*) (define (pipe*)
(let ((p (pipe))) (let ((p (pipe)))
(values (car p) (cdr p)))) (values (car p) (cdr p))))
;; lhs rhs ;; lhs rhs
;; [source] w -> r [filter] w -> r [sink] ;; [source] w[1] -> r[0] [filter] w[1] -> r[0] [sink]
;; w[2] -> r[3] [sink]
(define (exec* command) ;; list of strings (define (exec* command) ;; list of strings
(catch #t (lambda () (apply execlp (cons (car command) command))) (catch #t (lambda () (apply execlp (cons (car command) command)))
(lambda (key . args) (format (current-error-port) "~a\n" (caaddr args)) (lambda (key . args) (format (current-error-port) "~a\n" (caaddr args))
(exit #f)))) (exit #f))))
(define (spawn-source fg? job command) (define ((tee-n file-names) inputs outputs)
(receive (r w) (pipe*) (let* ((files (map open-output-file file-names))
(let ((pid (primitive-fork))) (tees (zip files inputs outputs)))
(cond ((= 0 pid) (let loop ((tees tees))
(close r) (loop (filter-map (lambda (tee)
(setup-process fg? job) (let ((file (first tee))
(move->fdes w 1) (input (second tee))
(if (procedure? command) (output (third tee)))
(begin (when (char-ready? input)
(close-port (current-output-port)) (let ((char (read-char input)))
(set-current-output-port w) (if (not (eof-object? char))
(command) (begin (display char file)
(exit 0)) (display char output)
(exec* command))) (list file input output))
(#t #f)))))
(job-add-process fg? job pid command) tees)))
(close w) (map close outputs)))
r)))))
(define (spawn-filter fg? job src command) (define* (spawn fg? job command #:optional (input '()))
(receive (r w) (pipe*) ;;(format #t "spawn: ~a\n" (length input))
(let ((pid (primitive-fork))) (let* ((ofd '(1 2)) ;; output file descriptors 1, ...
(cond ((= 0 pid) (ifd (cond
(setup-process fg? job) ((= (length input) 0) '())
(if src (move->fdes src 0)) ((= (length input) 1) '(0))))
(close r) (pipes (map (lambda (. _) (pipe)) ofd))
(move->fdes w 1) (r (map car pipes))
(if (procedure? command) (w (map cdr pipes))
(begin (pid (primitive-fork)))
(close-port (current-input-port)) ;;(format (current-error-port) "INPUT: ~a\n" (length input))
(close-port (current-output-port)) ;;(format (current-error-port) "OUTPUT: ~a\n" (length w))
(set-current-input-port src)
(set-current-output-port w)
(command)
(exit 0))
(exec* command)))
(#t
(job-add-process fg? job pid command)
(close w)
r)))))
(define (spawn-sink fg? job src command)
(let ((pid (primitive-fork)))
(cond ((= 0 pid) (cond ((= 0 pid)
(setup-process fg? job) (job-setup-process fg? job)
(if src (move->fdes src 0)) (map close r)
(if (procedure? command) (if (procedure? command)
(begin (begin
(close-port (current-input-port)) (when (pair? input)
(set-current-input-port src) (close-port (current-input-port))
(command) (set-current-input-port (car input)))
(exit 0)) (when (pair? w)
(exec* command))) (close-port (current-output-port))
(set-current-output-port (car w)))
;;(format (current-error-port) "INPUT: ~a\n" (length input))
;;(format (current-error-port) "OUTPUT: ~a\n" (length w))
(if (thunk? command) (command)
(command input w))
(exit 0))
(begin
(map dup->fdes w ofd)
(map dup->fdes input ifd)
(exec* command))))
(#t (#t
(job-add-process fg? job pid command) (job-add-process fg? job pid command)
(and src (close src)))))) (map close w)
r))))
(define* (spawn fg? job command #:optional (input '()) (output 0))
;;(format #t "spawn: ~a ~a\n" (length input) output)
(let* ((ofd (iota output 1)) ;; output file descriptors 1, ...
(count (length input))
(start (1+ output))
(ifd (cond
((= count 0) '())
((= count 1) '(0))
((#t (cons 0 (iota (1- count) start))))))
(ifd (if (pair? input) (cons 0 ifd) ifd))
;;(foo (format #t "ifd: ~a\n" ifd))
;;(foo (format #t "ofd: ~a\n" ofd))
(pipes (map (lambda (. _) (pipe)) ofd))
(r (map car pipes))
(w (map cdr pipes))
(pid (primitive-fork)))
(cond ((= 0 pid)
(setup-process fg? job)
(map close r)
(map move->fdes w ofd)
(map move->fdes input ifd)
(if (procedure? command)
(begin
(when (pair? input)
(close-port (current-input-port))
(set-current-input-port (car input)))
(when (pair? w)
(close-port (current-output-port))
(set-current-output-port (car w)))
(command)
(exit 0))
(exec* command)))
(#t
(job-add-process fg? job pid command)
(map close w)
r))))
(define (pipeline+ fg? open? . commands)
(let* ((job (new-job))
(ports (if (> (length commands) 1)
(let loop ((input (spawn fg? job (car commands) '() 1)) ;; spawn-source
(commands (cdr commands)))
(if (null? (cdr commands))
(spawn fg? job (car commands) input (if open? 1 0)) ;; spawn-sink
(loop (spawn fg? job (car commands) input 1) ;; spawn-filter
(cdr commands))))
(spawn fg? job (car commands) `((current-input-port))))))
(if fg? (wait job) (values job ports))))
(define (pipeline fg? . commands) (define (pipeline fg? . commands)
(apply pipeline+ (cons* fg? #f commands))) (when (> %debug-level 0)
(format (current-error-port) "pipeline[~a]: COMMANDS: ~s\n" fg? commands))
(receive (r w)
(pipe*)
(move->fdes w 2)
(let* ((error-port (set-current-error-port w))
(job (new-job))
(debug-id (job-debug-id job))
(commands
(if (zero? %debug-level) commands
(fold-right (lambda (command id lst)
(let ((file (string-append debug-id "." id)))
(cons* command `("tee" ,file) lst)))
'() commands (map number->string (iota (length commands))))))
(foo (when (> %debug-level 0) (with-output-to-file debug-id (cut format #t "COMMANDS: ~s\n" commands))))
(ports (if (> (length commands) 1)
(let loop ((input (spawn fg? job (car commands) '())) ;; spawn-source
(commands (cdr commands)))
(if (null? (cdr commands))
(spawn fg? job (car commands) input) ;; spawn-sink
(loop (spawn fg? job (car commands) input) ;; spawn-filter
(cdr commands))))
(spawn fg? job (car commands) '())))) ;; spawn-sink
(when fg?
(let loop ((input ports)
(output (list (current-output-port) error-port)))
(let ((line (map read-line input)))
(let* ((input-available? (lambda (o ln) (and (not (eof-object? ln)) o)))
(line (filter-map input-available? line line))
(output (filter-map input-available? output line))
(input (filter-map input-available? input line)))
(when (pair? input)
(map display line output)
(map newline output)
(loop input output)))))
(wait job))
(move->fdes error-port 2)
(set-current-error-port error-port)
(close w)
(values job (append ports (list r))))))
(define (pipeline->string . commands)
(receive (job ports)
(apply pipeline #f commands)
(let ((output (read-string (car ports))))
(wait job)
output)))
;;(pipeline #f '("head" "-c128" "/dev/urandom") '("tr" "-dc" "A-Z0-9") (lambda () (display (read-string)))) ;;(pipeline #f '("head" "-c128" "/dev/urandom") '("tr" "-dc" "A-Z0-9") (lambda () (display (read-string))))
;;(pipeline #f '("head" "-c128" "/dev/urandom") '("tr" "-dc" "A-Z0-9") '("cat")) ;;(pipeline #f '("head" "-c128" "/dev/urandom") '("tr" "-dc" "A-Z0-9") '("cat"))
;;(pipeline #f (lambda () (display 'foo)) '("grep" "o") '("tr" "o" "e")) ;;(pipeline #f (lambda () (display 'foo)) '("grep" "o") '("tr" "o" "e"))
;; (pipeline #f ;; (pipeline #f
;; (lambda () (display "\nbin\nboot\nroot\nusr\nvar")) ;; (lambda () (display "\nbin\nboot\nroot\nusr\nvar"))
;; '("tr" "u" "a") ;; '("tr" "u" "a")
;; (lambda () (display (string-map (lambda (c) (if (eq? c #\o) #\e c)) (read-string)))) ;; (lambda () (display (string-map (lambda (c) (if (eq? c #\o) #\e c)) (read-string))))
;; '("cat") ;; '("cat")
;; (lambda () (display (read-string)))) ;; (lambda () (display (read-string))))
;; (receive (job ports) ;; (receive (job ports)
;; (pipeline+ #f #t ;; (pipeline #f
;; (lambda () (display "\nbin\nboot\nroot\nusr\nvar")) ;; (lambda ()
;; '("tr" "u" "a") ;; (display "foo")
;; (lambda () (display (string-map (lambda (c) (if (eq? c #\o) #\e c)) (read-string)))) ;; (display "bar" (current-error-port)))
;; '("cat")) ;; '("tr" "o" "e"))
;; (display (read-string (car ports)))) ;; (map (compose display read-string) ports))
;; _
;; \
;; -
;; _/
(define (pipeline->string . commands) ;; (display (pipeline->string
(let* ((fg? #f) ;; (lambda () (display "\nbin\nboot\nroot\nusr\nvar"))
(job (new-job)) ;; '("tr" "u" "a")
(output (read-string ;; (lambda () (display (string-map (lambda (c) (if (eq? c #\o) #\e c)) (read-string))))
(if (> (length commands) 1) ;; '("cat")
(let loop ((src (spawn-source fg? job (car commands))) ;; (lambda () (display (read-string)) (newline))))
(commands (cdr commands)))
(if (null? (cdr commands))
(spawn-filter fg? job src (car commands))
(loop (spawn-filter fg? job src (car commands))
(cdr commands))))
(spawn-filter fg? job #f (car commands))))))
(wait job)
output))
;; _ ;; _
;; \ ;; \

View File

@ -1,8 +1,8 @@
(define-module (gash util) (define-module (gash util)
:use-module (srfi srfi-1) #:use-module (srfi srfi-1)
:use-module (srfi srfi-26) #:use-module (srfi srfi-26)
:export (disjoin conjoin)) #:export (disjoin conjoin))
(define (disjoin . predicates) (define (disjoin . predicates)
(lambda (. arguments) (lambda (. arguments)