Skip to content

Commit 3a86142

Browse files
committed
Tidy retriever
1 parent 0fd682e commit 3a86142

File tree

2 files changed

+40
-25
lines changed

2 files changed

+40
-25
lines changed

lib/src/retriever.dart

Lines changed: 38 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@ import "package:async/async.dart";
1010

1111
/// Handles rate-limited scheduling of tasks.
1212
///
13-
/// Designed to allow prefetching tasks that will likely be needed
13+
/// Tasks are named with a key of type [K] (should be useful as a Hash-key) and
14+
/// run with a supplied function producing a CancelableOperation.
15+
///
16+
/// Designed to allow prefetching of tasks that will likely be needed
1417
/// later with [prefetch].
1518
///
1619
/// All current operations can be cancelled and future operations removed from
@@ -33,29 +36,43 @@ import "package:async/async.dart";
3336
/// final pubDevBody = await retriever.fetch(Uri.parse('https://pub.dev/'));
3437
/// ```
3538
class Retriever<K, V> {
36-
final CancelableOperation<V> Function(K, Retriever) _get;
39+
final CancelableOperation<V> Function(K, Retriever) _run;
40+
41+
/// The results of ongoing and finished computations.
3742
final Map<K, Completer<V>> _cache = <K, Completer<V>>{};
3843

3944
/// Operations that are waiting to run.
4045
final Queue<K> _queue = Queue<K>();
4146

47+
/// Rate limits the downloads.
4248
final Pool _pool;
4349

44-
/// The active operations
50+
/// The currently active operations.
4551
final Map<K, CancelableOperation<V>> _active = <K, CancelableOperation<V>>{};
46-
bool started = false;
4752

48-
Retriever(this._get, {maxConcurrentOperations = 10})
49-
: _pool = Pool(maxConcurrentOperations);
53+
/// True when the processing loop is running.
54+
bool _started = false;
55+
56+
Retriever(CancelableOperation<V> Function(K, Retriever) run,
57+
{maxConcurrentOperations = 10})
58+
: _run = run,
59+
_pool = Pool(maxConcurrentOperations);
60+
61+
Retriever.nonCancelable(Future<V> Function(K, Retriever) run,
62+
{maxConcurrentOperations = 10})
63+
: this(
64+
(key, retriever) =>
65+
CancelableOperation.fromFuture(run(key, retriever)),
66+
maxConcurrentOperations: maxConcurrentOperations);
5067

5168
/// Starts running operations from the queue. Taking the first items first.
5269
void _process() async {
53-
assert(!started);
54-
started = true;
70+
assert(!_started);
71+
_started = true;
5572
while (_queue.isNotEmpty) {
5673
final resource = await _pool.request();
5774
// This checks if [stop] has been called while waiting for a resource.
58-
if (!started) {
75+
if (!_started) {
5976
resource.release();
6077
break;
6178
}
@@ -75,8 +92,8 @@ class Retriever<K, V> {
7592
continue;
7693
}
7794

78-
// Run operation task.
79-
final operation = _get(task, this);
95+
// Start running the operation for [task].
96+
final operation = _run(task, this);
8097
_active[task] = operation;
8198
operation
8299
.then(completer.complete, onError: completer.completeError)
@@ -86,42 +103,42 @@ class Retriever<K, V> {
86103
_active.remove(task);
87104
});
88105
}
89-
started = false;
106+
_started = false;
90107
}
91108

92109
/// Cancels all active computations, and clears the queue.
93110
void stop() {
94-
// Stop the processing loop
95-
started = false;
96-
// Cancel all active operatios
111+
// Stop the processing loop.
112+
_started = false;
113+
// Cancel all active operations.
97114
for (final operation in _active.values) {
98115
operation.cancel();
99116
}
100-
// Do not process anymore.
117+
// Do not process the rest of the queue.
101118
_queue.clear();
102119
}
103120

104121
/// Puts [task] in the back of the work queue.
105122
///
106-
/// Tasl will be processed when there are free resources, and other already
123+
/// Task will be processed when there are free resources, and other already
107124
/// queued tasks are done.
108125
void prefetch(K task) {
109126
_queue.addLast(task);
110-
if (!started) _process();
127+
if (!_started) _process();
111128
}
112129

113130
/// Returns the result of running [task].
114131
///
115132
/// If [task] is already done, the cached result will be returned.
116133
/// If [task] is not yet active, it will go to the front of the work queue
117-
/// to be scheduled when there are free resources.
134+
/// to be scheduled next when there are free resources.
118135
Future<V> fetch(K task) {
119136
final completer = _cache.putIfAbsent(task, () => Completer());
120137
if (!completer.isCompleted) {
121-
// We don't worry about adding the same task twice.
138+
// We allow adding the same task twice to the queue.
122139
// It will get dedupped by the [_process] loop.
123140
_queue.addFirst(task);
124-
if (!started) _process();
141+
if (!_started) _process();
125142
}
126143
return completer.future;
127144
}

lib/src/source/hosted.dart

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -149,10 +149,8 @@ class BoundHostedSource extends CachedSource {
149149
Retriever<PackageRef, Map<PackageId, Pubspec>> retriever;
150150

151151
BoundHostedSource(this.source, this.systemCache) {
152-
retriever = Retriever(
153-
(url, retriever) =>
154-
CancelableOperation.fromFuture(_getVersions(url, retriever)),
155-
maxConcurrentOperations: 10);
152+
retriever =
153+
Retriever.nonCancelable(_getVersions, maxConcurrentOperations: 10);
156154
}
157155

158156
Future<Map<PackageId, Pubspec>> _getVersions(

0 commit comments

Comments
 (0)