4545#include "storage/pmsignal.h"
4646#include "storage/proc.h"
4747#include "utils/syscache.h"
48+ #include "utils/lsyscache.h"
4849#include "replication/walsender.h"
4950#include "replication/walsender_private.h"
5051#include "replication/slot.h"
5354#include "nodes/makefuncs.h"
5455#include "access/htup_details.h"
5556#include "catalog/indexing.h"
57+ #include "catalog/namespace.h"
5658#include "pglogical_output/hooks.h"
5759
5860#include "multimaster.h"
@@ -105,6 +107,7 @@ PG_FUNCTION_INFO_V1(mtm_recover_node);
105107PG_FUNCTION_INFO_V1 (mtm_get_snapshot );
106108PG_FUNCTION_INFO_V1 (mtm_get_nodes_state );
107109PG_FUNCTION_INFO_V1 (mtm_get_cluster_state );
110+ PG_FUNCTION_INFO_V1 (mtm_make_table_local );
108111
109112static Snapshot MtmGetSnapshot (Snapshot snapshot );
110113static void MtmInitialize (void );
@@ -135,6 +138,7 @@ MtmState* Mtm;
135138
136139HTAB * MtmXid2State ;
137140static HTAB * MtmGid2State ;
141+ static HTAB * MtmLocalTables ;
138142
139143static MtmCurrentTrans MtmTx ;
140144
@@ -176,11 +180,12 @@ bool MtmUseRaftable;
176180MtmConnectionInfo * MtmConnections ;
177181
178182static char * MtmConnStrs ;
179- static int MtmQueueSize ;
180- static int MtmWorkers ;
181- static int MtmVacuumDelay ;
182- static int MtmMinRecoveryLag ;
183- static int MtmMaxRecoveryLag ;
183+ static int MtmQueueSize ;
184+ static int MtmWorkers ;
185+ static int MtmVacuumDelay ;
186+ static int MtmMinRecoveryLag ;
187+ static int MtmMaxRecoveryLag ;
188+ static bool MtmIgnoreTablesWithoutPk ;
184189
185190static ExecutorFinish_hook_type PreviousExecutorFinishHook ;
186191static ProcessUtility_hook_type PreviousProcessUtilityHook ;
@@ -1280,6 +1285,71 @@ MtmCreateGidMap(void)
12801285 return htab ;
12811286}
12821287
1288+ static HTAB *
1289+ MtmCreateLocalTableMap (void )
1290+ {
1291+ HASHCTL info ;
1292+ HTAB * htab ;
1293+ memset (& info , 0 , sizeof (info ));
1294+ info .keysize = sizeof (Oid );
1295+ htab = ShmemInitHash (
1296+ "MtmLocalTables" ,
1297+ MULTIMASTER_MAX_LOCAL_TABLES , MULTIMASTER_MAX_LOCAL_TABLES ,
1298+ & info ,
1299+ 0
1300+ );
1301+ return htab ;
1302+ }
1303+
1304+ static void MtmMakeRelationLocal (Oid relid )
1305+ {
1306+ if (OidIsValid (relid )) {
1307+ MtmLock (LW_EXCLUSIVE );
1308+ hash_search (MtmLocalTables , & relid , HASH_ENTER , NULL );
1309+ MtmUnlock ();
1310+ }
1311+ }
1312+
1313+
1314+ void MtmMakeTableLocal (char * schema , char * name )
1315+ {
1316+ RangeVar * rv = makeRangeVar (schema , name , -1 );
1317+ Oid relid = RangeVarGetRelid (rv , NoLock , true);
1318+ MtmMakeRelationLocal (relid );
1319+ }
1320+
1321+
1322+ typedef struct {
1323+ NameData schema ;
1324+ NameData name ;
1325+ } MtmLocalTablesTuple ;
1326+
1327+ static void MtmLoadLocalTables (void )
1328+ {
1329+ RangeVar * rv ;
1330+ Relation rel ;
1331+ SysScanDesc scan ;
1332+ HeapTuple tuple ;
1333+
1334+ Assert (IsTransactionState ());
1335+
1336+ rv = makeRangeVar (MULTIMASTER_SCHEMA_NAME , MULTIMASTER_LOCAL_TABLES_TABLE , -1 );
1337+ rel = heap_openrv_extended (rv , RowExclusiveLock , true);
1338+ if (rel != NULL ) {
1339+ scan = systable_beginscan (rel , 0 , true, NULL , 0 , NULL );
1340+
1341+ while (HeapTupleIsValid (tuple = systable_getnext (scan )))
1342+ {
1343+ MtmLocalTablesTuple * t = (MtmLocalTablesTuple * ) GETSTRUCT (tuple );
1344+ MtmMakeTableLocal (NameStr (t -> schema ), NameStr (t -> name ));
1345+ }
1346+
1347+ systable_endscan (scan );
1348+ heap_close (rel , RowExclusiveLock );
1349+ }
1350+ }
1351+
1352+
12831353static void MtmInitialize ()
12841354{
12851355 bool found ;
@@ -1309,6 +1379,7 @@ static void MtmInitialize()
13091379 Mtm -> nReceivers = 0 ;
13101380 Mtm -> timeShift = 0 ;
13111381 Mtm -> transCount = 0 ;
1382+ Mtm -> localTablesHashLoaded = false;
13121383 for (i = 0 ; i < MtmNodes ; i ++ ) {
13131384 Mtm -> nodes [i ].oldestSnapshot = 0 ;
13141385 Mtm -> nodes [i ].transDelay = 0 ;
@@ -1324,6 +1395,7 @@ static void MtmInitialize()
13241395 }
13251396 MtmXid2State = MtmCreateXidMap ();
13261397 MtmGid2State = MtmCreateGidMap ();
1398+ MtmLocalTables = MtmCreateLocalTableMap ();
13271399 MtmDoReplication = true;
13281400 TM = & MtmTM ;
13291401 LWLockRelease (AddinShmemInitLock );
@@ -1476,6 +1548,19 @@ _PG_init(void)
14761548 NULL
14771549 );
14781550
1551+ DefineCustomBoolVariable (
1552+ "multimaster.ignore_tables_without_pk" ,
1553+ "Do not replicate tables withpout primary key" ,
1554+ NULL ,
1555+ & MtmIgnoreTablesWithoutPk ,
1556+ false,
1557+ PGC_BACKEND ,
1558+ 0 ,
1559+ NULL ,
1560+ NULL ,
1561+ NULL
1562+ );
1563+
14791564 DefineCustomIntVariable (
14801565 "multimaster.workers" ,
14811566 "Number of multimaster executor workers per node" ,
@@ -1805,11 +1890,30 @@ MtmReplicationTxnFilterHook(struct PGLogicalTxnFilterArgs* args)
18051890 return res ;
18061891}
18071892
1893+ static bool
1894+ MtmReplicationRowFilterHook (struct PGLogicalRowFilterArgs * args )
1895+ {
1896+ bool isDistributed ;
1897+ MtmLock (LW_SHARED );
1898+ if (!Mtm -> localTablesHashLoaded ) {
1899+ MtmUnlock ();
1900+ MtmLock (LW_EXCLUSIVE );
1901+ if (!Mtm -> localTablesHashLoaded ) {
1902+ MtmLoadLocalTables ();
1903+ Mtm -> localTablesHashLoaded = true;
1904+ }
1905+ }
1906+ isDistributed = hash_search (MtmLocalTables , & RelationGetRelid (args -> changed_rel ), HASH_FIND , NULL ) == NULL ;
1907+ MtmUnlock ();
1908+ return isDistributed ;
1909+ }
1910+
18081911void MtmSetupReplicationHooks (struct PGLogicalHooks * hooks )
18091912{
18101913 hooks -> startup_hook = MtmReplicationStartupHook ;
18111914 hooks -> shutdown_hook = MtmReplicationShutdownHook ;
18121915 hooks -> txn_filter_hook = MtmReplicationTxnFilterHook ;
1916+ hooks -> row_filter_hook = MtmReplicationRowFilterHook ;
18131917}
18141918
18151919
@@ -1936,6 +2040,52 @@ mtm_get_cluster_state(PG_FUNCTION_ARGS)
19362040 PG_RETURN_DATUM (HeapTupleGetDatum (heap_form_tuple (desc , values , nulls )));
19372041}
19382042
2043+
2044+ Datum mtm_make_table_local (PG_FUNCTION_ARGS )
2045+ {
2046+ Oid reloid = PG_GETARG_OID (1 );
2047+ RangeVar * rv ;
2048+ Relation rel ;
2049+ TupleDesc tupDesc ;
2050+ HeapTuple tup ;
2051+ Datum values [Natts_mtm_local_tables ];
2052+ bool nulls [Natts_mtm_local_tables ];
2053+
2054+ MtmMakeRelationLocal (reloid );
2055+
2056+ rv = makeRangeVar (MULTIMASTER_SCHEMA_NAME , MULTIMASTER_LOCAL_TABLES_TABLE , -1 );
2057+ rel = heap_openrv (rv , RowExclusiveLock );
2058+ if (rel != NULL ) {
2059+ char * tableName = get_rel_name (reloid );
2060+ Oid schemaid = get_rel_namespace (reloid );
2061+ char * schemaName = get_namespace_name (schemaid );
2062+
2063+ tupDesc = RelationGetDescr (rel );
2064+
2065+ /* Form a tuple. */
2066+ memset (nulls , false, sizeof (nulls ));
2067+
2068+ values [Anum_mtm_local_tables_rel_schema - 1 ] = CStringGetTextDatum (schemaName );
2069+ values [Anum_mtm_local_tables_rel_name - 1 ] = CStringGetTextDatum (tableName );
2070+
2071+ tup = heap_form_tuple (tupDesc , values , nulls );
2072+
2073+ /* Insert the tuple to the catalog. */
2074+ simple_heap_insert (rel , tup );
2075+
2076+ /* Update the indexes. */
2077+ CatalogUpdateIndexes (rel , tup );
2078+
2079+ /* Cleanup. */
2080+ heap_freetuple (tup );
2081+ heap_close (rel , RowExclusiveLock );
2082+
2083+ MtmTx .containsDML = true;
2084+ }
2085+ return false;
2086+ }
2087+
2088+
19392089/*
19402090 * -------------------------------------------
19412091 * Broadcast utulity statements
@@ -2248,10 +2398,17 @@ MtmExecutorFinish(QueryDesc *queryDesc)
22482398 if (estate -> es_processed != 0 && (operation == CMD_INSERT || operation == CMD_UPDATE || operation == CMD_DELETE )) {
22492399 int i ;
22502400 for (i = 0 ; i < estate -> es_num_result_relations ; i ++ ) {
2251- if (RelationNeedsWAL (estate -> es_result_relations [i ].ri_RelationDesc )) {
2401+ Relation rel = estate -> es_result_relations [i ].ri_RelationDesc ;
2402+ if (RelationNeedsWAL (rel )) {
22522403 MtmTx .containsDML = true;
22532404 break ;
22542405 }
2406+ if (MtmIgnoreTablesWithoutPk ) {
2407+ if (!rel -> rd_indexvalid ) {
2408+ RelationGetIndexList (rel );
2409+ }
2410+ MtmMakeRelationLocal (rel -> rd_replidindex );
2411+ }
22552412 }
22562413 }
22572414 }
0 commit comments