1 /**
2 Copyright: Copyright (c) 2020, Joakim Brännström. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 */
6 module distssh.utility;
7 
8 import core.time : Duration;
9 import std.algorithm : splitter, map, filter, joiner;
10 import std.array : empty;
11 import std.exception : collectException;
12 import std.stdio : File;
13 import std.typecons : Nullable, NullableRef;
14 import logger = std.experimental.logger;
15 
16 import colorlog;
17 
18 import my.from_;
19 import my.path;
20 import my.fswatch : FdPoller, PollStatus, PollEvent, PollResult, FdPoll;
21 
22 import distssh.config;
23 import distssh.metric;
24 import distssh.types;
25 
/// Settings for `executeOnHost`: what to run and which environment to send.
struct ExecuteOnHostConf {
    /// Sent to the remote side as a `Workdir` message; presumably the
    /// directory the command is executed in (TODO confirm on the receiver).
    string workDir;
    /// Sent to the remote side as a `Command` message.
    string[] command;
    /// Path of the file the environment is read from (via `readEnv`) when
    /// neither `cloneEnv` nor `noImportEnv` is set.
    string importEnv;
    /// When true the environment is taken from the function `cloneEnv`
    /// instead of from `importEnv`.
    bool cloneEnv;
    /// When true (and `cloneEnv` is false) no environment file is read and a
    /// default-initialized environment is sent.
    bool noImportEnv;
}
33 
/** Execute a command on a remote host.
 *
 * Starts `ssh` towards `host`, running this executable in `localrun` mode, and
 * writes the configuration to the child's stdin: environment, command, working
 * directory and, when local stdin is a terminal, the terminal settings. After
 * that local stdin and periodic heartbeats are forwarded until the child
 * process has terminated.
 *
 * #SPC-automatic_env_import
 *
 * Params:
 *  conf = what to execute and which environment to send.
 *  host = the remote host to connect to.
 *
 * Returns: the exit status of the `ssh` process, or 1 if an exception is
 * thrown outside the wait loop (e.g. while starting the process or sending the
 * configuration).
 */
int executeOnHost(const ExecuteOnHostConf conf, Host host) nothrow {
    import core.thread : Thread;
    import core.time : dur, MonoTime;
    import std.file : thisExePath;
    import std.format : format;
    import std.path : absolutePath;
    import std.process : tryWait, Redirect, pipeProcess;
    import std.stdio : stdin;
    static import core.sys.posix.unistd;

    import my.timer : makeInterval, makeTimers;
    import my.tty : setCBreak, CBreak;
    import distssh.protocol : ProtocolEnv, ConfDone, Command, Workdir, Key, TerminalCapability;

    try {
        // true when the local stdin is connected to a terminal.
        const isInteractive = core.sys.posix.unistd.isatty(stdin.fileno) == 1;

        // Holds the terminal state returned by setCBreak further down. The
        // scope guard hands it back via `reset` when this try block is left.
        // NOTE(review): `reset` runs whenever stdin is a tty, also when an
        // exception is thrown before setCBreak was called -- confirm that
        // resetting a default-initialized CBreak is harmless.
        CBreak consoleChange;
        scope (exit)
            () {
            if (isInteractive)
                consoleChange.reset(stdin.fileno);
        }();

        // The remote command is the path of the local executable, i.e. the
        // program is expected at the same path on the remote host.
        auto args = ["ssh"] ~ sshNoLoginArgs ~ [
            host, thisExePath, "localrun", "--stdin-msgpack-env"
        ];

        if (isInteractive) {
            args ~= "--pseudo-terminal";
        }

        logger.tracef("Connecting to %s. Run %s", host, args.joiner(" "));

        // Only the child's stdin is redirected to a pipe.
        auto p = pipeProcess(args, Redirect.stdin);

        auto pwriter = PipeWriter(p.stdin);

        // Choose the environment to send: a clone of the current one, the one
        // stored in the import file, or a default-initialized one.
        ProtocolEnv env;
        if (conf.cloneEnv)
            env = cloneEnv;
        else if (!conf.noImportEnv)
            env = readEnv(conf.importEnv.absolutePath);
        // The configuration messages are written in this order and are
        // terminated by ConfDone.
        pwriter.pack(env);
        pwriter.pack(Command(conf.command.dup));
        pwriter.pack(Workdir(conf.workDir));
        if (isInteractive) {
            import core.sys.posix.termios;

            // NOTE(review): the settings are read from fd 1 while the tty
            // check above is made on stdin -- confirm this is intended.
            termios mode;
            if (tcgetattr(1, &mode) == 0) {
                pwriter.pack(TerminalCapability(mode));
            }
        }
        pwriter.pack!ConfDone;

        // Used by the stdin forwarding below to find out if there is input.
        FdPoller poller;
        poller.put(FdPoll(stdin.fileno), [PollEvent.in_]);

        if (isInteractive) {
            consoleChange = setCBreak(stdin.fileno);
        }

        // Heartbeat. The first one is scheduled after 50 ms; presumably the
        // returned duration is the delay until the next call (TODO confirm in
        // my.timer).
        auto timers = makeTimers;
        makeInterval(timers, () @trusted {
            HeartBeatMonitor.ping(pwriter);
            return 250.dur!"msecs";
        }, 50.dur!"msecs");

        // send stdin to the other side
        ubyte[4096] stdinBuf;
        makeInterval(timers, () @trusted {
            auto res = poller.wait();
            if (!res.empty && res[0].status[PollStatus.in_]) {
                auto len = core.sys.posix.unistd.read(stdin.fileno, stdinBuf.ptr, stdinBuf.length);
                // a failed read or end of file (len <= 0) sends nothing.
                if (len > 0) {
                    pwriter.pack(Key(stdinBuf[0 .. len]));
                }

                return 25.dur!"msecs";
            }

            // slower if not much is happening
            return 100.dur!"msecs";
        }, 25.dur!"msecs");

        // dummy event to force the timer to return after 50ms
        makeInterval(timers, () @safe { return 50.dur!"msecs"; }, 50.dur!"msecs");

        // Run the timers until the ssh process has terminated. Exceptions
        // thrown while waiting or from a timer callback are ignored and the
        // loop continues.
        while (true) {
            try {
                auto st = p.pid.tryWait;
                if (st.terminated)
                    return st.status;

                timers.tick(50.dur!"msecs");
            } catch (Exception e) {
            }
        }
    } catch (Exception e) {
        logger.error(e.msg).collectException;
        return 1;
    }
}
142 
/// Reads raw bytes from a file descriptor and feeds them to a `Deserialize`
/// instance. The deserializer is reachable directly on the reader through
/// `alias this`.
struct PipeReader {
    import distssh.protocol : Deserialize;

    private {
        FdPoller poller;
        ubyte[4096] buf;
        int fd;
    }

    Deserialize deser;

    alias deser this;

    /// Params:
    ///  fd = the file descriptor to read from. It is registered in the poller
    ///       for input events.
    this(int fd) {
        this.fd = fd;
        poller.put(FdPoll(fd), [PollEvent.in_]);
    }

    /// Move data that is available on the file descriptor, if any, into the
    /// deserializer. Exceptions are ignored.
    void update() nothrow {
        try {
            auto chunk = read();
            if (chunk.length != 0)
                deser.put(chunk);
        } catch (Exception e) {
        }
    }

    /// Read at most one buffer of data from the file descriptor.
    ///
    /// Returns: a slice of the member buffer `buf`, thus it must be used or
    /// copied before the next call to read. `null` if the poller reports no
    /// input or nothing could be read.
    private ubyte[] read() return {
        static import core.sys.posix.unistd;

        auto events = poller.wait();
        if (events.empty || !events[0].status[PollStatus.in_])
            return null;

        const nbytes = core.sys.posix.unistd.read(fd, buf.ptr, buf.length);
        return nbytes > 0 ? buf[0 .. nbytes] : null;
    }
}
191 
/// Serializes protocol messages and writes them to a file.
///
/// The serializer is reachable directly on the writer through `alias this`,
/// e.g. `writer.pack(msg)`. Everything it produces is handed to `put`.
struct PipeWriter {
    import distssh.protocol : Serialize;

    File fout;
    Serialize!(void delegate(const(ubyte)[]) @safe) ser;

    alias ser this;

    /// Params:
    ///  f = the file that the serialized data is written to.
    this(File f) {
        fout = f;
        // NOTE(review): the delegate refers to the address this instance has
        // during construction -- confirm that writers are not copied or moved
        // to another address afterwards.
        ser = typeof(ser)(&this.put);
    }

    /// Write `v` to the file and flush it right away.
    void put(const(ubyte)[] v) @safe {
        fout.rawWrite(v);
        fout.flush();
    }
}
210 
/** Read a serialized environment from a file.
 *
 * Params:
 *  filename = path to the file to read.
 *
 * Returns: the `ProtocolEnv` unpacked from the file. A default-initialized
 * `ProtocolEnv` is returned if the file doesn't exist, if reading or unpacking
 * throws, or if the unpacked message is of another kind.
 */
