1 /** 2 Copyright: Copyright (c) 2020, Joakim Brännström. All rights reserved. 3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0) 4 Author: Joakim Brännström (joakim.brannstrom@gmx.com) 5 */ 6 module distssh.utility; 7 8 import core.time : Duration; 9 import std.algorithm : splitter, map, filter, joiner; 10 import std.array : empty; 11 import std.exception : collectException; 12 import std.stdio : File; 13 import std.typecons : Nullable, NullableRef; 14 import logger = std.experimental.logger; 15 16 import colorlog; 17 18 import my.from_; 19 import my.fswatch : FdPoller, PollStatus, PollEvent, PollResult, FdPoll; 20 import my.named_type; 21 import my.path; 22 23 import distssh.config; 24 import distssh.metric; 25 import distssh.types; 26 27 struct ExecuteOnHostConf { 28 NamedType!(string, Tag!"Workdir") workDir; 29 NamedType!(string[], Tag!"Command", null, Lengthable) command; 30 NamedType!(string, Tag!"ImportEnvFile") importEnv; 31 NamedType!(bool, Tag!"CloneEnv") cloneEnv; 32 NamedType!(bool, Tag!"NoImportEnv") noImportEnv; 33 } 34 35 bool isInteractive() { 36 import my.tty; 37 38 return isStdoutInteractive && isStderrInteractive; 39 } 40 41 /** Execute a command on a remote host. 42 * 43 * #SPC-automatic_env_import 44 */ 45 int executeOnHost(const ExecuteOnHostConf conf, Host host) nothrow { 46 import core.thread : Thread; 47 import core.time : dur, MonoTime; 48 import std.file : thisExePath; 49 import std.format : format; 50 import std.path : absolutePath; 51 import std.process : tryWait, Redirect, pipeProcess; 52 import std.stdio : stdin, stdout, stderr; 53 static import core.sys.posix.unistd; 54 55 import my.timer : makeInterval, makeTimers; 56 import my.tty : setCBreak, CBreak; 57 import distssh.protocol : ProtocolEnv, ConfDone, Command, Workdir, Key, TerminalCapability; 58 import distssh.connection : sshCmdArgs, setupMultiplexDir; 59 60 try { 61 const isInteractive_ = isInteractive; 62 63 CBreak consoleChange; 64 scope (exit) 65 () { 66 if (isInteractive_) { 67 consoleChange.reset(stdin.fileno); 68 consoleChange.reset(stdout.fileno); 69 consoleChange.reset(stderr.fileno); 70 } 71 }(); 72 73 auto args = () { 74 auto a = ["localrun", "--stdin-msgpack-env"]; 75 if (isInteractive_) { 76 a ~= "--pseudo-terminal"; 77 } 78 return sshCmdArgs(host, a).toArgs; 79 }(); 80 81 logger.tracef("Connecting to %s. Run %s", host, args.joiner(" ")); 82 83 auto p = pipeProcess(args, Redirect.stdin); 84 85 auto pwriter = PipeWriter(p.stdin); 86 87 ProtocolEnv env = () { 88 if (conf.cloneEnv) 89 return cloneEnv; 90 else if (!conf.noImportEnv) 91 return readEnv(conf.importEnv.get.Path); 92 return ProtocolEnv.init; 93 }(); 94 pwriter.pack(env); 95 pwriter.pack(Command(conf.command.get.dup)); 96 pwriter.pack(Workdir(conf.workDir.get)); 97 if (isInteractive_) { 98 import core.sys.posix.termios; 99 100 termios mode; 101 if (tcgetattr(1, &mode) == 0) { 102 pwriter.pack(TerminalCapability(mode)); 103 } 104 } 105 pwriter.pack!ConfDone; 106 107 FdPoller poller; 108 poller.put(FdPoll(stdin.fileno), [PollEvent.in_]); 109 110 if (isInteractive_) { 111 consoleChange = setCBreak(stdin.fileno); 112 } 113 114 auto timers = makeTimers; 115 makeInterval(timers, () @trusted { 116 HeartBeatMonitor.ping(pwriter); 117 return 250.dur!"msecs"; 118 }, 50.dur!"msecs"); 119 120 // send stdin to the other side 121 ubyte[4096] stdinBuf; 122 makeInterval(timers, () @trusted { 123 auto res = poller.wait(10.dur!"msecs"); 124 if (!res.empty && res[0].status[PollStatus.in_]) { 125 auto len = core.sys.posix.unistd.read(stdin.fileno, stdinBuf.ptr, stdinBuf.length); 126 if (len > 0) { 127 pwriter.pack(Key(stdinBuf[0 .. len])); 128 } 129 130 return 1.dur!"msecs"; 131 } 132 133 // slower if not much is happening 134 return 100.dur!"msecs"; 135 }, 25.dur!"msecs"); 136 137 // dummy event to force the timer to return after 50ms 138 makeInterval(timers, () @safe { return 50.dur!"msecs"; }, 50.dur!"msecs"); 139 140 while (true) { 141 try { 142 auto st = p.pid.tryWait; 143 if (st.terminated) 144 return st.status; 145 146 timers.tick(50.dur!"msecs"); 147 } catch (Exception e) { 148 } 149 } 150 } catch (Exception e) { 151 logger.error(e.msg).collectException; 152 return 1; 153 } 154 } 155 156 struct PipeReader { 157 import distssh.protocol : Deserialize; 158 159 private { 160 FdPoller poller; 161 ubyte[4096] buf; 162 int fd; 163 } 164 165 Deserialize deser; 166 167 alias deser this; 168 169 this(int fd) { 170 this.fd = fd; 171 poller.put(FdPoll(fd), [PollEvent.in_]); 172 } 173 174 // Update the buffer with data from the pipe. 175 void update() nothrow { 176 try { 177 auto s = read; 178 if (!s.empty) 179 deser.put(s); 180 } catch (Exception e) { 181 } 182 } 183 184 /// The returned slice is to a local, static array. It must be used/copied 185 /// before next call to read. 186 private ubyte[] read() return { 187 static import core.sys.posix.unistd; 188 189 auto res = poller.wait(); 190 if (res.empty) { 191 return null; 192 } 193 194 if (!res[0].status[PollStatus.in_]) { 195 return null; 196 } 197 198 auto len = core.sys.posix.unistd.read(fd, buf.ptr, buf.length); 199 if (len > 0) 200 return buf[0 .. len]; 201 return null; 202 } 203 } 204 205 struct PipeWriter { 206 import distssh.protocol : Serialize; 207 208 File fout; 209 Serialize!(void delegate(const(ubyte)[]) @safe) ser; 210 211 alias ser this; 212 213 this(File f) { 214 this.fout = f; 215 this.ser = typeof(ser)(&this.put); 216 } 217 218 void put(const(ubyte)[] v) @safe { 219 fout.rawWrite(v); 220 fout.flush; 221 } 222 } 223 224 from.distssh.protocol.ProtocolEnv readEnv(Path filename) nothrow { 225 import distssh.protocol : ProtocolEnv, EnvVariable, Deserialize; 226 import std.file : exists; 227 import std.stdio : File; 228 import sumtype; 229 import distssh.protocol; 230 231 ProtocolEnv rval; 232 233 if (!exists(filename.toString)) { 234 logger.trace("File to import the environment from do not exist: ", 235 filename).collectException; 236 return rval; 237 } 238 239 try { 240 auto fin = File(filename); 241 Deserialize deser; 242 243 ubyte[4096] buf; 244 while (!fin.eof) { 245 auto read_ = fin.rawRead(buf[]); 246 deser.put(read_); 247 } 248 249 deser.unpack.match!((None x) {}, (ConfDone x) {}, (ProtocolEnv x) { 250 rval = x; 251 }, (HeartBeat x) {}, (Command x) {}, (Workdir x) {}, (Key x) {}, (TerminalCapability x) { 252 }); 253 } catch (Exception e) { 254 logger.error(e.msg).collectException; 255 logger.errorf("Unable to import environment from '%s'", filename).collectException; 256 } 257 258 return rval; 259 } 260 261 /** 262 * #SPC-env_export_filter 263 * 264 * Params: 265 * env = a null terminated array of C strings. 266 * 267 * Returns: a clone of the environment. 268 */ 269 auto cloneEnv() nothrow { 270 import std.process : environment; 271 import std..string : strip; 272 import distssh.protocol : ProtocolEnv, EnvVariable; 273 274 ProtocolEnv app; 275 276 try { 277 auto env = environment.toAA; 278 279 foreach (k; environment.get(globalEnvFilterKey, null) 280 .strip.splitter(';').map!(a => a.strip) 281 .filter!(a => a.length != 0)) { 282 if (env.remove(k)) { 283 logger.tracef("Removed '%s' from the exported environment", k); 284 } 285 } 286 287 foreach (const a; env.byKeyValue) { 288 app ~= EnvVariable(a.key, a.value); 289 } 290 } catch (Exception e) { 291 logger.warning(e.msg).collectException; 292 } 293 294 return app; 295 } 296 297 /// Monitor the heartbeats from the client. 298 struct HeartBeatMonitor { 299 import std.datetime.stopwatch : StopWatch; 300 301 private { 302 Duration timeout; 303 StopWatch sw; 304 } 305 306 this(Duration timeout) { 307 this.timeout = timeout; 308 sw.start; 309 } 310 311 /// A heartbeat has been received. 312 void beat() { 313 sw.reset; 314 sw.start; 315 } 316 317 bool isTimeout() { 318 if (sw.peek > timeout) { 319 return true; 320 } 321 return false; 322 } 323 324 static void ping(ref PipeWriter f) { 325 import distssh.protocol : HeartBeat; 326 327 f.pack!HeartBeat; 328 } 329 } 330 331 /// Update the client beat in a separate thread, slowely, to keep the daemon 332 /// running if the client is executing a long running job. 333 struct BackgroundClientBeat { 334 import std.concurrency : send, spawn, receiveTimeout, Tid; 335 336 private { 337 bool isRunning; 338 Tid bg; 339 340 enum Msg { 341 stop, 342 } 343 } 344 345 this(AbsolutePath dbPath) { 346 bg = spawn(&tick, dbPath); 347 isRunning = true; 348 } 349 350 ~this() @trusted { 351 if (!isRunning) 352 return; 353 354 isRunning = false; 355 send(bg, Msg.stop); 356 } 357 358 private static void tick(AbsolutePath dbPath) nothrow { 359 import core.time : dur; 360 import distssh.database; 361 362 const tickInterval = 10.dur!"minutes"; 363 364 bool running = true; 365 while (running) { 366 try { 367 receiveTimeout(tickInterval, (Msg x) { running = false; }); 368 } catch (Exception e) { 369 running = false; 370 } 371 372 try { 373 auto db = openDatabase(dbPath); 374 db.clientBeat; 375 } catch (Exception e) { 376 } 377 } 378 } 379 } 380 381 /// Returns: the switches to use to execute the shell. 382 string[] shellSwitch(string shell) { 383 import std.path : baseName; 384 385 if (shell.baseName == "bash") 386 return ["--noprofile", "--norc", "-c"]; 387 return ["-c"]; 388 }