After reading data from an unbounded source like Pub/Sub, I'm applying windowing. I need to write all the records belonging to a window to a separate file. I found this in Java but couldn't find anything in Python.
1 Answer
There are no details about your use case in the question, so you might need to adapt some parts of the following example. One way to do it is to group elements using the window they belong to as the key. Then, we leverage filesystems.FileSystems.create to control how we want to write the files.
Here I will be using 10-second windows and some dummy data where events are spaced 4 seconds apart. Generated with:
import time

data = [{'event': '{}'.format(event), 'timestamp': time.time() + 4*event} for event in range(10)]
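As a quick sanity check in plain Python (no Beam required), you can work out which 10-second fixed window each dummy event falls into: a fixed window of size s that contains timestamp ts starts at ts - (ts % s). The helper below only illustrates the grouping that the pipeline will do; it is not part of the pipeline itself:

```python
import time
from collections import defaultdict

# Dummy events spaced 4 s apart, as in the snippet above.
now = time.time()
data = [{'event': str(e), 'timestamp': now + 4 * e} for e in range(10)]

WINDOW_SIZE = 10  # seconds, matching FixedWindows(10)

def window_start(ts, size=WINDOW_SIZE):
    # A fixed window of size `size` containing ts starts at ts - (ts % size).
    return ts - (ts % size)

# Group events by window start, mimicking what GroupByKey does downstream.
groups = defaultdict(list)
for d in data:
    groups[window_start(d['timestamp'])].append(d['event'])
```

With events every 4 s over a 36 s span, you end up with 4 or 5 windows of at most 3 events each, depending on where the run starts relative to a window boundary.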
We use the timestamp field to assign element timestamps (this is just to emulate Pub/Sub events in a controlled way). We window the events, use the windowing info as the key, group by key, and write the results to the output folder:
import apache_beam as beam
from apache_beam.transforms import window

events = (p
          | 'Create Events' >> beam.Create(data)
          | 'Add Timestamps' >> beam.Map(lambda x: beam.window.TimestampedValue(x, x['timestamp']))
          | 'Add Windows' >> beam.WindowInto(window.FixedWindows(10))
          | 'Add Window Info' >> beam.ParDo(AddWindowingInfoFn())
          | 'Group By Window' >> beam.GroupByKey()
          | 'Windowed Writes' >> beam.ParDo(WindowedWritesFn('output/')))
Where AddWindowingInfoFn is pretty straightforward:
class AddWindowingInfoFn(beam.DoFn):
    """Output a tuple of window (key) + element (value)."""
    def process(self, element, window=beam.DoFn.WindowParam):
        yield (window, element)
and WindowedWritesFn writes to the path that we specified in the pipeline (the output/ folder in my case). I use the window info for the name of each file, converting the epoch timestamps to human-readable dates for convenience. Finally, we iterate over all the elements and write them to the corresponding file. Of course, this behavior can be tuned at will in this function:
from apache_beam.io import filesystems

class WindowedWritesFn(beam.DoFn):
    """Write one file per window/key."""
    def __init__(self, outdir):
        self.outdir = outdir

    def process(self, element):
        (window, elements) = element
        window_start = str(window.start.to_utc_datetime()).replace(" ", "_")
        window_end = str(window.end.to_utc_datetime()).replace(" ", "_")
        writer = filesystems.FileSystems.create(
            self.outdir + window_start + ',' + window_end + '.txt')
        for row in elements:
            # FileSystems.create returns a binary writer, so encode on Python 3.
            writer.write((str(row) + "\n").encode('utf-8'))
        writer.close()
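A small aside on the file names: str(datetime) leaves spaces and colons in them, and colons need escaping in the shell (as the cat commands below show). If that bothers you, a hypothetical variant of the name construction using strftime avoids both; this is a sketch of my own, not part of the original answer:

```python
from datetime import datetime, timezone

def window_filename(start_epoch, end_epoch, outdir='output/'):
    # Hypothetical helper: format window boundaries without
    # spaces or colons so the name needs no shell escaping.
    fmt = '%Y-%m-%d_%H%M%S'
    start = datetime.fromtimestamp(start_epoch, tz=timezone.utc).strftime(fmt)
    end = datetime.fromtimestamp(end_epoch, tz=timezone.utc).strftime(fmt)
    return '{}{},{}.txt'.format(outdir, start, end)
```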
This will write the elements belonging to each window to a different file. In my case there are 5 different files:
$ ls output/
2019-05-21_19:01:20,2019-05-21_19:01:30.txt
2019-05-21_19:01:30,2019-05-21_19:01:40.txt
2019-05-21_19:01:40,2019-05-21_19:01:50.txt
2019-05-21_19:01:50,2019-05-21_19:02:00.txt
2019-05-21_19:02:00,2019-05-21_19:02:10.txt
The first one contains only element 0 (this will vary between executions):
$ cat output/2019-05-21_19\:01\:20\,2019-05-21_19\:01\:30.txt
{'timestamp': 1558465286.933727, 'event': '0'}
The second one contains elements 1 to 3 and so on:
$ cat output/2019-05-21_19\:01\:30\,2019-05-21_19\:01\:40.txt
{'timestamp': 1558465290.933728, 'event': '1'}
{'timestamp': 1558465294.933728, 'event': '2'}
{'timestamp': 1558465298.933729, 'event': '3'}
A caveat of this approach is that all elements from the same window are grouped onto the same worker. That would happen anyway if writing to a single shard or output file as per your case, but for higher loads you might need to consider larger machine types.
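If a single window's volume becomes a problem, one common mitigation (not part of the original answer) is to salt the key with a shard id before the GroupByKey, so each window is split across several groups and therefore several files. A minimal sketch of the key construction, with a hypothetical shard count:

```python
import random

NUM_SHARDS = 4  # hypothetical shard count; tune to your load

def sharded_key(window, num_shards=NUM_SHARDS):
    # Pair the window with a random shard id so one window's
    # elements are spread over num_shards groups (and workers).
    return (window, random.randrange(num_shards))
```

The write DoFn would then need to include the shard id in the file name so the shards of one window don't overwrite each other.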
Full code here