/**
Copyright: Copyright (c) 2019, Joakim Brännström. All rights reserved.
License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
Author: Joakim Brännström (joakim.brannstrom@gmx.com)
*/
module distssh.database;

import logger = std.experimental.logger;
import std.algorithm : map, filter;
import std.array : array, empty;
import std.datetime;
import std.exception : collectException, ifThrown;
import std.meta : AliasSeq;
import std.typecons : Nullable, Tuple;

import miniorm;

import distssh.types;

version (unittest) {
    import unit_threaded.assertions;
}

/// Time budget handed to `spinSql` by the database helpers in this module.
immutable timeout = 30.dur!"seconds";

/// Version of the table layout. `openDatabase` drops and recreates every
/// table when the version stored in the database is lower than this.
enum SchemaVersion = 2;

/// Holds the schema version that the database file was created with.
struct VersionTbl {
    @ColumnName("version")
    ulong version_;
}

/// One row per known server, keyed by its address.
@TablePrimaryKey("address")
struct ServerTbl {
    /// Address of the host; the primary key of the table.
    string address;

    /// When the load data of the entry was last written.
    SysTime lastUpdate;

    /// Access time, stored as a number of milliseconds.
    long accessTime;

    /// Load average reported for the host.
    double loadAvg;

    /// True when the load of the host isn't known. Entries created by
    /// `syncCluster` start out with this set.
    bool unknown;

    // Last time used the entry in unix time. Assuming that it is always
    // running locally.
    long lastUse;
}

/// The daemon beats once per minute.
struct DaemonBeat {
    // The functions in this module only read and write the row with id 0.
    ulong id;
    // Time of the beat.
    SysTime beat;
}

/// Clients beat each time they access the database.
struct ClientBeat {
    // The functions in this module only read and write the row with id 0.
    ulong id;
    // Time of the beat.
    SysTime beat;
}

/// Convenience overload of `openDatabase` taking a `Path`.
Miniorm openDatabase(Path dbFile) nothrow {
    return openDatabase(dbFile.toString);
}

/** Open the database in `dbFile`, (re)creating the tables when needed.
 *
 * If the schema version stored in the database is lower than `SchemaVersion`
 * (or can't be read, which is treated as version 0) all tables are dropped
 * and recreated inside one transaction.
 *
 * The function never gives up: any exception is logged at trace level and the
 * whole procedure is retried after a short random sleep.
 *
 * Params:
 *  dbFile = path to the database file
 *
 * Returns: an open database with an up to date schema.
 */
Miniorm openDatabase(string dbFile) nothrow {
    logger.trace("opening database ", dbFile).collectException;
    while (true) {
        try {
            auto db = Miniorm(dbFile);
            // a missing or unreadable version table counts as version 0.
            const schemaVersion = () {
                foreach (a; db.run(select!VersionTbl))
                    return a;
                return VersionTbl(0);
            }().ifThrown(VersionTbl(0));

            alias Schema = AliasSeq!(VersionTbl, ServerTbl, DaemonBeat, ClientBeat);

            if (schemaVersion.version_ < SchemaVersion) {
                db.begin;
                // a table may not exist yet thus failures to drop are ignored.
                static foreach (tbl; Schema)
                    db.run("DROP TABLE " ~ tbl.stringof).collectException;
                db.run(buildSchema!Schema);
                db.run(insert!VersionTbl, VersionTbl(SchemaVersion));
                db.commit;
            }
            return db;
        } catch (Exception e) {
            logger.tracef("Trying to open/create database %s: %s", dbFile, e.msg).collectException;
        }

        rndSleep(25.dur!"msecs", 50);
    }
}

/** Get all servers.
 *
 * Waiting up to `timeout` for servers to be added. This handles the case where
 * a daemon has been spawned in the background.
 *
 * The database is polled with a short random pause between the attempts so
 * the wait doesn't turn into a busy loop that competes with the writers.
 *
 * Params:
 *  db = database instance to read from
 *  filterBy_ = only hosts that are among these are added to the online set
 *  timeout = max time to wait for the `online` set to contain at least one host
 *  maxAge = only hosts that have a status newer than this is added to online
 *
 * Returns: the hosts split in `online` and `unused`. Both are empty if no
 * host became online before `timeout` expired.
 */
Tuple!(HostLoad[], "online", HostLoad[], "unused") getServerLoads(ref Miniorm db,
        const Host[] filterBy_, const Duration timeout, const Duration maxAge) @trusted {
    import std.datetime : Clock, dur;
    import my.set;

    // entries with an older `lastUpdate` than this are considered stale.
    const lastUpdateLimit = Clock.currTime - maxAge;
    auto onlineHostSet = toSet(filterBy_.map!(a => a.payload));
    bool filterBy(ServerTbl host) {
        return host.address in onlineHostSet && host.lastUpdate > lastUpdateLimit;
    }

    auto stopAt = Clock.currTime + timeout;
    while (Clock.currTime < stopAt) {
        typeof(return) rval;
        foreach (a; spinSql!(() => db.run(select!ServerTbl), logger.trace)(timeout)) {
            auto h = HostLoad(Host(a.address), Load(a.loadAvg,
                    a.accessTime.dur!"msecs", a.unknown), a.lastUpdate);
            if (!a.unknown && filterBy(a)) {
                rval.online ~= h;
            } else {
                rval.unused ~= h;
            }
        }

        // nothing to wait for if the caller didn't ask for specific hosts.
        if (!rval.online.empty || filterBy_.empty)
            return rval;

        // give the writers a chance to update the database before the next
        // poll instead of re-running the query back-to-back.
        rndSleep(10.dur!"msecs", 20);
    }

    return typeof(return).init;
}

/** Sync the hosts in the database with those that the client expect to exist.
 *
 * The client may from one invocation to another change the cluster. Those in
 * the database should in that case be updated.
 *
 * Hosts that already exist in the database are left untouched (`INSERT OR
 * IGNORE`). New hosts are inserted as `unknown` with a high access time, a
 * high load average and a `lastUpdate` one hour in the past.
 *
 * Params:
 *  db = database instance to write to
 *  cluster = the hosts that should exist in the database
 */
void syncCluster(ref Miniorm db, const Host[] cluster) {
    immutable highAccessTime = 1.dur!"minutes"
        .total!"msecs";
    immutable highLoadAvg = 9999.0;
    immutable forceEarlyUpdate = Clock.currTime - 1.dur!"hours";
    // one timestamp for the whole sync, same as `updateLastUse` does it.
    const lastUse = Clock.currTime.toUnixTime;

    auto stmt = spinSql!(() {
        return db.prepare(`INSERT OR IGNORE INTO ServerTbl (address,lastUpdate,accessTime,loadAvg,unknown,lastUse) VALUES(:address, :lastUpdate, :accessTime, :loadAvg, :unknown, :lastUse)`);
    }, logger.trace)(timeout);

    foreach (const h; cluster) {
        spinSql!(() {
            stmt.get.reset;
            stmt.get.bind(":address", h.payload);
            stmt.get.bind(":lastUpdate", forceEarlyUpdate.toSqliteDateTime);
            stmt.get.bind(":accessTime", highAccessTime);
            stmt.get.bind(":loadAvg", highLoadAvg);
            stmt.get.bind(":unknown", true);
            stmt.get.bind(":lastUse", lastUse);
            stmt.get.execute;
        }, logger.trace)(timeout);
    }
}

/** Set `lastUse` to the current time for all hosts in `cluster`.
 *
 * Params:
 *  db = database instance to write to
 *  cluster = the hosts to mark as used
 */
void updateLastUse(ref Miniorm db, const Host[] cluster) {
    auto stmt = spinSql!(() {
        return db.prepare(
            `UPDATE OR IGNORE ServerTbl SET lastUse = :lastUse WHERE address = :address`);
    }, logger.trace)(timeout);

    const lastUse = Clock.currTime.toUnixTime;

    foreach (const h; cluster) {
        spinSql!(() {
            stmt.get.reset;
            stmt.get.bind(":address", h.payload);
            stmt.get.bind(":lastUse", lastUse);
            stmt.get.execute;
        }, logger.trace)(timeout);
    }
}

/// Insert a server, or replace the entry if it already exists. Both
/// `lastUpdate` and `lastUse` are set to the current time.
void newServer(ref Miniorm db, HostLoad a) {
    spinSql!(() {
        db.run(insertOrReplace!ServerTbl, ServerTbl(a.host.payload,
            Clock.currTime, a.load.accessTime.total!"msecs", a.load.loadAvg,
            a.load.unknown, Clock.currTime.toUnixTime));
    }, logger.trace)(timeout, 100.dur!"msecs", 300.dur!"msecs");
}

