/**
Copyright: Copyright (c) 2019, Joakim Brännström. All rights reserved.
License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
Author: Joakim Brännström (joakim.brannstrom@gmx.com)
*/
module distssh.database;

import logger = std.experimental.logger;
import std.algorithm : map, filter;
import std.array : array, empty;
import std.datetime;
import std.exception : collectException, ifThrown;
import std.meta : AliasSeq;
import std.typecons : Nullable, Tuple;

import miniorm;

import distssh.types;

version (unittest) {
    import unit_threaded.assertions;
}

/// Timeout handed to `spinSql` by the database helpers in this module.
immutable timeout = dur!"seconds"(30);

/// Schema version of this module. `openDatabase` drops and rebuilds all
/// tables when the version stored in the database is lower than this.
enum SchemaVersion = 2;

/// Holds the schema version that the database was created with.
struct VersionTbl {
    @ColumnName("version") ulong version_;
}

/// One row per known server, keyed by its address.
@TablePrimaryKey("address") struct ServerTbl {
    /// Address of the host; primary key of the table.
    string address;

    /// When the load data of the host was last updated.
    SysTime lastUpdate;

    /// Access time of the host, stored in milliseconds.
    long accessTime;

    /// Load average reported for the host.
    double loadAvg;

    /// True until a status has been recorded for the host; such hosts are
    /// never reported as online by `getServerLoads`.
    bool unknown;

    /// Last time the entry was used, in unix time. Assuming that it is
    /// always running locally.
    long lastUse;
}

/// The daemon beats once per minute.
struct DaemonBeat {
    /// Row id; this module only writes and reads the row with id 0.
    ulong id;

    /// Time of the most recent beat.
    SysTime beat;
}

/// Clients beat each time they access the database.
struct ClientBeat {
    /// Row id; this module only writes and reads the row with id 0.
    ulong id;

    /// Time of the most recent beat.
    SysTime beat;
}

/// Open the database stored at `dbFile`. Forwards to the `string` overload.
Miniorm openDatabase(Path dbFile) nothrow {
    return dbFile.toString.openDatabase;
}

/** Open, and if needed create or upgrade, the database stored at `dbFile`.
 *
 * The function never gives up: any failure is logged at trace level and the
 * whole open sequence is retried after a short random sleep.
 *
 * When the stored schema version is lower than `SchemaVersion` all tables are
 * dropped and recreated, and the new version is recorded.
 *
 * Params:
 *  dbFile = path to the sqlite database file
 *
 * Returns: an open database with an up to date schema.
 */
Miniorm openDatabase(string dbFile) nothrow {
    alias Schema = AliasSeq!(VersionTbl, ServerTbl, DaemonBeat, ClientBeat);

    logger.trace("opening database ", dbFile).collectException;

    for (;;) {
        try {
            auto conn = Miniorm(dbFile);

            // First stored version row, or version 0 when the table is empty.
            VersionTbl storedVersion() {
                foreach (row; conn.run(select!VersionTbl))
                    return row;
                return VersionTbl(0);
            }

            // Any failure to read the version is treated the same as version 0.
            // NOTE(review): this presumably also covers transient read errors,
            // which would then trigger a schema reset below - confirm.
            const current = storedVersion().ifThrown(VersionTbl(0));
            const outdated = current.version_ < SchemaVersion;

            if (outdated) {
                conn.begin;
                // A table may not exist yet, thus a failed drop is ignored.
                static foreach (T; Schema)
                    conn.run("DROP TABLE " ~ T.stringof).collectException;
                conn.run(buildSchema!Schema);
                conn.run(insert!VersionTbl, VersionTbl(SchemaVersion));
                conn.commit;
            }

            return conn;
        } catch (Exception e) {
            logger.tracef("Trying to open/create database %s: %s", dbFile, e.msg).collectException;
        }

        // back off for 25-75 msecs before the next attempt.
        rndSleep(25.dur!"msecs", 50);
    }
}

