2121#include "worker.h"
2222#include "state.h"
2323
24+ #include <poll.h>
2425#include <sys/socket.h>
2526#include <netinet/in.h>
2627#include <netinet/tcp.h>
@@ -80,17 +81,111 @@ static void disconnect_leader(void)
8081 leadersock = -1 ;
8182}
8283
83- static bool connect_leader (void )
/*
 * Wait until 'sock' becomes writable.
 *
 * timeout_ms is the maximum time to wait in milliseconds. Any negative value
 * means "wait indefinitely": callers in this file compute the remaining time
 * as (timeout - elapsed), which yields values below -1 when the overall
 * timeout is infinite, and POSIX only defines -1 as the infinite timeout.
 *
 * Returns true only if poll() reported POLLOUT for the socket. Returns false
 * on timeout, on a poll() error (errno is left as set by poll(), e.g. EINTR),
 * or when only error/hangup events were reported.
 */
static bool poll_until_writable(int sock, int timeout_ms)
{
	struct pollfd pfd = { .fd = sock, .events = POLLOUT, .revents = 0 };
	int ready;

	if (timeout_ms < 0)
		timeout_ms = -1;

	ready = poll(&pfd, 1, timeout_ms);
	if (ready != 1)
		return false;

	return (pfd.revents & POLLOUT) != 0;
}
91+
/*
 * Wait until 'sock' has data available to read.
 *
 * timeout_ms is the maximum time to wait in milliseconds. Any negative value
 * means "wait indefinitely": callers in this file compute the remaining time
 * as (timeout - elapsed), which yields values below -1 when the overall
 * timeout is infinite, and POSIX only defines -1 as the infinite timeout.
 *
 * Returns true only if poll() reported POLLIN for the socket. Returns false
 * on timeout, on a poll() error (errno is left as set by poll(), e.g. EINTR),
 * or when only error/hangup events were reported.
 */
