add workflow "unpack_archive_to_bucket"; add task "unpack_archive_to_…
Browse files Browse the repository at this point in the history
…bucket_path" (#573)

* add workflow "unpack_archive_to_bucket"; add task "unpack_archive_to_bucket_path"

This adds a new WDL task, "unpack_archive_to_bucket_path", which unpacks an input tarball to a specified bucket path, with options to set the target destination, the number of wrapping directories to strip from around the tarball contents, and whether the extracted files are uploaded directly via a pipe or from a temporary location on disk. The task is intended to run on Terra, but it can run elsewhere if a Google Cloud auth token is passed to it. A corresponding workflow, unpack_archive_to_bucket, is also added.
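A minimal sketch of the two upload modes, mirroring the task's command section below; the archive name, bucket path, and `--strip-components` value are hypothetical:

```bash
# Mode 1: extract to a temporary directory on disk, then bulk-upload with rsync
mkdir -p extracted_tmp
tar -xv --strip-components=1 --directory extracted_tmp -f archive.tar.gz
gcloud storage rsync --recursive --no-clobber extracted_tmp gs://my-bucket/dest

# Mode 2: bypass the disk, streaming each archive member straight to the bucket;
# tar invokes the --to-command once per member and sets TAR_REALNAME to its path
tar -xv --ignore-zeros \
    --to-command='gcloud storage cp --no-clobber - gs://my-bucket/dest/${TAR_REALNAME}' \
    -f archive.tar.gz
```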

* pass in gcloud access token, if provided, to unpack_archive_to_bucket_path; otherwise, obtain it via `gcloud auth print-access-token` (which should work on Terra/GCE instances)
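A sketch of the token fallback as implemented in the task's command block, with a shell variable GCLOUD_ACCESS_TOKEN standing in for the optional WDL input:

```bash
# Use the caller-supplied token if present; otherwise ask gcloud for one
# (works on Terra/GCE where the instance already has credentials).
if [ -n "${GCLOUD_ACCESS_TOKEN:-}" ]; then
    CLOUDSDK_AUTH_ACCESS_TOKEN="${GCLOUD_ACCESS_TOKEN}"
else
    CLOUDSDK_AUTH_ACCESS_TOKEN="$(gcloud auth print-access-token)"
fi
# gcloud reads this environment variable and uses it for authentication
export CLOUDSDK_AUTH_ACCESS_TOKEN
```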

* unpack_archive_to_bucket_path: allow additional opts to be passed to `gcloud storage cp`; reorder inputs and their descriptions

* update unpack_archive_to_bucket_path to viral-core 2.4.1

* add fc- prefix to bucket ID returned by check_terra_env
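For reference, a sketch of the corrected bucket-path construction (Terra/FireCloud workspace buckets are typically named `fc-<workspace id>`; WORKSPACE_ID is assumed to hold the workspace UUID queried earlier in the task):

```bash
# previously: WORKSPACE_BUCKET="$(echo gs://${WORKSPACE_ID} | tee workspace_bucket_path.txt)"
WORKSPACE_BUCKET="$(echo "gs://fc-${WORKSPACE_ID}" | tee workspace_bucket_path.txt)"
echo "WORKSPACE_BUCKET: ${WORKSPACE_BUCKET}"
```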

* additional input descriptive text
tomkinsc authored Dec 13, 2024
1 parent 7bdc16d commit efee43e
Showing 6 changed files with 277 additions and 37 deletions.
39 changes: 22 additions & 17 deletions .dockstore.yml
@@ -65,6 +65,16 @@ workflows:
primaryDescriptorPath: /pipes/WDL/workflows/beast_gpu.wdl
testParameterFiles:
- /empty.json
- name: blastoff
subclass: WDL
primaryDescriptorPath: /pipes/WDL/workflows/blastoff.wdl
testParameterFiles:
- /empty.json
- name: chunk_blast
subclass: WDL
primaryDescriptorPath: /pipes/WDL/workflows/megablast_chunk.wdl
testParameterFiles:
- /empty.json
- name: classify_kaiju
subclass: WDL
primaryDescriptorPath: /pipes/WDL/workflows/classify_kaiju.wdl
@@ -100,6 +110,16 @@ workflows:
primaryDescriptorPath: /pipes/WDL/workflows/coverage_table.wdl
testParameterFiles:
- /empty.json
- name: create_enterics_qc_viz
subclass: WDL
primaryDescriptorPath: /pipes/WDL/workflows/create_enterics_qc_viz.wdl
testParameterFiles:
- /empty.json
- name: create_enterics_qc_viz_general
subclass: WDL
primaryDescriptorPath: /pipes/WDL/workflows/create_enterics_qc_viz_general.wdl
testParameterFiles:
- /empty.json
- name: demux_metag
subclass: WDL
primaryDescriptorPath: /pipes/WDL/workflows/demux_metag.wdl
@@ -404,23 +424,8 @@ workflows:
primaryDescriptorPath: /pipes/WDL/workflows/bam_to_qiime.wdl
testParameterFiles:
- /empty.json
- name: create_enterics_qc_viz
subclass: WDL
primaryDescriptorPath: /pipes/WDL/workflows/create_enterics_qc_viz.wdl
testParameterFiles:
- /empty.json
- name: create_enterics_qc_viz_general
subclass: WDL
primaryDescriptorPath: /pipes/WDL/workflows/create_enterics_qc_viz_general.wdl
testParameterFiles:
- /empty.json
- name: blastoff
- name: unpack_archive_to_bucket
subclass: WDL
primaryDescriptorPath: /pipes/WDL/workflows/blastoff.wdl
testParameterFiles:
- /empty.json
- name: chunk_blast
subclass: WDL
primaryDescriptorPath: /pipes/WDL/workflows/megablast_chunk.wdl
primaryDescriptorPath: /pipes/WDL/workflows/unpack_archive_to_bucket.wdl
testParameterFiles:
- /empty.json
4 changes: 3 additions & 1 deletion .github/workflows/build.yml
@@ -11,7 +11,9 @@ on:
- created

concurrency:
# Group jobs by the branch or tag ref to ensure only one job runs for a ref at a time
# Group jobs by the branch or tag ref to ensure only one job runs for a ref at a time.
# For pull requests, github.ref is the PR's merge ref (refs/pull/<number>/merge);
# for pushes, github.ref is the ref of the branch or tag to which the commit was pushed.
group: ${{ github.ref }}
# Cancel any in-progress jobs for the same group
cancel-in-progress: true
13 changes: 3 additions & 10 deletions pipes/WDL/tasks/tasks_terra.wdl
@@ -48,6 +48,7 @@ task check_terra_env {

# create Terra-related output files
touch user_email.txt
touch workspace_id.txt
touch workspace_name.txt
touch workspace_namespace.txt
touch workspace_bucket_path.txt
@@ -56,6 +57,7 @@
touch method_version.txt
touch method_source.txt
touch method_path.txt
touch top_level_submission_id.txt

# disable the version update alert messages gcloud sometimes emits when executing any command
gcloud config set component_manager/disable_update_check true
@@ -134,7 +136,7 @@ task check_terra_env {
WORKSPACE_NAME="$(jq -cr '.workspace.name | select (.!=null)' workspace_info.json | tee workspace_name.txt)"
WORKSPACE_NAME_URL_ENCODED="$(jq -rn --arg x "${WORKSPACE_NAME}" '$x|@uri')"
WORKSPACE_NAMESPACE="$(jq -cr '.workspace.namespace | select (.!=null)' workspace_info.json | tee workspace_namespace.txt)"
WORKSPACE_BUCKET="$(echo gs://${WORKSPACE_ID} | tee workspace_bucket_path.txt)"
WORKSPACE_BUCKET="$(echo "gs://fc-${WORKSPACE_ID}" | tee workspace_bucket_path.txt)"

echo "WORKSPACE_NAME: ${WORKSPACE_NAME}"
echo "WORKSPACE_NAMESPACE: ${WORKSPACE_NAMESPACE}"
@@ -194,15 +196,6 @@ task check_terra_env {
else
echo "Not running on Terra+GCP"
fi
ls -1 /sys
echo "--"
ls -1 /sys/fs
echo "--"
ls -1 /sys/fs/cgroup
echo "-- memory.peak:"
cat /sys/fs/cgroup/memory.peak
echo "--"
#ls -1 /sys/fs/cgroup/memory
echo -n'' "MEM_BYTES: "; { if [ -f /sys/fs/cgroup/memory.peak ]; then cat /sys/fs/cgroup/memory.peak; elif [ -f /sys/fs/cgroup/memory/memory.max_usage_in_bytes ]; then cat /sys/fs/cgroup/memory/memory.max_usage_in_bytes; else echo "0"; fi } | tee MEM_BYTES
>>>
output {
204 changes: 197 additions & 7 deletions pipes/WDL/tasks/tasks_utils.wdl
@@ -27,6 +27,196 @@ task concatenate {
}
}

task unpack_archive_to_bucket_path {
meta {
description: "Unpack archive(s) to a target location within a Google Storage bucket"
}
input {
# input archive(s)
Array[File] input_archive_files

# destination for extracted files
String bucket_path_prefix
String? out_dir_name

# tar options
Boolean bypass_disk_and_unpack_directly_to_bucket = false
Int? archive_wrapper_directories_to_strip
String tar_opts = "-v --ignore-zeros --no-ignore-command-error"

# gcloud storage options
Boolean clobber_existing = false
String? gcloud_access_token
String gcloud_storage_cp_opts = ""

# execution and resource requirements
Int disk_size = 500
Int machine_mem_gb = 128
String docker = "quay.io/broadinstitute/viral-core:2.4.1"
}

parameter_meta {
# data I/O inputs
input_archive_files: {
description: "List of input archive files to unpack.",
patterns: ["*.tar", "*.tar.gz", "*.tgz", "*.tar.bz2", "*.tbz2", "*.tar.xz", "*.txz", "*.tar.lz4", "*.tar.zst"]
}
bucket_path_prefix: {
description: "The path prefix to the Google Storage bucket location where the archive contents will be unpacked. This must begin with the bucket name, should start with 'gs://', and can include as many sub-directories as desired. If not provided and the job is running on a GCP-backed Terra instance, the bucket of the associated workspace will be inferred via introspection."
}
out_dir_name: {
description: "Name of the (sub-)directory to unpack the archive contents to within the bucket prefix specified. If not provided, the contents will be unpacked to the bucket prefix."
}

# tar params
bypass_disk_and_unpack_directly_to_bucket: {
description: "(tar) If true, unpack the archive(s) and pipe the contents directly to the gcloud storage upload process, without writing to the disk between extraction and upload. If enabled, minimal disk space will be used beyond storage needed to localize the specified input archive(s), but the task may take significantly longer as each file is uploaded using an independent gcloud storage invocation."
}
archive_wrapper_directories_to_strip: {
description: "(tar) If specified, tar extraction excludes this many top-level directories. (i.e. if all files of a tarball are containined within a top-level subdirectory, and archive_wrapper_directories_to_strip=1, the files files will be extracted without being placed into a corresponding output sub-directory. Equivalent to the parameter '--strip-components' of GNU tar."
}
tar_opts: {
description: "(tar) Options to pass to GNU tar during extraction. By default includes: '-v --ignore-zeros --no-ignore-command-error'"
}

# 'gcloud storage cp' params
clobber_existing: {
description: "(gcloud storage cp) If true, overwrite files in the target directory of the bucket if they already exist."
}
gcloud_access_token: {
description: "(gcloud storage cp) Access token for the Google Cloud Storage bucket, for account authorized to write to the bucket specified by 'bucket_path_prefix'. If not provided, the gcloud auth configuration of the execution environment will be obtained via 'gcloud auth print-access-token' for the active authenticated user (on Terra, the service worker/'pet' account)."
}
gcloud_storage_cp_opts: {
description: "(gcloud storage cp) Additional options to pass to the 'gcloud storage cp' command at the time of upload."
}


# execution and resource requirements
disk_size: {
description: "Size of the disk to allocate for the task, in GB. Note that if multiple files are provided to 'input_archive_files', and extracted data is written to the disk (bypass_disk_and_unpack_directly_to_bucket=false), the extracted data from one archive will be removed before extracting and uploading data from the next input archive."
}
machine_mem_gb: {
description: "Memory to allocate for the task, in GB."
}
docker: {
description: "Docker image to use for the task. For this task, the image must provide GNU tar and the google-cloud-cli ('gcloud' command)"
}
}

command <<<
# verify gcloud is installed (it should be, if the default docker image is used)
if ! command -v gcloud &> /dev/null; then
echo "ERROR: gcloud is not installed; it is required to authenticate to Google Cloud Storage" >&2
exit 1
fi

if ~{if(defined(gcloud_access_token)) then 'true' else 'false'}; then
# set access token env var expected by gcloud,
# if provided by the user
CLOUDSDK_AUTH_ACCESS_TOKEN="~{gcloud_access_token}"
else
CLOUDSDK_AUTH_ACCESS_TOKEN="$(gcloud auth print-access-token)"
fi
export CLOUDSDK_AUTH_ACCESS_TOKEN

# check that the gcloud access token is populated
if [ -z "${CLOUDSDK_AUTH_ACCESS_TOKEN}" ]; then
echo "ERROR: gcloud access token not found; it must either be provided via the 'gcloud_access_token' input, or made available within the execution environment (via 'gcloud auth print-access-token')" >&2
exit 1
fi

# check whether the bucket path prefix begins with "gs://" and if not,
# prepend the 'protocol'; also strip leading or trailing slash if present
# (for flexibility; this way the user can specify the bucket path prefix with or without the protocol)
bucket_path_prefix=$(echo "~{bucket_path_prefix}" | sed -e 's|^gs://||' -e 's|/$||' -e 's|^/*||' -e 's|^|gs://|')

# check that, excluding the gs:// 'protocol' prefix, the bucket path prefix is not empty
if [ -z "${bucket_path_prefix/#gs:\/\//}" ]; then
echo "ERROR: bucket path prefix is empty" >&2
exit 1
fi

# check whether the user can write to the target bucket
# by trying a simple write action, since we cannot rely on
# the user having the permissions needed to view the IAM policies
# that determine their (write) access to the bucket
if ! echo "write_test" | gcloud storage cp --verbosity error - "${bucket_path_prefix}/.tmp/test-write-access.txt" --quiet; then
echo "ERROR: user does not have write access to the target bucket: ~{bucket_path_prefix}" >&2
exit 1
else
# clean up the test file if the write test was successful
gcloud storage rm "${bucket_path_prefix}/.tmp/test-write-access.txt"
fi

# for each of the input archives provided, extract the contents to the target bucket
# either directly via pipe, or from an intermediate location on disk
for input_archive in ~{sep=' ' input_archive_files}; do
echo "Processing archive: $(basename "${input_archive}")"

# if the user has requested to bypass writing to disk between extraction and upload
if ~{if(bypass_disk_and_unpack_directly_to_bucket) then 'true' else 'false'}; then
echo "Unpacking archive(s) and piping directly to gcloud storage upload processes (bypassing the disk)..."

# TODO: parallelize if needed and if the increased memory usage is acceptable
# either via GNU parallel ( https://www.gnu.org/software/parallel/parallel_examples.html )
# or by simply pushing the tar processes to the background

# pipe each file to a command via stdout, relying on GNU tar to pass file information
# out of band via special environment variables set for each file when using --to-command
#
# documentation here:
# https://www.gnu.org/software/tar/manual/html_section/extract-options.html#Writing-to-an-External-Program
tar ~{tar_opts} -x \
~{if(defined(archive_wrapper_directories_to_strip)) then "--strip-components=~{archive_wrapper_directories_to_strip}" else ""} \
--to-command='gcloud storage cp ~{gcloud_storage_cp_opts} ~{if clobber_existing then "" else "--no-clobber"} --verbosity error - '"${bucket_path_prefix}~{if(defined(out_dir_name)) then '/~{out_dir_name}' else ''}/"'${TAR_REALNAME}' \
-f "${input_archive}"

# otherwise extract to disk and then upload to the bucket
else
echo 'Extracting archive '"$(basename "${input_archive}")"' to disk before upload...'

# create a temporary directory to extract the archive contents to
mkdir -p extracted_tmp

# extract the archive to the temporary directory
tar ~{tar_opts} -x \
--directory "./extracted_tmp" \
~{if(defined(archive_wrapper_directories_to_strip)) then "--strip-components=~{archive_wrapper_directories_to_strip}" else ""} \
-f "${input_archive}"

pushd extracted_tmp

echo "Uploading extracted files to the target bucket..."

# gcloud storage rsync the extracted files to the target bucket in the target directory
gcloud storage rsync \
--recursive \
~{if clobber_existing then "" else "--no-clobber"} \
--verbosity warning \
~{gcloud_storage_cp_opts} \
./ "${bucket_path_prefix}~{if(defined(out_dir_name)) then '/~{out_dir_name}' else ''}"

popd
rm -r ./extracted_tmp
fi
done
>>>

runtime {
docker: docker
memory: machine_mem_gb + " GB"
cpu: 16
disks: "local-disk " + disk_size + " LOCAL"
disk: disk_size + " GB" # TES
dx_instance_type: "mem3_ssd1_v2_x16"
preemptible: 0
maxRetries: 1
}

output {
}
}

task zcat {
meta {
description: "Glue together a bunch of text files that may or may not be compressed (autodetect among gz,xz,bz2,lz4,zst or uncompressed inputs). Optionally compress the output (depending on requested file extension)"
@@ -256,7 +446,7 @@ task download_from_url {
# ----
# get the name of the downloaded file
downloaded_file_name="$(basename $(ls -1 | head -n1))"
downloaded_file_name="$(basename "$(ls -1 | head -n1)")"
if [ ! -f "$downloaded_file_name" ]; then
echo "Could not locate downloaded file \"$downloaded_file_name\""
@@ -274,12 +464,12 @@
# since wget stores both in a single file separated by a couple newlines
if [[ "~{url_to_download}" =~ ^https?:// ]] && ~{if save_response_header_to_file then "true" else "false"}; then
echo "Saving response headers separately..."
csplit -f response -s tmp/${downloaded_file_name} $'/^\r$/+1' && \
mv response00 ../${downloaded_file_name}.headers && \
mv response01 ${downloaded_file_name} && \
csplit -f response -s "tmp/${downloaded_file_name}" $'/^\r$/+1' && \
mv response00 "../${downloaded_file_name}.headers" && \
mv response01 "${downloaded_file_name}" && \
rm "tmp/$downloaded_file_name"
else
mv tmp/${downloaded_file_name} ${downloaded_file_name}
mv "tmp/${downloaded_file_name}" "${downloaded_file_name}"
fi
# alternative python implementation to split response headers from body
# via https://stackoverflow.com/a/75483099
@@ -317,11 +507,11 @@ task download_from_url {
if ~{if defined(md5_hash_expected) then 'true' else 'false'}; then
md5_hash_expected="~{md5_hash_expected}"
check_md5_sum $md5_hash_expected $md5sum_of_downloaded
check_md5_sum "$md5_hash_expected" "$md5sum_of_downloaded"
fi
if ~{if defined(md5_hash_expected_file_url) then 'true' else 'false'}; then
md5_hash_expected="$(curl --silent ~{md5_hash_expected_file_url} | cut -f1 -d' ')"
check_md5_sum $md5_hash_expected $md5sum_of_downloaded
check_md5_sum "$md5_hash_expected" "$md5sum_of_downloaded"
fi
# report the file size, in bytes
2 changes: 0 additions & 2 deletions pipes/WDL/workflows/sarscov2_illumina_full.wdl
@@ -32,12 +32,10 @@ workflow sarscov2_illumina_full {
description: "amplicon primers to trim in reference coordinate space (0-based BED format)",
patterns: ["*.bed"]
}

biosample_attributes: {
description: "A post-submission attributes file from NCBI BioSample, which is available at https://submit.ncbi.nlm.nih.gov/subs/ and clicking on 'Download attributes file with BioSample accessions'. The 'sample_name' column must match the external_ids used in sample_rename_map (or internal ids if sample_rename_map is omitted).",
patterns: ["*.txt", "*.tsv"]
}

}

input {