1 /**
2 Copyright: Copyright (c) 2018, Joakim Brännström. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 
Functions named `cli` are strongly related to user commands.
Each overload more or less fully implements one command line interface command.
8 */
9 module distssh.main_;
10 
11 import core.time : Duration;
12 import logger = std.experimental.logger;
13 import std.algorithm : splitter, map, filter, joiner;
14 import std.array : empty;
15 import std.exception : collectException;
16 import std.stdio : File;
17 import std.typecons : Nullable, NullableRef;
18 
19 import colorlog;
20 
21 import my.from_;
22 import my.path;
23 
24 import distssh.config;
25 import distssh.metric;
26 import distssh.types;
27 import distssh.utility;
28 
29 static import std.getopt;
30 
31 import unit_threaded.attrs : Serial;
32 
33 version (unittest) {
34     import unit_threaded.assertions;
35 }
36 
/** Parse the command line and dispatch to the `cli` overload that implements
 * the selected command.
 *
 * Params:
 *  args = the raw command line arguments, including the program name.
 *
 * Returns: the exit status of the executed command.
 */
int rmain(string[] args) {
    import distssh.daemon;
    static import distssh.purge;

    // log at info level until the verbosity requested by the user is known.
    confLogger(VerboseMode.info);

    auto conf = parseUserArgs(args);

    if (conf.global.helpInfo.helpWanted) {
        return cli(conf);
    }

    confLogger(conf.global.verbosity);
    logger.trace(conf);

    import core.stdc.signal : signal;
    import core.sys.posix.signal : SIGPIPE;
    import std.file : symlink;
    import std.stdio : writeln;
    import std.variant : visit;

    // register a handler for writing to a closed pipe in case it ever happens.
    // the handler ignores it.
    signal(SIGPIPE, &handleSIGPIPE);

    // one handler per command type; each returns the exit status of the command.
    // dfmt off
    return conf.data.visit!(
          (Config.Help a) => cli(conf),
          (Config.Shell a) => cli(conf, a),
          (Config.Cmd a) => cli(conf, a),
          (Config.LocalRun a) => cli(conf, a),
          (Config.Install a) => cli(conf, a, (string src, string dst) => symlink(src, dst)),
          (Config.MeasureHosts a) => cli(conf, a),
          (Config.LocalLoad a) => cli(a, (string s) => writeln(s)),
          (Config.RunOnAll a) => cli(conf, a),
          (Config.LocalShell a) => cli(conf, a),
          (Config.Env a) => cli(conf, a),
          (Config.Daemon a) => distssh.daemon.cli(conf, a),
          (Config.Purge a) => distssh.purge.cli(conf, a),
          (Config.LocalPurge a) => distssh.purge.cli(conf, a),
    );
    // dfmt on
}
79 
80 private:
81 
/** Signal handler that does nothing.
 *
 * Used to ignore SIGPIPE.
 */
extern (C) void handleSIGPIPE(int sig) @system nothrow @nogc {
    // intentionally empty
}
85 
/// Calls `printHelp` on the configuration. Always returns 0.
int cli(Config conf) {
    conf.printHelp();

    return 0;
}
90 
/** Open a shell on a remote host.
 *
 * The hosts yielded by `bestSelectRange` are tried in order. A connection that
 * ends with a non-zero exit status before twice the configured timeout has
 * elapsed is considered failed and the next host is tried.
 *
 * Params:
 *  fconf = the full configuration (database path, cluster, timeout, working directory).
 *  conf = the command specific configuration (unused).
 *
 * Returns: the exit status of the spawned process, or 1 if no host could be used.
 */
