
I wrote a large program that processes a data set of 70k documents. Each document takes about 5 seconds, so I want to parallelize the work. The code doesn't work and I can't make sense of why. I tried it with one worker only, to make sure it's not a memory issue.

Code:

from doc_builder import DocBuilder
from glob import glob
from tqdm import tqdm
import threading

path = "/home/marcel/Desktop/transformers-master/examples/token-classification/CORD-19-research-challenge/document_parses/test_collection"
paths = [x for x in glob(path + '/**/*.json', recursive=True)]
workers_amount = 1

def main(paths):
    doc_builder = DocBuilder()
    for path in tqdm(paths):
        data, doc = doc_builder.get_doc(path)
        doc_builder.write_doc(path, data, doc)

threads = []
for i in range(workers_amount):
    worker_paths = paths[int((i-1/workers_amount)*len(paths)):int((i/workers_amount)*len(paths))]
    t = threading.Thread(target=main, args=[worker_paths])
    t.start()
    threads.append(t)

for t in threads:
    t.join()

It just randomly finishes executing after a while. CPU threads do spike when starting, but besides that nothing really happens. Is there something wrong with the code? If that's important: I am running this on a Ryzen 7 3700X (so 16 threads should be possible).

/edit: At first I thought the problem might be that each thread initializes a large PyTorch model and a trainer like this:

self.tokenizer = AutoTokenizer.from_pretrained(self.pretrained_dir) #, cache_dir=cache_dir)
self.splitter = spacy.load(cd_dir + "/en_core_sci_md-0.2.4/en_core_sci_md/en_core_sci_md-0.2.4")
self.model = AutoModelForTokenClassification.from_pretrained(self.pretrained_dir, config=self.config_dir) #,cache_dir=cache_dir)
self.model.load_state_dict(torch.load(self.model_dir))
self.trainer = Trainer(model=self.model, args=TrainingArguments(output_dir=self.output_dir))

This could be shared among the threads so I don't need to initialize a new one every time (initializing those is very costly). But as I said, I tried using 1 worker, so that really shouldn't be the problem, right?
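For what it's worth, sharing one builder instance across threads would look roughly like the sketch below. Everything here is a stand-in: the stub `DocBuilder` only records which paths it handled, and the assumption that the real builder is safe to call from multiple threads (read-only model inference usually is, but shared mutable state needs a lock) is exactly that, an assumption.

```python
import threading

# Stand-in for the costly DocBuilder; the real one loads the tokenizer,
# spaCy model, and PyTorch model once in __init__.
class DocBuilder:
    def __init__(self):
        self.processed = []           # record of handled paths (demo only)
        self._lock = threading.Lock()

    def get_doc(self, path):
        return ("data", "doc")        # placeholder for real parsing

    def write_doc(self, path, data, doc):
        with self._lock:              # guard shared state across threads
            self.processed.append(path)

builder = DocBuilder()                # initialized once, shared by all workers

def main(worker_paths, builder):
    for path in worker_paths:
        data, doc = builder.get_doc(path)
        builder.write_doc(path, data, doc)

paths = [f"doc_{i}.json" for i in range(8)]   # hypothetical file list
threads = [
    threading.Thread(target=main, args=(paths[i::2], builder))
    for i in range(2)                 # two workers, striped over the paths
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(builder.processed))  # 8 -- every path handled by the shared builder
```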

  • What happens that is not the desired behaviour? Is an error thrown? Commented May 26, 2020 at 17:44
  • It just doesn't do anything… I put a print statement after doc_builder = DocBuilder(), but it doesn't even get there! Commented May 26, 2020 at 17:52
  • Never mind, it just got there and printed it out 3 times… I guess I have to dig deeper. Commented May 26, 2020 at 17:52
  • So, it never enters the for loop… execution ends before that. Commented May 26, 2020 at 17:58
  • You are probably running into issues with the GIL. Threads in Python can only run one at a time; the interpreter bounces between them to give the illusion of concurrency. Try using multiprocessing instead. Commented May 26, 2020 at 17:59
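The "never enters the for loop" observation in the comments is consistent with the chunking arithmetic in the question: because of operator precedence, `i-1/workers_amount` is computed as `i - (1/workers_amount)`, not `(i-1)/workers_amount`. A small reproduction with dummy data (the real path list is swapped for a range):

```python
paths = list(range(100))  # stand-in for the real file list
workers_amount = 1

# The question's expression, for worker i = 0:
i = 0
start = int((i - 1 / workers_amount) * len(paths))  # int(-1.0 * 100) = -100
stop = int((i / workers_amount) * len(paths))       # int(0.0 * 100) = 0
print(paths[start:stop])  # [] -- paths[-100:0] is empty, so the worker gets no files

# A corrected split: worker i takes the i-th of workers_amount equal chunks.
chunks = [
    paths[int(i / workers_amount * len(paths)):int((i + 1) / workers_amount * len(paths))]
    for i in range(workers_amount)
]
print(len(chunks[0]))  # 100 -- with one worker, it gets every path
```

So with one worker the thread is handed an empty slice, does nothing, and the program exits quietly.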

1 Answer


Can you maybe try it using ThreadPoolExecutor? It takes away the headache of managing the pool yourself. The syntax may not be 100% right; I've never used tqdm.

from doc_builder import DocBuilder
from glob import glob
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor

path = "/home/marcel/Desktop/transformers-master/examples/token-classification/CORD-19-research-challenge/document_parses/test_collection"
files = [x for x in glob(path + '/**/*.json', recursive=True)]

WORKERS_AMOUNT = 16

def main(file):
    doc_builder = DocBuilder()  # note: this builds one costly DocBuilder per file
    data, doc = doc_builder.get_doc(file)
    doc_builder.write_doc(file, data, doc)

futures = []
with ThreadPoolExecutor(max_workers=WORKERS_AMOUNT) as executor:
    for file in tqdm(files):    # tqdm tracks submission progress here
        futures.append(executor.submit(main, file))

3 Comments

Will try that next; I may have found a bug in my program.
Do you know why my CPU is not fully used, though? I am using 16 workers because I thought that's how many threads my CPU can run, yet it's only at ~25% load.
That's because of the GIL (Global Interpreter Lock). It basically works as designed. If you want to utilise all cores, look into multiprocessing. I'm not experienced with its usage, though, and don't know whether you can combine multiprocessing with the ThreadPoolExecutor.
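Following that suggestion, a process-based variant can be sketched with ProcessPoolExecutor, which sidesteps the GIL by running workers in separate processes. This is a minimal sketch with a stand-in task; the real DocBuilder would need to be importable in the child processes, and each process would pay the model-initialization cost once (e.g. via a module-level cache), not once per file.

```python
from concurrent.futures import ProcessPoolExecutor
import os

# Stand-in for the per-file work. In the real code this would call
# DocBuilder.get_doc / write_doc; returning the worker PID just shows
# that files really are spread across processes.
def process_file(path):
    return (path, os.getpid())

if __name__ == "__main__":  # required on platforms that spawn workers
    files = [f"doc_{i}.json" for i in range(20)]  # hypothetical file list
    with ProcessPoolExecutor(max_workers=4) as executor:
        results = list(executor.map(process_file, files))
    print(len(results))  # 20 -- every file was handled by some worker process
```

Whether this actually speeds things up depends on where the 5 seconds per document go: if the work is PyTorch inference that already releases the GIL, threads may be fine; if it's pure-Python parsing, processes should help.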
