I'm very new at apache beam. My scenario is like below,
I have multiple events which in json format. In each event, event_time column indicates that event's create time and I'm calculating their create date using event_time. I want to write this events under their date partitions seperately. My code is like
import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.pvalue import TaggedOutput
import json
import time
class EventFormatter(beam.DoFn):
def process(self, element, *args, **kwargs):
tmp_dict = {}
for i in range(len(element['properties'])):
tmp_dict['messageid'] = element['messageid']
tmp_dict['userid'] = element['userid']
tmp_dict['event_time'] = element['event_time']
tmp_dict['productid'] = element['properties'][i]['productid']
yield tmp_dict
class DateParser(beam.DoFn):
def process(self, element, *args, **kwargs):
key = time.strftime('%Y-%m-%d', time.localtime(element.get('event_time')))
print(key, element)
yield TaggedOutput(time.strftime('%Y-%m-%d', time.localtime(element.get('event_time'))), element)
with beam.Pipeline() as pipeline:
events = (
pipeline
| 'Sample Events' >> beam.Create([
{"messageid": "6b1291ea-e50d-425b-9940-44c2aff089c1", "userid": "user-78", "event_time": 1598516997, "properties": [{"productid": "product-173"}]},
{"messageid": "b8b14eb3-8e39-42a3-9528-a323b10a7686", "userid": "user-74", "event_time": 1598346837, "properties": [{"productid": "product-143"},{"productid": "product-144"}]}
])
| beam.ParDo(EventFormatter())
| beam.ParDo(DateParser())
)
output = events | "Parse Date" >> WriteToText('/Users/oguz.aydin/Desktop/event_folder/date={}/'.format(....))
I'm not able to find how i should complete the format block. When I run code to print result, it gives
('2020-08-27', {'productid': 'product-173', 'userid': 'user-78', 'event_time': 1598516997, 'messageid': '6b1291ea-e50d-425b-9940-44c2aff089c1'})
('2020-08-25', {'productid': 'product-143', 'userid': 'user-74', 'event_time': 1598346837, 'messageid': 'b8b14eb3-8e39-42a3-9528-a323b10a7686'})
('2020-08-25', {'productid': 'product-144', 'userid': 'user-74', 'event_time': 1598346837, 'messageid': 'b8b14eb3-8e39-42a3-9528-a323b10a7686'})
as example. I want to write 2 events under the date=2020-08-25 folder and other one date=2020-08-27.
At the end of the day I want to write each event under their create date folder.
How can i do this?
Thanks for your help,
Oguz.