@@ -31,9 +31,6 @@ Transaction* transaction_hash[MAX_TRANSACTIONS];
3131// reserve something in (next, x) range otherwise, moving 'next' after 'x'.
3232xid_t prev_gxid , next_gxid ;
3333
34- xid_t threshold_gxid ; // when to start worrying about starting a new term
35- xid_t last_gxid ; // the greatest gxid we can provide on BEGIN or RESERVE
36-
3734xid_t global_xmin = INVALID_XID ;
3835
3936static Transaction * find_transaction (xid_t xid ) {
@@ -262,14 +259,41 @@ static void onhello(client_t client, int argc, xid_t *argv) {
262259 }
263260}
264261
262+ // the greatest gxid we can provide on BEGIN or RESERVE
263+ static xid_t last_xid_in_term () {
264+ return raft .term * XIDS_PER_TERM - 1 ;
265+ }
266+
267+ static xid_t first_xid_in_term () {
268+ return (raft .term - 1 ) * XIDS_PER_TERM ;
269+ }
270+
271+ static int xid2term (xid_t xid ) {
272+ int term = xid / XIDS_PER_TERM + 1 ;
273+ return term ;
274+ }
275+
276+ // when to start worrying about starting a new term
277+ static xid_t get_threshold_xid () {
278+ return last_xid_in_term () - NEW_TERM_THRESHOLD ;
279+ }
280+
281+ static bool xid_is_safe (xid_t xid ) {
282+ return xid <= last_xid_in_term ();
283+ }
284+
285+ static bool xid_is_disturbing (xid_t xid ) {
286+ return inrange (next_gxid + 1 , get_threshold_xid (), xid );
287+ }
288+
265289static void set_next_gxid (xid_t value ) {
266290 assert (next_gxid < value ); // The value should only grow.
267291
268292 if (use_raft && raft .role == ROLE_LEADER ) {
269- assert (value <= last_gxid );
270- if (inrange ( next_gxid + 1 , threshold_gxid , value )) {
293+ assert (xid_is_safe ( value ) );
294+ if (xid_is_disturbing ( value )) {
271295 // Time to worry has come.
272- raft_start_next_term (& raft );
296+ raft_ensure_term (& raft , xid2term ( value ) );
273297 } else {
274298 // It is either too early to worry,
275299 // or we have already increased the term.
@@ -293,6 +317,15 @@ static void set_next_gxid(xid_t value) {
293317 next_gxid = value ;
294318}
295319
320+ static bool use_xid (xid_t xid ) {
321+ if (!xid_is_safe (xid )) {
322+ return false;
323+ }
324+ shout ("setting next_gxid to %u\n" , xid + 1 );
325+ set_next_gxid (xid + 1 );
326+ return true;
327+ }
328+
296329static void onreserve (client_t client , int argc , xid_t * argv ) {
297330 CHECK (argc == 3 , client , "RESERVE: wrong number of arguments" );
298331
@@ -317,11 +350,10 @@ static void onreserve(client_t client, int argc, xid_t *argv) {
317350 minxid = max_of_xids (minxid , next_gxid );
318351 maxxid = max_of_xids (maxxid , minxid + minsize - 1 );
319352 CHECK (
320- maxxid <= last_gxid ,
353+ use_xid ( maxxid ) ,
321354 client ,
322355 "not enough xids left in this term"
323356 );
324- set_next_gxid (maxxid + 1 );
325357 }
326358 debug (
327359 "[%d] RESERVE: allocating range %u-%u\n" ,
@@ -371,13 +403,13 @@ static void onbegin(client_t client, int argc, xid_t *argv) {
371403 transaction_clear (t );
372404 l2_list_link (& active_transactions , & t -> elem );
373405
406+ t -> xid = next_gxid ;
374407 CHECK (
375- next_gxid <= last_gxid ,
408+ use_xid ( next_gxid ) ,
376409 client ,
377410 "not enought xids left in this term"
378411 );
379- prev_gxid = t -> xid = next_gxid ;
380- set_next_gxid (next_gxid + 1 );
412+ prev_gxid = t -> xid ;
381413 t -> snapshots_count = 0 ;
382414 t -> size = 1 ;
383415
@@ -865,9 +897,16 @@ int main(int argc, char **argv) {
865897
866898 next_gxid = MIN_XID ;
867899 clg = clog_open (datadir );
868- set_next_gxid (clog_find_last_used (clg ) + 1 );
900+
901+ xid_t last_used_xid = clog_find_last_used (clg );
902+ shout ("will use %u\n" , last_used_xid );
903+ if (!use_xid (last_used_xid )) {
904+ shout ("could not set last used xid to %u\n" , last_used_xid );
905+ return EXIT_FAILURE ;
906+ }
907+ raft .term = xid2term (next_gxid );
908+
869909 prev_gxid = next_gxid - 1 ;
870- last_gxid = INVALID_XID ;
871910 debug ("initial next_gxid = %u\n" , next_gxid );
872911 if (!clg ) {
873912 shout ("could not open clog at '%s'\n" , datadir );
@@ -906,6 +945,7 @@ int main(int argc, char **argv) {
906945
907946 mstimer_t t ;
908947 mstimer_reset (& t );
948+ int old_term = 0 ;
909949 while (true) {
910950 int ms = mstimer_reset (& t );
911951 raft_msg_t * m = NULL ;
@@ -933,19 +973,16 @@ int main(int argc, char **argv) {
933973 server_set_enabled (server , raft .role == ROLE_LEADER );
934974
935975 // Update the gxid limits based on current term and leadership.
936- xid_t recent_last_gxid = raft .term * XIDS_PER_TERM ;
937- if (last_gxid < recent_last_gxid ) {
938- shout ("updating last_gxid from %u to %u\n" , last_gxid , recent_last_gxid );
939- last_gxid = recent_last_gxid ;
940- threshold_gxid = last_gxid - NEW_TERM_THRESHOLD ;
976+ if (old_term < raft .term ) {
941977 if (raft .role == ROLE_FOLLOWER ) {
942978 // If we become a leader, we will use
943979 // the range of xids after the current
944980 // last_gxid.
945- prev_gxid = last_gxid ;
946- next_gxid = prev_gxid + 1 ;
981+ prev_gxid = last_xid_in_term () ;
982+ set_next_gxid ( prev_gxid + 1 ) ;
947983 shout ("updated range to %u-%u\n" , prev_gxid , next_gxid );
948984 }
985+ old_term = raft .term ;
949986 }
950987 } else {
951988 server_set_enabled (server , true);
0 commit comments