1717import java .security .GeneralSecurityException ;
1818import java .util .Iterator ;
1919import java .util .List ;
20+ import java .util .concurrent .TimeUnit ;
21+
22+ import org .apache .commons .math3 .stat .descriptive .DescriptiveStatistics ;
23+ import org .slf4j .Logger ;
24+ import org .slf4j .LoggerFactory ;
2025
2126import com .google .cloud .dataflow .sdk .transforms .Aggregator ;
2227import com .google .cloud .dataflow .sdk .transforms .DoFn ;
28+ import com .google .cloud .dataflow .sdk .transforms .Max ;
2329import com .google .cloud .dataflow .sdk .transforms .PTransform ;
2430import com .google .cloud .dataflow .sdk .transforms .ParDo ;
2531import com .google .cloud .dataflow .sdk .transforms .Sum ;
2632import com .google .cloud .dataflow .sdk .values .PCollection ;
2733import com .google .cloud .genomics .utils .GenomicsFactory ;
2834import com .google .cloud .genomics .utils .ShardBoundary ;
2935import com .google .cloud .genomics .utils .grpc .VariantStreamIterator ;
36+ import com .google .common .base .Stopwatch ;
3037import com .google .genomics .v1 .StreamVariantsRequest ;
3138import com .google .genomics .v1 .StreamVariantsResponse ;
3239import com .google .genomics .v1 .Variant ;
3744public class VariantStreamer extends
3845PTransform <PCollection <StreamVariantsRequest >, PCollection <Variant >> {
3946
47+ private static final Logger LOG = LoggerFactory .getLogger (VariantStreamer .class );
4048 protected final GenomicsFactory .OfflineAuth auth ;
4149 protected final ShardBoundary .Requirement shardBoundary ;
4250 protected final String fields ;
@@ -63,25 +71,33 @@ public PCollection<Variant> apply(PCollection<StreamVariantsRequest> input) {
6371 private class RetrieveVariants extends DoFn <StreamVariantsRequest , List <Variant >> {
6472
6573 protected Aggregator <Integer , Integer > initializedShardCount ;
66- protected Aggregator <Long , Long > itemCount ;
6774 protected Aggregator <Integer , Integer > finishedShardCount ;
75+ protected Aggregator <Long , Long > shardTimeMaxSec ;
76+ DescriptiveStatistics stats ;
6877
6978 public RetrieveVariants () {
7079 initializedShardCount = createAggregator ("Initialized Shard Count" , new Sum .SumIntegerFn ());
71- itemCount = createAggregator ("Number of variant lists" , new Sum .SumLongFn ());
7280 finishedShardCount = createAggregator ("Finished Shard Count" , new Sum .SumIntegerFn ());
81+ shardTimeMaxSec = createAggregator ("Maximum Shard Processing Time (sec)" , new Max .MaxLongFn ());
82+ stats = new DescriptiveStatistics (500 );
7383 }
7484
7585 @ Override
7686 public void processElement (ProcessContext c ) throws IOException , GeneralSecurityException , InterruptedException {
7787 initializedShardCount .addValue (1 );
88+ shardTimeMaxSec .addValue (0L );
89+ Stopwatch stopWatch = Stopwatch .createStarted ();
7890 Iterator <StreamVariantsResponse > iter = new VariantStreamIterator (c .element (), auth , shardBoundary , fields );
7991 while (iter .hasNext ()) {
8092 StreamVariantsResponse variantResponse = iter .next ();
8193 c .output (variantResponse .getVariantsList ());
82- itemCount .addValue (1L );
8394 }
95+ stopWatch .stop ();
96+ shardTimeMaxSec .addValue (stopWatch .elapsed (TimeUnit .SECONDS ));
97+ stats .addValue (stopWatch .elapsed (TimeUnit .SECONDS ));
8498 finishedShardCount .addValue (1 );
99+ LOG .info ("Shard Duration in Seconds - Min: " + stats .getMin () + " Max: " + stats .getMax () +
100+ " Avg: " + stats .getMean () + " StdDev: " + stats .getStandardDeviation ());
85101 }
86102 }
87103
0 commit comments