1 /**
2 Copyright: Copyright (c) 2020, Joakim Brännström. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 */
6 module distssh.utility;
7 
8 import core.time : Duration;
9 import std.algorithm : splitter, map, filter, joiner;
10 import std.array : empty;
11 import std.exception : collectException;
12 import std.stdio : File;
13 import std.typecons : Nullable, NullableRef;
14 import logger = std.experimental.logger;
15 
16 import colorlog;
17 
18 import my.from_;
19 import my.path;
20 import my.fswatch : FdPoller, PollStatus, PollEvent, PollResult, FdPoll;
21 
22 import distssh.config;
23 import distssh.metric;
24 import distssh.types;
25 
26 struct ExecuteOnHostConf {
27     string workDir;
28     string[] command;
29     string importEnv;
30     bool cloneEnv;
31     bool noImportEnv;
32 }
33 
34 /** Execute a command on a remote host.
35  *
36  * #SPC-automatic_env_import
37  */
38 int executeOnHost(const ExecuteOnHostConf conf, Host host) nothrow {
39     import core.thread : Thread;
40     import core.time : dur, MonoTime;
41     import std.file : thisExePath;
42     import std.format : format;
43     import std.path : absolutePath;
44     import std.process : tryWait, Redirect, pipeProcess;
45     import std.stdio : stdin, stdout, stderr;
46     static import core.sys.posix.unistd;
47 
48     import my.timer : makeInterval, makeTimers;
49     import my.tty : setCBreak, CBreak;
50     import distssh.protocol : ProtocolEnv, ConfDone, Command, Workdir, Key, TerminalCapability;
51     import distssh.connection : sshCmdArgs, setupMultiplexDir;
52 
53     try {
54         const isInteractive = core.sys.posix.unistd.isatty(stdin.fileno) == 1;
55 
56         CBreak consoleChange;
57         scope (exit)
58             () {
59             if (isInteractive) {
60                 consoleChange.reset(stdin.fileno);
61                 consoleChange.reset(stdout.fileno);
62                 consoleChange.reset(stderr.fileno);
63             }
64         }();
65 
66         auto args = () {
67             auto a = ["localrun", "--stdin-msgpack-env"];
68             if (isInteractive) {
69                 a ~= "--pseudo-terminal";
70             }
71             return sshCmdArgs(host, a).toArgs;
72         }();
73 
74         logger.tracef("Connecting to %s. Run %s", host, args.joiner(" "));
75 
76         auto p = pipeProcess(args, Redirect.stdin);
77 
78         auto pwriter = PipeWriter(p.stdin);
79 
80         ProtocolEnv env;
81         if (conf.cloneEnv)
82             env = cloneEnv;
83         else if (!conf.noImportEnv)
84             env = readEnv(conf.importEnv.absolutePath);
85         pwriter.pack(env);
86         pwriter.pack(Command(conf.command.dup));
87         pwriter.pack(Workdir(conf.workDir));
88         if (isInteractive) {
89             import core.sys.posix.termios;
90 
91             termios mode;
92             if (tcgetattr(1, &mode) == 0) {
93                 pwriter.pack(TerminalCapability(mode));
94             }
95         }
96         pwriter.pack!ConfDone;
97 
98         FdPoller poller;
99         poller.put(FdPoll(stdin.fileno), [PollEvent.in_]);
100 
101         if (isInteractive) {
102             consoleChange = setCBreak(stdin.fileno);
103         }
104 
105         auto timers = makeTimers;
106         makeInterval(timers, () @trusted {
107             HeartBeatMonitor.ping(pwriter);
108             return 250.dur!"msecs";
109         }, 50.dur!"msecs");
110 
111         // send stdin to the other side
112         ubyte[4096] stdinBuf;
113         makeInterval(timers, () @trusted {
114             auto res = poller.wait(10.dur!"msecs");
115             if (!res.empty && res[0].status[PollStatus.in_]) {
116                 auto len = core.sys.posix.unistd.read(stdin.fileno, stdinBuf.ptr, stdinBuf.length);
117                 if (len > 0) {
118                     pwriter.pack(Key(stdinBuf[0 .. len]));
119                 }
120 
121                 return 1.dur!"msecs";
122             }
123 
124             // slower if not much is happening
125             return 100.dur!"msecs";
126         }, 25.dur!"msecs");
127 
128         // dummy event to force the timer to return after 50ms
129         makeInterval(timers, () @safe { return 50.dur!"msecs"; }, 50.dur!"msecs");
130 
131         while (true) {
132             try {
133                 auto st = p.pid.tryWait;
134                 if (st.terminated)
135                     return st.status;
136 
137                 timers.tick(50.dur!"msecs");
138             } catch (Exception e) {
139             }
140         }
141     } catch (Exception e) {
142         logger.error(e.msg).collectException;
143         return 1;
144     }
145 }
146 
147 struct PipeReader {
148     import distssh.protocol : Deserialize;
149 
150     private {
151         FdPoller poller;
152         ubyte[4096] buf;
153         int fd;
154     }
155 
156     Deserialize deser;
157 
158     alias deser this;
159 
160     this(int fd) {
161         this.fd = fd;
162         poller.put(FdPoll(fd), [PollEvent.in_]);
163     }
164 
165     // Update the buffer with data from the pipe.
166     void update() nothrow {
167         try {
168             auto s = read;
169             if (!s.empty)
170                 deser.put(s);
171         } catch (Exception e) {
172         }
173     }
174 
175     /// The returned slice is to a local, static array. It must be used/copied
176     /// before next call to read.
177     private ubyte[] read() return  {
178         static import core.sys.posix.unistd;
179 
180         auto res = poller.wait();
181         if (res.empty) {
182             return null;
183         }
184 
185         if (!res[0].status[PollStatus.in_]) {
186             return null;
187         }
188 
189         auto len = core.sys.posix.unistd.read(fd, buf.ptr, buf.length);
190         if (len > 0)
191             return buf[0 .. len];
192         return null;
193     }
194 }
195 
196 struct PipeWriter {
197     import distssh.protocol : Serialize;
198 
199     File fout;
200     Serialize!(void delegate(const(ubyte)[]) @safe) ser;
201 
202     alias ser this;
203 
204     this(File f) {
205         this.fout = f;
206         this.ser = typeof(ser)(&this.put);
207     }
208 
209     void put(const(ubyte)[] v) @safe {
210         fout.rawWrite(v);
211         fout.flush;
212     }
213 }
214 
215 from.distssh.protocol.ProtocolEnv readEnv(string filename) nothrow {
216     import distssh.protocol : ProtocolEnv, EnvVariable, Deserialize;
217     import std.file : exists;
218     import std.stdio : File;
219     import sumtype;
220     import distssh.protocol;
221 
222     ProtocolEnv rval;
223 
224     if (!exists(filename)) {
225         logger.trace("File to import the environment from do not exist: ",
226                 filename).collectException;
227         return rval;
228     }
229 
230     try {
231         auto fin = File(filename);
232         Deserialize deser;
233 
234         ubyte[4096] buf;
235         while (!fin.eof) {
236             auto read_ = fin.rawRead(buf[]);
237             deser.put(read_);
238         }
239 
240         deser.unpack.match!((None x) {}, (ConfDone x) {}, (ProtocolEnv x) {
241             rval = x;
242         }, (HeartBeat x) {}, (Command x) {}, (Workdir x) {}, (Key x) {}, (TerminalCapability x) {
243         });
244     } catch (Exception e) {
245         logger.error(e.msg).collectException;
246         logger.errorf("Unable to import environment from '%s'", filename).collectException;
247     }
248 
249     return rval;
250 }
251 
252 /**
253  * #SPC-env_export_filter
254  *
255  * Params:
256  *  env = a null terminated array of C strings.
257  *
258  * Returns: a clone of the environment.
259  */
260 auto cloneEnv() nothrow {
261     import std.process : environment;
262     import std..string : strip;
263     import distssh.protocol : ProtocolEnv, EnvVariable;
264 
265     ProtocolEnv app;
266 
267     try {
268         auto env = environment.toAA;
269 
270         foreach (k; environment.get(globalEnvFilterKey, null)
271                 .strip.splitter(';').map!(a => a.strip)
272                 .filter!(a => a.length != 0)) {
273             if (env.remove(k)) {
274                 logger.tracef("Removed '%s' from the exported environment", k);
275             }
276         }
277 
278         foreach (const a; env.byKeyValue) {
279             app ~= EnvVariable(a.key, a.value);
280         }
281     } catch (Exception e) {
282         logger.warning(e.msg).collectException;
283     }
284 
285     return app;
286 }
287 
288 /// Monitor the heartbeats from the client.
289 struct HeartBeatMonitor {
290     import std.datetime.stopwatch : StopWatch;
291 
292     private {
293         Duration timeout;
294         StopWatch sw;
295     }
296 
297     this(Duration timeout) {
298         this.timeout = timeout;
299         sw.start;
300     }
301 
302     /// A heartbeat has been received.
303     void beat() {
304         sw.reset;
305         sw.start;
306     }
307 
308     bool isTimeout() {
309         if (sw.peek > timeout) {
310             return true;
311         }
312         return false;
313     }
314 
315     static void ping(ref PipeWriter f) {
316         import distssh.protocol : HeartBeat;
317 
318         f.pack!HeartBeat;
319     }
320 }
321 
322 /// Update the client beat in a separate thread, slowely, to keep the daemon
323 /// running if the client is executing a long running job.
324 struct BackgroundClientBeat {
325     import std.concurrency : send, spawn, receiveTimeout, Tid;
326 
327     private {
328         bool isRunning;
329         Tid bg;
330 
331         enum Msg {
332             stop,
333         }
334     }
335 
336     this(AbsolutePath dbPath) {
337         bg = spawn(&tick, dbPath);
338         isRunning = true;
339     }
340 
341     ~this() @trusted {
342         if (!isRunning)
343             return;
344 
345         isRunning = false;
346         send(bg, Msg.stop);
347     }
348 
349     private static void tick(AbsolutePath dbPath) nothrow {
350         import core.time : dur;
351         import distssh.database;
352 
353         const tickInterval = 10.dur!"minutes";
354 
355         bool running = true;
356         while (running) {
357             try {
358                 receiveTimeout(tickInterval, (Msg x) { running = false; });
359             } catch (Exception e) {
360                 running = false;
361             }
362 
363             try {
364                 auto db = openDatabase(dbPath);
365                 db.clientBeat;
366             } catch (Exception e) {
367             }
368         }
369     }
370 }
371 
372 /// Returns: the switches to use to execute the shell.
373 string[] shellSwitch(string shell) {
374     import std.path : baseName;
375 
376     if (shell.baseName == "bash")
377         return ["--noprofile", "--norc", "-c"];
378     return ["-c"];
379 }