2424#include "replication/message.h"
2525#include "replication/origin.h"
2626
27+ #include "storage/procarray.h"
28+
2729#include "utils/builtins.h"
2830#include "utils/lsyscache.h"
2931#include "utils/memutils.h"
@@ -283,15 +285,35 @@ pg_filter_prepare(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
283285{
284286 TestDecodingData * data = ctx -> output_plugin_private ;
285287
286- // has_catalog_changes?
287- // LWLockAcquire(TwoPhaseStateLock, LW_SHARED) ;
288+ if (! data -> twophase_decoding )
289+ return true ;
288290
289- // OutputPluginPrepareWrite(ctx, true);
291+ if (txn -> has_catalog_changes )
292+ {
293+ LWLockAcquire (TwoPhaseStateLock , LW_SHARED );
294+
295+ if (TransactionIdIsInProgress (txn -> xid ))
296+ {
297+ /*
298+ * XXX
299+ */
300+ LWLockRelease (TwoPhaseStateLock );
301+ return true;
302+ }
303+ else if (TransactionIdDidAbort (txn -> xid ))
304+ {
305+ /*
306+ * Here we know that it is already aborted and should humble
307+ * ourselves.
308+ */
309+ LWLockRelease (TwoPhaseStateLock );
310+ return true;
311+ }
290312
291- // appendStringInfo(ctx->out, "pg_filter_prepare %s", gid);
313+ LWLockRelease (TwoPhaseStateLock );
314+ }
292315
293- // OutputPluginWrite(ctx, true);
294- return true;
316+ return false;
295317}
296318
297319
@@ -307,7 +329,7 @@ pg_decode_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
307329
308330 OutputPluginPrepareWrite (ctx , true);
309331
310- appendStringInfo (ctx -> out , "PREPARE! '%s'" , txn -> gid );
332+ appendStringInfo (ctx -> out , "PREPARE '%s'" , txn -> gid );
311333
312334 if (data -> include_xids )
313335 appendStringInfo (ctx -> out , " %u" , txn -> xid );
0 commit comments