@@ -168,9 +168,12 @@ static bool msg_size_is(raft_msg_t *m, int mlen) {
168168
169169static void raft_send (raft_t * r , int dst , void * m , int mlen ) {
170170 assert (msg_size_is ((raft_msg_t * )m , mlen ));
171+ assert (((raft_msg_t * )m )-> msgtype >= 0 );
172+ assert (((raft_msg_t * )m )-> msgtype < 4 );
171173 assert (dst >= 0 );
172174 assert (dst < r -> servernum );
173175 assert (dst != r -> me );
176+ assert (((raft_msg_t * )m )-> from == r -> me );
174177
175178 raft_server_t * server = r -> servers + dst ;
176179
@@ -209,7 +212,12 @@ static void raft_beat(raft_t *r, int dst) {
209212 m .msg .from = r -> me ;
210213
211214 if (s -> tosend < r -> log .first + r -> log .size ) {
212- // TODO: implement snapshot sending
215+ raft_entry_t * e = & RAFT_LOG (r , s -> tosend );
216+ if (e -> snapshot ) {
217+ // TODO: implement snapshot sending
218+ shout ("tosend = %d, first = %d, size = %d\n" , s -> tosend , r -> log .first , r -> log .size );
219+ assert (false); // snapshot sending not implemented
220+ }
213221
214222 // the follower is a bit behind: send an update
215223 m .previndex = s -> tosend - 1 ;
@@ -218,7 +226,7 @@ static void raft_beat(raft_t *r, int dst) {
218226 } else {
219227 m .prevterm = -1 ;
220228 }
221- m .entry = RAFT_LOG ( r , s -> tosend ) ;
229+ m .entry = * e ;
222230 m .empty = false;
223231 } else {
224232 // the follower is up to date: send a heartbeat
@@ -313,6 +321,7 @@ static int raft_log_compact(raft_log_t *l, int keep_applied) {
313321 snap .minarg = min (snap .minarg , e -> argument );
314322 snap .maxarg = max (snap .maxarg , e -> argument );
315323 }
324+ e -> snapshot = false; // FIXME: should not need this, find the code where it is not set on new entry insertion
316325 compacted ++ ;
317326 }
318327 if (compacted ) {
@@ -331,7 +340,7 @@ bool raft_emit(raft_t *r, int action, int argument) {
331340 if (r -> log .size == RAFT_LOGLEN ) {
332341 int compacted = raft_log_compact (& r -> log , RAFT_KEEP_APPLIED );
333342 if (compacted ) {
334- debug ("compacted %d entries\n" , compacted );
343+ shout ("compacted %d entries\n" , compacted );
335344 } else {
336345 shout (
337346 "cannot emit new entries, the log is"
@@ -355,7 +364,9 @@ bool raft_emit(raft_t *r, int action, int argument) {
355364}
356365
357366static bool log_append (raft_log_t * l , int previndex , int prevterm , raft_entry_t * e ) {
358- assert (!e -> snapshot );
367+ if (e -> snapshot ) {
368+ assert (false);
369+ }
359370 debug (
360371 "log_append(%p, previndex=%d, prevterm=%d,"
361372 " term=%d, action=%d, argument=%d)\n" ,
@@ -470,11 +481,12 @@ static void raft_handle_update(raft_t *r, raft_msg_update_t *m) {
470481
471482static void raft_refresh_acked (raft_t * r ) {
472483 // pick each server's acked and see if it is acked on the majority
484+ // TODO: count 'acked' inside the entry itself to remove the nested loop here
473485 int i , j ;
474486 for (i = 0 ; i < r -> servernum ; i ++ ) {
475487 if (i == r -> me ) continue ;
476488 int newacked = r -> servers [i ].acked ;
477- if (newacked < r -> log .acked ) continue ;
489+ if (newacked <= r -> log .acked ) continue ;
478490
479491 int replication = 1 ; // count self as yes
480492 for (j = 0 ; j < r -> servernum ; j ++ ) {
@@ -486,7 +498,12 @@ static void raft_refresh_acked(raft_t *r) {
486498 }
487499 }
488500
501+ assert (replication <= r -> servernum );
502+
489503 if (replication * 2 > r -> servernum ) {
504+ #ifdef MAJORITY_IS_NOT_ENOUGH
505+ if (replication < r -> servernum ) continue ;
506+ #endif
490507 r -> log .acked = newacked ;
491508 }
492509 }
@@ -524,6 +541,7 @@ static void raft_handle_done(raft_t *r, raft_msg_done_t *m) {
524541 // the client should have specified the last index it had gotten
525542 server -> tosend = m -> index + 1 ;
526543 }
544+ assert (server -> tosend >= server -> acked ); // FIXME: remove this, because 'tosend' is actually allowed to be less than 'acked' if the follower has restarted
527545 }
528546
529547 if (server -> tosend < r -> log .first + r -> log .size ) {
@@ -609,6 +627,8 @@ void raft_handle_message(raft_t *r, raft_msg_t *m) {
609627 r -> role = ROLE_FOLLOWER ;
610628 }
611629
630+ assert (m -> msgtype >= 0 );
631+ assert (m -> msgtype < 4 );
612632 switch (m -> msgtype ) {
613633 case RAFT_MSG_UPDATE :
614634 raft_handle_update (r , (raft_msg_update_t * )m );
0 commit comments