Skip to content

Commit fa2aa5e

Browse files
committed
store: Replace event queue on expiry
Fixes: #185
1 parent 49eb62f commit fa2aa5e

File tree

3 files changed

+86
-14
lines changed

3 files changed

+86
-14
lines changed

lib/model/store.dart

Lines changed: 37 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import 'package:path/path.dart' as p;
88
import 'package:path_provider/path_provider.dart';
99

1010
import '../api/core.dart';
11+
import '../api/exception.dart';
1112
import '../api/model/events.dart';
1213
import '../api/model/initial_snapshot.dart';
1314
import '../api/model/model.dart';
@@ -96,12 +97,26 @@ abstract class GlobalStore extends ChangeNotifier {
9697
future = loadPerAccount(account!);
9798
_perAccountStoresLoading[accountId] = future;
9899
store = await future;
99-
_perAccountStores[accountId] = store;
100+
_setPerAccount(accountId, store);
100101
_perAccountStoresLoading.remove(accountId);
101-
notifyListeners();
102102
return store;
103103
}
104104

105+
Future<void> _reloadPerAccount(Account account) async {
106+
assert(identical(_accounts[account.id], account));
107+
assert(_perAccountStores.containsKey(account.id));
108+
assert(!_perAccountStoresLoading.containsKey(account.id));
109+
final store = await loadPerAccount(account);
110+
_setPerAccount(account.id, store);
111+
}
112+
113+
void _setPerAccount(int accountId, PerAccountStore store) {
114+
final oldStore = _perAccountStores[accountId];
115+
_perAccountStores[accountId] = store;
116+
notifyListeners();
117+
oldStore?.dispose();
118+
}
119+
105120
/// Load per-account data for the given account, unconditionally.
106121
///
107122
/// This method should be called only by the implementation of [perAccount].
@@ -199,8 +214,6 @@ class PerAccountStore extends ChangeNotifier with StreamStore {
199214
}) : _globalStore = globalStore,
200215
_streams = streams;
201216

202-
// We'll use this in an upcoming commit.
203-
// ignore: unused_field
204217
final GlobalStore _globalStore;
205218

206219
final Account account;
@@ -568,9 +581,26 @@ class UpdateMachine {
568581
}());
569582
}
570583

571-
final result = await getEvents(store.connection,
572-
queueId: queueId, lastEventId: lastEventId);
573-
// TODO handle errors on get-events; retry with backoff
584+
final GetEventsResult result;
585+
try {
586+
result = await getEvents(store.connection,
587+
queueId: queueId, lastEventId: lastEventId);
588+
} catch (e) {
589+
switch (e) {
590+
case ZulipApiException(code: 'BAD_EVENT_QUEUE_ID'):
591+
assert(debugLog('Lost event queue for $store. Replacing…'));
592+
await store._globalStore._reloadPerAccount(store.account);
593+
dispose();
594+
debugLog('… Event queue replaced.');
595+
return;
596+
597+
default:
598+
assert(debugLog('Error polling event queue for $store: $e'));
599+
// TODO(#184) handle errors on get-events; retry with backoff
600+
rethrow;
601+
}
602+
}
603+
574604
final events = result.events;
575605
for (final event in events) {
576606
store.handleEvent(event);

lib/widgets/message_list.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ class _MessageListState extends State<MessageList> with PerAccountStoreAwareStat
186186
}
187187

188188
@override
189-
void onNewStore() {
189+
void onNewStore() { // TODO(#464) try to keep using old model until new one gets messages
190190
model?.dispose();
191191
_initModel(PerAccountStoreWidget.of(context));
192192
}

test/model/store_test.dart

Lines changed: 48 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -140,18 +140,26 @@ void main() {
140140
});
141141

142142
group('UpdateMachine.poll', () {
143+
late TestGlobalStore globalStore;
143144
late UpdateMachine updateMachine;
144145
late PerAccountStore store;
145146
late FakeApiConnection connection;
146147

147-
void prepareStore({int? lastEventId}) {
148-
updateMachine = eg.updateMachine(initialSnapshot: eg.initialSnapshot(
149-
lastEventId: lastEventId,
150-
));
148+
void updateFromGlobalStore() {
149+
updateMachine = globalStore.updateMachines[eg.selfAccount.id]!;
151150
store = updateMachine.store;
151+
assert(identical(store, globalStore.perAccountSync(eg.selfAccount.id)));
152152
connection = store.connection as FakeApiConnection;
153153
}
154154

155+
Future<void> prepareStore({int? lastEventId}) async {
156+
globalStore = TestGlobalStore(accounts: []);
157+
await globalStore.add(eg.selfAccount, eg.initialSnapshot(
158+
lastEventId: lastEventId));
159+
await globalStore.perAccount(eg.selfAccount.id);
160+
updateFromGlobalStore();
161+
}
162+
155163
void checkLastRequest({required int lastEventId}) {
156164
check(connection.lastRequest).isA<http.Request>()
157165
..method.equals('GET')
@@ -163,7 +171,7 @@ void main() {
163171
}
164172

165173
test('loops on success', () async {
166-
prepareStore(lastEventId: 1);
174+
await prepareStore(lastEventId: 1);
167175
check(updateMachine.lastEventId).equals(1);
168176

169177
updateMachine.debugPauseLoop();
@@ -191,7 +199,7 @@ void main() {
191199
});
192200

193201
test('handles events', () async {
194-
prepareStore();
202+
await prepareStore();
195203
updateMachine.debugPauseLoop();
196204
updateMachine.poll();
197205

@@ -206,6 +214,40 @@ void main() {
206214
await Future.delayed(Duration.zero);
207215
check(store.userSettings!.twentyFourHourTime).isTrue();
208216
});
217+
218+
test('handles expired queue', () async {
219+
await prepareStore();
220+
updateMachine.debugPauseLoop();
221+
updateMachine.poll();
222+
check(globalStore.perAccountSync(store.account.id)).identicalTo(store);
223+
224+
// Let the server expire the event queue.
225+
connection.prepare(httpStatus: 400, json: {
226+
'result': 'error', 'code': 'BAD_EVENT_QUEUE_ID',
227+
'queue_id': updateMachine.queueId,
228+
'msg': 'Bad event queue ID: ${updateMachine.queueId}',
229+
});
230+
updateMachine.debugAdvanceLoop();
231+
await null;
232+
await Future.delayed(Duration.zero);
233+
234+
// The global store has a new store.
235+
check(globalStore.perAccountSync(store.account.id)).not(it()..identicalTo(store));
236+
updateFromGlobalStore();
237+
238+
// The new UpdateMachine updates the new store.
239+
updateMachine.debugPauseLoop();
240+
updateMachine.poll();
241+
check(store.userSettings!.twentyFourHourTime).isFalse();
242+
connection.prepare(json: GetEventsResult(events: [
243+
UserSettingsUpdateEvent(id: 2,
244+
property: UserSettingName.twentyFourHourTime, value: true),
245+
], queueId: null).toJson());
246+
updateMachine.debugAdvanceLoop();
247+
await null;
248+
await Future.delayed(Duration.zero);
249+
check(store.userSettings!.twentyFourHourTime).isTrue();
250+
});
209251
});
210252

211253
group('UpdateMachine.registerNotificationToken', () {

0 commit comments

Comments
 (0)