/**
Copyright: Copyright (c) 2019, Joakim Brännström. All rights reserved.
License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
Author: Joakim Brännström (joakim.brannstrom@gmx.com)

Updates the distssh server cache.

# Design

The overall design is based around a shared, local database that is used by
both the daemon and clients to exchange statistics about the cluster. By
leveraging sqlite for the database it becomes safe for processes to read/write
to it concurrently.

The heartbeats are used by both the client and daemon:

 * The client spawns a daemon if the daemon heartbeats have stopped being
   updated.
 * The server slows down the update of the cluster statistics if no client has
   used it in a while until it finally terminates itself.
*/
module distssh.daemon;

import core.thread : Thread;
import core.time : dur;
import logger = std.experimental.logger;
import std.algorithm : map;
import std.array : array;
import std.datetime;
import std.exception : collectException;
// needed by initMetrics; was not explicitly imported before.
// NOTE(review): presumably pulled in transitively earlier — explicit is safer.
import std.typecons : tuple;

import colorlog;
import miniorm : SpinSqlTimeout;

import from_;

import distssh.config;
import distssh.database;
import distssh.metric;
import distssh.timer;
import distssh.types;

/** Entry point for the `distssh daemon` subcommand.
 *
 * Collects cluster statistics into the shared database until either no
 * client has used distssh for `heartBeatClientTimeout` or the database file
 * is replaced underneath it.
 *
 * Returns: 0 on a clean shutdown (or when another daemon is already
 * running), 1 when database access starts failing.
 */
int cli(const Config fconf, Config.Daemon conf) {
    auto db = openDatabase(fconf.global.dbPath);
    // remember the identity of the db file so a remove/recreate is detected.
    const origNode = getInode(fconf.global.dbPath);

    {
        const beat = db.getDaemonBeat;
        logger.trace("daemon beat: ", beat);
        // do not spawn if a daemon is already running.
        if (beat < heartBeatDaemonTimeout)
            return 0;
    }

    db.daemonBeat;
    initMetrics(db, fconf.global.cluster, fconf.global.timeout);

    bool running = true;
    auto clientBeat = db.getClientBeat;

    auto timers = makeTimers;

    makeInterval(timers, () @trusted {
        clientBeat = db.getClientBeat;
        logger.trace("client beat: ", clientBeat);
        // no client is interested in the metric so stop collecting
        if (clientBeat > heartBeatClientTimeout)
            running = false;
        return running;
    }, 5.dur!"seconds");

    makeInterval(timers, () @safe {
        // the database may have been removed/recreated
        if (getInode(fconf.global.dbPath) != origNode)
            running = false;
        return running;
    }, 5.dur!"seconds");

    // keep telling clients that a daemon is alive.
    makeInterval(timers, () @trusted { db.daemonBeat; return running; }, 15.dur!"seconds");

    // refresh the statistics of the server whose data is the most stale.
    void updateOldestTimer(ref Timers ts) @trusted {
        auto host = db.getOldestServer;
        if (!host.isNull)
            updateServer(db, host.get, fconf.global.timeout);

        // the times are arbitrarily chosen.
        // assumption. The oldest statistic does not have to be updated that
        // often because the other loop, updating the best candidate, is
        // running "fast".
        // assumption. If a user uses distssh slower than five minutes it
        // means that a long running process is used and the user won't
        // interact with distssh for a while.
        auto next = () {
            if (clientBeat < 5.dur!"minutes")
                return updateOldestInterval;
            else
                return 60.dur!"seconds";
        }();

        ts.put(&updateOldestTimer, next);
    }

    timers.put(&updateOldestTimer, Duration.zero);

    // refresh the statistics of the current best candidate server.
    void updateLeastLoadedTimer(ref Timers ts) @trusted {
        auto host = db.getLeastLoadedServer;
        if (!host.isNull)
            updateServer(db, host.get, fconf.global.timeout);

        // the times are arbitrarily chosen.
        // assumption. The least loaded server will be getting jobs put on it
        // not only from *this* instance of distssh but also from all other
        // instances using the cluster. For this instance to be quick at
        // moving a job to another host it has to update the statistics
        // often.
        // assumption. A user that is using distssh less than 90s isn't using
        // distssh interactively/in quick succession. By backing off/slowing
        // down the update it lowers the network load.
        auto next = () {
            if (clientBeat < 30.dur!"seconds")
                return 10.dur!"seconds";
            else if (clientBeat < 90.dur!"seconds")
                return 20.dur!"seconds";
            else
                return 60.dur!"seconds";
        }();

        ts.put(&updateLeastLoadedTimer, next);
    }

    timers.put(&updateLeastLoadedTimer, Duration.zero);

    while (running && !timers.empty) {
        try {
            timers.tick(100.dur!"msecs");
        } catch (SpinSqlTimeout e) {
            // the database has been removed or something else "bad" has
            // happened that makes the database access throw exceptions.
            return 1;
        }
    }

    return 0;
}

