@@ -1239,27 +1239,44 @@ dmq_push(DmqDestinationId dest_id, char *stream_name, char *msg)
12391239 resetStringInfo (& buf );
12401240}
12411241
1242+ static bool push_state = false;
1243+ static StringInfoData buf ;
12421244
1243- void
1244- dmq_push_buffer (DmqDestinationId dest_id , char * stream_name , const void * payload , size_t len )
1245+ bool
1246+ dmq_push_buffer (DmqDestinationId dest_id , char * stream_name ,
1247+ const void * payload , size_t len , bool nowait )
12451248{
1246- StringInfoData buf ;
12471249 shm_mq_result res ;
12481250
1249- ensure_outq_handle ();
1251+ if (!push_state )
1252+ {
1253+ ensure_outq_handle ();
12501254
1251- initStringInfo (& buf );
1252- pq_sendbyte (& buf , dest_id );
1253- pq_send_ascii_string (& buf , stream_name );
1254- pq_sendbytes (& buf , payload , len );
1255+ initStringInfo (& buf );
1256+ pq_sendbyte (& buf , dest_id );
1257+ pq_send_ascii_string (& buf , stream_name );
1258+ pq_sendbytes (& buf , payload , len );
12551259
1256- mtm_log (DmqTraceOutgoing , "[DMQ] pushing l=%d '%.*s'" ,
1257- buf .len , buf .len , buf .data );
1260+ mtm_log (DmqTraceOutgoing , "[DMQ] pushing l=%d '%.*s'" ,
1261+ buf .len , buf .len , buf .data );
1262+ }
12581263
12591264 // XXX: use sendv instead
1260- res = shm_mq_send (dmq_local .mq_outh , buf .len , buf .data , false);
1265+ res = shm_mq_send (dmq_local .mq_outh , buf .len , buf .data , nowait );
1266+
1267+ if (res == SHM_MQ_WOULD_BLOCK )
1268+ {
1269+ Assert (nowait == true);
1270+ push_state = true;
1271+ /* Report on full queue. */
1272+ return false;
1273+ }
1274+
12611275 if (res != SHM_MQ_SUCCESS )
12621276 mtm_log (WARNING , "[DMQ] dmq_push: can't send to queue" );
1277+
1278+ push_state = false;
1279+ return true;
12631280}
12641281
12651282static bool
@@ -1467,6 +1484,17 @@ dmq_sender_name(DmqSenderId id)
14671484 return dmq_local .inhandles [id ].name ;
14681485}
14691486
1487+ char *
1488+ dmq_receiver_name (DmqDestinationId dest_id )
1489+ {
1490+ char * recvName ;
1491+
1492+ LWLockAcquire (dmq_state -> lock , LW_SHARED );
1493+ recvName = pstrdup (dmq_state -> destinations [dest_id ].receiver_name );
1494+ LWLockRelease (dmq_state -> lock );
1495+ return recvName ;
1496+ }
1497+
14701498DmqDestinationId
14711499dmq_remote_id (const char * name )
14721500{
@@ -1492,7 +1520,7 @@ dmq_remote_id(const char *name)
14921520 * received. Returns false, if an error is occured.
14931521 *
14941522 * sender_id - identifier of the received message sender.
1495- * msg - buffer that contains received message.
1523+ * msg - pointer to local buffer that contains received message.
14961524 * len - size of received message.
14971525 */
14981526const char *
0 commit comments