@@ -60,6 +60,9 @@ static void qs_ExecutorRun(QueryDesc *queryDesc, ScanDirection direction,
6060#endif
6161static void qs_ExecutorFinish (QueryDesc * queryDesc );
6262
63+ static shm_mq_result receive_msg_by_parts (shm_mq_handle * mqh , Size * total ,
64+ void * * datap , bool nowait );
65+
6366/* Global variables */
6467List * QueryDescStack = NIL ;
6568static ProcSignalReason UserIdPollReason = INVALID_PROCSIGNAL ;
@@ -777,7 +780,7 @@ shm_mq_receive_with_timeout(shm_mq_handle *mqh,
777780 {
778781 shm_mq_result mq_receive_result ;
779782
780- mq_receive_result = shm_mq_receive (mqh , nbytesp , datap , true);
783+ mq_receive_result = receive_msg_by_parts (mqh , nbytesp , datap , true);
781784 if (mq_receive_result != SHM_MQ_WOULD_BLOCK )
782785 return mq_receive_result ;
783786 if (rc & WL_TIMEOUT || delay <= 0 )
@@ -960,18 +963,9 @@ copy_msg(shm_mq_msg *msg)
960963 return result ;
961964}
962965
963- // ----------------- DEBUG -----------------
964- static void
965- print_recv_bytes (int num , char * src , int offset )
966- {
967- elog (INFO , "======= RECV MSG SEGMENT START (%d bytes) =======" , num );
968- for (int i = offset ; i < offset + num ; i ++ )
969- elog (INFO , "RECV byte #%d = %02x" , i , (unsigned char ) * (src + i ));
970- }
971- // ----------------- DEBUG -----------------
972-
973966static shm_mq_result
974- shm_mq_receive_by_bytes (shm_mq_handle * mqh , Size * total , void * * datap )
967+ receive_msg_by_parts (shm_mq_handle * mqh , Size * total , void * * datap ,
968+ bool nowait )
975969{
976970 shm_mq_result mq_receive_result ;
977971 shm_mq_msg * buff ;
@@ -982,29 +976,25 @@ shm_mq_receive_by_bytes(shm_mq_handle *mqh, Size *total, void **datap)
982976
983977 /* Get the expected number of bytes in message */
984978 mq_receive_result = shm_mq_receive (mqh , & len , (void * * ) & expected , false);
979+ mq_receive_result = shm_mq_receive (mqh , & len , (void * * ) & expected , nowait );
985980 if (mq_receive_result != SHM_MQ_SUCCESS )
986981 return mq_receive_result ;
987982 Assert (len == sizeof (int ));
988- // elog(INFO, "======= RECV MSG (expecting %d bytes) =======", *expected);
989983
990984 * datap = palloc0 (* expected );
991985
992986 /* Get the message itself */
993- for (offset = 0 , ii = 0 ; offset < * expected ; ii ++ )
987+ for (offset = 0 ; offset < * expected ; )
994988 {
995- // Keep receiving new messages until we assemble the full message
996- mq_receive_result = shm_mq_receive (mqh , & len , ((void * * ) & buff ), false );
989+ /* Keep receiving new messages until we assemble the full message */
990+ mq_receive_result = shm_mq_receive (mqh , & len , ((void * * ) & buff ), nowait );
997991 memcpy ((char * ) * datap + offset , buff , len );
998- // print_recv_bytes(len, (char *) *datap, offset);
999992 offset += len ;
1000993 if (mq_receive_result != SHM_MQ_SUCCESS )
1001994 return mq_receive_result ;
1002995 }
1003996
1004- // elog(INFO, "RECV: END cycle - %d", ii);
1005997 * total = offset ;
1006- // mq_receive_result = shm_mq_receive(mqh, &len, (void **) &msg, false);
1007- // *datap = buff;
1008998
1009999 return mq_receive_result ;
10101000}
@@ -1081,7 +1071,8 @@ GetRemoteBackendQueryStates(PGPROC *leader,
10811071 /* extract query state from leader process */
10821072 mqh = shm_mq_attach (mq , NULL , NULL );
10831073 elog (DEBUG1 , "Wait response from leader %d" , leader -> pid );
1084- mq_receive_result = shm_mq_receive_by_bytes (mqh , & len , ((void * * ) & msg ));
1074+ mq_receive_result = receive_msg_by_parts (mqh , & len , (void * * ) & msg ,
1075+ false);
10851076 if (mq_receive_result != SHM_MQ_SUCCESS )
10861077 goto mq_error ;
10871078 if (msg -> reqid != reqid )
0 commit comments