1

I want to use multiple workers and feeders like this:

feeder => tespPing => testSNMP => testSSH

And for the fastest result, using asyncio seems a great idea.

My current work:

# global variable
ipListPing = asyncio.Queue(30000)
ipListSNMP = asyncio.Queue(1000)
ipListSSH =  []


listEquipement = EquipementList()

providerTask = None
pingsTasks = None
SNMPTasks = None
SShTasks = None
def B():
    loop = asyncio.get_event_loop()  # creation dune boucle asynchrone
    watcherTask = asyncio.gather(*[watcher()], return_exceptions= True)
    providerTask = asyncio.gather(*[provider(config['range'])], return_exceptions= True )  # generate IP
    pingsTasks = asyncio.gather(*[ping() for i in range(4000)], return_exceptions= True ) # test Ping
    SNMPTasks = asyncio.gather(*[SNMP() for i in range(2000)], return_exceptions= True ) # test SNMP
# SShTasks = asyncio.gather([SSH()]for i in range(2)) # currently not working next step :p
    logging.debug("lancement des taches asynchrone")

    all_groups = asyncio.gather(watcherTask, providerTask, pingsTasks, SNMPTasks, return_exceptions= True )
    results = loop.run_until_complete(all_groups)
    print('fin des taches')

# Waits until all the pending threads are done
    SSH()

async def provider(range):
    global ipListPing
    logging.debug(f'lancement provider')

    # dans un premier temps on s'occupe de diviser la plage IP (afin de reduire l'usage RAM en cas de /8 par exemple)
    plageIpList = []
    for iprange in range:
        plageIP = IPv4Network(iprange)
        if plageIP.prefixlen < 16:
            plageIpList += plageIP.subnets(new_prefix=16)
            continue
        plageIpList.append(plageIP)
    print(f"nombre de plage IP: {len(plageIpList)}")
    

    # nous remplissons la Variable global avec un nombre d'IP "reduit"
    while len(plageIpList) > 0:

        iprange = plageIpList.pop()
        for ip in iprange:
            await ipListPing.put(ip.compressed)
            logSize()

    print(f'fermeture provider')
    return

async def ping():
    global providerTask
    global ipListPing
    global ipListSNMP
    start = time.time()
    print(f'lancement PING')

    while not providerTask.done() or not ipListPing.join():

        try:
            ip = await ipListPing.get()
            logSize()

            await aioping.ping(ip)
            ipListSNMP.put(ip)

        except TimeoutError:
            pass
        finally:
            ipListPing.task_done()
            print('a')

    end = time.time()
    elapsed = end - start
    print(f'Fermeture PING {elapsed}')
    return True


async def SNMP():

    global pingsTasks
    global ipListSNMP
    global ipListSSH
    global listEquipement
    start = time.time()

    print(f'Ouverture SNMP')

    while not pingsTasks.done() or not ipListPing.join():
        try:
            # while len(ipListSSH) > 1000:
            #     await asyncio.sleep(1)

            ip = await ipListSNMP.get()
            logSize()

            if not await CPE.SNMP(ip):
                ipListSSH.append(ip)

            else:
                listEquipement.add('ACL', ip)
        except TimeoutError:
            pass
        finally:
            ipListSNMP.task_done()
            print('b')

    end = time.time()
    elapsed = end - start
    print(f'Fermeture SNMP {elapsed} ')
    return True


def SSH():
    print('Debut SSH')
    start = time.time()
    max_threads = 100

    pool = ProcessPoolExecutor(max_threads)

    with ProcessPoolExecutor(max_threads) as pool:
        parti = partial(CPE.SSH, username='Depfryer', password='HardPassword')
        results_generator = pool.map(parti, ipListSSH)

        # Results generator
        for result in results_generator:
            logSize()
            logging.debug(result)
            if(result[0]):
                listEquipement.add('CPE_SSH', result[1])
            else:
                listEquipement.add('unknow', result[1])

    end = time.time()
    elapsed = end - start
    return    async def provider(range):
    global ipListPing
    logging.debug(f'lancement provider')

    # dans un premier temps on s'occupe de diviser la plage IP (afin de reduire l'usage RAM en cas de /8 par exemple)
    plageIpList = []
    for iprange in range:
        plageIP = IPv4Network(iprange)
        if plageIP.prefixlen < 16:
            plageIpList += plageIP.subnets(new_prefix=16)
            continue
        plageIpList.append(plageIP)
    print(f"nombre de plage IP: {len(plageIpList)}")
    

    # nous remplissons la Variable global avec un nombre d'IP "reduit"
    while len(plageIpList) > 0:

        iprange = plageIpList.pop()
        for ip in iprange:
            await ipListPing.put(ip.compressed)
            logSize()

    print(f'fermeture provider')
    return