/// Update the data for a server.
void updateServer(ref Miniorm db, HostLoad a, SysTime updateTime = Clock.currTime) {
    // Only the load related columns are written; `lastUse` is left as is.
    // `updateTime` is what ends up in `lastUpdate`.
    spinSql!(() {
        // using IGNORE because the host could have been removed.
        auto stmt = db.prepare(`UPDATE OR IGNORE ServerTbl SET lastUpdate = :lastUpdate, accessTime = :accessTime, loadAvg = :loadAvg, unknown = :unknown WHERE address = :address`);
        stmt.get.bind(":address", a.host.payload);
        stmt.get.bind(":lastUpdate", updateTime.toSqliteDateTime);
        stmt.get.bind(":accessTime", a.load.accessTime.total!"msecs");
        stmt.get.bind(":loadAvg", a.load.loadAvg);
        stmt.get.bind(":unknown", a.load.unknown);
        stmt.get.execute;
    }, logger.trace)(timeout, 100.dur!"msecs", 300.dur!"msecs");
}

/// Remove those servers that haven't been used for the duration `unused`.
/// The comparison is done with a resolution of whole seconds.
void removeUnusedServers(ref Miniorm db, Duration unused) {
    spinSql!(() {
        auto stmt = db.prepare(`DELETE FROM ServerTbl WHERE lastUse < :lastUse`);
        stmt.get.bind(":lastUse", Clock.currTime.toUnixTime - unused.total!"seconds");
        stmt.get.execute();
    }, logger.trace)(timeout);
}

/// Record that the daemon is alive by writing the current time to row 0.
void daemonBeat(ref Miniorm db) {
    spinSql!(() {
        db.run(insertOrReplace!DaemonBeat, DaemonBeat(0, Clock.currTime));
    }, logger.trace)(timeout);
}

/** Sanitize the age of a heartbeat.
 *
 * A negative age means the stored beat is in the future. That can happen if
 * there is a "junk" value but it has to be a little bit robust against
 * possible "jitter" thus accepting up to 1 minute "lag".
 *
 * Params:
 *  age = current time minus the stored beat
 *
 * Returns: `Duration.max` if `age` is more than one minute negative,
 * `Duration.zero` if it is negative within that minute, otherwise `age`.
 */
private Duration clampBeatAge(Duration age) @safe pure nothrow {
    if (age < (-1).dur!"minutes")
        return Duration.max;
    if (age < Duration.zero)
        return Duration.zero;
    return age;
}

/// The heartbeat when daemon was last executed.
///
/// Returns: time since the last daemon beat, `Duration.max` if there is none.
Duration getDaemonBeat(ref Miniorm db) {
    auto d = spinSql!(() {
        foreach (a; db.run(select!DaemonBeat.where("id = 0", null)))
            return Clock.currTime - a.beat;
        return Duration.max;
    }, logger.trace)(timeout);

    return clampBeatAge(d);
}

/// Record that a client accessed the database by writing the current time to
/// row 0.
void clientBeat(ref Miniorm db) {
    spinSql!(() {
        db.run(insertOrReplace!ClientBeat, ClientBeat(0, Clock.currTime));
    }, logger.trace)(timeout);
}

/// Returns: time since the last client beat, `Duration.max` if there is none.
Duration getClientBeat(ref Miniorm db) {
    auto d = spinSql!(() {
        foreach (a; db.run(select!ClientBeat.where("id = 0", null)))
            return Clock.currTime - a.beat;
        return Duration.max;
    }, logger.trace)(timeout);

    return clampBeatAge(d);
}

/// Returns: the server that have the oldest update timestamp, null if the
/// table is empty.
Nullable!Host getOldestServer(ref Miniorm db) {
    auto stmt = spinSql!(() {
        return db.prepare(
            `SELECT address FROM ServerTbl ORDER BY datetime(lastUpdate) ASC LIMIT 1`);
    }, logger.trace)(timeout);

    return spinSql!(() {
        foreach (a; stmt.get.execute) {
            auto address = a.peek!string(0);
            return Nullable!Host(Host(address));
        }
        return Nullable!Host.init;
    }, logger.trace)(timeout);
}

