
I'm very new to Apache Beam. My scenario is as follows:

I have multiple events in JSON format. In each event, the event_time field holds the event's creation time, and I derive the creation date from event_time. I want to write these events into separate folders based on their date partitions. My code looks like this:

import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.pvalue import TaggedOutput
import json
import time


class EventFormatter(beam.DoFn):

    def process(self, element, *args, **kwargs):
        # Emit one flattened record per product in the event's properties list.
        for prop in element['properties']:
            yield {
                'messageid': element['messageid'],
                'userid': element['userid'],
                'event_time': element['event_time'],
                'productid': prop['productid'],
            }


class DateParser(beam.DoFn):

    def process(self, element, *args, **kwargs):
        key = time.strftime('%Y-%m-%d', time.localtime(element.get('event_time')))
        print(key, element)
        yield TaggedOutput(key, element)


with beam.Pipeline() as pipeline:
  events = (
      pipeline
      | 'Sample Events' >> beam.Create([
          {"messageid": "6b1291ea-e50d-425b-9940-44c2aff089c1", "userid": "user-78", "event_time": 1598516997, "properties": [{"productid": "product-173"}]},
          {"messageid": "b8b14eb3-8e39-42a3-9528-a323b10a7686", "userid": "user-74", "event_time": 1598346837, "properties": [{"productid": "product-143"},{"productid": "product-144"}]}
        ])
      | beam.ParDo(EventFormatter())
      | beam.ParDo(DateParser())
  )


  output = events | "Parse Date" >> WriteToText('/Users/oguz.aydin/Desktop/event_folder/date={}/'.format(....))

I can't figure out how to complete the format block. When I run the code and print the results, it gives:

('2020-08-27', {'productid': 'product-173', 'userid': 'user-78', 'event_time': 1598516997, 'messageid': '6b1291ea-e50d-425b-9940-44c2aff089c1'})
('2020-08-25', {'productid': 'product-143', 'userid': 'user-74', 'event_time': 1598346837, 'messageid': 'b8b14eb3-8e39-42a3-9528-a323b10a7686'})
('2020-08-25', {'productid': 'product-144', 'userid': 'user-74', 'event_time': 1598346837, 'messageid': 'b8b14eb3-8e39-42a3-9528-a323b10a7686'})

as an example. I want to write two of the events under the date=2020-08-25 folder and the other one under date=2020-08-27.

In short, I want to write each event under the folder for its creation date.

How can I do this?

Thanks for your help,

Oguz.

2 Answers


Concretely, to write several elements per key, you can do something like

class WriteByKey(beam.DoFn):
    def process(self, kvs):
        # All values with the same key come in at once.
        key, values = kvs
        with beam.io.gcp.gcsio.GcsIO().open(f'gs://bucket/path/{key}.extension', 'w') as fp:
            for value in values:
                # Serialize the dict to JSON and encode to bytes (GcsIO writes binary).
                fp.write(json.dumps(value).encode('utf-8'))
                fp.write(b'\n')

with beam.Pipeline() as pipeline:
  events = (
      pipeline
      | ...
      | beam.ParDo(EventFormatter())
      | beam.ParDo(DateParser())
  )
  output = events | beam.GroupByKey() | beam.ParDo(WriteByKey())

Note that runners may retry elements on failure, so instead of writing directly to the final output, a safer approach is to write to a temporary file and then atomically rename it on success, e.g.

import random


class WriteByKey(beam.DoFn):
    def process(self, kvs):
        # All values with the same key come in at once.
        key, values = kvs
        nonce = random.randint(1, 10**9)
        path = f'gs://bucket/path/{key}.extension'
        temp_path = f'{path}-{nonce}'
        with beam.io.gcp.gcsio.GcsIO().open(temp_path, 'w') as fp:
            for value in values:
                fp.write(json.dumps(value).encode('utf-8'))
                fp.write(b'\n')
        # Atomically promote the temp file once all writes succeeded.
        beam.io.gcp.gcsio.GcsIO().rename(temp_path, path)

1 Comment

Thanks a lot, works well! (strings need to be encoded before calling .write)

In your code, you're using multiple outputs. That feature is meant to connect the output of a DoFn (a ParDo) to other transforms, and the set of outputs is static for the whole pipeline.

If you want to dump data into different files based on the content you have, you must implement a DoFn dedicated to writing.

Something like this:

class WriteByKey(beam.DoFn):
    def process(self, kv):
        key, value = kv
        with beam.io.gcp.gcsio.GcsIO().open(f'gs://bucket/path/{key}.extension', 'a') as fp:
            fp.write(value)

You should change your DateParser DoFn to yield a tuple (date, value) instead of a TaggedOutput, and change the pipeline to something like this:

with beam.Pipeline() as pipeline:
  events = (
      pipeline
      | 'Sample Events' >> beam.Create([
          {"messageid": "6b1291ea-e50d-425b-9940-44c2aff089c1", "userid": "user-78", "event_time": 1598516997, "properties": [{"productid": "product-173"}]},
          {"messageid": "b8b14eb3-8e39-42a3-9528-a323b10a7686", "userid": "user-74", "event_time": 1598346837, "properties": [{"productid": "product-143"},{"productid": "product-144"}]}
        ])
      | beam.ParDo(EventFormatter())
      | beam.ParDo(DateParser()) | beam.ParDo(WriteByKey())
  )

2 Comments

Thanks for your answer Iñigo, but I have a problem with your solution. In the WriteByKey class, there is no append mode in beam.io.gcp.gcsio.GcsIO().open, so I overwrite my events while writing. I could produce a separate file for each event, but that is an unproductive approach. Can you suggest another solution to handle this?
Yes, it throws an exception if the file already exists. The best option is to add another step in the pipeline with beam.GroupByKey(); see the example in the other answer.
