/**
Copyright: Copyright (c) 2018, Joakim Brännström. All rights reserved.
License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
Author: Joakim Brännström (joakim.brannstrom@gmx.com)

The overloads of `cli` are strongly related to user commands.
They more or less fully implement a command line interface command.
*/
module distssh.main_;

import core.time : Duration;
import logger = std.experimental.logger;
import std.algorithm : splitter, map, filter, joiner;
import std.array : empty;
import std.exception : collectException;
import std.stdio : File;
import std.typecons : Nullable, NullableRef;

import colorlog;

import my.from_;
import my.path;

import distssh.config;
import distssh.metric;
import distssh.types;
import distssh.utility;

static import std.getopt;

import unit_threaded.attrs : Serial;

version (unittest) {
    import unit_threaded.assertions;
}

/** Program entry point: parse the user arguments and dispatch to the `cli`
 * overload that implements the selected command.
 *
 * Params:
 *  args = the command line arguments, including the program name.
 *
 * Returns: the exit status of the executed command.
 */
int rmain(string[] args) {
    import distssh.daemon;
    static import distssh.purge;

    confLogger(VerboseMode.info);

    auto conf = parseUserArgs(args);

    if (conf.global.helpInfo.helpWanted) {
        return cli(conf);
    }

    confLogger(conf.global.verbosity);
    logger.trace(conf);

    import core.stdc.signal : signal;
    import core.sys.posix.signal : SIGPIPE;
    import std.file : symlink;
    import std.stdio : writeln;
    import std.variant : visit;

    // register a handler for writing to a closed pipe in case it ever happens.
    // the handler ignores it.
    signal(SIGPIPE, &handleSIGPIPE);

    // dfmt off
    return conf.data.visit!(
          (Config.Help a) => cli(conf),
          (Config.Shell a) => cli(conf, a),
          (Config.Cmd a) => cli(conf, a),
          (Config.LocalRun a) => cli(conf, a),
          (Config.Install a) => cli(conf, a, (string src, string dst) => symlink(src, dst)),
          (Config.MeasureHosts a) => cli(conf, a),
          (Config.LocalLoad a) => cli(a, (string s) => writeln(s)),
          (Config.RunOnAll a) => cli(conf, a),
          (Config.LocalShell a) => cli(conf, a),
          (Config.Env a) => cli(conf, a),
          (Config.Daemon a) => distssh.daemon.cli(conf, a),
          (Config.Purge a) => distssh.purge.cli(conf, a),
          (Config.LocalPurge a) => distssh.purge.cli(conf, a),
    );
    // dfmt on
}

private:

/// dummy used to ignore SIGPIPE
extern (C) void handleSIGPIPE(int sig) nothrow @nogc @system {
}

/// Print the help text. Always returns 0.
int cli(Config conf) {
    conf.printHelp;
    return 0;
}

/** Open an interactive ssh shell on a remote host.
 *
 * The hosts are tried in the order given by the host cache. If a connection
 * ends with a non-zero exit status before twice the configured timeout has
 * passed it is considered a failed connection and the next host is tried.
 *
 * Returns: the exit status of the ssh session, or 1 if no host could be used.
 */
int cli(const Config fconf, Config.Shell conf) {
    import std.datetime.stopwatch : StopWatch, AutoStart;
    import std.process : spawnProcess, wait;
    import std.stdio : writeln, writefln;
    import distssh.connection : sshShellArgs;

    auto hosts = RemoteHostCache.make(fconf.global.dbPath, fconf.global.cluster).bestSelectRange;

    auto bgBeat = BackgroundClientBeat(AbsolutePath(fconf.global.dbPath));

    // a session that lasted longer than this is treated as a successful
    // connection even if the exit status is non-zero.
    const minSuccessfulSessionTime = fconf.global.timeout * 2;

    foreach (host; hosts) {
        try {
            writeln("Connecting to ", host);

            auto sw = StopWatch(AutoStart.yes);

            auto exit_status = spawnProcess(sshShellArgs(host, fconf.global.workDir.Path).toArgs)
                .wait;

            // #SPC-fallback_remote_host
            if (exit_status == 0 || sw.peek > minSuccessfulSessionTime) {
                writefln("Connection to %s closed.", host);
                return exit_status;
            } else {
                logger.warningf("Connection failed to %s. Falling back on next available host",
                        host);
            }
        } catch (Exception e) {
            logger.error(e.msg).collectException;
        }
    }

    // reached when the range of hosts is empty or every host failed.
    logger.error("No remote host online").collectException;
    return 1;
}

