1 /** 2 Copyright: Copyright (c) 2020, Joakim Brännström. All rights reserved. 3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0) 4 Author: Joakim Brännström (joakim.brannstrom@gmx.com) 5 6 This is based on webfreak's 7 [fswatch](git@github.com:WebFreak001/FSWatch.git). I had problems with the 8 API as it where because I needed to be able to watch multiple directories, 9 filter what files are to be watched and to be robust against broken symlinks. 10 11 Lets say you want to watch a directory for changes and add all directories to 12 be watched too. 13 14 --- 15 auto fw = fileWatch(); 16 fw.watchRecurse("my_dir"); 17 while (true) { 18 auto ev = fw.wait; 19 foreach (e; ev) { 20 e.match!( 21 (Event.Access x) => writeln(x), 22 (Event.Attribute x) => writeln(x), 23 (Event.CloseWrite x) => writeln(x), 24 (Event.CloseNoWrite x) => writeln(x), 25 (Event.Create x) { fw.watchRecurse(x.path); }, 26 (Event.Delete x) => writeln(x), 27 (Event.DeleteSelf x) => writeln(x), 28 (Event.Modify x) => writeln(x), 29 (Event.MoveSelf x) => writeln(x), 30 (Event.Rename x) => writeln(x), 31 (Event.Open x) => writeln(x), 32 ); 33 } 34 } 35 --- 36 */ 37 module my.fswatch; 38 39 import core.sys.linux.errno : errno; 40 import core.sys.linux.fcntl : fcntl, F_SETFD, FD_CLOEXEC; 41 import core.sys.linux.sys.inotify : inotify_rm_watch, inotify_init1, inotify_add_watch, inotify_event, IN_CLOEXEC, 42 IN_NONBLOCK, IN_ACCESS, IN_MODIFY, IN_ATTRIB, IN_CLOSE_WRITE, 43 IN_CLOSE_NOWRITE, IN_OPEN, IN_MOVED_FROM, IN_MOVED_TO, 44 IN_CREATE, IN_DELETE, IN_DELETE_SELF, IN_MOVE_SELF, IN_UNMOUNT, IN_IGNORED, IN_EXCL_UNLINK; 45 import core.sys.linux.unistd : close, read; 46 import core.sys.posix.poll : pollfd, poll, POLLIN, POLLNVAL; 47 import core.thread : Thread; 48 import core.time : dur, Duration; 49 import logger = std.experimental.logger; 50 import std.array : appender, empty, array; 51 import std.conv : to; 52 import std.file : DirEntry, isDir, dirEntries, rmdirRecurse, write, append, 53 rename, remove, exists, SpanMode, mkdir, rmdir; 54 import std.path : buildPath; 55 import std.range : isInputRange; 56 import std..string : toStringz, fromStringz; 57 import std.exception : collectException; 58 59 import sumtype; 60 61 import my.path : AbsolutePath, Path; 62 import my.set; 63 64 struct Event { 65 /// File was accessed (e.g., read(2), execve(2)). 66 static struct Access { 67 AbsolutePath path; 68 this(this) { 69 } 70 } 71 72 /** Metadata changed—for example, permissions (e.g., chmod(2)), timestamps 73 * (e.g., utimensat(2)), extended attributes (setxattr(2)), link count 74 * (since Linux 2.6.25; e.g., for the target of link(2) and for unlink(2)), 75 * and user/group ID (e.g., chown(2)). 76 */ 77 static struct Attribute { 78 AbsolutePath path; 79 this(this) { 80 } 81 } 82 83 /// File opened for writing was closed. 84 static struct CloseWrite { 85 AbsolutePath path; 86 this(this) { 87 } 88 } 89 90 /// File or directory not opened for writing was closed. 91 static struct CloseNoWrite { 92 AbsolutePath path; 93 this(this) { 94 } 95 } 96 97 /** File/directory created in watched directory (e.g., open(2) O_CREAT, 98 * mkdir(2), link(2), symlink(2), bind(2) on a UNIX domain socket). 99 */ 100 static struct Create { 101 AbsolutePath path; 102 this(this) { 103 } 104 } 105 106 /// File/directory deleted from watched directory. 107 static struct Delete { 108 AbsolutePath path; 109 this(this) { 110 } 111 } 112 113 /** Watched file/directory was itself deleted. (This event also occurs if 114 * an object is moved to another filesystem, since mv(1) in effect copies 115 * the file to the other filesystem and then deletes it from the original 116 * filesys‐ tem.) In addition, an IN_IGNORED event will subsequently be 117 * generated for the watch descriptor. 118 */ 119 static struct DeleteSelf { 120 AbsolutePath path; 121 this(this) { 122 } 123 } 124 125 /// File was modified (e.g., write(2), truncate(2)). 126 static struct Modify { 127 AbsolutePath path; 128 this(this) { 129 } 130 } 131 132 /// Watched file/directory was itself moved. 133 static struct MoveSelf { 134 AbsolutePath path; 135 this(this) { 136 } 137 } 138 139 /// Occurs when a file or folder inside a folder is renamed. 140 static struct Rename { 141 AbsolutePath from; 142 AbsolutePath to; 143 this(this) { 144 } 145 } 146 147 /// File or directory was opened. 148 static struct Open { 149 AbsolutePath path; 150 this(this) { 151 } 152 } 153 } 154 155 alias FileChangeEvent = SumType!(Event.Access, Event.Attribute, Event.CloseWrite, 156 Event.CloseNoWrite, Event.Create, Event.Delete, Event.DeleteSelf, 157 Event.Modify, Event.MoveSelf, Event.Rename, Event.Open); 158 159 /// Construct a FileWatch. 160 auto fileWatch() { 161 int fd = inotify_init1(IN_NONBLOCK | IN_CLOEXEC); 162 if (fd == -1) { 163 throw new Exception( 164 "inotify_init1 returned invalid file descriptor. Error code " ~ errno.to!string); 165 } 166 return FileWatch(fd); 167 } 168 169 /// Listens for create/modify/removal of files and directories. 170 enum ContentEvents = IN_CREATE | IN_DELETE | IN_DELETE_SELF | IN_MODIFY 171 | IN_MOVE_SELF | IN_MOVED_FROM | IN_MOVED_TO | IN_EXCL_UNLINK | IN_CLOSE_WRITE; 172 173 /// Listen for events that change the metadata. 174 enum MetadataEvents = IN_ACCESS | IN_ATTRIB | IN_OPEN | IN_CLOSE_NOWRITE | IN_EXCL_UNLINK; 175 176 /** An instance of a FileWatcher 177 */ 178 struct FileWatch { 179 import std.functional : toDelegate; 180 181 private { 182 FdPoller poller; 183 int fd; 184 ubyte[1024 * 4] eventBuffer; // 4kb buffer for events 185 struct FDInfo { 186 int wd; 187 bool watched; 188 Path path; 189 190 this(this) { 191 } 192 } 193 194 FDInfo[int] directoryMap; // map every watch descriptor to a directory 195 } 196 197 private this(int fd) { 198 this.fd = fd; 199 poller.put(FdPoll(fd), [PollEvent.in_]); 200 } 201 202 ~this() { 203 if (fd) { 204 foreach (fdinfo; directoryMap.byValue) { 205 if (fdinfo.watched) 206 inotify_rm_watch(fd, fdinfo.wd); 207 } 208 close(fd); 209 } 210 } 211 212 /** Add a path to watch for events. 213 * 214 * Params: 215 * path = path to watch 216 * events = events to watch for. See man inotify and core.sys.linux.sys.inotify. 217 * 218 * Returns: true if the path was successfully added. 219 */ 220 bool watch(Path path, uint events = ContentEvents) { 221 const wd = inotify_add_watch(fd, path.toStringz, events); 222 if (wd != -1) { 223 const fc = fcntl(fd, F_SETFD, FD_CLOEXEC); 224 if (fc != -1) { 225 directoryMap[wd] = FDInfo(wd, true, path); 226 return true; 227 } 228 } 229 230 return false; 231 } 232 233 /// 234 bool watch(string p, uint events = ContentEvents) { 235 return watch(Path(p)); 236 } 237 238 private static bool allFiles(string p) { 239 return true; 240 } 241 242 /** Recursively add the path and all its subdirectories and files to be watched. 243 * 244 * Params: 245 * pred = only those files and directories that `pred` returns true for are watched, by default every file/directory. 246 * root = directory to watch together with its content and subdirectories. 247 * events = events to watch for. See man inotify and core.sys.linux.sys.inotify. 248 * 249 * Returns: paths that failed to be added. 250 */ 251 AbsolutePath[] watchRecurse(Path root, uint events = ContentEvents, 252 bool delegate(string) pred = toDelegate(&allFiles)) { 253 import std.algorithm : filter; 254 import my.file : existsAnd; 255 256 auto failed = appender!(AbsolutePath[])(); 257 258 if (!watch(root, events)) { 259 failed.put(AbsolutePath(root)); 260 } 261 262 if (!existsAnd!isDir(root)) { 263 return failed.data; 264 } 265 266 auto dirs = [AbsolutePath(root)]; 267 Set!AbsolutePath visited; 268 while (!dirs.empty) { 269 auto front = dirs[0]; 270 dirs = dirs[1 .. $]; 271 if (front in visited) 272 continue; 273 visited.add(front); 274 275 try { 276 foreach (p; dirEntries(front, SpanMode.shallow).filter!(a => pred(a.name))) { 277 if (!watch(Path(p.name), events)) { 278 failed.put(AbsolutePath(p.name)); 279 } 280 if (existsAnd!isDir(Path(p.name))) { 281 dirs ~= AbsolutePath(p.name); 282 } 283 } 284 } catch (Exception e) { 285 () @trusted { logger.trace(e); }(); 286 logger.trace(e.msg); 287 failed.put(AbsolutePath(front)); 288 } 289 } 290 291 return failed.data; 292 } 293 294 /// 295 AbsolutePath[] watchRecurse(string root, uint events = ContentEvents, 296 bool delegate(string) pred = toDelegate(&allFiles)) { 297 return watchRecurse(Path(root), events, pred); 298 } 299 300 /** The events that have occured since last query. 301 * 302 * Params: 303 * timeout = max time to wait for events. 304 * 305 * Returns: the events that has occured to the watched paths. 306 */ 307 FileChangeEvent[] getEvents(Duration timeout = Duration.zero) { 308 import std.algorithm : min; 309 310 FileChangeEvent[] events; 311 if (!fd) 312 return events; 313 314 auto res = poller.wait(timeout); 315 316 if (res.empty) { 317 return events; 318 } 319 320 if (res[0].status[PollStatus.nval]) { 321 throw new Exception("Failed to poll events. File descriptor not open " ~ fd.to!string); 322 } 323 324 if (!res[0].status[PollStatus.in_]) { 325 // no events to read 326 return events; 327 } 328 329 const receivedBytes = read(fd, eventBuffer.ptr, eventBuffer.length); 330 int i = 0; 331 AbsolutePath[uint] cookie; 332 while (true) { 333 auto info = cast(inotify_event*)(eventBuffer.ptr + i); 334 335 if (info.wd !in directoryMap) 336 continue; 337 338 auto fname = () { 339 string fileName = info.name.ptr.fromStringz.idup; 340 return AbsolutePath(buildPath(directoryMap[info.wd].path, fileName)); 341 }(); 342 343 if ((info.mask & IN_MOVED_TO) == 0) { 344 if (auto v = info.cookie in cookie) { 345 events ~= FileChangeEvent(Event.Delete(*v)); 346 cookie.remove(info.cookie); 347 } 348 } 349 350 if ((info.mask & IN_ACCESS) != 0) { 351 events ~= FileChangeEvent(Event.Access(fname)); 352 } 353 354 if ((info.mask & IN_ATTRIB) != 0) { 355 events ~= FileChangeEvent(Event.Attribute(fname)); 356 } 357 358 if ((info.mask & IN_CLOSE_WRITE) != 0) { 359 events ~= FileChangeEvent(Event.CloseWrite(fname)); 360 } 361 362 if ((info.mask & IN_CLOSE_NOWRITE) != 0) { 363 events ~= FileChangeEvent(Event.CloseNoWrite(fname)); 364 } 365 366 if ((info.mask & IN_CREATE) != 0) { 367 events ~= FileChangeEvent(Event.Create(fname)); 368 } 369 370 if ((info.mask & IN_DELETE) != 0) { 371 events ~= FileChangeEvent(Event.Delete(fname)); 372 } 373 374 if ((info.mask & IN_DELETE_SELF) != 0) { 375 // must go via the mapping or there may be trailing junk in fname. 376 events ~= FileChangeEvent(Event.DeleteSelf(directoryMap[info.wd].path.AbsolutePath)); 377 } 378 379 if ((info.mask & IN_MODIFY) != 0) { 380 events ~= FileChangeEvent(Event.Modify(fname)); 381 } 382 383 if ((info.mask & IN_MOVE_SELF) != 0) { 384 // must go via the mapping or there may be trailing junk in fname. 385 events ~= FileChangeEvent(Event.MoveSelf(directoryMap[info.wd].path.AbsolutePath)); 386 } 387 388 if ((info.mask & IN_MOVED_FROM) != 0) { 389 cookie[info.cookie] = fname; 390 } 391 392 if ((info.mask & IN_MOVED_TO) != 0) { 393 if (auto v = info.cookie in cookie) { 394 events ~= FileChangeEvent(Event.Rename(*v, fname)); 395 cookie.remove(info.cookie); 396 } else { 397 events ~= FileChangeEvent(Event.Create(fname)); 398 } 399 } 400 401 if ((info.mask & IN_DELETE_SELF) != 0 || (info.mask & IN_MOVE_SELF) != 0) { 402 inotify_rm_watch(fd, info.wd); 403 directoryMap[info.wd].watched = false; 404 } 405 406 i += inotify_event.sizeof + info.len; 407 408 if (i >= receivedBytes) 409 break; 410 } 411 412 foreach (c; cookie.byValue) { 413 events ~= FileChangeEvent(Event.Delete(AbsolutePath(c))); 414 } 415 416 return events; 417 } 418 } 419 420 /// 421 unittest { 422 import core.thread; 423 424 if (exists("test")) 425 rmdirRecurse("test"); 426 scope (exit) { 427 if (exists("test")) 428 rmdirRecurse("test"); 429 } 430 431 auto watcher = fileWatch(); 432 433 mkdir("test"); 434 assert(watcher.watch("test")); 435 436 write("test/a.txt", "abc"); 437 auto ev = watcher.getEvents(5.dur!"seconds"); 438 assert(ev.length > 0); 439 assert(ev[0].tryMatch!((Event.Create x) { 440 assert(x.path == AbsolutePath("test/a.txt")); 441 return true; 442 })); 443 444 append("test/a.txt", "def"); 445 ev = watcher.getEvents(5.dur!"seconds"); 446 assert(ev.length > 0); 447 assert(ev[0].tryMatch!((Event.Modify x) { 448 assert(x.path == AbsolutePath("test/a.txt")); 449 return true; 450 })); 451 452 rename("test/a.txt", "test/b.txt"); 453 ev = watcher.getEvents(5.dur!"seconds"); 454 assert(ev.length > 0); 455 assert(ev[0].tryMatch!((Event.Rename x) { 456 assert(x.from == AbsolutePath("test/a.txt")); 457 assert(x.to == AbsolutePath("test/b.txt")); 458 return true; 459 })); 460 461 remove("test/b.txt"); 462 ev = watcher.getEvents(5.dur!"seconds"); 463 assert(ev.length > 0); 464 assert(ev[0].tryMatch!((Event.Delete x) { 465 assert(x.path == AbsolutePath("test/b.txt")); 466 return true; 467 })); 468 469 rmdirRecurse("test"); 470 ev = watcher.getEvents(5.dur!"seconds"); 471 assert(ev.length > 0); 472 assert(ev[0].tryMatch!((Event.DeleteSelf x) { 473 assert(x.path == AbsolutePath("test")); 474 return true; 475 })); 476 } 477 478 /// 479 unittest { 480 import std.algorithm : canFind; 481 482 if (exists("test2")) 483 rmdirRecurse("test2"); 484 if (exists("test3")) 485 rmdirRecurse("test3"); 486 scope (exit) { 487 if (exists("test2")) 488 rmdirRecurse("test2"); 489 if (exists("test3")) 490 rmdirRecurse("test3"); 491 } 492 493 auto watcher = fileWatch(); 494 mkdir("test2"); 495 assert(watcher.watchRecurse("test2").length == 0); 496 497 write("test2/a.txt", "abc"); 498 auto ev = watcher.getEvents(5.dur!"seconds"); 499 assert(ev.length == 3); 500 assert(ev[0].tryMatch!((Event.Create x) { 501 assert(x.path == AbsolutePath("test2/a.txt")); 502 return true; 503 })); 504 assert(ev[1].tryMatch!((Event.Modify x) { 505 assert(x.path == AbsolutePath("test2/a.txt")); 506 return true; 507 })); 508 assert(ev[2].tryMatch!((Event.CloseWrite x) { 509 assert(x.path == AbsolutePath("test2/a.txt")); 510 return true; 511 })); 512 513 rename("test2/a.txt", "./testfile-a.txt"); 514 ev = watcher.getEvents(5.dur!"seconds"); 515 assert(ev.length == 1); 516 assert(ev[0].tryMatch!((Event.Delete x) { 517 assert(x.path == AbsolutePath("test2/a.txt")); 518 return true; 519 })); 520 521 rename("./testfile-a.txt", "test2/b.txt"); 522 ev = watcher.getEvents(5.dur!"seconds"); 523 assert(ev.length == 1); 524 assert(ev[0].tryMatch!((Event.Create x) { 525 assert(x.path == AbsolutePath("test2/b.txt")); 526 return true; 527 })); 528 529 remove("test2/b.txt"); 530 ev = watcher.getEvents(5.dur!"seconds"); 531 assert(ev.length == 1); 532 assert(ev[0].tryMatch!((Event.Delete x) { 533 assert(x.path == AbsolutePath("test2/b.txt")); 534 return true; 535 })); 536 537 mkdir("test2/mydir"); 538 rmdir("test2/mydir"); 539 ev = watcher.getEvents(5.dur!"seconds"); 540 assert(ev.length == 2); 541 assert(ev[0].tryMatch!((Event.Create x) { 542 assert(x.path == AbsolutePath("test2/mydir")); 543 return true; 544 })); 545 assert(ev[1].tryMatch!((Event.Delete x) { 546 assert(x.path == AbsolutePath("test2/mydir")); 547 return true; 548 })); 549 550 // test for creation, modification, removal of subdirectory 551 mkdir("test2/subdir"); 552 ev = watcher.getEvents(5.dur!"seconds"); 553 assert(ev.length == 1); 554 assert(ev[0].tryMatch!((Event.Create x) { 555 assert(x.path == AbsolutePath("test2/subdir")); 556 // add the created directory to be watched 557 watcher.watchRecurse(x.path); 558 return true; 559 })); 560 561 write("test2/subdir/c.txt", "abc"); 562 ev = watcher.getEvents(5.dur!"seconds"); 563 assert(ev.length == 3); 564 assert(ev[0].tryMatch!((Event.Create x) { 565 assert(x.path == AbsolutePath("test2/subdir/c.txt")); 566 return true; 567 })); 568 569 write("test2/subdir/c.txt", "\nabc"); 570 ev = watcher.getEvents(5.dur!"seconds"); 571 assert(ev.length == 2); 572 assert(ev[0].tryMatch!((Event.Modify x) { 573 assert(x.path == AbsolutePath("test2/subdir/c.txt")); 574 return true; 575 })); 576 577 rmdirRecurse("test2/subdir"); 578 ev = watcher.getEvents(5.dur!"seconds"); 579 assert(ev.length == 3); 580 foreach (e; ev) { 581 assert(ev[0].tryMatch!((Event.Delete x) { 582 assert(canFind([ 583 AbsolutePath("test2/subdir/c.txt"), 584 AbsolutePath("test2/subdir") 585 ], x.path)); 586 return true; 587 }, (Event.DeleteSelf x) { 588 assert(x.path == AbsolutePath("test2/subdir")); 589 return true; 590 })); 591 } 592 593 // removal of watched folder 594 rmdirRecurse("test2"); 595 ev = watcher.getEvents(5.dur!"seconds"); 596 assert(ev.length == 1); 597 assert(ev[0].tryMatch!((Event.DeleteSelf x) { 598 assert(x.path == AbsolutePath("test2")); 599 return true; 600 })); 601 } 602 603 struct MonitorResult { 604 enum Kind { 605 Access, 606 Attribute, 607 CloseWrite, 608 CloseNoWrite, 609 Create, 610 Delete, 611 DeleteSelf, 612 Modify, 613 MoveSelf, 614 Rename, 615 Open, 616 } 617 618 Kind kind; 619 AbsolutePath path; 620 } 621 622 /** Monitor root's for filesystem changes which create/remove/modify 623 * files/directories. 624 */ 625 struct Monitor { 626 import std.array : appender; 627 import std.file : isDir; 628 import std.utf : UTFException; 629 import my.filter : GlobFilter; 630 import my.fswatch; 631 632 private { 633 Set!AbsolutePath roots; 634 FileWatch fw; 635 GlobFilter fileFilter; 636 uint events; 637 638 // roots that has been removed that may be re-added later on. the user 639 // expects them to trigger events. 640 Set!AbsolutePath monitorRoots; 641 } 642 643 /** 644 * Params: 645 * roots = directories to recursively monitor 646 */ 647 this(AbsolutePath[] roots, GlobFilter fileFilter, uint events = ContentEvents) { 648 this.roots = toSet(roots); 649 this.fileFilter = fileFilter; 650 this.events = events; 651 652 auto app = appender!(AbsolutePath[])(); 653 fw = fileWatch(); 654 foreach (r; roots) { 655 app.put(fw.watchRecurse(r, events, (a) { 656 return isInteresting(fileFilter, a); 657 })); 658 } 659 660 logger.trace(!app.data.empty, "unable to watch ", app.data); 661 } 662 663 static bool isInteresting(GlobFilter fileFilter, string p) nothrow { 664 import my.file; 665 666 try { 667 const ap = AbsolutePath(p); 668 669 if (existsAnd!isDir(ap)) { 670 return true; 671 } 672 return fileFilter.match(ap); 673 } catch (Exception e) { 674 collectException(logger.trace(e.msg)); 675 } 676 677 return false; 678 } 679 680 /** Wait up to `timeout` for an event to occur for the monitored `roots`. 681 * 682 * Params: 683 * timeout = how long to wait for the event 684 */ 685 MonitorResult[] wait(Duration timeout) { 686 import std.array : array; 687 import std.algorithm : canFind, startsWith, filter; 688 689 auto rval = appender!(MonitorResult[])(); 690 691 { 692 auto rm = appender!(AbsolutePath[])(); 693 foreach (a; monitorRoots.toRange.filter!(a => exists(a))) { 694 fw.watchRecurse(a, events, a => isInteresting(fileFilter, a)); 695 rm.put(a); 696 rval.put(MonitorResult(MonitorResult.Kind.Create, a)); 697 } 698 foreach (a; rm.data) { 699 monitorRoots.remove(a); 700 } 701 } 702 703 if (!rval.data.empty) { 704 // collect whatever events that happend to have queued up together 705 // with the artifically created. 706 timeout = Duration.zero; 707 } 708 709 try { 710 foreach (e; fw.getEvents(timeout)) { 711 e.match!((Event.Access x) { 712 rval.put(MonitorResult(MonitorResult.Kind.Access, x.path)); 713 }, (Event.Attribute x) { 714 rval.put(MonitorResult(MonitorResult.Kind.Attribute, x.path)); 715 }, (Event.CloseWrite x) { 716 rval.put(MonitorResult(MonitorResult.Kind.CloseWrite, x.path)); 717 }, (Event.CloseNoWrite x) { 718 rval.put(MonitorResult(MonitorResult.Kind.CloseNoWrite, x.path)); 719 }, (Event.Create x) { 720 rval.put(MonitorResult(MonitorResult.Kind.Create, x.path)); 721 fw.watchRecurse(x.path, events, a => isInteresting(fileFilter, a)); 722 }, (Event.Modify x) { 723 rval.put(MonitorResult(MonitorResult.Kind.Modify, x.path)); 724 }, (Event.MoveSelf x) { 725 rval.put(MonitorResult(MonitorResult.Kind.MoveSelf, x.path)); 726 fw.watchRecurse(x.path, events, a => isInteresting(fileFilter, a)); 727 728 if (x.path in roots) { 729 monitorRoots.add(x.path); 730 } 731 }, (Event.Delete x) { 732 rval.put(MonitorResult(MonitorResult.Kind.Delete, x.path)); 733 }, (Event.DeleteSelf x) { 734 rval.put(MonitorResult(MonitorResult.Kind.DeleteSelf, x.path)); 735 736 if (x.path in roots) { 737 monitorRoots.add(x.path); 738 } 739 }, (Event.Rename x) { 740 rval.put(MonitorResult(MonitorResult.Kind.Rename, x.to)); 741 }, (Event.Open x) { 742 rval.put(MonitorResult(MonitorResult.Kind.Open, x.path)); 743 },); 744 } 745 } catch (Exception e) { 746 logger.trace(e.msg); 747 } 748 749 return rval.data.filter!(a => fileFilter.match(a.path)).array; 750 } 751 752 /** Collects events from the monitored `roots` over a period. 753 * 754 * Params: 755 * collectTime = for how long to clear the queue 756 */ 757 MonitorResult[] collect(Duration collectTime) { 758 import std.algorithm : max, min; 759 import std.datetime : Clock; 760 761 auto rval = appender!(MonitorResult[])(); 762 const stopAt = Clock.currTime + collectTime; 763 764 do { 765 collectTime = max(stopAt - Clock.currTime, 1.dur!"msecs"); 766 if (!monitorRoots.empty) { 767 // must use a hybrid approach of poll + inotify because if a 768 // root is added it will only be detected by polling. 769 collectTime = min(10.dur!"msecs", collectTime); 770 } 771 772 rval.put(wait(collectTime)); 773 } 774 while (Clock.currTime < stopAt); 775 776 return rval.data; 777 } 778 } 779 780 @("shall re-apply monitoring for a file that is removed") 781 unittest { 782 import my.filter : GlobFilter; 783 import my.test; 784 785 auto ta = makeTestArea("re-apply monitoring"); 786 const testTxt = ta.inSandbox("test.txt").AbsolutePath; 787 788 write(testTxt, "abc"); 789 auto fw = Monitor([testTxt], GlobFilter(["*"], null)); 790 write(testTxt, "abcc"); 791 assert(!fw.wait(Duration.zero).empty); 792 793 remove(testTxt); 794 assert(!fw.wait(Duration.zero).empty); 795 796 write(testTxt, "abcc"); 797 assert(!fw.wait(Duration.zero).empty); 798 } 799 800 /** A file descriptor to poll. 801 */ 802 struct FdPoll { 803 int value; 804 } 805 806 /// Uses the linux poll syscall to wait for activity on the file descriptors. 807 struct FdPoller { 808 import std.algorithm : min, filter; 809 810 private { 811 pollfd[] fds; 812 PollResult[] results; 813 } 814 815 void put(FdPoll fd, PollEvent[] evs) { 816 import core.sys.posix.poll; 817 818 pollfd pfd; 819 pfd.fd = fd.value; 820 foreach (e; evs) { 821 final switch (e) with (PollEvent) { 822 case in_: 823 pfd.events |= POLLIN; 824 break; 825 case out_: 826 pfd.events |= POLLOUT; 827 break; 828 } 829 } 830 fds ~= pfd; 831 832 // they must be the same length or else `wait` will fail. 833 results.length = fds.length; 834 } 835 836 void remove(FdPoll fd) { 837 fds = fds.filter!(a => a.fd != fd.value).array; 838 839 results.length = fds.length; 840 } 841 842 PollResult[] wait(Duration timeout = Duration.zero) { 843 import core.sys.posix.poll; 844 import std.bitmanip : BitArray; 845 846 const code = poll(&fds[0], fds.length, cast(int) min(int.max, timeout.total!"msecs")); 847 848 if (code < 0) { 849 import core.stdc.errno : errno, EINTR; 850 851 if (errno == EINTR) { 852 // poll just interrupted. try again. 853 return (PollResult[]).init; 854 } 855 856 throw new Exception("Failed to poll events. Error code " ~ errno.to!string); 857 } else if (code == 0) { 858 // timeout triggered 859 return (PollResult[]).init; 860 } 861 862 size_t idx; 863 foreach (a; fds.filter!(a => a.revents != 0)) { 864 PollResult res; 865 res.status = BitArray([ 866 (a.revents & POLLIN) != 0, (a.revents & POLLOUT) != 0, 867 (a.revents & POLLPRI) != 0, (a.revents & POLLERR) != 0, 868 (a.revents & POLLHUP) != 0, (a.revents & POLLNVAL) != 0, 869 ]); 870 res.fd = FdPoll(a.fd); 871 results[idx] = res; 872 idx++; 873 } 874 875 return results[0 .. idx]; 876 } 877 } 878 879 /// Type of event to poll for. 880 enum PollEvent { 881 in_, 882 out_, 883 } 884 885 /// What each bit in `PollResult.status` represent. 886 enum PollStatus { 887 // There is data to read. 888 in_, 889 // Writing is now possible, though a write larger that the available 890 // space in a socket or pipe will still block (unless O_NONBLOCK is set). 891 out_, 892 // There is some exceptional condition on the file descriptor. Possibilities include: 893 // * There is out-of-band data on a TCP socket (see tcp(7)). 894 // * A pseudoterminal master in packet mode has seen a state change on the slave (see ioctl_tty(2)). 895 // * A cgroup.events file has been modified (see cgroups(7)). 896 pri, 897 // Error condition (only returned in revents; ignored in events). This bit 898 // is also set for a file descriptor referring to the write end of a pipe 899 // when the read end has been closed. 900 error, 901 // Hang up (only returned in revents; ignored in events). Note that when 902 // reading from a channel such as a pipe or a stream socket, this event 903 // merely indicates that the peer closed its end of the channel. 904 // Subsequent reads from the channel will re‐ turn 0 (end of file) only 905 // after all outstanding data in the channel has been consumed. 906 hup, 907 /// Invalid request: fd not open (only returned in revents; ignored in events). 908 nval, 909 } 910 911 /// File descriptors that triggered. 912 struct PollResult { 913 import std.bitmanip : BitArray; 914 915 BitArray status; 916 FdPoll fd; 917 }