1 /**
2 Copyright: Copyright (c) 2018, Joakim Brännström. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 
6 Methods prefixed with `cli_` are strongly related to user commands.
7 They more or less fully implement a command line interface command.
8 */
9 module distssh.main_;
10 
11 import core.time : Duration;
12 import std.algorithm : splitter, map, filter, joiner;
13 import std.exception : collectException;
14 import std.stdio : File;
15 import std.typecons : Nullable, NullableRef;
16 import logger = std.experimental.logger;
17 
18 import distssh.from;
19 
20 static import std.getopt;
21 
/// Name of the environment variable that holds the configured remote hosts.
/// NOTE(review): presumably a `;`-separated list (cli_measureHosts prints it
/// joined by `;`) — confirm against hostsFromEnv.
immutable globalEnvHostKey = "DISTSSH_HOSTS";
/// Name of the environment variable that overrides the file the environment
/// is imported from (see configImportEnvFile).
immutable globalEnvFileKey = "DISTSSH_IMPORT_ENV";
/// Name of the environment variable holding a `;`-separated list of variable
/// names that are excluded when the environment is exported (see cloneEnv).
immutable globalEnvFilterKey = "DISTSSH_ENV_EXPORT_FILTER";
/// Invocation name (a symlink to the binary) that selects the remote shell mode.
immutable distShell = "distshell";
/// Invocation name (a symlink to the binary) that selects the remote command mode.
immutable distCmd = "distcmd";
/// Default file the environment is exported to and imported from.
immutable distsshEnvExport = "distssh_env.export";
/// Default timeout, in seconds, used when checking the remote hosts.
immutable ulong defaultTimeout_s = 2;
29 
/** Program entry point.
 *
 * Installs the application logger, parses the command line, configures the
 * log level and then either prints the help or runs the selected mode.
 *
 * Params:
 *  args = the raw command line, including the program name.
 *
 * Returns: the exit status of the process.
 */
int rmain(string[] args) nothrow {
    // A failure to install the application logger is not fatal; the default
    // logger stays in place.
    try {
        import distssh.app_logger;

        logger.sharedLog(new SimpleLogger());
    }
    catch (Exception e) {
        logger.warning(e.msg).collectException;
    }

    Options opts;
    try {
        opts = parseUserArgs(args);
    }
    catch (Exception e) {
        logger.error(e.msg).collectException;
        return 1;
    }

    try {
        const level = opts.verbose ? logger.LogLevel.all : logger.LogLevel.warning;
        logger.globalLogLevel(level);
    }
    catch (Exception e) {
        logger.warning(e.msg).collectException;
    }

    if (!opts.help)
        return appMain(opts);

    printHelp(opts);
    return 0;
}
68 
/** Dispatch to the implementation of the mode selected on the command line.
 *
 * Params:
 *  opts = the parsed command line options.
 *
 * Returns: the exit status produced by the selected mode.
 */
int appMain(const Options opts) nothrow {
    alias Mode = Options.Mode;

    final switch (opts.mode) {
    case Mode.shell:
        return cli_shell(opts);
    case Mode.localShell:
        return cli_localShell(opts);
    case Mode.cmd:
        return cli_cmd(opts);
    case Mode.importEnvCmd:
        return cli_cmdWithImportedEnv(opts);
    case Mode.runOnAll:
        return cli_runOnAll(opts);
    case Mode.measureHosts:
        return cli_measureHosts(opts);
    case Mode.localLoad:
        import std.stdio : writeln;

        return cli_localLoad((string line) => writeln(line));
    case Mode.install:
        import std.file : symlink;

        return cli_install(opts, (string src, string dst) => symlink(src, dst));
    case Mode.exportEnv:
        return cli_exportEnv(opts);
    }
}
95 
96 private:
97 
/** Export the current environment to a file so it can be imported later.
 *
 * The environment is cloned (honoring the export filter) and written to
 * `opts.importEnv`.
 *
 * Params:
 *  opts = the parsed command line options.
 *
 * Returns: 0 on success, 1 if the environment could not be written.
 */
int cli_exportEnv(const Options opts) nothrow {
    int status = 0;

    try {
        writeEnv(opts.importEnv, cloneEnv);
        logger.info("Exported environment to ", opts.importEnv);
    }
    catch (Exception e) {
        logger.error(e.msg).collectException;
        status = 1;
    }

    return status;
}
113 
// Verifies that cli_exportEnv writes the current environment to the export
// file, and that variables named in DISTSSH_ENV_EXPORT_FILTER are left out.
unittest {
    import std.conv : to;
    import std.string : toStringz;
    import std.file;

    immutable remove_me = "remove_me.export";
    scope (exit)
        remove(remove_me);

    auto opts = parseUserArgs(["distssh", "--export-env", "--export-env-file", remove_me]);

    // make sure there is at least one environment variable to export
    import core.sys.posix.stdlib : putenv, unsetenv;

    const env_key = "DISTSSH_ENV_TEST";
    // NOTE: POSIX putenv stores the pointer instead of copying the string, so
    // env_var must stay alive until the variable is unset at scope exit.
    const env_var = (env_key ~ "=" ~ remove_me).toStringz;
    putenv(cast(char*) env_var);
    scope (exit)
        unsetenv(cast(char*) env_key.ptr); // string literals are zero terminated

    // shall export the environment to the file
    void verify1() {
        // test normal export
        assert(cli_exportEnv(opts) == 0);

        auto env = readEnv(remove_me);
        assert(!env.filter!(a => a.key == env_key).empty, env.to!string);
    }

    verify1;

    // shall filter out specified env before exporting to the file
    const env_filter_var = (globalEnvFilterKey ~ "=DISTSSH_ENV_TEST;junk ").toStringz;
    putenv(cast(char*) env_filter_var);
    scope (exit)
        unsetenv(cast(char*) globalEnvFilterKey.ptr);

    void verify2() {
        assert(cli_exportEnv(opts) == 0);

        auto env = readEnv(remove_me);
        assert(env.filter!(a => a.key == env_key).empty, env.to!string);
    }

    verify2;
}
160 
/** Install distssh by linking `distshell` and `distcmd` to the binary.
 *
 * Both links are created in `opts.selfDir` and point at `opts.selfBinary`.
 *
 * Params:
 *  opts = options providing the binary path and its directory.
 *  symlink = callback that creates a symlink named `dst` pointing at `src`;
 *            injectable so tests do not touch the file system.
 *
 * Returns: 0 when both links were created, 1 on the first failure.
 */
