@@ -324,6 +324,19 @@ dmq_sender_at_exit(int status, Datum arg)
324324 LWLockRelease (dmq_state -> lock );
325325}
326326
327+ static void
328+ switch_destination_state (DmqDestinationId dest_id , DmqConnState state )
329+ {
330+ DmqDestination * dest ;
331+
332+ LWLockAcquire (dmq_state -> lock , LW_EXCLUSIVE );
333+ dest = & (dmq_state -> destinations [dest_id ]);
334+ Assert (dest -> active );
335+
336+ dest -> state = state ;
337+ LWLockRelease (dmq_state -> lock );
338+ }
339+
327340void
328341dmq_sender_main (Datum main_arg )
329342{
@@ -402,6 +415,7 @@ dmq_sender_main(Datum main_arg)
402415 conns [i ] = * dest ;
403416 Assert (conns [i ].pgconn == NULL );
404417 conns [i ].state = Idle ;
418+ dest -> state = Idle ;
405419 prev_timer_at = 0 ; /* do not wait for timer event */
406420 }
407421 /* close connection to deleted destination */
@@ -443,6 +457,7 @@ dmq_sender_main(Datum main_arg)
443457 {
444458 // Assert(PQstatus(conns[conn_id].pgconn) != CONNECTION_OK);
445459 conns [conn_id ].state = Idle ;
460+ switch_destination_state (conn_id , Idle );
446461 // DeleteWaitEvent(set, conns[conn_id].pos);
447462
448463 mtm_log (DmqStateFinal ,
@@ -532,6 +547,7 @@ dmq_sender_main(Datum main_arg)
532547 if (PQstatus (conns [conn_id ].pgconn ) == CONNECTION_BAD )
533548 {
534549 conns [conn_id ].state = Idle ;
550+ switch_destination_state (conn_id , Idle );
535551
536552 mtm_log (DmqStateIntermediate ,
537553 "[DMQ] failed to start connection with %s (%s): %s" ,
@@ -542,6 +558,7 @@ dmq_sender_main(Datum main_arg)
542558 else
543559 {
544560 conns [conn_id ].state = Connecting ;
561+ switch_destination_state (conn_id , Connecting );
545562 conns [conn_id ].pos = AddWaitEventToSet (set , WL_SOCKET_CONNECTED ,
546563 PQsocket (conns [conn_id ].pgconn ),
547564 NULL , (void * ) conn_id );
@@ -559,6 +576,7 @@ dmq_sender_main(Datum main_arg)
559576 if (ret < 0 )
560577 {
561578 conns [conn_id ].state = Idle ;
579+ switch_destination_state (conn_id , Idle );
562580 // DeleteWaitEvent(set, conns[conn_id].pos);
563581 // Assert(PQstatus(conns[i].pgconn) != CONNECTION_OK);
564582
@@ -622,6 +640,7 @@ dmq_sender_main(Datum main_arg)
622640 sender_name );
623641
624642 conns [conn_id ].state = Negotiating ;
643+ switch_destination_state (conn_id , Negotiating );
625644 ModifyWaitEvent (set , event .pos , WL_SOCKET_READABLE , NULL );
626645 PQsendQuery (conns [conn_id ].pgconn , query );
627646
@@ -632,6 +651,7 @@ dmq_sender_main(Datum main_arg)
632651 else if (status == PGRES_POLLING_FAILED )
633652 {
634653 conns [conn_id ].state = Idle ;
654+ switch_destination_state (conn_id , Idle );
635655 DeleteWaitEvent (set , event .pos );
636656
637657 mtm_log (DmqStateIntermediate ,
@@ -655,6 +675,7 @@ dmq_sender_main(Datum main_arg)
655675 if (!PQconsumeInput (conns [conn_id ].pgconn ))
656676 {
657677 conns [conn_id ].state = Idle ;
678+ switch_destination_state (conn_id , Idle );
658679 DeleteWaitEvent (set , event .pos );
659680
660681 mtm_log (DmqStateIntermediate ,
@@ -665,6 +686,7 @@ dmq_sender_main(Datum main_arg)
665686 if (!PQisBusy (conns [conn_id ].pgconn ))
666687 {
667688 conns [conn_id ].state = Active ;
689+ switch_destination_state (conn_id , Active );
668690 DeleteWaitEvent (set , event .pos );
669691
670692 mtm_log (DmqStateFinal ,
@@ -679,6 +701,7 @@ dmq_sender_main(Datum main_arg)
679701 if (!PQconsumeInput (conns [conn_id ].pgconn ))
680702 {
681703 conns [conn_id ].state = Idle ;
704+ switch_destination_state (conn_id , Idle );
682705
683706 mtm_log (DmqStateFinal ,
684707 "[DMQ] connection error with %s: %s" ,
@@ -1645,6 +1668,31 @@ dmq_destination_add(char *connstr, char *sender_name, char *receiver_name,
16451668 return dest_id ;
16461669}
16471670
1671+ /*
1672+ * Check availability of destination node.
1673+ * It is needed before sending process to prevent data loss.
1674+ */
1675+ DmqConnState
1676+ dmq_get_destination_status (DmqDestinationId dest_id )
1677+ {
1678+ DmqConnState state ;
1679+
1680+ if ((dest_id < 0 ) || (dest_id >= DMQ_MAX_DESTINATIONS ))
1681+ return -2 ;
1682+
1683+ LWLockAcquire (dmq_state -> lock , LW_EXCLUSIVE );
1684+ DmqDestination * dest = & (dmq_state -> destinations [dest_id ]);
1685+ if (!dest -> active )
1686+ {
1687+ LWLockRelease (dmq_state -> lock );
1688+ return -1 ;
1689+ }
1690+
1691+ state = dest -> state ;
1692+ LWLockRelease (dmq_state -> lock );
1693+ return state ;
1694+ }
1695+
16481696void
16491697dmq_destination_drop (char * receiver_name )
16501698{
0 commit comments