from.distssh.protocol.ProtocolEnv readEnv(string filename) nothrow {
    import distssh.protocol : ProtocolEnv, EnvVariable, Deserialize;
    import std.file : exists;
    import std.stdio : File;
    import sumtype;
    import distssh.protocol;

    ProtocolEnv imported;

    if (!filename.exists) {
        logger.trace("File to import the environment from do not exist: ",
                filename).collectException;
        return imported;
    }

    try {
        auto src = File(filename);
        Deserialize deser;

        // feed the whole file to the deserializer, one chunk at a time.
        ubyte[4096] chunk;
        while (!src.eof) {
            auto data = src.rawRead(chunk[]);
            deser.put(data);
        }

        // only an environment message is of interest, all other kinds are
        // ignored.
        // dfmt off
        deser.unpack.match!(
            (None x) {},
            (ConfDone x) {},
            (ProtocolEnv x) { imported = x; },
            (HeartBeat x) {},
            (Command x) {},
            (Workdir x) {},
            (Key x) {},
            (TerminalCapability x) {});
        // dfmt on
    } catch (Exception e) {
        logger.error(e.msg).collectException;
        logger.errorf("Unable to import environment from '%s'", filename).collectException;
    }

    return imported;
}
247 
/** Clone the environment of the current process.
 *
 * The environment variable named by `globalEnvFilterKey` is read as a `;`
 * separated list of variable names. Those variables are left out of the clone.
 * Whitespace around the names and empty entries are ignored.
 *
 * If an exception is thrown it is logged as a warning and whatever has been
 * collected so far is returned.
 *
 * #SPC-env_export_filter
 *
 * Returns: a clone of the environment.
 */
auto cloneEnv() nothrow {
    import std.process : environment;
    import std.string : strip;
    import distssh.protocol : ProtocolEnv, EnvVariable;

    ProtocolEnv app;

    try {
        auto env = environment.toAA;

        // drop the variables that the user do not want to export.
        foreach (k; environment.get(globalEnvFilterKey, null)
                .strip.splitter(';').map!(a => a.strip)
                .filter!(a => a.length != 0)) {
            if (env.remove(k)) {
                logger.tracef("Removed '%s' from the exported environment", k);
            }
        }

        foreach (const a; env.byKeyValue) {
            app ~= EnvVariable(a.key, a.value);
        }
    } catch (Exception e) {
        logger.warning(e.msg).collectException;
    }

    return app;
}
283 
/// Monitor the heartbeats from the client.
///
/// A stopwatch measures the time since the latest heartbeat. The monitor has
/// timed out when that time is longer than the configured timeout.
struct HeartBeatMonitor {
    import std.datetime.stopwatch : StopWatch;

    private {
        Duration timeout;
        StopWatch sw;
    }

    /// Params:
    ///  timeout = the longest accepted time between two heartbeats.
    this(Duration timeout) {
        this.timeout = timeout;
        sw.start;
    }

    /// A heartbeat has been received.
    void beat() {
        // reset restarts the measurement of a running stopwatch. start must
        // only be called on a stopped one, it is a precondition of
        // StopWatch.start, which is the case for a default-initialized
        // monitor.
        sw.reset;
        if (!sw.running)
            sw.start;
    }

    /// Returns: true if more time than the timeout has passed since the
    /// latest heartbeat.
    bool isTimeout() {
        return sw.peek > timeout;
    }

    /// Send a heartbeat message via `f`.
    static void ping(ref PipeWriter f) {
        import distssh.protocol : HeartBeat;

        f.pack!HeartBeat;
    }
}
317 
/// Update the client beat in a separate thread, slowly, to keep the daemon
/// running if the client is executing a long running job.
///
/// The thread is spawned by the constructor and is told to stop by the
/// destructor.
struct BackgroundClientBeat {
    import std.concurrency : send, spawn, receiveTimeout, Tid;

    private {
        bool isRunning;
        Tid bg;

        enum Msg {
            stop,
        }
    }

    /// Params:
    ///  dbPath = the database that the beat is written to.
    this(AbsolutePath dbPath) {
        bg = spawn(&tick, dbPath);
        isRunning = true;
    }

    /// Tell the background thread to stop, if one was started.
    ~this() @trusted {
        if (isRunning) {
            isRunning = false;
            send(bg, Msg.stop);
        }
    }

    /// Body of the background thread.
    ///
    /// Waits for a message for up to ten minutes and then writes a client
    /// beat to the database. A stop message, or an exception while waiting,
    /// ends the loop but the beat of that iteration is still written.
    /// Exceptions from the database are ignored.
    private static void tick(AbsolutePath dbPath) nothrow {
        import core.time : dur;
        import distssh.database;

        const beatPeriod = dur!"minutes"(10);

        bool keepGoing = true;
        while (keepGoing) {
            try {
                receiveTimeout(beatPeriod, (Msg x) { keepGoing = false; });
            } catch (Exception e) {
                keepGoing = false;
            }

            try {
                auto db = openDatabase(dbPath);
                db.clientBeat();
            } catch (Exception e) {
            }
        }
    }
}