Skip to content

Use of block on save document with reactive repository #1530

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
rbleuse opened this issue Aug 5, 2022 · 1 comment · Fixed by #1537
Closed

Use of block on save document with reactive repository #1530

rbleuse opened this issue Aug 5, 2022 · 1 comment · Fixed by #1537
Labels
status: waiting-for-triage An issue we've not yet triaged type: bug A general bug

Comments

@rbleuse
Copy link
Contributor

rbleuse commented Aug 5, 2022

A simple repository.save(schedule) gives me this exception :

stack trace
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3
	at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
	*__checkpoint ⇢ org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter [DefaultWebFilterChain]
	*__checkpoint ⇢ HTTP POST "/schedule" [ExceptionHandlingWebHandler]
Original Stack Trace:
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.Mono.block(Mono.java:1675) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at org.springframework.data.couchbase.core.ReactiveCouchbaseTemplate.save(ReactiveCouchbaseTemplate.java:94) ~[spring-data-couchbase-5.0.0-SNAPSHOT.jar:5.0.0-SNAPSHOT]
		at org.springframework.data.couchbase.repository.support.SimpleReactiveCouchbaseRepository.save(SimpleReactiveCouchbaseRepository.java:100) ~[spring-data-couchbase-5.0.0-SNAPSHOT.jar:5.0.0-SNAPSHOT]
		at org.springframework.data.couchbase.repository.support.SimpleReactiveCouchbaseRepository.save(SimpleReactiveCouchbaseRepository.java:79) ~[spring-data-couchbase-5.0.0-SNAPSHOT.jar:5.0.0-SNAPSHOT]
		at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
		at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
		at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
		at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
		at org.springframework.data.repository.core.support.RepositoryMethodInvoker$RepositoryFragmentMethodInvoker.lambda$new$0(RepositoryMethodInvoker.java:289) ~[spring-data-commons-3.0.0-M5.jar:3.0.0-M5]
		at org.springframework.data.repository.core.support.RepositoryMethodInvoker.doInvoke(RepositoryMethodInvoker.java:137) ~[spring-data-commons-3.0.0-M5.jar:3.0.0-M5]
		at org.springframework.data.repository.core.support.RepositoryMethodInvoker.invoke(RepositoryMethodInvoker.java:121) ~[spring-data-commons-3.0.0-M5.jar:3.0.0-M5]
		at org.springframework.data.repository.core.support.RepositoryComposition$RepositoryFragments.invoke(RepositoryComposition.java:516) ~[spring-data-commons-3.0.0-M5.jar:3.0.0-M5]
		at org.springframework.data.repository.core.support.RepositoryComposition.invoke(RepositoryComposition.java:285) ~[spring-data-commons-3.0.0-M5.jar:3.0.0-M5]
		at org.springframework.data.repository.core.support.RepositoryFactorySupport$ImplementationMethodExecutionInterceptor.invoke(RepositoryFactorySupport.java:628) ~[spring-data-commons-3.0.0-M5.jar:3.0.0-M5]
		at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) ~[spring-aop-6.0.0-M5.jar:6.0.0-M5]
		at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.doInvoke(QueryExecutorMethodInterceptor.java:160) ~[spring-data-commons-3.0.0-M5.jar:3.0.0-M5]
		at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.invoke(QueryExecutorMethodInterceptor.java:135) ~[spring-data-commons-3.0.0-M5.jar:3.0.0-M5]
		at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) ~[spring-aop-6.0.0-M5.jar:6.0.0-M5]
		at org.springframework.data.couchbase.repository.support.CrudMethodMetadataPostProcessor$CrudMethodMetadataPopulatingMethodInterceptor.invoke(CrudMethodMetadataPostProcessor.java:173) ~[spring-data-couchbase-5.0.0-SNAPSHOT.jar:5.0.0-SNAPSHOT]
		at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) ~[spring-aop-6.0.0-M5.jar:6.0.0-M5]
		at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:97) ~[spring-aop-6.0.0-M5.jar:6.0.0-M5]
		at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) ~[spring-aop-6.0.0-M5.jar:6.0.0-M5]
		at org.springframework.data.repository.core.support.MethodInvocationValidator.invoke(MethodInvocationValidator.java:94) ~[spring-data-commons-3.0.0-M5.jar:3.0.0-M5]
		at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) ~[spring-aop-6.0.0-M5.jar:6.0.0-M5]
		at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) ~[spring-aop-6.0.0-M5.jar:6.0.0-M5]
		at jdk.proxy2/jdk.proxy2.$Proxy102.save(Unknown Source) ~[na:na]
		at com.rbleuse.spring.reactive.couchbase.service.ScheduleService.saveScheduleWithRepo(ScheduleService.kt:19) ~[main/:na]
		at com.rbleuse.spring.reactive.couchbase.service.ScheduleService$$FastClassBySpringCGLIB$$3d816b45.invoke(<generated>) ~[main/:na]
		at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) ~[spring-core-6.0.0-M5.jar:6.0.0-M5]
		at org.springframework.aop.framework.CglibAopProxy.invokeMethod(CglibAopProxy.java:386) ~[spring-aop-6.0.0-M5.jar:6.0.0-M5]
		at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:704) ~[spring-aop-6.0.0-M5.jar:6.0.0-M5]
		at com.rbleuse.spring.reactive.couchbase.service.ScheduleService$$EnhancerBySpringCGLIB$$1c17fc5d.saveScheduleWithRepo(<generated>) ~[main/:na]
		at com.rbleuse.spring.reactive.couchbase.handler.ScheduleHandler.createSchedule(ScheduleHandler.kt:22) ~[main/:na]
		at org.springframework.web.reactive.function.server.support.HandlerFunctionAdapter.handle(HandlerFunctionAdapter.java:61) ~[spring-webflux-6.0.0-M5.jar:6.0.0-M5]
		at org.springframework.web.reactive.DispatcherHandler.invokeHandler(DispatcherHandler.java:171) ~[spring-webflux-6.0.0-M5.jar:6.0.0-M5]
		at org.springframework.web.reactive.DispatcherHandler.lambda$handle$1(DispatcherHandler.java:153) ~[spring-webflux-6.0.0-M5.jar:6.0.0-M5]
		at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.innerNext(FluxConcatMapNoPrefetch.java:258) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:863) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2158) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.request(FluxConcatMapNoPrefetch.java:338) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.MonoNext$NextSubscriber.request(MonoNext.java:108) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2194) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2068) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.MonoNext$NextSubscriber.onSubscribe(MonoNext.java:70) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.onSubscribe(FluxConcatMapNoPrefetch.java:164) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:165) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:87) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.Mono.subscribe(Mono.java:4321) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.netty.http.server.HttpServer$HttpServerHandle.onStateChange(HttpServer.java:967) ~[reactor-netty-http-1.1.0-M4.jar:1.1.0-M4]
		at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:677) ~[reactor-netty-core-1.1.0-M4.jar:1.1.0-M4]
		at reactor.netty.transport.ServerTransport$ChildObserver.onStateChange(ServerTransport.java:478) ~[reactor-netty-core-1.1.0-M4.jar:1.1.0-M4]
		at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:570) ~[reactor-netty-http-1.1.0-M4.jar:1.1.0-M4]
		at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93) ~[reactor-netty-core-1.1.0-M4.jar:1.1.0-M4]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:222) ~[reactor-netty-http-1.1.0-M4.jar:1.1.0-M4]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327) ~[netty-codec-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299) ~[netty-codec-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.79.Final.jar:4.1.79.Final]
		at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

Originally posted by @rbleuse in #1527 (comment)

@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged label Aug 5, 2022
@mikereiche
Copy link
Collaborator

Workaround is to use template.insertById() or upsertById() or replaceById()

@mikereiche mikereiche added the type: bug A general bug label Aug 9, 2022
mikereiche added a commit that referenced this issue Aug 11, 2022
mikereiche added a commit that referenced this issue Aug 12, 2022
* Fix block() issues with reactive transactions.

Closes #1530.

* Removing the form of checkForTransactionInThreadLocalStorage (#1538)

that does not use TransactionMarkerOwner.  As this bypasses
a critical check on whether we are inside a blocking
transaction.

Two new tests in SDKTransactionsSaveIntegrationTests
(reactiveSaveInBlockingTransaction and blockingSaveInBlockingTransaction)
will fail without this, as the .save() does not realise
it's in a transaction.

Adding new tests, for .save() and for performing transactions
in reactor blocking threads.

Co-authored-by: Graham Pople <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
status: waiting-for-triage An issue we've not yet triaged type: bug A general bug
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants