@@ -104,9 +104,13 @@ static int* sockets;
104104static int gateway ;
105105static bool send_heartbeat ;
106106static TimeoutId heartbeat_timer ;
107+ static int busy_socket ;
107108
108109static void MtmTransSender (Datum arg );
109110static void MtmTransReceiver (Datum arg );
111+ static void MtmSendHeartbeat (void );
112+ static void MtmCheckHeartbeat (void );
113+
110114
111115
112116static char const * const messageText [] =
@@ -218,17 +222,41 @@ static void MtmDisconnect(int node)
218222 MtmOnNodeDisconnect (node + 1 );
219223}
220224
225+ static int MtmWaitWriteSocket (int sd , time_t timeoutMsec )
226+ {
227+ struct timeval tv ;
228+ fd_set out_set ;
229+ int rc ;
230+ tv .tv_sec = timeoutMsec /1000 ;
231+ tv .tv_usec = timeoutMsec %1000 * 1000 ;
232+ FD_ZERO (& out_set );
233+ FD_SET (sd , & out_set );
234+ do {
235+ MtmCheckHeartbeat ();
236+ } while ((rc = select (sd + 1 , NULL , & out_set , NULL , & tv )) < 0 && errno == EINTR );
237+ return rc ;
238+ }
239+
221240static bool MtmWriteSocket (int sd , void const * buf , int size )
222241{
223242 char * src = (char * )buf ;
243+ busy_socket = sd ;
224244 while (size != 0 ) {
225- int n = send (sd , src , size , 0 );
226- if (n <= 0 ) {
245+ int rc = MtmWaitWriteSocket (sd , MtmHeartbeatSendTimeout );
246+ if (rc == 1 ) {
247+ int n = send (sd , src , size , 0 );
248+ if (n < 0 ) {
249+ busy_socket = -1 ;
250+ return false;
251+ }
252+ size -= n ;
253+ src += n ;
254+ } else if (rc < 0 ) {
255+ busy_socket = -1 ;
227256 return false;
228- }
229- size -= n ;
230- src += n ;
257+ }
231258 }
259+ busy_socket = -1 ;
232260 return true;
233261}
234262
@@ -311,9 +339,10 @@ static void MtmSendHeartbeat()
311339
312340 for (i = 0 ; i < Mtm -> nAllNodes ; i ++ )
313341 {
314- if (sockets [i ] >= 0 && !BIT_CHECK (Mtm -> disabledNodeMask |Mtm -> reconnectMask , i ))
342+ if (sockets [i ] >= 0 && sockets [ i ] != busy_socket && !BIT_CHECK (Mtm -> disabledNodeMask |Mtm -> reconnectMask , i ))
315343 {
316- MtmWriteSocket (sockets [i ], & msg , sizeof (msg ));
344+ int rc = send (sockets [i ], & msg , sizeof (msg ), 0 );
345+ Assert (rc <= 0 || (size_t )rc == sizeof (msg ));
317346 }
318347 }
319348
@@ -327,13 +356,15 @@ static void MtmCheckHeartbeat()
327356 MtmSendHeartbeat ();
328357 }
329358}
330-
359+
331360
332361static int MtmConnectSocket (char const * host , int port , int max_attempts )
333362{
334363 struct sockaddr_in sock_inet ;
335364 unsigned addrs [MAX_ROUTES ];
336365 unsigned i , n_addrs = sizeof (addrs ) / sizeof (addrs [0 ]);
366+ MtmHandshakeMessage req ;
367+ MtmArbiterMessage resp ;
337368 int sd ;
338369
339370 sock_inet .sin_family = AF_INET ;
@@ -347,67 +378,80 @@ static int MtmConnectSocket(char const* host, int port, int max_attempts)
347378 while (1 ) {
348379 int rc = -1 ;
349380
350- sd = socket (AF_INET , SOCK_STREAM , 0 );
381+ sd = socket (AF_INET , SOCK_STREAM | SOCK_NONBLOCK , 0 );
351382 if (sd < 0 ) {
352383 elog (ERROR , "Arbiter failed to create socket: %d" , errno );
353384 }
385+ busy_socket = sd ;
354386 for (i = 0 ; i < n_addrs ; ++ i ) {
355387 memcpy (& sock_inet .sin_addr , & addrs [i ], sizeof sock_inet .sin_addr );
356388 do {
357389 rc = connect (sd , (struct sockaddr * )& sock_inet , sizeof (sock_inet ));
358- MtmCheckHeartbeat ();
359390 } while (rc < 0 && errno == EINTR );
360391
361392 if (rc >= 0 || errno == EINPROGRESS ) {
362393 break ;
363394 }
364395 }
365- if (rc < 0 ) {
366- if ((errno != ENOENT && errno != ECONNREFUSED && errno != EINPROGRESS ) || max_attempts == 0 ) {
367- elog (WARNING , "Arbiter failed to connect to %s:%d: error=%d" , host , port , errno );
368- return -1 ;
369- } else {
370- max_attempts -= 1 ;
371- elog (WARNING , "Arbiter trying to connect to %s:%d: error=%d" , host , port , errno );
372- MtmSleep (MtmConnectTimeout );
373- }
374- continue ;
396+ if (rc == 0 ) {
397+ break ;
398+ }
399+ if (errno != EINPROGRESS || max_attempts == 0 ) {
400+ elog (WARNING , "Arbiter failed to connect to %s:%d: error=%d" , host , port , errno );
401+ busy_socket = -1 ;
402+ return -1 ;
375403 } else {
376- MtmHandshakeMessage req ;
377- MtmArbiterMessage resp ;
378- MtmSetSocketOptions (sd );
379- req .hdr .code = MSG_HANDSHAKE ;
380- req .hdr .node = MtmNodeId ;
381- req .hdr .dxid = HANDSHAKE_MAGIC ;
382- req .hdr .sxid = ShmemVariableCache -> nextXid ;
383- req .hdr .csn = MtmGetCurrentTime ();
384- req .hdr .disabledNodeMask = Mtm -> disabledNodeMask ;
385- strcpy (req .connStr , Mtm -> nodes [MtmNodeId - 1 ].con .connStr );
386- if (!MtmWriteSocket (sd , & req , sizeof req )) {
387- elog (WARNING , "Arbiter failed to send handshake message to %s:%d: %d" , host , port , errno );
388- close (sd );
389- goto Retry ;
390- }
391- if (MtmReadSocket (sd , & resp , sizeof resp ) != sizeof (resp )) {
392- elog (WARNING , "Arbiter failed to receive response for handshake message from %s:%d: errno=%d" , host , port , errno );
393- close (sd );
394- goto Retry ;
395- }
396- if (resp .code != MSG_STATUS || resp .dxid != HANDSHAKE_MAGIC ) {
397- elog (WARNING , "Arbiter get unexpected response %d for handshake message from %s:%d" , resp .code , host , port );
398- close (sd );
399- goto Retry ;
400- }
401-
402- /* Some node considered that I am dead, so switch to recovery mode */
403- if (BIT_CHECK (resp .disabledNodeMask , MtmNodeId - 1 )) {
404- elog (WARNING , "Node %d thinks that I was dead" , resp .node );
405- BIT_SET (Mtm -> disabledNodeMask , MtmNodeId - 1 );
406- MtmSwitchClusterMode (MTM_RECOVERY );
404+ rc = MtmWaitWriteSocket (sd , MtmConnectTimeout );
405+ if (rc == 1 ) {
406+ socklen_t optlen = sizeof (int );
407+ if (getsockopt (sd , SOL_SOCKET , SO_ERROR , (void * )& rc , & optlen ) < 0 ) {
408+ elog (WARNING , "Arbiter failed to getsockopt for %s:%d: error=%d" , host , port , errno );
409+ busy_socket = -1 ;
410+ return -1 ;
411+ }
412+ if (rc == 0 ) {
413+ break ;
414+ } else {
415+ elog (WARNING , "Arbiter trying to connect to %s:%d: rc=%d, error=%d" , host , port , rc , errno );
416+ }
417+ } else {
418+ elog (WARNING , "Arbiter waiting socket to %s:%d: rc=%d, error=%d" , host , port , rc , errno );
407419 }
408- return sd ;
420+ max_attempts -= 1 ;
421+ MtmSleep (MSEC_TO_USEC (MtmConnectTimeout ));
409422 }
410- }
423+ }
424+ MtmSetSocketOptions (sd );
425+ req .hdr .code = MSG_HANDSHAKE ;
426+ req .hdr .node = MtmNodeId ;
427+ req .hdr .dxid = HANDSHAKE_MAGIC ;
428+ req .hdr .sxid = ShmemVariableCache -> nextXid ;
429+ req .hdr .csn = MtmGetCurrentTime ();
430+ req .hdr .disabledNodeMask = Mtm -> disabledNodeMask ;
431+ strcpy (req .connStr , Mtm -> nodes [MtmNodeId - 1 ].con .connStr );
432+ if (!MtmWriteSocket (sd , & req , sizeof req )) {
433+ elog (WARNING , "Arbiter failed to send handshake message to %s:%d: %d" , host , port , errno );
434+ close (sd );
435+ goto Retry ;
436+ }
437+ if (MtmReadSocket (sd , & resp , sizeof resp ) != sizeof (resp )) {
438+ elog (WARNING , "Arbiter failed to receive response for handshake message from %s:%d: errno=%d" , host , port , errno );
439+ close (sd );
440+ goto Retry ;
441+ }
442+ if (resp .code != MSG_STATUS || resp .dxid != HANDSHAKE_MAGIC ) {
443+ elog (WARNING , "Arbiter get unexpected response %d for handshake message from %s:%d" , resp .code , host , port );
444+ close (sd );
445+ goto Retry ;
446+ }
447+
448+ /* Some node considered that I am dead, so switch to recovery mode */
449+ if (BIT_CHECK (resp .disabledNodeMask , MtmNodeId - 1 )) {
450+ elog (WARNING , "Node %d thinks that I was dead" , resp .node );
451+ BIT_SET (Mtm -> disabledNodeMask , MtmNodeId - 1 );
452+ MtmSwitchClusterMode (MTM_RECOVERY );
453+ }
454+ return sd ;
411455}
412456
413457
0 commit comments