Skip to content
95 changes: 89 additions & 6 deletions lib/model/store.dart
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ abstract class GlobalStore extends ChangeNotifier {
}

final Map<int, PerAccountStore> _perAccountStores = {};

int get debugNumPerAccountStoresLoading => _perAccountStoresLoading.length;
final Map<int, Future<PerAccountStore>> _perAccountStoresLoading = {};

/// The store's per-account data for the given account, if already loaded.
Expand Down Expand Up @@ -144,7 +146,21 @@ abstract class GlobalStore extends ChangeNotifier {
/// This method should be called only by the implementation of [perAccount].
/// Other callers interested in per-account data should use [perAccount]
/// and/or [perAccountSync].
Future<PerAccountStore> loadPerAccount(int accountId);
Future<PerAccountStore> loadPerAccount(int accountId) async {
assert(_accounts.containsKey(accountId));
final store = await doLoadPerAccount(accountId);
if (!_accounts.containsKey(accountId)) {
// [removeAccount] was called during [doLoadPerAccount].
store.dispose();
throw AccountNotFoundException();
}
return store;
}

/// Load per-account data for the given account, unconditionally.
///
/// This method should be called only by [loadPerAccount].
Future<PerAccountStore> doLoadPerAccount(int accountId);

// Just the Iterables, not the actual Map, to avoid clients mutating the map.
// Mutations should go through the setters/mutators below.
Expand Down Expand Up @@ -192,10 +208,26 @@ abstract class GlobalStore extends ChangeNotifier {
/// Update an account in the underlying data store.
Future<void> doUpdateAccount(int accountId, AccountsCompanion data);

/// Remove an account from the store.
Future<void> removeAccount(int accountId) async {
assert(_accounts.containsKey(accountId));
await doRemoveAccount(accountId);
if (!_accounts.containsKey(accountId)) return; // Already removed.
_accounts.remove(accountId);
_perAccountStores.remove(accountId)?.dispose();
unawaited(_perAccountStoresLoading.remove(accountId));
notifyListeners();
}

/// Remove an account from the underlying data store.
Future<void> doRemoveAccount(int accountId);

@override
String toString() => '${objectRuntimeType(this, 'GlobalStore')}#${shortHash(this)}';
}

class AccountNotFoundException implements Exception {}

/// Store for the user's data for a given Zulip account.
///
/// This should always have a consistent snapshot of the state on the server,
Expand Down Expand Up @@ -303,6 +335,14 @@ class PerAccountStore extends ChangeNotifier with EmojiStore, ChannelStore, Mess
final GlobalStore _globalStore;
final ApiConnection connection; // TODO(#135): update zulipFeatureLevel with events

UpdateMachine? get updateMachine => _updateMachine;
UpdateMachine? _updateMachine;
set updateMachine(UpdateMachine? value) {
assert(_updateMachine == null);
assert(value != null);
_updateMachine = value;
}

bool get isLoading => _isLoading;
bool _isLoading = false;
@visibleForTesting
Expand Down Expand Up @@ -361,6 +401,10 @@ class PerAccountStore extends ChangeNotifier with EmojiStore, ChannelStore, Mess
// Data attached to the self-account on the realm.

final int accountId;

/// The [Account] this store belongs to.
///
/// Will throw if called after [dispose] has been called.
Account get account => _globalStore.getAccount(accountId)!;

/// Always equal to `account.userId`.
Expand Down Expand Up @@ -439,16 +483,24 @@ class PerAccountStore extends ChangeNotifier with EmojiStore, ChannelStore, Mess
autocompleteViewManager.reassemble();
}

bool _disposed = false;

@override
void dispose() {
assert(!_disposed);
recentDmConversationsView.dispose();
unreads.dispose();
_messages.dispose();
typingStatus.dispose();
updateMachine?.dispose();
connection.close();
_disposed = true;
super.dispose();
}

