diff --git a/docker/Dockerfile b/docker/Dockerfile index 330408a8..e0ca8630 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -15,22 +15,18 @@ # To build a new docker image, run the following from the root source dir: # $ docker build . -f docker/Dockerfile -t $IMAGE_NAME -FROM golang:latest -RUN go install -ldflags '-extldflags "-fno-PIC -static"' -buildmode pie -tags 'osusergo netgo static_build' github.com/googlegenomics/pipelines-tools/pipelines@latest - -FROM apache/beam_python3.8_sdk:2.37.0 +FROM apache/beam_python3.12_sdk:2.65.0 ARG commit_sha ENV COMMIT_SHA=${commit_sha} -COPY --from=0 /go/bin/pipelines /usr/bin - # install gcloud sdk -RUN echo "deb [signed-by=/usr/share/keyrings/cloud.google.gpg] http://packages.cloud.google.com/apt cloud-sdk main" | tee -a /etc/apt/sources.list.d/google-cloud-sdk.list && curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key --keyring /usr/share/keyrings/cloud.google.gpg add - +RUN echo "deb [signed-by=/usr/share/keyrings/cloud.google.gpg] https://packages.cloud.google.com/apt cloud-sdk main" | tee -a /etc/apt/sources.list.d/google-cloud-sdk.list && curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | gpg --dearmor -o /usr/share/keyrings/cloud.google.gpg && apt-get update -y && apt-get install google-cloud-cli -y # g++ Needed for installing mmh3 (one of the required packages in setup.py). # Install Pysam dependencies. These dependencies are only required because we # have a monolithic binary - they primarily have to be installed on the workers. RUN apt-get update && apt-get install -y \ + gettext \ apt-transport-https \ autoconf \ automake \ @@ -50,7 +46,7 @@ RUN apt-get update && apt-get install -y \ RUN mkdir -p /opt/gcp_variant_transforms/bin && mkdir -p /opt/gcp_variant_transforms/src -ADD / /opt/gcp_variant_transforms/src/ +COPY /requirements.txt /opt/gcp_variant_transforms/src/requirements.txt # Install dependencies. RUN python3 -m venv /opt/gcp_variant_transforms/venv3 && \ @@ -61,6 +57,8 @@ RUN python3 -m venv /opt/gcp_variant_transforms/venv3 && \ python3 -m pip install --upgrade wheel && \ python3 -m pip install --upgrade -r requirements.txt +COPY / /opt/gcp_variant_transforms/src/ + RUN printf '#!/bin/bash\n%s\n%s' \ ". 
/opt/gcp_variant_transforms/venv3/bin/activate && cd /opt/gcp_variant_transforms/src" \ 'python -m gcp_variant_transforms.vcf_to_bq --setup_file ./setup.py "$@"' > \ @@ -79,4 +77,4 @@ RUN printf '#!/bin/bash\n%s\n%s' \ /opt/gcp_variant_transforms/bin/bq_to_vcf && \ chmod +x /opt/gcp_variant_transforms/bin/bq_to_vcf -ENTRYPOINT ["/opt/gcp_variant_transforms/src/docker/pipelines_runner.sh"] +ENTRYPOINT ["/opt/gcp_variant_transforms/src/docker/batch_runner.sh"] diff --git a/docker/batch.json b/docker/batch.json new file mode 100644 index 00000000..9c27f455 --- /dev/null +++ b/docker/batch.json @@ -0,0 +1,73 @@ +{ + "taskGroups": [ + { + "taskSpec": { + "runnables": [ + { + "container": { + "imageUri": "gcr.io/google.com/cloudsdktool/cloud-sdk:slim", + "entrypoint": "bash", + "commands": [ + "-c", + "mkdir -p /mnt/disks/google/.google/tmp" + ], + "volumes": ["/mnt/disks/google:/mnt/disks/google"] + } + }, + { + "container": { + "imageUri": "$VT_DOCKER_IMAGE", + "entrypoint": "bash", + "commands": [ + "-c", + "$COMMAND $DF_REQUIRED_ARGS $DF_OPTIONAL_ARGS" + ], + "volumes": ["/mnt/disks/google:/mnt/disks/google"] + } + } + ], + "environment": { + "variables": { + "TMPDIR": "/mnt/disks/google/.google/tmp" + } + }, + "volumes": [ + { + "deviceName": "google", + "mountPath": "/mnt/disks/google", + "mountOptions": "rw,async" + } + ] + }, + "taskCount": 1 + } + ], + "allocationPolicy": { + "serviceAccount": { + "email": "$SERVICE_ACCOUNT_EMAIL", + "scopes": [ + "https://www.googleapis.com/auth/cloud-platform", + "https://www.googleapis.com/auth/devstorage.read_write" + ] + }, + "instances": [ + { + "policy": { + "machineType": "e2-standard-2", + "disks": [ + { + "newDisk": { + "type": "pd-ssd", + "sizeGb": 100 + }, + "deviceName": "google" + } + ] + } + } + ] + }, + "logsPolicy": { + "destination": "CLOUD_LOGGING" + } +} diff --git a/docker/pipelines_runner.sh b/docker/batch_runner.sh similarity index 80% rename from docker/pipelines_runner.sh rename to docker/batch_runner.sh index 5bd2fad8..d9818422 100755 --- a/docker/pipelines_runner.sh +++ b/docker/batch_runner.sh @@ -79,9 +79,9 @@ function main { # If missing, we will try to find the default values. google_cloud_project="${google_cloud_project:-$(gcloud config get-value project)}" region="${region:-$(gcloud config get-value compute/region)}" - vt_docker_image="${vt_docker_image:-gcr.io/cloud-lifesciences/gcp-variant-transforms}" - sdk_container_image="${sdk_container_image:-gcr.io/cloud-lifesciences/variant-transforms-custom-runner:latest}" + vt_docker_image="${vt_docker_image:-us-east1-docker.pkg.dev/variant-transform-dxt/dxt-public-variant-transform/batch-runner:latest}" + sdk_container_image="${sdk_container_image:-}" location="${location:-}" temp_location="${temp_location:-}" subnetwork="${subnetwork:-}" @@ -140,28 +140,22 @@ function main { df_optional_args="${df_optional_args} --service_account_email ${service_account}" fi - # Optional location for Life Sciences API (default us-central1), see currently available - # locations here: https://cloud.google.com/life-sciences/docs/concepts/locations - l_s_location="" - if [[ ! -z "${location}" ]]; then - echo "Adding --location ${location} to Life Sciences API invocation command." 
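For reference, the job template above can also be submitted without gcloud, through the same discovery-based Batch v1 client that vep_runner.py builds later in this diff. A minimal sketch, assuming application-default credentials and placeholder project/location values (the oauth2client import path is an assumption, chosen to match the `client.GoogleCredentials` call used in vep_runner.py):

import json

from googleapiclient import discovery
from oauth2client import client  # auth helper assumed to match vep_runner.py


def submit_batch_job(job_body, project, location):
    # type: (dict, str, str) -> str
    """Submits `job_body` (a Batch v1 Job dict) and returns the job resource name."""
    credentials = client.GoogleCredentials.get_application_default()
    service = discovery.build("batch", "v1", credentials=credentials)
    parent = "projects/{}/locations/{}".format(project, location)
    request = (
        service.projects().locations().jobs().create(parent=parent, body=job_body))
    return request.execute()["name"]


# Example usage, assuming the ${...} placeholders were already substituted:
# with open("batch.json") as handle:
#     print(submit_batch_job(json.load(handle), "my-project", "us-central1"))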
- l_s_location="--location ${location}" - fi - - pipelines --project "${google_cloud_project}" ${l_s_location} run \ - --command "/opt/gcp_variant_transforms/bin/${command} ${df_required_args} ${df_optional_args}" \ - --output "${temp_location}"/runner_logs_$(date +%Y%m%d_%H%M%S).log \ - --output-interval 200s \ - --wait \ - --scopes "https://www.googleapis.com/auth/cloud-platform" \ - --regions "${region}" \ - --image "${vt_docker_image}" \ - --machine-type "g1-small" \ - --pvm-attempts 0 \ - --attempts 1 \ - --disk-size 10 \ - --boot-disk-size 100 \ - ${pt_optional_args} + export COMMAND="/opt/gcp_variant_transforms/bin/${command}" + export VT_DOCKER_IMAGE="${vt_docker_image}" + export DF_OPTIONAL_ARGS="${df_optional_args}" + export DF_REQUIRED_ARGS="${df_required_args}" + export TEMP_LOCATION="${temp_location}" + export LOG_TIME=$(date +%Y%m%d_%H%M%S) + export SERVICE_ACCOUNT_EMAIL="${service_account}" + export GOOGLE_APPLICATION_CREDENTIALS="/root/.config/gcloud/application_default_credentials.json" + job_name=$(echo "$command" | head -n1 | awk '{print $1}' | tr '_' '-') + + # Create the batch.json file using envsubst to replace variables. + envsubst < /opt/gcp_variant_transforms/src/docker/batch.json > batch.json + + gcloud batch jobs submit "${job_name}"-`date +"%Y%m%d%H%M%S"` \ + --config=batch.json \ + --location="${location}" } main "$@" diff --git a/gcp_variant_transforms/beam_io/bgzf.py b/gcp_variant_transforms/beam_io/bgzf.py index f949e06a..16ad4226 100644 --- a/gcp_variant_transforms/beam_io/bgzf.py +++ b/gcp_variant_transforms/beam_io/bgzf.py @@ -142,7 +142,7 @@ def read_records(self, file_name, _): # contains sample info that is unique for `file_name`) to `header_lines`. with open_bgzf(file_name) as file_to_read: self._process_header(file_to_read, read_buffer) - with self.open_file(file_name) as file_to_read: + with open_bgzf(file_name) as file_to_read: while True: record = file_to_read.readline() if not record or not record.strip(): @@ -197,15 +197,15 @@ def _read_first_gzip_block_into_buffer(self): def _read_data_from_source(self): if self._start_offset == self._block.end: return b'' - buf = self._file.raw._downloader.get_range(self._start_offset, - self._block.end) + buf = self._file._blob.download_as_bytes(start=self._start_offset, + end=self._block.end) self._start_offset += len(buf) return buf def _complete_last_line(self): # Fetches the first line in the next `self._read_size` bytes. 
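The runner above renders docker/batch.json with envsubst and hands the result to `gcloud batch jobs submit`. A minimal Python sketch of that substitution step, useful for inspecting the rendered job locally; the variable names come from the template in this diff and the example values are purely illustrative:

import json
import os
from string import Template


def render_batch_template(template_path="docker/batch.json"):
    # type: (str) -> dict
    """Renders the Batch job template with values taken from the environment."""
    with open(template_path) as handle:
        # Unlike envsubst, safe_substitute leaves unset $VARS in place rather than
        # replacing them with empty strings.
        rendered = Template(handle.read()).safe_substitute(os.environ)
    return json.loads(rendered)


# Illustrative values only:
# os.environ["VT_DOCKER_IMAGE"] = "us-east1-docker.pkg.dev/my-project/repo/batch-runner:latest"
# os.environ["COMMAND"] = "/opt/gcp_variant_transforms/bin/vcf_to_bq"
# os.environ["DF_REQUIRED_ARGS"] = "--project my-project --region us-central1"
# os.environ["DF_OPTIONAL_ARGS"] = ""
# os.environ["SERVICE_ACCOUNT_EMAIL"] = "runner@my-project.iam.gserviceaccount.com"
# print(json.dumps(render_batch_template(), indent=2))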
- buf = self._file.raw._downloader.get_range( - self._block.end, self._block.end + self._read_size) + buf = self._file._blob.download_as_bytes(start=self._block.end, + end=self._block.end + self._read_size) self._decompressor = zlib.decompressobj(self._gzip_mask) decompressed = self._decompressor.decompress(buf) del buf diff --git a/gcp_variant_transforms/beam_io/bgzf_test.py b/gcp_variant_transforms/beam_io/bgzf_test.py index f283de32..958d19a7 100644 --- a/gcp_variant_transforms/beam_io/bgzf_test.py +++ b/gcp_variant_transforms/beam_io/bgzf_test.py @@ -35,8 +35,7 @@ def setUp(self): self.gcs = gcsio.GcsIO(self.client) self._file_name = 'gs://bucket/test' bucket, name = gcsio.parse_gcs_path(self._file_name) - self.client.objects.add_file(gcsio_test.FakeFile(bucket, name, self._data, - 1)) + self.client.add_file(bucket, name, self._data) def test_one_gzip_block(self): with self._open_bgzf_block(self._file_name, diff --git a/gcp_variant_transforms/beam_io/vcf_parser.py b/gcp_variant_transforms/beam_io/vcf_parser.py index 04afec0c..8b04c63d 100644 --- a/gcp_variant_transforms/beam_io/vcf_parser.py +++ b/gcp_variant_transforms/beam_io/vcf_parser.py @@ -24,12 +24,22 @@ from typing import Iterable # pylint: disable=unused-import import logging import os +import uuid +import os +import sys +import tempfile +from enum import Enum, auto from apache_beam.coders import coders from apache_beam.io import filesystems from apache_beam.io import textio from pysam import libcbcf +import pysam +import tempfile +from io import StringIO +import socket +from typing import Iterator from gcp_variant_transforms.beam_io import bgzf from gcp_variant_transforms.libs import hashing_util @@ -314,11 +324,11 @@ def __init__( **kwargs) else: text_source = textio._TextSource( - file_pattern, - 0, # min_bundle_size - compression_type, - True, # strip_trailing_newlines - coders.StrUtf8Coder(), # coder + file_pattern=file_pattern, + min_bundle_size=0, # min_bundle_size + compression_type=compression_type, + strip_trailing_newlines=True, # strip_trailing_newlines + coder=coders.StrUtf8Coder(), # coder validate=False, header_processor_fns=( lambda x: not x.strip() or x.startswith('#'), @@ -418,6 +428,7 @@ def _get_variant(self, data_line): raise NotImplementedError + class PySamParser(VcfParser): """An Iterator for processing a single VCF file using PySam. @@ -692,3 +703,150 @@ def _get_variant_calls(self, samples): calls.append(VariantCall(encoded_name, name, genotype, phaseset, info)) return hom_ref_calls, calls + + +class FileDescriptorProvider: + """ + A cross-platform class that provides a file descriptor for inter-process use. + Best used as a context manager (`with` statement) to guarantee cleanup. 
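On the bgzf.py hunk above: the Beam-internal `raw._downloader.get_range(start, end)` calls are replaced with the public `Blob.download_as_bytes(start=..., end=...)` API. A minimal sketch of that call, assuming a `google.cloud.storage` blob handle; the range-inclusivity remark in the comment is a hedged reviewer note, not something asserted by this diff:

from google.cloud import storage


def read_range(bucket_name, object_name, start, end):
    # type: (str, str, int, int) -> bytes
    """Downloads one byte range of a GCS object, as _read_data_from_source now does."""
    # Caveat worth verifying in review: the client library documents `end` as the
    # *last* byte of the range (inclusive), whereas Beam's internal
    # get_range(start, end) treated `end` as exclusive.
    blob = storage.Client().bucket(bucket_name).blob(object_name)
    return blob.download_as_bytes(start=start, end=end)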
+ """ + + def __init__(self): + # TODO: consider using /dev/shm if available + base_dir = tempfile.gettempdir() + self._name: str = os.path.join(base_dir, str(uuid.uuid4())) + self._temp_file = open(self._name, "w+b") + + self._fd: int = self._temp_file.fileno() + self._is_closed: bool = False + logging.debug(f"File Descriptor Provider Initialized: File '{self._name}' created, FD={self._fd}") + + @property + def fd(self) -> int: + """Returns the integer file descriptor, ready for reading per the chosen strategy.""" + if self._is_closed: + raise ValueError("Cannot access fd of a closed provider.") + return self._fd + + @property + def name(self) -> str: + """Returns the full path to the temporary file.""" + if self._is_closed: + raise ValueError("Cannot access name of a closed provider.") + return self._name + + def rewind(self): + """Rewinds the file cursor to the beginning of the file.""" + self._temp_file.seek(0, os.SEEK_SET) + + def write_line(self, data: str): + """ + Appends data to the file and resets the read cursor based on the chosen strategy. + """ + if self._is_closed: + raise ValueError("Cannot write to a closed provider.") + if not isinstance(data, str): + raise TypeError("Data must be in str.") + + if not data.endswith("\n"): + data += "\n" + # Always append to the end of the file + self._temp_file.seek(0, os.SEEK_END) + bytes_written = self._temp_file.write(data.encode()) + self._temp_file.flush() + self._temp_file.seek(-bytes_written, os.SEEK_END) + + def close(self): + """ + Closes the file handle and deletes the temporary file from the filesystem. + """ + if not self._is_closed: + logging.debug(f"Closing File Descriptor Provider: Deleting file '{self._name}' and closing FD={self._fd}.") + self._temp_file.close() + try: + os.remove(self._name) + except FileNotFoundError: + pass + self._is_closed = True + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.close() + +class PySamParserWithFileStreaming(PySamParser): + def __init__( + self, + file_name, # type: str + range_tracker, # type: range_trackers.OffsetRangeTracker + compression_type, # type: str + allow_malformed_records, # type: bool + file_pattern=None, # type: str + representative_header_lines=None, # type: List[str] + splittable_bgzf=False, # type: bool + pre_infer_headers=False, # type: bool + sample_name_encoding=SampleNameEncoding.WITHOUT_FILE_PATH, # type: int + use_1_based_coordinate=False, # type: bool + move_hom_ref_calls=False, # type: bool + **kwargs # type: **str + ): + # type: (...) -> None + super().__init__(file_name=file_name, + range_tracker=range_tracker, + file_pattern=file_pattern, + compression_type=compression_type, + allow_malformed_records=allow_malformed_records, + representative_header_lines=representative_header_lines, + splittable_bgzf=splittable_bgzf, + pre_infer_headers=pre_infer_headers, + sample_name_encoding=sample_name_encoding, + use_1_based_coordinate=use_1_based_coordinate, + move_hom_ref_calls=move_hom_ref_calls, + **kwargs) + # These members will be properly initiated in _init_parent_process(). 
+ self._to_child = None + self._original_info_list = None + self._process_pid = None + self._encoded_sample_names = {} + + self._text_streamer = FileDescriptorProvider() + self._vcf_reader = None + + def _init_with_header(self, header_lines): + self._header_lines = header_lines + ### write header lines to a tmp file then parse it + with tempfile.NamedTemporaryFile(delete=False) as tmp_file: + tmp_file.write("\n".join(header_lines).encode()) + tmp_file_name = tmp_file.name + self._original_info_list = libcbcf.VariantFile(tmp_file_name).header.info.keys() + self._text_streamer.write_line("\n".join(header_lines)) + + def _get_variant(self, data_line): + """ + Parse a single VCF line from a string + + Args: + data_line: Single VCF data line as string + + Returns: + Parsed variant record + """ + self._text_streamer.write_line(data_line) + try: + # only for the first data line + if self._vcf_reader is None: + self._text_streamer.rewind() + self._vcf_reader = libcbcf.VariantFile(self._text_streamer._temp_file.name, 'r') + record = next(iter(self._vcf_reader)) + variant = self._convert_to_variant(record) + return variant + except Exception as e: + print(f"Error parsing VCF line: {e}") + return None + + def send_kill_signal_to_child(self): + if self._vcf_reader is not None: + self._vcf_reader.close() + self._text_streamer.close() + diff --git a/gcp_variant_transforms/beam_io/vcfio.py b/gcp_variant_transforms/beam_io/vcfio.py index 20809e71..334f6fde 100644 --- a/gcp_variant_transforms/beam_io/vcfio.py +++ b/gcp_variant_transforms/beam_io/vcfio.py @@ -229,7 +229,7 @@ def read_records(self, range_tracker # type: range_trackers.OffsetRangeTracker ): # type: (...) -> Iterable[MalformedVcfRecord] - record_iterator = vcf_parser.PySamParser( + record_iterator = vcf_parser.PySamParserWithFileStreaming( file_name, range_tracker, self._compression_type, @@ -290,7 +290,7 @@ def _read_records(self, file_path_and_block_tuple): # type: (Tuple[str, Block]) -> Iterable(Variant) """Reads records from `file_path` in `block`.""" (file_path, block) = file_path_and_block_tuple - record_iterator = vcf_parser.PySamParser( + record_iterator = vcf_parser.PySamParserWithFileStreaming( file_path, block, filesystems.CompressionTypes.GZIP, diff --git a/gcp_variant_transforms/beam_io/vcfio_test.py b/gcp_variant_transforms/beam_io/vcfio_test.py index d18a0bff..f30eb3d7 100644 --- a/gcp_variant_transforms/beam_io/vcfio_test.py +++ b/gcp_variant_transforms/beam_io/vcfio_test.py @@ -263,7 +263,7 @@ def test_read_single_file_large(self): {'file': 'valid-4.0.vcf.gz', 'num_records': 5}, {'file': 'valid-4.0.vcf.bz2', 'num_records': 5}, {'file': 'valid-4.1-large.vcf', 'num_records': 9882}, - {'file': 'valid-4.2.vcf', 'num_records': 13}, + {'file': 'valid-4.2.vcf', 'num_records': 13} ] for config in test_data_conifgs: read_data = self._read_records( diff --git a/gcp_variant_transforms/libs/annotation/vep/vep_runner.py b/gcp_variant_transforms/libs/annotation/vep/vep_runner.py index 99c65aae..98cce8f8 100644 --- a/gcp_variant_transforms/libs/annotation/vep/vep_runner.py +++ b/gcp_variant_transforms/libs/annotation/vep/vep_runner.py @@ -14,8 +14,10 @@ import argparse # pylint: disable=unused-import +import uuid import logging import time +import json from typing import Any, Dict, List, Optional # pylint: disable=unused-import from apache_beam.io import filesystem # pylint: disable=unused-import @@ -35,29 +37,31 @@ # The name of the file placed at the root of output_dir that includes # information on how the pipelines were run, input files, 
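To summarize the PySamParserWithFileStreaming hunk above: header lines and then individual data lines are appended to an ordinary temporary file, and pysam's `libcbcf.VariantFile` parses records from that file. A self-contained sketch of the same pattern, using illustrative VCF content rather than the project's test data:

import tempfile

from pysam import libcbcf

header_lines = [
    "##fileformat=VCFv4.2",
    "##contig=<ID=20>",
    '##INFO=<ID=DP,Number=1,Type=Integer,Description="Depth">',
    "#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO",
]
data_line = "20\t14370\trs6054257\tG\tA\t29\tPASS\tDP=14"

with tempfile.NamedTemporaryFile("w+", suffix=".vcf", delete=False) as tmp:
    tmp.write("\n".join(header_lines) + "\n")
    tmp.write(data_line + "\n")
    tmp.flush()
    reader = libcbcf.VariantFile(tmp.name, "r")
    record = next(iter(reader))
    print(record.chrom, record.pos, list(record.info.keys()))
    reader.close()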
etc. -_GLOBAL_LOG_FILE = 'VEP_run_info.log' +_GLOBAL_LOG_FILE = "VEP_run_info.log" # TODO(bashir2): Check if instead of raw strings, we can use a protocol # buffer describing the parameters of the Pipelines API or some other way # to create typed objects. # # API constants: -_API_PIPELINE = 'pipeline' -_API_ACTIONS = 'actions' +_API_PIPELINE = "pipeline" +_API_ACTIONS = "actions" +_API_TASKGROUPS = "taskGroups" +_API_TASKSPEC = "taskSpec" +_API_RUNNABLES = "runnables" # The following image wraps gsutil with additional retry logic. -_GSUTIL_IMAGE = 'gcr.io/cloud-genomics-pipelines/io' +_GSUTIL_IMAGE = "gcr.io/google.com/cloudsdktool/cloud-sdk:slim" # The expected path of the run_vep.sh script in the docker container. -_VEP_RUN_SCRIPT = '/opt/variant_effect_predictor/run_vep.sh' +_VEP_RUN_SCRIPT = "/opt/variant_effect_predictor/run_vep.sh" # The expected path of `run_script_with_watchdog.sh` script in the # docker container. We use this script to run `_VEP_RUN_SCRIPT` in background # and kill it in case of failure or cancellation. -_WATCHDOG_RUNNER_SCRIPT = ( - '/opt/variant_effect_predictor/run_script_with_watchdog.sh') +_WATCHDOG_RUNNER_SCRIPT = "/opt/variant_effect_predictor/run_script_with_watchdog.sh" # The local name of the output file and directory for VEP runs. -_LOCAL_OUTPUT_DIR = '/mnt/vep/output_files' -_LOCAL_OUTPUT_FILE = _LOCAL_OUTPUT_DIR + '/output.vcf' +_LOCAL_OUTPUT_DIR = "/mnt/disks/vep/output_files" +_LOCAL_OUTPUT_FILE = _LOCAL_OUTPUT_DIR + "/output.vcf" # The time between operation polling rounds. _POLLING_INTERVAL_SECONDS = 30 @@ -66,415 +70,555 @@ _NUMBER_OF_API_CALL_RETRIES = 5 -def create_runner(known_args, pipeline_args, input_pattern, watchdog_file, - watchdog_file_update_interval_seconds): - # type: (argparse.Namespace, List[str], str, Optional[str], int) -> VepRunner - """Returns an instance of VepRunner using the provided args. - - Args: - known_args: The list of arguments defined in `variant_transform_options`. - pipeline_args: The list of remaining arguments meant to be used to - determine resources like number of workers, machine type, etc. - input_pattern: The VCF files to be annotated. - watchdog_file: The file that will be updated by the Dataflow worker every - `watchdog_file_update_interval_seconds`. Once the file is found to be - stale, the VEP process running in the VM will be killed. - watchdog_file_update_interval_seconds: The `watchdog_file` will be updated - by the Dataflow worker every `watchdog_file_update_interval_seconds`. 
- """ - credentials = client.GoogleCredentials.get_application_default() - pipeline_service = discovery.build( - 'lifesciences', 'v2beta', credentials=credentials) - runner = VepRunner( - pipeline_service, known_args.location, known_args.vep_species, - known_args.vep_assembly, input_pattern, known_args.annotation_output_dir, - known_args.vep_info_field, known_args.vep_image_uri, - known_args.vep_cache_path, known_args.vep_num_fork, pipeline_args, - watchdog_file, watchdog_file_update_interval_seconds) - return runner - - -class VepRunner(): - """A class for running vep through Pipelines API on a set of input files.""" - - _VEP_CACHE_BASE = ('gs://cloud-lifesciences/vep/' - 'vep_cache_{species}_{assembly}_104.tar.gz') - - def __init__( - self, - pipeline_service, # type: discovery.Resource - location, # type: str - species, # type: str - assembly, # type: str - input_pattern, # type: str - output_dir, # type: str - vep_info_field, # type: str - vep_image_uri, # type: str - vep_cache_path, # type: str - vep_num_fork, # type: int - pipeline_args, # type: List[str] - watchdog_file, # type: Optional[str] - watchdog_file_update_interval_seconds, # type: int - ): - # type: (...) -> None - """Constructs an instance for running VEP. - - Note that external users of this class can use create_runner_and_update_args - function of this module to create an instance of this class from flags. +def create_runner( + known_args, + pipeline_args, + input_pattern, + watchdog_file, + watchdog_file_update_interval_seconds, +): + # type: (argparse.Namespace, List[str], str, Optional[str], int) -> VepRunner + """Returns an instance of VepRunner using the provided args. Args: - location: The Life Sciences API location to use. - input_pattern: The pattern to identify all input files. - output_dir: The location for all output files. This is expected not to - exist and is created in the process of running VEP pipelines. - vep_image_uri: The URI of the image that contains VEP. - vep_cache_path: The URI for the cache file on GCS. - vep_num_fork: The value of the --fork argument for running VEP. - pipeline_args: The list of arguments that are meant to be used when - running Beam; for simplicity we use the same arguments to decide how - many and what type of workers to use, where to run, etc. + known_args: The list of arguments defined in `variant_transform_options`. + pipeline_args: The list of remaining arguments meant to be used to + determine resources like number of workers, machine type, etc. + input_pattern: The VCF files to be annotated. watchdog_file: The file that will be updated by the Dataflow worker every `watchdog_file_update_interval_seconds`. Once the file is found to be stale, the VEP process running in the VM will be killed. watchdog_file_update_interval_seconds: The `watchdog_file` will be updated by the Dataflow worker every `watchdog_file_update_interval_seconds`. 
""" - self._pipeline_service = pipeline_service - self._location = location - self._species = species - self._assembly = assembly - self._vep_image_uri = vep_image_uri - self._vep_cache_path = self._make_vep_cache_path(vep_cache_path) - self._vep_num_fork = vep_num_fork - self._input_pattern = input_pattern - self._output_dir = output_dir - self._vep_info_field = vep_info_field - self._process_pipeline_args(pipeline_args) - self._watchdog_file = watchdog_file - self._watchdog_file_update_interval_seconds = ( - watchdog_file_update_interval_seconds) - self._running_operation_ids = [] # type: List[str] - self._operation_name_to_io_infos = {} - self._operation_name_to_logs = {} - - def _make_vep_cache_path(self, vep_cache_path): - # type: (str) -> str - if not vep_cache_path: - vep_cache_path = VepRunner._VEP_CACHE_BASE.format(species=self._species, - assembly=self._assembly) - return vep_cache_path - - def get_output_pattern(self): - # type: () -> str - return vep_runner_util.get_output_pattern(self._output_dir) - - def _get_api_request_fixed_parts(self): - # type: () -> Dict - """Returns the part of API request that is fixed between actions. - - This includes setting up VEP cache, virtual machine setup, etc. The variant - parts are the `commands` for processing each file and should be added before - sending the API request. - """ - return { - _API_PIPELINE: { - _API_ACTIONS: [ - self._make_action(self._vep_image_uri, 'mkdir', '-p', - '/mnt/vep/vep_cache'), - self._make_action(_GSUTIL_IMAGE, 'gsutil', '-q', 'cp', - self._vep_cache_path, '/mnt/vep/vep_cache/') + credentials = client.GoogleCredentials.get_application_default() + pipeline_service = discovery.build("batch", "v1", credentials=credentials) + runner = VepRunner( + pipeline_service, + known_args.location, + known_args.vep_species, + known_args.vep_assembly, + input_pattern, + known_args.annotation_output_dir, + known_args.vep_info_field, + known_args.vep_image_uri, + known_args.vep_cache_path, + known_args.vep_num_fork, + pipeline_args, + watchdog_file, + watchdog_file_update_interval_seconds, + ) + return runner + + +class VepRunner: + """A class for running vep through Batch API on a set of input files.""" + + _VEP_CACHE_BASE = ( + "gs://variant-data/vep/" "vep_cache_{species}_{assembly}_104.tar.gz" + ) + + def __init__( + self, + pipeline_service, # type: discovery.Resource + location, # type: str + species, # type: str + assembly, # type: str + input_pattern, # type: str + output_dir, # type: str + vep_info_field, # type: str + vep_image_uri, # type: str + vep_cache_path, # type: str + vep_num_fork, # type: int + pipeline_args, # type: List[str] + watchdog_file, # type: Optional[str] + watchdog_file_update_interval_seconds, # type: int + ): + # type: (...) -> None + """Constructs an instance for running VEP. + + Note that external users of this class can use create_runner_and_update_args + function of this module to create an instance of this class from flags. + + Args: + location: The Batch API location to use. + input_pattern: The pattern to identify all input files. + output_dir: The location for all output files. This is expected not to + exist and is created in the process of running VEP jobs. + vep_image_uri: The URI of the image that contains VEP. + vep_cache_path: The URI for the cache file on GCS. + vep_num_fork: The value of the --fork argument for running VEP. 
+ pipeline_args: The list of arguments that are meant to be used when + running Beam; for simplicity we use the same arguments to decide how + many and what type of workers to use, where to run, etc. + watchdog_file: The file that will be updated by the Dataflow worker every + `watchdog_file_update_interval_seconds`. Once the file is found to be + stale, the VEP process running in the VM will be killed. + watchdog_file_update_interval_seconds: The `watchdog_file` will be updated + by the Dataflow worker every `watchdog_file_update_interval_seconds`. + """ + self._pipeline_service = pipeline_service + self._location = location + self._species = species + self._assembly = assembly + self._vep_image_uri = vep_image_uri + self._vep_cache_path = self._make_vep_cache_path(vep_cache_path) + self._vep_num_fork = vep_num_fork + self._input_pattern = input_pattern + self._output_dir = output_dir + self._vep_info_field = vep_info_field + self._process_pipeline_args(pipeline_args) + self._watchdog_file = watchdog_file + self._watchdog_file_update_interval_seconds = ( + watchdog_file_update_interval_seconds + ) + self._running_operation_ids = [] # type: List[str] + self._operation_name_to_io_infos = {} + self._operation_name_to_logs = {} + self._max_num_workers = 10 + + def _make_vep_cache_path(self, vep_cache_path): + # type: (str) -> str + if not vep_cache_path: + vep_cache_path = VepRunner._VEP_CACHE_BASE.format( + species=self._species, assembly=self._assembly + ) + return vep_cache_path + + def get_output_pattern(self): + # type: () -> str + return vep_runner_util.get_output_pattern(self._output_dir) + + def _get_batch_job_definition(self): + # type: () -> Dict + """Returns a Batch v1 API job definition with fixed setup steps.""" + job_name = self._generate_job_name() + return { + "name": job_name, + "taskGroups": [ + { + "taskSpec": { + "runnables": [ + self._make_runnable( + self._vep_image_uri, + "mkdir", + "-p", + "/mnt/disks/vep/vep_cache", + ), + self._make_runnable( + _GSUTIL_IMAGE, + "sh", + "-c", + f"gsutil cp {self._vep_cache_path} /mnt/disks/vep/vep_cache/{_get_base_name(self._vep_cache_path)} 2>&1", + ), + ], + "environment": { + "variables": { + "GENOME_ASSEMBLY": self._assembly, + "SPECIES": self._species, + "VEP_CACHE": "/mnt/disks/vep/vep_cache/{}".format( + _get_base_name(self._vep_cache_path) + ), + "NUM_FORKS": str(self._vep_num_fork), + "VCF_INFO_FILED": self._vep_info_field, + "OTHER_VEP_OPTS": "--everything --check_ref --allow_non_variant --format vcf", + } + }, + "volumes": [ + { + "deviceName": "vep", + "mountPath": "/mnt/disks/vep", + "mountOptions": "rw,async", + } + ], + "computeResource": { + "cpuMilli": 4000, + "memoryMib": 8192, + "bootDiskMib": _MINIMUM_DISK_SIZE_GB, + }, + "maxRunDuration": "3600s", + }, + "taskCount": 1, + "parallelism": 1, + } ], - 'environment': { - 'GENOME_ASSEMBLY': self._assembly, - 'SPECIES': self._species, - 'VEP_CACHE': '/mnt/vep/vep_cache/{}'.format( - _get_base_name(self._vep_cache_path)), - 'NUM_FORKS': str(self._vep_num_fork), - 'VCF_INFO_FILED': self._vep_info_field, - # TODO(bashir2): Decide how to do proper reference validation, - # the following --check_ref just drops variants that have - # wrong REF. If there are too many of them, it indicates that - # VEP database for a wrong reference sequence is being used - # and this has to caught and communicated to the user. 
- 'OTHER_VEP_OPTS': - '--everything --check_ref --allow_non_variant --format vcf', - }, - 'resources': { - 'virtualMachine': { - 'disks': [ - { - 'name': 'vep', - 'sizeGb': _MINIMUM_DISK_SIZE_GB - } - ], - 'machineType': self._machine_type, - # TODO(bashir2): Add the option of using preemptible - # machines and the retry functionality. - 'preemptible': False, - 'serviceAccount': { - 'email': self._service_account, - 'scopes': [ - 'https://www.googleapis.com/auth/' - 'devstorage.read_write'] - } + "allocationPolicy": { + "serviceAccount": { + "email": self._service_account, + "scopes": ["https://www.googleapis.com/auth/devstorage.read_write"], }, - 'regions': [self._region] - } + "instances": [ + { + "policy": { + "machineType": self._machine_type, + "disks": [ + { + "newDisk": { + "sizeGb": _MINIMUM_DISK_SIZE_GB, + "type": "pd-ssd", + }, + "deviceName": "vep", + } + ], + }, + } + ], + }, + "logsPolicy": { + "destination": "CLOUD_LOGGING" + # "destination": "PATH", + # "logsPath": "/mnt/disks/vep/logs", + }, + "labels": {"job": "vep-setup"}, } - } - - def _make_action(self, image_uri, *args, **kwargs): - # type: (str, *str, **List[str]) -> Dict - command_args = list(args) - action = { - 'commands': command_args, - 'imageUri': image_uri, - 'mounts': [{'disk': 'vep', 'path': '/mnt/vep'}], - 'alwaysRun': True - } - action.update(kwargs) - # TODO(bashir2): Add a proper `label` based on command arguments. - return action - - def _process_pipeline_args(self, pipeline_args): - # type: (List[str]) -> None - flags_dict = pipeline_options.PipelineOptions( - pipeline_args).get_all_options() - self._project = self._get_flag(flags_dict, 'project') - self._region = self._get_flag(flags_dict, 'region') - # TODO(bahsir2): Fix the error messages of _check_flag since - # --worker_machine_type has dest='machine_type'. - try: - self._machine_type = self._get_flag(flags_dict, 'machine_type') - except ValueError: - self._machine_type = self._get_machine_type_from_fork() - self._max_num_workers = self._get_flag( - flags_dict, 'max_num_workers', 'num_workers') - if self._max_num_workers <= 0: - raise ValueError( - '--max_num_workers and --num_workers should be positive numbers, ' - 'got: {}'.format(self._max_num_workers)) - try: - self._service_account = self._get_flag( - flags_dict, 'service_account_email') - except ValueError: - self._service_account = 'default' - - def _get_flag(self, pipeline_flags, *expected_flags): - # type: (Dict[str, Any], str) -> Any - for flag in expected_flags: - if flag in pipeline_flags and pipeline_flags[flag]: - logging.info('Using %s flag for annotation run: %s.', - flag, pipeline_flags[flag]) - return pipeline_flags[flag] - raise ValueError('Could not find any of {} among pipeline flags {}'.format( - expected_flags, pipeline_flags)) - - def _get_machine_type_from_fork(self): - # type: () -> str - if self._vep_num_fork == 1: - return 'n1-standard-1' - elif self._vep_num_fork == 2: - return 'n1-standard-2' - elif self._vep_num_fork <= 4: - return 'n1-standard-4' - elif self._vep_num_fork <= 8: - return 'n1-standard-8' - else: - # This is just a heuristic since after a certain point having more cores - # does not help VEP performance much more because of its file I/O. - return 'n1-standard-16' - - def wait_until_done(self): - # type: () -> None - """Polls currently running operations and waits until all are done. - - In case of failure, retry the operation for at most - `_NUMBER_OF_API_CALL_RETRIES` times. 
- """ - num_retries = 0 - while num_retries < _NUMBER_OF_API_CALL_RETRIES: - if not self._running_operation_ids: - return - self._running_operation_ids = self._wait_and_retry_operations() - num_retries += 1 - raise RuntimeError('Annotations for the input {} failed after {} ' - 'retries.'.format(self._input_pattern, - _NUMBER_OF_API_CALL_RETRIES)) + def _make_runnable(self, image_uri, *args, **kwargs): + # type: (str, str, *str, **Any) -> Dict + """Creates a Batch API Runnable dictionary.""" + runnable = { + "container": { + "imageUri": image_uri, + "volumes": ["/mnt/disks/vep:/mnt/disks/vep"], + "commands": list(args), + }, + } - def _wait_and_retry_operations(self): - # type: () -> Optional(List[str]) - """Waits currently running operations and retries when the operation failed. + runnable.update(kwargs) + return runnable + + def _process_pipeline_args(self, pipeline_args): + # type: (List[str]) -> None + flags_dict = pipeline_options.PipelineOptions(pipeline_args).get_all_options() + self._project = self._get_flag(flags_dict, "project") + # TODO(bahsir2): Fix the error messages of _check_flag since + # --worker_machine_type has dest='machine_type'. + try: + self._machine_type = self._get_flag(flags_dict, "machine_type") + except ValueError: + self._machine_type = self._get_machine_type_from_fork() + self._max_num_workers = self._get_flag( + flags_dict, "max_num_workers", "num_workers" + ) + if self._max_num_workers <= 0: + raise ValueError( + "--max_num_workers and --num_workers should be positive numbers, " + "got: {}".format(self._max_num_workers) + ) + try: + self._service_account = self._get_flag(flags_dict, "service_account_email") + except ValueError: + self._service_account = "default" + + def _get_flag(self, pipeline_flags, *expected_flags): + # type: (Dict[str, Any], str) -> Any + for flag in expected_flags: + if flag in pipeline_flags and pipeline_flags[flag]: + logging.info( + "Using %s flag for annotation run: %s.", flag, pipeline_flags[flag] + ) + return pipeline_flags[flag] + raise ValueError( + "Could not find any of {} among pipeline flags {}".format( + expected_flags, pipeline_flags + ) + ) + + def _generate_job_name(self): + return f"vep-job-{uuid.uuid4().hex[:8]}" + + def _get_machine_type_from_fork(self): + # type: () -> str + if self._vep_num_fork == 1: + return "n1-standard-1" + elif self._vep_num_fork == 2: + return "n1-standard-2" + elif self._vep_num_fork <= 4: + return "n1-standard-4" + elif self._vep_num_fork <= 8: + return "n1-standard-8" + else: + # This is just a heuristic since after a certain point having more cores + # does not help VEP performance much more because of its file I/O. + return "n1-standard-16" + + def wait_until_done(self): + # type: () -> None + """Polls currently running operations and waits until all are done. + + In case of failure, retry the operation for at most + `_NUMBER_OF_API_CALL_RETRIES` times. + """ + num_retries = 0 + while num_retries < _NUMBER_OF_API_CALL_RETRIES: + if not self._running_operation_ids: + return + self._running_operation_ids = self._wait_and_retry_operations() + num_retries += 1 + + raise RuntimeError( + "Annotations for the input {} failed after {} " + "retries.".format(self._input_pattern, _NUMBER_OF_API_CALL_RETRIES) + ) + + def _wait_and_retry_operations(self): + # type: () -> Optional(List[str]) + """Waits currently running operations and retries when the operation failed. + + Args: + running_operation_ids: currently running PAPI operation ids. + + Returns: + A list of retry operation ids. 
+ """ + retry_operation_ids = [] + for operation in self._running_operation_ids: + while not self._is_done(operation): + time.sleep(_POLLING_INTERVAL_SECONDS) + error_message = self._get_error_message(operation) + if error_message: + new_operation_name = self._retry_operation(operation, error_message) + retry_operation_ids.append(new_operation_name) + return retry_operation_ids + + def _retry_operation(self, operation, error_message): + # type: (str, str) -> str + """Returns retry operation id.""" + io_infos = self._operation_name_to_io_infos.get(operation) + logs = self._operation_name_to_logs.get(operation) + retry_logs = logs + "retry" + retry_operation_id = self._call_pipelines_api(io_infos) + self._operation_name_to_io_infos.update({retry_operation_id: io_infos}) + self._operation_name_to_logs.update({retry_operation_id: retry_logs}) + + logging.warning( + "Annotation job failed for the operation %s with error: " + "%s. Please check the log file (%s) for more information." + "Retrying with operation id %s.", + operation, + error_message, + logs, + retry_operation_id, + ) + return retry_operation_id + + def _is_done(self, operation): + # type: (str) -> bool + # TODO(bashir2): Silence the log messages of googleapiclient.discovery + # module for the next call of the API since they flood the log file. + # pylint: disable=no-member + request = ( + self._pipeline_service.projects().locations().jobs().get(name=operation) + ) + job = request.execute(num_retries=_NUMBER_OF_API_CALL_RETRIES) + state = job.get("status", {}).get("state") + # TODO(bashir2): Add better monitoring and log progress within each + # operation instead of just checking `done`. + logging.info("Job %s state: %s", operation, state) + return state in ("SUCCEEDED", "FAILED", "DELETION_IN_PROGRESS") + + def _get_error_message(self, operation): + request = ( + self._pipeline_service.projects().locations().jobs().get(name=operation) + ) + job = request.execute(num_retries=_NUMBER_OF_API_CALL_RETRIES) + if job.get("status", {}).get("state") == "FAILED": + events = job["status"].get("statusEvents", []) + if events: + return events[-1].get("description", "No detailed error message.") + return "Job failed without detailed error." + return "" + + def run_on_all_files(self): + # type: () -> None + """Runs VEP on all input files. + + The input files structure is recreated under `self._output_dir` and each + output file will have `_VEP_OUTPUT_SUFFIX`. 
+ """ + if self._running_operation_ids: + raise AssertionError( + "There are already {} operations running.".format( + len(self._running_operation_ids) + ) + ) + logging.info("Finding all files that match %s", self._input_pattern) + match_results = filesystems.FileSystems.match( + [self._input_pattern] + ) # type: List[filesystem.MatchResult] + if not match_results: + raise ValueError( + "No files matched input_pattern: {}".format(self._input_pattern) + ) + logging.info("Number of files: %d", len(match_results[0].metadata_list)) + vm_io_info = vep_runner_util.get_all_vm_io_info( + match_results[0].metadata_list, self._output_dir, self._max_num_workers + ) + for vm_index, io_info in enumerate(vm_io_info): + output_log_path = self._get_output_log_path(self._output_dir, vm_index) + operation_name = self._call_pipelines_api(io_info) + self._operation_name_to_io_infos.update({operation_name: io_info}) + self._operation_name_to_logs.update({operation_name: output_log_path}) + + logging.info( + "Started operation %s on VM %d processing %d input files", + operation_name, + vm_index, + len(io_info.io_map), + ) + self._running_operation_ids.append(operation_name) + + def _call_pipelines_api(self, io_infos): + # type: (vep_runner_util.SingleWorkerActions, str) -> str + api_request = self._get_batch_job_definition() + size_gb = io_infos.disk_size_bytes // (1 << 30) + api_request["allocationPolicy"]["instances"][0]["policy"]["disks"][0][ + "newDisk" + ]["sizeGb"] = (size_gb + _MINIMUM_DISK_SIZE_GB) + for input_file, output_file in io_infos.io_map.items(): + api_request[_API_TASKGROUPS][0][_API_TASKSPEC][_API_RUNNABLES].extend( + self._create_runnables(input_file, output_file) + ) + + # pylint: disable=no-member + parent = "projects/{}/locations/{}".format(self._project, self._location) + request = ( + self._pipeline_service.projects() + .locations() + .jobs() + .create(parent=parent, body=api_request) + ) + operation_name = request.execute(num_retries=_NUMBER_OF_API_CALL_RETRIES)[ + "name" + ] + return operation_name + + def _check_and_write_to_output_dir(self, output_dir): + # type: (str) -> None + real_dir = vep_runner_util.format_dir_path(output_dir) + # NOTE(bashir2): We cannot use exists() because for example on GCS, the + # directory names are only symbolic and are not physical files. + match_results = filesystems.FileSystems.match(["{}*".format(real_dir)]) + if match_results and match_results[0].metadata_list: + raise ValueError("Output directory {} already exists.".format(real_dir)) + log_file = filesystems.FileSystems.create( + filesystems.FileSystems.join(output_dir, _GLOBAL_LOG_FILE) + ) + # TODO(bashir2): Instead of just creating an empty file, log some + # information about how the VEP pipelines are executed. 
+ log_file.close() + + def _get_output_log_path(self, output_dir, vm_index): + # type: (str, int) -> str + log_filename = "output_VM_{}.log".format(vm_index) + return filesystems.FileSystems.join(output_dir, "logs", log_filename) + + def _create_runnables(self, input_file: str, output_file: str) -> list: + """Creates a list of Batch v1 `runnables` for processing one input/output pair.""" + base_input = _get_base_name(input_file) + local_input_file = "/mnt/disks/vep/{}".format(_get_base_name(input_file)) + print(local_input_file) + runnables = [] + + runnables.append( + self._make_runnable( + _GSUTIL_IMAGE, + "sh", + "-c", + f"gsutil cp {input_file} {local_input_file} 2>&1", + ) + ) + + runnables.append( + self._make_runnable( + self._vep_image_uri, "rm", "-r", "-f", _LOCAL_OUTPUT_DIR + ) + ) + # TODO(nhon): Add watchdog + runnables.append( + self._make_runnable( + self._vep_image_uri, + _VEP_RUN_SCRIPT, + local_input_file, + _LOCAL_OUTPUT_FILE, + ) + ) + + # if self._watchdog_file: + # runnables.append(self._make_runnable( + # self._vep_image_uri, + # _WATCHDOG_RUNNER_SCRIPT, + # _VEP_RUN_SCRIPT, + # str(self._watchdog_file_update_interval_seconds), + # self._watchdog_file, + # local_input_file, + # _LOCAL_OUTPUT_FILE + # )) + # else: + # runnables.append(self._make_runnable( + # self._vep_image_uri, + # _VEP_RUN_SCRIPT, + # local_input_file, + # _LOCAL_OUTPUT_FILE + # )) + + runnables.append( + self._make_runnable( + _GSUTIL_IMAGE, + "sh", + "-c", + f"gsutil cp {_LOCAL_OUTPUT_FILE} {output_file} 2>&1", + ) + ) + + return runnables + + def _create_actions(self, input_file, output_file): + # type: (str, str) -> List + local_input_file = "/mnt/vep/{}".format(_get_base_name(input_file)) + if self._watchdog_file: + action = self._make_action( + self._vep_image_uri, + _WATCHDOG_RUNNER_SCRIPT, + _VEP_RUN_SCRIPT, + str(self._watchdog_file_update_interval_seconds), + self._watchdog_file, + local_input_file, + _LOCAL_OUTPUT_FILE, + ) + else: + action = self._make_action( + self._vep_image_uri, + _VEP_RUN_SCRIPT, + local_input_file, + _LOCAL_OUTPUT_FILE, + ) + return [ + self._make_action( + _GSUTIL_IMAGE, "gsutil", "cp", input_file, local_input_file + ), + self._make_action(self._vep_image_uri, "rm", "-r", "-f", _LOCAL_OUTPUT_DIR), + action, + # TODO(bashir2): When the output files are local, the output directory + # structure should be created as well otherwise gsutil fails. + self._make_action( + _GSUTIL_IMAGE, "gsutil", "cp", _LOCAL_OUTPUT_FILE, output_file + ), + ] - Args: - running_operation_ids: currently running PAPI operation ids. - Returns: - A list of retry operation ids. 
- """ - retry_operation_ids = [] - for operation in self._running_operation_ids: - while not self._is_done(operation): - time.sleep(_POLLING_INTERVAL_SECONDS) - error_message = self._get_error_message(operation) - if error_message: - new_operation_name = self._retry_operation(operation, error_message) - retry_operation_ids.append(new_operation_name) - return retry_operation_ids - - def _retry_operation(self, operation, error_message): - # type: (str, str) -> str - """Returns retry operation id.""" - io_infos = self._operation_name_to_io_infos.get(operation) - logs = self._operation_name_to_logs.get(operation) - retry_logs = logs + 'retry' - retry_operation_id = self._call_pipelines_api(io_infos, retry_logs) - self._operation_name_to_io_infos.update({retry_operation_id: io_infos}) - self._operation_name_to_logs.update({retry_operation_id: retry_logs}) - - logging.warning('Annotation job failed for the operation %s with error: ' - '%s. Please check the log file (%s) for more information.' - 'Retrying with operation id %s.', operation, - error_message, logs, retry_operation_id) - return retry_operation_id - - def _is_done(self, operation): - # type: (str) -> bool - # TODO(bashir2): Silence the log messages of googleapiclient.discovery - # module for the next call of the API since they flood the log file. - # pylint: disable=no-member - request = self._pipeline_service.projects().locations().operations().get( - name=operation) - is_done = request.execute(num_retries=_NUMBER_OF_API_CALL_RETRIES).get( - 'done') - # TODO(bashir2): Add better monitoring and log progress within each - # operation instead of just checking `done`. - if is_done: - logging.info('Operation %s is done.', operation) - return is_done - - def _get_error_message(self, operation): - request = self._pipeline_service.projects().locations().operations().get( - name=operation) - try: - errors = request.execute(num_retries=_NUMBER_OF_API_CALL_RETRIES)['error'] - return errors['message'] - except KeyError: - logging.info('Operation %s is succeed.', operation) - return '' - - def run_on_all_files(self): - # type: () -> None - """Runs VEP on all input files. - - The input files structure is recreated under `self._output_dir` and each - output file will have `_VEP_OUTPUT_SUFFIX`. 
- """ - if self._running_operation_ids: - raise AssertionError('There are already {} operations running.'.format( - len(self._running_operation_ids))) - logging.info('Finding all files that match %s', self._input_pattern) - match_results = filesystems.FileSystems.match( - [self._input_pattern]) # type: List[filesystem.MatchResult] - if not match_results: - raise ValueError('No files matched input_pattern: {}'.format( - self._input_pattern)) - logging.info('Number of files: %d', len(match_results[0].metadata_list)) - vm_io_info = vep_runner_util.get_all_vm_io_info( - match_results[0].metadata_list, self._output_dir, self._max_num_workers) - for vm_index, io_info in enumerate(vm_io_info): - output_log_path = self._get_output_log_path(self._output_dir, vm_index) - operation_name = self._call_pipelines_api(io_info, output_log_path) - self._operation_name_to_io_infos.update({operation_name: io_info}) - self._operation_name_to_logs.update({operation_name: output_log_path}) - - logging.info('Started operation %s on VM %d processing %d input files', - operation_name, vm_index, len(io_info.io_map)) - self._running_operation_ids.append(operation_name) - - def _call_pipelines_api(self, io_infos, output_log_path): - # type: (vep_runner_util.SingleWorkerActions, str) -> str - api_request = self._get_api_request_fixed_parts() - size_gb = io_infos.disk_size_bytes // (1 << 30) - api_request[_API_PIPELINE]['resources'][ - 'virtualMachine']['disks'][0]['sizeGb'] = ( - size_gb + _MINIMUM_DISK_SIZE_GB) - for input_file, output_file in io_infos.io_map.items(): - api_request[_API_PIPELINE][_API_ACTIONS].extend( - self._create_actions(input_file, output_file)) - api_request[_API_PIPELINE][_API_ACTIONS].append( - self._make_action(_GSUTIL_IMAGE, 'gsutil', '-q', 'cp', - '/google/logs/output', - output_log_path)) - # pylint: disable=no-member - parent = 'projects/{}/locations/{}'.format(self._project, self._location) - request = self._pipeline_service.projects().locations().pipelines().run( - parent=parent, body=api_request) - operation_name = request.execute( - num_retries=_NUMBER_OF_API_CALL_RETRIES)['name'] - return operation_name - - def _check_and_write_to_output_dir(self, output_dir): - # type: (str) -> None - real_dir = vep_runner_util.format_dir_path(output_dir) - # NOTE(bashir2): We cannot use exists() because for example on GCS, the - # directory names are only symbolic and are not physical files. - match_results = filesystems.FileSystems.match(['{}*'.format(real_dir)]) - if match_results and match_results[0].metadata_list: - raise ValueError('Output directory {} already exists.'.format(real_dir)) - log_file = filesystems.FileSystems.create( - filesystems.FileSystems.join(output_dir, _GLOBAL_LOG_FILE)) - # TODO(bashir2): Instead of just creating an empty file, log some - # information about how the VEP pipelines are executed. 
- log_file.close() - - def _get_output_log_path(self, output_dir, vm_index): - # type: (str, int) -> str - log_filename = 'output_VM_{}.log'.format(vm_index) - return filesystems.FileSystems.join(output_dir, 'logs', log_filename) - - def _create_actions(self, input_file, output_file): - # type: (str, str) -> List - local_input_file = '/mnt/vep/{}'.format(_get_base_name(input_file)) - if self._watchdog_file: - action = self._make_action( - self._vep_image_uri, - _WATCHDOG_RUNNER_SCRIPT, - _VEP_RUN_SCRIPT, - str(self._watchdog_file_update_interval_seconds), - self._watchdog_file, - local_input_file, - _LOCAL_OUTPUT_FILE) - else: - action = self._make_action(self._vep_image_uri, - _VEP_RUN_SCRIPT, - local_input_file, - _LOCAL_OUTPUT_FILE) - return [ - self._make_action(_GSUTIL_IMAGE, 'gsutil', '-q', 'cp', input_file, - local_input_file), - self._make_action(self._vep_image_uri, 'rm', '-r', '-f', - _LOCAL_OUTPUT_DIR), - action, - # TODO(bashir2): When the output files are local, the output directory - # structure should be created as well otherwise gsutil fails. - self._make_action(_GSUTIL_IMAGE, 'gsutil', '-q', 'cp', - _LOCAL_OUTPUT_FILE, output_file)] +def _get_base_name(file_path): + # type: (str) -> str + """Used when we want to copy files to local machines. + Keeping the file names, gives more context to actions. For example if + `file_path` is 'gs://my_bucket/my_input.vcf', tis returns 'my_input.vcf'. -def _get_base_name(file_path): - # type: (str) -> str - """Used when we want to copy files to local machines. - - Keeping the file names, gives more context to actions. For example if - `file_path` is 'gs://my_bucket/my_input.vcf', tis returns 'my_input.vcf'. - - Returns: - The basename of the input `file_path`. - """ - _, base_path = filesystems.FileSystems.split(file_path) - if not base_path: - raise ValueError('Cannot extract base path from the input path {}'.format( - file_path)) - return base_path + Returns: + The basename of the input `file_path`. 
+ """ + _, base_path = filesystems.FileSystems.split(file_path) + if not base_path: + raise ValueError( + "Cannot extract base path from the input path {}".format(file_path) + ) + return base_path diff --git a/gcp_variant_transforms/libs/annotation/vep/vep_runner_test.py b/gcp_variant_transforms/libs/annotation/vep/vep_runner_test.py index 935ff266..627f393f 100644 --- a/gcp_variant_transforms/libs/annotation/vep/vep_runner_test.py +++ b/gcp_variant_transforms/libs/annotation/vep/vep_runner_test.py @@ -27,219 +27,264 @@ from gcp_variant_transforms.libs.annotation.vep import vep_runner -_INPUT_PATTERN = 'some/input/pattern*' +_INPUT_PATTERN = "some/input/pattern*" _INPUT_FILES_WITH_SIZE = [ - ('some/input/pattern/a', 100), - ('some/input/pattern/b', 100), - ('some/input/pattern/c', 100), - ('some/input/pattern/dir1/a', 100), - ('some/input/pattern/dir1/dir2/b', 100), - ('some/input/pattern/dir2/b', 100), - ('some/input/pattern/dir2/c', 100), + ("some/input/pattern/a", 100), + ("some/input/pattern/b", 100), + ("some/input/pattern/c", 100), + ("some/input/pattern/dir1/a", 100), + ("some/input/pattern/dir1/dir2/b", 100), + ("some/input/pattern/dir2/b", 100), + ("some/input/pattern/dir2/c", 100), ] -_SPECIES = 'homo_sapiens' -_ASSEMBLY = 'GRCh38' -_OUTPUT_DIR = 'gs://output/dir' -_VEP_INFO_FIELD = 'TEST_FIELD' -_IMAGE = 'gcr.io/image' -_CACHE = 'gs://path/to/cache' +_SPECIES = "homo_sapiens" +_ASSEMBLY = "GRCh38" +_OUTPUT_DIR = "gs://output/dir" +_VEP_INFO_FIELD = "TEST_FIELD" +_IMAGE = "gcr.io/image" +_CACHE = "gs://path/to/cache" _NUM_FORK = 8 -_PROJECT = 'test-project' -_LOCATION = 'test-location' -_REGION = 'test-region' - +_PROJECT = "test-project" +_LOCATION = "test-location" +_REGION = "test-region" +_NUM_RUNNABLES = 6 class _MockFileSystems(filesystems.FileSystems): - """This inherits from FileSystems such that most functions behave the same.""" - - @staticmethod - def match(patterns, limits=None): - if len(patterns) == 1 and patterns[0] == _INPUT_PATTERN: - return [mock.Mock( - metadata_list=[file_metadata_stub.FileMetadataStub(path, size) for - (path, size) in _INPUT_FILES_WITH_SIZE])] - return [] - - @staticmethod - def create(path, mime_type=None, compression_type=None): - """Overriding `create` to remove any interaction with real file systems.""" - return mock.Mock() + """This inherits from FileSystems such that most functions behave the same.""" + + @staticmethod + def match(patterns, limits=None): + if len(patterns) == 1 and patterns[0] == _INPUT_PATTERN: + return [ + mock.Mock( + metadata_list=[ + file_metadata_stub.FileMetadataStub(path, size) + for (path, size) in _INPUT_FILES_WITH_SIZE + ] + ) + ] + return [] + + @staticmethod + def create(path, mime_type=None, compression_type=None): + """Overriding `create` to remove any interaction with real file systems.""" + return mock.Mock() class VepRunnerTest(unittest.TestCase): - def setUp(self): - self._mock_service = mock.MagicMock() - self._mock_request = mock.MagicMock() - self._mock_projects = mock.MagicMock() - self._mock_locations = mock.MagicMock() - self._mock_service.projects = mock.MagicMock( - return_value=self._mock_projects) - self._mock_projects.locations = mock.MagicMock( - return_value=self._mock_locations) - self._pipelines_spy = PipelinesSpy(self._mock_request) - self._mock_locations.pipelines = mock.MagicMock( - return_value=self._pipelines_spy) - self._mock_request.execute = mock.MagicMock( - return_value={'name': 'operation'}) - - def _create_test_instance(self, pipeline_args=None): - # type: (List[str]) -> 
vep_runner.VepRunner - test_object = vep_runner.VepRunner( - self._mock_service, _LOCATION, _ASSEMBLY, _SPECIES, - _INPUT_PATTERN, _OUTPUT_DIR, - _VEP_INFO_FIELD, _IMAGE, _CACHE, _NUM_FORK, - pipeline_args or self._get_pipeline_args(), None, 30) - return test_object - - def _get_pipeline_args(self, num_workers=1): - return ['--project', _PROJECT, - '--region', _REGION, - '--max_num_workers', str(num_workers), - '--worker_machine_type', 'n1-standard-8', - ] - - def test_instantiation(self): - """This is just to test object construction.""" - self._create_test_instance() - - def test_instantiation_bad_pipeline_options(self): - """This is to test object construction fails with incomplete arguments.""" - with self.assertRaisesRegex(ValueError, '.*project.*'): - self._create_test_instance(pipeline_args=['no_arguments']) - - def test_make_vep_cache_path(self): - test_instance = self._create_test_instance() - self.assertEqual(test_instance._vep_cache_path, _CACHE) - test_instance = vep_runner.VepRunner( - self._mock_service, _LOCATION, _SPECIES, _ASSEMBLY, _INPUT_PATTERN, - _OUTPUT_DIR, _VEP_INFO_FIELD, _IMAGE, '', _NUM_FORK, - self._get_pipeline_args(), None, 30) - self.assertEqual(test_instance._vep_cache_path, - ('gs://cloud-lifesciences/vep/' - 'vep_cache_homo_sapiens_GRCh38_104.tar.gz')) - test_instance = vep_runner.VepRunner( - self._mock_service, _LOCATION, 'mus_musculus', 'GRCm39', _INPUT_PATTERN, - _OUTPUT_DIR, _VEP_INFO_FIELD, _IMAGE, '', _NUM_FORK, - self._get_pipeline_args(), None, 30) - self.assertEqual(test_instance._vep_cache_path, - ('gs://cloud-lifesciences/vep/' - 'vep_cache_mus_musculus_GRCm39_104.tar.gz')) - - def test_get_output_pattern(self): - output_pattern = self._create_test_instance().get_output_pattern() - self.assertEqual(output_pattern, _OUTPUT_DIR + '/**_vep_output.vcf') - - def test_get_output_log_path(self): - test_instance = self._create_test_instance() - log_path = test_instance._get_output_log_path(_OUTPUT_DIR, 0) - self.assertEqual(log_path, _OUTPUT_DIR + '/logs/output_VM_0.log') - log_path = test_instance._get_output_log_path(_OUTPUT_DIR + '/', 0) - self.assertEqual(log_path, _OUTPUT_DIR + '/logs/output_VM_0.log') - - def _validate_run_for_all_files(self): - self._pipelines_spy.validate_calls([f[0] for f in _INPUT_FILES_WITH_SIZE]) - - def test_run_on_all_files(self): - num_workers = len(_INPUT_FILES_WITH_SIZE) // 2 + 1 - test_instance = self._create_test_instance( - self._get_pipeline_args(num_workers)) - with patch('apache_beam.io.filesystems.FileSystems', _MockFileSystems): - test_instance.run_on_all_files() - self.assertEqual(self._pipelines_spy.num_run_calls(), num_workers) - self._validate_run_for_all_files() - - def test_run_on_all_files_with_more_workers(self): - num_workers = len(_INPUT_FILES_WITH_SIZE) + 5 - test_instance = self._create_test_instance( - self._get_pipeline_args(num_workers)) - with patch('apache_beam.io.filesystems.FileSystems', _MockFileSystems): - test_instance.run_on_all_files() - self.assertEqual(self._pipelines_spy.num_run_calls(), - len(_INPUT_FILES_WITH_SIZE)) - self._validate_run_for_all_files() - - def test_wait_until_done(self): - mock_projects = mock.MagicMock() - mock_locations = mock.MagicMock() - mock_opearations = mock.MagicMock() - mock_request = mock.MagicMock() - self._mock_service.projects = mock.MagicMock(return_value=mock_projects) - mock_projects.locations = mock.MagicMock(return_value=mock_locations) - mock_locations.operations = mock.MagicMock(return_value=mock_opearations) - mock_opearations.get = 
mock.MagicMock(return_value=mock_request) - mock_request.execute = mock.MagicMock( - return_value={'done': True, 'error': {}}) - test_instance = self._create_test_instance() - with patch('apache_beam.io.filesystems.FileSystems', _MockFileSystems): - test_instance.run_on_all_files() - with self.assertRaisesRegex(AssertionError, '.*already.*running.*'): - # Since there are running operations, the next call raises an exception. - test_instance.run_on_all_files() - test_instance.wait_until_done() - # Since all operations are done, the next call should raise no exceptions. - test_instance.run_on_all_files() - - def test_wait_until_done_fail(self): - mock_projects = mock.MagicMock() - mock_locations = mock.MagicMock() - mock_opearations = mock.MagicMock() - mock_request = mock.MagicMock() - self._mock_service.projects = mock.MagicMock(return_value=mock_projects) - mock_projects.locations = mock.MagicMock(return_value=mock_locations) - mock_locations.operations = mock.MagicMock(return_value=mock_opearations) - mock_opearations.get = mock.MagicMock(return_value=mock_request) - mock_request.execute = mock.MagicMock(return_value={ - 'done': True, 'error': {'message': 'failed'}}) - - test_instance = self._create_test_instance() - with patch('apache_beam.io.filesystems.FileSystems', _MockFileSystems): - test_instance.run_on_all_files() - with self.assertRaisesRegex(RuntimeError, '.*failed.*retries.*'): - test_instance.wait_until_done() - - -class PipelinesSpy(): - """A class to intercept calls to the run() function of Pipelines API.""" - - def __init__(self, mock_request): - # type: (mock.Mock) -> None - self._actions_list = [] - self._mock_request = mock_request - - def run(self, parent=None, body=None): - assert parent - assert body - self._actions_list.append(body['pipeline']['actions']) - return self._mock_request - - def num_run_calls(self): - return len(self._actions_list) - - def validate_calls(self, input_file_list): - # type: (List[str]) -> bool - input_file_set = set(input_file_list) - for one_call_actions in self._actions_list: - start_len = len(input_file_set) - for action in one_call_actions: - for command_part in action['commands']: - if command_part in input_file_set: - input_file_set.remove(command_part) - # Making sure that each actions for each call cover at least one file. 
- if start_len == len(input_file_set): - logging.error('None of input files appeared in %s or it was repeated', - str(one_call_actions)) - logging.error('List of remaining files: %s', str(input_file_set)) - return False - if input_file_set: - logging.error('Never ran on these files: %s', str(input_file_set)) - return False - return True + def setUp(self): + self._mock_service = mock.MagicMock() + self._mock_request = mock.MagicMock() + self._mock_projects = mock.MagicMock() + self._mock_locations = mock.MagicMock() + self._mock_jobs = mock.MagicMock() + self._mocked_batch_api = MockedBatchAPI(self._mock_request) + self._mock_service.projects = mock.MagicMock(return_value=self._mock_projects) + self._mock_projects.locations = mock.MagicMock( + return_value=self._mock_locations + ) + self._mock_locations.jobs = mock.MagicMock(return_value=self._mocked_batch_api) + + self._mock_request.execute = mock.MagicMock(return_value={"name": "operation"}) + + def _create_test_instance(self, pipeline_args=None): + # type: (List[str]) -> vep_runner.VepRunner + test_object = vep_runner.VepRunner( + self._mock_service, + _LOCATION, + _ASSEMBLY, + _SPECIES, + _INPUT_PATTERN, + _OUTPUT_DIR, + _VEP_INFO_FIELD, + _IMAGE, + _CACHE, + _NUM_FORK, + pipeline_args or self._get_pipeline_args(), + None, + 30, + ) + return test_object + + def _get_pipeline_args(self, num_workers=1): + return [ + "--project", + _PROJECT, + "--region", + _REGION, + "--max_num_workers", + str(num_workers), + "--worker_machine_type", + "n1-standard-8", + ] + + def test_instantiation(self): + """This is just to test object construction.""" + self._create_test_instance() + + def test_instantiation_bad_pipeline_options(self): + """This is to test object construction fails with incomplete arguments.""" + with self.assertRaisesRegex(ValueError, ".*project.*"): + self._create_test_instance(pipeline_args=["no_arguments"]) + + def test_make_vep_cache_path(self): + test_instance = self._create_test_instance() + self.assertEqual(test_instance._vep_cache_path, _CACHE) + test_instance = vep_runner.VepRunner( + self._mock_service, + _LOCATION, + _SPECIES, + _ASSEMBLY, + _INPUT_PATTERN, + _OUTPUT_DIR, + _VEP_INFO_FIELD, + _IMAGE, + "", + _NUM_FORK, + self._get_pipeline_args(), + None, + 30, + ) + self.assertEqual( + test_instance._vep_cache_path, + ("gs://variant-data/vep/" "vep_cache_homo_sapiens_GRCh38_104.tar.gz"), + ) + test_instance = vep_runner.VepRunner( + self._mock_service, + _LOCATION, + "mus_musculus", + "GRCm39", + _INPUT_PATTERN, + _OUTPUT_DIR, + _VEP_INFO_FIELD, + _IMAGE, + "", + _NUM_FORK, + self._get_pipeline_args(), + None, + 30, + ) + self.assertEqual( + test_instance._vep_cache_path, + ("gs://variant-data/vep/" "vep_cache_mus_musculus_GRCm39_104.tar.gz"), + ) + + def test_get_output_pattern(self): + output_pattern = self._create_test_instance().get_output_pattern() + self.assertEqual(output_pattern, _OUTPUT_DIR + "/**_vep_output.vcf") + + def test_get_output_log_path(self): + test_instance = self._create_test_instance() + log_path = test_instance._get_output_log_path(_OUTPUT_DIR, 0) + self.assertEqual(log_path, _OUTPUT_DIR + "/logs/output_VM_0.log") + log_path = test_instance._get_output_log_path(_OUTPUT_DIR + "/", 0) + self.assertEqual(log_path, _OUTPUT_DIR + "/logs/output_VM_0.log") + + def _validate_run_for_all_files(self): + self._mocked_batch_api.validate_calls([f[0] for f in _INPUT_FILES_WITH_SIZE]) + + def test_run_on_all_files(self): + num_runnables = len(_INPUT_FILES_WITH_SIZE) * _NUM_RUNNABLES + test_instance = 
self._create_test_instance(
+            self._get_pipeline_args()
+        )
+        with patch("apache_beam.io.filesystems.FileSystems", _MockFileSystems):
+            test_instance.run_on_all_files()
+        self.assertEqual(self._mocked_batch_api.num_run_calls(), num_runnables)
+        self._validate_run_for_all_files()
+
+    def test_run_on_all_files_with_more_workers(self):
+        test_instance = self._create_test_instance(self._get_pipeline_args())
+        with patch("apache_beam.io.filesystems.FileSystems", _MockFileSystems):
+            test_instance.run_on_all_files()
+        self.assertEqual(
+            self._mocked_batch_api.num_run_calls(), len(_INPUT_FILES_WITH_SIZE) * _NUM_RUNNABLES
+        )
+        self._validate_run_for_all_files()
+
+    def test_wait_until_done(self):
+        mock_projects = mock.MagicMock()
+        mock_locations = mock.MagicMock()
+        mock_jobs = mock.MagicMock()
+        mock_request = mock.MagicMock()
+        self._mock_service.projects = mock.MagicMock(return_value=mock_projects)
+        mock_projects.locations = mock.MagicMock(return_value=mock_locations)
+        mock_locations.jobs = mock.MagicMock(return_value=mock_jobs)
+        mock_jobs.get = mock.MagicMock(return_value=mock_request)
+        mock_request.execute = mock.MagicMock(return_value={"status": {"state": "SUCCEEDED"}})
+        test_instance = self._create_test_instance()
+        with patch("apache_beam.io.filesystems.FileSystems", _MockFileSystems):
+            test_instance.run_on_all_files()
+        with self.assertRaisesRegex(AssertionError, ".*already.*running.*"):
+            # Since there are running jobs, the next call raises an exception.
+            test_instance.run_on_all_files()
+        test_instance.wait_until_done()
+        # Since all jobs are done, the next call should raise no exceptions.
+        test_instance.run_on_all_files()
+
+    def test_wait_until_done_fail(self):
+        mock_projects = mock.MagicMock()
+        mock_locations = mock.MagicMock()
+        mock_jobs = mock.MagicMock()
+        mock_request = mock.MagicMock()
+        self._mock_service.projects = mock.MagicMock(return_value=mock_projects)
+        mock_projects.locations = mock.MagicMock(return_value=mock_locations)
+        mock_locations.jobs = mock.MagicMock(return_value=mock_jobs)
+        mock_jobs.get = mock.MagicMock(return_value=mock_request)
+        mock_request.execute = mock.MagicMock(
+            return_value={"status": {"state": "FAILED"}}
+        )
+
+        test_instance = self._create_test_instance()
+        with patch("apache_beam.io.filesystems.FileSystems", _MockFileSystems):
+            test_instance.run_on_all_files()
+        with self.assertRaisesRegex(RuntimeError, ".*failed.*retries.*"):
+            test_instance.wait_until_done()
+
+
+class MockedBatchAPI:
+    """A class to intercept calls to the create() method of the Batch API."""
+
+    def __init__(self, mock_request):
+        # type: (mock.Mock) -> None
+        self._runnables_list = []
+        self._mock_request = mock_request
+
+    def create(self, parent=None, body=None):
+        assert parent
+        assert body
+        self._runnables_list.extend(body["taskGroups"][0]["taskSpec"]["runnables"])
+        return self._mock_request
+
+    def num_run_calls(self):
+        return len(self._runnables_list)
+
+    def validate_calls(self, input_file_list):
+        # type: (List[str]) -> bool
+        input_file_set = set(input_file_list)
+        for runnable in self._runnables_list:
+            start_len = len(input_file_set)
+            for command_part in runnable["container"]["commands"]:
+                if command_part in input_file_set:
+                    input_file_set.remove(command_part)
+            # Make sure that each runnable covers at least one input file.
+            if start_len == len(input_file_set):
+                logging.error(
+                    "None of the input files appeared in %s, or a file was repeated",
+                    str(runnable),
+                )
+                logging.error("List of remaining files: %s", str(input_file_set))
+                return False
+        if input_file_set:
+            logging.error("Never ran on these files: %s", str(input_file_set))
+            return False
+        return True
 
 
 class GetBaseNameTest(unittest.TestCase):
-  def test_get_base_name(self):
-    self.assertEqual('t.vcf', vep_runner._get_base_name('a/b/t.vcf'))
-    self.assertEqual('t.vcf', vep_runner._get_base_name('/a/b/t.vcf'))
-    self.assertEqual('t.vcf', vep_runner._get_base_name('gs://a/b/t.vcf'))
-    self.assertEqual('t.vcf', vep_runner._get_base_name('a/b/t.vcf'))
+    def test_get_base_name(self):
+        self.assertEqual("t.vcf", vep_runner._get_base_name("a/b/t.vcf"))
+        self.assertEqual("t.vcf", vep_runner._get_base_name("/a/b/t.vcf"))
+        self.assertEqual("t.vcf", vep_runner._get_base_name("gs://a/b/t.vcf"))
+        self.assertEqual("t.vcf", vep_runner._get_base_name("a/b/t.vcf"))
diff --git a/gcp_variant_transforms/transforms/annotate_files.py b/gcp_variant_transforms/transforms/annotate_files.py
index cf489c19..653433a6 100644
--- a/gcp_variant_transforms/transforms/annotate_files.py
+++ b/gcp_variant_transforms/transforms/annotate_files.py
@@ -52,7 +52,7 @@ def process(self, input_pattern):
     t = threading.Thread(target=self._annotate_files,
                          args=(input_pattern, watchdog_file,))
     t.start()
-    while t.isAlive():
+    while t.is_alive():
       with filesystems.FileSystems.create(watchdog_file) as file_to_write:
         file_to_write.write(b'Watchdog file.')
       time.sleep(_WATCHDOG_FILE_UPDATE_INTERVAL_SECONDS)
diff --git a/requirements.txt b/requirements.txt
index 97a30c4f..ed1b3bf0 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,15 +1,14 @@
-pysam<0.16.0
-cython>=0.28.1
-setuptools
-#warning on apache-beam: https://cloud.google.com/dataflow/docs/guides/common-errors
-apache-beam[gcp]==2.37.0
+pysam==0.23.0
+Cython==3.1.1
+setuptools==80.8.0
+apache-beam[gcp]==2.65.0
 avro-python3==1.10.2
-google-cloud-core
-google-api-python-client>=1.6,<1.7.12
-intervaltree>=2.1.0,<2.2.0
-mmh3<2.6
-mock==4.0.3
-google-cloud-storage
-pyfarmhash
-pyyaml==5.4.1
-nose>=1.0
+google-cloud-core==2.4.3
+google-api-python-client==2.169.0
+intervaltree==3.1.0
+mmh3==5.1.0
+mock==5.2.0
+google-cloud-storage==2.19.0
+pyfarmhash==0.4.0
+PyYAML==6.0.2
+nose==1.3.7
diff --git a/setup.py b/setup.py
index 56d6c1ac..ee9063e0 100644
--- a/setup.py
+++ b/setup.py
@@ -52,10 +52,17 @@ class build(_build):  # pylint: disable=invalid-name
     test_suite='nose.collector',
     packages=setuptools.find_packages(),
-    package_data={
-        'gcp_variant_transforms': ['gcp_variant_transforms/testing/testdata/*']
-    },
-
+    install_requires=[
+        'pysam',
+        'avro-python3',
+        'intervaltree',
+        'mmh3',
+        'mock',
+        'pyfarmhash',
+        'pyyaml',
+        'nose',
+        'cloudpickle==2.2.1',
+    ],
     cmdclass={
         # Command class instantiated and run during pip install scenarios.
         'build': build,