1212#include "access/htup_details.h"
1313#include "miscadmin.h"
1414#include "funcapi.h"
15- #include "utils/timestamp.h"
1615
1716#include "raft.h"
1817#include "util.h"
1918
2019#include "raftable.h"
2120#include "worker.h"
2221#include "state.h"
22+ #include "timeout.h"
2323
2424#include <poll.h>
2525#include <sys/socket.h>
@@ -81,42 +81,33 @@ static void disconnect_leader(void)
8181 leadersock = -1 ;
8282}
8383
84- static bool poll_until_writable (int sock , int timeout_ms )
84+
85+ static bool poll_until_writable (int sock , timeout_t * timeout )
8586{
8687 struct pollfd pfd = {sock , POLLOUT , 0 };
87- int r = poll (& pfd , 1 , timeout_ms );
88+ int r = poll (& pfd , 1 , timeout_remaining_ms ( timeout ) );
8889 if (r != 1 ) return false;
8990 return (pfd .revents & POLLOUT ) != 0 ;
9091}
9192
92- static bool poll_until_readable (int sock , int timeout_ms )
93+ static bool poll_until_readable (int sock , timeout_t * timeout )
9394{
9495 struct pollfd pfd = {sock , POLLIN , 0 };
95- int r = poll (& pfd , 1 , timeout_ms );
96+ int remain = timeout_remaining_ms (timeout );
97+ int r = poll (& pfd , 1 , remain );
9698 if (r != 1 ) return false;
9799 return (pfd .revents & POLLIN ) != 0 ;
98100}
99101
100- static long msec (TimestampTz timer )
101- {
102- long sec ;
103- int usec ;
104- TimestampDifference (0 , timer , & sec , & usec );
105- return sec * 1000 + usec / 1000 ;
106- }
107-
108- static bool timed_write (int sock , void * data , size_t len , int timeout_ms )
102+ static bool timed_write (int sock , void * data , size_t len , timeout_t * timeout )
109103{
110- TimestampTz start , now ;
111104 int sent = 0 ;
112105
113- now = start = GetCurrentTimestamp ();
114-
115106 while (sent < len )
116107 {
117108 int newbytes ;
118- now = GetCurrentTimestamp ();
119- if (( timeout_ms != -1 ) && ( msec ( now - start ) > timeout_ms )) {
109+ if ( timeout_happened ( timeout ))
110+ {
120111 elog (WARNING , "write timed out" );
121112 return false;
122113 }
@@ -125,12 +116,11 @@ static bool timed_write(int sock, void *data, size_t len, int timeout_ms)
125116 if (newbytes == -1 )
126117 {
127118 if (errno == EAGAIN ) {
128- int remaining_ms = (timeout_ms == -1 ) ? -1 : timeout_ms - msec (now - start );
129- if (poll_until_writable (sock , remaining_ms )) {
119+ if (poll_until_writable (sock , timeout )) {
130120 continue ;
131121 }
132122 }
133- elog (WARNING , "failed to write: %s" , strerror (errno ));
123+ elog (WARNING , "failed to write: error %d: %s" , errno , strerror (errno ));
134124 return false;
135125 }
136126 sent += newbytes ;
@@ -139,17 +129,15 @@ static bool timed_write(int sock, void *data, size_t len, int timeout_ms)
139129 return true;
140130}
141131
142- static bool timed_read (int sock , void * data , size_t len , int timeout_ms )
132+ static bool timed_read (int sock , void * data , size_t len , timeout_t * timeout )
143133{
144134 int recved = 0 ;
145- TimestampTz start , now ;
146- now = start = GetCurrentTimestamp ();
147135
148136 while (recved < len )
149137 {
150138 int newbytes ;
151- now = GetCurrentTimestamp ();
152- if (( timeout_ms != -1 ) && ( msec ( now - start ) > timeout_ms )) {
139+ if ( timeout_happened ( timeout ))
140+ {
153141 elog (WARNING , "read timed out" );
154142 return false;
155143 }
@@ -158,12 +146,11 @@ static bool timed_read(int sock, void *data, size_t len, int timeout_ms)
158146 if (newbytes == -1 )
159147 {
160148 if (errno == EAGAIN ) {
161- int remaining_ms = (timeout_ms == -1 ) ? -1 : timeout_ms - msec (now - start );
162- if (poll_until_readable (sock , remaining_ms )) {
149+ if (poll_until_readable (sock , timeout )) {
163150 continue ;
164151 }
165152 }
166- elog (WARNING , "failed to read: %s" , strerror (errno ));
153+ elog (WARNING , "failed to read: error %d: %s" , errno , strerror (errno ));
167154 return false;
168155 }
169156 recved += newbytes ;
@@ -172,16 +159,14 @@ static bool timed_read(int sock, void *data, size_t len, int timeout_ms)
172159 return true;
173160}
174161
175- static bool connect_leader (int timeout_ms )
162+ static bool connect_leader (timeout_t * timeout )
176163{
177164 struct addrinfo * addrs = NULL ;
178165 struct addrinfo hint ;
179166 char portstr [6 ];
180167 struct addrinfo * a ;
181168 int rc ;
182-
183- TimestampTz now ;
184- int elapsed_ms ;
169+ int sd ;
185170
186171 HostPort * leaderhp ;
187172
@@ -198,23 +183,21 @@ static bool connect_leader(int timeout_ms)
198183 if ((rc = getaddrinfo (leaderhp -> host , portstr , & hint , & addrs )))
199184 {
200185 disconnect_leader ();
201- fprintf ( stderr , "failed to resolve address '%s:%d': %s" ,
202- leaderhp -> host , leaderhp -> port ,
203- gai_strerror (rc ));
186+ elog ( WARNING , "failed to resolve address '%s:%d': %s" ,
187+ leaderhp -> host , leaderhp -> port ,
188+ gai_strerror (rc ));
204189 return false;
205190 }
206191
207- fprintf (stderr , "trying [%d] %s:%d\n" , * shared .leader , leaderhp -> host , leaderhp -> port );
208- elapsed_ms = 0 ;
209- now = GetCurrentTimestamp ();
192+ elog (WARNING , "trying [%d] %s:%d" , * shared .leader , leaderhp -> host , leaderhp -> port );
210193 for (a = addrs ; a != NULL ; a = a -> ai_next )
211194 {
212195 int one = 1 ;
213196
214- int sd = socket (a -> ai_family , SOCK_STREAM | SOCK_NONBLOCK , 0 );
197+ sd = socket (a -> ai_family , SOCK_STREAM | SOCK_NONBLOCK , 0 );
215198 if (sd == -1 )
216199 {
217- perror ( "failed to create a socket" );
200+ elog ( WARNING , "failed to create a socket: %s" , strerror ( errno ) );
218201 continue ;
219202 }
220203 setsockopt (sd , IPPROTO_TCP , TCP_NODELAY , & one , sizeof (one ));
@@ -223,54 +206,54 @@ static bool connect_leader(int timeout_ms)
223206 {
224207 if (errno == EINPROGRESS )
225208 {
226- while (( elapsed_ms <= timeout_ms ) || ( timeout_ms == -1 ))
209+ TIMEOUT_LOOP_START ( timeout );
227210 {
228- TimestampTz past = now ;
229- int remaining_ms = (timeout_ms == -1 ) ? -1 : timeout_ms - elapsed_ms ;
230-
231- if (poll_until_writable (sd , remaining_ms ))
211+ if (poll_until_writable (sd , timeout ))
232212 {
233213 int err ;
234214 socklen_t optlen = sizeof (err );
235215 getsockopt (sd , SOL_SOCKET , SO_ERROR , & err , & optlen );
236- if (err == 0 )
237- {
238- // success
239- break ;
240- }
216+ if (err == 0 ) goto success ;
241217 }
242-
243- now = GetCurrentTimestamp ();
244- elapsed_ms += msec (now - past );
245218 }
219+ TIMEOUT_LOOP_END (timeout );
220+ elog (WARNING , "connect timed out" );
221+ goto failure ;
246222 }
247223 else
248224 {
249- perror ( "failed to connect to an address" );
225+ elog ( WARNING , "failed to connect to an address: %s" , strerror ( errno ) );
250226 close (sd );
251227 continue ;
252228 }
253229 }
254230
255- /* success */
256- freeaddrinfo (addrs );
257- leadersock = sd ;
258- return true;
231+ goto success ;
259232 }
233+ failure :
260234 freeaddrinfo (addrs );
261235 disconnect_leader ();
262- fprintf ( stderr , "could not connect\n " );
236+ elog ( WARNING , "could not connect" );
263237 return false;
238+ success :
239+ freeaddrinfo (addrs );
240+ leadersock = sd ;
241+ return true;
242+ }
243+
244+ static void wait_ms (int ms )
245+ {
246+ struct timespec ts = {0 , ms * 1000000 };
247+ nanosleep (& ts , NULL );
264248}
265249
266- static int get_connection (int timeout_ms )
250+ static int get_connection (timeout_t * timeout )
267251{
268252 if (leadersock < 0 )
269253 {
270- if (connect_leader (timeout_ms )) return leadersock ;
271- // int timeout_ms = 100;
272- // struct timespec timeout = {0, timeout_ms * 1000000};
273- // nanosleep(&timeout, NULL);
254+ if (connect_leader (timeout )) return leadersock ;
255+ elog (WARNING , "update: connect_leader() failed" );
256+ wait_ms (100 );
274257 }
275258 return leadersock ;
276259}
@@ -302,66 +285,37 @@ raftable_sql_get(PG_FUNCTION_ARGS)
302285 PG_RETURN_NULL ();
303286}
304287
305- static bool try_sending_update (RaftableUpdate * ru , size_t size , int timeout_ms )
288+ static bool try_sending_update (RaftableUpdate * ru , size_t size , timeout_t * timeout )
306289{
307- int s , status , remaining_ms ;
308- TimestampTz start , now ;
290+ int s , status ;
309291
310- now = start = GetCurrentTimestamp ();
311-
312- s = get_connection (timeout_ms - (now - start ));
292+ s = get_connection (timeout );
313293 if (s < 0 ) return false;
314294
315- now = GetCurrentTimestamp ();
316- remaining_ms = (timeout_ms == -1 ) ? -1 : timeout_ms - msec (now - start );
317- if ((timeout_ms != -1 ) && (msec (now - start ) > timeout_ms ))
295+ if (timeout_happened (timeout ))
318296 {
319- elog (WARNING , "update: connect () timed out" );
297+ elog (WARNING , "update: get_connection () timed out" );
320298 return false;
321299 }
322300
323- if (!timed_write (s , & size , sizeof (size ), remaining_ms ))
301+ if (!timed_write (s , & size , sizeof (size ), timeout ))
324302 {
325303 elog (WARNING , "failed to send the update size to the leader" );
326304 return false;
327305 }
328306
329- now = GetCurrentTimestamp ();
330- remaining_ms = (timeout_ms == -1 ) ? -1 : timeout_ms - msec (now - start );
331- if ((timeout_ms != -1 ) && (msec (now - start ) > timeout_ms ))
332- {
333- elog (WARNING , "update: send(size) timed out" );
334- return false;
335- }
336-
337- if (!timed_write (s , ru , size , remaining_ms ))
307+ if (!timed_write (s , ru , size , timeout ))
338308 {
339309 elog (WARNING , "failed to send the update to the leader" );
340310 return false;
341311 }
342312
343- now = GetCurrentTimestamp ();
344- remaining_ms = (timeout_ms == -1 ) ? -1 : timeout_ms - msec (now - start );
345- if ((timeout_ms != -1 ) && (msec (now - start ) > timeout_ms ))
346- {
347- elog (WARNING , "update: send(body) timed out" );
348- return false;
349- }
350-
351- if (!timed_read (s , & status , sizeof (status ), remaining_ms ))
313+ if (!timed_read (s , & status , sizeof (status ), timeout ))
352314 {
353315 elog (WARNING , "failed to recv the update status from the leader" );
354316 return false;
355317 }
356318
357- now = GetCurrentTimestamp ();
358- remaining_ms = (timeout_ms == -1 ) ? -1 : timeout_ms - msec (now - start );
359- if ((timeout_ms != -1 ) && (msec (now - start ) > timeout_ms ))
360- {
361- elog (WARNING , "update: recv(status) timed out" );
362- return false;
363- }
364-
365319 if (status != 1 )
366320 {
367321 elog (WARNING , "update: leader returned status = %d" , status );
@@ -377,8 +331,8 @@ bool raftable_set(const char *key, const char *value, size_t vallen, int timeout
377331 RaftableUpdate * ru ;
378332 size_t size = sizeof (RaftableUpdate );
379333 size_t keylen = 0 ;
380- TimestampTz now ;
381- int elapsed_ms ;
334+ timeout_t timeout ;
335+ timeout_start ( & timeout , timeout_ms ) ;
382336
383337 Assert (wcfg .id >= 0 );
384338
@@ -398,27 +352,20 @@ bool raftable_set(const char *key, const char *value, size_t vallen, int timeout
398352 memcpy (f -> data , key , keylen );
399353 memcpy (f -> data + keylen , value , vallen );
400354
401- elapsed_ms = 0 ;
402- now = GetCurrentTimestamp ();
403- while ((elapsed_ms <= timeout_ms ) || (timeout_ms == -1 ))
355+ TIMEOUT_LOOP_START (& timeout );
404356 {
405- TimestampTz past = now ;
406- int remaining_ms = (timeout_ms == -1 ) ? -1 : timeout_ms - elapsed_ms ;
407- if (try_sending_update (ru , size , remaining_ms ))
357+ if (try_sending_update (ru , size , & timeout ))
408358 {
409359 pfree (ru );
410360 return true;
411361 }
412362 else
413- {
414363 disconnect_leader ();
415- }
416- now = GetCurrentTimestamp ();
417- elapsed_ms += msec (now - past );
418364 }
365+ TIMEOUT_LOOP_END (& timeout );
419366
420367 pfree (ru );
421- elog (WARNING , "failed to set raftable value after %d ms" , elapsed_ms );
368+ elog (WARNING , "failed to set raftable value after %d ms" , timeout_elapsed_ms ( & timeout ) );
422369 return false;
423370}
424371
0 commit comments