I am new to using Apache Beam and Dataflow. I would like to use a data-set as an input for a function that will be deployed in parallel using Dataflow. Here is what I have so far:
import os
import apache_beam as beam
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '[location of json service credentials]'
dataflow_options = ['--project=[PROJECT NAME]',
                    '--job_name=[JOB NAME]',
                    '--temp_location=gs://[BUCKET NAME]/temp',
                    '--staging_location=gs://[BUCKET NAME]/stage']
options = PipelineOptions(dataflow_options)
gcloud_options = options.view_as(GoogleCloudOptions)
options.view_as(StandardOptions).runner = 'dataflow'
with beam.Pipeline(options=options) as p:
    # Parentheses let the pipeline expression span several lines.
    new_p = (p
             | beam.io.ReadFromText(file_pattern='[file location].csv',
                                    skip_header_lines=1)
             | beam.ParDo([Function Name]()))
The CSV file will have 4 columns and n rows. Each row represents an instance, and each column represents a parameter of that instance. I would like to pass all of the parameters of an instance into a beam.DoFn so I can run it on multiple machines with the help of Dataflow.
How do I write the function so that it takes multiple arguments from a PCollection? The function below is how I imagine it would go.
class function_name(beam.DoFn):
    def process(self, col_1, col_2, col_3, col_4):
        function = function(col_1) + function(col_2) + function(col_3) + function(col_4)
        return [function]
A PCollection consists of elements. In your example, the CSV file is read line by line, and each line becomes an element that is passed implicitly to your callable inside the ParDo step. You don't need multiple arguments in your process method. You need a single argument, which in this case will be a string, e.g. "col1_value,col2_value,col3_value,col4_value", which you split, process, and return as a new single element. If you want to return multiple values, use a tuple, dict, or some other collection as your return element.