@@ -101,6 +101,19 @@ final class ValueConcurrentObserver<Reducer: ValueReducer, Scheduler: ValueObser
101101 }
102102 }
103103
104+ /// The fetching state for observation of constant regions.
105+ enum FetchingState {
106+ /// No need to fetch.
107+ case idle
108+
109+ /// Waiting for a fetched value.
110+ case fetching
111+
112+ /// Waiting for a fetched value, and for a subsequent fetch after
113+ /// that, because a change has been detected as we were fetching.
114+ case fetchingAndNeedsFetch
115+ }
116+
104117 /// Ability to notify observation events
105118 private struct NotificationCallbacks {
106119 let events : ValueObservationEvents
@@ -132,6 +145,9 @@ final class ValueConcurrentObserver<Reducer: ValueReducer, Scheduler: ValueObser
132145 /// Ability to notify observation events, protected by `lock`.
133146 private var notificationCallbacks : NotificationCallbacks ?
134147
148+ /// The fetching state for observation of constant regions.
149+ @LockedBox private var fetchingState = FetchingState . idle
150+
135151 /// Support for `TransactionObserver`, protected by the serialized writer
136152 /// dispatch queue.
137153 private var observationState = ObservationState . notObserving
@@ -692,13 +708,10 @@ extension ValueConcurrentObserver: TransactionObserver {
692708 events. databaseDidChange ? ( )
693709
694710 // Fetch
695- let future : DatabaseFuture < Reducer . Fetched >
696-
697711 switch trackingMode {
698712 case . constantRegion, . constantRegionRecordedFromSelection:
699- future = databaseAccess. dbPool. concurrentRead { db in
700- try databaseAccess. fetch ( db)
701- }
713+ setNeedsFetching ( databaseAccess: databaseAccess)
714+
702715 case . nonConstantRegionRecordedFromSelection:
703716 // When the tracked region is not constant, we can't perform
704717 // concurrent fetches of observed values.
@@ -723,26 +736,63 @@ extension ValueConcurrentObserver: TransactionObserver {
723736 }
724737
725738 observationState. region = observedRegion
726- future = DatabaseFuture ( . success( fetchedValue) )
739+ reduce ( . success( fetchedValue) )
727740 } catch {
728741 stopDatabaseObservation ( writerDB)
729742 notifyError ( error)
730743 return
731744 }
732745 }
733-
734- // Reduce
735- //
736- // Reducing is performed asynchronously, so that we do not lock
737- // the writer dispatch queue longer than necessary.
738- //
739- // Important: reduceQueue.async guarantees the same ordering between
740- // transactions and notifications!
746+ }
747+
748+ private func setNeedsFetching( databaseAccess: DatabaseAccess ) {
749+ $fetchingState. update { state in
750+ switch state {
751+ case . idle:
752+ state = . fetching
753+ asyncFetch ( databaseAccess: databaseAccess)
754+
755+ case . fetching:
756+ state = . fetchingAndNeedsFetch
757+
758+ case . fetchingAndNeedsFetch:
759+ break
760+ }
761+ }
762+ }
763+
764+ private func asyncFetch( databaseAccess: DatabaseAccess ) {
765+ databaseAccess. dbPool. asyncRead { [ self ] dbResult in
766+ let isNotifying = self . lock. synchronized { self . notificationCallbacks != nil }
767+ guard isNotifying else { return /* Cancelled */ }
768+
769+ let fetchResult = dbResult. flatMap { db in
770+ Result { try databaseAccess. fetch ( db) }
771+ }
772+
773+ self . reduce ( fetchResult)
774+
775+ $fetchingState. update { state in
776+ switch state {
777+ case . idle:
778+ // GRDB bug
779+ preconditionFailure ( )
780+
781+ case . fetching:
782+ state = . idle
783+
784+ case . fetchingAndNeedsFetch:
785+ state = . fetching
786+ asyncFetch ( databaseAccess: databaseAccess)
787+ }
788+ }
789+ }
790+ }
791+
792+ private func reduce( _ fetchResult: Result < Reducer . Fetched , Error > ) {
741793 reduceQueue. async {
742794 do {
743- // Wait until fetch has completed
744- // TODO: find a way to guarantee correct ordering without waiting for a semaphore and blocking a thread.
745- let fetchedValue = try future. wait ( )
795+ let fetchedValue = try fetchResult. get ( )
746796
747797 let isNotifying = self . lock. synchronized { self . notificationCallbacks != nil }
748798 guard isNotifying else { return /* Cancelled */ }
0 commit comments