int cli(const Config fconf, Config.Shell conf) {
    import std.datetime.stopwatch : StopWatch, AutoStart;
    import std.process : spawnProcess, wait;
    import std.stdio : writeln, writefln;
    import distssh.connection : sshShellArgs;

    // an empty range is not reported here. The loop below is then skipped and
    // the error is logged exactly once before returning.
    auto hosts = RemoteHostCache.make(fconf.global.dbPath, fconf.global.cluster).bestSelectRange;

    auto bgBeat = BackgroundClientBeat(AbsolutePath(fconf.global.dbPath));

    // a session that lasted longer than this is treated as a successful
    // connection even if the exit status is non-zero.
    const minSuccessfulConnectionTime = fconf.global.timeout * 2;

    foreach (host; hosts) {
        try {
            writeln("Connecting to ", host);

            auto sw = StopWatch(AutoStart.yes);

            auto exit_status = spawnProcess(sshShellArgs(host,
                    fconf.global.workDir.get.Path).toArgs).wait;

            // #SPC-fallback_remote_host
            if (exit_status == 0 || sw.peek > minSuccessfulConnectionTime) {
                writefln("Connection to %s closed.", host);
                return exit_status;
            } else {
                logger.warningf("Connection failed to %s. Falling back on next available host",
                        host);
            }
        } catch (Exception e) {
            logger.error(e.msg).collectException;
        }
    }

    logger.error("No remote host online").collectException;
    return 1;
}
132 
// #SPC-fast_env_startup
/** Execute the user supplied command on the first host yielded by
 * `bestSelectRange`.
 *
 * Returns: the result of `executeOnHost`, or 1 if no command was given or no
 * host is available.
 */
int cli(Config fconf, Config.Cmd conf) {
    import std.stdio : stdout;

    const bool missingCommand = fconf.global.command.empty;
    if (missingCommand) {
        logger.error("No command specified");
        logger.error("Specify by adding -- <my command>");
        return 1;
    }

    auto bgBeat = BackgroundClientBeat(AbsolutePath(fconf.global.dbPath));

    auto candidates = RemoteHostCache.make(fconf.global.dbPath, fconf.global.cluster)
        .bestSelectRange;

    // only the first candidate is ever used; the body returns unconditionally.
    foreach (target; candidates) {
        logger.info("Connecting to ", target);

        // flush so that "Connecting to" ends up at the top of e.g. a logfile
        // when the user redirects the output: distcmd env > log.txt
        stdout.flush;

        return executeOnHost(ExecuteOnHostConf(fconf.global.workDir, fconf.global.command,
                fconf.global.importEnv, fconf.global.cloneEnv, fconf.global.noImportEnv), target);
    }

    logger.error("No remote host online from the available ",
            fconf.global.cluster).collectException;

    return 1;
}
158 
// #SPC-fast_env_startup
/** Execute a command on the local host and relay its input/output.
 *
 * The command and working directory are initially taken from `fconf`. When
 * `fconf.global.stdinMsgPackEnv` is set, protocol messages read from stdin may
 * override them and provide the environment and terminal mode; otherwise the
 * environment is read from the file `fconf.global.importEnv`.
 *
 * While the command runs its stdout/stderr are copied to the stdout/stderr of
 * this process, `Key` messages from stdin are written to the stdin of the
 * command and `HeartBeat` messages feed the heartbeat monitor. The loop ends
 * when the command has exited, the heartbeat monitor reports a timeout or the
 * parent process id has changed.
 *
 * Returns: the exit status of the command, or 1 if the loop ended before the
 * command exited or an exception occurred.
 */
