@@ -44,6 +44,7 @@ static CustomScanMethods distplanexec_plan_methods;
4444static CustomExecMethods distplanexec_exec_methods ;
4545
4646char destsName [10 ] = "DMQ_DESTS" ;
47+ char * network_interface ;
4748
4849
4950static Node * CreateDistPlanExecState (CustomScan * node );
@@ -257,7 +258,7 @@ EstablishDMQConnections(const lcontext *context, const char *serverName,
257258 sprintf (connstr , "host=%s port=%d "
258259 "fallback_application_name=%s" ,
259260 host , port , senderName );
260-
261+ elog ( LOG , "CONN STR: %s" , connstr );
261262 sub -> dest_id = dmq_destination_add (connstr , senderName , receiverName , 10 );
262263 memcpy (sub -> node , receiverName , strlen (receiverName ) + 1 );
263264 }
@@ -391,12 +392,9 @@ ExplainDistPlanExec(CustomScanState *node, List *ancestors, ExplainState *es)
391392}
392393
393394static struct Plan *
394- CreateDistExecPlan (PlannerInfo * root ,
395- RelOptInfo * rel ,
396- struct CustomPath * best_path ,
397- List * tlist ,
398- List * clauses ,
399- List * custom_plans )
395+ CreateDistExecPlan (PlannerInfo * root , RelOptInfo * rel ,
396+ struct CustomPath * best_path ,
397+ List * tlist , List * clauses , List * custom_plans )
400398{
401399 CustomScan * distExecNode ;
402400
@@ -572,7 +570,7 @@ localize_plan(Plan *node, lcontext *context)
572570 if (IsExchangePlanNode (node ))
573571 {
574572 List * private = ((CustomScan * ) node )-> custom_private ;
575- elog ( LOG , "LOCALIZE: exchange" );
573+
576574 if (lnext (lnext (list_head (private ))))
577575 context -> indexinfo = (IndexOptInfo * ) lthird (private );
578576 }
@@ -582,7 +580,6 @@ elog(LOG, "LOCALIZE: exchange");
582580 if (context -> foreign_scans != NIL )
583581 {
584582 CustomScan * css = (CustomScan * ) node ;
585- // Index scanrelid = ((Scan *) cstmSubPlan1(node))->scanrelid;
586583
587584 Assert (list_length (context -> foreign_scans ) == 1 );
588585 css -> custom_plans = list_delete_ptr (css -> custom_plans ,
@@ -722,10 +719,48 @@ FSExtractServerName(Oid fsid, char **host, int *port)
722719 * host = hostname ;
723720}
724721
722+ #include <unistd.h>
723+ #include <sys/types.h>
724+ #include <sys/socket.h>
725+ #include <sys/ioctl.h>
726+ #include <netinet/in.h>
727+ #include <net/if.h>
728+ #include <arpa/inet.h>
729+ #include "common/ip.h"
730+
725731void
726732GetMyServerName (char * * host , int * port )
727733{
728- * host = pstrdup (LOCALHOST );
734+ int fd ;
735+ struct ifreq ifr ;
736+ struct addrinfo hintp ;
737+ struct addrinfo * result ;
738+ char * sipaddr ;
739+ struct sockaddr_storage saddr ;
740+ int res ;
741+
742+ fd = socket (AF_INET , SOCK_DGRAM , 0 );
743+
744+ /* I want to get an IPv4 IP address */
745+ ifr .ifr_addr .sa_family = AF_INET ;
746+
747+ /* I want IP address attached to "eth0" */
748+ strncpy (ifr .ifr_name , network_interface , IFNAMSIZ - 1 );
749+ ioctl (fd , SIOCGIFADDR , & ifr );
750+ close (fd );
751+
752+ MemSet (& hintp , 0 , sizeof (hintp ));
753+ hintp .ai_family = AF_INET ;
754+ hintp .ai_flags = AI_ALL ;
755+ sipaddr = inet_ntoa (((struct sockaddr_in * )& ifr .ifr_addr )-> sin_addr );
756+ if ((res = pg_getaddrinfo_all (sipaddr , NULL , & hintp , & result )) != 0 )
757+ elog (FATAL , "Cannot resolve network address %s, error=%d." , sipaddr , res );
758+ memcpy (& saddr , result -> ai_addr , result -> ai_addrlen );
759+ * host = (char * ) palloc0 (NI_MAXHOST );
760+ if (pg_getnameinfo_all (& saddr , result -> ai_addrlen , * host , NI_MAXHOST ,
761+ NULL , 0 , 0 ) != 0 )
762+ elog (FATAL , "Cannot resolve network name" );
763+
729764 * port = PostPortNumber ;
730765}
731766
@@ -755,8 +790,9 @@ dmq_init_barrier(DMQDestCont *dmq_data, PlanState *child)
755790 /* Wait for dmq connection establishing */
756791 for (i = 0 ; i < dmq_data -> nservers ; i ++ )
757792 while (dmq_get_destination_status (dmq_data -> dests [i ].dest_id ) != Active );
758-
793+ elog ( LOG , "DMQ INIT BARRIER" );
759794 init_exchange_channel (child , (void * ) dmq_data );
795+ elog (LOG , "END DMQ INIT BARRIER" );
760796}
761797
762798static bool
@@ -809,7 +845,7 @@ init_exchange_channel(PlanState *node, void *context)
809845 }
810846 else
811847 state -> indexes [i ] = j ;
812-
848+ elog ( LOG , "SendByteMessage: j=%d, dest_id=%d, stream=%s" , j , dmq_data -> dests [ j ]. dest_id , state -> stream );
813849 SendByteMessage (dmq_data -> dests [j ].dest_id , state -> stream , ib );
814850 }
815851 return false;
0 commit comments