1 /**
2 Copyright: Copyright (c) 2019, Joakim Brännström. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 
6 Updates the distssh server cache.
7 
8 # Design
9 
The overall design is based around a shared, local database that is used by
both the daemon and clients to exchange statistics about the cluster. By
leveraging SQLite for the database it becomes safe for multiple processes to
read from and write to it concurrently.
14 
The heartbeats are used by both the client and the daemon:

 * The client spawns a daemon if the daemon heartbeat has stopped being
   updated.
 * The daemon slows down the update of the cluster statistics if no client
   has used it in a while, until it finally terminates itself.
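
A sketch of the client-side spawn check (it mirrors what `startDaemon` in
this module does):
---
// spawn a background daemon when its heartbeat has gone stale
if (db.getDaemonBeat >= heartBeatDaemonTimeout)
    startDaemon(db, Yes.background);
---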
21 */
22 module distssh.daemon;
23 
24 import core.thread : Thread;
25 import core.time : dur;
26 import logger = std.experimental.logger;
27 import std.algorithm : map, filter, max;
28 import std.array : array, empty;
29 import std.datetime;
30 import std.exception : collectException;
31 import std.process : environment;
32 import std.typecons : Flag;
33 
34 import colorlog;
35 import miniorm : SpinSqlTimeout;
36 import my.from_;
37 import my.named_type;
38 import my.set;
39 import my.timer;
40 
41 import distssh.config;
42 import distssh.database;
43 import distssh.metric;
44 import distssh.types;
45 import distssh.utility;
46 
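// Update intervals for the least loaded server loop, ordered from most to
// least frequent. The index is chosen from how recently a client was active;
// see the corresponding timer in cli.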
47 Duration[3] updateLeastLoadTimersInterval = [
48     10.dur!"seconds", 20.dur!"seconds", 60.dur!"seconds"
49 ];
50 
51 int cli(const Config fconf, Config.Daemon conf) {
52     auto db = openDatabase(fconf.global.dbPath);
53     const origNode = getInode(fconf.global.dbPath);
54 
55     if (fconf.global.verbosity == VerboseMode.trace)
56         db.log(true);
57 
58     if (conf.background) {
59         const beat = db.getDaemonBeat;
60         logger.trace("daemon beat: ", beat);
61         // do not spawn if a daemon is already running.
62         if (beat < heartBeatDaemonTimeout && !conf.forceStart)
63             return 0;
        // updating the beat only when in background mode ensures that the
        // daemon will sooner or later start in persistent background mode.
66         db.daemonBeat;
67     }
68 
69     initMetrics(db, fconf.global.cluster, fconf.global.timeout);
70 
71     if (!conf.background)
72         return 0;
73 
    // when starting the daemon for the first time we assume that any data
    // already in the database is old.
76     db.removeUnusedServers(1.dur!"minutes");
77 
78     bool running = true;
    // the daemon runs for at most 24h. This is a workaround for when the
    // client beat errors out in such a way that it is always "zero".
81     const forceShutdown = Clock.currTime + 24.dur!"hours";
82     auto clientBeat = db.getClientBeat;
83     auto lastDaemonBeat = db.getDaemonBeatClock;
84 
85     auto timers = makeTimers;
86 
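    // Each makeInterval callback returns the Duration until its next
    // invocation; the second argument to makeInterval is the initial delay.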
87     makeInterval(timers, () @trusted {
        // continuously update the local copy of the client beat
89         clientBeat = db.getClientBeat;
90         logger.tracef("client beat: %s timeout: %s", clientBeat, conf.timeout);
91         return 10.dur!"seconds";
92     }, 10.dur!"seconds");
93 
94     makeInterval(timers, () @trusted {
95         import std.math : abs;
96         import std.random : Mt19937, dice, unpredictableSeed;
97 
        // the current daemon beat in the database should match the last one
        // this daemon wrote. If it doesn't, multiple daemons are running:
        // roll the dice, with a 50% chance that this instance shuts down.
102         const beat = db.getDaemonBeatClock;
103         const diff = abs((lastDaemonBeat - beat).total!"msecs");
104         logger.tracef("lastDaemonBeat: %s beat: %s diff: %s", lastDaemonBeat, beat, diff);
105 
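        // lastDaemonBeat is read back right after this daemon writes its
        // beat (see the daemonBeat timer below), so if no other daemon wrote
        // in between the two timestamps should be identical; a diff of more
        // than a couple of milliseconds presumably means another writer.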
106         if (diff > 2) {
107             Mt19937 gen;
108             gen.seed(unpredictableSeed);
109             running = gen.dice(0.5, 0.5) == 0;
            logger.trace(!running,
                "multiple instances of the distssh daemon are running. Terminating this instance.");
112         }
113         return 1.dur!"minutes";
114     }, 1.dur!"minutes");
115 
116     makeInterval(timers, () @trusted {
117         clientBeat = db.getClientBeat;
118         logger.tracef("client beat: %s timeout: %s", clientBeat, conf.timeout);
119         // no client is interested in the metric so stop collecting
120         if (clientBeat > conf.timeout) {
121             running = false;
122         }
123         if (Clock.currTime > forceShutdown) {
124             running = false;
125         }
126         return max(1.dur!"minutes", conf.timeout - clientBeat);
127     }, 10.dur!"seconds");
128 
129     makeInterval(timers, () @safe {
130         // the database may have been removed/recreated
131         if (getInode(fconf.global.dbPath) != origNode) {
132             running = false;
133         }
134         return 5.dur!"seconds";
135     }, 5.dur!"seconds");
136 
137     makeInterval(timers, () @trusted {
138         db.daemonBeat;
139         lastDaemonBeat = db.getDaemonBeatClock;
140         return 15.dur!"seconds";
141     }, 15.dur!"seconds");
142 
    // the times are arbitrarily chosen.
    // assumption: the oldest statistic does not have to be updated that often
    // because the other loop, updating the best candidate, is running "fast".
    // assumption: if a user uses distssh less often than every five minutes
    // it means that a long-running process is in use and the user won't
    // interact with distssh for a while.
149     makeInterval(timers, () @trusted {
150         auto host = db.getOldestServer;
151         if (!host.isNull) {
152             updateServer(db, host.get, fconf.global.timeout);
153         }
154 
155         if (clientBeat < 30.dur!"seconds")
156             return 10.dur!"seconds";
157         if (clientBeat < 5.dur!"minutes")
158             return 30.dur!"seconds";
159         return 60.dur!"seconds";
160     }, 15.dur!"seconds");
161 
    // the times are arbitrarily chosen.
    // assumption: the least loaded server will be getting jobs put on it not
    // only from *this* instance of distssh but also from all other instances
    // using the cluster. For this instance to be quick at moving jobs to
    // another host it has to update the statistics often.
    // assumption: a user that hasn't used distssh within 90s isn't using
    // distssh interactively/in quick succession. Backing off/slowing down
    // the update lowers the network load.
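    // rotating index so that each tick refreshes the next one of the (at
    // most topCandidades) least loaded hosts.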
170     long updateLeastLoadedTimerTick;
171     makeInterval(timers, () @trusted {
172         auto s = db.getLeastLoadedServer;
173         if (s.length > 0 && s.length < topCandidades) {
174             updateServer(db, s[updateLeastLoadedTimerTick % s.length], fconf.global.timeout);
175         } else if (s.length >= topCandidades) {
176             updateServer(db, s[updateLeastLoadedTimerTick], fconf.global.timeout);
177         }
178 
        updateLeastLoadedTimerTick = (updateLeastLoadedTimerTick + 1) % topCandidades;
180 
181         if (clientBeat < 30.dur!"seconds")
182             return updateLeastLoadTimersInterval[0];
183         if (clientBeat < 90.dur!"seconds")
184             return updateLeastLoadTimersInterval[1];
185         return updateLeastLoadTimersInterval[2];
186     }, 10.dur!"seconds");
187 
    makeInterval(timers, () @trusted nothrow {
189         try {
190             db.removeUnusedServers(30.dur!"minutes");
191         } catch (Exception e) {
192             logger.warning(e.msg).collectException;
193         }
194         return 1.dur!"minutes";
195     }, 1.dur!"minutes");
196 
    makeInterval(timers, () @trusted nothrow {
198         import std.range : take;
199         import distssh.connection;
200 
201         try {
202             // keep the multiplex connections open to the top candidates
203             foreach (h; db.getLeastLoadedServer.take(topCandidades)) {
204                 auto m = makeMaster(h);
205                 if (!m.isAlive) {
206                     m.connect;
207                 }
208             }
209         } catch (Exception e) {
210             logger.trace(e.msg).collectException;
211         }
212 
213         // open connections fast to the cluster while the client is using them
214         if (clientBeat < 5.dur!"minutes")
215             return 15.dur!"seconds";
216         return 1.dur!"minutes";
217     }, 5.dur!"seconds");
218 
219     if (globalEnvPurge in environment && globalEnvPurgeWhiteList in environment) {
220         import distssh.purge : readPurgeEnvWhiteList;
221 
222         Config.Purge pconf;
223         pconf.kill = true;
224         pconf.userFilter = true;
225         auto econf = ExecuteOnHostConf(fconf.global.workDir, typeof(fconf.global.command)
226                 .init, typeof(fconf.global.importEnv).init,
227                 typeof(fconf.global.cloneEnv)(false), typeof(fconf.global.noImportEnv)(true));
228         Set!Host clearedServers;
229 
230         logger.tracef("Server purge whitelist from %s is %s",
231                 globalEnvPurgeWhiteList, readPurgeEnvWhiteList);
232 
        makeInterval(timers, () @safe nothrow {
234             try {
235                 purgeServer(db, econf, pconf, clearedServers, fconf.global.timeout);
236             } catch (Exception e) {
237                 logger.warning(e.msg).collectException;
238             }
239             if (clientBeat < 2.dur!"minutes")
240                 return 1.dur!"minutes";
241             return 2.dur!"minutes";
242         }, 2.dur!"minutes");
243     } else {
244         logger.tracef("Automatic purge not running because both %s and %s must be set",
245                 globalEnvPurge, globalEnvPurgeWhiteList);
246     }
247 
248     while (running && !timers.empty) {
249         try {
250             timers.tick(100.dur!"msecs");
251         } catch (SpinSqlTimeout e) {
            // the database was removed or something else "bad" happened that
            // makes database accesses throw exceptions.
254             return 1;
255         }
256     }
257 
258     return 0;
259 }
260 
/** Start the daemon either as a persistent background process or as a
 * oneshot update.
263  *
 * Returns: true if the daemon was started.
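 *
 * Example (a sketch):
 * ---
 * auto db = openDatabase(fconf.global.dbPath);
 * if (!startDaemon(db, Yes.background))
 *     logger.trace("daemon already running or failed to spawn");
 * ---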
265  */
266 bool startDaemon(ref from.miniorm.Miniorm db, Flag!"background" bg) nothrow {
267     import std.file : thisExePath;
268     import my.process : spawnDaemon;
269 
270     try {
271         if (bg && db.getDaemonBeat < heartBeatDaemonTimeout) {
272             return false;
273         }
274 
275         const flags = () {
276             if (bg)
277                 return ["--background"];
278             return null;
279         }();
280 
281         spawnDaemon([thisExePath, "daemon"] ~ flags);
282         logger.trace("daemon spawned");
283         return true;
284     } catch (Exception e) {
285         logger.error(e.msg).collectException;
286     }
287 
288     return false;
289 }
290 
291 private:
292 
293 immutable heartBeatDaemonTimeout = 60.dur!"seconds";
294 
295 void initMetrics(ref from.miniorm.Miniorm db, const(Host)[] cluster, Duration timeout) nothrow {
296     import std.parallelism : TaskPool;
297     import std.random : randomCover;
298     import std.typecons : tuple;
299 
300     static auto loadHost(T)(T host_timeout) nothrow {
301         import std.concurrency : thisTid;
302 
303         logger.trace("load testing thread id: ", thisTid).collectException;
304         return HostLoad(host_timeout[0], getLoad(host_timeout[0], host_timeout[1]));
305     }
306 
307     try {
308         auto pool = new TaskPool();
309         scope (exit)
310             pool.stop;
311 
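        // probe all hosts in parallel; the random order presumably spreads
        // the probing when multiple instances start at the same time.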
312         foreach (v; pool.amap!(loadHost)(cluster.randomCover.map!(a => tuple(a, timeout)).array)) {
313             db.newServer(v);
314         }
315     } catch (Exception e) {
316         logger.trace(e.msg).collectException;
317     }
318 }
319 
320 void updateServer(ref from.miniorm.Miniorm db, Host host, Duration timeout) {
321     auto load = getLoad(host, timeout);
322     distssh.database.updateServer(db, HostLoad(host, load));
323     logger.tracef("Update %s with %s", host, load).collectException;
324 }
325 
/// Round robin purging of the servers. Each call purges at most one server
/// not yet in `clearedServers`; once every server has been purged the set is
/// reset and the round starts over.
327 void purgeServer(ref from.miniorm.Miniorm db, ExecuteOnHostConf econf,
328         const Config.Purge pconf, ref Set!Host clearedServers, const Duration timeout) @safe {
329     import std.algorithm : joiner;
330     import std.random : randomCover;
331     import std.range : only;
332     import distssh.purge;
333 
334     auto servers = distssh.database.getServerLoads(db, clearedServers.toArray,
335             timeout, 10.dur!"minutes");
336 
337     logger.trace("Round robin server purge list ", clearedServers.toArray);
338 
339     bool clearedAServer;
340     foreach (a; only(servers.online, servers.unused).joiner
341             .array
342             .randomCover
343             .map!(a => a.host)
344             .filter!(a => !clearedServers.contains(a))) {
345         logger.trace("Purge server ", a);
346         clearedAServer = true;
347         distssh.purge.purgeServer(econf, pconf, a);
348         clearedServers.add(a);
349         break;
350     }
351 
352     if (!clearedAServer) {
353         logger.trace("Reset server purge list ");
354         clearedServers = Set!Host.init;
355     }
356 }
357 
358 struct Inode {
359     ulong dev;
360     ulong ino;
361 
362     bool opEquals()(auto ref const typeof(this) s) const {
363         return dev == s.dev && ino == s.ino;
364     }
365 }
366 
367 Inode getInode(const Path p) @trusted nothrow {
368     import core.sys.posix.sys.stat : stat_t, stat;
369     import std.file : isSymlink, exists;
    import std.string : toStringz;
371 
372     const pz = p.toString.toStringz;
373 
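    // a missing file maps to the sentinel Inode(0, 0), which differs from
    // the inode of any existing database and thus trips the recreated-check
    // in the daemon's main loop.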
374     if (!exists(p.toString)) {
375         return Inode(0, 0);
376     } else {
377         stat_t st = void;
        // deliberately use stat and NOT lstat: if the path is a symlink we
        // want the inode of the target it resolves to.
380         stat(pz, &st);
381         return Inode(st.st_dev, st.st_ino);
382     }
383 }