async def ping():
    global providerTask
    global ipListPing
    global ipListSNMP
    start = time.time()
    print(f'lancement PING')

    while not providerTask.done() or not ipListPing.join():

        try:
            ip = await ipListPing.get()
            logSize()

            await aioping.ping(ip)
            ipListSNMP.put(ip)

        except TimeoutError:
            pass
        finally:
            ipListPing.task_done()
            print('a')

    end = time.time()
    elapsed = end - start
    print(f'Fermeture PING {elapsed}')
    return True


# @request_concurrency_limit_decorator(4096)
async def SNMP():

    global pingsTasks
    global ipListSNMP
    global ipListSSH
    global listEquipement
    start = time.time()

    print(f'Ouverture SNMP')

    while not pingsTasks.done() or not ipListPing.join():
        try:
            # while len(ipListSSH) > 1000:
            #     await asyncio.sleep(1)

            ip = await ipListSNMP.get()
            logSize()

            if not await CPE.SNMP(ip):
                ipListSSH.append(ip)

            else:
                listEquipement.add('ACL', ip)
        except TimeoutError:
            pass
        finally:
            ipListSNMP.task_done()
            print('b')

    end = time.time()
    elapsed = end - start
    print(f'Fermeture SNMP {elapsed} ')
    return True


def SSH():
    print('Debut SSH')
    start = time.time()
    max_threads = 100

    pool = ProcessPoolExecutor(max_threads)

    with ProcessPoolExecutor(max_threads) as pool:
        parti = partial(CPE.SSH, username='Depfryer', password='HardPassword')
        results_generator = pool.map(parti, ipListSSH)

        # Results generator
        for result in results_generator:
            logSize()
            logging.debug(result)
            if(result[0]):
                listEquipement.add('CPE_SSH', result[1])
            else:
                listEquipement.add('unknow', result[1])

    end = time.time()
    elapsed = end - start
    return True

async def watcher():
    global providerTask
    global ipListPing
    global pingsTasks
    global ipListSNMP
    global SNMPTasks
    # await asyncio.sleep(1)

    while not(SNMPTasks.done() and pingsTasks.done() and providerTask.done()):
        await asyncio.sleep(0.1)
        logSize()

        logging.debug('watcher')

        await ipListPing.join()
        logging.debug('watcher')

        if providerTask.done() and ipListPing.empty() is None:
            logging.debug("KILL des taches de pings")
            pingsTasks.cancel()

        await ipListSNMP.join()
        if pingsTasks.done() and ipListSNMP.empty() is None :
            logging.debug("KILL des taches de SNMP")
            SNMPTasks.cancel() 
            

The current problem is that the ping task works great, but sometimes it's stuck just after and doesn't do anything else....

Someone can help?

2
  • I wrote a much smaller set a while ago and had trouble with the python ping libraries. While expensive to run I used a subprocess call to run the OS ping command instead. It's not clear to me that aioping works for multiple simultaneous ping requests ... and I do not like the code in that library as it is all in the __init__.py file. Commented Sep 25, 2021 at 0:48
  • Ye i see all in init really weird, but work fine, gonna test with call process and see if something change Commented Sep 26, 2021 at 1:25

1 Answer 1

1

I edited my watcher function. Now it is:

    async def watcher():
    global providerTask
    global ipListPing
    global pingsTasks
    global ipListSNMP
    global SNMPTasks

    while True:
        logSize()
        await asyncio.sleep(30)


        # await ipListPing.join()
        if providerTask.done() and ipListPing.empty():
            logging.debug("KILL des taches de pings")
            pingsTasks.cancel()


        if pingsTasks.done() and ipListSNMP.empty() :
            logging.debug("KILL des taches de SNMP")
            SNMPTasks.cancel() 


            return
            # break

And it works great.

Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.