HeapTupleHeaderSetXminFrozen(tup->t_data);
HeapTupleHeaderSetCmin(tup->t_data, cid);
- HeapTupleHeaderSetXmax(tup->t_data, 0); /* for cleanliness */
+ HeapTupleHeaderSetXmax(tup->t_data, 0); /* for cleanliness */
tup->t_tableOid = RelationGetRelid(relation);
+#ifdef PGXC
+ tup->t_xc_node_id = PGXCNodeIdentifier;
+#endif
/*
* If the new tuple is too big for storage or contains already toasted
LWLockRelease(TwoPhaseStateLock);
- ereport(ERROR,
- (errcode(ERRCODE_UNDEFINED_OBJECT),
- errmsg("prepared transaction with identifier \"%s\" does not exist",
- gid)));
+#ifdef PGXC
+ /*
+ * In PGXC, if xc_maintenance_mode is on, COMMIT/ROLLBACK PREPARED may be issued to the
+ * node where the given xid does not exist.
+ */
+ if (!xc_maintenance_mode)
+ {
+#endif
+ ereport(ERROR,
+ (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("prepared transaction with identifier \"%s\" does not exist",
+ gid)));
+#ifdef PGXC
+ }
+#endif
/* NOTREACHED */
return NULL;
int maxChildXids; /* allocated size of childXids[] */
Oid prevUser; /* previous CurrentUserId setting */
int prevSecContext; /* previous SecurityRestrictionContext */
- bool prevXactReadOnly; /* entry-time xact r/o state */
- bool startedInRecovery; /* did we start in recovery? */
+ bool prevXactReadOnly; /* entry-time xact r/o state */
+ bool startedInRecovery; /* did we start in recovery? */
bool didLogXid; /* has xid been included in WAL record? */
- int parallelModeLevel; /* Enter/ExitParallelMode counter */
- struct TransactionStateData *parent; /* back link to parent */
+ int parallelModeLevel; /* Enter/ExitParallelMode counter */
+ struct TransactionStateData *parent; /* back link to parent */
+#ifdef XCP
+ int waitedForXidsCount; /* count of xids we waited to finish */
+ TransactionId *waitedForXids; /* xids we waited to finish */
+#endif
} TransactionStateData;
typedef TransactionStateData *TransactionState;
Int32GetDatum(-1)));
ereport(DEBUG2,
(errmsg_internal("recovery_target_time = '%s'",
- timestamptz_to_str(recoveryTargetTime))));
+ timestamptz_to_str(recoveryTargetTime))));
}
+#ifdef PGXC
+ else if (strcmp(item->name, "recovery_target_barrier") == 0)
+ {
+ recoveryTarget = RECOVERY_TARGET_BARRIER;
+ recoveryTargetBarrierId = pstrdup(item->value);
+ }
+#endif
else if (strcmp(item->name, "recovery_target_name") == 0)
{
recoveryTarget = RECOVERY_TARGET_NAME;
recoveryStopTime = 0;
recoveryStopName[0] = '\0';
ereport(LOG,
- (errmsg("recovery stopping before WAL location (LSN) \"%X/%X\"",
- (uint32) (recoveryStopLSN >> 32),
- (uint32) recoveryStopLSN)));
+ (errmsg("recovery stopping before WAL location (LSN) \"%X/%X\"",
+ (uint32) (recoveryStopLSN >> 32),
+ (uint32) recoveryStopLSN)));
return true;
}
-
+#ifdef PGXC
+ /* Otherwise we only consider stopping before COMMIT, ABORT or BARRIER records. */
+ if ((XLogRecGetRmid(record) != RM_XACT_ID) && (XLogRecGetRmid(record) != RM_BARRIER_ID))
+#else
/* Otherwise we only consider stopping before COMMIT or ABORT records. */
if (XLogRecGetRmid(record) != RM_XACT_ID)
+#endif
return false;
xact_info = XLogRecGetInfo(record) & XLOG_XACT_OPMASK;
#include "utils/relmapper.h"
#include "utils/tqual.h"
- uint32 bootstrap_data_checksum_version = 0; /* No checksum */
+#ifdef PGXC
+#include "nodes/nodes.h"
+#include "pgxc/poolmgr.h"
+#include "postmaster/clustermon.h"
+#endif
+
+ uint32 bootstrap_data_checksum_version = 0; /* No checksum */
#define ALLOC(t, c) \
if (rd_rel->relkind != RELKIND_INDEX)
visibilitymap_count(rel, &relallvisible, NULL);
- else /* don't bother for indexes */
+ else /* don't bother for indexes */
relallvisible = 0;
- if (rd_rel->relpages != (int32) relpages)
- {
- rd_rel->relpages = (int32) relpages;
- dirty = true;
- }
- if (rd_rel->reltuples != (float4) reltuples)
- {
- rd_rel->reltuples = (float4) reltuples;
- dirty = true;
- }
- if (rd_rel->relallvisible != (int32) relallvisible)
- {
- rd_rel->relallvisible = (int32) relallvisible;
- dirty = true;
- }
+#ifdef XCP
+ /*
+ * Coordinator stats are populated using data from remote
+ * datanodes. Hence we must not use local information to set those
+ * stats
+ */
+ if (!IS_PGXC_COORDINATOR || !rel->rd_locator_info)
+#endif
+ if (rd_rel->relpages != (int32) relpages)
+ {
+ rd_rel->relpages = (int32) relpages;
+ dirty = true;
+ }
+#ifdef XCP
+ if (!IS_PGXC_COORDINATOR || !rel->rd_locator_info)
+#endif
+ if (rd_rel->reltuples != (float4) reltuples)
+ {
+ rd_rel->reltuples = (float4) reltuples;
+ dirty = true;
+ }
+#ifdef XCP
+ if (!IS_PGXC_COORDINATOR || !rel->rd_locator_info)
+#endif
+ if (rd_rel->relallvisible != (int32) relallvisible)
+ {
+ rd_rel->relallvisible = (int32) relallvisible;
+ dirty = true;
+ }
}
/*
RawStmt *parsetree = lfirst_node(RawStmt, lc);
List *querytree_sublist;
+#ifdef PGXC
+ /* Block CTAS in SQL functions */
+ if (IsA(parsetree, CreateTableAsStmt))
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ errmsg("In XC, SQL functions cannot contain utility statements")));
+#endif
+
querytree_sublist = pg_analyze_and_rewrite_params(parsetree,
prosrc,
- (ParserSetupHook) sql_fn_parser_setup,
+ (ParserSetupHook) sql_fn_parser_setup,
pinfo,
NULL);
querytree_list = list_concat(querytree_list,
/* See Multibyte encoding comment above */
cstate->encoding_embeds_ascii = PG_ENCODING_IS_CLIENT_ONLY(cstate->file_encoding);
- cstate->copy_dest = COPY_FILE; /* default */
+ cstate->copy_dest = COPY_FILE; /* default */
+#ifdef PGXC
+ /*
+ * We are here just at copy begin process,
+ * so only pick up the list of connections.
+ */
+ if (IS_PGXC_COORDINATOR)
+ {
+ RemoteCopyData *remoteCopyState = cstate->remoteCopyState;
+
+ /*
+ * In the case of CopyOut, it is just necessary to pick up one node randomly.
+ * This is done when rel_loc is found.
+ */
+ if (remoteCopyState && remoteCopyState->rel_loc)
+ {
+ DataNodeCopyBegin(remoteCopyState);
+ if (!remoteCopyState->locator)
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_EXCEPTION),
+ errmsg("Failed to initialize Datanodes for COPY")));
+ }
+ }
+#endif
+
MemoryContextSwitchTo(oldcontext);
return cstate;
errmsg("encoding \"%s\" does not match locale \"%s\"",
pg_encoding_to_char(encoding),
collate),
- errdetail("The chosen LC_COLLATE setting requires encoding \"%s\".",
- pg_encoding_to_char(collate_encoding))));
+ errdetail("The chosen LC_COLLATE setting requires encoding \"%s\".",
+ pg_encoding_to_char(collate_encoding))));
}
+#ifdef PGXC
+/*
+ * Error cleanup callback for createdb. Aftec createdb() succeeds, the
+ * transaction can still be aborted due to other nodes. So on abort-transaction,
+ * this function is called to do the cleanup. This involves removing directories
+ * created after successful completion.
+ * Nothing to be done on commit.
+ */
+static void
+createdb_xact_callback(bool isCommit, void *arg)
+{
+ if (isCommit)
+ return;
+
+ /* Throw away any successfully copied subdirectories */
+ remove_dbtablespaces(*(Oid *) arg);
+}
+#endif
+
/* Error cleanup callback for createdb */
static void
createdb_failure_callback(int code, Datum arg)
if (list_length(stmt->options) != 1)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
- errmsg("option \"%s\" cannot be specified with other options",
- dtablespace->defname),
+ errmsg("option \"%s\" cannot be specified with other options",
+ dtablespace->defname),
parser_errposition(pstate, dtablespace->location)));
/* this case isn't allowed within a transaction block */
- PreventTransactionChain(isTopLevel, "ALTER DATABASE SET TABLESPACE");
+#ifdef PGXC
+ /* ... but we allow it on remote nodes */
+ if (IS_PGXC_LOCAL_COORDINATOR)
+#endif
+ PreventTransactionChain(isTopLevel, "ALTER DATABASE SET TABLESPACE");
+
movedb(stmt->dbname, defGetString(dtablespace));
return InvalidOid;
}
* a pass determined by subcommand type.
*/
- #define AT_PASS_UNSET -1 /* UNSET will cause ERROR */
- #define AT_PASS_DROP 0 /* DROP (all flavors) */
- #define AT_PASS_ALTER_TYPE 1 /* ALTER COLUMN TYPE */
- #define AT_PASS_OLD_INDEX 2 /* re-add existing indexes */
- #define AT_PASS_OLD_CONSTR 3 /* re-add existing constraints */
- #define AT_PASS_COL_ATTRS 4 /* set other column attributes */
+ #define AT_PASS_UNSET -1 /* UNSET will cause ERROR */
+ #define AT_PASS_DROP 0 /* DROP (all flavors) */
+ #define AT_PASS_ALTER_TYPE 1 /* ALTER COLUMN TYPE */
+ #define AT_PASS_OLD_INDEX 2 /* re-add existing indexes */
+ #define AT_PASS_OLD_CONSTR 3 /* re-add existing constraints */
+ #define AT_PASS_COL_ATTRS 4 /* set other column attributes */
/* We could support a RENAME COLUMN pass here, but not currently used */
- #define AT_PASS_ADD_COL 5 /* ADD COLUMN */
- #define AT_PASS_ADD_INDEX 6 /* ADD indexes */
- #define AT_PASS_ADD_CONSTR 7 /* ADD constraints, defaults */
- #define AT_PASS_MISC 8 /* other stuff */
+ #define AT_PASS_ADD_COL 5 /* ADD COLUMN */
+ #define AT_PASS_ADD_INDEX 6 /* ADD indexes */
+ #define AT_PASS_ADD_CONSTR 7 /* ADD constraints, defaults */
+ #define AT_PASS_MISC 8 /* other stuff */
+#ifdef PGXC
- #define AT_PASS_DISTRIB 9 /* Redistribution pass */
++#define AT_PASS_DISTRIB 9 /* Redistribution pass */
+#define AT_NUM_PASSES 10
+#else
#define AT_NUM_PASSES 9
+#endif
typedef struct AlteredTableInfo
{
* reference the whole path here, but mkdir() uses the first two parts.
*/
if (strlen(location) + 1 + strlen(TABLESPACE_VERSION_DIRECTORY) + 1 +
- OIDCHARS + 1 + OIDCHARS + 1 + FORKNAMECHARS + 1 + OIDCHARS > MAXPGPATH)
+#ifdef PGXC
+ /*
+ * In Postgres-XC, node name is added in the tablespace folder name to
+ * insure unique names for nodes sharing the same server.
+ * So real format is PG_XXX_<nodename>/<dboid>/<relid>.<nnn>''
+ */
+ strlen(PGXCNodeName) + 1 +
+#endif
+ OIDCHARS + 1 + OIDCHARS + 1 + FORKNAMECHARS + 1 + OIDCHARS > MAXPGPATH)
ereport(ERROR,
(errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg("tablespace location \"%s\" is too long",
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
/* translator: %s is a SQL statement name */
- errmsg("%s is not allowed in a non-volatile function",
- CreateCommandTag((Node *) stmt))));
+ errmsg("%s is not allowed in a non-volatile function",
+ CreateCommandTag((Node *) stmt))));
+#ifdef PGXC
+ if (IS_PGXC_LOCAL_COORDINATOR)
+ {
+ if (queryTree->commandType != CMD_UTILITY)
+ {
+ /*
+ * The parameterised queries in RemoteQuery nodes will be prepared
+ * on the Datanode, and need parameter types for the same. Set the
+ * parameter types and their number in all RemoteQuery nodes in the
+ * plan
+ */
+ SetRemoteStatementName(((PlannedStmt *)stmt)->planTree, NULL,
+ fcache->pinfo->nargs,
+ fcache->pinfo->argtypes, 0);
+ }
+ }
+#endif /* PGXC */
+
if (IsInParallelMode() && !CommandIsReadOnly(stmt))
PreventCommandIfParallelMode(CreateCommandTag((Node *) stmt));
RawStmt *parsetree = lfirst_node(RawStmt, list_item);
CachedPlanSource *plansource;
+
plansource = CreateOneShotCachedPlan(parsetree,
src,
- CreateCommandTag(parsetree->stmt));
+ CreateCommandTag(parsetree->stmt));
plancache_list = lappend(plancache_list, plansource);
}
Oid loOid = PG_GETARG_OID(0);
bytea *str = PG_GETARG_BYTEA_PP(1);
LargeObjectDesc *loDesc;
- int written PG_USED_FOR_ASSERTS_ONLY;
+ int written PG_USED_FOR_ASSERTS_ONLY;
+#ifdef PGXC
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("Postgres-XL does not yet support large objects"),
+ errdetail("The feature is not currently supported")));
+#endif
+
CreateFSContext();
loOid = inv_create(loOid);
int64 offset = PG_GETARG_INT64(1);
bytea *str = PG_GETARG_BYTEA_PP(2);
LargeObjectDesc *loDesc;
- int written PG_USED_FOR_ASSERTS_ONLY;
+ int written PG_USED_FOR_ASSERTS_ONLY;
+#ifdef PGXC
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("Postgres-XL does not yet support large objects"),
+ errdetail("The feature is not currently supported")));
+#endif
+
CreateFSContext();
loDesc = inv_open(loOid, INV_WRITE, fscxt);
printf(_(" -r FILENAME send stdout and stderr to given file\n"));
printf(_(" -x NUM internal use\n"));
+#ifdef PGXC
+ printf(_("\nNode options:\n"));
+ printf(_(" --coordinator start as a Coordinator\n"));
+ printf(_(" --datanode start as a Datanode\n"));
+ printf(_(" --restoremode start to restore existing schema on the new node to be added\n"));
+#endif
+
printf(_("\nPlease read the documentation for the complete list of run-time\n"
- "configuration settings and how to set them on the command line or in\n"
+ "configuration settings and how to set them on the command line or in\n"
"the configuration file.\n\n"
"Report bugs to <pgsql-bugs@postgresql.org>.\n"));
}
token = pg_strtok(&length); /* skip :constvalue */
if (local_node->constisnull)
- token = pg_strtok(&length); /* skip "<>" */
+ token = pg_strtok(&length); /* skip "<>" */
else
+#ifdef XCP
+ if (portable_input)
+ local_node->constvalue = scanDatum(local_node->consttype,
+ local_node->consttypmod);
+ else
+#endif
local_node->constvalue = readDatum(local_node->constbyval);
READ_DONE();
pathkeys = convert_subquery_pathkeys(root,
rel,
subpath->pathkeys,
- make_tlist_from_pathtarget(subpath->pathtarget));
+ make_tlist_from_pathtarget(subpath->pathtarget));
+ if (subroot->distribution && subroot->distribution->distributionExpr)
+ {
+ ListCell *lc;
+
+ /* FIXME Could we use pathtarget directly? */
+ List *targetlist = make_tlist_from_pathtarget(subpath->pathtarget);
+
+ /*
+ * The distribution expression from the subplan's tlist, but it should
+ * be from the rel, need conversion.
+ */
+ distribution = makeNode(Distribution);
+ distribution->distributionType = subroot->distribution->distributionType;
+ distribution->nodes = bms_copy(subroot->distribution->nodes);
+ distribution->restrictNodes = bms_copy(subroot->distribution->restrictNodes);
+
+ foreach(lc, targetlist)
+ {
+ TargetEntry *tle = (TargetEntry *) lfirst(lc);
+ if (equal(tle->expr, subroot->distribution->distributionExpr))
+ {
+ distribution->distributionExpr = (Node *)
+ makeVarFromTargetEntry(rel->relid, tle);
+ break;
+ }
+ }
+ }
+ else
+ distribution = subroot->distribution;
+
/* Generate outer path using this subpath */
add_path(rel, (Path *)
create_subqueryscan_path(root, rel, subpath,
NULL,
true);
tlist = lappend(tlist, tle);
- lefttree->targetlist = tlist; /* just in case NIL before */
+ lefttree->targetlist = tlist; /* just in case NIL before */
+#ifdef XCP
+ /*
+ * RemoteSubplan is conditionally projection capable - it is
+ * pushing projection to the data nodes
+ */
+ if (IsA(lefttree, RemoteSubplan))
+ lefttree->lefttree->targetlist = tlist;
+#endif
}
/*
pathnode->path.parallel_aware = false;
pathnode->path.parallel_safe = rel->consider_parallel;
pathnode->path.parallel_workers = parallel_workers;
- pathnode->path.pathkeys = NIL; /* result is always considered
- * unsorted */
+ pathnode->path.pathkeys = NIL; /* result is always considered unsorted */
+#ifdef XCP
+ /*
+ * Append path is used to implement scans of inherited tables and some
+ * "set" operations, like UNION ALL. While all inherited tables should
+ * have the same distribution, UNION'ed queries may have different.
+ * When paths being appended have the same distribution it is OK to push
+ * Append down to the data nodes. If not, perform "coordinator" Append.
+ */
+
+ /* Special case of the dummy relation, if the subpaths list is empty */
+ if (subpaths)
+ {
+ /* Take distribution of the first node */
+ l = list_head(subpaths);
+ subpath = (Path *) lfirst(l);
+ distribution = copyObject(subpath->distribution);
+ /*
+ * Check remaining subpaths, if all distributions equal to the first set
+ * it as a distribution of the Append path; otherwise make up coordinator
+ * Append
+ */
+ while ((l = lnext(l)))
+ {
+ subpath = (Path *) lfirst(l);
+
+ /*
+ * For Append and MergeAppend paths, we are most often dealing with
+ * different relations, appended together. So its very likely that
+ * the distribution for each relation will have a different varno.
+ * But we should be able to push down Append and MergeAppend as
+ * long as rest of the distribution information matches.
+ *
+ * equalDistribution() compares everything except the varnos
+ */
+ if (equalDistribution(distribution, subpath->distribution))
+ {
+ /*
+ * Both distribution and subpath->distribution may be NULL at
+ * this point, or they both are not null.
+ */
+ if (distribution && subpath->distribution->restrictNodes)
+ distribution->restrictNodes = bms_union(
+ distribution->restrictNodes,
+ subpath->distribution->restrictNodes);
+ }
+ else
+ {
+ break;
+ }
+ }
+ if (l)
+ {
+ List *newsubpaths = NIL;
+ foreach(l, subpaths)
+ {
+ subpath = (Path *) lfirst(l);
+ if (subpath->distribution)
+ subpath = redistribute_path(NULL, subpath, NIL,
+ LOCATOR_TYPE_NONE, NULL,
+ NULL, NULL);
+ newsubpaths = lappend(newsubpaths, subpath);
+ }
+ subpaths = newsubpaths;
+ pathnode->path.distribution = NULL;
+ }
+ else
+ pathnode->path.distribution = distribution;
+ }
+#endif
-
pathnode->partitioned_rels = list_copy(partitioned_rels);
pathnode->subpaths = subpaths;
pathnode->path.parallel_aware = false;
pathnode->path.parallel_safe = false;
pathnode->path.parallel_workers = 0;
- pathnode->path.pathkeys = NIL; /* Gather has unordered result */
+ pathnode->path.pathkeys = NIL; /* Gather has unordered result */
+ /* distribution is the same as in the subpath */
+ pathnode->path.distribution = (Distribution *) copyObject(subpath->distribution);
+
pathnode->subpath = subpath;
pathnode->num_workers = subpath->parallel_workers;
pathnode->single_copy = false;
(ExplainStmt *) parseTree);
break;
+#ifdef PGXC
+ case T_ExecDirectStmt:
+ result = transformExecDirectStmt(pstate,
+ (ExecDirectStmt *) parseTree);
+ break;
+#endif
+
case T_CreateTableAsStmt:
result = transformCreateTableAsStmt(pstate,
- (CreateTableAsStmt *) parseTree);
+ (CreateTableAsStmt *) parseTree);
break;
default:
root->parse = qry;
root->planner_cxt = CurrentMemoryContext;
root->hasJoinRTEs = true;
+ root->recursiveOk = true;
groupClauses = (List *) flatten_join_alias_vars(root,
- (Node *) groupClauses);
+ (Node *) groupClauses);
}
/*
rte->inh = inh;
rte->inFromCl = inFromCl;
+#ifdef XCP
+ /*
+ * Ugly workaround against permission check error when non-privileged
+ * user executes ANALYZE command.
+ * To update local statistics coordinator queries pg_statistic tables on
+ * datanodes, but these are not selectable by PUBLIC. It would be better
+ * to define view, but pg_statistic contains fields of anyarray pseudotype
+ * which is not allowed in view.
+ * So we just disable check for SELECT permission if query referring the
+ * pg_statistic table is parsed on datanodes. That might be a security hole,
+ * but fortunately any user query against pg_statistic would be parsed on
+ * coordinator, and permission check would take place; the only way to
+ * have arbitrary query parsed on datanode is EXECUTE DIRECT, it is only
+ * available for superuser.
+ */
+ if (IS_PGXC_DATANODE && rte->relid == StatisticRelationId)
+ rte->requiredPerms = 0;
+ else
+#endif
rte->requiredPerms = ACL_SELECT;
- rte->checkAsUser = InvalidOid; /* not set-uid by default, either */
+ rte->checkAsUser = InvalidOid; /* not set-uid by default, either */
rte->selectedCols = NULL;
rte->insertedCols = NULL;
rte->updatedCols = NULL;
List *alist; /* "after list" of things to do after creating
* the table */
IndexStmt *pkey; /* PRIMARY KEY index, if any */
+#ifdef PGXC
+ FallbackSrc fallback_source;
+ List *fallback_dist_cols;
+ DistributeBy *distributeby; /* original distribute by column of CREATE TABLE */
+ PGXCSubCluster *subcluster; /* original subcluster option of CREATE TABLE */
+#endif
bool ispartitioned; /* true if table is partitioned */
- PartitionBoundSpec *partbound; /* transformed FOR VALUES */
+ PartitionBoundSpec *partbound; /* transformed FOR VALUES */
} CreateStmtContext;
/* State shared by transformCreateSchemaStmt and its subroutines */
List *constraintList);
static void transformColumnType(CreateStmtContext *cxt, ColumnDef *column);
static void setSchemaName(char *context_schema, char **stmt_schema_name);
+#ifdef PGXC
+static void checkLocalFKConstraints(CreateStmtContext *cxt);
+#endif
+#ifdef XCP
+static List *transformSubclusterNodes(PGXCSubCluster *subcluster);
+static PGXCSubCluster *makeSubCluster(List *nodelist);
+#endif
static void transformPartitionCmd(CreateStmtContext *cxt, PartitionCmd *cmd);
static Const *transformPartitionBoundValue(ParseState *pstate, A_Const *con,
- const char *colName, Oid colType, int32 colTypmod);
+ const char *colName, Oid colType, int32 colTypmod);
/*
static void ShmemBackendArrayAdd(Backend *bn);
static void ShmemBackendArrayRemove(Backend *bn);
- #endif /* EXEC_BACKEND */
+ #endif /* EXEC_BACKEND */
+#ifdef XCP
+char *parentPGXCNode = NULL;
+int parentPGXCNodeId = -1;
+int parentPGXCPid = -1;
+char parentPGXCNodeType = PGXC_NODE_DATANODE;
+#endif
+
+#ifdef PGXC
+bool isPGXCCoordinator = false;
+bool isPGXCDataNode = false;
+
+/*
+ * While adding a new node to the cluster we need to restore the schema of
+ * an existing database to the new node.
+ * If the new node is a datanode and we connect directly to it,
+ * it does not allow DDL, because it is in read only mode &
+ * If the new node is a coordinator it will send DDLs to all the other
+ * coordinators which we do not want it to do
+ * To provide ability to restore on the new node a new command line
+ * argument is provided called --restoremode
+ * It is to be provided in place of --coordinator OR --datanode.
+ * In restore mode both coordinator and datanode are internally
+ * treated as a datanode.
+ */
+bool isRestoreMode = false;
+
+int remoteConnType = REMOTE_CONN_APP;
+
+/* key pair to be used as object id while using advisory lock for backup */
+Datum xc_lockForBackupKey1;
+Datum xc_lockForBackupKey2;
+
+#define StartPoolManager() StartChildProcess(PoolerProcess)
+#define StartClusterMonitor() StartChildProcess(ClusterMonitorProcess)
+#endif
+
#define StartupDataBase() StartChildProcess(StartupProcess)
#define StartBackgroundWriter() StartChildProcess(BgWriterProcess)
#define StartCheckpointer() StartChildProcess(CheckpointerProcess)
#define xc_by_known_assigned_inc() ((void) 0)
#define xc_no_overflow_inc() ((void) 0)
#define xc_slow_answer_inc() ((void) 0)
- #endif /* XIDCACHE_DEBUG */
+ #endif /* XIDCACHE_DEBUG */
+#ifdef PGXC /* PGXC_DATANODE */
+
+static bool GetPGXCSnapshotData(Snapshot snapshot, bool latest);
+
+typedef struct
+{
+ /* Global snapshot data */
+ SnapshotSource snapshot_source;
+ TransactionId gxmin;
+ TransactionId gxmax;
+ int gxcnt;
+ int max_gxcnt;
+ TransactionId *gxip;
+} GlobalSnapshotData;
+
+GlobalSnapshotData globalSnapshot = {
+ SNAPSHOT_UNDEFINED,
+ InvalidTransactionId,
+ InvalidTransactionId,
+ 0,
+ 0,
+ NULL
+};
+
+static void GetSnapshotFromGlobalSnapshot(Snapshot snapshot);
+static void GetSnapshotDataFromGTM(Snapshot snapshot);
+#endif
+
/* Primitives for KnownAssignedXids array handling for standby */
static void KnownAssignedXidsCompress(bool force);
static void KnownAssignedXidsAdd(TransactionId from_xid, TransactionId to_xid,
xc_no_overflow,
xc_slow_answer);
}
- #endif /* XIDCACHE_DEBUG */
+ #endif /* XIDCACHE_DEBUG */
+#ifdef PGXC
+/*
+ * Store snapshot data received from the Coordinator
+ */
+void
+SetGlobalSnapshotData(TransactionId xmin, TransactionId xmax,
+ int xcnt, TransactionId *xip, SnapshotSource source)
+{
+ if (globalSnapshot.max_gxcnt < xcnt)
+ {
+ globalSnapshot.gxip = (TransactionId *) realloc(globalSnapshot.gxip,
+ sizeof (TransactionId) * xcnt);
+ if (globalSnapshot.gxip == NULL)
+ elog(ERROR, "Out of memory");
+ globalSnapshot.max_gxcnt = xcnt;
+ }
+
+ globalSnapshot.snapshot_source = source;
+ globalSnapshot.gxmin = xmin;
+ globalSnapshot.gxmax = xmax;
+ globalSnapshot.gxcnt = xcnt;
+ memcpy(globalSnapshot.gxip, xip, sizeof (TransactionId) * xcnt);
+ elog (DEBUG1, "global snapshot info: gxmin: %d, gxmax: %d, gxcnt: %d", xmin, xmax, xcnt);
+}
+
+/*
+ * Force Datanode to use local snapshot data
+ */
+void
+UnsetGlobalSnapshotData(void)
+{
+ globalSnapshot.snapshot_source = SNAPSHOT_UNDEFINED;
+ globalSnapshot.gxmin = InvalidTransactionId;
+ globalSnapshot.gxmax = InvalidTransactionId;
+ globalSnapshot.gxcnt = 0;
+ elog (DEBUG1, "unset snapshot info");
+}
+
+
+/*
+ * Entry of snapshot obtention for Postgres-XC node
+ */
+static bool
+GetPGXCSnapshotData(Snapshot snapshot, bool latest)
+{
+ /*
+ * If this node is in recovery phase,
+ * snapshot has to be taken directly from WAL information.
+ */
+ if (RecoveryInProgress())
+ return false;
+
+ /*
+ * The typical case is that the local Coordinator passes down the snapshot to the
+ * remote nodes to use, while it itself obtains it from GTM. Autovacuum processes
+ * need however to connect directly to GTM themselves to obtain XID and snapshot
+ * information for autovacuum worker threads.
+ * A vacuum analyze uses a special function to get a transaction ID and signal
+ * GTM not to include this transaction ID in snapshot.
+ * A vacuum worker starts as a normal transaction would.
+ */
+
+ if ((IsConnFromCoord() || IsConnFromDatanode())
+ && !IsInitProcessingMode() && !GetForceXidFromGTM() &&
+ !latest)
+ {
+ if (globalSnapshot.snapshot_source == SNAPSHOT_COORDINATOR)
+ GetSnapshotFromGlobalSnapshot(snapshot);
+ else
+ GetSnapshotDataFromGTM(snapshot);
+ return true;
+ }
+ else if (IsPostmasterEnvironment)
+ {
+ GetSnapshotDataFromGTM(snapshot);
+ return true;
+ }
+
+ return false;
+}
+
+static void
+GetSnapshotDataFromGTM(Snapshot snapshot)
+{
+ GTM_Snapshot gtm_snapshot;
+ GlobalTransactionId reporting_xmin;
+ bool canbe_grouped = (!FirstSnapshotSet) || (!IsolationUsesXactSnapshot());
+ bool xmin_changed = false;
+
+ /*
+ * We never want to use a snapshot whose xmin is older than the
+ * RecentGlobalXmin computed by the GTM. While it does not look likely that
+ * that this will ever happen because both these computations happen on the
+ * GTM, we are still worried about a race condition where a backend sends a
+ * snapshot request, and before snapshot is received, the cluster monitor
+ * reports our Xmin (which obviously does not include this snapshot's
+ * xmin). Now if GTM processes the snapshot request first, computes
+ * snapshot's xmin and then receives our Xmin-report, it may actually moves
+ * RecentGlobalXmin beyond snapshot's xmin assuming some transactions
+ * finished in between.
+ *
+ * We try to introduce some interlock between the Xmin reporting and
+ * snapshot request. Since we don't want to wait on a lock while Xmin is
+ * being reported by the cluster monitor process, we just make sure that
+ * the snapshot's xmin is not older than the Xmin we are currently
+ * reporting. Given that this is a very rare possibility, we just get a
+ * fresh snapshot from the GTM.
+ *
+ */
+
+ LWLockAcquire(ClusterMonitorLock, LW_SHARED);
+
+retry:
+ reporting_xmin = ClusterMonitorGetReportingGlobalXmin();
+
+ xmin_changed = false;
+ if (TransactionIdIsValid(reporting_xmin) &&
+ !TransactionIdIsValid(MyPgXact->xmin))
+ {
+ MyPgXact->xmin = reporting_xmin;
+ xmin_changed = true;
+ }
+
+ gtm_snapshot = GetSnapshotGTM(GetCurrentTransactionIdIfAny(), canbe_grouped);
+
+ if (!gtm_snapshot)
+ ereport(ERROR,
+ (errcode(ERRCODE_CONNECTION_FAILURE),
+ errmsg("GTM error, could not obtain snapshot. Current XID = %d, Autovac = %d", GetCurrentTransactionId(), IsAutoVacuumWorkerProcess())));
+ else
+ {
+ if (xmin_changed)
+ MyPgXact->xmin = InvalidTransactionId;
+ if (TransactionIdPrecedes(gtm_snapshot->sn_xmin, reporting_xmin))
+ goto retry;
+
+ /*
+ * Set RecentGlobalXmin by copying from the shared memory state
+ * maintained by the Clutser Monitor
+ */
+ RecentGlobalXmin = ClusterMonitorGetGlobalXmin();
+ if (!TransactionIdIsValid(RecentGlobalXmin))
+ RecentGlobalXmin = FirstNormalTransactionId;
+ /*
+ * XXX Is it ok to set RecentGlobalDataXmin same as RecentGlobalXmin ?
+ */
+ RecentGlobalDataXmin = RecentGlobalXmin;
+ SetGlobalSnapshotData(gtm_snapshot->sn_xmin, gtm_snapshot->sn_xmax,
+ gtm_snapshot->sn_xcnt, gtm_snapshot->sn_xip, SNAPSHOT_DIRECT);
+ GetSnapshotFromGlobalSnapshot(snapshot);
+ }
+ LWLockRelease(ClusterMonitorLock);
+}
+
+static void
+GetSnapshotFromGlobalSnapshot(Snapshot snapshot)
+{
+ if ((globalSnapshot.snapshot_source == SNAPSHOT_COORDINATOR ||
+ globalSnapshot.snapshot_source == SNAPSHOT_DIRECT)
+ && TransactionIdIsValid(globalSnapshot.gxmin))
+ {
+ TransactionId global_xmin;
+
+ snapshot->xmin = globalSnapshot.gxmin;
+ snapshot->xmax = globalSnapshot.gxmax;
+ snapshot->xcnt = globalSnapshot.gxcnt;
+ /*
+ * Allocating space for maxProcs xids is usually overkill; numProcs would
+ * be sufficient. But it seems better to do the malloc while not holding
+ * the lock, so we can't look at numProcs. Likewise, we allocate much
+ * more subxip storage than is probably needed.
+ *
+ * This does open a possibility for avoiding repeated malloc/free: since
+ * maxProcs does not change at runtime, we can simply reuse the previous
+ * xip arrays if any. (This relies on the fact that all callers pass
+ * static SnapshotData structs.) */
+ if (snapshot->xip == NULL)
+ {
+ ProcArrayStruct *arrayP = procArray;
+ /*
+ * First call for this snapshot
+ */
+ snapshot->xip = (TransactionId *)
+ malloc(Max(arrayP->maxProcs, globalSnapshot.gxcnt) * sizeof(TransactionId));
+ if (snapshot->xip == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+ snapshot->max_xcnt = Max(arrayP->maxProcs, globalSnapshot.gxcnt);
+
+ Assert(snapshot->subxip == NULL);
+ snapshot->subxip = (TransactionId *)
+ malloc(arrayP->maxProcs * PGPROC_MAX_CACHED_SUBXIDS * sizeof(TransactionId));
+ if (snapshot->subxip == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+ }
+ else if (snapshot->max_xcnt < globalSnapshot.gxcnt)
+ {
+ snapshot->xip = (TransactionId *)
+ realloc(snapshot->xip, globalSnapshot.gxcnt * sizeof(TransactionId));
+ if (snapshot->xip == NULL)
+ ereport(ERROR,
+ (errcode(ERRCODE_OUT_OF_MEMORY),
+ errmsg("out of memory")));
+ snapshot->max_xcnt = globalSnapshot.gxcnt;
+ }
+
+ /* PGXCTODO - set this until we handle subtransactions. */
+ snapshot->subxcnt = 0;
+
+ /*
+ * This is a new snapshot, so set both refcounts are zero, and mark it
+ * as not copied in persistent memory.
+ */
+ snapshot->active_count = 0;
+ snapshot->regd_count = 0;
+ snapshot->copied = false;
+
+ /*
+ * Start of handling for local ANALYZE
+ * Make adjustments for any running auto ANALYZE commands
+ */
+ LWLockAcquire(ProcArrayLock, LW_SHARED);
+
+ /*
+ * Once we have a SHARED lock on the ProcArrayLock, fetch the
+ * GlobalXmin and ensure that the snapshot we are dealing with isn't
+ * too old. Since such a snapshot may need to see rows that have
+ * already been removed by the server
+ *
+ * These scenarios are not very likely to happen because the
+ * ClusterMonitor will ensure that GlobalXmins are reported to GTM in
+ * time and the GlobalXmin on the GTM can't advance past the reported
+ * xmins. But in some cases where a node fails to report its GlobalXmin
+ * and gets excluded from the list of nodes on GTM, the GlobalXmin will
+ * be advanced. Usually such node will shoot itself in the head
+ * and rejoin the cluster, but if at all it sends a snapshot to us, we
+ * should protect ourselves from using it
+ */
+ global_xmin = ClusterMonitorGetGlobalXmin();
+ if (!TransactionIdIsValid(global_xmin))
+ global_xmin = FirstNormalTransactionId;
+
+ if (TransactionIdPrecedes(globalSnapshot.gxmin, global_xmin))
+ elog(ERROR, "Snapshot too old - RecentGlobalXmin (%d) has already "
+ "advanced past the snapshot xmin (%d)",
+ global_xmin, globalSnapshot.gxmin);
+
+ memcpy(snapshot->xip, globalSnapshot.gxip,
+ globalSnapshot.gxcnt * sizeof(TransactionId));
+ snapshot->curcid = GetCurrentCommandId(false);
+
+ if (!TransactionIdIsValid(MyPgXact->xmin))
+ MyPgXact->xmin = TransactionXmin = globalSnapshot.gxmin;
+
+ RecentXmin = globalSnapshot.gxmin;
+ RecentGlobalXmin = global_xmin;
+
+ /*
+ * XXX Is it ok to set RecentGlobalDataXmin same as RecentGlobalXmin ?
+ */
+ RecentGlobalDataXmin = RecentGlobalXmin;
+
+ if (!TransactionIdIsValid(MyPgXact->xmin))
+ MyPgXact->xmin = snapshot->xmin;
+
+ LWLockRelease(ProcArrayLock);
+ /* End handling of local analyze XID in snapshots */
+ }
+ else
+ elog(ERROR, "Cannot set snapshot from global snapshot");
+}
+#endif /* PGXC */
+
/* ----------------------------------------------
* KnownAssignedTransactions sub-module
* ----------------------------------------------
*/
static void
exec_parse_message(const char *query_string, /* string to execute */
- const char *stmt_name, /* name for prepared stmt */
- Oid *paramTypes, /* parameter types */
+ const char *stmt_name, /* name for prepared stmt */
+ Oid *paramTypes, /* parameter types */
+ char **paramTypeNames, /* parameter type names */
- int numParams) /* number of parameters */
+ int numParams) /* number of parameters */
{
MemoryContext unnamed_stmt_context = NULL;
MemoryContext oldcontext;
appendStringInfoString(&str, "! system usage stats:\n");
appendStringInfo(&str,
- "!\t%ld.%06ld s user, %ld.%06ld s system, %ld.%06ld s elapsed\n",
+ "!\t%ld.%06ld s user, %ld.%06ld s system, %ld.%06ld s elapsed\n",
- (long) (r.ru_utime.tv_sec - Save_r.ru_utime.tv_sec),
- (long) (r.ru_utime.tv_usec - Save_r.ru_utime.tv_usec),
- (long) (r.ru_stime.tv_sec - Save_r.ru_stime.tv_sec),
- (long) (r.ru_stime.tv_usec - Save_r.ru_stime.tv_usec),
- (long) (elapse_t.tv_sec - Save_t.tv_sec),
- (long) (elapse_t.tv_usec - Save_t.tv_usec));
+ (long) (r.ru_utime.tv_sec - save_r->ru_utime.tv_sec),
+ (long) (r.ru_utime.tv_usec - save_r->ru_utime.tv_usec),
+ (long) (r.ru_stime.tv_sec - save_r->ru_stime.tv_sec),
+ (long) (r.ru_stime.tv_usec - save_r->ru_stime.tv_usec),
+ (long) (elapse_t.tv_sec - save_t->tv_sec),
+ (long) (elapse_t.tv_usec - save_t->tv_usec));
appendStringInfo(&str,
"!\t[%ld.%06ld s user, %ld.%06ld s system total]\n",
(long) user.tv_sec,
#if defined(HAVE_GETRUSAGE)
appendStringInfo(&str,
"!\t%ld/%ld [%ld/%ld] filesystem blocks in/out\n",
- r.ru_inblock - Save_r.ru_inblock,
+ r.ru_inblock - save_r->ru_inblock,
/* they only drink coffee at dec */
- r.ru_oublock - Save_r.ru_oublock,
+ r.ru_oublock - save_r->ru_oublock,
r.ru_inblock, r.ru_oublock);
appendStringInfo(&str,
- "!\t%ld/%ld [%ld/%ld] page faults/reclaims, %ld [%ld] swaps\n",
- "!\t%ld/%ld [%ld/%ld] page faults/reclaims, %ld [%ld] swaps\n",
- r.ru_majflt - Save_r.ru_majflt,
- r.ru_minflt - Save_r.ru_minflt,
++ "!\t%ld/%ld [%ld/%ld] page faults/reclaims, %ld [%ld] swaps\n",
+ r.ru_majflt - save_r->ru_majflt,
+ r.ru_minflt - save_r->ru_minflt,
r.ru_majflt, r.ru_minflt,
- r.ru_nswap - Save_r.ru_nswap,
+ r.ru_nswap - save_r->ru_nswap,
r.ru_nswap);
appendStringInfo(&str,
- "!\t%ld [%ld] signals rcvd, %ld/%ld [%ld/%ld] messages rcvd/sent\n",
- "!\t%ld [%ld] signals rcvd, %ld/%ld [%ld/%ld] messages rcvd/sent\n",
- r.ru_nsignals - Save_r.ru_nsignals,
++ "!\t%ld [%ld] signals rcvd, %ld/%ld [%ld/%ld] messages rcvd/sent\n",
+ r.ru_nsignals - save_r->ru_nsignals,
r.ru_nsignals,
- r.ru_msgrcv - Save_r.ru_msgrcv,
- r.ru_msgsnd - Save_r.ru_msgsnd,
+ r.ru_msgrcv - save_r->ru_msgrcv,
+ r.ru_msgsnd - save_r->ru_msgsnd,
r.ru_msgrcv, r.ru_msgsnd);
appendStringInfo(&str,
- "!\t%ld/%ld [%ld/%ld] voluntary/involuntary context switches\n",
- "!\t%ld/%ld [%ld/%ld] voluntary/involuntary context switches\n",
- r.ru_nvcsw - Save_r.ru_nvcsw,
- r.ru_nivcsw - Save_r.ru_nivcsw,
++ "!\t%ld/%ld [%ld/%ld] voluntary/involuntary context switches\n",
+ r.ru_nvcsw - save_r->ru_nvcsw,
+ r.ru_nivcsw - save_r->ru_nivcsw,
r.ru_nvcsw, r.ru_nivcsw);
- #endif /* HAVE_GETRUSAGE */
+ #endif /* HAVE_GETRUSAGE */
/* remove trailing newline */
if (str.data[str.len - 1] == '\n')
/* ... and do it */
EventTriggerAlterTableStart(parsetree);
address =
- DefineIndex(relid, /* OID of heap relation */
+ DefineIndex(relid, /* OID of heap relation */
stmt,
InvalidOid, /* no predefined OID */
- false, /* is_alter_table */
- true, /* check_rights */
- true, /* check_not_in_use */
- false, /* skip_build */
- false); /* quiet */
+ false, /* is_alter_table */
+ true, /* check_rights */
+ true, /* check_not_in_use */
+ false, /* skip_build */
+ false); /* quiet */
+#ifdef PGXC
+ if (IS_PGXC_COORDINATOR && !stmt->isconstraint && !IsConnFromCoord())
+ ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote,
+ stmt->concurrent, exec_type, is_temp);
+#endif
+
/*
* Add the CREATE INDEX node itself to stash right away;
* if there were any commands stashed in the ALTER TABLE
address = DefineCompositeType(stmt->typevar,
stmt->coldeflist);
}
+#ifdef PGXC
+ if (IS_PGXC_LOCAL_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote, false, EXEC_ON_ALL_NODES, false);
+#endif
break;
- case T_CreateEnumStmt: /* CREATE TYPE AS ENUM */
+ case T_CreateEnumStmt: /* CREATE TYPE AS ENUM */
address = DefineEnum((CreateEnumStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_LOCAL_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote, false, EXEC_ON_ALL_NODES, false);
+#endif
break;
- case T_CreateRangeStmt: /* CREATE TYPE AS RANGE */
+ case T_CreateRangeStmt: /* CREATE TYPE AS RANGE */
address = DefineRange((CreateRangeStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_LOCAL_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote, false, EXEC_ON_ALL_NODES, false);
+#endif
break;
- case T_AlterEnumStmt: /* ALTER TYPE (enum) */
+ case T_AlterEnumStmt: /* ALTER TYPE (enum) */
address = AlterEnum((AlterEnumStmt *) parsetree);
+#ifdef PGXC
+ /*
+ * In this case force autocommit, this transaction cannot be launched
+ * inside a transaction block.
+ */
+ if (IS_PGXC_LOCAL_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote,
+ true, EXEC_ON_ALL_NODES, false);
+#endif
break;
case T_ViewStmt: /* CREATE VIEW */
PG_TRY();
{
address = ExecRefreshMatView((RefreshMatViewStmt *) parsetree,
- queryString, params, completionTag);
+ queryString, params, completionTag);
+#ifdef PGXC
+ if ((IS_PGXC_COORDINATOR) && !IsConnFromCoord())
+ {
+ RefreshMatViewStmt *stmt = (RefreshMatViewStmt *) parsetree;
+ if (stmt->relation->relpersistence != RELPERSISTENCE_TEMP)
+ ExecUtilityStmtOnNodes(queryString, NULL,
+ sentToRemote, false, EXEC_ON_COORDS, false);
+ }
+#endif
}
PG_CATCH();
{
case T_CreatePolicyStmt: /* CREATE POLICY */
address = CreatePolicy((CreatePolicyStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_LOCAL_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote, false, EXEC_ON_ALL_NODES, false);
+#endif
break;
- case T_AlterPolicyStmt: /* ALTER POLICY */
+ case T_AlterPolicyStmt: /* ALTER POLICY */
address = AlterPolicy((AlterPolicyStmt *) parsetree);
+#ifdef PGXC
+ if (IS_PGXC_LOCAL_COORDINATOR)
+ ExecUtilityStmtOnNodes(queryString, NULL, sentToRemote, false, EXEC_ON_ALL_NODES, false);
+#endif
break;
case T_SecLabelStmt:
int wrapColumn; /* max line length, or -1 for no limit */
int indentLevel; /* current indent level for prettyprint */
bool varprefix; /* TRUE to print prefixes on Vars */
- ParseExprKind special_exprkind; /* set only for exprkinds needing
- * special handling */
+ ParseExprKind special_exprkind; /* set only for exprkinds needing special
+ * handling */
+#ifdef PGXC
+ bool finalise_aggs; /* should Datanode finalise the aggregates? */
+ bool sortgroup_colno;/* instead of expression use resno for
+ * sortgrouprefs.
+ */
+#endif /* PGXC */
} deparse_context;
/*
int VacuumPageMiss = 0;
int VacuumPageDirty = 0;
- int VacuumCostBalance = 0; /* working state for vacuum */
+ int VacuumCostBalance = 0; /* working state for vacuum */
bool VacuumCostActive = false;
+
+#ifdef PGXC
+bool useLocalXid = false;
+#endif
bool log_parser_stats = false;
bool log_planner_stats = false;
bool log_executor_stats = false;
- bool log_statement_stats = false; /* this is sort of all three
- * above together */
+ bool log_statement_stats = false; /* this is sort of all three above
+ * together */
+#ifdef XCP
+bool log_gtm_stats = false;
+bool log_remotesubplan_stats = false;
+#endif
+
bool log_btree_build_stats = false;
char *event_source;
* from the slab memory arena, or is palloc'd, see readtup_alloc().
*/
void (*readtup) (Tuplesortstate *state, SortTuple *stup,
- int tapenum, unsigned int len);
+ int tapenum, unsigned int len);
+#ifdef PGXC
+ /*
+ * Function to read length of next stored tuple.
+ * Used as 'len' parameter for readtup function.
+ */
+ unsigned int (*getlen) (Tuplesortstate *state, int tapenum, bool eofOK);
+#endif
+
/*
* This array holds the tuples now in sort memory. If we are in state
* INITIAL, the tuples are in no particular order; if we are in state
SEEK_CUR) != 0)
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("could not seek in tuplestore temporary file: %m")));
+ errmsg("could not seek in tuplestore temporary file: %m")));
tup = READTUP(state, tuplen);
+ if (state->stat_name && tup)
+ state->stat_read_count++;
+
return tup;
default:
sizeof(tuplen)) != sizeof(tuplen))
ereport(ERROR,
(errcode_for_file_access(),
- errmsg("could not read from tuplestore temporary file: %m")));
+ errmsg("could not read from tuplestore temporary file: %m")));
return (void *) tuple;
}
+
+
+#ifdef XCP
+/*
+ * Routines to support Datarow tuple format, used for exchange between nodes
+ * as well as send data to client
+ */
+Tuplestorestate *
+tuplestore_begin_datarow(bool interXact, int maxKBytes,
+ MemoryContext tmpcxt)
+{
+ Tuplestorestate *state;
+
+ state = tuplestore_begin_common(0, interXact, maxKBytes);
+
+ state->format = TSF_DATAROW;
+ state->copytup = copytup_datarow;
+ state->writetup = writetup_datarow;
+ state->readtup = readtup_datarow;
+ state->tmpcxt = tmpcxt;
+
+ return state;
+}
+
+
+/*
+ * Do we need this at all?
+ */
+static void *
+copytup_datarow(Tuplestorestate *state, void *tup)
+{
+ Assert(false);
+ return NULL;
+}
+
+static void
+writetup_datarow(Tuplestorestate *state, void *tup)
+{
+ RemoteDataRow tuple = (RemoteDataRow) tup;
+
+ /* the part of the MinimalTuple we'll write: */
+ char *tupbody = tuple->msg;
+ unsigned int tupbodylen = tuple->msglen;
+
+ /* total on-disk footprint: */
+ unsigned int tuplen = tupbodylen + sizeof(int) + sizeof(tuple->msgnode);
+
+ if (BufFileWrite(state->myfile, (void *) &tuplen,
+ sizeof(int)) != sizeof(int))
+ elog(ERROR, "write failed");
+ if (BufFileWrite(state->myfile, (void *) &tuple->msgnode,
+ sizeof(tuple->msgnode)) != sizeof(tuple->msgnode))
+ elog(ERROR, "write failed");
+ if (BufFileWrite(state->myfile, (void *) tupbody,
+ tupbodylen) != (size_t) tupbodylen)
+ elog(ERROR, "write failed");
+ if (state->backward) /* need trailing length word? */
+ if (BufFileWrite(state->myfile, (void *) &tuplen,
+ sizeof(tuplen)) != sizeof(tuplen))
+ elog(ERROR, "write failed");
+
+ FREEMEM(state, GetMemoryChunkSpace(tuple));
+ pfree(tuple);
+}
+
+static void *
+readtup_datarow(Tuplestorestate *state, unsigned int len)
+{
+ RemoteDataRow tuple = (RemoteDataRow) palloc(len);
+ unsigned int tupbodylen = len - sizeof(int) - sizeof(tuple->msgnode);
+
+ USEMEM(state, GetMemoryChunkSpace(tuple));
+ /* read in the tuple proper */
+ tuple->msglen = tupbodylen;
+ if (BufFileRead(state->myfile, (void *) &tuple->msgnode,
+ sizeof(tuple->msgnode)) != sizeof(tuple->msgnode))
+ elog(ERROR, "unexpected end of data");
+ if (BufFileRead(state->myfile, (void *) tuple->msg,
+ tupbodylen) != (size_t) tupbodylen)
+ elog(ERROR, "unexpected end of data");
+ if (state->backward) /* need trailing length word? */
+ if (BufFileRead(state->myfile, (void *) &len,
+ sizeof(len)) != sizeof(len))
+ elog(ERROR, "unexpected end of data");
+ return (void *) tuple;
+}
+
+
+/*
+ * Routines to support storage of protocol message data
+ */
+Tuplestorestate *
+tuplestore_begin_message(bool interXact, int maxKBytes)
+{
+ Tuplestorestate *state;
+
+ state = tuplestore_begin_common(0, interXact, maxKBytes);
+
+ state->format = TSF_MESSAGE;
+ state->copytup = copytup_message;
+ state->writetup = writetup_message;
+ state->readtup = readtup_message;
+ state->tmpcxt = NULL;
+
+ return state;
+}
+
+
+void
+tuplestore_putmessage(Tuplestorestate *state, int len, char* msg)
+{
+ msg_data m;
+ void *tuple;
+ MemoryContext oldcxt = MemoryContextSwitchTo(state->context);
+
+ Assert(state->format == TSF_MESSAGE);
+
+ m.msglen = len;
+ m.msg = msg;
+
+ tuple = COPYTUP(state, &m);
+ tuplestore_puttuple_common(state, tuple);
+
+ MemoryContextSwitchTo(oldcxt);
+}
+
+
+char *
+tuplestore_getmessage(Tuplestorestate *state, int *len)
+{
+ bool should_free;
+ void *result;
+ void *tuple = tuplestore_gettuple(state, true, &should_free);
+
+ Assert(state->format == TSF_MESSAGE);
+
+ /* done? */
+ if (!tuple)
+ return NULL;
+
+ *len = *((int *) tuple);
+
+ result = palloc(*len);
+ memcpy(result, ((char *) tuple) + sizeof(int), *len);
+ if (should_free)
+ pfree(tuple);
+
+ return (char *) result;
+}
+
+
+static void *
+copytup_message(Tuplestorestate *state, void *tup)
+{
+ msg_data *m = (msg_data *) tup;
+ void *tuple;
+
+ tuple = palloc(m->msglen + sizeof(int));
+ *((int *) tuple) = m->msglen;
+ memcpy(((char *) tuple) + sizeof(int), m->msg, m->msglen);
+ USEMEM(state, GetMemoryChunkSpace(tuple));
+ return tuple;
+}
+
+
+static void
+writetup_message(Tuplestorestate *state, void *tup)
+{
+ void *tupbody = ((char *) tup) + sizeof(int);
+ unsigned int tupbodylen = *((int *) tup);
+ /* total on-disk footprint: */
+ unsigned int tuplen = tupbodylen + sizeof(tuplen);
+
+ if (BufFileWrite(state->myfile, &tuplen,
+ sizeof(tuplen)) != sizeof(tuplen))
+ elog(ERROR, "write failed");
+ if (BufFileWrite(state->myfile, tupbody, tupbodylen) != tupbodylen)
+ elog(ERROR, "write failed");
+ if (state->backward) /* need trailing length word? */
+ if (BufFileWrite(state->myfile, &tuplen,
+ sizeof(tuplen)) != sizeof(tuplen))
+ elog(ERROR, "write failed");
+
+ FREEMEM(state, GetMemoryChunkSpace(tup));
+ pfree(tup);
+}
+
+static void *
+readtup_message(Tuplestorestate *state, unsigned int len)
+{
+ void *tuple = palloc(len);
+ int tupbodylen = len - sizeof(int);
+ void *tupbody = ((char *) tuple) + sizeof(int);
+ *((int *) tuple) = tupbodylen;
+
+ USEMEM(state, GetMemoryChunkSpace(tuple));
+ /* read in the tuple proper */
+ if (BufFileRead(state->myfile, tupbody, tupbodylen) != tupbodylen)
+ elog(ERROR, "unexpected end of data");
+ if (state->backward) /* need to read trailing length word? */
+ if (BufFileRead(state->myfile, (void *) &len,
+ sizeof(len)) != sizeof(len))
+ elog(ERROR, "unexpected end of data");
+ return tuple;
+}
+#endif
+
+
+void
+tuplestore_collect_stat(Tuplestorestate *state, char *name)
+{
+ if (state->status != TSS_INMEM || state->memtupcount != 0)
+ {
+ elog(WARNING, "tuplestore %s is already in use, to late to get statistics",
+ name);
+ return;
+ }
+
+ state->stat_name = pstrdup(name);
+}
"default_text_search_config = 'pg_catalog.%s'",
escape_quotes(default_text_search_config));
conflines = replace_token(conflines,
- "#default_text_search_config = 'pg_catalog.simple'",
+ "#default_text_search_config = 'pg_catalog.simple'",
repltok);
+#ifdef PGXC
+ /* Add Postgres-XC node name to configuration file */
+ snprintf(repltok, sizeof(repltok),
+ "pgxc_node_name = '%s'",
+ escape_quotes(nodename));
+ conflines = replace_token(conflines, "#pgxc_node_name = ''", repltok);
+#endif
default_timezone = select_default_timezone(share_path);
if (default_timezone)
setup_depend(cmdfd);
+ /*
+ * Note that no objects created after setup_depend() will be "pinned".
+ * They are all droppable at the whim of the DBA.
+ */
+
setup_sysviews(cmdfd);
+#ifdef PGXC
+ /* Initialize catalog information about the node self */
+ setup_nodeself(cmdfd);
+#endif
setup_description(cmdfd);
setup_collation(cmdfd);
RestoreOptions *ropt; /* options, if restoring */
int verbose;
- char *remoteVersionStr; /* server's version string */
+ char *remoteVersionStr; /* server's version string */
int remoteVersion; /* same in numeric form */
bool isStandby; /* is server a standby node */
+ bool isPostgresXL; /* is server a Postgres-XL node */
- int minRemoteVersion; /* allowable range */
+ int minRemoteVersion; /* allowable range */
int maxRemoteVersion;
int numWorkers; /* number of parallel processes */
"d.classid = c.tableoid AND d.objid = c.oid AND "
"d.objsubid = 0 AND "
"d.refclassid = c.tableoid AND d.deptype = 'a') "
- "LEFT JOIN pg_class tc ON (c.reltoastrelid = tc.oid) "
- "WHERE c.relkind in ('%c', '%c', '%c', '%c', '%c', '%c') "
+ "LEFT JOIN pg_class tc ON (c.reltoastrelid = tc.oid) "
+ "WHERE c.relkind in ('%c', '%c', '%c', '%c', '%c', '%c') "
"ORDER BY c.oid",
username_subquery,
+ fout->isPostgresXL
+ ? "(SELECT pclocatortype from pgxc_class v where v.pcrelid = c.oid) AS pgxclocatortype,"
+ "(SELECT pcattnum from pgxc_class v where v.pcrelid = c.oid) AS pgxcattnum,"
+ "(SELECT string_agg(node_name,',') AS pgxc_node_names from pgxc_node n where n.oid in (select unnest(nodeoids) from pgxc_class v where v.pcrelid=c.oid) ) , "
+ : "",
RELKIND_SEQUENCE,
RELKIND_RELATION, RELKIND_SEQUENCE,
RELKIND_VIEW, RELKIND_COMPOSITE_TYPE,
"d.classid = c.tableoid AND d.objid = c.oid AND "
"d.objsubid = 0 AND "
"d.refclassid = c.tableoid AND d.deptype = 'a') "
- "LEFT JOIN pg_class tc ON (c.reltoastrelid = tc.oid) "
- "WHERE c.relkind in ('%c', '%c', '%c', '%c', '%c', '%c') "
+ "LEFT JOIN pg_class tc ON (c.reltoastrelid = tc.oid) "
+ "WHERE c.relkind in ('%c', '%c', '%c', '%c', '%c', '%c') "
"ORDER BY c.oid",
username_subquery,
+ fout->isPostgresXL
+ ? "(SELECT pclocatortype from pgxc_class v where v.pcrelid = c.oid) AS pgxclocatortype,"
+ "(SELECT pcattnum from pgxc_class v where v.pcrelid = c.oid) AS pgxcattnum,"
+ "(SELECT string_agg(node_name,',') AS pgxc_node_names from pgxc_node n where n.oid in (select unnest(nodeoids) from pgxc_class v where v.pcrelid=c.oid) ) , "
+ : "",
RELKIND_SEQUENCE,
RELKIND_RELATION, RELKIND_SEQUENCE,
RELKIND_VIEW, RELKIND_COMPOSITE_TYPE,
"d.classid = c.tableoid AND d.objid = c.oid AND "
"d.objsubid = 0 AND "
"d.refclassid = c.tableoid AND d.deptype = 'a') "
- "LEFT JOIN pg_class tc ON (c.reltoastrelid = tc.oid) "
- "WHERE c.relkind in ('%c', '%c', '%c', '%c', '%c', '%c') "
+ "LEFT JOIN pg_class tc ON (c.reltoastrelid = tc.oid) "
+ "WHERE c.relkind in ('%c', '%c', '%c', '%c', '%c', '%c') "
"ORDER BY c.oid",
username_subquery,
+ fout->isPostgresXL
+ ? "(SELECT pclocatortype from pgxc_class v where v.pcrelid = c.oid) AS pgxclocatortype,"
+ "(SELECT pcattnum from pgxc_class v where v.pcrelid = c.oid) AS pgxcattnum,"
+ "(SELECT string_agg(node_name,',') AS pgxc_node_names from pgxc_node n where n.oid in (select unnest(nodeoids) from pgxc_class v where v.pcrelid=c.oid) ) , "
+ : "",
RELKIND_SEQUENCE,
RELKIND_RELATION, RELKIND_SEQUENCE,
RELKIND_VIEW, RELKIND_COMPOSITE_TYPE,
"d.classid = c.tableoid AND d.objid = c.oid AND "
"d.objsubid = 0 AND "
"d.refclassid = c.tableoid AND d.deptype = 'a') "
- "LEFT JOIN pg_class tc ON (c.reltoastrelid = tc.oid) "
- "WHERE c.relkind in ('%c', '%c', '%c', '%c', '%c', '%c') "
+ "LEFT JOIN pg_class tc ON (c.reltoastrelid = tc.oid) "
+ "WHERE c.relkind in ('%c', '%c', '%c', '%c', '%c', '%c') "
"ORDER BY c.oid",
username_subquery,
+ fout->isPostgresXL
+ ? "(SELECT pclocatortype from pgxc_class v where v.pcrelid = c.oid) AS pgxclocatortype,"
+ "(SELECT pcattnum from pgxc_class v where v.pcrelid = c.oid) AS pgxcattnum,"
+ "(SELECT string_agg(node_name,',') AS pgxc_node_names from pgxc_node n where n.oid in (select unnest(nodeoids) from pgxc_class v where v.pcrelid=c.oid) ) , "
+ : "",
RELKIND_SEQUENCE,
RELKIND_RELATION, RELKIND_SEQUENCE,
RELKIND_VIEW, RELKIND_COMPOSITE_TYPE,
"\nInitialization options:\n"
" -i, --initialize invokes initialization mode\n"
" -F, --fillfactor=NUM set fill factor\n"
- " -n, --no-vacuum do not run VACUUM after initialization\n"
- " -q, --quiet quiet logging (one message each 5 seconds)\n"
+#ifdef PGXC
+ " -k distribute by primary key branch id - bid\n"
+#endif
+ " -n, --no-vacuum do not run VACUUM after initialization\n"
+ " -q, --quiet quiet logging (one message each 5 seconds)\n"
" -s, --scale=NUM scaling factor\n"
" --foreign-keys create foreign key constraints between tables\n"
" --index-tablespace=TABLESPACE\n"
" -c, --client=NUM number of concurrent database clients (default: 1)\n"
" -C, --connect establish new connection for each transaction\n"
" -D, --define=VARNAME=VALUE\n"
- " define variable for use by custom script\n"
+ " define variable for use by custom script\n"
+#ifdef PGXC
+ " -k query with default key and additional key branch id (bid)\n"
+#endif
" -j, --jobs=NUM number of threads (default: 1)\n"
" -l, --log write transaction times to log file\n"
" -L, --latency-limit=NUM count transactions lasting more than NUM ms as late\n"
bool bucket_has_garbage,
IndexBulkDeleteCallback callback, void *callback_state);
- #endif /* HASH_H */
+#ifdef PGXC
+extern Datum compute_hash(Oid type, Datum value, char locator);
+extern char *get_compute_hash_function(Oid type, char locator);
+#endif
+
+ #endif /* HASH_H */
#define MaxTransactionIdAttributeNumber (-5)
#define MaxCommandIdAttributeNumber (-6)
#define TableOidAttributeNumber (-7)
+#ifdef PGXC
+#define XC_NodeIdAttributeNumber (-8)
+#define FirstLowInvalidHeapAttributeNumber (-9)
+#else
#define FirstLowInvalidHeapAttributeNumber (-8)
+#endif
+
- #endif /* SYSATTR_H */
+ #endif /* SYSATTR_H */
} SubXactEvent;
typedef void (*SubXactCallback) (SubXactEvent event, SubTransactionId mySubid,
- SubTransactionId parentSubid, void *arg);
+ SubTransactionId parentSubid, void *arg);
+#ifdef PGXC
+/*
+ * GTM callback events
+ */
+typedef enum
+{
+ GTM_EVENT_COMMIT,
+ GTM_EVENT_ABORT,
+ GTM_EVENT_PREPARE
+} GTMEvent;
+
+typedef void (*GTMCallback) (GTMEvent event, void *arg);
+#endif
/* ----------------
* transaction-related XLOG entries
DESCR("");
- #define RELKIND_RELATION 'r' /* ordinary table */
- #define RELKIND_INDEX 'i' /* secondary index */
- #define RELKIND_SEQUENCE 'S' /* sequence object */
- #define RELKIND_TOASTVALUE 't' /* for out-of-line values */
- #define RELKIND_VIEW 'v' /* view */
- #define RELKIND_MATVIEW 'm' /* materialized view */
- #define RELKIND_COMPOSITE_TYPE 'c' /* composite type */
- #define RELKIND_FOREIGN_TABLE 'f' /* foreign table */
- #define RELKIND_PARTITIONED_TABLE 'p' /* partitioned table */
-
- #define RELPERSISTENCE_PERMANENT 'p' /* regular table */
- #define RELPERSISTENCE_UNLOGGED 'u' /* unlogged permanent table */
- #define RELPERSISTENCE_TEMP 't' /* temporary table */
+ #define RELKIND_RELATION 'r' /* ordinary table */
+ #define RELKIND_INDEX 'i' /* secondary index */
+ #define RELKIND_SEQUENCE 'S' /* sequence object */
+ #define RELKIND_TOASTVALUE 't' /* for out-of-line values */
+ #define RELKIND_VIEW 'v' /* view */
+ #define RELKIND_MATVIEW 'm' /* materialized view */
+ #define RELKIND_COMPOSITE_TYPE 'c' /* composite type */
+ #define RELKIND_FOREIGN_TABLE 'f' /* foreign table */
+ #define RELKIND_PARTITIONED_TABLE 'p' /* partitioned table */
+
+ #define RELPERSISTENCE_PERMANENT 'p' /* regular table */
+ #define RELPERSISTENCE_UNLOGGED 'u' /* unlogged permanent table */
+ #define RELPERSISTENCE_TEMP 't' /* temporary table */
+#ifdef PGXC
+#define RELPERSISTENCE_LOCAL_TEMP 'l' /* local temp table */
+#endif
+
/* default selection for replica identity (primary key or nothing) */
#define REPLICA_IDENTITY_DEFAULT 'd'
/* no replica identity is logged for this relation */
DATA(insert OID = 6014 ( pg_show_replication_origin_status PGNSP PGUID 12 1 100 0 0 f f f f f t v r 0 0 2249 "" "{26,25,3220,3220}" "{o,o,o,o}" "{local_id, external_id, remote_lsn, local_lsn}" _null_ _null_ pg_show_replication_origin_status _null_ _null_ _null_ ));
DESCR("get progress for all replication origins");
+#ifdef USE_MODULE_MSGIDS
+DATA(insert OID = 6015 ( pg_msgmodule_set PGNSP PGUID 12 1 1 0 0 f f f f t t i s 4 0 16 "20 20 20 25" _null_ _null_ _null_ _null_ _null_ pg_msgmodule_set _null_ _null_ _null_ ));
+DESCR("set debugging level for module/file/msg");
+DATA(insert OID = 6016 ( pg_msgmodule_change PGNSP PGUID 12 1 1 0 0 f f f f t t i s 4 0 16 "20 20 20 20" _null_ _null_ _null_ _null_ _null_ pg_msgmodule_change _null_ _null_ _null_ ));
+DESCR("change debugging level for module/file/msg");
+DATA(insert OID = 6017 ( pg_msgmodule_enable PGNSP PGUID 12 1 1 0 0 f f f f t t i s 1 0 16 "20" _null_ _null_ _null_ _null_ _null_ pg_msgmodule_enable _null_ _null_ _null_ ));
+DESCR("pid to honour overriden log levels");
+DATA(insert OID = 6018 ( pg_msgmodule_disable PGNSP PGUID 12 1 1 0 0 f f f f t t i s 1 0 16 "20" _null_ _null_ _null_ _null_ _null_ pg_msgmodule_disable _null_ _null_ _null_ ));
+DESCR("pid to ignore overriden log levels");
+DATA(insert OID = 6019 ( pg_msgmodule_enable_all PGNSP PGUID 12 1 1 0 0 f f f f t t i s 1 0 16 "16" _null_ _null_ _null_ _null_ _null_ pg_msgmodule_enable_all _null_ _null_ _null_ ));
+DESCR("all current/future processes to honour overriden log levels");
+DATA(insert OID = 6020 ( pg_msgmodule_disable_all PGNSP PGUID 12 1 1 0 0 f f f f t t i s 0 0 16 "" _null_ _null_ _null_ _null_ _null_ pg_msgmodule_disable_all _null_ _null_ _null_ ));
+DESCR("all processes to ignore overriden log levels");
+#endif
/* publications */
- DATA(insert OID = 6119 ( pg_get_publication_tables PGNSP PGUID 12 1 1000 0 0 f f t f t t s s 1 0 26 "25" "{25,26}" "{i,o}" "{pubname,relid}" _null_ _null_ pg_get_publication_tables _null_ _null_ _null_ ));
+ DATA(insert OID = 6119 ( pg_get_publication_tables PGNSP PGUID 12 1 1000 0 0 f f f f t t s s 1 0 26 "25" "{25,26}" "{i,o}" "{pubname,relid}" _null_ _null_ pg_get_publication_tables _null_ _null_ _null_ ));
DESCR("get OIDs of tables in a publication");
+ DATA(insert OID = 6121 ( pg_relation_is_publishable PGNSP PGUID 12 1 0 0 0 f f f f t f s s 1 0 16 "2205" _null_ _null_ _null_ _null_ _null_ pg_relation_is_publishable _null_ _null_ _null_ ));
+ DESCR("returns whether a relation can be part of a publication");
/* rls */
DATA(insert OID = 3298 ( row_security_active PGNSP PGUID 12 1 0 0 0 f f f f t f s s 1 0 16 "26" _null_ _null_ _null_ _null_ _null_ row_security_active _null_ _null_ _null_ ));
extern void check_encoding_locale_matches(int encoding, const char *collate, const char *ctype);
- #endif /* DBCOMMANDS_H */
+#ifdef PGXC
+extern bool IsSetTableSpace(AlterDatabaseStmt *stmt);
+#endif
+
+ #endif /* DBCOMMANDS_H */
{
/* dynahash.c requires key to be first field */
char stmt_name[NAMEDATALEN];
- CachedPlanSource *plansource; /* the actual cached plan */
+ CachedPlanSource *plansource; /* the actual cached plan */
bool from_sql; /* prepared via SQL, not FE/BE protocol? */
+#ifdef XCP
+ bool use_resowner; /* does it use resowner for tracking? */
+#endif
TimestampTz prepare_time; /* the time when the stmt was prepared */
} PreparedStatement;
extern void DropAllPreparedStatements(void);
- #endif /* PREPARE_H */
+#ifdef PGXC
+extern DatanodeStatement *FetchDatanodeStatement(const char *stmt_name, bool throwError);
+extern bool ActivateDatanodeStatementOnNode(const char *stmt_name, int noid);
+extern bool HaveActiveDatanodeStatements(void);
+extern void DropDatanodeStatement(const char *stmt_name);
+extern int SetRemoteStatementName(Plan *plan, const char *stmt_name, int num_params,
+ Oid *param_types, int n);
+#endif
+
+ #endif /* PREPARE_H */
extern const char *seq_identify(uint8 info);
extern void seq_mask(char *pagedata, BlockNumber blkno);
- #endif /* SEQUENCE_H */
+#ifdef XCP
+#define DEFAULT_CACHEVAL 1
+extern int SequenceRangeVal;
+#endif
+#ifdef PGXC
+/*
+ * List of actions that registered the callback.
+ * This is listed here and not in sequence.c because callback can also
+ * be registered in dependency.c and tablecmds.c as sequences can be dropped
+ * or renamed in cascade.
+ */
+typedef enum
+{
+ GTM_CREATE_SEQ,
+ GTM_DROP_SEQ
+} GTM_SequenceDropType;
+
+extern bool IsTempSequence(Oid relid);
+extern char *GetGlobalSeqName(Relation rel, const char *new_seqname, const char *new_schemaname);
+#endif
+
+ #endif /* SEQUENCE_H */
/* First argument is a RelFileNode */
#define relpathperm(rnode, forknum) \
relpathbackend(rnode, InvalidBackendId, forknum)
+#ifdef XCP
+#define relpathperm_client(rnode, forknum, nodename) \
+ relpathbackend_client(rnode, InvalidBackendId, forknum, nodename)
+#endif
/* First argument is a RelFileNodeBackend */
+#ifdef XCP
+#define relpath(rnode, forknum) \
+ relpathbackend((rnode).node, InvalidBackendId, forknum)
+#else
#define relpath(rnode, forknum) \
relpathbackend((rnode).node, (rnode).backend, forknum)
-
+#endif
- #endif /* RELPATH_H */
+ #endif /* RELPATH_H */
EState *estate; /* executor's query-wide state */
PlanState *planstate; /* tree of per-plan-node state */
+#ifdef XCP
+ SharedQueue squeue; /* the shared memory queue to sent data to other
+ * nodes */
+ int myindex; /* -1 if locally executed subplan is producing
+ * data and distribute via squeue. Otherwise
+ * get local data from squeue */
+#endif
/* This field is set by ExecutorRun */
- bool already_executed; /* true if previously executed */
+ bool already_executed; /* true if previously executed */
/* This is always set NULL by the core system, but plugins can change it */
struct Instrumentation *totaltime; /* total time spent in ExecutorRun */
extern void AtEOXact_SPI(bool isCommit);
extern void AtEOSubXact_SPI(bool isCommit, SubTransactionId mySubid);
- #endif /* SPI_H */
+#ifdef PGXC
+extern int SPI_execute_direct(const char *src, char *nodename);
+#endif
+
+ #endif /* SPI_H */
NodeTag type;
bool tts_isempty; /* true = slot is empty */
bool tts_shouldFree; /* should pfree tts_tuple? */
- bool tts_shouldFreeMin; /* should pfree tts_mintuple? */
+ bool tts_shouldFreeMin; /* should pfree tts_mintuple? */
bool tts_slow; /* saved state for slot_deform_tuple */
HeapTuple tts_tuple; /* physical tuple, or NULL if virtual */
+#ifdef PGXC
+ RemoteDataRow tts_datarow; /* Tuple data in DataRow format */
+ MemoryContext tts_drowcxt; /* Context to store deformed */
+ bool tts_shouldFreeRow; /* should pfree tts_dataRow? */
+ struct AttInMetadata *tts_attinmeta; /* store here info to extract values from the DataRow */
+#endif
TupleDesc tts_tupleDescriptor; /* slot's tuple descriptor */
MemoryContext tts_mcxt; /* slot itself is in this context */
Buffer tts_buffer; /* tuple's buffer, or InvalidBuffer */
/* support for hashtables using Bitmapsets as keys: */
extern uint32 bms_hash_value(const Bitmapset *a);
- #endif /* BITMAPSET_H */
+#ifdef XCP
+extern int bms_any_member(Bitmapset *a);
+#endif
+
+ #endif /* BITMAPSET_H */
bool canSetTag; /* do we set the command tag/es_processed? */
bool mt_done; /* are we done? */
PlanState **mt_plans; /* subplans (one per target rel) */
+#ifdef PGXC
+ PlanState **mt_remoterels; /* per-target remote query node */
+#endif
int mt_nplans; /* number of plans in the array */
int mt_whichplan; /* which one is being executed (0..n-1) */
- ResultRelInfo *resultRelInfo; /* per-subplan target relations */
+ ResultRelInfo *resultRelInfo; /* per-subplan target relations */
ResultRelInfo *rootResultRelInfo; /* root target relation (partitioned
* table root) */
List **mt_arowmarks; /* per-subplan ExecAuxRowMark lists */
VACOPT_FULL = 1 << 4, /* FULL (non-concurrent) vacuum */
VACOPT_NOWAIT = 1 << 5, /* don't wait to get lock (autovacuum only) */
VACOPT_SKIPTOAST = 1 << 6, /* don't process the TOAST table, if any */
- VACOPT_DISABLE_PAGE_SKIPPING = 1 << 7, /* don't skip any pages */
- VACOPT_DISABLE_PAGE_SKIPPING = 1 << 7 /* don't skip any pages */
++ VACOPT_DISABLE_PAGE_SKIPPING = 1 << 7, /* don't skip any pages */
+ VACOPT_COORDINATOR = 1 << 8 /* don't trigger analyze on the datanodes, but
+ * just collect existing info and populate
+ * coordinator side stats.
+ */
} VacuumOption;
typedef struct VacuumStmt
Path *outerjoinpath; /* path for the outer side of the join */
Path *innerjoinpath; /* path for the inner side of the join */
- List *joinrestrictinfo; /* RestrictInfos to apply to join */
+ List *joinrestrictinfo; /* RestrictInfos to apply to join */
+#ifdef XCP
+ List *movedrestrictinfo; /* RestrictInfos moved down to inner path */
+#endif
+
/*
* See the notes for RelOptInfo and ParamPathInfo to understand why
* joinrestrictinfo is needed in JoinPath, and can't be merged into the
LockClauseStrength strength,
LockWaitPolicy waitPolicy, bool pushedDown);
- #endif /* ANALYZE_H */
+#ifdef XCP
+extern void ParseAnalyze_callback(ParseState *pstate, Query *query);
+extern post_parse_analyze_hook_type prev_ParseAnalyze_callback;
+#endif
+ #endif /* ANALYZE_H */
extern void check_srf_call_placement(ParseState *pstate, Node *last_srf,
int location);
- #endif /* PARSE_FUNC_H */
+extern void check_pg_get_expr_args(ParseState *pstate, Oid fnoid, List *args);
+ #endif /* PARSE_FUNC_H */
extern Oid attnumCollationId(Relation rd, int attid);
extern bool isQueryUsingTempRelation(Query *query);
- #endif /* PARSE_RELATION_H */
+#ifdef PGXC
+extern int specialAttNum(const char *attname);
+#endif
+
+ #endif /* PARSE_RELATION_H */
*/
extern void ExceptionalCondition(const char *conditionName,
const char *errorType,
- const char *fileName, int lineNumber) pg_attribute_noreturn();
-
- //#define PGXC_COORD // for PGXC coordinator compiling
- //#define PGXC_DATANODE // for PGXC data node compiling
+ const char *fileName, int lineNumber) pg_attribute_noreturn();
- #endif /* POSTGRES_H */
+
+extern void ResetUsageCommon(struct rusage *save_r, struct timeval *save_t);
+extern void ResetUsage(void);
+extern void ShowUsageCommon(const char *title, struct rusage *save_r, struct
+ timeval *save_t);
+
+ #endif /* POSTGRES_H */
bool include_triggers,
Bitmapset *include_cols);
- #endif /* REWRITEHANDLER_H */
+#ifdef PGXC
+extern List *QueryRewriteCTAS(Query *parsetree);
+#endif
+
+ #endif /* REWRITEHANDLER_H */
#define InvalidBackendId (-1)
- extern PGDLLIMPORT BackendId MyBackendId; /* backend id of this backend */
+ extern PGDLLIMPORT BackendId MyBackendId; /* backend id of this backend */
+#ifdef XCP
+/*
+ * Two next variables make up distributed session id. Actual distributed
+ * session id is a string, which includes coordinator node name, but
+ * it is better to use Oid to store and compare with distributed session ids
+ * of other backends under the same postmaster.
+ */
+extern PGDLLIMPORT Oid MyCoordId;
+extern PGDLLIMPORT char MyCoordName[NAMEDATALEN];
+
+extern PGDLLIMPORT int MyCoordPid;
+extern PGDLLIMPORT LocalTransactionId MyCoordLxid;
+
+/* BackendId of the first backend of the distributed session on the node */
+extern PGDLLIMPORT BackendId MyFirstBackendId;
+#endif
/* backend id of our parallel session leader, or InvalidBackendId if none */
extern PGDLLIMPORT BackendId ParallelMasterBackendId;
BackendId backendId; /* This backend's backend ID (if assigned) */
Oid databaseId; /* OID of database this backend is using */
Oid roleId; /* OID of role using this backend */
+#ifdef XCP
+ Oid coordId; /* Oid of originating coordinator */
+ int coordPid; /* Pid of the originating session */
+ BackendId firstBackendId; /* Backend ID of the first backend of
+ * the distributed session */
+#endif
- bool isBackgroundWorker; /* true if background worker. */
+ bool isBackgroundWorker; /* true if background worker. */
/*
* While in hot standby mode, shows that a conflict signal has been sent
extern void XidCacheRemoveRunningXids(TransactionId xid,
int nxids, const TransactionId *xids,
TransactionId latestXid);
+#ifdef XCP
+extern void GetGlobalSessionInfo(int pid, Oid *coordId, int *coordPid);
+extern int GetFirstBackendId(int *numBackends, int *backends);
+#endif /* XCP */
extern void ProcArraySetReplicationSlotXmin(TransactionId xmin,
- TransactionId catalog_xmin, bool already_locked);
+ TransactionId catalog_xmin, bool already_locked);
extern void ProcArrayGetReplicationSlotXmin(TransactionId *xmin,
TransactionId *catalog_xmin);
{
PROCSIG_CATCHUP_INTERRUPT, /* sinval catchup interrupt */
PROCSIG_NOTIFY_INTERRUPT, /* listen/notify interrupt */
+#ifdef PGXC
+ PROCSIG_PGXCPOOL_RELOAD, /* abort current transaction and reconnect to pooler */
+ PROCSIG_PGXCPOOL_REFRESH, /* refresh local view of connection handles */
+#endif
PROCSIG_PARALLEL_MESSAGE, /* message from cooperating parallel backend */
- PROCSIG_WALSND_INIT_STOPPING, /* ask walsenders to prepare for
- * shutdown */
+ PROCSIG_WALSND_INIT_STOPPING, /* ask walsenders to prepare for shutdown */
/* Recovery conflict reasons */
PROCSIG_RECOVERY_CONFLICT_DATABASE,
long count,
DestReceiver *dest);
- #endif /* PQUERY_H */
+#ifdef XCP
+extern int AdvanceProducingPortal(Portal portal, bool can_wait);
+extern void cleanupClosedProducers(void);
+#endif
+
+ #endif /* PQUERY_H */
/* Hook for plugins to get control in ProcessUtility() */
typedef void (*ProcessUtility_hook_type) (PlannedStmt *pstmt,
- const char *queryString, ProcessUtilityContext context,
- ParamListInfo params,
- QueryEnvironment *queryEnv,
- DestReceiver *dest,
- bool sentToRemote,
- char *completionTag);
+ const char *queryString, ProcessUtilityContext context,
+ ParamListInfo params,
+ QueryEnvironment *queryEnv,
- DestReceiver *dest, char *completionTag);
++ DestReceiver *dest,
++ bool sentToRemote,
++ char *completionTag);
extern PGDLLIMPORT ProcessUtility_hook_type ProcessUtility_hook;
extern void ProcessUtility(PlannedStmt *pstmt, const char *queryString,
extern bool CommandIsReadOnly(PlannedStmt *pstmt);
- #endif /* UTILITY_H */
+#ifdef PGXC
+extern bool pgxc_lock_for_utility_stmt(Node *parsetree);
+#endif
+
+ #endif /* UTILITY_H */
/* quote.c */
extern char *quote_literal_cstr(const char *rawstr);
- #endif /* BUILTINS_H */
+#ifdef PGXC
+/* backend/pgxc/pool/poolutils.c */
+extern Datum pgxc_pool_check(PG_FUNCTION_ARGS);
+extern Datum pgxc_pool_reload(PG_FUNCTION_ARGS);
+
+/* backend/access/transam/transam.c */
+extern Datum pgxc_is_committed(PG_FUNCTION_ARGS);
+extern Datum pgxc_is_inprogress(PG_FUNCTION_ARGS);
+#endif
+extern Datum pg_msgmodule_set(PG_FUNCTION_ARGS);
+extern Datum pg_msgmodule_change(PG_FUNCTION_ARGS);
+extern Datum pg_msgmodule_enable(PG_FUNCTION_ARGS);
+extern Datum pg_msgmodule_disable(PG_FUNCTION_ARGS);
+extern Datum pg_msgmodule_enable_all(PG_FUNCTION_ARGS);
+extern Datum pg_msgmodule_disable_all(PG_FUNCTION_ARGS);
+ #endif /* BUILTINS_H */
if (elevel_ >= ERROR) \
pg_unreachable(); \
} while(0)
- #endif /* HAVE__BUILTIN_CONSTANT_P */
+ #endif /* HAVE__BUILTIN_CONSTANT_P */
+#endif
#define ereport(elevel, rest) \
ereport_domain(elevel, TEXTDOMAIN, rest)
pg_unreachable(); \
} \
} while(0)
- #endif /* HAVE__BUILTIN_CONSTANT_P */
+ #endif /* HAVE__BUILTIN_CONSTANT_P */
#else /* !HAVE__VA_ARGS */
#define elog \
- elog_start(__FILE__, __LINE__, PG_FUNCNAME_MACRO), \
+ do { \
+ elog_start(__FILE__, __LINE__, PG_FUNCNAME_MACRO); \
+ } while (0); \
elog_finish
- #endif /* HAVE__VA_ARGS */
+ #endif /* HAVE__VA_ARGS */
+#endif
-extern void elog_start(const char *filename, int lineno, const char *funcname);
+extern void elog_start(const char *filename, int lineno,
+#ifdef USE_MODULE_MSGIDS
+ int moduleid, int flieid, int msgid,
+#endif
+ const char *funcname
+ );
extern void elog_finish(int elevel, const char *fmt,...) pg_attribute_printf(2, 3);
/* in access/transam/xlog.c */
extern bool check_wal_buffers(int *newval, void **extra, GucSource source);
extern void assign_xlog_sync_method(int new_sync_method, void *extra);
+extern const char *quote_guc_value(const char *value, int flags);
- #endif /* GUC_H */
+ #endif /* GUC_H */
bool is_valid; /* is the query_list currently valid? */
int generation; /* increments each time we create a plan */
/* If CachedPlanSource has been saved, it is a member of a global list */
- struct CachedPlanSource *next_saved; /* list link, if so */
+ struct CachedPlanSource *next_saved; /* list link, if so */
/* State kept to help decide whether to use custom or generic plans: */
double generic_cost; /* cost of generic plan, or -1 if not known */
- double total_custom_cost; /* total cost of custom plans so far */
- int num_custom_plans; /* number of plans included in total */
+ double total_custom_cost; /* total cost of custom plans so far */
+ int num_custom_plans; /* number of plans included in total */
+#ifdef PGXC
+ char *stmt_name; /* If set, this is a copy of prepared stmt name */
+#endif
} CachedPlanSource;
/*
bool useResOwner,
QueryEnvironment *queryEnv);
extern void ReleaseCachedPlan(CachedPlan *plan, bool useResOwner);
+#ifdef XCP
+extern void SetRemoteSubplan(CachedPlanSource *plansource,
+ const char *plan_string);
+#endif
- #endif /* PLANCACHE_H */
+ #endif /* PLANCACHE_H */
extern PlannedStmt *PortalGetPrimaryStmt(Portal portal);
extern void PortalCreateHoldStore(Portal portal);
extern void PortalHashTableDeleteAll(void);
+#ifdef XCP
+extern void PortalCreateProducerStore(Portal portal);
+extern List *getProducingPortals(void);
+extern void addProducingPortal(Portal portal);
+extern void removeProducingPortal(Portal portal);
+extern bool portalIsProducing(Portal portal);
+#endif
extern bool ThereAreNoReadyPortals(void);
- #endif /* PORTAL_H */
+ #endif /* PORTAL_H */
Oid rd_toastoid; /* Real TOAST table's OID, or InvalidOid */
/* use "struct" here to avoid needing to include pgstat.h: */
- struct PgStat_TableStatus *pgstat_info; /* statistics collection area */
+ struct PgStat_TableStatus *pgstat_info; /* statistics collection area */
+#ifdef PGXC
+ RelationLocInfo *rd_locator_info;
+#endif
} RelationData;
extern void ResourceOwnerForgetDSM(ResourceOwner owner,
dsm_segment *);
- #endif /* RESOWNER_PRIVATE_H */
+#ifdef XCP
+/* support for prepared statement management */
+extern void ResourceOwnerEnlargePreparedStmts(ResourceOwner owner);
+extern void ResourceOwnerRememberPreparedStmt(ResourceOwner owner,
+ char *stmt);
+extern void ResourceOwnerForgetPreparedStmt(ResourceOwner owner,
+ char *stmt);
+#endif
+
+ #endif /* RESOWNER_PRIVATE_H */
extern void tuplestore_end(Tuplestorestate *state);
- #endif /* TUPLESTORE_H */
+#ifdef XCP
+extern Tuplestorestate *tuplestore_begin_datarow(bool interXact, int maxKBytes,
+ MemoryContext tmpcxt);
+extern Tuplestorestate *tuplestore_begin_message(bool interXact, int maxKBytes);
+extern void tuplestore_putmessage(Tuplestorestate *state, int len, char* msg);
+extern char *tuplestore_getmessage(Tuplestorestate *state, int *len);
+#endif
+
+extern void tuplestore_collect_stat(Tuplestorestate *state, char *name);
+
+ #endif /* TUPLESTORE_H */
LINE 1: ...xx1 using lateral (select * from int4_tbl where f1 = x1) ss;
^
HINT: There is an entry for table "xx1", but it cannot be referenced from this part of the query.
+-- demonstrate problem with extrememly slow join
+CREATE TABLE testr (a int, b int) DISTRIBUTE BY REPLICATION;
+INSERT INTO testr SELECT generate_series(1, 10000), generate_series(5001, 15000);
+CREATE TABLE testh (a int, b int);
+INSERT INTO testh SELECT generate_series(1, 10000), generate_series(8001, 18000);
+set enable_mergejoin TO false;
+set enable_hashjoin TO false;
+EXPLAIN (VERBOSE, COSTS OFF) SELECT count(*) FROM testr WHERE NOT EXISTS (SELECT * FROM testh WHERE testr.b = testh.b);
+ QUERY PLAN
+-----------------------------------------------------------------------------------
+ Finalize Aggregate
+ Output: count(*)
+ -> Remote Subquery Scan on all (datanode_1,datanode_2)
+ Output: PARTIAL count(*)
+ -> Partial Aggregate
+ Output: PARTIAL count(*)
+ -> Nested Loop Anti Join
+ Join Filter: (testr.b = testh.b)
+ -> Remote Subquery Scan on all (datanode_1)
+ Output: testr.b
+ Distribute results by H: b
+ -> Seq Scan on public.testr
+ Output: testr.b
+ -> Materialize
+ Output: testh.b
+ -> Remote Subquery Scan on all (datanode_1,datanode_2)
+ Output: testh.b
+ Distribute results by H: b
+ -> Seq Scan on public.testh
+ Output: testh.b
+(20 rows)
+
+SELECT count(*) FROM testr WHERE NOT EXISTS (SELECT * FROM testh WHERE testr.b = testh.b);
+ count
+-------
+ 3000
+(1 row)
+
+
+ --
+ -- test that foreign key join estimation performs sanely for outer joins
+ --
+ begin;
+ create table fkest (a int, b int, c int unique, primary key(a,b));
+ create table fkest1 (a int, b int, primary key(a,b));
+ insert into fkest select x/10, x%10, x from generate_series(1,1000) x;
+ insert into fkest1 select x/10, x%10 from generate_series(1,1000) x;
+ alter table fkest1
+ add constraint fkest1_a_b_fkey foreign key (a,b) references fkest;
+ analyze fkest;
+ analyze fkest1;
+ explain (costs off)
+ select *
+ from fkest f
+ left join fkest1 f1 on f.a = f1.a and f.b = f1.b
+ left join fkest1 f2 on f.a = f2.a and f.b = f2.b
+ left join fkest1 f3 on f.a = f3.a and f.b = f3.b
+ where f.c = 1;
+ QUERY PLAN
+ ------------------------------------------------------------------
+ Nested Loop Left Join
+ -> Nested Loop Left Join
+ -> Nested Loop Left Join
+ -> Index Scan using fkest_c_key on fkest f
+ Index Cond: (c = 1)
+ -> Index Only Scan using fkest1_pkey on fkest1 f1
+ Index Cond: ((a = f.a) AND (b = f.b))
+ -> Index Only Scan using fkest1_pkey on fkest1 f2
+ Index Cond: ((a = f.a) AND (b = f.b))
+ -> Index Only Scan using fkest1_pkey on fkest1 f3
+ Index Cond: ((a = f.a) AND (b = f.b))
+ (11 rows)
+
+ rollback;
--
-- test planner's ability to mark joins as unique
--
delete from xx1 using (select * from int4_tbl where f1 = xx1.x1) ss;
delete from xx1 using lateral (select * from int4_tbl where f1 = x1) ss;
+-- demonstrate problem with extrememly slow join
+CREATE TABLE testr (a int, b int) DISTRIBUTE BY REPLICATION;
+INSERT INTO testr SELECT generate_series(1, 10000), generate_series(5001, 15000);
+CREATE TABLE testh (a int, b int);
+INSERT INTO testh SELECT generate_series(1, 10000), generate_series(8001, 18000);
+set enable_mergejoin TO false;
+set enable_hashjoin TO false;
+EXPLAIN (VERBOSE, COSTS OFF) SELECT count(*) FROM testr WHERE NOT EXISTS (SELECT * FROM testh WHERE testr.b = testh.b);
+SELECT count(*) FROM testr WHERE NOT EXISTS (SELECT * FROM testh WHERE testr.b = testh.b);
+
+ --
+ -- test that foreign key join estimation performs sanely for outer joins
+ --
+
+ begin;
+
+ create table fkest (a int, b int, c int unique, primary key(a,b));
+ create table fkest1 (a int, b int, primary key(a,b));
+
+ insert into fkest select x/10, x%10, x from generate_series(1,1000) x;
+ insert into fkest1 select x/10, x%10 from generate_series(1,1000) x;
+
+ alter table fkest1
+ add constraint fkest1_a_b_fkey foreign key (a,b) references fkest;
+
+ analyze fkest;
+ analyze fkest1;
+
+ explain (costs off)
+ select *
+ from fkest f
+ left join fkest1 f1 on f.a = f1.a and f.b = f1.b
+ left join fkest1 f2 on f.a = f2.a and f.b = f2.b
+ left join fkest1 f3 on f.a = f3.a and f.b = f3.b
+ where f.c = 1;
+
+ rollback;
+
--
-- test planner's ability to mark joins as unique
--