Skip to content
This repository was archived by the owner on Oct 29, 2023. It is now read-only.

Commit efd9219

Browse files
committed
Merge pull request #41 from deflaux/master
Add Genomics API counters for Dataflow UI display.
2 parents 41953b1 + cb269a9 commit efd9219

13 files changed

+63
-186
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@
114114
<dependency>
115115
<groupId>com.google.apis</groupId>
116116
<artifactId>google-api-services-genomics</artifactId>
117-
<version>v1beta2-rev9-1.19.0</version>
117+
<version>v1beta2-rev25-1.19.1</version>
118118
<exclusions>
119119
<!-- Exclude an old version of guava which is being pulled
120120
in by a transitive dependency google-api-client 1.19.0 -->

src/main/java/com/google/cloud/genomics/dataflow/functions/JoinNonVariantSegmentsWithVariants.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,9 +88,9 @@ public static PCollection<Variant> joinVariantsTransform(
8888
PCollection<SearchVariantsRequest> input, GenomicsFactory.OfflineAuth auth, String fields) {
8989
for (String field : REQUIRED_FIELDS) {
9090
Preconditions
91-
.checkState(
91+
.checkArgument(
9292
fields.contains(field),
93-
"Required field missing: %s Add this field to the list of Variants fields returned in the partial response.",
93+
"Required field missing: '%s' Add this field to the list of Variants fields returned in the partial response.",
9494
field);
9595
}
9696
return joinVariants(input, auth, fields);

src/main/java/com/google/cloud/genomics/dataflow/pipelines/ConvertNonVariantSegmentsToVariants.java

Lines changed: 0 additions & 165 deletions
This file was deleted.

src/main/java/com/google/cloud/genomics/dataflow/pipelines/CountReads.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ private static PCollection<Read> getReads() throws IOException {
142142
private static PCollection<Read> getReadsFromAPI() {
143143
List<SearchReadsRequest> requests = getReadRequests(options);
144144
PCollection<SearchReadsRequest> readRequests =
145-
DataflowWorkarounds.getPCollection(requests, p, options.getNumWorkers());
145+
DataflowWorkarounds.getPCollection(requests, p);
146146
PCollection<Read> reads =
147147
readRequests.apply(
148148
ParDo.of(

src/main/java/com/google/cloud/genomics/dataflow/pipelines/IdentityByState.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@
4848

4949
public class IdentityByState {
5050
private static final String VARIANT_FIELDS =
51-
"nextPageToken,variants(id,calls(genotype,callSetName))";
51+
"nextPageToken,variants(start,calls(genotype,callSetName))";
5252

5353
public static void main(String[] args) throws IOException, GeneralSecurityException,
5454
InstantiationException, IllegalAccessException {
@@ -64,7 +64,7 @@ public static void main(String[] args) throws IOException, GeneralSecurityExcept
6464
Pipeline p = Pipeline.create(options);
6565
DataflowWorkarounds.registerGenomicsCoders(p);
6666
PCollection<SearchVariantsRequest> input =
67-
DataflowWorkarounds.getPCollection(requests, p, options.getNumWorkers());
67+
DataflowWorkarounds.getPCollection(requests, p);
6868

6969
PCollection<Variant> variants =
7070
options.getHasNonVariantSegments()

src/main/java/com/google/cloud/genomics/dataflow/pipelines/TransmissionProbability.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
*/
4545
public class TransmissionProbability {
4646
private static final String VARIANT_FIELDS
47-
= "nextPageToken,variants(id,names,calls(info,callSetName))";
47+
= "nextPageToken,variants(id,start,names,calls(info,callSetName))";
4848

4949
public static void main(String[] args) throws IOException, GeneralSecurityException {
5050
GenomicsDatasetOptions options = PipelineOptionsFactory.fromArgs(args)
@@ -65,7 +65,7 @@ public static void main(String[] args) throws IOException, GeneralSecurityExcept
6565
// - Groups Transmission sources by Variant,
6666
// - Calculate transmission Probability for each variant
6767
// - Print calculated values to a file.
68-
DataflowWorkarounds.getPCollection(requests, p, options.getNumWorkers())
68+
DataflowWorkarounds.getPCollection(requests, p)
6969
.apply(ParDo.named("VariantReader")
7070
.of(new VariantReader(auth, ShardBoundary.STRICT, VARIANT_FIELDS)))
7171
.apply(ParDo.named("ExtractFamilyVariantStatus")

src/main/java/com/google/cloud/genomics/dataflow/pipelines/VariantSimilarity.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
*/
4040
public class VariantSimilarity {
4141
private static final String VARIANT_FIELDS
42-
= "nextPageToken,variants(id,calls(genotype,callSetName))";
42+
= "nextPageToken,variants(start,calls(genotype,callSetName))";
4343

4444
public static void main(String[] args) throws IOException, GeneralSecurityException {
4545
GenomicsDatasetOptions options = PipelineOptionsFactory.fromArgs(args)
@@ -54,7 +54,7 @@ public static void main(String[] args) throws IOException, GeneralSecurityExcept
5454
Pipeline p = Pipeline.create(options);
5555
DataflowWorkarounds.registerGenomicsCoders(p);
5656
DataflowWorkarounds
57-
.getPCollection(requests, p, options.getNumWorkers())
57+
.getPCollection(requests, p)
5858
.apply(
5959
ParDo.named("VariantReader").of(
6060
new VariantReader(auth, ShardBoundary.STRICT, VARIANT_FIELDS)))

src/main/java/com/google/cloud/genomics/dataflow/readers/GenomicsApiReader.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717

1818
import com.google.api.client.json.GenericJson;
1919
import com.google.api.services.genomics.Genomics;
20+
import com.google.cloud.dataflow.sdk.transforms.Aggregator;
2021
import com.google.cloud.dataflow.sdk.transforms.DoFn;
22+
import com.google.cloud.dataflow.sdk.transforms.Sum;
2123
import com.google.cloud.genomics.utils.GenomicsFactory;
2224

2325
import java.io.IOException;
@@ -31,18 +33,33 @@ public abstract class GenomicsApiReader<I extends GenericJson, O extends Generic
3133
// Used for access to the genomics API
3234
protected final GenomicsFactory.OfflineAuth auth;
3335
protected final String fields;
34-
36+
protected Aggregator<Integer> initializedRequestsCount;
37+
protected Aggregator<Integer> unsuccessfulResponsesCount;
38+
protected Aggregator<Integer> ioExceptionsCount;
39+
protected Aggregator<Long> itemCount;
40+
3541
public GenomicsApiReader(GenomicsFactory.OfflineAuth auth, String fields) {
3642
this.auth = auth;
3743
this.fields = fields;
3844
}
3945

46+
@Override
47+
public void startBundle(Context c) {
48+
initializedRequestsCount = c.createAggregator("Genomics API Initialized Request Count", new Sum.SumIntegerFn());
49+
unsuccessfulResponsesCount = c.createAggregator("Genomics API Unsuccessful Response Count", new Sum.SumIntegerFn());
50+
ioExceptionsCount = c.createAggregator("Genomics API IOException Response Count", new Sum.SumIntegerFn());
51+
itemCount = c.createAggregator("Genomics API Item Count", new Sum.SumLongFn());
52+
}
53+
4054
@Override
4155
public void processElement(ProcessContext c) {
4256
try {
4357
GenomicsFactory factory = auth.getDefaultFactory();
4458
processApiCall(auth.getGenomics(factory), c, c.element());
4559

60+
initializedRequestsCount.addValue(factory.initializedRequestsCount());
61+
unsuccessfulResponsesCount.addValue(factory.unsuccessfulResponsesCount());
62+
ioExceptionsCount.addValue(factory.ioExceptionsCount());
4663
LOG.info("ApiReader processed " + factory.initializedRequestsCount() + " requests ("
4764
+ factory.unsuccessfulResponsesCount() + " server errors and "
4865
+ factory.ioExceptionsCount() + " IO exceptions)");

src/main/java/com/google/cloud/genomics/dataflow/readers/ReadReader.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ protected void processApiCall(Genomics genomics, ProcessContext c, SearchReadsRe
5757

5858
for (Read read : Paginator.Reads.create(genomics, shardBoundary).search(request, fields)) {
5959
c.output(read);
60+
itemCount.addValue(1L);
6061
}
6162
}
6263
}

src/main/java/com/google/cloud/genomics/dataflow/readers/VariantReader.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public class VariantReader extends GenomicsApiReader<SearchVariantsRequest, Vari
3131

3232
/**
3333
* Create a VariantReader using a auth and fields parameter. All fields not specified under
34-
* readFields will not be returned in the API response.
34+
* variantFields will not be returned in the API response.
3535
*
3636
* @param auth Auth class containing credentials.
3737
* @param variantFields Fields to return in responses.
@@ -52,12 +52,13 @@ public VariantReader(GenomicsFactory.OfflineAuth auth, ShardBoundary shardBounda
5252
@Override
5353
protected void processApiCall(Genomics genomics, ProcessContext c, SearchVariantsRequest request)
5454
throws IOException {
55-
LOG.info("Starting Variants read loop");
55+
LOG.info("Starting Variants read loop: " + request);
5656

5757
int numberOfVariants = 0;
5858
for (Variant variant : Paginator.Variants.create(genomics, shardBoundary).search(request, fields)) {
5959
c.output(variant);
6060
++numberOfVariants;
61+
itemCount.addValue(1L);
6162
}
6263

6364
LOG.info("Read " + numberOfVariants + " variants at: " + request.getReferenceName() + "-" + "["

0 commit comments

Comments
 (0)