1 /**
2 Copyright: Copyright (c) 2018, Joakim Brännström. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 
6 Methods prefixed with `cli_` are strongly related to user commands.
7 They more or less fully implement a command line interface command.
8 */
9 module distssh.main_;
10 
11 import core.time : Duration;
12 import std.algorithm : splitter, map, filter, joiner;
13 import std.exception : collectException;
14 import std.stdio : File;
15 import std.typecons : Nullable, NullableRef;
16 import logger = std.experimental.logger;
17 
18 import colorlog;
19 
20 import from_;
21 
22 import distssh.config;
23 import distssh.metric;
24 import distssh.process : Pid;
25 import distssh.types;
26 
27 static import std.getopt;
28 
/** Application entry point below `main`.
 *
 * Builds the configuration from the command line with `parseUserArgs`,
 * configures the logger and forwards to the `cli` overload that matches the
 * command group stored in `conf.data`.
 *
 * Params:
 *  args = the command line arguments
 *
 * Returns: the value returned by the executed `cli` overload.
 */
int rmain(string[] args) {
    import distssh.daemon;
    import std.file : symlink;
    import std.stdio : writeln;
    import std.variant : visit;

    // log at info level until the verbosity requested by the user is known.
    confLogger(VerboseMode.info);

    auto conf = parseUserArgs(args);

    // a help request wins over whatever command group was selected.
    if (conf.global.helpInfo.helpWanted)
        return cli(conf);

    confLogger(conf.global.verbosity);
    logger.trace(conf);

    // dfmt off
    return conf.data.visit!(
          (Config.Help help) => cli(conf),
          (Config.Shell shell) => cli(conf, shell),
          (Config.Cmd cmd) => cli(conf, cmd),
          (Config.LocalRun localRun) => cli(conf, localRun),
          (Config.Install install) => cli(conf, install, (string target, string link) => symlink(target, link)),
          (Config.MeasureHosts measure) => cli(conf, measure),
          (Config.LocalLoad localLoad) => cli(localLoad, (string line) => writeln(line)),
          (Config.RunOnAll runOnAll) => cli(conf, runOnAll),
          (Config.LocalShell localShell) => cli(conf, localShell),
          (Config.Env env) => cli(conf, env),
          (Config.Daemon daemon) => distssh.daemon.cli(conf, daemon),
    );
    // dfmt on
}
63 
64 private:
65 
/// Print the help for `conf`. Always returns 0.
int cli(Config conf) {
    conf.printHelp();
    return 0;
}
70 
/** Open an interactive shell on one of the remote hosts.
 *
 * Hosts are taken one at a time from the host cache with `randomAndPop`. A
 * session counts as successful when ssh exits with status zero or when it
 * lasted longer than twice the configured timeout; otherwise the next host is
 * tried.
 *
 * Params:
 *  fconf = the complete configuration
 *  conf = unused, selects this overload
 *
 * Returns: the exit status of the ssh session, or 1 if no host could be used.
 */
int cli(const Config fconf, Config.Shell conf) nothrow {
    import std.datetime.stopwatch : StopWatch, AutoStart;
    import std.file : thisExePath;
    import std.process : spawnProcess, wait, escapeShellFileName;
    import std.stdio : writeln, writefln;

    auto hosts = RemoteHostCache.make(fconf.global.dbPath, fconf.global.cluster);

    // a session that lasted longer than this is treated as a real session
    // even if its exit status is non-zero.
    const min_successful_session = fconf.global.timeout * 2;

    // An empty host cache skips the loop and falls through to the single
    // error message at the end (it was previously reported twice).
    while (!hosts.empty) {
        auto host = hosts.randomAndPop;

        try {
            writeln("Connecting to ", host);

            auto sw = StopWatch(AutoStart.yes);

            // two -t forces a tty to be created and used which means that the
            // remote shell will *think* it is an interactive shell
            auto exit_status = spawnProcess(["ssh", "-q", "-t",
                    "-t"] ~ sshNoLoginArgs ~ [
                    host, thisExePath.escapeShellFileName, "localshell",
                    "--workdir", fconf.global.workDir.escapeShellFileName
                    ]).wait;

            // #SPC-fallback_remote_host
            if (exit_status == 0 || sw.peek > min_successful_session) {
                writefln("Connection to %s closed.", host);
                return exit_status;
            } else {
                logger.warningf("Connection failed to %s. Falling back on next available host",
                        host);
            }
        } catch (Exception e) {
            logger.error(e.msg).collectException;
        }
    }

    logger.error("No remote host online").collectException;
    return 1;
}
116 
117 // #SPC-fast_env_startup
/** Run the configured command on one remote host taken from the host cache.
 *
 * Params:
 *  fconf = the complete configuration
 *  conf = unused, selects this overload
 *
 * Returns: 1 if the host cache is empty, otherwise the result of `executeOnHost`.
 */