/** Execute the user supplied command on a remote host.
 *
 * Only the first host delivered by the host cache is used; the loop body
 * returns unconditionally.
 *
 * Returns: the result of `executeOnHost`, or 1 if there is no command or no host.
 */
// #SPC-fast_env_startup
int cli(const Config fconf, Config.Cmd conf) {
    import std.stdio : stdout;

    if (fconf.global.command.empty) {
        logger.error("No command specified");
        logger.error("Specify by adding -- <my command>");
        return 1;
    }

    auto bgBeat = BackgroundClientBeat(AbsolutePath(fconf.global.dbPath));

    foreach (host; RemoteHostCache.make(fconf.global.dbPath, fconf.global.cluster).bestSelectRange) {
        logger.info("Connecting to ", host);
        // to ensure connecting to is at the top of e.g. logfiles when the user run
        // distcmd env > log.txt
        stdout.flush;
        return executeOnHost(ExecuteOnHostConf(fconf.global.workDir, fconf.global.command.dup,
                fconf.global.importEnv, fconf.global.cloneEnv, fconf.global.noImportEnv), host);
    }

    logger.error("No remote host online from the available ",
            fconf.global.cluster).collectException;
    return 1;
}

/** Run a command on the local host on behalf of a client.
 *
 * The run configuration (environment, command, working directory, terminal
 * mode) is either read as messages from stdin or, for the environment, from
 * the import file. The command is then spawned, its output is forwarded to
 * stdout and `Key` messages read from stdin are forwarded to the command's
 * stdin. The loop ends when the command exits, the heartbeat monitor times
 * out or the parent process id changes.
 *
 * Returns: the exit status of the command, or 1 on failure/abort.
 */
