Commit

update dev scripts
stemangiola committed Nov 19, 2024
1 parent 09ad7b3 commit 52c7cdf
Showing 3 changed files with 480 additions and 699 deletions.
1 change: 0 additions & 1 deletion dev/cellxgene_to_metadata.R
@@ -71,7 +71,6 @@ tar_script({
tasks_max = 5,
verbose = T,
seconds_idle = 30
- script_directory = paste0("/vast/scratch/users/mangiola.s/cellxgenedp/crew_cluster/", basename(tempdir()))
),
crew_controller_slurm(
name = "slurm_1_40",
97 changes: 52 additions & 45 deletions dev/make_pseudobulk.R
@@ -8,11 +8,11 @@ library(duckdb)
# Get input


- result_directory = "/vast/scratch/users/mangiola.s/pseudobulk_cellNexus_1_0_0_coarse"
+ result_directory = "/vast/scratch/users/mangiola.s/pseudobulk_cellNexus_1_0_2"

tar_script({

- result_directory = "/vast/scratch/users/mangiola.s/pseudobulk_cellNexus_1_0_0_coarse"
+ result_directory = "/vast/scratch/users/mangiola.s/pseudobulk_cellNexus_1_0_2"


#-----------------------#
@@ -123,30 +123,30 @@
#
)

- split_metadata = function(){
+ split_metadata = function(metadata_parquet){

# CAQ_directory = "/vast/projects/cellxgene_curated/cellNexus"

my_metadata =
tbl(
dbConnect(duckdb::duckdb(), dbdir = ":memory:"),
sql("SELECT * FROM read_parquet('/vast/projects/cellxgene_curated/cellNexus/cell_metadata_cell_type_consensus_v1_0_0.parquet')")
sql(glue("SELECT * FROM read_parquet('{metadata_parquet}')"))
) |>

#

# Keep only these columns, because the Curated Atlas query does not need the other metadata;
# it can be added back later from the full metadata
- select(sample_id, cell_, file_id_cellNexus, cell_type_consensus_harmonised) |>
- filter(cell_type_consensus_harmonised!="other", !cell_type_consensus_harmonised |> is.na()) |>
- filter(!file_id_cellNexus |> is.na()) |>
+ select(sample_id, cell__id, file_id_cellNexus_single_Cell, cell_type_unified_ensemble) |>
+ filter(cell_type_unified_ensemble!="other", !cell_type_unified_ensemble |> is.na()) |>
+ filter(!file_id_cellNexus_single_Cell |> is.na()) |>

# THIS SHOULD BE REMOVED IN THE FUTURE
- mutate(file_id_cellNexus = paste0(file_id_cellNexus, ".h5ad")) |>
+ mutate(file_id_cellNexus_single_Cell = paste0(file_id_cellNexus_single_Cell, ".h5ad")) |>

# NEST
as_tibble() |>
- mutate(chunk = file_id_cellNexus) |>
+ mutate(chunk = file_id_cellNexus_single_Cell) |>
nest(data = -c(chunk)) |>

mutate(number_of_cells = map_int(data, nrow))
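
For context, a minimal standalone sketch of the parameterised duckdb + glue pattern that split_metadata() now relies on (the parquet path below is a hypothetical placeholder; tbl() on a DBI connection requires dbplyr installed):

library(duckdb)
library(dplyr)
library(glue)

metadata_parquet = "cell_metadata.parquet"  # hypothetical placeholder path

con = dbConnect(duckdb::duckdb(), dbdir = ":memory:")
my_metadata = tbl(con, sql(glue("SELECT * FROM read_parquet('{metadata_parquet}')")))
my_metadata |> head(5) |> collect()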
@@ -162,19 +162,19 @@
data, chunk,
~ {

- file_id_cellNexus = .y
+ file_id_cellNexus_single_Cell = .y

# THIS IS TEMPORARY BECAUSE I HAVE BRAIN DATASETS WITH 1M CELLS THAT ARE HARD TO SAVE IN THE DB
- if(!file.exists(paste0(cache.path, "/original/", file_id_cellNexus))) {
-   warning(paste0(cache.path, "/original/", file_id_cellNexus, " does not exist in the cache") )
+ if(!file.exists(paste0(cache.path, "/original/", file_id_cellNexus_single_Cell))) {
+   warning(paste0(cache.path, "/original/", file_id_cellNexus_single_Cell, " does not exist in the cache") )
return(NULL)
}

.x |>
CuratedAtlasQueryR:::get_data_container(
repository = NULL,
cache_directory = cache.path,
grouping_column = "file_id_cellNexus"
grouping_column = "file_id_cellNexus_single_Cell"
)
}

@@ -198,12 +198,12 @@
pseudobulk =
aggregateAcrossCells(
.x,
colData(.x)[,c("sample_id", "cell_type_consensus_harmonised")],
colData(.x)[,c("sample_id", "cell_type_unified_ensemble")],
BPPARAM = MulticoreParam(workers = if_else(.y>50000, 20, 5))
)
- colnames(pseudobulk) = paste0(colData(pseudobulk)$sample_id, "___", colData(pseudobulk)$cell_type_consensus_harmonised)
+ colnames(pseudobulk) = paste0(colData(pseudobulk)$sample_id, "___", colData(pseudobulk)$cell_type_unified_ensemble)

- pseudobulk = pseudobulk |> select(.cell, sample_id, file_id_cellNexus, cell_type_consensus_harmonised)
+ pseudobulk = pseudobulk |> select(.cell, sample_id, file_id_cellNexus_single_Cell, cell_type_unified_ensemble)

# Decrease size
# We will reattach rownames later
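
As a side note, a self-contained toy sketch of this pseudobulk step (scuttle's aggregateAcrossCells() sums counts within each sample-by-cell-type group; all data below are made up):

library(scuttle)
library(SingleCellExperiment)

set.seed(42)
# Toy SCE: 10 genes x 6 cells, two samples, two cell types
sce = SingleCellExperiment(
  assays = list(counts = matrix(rpois(60, 5), nrow = 10)),
  colData = DataFrame(
    sample_id = rep(c("s1", "s2"), each = 3),
    cell_type_unified_ensemble = rep(c("t_cell", "b_cell"), 3)
  )
)

# Sum counts per sample x cell type, then build the "sample___cell_type" column names
pb = aggregateAcrossCells(sce, colData(sce)[, c("sample_id", "cell_type_unified_ensemble")])
colnames(pb) = paste0(pb$sample_id, "___", pb$cell_type_unified_ensemble)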
@@ -226,7 +226,7 @@

# Add columns and filter
mutate(data = map2(
- data, file_id_cellNexus,
+ data, file_id_cellNexus_single_Cell,
~ {

# ## TEMPORARY FIX BECAUSE I FORGOT TO ADD ASSAY NAME
@@ -236,8 +236,8 @@
# Add columns
se =
.x |>
- mutate(file_id_cellNexus = .y) |>
- select(-any_of(c("file_id_cellNexus", ".cell", "original_cell_id")))
+ mutate(file_id_cellNexus_single_Cell = .y) |>
+ select(-any_of(c("file_id_cellNexus_single_Cell", ".cell", "original_cell_id")))


# # Identify samples with many genes
@@ -260,7 +260,7 @@
.progress=TRUE
)) |>

- nest(data = -file_id_cellNexus) |>
+ nest(data = -file_id_cellNexus_single_Cell) |>
mutate(data = map(data,
~ {
se = .x |>
@@ -383,33 +383,40 @@
# Pipeline
#-----------------------#
list(
- tar_target(cache.path, "/vast/projects/cellxgene_curated/cellNexus/cellxgene/29_10_2024/", deployment = "main"),

- # Get rownames
+ tar_target(cache.path, "/vast/scratch/users/mangiola.s/cellNexus/cellxgene/15_11_2024/", deployment = "main"),
  tar_target(
-   sce_rownames,
-   tbl(
-     dbConnect(duckdb::duckdb(), dbdir = ":memory:"),
-     sql("SELECT * FROM read_parquet('/vast/projects/cellxgene_curated/cellNexus/cell_metadata_cell_type_consensus_v1_0_0.parquet')")
-   ) |>
-     head(1) |>
-     mutate(file_id_cellNexus = paste0(file_id_cellNexus, ".h5ad")) |>
-     CuratedAtlasQueryR:::get_data_container(
-       repository = NULL,
-       cache_directory = cache.path,
-       grouping_column = "file_id_cellNexus"
-     ) |>
-     rownames(),
-   resources = tar_resources(crew = tar_resources_crew("slurm_1_20")),
-   packages = c("arrow", "dplyr", "duckdb")
+   metadata_parquet,
+   "/vast/projects/cellxgene_curated/cellNexus/cell_metadata_cell_type_consensus_v1_0_2.parquet",
+   format = "file",
+   deployment = "main"
  ),
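
A note on the new metadata_parquet target: with format = "file", targets stores the returned path as the target's value and fingerprints the file's contents, so downstream targets that take metadata_parquet as an argument (here metadata_split) rerun whenever the parquet is regenerated. A minimal sketch of the pattern, with hypothetical target names:

library(targets)
list(
  # Value is the path; targets hashes the file contents
  tar_target(raw_parquet, "data/raw.parquet", format = "file"),
  # Invalidated whenever raw_parquet's hash changes
  tar_target(processed, split_metadata(raw_parquet))
)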


+ # # Get rownames
+ # tar_target(
+ #   sce_rownames,
+ #   tbl(
+ #     dbConnect(duckdb::duckdb(), dbdir = ":memory:"),
+ #     sql("SELECT * FROM read_parquet('/vast/projects/cellxgene_curated/cellNexus/cell_metadata_cell_type_consensus_v1_0_2.parquet')")
+ #   ) |>
+ #     head(1) |>
+ #     mutate(file_id_cellNexus_single_Cell = paste0(file_id_cellNexus_single_Cell, ".h5ad")) |>
+ #     CuratedAtlasQueryR:::get_data_container(
+ #       repository = NULL,
+ #       cache_directory = cache.path,
+ #       grouping_column = "file_id_cellNexus_single_Cell"
+ #     ) |>
+ #     rownames(),
+ #   resources = tar_resources(crew = tar_resources_crew("slurm_1_20")),
+ #   packages = c("arrow", "dplyr", "duckdb")
+ # ),

# Do metadata
tar_target(
metadata_split,
- split_metadata(),
+ split_metadata(metadata_parquet),
resources = tar_resources(crew = tar_resources_crew("slurm_1_20")),
packages = c("arrow", "dplyr", "duckdb", "tidyr", "dplyr", "purrr")
packages = c("arrow", "dplyr", "duckdb", "tidyr", "dplyr", "purrr", "glue")
),
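
The resources argument is what pins a target to one of the crew controllers declared earlier in the script: the string passed to tar_resources_crew() must match the name given to crew_controller_slurm(). A minimal local sketch of the same wiring, with hypothetical names:

library(targets)
library(crew)

tar_option_set(controller = crew_controller_local(name = "local_1"))
list(
  tar_target(
    x,
    Sys.getpid(),  # trivial stand-in for real work
    resources = tar_resources(crew = tar_resources_crew(controller = "local_1"))
  )
)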

# Get SCE SMALL
@@ -599,7 +606,7 @@ job::job({

tar_make(
# callr_function = NULL,
reporter = "summary",
reporter = "verbose_positives",
script = glue("{result_directory}/_targets.R"),
store = glue("{result_directory}/_targets")
)
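
As of recent targets versions, "verbose_positives" behaves like the "verbose" reporter but suppresses the per-target skip messages, which keeps logs readable for large, mostly up-to-date pipelines:

tar_make(reporter = "verbose_positives")  # logs only targets that actually run or error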
@@ -617,7 +624,7 @@ tar_meta(store = glue("{result_directory}/_targets")) |>
library(SummarizedExperiment)
library(tidySummarizedExperiment)

- sccomp_counts = readRDS("/vast/projects/mangiola_immune_map/PostDoc/immuneHealthyBodyMap/sccomp_on_cellNexus_1_0_0/cell_metadata_1_0_0_sccomp_input_counts.rds")
+ sccomp_counts = readRDS("/vast/projects/mangiola_immune_map/PostDoc/immuneHealthyBodyMap/sccomp_on_cellNexus_1_0_2/cell_metadata_1_0_2_sccomp_input_counts.rds")

# Save into one SE
job::job({
@@ -640,7 +647,7 @@ job::job({
zellkonverter::writeH5AD(
se,
verbose = TRUE,
file = "/vast/projects/cellxgene_curated/cellNexus/pseudobulk_sample_cell_type_1_0_0.h5ad",
file = "/vast/projects/cellxgene_curated/cellNexus/pseudobulk_sample_cell_type_1_0_2.h5ad",
X_name = "counts_scaled"
)
})
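
A quick sanity check of the export could read the file back lazily (use_hdf5 = TRUE keeps the matrix on disk; X_name restores the assay name used above):

library(zellkonverter)
se_check = readH5AD(
  "/vast/projects/cellxgene_curated/cellNexus/pseudobulk_sample_cell_type_1_0_2.h5ad",
  X_name = "counts_scaled",
  use_hdf5 = TRUE
)
dim(se_check)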
Expand All @@ -665,11 +672,11 @@ system("~/bin/rclone copy /vast/projects/cellxgene_curated/cellNexus/pseudobulk_

tar_read_raw("pseudobulk_file_id_quantile_normalised_processed_split_1_HDF5_1b4b19de1079ec95", store = glue("{result_directory}/_targets"))

- # tar_invalidate(pseudobulk_file_id_quantile_normalised_processed_split_2_HDF5, store = glue("{result_directory}/_targets"))
+ # tar_invalidate(metadata_split, store = glue("{result_directory}/_targets"))
# tar_invalidate(pseudobulk_file_id_quantile_normalised_processed_split_1, store = glue("{result_directory}/_targets"))

tar_workspace(
- pseudobulk_file_id_quantile_normalised_processed_split_1_HDF5_5f69024c54bd8215,
+ "metadata_split_SMALL",
script = glue("{result_directory}/_targets.R"),
store = glue("{result_directory}/_targets")
)
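
For the record, tar_workspace() loads the saved environment of a target into the interactive session for debugging. This assumes workspaces were saved in the first place, e.g. via tar_option_set(workspace_on_error = TRUE) in the pipeline script (an assumption; that option is not visible in this diff).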