I want to use multiple workers and feeders like this:
feeder => tespPing => testSNMP => testSSH
And for the fastest result, using asyncio seems a great idea.
My current work:
# global variable
ipListPing = asyncio.Queue(30000)
ipListSNMP = asyncio.Queue(1000)
ipListSSH = []
listEquipement = EquipementList()
providerTask = None
pingsTasks = None
SNMPTasks = None
SShTasks = None
def B():
loop = asyncio.get_event_loop() # creation dune boucle asynchrone
watcherTask = asyncio.gather(*[watcher()], return_exceptions= True)
providerTask = asyncio.gather(*[provider(config['range'])], return_exceptions= True ) # generate IP
pingsTasks = asyncio.gather(*[ping() for i in range(4000)], return_exceptions= True ) # test Ping
SNMPTasks = asyncio.gather(*[SNMP() for i in range(2000)], return_exceptions= True ) # test SNMP
# SShTasks = asyncio.gather([SSH()]for i in range(2)) # currently not working next step :p
logging.debug("lancement des taches asynchrone")
all_groups = asyncio.gather(watcherTask, providerTask, pingsTasks, SNMPTasks, return_exceptions= True )
results = loop.run_until_complete(all_groups)
print('fin des taches')
# Waits until all the pending threads are done
SSH()
async def provider(range):
global ipListPing
logging.debug(f'lancement provider')
# dans un premier temps on s'occupe de diviser la plage IP (afin de reduire l'usage RAM en cas de /8 par exemple)
plageIpList = []
for iprange in range:
plageIP = IPv4Network(iprange)
if plageIP.prefixlen < 16:
plageIpList += plageIP.subnets(new_prefix=16)
continue
plageIpList.append(plageIP)
print(f"nombre de plage IP: {len(plageIpList)}")
# nous remplissons la Variable global avec un nombre d'IP "reduit"
while len(plageIpList) > 0:
iprange = plageIpList.pop()
for ip in iprange:
await ipListPing.put(ip.compressed)
logSize()
print(f'fermeture provider')
return
async def ping():
global providerTask
global ipListPing
global ipListSNMP
start = time.time()
print(f'lancement PING')
while not providerTask.done() or not ipListPing.join():
try:
ip = await ipListPing.get()
logSize()
await aioping.ping(ip)
ipListSNMP.put(ip)
except TimeoutError:
pass
finally:
ipListPing.task_done()
print('a')
end = time.time()
elapsed = end - start
print(f'Fermeture PING {elapsed}')
return True
async def SNMP():
global pingsTasks
global ipListSNMP
global ipListSSH
global listEquipement
start = time.time()
print(f'Ouverture SNMP')
while not pingsTasks.done() or not ipListPing.join():
try:
# while len(ipListSSH) > 1000:
# await asyncio.sleep(1)
ip = await ipListSNMP.get()
logSize()
if not await CPE.SNMP(ip):
ipListSSH.append(ip)
else:
listEquipement.add('ACL', ip)
except TimeoutError:
pass
finally:
ipListSNMP.task_done()
print('b')
end = time.time()
elapsed = end - start
print(f'Fermeture SNMP {elapsed} ')
return True
def SSH():
print('Debut SSH')
start = time.time()
max_threads = 100
pool = ProcessPoolExecutor(max_threads)
with ProcessPoolExecutor(max_threads) as pool:
parti = partial(CPE.SSH, username='Depfryer', password='HardPassword')
results_generator = pool.map(parti, ipListSSH)
# Results generator
for result in results_generator:
logSize()
logging.debug(result)
if(result[0]):
listEquipement.add('CPE_SSH', result[1])
else:
listEquipement.add('unknow', result[1])
end = time.time()
elapsed = end - start
return async def provider(range):
global ipListPing
logging.debug(f'lancement provider')
# dans un premier temps on s'occupe de diviser la plage IP (afin de reduire l'usage RAM en cas de /8 par exemple)
plageIpList = []
for iprange in range:
plageIP = IPv4Network(iprange)
if plageIP.prefixlen < 16:
plageIpList += plageIP.subnets(new_prefix=16)
continue
plageIpList.append(plageIP)
print(f"nombre de plage IP: {len(plageIpList)}")
# nous remplissons la Variable global avec un nombre d'IP "reduit"
while len(plageIpList) > 0:
iprange = plageIpList.pop()
for ip in iprange:
await ipListPing.put(ip.compressed)
logSize()
print(f'fermeture provider')
return
async def ping():
global providerTask
global ipListPing
global ipListSNMP
start = time.time()
print(f'lancement PING')
while not providerTask.done() or not ipListPing.join():
try:
ip = await ipListPing.get()
logSize()
await aioping.ping(ip)
ipListSNMP.put(ip)
except TimeoutError:
pass
finally:
ipListPing.task_done()
print('a')
end = time.time()
elapsed = end - start
print(f'Fermeture PING {elapsed}')
return True
# @request_concurrency_limit_decorator(4096)
async def SNMP():
global pingsTasks
global ipListSNMP
global ipListSSH
global listEquipement
start = time.time()
print(f'Ouverture SNMP')
while not pingsTasks.done() or not ipListPing.join():
try:
# while len(ipListSSH) > 1000:
# await asyncio.sleep(1)
ip = await ipListSNMP.get()
logSize()
if not await CPE.SNMP(ip):
ipListSSH.append(ip)
else:
listEquipement.add('ACL', ip)
except TimeoutError:
pass
finally:
ipListSNMP.task_done()
print('b')
end = time.time()
elapsed = end - start
print(f'Fermeture SNMP {elapsed} ')
return True
def SSH():
print('Debut SSH')
start = time.time()
max_threads = 100
pool = ProcessPoolExecutor(max_threads)
with ProcessPoolExecutor(max_threads) as pool:
parti = partial(CPE.SSH, username='Depfryer', password='HardPassword')
results_generator = pool.map(parti, ipListSSH)
# Results generator
for result in results_generator:
logSize()
logging.debug(result)
if(result[0]):
listEquipement.add('CPE_SSH', result[1])
else:
listEquipement.add('unknow', result[1])
end = time.time()
elapsed = end - start
return True
async def watcher():
global providerTask
global ipListPing
global pingsTasks
global ipListSNMP
global SNMPTasks
# await asyncio.sleep(1)
while not(SNMPTasks.done() and pingsTasks.done() and providerTask.done()):
await asyncio.sleep(0.1)
logSize()
logging.debug('watcher')
await ipListPing.join()
logging.debug('watcher')
if providerTask.done() and ipListPing.empty() is None:
logging.debug("KILL des taches de pings")
pingsTasks.cancel()
await ipListSNMP.join()
if pingsTasks.done() and ipListSNMP.empty() is None :
logging.debug("KILL des taches de SNMP")
SNMPTasks.cancel()
The current problem is that the ping task works great, but sometimes it's stuck just after and doesn't do anything else....
Someone can help?
__init__.pyfile.