int cli(const Config fconf, Config.Cmd conf) {
    auto hosts = RemoteHostCache.make(fconf.global.dbPath, fconf.global.cluster);

    if (!hosts.empty)
        return executeOnHost(fconf, hosts.randomAndPop);

    logger.errorf("No remote host online").collectException;
    return 1;
}
128 
129 // #SPC-fast_env_startup
/** Run the configured command on this host and supervise it.
 *
 * The environment for the command is read either from stdin or from the file
 * `fconf.global.importEnv`. The command is then spawned and polled until it
 * terminates, the watchdog reports a timeout or the parent process id changes.
 *
 * Params:
 *  fconf = the complete configuration
 *  conf = unused, selects this overload
 *
 * Returns: 0 if no command was given, the exit status of the command if it
 * terminated, otherwise 1.
 */
int cli(const Config fconf, Config.LocalRun conf) {
    import core.time : dur;
    import std.stdio : File, stdin;
    import std.file : exists;
    import std.process : spawnProcess, spawnShell, Pid, tryWait, thisProcessID;
    import std.process : PConfig = Config;
    import std.utf : toUTF8;
    import distssh.timer : makeTimers, makeInterval;

    // nothing to run.
    if (fconf.global.command.length == 0)
        return 0;

    // Fill `out_env` with the environment to run the command in.
    static auto updateEnv(const Config fconf, ref PipeReader pread, ref string[string] out_env) {
        import distssh.protocol : ProtocolEnv;

        ProtocolEnv env;

        if (fconf.global.stdinMsgPackEnv) {
            // poll the pipe until a complete ProtocolEnv has been unpacked.
            // NOTE(review): there is no upper bound on how long this waits.
            auto timers = makeTimers;
            makeInterval(timers, () @trusted {
                pread.update;

                try {
                    auto tmp = pread.unpack!(ProtocolEnv);
                    if (!tmp.isNull) {
                        env = tmp;
                        // presumably returning false removes the interval so
                        // the loop below ends - confirm in distssh.timer.
                        return false;
                    }
                } catch (Exception e) {
                    // deliberately ignored, a new attempt is made on the next tick.
                }
                return true;
            }, 10.dur!"msecs");
            while (!timers.empty) {
                timers.tick(10.dur!"msecs");
            }
        } else {
            env = readEnv(fconf.global.importEnv);
        }

        foreach (kv; env) {
            out_env[kv.key] = kv.value;
        }
    }

    try {
        string[string] env;
        auto pread = PipeReader(stdin.fileno);

        // a failure to read the environment is only logged, the command is
        // still started with whatever ended up in `env`.
        try {
            updateEnv(fconf, pread, env);
        } catch (Exception e) {
            logger.trace(e.msg).collectException;
        }

        Pid res;

        // the command is spawned directly if its first word exists as a path,
        // otherwise the words are joined with a space and run via a shell.
        if (exists(fconf.global.command[0])) {
            res = spawnProcess(fconf.global.command, env, PConfig.none, fconf.global.workDir);
        } else {
            res = spawnShell(fconf.global.command.dup.joiner(" ").toUTF8, env,
                    PConfig.none, fconf.global.workDir);
        }

        import core.sys.posix.unistd : getppid;

        const parent_pid = getppid;
        bool sigint_cleanup;
        bool loop_running = true;

        // a changed parent pid means the process that started this one is gone.
        bool sigintEvent() {
            if (getppid != parent_pid) {
                sigint_cleanup = true;
                loop_running = false;
            }
            return true;
        }

        auto timers = makeTimers;
        makeInterval(timers, &sigintEvent, 500.dur!"msecs");
        // a dummy event that ensures that it ticks every 50 msec.
        makeInterval(timers, () => true, 50.dur!"msecs");

        // only overwritten when the command has terminated.
        int exit_status = 1;

        auto wd = Watchdog(pread, fconf.global.timeout * 2);

        while (!wd.isTimeout && loop_running) {
            pread.update;

            try {
                auto status = tryWait(res);
                if (status.terminated) {
                    exit_status = status.status;
                    loop_running = false;
                }
                wd.update;
            } catch (Exception e) {
                // deliberately ignored, the loop polls again on the next iteration.
            }

            // #SPC-sigint_detection
            timers.tick(50.dur!"msecs");
        }

        import core.sys.posix.signal : kill, killpg, SIGKILL;
        import core.sys.posix.unistd : getpid;
        import distssh.process : cleanupProcess;

        // #SPC-early_terminate_no_processes_left
        if (wd.isTimeout) {
            // cleanup all subprocesses of sshd. Should catch those that fork and create another process group.
            cleanupProcess(.Pid(parent_pid), (int a) => kill(a, SIGKILL),
                    (int a) => killpg(a, SIGKILL));
        }

        if (sigint_cleanup || wd.isTimeout) {
            // sshd has already died on a SIGINT on the host side thus it is only possible to reliably cleanup *this* process tree if anything is left.
            cleanupProcess(.Pid(getpid), (int a) => kill(a, SIGKILL), (int a) => killpg(a, SIGKILL)).killpg(
                    SIGKILL);
        }

        return exit_status;
    } catch (Exception e) {
        logger.error(e.msg).collectException;
    }

    return 1;
}
257 
/** Create the `distShell` and `distCmd` links to the distssh binary.
 *
 * Params:
 *  fconf = the complete configuration; `selfBinary` is the link source and
 *          `selfDir` the directory the links are created in
 *  conf = unused, selects this overload
 *  symlink = called once per link with the source and the destination
 *
 * Returns: 0 if both calls to `symlink` returned normally, otherwise 1.
 */
