@@ -108,6 +108,7 @@ PG_FUNCTION_INFO_V1(mtm_recover_node);
108108PG_FUNCTION_INFO_V1 (mtm_get_snapshot );
109109PG_FUNCTION_INFO_V1 (mtm_get_nodes_state );
110110PG_FUNCTION_INFO_V1 (mtm_get_cluster_state );
111+ PG_FUNCTION_INFO_V1 (mtm_get_cluster_info );
111112PG_FUNCTION_INFO_V1 (mtm_make_table_local );
112113PG_FUNCTION_INFO_V1 (mtm_dump_lock_graph );
113114
@@ -166,7 +167,8 @@ char const* const MtmNodeStatusMnem[] =
166167 "Connected" ,
167168 "Online" ,
168169 "Recovery" ,
169- "InMinor"
170+ "InMinor" ,
171+ "OutOfService"
170172};
171173
172174bool MtmDoReplication ;
@@ -1014,6 +1016,26 @@ void MtmAbortTransaction(MtmTransState* ts)
10141016 * -------------------------------------------
10151017 */
10161018
1019+ void MtmHandleApplyError (void )
1020+ {
1021+ ErrorData * edata = CopyErrorData ();
1022+ switch (edata -> sqlerrcode ) {
1023+ case ERRCODE_DISK_FULL :
1024+ case ERRCODE_INSUFFICIENT_RESOURCES :
1025+ case ERRCODE_IO_ERROR :
1026+ case ERRCODE_DATA_CORRUPTED :
1027+ case ERRCODE_INDEX_CORRUPTED :
1028+ case ERRCODE_SYSTEM_ERROR :
1029+ case ERRCODE_INTERNAL_ERROR :
1030+ case ERRCODE_OUT_OF_MEMORY :
1031+ elog (WARNING , "Node is excluded from cluster because of non-recoverable error %d" , edata -> sqlerrcode );
1032+ MtmSwitchClusterMode (MTM_OUT_OF_SERVICE );
1033+ kill (PostmasterPid , SIGQUIT );
1034+ break ;
1035+ }
1036+ }
1037+
1038+
10171039void MtmRecoveryCompleted (void )
10181040{
10191041 MTM_LOG1 ("Recovery of node %d is completed" , MtmNodeId );
@@ -1609,7 +1631,7 @@ _PG_init(void)
16091631 "Minamal amount of time (milliseconds) to wait 2PC confirmation from all nodes" ,
16101632 "Timeout for 2PC is calculated as MAX(prepare_time*2pc_prepare_ratio/100,2pc_min_timeout)" ,
16111633 & Mtm2PCMinTimeout ,
1612- 10000 ,
1634+ 100000 , /* 100 seconds */
16131635 0 ,
16141636 INT_MAX ,
16151637 PGC_BACKEND ,
@@ -1624,7 +1646,7 @@ _PG_init(void)
16241646 "Percent of prepare time for maximal time of second phase of two-pahse commit" ,
16251647 "Timeout for 2PC is calculated as MAX(prepare_time*2pc_prepare_ratio/100,2pc_min_timeout)" ,
16261648 & Mtm2PCPrepareRatio ,
1627- 100 ,
1649+ 1000 , /* 10 times */
16281650 0 ,
16291651 INT_MAX ,
16301652 PGC_BACKEND ,
@@ -2178,10 +2200,9 @@ mtm_get_snapshot(PG_FUNCTION_ARGS)
21782200typedef struct
21792201{
21802202 int nodeId ;
2181- char * connStrPtr ;
21822203 TupleDesc desc ;
2183- Datum values [8 ];
2184- bool nulls [8 ];
2204+ Datum values [Natts_mtm_nodes_state ];
2205+ bool nulls [Natts_mtm_nodes_state ];
21852206} MtmGetNodeStateCtx ;
21862207
21872208Datum
@@ -2190,7 +2211,6 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
21902211 FuncCallContext * funcctx ;
21912212 MtmGetNodeStateCtx * usrfctx ;
21922213 MemoryContext oldcontext ;
2193- char * p ;
21942214 int64 lag ;
21952215 bool is_first_call = SRF_IS_FIRSTCALL ();
21962216
@@ -2200,7 +2220,6 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
22002220 usrfctx = (MtmGetNodeStateCtx * )palloc (sizeof (MtmGetNodeStateCtx ));
22012221 get_call_result_type (fcinfo , NULL , & usrfctx -> desc );
22022222 usrfctx -> nodeId = 1 ;
2203- usrfctx -> connStrPtr = pstrdup (MtmConnStrs );
22042223 memset (usrfctx -> nulls , false, sizeof (usrfctx -> nulls ));
22052224 funcctx -> user_fctx = usrfctx ;
22062225 MemoryContextSwitchTo (oldcontext );
@@ -2219,23 +2238,19 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
22192238 usrfctx -> nulls [4 ] = lag < 0 ;
22202239 usrfctx -> values [5 ] = Int64GetDatum (Mtm -> transCount ? Mtm -> nodes [usrfctx -> nodeId - 1 ].transDelay /Mtm -> transCount : 0 );
22212240 usrfctx -> values [6 ] = TimestampTzGetDatum (time_t_to_timestamptz (Mtm -> nodes [usrfctx -> nodeId - 1 ].lastStatusChangeTime ));
2222- p = strchr (usrfctx -> connStrPtr , ',' );
2223- if (p != NULL ) {
2224- * p ++ = '\0' ;
2225- }
2226- usrfctx -> values [7 ] = CStringGetTextDatum (usrfctx -> connStrPtr );
2227- usrfctx -> connStrPtr = p ;
2241+ usrfctx -> values [7 ] = CStringGetTextDatum (Mtm -> nodes [usrfctx -> nodeId - 1 ].con .connStr );
22282242 usrfctx -> nodeId += 1 ;
22292243
22302244 SRF_RETURN_NEXT (funcctx , HeapTupleGetDatum (heap_form_tuple (usrfctx -> desc , usrfctx -> values , usrfctx -> nulls )));
22312245}
22322246
2247+
22332248Datum
22342249mtm_get_cluster_state (PG_FUNCTION_ARGS )
22352250{
22362251 TupleDesc desc ;
2237- Datum values [10 ];
2238- bool nulls [10 ] = {false};
2252+ Datum values [Natts_mtm_cluster_state ];
2253+ bool nulls [Natts_mtm_cluster_state ] = {false};
22392254 get_call_result_type (fcinfo , NULL , & desc );
22402255
22412256 values [0 ] = CStringGetTextDatum (MtmNodeStatusMnem [Mtm -> status ]);
@@ -2244,16 +2259,73 @@ mtm_get_cluster_state(PG_FUNCTION_ARGS)
22442259 values [3 ] = Int64GetDatum (Mtm -> nodeLockerMask );
22452260 values [4 ] = Int32GetDatum (Mtm -> nNodes );
22462261 values [5 ] = Int32GetDatum ((int )Mtm -> pool .active );
2247- values [6 ] = Int64GetDatum ( BgwPoolGetQueueSize ( & Mtm -> pool ) );
2248- values [7 ] = Int64GetDatum (Mtm -> transCount );
2249- values [8 ] = Int64GetDatum (Mtm -> timeShift );
2250- values [9 ] = Int32GetDatum (Mtm -> recoverySlot );
2251- nulls [ 9 ] = Mtm -> recoverySlot == 0 ;
2262+ values [6 ] = Int32GetDatum (( int ) Mtm -> pool . pending );
2263+ values [7 ] = Int64GetDatum (BgwPoolGetQueueSize ( & Mtm -> pool ) );
2264+ values [8 ] = Int64GetDatum (Mtm -> transCount );
2265+ values [9 ] = Int64GetDatum (Mtm -> timeShift );
2266+ values [ 10 ] = Int32GetDatum ( Mtm -> recoverySlot ) ;
22522267
22532268 PG_RETURN_DATUM (HeapTupleGetDatum (heap_form_tuple (desc , values , nulls )));
22542269}
22552270
22562271
/*
 * Cross-call state of the mtm_get_cluster_info() set-returning function.
 */
