@@ -108,6 +108,7 @@ PG_FUNCTION_INFO_V1(mtm_recover_node);
108108PG_FUNCTION_INFO_V1 (mtm_get_snapshot );
109109PG_FUNCTION_INFO_V1 (mtm_get_nodes_state );
110110PG_FUNCTION_INFO_V1 (mtm_get_cluster_state );
111+ PG_FUNCTION_INFO_V1 (mtm_get_cluster_info );
111112PG_FUNCTION_INFO_V1 (mtm_make_table_local );
112113PG_FUNCTION_INFO_V1 (mtm_dump_lock_graph );
113114
@@ -1609,7 +1610,7 @@ _PG_init(void)
16091610 "Minamal amount of time (milliseconds) to wait 2PC confirmation from all nodes" ,
16101611 "Timeout for 2PC is calculated as MAX(prepare_time*2pc_prepare_ratio/100,2pc_min_timeout)" ,
16111612 & Mtm2PCMinTimeout ,
1612- 10000 ,
1613+ 100000 , /* 100 seconds */
16131614 0 ,
16141615 INT_MAX ,
16151616 PGC_BACKEND ,
@@ -1624,7 +1625,7 @@ _PG_init(void)
16241625 "Percent of prepare time for maximal time of second phase of two-pahse commit" ,
16251626 "Timeout for 2PC is calculated as MAX(prepare_time*2pc_prepare_ratio/100,2pc_min_timeout)" ,
16261627 & Mtm2PCPrepareRatio ,
1627- 100 ,
1628+ 1000 , /* 10 times */
16281629 0 ,
16291630 INT_MAX ,
16301631 PGC_BACKEND ,
@@ -2178,10 +2179,9 @@ mtm_get_snapshot(PG_FUNCTION_ARGS)
21782179typedef struct
21792180{
21802181 int nodeId ;
2181- char * connStrPtr ;
21822182 TupleDesc desc ;
2183- Datum values [8 ];
2184- bool nulls [8 ];
2183+ Datum values [Natts_mtm_nodes_state ];
2184+ bool nulls [Natts_mtm_nodes_state ];
21852185} MtmGetNodeStateCtx ;
21862186
21872187Datum
@@ -2190,7 +2190,6 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
21902190 FuncCallContext * funcctx ;
21912191 MtmGetNodeStateCtx * usrfctx ;
21922192 MemoryContext oldcontext ;
2193- char * p ;
21942193 int64 lag ;
21952194 bool is_first_call = SRF_IS_FIRSTCALL ();
21962195
@@ -2200,7 +2199,6 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
22002199 usrfctx = (MtmGetNodeStateCtx * )palloc (sizeof (MtmGetNodeStateCtx ));
22012200 get_call_result_type (fcinfo , NULL , & usrfctx -> desc );
22022201 usrfctx -> nodeId = 1 ;
2203- usrfctx -> connStrPtr = pstrdup (MtmConnStrs );
22042202 memset (usrfctx -> nulls , false, sizeof (usrfctx -> nulls ));
22052203 funcctx -> user_fctx = usrfctx ;
22062204 MemoryContextSwitchTo (oldcontext );
@@ -2219,23 +2217,19 @@ mtm_get_nodes_state(PG_FUNCTION_ARGS)
22192217 usrfctx -> nulls [4 ] = lag < 0 ;
22202218 usrfctx -> values [5 ] = Int64GetDatum (Mtm -> transCount ? Mtm -> nodes [usrfctx -> nodeId - 1 ].transDelay /Mtm -> transCount : 0 );
22212219 usrfctx -> values [6 ] = TimestampTzGetDatum (time_t_to_timestamptz (Mtm -> nodes [usrfctx -> nodeId - 1 ].lastStatusChangeTime ));
2222- p = strchr (usrfctx -> connStrPtr , ',' );
2223- if (p != NULL ) {
2224- * p ++ = '\0' ;
2225- }
2226- usrfctx -> values [7 ] = CStringGetTextDatum (usrfctx -> connStrPtr );
2227- usrfctx -> connStrPtr = p ;
2220+ usrfctx -> values [7 ] = CStringGetTextDatum (Mtm -> nodes [usrfctx -> nodeId - 1 ].con .connStr );
22282221 usrfctx -> nodeId += 1 ;
22292222
22302223 SRF_RETURN_NEXT (funcctx , HeapTupleGetDatum (heap_form_tuple (usrfctx -> desc , usrfctx -> values , usrfctx -> nulls )));
22312224}
22322225
2226+
22332227Datum
22342228mtm_get_cluster_state (PG_FUNCTION_ARGS )
22352229{
22362230 TupleDesc desc ;
2237- Datum values [10 ];
2238- bool nulls [10 ] = {false};
2231+ Datum values [Natts_mtm_cluster_state ];
2232+ bool nulls [Natts_mtm_cluster_state ] = {false};
22392233 get_call_result_type (fcinfo , NULL , & desc );
22402234
22412235 values [0 ] = CStringGetTextDatum (MtmNodeStatusMnem [Mtm -> status ]);
@@ -2244,16 +2238,73 @@ mtm_get_cluster_state(PG_FUNCTION_ARGS)
22442238 values [3 ] = Int64GetDatum (Mtm -> nodeLockerMask );
22452239 values [4 ] = Int32GetDatum (Mtm -> nNodes );
22462240 values [5 ] = Int32GetDatum ((int )Mtm -> pool .active );
2247- values [6 ] = Int64GetDatum ( BgwPoolGetQueueSize ( & Mtm -> pool ) );
2248- values [7 ] = Int64GetDatum (Mtm -> transCount );
2249- values [8 ] = Int64GetDatum (Mtm -> timeShift );
2250- values [9 ] = Int32GetDatum (Mtm -> recoverySlot );
2251- nulls [ 9 ] = Mtm -> recoverySlot == 0 ;
2241+ values [6 ] = Int32GetDatum (( int ) Mtm -> pool . pending );
2242+ values [7 ] = Int64GetDatum (BgwPoolGetQueueSize ( & Mtm -> pool ) );
2243+ values [8 ] = Int64GetDatum (Mtm -> transCount );
2244+ values [9 ] = Int64GetDatum (Mtm -> timeShift );
2245+ values [ 10 ] = Int32GetDatum ( Mtm -> recoverySlot ) ;
22522246
22532247 PG_RETURN_DATUM (HeapTupleGetDatum (heap_form_tuple (desc , values , nulls )));
22542248}
22552249
22562250
2251+ typedef struct
2252+ {
2253+ int nodeId ;
2254+ } MtmGetClusterInfoCtx ;
2255+
2256+
2257+ Datum
2258+ mtm_get_cluster_info (PG_FUNCTION_ARGS )
2259+ {
2260+
2261+ FuncCallContext * funcctx ;
2262+ MtmGetClusterInfoCtx * usrfctx ;
2263+ MemoryContext oldcontext ;
2264+ TupleDesc desc ;
2265+ bool is_first_call = SRF_IS_FIRSTCALL ();
2266+ int i ;
2267+ PGconn * conn ;
2268+ PGresult * result ;
2269+ char * values [Natts_mtm_cluster_state ];
2270+ HeapTuple tuple ;
2271+
2272+ if (is_first_call ) {
2273+ funcctx = SRF_FIRSTCALL_INIT ();
2274+ oldcontext = MemoryContextSwitchTo (funcctx -> multi_call_memory_ctx );
2275+ usrfctx = (MtmGetClusterInfoCtx * )palloc (sizeof (MtmGetNodeStateCtx ));
2276+ get_call_result_type (fcinfo , NULL , & desc );
2277+ funcctx -> attinmeta = TupleDescGetAttInMetadata (desc );
2278+ usrfctx -> nodeId = 1 ;
2279+ funcctx -> user_fctx = usrfctx ;
2280+ MemoryContextSwitchTo (oldcontext );
2281+ }
2282+ funcctx = SRF_PERCALL_SETUP ();
2283+ usrfctx = (MtmGetClusterInfoCtx * )funcctx -> user_fctx ;
2284+ if (usrfctx -> nodeId > MtmNodes ) {
2285+ SRF_RETURN_DONE (funcctx );
2286+ }
2287+ conn = PQconnectdb (Mtm -> nodes [usrfctx -> nodeId - 1 ].con .connStr );
2288+ if (PQstatus (conn ) != CONNECTION_OK ) {
2289+ elog (ERROR , "Failed to establish connection '%s' to node %d" , Mtm -> nodes [usrfctx -> nodeId - 1 ].con .connStr , usrfctx -> nodeId );
2290+ }
2291+ result = PQexec (conn , "select * from mtm.get_cluster_state()" );
2292+
2293+ if (PQresultStatus (result ) != PGRES_TUPLES_OK || PQntuples (result ) != 1 ) {
2294+ elog (ERROR , "Failed to receive data from %d" , usrfctx -> nodeId );
2295+ }
2296+
2297+ for (i = 0 ; i < Natts_mtm_cluster_state ; i ++ ) {
2298+ values [i ] = PQgetvalue (result , 0 , i );
2299+ }
2300+ tuple = BuildTupleFromCStrings (funcctx -> attinmeta , values );
2301+ PQclear (result );
2302+ PQfinish (conn );
2303+ usrfctx -> nodeId += 1 ;
2304+ SRF_RETURN_NEXT (funcctx , HeapTupleGetDatum (tuple ));
2305+ }
2306+
2307+
22572308Datum mtm_make_table_local (PG_FUNCTION_ARGS )
22582309{
22592310 Oid reloid = PG_GETARG_OID (1 );
0 commit comments