int cli(Config fconf, Config.LocalRun conf) {
    import core.time : dur;
    import std.datetime : Clock;
    import std.file : exists;
    import std.process : PConfig = Config, Redirect, userShell, thisProcessID;
    import std.stdio : File, stdin, stdout, stderr;
    import std.utf : toUTF8;
    import proc;
    import sumtype;
    import my.timer : makeTimers, makeInterval;
    import distssh.protocol;

    /// Everything needed to start the command.
    static struct LocalRunConf {
        import core.sys.posix.termios;

        string[string] env;
        string[] cmd;
        Path workdir;
        termios mode;
    }

    /// Convert the environment received over the protocol to an associative array.
    static string[string] readEnvFromStdin(ProtocolEnv src) {
        string[string] rval;
        foreach (kv; src) {
            rval[kv.key] = kv.value;
        }
        return rval;
    }

    /// Read the environment from the file `fconf.global.importEnv`.
    static string[string] readEnvFromFile(const Config fconf) {
        string[string] rval;
        foreach (kv; readEnv(fconf.global.importEnv.get.Path)) {
            rval[kv.key] = kv.value;
        }
        return rval;
    }

    try {
        auto pread = PipeReader(stdin.fileno);

        auto localConf = () {
            LocalRunConf lconf;
            lconf.cmd = fconf.global.command.get;
            lconf.workdir = fconf.global.workDir.get.Path;

            // guard against an error occuring when transfering the data such
            // that ConfDone is trashed.
            const timeout = Clock.currTime + 5.dur!"minutes";

            // only read the configuration from stdin if the user asked for it.
            bool running = fconf.global.stdinMsgPackEnv.get;
            while (running && Clock.currTime < timeout) {
                pread.update;
                pread.unpack().match!((None x) {}, (ConfDone x) {
                    running = false;
                }, (ProtocolEnv x) { lconf.env = readEnvFromStdin(x); }, (HeartBeat x) {
                }, (Command x) { lconf.cmd = x.value; }, (Workdir x) {
                    lconf.workdir = x.value.Path;
                }, (Key x) {}, (TerminalCapability x) { lconf.mode = x.value; });
            }

            if (!fconf.global.stdinMsgPackEnv) {
                lconf.env = readEnvFromFile(fconf);
            }

            return lconf;
        }();

        auto res = () {
            if (conf.useFakeTerminal) {
                import core.sys.posix.termios : tcsetattr, TCSAFLUSH;

                auto p = ttyProcess([userShell] ~ shellSwitch(userShell) ~ [
                        localConf.cmd.joiner(" ").toUTF8
                        ], localConf.env, PConfig.none, localConf.workdir).sandbox.scopeKill;
                // apply the terminal mode received from the client.
                tcsetattr(p.stdin.file.fileno, TCSAFLUSH, &localConf.mode);
                return p;
            }
            return pipeShell(localConf.cmd.joiner(" ").toUTF8,
                    Redirect.stdin | Redirect.stdout | Redirect.stderr,
                    localConf.env, PConfig.none, localConf.workdir).sandbox.scopeKill;
        }();

        import core.sys.posix.unistd : getppid;

        const parent_pid = getppid;
        bool loop_running = true;

        auto timers = makeTimers;
        makeInterval(timers, () {
            // detect ctrl+c on the client side
            if (getppid != parent_pid) {
                loop_running = false;
            }
            return 500.dur!"msecs";
        }, 50.dur!"msecs");

        ubyte[4096] buf;
        /// Copy pending stdout/stderr data from the command to the user.
        /// Returns: true if anything was written.
        bool pipeOutputToUser() @trusted {
            // guard against an error occuring when writing to the users
            // stdout.
            const timeout = Clock.currTime + 5.dur!"minutes";

            bool hasWritten;
            {
                bool doFlush;
                while (res.stdout.hasPendingData && Clock.currTime < timeout) {
                    auto r = res.stdout.read(buf[]);
                    stdout.rawWrite(r);
                    hasWritten = true;
                    doFlush = true;
                }
                if (doFlush) {
                    stdout.flush;
                }
            }

            {
                bool doFlush;
                while (res.stderr.hasPendingData && Clock.currTime < timeout) {
                    auto r = res.stderr.read(buf[]);
                    stderr.rawWrite(r);
                    hasWritten = true;
                    doFlush = true;
                }
                if (doFlush) {
                    stderr.flush;
                }
            }

            return hasWritten;
        }

        // forward whatever output is still pending when leaving the scope.
        scope (exit)
            pipeOutputToUser;

        makeInterval(timers, () @safe {
            if (pipeOutputToUser) {
                return 10.dur!"msecs";
            }
            // slower if not much is happening
            return 100.dur!"msecs";
        }, 25.dur!"msecs");

        // a dummy event that ensure that it tick each 50 msec.
        makeInterval(timers, () => 50.dur!"msecs", 50.dur!"msecs");

        int exit_status = 1;

        auto wd = HeartBeatMonitor(fconf.global.timeout * 2);

        while (!wd.isTimeout && loop_running) {
            pread.update;

            try {
                if (res.tryWait) {
                    exit_status = res.status;
                    loop_running = false;
                }

                pread.unpack().match!((None x) {}, (ConfDone x) {}, (ProtocolEnv x) {
                }, (HeartBeat x) { wd.beat; }, (Command x) {}, (Workdir x) {}, (Key x) {
                    auto data = x.value;
                    while (!data.empty) {
                        // write only what remains; after a short write the
                        // already written prefix must not be sent again.
                        auto written = res.stdin.write(data);
                        if (written.empty) {
                            // closed so break.
                            break;
                        }
                        data = data[written.length .. $];
                    }
                }, (TerminalCapability x) {});
            } catch (Exception e) {
                // NOTE(review): exceptions are ignored here, presumably so a
                // transient failure does not stop the relay loop - confirm.
            }

            timers.tick(25.dur!"msecs");
        }

        return exit_status;
    } catch (Exception e) {
        () @trusted { logger.trace(e).collectException; }();
        logger.error(e.msg).collectException;
    }

    return 1;
}
345 
/** Create the links `distShell` and `distCmd` in `selfDir`, both pointing at
 * `selfBinary`. An already existing destination is removed first.
 *
 * Params:
 *  fconf = the full configuration, provides `selfBinary` and `selfDir`.
 *  conf = the command specific configuration (unused).
 *  symlink = the function used to create a link from `src` to `dst`.
 *
 * Returns: 0 on success, 1 if an exception occurred.
 */