int cli_install(const Options opts, void delegate(string src, string dst) symlink) nothrow {
    import std.path : buildPath;

    try {
        foreach (linkName; [distShell, distCmd])
            symlink(opts.selfBinary, buildPath(opts.selfDir, linkName));
    }
    catch (Exception e) {
        logger.error(e.msg).collectException;
        return 1;
    }

    return 0;
}
174 
// cli_install shall create the distshell and distcmd links in the binary's
// directory, both pointing at the binary. A fake symlink callback records the
// calls instead of touching the file system.
@("shall create symlinks to self")
unittest {
    string[2][] symlinks;
    void fakeSymlink(string src, string dst) {
        string[2] v = [src, dst];
        symlinks ~= v;
    }

    Options opts;
    opts.selfBinary = "/foo/src";
    opts.selfDir = "/bar";

    cli_install(opts, &fakeSymlink);

    assert(symlinks[0] == ["/foo/src", "/bar/distshell"]);
    assert(symlinks[1] == ["/foo/src", "/bar/distcmd"]);
}
192 
/** Open an interactive shell on one of the least loaded remote hosts.
 *
 * Hosts are taken one at a time. A connection that exits with a non-zero
 * status before twice the timeout has passed is treated as failed, and the
 * next host is tried (#SPC-fallback_remote_host).
 *
 * Params:
 *  opts = the parsed command line options.
 *
 * Returns: the exit status of the remote shell, or 1 when no host worked.
 */
int cli_shell(const Options opts) nothrow {
    import std.datetime.stopwatch : StopWatch, AutoStart;
    import std.file : thisExePath, getcwd;
    import std.process : spawnProcess, wait;
    import std.stdio : writeln, writefln;

    auto hosts = RemoteHostCache.make(opts.timeout);
    hosts.sortByLoad;

    if (hosts.empty)
        logger.errorf("No remote host online").collectException;

    // A session lasting longer than this counts as a successful connection
    // even when the shell exits with a non-zero status.
    const successThreshold = opts.timeout * 2;

    while (!hosts.empty) {
        auto host = hosts.randomAndPop;

        try {
            if (host.isNull) {
                logger.error("No remote host online");
                return 1;
            }

            writeln("Connecting to ", host);

            auto elapsed = StopWatch(AutoStart.yes);

            // Passing -t twice forces a tty to be allocated, so the remote
            // shell *thinks* it is an interactive shell.
            const status = spawnProcess(["ssh", "-q", "-t", "-t", "-oStrictHostKeyChecking=no",
                    host, thisExePath, "--local-shell", "--workdir", getcwd]).wait;

            // #SPC-fallback_remote_host
            const connected = status == 0 || elapsed.peek > successThreshold;
            if (!connected) {
                logger.warningf("Connection failed to %s. Falling back on next available host",
                        host);
                continue;
            }

            writefln("Connection to %s closed.", host);
            return status;
        }
        catch (Exception e) {
            logger.error(e.msg).collectException;
        }
    }

    return 1;
}
241 
/** Start the user's shell on this host.
 *
 * The shell runs in `opts.workDir` when that directory exists and in the
 * current directory otherwise (#SPC-shell_current_dir).
 *
 * Params:
 *  opts = the parsed command line options.
 *
 * Returns: the exit status of the shell, or 1 if it could not be started.
 */
int cli_localShell(const Options opts) nothrow {
    import std.file : exists;
    import std.process : spawnProcess, wait, userShell, Config, Pid;

    try {
        auto shell = exists(opts.workDir) ? spawnProcess([userShell], null,
                Config.none, opts.workDir) : spawnProcess([userShell]);
        return shell.wait;
    }
    catch (Exception e) {
        logger.error(e.msg).collectException;
        return 1;
    }
}
261 
/** Run `opts.command` on one of the least loaded remote hosts.
 *
 * Params:
 *  opts = the parsed command line options.
 *
 * Returns: the exit status of the remote command, or 1 when no host is online.
 */
int cli_cmd(const Options opts) nothrow {
    auto hosts = RemoteHostCache.make(opts.timeout);
    hosts.sortByLoad;

    if (!hosts.empty) {
        auto host = hosts.randomAndPop;
        return host.isNull ? 1 : executeOnHost(opts, host);
    }

    logger.errorf("No remote host online").collectException;
    return 1;
}
277 
/** Execute `opts.command` on a remote host over ssh.
 *
 * The remote side runs this binary with `--local-run`. The environment for
 * the remote command is written to the ssh process' stdin as msgpack: a
 * clone of the current environment with `--clone-env`, otherwise the import
 * file unless `--no-import-env` was given. After that a heartbeat is sent
 * every 50 ms until ssh terminates.
 *
 * #SPC-automatic_env_import
 *
 * Params:
 *  opts = options holding the command and how to obtain the environment.
 *  host = the remote host to run the command on.
 *
 * Returns: the exit status of the ssh process, or 1 if it could not be run.
 */
int executeOnHost(const Options opts, Host host) nothrow {
    import distssh.protocol : ProtocolEnv;
    import core.thread : Thread;
    import core.time : dur;
    import std.file : thisExePath, getcwd;
    import std.path : absolutePath;
    import std.process : tryWait, Redirect, pipeProcess;

    // #SPC-draft_remote_cmd_spec
    try {
        auto sshArgs = ["ssh", "-oStrictHostKeyChecking=no", host, thisExePath,
            "--local-run", "--workdir", getcwd, "--stdin-msgpack-env", "--"] ~ opts.command;

        logger.info("Connecting to: ", host);
        logger.info("run: ", sshArgs.joiner(" "));

        auto ssh = pipeProcess(sshArgs, Redirect.stdin);
        auto toRemote = PipeWriter(ssh.stdin);

        // The environment is always the first message in the stream.
        ProtocolEnv env;
        if (opts.cloneEnv) {
            env = cloneEnv;
        } else if (!opts.noImportEnv) {
            env = readEnv(opts.importEnv.absolutePath);
        }
        toRemote.pack(env);

        for (;;) {
            try {
                auto st = ssh.pid.tryWait;
                if (st.terminated)
                    return st.status;

                Watchdog.ping(toRemote);
            }
            catch (Exception e) {
                // best effort: a failed poll or heartbeat is retried next round
            }

            Thread.sleep(50.dur!"msecs");
        }
    }
    catch (Exception e) {
        logger.error(e.msg).collectException;
        return 1;
    }
}
330 
// #SPC-fast_env_startup
/** Run `opts.command` on this host with an imported environment.
 *
 * This is what a `--local-run` invocation executes; executeOnHost starts it
 * on the remote host through ssh. The environment is read either from stdin
 * as a msgpack stream (`--stdin-msgpack-env`) or from the file
 * `opts.importEnv`. A failure to obtain it is only logged at trace level and
 * the command runs with whatever was collected.
 *
 * While the command runs, heartbeats are expected on stdin. When they stop
 * (Watchdog timeout of twice `opts.timeout`) or the parent process changes,
 * leftover processes are killed.
 *
 * Params:
 *  opts = the parsed command line options.
 *
 * Returns: 0 if there is no command; the exit status of the command when it
 * was observed to terminate; otherwise 1.
 */
