/**
Copyright: Copyright (c) 2019, Joakim Brännström. All rights reserved.
License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
Author: Joakim Brännström (joakim.brannstrom@gmx.com)
*/
module distssh.metric;

import logger = std.experimental.logger;
import std.algorithm : map, filter, splitter, joiner, sort, copy;
import std.array : empty, array, appender;
import std.datetime : Duration, dur;
import std.exception : collectException, ifThrown;
import std.range : take, drop, only;
import std.typecons : Nullable, Yes, No;

import distssh.types;

/** Login on host and measure its load.
 *
 * The load command is started as a child process and polled every 25 msecs
 * until it has terminated or `timeout` has elapsed. The last line that the
 * command wrote to stdout is parsed as a floating point number.
 *
 * Params:
 *  h = remote host to check
 *  timeout = how long the command is allowed to run before it is killed
 *
 * Returns: the Load of the remote host. `Load.init` is returned if the
 * command failed, timed out or its output could not be parsed.
 */
Load getLoad(Host h, Duration timeout) nothrow {
    import std.conv : to;
    import std.datetime.stopwatch : StopWatch, AutoStart;
    import std.process : tryWait, pipeProcess, kill, wait;
    import core.sys.posix.signal : SIGKILL;
    import my.timer : makeTimers, makeInterval;
    import distssh.connection : sshLoadArgs;

    enum ExitCode {
        none,
        error,
        timeout,
        ok,
    }

    // written by the polling callback in `measure`
    ExitCode exit_code;

    // Run the load command and parse its output. The returned value is null
    // if the command did not exit successfully or the output is unparsable.
    Nullable!Load measure() {
        auto sw = StopWatch(AutoStart.yes);

        auto res = pipeProcess(sshLoadArgs(h).toArgs);

        // Poll the child process once. Returns the delay until the next poll.
        // NOTE(review): presumably `Duration.min` makes the timer remove
        // itself which in turn ends the tick loop below - confirm against
        // my.timer.
        Duration checkExitCode() @trusted {
            auto st = res.pid.tryWait;

            if (st.terminated && st.status == 0) {
                exit_code = ExitCode.ok;
            } else if (st.terminated && st.status != 0) {
                exit_code = ExitCode.error;
            } else if (sw.peek >= timeout) {
                exit_code = ExitCode.timeout;
                res.pid.kill(SIGKILL);
                // must read the exit or a zombie process is left behind
                res.pid.wait;
            }

            if (exit_code == ExitCode.none)
                return 25.dur!"msecs";
            return Duration.min;
        }

        // 25 because it is at the perception of human "lag" and less than the
        // 100 msecs that is the intention of the average delay.
        auto timers = makeTimers;
        makeInterval(timers, &checkExitCode, 25.dur!"msecs");

        while (!timers.empty) {
            timers.tick(25.dur!"msecs");
        }

        // the elapsed time is reported as part of the Load
        sw.stop;

        Nullable!Load rval;

        if (exit_code != ExitCode.ok)
            return rval;

        // declared outside the try so it can be logged on a parse failure
        string last_line;
        try {
            foreach (a; res.stdout.byLineCopy) {
                last_line = a;
            }

            // NOTE(review): the third argument is presumably the `unknown`
            // flag that BestSelectRange filters on - confirm against
            // distssh.types.
            rval = Load(last_line.to!double, sw.peek, false);
        } catch (Exception e) {
            // Log the text that failed to parse. Tracing the File handles
            // themselves carries no information and stderr is not drained
            // here because reading a pipe to EOF may block.
            logger.trace("unable to parse the load from: ", last_line).collectException;
            logger.trace(e.msg).collectException;
        }

        return rval;
    }

    try {
        auto r = measure();
        if (!r.isNull)
            return r.get;
    } catch (Exception e) {
        logger.trace(e.msg).collectException;
    }

    return Load.init;
}

/**
 * #SPC-load_balance
 * #SPC-best_remote_host
 *
 * A snapshot of the online and unused hosts of a cluster as read from the
 * database.
 *
 * TODO: it can be empty. how to handle that?
 */
struct RemoteHostCache {
    private {
        HostLoad[] online;
        HostLoad[] unused;
    }

