Skip to content

Commit 2ce3cfc

Browse files
committed
Merge pull request googlegenomics#47 from deflaux/master
Update to latest version of the Dataflow SDK.
2 parents d59a9fb + 38ec487 commit 2ce3cfc

File tree

9 files changed

+65
-113
lines changed

9 files changed

+65
-113
lines changed

README.rst

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ Getting started
3131
Project ID and Google Cloud Storage bucket you made in the first step.
3232
This command runs the VariantSimilarity pipeline (which runs PCoA on a dataset)::
3333

34-
java -cp target/google-genomics-dataflow-v1beta2-0.2-SNAPSHOT.jar \
34+
java -cp target/google-genomics-dataflow-*.jar \
3535
com.google.cloud.genomics.dataflow.pipelines.VariantSimilarity \
3636
--project=my-project-id \
3737
--output=gs://my-bucket/output/localtest.txt \
@@ -43,7 +43,7 @@ Getting started
4343
* To deploy your pipeline (which runs on Google Compute Engine), some additional
4444
command line arguments are required::
4545

46-
java -cp target/google-genomics-dataflow-v1beta2-0.2-SNAPSHOT.jar \
46+
java -cp target/google-genomics-dataflow-*.jar \
4747
com.google.cloud.genomics.dataflow.pipelines.VariantSimilarity \
4848
--runner=BlockingDataflowPipelineRunner \
4949
--project=my-project-id \
@@ -65,7 +65,7 @@ Identity By State (IBS)
6565
first argument provided in the above command lines. For example, to run Identity by State
6666
change ``VariantSimilarity`` to ``IdentityByState``::
6767

68-
java -cp target/google-genomics-dataflow-v1beta2-0.2-SNAPSHOT.jar \
68+
java -cp target/google-genomics-dataflow-*.jar \
6969
com.google.cloud.genomics.dataflow.pipelines.IdentityByState \
7070
--project=my-project-id \
7171
--output=gs://my-bucket/localtest.txt \
@@ -122,15 +122,26 @@ To depend on this code, add the following to your ``pom.xml`` file::
122122
<dependency>
123123
<groupId>com.google.cloud.genomics</groupId>
124124
<artifactId>google-genomics-dataflow</artifactId>
125-
<version>v1beta2-0.2</version>
125+
<version>LATEST</version>
126126
</dependency>
127127
</dependencies>
128128
</project>
129129

130130
You can find the latest version in
131131
`Maven's central repository <https://search.maven.org/#search%7Cga%7C1%7Ca%3A%22google-genomics-dataflow%22>`_
132132

133-
We'll soon include an example pipeline that depends on this code in another GitHub repository.
133+
For an example pipeline that depends on this code in another GitHub repository, see https://github.com/googlegenomics/codelabs/tree/master/Java/PlatinumGenomes-variant-transformation.
134+
135+
Updating the prebuilt jar
136+
-------------------------
137+
138+
The prebuilt jar is currently generated by hand, whenever the code seems stable and has enough new features to warrant the effort.
139+
140+
To update the jar, we use a different set of Maven commands::
141+
142+
cd dataflow-java
143+
mvn clean compile bundle:bundle
144+
cp target/google-genomics-dataflow-*.jar google-genomics-dataflow.jar
134145
135146
Project status
136147
--------------

pom.xml

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@
6969
<dependency>
7070
<groupId>com.google.cloud.dataflow</groupId>
7171
<artifactId>google-cloud-dataflow-java-sdk-all</artifactId>
72-
<version>0.3.150109</version>
72+
<version>0.3.150326</version>
7373
</dependency>
7474

