1

I have an issue of Spring Webflux project. First I made 2 pojo model classes, User and Post.

User.java

@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder 
@Table("blog_user") 
public class User {
     
    @Id
    @Column("user_id")
    private Long id;
    
    @Column
    private String username;
 
    @Column
    @JsonIgnore
    private String password;
}

Post.java

@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Table
public class Post {
 
    @Id
    @Column("post_id")
    private Long id;
    
    @Column
    private String title;
  
    @Column
    private String body;
    
    @Column("user_id")
    private User user;
}

and relative repository interfaces

public interface UserReactiveMysqlRepository extends ReactiveCrudRepository<User, Long> {
}

public interface PostReactiveMysqlRepository extends ReactiveCrudRepository<Post, Long> {
}

I made the webflux handler class like below, which try to extract user from Mono and put the user value into post class.

@Component
public class PostHandler {
 
    @Autowired
    private PostReactiveMysqlRepository postRepository;
    
    @Autowired
    private UserReactiveMysqlRepository userRepository;

    public Mono<ServerResponse> findAll(ServerRequest request) {
        Flux<Post> fluxPost = postRepository.findAll()
                .filter(p -> (p.getUser() == null))
                .map(p -> {
                    User u = userRepository.findById(p.getId()).block();  // This line throws Exception.
                    p.setUser(u);
                    return p;
                });
    
        return ServerResponse.ok().body(fluxPost, Post.class);
    }

But the block() api line throws the error messages.

java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2
    at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83) ~[reactor-core-3.5.1.jar:3.5.1]
    Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
    *__checkpoint ⇢ Handler com.aaa.blog.wf.router.BlogWebFluxEndpointRouter$$Lambda$1434/0x00000008006ce410@7d628cef [DispatcherHandler]
    *__checkpoint ⇢ HTTP GET "/route/post/all" [ExceptionHandlingWebHandler]
Original Stack Trace:
        at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:83) ~[reactor-core-3.5.1.jar:3.5.1]
        at reactor.core.publisher.Mono.block(Mono.java:1710) ~[reactor-core-3.5.1.jar:3.5.1]
        at com.aaa.blog.wf.handler.PostHandler.lambda$1(PostHandler.java:47) ~[classes/:na]
        at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:106) ~[reactor-core-3.5.1.jar:3.5.1]
        at reactor.core.publisher.FluxFilter$FilterSubscriber.onNext(FluxFilter.java:113) ~[reactor-core-3.5.1.jar:3.5.1]
        at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onNext(FluxUsingWhen.java:345) ~[reactor-core-3.5.1.jar:3.5.1]
        at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.innerNext(FluxConcatMapNoPrefetch.java:258) ~[reactor-core-3.5.1.jar:3.5.1]
        at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:863) ~[reactor-core-3.5.1.jar:3.5.1]
        at reactor.core.publisher.FluxConcatMap$WeakScalarSubscription.request(FluxConcatMap.java:479) ~[reactor-core-3.5.1.jar:3.5.1]
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2305) ~[reactor-core-3.5.1.jar:3.5.1]
        at reactor.core.publisher.FluxConcatMapNoPrefetch$FluxConcatMapNoPrefetchSubscriber.request(FluxConcatMapNoPrefetch.java:338) ~[reactor-core-3.5.1.jar:3.5.1]
        at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.request(FluxUsingWhen.java:319) ~[reactor-core-3.5.1.jar:3.5.1]
        at reactor.core.publisher.FluxFilter$FilterSubscriber.request(FluxFilter.java:186) ~[reactor-core-3.5.1.jar:3.5.1]
        at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164) ~[reactor-core-3.5.1.jar:3.5.1]
        at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164) ~[reactor-core-3.5.1.jar:3.5.1]
        at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.request(Operators.java:2305) ~[reactor-core-3.5.1.jar:3.5.1]
        at reactor.core.publisher.FluxConcatArray$ConcatArraySubscriber.request(FluxConcatArray.java:276) ~[reactor-core-3.5.1.jar:3.5.1]
        at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.5.1.jar:3.5.1]
        at reactor.core.publisher.FluxPeek$PeekSubscriber.request(FluxPeek.java:138) ~[reactor-core-3.5.1.jar:3.5.1]
        at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.request(ChannelSendOperator.java:296) ~[spring-web-6.0.3.jar:6.0.3]
        at reactor.core.publisher.FluxMap$MapSubscriber.request(FluxMap.java:164) ~[reactor-core-3.5.1.jar:3.5.1]
        at reactor.netty.channel.MonoSendMany$SendManyInner.onSubscribe(MonoSendMany.java:254) ~[reactor-netty-core-1.1.1.jar:1.1.1]
        at reactor.core.publisher.FluxMap$MapSubscriber.onSubscribe(FluxMap.java:92) ~[reactor-core-3.5.1.jar:3.5.1]
        at org.springframework.http.server.reactive.ChannelSendOperator$WriteBarrier.subscribe(ChannelSendOperator.java:358) ~[spring-web-6.0.3.jar:6.0.3]
        at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:67) ~[reactor-core-3.5.1.jar:3.5.1]
        at reactor.core.publisher.Flux.subscribe(Flux.java:8660) ~[reactor-core-3.5.1.jar:3.5.1]
        at reactor.netty.channel.MonoSendMany.subscribe(MonoSendMany.java:102) ~[reactor-netty-core-1.1.1.jar:1.1.1]
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:240) ~[reactor-core-3.5.1.jar:3.5.1]
        at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onComplete(MonoIgnoreThen.java:203) ~[reactor-core-3.5.1.jar:3.5.1]
        at reactor.netty.FutureMono$FutureSubscription.operationComplete(FutureMono.java:196) ~[reactor-netty-core-1.1.1.jar:1.1.1]
        at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:590) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:583) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:559) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:492) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.util.concurrent.DefaultPromise.setValue0(DefaultPromise.java:636) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.util.concurrent.DefaultPromise.setSuccess0(DefaultPromise.java:625) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:105) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.util.internal.PromiseNotificationUtil.trySuccess(PromiseNotificationUtil.java:48) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.ChannelOutboundBuffer.safeSuccess(ChannelOutboundBuffer.java:717) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.ChannelOutboundBuffer.remove(ChannelOutboundBuffer.java:272) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.ChannelOutboundBuffer.removeBytes(ChannelOutboundBuffer.java:352) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:421) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.AbstractChannel$AbstractUnsafe.flush0(AbstractChannel.java:931) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.flush0(AbstractNioChannel.java:354) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.AbstractChannel$AbstractUnsafe.flush(AbstractChannel.java:895) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.DefaultChannelPipeline$HeadContext.flush(DefaultChannelPipeline.java:1372) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:921) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:907) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:893) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.flush(CombinedChannelDuplexHandler.java:531) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:125) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.CombinedChannelDuplexHandler.flush(CombinedChannelDuplexHandler.java:356) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:923) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:907) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:893) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:127) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:923) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:941) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1247) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:174) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:167) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:569) ~[netty-transport-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.86.Final.jar:4.1.86.Final]
        at java.base/java.lang.Thread.run(Thread.java:833) ~[na:na]

