|
21 | 21 | import static org.neo4j.driver.internal.reactive.RxUtils.createEmptyPublisher; |
22 | 22 | import static org.neo4j.driver.internal.reactive.RxUtils.createSingleItemPublisher; |
23 | 23 |
|
24 | | -import java.util.Optional; |
25 | 24 | import java.util.Set; |
26 | 25 | import java.util.concurrent.CompletableFuture; |
27 | 26 | import java.util.concurrent.CompletionException; |
@@ -151,65 +150,27 @@ public Set<Bookmark> lastBookmarks() { |
151 | 150 | protected <T> Publisher<T> run(Query query, TransactionConfig config, Function<RxResultCursor, T> cursorToResult) { |
152 | 151 | var cursorPublishFuture = new CompletableFuture<RxResultCursor>(); |
153 | 152 | var cursorReference = new AtomicReference<RxResultCursor>(); |
154 | | - return Mono.<T>create(sink -> { |
155 | | - var state = new SinkState<T>(); |
156 | | - sink.onRequest(ignored -> { |
157 | | - CompletionStage<T> stage; |
158 | | - synchronized (state) { |
159 | | - if (state.isCancelled()) { |
160 | | - return; |
161 | | - } |
162 | | - if (state.getStage() != null) { |
163 | | - return; |
164 | | - } |
165 | | - stage = runAsStage(query, config, cursorPublishFuture) |
166 | | - .thenApply(cursor -> { |
167 | | - cursorReference.set(cursor); |
168 | | - return cursor; |
169 | | - }) |
170 | | - .thenApply(cursorToResult); |
171 | | - state.setStage(stage); |
172 | | - } |
173 | | - stage.whenComplete((item, completionError) -> { |
174 | | - if (completionError == null) { |
175 | | - if (item != null) { |
176 | | - sink.success(item); |
177 | | - } else { |
178 | | - sink.error( |
179 | | - new IllegalStateException( |
180 | | - "Unexpected condition, run call has completed successfully with result being null")); |
181 | | - } |
182 | | - } else { |
183 | | - var error = Optional.ofNullable(Futures.completionExceptionCause(completionError)) |
184 | | - .orElse(completionError); |
185 | | - sink.error(error); |
186 | | - } |
187 | | - }); |
188 | | - }); |
189 | | - sink.onCancel(() -> { |
190 | | - CompletionStage<T> stage; |
191 | | - synchronized (state) { |
192 | | - if (state.isCancelled()) { |
193 | | - return; |
| 153 | + |
| 154 | + return createSingleItemPublisher( |
| 155 | + () -> runAsStage(query, config, cursorPublishFuture) |
| 156 | + .thenApply(cursor -> { |
| 157 | + cursorReference.set(cursor); |
| 158 | + return cursor; |
| 159 | + }) |
| 160 | + .thenApply(cursorToResult), |
| 161 | + () -> new IllegalStateException( |
| 162 | + "Unexpected condition, run call has completed successfully with result being null"), |
| 163 | + value -> { |
| 164 | + if (value != null) { |
| 165 | + cursorReference.get().rollback().whenComplete((unused, throwable) -> { |
| 166 | + if (throwable != null) { |
| 167 | + cursorPublishFuture.completeExceptionally(throwable); |
| 168 | + } else { |
| 169 | + cursorPublishFuture.complete(null); |
| 170 | + } |
| 171 | + }); |
194 | 172 | } |
195 | | - state.setCancelled(true); |
196 | | - stage = state.getStage(); |
197 | | - } |
198 | | - if (stage != null) { |
199 | | - stage.whenComplete((value, ignored) -> { |
200 | | - if (value != null) { |
201 | | - cursorReference.get().rollback().whenComplete((unused, throwable) -> { |
202 | | - if (throwable != null) { |
203 | | - cursorPublishFuture.completeExceptionally(throwable); |
204 | | - } else { |
205 | | - cursorPublishFuture.complete(null); |
206 | | - } |
207 | | - }); |
208 | | - } |
209 | | - }); |
210 | | - } |
211 | | - }); |
212 | | - }) |
| 173 | + }) |
213 | 174 | .doOnNext(value -> cursorPublishFuture.complete(cursorReference.get())) |
214 | 175 | .doOnError(cursorPublishFuture::completeExceptionally); |
215 | 176 | } |
@@ -256,25 +217,4 @@ private <T> CompletionStage<T> releaseConnectionAndRethrow(Throwable throwable) |
256 | 217 | protected <T> Publisher<T> doClose() { |
257 | 218 | return createEmptyPublisher(session::closeAsync); |
258 | 219 | } |
259 | | - |
260 | | - private static class SinkState<T> { |
261 | | - private CompletionStage<T> stage; |
262 | | - private boolean cancelled; |
263 | | - |
264 | | - public CompletionStage<T> getStage() { |
265 | | - return stage; |
266 | | - } |
267 | | - |
268 | | - public void setStage(CompletionStage<T> stage) { |
269 | | - this.stage = stage; |
270 | | - } |
271 | | - |
272 | | - public boolean isCancelled() { |
273 | | - return cancelled; |
274 | | - } |
275 | | - |
276 | | - public void setCancelled(boolean cancelled) { |
277 | | - this.cancelled = cancelled; |
278 | | - } |
279 | | - } |
280 | 220 | } |
0 commit comments