WIP: Parallelizing get context analysis #267

Draft
wants to merge 2 commits into master

21 changes: 20 additions & 1 deletion pipeline/context_explorer/get_context_analysis.conseq
@@ -13,6 +13,7 @@ rule get_context_analysis:
compound_summary_repurposing={"type": "compound-summary", "dataset": "Rep_all_single_pt"},
compound_summary_oncref={"type": "compound-summary", "dataset": "Prism_oncology_AUC"},
tda_table={"type":"tda-table"},
workflow=fileref("parallelized_get_context_analysis.json")
outputs:
{"type": "context_analysis", "filename": { "$filename": "context_analysis.csv"} }
run "python3" with """
@@ -43,7 +44,25 @@ rule get_context_analysis:
artifact = by_type.get(type_name)
transformed[dest_name] = [ artifact ] if artifact is not None else []

raise Exception("todo: transformed needs to map filenames to the parameters below")

with open("inputs.json", "wt") as fd:
fd.write(json.dumps(transformed, indent=2))

"""
run """ python {{ inputs.script.filename }} inputs.json context_analysis.csv """
run """sparkles --config sparkles-config \
workflow run \
--add-hash-to-job-id \
--retry \
--nodes 10 \
-p batches=10 \
-u inputs.json \
-u ~/.taiga/token:.taiga-token \
-u {{ inputs.script.filename }}:get_context_analysis.py \
-u {{ }}:rep-cpmd-sum \
-u {{ }}:oncref-cpmd-sum \
-u {{ }}:tda-table \
context-analysis \
{{ inputs.workflow }}
"""
run """gsutil cp `cat results_path.txt` context_analysis.csv """
184 changes: 161 additions & 23 deletions pipeline/context_explorer/get_context_analysis.py
@@ -1,11 +1,19 @@

import pandas as pd
import scipy.stats as stats
import numpy as np
import statsmodels.api as sm
import warnings
from tqdm import tqdm
import argparse
import json

import pickle
import csv
import fsspec

from taigapy import create_taiga_client_v3

MIN_GROUP_SIZE = 5
@@ -137,6 +145,18 @@ def format_selectivity_vals(repurposing_table_path, oncref_table_path):
return selectivity_vals


from dataclasses import dataclass
from typing import Dict


@dataclass
class AllData:
subtype_tree: pd.DataFrame
context_matrix: pd.DataFrame
datasets_to_test: Dict[str, pd.DataFrame]
data_for_extra_cols: Dict[str, pd.DataFrame]


def load_all_data(
subtype_tree_taiga_id,
context_matrix_taiga_id,
@@ -149,9 +169,6 @@ def load_all_data(
oncref_table_path,
tda_table_path,
):

all_data_dict = dict()

# dictionary for the dataframes that we will actually perform t-tests on
datasets_to_test = dict()

@@ -164,8 +181,6 @@
subtype_tree_taiga_id=subtype_tree_taiga_id,
context_matrix_taiga_id=context_matrix_taiga_id,
)
all_data_dict["subtype_tree"] = subtype_tree
all_data_dict["context_matrix"] = context_matrix

gene_effect, gene_dependency = load_crispr_data(
tc=tc,
@@ -204,10 +219,7 @@ def load_all_data(
)
data_for_extra_cols["selectivity_vals"] = selectivity_vals

all_data_dict["datasets_to_test"] = datasets_to_test
all_data_dict["data_for_extra_cols"] = data_for_extra_cols

return all_data_dict
return AllData(subtype_tree, context_matrix, datasets_to_test, data_for_extra_cols)


### ----- CONTEXT ENRICHMENT FUNCTIONS ----- ###
@@ -360,7 +372,11 @@ def compute_context_results(


def compute_in_out_groups(
subtype_tree, context_matrix, datasets_to_test, data_for_extra_cols
subtype_tree,
context_matrix,
datasets_to_test,
data_for_extra_cols,
subtype_tree_batch,
):
name_to_code_onco = dict(
zip(
@@ -377,7 +393,7 @@
names_to_codes = {**name_to_code_onco, **name_to_code_gs}

all_results = []
for idx, ctx_row in subtype_tree.iterrows():
for _, ctx_row in tqdm(list(subtype_tree_batch.iterrows())):
ctx_code = names_to_codes[ctx_row.NodeName]
ctx_in = context_matrix[context_matrix[ctx_code] == True].index

@@ -438,16 +454,15 @@
data_for_extra_cols,
)
)

return pd.concat(all_results)
results_df = pd.concat(all_results, ignore_index=True)
return results_df


def get_id_or_file_name(possible_id, id_key="dataset_id"):
return None if len(possible_id) == 0 else possible_id[0][id_key]


### ----- MAIN ----- ###
def compute_context_explorer_results(inputs, out_filename):
def _prepare(inputs):
with open(inputs, "rt") as input_json:
taiga_ids_or_file_name = json.load(input_json)

@@ -484,7 +499,7 @@ def compute_context_explorer_results(inputs, out_filename):
)

### ---- LOAD DATA ---- ###
data_dict = load_all_data(
all_data = load_all_data(
subtype_tree_taiga_id,
context_matrix_taiga_id,
gene_effect_taiga_id,
@@ -497,18 +512,141 @@
tda_table_path,
)

context_explorer_results = compute_in_out_groups(**data_dict)
context_explorer_results.to_csv(out_filename, index=False)
return all_data


### ----- MAIN ----- ###
def compute_context_explorer_results(inputs, out_filename):
print("loading all data")
all_data = _prepare(inputs)
print("computing context analysis")
context_explorer_results = compute_in_out_groups(
all_data.subtype_tree,
all_data.context_matrix,
all_data.datasets_to_test,
all_data.data_for_extra_cols,
all_data.subtype_tree,  # no batching here: the full tree is processed as one batch
)
print("writing results to file")
context_explorer_results.to_csv(out_filename, index=False)
return


if __name__ == "__main__":
parser = argparse.ArgumentParser()
def add_compute_command(subparser):
parser = subparser.add_parser("compute", help="Compute context analysis")
parser.add_argument("inputs")
parser.add_argument("out_filename")
args = parser.parse_args()
parser.set_defaults(
func=lambda args: compute_context_explorer_results(
args.inputs, args.out_filename
)
)

compute_context_explorer_results(
args.inputs, args.out_filename,

########################
# code to allow breaking computation into batches and running each batch independently
def prepare_batches(inputs, intermediates_dir, batch_count):
all_data = _prepare(inputs)
with fsspec.open(f"{intermediates_dir}/shared_state.pickle", "wb") as fd:
pickle.dump(all_data, fd)
with fsspec.open(f"{intermediates_dir}/batches.csv", "wt") as fd:
w = csv.DictWriter(fd, fieldnames=["start_index", "end_index"])
w.writeheader()
rows = len(all_data.subtype_tree)
batch_size = rows // batch_count
for i in range(0, rows, batch_size):
w.writerow({"start_index": i, "end_index": min(i + batch_size, rows)})


def add_prepare_batches_command(subparser):
parser = subparser.add_parser("prepare-batches", help="Prepare batches")
parser.add_argument("inputs", type=str, help="Input json")
parser.add_argument(
"intermediates_dir",
type=str,
help="where intermediate files will be written to",
)
parser.add_argument("batches", type=int, help="number of batches to prepare")
parser.set_defaults(
func=lambda args: prepare_batches(
args.inputs, args.intermediates_dir, args.batches
)
)


def run_batch(intermediates_dir, start_index, end_index):
with fsspec.open(f"{intermediates_dir}/shared_state.pickle", "rb") as fd:
all_data = pickle.load(fd)
subtree_batch = all_data.subtype_tree[start_index:end_index]
context_explorer_results = compute_in_out_groups(
all_data.subtype_tree,
all_data.context_matrix,
all_data.datasets_to_test,
all_data.data_for_extra_cols,
subtree_batch,
)
context_explorer_results.to_csv(
f"{intermediates_dir}/batch-{start_index}-{end_index}-out.csv", index=False
)


def add_run_batch_command(subparser):
parser = subparser.add_parser("run-batch", help="Run a single batch")
parser.add_argument(
"intermediates_dir",
type=str,
help="where intermediate files will be written to",
)
parser.add_argument("start_index", type=int)
parser.add_argument("end_index", type=int)
parser.set_defaults(
func=lambda args: run_batch(
args.intermediates_dir, args.start_index, args.end_index
)
)


def gather_batches(intermediates_dir, out_filename):
all_outputs = []
with fsspec.open(f"{intermediates_dir}/batches.csv", "rt") as fd:
r = csv.DictReader(fd)
for row in r:
all_outputs.append(
pd.read_csv(
f"{intermediates_dir}/batch-{row['start_index']}-{row['end_index']}-out.csv"
)
)
all_outputs_df = pd.concat(all_outputs, ignore_index=True)
all_outputs_df.to_csv(out_filename, index=False)


def add_gather_batches_command(subparser):
parser = subparser.add_parser("gather-batches", help="Run a single batch")
parser.add_argument(
"intermediates_dir",
type=str,
help="where intermediate files will be written to",
)
parser.add_argument(
"out_filename", type=str, help="where to write the final merged file"
)
parser.set_defaults(
func=lambda args: gather_batches(args.intermediates_dir, args.out_filename)
)


######################


if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.set_defaults(func=lambda args: parser.print_help())

subparser = parser.add_subparsers(dest="command")
add_prepare_batches_command(subparser)
add_run_batch_command(subparser)
add_gather_batches_command(subparser)
add_compute_command(subparser)

args = parser.parse_args()
args.func(args)
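As a side note on the slicing arithmetic in prepare_batches: batches are contiguous [start_index, end_index) ranges over the subtype tree rows, and a row count that does not divide evenly spills into one extra short batch. A standalone sketch of just that index math:

# Reproduces prepare_batches' partitioning (index math only, no I/O).
def partition(rows: int, batch_count: int):
    batch_size = rows // batch_count  # same arithmetic as prepare_batches
    return [(i, min(i + batch_size, rows)) for i in range(0, rows, batch_size)]

print(partition(100, 10))  # [(0, 10), (10, 20), ..., (90, 100)] -> exactly 10 batches
print(partition(105, 10))  # 11 ranges; the remainder becomes a short final batch (100, 105)
# Caveat: rows < batch_count makes batch_size 0, which range() rejects with a ValueError.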
47 changes: 47 additions & 0 deletions pipeline/context_explorer/parallelized_get_context_analysis.json
@@ -0,0 +1,47 @@
{
"files_to_localize": ["get_context_analysis.py", ".taiga-token"],
"steps": [
{
"command": [
"python3",
"./get_context_analysis.py",
"prepare-batches",
"inputs.json",
"{step.1.job_path}/1/temp",
"{batches}"
],
"files_to_localize": [
"inputs.json",
"rep-cpmd-sum",
"oncref-cpmd-sum",
"tda-table"
]
},
{
"command": [
"python3",
"./get_context_analysis.py",
"run-batch",
"{step.1.job_path}/1/temp",
"{parameter.start_index}",
"{parameter.end_index}"
],
"parameters_csv": "{step.1.job_path}/1/temp/batches.csv"
},
{
"command": [
"python3",
"./get_context_analysis.py",
"gather-batches",
"{step.1.job_path}/1/temp",
"{step.1.job_path}/1/results.csv"
]
}
],
"write_on_completion": [
{
"expression": "{step.1.job_path}/1/results.csv",
"filename": "results_path.txt"
}
]
}
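For reference, the workflow JSON above composes the three new subcommands as prepare / fan-out / gather, with sparkles launching one run-batch per row of batches.csv. A minimal local sketch of the same sequence (hypothetical paths, batches run serially instead of across 10 nodes):

# Local dry run of the three steps described by parallelized_get_context_analysis.json.
import csv
import subprocess

intermediates = "/tmp/context-analysis-intermediates"  # hypothetical scratch dir

# Step 1: write shared_state.pickle and batches.csv
subprocess.run(["python3", "get_context_analysis.py", "prepare-batches",
                "inputs.json", intermediates, "10"], check=True)

# Step 2: one run-batch per row of batches.csv (sparkles fans these out in parallel)
with open(f"{intermediates}/batches.csv") as fd:
    for row in csv.DictReader(fd):
        subprocess.run(["python3", "get_context_analysis.py", "run-batch",
                        intermediates, row["start_index"], row["end_index"]], check=True)

# Step 3: concatenate the per-batch outputs into the final CSV
subprocess.run(["python3", "get_context_analysis.py", "gather-batches",
                intermediates, "context_analysis.csv"], check=True)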