I am subscribing to one Pub/Sub topic that contains different event types, each arriving with different attributes. After I read an element, I need to route it to a different destination based on its attributes. This is what the sample code looks like:
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
Pipeline pipeline = Pipeline.create(options);
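// First read: consume the subscription, keep only "type1" events, and write them to the run output directory.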
pipeline
.apply(
"ReadType1",
EventIO.<T>readJsons()
.of(T.class)
.withPubsubTimestampAttributeName(null)
.withOptions(options))
.apply(
Filter.by(
new SerializableFunction<T, Boolean>() {
@Override
public Boolean apply(T input) {
return input.attributes.get("type").equals("type1");
}
}))
.apply(
"WindowMetrics",
Window.into(FixedWindows.of(Duration.standardSeconds(options.getWindowDuration()))))
.apply("AsJsons", AsJsons.of(T.class))
.apply(
"Write File(s)",
TextIO.write()
.withWindowedWrites()
.withNumShards(options.getNumShards())
.to(
new WindowedFilenamePolicy(
options.getRunOutputDirectory(),
options.getUseCurrentDateForOutputDirectory(),
options.getOutputFilenamePrefix(),
options.getOutputShardTemplate(),
options.getOutputFilenameSuffix()))
.withTempDirectory(
NestedValueProvider.of(
options.getTempDirectory(),
(SerializableFunction<String, ResourceId>)
input -> FileBasedSink.convertToFileResourceIfPossible(input))));
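// Second read of the same subscription: keep only "type2" events and write them to the batch output directory.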
pipeline.apply("ReadType2",
EventIO.<T>readJsons().of(T.class)
.withPubsubTimestampAttributeName(null)
.withOptions(options))
.apply(Filter.by(new SerializableFunction<T, Boolean>() {
@Override
public Boolean apply(T input) {
return input.attributes.get("type").equals("type2");
}
})).apply("WindowMetrics",
Window.into(FixedWindows.of(Duration.standardSeconds(options.getWindowDuration()))))
.apply("AsJsons", AsJsons.of(T.class))
.apply(
"Write File(s)",
TextIO.write()
.withWindowedWrites()
.withNumShards(options.getNumShards())
.to(
new WindowedFilenamePolicy(
options.getBatchOutputDirectory(),
options.getUseCurrentDateForOutputDirectory(),
options.getOutputFilenamePrefix(),
options.getOutputShardTemplate(),
options.getOutputFilenameSuffix()))
.withTempDirectory(
NestedValueProvider.of(
options.getTempDirectory(),
(SerializableFunction<String, ResourceId>)
input -> FileBasedSink.convertToFileResourceIfPossible(input))));
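// Third read of the same subscription: keep only "type3" events and write them to the custom-interval output directory.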
pipeline.apply("ReadType3",
EventIO.<T>readJsons().of(T.class)
.withPubsubTimestampAttributeName(null)
.withOptions(options))
.apply(Filter.by(new SerializableFunction<T, Boolean>() {
@Override
public Boolean apply(T input) {
return input.attributes.get("type").equals("type3");
}
})).apply("WindowMetrics",
Window.into(FixedWindows.of(Duration.standardSeconds(options.getWindowDuration()))))
.apply("AsJsons", AsJsons.of(T.class))
.apply(
"Write File(s)",
TextIO.write()
.withWindowedWrites()
.withNumShards(options.getNumShards())
.to(
new WindowedFilenamePolicy(
options.getCustomIntervalOutputDirectory(),
options.getUseCurrentDateForOutputDirectory(),
options.getOutputFilenamePrefix(),
options.getOutputShardTemplate(),
options.getOutputFilenameSuffix()))
.withTempDirectory(
NestedValueProvider.of(
options.getTempDirectory(),
(SerializableFunction<String, ResourceId>)
input -> FileBasedSink.convertToFileResourceIfPossible(input))));
pipeline.run();
Basically, I read the events, filter them by their attribute, and write them out to files. The job failed in Dataflow with:
Workflow failed. Causes: The pubsub configuration contains errors: Subscription 'sub-name' is consumed by multiple stages, this will result in undefined behavior.
So what would be the appropriate way to split the pipeline within the same job?
I tried creating separate pipelines (Pipeline1, Pipeline2, Pipeline3), but that ends up requiring a separate job name to run each pipeline, and I am not sure that is the right way to do it.
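To make the question concrete: is the intended approach to read the subscription once into a single PCollection and then branch it, with one Filter per type and one write per branch? Below is a rough, untested sketch of what I mean, reusing the placeholder names from the code above (EventIO, T, WindowedFilenamePolicy, and the option getters are all from my sample, so nothing here is guaranteed to compile as-is):

PCollection<T> events =
    pipeline.apply(
        "ReadEvents",
        EventIO.<T>readJsons()
            .of(T.class)
            .withPubsubTimestampAttributeName(null)
            .withOptions(options));

// Branch for type1; type2 and type3 would follow the same pattern,
// writing to getBatchOutputDirectory() and getCustomIntervalOutputDirectory() instead.
events
    .apply(
        "FilterType1",
        Filter.by(
            new SerializableFunction<T, Boolean>() {
              @Override
              public Boolean apply(T input) {
                return input.attributes.get("type").equals("type1");
              }
            }))
    .apply(
        "WindowType1",
        Window.into(FixedWindows.of(Duration.standardSeconds(options.getWindowDuration()))))
    .apply("AsJsonsType1", AsJsons.of(T.class))
    .apply(
        "WriteType1",
        TextIO.write()
            .withWindowedWrites()
            .withNumShards(options.getNumShards())
            .to(
                new WindowedFilenamePolicy(
                    options.getRunOutputDirectory(),
                    options.getUseCurrentDateForOutputDirectory(),
                    options.getOutputFilenamePrefix(),
                    options.getOutputShardTemplate(),
                    options.getOutputFilenameSuffix()))
            .withTempDirectory(
                NestedValueProvider.of(
                    options.getTempDirectory(),
                    (SerializableFunction<String, ResourceId>)
                        input -> FileBasedSink.convertToFileResourceIfPossible(input))));

// ...same chain again for type2 and type3, then a single pipeline.run() for the whole job.

I suffixed the step names per branch (FilterType1, WindowType1, ...) because, as far as I know, transform names should be unique within one pipeline; that naming is just my assumption, not something from the original code.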