5151#include "multimaster.h"
5252
5353#define MAX_CONNECT_ATTEMPTS 10
54- #define TX_BUFFER_SIZE 1024
54+ #define BUFFER_SIZE 1024
55+ #define BUFFER_SIZE 1024
5556
5657typedef struct
5758{
@@ -61,12 +62,11 @@ typedef struct
6162
6263typedef struct
6364{
64- DtmCommitMessage data [TX_BUFFER_SIZE ];
65+ DtmCommitMessage data [BUFFER_SIZE ];
6566 int used ;
66- } DtmTxBuffer ;
67+ } DtmBuffer ;
6768
6869static int * sockets ;
69- static DtmTxBuffer * txBuffers ;
7070
7171static BackgroundWorker DtmSender = {
7272 "mm-sender" ,
@@ -115,6 +115,34 @@ static int resolve_host_by_name(const char *hostname, unsigned* addrs, unsigned*
115115 return 1 ;
116116}
117117
118+ #ifdef USE_EPOLL
119+ static int epollfd ;
120+ #else
121+ static int max_fd ;
122+ static fd_set inset ;
123+ #endif
124+
125+ inline void registerSocket (int fd , int i )
126+ {
127+ #ifdef USE_EPOLL
128+ struct epoll_event ev ;
129+ ev .events = EPOLLIN ;
130+ ev .data .u32 = i ;
131+ if (epoll_ctl (epollfd , EPOLL_CTL_ADD , fd , & ev ) < 0 ) {
132+ char buf [ERR_BUF_SIZE ];
133+ sprintf (buf , "Failed to add socket %d to epoll set" , fd );
134+ shub -> params -> error_handler (buf , SHUB_FATAL_ERROR );
135+ }
136+ #else
137+ FD_SET (fd , & inset );
138+ if (fd > max_fd ) {
139+ max_fd = fd ;
140+ }
141+ #endif
142+ }
143+
144+
145+
118146static int connectSocket (char const * host , int port )
119147{
120148 struct sockaddr_in sock_inet ;
@@ -206,20 +234,47 @@ static void acceptConnections()
206234 elog (ERROR , "Failed to bind socket: %d" , errno );
207235 }
208236
209- for (i = 0 ; i < nNodes - 1 ; i ++ ) {
210- sockets [ i ] = accept (sd , NULL , NULL );
211- if (sockets [ i ] < 0 ) {
237+ for (i = 0 ; i < nNodes ; i ++ ) {
238+ int fd = accept (sd , NULL , NULL );
239+ if (fd < 0 ) {
212240 elog (ERROR , "Failed to accept socket: %d" , errno );
213241 }
242+ registerSocket (fd , i );
243+ sockets [i ] = fd ;
214244 }
215245}
216246
247+ static void WriteSocket (int sd , void const * buf , int size )
248+ {
249+ char * src = (char * )buf ;
250+ while (size != 0 ) {
251+ int n = send (sd , src , size , 0 );
252+ if (n <= 0 ) {
253+ return 0 ;
254+ }
255+ size -= n ;
256+ src += n ;
257+ }
258+ }
259+
260+ static int ReadSocket (int sd , void * buf , int buf_size )
261+ {
262+ int rc = recv (sd , buf , buf_size , 0 );
263+ if (rc <= 0 ) {
264+ elog (ERROR , "Arbiter failed to read socket: %d" , rc );
265+ }
266+ return rc ;
267+ }
268+
269+
217270static void DtmTransSender (Datum arg )
218271{
219272 int nNodes = dtm -> nNodes ;
220273 int i ;
221- DtmCommitMessage * txBuffer = (DtmCommitMessage * )palloc (sizeof (DtmTxBuffer )* ( nNodes ) );
274+ DtmTxBuffer * txBuffer = (DtmTxBuffer * )palloc (sizeof (DtmTxBuffer )* nNodes );
222275
276+ sockets = (int * )palloc (sizeof (int )* nNodes );
277+
223278 openConnections ();
224279
225280 for (i = 0 ; i < nNodes ; i ++ ) {
@@ -229,31 +284,106 @@ static void DtmTransSender(Datum arg)
229284 while (true) {
230285 DtmTransState * ts ;
231286 PGSemaphoreLock (& dtm -> semphore );
287+ CHECK_FOR_INTERRUPTS ();
232288
233289 SpinLockAcquire (& dtm -> spinlock );
234290 ts = dtm -> pendingTransactions ;
235291 dtm -> pendingTransactions = NULL ;
236292 SpinLockRelease (& dtm -> spinlock );
237293
238- for (ts = dtm -> pendingTransactions ; ts != NULL ; ts = ts -> nextPending ) {
239- int node = ts -> gtid .node ;
240- Assert (node != MMNodeId );
241- node -= 1 ;
242- if (txBuffer [node ].used == TX_BUFFER_SIZE ) {
243- WriteSocket (sockets [node ], txBuffer [node ].data , txBuffer [node ].used * sizeof (DtmCommitRequest ));
244- txBuffer [node ].used = 0 ;
294+ for (; ts != NULL ; ts = ts -> nextPending ) {
295+ i = ts -> gtid .node - 1 ;
296+ Assert (i != MMNodeId );
297+ if (txBuffer [i ].used == BUFFER_SIZE ) {
298+ WriteSocket (sockets [i ], txBuffer [i ].data , txBuffer [i ].used * sizeof (DtmCommitRequest ));
299+ txBuffer [i ].used = 0 ;
245300 }
246- txBuffer [node ].data [txBuffer [node ].used ].xid = ts -> xid ;
247- txBuffer [node ].data [txBuffer [node ].used ].csn = ts -> csn ;
248- txBuffer [node ].used += 1 ;
301+ txBuffer [i ].data [txBuffer [i ].used ].xid = ts -> xid ;
302+ txBuffer [i ].data [txBuffer [i ].used ].csn = ts -> csn ;
303+ txBuffer [i ].used += 1 ;
249304 }
250- dtm -> pendingTransactions = NULL ;
251-
305+ for (i = 0 ; i < nNodes ; i ++ ) {
306+ if (txBuffer [i ].used != 0 ) {
307+ WriteSocket (sockets [i ], txBuffer [i ].data , txBuffer [i ].used * sizeof (DtmCommitRequest ));
308+ txBuffer [i ].used = 0 ;
309+ }
310+ }
252311 }
253312}
254313
255314static void DtmTransReceiver (Datum arg )
256315{
316+ int nNodes = dtm -> nNodes - 1 ;
317+ int i , j , rc ;
318+ int rxBufPos = 0 ;
319+ DtmBuffer * rxBuffer = (DtmBuffer * )palloc (sizeof (DtmBuffer )* nNodes );
320+ HTAB * xid2state ;
321+
322+ #ifdef USE_EPOLL
323+ struct epoll_event * events = (struct epoll_event * )palloc (SIZEOF (struct epoll_event )* nNodes );
324+ epollfd = epoll_create (nNodes );
325+ #else
326+ FD_ZERO (& inset );
327+ max_fd = 0 ;
328+ #endif
329+
257330 acceptConnections ();
331+ xid2state = MMCreateHash ();
332+
333+ for (i = 0 ; i < nNodes ; i ++ ) {
334+ txBuffer [i ].used = 0 ;
335+ }
336+
337+ while (true) {
338+ #ifdef USE_EPOLL
339+ rc = epoll_wait (epollfd , events , MAX_EVENTS , shub -> in_buffer_used == 0 ? -1 : shub -> params -> delay );
340+ if (rc < 0 ) {
341+ elog (ERROR , "epoll failed: %d" , errno );
342+ }
343+ for (j = 0 ; j < rc ; j ++ ) {
344+ i = events [j ].data .u32 ;
345+ if (events [j ].events & EPOLLERR ) {
346+ struct sockaddr_in insock ;
347+ socklen_t len = sizeof (insock );
348+ getpeername (fd , (struct sockaddr * )& insock , & len );
349+ elog (WARNING , "Loose connection with %s" , inet_ntoa (insock .sin_addr_ ));
350+ epoll_ctl (epollfd , EPOLL_CTL_DEL , fd , NULL );
351+ }
352+ else if (events [j ].events & EPOLLIN )
353+ #else
354+ fd_set events ;
355+ events = inset ;
356+ rc = select (max_fd + 1 , & events , NULL , NULL , NULL );
357+ if (rc < 0 ) {
358+ elog (ERROR , "select failed: %d" , errno );
359+ }
360+ for (i = 0 ; i < nNodes ; i ++ ) {
361+ if (FD_ISSET (sockets [i ], & events ))
362+ #endif
363+ {
364+ int nResponses ;
365+ rxBuffer [i ].used += ReadSocket (sockets [i ], (char * )rxBuffer [i ].data + rxBuffer [i ].used , RX_BUFFER_SIZE - rxBufPos );
366+ nResponses = rxBuffer [i ].used /sizeof (DtmCommitRequest );
367+
368+ LWLockAcquire (& dtm -> hashLock , LW_SHARED );
369+
370+ for (j = 0 ; j < nResponses ; j ++ ) {
371+ DtmCommitRequest * req = & rxBuffer [i ].data [j ];
372+ DtmTransState * ts = (DtmTransState * )hash_search (xid2state , & req -> xid , HASH_FIND , NULL );
373+ Assert (ts != NULL );
374+ if (req -> csn > ts -> csn ) {
375+ ts -> csn = req -> csn ;
376+ }
377+ if (ts -> nVotes == dtm -> nNodes - 1 ) {
378+ SetLatch (& ProcGlobal -> allProcs [ts -> pid ].procLatch );
379+ }
380+ }
381+ if (rxBuffer [i ].used != nResponses * sizeof (DtmCommitRequest )) {
382+ rxBuffer [i ].used -= nResponses * sizeof (DtmCommitRequest );
383+ memmove (rxBuffer [i ].data , (char * )rxBuffer [i ].data + nResponses * sizeof (DtmCommitRequest ), rxBuffer [i ].used );
384+ }
385+ }
386+ }
387+ }
258388}
259389
0 commit comments