I have developed an integration flow that reads users from a MongoDbMessageSource and, for each social medium associated with a user, fetches the comments addressed to that user.

I want to persist those comments in MongoDB with the help of a MongoDbStoringMessageHandler subscribed to the channel storeChannel.

The flow is as follows:

@Configuration
@IntegrationComponentScan
public class InfrastructureConfiguration {

    private static final Logger logger = LoggerFactory.getLogger(InfrastructureConfiguration.class);

    /**
     * The Pollers builder factory can be used to configure common bean definitions or 
     * those created from IntegrationFlowBuilder EIP-methods
     */
    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata poller() {
        return Pollers.fixedDelay(10, TimeUnit.SECONDS).get();
    }

    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setQueueCapacity(25);
        return executor;
    }

    /**
     * 
     * MongoDbMessageSource is an instance of MessageSource which returns a Message with a payload 
     * which is the result of execution of a Query
     */
    @Bean
    @Autowired
    public MessageSource<Object> mongoMessageSource(MongoDbFactory mongo) {
        MongoDbMessageSource messageSource = new MongoDbMessageSource(mongo, new LiteralExpression("{}"));
        messageSource.setExpectSingleResult(false);
        messageSource.setEntityClass(UserEntity.class);
        messageSource.setCollectionNameExpression(new LiteralExpression("users"));
        return messageSource;
    }

    @Bean
    @ServiceActivator(inputChannel = "storeChannel")
    public MessageHandler mongodbAdapter(MongoDbFactory mongo) throws Exception {
        MongoDbStoringMessageHandler adapter = new MongoDbStoringMessageHandler(mongo);
        adapter.setCollectionNameExpression(new LiteralExpression("comments"));
        return adapter;
    }

    @Bean
    @Autowired
    public IntegrationFlow processUsers(MongoDbFactory mongo, PollerMetadata poller) {
        return IntegrationFlows.from(mongoMessageSource(mongo), c -> c.poller(poller))
                .<List<UserEntity>, Map<ObjectId, List<SocialMediaEntity>>>transform(userEntitiesList
                        -> userEntitiesList.stream().collect(Collectors.toMap(UserEntity::getId, UserEntity::getSocialMedia))
                )
                .split(new AbstractMessageSplitter() {
                    @Override
                    protected Object splitMessage(Message<?> msg) {
                        return ((Map<ObjectId, List<SocialMediaEntity>>) msg.getPayload()).entrySet();
                    }
                })
                .channel("directChannel_1")
                .enrichHeaders(s -> s.headerExpressions(h -> h.put("user-id", "payload.key")))
                .split(new AbstractMessageSplitter() {
                    @Override
                    protected Object splitMessage(Message<?> msg) {
                        return ((Entry<ObjectId, List<SocialMediaEntity>>) msg.getPayload()).getValue();
                    }
                })
                .channel(MessageChannels.executor("executorChannel", this.taskExecutor()))
                .<SocialMediaEntity, SocialMediaTypeEnum>route(p -> p.getType(),
                        m
                        -> m.subFlowMapping(SocialMediaTypeEnum.FACEBOOK, sf -> sf.handle(new GenericHandler<SocialMediaEntity>() {
                                @Override
                                public Object handle(SocialMediaEntity payload, Map<String, Object> headers) {
                                    ObjectId userId = (ObjectId)headers.get("user-id");
                                    logger.info("TEST FACEBOOK Channel for user id: " + userId);
                                    return Arrays.asList(new CommentEntity[] { 
                                        new CommentEntity("Comentario 1 from facebook dirigido a " + userId, userId),
                                        new CommentEntity("Comentario 2 from facebook dirigido a " + userId, userId)
                                    });
                                }
                            }))
                            .subFlowMapping(SocialMediaTypeEnum.YOUTUBE, sf -> sf.handle(new GenericHandler<SocialMediaEntity>() {
                                @Override
                                public Object handle(SocialMediaEntity payload, Map<String, Object> headers) {
                                    ObjectId userId = (ObjectId)headers.get("user-id");
                                    logger.info("TEST YOUTUBE Channel for user id: " + userId);
                                    return Arrays.asList(new CommentEntity[] { 
                                        new CommentEntity("Comentario 1 from youtube dirigido a " + userId, userId),
                                        new CommentEntity("Comentario 2 from youtube dirigido a " + userId, userId)
                                    });
                                }
                            }))
                            .subFlowMapping(SocialMediaTypeEnum.INSTAGRAM, sf -> sf.handle(new GenericHandler<SocialMediaEntity>() {
                                @Override
                                public Object handle(SocialMediaEntity payload, Map<String, Object> headers) {
                                    ObjectId userId = (ObjectId)headers.get("user-id");
                                    logger.info("TEST INSTAGRAM Channel for user id: " + userId);
                                    return Arrays.asList(new CommentEntity[] { 
                                        new CommentEntity("Comentario 1 from instagram dirigido a " + userId, userId),
                                        new CommentEntity("Comentario 2 from instagram dirigido a " + userId, userId)
                                    });
                                }
                            }))
                )
                .channel("directChannel_2")
                .aggregate()
                .channel("directChannel_3")
                .<List<List<CommentEntity>>, List<CommentEntity>>transform(comments -> 
                        comments.stream().flatMap(List::stream).collect(Collectors.toList()))
                .aggregate()
                .channel("directChannel_4")
                .<List<List<CommentEntity>>, List<CommentEntity>>transform(comments -> 
                        comments.stream().flatMap(List::stream).collect(Collectors.toList()))
                .channel("storeChannel")
                .get();
    }

}

The debug messages before the error are these:

2017-07-24 15:43:03.265 DEBUG 15152 --- [ taskExecutor-3] o.s.integration.channel.DirectChannel    : preSend on channel 'storeChannel', message: GenericMessage [payload=[sanchez.sanchez.sergio.persistence.entity.CommentEntity@4de61faa, sanchez.sanchez.sergio.persistence.entity.CommentEntity@587d9f81, sanchez.sanchez.sergio.persistence.entity.CommentEntity@21075b47, sanchez.sanchez.sergio.persistence.entity.CommentEntity@653d282, sanchez.sanchez.sergio.persistence.entity.CommentEntity@4b790cef, sanchez.sanchez.sergio.persistence.entity.CommentEntity@662a5dcd, sanchez.sanchez.sergio.persistence.entity.CommentEntity@1a82309c, sanchez.sanchez.sergio.persistence.entity.CommentEntity@1b99ebf2, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6d1a6380, sanchez.sanchez.sergio.persistence.entity.CommentEntity@13b4363c, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6c5952d0, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3b3e7b7d, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3859229, sanchez.sanchez.sergio.persistence.entity.CommentEntity@786af66, sanchez.sanchez.sergio.persistence.entity.CommentEntity@271b5a0e, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3e45e786, sanchez.sanchez.sergio.persistence.entity.CommentEntity@ae0edfb, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6955ab16, sanchez.sanchez.sergio.persistence.entity.CommentEntity@7ae0fb73, sanchez.sanchez.sergio.persistence.entity.CommentEntity@4ed5e239, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6da79744, sanchez.sanchez.sergio.persistence.entity.CommentEntity@39352779, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3a12507d, sanchez.sanchez.sergio.persistence.entity.CommentEntity@51345bc3, sanchez.sanchez.sergio.persistence.entity.CommentEntity@7d95ad, sanchez.sanchez.sergio.persistence.entity.CommentEntity@32ca5648, sanchez.sanchez.sergio.persistence.entity.CommentEntity@616e3510, 
sanchez.sanchez.sergio.persistence.entity.CommentEntity@53a15bc4, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3aa84ac4, sanchez.sanchez.sergio.persistence.entity.CommentEntity@ed8ac69], headers={sequenceNumber=5, sequenceDetails=[[eda06022-4472-76b2-4ab5-1b24c1929cc2, 5, 5]], mongo_collectionName=users, sequenceSize=5, user-id=5975f9523681ac3b30e547c8, correlationId=eda06022-4472-76b2-4ab5-1b24c1929cc2, id=644a7577-7033-e669-505a-901172364790, timestamp=1500903783265}]
2017-07-24 15:43:03.267 DEBUG 15152 --- [ taskExecutor-3] ssor$ReplyProducingMessageHandlerWrapper : infrastructureConfiguration.mongodbAdapter.serviceActivator.handler received message: GenericMessage [payload=[sanchez.sanchez.sergio.persistence.entity.CommentEntity@4de61faa, sanchez.sanchez.sergio.persistence.entity.CommentEntity@587d9f81, sanchez.sanchez.sergio.persistence.entity.CommentEntity@21075b47, sanchez.sanchez.sergio.persistence.entity.CommentEntity@653d282, sanchez.sanchez.sergio.persistence.entity.CommentEntity@4b790cef, sanchez.sanchez.sergio.persistence.entity.CommentEntity@662a5dcd, sanchez.sanchez.sergio.persistence.entity.CommentEntity@1a82309c, sanchez.sanchez.sergio.persistence.entity.CommentEntity@1b99ebf2, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6d1a6380, sanchez.sanchez.sergio.persistence.entity.CommentEntity@13b4363c, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6c5952d0, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3b3e7b7d, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3859229, sanchez.sanchez.sergio.persistence.entity.CommentEntity@786af66, sanchez.sanchez.sergio.persistence.entity.CommentEntity@271b5a0e, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3e45e786, sanchez.sanchez.sergio.persistence.entity.CommentEntity@ae0edfb, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6955ab16, sanchez.sanchez.sergio.persistence.entity.CommentEntity@7ae0fb73, sanchez.sanchez.sergio.persistence.entity.CommentEntity@4ed5e239, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6da79744, sanchez.sanchez.sergio.persistence.entity.CommentEntity@39352779, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3a12507d, sanchez.sanchez.sergio.persistence.entity.CommentEntity@51345bc3, sanchez.sanchez.sergio.persistence.entity.CommentEntity@7d95ad, sanchez.sanchez.sergio.persistence.entity.CommentEntity@32ca5648, sanchez.sanchez.sergio.persistence.entity.CommentEntity@616e3510, 
sanchez.sanchez.sergio.persistence.entity.CommentEntity@53a15bc4, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3aa84ac4, sanchez.sanchez.sergio.persistence.entity.CommentEntity@ed8ac69], headers={sequenceNumber=5, sequenceDetails=[[eda06022-4472-76b2-4ab5-1b24c1929cc2, 5, 5]], mongo_collectionName=users, sequenceSize=5, user-id=5975f9523681ac3b30e547c8, correlationId=eda06022-4472-76b2-4ab5-1b24c1929cc2, id=644a7577-7033-e669-505a-901172364790, timestamp=1500903783265}]
2017-07-24 15:43:03.267 DEBUG 15152 --- [ taskExecutor-3] o.s.i.m.o.MongoDbStoringMessageHandler   : mongodbAdapter received message: GenericMessage [payload=[sanchez.sanchez.sergio.persistence.entity.CommentEntity@4de61faa, sanchez.sanchez.sergio.persistence.entity.CommentEntity@587d9f81, sanchez.sanchez.sergio.persistence.entity.CommentEntity@21075b47, sanchez.sanchez.sergio.persistence.entity.CommentEntity@653d282, sanchez.sanchez.sergio.persistence.entity.CommentEntity@4b790cef, sanchez.sanchez.sergio.persistence.entity.CommentEntity@662a5dcd, sanchez.sanchez.sergio.persistence.entity.CommentEntity@1a82309c, sanchez.sanchez.sergio.persistence.entity.CommentEntity@1b99ebf2, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6d1a6380, sanchez.sanchez.sergio.persistence.entity.CommentEntity@13b4363c, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6c5952d0, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3b3e7b7d, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3859229, sanchez.sanchez.sergio.persistence.entity.CommentEntity@786af66, sanchez.sanchez.sergio.persistence.entity.CommentEntity@271b5a0e, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3e45e786, sanchez.sanchez.sergio.persistence.entity.CommentEntity@ae0edfb, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6955ab16, sanchez.sanchez.sergio.persistence.entity.CommentEntity@7ae0fb73, sanchez.sanchez.sergio.persistence.entity.CommentEntity@4ed5e239, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6da79744, sanchez.sanchez.sergio.persistence.entity.CommentEntity@39352779, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3a12507d, sanchez.sanchez.sergio.persistence.entity.CommentEntity@51345bc3, sanchez.sanchez.sergio.persistence.entity.CommentEntity@7d95ad, sanchez.sanchez.sergio.persistence.entity.CommentEntity@32ca5648, sanchez.sanchez.sergio.persistence.entity.CommentEntity@616e3510, 
sanchez.sanchez.sergio.persistence.entity.CommentEntity@53a15bc4, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3aa84ac4, sanchez.sanchez.sergio.persistence.entity.CommentEntity@ed8ac69], headers={sequenceNumber=5, sequenceDetails=[[eda06022-4472-76b2-4ab5-1b24c1929cc2, 5, 5]], mongo_collectionName=users, sequenceSize=5, user-id=5975f9523681ac3b30e547c8, correlationId=eda06022-4472-76b2-4ab5-1b24c1929cc2, id=644a7577-7033-e669-505a-901172364790, timestamp=1500903783265}]

These show clearly that a list of CommentEntity arrives on the channel "storeChannel". CommentEntity is defined as follows:

@Document(collection="comments")
public class CommentEntity {

    @Id
    private ObjectId id;

    @Field("message")
    private String message;

    private ObjectId user;

    @PersistenceConstructor
    public CommentEntity(String message, ObjectId user) {
        this.message = message;
        this.user = user;
    }

    public ObjectId getId() {
        return id;
    }

    public void setId(ObjectId id) {
        this.id = id;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public ObjectId getUser() {
        return user;
    }

    public void setUser(ObjectId user) {
        this.user = user;
    }

}

This exception then occurs:

2017-07-24 15:43:03.271 ERROR 15152 --- [ taskExecutor-3] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [mongodbAdapter]; nested exception is java.lang.ClassCastException: com.mongodb.BasicDBObject cannot be cast to com.mongodb.BasicDBList, failedMessage=GenericMessage [payload=[sanchez.sanchez.sergio.persistence.entity.CommentEntity@4de61faa, sanchez.sanchez.sergio.persistence.entity.CommentEntity@587d9f81, sanchez.sanchez.sergio.persistence.entity.CommentEntity@21075b47, sanchez.sanchez.sergio.persistence.entity.CommentEntity@653d282, sanchez.sanchez.sergio.persistence.entity.CommentEntity@4b790cef, sanchez.sanchez.sergio.persistence.entity.CommentEntity@662a5dcd, sanchez.sanchez.sergio.persistence.entity.CommentEntity@1a82309c, sanchez.sanchez.sergio.persistence.entity.CommentEntity@1b99ebf2, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6d1a6380, sanchez.sanchez.sergio.persistence.entity.CommentEntity@13b4363c, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6c5952d0, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3b3e7b7d, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3859229, sanchez.sanchez.sergio.persistence.entity.CommentEntity@786af66, sanchez.sanchez.sergio.persistence.entity.CommentEntity@271b5a0e, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3e45e786, sanchez.sanchez.sergio.persistence.entity.CommentEntity@ae0edfb, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6955ab16, sanchez.sanchez.sergio.persistence.entity.CommentEntity@7ae0fb73, sanchez.sanchez.sergio.persistence.entity.CommentEntity@4ed5e239, sanchez.sanchez.sergio.persistence.entity.CommentEntity@6da79744, sanchez.sanchez.sergio.persistence.entity.CommentEntity@39352779, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3a12507d, sanchez.sanchez.sergio.persistence.entity.CommentEntity@51345bc3, 
sanchez.sanchez.sergio.persistence.entity.CommentEntity@7d95ad, sanchez.sanchez.sergio.persistence.entity.CommentEntity@32ca5648, sanchez.sanchez.sergio.persistence.entity.CommentEntity@616e3510, sanchez.sanchez.sergio.persistence.entity.CommentEntity@53a15bc4, sanchez.sanchez.sergio.persistence.entity.CommentEntity@3aa84ac4, sanchez.sanchez.sergio.persistence.entity.CommentEntity@ed8ac69], headers={sequenceNumber=5, sequenceDetails=[[eda06022-4472-76b2-4ab5-1b24c1929cc2, 5, 5]], mongo_collectionName=users, sequenceSize=5, user-id=5975f9523681ac3b30e547c8, correlationId=eda06022-4472-76b2-4ab5-1b24c1929cc2, id=644a7577-7033-e669-505a-901172364790, timestamp=1500903783265}]

I am currently using an embedded MongoDB:

<dependency>
  <groupId>de.flapdoodle.embed</groupId>
  <artifactId>de.flapdoodle.embed.mongo</artifactId>
</dependency>

Does anyone know what I'm doing wrong? Thanks in advance.

1 Answer

Well, that exception shows that MongoDbStoringMessageHandler doesn't support a collection as the payload to save. It passes the payload straight to mongoTemplate.save():

protected void handleMessageInternal(Message<?> message) throws Exception {
    Assert.isTrue(this.initialized, "This class is not yet initialized. Invoke its afterPropertiesSet() method");
    String collectionName = this.collectionNameExpression.getValue(this.evaluationContext, message, String.class);
    Assert.notNull(collectionName, "'collectionNameExpression' must not evaluate to null");

    Object payload = message.getPayload();

    this.mongoTemplate.save(payload, collectionName);
}

You don't need .aggregate() to build collections before saving. That component can only save entities one at a time.
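
One way to apply this is sketched below. It assumes every routed subflow keeps replying with a List<CommentEntity>, as in the question. Both aggregate() steps and the flattening transformers are dropped, and each reply is split so that storeChannel receives one CommentEntity per message. The no-argument split() is the DSL's default splitter, which emits one message per element of a collection payload. This is only the tail of the flow, not a complete configuration:

```java
// ... same flow as in the question, up to and including .route(...)
.split()                    // default splitter: one message per CommentEntity in the returned list
.channel("storeChannel")    // mongodbAdapter now saves a single entity per message
.get();
```

The "user-id" header set earlier in the flow is still present on each split message, so nothing is lost by skipping the aggregation.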

I think it would be a good addition to let that component also perform:

/**
 * Insert a mixed Collection of objects into a database collection determining the collection name to use based on the
 * class.
 *
 * @param objectsToSave the list of objects to save.
 */
void insertAll(Collection<? extends Object> objectsToSave);

Please raise a JIRA issue on the matter, and don't hesitate to contribute!
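
To make the proposed behavior concrete, here is a minimal sketch in plain Java, with no Spring dependencies, of the dispatch such a handler would need: a collection payload is saved element by element, and anything else is saved as a single document. DocumentStore, CollectionAwareStoringHandler and CollectionAwareStoringDemo are hypothetical names invented for this illustration. They are not part of Spring Integration, and the real component may end up being implemented differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

// Hypothetical stand-in for a store that can only save one document at a time.
interface DocumentStore {
    void save(Object document, String collectionName);
}

// Hypothetical handler: unwraps collection payloads instead of passing them to save() as-is.
class CollectionAwareStoringHandler {

    private final DocumentStore store;
    private final String collectionName;

    CollectionAwareStoringHandler(DocumentStore store, String collectionName) {
        this.store = store;
        this.collectionName = collectionName;
    }

    // Saves the payload and returns the number of documents written.
    int handle(Object payload) {
        if (payload instanceof Collection) {
            int saved = 0;
            for (Object element : (Collection<?>) payload) {
                store.save(element, collectionName);
                saved++;
            }
            return saved;
        }
        store.save(payload, collectionName);
        return 1;
    }
}

class CollectionAwareStoringDemo {

    // Runs the handler against an in-memory store and returns what was saved.
    static List<Object> run(Object payload) {
        List<Object> saved = new ArrayList<>();
        DocumentStore store = (document, name) -> saved.add(document);
        new CollectionAwareStoringHandler(store, "comments").handle(payload);
        return saved;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("comment 1", "comment 2")).size());
        System.out.println(run("single comment").size());
    }
}
```

In a real handler the per-element loop could presumably be replaced by one batch insert, but that choice belongs to whoever implements the improvement.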

2 Comments

Thanks for the reply! Could I do this through a Spring Data repository instead? Would that be elegant?
A repository isn't related to Spring Integration's goals. Spring Integration is about messaging and is protocol-agnostic, whereas a repository is domain-driven and meant for POJO interaction.
