diff --git a/dev/cellxgene_to_metadata.R b/dev/cellxgene_to_metadata.R index cbe9e68..6cd3a95 100644 --- a/dev/cellxgene_to_metadata.R +++ b/dev/cellxgene_to_metadata.R @@ -71,7 +71,6 @@ tar_script({ tasks_max = 5, verbose = T, , seconds_idle = 30 - script_directory = paste0("/vast/scratch/users/mangiola.s/cellxgenedp/crew_cluster/", basename(tempdir())) ), crew_controller_slurm( name = "slurm_1_40", diff --git a/dev/make_pseudobulk.R b/dev/make_pseudobulk.R index 6a85bc2..bbe97a6 100644 --- a/dev/make_pseudobulk.R +++ b/dev/make_pseudobulk.R @@ -8,11 +8,11 @@ library(duckdb) # Get input -result_directory = "/vast/scratch/users/mangiola.s/pseudobulk_cellNexus_1_0_0_coarse" +result_directory = "/vast/scratch/users/mangiola.s/pseudobulk_cellNexus_1_0_2" tar_script({ - result_directory = "/vast/scratch/users/mangiola.s/pseudobulk_cellNexus_1_0_0_coarse" + result_directory = "/vast/scratch/users/mangiola.s/pseudobulk_cellNexus_1_0_2" #-----------------------# @@ -123,30 +123,30 @@ tar_script({ # ) - split_metadata = function(){ + split_metadata = function(metadata_parquet){ # CAQ_directory = "/vast/projects/cellxgene_curated/cellNexus" my_metadata = tbl( dbConnect(duckdb::duckdb(), dbdir = ":memory:"), - sql("SELECT * FROM read_parquet('/vast/projects/cellxgene_curated/cellNexus/cell_metadata_cell_type_consensus_v1_0_0.parquet')") + sql(glue("SELECT * FROM read_parquet('{metadata_parquet}')")) ) |> # # this, because for curated Atlas, query, I don't need of the other meta data, # I can add it later from the full meta data - select(sample_id, cell_, file_id_cellNexus, cell_type_consensus_harmonised) |> - filter(cell_type_consensus_harmonised!="other", !cell_type_consensus_harmonised |> is.na()) |> - filter(!file_id_cellNexus |> is.na()) |> + select(sample_id, cell__id, file_id_cellNexus_single_Cell, cell_type_unified_ensemble) |> + filter(cell_type_unified_ensemble!="other", !cell_type_unified_ensemble |> is.na()) |> + filter(!file_id_cellNexus_single_Cell |> is.na()) |> # THIS SHOULD BE REMOVED IN THE FUTURE - mutate(file_id_cellNexus = paste0(file_id_cellNexus, ".h5ad")) |> + mutate(file_id_cellNexus_single_Cell = paste0(file_id_cellNexus_single_Cell, ".h5ad")) |> # NEST as_tibble() |> - mutate(chunk = file_id_cellNexus) |> + mutate(chunk = file_id_cellNexus_single_Cell) |> nest(data = -c(chunk)) |> mutate(number_of_cells = map_int(data, nrow)) @@ -162,11 +162,11 @@ tar_script({ data, chunk, ~ { - file_id_cellNexus = .y + file_id_cellNexus_single_Cell = .y # THIS IS TEMPORARY BECAUSE I HAVE BRAIN DATASETS WITH 1M CELL THAT ARE HARD TO SAVE IN THE DB - if(!file.exists(paste0(cache.path, "/original/", file_id_cellNexus))) { - warning(paste0(cache.path, "/original/", file_id_cellNexus, " does not exist in the cache") ) + if(!file.exists(paste0(cache.path, "/original/", file_id_cellNexus_single_Cell))) { + warning(paste0(cache.path, "/original/", file_id_cellNexus_single_Cell, " does not exist in the cache") ) return(NULL) } @@ -174,7 +174,7 @@ tar_script({ CuratedAtlasQueryR:::get_data_container( repository = NULL, cache_directory = cache.path, - grouping_column = "file_id_cellNexus" + grouping_column = "file_id_cellNexus_single_Cell" ) } @@ -198,12 +198,12 @@ tar_script({ pseudobulk = aggregateAcrossCells( .x, - colData(.x)[,c("sample_id", "cell_type_consensus_harmonised")], + colData(.x)[,c("sample_id", "cell_type_unified_ensemble")], BPPARAM = MulticoreParam(workers = if_else(.y>50000, 20, 5)) ) - colnames(pseudobulk) = paste0(colData(pseudobulk)$sample_id, "___", 
colData(pseudobulk)$cell_type_consensus_harmonised) + colnames(pseudobulk) = paste0(colData(pseudobulk)$sample_id, "___", colData(pseudobulk)$cell_type_unified_ensemble) - pseudobulk = pseudobulk |> select(.cell, sample_id, file_id_cellNexus, cell_type_consensus_harmonised) + pseudobulk = pseudobulk |> select(.cell, sample_id, file_id_cellNexus_single_Cell, cell_type_unified_ensemble) # Decrease size # We will reattach rowames later @@ -226,7 +226,7 @@ tar_script({ # Add columns and filter mutate(data = map2( - data, file_id_cellNexus, + data, file_id_cellNexus_single_Cell, ~ { # ## TEMPORARY FIX BECAUSE I FORGOT TO ADD ASSAY NAME @@ -236,8 +236,8 @@ tar_script({ # Add columns se = .x |> - mutate(file_id_cellNexus = .y) |> - select(-any_of(c("file_id_cellNexus", ".cell", "original_cell_id"))) + mutate(file_id_cellNexus_single_Cell = .y) |> + select(-any_of(c("file_id_cellNexus_single_Cell", ".cell", "original_cell_id"))) # # Identify samples with many genes @@ -260,7 +260,7 @@ tar_script({ .progress=TRUE )) |> - nest(data = -file_id_cellNexus) |> + nest(data = -file_id_cellNexus_single_Cell) |> mutate(data = map(data, ~ { se = .x |> @@ -383,33 +383,40 @@ tar_script({ # Pipeline #-----------------------# list( - tar_target(cache.path, "/vast/projects/cellxgene_curated/cellNexus/cellxgene/29_10_2024/", deployment = "main"), - - # Get rownames + tar_target(cache.path, "/vast/scratch/users/mangiola.s/cellNexus/cellxgene/15_11_2024/", deployment = "main"), tar_target( - sce_rownames, - tbl( - dbConnect(duckdb::duckdb(), dbdir = ":memory:"), - sql("SELECT * FROM read_parquet('/vast/projects/cellxgene_curated/cellNexus/cell_metadata_cell_type_consensus_v1_0_0.parquet')") - ) |> - head(1) |> - mutate(file_id_cellNexus = paste0(file_id_cellNexus, ".h5ad")) |> - CuratedAtlasQueryR:::get_data_container( - repository = NULL, - cache_directory = cache.path, - grouping_column = "file_id_cellNexus" - ) |> - rownames(), - resources = tar_resources(crew = tar_resources_crew("slurm_1_20")), - packages = c("arrow", "dplyr", "duckdb") + metadata_parquet, + "/vast/projects/cellxgene_curated/cellNexus/cell_metadata_cell_type_consensus_v1_0_2.parquet", + format = "file", + deployment = "main" ), + + # # Get rownames + # tar_target( + # sce_rownames, + # tbl( + # dbConnect(duckdb::duckdb(), dbdir = ":memory:"), + # sql("SELECT * FROM read_parquet('/vast/projects/cellxgene_curated/cellNexus/cell_metadata_cell_type_consensus_v1_0_2.parquet')") + # ) |> + # head(1) |> + # mutate(file_id_cellNexus_single_Cell = paste0(file_id_cellNexus_single_Cell, ".h5ad")) |> + # CuratedAtlasQueryR:::get_data_container( + # repository = NULL, + # cache_directory = cache.path, + # grouping_column = "file_id_cellNexus_single_Cell" + # ) |> + # rownames(), + # resources = tar_resources(crew = tar_resources_crew("slurm_1_20")), + # packages = c("arrow", "dplyr", "duckdb") + # ), + # Do metadata tar_target( metadata_split, - split_metadata(), + split_metadata(metadata_parquet), resources = tar_resources(crew = tar_resources_crew("slurm_1_20")), - packages = c("arrow", "dplyr", "duckdb", "tidyr", "dplyr", "purrr") + packages = c("arrow", "dplyr", "duckdb", "tidyr", "dplyr", "purrr", "glue") ), # Get SCE SMALL @@ -599,7 +606,7 @@ job::job({ tar_make( # callr_function = NULL, - reporter = "summary", + reporter = "verbose_positives", script = glue("{result_directory}/_targets.R"), store = glue("{result_directory}/_targets") ) @@ -617,7 +624,7 @@ tar_meta(store = glue("{result_directory}/_targets")) |> library(SummarizedExperiment) 
library(tidySummarizedExperiment) -sccomp_counts = readRDS("/vast/projects/mangiola_immune_map/PostDoc/immuneHealthyBodyMap/sccomp_on_cellNexus_1_0_0/cell_metadata_1_0_0_sccomp_input_counts.rds") +sccomp_counts = readRDS("/vast/projects/mangiola_immune_map/PostDoc/immuneHealthyBodyMap/sccomp_on_cellNexus_1_0_2/cell_metadata_1_0_2_sccomp_input_counts.rds") # Save into one SE job::job({ @@ -640,7 +647,7 @@ job::job({ zellkonverter::writeH5AD( se, verbose = TRUE, - file = "/vast/projects/cellxgene_curated/cellNexus/pseudobulk_sample_cell_type_1_0_0.h5ad", + file = "/vast/projects/cellxgene_curated/cellNexus/pseudobulk_sample_cell_type_1_0_2.h5ad", X_name = "counts_scaled" ) }) @@ -665,11 +672,11 @@ system("~/bin/rclone copy /vast/projects/cellxgene_curated/cellNexus/pseudobulk_ tar_read_raw("pseudobulk_file_id_quantile_normalised_processed_split_1_HDF5_1b4b19de1079ec95", store = glue("{result_directory}/_targets")) -# tar_invalidate(pseudobulk_file_id_quantile_normalised_processed_split_2_HDF5, store = glue("{result_directory}/_targets")) +# tar_invalidate(metadata_split, store = glue("{result_directory}/_targets")) # tar_invalidate(pseudobulk_file_id_quantile_normalised_processed_split_1, store = glue("{result_directory}/_targets")) tar_workspace( - pseudobulk_file_id_quantile_normalised_processed_split_1_HDF5_5f69024c54bd8215, + "metadata_split_SMALL", script = glue("{result_directory}/_targets.R"), store = glue("{result_directory}/_targets") ) diff --git a/dev/prepare_local_cache_splitting_du_dataset_and_cell_type.R b/dev/prepare_local_cache_splitting_du_dataset_and_cell_type.R index 665a798..efb3281 100644 --- a/dev/prepare_local_cache_splitting_du_dataset_and_cell_type.R +++ b/dev/prepare_local_cache_splitting_du_dataset_and_cell_type.R @@ -14,331 +14,408 @@ # # By integrating targets, crew, and various data manipulation packages (dplyr, tidyverse, SingleCellExperiment), this script ensures that large-scale scRNA-seq data processing is efficient, reproducible, and capable of leveraging parallel computing resources. It is designed to handle edge cases gracefully and provides a clear framework for preprocessing scRNA-seq data, which is essential for subsequent analyses such as clustering, differential expression, and cell type identification. 
-library(targets) -library(tidyverse) -store_file_cellNexus = "/vast/scratch/users/mangiola.s/targets_prepare_database_split_datasets_chunked" +library(arrow) +library(dplyr) +library(duckdb) +# Get Dharmesh metadata consensus +system("~/bin/rclone copy box_adelaide:/Mangiola_ImmuneAtlas/reannotation_consensus/cell_annotation_new.parquet /vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/") -tar_script({ - library(dplyr) - library(magrittr) - library(tibble) - library(targets) - library(tarchetypes) - library(crew) - library(crew.cluster) - tar_option_set( - memory = "transient", - garbage_collection = 100, - storage = "worker", retrieval = "worker", error = "continue", - # debug = "dataset_id_sce_ce393fc1e85f2cbc", - cue = tar_cue(mode = "never"), - workspace_on_error = TRUE, - controller = crew_controller_group( - list( - crew_controller_slurm( - name = "tier_1", - script_lines = "#SBATCH --mem 8G", - slurm_cpus_per_task = 1, - workers = 300, - tasks_max = 10, - verbose = T, - launch_max = 5, - seconds_idle = 30 - ), - - crew_controller_slurm( - name = "tier_2", - script_lines = "#SBATCH --mem 10G", - slurm_cpus_per_task = 1, - workers = 300, - tasks_max = 10, - verbose = T, - launch_max = 5, - seconds_idle = 30 - ), - crew_controller_slurm( - name = "tier_3", - script_lines = "#SBATCH --mem 30G", - slurm_cpus_per_task = 1, - workers = 300, - tasks_max = 10, - verbose = T, - launch_max = 5, - seconds_idle = 30 - ), - crew_controller_slurm( - name = "tier_4", - script_lines = "#SBATCH --mem 80G", - slurm_cpus_per_task = 1, - workers = 40, - tasks_max = 10, - verbose = T, - launch_max = 5, - seconds_idle = 30 - ) - ) - ), - trust_object_timestamps = TRUE - ) +job::job({ - - save_anndata = function(target_name_grouped_by_dataset_id, my_store, cache_directory, use_future = FALSE){ - - if(use_future) my_map = future_map2 - else my_map = map2 + get_file_ids = function(cell_annotation, cell_type_consensus_parquet){ - my_dataset_id = unique(target_name_grouped_by_dataset_id$dataset_id) + cell_consensus = + tbl( + dbConnect(duckdb::duckdb(), dbdir = ":memory:"), + sql(glue::glue("SELECT * FROM read_parquet('{cell_type_consensus_parquet}')")) + ) |> + select(cell_, dataset_id, cell_type_unified_ensemble, cell_type_unified) - my_consensus = + # This because f7c1c579-2dc0-47e2-ba19-8165c5a0e353 includes 13K samples + # It affects only very few datasets + sample_chunk_df = tbl( dbConnect(duckdb::duckdb(), dbdir = ":memory:"), - sql("SELECT * FROM read_parquet('/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/cell_annotation_new.parquet')") + sql(glue::glue("SELECT * FROM read_parquet('{cell_annotation}')")) ) |> - filter(dataset_id == my_dataset_id) |> - # mutate(cell_ = paste(cell_, dataset_id, sep="___")) |> # ALREAD DONE - - # Lighten it up - select(cell_, dataset_id, cell_type_unified_ensemble) - - - # rename( - # cell_type_unified_ensemble = cell_type_unified_ensemble, - # cell_type_immune_confidence = confidence_score, - # cell_type_immune_independent_annotation_consensus = data_driven_consensus - # ) - + # Define chunks + count(dataset_id, sample_id, name = "cell_count") |> # Ensure unique dataset_id and sample_id combinations + distinct(dataset_id, sample_id, cell_count) |> # Ensure unique dataset_id and sample_id combinations + group_by(dataset_id) |> + dbplyr::window_order(cell_count) |> # Ensure dataset_id order + mutate(sample_index = row_number()) |> # Create sequential index within each dataset + mutate(sample_chunk = (sample_index - 1) %/% 100 + 1) |> # Assign 
chunks (up to 100 samples per chunk) + mutate(cell_chunk = cumsum(cell_count) %/% 20000 + 1) |> # max 20K cells per chunk + ungroup() - #--------------- - ### WHICH RELEASE - #--------------- + tbl( + dbConnect(duckdb::duckdb(), dbdir = ":memory:"), + sql(glue::glue("SELECT * FROM read_parquet('{cell_annotation}')")) + ) |> + left_join(cell_consensus, copy=TRUE) |> + left_join(sample_chunk_df |> select(dataset_id, sample_chunk, cell_chunk, sample_id), copy=TRUE) |> + # # Make sure I cover cell type even if consensus of harmonisation is not present (it should be the vast minority) + # mutate(temp_cell_type_label_for_file_id = if_else(cell_type_unified_ensemble |> is.na(), cell_type, cell_type_unified)) |> + # mutate(temp_cell_type_label_for_file_id = if_else(temp_cell_type_label_for_file_id |> is.na(), cell_type, temp_cell_type_label_for_file_id)) |> + # + # Define chunks + group_by(dataset_id, sample_chunk, cell_chunk, cell_type_unified, sample_id) |> + summarise(cell_count = n(), .groups = "drop") |> + group_by(dataset_id, sample_chunk, cell_chunk, cell_type_unified) |> + dbplyr::window_order(desc(cell_count)) |> + mutate(chunk = cumsum(cell_count) %/% 20000 + 1) |> # max 20K cells per chunk + ungroup() |> + as_tibble() |> + + mutate(file_id_cellNexus_single_cell = + glue::glue("{dataset_id}___{sample_chunk}___{cell_chunk}___{cell_type_unified}") |> + sapply(digest::digest) |> + paste0("___", chunk) + ) + } + + + get_file_ids( + "/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/cell_annotation.parquet", + "/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/cell_annotation_new.parquet" + ) |> + write_parquet("/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/file_id_cellNexus_single_cell.parquet") + + gc() - cache_directory_counts = glue("{cache_directory}/original") - cache_directory_cpm = glue("{cache_directory}/cpm") + con <- dbConnect(duckdb::duckdb(), dbdir = ":memory:") + + # Create a view for cell_metadata in DuckDB + dbExecute(con, " + CREATE VIEW cell_metadata AS + SELECT + CONCAT(cell_, '___', dataset_id) AS cell_, + dataset_id, + * + FROM read_parquet('/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/cell_metadata.parquet') +") + + # Create views for other tables + dbExecute(con, " + CREATE VIEW cell_consensus AS + SELECT * + FROM read_parquet('/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/cell_annotation_new.parquet') +") + + dbExecute(con, " + CREATE VIEW cell_annotation AS + SELECT cell_, blueprint_first_labels_fine, monaco_first_labels_fine, azimuth_predicted_celltype_l2 + FROM read_parquet('/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/annotation_tbl_light.parquet') +") + + dbExecute(con, " + CREATE VIEW empty_droplet_df AS + SELECT cell_, dataset_id, empty_droplet + FROM read_parquet('/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/cell_annotation.parquet') +") + + dbExecute(con, " + CREATE VIEW file_id_cellNexus_single_cell AS + SELECT dataset_id, sample_chunk, cell_chunk, cell_type_unified, sample_id, file_id_cellNexus_single_cell + FROM read_parquet('/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/file_id_cellNexus_single_cell.parquet') +") + + # Perform the left join and save to Parquet + copy_query <- " + COPY ( + SELECT + cell_metadata.cell_ AS cell_id, -- Rename cell_ to cell_id + cell_metadata.*, -- Include all other columns from cell_metadata + cell_annotation.*, -- Include all columns from cell_annotation + cell_consensus.*, -- Include all 
columns from cell_consensus + empty_droplet_df.*, -- Include all columns from empty_droplet_df + file_id_cellNexus_single_cell.* -- Include all columns from file_id_cellNexus_single_cell + FROM cell_metadata - dir.create(cache_directory_counts, recursive = TRUE, showWarnings = FALSE) - dir.create(cache_directory_cpm, recursive = TRUE, showWarnings = FALSE) + LEFT JOIN cell_annotation + ON cell_annotation.cell_ = cell_metadata.cell_ - # Begin processing the data pipeline with the initial dataset 'target_name_grouped_by_dataset_id' - sce_df = - target_name_grouped_by_dataset_id |> - - # Step 1: Read raw data for each 'target_name' and store it in a new column 'sce' - mutate( - sce = map( - target_name, - ~ tar_read_raw(.x, store = my_store) # Read the raw SingleCellExperiment object - ) - ) |> - - # This should not be needed, but there are some data sets with zero cells - filter(!map_lgl(sce, is.null)) + LEFT JOIN cell_consensus + ON cell_consensus.cell_ = cell_metadata.cell_ + AND cell_consensus.dataset_id = cell_metadata.dataset_id + + LEFT JOIN empty_droplet_df + ON empty_droplet_df.cell_ = cell_metadata.cell_ + AND empty_droplet_df.dataset_id = cell_metadata.dataset_id - if(nrow(sce_df) == 0) return(NULL) - - # plan(multisession, workers = 20) - - sce_df = - sce_df |> - - # Step 2: Remove any entries where 'sce' is NULL (i.e., failed to read data) - filter(!map_lgl(sce, is.null) ) |> - - # Step 3: Calculate CPM (Counts Per Million) for each 'sce' object - mutate( - sce = map( - sce, - ~ { - # Calculate CPM using 'calculateCPM' on the 'X' assay and store it in a new assay 'cpm' - assay(.x, "cpm") = calculateCPM(.x, assay.type = "X") - .x # Return the modified 'sce' object - } - ) - ) |> - - # Step 4: Group the data by 'dataset_id' and 'tar_group' for further summarization - group_by(dataset_id, tar_group, chunk) |> - - # FORCEFULLY drop all but counts and metadata - # int_colData(.x) = DataFrame(row.names = colnames(.x)) - # Creates error - # THIS SHOULD HAVE BEEN DONE IN THE TRANFORM HPCell - mutate(sce = map(sce, ~ SingleCellExperiment(assay = assays(.x), colData = colData(.x)) )) |> - - # Step 5: Combine all 'sce' objects within each group into a single 'sce' object - summarise( - sce = list( do.call(cbind, args = sce) ) - ) |> + LEFT JOIN file_id_cellNexus_single_cell + ON file_id_cellNexus_single_cell.sample_id = cell_consensus.sample_id + AND file_id_cellNexus_single_cell.dataset_id = cell_consensus.dataset_id + AND file_id_cellNexus_single_cell.cell_type_unified = cell_consensus.cell_type_unified - # Step 6: Join additional metadata to each 'sce' object - mutate( - sce = map( - sce, - ~ .x |> - left_join(my_consensus, by =c(".cell" = "cell_", "dataset_id"), copy = TRUE) # Merge consensus into 'sce' - ) - ) |> - - # Step 7: Split each 'sce' object based on 'cell_type' and create a nested tibble - mutate( - sce = map( - sce, - ~ .x |> - HPCell:::splitColData(colData(.x)$cell_type_unified_ensemble) |> # Split 'sce' by 'cell_type' - enframe(name = "cell_type_unified_ensemble", value = "sce") # Convert to tibble with 'cell_type' and 'sce' columns - ) - ) |> - - # Step 8: Unnest the list of 'sce' objects to have one row per 'cell_type' - unnest(sce) - - - - sce_df = - sce_df |> - - # Step 9: Generate a unique file identifier for each 'cell_type' within a dataset - unite("file_id_cellNexus", c(dataset_id, cell_type_unified_ensemble), sep = "___", remove=FALSE) |> - mutate(file_id_cellNexus = paste0(file_id_cellNexus, ".h5ad")) |> - mutate(file_id_cellNexus = map_chr(file_id_cellNexus, 
digest) ) |> - unite("file_id_cellNexus", c(file_id_cellNexus, chunk), sep = "___chunk") - - - # process_sce <- function(x, y, cache_directory_counts, my_assay) { - # tryCatch({ - # # Check if the 'sce' has only one cell (column) - # if (ncol(assay(x, my_assay)) == 1) { - # # Duplicate the assay to prevent saving errors due to single-column matrices - # my_assay <- cbind(assay(x, my_assay), assay(x, my_assay)) - # - # # Rename the second column to distinguish it - # colnames(my_assay)[2] <- paste0("DUMMY", "___", colnames(my_assay)[2]) - # } else { - # # Use the assay as is if more than one cell - # my_assay <- assay(x, my_assay) - # } - # - # # Create a new SingleCellExperiment object with the adjusted counts assay - # new_sce <- SingleCellExperiment(assays = list(counts = my_assay)) - # - # # Save the experiment data to the specified counts cache directory - # save_experiment_data(new_sce, glue("{cache_directory_counts}/{y}")) - # - # return(TRUE) # Indicate successful saving - # - # }, error = function(e) { - # message("Error in process_sce: ", e$message) - # return(NA) # Return NA to indicate an error occurred - # }) - # } - # - # # Run `mclapply` in parallel across `y` and `z` - # results <- mclapply( - # seq_along(y), - # function(i) process_sce(y[[i]], z[[i]], cache_directory_counts, "X"), - # mc.cores = 20 - # ) - # - # results <- mclapply( - # seq_along(y), - # function(i) process_sce(y[[i]], z[[i]], cache_directory_cpm, "cpm"), - # mc.cores = 20 - # ) - # - # sce_df |> select(-sce) - - sce_df|> - - # Step 10: Save the raw counts data for each 'sce' object - mutate( - saved_raw_counts = map2( - sce, file_id_cellNexus, - ~ { - - # Check if the 'sce' has only one cell (column) - if(ncol(assay(.x)) == 1) { - - # Duplicate the assay to prevent saving errors due to single-column matrices - my_assay = cbind(assay(.x), assay(.x)) - - # Rename the second column to distinguish it - colnames(my_assay)[2] = paste0("DUMMY", "___", colnames(my_assay)[2]) - } else { - # Use the assay as is if more than one cell - my_assay = assay(.x) - } - - # Create a new SingleCellExperiment object with the adjusted counts assay - SingleCellExperiment(assays = list(counts = my_assay)) |> - - # Save the experiment data to the specified counts cache directory - save_experiment_data(glue("{cache_directory_counts}/{.y}")) - - return(TRUE) # Indicate successful saving - } - ) - ) |> - - # Step 11: Save the CPM data for each 'sce' object - mutate( - saved_cpm = map2( - sce, file_id_cellNexus, - ~ { - - # Check if the 'cpm' assay has only one cell - if(ncol(assay(.x, "cpm")) == 1) { + ) TO '/vast/projects/cellxgene_curated/cellNexus/cell_metadata_cell_type_consensus_v1_0_3.parquet' + (FORMAT PARQUET, COMPRESSION 'gzip'); +" + + # Execute the final query to write the result to a Parquet file + dbExecute(con, copy_query) + + # Disconnect from the database + dbDisconnect(con, shutdown = TRUE) + + system("~/bin/rclone copy /vast/projects/cellxgene_curated/cellNexus/cell_metadata_cell_type_consensus_v1_0_3.parquet box_adelaide:/Mangiola_ImmuneAtlas/taskforce_shared_folder/") + +}) - # Duplicate the 'cpm' assay to avoid single-column issues - my_assay = cbind(assay(.x, "cpm"), assay(.x, "cpm")) - # Rename the second column - colnames(my_assay)[2] = paste0("DUMMY", "___", colnames(my_assay)[2]) - } else { - # Use the 'cpm' assay as is - my_assay = assay(.x, "cpm") - } +cell_metadata = + tbl( + dbConnect(duckdb::duckdb(), dbdir = ":memory:"), + sql("SELECT * FROM 
read_parquet('/vast/projects/cellxgene_curated/cellNexus/cell_metadata_cell_type_consensus_v1_0_3.parquet')") + ) - # Create a new SingleCellExperiment object with the adjusted CPM assay - SingleCellExperiment(assays = list(cpm = my_assay)) |> - # Save the experiment data to the specified CPM cache directory - save_experiment_data(glue("{cache_directory_cpm}/{.y}")) +library(targets) +library(tidyverse) +store_file_cellNexus = "/vast/scratch/users/mangiola.s/targets_prepare_database_split_datasets_chunked_3" - return(TRUE) # Indicate successful saving - } +tar_script({ + library(dplyr) + library(magrittr) + library(tibble) + library(targets) + library(tarchetypes) + library(crew) + library(crew.cluster) + tar_option_set( + memory = "transient", + garbage_collection = 100, + storage = "worker", + retrieval = "worker", + error = "continue", + # debug = "target_name_grouped_by_dataset_id", + cue = tar_cue(mode = "never"), + workspace_on_error = TRUE, + controller = crew_controller_group( + list( + crew_controller_slurm( + name = "tier_1", + script_lines = "#SBATCH --mem 8G", + slurm_cpus_per_task = 1, + workers = 300, + tasks_max = 50, + verbose = T, + crashes_error = 5, + seconds_idle = 30 + ), + + crew_controller_slurm( + name = "tier_2", + script_lines = "#SBATCH --mem 10G", + slurm_cpus_per_task = 1, + workers = 300, + tasks_max = 10, + verbose = T, + crashes_error = 5, + seconds_idle = 30 + ), + crew_controller_slurm( + name = "tier_3", + script_lines = "#SBATCH --mem 30G", + slurm_cpus_per_task = 1, + workers = 200, + tasks_max = 10, + verbose = T, + crashes_error = 5, + seconds_idle = 30 + ), + crew_controller_slurm( + name = "tier_4", + script_lines = "#SBATCH --mem 150G", + slurm_cpus_per_task = 20, + workers = 50, + tasks_max = 10, + verbose = T, + crashes_error = 5, + seconds_idle = 30 + ), + crew_controller_slurm( + name = "tier_5", + script_lines = "#SBATCH --mem 400G", + slurm_cpus_per_task = 1, + workers = 2, + tasks_max = 10, + verbose = T, + crashes_error = 5, + seconds_idle = 30 ) - ) |> - - # Step 12: Remove the 'sce' column from the final output as it's no longer needed - select(-sce) + ) + ), + trust_object_timestamps = TRUE, + workspaces = "dataset_id_sce_52dbec3c15f98d66" + ) + + + save_anndata = function(dataset_id_sce, cache_directory, use_future = FALSE){ + + dir.create(cache_directory, showWarnings = FALSE, recursive = TRUE) + + # Parallelise + cores = as.numeric(Sys.getenv("SLURM_CPUS_PER_TASK", unset = 1)) + bp <- MulticoreParam(workers = cores , progressbar = TRUE) # Adjust the number of workers as needed + + dataset_id_sce |> + purrr::transpose() |> + bplapply( + FUN = function(x) { + + .x = x[[2]] + .y = x[[1]] + + # Check if the 'sce' has only one cell (column) + if(ncol(assay(.x)) == 1) { + + # Duplicate the assay to prevent saving errors due to single-column matrices + my_assay = cbind(assay(.x), assay(.x)) + # Rename the second column to distinguish it + colnames(my_assay)[2] = paste0("DUMMY", "___", colnames(my_assay)[2]) + + cd = colData(.x) + cd = cd |> rbind(cd) + rownames(cd)[2] = paste0("DUMMY", "___", rownames(cd)[2]) + + + + .x = SingleCellExperiment(assay = list( my_assay ) |> set_names(names(assays(.x))[1]), colData = cd) + } + + + # TEMPORARY FOR SOME REASON THE MIN COUNTS IS NOT 0 FOR SOME SAMPLES + .x = HPCell:::check_if_assay_minimum_count_is_zero_and_correct_TEMPORARY(.x, assays(.x) |> names() |> _[1]) + + + # My attempt to save a integer, sparse, delayed matrix (with zellkonverter it is not possible to save integers) + # .x |> assay() |> 
type() <- "integer" + # .x |> saveHDF5SummarizedExperiment("~/temp", as.sparse = T, replace = T) + + # Save the experiment data to the specified counts cache directory + .x |> save_experiment_data(glue("{cache_directory}/{.y}")) + + return(TRUE) # Indicate successful saving + }, + BPPARAM = bp # Use the defined parallel backend + ) + + return("saved") + + # dataset_id_sce|> + # + # # Step 10: Save the raw counts data for each 'sce' object + # mutate( + # saved_raw_counts = map2( + # sce, cell_type_unified_ensemble, + # ~ { + # + # # Check if the 'sce' has only one cell (column) + # if(ncol(assay(.x)) == 1) { + # + # # Duplicate the assay to prevent saving errors due to single-column matrices + # my_assay = cbind(assay(.x), assay(.x)) + # + # # Rename the second column to distinguish it + # colnames(my_assay)[2] = paste0("DUMMY", "___", colnames(my_assay)[2]) + # } else { + # # Use the assay as is if more than one cell + # my_assay = assay(.x) + # } + # + # # TEMPORARY FOR SOME REASON THE MIN COUNTS IS NOT 0 FOR SOME SAMPLES + # .x = HPCell:::check_if_assay_minimum_count_is_zero_and_correct_TEMPORARY(.x, assays(.x) |> names() |> _[1]) + # + # + # # My attempt to save a integer, sparse, delayed matrix (with zellkonverter it is not possible to save integers) + # # .x |> assay() |> type() <- "integer" + # # .x |> saveHDF5SummarizedExperiment("~/temp", as.sparse = T, replace = T) + # + # # Save the experiment data to the specified counts cache directory + # .x |> save_experiment_data(glue("{cache_directory_counts}/{.y}")) + # + # return(TRUE) # Indicate successful saving + # } + # ) + # ) |> + # + # # Step 11: Save the CPM data for each 'sce' object + # mutate( + # saved_cpm = map2( + # sce, file_id_cellNexus, + # ~ { + # + # # Check if the 'cpm' assay has only one cell + # if(ncol(assay(.x, "cpm")) == 1) { + # + # # Duplicate the 'cpm' assay to avoid single-column issues + # my_assay = cbind(assay(.x, "cpm"), assay(.x, "cpm")) + # + # # Rename the second column + # colnames(my_assay)[2] = paste0("DUMMY", "___", colnames(my_assay)[2]) + # } else { + # # Use the 'cpm' assay as is + # my_assay = assay(.x, "cpm") + # } + # + # # Create a new SingleCellExperiment object with the adjusted CPM assay + # SingleCellExperiment(assays = list(cpm = my_assay)) |> + # + # # Save the experiment data to the specified CPM cache directory + # save_experiment_data(glue("{cache_directory_cpm}/{.y}")) + # + # return(TRUE) # Indicate successful saving + # } + # ) + # ) |> + # + # # Step 12: Remove the 'sce' column from the final output as it's no longer needed + # select(-sce) } - cbind_sce_by_dataset_id = function(target_name_grouped_by_dataset_id, chunk_tbl){ - + cbind_sce_by_dataset_id = function(target_name_grouped_by_dataset_id, file_id_db_file, my_store){ my_dataset_id = unique(target_name_grouped_by_dataset_id$dataset_id) - - + file_id_db = + tbl( + dbConnect(duckdb::duckdb(), dbdir = ":memory:"), + sql(glue("SELECT * FROM read_parquet('{file_id_db_file}')")) + ) |> + filter(dataset_id == my_dataset_id) |> + select(cell_id, sample_id, dataset_id, file_id_cellNexus_single_cell) |> + as_tibble() + + # Parallelise + cores = as.numeric(Sys.getenv("SLURM_CPUS_PER_TASK", unset = 1)) + bp <- MulticoreParam(workers = cores , progressbar = TRUE) # Adjust the number of workers as needed + # Begin processing the data pipeline with the initial dataset 'target_name_grouped_by_dataset_id' sce_df = target_name_grouped_by_dataset_id |> # Step 1: Read raw data for each 'target_name' and store it in a new column 'sce' mutate( 
- sce = map( + sce = bplapply( target_name, - ~ tar_read_raw(.x, store = my_store) # Read the raw SingleCellExperiment object + FUN = function(x) tar_read_raw(x, store = my_store), # Read the raw SingleCellExperiment object + BPPARAM = bp # Use the defined parallel backend ) ) |> # This should not be needed, but there are some data sets with zero cells filter(!map_lgl(sce, is.null)) - - if(nrow(sce_df) == 0) return(NULL) + + + if(nrow(sce_df) == 0) { + warning("this chunk has no rows for some reason.") + return(NULL) + } # plan(multisession, workers = 20) @@ -356,110 +433,54 @@ tar_script({ mutate(sce = map(sce, ~ SingleCellExperiment(assay = assays(.x), colData = colData(.x)) )) |> # Step 5: Combine all 'sce' objects within each group into a single 'sce' object - summarise( - sce = - do.call(cbind, args = sce) |> - left_join(chunk_tbl) |> - - ) |> - - - # Step 7: Split each 'sce' object based on 'cell_type' and create a nested tibble - mutate( - sce = map( - sce, - ~ .x |> - HPCell:::splitColData(colData(.x)$cell_type_unified_ensemble) |> # Split 'sce' by 'cell_type' - enframe(name = "cell_type_unified_ensemble", value = "sce") # Convert to tibble with 'cell_type' and 'sce' columns - ) - ) |> + summarise( sce = list(do.call(cbind, args = sce) ) ) |> + mutate(sce = map(sce, + ~ { .x = + .x |> + left_join(file_id_db, by = join_by(.cell==cell_id, dataset_id==dataset_id, sample_id==sample_id)) + .x |> + HPCell:::splitColData(colData(.x)$file_id_cellNexus_single_cell) |> # Split 'sce' by 'file_id_cellNexus_single_cell' + enframe(name = "file_id_cellNexus_single_cell", value = "sce") # Convert to tibble with 'file_id_cellNexus_single_cell' and 'sce' columns + })) |> # Step 8: Unnest the list of 'sce' objects to have one row per 'cell_type' unnest(sce) - - sce_df|> - - # Step 10: Save the raw counts data for each 'sce' object - mutate( - saved_raw_counts = map2( - sce, file_id_cellNexus, - ~ { - - # Check if the 'sce' has only one cell (column) - if(ncol(assay(.x)) == 1) { - - # Duplicate the assay to prevent saving errors due to single-column matrices - my_assay = cbind(assay(.x), assay(.x)) - - # Rename the second column to distinguish it - colnames(my_assay)[2] = paste0("DUMMY", "___", colnames(my_assay)[2]) - } else { - # Use the assay as is if more than one cell - my_assay = assay(.x) - } - - # Create a new SingleCellExperiment object with the adjusted counts assay - sce = SingleCellExperiment(assays = list(counts = my_assay)) - - sce <<<>>> convert assay to HDF5 sparse integers - - # Save the experiment data to the specified counts cache directory - save_experiment_data(glue("{cache_directory_counts}/{.y}")) - - return(TRUE) # Indicate successful saving - } - ) - ) |> - - # Step 11: Save the CPM data for each 'sce' object - mutate( - saved_cpm = map2( - sce, file_id_cellNexus, - ~ { - - # Check if the 'cpm' assay has only one cell - if(ncol(assay(.x, "cpm")) == 1) { - - # Duplicate the 'cpm' assay to avoid single-column issues - my_assay = cbind(assay(.x, "cpm"), assay(.x, "cpm")) - - # Rename the second column - colnames(my_assay)[2] = paste0("DUMMY", "___", colnames(my_assay)[2]) - } else { - # Use the 'cpm' assay as is - my_assay = assay(.x, "cpm") - } - - # Create a new SingleCellExperiment object with the adjusted CPM assay - SingleCellExperiment(assays = list(cpm = my_assay)) |> - - # Save the experiment data to the specified CPM cache directory - save_experiment_data(glue("{cache_directory_cpm}/{.y}")) - - return(TRUE) # Indicate 
successful saving - } - ) - ) |> - - # Step 12: Remove the 'sce' column from the final output as it's no longer needed - select(-sce) } - get_dataset_id = function(target_name, my_store){ sce = tar_read_raw(target_name, store = my_store) - if(sce |> is.null()) return("sce_0_cells") - sce |> pull(dataset_id) |> unique() + + if(sce |> is.null()) return(tibble(sample_id = character(), dataset_id= character(), target_name= character())) + + sce |> distinct(sample_id, dataset_id) |> mutate(target_name = !!target_name) + } + + create_chunks_for_reading_and_saving = function(dataset_id_sample_id, cell_metadata){ + dataset_id_sample_id |> + left_join( + tbl( + dbConnect(duckdb::duckdb(), dbdir = ":memory:"), + sql(glue("SELECT * FROM read_parquet('{cell_metadata}')")) + ) |> + distinct(dataset_id, sample_id, sample_chunk, cell_chunk) |> + as_tibble(), + copy=T + ) } list( # The input DO NOT DELETE tar_target(my_store, "/vast/projects/mangiola_immune_map/PostDoc/immuneHealthyBodyMap/census_hpcell_oct_2024/target_store", deployment = "main"), - #tar_target(cache_directory, "/vast/projects/cellxgene_curated/cellNexus/cellxgene/29_10_2024", deployment = "main"), - tar_target(cache_directory, "/vast/scratch/users/mangiola.s/cellNexus/cellxgene/11_11_2024", deployment = "main"), + tar_target(cache_directory, "/vast/scratch/users/mangiola.s/cellNexus/cellxgene/19_11_2024", deployment = "main"), + tar_target( + cell_metadata, + "/vast/projects/cellxgene_curated/cellNexus/cell_metadata_cell_type_consensus_v1_0_3.parquet", + packages = c( "arrow","dplyr","duckdb") + + ), tar_target( target_name, @@ -471,7 +492,7 @@ tar_script({ deployment = "main" ), tar_target( - dataset_id, + dataset_id_sample_id, get_dataset_id(target_name, my_store), packages = "tidySingleCellExperiment", pattern = map(target_name), @@ -479,55 +500,37 @@ tar_script({ crew = tar_resources_crew(controller = "tier_1") ) ), - tar_target( - chunk_tbl, - { - cell_annotation = - tbl( - dbConnect(duckdb::duckdb(), dbdir = ":memory:"), - sql("SELECT * FROM read_parquet('/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/cell_annotation_new.parquet')") - ) |> - select(cell_, dataset_id, cell_type_unified_ensemble) - - - tbl( - dbConnect(duckdb::duckdb(), dbdir = ":memory:"), - sql("SELECT * FROM read_parquet('/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/cell_annotation.parquet')") - ) |> - left_join(cell_annotation, copy=TRUE) |> - - # Define chunks - group_by(dataset_id, cell_type_unified_ensemble, sample_id) |> - summarise(cell_count = n(), .groups = "drop") |> - group_by(dataset_id, cell_type_unified_ensemble) |> - dbplyr::window_order(desc(cell_count)) |> - mutate(chunk = cumsum(cell_count) %/% 20000 + 1) |> # max 20K cells per sample - ungroup() |> - as_tibble() - }, deployment = "main", packages = c("tidyverse", "dbplyr", "arrow", "dplyr", "duckdb") - ), - - tar_target( - sce_dataset_id, - chunk_tbl - ) - + tar_target( target_name_grouped_by_dataset_id, - tibble(target_name = target_name, dataset_id = dataset_id) |> - group_by(dataset_id) |> + create_chunks_for_reading_and_saving(dataset_id_sample_id, cell_metadata) |> + group_by(dataset_id, sample_chunk, cell_chunk) |> tar_group(), iteration = "group", resources = tar_resources( crew = tar_resources_crew(controller = "tier_3") - ) + ), + packages = c("arrow", "duckdb", "dplyr", "glue") + ), + tar_target( dataset_id_sce, - save_anndata(target_name_grouped_by_dataset_id, my_store, cache_directory), + cbind_sce_by_dataset_id(target_name_grouped_by_dataset_id, 
cell_metadata, my_store = my_store), pattern = map(target_name_grouped_by_dataset_id), - packages = c("tidySingleCellExperiment", "SingleCellExperiment", "tidyverse", "glue", "digest", "HPCell", "digest", "scater", "arrow", "dplyr", "duckdb", "furrr", "future", "parallel"), + packages = c("tidySingleCellExperiment", "SingleCellExperiment", "tidyverse", "glue", "digest", "HPCell", "digest", "scater", "arrow", "dplyr", "duckdb", "BiocParallel", "parallelly"), + resources = tar_resources( + crew = tar_resources_crew(controller = "tier_3") + ) + ), + + + tar_target( + saved_dataset, + save_anndata(dataset_id_sce, paste0(cache_directory, "/single_cell/counts")), + pattern = map(dataset_id_sce), + packages = c("tidySingleCellExperiment", "SingleCellExperiment", "tidyverse", "glue", "digest", "HPCell", "digest", "scater", "arrow", "dplyr", "duckdb", "BiocParallel", "parallelly"), resources = tar_resources( crew = tar_resources_crew(controller = "tier_4") ) @@ -535,262 +538,34 @@ ) + }, script = paste0(store_file_cellNexus, "_target_script.R"), ask = FALSE) job::job({ - store_file_cellNexus = "/vast/scratch/users/mangiola.s/targets_prepare_database_split_datasets_chunked" - tar_make(script = paste0(store_file_cellNexus, "_target_script.R"), store = store_file_cellNexus, - reporter = "summary") + tar_make( + script = paste0(store_file_cellNexus, "_target_script.R"), + store = store_file_cellNexus, + reporter = "summary" #, callr_function = NULL + ) }) tar_make(script = paste0(store_file_cellNexus, "_target_script.R"), store = store_file_cellNexus, callr_function = NULL) -tar_workspaces(store = store_file_cellNexus) +tar_workspace(target_name_grouped_by_dataset_id, store = store_file_cellNexus, script = paste0(store_file_cellNexus, "_target_script.R")) -tar_read_raw(name = "dataset_id_sce_0b71a2a4781c9418", store = store_file_cellNexus) +job::job({ dataset_id_sce = tar_read_raw("dataset_id_sce_ed698af04de55aef", store = store_file_cellNexus) }) tar_meta(store = store_file_cellNexus) |> arrange(desc(time)) |> filter(!error |> is.na()) |> select(name, error) -tar_workspace("dataset_id_sce_b0a3540d0d2d257d", store = store_file_cellNexus, script = paste0(store_file_cellNexus, "_target_script.R")) - -# tar_invalidate(dataset_id_sce, store = store_file_cellNexus) - -# # PREPARE METADATA FINAL -# file_id_tbl = -# tbl( -# dbConnect(duckdb::duckdb(), dbdir = ":memory:"), -# sql("SELECT * FROM read_parquet('/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/cell_metadata_cell_type_consensus_v1_0_0.parquet')") -# ) |> -# distinct(dataset_id, cell_type_unified_ensemble) |> -# as_tibble() |> -# unite("file_id_cellNexus", c(dataset_id, cell_type_unified_ensemble), sep = "___", remove=FALSE) |> -# mutate(file_id_cellNexus = map_chr(file_id_cellNexus, digest::digest) ) -# - - -library(arrow) -library(dplyr) -library(duckdb) - - -# Get Dharmesh metadata consensus -system("~/bin/rclone copy box_adelaide:/Mangiola_ImmuneAtlas/reannotation_consensus/cell_annotation_new.parquet /vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/") - - - - -job::job({ - - get_file_ids = function(cell_annotation, cell_type_consensus_parquet){ - - cell_consensus = - tbl( - dbConnect(duckdb::duckdb(), dbdir = ":memory:"), - sql(glue::glue("SELECT * FROM read_parquet('{cell_type_consensus_parquet}')")) - ) |> - select(cell_, dataset_id, cell_type_unified_ensemble, cell_type_unified) - - - tbl( - dbConnect(duckdb::duckdb(), dbdir = ":memory:"), - sql(glue::glue("SELECT * FROM 
read_parquet('{cell_annotation}')")) - ) |> - left_join(cell_consensus, copy=TRUE) |> - - # # Make sure I cover cell type even if consensus of harmonisation is not present (it should be the vast minority) - # mutate(temp_cell_type_label_for_file_id = if_else(cell_type_unified_ensemble |> is.na(), cell_type, cell_type_unified)) |> - # mutate(temp_cell_type_label_for_file_id = if_else(temp_cell_type_label_for_file_id |> is.na(), cell_type, temp_cell_type_label_for_file_id)) |> - # - # Define chunks - group_by(dataset_id, cell_type_unified, sample_id) |> - summarise(cell_count = n(), .groups = "drop") |> - group_by(dataset_id, cell_type_unified) |> - dbplyr::window_order(desc(cell_count)) |> - mutate(chunk = cumsum(cell_count) %/% 20000 + 1) |> # max 20K cells per sample - ungroup() |> - as_tibble() |> - - mutate(file_id_cellNexus_single_cell = - glue::glue("{dataset_id}___{cell_type_unified}") |> - sapply(digest::digest) |> - paste0("___", chunk) - ) - } - - - get_file_ids( - "/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/cell_annotation.parquet", - "/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/cell_annotation_new.parquet" - ) |> - write_parquet("/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/file_id_cellNexus_single_cell.parquet") - - gc() - - con <- dbConnect(duckdb::duckdb(), dbdir = ":memory:") - - # Create a view for cell_annotation in DuckDB - dbExecute(con, " - CREATE VIEW cell_metadata AS - SELECT - CONCAT(cell_, '___', dataset_id) AS cell_, - dataset_id, - * - FROM read_parquet('/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/cell_metadata.parquet') -") - - # Create views for other tables - dbExecute(con, " - CREATE VIEW cell_consensus AS - SELECT * - FROM read_parquet('/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/cell_annotation_new.parquet') -") - - dbExecute(con, " - CREATE VIEW cell_annotation AS - SELECT cell_, blueprint_first_labels_fine, monaco_first_labels_fine, azimuth_predicted_celltype_l2 - FROM read_parquet('/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/annotation_tbl_light.parquet') -") - - dbExecute(con, " - CREATE VIEW empty_droplet_df AS - SELECT cell_, dataset_id, empty_droplet - FROM read_parquet('/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/cell_annotation.parquet') -") - - dbExecute(con, " - CREATE VIEW file_id_cellNexus_single_cell AS - SELECT dataset_id, cell_type_unified, sample_id, file_id_cellNexus_single_cell - FROM read_parquet('/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/file_id_cellNexus_single_cell.parquet') -") - - # Perform the left join and save to Parquet - copy_query <- " - COPY ( - SELECT * - FROM cell_metadata - - LEFT JOIN cell_annotation - ON cell_annotation.cell_ = cell_metadata.cell_ - - LEFT JOIN cell_consensus - ON cell_consensus.cell_ = cell_metadata.cell_ - AND cell_consensus.dataset_id = cell_metadata.dataset_id - - LEFT JOIN empty_droplet_df - ON empty_droplet_df.cell_ = cell_metadata.cell_ - AND empty_droplet_df.dataset_id = cell_metadata.dataset_id - - LEFT JOIN file_id_cellNexus_single_cell - ON file_id_cellNexus_single_cell.sample_id = cell_consensus.sample_id - AND file_id_cellNexus_single_cell.dataset_id = cell_consensus.dataset_id - AND file_id_cellNexus_single_cell.cell_type_unified = cell_consensus.cell_type_unified - - ) TO '/vast/projects/cellxgene_curated/cellNexus/cell_metadata_cell_type_consensus_v1_0_1.parquet' - (FORMAT PARQUET, COMPRESSION 'gzip'); -" - - # Execute the final query to 
write the result to a Parquet file - dbExecute(con, copy_query) - - # Disconnect from the database - dbDisconnect(con, shutdown = TRUE) - - system("~/bin/rclone copy /vast/projects/cellxgene_curated/cellNexus/cell_metadata_cell_type_consensus_v1_0_1.parquet box_adelaide:/Mangiola_ImmuneAtlas/taskforce_shared_folder/") - - -# #---------------- -# # Add File ID -# #---------------- -# -# file_id_tbl = -# tbl( -# dbConnect(duckdb::duckdb(), dbdir = ":memory:"), -# sql("SELECT * FROM read_parquet('/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/cell_metadata_cell_type_consensus_TEMP.parquet')") -# ) |> -# distinct(dataset_id, cell_type_unified_ensemble) |> -# as_tibble() |> -# unite("file_id_cellNexus", c(dataset_id, cell_type_unified_ensemble), sep = "___", remove=FALSE) |> -# mutate(file_id_cellNexus = map_chr(file_id_cellNexus, digest::digest) ) |> -# write_parquet("/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/cell_metadata_file_id_TEMP.parquet") -# -# -# con <- dbConnect(duckdb::duckdb(), dbdir = ":memory:") -# -# dbExecute(con, " -# CREATE VIEW cell_metadata_cell_type_consensus AS -# SELECT * -# FROM read_parquet('/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/cell_metadata_cell_type_consensus_TEMP.parquet') -# ") -# -# dbExecute(con, " -# CREATE VIEW cell_metadata_file_id AS -# SELECT * -# FROM read_parquet('/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/cell_metadata_file_id_TEMP.parquet') -# ") -# -# # Perform the join and save the result to Parquet -# join_query <- " -# COPY ( -# SELECT * -# FROM cell_metadata_cell_type_consensus -# -# LEFT JOIN cell_metadata_file_id -# ON cell_metadata_cell_type_consensus.dataset_id = cell_metadata_file_id.dataset_id -# AND cell_metadata_cell_type_consensus.cell_type_unified_ensemble = cell_metadata_file_id.cell_type_unified_ensemble -# -# ) TO '/vast/projects/cellxgene_curated/cellNexus/cell_metadata_cell_type_consensus_v1_0_0.parquet' -# (FORMAT PARQUET, COMPRESSION 'gzip'); -# " -# -# # Execute the join query -# dbExecute(con, join_query) -# -# # Disconnect from the database -# dbDisconnect(con, shutdown = TRUE) -# -# file.remove("/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/cell_metadata_file_id_TEMP.parquet") -# file.remove("/vast/projects/cellxgene_curated/metadata_cellxgenedp_Apr_2024/cell_metadata_cell_type_consensus_TEMP.parquet") - - }) - - - -# process_h5ad_files("/vast/projects/cellxgene_curated/cellxgene/", "/vast/projects/cellxgene_curated/cellNexus/cellxgene") +tar_workspace("saved_dataset_ed3c60d5343228ee", store = store_file_cellNexus, script = paste0(store_file_cellNexus, "_target_script.R")) -############## -# PLOTS # -############## +# tar_invalidate(target_name_grouped_by_dataset_id, store = store_file_cellNexus) +# tar_delete(dataset_id_sce, , store = store_file_cellNexus) +tar_read(dataset_id_sce, store = store_file_cellNexus) -cell_metadata = - tbl( - dbConnect(duckdb::duckdb(), dbdir = ":memory:"), - sql("SELECT * FROM read_parquet('/vast/projects/cellxgene_curated/cellNexus/cell_metadata_cell_type_consensus_v1_0_1.parquet')") - ) - -cell_metadata_v1_0_0 |> - filter(age_days > 365) |> - distinct(donor_id, tissue_groups) |> - ggplot(aes(fct_infreq(tissue_groups))) + - geom_bar() + - scale_y_log10() + - theme(axis.text.x = element_text(angle = 90, vjust = 0.5, hjust = 1)) - -cell_metadata_v1_0_0 |> - filter(age_days > 365) |> - distinct(sample_id, donor_id, tissue_groups) |> - as_tibble() |> - nest(data = -tissue_groups) |> - mutate(n_sample = 
map_int(data, ~ .x |> distinct(sample_id) |> nrow())) |> - mutate(n_donor = map_int(data, ~ .x |> distinct(donor_id) |> nrow())) |> - ggplot(aes(n_sample, n_donor)) + - geom_point() + - geom_text(aes(label = tissue_groups)) + - scale_y_log10() + - scale_x_log10() + - theme(axis.text.x = element_text(angle = 90, vjust = 0.5, hjust = 1))
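
A minimal sanity-check sketch, not part of the diffed scripts above: assuming the COPY query has written cell_metadata_cell_type_consensus_v1_0_3.parquet, this counts cells per file_id_cellNexus_single_cell to verify that the chunking in get_file_ids() keeps each file close to the intended ~20K-cell cap. Only the parquet path and column names are taken from the scripts; everything else is an illustrative, hypothetical usage example.

# Illustrative check only (assumption: the v1_0_3 parquet exists and carries
# the file_id_cellNexus_single_cell column produced by get_file_ids())
library(dplyr)
library(duckdb)
library(glue)

metadata_parquet = "/vast/projects/cellxgene_curated/cellNexus/cell_metadata_cell_type_consensus_v1_0_3.parquet"

con = dbConnect(duckdb::duckdb(), dbdir = ":memory:")

chunk_sizes =
  tbl(con, sql(glue("SELECT * FROM read_parquet('{metadata_parquet}')"))) |>
  filter(!file_id_cellNexus_single_cell |> is.na()) |>
  count(dataset_id, file_id_cellNexus_single_cell, name = "cell_count") |>
  as_tibble()

# Largest chunks first; counts can exceed 20K slightly because whole samples
# are never split across chunks
chunk_sizes |> arrange(desc(cell_count)) |> head(10)

dbDisconnect(con, shutdown = TRUE)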