1 /**
2 Copyright: Copyright (c) 2019, Joakim Brännström. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 */
6 module distssh.metric;
7 
8 import logger = std.experimental.logger;
9 import std.algorithm : map, filter, splitter;
10 import std.datetime : Duration;
11 import std.exception : collectException;
12 import std.typecons : Nullable;
13 
14 import distssh.types;
15 
/** Login on host and measure its load.
 *
 * The measurement is done by starting `ssh` towards the host and running the
 * path of the current executable there with the argument `localload`. The last
 * line that the remote process prints on stdout is parsed as the load value.
 *
 * Params:
 *  h = remote host to check
 *  timeout = how long to wait for the ssh process to finish. When it is
 *            exceeded the process is killed and the measurement is treated as
 *            failed.
 *
 * Returns: the Load of the remote host. If the measurement fails (ssh exits
 * with a non-zero status, the timeout is exceeded, the output can not be
 * parsed or an exception is thrown) the fallback
 * `Load(int.max, 3600.dur!"seconds", true)` is returned.
 */
Load getLoad(Host h, Duration timeout) nothrow {
    import std.conv : to;
    import std.datetime.stopwatch : StopWatch, AutoStart;
    import std.file : thisExePath;
    import std.process : tryWait, pipeProcess, kill, wait, escapeShellFileName;
    import core.time : dur;
    import core.sys.posix.signal : SIGKILL;
    import distssh.timer : makeTimers, makeInterval;

    enum ExitCode {
        none,
        error,
        timeout,
        ok,
    }

    // 25 because it is at the perception of human "lag" and less than the 100
    // msecs that is the intention of the average delay.
    enum pollInterval = 25.dur!"msecs";

    ExitCode exit_code;

    Nullable!Load measure() {
        auto sw = StopWatch(AutoStart.yes);

        immutable abs_distssh = thisExePath;
        auto res = pipeProcess(["ssh", "-q"] ~ sshNoLoginArgs ~ [
                h, abs_distssh.escapeShellFileName, "localload"
                ]);

        // Polls the ssh process and records how it ended in `exit_code`.
        // Returns true as long as no outcome has been recorded.
        // NOTE(review): presumably makeInterval keeps rescheduling the callback
        // while it returns true - confirm against distssh.timer.
        bool checkExitCode() @trusted {
            auto st = res.pid.tryWait;

            if (st.terminated && st.status == 0) {
                exit_code = ExitCode.ok;
            } else if (st.terminated && st.status != 0) {
                exit_code = ExitCode.error;
            } else if (sw.peek >= timeout) {
                exit_code = ExitCode.timeout;
                res.pid.kill(SIGKILL);
                // must read the exit or a zombie process is left behind
                res.pid.wait;
            }
            return exit_code == ExitCode.none;
        }

        auto timers = makeTimers;
        makeInterval(timers, &checkExitCode, pollInterval);

        while (!timers.empty) {
            timers.tick(pollInterval);
        }

        sw.stop;

        Nullable!Load rval;

        if (exit_code != ExitCode.ok)
            return rval;

        // declared outside the try so the text that failed to parse can be logged.
        string last_line;

        try {
            foreach (a; res.stdout.byLineCopy) {
                last_line = a;
            }

            rval = Load(last_line.to!double, sw.peek);
        } catch (Exception e) {
            logger.trace(e.msg).collectException;
            // log the text itself; logging the pipe's File handle carries no
            // information about what the remote host answered.
            logger.trace("unable to parse the load from the last line of output: '",
                    last_line, "'").collectException;
        }

        return rval;
    }

    try {
        auto r = measure();
        if (!r.isNull)
            return r.get;
    } catch (Exception e) {
        logger.trace(e.msg).collectException;
    }

    // fallback when the load could not be measured.
    return Load(int.max, 3600.dur!"seconds", true);
}
109 
/**
 * #SPC-load_balance
 * #SPC-best_remote_host
 *
 * A list of hosts paired with their load that is consumed either as a range
 * (`front`/`popFront`/`empty`) or via `randomAndPop`.
 *
 * TODO: it can be empty. how to handle that?
 */
struct RemoteHostCache {
    import std.array : array;

    /// The cached hosts. `make` stores them sorted ascending on the second tuple field.
    HostLoad[] remoteByLoad;

    /** Build a cache from the database at `dbPath`.
     *
     * Opens the database, starts the daemon, syncs `cluster` to the database,
     * removes the servers reported as unused and keeps those reported as
     * online.
     *
     * Params:
     *  dbPath = path passed to `openDatabase`
     *  cluster = the hosts to sync and to retrieve the loads for
     *
     * Returns: a cache with the online servers sorted, or `RemoteHostCache.init`
     * if an exception is thrown (the message is logged as an error).
     */
    static auto make(string dbPath, const Host[] cluster) nothrow {
        import distssh.daemon : startDaemon;
        import distssh.database;
        import std.algorithm : sort;

        try {
            auto db = openDatabase(dbPath);
            startDaemon(db);
            db.syncCluster(cluster);
            auto servers = db.getServerLoads(cluster);
            db.removeUnusedServers(servers.unused);
            auto byLoad = servers.online.sort!((lhs, rhs) => lhs[1] < rhs[1]).array;
            return RemoteHostCache(byLoad);
        } catch (Exception e) {
            logger.error(e.msg).collectException;
        }
        return RemoteHostCache.init;
    }

    /** Choose a host and remove it from the cache.
     *
     * Only entries whose load is not flagged `unknown` are candidates:
     * $(UL
     *  $(LI three or more candidates: one of the first three is drawn at random)
     *  $(LI one or two candidates: the first candidate is used)
     *  $(LI no candidate, or an exception is thrown: the first entry of `remoteByLoad` is used)
     * )
     * The cache must not be empty.
     *
     * Returns: the chosen host. Every entry for that host is removed from `remoteByLoad`.
     */
    Host randomAndPop() @safe nothrow {
        import std.range : take;
        import std.random : randomSample;

        assert(!empty, "should never happen");

        // the default that is used unless a better candidate is found below.
        auto chosen = remoteByLoad[0][0];

        try {
            auto known = remoteByLoad.filter!(entry => !entry[1].unknown).array;
            if (known.length >= 3) {
                chosen = known.take(3).randomSample(1).front[0];
            } else if (known.length != 0) {
                chosen = known[0][0];
            }
        } catch (Exception e) {
            logger.trace(e.msg).collectException;
        }

        remoteByLoad = remoteByLoad.filter!(entry => entry[0] != chosen).array;

        return chosen;
    }

    /// Returns: the host of the first entry. The cache must not be empty.
    Host front() @safe pure nothrow {
        assert(!empty);
        return remoteByLoad[0][0];
    }

    /// Drop the first entry. The cache must not be empty.
    void popFront() @safe pure nothrow {
        assert(!empty);
        remoteByLoad = remoteByLoad[1 .. $];
    }

    /// Returns: true if there are no entries left.
    bool empty() @safe pure nothrow {
        return !remoteByLoad.length;
    }
}