int cli(const Config fconf, Config.Install conf, void delegate(string src, string dst) symlink) nothrow {
    import std.path : buildPath;

    int rval = 1;

    try {
        const shellLink = buildPath(fconf.global.selfDir, distShell);
        symlink(fconf.global.selfBinary, shellLink);

        const cmdLink = buildPath(fconf.global.selfDir, distCmd);
        symlink(fconf.global.selfBinary, cmdLink);

        rval = 0;
    } catch (Exception e) {
        logger.error(e.msg).collectException;
    }

    return rval;
}
271 
272 // #SPC-measure_remote_hosts
/** Print a table with the access time and load of the hosts in the cache.
 *
 * A host whose row can not be produced is skipped; the reason is logged at
 * trace level.
 *
 * Params:
 *  fconf = the complete configuration
 *  conf = unused, selects this overload
 *
 * Returns: always 0.
 */
int cli(const Config fconf, Config.MeasureHosts conf) nothrow {
    import std.conv : to;
    import std.stdio : writefln, writeln;
    import distssh.table;

    // Format a duration as "<msecs>ms" or "<seconds>s <msecs>ms".
    static string prettyDuration(Duration d) {
        import std.format : format;

        int seconds;
        short msecs;
        d.split!("seconds", "msecs")(seconds, msecs);

        return seconds == 0 ? format("%sms", msecs) : format("%ss %sms", seconds, msecs);
    }

    auto hosts = RemoteHostCache.make(fconf.global.dbPath, fconf.global.cluster);

    writeln("Host is overloaded if Load is >1").collectException;

    // the first value of `cells` is the heading, it is then reused for each host.
    string[3] cells = ["Host", "Access Time", "Load"];
    auto tbl = Table!3(cells);

    foreach (entry; hosts.remoteByLoad) {
        try {
            cells[0] = entry[0];
            cells[1] = prettyDuration(entry[1].accessTime);
            cells[2] = entry[1].loadAvg.to!string;
            tbl.put(cells);
        } catch (Exception e) {
            logger.trace(e.msg).collectException;
        }
    }

    try {
        writeln(tbl);
    } catch (Exception e) {
        logger.error(e.msg).collectException;
    }

    return 0;
}
316 
/** Print the load of localhost.
 *
 * The value is the first sample from `getloadavg` divided by the number of
 * CPUs. If no sample is available the sample is replaced by the number of
 * CPUs, which makes the printed value one.
 *
 * #SPC-measure_local_load
 *
 * Params:
 *  conf = unused, selects this overload
 *  writer = called first with a newline and then with the load as a string
 *
 * Returns: 0 on success, -1 if an exception was thrown.
 */
