1 /** 2 Copyright: Copyright (c) 2020, Joakim Brännström. All rights reserved. 3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0) 4 Author: Joakim Brännström (joakim.brannstrom@gmx.com) 5 */ 6 module proc; 7 8 import core.sys.posix.signal : SIGKILL; 9 import core.thread : Thread; 10 import core.time : dur, Duration; 11 import logger = std.experimental.logger; 12 import std.algorithm : filter, count, joiner, map; 13 import std.array : appender, empty, array; 14 import std.exception : collectException; 15 import std.stdio : File, fileno, writeln; 16 import std.typecons : Flag, Yes; 17 static import std.process; 18 static import std.stdio; 19 20 import my.gc.refc; 21 import my.from_; 22 23 public import proc.channel; 24 public import proc.pid; 25 public import proc.tty; 26 27 version (unittest) { 28 import std.file : remove; 29 } 30 31 /** Manage a process by reference counting so that it is terminated when the it 32 * stops being used such as the instance going out of scope. 33 */ 34 auto rcKill(T)(T p, int signal = SIGKILL) { 35 return refCounted(ScopeKill!T(p, signal)); 36 } 37 38 // backward compatibility. 39 alias scopeKill = rcKill; 40 41 struct ScopeKill(T) { 42 T process; 43 alias process this; 44 45 private int signal = SIGKILL; 46 private bool hasProcess; 47 48 this(T process, int signal) @safe { 49 this.process = process; 50 this.signal = signal; 51 this.hasProcess = true; 52 } 53 54 ~this() @safe { 55 if (hasProcess) 56 process.dispose(); 57 } 58 } 59 60 /// Async process wrapper for a std.process SpawnProcess 61 struct SpawnProcess { 62 import core.sys.posix.signal : SIGKILL; 63 import std.algorithm : among; 64 65 private { 66 enum State { 67 running, 68 terminated, 69 exitCode 70 } 71 72 std.process.Pid process; 73 RawPid pid; 74 int status_; 75 State st; 76 } 77 78 this(std.process.Pid process) @safe { 79 this.process = process; 80 this.pid = process.osHandle.RawPid; 81 } 82 83 ~this() @safe { 84 } 85 86 /// Returns: The raw OS handle for the process ID. 87 RawPid osHandle() nothrow @safe { 88 return pid; 89 } 90 91 /// Kill and cleanup the process. 92 void dispose() @safe { 93 final switch (st) { 94 case State.running: 95 this.kill; 96 this.wait; 97 break; 98 case State.terminated: 99 this.wait; 100 break; 101 case State.exitCode: 102 break; 103 } 104 105 st = State.exitCode; 106 } 107 108 /** Send `signal` to the process. 109 * 110 * Param: 111 * signal = a signal from `core.sys.posix.signal` 112 */ 113 void kill(int signal = SIGKILL) nothrow @trusted { 114 final switch (st) { 115 case State.running: 116 break; 117 case State.terminated: 118 goto case; 119 case State.exitCode: 120 return; 121 } 122 123 try { 124 std.process.kill(process, signal); 125 } catch (Exception e) { 126 } 127 128 st = State.terminated; 129 } 130 131 /// Blocking wait for the process to terminated. 132 /// Returns: the exit status. 133 int wait() @safe { 134 final switch (st) { 135 case State.running: 136 status_ = std.process.wait(process); 137 break; 138 case State.terminated: 139 status_ = std.process.wait(process); 140 break; 141 case State.exitCode: 142 break; 143 } 144 145 st = State.exitCode; 146 147 return status_; 148 } 149 150 /// Non-blocking wait for the process termination. 151 /// Returns: `true` if the process has terminated. 152 bool tryWait() @safe { 153 final switch (st) { 154 case State.running: 155 auto s = std.process.tryWait(process); 156 if (s.terminated) { 157 st = State.exitCode; 158 status_ = s.status; 159 } 160 break; 161 case State.terminated: 162 status_ = std.process.wait(process); 163 st = State.exitCode; 164 break; 165 case State.exitCode: 166 break; 167 } 168 169 return st.among(State.terminated, State.exitCode) != 0; 170 } 171 172 /// Returns: The exit status of the process. 173 int status() @safe { 174 if (st != State.exitCode) { 175 throw new Exception( 176 "Process has not terminated and wait/tryWait been called to collect the exit status"); 177 } 178 return status_; 179 } 180 181 /// Returns: If the process has terminated. 182 bool terminated() @safe { 183 return st.among(State.terminated, State.exitCode) != 0; 184 } 185 } 186 187 /// Async process that do not block on read from stdin/stderr. 188 struct PipeProcess { 189 import std.algorithm : among; 190 import core.sys.posix.signal : SIGKILL; 191 192 private { 193 enum State { 194 running, 195 terminated, 196 exitCode 197 } 198 199 std.process.ProcessPipes process; 200 std.process.Pid pid; 201 202 FileReadChannel stderr_; 203 FileReadChannel stdout_; 204 FileWriteChannel stdin_; 205 int status_; 206 State st; 207 } 208 209 this(std.process.Pid pid, File stdin, File stdout, File stderr) @safe { 210 this.pid = pid; 211 212 this.stdin_ = FileWriteChannel(stdin); 213 this.stdout_ = FileReadChannel(stdout); 214 this.stderr_ = FileReadChannel(stderr); 215 } 216 217 this(std.process.ProcessPipes process, std.process.Redirect r) @safe { 218 this.process = process; 219 this.pid = process.pid; 220 221 if (r & std.process.Redirect.stdin) { 222 stdin_ = FileWriteChannel(this.process.stdin); 223 } 224 if (r & std.process.Redirect.stdout) { 225 stdout_ = FileReadChannel(this.process.stdout); 226 } 227 if (r & std.process.Redirect.stderr) { 228 this.stderr_ = FileReadChannel(this.process.stderr); 229 } 230 } 231 232 /// Returns: The raw OS handle for the process ID. 233 RawPid osHandle() nothrow @safe { 234 return pid.osHandle.RawPid; 235 } 236 237 /// Access to stdout. 238 ref FileWriteChannel stdin() return scope nothrow @safe { 239 return stdin_; 240 } 241 242 /// Access to stdout. 243 ref FileReadChannel stdout() return scope nothrow @safe { 244 return stdout_; 245 } 246 247 /// Access stderr. 248 ref FileReadChannel stderr() return scope nothrow @safe { 249 return stderr_; 250 } 251 252 /// Kill and cleanup the process. 253 void dispose() @safe { 254 final switch (st) { 255 case State.running: 256 this.kill; 257 this.wait; 258 .destroy(process); 259 break; 260 case State.terminated: 261 this.wait; 262 .destroy(process); 263 break; 264 case State.exitCode: 265 break; 266 } 267 268 st = State.exitCode; 269 } 270 271 /** Send `signal` to the process. 272 * 273 * Param: 274 * signal = a signal from `core.sys.posix.signal` 275 */ 276 void kill(int signal = SIGKILL) nothrow @trusted { 277 final switch (st) { 278 case State.running: 279 break; 280 case State.terminated: 281 return; 282 case State.exitCode: 283 return; 284 } 285 286 try { 287 std.process.kill(pid, signal); 288 } catch (Exception e) { 289 } 290 291 st = State.terminated; 292 } 293 294 /// Blocking wait for the process to terminated. 295 /// Returns: the exit status. 296 int wait() @safe { 297 final switch (st) { 298 case State.running: 299 status_ = std.process.wait(pid); 300 break; 301 case State.terminated: 302 status_ = std.process.wait(pid); 303 break; 304 case State.exitCode: 305 break; 306 } 307 308 st = State.exitCode; 309 310 return status_; 311 } 312 313 /// Non-blocking wait for the process termination. 314 /// Returns: `true` if the process has terminated. 315 bool tryWait() @safe { 316 final switch (st) { 317 case State.running: 318 auto s = std.process.tryWait(pid); 319 if (s.terminated) { 320 st = State.exitCode; 321 status_ = s.status; 322 } 323 break; 324 case State.terminated: 325 status_ = std.process.wait(pid); 326 st = State.exitCode; 327 break; 328 case State.exitCode: 329 break; 330 } 331 332 return st.among(State.terminated, State.exitCode) != 0; 333 } 334 335 /// Returns: The exit status of the process. 336 int status() @safe { 337 if (st != State.exitCode) { 338 throw new Exception( 339 "Process has not terminated and wait/tryWait been called to collect the exit status"); 340 } 341 return status_; 342 } 343 344 /// Returns: If the process has terminated. 345 bool terminated() @safe { 346 return st.among(State.terminated, State.exitCode) != 0; 347 } 348 } 349 350 SpawnProcess spawnProcess(scope const(char[])[] args, File stdin = std.stdio.stdin, 351 File stdout = std.stdio.stdout, File stderr = std.stdio.stderr, 352 const string[string] env = null, std.process.Config config = std.process.Config.none, 353 scope const char[] workDir = null) { 354 return SpawnProcess(std.process.spawnProcess(args, stdin, stdout, stderr, 355 env, config, workDir)); 356 } 357 358 SpawnProcess spawnProcess(scope const(char[])[] args, const string[string] env, 359 std.process.Config config = std.process.Config.none, scope const(char)[] workDir = null) { 360 return SpawnProcess(std.process.spawnProcess(args, std.stdio.stdin, 361 std.stdio.stdout, std.stdio.stderr, env, config, workDir)); 362 } 363 364 SpawnProcess spawnProcess(scope const(char)[] program, 365 File stdin = std.stdio.stdin, File stdout = std.stdio.stdout, 366 File stderr = std.stdio.stderr, const string[string] env = null, 367 std.process.Config config = std.process.Config.none, scope const(char)[] workDir = null) { 368 return SpawnProcess(std.process.spawnProcess((&program)[0 .. 1], stdin, 369 stdout, stderr, env, config, workDir)); 370 } 371 372 SpawnProcess spawnShell(scope const(char)[] command, File stdin = std.stdio.stdin, 373 File stdout = std.stdio.stdout, File stderr = std.stdio.stderr, 374 scope const string[string] env = null, std.process.Config config = std.process.Config.none, 375 scope const(char)[] workDir = null, scope string shellPath = std.process.nativeShell) { 376 return SpawnProcess(std.process.spawnShell(command, stdin, stdout, stderr, 377 env, config, workDir, shellPath)); 378 } 379 380 /// ditto 381 SpawnProcess spawnShell(scope const(char)[] command, scope const string[string] env, 382 std.process.Config config = std.process.Config.none, 383 scope const(char)[] workDir = null, scope string shellPath = std.process.nativeShell) { 384 return SpawnProcess(std.process.spawnShell(command, env, config, workDir, shellPath)); 385 } 386 387 PipeProcess pipeProcess(scope const(char[])[] args, 388 std.process.Redirect redirect = std.process.Redirect.all, 389 const string[string] env = null, std.process.Config config = std.process.Config.none, 390 scope const(char)[] workDir = null) @safe { 391 return PipeProcess(std.process.pipeProcess(args, redirect, env, config, workDir), redirect); 392 } 393 394 PipeProcess pipeShell(scope const(char)[] command, 395 std.process.Redirect redirect = std.process.Redirect.all, 396 const string[string] env = null, std.process.Config config = std.process.Config.none, 397 scope const(char)[] workDir = null, string shellPath = std.process.nativeShell) @safe { 398 return PipeProcess(std.process.pipeShell(command, redirect, env, config, 399 workDir, shellPath), redirect); 400 } 401 402 /** Moves the process to a separate process group and on exit kill it and all 403 * its children. 404 */ 405 @safe struct Sandbox(ProcessT) { 406 import core.sys.posix.signal : SIGKILL; 407 408 private { 409 ProcessT p; 410 RawPid pid; 411 } 412 413 this(ProcessT p) @safe { 414 import core.sys.posix.unistd : setpgid; 415 416 this.p = p; 417 this.pid = p.osHandle; 418 setpgid(pid, 0); 419 } 420 421 RawPid osHandle() nothrow @safe { 422 return pid; 423 } 424 425 static if (__traits(hasMember, ProcessT, "stdin")) { 426 ref FileWriteChannel stdin() nothrow @safe { 427 return p.stdin; 428 } 429 } 430 431 static if (__traits(hasMember, ProcessT, "stdout")) { 432 ref FileReadChannel stdout() nothrow @safe { 433 return p.stdout; 434 } 435 } 436 437 static if (__traits(hasMember, ProcessT, "stderr")) { 438 ref FileReadChannel stderr() nothrow @safe { 439 return p.stderr; 440 } 441 } 442 443 void dispose() @safe { 444 // this also reaps the children thus cleaning up zombies 445 this.kill; 446 p.dispose; 447 } 448 449 /** Send `signal` to the process. 450 * 451 * Param: 452 * signal = a signal from `core.sys.posix.signal` 453 */ 454 void kill(int signal = SIGKILL) nothrow @safe { 455 // must first retrieve the submap because after the process is killed 456 // its children may have changed. 457 auto pmap = makePidMap.getSubMap(pid); 458 459 p.kill(signal); 460 461 // only kill and reap the children 462 pmap.remove(pid); 463 proc.pid.kill(pmap, Yes.onlyCurrentUser, signal).reap; 464 } 465 466 int wait() @safe { 467 return p.wait; 468 } 469 470 bool tryWait() @safe { 471 return p.tryWait; 472 } 473 474 int status() @safe { 475 return p.status; 476 } 477 478 bool terminated() @safe { 479 return p.terminated; 480 } 481 } 482 483 auto sandbox(T)(T p) @safe { 484 return Sandbox!T(p); 485 } 486 487 @("shall terminate a group of processes") 488 unittest { 489 import std.datetime.stopwatch : StopWatch, AutoStart; 490 491 immutable scriptName = makeScript(`#!/bin/bash 492 sleep 10m & 493 sleep 10m & 494 sleep 10m 495 `); 496 scope (exit) 497 remove(scriptName); 498 499 auto p = pipeProcess([scriptName]).sandbox.rcKill; 500 waitUntilChildren(p.osHandle, 3); 501 const preChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length; 502 p.kill; 503 Thread.sleep(500.dur!"msecs"); // wait for the OS to kill the children 504 const postChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length; 505 506 assert(p.wait == -9); 507 assert(p.terminated); 508 assert(preChildren == 3); 509 assert(postChildren == 0); 510 } 511 512 /** dispose the process after the timeout. 513 */ 514 @safe struct Timeout(ProcessT) { 515 import core.sys.posix.signal : SIGKILL; 516 import core.thread; 517 import std.algorithm : among; 518 import std.datetime : Clock, Duration; 519 520 private { 521 enum Msg { 522 none, 523 stop, 524 status, 525 } 526 527 enum Reply { 528 none, 529 running, 530 normalDeath, 531 killedByTimeout, 532 } 533 534 static struct Payload { 535 ProcessT p; 536 RawPid pid; 537 Background background; 538 Reply backgroundReply; 539 } 540 541 RefCounted!Payload rc; 542 } 543 544 this(ProcessT p, Duration timeout) @trusted { 545 import std.algorithm : move; 546 547 auto pid = p.osHandle; 548 rc = refCounted(Payload(move(p), pid)); 549 rc.background = new Background(&rc.p, timeout); 550 rc.background.isDaemon = true; 551 rc.background.start; 552 } 553 554 ~this() @trusted { 555 rc.release; 556 } 557 558 private static class Background : Thread { 559 import core.sync.condition : Condition; 560 import core.sync.mutex : Mutex; 561 562 Duration timeout; 563 ProcessT* p; 564 Mutex mtx; 565 Msg[] msg; 566 Reply reply_; 567 RawPid pid; 568 int signal = SIGKILL; 569 570 this(ProcessT* p, Duration timeout) { 571 this.p = p; 572 this.timeout = timeout; 573 this.mtx = new Mutex(); 574 this.pid = p.osHandle; 575 576 super(&run); 577 } 578 579 void run() { 580 checkProcess(this.pid, this.timeout, this); 581 } 582 583 void put(Msg msg) @trusted nothrow { 584 this.mtx.lock_nothrow(); 585 scope (exit) 586 this.mtx.unlock_nothrow(); 587 this.msg ~= msg; 588 } 589 590 Msg popMsg() @trusted nothrow { 591 this.mtx.lock_nothrow(); 592 scope (exit) 593 this.mtx.unlock_nothrow(); 594 if (msg.empty) 595 return Msg.none; 596 auto rval = msg[$ - 1]; 597 msg = msg[0 .. $ - 1]; 598 return rval; 599 } 600 601 void setReply(Reply reply_) @trusted nothrow { 602 this.mtx.lock_nothrow(); 603 scope (exit) 604 this.mtx.unlock_nothrow(); 605 this.reply_ = reply_; 606 } 607 608 Reply reply() @trusted nothrow { 609 this.mtx.lock_nothrow(); 610 scope (exit) 611 this.mtx.unlock_nothrow(); 612 return reply_; 613 } 614 615 void setSignal(int signal) @trusted nothrow { 616 this.mtx.lock_nothrow(); 617 scope (exit) 618 this.mtx.unlock_nothrow(); 619 this.signal = signal; 620 } 621 622 void kill() @trusted nothrow { 623 this.mtx.lock_nothrow(); 624 scope (exit) 625 this.mtx.unlock_nothrow(); 626 p.kill(signal); 627 } 628 } 629 630 private static void checkProcess(RawPid p, Duration timeout, Background bg) nothrow { 631 import std.algorithm : max, min; 632 import std.variant : Variant; 633 static import core.sys.posix.signal; 634 635 const stopAt = Clock.currTime + timeout; 636 // the purpose is to poll the process often "enough" that if it 637 // terminates early `Process` detects it fast enough. 1000 is chosen 638 // because it "feels good". the purpose 639 auto sleepInterval = min(500, max(20, timeout.total!"msecs" / 1000)).dur!"msecs"; 640 641 bool forceStop; 642 bool running = true; 643 while (running && Clock.currTime < stopAt) { 644 const msg = bg.popMsg; 645 646 final switch (msg) { 647 case Msg.none: 648 () @trusted { Thread.sleep(sleepInterval); }(); 649 break; 650 case Msg.stop: 651 forceStop = true; 652 running = false; 653 break; 654 case Msg.status: 655 bg.setReply(Reply.running); 656 break; 657 } 658 659 () @trusted { 660 if (core.sys.posix.signal.kill(p, 0) == -1) { 661 running = false; 662 } 663 }(); 664 } 665 666 // may be children alive thus must ensure that the whole process tree 667 // is killed if this is a sandbox with a timeout. 668 bg.kill(); 669 670 if (!forceStop && Clock.currTime >= stopAt) { 671 bg.setReply(Reply.killedByTimeout); 672 } else { 673 bg.setReply(Reply.normalDeath); 674 } 675 } 676 677 RawPid osHandle() nothrow @trusted { 678 return rc.pid; 679 } 680 681 static if (__traits(hasMember, ProcessT, "stdin")) { 682 ref FileWriteChannel stdin() nothrow @safe { 683 return rc.p.stdin; 684 } 685 } 686 687 static if (__traits(hasMember, ProcessT, "stdout")) { 688 ref FileReadChannel stdout() nothrow @safe { 689 return rc.p.stdout; 690 } 691 } 692 693 static if (__traits(hasMember, ProcessT, "stderr")) { 694 ref FileReadChannel stderr() nothrow @trusted { 695 return rc.p.stderr; 696 } 697 } 698 699 void dispose() @trusted { 700 if (rc.backgroundReply.among(Reply.none, Reply.running)) { 701 rc.background.put(Msg.stop); 702 rc.background.join; 703 rc.backgroundReply = rc.background.reply; 704 } 705 rc.p.dispose; 706 } 707 708 /** Send `signal` to the process. 709 * 710 * Param: 711 * signal = a signal from `core.sys.posix.signal` 712 */ 713 void kill(int signal = SIGKILL) nothrow @trusted { 714 rc.background.setSignal(signal); 715 rc.background.kill(); 716 } 717 718 int wait() @trusted { 719 while (!this.tryWait) { 720 Thread.sleep(20.dur!"msecs"); 721 } 722 return rc.p.wait; 723 } 724 725 bool tryWait() @trusted { 726 return rc.p.tryWait; 727 } 728 729 int status() @trusted { 730 return rc.p.status; 731 } 732 733 bool terminated() @trusted { 734 return rc.p.terminated; 735 } 736 737 bool timeoutTriggered() @trusted { 738 if (rc.backgroundReply.among(Reply.none, Reply.running)) { 739 rc.background.put(Msg.status); 740 rc.backgroundReply = rc.background.reply; 741 } 742 return rc.backgroundReply == Reply.killedByTimeout; 743 } 744 } 745 746 auto timeout(T)(T p, Duration timeout_) @trusted { 747 return Timeout!T(p, timeout_); 748 } 749 750 /// Returns when the process has pending data. 751 void waitForPendingData(ProcessT)(Process p) { 752 while (!p.pipe.hasPendingData || !p.stderr.hasPendingData) { 753 Thread.sleep(20.dur!"msecs"); 754 } 755 } 756 757 @("shall kill the process after the timeout") 758 unittest { 759 import std.datetime.stopwatch : StopWatch, AutoStart; 760 761 auto p = pipeProcess(["sleep", "1m"]).timeout(100.dur!"msecs").rcKill; 762 auto sw = StopWatch(AutoStart.yes); 763 p.wait; 764 sw.stop; 765 766 assert(sw.peek >= 100.dur!"msecs"); 767 assert(sw.peek <= 500.dur!"msecs"); 768 assert(p.wait == -9); 769 assert(p.terminated); 770 assert(p.status == -9); 771 assert(p.timeoutTriggered); 772 } 773 774 struct DrainElement { 775 enum Type { 776 stdout, 777 stderr, 778 } 779 780 Type type; 781 const(ubyte)[] data; 782 783 /// Returns: iterates the data as an input range. 784 auto byUTF8() @safe pure nothrow const @nogc { 785 static import std.utf; 786 787 return std.utf.byUTF!(const(char))(cast(const(char)[]) data); 788 } 789 790 bool empty() @safe pure nothrow const @nogc { 791 return data.length == 0; 792 } 793 } 794 795 /** A range that drains a process stdout/stderr until it terminates. 796 * 797 * There may be `DrainElement` that are empty. 798 */ 799 struct DrainRange(ProcessT) { 800 private { 801 enum State { 802 start, 803 draining, 804 lastStdout, 805 lastStderr, 806 lastElement, 807 empty, 808 } 809 810 ProcessT p; 811 DrainElement front_; 812 State st; 813 ubyte[] buf; 814 } 815 816 this(ProcessT p) { 817 this.p = p; 818 this.buf = new ubyte[4096]; 819 } 820 821 DrainElement front() @safe pure nothrow const @nogc { 822 assert(!empty, "Can't get front of an empty range"); 823 return front_; 824 } 825 826 void popFront() @safe { 827 assert(!empty, "Can't pop front of an empty range"); 828 829 static bool isAnyPipeOpen(ref ProcessT p) { 830 return p.stdout.isOpen || p.stderr.isOpen; 831 } 832 833 DrainElement readData(ref ProcessT p) @safe { 834 if (p.stderr.hasPendingData) { 835 return DrainElement(DrainElement.Type.stderr, p.stderr.read(buf)); 836 } else if (p.stdout.hasPendingData) { 837 return DrainElement(DrainElement.Type.stdout, p.stdout.read(buf)); 838 } 839 return DrainElement.init; 840 } 841 842 DrainElement waitUntilData() @safe { 843 import std.datetime : Clock; 844 845 // may livelock if the process never terminates and never writes to 846 // the terminal. timeout ensure that it sooner or later is break 847 // the loop. This is important if the drain is part of a timeout 848 // wrapping. 849 850 const timeout = 100.dur!"msecs"; 851 const stopAt = Clock.currTime + timeout; 852 const sleepFor = timeout / 20; 853 const useSleep = Clock.currTime + sleepFor; 854 bool running = true; 855 while (running) { 856 const now = Clock.currTime; 857 858 running = (now < stopAt) && isAnyPipeOpen(p); 859 860 auto bufRead = readData(p); 861 862 if (!bufRead.empty) { 863 return DrainElement(bufRead.type, bufRead.data.dup); 864 } else if (running && now > useSleep && bufRead.empty) { 865 import core.thread : Thread; 866 867 () @trusted { Thread.sleep(sleepFor); }(); 868 } 869 } 870 871 return DrainElement.init; 872 } 873 874 front_ = DrainElement.init; 875 876 final switch (st) { 877 case State.start: 878 st = State.draining; 879 front_ = waitUntilData; 880 break; 881 case State.draining: 882 if (p.terminated) { 883 st = State.lastStdout; 884 } else if (isAnyPipeOpen(p)) { 885 front_ = waitUntilData(); 886 } else { 887 st = State.lastStdout; 888 } 889 break; 890 case State.lastStdout: 891 if (p.stdout.hasPendingData) { 892 front_ = DrainElement(DrainElement.Type.stdout, p.stdout.read(buf).dup); 893 } else { 894 st = State.lastStderr; 895 } 896 break; 897 case State.lastStderr: 898 if (p.stderr.hasPendingData) { 899 front_ = DrainElement(DrainElement.Type.stderr, p.stderr.read(buf).dup); 900 } else { 901 st = State.lastElement; 902 } 903 break; 904 case State.lastElement: 905 st = State.empty; 906 break; 907 case State.empty: 908 break; 909 } 910 } 911 912 bool empty() @safe pure nothrow const @nogc { 913 return st == State.empty; 914 } 915 } 916 917 /// Drain a process pipe until empty. 918 auto drain(T)(T p) { 919 return DrainRange!T(p); 920 } 921 922 /// Read the data from a ReadChannel by line. 923 struct DrainByLineCopyRange(ProcessT) { 924 private { 925 enum State { 926 start, 927 draining, 928 lastLine, 929 lastBuf, 930 empty, 931 } 932 933 ProcessT process; 934 DrainRange!ProcessT range; 935 State st; 936 const(ubyte)[] buf; 937 const(char)[] line; 938 } 939 940 this(ProcessT p) { 941 process = p; 942 range = p.drain; 943 } 944 945 string front() @trusted pure nothrow const @nogc { 946 import std.exception : assumeUnique; 947 948 assert(!empty, "Can't get front of an empty range"); 949 return line.assumeUnique; 950 } 951 952 void popFront() @safe { 953 assert(!empty, "Can't pop front of an empty range"); 954 import std.algorithm : countUntil; 955 import std.array : array; 956 static import std.utf; 957 958 const(ubyte)[] updateBuf(size_t idx) { 959 const(ubyte)[] tmp; 960 if (buf.empty) { 961 // do nothing 962 } else if (idx == -1) { 963 tmp = buf; 964 buf = null; 965 } else { 966 idx = () { 967 if (idx < buf.length) { 968 return idx + 1; 969 } 970 return idx; 971 }(); 972 tmp = buf[0 .. idx]; 973 buf = buf[idx .. $]; 974 } 975 976 if (!tmp.empty && tmp[$ - 1] == '\n') { 977 tmp = tmp[0 .. $ - 1]; 978 } 979 return tmp; 980 } 981 982 void drainLine() { 983 void fillBuf() { 984 if (!range.empty) { 985 range.popFront; 986 } 987 if (!range.empty) { 988 buf ~= range.front.data; 989 } 990 } 991 992 size_t idx; 993 () { 994 int cnt; 995 do { 996 fillBuf(); 997 idx = buf.countUntil('\n'); 998 // 2 is a magic number which mean that it at most wait 2x timeout for data 999 } 1000 while (!range.empty && idx == -1 && cnt++ < 2); 1001 }(); 1002 1003 if (idx != -1) { 1004 auto tmp = updateBuf(idx); 1005 line = std.utf.byUTF!(const(char))(cast(const(char)[]) tmp).array; 1006 } 1007 } 1008 1009 bool lastLine() { 1010 size_t idx = buf.countUntil('\n'); 1011 if (idx == -1) 1012 return true; 1013 1014 auto tmp = updateBuf(idx); 1015 line = std.utf.byUTF!(const(char))(cast(const(char)[]) tmp).array; 1016 return false; 1017 } 1018 1019 line = null; 1020 final switch (st) { 1021 case State.start: 1022 drainLine; 1023 st = State.draining; 1024 break; 1025 case State.draining: 1026 drainLine; 1027 if (range.empty) 1028 st = State.lastLine; 1029 break; 1030 case State.lastLine: 1031 if (lastLine) 1032 st = State.lastBuf; 1033 break; 1034 case State.lastBuf: 1035 line = std.utf.byUTF!(const(char))(cast(const(char)[]) buf).array; 1036 st = State.empty; 1037 break; 1038 case State.empty: 1039 break; 1040 } 1041 } 1042 1043 bool empty() @safe pure nothrow const @nogc { 1044 return st == State.empty; 1045 } 1046 } 1047 1048 @("shall drain the process output by line") 1049 unittest { 1050 import std.algorithm : filter, joiner, map; 1051 import std.array : array; 1052 1053 auto p = pipeProcess(["dd", "if=/dev/zero", "bs=10", "count=3"]).rcKill; 1054 auto res = p.process.drainByLineCopy.filter!"!a.empty".array; 1055 1056 assert(res.length == 3); 1057 assert(res.joiner.count >= 30); 1058 assert(p.wait == 0); 1059 assert(p.terminated); 1060 } 1061 1062 auto drainByLineCopy(T)(T p) { 1063 return DrainByLineCopyRange!T(p); 1064 } 1065 1066 /// Drain the process output until it is done executing. 1067 auto drainToNull(T)(T p) { 1068 foreach (l; p.drain()) { 1069 } 1070 return p; 1071 } 1072 1073 /// Drain the output from the process into an output range. 1074 auto drain(ProcessT, T)(ProcessT p, ref T range) { 1075 foreach (l; p.drain()) { 1076 range.put(l); 1077 } 1078 return p; 1079 } 1080 1081 @("shall drain the output of a process while it is running with a separation of stdout and stderr") 1082 unittest { 1083 auto p = pipeProcess(["dd", "if=/dev/urandom", "bs=10", "count=3"]).rcKill; 1084 auto res = p.drain.array; 1085 1086 // this is just a sanity check. It has to be kind a high because there is 1087 // some wiggleroom allowed 1088 assert(res.count <= 50); 1089 1090 assert(res.filter!(a => a.type == DrainElement.Type.stdout) 1091 .map!(a => a.data) 1092 .joiner 1093 .count == 30); 1094 assert(res.filter!(a => a.type == DrainElement.Type.stderr).count == 0); 1095 assert(p.wait == 0); 1096 assert(p.terminated); 1097 } 1098 1099 @("shall kill the process tree when the timeout is reached") 1100 unittest { 1101 immutable script = makeScript(`#!/bin/bash 1102 sleep 10m 1103 `); 1104 scope (exit) 1105 remove(script); 1106 1107 auto p = pipeProcess([script]).sandbox.timeout(1.dur!"seconds").rcKill; 1108 waitUntilChildren(p.osHandle, 1); 1109 const preChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length; 1110 const res = p.process.drain.array; 1111 const postChildren = makePidMap.getSubMap(p.osHandle).remove(p.osHandle).length; 1112 1113 assert(p.wait == -9); 1114 assert(p.terminated); 1115 assert(preChildren == 1); 1116 assert(postChildren == 0); 1117 } 1118 1119 string makeScript(string script, string file = __FILE__, uint line = __LINE__) { 1120 import core.sys.posix.sys.stat; 1121 import std.file : getAttributes, setAttributes, thisExePath; 1122 import std.stdio : File; 1123 import std.path : baseName; 1124 import std.conv : to; 1125 1126 immutable fname = thisExePath ~ "_" ~ file.baseName ~ line.to!string ~ ".sh"; 1127 1128 File(fname, "w").writeln(script); 1129 setAttributes(fname, getAttributes(fname) | S_IXUSR | S_IXGRP | S_IXOTH); 1130 return fname; 1131 } 1132 1133 /// Wait for p to have num children or fail after 10s. 1134 void waitUntilChildren(RawPid p, int num) { 1135 import std.datetime : Clock; 1136 1137 const failAt = Clock.currTime + 10.dur!"seconds"; 1138 do { 1139 Thread.sleep(50.dur!"msecs"); 1140 if (Clock.currTime > failAt) 1141 break; 1142 } 1143 while (makePidMap.getSubMap(p).remove(p).length < num); 1144 }