
I have this code, which tags outputs based on a field in each line of the input file:

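import json
import apache_beam as beam
from apache_beam.pvalue import TaggedOutput

class filters(beam.DoFn):
    def process(self, element):
        data = json.loads(element)
        yield TaggedOutput(data['EventName'], element)  # tag = EventName value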

I need help with the next step, writing the resulting tagged outputs:

tagged = lines | beam.ParDo(filters()).with_outputs('How can I dynamically access these tags?')

So as you can see, when I call .with_outputs() I don't know how many tags there will be or what they will be named, so I can't write things like:

tag1 = tagged.tag1

Thank you for your help

UPDATE: this won't work because with_outputs() is empty:

tagged_data = (lines | 'tagged data by key' >>
               beam.ParDo(filters()).with_outputs())

for tag in tagged_data:
    print('something')

output: WARNING:apache_beam.options.pipeline_options:Discarding unparseable args

But this works:

tagged_data = (lines | 'tagged data by key' >>
               beam.ParDo(filters()).with_outputs('tag1', 'tag2'))

for tag in tagged_data:
    print('something')

output:
  something
  something

2 Answers


Apache Beam pipeline execution is deferred: a DAG of operations to execute is built up, and nothing actually happens until you run your pipeline. (In Beam Python, this is typically invoked implicitly at the end of a with beam.Pipeline(...) block.) PCollections don't actually contain data, just instructions for how the data should be computed.

In particular, this means that when you write

tagged = lines | beam.ParDo(filters()).with_outputs(...)

tagged doesn't actually contain any data; rather, it contains references to the PCollections that will be produced (and further processing steps can be added to them). The data in lines has not been read or computed yet, so you can't figure out the set of outputs during pipeline construction.

It's not clear from the question what your end goal is, but if you're trying to partition outputs (for example, writing the lines for each EventName to their own files), you may want to look into dynamic destinations.


To get what you are trying to achieve, you would need a DoFn that writes each key's elements to its own file. You can use this example as a base:

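import apache_beam as beam
# Note: _TextSink is a private Beam class; its API may change between releases.
from apache_beam.io.textio import _TextSink


class WriteEachKeyToText(beam.DoFn):
    def __init__(self, file_path_prefix: str):
        super().__init__()
        self.file_path_prefix = file_path_prefix

    def process(self, kv):
        # Expects (key, iterable_of_lines) pairs, e.g. the output of GroupByKey.
        key, elements = kv
        sink = _TextSink(self.file_path_prefix, file_name_suffix=f"{key}.json")

        writer = sink.open_writer("prefix", self.file_path_prefix)
        for e in elements:  # values
            writer.write(e)
        writer.close()  # flush and close the underlying file handle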

Then, you can use it like this:

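output_path = "/some/path/"
# tagged_data must be a PCollection of (key, iterable_of_lines) pairs,
# e.g. the output of beam.GroupByKey().
tagged_data | beam.ParDo(WriteEachKeyToText(output_path))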

5 Comments

The problem is that I don't know the names or the number of side outputs I'm going to get, so I can't predict the tags. If I apply this to tagged = lines | beam.ParDo(filters()).with_outputs('How can I dynamically access these tags?'), I get this warning and nothing happens: WARNING:apache_beam.options.pipeline_options:Discarding unparseable args
Give me a couple mins to try and find something that could help you :)
Oh man, I really appreciate that. In case it helps: I found that if I write the tags inside with_outputs() I can iterate over tagged_data, but if I leave it empty I get the warning I mentioned previously. Again, I have to leave it empty because I won't know which tags are going to be generated until the file provides that info. I'm updating the post so you can see the code I mean.
From what I can see, .with_outputs() only retrieves the tags that you pass to it. Have you tried using DoFn.process so it can retrieve all the information?
I'm sorry, I'm kind of new to this. Isn't that what I'm doing in class filters(beam.DoFn): with yield TaggedOutput(data['EventName'], element)? I'm yielding the tag along with the element.