int cli_cmdWithImportedEnv(const Options opts) nothrow {
    import core.thread : Thread;
    import core.time : dur;
    import std.stdio : File, stdin;
    import std.file : exists;
    import std.process : spawnProcess, Config, spawnShell, Pid, tryWait,
        thisProcessID;
    import std.utf : toUTF8;

    if (opts.command.length == 0)
        return 0;

    // Fill out_env with the environment to run the command in, either from
    // stdin (polling until a complete ProtocolEnv has been decoded) or from
    // the import file.
    // NOTE(review): the stdin loop has no timeout; if the environment never
    // arrives it keeps polling forever.
    static auto updateEnv(const Options opts, ref PipeReader pread, ref string[string] out_env) {
        import distssh.protocol : ProtocolEnv;

        ProtocolEnv env;

        if (opts.stdinMsgPackEnv) {
            while (true) {
                pread.update;

                try {
                    auto tmp = pread.unpack!(ProtocolEnv);
                    if (!tmp.isNull) {
                        env = tmp;
                        break;
                    }
                }
                catch (Exception e) {
                }

                Thread.sleep(10.dur!"msecs");
            }
        } else {
            env = readEnv(opts.importEnv);
        }

        foreach (kv; env) {
            out_env[kv.key] = kv.value;
        }
    }

    try {
        string[string] env;
        auto pread = PipeReader(stdin.fileno);

        try {
            updateEnv(opts, pread, env);
        }
        catch (Exception e) {
            logger.trace(e.msg).collectException;
        }

        Pid res;

        // An existing file is executed directly with its arguments; anything
        // else is joined into one command line and run through the shell.
        if (exists(opts.command[0])) {
            res = spawnProcess(opts.command, env, Config.none, opts.workDir);
        } else {
            res = spawnShell(opts.command.dup.joiner(" ").toUTF8, env, Config.none, opts.workDir);
        }

        import core.time : MonoTime, dur;
        import core.sys.posix.unistd : getppid;

        // Remember the parent (presumably sshd, per the cleanup comments
        // below) so that a change of parent can be detected.
        const parent_pid = getppid;
        const check_parent_interval = 500.dur!"msecs";
        const timeout = opts.timeout * 2;

        // stays 1 unless the command is observed to terminate
        int exit_status = 1;
        bool sigint_cleanup;

        auto check_parent = MonoTime.currTime + check_parent_interval;

        // The watchdog consumes the heartbeats from stdin and times out when
        // none has arrived within `timeout`.
        auto wd = Watchdog(pread, timeout);

        while (!wd.isTimeout) {
            pread.update;

            try {
                auto status = tryWait(res);
                if (status.terminated) {
                    exit_status = status.status;
                    break;
                }
                wd.update;
            }
            catch (Exception e) {
            }

            // #SPC-sigint_detection
            // A changed parent pid is taken to mean that the ssh session is gone.
            if (MonoTime.currTime > check_parent) {
                check_parent = MonoTime.currTime + check_parent_interval;
                if (getppid != parent_pid) {
                    sigint_cleanup = true;
                    break;
                }
            }

            Thread.sleep(50.dur!"msecs");
        }

        import core.sys.posix.signal : kill, killpg, SIGKILL;
        import core.sys.posix.unistd : getpid;

        // #SPC-early_terminate_no_processes_left
        if (wd.isTimeout) {
            // cleanup all subprocesses of sshd. Should catch those that fork and create another process group.
            cleanupProcess(.Pid(parent_pid), (int a) => kill(a, SIGKILL),
                    (int a) => killpg(a, SIGKILL));
        }

        if (sigint_cleanup || wd.isTimeout) {
            // sshd has already died on a SIGINT on the host side thus it is only possible to reliably cleanup *this* process tree if anything is left.
            cleanupProcess(.Pid(getpid), (int a) => kill(a, SIGKILL), (int a) => killpg(a, SIGKILL)).killpg(
                    SIGKILL);
        }

        return exit_status;
    }
    catch (Exception e) {
        logger.error(e.msg).collectException;
        return 1;
    }
}
456 
/** Measure the access time and load of every configured remote host and
 * print the result as a table ordered by load.
 *
 * #SPC-measure_remote_hosts
 *
 * Params:
 *  opts = the parsed command line options.
 *
 * Returns: always 0.
 */
int cli_measureHosts(const Options opts) nothrow {
    import std.conv : to;
    import std.stdio : writefln, writeln;
    import distssh.table;

    auto hosts = RemoteHostCache.make(opts.timeout);
    hosts.sortByLoad;

    writefln("Configured hosts (%s='%s')", globalEnvHostKey, hosts.remoteHosts.joiner(";"))
        .collectException;

    // the header row; reused as the buffer for each data row
    string[3] row = ["Host", "Access Time", "Load"];
    auto tbl = Table!3(row);

    foreach (entry; hosts.remoteByLoad) {
        try {
            const accessTime = entry[1].accessTime.to!string;
            const loadAvg = entry[1].loadAvg.to!string;

            row[0] = entry[0];
            row[1] = accessTime;
            row[2] = loadAvg;
            tbl.put(row);
        }
        catch (Exception e) {
            logger.trace(e.msg).collectException;
        }
    }

    try {
        writeln(tbl);
    }
    catch (Exception e) {
        logger.error(e.msg).collectException;
    }

    return 0;
}
494 
/** Print the load of localhost, normalized by the number of CPUs.
 *
 * An empty line is written first because the consumer parses the last line
 * of the output as the load.
 *
 * #SPC-measure_local_load
 *
 * Params:
 *  writer = callable that receives each piece of output as a string.
 *
 * Returns: 0 on success, -1 if the output could not be produced.
 */
int cli_localLoad(WriterT)(scope WriterT writer) nothrow {
    import std.ascii : newline;
    import std.conv : to;
    import std.parallelism : totalCPUs;
    import distssh.libc : getloadavg;

    try {
        double[3] loadavg;
        const samples = getloadavg(&loadavg[0], 3);

        // No sample available (error or zero samples): report a load of one
        // per CPU.
        if (samples >= -1 && samples <= 0)
            loadavg[0] = totalCPUs > 0 ? totalCPUs : 1;

        const double cores = totalCPUs;
        const double load = cores > 0 ? loadavg[0] / cores : loadavg[0];

        // start a fresh line so the load ends up alone on the last line
        writer(newline);
        writer(load.to!string);
    }
    catch (Exception e) {
        logger.trace(e.msg).collectException;
        return -1;
    }

    return 0;
}
529 
/** Run `opts.command` on every configured remote host, one host at a time
 * in sorted order.
 *
 * Params:
 *  opts = the parsed command line options.
 *
 * Returns: 0 when the command succeeded on every host, otherwise 1.
 */
