@@ -30,6 +30,10 @@ Transaction* transaction_hash[MAX_TRANSACTIONS];
3030// We reserve the local xids if they fit between (prev, next) range, and
3131// reserve something in (next, x) range otherwise, moving 'next' after 'x'.
3232xid_t prev_gxid , next_gxid ;
33+
34+ xid_t threshold_gxid ; // when to start worrying about starting a new term
35+ xid_t last_gxid ; // the greatest gxid we can provide on BEGIN or RESERVE
36+
3337xid_t global_xmin = INVALID_XID ;
3438
3539static Transaction * find_transaction (xid_t xid ) {
@@ -41,7 +45,11 @@ static Transaction *find_transaction(xid_t xid) {
4145typedef struct client_userdata_t {
4246 int id ;
4347 int snapshots_sent ;
44- xid_t xid ;
48+
49+ // FIXME: use some meaningful words for these. E.g. "expectee" instead
50+ // of "xwait".
51+ Transaction * xpart ; // the transaction this client is participating in
52+ Transaction * xwait ; // the transaction this client is waiting for
4553} client_userdata_t ;
4654
4755clog_t clg ;
@@ -51,13 +59,15 @@ bool use_raft;
5159#define CLIENT_USERDATA (CLIENT ) ((client_userdata_t*)client_get_userdata(CLIENT))
5260#define CLIENT_ID (CLIENT ) (CLIENT_USERDATA(CLIENT)->id)
5361#define CLIENT_SNAPSENT (CLIENT ) (CLIENT_USERDATA(CLIENT)->snapshots_sent)
54- #define CLIENT_XID (CLIENT ) (CLIENT_USERDATA(CLIENT)->xid)
62+ #define CLIENT_XPART (CLIENT ) (CLIENT_USERDATA(CLIENT)->xpart)
63+ #define CLIENT_XWAIT (CLIENT ) (CLIENT_USERDATA(CLIENT)->xwait)
5564
5665static client_userdata_t * create_client_userdata (int id ) {
5766 client_userdata_t * cd = malloc (sizeof (client_userdata_t ));
5867 cd -> id = id ;
5968 cd -> snapshots_sent = 0 ;
60- cd -> xid = INVALID_XID ;
69+ cd -> xpart = NULL ;
70+ cd -> xwait = NULL ;
6171 return cd ;
6272}
6373
@@ -66,9 +76,10 @@ static void free_client_userdata(client_userdata_t *cd) {
6676}
6777
6878inline static void free_transaction (Transaction * t ) {
69- Transaction * * tpp ;
70- for (tpp = & transaction_hash [t -> xid % MAX_TRANSACTIONS ]; * tpp != t ; tpp = & (* tpp )-> collision );
71- * tpp = t -> collision ;
79+ assert (transaction_pop_listener (t , 's' ) == NULL );
80+ Transaction * * tpp ;
81+ for (tpp = & transaction_hash [t -> xid % MAX_TRANSACTIONS ]; * tpp != t ; tpp = & (* tpp )-> collision );
82+ * tpp = t -> collision ;
7283 l2_list_unlink (& t -> elem );
7384 t -> elem .next = free_transactions ;
7485 free_transactions = & t -> elem ;
@@ -84,7 +95,7 @@ static void notify_listeners(Transaction *t, int status) {
8495 case BLANK :
8596 while ((listener = transaction_pop_listener (t , 's' ))) {
8697 debug ("[%d] notifying the client about xid=%u (unknown)\n" , CLIENT_ID (listener ), t -> xid );
87- shout ( "%p DEREF(notify): %d\n" , listener , client_deref ( listener )) ;
98+ CLIENT_XWAIT ( listener ) = NULL ;
8899 client_message_shortcut (
89100 (client_t )listener ,
90101 RES_TRANSACTION_UNKNOWN
@@ -94,7 +105,7 @@ static void notify_listeners(Transaction *t, int status) {
94105 case NEGATIVE :
95106 while ((listener = transaction_pop_listener (t , 's' ))) {
96107 debug ("[%d] notifying the client about xid=%u (aborted)\n" , CLIENT_ID (listener ), t -> xid );
97- shout ( "%p DEREF(notify): %d\n" , listener , client_deref ( listener )) ;
108+ CLIENT_XWAIT ( listener ) = NULL ;
98109 client_message_shortcut (
99110 (client_t )listener ,
100111 RES_TRANSACTION_ABORTED
@@ -104,7 +115,7 @@ static void notify_listeners(Transaction *t, int status) {
104115 case POSITIVE :
105116 while ((listener = transaction_pop_listener (t , 's' ))) {
106117 debug ("[%d] notifying the client about xid=%u (committed)\n" , CLIENT_ID (listener ), t -> xid );
107- shout ( "%p DEREF(notify): %d\n" , listener , client_deref ( listener )) ;
118+ CLIENT_XWAIT ( listener ) = NULL ;
108119 client_message_shortcut (
109120 (client_t )listener ,
110121 RES_TRANSACTION_COMMITTED
@@ -114,7 +125,7 @@ static void notify_listeners(Transaction *t, int status) {
114125 case DOUBT :
115126 while ((listener = transaction_pop_listener (t , 's' ))) {
116127 debug ("[%d] notifying the client about xid=%u (inprogress)\n" , CLIENT_ID (listener ), t -> xid );
117- shout ( "%p DEREF(notify): %d\n" , listener , client_deref ( listener )) ;
128+ CLIENT_XWAIT ( listener ) = NULL ;
118129 client_message_shortcut (
119130 (client_t )listener ,
120131 RES_TRANSACTION_INPROGRESS
@@ -124,6 +135,21 @@ static void notify_listeners(Transaction *t, int status) {
124135 }
125136}
126137
138+ static void set_next_gxid (xid_t value ) {
139+ assert (next_gxid < value );
140+ if (use_raft && raft .role == ROLE_LEADER ) {
141+ assert (value <= last_gxid );
142+ if (inrange (next_gxid + 1 , threshold_gxid , value )) {
143+ // Time to worry has come.
144+ raft .term ++ ;
145+ } else {
146+ // It is either too early to worry,
147+ // or we have already increased the term.
148+ }
149+ }
150+ next_gxid = value ;
151+ }
152+
127153static void apply_clog_update (int action , int argument ) {
128154 int status = action ;
129155 xid_t xid = argument ;
@@ -154,28 +180,20 @@ static void onconnect(client_t client) {
154180}
155181
156182static void ondisconnect (client_t client ) {
157- debug ("[%d] disconnected\n" , CLIENT_ID (client ));
158-
159- if (CLIENT_XID (client ) != INVALID_XID ) {
160- Transaction * t = find_transaction (CLIENT_XID (client ));
161- if (t != NULL ) {
162- if (transaction_remove_listener (t , 's' , client )) {
163- shout ("%p DEREF(disconn): %d\n" , client , client_deref (client ));
164- } else {
165- shout ("%p DEREF(disconn): not found\n" , client );
166- }
167-
168- if (use_raft && (raft .role == ROLE_LEADER )) {
169- raft_emit (& raft , NEGATIVE , t -> xid );
170- }
171- } else {
172- shout (
173- "[%d] DISCONNECT: transaction xid=%u not found O_o\n" ,
174- CLIENT_ID (client ), CLIENT_XID (client )
175- );
183+ Transaction * t ;
184+ debug ("[%d, %p] disconnected\n" , CLIENT_ID (client ), client );
185+
186+ if ((t = CLIENT_XPART (client ))) {
187+ transaction_remove_listener (t , 's' , client );
188+ if (use_raft && (raft .role == ROLE_LEADER )) {
189+ raft_emit (& raft , NEGATIVE , t -> xid );
176190 }
177191 }
178192
193+ if ((t = CLIENT_XWAIT (client ))) {
194+ transaction_remove_listener (t , 's' , client );
195+ }
196+
179197 free_client_userdata (CLIENT_USERDATA (client ));
180198 client_set_userdata (client , NULL );
181199}
@@ -274,15 +292,20 @@ static void onreserve(client_t client, int argc, xid_t *argv) {
274292
275293 if ((prev_gxid >= minxid ) || (maxxid >= next_gxid )) {
276294 debug (
277- "[%d] RESERVE: local range %u-%u is not between global range %u-%u\n" ,
295+ "[%d] RESERVE: local range %u-%u is not inside global range %u-%u\n" ,
278296 CLIENT_ID (client ),
279297 minxid , maxxid ,
280298 prev_gxid , next_gxid
281299 );
282300
283301 minxid = max_of_xids (minxid , next_gxid );
284302 maxxid = max_of_xids (maxxid , minxid + minsize - 1 );
285- next_gxid = maxxid + 1 ;
303+ CHECK (
304+ maxxid <= last_gxid ,
305+ client ,
306+ "not enough xids left in this term"
307+ );
308+ set_next_gxid (maxxid + 1 );
286309 }
287310 debug (
288311 "[%d] RESERVE: allocating range %u-%u\n" ,
@@ -318,7 +341,7 @@ static void onbegin(client_t client, int argc, xid_t *argv) {
318341 );
319342
320343 CHECK (
321- CLIENT_XID (client ) == INVALID_XID ,
344+ CLIENT_XPART (client ) == NULL ,
322345 client ,
323346 "BEGIN: already participating in another transaction"
324347 );
@@ -332,15 +355,21 @@ static void onbegin(client_t client, int argc, xid_t *argv) {
332355 transaction_clear (t );
333356 l2_list_link (& active_transactions , & t -> elem );
334357
335- prev_gxid = t -> xid = next_gxid ++ ;
358+ CHECK (
359+ next_gxid <= last_gxid ,
360+ client ,
361+ "not enought xids left in this term"
362+ );
363+ set_next_gxid (next_gxid + 1 );
364+ prev_gxid = t -> xid = next_gxid ;
336365 t -> snapshots_count = 0 ;
337366 t -> size = 1 ;
338367
339- t -> collision = transaction_hash [t -> xid % MAX_TRANSACTIONS ];
340- transaction_hash [t -> xid % MAX_TRANSACTIONS ] = t ;
368+ t -> collision = transaction_hash [t -> xid % MAX_TRANSACTIONS ];
369+ transaction_hash [t -> xid % MAX_TRANSACTIONS ] = t ;
341370
342371 CLIENT_SNAPSENT (client ) = 0 ;
343- CLIENT_XID (client ) = t -> xid ;
372+ CLIENT_XPART (client ) = t ;
344373
345374 if (!clog_write (clg , t -> xid , DOUBT )) {
346375 shout (
@@ -390,8 +419,8 @@ static bool queue_for_transaction_finish(client_t client, xid_t xid, char cmd) {
390419 // CLIENT_XID(client) and 'xid', i.e. we are able to tell which
391420 // transaction waits which transaction.
392421
422+ CLIENT_XWAIT (client ) = t ;
393423 transaction_push_listener (t , cmd , client );
394- shout ("%p REF: %d\n" , client , client_ref (client ));
395424 return true;
396425}
397426
@@ -403,7 +432,7 @@ static void onvote(client_t client, int argc, xid_t *argv, int vote) {
403432 bool wait = argv [2 ];
404433
405434 CHECK (
406- CLIENT_XID (client ) == xid ,
435+ CLIENT_XPART (client ) && ( CLIENT_XPART ( client ) -> xid == xid ) ,
407436 client ,
408437 "VOTE: voting for a transaction not participated in"
409438 );
@@ -427,7 +456,7 @@ static void onvote(client_t client, int argc, xid_t *argv, int vote) {
427456 }
428457 assert (t -> votes_for + t -> votes_against <= t -> size );
429458
430- CLIENT_XID (client ) = INVALID_XID ; // not participating any more
459+ CLIENT_XPART (client ) = NULL ; // not participating any more
431460
432461 int s = transaction_status (t );
433462 switch (s ) {
@@ -485,14 +514,14 @@ static void onsnapshot(client_t client, int argc, xid_t *argv) {
485514 return ;
486515 }
487516
488- if (CLIENT_XID (client ) == INVALID_XID ) {
517+ if (CLIENT_XPART (client ) == NULL ) {
489518 CLIENT_SNAPSENT (client ) = 0 ;
490- CLIENT_XID (client ) = t -> xid ;
519+ CLIENT_XPART (client ) = t ;
491520 t -> size += 1 ;
492521 }
493522
494523 CHECK (
495- CLIENT_XID (client ) == t -> xid ,
524+ CLIENT_XPART (client ) && ( CLIENT_XPART ( client ) -> xid == xid ) ,
496525 client ,
497526 "SNAPSHOT: getting snapshot for a transaction not participated in"
498527 );
@@ -839,6 +868,7 @@ int main(int argc, char **argv) {
839868
840869 prev_gxid = MIN_XID ;
841870 next_gxid = MIN_XID ;
871+ last_gxid = INVALID_XID ;
842872
843873 int raftsock = raft_create_udp_socket (& raft );
844874 if (raftsock == -1 ) {
@@ -856,8 +886,6 @@ int main(int argc, char **argv) {
856886 return EXIT_FAILURE ;
857887 }
858888
859- srand (getpid ());
860-
861889 mstimer_t t ;
862890 mstimer_reset (& t );
863891 while (true) {
@@ -874,11 +902,6 @@ int main(int argc, char **argv) {
874902 assert (m ); // m should not be NULL, because the message should be ready to recv
875903 }
876904
877- if (rand () % 10000 == 0 ) {
878- shout ("sleeping to test raft features\n" );
879- sleep (1 );
880- }
881-
882905 if (use_raft ) {
883906 int applied = raft_apply (& raft , apply_clog_update );
884907 if (applied ) {
@@ -890,6 +913,22 @@ int main(int argc, char **argv) {
890913 }
891914
892915 server_set_enabled (server , raft .role == ROLE_LEADER );
916+
917+ // Update the gxid limits based on current term and leadership.
918+ xid_t recent_last_gxid = raft .term * XIDS_PER_TERM ;
919+ if (last_gxid < recent_last_gxid ) {
920+ shout ("updating last_gxid from %u to %u\n" , last_gxid , recent_last_gxid );
921+ last_gxid = recent_last_gxid ;
922+ threshold_gxid = last_gxid - NEW_TERM_THRESHOLD ;
923+ if (raft .role == ROLE_FOLLOWER ) {
924+ // If we become a leader, we will use
925+ // the range of xids after the current
926+ // last_gxid.
927+ prev_gxid = last_gxid ;
928+ next_gxid = prev_gxid + 1 ;
929+ shout ("updated range to %u-%u\n" , prev_gxid , next_gxid );
930+ }
931+ }
893932 } else {
894933 server_set_enabled (server , true);
895934 }
0 commit comments