@@ -82,6 +82,7 @@ static void notify_listeners(Transaction *t, int status) {
8282 // notify 'status' listeners about the transaction status
8383 case BLANK :
8484 while ((listener = transaction_pop_listener (t , 's' ))) {
85+ debug ("[%d] notifying the client about xid=%u (unknown)\n" , CLIENT_ID (listener ), t -> xid );
8586 client_message_shortcut (
8687 (client_t )listener ,
8788 RES_TRANSACTION_UNKNOWN
@@ -90,7 +91,7 @@ static void notify_listeners(Transaction *t, int status) {
9091 break ;
9192 case NEGATIVE :
9293 while ((listener = transaction_pop_listener (t , 's' ))) {
93- // notify 'status' listeners about the transaction status
94+ debug ( "[%d] notifying the client about xid=%u (aborted)\n" , CLIENT_ID ( listener ), t -> xid );
9495 client_message_shortcut (
9596 (client_t )listener ,
9697 RES_TRANSACTION_ABORTED
@@ -99,7 +100,7 @@ static void notify_listeners(Transaction *t, int status) {
99100 break ;
100101 case POSITIVE :
101102 while ((listener = transaction_pop_listener (t , 's' ))) {
102- // notify 'status' listeners about the transaction status
103+ debug ( "[%d] notifying the client about xid=%u (committed)\n" , CLIENT_ID ( listener ), t -> xid );
103104 client_message_shortcut (
104105 (client_t )listener ,
105106 RES_TRANSACTION_COMMITTED
@@ -108,7 +109,7 @@ static void notify_listeners(Transaction *t, int status) {
108109 break ;
109110 case DOUBT :
110111 while ((listener = transaction_pop_listener (t , 's' ))) {
111- // notify 'status' listeners about the transaction status
112+ debug ( "[%d] notifying the client about xid=%u (inprogress)\n" , CLIENT_ID ( listener ), t -> xid );
112113 client_message_shortcut (
113114 (client_t )listener ,
114115 RES_TRANSACTION_INPROGRESS
@@ -122,19 +123,22 @@ static void apply_clog_update(int action, int argument) {
122123 int status = action ;
123124 xid_t xid = argument ;
124125 assert ((status == NEGATIVE ) || (status == POSITIVE ));
126+ debug ("APPLYING: xid=%u, status=%d\n" , xid , status );
125127
126128 if (!clog_write (clg , xid , status )) {
127- shout ("APPLY: failed to write to clog, xid=%d \n" , xid );
129+ shout ("APPLY: failed to write to clog, xid=%u \n" , xid );
128130 }
129131
130- Transaction * t = find_transaction (xid );
131- if (t == NULL ) {
132- debug ("APPLY: xid %u is not active\n" , xid );
133- return ;
134- }
132+ if (!use_raft || (raft .role == ROLE_LEADER )) {
133+ Transaction * t = find_transaction (xid );
134+ if (t == NULL ) {
135+ debug ("APPLY: xid=%u is not active\n" , xid );
136+ return ;
137+ }
135138
136- notify_listeners (t , status );
137- free_transaction (t );
139+ notify_listeners (t , status );
140+ free_transaction (t );
141+ }
138142}
139143
140144static int next_client_id = 0 ;
@@ -153,14 +157,16 @@ static void ondisconnect(client_t client) {
153157 // need to abort the transaction this client is participating in
154158 for (t = (Transaction * )active_transactions .next ; t != (Transaction * )& active_transactions ; t = (Transaction * )t -> elem .next ) {
155159 if (t -> xid == CLIENT_XID (client )) {
156- raft_emit (& raft , NEGATIVE , t -> xid );
160+ if (use_raft && (raft .role == ROLE_LEADER )) {
161+ raft_emit (& raft , NEGATIVE , t -> xid );
162+ }
157163 break ;
158164 }
159165 }
160166
161167 if (t == (Transaction * )& active_transactions ) {
162168 shout (
163- "[%d] DISCONNECT: transaction %u not found O_o\n" ,
169+ "[%d] DISCONNECT: transaction xid= %u not found O_o\n" ,
164170 CLIENT_ID (client ), CLIENT_XID (client )
165171 );
166172 }
@@ -175,6 +181,7 @@ static void debug_cmd(client_t client, int argc, xid_t *argv) {
175181 char * cmdname ;
176182 assert (argc > 0 );
177183 switch (argv [0 ]) {
184+ case CMD_HELLO : cmdname = "HELLO" ; break ;
178185 case CMD_RESERVE : cmdname = "RESERVE" ; break ;
179186 case CMD_BEGIN : cmdname = "BEGIN" ; break ;
180187 case CMD_FOR : cmdname = "FOR" ; break ;
@@ -203,6 +210,9 @@ static void debug_cmd(client_t client, int argc, xid_t *argv) {
203210 } \
204211 } while (0)
205212
213+ #define CHECKLEADER (CLIENT ) \
214+ CHECK(raft.role == ROLE_LEADER, CLIENT, "not a leader")
215+
206216static xid_t max_of_xids (xid_t a , xid_t b ) {
207217 return a > b ? a : b ;
208218}
@@ -232,6 +242,17 @@ static void gen_snapshot(Snapshot *s) {
232242 }
233243}
234244
245+ static void onhello (client_t client , int argc , xid_t * argv ) {
246+ CHECK (argc == 1 , client , "HELLO: wrong number of arguments" );
247+
248+ debug ("[%d] HELLO\n" , CLIENT_ID (client ));
249+ if (raft .role == ROLE_LEADER ) {
250+ client_message_shortcut (client , RES_OK );
251+ } else {
252+ client_message_shortcut (client , RES_FAILED );
253+ }
254+ }
255+
235256static void onreserve (client_t client , int argc , xid_t * argv ) {
236257 CHECK (argc == 3 , client , "RESERVE: wrong number of arguments" );
237258
@@ -344,10 +365,12 @@ static void onbegin(client_t client, int argc, xid_t *argv) {
344365static bool queue_for_transaction_finish (client_t client , xid_t xid , char cmd ) {
345366 assert ((cmd >= 'a' ) && (cmd <= 'z' ));
346367
368+ debug ("[%d] QUEUE for xid=%u status\n" , CLIENT_ID (client ), xid );
369+
347370 Transaction * t = find_transaction (xid );
348371 if (t == NULL ) {
349372 shout (
350- "[%d] QUEUE: xid %u not found\n" ,
373+ "[%d] QUEUE: xid= %u not found\n" ,
351374 CLIENT_ID (client ), xid
352375 );
353376 client_message_shortcut (client , RES_FAILED );
@@ -378,7 +401,7 @@ static void onvote(client_t client, int argc, xid_t *argv, int vote) {
378401 Transaction * t = find_transaction (xid );
379402 if (t == NULL ) {
380403 shout (
381- "[%d] VOTE: xid %u not found\n" ,
404+ "[%d] VOTE: xid= %u not found\n" ,
382405 CLIENT_ID (client ), xid
383406 );
384407 client_message_shortcut (client , RES_FAILED );
@@ -445,7 +468,7 @@ static void onsnapshot(client_t client, int argc, xid_t *argv) {
445468 Transaction * t = find_transaction (xid );
446469 if (t == NULL ) {
447470 shout (
448- "[%d] SNAPSHOT: xid %u not found\n" ,
471+ "[%d] SNAPSHOT: xid= %u not found\n" ,
449472 CLIENT_ID (client ), xid
450473 );
451474 client_message_shortcut (client , RES_FAILED );
@@ -543,22 +566,31 @@ static void oncmd(client_t client, int argc, xid_t *argv) {
543566
544567 assert (argc > 0 );
545568 switch (argv [0 ]) {
569+ case CMD_HELLO :
570+ onhello (client , argc , argv );
571+ break ;
546572 case CMD_RESERVE :
573+ CHECKLEADER (client );
547574 onreserve (client , argc , argv );
548575 break ;
549576 case CMD_BEGIN :
577+ CHECKLEADER (client );
550578 onbegin (client , argc , argv );
551579 break ;
552580 case CMD_FOR :
581+ CHECKLEADER (client );
553582 onvote (client , argc , argv , POSITIVE );
554583 break ;
555584 case CMD_AGAINST :
585+ CHECKLEADER (client );
556586 onvote (client , argc , argv , NEGATIVE );
557587 break ;
558588 case CMD_SNAPSHOT :
589+ CHECKLEADER (client );
559590 onsnapshot (client , argc , argv );
560591 break ;
561592 case CMD_STATUS :
593+ CHECKLEADER (client );
562594 onstatus (client , argc , argv );
563595 break ;
564596 default :
0 commit comments