@@ -718,6 +718,33 @@ static bool raft_restore(raft_t r, int previndex, raft_entry_t *e) {
718718 return true;
719719}
720720
721+ static bool raft_appendable (raft_t r , int previndex , int prevterm ) {
722+ int low , high ;
723+
724+ raft_log_t * l = & r -> log ;
725+
726+ low = RAFT_LOG_FIRST_INDEX (r );
727+ if (low == 0 ) low = -1 ; // allow appending at the start
728+ high = RAFT_LOG_LAST_INDEX (r );
729+
730+ if (!inrange (low , previndex , high ))
731+ {
732+ debug (
733+ "previndex %d is outside log range %d-%d\n" ,
734+ previndex , low , high
735+ );
736+ return false;
737+ }
738+
739+ if (previndex != -1 ) {
740+ raft_entry_t * pe = & RAFT_LOG (r , previndex );
741+ if (pe -> term != prevterm ) {
742+ debug ("log term %d != prevterm %d\n" , pe -> term , prevterm );
743+ return false;
744+ }
745+ }
746+ }
747+
721748static bool raft_append (raft_t r , int previndex , int prevterm , raft_entry_t * e ) {
722749 assert (e -> bytes == e -> update .len );
723750 assert (!e -> snapshot );
@@ -730,16 +757,9 @@ static bool raft_append(raft_t r, int previndex, int prevterm, raft_entry_t *e)
730757 l , previndex , prevterm ,
731758 e -> term
732759 );
733- if (previndex != -1 ) {
734- if (previndex < l -> first ) {
735- debug ("previndex < first\n" );
736- return false;
737- }
738- }
739- if (previndex > RAFT_LOG_LAST_INDEX (r )) {
740- debug ("previndex(%d) > last(%d)\n" , previndex , RAFT_LOG_LAST_INDEX (r ));
741- return false;
742- }
760+
761+ if (!raft_appendable (r , previndex , prevterm )) return false;
762+
743763 if (previndex == RAFT_LOG_LAST_INDEX (r )) {
744764 debug ("previndex == last\n" );
745765 // appending to the end
@@ -755,14 +775,6 @@ static bool raft_append(raft_t r, int previndex, int prevterm, raft_entry_t *e)
755775 }
756776 }
757777
758- if (previndex != -1 ) {
759- raft_entry_t * pe = & RAFT_LOG (r , previndex );
760- if (pe -> term != prevterm ) {
761- debug ("log term %d != prevterm %d\n" , pe -> term , prevterm );
762- return false;
763- }
764- }
765-
766778 int index = previndex + 1 ;
767779 raft_entry_t * slot = & RAFT_LOG (r , index );
768780
@@ -797,14 +809,7 @@ static void raft_handle_update(raft_t r, raft_msg_update_t *m) {
797809 raft_entry_t * e = & r -> log .newentry ;
798810 raft_update_t * u = & e -> update ;
799811
800- reply .progress .entries = RAFT_LOG_LAST_INDEX (r ) + 1 ;
801- reply .progress .bytes = e -> bytes ;
802-
803- if (m -> previndex > RAFT_LOG_LAST_INDEX (r ))
804- {
805- debug ("got an update with previndex=%d > lastindex=%d\n" , m -> previndex , RAFT_LOG_LAST_INDEX (r ));
806- goto finish ;
807- }
812+ if (!m -> snapshot && !raft_appendable (r , m -> previndex , m -> prevterm )) goto finish ;
808813
809814 if (reply .progress .entries > 0 ) {
810815 reply .term = RAFT_LOG (r , reply .progress .entries - 1 ).term ;
@@ -838,6 +843,13 @@ static void raft_handle_update(raft_t r, raft_msg_update_t *m) {
838843 }
839844
840845 if (!m -> empty ) {
846+ if ((m -> offset > 0 ) && (e -> term < m -> term )) {
847+ shout ("a chunk of newer version of entry received, resetting progress to avoid corruption\n" );
848+ e -> term = m -> term ;
849+ e -> bytes = 0 ;
850+ goto finish ;
851+ }
852+
841853 if (m -> offset > e -> bytes ) {
842854 shout ("unexpectedly large offset %d for a chunk, ignoring to avoid gaps\n" , m -> offset );
843855 goto finish ;
@@ -870,18 +882,18 @@ static void raft_handle_update(raft_t r, raft_msg_update_t *m) {
870882 // just a heartbeat
871883 }
872884
873- reply .progress .entries = RAFT_LOG_LAST_INDEX (r ) + 1 ;
874- reply .progress .bytes = e -> bytes ;
875- if (reply .progress .entries > 0 ) {
876- reply .term = RAFT_LOG (r , reply .progress .entries - 1 ).term ;
885+ if (RAFT_LOG_LAST_INDEX (r ) >= 0 ) {
886+ reply .term = RAFT_LOG (r , RAFT_LOG_LAST_INDEX (r )).term ;
877887 } else {
878888 reply .term = -1 ;
879889 }
880890 reply .applied = r -> log .applied ;
881891
882892 reply .success = true;
883893finish :
884- assert ((reply .progress .entries == m -> previndex + 1 ) || (reply .progress .bytes == 0 ));
894+ reply .progress .entries = RAFT_LOG_LAST_INDEX (r ) + 1 ;
895+ reply .progress .bytes = e -> bytes ;
896+
885897 raft_send (r , sender , & reply , sizeof (reply ));
886898}
887899
0 commit comments