1 /**
2 Copyright: Copyright (c) 2018, Joakim Brännström. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 
6 Methods prefixed with `cli_` are strongly related to user commands.
7 They more or less fully implement a command line interface command.
8 */
9 module distssh.main_;
10 
11 import core.time : Duration;
12 import logger = std.experimental.logger;
13 import std.algorithm : splitter, map, filter, joiner;
14 import std.array : empty;
15 import std.exception : collectException;
16 import std.stdio : File;
17 import std.typecons : Nullable, NullableRef;
18 
19 import colorlog;
20 
21 import my.from_;
22 import my.path;
23 
24 import distssh.config;
25 import distssh.metric;
26 import distssh.types;
27 import distssh.utility;
28 
29 static import std.getopt;
30 
31 import unit_threaded.attrs : Serial;
32 
33 version (unittest) {
34     import unit_threaded.assertions;
35 }
36 
37 int rmain(string[] args) {
38     import distssh.daemon;
39     static import distssh.purge;
40 
41     confLogger(VerboseMode.info);
42 
43     auto conf = parseUserArgs(args);
44 
45     if (conf.global.helpInfo.helpWanted) {
46         return cli(conf);
47     }
48 
49     confLogger(conf.global.verbosity);
50     logger.trace(conf);
51 
52     import core.stdc.signal : signal;
53     import std.file : symlink;
54     import std.stdio : writeln;
55     import std.variant : visit;
56 
57     // register a handler for writing to a closed pipe in case it ever happens.
58     // the handler ignores it.
59     signal(13, &handleSIGPIPE);
60 
61     // dfmt off
62     return conf.data.visit!(
63           (Config.Help a) => cli(conf),
64           (Config.Shell a) => cli(conf, a),
65           (Config.Cmd a) => cli(conf, a),
66           (Config.LocalRun a) => cli(conf, a),
67           (Config.Install a) => cli(conf, a, (string src, string dst) => symlink(src, dst)),
68           (Config.MeasureHosts a) => cli(conf, a),
69           (Config.LocalLoad a) => cli(a, (string s) => writeln(s)),
70           (Config.RunOnAll a) => cli(conf, a),
71           (Config.LocalShell a) => cli(conf, a),
72           (Config.Env a) => cli(conf, a),
73           (Config.Daemon a) => distssh.daemon.cli(conf, a),
74           (Config.Purge a) => distssh.purge.cli(conf, a),
75           (Config.LocalPurge a) => distssh.purge.cli(conf, a),
76     );
77     // dfmt on
78 }
79 
80 private:
81 
82 /// dummy used to ignore SIGPIPE
83 extern (C) void handleSIGPIPE(int sig) nothrow @nogc @system {
84 }
85 
86 int cli(Config conf) {
87     conf.printHelp;
88     return 0;
89 }
90 
91 int cli(const Config fconf, Config.Shell conf) {
92     import std.datetime.stopwatch : StopWatch, AutoStart;
93     import std.process : spawnProcess, wait;
94     import std.stdio : writeln, writefln;
95     import distssh.connection : sshShellArgs;
96 
97     auto hosts = RemoteHostCache.make(fconf.global.dbPath, fconf.global.cluster).bestSelectRange;
98 
99     if (hosts.empty) {
100         logger.errorf("No remote host online").collectException;
101     }
102 
103     auto bgBeat = BackgroundClientBeat(AbsolutePath(fconf.global.dbPath));
104 
105     const timout_until_considered_successfull_connection = fconf.global.timeout * 2;
106 
107     foreach (host; hosts) {
108         try {
109             writeln("Connecting to ", host);
110 
111             auto sw = StopWatch(AutoStart.yes);
112 
113             auto exit_status = spawnProcess(sshShellArgs(host, fconf.global.workDir.Path).toArgs)
114                 .wait;
115 
116             // #SPC-fallback_remote_host
117             if (exit_status == 0 || sw.peek > timout_until_considered_successfull_connection) {
118                 writefln("Connection to %s closed.", host);
119                 return exit_status;
120             } else {
121                 logger.warningf("Connection failed to %s. Falling back on next available host",
122                         host);
123             }
124         } catch (Exception e) {
125             logger.error(e.msg).collectException;
126         }
127     }
128 
129     logger.error("No remote host online").collectException;
130     return 1;
131 }
132 
133 // #SPC-fast_env_startup
134 int cli(const Config fconf, Config.Cmd conf) {
135     import std.stdio : stdout;
136 
137     if (fconf.global.command.empty) {
138         logger.error("No command specified");
139         logger.error("Specify by adding -- <my command>");
140         return 1;
141     }
142 
143     auto bgBeat = BackgroundClientBeat(AbsolutePath(fconf.global.dbPath));
144 
145     foreach (host; RemoteHostCache.make(fconf.global.dbPath, fconf.global.cluster).bestSelectRange) {
146         logger.info("Connecting to ", host);
147         // to ensure connecting to is at the top of e.g. logfiles when the user run
148         // distcmd env > log.txt
149         stdout.flush;
150         return executeOnHost(ExecuteOnHostConf(fconf.global.workDir, fconf.global.command.dup,
151                 fconf.global.importEnv, fconf.global.cloneEnv, fconf.global.noImportEnv), host);
152     }
153 
154     logger.error("No remote host online from the available ",
155             fconf.global.cluster).collectException;
156     return 1;
157 }
158 
159 // #SPC-fast_env_startup
160 int cli(const Config fconf, Config.LocalRun conf) {
161     import core.time : dur;
162     import std.datetime : Clock;
163     import std.file : exists;
164     import std.process : PConfig = Config, Redirect, userShell, thisProcessID;
165     import std.stdio : File, stdin, stdout;
166     import std.utf : toUTF8;
167     import proc;
168     import sumtype;
169     import my.timer : makeTimers, makeInterval;
170     import distssh.protocol;
171 
172     static struct LocalRunConf {
173         import core.sys.posix.termios;
174 
175         string[string] env;
176         string[] cmd;
177         string workdir;
178         termios mode;
179     }
180 
181     static string[string] readEnvFromStdin(ProtocolEnv src) {
182         string[string] rval;
183         foreach (kv; src) {
184             rval[kv.key] = kv.value;
185         }
186         return rval;
187     }
188 
189     static string[string] readEnvFromFile(const Config fconf) {
190         string[string] rval;
191         foreach (kv; readEnv(fconf.global.importEnv)) {
192             rval[kv.key] = kv.value;
193         }
194         return rval;
195     }
196 
197     try {
198         auto pread = PipeReader(stdin.fileno);
199 
200         auto localConf = () {
201             LocalRunConf conf;
202             conf.cmd = fconf.global.command.dup;
203             conf.workdir = fconf.global.workDir;
204 
205             // guard against an error occuring when transfering the data such
206             // that ConfDone is trashed.
207             const timeout = Clock.currTime + 5.dur!"minutes";
208 
209             bool running = fconf.global.stdinMsgPackEnv;
210             while (running && Clock.currTime < timeout) {
211                 pread.update;
212                 pread.unpack().match!((None x) {}, (ConfDone x) {
213                     running = false;
214                 }, (ProtocolEnv x) { conf.env = readEnvFromStdin(x); }, (HeartBeat x) {
215                 }, (Command x) { conf.cmd = x.value; }, (Workdir x) {
216                     conf.workdir = x.value;
217                 }, (Key x) {}, (TerminalCapability x) { conf.mode = x.value; });
218             }
219 
220             if (!fconf.global.stdinMsgPackEnv) {
221                 conf.env = readEnvFromFile(fconf);
222             }
223 
224             return conf;
225         }();
226 
227         auto res = () {
228             if (conf.useFakeTerminal) {
229                 import core.sys.posix.termios : tcsetattr, TCSAFLUSH;
230 
231                 auto p = ttyProcess([userShell] ~ shellSwitch(userShell) ~ [
232                         localConf.cmd.joiner(" ").toUTF8
233                         ], localConf.env, PConfig.none, localConf.workdir).sandbox.scopeKill;
234                 tcsetattr(p.stdin.file.fileno, TCSAFLUSH, &localConf.mode);
235                 return p;
236             }
237             return pipeShell(localConf.cmd.joiner(" ").toUTF8,
238                     Redirect.stdin | Redirect.stdout | Redirect.stderrToStdout,
239                     localConf.env, PConfig.none, localConf.workdir).sandbox.scopeKill;
240         }();
241 
242         import core.sys.posix.unistd : getppid;
243 
244         const parent_pid = getppid;
245         bool loop_running = true;
246 
247         auto timers = makeTimers;
248         makeInterval(timers, () {
249             // detect ctrl+c on the client side
250             if (getppid != parent_pid) {
251                 loop_running = false;
252             }
253             return 500.dur!"msecs";
254         }, 50.dur!"msecs");
255 
256         ubyte[4096] buf;
257         bool pipeOutputToUser() @trusted {
258             // guard against an error occuring when writing to the users
259             // stdout.
260             const timeout = Clock.currTime + 5.dur!"minutes";
261 
262             bool hasWritten;
263             while (res.stdout.hasPendingData && Clock.currTime < timeout) {
264                 auto r = res.stdout.read(buf[]);
265                 stdout.rawWrite(r);
266                 hasWritten = true;
267             }
268 
269             if (hasWritten) {
270                 stdout.flush;
271             }
272             return hasWritten;
273         }
274 
275         scope (exit)
276             pipeOutputToUser;
277 
278         makeInterval(timers, () @safe {
279             if (pipeOutputToUser) {
280                 return 10.dur!"msecs";
281             }
282             // slower if not much is happening
283             return 100.dur!"msecs";
284         }, 25.dur!"msecs");
285 
286         // a dummy event that ensure that it tick each 50 msec.
287         makeInterval(timers, () => 50.dur!"msecs", 50.dur!"msecs");
288 
289         int exit_status = 1;
290 
291         auto wd = HeartBeatMonitor(fconf.global.timeout * 2);
292 
293         while (!wd.isTimeout && loop_running) {
294             pread.update;
295 
296             try {
297                 if (res.tryWait) {
298                     exit_status = res.status;
299                     loop_running = false;
300                 }
301 
302                 pread.unpack().match!((None x) {}, (ConfDone x) {}, (ProtocolEnv x) {
303                 }, (HeartBeat x) { wd.beat; }, (Command x) {}, (Workdir x) {}, (Key x) {
304                     auto data = x.value;
305                     while (!data.empty) {
306                         auto written = res.stdin.write(x.value);
307                         if (written.empty) {
308                             // closed so break.
309                             break;
310                         }
311                         data = data[written.length .. $];
312                     }
313                 }, (TerminalCapability x) {});
314             } catch (Exception e) {
315             }
316 
317             timers.tick(25.dur!"msecs");
318         }
319 
320         return exit_status;
321     } catch (Exception e) {
322         () @trusted { logger.trace(e).collectException; }();
323         logger.error(e.msg).collectException;
324     }
325 
326     return 1;
327 }
328 
329 int cli(const Config fconf, Config.Install conf, void delegate(string src, string dst) symlink) nothrow {
330     import std.path : buildPath;
331     import std.file : exists, remove;
332 
333     void replace(string src, string dst) {
334         if (exists(dst))
335             remove(dst);
336         symlink(src, dst);
337     }
338 
339     try {
340         replace(fconf.global.selfBinary, buildPath(fconf.global.selfDir, distShell));
341         replace(fconf.global.selfBinary, buildPath(fconf.global.selfDir, distCmd));
342         return 0;
343     } catch (Exception e) {
344         logger.error(e.msg).collectException;
345     }
346 
347     return 1;
348 }
349 
350 // #SPC-measure_remote_hosts
351 int cli(const Config fconf, Config.MeasureHosts conf) nothrow {
352     import std.algorithm : sort;
353     import std.conv : to;
354     import std.stdio : writefln, writeln;
355     import distssh.table;
356     import distssh.connection;
357 
358     writeln("Host is overloaded if Load is >1").collectException;
359 
360     auto tbl = Table!5(["Host", "Load", "Access Time", "Updated", "Multiplex"]);
361     void addRow(HostLoad a) nothrow {
362         static string toInternal(Duration d) {
363             import std.format : format;
364 
365             int seconds;
366             short msecs;
367             d.split!("seconds", "msecs")(seconds, msecs);
368             if (seconds == 0)
369                 return format("%sms", msecs);
370             else
371                 return format("%ss %sms", seconds, msecs);
372         }
373 
374         string[5] row;
375         try {
376             row[0] = a.host;
377             row[1] = a.load.loadAvg.to!string;
378             row[2] = toInternal(a.load.accessTime);
379             row[3] = a.updated.toLocalTime.to!string;
380             row[4] = makeMaster(a.host).isAlive ? "yes" : "no";
381             tbl.put(row);
382         } catch (Exception e) {
383             logger.trace(e.msg).collectException;
384         }
385     }
386 
387     auto hosts = RemoteHostCache.make(fconf.global.dbPath, fconf.global.cluster);
388 
389     foreach (a; hosts.onlineRange.sort!((a, b) => a.load < b.load)) {
390         addRow(a);
391     }
392 
393     auto unused = hosts.unusedRange;
394     if (!unused.empty) {
395         tbl.put(["-", "-", "-", "-", "-"]).collectException;
396         foreach (a; unused) {
397             addRow(a);
398         }
399     }
400 
401     try {
402         writeln(tbl);
403     } catch (Exception e) {
404         logger.error(e.msg).collectException;
405     }
406 
407     return 0;
408 }
409 
410 /** Print the load of localhost.
411  *
412  * #SPC-measure_local_load
413  */
414 int cli(WriterT)(Config.LocalLoad conf, scope WriterT writer) {
415     import std.ascii : newline;
416     import std.conv : to;
417     import std.parallelism : totalCPUs;
418     import distssh.libc : getloadavg;
419 
420     try {
421         double[3] loadavg;
422         int samples = getloadavg(&loadavg[0], 3);
423 
424         if (samples == -1 || samples == 0)
425             loadavg[0] = totalCPUs > 0 ? totalCPUs : 1;
426 
427         double cores = totalCPUs;
428 
429         // make sure the loadavg is on a new line because the last line parsed is expected to contain the loadavg.
430         writer(newline);
431 
432         if (cores > 0)
433             writer((loadavg[0] / cores).to!string);
434         else
435             writer(loadavg[0].to!string);
436     } catch (Exception e) {
437         logger.trace(e.msg).collectException;
438         return -1;
439     }
440 
441     return 0;
442 }
443 
444 int cli(const Config fconf, Config.RunOnAll conf) nothrow {
445     import std.algorithm : sort;
446     import std.stdio : writefln, writeln, stdout;
447 
448     auto hosts = RemoteHostCache.make(fconf.global.dbPath, fconf.global.cluster);
449 
450     writefln("Hosts (%s): %(%s|%)", globalEnvHostKey, hosts.allRange.map!(a => a.host))
451         .collectException;
452 
453     bool exit_status = true;
454     foreach (a; hosts.allRange
455             .filter!(a => !a.load.unknown)
456             .map!(a => a.host)) {
457         stdout.writefln("Connecting to %s", a).collectException;
458         try {
459             // #SPC-flush_buffers
460             stdout.flush;
461         } catch (Exception e) {
462         }
463 
464         auto status = executeOnHost(ExecuteOnHostConf(fconf.global.workDir,
465                 fconf.global.command.dup, fconf.global.importEnv,
466                 fconf.global.cloneEnv, fconf.global.noImportEnv), a);
467 
468         if (status != 0) {
469             writeln("Failed, error code: ", status).collectException;
470             exit_status = false;
471         }
472 
473         stdout.writefln("Connection to %s closed.", a).collectException;
474     }
475 
476     return exit_status ? 0 : 1;
477 }
478 
479 // #SPC-shell_current_dir
480 int cli(const Config fconf, Config.LocalShell conf) {
481     import std.file : exists;
482     import std.process : spawnProcess, wait, userShell, Config, Pid;
483 
484     try {
485         auto pid = () {
486             if (exists(fconf.global.workDir))
487                 return spawnProcess([userShell], null, Config.none, fconf.global.workDir);
488             return spawnProcess([userShell]);
489         }();
490         return pid.wait;
491     } catch (Exception e) {
492         logger.error(e.msg).collectException;
493         return 1;
494     }
495 }
496 
497 // #SPC-modify_env
498 int cli(const Config fconf, Config.Env conf) {
499     import std.algorithm : map, filter;
500     import std.array : assocArray, array;
501     import std.path : absolutePath;
502     import std.stdio : writeln, writefln;
503     import std..string : split;
504     import std.typecons : tuple;
505     import distssh.protocol : ProtocolEnv, EnvVariable;
506 
507     if (conf.exportEnv) {
508         try {
509             writeEnv(fconf.global.importEnv, cloneEnv);
510             logger.info("Exported environment to ", fconf.global.importEnv);
511         } catch (Exception e) {
512             logger.error(e.msg).collectException;
513             return 1;
514         }
515 
516         return 0;
517     }
518 
519     string[string] set_envs;
520     try {
521         set_envs = conf.envSet
522             .map!(a => a.split('='))
523             .filter!(a => !a.empty)
524             .map!(a => tuple(a[0], a[1]))
525             .assocArray;
526     } catch (Exception e) {
527         writeln("Unable to parse supplied envs to modify: ", e.msg).collectException;
528         return 1;
529     }
530 
531     try {
532         auto env = readEnv(fconf.global.importEnv.absolutePath).map!(a => tuple(a.key,
533                 a.value)).assocArray;
534 
535         foreach (k; conf.envDel.filter!(a => a in env)) {
536             writeln("Removing ", k);
537             env.remove(k);
538         }
539 
540         foreach (kv; set_envs.byKeyValue) {
541             if (kv.key in env)
542                 writefln("Setting %s=%s", kv.key, kv.value);
543             else
544                 writefln("Adding %s=%s", kv.key, kv.value);
545             env[kv.key] = kv.value;
546         }
547 
548         if (conf.print) {
549             foreach (const kv; env.byKeyValue)
550                 writefln(`%s="%s"`, kv.key, kv.value);
551         }
552 
553         writeEnv(fconf.global.importEnv,
554                 ProtocolEnv(env.byKeyValue.map!(a => EnvVariable(a.key, a.value)).array));
555     } catch (Exception e) {
556         logger.error(e.msg).collectException;
557         return 1;
558     }
559 
560     return 0;
561 }
562 
563 @("shall export the environment")
564 @Serial unittest {
565     import std.conv : to;
566     import std.file;
567     import std.process : environment;
568     import std.variant : tryVisit;
569 
570     // arrange
571     immutable remove_me = "remove_me.export";
572     scope (exit)
573         remove(remove_me);
574 
575     auto opts = parseUserArgs(["distssh", "env", "-e", "--env-file", remove_me]);
576     auto envConf = opts.data.tryVisit!((Config.Env a) => a, () => Config.Env.init);
577 
578     const env_key = "DISTSSH_ENV_TEST";
579     environment[env_key] = env_key ~ remove_me;
580     scope (exit)
581         environment.remove(env_key);
582 
583     // shall export the environment to the file
584     void verify1() {
585         // test normal export
586         cli(opts, envConf);
587         auto env = readEnv(remove_me);
588         assert(!env.filter!(a => a.key == env_key).empty, env.to!string);
589     }
590 
591     verify1;
592 
593     // shall filter out specified env before exporting to the file
594     environment[globalEnvFilterKey] = "DISTSSH_ENV_TEST;junk ";
595     scope (exit)
596         environment.remove(globalEnvFilterKey);
597 
598     void verify2() {
599         cli(opts, envConf);
600         auto env = readEnv(remove_me);
601         assert(env.filter!(a => a.key == env_key).empty, env.to!string);
602     }
603 
604     verify2;
605 }
606 
607 @("shall create symlinks to self")
608 @Serial unittest {
609     string[2][] symlinks;
610     void fakeSymlink(string src, string dst) {
611         string[2] v = [src, dst];
612         symlinks ~= v;
613     }
614 
615     Config conf;
616     conf.global.selfBinary = "/foo/src";
617     conf.global.selfDir = "/bar";
618 
619     cli(conf, Config.Install.init, &fakeSymlink);
620 
621     assert(symlinks[0] == ["/foo/src", "/bar/distshell"]);
622     assert(symlinks[1] == ["/foo/src", "/bar/distcmd"]);
623 }
624 
625 @("shall modify the exported env by adding, removing and modifying")
626 @Serial unittest {
627     import std.array;
628     import std.file;
629     import std.process : environment;
630     import std.stdio;
631     import std.typecons : tuple;
632     import std.variant : tryVisit;
633 
634     // arrange
635     immutable remove_me = "remove_me.export";
636     scope (exit)
637         remove(remove_me);
638 
639     environment["FOO_DEL"] = "del me";
640     environment["FOO_MOD"] = "mod me";
641     scope (exit) {
642         environment.remove("FOO_DEL");
643         environment.remove("FOO_MOD");
644         environment.remove("FOO_ADD");
645     }
646 
647     auto conf = parseUserArgs(["distssh", "env", "-e", "--env-file", remove_me]);
648     auto envConf = conf.data.tryVisit!((Config.Env a) => a, () => Config.Env.init);
649     cli(conf, envConf);
650 
651     // act
652     conf = parseUserArgs([
653             "distssh", "env", "-d", "FOO_DEL", "-s", "FOO_MOD=42", "--set",
654             "FOO_ADD=42", "--env-file", remove_me
655             ]);
656     envConf = conf.data.tryVisit!((Config.Env a) => a, () => Config.Env.init);
657     cli(conf, envConf);
658 
659     // assert
660     auto env = readEnv(remove_me).map!(a => tuple(a.key, a.value)).assocArray;
661     assert(env["FOO_MOD"] == "42");
662     assert(env["FOO_ADD"] == "42");
663     assert("FOO_DEL" !in env);
664 }
665 
666 @("shall print the load of the localhost")
667 @Serial unittest {
668     string load;
669     auto exit_status = cli(Config.LocalLoad.init, (string s) => load = s);
670     assert(exit_status == 0);
671     assert(load.length > 0, load);
672 }
673 
674 void writeEnv(string filename, from.distssh.protocol.ProtocolEnv env) {
675     import core.sys.posix.sys.stat : fchmod, S_IRUSR, S_IWUSR;
676     import std.stdio : File;
677     import distssh.protocol : Serialize;
678 
679     auto fout = File(filename, "w");
680     fchmod(fout.fileno, S_IRUSR | S_IWUSR);
681 
682     auto ser = Serialize!(void delegate(const(ubyte)[]) @safe)((const(ubyte)[] a) => fout.rawWrite(
683             a));
684 
685     ser.pack(env);
686 }