1 /** 2 Copyright: Copyright (c) 2020, Joakim Brännström. All rights reserved. 3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0) 4 Author: Joakim Brännström (joakim.brannstrom@gmx.com) 5 */ 6 module distssh.utility; 7 8 import core.time : Duration; 9 import std.algorithm : splitter, map, filter, joiner; 10 import std.array : empty; 11 import std.exception : collectException; 12 import std.stdio : File; 13 import std.typecons : Nullable, NullableRef; 14 import logger = std.experimental.logger; 15 16 import colorlog; 17 18 import my.from_; 19 import my.path; 20 import my.fswatch : FdPoller, PollStatus, PollEvent, PollResult, FdPoll; 21 22 import distssh.config; 23 import distssh.metric; 24 import distssh.types; 25 26 struct ExecuteOnHostConf { 27 string workDir; 28 string[] command; 29 string importEnv; 30 bool cloneEnv; 31 bool noImportEnv; 32 } 33 34 /** Execute a command on a remote host. 35 * 36 * #SPC-automatic_env_import 37 */ 38 int executeOnHost(const ExecuteOnHostConf conf, Host host) nothrow { 39 import core.thread : Thread; 40 import core.time : dur, MonoTime; 41 import std.file : thisExePath; 42 import std.format : format; 43 import std.path : absolutePath; 44 import std.process : tryWait, Redirect, pipeProcess; 45 import std.stdio : stdin, stdout, stderr; 46 static import core.sys.posix.unistd; 47 48 import my.timer : makeInterval, makeTimers; 49 import my.tty : setCBreak, CBreak; 50 import distssh.protocol : ProtocolEnv, ConfDone, Command, Workdir, Key, TerminalCapability; 51 import distssh.connection : sshCmdArgs, setupMultiplexDir; 52 53 try { 54 const isInteractive = core.sys.posix.unistd.isatty(stdin.fileno) == 1; 55 56 CBreak consoleChange; 57 scope (exit) 58 () { 59 if (isInteractive) { 60 consoleChange.reset(stdin.fileno); 61 consoleChange.reset(stdout.fileno); 62 consoleChange.reset(stderr.fileno); 63 } 64 }(); 65 66 auto args = () { 67 auto a = ["localrun", "--stdin-msgpack-env"]; 68 if (isInteractive) { 69 a ~= "--pseudo-terminal"; 70 } 71 return sshCmdArgs(host, a).toArgs; 72 }(); 73 74 logger.tracef("Connecting to %s. Run %s", host, args.joiner(" ")); 75 76 auto p = pipeProcess(args, Redirect.stdin); 77 78 auto pwriter = PipeWriter(p.stdin); 79 80 ProtocolEnv env; 81 if (conf.cloneEnv) 82 env = cloneEnv; 83 else if (!conf.noImportEnv) 84 env = readEnv(conf.importEnv.absolutePath); 85 pwriter.pack(env); 86 pwriter.pack(Command(conf.command.dup)); 87 pwriter.pack(Workdir(conf.workDir)); 88 if (isInteractive) { 89 import core.sys.posix.termios; 90 91 termios mode; 92 if (tcgetattr(1, &mode) == 0) { 93 pwriter.pack(TerminalCapability(mode)); 94 } 95 } 96 pwriter.pack!ConfDone; 97 98 FdPoller poller; 99 poller.put(FdPoll(stdin.fileno), [PollEvent.in_]); 100 101 if (isInteractive) { 102 consoleChange = setCBreak(stdin.fileno); 103 } 104 105 auto timers = makeTimers; 106 makeInterval(timers, () @trusted { 107 HeartBeatMonitor.ping(pwriter); 108 return 250.dur!"msecs"; 109 }, 50.dur!"msecs"); 110 111 // send stdin to the other side 112 ubyte[4096] stdinBuf; 113 makeInterval(timers, () @trusted { 114 auto res = poller.wait(10.dur!"msecs"); 115 if (!res.empty && res[0].status[PollStatus.in_]) { 116 auto len = core.sys.posix.unistd.read(stdin.fileno, stdinBuf.ptr, stdinBuf.length); 117 if (len > 0) { 118 pwriter.pack(Key(stdinBuf[0 .. len])); 119 } 120 121 return 1.dur!"msecs"; 122 } 123 124 // slower if not much is happening 125 return 100.dur!"msecs"; 126 }, 25.dur!"msecs"); 127 128 // dummy event to force the timer to return after 50ms 129 makeInterval(timers, () @safe { return 50.dur!"msecs"; }, 50.dur!"msecs"); 130 131 while (true) { 132 try { 133 auto st = p.pid.tryWait; 134 if (st.terminated) 135 return st.status; 136 137 timers.tick(50.dur!"msecs"); 138 } catch (Exception e) { 139 } 140 } 141 } catch (Exception e) { 142 logger.error(e.msg).collectException; 143 return 1; 144 } 145 } 146 147 struct PipeReader { 148 import distssh.protocol : Deserialize; 149 150 private { 151 FdPoller poller; 152 ubyte[4096] buf; 153 int fd; 154 } 155 156 Deserialize deser; 157 158 alias deser this; 159 160 this(int fd) { 161 this.fd = fd; 162 poller.put(FdPoll(fd), [PollEvent.in_]); 163 } 164 165 // Update the buffer with data from the pipe. 166 void update() nothrow { 167 try { 168 auto s = read; 169 if (!s.empty) 170 deser.put(s); 171 } catch (Exception e) { 172 } 173 } 174 175 /// The returned slice is to a local, static array. It must be used/copied 176 /// before next call to read. 177 private ubyte[] read() return { 178 static import core.sys.posix.unistd; 179 180 auto res = poller.wait(); 181 if (res.empty) { 182 return null; 183 } 184 185 if (!res[0].status[PollStatus.in_]) { 186 return null; 187 } 188 189 auto len = core.sys.posix.unistd.read(fd, buf.ptr, buf.length); 190 if (len > 0) 191 return buf[0 .. len]; 192 return null; 193 } 194 } 195 196 struct PipeWriter { 197 import distssh.protocol : Serialize; 198 199 File fout; 200 Serialize!(void delegate(const(ubyte)[]) @safe) ser; 201 202 alias ser this; 203 204 this(File f) { 205 this.fout = f; 206 this.ser = typeof(ser)(&this.put); 207 } 208 209 void put(const(ubyte)[] v) @safe { 210 fout.rawWrite(v); 211 fout.flush; 212 } 213 } 214 215 from.distssh.protocol.ProtocolEnv readEnv(string filename) nothrow { 216 import distssh.protocol : ProtocolEnv, EnvVariable, Deserialize; 217 import std.file : exists; 218 import std.stdio : File; 219 import sumtype; 220 import distssh.protocol; 221 222 ProtocolEnv rval; 223 224 if (!exists(filename)) { 225 logger.trace("File to import the environment from do not exist: ", 226 filename).collectException; 227 return rval; 228 } 229 230 try { 231 auto fin = File(filename); 232 Deserialize deser; 233 234 ubyte[4096] buf; 235 while (!fin.eof) { 236 auto read_ = fin.rawRead(buf[]); 237 deser.put(read_); 238 } 239 240 deser.unpack.match!((None x) {}, (ConfDone x) {}, (ProtocolEnv x) { 241 rval = x; 242 }, (HeartBeat x) {}, (Command x) {}, (Workdir x) {}, (Key x) {}, (TerminalCapability x) { 243 }); 244 } catch (Exception e) { 245 logger.error(e.msg).collectException; 246 logger.errorf("Unable to import environment from '%s'", filename).collectException; 247 } 248 249 return rval; 250 } 251 252 /** 253 * #SPC-env_export_filter 254 * 255 * Params: 256 * env = a null terminated array of C strings. 257 * 258 * Returns: a clone of the environment. 259 */ 260 auto cloneEnv() nothrow { 261 import std.process : environment; 262 import std..string : strip; 263 import distssh.protocol : ProtocolEnv, EnvVariable; 264 265 ProtocolEnv app; 266 267 try { 268 auto env = environment.toAA; 269 270 foreach (k; environment.get(globalEnvFilterKey, null) 271 .strip.splitter(';').map!(a => a.strip) 272 .filter!(a => a.length != 0)) { 273 if (env.remove(k)) { 274 logger.tracef("Removed '%s' from the exported environment", k); 275 } 276 } 277 278 foreach (const a; env.byKeyValue) { 279 app ~= EnvVariable(a.key, a.value); 280 } 281 } catch (Exception e) { 282 logger.warning(e.msg).collectException; 283 } 284 285 return app; 286 } 287 288 /// Monitor the heartbeats from the client. 289 struct HeartBeatMonitor { 290 import std.datetime.stopwatch : StopWatch; 291 292 private { 293 Duration timeout; 294 StopWatch sw; 295 } 296 297 this(Duration timeout) { 298 this.timeout = timeout; 299 sw.start; 300 } 301 302 /// A heartbeat has been received. 303 void beat() { 304 sw.reset; 305 sw.start; 306 } 307 308 bool isTimeout() { 309 if (sw.peek > timeout) { 310 return true; 311 } 312 return false; 313 } 314 315 static void ping(ref PipeWriter f) { 316 import distssh.protocol : HeartBeat; 317 318 f.pack!HeartBeat; 319 } 320 } 321 322 /// Update the client beat in a separate thread, slowely, to keep the daemon 323 /// running if the client is executing a long running job. 324 struct BackgroundClientBeat { 325 import std.concurrency : send, spawn, receiveTimeout, Tid; 326 327 private { 328 bool isRunning; 329 Tid bg; 330 331 enum Msg { 332 stop, 333 } 334 } 335 336 this(AbsolutePath dbPath) { 337 bg = spawn(&tick, dbPath); 338 isRunning = true; 339 } 340 341 ~this() @trusted { 342 if (!isRunning) 343 return; 344 345 isRunning = false; 346 send(bg, Msg.stop); 347 } 348 349 private static void tick(AbsolutePath dbPath) nothrow { 350 import core.time : dur; 351 import distssh.database; 352 353 const tickInterval = 10.dur!"minutes"; 354 355 bool running = true; 356 while (running) { 357 try { 358 receiveTimeout(tickInterval, (Msg x) { running = false; }); 359 } catch (Exception e) { 360 running = false; 361 } 362 363 try { 364 auto db = openDatabase(dbPath); 365 db.clientBeat; 366 } catch (Exception e) { 367 } 368 } 369 } 370 } 371 372 /// Returns: the switches to use to execute the shell. 373 string[] shellSwitch(string shell) { 374 import std.path : baseName; 375 376 if (shell.baseName == "bash") 377 return ["--noprofile", "--norc", "-c"]; 378 return ["-c"]; 379 }