1 /**
2 Copyright: Copyright (c) 2019, Joakim Brännström. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 */
6 module distssh.metric;
7 
8 import logger = std.experimental.logger;
9 import std.algorithm : map, filter, splitter, joiner, sort, copy;
10 import std.array : empty, array, appender;
11 import std.datetime : Duration, dur;
12 import std.exception : collectException, ifThrown;
13 import std.range : take, drop, only;
14 import std.typecons : Nullable, Yes, No;
15 
16 import distssh.types;
17 
/** Login on host and measure its load.
 *
 * The command built by `sshLoadArgs(h)` is spawned as a child process. Its
 * exit status is checked from a timer callback scheduled with a 25 msecs
 * interval until the process has terminated or `timeout` has elapsed, in which
 * case the process is killed with SIGKILL and reaped.
 *
 * The last line the process wrote to stdout is parsed as a floating point
 * number. Surrounding whitespace (e.g. a trailing carriage return) is ignored.
 *
 * Params:
 *  h = remote host to check
 *  timeout = how long the measurement may run before the process is killed
 *
 * Returns: the Load of the remote host. `Load.init` is returned if the
 * process failed, timed out or its output could not be parsed.
 */
Load getLoad(Host h, Duration timeout) nothrow {
    import std.conv : to;
    import std.datetime.stopwatch : StopWatch, AutoStart;
    import std.process : tryWait, pipeProcess, kill, wait;
    import std.string : strip;
    import core.sys.posix.signal : SIGKILL;
    import my.timer : makeTimers, makeInterval;
    import distssh.connection : sshLoadArgs;

    enum ExitCode {
        none,
        error,
        timeout,
        ok,
    }

    // written by the timer callback in `measure`, stays `none` while the
    // process is still running.
    ExitCode exit_code;

    Nullable!Load measure() {
        auto sw = StopWatch(AutoStart.yes);

        auto res = pipeProcess(sshLoadArgs(h).toArgs);

        // Timer callback. Classifies the state of the child process and
        // returns the delay until the next check.
        Duration checkExitCode() @trusted {
            auto st = res.pid.tryWait;

            if (st.terminated && st.status == 0) {
                exit_code = ExitCode.ok;
            } else if (st.terminated && st.status != 0) {
                exit_code = ExitCode.error;
            } else if (sw.peek >= timeout) {
                exit_code = ExitCode.timeout;
                res.pid.kill(SIGKILL);
                // must read the exit or a zombie process is left behind
                res.pid.wait;
            }

            if (exit_code == ExitCode.none)
                return 25.dur!"msecs";
            // NOTE(review): presumably this value tells my.timer to not
            // reschedule the callback so `timers` becomes empty - confirm.
            return Duration.min;
        }

        // 25 because it is at the perception of human "lag" and less than the
        // 100 msecs that is the intention of the average delay.
        auto timers = makeTimers;
        makeInterval(timers, &checkExitCode, 25.dur!"msecs");

        while (!timers.empty) {
            timers.tick(25.dur!"msecs");
        }

        sw.stop;

        Nullable!Load rval;

        // only a process that exited with status zero has usable output.
        if (exit_code != ExitCode.ok)
            return rval;

        try {
            // the value of interest is on the last line, anything before it
            // is ignored.
            string last_line;
            foreach (a; res.stdout.byLineCopy) {
                last_line = a;
            }

            // `to!double` rejects leading/trailing whitespace and byLineCopy
            // only removes the '\n' terminator, thus strip before converting.
            rval = Load(last_line.strip.to!double, sw.peek, false);
        } catch (Exception e) {
            logger.trace(res.stdout).collectException;
            logger.trace(res.stderr).collectException;
            logger.trace(e.msg).collectException;
        }

        return rval;
    }

    try {
        auto r = measure();
        if (!r.isNull)
            return r.get;
    } catch (Exception e) {
        logger.trace(e.msg).collectException;
    }

    return Load.init;
}
110 
/**
 * #SPC-load_balance
 * #SPC-best_remote_host
 *
 * Snapshot of the hosts in a cluster, split into those reported as online and
 * those reported as unused by the database.
 *
 * TODO: it can be empty. how to handle that?
 */
struct RemoteHostCache {
    private {
        HostLoad[] online;
        HostLoad[] unused;
    }