int cli(const Config fconf, Config.Install conf, void delegate(string src, string dst) symlink) nothrow {
    import std.file : exists, remove;
    import std.path : buildPath;

    // remove whatever is at `linkPath` and link it to `target`.
    void relink(string target, string linkPath) {
        if (exists(linkPath)) {
            remove(linkPath);
        }
        symlink(target, linkPath);
    }

    try {
        foreach (linkName; [distShell, distCmd]) {
            relink(fconf.global.selfBinary.get, buildPath(fconf.global.selfDir.get, linkName));
        }
        return 0;
    } catch (Exception e) {
        logger.error(e.msg).collectException;
    }

    return 1;
}
366 
// #SPC-measure_remote_hosts
/** Print a table with host, load, access time, last update and multiplex
 * status for the hosts in the cluster.
 *
 * The hosts from `onlineRange` are printed first, sorted by ascending load. If
 * `unusedRange` is not empty a separator row follows and then those hosts.
 *
 * Returns: always 0.
 */
int cli(const Config fconf, Config.MeasureHosts conf) nothrow {
    import std.algorithm : sort;
    import std.conv : to;
    import std.stdio : writefln, writeln;
    import distssh.table;
    import distssh.connection;

    writeln("Host is overloaded if Load is >1").collectException;

    auto report = Table!5(["Host", "Load", "Access Time", "Updated", "Multiplex"]);

    // append one row describing `entry`; a failure is only logged at trace level.
    void appendHost(HostLoad entry) nothrow {
        // render a duration as "<s>s <ms>ms", leaving out the seconds when zero.
        static string humanize(Duration d) {
            import std.format : format;

            int secs;
            short millis;
            d.split!("seconds", "msecs")(secs, millis);
            return secs == 0 ? format("%sms", millis) : format("%ss %sms", secs, millis);
        }

        string[5] cells;
        try {
            cells[0] = entry.host;
            cells[1] = entry.load.loadAvg.to!string;
            cells[2] = humanize(entry.load.accessTime);
            cells[3] = entry.updated.toLocalTime.to!string;
            cells[4] = makeMaster(entry.host).isAlive ? "yes" : "no";
            report.put(cells);
        } catch (Exception e) {
            logger.trace(e.msg).collectException;
        }
    }

    auto cache = RemoteHostCache.make(fconf.global.dbPath, fconf.global.cluster);

    foreach (online; cache.onlineRange.sort!((a, b) => a.load < b.load)) {
        appendHost(online);
    }

    auto notUsed = cache.unusedRange;
    if (!notUsed.empty) {
        // separator between the online and the unused hosts.
        report.put(["-", "-", "-", "-", "-"]).collectException;
        foreach (idle; notUsed) {
            appendHost(idle);
        }
    }

    try {
        writeln(report);
    } catch (Exception e) {
        logger.error(e.msg).collectException;
    }

    return 0;
}
426 
/** Write the load of localhost via `writer`.
 *
 * The first sample from `getloadavg` is divided by the number of CPUs when
 * that number is positive. If no sample could be read the number of CPUs
 * (at least 1) is used as the sample.
 *
 * #SPC-measure_local_load
 *
 * Params:
 *  conf = the command specific configuration (unused).
 *  writer = callable that receives first a newline and then the load as a string.
 *
 * Returns: 0 on success, -1 if an exception occurred.
 */
