Can we use a for loop to create an Apache Beam Dataflow pipeline dynamically? My fear is how the for loop will behave in a distributed environment when I use it with the Dataflow runner. I am sure this will work fine with the direct runner.

For example, can I create pipelines dynamically like this:

with beam.Pipeline(options=pipeline_options) as pipeline:
    for p in cdata['tablelist']:
        i_file_path = p['sourcefile']
        schemauri = p['schemauri']
        schema = getschema(schemauri)
        dest_table_id = p['targettable']

        (
            pipeline
            | "Read From Input Datafile " + dest_table_id >> beam.io.ReadFromText(i_file_path)
            | "Convert to Dict " + dest_table_id >> beam.Map(lambda r: data_ingestion.parse_method(r))
            | "Write to BigQuery Table " + dest_table_id >> beam.io.WriteToBigQuery(
                  '{0}:{1}'.format(project_name, dest_table_id),
                  schema=schema,
                  write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
        )

  • Can you elaborate more on your use case? For loops shouldn't be used at the global level of the pipeline; they can be used inside a transformation. Commented Sep 10, 2022 at 18:25
  • Do you want to launch multiple Dataflow jobs dynamically with a foreach, or do you want to generate the pipeline code dynamically? Commented Sep 10, 2022 at 18:52
  • I have added an example; please check. Thanks for your response. Commented Sep 11, 2022 at 12:29

2 Answers


Yes, this is totally legal, and lots of pipelines (especially ML ones) are constructed this way. Your looped pipeline construction above should work just fine on all runners.

You can think of a Beam pipeline as having two phases: construction and execution. The first phase, construction, happens entirely in the main program and can have arbitrary loops, control statements, etc. Behind the scenes, this builds up a DAG of deferred operations (such as reads, maps, etc.) to perform. If you have a loop, each iteration will simply append more operations to this graph. The only thing you can't do in this phase is inspect the data (i.e. the contents of a PCollection) itself.

The second phase, execution, starts when pipeline.run() is invoked. (For Python, this is implicitly invoked on exiting the with block). At this point the pipeline graph (as constructed above), its dependencies, pipeline options, etc. are passed to a Runner which will actually execute the fully-specified graph, ideally in parallel.
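To make the construction/execution split concrete, here is a minimal sketch. This is a toy analogy, not the real Beam API: `ToyPipeline`, `apply`, and `run` are invented names, and the "graph" is just a list. It only illustrates the point above: a Python loop during construction merely records deferred operations, and nothing executes until `run()` is called.

```python
# Toy analogy (NOT real Beam): construction records deferred ops,
# execution runs them later.

class ToyPipeline:
    def __init__(self):
        self.ops = []                      # the deferred "DAG" (here: a flat list)

    def apply(self, label, fn):
        self.ops.append((label, fn))       # construction phase: record, don't run
        return self

    def run(self, value):
        # execution phase: the recorded graph is finally evaluated
        for label, fn in self.ops:
            value = fn(value)
        return value

p = ToyPipeline()
for table in ["users", "orders"]:          # arbitrary Python control flow is fine here
    # bind the loop variable via a default argument to avoid late binding
    p.apply(f"tag_{table}", lambda rows, t=table: rows + [t])

print(p.run([]))   # ['users', 'orders'] -- both loop iterations ended up in the graph
```

Note the `t=table` default argument: just as in real Beam code, a lambda that closed over the bare loop variable would see only its final value when the deferred graph runs.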

This is covered a bit in the programming guide, though I agree it could be more clear.




I think it's not possible.

There are several other ways to achieve this.

If you have an orchestrator like Cloud Composer/Airflow or Cloud Workflows, you can put this logic inside the orchestrator and instantiate and launch one Dataflow job per element of the loop:

Solution 1, example with Airflow:

for p in cdata['tablelist']:
    i_file_path = p['sourcefile']
    schemauri = p['schemauri']
    dest_table_id = p['targettable']

    options = {
        'i_file_path': i_file_path,
        'dest_table_id': dest_table_id,
        'schemauri': schemauri,
        ...
    }

    dataflow_task = DataflowCreatePythonJobOperator(
        py_file=beam_main_file_path,
        task_id=f'task_{dest_table_id}',
        dataflow_default_options=your_default_options,
        options=options,
        gcp_conn_id="google_cloud_default"
    )

    # You can execute your Dataflow jobs in parallel
    dataflow_task >> DummyOperator(task_id='END', dag=dag)

Solution 2, with a shell script:

for module_path in ${tablelist}; do
   # Options (no spaces around "=" in shell assignments)
   i_file_path=...
   schemauri=...
   dest_table_id=...

   # Python command to execute the Dataflow job
   python -m your_module.main \
        --runner=DataflowRunner \
        --staging_location=gs://your_staging_location/ \
        --temp_location=gs://your_temp_location/ \
        --region=europe-west1 \
        --setup_file=./setup.py \
        --i_file_path=$i_file_path \
        --schemauri=$schemauri \
        --dest_table_id=$dest_table_id
done

In this case the Dataflow jobs are executed sequentially.

If you have too many files and Dataflow jobs to launch, you can consider another solution. With a shell script or a Cloud Function, gather all the needed files, rename them as expected (embedding metadata in the filename), and move them to a separate object prefix in GCS.
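A minimal sketch of that preprocessing step. The naming convention `<dest_table_id>__<schema_name>__<original_name>` and the function `stage_file` are hypothetical (not from the answer above), and a plain `os.rename` stands in for what would really be a GCS client copy/move:

```python
import os

def stage_file(src_path, dest_dir, dest_table_id, schema_name):
    # Encode the routing metadata into the staged filename so a single
    # Dataflow job can recover it later from the path alone.
    # Hypothetical convention: <dest_table_id>__<schema_name>__<original_name>
    staged_name = f"{dest_table_id}__{schema_name}__{os.path.basename(src_path)}"
    staged_path = os.path.join(dest_dir, staged_name)
    os.rename(src_path, staged_path)   # in real code: a GCS object rewrite/move
    return staged_path
```

With real buckets you would use `google.cloud.storage` rather than `os.rename`, but the filename convention is the part that matters.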

Then, in a single Dataflow job:

  • Read all the previous files via a pattern
  • Parse the metadata, like schemauri and dest_table_id, from the filename
  • Apply the map operation in the job on the current element
  • Write the result to BigQuery
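The metadata-parsing step above could look like this. It assumes the same hypothetical `<dest_table_id>__<schema_name>__<original_name>` filename convention; `parse_metadata` is an invented helper, not part of Beam:

```python
import os

def parse_metadata(path):
    # Split the basename back into its three metadata parts.
    # Hypothetical convention: <dest_table_id>__<schema_name>__<original_name>
    dest_table_id, schema_name, original_name = os.path.basename(path).split("__", 2)
    return dest_table_id, schema_name, original_name

print(parse_metadata("gs://bucket/staging/mydataset.users__user_v1__part-0001.csv"))
# ('mydataset.users', 'user_v1', 'part-0001.csv')
```

Inside the job, the parsed `dest_table_id` can then drive the BigQuery destination; `beam.io.WriteToBigQuery` also accepts a callable for the table argument, which fits this per-element routing.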

If you don't have a huge number of files, the first two solutions are simpler.

4 Comments

Thanks for your response, Mazlum Tosun. I just want to understand why. What kind of issues can we run into?
I proposed other solutions because it’s not possible to generate multiple pipelines in a single Dataflow job.
This is actually fine to do.
Ok, thanks @robertwb, I didn’t know that 🙏🏻 But I think it can be better if this logic is put outside of the pipeline when there are not many jobs to launch (a DAG orchestrator, for example).
