1 /** 2 Copyright: Copyright (c) 2018, Joakim Brännström. All rights reserved. 3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0) 4 Author: Joakim Brännström (joakim.brannstrom@gmx.com) 5 6 Methods prefixed with `cli_` are strongly related to user commands. 7 They more or less fully implement a command line interface command. 8 */ 9 module distssh.main_; 10 11 import core.time : Duration; 12 import logger = std.experimental.logger; 13 import std.algorithm : splitter, map, filter, joiner; 14 import std.array : empty; 15 import std.exception : collectException; 16 import std.stdio : File; 17 import std.typecons : Nullable, NullableRef; 18 19 import colorlog; 20 21 import my.from_; 22 import my.path; 23 24 import distssh.config; 25 import distssh.metric; 26 import distssh.types; 27 import distssh.utility; 28 29 static import std.getopt; 30 31 import unit_threaded.attrs : Serial; 32 33 version (unittest) { 34 import unit_threaded.assertions; 35 } 36 37 int rmain(string[] args) { 38 import distssh.daemon; 39 static import distssh.purge; 40 41 confLogger(VerboseMode.info); 42 43 auto conf = parseUserArgs(args); 44 45 if (conf.global.helpInfo.helpWanted) { 46 return cli(conf); 47 } 48 49 confLogger(conf.global.verbosity); 50 logger.trace(conf); 51 52 import core.stdc.signal : signal; 53 import std.file : symlink; 54 import std.stdio : writeln; 55 import std.variant : visit; 56 57 // register a handler for writing to a closed pipe in case it ever happens. 58 // the handler ignores it. 59 signal(13, &handleSIGPIPE); 60 61 // dfmt off 62 return conf.data.visit!( 63 (Config.Help a) => cli(conf), 64 (Config.Shell a) => cli(conf, a), 65 (Config.Cmd a) => cli(conf, a), 66 (Config.LocalRun a) => cli(conf, a), 67 (Config.Install a) => cli(conf, a, (string src, string dst) => symlink(src, dst)), 68 (Config.MeasureHosts a) => cli(conf, a), 69 (Config.LocalLoad a) => cli(a, (string s) => writeln(s)), 70 (Config.RunOnAll a) => cli(conf, a), 71 (Config.LocalShell a) => cli(conf, a), 72 (Config.Env a) => cli(conf, a), 73 (Config.Daemon a) => distssh.daemon.cli(conf, a), 74 (Config.Purge a) => distssh.purge.cli(conf, a), 75 (Config.LocalPurge a) => distssh.purge.cli(conf, a), 76 ); 77 // dfmt on 78 } 79 80 private: 81 82 /// dummy used to ignore SIGPIPE 83 extern (C) void handleSIGPIPE(int sig) nothrow @nogc @system { 84 } 85 86 int cli(Config conf) { 87 conf.printHelp; 88 return 0; 89 } 90 91 int cli(const Config fconf, Config.Shell conf) { 92 import std.datetime.stopwatch : StopWatch, AutoStart; 93 import std.process : spawnProcess, wait; 94 import std.stdio : writeln, writefln; 95 import distssh.connection : sshShellArgs; 96 97 auto hosts = RemoteHostCache.make(fconf.global.dbPath, fconf.global.cluster).bestSelectRange; 98 99 if (hosts.empty) { 100 logger.errorf("No remote host online").collectException; 101 } 102 103 auto bgBeat = BackgroundClientBeat(AbsolutePath(fconf.global.dbPath)); 104 105 const timout_until_considered_successfull_connection = fconf.global.timeout * 2; 106 107 foreach (host; hosts) { 108 try { 109 writeln("Connecting to ", host); 110 111 auto sw = StopWatch(AutoStart.yes); 112 113 auto exit_status = spawnProcess(sshShellArgs(host, 114 fconf.global.workDir.get.Path).toArgs).wait; 115 116 // #SPC-fallback_remote_host 117 if (exit_status == 0 || sw.peek > timout_until_considered_successfull_connection) { 118 writefln("Connection to %s closed.", host); 119 return exit_status; 120 } else { 121 logger.warningf("Connection failed to %s. Falling back on next available host", 122 host); 123 } 124 } catch (Exception e) { 125 logger.error(e.msg).collectException; 126 } 127 } 128 129 logger.error("No remote host online").collectException; 130 return 1; 131 } 132 133 // #SPC-fast_env_startup 134 int cli(Config fconf, Config.Cmd conf) { 135 import std.stdio : stdout; 136 137 if (fconf.global.command.empty) { 138 logger.error("No command specified"); 139 logger.error("Specify by adding -- <my command>"); 140 return 1; 141 } 142 143 auto bgBeat = BackgroundClientBeat(AbsolutePath(fconf.global.dbPath)); 144 145 foreach (host; RemoteHostCache.make(fconf.global.dbPath, fconf.global.cluster).bestSelectRange) { 146 logger.info("Connecting to ", host); 147 // to ensure connecting to is at the top of e.g. logfiles when the user run 148 // distcmd env > log.txt 149 stdout.flush; 150 return executeOnHost(ExecuteOnHostConf(fconf.global.workDir, fconf.global.command, 151 fconf.global.importEnv, fconf.global.cloneEnv, fconf.global.noImportEnv), host); 152 } 153 154 logger.error("No remote host online from the available ", 155 fconf.global.cluster).collectException; 156 return 1; 157 } 158 159 // #SPC-fast_env_startup 160 int cli(Config fconf, Config.LocalRun conf) { 161 import core.time : dur; 162 import std.datetime : Clock; 163 import std.file : exists; 164 import std.process : PConfig = Config, Redirect, userShell, thisProcessID; 165 import std.stdio : File, stdin, stdout, stderr; 166 import std.utf : toUTF8; 167 import proc; 168 import sumtype; 169 import my.timer : makeTimers, makeInterval; 170 import distssh.protocol; 171 172 static struct LocalRunConf { 173 import core.sys.posix.termios; 174 175 string[string] env; 176 string[] cmd; 177 Path workdir; 178 termios mode; 179 } 180 181 static string[string] readEnvFromStdin(ProtocolEnv src) { 182 string[string] rval; 183 foreach (kv; src) { 184 rval[kv.key] = kv.value; 185 } 186 return rval; 187 } 188 189 static string[string] readEnvFromFile(const Config fconf) { 190 string[string] rval; 191 foreach (kv; readEnv(fconf.global.importEnv.get.Path)) { 192 rval[kv.key] = kv.value; 193 } 194 return rval; 195 } 196 197 try { 198 auto pread = PipeReader(stdin.fileno); 199 200 auto localConf = () { 201 LocalRunConf conf; 202 conf.cmd = fconf.global.command.get; 203 conf.workdir = fconf.global.workDir.get.Path; 204 205 // guard against an error occuring when transfering the data such 206 // that ConfDone is trashed. 207 const timeout = Clock.currTime + 5.dur!"minutes"; 208 209 bool running = fconf.global.stdinMsgPackEnv.get; 210 while (running && Clock.currTime < timeout) { 211 pread.update; 212 pread.unpack().match!((None x) {}, (ConfDone x) { 213 running = false; 214 }, (ProtocolEnv x) { conf.env = readEnvFromStdin(x); }, (HeartBeat x) { 215 }, (Command x) { conf.cmd = x.value; }, (Workdir x) { 216 conf.workdir = x.value.Path; 217 }, (Key x) {}, (TerminalCapability x) { conf.mode = x.value; }); 218 } 219 220 if (!fconf.global.stdinMsgPackEnv) { 221 conf.env = readEnvFromFile(fconf); 222 } 223 224 return conf; 225 }(); 226 227 auto res = () { 228 if (conf.useFakeTerminal) { 229 import core.sys.posix.termios : tcsetattr, TCSAFLUSH; 230 231 auto p = ttyProcess([userShell] ~ shellSwitch(userShell) ~ [ 232 localConf.cmd.joiner(" ").toUTF8 233 ], localConf.env, PConfig.none, localConf.workdir).sandbox.scopeKill; 234 tcsetattr(p.stdin.file.fileno, TCSAFLUSH, &localConf.mode); 235 return p; 236 } 237 return pipeShell(localConf.cmd.joiner(" ").toUTF8, 238 Redirect.stdin | Redirect.stdout | Redirect.stderr, 239 localConf.env, PConfig.none, localConf.workdir).sandbox.scopeKill; 240 }(); 241 242 import core.sys.posix.unistd : getppid; 243 244 const parent_pid = getppid; 245 bool loop_running = true; 246 247 auto timers = makeTimers; 248 makeInterval(timers, () { 249 // detect ctrl+c on the client side 250 if (getppid != parent_pid) { 251 loop_running = false; 252 } 253 return 500.dur!"msecs"; 254 }, 50.dur!"msecs"); 255 256 ubyte[4096] buf; 257 bool pipeOutputToUser() @trusted { 258 // guard against an error occuring when writing to the users 259 // stdout. 260 const timeout = Clock.currTime + 5.dur!"minutes"; 261 262 bool hasWritten; 263 { 264 bool doFlush; 265 while (res.stdout.hasPendingData && Clock.currTime < timeout) { 266 auto r = res.stdout.read(buf[]); 267 stdout.rawWrite(r); 268 hasWritten = true; 269 doFlush = true; 270 } 271 if (doFlush) { 272 stdout.flush; 273 } 274 } 275 276 { 277 bool doFlush; 278 while (res.stderr.hasPendingData && Clock.currTime < timeout) { 279 auto r = res.stderr.read(buf[]); 280 stderr.rawWrite(r); 281 hasWritten = true; 282 doFlush = true; 283 } 284 if (doFlush) { 285 stderr.flush; 286 } 287 } 288 289 return hasWritten; 290 } 291 292 scope (exit) 293 pipeOutputToUser; 294 295 makeInterval(timers, () @safe { 296 if (pipeOutputToUser) { 297 return 10.dur!"msecs"; 298 } 299 // slower if not much is happening 300 return 100.dur!"msecs"; 301 }, 25.dur!"msecs"); 302 303 // a dummy event that ensure that it tick each 50 msec. 304 makeInterval(timers, () => 50.dur!"msecs", 50.dur!"msecs"); 305 306 int exit_status = 1; 307 308 auto wd = HeartBeatMonitor(fconf.global.timeout * 2); 309 310 while (!wd.isTimeout && loop_running) { 311 pread.update; 312 313 try { 314 if (res.tryWait) { 315 exit_status = res.status; 316 loop_running = false; 317 } 318 319 pread.unpack().match!((None x) {}, (ConfDone x) {}, (ProtocolEnv x) { 320 }, (HeartBeat x) { wd.beat; }, (Command x) {}, (Workdir x) {}, (Key x) { 321 auto data = x.value; 322 while (!data.empty) { 323 auto written = res.stdin.write(x.value); 324 if (written.empty) { 325 // closed so break. 326 break; 327 } 328 data = data[written.length .. $]; 329 } 330 }, (TerminalCapability x) {}); 331 } catch (Exception e) { 332 } 333 334 timers.tick(25.dur!"msecs"); 335 } 336 337 return exit_status; 338 } catch (Exception e) { 339 () @trusted { logger.trace(e).collectException; }(); 340 logger.error(e.msg).collectException; 341 } 342 343 return 1; 344 } 345 346 int cli(const Config fconf, Config.Install conf, void delegate(string src, string dst) symlink) nothrow { 347 import std.path : buildPath; 348 import std.file : exists, remove; 349 350 void replace(string src, string dst) { 351 if (exists(dst)) 352 remove(dst); 353 symlink(src, dst); 354 } 355 356 try { 357 replace(fconf.global.selfBinary.get, buildPath(fconf.global.selfDir.get, distShell)); 358 replace(fconf.global.selfBinary.get, buildPath(fconf.global.selfDir.get, distCmd)); 359 return 0; 360 } catch (Exception e) { 361 logger.error(e.msg).collectException; 362 } 363 364 return 1; 365 } 366 367 // #SPC-measure_remote_hosts 368 int cli(const Config fconf, Config.MeasureHosts conf) nothrow { 369 import std.algorithm : sort; 370 import std.conv : to; 371 import std.stdio : writefln, writeln; 372 import distssh.table; 373 import distssh.connection; 374 375 writeln("Host is overloaded if Load is >1").collectException; 376 377 auto tbl = Table!5(["Host", "Load", "Access Time", "Updated", "Multiplex"]); 378 void addRow(HostLoad a) nothrow { 379 static string toInternal(Duration d) { 380 import std.format : format; 381 382 int seconds; 383 short msecs; 384 d.split!("seconds", "msecs")(seconds, msecs); 385 if (seconds == 0) 386 return format("%sms", msecs); 387 else 388 return format("%ss %sms", seconds, msecs); 389 } 390 391 string[5] row; 392 try { 393 row[0] = a.host; 394 row[1] = a.load.loadAvg.to!string; 395 row[2] = toInternal(a.load.accessTime); 396 row[3] = a.updated.toLocalTime.to!string; 397 row[4] = makeMaster(a.host).isAlive ? "yes" : "no"; 398 tbl.put(row); 399 } catch (Exception e) { 400 logger.trace(e.msg).collectException; 401 } 402 } 403 404 auto hosts = RemoteHostCache.make(fconf.global.dbPath, fconf.global.cluster); 405 406 foreach (a; hosts.onlineRange.sort!((a, b) => a.load < b.load)) { 407 addRow(a); 408 } 409 410 auto unused = hosts.unusedRange; 411 if (!unused.empty) { 412 tbl.put(["-", "-", "-", "-", "-"]).collectException; 413 foreach (a; unused) { 414 addRow(a); 415 } 416 } 417 418 try { 419 writeln(tbl); 420 } catch (Exception e) { 421 logger.error(e.msg).collectException; 422 } 423 424 return 0; 425 } 426 427 /** Print the load of localhost. 428 * 429 * #SPC-measure_local_load 430 */ 431 int cli(WriterT)(Config.LocalLoad conf, scope WriterT writer) { 432 import std.ascii : newline; 433 import std.conv : to; 434 import std.parallelism : totalCPUs; 435 import distssh.libc : getloadavg; 436 437 try { 438 double[3] loadavg; 439 int samples = getloadavg(&loadavg[0], 3); 440 441 if (samples == -1 || samples == 0) 442 loadavg[0] = totalCPUs > 0 ? totalCPUs : 1; 443 444 double cores = totalCPUs; 445 446 // make sure the loadavg is on a new line because the last line parsed is expected to contain the loadavg. 447 writer(newline); 448 449 if (cores > 0) 450 writer((loadavg[0] / cores).to!string); 451 else 452 writer(loadavg[0].to!string); 453 } catch (Exception e) { 454 logger.trace(e.msg).collectException; 455 return -1; 456 } 457 458 return 0; 459 } 460 461 int cli(Config fconf, Config.RunOnAll conf) nothrow { 462 import std.algorithm : sort; 463 import std.stdio : writefln, writeln, stdout; 464 465 auto hosts = RemoteHostCache.make(fconf.global.dbPath, fconf.global.cluster); 466 467 writefln("Hosts (%s): %(%s|%)", globalEnvHostKey, hosts.allRange.map!(a => a.host)) 468 .collectException; 469 470 bool exit_status = true; 471 foreach (a; hosts.allRange 472 .filter!(a => !a.load.unknown) 473 .map!(a => a.host)) { 474 stdout.writefln("Connecting to %s", a).collectException; 475 try { 476 // #SPC-flush_buffers 477 stdout.flush; 478 } catch (Exception e) { 479 } 480 481 auto status = executeOnHost(ExecuteOnHostConf(fconf.global.workDir, fconf.global.command, 482 fconf.global.importEnv, fconf.global.cloneEnv, fconf.global.noImportEnv), a); 483 484 if (status != 0) { 485 writeln("Failed, error code: ", status).collectException; 486 exit_status = false; 487 } 488 489 stdout.writefln("Connection to %s closed.", a).collectException; 490 } 491 492 return exit_status ? 0 : 1; 493 } 494 495 // #SPC-shell_current_dir 496 int cli(const Config fconf, Config.LocalShell conf) { 497 import std.file : exists; 498 import std.process : spawnProcess, wait, userShell, Config, Pid; 499 500 try { 501 auto pid = () { 502 if (exists(fconf.global.workDir.get)) 503 return spawnProcess([userShell], null, Config.none, fconf.global.workDir.get); 504 return spawnProcess([userShell]); 505 }(); 506 return pid.wait; 507 } catch (Exception e) { 508 logger.error(e.msg).collectException; 509 return 1; 510 } 511 } 512 513 // #SPC-modify_env 514 int cli(const Config fconf, Config.Env conf) { 515 import std.algorithm : map, filter; 516 import std.array : assocArray, array; 517 import std.stdio : writeln, writefln; 518 import std..string : split; 519 import std.typecons : tuple; 520 import distssh.protocol : ProtocolEnv, EnvVariable; 521 522 if (conf.exportEnv) { 523 try { 524 writeEnv(fconf.global.importEnv.get, cloneEnv); 525 logger.info("Exported environment to ", fconf.global.importEnv.get); 526 } catch (Exception e) { 527 logger.error(e.msg).collectException; 528 return 1; 529 } 530 531 return 0; 532 } 533 534 string[string] set_envs; 535 try { 536 set_envs = conf.envSet 537 .map!(a => a.split('=')) 538 .filter!(a => !a.empty) 539 .map!(a => tuple(a[0], a[1])) 540 .assocArray; 541 } catch (Exception e) { 542 writeln("Unable to parse supplied envs to modify: ", e.msg).collectException; 543 return 1; 544 } 545 546 try { 547 auto env = readEnv(fconf.global.importEnv.get.Path).map!(a => tuple(a.key, 548 a.value)).assocArray; 549 550 foreach (k; conf.envDel.filter!(a => a in env)) { 551 writeln("Removing ", k); 552 env.remove(k); 553 } 554 555 foreach (kv; set_envs.byKeyValue) { 556 if (kv.key in env) 557 writefln("Setting %s=%s", kv.key, kv.value); 558 else 559 writefln("Adding %s=%s", kv.key, kv.value); 560 env[kv.key] = kv.value; 561 } 562 563 if (conf.print) { 564 foreach (const kv; env.byKeyValue) 565 writefln(`%s="%s"`, kv.key, kv.value); 566 } 567 568 writeEnv(fconf.global.importEnv.get, 569 ProtocolEnv(env.byKeyValue.map!(a => EnvVariable(a.key, a.value)).array)); 570 } catch (Exception e) { 571 logger.error(e.msg).collectException; 572 return 1; 573 } 574 575 return 0; 576 } 577 578 @("shall export the environment") 579 @Serial unittest { 580 import std.conv : to; 581 import std.file; 582 import std.process : environment; 583 import std.variant : tryVisit; 584 585 // arrange 586 immutable remove_me = "remove_me.export".Path; 587 scope (exit) 588 remove(remove_me); 589 590 auto opts = parseUserArgs(["distssh", "env", "-e", "--env-file", remove_me]); 591 auto envConf = opts.data.tryVisit!((Config.Env a) => a, () => Config.Env.init); 592 593 const env_key = "DISTSSH_ENV_TEST"; 594 environment[env_key] = env_key ~ remove_me; 595 scope (exit) 596 environment.remove(env_key); 597 598 // shall export the environment to the file 599 void verify1() { 600 // test normal export 601 cli(opts, envConf); 602 auto env = readEnv(remove_me); 603 assert(!env.filter!(a => a.key == env_key).empty, env.to!string); 604 } 605 606 verify1; 607 608 // shall filter out specified env before exporting to the file 609 environment[globalEnvFilterKey] = "DISTSSH_ENV_TEST;junk "; 610 scope (exit) 611 environment.remove(globalEnvFilterKey); 612 613 void verify2() { 614 cli(opts, envConf); 615 auto env = readEnv(remove_me); 616 assert(env.filter!(a => a.key == env_key).empty, env.to!string); 617 } 618 619 verify2; 620 } 621 622 @("shall create symlinks to self") 623 @Serial unittest { 624 string[2][] symlinks; 625 void fakeSymlink(string src, string dst) { 626 string[2] v = [src, dst]; 627 symlinks ~= v; 628 } 629 630 Config conf; 631 conf.global.selfBinary = typeof(conf.global.selfBinary)("/foo/src"); 632 conf.global.selfDir = typeof(conf.global.selfDir)("/bar"); 633 634 cli(conf, Config.Install.init, &fakeSymlink); 635 636 assert(symlinks[0] == ["/foo/src", "/bar/distshell"]); 637 assert(symlinks[1] == ["/foo/src", "/bar/distcmd"]); 638 } 639 640 @("shall modify the exported env by adding, removing and modifying") 641 @Serial unittest { 642 import std.array; 643 import std.file; 644 import std.process : environment; 645 import std.stdio; 646 import std.typecons : tuple; 647 import std.variant : tryVisit; 648 649 // arrange 650 immutable remove_me = "remove_me.export".Path; 651 scope (exit) 652 remove(remove_me); 653 654 environment["FOO_DEL"] = "del me"; 655 environment["FOO_MOD"] = "mod me"; 656 scope (exit) { 657 environment.remove("FOO_DEL"); 658 environment.remove("FOO_MOD"); 659 environment.remove("FOO_ADD"); 660 } 661 662 auto conf = parseUserArgs(["distssh", "env", "-e", "--env-file", remove_me]); 663 auto envConf = conf.data.tryVisit!((Config.Env a) => a, () => Config.Env.init); 664 cli(conf, envConf); 665 666 // act 667 conf = parseUserArgs([ 668 "distssh", "env", "-d", "FOO_DEL", "-s", "FOO_MOD=42", "--set", 669 "FOO_ADD=42", "--env-file", remove_me 670 ]); 671 envConf = conf.data.tryVisit!((Config.Env a) => a, () => Config.Env.init); 672 cli(conf, envConf); 673 674 // assert 675 auto env = readEnv(remove_me).map!(a => tuple(a.key, a.value)).assocArray; 676 assert(env["FOO_MOD"] == "42"); 677 assert(env["FOO_ADD"] == "42"); 678 assert("FOO_DEL" !in env); 679 } 680 681 @("shall print the load of the localhost") 682 @Serial unittest { 683 string load; 684 auto exit_status = cli(Config.LocalLoad.init, (string s) => load = s); 685 assert(exit_status == 0); 686 assert(load.length > 0, load); 687 } 688 689 void writeEnv(string filename, from.distssh.protocol.ProtocolEnv env) { 690 import core.sys.posix.sys.stat : fchmod, S_IRUSR, S_IWUSR; 691 import std.stdio : File; 692 import distssh.protocol : Serialize; 693 694 auto fout = File(filename, "w"); 695 fchmod(fout.fileno, S_IRUSR | S_IWUSR); 696 697 auto ser = Serialize!(void delegate(const(ubyte)[]) @safe)((const(ubyte)[] a) => fout.rawWrite( 698 a)); 699 700 ser.pack(env); 701 }