I have an issue of Spring Webflux project. First I made 2 pojo model classes, User and Post.
User.java
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Table("blog_user")
public class User {
@Id
@Column("user_id")
private Long id;
@Column
private String username;
@Column
@JsonIgnore
private String password;
}
Post.java
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Table
public class Post {
@Id
@Column("post_id")
private Long id;
@Column
private String title;
@Column
private String body;
@Column("user_id")
private User user;
}
and relative repository interfaces
public interface UserReactiveMysqlRepository extends ReactiveCrudRepository<User, Long> {
}
public interface PostReactiveMysqlRepository extends ReactiveCrudRepository<Post, Long> {
}
I made the webflux handler class like below, which try to extract user from Mono and put the user value into post class.
@Component
public class PostHandler {
@Autowired
private PostReactiveMysqlRepository postRepository;
@Autowired
private UserReactiveMysqlRepository userRepository;
public Mono<ServerResponse> findAll(ServerRequest request) {
Flux<Post> fluxPost = postRepository.findAll()
.filter(p -> (p.getUser() == null))
.map(p -> {
User u = userRepository.findById(p.getId()).block(); // This line throws Exception.
p.setUser(u);
return p;
});
return ServerResponse.ok().body(fluxPost, Post.class);
}
But the block() api line throws the error messages.
java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83) ~[reactor-core-3.5.1.jar:3.5.1]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Error has been observed at the following site(s):
*__checkpoint ⇢ Handler com.aaa.blog.wf.router.BlogWebFluxEndpointRouter$$Lambda$1434/0x00000008006ce410@7d628cef [DispatcherHandler]
*__checkpoint ⇢ HTTP GET "/route/post/all" [ExceptionHandlingWebHandler]
Original Stack Trace:
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.Mono.block(Mono.java:1710) ~[reactor-core-3.5.1.jar:3.5.1]
at com.aaa.blog.wf.handler.PostHandler.lambda$1(PostHandler.java:47) ~[classes/:na]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:106) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxFilter$FilterSubscriber.onNext(FluxFilter.java:113) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onNext(FluxUsingWhen.java:345) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.innerNext(FluxConcatMapNoPrefetch.java:258) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:863) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxConcatMap$WeakScalarSubscription.request(FluxConcatMap.java:479) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2305) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.request(FluxConcatMapNoPrefetch.java:338) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.request(FluxUsingWhen.java:319) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxFilter$FilterSubscriber.request(FluxFilter.java:186) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2305) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.request(FluxConcatArray.java:276) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.5.1.jar:3.5.1]
at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.request(ChannelSendOperator.java:296) ~[spring-web-6.0.3.jar:6.0.3]
at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.netty.channel.MonoSendMany$SendManyInner.onSubscribe(MonoSendMany.java:254) ~[reactor-netty-core-1.1.1.jar:1.1.1]
at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92) ~[reactor-core-3.5.1.jar:3.5.1]
at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.subscribe(ChannelSendOperator.java:358) ~[spring-web-6.0.3.jar:6.0.3]
at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:67) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.Flux.subscribe(Flux.java:8660) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.netty.channel.MonoSendMany.subscribe(MonoSendMany.java:102) ~[reactor-netty-core-1.1.1.jar:1.1.1]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:240) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203) ~[reactor-core-3.5.1.jar:3.5.1]
at reactor.netty.FutureMono$FutureSubscription.operationComplete(FutureMono.java:196) ~[reactor-netty-core-1.1.1.jar:1.1.1]
at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:583) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:559) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:625) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:105) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
at io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:717) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:272) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.ChannelOutboundBuffer.removeBytes(ChannelOutboundBuffer.java:352) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:421) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:931) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:354) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:895) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:921) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:907) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:893) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.flush(CombinedChannelDuplexHandler.java:531) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:125) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.CombinedChannelDuplexHandler.flush(CombinedChannelDuplexHandler.java:356) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:923) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:907) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:893) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:127) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:923) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:941) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1247) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]
As you know the default web server of Spring WebFlux is netty. And the netty seems not to support block() api function. Some replies advise to change the web server to tomcat, But I think it can not be the correct solution. How can I extract the user class value from Mono<User>? Or does my code contain the wrong grammar?
blockthat is what the error is telling you. Generally if you are usingblockin a reactive pipeline you are doing it wrong and basically loose the reactive benefits. You should probably usemapinstead ofblockto have access to the user and place it in the post. That being said your code also looks wrong as getting the user with the id of the post doesn't look really right. It will also not return what you expect, it will onlyu return the posts that initially had no user attached, so it isn't reallyfindAll.blockwithmapit isn't harder than that.map/flatMapI always get a headache with those ;). This isn't JPA, this is probably Spring Data R2DBC and not JPA (which wouldn't be reactive in anyway).