/** Get all servers.
 *
 * Waiting up to `timeout` for servers to be added. This handles the case where
 * a daemon has been spawned in the background.
95 * 96 * Params: 97 * db = database instance to read from 98 * filterBy_ = only hosts that are among these are added to the online set 99 * timeout = max time to wait for the `online` set to contain at least one host 100 * maxAge = only hosts that have a status newer than this is added to online 101 */ 102 Tuple!(HostLoad[], "online", HostLoad[], "unused") getServerLoads(ref Miniorm db, 103 const Host[] filterBy_, const Duration timeout, const Duration maxAge) @trusted { 104 import std.datetime : Clock, dur; 105 import my.set; 106 107 const lastUseLimit = Clock.currTime - maxAge; 108 auto onlineHostSet = toSet(filterBy_.map!(a => a.get)); 109 bool filterBy(ServerTbl host) { 110 return host.address in onlineHostSet && host.lastUpdate > lastUseLimit; 111 } 112 113 auto stopAt = Clock.currTime + timeout; 114 while (Clock.currTime < stopAt) { 115 typeof(return) rval; 116 foreach (a; spinSql!(() => db.run(select!ServerTbl), logger.trace)(timeout)) { 117 auto h = HostLoad(Host(a.address), Load(a.loadAvg, 118 a.accessTime.dur!"msecs", a.unknown), a.lastUpdate); 119 if (!a.unknown && filterBy(a)) { 120 rval.online ~= h; 121 } else { 122 rval.unused ~= h; 123 } 124 } 125 126 if (!rval.online.empty || filterBy_.empty) 127 return rval; 128 } 129 130 return typeof(return).init; 131 } 132 133 /** Sync the hosts in the database with those that the client expect to exist. 134 * 135 * The client may from one invocation to another change the cluster. Those in 136 * the database should in that case be updated. 
137 */ 138 void syncCluster(ref Miniorm db, const Host[] cluster) { 139 immutable highAccessTime = 1.dur!"minutes" 140 .total!"msecs"; 141 immutable highLoadAvg = 9999.0; 142 immutable forceEarlyUpdate = Clock.currTime - 1.dur!"hours"; 143 144 auto stmt = spinSql!(() { 145 return db.prepare(`INSERT OR IGNORE INTO ServerTbl (address,lastUpdate,accessTime,loadAvg,unknown,lastUse) VALUES(:address, :lastUpdate, :accessTime, :loadAvg, :unknown, :lastUse)`); 146 }, logger.trace)(timeout); 147 148 foreach (const h; cluster) { 149 spinSql!(() { 150 stmt.get.reset; 151 stmt.get.bind(":address", h.get); 152 stmt.get.bind(":lastUpdate", forceEarlyUpdate.toSqliteDateTime); 153 stmt.get.bind(":accessTime", highAccessTime); 154 stmt.get.bind(":loadAvg", highLoadAvg); 155 stmt.get.bind(":unknown", true); 156 stmt.get.bind(":lastUse", Clock.currTime.toUnixTime); 157 stmt.get.execute; 158 }, logger.trace)(timeout); 159 } 160 } 161 162 void updateLastUse(ref Miniorm db, const Host[] cluster) { 163 auto stmt = spinSql!(() { 164 return db.prepare( 165 `UPDATE OR IGNORE ServerTbl SET lastUse = :lastUse WHERE address = :address`); 166 }, logger.trace)(timeout); 167 168 const lastUse = Clock.currTime.toUnixTime; 169 170 foreach (const h; cluster) { 171 spinSql!(() { 172 stmt.get.reset; 173 stmt.get.bind(":address", h.get); 174 stmt.get.bind(":lastUse", lastUse); 175 stmt.get.execute; 176 }, logger.trace)(timeout); 177 } 178 } 179 180 /// Update the data for a server. 181 void newServer(ref Miniorm db, HostLoad a) { 182 spinSql!(() { 183 db.run(insertOrReplace!ServerTbl, ServerTbl(a.host, Clock.currTime, 184 a.load.accessTime.total!"msecs", a.load.loadAvg, a.load.unknown, 185 Clock.currTime.toUnixTime)); 186 }, logger.trace)(timeout, 100.dur!"msecs", 300.dur!"msecs"); 187 } 188 189 /// Update the data for a server. 190 void updateServer(ref Miniorm db, HostLoad a, SysTime updateTime = Clock.currTime) { 191 spinSql!(() { 192 // using IGNORE because the host could have been removed. 
193 auto stmt = db.prepare(`UPDATE OR IGNORE ServerTbl SET lastUpdate = :lastUpdate, accessTime = :accessTime, loadAvg = :loadAvg, unknown = :unknown WHERE address = :address`); 194 stmt.get.bind(":address", a.host.get); 195 stmt.get.bind(":lastUpdate", updateTime.toSqliteDateTime); 196 stmt.get.bind(":accessTime", a.load.accessTime.total!"msecs"); 197 stmt.get.bind(":loadAvg", a.load.loadAvg); 198 stmt.get.bind(":unknown", a.load.unknown); 199 stmt.get.execute; 200 }, logger.trace)(timeout, 100.dur!"msecs", 300.dur!"msecs"); 201 } 202 203 /// Those that haven't been used for `unused` seconds. 204 void removeUnusedServers(ref Miniorm db, Duration unused) { 205 spinSql!(() { 206 auto stmt = db.prepare(`DELETE FROM ServerTbl WHERE lastUse < :lastUse`); 207 stmt.get.bind(":lastUse", Clock.currTime.toUnixTime - unused.total!"seconds"); 208 stmt.get.execute(); 209 }, logger.trace)(timeout); 210 } 211 212 void daemonBeat(ref Miniorm db) { 213 spinSql!(() { 214 db.run(insertOrReplace!DaemonBeat, DaemonBeat(0, Clock.currTime)); 215 }, logger.trace)(timeout); 216 } 217 218 SysTime getDaemonBeatClock(ref Miniorm db) { 219 return spinSql!(() { 220 foreach (a; db.run(select!DaemonBeat.where("id = 0", null))) { 221 return a.beat; 222 } 223 return Clock.currTime; 224 }, logger.trace)(timeout); 225 } 226 227 /// The heartbeat when daemon was last executed. 228 Duration getDaemonBeat(ref Miniorm db) { 229 auto d = spinSql!(() { 230 foreach (a; db.run(select!DaemonBeat.where("id = 0", null))) { 231 return Clock.currTime - a.beat; 232 } 233 return Duration.max; 234 }, logger.trace)(timeout); 235 236 // can happen if there is a "junk" value but it has to be a little bit 237 // robust against possible "jitter" thus accepting up to 1 minute "lag". 
238 if (d < (-1).dur!"minutes") { 239 d = Duration.max; 240 } else if (d < Duration.zero) { 241 d = Duration.zero; 242 } 243 return d; 244 } 245 246 void clientBeat(ref Miniorm db) { 247 spinSql!(() { 248 db.run(insertOrReplace!ClientBeat, ClientBeat(0, Clock.currTime)); 249 }, logger.trace)(timeout); 250 } 251 252 Duration getClientBeat(ref Miniorm db) { 253 auto d = spinSql!(() { 254 foreach (a; db.run(select!ClientBeat.where("id = 0", null))) 255 return Clock.currTime - a.beat; 256 return Duration.max; 257 }, logger.trace)(timeout); 258 259 // can happen if there is a "junk" value but it has to be a little bit 260 // robust against possible "jitter" thus accepting up to 1 minute "lag". 261 if (d < (-1).dur!"minutes") { 262 d = Duration.max; 263 } else if (d < Duration.zero) { 264 d = Duration.zero; 265 } 266 return d; 267 } 268 269 /// Returns: the server that have the oldest update timestamp. 270 Nullable!Host getOldestServer(ref Miniorm db) { 271 auto stmt = spinSql!(() { 272 return db.prepare( 273 `SELECT address FROM ServerTbl ORDER BY datetime(lastUpdate) ASC LIMIT 1`); 274 }, logger.trace)(timeout); 275 276 return spinSql!(() { 277 foreach (a; stmt.get.execute) { 278 auto address = a.peek!string(0); 279 return Nullable!Host(Host(address)); 280 } 281 return Nullable!Host.init; 282 }, logger.trace)(timeout); 283 } 284 285 Host[] getLeastLoadedServer(ref Miniorm db) { 286 import std.format : format; 287 288 auto stmt = spinSql!(() { 289 return db.prepare( 290 format!`SELECT address,lastUse,loadAvg FROM ServerTbl ORDER BY lastUse DESC, loadAvg ASC LIMIT %s`( 291 topCandidades)); 292 }, logger.trace)(timeout); 293 294 return spinSql!(() { 295 return stmt.get.execute.map!(a => Host(a.peek!string(0))).array; 296 }, logger.trace)(timeout); 297 } 298 299 void purgeServers(ref Miniorm db) { 300 spinSql!(() { db.run("DELETE FROM ServerTbl"); })(timeout); 301 } 302 303 /** Sleep for a random time that is min_ + rnd(0, span). 
304 * 305 * Params: 306 * span = unit is msecs. 307 */ 308 private void rndSleep(Duration min_, ulong span) nothrow @trusted { 309 import core.thread : Thread; 310 import core.time : dur; 311 import std.random : uniform; 312 313 auto t_span = () { 314 try { 315 return uniform(0, span).dur!"msecs"; 316 } catch (Exception e) { 317 } 318 return span.dur!"msecs"; 319 }(); 320 321 Thread.sleep(min_ + t_span); 322 } 323 324 version (unittest) { 325 private struct UnittestDb { 326 string name; 327 Miniorm db; 328 Host[] hosts; 329 330 alias db this; 331 332 ~this() { 333 import std.file : remove; 334 335 db.close; 336 remove(name); 337 } 338 } 339 340 private UnittestDb makeUnittestDb(string file = __FILE__, uint line = __LINE__)() { 341 import std.format : format; 342 import std.path : baseName; 343 344 immutable dbFname = format!"%s_%s.sqlite3"(file.baseName, line); 345 return UnittestDb(dbFname, openDatabase(dbFname)); 346 } 347 348 private void populate(ref UnittestDb db, uint nr) { 349 import std.format : format; 350 import std.range; 351 import std.algorithm; 352 353 db.hosts = iota(0, nr).map!(a => Host(format!"h%s"(a))).array; 354 syncCluster(db, db.hosts); 355 } 356 357 private void fejkLoad(ref UnittestDb db, double start, double step) { 358 foreach (a; db.hosts) { 359 updateServer(db, HostLoad(a, Load(start, 50.dur!"msecs", false))); 360 start += step; 361 } 362 } 363 } 364 365 @("shall filter out all servers with an unknown status") 366 unittest { 367 auto db = makeUnittestDb(); 368 populate(db, 10); 369 auto res = getServerLoads(db, db.hosts[0 .. $ / 2], 1.dur!"seconds", 10.dur!"minutes"); 370 res.online.length.shouldEqual(0); 371 } 372 373 @("shall split the returned hosts by the host set when retrieving the load") 374 unittest { 375 auto db = makeUnittestDb(); 376 populate(db, 10); 377 fejkLoad(db, 0.1, 0.3); 378 379 auto res = getServerLoads(db, db.hosts[0 .. 
$ / 2], 1.dur!"seconds", 10.dur!"minutes"); 380 res.online.length.shouldEqual(5); 381 res.online.map!(a => a.host).array.shouldEqual([ 382 "h0", "h1", "h2", "h3", "h4" 383 ]); 384 res.unused.length.shouldEqual(5); 385 } 386 387 @("shall put hosts with a too old status update in the unused set when splitting") 388 unittest { 389 auto db = makeUnittestDb(); 390 populate(db, 10); 391 fejkLoad(db, 0.1, 0.3); 392 updateServer(db, HostLoad(Host("h3"), Load(0.5, 50.dur!"msecs", false)), 393 Clock.currTime - 11.dur!"minutes"); 394 395 auto res = getServerLoads(db, db.hosts[0 .. $ / 2], 1.dur!"seconds", 10.dur!"minutes"); 396 res.online.length.shouldEqual(4); 397 res.online.map!(a => a.host).array.shouldEqual(["h0", "h1", "h2", "h4"]); 398 res.unused.length.shouldEqual(6); 399 }