As you know the default web server of Spring WebFlux is netty. And the netty seems not to support block() api function. Some replies advise to change the web server to tomcat, But I think it can not be the correct solution. How can I extract the user class value from Mono<User>? Or does my code contain the wrong grammar?

6
  • You shouldn't be using block that is what the error is telling you. Generally if you are using block in a reactive pipeline you are doing it wrong and basically loose the reactive benefits. You should probably use map instead of block to have access to the user and place it in the post. That being said your code also looks wrong as getting the user with the id of the post doesn't look really right. It will also not return what you expect, it will onlyu return the posts that initially had no user attached, so it isn't really findAll. Commented Jan 5, 2023 at 10:40
  • Thanks for your reply. Would you inform me of the sample codes, plz? Commented Jan 5, 2023 at 11:43
  • Replace block with map it isn't harder than that. Commented Jan 5, 2023 at 12:35
  • @M.Deinum : you mean flatMap I think. But, I have the impression that you try to do JPA job manually. I think that solving the user relation should be delegated to JPA. Maybe you miss an annotation somewhere. Commented Jan 5, 2023 at 12:44
  • I always use code completion for map/flatMap I always get a headache with those ;). This isn't JPA, this is probably Spring Data R2DBC and not JPA (which wouldn't be reactive in anyway). Commented Jan 5, 2023 at 12:45

1 Answer 1

3

Blocking code is not allowed on reactive schedulers. You need to define a flow using reactive API ‘map/flatMap‘ and return it from your controller.

From what I see PostReactiveMysqlRepository is reactive and userRepository.findById() returns Mono<User> you can use

public Mono<ServerResponse> findAll(ServerRequest request) {
    Flux<Post> fluxPost = postRepository.findAll()
            .filter(p -> (p.getUser() == null))
            .flatMap(p -> 
                    userRepository.findById(p.getId()
                            .map(user -> {
                                p.setUser(user);
                                return p;
                            })
            );

    return ServerResponse.ok().body(fluxPost, Post.class);
}

The key points here are

  • use flatMap instead of map in case operation is async (returns Mono or Flux)
  • use map instead of block to complete the reactive flow
Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.