Future<void> handleEvent(Event event) async {
assert(!_disposed);

switch (event) {
case HeartbeatEvent():
assert(debugLog("server event: heartbeat"));
Expand Down Expand Up @@ -590,6 +642,8 @@ class PerAccountStore extends ChangeNotifier with EmojiStore, ChannelStore, Mess
}

Future<void> sendMessage({required MessageDestination destination, required String content}) {
assert(!_disposed);

// TODO implement outbox; see design at
// https://chat.zulip.org/#narrow/stream/243-mobile-team/topic/.23M3881.20Sending.20outbox.20messages.20is.20fraught.20with.20issues/near/1405739
return _apiSendMessage(connection,
Expand Down Expand Up @@ -682,7 +736,7 @@ class LiveGlobalStore extends GlobalStore {
final AppDatabase _db;

@override
Future<PerAccountStore> loadPerAccount(int accountId) async {
Future<PerAccountStore> doLoadPerAccount(int accountId) async {
final updateMachine = await UpdateMachine.load(this, accountId);
return updateMachine.store;
}
Expand All @@ -708,6 +762,14 @@ class LiveGlobalStore extends GlobalStore {
assert(rowsAffected == 1);
}

@override
Future<void> doRemoveAccount(int accountId) async {
final rowsAffected = await (_db.delete(_db.accounts)
..where((a) => a.id.equals(accountId))
).go();
assert(rowsAffected == 1);
}

@override
String toString() => '${objectRuntimeType(this, 'LiveGlobalStore')}#${shortHash(this)}';
}
Expand All @@ -722,7 +784,9 @@ class UpdateMachine {
// case of unauthenticated access to a web-public realm. We authenticated.
throw Exception("bad initial snapshot: missing queueId");
})(),
lastEventId = initialSnapshot.lastEventId;
lastEventId = initialSnapshot.lastEventId {
store.updateMachine = this;
}

/// Load the user's data from the server, and start an event queue going.
///
Expand Down Expand Up @@ -772,6 +836,8 @@ class UpdateMachine {
final String queueId;
int lastEventId;

bool _disposed = false;

static Future<InitialSnapshot> _registerQueueWithRetry(
ApiConnection connection) async {
BackoffMachine? backoffMachine;
Expand Down Expand Up @@ -847,11 +913,13 @@ class UpdateMachine {
}

void poll() async {
BackoffMachine? backoffMachine;
assert(!_disposed);

BackoffMachine? backoffMachine;
while (true) {
if (_debugLoopSignal != null) {
await _debugLoopSignal!.future;
if (_disposed) return;
assert(() {
_debugLoopSignal = Completer();
return true;
Expand All @@ -862,13 +930,16 @@ class UpdateMachine {
try {
result = await getEvents(store.connection,
queueId: queueId, lastEventId: lastEventId);
if (_disposed) return;
} catch (e) {
if (_disposed) return;

store.isLoading = true;
switch (e) {
case ZulipApiException(code: 'BAD_EVENT_QUEUE_ID'):
assert(debugLog('Lost event queue for $store. Replacing…'));
// This disposes the store, which disposes this update machine.
await store._globalStore._reloadPerAccount(store.accountId);
dispose();
debugLog('… Event queue replaced.');
return;

Expand Down Expand Up @@ -911,6 +982,7 @@ class UpdateMachine {
final events = result.events;
for (final event in events) {
await store.handleEvent(event);
if (_disposed) return;
}
if (events.isNotEmpty) {
lastEventId = events.last.id;
Expand All @@ -926,6 +998,7 @@ class UpdateMachine {
// TODO(#322) save acked token, to dedupe updating it on the server
// TODO(#323) track the addFcmToken/etc request, warn if not succeeding
Future<void> registerNotificationToken() async {
assert(!_disposed);
if (!debugEnableRegisterNotificationToken) {
return;
}
Expand All @@ -939,8 +1012,18 @@ class UpdateMachine {
await NotificationService.registerToken(store.connection, token: token);
}

void dispose() { // TODO abort long-poll and close ApiConnection
/// Cleans up resources and tells the instance not to make new API requests.
///
/// After this is called, the instance is not in a usable state
/// and should be abandoned.
///
/// To abort polling mid-request, [store]'s [PerAccountStore.connection]
/// needs to be closed using [ApiConnection.close], which causes in-progress
/// requests to error. [PerAccountStore.dispose] does that.
void dispose() {
assert(!_disposed);
NotificationService.instance.token.removeListener(_registerNotificationToken);
_disposed = true;
}

/// In debug mode, controls whether [fetchEmojiData] should
Expand Down
14 changes: 13 additions & 1 deletion lib/widgets/store.dart
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,19 @@ class _PerAccountStoreWidgetState extends State<PerAccountStoreWidget> {
_setStore(store);
} else {
// If we don't already have data, request it.
globalStore.perAccount(widget.accountId);
() async {
try {
// If this succeeds, globalStore will notify listeners, and
// [didChangeDependencies] will run again, this time in the
// `store != null` case above.
await globalStore.perAccount(widget.accountId);
} on AccountNotFoundException {
// The account was logged out while its store was loading.
// This widget will be showing [placeholder] perpetually,
// but that's OK as long as other code will be removing it from the UI
// (for example by removing a per-account route from the nav).
}
}();
}
}

Expand Down
1 change: 1 addition & 0 deletions test/api/fake_api.dart
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ class FakeApiConnection extends ApiConnection {
String? body,
Duration delay = Duration.zero,
}) {
assert(isOpen);
client.prepare(
exception: exception,
httpStatus: httpStatus, json: json, body: body,
Expand Down
84 changes: 83 additions & 1 deletion test/model/store_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,84 @@ void main() {
// TODO test database gets updated correctly (an integration test with sqlite?)
});

group('GlobalStore.removeAccount', () {
void checkGlobalStore(GlobalStore store, int accountId, {
required bool expectAccount,
required bool expectStore,
}) {
expectAccount
? check(store.getAccount(accountId)).isNotNull()
: check(store.getAccount(accountId)).isNull();
expectStore
? check(store.perAccountSync(accountId)).isNotNull()
: check(store.perAccountSync(accountId)).isNull();
}

test('when store loaded', () async {
final globalStore = eg.globalStore();
await globalStore.add(eg.selfAccount, eg.initialSnapshot());
await globalStore.perAccount(eg.selfAccount.id);

checkGlobalStore(globalStore, eg.selfAccount.id,
expectAccount: true, expectStore: true);
int notifyCount = 0;
globalStore.addListener(() => notifyCount++);

await globalStore.removeAccount(eg.selfAccount.id);

// TODO test that the removed store got disposed and its connection closed
checkGlobalStore(globalStore, eg.selfAccount.id,
expectAccount: false, expectStore: false);
check(notifyCount).equals(1);
});

test('when store not loaded', () async {
final globalStore = eg.globalStore();
await globalStore.add(eg.selfAccount, eg.initialSnapshot());

checkGlobalStore(globalStore, eg.selfAccount.id,
expectAccount: true, expectStore: false);
int notifyCount = 0;
globalStore.addListener(() => notifyCount++);

await globalStore.removeAccount(eg.selfAccount.id);

checkGlobalStore(globalStore, eg.selfAccount.id,
expectAccount: false, expectStore: false);
check(notifyCount).equals(1);
});

test('when store loading', () async {
final globalStore = LoadingTestGlobalStore(accounts: [eg.selfAccount]);
checkGlobalStore(globalStore, eg.selfAccount.id,
expectAccount: true, expectStore: false);

// don't await; we'll complete/await it manually after removeAccount
final loadingFuture = globalStore.perAccount(eg.selfAccount.id);

checkGlobalStore(globalStore, eg.selfAccount.id,
expectAccount: true, expectStore: false);
int notifyCount = 0;
globalStore.addListener(() => notifyCount++);

await globalStore.removeAccount(eg.selfAccount.id);

checkGlobalStore(globalStore, eg.selfAccount.id,
expectAccount: false, expectStore: false);
check(notifyCount).equals(1);

globalStore.completers[eg.selfAccount.id]!.single
.complete(eg.store(account: eg.selfAccount, initialSnapshot: eg.initialSnapshot()));
// TODO test that the never-used store got disposed and its connection closed
await check(loadingFuture).throws<AccountNotFoundException>();
checkGlobalStore(globalStore, eg.selfAccount.id,
expectAccount: false, expectStore: false);
check(notifyCount).equals(1); // no extra notify

check(globalStore.debugNumPerAccountStoresLoading).equals(0);
});
});

group('PerAccountStore.handleEvent', () {
// Mostly this method just dispatches to ChannelStore and MessageStore etc.,
// and so most of the tests live in the test files for those
Expand Down Expand Up @@ -247,6 +325,8 @@ void main() {
test('smoke', () => awaitFakeAsync((async) async {
await prepareStore();
final users = [eg.selfUser, eg.otherUser];

globalStore.useCachedApiConnections = true;
connection.prepare(json: eg.initialSnapshot(realmUsers: users).toJson());
final updateMachine = await UpdateMachine.load(
globalStore, eg.selfAccount.id);
Expand Down Expand Up @@ -274,6 +354,7 @@ void main() {
..zulipMergeBase.equals('6.0')
..zulipFeatureLevel.equals(123);

globalStore.useCachedApiConnections = true;
connection.prepare(json: eg.initialSnapshot(
zulipVersion: '8.0+g9876',
zulipMergeBase: '8.0',
Expand All @@ -292,6 +373,7 @@ void main() {
await prepareStore();

// Try to load, inducing an error in the request.
globalStore.useCachedApiConnections = true;
connection.prepare(exception: Exception('failed'));
final future = UpdateMachine.load(globalStore, eg.selfAccount.id);
bool complete = false;
Expand Down Expand Up @@ -670,7 +752,7 @@ class LoadingTestGlobalStore extends TestGlobalStore {
Map<int, List<Completer<PerAccountStore>>> completers = {};

@override
Future<PerAccountStore> loadPerAccount(int accountId) {
Future<PerAccountStore> doLoadPerAccount(int accountId) {
final completer = Completer<PerAccountStore>();
(completers[accountId] ??= []).add(completer);
return completer.future;
Expand Down
Loading