0

Context: I want to query an in-memory dict struct and use the same query function to trigger some post-processing tasks (like getting extended info attached to the dict, hence the name exinfo) as I don't want to have those in the "critical processing path". Asyncio is acting very weird possibly because I'm doing some stuff wrong here. The task_resolve_names and task_save are not getting executed, only if I uncomment that "hello world" testing task prior to running the run_forever thread. I appreciate the help.

exinfo.py

import os, sys
from os import path
import pickle
import asyncio
import threading
import sys
print (sys.version)
print (sys.version_info)

exinfopath = os.path.join(os.path.dirname(os.path.abspath(__file__)), "data")
extpkl = os.path.join(exinfopath, "exinfo.pkl")

exinfo = None

class ExInfo:
    instance = None
    
    def __new__(cls, exinfopath, extpkl):
        if not ExInfo.instance:
            ExInfo.instance = ExInfo.__ExInfo(exinfopath, extpkl)
        return ExInfo.instance

    class __ExInfo:
        client = None
        ext = None
        count = 0
        lock = asyncio.Lock()
        loop = None
        extpkl = None
        def __init__(self, exinfopath, extpkl): #arg TBD
            self.loop = asyncio.get_event_loop()
            #self.loop.create_task(self.hello()) <--- weird behavior, UNCOMMENT this to mk stuff happen
            threading.Thread(target=self.thread_main_loop, 
                args=[self.lock, self.loop],
                daemon=True).start()

        def query(self, host, dnsquery): #host can be ipaddr or not
            r = {}
            r['hits'] = 1
            r['query'] = dnsquery
            print("query")
            
            if 'post_process1' not in r.keys():
                print("self.loop.create_task")
                self.loop.create_task(self.task_resolve_names(r, self.lock))
            
            if self.count % 5 == 0: #save pickle every 5 executions
                print("self.loop.create_task")
                self.loop.create_task(self.task_save(r, self.lock))
            
            return r
        
        def thread_main_loop(self, lock, loop):
            print("thread_main_loop / loop.run_forever()")
            loop.run_forever()
            loop.close()

        async def hello(self):
            print("WORLD")

        async def task_resolve_names(self, ext, lock):
            print("task_resolve_names")
            async with lock:
              ext['post_process1'] = 'OK'
              
        
        async def task_save(self, ext, lock):
             print("task_save")
             async with lock:
                ext['post_process2'] = 'OK'
                #with open(self.extpkl, 'wb') as f:
                #    pickle.dump(self.ext, f, protocol=pickle.HIGHEST_PROTOCOL)

def init():
    global exinfo
    exinfo = ExInfo(exinfopath, extpkl)

def test_query(info, query):
    global exinfo
    print("{}".format(exinfo.query(info, query)))

test_exinfo.py

#!/usr/bin/env python3
import os
import exinfo
import time
exinfopath = os.path.join(os.path.dirname(os.path.abspath(__file__)), "data")
extpkl = os.path.join(exinfopath, "exinfo.pkl")

exinfo.init()
exinfo.test_query("someinfo", "someinfo")

time.sleep(10)

actual result:

3.7.6 (default, Jan  8 2020, 13:42:34) 
[Clang 4.0.1 (tags/RELEASE_401/final)]
sys.version_info(major=3, minor=7, micro=6, releaselevel='final', serial=0)
thread_main_loop / loop.run_forever()
query
self.loop.create_task
self.loop.create_task
{'hits': 1, 'query': ''}

expected result:

3.7.6 (default, Jan  8 2020, 13:42:34) 
[Clang 4.0.1 (tags/RELEASE_401/final)]
sys.version_info(major=3, minor=7, micro=6, releaselevel='final', serial=0)
thread_main_loop / loop.run_forever()
query
self.loop.create_task
self.loop.create_task
{'hits': 1, 'query': ''}
task_resolve_names
task_save
3
  • 1
    query uses create_task to add a task to an event loop running in a different thread without proper synchronization. Use asyncio.run_coroutine_threadsafe instead and your code should work. Commented Sep 22, 2020 at 7:11
  • @user4815162342 thanks, that fixed the problem, if you want to submit an answer, that would be great, otherwise I'm happy to. Commented Sep 22, 2020 at 20:41
  • Thanks, I've now submitted an answer. Commented Sep 22, 2020 at 20:43

1 Answer 1

1

The query method uses loop.create_task to add a task to an event loop running in a different thread without proper synchronization. To gix the issue, use asyncio.run_coroutine_threadsafe instead.

Sign up to request clarification or add additional context in comments.

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.