64#include "utils/fmgroids.h"
73#define REMOTE_SEQ_COL_COUNT 10
98 bool has_pending_sequences;
109 if (!has_pending_sequences)
118 if (sequencesync_worker)
173 List *missing_seqs_idx)
178 if (!mismatched_seqs_idx && !insuffperm_seqs_idx && !missing_seqs_idx)
183 if (mismatched_seqs_idx)
187 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
188 errmsg_plural(
"mismatched or renamed sequence on subscriber (%s)",
189 "mismatched or renamed sequences on subscriber (%s)",
194 if (insuffperm_seqs_idx)
198 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
200 "insufficient privileges on sequences (%s)",
205 if (missing_seqs_idx)
209 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
211 "missing sequences on publisher (%s)",
217 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
218 errmsg(
"logical replication sequence synchronization failed for subscription \"%s\"",
237 int64 remote_increment;
250 *seqinfo = seqinfo_local =
295 elog(
ERROR,
"cache lookup failed for sequence %u",
301 if (local_seq->seqtypid != remote_typid ||
302 local_seq->seqstart != remote_start ||
303 local_seq->seqincrement != remote_increment ||
304 local_seq->seqmin != remote_min ||
305 local_seq->seqmax != remote_max ||
306 local_seq->seqcycle != remote_cycle)
310 if (strcmp(seqinfo_local->
nspname,
377 int cur_batch_base_index = 0;
379 List *mismatched_seqs_idx =
NIL;
381 List *insuffperm_seqs_idx =
NIL;
386#define MAX_SEQUENCES_SYNC_PER_BATCH 100
389 "logical replication sequence synchronization for subscription \"%s\" - total unsynchronized: %d",
392 while (cur_batch_base_index < n_seqinfos)
395 BOOLOID, LSNOID, OIDOID, INT8OID, INT8OID, INT8OID, INT8OID, BOOLOID};
397 int batch_succeeded_count = 0;
398 int batch_mismatched_count = 0;
399 int batch_skipped_count = 0;
400 int batch_insuffperm_count = 0;
401 int batch_missing_count;
409 for (
int idx = cur_batch_base_index;
idx < n_seqinfos;
idx++)
411 char *nspname_literal;
412 char *seqname_literal;
424 nspname_literal, seqname_literal,
idx);
463 "SELECT s.seqidx, ps.*, seq.seqtypid,\n"
464 " seq.seqstart, seq.seqincrement, seq.seqmin,\n"
465 " seq.seqmax, seq.seqcycle\n"
466 "FROM ( VALUES %s ) AS s (schname, seqname, seqidx)\n"
467 "JOIN pg_namespace n ON n.nspname = s.schname\n"
468 "JOIN pg_class c ON c.relnamespace = n.oid AND c.relname = s.seqname\n"
469 "JOIN pg_sequence seq ON seq.seqrelid = c.oid\n"
470 "JOIN LATERAL pg_get_sequence_data(seq.seqrelid) AS ps ON true\n",
476 errcode(ERRCODE_CONNECTION_FAILURE),
477 errmsg(
"could not fetch sequence information from the publisher: %s",
499 sequence_rel->
rd_rel->relowner);
505 "logical replication synchronization for subscription \"%s\", sequence \"%s.%s\" has finished",
508 batch_succeeded_count++;
518 mismatched_seqs_idx =
lappend_int(mismatched_seqs_idx,
521 batch_mismatched_count++;
531 insuffperm_seqs_idx =
lappend_int(insuffperm_seqs_idx,
534 batch_insuffperm_count++;
538 errmsg(
"skip synchronization of sequence \"%s.%s\" because it has been dropped concurrently",
541 batch_skipped_count++;
554 batch_missing_count = batch_size - (batch_succeeded_count +
555 batch_mismatched_count +
556 batch_insuffperm_count +
557 batch_skipped_count);
560 "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d insufficient permission, %d missing from publisher, %d skipped",
563 batch_size, batch_succeeded_count, batch_mismatched_count,
564 batch_insuffperm_count, batch_missing_count, batch_skipped_count);
569 if (batch_missing_count)
571 for (
int idx = cur_batch_base_index;
idx < cur_batch_base_index + batch_size;
idx++)
587 cur_batch_base_index += batch_size;
603 bool must_use_password;
616 Anum_pg_subscription_rel_srsubid,
621 Anum_pg_subscription_rel_srsubstate,
645 if (sequence_rel->
rd_rel->relkind != RELKIND_SEQUENCE)
697 errcode(ERRCODE_CONNECTION_FAILURE),
698 errmsg(
"sequencesync worker for subscription \"%s\" could not connect to the publisher: %s",
Datum idx(PG_FUNCTION_ARGS)
AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)
void DisableSubscriptionAndExit(void)
MemoryContext ApplyContext
void SetupApplyOrSyncWorker(int worker_slot)
WalReceiverConn * LogRepWorkerWalRcvConn
Subscription * MySubscription
void SetSequence(Oid relid, int64 next, bool iscalled)
int errmsg_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
int errcode(int sqlerrcode)
int errmsg(const char *fmt,...)
#define ereport(elevel,...)
void err(int eval, const char *fmt,...)
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
const TupleTableSlotOps TTSOpsMinimalTuple
#define palloc0_object(type)
void systable_endscan(SysScanDesc sysscan)
HeapTuple systable_getnext(SysScanDesc sysscan)
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
void ProcessConfigFile(GucContext context)
Assert(PointerIsAligned(start, uint64))
#define HeapTupleIsValid(tuple)
static void * GETSTRUCT(const HeapTupleData *tuple)
volatile sig_atomic_t ConfigReloadPending
LogicalRepWorker * logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid, bool only_running)
LogicalRepWorker * MyLogicalRepWorker
int logicalrep_sync_worker_count(Oid subid)
List * lappend(List *list, void *datum)
List * lappend_int(List *list, int datum)
char * get_namespace_name(Oid nspid)
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
void LWLockRelease(LWLock *lock)
char * pstrdup(const char *in)
void pfree(void *pointer)
#define CHECK_FOR_INTERRUPTS()
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
static int list_length(const List *l)
static void * list_nth(const List *list, int n)
#define foreach_int(var, lst)
static XLogRecPtr DatumGetLSN(Datum X)
FormData_pg_sequence * Form_pg_sequence
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool already_locked)
FormData_pg_subscription_rel * Form_pg_subscription_rel
long pgstat_report_stat(bool force)
void pgstat_report_subscription_error(Oid subid, LogicalRepWorkerType wtype)
static bool DatumGetBool(Datum X)
static int64 DatumGetInt64(Datum X)
static Oid DatumGetObjectId(Datum X)
static Datum ObjectIdGetDatum(Oid X)
static int32 DatumGetInt32(Datum X)
static Datum CharGetDatum(char X)
char * quote_literal_cstr(const char *rawstr)
#define RelationGetRelationName(relation)
#define RelationGetNamespace(relation)
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
#define REMOTE_SEQ_COL_COUNT
@ COPYSEQ_INSUFFICIENT_PERM
static CopySeqResult get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel, LogicalRepSequenceInfo **seqinfo, int *seqidx)
static void start_sequence_sync()
#define MAX_SEQUENCES_SYNC_PER_BATCH
void SequenceSyncWorkerMain(Datum main_arg)
static void LogicalRepSyncSequences(void)
static void copy_sequences(WalReceiverConn *conn)
static void get_sequences_string(List *seqindexes, StringInfo buf)
static void report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx, List *missing_seqs_idx)
void ProcessSequencesForSync(void)
static CopySeqResult copy_sequence(LogicalRepSequenceInfo *seqinfo, Oid seqowner)
#define BTEqualStrategyNumber
StringInfo makeStringInfo(void)
void resetStringInfo(StringInfo str)
void appendStringInfo(StringInfo str, const char *fmt,...)
void appendStringInfoString(StringInfo str, const char *s)
void initStringInfo(StringInfo str)
TimestampTz last_seqsync_start_time
Tuplestorestate * tuplestore
void launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, Oid relid, TimestampTz *last_start_time)
pg_noreturn void FinishSyncWorker(void)
void FetchRelationStates(bool *has_pending_subtables, bool *has_pending_subsequences, bool *started_tx)
void ReleaseSysCache(HeapTuple tuple)
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Relation try_table_open(Oid relationId, LOCKMODE lockmode)
void table_close(Relation relation, LOCKMODE lockmode)
Relation table_open(Oid relationId, LOCKMODE lockmode)
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
void SwitchToUntrustedUser(Oid userid, UserContext *context)
void RestoreUserContext(UserContext *context)
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
static void walrcv_clear_result(WalRcvExecResult *walres)
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
@ WORKERTYPE_SEQUENCESYNC
static bool am_sequencesync_worker(void)
void StartTransactionCommand(void)
void CommitTransactionCommand(void)
void AbortOutOfAnyTransaction(void)
uint64 GetSystemIdentifier(void)