int cli(WriterT)(Config.LocalLoad conf, scope WriterT writer) {
    import std.ascii : newline;
    import std.conv : to;
    import std.parallelism : totalCPUs;
    import distssh.libc : getloadavg;

    try {
        double[3] samples;
        const nr_of_samples = getloadavg(&samples[0], 3);
        const double cores = totalCPUs;

        if (nr_of_samples == 0 || nr_of_samples == -1)
            samples[0] = cores > 0 ? cores : 1;

        // the load must be on its own line because the last line parsed is
        // expected to contain it.
        writer(newline);

        const load = cores > 0 ? samples[0] / cores : samples[0];
        writer(load.to!string);
    } catch (Exception e) {
        logger.trace(e.msg).collectException;
        return -1;
    }

    return 0;
}
350 
/** Run the configured command on every host in the cluster, in sorted order.
 *
 * Params:
 *  fconf = the complete configuration
 *  conf = unused, selects this overload
 *
 * Returns: 0 if `executeOnHost` returned zero for every host, otherwise 1.
 */
int cli(const Config fconf, Config.RunOnAll conf) nothrow {
    import std.algorithm : sort;
    import std.stdio : writefln, writeln, stdout;

    writefln("Configured hosts (%s): %(%s|%)", globalEnvHostKey, fconf.global.cluster)
        .collectException;

    int rval = 0;

    foreach (host; fconf.global.cluster.dup.sort) {
        stdout.writefln("Connecting to %s.", host).collectException;

        // #SPC-flush_buffers
        stdout.flush().collectException;

        const status = executeOnHost(fconf, host);
        if (status != 0) {
            writeln("Failed, error code: ", status).collectException;
            rval = 1;
        }

        stdout.writefln("Connection to %s closed.", host).collectException;
    }

    return rval;
}
379 
380 // #SPC-shell_current_dir
/** Start the user's shell and wait for it to exit.
 *
 * The shell is started in `fconf.global.workDir` if that path exists,
 * otherwise in the current working directory.
 *
 * Params:
 *  fconf = the complete configuration
 *  conf = unused, selects this overload
 *
 * Returns: the exit status of the shell, or 1 if it could not be run.
 */
int cli(const Config fconf, Config.LocalShell conf) {
    import std.file : exists;
    import std.process : spawnProcess, wait, userShell, Config, Pid;

    try {
        Pid shell = exists(fconf.global.workDir)
            ? spawnProcess([userShell], null, Config.none, fconf.global.workDir)
            : spawnProcess([userShell]);

        return wait(shell);
    } catch (Exception e) {
        logger.error(e.msg).collectException;
    }

    return 1;
}
398 
399 // #SPC-modify_env
/** Export the current environment or modify a previously exported one.
 *
 * With `conf.exportEnv` the environment returned by `cloneEnv` is written to
 * `fconf.global.importEnv`. Otherwise the stored environment is read, the
 * variables in `conf.envDel` are removed, the `KEY=VALUE` entries in
 * `conf.envSet` are applied and the result is written back.
 *
 * Params:
 *  fconf = the complete configuration
 *  conf = what to export, set, delete and whether to print the result
 *
 * Returns: 0 on success, 1 on failure.
 */
