1 /**
2 Copyright: Copyright (c) 2020, Joakim Brännström. All rights reserved.
3 License: $(LINK2 http://www.boost.org/LICENSE_1_0.txt, Boost Software License 1.0)
4 Author: Joakim Brännström (joakim.brannstrom@gmx.com)
5 
6 This is based on webfreak's
7 [fswatch](git@github.com:WebFreak001/FSWatch.git). I had problems with the
8 API as it where because I needed to be able to watch multiple directories,
9 filter what files are to be watched and to be robust against broken symlinks.
10 
11 Lets say you want to watch a directory for changes and add all directories to
12 be watched too.
13 
14 ---
15 auto fw = fileWatch();
16 fw.watchRecurse("my_dir");
17 while (true) {
18     auto ev = fw.wait;
19     foreach (e; ev) {
20         e.match!(
21         (Event.Access x) => writeln(x),
22         (Event.Attribute x) => writeln(x),
23         (Event.CloseWrite x) => writeln(x),
24         (Event.CloseNoWrite x) => writeln(x),
25         (Event.Create x) { fw.watchRecurse(x.path); },
26         (Event.Delete x) => writeln(x),
27         (Event.DeleteSelf x) => writeln(x),
28         (Event.Modify x) => writeln(x),
29         (Event.MoveSelf x) => writeln(x),
30         (Event.Rename x) => writeln(x),
31         (Event.Open x) => writeln(x),
32         );
33     }
34 }
35 ---
36 */
37 module my.fswatch;
38 
39 import core.sys.linux.errno : errno;
40 import core.sys.linux.fcntl : fcntl, F_SETFD, FD_CLOEXEC;
41 import core.sys.linux.sys.inotify : inotify_rm_watch, inotify_init1, inotify_add_watch, inotify_event, IN_CLOEXEC,
42     IN_NONBLOCK, IN_ACCESS, IN_MODIFY, IN_ATTRIB, IN_CLOSE_WRITE,
43     IN_CLOSE_NOWRITE, IN_OPEN, IN_MOVED_FROM, IN_MOVED_TO,
44     IN_CREATE, IN_DELETE, IN_DELETE_SELF, IN_MOVE_SELF, IN_UNMOUNT, IN_IGNORED, IN_EXCL_UNLINK;
45 import core.sys.linux.unistd : close, read;
46 import core.sys.posix.poll : pollfd, poll, POLLIN, POLLNVAL;
47 import core.thread : Thread;
48 import core.time : dur, Duration;
49 import logger = std.experimental.logger;
50 import std.array : appender, empty, array;
51 import std.conv : to;
52 import std.file : DirEntry, isDir, dirEntries, rmdirRecurse, write, append,
53     rename, remove, exists, SpanMode, mkdir, rmdir;
54 import std.path : buildPath;
55 import std.range : isInputRange;
56 import std..string : toStringz, fromStringz;
57 import std.exception : collectException;
58 
59 import sumtype;
60 
61 import my.path : AbsolutePath, Path;
62 import my.set;
63 
64 struct Event {
65     /// File was accessed (e.g., read(2), execve(2)).
66     static struct Access {
67         AbsolutePath path;
68         this(this) {
69         }
70     }
71 
72     /** Metadata changed—for example, permissions (e.g., chmod(2)), timestamps
73      * (e.g., utimensat(2)), extended attributes (setxattr(2)), link count
74      * (since Linux 2.6.25; e.g., for the target of link(2) and for unlink(2)),
75      * and user/group ID (e.g., chown(2)).
76      */
77     static struct Attribute {
78         AbsolutePath path;
79         this(this) {
80         }
81     }
82 
83     /// File opened for writing was closed.
84     static struct CloseWrite {
85         AbsolutePath path;
86         this(this) {
87         }
88     }
89 
90     /// File or directory not opened for writing was closed.
91     static struct CloseNoWrite {
92         AbsolutePath path;
93         this(this) {
94         }
95     }
96 
97     /** File/directory created in watched directory (e.g., open(2) O_CREAT,
98      * mkdir(2), link(2), symlink(2), bind(2) on a UNIX domain socket).
99      */
100     static struct Create {
101         AbsolutePath path;
102         this(this) {
103         }
104     }
105 
106     /// File/directory deleted from watched directory.
107     static struct Delete {
108         AbsolutePath path;
109         this(this) {
110         }
111     }
112 
113     /** Watched file/directory was itself deleted. (This event also occurs if
114      * an object is moved to another filesystem, since mv(1) in effect copies
115      * the file to the other filesystem and then deletes it from the original
116      * filesys‐ tem.)  In addition, an IN_IGNORED event will subsequently be
117      * generated for the watch descriptor.
118      */
119     static struct DeleteSelf {
120         AbsolutePath path;
121         this(this) {
122         }
123     }
124 
125     /// File was modified (e.g., write(2), truncate(2)).
126     static struct Modify {
127         AbsolutePath path;
128         this(this) {
129         }
130     }
131 
132     /// Watched file/directory was itself moved.
133     static struct MoveSelf {
134         AbsolutePath path;
135         this(this) {
136         }
137     }
138 
139     /// Occurs when a file or folder inside a folder is renamed.
140     static struct Rename {
141         AbsolutePath from;
142         AbsolutePath to;
143         this(this) {
144         }
145     }
146 
147     /// File or directory was opened.
148     static struct Open {
149         AbsolutePath path;
150         this(this) {
151         }
152     }
153 }
154 
155 alias FileChangeEvent = SumType!(Event.Access, Event.Attribute, Event.CloseWrite,
156         Event.CloseNoWrite, Event.Create, Event.Delete, Event.DeleteSelf,
157         Event.Modify, Event.MoveSelf, Event.Rename, Event.Open);
158 
159 /// Construct a FileWatch.
160 auto fileWatch() {
161     int fd = inotify_init1(IN_NONBLOCK | IN_CLOEXEC);
162     if (fd == -1) {
163         throw new Exception(
164                 "inotify_init1 returned invalid file descriptor. Error code " ~ errno.to!string);
165     }
166     return FileWatch(fd);
167 }
168 
169 /// Listens for create/modify/removal of files and directories.
170 enum ContentEvents = IN_CREATE | IN_DELETE | IN_DELETE_SELF | IN_MODIFY
171     | IN_MOVE_SELF | IN_MOVED_FROM | IN_MOVED_TO | IN_EXCL_UNLINK | IN_CLOSE_WRITE;
172 
173 /// Listen for events that change the metadata.
174 enum MetadataEvents = IN_ACCESS | IN_ATTRIB | IN_OPEN | IN_CLOSE_NOWRITE | IN_EXCL_UNLINK;
175 
176 /** An instance of a FileWatcher
177  */
178 struct FileWatch {
179     import std.functional : toDelegate;
180 
181     private {
182         FdPoller poller;
183         int fd;
184         ubyte[1024 * 4] eventBuffer; // 4kb buffer for events
185         struct FDInfo {
186             int wd;
187             bool watched;
188             Path path;
189 
190             this(this) {
191             }
192         }
193 
194         FDInfo[int] directoryMap; // map every watch descriptor to a directory
195     }
196 
197     private this(int fd) {
198         this.fd = fd;
199         poller.put(FdPoll(fd), [PollEvent.in_]);
200     }
201 
202     ~this() {
203         if (fd) {
204             foreach (fdinfo; directoryMap.byValue) {
205                 if (fdinfo.watched)
206                     inotify_rm_watch(fd, fdinfo.wd);
207             }
208             close(fd);
209         }
210     }
211 
212     /** Add a path to watch for events.
213      *
214      * Params:
215      *  path = path to watch
216      *  events = events to watch for. See man inotify and core.sys.linux.sys.inotify.
217      *
218      * Returns: true if the path was successfully added.
219      */
220     bool watch(Path path, uint events = ContentEvents) {
221         const wd = inotify_add_watch(fd, path.toStringz, events);
222         if (wd != -1) {
223             const fc = fcntl(fd, F_SETFD, FD_CLOEXEC);
224             if (fc != -1) {
225                 directoryMap[wd] = FDInfo(wd, true, path);
226                 return true;
227             }
228         }
229 
230         return false;
231     }
232 
233     ///
234     bool watch(string p, uint events = ContentEvents) {
235         return watch(Path(p));
236     }
237 
238     private static bool allFiles(string p) {
239         return true;
240     }
241 
242     /** Recursively add the path and all its subdirectories and files to be watched.
243      *
244      * Params:
245      *  pred = only those files and directories that `pred` returns true for are watched, by default every file/directory.
246      *  root = directory to watch together with its content and subdirectories.
247      *  events = events to watch for. See man inotify and core.sys.linux.sys.inotify.
248      *
249      * Returns: paths that failed to be added.
250      */
251     AbsolutePath[] watchRecurse(Path root, uint events = ContentEvents,
252             bool delegate(string) pred = toDelegate(&allFiles)) {
253         import std.algorithm : filter;
254         import my.file : existsAnd;
255 
256         auto failed = appender!(AbsolutePath[])();
257 
258         if (!watch(root, events)) {
259             failed.put(AbsolutePath(root));
260         }
261 
262         if (!existsAnd!isDir(root)) {
263             return failed.data;
264         }
265 
266         auto dirs = [AbsolutePath(root)];
267         Set!AbsolutePath visited;
268         while (!dirs.empty) {
269             auto front = dirs[0];
270             dirs = dirs[1 .. $];
271             if (front in visited)
272                 continue;
273             visited.add(front);
274 
275             try {
276                 foreach (p; dirEntries(front, SpanMode.shallow).filter!(a => pred(a.name))) {
277                     if (!watch(Path(p.name), events)) {
278                         failed.put(AbsolutePath(p.name));
279                     }
280                     if (existsAnd!isDir(Path(p.name))) {
281                         dirs ~= AbsolutePath(p.name);
282                     }
283                 }
284             } catch (Exception e) {
285                 () @trusted { logger.trace(e); }();
286                 logger.trace(e.msg);
287                 failed.put(AbsolutePath(front));
288             }
289         }
290 
291         return failed.data;
292     }
293 
294     ///
295     AbsolutePath[] watchRecurse(string root, uint events = ContentEvents,
296             bool delegate(string) pred = toDelegate(&allFiles)) {
297         return watchRecurse(Path(root), events, pred);
298     }
299 
300     /** The events that have occured since last query.
301      *
302      * Params:
303      *  timeout = max time to wait for events.
304      *
305      * Returns: the events that has occured to the watched paths.
306      */
307     FileChangeEvent[] getEvents(Duration timeout = Duration.zero) {
308         import std.algorithm : min;
309 
310         FileChangeEvent[] events;
311         if (!fd)
312             return events;
313 
314         auto res = poller.wait(timeout);
315 
316         if (res.empty) {
317             return events;
318         }
319 
320         if (res[0].status[PollStatus.nval]) {
321             throw new Exception("Failed to poll events. File descriptor not open " ~ fd.to!string);
322         }
323 
324         if (!res[0].status[PollStatus.in_]) {
325             // no events to read
326             return events;
327         }
328 
329         const receivedBytes = read(fd, eventBuffer.ptr, eventBuffer.length);
330         int i = 0;
331         AbsolutePath[uint] cookie;
332         while (true) {
333             auto info = cast(inotify_event*)(eventBuffer.ptr + i);
334 
335             if (info.wd !in directoryMap)
336                 continue;
337 
338             auto fname = () {
339                 string fileName = info.name.ptr.fromStringz.idup;
340                 return AbsolutePath(buildPath(directoryMap[info.wd].path, fileName));
341             }();
342 
343             if ((info.mask & IN_MOVED_TO) == 0) {
344                 if (auto v = info.cookie in cookie) {
345                     events ~= FileChangeEvent(Event.Delete(*v));
346                     cookie.remove(info.cookie);
347                 }
348             }
349 
350             if ((info.mask & IN_ACCESS) != 0) {
351                 events ~= FileChangeEvent(Event.Access(fname));
352             }
353 
354             if ((info.mask & IN_ATTRIB) != 0) {
355                 events ~= FileChangeEvent(Event.Attribute(fname));
356             }
357 
358             if ((info.mask & IN_CLOSE_WRITE) != 0) {
359                 events ~= FileChangeEvent(Event.CloseWrite(fname));
360             }
361 
362             if ((info.mask & IN_CLOSE_NOWRITE) != 0) {
363                 events ~= FileChangeEvent(Event.CloseNoWrite(fname));
364             }
365 
366             if ((info.mask & IN_CREATE) != 0) {
367                 events ~= FileChangeEvent(Event.Create(fname));
368             }
369 
370             if ((info.mask & IN_DELETE) != 0) {
371                 events ~= FileChangeEvent(Event.Delete(fname));
372             }
373 
374             if ((info.mask & IN_DELETE_SELF) != 0) {
375                 // must go via the mapping or there may be trailing junk in fname.
376                 events ~= FileChangeEvent(Event.DeleteSelf(directoryMap[info.wd].path.AbsolutePath));
377             }
378 
379             if ((info.mask & IN_MODIFY) != 0) {
380                 events ~= FileChangeEvent(Event.Modify(fname));
381             }
382 
383             if ((info.mask & IN_MOVE_SELF) != 0) {
384                 // must go via the mapping or there may be trailing junk in fname.
385                 events ~= FileChangeEvent(Event.MoveSelf(directoryMap[info.wd].path.AbsolutePath));
386             }
387 
388             if ((info.mask & IN_MOVED_FROM) != 0) {
389                 cookie[info.cookie] = fname;
390             }
391 
392             if ((info.mask & IN_MOVED_TO) != 0) {
393                 if (auto v = info.cookie in cookie) {
394                     events ~= FileChangeEvent(Event.Rename(*v, fname));
395                     cookie.remove(info.cookie);
396                 } else {
397                     events ~= FileChangeEvent(Event.Create(fname));
398                 }
399             }
400 
401             if ((info.mask & IN_DELETE_SELF) != 0 || (info.mask & IN_MOVE_SELF) != 0) {
402                 inotify_rm_watch(fd, info.wd);
403                 directoryMap[info.wd].watched = false;
404             }
405 
406             i += inotify_event.sizeof + info.len;
407 
408             if (i >= receivedBytes)
409                 break;
410         }
411 
412         foreach (c; cookie.byValue) {
413             events ~= FileChangeEvent(Event.Delete(AbsolutePath(c)));
414         }
415 
416         return events;
417     }
418 }
419 
420 ///
421 unittest {
422     import core.thread;
423 
424     if (exists("test"))
425         rmdirRecurse("test");
426     scope (exit) {
427         if (exists("test"))
428             rmdirRecurse("test");
429     }
430 
431     auto watcher = fileWatch();
432 
433     mkdir("test");
434     assert(watcher.watch("test"));
435 
436     write("test/a.txt", "abc");
437     auto ev = watcher.getEvents(5.dur!"seconds");
438     assert(ev.length > 0);
439     assert(ev[0].tryMatch!((Event.Create x) {
440             assert(x.path == AbsolutePath("test/a.txt"));
441             return true;
442         }));
443 
444     append("test/a.txt", "def");
445     ev = watcher.getEvents(5.dur!"seconds");
446     assert(ev.length > 0);
447     assert(ev[0].tryMatch!((Event.Modify x) {
448             assert(x.path == AbsolutePath("test/a.txt"));
449             return true;
450         }));
451 
452     rename("test/a.txt", "test/b.txt");
453     ev = watcher.getEvents(5.dur!"seconds");
454     assert(ev.length > 0);
455     assert(ev[0].tryMatch!((Event.Rename x) {
456             assert(x.from == AbsolutePath("test/a.txt"));
457             assert(x.to == AbsolutePath("test/b.txt"));
458             return true;
459         }));
460 
461     remove("test/b.txt");
462     ev = watcher.getEvents(5.dur!"seconds");
463     assert(ev.length > 0);
464     assert(ev[0].tryMatch!((Event.Delete x) {
465             assert(x.path == AbsolutePath("test/b.txt"));
466             return true;
467         }));
468 
469     rmdirRecurse("test");
470     ev = watcher.getEvents(5.dur!"seconds");
471     assert(ev.length > 0);
472     assert(ev[0].tryMatch!((Event.DeleteSelf x) {
473             assert(x.path == AbsolutePath("test"));
474             return true;
475         }));
476 }
477 
478 ///
479 unittest {
480     import std.algorithm : canFind;
481 
482     if (exists("test2"))
483         rmdirRecurse("test2");
484     if (exists("test3"))
485         rmdirRecurse("test3");
486     scope (exit) {
487         if (exists("test2"))
488             rmdirRecurse("test2");
489         if (exists("test3"))
490             rmdirRecurse("test3");
491     }
492 
493     auto watcher = fileWatch();
494     mkdir("test2");
495     assert(watcher.watchRecurse("test2").length == 0);
496 
497     write("test2/a.txt", "abc");
498     auto ev = watcher.getEvents(5.dur!"seconds");
499     assert(ev.length == 3);
500     assert(ev[0].tryMatch!((Event.Create x) {
501             assert(x.path == AbsolutePath("test2/a.txt"));
502             return true;
503         }));
504     assert(ev[1].tryMatch!((Event.Modify x) {
505             assert(x.path == AbsolutePath("test2/a.txt"));
506             return true;
507         }));
508     assert(ev[2].tryMatch!((Event.CloseWrite x) {
509             assert(x.path == AbsolutePath("test2/a.txt"));
510             return true;
511         }));
512 
513     rename("test2/a.txt", "./testfile-a.txt");
514     ev = watcher.getEvents(5.dur!"seconds");
515     assert(ev.length == 1);
516     assert(ev[0].tryMatch!((Event.Delete x) {
517             assert(x.path == AbsolutePath("test2/a.txt"));
518             return true;
519         }));
520 
521     rename("./testfile-a.txt", "test2/b.txt");
522     ev = watcher.getEvents(5.dur!"seconds");
523     assert(ev.length == 1);
524     assert(ev[0].tryMatch!((Event.Create x) {
525             assert(x.path == AbsolutePath("test2/b.txt"));
526             return true;
527         }));
528 
529     remove("test2/b.txt");
530     ev = watcher.getEvents(5.dur!"seconds");
531     assert(ev.length == 1);
532     assert(ev[0].tryMatch!((Event.Delete x) {
533             assert(x.path == AbsolutePath("test2/b.txt"));
534             return true;
535         }));
536 
537     mkdir("test2/mydir");
538     rmdir("test2/mydir");
539     ev = watcher.getEvents(5.dur!"seconds");
540     assert(ev.length == 2);
541     assert(ev[0].tryMatch!((Event.Create x) {
542             assert(x.path == AbsolutePath("test2/mydir"));
543             return true;
544         }));
545     assert(ev[1].tryMatch!((Event.Delete x) {
546             assert(x.path == AbsolutePath("test2/mydir"));
547             return true;
548         }));
549 
550     // test for creation, modification, removal of subdirectory
551     mkdir("test2/subdir");
552     ev = watcher.getEvents(5.dur!"seconds");
553     assert(ev.length == 1);
554     assert(ev[0].tryMatch!((Event.Create x) {
555             assert(x.path == AbsolutePath("test2/subdir"));
556             // add the created directory to be watched
557             watcher.watchRecurse(x.path);
558             return true;
559         }));
560 
561     write("test2/subdir/c.txt", "abc");
562     ev = watcher.getEvents(5.dur!"seconds");
563     assert(ev.length == 3);
564     assert(ev[0].tryMatch!((Event.Create x) {
565             assert(x.path == AbsolutePath("test2/subdir/c.txt"));
566             return true;
567         }));
568 
569     write("test2/subdir/c.txt", "\nabc");
570     ev = watcher.getEvents(5.dur!"seconds");
571     assert(ev.length == 2);
572     assert(ev[0].tryMatch!((Event.Modify x) {
573             assert(x.path == AbsolutePath("test2/subdir/c.txt"));
574             return true;
575         }));
576 
577     rmdirRecurse("test2/subdir");
578     ev = watcher.getEvents(5.dur!"seconds");
579     assert(ev.length == 3);
580     foreach (e; ev) {
581         assert(ev[0].tryMatch!((Event.Delete x) {
582                 assert(canFind([
583                     AbsolutePath("test2/subdir/c.txt"),
584                     AbsolutePath("test2/subdir")
585                 ], x.path));
586                 return true;
587             }, (Event.DeleteSelf x) {
588                 assert(x.path == AbsolutePath("test2/subdir"));
589                 return true;
590             }));
591     }
592 
593     // removal of watched folder
594     rmdirRecurse("test2");
595     ev = watcher.getEvents(5.dur!"seconds");
596     assert(ev.length == 1);
597     assert(ev[0].tryMatch!((Event.DeleteSelf x) {
598             assert(x.path == AbsolutePath("test2"));
599             return true;
600         }));
601 }
602 
603 struct MonitorResult {
604     enum Kind {
605         Access,
606         Attribute,
607         CloseWrite,
608         CloseNoWrite,
609         Create,
610         Delete,
611         DeleteSelf,
612         Modify,
613         MoveSelf,
614         Rename,
615         Open,
616     }
617 
618     Kind kind;
619     AbsolutePath path;
620 }
621 
622 /** Monitor root's for filesystem changes which create/remove/modify
623  * files/directories.
624  */
625 struct Monitor {
626     import std.array : appender;
627     import std.file : isDir;
628     import std.utf : UTFException;
629     import my.filter : GlobFilter;
630     import my.fswatch;
631 
632     private {
633         Set!AbsolutePath roots;
634         FileWatch fw;
635         GlobFilter fileFilter;
636         uint events;
637 
638         // roots that has been removed that may be re-added later on. the user
639         // expects them to trigger events.
640         Set!AbsolutePath monitorRoots;
641     }
642 
643     /**
644      * Params:
645      *  roots = directories to recursively monitor
646      */
647     this(AbsolutePath[] roots, GlobFilter fileFilter, uint events = ContentEvents) {
648         this.roots = toSet(roots);
649         this.fileFilter = fileFilter;
650         this.events = events;
651 
652         auto app = appender!(AbsolutePath[])();
653         fw = fileWatch();
654         foreach (r; roots) {
655             app.put(fw.watchRecurse(r, events, (a) {
656                     return isInteresting(fileFilter, a);
657                 }));
658         }
659 
660         logger.trace(!app.data.empty, "unable to watch ", app.data);
661     }
662 
663     static bool isInteresting(GlobFilter fileFilter, string p) nothrow {
664         import my.file;
665 
666         try {
667             const ap = AbsolutePath(p);
668 
669             if (existsAnd!isDir(ap)) {
670                 return true;
671             }
672             return fileFilter.match(ap);
673         } catch (Exception e) {
674             collectException(logger.trace(e.msg));
675         }
676 
677         return false;
678     }
679 
680     /** Wait up to `timeout` for an event to occur for the monitored `roots`.
681      *
682      * Params:
683      *  timeout = how long to wait for the event
684      */
685     MonitorResult[] wait(Duration timeout) {
686         import std.array : array;
687         import std.algorithm : canFind, startsWith, filter;
688 
689         auto rval = appender!(MonitorResult[])();
690 
691         {
692             auto rm = appender!(AbsolutePath[])();
693             foreach (a; monitorRoots.toRange.filter!(a => exists(a))) {
694                 fw.watchRecurse(a, events, a => isInteresting(fileFilter, a));
695                 rm.put(a);
696                 rval.put(MonitorResult(MonitorResult.Kind.Create, a));
697             }
698             foreach (a; rm.data) {
699                 monitorRoots.remove(a);
700             }
701         }
702 
703         if (!rval.data.empty) {
704             // collect whatever events that happend to have queued up together
705             // with the artifically created.
706             timeout = Duration.zero;
707         }
708 
709         try {
710             foreach (e; fw.getEvents(timeout)) {
711                 e.match!((Event.Access x) {
712                     rval.put(MonitorResult(MonitorResult.Kind.Access, x.path));
713                 }, (Event.Attribute x) {
714                     rval.put(MonitorResult(MonitorResult.Kind.Attribute, x.path));
715                 }, (Event.CloseWrite x) {
716                     rval.put(MonitorResult(MonitorResult.Kind.CloseWrite, x.path));
717                 }, (Event.CloseNoWrite x) {
718                     rval.put(MonitorResult(MonitorResult.Kind.CloseNoWrite, x.path));
719                 }, (Event.Create x) {
720                     rval.put(MonitorResult(MonitorResult.Kind.Create, x.path));
721                     fw.watchRecurse(x.path, events, a => isInteresting(fileFilter, a));
722                 }, (Event.Modify x) {
723                     rval.put(MonitorResult(MonitorResult.Kind.Modify, x.path));
724                 }, (Event.MoveSelf x) {
725                     rval.put(MonitorResult(MonitorResult.Kind.MoveSelf, x.path));
726                     fw.watchRecurse(x.path, events, a => isInteresting(fileFilter, a));
727 
728                     if (x.path in roots) {
729                         monitorRoots.add(x.path);
730                     }
731                 }, (Event.Delete x) {
732                     rval.put(MonitorResult(MonitorResult.Kind.Delete, x.path));
733                 }, (Event.DeleteSelf x) {
734                     rval.put(MonitorResult(MonitorResult.Kind.DeleteSelf, x.path));
735 
736                     if (x.path in roots) {
737                         monitorRoots.add(x.path);
738                     }
739                 }, (Event.Rename x) {
740                     rval.put(MonitorResult(MonitorResult.Kind.Rename, x.to));
741                 }, (Event.Open x) {
742                     rval.put(MonitorResult(MonitorResult.Kind.Open, x.path));
743                 },);
744             }
745         } catch (Exception e) {
746             logger.trace(e.msg);
747         }
748 
749         return rval.data.filter!(a => fileFilter.match(a.path)).array;
750     }
751 
752     /** Collects events from the monitored `roots` over a period.
753      *
754      * Params:
755      *  collectTime = for how long to clear the queue
756      */
757     MonitorResult[] collect(Duration collectTime) {
758         import std.algorithm : max, min;
759         import std.datetime : Clock;
760 
761         auto rval = appender!(MonitorResult[])();
762         const stopAt = Clock.currTime + collectTime;
763 
764         do {
765             collectTime = max(stopAt - Clock.currTime, 1.dur!"msecs");
766             if (!monitorRoots.empty) {
767                 // must use a hybrid approach of poll + inotify because if a
768                 // root is added it will only be detected by polling.
769                 collectTime = min(10.dur!"msecs", collectTime);
770             }
771 
772             rval.put(wait(collectTime));
773         }
774         while (Clock.currTime < stopAt);
775 
776         return rval.data;
777     }
778 }
779 
780 @("shall re-apply monitoring for a file that is removed")
781 unittest {
782     import my.filter : GlobFilter;
783     import my.test;
784 
785     auto ta = makeTestArea("re-apply monitoring");
786     const testTxt = ta.inSandbox("test.txt").AbsolutePath;
787 
788     write(testTxt, "abc");
789     auto fw = Monitor([testTxt], GlobFilter(["*"], null));
790     write(testTxt, "abcc");
791     assert(!fw.wait(Duration.zero).empty);
792 
793     remove(testTxt);
794     assert(!fw.wait(Duration.zero).empty);
795 
796     write(testTxt, "abcc");
797     assert(!fw.wait(Duration.zero).empty);
798 }
799 
800 /** A file descriptor to poll.
801  */
802 struct FdPoll {
803     int value;
804 }
805 
806 /// Uses the linux poll syscall to wait for activity on the file descriptors.
807 struct FdPoller {
808     import std.algorithm : min, filter;
809 
810     private {
811         pollfd[] fds;
812         PollResult[] results;
813     }
814 
815     void put(FdPoll fd, PollEvent[] evs) {
816         import core.sys.posix.poll;
817 
818         pollfd pfd;
819         pfd.fd = fd.value;
820         foreach (e; evs) {
821             final switch (e) with (PollEvent) {
822             case in_:
823                 pfd.events |= POLLIN;
824                 break;
825             case out_:
826                 pfd.events |= POLLOUT;
827                 break;
828             }
829         }
830         fds ~= pfd;
831 
832         // they must be the same length or else `wait` will fail.
833         results.length = fds.length;
834     }
835 
836     void remove(FdPoll fd) {
837         fds = fds.filter!(a => a.fd != fd.value).array;
838 
839         results.length = fds.length;
840     }
841 
842     PollResult[] wait(Duration timeout = Duration.zero) {
843         import core.sys.posix.poll;
844         import std.bitmanip : BitArray;
845 
846         const code = poll(&fds[0], fds.length, cast(int) min(int.max, timeout.total!"msecs"));
847 
848         if (code < 0) {
849             import core.stdc.errno : errno, EINTR;
850 
851             if (errno == EINTR) {
852                 // poll just interrupted. try again.
853                 return (PollResult[]).init;
854             }
855 
856             throw new Exception("Failed to poll events. Error code " ~ errno.to!string);
857         } else if (code == 0) {
858             // timeout triggered
859             return (PollResult[]).init;
860         }
861 
862         size_t idx;
863         foreach (a; fds.filter!(a => a.revents != 0)) {
864             PollResult res;
865             res.status = BitArray([
866                     (a.revents & POLLIN) != 0, (a.revents & POLLOUT) != 0,
867                     (a.revents & POLLPRI) != 0, (a.revents & POLLERR) != 0,
868                     (a.revents & POLLHUP) != 0, (a.revents & POLLNVAL) != 0,
869                     ]);
870             res.fd = FdPoll(a.fd);
871             results[idx] = res;
872             idx++;
873         }
874 
875         return results[0 .. idx];
876     }
877 }
878 
879 /// Type of event to poll for.
880 enum PollEvent {
881     in_,
882     out_,
883 }
884 
885 /// What each bit in `PollResult.status` represent.
886 enum PollStatus {
887     // There is data to read.
888     in_,
889     // Writing  is  now  possible,  though a write larger that the available
890     // space in a socket or pipe will still block (unless O_NONBLOCK is set).
891     out_,
892     // There is some exceptional condition on the file descriptor.  Possibilities include:
893     // *  There is out-of-band data on a TCP socket (see tcp(7)).
894     // *  A pseudoterminal master in packet mode has seen a state change on the slave (see ioctl_tty(2)).
895     // *  A cgroup.events file has been modified (see cgroups(7)).
896     pri,
897     // Error condition (only returned in revents; ignored in events). This bit
898     // is also set for a file descriptor referring to the write end of a pipe
899     // when the read end has been closed.
900     error,
901     // Hang up (only returned in revents; ignored in events). Note that when
902     // reading from a channel such as a pipe or a stream socket, this event
903     // merely indicates that the peer closed its end of the channel.
904     // Subsequent reads from the channel will re‐ turn 0 (end of file) only
905     // after all outstanding data in the channel has been consumed.
906     hup,
907     /// Invalid request: fd not open (only returned in revents; ignored in events).
908     nval,
909 }
910 
911 /// File descriptors that triggered.
912 struct PollResult {
913     import std.bitmanip : BitArray;
914 
915     BitArray status;
916     FdPoll fd;
917 }