int cli_runOnAll(const Options opts) nothrow {
    import std.algorithm : sort;
    import std.stdio : writefln, writeln, stdout;

    auto shosts = hostsFromEnv;

    writefln("Configured hosts (%s): %(%s|%)", globalEnvHostKey, shosts).collectException;

    bool anyFailed = false;
    foreach (host; shosts.sort) {
        stdout.writefln("Connecting to %s.", host).collectException;
        // #SPC-flush_buffers
        collectException(stdout.flush());

        const status = executeOnHost(opts, host);
        if (status != 0) {
            writeln("Failed, error code: ", status).collectException;
            anyFailed = true;
        }

        stdout.writefln("Connection to %s closed.", host).collectException;
    }

    return anyFailed ? 1 : 0;
}
560 
/** Switch a file descriptor to non-blocking mode for the lifetime of this
 * object. The original file status flags are restored by the destructor.
 *
 * NOTE(review): the struct is copyable, and every live copy restores the
 * flags when it is destroyed.
 */
struct NonblockingFd {
    int fileno;

    private const int old_fcntl;

    // Set once the constructor has changed the flags of `fileno`. It keeps the
    // destructor of a default-initialized instance (fileno 0) from
    // overwriting the flags of stdin.
    private bool restore_flags;

    this(int fd) {
        import core.sys.posix.fcntl : fcntl, F_SETFL, F_GETFL, O_NONBLOCK;

        this.fileno = fd;

        old_fcntl = fcntl(fileno, F_GETFL);
        // F_GETFL reports failure as -1. OR-ing O_NONBLOCK into that would try
        // to set every flag, so leave the descriptor untouched instead.
        if (old_fcntl == -1)
            return;

        fcntl(fileno, F_SETFL, old_fcntl | O_NONBLOCK);
        restore_flags = true;
    }

    ~this() {
        import core.sys.posix.fcntl : fcntl, F_SETFL;

        if (!restore_flags)
            return;

        fcntl(fileno, F_SETFL, old_fcntl);
    }

    /** Read whatever is currently available without blocking.
     *
     * Params:
     *  buf = storage to read into. On return it is sliced to the bytes that
     *        were read, which is empty when nothing was available, at end of
     *        file, or on error.
     */
    void read(ref ubyte[] buf) {
        static import core.sys.posix.unistd;

        const len = core.sys.posix.unistd.read(fileno, buf.ptr, buf.length);
        buf = len > 0 ? buf[0 .. len] : buf[0 .. 0];
    }
}
589 
/** Detect when heartbeats from the other side stop arriving.
 *
 * The sending side calls `ping` to send a HeartBeat. The receiving side calls
 * `update` to consume them. Once no heartbeat has been seen for longer than
 * the timeout, the watchdog stays in the timeout state.
 */
struct Watchdog {
    import std.datetime.stopwatch : StopWatch;

    enum State {
        ok,
        timeout
    }

    private {
        State st;
        Duration timeout;
        NullableRef!PipeReader pread;
        StopWatch sw;
    }

    /** Params:
     *  pread = reader the heartbeats arrive on. Only its address is stored,
     *          so it must outlive the watchdog.
     *  timeout = longest allowed gap between two heartbeats.
     */
    this(ref PipeReader pread, Duration timeout) {
        this.pread = &pread;
        this.timeout = timeout;
        sw.start;
    }

    /// Consume a heartbeat if one is available, otherwise check for a timeout.
    void update() {
        import distssh.protocol : HeartBeat;

        if (!pread.unpack!HeartBeat.isNull) {
            // The stopwatch is always running here. StopWatch.reset restarts
            // the measurement without stopping it, whereas calling start on a
            // running StopWatch violates its contract.
            sw.reset;
        } else if (sw.peek > timeout) {
            st = State.timeout;
        }
    }

    /// Returns: true once a timeout has been detected by `update`.
    bool isTimeout() {
        return State.timeout == st;
    }

    /// Send one heartbeat on `f`.
    static void ping(ref PipeWriter f) {
        import distssh.protocol : HeartBeat;

        f.pack!HeartBeat;
    }
}
632 
// cli_localLoad shall succeed and hand a non-empty string to the writer.
// Only the last string written (the load value) is kept by the callback.
@("shall print the load of the localhost")
unittest {
    string load;
    auto exit_status = cli_localLoad((string s) => load = s);
    assert(exit_status == 0);
    assert(load.length > 0, load);
}
640 
/// Settings derived from the command line by parseUserArgs.
struct Options {
    import core.time : dur;

    /// The action to perform; selected by parseUserArgs and dispatched by appMain.
    /// Being the first member, `shell` is the mode of a default-initialized Options.
    enum Mode {
        shell, /// open an interactive shell on a remote host (cli_shell)
        cmd, /// run the command on a remote host (cli_cmd)
        importEnvCmd, /// run the command locally with an imported environment, `--local-run`
        install, /// create the distshell/distcmd symlinks, `--install`
        measureHosts, /// measure access time and load of the remote hosts, `--measure`
        localLoad, /// print the load of this host, `--local-load`
        runOnAll, /// run the command on every remote host, `--run-on-all`
        localShell, /// start a shell on this host, `--local-shell`
        exportEnv, /// export the environment to a file, `--export-env`
    }

    /// The selected action.
    Mode mode;

    /// Print the help text instead of running a mode.
    bool help;
    /// `--no-import-env`: do not import an environment file.
    bool noImportEnv;
    /// `--clone-env`: send the current environment instead of reading a file.
    bool cloneEnv;
    /// `-v|--verbose`: log everything instead of only warnings and worse.
    bool verbose;
    /// `--stdin-msgpack-env`: read the environment from stdin as msgpack.
    bool stdinMsgPackEnv;
    /// Result of std.getopt, kept so printHelp can list the options.
    std.getopt.GetoptResult help_info;

    /// Timeout used when checking the remote hosts (`--timeout`, in seconds).
    Duration timeout = defaultTimeout_s.dur!"seconds";

    /// Absolute path of this binary under the name it was invoked as.
    string selfBinary;
    /// Directory containing selfBinary.
    string selfDir;

    /// File the environment is imported from, or exported to with `--export-env`.
    string importEnv;
    /// `--workdir`: directory to run the command or shell in.
    string workDir;
    /// The command to run: the arguments after `--`, or all arguments for distcmd.
    string[] command;
}
674 
/** Decide which file the environment is imported from and store it in
 * `opts.importEnv`.
 *
 * Call this only after all other command line parsing is done, because it
 * applies the priority from #SPC-configure_env_import_file:
 *  1. `--no-import-env` clears the file.
 *  2. A file already chosen on the command line is kept.
 *  3. Otherwise the value of DISTSSH_IMPORT_ENV is used, falling back to
 *     the default export file.
 *
 * Params:
 *  opts = options whose import file is updated.
 */
