1 /** 2 Copyright: Copyright (c) 2020, Joakim Brännström. All rights reserved. 3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0) 4 Author: Joakim Brännström (joakim.brannstrom@gmx.com) 5 */ 6 module distssh.utility; 7 8 import core.time : Duration; 9 import std.algorithm : splitter, map, filter, joiner; 10 import std.array : empty; 11 import std.exception : collectException; 12 import std.stdio : File; 13 import std.typecons : Nullable, NullableRef; 14 import logger = std.experimental.logger; 15 16 import colorlog; 17 18 import my.from_; 19 import my.path; 20 import my.fswatch : FdPoller, PollStatus, PollEvent, PollResult, FdPoll; 21 22 import distssh.config; 23 import distssh.metric; 24 import distssh.types; 25 26 struct ExecuteOnHostConf { 27 string workDir; 28 string[] command; 29 string importEnv; 30 bool cloneEnv; 31 bool noImportEnv; 32 } 33 34 /** Execute a command on a remote host. 35 * 36 * #SPC-automatic_env_import 37 */ 38 int executeOnHost(const ExecuteOnHostConf conf, Host host) nothrow { 39 import core.thread : Thread; 40 import core.time : dur, MonoTime; 41 import std.file : thisExePath; 42 import std.format : format; 43 import std.path : absolutePath; 44 import std.process : tryWait, Redirect, pipeProcess; 45 import std.stdio : stdin; 46 static import core.sys.posix.unistd; 47 48 import my.timer : makeInterval, makeTimers; 49 import my.tty : setCBreak, CBreak; 50 import distssh.protocol : ProtocolEnv, ConfDone, Command, Workdir, Key, TerminalCapability; 51 52 try { 53 const isInteractive = core.sys.posix.unistd.isatty(stdin.fileno) == 1; 54 55 CBreak consoleChange; 56 scope (exit) 57 () { 58 if (isInteractive) 59 consoleChange.reset(stdin.fileno); 60 }(); 61 62 auto args = ["ssh"] ~ sshNoLoginArgs ~ [ 63 host, thisExePath, "localrun", "--stdin-msgpack-env" 64 ]; 65 66 if (isInteractive) { 67 args ~= "--pseudo-terminal"; 68 } 69 70 logger.tracef("Connecting to %s. Run %s", host, args.joiner(" ")); 71 72 auto p = pipeProcess(args, Redirect.stdin); 73 74 auto pwriter = PipeWriter(p.stdin); 75 76 ProtocolEnv env; 77 if (conf.cloneEnv) 78 env = cloneEnv; 79 else if (!conf.noImportEnv) 80 env = readEnv(conf.importEnv.absolutePath); 81 pwriter.pack(env); 82 pwriter.pack(Command(conf.command.dup)); 83 pwriter.pack(Workdir(conf.workDir)); 84 if (isInteractive) { 85 import core.sys.posix.termios; 86 87 termios mode; 88 if (tcgetattr(1, &mode) == 0) { 89 pwriter.pack(TerminalCapability(mode)); 90 } 91 } 92 pwriter.pack!ConfDone; 93 94 FdPoller poller; 95 poller.put(FdPoll(stdin.fileno), [PollEvent.in_]); 96 97 if (isInteractive) { 98 consoleChange = setCBreak(stdin.fileno); 99 } 100 101 auto timers = makeTimers; 102 makeInterval(timers, () @trusted { 103 HeartBeatMonitor.ping(pwriter); 104 return 250.dur!"msecs"; 105 }, 50.dur!"msecs"); 106 107 // send stdin to the other side 108 ubyte[4096] stdinBuf; 109 makeInterval(timers, () @trusted { 110 auto res = poller.wait(); 111 if (!res.empty && res[0].status[PollStatus.in_]) { 112 auto len = core.sys.posix.unistd.read(stdin.fileno, stdinBuf.ptr, stdinBuf.length); 113 if (len > 0) { 114 pwriter.pack(Key(stdinBuf[0 .. len])); 115 } 116 117 return 25.dur!"msecs"; 118 } 119 120 // slower if not much is happening 121 return 100.dur!"msecs"; 122 }, 25.dur!"msecs"); 123 124 // dummy event to force the timer to return after 50ms 125 makeInterval(timers, () @safe { return 50.dur!"msecs"; }, 50.dur!"msecs"); 126 127 while (true) { 128 try { 129 auto st = p.pid.tryWait; 130 if (st.terminated) 131 return st.status; 132 133 timers.tick(50.dur!"msecs"); 134 } catch (Exception e) { 135 } 136 } 137 } catch (Exception e) { 138 logger.error(e.msg).collectException; 139 return 1; 140 } 141 } 142 143 struct PipeReader { 144 import distssh.protocol : Deserialize; 145 146 private { 147 FdPoller poller; 148 ubyte[4096] buf; 149 int fd; 150 } 151 152 Deserialize deser; 153 154 alias deser this; 155 156 this(int fd) { 157 this.fd = fd; 158 poller.put(FdPoll(fd), [PollEvent.in_]); 159 } 160 161 // Update the buffer with data from the pipe. 162 void update() nothrow { 163 try { 164 auto s = read; 165 if (!s.empty) 166 deser.put(s); 167 } catch (Exception e) { 168 } 169 } 170 171 /// The returned slice is to a local, static array. It must be used/copied 172 /// before next call to read. 173 private ubyte[] read() return { 174 static import core.sys.posix.unistd; 175 176 auto res = poller.wait(); 177 if (res.empty) { 178 return null; 179 } 180 181 if (!res[0].status[PollStatus.in_]) { 182 return null; 183 } 184 185 auto len = core.sys.posix.unistd.read(fd, buf.ptr, buf.length); 186 if (len > 0) 187 return buf[0 .. len]; 188 return null; 189 } 190 } 191 192 struct PipeWriter { 193 import distssh.protocol : Serialize; 194 195 File fout; 196 Serialize!(void delegate(const(ubyte)[]) @safe) ser; 197 198 alias ser this; 199 200 this(File f) { 201 this.fout = f; 202 this.ser = typeof(ser)(&this.put); 203 } 204 205 void put(const(ubyte)[] v) @safe { 206 fout.rawWrite(v); 207 fout.flush; 208 } 209 } 210 211 from.distssh.protocol.ProtocolEnv readEnv(string filename) nothrow { 212 import distssh.protocol : ProtocolEnv, EnvVariable, Deserialize; 213 import std.file : exists; 214 import std.stdio : File; 215 import sumtype; 216 import distssh.protocol; 217 218 ProtocolEnv rval; 219 220 if (!exists(filename)) { 221 logger.trace("File to import the environment from do not exist: ", 222 filename).collectException; 223 return rval; 224 } 225 226 try { 227 auto fin = File(filename); 228 Deserialize deser; 229 230 ubyte[4096] buf; 231 while (!fin.eof) { 232 auto read_ = fin.rawRead(buf[]); 233 deser.put(read_); 234 } 235 236 deser.unpack.match!((None x) {}, (ConfDone x) {}, (ProtocolEnv x) { 237 rval = x; 238 }, (HeartBeat x) {}, (Command x) {}, (Workdir x) {}, (Key x) {}, (TerminalCapability x) { 239 }); 240 } catch (Exception e) { 241 logger.error(e.msg).collectException; 242 logger.errorf("Unable to import environment from '%s'", filename).collectException; 243 } 244 245 return rval; 246 } 247 248 /** 249 * #SPC-env_export_filter 250 * 251 * Params: 252 * env = a null terminated array of C strings. 253 * 254 * Returns: a clone of the environment. 255 */ 256 auto cloneEnv() nothrow { 257 import std.process : environment; 258 import std..string : strip; 259 import distssh.protocol : ProtocolEnv, EnvVariable; 260 261 ProtocolEnv app; 262 263 try { 264 auto env = environment.toAA; 265 266 foreach (k; environment.get(globalEnvFilterKey, null) 267 .strip.splitter(';').map!(a => a.strip) 268 .filter!(a => a.length != 0)) { 269 if (env.remove(k)) { 270 logger.tracef("Removed '%s' from the exported environment", k); 271 } 272 } 273 274 foreach (const a; env.byKeyValue) { 275 app ~= EnvVariable(a.key, a.value); 276 } 277 } catch (Exception e) { 278 logger.warning(e.msg).collectException; 279 } 280 281 return app; 282 } 283 284 /// Monitor the heartbeats from the client. 285 struct HeartBeatMonitor { 286 import std.datetime.stopwatch : StopWatch; 287 288 private { 289 Duration timeout; 290 StopWatch sw; 291 } 292 293 this(Duration timeout) { 294 this.timeout = timeout; 295 sw.start; 296 } 297 298 /// A heartbeat has been received. 299 void beat() { 300 sw.reset; 301 sw.start; 302 } 303 304 bool isTimeout() { 305 if (sw.peek > timeout) { 306 return true; 307 } 308 return false; 309 } 310 311 static void ping(ref PipeWriter f) { 312 import distssh.protocol : HeartBeat; 313 314 f.pack!HeartBeat; 315 } 316 } 317 318 /// Update the client beat in a separate thread, slowely, to keep the daemon 319 /// running if the client is executing a long running job. 320 struct BackgroundClientBeat { 321 import std.concurrency : send, spawn, receiveTimeout, Tid; 322 323 private { 324 bool isRunning; 325 Tid bg; 326 327 enum Msg { 328 stop, 329 } 330 } 331 332 this(AbsolutePath dbPath) { 333 bg = spawn(&tick, dbPath); 334 isRunning = true; 335 } 336 337 ~this() @trusted { 338 if (!isRunning) 339 return; 340 341 isRunning = false; 342 send(bg, Msg.stop); 343 } 344 345 private static void tick(AbsolutePath dbPath) nothrow { 346 import core.time : dur; 347 import distssh.database; 348 349 const tickInterval = 10.dur!"minutes"; 350 351 bool running = true; 352 while (running) { 353 try { 354 receiveTimeout(tickInterval, (Msg x) { running = false; }); 355 } catch (Exception e) { 356 running = false; 357 } 358 359 try { 360 auto db = openDatabase(dbPath); 361 db.clientBeat; 362 } catch (Exception e) { 363 } 364 } 365 } 366 }