Applying multiple GroupByKey transforms in a Dataflow job causing windows to be applied multiple times
We have a Dataflow job that is subscribed to a Pub/Sub stream of events. We have applied sliding windows of 1 hour with a 10-minute period. In our code, we perform a Count.perElement to get the count for each element, and we then want to run this through a Top.of to get the top N elements.

At a high level:
1) Read from PubsubIO
2) Window.into(SlidingWindows.of(windowSize).every(period)) // windowSize = 1 hour, period = 10 mins
3) Count.perElement
4) Top.of(n, comparisonFunction)
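The sliding windows in step 2 overlap: with a 1-hour size and a 10-minute period, each element falls into 60 / 10 = 6 windows. The plain-Java sketch below (no Dataflow dependency; the class and method names are hypothetical, and it assumes a zero window offset and timestamps in whole minutes) is meant to mirror the usual sliding-window assignment rule: start from the latest window boundary at or before the timestamp and step back one period at a time while the window still covers it.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowAssignment {
    // Returns the start (in minutes) of every sliding window of length `size`
    // that contains timestamp `t`, with a new window every `period` minutes.
    // Assumes a zero offset, so window starts are multiples of `period`.
    static List<Long> windowStarts(long t, long size, long period) {
        List<Long> starts = new ArrayList<>();
        // Latest window start at or before t.
        long lastStart = t - Math.floorMod(t, period);
        // Walk back while the window [start, start + size) still covers t.
        for (long start = lastStart; start > t - size; start -= period) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long size = 60;   // 1-hour windows
        long period = 10; // a new window every 10 minutes
        System.out.println(windowStarts(125, size, period)); // [120, 110, 100, 90, 80, 70]
    }
}
```

An event at minute 125 therefore belongs to the windows starting at minutes 120, 110, 100, 90, 80 and 70, so it is counted once in each of six windows.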
What we're seeing is that the window is being applied twice, and the data seems to be watermarked 1 hour 40 minutes (instead of 50 minutes) behind the current time. When we dig into the job graph on the Dataflow console, we see that there are 2 GroupByKey operations being performed on the data:
1) As part of Count.perElement. The watermark on the data from this step onwards is 50 minutes behind the current time, which is expected.
2) As part of Top.of (in Combine.perKey). The watermark on this step seems to be another 50 minutes behind.
Thus, data in the steps below is watermarked 1 hour 40 minutes behind.
This manifests as downstream graphs being 50 minutes late.

So it seems that every time a GroupByKey is applied, the windowing kicks in afresh.

Is this expected behavior? Is there any way we can make the windowing apply only to the Count.perElement and turn it off after that?

Our code is along the lines of:
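    import com.google.cloud.dataflow.sdk.Pipeline;
    import com.google.cloud.dataflow.sdk.io.PubsubIO;
    import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner;
    import com.google.cloud.dataflow.sdk.transforms.Count;
    import com.google.cloud.dataflow.sdk.transforms.Top;
    import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
    import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
    import com.google.cloud.dataflow.sdk.values.KV;
    import com.google.cloud.dataflow.sdk.values.PCollection;
    import java.util.List;
    import org.joda.time.Duration;

    // options (DataflowPipelineOptions), applicationName and orderByValue()
    // are defined elsewhere in our code.
    final int top = 50;
    final Duration windowSize = Duration.standardMinutes(60);
    final Duration windowPeriod = Duration.standardMinutes(10);
    final SlidingWindows window = SlidingWindows.of(windowSize).every(windowPeriod);

    options.setWorkerMachineType("n1-standard-16");
    options.setWorkerDiskType("compute.googleapis.com/projects//zones//diskTypes/pd-ssd");
    options.setJobName(applicationName);
    options.setStreaming(true);
    options.setRunner(DataflowPipelineRunner.class);

    final Pipeline pipeline = Pipeline.create(options);

    // Events
    final String eventTopic = "projects/" + options.getProject() + "/topics/eventlog";
    final PCollection<String> events = pipeline
        .apply(PubsubIO.Read.topic(eventTopic));

    // Create the top list
    final PCollection<List<KV<String, Long>>> topList = events
        .apply(Window.<String>into(window))
        .apply(Count.<String>perElement()) // event IDs are repeated
        // Keep the top N events by count
        .apply(Top.of(top, orderByValue()).withoutDefaults());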
Windowing is not applied each time there is a GroupByKey. The lag you were seeing was the result of two issues, both of which should now be resolved.

The first was that data buffered for later windows at the first GroupByKey was preventing the watermark from advancing, which meant that the earlier windows were getting held up at the second GroupByKey. This has been fixed in the latest versions of the SDK.

The second was that the sliding windows were causing the amount of data to increase significantly. A new optimization has been added that uses the combine (you mentioned Count and Top) to reduce the amount of data.
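To illustrate why combining helps with sliding windows, here is a toy plain-Java model. It is not how the Dataflow service actually implements the optimization; the event IDs, timestamps and class and method names are all hypothetical. Without combining, every event is emitted once per sliding window it belongs to before the grouping step; with combining, partial counts are first merged per (event ID, window) pair, which is valid because counting is associative and commutative.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PreCombineSketch {
    static final long SIZE = 60;   // 1-hour windows, in minutes
    static final long PERIOD = 10; // a new window every 10 minutes

    // Start of every sliding window that contains minute t (zero offset assumed).
    static List<Long> windowStarts(long t) {
        List<Long> starts = new ArrayList<>();
        for (long s = t - Math.floorMod(t, PERIOD); s > t - SIZE; s -= PERIOD) {
            starts.add(s);
        }
        return starts;
    }

    // Without combining: one record per (event, window) pair reaches the grouping step.
    static int recordsWithoutCombining(String[] ids, long[] minutes) {
        int records = 0;
        for (int i = 0; i < ids.length; i++) {
            records += windowStarts(minutes[i]).size();
        }
        return records;
    }

    // With combining: counts are merged per (event ID, window) key first,
    // so only one partial count per distinct key is emitted.
    static Map<String, Long> partialCounts(String[] ids, long[] minutes) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (int i = 0; i < ids.length; i++) {
            for (long start : windowStarts(minutes[i])) {
                counts.merge(ids[i] + "@" + start, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical input: event "a" repeats within a few minutes.
        String[] ids = {"a", "a", "a", "a", "b"};
        long[] minutes = {125, 125, 126, 127, 125};
        System.out.println(recordsWithoutCombining(ids, minutes)); // 30
        System.out.println(partialCounts(ids, minutes).size());    // 12
    }
}
```

For these five events, 30 windowed records shrink to 12 partial counts, because repeated event IDs collapse into a single count per window instead of one record per copy; the more an ID repeats, the larger the saving.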