From feec84ad466831c96cd69b71ea1fd03fc1b3ca36 Mon Sep 17 00:00:00 2001 From: "M. Morgan Aster" Date: Fri, 28 Feb 2025 09:18:59 -0500 Subject: [PATCH] TSPS-183 Beagle imputation hg38 wdl and associated support wdls (#1333) * wip add beagle imputation stuff * add 2 wdls to dockstore.yml * fix docker gar url * use the right path for jars * wip on imputation wdl * oops use correct jar * missing equals * fix java call again * fix java call * oops match file names * update beagle jar to 01Mar24.d36 * debug GatherVcfs * debug GatherVcfs 2 * try to resolve missing file issue * don't impute over padding * make the index again * supply vcf_index input to SelectVariantsByIds * update Imputation wdl too * newlines * update for hg38 * Revert "update for hg38" This reverts commit 3757137a02a7d92686a5664596261a0fb91e2624. * update for hg38 * liftover wdl * remove GCP-specific vm commands * use gatk * fix suffix and basename * fix more filenames * remove missing contig stuff for now * fix ref panel path * another chr fix * warn on missing contig * do fail if missing contig * more mem * troubleshooting wdl * fixed plink path * add select_first test * cleanup * add if block to test * create and use ref panel interval list * move interval list creation to ref panel wdl * give default values for optional inputs, weird * change CountVariants calls * test * add output to test * next test * more test * another test * update real task * TSPS-226 presplit and prechunk beagle inputs (#1272) * pre splitting and prechunking beagle imputation inputs to lower log numbers and storage account egress --------- Co-authored-by: Jose Soto * TSPS-221 remove index input and add seed to make beagle tool deterministic (#1285) * remove multi sample vcf index workflow input and add it to the PreSplitVcf task. add seed number so that beagle is always deterministic.
add comment to cpu input for PhaseAndImputeBeagle task * change output_callset_name to output_base_name and remove optional outputs * change n_failed_chunks ticket to an int --------- Co-authored-by: Jose Soto * rename workflow * TSPS-241 Clean up beagle wdl (#1288) * clean up wdl with stuff from TSPS-241 * try to make fail fast work with double nested scatters --------- Co-authored-by: Jose Soto * add specific gatk_docker * TSPS-142 updates to help creating simulated reference panel and running imputation against it (#1296) * add optional error count override for testing * rename reference base prefix variable and make it more user friendly --------- Co-authored-by: Jose Soto * add maxRetries 2 to all imputation beagle tasks * add prechunk wdl to dockstore * use acr for default ubuntu image * add preemptible 3 * use acr gatk docker as default * don't use preemptibles on GatherVcfs * basename fix for imputation beagle ref panel generation (#1332) * try auto specifying chr at end of basename * both tasks * add liftovervcfs to dockstore * allow specifying max mem * TSPS-269 Speed up CountVariantsInChunksBeagle by using bedtools (#1335) * try creating bed files * try again * try again again * a different thing * use bedtools and bed ref panel files * oops update the correct task * fix * use the right freaking file name * remove comment * update pipeline version to 0.0.2 * TSPS-293: Fix up streaming imputation beagle (#1347) update ImputationBeagle * add array imputation quota consumed wdl (#1425) * add array imputation quota consumed wdl * add changelogs for imputation array related workflows --------- Co-authored-by: Jose Soto * TSPS-239 get wdl running on 400k sample ref panel (#1373) * changes to help beagle imputation wdl run on a 400k sample reference panel --------- Co-authored-by: Jose Soto * remove create imputation ref panel beagle wdl and changelog * PR feedback --------- Co-authored-by: Jose Soto Co-authored-by: M. Morgan Aster * add set -e -o pipefail to all relevant imputation tasks (#1434) Co-authored-by: Jose Soto * TSPS-341 remove tasks for recovering variants not in the reference panel (#1468) * remove tasks for recovering variants not in the reference panel and separate out beagle tasks from imputation tasks * remove prechunk wdl and references to it remove "Beagle" from task names in BeagleTasks.wdl --------- Co-authored-by: Jose Soto * Updated pipeline_versions.txt with all pipeline version information * [PR to feature branch] Add testing to imputation beagle (#1503) * TSPS-239 get wdl running on 400k sample ref panel (#1373) * changes to help beagle imputation wdl run on a 400k sample reference panel --------- Co-authored-by: Jose Soto * remove create imputation ref panel beagle wdl and changelog * PR feedback --------- Co-authored-by: Jose Soto Co-authored-by: M. 
Morgan Aster * add new files for testing * add test wdl to .dockstore.yml * add test data json files, other updates * version to 1.0.0, update changelog * update beagle docker * update beagle docker again * fix call phase task * re-deleting ImputationBeaglePreChunk.wdl * temporarily try to run test on feature branch pr * remove vault inputs * update output basename for plumbing test * remove feature branch from gha pr branches * pr comments * add quotes in VerifyTasks.CompareVcfs * update dockers, move CreateVcfIndex to BeagleTasks --------- Co-authored-by: jsotobroad Co-authored-by: Jose Soto * Updated pipeline_versions.txt with all pipeline version information * remove newline at end of Utilities.wdl * remove LiftoverVcfs, add README for imputation_beagle * oops this commit adds the README for imputation_beagle * rename test inputs files to reflect contents * PR comments round 1 * Updated pipeline_versions.txt with all pipeline version information * update changelog for BroadInternalImputation * Updated pipeline_versions.txt with all pipeline version information * add back newline to Utilities.wdl with -w flag on changed file check * remove change to Minimac4 task * revert change to tool command in OptionalQCSites * fix fail task dependency, revert attempt to ignore newline in diff, other pr comments * update README for ImputationBeagle * rename test files * Updated pipeline_versions.txt with all pipeline version information * another commit for hashes * Updated pipeline_versions.txt with all pipeline version information * dummy commit * pr comments * Updated pipeline_versions.txt with all pipeline version information * dummy commit * dummy commit --------- Co-authored-by: jsotobroad Co-authored-by: Jose Soto Co-authored-by: GitHub Action Co-authored-by: Nikelle Petrillo <38223776+nikellepetrillo@users.noreply.github.com> Co-authored-by: npetrill --- .dockstore.yml | 12 + .github/workflows/test_imputation_beagle.yml | 75 ++++++ pipeline_versions.txt | 6 +- .../arrays/imputation/Imputation.changelog.md | 5 + .../broad/arrays/imputation/Imputation.wdl | 3 +- .../ArrayImputationQuotaConsumed.changelog.md | 4 + .../ArrayImputationQuotaConsumed.wdl | 29 +++ .../ImputationBeagle.changelog.md | 5 + .../imputation_beagle/ImputationBeagle.wdl | 226 ++++++++++++++++++ .../broad/arrays/imputation_beagle/README.md | 7 + .../Plumbing/NA12878_x10_hg38_arrays.json | 8 + .../Scientific/NA12878_x10_hg38_arrays.json | 8 + .../BroadInternalImputation.changelog.md | 5 + .../imputation/BroadInternalImputation.wdl | 2 +- scripts/firecloud_api/firecloud_api.py | 1 + .../imputation/ImputationBeagleStructs.wdl | 8 + tasks/broad/ImputationBeagleTasks.wdl | 217 +++++++++++++++++ tasks/broad/ImputationTasks.wdl | 125 ++++++---- verification/VerifyImputationBeagle.wdl | 79 ++++++ verification/VerifyTasks.wdl | 2 +- .../test-wdls/TestImputationBeagle.wdl | 111 +++++++++ 21 files changed, 888 insertions(+), 50 deletions(-) create mode 100644 .github/workflows/test_imputation_beagle.yml create mode 100644 pipelines/broad/arrays/imputation_beagle/ArrayImputationQuotaConsumed.changelog.md create mode 100644 pipelines/broad/arrays/imputation_beagle/ArrayImputationQuotaConsumed.wdl create mode 100644 pipelines/broad/arrays/imputation_beagle/ImputationBeagle.changelog.md create mode 100644 pipelines/broad/arrays/imputation_beagle/ImputationBeagle.wdl create mode 100644 pipelines/broad/arrays/imputation_beagle/README.md create mode 100644 
pipelines/broad/arrays/imputation_beagle/test_inputs/Plumbing/NA12878_x10_hg38_arrays.json create mode 100644 pipelines/broad/arrays/imputation_beagle/test_inputs/Scientific/NA12878_x10_hg38_arrays.json create mode 100644 structs/imputation/ImputationBeagleStructs.wdl create mode 100644 tasks/broad/ImputationBeagleTasks.wdl create mode 100644 verification/VerifyImputationBeagle.wdl create mode 100644 verification/test-wdls/TestImputationBeagle.wdl diff --git a/.dockstore.yml b/.dockstore.yml index 9ab1966238..08449df04d 100644 --- a/.dockstore.yml +++ b/.dockstore.yml @@ -83,6 +83,14 @@ workflows: subclass: WDL primaryDescriptorPath: /pipelines/broad/arrays/imputation/Imputation.wdl + - name: ImputationBeagle + subclass: WDL + primaryDescriptorPath: /pipelines/broad/arrays/imputation_beagle/ImputationBeagle.wdl + + - name: ArrayImputationQuotaConsumed + subclass: WDL + primaryDescriptorPath: /pipelines/broad/arrays/imputation_beagle/ArrayImputationQuotaConsumed.wdl + - name: RNAWithUMIsPipeline subclass: WDL primaryDescriptorPath: /pipelines/broad/rna_seq/RNAWithUMIsPipeline.wdl @@ -155,6 +163,10 @@ workflows: subclass: WDL primaryDescriptorPath: /verification/test-wdls/TestImputation.wdl + - name: TestImputationBeagle + subclass: WDL + primaryDescriptorPath: /verification/test-wdls/TestImputationBeagle.wdl + - name: TestJointGenotyping subclass: WDL primaryDescriptorPath: /verification/test-wdls/TestJointGenotyping.wdl diff --git a/.github/workflows/test_imputation_beagle.yml b/.github/workflows/test_imputation_beagle.yml new file mode 100644 index 0000000000..f9a627f02f --- /dev/null +++ b/.github/workflows/test_imputation_beagle.yml @@ -0,0 +1,75 @@ +name: Test ImputationBeagle + +# Controls when the workflow will run +on: + pull_request: + branches: [ "develop", "staging", "master" ] + # Only run if files in these paths changed: + #################################### + # SET PIPELINE SPECIFIC PATHS HERE # + #################################### + paths: + - 'pipelines/broad/arrays/imputation_beagle/**' + - 'structs/imputation/ImputationBeagleStructs.wdl' + - 'tasks/broad/ImputationTasks.wdl' + - 'tasks/broad/ImputationBeagleTasks.wdl' + - 'verification/VerifyImputationBeagle.wdl' + - 'verification/test-wdls/TestImputationBeagle.wdl' + - 'tasks/broad/Utilities.wdl' + - 'tasks/broad/TerraCopyFilesFromCloudToCloud.wdl' + - '.github/workflows/test_imputation_beagle.yml' + - '.github/workflows/warp_test_workflow.yml' + + + # Allows you to run this workflow manually from the Actions tab + workflow_dispatch: + inputs: + useCallCache: + description: 'Use call cache (default: true)' + required: false + default: "true" + updateTruth: + description: 'Update truth files (default: false)' + required: false + default: "false" + testType: + description: 'Specify the type of test (Plumbing or Scientific)' + required: false + type: choice + options: + - Plumbing + - Scientific + truthBranch: + description: 'Specify the branch for truth files (default: master)' + required: false + default: "master" + +env: + # pipeline configuration + PIPELINE_NAME: TestImputationBeagle + DOCKSTORE_PIPELINE_NAME: ImputationBeagle + PIPELINE_DIR: "pipelines/broad/arrays/imputation_beagle" + + # workspace configuration + TESTING_WORKSPACE: WARP Tests + WORKSPACE_NAMESPACE: warp-pipelines + + # service account configuration + SA_JSON_B64: ${{ secrets.PDT_TESTER_SA_B64 }} + USER: pdt-tester@warp-pipeline-dev.iam.gserviceaccount.com + + +jobs: + TestImputationBeagle: + uses: ./.github/workflows/warp_test_workflow.yml + with: 
+ pipeline_name: TestImputationBeagle + dockstore_pipeline_name: ImputationBeagle + pipeline_dir: pipelines/broad/arrays/imputation_beagle + use_call_cache: ${{ github.event.inputs.useCallCache || 'true' }} + update_truth: ${{ github.event.inputs.updateTruth || 'false' }} + test_type: ${{ github.event.inputs.testType }} + truth_branch: ${{ github.event.inputs.truthBranch || 'master' }} + secrets: + PDT_TESTER_SA_B64: ${{ secrets.PDT_TESTER_SA_B64 }} + DOCKSTORE_TOKEN: ${{ secrets.DOCKSTORE_TOKEN }} diff --git a/pipeline_versions.txt b/pipeline_versions.txt index 87e5e74d3b..bc8fa78585 100644 --- a/pipeline_versions.txt +++ b/pipeline_versions.txt @@ -1,7 +1,9 @@ Pipeline Name Version Date of Last Commit Arrays 2.6.30 2024-11-04 ValidateChip 1.16.7 2024-11-04 -Imputation 1.1.15 2024-11-04 +ArrayImputationQuotaConsumed 1.0.0 2025-02-24 +ImputationBeagle 1.0.0 2025-02-26 +Imputation 1.1.16 2025-02-24 MultiSampleArrays 1.6.2 2024-08-02 WholeGenomeReprocessing 3.3.3 2024-11-04 ExomeReprocessing 3.3.3 2024-11-04 @@ -9,7 +11,7 @@ CramToUnmappedBams 1.1.3 2024-08-02 ExternalWholeGenomeReprocessing 2.3.3 2024-11-04 ExternalExomeReprocessing 3.3.3 2024-11-04 BroadInternalArrays 1.1.14 2024-11-04 -BroadInternalImputation 1.1.14 2024-11-04 +BroadInternalImputation 1.1.15 2025-02-24 BroadInternalRNAWithUMIs 1.0.36 2024-11-04 BroadInternalUltimaGenomics 1.1.3 2024-12-05 RNAWithUMIsPipeline 1.0.18 2024-11-04 diff --git a/pipelines/broad/arrays/imputation/Imputation.changelog.md b/pipelines/broad/arrays/imputation/Imputation.changelog.md index 52765e4ec1..5030cf3f05 100644 --- a/pipelines/broad/arrays/imputation/Imputation.changelog.md +++ b/pipelines/broad/arrays/imputation/Imputation.changelog.md @@ -1,3 +1,8 @@ +# 1.1.16 +2025-02-24 (Date of Last Commit) + +* Updated runtime parameters in some ImputationTasks, and added an explicit definition of a vcf_index. + # 1.1.15 2024-11-04 (Date of Last Commit) diff --git a/pipelines/broad/arrays/imputation/Imputation.wdl b/pipelines/broad/arrays/imputation/Imputation.wdl index 4a44ba4ac5..3466169b64 100644 --- a/pipelines/broad/arrays/imputation/Imputation.wdl +++ b/pipelines/broad/arrays/imputation/Imputation.wdl @@ -6,7 +6,7 @@ import "../../../../tasks/broad/Utilities.wdl" as utils workflow Imputation { - String pipeline_version = "1.1.15" + String pipeline_version = "1.1.16" input { Int chunkLength = 25000000 @@ -242,6 +242,7 @@ workflow Imputation { call tasks.SelectVariantsByIds { input: vcf = SetIdsVcfToImpute.output_vcf, + vcf_index = SetIdsVcfToImpute.output_vcf_index, ids = FindSitesUniqueToFileTwoOnly.missing_sites, basename = "imputed_sites_to_recover" } diff --git a/pipelines/broad/arrays/imputation_beagle/ArrayImputationQuotaConsumed.changelog.md b/pipelines/broad/arrays/imputation_beagle/ArrayImputationQuotaConsumed.changelog.md new file mode 100644 index 0000000000..978888b711 --- /dev/null +++ b/pipelines/broad/arrays/imputation_beagle/ArrayImputationQuotaConsumed.changelog.md @@ -0,0 +1,4 @@ +# 1.0.0 +2025-02-24 (Date of Last Commit) + +* Initial release of pipeline to calculate the number of samples, i.e. quota used by an imputation service that uses ImputationBeagle.wdl. 
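The ArrayImputationQuotaConsumed workflow added in the next hunk reports the input VCF's sample count as the quota figure. As a hedged illustration of how a caller might use that single output before launching the expensive imputation run, here is a minimal sketch; the wrapper workflow, its user_quota_remaining input, and the import path are hypothetical and not part of this PR:

version 1.0

# Hypothetical caller sketch: run the cheap quota workflow first, then gate
# the ImputationBeagle launch on the user's remaining quota.
import "ArrayImputationQuotaConsumed.wdl" as quota  # assumed relative path

workflow QuotaGateExample {
  input {
    File multi_sample_vcf
    File ref_dict
    Array[String] contigs
    String reference_panel_path_prefix
    String genetic_maps_path
    String output_basename
    Int user_quota_remaining  # invented for this sketch
  }

  call quota.QuotaConsumed {
    input:
      multi_sample_vcf = multi_sample_vcf,
      ref_dict = ref_dict,
      contigs = contigs,
      reference_panel_path_prefix = reference_panel_path_prefix,
      genetic_maps_path = genetic_maps_path,
      output_basename = output_basename
  }

  output {
    # quota consumed is simply the number of samples in the input VCF
    Boolean within_quota = QuotaConsumed.quota_consumed <= user_quota_remaining
  }
}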
diff --git a/pipelines/broad/arrays/imputation_beagle/ArrayImputationQuotaConsumed.wdl b/pipelines/broad/arrays/imputation_beagle/ArrayImputationQuotaConsumed.wdl new file mode 100644 index 0000000000..a4cd6e8d09 --- /dev/null +++ b/pipelines/broad/arrays/imputation_beagle/ArrayImputationQuotaConsumed.wdl @@ -0,0 +1,29 @@ +version 1.0 + +import "../../../../tasks/broad/ImputationTasks.wdl" as tasks + +workflow QuotaConsumed { + String pipeline_version = "1.0.0" + + input { + Int chunkLength = 25000000 + Int chunkOverlaps = 5000000 + + File multi_sample_vcf + + File ref_dict + Array[String] contigs + String reference_panel_path_prefix + String genetic_maps_path + String output_basename + } + + call tasks.CountSamples { + input: + vcf = multi_sample_vcf + } + + output { + Int quota_consumed = CountSamples.nSamples + } +} diff --git a/pipelines/broad/arrays/imputation_beagle/ImputationBeagle.changelog.md b/pipelines/broad/arrays/imputation_beagle/ImputationBeagle.changelog.md new file mode 100644 index 0000000000..ddc7604697 --- /dev/null +++ b/pipelines/broad/arrays/imputation_beagle/ImputationBeagle.changelog.md @@ -0,0 +1,5 @@ +# 1.0.0 +2025-02-26 (Date of Last Commit) + +* Initial public release of the ImputationBeagle pipeline. + * The ImputationBeagle pipeline imputes missing genotypes from a multi-sample VCF using a large genomic reference panel. It is based on the Michigan Imputation Server pipeline but uses the Beagle imputation tool instead of minimac. Overall, the pipeline filters, phases, and performs imputation on a multi-sample VCF. It outputs the imputed VCF. diff --git a/pipelines/broad/arrays/imputation_beagle/ImputationBeagle.wdl b/pipelines/broad/arrays/imputation_beagle/ImputationBeagle.wdl new file mode 100644 index 0000000000..64d058b965 --- /dev/null +++ b/pipelines/broad/arrays/imputation_beagle/ImputationBeagle.wdl @@ -0,0 +1,226 @@ +version 1.0 + +import "../../../../structs/imputation/ImputationBeagleStructs.wdl" as structs +import "../../../../tasks/broad/ImputationTasks.wdl" as tasks +import "../../../../tasks/broad/ImputationBeagleTasks.wdl" as beagleTasks + +workflow ImputationBeagle { + + String pipeline_version = "1.0.0" + + input { + Int chunkLength = 25000000 + Int chunkOverlaps = 2000000 # this is the padding that will be added to the beginning and end of each chunk to reduce edge effects + + File multi_sample_vcf + + File ref_dict # for reheadering / adding contig lengths in the header of the output VCF, and calculating contig lengths + Array[String] contigs + String reference_panel_path_prefix # path + file prefix to the bucket where the reference panel files are stored for all contigs + String genetic_maps_path # path to the bucket where genetic maps are stored for all contigs + String output_basename # the basename for intermediate and output files + + # file extensions used to find reference panel files + String bed_suffix = ".bed" + String bref3_suffix = ".bref3" + + String gatk_docker = "broadinstitute/gatk:4.6.0.0" + String ubuntu_docker = "ubuntu:20.04" + + Int? error_count_override + } + + call tasks.CountSamples { + input: + vcf = multi_sample_vcf + } + + call beagleTasks.CreateVcfIndex { + input: + vcf_input = multi_sample_vcf, + gatk_docker = gatk_docker + } + + Float chunkLengthFloat = chunkLength + + scatter (contig in contigs) { + # these are specific to hg38 - contig is format 'chr1' + String reference_basename = reference_panel_path_prefix + "." + contig + String genetic_map_filename = genetic_maps_path + "plink."
+ contig + ".GRCh38.withchr.map" + + ReferencePanelContig referencePanelContig = { + "bed": reference_basename + bed_suffix, + "bref3": reference_basename + bref3_suffix, + "contig": contig, + "genetic_map": genetic_map_filename + } + + + call tasks.CalculateChromosomeLength { + input: + ref_dict = ref_dict, + chrom = referencePanelContig.contig, + ubuntu_docker = ubuntu_docker + } + + Int num_chunks = ceil(CalculateChromosomeLength.chrom_length / chunkLengthFloat) + + scatter (i in range(num_chunks)) { + String chunk_contig = referencePanelContig.contig + + Int start = (i * chunkLength) + 1 + Int startWithOverlaps = if (start - chunkOverlaps < 1) then 1 else start - chunkOverlaps + Int end = if (CalculateChromosomeLength.chrom_length < ((i + 1) * chunkLength)) then CalculateChromosomeLength.chrom_length else ((i + 1) * chunkLength) + Int endWithOverlaps = if (CalculateChromosomeLength.chrom_length < end + chunkOverlaps) then CalculateChromosomeLength.chrom_length else end + chunkOverlaps + String chunk_basename = referencePanelContig.contig + "_chunk_" + i + + # generate the chunked vcf file that will be used for imputation, including overlaps + call tasks.GenerateChunk { + input: + vcf = CreateVcfIndex.vcf, + vcf_index = CreateVcfIndex.vcf_index, + start = startWithOverlaps, + end = endWithOverlaps, + chrom = referencePanelContig.contig, + basename = chunk_basename, + gatk_docker = gatk_docker + } + + call beagleTasks.CountVariantsInChunks { + input: + vcf = GenerateChunk.output_vcf, + vcf_index = GenerateChunk.output_vcf_index, + panel_bed_file = referencePanelContig.bed, + gatk_docker = gatk_docker + } + + call beagleTasks.CheckChunks { + input: + var_in_original = CountVariantsInChunks.var_in_original, + var_also_in_reference = CountVariantsInChunks.var_also_in_reference + } + } + + Array[File] chunkedVcfsWithOverlapsForImputation = GenerateChunk.output_vcf + + call tasks.StoreChunksInfo as StoreContigLevelChunksInfo { + input: + chroms = chunk_contig, + starts = start, + ends = end, + vars_in_array = CountVariantsInChunks.var_in_original, + vars_in_panel = CountVariantsInChunks.var_also_in_reference, + valids = CheckChunks.valid, + basename = output_basename + } + + # if any chunk for any chromosome fail CheckChunks, then we will not impute run any task in the next scatter, + # namely phasing and imputing which would be the most costly to throw away + Int n_failed_chunks_int = select_first([error_count_override, read_int(StoreContigLevelChunksInfo.n_failed_chunks)]) + call beagleTasks.ErrorWithMessageIfErrorCountNotZero as FailQCNChunks { + input: + errorCount = n_failed_chunks_int, + message = "contig " + referencePanelContig.contig + " had " + n_failed_chunks_int + " failing chunks" + } + + scatter (i in range(num_chunks)) { + String chunk_basename_imputed = referencePanelContig.contig + "_chunk_" + i + "_imputed" + + # max amount of cpus you can ask for is 96 so at a max of 10k samples we can only ask for 9 cpu a sample. 
+ # these values are based on trying to optimize for pre-emptibility using a 400k sample reference panel + # and up to a 10k sample input vcf + Int beagle_cpu = if (CountSamples.nSamples <= 1000) then 8 else floor(CountSamples.nSamples / 1000) * 9 + Int beagle_phase_memory_in_gb = if (CountSamples.nSamples <= 1000) then 22 else ceil(beagle_cpu * 1.5) + Int beagle_impute_memory_in_gb = if (CountSamples.nSamples <= 1000) then 30 else ceil(beagle_cpu * 4.3) + + call beagleTasks.Phase { + input: + dataset_vcf = chunkedVcfsWithOverlapsForImputation[i], + ref_panel_bref3 = referencePanelContig.bref3, + chrom = referencePanelContig.contig, + basename = chunk_basename_imputed, + genetic_map_file = referencePanelContig.genetic_map, + start = start[i], + end = end[i], + cpu = beagle_cpu, + memory_mb = beagle_phase_memory_in_gb * 1024, + for_dependency = FailQCNChunks.done + } + + call beagleTasks.Impute { + input: + dataset_vcf = Phase.vcf, + ref_panel_bref3 = referencePanelContig.bref3, + chrom = referencePanelContig.contig, + basename = chunk_basename_imputed, + genetic_map_file = referencePanelContig.genetic_map, + start = start[i], + end = end[i], + cpu = beagle_cpu, + memory_mb = beagle_impute_memory_in_gb * 1024 + } + + call beagleTasks.CreateVcfIndex as IndexImputedBeagle { + input: + vcf_input = Impute.vcf, + gatk_docker = gatk_docker + } + + call tasks.UpdateHeader { + input: + vcf = IndexImputedBeagle.vcf, + vcf_index = IndexImputedBeagle.vcf_index, + ref_dict = ref_dict, + basename = chunk_basename_imputed, + disable_sequence_dictionary_validation = false, + gatk_docker = gatk_docker + } + + call tasks.SeparateMultiallelics { + input: + original_vcf = UpdateHeader.output_vcf, + original_vcf_index = UpdateHeader.output_vcf_index, + output_basename = chunk_basename_imputed + } + + call tasks.RemoveSymbolicAlleles { + input: + original_vcf = SeparateMultiallelics.output_vcf, + original_vcf_index = SeparateMultiallelics.output_vcf_index, + output_basename = chunk_basename_imputed, + gatk_docker = gatk_docker + } + } + + Array[File] chromosome_vcfs = select_all(RemoveSymbolicAlleles.output_vcf) + } + + call tasks.GatherVcfs { + input: + input_vcfs = flatten(chromosome_vcfs), + output_vcf_basename = output_basename + ".imputed", + gatk_docker = gatk_docker + } + + call tasks.StoreChunksInfo { + input: + chroms = flatten(chunk_contig), + starts = flatten(start), + ends = flatten(end), + vars_in_array = flatten(CountVariantsInChunks.var_in_original), + vars_in_panel = flatten(CountVariantsInChunks.var_also_in_reference), + valids = flatten(CheckChunks.valid), + basename = output_basename + } + + output { + File imputed_multi_sample_vcf = GatherVcfs.output_vcf + File imputed_multi_sample_vcf_index = GatherVcfs.output_vcf_index + File chunks_info = StoreChunksInfo.chunks_info + } + + meta { + allowNestedInputs: true + } + +} diff --git a/pipelines/broad/arrays/imputation_beagle/README.md b/pipelines/broad/arrays/imputation_beagle/README.md new file mode 100644 index 0000000000..754e416b5a --- /dev/null +++ b/pipelines/broad/arrays/imputation_beagle/README.md @@ -0,0 +1,7 @@ +### ImputationBeagle summary + +The ImputationBeagle pipeline imputes missing genotypes from a multi-sample VCF using the [Beagle imputation tool](https://faculty.washington.edu/browning/beagle/beagle.html) and a large genomic reference panel. Overall, the pipeline filters, phases, and performs imputation on a multi-sample VCF. This pipeline was created for use by the All of Us/AnVIL Imputation Service. 
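To make the chunking arithmetic and the Beagle resource heuristics above concrete, here is a worked sketch, not part of the PR: the formulas are copied from ImputationBeagle.wdl, the chr20 length comes from the hg38 sequence dictionary, and the 10k-sample cohort is hypothetical.

version 1.0

# Worked example: the chunk-boundary and resource formulas from
# ImputationBeagle.wdl evaluated for one illustrative case.
workflow ChunkMathExample {
  input {
    Int chrom_length = 64444167  # hg38 chr20, per Homo_sapiens_assembly38.dict
    Int chunkLength = 25000000
    Int chunkOverlaps = 2000000
    Int nSamples = 10000         # hypothetical 10k-sample input VCF
  }

  Float chunkLengthFloat = chunkLength
  Int num_chunks = ceil(chrom_length / chunkLengthFloat)  # ceil(2.58) = 3 chunks

  # boundaries of the middle chunk (i = 1, zero-based):
  Int i = 1
  Int start = (i * chunkLength) + 1  # 25,000,001
  Int startWithOverlaps = if (start - chunkOverlaps < 1) then 1 else start - chunkOverlaps  # 23,000,001
  Int end = if (chrom_length < ((i + 1) * chunkLength)) then chrom_length else ((i + 1) * chunkLength)  # 50,000,000
  Int endWithOverlaps = if (chrom_length < end + chunkOverlaps) then chrom_length else end + chunkOverlaps  # 52,000,000

  # resource heuristics: 10k samples -> 90 cpus (under the 96-cpu cap),
  # ceil(90 * 1.5) = 135 GB phase memory, ceil(90 * 4.3) = 387 GB impute memory
  Int beagle_cpu = if (nSamples <= 1000) then 8 else floor(nSamples / 1000) * 9
  Int beagle_phase_memory_in_gb = if (nSamples <= 1000) then 22 else ceil(beagle_cpu * 1.5)
  Int beagle_impute_memory_in_gb = if (nSamples <= 1000) then 30 else ceil(beagle_cpu * 4.3)

  # CheckChunks (defined later in this patch) then keeps a chunk only if more
  # than half of its variants are in the panel and more than 3 overlap, e.g.
  # 600 of 1000 variants in the panel: 600 * 2 - 1000 = 200 > 0 and 600 > 3.

  output {
    Array[Int] chunk_bounds = [startWithOverlaps, start, end, endWithOverlaps]
    Array[Int] beagle_resources = [beagle_cpu, beagle_phase_memory_in_gb, beagle_impute_memory_in_gb]
  }
}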
+ +### ArrayImputationQuotaConsumed summary + +The ArrayImputationQuotaConsumed pipeline is used by the All of Us/AnVIL Imputation Service and calculates the number of samples in the input multi-sample VCF, which is the metric used by the service for ImputationBeagle pipeline quota. diff --git a/pipelines/broad/arrays/imputation_beagle/test_inputs/Plumbing/NA12878_x10_hg38_arrays.json b/pipelines/broad/arrays/imputation_beagle/test_inputs/Plumbing/NA12878_x10_hg38_arrays.json new file mode 100644 index 0000000000..bdf5a00597 --- /dev/null +++ b/pipelines/broad/arrays/imputation_beagle/test_inputs/Plumbing/NA12878_x10_hg38_arrays.json @@ -0,0 +1,8 @@ +{ + "ImputationBeagle.multi_sample_vcf": "gs://broad-gotc-test-storage/imputation_beagle/scientific/vcfs/NA12878_10_duplicate.merged.cleaned.vcf.gz", + "ImputationBeagle.ref_dict": "gs://gcp-public-data--broad-references/hg38/v0/Homo_sapiens_assembly38.dict", + "ImputationBeagle.reference_panel_path_prefix": "gs://broad-gotc-test-storage/imputation_beagle/scientific/1000G_HGDP_no_singletons_reference_panel/hgdp.tgp.gwaspy.AN_added.bcf.ac2", + "ImputationBeagle.contigs": ["chr21","chr22"], + "ImputationBeagle.genetic_maps_path": "gs://broad-gotc-test-storage/imputation_beagle/scientific/plink-genetic-maps/", + "ImputationBeagle.output_basename": "plumbing_test" +} diff --git a/pipelines/broad/arrays/imputation_beagle/test_inputs/Scientific/NA12878_x10_hg38_arrays.json b/pipelines/broad/arrays/imputation_beagle/test_inputs/Scientific/NA12878_x10_hg38_arrays.json new file mode 100644 index 0000000000..4263609e29 --- /dev/null +++ b/pipelines/broad/arrays/imputation_beagle/test_inputs/Scientific/NA12878_x10_hg38_arrays.json @@ -0,0 +1,8 @@ +{ + "ImputationBeagle.multi_sample_vcf": "gs://broad-gotc-test-storage/imputation_beagle/scientific/vcfs/NA12878_10_duplicate.merged.cleaned.vcf.gz", + "ImputationBeagle.ref_dict": "gs://gcp-public-data--broad-references/hg38/v0/Homo_sapiens_assembly38.dict", + "ImputationBeagle.reference_panel_path_prefix": "gs://broad-gotc-test-storage/imputation_beagle/scientific/1000G_HGDP_no_singletons_reference_panel/hgdp.tgp.gwaspy.AN_added.bcf.ac2", + "ImputationBeagle.contigs": ["chr1","chr2","chr3","chr4","chr5","chr6","chr7","chr8","chr9","chr10","chr11","chr12","chr13","chr14","chr15","chr16","chr17","chr18","chr19","chr20","chr21","chr22"], + "ImputationBeagle.genetic_maps_path": "gs://broad-gotc-test-storage/imputation_beagle/scientific/plink-genetic-maps/", + "ImputationBeagle.output_basename": "scientific_test" +} diff --git a/pipelines/broad/internal/arrays/imputation/BroadInternalImputation.changelog.md b/pipelines/broad/internal/arrays/imputation/BroadInternalImputation.changelog.md index a0930046d7..e4f328a7fb 100644 --- a/pipelines/broad/internal/arrays/imputation/BroadInternalImputation.changelog.md +++ b/pipelines/broad/internal/arrays/imputation/BroadInternalImputation.changelog.md @@ -1,3 +1,8 @@ +# 1.1.15 +2025-02-24 (Date of Last Commit) + +* Updated runtime parameters in some ImputationTasks, and added an explicit definition of a vcf_index. 
+ # 1.1.14 2024-11-04 (Date of Last Commit) diff --git a/pipelines/broad/internal/arrays/imputation/BroadInternalImputation.wdl b/pipelines/broad/internal/arrays/imputation/BroadInternalImputation.wdl index 525ce85e00..27e16fa28e 100644 --- a/pipelines/broad/internal/arrays/imputation/BroadInternalImputation.wdl +++ b/pipelines/broad/internal/arrays/imputation/BroadInternalImputation.wdl @@ -9,7 +9,7 @@ workflow BroadInternalImputation { description: "Push outputs of Imputation.wdl to TDR dataset table ImputationOutputsTable and split out Imputation arrays into ImputationWideOutputsTable." allowNestedInputs: true } - String pipeline_version = "1.1.14" + String pipeline_version = "1.1.15" input { # inputs to wrapper task diff --git a/scripts/firecloud_api/firecloud_api.py b/scripts/firecloud_api/firecloud_api.py index c0b43aae6f..6679f3ebc1 100644 --- a/scripts/firecloud_api/firecloud_api.py +++ b/scripts/firecloud_api/firecloud_api.py @@ -681,6 +681,7 @@ def main(self): print(json.dumps(workflow_status_map)) # Output the dictionary as a JSON string for bash parsing else: print("No workflows found or an error occurred.") + elif args.action == "create_new_method_config": # Check for required arguments for create_new_method_config action if not args.pipeline_name or not args.branch_name: diff --git a/structs/imputation/ImputationBeagleStructs.wdl b/structs/imputation/ImputationBeagleStructs.wdl new file mode 100644 index 0000000000..6aaaaa061a --- /dev/null +++ b/structs/imputation/ImputationBeagleStructs.wdl @@ -0,0 +1,8 @@ +version 1.0 + +struct ReferencePanelContig { + File bed + File bref3 + String contig + File genetic_map +} diff --git a/tasks/broad/ImputationBeagleTasks.wdl b/tasks/broad/ImputationBeagleTasks.wdl new file mode 100644 index 0000000000..af4363e600 --- /dev/null +++ b/tasks/broad/ImputationBeagleTasks.wdl @@ -0,0 +1,217 @@ +version 1.0 + +task CountVariantsInChunks { + input { + File vcf + File vcf_index + File panel_bed_file + + String gatk_docker = "us.gcr.io/broad-gatk/gatk:4.5.0.0" + Int cpu = 1 + Int memory_mb = 16000 + Int disk_size_gb = 2 * ceil(size([vcf, vcf_index, panel_bed_file], "GiB")) + 10 + } + Int command_mem = memory_mb - 1500 + Int max_heap = memory_mb - 1000 + + command <<< + set -e -o pipefail + + ln -sf ~{vcf} input.vcf.gz + ln -sf ~{vcf_index} input.vcf.gz.tbi + + gatk --java-options "-Xms~{command_mem}m -Xmx~{max_heap}m" CountVariants -V input.vcf.gz | tail -n 1 > var_in_original + bedtools intersect -a ~{vcf} -b ~{panel_bed_file} | wc -l > var_also_in_reference + >>> + + output { + Int var_in_original = read_int("var_in_original") + Int var_also_in_reference = read_int("var_also_in_reference") + } + runtime { + docker: gatk_docker + disks: "local-disk ${disk_size_gb} HDD" + memory: "${memory_mb} MiB" + cpu: cpu + preemptible: 3 + } +} + +task CheckChunks { + input { + Int var_in_original + Int var_also_in_reference + + String bcftools_docker = "us.gcr.io/broad-gotc-prod/imputation-bcf-vcf:1.0.7-1.10.2-0.1.16-1669908889" + Int cpu = 1 + Int memory_mb = 4000 + } + command <<< + set -e -o pipefail + + if [ $(( ~{var_also_in_reference} * 2 - ~{var_in_original})) -gt 0 ] && [ ~{var_also_in_reference} -gt 3 ]; then + echo true > valid_file.txt + else + echo false > valid_file.txt + fi + >>> + output { + Boolean valid = read_boolean("valid_file.txt") + } + runtime { + docker: bcftools_docker + disks: "local-disk 10 HDD" + memory: "${memory_mb} MiB" + cpu: cpu + preemptible: 3 + } +} + +task Phase { + input { + File dataset_vcf + File ref_panel_bref3 + 
File genetic_map_file + String basename + String chrom + Int start + Int end + + String beagle_docker = "us.gcr.io/broad-gotc-prod/imputation-beagle:1.0.0-17Dec24.224-1740423035" + Int cpu = 8 # This parameter is used as the nthreads input to Beagle, which is part of how we make it deterministic. Changing this value may change the output generated by the tool + Int memory_mb = 32000 # value depends on chunk size, the number of samples in ref and target panel, and whether imputation is performed + Int xmx_mb = memory_mb - 5000 # we suggest setting this to 85-90% of the memory_mb parameter + Int disk_size_gb = ceil(3 * size([dataset_vcf, ref_panel_bref3], "GiB")) + 10 # value may need to be adjusted + + Boolean for_dependency # used for task dependency management + } + + command <<< + set -e -o pipefail + + java -ea -Xmx~{xmx_mb}m \ + -jar /usr/gitc/beagle.17Dec24.224.jar \ + gt=~{dataset_vcf} \ + ref=~{ref_panel_bref3} \ + map=~{genetic_map_file} \ + out=phased_~{basename} \ + chrom=~{chrom}:~{start}-~{end} \ + impute=false \ + nthreads=~{cpu} \ + seed=-99999 + + >>> + output { + File vcf = "phased_~{basename}.vcf.gz" + File log = "phased_~{basename}.log" + } + runtime { + docker: beagle_docker + disks: "local-disk ${disk_size_gb} HDD" + memory: "${memory_mb} MiB" + cpu: cpu + preemptible: 3 + } +} + +task Impute { + input { + File dataset_vcf + File ref_panel_bref3 + File genetic_map_file + String basename + String chrom + Int start + Int end + + String beagle_docker = "us.gcr.io/broad-gotc-prod/imputation-beagle:1.0.0-17Dec24.224-1740423035" + Int cpu = 8 # This parameter is used as the nthreads input to Beagle, which is part of how we make it deterministic. Changing this value may change the output generated by the tool + Int memory_mb = 32000 # value depends on chunk size, the number of samples in ref and target panel, and whether imputation is performed + Int xmx_mb = memory_mb - 5000 # we suggest setting this to 85-90% of the memory_mb parameter + Int disk_size_gb = ceil(3 * size([dataset_vcf, ref_panel_bref3], "GiB")) + 10 # value may need to be adjusted + } + + command <<< + set -e -o pipefail + + java -ea -Xmx~{xmx_mb}m \ + -jar /usr/gitc/beagle.17Dec24.224.jar \ + gt=~{dataset_vcf} \ + ref=~{ref_panel_bref3} \ + map=~{genetic_map_file} \ + out=imputed_~{basename} \ + chrom=~{chrom}:~{start}-~{end} \ + impute=true \ + nthreads=~{cpu} \ + seed=-99999 + + >>> + output { + File vcf = "imputed_~{basename}.vcf.gz" + File log = "imputed_~{basename}.log" + } + runtime { + docker: beagle_docker + disks: "local-disk ${disk_size_gb} HDD" + memory: "${memory_mb} MiB" + cpu: cpu + preemptible: 3 + } +} + +task ErrorWithMessageIfErrorCountNotZero { + input { + Int errorCount + String message + } + command <<< + if [[ ~{errorCount} -gt 0 ]]; then + >&2 echo "Error: ~{message}" + exit 1 + else + exit 0 + fi + >>> + + runtime { + docker: "ubuntu:20.04" + preemptible: 3 + } + output { + Boolean done = true + } +} + +task CreateVcfIndex { + input { + File vcf_input + + Int disk_size_gb = ceil(1.2*size(vcf_input, "GiB")) + 10 + Int cpu = 1 + Int memory_mb = 6000 + String gatk_docker = "us.gcr.io/broad-gatk/gatk:4.5.0.0" + } + Int command_mem = memory_mb - 1500 + Int max_heap = memory_mb - 1000 + + String vcf_basename = basename(vcf_input) + + command { + set -e -o pipefail + + ln -sf ~{vcf_input} ~{vcf_basename} + + bcftools index -t ~{vcf_basename} + } + runtime { + docker: gatk_docker + disks: "local-disk ${disk_size_gb} HDD" + memory: "${memory_mb} MiB" + cpu: cpu +
preemptible: 3 + } + output { + File vcf = "~{vcf_basename}" + File vcf_index = "~{vcf_basename}.tbi" + } +} diff --git a/tasks/broad/ImputationTasks.wdl b/tasks/broad/ImputationTasks.wdl index 793ae119b2..1a89954535 100644 --- a/tasks/broad/ImputationTasks.wdl +++ b/tasks/broad/ImputationTasks.wdl @@ -12,6 +12,8 @@ task CalculateChromosomeLength { } command { + set -e -o pipefail + grep -P "SN:~{chrom}\t" ~{ref_dict} | sed 's/.*LN://' | sed 's/\t.*//' } runtime { @@ -19,6 +21,7 @@ task CalculateChromosomeLength { disks: "local-disk ${disk_size_gb} HDD" memory: "${memory_mb} MiB" cpu: cpu + preemptible: 3 } output { Int chrom_length = read_int(stdout()) @@ -37,6 +40,8 @@ task GetMissingContigList { } command <<< + set -e -o pipefail + grep "@SQ" ~{ref_dict} | sed 's/.*SN://' | sed 's/\t.*//' > contigs.txt awk 'NR==FNR{arr[$0];next} !($0 in arr)' ~{included_contigs} contigs.txt > missing_contigs.txt >>> @@ -62,13 +67,13 @@ task GenerateChunk { File vcf File vcf_index - Int disk_size_gb = ceil(2*size(vcf, "GiB")) + 50 # not sure how big the disk size needs to be since we aren't downloading the entire VCF here + Int disk_size_gb = ceil(2*size(vcf, "GiB")) + 10 Int cpu = 1 Int memory_mb = 8000 String gatk_docker = "us.gcr.io/broad-gatk/gatk:4.6.1.0" } - Int command_mem = memory_mb - 1000 - Int max_heap = memory_mb - 500 + Int command_mem = memory_mb - 1500 + Int max_heap = memory_mb - 1000 command { gatk --java-options "-Xms~{command_mem}m -Xmx~{max_heap}m" \ @@ -114,17 +119,17 @@ task CountVariantsInChunks { String gatk_docker = "us.gcr.io/broad-gatk/gatk:4.6.1.0" Int cpu = 1 - Int memory_mb = 4000 + Int memory_mb = 6000 Int disk_size_gb = 2 * ceil(size([vcf, vcf_index, panel_vcf, panel_vcf_index], "GiB")) + 20 } - Int command_mem = memory_mb - 1000 - Int max_heap = memory_mb - 500 + Int command_mem = memory_mb - 1500 + Int max_heap = memory_mb - 1000 command <<< set -e -o pipefail echo $(gatk --java-options "-Xms~{command_mem}m -Xmx~{max_heap}m" CountVariants -V ~{vcf} | sed 's/Tool returned://') > var_in_original - echo $(gatk --java-options "-Xms~{command_mem}m -Xmx~{max_heap}m" CountVariants -V ~{vcf} -L ~{panel_vcf} | sed 's/Tool returned://') > var_in_reference + echo $(gatk --java-options "-Xms~{command_mem}m -Xmx~{max_heap}m" CountVariants -V ~{vcf} -L ~{panel_vcf} | sed 's/Tool returned://') > var_in_reference >>> output { Int var_in_original = read_int("var_in_original") @@ -147,7 +152,7 @@ task CheckChunks { Int var_in_original Int var_in_reference - Int disk_size_gb = ceil(2*size([vcf, vcf_index, panel_vcf, panel_vcf_index], "GiB")) + Int disk_size_gb = ceil(2*size([vcf, vcf_index, panel_vcf, panel_vcf_index], "GiB")) + 10 String bcftools_docker = "us.gcr.io/broad-gotc-prod/imputation-bcf-vcf:1.0.7-1.10.2-0.1.16-1669908889" Int cpu = 1 Int memory_mb = 4000 @@ -191,7 +196,7 @@ task PhaseVariantsEagle { String eagle_docker = "us.gcr.io/broad-gotc-prod/imputation-eagle:1.0.0-2.4-1690199702" Int cpu = 8 Int memory_mb = 32000 - Int disk_size_gb = ceil(3 * size([dataset_bcf, reference_panel_bcf, dataset_bcf_index, reference_panel_bcf_index], "GiB")) + 50 + Int disk_size_gb = ceil(3 * size([dataset_bcf, reference_panel_bcf, dataset_bcf_index, reference_panel_bcf_index], "GiB")) + 10 } command <<< /usr/gitc/eagle \ @@ -269,10 +274,10 @@ task GatherVcfs { String gatk_docker = "us.gcr.io/broad-gatk/gatk:4.6.1.0" Int cpu = 1 Int memory_mb = 16000 - Int disk_size_gb = ceil(3*size(input_vcfs, "GiB")) + Int disk_size_gb = ceil(3*size(input_vcfs, "GiB")) + 10 } - Int command_mem = memory_mb - 
1000 - Int max_heap = memory_mb - 500 + Int command_mem = memory_mb - 1500 + Int max_heap = memory_mb - 1000 command <<< set -e -o pipefail @@ -285,7 +290,6 @@ task GatherVcfs { gatk --java-options "-Xms~{command_mem}m -Xmx~{max_heap}m" \ IndexFeatureFile -I ~{output_vcf_basename}.vcf.gz - >>> runtime { docker: gatk_docker @@ -304,6 +308,8 @@ task ReplaceHeader { File vcf_to_replace_header File vcf_with_new_header + Int cpu = 1 + Int memory_mb = 6000 String bcftools_docker = "us.gcr.io/broad-gotc-prod/imputation-bcf-vcf:1.0.7-1.10.2-0.1.16-1669908889" } @@ -321,6 +327,9 @@ task ReplaceHeader { runtime { docker: bcftools_docker disks: "local-disk ${disk_size_gb} HDD" + memory: "${memory_mb} MiB" + cpu: cpu + preemptible: 3 } output { @@ -334,30 +343,32 @@ task UpdateHeader { File vcf_index File ref_dict String basename + Boolean disable_sequence_dictionary_validation = true Int disk_size_gb = ceil(4*(size(vcf, "GiB") + size(vcf_index, "GiB"))) + 20 String gatk_docker = "us.gcr.io/broad-gatk/gatk:4.6.1.0" Int cpu = 1 - Int memory_mb = 8000 + Int memory_mb = 6000 } - Int command_mem = memory_mb - 1000 - Int max_heap = memory_mb - 500 + Int command_mem = memory_mb - 1500 + Int max_heap = memory_mb - 1000 + String disable_sequence_dict_validation_flag = if disable_sequence_dictionary_validation then "--disable-sequence-dictionary-validation" else "" command <<< - ## update the header of the merged vcf gatk --java-options "-Xms~{command_mem}m -Xmx~{max_heap}m" \ UpdateVCFSequenceDictionary \ --source-dictionary ~{ref_dict} \ --output ~{basename}.vcf.gz \ --replace -V ~{vcf} \ - --disable-sequence-dictionary-validation + ~{disable_sequence_dict_validation_flag} >>> runtime { docker: gatk_docker disks: "local-disk ${disk_size_gb} HDD" memory: "${memory_mb} MiB" cpu: cpu + preemptible: 3 } output { File output_vcf = "~{basename}.vcf.gz" @@ -376,8 +387,8 @@ task RemoveSymbolicAlleles { Int cpu = 1 Int memory_mb = 4000 } - Int command_mem = memory_mb - 1000 - Int max_heap = memory_mb - 500 + Int command_mem = memory_mb - 1500 + Int max_heap = memory_mb - 1000 command { gatk --java-options "-Xms~{command_mem}m -Xmx~{max_heap}m" \ @@ -392,6 +403,7 @@ task RemoveSymbolicAlleles { disks: "local-disk ${disk_size_gb} HDD" memory: "${memory_mb} MiB" cpu: cpu + preemptible: 3 } } @@ -401,7 +413,7 @@ task SeparateMultiallelics { File original_vcf_index String output_basename - Int disk_size_gb = ceil(2*(size(original_vcf, "GiB") + size(original_vcf_index, "GiB"))) + Int disk_size_gb = ceil(2*(size(original_vcf, "GiB") + size(original_vcf_index, "GiB"))) + 10 String bcftools_docker = "us.gcr.io/broad-gotc-prod/imputation-bcf-vcf:1.0.7-1.10.2-0.1.16-1669908889" Int cpu = 1 Int memory_mb = 4000 @@ -421,6 +433,7 @@ task SeparateMultiallelics { disks: "local-disk ${disk_size_gb} HDD" memory: "${memory_mb} MiB" cpu: cpu + preemptible: 3 } } @@ -435,7 +448,7 @@ task OptionalQCSites { String bcftools_vcftools_docker = "us.gcr.io/broad-gotc-prod/imputation-bcf-vcf:1.0.7-1.10.2-0.1.16-1669908889" Int cpu = 1 Int memory_mb = 16000 - Int disk_size_gb = ceil(2*(size(input_vcf, "GiB") + size(input_vcf_index, "GiB"))) + Int disk_size_gb = ceil(2*(size(input_vcf, "GiB") + size(input_vcf_index, "GiB"))) + 10 } Float max_missing = select_first([optional_qc_max_missing, 0.05]) @@ -443,8 +456,11 @@ task OptionalQCSites { command <<< set -e -o pipefail + ln -sf ~{input_vcf} input.vcf.gz + ln -sf ~{input_vcf_index} input.vcf.gz.tbi + # site missing rate < 5% ; hwe p > 1e-6 - vcftools --gzvcf ~{input_vcf} --max-missing 
~{max_missing} --hwe ~{hwe} --recode -c | bgzip -c > ~{output_vcf_basename}.vcf.gz + vcftools --gzvcf input.vcf.gz --max-missing ~{max_missing} --hwe ~{hwe} --recode -c | bgzip -c > ~{output_vcf_basename}.vcf.gz bcftools index -t ~{output_vcf_basename}.vcf.gz # Note: this is necessary because vcftools doesn't have a way to output a zipped vcf, nor a way to index one (hence needing to use bcf). >>> runtime { @@ -472,6 +488,7 @@ task MergeSingleSampleVcfs { } command <<< set -e -o pipefail + # Move the index file next to the vcf with the corresponding name declare -a VCFS=(~{sep=' ' input_vcfs}) @@ -507,9 +524,12 @@ task CountSamples { String bcftools_docker = "us.gcr.io/broad-gotc-prod/imputation-bcf-vcf:1.0.7-1.10.2-0.1.16-1669908889" Int cpu = 1 Int memory_mb = 3000 - Int disk_size_gb = 100 + ceil(size(vcf, "GiB")) + Int disk_size_gb = ceil(size(vcf, "GiB")) + 10 } + command <<< + set -e -o pipefail + bcftools query -l ~{vcf} | wc -l >>> runtime { @@ -517,6 +537,7 @@ task CountSamples { disks: "local-disk ${disk_size_gb} HDD" memory: "${memory_mb} MiB" cpu: cpu + preemptible: 3 } output { Int nSamples = read_int(stdout()) @@ -532,7 +553,7 @@ task AggregateImputationQCMetrics { String rtidyverse_docker = "rocker/tidyverse:4.1.0" Int cpu = 1 Int memory_mb = 2000 - Int disk_size_gb = 100 + ceil(size(infoFile, "GiB")) + Int disk_size_gb = ceil(size(infoFile, "GiB")) + 10 } command <<< Rscript -<< "EOF" @@ -560,7 +581,7 @@ task AggregateImputationQCMetrics { disks : "local-disk ${disk_size_gb} HDD" memory: "${memory_mb} MiB" cpu: cpu - preemptible : 3 + preemptible: 3 } output { File aggregated_metrics = "~{basename}_aggregated_imputation_metrics.tsv" @@ -600,7 +621,7 @@ task StoreChunksInfo { disks : "local-disk ${disk_size_gb} HDD" memory: "${memory_mb} MiB" cpu: cpu - preemptible : 3 + preemptible: 3 } output { File chunks_info = "~{basename}_chunk_info.tsv" @@ -617,7 +638,7 @@ task MergeImputationQCMetrics { String rtidyverse_docker = "rocker/tidyverse:4.1.0" Int cpu = 1 Int memory_mb = 2000 - Int disk_size_gb = 100 + ceil(size(metrics, "GiB")) + Int disk_size_gb = ceil(size(metrics, "GiB")) + 10 } command <<< Rscript -<< "EOF" @@ -638,7 +659,7 @@ task MergeImputationQCMetrics { disks : "local-disk ${disk_size_gb} HDD" memory: "${memory_mb} MiB" cpu: cpu - preemptible : 3 + preemptible: 3 } output { File aggregated_metrics = "~{basename}_aggregated_imputation_metrics.tsv" @@ -656,13 +677,13 @@ task SubsetVcfToRegion { Int end Boolean exclude_filtered = false - Int disk_size_gb = ceil(2*size(vcf, "GiB")) + 50 # not sure how big the disk size needs to be since we aren't downloading the entire VCF here + Int disk_size_gb = ceil(2*size(vcf, "GiB")) + 10 Int cpu = 1 Int memory_mb = 8000 String gatk_docker = "us.gcr.io/broad-gatk/gatk:4.6.1.0" } - Int command_mem = memory_mb - 1000 - Int max_heap = memory_mb - 500 + Int command_mem = memory_mb - 1500 + Int max_heap = memory_mb - 1000 command { gatk --java-options "-Xms~{command_mem}m -Xmx~{max_heap}m" \ @@ -705,10 +726,11 @@ task SetIDs { String bcftools_docker = "us.gcr.io/broad-gotc-prod/imputation-bcf-vcf:1.0.7-1.10.2-0.1.16-1669908889" Int cpu = 1 Int memory_mb = 4000 - Int disk_size_gb = 100 + ceil(2.2 * size(vcf, "GiB")) + Int disk_size_gb = ceil(2.2 * size(vcf, "GiB")) + 10 } command <<< set -e -o pipefail + bcftools annotate ~{vcf} --set-id '%CHROM\:%POS\:%REF\:%FIRST_ALT' -Oz -o ~{output_basename}.vcf.gz bcftools index -t ~{output_basename}.vcf.gz >>> @@ -717,6 +739,7 @@ task SetIDs { disks: "local-disk ${disk_size_gb} HDD" memory: 
"${memory_mb} MiB" cpu: cpu + preemptible: 3 } output { File output_vcf = "~{output_basename}.vcf.gz" @@ -729,7 +752,7 @@ task ExtractIDs { File vcf String output_basename - Int disk_size_gb = 2*ceil(size(vcf, "GiB")) + 100 + Int disk_size_gb = 2*ceil(size(vcf, "GiB")) + 10 String bcftools_docker = "us.gcr.io/broad-gotc-prod/imputation-bcf-vcf:1.0.7-1.10.2-0.1.16-1669908889" Int cpu = 1 Int memory_mb = 4000 @@ -745,28 +768,34 @@ task ExtractIDs { disks: "local-disk ${disk_size_gb} HDD" memory: "${memory_mb} MiB" cpu: cpu + preemptible: 3 } } task SelectVariantsByIds { input { File vcf + File vcf_index File ids String basename String gatk_docker = "us.gcr.io/broad-gatk/gatk:4.6.1.0" Int cpu = 1 Int memory_mb = 16000 - Int disk_size_gb = ceil(1.2*size(vcf, "GiB")) + 100 + Int disk_size_gb = ceil(1.2*size(vcf, "GiB")) + 10 } parameter_meta { vcf: { description: "vcf", localization_optional: true } + vcf_index: { + description: "vcf", + localization_optional: true + } } - Int command_mem = memory_mb - 1000 - Int max_heap = memory_mb - 500 + Int command_mem = memory_mb - 2000 + Int max_heap = memory_mb - 1500 command <<< set -e -o pipefail @@ -780,6 +809,7 @@ task SelectVariantsByIds { disks: "local-disk ${disk_size_gb} SSD" memory: "${memory_mb} MiB" cpu: cpu + preemptible: 3 } output { File output_vcf = "~{basename}.vcf.gz" @@ -795,7 +825,7 @@ task RemoveAnnotations { String bcftools_docker = "us.gcr.io/broad-gotc-prod/imputation-bcf-vcf:1.0.7-1.10.2-0.1.16-1669908889" Int cpu = 1 Int memory_mb = 3000 - Int disk_size_gb = ceil(2.2*size(vcf, "GiB")) + 100 + Int disk_size_gb = ceil(2.2*size(vcf, "GiB")) + 10 } command <<< set -e -o pipefail @@ -808,6 +838,7 @@ task RemoveAnnotations { disks: "local-disk ${disk_size_gb} HDD" memory: "${memory_mb} MiB" cpu: cpu + preemptible: 3 } output { File output_vcf = "~{basename}.vcf.gz" @@ -823,10 +854,10 @@ task InterleaveVariants { String gatk_docker = "us.gcr.io/broad-gatk/gatk:4.6.1.0" Int cpu = 1 Int memory_mb = 16000 - Int disk_size_gb = ceil(3.2*size(vcfs, "GiB")) + 100 + Int disk_size_gb = ceil(3.2*size(vcfs, "GiB")) + 10 } - Int command_mem = memory_mb - 1000 - Int max_heap = memory_mb - 500 + Int command_mem = memory_mb - 1500 + Int max_heap = memory_mb - 1000 command <<< set -e -o pipefail @@ -839,6 +870,7 @@ task InterleaveVariants { disks: "local-disk ${disk_size_gb} SSD" memory: "${memory_mb} MiB" cpu: cpu + preemptible: 3 } output { File output_vcf = "~{basename}.vcf.gz" @@ -854,9 +886,11 @@ task FindSitesUniqueToFileTwoOnly { String ubuntu_docker = "ubuntu:20.04" Int cpu = 1 Int memory_mb = 4000 - Int disk_size_gb = ceil(size(file1, "GiB") + 2*size(file2, "GiB")) + 100 + Int disk_size_gb = ceil(size(file1, "GiB") + 2*size(file2, "GiB")) + 10 } command <<< + set -e -o pipefail + comm -13 <(sort ~{file1} | uniq) <(sort ~{file2} | uniq) > missing_sites.ids >>> runtime { @@ -864,6 +898,7 @@ task FindSitesUniqueToFileTwoOnly { disks: "local-disk ${disk_size_gb} HDD" memory: "${memory_mb} MiB" cpu: cpu + preemptible: 3 } output { File missing_sites = "missing_sites.ids" @@ -877,10 +912,10 @@ task SplitMultiSampleVcf { String bcftools_docker = "us.gcr.io/broad-gotc-prod/imputation-bcf-vcf:1.0.7-1.10.2-0.1.16-1669908889" Int cpu = 1 - Int memory_mb = 8000 + Int memory_mb = 6000 # This calculation is explained in https://github.com/broadinstitute/warp/pull/937 - Int disk_size_gb = ceil(21*nSamples*size(multiSampleVcf, "GiB")/(nSamples+20)) + 100 + Int disk_size_gb = ceil(21*nSamples*size(multiSampleVcf, "GiB")/(nSamples+20)) + 10 } command <<< set 
-e -o pipefail @@ -901,4 +936,4 @@ task SplitMultiSampleVcf { Array[File] single_sample_vcfs = glob("out_dir/*.vcf.gz") Array[File] single_sample_vcf_indices = glob("out_dir/*.vcf.gz.tbi") } -} \ No newline at end of file +} diff --git a/verification/VerifyImputationBeagle.wdl b/verification/VerifyImputationBeagle.wdl new file mode 100644 index 0000000000..e99f1767c1 --- /dev/null +++ b/verification/VerifyImputationBeagle.wdl @@ -0,0 +1,79 @@ +version 1.0 + +import "../verification/VerifyTasks.wdl" as Tasks + +## Copyright Broad Institute, 2018 +## +## This WDL script is designed to verify (compare) the outputs of the ImputationBeagle wdl. +## +## +## Runtime parameters are optimized for Broad's Google Cloud Platform implementation. +## For program versions, see docker containers. +## +## LICENSING : +## This script is released under the WDL source code license (BSD-3) (see LICENSE in +## https://github.com/broadinstitute/wdl). Note however that the programs it calls may +## be subject to different licenses. Users are responsible for checking that they are +## authorized to run all programs before running this script. Please see the docker +## page at https://hub.docker.com/r/broadinstitute/genomes-in-the-cloud/ for detailed +## licensing information pertaining to the included programs. + +workflow VerifyImputationBeagle { + input { + Array[File] truth_metrics + Array[File] test_metrics + + File truth_vcf + File test_vcf + File test_vcf_index + File truth_vcf_index + + Boolean? done + } + + String bcftools_docker_tag = "us.gcr.io/broad-gotc-prod/imputation-bcf-vcf:1.0.7-1.10.2-0.1.16-1669908889" + + scatter (idx in range(length(truth_metrics))) { + call CompareImputationMetrics { + input: + test_metrics = test_metrics[idx], + truth_metrics = truth_metrics[idx] + } + } + + call Tasks.CompareVcfs as CompareOutputVcfs { + input: + file1 = truth_vcf, + file2 = test_vcf, + patternForLinesToExcludeFromComparison = "##" # ignore headers + } + + output { + } + meta { + allowNestedInputs: true + } +} + +task CompareImputationMetrics { + input { + File test_metrics + File truth_metrics + } + command <<< + set -eo pipefail + + if ! diff "~{test_metrics}" "~{truth_metrics}"; then + echo "Error: ~{test_metrics} and ~{truth_metrics} differ" + exit 1 + fi + >>> + + runtime { + docker: "ubuntu:20.04" + cpu: 1 + memory: "3.75 GiB" + disks: "local-disk 10 HDD" + } +} diff --git a/verification/VerifyTasks.wdl b/verification/VerifyTasks.wdl index 43bb9b4340..46ed24373c 100644 --- a/verification/VerifyTasks.wdl +++ b/verification/VerifyTasks.wdl @@ -10,7 +10,7 @@ task CompareVcfs { command { set -eo pipefail - if [ -z ~{patternForLinesToExcludeFromComparison} ]; then + if [ -z '~{patternForLinesToExcludeFromComparison}' ]; then diff <(gunzip -c -f ~{file1}) <(gunzip -c -f ~{file2}) else echo "It's defined!"
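The one-character CompareVcfs fix above (quoting the interpolated pattern) deserves a note: unquoted, an empty value degenerates the test to [ -z ], which only returns true by the one-argument string-test rule, and a multi-word value such as "a b" expands to [ -z a b ], which fails with "binary operator expected". A small illustrative task, not part of the PR:

version 1.0

# Illustrative only: why CompareVcfs quotes ~{patternForLinesToExcludeFromComparison}.
# Quoting keeps the interpolated value a single shell word, so the -z test is
# well-formed for empty and multi-word values alike.
task QuotingDemo {
  input {
    String pattern = ""
  }
  command <<<
    if [ -z '~{pattern}' ]; then
      echo "pattern is empty, comparing whole files"
    else
      echo "pattern is set, excluding lines matching: '~{pattern}'"
    fi
  >>>
  output {
    String decision = read_string(stdout())
  }
  runtime {
    docker: "ubuntu:20.04"
  }
}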
diff --git a/verification/test-wdls/TestImputationBeagle.wdl b/verification/test-wdls/TestImputationBeagle.wdl new file mode 100644 index 0000000000..2d56d858aa --- /dev/null +++ b/verification/test-wdls/TestImputationBeagle.wdl @@ -0,0 +1,111 @@ +version 1.0 + + +import "../../pipelines/broad/arrays/imputation_beagle/ImputationBeagle.wdl" as ImputationBeagle +import "../../verification/VerifyImputationBeagle.wdl" as VerifyImputationBeagle +import "../../tasks/broad/Utilities.wdl" as Utilities +import "../../tasks/broad/TerraCopyFilesFromCloudToCloud.wdl" as Copy + +workflow TestImputationBeagle { + + input { + Int chunkLength = 25000000 + Int chunkOverlaps = 5000000 # this is the padding that will be added to the beginning and end of each chunk to reduce edge effects + + File multi_sample_vcf + + File ref_dict # for reheadering / adding contig lengths in the header of the output VCF, and calculating contig lengths + Array[String] contigs + String reference_panel_path_prefix # path + file prefix to the bucket where the reference panel files are stored for all contigs + String genetic_maps_path # path to the bucket where genetic maps are stored for all contigs + String output_basename # the basename for intermediate and output files + + # These values will be determined and injected into the inputs by the scala test framework + String truth_path + String results_path + Boolean update_truth + } + + meta { + allowNestedInputs: true + } + + call ImputationBeagle.ImputationBeagle { + input: + chunkLength = chunkLength, + chunkOverlaps = chunkOverlaps, + multi_sample_vcf = multi_sample_vcf, + ref_dict = ref_dict, + contigs = contigs, + reference_panel_path_prefix = reference_panel_path_prefix, + genetic_maps_path = genetic_maps_path, + output_basename = output_basename, + } + + + # Collect all of the pipeline outputs into a single Array[String] + Array[String] pipeline_outputs = flatten([ + [ # File outputs + ImputationBeagle.imputed_multi_sample_vcf, + ImputationBeagle.imputed_multi_sample_vcf_index, + ] + ]) + + + # Collect all of the pipeline metrics into a single Array[String] + Array[String] pipeline_metrics = flatten([ + [ # File outputs + ImputationBeagle.chunks_info, + ] + ]) + + # Copy results of pipeline to test results bucket + call Copy.TerraCopyFilesFromCloudToCloud as CopyToTestResults { + input: + files_to_copy = flatten([pipeline_outputs, pipeline_metrics]), + destination_cloud_path = results_path + } + + # If updating truth then copy output to truth bucket + if (update_truth){ + call Copy.TerraCopyFilesFromCloudToCloud as CopyToTruth { + input: + files_to_copy = flatten([pipeline_outputs, pipeline_metrics]), + destination_cloud_path = truth_path + } + } + + # If not updating truth, gather validation inputs by passing each desired file/array[files] to GetValidationInputs + if (!update_truth){ + call Utilities.GetValidationInputs as GetMetrics { + input: + input_files = pipeline_metrics, + results_path = results_path, + truth_path = truth_path + } + call Utilities.GetValidationInputs as GetVcf { + input: + input_file = ImputationBeagle.imputed_multi_sample_vcf, + results_path = results_path, + truth_path = truth_path + } + call Utilities.GetValidationInputs as GetVcfIndex { + input: + input_file = ImputationBeagle.imputed_multi_sample_vcf_index, + results_path = results_path, + truth_path = truth_path + } + + + call VerifyImputationBeagle.VerifyImputationBeagle as Verify { + input: + truth_metrics = GetMetrics.truth_files, + test_metrics = GetMetrics.results_files, + truth_vcf = GetVcf.truth_file, + test_vcf =
GetVcf.results_file, + truth_vcf_index = GetVcfIndex.truth_file, + test_vcf_index = GetVcfIndex.results_file, + done = CopyToTestResults.done + } + } +}
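For orientation: Utilities.GetValidationInputs is not shown in this patch, but from the way TestImputationBeagle consumes it, it maps each pipeline output to its counterpart paths under the truth and results buckets. The sketch below is inferred from that usage and is not the real implementation:

version 1.0

# Simplified sketch (not the real Utilities.GetValidationInputs): derive the
# truth-bucket and results-bucket paths for one pipeline output so that
# VerifyImputationBeagle can diff the two copies.
workflow GetValidationInputsSketch {
  input {
    String input_file    # e.g. the imputed multi-sample VCF
    String results_path  # gs:// prefix for this test run's outputs
    String truth_path    # gs:// prefix for the stored truth files
  }

  String file_name = basename(input_file)

  output {
    String results_file = results_path + "/" + file_name
    String truth_file = truth_path + "/" + file_name
  }
}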