diff --git a/app/bin/service/search.dart b/app/bin/service/search.dart index e602763059..ee064ce5f7 100644 --- a/app/bin/service/search.dart +++ b/app/bin/service/search.dart @@ -67,7 +67,7 @@ void _main(int isolateId) { batchIndexUpdater, [ new ManualTriggerTaskSource(taskReceivePort), - new IndexUpdateTaskSource(batchIndexUpdater), + new IndexUpdateTaskSource(db.dbService, batchIndexUpdater), ], ); scheduler.run(); diff --git a/app/lib/analyzer/task_sources.dart b/app/lib/analyzer/task_sources.dart index 4dc76770b1..01cd2ff6ef 100644 --- a/app/lib/analyzer/task_sources.dart +++ b/app/lib/analyzer/task_sources.dart @@ -9,6 +9,7 @@ import 'package:logging/logging.dart'; import '../frontend/models.dart'; import '../shared/task_scheduler.dart'; +import '../shared/task_sources.dart'; import '../shared/utils.dart'; import 'models.dart'; @@ -18,43 +19,20 @@ final Logger _logger = new Logger('pub.analyzer.source'); /// Creates a task when a version uploaded in the past 10 minutes has no /// analysis yet. -class DatastoreHeadTaskSource implements TaskSource { +class DatastoreHeadTaskSource extends DatastoreVersionsHeadTaskSource { final DatastoreDB _db; - DateTime _lastTs; - DatastoreHeadTaskSource(this._db); + DatastoreHeadTaskSource(DatastoreDB db) + : _db = db, + super(db, skipHistory: true); @override - Stream startStreaming() async* { - for (;;) { - try { - final DateTime now = new DateTime.now().toUtc(); - final DateTime tenMinutesAgo = - now.subtract(const Duration(minutes: 10)); - _lastTs ??= tenMinutesAgo; - final DateTime minCreated = - _lastTs.isBefore(tenMinutesAgo) ? _lastTs : tenMinutesAgo; - - final Query q = _db.query(PackageVersion) - ..filter('created >=', minCreated) - ..order('created'); - await for (PackageVersion pv in q.run()) { - if (_lastTs == null || _lastTs.isBefore(pv.created)) { - _lastTs = pv.created; - } - final List items = await _db.lookup([ - _db.emptyKey - .append(PackageAnalysis, id: pv.package) - .append(PackageVersionAnalysis, id: pv.version) - ]); - if (items.first == null) { - yield new Task(pv.package, pv.version); - } - } - } catch (e, st) { - _logger.severe('Error polling head.', e, st); - } - await new Future.delayed(const Duration(minutes: 1)); - } + Future shouldYieldTask(Task task) async { + final List items = await _db.lookup([ + _db.emptyKey + .append(PackageAnalysis, id: task.package) + .append(PackageVersionAnalysis, id: task.version) + ]); + return items.first == null; } } diff --git a/app/lib/search/backend.dart b/app/lib/search/backend.dart index fd9653c526..31f5427a56 100644 --- a/app/lib/search/backend.dart +++ b/app/lib/search/backend.dart @@ -91,14 +91,6 @@ class SearchBackend { } return results; } - - Stream listPackages({DateTime updatedAfter}) { - final Query q = _db.query(Package); - if (updatedAfter != null) { - q.filter('updated >=', updatedAfter); - } - return q.run().map((Model m) => (m as Package).name); - } } String _toUrl(String package) => 'https://pub.dartlang.org/packages/$package'; diff --git a/app/lib/search/updater.dart b/app/lib/search/updater.dart index 59ef0d79d3..ba0e8ed484 100644 --- a/app/lib/search/updater.dart +++ b/app/lib/search/updater.dart @@ -4,35 +4,26 @@ import 'dart:async'; +import 'package:gcloud/db.dart'; import 'package:logging/logging.dart'; import '../shared/search_service.dart'; import '../shared/task_scheduler.dart'; +import '../shared/task_sources.dart'; import 'backend.dart'; import 'index_simple.dart'; Logger _logger = new Logger('pub.search.updater'); -class IndexUpdateTaskSource implements TaskSource { +class IndexUpdateTaskSource extends DatastoreVersionsHeadTaskSource { final BatchIndexUpdater _batchIndexUpdater; - DateTime _lastTs; - IndexUpdateTaskSource(this._batchIndexUpdater); + IndexUpdateTaskSource(DatastoreDB db, this._batchIndexUpdater) + : super(db, onlyLatest: true, sleep: const Duration(minutes: 30)); @override - Stream startStreaming() async* { - for (;;) { - final DateTime now = new DateTime.now().toUtc(); - int count = 0; - await for (String package - in searchBackend.listPackages(updatedAfter: _lastTs)) { - count++; - yield new Task(package, null); - } - _batchIndexUpdater.reportScanCount(count); - _lastTs = now.subtract(const Duration(minutes: 10)); - await new Future.delayed(new Duration(minutes: 30)); - } + Future dbScanComplete(int count) async { + _batchIndexUpdater.reportScanCount(count); } } diff --git a/app/lib/shared/task_sources.dart b/app/lib/shared/task_sources.dart new file mode 100644 index 0000000000..fc55a4de95 --- /dev/null +++ b/app/lib/shared/task_sources.dart @@ -0,0 +1,101 @@ +// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:async'; + +import 'package:gcloud/db.dart'; +import 'package:logging/logging.dart'; + +import '../frontend/models.dart'; + +import 'task_scheduler.dart'; + +final Logger _logger = new Logger('pub.shared.task_sources'); + +const Duration _defaultWindow = const Duration(minutes: 5); +const Duration _defaultSleep = const Duration(minutes: 1); + +/// Creates tasks by polling the datastore for new versions. +class DatastoreVersionsHeadTaskSource implements TaskSource { + final DatastoreDB _db; + final Duration _window; + final Duration _sleep; + final bool _onlyLatest; + DateTime _lastTs; + + DatastoreVersionsHeadTaskSource( + this._db, { + + /// Whether to return only the latest versions of the packages. + bool onlyLatest: false, + + /// Whether to scan the entire datastore in the first run or skip old ones. + bool skipHistory: false, + + /// Tolerance window for eventually consistency in Datastore. + Duration window, + + /// Inactivity duration between two polls. + Duration sleep, + }) + : _window = window ?? _defaultWindow, + _sleep = sleep ?? _defaultSleep, + _onlyLatest = onlyLatest, + _lastTs = + skipHistory ? new DateTime.now().toUtc().subtract(window) : null; + + @override + Stream startStreaming() async* { + for (;;) { + try { + final DateTime now = new DateTime.now().toUtc(); + if (_onlyLatest) { + yield* _pollPackages(); + } else { + yield* _pollPackageVersions(); + } + _lastTs = now.subtract(_window); + } catch (e, st) { + _logger.severe('Error polling head.', e, st); + } + await new Future.delayed(_sleep); + } + } + + Future shouldYieldTask(Task task) async => true; + + Future dbScanComplete(int count) async {} + + Stream _pollPackages() async* { + final Query q = _db.query(Package); + if (_lastTs != null) { + q.filter('updated >=', _lastTs); + } + int count = 0; + await for (Package p in q.run()) { + final task = new Task(p.name, p.latestVersion ?? p.latestDevVersion); + if (await shouldYieldTask(task)) { + count++; + yield task; + } + } + await dbScanComplete(count); + } + + Stream _pollPackageVersions() async* { + final Query q = _db.query(PackageVersion); + if (_lastTs != null) { + q.filter('created >=', _lastTs); + } + int count = 0; + await for (PackageVersion pv in q.run()) { + final task = new Task(pv.package, pv.version); + if (await shouldYieldTask(task)) { + count++; + yield task; + } + } + await dbScanComplete(count); + } +} diff --git a/app/test/search/handlers_test.dart b/app/test/search/handlers_test.dart index f7a3327607..4eb7c81ca1 100644 --- a/app/test/search/handlers_test.dart +++ b/app/test/search/handlers_test.dart @@ -135,11 +135,6 @@ void main() { class MockSearchBackend implements SearchBackend { List packages = ['pkg_foo']; - @override - Stream listPackages({DateTime updatedAfter}) { - return new Stream.fromIterable(packages); - } - @override Future> loadDocuments(List packages) async { return packages.map((String package) {