@@ -19,7 +19,6 @@ typedef enum roles {
1919
2020#define UDP_SAFE_SIZE 508
2121
22- // raft module does not care what you mean by action and argument
2322typedef struct raft_entry_t {
2423 int term ;
2524 bool snapshot ;
@@ -46,6 +45,7 @@ typedef struct raft_peer_t {
4645
4746 int seqno ; // the rpc sequence number
4847 raft_progress_t acked ; // the number of entries:bytes acked by this peer
48+ int applied ; // the number of entries applied by this peer
4949
5050 char * host ;
5151 int port ;
@@ -111,6 +111,7 @@ typedef struct raft_msg_done_t {
111111 raft_msg_data_t msg ;
112112 int term ; // the term of the appended entry
113113 raft_progress_t progress ; // the progress after appending
114+ int applied ;
114115 bool success ;
115116 // the message is considered acked when the last chunk appends successfully
116117} raft_msg_done_t ;
@@ -177,6 +178,7 @@ static void raft_peer_init(raft_peer_t *p) {
177178 p -> up = false;
178179 p -> seqno = 0 ;
179180 reset_progress (& p -> acked );
181+ p -> applied = 0 ;
180182
181183 p -> host = DEFAULT_LISTENHOST ;
182184 p -> port = DEFAULT_LISTENPORT ;
@@ -633,7 +635,7 @@ static int raft_compact(raft_t raft) {
633635 return compacted ;
634636}
635637
636- bool raft_emit (raft_t r , raft_update_t update ) {
638+ int raft_emit (raft_t r , raft_update_t update ) {
637639 assert (r -> leader == r -> me );
638640 assert (r -> role == LEADER );
639641
@@ -646,11 +648,12 @@ bool raft_emit(raft_t r, raft_update_t update) {
646648 "cannot emit new entries, the log is"
647649 " full and cannot be compacted\n"
648650 );
649- return false ;
651+ return -1 ;
650652 }
651653 }
652654
653- raft_entry_t * e = & RAFT_LOG (r , r -> log .first + r -> log .size );
655+ int newindex = RAFT_LOG_LAST_INDEX (r ) + 1 ;
656+ raft_entry_t * e = & RAFT_LOG (r , newindex );
654657 e -> term = r -> term ;
655658 assert (e -> update .len == 0 );
656659 assert (e -> update .data == NULL );
@@ -661,7 +664,13 @@ bool raft_emit(raft_t r, raft_update_t update) {
661664
662665 raft_beat (r , NOBODY );
663666 raft_reset_timer (r );
664- return true;
667+ return newindex ;
668+ }
669+
670+ bool raft_applied (raft_t r , int id , int index ) {
671+ raft_peer_t * p = r -> peers + id ;
672+ if (!p -> up ) return false;
673+ return p -> applied >= index ;
665674}
666675
667676static bool raft_restore (raft_t r , int previndex , raft_entry_t * e ) {
@@ -836,6 +845,7 @@ static void raft_handle_update(raft_t r, raft_msg_update_t *m) {
836845 } else {
837846 reply .term = -1 ;
838847 }
848+ reply .applied = r -> log .applied ;
839849
840850 reply .success = true;
841851finish :
@@ -863,6 +873,8 @@ static void raft_handle_done(raft_t r, raft_msg_done_t *m) {
863873 return ;
864874 }
865875
876+ peer -> applied = m -> applied ;
877+
866878 if (m -> success ) {
867879 debug ("[from %d] ============= done\n" , sender );
868880 peer -> acked = m -> progress ;
0 commit comments