@@ -271,6 +271,7 @@ int debug_discard_caches = 0;
271271
272272#define MAX_SYSCACHE_CALLBACKS 64
273273#define MAX_RELCACHE_CALLBACKS 10
274+ #define MAX_RELSYNC_CALLBACKS 10
274275
275276static struct SYSCACHECALLBACK
276277{
@@ -292,6 +293,15 @@ static struct RELCACHECALLBACK
292293
293294static int relcache_callback_count = 0 ;
294295
296+ static struct RELSYNCCALLBACK
297+ {
298+ RelSyncCallbackFunction function ;
299+ Datum arg ;
300+ } relsync_callback_list [MAX_RELSYNC_CALLBACKS ];
301+
302+ static int relsync_callback_count = 0 ;
303+
304+
295305/* ----------------------------------------------------------------
296306 * Invalidation subgroup support functions
297307 * ----------------------------------------------------------------
@@ -484,6 +494,36 @@ AddRelcacheInvalidationMessage(InvalidationMsgsGroup *group,
484494 AddInvalidationMessage (group , RelCacheMsgs , & msg );
485495}
486496
497+ /*
498+ * Add a relsync inval entry
499+ *
500+ * We put these into the relcache subgroup for simplicity. This message is the
501+ * same as AddRelcacheInvalidationMessage() except that it is for
502+ * RelationSyncCache maintained by decoding plugin pgoutput.
503+ */
504+ static void
505+ AddRelsyncInvalidationMessage (InvalidationMsgsGroup * group ,
506+ Oid dbId , Oid relId )
507+ {
508+ SharedInvalidationMessage msg ;
509+
510+ /* Don't add a duplicate item. */
511+ ProcessMessageSubGroup (group , RelCacheMsgs ,
512+ if (msg -> rc .id == SHAREDINVALRELSYNC_ID &&
513+ (msg -> rc .relId == relId ||
514+ msg -> rc .relId == InvalidOid ))
515+ return );
516+
517+ /* OK, add the item */
518+ msg .rc .id = SHAREDINVALRELSYNC_ID ;
519+ msg .rc .dbId = dbId ;
520+ msg .rc .relId = relId ;
521+ /* check AddCatcacheInvalidationMessage() for an explanation */
522+ VALGRIND_MAKE_MEM_DEFINED (& msg , sizeof (msg ));
523+
524+ AddInvalidationMessage (group , RelCacheMsgs , & msg );
525+ }
526+
487527/*
488528 * Add a snapshot inval entry
489529 *
@@ -611,6 +651,17 @@ RegisterRelcacheInvalidation(InvalidationInfo *info, Oid dbId, Oid relId)
611651 info -> RelcacheInitFileInval = true;
612652}
613653
654+ /*
655+ * RegisterRelsyncInvalidation
656+ *
657+ * As above, but register a relsynccache invalidation event.
658+ */
659+ static void
660+ RegisterRelsyncInvalidation (InvalidationInfo * info , Oid dbId , Oid relId )
661+ {
662+ AddRelsyncInvalidationMessage (& info -> CurrentCmdInvalidMsgs , dbId , relId );
663+ }
664+
614665/*
615666 * RegisterSnapshotInvalidation
616667 *
@@ -751,6 +802,13 @@ InvalidateSystemCachesExtended(bool debug_discard)
751802
752803 ccitem -> function (ccitem -> arg , InvalidOid );
753804 }
805+
806+ for (i = 0 ; i < relsync_callback_count ; i ++ )
807+ {
808+ struct RELSYNCCALLBACK * ccitem = relsync_callback_list + i ;
809+
810+ ccitem -> function (ccitem -> arg , InvalidOid );
811+ }
754812}
755813
756814/*
@@ -832,6 +890,12 @@ LocalExecuteInvalidationMessage(SharedInvalidationMessage *msg)
832890 else if (msg -> sn .dbId == MyDatabaseId )
833891 InvalidateCatalogSnapshot ();
834892 }
893+ else if (msg -> id == SHAREDINVALRELSYNC_ID )
894+ {
895+ /* We only care about our own database */
896+ if (msg -> rs .dbId == MyDatabaseId )
897+ CallRelSyncCallbacks (msg -> rs .relid );
898+ }
835899 else
836900 elog (FATAL , "unrecognized SI message ID: %d" , msg -> id );
837901}
@@ -1621,6 +1685,32 @@ CacheInvalidateRelcacheByRelid(Oid relid)
16211685 ReleaseSysCache (tup );
16221686}
16231687
1688+ /*
1689+ * CacheInvalidateRelSync
1690+ * Register invalidation of the cache in logical decoding output plugin
1691+ * for a database.
1692+ *
1693+ * This type of invalidation message is used for the specific purpose of output
1694+ * plugins. Processes which do not decode WALs would do nothing even when it
1695+ * receives the message.
1696+ */
1697+ void
1698+ CacheInvalidateRelSync (Oid relid )
1699+ {
1700+ RegisterRelsyncInvalidation (PrepareInvalidationState (),
1701+ MyDatabaseId , relid );
1702+ }
1703+
1704+ /*
1705+ * CacheInvalidateRelSyncAll
1706+ * Register invalidation of the whole cache in logical decoding output
1707+ * plugin.
1708+ */
1709+ void
1710+ CacheInvalidateRelSyncAll (void )
1711+ {
1712+ CacheInvalidateRelSync (InvalidOid );
1713+ }
16241714
16251715/*
16261716 * CacheInvalidateSmgr
@@ -1763,6 +1853,27 @@ CacheRegisterRelcacheCallback(RelcacheCallbackFunction func,
17631853 ++ relcache_callback_count ;
17641854}
17651855
1856+ /*
1857+ * CacheRegisterRelSyncCallback
1858+ * Register the specified function to be called for all future
1859+ * relsynccache invalidation events.
1860+ *
1861+ * This function is intended to be call from the logical decoding output
1862+ * plugins.
1863+ */
1864+ void
1865+ CacheRegisterRelSyncCallback (RelSyncCallbackFunction func ,
1866+ Datum arg )
1867+ {
1868+ if (relsync_callback_count >= MAX_RELSYNC_CALLBACKS )
1869+ elog (FATAL , "out of relsync_callback_list slots" );
1870+
1871+ relsync_callback_list [relsync_callback_count ].function = func ;
1872+ relsync_callback_list [relsync_callback_count ].arg = arg ;
1873+
1874+ ++ relsync_callback_count ;
1875+ }
1876+
17661877/*
17671878 * CallSyscacheCallbacks
17681879 *
@@ -1788,6 +1899,20 @@ CallSyscacheCallbacks(int cacheid, uint32 hashvalue)
17881899 }
17891900}
17901901
1902+ /*
1903+ * CallSyscacheCallbacks
1904+ */
1905+ void
1906+ CallRelSyncCallbacks (Oid relid )
1907+ {
1908+ for (int i = 0 ; i < relsync_callback_count ; i ++ )
1909+ {
1910+ struct RELSYNCCALLBACK * ccitem = relsync_callback_list + i ;
1911+
1912+ ccitem -> function (ccitem -> arg , relid );
1913+ }
1914+ }
1915+
17911916/*
17921917 * LogLogicalInvalidations
17931918 *
0 commit comments