int cli(const Config fconf, Config.Env conf) {
    import std.algorithm : map, filter, findSplit;
    import std.array : assocArray, empty, array;
    import std.path : absolutePath;
    import std.stdio : writeln, writefln;
    import std.typecons : tuple;
    import distssh.protocol : ProtocolEnv, EnvVariable;

    if (conf.exportEnv) {
        try {
            writeEnv(fconf.global.importEnv, cloneEnv);
            logger.info("Exported environment to ", fconf.global.importEnv);
        } catch (Exception e) {
            logger.error(e.msg).collectException;
            return 1;
        }

        return 0;
    }

    string[string] set_envs;
    try {
        foreach (assignment; conf.envSet) {
            if (assignment.empty)
                continue;

            // split on the first '=' only so that the value may itself contain '='.
            auto parts = assignment.findSplit("=");

            // an entry without '=' is a user error, not a reason to crash.
            if (parts[1].empty)
                throw new Exception("expected KEY=VALUE but got '" ~ assignment ~ "'");

            set_envs[parts[0]] = parts[2];
        }
    } catch (Exception e) {
        writeln("Unable to parse supplied envs to modify: ", e.msg).collectException;
        return 1;
    }

    try {
        auto env = readEnv(fconf.global.importEnv.absolutePath).map!(a => tuple(a.key,
                a.value)).assocArray;

        foreach (k; conf.envDel.filter!(a => a in env)) {
            writeln("Removing ", k);
            env.remove(k);
        }

        foreach (kv; set_envs.byKeyValue) {
            if (kv.key in env)
                writefln("Setting %s=%s", kv.key, kv.value);
            else
                writefln("Adding %s=%s", kv.key, kv.value);
            env[kv.key] = kv.value;
        }

        if (conf.print) {
            foreach (const kv; env.byKeyValue)
                writefln(`%s="%s"`, kv.key, kv.value);
        }

        writeEnv(fconf.global.importEnv,
                ProtocolEnv(env.byKeyValue.map!(a => EnvVariable(a.key, a.value)).array));
    } catch (Exception e) {
        logger.error(e.msg).collectException;
        return 1;
    }

    return 0;
}
464 
@("shall export the environment")
unittest {
    import std.conv : to;
    import std.file;
    import std.process : environment;
    import std.variant : tryVisit;

    // arrange
    immutable remove_me = "remove_me.export";
    scope (exit)
        remove(remove_me);

    auto opts = parseUserArgs(["distssh", "env", "-e", "--env-file", remove_me]);
    auto envConf = opts.data.tryVisit!((Config.Env a) => a, () => Config.Env.init);

    const env_key = "DISTSSH_ENV_TEST";
    environment[env_key] = env_key ~ remove_me;
    scope (exit)
        environment.remove(env_key);

    // shall export the environment to the file
    {
        cli(opts, envConf);
        auto exported = readEnv(remove_me);
        assert(!exported.filter!(a => a.key == env_key).empty, exported.to!string);
    }

    // shall filter out specified env before exporting to the file
    environment[globalEnvFilterKey] = "DISTSSH_ENV_TEST;junk ";
    scope (exit)
        environment.remove(globalEnvFilterKey);

    {
        cli(opts, envConf);
        auto exported = readEnv(remove_me);
        assert(exported.filter!(a => a.key == env_key).empty, exported.to!string);
    }
}
508 
@("shall create symlinks to self")
unittest {
    // every call made by the code under test is recorded here.
    string[2][] recorded;
    void recordLink(string src, string dst) {
        string[2] call = [src, dst];
        recorded ~= call;
    }

    // arrange
    Config conf;
    conf.global.selfBinary = "/foo/src";
    conf.global.selfDir = "/bar";

    // act
    cli(conf, Config.Install.init, &recordLink);

    // assert
    assert(recorded[0] == ["/foo/src", "/bar/distshell"]);
    assert(recorded[1] == ["/foo/src", "/bar/distcmd"]);
}
526 
/** Execute a command on a remote host.
 *
 * Starts `ssh` towards `host` and runs this executable there in `localrun`
 * mode. The environment is sent over the stdin of ssh, followed by a
 * heartbeat roughly every 50 msec until ssh terminates.
 *
 * #SPC-automatic_env_import
 *
 * Params:
 *  conf = the complete configuration
 *  host = the host to run the command on
 *
 * Returns: the exit status of ssh, or 1 if it could not be started.
 */