void configImportEnvFile(ref Options opts) nothrow {
    import std.process : environment;

    if (opts.noImportEnv) {
        opts.importEnv = null;
        return;
    }

    // the user explicitly chose a file
    if (opts.importEnv.length != 0)
        return;

    try {
        opts.importEnv = environment.get(globalEnvFileKey, distsshEnvExport);
    }
    catch (Exception e) {
        // importEnv is left empty when the environment cannot be read
    }
}
700 
/** Parse the command line into an Options.
 *
 * The name the binary was invoked as is checked first. As `distshell`, all
 * arguments are ignored and the mode is the remote shell. As `distcmd`,
 * every argument after the program name is the command to run remotely.
 * Otherwise the arguments are parsed as options, and everything after `--`
 * is the command.
 *
 * #SPC-remote_command_parse
 *
 * Params:
 *  args = the command line arguments to parse.
 *
 * Returns: the parsed options. `help` is set when parsing failed or when
 * the cmd mode was selected without a command.
 */
Options parseUserArgs(string[] args) {
    import std.algorithm : among;
    import std.file : thisExePath;
    import std.path : dirName, baseName, buildPath;

    Options opts;

    // Keep the name the binary was invoked as (it may be one of the symlinks)
    // but anchor it in the directory of the running executable.
    opts.selfBinary = buildPath(thisExePath.dirName, args[0].baseName);
    opts.selfDir = opts.selfBinary.dirName;

    switch (opts.selfBinary.baseName) {
    case distShell:
        opts.mode = Options.Mode.shell;
        return opts;
    case distCmd:
        // Everything is the command; only a leading -h/--help requests help.
        opts.mode = Options.Mode.cmd;
        opts.command = args.length > 1 ? args[1 .. $] : null;
        opts.help = args.length > 1 && args[1].among("-h", "--help");
        configImportEnvFile(opts);
        return opts;
    default:
    }

    try {
        bool export_env;
        string export_env_file;
        bool remote_shell;
        bool install;
        bool measure_hosts;
        bool local_load;
        bool local_run;
        bool local_shell;
        bool run_on_all;
        ulong timeout_s;

        // passThrough leaves unknown arguments in args; keepEndOfOptions keeps
        // the `--` so the command can be located after it below.
        // alphabetical order
        // dfmt off
        opts.help_info = std.getopt.getopt(args, std.getopt.config.passThrough,
            std.getopt.config.keepEndOfOptions,
            "clone-env", "clone the current environment to the remote host without an intermediate file", &opts.cloneEnv,
            "export-env", "export the current environment to a file that is used on the remote host", &export_env,
            "export-env-file", "export the current environment to the specified file", &export_env_file,
            "install", "install distssh by setting up the correct symlinks", &install,
            "i|import-env", "import the env from the file (default: " ~ distsshEnvExport ~ ")", &opts.importEnv,
            "no-import-env", "do not automatically import the environment from " ~ distsshEnvExport, &opts.noImportEnv,
            "measure", "measure the login time and load of all remote hosts", &measure_hosts,
            "local-load", "measure the load on the current host", &local_load,
            "local-run", "import env and run the command locally", &local_run,
            "local-shell", "run the shell locally", &local_shell,
            "run-on-all", "run the command on all remote hosts", &run_on_all,
            "shell", "open an interactive shell on the remote host", &remote_shell,
            "stdin-msgpack-env", "import env from stdin as a msgpack stream", &opts.stdinMsgPackEnv,
            "timeout", "timeout to use when checking remote hosts", &timeout_s,
            "v|verbose", "verbose logging", &opts.verbose,
            "workdir", "working directory to run the command in", &opts.workDir,
            );
        // dfmt on
        opts.help = opts.help_info.helpWanted;

        import core.time : dur;

        // 0 means "not given": keep the default timeout
        if (timeout_s > 0)
            opts.timeout = timeout_s.dur!"seconds";

        // The first flag in this chain that is set selects the mode.
        if (install)
            opts.mode = Options.Mode.install;
        else if (export_env) {
            opts.mode = Options.Mode.exportEnv;
            if (export_env_file.length != 0)
                opts.importEnv = export_env_file;
        } else if (remote_shell)
            opts.mode = Options.Mode.shell;
        else if (measure_hosts)
            opts.mode = Options.Mode.measureHosts;
        else if (local_load)
            opts.mode = Options.Mode.localLoad;
        else if (local_run)
            opts.mode = Options.Mode.importEnvCmd;
        else if (run_on_all)
            opts.mode = Options.Mode.runOnAll;
        else if (local_shell)
            opts.mode = Options.Mode.localShell;
        else
            opts.mode = Options.Mode.cmd;
    }
    catch (std.getopt.GetOptException e) {
        // unknown option
        opts.help = true;
        logger.error(e.msg).collectException;
    }
    catch (Exception e) {
        opts.help = true;
        logger.error(e.msg).collectException;
    }

    // the command is everything after the first `--`
    if (args.length > 1) {
        import std.algorithm : find;
        import std.range : drop;
        import std.array : array;

        opts.command = args.find("--").drop(1).array();
    }

    // nothing to run remotely: show the usage instead
    if (opts.mode == Options.Mode.cmd && opts.command.length == 0)
        opts.help = true;

    configImportEnvFile(opts);

    return opts;
}
817 
// parseUserArgs shall store an absolute path to the binary that keeps the
// name it was invoked as, for several different invocation names.
@("shall determine the absolute path of self")
unittest {
    import std.path;
    import std.file;

    auto opts = parseUserArgs(["distssh", "ls"]);
    assert(opts.selfBinary[0] == '/');
    assert(opts.selfBinary.baseName == "distssh");

    opts = parseUserArgs(["distshell"]);
    assert(opts.selfBinary[0] == '/');
    assert(opts.selfBinary.baseName == "distshell");

    opts = parseUserArgs(["distcmd"]);
    assert(opts.selfBinary[0] == '/');
    assert(opts.selfBinary.baseName == "distcmd");

    opts = parseUserArgs(["distcmd_recv", getcwd, distsshEnvExport]);
    assert(opts.selfBinary[0] == '/');
    assert(opts.selfBinary.baseName == "distcmd_recv");
}
839 
// distssh uses the default timeout unless --timeout is given. distshell
// ignores all of its arguments, so it always uses the default.
@("shall either return the default timeout or the user specified timeout")
unittest {
    import core.time : dur;
    import std.conv;

    auto opts = parseUserArgs(["distssh", "ls"]);
    assert(opts.timeout == defaultTimeout_s.dur!"seconds");
    opts = parseUserArgs(["distssh", "--timeout", "10", "ls"]);
    assert(opts.timeout == 10.dur!"seconds");

    opts = parseUserArgs(["distshell"]);
    assert(opts.timeout == defaultTimeout_s.dur!"seconds", opts.timeout.to!string);
    opts = parseUserArgs(["distshell", "--timeout", "10"]);
    assert(opts.timeout == defaultTimeout_s.dur!"seconds");
}
855 
// distcmd treats all arguments, including --timeout, as the remote command,
// so its timeout is always the default.
@("shall only be the default timeout because --timeout should be passed on to the command")
unittest {
    import core.time : dur;
    import std.conv;

    auto opts = parseUserArgs(["distcmd", "ls"]);
    assert(opts.timeout == defaultTimeout_s.dur!"seconds");

    opts = parseUserArgs(["distcmd", "--timeout", "10"]);
    assert(opts.timeout == defaultTimeout_s.dur!"seconds");
}
867 
/** Print the usage line followed by the description of every option.
 *
 * When invoked as `distcmd`, the usage line shows that all arguments form
 * the command.
 *
 * Params:
 *  opts = options holding the binary name and the getopt result.
 */
