Let's say we have some library (e.g. for XML parsing) that accepts a callback and calls it every time it encounters some event (e.g. finding an XML tag). I'd like to be able to transform those callbacks into a generator that can be iterated via a for loop. Is that possible in Python without using threads or collecting all the callback results first (i.e. with lazy evaluation)?

Example:

# this is how I can produce the items
def callback(item):
    # do something with each item
    pass
parser.parse(xml_file, callback=callback)

# this is how the items should be consumed
for item in iter_parse(xml_file):
    print(item)

I've tried to study whether coroutines could be used, but it seems that coroutines are useful for pushing data from the producer, while generators pull data to the consumer.

The natural idea was that the producer and the consumer would be coroutines that ping-pong the flow of execution back and forth.
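
To illustrate the push/pull distinction I have in mind, here is a simplified sketch (unrelated to the parser): a plain generator is pulled from by the for loop, while a coroutine is pushed into by the producer calling send() on it.

# pull: the consumer drives a plain generator via the for loop
def gen():
    for i in range(3):
        yield i

for value in gen():
    print('pulled:', value)

# push: the producer drives a coroutine by calling send() on it
def coro():
    while True:
        value = yield
        print('pushed:', value)

c = coro()
next(c)  # prime the coroutine up to the first yield
for i in range(3):
    c.send(i)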

I've managed to get a producer-consumer pattern working with the asyncio loop (in a similar way to this answer). However, it cannot be consumed like a generator in a for loop:

import asyncio

q = asyncio.Queue(maxsize=1)

@asyncio.coroutine
def produce(data):
    for v in data:
        print("Producing:", v)
        yield from q.put(v)
        print("Producer waiting")
    yield from q.put(None)
    print("Producer done")

@asyncio.coroutine
def consume():
    while True:
        print("Consumer waiting")
        value = yield from q.get()
        print("Consumed:", value)
        if value is not None:
            # process the value
            yield from asyncio.sleep(0.5)
        else:
            break
    print("Consumer done")

tasks = [
    asyncio.Task(consume()),
    asyncio.Task(produce(data=range(5)))
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

The problem is that the consumed values cannot be iterated in a for loop, since the whole computation is driven by the event loop.

When I rewrite the code so that the callback is called from an ordinary function, the problem is that asyncio.Queue.put() cannot be used with yield from inside the plain callback; scheduling it with asyncio.async() doesn't block, and so the computation is not lazy.

import asyncio

q = asyncio.Queue(maxsize=1)

def parse(data, callback):
    for value in data:
        # yield from q.put(value)
        callback(value)

@asyncio.coroutine
def produce(data):
    @asyncio.coroutine
    def enqueue(value):
        print('enqueue()', value)
        yield from q.put(value)
    def callback(value):
        print('callback()', value)
        asyncio.async(enqueue(value))
    parse(data, callback)

    print('produce()')
    print('produce(): enqueuing sentinel value')
    asyncio.async(enqueue(None))
    print('produce(): done')

@asyncio.coroutine
def consume():
    print('consume()')
    while True:
        print('consume(): waiting')
        value = yield from q.get()
        print('consumed:', value)
        if value is not None:
            # here we'd like to yield and use this in a for loop elsewhere
            print(value)
        else:
            break
    print('consume(): done')

tasks = [
    asyncio.Task(consume()),
    asyncio.Task(produce(range(5)))
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

# I'd like:
# for value in iter_parse(data=range(5)):
#   print('consumed:', value)

Is this kind of computation even possible with asyncio, or do I need to use greenlet or gevent? It seems that in gevent it is possible to iterate over async results in a for loop, but I'd rather not depend on another library if possible, and gevent is not completely ready for Python 3.
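
For reference, this is roughly what I imagine a greenlet-based version would look like (an untested sketch; parser.parse is the hypothetical callback-based API from the example at the top, and iter_parse is the function I'd like to end up with):

from greenlet import greenlet

def iter_parse(xml_file):
    consumer = greenlet.getcurrent()

    def producer():
        def callback(item):
            # hand one item over to the consuming greenlet and pause the parser
            consumer.switch(item)
        parser.parse(xml_file, callback=callback)
        consumer.switch(None)  # sentinel: no more items

    producer_greenlet = greenlet(producer)
    while True:
        # resume the parser until it hits the next callback (or finishes)
        item = producer_greenlet.switch()
        if item is None:
            break
        yield item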

  • Is there a particular reason you want to avoid threads here? It doesn't make sense to use asyncio or even gevent unless your XML parsing library is integrated with those frameworks. Otherwise, parser.parse is always going to block until it's done, which will block the asyncio/gevent event loop. Since XML parsing isn't I/O-bound, I doubt your XML library is integrated with either framework, so your only option is to use a thread (roughly along the lines of the sketch after these comments). Commented Apr 19, 2015 at 1:07
  • If this parsing is done within an async server, then you can yield earlier than if undertaking a computationally heavy load, regardless of being I/O-bound. For example, in Boost.Asio a stackless coroutine is used in some cases to parse a simple string. Commented Apr 19, 2015 at 2:42
  • Please check PEP 525 of Python 3.6 to see if that can be helpful. Commented Mar 26, 2018 at 2:01
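
A rough sketch of the thread-based approach described in the first comment (untested; it assumes the same hypothetical parser.parse API, and iter_parse, worker and the sentinel are names made up here):

import queue
import threading

def iter_parse(xml_file, maxsize=1):
    q = queue.Queue(maxsize=maxsize)  # bounded queue keeps the parsing lazy
    sentinel = object()               # marks the end of the stream

    def worker():
        # q.put blocks while the queue is full, so the parser thread waits
        # for the consumer instead of running ahead
        parser.parse(xml_file, callback=q.put)
        q.put(sentinel)

    threading.Thread(target=worker, daemon=True).start()
    while True:
        item = q.get()
        if item is sentinel:
            break
        yield item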
