/**
Copyright: Copyright (c) 2019, Joakim Brännström. All rights reserved.
License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
Author: Joakim Brännström (joakim.brannstrom@gmx.com)

Updates the distssh server cache.

# Design

The overall design is based on a shared, local database that is used by
both the daemon and clients to exchange statistics about the cluster. Because
sqlite is used for the database it is safe for multiple processes to read and
write to it concurrently.

The heartbeats are used by both the client and daemon:

 * The client spawns a daemon if the daemon heartbeats have stopped being
   updated (see the sketch below).
 * The server slows down the update of the cluster statistics if no client has
   used it in a while, until it finally terminates itself.
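A minimal sketch of the client-side rule (the actual check lives in
`startDaemon` below); `db.getDaemonBeat`, `heartBeatDaemonTimeout`,
`spawnDaemon` and `thisExePath` are the symbols that function already uses:

---
// spawn a new daemon only when the previous daemon's heartbeat has gone stale
if (db.getDaemonBeat >= heartBeatDaemonTimeout)
    spawnDaemon([thisExePath, "daemon", "--background"]);
---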
*/
module distssh.daemon;

import core.thread : Thread;
import core.time : dur;
import logger = std.experimental.logger;
import std.algorithm : map, filter, max;
import std.array : array, empty;
import std.datetime;
import std.exception : collectException;
import std.process : environment;
import std.typecons : Flag;

import colorlog;
import miniorm : SpinSqlTimeout;
import my.from_;
import my.set;
import my.timer;

import distssh.config;
import distssh.database;
import distssh.metric;
import distssh.types;
import distssh.utility;

Duration[3] updateLeastLoadTimersInterval = [
    10.dur!"seconds", 20.dur!"seconds", 60.dur!"seconds"
];

int cli(const Config fconf, Config.Daemon conf) {
    auto db = openDatabase(fconf.global.dbPath);
    const origNode = getInode(fconf.global.dbPath);

    if (fconf.global.verbosity == VerboseMode.trace)
        db.log(true);

    if (conf.background) {
        const beat = db.getDaemonBeat;
        logger.trace("daemon beat: ", beat);
        // do not spawn if a daemon is already running.
        if (beat < heartBeatDaemonTimeout && !conf.forceStart)
            return 0;
        // only updating the beat when in background mode ensures that the
        // daemon will sooner or later start in persistent background mode.
        db.daemonBeat;
    }

    initMetrics(db, fconf.global.cluster, fconf.global.timeout);

    if (!conf.background)
        return 0;

    // when starting the daemon for the first time we assume that any data
    // already in the database is old.
    db.removeUnusedServers(1.dur!"minutes");

    bool running = true;
    // the daemon runs for at most 24h. This is a workaround for if/when the
    // client beat errors out in such a way that it is always "zero".
    const forceShutdown = Clock.currTime + 24.dur!"hours";
    auto clientBeat = db.getClientBeat;

    auto timers = makeTimers;

    makeInterval(timers, () @trusted {
        clientBeat = db.getClientBeat;
        logger.tracef("client beat: %s timeout: %s", clientBeat, conf.timeout);
        return 10.dur!"seconds";
    }, 10.dur!"seconds");

    makeInterval(timers, () @trusted {
        clientBeat = db.getClientBeat;
        logger.tracef("client beat: %s timeout: %s", clientBeat, conf.timeout);
        // no client is interested in the metric so stop collecting
        if (clientBeat > conf.timeout) {
            running = false;
        }
        if (Clock.currTime > forceShutdown) {
            running = false;
        }
        return max(1.dur!"minutes", conf.timeout - clientBeat);
    }, 10.dur!"seconds");

    makeInterval(timers, () @safe {
        // the database may have been removed/recreated
        if (getInode(fconf.global.dbPath) != origNode) {
            running = false;
        }
        return 5.dur!"seconds";
    }, 5.dur!"seconds");

    makeInterval(timers, () @trusted { db.daemonBeat; return 15.dur!"seconds"; }, 15.dur!"seconds");

    // the times are arbitrarily chosen.
    // assumption: the oldest statistic does not have to be updated that often
    // because the other loop, updating the best candidate, is running "fast".
    // assumption: if a user uses distssh less often than every five minutes it
    // means that a long running process is in use and the user won't interact
    // with distssh for a while.
    makeInterval(timers, () @trusted {
        auto host = db.getOldestServer;
        if (!host.isNull) {
            updateServer(db, host.get, fconf.global.timeout);
        }

        if (clientBeat < 30.dur!"seconds")
            return 10.dur!"seconds";
        if (clientBeat < 5.dur!"minutes")
            return 30.dur!"seconds";
        return 60.dur!"seconds";
    }, 15.dur!"seconds");

    // the times are arbitrarily chosen.
    // assumption: the least loaded server will be getting jobs put on it not
    // only from *this* instance of distssh but also from all other instances
    // using the cluster. For this instance to be quick at moving jobs to
    // another host it has to update the statistics often.
    // assumption: a user that hasn't used distssh within the last 90s isn't
    // using distssh interactively/in quick succession. Backing off/slowing
    // down the updates lowers the network load.
    long updateLeastLoadedTimerTick;
    makeInterval(timers, () @trusted {
        auto s = db.getLeastLoadedServer;
        if (s.length > 0 && s.length < topCandidades) {
            updateServer(db, s[updateLeastLoadedTimerTick % s.length], fconf.global.timeout);
        } else if (s.length >= topCandidades) {
            updateServer(db, s[updateLeastLoadedTimerTick], fconf.global.timeout);
        }

        updateLeastLoadedTimerTick = (updateLeastLoadedTimerTick + 1) % topCandidades;

        if (clientBeat < 30.dur!"seconds")
            return updateLeastLoadTimersInterval[0];
        if (clientBeat < 90.dur!"seconds")
            return updateLeastLoadTimersInterval[1];
        return updateLeastLoadTimersInterval[2];
    }, 10.dur!"seconds");

    makeInterval(timers, () @trusted nothrow{
        try {
            db.removeUnusedServers(30.dur!"minutes");
        } catch (Exception e) {
            logger.warning(e.msg).collectException;
        }
        return 1.dur!"minutes";
    }, 1.dur!"minutes");

    makeInterval(timers, () @trusted nothrow{
        import std.range : take;
        import distssh.connection;

        try {
            // keep the multiplex connections open to the top candidates
            foreach (h; db.getLeastLoadedServer.take(topCandidades)) {
                auto m = makeMaster(h);
                if (!m.isAlive) {
                    m.connect;
                }
            }
        } catch (Exception e) {
            logger.trace(e.msg).collectException;
        }

        // open connections to the cluster quickly while the client is using them
        if (clientBeat < 5.dur!"minutes")
            return 15.dur!"seconds";
        return 1.dur!"minutes";
    }, 5.dur!"seconds");

    if (globalEnvPurge in environment && globalEnvPurgeWhiteList in environment) {
        import distssh.purge : readPurgeEnvWhiteList;

        Config.Purge pconf;
        pconf.kill = true;
        pconf.userFilter = true;
        auto econf = ExecuteOnHostConf(fconf.global.workDir, null, null, false, true);
        Set!Host clearedServers;

        logger.tracef("Server purge whitelist from %s is %s",
                globalEnvPurgeWhiteList, readPurgeEnvWhiteList);

        makeInterval(timers, () @safe nothrow{
            try {
                purgeServer(db, econf, pconf, clearedServers, fconf.global.timeout);
            } catch (Exception e) {
                logger.warning(e.msg).collectException;
            }
            if (clientBeat < 2.dur!"minutes")
                return 1.dur!"minutes";
            return 2.dur!"minutes";
        }, 2.dur!"minutes");
    } else {
        logger.tracef("Automatic purge not running because both %s and %s must be set",
                globalEnvPurge, globalEnvPurgeWhiteList);
    }

    while (running && !timers.empty) {
        try {
            timers.tick(100.dur!"msecs");
        } catch (SpinSqlTimeout e) {
            // the database has been removed or something else "bad" has happened
            // such that database access has started throwing exceptions.
            return 1;
        }
    }

    return 0;
}

/** Start the daemon either as a persistent background process or as a oneshot
 * update.
 *
 * Returns: true if the daemon was started.
 */
bool startDaemon(ref from.miniorm.Miniorm db, Flag!"background" bg) nothrow {
    import std.file : thisExePath;
    import my.process : spawnDaemon;

    try {
        if (bg && db.getDaemonBeat < heartBeatDaemonTimeout) {
            return false;
        }

        const flags = () {
            if (bg)
                return ["--background"];
            return null;
        }();

        spawnDaemon([thisExePath, "daemon"] ~ flags);
        logger.trace("daemon spawned");
        return true;
    } catch (Exception e) {
        logger.error(e.msg).collectException;
    }

    return false;
}

private:

immutable heartBeatDaemonTimeout = 60.dur!"seconds";

void initMetrics(ref from.miniorm.Miniorm db, const(Host)[] cluster, Duration timeout) nothrow {
    import std.parallelism : TaskPool;
    import std.random : randomCover;
    import std.typecons : tuple;

    static auto loadHost(T)(T host_timeout) nothrow {
        import std.concurrency : thisTid;

        logger.trace("load testing thread id: ", thisTid).collectException;
        return HostLoad(host_timeout[0], getLoad(host_timeout[0], host_timeout[1]));
    }

    try {
        auto pool = new TaskPool();
        scope (exit)
            pool.stop;

        foreach (v; pool.amap!(loadHost)(cluster.randomCover.map!(a => tuple(a, timeout)).array)) {
            db.newServer(v);
        }
    } catch (Exception e) {
        logger.trace(e.msg).collectException;
    }
}

void updateServer(ref from.miniorm.Miniorm db, Host host, Duration timeout) {
    auto load = getLoad(host, timeout);
    distssh.database.updateServer(db, HostLoad(host, load));
    logger.tracef("Update %s with %s", host, load).collectException;
}

/// Round robin clearing of the servers.
void purgeServer(ref from.miniorm.Miniorm db, ExecuteOnHostConf econf,
        const Config.Purge pconf, ref Set!Host clearedServers, const Duration timeout) @safe {
    import std.algorithm : joiner;
    import std.random : randomCover;
    import std.range : only;
    import distssh.purge;

    auto servers = distssh.database.getServerLoads(db, clearedServers.toArray,
            timeout, 10.dur!"minutes");

    logger.trace("Round robin server purge list ", clearedServers.toArray);

    bool clearedAServer;
    foreach (a; only(servers.online, servers.unused).joiner
            .array
            .randomCover
            .map!(a => a.host)
            .filter!(a => !clearedServers.contains(a))) {
        logger.trace("Purge server ", a);
        clearedAServer = true;
        distssh.purge.purgeServer(econf, pconf, a);
        clearedServers.add(a);
        break;
    }

    if (!clearedAServer) {
        logger.trace("Reset server purge list ");
        clearedServers = Set!Host.init;
    }
}

struct Inode {
    ulong dev;
    ulong ino;

    bool opEquals()(auto ref const typeof(this) s) const {
        return dev == s.dev && ino == s.ino;
    }
}
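
// A minimal sketch (unittest) of the Inode value semantics the daemon relies
// on: two inodes compare equal only when both the device and the inode number
// match. Inode(0, 0) is the "file does not exist" sentinel used by getInode.
unittest {
    assert(Inode(1, 2) == Inode(1, 2));
    assert(Inode(1, 2) != Inode(1, 3));
    assert(Inode(0, 0) == Inode(0, 0));
}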

Inode getInode(const Path p) @trusted nothrow {
    import core.sys.posix.sys.stat : stat_t, stat;
    import std.file : isSymlink, exists;
    import std.string : toStringz;

    const pz = p.toString.toStringz;

    if (!exists(p.toString)) {
        return Inode(0, 0);
    } else {
        stat_t st = void;
        // intentionally NOT using lstat because we want to detect a change even
        // when the symlink is redirected to another target.
        stat(pz, &st);
        return Inode(st.st_dev, st.st_ino);
    }
}
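
// A minimal sketch of the getInode contract the daemon depends on: a path that
// does not exist maps to the Inode(0, 0) sentinel. The path below is an
// assumption; it is only expected to be absent on the machine running the test.
unittest {
    assert(getInode(Path("/distssh/this/path/should/not/exist")) == Inode(0, 0));
}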