1 /**
2 Copyright: Copyright (c) 2020, Joakim Brännström. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 */
6 module distssh.utility;
7 
8 import core.time : Duration;
9 import std.algorithm : splitter, map, filter, joiner;
10 import std.array : empty;
11 import std.exception : collectException;
12 import std.stdio : File;
13 import std.typecons : Nullable, NullableRef;
14 import logger = std.experimental.logger;
15 
16 import colorlog;
17 
18 import my.from_;
19 import my.fswatch : FdPoller, PollStatus, PollEvent, PollResult, FdPoll;
20 import my.named_type;
21 import my.path;
22 
23 import distssh.config;
24 import distssh.metric;
25 import distssh.types;
26 
/// Configuration for `executeOnHost`: what to run and which environment to
/// send along with it.
struct ExecuteOnHostConf {
    /// Working directory; sent to the other side as a `Workdir` message.
    NamedType!(string, Tag!"Workdir") workDir;
    /// Command with its arguments; sent to the other side as a `Command` message.
    NamedType!(string[], Tag!"Command", null, Lengthable) command;
    /// File that `readEnv` imports the environment from. Only consulted when
    /// neither `cloneEnv` nor `noImportEnv` is set.
    NamedType!(string, Tag!"ImportEnvFile") importEnv;
    /// When set the environment is produced by `cloneEnv()`; this takes
    /// precedence over importing from `importEnv`.
    NamedType!(bool, Tag!"CloneEnv") cloneEnv;
    /// When set (and `cloneEnv` is not) no file is imported and an empty
    /// environment is sent.
    NamedType!(bool, Tag!"NoImportEnv") noImportEnv;
}
34 
/// Returns: true only when both stdout and stderr are reported as interactive
/// by `my.tty`.
bool isInteractive() {
    import my.tty;

    // stdout is checked first; stderr is only consulted if stdout is interactive.
    if (!isStdoutInteractive)
        return false;
    if (!isStderrInteractive)
        return false;
    return true;
}
40 
/** Execute a command on a remote host.
 *
 * Spawns the argument vector produced by `sshCmdArgs` for `host` with the
 * child's stdin redirected to a pipe. The environment, the command and the
 * working directory are written to that pipe as protocol messages. After that
 * local stdin and periodic heartbeats are forwarded over the pipe until the
 * spawned process terminates.
 *
 * #SPC-automatic_env_import
 *
 * Params:
 *  conf = what to run and how to obtain the environment that is sent.
 *  host = the host handed to `sshCmdArgs`.
 *
 * Returns: the exit status of the spawned process, or 1 if an exception
 * escaped the setup (the exception message is logged as an error).
 */