int cli(WriterT)(Config.LocalLoad conf, scope WriterT writer) {
    import std.ascii : newline;
    import std.conv : to;
    import std.parallelism : totalCPUs;
    import distssh.libc : getloadavg;

    try {
        double[3] samplesBuf;
        const int nrSamples = getloadavg(&samplesBuf[0], 3);

        const bool noSample = nrSamples == -1 || nrSamples == 0;
        if (noSample) {
            samplesBuf[0] = totalCPUs > 0 ? totalCPUs : 1;
        }

        const double cpuCount = totalCPUs;

        // the last line parsed is expected to contain the loadavg, thus make
        // sure it is on a line of its own.
        writer(newline);

        if (cpuCount > 0) {
            writer((samplesBuf[0] / cpuCount).to!string);
        } else {
            writer(samplesBuf[0].to!string);
        }
    } catch (Exception e) {
        logger.trace(e.msg).collectException;
        return -1;
    }

    return 0;
}
460 
/** Execute the user supplied command on every host in the cluster whose load
 * is not unknown.
 *
 * Returns: 0 if `executeOnHost` returned zero for all hosts, otherwise 1.
 */
int cli(Config fconf, Config.RunOnAll conf) nothrow {
    import std.algorithm : sort;
    import std.stdio : writefln, writeln, stdout;

    auto cache = RemoteHostCache.make(fconf.global.dbPath, fconf.global.cluster);

    writefln("Hosts (%s): %(%s|%)", globalEnvHostKey, cache.allRange.map!(a => a.host))
        .collectException;

    size_t failures;

    auto targets = cache.allRange
        .filter!(a => !a.load.unknown)
        .map!(a => a.host);

    foreach (target; targets) {
        stdout.writefln("Connecting to %s", target).collectException;
        try {
            // #SPC-flush_buffers
            stdout.flush;
        } catch (Exception e) {
        }

        const rc = executeOnHost(ExecuteOnHostConf(fconf.global.workDir, fconf.global.command,
                fconf.global.importEnv, fconf.global.cloneEnv, fconf.global.noImportEnv), target);

        if (rc != 0) {
            writeln("Failed, error code: ", rc).collectException;
            ++failures;
        }

        stdout.writefln("Connection to %s closed.", target).collectException;
    }

    return failures == 0 ? 0 : 1;
}
494 
// #SPC-shell_current_dir
/** Start the shell of the user and wait for it to finish.
 *
 * The shell is started in `fconf.global.workDir` if that path exists,
 * otherwise `spawnProcess` is called without a working directory.
 *
 * Returns: the exit status of the shell, or 1 if an exception occurred.
 */
