I hope you can help me.

I have a list, msgList, of msg objects, each with pos and content attributes. I also have a function, posClassify, that creates a SentimentClassifier instance (clf), iterates through msgList, and sets msgList[i].pos = clf.predict(msgList[i].content).

def posClassify(msgList):
    clf = SentimentClassifier()
    for i in tqdm(range(len(msgList))):
        if msgList[i].content.find("omitted") == -1:
            msgList[i].pos = clf.predict(msgList[i].content)

What I want is to run this with multiprocessing. I have read that you create a pool and call a function with a list of the arguments you want to pass to it, and that's it. I imagine that works for functions that do something like saving an image, or that otherwise work on separate memory, but not for mine, which has to modify the same msg objects and also needs the SentimentClassifier object (which takes about 10 seconds to initialize).

My idea was to create cpu_cores-1 processes, each with its own SentimentClassifier instance, and have each process consume messages from the list with its own classifier, but I can't work out how to approach this. I also thought of creating threads with binary semaphores, each thread calling its own classifier and then waiting on the semaphore to update the pos value in the msg object, but I still can't figure it out.

  • Have you actually tried what you think you should be doing? What problems did you encounter? Commented Feb 21, 2020 at 8:30

1 Answer

You can use ProcessPoolExecutor from the concurrent.futures module in the Python standard library.

The ProcessPoolExecutor is

An Executor subclass that executes calls asynchronously using a pool of at most max_workers processes. If max_workers is None or not given, it will default to the number of processors on the machine.

You can find more in the Python docs.

Here is sample code that runs the predictions concurrently, assuming that each msgList[i] is independent of msgList[j] when i != j:

from concurrent import futures

from tqdm import tqdm

def posClassify(msg, idx, clf):
    # Runs in a worker process; the index lets the parent match the result.
    return idx, clf.predict(msg.content)

def classify(msgList):
    # SentimentClassifier is assumed to be imported from your own code.
    # clf is pickled and sent with every task, so it must be picklable.
    clf = SentimentClassifier()

    calls = []

    # leaving the with block waits for all processes to finish
    with futures.ProcessPoolExecutor(max_workers=4) as executor:
        for i, msg in enumerate(msgList):
            if "omitted" not in msg.content:
                calls.append(executor.submit(posClassify, msg, i, clf))

        # assign the result of individual calls to msgList[i].pos
        for call in tqdm(futures.as_completed(calls), total=len(calls)):
            idx, pos = call.result()
            msgList[idx].pos = pos

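To execute the code, just call the classify(msgList) function. On platforms that start worker processes by spawning a fresh interpreter (Windows and macOS by default), make that call under an if __name__ == "__main__": guard.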
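
If constructing the classifier is the expensive step, one possible variant is to build one instance per worker process through the executor's initializer argument and send only the text to the workers. The sketch below is a minimal illustration, not a tested drop-in: SentimentClassifier and Msg here are hypothetical stand-ins so the code runs on its own, and init_worker, predict_one, and the chunksize of 16 are illustrative choices, not something from the question.

```python
from concurrent.futures import ProcessPoolExecutor


class SentimentClassifier:
    """Hypothetical stand-in for the real classifier (assumption)."""

    def predict(self, text):
        # Toy score so the sketch runs without the real model.
        return 1.0 if "good" in text else 0.0


class Msg:
    """Hypothetical message type with content and pos attributes."""

    def __init__(self, content):
        self.content = content
        self.pos = None


_clf = None  # one classifier per worker process


def init_worker():
    # Runs once in each worker, so the slow construction happens
    # once per worker instead of once per message.
    global _clf
    _clf = SentimentClassifier()


def predict_one(item):
    # item is an (index, text) pair; the index travels with the result.
    idx, content = item
    return idx, _clf.predict(content)


def classify(msgList, max_workers=4):
    # Send only (index, text) pairs; the msg objects stay in the parent.
    items = [(i, m.content) for i, m in enumerate(msgList)
             if "omitted" not in m.content]
    with ProcessPoolExecutor(max_workers=max_workers,
                             initializer=init_worker) as executor:
        for idx, pos in executor.map(predict_one, items, chunksize=16):
            msgList[idx].pos = pos  # assignment happens in the parent
    return msgList


if __name__ == "__main__":
    msgs = [Msg("good day"), Msg("<media omitted>"), Msg("bad day")]
    classify(msgs, max_workers=2)
    print([m.pos for m in msgs])
```

Because the workers only return values and the parent does the assignment, no semaphores or shared memory are needed. Whether this is faster in practice depends on how long clf.predict takes compared with the cost of starting the workers.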


6 Comments

Since msg in the subprocess is likely a copy (unless explicitly made multiprocess aware), the results don't affect the original msgList in the main process. It will discard all results.
The following error is thrown: TypeError: posClassify() missing 1 required positional argument: 'idx'
Thank you for the answer! But the problem is clf = SentimentClassifier(), which takes around 10 seconds each time it is instantiated. The solution would be to create 4 classifiers (one per process) and have each process work with its own instance, but I can't work out how to get there, as I don't know how to work correctly with processes and those instances.
Well in that case you could just create one instance of classifier and pass it to the posClassify method. What do you think?
What's the average running time of clf.predict method?
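
On the copy point raised in the comments, a small sketch may help. Arguments passed to executor.submit are pickled, so a worker that mutates msg only changes its own copy, while a value returned from the worker does reach the parent. Msg, mutate_in_worker, compute_in_worker, and demo are hypothetical names used only for this illustration.

```python
from concurrent.futures import ProcessPoolExecutor


class Msg:
    """Hypothetical message type (assumption, not from the question)."""

    def __init__(self, content):
        self.content = content
        self.pos = None


def mutate_in_worker(msg):
    # In a worker process this changes only the unpickled copy.
    msg.pos = len(msg.content)


def compute_in_worker(msg):
    # Returning the value lets the parent store it itself.
    return len(msg.content)


def demo():
    msg = Msg("hello")
    with ProcessPoolExecutor(max_workers=1) as executor:
        executor.submit(mutate_in_worker, msg).result()
        after_mutation = msg.pos  # the parent's object is unchanged
        msg.pos = executor.submit(compute_in_worker, msg).result()
    return after_mutation, msg.pos


if __name__ == "__main__":
    print(demo())
```

This is why the answer's code returns (idx, prediction) and assigns msgList[idx].pos in the main process.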