7575
<!-- Google client dependencies -->
@@ -124,19 +124,6 @@
124124
</exclusion>
125125
</exclusions>
126126
</dependency>
127-
<dependency>
128-
<groupId>com.google.apis</groupId>
129-
<artifactId>google-api-services-dataflow</artifactId>
130-
<version>v1b4-rev1-1.19.1</version>
131-
<exclusions>
132-
<!-- Exclude an old version of guava which is being pulled
133-
in by a transitive dependency google-api-client 1.19.0 -->
134-
<exclusion>
135-
<groupId>com.google.guava</groupId>
136-
<artifactId>guava-jdk5</artifactId>
137-
</exclusion>
138-
</exclusions>
139-
</dependency>
140127
<dependency>
141128
<groupId>com.google.cloud.genomics</groupId>
142129
<artifactId>google-genomics-utils</artifactId>

src/main/java/com/google/cloud/genomics/dataflow/pipelines/AnnotateVariants.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,8 +250,13 @@ private static String canonicalizeRefName(String refName) {
250250
}
251251

252252
public static void main(String[] args) throws Exception {
253+
// Register the options so that they show up via --help
254+
PipelineOptionsFactory.register(GenomicsDatasetOptions.class);
253255
GenomicsDatasetOptions opts = PipelineOptionsFactory.fromArgs(args)
254256
.withValidation().as(GenomicsDatasetOptions.class);
257+
// Option validation is not yet automatic, we make an explicit call here.
258+
GenomicsDatasetOptions.Methods.validateOptions(opts);
259+
255260
GenomicsFactory.OfflineAuth auth = GenomicsOptions.Methods.getGenomicsAuth(opts);
256261
Genomics genomics = auth.getGenomics(auth.getDefaultFactory());
257262

src/main/java/com/google/cloud/genomics/dataflow/pipelines/CountReads.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,12 @@ public Contig apply(String contigString) {
111111
}
112112

113113
public static void main(String[] args) throws GeneralSecurityException, IOException {
114+
// Register the options so that they show up via --help
115+
PipelineOptionsFactory.register(CountReadsOptions.class);
114116
options = PipelineOptionsFactory.fromArgs(args).withValidation().as(CountReadsOptions.class);
115-
GenomicsOptions.Methods.validateOptions(options);
117+
// Option validation is not yet automatic, we make an explicit call here.
118+
GenomicsDatasetOptions.Methods.validateOptions(options);
119+
116120
auth = GCSOptions.Methods.createGCSAuth(options);
117121
p = Pipeline.create(options);
118122
DataflowWorkarounds.registerGenomicsCoders(p);
@@ -141,8 +145,8 @@ private static PCollection<Read> getReads() throws IOException {
141145

142146
private static PCollection<Read> getReadsFromAPI() {
143147
List<SearchReadsRequest> requests = getReadRequests(options);
144-
PCollection<SearchReadsRequest> readRequests =
145-
DataflowWorkarounds.getPCollection(requests, p);
148+
PCollection<SearchReadsRequest> readRequests = p.begin()
149+
.apply(Create.of(requests));
146150
PCollection<Read> reads =
147151
readRequests.apply(
148152
ParDo.of(

src/main/java/com/google/cloud/genomics/dataflow/pipelines/IdentityByState.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import com.google.cloud.dataflow.sdk.io.TextIO;
2424
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
2525
import com.google.cloud.dataflow.sdk.transforms.Combine;
26+
import com.google.cloud.dataflow.sdk.transforms.Create;
2627
import com.google.cloud.dataflow.sdk.transforms.ParDo;
2728
import com.google.cloud.dataflow.sdk.values.KV;
2829
import com.google.cloud.dataflow.sdk.values.PCollection;
@@ -52,8 +53,11 @@ public class IdentityByState {
5253

5354
public static void main(String[] args) throws IOException, GeneralSecurityException,
5455
InstantiationException, IllegalAccessException {
56+
// Register the options so that they show up via --help
57+
PipelineOptionsFactory.register(IdentityByStateOptions.class);
5558
IdentityByStateOptions options =
5659
PipelineOptionsFactory.fromArgs(args).withValidation().as(IdentityByStateOptions.class);
60+
// Option validation is not yet automatic, we make an explicit call here.
5761
GenomicsDatasetOptions.Methods.validateOptions(options);
5862

5963
GenomicsFactory.OfflineAuth auth = GenomicsOptions.Methods.getGenomicsAuth(options);
@@ -63,8 +67,7 @@ public static void main(String[] args) throws IOException, GeneralSecurityExcept
6367

6468
Pipeline p = Pipeline.create(options);
6569
DataflowWorkarounds.registerGenomicsCoders(p);
66-
PCollection<SearchVariantsRequest> input =
67-
DataflowWorkarounds.getPCollection(requests, p);
70+
PCollection<SearchVariantsRequest> input = p.begin().apply(Create.of(requests));
6871

6972
PCollection<Variant> variants =
7073
options.getHasNonVariantSegments()

src/main/java/com/google/cloud/genomics/dataflow/pipelines/TransmissionProbability.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.google.cloud.dataflow.sdk.Pipeline;
2020
import com.google.cloud.dataflow.sdk.io.TextIO;
2121
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
22+
import com.google.cloud.dataflow.sdk.transforms.Create;
2223
import com.google.cloud.dataflow.sdk.transforms.DoFn;
2324
import com.google.cloud.dataflow.sdk.transforms.GroupByKey;
2425
import com.google.cloud.dataflow.sdk.transforms.ParDo;
@@ -47,9 +48,12 @@ public class TransmissionProbability {
4748
= "nextPageToken,variants(id,start,names,calls(info,callSetName))";
4849

4950
public static void main(String[] args) throws IOException, GeneralSecurityException {
51+
// Register the options so that they show up via --help
52+
PipelineOptionsFactory.register(GenomicsDatasetOptions.class);
5053
GenomicsDatasetOptions options = PipelineOptionsFactory.fromArgs(args)
5154
.withValidation().as(GenomicsDatasetOptions.class);
52-
GenomicsOptions.Methods.validateOptions(options);
55+
// Option validation is not yet automatic, we make an explicit call here.
56+
GenomicsDatasetOptions.Methods.validateOptions(options);
5357

5458
GenomicsFactory.OfflineAuth auth = GenomicsOptions.Methods.getGenomicsAuth(options);
5559
List<SearchVariantsRequest> requests = GenomicsDatasetOptions.Methods.getVariantRequests(
@@ -65,7 +69,8 @@ public static void main(String[] args) throws IOException, GeneralSecurityExcept
6569
// - Groups Transmission sources by Variant,
6670
// - Calculate transmission Probability for each variant
6771
// - Print calculated values to a file.
68-
DataflowWorkarounds.getPCollection(requests, p)
72+
p.begin()
73+
.apply(Create.of(requests))
6974
.apply(ParDo.named("VariantReader")
7075
.of(new VariantReader(auth, ShardBoundary.STRICT, VARIANT_FIELDS)))
7176
.apply(ParDo.named("ExtractFamilyVariantStatus")

src/main/java/com/google/cloud/genomics/dataflow/pipelines/VariantSimilarity.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.google.api.services.genomics.model.SearchVariantsRequest;
1919
import com.google.cloud.dataflow.sdk.Pipeline;
2020
import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory;
21+
import com.google.cloud.dataflow.sdk.transforms.Create;
2122
import com.google.cloud.dataflow.sdk.transforms.ParDo;
2223
import com.google.cloud.genomics.dataflow.functions.ExtractSimilarCallsets;
2324
import com.google.cloud.genomics.dataflow.functions.OutputPCoAFile;
@@ -42,9 +43,12 @@ public class VariantSimilarity {
4243
= "nextPageToken,variants(start,calls(genotype,callSetName))";
4344

4445
public static void main(String[] args) throws IOException, GeneralSecurityException {
46+
// Register the options so that they show up via --help
47+
PipelineOptionsFactory.register(GenomicsDatasetOptions.class);
4548
GenomicsDatasetOptions options = PipelineOptionsFactory.fromArgs(args)
4649
.withValidation().as(GenomicsDatasetOptions.class);
47-
GenomicsOptions.Methods.validateOptions(options);
50+
// Option validation is not yet automatic, we make an explicit call here.
51+
GenomicsDatasetOptions.Methods.validateOptions(options);
4852

4953
GenomicsFactory.OfflineAuth auth = GenomicsOptions.Methods.getGenomicsAuth(options);
5054
List<SearchVariantsRequest> requests =
@@ -53,8 +57,8 @@ public static void main(String[] args) throws IOException, GeneralSecurityExcept
5357

5458
Pipeline p = Pipeline.create(options);
5559
DataflowWorkarounds.registerGenomicsCoders(p);
56-
DataflowWorkarounds
57-
.getPCollection(requests, p)
60+
p.begin()
61+
.apply(Create.of(requests))
5862
.apply(
5963
ParDo.named("VariantReader").of(
6064
new VariantReader(auth, ShardBoundary.STRICT, VARIANT_FIELDS)))

src/main/java/com/google/cloud/genomics/dataflow/readers/bam/ReadBAMTransform.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,10 @@ public static PCollection<Read> getReadsFromBAMFilesSharded(
6969

7070
public static class ShardFn extends DoFn<String, BAMShard> {
7171
GenomicsFactory.OfflineAuth auth;
72-
PCollectionView<Iterable<Contig>, ?> contigsView;
72+
PCollectionView<Iterable<Contig>> contigsView;
7373
Storage.Objects storage;
7474

75-
public ShardFn(GenomicsFactory.OfflineAuth auth, PCollectionView<Iterable<Contig>, ?> contigsView) {
75+
public ShardFn(GenomicsFactory.OfflineAuth auth, PCollectionView<Iterable<Contig>> contigsView) {
7676
this.auth = auth;
7777
this.contigsView = contigsView;
7878
}
@@ -115,7 +115,7 @@ public void processElement(ProcessContext c) throws java.lang.Exception {
115115
public PCollection<Read> apply(PCollectionTuple contigsAndBAMs) {
116116

117117
final PCollection<Contig> contigs = contigsAndBAMs.get(CONTIGS_TAG);
118-
final PCollectionView<Iterable<Contig>, ?> contigsView =
118+
final PCollectionView<Iterable<Contig>> contigsView =
119119
contigs.apply(View.<Contig>asIterable());
120120

121121
final PCollection<String> BAMFileGCSPaths = contigsAndBAMs.get(BAMS_TAG);

src/main/java/com/google/cloud/genomics/dataflow/utils/DataflowWorkarounds.java

Lines changed: 14 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -15,23 +15,13 @@
1515
*/
1616
package com.google.cloud.genomics.dataflow.utils;
1717

18-
import com.google.api.client.json.GenericJson;
19-
import com.google.cloud.dataflow.sdk.Pipeline;
20-
import com.google.cloud.dataflow.sdk.coders.Coder;
21-
import com.google.cloud.dataflow.sdk.coders.CoderRegistry;
22-
import com.google.cloud.dataflow.sdk.options.DataflowPipelineOptions;
23-
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
24-
import com.google.cloud.dataflow.sdk.runners.DataflowPipelineRegistrar.Options;
25-
import com.google.cloud.dataflow.sdk.transforms.Create;
26-
import com.google.cloud.dataflow.sdk.transforms.Flatten;
27-
import com.google.cloud.dataflow.sdk.values.PCollection;
28-
import com.google.cloud.dataflow.sdk.values.PCollectionList;
29-
import com.google.cloud.genomics.dataflow.coders.GenericJsonCoder;
30-
import com.google.common.base.Predicate;
31-
import com.google.common.base.Splitter;
32-
import com.google.common.collect.Iterables;
33-
import com.google.common.collect.Lists;
34-
import com.google.common.primitives.UnsignedInts;
18+
import static com.google.common.collect.Lists.newArrayList;
19+
20+
import java.net.URL;
21+
import java.util.Collection;
22+
import java.util.LinkedList;
23+
import java.util.List;
24+
import java.util.logging.Logger;
3525

3626
import org.reflections.Reflections;
3727
import org.reflections.scanners.ResourcesScanner;
@@ -40,13 +30,13 @@
4030
import org.reflections.util.ConfigurationBuilder;
4131
import org.reflections.util.FilterBuilder;
4232

43-
import java.net.URL;
44-
import java.util.Collection;
45-
import java.util.LinkedList;
46-
import java.util.List;
47-
import java.util.logging.Logger;
48-
49-
import static com.google.common.collect.Lists.newArrayList;
33+
import com.google.api.client.json.GenericJson;
34+
import com.google.cloud.dataflow.sdk.Pipeline;
35+
import com.google.cloud.dataflow.sdk.coders.Coder;
36+
import com.google.cloud.dataflow.sdk.coders.CoderRegistry;
37+
import com.google.cloud.genomics.dataflow.coders.GenericJsonCoder;
38+
import com.google.common.base.Predicate;
39+
import com.google.common.collect.Iterables;
5040

5141
/**
5242
* Contains dataflow-related workarounds.
@@ -107,61 +97,4 @@ public boolean apply(URL url) {
10797
DataflowWorkarounds.registerCoder(p, clazz, GenericJsonCoder.of(clazz));
10898
}
10999
}
110-
111-
/**
112-
* Change a flat list of sharding options into a flattened PCollection to force dataflow to use
113-
* multiple workers. In the future, this shouldn't be necessary.
114-
*/
115-
public static <T> PCollection<T> getPCollection(List<T> shardOptions, Coder<T> coder,
116-
Pipeline p) {
117-
118-
DataflowPipelineOptions options = (GenomicsOptions) p.getOptions();
119-
int numWorkers = options.getNumWorkers();
120-
String machineType = options.getMachineType();
121-
122-
if (machineType != null) {
123-
String[] machineNameParts =
124-
Iterables.toArray(Splitter.on('-').split(machineType), String.class);
125-
if (3 == machineNameParts.length) {
126-
try {
127-
int numCores = UnsignedInts.parseUnsignedInt(machineNameParts[2]);
128-
numWorkers *= numCores;
129-
} catch (Exception e) {
130-
LOG.warning("Assuming one core per worker: " + e);
131-
}
132-
}
133-
}
134-
135-
LOG.info("Turning " + shardOptions.size() + " options into " + numWorkers + " workers");
136-
numWorkers = Math.min(shardOptions.size(), numWorkers);
137-
138-
int optionsPerWorker = (int) Math.ceil(shardOptions.size() / numWorkers);
139-
List<PCollection<T>> pCollections = Lists.newArrayList();
140-
141-
142-
for (int i = 0; i < numWorkers; i++) {
143-
int start = i * optionsPerWorker;
144-
int end = Math.min(shardOptions.size(), start + optionsPerWorker);
145-
146-
// It's possible for start >= end in the last worker,
147-
// in which case we'll just skip the collection.
148-
if (start >= end) {
149-
break;
150-
}
151-
152-
LOG.info("Adding collection with " + start + " to " + end);
153-
PCollection<T> collection = p.begin().apply(Create.of(shardOptions.subList(start, end)));
154-
if (coder != null) {
155-
collection.setCoder(coder);
156-
}
157-
pCollections.add(collection);
158-
}
159-
160-
return PCollectionList.of(pCollections).apply(Flatten.<T>create());
161-
}
162-
163-
public static <T> PCollection<T> getPCollection(
164-
List<T> shardOptions, Pipeline p) {
165-
return getPCollection(shardOptions, null, p);
166-
}
167100
}

0 commit comments

Comments
 (0)