@@ -118,10 +118,14 @@ pg_create_physical_replication_slot(PG_FUNCTION_ARGS)
118118 * Helper function for creating a new logical replication slot with
119119 * given arguments. Note that this function doesn't release the created
120120 * slot.
121+ *
122+ * When find_startpoint is false, the slot's confirmed_flush is not set; it's
123+ * caller's responsibility to ensure it's set to something sensible.
121124 */
122125static void
123126create_logical_replication_slot (char * name , char * plugin ,
124- bool temporary , XLogRecPtr restart_lsn )
127+ bool temporary , XLogRecPtr restart_lsn ,
128+ bool find_startpoint )
125129{
126130 LogicalDecodingContext * ctx = NULL ;
127131
@@ -139,16 +143,24 @@ create_logical_replication_slot(char *name, char *plugin,
139143 temporary ? RS_TEMPORARY : RS_EPHEMERAL );
140144
141145 /*
142- * Create logical decoding context, to build the initial snapshot.
146+ * Create logical decoding context to find start point or, if we don't
147+ * need it, to 1) bump slot's restart_lsn and xmin 2) check plugin sanity.
148+ *
149+ * Note: when !find_startpoint this is still important, because it's at
150+ * this point that the output plugin is validated.
143151 */
144152 ctx = CreateInitDecodingContext (plugin , NIL ,
145- false, /* do not build snapshot */
153+ false, /* just catalogs is OK */
146154 restart_lsn ,
147155 logical_read_local_xlog_page , NULL , NULL ,
148156 NULL );
149157
150- /* build initial snapshot, might take a while */
151- DecodingContextFindStartpoint (ctx );
158+ /*
159+ * If caller needs us to determine the decoding start point, do so now.
160+ * This might take a while.
161+ */
162+ if (find_startpoint )
163+ DecodingContextFindStartpoint (ctx );
152164
153165 /* don't need the decoding context anymore */
154166 FreeDecodingContext (ctx );
@@ -179,7 +191,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS)
179191 create_logical_replication_slot (NameStr (* name ),
180192 NameStr (* plugin ),
181193 temporary ,
182- InvalidXLogRecPtr );
194+ InvalidXLogRecPtr ,
195+ true);
183196
184197 values [0 ] = NameGetDatum (& MyReplicationSlot -> data .name );
185198 values [1 ] = LSNGetDatum (MyReplicationSlot -> data .confirmed_flush );
@@ -683,10 +696,18 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
683696
684697 /* Create new slot and acquire it */
685698 if (logical_slot )
699+ {
700+ /*
701+ * We must not try to read WAL, since we haven't reserved it yet --
702+ * hence pass find_startpoint false. confirmed_flush will be set
703+ * below, by copying from the source slot.
704+ */
686705 create_logical_replication_slot (NameStr (* dst_name ),
687706 plugin ,
688707 temporary ,
689- src_restart_lsn );
708+ src_restart_lsn ,
709+ false);
710+ }
690711 else
691712 create_physical_replication_slot (NameStr (* dst_name ),
692713 true,
@@ -703,6 +724,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
703724 TransactionId copy_xmin ;
704725 TransactionId copy_catalog_xmin ;
705726 XLogRecPtr copy_restart_lsn ;
727+ XLogRecPtr copy_confirmed_flush ;
706728 bool copy_islogical ;
707729 char * copy_name ;
708730
@@ -714,6 +736,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
714736 copy_xmin = src -> data .xmin ;
715737 copy_catalog_xmin = src -> data .catalog_xmin ;
716738 copy_restart_lsn = src -> data .restart_lsn ;
739+ copy_confirmed_flush = src -> data .confirmed_flush ;
717740
718741 /* for existence check */
719742 copy_name = pstrdup (NameStr (src -> data .name ));
@@ -738,6 +761,14 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
738761 NameStr (* src_name )),
739762 errdetail ("The source replication slot was modified incompatibly during the copy operation." )));
740763
764+ /* The source slot must have a consistent snapshot */
765+ if (src_islogical && XLogRecPtrIsInvalid (copy_confirmed_flush ))
766+ ereport (ERROR ,
767+ (errcode (ERRCODE_FEATURE_NOT_SUPPORTED ),
768+ errmsg ("cannot copy unfinished logical replication slot \"%s\"" ,
769+ NameStr (* src_name )),
770+ errhint ("Retry when the source replication slot's confirmed_flush_lsn is valid." )));
771+
741772 /* Install copied values again */
742773 SpinLockAcquire (& MyReplicationSlot -> mutex );
743774 MyReplicationSlot -> effective_xmin = copy_effective_xmin ;
@@ -746,6 +777,7 @@ copy_replication_slot(FunctionCallInfo fcinfo, bool logical_slot)
746777 MyReplicationSlot -> data .xmin = copy_xmin ;
747778 MyReplicationSlot -> data .catalog_xmin = copy_catalog_xmin ;
748779 MyReplicationSlot -> data .restart_lsn = copy_restart_lsn ;
780+ MyReplicationSlot -> data .confirmed_flush = copy_confirmed_flush ;
749781 SpinLockRelease (& MyReplicationSlot -> mutex );
750782
751783 ReplicationSlotMarkDirty ();
0 commit comments