int executeOnHost(const ExecuteOnHostConf conf, Host host) nothrow {
    // NOTE(review): Thread, MonoTime, thisExePath, format, absolutePath and
    // setupMultiplexDir appear to be unused in this function.
    import core.thread : Thread;
    import core.time : dur, MonoTime;
    import std.file : thisExePath;
    import std.format : format;
    import std.path : absolutePath;
    import std.process : tryWait, Redirect, pipeProcess;
    import std.stdio : stdin, stdout, stderr;
    static import core.sys.posix.unistd;

    import my.timer : makeInterval, makeTimers;
    import my.tty : setCBreak, CBreak;
    import distssh.protocol : ProtocolEnv, ConfDone, Command, Workdir, Key, TerminalCapability;
    import distssh.connection : sshCmdArgs, setupMultiplexDir;

    try {
        // Sampled once. Decides whether a pseudo terminal is requested, whether
        // the terminal attributes are sent and whether the console mode is changed.
        const isInteractive_ = isInteractive;

        // Reset the console on every exit path of this try block.
        // NOTE(review): the reset also runs when an exception occurs before
        // setCBreak below has been called, i.e. on a default initialized
        // CBreak - confirm that CBreak.reset tolerates that.
        CBreak consoleChange;
        scope (exit)
            () {
            if (isInteractive_) {
                consoleChange.reset(stdin.fileno);
                consoleChange.reset(stdout.fileno);
                consoleChange.reset(stderr.fileno);
            }
        }();

        // Extra arguments handed to sshCmdArgs; the pseudo terminal flag is
        // only added for interactive use.
        auto args = () {
            auto a = ["localrun", "--stdin-msgpack-env"];
            if (isInteractive_) {
                a ~= "--pseudo-terminal";
            }
            return sshCmdArgs(host, a).toArgs;
        }();

        logger.tracef("Connecting to %s. Run %s", host, args.joiner(" "));

        // Only stdin of the child is redirected to a pipe.
        auto p = pipeProcess(args, Redirect.stdin);

        auto pwriter = PipeWriter(p.stdin);

        // Environment to send, in order of precedence: a clone of the current
        // environment, an environment imported from file, an empty environment.
        ProtocolEnv env = () {
            if (conf.cloneEnv)
                return cloneEnv;
            else if (!conf.noImportEnv)
                return readEnv(conf.importEnv.get.Path);
            return ProtocolEnv.init;
        }();
        pwriter.pack(env);
        pwriter.pack(Command(conf.command.get.dup));
        pwriter.pack(Workdir(conf.workDir.get));
        if (isInteractive_) {
            import core.sys.posix.termios;

            // File descriptor 1 is stdout. The terminal attributes are only
            // sent if they could be read.
            termios mode;
            if (tcgetattr(1, &mode) == 0) {
                pwriter.pack(TerminalCapability(mode));
            }
        }
        // Presumably tells the other side that the configuration is complete
        // - confirm against distssh.protocol.
        pwriter.pack!ConfDone;

        // Watch local stdin for readable data.
        FdPoller poller;
        poller.put(FdPoll(stdin.fileno), [PollEvent.in_]);

        // Change the terminal mode of stdin; undone by the scope(exit) above.
        if (isInteractive_) {
            consoleChange = setCBreak(stdin.fileno);
        }

        // NOTE(review): the comments on the timers below assume that the value
        // returned by a callback is the delay until its next invocation and
        // that the last argument of makeInterval is the initial delay -
        // confirm against my.timer.
        auto timers = makeTimers;
        // Heartbeat to the other side: first after 50 ms, then every 250 ms.
        makeInterval(timers, () @trusted {
            HeartBeatMonitor.ping(pwriter);
            return 250.dur!"msecs";
        }, 50.dur!"msecs");

        // send stdin to the other side
        ubyte[4096] stdinBuf;
        makeInterval(timers, () @trusted {
            auto res = poller.wait(10.dur!"msecs");
            if (!res.empty && res[0].status[PollStatus.in_]) {
                // Raw POSIX read; only a positive byte count is forwarded, a
                // result of zero or below is ignored.
                auto len = core.sys.posix.unistd.read(stdin.fileno, stdinBuf.ptr, stdinBuf.length);
                if (len > 0) {
                    pwriter.pack(Key(stdinBuf[0 .. len]));
                }

                // stdin was readable, check again soon
                return 1.dur!"msecs";
            }

            // slower if not much is happening
            return 100.dur!"msecs";
        }, 25.dur!"msecs");

        // dummy event to force the timer to return after 50ms
        makeInterval(timers, () @safe { return 50.dur!"msecs"; }, 50.dur!"msecs");

        // Run the timers until the spawned process has terminated; its exit
        // status is the result of this function.
        while (true) {
            try {
                auto st = p.pid.tryWait;
                if (st.terminated)
                    return st.status;

                timers.tick(50.dur!"msecs");
            } catch (Exception e) {
                // Swallowed: the loop continues with the next iteration.
            }
        }
    } catch (Exception e) {
        logger.error(e.msg).collectException;
        return 1;
    }
}
155 
/// Reads raw bytes from a file descriptor and feeds them to a `Deserialize`
/// instance. The deserializer is reachable directly on a `PipeReader` through
/// `alias this`.
struct PipeReader {
    import distssh.protocol : Deserialize;

    private {
        FdPoller poller;
        ubyte[4096] buf;
        int fd;
    }

