@@ -143,10 +143,11 @@ struct shm_mq_handle
143143};
144144
145145static void shm_mq_detach_internal (shm_mq * mq );
146- static shm_mq_result shm_mq_send_bytes (shm_mq_handle * mq , Size nbytes ,
146+ static shm_mq_result shm_mq_send_bytes (shm_mq_handle * mqh , Size nbytes ,
147147 const void * data , bool nowait , Size * bytes_written );
148- static shm_mq_result shm_mq_receive_bytes (shm_mq * mq , Size bytes_needed ,
149- bool nowait , Size * nbytesp , void * * datap );
148+ static shm_mq_result shm_mq_receive_bytes (shm_mq_handle * mqh ,
149+ Size bytes_needed , bool nowait , Size * nbytesp ,
150+ void * * datap );
150151static bool shm_mq_counterparty_gone (shm_mq * mq ,
151152 BackgroundWorkerHandle * handle );
152153static bool shm_mq_wait_internal (shm_mq * mq , PGPROC * * ptr ,
@@ -582,8 +583,14 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
582583 mqh -> mqh_counterparty_attached = true;
583584 }
584585
585- /* Consume any zero-copy data from previous receive operation. */
586- if (mqh -> mqh_consume_pending > 0 )
586+ /*
587+ * If we've consumed an amount of data greater than 1/4th of the ring
588+ * size, mark it consumed in shared memory. We try to avoid doing this
589+ * unnecessarily when only a small amount of data has been consumed,
590+ * because SetLatch() is fairly expensive and we don't want to do it too
591+ * often.
592+ */
593+ if (mqh -> mqh_consume_pending > mq -> mq_ring_size / 4 )
587594 {
588595 shm_mq_inc_bytes_read (mq , mqh -> mqh_consume_pending );
589596 mqh -> mqh_consume_pending = 0 ;
@@ -594,7 +601,7 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
594601 {
595602 /* Try to receive the message length word. */
596603 Assert (mqh -> mqh_partial_bytes < sizeof (Size ));
597- res = shm_mq_receive_bytes (mq , sizeof (Size ) - mqh -> mqh_partial_bytes ,
604+ res = shm_mq_receive_bytes (mqh , sizeof (Size ) - mqh -> mqh_partial_bytes ,
598605 nowait , & rb , & rawdata );
599606 if (res != SHM_MQ_SUCCESS )
600607 return res ;
@@ -614,13 +621,7 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
614621 needed = MAXALIGN (sizeof (Size )) + MAXALIGN (nbytes );
615622 if (rb >= needed )
616623 {
617- /*
618- * Technically, we could consume the message length
619- * information at this point, but the extra write to shared
620- * memory wouldn't be free and in most cases we would reap no
621- * benefit.
622- */
623- mqh -> mqh_consume_pending = needed ;
624+ mqh -> mqh_consume_pending += needed ;
624625 * nbytesp = nbytes ;
625626 * datap = ((char * ) rawdata ) + MAXALIGN (sizeof (Size ));
626627 return SHM_MQ_SUCCESS ;
@@ -632,7 +633,7 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
632633 */
633634 mqh -> mqh_expected_bytes = nbytes ;
634635 mqh -> mqh_length_word_complete = true;
635- shm_mq_inc_bytes_read ( mq , MAXALIGN (sizeof (Size ) ));
636+ mqh -> mqh_consume_pending += MAXALIGN (sizeof (Size ));
636637 rb -= MAXALIGN (sizeof (Size ));
637638 }
638639 else
@@ -651,15 +652,15 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
651652 }
652653 Assert (mqh -> mqh_buflen >= sizeof (Size ));
653654
654- /* Copy and consume partial length word. */
655+ /* Copy partial length word; remember to consume it . */
655656 if (mqh -> mqh_partial_bytes + rb > sizeof (Size ))
656657 lengthbytes = sizeof (Size ) - mqh -> mqh_partial_bytes ;
657658 else
658659 lengthbytes = rb ;
659660 memcpy (& mqh -> mqh_buffer [mqh -> mqh_partial_bytes ], rawdata ,
660661 lengthbytes );
661662 mqh -> mqh_partial_bytes += lengthbytes ;
662- shm_mq_inc_bytes_read ( mq , MAXALIGN (lengthbytes ) );
663+ mqh -> mqh_consume_pending += MAXALIGN (lengthbytes );
663664 rb -= lengthbytes ;
664665
665666 /* If we now have the whole word, we're ready to read payload. */
@@ -681,13 +682,13 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
681682 * we need not copy the data and can return a pointer directly into
682683 * shared memory.
683684 */
684- res = shm_mq_receive_bytes (mq , nbytes , nowait , & rb , & rawdata );
685+ res = shm_mq_receive_bytes (mqh , nbytes , nowait , & rb , & rawdata );
685686 if (res != SHM_MQ_SUCCESS )
686687 return res ;
687688 if (rb >= nbytes )
688689 {
689690 mqh -> mqh_length_word_complete = false;
690- mqh -> mqh_consume_pending = MAXALIGN (nbytes );
691+ mqh -> mqh_consume_pending + = MAXALIGN (nbytes );
691692 * nbytesp = nbytes ;
692693 * datap = rawdata ;
693694 return SHM_MQ_SUCCESS ;
@@ -727,21 +728,21 @@ shm_mq_receive(shm_mq_handle *mqh, Size *nbytesp, void **datap, bool nowait)
727728 mqh -> mqh_partial_bytes += rb ;
728729
729730 /*
730- * Update count of bytes read, with alignment padding. Note that this
731- * will never actually insert any padding except at the end of a
732- * message, because the buffer size is a multiple of MAXIMUM_ALIGNOF,
733- * and each read and write is as well.
731+ * Update count of bytes that can be consumed, accounting for
732+ * alignment padding. Note that this will never actually insert any
733+ * padding except at the end of a message, because the buffer size is
734+ * a multiple of MAXIMUM_ALIGNOF, and each read and write is as well.
734735 */
735736 Assert (mqh -> mqh_partial_bytes == nbytes || rb == MAXALIGN (rb ));
736- shm_mq_inc_bytes_read ( mq , MAXALIGN (rb ) );
737+ mqh -> mqh_consume_pending += MAXALIGN (rb );
737738
738739 /* If we got all the data, exit the loop. */
739740 if (mqh -> mqh_partial_bytes >= nbytes )
740741 break ;
741742
742743 /* Wait for some more data. */
743744 still_needed = nbytes - mqh -> mqh_partial_bytes ;
744- res = shm_mq_receive_bytes (mq , still_needed , nowait , & rb , & rawdata );
745+ res = shm_mq_receive_bytes (mqh , still_needed , nowait , & rb , & rawdata );
745746 if (res != SHM_MQ_SUCCESS )
746747 return res ;
747748 if (rb > still_needed )
@@ -1007,9 +1008,10 @@ shm_mq_send_bytes(shm_mq_handle *mqh, Size nbytes, const void *data,
10071008 * is SHM_MQ_SUCCESS.
10081009 */
10091010static shm_mq_result
1010- shm_mq_receive_bytes (shm_mq * mq , Size bytes_needed , bool nowait ,
1011+ shm_mq_receive_bytes (shm_mq_handle * mqh , Size bytes_needed , bool nowait ,
10111012 Size * nbytesp , void * * datap )
10121013{
1014+ shm_mq * mq = mqh -> mqh_queue ;
10131015 Size ringsize = mq -> mq_ring_size ;
10141016 uint64 used ;
10151017 uint64 written ;
@@ -1021,7 +1023,13 @@ shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
10211023
10221024 /* Get bytes written, so we can compute what's available to read. */
10231025 written = pg_atomic_read_u64 (& mq -> mq_bytes_written );
1024- read = pg_atomic_read_u64 (& mq -> mq_bytes_read );
1026+
1027+ /*
1028+ * Get bytes read. Include bytes we could consume but have not yet
1029+ * consumed.
1030+ */
1031+ read = pg_atomic_read_u64 (& mq -> mq_bytes_read ) +
1032+ mqh -> mqh_consume_pending ;
10251033 used = written - read ;
10261034 Assert (used <= ringsize );
10271035 offset = read % (uint64 ) ringsize ;
@@ -1052,6 +1060,16 @@ shm_mq_receive_bytes(shm_mq *mq, Size bytes_needed, bool nowait,
10521060 if (mq -> mq_detached )
10531061 return SHM_MQ_DETACHED ;
10541062
1063+ /*
1064+ * We didn't get enough data to satisfy the request, so mark any data
1065+ * previously-consumed as read to make more buffer space.
1066+ */
1067+ if (mqh -> mqh_consume_pending > 0 )
1068+ {
1069+ shm_mq_inc_bytes_read (mq , mqh -> mqh_consume_pending );
1070+ mqh -> mqh_consume_pending = 0 ;
1071+ }
1072+
10551073 /* Skip manipulation of our latch if nowait = true. */
10561074 if (nowait )
10571075 return SHM_MQ_WOULD_BLOCK ;
0 commit comments