void printHelp(const Options opts) nothrow {
    import std.getopt : defaultGetoptPrinter;
    import std.format : format;
    import std.path : baseName;

    const name = opts.selfBinary.baseName;
    const usage = name == distCmd ? "usage: %s [COMMAND]\n" : "usage: %s [options] -- [COMMAND]\n";

    try {
        defaultGetoptPrinter(format(usage, name), opts.help_info.options.dup);
    }
    catch (Exception e) {
        logger.error(e.msg).collectException;
    }
}
885 
/// Name of a remote host as passed to ssh. Implicitly converts to `string`
/// through `alias this`.
struct Host {
    string payload;
    alias payload this;
}
890 
/// The load of a host.
struct Load {
    /// Load average as reported by `--local-load` (normalized per CPU).
    /// NaN is treated as an unknown load and ordered after every real value.
    double loadAvg;
    /// Time it took to log in on the host and measure the load.
    Duration accessTime;

    bool opEquals(const typeof(this) o) nothrow @safe pure @nogc {
        import std.math : isNaN;

        // Two unknown (NaN) loads are equal so that opEquals agrees with opCmp.
        const sameLoad = loadAvg == o.loadAvg || (loadAvg.isNaN && o.loadAvg.isNaN);
        return sameLoad && accessTime == o.accessTime;
    }

    /** Order by load first and by access time when the loads are equal.
     *
     * NaN loads sort after all other loads. This keeps the comparison a strict
     * weak ordering, as required by std.algorithm.sort; the previous fallback
     * made a NaN load compare greater than itself.
     */
    int opCmp(const typeof(this) o) pure @safe @nogc nothrow {
        import std.math : isNaN;

        const lhsUnknown = loadAvg.isNaN;
        const rhsUnknown = o.loadAvg.isNaN;
        if (lhsUnknown != rhsUnknown)
            return lhsUnknown ? 1 : -1;

        if (!lhsUnknown) {
            if (loadAvg < o.loadAvg)
                return -1;
            if (loadAvg > o.loadAvg)
                return 1;
        }

        if (accessTime < o.accessTime)
            return -1;
        if (accessTime > o.accessTime)
            return 1;

        return 0;
    }
}
914 
// Loads shall sort by load average first and by access time when the load
// averages are equal.
@("shall sort the loads")
unittest {
    import std.algorithm : sort;
    import std.array : array;
    import core.time : dur;

    {
        auto raw = [Load(0.6, 500.dur!"msecs"), Load(0.5, 500.dur!"msecs")].sort.array;
        assert(raw[0].loadAvg == 0.5);
    }

    {
        auto raw = [Load(0.5, 600.dur!"msecs"), Load(0.5, 500.dur!"msecs")].sort.array;
        assert(raw[0].accessTime == 500.dur!"msecs");
    }
}
931 
/** Login on a host and measure its load.
 *
 * Runs this binary with `--local-load` on the host over ssh and parses the
 * last line of its output as the load.
 *
 * Params:
 *  h = remote host to check
 *  timeout = how long to wait for the remote command before killing it
 *
 * Returns: the Load of the remote host. When the load cannot be determined
 * it is int.max; when ssh cannot be run at all, the access time is one hour.
 */
Load getLoad(Host h, Duration timeout) nothrow {
    import core.thread : Thread;
    import std.conv : to;
    import std.file : thisExePath;
    import std.process : tryWait, pipeProcess, kill, wait;
    import core.time : MonoTime, dur;
    import core.sys.posix.signal : SIGKILL;

    enum ExitCode {
        error,
        timeout,
        ok,
    }

    // Pause between polls of the ssh process instead of busy-waiting on a CPU
    // core for up to `timeout`.
    const poll_interval = 5.dur!"msecs";

    ExitCode exit_code;
    const start_at = MonoTime.currTime;
    const stop_at = start_at + timeout;

    // pessimistic defaults, reported when the measurement fails
    auto elapsed = 3600.dur!"seconds";
    double load = int.max;

    try {
        immutable abs_distssh = thisExePath;
        auto res = pipeProcess(["ssh", "-q", "-oStrictHostKeyChecking=no", h,
                abs_distssh, "--local-load"]);

        while (true) {
            auto st = res.pid.tryWait;

            if (st.terminated && st.status == 0) {
                exit_code = ExitCode.ok;
                break;
            } else if (st.terminated && st.status != 0) {
                exit_code = ExitCode.error;
                break;
            } else if (stop_at < MonoTime.currTime) {
                exit_code = ExitCode.timeout;
                res.pid.kill(SIGKILL);
                break;
            }

            Thread.sleep(poll_interval);
        }

        // must read the exit or a zombie process is left behind
        res.pid.wait;

        elapsed = MonoTime.currTime - start_at;

        if (exit_code != ExitCode.ok)
            return Load(load, elapsed);

        try {
            // the load is on the last line of the output
            string last_line;
            foreach (a; res.stdout.byLineCopy) {
                last_line = a;
            }

            load = last_line.to!double;
        }
        catch (Exception e) {
            logger.trace(res.stdout).collectException;
            logger.trace(res.stderr).collectException;
            logger.trace(e.msg).collectException;
        }
    }
    catch (Exception e) {
        logger.trace(e.msg).collectException;
    }

    return Load(load, elapsed);
}
1010 
/// Mirror of an environment. Usable directly as the underlying associative
/// array through `alias this`.
/// NOTE(review): presumably maps variable name -> value; confirm with users.
struct Env {
    string[string] payload;
    alias payload this;
}
1016 
/** Clone the environment of the current process.
 *
 * Variables named in the `;`-separated list held by
 * DISTSSH_ENV_EXPORT_FILTER are left out of the clone
 * (#SPC-env_export_filter). Each removal is logged at info level. On an
 * error a warning is logged and whatever was collected so far is returned.
 *
 * Returns: a clone of the environment as a ProtocolEnv.
 */
auto cloneEnv() nothrow {
    import std.process : environment;
    import std.string : strip;
    import distssh.protocol : ProtocolEnv, EnvVariable;

    ProtocolEnv app;

    try {
        auto env = environment.toAA;

        auto excluded = environment.get(globalEnvFilterKey, null)
            .strip.splitter(';').map!(a => a.strip).filter!(a => a.length != 0);
        foreach (k; excluded) {
            if (env.remove(k))
                logger.infof("Removed '%s' from the exported environment", k);
        }

        foreach (const a; env.byKeyValue)
            app ~= EnvVariable(a.key, a.value);
    }
    catch (Exception e) {
        logger.warning(e.msg).collectException;
    }

    return app;
}
1052 
1053 import core.sys.posix.unistd : pid_t;
1054 
/// Strongly typed process id. Implicitly converts to the raw `pid_t`.
struct Pid {
    /// The raw process id.
    pid_t value;

