1212#include "access/htup_details.h"
1313#include "miscadmin.h"
1414#include "funcapi.h"
15+ #include "utils/timestamp.h"
1516
1617#include "raft.h"
1718#include "util.h"
@@ -54,9 +55,19 @@ static void *get_shared_state(void)
5455
5556static void select_next_peer (void )
5657{
57- do {
58- * shared .leader = (* shared .leader + 1 ) % RAFTABLE_PEERS_MAX ;
59- } while (!wcfg .peers [* shared .leader ].up );
58+ int orig_leader = * shared .leader ;
59+ int i ;
60+ for (i = 0 ; i < RAFTABLE_PEERS_MAX ; i ++ )
61+ {
62+ int idx = (orig_leader + i + 1 ) % RAFTABLE_PEERS_MAX ;
63+ HostPort * hp = wcfg .peers + idx ;
64+ if (hp -> up )
65+ {
66+ * shared .leader = idx ;
67+ return ;
68+ }
69+ }
70+ elog (WARNING , "all raftable peers down" );
6071}
6172
6273static void disconnect_leader (void )
@@ -129,56 +140,116 @@ static bool connect_leader(void)
129140
130141static int get_connection (void )
131142{
132- while (leadersock < 0 )
143+ if (leadersock < 0 )
133144 {
134- if (connect_leader ()) break ;
145+ if (connect_leader ()) return leadersock ;
135146
136- int timeout_ms = 1000 ;
147+ int timeout_ms = 100 ;
137148 struct timespec timeout = {0 , timeout_ms * 1000000 };
138149 nanosleep (& timeout , NULL );
139150 }
140151 return leadersock ;
141152}
142153
143- char * raftable_get (char * key )
154+ char * raftable_get (const char * key , size_t * len )
144155{
145- return state_get (shared .state , key );
156+ return state_get (shared .state , key , len );
146157}
147158
148159Datum
149160raftable_sql_get (PG_FUNCTION_ARGS )
150161{
151162 RaftableEntry * e ;
152163 RaftableKey key ;
164+ size_t len ;
153165 text_to_cstring_buffer (PG_GETARG_TEXT_P (0 ), key .data , sizeof (key .data ));
154166
155167 Assert (shared .state );
156168
157- char * s = state_get (shared .state , key .data );
169+ char * s = state_get (shared .state , key .data , & len );
158170 if (s )
159171 {
160- text * t = cstring_to_text ( s );
172+ text * t = cstring_to_text_with_len ( s , len );
161173 pfree (s );
162174 PG_RETURN_TEXT_P (t );
163175 }
164176 else
165177 PG_RETURN_NULL ();
166178}
167179
168- bool raftable_set ( char * key , char * value , int tries )
180+ static void start_timer ( TimestampTz * timer )
169181{
170- RaftableUpdate * ru ;
171- size_t size = sizeof (RaftableUpdate );
172- int keylen , vallen = 0 ;
173- bool ok = false;
182+ * timer -= GetCurrentTimestamp ();
183+ }
184+
185+ static void stop_timer (TimestampTz * timer )
186+ {
187+ * timer += GetCurrentTimestamp ();
188+ }
189+
190+ static long msec (TimestampTz timer )
191+ {
192+ long sec ;
193+ int usec ;
194+ TimestampDifference (0 , timer , & sec , & usec );
195+ return sec * 1000 + usec / 1000 ;
196+ }
197+
198+ static bool try_sending_update (RaftableUpdate * ru , size_t size )
199+ {
200+ int s = get_connection ();
174201
175- if (tries <= 0 )
202+ if (s < 0 ) return false;
203+
204+ int sent = 0 , recved = 0 ;
205+ int status ;
206+
207+ if (write (s , & size , sizeof (size )) != sizeof (size ))
208+ {
209+ disconnect_leader ();
210+ elog (WARNING , "failed to send the update size to the leader" );
211+ return false;
212+ }
213+
214+ while (sent < size )
176215 {
177- elog (ERROR , "raftable set should be called with 'tries' > 0" );
216+ int newbytes = write (s , (char * )ru + sent , size - sent );
217+ if (newbytes == -1 )
218+ {
219+ disconnect_leader ();
220+ elog (WARNING , "failed to send the update to the leader" );
221+ return false;
222+ }
223+ sent += newbytes ;
178224 }
179225
226+ recved = read (s , & status , sizeof (status ));
227+ if (recved != sizeof (status ))
228+ {
229+ disconnect_leader ();
230+ elog (WARNING , "failed to recv the update status from the leader" );
231+ return false;
232+ }
233+
234+ if (status != 1 )
235+ {
236+ disconnect_leader ();
237+ elog (WARNING , "leader returned %d" , status );
238+ return false;
239+ }
240+
241+ return true;
242+ }
243+
244+ bool raftable_set (const char * key , const char * value , size_t vallen , int timeout_ms )
245+ {
246+ RaftableUpdate * ru ;
247+ size_t size = sizeof (RaftableUpdate );
248+ size_t keylen = 0 ;
249+ TimestampTz now ;
250+ int elapsed_ms ;
251+
180252 keylen = strlen (key ) + 1 ;
181- if (value ) vallen = strlen (value ) + 1 ;
182253
183254 size += sizeof (RaftableField ) - 1 ;
184255 size += keylen ;
@@ -194,84 +265,54 @@ bool raftable_set(char *key, char *value, int tries)
194265 memcpy (f -> data , key , keylen );
195266 memcpy (f -> data + keylen , value , vallen );
196267
197- tryagain :
198- if (tries -- )
268+ elapsed_ms = 0 ;
269+ now = GetCurrentTimestamp ();
270+ while ((elapsed_ms <= timeout_ms ) || (timeout_ms == -1 ))
199271 {
200- int s = get_connection ();
201- int sent = 0 , recved = 0 ;
202- int status ;
203-
204- if (write (s , & size , sizeof (size )) != sizeof (size ))
272+ TimestampTz past = now ;
273+ if (try_sending_update (ru , size ))
205274 {
206- disconnect_leader ();
207- elog (WARNING , "failed[%d] to send the update size to the leader" , tries );
208- goto tryagain ;
209- }
210-
211- while (sent < size )
212- {
213- int newbytes = write (s , (char * )ru + sent , size - sent );
214- if (newbytes == -1 )
215- {
216- disconnect_leader ();
217- elog (WARNING , "failed[%d] to send the update to the leader" , tries );
218- goto tryagain ;
219- }
220- sent += newbytes ;
221- }
222-
223- recved = read (s , & status , sizeof (status ));
224- if (recved != sizeof (status ))
225- {
226- disconnect_leader ();
227- elog (WARNING , "failed to recv the update status from the leader\n" );
228- goto tryagain ;
275+ pfree (ru );
276+ return true;
229277 }
230- goto success ;
231- }
232- else
233- {
234- goto failure ;
278+ now = GetCurrentTimestamp ();
279+ elapsed_ms += msec (now - past );
235280 }
236281
237- failure :
238- elog (WARNING , "failed all tries to set raftable value\n" );
239282 pfree (ru );
283+ elog (WARNING , "failed to set raftable value after %d ms" , timeout_ms );
240284 return false;
241-
242- success :
243- pfree (ru );
244- return true;
245285}
246286
247287Datum
248288raftable_sql_set (PG_FUNCTION_ARGS )
249289{
250290 char * key = text_to_cstring (PG_GETARG_TEXT_P (0 ));
251- int tries = PG_GETARG_INT32 (2 );
291+ int timeout_ms = PG_GETARG_INT32 (2 );
252292 if (PG_ARGISNULL (1 ))
253- raftable_set (key , NULL , tries );
293+ raftable_set (key , NULL , 0 , timeout_ms );
254294 else
255295 {
256296 char * value = text_to_cstring (PG_GETARG_TEXT_P (1 ));
257- raftable_set (key , value , tries );
297+ raftable_set (key , value , strlen ( value ), timeout_ms );
258298 pfree (value );
259299 }
260300 pfree (key );
261301
262302 PG_RETURN_VOID ();
263303}
264304
265- void raftable_every (void (* func )(char * , char * , void * ), void * arg )
305+ void raftable_every (void (* func )(const char * , const char * , size_t , void * ), void * arg )
266306{
267307 void * scan ;
268308 char * key , * value ;
309+ size_t len ;
269310 Assert (shared .state );
270311
271312 scan = state_scan (shared .state );
272- while (state_next (shared .state , scan , & key , & value ))
313+ while (state_next (shared .state , scan , & key , & value , & len ))
273314 {
274- func (key , value , arg );
315+ func (key , value , len , arg );
275316 pfree (key );
276317 pfree (value );
277318 }
@@ -281,6 +322,7 @@ Datum
281322raftable_sql_list (PG_FUNCTION_ARGS )
282323{
283324 char * key , * value ;
325+ size_t len ;
284326 FuncCallContext * funcctx ;
285327 MemoryContext oldcontext ;
286328
@@ -309,14 +351,14 @@ raftable_sql_list(PG_FUNCTION_ARGS)
309351
310352 funcctx = SRF_PERCALL_SETUP ();
311353
312- if (state_next (shared .state , funcctx -> user_fctx , & key , & value ))
354+ if (state_next (shared .state , funcctx -> user_fctx , & key , & value , & len ))
313355 {
314356 HeapTuple tuple ;
315357 Datum vals [2 ];
316358 bool isnull [2 ];
317359
318- vals [0 ] = CStringGetTextDatum ( key );
319- vals [1 ] = CStringGetTextDatum ( value );
360+ vals [0 ] = PointerGetDatum ( cstring_to_text ( key ) );
361+ vals [1 ] = PointerGetDatum ( cstring_to_text_with_len ( value , len ) );
320362 isnull [0 ] = isnull [1 ] = false;
321363
322364 tuple = heap_form_tuple (funcctx -> tuple_desc , vals , isnull );
0 commit comments