Skip to content

Refactor and merge datastore head task pollers. #320

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Aug 30, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/bin/service/search.dart
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ void _main(int isolateId) {
batchIndexUpdater,
[
new ManualTriggerTaskSource(taskReceivePort),
new IndexUpdateTaskSource(batchIndexUpdater),
new IndexUpdateTaskSource(db.dbService, batchIndexUpdater),
],
);
scheduler.run();
Expand Down
46 changes: 12 additions & 34 deletions app/lib/analyzer/task_sources.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import 'package:logging/logging.dart';

import '../frontend/models.dart';
import '../shared/task_scheduler.dart';
import '../shared/task_sources.dart';
import '../shared/utils.dart';

import 'models.dart';
Expand All @@ -18,43 +19,20 @@ final Logger _logger = new Logger('pub.analyzer.source');

/// Creates a task when a recently uploaded version (one that falls inside the
/// polling window of [DatastoreVersionsHeadTaskSource]) has no analysis yet.
class DatastoreHeadTaskSource implements TaskSource {
class DatastoreHeadTaskSource extends DatastoreVersionsHeadTaskSource {
final DatastoreDB _db;
DateTime _lastTs;
DatastoreHeadTaskSource(this._db);
DatastoreHeadTaskSource(DatastoreDB db)
: _db = db,
super(db, skipHistory: true);

@override
Stream<Task> startStreaming() async* {
for (;;) {
try {
final DateTime now = new DateTime.now().toUtc();
final DateTime tenMinutesAgo =
now.subtract(const Duration(minutes: 10));
_lastTs ??= tenMinutesAgo;
final DateTime minCreated =
_lastTs.isBefore(tenMinutesAgo) ? _lastTs : tenMinutesAgo;

final Query q = _db.query(PackageVersion)
..filter('created >=', minCreated)
..order('created');
await for (PackageVersion pv in q.run()) {
if (_lastTs == null || _lastTs.isBefore(pv.created)) {
_lastTs = pv.created;
}
final List<PackageVersionAnalysis> items = await _db.lookup([
_db.emptyKey
.append(PackageAnalysis, id: pv.package)
.append(PackageVersionAnalysis, id: pv.version)
]);
if (items.first == null) {
yield new Task(pv.package, pv.version);
}
}
} catch (e, st) {
_logger.severe('Error polling head.', e, st);
}
await new Future.delayed(const Duration(minutes: 1));
}
Future<bool> shouldYieldTask(Task task) async {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Normally I would prefer separating these concerns via composition rather than inheritance, e.g. `new FilteredTaskSource(new DatastoreHeadTaskSource())`.

final List<PackageVersionAnalysis> items = await _db.lookup([
_db.emptyKey
.append(PackageAnalysis, id: task.package)
.append(PackageVersionAnalysis, id: task.version)
]);
return items.first == null;
}
}

Expand Down
8 changes: 0 additions & 8 deletions app/lib/search/backend.dart
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,6 @@ class SearchBackend {
}
return results;
}

Stream<String> listPackages({DateTime updatedAfter}) {
final Query q = _db.query(Package);
if (updatedAfter != null) {
q.filter('updated >=', updatedAfter);
}
return q.run().map((Model m) => (m as Package).name);
}
}

String _toUrl(String package) => 'https://pub.dartlang.org/packages/$package';
Expand Down
23 changes: 7 additions & 16 deletions app/lib/search/updater.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,26 @@

import 'dart:async';

import 'package:gcloud/db.dart';
import 'package:logging/logging.dart';

import '../shared/search_service.dart';
import '../shared/task_scheduler.dart';
import '../shared/task_sources.dart';

import 'backend.dart';
import 'index_simple.dart';

Logger _logger = new Logger('pub.search.updater');

