Skip to content

Commit

Permalink
update dev scripts
Browse files Browse the repository at this point in the history
  • Loading branch information
stemangiola committed Nov 1, 2024
1 parent 8902d86 commit 10531e7
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 131 deletions.
34 changes: 14 additions & 20 deletions dev/HCA_cell_type_consensus.R
Original file line number Diff line number Diff line change
Expand Up @@ -9,47 +9,41 @@ parquet_file = "/vast/projects/cellxgene_curated/census_samples/concensus_input.

data_tbl <- tbl(con, sql(paste0("SELECT * FROM read_parquet('", parquet_file, "')")))

annotation_combination =
data_tbl |>
#select(azimuth_predicted.celltype.l2, monaco_first.labels.fine, blueprint_first.labels.fine) |>
select(cell_, dataset_id, cell_type, cell_type_ontology_term_id, azimuth_predicted.celltype.l2, monaco_first.labels.fine, blueprint_first.labels.fine)
#arrange(desc(n)) |>




# annotation_combination =
# data_tbl |>
# #select(azimuth_predicted.celltype.l2, monaco_first.labels.fine, blueprint_first.labels.fine) |>
# select(cell_, dataset_id, cell_type, cell_type_ontology_term_id, azimuth_predicted.celltype.l2, monaco_first.labels.fine, blueprint_first.labels.fine)
# #arrange(desc(n)) |>

annotation_consensus =
annotation_combination |>
data_tbl |>
distinct(azimuth_predicted.celltype.l2, monaco_first.labels.fine, blueprint_first.labels.fine) |>
as_tibble() |>
mutate(reannotation_consensus = reference_annotation_to_consensus(azimuth_input = azimuth_predicted.celltype.l2, monaco_input = monaco_first.labels.fine, blueprint_input = blueprint_first.labels.fine ))
mutate(data_driven_consensus = reference_annotation_to_consensus(azimuth_input = azimuth_predicted.celltype.l2, monaco_input = monaco_first.labels.fine, blueprint_input = blueprint_first.labels.fine ))


annotation_combination =
annotation_combination |>
data_tbl =
data_tbl |>
left_join(annotation_consensus, copy = TRUE)

output_parquet <- "/vast/projects/mangiola_immune_map/PostDoc/CuratedAtlasQueryR/dev/consensus_output.parquet"

con_write <- dbConnect(duckdb::duckdb(), dbdir = ":memory:")

# Use DuckDB's COPY TO command to write the data back to Parquet
# We need to execute a SQL command using dbExecute()
copy_query <- paste0("
COPY (
SELECT *
FROM (
", dbplyr::sql_render(annotation_combination), "
", dbplyr::sql_render(data_tbl), "
)
) TO '", output_parquet, "' (FORMAT PARQUET);
")

# Execute the COPY command
dbExecute(con, copy_query)
dbExecute(con_write, copy_query)

# Disconnect from the database
dbDisconnect(con, shutdown = TRUE)

# Read back
con <- dbConnect(duckdb::duckdb(), dbdir = ":memory:")
data_consensus <- tbl(con, sql(paste0("SELECT * FROM read_parquet('", output_parquet, "')")))
dbDisconnect(con_write, shutdown = TRUE)

213 changes: 102 additions & 111 deletions dev/execute_hpcell_on_census_and_defining_data_tranformation.R
Original file line number Diff line number Diff line change
@@ -1,21 +1,14 @@
library(glue)
library(dplyr)
library(arrow)
library(SingleCellExperiment)
library(tibble)
library(SummarizedExperiment)
library(glue)
library(purrr)
library(zellkonverter)
library(tidyr)
library(ggplot2)
library(plotly)
library(targets)
library(stringr)
library(CuratedAtlasQueryR)
library(fs)
library(HPCell)
library(crew.cluster)
library(arrow)
library(CuratedAtlasQueryR)
directory = "/vast/scratch/users/shen.m/Census_rerun/split_h5ad_based_on_sample_id/"
sample_anndata <- dir(glue("{directory}"), full.names = T)
downloaded_samples_tbl <- read_parquet("/vast/scratch/users/shen.m/Census_rerun/census_samples_to_download_groups.parquet")
Expand All @@ -33,8 +26,8 @@ result_directory = "/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_20

sample_meta <- tar_read(metadata_dataset_id_common_sample_columns, store = glue("{result_directory}/_targets"))
sample_tbl = downloaded_samples_tbl |> left_join(get_metadata() |> select(dataset_id, contains("norm")) |>
distinct() |> filter(!is.na(x_normalization)) |>
as_tibble(), by = "dataset_id")
distinct() |> filter(!is.na(x_normalization)) |>
as_tibble(), by = "dataset_id")


