I'm building an Airflow DAG where some of the steps should do ML/NLP processing.

I have a service class that loads an NLP model in its constructor, e.g.:

import spacy

class SentenceService:
    def __init__(self, model: str = "abc"):
        self.nlp = spacy.load(model)

I'd like to reuse this service instance so that the tasks execute faster. Some of the models are very heavy and take a disproportionate amount of time to load.

I'm using processed_texts = process_text.expand(texts=texts) to create a task for each text to process, so that the processing is distributed horizontally across multiple workers.

What I tried first was instantiating the service globally in the DAG file (as a module-level/top-level variable). According to the docs, this is not advisable anyway (https://airflow.apache.org/docs/apache-airflow/2.6.0/best-practices.html#top-level-python-code).

I also tried implementing a typical Pythonic singleton for the service, so that wherever it's used in the tasks, the same instance would be returned.
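Roughly, the singleton attempt looked like this (a minimal sketch; the lru_cache-based accessor is one way to do it, and the string assigned to self.nlp just stands in for spacy.load so the sketch is self-contained):

```python
from functools import lru_cache

class SentenceService:
    def __init__(self, model: str = "abc"):
        # Stand-in for spacy.load(model), so this sketch runs without spacy.
        self.nlp = f"<loaded {model}>"

@lru_cache(maxsize=None)
def get_sentence_service(model: str = "abc") -> SentenceService:
    # lru_cache returns the same instance for the same arguments,
    # but only within a single process.
    return SentenceService(model)
```

Within one process this does work: repeated calls to get_sentence_service() return the same object.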

I also understand that if I don't run the processing in parallel (no .expand above), I can have one service instance per batch - but then everything runs on a single worker.

None of that led to what I want - quite obviously to me now :). Tasks execute in separate processes, so the DAG effectively "runs fresh" each time; neither the global variable nor the singleton is shared across processes.
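That's easy to reproduce outside Airflow, too. A minimal demo (plain multiprocessing with the fork start method standing in for Airflow worker processes; the string values just stand in for a loaded model, and fork is only available on POSIX systems):

```python
from multiprocessing import get_context

_SERVICE = None  # module-level "singleton"

def reload_in_worker(_):
    # Simulates a task re-creating the service inside a worker process.
    global _SERVICE
    _SERVICE = "instance-built-in-child"
    return _SERVICE

def run_demo():
    global _SERVICE
    _SERVICE = "instance-built-in-parent"
    # Fork a worker and "reload" the service there.
    with get_context("fork").Pool(1) as pool:
        child_value = pool.map(reload_in_worker, [0])[0]
    # The child's assignment never reaches the parent's copy:
    # each process holds its own instance.
    return child_value, _SERVICE
```

Each process ends up with its own value, which is exactly what happens to the global/singleton across Airflow workers.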

Is there any technique to handle this in Airflow?