// #SPC-fast_env_startup
int cli(const Config fconf, Config.LocalRun conf) {
    import core.time : dur;
    import std.datetime : Clock;
    import std.file : exists;
    import std.process : PConfig = Config, Redirect, userShell, thisProcessID;
    import std.stdio : File, stdin, stdout;
    import std.utf : toUTF8;
    import proc;
    import sumtype;
    import my.timer : makeTimers, makeInterval;
    import distssh.protocol;

    static struct LocalRunConf {
        import core.sys.posix.termios;

        string[string] env;
        string[] cmd;
        string workdir;
        termios mode;
    }

    static string[string] readEnvFromStdin(ProtocolEnv src) {
        string[string] rval;
        foreach (kv; src) {
            rval[kv.key] = kv.value;
        }
        return rval;
    }

    static string[string] readEnvFromFile(const Config fconf) {
        string[string] rval;
        foreach (kv; readEnv(fconf.global.importEnv)) {
            rval[kv.key] = kv.value;
        }
        return rval;
    }

    try {
        auto pread = PipeReader(stdin.fileno);

        auto localConf = () {
            LocalRunConf conf;
            conf.cmd = fconf.global.command.dup;
            conf.workdir = fconf.global.workDir;

            // guard against an error occurring when transferring the data such
            // that ConfDone is trashed.
            const timeout = Clock.currTime + 5.dur!"minutes";

            bool running = fconf.global.stdinMsgPackEnv;
            while (running && Clock.currTime < timeout) {
                pread.update;
                // dfmt off
                pread.unpack().match!(
                    (None x) {},
                    (ConfDone x) { running = false; },
                    (ProtocolEnv x) { conf.env = readEnvFromStdin(x); },
                    (HeartBeat x) {},
                    (Command x) { conf.cmd = x.value; },
                    (Workdir x) { conf.workdir = x.value; },
                    (Key x) {},
                    (TerminalCapability x) { conf.mode = x.value; });
                // dfmt on
            }

            if (!fconf.global.stdinMsgPackEnv) {
                conf.env = readEnvFromFile(fconf);
            }

            return conf;
        }();

        auto res = () {
            if (conf.useFakeTerminal) {
                import core.sys.posix.termios : tcsetattr, TCSAFLUSH;

                auto p = ttyProcess([userShell] ~ shellSwitch(userShell) ~ [
                        localConf.cmd.joiner(" ").toUTF8
                        ], localConf.env, PConfig.none, localConf.workdir).sandbox.scopeKill;
                tcsetattr(p.stdin.file.fileno, TCSAFLUSH, &localConf.mode);
                return p;
            }
            return pipeShell(localConf.cmd.joiner(" ").toUTF8,
                    Redirect.stdin | Redirect.stdout | Redirect.stderrToStdout,
                    localConf.env, PConfig.none, localConf.workdir).sandbox.scopeKill;
        }();

        import core.sys.posix.unistd : getppid;

        const parent_pid = getppid;
        bool loop_running = true;

        auto timers = makeTimers;
        makeInterval(timers, () {
            // detect ctrl+c on the client side
            if (getppid != parent_pid) {
                loop_running = false;
            }
            return 500.dur!"msecs";
        }, 50.dur!"msecs");

        ubyte[4096] buf;
        bool pipeOutputToUser() @trusted {
            // guard against an error occurring when writing to the users
            // stdout.
            const timeout = Clock.currTime + 5.dur!"minutes";

            bool hasWritten;
            while (res.stdout.hasPendingData && Clock.currTime < timeout) {
                auto r = res.stdout.read(buf[]);
                stdout.rawWrite(r);
                hasWritten = true;
            }

            if (hasWritten) {
                stdout.flush;
            }
            return hasWritten;
        }

        // forward whatever output is still pending when leaving the scope.
        scope (exit)
            pipeOutputToUser;

        makeInterval(timers, () @safe {
            if (pipeOutputToUser) {
                return 10.dur!"msecs";
            }
            // slower if not much is happening
            return 100.dur!"msecs";
        }, 25.dur!"msecs");

        // a dummy event that ensure that it tick each 50 msec.
        makeInterval(timers, () => 50.dur!"msecs", 50.dur!"msecs");

        int exit_status = 1;

        auto wd = HeartBeatMonitor(fconf.global.timeout * 2);

        while (!wd.isTimeout && loop_running) {
            pread.update;

            try {
                if (res.tryWait) {
                    exit_status = res.status;
                    loop_running = false;
                }

                // dfmt off
                pread.unpack().match!(
                    (None x) {},
                    (ConfDone x) {},
                    (ProtocolEnv x) {},
                    (HeartBeat x) { wd.beat; },
                    (Command x) {},
                    (Workdir x) {},
                    (Key x) {
                        auto data = x.value;
                        while (!data.empty) {
                            // write only what remains; a partial write must
                            // not resend the bytes already delivered.
                            auto written = res.stdin.write(data);
                            if (written.empty) {
                                // closed so break.
                                break;
                            }
                            data = data[written.length .. $];
                        }
                    },
                    (TerminalCapability x) {});
                // dfmt on
            } catch (Exception e) {
                // best effort; a failure must not stop the forwarding loop.
                logger.trace(e.msg).collectException;
            }

            timers.tick(25.dur!"msecs");
        }

        return exit_status;
    } catch (Exception e) {
        () @trusted { logger.trace(e).collectException; }();
        logger.error(e.msg).collectException;
    }

    return 1;
}

/** Install the `distShell` and `distCmd` symlinks next to the binary.
 *
 * Params:
 *  fconf = provides the path of the binary and the directory to install in.
 *  conf = unused, selects the overload.
 *  symlink = creates a symlink `dst` that refers to `src`.
 *
 * Returns: 0 on success, 1 if a link could not be replaced.
 */
int cli(const Config fconf, Config.Install conf, void delegate(string src, string dst) symlink) nothrow {
    import std.path : buildPath;
    import std.file : exists, remove;

    // remove a pre-existing `dst` before creating the link.
    void replace(string src, string dst) {
        if (exists(dst))
            remove(dst);
        symlink(src, dst);
    }

    try {
        replace(fconf.global.selfBinary, buildPath(fconf.global.selfDir, distShell));
        replace(fconf.global.selfBinary, buildPath(fconf.global.selfDir, distCmd));
        return 0;
    } catch (Exception e) {
        logger.error(e.msg).collectException;
    }

    return 1;
}