    Deserialize deser;

    alias deser this;

    /// Params:
    ///  fd = file descriptor to read from; it is registered with the poller
    ///       for input events.
    this(int fd) {
        this.fd = fd;
        poller.put(FdPoll(fd), [PollEvent.in_]);
    }

    /// Feed the deserializer with the data that `read` returns, if any.
    /// Exceptions are swallowed.
    void update() nothrow {
        try {
            auto incoming = read;
            if (incoming.length != 0)
                deser.put(incoming);
        } catch (Exception e) {
        }
    }

    /// Returns: a slice of the internal buffer `buf` holding the bytes that
    /// were read, or null when the poller reported no input or nothing was
    /// read. The slice must be used/copied before the next call to read
    /// because the buffer is reused.
    private ubyte[] read() return {
        static import core.sys.posix.unistd;

        auto ready = poller.wait();
        if (ready.empty || !ready[0].status[PollStatus.in_])
            return null;

        const nbytes = core.sys.posix.unistd.read(fd, buf.ptr, buf.length);
        return nbytes > 0 ? buf[0 .. nbytes] : null;
    }
}
204 
/// Serializes protocol messages and writes the resulting bytes to a `File`.
///
/// The members of the serializer `ser` are reachable directly on a
/// `PipeWriter` through `alias this`.
///
/// NOTE(review): the constructor gives `ser` a delegate created from
/// `&this.put`, i.e. bound to the address of the instance under construction.
/// If `ser` keeps that delegate and the struct value is later copied or moved,
/// the delegate still refers to the original address - confirm that instances
/// are only used in place / passed by `ref`.
struct PipeWriter {
    import distssh.protocol : Serialize;

    /// Destination of the serialized bytes.
    File fout;
    /// Serializer that emits its output through `put`.
    Serialize!(void delegate(const(ubyte)[]) @safe) ser;

    alias ser this;

    /// Params:
    ///  f = file to write the serialized data to.
    this(File f) {
        this.fout = f;
        this.ser = typeof(ser)(&this.put);
    }

    /// Write `v` unmodified to `fout` and flush right away.
    void put(const(ubyte)[] v) @safe {
        fout.rawWrite(v);
        fout.flush;
    }
}
223 
/** Import an environment that has been serialized to a file.
 *
 * The whole file is fed to a `Deserialize` instance and a single message is
 * unpacked from it. Only a `ProtocolEnv` message is used; any other kind of
 * message is ignored.
 *
 * Params:
 *  filename = the file to import the environment from.
 *
 * Returns: the imported environment. It is empty if the file does not exist
 * (logged at trace level), if reading or unpacking failed (logged as errors)
 * or if the unpacked message was not a `ProtocolEnv`.
 */
from.distssh.protocol.ProtocolEnv readEnv(Path filename) nothrow {
    import distssh.protocol : ProtocolEnv, EnvVariable, Deserialize;
    import std.file : exists;
    import std.stdio : File;
    import sumtype;
    import distssh.protocol;

    ProtocolEnv imported;

    const fileIsMissing = !exists(filename.toString);
    if (fileIsMissing) {
        logger.trace("File to import the environment from do not exist: ",
                filename).collectException;
        return imported;
    }

    try {
        auto source = File(filename);
        Deserialize decoder;

        // Feed the decoder chunk by chunk until the end of the file.
        ubyte[4096] chunk;
        for (; !source.eof;) {
            auto filled = source.rawRead(chunk[]);
            decoder.put(filled);
        }

        // Only an environment message carries a result; the rest are no-ops.
        decoder.unpack.match!(
                (None x) {},
                (ConfDone x) {},
                (ProtocolEnv x) { imported = x; },
                (HeartBeat x) {},
                (Command x) {},
                (Workdir x) {},
                (Key x) {},
                (TerminalCapability x) {});
    } catch (Exception e) {
        logger.error(e.msg).collectException;
        logger.errorf("Unable to import environment from '%s'", filename).collectException;
    }

    return imported;
}
260 
261 /**
262  * #SPC-env_export_filter
263  *
264  * Params:
265  *  env = a null terminated array of C strings.
266  *
267  * Returns: a clone of the environment.
268  */
269 auto cloneEnv() nothrow {
270     import std.process : environment;
271     import std..string : strip;
272     import distssh.protocol : ProtocolEnv, EnvVariable;
273 
274     ProtocolEnv app;
275 
276     try {
277         auto env = environment.toAA;
278 
279         foreach (k; environment.get(globalEnvFilterKey, null)
280                 .strip.splitter(';').map!(a => a.strip)
281                 .filter!(a => a.length != 0)) {
282             if (env.remove(k)) {
283                 logger.tracef("Removed '%s' from the exported environment", k);
284             }
285         }
286 
287         foreach (const a; env.byKeyValue) {
288             app ~= EnvVariable(a.key, a.value);
289         }
290     } catch (Exception e) {
291         logger.warning(e.msg).collectException;
292     }
293 
294     return app;
295 }
296 
/// Monitor the heartbeats from the client.
///
/// The monitor measures the time since the latest heartbeat (or since
/// construction) and reports a timeout when that time exceeds the configured
/// limit.
struct HeartBeatMonitor {
    import std.datetime.stopwatch : StopWatch;

