66#include <unistd.h>
77#include <stdlib.h>
88#include <assert.h>
9+ #include <time.h>
910
1011#include "libdtm.h"
1112#include "dtmd/include/proto.h"
13+ #include "dtmd/include/dtmdlimits.h"
1214#include "sockhub/sockhub.h"
1315
1416#ifdef TEST
@@ -24,23 +26,36 @@ typedef struct DTMConnData *DTMConn;
2426
2527typedef struct DTMConnData
2628{
29+ char * host ; // use unix socket if host is NULL
30+ int port ;
2731 int sock ;
2832} DTMConnData ;
2933
30- static char * dtmhost = NULL ;
31- static int dtmport = 0 ;
32- static char * dtm_unix_sock_dir ;
34+ static bool connected = false;
35+ static int leader = 0 ;
36+ static int connum = 0 ;
37+ static DTMConnData conns [MAX_SERVERS ];
38+ static char * dtm_unix_sock_dir ;
3339
3440typedef unsigned xid_t ;
3541
42+ static void DiscardConnection ()
43+ {
44+ connected = false;
45+ leader = (leader + 1 ) % connum ;
46+ fprintf (stderr , "next candidate is %s:%d\n" , conns [leader ].host , conns [leader ].port );
47+ }
48+
3649// Connects to the specified DTM.
37- static DTMConn DtmConnect (char * host , int port )
50+ static bool DtmConnect (DTMConn conn )
3851{
39- DTMConn dtm ;
4052 int sd ;
4153
42- if (host == NULL )
54+ if (conn -> host == NULL )
4355 {
56+ perror ("unix socket not supported yet" );
57+ * (int * )0 = 0 ;
58+ /*
4459 // use a UNIX socket
4560 struct sockaddr sock;
4661 int len = offsetof(struct sockaddr, sa_data) + snprintf(sock.sa_data, sizeof(sock.sa_data), "%s/p%u", dtm_unix_sock_dir, port);
@@ -49,17 +64,20 @@ static DTMConn DtmConnect(char *host, int port)
4964 sd = socket(AF_UNIX, SOCK_STREAM, 0);
5065 if (sd == -1)
5166 {
67+ DiscardConnection();
5268 perror("failed to create a unix socket");
69+ return false;
5370 }
5471 if (connect(sd, &sock, len) == -1)
5572 {
73+ DiscardConnection();
5674 perror("failed to connect to the address");
5775 close(sd);
58- return NULL ;
76+ return false ;
5977 }
60- dtm = malloc ( sizeof ( DTMConnData )) ;
61- dtm -> sock = sd ;
62- return dtm ;
78+ conn->sock = sd ;
79+ return (connected = true) ;
80+ */
6381 }
6482 else
6583 {
@@ -72,27 +90,16 @@ static DTMConn DtmConnect(char *host, int port)
7290 memset (& hint , 0 , sizeof (hint ));
7391 hint .ai_socktype = SOCK_STREAM ;
7492 hint .ai_family = AF_INET ;
75- snprintf (portstr , 6 , "%d" , port );
93+ snprintf (portstr , 6 , "%d" , conn -> port );
7694 hint .ai_protocol = getprotobyname ("tcp" )-> p_proto ;
7795
78- while (1 )
96+ while (true )
7997 {
80- char * sep = strchr (host , ',' );
81- if (sep != NULL )
82- {
83- * sep = '\0' ;
84- }
85- if (getaddrinfo (host , portstr , & hint , & addrs ))
98+ if (getaddrinfo (conn -> host , portstr , & hint , & addrs ))
8699 {
100+ DiscardConnection ();
87101 perror ("failed to resolve address" );
88- if (sep == NULL )
89- {
90- return NULL ;
91- }
92- else
93- {
94- goto TryNextHost ;
95- }
102+ return false;
96103 }
97104
98105 for (a = addrs ; a != NULL ; a = a -> ai_next )
@@ -102,39 +109,28 @@ static DTMConn DtmConnect(char *host, int port)
102109 if (sd == -1 )
103110 {
104111 perror ("failed to create a socket" );
105- goto TryNextHost ;
112+ continue ;
106113 }
107114 setsockopt (sd , IPPROTO_TCP , TCP_NODELAY , & one , sizeof (one ));
108115
109116 if (connect (sd , a -> ai_addr , a -> ai_addrlen ) == -1 )
110117 {
111118 perror ("failed to connect to an address" );
112119 close (sd );
113- goto TryNextHost ;
120+ continue ;
114121 }
115122
116123 // success
117124 freeaddrinfo (addrs );
118- dtm = malloc (sizeof (DTMConnData ));
119- dtm -> sock = sd ;
120- if (sep != NULL )
121- {
122- * sep = ',' ;
123- }
124- return dtm ;
125+ conn -> sock = sd ;
126+ return (connected = true);
125127 }
126128 freeaddrinfo (addrs );
127- TryNextHost :
128- if (sep == NULL )
129- {
130- break ;
131- }
132- * sep = ',' ;
133- host = sep + 1 ;
134129 }
135130 }
131+ DiscardConnection ();
136132 fprintf (stderr , "could not connect\n" );
137- return NULL ;
133+ return false ;
138134}
139135
140136static int dtm_recv_results (DTMConn dtm , int maxlen , xid_t * results )
@@ -150,11 +146,13 @@ static int dtm_recv_results(DTMConn dtm, int maxlen, xid_t *results)
150146 int newbytes = read (dtm -> sock , (char * )& msg + recved , needed - recved );
151147 if (newbytes == -1 )
152148 {
149+ DiscardConnection ();
153150 elog (ERROR , "Failed to recv results header from arbiter" );
154151 return 0 ;
155152 }
156153 if (newbytes == 0 )
157154 {
155+ DiscardConnection ();
158156 elog (ERROR , "Arbiter closed connection during recv" );
159157 return 0 ;
160158 }
@@ -174,11 +172,13 @@ static int dtm_recv_results(DTMConn dtm, int maxlen, xid_t *results)
174172 int newbytes = read (dtm -> sock , (char * )results + recved , needed - recved );
175173 if (newbytes == -1 )
176174 {
175+ DiscardConnection ();
177176 elog (ERROR , "Failed to recv results body from arbiter" );
178177 return 0 ;
179178 }
180179 if (newbytes == 0 )
181180 {
181+ DiscardConnection ();
182182 elog (ERROR , "Arbiter closed connection during recv" );
183183 return 0 ;
184184 }
@@ -223,6 +223,7 @@ static bool dtm_send_command(DTMConn dtm, xid_t cmd, int argc, ...)
223223 int newbytes = write (dtm -> sock , buf + sent , datasize - sent );
224224 if (newbytes == -1 )
225225 {
226+ DiscardConnection ();
226227 elog (ERROR , "Failed to send a command to arbiter" );
227228 return false;
228229 }
@@ -231,39 +232,75 @@ static bool dtm_send_command(DTMConn dtm, xid_t cmd, int argc, ...)
231232 return true;
232233}
233234
234- void DtmGlobalConfig (char * host , int port , char * sock_dir ) {
235- if (dtmhost )
236- {
237- free (dtmhost );
238- dtmhost = NULL ;
235+ void DtmGlobalConfig (char * servers , char * sock_dir )
236+ {
237+ char * hstate , * pstate ;
238+ char * hostport , * host , * portstr ;
239+ int port ;
240+
241+ while (connum -- > 0 ) {
242+ if (conns [connum ].host )
243+ free (conns [connum ].host );
239244 }
240- if (host )
245+
246+ hostport = strtok_r (servers , " " , & hstate );
247+ while (hostport )
241248 {
242- dtmhost = strdup (host );
249+ //fprintf(stderr, "hostport = '%s'\n", hostport); sleep(1);
250+ host = strtok_r (hostport , ":" , & pstate );
251+ //fprintf(stderr, "host = '%s'\n", hostport); sleep(1);
252+ if (!host ) break ;
253+
254+ portstr = strtok_r (NULL , ":" , & pstate );
255+ //fprintf(stderr, "portstr = '%s'\n", portstr); sleep(1);
256+ if (portstr )
257+ port = atoi (portstr );
258+ else
259+ port = 5431 ;
260+ //fprintf(stderr, "host = %d\n", port); sleep(1);
261+
262+ if (!sock_dir ) {
263+ conns [connum ].host = strdup (host );
264+ } else {
265+ conns [connum ].host = NULL ;
266+ }
267+ conns [connum ].port = port ;
268+ connum ++ ;
269+
270+ hostport = strtok_r (NULL , " " , & hstate );
243271 }
244- dtmport = port ;
272+
245273 dtm_unix_sock_dir = sock_dir ;
246274}
247275
248276static DTMConn GetConnection ()
249277{
250- static DTMConn dtm = NULL ;
251- if ( dtm == NULL )
278+ int tries = 3 * connum ;
279+ while (! connected && ( tries > 0 ) )
252280 {
253- dtm = DtmConnect ( dtmhost , dtmport ) ;
254- if (dtm == NULL )
281+ DTMConn c = conns + leader ;
282+ if (! DtmConnect ( c ) )
255283 {
256- if (dtmhost )
284+ int timeout_ms = 100 ;
285+ struct timespec timeout = {0 , timeout_ms * 1000000 };
286+ nanosleep (& timeout , NULL );
287+
288+ tries -- ;
289+ if (c -> host )
257290 {
258- elog (ERROR , "Failed to connect to DTMD at tcp %s:%d" , dtmhost , dtmport );
291+ elog (ERROR , "Failed to connect to DTMD at tcp %s:%d" , c -> host , c -> port );
259292 }
260293 else
261294 {
262- elog (ERROR , "Failed to connect to DTMD at unix %d" , dtmport );
295+ elog (ERROR , "Failed to connect to DTMD at unix %d" , c -> port );
263296 }
264297 }
265298 }
266- return dtm ;
299+ if (!tries )
300+ {
301+ return NULL ;
302+ }
303+ return conns + leader ;
267304}
268305
269306void DtmInitSnapshot (Snapshot snapshot )
@@ -333,6 +370,7 @@ TransactionId DtmGlobalStartTransaction(Snapshot snapshot, TransactionId *gxmin)
333370
334371 return xid ;
335372failure :
373+ DiscardConnection ();
336374 fprintf (stderr , "DtmGlobalStartTransaction: transaction failed to start\n" );
337375 return INVALID_XID ;
338376}
@@ -368,6 +406,7 @@ void DtmGlobalGetSnapshot(TransactionId xid, Snapshot snapshot, TransactionId *g
368406
369407 return ;
370408failure :
409+ DiscardConnection ();
371410 fprintf (
372411 stderr ,
373412 "DtmGlobalGetSnapshot: failed to"
@@ -414,6 +453,7 @@ XidStatus DtmGlobalSetTransStatus(TransactionId xid, XidStatus status, bool wait
414453 goto failure ;
415454 }
416455failure :
456+ DiscardConnection ();
417457 fprintf (
418458 stderr ,
419459 "DtmGlobalSetTransStatus: failed to vote"
@@ -453,6 +493,7 @@ XidStatus DtmGlobalGetTransStatus(TransactionId xid, bool wait)
453493 goto failure ;
454494 }
455495failure :
496+ DiscardConnection ();
456497 fprintf (
457498 stderr ,
458499 "DtmGlobalGetTransStatus: failed to get"
@@ -491,6 +532,7 @@ int DtmGlobalReserve(TransactionId xid, int nXids, TransactionId *first)
491532 Assert (count >= nXids );
492533 return count ;
493534failure :
535+ DiscardConnection ();
494536 fprintf (
495537 stderr ,
496538 "DtmGlobalReserve: failed to reserve"
0 commit comments