3

I'm trying to parallelise my job, but I'm new to multithreading, so I feel confused about the concrete implementation.

I have a socket listener that saves data to a buffer. When the buffer reaches its capacity I need to save its data to the database. On one thread I want to start the socket listener, while in a parallel task I want to check the buffer status.

BufferQueue is just an extension of a Python list, with a method that allows checking whether the list has reached the specified size.

SocketManager is a streaming data provider for the STREAM_URL I'm listening to. It uses a callback function to handle messages.

But as I use callbacks to retrieve data, I'm not sure that using a shared variable is the right and optimal decision for this.

buffer = BufferQueue(buffer_size=10000)

def start_listening_to_socket(client):
    s = SocketManager(client)
    s.start_socket(cb_new)
    s.start()

def cb_new(message):
    print("New message")
    global buffer
    for m in message:
        # save data to buffer
        buffer.append(m)

def is_buffer_ready():
    global buffer
    print("Buffer state")
    if buffer.ready():
        # save buffer data to db
        ...

I'd appreciate it if you could help me with this case.

4
  • You can use that shared buffer, but you need some way to control access to it so that only one thread at a time can modify it. Eg, you could use a Lock. Commented Jun 19, 2018 at 7:42
  • Without knowing where these BufferQueue, SocketManager, etc. types come from, or at least what they do, it's pretty hard to offer anything that's not really vague. But I'd be wary of any API that used an is_buffer_ready function that the caller had to check periodically (or, worse, in a spin-loop); usually you're going to want something you can block on instead. Commented Jun 19, 2018 at 7:43
  • If you can give us a minimal reproducible example, we can probably suggest more concrete ideas than "you probably want a Lock here" and "usually you want some way to block there"… Commented Jun 19, 2018 at 7:45
  • @abarnert thanks for suggestion. I edited my question Commented Jun 19, 2018 at 7:59

2 Answers

4

I think all you’re looking for is the queue module.

A queue.Queue is a self-synchronized queue designed specifically for passing objects between threads.

By default, calling get on a queue will block until an object is available, which is what you usually want to do—the point of using threads for concurrency in a network app is that your threads all look like normal synchronous code, but spend most of their time waiting on a socket, a file, a queue, or whatever when they have nothing to do. But you can check without blocking by using block=False, or put a timeout on the wait.
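For instance, a quick sketch of those three ways to call get:

import queue

q = queue.Queue()
q.put("hello")

item = q.get()                   # blocks until an item is available (here, immediately)

try:
    item = q.get(block=False)    # raises queue.Empty right away if nothing is there
except queue.Empty:
    print("queue is empty")

try:
    item = q.get(timeout=0.5)    # waits at most 0.5 s, then raises queue.Empty
except queue.Empty:
    print("still empty after 0.5 s")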

You can also specify a maxsize when you construct the queue. Then, by default, put will block until the queue isn’t too full to accept the new object. But, again, you can use block=False or a timeout to fail instead of waiting if it’s too full.

All synchronization is taken care of internally inside get and put, so you don’t need a Lock to guarantee thread safety or a Condition to signal waiters.
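As a rough sketch of how this might map onto your case (BUFFER_SIZE, save_to_db, and the batching logic here are assumptions for illustration, not part of your BufferQueue/SocketManager API):

import queue
import threading

BUFFER_SIZE = 10000                        # assumption: same capacity as your BufferQueue
messages = queue.Queue(maxsize=BUFFER_SIZE)

def save_to_db(batch):
    # placeholder: replace with your real database write
    print("writing", len(batch), "rows")

def cb_new(message):
    # producer side: the socket callback just puts items on the queue;
    # put() does all the locking internally
    for m in message:
        messages.put(m)

def db_writer():
    # consumer side: block on get() until data arrives, flush in batches
    batch = []
    while True:
        batch.append(messages.get())
        if len(batch) >= BUFFER_SIZE:
            save_to_db(batch)
            batch = []

threading.Thread(target=db_writer, daemon=True).start()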

A queue can even take care of shutdown for you. The producer can just put a special value that tells the consumer to quit when it sees it on a get.

For graceful shutdown, where the producer then needs to wait until the consumer has finished, you can call the optional task_done method after the consumer has finished processing each queued object, and have the producer block on the join method. But if you don’t need this, or have another way to wait for shutdown (e.g., joining the consumer thread), you can skip this part.
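A minimal sketch of that pattern (the SENTINEL name and the processing step are placeholders):

import queue
import threading

q = queue.Queue()
SENTINEL = None                  # special value that tells the consumer to stop

def consumer():
    while True:
        item = q.get()
        if item is SENTINEL:
            q.task_done()
            break
        print("processing", item)   # stand-in for real work
        q.task_done()                # mark this item as handled

t = threading.Thread(target=consumer)
t.start()

for i in range(5):
    q.put(i)
q.put(SENTINEL)                  # ask the consumer to shut down

q.join()                         # returns once every put item has been marked done
t.join()                         # or just join the consumer thread instead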

2

With multithreading, your threads share the process's state (variables). Instead of using globals, just pass the buffer as an argument to your functions and read from/write to it.

You still need to control access to the buffer resource so that both threads aren't reading/writing at the same time. You can achieve that using Lock from the threading module:

import threading

lock = threading.Lock()

def cb_new(buffer_, lock_, message):
    print("New message")
    with lock_:
        for m in message:
            # save data to buffer
            buffer_.add(m)

def is_buffer_ready(buffer_, lock_):
    print("Buffer state")
    with lock_:
        if buffer_.ready():
            # save buffer data to db
            ...
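Wiring it up could then look roughly like this (poll_buffer and the one-second interval are just illustrative; BufferQueue and the socket callback come from the question and aren't defined here):

import threading
import time

buffer = BufferQueue(buffer_size=10000)    # from the question, not defined here
lock = threading.Lock()

def poll_buffer(buffer_, lock_):
    # hypothetical helper: periodically check whether the shared buffer is ready
    while True:
        is_buffer_ready(buffer_, lock_)
        time.sleep(1)

checker = threading.Thread(target=poll_buffer, args=(buffer, lock), daemon=True)
checker.start()

# the socket callback writes into the same shared buffer:
# s.start_socket(lambda message: cb_new(buffer, lock, message))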

Note that if you are working with multiprocessing instead of threads, this solution won't work.

By the way, as @abarnert commented, there are better mechanisms to check whether the buffer is ready (has data to read / has free space to write) than calling a function that checks it. Check out select.select(), which blocks until the buffer is actually ready.


When working with select, you put the call inside a loop and check whether the object is ready for reading. You can start this function in a thread, passing it a flag and the buffer. If you want to stop the thread, change the flag from the other thread; for that to work, the flag has to be a shared, mutable object such as a threading.Event. For the buffer object, use queue.Queue() or a similar data structure.

import select

def read_select(flag, buff):
    # flag is assumed to be a threading.Event that stays set while we should keep reading
    while flag.is_set():
        r, _, _ = select.select([buff], [], [])
        if r:
            data = buff.recv(BUFFSIZE)   # assumes buff is a socket; BUFFSIZE defined elsewhere
            # process data

P.S.: select also works with sockets. You can pass a socket object instead of a buffer, and it will check whether the buffer on the socket is ready for reading.

4 Comments

I guess you are right. select.select() looks very reasonable in that case. Could you give me a hint on how to start?
I've added an example of how you would use it.
I don’t think this is a good suggestion. You can’t select on a queue, and you don’t need to; you just block on the get call. And, while you can select on a socket, again, you don’t need to; you can just block on the recv call. The main point of select is that you can block on multiple sockets at the same time without needing a thread per socket. It’s an alternative to multithreading for network concurrency; you don’t need to use both.
And you definitely don’t want to use select on top of a callback API. You might want to use it to implement that callback API, but again, that’s as an alternative to threads (and there are better alternatives, like asyncio, or a third party lib like Twisted). And, either way, once your callback has been called, either the data is ready to read or it’s already been read, so you don’t want to call select on anything.
