@@ -7,29 +7,28 @@ library watcher.directory_watcher;
7
7
import 'dart:async' ;
8
8
import 'dart:io' ;
9
9
10
- import 'package:crypto/crypto.dart' ;
11
-
12
- import 'async_queue.dart' ;
13
- import 'stat.dart' ;
14
- import 'utils.dart' ;
15
10
import 'watch_event.dart' ;
11
+ import 'directory_watcher/linux.dart' ;
12
+ import 'directory_watcher/polling.dart' ;
16
13
17
14
/// Watches the contents of a directory and emits [WatchEvent] s when something
18
15
/// in the directory has changed.
19
- class DirectoryWatcher {
16
+ abstract class DirectoryWatcher {
20
17
/// The directory whose contents are being monitored.
21
- final String directory;
18
+ String get directory;
22
19
23
20
/// The broadcast [Stream] of events that have occurred to files in
24
21
/// [directory] .
25
22
///
26
23
/// Changes will only be monitored while this stream has subscribers. Any
27
24
/// file changes that occur during periods when there are no subscribers
28
25
/// will not be reported the next time a subscriber is added.
29
- Stream <WatchEvent > get events => _events.stream;
30
- StreamController <WatchEvent > _events;
26
+ Stream <WatchEvent > get events;
31
27
32
- _WatchState _state = _WatchState .UNSUBSCRIBED ;
28
+ /// Whether the watcher is initialized and watching for file changes.
29
+ ///
30
+ /// This is true if and only if [ready] is complete.
31
+ bool get isReady;
33
32
34
33
/// A [Future] that completes when the watcher is initialized and watching
35
34
/// for file changes.
@@ -41,241 +40,20 @@ class DirectoryWatcher {
41
40
///
42
41
/// If the watcher is already monitoring, this returns an already complete
43
42
/// future.
44
- Future get ready => _ready.future;
45
- Completer _ready = new Completer ();
46
-
47
- /// The amount of time the watcher pauses between successive polls of the
48
- /// directory contents.
49
- final Duration pollingDelay;
50
-
51
- /// The previous status of the files in the directory.
52
- ///
53
- /// Used to tell which files have been modified.
54
- final _statuses = new Map <String , _FileStatus >();
55
-
56
- /// The subscription used while [directory] is being listed.
57
- ///
58
- /// Will be `null` if a list is not currently happening.
59
- StreamSubscription <FileSystemEntity > _listSubscription;
60
-
61
- /// The queue of files waiting to be processed to see if they have been
62
- /// modified.
63
- ///
64
- /// Processing a file is asynchronous, as is listing the directory, so the
65
- /// queue exists to let each of those proceed at their own rate. The lister
66
- /// will enqueue files as quickly as it can. Meanwhile, files are dequeued
67
- /// and processed sequentially.
68
- AsyncQueue <String > _filesToProcess;
69
-
70
- /// The set of files that have been seen in the current directory listing.
71
- ///
72
- /// Used to tell which files have been removed: files that are in [_statuses]
73
- /// but not in here when a poll completes have been removed.
74
- final _polledFiles = new Set <String >();
43
+ Future get ready;
75
44
76
45
/// Creates a new [DirectoryWatcher] monitoring [directory] .
77
46
///
78
- /// If [pollingDelay] is passed, it specifies the amount of time the watcher
79
- /// will pause between successive polls of the directory contents. Making
80
- /// this shorter will give more immediate feedback at the expense of doing
81
- /// more IO and higher CPU usage. Defaults to one second.
82
- DirectoryWatcher (this .directory, {Duration pollingDelay})
83
- : pollingDelay = pollingDelay != null ? pollingDelay :
84
- new Duration (seconds: 1 ) {
85
- _events = new StreamController <WatchEvent >.broadcast (
86
- onListen: _watch, onCancel: _cancel);
87
-
88
- _filesToProcess = new AsyncQueue <String >(_processFile,
89
- onError: _events.addError);
90
- }
91
-
92
- /// Scans to see which files were already present before the watcher was
93
- /// subscribed to, and then starts watching the directory for changes.
94
- void _watch () {
95
- assert (_state == _WatchState .UNSUBSCRIBED );
96
- _state = _WatchState .SCANNING ;
97
- _poll ();
98
- }
99
-
100
- /// Stops watching the directory when there are no more subscribers.
101
- void _cancel () {
102
- assert (_state != _WatchState .UNSUBSCRIBED );
103
- _state = _WatchState .UNSUBSCRIBED ;
104
-
105
- // If we're in the middle of listing the directory, stop.
106
- if (_listSubscription != null ) _listSubscription.cancel ();
107
-
108
- // Don't process any remaining files.
109
- _filesToProcess.clear ();
110
- _polledFiles.clear ();
111
- _statuses.clear ();
112
-
113
- _ready = new Completer ();
114
- }
115
-
116
- /// Scans the contents of the directory once to see which files have been
117
- /// added, removed, and modified.
118
- void _poll () {
119
- _filesToProcess.clear ();
120
- _polledFiles.clear ();
121
-
122
- endListing () {
123
- assert (_state != _WatchState .UNSUBSCRIBED );
124
- _listSubscription = null ;
125
-
126
- // Null tells the queue consumer that we're done listing.
127
- _filesToProcess.add (null );
128
- }
129
-
130
- var stream = new Directory (directory).list (recursive: true );
131
- _listSubscription = stream.listen ((entity) {
132
- assert (_state != _WatchState .UNSUBSCRIBED );
133
-
134
- if (entity is ! File ) return ;
135
- _filesToProcess.add (entity.path);
136
- }, onError: (error, StackTrace stackTrace) {
137
- if (! isDirectoryNotFoundException (error)) {
138
- // It's some unknown error. Pipe it over to the event stream so the
139
- // user can see it.
140
- _events.addError (error, stackTrace);
141
- }
142
-
143
- // When an error occurs, we end the listing normally, which has the
144
- // desired effect of marking all files that were in the directory as
145
- // being removed.
146
- endListing ();
147
- }, onDone: endListing, cancelOnError: true );
148
- }
149
-
150
- /// Processes [file] to determine if it has been modified since the last
151
- /// time it was scanned.
152
- Future _processFile (String file) {
153
- assert (_state != _WatchState .UNSUBSCRIBED );
154
-
155
- // `null` is the sentinel which means the directory listing is complete.
156
- if (file == null ) return _completePoll ();
157
-
158
- return getModificationTime (file).then ((modified) {
159
- if (_checkForCancel ()) return null ;
160
-
161
- var lastStatus = _statuses[file];
162
-
163
- // If its modification time hasn't changed, assume the file is unchanged.
164
- if (lastStatus != null && lastStatus.modified == modified) {
165
- // The file is still here.
166
- _polledFiles.add (file);
167
- return null ;
168
- }
169
-
170
- return _hashFile (file).then ((hash) {
171
- if (_checkForCancel ()) return ;
172
-
173
- var status = new _FileStatus (modified, hash);
174
- _statuses[file] = status;
175
- _polledFiles.add (file);
176
-
177
- // Only notify while in the watching state.
178
- if (_state != _WatchState .WATCHING ) return ;
179
-
180
- // And the file is different.
181
- var changed = lastStatus == null || ! _sameHash (lastStatus.hash, hash);
182
- if (! changed) return ;
183
-
184
- var type = lastStatus == null ? ChangeType .ADD : ChangeType .MODIFY ;
185
- _events.add (new WatchEvent (type, file));
186
- });
187
- });
188
- }
189
-
190
- /// After the directory listing is complete, this determines which files were
191
- /// removed and then restarts the next poll.
192
- Future _completePoll () {
193
- // Any files that were not seen in the last poll but that we have a
194
- // status for must have been removed.
195
- var removedFiles = _statuses.keys.toSet ().difference (_polledFiles);
196
- for (var removed in removedFiles) {
197
- if (_state == _WatchState .WATCHING ) {
198
- _events.add (new WatchEvent (ChangeType .REMOVE , removed));
199
- }
200
- _statuses.remove (removed);
201
- }
202
-
203
- if (_state == _WatchState .SCANNING ) {
204
- _state = _WatchState .WATCHING ;
205
- _ready.complete ();
206
- }
207
-
208
- // Wait and then poll again.
209
- return new Future .delayed (pollingDelay).then ((_) {
210
- if (_checkForCancel ()) return ;
211
- _poll ();
212
- });
213
- }
214
-
215
- /// Returns `true` and clears the processing queue if the watcher has been
216
- /// unsubscribed.
217
- bool _checkForCancel () {
218
- if (_state != _WatchState .UNSUBSCRIBED ) return false ;
219
-
220
- // Don't process any more files.
221
- _filesToProcess.clear ();
222
- return true ;
223
- }
224
-
225
- /// Calculates the SHA-1 hash of the file at [path] .
226
- Future <List <int >> _hashFile (String path) {
227
- return new File (path).readAsBytes ().then ((bytes) {
228
- var sha1 = new SHA1 ();
229
- sha1.add (bytes);
230
- return sha1.close ();
231
- });
232
- }
233
-
234
- /// Returns `true` if [a] and [b] are the same hash value, i.e. the same
235
- /// series of byte values.
236
- bool _sameHash (List <int > a, List <int > b) {
237
- // Hashes should always be the same size.
238
- assert (a.length == b.length);
239
-
240
- for (var i = 0 ; i < a.length; i++ ) {
241
- if (a[i] != b[i]) return false ;
242
- }
243
-
244
- return true ;
245
- }
246
- }
247
-
248
- /// Enum class for the states that the [DirectoryWatcher] can be in.
249
- class _WatchState {
250
- /// There are no subscribers to the watcher's event stream and no watching
251
- /// is going on.
252
- static const UNSUBSCRIBED = const _WatchState ("unsubscribed" );
253
-
254
- /// There are subscribers and the watcher is doing an initial scan of the
255
- /// directory to see which files were already present before watching started.
47
+ /// If a native directory watcher is available for this platform, this will
48
+ /// use it. Otherwise, it will fall back to a [PollingDirectoryWatcher] .
256
49
///
257
- /// The watcher does not send notifications for changes that occurred while
258
- /// there were no subscribers, or for files already present before watching.
259
- /// The initial scan is used to determine what "before watching" state of
260
- /// the file system was.
261
- static const SCANNING = const _WatchState ("scanning" );
262
-
263
- /// There are subscribers and the watcher is polling the directory to look
264
- /// for changes.
265
- static const WATCHING = const _WatchState ("watching" );
266
-
267
- /// The name of the state.
268
- final String name;
269
-
270
- const _WatchState (this .name);
50
+ /// If [_pollingDelay] is passed, it specifies the amount of time the watcher
51
+ /// will pause between successive polls of the directory contents. Making this
52
+ /// shorter will give more immediate feedback at the expense of doing more IO
53
+ /// and higher CPU usage. Defaults to one second. Ignored for non-polling
54
+ /// watchers.
55
+ factory DirectoryWatcher (String directory, {Duration pollingDelay}) {
56
+ if (Platform .isLinux) return new LinuxDirectoryWatcher (directory);
57
+ return new PollingDirectoryWatcher (directory, pollingDelay: pollingDelay);
58
+ }
271
59
}
272
-
273
- class _FileStatus {
274
- /// The last time the file was modified.
275
- DateTime modified;
276
-
277
- /// The SHA-1 hash of the contents of the file.
278
- List <int > hash;
279
-
280
- _FileStatus (this .modified, this .hash);
281
- }
0 commit comments