PostgreSQL Source Code git master
sequencesync.c
Go to the documentation of this file.
1/*-------------------------------------------------------------------------
2 * sequencesync.c
3 * PostgreSQL logical replication: sequence synchronization
4 *
5 * Copyright (c) 2025, PostgreSQL Global Development Group
6 *
7 * IDENTIFICATION
8 * src/backend/replication/logical/sequencesync.c
9 *
10 * NOTES
11 * This file contains code for sequence synchronization for
12 * logical replication.
13 *
14 * Sequences requiring synchronization are tracked in the pg_subscription_rel
15 * catalog.
16 *
17 * Sequences to be synchronized will be added with state INIT when either of
18 * the following commands is executed:
19 * CREATE SUBSCRIPTION
20 * ALTER SUBSCRIPTION ... REFRESH PUBLICATION
21 *
22 * Executing the following command resets all sequences in the subscription to
23 * state INIT, triggering re-synchronization:
24 * ALTER SUBSCRIPTION ... REFRESH SEQUENCES
25 *
26 * The apply worker periodically scans pg_subscription_rel for sequences in
27 * INIT state. When such sequences are found, it spawns a sequencesync worker
28 * to handle synchronization.
29 *
30 * A single sequencesync worker is responsible for synchronizing all sequences.
31 * It begins by retrieving the list of sequences that are flagged for
32 * synchronization, i.e., those in the INIT state. These sequences are then
33 * processed in batches, allowing multiple entries to be synchronized within a
34 * single transaction. The worker fetches the current sequence values and page
35 * LSNs from the remote publisher, updates the corresponding sequences on the
36 * local subscriber, and finally marks each sequence as READY upon successful
37 * synchronization.
38 *
39 * Sequence state transitions follow this pattern:
40 * INIT -> READY
41 *
42 * To avoid creating too many transactions, up to MAX_SEQUENCES_SYNC_PER_BATCH
43 * sequences are synchronized per transaction. The locks on the sequence
44 * relation will be periodically released at each transaction commit.
45 *
46 * XXX: We didn't choose launcher process to maintain the launch of sequencesync
47 * worker as it didn't have database connection to access the sequences from the
48 * pg_subscription_rel system catalog that need to be synchronized.
49 *-------------------------------------------------------------------------
50 */
51
52#include "postgres.h"
53
54#include "access/table.h"
55#include "catalog/pg_sequence.h"
57#include "commands/sequence.h"
58#include "pgstat.h"
62#include "utils/acl.h"
63#include "utils/builtins.h"
64#include "utils/fmgroids.h"
65#include "utils/guc.h"
66#include "utils/inval.h"
67#include "utils/lsyscache.h"
68#include "utils/memutils.h"
69#include "utils/pg_lsn.h"
70#include "utils/syscache.h"
71#include "utils/usercontext.h"
72
73#define REMOTE_SEQ_COL_COUNT 10
74
75typedef enum CopySeqResult
76{
82
83static List *seqinfos = NIL;
84
85/*
86 * Apply worker determines if sequence synchronization is needed.
87 *
88 * Start a sequencesync worker if one is not already running. The active
89 * sequencesync worker will handle all pending sequence synchronization. If any
90 * sequences remain unsynchronized after it exits, a new worker can be started
91 * in the next iteration.
92 */
93void
95{
96 LogicalRepWorker *sequencesync_worker;
97 int nsyncworkers;
98 bool has_pending_sequences;
99 bool started_tx;
100
101 FetchRelationStates(NULL, &has_pending_sequences, &started_tx);
102
103 if (started_tx)
104 {
106 pgstat_report_stat(true);
107 }
108
109 if (!has_pending_sequences)
110 return;
111
112 LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
113
114 /* Check if there is a sequencesync worker already running? */
117 InvalidOid, true);
118 if (sequencesync_worker)
119 {
120 LWLockRelease(LogicalRepWorkerLock);
121 return;
122 }
123
124 /*
125 * Count running sync workers for this subscription, while we have the
126 * lock.
127 */
129 LWLockRelease(LogicalRepWorkerLock);
130
131 /*
132 * It is okay to read/update last_seqsync_start_time here in apply worker
133 * as we have already ensured that sync worker doesn't exist.
134 */
137}
138
139/*
140 * get_sequences_string
141 *
142 * Build a comma-separated string of schema-qualified sequence names
143 * for the given list of sequence indexes.
144 */
145static void
147{
149 foreach_int(seqidx, seqindexes)
150 {
151 LogicalRepSequenceInfo *seqinfo =
153
154 if (buf->len > 0)
156
157 appendStringInfo(buf, "\"%s.%s\"", seqinfo->nspname, seqinfo->seqname);
158 }
159}
160
161/*
162 * report_sequence_errors
163 *
164 * Report discrepancies found during sequence synchronization between
165 * the publisher and subscriber. Emits warnings for:
166 * a) mismatched definitions or concurrent rename
167 * b) insufficient privileges
168 * c) missing sequences on the subscriber
169 * Then raises an ERROR to indicate synchronization failure.
170 */
171static void
172report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx,
173 List *missing_seqs_idx)
174{
175 StringInfo seqstr;
176
177 /* Quick exit if there are no errors to report */
178 if (!mismatched_seqs_idx && !insuffperm_seqs_idx && !missing_seqs_idx)
179 return;
180
181 seqstr = makeStringInfo();
182
183 if (mismatched_seqs_idx)
184 {
185 get_sequences_string(mismatched_seqs_idx, seqstr);
187 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
188 errmsg_plural("mismatched or renamed sequence on subscriber (%s)",
189 "mismatched or renamed sequences on subscriber (%s)",
190 list_length(mismatched_seqs_idx),
191 seqstr->data));
192 }
193
194 if (insuffperm_seqs_idx)
195 {
196 get_sequences_string(insuffperm_seqs_idx, seqstr);
198 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
199 errmsg_plural("insufficient privileges on sequence (%s)",
200 "insufficient privileges on sequences (%s)",
201 list_length(insuffperm_seqs_idx),
202 seqstr->data));
203 }
204
205 if (missing_seqs_idx)
206 {
207 get_sequences_string(missing_seqs_idx, seqstr);
209 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
210 errmsg_plural("missing sequence on publisher (%s)",
211 "missing sequences on publisher (%s)",
212 list_length(missing_seqs_idx),
213 seqstr->data));
214 }
215
217 errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
218 errmsg("logical replication sequence synchronization failed for subscription \"%s\"",
220}
221
222/*
223 * get_and_validate_seq_info
224 *
225 * Extracts remote sequence information from the tuple slot received from the
226 * publisher, and validates it against the corresponding local sequence
227 * definition.
228 */
229static CopySeqResult
231 LogicalRepSequenceInfo **seqinfo, int *seqidx)
232{
233 bool isnull;
234 int col = 0;
235 Oid remote_typid;
236 int64 remote_start;
237 int64 remote_increment;
238 int64 remote_min;
239 int64 remote_max;
240 bool remote_cycle;
242 HeapTuple tup;
243 Form_pg_sequence local_seq;
244 LogicalRepSequenceInfo *seqinfo_local;
245
246 *seqidx = DatumGetInt32(slot_getattr(slot, ++col, &isnull));
247 Assert(!isnull);
248
249 /* Identify the corresponding local sequence for the given index. */
250 *seqinfo = seqinfo_local =
252
253 seqinfo_local->last_value = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
254 Assert(!isnull);
255
256 seqinfo_local->is_called = DatumGetBool(slot_getattr(slot, ++col, &isnull));
257 Assert(!isnull);
258
259 seqinfo_local->page_lsn = DatumGetLSN(slot_getattr(slot, ++col, &isnull));
260 Assert(!isnull);
261
262 remote_typid = DatumGetObjectId(slot_getattr(slot, ++col, &isnull));
263 Assert(!isnull);
264
265 remote_start = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
266 Assert(!isnull);
267
268 remote_increment = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
269 Assert(!isnull);
270
271 remote_min = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
272 Assert(!isnull);
273
274 remote_max = DatumGetInt64(slot_getattr(slot, ++col, &isnull));
275 Assert(!isnull);
276
277 remote_cycle = DatumGetBool(slot_getattr(slot, ++col, &isnull));
278 Assert(!isnull);
279
280 /* Sanity check */
282
283 seqinfo_local->found_on_pub = true;
284
285 *sequence_rel = try_table_open(seqinfo_local->localrelid, RowExclusiveLock);
286
287 /* Sequence was concurrently dropped? */
288 if (!*sequence_rel)
289 return COPYSEQ_SKIPPED;
290
291 tup = SearchSysCache1(SEQRELID, ObjectIdGetDatum(seqinfo_local->localrelid));
292
293 /* Sequence was concurrently dropped? */
294 if (!HeapTupleIsValid(tup))
295 elog(ERROR, "cache lookup failed for sequence %u",
296 seqinfo_local->localrelid);
297
298 local_seq = (Form_pg_sequence) GETSTRUCT(tup);
299
300 /* Sequence parameters for remote/local are the same? */
301 if (local_seq->seqtypid != remote_typid ||
302 local_seq->seqstart != remote_start ||
303 local_seq->seqincrement != remote_increment ||
304 local_seq->seqmin != remote_min ||
305 local_seq->seqmax != remote_max ||
306 local_seq->seqcycle != remote_cycle)
307 result = COPYSEQ_MISMATCH;
308
309 /* Sequence was concurrently renamed? */
310 if (strcmp(seqinfo_local->nspname,
311 get_namespace_name(RelationGetNamespace(*sequence_rel))) ||
312 strcmp(seqinfo_local->seqname, RelationGetRelationName(*sequence_rel)))
313 result = COPYSEQ_MISMATCH;
314
315 ReleaseSysCache(tup);
316 return result;
317}
318
319/*
320 * Apply remote sequence state to local sequence and mark it as
321 * synchronized (READY).
322 */
323static CopySeqResult
325{
326 UserContext ucxt;
327 AclResult aclresult;
328 bool run_as_owner = MySubscription->runasowner;
329 Oid seqoid = seqinfo->localrelid;
330
331 /*
332 * If the user did not opt to run as the owner of the subscription
333 * ('run_as_owner'), then copy the sequence as the owner of the sequence.
334 */
335 if (!run_as_owner)
336 SwitchToUntrustedUser(seqowner, &ucxt);
337
338 aclresult = pg_class_aclcheck(seqoid, GetUserId(), ACL_UPDATE);
339
340 if (aclresult != ACLCHECK_OK)
341 {
342 if (!run_as_owner)
343 RestoreUserContext(&ucxt);
344
346 }
347
348 /*
349 * The log counter (log_cnt) tracks how many sequence values are still
350 * unused locally. It is only relevant to the local node and managed
351 * internally by nextval() when allocating new ranges. Since log_cnt does
352 * not affect the visible sequence state (like last_value or is_called)
353 * and is only used for local caching, it need not be copied to the
354 * subscriber during synchronization.
355 */
356 SetSequence(seqoid, seqinfo->last_value, seqinfo->is_called);
357
358 if (!run_as_owner)
359 RestoreUserContext(&ucxt);
360
361 /*
362 * Record the remote sequence's LSN in pg_subscription_rel and mark the
363 * sequence as READY.
364 */
365 UpdateSubscriptionRelState(MySubscription->oid, seqoid, SUBREL_STATE_READY,
366 seqinfo->page_lsn, false);
367
368 return COPYSEQ_SUCCESS;
369}
370
371/*
372 * Copy existing data of sequences from the publisher.
373 */
374static void
376{
377 int cur_batch_base_index = 0;
378 int n_seqinfos = list_length(seqinfos);
379 List *mismatched_seqs_idx = NIL;
380 List *missing_seqs_idx = NIL;
381 List *insuffperm_seqs_idx = NIL;
382 StringInfo seqstr = makeStringInfo();
384 MemoryContext oldctx;
385
386#define MAX_SEQUENCES_SYNC_PER_BATCH 100
387
388 elog(DEBUG1,
389 "logical replication sequence synchronization for subscription \"%s\" - total unsynchronized: %d",
390 MySubscription->name, n_seqinfos);
391
392 while (cur_batch_base_index < n_seqinfos)
393 {
394 Oid seqRow[REMOTE_SEQ_COL_COUNT] = {INT8OID, INT8OID,
395 BOOLOID, LSNOID, OIDOID, INT8OID, INT8OID, INT8OID, INT8OID, BOOLOID};
396 int batch_size = 0;
397 int batch_succeeded_count = 0;
398 int batch_mismatched_count = 0;
399 int batch_skipped_count = 0;
400 int batch_insuffperm_count = 0;
401 int batch_missing_count;
402 Relation sequence_rel;
403
404 WalRcvExecResult *res;
405 TupleTableSlot *slot;
406
408
409 for (int idx = cur_batch_base_index; idx < n_seqinfos; idx++)
410 {
411 char *nspname_literal;
412 char *seqname_literal;
413
414 LogicalRepSequenceInfo *seqinfo =
416
417 if (seqstr->len > 0)
418 appendStringInfoString(seqstr, ", ");
419
420 nspname_literal = quote_literal_cstr(seqinfo->nspname);
421 seqname_literal = quote_literal_cstr(seqinfo->seqname);
422
423 appendStringInfo(seqstr, "(%s, %s, %d)",
424 nspname_literal, seqname_literal, idx);
425
426 if (++batch_size == MAX_SEQUENCES_SYNC_PER_BATCH)
427 break;
428 }
429
430 /*
431 * We deliberately avoid acquiring a local lock on the sequence before
432 * querying the publisher to prevent potential distributed deadlocks
433 * in bi-directional replication setups.
434 *
435 * Example scenario:
436 *
437 * - On each node, a background worker acquires a lock on a sequence
438 * as part of a sync operation.
439 *
440 * - Concurrently, a user transaction attempts to alter the same
441 * sequence, waiting on the background worker's lock.
442 *
443 * - Meanwhile, a query from the other node tries to access metadata
444 * that depends on the completion of the alter operation.
445 *
446 * - This creates a circular wait across nodes:
447 *
448 * Node-1: Query -> waits on Alter -> waits on Sync Worker
449 *
450 * Node-2: Query -> waits on Alter -> waits on Sync Worker
451 *
452 * Since each node only sees part of the wait graph, the deadlock may
453 * go undetected, leading to indefinite blocking.
454 *
455 * Note: Each entry in VALUES includes an index 'seqidx' that
456 * represents the sequence's position in the local 'seqinfos' list.
457 * This index is propagated to the query results and later used to
458 * directly map the fetched publisher sequence rows back to their
459 * corresponding local entries without relying on result order or name
460 * matching.
461 */
463 "SELECT s.seqidx, ps.*, seq.seqtypid,\n"
464 " seq.seqstart, seq.seqincrement, seq.seqmin,\n"
465 " seq.seqmax, seq.seqcycle\n"
466 "FROM ( VALUES %s ) AS s (schname, seqname, seqidx)\n"
467 "JOIN pg_namespace n ON n.nspname = s.schname\n"
468 "JOIN pg_class c ON c.relnamespace = n.oid AND c.relname = s.seqname\n"
469 "JOIN pg_sequence seq ON seq.seqrelid = c.oid\n"
470 "JOIN LATERAL pg_get_sequence_data(seq.seqrelid) AS ps ON true\n",
471 seqstr->data);
472
473 res = walrcv_exec(conn, cmd->data, lengthof(seqRow), seqRow);
474 if (res->status != WALRCV_OK_TUPLES)
476 errcode(ERRCODE_CONNECTION_FAILURE),
477 errmsg("could not fetch sequence information from the publisher: %s",
478 res->err));
479
481 while (tuplestore_gettupleslot(res->tuplestore, true, false, slot))
482 {
483 CopySeqResult sync_status;
484 LogicalRepSequenceInfo *seqinfo;
485 int seqidx;
486
488
490 {
491 ConfigReloadPending = false;
493 }
494
495 sync_status = get_and_validate_seq_info(slot, &sequence_rel,
496 &seqinfo, &seqidx);
497 if (sync_status == COPYSEQ_SUCCESS)
498 sync_status = copy_sequence(seqinfo,
499 sequence_rel->rd_rel->relowner);
500
501 switch (sync_status)
502 {
503 case COPYSEQ_SUCCESS:
504 elog(DEBUG1,
505 "logical replication synchronization for subscription \"%s\", sequence \"%s.%s\" has finished",
506 MySubscription->name, seqinfo->nspname,
507 seqinfo->seqname);
508 batch_succeeded_count++;
509 break;
510 case COPYSEQ_MISMATCH:
511
512 /*
513 * Remember mismatched sequences in a long-lived memory
514 * context since these will be used after the transaction
515 * is committed.
516 */
518 mismatched_seqs_idx = lappend_int(mismatched_seqs_idx,
519 seqidx);
520 MemoryContextSwitchTo(oldctx);
521 batch_mismatched_count++;
522 break;
524
525 /*
526 * Remember sequences with insufficient privileges in a
527 * long-lived memory context since these will be used
528 * after the transaction is committed.
529 */
531 insuffperm_seqs_idx = lappend_int(insuffperm_seqs_idx,
532 seqidx);
533 MemoryContextSwitchTo(oldctx);
534 batch_insuffperm_count++;
535 break;
536 case COPYSEQ_SKIPPED:
537 ereport(LOG,
538 errmsg("skip synchronization of sequence \"%s.%s\" because it has been dropped concurrently",
539 seqinfo->nspname,
540 seqinfo->seqname));
541 batch_skipped_count++;
542 break;
543 }
544
545 if (sequence_rel)
546 table_close(sequence_rel, NoLock);
547 }
548
551 resetStringInfo(seqstr);
552 resetStringInfo(cmd);
553
554 batch_missing_count = batch_size - (batch_succeeded_count +
555 batch_mismatched_count +
556 batch_insuffperm_count +
557 batch_skipped_count);
558
559 elog(DEBUG1,
560 "logical replication sequence synchronization for subscription \"%s\" - batch #%d = %d attempted, %d succeeded, %d mismatched, %d insufficient permission, %d missing from publisher, %d skipped",
562 (cur_batch_base_index / MAX_SEQUENCES_SYNC_PER_BATCH) + 1,
563 batch_size, batch_succeeded_count, batch_mismatched_count,
564 batch_insuffperm_count, batch_missing_count, batch_skipped_count);
565
566 /* Commit this batch, and prepare for next batch */
568
569 if (batch_missing_count)
570 {
571 for (int idx = cur_batch_base_index; idx < cur_batch_base_index + batch_size; idx++)
572 {
573 LogicalRepSequenceInfo *seqinfo =
575
576 /* If the sequence was not found on publisher, record it */
577 if (!seqinfo->found_on_pub)
578 missing_seqs_idx = lappend_int(missing_seqs_idx, idx);
579 }
580 }
581
582 /*
583 * cur_batch_base_index is not incremented sequentially because some
584 * sequences may be missing, and the number of fetched rows may not
585 * match the batch size.
586 */
587 cur_batch_base_index += batch_size;
588 }
589
590 /* Report mismatches, permission issues, or missing sequences */
591 report_sequence_errors(mismatched_seqs_idx, insuffperm_seqs_idx,
592 missing_seqs_idx);
593}
594
595/*
596 * Identifies sequences that require synchronization and initiates the
597 * synchronization process.
598 */
599static void
601{
602 char *err;
603 bool must_use_password;
604 Relation rel;
605 HeapTuple tup;
606 ScanKeyData skey[2];
607 SysScanDesc scan;
609 StringInfoData app_name;
610
612
613 rel = table_open(SubscriptionRelRelationId, AccessShareLock);
614
615 ScanKeyInit(&skey[0],
616 Anum_pg_subscription_rel_srsubid,
617 BTEqualStrategyNumber, F_OIDEQ,
618 ObjectIdGetDatum(subid));
619
620 ScanKeyInit(&skey[1],
621 Anum_pg_subscription_rel_srsubstate,
622 BTEqualStrategyNumber, F_CHAREQ,
623 CharGetDatum(SUBREL_STATE_INIT));
624
625 scan = systable_beginscan(rel, InvalidOid, false,
626 NULL, 2, skey);
627 while (HeapTupleIsValid(tup = systable_getnext(scan)))
628 {
631 Relation sequence_rel;
632 MemoryContext oldctx;
633
635
636 subrel = (Form_pg_subscription_rel) GETSTRUCT(tup);
637
638 sequence_rel = try_table_open(subrel->srrelid, RowExclusiveLock);
639
640 /* Skip if sequence was dropped concurrently */
641 if (!sequence_rel)
642 continue;
643
644 /* Skip if the relation is not a sequence */
645 if (sequence_rel->rd_rel->relkind != RELKIND_SEQUENCE)
646 {
647 table_close(sequence_rel, NoLock);
648 continue;
649 }
650
651 /*
652 * Worker needs to process sequences across transaction boundary, so
653 * allocate them under long-lived context.
654 */
656
658 seq->localrelid = subrel->srrelid;
659 seq->nspname = get_namespace_name(RelationGetNamespace(sequence_rel));
660 seq->seqname = pstrdup(RelationGetRelationName(sequence_rel));
661 seqinfos = lappend(seqinfos, seq);
662
663 MemoryContextSwitchTo(oldctx);
664
665 table_close(sequence_rel, NoLock);
666 }
667
668 /* Cleanup */
669 systable_endscan(scan);
671
673
674 /*
675 * Exit early if no catalog entries found, likely due to concurrent drops.
676 */
677 if (!seqinfos)
678 return;
679
680 /* Is the use of a password mandatory? */
681 must_use_password = MySubscription->passwordrequired &&
683
684 initStringInfo(&app_name);
685 appendStringInfo(&app_name, "pg_%u_sequence_sync_" UINT64_FORMAT,
687
688 /*
689 * Establish the connection to the publisher for sequence synchronization.
690 */
693 must_use_password,
694 app_name.data, &err);
695 if (LogRepWorkerWalRcvConn == NULL)
697 errcode(ERRCODE_CONNECTION_FAILURE),
698 errmsg("sequencesync worker for subscription \"%s\" could not connect to the publisher: %s",
700
701 pfree(app_name.data);
702
704}
705
706/*
707 * Execute the initial sync with error handling. Disable the subscription,
708 * if required.
709 *
710 * Note that we don't handle FATAL errors which are probably because of system
711 * resource error and are not repeatable.
712 */
713static void
715{
717
718 PG_TRY();
719 {
720 /* Call initial sync. */
722 }
723 PG_CATCH();
724 {
727 else
728 {
729 /*
730 * Report the worker failed during sequence synchronization. Abort
731 * the current transaction so that the stats message is sent in an
732 * idle state.
733 */
737
738 PG_RE_THROW();
739 }
740 }
741 PG_END_TRY();
742}
743
744/* Logical Replication sequencesync worker entry point */
745void
747{
748 int worker_slot = DatumGetInt32(main_arg);
749
750 SetupApplyOrSyncWorker(worker_slot);
751
753
755}
Datum idx(PG_FUNCTION_ARGS)
Definition: _int_op.c:262
AclResult
Definition: acl.h:182
@ ACLCHECK_OK
Definition: acl.h:183
AclResult pg_class_aclcheck(Oid table_oid, Oid roleid, AclMode mode)
Definition: aclchk.c:4037
void DisableSubscriptionAndExit(void)
Definition: worker.c:5943
MemoryContext ApplyContext
Definition: worker.c:472
void SetupApplyOrSyncWorker(int worker_slot)
Definition: worker.c:5869
WalReceiverConn * LogRepWorkerWalRcvConn
Definition: worker.c:477
Subscription * MySubscription
Definition: worker.c:479
int64_t int64
Definition: c.h:540
#define UINT64_FORMAT
Definition: c.h:562
#define lengthof(array)
Definition: c.h:792
void SetSequence(Oid relid, int64 next, bool iscalled)
Definition: sequence.c:957
int errmsg_plural(const char *fmt_singular, const char *fmt_plural, unsigned long n,...)
Definition: elog.c:1193
int errcode(int sqlerrcode)
Definition: elog.c:863
int errmsg(const char *fmt,...)
Definition: elog.c:1080
#define LOG
Definition: elog.h:31
#define PG_RE_THROW()
Definition: elog.h:405
#define PG_TRY(...)
Definition: elog.h:372
#define WARNING
Definition: elog.h:36
#define PG_END_TRY(...)
Definition: elog.h:397
#define DEBUG1
Definition: elog.h:30
#define ERROR
Definition: elog.h:39
#define PG_CATCH(...)
Definition: elog.h:382
#define elog(elevel,...)
Definition: elog.h:226
#define ereport(elevel,...)
Definition: elog.h:150
void err(int eval, const char *fmt,...)
Definition: err.c:43
TupleTableSlot * MakeSingleTupleTableSlot(TupleDesc tupdesc, const TupleTableSlotOps *tts_ops)
Definition: execTuples.c:1427
void ExecDropSingleTupleTableSlot(TupleTableSlot *slot)
Definition: execTuples.c:1443
const TupleTableSlotOps TTSOpsMinimalTuple
Definition: execTuples.c:86
#define palloc0_object(type)
Definition: fe_memutils.h:75
void systable_endscan(SysScanDesc sysscan)
Definition: genam.c:603
HeapTuple systable_getnext(SysScanDesc sysscan)
Definition: genam.c:514
SysScanDesc systable_beginscan(Relation heapRelation, Oid indexId, bool indexOK, Snapshot snapshot, int nkeys, ScanKey key)
Definition: genam.c:388
void ProcessConfigFile(GucContext context)
Definition: guc-file.l:120
@ PGC_SIGHUP
Definition: guc.h:75
Assert(PointerIsAligned(start, uint64))
#define HeapTupleIsValid(tuple)
Definition: htup.h:78
static void * GETSTRUCT(const HeapTupleData *tuple)
Definition: htup_details.h:728
volatile sig_atomic_t ConfigReloadPending
Definition: interrupt.c:27
LogicalRepWorker * logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid, bool only_running)
Definition: launcher.c:258
LogicalRepWorker * MyLogicalRepWorker
Definition: launcher.c:56
int logicalrep_sync_worker_count(Oid subid)
Definition: launcher.c:927
List * lappend(List *list, void *datum)
Definition: list.c:339
List * lappend_int(List *list, int datum)
Definition: list.c:357
#define NoLock
Definition: lockdefs.h:34
#define AccessShareLock
Definition: lockdefs.h:36
#define RowExclusiveLock
Definition: lockdefs.h:38
char * get_namespace_name(Oid nspid)
Definition: lsyscache.c:3533
bool LWLockAcquire(LWLock *lock, LWLockMode mode)
Definition: lwlock.c:1174
void LWLockRelease(LWLock *lock)
Definition: lwlock.c:1894
@ LW_SHARED
Definition: lwlock.h:113
char * pstrdup(const char *in)
Definition: mcxt.c:1759
void pfree(void *pointer)
Definition: mcxt.c:1594
#define CHECK_FOR_INTERRUPTS()
Definition: miscadmin.h:123
Oid GetUserId(void)
Definition: miscinit.c:469
static MemoryContext MemoryContextSwitchTo(MemoryContext context)
Definition: palloc.h:124
#define ACL_UPDATE
Definition: parsenodes.h:78
static int list_length(const List *l)
Definition: pg_list.h:152
#define NIL
Definition: pg_list.h:68
static void * list_nth(const List *list, int n)
Definition: pg_list.h:299
#define foreach_int(var, lst)
Definition: pg_list.h:470
static XLogRecPtr DatumGetLSN(Datum X)
Definition: pg_lsn.h:25
FormData_pg_sequence * Form_pg_sequence
Definition: pg_sequence.h:40
void UpdateSubscriptionRelState(Oid subid, Oid relid, char state, XLogRecPtr sublsn, bool already_locked)
FormData_pg_subscription_rel * Form_pg_subscription_rel
static char * buf
Definition: pg_test_fsync.c:72
long pgstat_report_stat(bool force)
Definition: pgstat.c:694
void pgstat_report_subscription_error(Oid subid, LogicalRepWorkerType wtype)
static bool DatumGetBool(Datum X)
Definition: postgres.h:100
static int64 DatumGetInt64(Datum X)
Definition: postgres.h:393
static Oid DatumGetObjectId(Datum X)
Definition: postgres.h:252
static Datum ObjectIdGetDatum(Oid X)
Definition: postgres.h:262
uint64_t Datum
Definition: postgres.h:70
static int32 DatumGetInt32(Datum X)
Definition: postgres.h:212
static Datum CharGetDatum(char X)
Definition: postgres.h:132
#define InvalidOid
Definition: postgres_ext.h:37
unsigned int Oid
Definition: postgres_ext.h:32
char * quote_literal_cstr(const char *rawstr)
Definition: quote.c:103
#define RelationGetRelationName(relation)
Definition: rel.h:549
#define RelationGetNamespace(relation)
Definition: rel.h:556
void ScanKeyInit(ScanKey entry, AttrNumber attributeNumber, StrategyNumber strategy, RegProcedure procedure, Datum argument)
Definition: scankey.c:76
#define REMOTE_SEQ_COL_COUNT
Definition: sequencesync.c:73
CopySeqResult
Definition: sequencesync.c:76
@ COPYSEQ_INSUFFICIENT_PERM
Definition: sequencesync.c:79
@ COPYSEQ_MISMATCH
Definition: sequencesync.c:78
@ COPYSEQ_SUCCESS
Definition: sequencesync.c:77
@ COPYSEQ_SKIPPED
Definition: sequencesync.c:80
static CopySeqResult get_and_validate_seq_info(TupleTableSlot *slot, Relation *sequence_rel, LogicalRepSequenceInfo **seqinfo, int *seqidx)
Definition: sequencesync.c:230
static void start_sequence_sync()
Definition: sequencesync.c:714
static List * seqinfos
Definition: sequencesync.c:83
#define MAX_SEQUENCES_SYNC_PER_BATCH
void SequenceSyncWorkerMain(Datum main_arg)
Definition: sequencesync.c:746
static void LogicalRepSyncSequences(void)
Definition: sequencesync.c:600
static void copy_sequences(WalReceiverConn *conn)
Definition: sequencesync.c:375
static void get_sequences_string(List *seqindexes, StringInfo buf)
Definition: sequencesync.c:146
static void report_sequence_errors(List *mismatched_seqs_idx, List *insuffperm_seqs_idx, List *missing_seqs_idx)
Definition: sequencesync.c:172
void ProcessSequencesForSync(void)
Definition: sequencesync.c:94
static CopySeqResult copy_sequence(LogicalRepSequenceInfo *seqinfo, Oid seqowner)
Definition: sequencesync.c:324
#define BTEqualStrategyNumber
Definition: stratnum.h:31
PGconn * conn
Definition: streamutil.c:52
StringInfo makeStringInfo(void)
Definition: stringinfo.c:72
void resetStringInfo(StringInfo str)
Definition: stringinfo.c:126
void appendStringInfo(StringInfo str, const char *fmt,...)
Definition: stringinfo.c:145
void appendStringInfoString(StringInfo str, const char *s)
Definition: stringinfo.c:230
void initStringInfo(StringInfo str)
Definition: stringinfo.c:97
Definition: pg_list.h:54
TimestampTz last_seqsync_start_time
Form_pg_class rd_rel
Definition: rel.h:111
Tuplestorestate * tuplestore
Definition: walreceiver.h:223
TupleDesc tupledesc
Definition: walreceiver.h:224
WalRcvExecStatus status
Definition: walreceiver.h:220
void launch_sync_worker(LogicalRepWorkerType wtype, int nsyncworkers, Oid relid, TimestampTz *last_start_time)
Definition: syncutils.c:117
pg_noreturn void FinishSyncWorker(void)
Definition: syncutils.c:50
void FetchRelationStates(bool *has_pending_subtables, bool *has_pending_subsequences, bool *started_tx)
Definition: syncutils.c:202
void ReleaseSysCache(HeapTuple tuple)
Definition: syscache.c:264
HeapTuple SearchSysCache1(int cacheId, Datum key1)
Definition: syscache.c:220
Relation try_table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:60
void table_close(Relation relation, LOCKMODE lockmode)
Definition: table.c:126
Relation table_open(Oid relationId, LOCKMODE lockmode)
Definition: table.c:40
bool tuplestore_gettupleslot(Tuplestorestate *state, bool forward, bool copy, TupleTableSlot *slot)
Definition: tuplestore.c:1130
static Datum slot_getattr(TupleTableSlot *slot, int attnum, bool *isnull)
Definition: tuptable.h:398
void SwitchToUntrustedUser(Oid userid, UserContext *context)
Definition: usercontext.c:33
void RestoreUserContext(UserContext *context)
Definition: usercontext.c:87
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
Definition: walreceiver.h:435
@ WALRCV_OK_TUPLES
Definition: walreceiver.h:207
static void walrcv_clear_result(WalRcvExecResult *walres)
Definition: walreceiver.h:471
#define walrcv_exec(conn, exec, nRetTypes, retTypes)
Definition: walreceiver.h:465
@ WORKERTYPE_SEQUENCESYNC
static bool am_sequencesync_worker(void)
void StartTransactionCommand(void)
Definition: xact.c:3077
void CommitTransactionCommand(void)
Definition: xact.c:3175
void AbortOutOfAnyTransaction(void)
Definition: xact.c:4880
uint64 GetSystemIdentifier(void)
Definition: xlog.c:4609