/**
Copyright: Copyright (c) 2019, Joakim Brännström. All rights reserved.
License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
Author: Joakim Brännström (joakim.brannstrom@gmx.com)

Updates the distssh server cache.

# Design

The overall design is based around a shared, local database that is used by
both the daemon and clients to exchange statistics about the cluster. By
leveraging sqlite for the database it becomes safe for processes to read/write
to it concurrently.

The heartbeats are used by both the client and the daemon:

 * The client spawns a daemon if the daemon heartbeat has stopped being
   updated.
 * The daemon slows down the update of the cluster statistics if no client has
   used it in a while, until it finally terminates itself.
*/
module distssh.daemon;

import core.thread : Thread;
import core.time : dur;
import logger = std.experimental.logger;
import std.algorithm : map, filter, max;
import std.array : array, empty;
import std.datetime;
import std.exception : collectException;
import std.process : environment;
import std.typecons : Flag;

import colorlog;
import miniorm : SpinSqlTimeout;
import my.from_;
import my.named_type;
import my.set;
import my.timer;

import distssh.config;
import distssh.database;
import distssh.metric;
import distssh.types;
import distssh.utility;

Duration[3] updateLeastLoadTimersInterval = [
    10.dur!"seconds", 20.dur!"seconds", 60.dur!"seconds"
];
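// A minimal sketch of the my.timer interval pattern that `cli` below is built
// on: a callback does its work and returns the duration until it wants to run
// again, while `tick` drives all registered timers. The exact scheduling
// semantics of `tick` are an assumption based on how it is used in this module.
unittest {
    auto timers = makeTimers;
    int ticks;
    makeInterval(timers, () @safe {
        ++ticks;
        return 1.dur!"msecs"; // reschedule one millisecond from now
    }, 1.dur!"msecs");

    // drive the timer wheel, as the main loop in `cli` does.
    foreach (_; 0 .. 5)
        timers.tick(10.dur!"msecs");
}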
int cli(const Config fconf, Config.Daemon conf) {
    auto db = openDatabase(fconf.global.dbPath);
    const origNode = getInode(fconf.global.dbPath);

    if (fconf.global.verbosity == VerboseMode.trace)
        db.log(true);

    if (conf.background) {
        const beat = db.getDaemonBeat;
        logger.trace("daemon beat: ", beat);
        // do not spawn if a daemon is already running.
        if (beat < heartBeatDaemonTimeout && !conf.forceStart)
            return 0;
        // by only updating the beat when in background mode it is ensured that
        // the daemon will sooner or later start in persistent background mode.
        db.daemonBeat;
    }

    initMetrics(db, fconf.global.cluster, fconf.global.timeout);

    if (!conf.background)
        return 0;

    // when starting the daemon for the first time it is assumed that any data
    // already in the database is old.
    db.removeUnusedServers(1.dur!"minutes");

    bool running = true;
    // the daemon runs for at most 24h. This is a workaround for when the
    // client beat errors out in such a way that it is always "zero".
    const forceShutdown = Clock.currTime + 24.dur!"hours";
    auto clientBeat = db.getClientBeat;
    auto lastDaemonBeat = db.getDaemonBeatClock;

    auto timers = makeTimers;

    makeInterval(timers, () @trusted {
        // continuously refresh the local copy of the client beat
        clientBeat = db.getClientBeat;
        logger.tracef("client beat: %s timeout: %s", clientBeat, conf.timeout);
        return 10.dur!"seconds";
    }, 10.dur!"seconds");

    makeInterval(timers, () @trusted {
        import std.math : abs;
        import std.random : Mt19937, dice, unpredictableSeed;

        // the current daemon beat in the database should match the last one
        // this daemon wrote. If it doesn't match it means multiple daemons
        // are running, thus roll the dice: 50% chance that this instance
        // should shut down.
        const beat = db.getDaemonBeatClock;
        const diff = abs((lastDaemonBeat - beat).total!"msecs");
        logger.tracef("lastDaemonBeat: %s beat: %s diff: %s", lastDaemonBeat, beat, diff);

        if (diff > 2) {
            Mt19937 gen;
            gen.seed(unpredictableSeed);
            running = gen.dice(0.5, 0.5) == 0;
            logger.trace(!running,
                    "multiple instances of the distssh daemon are running. Terminating this instance.");
        }
        return 1.dur!"minutes";
    }, 1.dur!"minutes");

    makeInterval(timers, () @trusted {
        clientBeat = db.getClientBeat;
        logger.tracef("client beat: %s timeout: %s", clientBeat, conf.timeout);
        // no client is interested in the metrics so stop collecting
        if (clientBeat > conf.timeout) {
            running = false;
        }
        if (Clock.currTime > forceShutdown) {
            running = false;
        }
        return max(1.dur!"minutes", conf.timeout - clientBeat);
    }, 10.dur!"seconds");

    makeInterval(timers, () @safe {
        // the database may have been removed/recreated
        if (getInode(fconf.global.dbPath) != origNode) {
            running = false;
        }
        return 5.dur!"seconds";
    }, 5.dur!"seconds");

    makeInterval(timers, () @trusted {
        db.daemonBeat;
        lastDaemonBeat = db.getDaemonBeatClock;
        return 15.dur!"seconds";
    }, 15.dur!"seconds");

    // the times are arbitrarily chosen.
    // assumption: the oldest statistic does not have to be updated that often
    // because the other loop, updating the best candidate, is running "fast".
    // assumption: if a user uses distssh less often than every five minutes it
    // means that a long running process is being used and the user won't
    // interact with distssh for a while.
    makeInterval(timers, () @trusted {
        auto host = db.getOldestServer;
        if (!host.isNull) {
            updateServer(db, host.get, fconf.global.timeout);
        }

        if (clientBeat < 30.dur!"seconds")
            return 10.dur!"seconds";
        if (clientBeat < 5.dur!"minutes")
            return 30.dur!"seconds";
        return 60.dur!"seconds";
    }, 15.dur!"seconds");

    // the times are arbitrarily chosen.
    // assumption: the least loaded server will be getting jobs put on it not
    // only from *this* instance of distssh but also from all other instances
    // using the cluster. For this instance to be quick at moving jobs to
    // another host it has to update the statistics often.
    // assumption: a user that uses distssh less often than every 90s isn't
    // using distssh interactively/in quick succession. By backing off/slowing
    // down the updates it lowers the network load.
    long updateLeastLoadedTimerTick;
    makeInterval(timers, () @trusted {
        auto s = db.getLeastLoadedServer;
        if (s.length > 0 && s.length < topCandidades) {
            updateServer(db, s[updateLeastLoadedTimerTick % s.length], fconf.global.timeout);
        } else if (s.length >= topCandidades) {
            updateServer(db, s[updateLeastLoadedTimerTick], fconf.global.timeout);
        }

        updateLeastLoadedTimerTick = (updateLeastLoadedTimerTick + 1) % topCandidades;

        if (clientBeat < 30.dur!"seconds")
            return updateLeastLoadTimersInterval[0];
        if (clientBeat < 90.dur!"seconds")
            return updateLeastLoadTimersInterval[1];
        return updateLeastLoadTimersInterval[2];
    }, 10.dur!"seconds");

    makeInterval(timers, () @trusted nothrow {
        try {
            db.removeUnusedServers(30.dur!"minutes");
        } catch (Exception e) {
            logger.warning(e.msg).collectException;
        }
        return 1.dur!"minutes";
    }, 1.dur!"minutes");

    makeInterval(timers, () @trusted nothrow {
        import std.range : take;
        import distssh.connection;

        try {
            // keep the multiplex connections to the top candidates open
            foreach (h; db.getLeastLoadedServer.take(topCandidades)) {
                auto m = makeMaster(h);
                if (!m.isAlive) {
                    m.connect;
                }
            }
        } catch (Exception e) {
            logger.trace(e.msg).collectException;
        }

        // open connections to the cluster fast while a client is using them
        if (clientBeat < 5.dur!"minutes")
            return 15.dur!"seconds";
        return 1.dur!"minutes";
    }, 5.dur!"seconds");

    if (globalEnvPurge in environment && globalEnvPurgeWhiteList in environment) {
        import distssh.purge : readPurgeEnvWhiteList;

        Config.Purge pconf;
        pconf.kill = true;
        pconf.userFilter = true;
        auto econf = ExecuteOnHostConf(fconf.global.workDir, typeof(fconf.global.command)
                .init, typeof(fconf.global.importEnv).init,
                typeof(fconf.global.cloneEnv)(false), typeof(fconf.global.noImportEnv)(true));
        Set!Host clearedServers;

        logger.tracef("Server purge whitelist from %s is %s",
                globalEnvPurgeWhiteList, readPurgeEnvWhiteList);

        makeInterval(timers, () @safe nothrow {
            try {
                purgeServer(db, econf, pconf, clearedServers, fconf.global.timeout);
            } catch (Exception e) {
                logger.warning(e.msg).collectException;
            }
            if (clientBeat < 2.dur!"minutes")
                return 1.dur!"minutes";
            return 2.dur!"minutes";
        }, 2.dur!"minutes");
    } else {
        logger.tracef("Automatic purge not running because both %s and %s must be set",
                globalEnvPurge, globalEnvPurgeWhiteList);
    }

    while (running && !timers.empty) {
        try {
            timers.tick(100.dur!"msecs");
        } catch (SpinSqlTimeout e) {
            // the database has been removed, or something else "bad" has
            // happened such that database access has started throwing
            // exceptions.
            return 1;
        }
    }

    return 0;
}
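// A self-contained sketch of the coin flip `cli` uses to resolve the case
// where two daemons heartbeat against the same database: dice(0.5, 0.5) picks
// index 0 or 1 with equal probability, so on average exactly one of two
// racing daemons keeps running.
unittest {
    import std.random : Mt19937, dice, unpredictableSeed;

    Mt19937 gen;
    gen.seed(unpredictableSeed);
    const roll = gen.dice(0.5, 0.5);
    assert(roll == 0 || roll == 1);
}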
/** Start the daemon either as a persistent background process or a oneshot
 * update.
 *
 * Returns: true if the daemon was started.
 */
bool startDaemon(ref from.miniorm.Miniorm db, Flag!"background" bg) nothrow {
    import std.file : thisExePath;
    import my.process : spawnDaemon;

    try {
        if (bg && db.getDaemonBeat < heartBeatDaemonTimeout) {
            return false;
        }

        const flags = () {
            if (bg)
                return ["--background"];
            return null;
        }();

        spawnDaemon([thisExePath, "daemon"] ~ flags);
        logger.trace("daemon spawned");
        return true;
    } catch (Exception e) {
        logger.error(e.msg).collectException;
    }

    return false;
}

private:

immutable heartBeatDaemonTimeout = 60.dur!"seconds";

void initMetrics(ref from.miniorm.Miniorm db, const(Host)[] cluster, Duration timeout) nothrow {
    import std.parallelism : TaskPool;
    import std.random : randomCover;
    import std.typecons : tuple;

    static auto loadHost(T)(T host_timeout) nothrow {
        import std.concurrency : thisTid;

        logger.trace("load testing thread id: ", thisTid).collectException;
        return HostLoad(host_timeout[0], getLoad(host_timeout[0], host_timeout[1]));
    }

    try {
        auto pool = new TaskPool();
        scope (exit)
            pool.stop;

        foreach (v; pool.amap!(loadHost)(cluster.randomCover.map!(a => tuple(a, timeout)).array)) {
            db.newServer(v);
        }
    } catch (Exception e) {
        logger.trace(e.msg).collectException;
    }
}

void updateServer(ref from.miniorm.Miniorm db, Host host, Duration timeout) {
    auto load = getLoad(host, timeout);
    distssh.database.updateServer(db, HostLoad(host, load));
    logger.tracef("Update %s with %s", host, load).collectException;
}
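// A sketch of the fan-out pattern `initMetrics` uses: std.parallelism's amap
// applies a function to every element of a range on a task pool and returns
// the results in order. The doubling function is hypothetical, only to
// illustrate the call shape.
unittest {
    import std.parallelism : TaskPool;

    static int twice(int x) nothrow {
        return 2 * x;
    }

    auto pool = new TaskPool(2);
    scope (exit)
        pool.stop;

    const r = pool.amap!twice([1, 2, 3]);
    assert(r == [2, 4, 6]);
}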
/// Round robin clearing of the servers.
void purgeServer(ref from.miniorm.Miniorm db, ExecuteOnHostConf econf,
        const Config.Purge pconf, ref Set!Host clearedServers, const Duration timeout) @safe {
    import std.algorithm : joiner;
    import std.random : randomCover;
    import std.range : only;
    import distssh.purge;

    auto servers = distssh.database.getServerLoads(db, clearedServers.toArray,
            timeout, 10.dur!"minutes");

    logger.trace("Round robin server purge list ", clearedServers.toArray);

    bool clearedAServer;
    foreach (a; only(servers.online, servers.unused).joiner
            .array
            .randomCover
            .map!(a => a.host)
            .filter!(a => !clearedServers.contains(a))) {
        logger.trace("Purge server ", a);
        clearedAServer = true;
        distssh.purge.purgeServer(econf, pconf, a);
        clearedServers.add(a);
        break;
    }

    if (!clearedAServer) {
        logger.trace("Reset server purge list");
        clearedServers = Set!Host.init;
    }
}

struct Inode {
    ulong dev;
    ulong ino;

    bool opEquals()(auto ref const typeof(this) s) const {
        return dev == s.dev && ino == s.ino;
    }
}

Inode getInode(const Path p) @trusted nothrow {
    import core.sys.posix.sys.stat : stat_t, stat;
    import std.file : isSymlink, exists;
    import std.string : toStringz;

    const pz = p.toString.toStringz;

    if (!exists(p.toString)) {
        return Inode(0, 0);
    } else {
        stat_t st = void;
        // deliberately use stat and NOT lstat: we want the inode of the file
        // a symlink ultimately points at, even if the link is redirected etc.
        stat(pz, &st);
        return Inode(st.st_dev, st.st_ino);
    }
}
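// Inode equality is by (device, inode) pair. This is what lets the daemon
// loop in `cli` detect that the database file at a fixed path was removed and
// recreated: the new file normally has a different inode number.
unittest {
    assert(Inode(1, 2) == Inode(1, 2));
    assert(Inode(1, 2) != Inode(1, 3));
    assert(Inode(1, 2) != Inode(2, 2));
}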