@@ -52,10 +52,11 @@ static void *get_shared_state(void)
5252 return shared .state ;
5353}
5454
55- static void try_next_peer (void )
55+ static void select_next_peer (void )
5656{
57- while (! wcfg . peers [ * shared . leader ]. up )
57+ do {
5858 * shared .leader = (* shared .leader + 1 ) % RAFTABLE_PEERS_MAX ;
59+ } while (!wcfg .peers [* shared .leader ].up );
5960}
6061
6162static void disconnect_leader (void )
@@ -64,7 +65,7 @@ static void disconnect_leader(void)
6465 {
6566 close (leadersock );
6667 }
67- try_next_peer ();
68+ select_next_peer ();
6869 leadersock = -1 ;
6970}
7071
@@ -74,6 +75,9 @@ static bool connect_leader(void)
7475 struct addrinfo hint ;
7576 char portstr [6 ];
7677 struct addrinfo * a ;
78+ int rc ;
79+
80+ if (* shared .leader == NOBODY ) select_next_peer ();
7781
7882 HostPort * leaderhp = wcfg .peers + * shared .leader ;
7983
@@ -83,10 +87,12 @@ static bool connect_leader(void)
8387 snprintf (portstr , 6 , "%d" , leaderhp -> port );
8488 hint .ai_protocol = getprotobyname ("tcp" )-> p_proto ;
8589
86- if (getaddrinfo (leaderhp -> host , portstr , & hint , & addrs ))
90+ if (( rc = getaddrinfo (leaderhp -> host , portstr , & hint , & addrs ) ))
8791 {
8892 disconnect_leader ();
89- perror ("failed to resolve address" );
93+ fprintf (stderr , "failed to resolve address '%s:%d': %s" ,
94+ leaderhp -> host , leaderhp -> port ,
95+ gai_strerror (rc ));
9096 return false;
9197 }
9298
@@ -168,11 +174,14 @@ void raftable_set(char *key, char *value)
168174 size += vallen ;
169175 ru = palloc (size );
170176
177+ ru -> expector = wcfg .id ;
178+ ru -> fieldnum = 1 ;
179+
171180 RaftableField * f = (RaftableField * )ru -> data ;
172181 f -> keylen = keylen ;
173182 f -> vallen = vallen ;
174183 memcpy (f -> data , key , keylen );
175- memcpy (f -> data + keylen , key , vallen );
184+ memcpy (f -> data + keylen , value , vallen );
176185
177186 bool ok = false;
178187 while (!ok )
@@ -201,6 +210,18 @@ void raftable_set(char *key, char *value)
201210 }
202211 sent += newbytes ;
203212 }
213+
214+ if (ok )
215+ {
216+ int status ;
217+ int recved = read (s , & status , sizeof (status ));
218+ if (recved != sizeof (status ))
219+ {
220+ disconnect_leader ();
221+ fprintf (stderr , "failed to recv the update status from the leader\n" );
222+ ok = false;
223+ }
224+ }
204225 }
205226
206227 pfree (ru );
@@ -358,6 +379,7 @@ _PG_init(void)
358379 );
359380 parse_peers (wcfg .peers , peerstr );
360381
382+ request_shmem ();
361383 worker_register (& wcfg );
362384
363385 PreviousShmemStartupHook = shmem_startup_hook ;
0 commit comments