0

I've a use case where I need to send messages to an external TCP Server with 2 IP/ports for Round Robin distribution (each connection for each pair of IP/port, and it should be keep-alive for message exchange).

I'm using Spring Integration and I don't have information about the underlying implementation of TCP Server but the message structure consists of a header with 4 bytes of ASCII characters used to specify the length of the message (excluding the header) followed by the message itself.

For example, if a message is 128 bytes long, the header value "0128" will be added to the beginning of the message. Therefore, the actual length of the data being sent is 132 bytes.

The example message will be like this: 0022thisisanexamplemessage

I've follow this post to setup 2 TcpSendingMessageHandler instances with 2 FailoverClientConnectionFactory instances and use the same outboundChannel as the input channel to achieve Round Robin distribution as below (and successfully sent messages to the TCP Server):

@Bean
public MessageChannel outboundChannel() {
    return new DirectChannel();
}

@Bean
@ServiceActivator(inputChannel = "outboundChannel")
public MessageHandler tcpOutboundChannelOne() {
    TcpSendingMessageHandler handler = new TcpSendingMessageHandler();
    handler.setConnectionFactory(failoverClientConnectionFactoryOne());
    handler.setClientMode(true);
    return handler;
}

@Bean
@ServiceActivator(inputChannel = "outboundChannel")
public MessageHandler tcpOutboundChannelTwo() {
    TcpSendingMessageHandler handler = new TcpSendingMessageHandler();
    handler.setConnectionFactory(failoverClientConnectionFactoryTwo());
    handler.setClientMode(true);
    return handler;
}

@Bean
public FailoverClientConnectionFactory failoverClientConnectionFactoryOne() {
    List<AbstractClientConnectionFactory> factories = new ArrayList<>();
    factories.add(tcpNetClientConnectionFactoryOne());
    factories.add(tcpNetClientConnectionFactoryTwo());
    FailoverClientConnectionFactory cf = new FailoverClientConnectionFactory(factories);
    cf.setLeaveOpen(true);
    cf.setSingleUse(false);
    cf.setSoKeepAlive(true);
    cf.setCloseOnRefresh(true);
    cf.setRefreshSharedInterval(10000L);
    return cf;
}

@Bean
public FailoverClientConnectionFactory failoverClientConnectionFactoryTwo() {
    List<AbstractClientConnectionFactory> factories = new ArrayList<>();
    factories.add(tcpNetClientConnectionFactoryTwo());
    factories.add(tcpNetClientConnectionFactoryOne());
    FailoverClientConnectionFactory cf = new FailoverClientConnectionFactory(factories);
    cf.setLeaveOpen(true);
    cf.setSingleUse(false);
    cf.setSoKeepAlive(true);
    cf.setCloseOnRefresh(true);
    cf.setRefreshSharedInterval(10000L);
    return cf;
}

@Bean
public TcpNetClientConnectionFactoryOne tcpNetClientConnectionFactoryOne() {
    TcpNetClientConnectionFactoryOne cf = new TcpNetClientConnectionFactoryOne(tcpServerHost, tcpServerPort1);
    cf.setLeaveOpen(true);
    cf.setSingleUse(false);
    cf.setSoKeepAlive(true);
    cf.setSerializer(codec());
    cf.setDeserializer(codec());
    cf.setConnectionTimeout(connectionTimeout);
    return cf;
}


@Bean
public TcpNetClientConnectionFactoryOne tcpNetClientConnectionFactoryTwo() {
    TcpNetClientConnectionFactoryOne cf = new TcpNetClientConnectionFactoryOne(tcpServerHost, tcpServerPort2);
    cf.setLeaveOpen(true);
    cf.setSingleUse(false);
    cf.setSoKeepAlive(true);
    cf.setSerializer(codec());
    cf.setDeserializer(codec());
    cf.setConnectionTimeout(connectionTimeout);
    return cf;
}

private ByteArrayLengthHeaderSerializer codec() {
    ByteArrayLengthHeaderSerializer serializer = new ByteArrayLengthHeaderSerializer();
    serializer.setMaxMessageSize(8 * 1024);
    serializer.setInclusive(false);
    return serializer;
}

For incoming messages from the TCP Server, I've setup 2 TcpReceivingChannelAdapter instances with 2 FailoverClientConnectionFactory instances as above and use a single inboundChannel as the output channel:

@Bean
public MessageChannel inboundChannel() {
    return new DirectChannel();
}

@Bean
public MessageProducer tcpInboundChannelOne() {
    TcpReceivingChannelAdapter inboundChannelAdapter = new TcpReceivingChannelAdapter();
    inboundChannelAdapter.setConnectionFactory(failoverClientConnectionFactoryOne());
    inboundChannelAdapter.setOutputChannel(inboundChannel());
    return inboundChannelAdapter;
}

@Bean
public MessageProducer tcpInboundChannelTwo() {
    TcpReceivingChannelAdapter inboundChannelAdapter = new TcpReceivingChannelAdapter();
    inboundChannelAdapter.setConnectionFactory(failoverClientConnectionFactoryTwo());
    inboundChannelAdapter.setOutputChannel(inboundChannel());
    return inboundChannelAdapter;
}

@Bean
public IntegrationFlow tcpInboundFlow() {
    return IntegrationFlow.from(inboundChannel())
            .handle(message -> handleMessage(byte[] message.getPayload()))
            .get();
}

private void handleMessage(byte[] payload) {
    // Process incoming message here
    log.info("Received: {}", new String(payload));
}

The problem is I can only send messages to the TCP Server but unable to receive any messages sent from the Server. What have I done wrong? Please help.

7
  • Probably the server does not send messages back to you in the same format. You can turn on DEBUG logging level for the org.springframework.integration.ip category to see what is going on when those TcpReceivingChannelAdapter beans are started. Commented Jul 3, 2024 at 14:51
  • Thank you @ArtemBilan, I have turn on DEBUG logging level and what I've notice is: After clientConnectionFactory open a socket connection and sent message there are 2 DEBUG line: Read 4 bytes, buffer is now at 4 of 4 - [org.springframework.core.log.Accessor.debug(313)] and Message length is 808465973 - [org.springframework.core.log.Accessor.debug(313)], after that it will close the socket connection and come with ERROR line Read exception ... IOException:Message length 808465973 exceeds max message length: 8192 this happened for every message I've sent to the server. Commented Jul 3, 2024 at 15:24
  • What I meant is I've try to send some messages to the TCP Server with the same format but difference time stamp and I got the same Message length 808465973... log ERROR line Read exception ... IOException:Message length 808465973 exceeds max message length: 8192 Commented Jul 3, 2024 at 15:38
  • Right. Because you have serializer.setMaxMessageSize(8 * 1024);. However that does not mean that all those 808465973 bytes belong to the same message. So, or try to adjust the size for deserializer respectively, or consult with your server side what is exactly that 808465973 length means. Commented Jul 3, 2024 at 16:13
  • I just want to clarify the message exchange procedure a little bit: We will open 2 TCP connections with 2 provided IP/Ports from the Server, and we will exchange messages on those open connections (client send request-receive response, or server send request to our client and we response to server). We should keep that 2 connection alive for message exchange. So with the message structure as described above, how do I determine for the incoming messages? Is my config missing anything? Or Any ideas to help me receive incoming messages would be appreciated. Thank you. Commented Jul 3, 2024 at 16:51

1 Answer 1

1

Thanks to @ArtemBilan's suggestion I have create a custom Deserializer that extends AbstractByteArraySerializer and add to the TcpNetCientConnectionFactory then the inboundChannel is able to receives incoming messages.

public class CustomSerializer extends AbstractByteArraySerializer {

    @Override
    public byte[] deserialize(InputStream inputStream) throws IOException {
        byte[] messageHeaderBytes = new byte[4];
        int bytesRead = inputStream.read(messageHeaderBytes);
        if (bytesRead != 4) {
            log.error("Invalid message header length");
            throw new IOException("Invalid message header length");
        }
        int messageLength = Integer.parseInt(new String( messageHeaderBytes, StandardCharsets.US_ASCII));
        byte[] messageBytes = new byte[messageLength];
        bytesRead = inputStream.read(messageBytes);
        if (bytesRead != messageLength) {
            log.error("Invalid message length");
            throw new IOException("Invalid message length");
        }
        return messageBytes;
    }

    @Override
    public void serialize(byte[] bytes, OutputStream outputStream) throws IOException {
        outputStream.write(bytes);
    }
}
Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.