int executeOnHost(const Config conf, Host host) nothrow {
    import distssh.protocol : ProtocolEnv;
    import core.thread : Thread;
    import core.time : dur;
    import std.file : thisExePath;
    import std.path : absolutePath;
    import std.process : tryWait, Redirect, pipeProcess, escapeShellFileName;

    // #SPC-draft_remote_cmd_spec
    try {
        // ssh hands the arguments to a shell on the remote side, so the path
        // to the executable is escaped the same way as the working directory.
        auto args = ["ssh"] ~ sshNoLoginArgs ~ [
            host, thisExePath.escapeShellFileName, "localrun", "--workdir",
            conf.global.workDir.escapeShellFileName, "--stdin-msgpack-env", "--"
        ] ~ conf.global.command;

        logger.info("Connecting to: ", host);
        logger.trace("run: ", args.joiner(" "));

        auto p = pipeProcess(args, Redirect.stdin);

        auto pwriter = PipeWriter(p.stdin);

        // an empty environment is sent when neither option applies.
        ProtocolEnv env;
        if (conf.global.cloneEnv)
            env = cloneEnv;
        else if (!conf.global.noImportEnv)
            env = readEnv(conf.global.importEnv.absolutePath);
        pwriter.pack(env);

        while (true) {
            try {
                auto st = p.pid.tryWait;
                if (st.terminated)
                    return st.status;

                Watchdog.ping(pwriter);
            } catch (Exception e) {
                // deliberately ignored: a failed poll or heartbeat is retried
                // on the next iteration.
            }

            Thread.sleep(50.dur!"msecs");
        }
    } catch (Exception e) {
        logger.error(e.msg).collectException;
        return 1;
    }
}
577 
@("shall modify the exported env by adding, removing and modifying")
unittest {
    import std.array;
    import std.file;
    import std.process : environment;
    import std.stdio;
    import std.typecons : tuple;
    import std.variant : tryVisit;

    // arrange: export an environment that contains the variables to change.
    immutable remove_me = "remove_me.export";
    scope (exit)
        remove(remove_me);

    environment["FOO_DEL"] = "del me";
    environment["FOO_MOD"] = "mod me";
    scope (exit) {
        environment.remove("FOO_DEL");
        environment.remove("FOO_MOD");
        environment.remove("FOO_ADD");
    }

    auto conf = parseUserArgs(["distssh", "env", "-e", "--env-file", remove_me]);
    auto envConf = conf.data.tryVisit!((Config.Env a) => a, () => Config.Env.init);
    cli(conf, envConf);

    // act: delete one variable, modify one and add one.
    conf = parseUserArgs([
            "distssh", "env", "-d", "FOO_DEL", "-s", "FOO_MOD=42", "--set",
            "FOO_ADD=42", "--env-file", remove_me
            ]);
    envConf = conf.data.tryVisit!((Config.Env a) => a, () => Config.Env.init);
    cli(conf, envConf);

    // assert
    auto stored = readEnv(remove_me).map!(a => tuple(a.key, a.value)).assocArray;
    assert(stored["FOO_MOD"] == "42");
    assert(stored["FOO_ADD"] == "42");
    assert("FOO_DEL" !in stored);
}
618 
/** A file descriptor that is switched to non-blocking mode.
 *
 * The constructor adds `O_NONBLOCK` to the status flags of the descriptor
 * and the destructor writes back the flags that were read in the constructor.
 */
struct NonblockingFd {
    int fileno;

    // the status flags of the descriptor as they were before the constructor
    // changed them.
    private const int old_fcntl;

    this(int fd) {
        this.fileno = fd;

        import core.sys.posix.fcntl : fcntl, F_SETFL, F_GETFL, O_NONBLOCK;

        old_fcntl = fcntl(fileno, F_GETFL);
        fcntl(fileno, F_SETFL, old_fcntl | O_NONBLOCK);
    }

    ~this() {
        import core.sys.posix.fcntl : fcntl, F_SETFL;

        fcntl(fileno, F_SETFL, old_fcntl);
    }

