1919
2020#include "sockhub.h"
2121
22+ #define SOCKHUB_BUFFER_SIZE (1024*1024)
23+ #define ERR_BUF_SIZE 1024
24+
25+ void ShubAddSocket (Shub * shub , int fd );
26+
27+ inline void ShubAddSocket (Shub * shub , int fd )
28+ {
29+ #ifdef USE_EPOLL
30+ struct epoll_event ev ;
31+ ev .events = EPOLLIN ;
32+ ev .data .fd = fd ;
33+ if (epoll_ctl (shub -> epollfd , EPOLL_CTL_ADD , fd , & ev ) < 0 ) {
34+ char buf [ERR_BUF_SIZE ];
35+ sprintf (buf , "Failed to add socket %d to epoll set" , fd );
36+ shub -> params -> error_handler (buf , SHUB_FATAL_ERROR );
37+ }
38+ #else
39+ FD_SET (fd , & shub -> inset );
40+ if (fd > shub -> max_fd ) {
41+ shub -> max_fd = fd ;
42+ }
43+ #endif
44+ }
45+
46+
2247static void default_error_handler (char const * msg , ShubErrorSeverity severity )
2348{
2449 perror (msg );
@@ -116,8 +141,15 @@ static int resolve_host_by_name(const char *hostname, unsigned* addrs, unsigned*
116141
117142static void close_socket (Shub * shub , int fd )
118143{
119- close (fd );
144+ #ifdef USE_EPOLL
145+ if (epoll_ctl (shub -> epollfd , EPOLL_CTL_DEL , fd , NULL ) < 0 ) {
146+ char buf [ERR_BUF_SIZE ];
147+ sprintf (buf , "Failed to remove socket %d from epoll set" , fd );
148+ shub -> params -> error_handler (buf , SHUB_RECOVERABLE_ERROR );
149+ }
150+ #else
120151 FD_CLR (fd , & shub -> inset );
152+ #endif
121153}
122154
123155int ShubReadSocketEx (int sd , void * buf , int min_size , int max_size )
@@ -211,7 +243,12 @@ static void reconnect(Shub* shub)
211243 } else {
212244 int optval = 1 ;
213245 setsockopt (shub -> output , IPPROTO_TCP , TCP_NODELAY , (char const * )& optval , sizeof (optval ));
214- FD_SET (shub -> output , & shub -> inset );
246+ optval = SOCKHUB_BUFFER_SIZE ;
247+ setsockopt (shub -> output , SOL_SOCKET , SO_SNDBUF , (const char * ) & optval , sizeof (int ));
248+ optval = SOCKHUB_BUFFER_SIZE ;
249+ setsockopt (shub -> output , SOL_SOCKET , SO_RCVBUF , (const char * ) & optval , sizeof (int ));
250+
251+ ShubAddSocket (shub , shub -> output );
215252 return ;
216253 }
217254 }
@@ -238,6 +275,7 @@ static void notify_disconnect(Shub* shub, int chan)
238275
239276static void recovery (Shub * shub )
240277{
278+ #ifndef USE_EPOLL
241279 int i , max_fd ;
242280
243281 for (i = 0 , max_fd = shub -> max_fd ; i <= max_fd ; i ++ ) {
@@ -254,6 +292,7 @@ static void recovery(Shub* shub)
254292 }
255293 }
256294 }
295+ #endif
257296}
258297
259298void ShubInitialize (Shub * shub , ShubParams * params )
@@ -276,11 +315,17 @@ void ShubInitialize(Shub* shub, ShubParams* params)
276315 if (listen (shub -> input , params -> queue_size ) < 0 ) {
277316 shub -> params -> error_handler ("Failed to listen local socket" , SHUB_FATAL_ERROR );
278317 }
279- FD_ZERO (& shub -> inset );
280- FD_SET (shub -> input , & shub -> inset );
281-
282318 shub -> output = -1 ;
283- shub -> max_fd = shub -> input ;
319+ #ifdef USE_EPOLL
320+ shub -> epollfd = epoll_create (MAX_EVENTS );
321+ if (shub -> epollfd < 0 ) {
322+ shub -> params -> error_handler ("Failed to create epoll" , SHUB_FATAL_ERROR );
323+ }
324+ #else
325+ FD_ZERO (& shub -> inset );
326+ shub -> max_fd = 0 ;
327+ #endif
328+ ShubAddSocket (shub , shub -> input );
284329 reconnect (shub );
285330
286331 shub -> in_buffer = malloc (params -> buffer_size );
@@ -301,43 +346,61 @@ static void die(int sig)
301346void ShubLoop (Shub * shub )
302347{
303348 int buffer_size = shub -> params -> buffer_size ;
349+ sigset_t sset ;
304350 signal (SIGINT , die );
305351 signal (SIGQUIT , die );
306352 signal (SIGTERM , die );
307- // signal(SIGHUP, die);
308- sigset_t sset ;
353+ /* signal(SIGHUP, die); */
309354 sigfillset (& sset );
310355 sigprocmask (SIG_UNBLOCK , & sset , NULL );
311356
312- while (!stop ) {
357+ while (!stop ) {
358+ int i , rc ;
359+ #ifdef USE_EPOLL
360+ struct epoll_event events [MAX_EVENTS ];
361+ rc = epoll_wait (shub -> epollfd , events , MAX_EVENTS , shub -> in_buffer_used == 0 ? -1 : shub -> params -> delay );
362+ #else
313363 fd_set events ;
314364 struct timeval tm ;
315- int i , rc ;
316365 int max_fd = shub -> max_fd ;
317366
318367 tm .tv_sec = shub -> params -> delay /1000 ;
319368 tm .tv_usec = shub -> params -> delay % 1000 * 1000 ;
320-
321369 events = shub -> inset ;
322370 rc = select (max_fd + 1 , & events , NULL , NULL , shub -> in_buffer_used == 0 ? NULL : & tm );
323- if (rc < 0 ) {
324- if (errno != EINTR ) {
371+ #endif
372+ if (rc < 0 ) {
373+ if (errno != EINTR ) {
325374 shub -> params -> error_handler ("Select failed" , SHUB_RECOVERABLE_ERROR );
326375 recovery (shub );
327376 }
328377 } else {
329378 if (rc > 0 ) {
379+ #ifdef USE_EPOLL
380+ int j ;
381+ int n = rc ;
382+ for (j = 0 ; j < n ; j ++ ) {
383+ i = events [j ].data .fd ;
384+ if (events [j ].events & EPOLLERR ) {
385+ if (i == shub -> input ) {
386+ shub -> params -> error_handler ("Input socket error" , SHUB_FATAL_ERROR );
387+ } else if (i == shub -> output ) {
388+ reconnect (shub );
389+ } else {
390+ notify_disconnect (shub , i );
391+ close_socket (shub , i );
392+ }
393+ } else if (events [j ].events & EPOLLIN ) {
394+ #else
330395 for (i = 0 ; i <= max_fd ; i ++ ) {
331396 if (FD_ISSET (i , & events )) {
332- if (i == shub -> input ) { /* accept incomming connection */
397+ #endif
398+ if (i == shub -> input ) { /* accept incomming connection */
333399 int s = accept (i , NULL , NULL );
334400 if (s < 0 ) {
335401 shub -> params -> error_handler ("Failed to accept socket" , SHUB_RECOVERABLE_ERROR );
336402 } else {
337- if (s > shub -> max_fd ) {
338- shub -> max_fd = s ;
339- }
340- FD_SET (s , & shub -> inset );
403+ ShubAddSocket (shub , s );
341404 }
342405 } else if (i == shub -> output ) { /* receive response from server */
343406 /* try to read as much as possible */
@@ -424,8 +487,11 @@ void ShubLoop(Shub* shub)
424487 assert (sizeof (ShubMessageHdr ) > available );
425488 /* read as much as possible */
426489 rc = ShubReadSocketEx (chan , & shub -> in_buffer [pos + available ], sizeof (ShubMessageHdr ) - available , buffer_size - pos - available );
427- if (rc < sizeof (ShubMessageHdr ) - available ) {
428- shub -> params -> error_handler ("Failed to read local socket" , SHUB_RECOVERABLE_ERROR );
490+ if (rc < sizeof (ShubMessageHdr ) - available ) {
491+ char buf [ERR_BUF_SIZE ];
492+ sprintf (buf , "Failed to read local socket chan=%d, rc=%d, min requested=%ld, max requested=%d, errno=%d" , chan , rc , sizeof (ShubMessageHdr ) - available , buffer_size - pos - available , errno );
493+ shub -> params -> error_handler (buf , SHUB_RECOVERABLE_ERROR );
494+ //shub->params->error_handler("Failed to read local socket", SHUB_RECOVERABLE_ERROR);
429495 close_socket (shub , i );
430496 shub -> in_buffer_used = pos ;
431497 notify_disconnect (shub , i );
@@ -460,8 +526,11 @@ void ShubLoop(Shub* shub)
460526 /* fetch rest of message body */
461527 do {
462528 unsigned int n = processed + size > buffer_size ? buffer_size - processed : size ;
463- if (chan >= 0 && !ShubReadSocket (chan , shub -> in_buffer + processed , n )) {
464- shub -> params -> error_handler ("Failed to read local socket" , SHUB_RECOVERABLE_ERROR );
529+ if (chan >= 0 && !ShubReadSocket (chan , shub -> in_buffer + processed , n )) {
530+ char buf [ERR_BUF_SIZE ];
531+ sprintf (buf , "Failed to read local socket rc=%d, len=%d, errno=%d" , rc , n , errno );
532+ shub -> params -> error_handler (buf , SHUB_RECOVERABLE_ERROR );
533+ //shub->params->error_handler("Failed to read local socket", SHUB_RECOVERABLE_ERROR);
465534 close_socket (shub , chan );
466535 if (hdr != NULL ) { /* if message header is not yet sent to the server... */
467536 /* ... then skip this message */
0 commit comments