2626import io .netty .handler .ssl .SslContext ;
2727import io .netty .handler .ssl .SslHandshakeCompletionEvent ;
2828import io .netty .handler .ssl .util .InsecureTrustManagerFactory ;
29+ import io .netty .util .concurrent .*;
30+ import io .netty .util .concurrent .Future ;
2931
3032import java .net .SocketAddress ;
31- import java .util .ArrayList ;
32- import java .util .List ;
33- import java .util .Map ;
34- import java .util .UUID ;
35- import java .util .concurrent .ConcurrentHashMap ;
36- import java .util .concurrent .ConcurrentMap ;
37- import java .util .Queue ;
38- import java .util .concurrent .ArrayBlockingQueue ;
39- import java .util .concurrent .LinkedBlockingDeque ;
33+ import java .util .*;
34+ import java .util .concurrent .*;
4035import java .util .function .Consumer ;
4136
4237import static java .util .Collections .singletonList ;
@@ -70,11 +65,7 @@ public void connect(StartupMessage startup, Consumer<List<Message>> replyTo) {
7065 .channel (NioSocketChannel .class )
7166 .handler (newProtocolInitializer (newStartupHandler (startup , replyTo )))
7267 .connect (address )
73- .addListener (future -> {
74- if (!future .isSuccess ()) {
75- replyTo .accept (singletonList (new ChannelError (future .cause ())));
76- }
77- });
68+ .addListener (errorListener (replyTo ));
7869 }
7970
8071 @ Override
@@ -83,7 +74,7 @@ public void send(Message message, Consumer<List<Message>> replyTo) {
8374 throw new IllegalStateException ("Channel is closed" );
8475 }
8576 addNewReplyHandler (replyTo );
86- ctx .writeAndFlush (message );
77+ ctx .writeAndFlush (message ). addListener ( errorListener ( replyTo )) ;
8778 }
8879
8980 private void addNewReplyHandler (Consumer <List <Message >> replyTo ) {
@@ -98,7 +89,8 @@ public void send(List<Message> messages, Consumer<List<Message>> replyTo) {
9889 throw new IllegalStateException ("Channel is closed" );
9990 }
10091 addNewReplyHandler (replyTo );
101- messages .forEach (ctx ::write );
92+ GenericFutureListener <Future <Object >> errorListener = errorListener (replyTo );
93+ messages .forEach (msg -> ctx .write (msg ).addListener (errorListener ));
10294 ctx .flush ();
10395 }
10496
@@ -151,6 +143,14 @@ void publishNotification(NotificationResponse notification) {
151143 }
152144 }
153145
146+ <T > GenericFutureListener <io .netty .util .concurrent .Future <T >> errorListener (Consumer <List <Message >> replyTo ) {
147+ return future -> {
148+ if (!future .isSuccess ()) {
149+ replyTo .accept (singletonList (new ChannelError (future .cause ())));
150+ }
151+ };
152+ }
153+
154154 ChannelInboundHandlerAdapter newStartupHandler (StartupMessage startup , Consumer <List <Message >> replyTo ) {
155155 return new ChannelInboundHandlerAdapter () {
156156 @ Override
@@ -162,15 +162,15 @@ public void userEventTriggered(ChannelHandlerContext context, Object evt) throws
162162 @ Override
163163 public void channelActive (ChannelHandlerContext context ) {
164164 if (useSsl ) {
165- context .writeAndFlush (SSLHandshake .INSTANCE );
165+ context .writeAndFlush (SSLHandshake .INSTANCE ). addListener ( errorListener ( replyTo )) ;
166166 } else {
167167 startup (context );
168168 }
169169 }
170170 void startup (ChannelHandlerContext context ) {
171171 ctx = context ;
172172 addNewReplyHandler (replyTo );
173- context .writeAndFlush (startup );
173+ context .writeAndFlush (startup ). addListener ( errorListener ( replyTo )) ;
174174 context .pipeline ().remove (this );
175175 }
176176 };
0 commit comments