    /** Create a cache from the database at `dbPath` for the hosts in `cluster`.
     *
     * A background daemon is started. If the database has no online server
     * for the cluster the daemon is started once more in the foreground
     * (oneshot) and the database is read again after a two second pause.
     *
     * Params:
     *  dbPath = database to read the server loads from
     *  cluster = the hosts to get the load for
     *
     * Returns: the cache, or `RemoteHostCache.init` if an exception occurred.
     */
    static auto make(Path dbPath, const Host[] cluster) nothrow {
        import distssh.daemon : startDaemon, updateLeastLoadTimersInterval;
        import distssh.database;

        try {
            auto db = openDatabase(dbPath);
            db.clientBeat;
            const started = startDaemon(db, Yes.background);
            if (!started) {
                db.syncCluster(cluster);
            }
            db.updateLastUse(cluster);

            // if no hosts are found in the db within the timeout then go over
            // into a fast mode. This happens if the client e.g. switches the
            // cluster it is using.
            auto servers = db.getServerLoads(cluster, 2.dur!"seconds",
                    updateLeastLoadTimersInterval[$ - 1]);
            if (servers.online.empty) {
                logger.trace("starting daemon in oneshot mode");
                startDaemon(db, No.background);

                import core.thread : Thread;

                // give the background process time to update some servers
                Thread.sleep(2.dur!"seconds");
                servers = db.getServerLoads(cluster, 60.dur!"seconds",
                        updateLeastLoadTimersInterval[$ - 1]);
            }

            return RemoteHostCache(servers.online, servers.unused);
        } catch (Exception e) {
            logger.error(e.msg).collectException;
        }
        return RemoteHostCache.init;
    }

    /// Returns: a range starting with the best server to use going to the worst.
    auto bestSelectRange() @safe nothrow {
        return BestSelectRange(online);
    }

    /// Returns: a range over the unused servers ordered by host. The cached array is sorted in place.
    auto unusedRange() @safe nothrow {
        return unused.sort!((a, b) => a.host < b.host);
    }

    /// Returns: a range over the online servers ordered by host. The cached array is sorted in place.
    auto onlineRange() @safe nothrow {
        return online.sort!((a, b) => a.host < b.host);
    }

    /// Returns: a range over all servers ordered by host. It is backed by a new array.
    auto allRange() @safe nothrow {
        return only(online, unused).joiner.array.sort!((a, b) => a.host < b.host);
    }
}

/** An input range of hosts going from the least loaded to the most loaded.
 *
 * Servers with an unknown load are excluded. The first `topCandidades`
 * servers are served in a random order.
 */
@safe struct BestSelectRange {
    private {
        HostLoad[] servers;
    }

    /** Build the range from `servers`.
     *
     * The argument is not modified; filtering and sorting is done on a copy.
     */
    this(HostLoad[] servers) nothrow {
        import std.algorithm : sort;

        // Filter before sorting. `.array` allocates a private copy which
        // means that the in-place sort can't reorder the array owned by the
        // caller (e.g. the one RemoteHostCache.onlineRange has sorted by
        // host).
        auto known = servers.filter!(a => !a.load.unknown).array;
        known.sort!((a, b) => a.load < b.load);

        this.servers = reorder(known);
    }

    /** Reorder the first `topCandidades` candidates in the server list in a
     * random order. The remaining servers keep their order.
     */
    static private HostLoad[] reorder(HostLoad[] servers) @safe nothrow {
        import std.random : randomCover;

        auto app = appender!(HostLoad[])();

        // take/drop clamp to the length of the array thus a list shorter than
        // `topCandidades` is shuffled as a whole without a special case.
        servers.take(topCandidades).randomCover.copy(app);
        servers.drop(topCandidades).copy(app);

        return app.data;
    }

    /// Returns: the host of the best remaining server.
    Host front() @safe pure nothrow {
        assert(!empty, "should never happen");
        return servers[0].host;
    }

    /// Advance to the next best server.
    void popFront() @safe pure nothrow {
        assert(!empty, "should never happen");
        servers = servers[1 .. $];
    }

    /// Returns: true when there are no more servers.
    bool empty() @safe pure nothrow {
        return servers.empty;
    }
}