    /** Read at most `buf.length` bytes into `buf`.
     *
     * Params:
     *  buf = storage to read into. On return it is sliced to the bytes that
     *        were read; it is empty if nothing was read.
     */
    void read(ref ubyte[] buf) {
        static import core.sys.posix.unistd;

        const len = core.sys.posix.unistd.read(fileno, buf.ptr, buf.length);

        // A result of zero or less means that no data was read. The buffer
        // must then be empty, otherwise the caller would treat the untouched
        // content of the storage as received data.
        buf = len > 0 ? buf[0 .. cast(size_t) len] : buf[0 .. 0];
    }
}
647 
/** Detects that heartbeats have stopped arriving on a pipe.
 *
 * `update` looks for a heartbeat in the reader. Each heartbeat restarts the
 * measurement of elapsed time. The watchdog enters the timeout state when
 * `update` finds no heartbeat and more than `timeout` has elapsed; it never
 * leaves that state.
 */
struct Watchdog {
    import std.datetime.stopwatch : StopWatch;

    enum State {
        ok,
        timeout
    }

    private {
        State st;
        Duration timeout;
        NullableRef!PipeReader pread;
        StopWatch sw;
    }

    /**
     * Params:
     *  pread = the reader to look for heartbeats in. Only a reference is
     *          stored, so it must outlive the watchdog.
     *  timeout = the longest accepted time without a heartbeat
     */
    this(ref PipeReader pread, Duration timeout) {
        this.pread = &pread;
        this.timeout = timeout;
        sw.start;
    }

    /// Consume a heartbeat if there is one, otherwise check for a timeout.
    void update() {
        import distssh.protocol : HeartBeat;

        if (!pread.unpack!HeartBeat.isNull) {
            // resetting a running StopWatch leaves it running and start must
            // not be called on a running StopWatch, thus only start it if it
            // for some reason is stopped.
            sw.reset;
            if (!sw.running)
                sw.start;
        } else if (sw.peek > timeout) {
            st = State.timeout;
        }
    }

    /// Returns: true if the watchdog has timed out.
    bool isTimeout() {
        return State.timeout == st;
    }

    /// Send a heartbeat through `f`.
    static void ping(ref PipeWriter f) {
        import distssh.protocol : HeartBeat;

        f.pack!HeartBeat;
    }
}
690 
@("shall print the load of the localhost")
unittest {
    // the writer keeps the last string it was called with.
    string last_written;

    const status = cli(Config.LocalLoad.init, (string s) => last_written = s);

    assert(status == 0);
    assert(last_written.length > 0, last_written);
}
698 
/**
 * #SPC-env_export_filter
 *
 * Clone the environment of the current process.
 *
 * The environment variable named by `globalEnvFilterKey` is read as a `;`
 * separated list of variable names. Those variables are left out of the clone.
 *
 * Returns: a clone of the environment. It contains whatever was collected
 * before the failure if an exception is thrown.
 */
auto cloneEnv() nothrow {
    import std.process : environment;
    import std.string : strip;
    import distssh.protocol : ProtocolEnv, EnvVariable;

    ProtocolEnv app;

    try {
        auto env = environment.toAA;

        // names are stripped and empty ones ignored, thus "A; B ;" means A and B.
        foreach (k; environment.get(globalEnvFilterKey, null)
                .strip.splitter(';').map!(a => a.strip)
                .filter!(a => a.length != 0)) {
            if (env.remove(k)) {
                logger.infof("Removed '%s' from the exported environment", k);
            }
        }

        foreach (const a; env.byKeyValue) {
            app ~= EnvVariable(a.key, a.value);
        }
    } catch (Exception e) {
        logger.warning(e.msg).collectException;
    }

    return app;
}
734 
/** Feeds a deserializer with data from a non-blocking file descriptor.
 *
 * The members of the deserializer are reachable through the reader because
 * of the `alias this`.
 */
struct PipeReader {
    import distssh.protocol : Deserialize;

    NonblockingFd nfd;
    Deserialize deser;

    alias deser this;

    /// Params: fd = the file descriptor to read from
    this(int fd) {
        nfd = NonblockingFd(fd);
    }