/** Spawn a daemon process if the current one seems to have died.
 *
 * Also registers a client heartbeat so a running daemon keeps collecting.
 * Best effort: all exceptions are logged and swallowed.
 */
void startDaemon(ref from.miniorm.Miniorm db) nothrow {
    import distssh.process : spawnDaemon;
    import std.file : thisExePath;

    try {
        db.clientBeat;
        if (db.getDaemonBeat > heartBeatDaemonTimeout) {
            db.purgeServers; // assuming the data is old
            spawnDaemon([thisExePath, "daemon"]);
            logger.trace("daemon spawned");
        }
    } catch (Exception e) {
        logger.error(e.msg).collectException;
    }
}

private:

// a daemon heartbeat older than this means the daemon is presumed dead.
immutable heartBeatDaemonTimeout = 60.dur!"seconds";
// a client heartbeat older than this means no client is using distssh.
immutable heartBeatClientTimeout = 30.dur!"minutes";
// fast-path interval for refreshing the most stale server statistic.
immutable updateOldestInterval = 30.dur!"seconds";

/** Seed the database with a load measurement for every host in the cluster.
 *
 * The hosts are probed in parallel via a TaskPool. Best effort: failures are
 * only traced.
 */
void initMetrics(ref from.miniorm.Miniorm db, const(Host)[] cluster, Duration timeout) nothrow {
    import std.parallelism : TaskPool;

    static auto loadHost(T)(T host_timeout) nothrow {
        import std.concurrency : thisTid;

        logger.trace("load testing thread id: ", thisTid).collectException;
        return HostLoad(host_timeout[0], getLoad(host_timeout[0], host_timeout[1]));
    }

    try {
        auto shosts = cluster.map!(a => tuple(a, timeout)).array;

        auto pool = new TaskPool();
        scope (exit)
            pool.stop;

        foreach (v; pool.amap!(loadHost)(shosts)) {
            db.newServer(v);
        }
    } catch (Exception e) {
        logger.trace(e.msg).collectException;
    }
}

/// Measure the load of `host` and persist the result in the database.
void updateServer(ref from.miniorm.Miniorm db, Host host, Duration timeout) {
    auto load = getLoad(host, timeout);
    distssh.database.updateServer(db, HostLoad(host, load));
    logger.tracef("Update %s with %s", host, load).collectException;
}

/// Identity of a file on disk: device + inode number.
struct Inode {
    ulong dev;
    ulong ino;

    bool opEquals()(auto ref const typeof(this) s) const {
        return dev == s.dev && ino == s.ino;
    }
}

/** Lookup the inode identity of path `p`.
 *
 * Returns: Inode(0, 0) when the path does not exist.
 */
Inode getInode(const string p) @trusted nothrow {
    import core.sys.posix.sys.stat : stat_t, stat;
    import std.file : isSymlink, exists;
    import std.string : toStringz;

    const pz = p.toStringz;

    if (!exists(p)) {
        return Inode(0, 0);
    } else {
        stat_t st = void;
        // should NOT use lstat because we want to know even if the symlink is
        // redirected etc.
        stat(pz, &st);
        return Inode(st.st_dev, st.st_ino);
    }
}