1212 */
1313#include "postgres.h"
1414
15+ #include "pglogical_output/compat.h"
1516#include "pglogical_config.h"
1617#include "pglogical_output.h"
1718#include "pglogical_proto.h"
1819#include "pglogical_hooks.h"
20+ #include "pglogical_relmetacache.h"
1921
2022#include "access/hash.h"
2123#include "access/sysattr.h"
3335
3436#include "replication/output_plugin.h"
3537#include "replication/logical.h"
38+ #ifdef HAVE_REPLICATION_ORIGINS
3639#include "replication/origin.h"
40+ #endif
3741
3842#include "utils/builtins.h"
3943#include "utils/catcache.h"
4751#include "utils/syscache.h"
4852#include "utils/typcache.h"
4953
54+ PG_MODULE_MAGIC ;
55+
5056extern void _PG_output_plugin_init (OutputPluginCallbacks * cb );
5157
5258/* These must be available to pg_dlsym() */
@@ -61,8 +67,10 @@ static void pg_decode_change(LogicalDecodingContext *ctx,
6167 ReorderBufferTXN * txn , Relation rel ,
6268 ReorderBufferChange * change );
6369
70+ #ifdef HAVE_REPLICATION_ORIGINS
6471static bool pg_decode_origin_filter (LogicalDecodingContext * ctx ,
6572 RepOriginId origin_id );
73+ #endif
6674
6775static void send_startup_message (LogicalDecodingContext * ctx ,
6876 PGLogicalOutputData * data , bool last_message );
@@ -79,7 +87,9 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
7987 cb -> begin_cb = pg_decode_begin_txn ;
8088 cb -> change_cb = pg_decode_change ;
8189 cb -> commit_cb = pg_decode_commit_txn ;
90+ #ifdef HAVE_REPLICATION_ORIGINS
8291 cb -> filter_by_origin_cb = pg_decode_origin_filter ;
92+ #endif
8393 cb -> shutdown_cb = pg_decode_shutdown ;
8494}
8595
@@ -99,42 +109,42 @@ check_binary_compatibility(PGLogicalOutputData *data)
99109 if (data -> client_binary_sizeofdatum != 0
100110 && data -> client_binary_sizeofdatum != sizeof (Datum ))
101111 {
102- elog (DEBUG1 , "Binary mode rejected: Server and client endian sizeof(Datum) mismatch" );
112+ elog (DEBUG1 , "Binary mode rejected: Server and client sizeof(Datum) mismatch" );
103113 return false;
104114 }
105115
106116 if (data -> client_binary_sizeofint != 0
107117 && data -> client_binary_sizeofint != sizeof (int ))
108118 {
109- elog (DEBUG1 , "Binary mode rejected: Server and client endian sizeof(int) mismatch" );
119+ elog (DEBUG1 , "Binary mode rejected: Server and client sizeof(int) mismatch" );
110120 return false;
111121 }
112122
113123 if (data -> client_binary_sizeoflong != 0
114124 && data -> client_binary_sizeoflong != sizeof (long ))
115125 {
116- elog (DEBUG1 , "Binary mode rejected: Server and client endian sizeof(long) mismatch" );
126+ elog (DEBUG1 , "Binary mode rejected: Server and client sizeof(long) mismatch" );
117127 return false;
118128 }
119129
120130 if (data -> client_binary_float4byval_set
121131 && data -> client_binary_float4byval != server_float4_byval ())
122132 {
123- elog (DEBUG1 , "Binary mode rejected: Server and client endian float4byval mismatch" );
133+ elog (DEBUG1 , "Binary mode rejected: Server and client float4byval mismatch" );
124134 return false;
125135 }
126136
127137 if (data -> client_binary_float8byval_set
128138 && data -> client_binary_float8byval != server_float8_byval ())
129139 {
130- elog (DEBUG1 , "Binary mode rejected: Server and client endian float8byval mismatch" );
140+ elog (DEBUG1 , "Binary mode rejected: Server and client float8byval mismatch" );
131141 return false;
132142 }
133143
134144 if (data -> client_binary_intdatetimes_set
135145 && data -> client_binary_intdatetimes != server_integer_datetimes ())
136146 {
137- elog (DEBUG1 , "Binary mode rejected: Server and client endian integer datetimes mismatch" );
147+ elog (DEBUG1 , "Binary mode rejected: Server and client integer datetimes mismatch" );
138148 return false;
139149 }
140150
@@ -148,7 +158,7 @@ pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt,
148158{
149159 PGLogicalOutputData * data = palloc0 (sizeof (PGLogicalOutputData ));
150160
151- data -> context = AllocSetContextCreate (TopMemoryContext ,
161+ data -> context = AllocSetContextCreate (ctx -> context ,
152162 "pglogical conversion context" ,
153163 ALLOCSET_DEFAULT_MINSIZE ,
154164 ALLOCSET_DEFAULT_INITSIZE ,
@@ -202,17 +212,17 @@ pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt,
202212 errmsg ("client sent startup parameters in format %d but we only support format 1" ,
203213 params_format )));
204214
205- if (data -> client_min_proto_version > PG_LOGICAL_PROTO_VERSION_NUM )
215+ if (data -> client_min_proto_version > PGLOGICAL_PROTO_VERSION_NUM )
206216 ereport (ERROR ,
207217 (errcode (ERRCODE_FEATURE_NOT_SUPPORTED ),
208218 errmsg ("client sent min_proto_version=%d but we only support protocol %d or lower" ,
209- data -> client_min_proto_version , PG_LOGICAL_PROTO_VERSION_NUM )));
219+ data -> client_min_proto_version , PGLOGICAL_PROTO_VERSION_NUM )));
210220
211- if (data -> client_max_proto_version < PG_LOGICAL_PROTO_MIN_VERSION_NUM )
221+ if (data -> client_max_proto_version < PGLOGICAL_PROTO_MIN_VERSION_NUM )
212222 ereport (ERROR ,
213223 (errcode (ERRCODE_FEATURE_NOT_SUPPORTED ),
214224 errmsg ("client sent max_proto_version=%d but we only support protocol %d or higher" ,
215- data -> client_max_proto_version , PG_LOGICAL_PROTO_MIN_VERSION_NUM )));
225+ data -> client_max_proto_version , PGLOGICAL_PROTO_MIN_VERSION_NUM )));
216226
217227 /*
218228 * Set correct protocol format.
@@ -308,42 +318,17 @@ pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt,
308318 }
309319
310320 /*
311- * Will we forward changesets? We have to if we're on 9.4;
312- * otherwise honour the client's request.
321+ * 9.4 lacks origins info so don't forward it.
322+ *
323+ * There's currently no knob for clients to use to suppress
324+ * this info and it's sent if it's supported and available.
313325 */
314326 if (PG_VERSION_NUM /100 == 904 )
315- {
316- /*
317- * 9.4 unconditionally forwards changesets due to lack of
318- * replication origins, and it can't ever send origin info
319- * for the same reason.
320- */
321- data -> forward_changesets = true;
322327 data -> forward_changeset_origins = false;
323-
324- if (data -> client_forward_changesets_set
325- && !data -> client_forward_changesets )
326- {
327- ereport (DEBUG1 ,
328- (errcode (ERRCODE_FEATURE_NOT_SUPPORTED ),
329- errmsg ("Cannot disable changeset forwarding on PostgreSQL 9.4" )));
330- }
331- }
332- else if (data -> client_forward_changesets_set
333- && data -> client_forward_changesets )
334- {
335- /* Client explicitly asked for forwarding; forward csets and origins */
336- data -> forward_changesets = true;
337- data -> forward_changeset_origins = true;
338- }
339328 else
340- {
341- /* Default to not forwarding or honour client's request not to fwd */
342- data -> forward_changesets = false;
343- data -> forward_changeset_origins = false;
344- }
329+ data -> forward_changeset_origins = true;
345330
346- if (data -> hooks_setup_funcname != NIL || data -> api -> setup_hooks )
331+ if (data -> hooks_setup_funcname != NIL )
347332 {
348333
349334 data -> hooks_mctxt = AllocSetContextCreate (ctx -> context ,
@@ -355,6 +340,43 @@ pg_decode_startup(LogicalDecodingContext * ctx, OutputPluginOptions *opt,
355340 load_hooks (data );
356341 call_startup_hook (data , ctx -> output_plugin_options );
357342 }
343+
344+ if (data -> client_relmeta_cache_size < -1 )
345+ {
346+ ereport (ERROR ,
347+ (errcode (ERRCODE_INVALID_PARAMETER_VALUE ),
348+ errmsg ("relmeta_cache_size must be -1, 0, or positive" )));
349+ }
350+
351+ /*
352+ * Relation metadata cache configuration.
353+ *
354+ * TODO: support fixed size cache
355+ *
356+ * Need a LRU for eviction, and need to implement a new message type for
357+ * cache purge notifications for clients. In the mean time force it to 0
358+ * (off). The client will be told via a startup param and must respect
359+ * that.
360+ */
361+ if (data -> client_relmeta_cache_size != 0
362+ && data -> client_relmeta_cache_size != -1 )
363+ {
364+ ereport (INFO ,
365+ (errcode (ERRCODE_FEATURE_NOT_SUPPORTED ),
366+ errmsg ("fixed size cache not supported, forced to off" ),
367+ errdetail ("only relmeta_cache_size=0 (off) or relmeta_cache_size=-1 (unlimited) supported" )));
368+
369+ data -> relmeta_cache_size = 0 ;
370+ }
371+ else
372+ {
373+ /* ack client request */
374+ data -> relmeta_cache_size = data -> client_relmeta_cache_size ;
375+ }
376+
377+ /* if cache enabled, init it */
378+ if (data -> relmeta_cache_size != 0 )
379+ pglogical_init_relmetacache (ctx -> context );
358380 }
359381}
360382
@@ -370,12 +392,15 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
370392 if (!startup_message_sent )
371393 send_startup_message (ctx , data , false /* can't be last message */ );
372394
395+ #ifdef HAVE_REPLICATION_ORIGINS
373396 /* If the record didn't originate locally, send origin info */
374397 send_replication_origin &= txn -> origin_id != InvalidRepOriginId ;
398+ #endif
375399
376400 OutputPluginPrepareWrite (ctx , !send_replication_origin );
377401 data -> api -> write_begin (ctx -> out , data , txn );
378402
403+ #ifdef HAVE_REPLICATION_ORIGINS
379404 if (send_replication_origin )
380405 {
381406 char * origin ;
@@ -397,6 +422,7 @@ pg_decode_begin_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn)
397422 replorigin_by_oid (txn -> origin_id , true, & origin ))
398423 data -> api -> write_origin (ctx -> out , origin , txn -> origin_lsn );
399424 }
425+ #endif
400426
401427 OutputPluginWrite (ctx , true);
402428}
@@ -421,6 +447,8 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
421447{
422448 PGLogicalOutputData * data = ctx -> output_plugin_private ;
423449 MemoryContext old ;
450+ struct PGLRelMetaCacheEntry * cached_relmeta = NULL ;
451+
424452
425453 /* First check the table filter */
426454 if (!call_row_filter_hook (data , txn , relation , change ))
@@ -429,11 +457,18 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
429457 /* Avoid leaking memory by using and resetting our own context */
430458 old = MemoryContextSwitchTo (data -> context );
431459
432- /* TODO: add caching (send only if changed) */
433- if (data -> api -> write_rel )
460+ /*
461+ * If the protocol wants to write relation information and the client
462+ * isn't known to have metadata cached for this relation already,
463+ * send relation metadata.
464+ *
465+ * TODO: track hit/miss stats
466+ */
467+ if (data -> api -> write_rel != NULL &&
468+ !pglogical_cache_relmeta (data , relation , & cached_relmeta ))
434469 {
435470 OutputPluginPrepareWrite (ctx , false);
436- data -> api -> write_rel (ctx -> out , data , relation );
471+ data -> api -> write_rel (ctx -> out , data , relation , cached_relmeta );
437472 OutputPluginWrite (ctx , false);
438473 }
439474
@@ -477,28 +512,22 @@ pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
477512 MemoryContextReset (data -> context );
478513}
479514
515+ #ifdef HAVE_REPLICATION_ORIGINS
480516/*
481517 * Decide if the whole transaction with specific origin should be filtered out.
482518 */
483- extern int MtmReplicationNodeId ;
484-
485519static bool
486520pg_decode_origin_filter (LogicalDecodingContext * ctx ,
487521 RepOriginId origin_id )
488522{
489523 PGLogicalOutputData * data = ctx -> output_plugin_private ;
490524
491- if (!call_txn_filter_hook (data , origin_id )) {
492- return true;
493- }
494-
495- if (!data -> forward_changesets && origin_id != InvalidRepOriginId ) {
496- * (int * )0 = 0 ;
525+ if (!call_txn_filter_hook (data , origin_id ))
497526 return true;
498- }
499527
500528 return false;
501529}
530+ #endif
502531
503532static void
504533send_startup_message (LogicalDecodingContext * ctx ,
@@ -532,9 +561,10 @@ static void pg_decode_shutdown(LogicalDecodingContext * ctx)
532561
533562 call_shutdown_hook (data );
534563
535- if (data -> hooks_mctxt != NULL )
536- {
537- MemoryContextDelete (data -> hooks_mctxt );
538- data -> hooks_mctxt = NULL ;
539- }
564+ pglogical_destroy_relmetacache ();
565+
566+ /*
567+ * no need to delete data->context or data->hooks_mctxt as they're children
568+ * of ctx->context which will expire on return.
569+ */
540570}
0 commit comments