sample_tbl <- sample_tbl |> left_join(sample_meta, by = "dataset_id") |> distinct(file_name, tier, cell_number, dataset_id, sample_2,
Expand Down Expand Up @@ -64,52 +57,8 @@ sample_tbl <- sample_tbl |> left_join(sample_meta, by = "dataset_id") |> distinc
x_normalization == "normalized" ~ "round negative value to 0"
))

sample_tbl <- sample_tbl |> mutate(transformation_function = map(
method_to_apply,
~ ( function(data) {
assay_name <- data@assays |> names() |> magrittr::extract2(1)
counts <- assay(data, assay_name)
density_est <- counts |> as.matrix() |> density()
mode_value <- density_est$x[which.max(density_est$y)]
if (mode_value < 0 ) counts <- counts + abs(mode_value)

# Scale max counts to 20 to avoid any downstream failure
if (.x != "identity" && (max(counts) > 20)) {
scale_factor = 20 / max(counts)
counts <- counts * scale_factor}

counts <- transform_method(counts)
# round counts to avoid potential substraction error due to different digits print out
counts <- counts |> round(5)
majority_gene_counts = names(which.max(table(as.vector(counts)))) |> as.numeric()
if (majority_gene_counts != 0) {
counts <- counts - majority_gene_counts
}

# Avoid downstream failures negative counts
if((counts[,1:min(10000, ncol(counts))] |> min()) < 0)
counts[counts < 0] <- 0

# Assign counts back to data
assay(data, assay_name) <- counts

col_sums <- colSums(counts)
# Drop all zero cells
data <- data[, col_sums > 0]

# Avoid downstream binding error
rowData(data) = NULL

data

}) |>
# Meta programming, replacing the transformation programmatically
substitute( env = list(transform_method = as.name(.x))) |>
# Evaluate back to a working function
eval()
))

sample_tbl <- readRDS("/vast/scratch/users/shen.m/Census_rerun/sample_tbl_input_for_hpcell.rds")

