/**
Copyright: Copyright (c) 2018, Joakim Brännström. All rights reserved.
License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
Author: Joakim Brännström (joakim.brannstrom@gmx.com)

Methods prefixed with `cli_` are strongly related to user commands.
They more or less fully implement a command line interface command.
*/
module distssh.main_;

import core.time : Duration;
import std.algorithm : splitter, map, filter, joiner;
import std.exception : collectException;
import std.stdio : File;
import std.typecons : Nullable, NullableRef;
import logger = std.experimental.logger;

import distssh.from;

static import std.getopt;

/// Environment variable holding the `;` separated list of remote hosts.
immutable globalEnvHostKey = "DISTSSH_HOSTS";
/// Environment variable naming the file to import the environment from.
immutable globalEnvFileKey = "DISTSSH_IMPORT_ENV";
/// Environment variable holding the `;` separated list of variables to exclude on export.
immutable globalEnvFilterKey = "DISTSSH_ENV_EXPORT_FILTER";
/// Name of the symlink that selects the interactive shell mode.
immutable distShell = "distshell";
/// Name of the symlink that selects the remote command mode.
immutable distCmd = "distcmd";
/// Default file name used for the exported environment.
immutable distsshEnvExport = "distssh_env.export";
/// Default timeout, in seconds.
immutable ulong defaultTimeout_s = 2;

/** Set up logging, parse the command line and dispatch to the selected mode.
 *
 * Params:
 *  args = the raw command line including the program name.
 *
 * Returns: the exit status of the program. 1 if the arguments could not be
 * parsed, 0 after printing the help, otherwise whatever `appMain` returns.
 */
int rmain(string[] args) nothrow {
    try {
        import distssh.app_logger;

        auto simple_logger = new SimpleLogger();
        logger.sharedLog(simple_logger);
    }
    catch (Exception e) {
        logger.warning(e.msg).collectException;
    }

    Options opts;
    try {
        opts = parseUserArgs(args);
    }
    catch (Exception e) {
        logger.error(e.msg).collectException;
        return 1;
    }

    try {
        if (opts.verbose) {
            logger.globalLogLevel(logger.LogLevel.all);
        } else {
            logger.globalLogLevel(logger.LogLevel.warning);
        }
    }
    catch (Exception e) {
        logger.warning(e.msg).collectException;
    }

    if (opts.help) {
        printHelp(opts);
        return 0;
    }

    return appMain(opts);
}

/** Run the `cli_` function that implements `opts.mode`.
 *
 * Returns: the exit status of the executed command.
 */
int appMain(const Options opts) nothrow {
    final switch (opts.mode) with (Options.Mode) {
    case exportEnv:
        return cli_exportEnv(opts);
    case install:
        import std.file : symlink;

        return cli_install(opts, (string src, string dst) => symlink(src, dst));
    case shell:
        return cli_shell(opts);
    case localShell:
        return cli_localShell(opts);
    case cmd:
        return cli_cmd(opts);
    case importEnvCmd:
        return cli_cmdWithImportedEnv(opts);
    case measureHosts:
        return cli_measureHosts(opts);
    case localLoad:
        import std.stdio : writeln;

        return cli_localLoad((string s) => writeln(s));
    case runOnAll:
        return cli_runOnAll(opts);
    }
}

private:

/** Export the environment to a file for later import.
 *
 * The (filtered) clone of the current environment is written to
 * `opts.importEnv`.
 *
 * Returns: 0 on success, 1 if writing the file failed.
 */
int cli_exportEnv(const Options opts) nothrow {
    try {
        writeEnv(opts.importEnv, cloneEnv);
        logger.info("Exported environment to ", opts.importEnv);
    }
    catch (Exception e) {
        logger.error(e.msg).collectException;
        return 1;
    }

    return 0;
}

unittest {
    import std.conv : to;
    import std.string : toStringz;
    import std.file;

    immutable remove_me = "remove_me.export";
    scope (exit)
        remove(remove_me);

    auto opts = parseUserArgs(["distssh", "--export-env", "--export-env-file", remove_me]);

    // make sure there are at least one environment variable
    import core.sys.posix.stdlib : putenv, unsetenv;

    const env_key = "DISTSSH_ENV_TEST";
    const env_var = (env_key ~ "=" ~ remove_me).toStringz;
    putenv(cast(char*) env_var);
    scope (exit)
        unsetenv(cast(char*) env_key.ptr);

    // shall export the environment to the file
    void verify1() {
        // test normal export
        cli_exportEnv(opts);

        auto env = readEnv(remove_me);
        assert(!env.filter!(a => a.key == env_key).empty, env.to!string);
    }

    verify1;

    // shall filter out specified env before exporting to the file
    const env_filter_var = (globalEnvFilterKey ~ "=DISTSSH_ENV_TEST;junk ").toStringz;
    putenv(cast(char*) env_filter_var);
    scope (exit)
        unsetenv(cast(char*) globalEnvFilterKey.ptr);

    void verify2() {
        cli_exportEnv(opts);

        auto env = readEnv(remove_me);
        assert(env.filter!(a => a.key == env_key).empty, env.to!string);
    }

    verify2;
}

/** Install distssh by creating the `distshell` and `distcmd` symlinks.
 *
 * Both links are created in `opts.selfDir` and point at `opts.selfBinary`.
 *
 * Params:
 *  opts = the parsed command line.
 *  symlink = the function used to create a link. Injected to allow testing.
 *
 * Returns: 0 on success, 1 if `symlink` threw.
 */
int cli_install(const Options opts, void delegate(string src, string dst) symlink) nothrow {
    import std.path : buildPath;

    try {
        symlink(opts.selfBinary, buildPath(opts.selfDir, distShell));
        symlink(opts.selfBinary, buildPath(opts.selfDir, distCmd));
        return 0;
    }
    catch (Exception e) {
        logger.error(e.msg).collectException;
        return 1;
    }
}

