-
Notifications
You must be signed in to change notification settings - Fork 75
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
bottleneck in base::file.info() for large pipelines #1403
Comments
FTS works on most POSIX systems, apparently. statx |
Could this arguably fit in |
FTS looks promising in my initial benchmarks. It's competitive with statx. |
performant FTS implementation: #include <R.h>
#include <Rinternals.h>
#include <fts.h>
#include <sys/stat.h>
#include <string.h>
#include <errno.h>
SEXP fts(SEXP directory) {
const char *path_argv[] = {CHAR(STRING_ELT(directory, 0)), NULL};
FTS *fts = fts_open((char* const*) path_argv, FTS_LOGICAL, NULL);
if (!fts) {
Rf_error("fts_open() failed: %s", strerror(errno));
}
int capacity = 2048;
int count = 0;
FTSENT *entry;
SEXP path;
SEXP size;
SEXP mtime;
PROTECT_INDEX index_path;
PROTECT_INDEX index_size;
PROTECT_INDEX index_mtime;
PROTECT_WITH_INDEX(path = allocVector(STRSXP, capacity), &index_path);
PROTECT_WITH_INDEX(size = allocVector(REALSXP, capacity), &index_size);
PROTECT_WITH_INDEX(mtime = allocVector(REALSXP, capacity), &index_mtime);
while ((entry = fts_read(fts)) != NULL) {
R_CheckUserInterrupt();
if (entry->fts_info == FTS_F) {
if (count == capacity) {
capacity *= 2;
REPROTECT(path = Rf_xlengthgets(path, capacity), index_path);
REPROTECT(size = Rf_xlengthgets(size, capacity), index_size);
REPROTECT(mtime = Rf_xlengthgets(mtime, capacity), index_mtime);
}
SET_STRING_ELT(path, count, mkChar(entry->fts_path));
REAL(size)[count] = (double) entry->fts_statp->st_size;
REAL(mtime)[count] = (double) entry->fts_statp->st_mtime;
count++;
}
}
fts_close(fts);
REPROTECT(path = Rf_xlengthgets(path, count), index_path);
REPROTECT(size = Rf_xlengthgets(size, count), index_size);
REPROTECT(mtime = Rf_xlengthgets(mtime, count), index_mtime);
SEXP result = PROTECT(allocVector(VECSXP, 3));
SEXP names = PROTECT(allocVector(STRSXP, 3));
SET_STRING_ELT(names, 0, mkChar("path"));
SET_STRING_ELT(names, 1, mkChar("size"));
SET_STRING_ELT(names, 2, mkChar("mtime"));
SET_VECTOR_ELT(result, 0, path);
SET_VECTOR_ELT(result, 1, size);
SET_VECTOR_ELT(result, 2, mtime);
setAttrib(result, R_NamesSymbol, names);
UNPROTECT(5);
return result;
} |
It's highly dependent on the file system, and on the state of the file system caches, but I'm seeing improvements. Reprex on an old slow file system on a shared RHEL9 cluster: directory <- "files"
# Create the benchmark directory the first time the script runs.
if (isFALSE(file.exists(directory))) {
  dir.create(directory)
}
# Indices of the test files to generate (one file per index).
files <- seq_len(50000L)
# Generate a data frame of uniform random numbers with 32 columns
# (x1, ..., x32) and a random number of rows between 800 and 1200.
random_data <- function() {
  n_rows <- sample(800:1200, size = 1)
  col_names <- paste0("x", seq_len(32L))
  # Columns are drawn in order x1..x32, one runif() call per column.
  columns <- lapply(col_names, function(unused) runif(n_rows))
  names(columns) <- col_names
  as.data.frame(columns)
}
# Write one uncompressed RDS file of random data per index, printing
# the index every 100 files as a progress indicator.
temp <- lapply(files, function(file) {
  if (file %% 100 == 0) {
    print(file)
  }
  target <- file.path(directory, paste0("test-data-", file, ".rds"))
  saveRDS(object = random_data(), file = target, compress = FALSE)
})
# Baseline: time base::file.info() over every generated file.
paths <- list.files(path = "files", full.names = TRUE)
system.time(out_base <- file.info(paths, extra_cols = FALSE))
#> user system elapsed
#> 0.033 0.345 102.515
# Compile an fts(3)-based directory scanner with the inline package.
#
# file_info_fts(directory) walks `directory` (first element only,
# following symbolic links) and returns a named list with elements
# `path` (character), `size` (double), and `mtime` (double, whole
# seconds), one entry per regular file found.
#
# The traversal runs under R_ExecWithCleanup() so that the FTS handle is
# closed even if the user interrupts the scan or an R allocation fails,
# and fts_read() failures are reported instead of silently returning a
# truncated listing. Raw strings (R >= 4.0) keep the C code free of
# escaped quotes.
file_info_fts <- inline::cfunction(
  sig = c(directory = "character"),
  includes = c(
    "#include <R.h>",
    "#include <Rinternals.h>",
    "#include <fts.h>",
    "#include <sys/stat.h>",
    "#include <string.h>",
    "#include <errno.h>"
  ),
  otherdefs = r"---(
/* Cleanup callback: close the FTS handle on normal exit or unwind. */
static void fts_walk_cleanup(void *data) {
  FTS *handle = (FTS *) data;
  if (handle != NULL) {
    fts_close(handle);
  }
}

/* Collect path, size, and mtime of every regular file in the walk. */
static SEXP fts_walk_collect(void *data) {
  FTS *handle = (FTS *) data;
  R_xlen_t capacity = 2048;
  R_xlen_t count = 0;
  FTSENT *entry;
  SEXP path;
  SEXP size;
  SEXP mtime;
  PROTECT_INDEX index_path;
  PROTECT_INDEX index_size;
  PROTECT_INDEX index_mtime;
  PROTECT_WITH_INDEX(path = allocVector(STRSXP, capacity), &index_path);
  PROTECT_WITH_INDEX(size = allocVector(REALSXP, capacity), &index_size);
  PROTECT_WITH_INDEX(mtime = allocVector(REALSXP, capacity), &index_mtime);
  while ((entry = fts_read(handle)) != NULL) {
    R_CheckUserInterrupt();
    if (entry->fts_info == FTS_F) {
      if (count == capacity) {
        capacity *= 2;
        REPROTECT(path = Rf_xlengthgets(path, capacity), index_path);
        REPROTECT(size = Rf_xlengthgets(size, capacity), index_size);
        REPROTECT(mtime = Rf_xlengthgets(mtime, capacity), index_mtime);
      }
      SET_STRING_ELT(path, count, mkChar(entry->fts_path));
      REAL(size)[count] = (double) entry->fts_statp->st_size;
      REAL(mtime)[count] = (double) entry->fts_statp->st_mtime;
      count++;
    }
  }
  /* NULL with errno == 0 means end of hierarchy; nonzero is a failure. */
  int read_errno = errno;
  if (read_errno != 0) {
    Rf_error("fts_read() failed: %s", strerror(read_errno));
  }
  REPROTECT(path = Rf_xlengthgets(path, count), index_path);
  REPROTECT(size = Rf_xlengthgets(size, count), index_size);
  REPROTECT(mtime = Rf_xlengthgets(mtime, count), index_mtime);
  SEXP result = PROTECT(allocVector(VECSXP, 3));
  SEXP names = PROTECT(allocVector(STRSXP, 3));
  SET_STRING_ELT(names, 0, mkChar("path"));
  SET_STRING_ELT(names, 1, mkChar("size"));
  SET_STRING_ELT(names, 2, mkChar("mtime"));
  SET_VECTOR_ELT(result, 0, path);
  SET_VECTOR_ELT(result, 1, size);
  SET_VECTOR_ELT(result, 2, mtime);
  setAttrib(result, R_NamesSymbol, names);
  UNPROTECT(5);
  return result;
}
)---",
  body = r"---(
  if (!Rf_isString(directory) || Rf_length(directory) < 1 ||
      STRING_ELT(directory, 0) == NA_STRING) {
    Rf_error("directory must be a non-missing character string");
  }
  const char *path_argv[] = {CHAR(STRING_ELT(directory, 0)), NULL};
  FTS *handle = fts_open((char* const*) path_argv, FTS_LOGICAL, NULL);
  if (!handle) {
    Rf_error("fts_open() failed: %s", strerror(errno));
  }
  /* Guarantee fts_close() even if the traversal longjmps out. */
  return R_ExecWithCleanup(fts_walk_collect, handle, fts_walk_cleanup, handle);
)---"
)
# Time the FTS-based scan over the same directory for comparison.
system.time(out_fts <- file_info_fts(directory = "files"))
#> user system elapsed
#> 0.010 0.100 4.497
I proposed a custom implementation in wlandau/autometric#5 and wlandau/autometric#6. Ideally though, there should be a robust home in |
#1398 was not the only bottleneck in the part of the code that queries files in `_targets/objects`. `base::file.info()` itself is also a bottleneck for pipelines with large numbers of files. `statx()` and FTS (`fts_open()`/`fts_read()`) are a lot faster.
The text was updated successfully, but these errors were encountered: