1 /**
2 Copyright: Copyright (c) 2019, Joakim Brännström. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 */
6 module distssh.database;
7 
8 import logger = std.experimental.logger;
9 import std.algorithm : map, filter;
10 import std.array : array, empty;
11 import std.datetime;
12 import std.exception : collectException, ifThrown;
13 import std.meta : AliasSeq;
14 import std.typecons : Nullable, Tuple;
15 
16 import miniorm;
17 
18 import distssh.types;
19 
20 version (unittest) {
21     import unit_threaded.assertions;
22 }
23 
24 immutable timeout = 30.dur!"seconds";
25 enum SchemaVersion = 2;
26 
27 struct VersionTbl {
28     @ColumnName("version")
29     ulong version_;
30 }
31 
32 @TablePrimaryKey("address")
33 struct ServerTbl {
34     string address;
35     SysTime lastUpdate;
36     long accessTime;
37     double loadAvg;
38     bool unknown;
39 
40     // Last time used the entry in unix time. Assuming that it is always
41     // running locally.
42     long lastUse;
43 }
44 
45 /// The daemon beats ones per minute.
46 struct DaemonBeat {
47     ulong id;
48     SysTime beat;
49 }
50 
51 /// Clients beat each time they access the database.
52 struct ClientBeat {
53     ulong id;
54     SysTime beat;
55 }
56 
57 Miniorm openDatabase(Path dbFile) nothrow {
58     return openDatabase(dbFile.toString);
59 }
60 
61 Miniorm openDatabase(string dbFile) nothrow {
62     logger.trace("opening database ", dbFile).collectException;
63     while (true) {
64         try {
65             auto db = Miniorm(dbFile);
66             const schemaVersion = () {
67                 foreach (a; db.run(select!VersionTbl))
68                     return a;
69                 return VersionTbl(0);
70             }().ifThrown(VersionTbl(0));
71 
72             alias Schema = AliasSeq!(VersionTbl, ServerTbl, DaemonBeat, ClientBeat);
73 
74             if (schemaVersion.version_ < SchemaVersion) {
75                 db.begin;
76                 static foreach (tbl; Schema)
77                     db.run("DROP TABLE " ~ tbl.stringof).collectException;
78                 db.run(buildSchema!Schema);
79                 db.run(insert!VersionTbl, VersionTbl(SchemaVersion));
80                 db.commit;
81             }
82             return db;
83         } catch (Exception e) {
84             logger.tracef("Trying to open/create database %s: %s", dbFile, e.msg).collectException;
85         }
86 
87         rndSleep(25.dur!"msecs", 50);
88     }
89 }
90 
91 /** Get all servers.
92  *
93  * Waiting up to `timeout` for servers to be added. This handles the case where
94  * a daemon have been spawned in the background.
95  *
96  * Params:
97  *  db = database instance to read from
98  *  filterBy_ = only hosts that are among these are added to the online set
99  *  timeout = max time to wait for the `online` set to contain at least one host
100  *  maxAge = only hosts that have a status newer than this is added to online
101  */
102 Tuple!(HostLoad[], "online", HostLoad[], "unused") getServerLoads(ref Miniorm db,
103         const Host[] filterBy_, const Duration timeout, const Duration maxAge) @trusted {
104     import std.datetime : Clock, dur;
105     import my.set;
106 
107     const lastUseLimit = Clock.currTime - maxAge;
108     auto onlineHostSet = toSet(filterBy_.map!(a => a.payload));
109     bool filterBy(ServerTbl host) {
110         return host.address in onlineHostSet && host.lastUpdate > lastUseLimit;
111     }
112 
113     auto stopAt = Clock.currTime + timeout;
114     while (Clock.currTime < stopAt) {
115         typeof(return) rval;
116         foreach (a; spinSql!(() => db.run(select!ServerTbl), logger.trace)(timeout)) {
117             auto h = HostLoad(Host(a.address), Load(a.loadAvg,
118                     a.accessTime.dur!"msecs", a.unknown), a.lastUpdate);
119             if (!a.unknown && filterBy(a)) {
120                 rval.online ~= h;
121             } else {
122                 rval.unused ~= h;
123             }
124         }
125 
126         if (!rval.online.empty || filterBy_.empty)
127             return rval;
128     }
129 
130     return typeof(return).init;
131 }
132 
133 /** Sync the hosts in the database with those that the client expect to exist.
134  *
135  * The client may from one invocation to another change the cluster. Those in
136  * the database should in that case be updated.
137  */
138 void syncCluster(ref Miniorm db, const Host[] cluster) {
139     immutable highAccessTime = 1.dur!"minutes"
140         .total!"msecs";
141     immutable highLoadAvg = 9999.0;
142     immutable forceEarlyUpdate = Clock.currTime - 1.dur!"hours";
143 
144     auto stmt = spinSql!(() {
145         return db.prepare(`INSERT OR IGNORE INTO ServerTbl (address,lastUpdate,accessTime,loadAvg,unknown,lastUse) VALUES(:address, :lastUpdate, :accessTime, :loadAvg, :unknown, :lastUse)`);
146     }, logger.trace)(timeout);
147 
148     foreach (const h; cluster) {
149         spinSql!(() {
150             stmt.get.reset;
151             stmt.get.bind(":address", h.payload);
152             stmt.get.bind(":lastUpdate", forceEarlyUpdate.toSqliteDateTime);
153             stmt.get.bind(":accessTime", highAccessTime);
154             stmt.get.bind(":loadAvg", highLoadAvg);
155             stmt.get.bind(":unknown", true);
156             stmt.get.bind(":lastUse", Clock.currTime.toUnixTime);
157             stmt.get.execute;
158         }, logger.trace)(timeout);
159     }
160 }
161 
162 void updateLastUse(ref Miniorm db, const Host[] cluster) {
163     auto stmt = spinSql!(() {
164         return db.prepare(
165             `UPDATE OR IGNORE ServerTbl SET lastUse = :lastUse WHERE address = :address`);
166     }, logger.trace)(timeout);
167 
168     const lastUse = Clock.currTime.toUnixTime;
169 
170     foreach (const h; cluster) {
171         spinSql!(() {
172             stmt.get.reset;
173             stmt.get.bind(":address", h.payload);
174             stmt.get.bind(":lastUse", lastUse);
175             stmt.get.execute;
176         }, logger.trace)(timeout);
177     }
178 }
179 
180 /// Update the data for a server.
181 void newServer(ref Miniorm db, HostLoad a) {
182     spinSql!(() {
183         db.run(insertOrReplace!ServerTbl, ServerTbl(a.host.payload,
184             Clock.currTime, a.load.accessTime.total!"msecs", a.load.loadAvg,
185             a.load.unknown, Clock.currTime.toUnixTime));
186     }, logger.trace)(timeout, 100.dur!"msecs", 300.dur!"msecs");
187 }
188 
189 /// Update the data for a server.
190 void updateServer(ref Miniorm db, HostLoad a, SysTime updateTime = Clock.currTime) {
191     spinSql!(() {
192         // using IGNORE because the host could have been removed.
193         auto stmt = db.prepare(`UPDATE OR IGNORE ServerTbl SET lastUpdate = :lastUpdate, accessTime = :accessTime, loadAvg = :loadAvg, unknown = :unknown WHERE address = :address`);
194         stmt.get.bind(":address", a.host.payload);
195         stmt.get.bind(":lastUpdate", updateTime.toSqliteDateTime);
196         stmt.get.bind(":accessTime", a.load.accessTime.total!"msecs");
197         stmt.get.bind(":loadAvg", a.load.loadAvg);
198         stmt.get.bind(":unknown", a.load.unknown);
199         stmt.get.execute;
200     }, logger.trace)(timeout, 100.dur!"msecs", 300.dur!"msecs");
201 }
202 
203 /// Those that haven't been used for `unused` seconds.
204 void removeUnusedServers(ref Miniorm db, Duration unused) {
205     spinSql!(() {
206         auto stmt = db.prepare(`DELETE FROM ServerTbl WHERE lastUse < :lastUse`);
207         stmt.get.bind(":lastUse", Clock.currTime.toUnixTime - unused.total!"seconds");
208         stmt.get.execute();
209     }, logger.trace)(timeout);
210 }
211 
212 void daemonBeat(ref Miniorm db) {
213     spinSql!(() {
214         db.run(insertOrReplace!DaemonBeat, DaemonBeat(0, Clock.currTime));
215     }, logger.trace)(timeout);
216 }
217 
218 /// The heartbeat when daemon was last executed.
219 Duration getDaemonBeat(ref Miniorm db) {
220     auto d = spinSql!(() {
221         foreach (a; db.run(select!DaemonBeat.where("id = 0", null)))
222             return Clock.currTime - a.beat;
223         return Duration.max;
224     }, logger.trace)(timeout);
225 
226     // can happen if there is a "junk" value but it has to be a little bit
227     // robust against possible "jitter" thus accepting up to 1 minute "lag".
228     if (d < (-1).dur!"minutes") {
229         d = Duration.max;
230     } else if (d < Duration.zero) {
231         d = Duration.zero;
232     }
233     return d;
234 }
235 
236 void clientBeat(ref Miniorm db) {
237     spinSql!(() {
238         db.run(insertOrReplace!ClientBeat, ClientBeat(0, Clock.currTime));
239     }, logger.trace)(timeout);
240 }
241 
242 Duration getClientBeat(ref Miniorm db) {
243     auto d = spinSql!(() {
244         foreach (a; db.run(select!ClientBeat.where("id = 0", null)))
245             return Clock.currTime - a.beat;
246         return Duration.max;
247     }, logger.trace)(timeout);
248 
249     // can happen if there is a "junk" value but it has to be a little bit
250     // robust against possible "jitter" thus accepting up to 1 minute "lag".
251     if (d < (-1).dur!"minutes") {
252         d = Duration.max;
253     } else if (d < Duration.zero) {
254         d = Duration.zero;
255     }
256     return d;
257 }
258 
259 /// Returns: the server that have the oldest update timestamp.
260 Nullable!Host getOldestServer(ref Miniorm db) {
261     auto stmt = spinSql!(() {
262         return db.prepare(
263             `SELECT address FROM ServerTbl ORDER BY datetime(lastUpdate) ASC LIMIT 1`);
264     }, logger.trace)(timeout);
265 
266     return spinSql!(() {
267         foreach (a; stmt.get.execute) {
268             auto address = a.peek!string(0);
269             return Nullable!Host(Host(address));
270         }
271         return Nullable!Host.init;
272     }, logger.trace)(timeout);
273 }
274 
275 Host[] getLeastLoadedServer(ref Miniorm db) {
276     import std.format : format;
277 
278     auto stmt = spinSql!(() {
279         return db.prepare(
280             format!`SELECT address,lastUse,loadAvg FROM ServerTbl ORDER BY lastUse DESC, loadAvg ASC LIMIT %s`(
281             topCandidades));
282     }, logger.trace)(timeout);
283 
284     return spinSql!(() {
285         return stmt.get.execute.map!(a => Host(a.peek!string(0))).array;
286     }, logger.trace)(timeout);
287 }
288 
289 void purgeServers(ref Miniorm db) {
290     spinSql!(() { db.run("DELETE FROM ServerTbl"); })(timeout);
291 }
292 
293 /** Sleep for a random time that is min_ + rnd(0, span).
294  *
295  * Params:
296  *  span = unit is msecs.
297  */
298 private void rndSleep(Duration min_, ulong span) nothrow @trusted {
299     import core.thread : Thread;
300     import core.time : dur;
301     import std.random : uniform;
302 
303     auto t_span = () {
304         try {
305             return uniform(0, span).dur!"msecs";
306         } catch (Exception e) {
307         }
308         return span.dur!"msecs";
309     }();
310 
311     Thread.sleep(min_ + t_span);
312 }
313 
314 version (unittest) {
315     private struct UnittestDb {
316         string name;
317         Miniorm db;
318         Host[] hosts;
319 
320         alias db this;
321 
322         ~this() {
323             import std.file : remove;
324 
325             db.close;
326             remove(name);
327         }
328     }
329 
330     private UnittestDb makeUnittestDb(string file = __FILE__, uint line = __LINE__)() {
331         import std.format : format;
332         import std.path : baseName;
333 
334         immutable dbFname = format!"%s_%s.sqlite3"(file.baseName, line);
335         return UnittestDb(dbFname, openDatabase(dbFname));
336     }
337 
338     private void populate(ref UnittestDb db, uint nr) {
339         import std.format : format;
340         import std.range;
341         import std.algorithm;
342 
343         db.hosts = iota(0, nr).map!(a => Host(format!"h%s"(a))).array;
344         syncCluster(db, db.hosts);
345     }
346 
347     private void fejkLoad(ref UnittestDb db, double start, double step) {
348         foreach (a; db.hosts) {
349             updateServer(db, HostLoad(a, Load(start, 50.dur!"msecs", false)));
350             start += step;
351         }
352     }
353 }
354 
355 @("shall filter out all servers with an unknown status")
356 unittest {
357     auto db = makeUnittestDb();
358     populate(db, 10);
359     auto res = getServerLoads(db, db.hosts[0 .. $ / 2], 1.dur!"seconds", 10.dur!"minutes");
360     res.online.length.shouldEqual(0);
361 }
362 
363 @("shall split the returned hosts by the host set when retrieving the load")
364 unittest {
365     auto db = makeUnittestDb();
366     populate(db, 10);
367     fejkLoad(db, 0.1, 0.3);
368 
369     auto res = getServerLoads(db, db.hosts[0 .. $ / 2], 1.dur!"seconds", 10.dur!"minutes");
370     res.online.length.shouldEqual(5);
371     res.online.map!(a => a.host.payload).array.shouldEqual([
372             "h0", "h1", "h2", "h3", "h4"
373             ]);
374     res.unused.length.shouldEqual(5);
375 }
376 
377 @("shall put hosts with a too old status update in the unused set when splitting")
378 unittest {
379     auto db = makeUnittestDb();
380     populate(db, 10);
381     fejkLoad(db, 0.1, 0.3);
382     updateServer(db, HostLoad(Host("h3"), Load(0.5, 50.dur!"msecs", false)),
383             Clock.currTime - 11.dur!"minutes");
384 
385     auto res = getServerLoads(db, db.hosts[0 .. $ / 2], 1.dur!"seconds", 10.dur!"minutes");
386     res.online.length.shouldEqual(4);
387     res.online.map!(a => a.host.payload).array.shouldEqual([
388             "h0", "h1", "h2", "h4"
389             ]);
390     res.unused.length.shouldEqual(6);
391 }