155 if (
state == SUBREL_STATE_UNKNOWN)
158 if (
state == expected_state)
172 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
214 if (worker && worker->
proc)
226 1000L, WAIT_EVENT_LOGICAL_SYNC_STATE_CHANGE);
288 sizeof(syncslotname));
370 struct tablesync_start_time_mapping
378 bool should_exit =
false;
395 ctl.keysize =
sizeof(
Oid);
396 ctl.entrysize =
sizeof(
struct tablesync_start_time_mapping);
426 if (rstate->
state == SUBREL_STATE_SYNCDONE)
433 if (current_lsn >= rstate->
lsn)
437 rstate->
state = SUBREL_STATE_READY;
438 rstate->
lsn = current_lsn;
486 rstate->
relid,
false);
494 if (rstate->
state == SUBREL_STATE_SYNCWAIT)
500 syncworker->
relstate = SUBREL_STATE_CATCHUP;
507 if (rstate->
state == SUBREL_STATE_SYNCWAIT)
510 if (syncworker->
proc)
544 SUBREL_STATE_SYNCDONE);
558 struct tablesync_start_time_mapping *hentry;
567 hentry->last_start_time = 0;
570 rstate->
relid, &hentry->last_start_time);
600 (
errmsg(
"logical replication apply worker for subscription \"%s\" will restart so that two_phase can be enabled",
633 attnamelist =
lappend(attnamelist,
663 while (maxread > 0 && bytesread < minread)
691 outbuf = (
char *) outbuf + avail;
697 if (maxread <= 0 || bytesread >= minread)
707 fd, 1000L, WAIT_EVENT_LOGICAL_SYNC_DATA);
726 List **qual,
bool *gencol_published)
731 Oid tableRow[] = {OIDOID, CHAROID, CHAROID};
732 Oid attrRow[] = {INT2OID, TEXTOID, OIDOID, BOOLOID, BOOLOID};
733 Oid qualRow[] = {TEXTOID};
746 " FROM pg_catalog.pg_class c"
747 " INNER JOIN pg_catalog.pg_namespace n"
748 " ON (c.relnamespace = n.oid)"
749 " WHERE n.nspname = %s"
750 " AND c.relname = %s",
758 (
errcode(ERRCODE_CONNECTION_FAILURE),
759 errmsg(
"could not fetch table info for table \"%s.%s\" from publisher: %s",
765 (
errcode(ERRCODE_UNDEFINED_OBJECT),
766 errmsg(
"table \"%s.%s\" not found on publisher",
790 Oid attrsRow[] = {INT2VECTOROID};
803 " (CASE WHEN (array_length(gpt.attrs, 1) = c.relnatts)"
804 " THEN NULL ELSE gpt.attrs END)"
805 " FROM pg_publication p,"
806 " LATERAL pg_get_publication_tables(p.pubname) gpt,"
808 " WHERE gpt.relid = %u AND c.oid = gpt.relid"
809 " AND p.pubname IN ( %s )",
818 (
errcode(ERRCODE_CONNECTION_FAILURE),
819 errmsg(
"could not fetch column list info for table \"%s.%s\" from publisher: %s",
832 errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
833 errmsg(
"cannot use different column lists for table \"%s.%s\" in different publications",
857 for (natt = 0; natt < nelems; natt++)
876 " a.attnum = ANY(i.indkey)");
883 " FROM pg_catalog.pg_attribute a"
884 " LEFT JOIN pg_catalog.pg_index i"
885 " ON (i.indexrelid = pg_get_replica_identity_index(%u))"
886 " WHERE a.attnum > 0::pg_catalog.int2"
887 " AND NOT a.attisdropped %s"
888 " AND a.attrelid = %u"
889 " ORDER BY a.attnum",
892 "AND a.attgenerated = ''" :
""),
899 (
errcode(ERRCODE_CONNECTION_FAILURE),
900 errmsg(
"could not fetch table info for table \"%s.%s\" from publisher: %s",
948 elog(
ERROR,
"too many columns in remote table \"%s.%s\"",
981 Assert(pub_names != NULL);
986 "SELECT DISTINCT pg_get_expr(gpt.qual, gpt.relid)"
987 " FROM pg_publication p,"
988 " LATERAL pg_get_publication_tables(p.pubname) gpt"
989 " WHERE gpt.relid = %u"
990 " AND p.pubname IN ( %s )",
998 (
errmsg(
"could not fetch table WHERE clause info for table \"%s.%s\" from publisher: %s",
1054 bool gencol_published =
false;
1072 if ((lrel.
relkind == RELKIND_RELATION || lrel.
relkind == RELKIND_PARTITIONED_TABLE)
1073 && qual ==
NIL && !gencol_published)
1087 for (
int i = 0;
i < lrel.
natts;
i++)
1114 for (
int i = 0;
i < lrel.
natts;
i++)
1127 if (lrel.
relkind == RELKIND_RELATION)
1165 (
errcode(ERRCODE_CONNECTION_FAILURE),
1166 errmsg(
"could not start initial contents copy for table \"%s.%s\": %s",
1174 NULL,
false,
false);
1204 char *syncslotname,
Size szslot)
1231 bool must_use_password;
1256 case SUBREL_STATE_SYNCDONE:
1257 case SUBREL_STATE_READY:
1258 case SUBREL_STATE_UNKNOWN:
1280 (
errcode(ERRCODE_CONNECTION_FAILURE),
1281 errmsg(
"table synchronization worker for subscription \"%s\" could not connect to the publisher: %s",
1292 sizeof(originname));
1328 goto copy_table_done;
1362 "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ",
1366 (
errcode(ERRCODE_CONNECTION_FAILURE),
1367 errmsg(
"table copy could not start transaction on publisher: %s",
1377 slotname,
false ,
false ,
1410 errmsg(
"replication origin \"%s\" already exists",
1442 (
errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1443 errmsg(
"user \"%s\" cannot replicate into relation with row-level security enabled: \"%s\"",
1455 (
errcode(ERRCODE_CONNECTION_FAILURE),
1456 errmsg(
"table copy could not finish transaction on publisher: %s",
1474 SUBREL_STATE_FINISHEDCOPY,
1483 "LogicalRepSyncTableStart: '%s' origin_startpos lsn %X/%08X",
1513 char *sync_slotname = NULL;
1544 pfree(sync_slotname);
1558 char *slotname = NULL;
1566 sizeof(originname));
1655 bool nulls[Natts_pg_subscription];
1656 bool replaces[Natts_pg_subscription];
1659 Assert(new_state == LOGICALREP_TWOPHASE_STATE_DISABLED ||
1660 new_state == LOGICALREP_TWOPHASE_STATE_PENDING ||
1661 new_state == LOGICALREP_TWOPHASE_STATE_ENABLED);
1667 "cache lookup failed for subscription oid %u",
1672 memset(nulls,
false,
sizeof(nulls));
1673 memset(replaces,
false,
sizeof(replaces));
1677 replaces[Anum_pg_subscription_subtwophasestate - 1] =
true;
1680 values, nulls, replaces);
void aclcheck_error(AclResult aclerr, ObjectType objtype, const char *objectname)
AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)
#define DatumGetArrayTypeP(X)
void set_stream_options(WalRcvStreamOptions *options, char *slotname, XLogRecPtr *origin_startpos)
void start_apply(XLogRecPtr origin_startpos)
void DisableSubscriptionAndExit(void)
void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid, char *originname, Size szoriginname)
void set_apply_error_context_origin(char *originname)
MemoryContext ApplyContext
void SetupApplyOrSyncWorker(int worker_slot)
WalReceiverConn * LogRepWorkerWalRcvConn
Subscription * MySubscription
bool bms_is_member(int x, const Bitmapset *a)
Bitmapset * bms_add_member(Bitmapset *a, int x)
static Datum values[MAXATTR]
#define TextDatumGetCString(d)
#define OidIsValid(objectId)
CopyFromState BeginCopyFrom(ParseState *pstate, Relation rel, Node *whereClause, const char *filename, bool is_program, copy_data_source_cb data_source_cb, List *attnamelist, List *options)
uint64 CopyFrom(CopyFromState cstate)
void * hash_search(HTAB *hashp, const void *keyPtr, HASHACTION action, bool *foundPtr)
HTAB * hash_create(const char *tabname, int64 nelem, const HASHCTL *info, int flags)
void hash_destroy(HTAB *hashp)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
void err(int eval, const char *fmt,...)
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
const TupleTableSlotOps TTSOpsMinimalTuple
Assert(PointerIsAligned(start, uint64))
HeapTuple heap_modify_tuple(HeapTuple tuple, TupleDesc tupleDesc, const Datum *replValues, const bool *replIsnull, const bool *doReplace)
void heap_freetuple(HeapTuple htup)
#define HeapTupleIsValid(tuple)
#define MaxTupleAttributeNumber
void CatalogTupleUpdate(Relation heapRel, const ItemPointerData *otid, HeapTuple tup)
int WaitLatchOrSocket(Latch *latch, int wakeEvents, pgsocket sock, long timeout, uint32 wait_event_info)
void ResetLatch(Latch *latch)
int WaitLatch(Latch *latch, int wakeEvents, long timeout, uint32 wait_event_info)
void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker)
LogicalRepWorker * logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid, bool only_running)
static dshash_table * last_start_times
LogicalRepWorker * MyLogicalRepWorker
int logicalrep_sync_worker_count(Oid subid)
void ApplyLauncherForgetWorkerStartTime(Oid subid)
List * lappend(List *list, void *datum)
void list_free_deep(List *list)
void LockSharedObject(Oid classid, Oid objid, uint16 objsubid, LOCKMODE lockmode)
void UnlockRelationOid(Oid relid, LOCKMODE lockmode)
void LockRelationOid(Oid relid, LOCKMODE lockmode)
char get_rel_relkind(Oid relid)
char * get_namespace_name(Oid nspid)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
DefElem * makeDefElem(char *name, Node *arg, int location)
char * MemoryContextStrdup(MemoryContext context, const char *string)
void pfree(void *pointer)
void * palloc0(Size size)
#define CHECK_FOR_INTERRUPTS()
char * GetUserNameFromId(Oid roleid, bool noerr)
ObjectType get_relkind_objtype(char relkind)
TimestampTz replorigin_session_origin_timestamp
RepOriginId replorigin_by_name(const char *roname, bool missing_ok)
RepOriginId replorigin_create(const char *roname)
void replorigin_session_reset(void)
void replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
RepOriginId replorigin_session_origin
void replorigin_advance(RepOriginId node, XLogRecPtr remote_commit, XLogRecPtr local_commit, bool go_backward, bool wal_log)
void replorigin_session_setup(RepOriginId node, int acquired_by)
XLogRecPtr replorigin_session_get_progress(bool flush)
XLogRecPtr replorigin_session_origin_lsn
#define InvalidRepOriginId
ParseState * make_parsestate(ParseState *parentParseState)
ParseNamespaceItem * addRangeTableEntryForRelation(ParseState *pstate, Relation rel, int lockmode, Alias *alias, bool inh, bool inFromCl)
static int server_version
#define for_each_from(cell, lst, N)
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool already_locked)
char GetSubscriptionRelState(Oid subid, Oid relid, XLogRecPtr *sublsn)
void GetPublicationsStr(List *publications, StringInfo dest, bool quote_literal)
long pgstat_report_stat(bool force)
void pgstat_report_subscription_error(Oid subid, LogicalRepWorkerType wtype)
static bool DatumGetBool(Datum X)
static Oid DatumGetObjectId(Datum X)
static Datum ObjectIdGetDatum(Oid X)
static char DatumGetChar(Datum X)
static int16 DatumGetInt16(Datum X)
static int32 DatumGetInt32(Datum X)
static Datum CharGetDatum(char X)
static int fd(const char *x, int i)
char * quote_literal_cstr(const char *rawstr)
#define RelationGetRelid(relation)
#define RelationGetDescr(relation)
#define RelationGetRelationName(relation)
#define RelationGetNamespace(relation)
int check_enable_rls(Oid relid, Oid checkAsUser, bool noError)
char * quote_qualified_identifier(const char *qualifier, const char *ident)
const char * quote_identifier(const char *ident)
Snapshot GetTransactionSnapshot(void)
void PushActiveSnapshot(Snapshot snapshot)
void PopActiveSnapshot(void)
void InvalidateCatalogSnapshot(void)
#define SpinLockRelease(lock)
#define SpinLockAcquire(lock)
void logicalrep_relmap_update(LogicalRepRelation *remoterel)
void logicalrep_rel_close(LogicalRepRelMapEntry *rel, LOCKMODE lockmode)
LogicalRepRelMapEntry * logicalrep_rel_open(LogicalRepRelId remoteid, LOCKMODE lockmode)
#define ERRCODE_DUPLICATE_OBJECT
void destroyStringInfo(StringInfo str)
StringInfo makeStringInfo(void)
void resetStringInfo(StringInfo str)
void appendStringInfo(StringInfo str, const char *fmt,...)
void appendStringInfoString(StringInfo str, const char *s)
void appendStringInfoChar(StringInfo str, char ch)
void initStringInfo(StringInfo str)
LogicalRepRelation remoterel
Tuplestorestate * tuplestore
void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok)
void launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, Oid relid, TimestampTz *last_start_time)
pg_noreturn void FinishSyncWorker(void)
void FetchRelationStates(bool *has_pending_subtables, bool *has_pending_subsequences, bool *started_tx)
#define SearchSysCacheCopy1(cacheId, key1)
void table_close(Relation relation, LOCKMODE lockmode)
Relation table_open(Oid relationId, LOCKMODE lockmode)
List * table_states_not_ready
bool AllTablesyncsReady(void)
static bool wait_for_worker_state_change(char expected_state)
static bool wait_for_table_state_change(Oid relid, char expected_state)
void ProcessSyncingTablesForSync(XLogRecPtr current_lsn)
static List * make_copy_attnamelist(LogicalRepRelMapEntry *rel)
bool HasSubscriptionTablesCached(void)
void TableSyncWorkerMain(Datum main_arg)
static void fetch_remote_table_info(char *nspname, char *relname, LogicalRepRelation *lrel, List **qual, bool *gencol_published)
void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot)
static void run_tablesync_worker()
static int copy_read_data(void *outbuf, int minread, int maxread)
static char * LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
static void copy_table(Relation rel)
void ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
static void start_table_sync(XLogRecPtr *origin_startpos, char **slotname)
static StringInfo copybuf
void UpdateTwoPhaseState(Oid suboid, char new_state)
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
int64 tuplestore_tuple_count(Tuplestorestate *state)
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
static TupleTableSlot * ExecClearTuple(TupleTableSlot *slot)
void SwitchToUntrustedUser(Oid userid, UserContext *context)
void RestoreUserContext(UserContext *context)
String * makeString(char *str)
#define WL_SOCKET_READABLE
#define WL_EXIT_ON_PM_DEATH
#define walrcv_startstreaming(conn, options)
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
static void walrcv_clear_result(WalRcvExecResult *walres)
#define walrcv_server_version(conn)
#define walrcv_endstreaming(conn, next_tli)
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
#define walrcv_receive(conn, buffer, wait_fd)
static bool am_tablesync_worker(void)
bool IsTransactionState(void)
void CommandCounterIncrement(void)
void StartTransactionCommand(void)
void CommitTransactionCommand(void)
void AbortOutOfAnyTransaction(void)
uint64 GetSystemIdentifier(void)
#define LSN_FORMAT_ARGS(lsn)
#define InvalidXLogRecPtr