1 /**
2 Copyright: Copyright (c) 2020, Joakim Brännström. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 */
6 module proc;
7 
8 import core.sys.posix.signal : SIGKILL;
9 import core.thread : Thread;
10 import core.time : dur, Duration;
11 import logger = std.experimental.logger;
12 import std.algorithm : filter, count, joiner, map;
13 import std.array : appender, empty, array;
14 import std.exception : collectException;
15 import std.stdio : File, fileno, writeln;
16 import std.typecons : Flag, Yes;
17 static import std.process;
18 static import std.stdio;
19 
20 import my.gc.refc;
21 import my.from_;
22 
23 public import proc.channel;
24 public import proc.pid;
25 public import proc.tty;
26 
27 version (unittest) {
28     import std.file : remove;
29 }
30 
31 /** Manage a process by reference counting so that it is terminated when the it
32  * stops being used such as the instance going out of scope.
33  */
34 auto rcKill(T)(T p, int signal = SIGKILL) {
35     return refCounted(ScopeKill!T(p, signal));
36 }
37 
38 // backward compatibility.
39 alias scopeKill = rcKill;
40 
41 struct ScopeKill(T) {
42     T process;
43     alias process this;
44 
45     private int signal = SIGKILL;
46     private bool hasProcess;
47 
48     this(T process, int signal) @safe {
49         this.process = process;
50         this.signal = signal;
51         this.hasProcess = true;
52     }
53 
54     ~this() @safe {
55         if (hasProcess)
56             process.dispose();
57     }
58 }
59 
60 /// Async process wrapper for a std.process SpawnProcess
61 struct SpawnProcess {
62     import core.sys.posix.signal : SIGKILL;
63     import std.algorithm : among;
64 
65     private {
66         enum State {
67             running,
68             terminated,
69             exitCode
70         }
71 
72         std.process.Pid process;
73         RawPid pid;
74         int status_;
75         State st;
76     }
77 
78     this(std.process.Pid process) @safe {
79         this.process = process;
80         this.pid = process.osHandle.RawPid;
81     }
82 
83     ~this() @safe {
84     }
85 
86     /// Returns: The raw OS handle for the process ID.
87     RawPid osHandle() nothrow @safe {
88         return pid;
89     }
90 
91     /// Kill and cleanup the process.
92     void dispose() @safe {
93         final switch (st) {
94         case State.running:
95             this.kill;
96             this.wait;
97             break;
98         case State.terminated:
99             this.wait;
100             break;
101         case State.exitCode:
102             break;
103         }
104 
105         st = State.exitCode;
106     }
107 
108     /** Send `signal` to the process.
109      *
110      * Param:
111      *  signal = a signal from `core.sys.posix.signal`
112      */
113     void kill(int signal = SIGKILL) nothrow @trusted {
114         final switch (st) {
115         case State.running:
116             break;
117         case State.terminated:
118             goto case;
119         case State.exitCode:
120             return;
121         }
122 
123         try {
124             std.process.kill(process, signal);
125         } catch (Exception e) {
126         }
127 
128         st = State.terminated;
129     }
130 
131     /// Blocking wait for the process to terminated.
132     /// Returns: the exit status.
133     int wait() @safe {
134         final switch (st) {
135         case State.running:
136             status_ = std.process.wait(process);
137             break;
138         case State.terminated:
139             status_ = std.process.wait(process);
140             break;
141         case State.exitCode:
142             break;
143         }
144 
145         st = State.exitCode;
146 
147         return status_;
148     }
149 
150     /// Non-blocking wait for the process termination.
151     /// Returns: `true` if the process has terminated.
152     bool tryWait() @safe {
153         final switch (st) {
154         case State.running:
155             auto s = std.process.tryWait(process);
156             if (s.terminated) {
157                 st = State.exitCode;
158                 status_ = s.status;
159             }
160             break;
161         case State.terminated:
162             status_ = std.process.wait(process);
163             st = State.exitCode;
164             break;
165         case State.exitCode:
166             break;
167         }
168 
169         return st.among(State.terminated, State.exitCode) != 0;
170     }
171 
172     /// Returns: The exit status of the process.
173     int status() @safe {
174         if (st != State.exitCode) {
175             throw new Exception(
176                     "Process has not terminated and wait/tryWait been called to collect the exit status");
177         }
178         return status_;
179     }
180 
181     /// Returns: If the process has terminated.
182     bool terminated() @safe {
183         return st.among(State.terminated, State.exitCode) != 0;
184     }
185 }
186 
187 /// Async process that do not block on read from stdin/stderr.
188 struct PipeProcess {
189     import std.algorithm : among;
190     import core.sys.posix.signal : SIGKILL;
191 
192     private {
193         enum State {
194             running,
195             terminated,
196             exitCode
197         }
198 
199         std.process.ProcessPipes process;
200         std.process.Pid pid;
201 
202         FileReadChannel stderr_;
203         FileReadChannel stdout_;
204         FileWriteChannel stdin_;
205         int status_;
206         State st;
207     }
208 
209     this(std.process.Pid pid, File stdin, File stdout, File stderr) @safe {
210         this.pid = pid;
211 
212         this.stdin_ = FileWriteChannel(stdin);
213         this.stdout_ = FileReadChannel(stdout);
214         this.stderr_ = FileReadChannel(stderr);
215     }
216 
217     this(std.process.ProcessPipes process, std.process.Redirect r) @safe {
218         this.process = process;
219         this.pid = process.pid;
220 
221         if (r & std.process.Redirect.stdin) {
222             stdin_ = FileWriteChannel(this.process.stdin);
223         }
224         if (r & std.process.Redirect.stdout) {
225             stdout_ = FileReadChannel(this.process.stdout);
226         }
227         if (r & std.process.Redirect.stderr) {
228             this.stderr_ = FileReadChannel(this.process.stderr);
229         }
230     }
231 
232     /// Returns: The raw OS handle for the process ID.
233     RawPid osHandle() nothrow @safe {
234         return pid.osHandle.RawPid;
235     }
236 
237     /// Access to stdout.
238     ref FileWriteChannel stdin() return scope nothrow @safe {
239         return stdin_;
240     }
241 
242     /// Access to stdout.
243     ref FileReadChannel stdout() return scope nothrow @safe {
244         return stdout_;
245     }
246 
247     /// Access stderr.
248     ref FileReadChannel stderr() return scope nothrow @safe {
249         return stderr_;
250     }
251 
252     /// Kill and cleanup the process.
253     void dispose() @safe {
254         final switch (st) {
255         case State.running:
256             this.kill;
257             this.wait;
258             .destroy(process);
259             break;
260         case State.terminated:
261             this.wait;
262             .destroy(process);
263             break;
264         case State.exitCode:
265             break;
266         }
267 
268         st = State.exitCode;
269     }
270 
271     /** Send `signal` to the process.
272      *
273      * Param:
274      *  signal = a signal from `core.sys.posix.signal`
275      */
276     void kill(int signal = SIGKILL) nothrow @trusted {
277         final switch (st) {
278         case State.running:
279             break;
280         case State.terminated:
281             return;
282         case State.exitCode:
283             return;
284         }
285 
286         try {
287             std.process.kill(pid, signal);
288         } catch (Exception e) {
289         }
290 
291         st = State.terminated;
292     }
293 
294     /// Blocking wait for the process to terminated.
295     /// Returns: the exit status.
296     int wait() @safe {
297         final switch (st) {
298         case State.running:
299             status_ = std.process.wait(pid);
300             break;
301         case State.terminated:
302             status_ = std.process.wait(pid);
303             break;
304         case State.exitCode:
305             break;
306         }
307 
308         st = State.exitCode;
309 
310         return status_;
311     }
312 
313     /// Non-blocking wait for the process termination.
314     /// Returns: `true` if the process has terminated.
315     bool tryWait() @safe {
316         final switch (st) {
317         case State.running:
318             auto s = std.process.tryWait(pid);
319             if (s.terminated) {
320                 st = State.exitCode;
321                 status_ = s.status;
322             }
323             break;
324         case State.terminated:
325             status_ = std.process.wait(pid);
326             st = State.exitCode;
327             break;
328         case State.exitCode:
329             break;
330         }
331 
332         return st.among(State.terminated, State.exitCode) != 0;
333     }
334 
335     /// Returns: The exit status of the process.
336     int status() @safe {
337         if (st != State.exitCode) {
338             throw new Exception(
339                     "Process has not terminated and wait/tryWait been called to collect the exit status");
340         }
341         return status_;
342     }
343 
344     /// Returns: If the process has terminated.
345     bool terminated() @safe {
346         return st.among(State.terminated, State.exitCode) != 0;
347     }
348 }
349 
350 SpawnProcess spawnProcess(scope const(char[])[] args, File stdin = std.stdio.stdin,
351         File stdout = std.stdio.stdout, File stderr = std.stdio.stderr,
352         const string[string] env = null, std.process.Config config = std.process.Config.none,
353         scope const char[] workDir = null) {
354     return SpawnProcess(std.process.spawnProcess(args, stdin, stdout, stderr,
355             env, config, workDir));
356 }
357 
358 SpawnProcess spawnProcess(scope const(char[])[] args, const string[string] env,
359         std.process.Config config = std.process.Config.none, scope const(char)[] workDir = null) {
360     return SpawnProcess(std.process.spawnProcess(args, std.stdio.stdin,
361             std.stdio.stdout, std.stdio.stderr, env, config, workDir));
362 }
363 
364 SpawnProcess spawnProcess(scope const(char)[] program,
365         File stdin = std.stdio.stdin, File stdout = std.stdio.stdout,
366         File stderr = std.stdio.stderr, const string[string] env = null,
367         std.process.Config config = std.process.Config.none, scope const(char)[] workDir = null) {
368     return SpawnProcess(std.process.spawnProcess((&program)[0 .. 1], stdin,
369             stdout, stderr, env, config, workDir));
370 }
371 
372 SpawnProcess spawnShell(scope const(char)[] command, File stdin = std.stdio.stdin,
373         File stdout = std.stdio.stdout, File stderr = std.stdio.stderr,
374         scope const string[string] env = null, std.process.Config config = std.process.Config.none,
375         scope const(char)[] workDir = null, scope string shellPath = std.process.nativeShell) {
376     return SpawnProcess(std.process.spawnShell(command, stdin, stdout, stderr,
377             env, config, workDir, shellPath));
378 }
379 
380 /// ditto
381 SpawnProcess spawnShell(scope const(char)[] command, scope const string[string] env,
382         std.process.Config config = std.process.Config.none,
383         scope const(char)[] workDir = null, scope string shellPath = std.process.nativeShell) {
384     return SpawnProcess(std.process.spawnShell(command, env, config, workDir, shellPath));
385 }
386 
387 PipeProcess pipeProcess(scope const(char[])[] args,
388         std.process.Redirect redirect = std.process.Redirect.all,
389         const string[string] env = null, std.process.Config config = std.process.Config.none,
390         scope const(char)[] workDir = null) @safe {
391     return PipeProcess(std.process.pipeProcess(args, redirect, env, config, workDir), redirect);
392 }
393 
394 PipeProcess pipeShell(scope const(char)[] command,
395         std.process.Redirect redirect = std.process.Redirect.all,
396         const string[string] env = null, std.process.Config config = std.process.Config.none,
397         scope const(char)[] workDir = null, string shellPath = std.process.nativeShell) @safe {
398     return PipeProcess(std.process.pipeShell(command, redirect, env, config,
399             workDir, shellPath), redirect);
400 }
401 
402 /** Moves the process to a separate process group and on exit kill it and all
403  * its children.
404  */
405 @safe struct Sandbox(ProcessT) {
406     import core.sys.posix.signal : SIGKILL;
407 
408     private {
409         ProcessT p;
410         RawPid pid;
411     }
412 
413     this(ProcessT p) @safe {
414         import core.sys.posix.unistd : setpgid;
415 
416         this.p = p;
417         this.pid = p.osHandle;
418         setpgid(pid, 0);
419     }
420 
421     RawPid osHandle() nothrow @safe {
422         return pid;
423     }
424 
425     static if (__traits(hasMember, ProcessT, "stdin")) {
426         ref FileWriteChannel stdin() nothrow @safe {
427             return p.stdin;
428         }
429     }
430 
431     static if (__traits(hasMember, ProcessT, "stdout")) {
432         ref FileReadChannel stdout() nothrow @safe {
433             return p.stdout;
434         }
435     }
436 
437     static if (__traits(hasMember, ProcessT, "stderr")) {
438         ref FileReadChannel stderr() nothrow @safe {
439             return p.stderr;
440         }
441     }
442 
443     void dispose() @safe {
444         // this also reaps the children thus cleaning up zombies
445         this.kill;
446         p.dispose;
447     }
448 
449     /** Send `signal` to the process.
450      *
451      * Param:
452      *  signal = a signal from `core.sys.posix.signal`
453      */
454     void kill(int signal = SIGKILL) nothrow @safe {
455         // must first retrieve the submap because after the process is killed
456         // its children may have changed.
457         auto pmap = makePidMap.getSubMap(pid);
458 
459         p.kill(signal);
460 
461         // only kill and reap the children
462         pmap.remove(pid);
463         proc.pid.kill(pmap, Yes.onlyCurrentUser, signal).reap;
464     }
465 
466     int wait() @safe {
467         return p.wait;
468     }
469 
470     bool tryWait() @safe {
471         return p.tryWait;
472     }
473 
474     int status() @safe {
475         return p.status;
476     }
477 
478     bool terminated() @safe {
479         return p.terminated;
480     }
481 }
482 
483 auto sandbox(T)(T p) @safe {
484     return Sandbox!T(p);
485 }
486 
487 @("shall terminate a group of processes")
488 unittest {
489     import std.datetime.stopwatch : StopWatch, AutoStart;
490 
491     immutable scriptName = makeScript(`#!/bin/bash
492 sleep 10m &
493 sleep 10m &
494 sleep 10m
495 `);
496     scope (exit)
497         remove(scriptName);
498 
499     auto p = pipeProcess([scriptName]).sandbox.rcKill;
500     waitUntilChildren(p.osHandle, 3);
501     const preChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length;
502     p.kill;
503     Thread.sleep(500.dur!"msecs"); // wait for the OS to kill the children
504     const postChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length;
505 
506     assert(p.wait == -9);
507     assert(p.terminated);
508     assert(preChildren == 3);
509     assert(postChildren == 0);
510 }
511 
512 /** dispose the process after the timeout.
513  */
514 @safe struct Timeout(ProcessT) {
515     import core.sys.posix.signal : SIGKILL;
516     import core.thread;
517     import std.algorithm : among;
518     import std.datetime : Clock, Duration;
519 
520     private {
521         enum Msg {
522             none,
523             stop,
524             status,
525         }
526 
527         enum Reply {
528             none,
529             running,
530             normalDeath,
531             killedByTimeout,
532         }
533 
534         static struct Payload {
535             ProcessT p;
536             RawPid pid;
537             Background background;
538             Reply backgroundReply;
539         }
540 
541         RefCounted!Payload rc;
542     }
543 
544     this(ProcessT p, Duration timeout) @trusted {
545         import std.algorithm : move;
546 
547         auto pid = p.osHandle;
548         rc = refCounted(Payload(move(p), pid));
549         rc.background = new Background(&rc.p, timeout);
550         rc.background.isDaemon = true;
551         rc.background.start;
552     }
553 
554     ~this() @trusted {
555         rc.release;
556     }
557 
558     private static class Background : Thread {
559         import core.sync.condition : Condition;
560         import core.sync.mutex : Mutex;
561 
562         Duration timeout;
563         ProcessT* p;
564         Mutex mtx;
565         Msg[] msg;
566         Reply reply_;
567         RawPid pid;
568         int signal = SIGKILL;
569 
570         this(ProcessT* p, Duration timeout) {
571             this.p = p;
572             this.timeout = timeout;
573             this.mtx = new Mutex();
574             this.pid = p.osHandle;
575 
576             super(&run);
577         }
578 
579         void run() {
580             checkProcess(this.pid, this.timeout, this);
581         }
582 
583         void put(Msg msg) @trusted nothrow {
584             this.mtx.lock_nothrow();
585             scope (exit)
586                 this.mtx.unlock_nothrow();
587             this.msg ~= msg;
588         }
589 
590         Msg popMsg() @trusted nothrow {
591             this.mtx.lock_nothrow();
592             scope (exit)
593                 this.mtx.unlock_nothrow();
594             if (msg.empty)
595                 return Msg.none;
596             auto rval = msg[$ - 1];
597             msg = msg[0 .. $ - 1];
598             return rval;
599         }
600 
601         void setReply(Reply reply_) @trusted nothrow {
602             this.mtx.lock_nothrow();
603             scope (exit)
604                 this.mtx.unlock_nothrow();
605             this.reply_ = reply_;
606         }
607 
608         Reply reply() @trusted nothrow {
609             this.mtx.lock_nothrow();
610             scope (exit)
611                 this.mtx.unlock_nothrow();
612             return reply_;
613         }
614 
615         void setSignal(int signal) @trusted nothrow {
616             this.mtx.lock_nothrow();
617             scope (exit)
618                 this.mtx.unlock_nothrow();
619             this.signal = signal;
620         }
621 
622         void kill() @trusted nothrow {
623             this.mtx.lock_nothrow();
624             scope (exit)
625                 this.mtx.unlock_nothrow();
626             p.kill(signal);
627         }
628     }
629 
630     private static void checkProcess(RawPid p, Duration timeout, Background bg) nothrow {
631         import std.algorithm : max, min;
632         import std.variant : Variant;
633         static import core.sys.posix.signal;
634 
635         const stopAt = Clock.currTime + timeout;
636         // the purpose is to poll the process often "enough" that if it
637         // terminates early `Process` detects it fast enough. 1000 is chosen
638         // because it "feels good". the purpose
639         auto sleepInterval = min(500, max(20, timeout.total!"msecs" / 1000)).dur!"msecs";
640 
641         bool forceStop;
642         bool running = true;
643         while (running && Clock.currTime < stopAt) {
644             const msg = bg.popMsg;
645 
646             final switch (msg) {
647             case Msg.none:
648                 () @trusted { Thread.sleep(sleepInterval); }();
649                 break;
650             case Msg.stop:
651                 forceStop = true;
652                 running = false;
653                 break;
654             case Msg.status:
655                 bg.setReply(Reply.running);
656                 break;
657             }
658 
659             () @trusted {
660                 if (core.sys.posix.signal.kill(p, 0) == -1) {
661                     running = false;
662                 }
663             }();
664         }
665 
666         // may be children alive thus must ensure that the whole process tree
667         // is killed if this is a sandbox with a timeout.
668         bg.kill();
669 
670         if (!forceStop && Clock.currTime >= stopAt) {
671             bg.setReply(Reply.killedByTimeout);
672         } else {
673             bg.setReply(Reply.normalDeath);
674         }
675     }
676 
677     RawPid osHandle() nothrow @trusted {
678         return rc.pid;
679     }
680 
681     static if (__traits(hasMember, ProcessT, "stdin")) {
682         ref FileWriteChannel stdin() nothrow @safe {
683             return rc.p.stdin;
684         }
685     }
686 
687     static if (__traits(hasMember, ProcessT, "stdout")) {
688         ref FileReadChannel stdout() nothrow @safe {
689             return rc.p.stdout;
690         }
691     }
692 
693     static if (__traits(hasMember, ProcessT, "stderr")) {
694         ref FileReadChannel stderr() nothrow @trusted {
695             return rc.p.stderr;
696         }
697     }
698 
699     void dispose() @trusted {
700         if (rc.backgroundReply.among(Reply.none, Reply.running)) {
701             rc.background.put(Msg.stop);
702             rc.background.join;
703             rc.backgroundReply = rc.background.reply;
704         }
705         rc.p.dispose;
706     }
707 
708     /** Send `signal` to the process.
709      *
710      * Param:
711      *  signal = a signal from `core.sys.posix.signal`
712      */
713     void kill(int signal = SIGKILL) nothrow @trusted {
714         rc.background.setSignal(signal);
715         rc.background.kill();
716     }
717 
718     int wait() @trusted {
719         while (!this.tryWait) {
720             Thread.sleep(20.dur!"msecs");
721         }
722         return rc.p.wait;
723     }
724 
725     bool tryWait() @trusted {
726         return rc.p.tryWait;
727     }
728 
729     int status() @trusted {
730         return rc.p.status;
731     }
732 
733     bool terminated() @trusted {
734         return rc.p.terminated;
735     }
736 
737     bool timeoutTriggered() @trusted {
738         if (rc.backgroundReply.among(Reply.none, Reply.running)) {
739             rc.background.put(Msg.status);
740             rc.backgroundReply = rc.background.reply;
741         }
742         return rc.backgroundReply == Reply.killedByTimeout;
743     }
744 }
745 
746 auto timeout(T)(T p, Duration timeout_) @trusted {
747     return Timeout!T(p, timeout_);
748 }
749 
750 /// Returns when the process has pending data.
751 void waitForPendingData(ProcessT)(Process p) {
752     while (!p.pipe.hasPendingData || !p.stderr.hasPendingData) {
753         Thread.sleep(20.dur!"msecs");
754     }
755 }
756 
757 @("shall kill the process after the timeout")
758 unittest {
759     import std.datetime.stopwatch : StopWatch, AutoStart;
760 
761     auto p = pipeProcess(["sleep", "1m"]).timeout(100.dur!"msecs").rcKill;
762     auto sw = StopWatch(AutoStart.yes);
763     p.wait;
764     sw.stop;
765 
766     assert(sw.peek >= 100.dur!"msecs");
767     assert(sw.peek <= 500.dur!"msecs");
768     assert(p.wait == -9);
769     assert(p.terminated);
770     assert(p.status == -9);
771     assert(p.timeoutTriggered);
772 }
773 
774 struct DrainElement {
775     enum Type {
776         stdout,
777         stderr,
778     }
779 
780     Type type;
781     const(ubyte)[] data;
782 
783     /// Returns: iterates the data as an input range.
784     auto byUTF8() @safe pure nothrow const @nogc {
785         static import std.utf;
786 
787         return std.utf.byUTF!(const(char))(cast(const(char)[]) data);
788     }
789 
790     bool empty() @safe pure nothrow const @nogc {
791         return data.length == 0;
792     }
793 }
794 
795 /** A range that drains a process stdout/stderr until it terminates.
796  *
797  * There may be `DrainElement` that are empty.
798  */
799 struct DrainRange(ProcessT) {
800     private {
801         enum State {
802             start,
803             draining,
804             lastStdout,
805             lastStderr,
806             lastElement,
807             empty,
808         }
809 
810         ProcessT p;
811         DrainElement front_;
812         State st;
813         ubyte[] buf;
814     }
815 
816     this(ProcessT p) {
817         this.p = p;
818         this.buf = new ubyte[4096];
819     }
820 
821     DrainElement front() @safe pure nothrow const @nogc {
822         assert(!empty, "Can't get front of an empty range");
823         return front_;
824     }
825 
826     void popFront() @safe {
827         assert(!empty, "Can't pop front of an empty range");
828 
829         static bool isAnyPipeOpen(ref ProcessT p) {
830             return p.stdout.isOpen || p.stderr.isOpen;
831         }
832 
833         DrainElement readData(ref ProcessT p) @safe {
834             if (p.stderr.hasPendingData) {
835                 return DrainElement(DrainElement.Type.stderr, p.stderr.read(buf));
836             } else if (p.stdout.hasPendingData) {
837                 return DrainElement(DrainElement.Type.stdout, p.stdout.read(buf));
838             }
839             return DrainElement.init;
840         }
841 
842         DrainElement waitUntilData() @safe {
843             import std.datetime : Clock;
844 
845             // may livelock if the process never terminates and never writes to
846             // the terminal. timeout ensure that it sooner or later is break
847             // the loop. This is important if the drain is part of a timeout
848             // wrapping.
849 
850             const timeout = 100.dur!"msecs";
851             const stopAt = Clock.currTime + timeout;
852             const sleepFor = timeout / 20;
853             const useSleep = Clock.currTime + sleepFor;
854             bool running = true;
855             while (running) {
856                 const now = Clock.currTime;
857 
858                 running = (now < stopAt) && isAnyPipeOpen(p);
859 
860                 auto bufRead = readData(p);
861 
862                 if (!bufRead.empty) {
863                     return DrainElement(bufRead.type, bufRead.data.dup);
864                 } else if (running && now > useSleep && bufRead.empty) {
865                     import core.thread : Thread;
866 
867                     () @trusted { Thread.sleep(sleepFor); }();
868                 }
869             }
870 
871             return DrainElement.init;
872         }
873 
874         front_ = DrainElement.init;
875 
876         final switch (st) {
877         case State.start:
878             st = State.draining;
879             front_ = waitUntilData;
880             break;
881         case State.draining:
882             if (p.terminated) {
883                 st = State.lastStdout;
884             } else if (isAnyPipeOpen(p)) {
885                 front_ = waitUntilData();
886             } else {
887                 st = State.lastStdout;
888             }
889             break;
890         case State.lastStdout:
891             if (p.stdout.hasPendingData) {
892                 front_ = DrainElement(DrainElement.Type.stdout, p.stdout.read(buf).dup);
893             } else {
894                 st = State.lastStderr;
895             }
896             break;
897         case State.lastStderr:
898             if (p.stderr.hasPendingData) {
899                 front_ = DrainElement(DrainElement.Type.stderr, p.stderr.read(buf).dup);
900             } else {
901                 st = State.lastElement;
902             }
903             break;
904         case State.lastElement:
905             st = State.empty;
906             break;
907         case State.empty:
908             break;
909         }
910     }
911 
912     bool empty() @safe pure nothrow const @nogc {
913         return st == State.empty;
914     }
915 }
916 
917 /// Drain a process pipe until empty.
918 auto drain(T)(T p) {
919     return DrainRange!T(p);
920 }
921 
922 /// Read the data from a ReadChannel by line.
923 struct DrainByLineCopyRange(ProcessT) {
924     private {
925         enum State {
926             start,
927             draining,
928             lastLine,
929             lastBuf,
930             empty,
931         }
932 
933         ProcessT process;
934         DrainRange!ProcessT range;
935         State st;
936         const(ubyte)[] buf;
937         const(char)[] line;
938     }
939 
940     this(ProcessT p) {
941         process = p;
942         range = p.drain;
943     }
944 
945     string front() @trusted pure nothrow const @nogc {
946         import std.exception : assumeUnique;
947 
948         assert(!empty, "Can't get front of an empty range");
949         return line.assumeUnique;
950     }
951 
952     void popFront() @safe {
953         assert(!empty, "Can't pop front of an empty range");
954         import std.algorithm : countUntil;
955         import std.array : array;
956         static import std.utf;
957 
958         const(ubyte)[] updateBuf(size_t idx) {
959             const(ubyte)[] tmp;
960             if (buf.empty) {
961                 // do nothing
962             } else if (idx == -1) {
963                 tmp = buf;
964                 buf = null;
965             } else {
966                 idx = () {
967                     if (idx < buf.length) {
968                         return idx + 1;
969                     }
970                     return idx;
971                 }();
972                 tmp = buf[0 .. idx];
973                 buf = buf[idx .. $];
974             }
975 
976             if (!tmp.empty && tmp[$ - 1] == '\n') {
977                 tmp = tmp[0 .. $ - 1];
978             }
979             return tmp;
980         }
981 
982         void drainLine() {
983             void fillBuf() {
984                 if (!range.empty) {
985                     range.popFront;
986                 }
987                 if (!range.empty) {
988                     buf ~= range.front.data;
989                 }
990             }
991 
992             size_t idx;
993             () {
994                 int cnt;
995                 do {
996                     fillBuf();
997                     idx = buf.countUntil('\n');
998                     // 2 is a magic number which mean that it at most wait 2x timeout for data
999                 }
1000                 while (!range.empty && idx == -1 && cnt++ < 2);
1001             }();
1002 
1003             if (idx != -1) {
1004                 auto tmp = updateBuf(idx);
1005                 line = std.utf.byUTF!(const(char))(cast(const(char)[]) tmp).array;
1006             }
1007         }
1008 
1009         bool lastLine() {
1010             size_t idx = buf.countUntil('\n');
1011             if (idx == -1)
1012                 return true;
1013 
1014             auto tmp = updateBuf(idx);
1015             line = std.utf.byUTF!(const(char))(cast(const(char)[]) tmp).array;
1016             return false;
1017         }
1018 
1019         line = null;
1020         final switch (st) {
1021         case State.start:
1022             drainLine;
1023             st = State.draining;
1024             break;
1025         case State.draining:
1026             drainLine;
1027             if (range.empty)
1028                 st = State.lastLine;
1029             break;
1030         case State.lastLine:
1031             if (lastLine)
1032                 st = State.lastBuf;
1033             break;
1034         case State.lastBuf:
1035             line = std.utf.byUTF!(const(char))(cast(const(char)[]) buf).array;
1036             st = State.empty;
1037             break;
1038         case State.empty:
1039             break;
1040         }
1041     }
1042 
1043     bool empty() @safe pure nothrow const @nogc {
1044         return st == State.empty;
1045     }
1046 }
1047 
1048 @("shall drain the process output by line")
1049 unittest {
1050     import std.algorithm : filter, joiner, map;
1051     import std.array : array;
1052 
1053     auto p = pipeProcess(["dd", "if=/dev/zero", "bs=10", "count=3"]).rcKill;
1054     auto res = p.process.drainByLineCopy.filter!"!a.empty".array;
1055 
1056     assert(res.length == 3);
1057     assert(res.joiner.count >= 30);
1058     assert(p.wait == 0);
1059     assert(p.terminated);
1060 }
1061 
1062 auto drainByLineCopy(T)(T p) {
1063     return DrainByLineCopyRange!T(p);
1064 }
1065 
1066 /// Drain the process output until it is done executing.
1067 auto drainToNull(T)(T p) {
1068     foreach (l; p.drain()) {
1069     }
1070     return p;
1071 }
1072 
1073 /// Drain the output from the process into an output range.
1074 auto drain(ProcessT, T)(ProcessT p, ref T range) {
1075     foreach (l; p.drain()) {
1076         range.put(l);
1077     }
1078     return p;
1079 }
1080 
1081 @("shall drain the output of a process while it is running with a separation of stdout and stderr")
1082 unittest {
1083     auto p = pipeProcess(["dd", "if=/dev/urandom", "bs=10", "count=3"]).rcKill;
1084     auto res = p.drain.array;
1085 
1086     // this is just a sanity check. It has to be kind a high because there is
1087     // some wiggleroom allowed
1088     assert(res.count <= 50);
1089 
1090     assert(res.filter!(a => a.type == DrainElement.Type.stdout)
1091             .map!(a => a.data)
1092             .joiner
1093             .count == 30);
1094     assert(res.filter!(a => a.type == DrainElement.Type.stderr).count == 0);
1095     assert(p.wait == 0);
1096     assert(p.terminated);
1097 }
1098 
1099 @("shall kill the process tree when the timeout is reached")
1100 unittest {
1101     immutable script = makeScript(`#!/bin/bash
1102 sleep 10m
1103 `);
1104     scope (exit)
1105         remove(script);
1106 
1107     auto p = pipeProcess([script]).sandbox.timeout(1.dur!"seconds").rcKill;
1108     waitUntilChildren(p.osHandle, 1);
1109     const preChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length;
1110     const res = p.process.drain.array;
1111     const postChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length;
1112 
1113     assert(p.wait == -9);
1114     assert(p.terminated);
1115     assert(preChildren == 1);
1116     assert(postChildren == 0);
1117 }
1118 
1119 string makeScript(string script, string file = __FILE__, uint line = __LINE__) {
1120     import core.sys.posix.sys.stat;
1121     import std.file : getAttributes, setAttributes, thisExePath;
1122     import std.stdio : File;
1123     import std.path : baseName;
1124     import std.conv : to;
1125 
1126     immutable fname = thisExePath ~ "_" ~ file.baseName ~ line.to!string ~ ".sh";
1127 
1128     File(fname, "w").writeln(script);
1129     setAttributes(fname, getAttributes(fname) | S_IXUSR | S_IXGRP | S_IXOTH);
1130     return fname;
1131 }
1132 
1133 /// Wait for p to have num children or fail after 10s.
1134 void waitUntilChildren(RawPid p, int num) {
1135     import std.datetime : Clock;
1136 
1137     const failAt = Clock.currTime + 10.dur!"seconds";
1138     do {
1139         Thread.sleep(50.dur!"msecs");
1140         if (Clock.currTime > failAt)
1141             break;
1142     }
1143     while (makePidMap.getSubMap(p).remove(p).length < num);
1144 }