    alias value this;
}
1059 
/** Returns: the direct children of `parent_pid`.
 *
 * The pids are read from `/proc/<pid>/task/<pid>/children`, which holds the
 * children as whitespace separated numbers on a single line. Entries that
 * can't be parsed as a pid are logged at trace level and skipped.
 *
 * Throws: if the `children` file can't be opened or read, e.g. because the
 * process has already terminated.
 */
Pid[] getShallowChildren(int parent_pid) {
    import core.sys.posix.unistd : pid_t;
    import std.array : appender;
    import std.conv : to;
    import std.path : buildPath;

    auto pid_str = parent_pid.to!string;

    // The temporary File is closed at the end of this statement, before the
    // content is parsed.
    string line = File(buildPath("/proc", pid_str, "task", pid_str, "children")).readln;

    auto children = appender!(Pid[])();

    // The whitespace splitter merges runs of whitespace and yields no empty
    // tokens, so a trailing space or newline is harmless.
    foreach (const p; line.splitter) {
        try {
            children.put(p.to!pid_t.Pid);
        }
        catch (Exception e) {
            logger.trace(e.msg).collectException;
        }
    }

    return children.data;
}
1081 
@("shall return the immediate children of the init process")
unittest {
    // pid 1 is expected to have at least one direct child on a running system.
    const init_children = getShallowChildren(1);
    assert(init_children.length != 0);
}
1088 
/** Collect every descendant of `parent_pid` by walking `/proc` breadth first.
 *
 * The direct children come first, followed by their children and so on.
 *
 * A descendant may exit while the tree is walked, which removes its `/proc`
 * entry. Such a process is still part of the result, but the failure to read
 * its children is logged at trace level and skipped. It does not abort the
 * whole traversal.
 *
 * Params:
 *  parent_pid = the process whose descendants are collected
 *
 * Throws: if the children of `parent_pid` itself can't be read.
 *
 * Returns: a list of all processes with the leafs being at the back
 */
Pid[] getDeepChildren(int parent_pid) {
    import std.array : appender;
    import std.container : DList;

    auto children = DList!Pid();

    // A failure to read the root is still reported to the caller.
    children.insert(getShallowChildren(parent_pid));
    auto res = appender!(Pid[])();

    while (!children.empty) {
        const p = children.front;
        children.removeFront;
        res.put(p);

        try {
            children.insertBack(getShallowChildren(p));
        }
        catch (Exception e) {
            // Most likely the process terminated after it was listed.
            logger.trace(e.msg).collectException;
        }
    }

    return res.data;
}
1109 
@("shall return a list of all children of the init process")
unittest {
    // The deep walk contains the direct children plus their descendants.
    const shallow_count = getShallowChildren(1).length;
    const deep_count = getDeepChildren(1).length;

    assert(shallow_count < deep_count);
}
1120 
/// Strongly typed process group id. Implicitly converts to the raw `pid_t`.
struct PidGroup {
    /// The raw process group id.
    pid_t value;

    alias value this;
}
1125 
/** Map every pid to its process group, dropping duplicates.
 *
 * Pids for which `getpgid` fails (returns -1) are skipped.
 *
 * Returns: the unique process groups, in the order they are first seen in `pids`.
 */
PidGroup[] getPidGroups(const Pid[] pids) {
    import core.sys.posix.unistd : getpgid;
    import std.array : appender;

    bool[pid_t] seen;
    auto groups = appender!(PidGroup[])();

    foreach (const member; pids) {
        const pgid = getpgid(member);
        if (pgid == -1 || (pgid in seen) !is null)
            continue;

        seen[pgid] = true;
        groups.put(PidGroup(pgid));
    }

    return groups.data;
}
1144 
/** Kill the descendants of `parent_pid`.
 *
 * The tree is snapshotted first: the process groups of all descendants and
 * the direct children. Then the direct children are killed, which stops them
 * from spawning more processes. Afterwards every collected process group is
 * killed in the order it was found (top to bottom).
 *
 * The calling process is never passed to `kill`. Its own process group is
 * never passed to `killpg`. Any exception stops the cleanup and is logged at
 * trace level.
 *
 * Params:
 *  parent_pid = pid of the parent to analyze and kill the children of
 *  kill = a function that kill a process
 *  killpg = a function that kill a process group
 *
 * Returns: the process group of the calling process.
 */
pid_t cleanupProcess(KillT, KillPgT)(Pid parent_pid, KillT kill, KillPgT killpg) nothrow {
    import core.sys.posix.unistd : getpgrp, getpid;

    const self_pid = getpid();
    const self_group = getpgrp();

    try {
        auto groups = getDeepChildren(parent_pid).getPidGroups;
        auto direct = getShallowChildren(parent_pid);

        foreach (const child; direct) {
            if (child == self_pid)
                continue;
            kill(child);
        }

        foreach (const group; groups) {
            if (group == self_group)
                continue;
            killpg(group);
        }
    }
    catch (Exception e) {
        logger.trace(e.msg).collectException;
    }

    return self_group;
}
1183 
@("shall return a list of all children pid groups of the init process")
unittest {
    auto deep = getDeepChildren(1);
    auto groups = getPidGroups(deep);

    // Processes share process groups, so there are fewer groups than processes.
    assert(groups.length < deep.length);
}
1199 
/** Reads serialized protocol data from a pipe.
 *
 * The bytes read are accumulated in `deser`, which is exposed via `alias this`.
 */
struct PipeReader {
    import distssh.protocol : Deserialize;

    NonblockingFd nfd;
    Deserialize deser;

    alias deser this;

    this(int fd) {
        nfd = NonblockingFd(fd);
    }

    /** Read one chunk (bounded by a 128 byte buffer) from the pipe into `deser`.
     *
     * Exceptions from the read are ignored. Presumably this covers "no data
     * yet" on a non-blocking fd; TODO confirm against NonblockingFd.read.
     * `deser.cleanupUntilKind` is always called afterwards.
     */
    void update() nothrow {
        ubyte[128] storage;
        ubyte[] chunk = storage[];

        try {
            nfd.read(chunk);
            if (chunk.length != 0)
                deser.put(chunk);
        }
        catch (Exception e) {
            // deliberately ignored, see above
        }

        deser.cleanupUntilKind;
    }
}
1228 
/** Writes serialized protocol data to a `File`.
 *
 * The serializer is exposed via `alias this`. Every chunk it produces is
 * written to `fout` and flushed.
 */
struct PipeWriter {
    import distssh.protocol : Serialize;

    File fout;
    Serialize!(void delegate(const(ubyte)[]) @safe) ser;

    alias ser this;