    private {
        Duration timeout;
        StopWatch sw;
    }

    /// Params:
    ///  timeout = the longest accepted time between two heartbeats.
    this(Duration timeout) {
        this.timeout = timeout;
        sw.start;
    }

    /// A heartbeat has been received.
    void beat() {
        // Resetting a running StopWatch restarts the measurement from now.
        // start must only be called on a stopped StopWatch (it is a documented
        // precondition) which is the case for a default initialized monitor.
        sw.reset;
        if (!sw.running)
            sw.start;
    }

    /// Returns: true if more time than the timeout has passed since the
    /// latest heartbeat.
    bool isTimeout() {
        return sw.peek > timeout;
    }

    /// Send a heartbeat message through `f`.
    static void ping(ref PipeWriter f) {
        import distssh.protocol : HeartBeat;

        f.pack!HeartBeat;
    }
}
330 
/// Update the client beat in a separate thread, slowly, to keep the daemon
/// running if the client is executing a long running job.
///
/// The background thread is spawned by the constructor and asked to stop by
/// the destructor.
struct BackgroundClientBeat {
    import std.concurrency : send, spawn, receiveTimeout, Tid;

    private {
        bool isRunning;
        Tid bg;

        enum Msg {
            stop,
        }
    }

    /// Params:
    ///  dbPath = path of the database that the client beat is written to.
    this(AbsolutePath dbPath) {
        bg = spawn(&tick, dbPath);
        isRunning = true;
    }

    /// Ask the background thread to stop. Does nothing if it was never started.
    ~this() @trusted {
        if (isRunning) {
            isRunning = false;
            send(bg, Msg.stop);
        }
    }

    /// Body of the background thread. Each round waits for a stop message for
    /// at most the tick interval and then updates the client beat in the
    /// database; the update also happens in the round that ends the loop.
    private static void tick(AbsolutePath dbPath) nothrow {
        import core.time : dur;
        import distssh.database;

        const beatPeriod = 10.dur!"minutes";

        bool keepGoing = true;
        do {
            try {
                receiveTimeout(beatPeriod, (Msg stopRequest) { keepGoing = false; });
            } catch (Exception e) {
                // Failing to receive also ends the loop.
                keepGoing = false;
            }

            try {
                auto database = openDatabase(dbPath);
                database.clientBeat;
            } catch (Exception e) {
                // Best effort; a failed update is ignored.
            }
        }
        while (keepGoing);
    }
}
380 
/// Params:
///  shell = name of, or path to, the shell executable.
///
/// Returns: the switches to use to execute the shell. bash, recognized by the
/// base name of `shell`, is told to skip its profile and rc files.
string[] shellSwitch(string shell) {
    import std.path : baseName;

    const isBash = baseName(shell) == "bash";
    return isBash ? ["--noprofile", "--norc", "-c"] : ["-c"];
}