diff --git a/pom.xml b/pom.xml index 3173e43..2fa5425 100644 --- a/pom.xml +++ b/pom.xml @@ -114,7 +114,7 @@ com.google.apis google-api-services-genomics - v1beta2-rev9-1.19.0 + v1beta2-rev25-1.19.1 diff --git a/src/main/java/com/google/cloud/genomics/dataflow/functions/JoinNonVariantSegmentsWithVariants.java b/src/main/java/com/google/cloud/genomics/dataflow/functions/JoinNonVariantSegmentsWithVariants.java index 8df8483..37f5dac 100644 --- a/src/main/java/com/google/cloud/genomics/dataflow/functions/JoinNonVariantSegmentsWithVariants.java +++ b/src/main/java/com/google/cloud/genomics/dataflow/functions/JoinNonVariantSegmentsWithVariants.java @@ -88,9 +88,9 @@ public static PCollection joinVariantsTransform( PCollection input, GenomicsFactory.OfflineAuth auth, String fields) { for (String field : REQUIRED_FIELDS) { Preconditions - .checkState( + .checkArgument( fields.contains(field), - "Required field missing: %s Add this field to the list of Variants fields returned in the partial response.", + "Required field missing: '%s' Add this field to the list of Variants fields returned in the partial response.", field); } return joinVariants(input, auth, fields); diff --git a/src/main/java/com/google/cloud/genomics/dataflow/pipelines/ConvertNonVariantSegmentsToVariants.java b/src/main/java/com/google/cloud/genomics/dataflow/pipelines/ConvertNonVariantSegmentsToVariants.java deleted file mode 100644 index 76801fc..0000000 --- a/src/main/java/com/google/cloud/genomics/dataflow/pipelines/ConvertNonVariantSegmentsToVariants.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Copyright (C) 2014 Google Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except - * in compliance with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package com.google.cloud.genomics.dataflow.pipelines; - -import java.io.IOException; -import java.security.GeneralSecurityException; -import java.util.ArrayList; -import java.util.List; - -import com.google.api.client.repackaged.com.google.common.base.Preconditions; -import com.google.api.services.bigquery.model.TableFieldSchema; -import com.google.api.services.bigquery.model.TableRow; -import com.google.api.services.bigquery.model.TableSchema; -import com.google.api.services.genomics.model.Call; -import com.google.api.services.genomics.model.SearchVariantsRequest; -import com.google.api.services.genomics.model.Variant; -import com.google.cloud.dataflow.sdk.Pipeline; -import com.google.cloud.dataflow.sdk.io.BigQueryIO; -import com.google.cloud.dataflow.sdk.options.Description; -import com.google.cloud.dataflow.sdk.options.PipelineOptionsFactory; -import com.google.cloud.dataflow.sdk.options.Validation; -import com.google.cloud.dataflow.sdk.transforms.DoFn; -import com.google.cloud.dataflow.sdk.transforms.ParDo; -import com.google.cloud.dataflow.sdk.values.PCollection; -import com.google.cloud.genomics.dataflow.functions.JoinNonVariantSegmentsWithVariants; -import com.google.cloud.genomics.dataflow.utils.DataflowWorkarounds; -import com.google.cloud.genomics.dataflow.utils.GenomicsDatasetOptions; -import com.google.cloud.genomics.dataflow.utils.GenomicsOptions; -import com.google.cloud.genomics.utils.Contig.SexChromosomeFilter; -import com.google.cloud.genomics.utils.GenomicsFactory; - -/** - * Sample pipeline that converts data with non-variant segments (such as data that was in source - * format Genome VCF (gVCF) or Complete Genomics) to variant-only data with calls from - * non-variant-segments merged into the variants with which they overlap. The resultant data is - * emitted to a BigQuery table. - * - * This is currently done only for SNP variants. Indels and structural variants are left as-is. - *

- * The sample could be expanded upon to: - *

    - *
  1. emit additional fields from the variants and calls - *
  2. perform additional data munging - *
- */ -public class ConvertNonVariantSegmentsToVariants { - - /** - * Options supported by {@link ConvertNonVariantSegmentsToVariants}. - *

- * Inherits standard configuration options for Genomics pipelines and datasets. - */ - private static interface Options extends GenomicsDatasetOptions { - @Override - @Description("BigQuery table to write to, specified as " - + ":.. The dataset must already exist.") - @Validation.Required - String getOutput(); - - @Override - void setOutput(String value); - } - - /** - * Construct the table schema for the output table. - * - * @return The schema for the destination table. - */ - private static TableSchema getTableSchema() { - // TODO use variant set metadata to auto-generate the BigQuery schema - List callFields = new ArrayList<>(); - callFields.add(new TableFieldSchema().setName("call_set_name").setType("STRING")); - callFields.add(new TableFieldSchema().setName("phaseset").setType("STRING")); - callFields.add(new TableFieldSchema().setName("genotype").setType("INTEGER") - .setMode("REPEATED")); - callFields.add(new TableFieldSchema().setName("genotype_likelihood").setType("FLOAT") - .setMode("REPEATED")); - - List fields = new ArrayList<>(); - fields.add(new TableFieldSchema().setName("variant_id").setType("STRING")); - fields.add(new TableFieldSchema().setName("reference_name").setType("STRING")); - fields.add(new TableFieldSchema().setName("start").setType("INTEGER")); - fields.add(new TableFieldSchema().setName("end").setType("INTEGER")); - fields.add(new TableFieldSchema().setName("reference_bases").setType("STRING")); - fields.add(new TableFieldSchema().setName("alternate_bases").setType("STRING") - .setMode("REPEATED")); - fields.add(new TableFieldSchema().setName("call").setType("RECORD").setMode("REPEATED") - .setFields(callFields)); - - return new TableSchema().setFields(fields); - } - - /** - * Prepares the data for writing to BigQuery by building a TableRow object containing data from - * the variant mapped onto the schema to be used for the destination table. - */ - static class FormatVariantsFn extends DoFn { - @Override - public void processElement(ProcessContext c) { - Variant v = c.element(); - - List calls = new ArrayList<>(); - for (Call call : v.getCalls()) { - calls.add(new TableRow() - .set("call_set_name", call.getCallSetName()) - .set("phaseset", call.getPhaseset()) - .set("genotype", call.getGenotype()) - .set( - "genotype_likelihood", - (call.getGenotypeLikelihood() == null) ? new ArrayList() : - call.getGenotypeLikelihood() - )); - } - - TableRow row = - new TableRow().set("variant_id", v.getId()).set("reference_name", v.getReferenceName()) - .set("start", v.getStart()).set("end", v.getEnd()) - .set("reference_bases", v.getReferenceBases()) - .set("alternate_bases", v.getAlternateBases()).set("call", calls); - c.output(row); - } - } - - public static void main(String[] args) throws IOException, GeneralSecurityException { - Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); - GenomicsDatasetOptions.Methods.validateOptions(options); - - Preconditions.checkState(options.getHasNonVariantSegments(), - "This job is only valid for data containing non-variant segments. " - + "Set the --hasNonVariantSegments command line option accordingly."); - - GenomicsFactory.OfflineAuth auth = GenomicsOptions.Methods.getGenomicsAuth(options); - List requests = - GenomicsDatasetOptions.Methods.getVariantRequests(options, auth, - SexChromosomeFilter.INCLUDE_XY); - - Pipeline p = Pipeline.create(options); - DataflowWorkarounds.registerGenomicsCoders(p); - - PCollection input = - DataflowWorkarounds.getPCollection(requests, p, options.getNumWorkers()); - - PCollection variants = - JoinNonVariantSegmentsWithVariants.joinVariantsTransform(input, auth, - JoinNonVariantSegmentsWithVariants.VARIANT_JOIN_FIELDS); - - variants.apply(ParDo.of(new FormatVariantsFn())).apply( - BigQueryIO.Write.to(options.getOutput()).withSchema(getTableSchema()) - .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) - .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)); - - p.run(); - } -} diff --git a/src/main/java/com/google/cloud/genomics/dataflow/pipelines/CountReads.java b/src/main/java/com/google/cloud/genomics/dataflow/pipelines/CountReads.java index 91b938d..09eb94e 100644 --- a/src/main/java/com/google/cloud/genomics/dataflow/pipelines/CountReads.java +++ b/src/main/java/com/google/cloud/genomics/dataflow/pipelines/CountReads.java @@ -140,7 +140,7 @@ private static PCollection getReads() throws IOException { private static PCollection getReadsFromAPI() { List requests = getReadRequests(options); PCollection readRequests = - DataflowWorkarounds.getPCollection(requests, p, options.getNumWorkers()); + DataflowWorkarounds.getPCollection(requests, p); PCollection reads = readRequests.apply( ParDo.of( diff --git a/src/main/java/com/google/cloud/genomics/dataflow/pipelines/IdentityByState.java b/src/main/java/com/google/cloud/genomics/dataflow/pipelines/IdentityByState.java index e7656e6..9412d20 100644 --- a/src/main/java/com/google/cloud/genomics/dataflow/pipelines/IdentityByState.java +++ b/src/main/java/com/google/cloud/genomics/dataflow/pipelines/IdentityByState.java @@ -48,7 +48,7 @@ public class IdentityByState { private static final String VARIANT_FIELDS = - "nextPageToken,variants(id,calls(genotype,callSetName))"; + "nextPageToken,variants(start,calls(genotype,callSetName))"; public static void main(String[] args) throws IOException, GeneralSecurityException, InstantiationException, IllegalAccessException { @@ -64,7 +64,7 @@ public static void main(String[] args) throws IOException, GeneralSecurityExcept Pipeline p = Pipeline.create(options); DataflowWorkarounds.registerGenomicsCoders(p); PCollection input = - DataflowWorkarounds.getPCollection(requests, p, options.getNumWorkers()); + DataflowWorkarounds.getPCollection(requests, p); PCollection variants = options.getHasNonVariantSegments() diff --git a/src/main/java/com/google/cloud/genomics/dataflow/pipelines/TransmissionProbability.java b/src/main/java/com/google/cloud/genomics/dataflow/pipelines/TransmissionProbability.java index f204a4e..6d14b88 100644 --- a/src/main/java/com/google/cloud/genomics/dataflow/pipelines/TransmissionProbability.java +++ b/src/main/java/com/google/cloud/genomics/dataflow/pipelines/TransmissionProbability.java @@ -44,7 +44,7 @@ */ public class TransmissionProbability { private static final String VARIANT_FIELDS - = "nextPageToken,variants(id,names,calls(info,callSetName))"; + = "nextPageToken,variants(id,start,names,calls(info,callSetName))"; public static void main(String[] args) throws IOException, GeneralSecurityException { GenomicsDatasetOptions options = PipelineOptionsFactory.fromArgs(args) @@ -65,7 +65,7 @@ public static void main(String[] args) throws IOException, GeneralSecurityExcept // - Groups Transmission sources by Variant, // - Calculate transmission Probability for each variant // - Print calculated values to a file. - DataflowWorkarounds.getPCollection(requests, p, options.getNumWorkers()) + DataflowWorkarounds.getPCollection(requests, p) .apply(ParDo.named("VariantReader") .of(new VariantReader(auth, ShardBoundary.STRICT, VARIANT_FIELDS))) .apply(ParDo.named("ExtractFamilyVariantStatus") diff --git a/src/main/java/com/google/cloud/genomics/dataflow/pipelines/VariantSimilarity.java b/src/main/java/com/google/cloud/genomics/dataflow/pipelines/VariantSimilarity.java index 489d2f9..42b67ac 100644 --- a/src/main/java/com/google/cloud/genomics/dataflow/pipelines/VariantSimilarity.java +++ b/src/main/java/com/google/cloud/genomics/dataflow/pipelines/VariantSimilarity.java @@ -39,7 +39,7 @@ */ public class VariantSimilarity { private static final String VARIANT_FIELDS - = "nextPageToken,variants(id,calls(genotype,callSetName))"; + = "nextPageToken,variants(start,calls(genotype,callSetName))"; public static void main(String[] args) throws IOException, GeneralSecurityException { GenomicsDatasetOptions options = PipelineOptionsFactory.fromArgs(args) @@ -54,7 +54,7 @@ public static void main(String[] args) throws IOException, GeneralSecurityExcept Pipeline p = Pipeline.create(options); DataflowWorkarounds.registerGenomicsCoders(p); DataflowWorkarounds - .getPCollection(requests, p, options.getNumWorkers()) + .getPCollection(requests, p) .apply( ParDo.named("VariantReader").of( new VariantReader(auth, ShardBoundary.STRICT, VARIANT_FIELDS))) diff --git a/src/main/java/com/google/cloud/genomics/dataflow/readers/GenomicsApiReader.java b/src/main/java/com/google/cloud/genomics/dataflow/readers/GenomicsApiReader.java index 0b34b7c..f6b845c 100644 --- a/src/main/java/com/google/cloud/genomics/dataflow/readers/GenomicsApiReader.java +++ b/src/main/java/com/google/cloud/genomics/dataflow/readers/GenomicsApiReader.java @@ -17,7 +17,9 @@ import com.google.api.client.json.GenericJson; import com.google.api.services.genomics.Genomics; +import com.google.cloud.dataflow.sdk.transforms.Aggregator; import com.google.cloud.dataflow.sdk.transforms.DoFn; +import com.google.cloud.dataflow.sdk.transforms.Sum; import com.google.cloud.genomics.utils.GenomicsFactory; import java.io.IOException; @@ -31,18 +33,33 @@ public abstract class GenomicsApiReader initializedRequestsCount; + protected Aggregator unsuccessfulResponsesCount; + protected Aggregator ioExceptionsCount; + protected Aggregator itemCount; + public GenomicsApiReader(GenomicsFactory.OfflineAuth auth, String fields) { this.auth = auth; this.fields = fields; } + @Override + public void startBundle(Context c) { + initializedRequestsCount = c.createAggregator("Genomics API Initialized Request Count", new Sum.SumIntegerFn()); + unsuccessfulResponsesCount = c.createAggregator("Genomics API Unsuccessful Response Count", new Sum.SumIntegerFn()); + ioExceptionsCount = c.createAggregator("Genomics API IOException Response Count", new Sum.SumIntegerFn()); + itemCount = c.createAggregator("Genomics API Item Count", new Sum.SumLongFn()); + } + @Override public void processElement(ProcessContext c) { try { GenomicsFactory factory = auth.getDefaultFactory(); processApiCall(auth.getGenomics(factory), c, c.element()); + initializedRequestsCount.addValue(factory.initializedRequestsCount()); + unsuccessfulResponsesCount.addValue(factory.unsuccessfulResponsesCount()); + ioExceptionsCount.addValue(factory.ioExceptionsCount()); LOG.info("ApiReader processed " + factory.initializedRequestsCount() + " requests (" + factory.unsuccessfulResponsesCount() + " server errors and " + factory.ioExceptionsCount() + " IO exceptions)"); diff --git a/src/main/java/com/google/cloud/genomics/dataflow/readers/ReadReader.java b/src/main/java/com/google/cloud/genomics/dataflow/readers/ReadReader.java index 241c93b..3462d53 100644 --- a/src/main/java/com/google/cloud/genomics/dataflow/readers/ReadReader.java +++ b/src/main/java/com/google/cloud/genomics/dataflow/readers/ReadReader.java @@ -57,6 +57,7 @@ protected void processApiCall(Genomics genomics, ProcessContext c, SearchReadsRe for (Read read : Paginator.Reads.create(genomics, shardBoundary).search(request, fields)) { c.output(read); + itemCount.addValue(1L); } } } diff --git a/src/main/java/com/google/cloud/genomics/dataflow/readers/VariantReader.java b/src/main/java/com/google/cloud/genomics/dataflow/readers/VariantReader.java index 63a3991..d353829 100644 --- a/src/main/java/com/google/cloud/genomics/dataflow/readers/VariantReader.java +++ b/src/main/java/com/google/cloud/genomics/dataflow/readers/VariantReader.java @@ -31,7 +31,7 @@ public class VariantReader extends GenomicsApiReader PCollection getPCollection(List shardOptions, Coder coder, - Pipeline p, double numWorkers) { + Pipeline p) { + + DataflowPipelineOptions options = (GenomicsOptions) p.getOptions(); + int numWorkers = options.getNumWorkers(); + String machineType = options.getMachineType(); + + if (machineType != null) { + String[] machineNameParts = + Iterables.toArray(Splitter.on('-').split(machineType), String.class); + if (3 == machineNameParts.length) { + try { + int numCores = UnsignedInts.parseUnsignedInt(machineNameParts[2]); + numWorkers *= numCores; + } catch (Exception e) { + LOG.warning("Assuming one core per worker: " + e); + } + } + } LOG.info("Turning " + shardOptions.size() + " options into " + numWorkers + " workers"); numWorkers = Math.min(shardOptions.size(), numWorkers); @@ -115,6 +138,7 @@ public static PCollection getPCollection(List shardOptions, Coder c int optionsPerWorker = (int) Math.ceil(shardOptions.size() / numWorkers); List> pCollections = Lists.newArrayList(); + for (int i = 0; i < numWorkers; i++) { int start = i * optionsPerWorker; int end = Math.min(shardOptions.size(), start + optionsPerWorker); @@ -137,7 +161,7 @@ public static PCollection getPCollection(List shardOptions, Coder c } public static PCollection getPCollection( - List shardOptions, Pipeline p, double numWorkers) { - return getPCollection(shardOptions, null, p, numWorkers); + List shardOptions, Pipeline p) { + return getPCollection(shardOptions, null, p); } } diff --git a/src/main/java/com/google/cloud/genomics/dataflow/utils/GenomicsDatasetOptions.java b/src/main/java/com/google/cloud/genomics/dataflow/utils/GenomicsDatasetOptions.java index 15a816c..db9bf55 100644 --- a/src/main/java/com/google/cloud/genomics/dataflow/utils/GenomicsDatasetOptions.java +++ b/src/main/java/com/google/cloud/genomics/dataflow/utils/GenomicsDatasetOptions.java @@ -105,8 +105,8 @@ public static void validateOptions(GenomicsDatasetOptions options) { void setBasesPerShard(long basesPerShard); @Description("The maximum number of calls to return. However, at least one variant will always " - + "be returned, even if it has more calls than this limit. By default the response will be as " - + "large as the Genomics API will allow. Use this option only to reduce response sizes.") + + "be returned, even if it has more calls than this limit. If unspecified, the default " + + "number of calls will be returned.") @Default.Integer(0) int getMaxCalls(); diff --git a/src/main/java/com/google/cloud/genomics/dataflow/utils/GenomicsOptions.java b/src/main/java/com/google/cloud/genomics/dataflow/utils/GenomicsOptions.java index 20935ae..b5ec748 100644 --- a/src/main/java/com/google/cloud/genomics/dataflow/utils/GenomicsOptions.java +++ b/src/main/java/com/google/cloud/genomics/dataflow/utils/GenomicsOptions.java @@ -64,9 +64,8 @@ public static void validateOptions(GenomicsOptions options) { void setNumberOfRetries(int numOfRetries); - @Description("Specifies number of results to return in a single page of results. By " - + "default the response will be as large as the Genomics API will allow. Use this " - + "option only to reduce response sizes.") + @Description("Specifies number of results to return in a single page of results. " + + "If unspecified, the default page size for the Genomics API is used.") @Default.Integer(0) int getPageSize();