
I am looking for more insight into the Queue implementations in Python than I can find in the documentation.

From what I understood, and excuse my ignorance if I am wrong on this:

queue.Queue(): is implemented through basic in-memory structures and so cannot be shared between multiple processes, but can be shared between threads. So far, so good.
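For reference, the thread-sharing case I have in mind is the standard worker pattern; this is just a minimal sketch of my own, not taken from the documentation:

import queue
import threading

q = queue.Queue()

def worker():
    while True:
        item = q.get()
        if item is None:  # sentinel value to stop the worker
            break
        print("worker got", item)

t = threading.Thread(target=worker)
t.start()

for i in range(3):
    q.put(i)   # safe to call from any thread
q.put(None)    # tell the worker to stop
t.join()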

multiprocessing.Queue(): is implemented through pipes (man 2 pipe), which have a size limit (rather small: on Linux, man 7 pipe says 65536 bytes untweaked):

Since Linux 2.6.35, the default pipe capacity is 65536 bytes, but the capacity can be queried and set using the fcntl(2) F_GETPIPE_SZ and F_SETPIPE_SZ operations

But in Python, whenever I try to write data larger than 65536 bytes into the pipe, it works without raising an exception, and I could flood my memory this way:

import multiprocessing
from time import sleep

def big():
    result = ""
    for i in range(1,70000):
        result += ","+str(i)
    return result # 408888 bytes string

def writequeue(q):
    while True:
        q.put(big())
        sleep(0.1)

if __name__ == '__main__':
    q = multiprocessing.Queue()
    p = multiprocessing.Process(target=writequeue, args=(q,))
    p.start()
    while True:
        sleep(1) # No pipe consumption, we just want to flood the pipe
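For comparison, the raw pipe limit itself is easy to observe with a non-blocking write to a bare os.pipe(); this is a sketch of my own, assuming Linux and Python 3.5+ (for os.set_blocking):

import os

r, w = os.pipe()
os.set_blocking(w, False)  # raise BlockingIOError instead of hanging

total = 0
chunk = b"x" * 4096
try:
    while True:
        total += os.write(w, chunk)
except BlockingIOError:
    print("pipe filled up after", total, "bytes")  # 65536 on my machine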

So here are my questions:

  • Does Python tweak the pipe limit? If yes, by how much? Pointers to the Python source code are welcome.

  • Are Python piped communications interoperable with non-Python processes? If yes, working examples (preferably JS) and resource links are welcome.

  • You could look at the code of the module :) One of the best ways to learn how things work, and to see good Python code. Commented Jul 21, 2017 at 13:22
  • Definitely a thorough answer from Louis. Regarding the last part, if you're interested in communication between two programs (whatever their language), you might want to have a look at brokers implementing the AMQP protocol (RabbitMQ, ZeroMQ, etc.). Commented Jul 21, 2017 at 14:34
  • Message brokers are a ton slower than straight file descriptor consumption whenever you need to perform thousands of function calls per second... Hence the interest in Python's data format interoperability (I suspect it 'pickles' the object...) Commented Jul 21, 2017 at 16:05

1 Answer


Why is q.put() not blocking?

multiprocessing.Queue creates a pipe, and writes to a pipe block once it is full. Writing more than the pipe capacity will cause the write call to block until the reading end has cleared enough data. So if the pipe blocks when its capacity is reached, why is q.put() not also blocking once the pipe is full? Even the first call to q.put() in the example should fill up the pipe, and everything should block there, no?

No, it does not block, because the multiprocessing.Queue implementation decouples the .put() method from writes to the pipe. The .put() method enqueues the data passed to it in an internal buffer, and there is a separate thread which is charged with reading from this buffer and writing to the pipe. This thread will block when the pipe is full, but it will not prevent .put() from enqueuing more data into the internal buffer.

The implementation of .put() saves the data to self._buffer; note how it kicks off a thread if one is not already running:

def put(self, obj, block=True, timeout=None):
    assert not self._closed
    if not self._sem.acquire(block, timeout):
        raise Full

    with self._notempty:
        if self._thread is None:
            self._start_thread()
        self._buffer.append(obj)
        self._notempty.notify()

The ._feed() method is what reads from self._buffer and feeds the data to the pipe. And ._start_thread() is what sets up a thread that runs ._feed().
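To make the decoupling visible, here is a minimal sketch (my own, not from the module's code or docs) that puts several items larger than the pipe capacity with no consumer; every .put() returns immediately because the data only lands in the internal buffer. Note that q.qsize() is assumed to be available, which holds on Linux but not on macOS:

import multiprocessing
import time

if __name__ == '__main__':
    q = multiprocessing.Queue()
    data = "x" * 500000  # well over the 65536-byte pipe capacity

    for i in range(10):
        q.put(data)  # returns immediately; the feeder thread does the pipe writes
        print("put number", i, "returned")

    time.sleep(1)  # give the feeder thread time to fill the pipe
    print("items tracked by the queue:", q.qsize())  # all 10
    q.cancel_join_thread()  # let the program exit without flushing the unread data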

How can I limit queue size?

If you want to limit how much data can be written into a queue, I don't see a way to do it by specifying a number of bytes, but you can limit the number of items stored in the internal buffer at any one time by passing a number to multiprocessing.Queue:

q = multiprocessing.Queue(2)

With the parameter above and your code, q.put() will enqueue two items and block on the third attempt.
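A quick way to see that limit without a second process is a non-blocking put, which raises queue.Full instead of blocking; a small sketch:

import multiprocessing
import queue  # multiprocessing raises the queue.Full exception

if __name__ == '__main__':
    q = multiprocessing.Queue(2)
    q.put("a")
    q.put("b")
    try:
        q.put("c", block=False)  # the same limit a blocking put() waits on
    except queue.Full:
        print("queue refused the third item")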

Are Python piped communications interoperable with non-Python processes?

It depends. The facilities provided by the multiprocessing module are not easily interoperable with other languages. I expect it would be possible to make multiprocessing interoperate with other languages, but achieving this goal would be a major enterprise. The module is written with the expectation that the processes involved are running Python code.

If you look at more general methods, then the answer is yes. You could use a socket as a communication channel between two different processes. For instance, here is a JavaScript process that reads from a Unix domain socket:

var net = require("net");
var fs = require("fs");

var sockPath = "/tmp/test.sock";
try {
    fs.unlinkSync(sockPath);
}
catch (ex) {
    // Don't care if the path does not exist, but rethrow if we get
    // another error.
    if (ex.code !== "ENOENT") {
        throw ex;
    }
}

var server = net.createServer(function(stream) {
  stream.on("data", function(c) {
    console.log("received:", c.toString());
  });

  stream.on("end", function() {
    server.close();
  });
});

server.listen(sockPath);

And a Python process that writes to it:

import socket
import time

sockfile = "/tmp/test.sock"

conn = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
conn.connect(sockfile)

count = 0
while True:
    count += 1
    conn.sendall(bytes(str(count), "utf-8"))
    time.sleep(1)

If you want to try the above, you need to start the JavaScript side first so that the Python side has something to write to. This is a proof-of-concept. A complete solution would need more polish.

In order to pass complex structures from Python to other languages, you'll have to find a way to serialize your data in a format that can be read on both sides. Pickles are unfortunately Python-specific. I generally pick JSON whenever I need to serialize between languages, or use an ad-hoc format if JSON won't do it.
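As a sketch of what that could look like here, the Python side above could send newline-delimited JSON, and the JavaScript side could split the stream on "\n" and pass each line to JSON.parse; the socket path is the same one used in the proof-of-concept:

import json
import socket

sockfile = "/tmp/test.sock"  # same socket as the proof-of-concept above

conn = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
conn.connect(sockfile)

message = {"count": 1, "tags": ["a", "b"]}
# Newline-delimited JSON gives the reading side a simple framing boundary.
conn.sendall(json.dumps(message).encode("utf-8") + b"\n")
conn.close()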


Comments

Your answer is insightful. I upvoted for this reason. A second part of the question was about interoperating the queue with processes built in other languages, e.g. passing a string between Python and JavaScript processes using similar, compatible put() and get() operations. Or is Python using a shady format to pass objects? Do not hesitate to update your answer with another section.
Sorry. I did indeed miss that bit. I've edited my answer to address that.
This sounds rather good to me, thank you. I will let some time pass and accept the answer later for fairness, in case anyone else is willing to share their own perspective as an answer.
I missed the bounty deadline, otherwise I would have validated it. Sorry for the inconvenience, I was busy IRL. Your answer is very good, and I am sure it is worth more than a few rep points :-)
This will limit the queue to two items, but still does not limit the queue's size in bytes.