class IndexUpdateTaskSource implements TaskSource {
class IndexUpdateTaskSource extends DatastoreVersionsHeadTaskSource {
final BatchIndexUpdater _batchIndexUpdater;
DateTime _lastTs;
IndexUpdateTaskSource(this._batchIndexUpdater);
IndexUpdateTaskSource(DatastoreDB db, this._batchIndexUpdater)
: super(db, onlyLatest: true, sleep: const Duration(minutes: 30));

@override
Stream<Task> startStreaming() async* {
for (;;) {
final DateTime now = new DateTime.now().toUtc();
int count = 0;
await for (String package
in searchBackend.listPackages(updatedAfter: _lastTs)) {
count++;
yield new Task(package, null);
}
_batchIndexUpdater.reportScanCount(count);
_lastTs = now.subtract(const Duration(minutes: 10));
await new Future.delayed(new Duration(minutes: 30));
}
Future dbScanComplete(int count) async {
_batchIndexUpdater.reportScanCount(count);
}
}

Expand Down
101 changes: 101 additions & 0 deletions app/lib/shared/task_sources.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';

import 'package:gcloud/db.dart';
import 'package:logging/logging.dart';

import '../frontend/models.dart';

import 'task_scheduler.dart';

final Logger _logger = new Logger('pub.shared.task_sources');

const Duration _defaultWindow = const Duration(minutes: 5);
const Duration _defaultSleep = const Duration(minutes: 1);

/// Creates tasks by polling the datastore for new versions.
///
/// Each poll queries either [Package] entities (when `onlyLatest` is set) or
/// [PackageVersion] entities. After the first successful poll the query is
/// restricted to entries whose timestamp is not older than the start of the
/// previous successful poll minus the tolerance window.
///
/// Subclasses may override [shouldYieldTask] to filter the emitted tasks and
/// [dbScanComplete] to observe how many tasks a poll has yielded.
class DatastoreVersionsHeadTaskSource implements TaskSource {
  final DatastoreDB _db;
  final Duration _window;
  final Duration _sleep;
  final bool _onlyLatest;

  /// Lower bound used by the next poll's timestamp filter.
  ///
  /// When `null`, the next poll runs without a timestamp filter.
  DateTime _lastTs;

  DatastoreVersionsHeadTaskSource(
    this._db, {

    /// Whether to return only the latest versions of the packages.
    bool onlyLatest: false,

    /// Whether to scan the entire datastore in the first run or skip old ones.
    bool skipHistory: false,

    /// Tolerance window for eventual consistency in Datastore.
    Duration window,

    /// Inactivity duration between two polls.
    Duration sleep,
  })
      : _window = window ?? _defaultWindow,
        _sleep = sleep ?? _defaultSleep,
        _onlyLatest = onlyLatest,
        // `window` is optional: fall back to the same default as `_window`,
        // otherwise `skipHistory: true` without an explicit window would
        // subtract `null` and fail during construction.
        _lastTs = skipHistory
            ? new DateTime.now().toUtc().subtract(window ?? _defaultWindow)
            : null;

  /// Polls the datastore in an endless loop, sleeping between polls.
  ///
  /// Errors raised while polling are logged and the loop continues; in that
  /// case the timestamp bound is left untouched, so the next poll covers the
  /// same range again.
  @override
  Stream<Task> startStreaming() async* {
    for (;;) {
      try {
        // Captured before the scan so entries written while the scan is
        // running are picked up by the next poll.
        final DateTime now = new DateTime.now().toUtc();
        if (_onlyLatest) {
          yield* _pollPackages();
        } else {
          yield* _pollPackageVersions();
        }
        _lastTs = now.subtract(_window);
      } catch (e, st) {
        _logger.severe('Error polling head.', e, st);
      }
      await new Future.delayed(_sleep);
    }
  }

  /// Whether [task] should be emitted on the stream.
  ///
  /// The default implementation accepts every task.
  Future<bool> shouldYieldTask(Task task) async => true;

  /// Called after a poll has iterated over all query results, with the
  /// [count] of tasks that were yielded during that poll.
  ///
  /// The default implementation does nothing.
  Future dbScanComplete(int count) async {}

  /// Emits one task per [Package] matched by the current poll, using the
  /// package's latest version (or latest dev version when that is `null`).
  Stream<Task> _pollPackages() async* {
    final Query q = _db.query(Package);
    if (_lastTs != null) {
      q.filter('updated >=', _lastTs);
    }
    int count = 0;
    await for (Package p in q.run()) {
      final task = new Task(p.name, p.latestVersion ?? p.latestDevVersion);
      if (await shouldYieldTask(task)) {
        count++;
        yield task;
      }
    }
    await dbScanComplete(count);
  }

  /// Emits one task per [PackageVersion] matched by the current poll.
  Stream<Task> _pollPackageVersions() async* {
    final Query q = _db.query(PackageVersion);
    if (_lastTs != null) {
      q.filter('created >=', _lastTs);
    }
    int count = 0;
    await for (PackageVersion pv in q.run()) {
      final task = new Task(pv.package, pv.version);
      if (await shouldYieldTask(task)) {
        count++;
        yield task;
      }
    }
    await dbScanComplete(count);
  }
}
5 changes: 0 additions & 5 deletions app/test/search/handlers_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -135,11 +135,6 @@ void main() {
class MockSearchBackend implements SearchBackend {
List<String> packages = ['pkg_foo'];

@override
Stream<String> listPackages({DateTime updatedAfter}) {
return new Stream.fromIterable(packages);
}

@override
Future<List<PackageDocument>> loadDocuments(List<String> packages) async {
return packages.map((String package) {
Expand Down