/**
Copyright: Copyright (c) 2019, Joakim Brännström. All rights reserved.
License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
Author: Joakim Brännström (joakim.brannstrom@gmx.com)
*/
module distssh.metric;

import logger = std.experimental.logger;
import std.algorithm : map, filter, splitter;
import std.datetime : Duration;
import std.exception : collectException;
import std.typecons : Nullable;

import distssh.types;

/** Measure the load of a remote host by running this executable on it via ssh.
 *
 * The command `ssh -q <sshNoLoginArgs> <h> <path of this executable> localload`
 * is spawned and polled until it exits or `timeout` has elapsed. On a zero
 * exit status the last line written to stdout is converted to a `double` and
 * returned together with the elapsed wall-clock time.
 *
 * Params:
 *  h = remote host to check
 *  timeout = longest time to wait for the remote command before it is killed
 *
 * Returns: the measured Load of the remote host. If the command fails, times
 * out, or its output cannot be parsed, a fallback
 * `Load(int.max, 3600 seconds, true)` is returned instead.
 */
Load getLoad(Host h, Duration timeout) nothrow {
    import std.conv : to;
    import std.datetime.stopwatch : StopWatch, AutoStart;
    import std.file : thisExePath;
    import std.process : tryWait, pipeProcess, kill, wait, escapeShellFileName;
    import std.range : takeOne, retro;
    import std.stdio : writeln;
    import core.time : dur;
    import core.sys.posix.signal : SIGKILL;
    import distssh.timer : makeTimers, makeInterval;

    // Outcome of the remote command; `none` means it is still running.
    enum ExitCode {
        none,
        error,
        timeout,
        ok,
    }

    ExitCode exit_code;

    // Runs the remote measurement. Returns a null value unless the command
    // exited with status zero and its last output line parsed as a double.
    Nullable!Load measure() {
        auto elapsed = StopWatch(AutoStart.yes);

        immutable abs_distssh = thisExePath;
        auto proc = pipeProcess(["ssh", "-q"] ~ sshNoLoginArgs ~ [
                h, abs_distssh.escapeShellFileName, "localload"
                ]);

        // Timer callback: records the outcome in `exit_code` and returns true
        // for as long as the process is still running within the time limit.
        bool checkExitCode() @trusted {
            auto state = proc.pid.tryWait;

            if (state.terminated) {
                exit_code = (state.status == 0) ? ExitCode.ok : ExitCode.error;
            } else if (elapsed.peek >= timeout) {
                exit_code = ExitCode.timeout;
                proc.pid.kill(SIGKILL);
                // the exit status has to be collected, otherwise a zombie
                // process is left behind
                proc.pid.wait;
            }

            return exit_code == ExitCode.none;
        }

        // 25 msecs is around where a human starts to perceive "lag" and it is
        // below the 100 msecs that is the intended average delay.
        auto poll_interval = 25.dur!"msecs";

        auto timers = makeTimers;
        makeInterval(timers, &checkExitCode, poll_interval);

        while (!timers.empty) {
            timers.tick(poll_interval);
        }

        elapsed.stop;

        Nullable!Load result;

        if (exit_code == ExitCode.ok) {
            try {
                // only the final line of the output carries the load value
                string final_line;
                foreach (line; proc.stdout.byLineCopy) {
                    final_line = line;
                }

                result = Load(final_line.to!double, elapsed.peek);
            } catch (Exception e) {
                logger.trace(proc.stdout).collectException;
                logger.trace(proc.stderr).collectException;
                logger.trace(e.msg).collectException;
            }
        }

        return result;
    }

    try {
        auto measured = measure();
        if (!measured.isNull)
            return measured.get;
    } catch (Exception e) {
        logger.trace(e.msg).collectException;
    }

    // Fallback when no measurement could be obtained.
    // NOTE(review): the third argument presumably sets `Load.unknown`, which
    // `RemoteHostCache.randomAndPop` filters on - confirm against distssh.types.
    return Load(int.max, 3600.dur!"seconds", true);
}

/**
 * A list of remote hosts ordered by load, consumable as a range.
 *
 * #SPC-load_balance
 * #SPC-best_remote_host
 *
 * TODO: it can be empty. how to handle that?
 */
struct RemoteHostCache {
    import std.array : array;

    /// Hosts paired with their load; `make` stores them sorted by ascending load.
    HostLoad[] remoteByLoad;

    /** Build a cache from the database at `dbPath` for the hosts in `cluster`.
     *
     * Opens the database, starts the daemon, syncs the cluster into the
     * database, removes the servers reported as unused and keeps the online
     * ones sorted by load.
     *
     * Returns: the populated cache, or `RemoteHostCache.init` if any step
     * throws (the error message is logged).
     */
    static auto make(string dbPath, const Host[] cluster) nothrow {
        import distssh.daemon : startDaemon;
        import distssh.database;
        import std.algorithm : sort;

        try {
            auto db = openDatabase(dbPath);
            startDaemon(db);
            db.syncCluster(cluster);

            auto servers = db.getServerLoads(cluster);
            db.removeUnusedServers(servers.unused);

            auto by_load = servers.online.sort!((a, b) => a[1] < b[1]).array;
            return RemoteHostCache(by_load);
        } catch (Exception e) {
            logger.error(e.msg).collectException;
        }

        return RemoteHostCache.init;
    }

    /** Pick a host and remove every entry for it from the cache.
     *
     * Only hosts whose load is not flagged `unknown` are candidates. With
     * three or more candidates one of the first three is chosen at random;
     * with one or two the first candidate is chosen; with none the first
     * host in the cache is used.
     *
     * Returns: the chosen host.
     */
    Host randomAndPop() @safe nothrow {
        import std.range : take;
        import std.random : randomSample;

        assert(!empty, "should never happen");

        // default choice, kept when no host has a known load or when the
        // selection below throws
        auto chosen = remoteByLoad[0][0];

        try {
            auto known = remoteByLoad.filter!(a => !a[1].unknown).array;

            if (known.length >= 3) {
                chosen = known.take(3).randomSample(1).front[0];
            } else if (known.length > 0) {
                chosen = known[0][0];
            }
        } catch (Exception e) {
            logger.trace(e.msg).collectException;
        }

        remoteByLoad = remoteByLoad.filter!(a => a[0] != chosen).array;

        return chosen;
    }

    /// Returns: the host of the first entry in the cache.
    Host front() @safe pure nothrow {
        assert(!empty);
        return remoteByLoad[0][0];
    }

    /// Drop the first entry from the cache.
    void popFront() @safe pure nothrow {
        assert(!empty);
        remoteByLoad = remoteByLoad[1 .. $];
    }

    /// Returns: true when the cache holds no hosts.
    bool empty() @safe pure nothrow {
        return !remoteByLoad.length;
    }
}