Skip to content

store: Replace event queue on expiry #466

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Jan 3, 2024
127 changes: 108 additions & 19 deletions lib/model/store.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import 'dart:async';
import 'dart:convert';
import 'dart:io';

Expand All @@ -7,6 +8,7 @@ import 'package:path/path.dart' as p;
import 'package:path_provider/path_provider.dart';

import '../api/core.dart';
import '../api/exception.dart';
import '../api/model/events.dart';
import '../api/model/initial_snapshot.dart';
import '../api/model/model.dart';
Expand Down Expand Up @@ -95,11 +97,26 @@ abstract class GlobalStore extends ChangeNotifier {
future = loadPerAccount(account!);
_perAccountStoresLoading[accountId] = future;
store = await future;
_perAccountStores[accountId] = store;
_setPerAccount(accountId, store);
_perAccountStoresLoading.remove(accountId);
return store;
}

Future<void> _reloadPerAccount(Account account) async {
assert(identical(_accounts[account.id], account));
assert(_perAccountStores.containsKey(account.id));
assert(!_perAccountStoresLoading.containsKey(account.id));
final store = await loadPerAccount(account);
_setPerAccount(account.id, store);
}

void _setPerAccount(int accountId, PerAccountStore store) {
final oldStore = _perAccountStores[accountId];
_perAccountStores[accountId] = store;
notifyListeners();
oldStore?.dispose();
}

/// Load per-account data for the given account, unconditionally.
///
/// This method should be called only by the implementation of [perAccount].
Expand Down Expand Up @@ -133,28 +150,29 @@ abstract class GlobalStore extends ChangeNotifier {

// More mutators as needed:
// Future<void> updateAccount...

@override
String toString() => '${objectRuntimeType(this, 'GlobalStore')}#${shortHash(this)}';
}

