1 /** 2 Copyright: Copyright (c) 2019, Joakim Brännström. All rights reserved. 3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0) 4 Author: Joakim Brännström (joakim.brannstrom@gmx.com) 5 */ 6 module distssh.database; 7 8 import logger = std.experimental.logger; 9 import std.algorithm : map, filter; 10 import std.array : array, empty; 11 import std.datetime; 12 import std.exception : collectException, ifThrown; 13 import std.meta : AliasSeq; 14 import std.typecons : Nullable, Tuple; 15 16 import miniorm; 17 18 import distssh.types; 19 20 immutable timeout = 30.dur!"seconds"; 21 enum SchemaVersion = 1; 22 23 struct VersionTbl { 24 @ColumnName("version") 25 ulong version_; 26 } 27 28 @TablePrimaryKey("address") 29 struct ServerTbl { 30 string address; 31 SysTime lastUpdate; 32 long accessTime; 33 double loadAvg; 34 bool unknown; 35 } 36 37 /// The daemon beats ones per minute. 38 struct DaemonBeat { 39 ulong id; 40 SysTime beat; 41 } 42 43 /// Clients beat each time they access the database. 44 struct ClientBeat { 45 ulong id; 46 SysTime beat; 47 } 48 49 Miniorm openDatabase(string dbFile) nothrow { 50 while (true) { 51 try { 52 auto db = Miniorm(dbFile); 53 const schemaVersion = () { 54 foreach (a; db.run(select!VersionTbl)) 55 return a; 56 return VersionTbl(0); 57 }().ifThrown(VersionTbl(0)); 58 59 alias Schema = AliasSeq!(VersionTbl, ServerTbl, DaemonBeat, ClientBeat); 60 61 if (schemaVersion.version_ < SchemaVersion) { 62 db.begin; 63 static foreach (tbl; Schema) 64 db.run("DROP TABLE " ~ tbl.stringof).collectException; 65 db.run(buildSchema!Schema); 66 db.run(insert!VersionTbl, VersionTbl(SchemaVersion)); 67 db.commit; 68 } 69 return db; 70 } catch (Exception e) { 71 logger.tracef("Trying to open/create database %s: %s", dbFile, e.msg).collectException; 72 } 73 74 rndSleep(25.dur!"msecs", 50); 75 } 76 } 77 78 /** Get all servers. 79 * 80 * Waiting for up to 10s for servers to be added. This handles the case where a 81 * daemon have been spawned in the background. 82 */ 83 Tuple!(HostLoad[], "online", Host[], "unused") getServerLoads(ref Miniorm db, const Host[] filterBy_) nothrow { 84 import std.datetime : Clock, dur; 85 import distssh.set; 86 87 auto getData() { 88 return db.run(select!ServerTbl).map!(a => HostLoad(Host(a.address), 89 Load(a.loadAvg, a.accessTime.dur!"msecs", a.unknown))).array; 90 } 91 92 auto filterBy = toSet(filterBy_.map!(a => a.payload)); 93 94 try { 95 auto stopAt = Clock.currTime + timeout; 96 while (Clock.currTime < stopAt) { 97 typeof(return) rval; 98 foreach (h; spinSql!(getData, logger.trace)(timeout)) { 99 if (filterBy.contains(h[0].payload)) 100 rval.online ~= h; 101 else 102 rval.unused ~= h[0]; 103 } 104 105 if (!rval.online.empty) 106 return rval; 107 } 108 } catch (Exception e) { 109 logger.warning("Failed reading from the database: ", e.msg).collectException; 110 } 111 112 return typeof(return).init; 113 } 114 115 /** Sync the hosts in the database with those that the client expect to exist. 116 * 117 * The client may from one invocation to another change the cluster. Those in 118 * the database should in that case be updated. 119 */ 120 void syncCluster(ref Miniorm db, const Host[] cluster) { 121 immutable highAccessTime = 1.dur!"minutes" 122 .total!"msecs"; 123 immutable highLoadAvg = 9999.0; 124 immutable forceEarlyUpdate = Clock.currTime - 1.dur!"hours"; 125 126 auto stmt = spinSql!(() { 127 return db.prepare(`INSERT OR IGNORE INTO ServerTbl (address,lastUpdate,accessTime,loadAvg,unknown) VALUES(:address, :lastUpdate, :accessTime, :loadAvg, :unknown)`); 128 }, logger.trace)(timeout); 129 130 foreach (const h; cluster) { 131 spinSql!(() { 132 stmt.reset; 133 stmt.bind(":address", h.payload); 134 stmt.bind(":lastUpdate", forceEarlyUpdate.toSqliteDateTime); 135 stmt.bind(":accessTime", highAccessTime); 136 stmt.bind(":loadAvg", highLoadAvg); 137 stmt.bind(":unknown", true); 138 stmt.execute; 139 }, logger.trace)(timeout); 140 } 141 } 142 143 /// Update the data for a server. 144 void newServer(ref Miniorm db, HostLoad a) { 145 spinSql!(() { 146 db.run(insertOrReplace!ServerTbl, ServerTbl(a[0].payload, 147 Clock.currTime, a[1].accessTime.total!"msecs", a[1].loadAvg, a[1].unknown)); 148 }, logger.trace)(timeout, 100.dur!"msecs", 300.dur!"msecs"); 149 } 150 151 /// Update the data for a server. 152 void updateServer(ref Miniorm db, HostLoad a) { 153 spinSql!(() { 154 // using IGNORE because the host could have been removed. 155 auto stmt = db.prepare(`UPDATE OR IGNORE ServerTbl SET lastUpdate = :lastUpdate, accessTime = :accessTime, loadAvg = :loadAvg, unknown = :unknown WHERE address = :address`); 156 stmt.bind(":address", a[0].payload); 157 stmt.bind(":lastUpdate", Clock.currTime.toSqliteDateTime); 158 stmt.bind(":accessTime", a[1].accessTime.total!"msecs"); 159 stmt.bind(":loadAvg", a[1].loadAvg); 160 stmt.bind(":unknown", a[1].unknown); 161 stmt.execute; 162 })(timeout, 100.dur!"msecs", 300.dur!"msecs"); 163 } 164 165 void removeUnusedServers(ref Miniorm db, Host[] hosts) { 166 if (hosts.empty) 167 return; 168 169 auto stmt = spinSql!(() { 170 return db.prepare(`DELETE FROM ServerTbl WHERE address = :address`); 171 }, logger.trace)(timeout); 172 173 foreach (h; hosts) { 174 spinSql!(() { 175 stmt.reset; 176 stmt.bind(":address", h.payload); 177 stmt.execute; 178 }, logger.trace)(timeout); 179 } 180 } 181 182 void daemonBeat(ref Miniorm db) { 183 spinSql!(() { 184 db.run(insertOrReplace!DaemonBeat, DaemonBeat(0, Clock.currTime)); 185 }, logger.trace)(timeout); 186 } 187 188 /// The heartbeat when daemon was last executed. 189 Duration getDaemonBeat(ref Miniorm db) { 190 return spinSql!(() { 191 foreach (a; db.run(select!DaemonBeat.where("id =", 0))) 192 return Clock.currTime - a.beat; 193 return Duration.max; 194 }, logger.trace)(timeout); 195 } 196 197 void clientBeat(ref Miniorm db) { 198 spinSql!(() { 199 db.run(insertOrReplace!ClientBeat, ClientBeat(0, Clock.currTime)); 200 }, logger.trace)(timeout); 201 } 202 203 Duration getClientBeat(ref Miniorm db) { 204 return spinSql!(() { 205 foreach (a; db.run(select!ClientBeat.where("id =", 0))) 206 return Clock.currTime - a.beat; 207 return Duration.max; 208 }, logger.trace)(timeout); 209 } 210 211 /// Returns: the server that have the oldest update timestamp. 212 Nullable!Host getOldestServer(ref Miniorm db) { 213 auto stmt = spinSql!(() { 214 return db.prepare( 215 `SELECT address FROM ServerTbl ORDER BY datetime(lastUpdate) ASC LIMIT 1`); 216 }, logger.trace)(timeout); 217 218 return spinSql!(() { 219 foreach (a; stmt.execute) { 220 auto address = a.peek!string(0); 221 return Nullable!Host(Host(address)); 222 } 223 return Nullable!Host.init; 224 }, logger.trace)(timeout); 225 } 226 227 Nullable!Host getLeastLoadedServer(ref Miniorm db) { 228 auto stmt = spinSql!(() { 229 return db.prepare(`SELECT address FROM ServerTbl ORDER BY loadAvg ASC LIMIT 1`); 230 }, logger.trace)(timeout); 231 232 return spinSql!(() { 233 foreach (a; stmt.execute) { 234 auto address = a.peek!string(0); 235 return Nullable!Host(Host(address)); 236 } 237 return Nullable!Host.init; 238 }, logger.trace)(timeout); 239 } 240 241 void purgeServers(ref Miniorm db) { 242 spinSql!(() { db.run("DELETE FROM ServerTbl"); })(timeout); 243 }