I have a use case where I need to send messages to an external TCP server that exposes two IP/port pairs, distributing the messages round-robin (one connection per IP/port pair, each kept alive for message exchange).
I'm using Spring Integration. I have no information about the server's underlying implementation, but the message structure consists of a 4-byte header of ASCII characters that specifies the length of the message (excluding the header), followed by the message itself.
For example, if a message is 128 bytes long, the header value "0128" is added to the beginning of the message, so the actual length of the data being sent is 132 bytes.
An example message looks like this: 0022thisisanexamplemessage
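To make the framing concrete, here is a minimal sketch in plain Java (no Spring classes) of how such a frame could be built. The class and method names (AsciiFrame, frame) are hypothetical and exist only for illustration, and the header is assumed to be US-ASCII digits, zero-padded to 4 characters.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;

public class AsciiFrame {

    // Prefixes the payload with its length written as 4 zero-padded ASCII digits.
    // The header counts only the payload bytes, not the header itself.
    public static byte[] frame(byte[] payload) {
        if (payload.length > 9999) {
            throw new IllegalArgumentException(
                    "Payload too long for a 4-digit header: " + payload.length);
        }
        // Locale.ROOT keeps the digits plain ASCII regardless of the JVM default locale.
        byte[] header = String.format(Locale.ROOT, "%04d", payload.length)
                .getBytes(StandardCharsets.US_ASCII);
        byte[] framed = new byte[header.length + payload.length];
        System.arraycopy(header, 0, framed, 0, header.length);
        System.arraycopy(payload, 0, framed, header.length, payload.length);
        return framed;
    }

    public static void main(String[] args) {
        byte[] framed = frame("thisisanexamplemessage".getBytes(StandardCharsets.US_ASCII));
        // prints 0022thisisanexamplemessage
        System.out.println(new String(framed, StandardCharsets.US_ASCII));
    }
}
```

Note that the header here is text ("0022" is the four bytes 0x30 0x30 0x32 0x32), not a binary integer.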
I followed this post to set up 2 TcpSendingMessageHandler instances with 2 FailoverClientConnectionFactory instances, both using the same outboundChannel as their input channel to achieve round-robin distribution, as shown below (and messages are sent to the TCP server successfully):
    @Bean
    public MessageChannel outboundChannel() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "outboundChannel")
    public MessageHandler tcpOutboundChannelOne() {
        TcpSendingMessageHandler handler = new TcpSendingMessageHandler();
        handler.setConnectionFactory(failoverClientConnectionFactoryOne());
        handler.setClientMode(true);
        return handler;
    }

    @Bean
    @ServiceActivator(inputChannel = "outboundChannel")
    public MessageHandler tcpOutboundChannelTwo() {
        TcpSendingMessageHandler handler = new TcpSendingMessageHandler();
        handler.setConnectionFactory(failoverClientConnectionFactoryTwo());
        handler.setClientMode(true);
        return handler;
    }

    // Prefers server one, falls back to server two.
    @Bean
    public FailoverClientConnectionFactory failoverClientConnectionFactoryOne() {
        List<AbstractClientConnectionFactory> factories = new ArrayList<>();
        factories.add(tcpNetClientConnectionFactoryOne());
        factories.add(tcpNetClientConnectionFactoryTwo());
        FailoverClientConnectionFactory cf = new FailoverClientConnectionFactory(factories);
        cf.setLeaveOpen(true);
        cf.setSingleUse(false);
        cf.setSoKeepAlive(true);
        cf.setCloseOnRefresh(true);
        cf.setRefreshSharedInterval(10000L);
        return cf;
    }

    // Prefers server two, falls back to server one.
    @Bean
    public FailoverClientConnectionFactory failoverClientConnectionFactoryTwo() {
        List<AbstractClientConnectionFactory> factories = new ArrayList<>();
        factories.add(tcpNetClientConnectionFactoryTwo());
        factories.add(tcpNetClientConnectionFactoryOne());
        FailoverClientConnectionFactory cf = new FailoverClientConnectionFactory(factories);
        cf.setLeaveOpen(true);
        cf.setSingleUse(false);
        cf.setSoKeepAlive(true);
        cf.setCloseOnRefresh(true);
        cf.setRefreshSharedInterval(10000L);
        return cf;
    }

    @Bean
    public TcpNetClientConnectionFactory tcpNetClientConnectionFactoryOne() {
        TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(tcpServerHost, tcpServerPort1);
        cf.setLeaveOpen(true);
        cf.setSingleUse(false);
        cf.setSoKeepAlive(true);
        cf.setSerializer(codec());
        cf.setDeserializer(codec());
        cf.setConnectionTimeout(connectionTimeout);
        return cf;
    }

    @Bean
    public TcpNetClientConnectionFactory tcpNetClientConnectionFactoryTwo() {
        TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(tcpServerHost, tcpServerPort2);
        cf.setLeaveOpen(true);
        cf.setSingleUse(false);
        cf.setSoKeepAlive(true);
        cf.setSerializer(codec());
        cf.setDeserializer(codec());
        cf.setConnectionTimeout(connectionTimeout);
        return cf;
    }

    private ByteArrayLengthHeaderSerializer codec() {
        ByteArrayLengthHeaderSerializer serializer = new ByteArrayLengthHeaderSerializer();
        serializer.setMaxMessageSize(8 * 1024);
        serializer.setInclusive(false);
        return serializer;
    }
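For readers unfamiliar with why two handlers on one DirectChannel give round-robin distribution: a DirectChannel hands each message to its subscribers in rotation by default and moves on to the next subscriber if one fails. The following is only a plain-Java sketch of that dispatch idea, not Spring's actual dispatcher; the class RoundRobinDispatcher and its methods are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class RoundRobinDispatcher<T> {

    private final List<Consumer<T>> handlers;
    private final AtomicInteger next = new AtomicInteger();

    public RoundRobinDispatcher(List<Consumer<T>> handlers) {
        if (handlers.isEmpty()) {
            throw new IllegalArgumentException("At least one handler is required");
        }
        this.handlers = List.copyOf(handlers);
    }

    // Sends the message to the next handler in rotation.
    // If that handler throws, the remaining handlers are tried in order (failover).
    public void dispatch(T message) {
        int start = Math.floorMod(next.getAndIncrement(), handlers.size());
        RuntimeException last = null;
        for (int i = 0; i < handlers.size(); i++) {
            try {
                handlers.get((start + i) % handlers.size()).accept(message);
                return;
            } catch (RuntimeException e) {
                last = e; // remember the failure and try the next handler
            }
        }
        throw last; // every handler failed
    }
}
```

In the configuration above, the two TcpSendingMessageHandler beans play the role of the two handlers, and each one has its own failover order between the two servers.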
For incoming messages from the TCP server, I've set up 2 TcpReceivingChannelAdapter instances with the same 2 FailoverClientConnectionFactory instances as above, both using a single inboundChannel as their output channel:
    @Bean
    public MessageChannel inboundChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer tcpInboundChannelOne() {
        TcpReceivingChannelAdapter inboundChannelAdapter = new TcpReceivingChannelAdapter();
        inboundChannelAdapter.setConnectionFactory(failoverClientConnectionFactoryOne());
        inboundChannelAdapter.setOutputChannel(inboundChannel());
        return inboundChannelAdapter;
    }

    @Bean
    public MessageProducer tcpInboundChannelTwo() {
        TcpReceivingChannelAdapter inboundChannelAdapter = new TcpReceivingChannelAdapter();
        inboundChannelAdapter.setConnectionFactory(failoverClientConnectionFactoryTwo());
        inboundChannelAdapter.setOutputChannel(inboundChannel());
        return inboundChannelAdapter;
    }

    @Bean
    public IntegrationFlow tcpInboundFlow() {
        return IntegrationFlow.from(inboundChannel())
                // The deserializer produces a byte[] payload, so cast before handling.
                .handle(message -> handleMessage((byte[]) message.getPayload()))
                .get();
    }

    private void handleMessage(byte[] payload) {
        // Process incoming message here
        log.info("Received: {}", new String(payload));
    }
The problem is that I can send messages to the TCP server but cannot receive any messages sent from the server. What have I done wrong? Please help.
Turn on DEBUG logging for the org.springframework.integration.ip category to see what is going on when those TcpReceivingChannelAdapter beans are started.

I see "Read 4 bytes, buffer is now at 4 of 4 - [org.springframework.core.log.Accessor.debug(313)]" and "Message length is 808465973 - [org.springframework.core.log.Accessor.debug(313)]". After that it closes the socket connection and logs the ERROR line "Read exception ... IOException: Message length 808465973 exceeds max message length: 8192". This happened for every message I've sent to the server.

The "Read exception ... IOException: Message length 808465973 exceeds max message length: 8192" error is raised because of the limit set with serializer.setMaxMessageSize(8 * 1024);. However, that does not mean that all those 808465973 bytes belong to the same message. So either adjust the size for the deserializer accordingly, or ask the server side what exactly that 808465973 length means.
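One way to investigate what 808465973 means is to look at its four bytes. The sketch below assumes the deserializer read the 4-byte header as a big-endian binary integer; under that assumption the value is 0x30303635, which is the ASCII text "0065". That would match the ASCII length header described in the question: the server sent a text header, and it was interpreted as a binary one. The sketch also shows reading such a frame with the header parsed as ASCII digits. The class and method names (AsciiHeaderDemo, asAscii, readFrame) are hypothetical, and the code uses only java.io and java.nio, not Spring classes.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class AsciiHeaderDemo {

    // Shows which 4 ASCII characters produce the given value
    // when they are (mis)read as a big-endian binary integer.
    public static String asAscii(int binaryLength) {
        byte[] bytes = ByteBuffer.allocate(4).putInt(binaryLength).array();
        return new String(bytes, StandardCharsets.US_ASCII);
    }

    // Reads one frame whose header is 4 ASCII digits giving the payload length.
    public static byte[] readFrame(InputStream in) throws IOException {
        DataInputStream data = new DataInputStream(in);
        byte[] header = new byte[4];
        data.readFully(header);
        String text = new String(header, StandardCharsets.US_ASCII);
        if (!text.matches("\\d{4}")) {
            throw new IOException("Header is not 4 ASCII digits: " + text);
        }
        byte[] payload = new byte[Integer.parseInt(text)];
        data.readFully(payload);
        return payload;
    }

    public static void main(String[] args) throws IOException {
        System.out.println(asAscii(808465973)); // prints 0065

        byte[] wire = "0022thisisanexamplemessage".getBytes(StandardCharsets.US_ASCII);
        byte[] payload = readFrame(new ByteArrayInputStream(wire));
        // prints thisisanexamplemessage
        System.out.println(new String(payload, StandardCharsets.US_ASCII));
    }
}
```

If this is indeed the cause, raising the maximum message size would not help, since the header format itself would be mismatched. A possible direction, to be checked against the API of your Spring Integration version, is a custom serializer/deserializer pair (for example a subclass of ByteArrayLengthHeaderSerializer that reads and writes the header as ASCII digits) set on both connection factories.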