/**
Copyright: Copyright (c) 2019, Joakim Brännström. All rights reserved.
License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
Author: Joakim Brännström (joakim.brannstrom@gmx.com)

Updates the distssh server cache.

# Design

The overall design is based around a shared, local database that is used by
both the daemon and clients to exchange statistics about the cluster. By
using sqlite for the database it is safe for multiple processes to read/write
to it concurrently.

The heartbeats are used by both the client and daemon:

* The client spawns a daemon if the daemon heartbeat has stopped being
  updated.
* The daemon slows down the update of the cluster statistics if no client has
  used it in a while, until it finally terminates itself.
*/
module distssh.daemon;

import core.thread : Thread;
import core.time : dur;
import logger = std.experimental.logger;
import std.algorithm : map, filter, max;
import std.array : array, empty;
import std.datetime;
import std.exception : collectException;
import std.process : environment;
import std.typecons : Flag;

import colorlog;
import miniorm : SpinSqlTimeout;
import my.from_;
import my.set;
import my.timer;

import distssh.config;
import distssh.database;
import distssh.metric;
import distssh.types;
import distssh.utility;

Duration[3] updateLeastLoadTimersInterval = [
    10.dur!"seconds", 20.dur!"seconds", 60.dur!"seconds"
];

int cli(const Config fconf, Config.Daemon conf) {
    auto db = openDatabase(fconf.global.dbPath);
    const origNode = getInode(fconf.global.dbPath);

    if (fconf.global.verbosity == VerboseMode.trace)
        db.log(true);

    if (conf.background) {
        const beat = db.getDaemonBeat;
        logger.trace("daemon beat: ", beat);
        // do not spawn if a daemon is already running.
        if (beat < heartBeatDaemonTimeout && !conf.forceStart)
            return 0;
        // by only updating the beat when in background mode it is ensured that
        // the daemon will sooner or later start in persistent background mode.
        db.daemonBeat;
    }

    initMetrics(db, fconf.global.cluster, fconf.global.timeout);

    if (!conf.background)
        return 0;

    // when starting the daemon for the first time, assume that any data
    // already in the database is old.
    db.removeUnusedServers(1.dur!"minutes");

    bool running = true;
    // the daemon runs for at most 24h. This is a workaround for if/when the
    // client beat errors out in such a way that it is always "zero".
    const forceShutdown = Clock.currTime + 24.dur!"hours";
    auto clientBeat = db.getClientBeat;

    auto timers = makeTimers;

    makeInterval(timers, () @trusted {
        clientBeat = db.getClientBeat;
        logger.tracef("client beat: %s timeout: %s", clientBeat, conf.timeout);
        return 10.dur!"seconds";
    }, 10.dur!"seconds");

    makeInterval(timers, () @trusted {
        clientBeat = db.getClientBeat;
        logger.tracef("client beat: %s timeout: %s", clientBeat, conf.timeout);
        // no client is interested in the metrics so stop collecting
        if (clientBeat > conf.timeout) {
            running = false;
        }
        if (Clock.currTime > forceShutdown) {
            running = false;
        }
        return max(1.dur!"minutes", conf.timeout - clientBeat);
    }, 10.dur!"seconds");

    makeInterval(timers, () @safe {
        // the database may have been removed/recreated
        if (getInode(fconf.global.dbPath) != origNode) {
            running = false;
        }
        return 5.dur!"seconds";
    }, 5.dur!"seconds");

    makeInterval(timers, () @trusted { db.daemonBeat; return 15.dur!"seconds"; }, 15.dur!"seconds");

    // the intervals are arbitrarily chosen.
    // assumption: the oldest statistics do not have to be updated that often
    // because the other loop, updating the best candidate, is running "fast".
    // assumption: if a user uses distssh less often than every five minutes, a
    // long running process is probably executing and the user will not
    // interact with distssh for a while.
    makeInterval(timers, () @trusted {
        auto host = db.getOldestServer;
        if (!host.isNull) {
            updateServer(db, host.get, fconf.global.timeout);
        }

        if (clientBeat < 30.dur!"seconds")
            return 10.dur!"seconds";
        if (clientBeat < 5.dur!"minutes")
            return 30.dur!"seconds";
        return 60.dur!"seconds";
    }, 15.dur!"seconds");

    // the intervals are arbitrarily chosen.
    // assumption: the least loaded server will be getting jobs put on it not
    // only from *this* instance of distssh but also from all other instances
    // using the cluster. For this instance to be quick at moving jobs to
    // another host it has to update the statistics often.
    // assumption: a user that has not used distssh within the last 90s isn't
    // using distssh interactively/in quick succession. By backing off/slowing
    // down the updates, the network load is lowered.
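    // index used to rotate, round robin, through the topCandidades least
    // loaded servers in the timer below.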
    long updateLeastLoadedTimerTick;
    makeInterval(timers, () @trusted {
        auto s = db.getLeastLoadedServer;
        if (s.length > 0 && s.length < topCandidades) {
            updateServer(db, s[updateLeastLoadedTimerTick % s.length], fconf.global.timeout);
        } else if (s.length >= topCandidades) {
            updateServer(db, s[updateLeastLoadedTimerTick], fconf.global.timeout);
        }

        updateLeastLoadedTimerTick = (updateLeastLoadedTimerTick + 1) % topCandidades;

        if (clientBeat < 30.dur!"seconds")
            return updateLeastLoadTimersInterval[0];
        if (clientBeat < 90.dur!"seconds")
            return updateLeastLoadTimersInterval[1];
        return updateLeastLoadTimersInterval[2];
    }, 10.dur!"seconds");

    makeInterval(timers, () @trusted nothrow {
        try {
            db.removeUnusedServers(30.dur!"minutes");
        } catch (Exception e) {
            logger.warning(e.msg).collectException;
        }
        return 1.dur!"minutes";
    }, 1.dur!"minutes");

    makeInterval(timers, () @trusted nothrow {
        import std.range : take;
        import distssh.connection;

        try {
            // keep the multiplex connections open to the top candidates
            foreach (h; db.getLeastLoadedServer.take(topCandidades)) {
                auto m = makeMaster(h);
                if (!m.isAlive) {
                    m.connect;
                }
            }
        } catch (Exception e) {
            logger.trace(e.msg).collectException;
        }

        // open connections fast to the cluster while the client is using them
        if (clientBeat < 5.dur!"minutes")
            return 15.dur!"seconds";
        return 1.dur!"minutes";
    }, 5.dur!"seconds");

    if (globalEnvPurge in environment && globalEnvPurgeWhiteList in environment) {
        import distssh.purge : readPurgeEnvWhiteList;

        Config.Purge pconf;
        pconf.kill = true;
        pconf.userFilter = true;
        auto econf = ExecuteOnHostConf(fconf.global.workDir, null, null, false, true);
        Set!Host clearedServers;

        logger.tracef("Server purge whitelist from %s is %s",
                globalEnvPurgeWhiteList, readPurgeEnvWhiteList);

        makeInterval(timers, () @safe nothrow {
            try {
                purgeServer(db, econf, pconf, clearedServers, fconf.global.timeout);
            } catch (Exception e) {
                logger.warning(e.msg).collectException;
            }
            if (clientBeat < 2.dur!"minutes")
                return 1.dur!"minutes";
            return 2.dur!"minutes";
        }, 2.dur!"minutes");
    } else {
        logger.tracef("Automatic purge not running because both %s and %s must be set",
                globalEnvPurge, globalEnvPurgeWhiteList);
    }

    while (running && !timers.empty) {
        try {
            timers.tick(100.dur!"msecs");
        } catch (SpinSqlTimeout e) {
            // the database has been removed or something else "bad" has
            // happened such that database access has started throwing
            // exceptions.
            return 1;
        }
    }

    return 0;
}

/** Start the daemon either as a persistent background process or as a oneshot
 * update.
 *
 * Returns: true if the daemon was started.
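 *
 * A minimal usage sketch (hypothetical caller; assumes `Yes` from
 * std.typecons is imported at the call site):
 * ---
 * auto db = openDatabase(fconf.global.dbPath);
 * if (startDaemon(db, Yes.background))
 *     logger.trace("spawned a new daemon");
 * ---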
 */
bool startDaemon(ref from.miniorm.Miniorm db, Flag!"background" bg) nothrow {
    import std.file : thisExePath;
    import my.process : spawnDaemon;

    try {
        if (bg && db.getDaemonBeat < heartBeatDaemonTimeout) {
            return false;
        }

        const flags = () {
            if (bg)
                return ["--background"];
            return null;
        }();

        spawnDaemon([thisExePath, "daemon"] ~ flags);
        logger.trace("daemon spawned");
        return true;
    } catch (Exception e) {
        logger.error(e.msg).collectException;
    }

    return false;
}

private:

immutable heartBeatDaemonTimeout = 60.dur!"seconds";

void initMetrics(ref from.miniorm.Miniorm db, const(Host)[] cluster, Duration timeout) nothrow {
    import std.parallelism : TaskPool;
    import std.random : randomCover;
    import std.typecons : tuple;

    static auto loadHost(T)(T host_timeout) nothrow {
        import std.concurrency : thisTid;

        logger.trace("load testing thread id: ", thisTid).collectException;
        return HostLoad(host_timeout[0], getLoad(host_timeout[0], host_timeout[1]));
    }

    try {
        auto pool = new TaskPool();
        scope (exit)
            pool.stop;

        foreach (v; pool.amap!(loadHost)(cluster.randomCover.map!(a => tuple(a, timeout)).array)) {
            db.newServer(v);
        }
    } catch (Exception e) {
        logger.trace(e.msg).collectException;
    }
}

void updateServer(ref from.miniorm.Miniorm db, Host host, Duration timeout) {
    auto load = getLoad(host, timeout);
    distssh.database.updateServer(db, HostLoad(host, load));
    logger.tracef("Update %s with %s", host, load).collectException;
}

/// Round robin clearing of the servers.
void purgeServer(ref from.miniorm.Miniorm db, ExecuteOnHostConf econf,
        const Config.Purge pconf, ref Set!Host clearedServers, const Duration timeout) @safe {
    import std.algorithm : joiner;
    import std.random : randomCover;
    import std.range : only;
    import distssh.purge;

    auto servers = distssh.database.getServerLoads(db, clearedServers.toArray,
            timeout, 10.dur!"minutes");

    logger.trace("Round robin server purge list ", clearedServers.toArray);

    bool clearedAServer;
    foreach (a; only(servers.online, servers.unused).joiner
            .array
            .randomCover
            .map!(a => a.host)
            .filter!(a => !clearedServers.contains(a))) {
        logger.trace("Purge server ", a);
        clearedAServer = true;
        distssh.purge.purgeServer(econf, pconf, a);
        clearedServers.add(a);
        break;
    }

    if (!clearedAServer) {
        logger.trace("Reset server purge list");
        clearedServers = Set!Host.init;
    }
}

struct Inode {
    ulong dev;
    ulong ino;

    bool opEquals()(auto ref const typeof(this) s) const {
        return dev == s.dev && ino == s.ino;
    }
}

Inode getInode(const Path p) @trusted nothrow {
    import core.sys.posix.sys.stat : stat_t, stat;
    import std.file : isSymlink, exists;
    import std.string : toStringz;

    const pz = p.toString.toStringz;

    if (!exists(p.toString)) {
        return Inode(0, 0);
    } else {
        stat_t st = void;
        // should NOT use lstat because we want to detect when a symlinked
        // database path has been redirected to another file.
        stat(pz, &st);
        return Inode(st.st_dev, st.st_ino);
    }
}
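
// A minimal sketch (not part of the original module) illustrating the
// heartbeat rule described in the module documentation: a new daemon is only
// spawned when the recorded daemon beat is at least heartBeatDaemonTimeout old.
unittest {
    const staleBeat = 90.dur!"seconds";
    const freshBeat = 10.dur!"seconds";

    // a stale beat means the daemon stopped updating it; a client spawns a new daemon.
    assert(!(staleBeat < heartBeatDaemonTimeout));
    // a fresh beat means a daemon is already running; no new daemon is spawned.
    assert(freshBeat < heartBeatDaemonTimeout);
}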