@("shall create symlinks to self")
unittest {
    string[2][] symlinks;
    void fakeSymlink(string src, string dst) {
        string[2] v = [src, dst];
        symlinks ~= v;
    }

    Options opts;
    opts.selfBinary = "/foo/src";
    opts.selfDir = "/bar";

    cli_install(opts, &fakeSymlink);

    assert(symlinks[0] == ["/foo/src", "/bar/distshell"]);
    assert(symlinks[1] == ["/foo/src", "/bar/distcmd"]);
}

/** Open an interactive shell on one of the least loaded remote hosts.
 *
 * Hosts are tried one at a time. A connection that exits with a non-zero
 * status before twice the configured timeout has elapsed is treated as a
 * failed connection attempt and the next host is tried.
 *
 * Returns: the exit status of the ssh session, or 1 if no host could be used.
 */
int cli_shell(const Options opts) nothrow {
    import std.datetime.stopwatch : StopWatch, AutoStart;
    import std.file : thisExePath, getcwd;
    import std.process : spawnProcess, wait;
    import std.stdio : writeln, writefln;

    auto hosts = RemoteHostCache.make(opts.timeout);
    hosts.sortByLoad;

    if (hosts.empty) {
        // the loop below is skipped and the function returns 1
        logger.errorf("No remote host online").collectException;
    }

    const timeout_until_considered_successful_connection = opts.timeout * 2;

    while (!hosts.empty) {
        auto host = hosts.randomAndPop;

        try {
            if (host.isNull) {
                logger.error("No remote host online");
                return 1;
            }

            writeln("Connecting to ", host);

            auto sw = StopWatch(AutoStart.yes);

            // two -t forces a tty to be created and used which mean that the remote shell will *think* it is an interactive shell
            auto exit_status = spawnProcess(["ssh", "-q", "-t", "-t", "-oStrictHostKeyChecking=no",
                    host, thisExePath, "--local-shell", "--workdir", getcwd]).wait;

            // #SPC-fallback_remote_host
            if (exit_status == 0 || sw.peek > timeout_until_considered_successful_connection) {
                writefln("Connection to %s closed.", host);
                return exit_status;
            } else {
                logger.warningf("Connection failed to %s. Falling back on next available host",
                        host);
            }
        }
        catch (Exception e) {
            logger.error(e.msg).collectException;
        }
    }

    return 1;
}

// #SPC-shell_current_dir
/** Run the user's shell on this host, in `opts.workDir` when that directory exists.
 *
 * Returns: the exit status of the shell, or 1 if it could not be started.
 */
int cli_localShell(const Options opts) nothrow {
    import std.file : exists;
    import std.process : spawnProcess, wait, userShell, Config, Pid;

    try {
        Pid pid;
        if (exists(opts.workDir))
            pid = spawnProcess([userShell], null, Config.none, opts.workDir);
        else
            pid = spawnProcess([userShell]);

        return pid.wait;
    }
    catch (Exception e) {
        logger.error(e.msg).collectException;
        return 1;
    }
}

/** Execute `opts.command` on one of the least loaded remote hosts.
 *
 * Returns: the exit status from `executeOnHost`, or 1 if no host is available.
 */
int cli_cmd(const Options opts) nothrow {
    auto hosts = RemoteHostCache.make(opts.timeout);
    hosts.sortByLoad;

    if (hosts.empty) {
        logger.errorf("No remote host online").collectException;
        return 1;
    }

    auto host = hosts.randomAndPop;
    if (host.isNull)
        return 1;

    return executeOnHost(opts, host);
}

