Skip to content

Commit

Permalink
Merge pull request #364 from MannLabs/separate_configs
Browse files Browse the repository at this point in the history
Addressed main points for distributed search:

- overall code clarity and robustness
- unifying quant directory naming, removing the name 'custom-quant-dir'
- Documentation and user guide in docs
- Handling of partial config files to prevent stale settings in default configs for first or second search
- reading the spectral libraries from one location instead of copying them to each chunk folder

This implementation was tested on 90 .raw files processed in 30 chunks.
  • Loading branch information
vbrennsteiner authored Nov 19, 2024
2 parents db55495 + 365e9eb commit 691af65
Show file tree
Hide file tree
Showing 19 changed files with 755 additions and 14 deletions.
50 changes: 48 additions & 2 deletions alphadia/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@
action="append",
default=[],
)

parser.add_argument(
"--config",
"-c",
Expand All @@ -83,7 +82,6 @@
nargs="?",
default=None,
)

parser.add_argument(
"--wsl",
"-w",
Expand All @@ -97,6 +95,13 @@
nargs="?",
default="{}",
)
parser.add_argument(
"--quant-dir",
type=str,
help="Directory to save the quantification results (psm & frag parquet files) to be reused in a distributed search",
nargs="?",
default=None,
)


def parse_config(args: argparse.Namespace) -> dict:
Expand Down Expand Up @@ -167,6 +172,41 @@ def parse_output_directory(args: argparse.Namespace, config: dict) -> str:
return output_directory


def parse_quant_dir(args: argparse.Namespace, config: dict) -> str:
"""Parse custom quant path.
1. Use custom quant path from config file if specified.
2. Use custom quant path from command line if specified.
Parameters
----------
args : argparse.Namespace
Command line arguments.
config : dict
Config dictionary.
Returns
-------
quant_dir : str
path to quant directory.
"""

quant_dir = None
if "quant_dir" in config:
quant_dir = (
utils.windows_to_wsl(config["quant_dir"])
if args.wsl
else config["quant_dir"]
)

if args.quant_dir is not None:
quant_dir = utils.windows_to_wsl(args.quant_dir) if args.wsl else args.quant_dir

return quant_dir


