generalize pipe for multiple in- and out-puts

This commit is contained in:
Rutger van Beusekom 2018-02-22 16:17:43 +01:00
parent b6c566e989
commit ca01e904d3
1 changed files with 81 additions and 17 deletions

View File

@ -82,25 +82,77 @@
(and src (close src))))))
(define (pipeline fg? . commands)
(let ((job (new-job)))
(if (> (length commands) 1)
(let loop ((src (spawn-source fg? job (car commands)))
(commands (cdr commands)))
(if (null? (cdr commands))
(spawn-sink fg? job src (car commands))
(loop (spawn-filter fg? job src (car commands))
(cdr commands))))
(spawn-sink fg? job #f (car commands)))
(if fg? (wait job))))
(define* (spawn fg? job command #:optional (input '()) (output 0))
;;(format #t "spawn: ~a ~a\n" (length input) output)
(let* ((ofd (iota output 1)) ;; output file descriptors 1, ...
(count (length input))
(start (1+ output))
(ifd (cond
((= count 0) '())
((= count 1) '(0))
((#t (cons 0 (iota (1- count) start))))))
(ifd (if (pair? input) (cons 0 ifd) ifd))
;;(foo (format #t "ifd: ~a\n" ifd))
;;(foo (format #t "ofd: ~a\n" ofd))
(pipes (map (lambda (. _) (pipe)) ofd))
(r (map car pipes))
(w (map cdr pipes))
(pid (primitive-fork)))
(cond ((= 0 pid)
(setup-process fg? job)
(map close r)
(map move->fdes w ofd)
(map move->fdes input ifd)
(if (procedure? command)
(begin
(when (pair? input)
(close-port (current-input-port))
(set-current-input-port (car input)))
(when (pair? w)
(close-port (current-output-port))
(set-current-output-port (car w)))
(command)
(exit 0))
(exec* command)))
(#t
(job-add-process fg? job pid command)
(map close w)
r))))
;;(pipeline #t (list "sleep" "10"))
(define (pipeline+ fg? open? . commands)
(let* ((job (new-job))
(ports (if (> (length commands) 1)
(let loop ((input (spawn fg? job (car commands) '() 1)) ;; spawn-source
(commands (cdr commands)))
(if (null? (cdr commands))
(spawn fg? job (car commands) input (if open? 1 0)) ;; spawn-sink
(loop (spawn fg? job (car commands) input 1) ;; spawn-filter
(cdr commands))))
(spawn fg? job (car commands) `((current-input-port))))))
(if fg? (wait job) (values job ports))))
(define (pipeline fg? . commands)
(apply pipeline+ (cons* fg? #f commands)))
;;(pipeline #f '("head" "-c128" "/dev/urandom") '("tr" "-dc" "A-Z0-9") (lambda () (display (read-string))))
;;(pipeline #f '("head" "-c128" "/dev/urandom") '("tr" "-dc" "A-Z0-9") '("cat"))
;;(pipeline #f (lambda () (display 'foo)) '("grep" "o") '("tr" "o" "e"))
(pipeline #f
(lambda () (display "bin\nboot\nroot\ntoot\nusr\nvar"))
;;'("tr" "o" "e")
(lambda () (display (string-map (lambda (c) (if (eq? c #\o) #\e c)) (read-string))))
(lambda () (display (read-string)) (newline)))
;; (pipeline #f
;; (lambda () (display "\nbin\nboot\nroot\nusr\nvar"))
;; '("tr" "u" "a")
;; (lambda () (display (string-map (lambda (c) (if (eq? c #\o) #\e c)) (read-string))))
;; '("cat")
;; (lambda () (display (read-string))))
;; (receive (job ports)
;; (pipeline+ #f #t
;; (lambda () (display "\nbin\nboot\nroot\nusr\nvar"))
;; '("tr" "u" "a")
;; (lambda () (display (string-map (lambda (c) (if (eq? c #\o) #\e c)) (read-string))))
;; '("cat"))
;; (display (read-string (car ports))))
(define (pipeline->string . commands)
(let* ((fg? #f)
@ -117,6 +169,18 @@
(wait job)
output))
;; _
;; \
;; -
;; _/
;; (display (pipeline->string
;; (lambda () (display "\nbin\nboot\nroot\nusr\nvar"))
;; '("tr" "u" "a")
;; (lambda () (display (string-map (lambda (c) (if (eq? c #\o) #\e c)) (read-string))))
;; '("cat")
;; (lambda () (display (read-string)) (newline))))
(define (substitute . commands)
(string-trim-right
(string-map (lambda (c)