/** Execute a command on a remote host.
279 * 280 * #SPC-automatic_env_import 281 */ 282 int executeOnHost(const Options opts, Host host) nothrow { 283 import distssh.protocol : ProtocolEnv; 284 import core.thread : Thread; 285 import core.time : dur, MonoTime; 286 import std.file : thisExePath; 287 import std.path : absolutePath; 288 import std.process : tryWait, Redirect, pipeProcess; 289 290 // #SPC-draft_remote_cmd_spec 291 try { 292 import std.file : getcwd; 293 294 auto args = ["ssh", "-oStrictHostKeyChecking=no", host, thisExePath, 295 "--local-run", "--workdir", getcwd, "--stdin-msgpack-env", "--"] ~ opts.command; 296 297 logger.info("Connecting to: ", host); 298 logger.info("run: ", args.joiner(" ")); 299 300 auto p = pipeProcess(args, Redirect.stdin); 301 302 auto pwriter = PipeWriter(p.stdin); 303 304 ProtocolEnv env; 305 if (opts.cloneEnv) 306 env = cloneEnv; 307 else if (!opts.noImportEnv) 308 env = readEnv(opts.importEnv.absolutePath); 309 pwriter.pack(env); 310 311 while (true) { 312 try { 313 auto st = p.pid.tryWait; 314 if (st.terminated) 315 return st.status; 316 317 Watchdog.ping(pwriter); 318 } 319 catch (Exception e) { 320 } 321 322 Thread.sleep(50.dur!"msecs"); 323 } 324 } 325 catch (Exception e) { 326 logger.error(e.msg).collectException; 327 return 1; 328 } 329 } 330 331 // #SPC-fast_env_startup 332 int cli_cmdWithImportedEnv(const Options opts) nothrow { 333 import core.thread : Thread; 334 import core.time : dur; 335 import std.stdio : File, stdin; 336 import std.file : exists; 337 import std.process : spawnProcess, Config, spawnShell, Pid, tryWait, 338 thisProcessID; 339 import std.utf : toUTF8; 340 341 if (opts.command.length == 0) 342 return 0; 343 344 static auto updateEnv(const Options opts, ref PipeReader pread, ref string[string] out_env) { 345 import distssh.protocol : ProtocolEnv; 346 347 ProtocolEnv env; 348 349 if (opts.stdinMsgPackEnv) { 350 while (true) { 351 pread.update; 352 353 try { 354 auto tmp = pread.unpack!(ProtocolEnv); 355 if (!tmp.isNull) { 356 env = tmp; 
357 break; 358 } 359 } 360 catch (Exception e) { 361 } 362 363 Thread.sleep(10.dur!"msecs"); 364 } 365 } else { 366 env = readEnv(opts.importEnv); 367 } 368 369 foreach (kv; env) { 370 out_env[kv.key] = kv.value; 371 } 372 } 373 374 try { 375 string[string] env; 376 auto pread = PipeReader(stdin.fileno); 377 378 try { 379 updateEnv(opts, pread, env); 380 } 381 catch (Exception e) { 382 logger.trace(e.msg).collectException; 383 } 384 385 Pid res; 386 387 if (exists(opts.command[0])) { 388 res = spawnProcess(opts.command, env, Config.none, opts.workDir); 389 } else { 390 res = spawnShell(opts.command.dup.joiner(" ").toUTF8, env, Config.none, opts.workDir); 391 } 392 393 import core.time : MonoTime, dur; 394 import core.sys.posix.unistd : getppid; 395 396 const parent_pid = getppid; 397 const check_parent_interval = 500.dur!"msecs"; 398 const timeout = opts.timeout * 2; 399 400 int exit_status = 1; 401 bool sigint_cleanup; 402 403 auto check_parent = MonoTime.currTime + check_parent_interval; 404 405 auto wd = Watchdog(pread, timeout); 406 407 while (!wd.isTimeout) { 408 pread.update; 409 410 try { 411 auto status = tryWait(res); 412 if (status.terminated) { 413 exit_status = status.status; 414 break; 415 } 416 wd.update; 417 } 418 catch (Exception e) { 419 } 420 421 // #SPC-sigint_detection 422 if (MonoTime.currTime > check_parent) { 423 check_parent = MonoTime.currTime + check_parent_interval; 424 if (getppid != parent_pid) { 425 sigint_cleanup = true; 426 break; 427 } 428 } 429 430 Thread.sleep(50.dur!"msecs"); 431 } 432 433 import core.sys.posix.signal : kill, killpg, SIGKILL; 434 import core.sys.posix.unistd : getpid; 435 436 // #SPC-early_terminate_no_processes_left 437 if (wd.isTimeout) { 438 // cleanup all subprocesses of sshd. Should catch those that fork and create another process group. 
439 cleanupProcess(.Pid(parent_pid), (int a) => kill(a, SIGKILL), 440 (int a) => killpg(a, SIGKILL)); 441 } 442 443 if (sigint_cleanup || wd.isTimeout) { 444 // sshd has already died on a SIGINT on the host side thus it is only possible to reliabley cleanup *this* process tree if anything is left. 445 cleanupProcess(.Pid(getpid), (int a) => kill(a, SIGKILL), (int a) => killpg(a, SIGKILL)).killpg( 446 SIGKILL); 447 } 448 449 return exit_status; 450 } 451 catch (Exception e) { 452 logger.error(e.msg).collectException; 453 return 1; 454 } 455 } 456 457 // #SPC-measure_remote_hosts 458 int cli_measureHosts(const Options opts) nothrow { 459 import std.conv : to; 460 import std.stdio : writefln, writeln; 461 import distssh.table; 462 463 auto hosts = RemoteHostCache.make(opts.timeout); 464 hosts.sortByLoad; 465 466 writefln("Configured hosts (%s='%s')", globalEnvHostKey, hosts.remoteHosts.joiner(";")) 467 .collectException; 468 469 string[3] row = ["Host", "Access Time", "Load"]; 470 auto tbl = Table!3(row); 471 472 foreach (a; hosts.remoteByLoad) { 473 try { 474 row[0] = a[0]; 475 row[1] = a[1].accessTime.to!string; 476 row[2] = a[1].loadAvg.to!string; 477 tbl.put(row); 478 //writefln("%s | %s | %s", a[0], a[1].accessTime.to!string, a[1].loadAvg.to!string); 479 } 480 catch (Exception e) { 481 logger.trace(e.msg).collectException; 482 } 483 } 484 485 try { 486 writeln(tbl); 487 } 488 catch (Exception e) { 489 logger.error(e.msg).collectException; 490 } 491 492 return 0; 493 } 494 495 /** Print the load of localhost. 496 * 497 * #SPC-measure_local_load 498 */ 499 int cli_localLoad(WriterT)(scope WriterT writer) nothrow { 500 import std.ascii : newline; 501 import std.conv : to; 502 import std.parallelism : totalCPUs; 503 import distssh.libc : getloadavg; 504 505 try { 506 double[3] loadavg; 507 int samples = getloadavg(&loadavg[0], 3); 508 509 if (samples == -1 || samples == 0) 510 loadavg[0] = totalCPUs > 0 ? 
totalCPUs : 1; 511 512 double cores = totalCPUs; 513 514 // make sure the loadavg is on a new line because the last line parsed is expected to contain the loadavg. 515 writer(newline); 516 517 if (cores > 0) 518 writer((loadavg[0] / cores).to!string); 519 else 520 writer(loadavg[0].to!string); 521 } 522 catch (Exception e) { 523 logger.trace(e.msg).collectException; 524 return -1; 525 } 526 527 return 0; 528 } 529 530 int cli_runOnAll(const Options opts) nothrow { 531 import std.algorithm : sort; 532 import std.stdio : writefln, writeln, stdout; 533 534 auto shosts = hostsFromEnv; 535 536 writefln("Configured hosts (%s): %(%s|%)", globalEnvHostKey, shosts).collectException; 537 538 bool exit_status = true; 539 foreach (a; shosts.sort) { 540 stdout.writefln("Connecting to %s.", a).collectException; 541 try { 542 // #SPC-flush_buffers 543 stdout.flush; 544 } 545 catch (Exception e) { 546 } 547 548 auto status = executeOnHost(opts, a); 549 550 if (status != 0) { 551 writeln("Failed, error code: ", status).collectException; 552 exit_status = false; 553 } 554 555 stdout.writefln("Connection to %s closed.", a).collectException; 556 } 557 558 return exit_status ? 0 : 1; 559 } 560 561 struct NonblockingFd { 562 int fileno; 563 564 private const int old_fcntl; 565 566 this(int fd) { 567 this.fileno = fd; 568 569 import core.sys.posix.fcntl : fcntl, F_SETFL, F_GETFL, O_NONBLOCK; 570 571 old_fcntl = fcntl(fileno, F_GETFL); 572 fcntl(fileno, F_SETFL, old_fcntl | O_NONBLOCK); 573 } 574 575 ~this() { 576 import core.sys.posix.fcntl : fcntl, F_SETFL; 577 578 fcntl(fileno, F_SETFL, old_fcntl); 579 } 580 581 void read(ref ubyte[] buf) { 582 static import core.sys.posix.unistd; 583 584 auto len = core.sys.posix.unistd.read(fileno, buf.ptr, buf.length); 585 if (len > 0) 586 buf = buf[0 .. 
len]; 587 } 588 } 589 590 struct Watchdog { 591 import std.datetime.stopwatch : StopWatch; 592 593 enum State { 594 ok, 595 timeout 596 } 597 598 private { 599 State st; 600 Duration timeout; 601 NullableRef!PipeReader pread; 602 StopWatch sw; 603 } 604 605 this(ref PipeReader pread, Duration timeout) { 606 this.pread = &pread; 607 this.timeout = timeout; 608 sw.start; 609 } 610 611 void update() { 612 import distssh.protocol : HeartBeat; 613 614 if (!pread.unpack!HeartBeat.isNull) { 615 sw.reset; 616 sw.start; 617 } else if (sw.peek > timeout) { 618 st = State.timeout; 619 } 620 } 621 622 bool isTimeout() { 623 return State.timeout == st; 624 } 625 626 static void ping(ref PipeWriter f) { 627 import distssh.protocol : HeartBeat; 628 629 f.pack!HeartBeat; 630 } 631 } 632 633 @("shall print the load of the localhost") 634 unittest { 635 string load; 636 auto exit_status = cli_localLoad((string s) => load = s); 637 assert(exit_status == 0); 638 assert(load.length > 0, load); 639 } 640 641 struct Options { 642 import core.time : dur; 643 644 enum Mode { 645 shell, 646 cmd, 647 importEnvCmd, 648 install, 649 measureHosts, 650 localLoad, 651 runOnAll, 652 localShell, 653 exportEnv, 654 } 655 656 Mode mode; 657 658 bool help; 659 bool noImportEnv; 660 bool cloneEnv; 661 bool verbose; 662 bool stdinMsgPackEnv; 663 std.getopt.GetoptResult help_info; 664 665 Duration timeout = defaultTimeout_s.dur!"seconds"; 666 667 string selfBinary; 668 string selfDir; 669 670 string importEnv; 671 string workDir; 672 string[] command; 673 } 674 675 /** Update a Configs object's file to import the environment from. 676 * 677 * This should only be called after all other command line parsing has been 678 * done. It is because this function take into consideration the priority as 679 * specified in the requirement: 680 * #SPC-configure_env_import_file 681 * 682 * Params: 683 * opts = config to update the file to import the environment from. 
684 */ 685 void configImportEnvFile(ref Options opts) nothrow { 686 import std.process : environment; 687 688 if (opts.noImportEnv) { 689 opts.importEnv = null; 690 } else if (opts.importEnv.length != 0) { 691 // do nothing. the user has specified a file 692 } else { 693 try { 694 opts.importEnv = environment.get(globalEnvFileKey, distsshEnvExport); 695 } 696 catch (Exception e) { 697 } 698 } 699 } 700 701 /** 702 * #SPC-remote_command_parse 703 * 704 * Params: 705 * args = the command line arguments to parse. 706 */ 707 Options parseUserArgs(string[] args) { 708 import std.algorithm : among; 709 import std.file : thisExePath; 710 import std.path : dirName, baseName, buildPath; 711 712 Options opts; 713 714 opts.selfBinary = buildPath(thisExePath.dirName, args[0].baseName); 715 opts.selfDir = opts.selfBinary.dirName; 716 717 switch (opts.selfBinary.baseName) { 718 case distShell: 719 opts.mode = Options.Mode.shell; 720 return opts; 721 case distCmd: 722 opts.mode = Options.Mode.cmd; 723 opts.command = args.length > 1 ? args[1 .. 
$] : null; 724 opts.help = args.length > 1 && args[1].among("-h", "--help"); 725 configImportEnvFile(opts); 726 return opts; 727 default: 728 } 729 730 try { 731 bool export_env; 732 string export_env_file; 733 bool remote_shell; 734 bool install; 735 bool measure_hosts; 736 bool local_load; 737 bool local_run; 738 bool local_shell; 739 bool run_on_all; 740 ulong timeout_s; 741 742 // alphabetical order 743 // dfmt off 744 opts.help_info = std.getopt.getopt(args, std.getopt.config.passThrough, 745 std.getopt.config.keepEndOfOptions, 746 "clone-env", "clone the current environment to the remote host without an intermediate file", &opts.cloneEnv, 747 "export-env", "export the current environment to a file that is used on the remote host", &export_env, 748 "export-env-file", "export the current environment to the specified file", &export_env_file, 749 "install", "install distssh by setting up the correct symlinks", &install, 750 "i|import-env", "import the env from the file (default: " ~ distsshEnvExport ~ ")", &opts.importEnv, 751 "no-import-env", "do not automatically import the environment from " ~ distsshEnvExport, &opts.noImportEnv, 752 "measure", "measure the login time and load of all remote hosts", &measure_hosts, 753 "local-load", "measure the load on the current host", &local_load, 754 "local-run", "import env and run the command locally", &local_run, 755 "local-shell", "run the shell locally", &local_shell, 756 "run-on-all", "run the command on all remote hosts", &run_on_all, 757 "shell", "open an interactive shell on the remote host", &remote_shell, 758 "stdin-msgpack-env", "import env from stdin as a msgpack stream", &opts.stdinMsgPackEnv, 759 "timeout", "timeout to use when checking remote hosts", &timeout_s, 760 "v|verbose", "verbose logging", &opts.verbose, 761 "workdir", "working directory to run the command in", &opts.workDir, 762 ); 763 // dfmt on 764 opts.help = opts.help_info.helpWanted; 765 766 import core.time : dur; 767 768 if (timeout_s > 0) 
769 opts.timeout = timeout_s.dur!"seconds"; 770 771 if (install) 772 opts.mode = Options.Mode.install; 773 else if (export_env) { 774 opts.mode = Options.Mode.exportEnv; 775 if (export_env_file.length != 0) 776 opts.importEnv = export_env_file; 777 } else if (remote_shell) 778 opts.mode = Options.Mode.shell; 779 else if (measure_hosts) 780 opts.mode = Options.Mode.measureHosts; 781 else if (local_load) 782 opts.mode = Options.Mode.localLoad; 783 else if (local_run) 784 opts.mode = Options.Mode.importEnvCmd; 785 else if (run_on_all) 786 opts.mode = Options.Mode.runOnAll; 787 else if (local_shell) 788 opts.mode = Options.Mode.localShell; 789 else 790 opts.mode = Options.Mode.cmd; 791 } 792 catch (std.getopt.GetOptException e) { 793 // unknown option 794 opts.help = true; 795 logger.error(e.msg).collectException; 796 } 797 catch (Exception e) { 798 opts.help = true; 799 logger.error(e.msg).collectException; 800 } 801 802 if (args.length > 1) { 803 import std.algorithm : find; 804 import std.range : drop; 805 import std.array : array; 806 807 opts.command = args.find("--").drop(1).array(); 808 } 809 810 if (opts.mode == Options.Mode.cmd && opts.command.length == 0) 811 opts.help = true; 812 813 configImportEnvFile(opts); 814 815 return opts; 816 } 817 818 @("shall determine the absolute path of self") 819 unittest { 820 import std.path; 821 import std.file; 822 823 auto opts = parseUserArgs(["distssh", "ls"]); 824 assert(opts.selfBinary[0] == '/'); 825 assert(opts.selfBinary.baseName == "distssh"); 826 827 opts = parseUserArgs(["distshell"]); 828 assert(opts.selfBinary[0] == '/'); 829 assert(opts.selfBinary.baseName == "distshell"); 830 831 opts = parseUserArgs(["distcmd"]); 832 assert(opts.selfBinary[0] == '/'); 833 assert(opts.selfBinary.baseName == "distcmd"); 834 835 opts = parseUserArgs(["distcmd_recv", getcwd, distsshEnvExport]); 836 assert(opts.selfBinary[0] == '/'); 837 assert(opts.selfBinary.baseName == "distcmd_recv"); 838 } 839 840 @("shall either return the 
default timeout or the user specified timeout") 841 unittest { 842 import core.time : dur; 843 import std.conv; 844 845 auto opts = parseUserArgs(["distssh", "ls"]); 846 assert(opts.timeout == defaultTimeout_s.dur!"seconds"); 847 opts = parseUserArgs(["distssh", "--timeout", "10", "ls"]); 848 assert(opts.timeout == 10.dur!"seconds"); 849 850 opts = parseUserArgs(["distshell"]); 851 assert(opts.timeout == defaultTimeout_s.dur!"seconds", opts.timeout.to!string); 852 opts = parseUserArgs(["distshell", "--timeout", "10"]); 853 assert(opts.timeout == defaultTimeout_s.dur!"seconds"); 854 } 855 856 @("shall only be the default timeout because --timeout should be passed on to the command") 857 unittest { 858 import core.time : dur; 859 import std.conv; 860 861 auto opts = parseUserArgs(["distcmd", "ls"]); 862 assert(opts.timeout == defaultTimeout_s.dur!"seconds"); 863 864 opts = parseUserArgs(["distcmd", "--timeout", "10"]); 865 assert(opts.timeout == defaultTimeout_s.dur!"seconds"); 866 } 867 868 void printHelp(const Options opts) nothrow { 869 import std.getopt : defaultGetoptPrinter; 870 import std.format : format; 871 import std.path : baseName; 872 873 auto help_txt = "usage: %s [options] -- [COMMAND]\n"; 874 if (opts.selfBinary.baseName == distCmd) { 875 help_txt = "usage: %s [COMMAND]\n"; 876 } 877 878 try { 879 defaultGetoptPrinter(format(help_txt, opts.selfBinary.baseName), opts.help_info.options.dup); 880 } 881 catch (Exception e) { 882 logger.error(e.msg).collectException; 883 } 884 } 885 886 struct Host { 887 string payload; 888 alias payload this; 889 } 890 891 /// The load of a host. 
struct Load {
    /// Load average of the host.
    double loadAvg;
    /// Time it took to log in on the host and measure the load.
    Duration accessTime;

    bool opEquals(const typeof(this) o) nothrow @safe pure @nogc {
        return loadAvg == o.loadAvg && accessTime == o.accessTime;
    }

    /// Order by load average first and access time second.
    int opCmp(const typeof(this) o) pure @safe @nogc nothrow {
        if (loadAvg < o.loadAvg)
            return -1;
        else if (loadAvg > o.loadAvg)
            return 1;

        if (accessTime < o.accessTime)
            return -1;
        else if (accessTime > o.accessTime)
            return 1;

        // only reached with a different value when a load average is NaN
        return this == o ? 0 : 1;
    }
}

@("shall sort the loads")
unittest {
    import std.algorithm : sort;
    import std.array : array;
    import core.time : dur;

    {
        auto raw = [Load(0.6, 500.dur!"msecs"), Load(0.5, 500.dur!"msecs")].sort.array;
        assert(raw[0].loadAvg == 0.5);
    }

    {
        auto raw = [Load(0.5, 600.dur!"msecs"), Load(0.5, 500.dur!"msecs")].sort.array;
        assert(raw[0].accessTime == 500.dur!"msecs");
    }
}

/** Login on host and measure its load.
 *
 * Params:
 *  h = remote host to check
 *  timeout = how long to wait for the remote host before the login is killed
 *
 * Returns: the Load of the remote host. The load average is `int.max` if the
 * measurement failed or timed out. The access time is one hour if the process
 * failed before the elapsed time could be measured.
 */
Load getLoad(Host h, Duration timeout) nothrow {
    import std.conv : to;
    import std.file : thisExePath;
    import std.process : tryWait, pipeProcess, kill, wait;
    import core.thread : Thread;
    import core.time : MonoTime, dur;
    import core.sys.posix.signal : SIGKILL;

    enum ExitCode {
        error,
        timeout,
        ok,
    }

    ExitCode exit_code;
    const start_at = MonoTime.currTime;
    const stop_at = start_at + timeout;

    auto elapsed = 3600.dur!"seconds";
    double load = int.max;

    try {
        immutable abs_distssh = thisExePath;
        auto res = pipeProcess(["ssh", "-q", "-oStrictHostKeyChecking=no", h,
                abs_distssh, "--local-load"]);

        while (true) {
            auto st = res.pid.tryWait;

            if (st.terminated && st.status == 0) {
                exit_code = ExitCode.ok;
                break;
            } else if (st.terminated && st.status != 0) {
                exit_code = ExitCode.error;
                break;
            } else if (stop_at < MonoTime.currTime) {
                exit_code = ExitCode.timeout;
                res.pid.kill(SIGKILL);
                break;
            }

            // yield the CPU between polls instead of spinning for the whole
            // timeout; one such loop runs per measured host.
            Thread.sleep(10.dur!"msecs");
        }

        // must read the exit or a zombie process is left behind
        res.pid.wait;

        elapsed = MonoTime.currTime - start_at;

        if (exit_code != ExitCode.ok)
            return Load(load, elapsed);

        try {
            // the load is expected on the last line of the output
            string last_line;
            foreach (a; res.stdout.byLineCopy) {
                last_line = a;
            }

            load = last_line.to!double;
        }
        catch (Exception e) {
            logger.trace(res.stdout).collectException;
            logger.trace(res.stderr).collectException;
            logger.trace(e.msg).collectException;
        }
    }
    catch (Exception e) {
        logger.trace(e.msg).collectException;
    }

    return Load(load, elapsed);
}

/// Mirror of an environment.
struct Env {
    string[string] payload;
    alias payload this;
}

/** Clone the environment of the current process.
 *
 * #SPC-env_export_filter
 *
 * Variables listed in the `;` separated DISTSSH_ENV_EXPORT_FILTER are removed
 * from the clone.
 *
 * Returns: a clone of the environment.
1024 */ 1025 auto cloneEnv() nothrow { 1026 import std.process : environment; 1027 import std..string : strip; 1028 import distssh.protocol : ProtocolEnv, EnvVariable; 1029 1030 ProtocolEnv app; 1031 1032 try { 1033 auto env = environment.toAA; 1034 1035 foreach (k; environment.get(globalEnvFilterKey, null) 1036 .strip.splitter(';').map!(a => a.strip).filter!(a => a.length != 0)) { 1037 if (env.remove(k)) { 1038 logger.infof("Removed '%s' from the exported environment", k); 1039 } 1040 } 1041 1042 foreach (const a; env.byKeyValue) { 1043 app ~= EnvVariable(a.key, a.value); 1044 } 1045 } 1046 catch (Exception e) { 1047 logger.warning(e.msg).collectException; 1048 } 1049 1050 return app; 1051 } 1052 1053 import core.sys.posix.unistd : pid_t; 1054 1055 struct Pid { 1056 pid_t value; 1057 alias value this; 1058 } 1059 1060 Pid[] getShallowChildren(int parent_pid) { 1061 import core.sys.posix.unistd : pid_t; 1062 import std.conv : to; 1063 import std.array : appender; 1064 import std.file : dirEntries, SpanMode, exists; 1065 import std.path : buildPath, baseName; 1066 1067 auto children = appender!(Pid[])(); 1068 1069 foreach (const p; File(buildPath("/proc", parent_pid.to!string, "task", 1070 parent_pid.to!string, "children")).readln.splitter(" ").filter!(a => a.length > 0)) { 1071 try { 1072 children.put(p.to!pid_t.Pid); 1073 } 1074 catch (Exception e) { 1075 logger.trace(e.msg).collectException; 1076 } 1077 } 1078 1079 return children.data; 1080 } 1081 1082 @("shall return the immediate children of the init process") 1083 unittest { 1084 auto res = getShallowChildren(1); 1085 //logger.trace(res); 1086 assert(res.length > 0); 1087 } 1088 1089 /** Returns: a list of all processes with the leafs being at the back 1090 */ 1091 Pid[] getDeepChildren(int parent_pid) { 1092 import std.array : appender; 1093 import std.container : DList; 1094 1095 auto children = DList!Pid(); 1096 1097 children.insert(getShallowChildren(parent_pid)); 1098 auto res = appender!(Pid[])(); 1099 
1100 while (!children.empty) { 1101 const p = children.front; 1102 res.put(p); 1103 children.insertBack(getShallowChildren(p)); 1104 children.removeFront; 1105 } 1106 1107 return res.data; 1108 } 1109 1110 @("shall return a list of all children of the init process") 1111 unittest { 1112 auto direct = getShallowChildren(1); 1113 auto deep = getDeepChildren(1); 1114 1115 //logger.trace(direct); 1116 //logger.trace(deep); 1117 1118 assert(deep.length > direct.length); 1119 } 1120 1121 struct PidGroup { 1122 pid_t value; 1123 alias value this; 1124 } 1125 1126 /** Returns: a list of the process groups in the same order as the input. 1127 */ 1128 PidGroup[] getPidGroups(const Pid[] pids) { 1129 import core.sys.posix.unistd : getpgid; 1130 import std.array : array, appender; 1131 1132 auto res = appender!(PidGroup[])(); 1133 bool[pid_t] pgroups; 1134 1135 foreach (const p; pids.map!(a => getpgid(a)).filter!(a => a != -1)) { 1136 if (p !in pgroups) { 1137 pgroups[p] = true; 1138 res.put(PidGroup(p)); 1139 } 1140 } 1141 1142 return res.data; 1143 } 1144 1145 /** Cleanup the children processes. 1146 * 1147 * Create an array of children process groups. 1148 * Create an array of children pids. 1149 * Kill the children pids to prohibit them from spawning more processes. 1150 * Kill the groups from top to bottom. 1151 * 1152 * Params: 1153 * parent_pid = pid of the parent to analyze and kill the children of 1154 * kill = a function that kill a process 1155 * killpg = a function that kill a process group 1156 * 1157 * Returns: the process group of "this". 
*/
pid_t cleanupProcess(KillT, KillPgT)(Pid parent_pid, KillT kill, KillPgT killpg) nothrow {
    import core.sys.posix.unistd : getpgrp, getpid;

    const this_pid = getpid();
    const this_gid = getpgrp();

    try {
        // Collect both views before signalling anything: the process groups of
        // all (deep) descendants and the direct children of `parent_pid`.
        auto children_groups = getDeepChildren(parent_pid).getPidGroups;
        auto direct_children = getShallowChildren(parent_pid);

        // Never signal the calling process itself.
        foreach (const p; direct_children.filter!(a => a != this_pid)) {
            kill(p);
        }

        // Never signal the process group the caller belongs to.
        foreach (const p; children_groups.filter!(a => a != this_gid)) {
            killpg(p);
        }
    }
    catch (Exception e) {
        // Best effort: a failure is only traced, the group id is still returned.
        logger.trace(e.msg).collectException;
    }

    return this_gid;
}

@("shall return a list of all children pid groups of the init process")
unittest {
    auto direct = getShallowChildren(1);
    auto deep = getDeepChildren(1);
    auto groups = getPidGroups(deep);

    //logger.trace(direct.length, direct);
    //logger.trace(deep.length, deep);
    //logger.trace(groups.length, groups);

    assert(groups.length < deep.length);
}

/** Reads from a file descriptor via `NonblockingFd` and feeds the bytes to a
 * `Deserialize` buffer.
 *
 * The struct forwards to the deserializer through `alias deser this`.
 */
struct PipeReader {
    import distssh.protocol : Deserialize;

    NonblockingFd nfd;
    Deserialize deser;

    alias deser this;

    this(int fd) {
        this.nfd = NonblockingFd(fd);
    }

    /** Update the buffer with data from the pipe.
     *
     * At most 128 bytes (the size of the local buffer) are handed to the
     * deserializer per call. `cleanupUntilKind` is always called afterwards,
     * also when the read failed.
     */
    void update() nothrow {
        ubyte[128] buf;
        ubyte[] s = buf[];

        try {
            // NOTE(review): `read` presumably takes the slice by reference and
            // shrinks it to the bytes actually read -- confirm in NonblockingFd.
            nfd.read(s);
            if (s.length > 0)
                deser.put(s);
        }
        catch (Exception e) {
            // Exceptions are swallowed here; nothing is added to the buffer.
        }

        deser.cleanupUntilKind;
    }
}

/** Serializes protocol messages and writes them to a `File`.
 *
 * The struct forwards to the serializer through `alias ser this`.
 *
 * NOTE(review): the serializer stores a delegate to `this.put` created inside
 * the constructor. If the instance is moved or copied afterwards the delegate
 * still refers to the original location -- confirm callers keep it in place.
 */
struct PipeWriter {
    import distssh.protocol : Serialize;

    File fout;
    Serialize!(void delegate(const(ubyte)[]) @safe) ser;

    alias ser this;

    this(File f) {
        this.fout = f;
        this.ser = typeof(ser)(&this.put);
    }

    /// Write the raw bytes to the file and flush after every write.
    void put(const(ubyte)[] v) @safe {
        fout.rawWrite(v);
        fout.flush;
    }
}

/** Import a serialized environment from a file.
 *
 * Params:
 *  filename = file to read the environment from.
 *
 * Returns: the unpacked `ProtocolEnv`. A default initialized value is returned
 * when the file does not exist or when reading/unpacking throws.
 */
auto readEnv(string filename) nothrow {
    import distssh.protocol : ProtocolEnv, EnvVariable, Deserialize;
    import std.file : exists;
    import std.stdio : File;

    ProtocolEnv rval;

    if (!exists(filename)) {
        logger.info("File to import the environment from do not exist: ",
                filename).collectException;
        return rval;
    }

    try {
        auto fin = File(filename);
        Deserialize deser;

        // Feed the whole file to the deserializer in 128 byte chunks.
        ubyte[128] buf;
        while (!fin.eof) {
            auto read_ = fin.rawRead(buf[]);
            deser.put(read_);
        }

        rval = deser.unpack!(ProtocolEnv);
    }
    catch (Exception e) {
        logger.error(e.msg).collectException;
        logger.errorf("Unable to import environment from '%s'", filename).collectException;
    }

    return rval;
}

/** Serialize an environment to a file.
 *
 * The file is opened for writing and its mode is set to read/write for the
 * owner only before any data is written.
 *
 * Params:
 *  filename = file to write to.
 *  env = the environment to serialize.
 */
void writeEnv(string filename, from!"distssh.protocol".ProtocolEnv env) {
    import core.sys.posix.sys.stat : fchmod, S_IRUSR, S_IWUSR;
    import std.stdio : File;
    import distssh.protocol : Serialize;

    auto fout = File(filename, "w");
    // The return value of fchmod is not checked.
    fchmod(fout.fileno, S_IRUSR | S_IWUSR);

    auto ser = Serialize!(void delegate(const(ubyte)[]) @safe)((const(ubyte)[] a) => fout.rawWrite(
            a));

    ser.pack(env);
}

/** Read the remote hosts from the environment variable named by
 * `globalEnvHostKey`.
 *
 * The value is split on `;`. Each entry is stripped and empty entries are
 * dropped.
 *
 * Returns: the configured hosts. The array is empty, and an error is logged,
 * when no host is configured or when an exception is thrown.
 */
Host[] hostsFromEnv() nothrow {
    import std.array : array;
    import std.string : strip;
    import std.process : environment;

    typeof(return) rval;

    try {
        string hosts_env = environment.get(globalEnvHostKey, "").strip;
        rval = hosts_env.splitter(";").map!(a => a.strip)
            .filter!(a => a.length > 0).map!(a => Host(a)).array;

        if (rval.length == 0) {
            logger.errorf("No remote host configured (%s='%s')", globalEnvHostKey, hosts_env);
        }
    }
    catch (Exception e) {
        logger.error(e.msg).collectException;
    }

    return rval;
}

/**
 * #SPC-load_balance
 * #SPC-best_remote_host
 *
 * Holds the configured remote hosts and, after `sortByLoad`, the same hosts
 * paired with their measured load in ascending order. `front`, `popFront` and
 * `empty` operate on the sorted list.
 */
struct RemoteHostCache {
    import std.array : array;
    import std.typecons : Tuple, tuple;

    alias HostLoad = Tuple!(Host, Load);

    Duration timeout;
    Host[] remoteHosts;
    HostLoad[] remoteByLoad;

    /// Create a cache with the hosts taken from the environment (`hostsFromEnv`).
    static auto make(Duration timeout) nothrow {
        return RemoteHostCache(timeout, hostsFromEnv);
    }

    /** Measure and sort the remote hosts.
     *
     * `getLoad` is called for every host through a task pool sized to the
     * number of hosts plus one. If an exception is thrown `remoteByLoad` is
     * left as it was.
     */
    void sortByLoad() nothrow {
        import std.algorithm : sort;
        import std.parallelism : TaskPool;

        static auto loadHost(T)(T host_timeout) nothrow {
            return HostLoad(host_timeout[0], getLoad(host_timeout[0], host_timeout[1]));
        }

        auto shosts = remoteHosts.map!(a => tuple(a, timeout)).array;

        try {
            auto pool = new TaskPool(shosts.length + 1);
            scope (exit)
                pool.stop;

            // dfmt off
            remoteByLoad =
                pool.map!loadHost(shosts)
                .array
                .sort!((a,b) => a[1] < b[1])
                .array;
            // dfmt on

            if (remoteByLoad.length == 0) {
                // this should be very uncommon but may happen.
                remoteByLoad = remoteHosts.map!(a => HostLoad(a, Load.init)).array;
            }
        }
        catch (Exception e) {
            logger.trace(e.msg).collectException;
        }
    }

    /** Pick a host among the least loaded ones and remove it from the cache.
     *
     * The host is chosen uniformly at random among up to the first three
     * entries of `remoteByLoad`. If the random selection throws, the first
     * entry is used instead. All entries for the chosen host are then removed.
     *
     * Returns: the chosen host, or null when the cache is empty.
     */
    Nullable!Host randomAndPop() @safe nothrow {
        import std.range : take;
        import std.random : uniform;

        typeof(return) rval;

        if (empty)
            return rval;

        try {
            auto top3 = remoteByLoad.take(3).array;
            auto ridx = uniform(0, top3.length);
            rval = top3[ridx][0];
        }
        catch (Exception e) {
            rval = remoteByLoad[0][0];
        }

        remoteByLoad = remoteByLoad.filter!(a => a[0] != rval).array;

        return rval;
    }

    /// Returns: the host of the first entry in `remoteByLoad`. Must not be empty.
    Host front() @safe pure nothrow {
        assert(!empty);
        return remoteByLoad[0][0];
    }

    /// Drop the first entry of `remoteByLoad`. Must not be empty.
    void popFront() @safe pure nothrow {
        assert(!empty);
        remoteByLoad = remoteByLoad[1 .. $];
    }

    /// Returns: true when `remoteByLoad` has no entries.
    bool empty() @safe pure nothrow {
        return remoteByLoad.length == 0;
    }
}