# Set the parent directory where the subdirectories will be created
# parent_dir <- "~/scratch/Census_rerun/"
Expand All @@ -128,68 +77,110 @@ sample_tbl <- readRDS("/vast/scratch/users/shen.m/Census_rerun/sample_tbl_input_
# }

# Run 1000 samples per run. Save log and result in the corresponding store
store = "/vast/projects/mangiola_immune_map/PostDoc/CuratedAtlasQueryR/dev/debug_hpcell/target_store"
setwd(glue("{store}"))
sliced_sample_tbl = sample_tbl |> slice(2001:3000) |> select(file_name, tier, cell_number, dataset_id,
sample_2, transformation_function)

# setwd(glue("{store}"))
sliced_sample_tbl =
sample_tbl |>
select(file_name, tier, cell_number, dataset_id, sample_2, method_to_apply)

# sliced_sample_tbl =
# sliced_sample_tbl |>
# slice(1:20)

# Enable sample_names.rds to store sample names for the input
sample_names <- sliced_sample_tbl |> pull(file_name) |> set_names(sliced_sample_tbl |> pull(sample_2))
sample_names = sample_names |> str_replace("/home/users/allstaff/shen.m/scratch", "/vast/scratch/users/shen.m")

sample_names |>
initialise_hpc(
gene_nomenclature = "ensembl",
data_container_type = "anndata",
store = store,
tier = sliced_sample_tbl |> pull(tier),
computing_resources = list(
crew_controller_slurm(
name = "tier_1",
script_lines = "#SBATCH --mem 35G",
slurm_cpus_per_task = 1,
workers = 200,
tasks_max = 1,
verbose = T
sample_names <-
sliced_sample_tbl |>
pull(file_name) |>
str_replace("/home/users/allstaff/shen.m/scratch", "/vast/scratch/users/shen.m") |>
set_names(sliced_sample_tbl |> pull(sample_2))
tiers = sliced_sample_tbl |> pull(tier)
functions = sliced_sample_tbl |> pull(method_to_apply)
# rm(sliced_sample_tbl)
# gc()

job::job({

library(HPCell)

sample_names |>
initialise_hpc(
gene_nomenclature = "ensembl",
data_container_type = "anndata",
store = "/vast/projects/mangiola_immune_map/PostDoc/immuneHealthyBodyMap/census_hpcell_oct_2024/target_store",
# store = "/vast/projects/cellxgene_curated/census_hpcell_oct_2024/target_store_1_20",
tier = tiers,
computing_resources = list(
crew_controller_slurm(
name = "tier_1",
script_lines = "#SBATCH --mem 8G",
slurm_cpus_per_task = 1,
workers = 300,
tasks_max = 10,
verbose = T,
launch_max = 5
),

crew_controller_slurm(
name = "tier_2",
script_lines = "#SBATCH --mem 10G",
slurm_cpus_per_task = 1,
workers = 200,
tasks_max = 10,
verbose = T,
launch_max = 5
),
crew_controller_slurm(
name = "tier_3",
script_lines = "#SBATCH --mem 15G",
slurm_cpus_per_task = 1,
workers = 100,
tasks_max = 10,
verbose = T,
launch_max = 5
),
crew_controller_slurm(
name = "tier_4",
script_lines = "#SBATCH --mem 70G",
slurm_cpus_per_task = 1,
workers = 100,
tasks_max = 10,
verbose = T,
launch_max = 5
)
),
verbosity = "summary",
# debug_step = "annotation_tbl_tier_4",
update = "never",
error = "continue",
garbage_collection = 100,
workspace_on_error = TRUE

crew_controller_slurm(
name = "tier_2",
script_lines = "#SBATCH --mem 60G",
slurm_cpus_per_task = 1,
workers = 50,
tasks_max = 1,
verbose = T
),
crew_controller_slurm(
name = "tier_3",
script_lines = "#SBATCH --mem 90G",
slurm_cpus_per_task = 1,
workers = 25,
tasks_max = 1,
verbose = T
),
crew_controller_slurm(
name = "tier_4",
script_lines = "#SBATCH --mem 100G",
slurm_cpus_per_task = 1,
workers = 14,
tasks_max = 1,
verbose = T
)
)
) |>
tranform_assay(fx = functions, target_output = "sce_transformed") |>

) |>
tranform_assay(fx = sliced_sample_tbl |>
pull(transformation_function),
target_output = "sce_transformed")

|>
# # Remove empty outliers based on RNA count threshold per cell
remove_empty_threshold(target_input = "sce_transformed", RNA_feature_threshold = 200) |>

# Annotation
annotate_cell_type(target_input = "sce_transformed", azimuth_reference = "pbmcref") |>

print()

# Remove empty outliers based on RNA count threshold per cell
remove_empty_DropletUtils(target_input = "sce_transformed", RNA_feature_threshold = 200) |>

# Annotation
annotate_cell_type(target_input = "sce_transformed", azimuth_reference = "pbmcref")
})



tar_meta(starts_with("annotation_tbl"), store = "/vast/projects/mangiola_immune_map/PostDoc/immuneHealthyBodyMap/census_hpcell_oct_2024/target_store") |>
filter(!error |> is.na()) |> arrange(desc(time)) |> select(error, name)

# I have to check the input of this NULL target
# annotation_tbl_tier_1_ecac957542df0c20

tar_workspace(annotation_tbl_tier_4_8fcdb6edc543d3ea, store = "/vast/projects/mangiola_immune_map/PostDoc/immuneHealthyBodyMap/census_hpcell_oct_2024/target_store")
annotation_label_transfer(sce_transformed_tier_4, empty_droplets_tbl = empty_tbl_tier_4, reference_azimuth = "pbmcref", feature_nomenclature = gene_nomenclature)

tar_read_raw("annotation_tbl_tier_1_ecac957542df0c20", store = "/vast/projects/mangiola_immune_map/PostDoc/immuneHealthyBodyMap/census_hpcell_oct_2024/target_store")


# tar_invalidate(starts_with("annotation_tbl_tier_"), store = "/vast/projects/mangiola_immune_map/PostDoc/immuneHealthyBodyMap/census_hpcell_oct_2024/target_store")

0 comments on commit 10531e7

Please sign in to comment.