int cli(const Config fconf, Config.LocalShell conf) {
    import std.file : exists;
    import std.process : spawnProcess, wait, userShell, Config, Pid;

    try {
        Pid shell;
        if (exists(fconf.global.workDir.get)) {
            shell = spawnProcess([userShell], null, Config.none, fconf.global.workDir.get);
        } else {
            shell = spawnProcess([userShell]);
        }
        return wait(shell);
    } catch (Exception e) {
        logger.error(e.msg).collectException;
    }

    return 1;
}
512 
// #SPC-modify_env
/** Export the current environment to the env file or modify an existing one.
 *
 * When `conf.exportEnv` is set the result of `cloneEnv` is written to
 * `fconf.global.importEnv`. Otherwise the env file is read, the variables in
 * `conf.envDel` are removed, the `KEY=VALUE` pairs in `conf.envSet` are
 * set/added and the result is written back to the file.
 *
 * A pair is split on the first `=` only, thus the value may itself contain
 * `=`. Empty entries are ignored. An entry without `=` or with an empty key is
 * reported as an error.
 *
 * Returns: 0 on success, 1 on failure.
 */
int cli(const Config fconf, Config.Env conf) {
    import std.algorithm : map, filter;
    import std.array : assocArray, array;
    import std.format : format;
    import std.stdio : writeln, writefln;
    import std.string : indexOf;
    import std.typecons : tuple;
    import distssh.protocol : ProtocolEnv, EnvVariable;

    if (conf.exportEnv) {
        try {
            writeEnv(fconf.global.importEnv.get, cloneEnv);
            logger.info("Exported environment to ", fconf.global.importEnv.get);
        } catch (Exception e) {
            logger.error(e.msg).collectException;
            return 1;
        }

        return 0;
    }

    string[string] set_envs;
    try {
        foreach (kv; conf.envSet) {
            if (kv.empty)
                continue;

            // split on the first '=' only such that the value may contain '='.
            const sep = kv.indexOf('=');
            if (sep <= 0)
                throw new Exception(format("expected KEY=VALUE but got '%s'", kv));

            set_envs[kv[0 .. sep]] = kv[sep + 1 .. $];
        }
    } catch (Exception e) {
        writeln("Unable to parse supplied envs to modify: ", e.msg).collectException;
        return 1;
    }

    try {
        auto env = readEnv(fconf.global.importEnv.get.Path).map!(a => tuple(a.key,
                a.value)).assocArray;

        foreach (k; conf.envDel.filter!(a => a in env)) {
            writeln("Removing ", k);
            env.remove(k);
        }

        foreach (kv; set_envs.byKeyValue) {
            if (kv.key in env)
                writefln("Setting %s=%s", kv.key, kv.value);
            else
                writefln("Adding %s=%s", kv.key, kv.value);
            env[kv.key] = kv.value;
        }

        if (conf.print) {
            foreach (const kv; env.byKeyValue)
                writefln(`%s="%s"`, kv.key, kv.value);
        }

        writeEnv(fconf.global.importEnv.get,
                ProtocolEnv(env.byKeyValue.map!(a => EnvVariable(a.key, a.value)).array));
    } catch (Exception e) {
        logger.error(e.msg).collectException;
        return 1;
    }

    return 0;
}
577 
@("shall export the environment")
@Serial unittest {
    import std.conv : to;
    import std.file;
    import std.process : environment;
    import std.variant : tryVisit;

    // arrange
    immutable exportFile = "remove_me.export".Path;
    scope (exit)
        remove(exportFile);

    auto opts = parseUserArgs(["distssh", "env", "-e", "--env-file", exportFile]);
    auto envConf = opts.data.tryVisit!((Config.Env a) => a, () => Config.Env.init);

    const envKey = "DISTSSH_ENV_TEST";
    environment[envKey] = envKey ~ exportFile;
    scope (exit)
        environment.remove(envKey);

    // shall export the environment to the file
    {
        // test normal export
        cli(opts, envConf);
        auto exported = readEnv(exportFile);
        assert(!exported.filter!(a => a.key == envKey).empty, exported.to!string);
    }

    // shall filter out specified env before exporting to the file
    environment[globalEnvFilterKey] = "DISTSSH_ENV_TEST;junk ";
    scope (exit)
        environment.remove(globalEnvFilterKey);

    {
        cli(opts, envConf);
        auto exported = readEnv(exportFile);
        assert(exported.filter!(a => a.key == envKey).empty, exported.to!string);
    }
}
621 
@("shall create symlinks to self")
@Serial unittest {
    // records every (src, dst) pair instead of touching the file system.
    string[2][] recorded;
    void recordSymlink(string src, string dst) {
        string[2] pair;
        pair[0] = src;
        pair[1] = dst;
        recorded ~= pair;
    }

    Config conf;
    conf.global.selfBinary = typeof(conf.global.selfBinary)("/foo/src");
    conf.global.selfDir = typeof(conf.global.selfDir)("/bar");

    cli(conf, Config.Install.init, &recordSymlink);

    assert(recorded[0] == ["/foo/src", "/bar/distshell"]);
    assert(recorded[1] == ["/foo/src", "/bar/distcmd"]);
}
639 
@("shall modify the exported env by adding, removing and modifying")
@Serial unittest {
    import std.array;
    import std.file;
    import std.process : environment;
    import std.stdio;
    import std.typecons : tuple;
    import std.variant : tryVisit;

    // arrange
    immutable exportFile = "remove_me.export".Path;
    scope (exit)
        remove(exportFile);

    environment["FOO_DEL"] = "del me";
    environment["FOO_MOD"] = "mod me";
    scope (exit) {
        environment.remove("FOO_DEL");
        environment.remove("FOO_MOD");
        environment.remove("FOO_ADD");
    }

    // export the current environment such that there is a file to modify.
    auto parsed = parseUserArgs(["distssh", "env", "-e", "--env-file", exportFile]);
    auto envConf = parsed.data.tryVisit!((Config.Env a) => a, () => Config.Env.init);
    cli(parsed, envConf);

    // act
    parsed = parseUserArgs([
            "distssh", "env", "-d", "FOO_DEL", "-s", "FOO_MOD=42", "--set",
            "FOO_ADD=42", "--env-file", exportFile
            ]);
    envConf = parsed.data.tryVisit!((Config.Env a) => a, () => Config.Env.init);
    cli(parsed, envConf);

    // assert
    auto stored = readEnv(exportFile).map!(a => tuple(a.key, a.value)).assocArray;
    assert(stored["FOO_MOD"] == "42");
    assert(stored["FOO_ADD"] == "42");
    assert("FOO_DEL" !in stored);
}
680 
@("shall print the load of the localhost")
@Serial unittest {
    // holds whatever was last handed to the writer.
    string captured;
    const status = cli(Config.LocalLoad.init, (string s) => captured = s);

    assert(status == 0);
    assert(captured.length > 0, captured);
}
688 
/** Serialize `env` to the file `filename`.
 *
 * The file is opened for writing and its mode is set to read/write for the
 * owner only. A failure to change the mode is logged as a warning because the
 * file then keeps the permissions it was created with.
 *
 * Params:
 *  filename = the file to write to.
 *  env = the environment to serialize.
 */
void writeEnv(string filename, from.distssh.protocol.ProtocolEnv env) {
    import core.sys.posix.sys.stat : fchmod, S_IRUSR, S_IWUSR;
    import std.stdio : File;
    import distssh.protocol : Serialize;

    auto fout = File(filename, "w");
    if (fchmod(fout.fileno, S_IRUSR | S_IWUSR) != 0) {
        logger.warning("Unable to restrict the permissions of ", filename, " to the owner");
    }

    auto ser = Serialize!(void delegate(const(ubyte)[]) @safe)((const(ubyte)[] a) => fout.rawWrite(
            a));

    ser.pack(env);
}