/// Store for the user's data for a given Zulip account.
///
/// This should always have a consistent snapshot of the state on the server,
/// as provided by the Zulip event system.
///
/// An instance directly of this class will not attempt to poll an event queue
/// to keep the data up to date. For that behavior, see the subclass
/// [LivePerAccountStore].
/// This class does not attempt to poll an event queue
/// to keep the data up to date. For that behavior, see
/// [UpdateMachine].
class PerAccountStore extends ChangeNotifier with StreamStore {
/// Create a per-account data store that does not automatically stay up to date.
///
/// For a [PerAccountStore] that polls an event queue to keep itself up to
/// date, use [LivePerAccountStore.fromInitialSnapshot].
factory PerAccountStore.fromInitialSnapshot({
required GlobalStore globalStore,
required Account account,
required ApiConnection connection,
required InitialSnapshot initialSnapshot,
}) {
final streams = StreamStoreImpl(initialSnapshot: initialSnapshot);
return PerAccountStore._(
globalStore: globalStore,
account: account,
connection: connection,
zulipVersion: initialSnapshot.zulipVersion,
Expand All @@ -180,6 +198,7 @@ class PerAccountStore extends ChangeNotifier with StreamStore {
}

PerAccountStore._({
required GlobalStore globalStore,
required this.account,
required this.connection,
required this.zulipVersion,
Expand All @@ -192,7 +211,10 @@ class PerAccountStore extends ChangeNotifier with StreamStore {
required this.users,
required streams,
required this.recentDmConversationsView,
}) : _streams = streams;
}) : _globalStore = globalStore,
_streams = streams;

final GlobalStore _globalStore;

final Account account;
final ApiConnection connection; // TODO(#135): update zulipFeatureLevel with events
Expand Down Expand Up @@ -232,7 +254,6 @@ class PerAccountStore extends ChangeNotifier with StreamStore {

// TODO lots more data. When adding, be sure to update handleEvent too.

// TODO call [RecentDmConversationsView.dispose] in [dispose]
final RecentDmConversationsView recentDmConversationsView;

final Set<MessageListView> _messageListViews = {};
Expand Down Expand Up @@ -260,6 +281,16 @@ class PerAccountStore extends ChangeNotifier with StreamStore {
autocompleteViewManager.reassemble();
}

@override
void dispose() {
unreads.dispose();
recentDmConversationsView.dispose();
for (final view in _messageListViews.toList()) {
view.dispose();
}
super.dispose();
}

void handleEvent(Event event) {
if (event is HeartbeatEvent) {
assert(debugLog("server event: heartbeat"));
Expand Down Expand Up @@ -401,6 +432,9 @@ class PerAccountStore extends ChangeNotifier with StreamStore {
final nonDisplayFields = initialCustomProfileFields.where((e) => e.displayInProfileSummary != true);
return displayFields.followedBy(nonDisplayFields).toList();
}

@override
String toString() => '${objectRuntimeType(this, 'PerAccountStore')}#${shortHash(this)}';
}

const _apiSendMessage = sendMessage; // Bit ugly; for alternatives, see: https://chat.zulip.org/#narrow/stream/243-mobile-team/topic/flutter.3A.20PerAccountStore.20methods/near/1545809
Expand All @@ -410,8 +444,8 @@ const _apiSendMessage = sendMessage; // Bit ugly; for alternatives, see: https:/
/// The underlying data store is an [AppDatabase] corresponding to a SQLite
/// database file in the app's persistent storage on the device.
///
/// The per-account stores will be instances of [LivePerAccountStore],
/// with data loaded through a live [ApiConnection].
/// The per-account stores will use a live [ApiConnection],
/// and will have an associated [UpdateMachine].
class LiveGlobalStore extends GlobalStore {
LiveGlobalStore._({
required AppDatabase db,
Expand Down Expand Up @@ -449,7 +483,7 @@ class LiveGlobalStore extends GlobalStore {

@override
Future<PerAccountStore> loadPerAccount(Account account) async {
final updateMachine = await UpdateMachine.load(account);
final updateMachine = await UpdateMachine.load(this, account);
return updateMachine.store;
}

Expand All @@ -465,6 +499,9 @@ class LiveGlobalStore extends GlobalStore {
..where((a) => a.id.equals(accountId))
).getSingle();
}

@override
String toString() => '${objectRuntimeType(this, 'LiveGlobalStore')}#${shortHash(this)}';
}

/// A [PerAccountStore] plus an event-polling loop to stay up to date.
Expand All @@ -484,7 +521,7 @@ class UpdateMachine {
/// Load the user's data from the server, and start an event queue going.
///
/// In the future this might load an old snapshot from local storage first.
static Future<UpdateMachine> load(Account account) async {
static Future<UpdateMachine> load(GlobalStore globalStore, Account account) async {
final connection = ApiConnection.live(
realmUrl: account.realmUrl, zulipFeatureLevel: account.zulipFeatureLevel,
email: account.email, apiKey: account.apiKey);
Expand All @@ -496,6 +533,7 @@ class UpdateMachine {
if (kDebugMode) print("initial fetch time: ${t.inMilliseconds}ms");

final store = PerAccountStore.fromInitialSnapshot(
globalStore: globalStore,
account: account,
connection: connection,
initialSnapshot: initialSnapshot,
Expand All @@ -513,12 +551,57 @@ class UpdateMachine {
final String queueId;
int lastEventId;

Completer<void>? _debugLoopSignal;

/// In debug mode, causes the polling loop to pause before the next
/// request and wait for [debugAdvanceLoop] to be called.
void debugPauseLoop() {
assert((){
assert(_debugLoopSignal == null);
_debugLoopSignal = Completer();
return true;
}());
}

/// In debug mode, after a call to [debugPauseLoop], causes the
/// polling loop to make one more request and then pause again.
void debugAdvanceLoop() {
assert((){
_debugLoopSignal!.complete();
return true;
}());
}

void poll() async {
while (true) {
final result = await getEvents(store.connection,
queueId: queueId, lastEventId: lastEventId);
// TODO handle errors on get-events; retry with backoff
// TODO abort long-poll and close ApiConnection on [dispose]
if (_debugLoopSignal != null) {
await _debugLoopSignal!.future;
assert(() {
_debugLoopSignal = Completer();
return true;
}());
}

final GetEventsResult result;
try {
result = await getEvents(store.connection,
queueId: queueId, lastEventId: lastEventId);
} catch (e) {
switch (e) {
case ZulipApiException(code: 'BAD_EVENT_QUEUE_ID'):
assert(debugLog('Lost event queue for $store. Replacing…'));
await store._globalStore._reloadPerAccount(store.account);
dispose();
debugLog('… Event queue replaced.');
return;

default:
assert(debugLog('Error polling event queue for $store: $e'));
// TODO(#184) handle errors on get-events; retry with backoff
rethrow;
}
}

final events = result.events;
for (final event in events) {
store.handleEvent(event);
Expand All @@ -537,7 +620,6 @@ class UpdateMachine {
// TODO(#322) save acked token, to dedupe updating it on the server
// TODO(#323) track the registerFcmToken/etc request, warn if not succeeding
Future<void> registerNotificationToken() async {
// TODO call removeListener on [dispose]
NotificationService.instance.token.addListener(_registerNotificationToken);
await _registerNotificationToken();
}
Expand All @@ -547,4 +629,11 @@ class UpdateMachine {
if (token == null) return;
await NotificationService.registerToken(store.connection, token: token);
}

void dispose() { // TODO abort long-poll and close ApiConnection
NotificationService.instance.token.removeListener(_registerNotificationToken);
}

@override
String toString() => '${objectRuntimeType(this, 'UpdateMachine')}#${shortHash(this)}';
}
3 changes: 1 addition & 2 deletions lib/widgets/message_list.dart
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,7 @@ class _MessageListState extends State<MessageList> with PerAccountStoreAwareStat
}

@override
void onNewStore() {
model?.dispose();
void onNewStore() { // TODO(#464) try to keep using old model until new one gets messages
_initModel(PerAccountStoreWidget.of(context));
}

Expand Down
26 changes: 13 additions & 13 deletions lib/widgets/store.dart
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,17 @@ class PerAccountStoreWidget extends StatefulWidget {
/// use [State.didChangeDependencies] instead. For discussion, see
/// [BuildContext.dependOnInheritedWidgetOfExactType].
///
/// A [State] that calls this method in [State.didChangeDependencies]
/// should typically also mix in [PerAccountStoreAwareStateMixin],
/// in order to handle the store itself being replaced with a new store.
/// This happens in particular if the old store's event queue expires
/// on the server.
///
/// See also:
/// * [accountIdOf], for the account ID corresponding to the same data.
/// * [GlobalStoreWidget.of], for the app's data beyond that of a
/// particular account.
/// * [InheritedNotifier], which provides the "dependency" mechanism.
// TODO(#185): Explain in dartdoc that the returned [PerAccountStore] might
// differ from one call to the next, and to handle that with
// [PerAccountStoreAwareStateMixin].
static PerAccountStore of(BuildContext context) {
final widget = context.dependOnInheritedWidgetOfExactType<_PerAccountStoreInheritedWidget>();
assert(widget != null, 'No PerAccountStoreWidget ancestor');
Expand Down Expand Up @@ -192,10 +195,8 @@ class _PerAccountStoreWidgetState extends State<PerAccountStoreWidget> {
if (store != null) {
_setStore(store);
} else {
// If we don't already have data, wait for it.
(() async {
_setStore(await globalStore.perAccount(widget.accountId));
})();
// If we don't already have data, request it.
globalStore.perAccount(widget.accountId);
}
}

Expand Down Expand Up @@ -255,13 +256,11 @@ class LoadingPage extends StatelessWidget {
/// The ambient [PerAccountStore] can be replaced in some circumstances,
/// such as when an event queue expires. See [PerAccountStoreWidget.of].
/// When that happens, stateful widgets should
/// - remove listeners on the old [PerAccountStore], and
/// - stop using the old [PerAccountStore], which will already have
/// been disposed;
/// - add listeners on the new one.
///
/// Use this mixin, overriding [onNewStore], to do that concisely.
// TODO(#185): Until #185, I think [PerAccountStoreWidget.of] never actually
// returns a different [PerAccountStore] from one call to the next.
// But it will, and when it does, we want our [StatefulWidgets] to handle it.
mixin PerAccountStoreAwareStateMixin<T extends StatefulWidget> on State<T> {
PerAccountStore? _store;

Expand All @@ -272,8 +271,9 @@ mixin PerAccountStoreAwareStateMixin<T extends StatefulWidget> on State<T> {
/// and again whenever dependencies change so that [PerAccountStoreWidget.of]
/// would return a different store from previously.
///
/// In this, remove any listeners on the old store
/// and add them on the new store.
/// In this, add any needed listeners on the new store
/// and drop any references to the old store, which will already
/// have been disposed.
void onNewStore();

@override
Expand Down
13 changes: 10 additions & 3 deletions test/example_data.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import 'package:zulip/model/narrow.dart';
import 'package:zulip/model/store.dart';

import 'api/fake_api.dart';
import 'model/test_store.dart';
import 'stdlib_checks.dart';

////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -414,9 +415,13 @@ UpdateMessageFlagsRemoveEvent updateMessageFlagsRemoveEvent(
}

////////////////////////////////////////////////////////////////
// The entire per-account state.
// The entire per-account or global state.
//

GlobalStore globalStore({List<Account> accounts = const []}) {
return TestGlobalStore(accounts: accounts);
}

InitialSnapshot initialSnapshot({
String? queueId,
int? lastEventId,
Expand Down Expand Up @@ -467,9 +472,11 @@ InitialSnapshot initialSnapshot({
const _initialSnapshot = initialSnapshot;

PerAccountStore store({Account? account, InitialSnapshot? initialSnapshot}) {
final effectiveAccount = account ?? selfAccount;
return PerAccountStore.fromInitialSnapshot(
account: account ?? selfAccount,
connection: FakeApiConnection.fromAccount(account ?? selfAccount),
globalStore: globalStore(accounts: [effectiveAccount]),
account: effectiveAccount,
connection: FakeApiConnection.fromAccount(effectiveAccount),
initialSnapshot: initialSnapshot ?? _initialSnapshot(),
);
}
Expand Down
Loading