/** Print a table with the load of the hosts in the cluster.
 *
 * The online hosts are printed sorted by load followed by, after a separator
 * row, the unused hosts.
 *
 * Returns: always 0.
 */
// #SPC-measure_remote_hosts
int cli(const Config fconf, Config.MeasureHosts conf) nothrow {
    import std.algorithm : sort;
    import std.conv : to;
    import std.stdio : writefln, writeln;
    import distssh.table;
    import distssh.connection;

    writeln("Host is overloaded if Load is >1").collectException;

    auto tbl = Table!5(["Host", "Load", "Access Time", "Updated", "Multiplex"]);
    void addRow(HostLoad a) nothrow {
        // render a duration as e.g. "12ms" or "3s 12ms".
        static string toInternal(Duration d) {
            import std.format : format;

            int seconds;
            short msecs;
            d.split!("seconds", "msecs")(seconds, msecs);
            if (seconds == 0)
                return format("%sms", msecs);
            else
                return format("%ss %sms", seconds, msecs);
        }

        string[5] row;
        try {
            row[0] = a.host;
            row[1] = a.load.loadAvg.to!string;
            row[2] = toInternal(a.load.accessTime);
            row[3] = a.updated.toLocalTime.to!string;
            row[4] = makeMaster(a.host).isAlive ? "yes" : "no";
            tbl.put(row);
        } catch (Exception e) {
            logger.trace(e.msg).collectException;
        }
    }

    auto hosts = RemoteHostCache.make(fconf.global.dbPath, fconf.global.cluster);

    foreach (a; hosts.onlineRange.sort!((a, b) => a.load < b.load)) {
        addRow(a);
    }

    auto unused = hosts.unusedRange;
    if (!unused.empty) {
        // separator between the online and the unused hosts.
        tbl.put(["-", "-", "-", "-", "-"]).collectException;
        foreach (a; unused) {
            addRow(a);
        }
    }

    try {
        writeln(tbl);
    } catch (Exception e) {
        logger.error(e.msg).collectException;
    }

    return 0;
}

/** Print the load of localhost.
 *
 * The one minute load average is divided by the number of CPUs before it is
 * written. If `getloadavg` delivers no samples the number of CPUs is used as
 * the load, i.e. the written value is 1.
 *
 * Params:
 *  conf = unused, selects the overload.
 *  writer = callable that receives the text to print.
 *
 * Returns: 0 on success, -1 if an exception occurred.
 *
 * #SPC-measure_local_load
 */
int cli(WriterT)(Config.LocalLoad conf, scope WriterT writer) {
    import std.ascii : newline;
    import std.conv : to;
    import std.parallelism : totalCPUs;
    import distssh.libc : getloadavg;

    try {
        double[3] loadavg;
        int samples = getloadavg(&loadavg[0], 3);

        if (samples == -1 || samples == 0)
            loadavg[0] = totalCPUs > 0 ? totalCPUs : 1;

        double cores = totalCPUs;

        // make sure the loadavg is on a new line because the last line parsed is expected to contain the loadavg.
        writer(newline);

        if (cores > 0)
            writer((loadavg[0] / cores).to!string);
        else
            writer(loadavg[0].to!string);
    } catch (Exception e) {
        logger.trace(e.msg).collectException;
        return -1;
    }

    return 0;
}

/** Execute the user supplied command on all hosts with a known load.
 *
 * Returns: 0 if the command succeeded on every host, otherwise 1.
 */
