1 /**
2 Copyright: Copyright (c) 2019, Joakim Brännström. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 */
6 module distssh.database;
7 
8 import logger = std.experimental.logger;
9 import std.algorithm : map, filter;
10 import std.array : array, empty;
11 import std.datetime;
12 import std.exception : collectException, ifThrown;
13 import std.meta : AliasSeq;
14 import std.typecons : Nullable, Tuple;
15 
16 import miniorm;
17 
18 import distssh.types;
19 
20 immutable timeout = 30.dur!"seconds";
21 enum SchemaVersion = 1;
22 
23 struct VersionTbl {
24     @ColumnName("version")
25     ulong version_;
26 }
27 
28 @TablePrimaryKey("address")
29 struct ServerTbl {
30     string address;
31     SysTime lastUpdate;
32     long accessTime;
33     double loadAvg;
34     bool unknown;
35 }
36 
37 /// The daemon beats ones per minute.
38 struct DaemonBeat {
39     ulong id;
40     SysTime beat;
41 }
42 
43 /// Clients beat each time they access the database.
44 struct ClientBeat {
45     ulong id;
46     SysTime beat;
47 }
48 
49 Miniorm openDatabase(string dbFile) nothrow {
50     while (true) {
51         try {
52             auto db = Miniorm(dbFile);
53             const schemaVersion = () {
54                 foreach (a; db.run(select!VersionTbl))
55                     return a;
56                 return VersionTbl(0);
57             }().ifThrown(VersionTbl(0));
58 
59             alias Schema = AliasSeq!(VersionTbl, ServerTbl, DaemonBeat, ClientBeat);
60 
61             if (schemaVersion.version_ < SchemaVersion) {
62                 db.begin;
63                 static foreach (tbl; Schema)
64                     db.run("DROP TABLE " ~ tbl.stringof).collectException;
65                 db.run(buildSchema!Schema);
66                 db.run(insert!VersionTbl, VersionTbl(SchemaVersion));
67                 db.commit;
68             }
69             return db;
70         } catch (Exception e) {
71             logger.tracef("Trying to open/create database %s: %s", dbFile, e.msg).collectException;
72         }
73 
74         rndSleep(25.dur!"msecs", 50);
75     }
76 }
77 
78 /** Get all servers.
79  *
80  * Waiting for up to 10s for servers to be added. This handles the case where a
81  * daemon have been spawned in the background.
82  */
83 Tuple!(HostLoad[], "online", Host[], "unused") getServerLoads(ref Miniorm db, const Host[] filterBy_) nothrow {
84     import std.datetime : Clock, dur;
85     import distssh.set;
86 
87     auto getData() {
88         return db.run(select!ServerTbl).map!(a => HostLoad(Host(a.address),
89                 Load(a.loadAvg, a.accessTime.dur!"msecs", a.unknown))).array;
90     }
91 
92     auto filterBy = toSet(filterBy_.map!(a => a.payload));
93 
94     try {
95         auto stopAt = Clock.currTime + timeout;
96         while (Clock.currTime < stopAt) {
97             typeof(return) rval;
98             foreach (h; spinSql!(getData, logger.trace)(timeout)) {
99                 if (filterBy.contains(h[0].payload))
100                     rval.online ~= h;
101                 else
102                     rval.unused ~= h[0];
103             }
104 
105             if (!rval.online.empty)
106                 return rval;
107         }
108     } catch (Exception e) {
109         logger.warning("Failed reading from the database: ", e.msg).collectException;
110     }
111 
112     return typeof(return).init;
113 }
114 
115 /** Sync the hosts in the database with those that the client expect to exist.
116  *
117  * The client may from one invocation to another change the cluster. Those in
118  * the database should in that case be updated.
119  */
120 void syncCluster(ref Miniorm db, const Host[] cluster) {
121     immutable highAccessTime = 1.dur!"minutes"
122         .total!"msecs";
123     immutable highLoadAvg = 9999.0;
124     immutable forceEarlyUpdate = Clock.currTime - 1.dur!"hours";
125 
126     auto stmt = spinSql!(() {
127         return db.prepare(`INSERT OR IGNORE INTO ServerTbl (address,lastUpdate,accessTime,loadAvg,unknown) VALUES(:address, :lastUpdate, :accessTime, :loadAvg, :unknown)`);
128     }, logger.trace)(timeout);
129 
130     foreach (const h; cluster) {
131         spinSql!(() {
132             stmt.reset;
133             stmt.bind(":address", h.payload);
134             stmt.bind(":lastUpdate", forceEarlyUpdate.toSqliteDateTime);
135             stmt.bind(":accessTime", highAccessTime);
136             stmt.bind(":loadAvg", highLoadAvg);
137             stmt.bind(":unknown", true);
138             stmt.execute;
139         }, logger.trace)(timeout);
140     }
141 }
142 
143 /// Update the data for a server.
144 void newServer(ref Miniorm db, HostLoad a) {
145     spinSql!(() {
146         db.run(insertOrReplace!ServerTbl, ServerTbl(a[0].payload,
147             Clock.currTime, a[1].accessTime.total!"msecs", a[1].loadAvg, a[1].unknown));
148     }, logger.trace)(timeout, 100.dur!"msecs", 300.dur!"msecs");
149 }
150 
151 /// Update the data for a server.
152 void updateServer(ref Miniorm db, HostLoad a) {
153     spinSql!(() {
154         // using IGNORE because the host could have been removed.
155         auto stmt = db.prepare(`UPDATE OR IGNORE ServerTbl SET lastUpdate = :lastUpdate, accessTime = :accessTime, loadAvg = :loadAvg, unknown = :unknown WHERE address = :address`);
156         stmt.bind(":address", a[0].payload);
157         stmt.bind(":lastUpdate", Clock.currTime.toSqliteDateTime);
158         stmt.bind(":accessTime", a[1].accessTime.total!"msecs");
159         stmt.bind(":loadAvg", a[1].loadAvg);
160         stmt.bind(":unknown", a[1].unknown);
161         stmt.execute;
162     })(timeout, 100.dur!"msecs", 300.dur!"msecs");
163 }
164 
165 void removeUnusedServers(ref Miniorm db, Host[] hosts) {
166     if (hosts.empty)
167         return;
168 
169     auto stmt = spinSql!(() {
170         return db.prepare(`DELETE FROM ServerTbl WHERE address = :address`);
171     }, logger.trace)(timeout);
172 
173     foreach (h; hosts) {
174         spinSql!(() {
175             stmt.reset;
176             stmt.bind(":address", h.payload);
177             stmt.execute;
178         }, logger.trace)(timeout);
179     }
180 }
181 
182 void daemonBeat(ref Miniorm db) {
183     spinSql!(() {
184         db.run(insertOrReplace!DaemonBeat, DaemonBeat(0, Clock.currTime));
185     }, logger.trace)(timeout);
186 }
187 
188 /// The heartbeat when daemon was last executed.
189 Duration getDaemonBeat(ref Miniorm db) {
190     return spinSql!(() {
191         foreach (a; db.run(select!DaemonBeat.where("id =", 0)))
192             return Clock.currTime - a.beat;
193         return Duration.max;
194     }, logger.trace)(timeout);
195 }
196 
197 void clientBeat(ref Miniorm db) {
198     spinSql!(() {
199         db.run(insertOrReplace!ClientBeat, ClientBeat(0, Clock.currTime));
200     }, logger.trace)(timeout);
201 }
202 
203 Duration getClientBeat(ref Miniorm db) {
204     return spinSql!(() {
205         foreach (a; db.run(select!ClientBeat.where("id =", 0)))
206             return Clock.currTime - a.beat;
207         return Duration.max;
208     }, logger.trace)(timeout);
209 }
210 
211 /// Returns: the server that have the oldest update timestamp.
212 Nullable!Host getOldestServer(ref Miniorm db) {
213     auto stmt = spinSql!(() {
214         return db.prepare(
215             `SELECT address FROM ServerTbl ORDER BY datetime(lastUpdate) ASC LIMIT 1`);
216     }, logger.trace)(timeout);
217 
218     return spinSql!(() {
219         foreach (a; stmt.execute) {
220             auto address = a.peek!string(0);
221             return Nullable!Host(Host(address));
222         }
223         return Nullable!Host.init;
224     }, logger.trace)(timeout);
225 }
226 
227 Nullable!Host getLeastLoadedServer(ref Miniorm db) {
228     auto stmt = spinSql!(() {
229         return db.prepare(`SELECT address FROM ServerTbl ORDER BY loadAvg ASC LIMIT 1`);
230     }, logger.trace)(timeout);
231 
232     return spinSql!(() {
233         foreach (a; stmt.execute) {
234             auto address = a.peek!string(0);
235             return Nullable!Host(Host(address));
236         }
237         return Nullable!Host.init;
238     }, logger.trace)(timeout);
239 }
240 
241 void purgeServers(ref Miniorm db) {
242     spinSql!(() { db.run("DELETE FROM ServerTbl"); })(timeout);
243 }