reactive @Transactional fails with java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking #1527

Closed
mikereiche opened this issue Aug 4, 2022 · 13 comments · Fixed by #1528

@mikereiche
Collaborator

mikereiche commented Aug 4, 2022

From #1524.

The underlying issue is that TransactionInterceptor is being called instead of CouchbaseTransactionInterceptor, so CouchbaseCallbackTransactionManager.execute() is invoked where executeReactive() should be.

@Service
class ScheduleService(
    private val reactiveOperations: ReactiveCouchbaseOperations
) {
    @Transactional
    fun saveScheduleWithTransaction(schedule: Schedule): Mono<Schedule> {
        return reactiveOperations.insertById(Schedule::class.java).one(schedule)
    }
}
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3
	at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
	*__checkpoint ⇢ org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter [DefaultWebFilterChain]
	*__checkpoint ⇢ HTTP POST "/schedule" [ExceptionHandlingWebHandler]
Original Stack Trace:
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.Mono.block(Mono.java:1675) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at org.springframework.data.couchbase.transaction.CouchbaseCallbackTransactionManager.handlePropagation(CouchbaseCallbackTransactionManager.java:191) ~[spring-data-couchbase-5.0.0-M5.jar:5.0.0-M5]
		at org.springframework.data.couchbase.transaction.CouchbaseCallbackTransactionManager.execute(CouchbaseCallbackTransactionManager.java:76) ~[spring-data-couchbase-5.0.0-M5.jar:5.0.0-M5]
		at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:417) ~[spring-tx-6.0.0-M5.jar:6.0.0-M5]
		at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119) ~[spring-tx-6.0.0-M5.jar:6.0.0-M5]
		at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) ~[spring-aop-6.0.0-M5.jar:6.0.0-M5]
		at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763) ~[spring-aop-6.0.0-M5.jar:6.0.0-M5]
		at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:708) ~[spring-aop-6.0.0-M5.jar:6.0.0-M5]
		at com.rbleuse.spring.reactive.couchbase.service.ScheduleService$$EnhancerBySpringCGLIB$$f091fb23.saveScheduleWithTransaction(<generated>) ~[main/:na]
		at com.rbleuse.spring.reactive.couchbase.handler.ScheduleHandler.createSchedule(ScheduleHandler.kt:20) ~[main/:na]
@mikereiche
Collaborator Author

mikereiche commented Aug 4, 2022

I was able to reproduce this. There are a couple of issues: one is the call to block() on a non-blocking thread, and the other is that TransactionInterceptor is used instead of CouchbaseTransactionInterceptor.

If you create a ScheduleService that uses CouchbaseOperations instead of ReactiveCouchbaseOperations, you will avoid the issue (I realize this isn't the same functionality, but it will let you try out transactions in Spring).
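
As a rough illustration of the first issue, here is a JDK-only sketch of the kind of guard that rejects block() on an event-loop thread. It is a model, not Reactor's code: NonBlockingMarker, EventLoopThread and BlockingGuardSketch are hypothetical names that stand in for Reactor's marker-interface check on the current thread.

```java
import java.util.concurrent.Callable;

// Hypothetical stand-in for a marker interface that tags threads which must never block.
interface NonBlockingMarker {}

// Hypothetical stand-in for an event-loop thread such as reactor-http-nio-3.
class EventLoopThread extends Thread implements NonBlockingMarker {
    EventLoopThread(Runnable task, String name) {
        super(task, name);
    }
}

final class BlockingGuardSketch {
    // Refuses to block when the calling thread carries the marker; otherwise runs the source.
    static <T> T blockingGet(Callable<T> source) throws Exception {
        Thread current = Thread.currentThread();
        if (current instanceof NonBlockingMarker) {
            throw new IllegalStateException("blocking is not supported in thread " + current.getName());
        }
        return source.call();
    }

    // Calls blockingGet on a thread of the requested kind and reports the outcome as text.
    static String tryOn(boolean eventLoop) {
        String[] outcome = new String[1];
        Runnable task = () -> {
            try {
                outcome[0] = "ok: " + blockingGet(() -> "saved");
            } catch (Exception e) {
                outcome[0] = "rejected: " + e.getMessage();
            }
        };
        Thread t = eventLoop ? new EventLoopThread(task, "event-loop-1") : new Thread(task, "worker-1");
        t.start();
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for " + t.getName(), e);
        }
        return outcome[0];
    }

    public static void main(String[] args) {
        System.out.println(tryOn(false)); // plain worker thread: the call goes through
        System.out.println(tryOn(true));  // marked event-loop thread: the call is rejected
    }
}
```

The same call succeeds or fails depending only on which thread runs it, which is why a transaction path that calls block() breaks when invoked from the HTTP event loop.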

@rbleuse
Contributor

rbleuse commented Aug 4, 2022

Thanks for your suggestion!

@mikereiche
Collaborator Author

I think you might be missing @EnableTransactionManagement on your configuration class that extends AbstractCouchbaseConfiguration. (If it's on a different class, you might have transaction management, but it won't use CouchbaseTransactionInterceptor, and the default TransactionInterceptor will attempt to execute reactive transactions as non-reactive transactions.)

@Configuration
@EnableCouchbaseRepositories(...)
@EnableTransactionManagement // must be in the class that extends AbstractCouchbaseConfiguration
public class Config extends AbstractCouchbaseConfiguration {
    // overrides of getConnectionString(), getUserName(), getPassword() and getBucketName() omitted
}

If you print the stack trace from inside your @Transactional method, CouchbaseTransactionInterceptor should appear in it:
(new Exception(Thread.currentThread().getName())).printStackTrace();

java.lang.Exception: cb-txn-9
	at com.example.demo.UserService.insertReactive(UserService.java:24)
	at com.example.demo.UserService$$FastClassBySpringCGLIB$$86043490.invoke(<generated>)
	at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:793)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:763)
	at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123)
	at org.springframework.data.couchbase.transaction.CouchbaseTransactionInterceptor.lambda$invokeWithinTransaction$0(CouchbaseTransactionInterceptor.java:63)
	at org.springframework.data.couchbase.transaction.CouchbaseCallbackTransactionManager.lambda$executeNewReactiveTr
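
The manual check above can also be done programmatically. This is a minimal JDK-only sketch; InterceptorStackCheck and stackContains are hypothetical names, not part of Spring or Spring Data Couchbase. It scans the current thread's stack for a frame whose class has the given simple name.

```java
import java.util.Arrays;

final class InterceptorStackCheck {
    // Returns true if any frame on the current thread's stack belongs to a class
    // whose fully qualified name is, or ends with, the given simple class name.
    static boolean stackContains(String simpleClassName) {
        return Arrays.stream(Thread.currentThread().getStackTrace())
                .map(StackTraceElement::getClassName)
                .anyMatch(name -> name.equals(simpleClassName) || name.endsWith("." + simpleClassName));
    }

    public static void main(String[] args) {
        // Outside of a Spring proxy there is no interceptor frame on the stack.
        System.out.println(stackContains("CouchbaseTransactionInterceptor"));
        // The helper's own frame is always on the stack while it runs.
        System.out.println(stackContains("InterceptorStackCheck"));
    }
}
```

Calling stackContains("CouchbaseTransactionInterceptor") from inside the @Transactional method would be expected to return true when the Couchbase interceptor is in use. Matching on "." plus the simple name keeps the plain TransactionInterceptor from being mistaken for it.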

@rbleuse
Contributor

rbleuse commented Aug 4, 2022

Thanks for investigating.

A couple of things:

I don't extend AbstractCouchbaseConfiguration; instead I rely on auto-configuration with these properties:

spring:
  couchbase:
    connection-string: couchbase://127.0.0.1
    username: admin
    password: password
  data:
    couchbase:
      bucket-name: test

And I declared a configuration like this:

@Configuration
@EnableTransactionManagement
class CouchbaseConfig {

    @Bean(BeanNames.COUCHBASE_TRANSACTION_MANAGER)
    fun couchbaseTransactionManager(clientFactory: CouchbaseClientFactory): CouchbaseCallbackTransactionManager {
        return CouchbaseCallbackTransactionManager(clientFactory)
    }
}

This is the setup that produces the current exception.

I then tried your suggestion: I kept the same code base but declared a config that extends AbstractCouchbaseConfiguration instead:

@Configuration
@EnableTransactionManagement
class CouchbaseConfig(
    private val properties: CouchbaseProperties
) : AbstractCouchbaseConfiguration() {
    override fun getConnectionString(): String {
        return properties.connectionString
    }

    override fun getUserName(): String {
        return properties.username
    }

    override fun getPassword(): String {
        return properties.password
    }

    override fun getBucketName(): String {
        return "test"
    }
}

With this change, as you mentioned, I have to set spring.main.allow-bean-definition-overriding: true, otherwise the app doesn't start at all:

The bean 'transactionInterceptor', defined in class path resource [com/rbleuse/spring/reactive/couchbase/config/CouchbaseConfig.class], could not be registered. A bean with that name has already been defined in class path resource [org/springframework/transaction/annotation/ProxyTransactionManagementConfiguration.class] and overriding is disabled.

After adding this configuration property, the app starts fine, but I still get the same exception.
The exception occurs before execution enters my service's saveScheduleWithTransaction function, which is annotated with @Transactional.
If I comment out the annotation, my function executes correctly (without a transaction, of course).

I can push my code to a dummy repo if you would like to investigate with my configuration.

@mikereiche
Collaborator Author

Thanks for your patience.
Yes, if you could make a repo that I could access to reproduce the issue that would help.
If you want to try something yourself, maybe completely abandon the spring: properties and use only the CouchbaseConfig class. It will also need @EnableCouchbaseRepositories. (All our testing is done with only the configuration class.)

> After adding this configuration property, the app starts fine, but I still get the same exception.
> The exception occurs before execution enters my service's saveScheduleWithTransaction function, which is annotated with @Transactional.

I lost track - which exception? The block() exception? Is CouchbaseTransactionInterceptor in the stack trace?

@rbleuse
Contributor

rbleuse commented Aug 4, 2022

> I lost track - which exception? The block() exception? Is CouchbaseTransactionInterceptor in the stack trace?

Yes, the blocking exception. The stack trace is a bit different now, but it still points to CouchbaseCallbackTransactionManager.java:191:

stack trace
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3
	at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
	*__checkpoint ⇢ org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter [DefaultWebFilterChain]
	*__checkpoint ⇢ HTTP POST "/schedule" [ExceptionHandlingWebHandler]
Original Stack Trace:
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.Mono.block(Mono.java:1675) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at org.springframework.data.couchbase.transaction.CouchbaseCallbackTransactionManager.handlePropagation(CouchbaseCallbackTransactionManager.java:191) ~[spring-data-couchbase-5.0.0-M5.jar:5.0.0-M5]
		at org.springframework.data.couchbase.transaction.CouchbaseCallbackTransactionManager.lambda$executeReactive$0(CouchbaseCallbackTransactionManager.java:91) ~[spring-data-couchbase-5.0.0-M5.jar:5.0.0-M5]
		at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:46) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.innerNext(FluxConcatMapNoPrefetch.java:258) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:863) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:122) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onNext(FluxConcatArray.java:201) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.onNext(FluxConcatArray.java:201) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.request(FluxConcatArray.java:276) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.MonoNext$NextSubscriber.request(MonoNext.java:108) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.request(FluxConcatArray.java:276) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.MonoNext$NextSubscriber.request(MonoNext.java:108) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2158) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.request(FluxConcatMapNoPrefetch.java:338) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.MonoNext$NextSubscriber.request(MonoNext.java:108) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2194) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2068) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.MonoNext$NextSubscriber.onSubscribe(MonoNext.java:70) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.onSubscribe(FluxConcatMapNoPrefetch.java:164) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:165) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:87) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.Mono.subscribe(Mono.java:4321) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.netty.http.server.HttpServer$HttpServerHandle.onStateChange(HttpServer.java:967) ~[reactor-netty-http-1.1.0-M4.jar:1.1.0-M4]
		at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:677) ~[reactor-netty-core-1.1.0-M4.jar:1.1.0-M4]
		at reactor.netty.transport.ServerTransport$ChildObserver.onStateChange(ServerTransport.java:478) ~[reactor-netty-core-1.1.0-M4.jar:1.1.0-M4]
		at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:570) ~[reactor-netty-http-1.1.0-M4.jar:1.1.0-M4]
		at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93) ~[reactor-netty-core-1.1.0-M4.jar:1.1.0-M4]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:222) ~[reactor-netty-http-1.1.0-M4.jar:1.1.0-M4]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327) ~[netty-codec-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299) ~[netty-codec-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.79.Final.jar:4.1.79.Final]
		at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

I'll push my code to a repo that reproduces the issue.

@rbleuse
Contributor

rbleuse commented Aug 4, 2022

Here it is: https://github.com/rbleuse/spring-reactive-couchbase
It uses Spring Boot 3.0.0-M4 with the latest Couchbase snapshot.

Just run the Spring Boot app and send a request with curl:
curl --location --request POST 'localhost:8080/schedule'
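
For anyone who prefers to trigger the request from code, here is a rough JDK-only equivalent of the curl command. It assumes Java 11 or later and an explicit http:// scheme (curl's default when none is given); ScheduleRepro and buildRequest are hypothetical names used only for this sketch.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

final class ScheduleRepro {
    // Builds the same request as the curl command: a POST to /schedule with no body.
    static HttpRequest buildRequest() {
        return HttpRequest.newBuilder(URI.create("http://localhost:8080/schedule"))
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) throws Exception {
        // followRedirects(NORMAL) plays the role of curl's --location flag.
        HttpClient client = HttpClient.newBuilder()
                .followRedirects(HttpClient.Redirect.NORMAL)
                .build();
        HttpResponse<String> response =
                client.send(buildRequest(), HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode());
        System.out.println(response.body());
    }
}
```

Running main requires the sample application to be listening on port 8080; the server-side stack trace is what matters for reproducing the issue, not the response body.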

By the way, saving with a repository (without a transaction) has not worked since 5.0.0-M5 (it works fine with M4). I added comments about it, if you want to investigate.
Thanks for your help.

@mikereiche
Collaborator Author

 		at org.springframework.data.couchbase.transaction.CouchbaseCallbackTransactionManager.handlePropagation(CouchbaseCallbackTransactionManager.java:191) ~[spring-data-couchbase-5.0.0-M5.jar:5.0.0-M5]
		at org.springframework.data.couchbase.transaction.CouchbaseCallbackTransactionManager.lambda$executeReactive$0(CouchbaseCallbackTransactionManager.java:91) ~[spring-data-couchbase-5.0.0-M5.jar:5.0.0-M5]

Ok - I can fix this.

@mikereiche
Collaborator Author

@rbleuse - this should work now. I tried it on your project.

@rbleuse
Contributor

rbleuse commented Aug 5, 2022

@mikereiche indeed, the transaction now works with the latest snapshot!

My dummy repo also showcases a second exception, in a call that is currently commented out:
https://github.com/rbleuse/spring-reactive-couchbase/blob/main/src/main/kotlin/com/rbleuse/spring/reactive/couchbase/handler/ScheduleHandler.kt#L21
A simple repository.save(schedule) gives me this exception (it still occurs with the latest snapshot):

stack trace
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-3
	at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
	Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
	*__checkpoint ⇢ org.springframework.boot.actuate.metrics.web.reactive.server.MetricsWebFilter [DefaultWebFilterChain]
	*__checkpoint ⇢ HTTP POST "/schedule" [ExceptionHandlingWebHandler]
Original Stack Trace:
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.Mono.block(Mono.java:1675) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at org.springframework.data.couchbase.core.ReactiveCouchbaseTemplate.save(ReactiveCouchbaseTemplate.java:94) ~[spring-data-couchbase-5.0.0-SNAPSHOT.jar:5.0.0-SNAPSHOT]
		at org.springframework.data.couchbase.repository.support.SimpleReactiveCouchbaseRepository.save(SimpleReactiveCouchbaseRepository.java:100) ~[spring-data-couchbase-5.0.0-SNAPSHOT.jar:5.0.0-SNAPSHOT]
		at org.springframework.data.couchbase.repository.support.SimpleReactiveCouchbaseRepository.save(SimpleReactiveCouchbaseRepository.java:79) ~[spring-data-couchbase-5.0.0-SNAPSHOT.jar:5.0.0-SNAPSHOT]
		at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
		at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77) ~[na:na]
		at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
		at java.base/java.lang.reflect.Method.invoke(Method.java:568) ~[na:na]
		at org.springframework.data.repository.core.support.RepositoryMethodInvoker$RepositoryFragmentMethodInvoker.lambda$new$0(RepositoryMethodInvoker.java:289) ~[spring-data-commons-3.0.0-M5.jar:3.0.0-M5]
		at org.springframework.data.repository.core.support.RepositoryMethodInvoker.doInvoke(RepositoryMethodInvoker.java:137) ~[spring-data-commons-3.0.0-M5.jar:3.0.0-M5]
		at org.springframework.data.repository.core.support.RepositoryMethodInvoker.invoke(RepositoryMethodInvoker.java:121) ~[spring-data-commons-3.0.0-M5.jar:3.0.0-M5]
		at org.springframework.data.repository.core.support.RepositoryComposition$RepositoryFragments.invoke(RepositoryComposition.java:516) ~[spring-data-commons-3.0.0-M5.jar:3.0.0-M5]
		at org.springframework.data.repository.core.support.RepositoryComposition.invoke(RepositoryComposition.java:285) ~[spring-data-commons-3.0.0-M5.jar:3.0.0-M5]
		at org.springframework.data.repository.core.support.RepositoryFactorySupport$ImplementationMethodExecutionInterceptor.invoke(RepositoryFactorySupport.java:628) ~[spring-data-commons-3.0.0-M5.jar:3.0.0-M5]
		at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) ~[spring-aop-6.0.0-M5.jar:6.0.0-M5]
		at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.doInvoke(QueryExecutorMethodInterceptor.java:160) ~[spring-data-commons-3.0.0-M5.jar:3.0.0-M5]
		at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.invoke(QueryExecutorMethodInterceptor.java:135) ~[spring-data-commons-3.0.0-M5.jar:3.0.0-M5]
		at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) ~[spring-aop-6.0.0-M5.jar:6.0.0-M5]
		at org.springframework.data.couchbase.repository.support.CrudMethodMetadataPostProcessor$CrudMethodMetadataPopulatingMethodInterceptor.invoke(CrudMethodMetadataPostProcessor.java:173) ~[spring-data-couchbase-5.0.0-SNAPSHOT.jar:5.0.0-SNAPSHOT]
		at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) ~[spring-aop-6.0.0-M5.jar:6.0.0-M5]
		at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:97) ~[spring-aop-6.0.0-M5.jar:6.0.0-M5]
		at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) ~[spring-aop-6.0.0-M5.jar:6.0.0-M5]
		at org.springframework.data.repository.core.support.MethodInvocationValidator.invoke(MethodInvocationValidator.java:94) ~[spring-data-commons-3.0.0-M5.jar:3.0.0-M5]
		at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184) ~[spring-aop-6.0.0-M5.jar:6.0.0-M5]
		at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215) ~[spring-aop-6.0.0-M5.jar:6.0.0-M5]
		at jdk.proxy2/jdk.proxy2.$Proxy102.save(Unknown Source) ~[na:na]
		at com.rbleuse.spring.reactive.couchbase.service.ScheduleService.saveScheduleWithRepo(ScheduleService.kt:19) ~[main/:na]
		at com.rbleuse.spring.reactive.couchbase.service.ScheduleService$$FastClassBySpringCGLIB$$3d816b45.invoke(<generated>) ~[main/:na]
		at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) ~[spring-core-6.0.0-M5.jar:6.0.0-M5]
		at org.springframework.aop.framework.CglibAopProxy.invokeMethod(CglibAopProxy.java:386) ~[spring-aop-6.0.0-M5.jar:6.0.0-M5]
		at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:704) ~[spring-aop-6.0.0-M5.jar:6.0.0-M5]
		at com.rbleuse.spring.reactive.couchbase.service.ScheduleService$$EnhancerBySpringCGLIB$$1c17fc5d.saveScheduleWithRepo(<generated>) ~[main/:na]
		at com.rbleuse.spring.reactive.couchbase.handler.ScheduleHandler.createSchedule(ScheduleHandler.kt:22) ~[main/:na]
		at org.springframework.web.reactive.function.server.support.HandlerFunctionAdapter.handle(HandlerFunctionAdapter.java:61) ~[spring-webflux-6.0.0-M5.jar:6.0.0-M5]
		at org.springframework.web.reactive.DispatcherHandler.invokeHandler(DispatcherHandler.java:171) ~[spring-webflux-6.0.0-M5.jar:6.0.0-M5]
		at org.springframework.web.reactive.DispatcherHandler.lambda$handle$1(DispatcherHandler.java:153) ~[spring-webflux-6.0.0-M5.jar:6.0.0-M5]
		at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:125) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.innerNext(FluxConcatMapNoPrefetch.java:258) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:863) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2398) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.request(FluxPeekFuseable.java:144) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2158) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.request(FluxConcatMapNoPrefetch.java:338) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.MonoNext$NextSubscriber.request(MonoNext.java:108) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2194) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2068) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.MonoNext$NextSubscriber.onSubscribe(MonoNext.java:70) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.onSubscribe(FluxConcatMapNoPrefetch.java:164) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:165) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:87) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.Mono.subscribe(Mono.java:4321) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:263) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.core.publisher.MonoDeferContextual.subscribe(MonoDeferContextual.java:55) ~[reactor-core-3.5.0-M4.jar:3.5.0-M4]
		at reactor.netty.http.server.HttpServer$HttpServerHandle.onStateChange(HttpServer.java:967) ~[reactor-netty-http-1.1.0-M4.jar:1.1.0-M4]
		at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:677) ~[reactor-netty-core-1.1.0-M4.jar:1.1.0-M4]
		at reactor.netty.transport.ServerTransport$ChildObserver.onStateChange(ServerTransport.java:478) ~[reactor-netty-core-1.1.0-M4.jar:1.1.0-M4]
		at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:570) ~[reactor-netty-http-1.1.0-M4.jar:1.1.0-M4]
		at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:93) ~[reactor-netty-core-1.1.0-M4.jar:1.1.0-M4]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:222) ~[reactor-netty-http-1.1.0-M4.jar:1.1.0-M4]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327) ~[netty-codec-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:299) ~[netty-codec-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) ~[netty-transport-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.79.Final.jar:4.1.79.Final]
		at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.79.Final.jar:4.1.79.Final]
		at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

Should I create another issue for it?

@mikereiche
Collaborator Author

Yes, please open an issue.
Sorry for the issues with block(). We do our testing in Java as shown below, and it does not run on a non-blocking thread.

	@GetMapping("/hello")
	public Mono<ResponseEntity<String>> hello(@RequestParam(value = "name", defaultValue = "World") String name) {
		return Mono
				.just(new ResponseEntity<>(userService.insertReactive().block().toString(), HttpStatusCode.valueOf(200)));
	}

@rbleuse
Contributor

rbleuse commented Aug 5, 2022

No worries, these are milestone/snapshot versions; they're not supposed to be perfect ;)

@rbleuse
Contributor

rbleuse commented Aug 6, 2022

Sorry for the issues with block(). We do our testing in Java as shown below, and it does not run on a non-blocking thread.

	@GetMapping("/hello")
	public Mono<ResponseEntity<String>> hello(@RequestParam(value = "name", defaultValue = "World") String name) {
		return Mono
				.just(new ResponseEntity<>(userService.insertReactive().block().toString(), HttpStatusCode.valueOf(200)));
	}

I didn't notice this earlier, but why is there a block() when this endpoint is supposed to return a Mono?
Wouldn't it work as a non-blocking pipeline? Something like this (untested):

@GetMapping("/hello")
public Mono<ResponseEntity<String>> hello(@RequestParam(value = "name", defaultValue = "World") String name) {
	return userService.insertReactive().map(value -> new ResponseEntity<>(value.toString(), HttpStatusCode.valueOf(200)));
}
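
For readers who want to try the difference between the two endpoint shapes without a Reactor or Couchbase setup, here is a minimal sketch that uses the JDK's CompletableFuture as a stand-in for Mono. This is an analogy only, not the Spring Data Couchbase API: join() plays the role of block(), thenApply() plays the role of map(), and insertAsync, blockingStyle and composedStyle are hypothetical names invented for this illustration.

```java
import java.util.concurrent.CompletableFuture;

// Illustration only: CompletableFuture stands in for Reactor's Mono.
final class BlockingVsComposed {

    // Hypothetical stand-in for userService.insertReactive():
    // the value is produced asynchronously on another thread.
    public static CompletableFuture<String> insertAsync(String name) {
        return CompletableFuture.supplyAsync(() -> "saved:" + name);
    }

    // Same shape as Mono.just(insertReactive().block()...):
    // the calling thread waits for the result, then wraps the finished value.
    public static CompletableFuture<String> blockingStyle(String name) {
        String value = insertAsync(name).join(); // analogous to block()
        return CompletableFuture.completedFuture("200 " + value);
    }

    // Same shape as insertReactive().map(...):
    // the transformation is attached to the pipeline and the caller never waits.
    public static CompletableFuture<String> composedStyle(String name) {
        return insertAsync(name).thenApply(value -> "200 " + value);
    }

    public static void main(String[] args) {
        // join() is used here only to print the results from a plain main thread.
        System.out.println(blockingStyle("World").join());
        System.out.println(composedStyle("World").join());
    }
}
```

Both methods yield the same value. The difference is where the waiting happens: in the blocking style the caller's thread waits, which is what Reactor rejects on a thread such as reactor-http-nio-3.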
