/**
Copyright: Copyright (c) 2019, Joakim Brännström. All rights reserved.
License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
Author: Joakim Brännström (joakim.brannstrom@gmx.com)

Updates the distssh server cache.

# Design

The overall design is based around a shared, local database that is used by
both the daemon and clients to exchange statistics about the cluster. By
using SQLite as the database it is safe for multiple processes to read from
and write to it concurrently.

The heartbeats are used by both the client and daemon:

 * The client spawns a daemon if the daemon heartbeat has stopped being
   updated.
 * The daemon slows down the updates of the cluster statistics if no client
   has used it in a while, until it finally terminates itself.
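
A minimal sketch of how the client side keeps the daemon alive (illustrative
only; the actual call sites live in the client modules and `conf` stands in
for the parsed configuration):

---
auto db = openDatabase(conf.global.dbPath);
// registers a client heartbeat and, if the daemon heartbeat has become too
// old, spawns a new daemon process.
startDaemon(db);
---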
*/
module distssh.daemon;

import core.thread : Thread;
import core.time : dur;
import logger = std.experimental.logger;
import std.algorithm : map;
import std.array : array;
import std.datetime;
import std.exception : collectException;
import std.typecons : tuple;

import colorlog;
import miniorm : SpinSqlTimeout;

import from_;

import distssh.database;
import distssh.types;
import distssh.config;
import distssh.metric;
import distssh.timer;

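/** Entry point for the `daemon` subcommand.

Returns immediately if another daemon already has a fresh heartbeat. Otherwise
the daemon keeps updating the cluster statistics until no client has used
distssh for a long time or the database file has been replaced, at which point
it terminates.
*/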
int cli(const Config fconf, Config.Daemon conf) {
    auto db = openDatabase(fconf.global.dbPath);
    const origNode = getInode(fconf.global.dbPath);

    {
        const beat = db.getDaemonBeat;
        logger.trace("daemon beat: ", beat);
        // do not spawn if a daemon is already running.
        if (beat < heartBeatDaemonTimeout)
            return 0;
    }

    db.daemonBeat;
    initMetrics(db, fconf.global.cluster, fconf.global.timeout);

    bool running = true;
    auto clientBeat = db.getClientBeat;

    auto timers = makeTimers;

    makeInterval(timers, () @trusted {
        clientBeat = db.getClientBeat;
        logger.trace("client beat: ", clientBeat);
        // no client is interested in the metrics so stop collecting them
        if (clientBeat > heartBeatClientTimeout)
            running = false;
        return running;
    }, 5.dur!"seconds");

    makeInterval(timers, () @safe {
        // the database may have been removed/recreated
        if (getInode(fconf.global.dbPath) != origNode)
            running = false;
        return running;
    }, 5.dur!"seconds");

    makeInterval(timers, () @trusted { db.daemonBeat; return running; }, 15.dur!"seconds");

    void updateOldestTimer(ref Timers ts) @trusted {
        auto host = db.getOldestServer;
        if (!host.isNull)
            updateServer(db, host.get, fconf.global.timeout);

        // the times are arbitrarily chosen.
        // assumption. The oldest statistic does not have to be updated that
        // often because the other loop, updating the best candidate, is
        // running "fast".
        // assumption. If a user uses distssh less often than every five
        // minutes it means that a long running process is being used and the
        // user won't interact with distssh for a while.
        auto next = () {
            if (clientBeat < 5.dur!"minutes")
                return 30.dur!"seconds";
            else
                return 60.dur!"seconds";
        }();

        ts.put(&updateOldestTimer, next);
    }

    timers.put(&updateOldestTimer, Duration.zero);

    void updateLeastLoadedTimer(ref Timers ts) @trusted {
        auto host = db.getLeastLoadedServer;
        if (!host.isNull)
            updateServer(db, host.get, fconf.global.timeout);

        // the times are arbitrarily chosen.
        // assumption. The least loaded server will be getting jobs put on it
        // not only from *this* instance of distssh but also from all other
        // instances using the cluster. For this instance to be quick at moving
        // jobs to another host it has to update the statistics often.
        // assumption. A user that hasn't used distssh within the last 90
        // seconds isn't using it interactively/in quick succession. By backing
        // off/slowing down the updates the network load is lowered.
        auto next = () {
            if (clientBeat < 30.dur!"seconds")
                return 10.dur!"seconds";
            else if (clientBeat < 90.dur!"seconds")
                return 20.dur!"seconds";
            else
                return 60.dur!"seconds";
        }();

        ts.put(&updateLeastLoadedTimer, next);
    }

    timers.put(&updateLeastLoadedTimer, Duration.zero);

    while (running && !timers.empty) {
        try {
            timers.tick(100.dur!"msecs");
        } catch (SpinSqlTimeout e) {
            // the database has been removed or something else "bad" has
            // happened such that database access has started throwing
            // exceptions.
            return 1;
        }
    }

    return 0;
}

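/** Spawn a daemon if the previous one appears to have died.

Called from the client side. It also registers a client heartbeat so that a
freshly spawned daemon does not immediately decide to terminate itself.
*/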
void startDaemon(ref from.miniorm.Miniorm db) nothrow {
    import distssh.process : spawnDaemon;
    import std.file : thisExePath;

    try {
        db.clientBeat;
        if (db.getDaemonBeat > heartBeatDaemonTimeout) {
            db.purgeServers; // assuming the data is old
            spawnDaemon([thisExePath, "daemon"]);
            logger.trace("daemon spawned");
        }
    } catch (Exception e) {
        logger.error(e.msg).collectException;
    }
}

private:

immutable heartBeatDaemonTimeout = 60.dur!"seconds";
immutable heartBeatClientTimeout = 30.dur!"minutes";
immutable updateOldestInterval = 30.dur!"seconds";

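/** Collect an initial load measurement for all hosts in the cluster.

The hosts are probed in parallel via a `TaskPool` and each result is stored in
the database.
*/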
void initMetrics(ref from.miniorm.Miniorm db, const(Host)[] cluster, Duration timeout) nothrow {
    import std.parallelism : TaskPool;

    static auto loadHost(T)(T host_timeout) nothrow {
        import std.concurrency : thisTid;

        logger.trace("load testing thread id: ", thisTid).collectException;
        return HostLoad(host_timeout[0], getLoad(host_timeout[0], host_timeout[1]));
    }

    try {
        auto shosts = cluster.map!(a => tuple(a, timeout)).array;

        auto pool = new TaskPool();
        scope (exit)
            pool.stop;

        foreach (v; pool.amap!(loadHost)(shosts)) {
            db.newServer(v);
        }
    } catch (Exception e) {
        logger.trace(e.msg).collectException;
    }
}

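/// Measure the load of `host` and persist the result in the database.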
void updateServer(ref from.miniorm.Miniorm db, Host host, Duration timeout) {
    auto load = getLoad(host, timeout);
    distssh.database.updateServer(db, HostLoad(host, load));
    logger.tracef("Update %s with %s", host, load).collectException;
}

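/// Identity of a file on disk, used to detect if the database file has been replaced.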
struct Inode {
    ulong dev;
    ulong ino;

    bool opEquals()(auto ref const typeof(this) s) const {
        return dev == s.dev && ino == s.ino;
    }
}

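/** Lookup the device/inode pair of the file at `p`.

Returns: `Inode(0, 0)` if the file does not exist.
*/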
Inode getInode(const string p) @trusted nothrow {
    import core.sys.posix.sys.stat : stat_t, stat;
    import std.file : isSymlink, exists;
    import std.string : toStringz;

    const pz = p.toStringz;

    if (!exists(p)) {
        return Inode(0, 0);
    } else {
        stat_t st = void;
        // deliberately using stat and NOT lstat because what matters is the
        // identity of the file that a (possible) symlink points to, e.g. when
        // the symlink has been redirected to a new database.
        stat(pz, &st);
        return Inode(st.st_dev, st.st_ino);
    }
}