My requirement is to read data from a CSV file, including the header, and create the same structure in Google Cloud Datastore using Python with Dataflow. I have tried creating sample code as below.

My Sample CSV is below,

First Name,Last Name,Date of Birth
Tom,Cruise,"July 3, 1962"
Bruce,Willis,"March 19, 1955"
Morgan,Freeman,"June 1, 1937"
John,Wayne,"May 26, 1907"

My Python 2.7 code snippet is below:

import csv
import datetime
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.datastore.v1.datastoreio import WriteToDatastore
from google.cloud.proto.datastore.v1 import entity_pb2
from googledatastore import helper as datastore_helper
from apache_beam.io.filesystems import FileSystems
from apache_beam import pvalue


class CSVtoDict(beam.DoFn):
    """Converts a CSV line into a dictionary keyed by the header columns."""
    def process(self, element, header):
        element = element.encode('utf-8')
        try:
            # csv.reader handles quoted fields such as "July 3, 1962".
            rec = next(csv.reader([element]))
            if len(rec) == len(header):
                data = {col.strip(): val.strip() for col, val in zip(header, rec)}
                yield data
            else:
                logging.info("row contains bad data: %s", element)
        except csv.Error:
            logging.warning("could not parse row: %s", element)

class CreateEntities(beam.DoFn):
    """Creates a Datastore entity (fields hard-coded for a product CSV)."""
    def process(self, element):
        entity = entity_pb2.Entity()
        sku = int(element.pop('sku'))
        element['salePrice'] = float(element['salePrice'])
        element['name'] = element['name'].decode('utf-8')
        element['type'] = element['type'].decode('utf-8')
        element['url'] = element['url'].decode('utf-8')
        element['image'] = element['image'].decode('utf-8')
        element['inStoreAvailability'] = unicode(element['inStoreAvailability'])

        datastore_helper.add_key_path(entity.key, 'Productx', sku)
        datastore_helper.add_properties(entity, element)
        return [entity]


class ProcessOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_value_provider_argument(
                '--input',
                dest='input',
                type=str,
                required=False,
                help='Input file to read. This can be a local file or a file in a Google Storage Bucket.')

def read_header_from_filename(filename):
  # note that depending on your newline character/file encoding, this may need to be modified
  file_handle = FileSystems.open(filename)  
  header = file_handle.readline()
  return header.split(',')

process_options = PipelineOptions().view_as(ProcessOptions)
p = beam.Pipeline(options=process_options)
# Create PCollection containing header line
header = (p
          | beam.Create(process_options.input)
          | beam.Map(read_header_from_filename))

def dataflow(argv=None):
    process_options = PipelineOptions().view_as(ProcessOptions)
    p = beam.Pipeline(options=process_options)

    (p
    | 'Reading input file' >> beam.io.ReadFromText(process_options.input)
    | 'Converting from csv to dict' >> beam.ParDo(CSVtoDict(), pvalue.AsSingleton(header))
    | 'Create entities' >> beam.ParDo(CreateEntities())
    | 'Write entities into Datastore' >> WriteToDatastore('isc-am-poc')
    )    
    p.run().wait_until_finish()

if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    dataflow()

I am able to load entities using Dataflow; however, I would like to parse the CSV file starting from the header and then the rows, instead of hard-coding the values in the CreateEntities class, and write the same structure into Datastore entities.

Basically, I want to upload the same CSV file that is given as input to the Dataflow job, rows and all. Can someone please help?

Required output in Datastore for key kind 'Actor':

First Name | Last Name | Date of Birth
Tom        | Cruise    | July 3, 1962
Bruce      | Willis    | March 19, 1955
Morgan     | Freeman   | June 1, 1937
John       | Wayne     | May 26, 1907

Edit: I have incorporated the code given in the answer and am now getting the error below. I am using Python 2.7 and have imported the respective libraries. Sorry, I am very new to Python.

Error:
  File "/usr/lib/python2.7/runpy.py", line 174, in _run_module_as_main
    "__main__", fname, loader, pkg_name)
  File "/usr/lib/python2.7/runpy.py", line 72, in _run_code
    exec code in run_globals
  File "/home/gurusankar_p/upload-data-datastore-dataflow/upload2.py", line 70, in <module>
    | beam.Map(read_header_from_filename))
  File "/usr/local/lib/python2.7/dist-packages/apache_beam/transforms/core.py", line 2423, in __init__
    self.values = tuple(values)
TypeError: 'RuntimeValueProvider' object is not iterable

Thanks, GS

1 Answer

Apache Beam parallelizes processing of your data by splitting up reading of the file across many workers, which means that most workers will never read the header line at all.

What you want to do is join the rows read with the header line. Since the header line is a small amount of data, you can read it in as a separate PCollection and pass it as a side input to CSVtoDict.

Some example code to read your header line:

def read_header_from_filename(filename):
  # note that depending on your newline character/file encoding, this may need to be modified
  file_handle = FileSystems.open(filename)  
  header = file_handle.readline()
  return header.split(',')

# Create PCollection containing header line
header = (p
          | beam.Create(process_options.input) 
          | beam.Map(read_header_from_filename))

Your pipeline construction code becomes:

    (p 
    | 'Reading input file' >> beam.io.ReadFromText(process_options.input)
    | 'Converting from csv to dict' >> beam.ParDo(CSVtoDict(), pvalue.AsSingleton(header))
    | 'Create entities' >> beam.ParDo(CreateEntities())
    | 'Write entities into Datastore' >> WriteToDatastore('isc-am-poc')
     )    
    p.run().wait_until_finish()
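
For the other half of the question, avoiding the hard-coded fields in CreateEntities: since CSVtoDict already emits one dict per row keyed by the header columns, entity creation can simply iterate over that dict. Below is a minimal sketch, not from the original answer, assuming every value is stored as a unicode string and that the First Name and Last Name columns are combined into the key name (a hypothetical choice; substitute whatever uniquely identifies your rows):

class CreateGenericEntities(beam.DoFn):
    """Builds a Datastore entity from whatever columns the header provides."""
    def process(self, element):
        entity = entity_pb2.Entity()
        # Hypothetical key choice: combine first and last name.
        key_name = '%s %s' % (element['First Name'], element['Last Name'])
        datastore_helper.add_key_path(entity.key, 'Actor',
                                      key_name.decode('utf-8'))
        # Store every column as a unicode property, with no hard-coded names.
        properties = {k.decode('utf-8'): v.decode('utf-8')
                      for k, v in element.items()}
        datastore_helper.add_properties(entity, properties)
        yield entity

The 'Create entities' step in the pipeline would then use CreateGenericEntities() in place of CreateEntities().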

Comments

Hi Lukasz, thanks for your help. I have made some edits above in the original code and am now getting the error TypeError: 'RuntimeValueProvider' object is not iterable. Can you kindly look into this and suggest a fix?
Try moving the logic that I supplied into your dataflow() method.
Did you ever figure this out? The error you're seeing is because you're trying to use a template parameter as a collection with Create; however, Create doesn't unpack those for you. I'm trying to do the same thing, but it looks like I'm going to have to implement my own version of Create that does so. This sort of thing exists in the Java SDK.
You can chain a dummy create of a single value to a ParDo that outputs the templated value (see the sketch after these comments).
Thanks Preston & Lukasz, I still get the same error even after moving the header part inside the dataflow() function. Can someone please help me here?
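
For reference, here is a minimal sketch of the dummy-create workaround suggested above; the names are hypothetical, and it assumes process_options.input is a RuntimeValueProvider whose .get() may only be called at run time, inside a DoFn:

class ReadHeaderFn(beam.DoFn):
    """Resolves the templated input path at run time, then reads the header."""
    def __init__(self, input_path):
        self.input_path = input_path  # a ValueProvider, not a plain string

    def process(self, unused_element):
        # .get() is only valid once the template parameters have been bound.
        yield read_header_from_filename(self.input_path.get())

header = (p
          | 'Dummy start' >> beam.Create([None])
          | 'Read header' >> beam.ParDo(ReadHeaderFn(process_options.input)))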