    /// Move up to 128 bytes from the pipe to the deserializer.
    void update() nothrow {
        ubyte[128] storage;
        auto chunk = storage[];

        try {
            nfd.read(chunk);
            if (chunk.length != 0)
                deser.put(chunk);
        } catch (Exception e) {
            // deliberately ignored, the next call reads again.
        }

        deser.cleanupUntilKind;
    }
}
762 
/** Serializes values to a file.
 *
 * The members of the serializer are reachable through the writer because of
 * the `alias this`.
 */
struct PipeWriter {
    import distssh.protocol : Serialize;

    File fout;
    Serialize!(void delegate(const(ubyte)[]) @safe) ser;

    alias ser this;

    /// Params: f = the file that the serialized data is written to
    this(File f) {
        fout = f;
        // the serializer calls back into this instance.
        ser = typeof(ser)(&put);
    }

    /// Write `v` to the file and flush it.
    void put(const(ubyte)[] v) @safe {
        fout.rawWrite(v);
        fout.flush();
    }
}
781 
/** Read an environment that has been stored in a file.
 *
 * Params:
 *  filename = the file to read
 *
 * Returns: the stored environment. An empty environment is returned if the
 * file does not exist, can not be read or contains no environment.
 */
from.distssh.protocol.ProtocolEnv readEnv(string filename) nothrow {
    import distssh.protocol : ProtocolEnv, EnvVariable, Deserialize;
    import std.file : exists;
    import std.stdio : File;

    ProtocolEnv rval;

    if (!exists(filename)) {
        logger.info("File to import the environment from do not exist: ",
                filename).collectException;
        return rval;
    }

    try {
        auto fin = File(filename);
        Deserialize deser;

        ubyte[128] buf;
        while (!fin.eof) {
            auto read_ = fin.rawRead(buf[]);
            deser.put(read_);
        }

        // the result is null when the data holds no complete environment,
        // e.g. an empty or truncated file. It must not be assigned in that case.
        auto unpacked = deser.unpack!(ProtocolEnv);
        if (unpacked.isNull)
            logger.warningf("No environment found in '%s'", filename);
        else
            rval = unpacked;
    } catch (Exception e) {
        logger.error(e.msg).collectException;
        logger.errorf("Unable to import environment from '%s'", filename).collectException;
    }

    return rval;
}
813 
/** Store an environment in a file.
 *
 * The file is opened for writing and its mode is set to read and write for
 * the owner only before the data is written.
 *
 * Params:
 *  filename = the file to write
 *  env = the environment to store
 */
void writeEnv(string filename, from.distssh.protocol.ProtocolEnv env) {
    import core.sys.posix.sys.stat : fchmod, S_IRUSR, S_IWUSR;
    import std.stdio : File;
    import distssh.protocol : Serialize;

    alias Sink = void delegate(const(ubyte)[]) @safe;

    auto fout = File(filename, "w");
    fchmod(fout.fileno, S_IRUSR | S_IWUSR);

    // everything the serializer produces goes straight to the file.
    auto ser = Serialize!Sink((const(ubyte)[] bytes) => fout.rawWrite(bytes));
    ser.pack(env);
}
827 
/**
  * Searches all dirs on PATH for exe if required,
  * or simply returns it if it is a relative or absolute path.
  *
  * Params:
  *  exe = the name of, or path to, an executable
  *
  * Returns: `exe` unchanged if it is empty, starts with a directory separator
  * or a dot, or is not found in any directory on PATH. Otherwise the first
  * existing path made up of a PATH directory and `exe`.
  */
string pathToExe(string exe) {
    import std.algorithm : filter, map, splitter, startsWith;
    import std.file : exists;
    import std.path : buildPath, dirSeparator, pathSeparator;
    import std.process : environment;

    // there is nothing to search for. Without this guard every directory on
    // PATH would match because buildPath(dir, "") is the directory itself.
    if (exe.length == 0)
        return exe;

    // if it already has a / or . at the start, assume the exe is correct
    if (exe.startsWith(dirSeparator, "."))
        return exe;

    // an unset PATH is treated as an empty one instead of throwing.
    auto matches = environment.get("PATH", "").splitter(pathSeparator)
        .map!(dir => buildPath(dir, exe)).filter!(candidate => exists(candidate));

    return matches.empty ? exe : matches.front;
}