Skip to content

Commit 7091faf

Browse files
committed
Fix handling pause immediately after emit
1 parent 96ea53b commit 7091faf

File tree

2 files changed

+66
-5
lines changed

2 files changed

+66
-5
lines changed

packages/sqlite_async/lib/src/update_notification.dart

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ Stream<T> _throttleStream<T extends Object>({
9797
return Stream.multi((listener) {
9898
T? pendingData;
9999
Timer? activeTimeoutWindow;
100+
var needsTimeoutWindowAfterResume = false;
100101

101102
/// Add pending data, bypassing the active timeout window.
102103
///
@@ -106,6 +107,7 @@ Stream<T> _throttleStream<T extends Object>({
106107
pendingData = null;
107108
listener.addSync(data);
108109
activeTimeoutWindow?.cancel();
110+
activeTimeoutWindow = null;
109111
return true;
110112
} else {
111113
return false;
@@ -118,10 +120,14 @@ Stream<T> _throttleStream<T extends Object>({
118120
if (activeTimeoutWindow == null && !listener.isPaused) {
119121
final didAdd = addPendingEvents();
120122
if (didAdd) {
121-
activeTimeoutWindow = Timer(timeout, () {
122-
activeTimeoutWindow = null;
123-
maybeEmit();
124-
});
123+
if (listener.isPaused) {
124+
needsTimeoutWindowAfterResume = true;
125+
} else {
126+
activeTimeoutWindow = Timer(timeout, () {
127+
activeTimeoutWindow = null;
128+
maybeEmit();
129+
});
130+
}
125131
}
126132
}
127133
}
@@ -152,11 +158,11 @@ Stream<T> _throttleStream<T extends Object>({
152158
}
153159

154160
final subscription = input.listen(onData, onError: onError, onDone: onDone);
155-
var needsTimeoutWindowAfterResume = false;
156161

157162
listener.onPause = () {
158163
needsTimeoutWindowAfterResume = activeTimeoutWindow != null;
159164
activeTimeoutWindow?.cancel();
165+
activeTimeoutWindow = null;
160166
};
161167
listener.onResume = () {
162168
if (needsTimeoutWindowAfterResume) {

packages/sqlite_async/test/update_notification_test.dart

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,61 @@ void main() {
4747
});
4848
});
4949

50+
test('increases delay after pause', () {
51+
fakeAsync((control) {
52+
final source = StreamController<UpdateNotification>(sync: true);
53+
final events = <UpdateNotification>[];
54+
55+
final sub = UpdateNotification.throttleStream(source.stream, timeout)
56+
.listen(null);
57+
sub.onData((event) {
58+
events.add(event);
59+
sub.pause();
60+
});
61+
62+
source.add(UpdateNotification({'a'}));
63+
control.elapse(timeout);
64+
expect(events, hasLength(1));
65+
66+
// Assume the stream stays paused for the timeout window that would
67+
// be created after emitting the notification.
68+
control.elapse(timeout * 2);
69+
source.add(UpdateNotification({'b'}));
70+
control.elapse(timeout * 2);
71+
72+
// A full timeout needs to pass after resuming before a new item is
73+
// emitted.
74+
sub.resume();
75+
expect(events, hasLength(1));
76+
77+
control.elapse(halfTimeout);
78+
expect(events, hasLength(1));
79+
control.elapse(halfTimeout);
80+
expect(events, hasLength(2));
81+
});
82+
});
83+
84+
test('does not introduce artificial delay in pause', () {
85+
fakeAsync((control) {
86+
final source = StreamController<UpdateNotification>(sync: true);
87+
final events = <UpdateNotification>[];
88+
89+
final sub = UpdateNotification.throttleStream(source.stream, timeout)
90+
.listen(events.add);
91+
92+
// Await the initial delay
93+
control.elapse(timeout);
94+
95+
sub.pause();
96+
source.add(UpdateNotification({'a'}));
97+
// Resuming should not introduce a timeout window because no window
98+
// was active when the stream was paused.
99+
sub.resume();
100+
control.flushMicrotasks();
101+
expect(events, hasLength(1));
102+
});
103+
});
104+
50105
test('merges events', () {
51106
fakeAsync((control) {
52107
final source = StreamController<UpdateNotification>(sync: true);

0 commit comments

Comments
 (0)