    this(File f) {
        fout = f;
        // NOTE(review): the delegate is bound to the address of this instance.
        // A copy of a PipeWriter keeps writing through the original instance,
        // so the original must outlive any copy. Confirm that callers never copy it.
        ser = typeof(ser)(&this.put);
    }

    /// Write `v` to `fout` and flush the stream.
    void put(const(ubyte)[] v) @safe {
        fout.rawWrite(v);
        fout.flush();
    }
}
1247 
/** Read a serialized `ProtocolEnv` from `filename`.
 *
 * Presumably the file is the one produced by `writeEnv`. Confirm if other
 * writers exist.
 *
 * Params:
 *  filename = file to read the serialized environment from
 *
 * Returns: the deserialized environment. It is empty when the file does not
 * exist or could not be read or unpacked. The problem is logged.
 */
auto readEnv(string filename) nothrow {
    import distssh.protocol : ProtocolEnv, Deserialize;
    import std.file : exists;
    import std.stdio : File;

    ProtocolEnv rval;

    if (!exists(filename)) {
        logger.info("File to import the environment from does not exist: ",
                filename).collectException;
        return rval;
    }

    try {
        Deserialize deser;

        // byChunk reuses its buffer between iterations. Deserialize.put is
        // assumed to copy the data, as the original rawRead loop also assumed.
        foreach (chunk; File(filename).byChunk(4096)) {
            deser.put(chunk);
        }

        rval = deser.unpack!(ProtocolEnv);
    }
    catch (Exception e) {
        logger.error(e.msg).collectException;
        logger.errorf("Unable to import environment from '%s'", filename).collectException;
    }

    return rval;
}
1280 
/** Serialize `env` to `filename`.
 *
 * The environment may contain secrets, so the file is only accessible by its
 * owner. A new file is created with mode 0600 directly. This avoids a window
 * in which a freshly created file is readable by others. The mode is also
 * forced on an already existing file.
 *
 * Throws: ErrnoException if the file can't be opened.
 */
void writeEnv(string filename, from!"distssh.protocol".ProtocolEnv env) {
    import core.sys.posix.fcntl : open, O_CREAT, O_TRUNC, O_WRONLY;
    import core.sys.posix.sys.stat : fchmod, S_IRUSR, S_IWUSR;
    import core.sys.posix.unistd : close;
    import std.exception : errnoEnforce;
    import std.stdio : File;
    import distssh.protocol : Serialize;

    const fd = open((filename ~ '\0').ptr, O_WRONLY | O_CREAT | O_TRUNC, S_IRUSR | S_IWUSR);
    errnoEnforce(fd != -1, "Unable to open " ~ filename);

    File fout;
    {
        // fout only takes ownership of fd when fdopen succeeds.
        scope (failure)
            close(fd);
        fout.fdopen(fd, "w");
    }

    // open() does not change the mode of a file that already existed.
    fchmod(fout.fileno, S_IRUSR | S_IWUSR);

    auto ser = Serialize!(void delegate(const(ubyte)[]) @safe)((const(ubyte)[] a) => fout.rawWrite(
            a));

    ser.pack(env);
}
1294 
/** Read the configured remote hosts from the environment.
 *
 * The hosts are taken from `DISTSSH_HOSTS` as a `;` separated list.
 * Surrounding whitespace is stripped and blank entries are ignored. An error
 * is logged when no host is found.
 *
 * Returns: the configured hosts, possibly empty.
 */
Host[] hostsFromEnv() nothrow {
    import std.array : array;
    import std..string : strip;
    import std.process : environment;

    Host[] hosts;

    try {
        string raw = environment.get(globalEnvHostKey, "").strip;

        // dfmt off
        hosts = raw
            .splitter(";")
            .map!(entry => entry.strip)
            .filter!(entry => entry.length != 0)
            .map!(entry => Host(entry))
            .array;
        // dfmt on

        if (hosts.length == 0)
            logger.errorf("No remote host configured (%s='%s')", globalEnvHostKey, raw);
    }
    catch (Exception e) {
        logger.error(e.msg).collectException;
    }

    return hosts;
}
1319 
1320 /**
1321  * #SPC-load_balance
1322  * #SPC-best_remote_host
1323  */
1324 struct RemoteHostCache {
1325     import std.array : array;
1326     import std.typecons : Tuple, tuple;
1327 
1328     alias HostLoad = Tuple!(Host, Load);
1329 
1330     Duration timeout;
1331     Host[] remoteHosts;
1332     HostLoad[] remoteByLoad;
1333 
1334     static auto make(Duration timeout) nothrow {
1335         return RemoteHostCache(timeout, hostsFromEnv);
1336     }
1337 
1338     /// Measure and sort the remote hosts.
1339     void sortByLoad() nothrow {
1340         import std.algorithm : sort;
1341         import std.parallelism : TaskPool;
1342 
1343         static auto loadHost(T)(T host_timeout) nothrow {
1344             return HostLoad(host_timeout[0], getLoad(host_timeout[0], host_timeout[1]));
1345         }
1346 
1347         auto shosts = remoteHosts.map!(a => tuple(a, timeout)).array;
1348 
1349         try {
1350             auto pool = new TaskPool(shosts.length + 1);
1351             scope (exit)
1352                 pool.stop;
1353 
1354             // dfmt off
1355             remoteByLoad =
1356                 pool.map!loadHost(shosts)
1357                 .array
1358                 .sort!((a,b) => a[1] < b[1])
1359                 .array;
1360             // dfmt on
1361 
1362             if (remoteByLoad.length == 0) {
1363                 // this should be very uncommon but may happen.
1364                 remoteByLoad = remoteHosts.map!(a => HostLoad(a, Load.init)).array;
1365             }
1366         }
1367         catch (Exception e) {
1368             logger.trace(e.msg).collectException;
1369         }
1370     }
1371 
1372     /// Returns: the lowest loaded server.
1373     Nullable!Host randomAndPop() @safe nothrow {
1374         import std.range : take;
1375         import std.random : uniform;
1376 
1377         typeof(return) rval;
1378 
1379         if (empty)
1380             return rval;
1381 
1382         try {
1383             auto top3 = remoteByLoad.take(3).array;
1384             auto ridx = uniform(0, top3.length);
1385             rval = top3[ridx][0];
1386         }
1387         catch (Exception e) {
1388             rval = remoteByLoad[0][0];
1389         }
1390 
1391         remoteByLoad = remoteByLoad.filter!(a => a[0] != rval).array;
1392 
1393         return rval;
1394     }
1395 
1396     Host front() @safe pure nothrow {
1397         assert(!empty);
1398         return remoteByLoad[0][0];
1399     }
1400 
1401     void popFront() @safe pure nothrow {
1402         assert(!empty);
1403         remoteByLoad = remoteByLoad[1 .. $];
1404     }
1405 
1406     bool empty() @safe pure nothrow {
1407         return remoteByLoad.length == 0;
1408     }
1409 }