typedef struct MtmGetClusterInfoCtx
{
	int nodeId; /* 1-based id of the node to be queried by the next call */
} MtmGetClusterInfoCtx;
2276+
2277+
2278+ Datum
2279+ mtm_get_cluster_info (PG_FUNCTION_ARGS )
2280+ {
2281+
2282+ FuncCallContext * funcctx ;
2283+ MtmGetClusterInfoCtx * usrfctx ;
2284+ MemoryContext oldcontext ;
2285+ TupleDesc desc ;
2286+ bool is_first_call = SRF_IS_FIRSTCALL ();
2287+ int i ;
2288+ PGconn * conn ;
2289+ PGresult * result ;
2290+ char * values [Natts_mtm_cluster_state ];
2291+ HeapTuple tuple ;
2292+
2293+ if (is_first_call ) {
2294+ funcctx = SRF_FIRSTCALL_INIT ();
2295+ oldcontext = MemoryContextSwitchTo (funcctx -> multi_call_memory_ctx );
2296+ usrfctx = (MtmGetClusterInfoCtx * )palloc (sizeof (MtmGetNodeStateCtx ));
2297+ get_call_result_type (fcinfo , NULL , & desc );
2298+ funcctx -> attinmeta = TupleDescGetAttInMetadata (desc );
2299+ usrfctx -> nodeId = 1 ;
2300+ funcctx -> user_fctx = usrfctx ;
2301+ MemoryContextSwitchTo (oldcontext );
2302+ }
2303+ funcctx = SRF_PERCALL_SETUP ();
2304+ usrfctx = (MtmGetClusterInfoCtx * )funcctx -> user_fctx ;
2305+ if (usrfctx -> nodeId > MtmNodes ) {
2306+ SRF_RETURN_DONE (funcctx );
2307+ }
2308+ conn = PQconnectdb (Mtm -> nodes [usrfctx -> nodeId - 1 ].con .connStr );
2309+ if (PQstatus (conn ) != CONNECTION_OK ) {
2310+ elog (ERROR , "Failed to establish connection '%s' to node %d" , Mtm -> nodes [usrfctx -> nodeId - 1 ].con .connStr , usrfctx -> nodeId );
2311+ }
2312+ result = PQexec (conn , "select * from mtm.get_cluster_state()" );
2313+
2314+ if (PQresultStatus (result ) != PGRES_TUPLES_OK || PQntuples (result ) != 1 ) {
2315+ elog (ERROR , "Failed to receive data from %d" , usrfctx -> nodeId );
2316+ }
2317+
2318+ for (i = 0 ; i < Natts_mtm_cluster_state ; i ++ ) {
2319+ values [i ] = PQgetvalue (result , 0 , i );
2320+ }
2321+ tuple = BuildTupleFromCStrings (funcctx -> attinmeta , values );
2322+ PQclear (result );
2323+ PQfinish (conn );
2324+ usrfctx -> nodeId += 1 ;
2325+ SRF_RETURN_NEXT (funcctx , HeapTupleGetDatum (tuple ));
2326+ }
2327+
2328+
22572329Datum mtm_make_table_local (PG_FUNCTION_ARGS )
22582330{
22592331 Oid reloid = PG_GETARG_OID (1 );
0 commit comments