/// Returns: at most `topCandidades` hosts, ordered by most recently used
/// first and then by lowest load average.
Host[] getLeastLoadedServer(ref Miniorm db) {
    import std.format : format;

    // NOTE(review): `topCandidades` isn't declared in this file; presumably
    // it comes from distssh.types - confirm.
    auto stmt = spinSql!(() {
        return db.prepare(
            format!`SELECT address,lastUse,loadAvg FROM ServerTbl ORDER BY lastUse DESC, loadAvg ASC LIMIT %s`(
            topCandidades));
    }, logger.trace)(timeout);

    return spinSql!(() {
        return stmt.get.execute.map!(a => Host(a.peek!string(0))).array;
    }, logger.trace)(timeout);
}

/// Remove all servers from the database.
void purgeServers(ref Miniorm db) {
    // NOTE(review): this is the only spinSql call in the module that doesn't
    // pass `logger.trace`; confirm that using spinSql's default is intended.
    spinSql!(() { db.run("DELETE FROM ServerTbl"); })(timeout);
}

/** Sleep for a random time that is min_ + rnd(0, span).
 *
 * The random part is drawn from the half-open interval [0, span). If drawing
 * it fails the full `span` is used instead.
 *
 * Params:
 *  min_ = the shortest time to sleep
 *  span = unit is msecs.
 */
private void rndSleep(Duration min_, ulong span) nothrow @trusted {
    import core.thread : Thread;
    import core.time : dur;
    import std.random : uniform;

    auto t_span = () {
        // [0, 0) is an empty interval which uniform rejects by throwing;
        // the result would be a zero span either way.
        if (span == 0)
            return Duration.zero;
        try {
            return uniform(0, span).dur!"msecs";
        } catch (Exception e) {
        }
        return span.dur!"msecs";
    }();

    Thread.sleep(min_ + t_span);
}

version (unittest) {
    // Database for a unittest. The file is closed and removed when the
    // instance goes out of scope.
    private struct UnittestDb {
        string name;
        Miniorm db;
        Host[] hosts;

        alias db this;

        ~this() {
            import std.file : remove;

            db.close;
            remove(name);
        }
    }

    // The file name is derived from the call site so each unittest gets its
    // own database.
    private UnittestDb makeUnittestDb(string file = __FILE__, uint line = __LINE__)() {
        import std.format : format;
        import std.path : baseName;

        immutable dbFname = format!"%s_%s.sqlite3"(file.baseName, line);
        return UnittestDb(dbFname, openDatabase(dbFname));
    }

    // Add the hosts h0 .. h(nr-1) to the database via syncCluster.
    private void populate(ref UnittestDb db, uint nr) {
        import std.format : format;
        import std.range;
        import std.algorithm;

        db.hosts = iota(0, nr).map!(a => Host(format!"h%s"(a))).array;
        syncCluster(db, db.hosts);
    }

    // Give every host a known load, starting at `start` and increasing with
    // `step` per host.
    private void fejkLoad(ref UnittestDb db, double start, double step) {
        foreach (a; db.hosts) {
            updateServer(db, HostLoad(a, Load(start, 50.dur!"msecs", false)));
            start += step;
        }
    }
}

@("shall filter out all servers with an unknown status")
unittest {
    auto db = makeUnittestDb();
    populate(db, 10);
    auto res = getServerLoads(db, db.hosts[0 .. $ / 2], 1.dur!"seconds", 10.dur!"minutes");
    res.online.length.shouldEqual(0);
}

@("shall split the returned hosts by the host set when retrieving the load")
unittest {
    auto db = makeUnittestDb();
    populate(db, 10);
    fejkLoad(db, 0.1, 0.3);

    auto res = getServerLoads(db, db.hosts[0 .. $ / 2], 1.dur!"seconds", 10.dur!"minutes");
    res.online.length.shouldEqual(5);
    res.online.map!(a => a.host.payload).array.shouldEqual([
            "h0", "h1", "h2", "h3", "h4"
            ]);
    res.unused.length.shouldEqual(5);
}

@("shall put hosts with a too old status update in the unused set when splitting")
unittest {
    auto db = makeUnittestDb();
    populate(db, 10);
    fejkLoad(db, 0.1, 0.3);
    updateServer(db, HostLoad(Host("h3"), Load(0.5, 50.dur!"msecs", false)),
            Clock.currTime - 11.dur!"minutes");

    auto res = getServerLoads(db, db.hosts[0 .. $ / 2], 1.dur!"seconds", 10.dur!"minutes");
    res.online.length.shouldEqual(4);
    res.online.map!(a => a.host.payload).array.shouldEqual([
            "h0", "h1", "h2", "h4"
            ]);
    res.unused.length.shouldEqual(6);
}