Skip to content

Commit 3c21dbd

Browse files
isoos authored and mkustermann committed
Refactor and merge datastore head task pollers. (#320)
1 parent 886f839 commit 3c21dbd

File tree

6 files changed

+121
-64
lines changed

6 files changed

+121
-64
lines changed

app/bin/service/search.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ void _main(int isolateId) {
6767
batchIndexUpdater,
6868
[
6969
new ManualTriggerTaskSource(taskReceivePort),
70-
new IndexUpdateTaskSource(batchIndexUpdater),
70+
new IndexUpdateTaskSource(db.dbService, batchIndexUpdater),
7171
],
7272
);
7373
scheduler.run();

app/lib/analyzer/task_sources.dart

Lines changed: 12 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import 'package:logging/logging.dart';
99

1010
import '../frontend/models.dart';
1111
import '../shared/task_scheduler.dart';
12+
import '../shared/task_sources.dart';
1213
import '../shared/utils.dart';
1314

1415
import 'models.dart';
@@ -18,43 +19,20 @@ final Logger _logger = new Logger('pub.analyzer.source');
1819

1920
/// Creates a task when a version uploaded in the past 10 minutes has no
2021
/// analysis yet.
21-
class DatastoreHeadTaskSource implements TaskSource {
22+
class DatastoreHeadTaskSource extends DatastoreVersionsHeadTaskSource {
2223
final DatastoreDB _db;
23-
DateTime _lastTs;
24-
DatastoreHeadTaskSource(this._db);
24+
DatastoreHeadTaskSource(DatastoreDB db)
25+
: _db = db,
26+
super(db, skipHistory: true);
2527

2628
@override
27-
Stream<Task> startStreaming() async* {
28-
for (;;) {
29-
try {
30-
final DateTime now = new DateTime.now().toUtc();
31-
final DateTime tenMinutesAgo =
32-
now.subtract(const Duration(minutes: 10));
33-
_lastTs ??= tenMinutesAgo;
34-
final DateTime minCreated =
35-
_lastTs.isBefore(tenMinutesAgo) ? _lastTs : tenMinutesAgo;
36-
37-
final Query q = _db.query(PackageVersion)
38-
..filter('created >=', minCreated)
39-
..order('created');
40-
await for (PackageVersion pv in q.run()) {
41-
if (_lastTs == null || _lastTs.isBefore(pv.created)) {
42-
_lastTs = pv.created;
43-
}
44-
final List<PackageVersionAnalysis> items = await _db.lookup([
45-
_db.emptyKey
46-
.append(PackageAnalysis, id: pv.package)
47-
.append(PackageVersionAnalysis, id: pv.version)
48-
]);
49-
if (items.first == null) {
50-
yield new Task(pv.package, pv.version);
51-
}
52-
}
53-
} catch (e, st) {
54-
_logger.severe('Error polling head.', e, st);
55-
}
56-
await new Future.delayed(const Duration(minutes: 1));
57-
}
29+
Future<bool> shouldYieldTask(Task task) async {
30+
final List<PackageVersionAnalysis> items = await _db.lookup([
31+
_db.emptyKey
32+
.append(PackageAnalysis, id: task.package)
33+
.append(PackageVersionAnalysis, id: task.version)
34+
]);
35+
return items.first == null;
5836
}
5937
}
6038

app/lib/search/backend.dart

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -91,14 +91,6 @@ class SearchBackend {
9191
}
9292
return results;
9393
}
94-
95-
Stream<String> listPackages({DateTime updatedAfter}) {
96-
final Query q = _db.query(Package);
97-
if (updatedAfter != null) {
98-
q.filter('updated >=', updatedAfter);
99-
}
100-
return q.run().map((Model m) => (m as Package).name);
101-
}
10294
}
10395

10496
String _toUrl(String package) => 'https://pub.dartlang.org/packages/$package';

app/lib/search/updater.dart

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -4,35 +4,26 @@
44

55
import 'dart:async';
66

7+
import 'package:gcloud/db.dart';
78
import 'package:logging/logging.dart';
89

910
import '../shared/search_service.dart';
1011
import '../shared/task_scheduler.dart';
12+
import '../shared/task_sources.dart';
1113

1214
import 'backend.dart';
1315
import 'index_simple.dart';
1416

1517
Logger _logger = new Logger('pub.search.updater');
1618

17-
class IndexUpdateTaskSource implements TaskSource {
19+
class IndexUpdateTaskSource extends DatastoreVersionsHeadTaskSource {
1820
final BatchIndexUpdater _batchIndexUpdater;
19-
DateTime _lastTs;
20-
IndexUpdateTaskSource(this._batchIndexUpdater);
21+
IndexUpdateTaskSource(DatastoreDB db, this._batchIndexUpdater)
22+
: super(db, onlyLatest: true, sleep: const Duration(minutes: 30));
2123

2224
@override
23-
Stream<Task> startStreaming() async* {
24-
for (;;) {
25-
final DateTime now = new DateTime.now().toUtc();
26-
int count = 0;
27-
await for (String package
28-
in searchBackend.listPackages(updatedAfter: _lastTs)) {
29-
count++;
30-
yield new Task(package, null);
31-
}
32-
_batchIndexUpdater.reportScanCount(count);
33-
_lastTs = now.subtract(const Duration(minutes: 10));
34-
await new Future.delayed(new Duration(minutes: 30));
35-
}
25+
Future dbScanComplete(int count) async {
26+
_batchIndexUpdater.reportScanCount(count);
3627
}
3728
}
3829

app/lib/shared/task_sources.dart

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
2+
// for details. All rights reserved. Use of this source code is governed by a
3+
// BSD-style license that can be found in the LICENSE file.
4+
5+
import 'dart:async';
6+
7+
import 'package:gcloud/db.dart';
8+
import 'package:logging/logging.dart';
9+
10+
import '../frontend/models.dart';
11+
12+
import 'task_scheduler.dart';
13+
14+
final Logger _logger = new Logger('pub.shared.task_sources');
15+
16+
const Duration _defaultWindow = const Duration(minutes: 5);
17+
const Duration _defaultSleep = const Duration(minutes: 1);
18+
19+
/// Creates tasks by polling the datastore for new versions.
20+
class DatastoreVersionsHeadTaskSource implements TaskSource {
21+
final DatastoreDB _db;
22+
final Duration _window;
23+
final Duration _sleep;
24+
final bool _onlyLatest;
25+
DateTime _lastTs;
26+
27+
DatastoreVersionsHeadTaskSource(
28+
this._db, {
29+
30+
/// Whether to return only the latest versions of the packages.
31+
bool onlyLatest: false,
32+
33+
/// Whether to scan the entire datastore in the first run or skip old ones.
34+
bool skipHistory: false,
35+
36+
/// Tolerance window for eventually consistency in Datastore.
37+
Duration window,
38+
39+
/// Inactivity duration between two polls.
40+
Duration sleep,
41+
})
42+
: _window = window ?? _defaultWindow,
43+
_sleep = sleep ?? _defaultSleep,
44+
_onlyLatest = onlyLatest,
45+
_lastTs =
46+
skipHistory ? new DateTime.now().toUtc().subtract(window) : null;
47+
48+
@override
49+
Stream<Task> startStreaming() async* {
50+
for (;;) {
51+
try {
52+
final DateTime now = new DateTime.now().toUtc();
53+
if (_onlyLatest) {
54+
yield* _pollPackages();
55+
} else {
56+
yield* _pollPackageVersions();
57+
}
58+
_lastTs = now.subtract(_window);
59+
} catch (e, st) {
60+
_logger.severe('Error polling head.', e, st);
61+
}
62+
await new Future.delayed(_sleep);
63+
}
64+
}
65+
66+
Future<bool> shouldYieldTask(Task task) async => true;
67+
68+
Future dbScanComplete(int count) async {}
69+
70+
Stream<Task> _pollPackages() async* {
71+
final Query q = _db.query(Package);
72+
if (_lastTs != null) {
73+
q.filter('updated >=', _lastTs);
74+
}
75+
int count = 0;
76+
await for (Package p in q.run()) {
77+
final task = new Task(p.name, p.latestVersion ?? p.latestDevVersion);
78+
if (await shouldYieldTask(task)) {
79+
count++;
80+
yield task;
81+
}
82+
}
83+
await dbScanComplete(count);
84+
}
85+
86+
Stream<Task> _pollPackageVersions() async* {
87+
final Query q = _db.query(PackageVersion);
88+
if (_lastTs != null) {
89+
q.filter('created >=', _lastTs);
90+
}
91+
int count = 0;
92+
await for (PackageVersion pv in q.run()) {
93+
final task = new Task(pv.package, pv.version);
94+
if (await shouldYieldTask(task)) {
95+
count++;
96+
yield task;
97+
}
98+
}
99+
await dbScanComplete(count);
100+
}
101+
}

app/test/search/handlers_test.dart

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -135,11 +135,6 @@ void main() {
135135
class MockSearchBackend implements SearchBackend {
136136
List<String> packages = ['pkg_foo'];
137137

138-
@override
139-
Stream<String> listPackages({DateTime updatedAfter}) {
140-
return new Stream.fromIterable(packages);
141-
}
142-
143138
@override
144139
Future<List<PackageDocument>> loadDocuments(List<String> packages) async {
145140
return packages.map((String package) {

0 commit comments

Comments
 (0)