int cli(const Config fconf, Config.RunOnAll conf) nothrow {
    import std.algorithm : sort;
    import std.stdio : writefln, writeln, stdout;

    auto hosts = RemoteHostCache.make(fconf.global.dbPath, fconf.global.cluster);

    writefln("Hosts (%s): %(%s|%)", globalEnvHostKey, hosts.allRange.map!(a => a.host))
        .collectException;

    bool exit_status = true;
    foreach (a; hosts.allRange
            .filter!(a => !a.load.unknown)
            .map!(a => a.host)) {
        stdout.writefln("Connecting to %s", a).collectException;
        try {
            // #SPC-flush_buffers
            stdout.flush;
        } catch (Exception e) {
            // a failed flush only affects the order of the output; ignore it.
        }

        auto status = executeOnHost(ExecuteOnHostConf(fconf.global.workDir,
                fconf.global.command.dup, fconf.global.importEnv,
                fconf.global.cloneEnv, fconf.global.noImportEnv), a);

        if (status != 0) {
            writeln("Failed, error code: ", status).collectException;
            exit_status = false;
        }

        stdout.writefln("Connection to %s closed.", a).collectException;
    }

    return exit_status ? 0 : 1;
}

/** Start the user's shell on the local host.
 *
 * The shell is started in the configured working directory if it exists,
 * otherwise in the current directory.
 *
 * Returns: the exit status of the shell, or 1 if it could not be started.
 */
// #SPC-shell_current_dir
int cli(const Config fconf, Config.LocalShell conf) {
    import std.file : exists;
    import std.process : spawnProcess, wait, userShell, Config, Pid;

    try {
        auto pid = () {
            if (exists(fconf.global.workDir))
                return spawnProcess([userShell], null, Config.none, fconf.global.workDir);
            return spawnProcess([userShell]);
        }();
        return pid.wait;
    } catch (Exception e) {
        logger.error(e.msg).collectException;
        return 1;
    }
}

/** Export the environment to the env file or modify an already exported file.
 *
 * When exporting, the current environment is written to the env file. Otherwise
 * the env file is read, the requested variables are removed, the variables
 * given as `KEY=VALUE` are added/changed and the result is written back.
 * A `KEY=VALUE` argument is split on the first `=` so the value may itself
 * contain `=`. An argument without `=` or with an empty key is an error.
 *
 * Returns: 0 on success, otherwise 1.
 */
// #SPC-modify_env
int cli(const Config fconf, Config.Env conf) {
    import std.algorithm : map, filter, findSplit;
    import std.array : assocArray, array;
    import std.path : absolutePath;
    import std.stdio : writeln, writefln;
    import std.typecons : tuple;
    import distssh.protocol : ProtocolEnv, EnvVariable;

    if (conf.exportEnv) {
        try {
            writeEnv(fconf.global.importEnv, cloneEnv);
            logger.info("Exported environment to ", fconf.global.importEnv);
        } catch (Exception e) {
            logger.error(e.msg).collectException;
            return 1;
        }

        return 0;
    }

    string[string] set_envs;
    try {
        foreach (kv; conf.envSet) {
            // an empty argument carries no information.
            if (kv.empty)
                continue;

            // split on the first '=' only; the value is allowed to contain '='.
            auto parts = kv.findSplit("=");
            if (parts[0].empty || parts[1].empty)
                throw new Exception("expected KEY=VALUE but got '" ~ kv ~ "'");
            set_envs[parts[0]] = parts[2];
        }
    } catch (Exception e) {
        writeln("Unable to parse supplied envs to modify: ", e.msg).collectException;
        return 1;
    }

    try {
        auto env = readEnv(fconf.global.importEnv.absolutePath).map!(a => tuple(a.key,
                a.value)).assocArray;

        foreach (k; conf.envDel.filter!(a => a in env)) {
            writeln("Removing ", k);
            env.remove(k);
        }

        foreach (kv; set_envs.byKeyValue) {
            if (kv.key in env)
                writefln("Setting %s=%s", kv.key, kv.value);
            else
                writefln("Adding %s=%s", kv.key, kv.value);
            env[kv.key] = kv.value;
        }

        if (conf.print) {
            foreach (const kv; env.byKeyValue)
                writefln(`%s="%s"`, kv.key, kv.value);
        }

        writeEnv(fconf.global.importEnv,
                ProtocolEnv(env.byKeyValue.map!(a => EnvVariable(a.key, a.value)).array));
    } catch (Exception e) {
        logger.error(e.msg).collectException;
        return 1;
    }

    return 0;
}