static bool poll_until_readable(int sock, int timeout_ms)
{
	struct pollfd pfd = { .fd = sock, .events = POLLIN, .revents = 0 };
	int ready;

	if (timeout_ms < 0)
		timeout_ms = -1;

	ready = poll(&pfd, 1, timeout_ms);
	if (ready != 1)
		return false;

	return (pfd.revents & POLLIN) != 0;
}
99+
100+ static long msec (TimestampTz timer )
101+ {
102+ long sec ;
103+ int usec ;
104+ TimestampDifference (0 , timer , & sec , & usec );
105+ return sec * 1000 + usec / 1000 ;
106+ }
107+
108+ static bool timed_write (int sock , void * data , size_t len , int timeout_ms )
109+ {
110+ TimestampTz start , now ;
111+ int sent = 0 ;
112+
113+ now = start = GetCurrentTimestamp ();
114+
115+ while (sent < len )
116+ {
117+ int newbytes ;
118+ now = GetCurrentTimestamp ();
119+ if ((timeout_ms != -1 ) && (msec (now - start ) > timeout_ms )) {
120+ elog (WARNING , "write timed out" );
121+ return false;
122+ }
123+
124+ newbytes = write (sock , (char * )data + sent , len - sent );
125+ if (newbytes == -1 )
126+ {
127+ if (errno == EAGAIN ) {
128+ if (poll_until_writable (sock , timeout_ms - msec (now - start ))) {
129+ continue ;
130+ }
131+ }
132+ elog (WARNING , "failed to write: %s" , strerror (errno ));
133+ return false;
134+ }
135+ sent += newbytes ;
136+ }
137+
138+ return true;
139+ }
140+
141+ static bool timed_read (int sock , void * data , size_t len , int timeout_ms )
142+ {
143+ int recved = 0 ;
144+ TimestampTz start , now ;
145+ now = start = GetCurrentTimestamp ();
146+
147+ while (recved < len )
148+ {
149+ int newbytes ;
150+ now = GetCurrentTimestamp ();
151+ if ((timeout_ms != -1 ) && (msec (now - start ) > timeout_ms )) {
152+ elog (WARNING , "read timed out" );
153+ return false;
154+ }
155+
156+ newbytes = read (sock , (char * )data + recved , len - recved );
157+ if (newbytes == -1 )
158+ {
159+ if (errno == EAGAIN ) {
160+ if (poll_until_readable (sock , timeout_ms - msec (now - start ))) {
161+ continue ;
162+ }
163+ }
164+ elog (WARNING , "failed to read: %s" , strerror (errno ));
165+ return false;
166+ }
167+ recved += newbytes ;
168+ }
169+
170+ return true;
171+ }
172+
173+ static bool connect_leader (int timeout_ms )
84174{
85175 struct addrinfo * addrs = NULL ;
86176 struct addrinfo hint ;
87177 char portstr [6 ];
88178 struct addrinfo * a ;
89179 int rc ;
90180
181+ TimestampTz now ;
182+ int elapsed_ms ;
183+
184+ HostPort * leaderhp ;
185+
91186 if (* shared .leader == NOBODY ) select_next_peer ();
92187
93- HostPort * leaderhp = wcfg .peers + * shared .leader ;
188+ leaderhp = wcfg .peers + * shared .leader ;
94189
95190 memset (& hint , 0 , sizeof (hint ));
96191 hint .ai_socktype = SOCK_STREAM ;
@@ -108,11 +203,13 @@ static bool connect_leader(void)
108203 }
109204
110205 fprintf (stderr , "trying [%d] %s:%d\n" , * shared .leader , leaderhp -> host , leaderhp -> port );
206+ elapsed_ms = 0 ;
207+ now = GetCurrentTimestamp ();
111208 for (a = addrs ; a != NULL ; a = a -> ai_next )
112209 {
113210 int one = 1 ;
114211
115- int sd = socket (a -> ai_family , a -> ai_socktype , a -> ai_protocol );
212+ int sd = socket (a -> ai_family , SOCK_STREAM | SOCK_NONBLOCK , 0 );
116213 if (sd == -1 )
117214 {
118215 perror ("failed to create a socket" );
@@ -122,9 +219,34 @@ static bool connect_leader(void)
122219
123220 if (connect (sd , a -> ai_addr , a -> ai_addrlen ) == -1 )
124221 {
125- perror ("failed to connect to an address" );
126- close (sd );
127- continue ;
222+ if (errno == EINPROGRESS )
223+ {
224+ while ((elapsed_ms <= timeout_ms ) || (timeout_ms == -1 ))
225+ {
226+ TimestampTz past = now ;
227+
228+ if (poll_until_writable (sd , timeout_ms - elapsed_ms ))
229+ {
230+ int err ;
231+ socklen_t optlen = sizeof (err );
232+ getsockopt (sd , SOL_SOCKET , SO_ERROR , & err , & optlen );
233+ if (err == 0 )
234+ {
235+ // success
236+ break ;
237+ }
238+ }
239+
240+ now = GetCurrentTimestamp ();
241+ elapsed_ms += msec (now - past );
242+ }
243+ }
244+ else
245+ {
246+ perror ("failed to connect to an address" );
247+ close (sd );
248+ continue ;
249+ }
128250 }
129251
130252 /* success */
@@ -138,15 +260,14 @@ static bool connect_leader(void)
138260 return false;
139261}
140262
141- static int get_connection (void )
263+ static int get_connection (int timeout_ms )
142264{
143265 if (leadersock < 0 )
144266 {
145- if (connect_leader ()) return leadersock ;
146-
147- int timeout_ms = 100 ;
148- struct timespec timeout = {0 , timeout_ms * 1000000 };
149- nanosleep (& timeout , NULL );
267+ if (connect_leader (timeout_ms )) return leadersock ;
268+ // int timeout_ms = 100;
269+ // struct timespec timeout = {0, timeout_ms * 1000000};
270+ // nanosleep(&timeout, NULL);
150271 }
151272 return leadersock ;
152273}
@@ -162,11 +283,12 @@ raftable_sql_get(PG_FUNCTION_ARGS)
162283{
163284 RaftableKey key ;
164285 size_t len ;
286+ char * s ;
165287 text_to_cstring_buffer (PG_GETARG_TEXT_P (0 ), key .data , sizeof (key .data ));
166288
167289 Assert (shared .state );
168290
169- char * s = state_get (shared .state , key .data , & len );
291+ s = state_get (shared .state , key .data , & len );
170292 if (s )
171293 {
172294 text * t = cstring_to_text_with_len (s , len );
@@ -177,54 +299,65 @@ raftable_sql_get(PG_FUNCTION_ARGS)
177299 PG_RETURN_NULL ();
178300}
179301
180- static long msec ( TimestampTz timer )
302+ static bool try_sending_update ( RaftableUpdate * ru , size_t size , int timeout_ms )
181303{
182- long sec ;
183- int usec ;
184- TimestampDifference (0 , timer , & sec , & usec );
185- return sec * 1000 + usec / 1000 ;
186- }
304+ int s , status ;
305+ TimestampTz start , now ;
187306
188- static bool try_sending_update (RaftableUpdate * ru , size_t size )
189- {
190- int s = get_connection ();
307+ now = start = GetCurrentTimestamp ();
191308
309+ s = get_connection (timeout_ms - (now - start ));
192310 if (s < 0 ) return false;
193311
194- int sent = 0 , recved = 0 ;
195- int status ;
312+ now = GetCurrentTimestamp ();
313+ if ((timeout_ms != -1 ) && (msec (now - start ) > timeout_ms ))
314+ {
315+ elog (WARNING , "update: connect() timed out" );
316+ return false;
317+ }
196318
197- if (write (s , & size , sizeof (size )) != sizeof ( size ))
319+ if (! timed_write (s , & size , sizeof (size ), timeout_ms - msec ( now - start ) ))
198320 {
199- disconnect_leader ();
200321 elog (WARNING , "failed to send the update size to the leader" );
201322 return false;
202323 }
203324
204- while (sent < size )
325+ now = GetCurrentTimestamp ();
326+ if ((timeout_ms != -1 ) && (msec (now - start ) > timeout_ms ))
205327 {
206- int newbytes = write (s , (char * )ru + sent , size - sent );
207- if (newbytes == -1 )
208- {
209- disconnect_leader ();
210- elog (WARNING , "failed to send the update to the leader" );
211- return false;
212- }
213- sent += newbytes ;
328+ elog (WARNING , "update: send(size) timed out" );
329+ return false;
214330 }
215331
216- recved = read (s , & status , sizeof (status ));
217- if (recved != sizeof (status ))
332+ if (!timed_write (s , ru , size , timeout_ms - msec (now - start )))
333+ {
334+ elog (WARNING , "failed to send the update to the leader" );
335+ return false;
336+ }
337+
338+ now = GetCurrentTimestamp ();
339+ if ((timeout_ms != -1 ) && (msec (now - start ) > timeout_ms ))
340+ {
341+ elog (WARNING , "update: send(body) timed out" );
342+ return false;
343+ }
344+
345+ if (!timed_read (s , & status , sizeof (status ), timeout_ms - msec (now - start )))
218346 {
219- disconnect_leader ();
220347 elog (WARNING , "failed to recv the update status from the leader" );
221348 return false;
222349 }
223350
351+ now = GetCurrentTimestamp ();
352+ if ((timeout_ms != -1 ) && (msec (now - start ) > timeout_ms ))
353+ {
354+ elog (WARNING , "update: recv(status) timed out" );
355+ return false;
356+ }
357+
224358 if (status != 1 )
225359 {
226- disconnect_leader ();
227- elog (WARNING , "leader returned %d" , status );
360+ elog (WARNING , "update: leader returned status = %d" , status );
228361 return false;
229362 }
230363
@@ -233,6 +366,7 @@ static bool try_sending_update(RaftableUpdate *ru, size_t size)
233366
234367bool raftable_set (const char * key , const char * value , size_t vallen , int timeout_ms )
235368{
369+ RaftableField * f ;
236370 RaftableUpdate * ru ;
237371 size_t size = sizeof (RaftableUpdate );
238372 size_t keylen = 0 ;
@@ -251,7 +385,7 @@ bool raftable_set(const char *key, const char *value, size_t vallen, int timeout
251385 ru -> expector = wcfg .id ;
252386 ru -> fieldnum = 1 ;
253387
254- RaftableField * f = (RaftableField * )ru -> data ;
388+ f = (RaftableField * )ru -> data ;
255389 f -> keylen = keylen ;
256390 f -> vallen = vallen ;
257391 memcpy (f -> data , key , keylen );
@@ -262,17 +396,21 @@ bool raftable_set(const char *key, const char *value, size_t vallen, int timeout
262396 while ((elapsed_ms <= timeout_ms ) || (timeout_ms == -1 ))
263397 {
264398 TimestampTz past = now ;
265- if (try_sending_update (ru , size ))
399+ if (try_sending_update (ru , size , timeout_ms - elapsed_ms ))
266400 {
267401 pfree (ru );
268402 return true;
269403 }
404+ else
405+ {
406+ disconnect_leader ();
407+ }
270408 now = GetCurrentTimestamp ();
271409 elapsed_ms += msec (now - past );
272410 }
273411
274412 pfree (ru );
275- elog (WARNING , "failed to set raftable value after %d ms" , timeout_ms );
413+ elog (WARNING , "failed to set raftable value after %d ms" , elapsed_ms );
276414 return false;
277415}
278416
0 commit comments