    /** Build a cache from the database at `dbPath` for the hosts in `cluster`.
     *
     * Params:
     *  dbPath = path to the database to open
     *  cluster = the hosts the caller is interested in
     *
     * Returns: a cache with the loads read from the database. An empty cache
     * (`RemoteHostCache.init`) if an exception occurred; the error is logged.
     */
    static auto make(Path dbPath, const Host[] cluster) nothrow {
        import distssh.daemon : startDaemon, updateLeastLoadTimersInterval;
        import distssh.database;

        try {
            auto db = openDatabase(dbPath);

            // the same query is issued twice below with different durations.
            auto loadsWithin(Duration window) {
                return db.getServerLoads(cluster, window, updateLeastLoadTimersInterval[$ - 1]);
            }

            db.clientBeat;
            const daemonLaunched = startDaemon(db, Yes.background);
            if (!daemonLaunched) {
                db.syncCluster(cluster);
            }
            db.updateLastUse(cluster);

            // if no hosts are found in the db within the timeout then go over
            // into a fast mode. This happens if the client e.g. switches the
            // cluster it is using.
            auto loads = loadsWithin(2.dur!"seconds");
            if (loads.online.empty) {
                import core.thread : Thread;

                logger.trace("starting daemon in oneshot mode");
                startDaemon(db, No.background);

                // give the background process time to update some servers
                Thread.sleep(2.dur!"seconds");
                loads = loadsWithin(60.dur!"seconds");
            }

            return RemoteHostCache(loads.online, loads.unused);
        } catch (Exception e) {
            logger.error(e.msg).collectException;
        }
        return RemoteHostCache.init;
    }

    /// Returns: a range starting with the best server to use going to the worst.
    auto bestSelectRange() @safe nothrow {
        return BestSelectRange(online);
    }

    /// Returns: a range over the unused servers ordered by host. The internal array is sorted in place.
    auto unusedRange() @safe nothrow {
        return unused.sort!((lhs, rhs) => lhs.host < rhs.host);
    }

    /// Returns: a range over the online servers ordered by host. The internal array is sorted in place.
    auto onlineRange() @safe nothrow {
        return online.sort!((lhs, rhs) => lhs.host < rhs.host);
    }

    /// Returns: a range over all servers, online followed by unused, copied to a new array and ordered by host.
    auto allRange() @safe nothrow {
        auto everyone = online ~ unused;
        return everyone.sort!((lhs, rhs) => lhs.host < rhs.host);
    }
}
180 
/** Input range over the hosts of a set of servers, best candidate first.
 *
 * Servers are ordered by ascending load and those whose load is flagged as
 * unknown are left out. The leading `topCandidades` entries are then put in a
 * random order.
 */
@safe struct BestSelectRange {
    private {
        HostLoad[] servers;
    }

    /**
     * Params:
     *  servers = candidates to select from. Note that the slice is sorted in
     *      place by load.
     */
    this(HostLoad[] servers) nothrow {
        this.servers = reorder(servers.sort!((a, b) => a.load < b.load)
                .filter!(a => !a.load.unknown)
                .array);
    }

    /** Reorder the first `topCandidades` candidates in the server list in a
     * random order.
     *
     * Params:
     *  servers = servers ordered from best to worst
     *
     * Returns: a new array with the same elements. Only the leading
     * `topCandidades` elements (or all, if there are fewer) may have changed
     * position.
     */
    static private HostLoad[] reorder(HostLoad[] servers) @safe nothrow {
        import std.random : randomCover;

        auto app = appender!(HostLoad[])();

        // take/drop clamp to the length of the slice so a list shorter than
        // `topCandidades` is shuffled as a whole and the tail is empty.
        servers.take(topCandidades).randomCover.copy(app);
        servers.drop(topCandidades).copy(app);

        return app.data;
    }

    /// Returns: the host of the current best candidate. The range must not be empty.
    Host front() @safe pure nothrow {
        assert(!empty, "should never happen");
        return servers[0].host;
    }

    /// Advance to the next candidate. The range must not be empty.
    void popFront() @safe pure nothrow {
        assert(!empty, "should never happen");
        servers = servers[1 .. $];
    }

    /// Returns: true when there are no more candidates.
    bool empty() @safe pure nothrow {
        return servers.empty;
    }
}