@("shall export the environment")
@Serial unittest {
    import std.conv : to;
    import std.file;
    import std.process : environment;
    import std.variant : tryVisit;

    // arrange
    immutable remove_me = "remove_me.export";
    scope (exit)
        remove(remove_me);

    auto opts = parseUserArgs(["distssh", "env", "-e", "--env-file", remove_me]);
    auto envConf = opts.data.tryVisit!((Config.Env a) => a, () => Config.Env.init);

    const env_key = "DISTSSH_ENV_TEST";
    environment[env_key] = env_key ~ remove_me;
    scope (exit)
        environment.remove(env_key);

    // shall export the environment to the file
    void verify1() {
        // test normal export
        cli(opts, envConf);
        auto env = readEnv(remove_me);
        assert(!env.filter!(a => a.key == env_key).empty, env.to!string);
    }

    verify1;

    // shall filter out specified env before exporting to the file
    environment[globalEnvFilterKey] = "DISTSSH_ENV_TEST;junk ";
    scope (exit)
        environment.remove(globalEnvFilterKey);

    void verify2() {
        cli(opts, envConf);
        auto env = readEnv(remove_me);
        assert(env.filter!(a => a.key == env_key).empty, env.to!string);
    }

    verify2;
}

@("shall create symlinks to self")
@Serial unittest {
    string[2][] symlinks;
    void fakeSymlink(string src, string dst) {
        string[2] v = [src, dst];
        symlinks ~= v;
    }

    Config conf;
    conf.global.selfBinary = "/foo/src";
    conf.global.selfDir = "/bar";

    cli(conf, Config.Install.init, &fakeSymlink);

    assert(symlinks[0] == ["/foo/src", "/bar/distshell"]);
    assert(symlinks[1] == ["/foo/src", "/bar/distcmd"]);
}

@("shall modify the exported env by adding, removing and modifying")
@Serial unittest {
    import std.array;
    import std.file;
    import std.process : environment;
    import std.stdio;
    import std.typecons : tuple;
    import std.variant : tryVisit;

    // arrange
    immutable remove_me = "remove_me.export";
    scope (exit)
        remove(remove_me);

    environment["FOO_DEL"] = "del me";
    environment["FOO_MOD"] = "mod me";
    scope (exit) {
        environment.remove("FOO_DEL");
        environment.remove("FOO_MOD");
        environment.remove("FOO_ADD");
    }

    auto conf = parseUserArgs(["distssh", "env", "-e", "--env-file", remove_me]);
    auto envConf = conf.data.tryVisit!((Config.Env a) => a, () => Config.Env.init);
    cli(conf, envConf);

    // act
    conf = parseUserArgs([
            "distssh", "env", "-d", "FOO_DEL", "-s", "FOO_MOD=42", "--set",
            "FOO_ADD=42", "--env-file", remove_me
            ]);
    envConf = conf.data.tryVisit!((Config.Env a) => a, () => Config.Env.init);
    cli(conf, envConf);

    // assert
    auto env = readEnv(remove_me).map!(a => tuple(a.key, a.value)).assocArray;
    assert(env["FOO_MOD"] == "42");
    assert(env["FOO_ADD"] == "42");
    assert("FOO_DEL" !in env);
}

@("shall print the load of the localhost")
@Serial unittest {
    string load;
    auto exit_status = cli(Config.LocalLoad.init, (string s) => load = s);
    assert(exit_status == 0);
    assert(load.length > 0, load);
}

/** Serialize `env` to the file `filename`.
 *
 * The file is opened for writing and its permissions are changed to
 * read/write for the owner only before any data is written.
 *
 * Params:
 *  filename = the file to write to.
 *  env = the environment to serialize.
 */
void writeEnv(string filename, from.distssh.protocol.ProtocolEnv env) {
    import core.sys.posix.sys.stat : fchmod, S_IRUSR, S_IWUSR;
    import std.stdio : File;
    import distssh.protocol : Serialize;

    auto fout = File(filename, "w");
    // NOTE(review): the result of fchmod is ignored, a failure leaves the
    // file with whatever permissions it was created with.
    fchmod(fout.fileno, S_IRUSR | S_IWUSR);

    auto ser = Serialize!(void delegate(const(ubyte)[]) @safe)((const(ubyte)[] a) => fout.rawWrite(
            a));

    ser.pack(env);
}