def parse_raw_path_list(args: argparse.Namespace, config: dict) -> list:
"""Parse raw file list.
1. Use raw file list from config file if specified.
Expand Down Expand Up @@ -305,6 +345,8 @@ def run(*args, **kwargs):
print("No output directory specified.")
return

quant_dir = parse_quant_dir(args, config)

reporting.init_logging(output_directory)
raw_path_list = parse_raw_path_list(args, config)

Expand All @@ -321,7 +363,10 @@ def run(*args, **kwargs):
for f in fasta_path_list:
logger.progress(f" {f}")

# TODO rename all output_directory, output_folder => output_path, quant_dir->quant_path (except cli parameter)
logger.progress(f"Saving output to: {output_directory}")
if quant_dir is not None:
logger.progress(f"Saving quantification output to {quant_dir=}")

try:
import matplotlib
Expand All @@ -337,6 +382,7 @@ def run(*args, **kwargs):
library_path=library_path,
fasta_path_list=fasta_path_list,
config=config,
quant_path=quant_dir,
)

plan.run()
Expand Down
9 changes: 7 additions & 2 deletions alphadia/planning.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ def __init__(
fasta_path_list: list[str] | None = None,
config: dict | None = None,
config_base_path: str | None = None,
quant_path: str | None = None,
) -> None:
"""Highest level class to plan a DIA Search.
Owns the input file list, speclib and the config.
Expand All @@ -59,6 +60,9 @@ def __init__(
config_update : dict, optional
dict to update the default config. Can be used for debugging purposes etc.
quant_path : str, optional
path to directory to save the quantification results (psm & frag parquet files). If not provided, the results are saved in the usual workflow folder
"""
if config is None:
config = {}
Expand All @@ -80,6 +84,7 @@ def __init__(
self.raw_path_list = raw_path_list
self.library_path = library_path
self.fasta_path_list = fasta_path_list
self.quant_path = quant_path

logger.progress(f"version: {alphadia.__version__}")

Expand Down Expand Up @@ -318,11 +323,11 @@ def run(
workflow = peptidecentric.PeptideCentricWorkflow(
raw_name,
self.config,
quant_path=self.quant_path,
)

workflow_folder_list.append(workflow.path)

# check if the raw file is already processed
workflow_folder_list.append(workflow.path)
psm_location = os.path.join(workflow.path, "psm.parquet")
frag_location = os.path.join(workflow.path, "frag.parquet")

Expand Down
12 changes: 9 additions & 3 deletions alphadia/workflow/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

logger = logging.getLogger()

TEMP_FOLDER = ".progress"
QUANT_FOLDER_NAME = "quant"


class WorkflowBase:
Expand All @@ -34,20 +34,26 @@ def __init__(
self,
instance_name: str,
config: Config,
quant_path: str = None,
) -> None:
"""
Parameters
----------
instance_name: str
Name for the particular workflow instance. This will usually be the name of the raw file
Name for the particular workflow instance. this will usually be the name of the raw file
config: dict
Configuration for the workflow. This will be used to initialize the calibration manager and fdr manager
quant_path: str
path to directory holding quant folders, relevant for distributed searches
"""
self._instance_name: str = instance_name
self._parent_path: str = os.path.join(config["output"], TEMP_FOLDER)
self._parent_path: str = quant_path or os.path.join(
config["output"], QUANT_FOLDER_NAME
)
self._config: Config = config
self.reporter: reporting.Pipeline | None = None
self._dia_data: bruker.TimsTOFTranspose | alpharaw_wrapper.AlphaRaw | None = (
Expand Down
2 changes: 2 additions & 0 deletions alphadia/workflow/peptidecentric.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,12 @@ def __init__(
self,
instance_name: str,
config: Config,
quant_path: str = None,
) -> None:
super().__init__(
instance_name,
config,
quant_path,
)
self.optlock = None

Expand Down
1 change: 1 addition & 0 deletions assets/distributed_search_schematic.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
1 change: 1 addition & 0 deletions docs/methods.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ methods/configuration
methods/calibration
methods/transfer-learning
methods/output-format
methods/dist_search_setup
```
58 changes: 58 additions & 0 deletions docs/methods/dist_search_setup.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
## Distributed AlphaDIA search on HPCL

This guide deals with setting up a distributed search in AlphaDIA, with the following prerequisites:
- A Linux (Ubuntu) HPCL system with Slurm Workload Manager installed. All resource management is handled by Slurm, AlphaDIA does not select, manage or monitor worker nodes.
- The distributed search requires absolute paths for each raw file, saved in the second column of a two-column .csv document. Simpler structures that e.g. process all files in a given directory are disfavored as large cohorts frequently consist of rawfiles spread across a number of subfolders.
- An Anaconda environment called "alphadia" with _mono_ and _alphadia_ installed (for installing _mono_, see https://github.com/MannLabs/alpharaw#installation)

## Distributed search concept

![Distributed_Search](../../assets/distributed_search_schematic.svg)

Compared to a linear two step search, distributing the raw file search steps offers a significant advantage in speed. At the most extreme, N files could be searched on N machines in parallel, completely decoupling search time from the number of files. In practice, not all steps of a search can be easily parallelized (i.e. steps which require knowledge of all files at the same time). Additionally, processing nodes available are never unlimited, requiring a chunking approach to ensure the maximum amount of parallelization with a given number of processing nodes. The diagram above summarizes these steps, indicating parallel AlphaDIA instances for first and second pass fo raw file searches.

## Steps to set up a search

1. Set up an empty search directory on your HPCL partition. One directory corresponds to one study, i.e. one set of raw files, fasta/library and search configuration.
2. Copy all files from alphadia/misc/distributed_search into the search directory
3. If no .csv file with rawfile paths exists, it can be obtained by running **discover_project_files.py** from the search directory.
4. Set first and second search configurations in **first_config.yaml** and **second_config.yaml**. For example, number of precursor candidates and inference strategy, as well as mass tolerances may differ between first and second search.
Leave all the predefined settings in the two .yaml files as they are.
5. Set the search parameters in **outer.sh**. While these can also be provided as command line arguments, it is convenient to set them in **outer.sh** itself. This file requires the following settings:
- input_directory: the search directory
- input_filename: the .csv file containing rawfile paths
- target_directory: the directory where intermediate and final outputs are written (mind that slow read/write speeds to this location may slow down your search)
- library_path (optional, will be reannotated if fasta_path is provided and predict_library is set to 1): absolute path to a .hdf spectral library
- fasta_path (optional if library_path is provided and predict_library is set to 0): absolute path to .fasta file
- first_search_config_filename: name of .yaml file for the first search
- second_search_config_filename: name of the .yaml file for the building the MBR library, second search and LFQ
6. Run **outer.sh** with the following search settings:
- --nnodes (int): specifies how many nodes can be occupied. Rawfile search will be distributed across these nodes. If there are 5 nodes and 50 raw files, the search will take place on 5 nodes in chunks of 10 rawfiles each.
- --ntasks_per_node (int): default to 1, some HPCL systems allow for multiple tasks to run on one node
- --cpus (int): default to 12, specifies how many CPUs shall be used per task
- --mem (str): default to '250G', specifies RAM requirements for each task.
**HPCL systems may be set to restrict user resources to certain limits. Make sure the above parameters comply with your HPCL setup.**
- --predict_library (1/0): default to 1, whether to predict a spectral library from a given fasta
- --first_search (1/0): default to 1, whether to search all files with the initial spectral library
- --mbr_library (1/0): whether to aggregate first search results into a focused "MBR" library
- --second_search (1/0): whether to perform a second search with the focused MBR library
- --lfq (1/0): whether to perform LFQ quantification of the second search results

#### A typical call for running the search could look like this:

```console
sbatch outer.sh --nnodes 3 --search_config search.config
```
where the 'search.config' contains the name of the .csv file containing rawfile paths and other settings, and
```console
--nnodes 3
```
indicates that the search will be parallelized across three nodes.

#### Running the search creates five subdirectories in the target folder:

- _predicted_speclib_: If spectral library prediction was set, this folder contains the .hdf spectral library
- _first_search_: Contains one subdirectory for each processing chunk. AlphaDIA subprocesses for the first search are run from these chunks and their specific config.yaml files. Precursor and fragment datasets from these searches are saved into the _mbr_library_ folder
- _mbr_library_: Contains one chunk, since the library is built from all first search results.
- _second_search_: Analogous to _first_search_, one subdirectory is created for each chunk of rawfiles that are searched with the mbr_library. Precursor and fragment datasets from these searches are saved into the _lfq_ folder.
- _lfq_: Analogous to _mbr_library_, contains one chunk which runs label free quantification (LFQ) on each output from the second search. After all search steps are completed, the final precursor and protein tables are saved here.
98 changes: 98 additions & 0 deletions misc/distributed_search/discover_project_files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# Discover project files by searching a project regex. Attempted to make this
# as fast as possible by running rglob/glob search to get all filepaths with
# correct endings into a dict, and then searching that dict with the actual
# project-specific regex. Searching the dict is significantly faster than
# stepping through all directories and subdirectories and running re.search.

import sys
from pathlib import Path

import pandas as pd
import regex as re


def match_files(
project_regex: str,
source_directories: list,
search_recursively: bool = True,
file_ending: str = "raw",
):
# Convert search directories to paths and report
_search_dirs = [Path(d) for d in source_directories]

# Collect all files with correct endings into dict
print(
f"--> Collecting '.{file_ending}' files from {source_directories} \n--> search_recursively = {search_recursively}"
)
_file_dict = {}
for _dir in _search_dirs:
_dir_files = list(
_dir.rglob(f"*.{file_ending}")
if search_recursively
else _dir.glob(f"*.{file_ending}")
)
for _file in _dir_files:
# assign path to filename-key, for quick & unique searching
_file_dict[str(_file)] = _file
print(f"--> Collected {len(_file_dict)} '.{file_ending}' files")

# search project regex against file dict keys and return all matching paths
_matched_paths = []
regex_pattern = re.compile(project_regex)
print(f"--> Searching files matching '{project_regex}'")
for _file, _path in _file_dict.items():
if regex_pattern.search(_file):
_matched_paths.append(_path)

# report
print(
f"--> Discovered {len(_matched_paths)} matching filepaths for {project_regex}."
)

# suitable path dataframe
out_frame = pd.DataFrame(
columns=["project", "filepath"], index=range(len(_matched_paths))
)
out_frame["project"] = project_regex
out_frame["filepath"] = _matched_paths

return out_frame


if __name__ == "__main__":
import argparse
import os

parser = argparse.ArgumentParser(
prog="Discovering project filenames",
description="Search project files based on regex string and put them into a csv file for distributed processing",
)
parser.add_argument(
"--project_regex", help="Regex string to match project files", default=".*"
)
parser.add_argument(
"--source_directories", nargs="+", help="List of source directories"
)
parser.add_argument("--search_recursively", action="store_true")
parser.add_argument("--file_ending", default="raw")
parser.add_argument("--output_filename", default="file_list.csv")

if len(sys.argv) == 1:
parser.print_help(sys.stderr)
sys.exit(1)

args = parser.parse_args()

out_frame = match_files(
args.project_regex,
args.source_directories,
args.search_recursively,
args.file_ending,
)

output_path = (
args.output_filename
if os.path.isabs(args.output_filename)
else os.path.join("./", args.output_filename)
)
out_frame.to_csv(output_path, index=False)
Loading

0 comments on commit 691af65

Please sign in to comment.