bottleneck in base::file.info() for large pipelines #1403

Open
wlandau opened this issue Dec 17, 2024 · 6 comments

Comments

@wlandau
Member

wlandau commented Dec 17, 2024

#1398 was not the only bottleneck in the code that queries files in _targets/objects. base::file.info() itself is also a bottleneck for pipelines with large numbers of files. statx() and FTS (fts_open()/fts_read()) are much faster.

@wlandau wlandau self-assigned this Dec 17, 2024
@wlandau
Member Author

wlandau commented Dec 17, 2024

FTS apparently works on most POSIX systems. statx() is a bit faster but only works on some Linux systems (need to check which).
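For comparison, here is a minimal statx() sketch for a single path. It assumes Linux with glibc 2.28 or later, where the wrapper is declared in <sys/stat.h> when _GNU_SOURCE is defined (the syscall itself appeared in Linux 4.11). `file_info_statx()` is a hypothetical helper for illustration, not part of targets or any package.

```c
/* Sketch only: assumes Linux + glibc >= 2.28 (statx() wrapper available).
 * file_info_statx() is a hypothetical helper name. */
#define _GNU_SOURCE
#include <fcntl.h>      /* AT_FDCWD */
#include <sys/stat.h>   /* statx(), struct statx, STATX_* masks */
#include <stdint.h>

/* Look up the size (bytes) and modification time (seconds, including the
 * nanosecond fraction) of one path. Returns 0 on success, -1 on failure. */
int file_info_statx(const char *path, uint64_t *size, double *mtime) {
  struct statx buf;
  /* Request only the fields we need. */
  unsigned int mask = STATX_SIZE | STATX_MTIME;
  /* flags = 0: follow symbolic links, like stat(). */
  if (statx(AT_FDCWD, path, 0, mask, &buf) != 0) {
    return -1;
  }
  /* stx_mask says which requested fields the file system actually filled. */
  if ((buf.stx_mask & mask) != mask) {
    return -1;
  }
  *size = buf.stx_size;
  *mtime = (double) buf.stx_mtime.tv_sec + buf.stx_mtime.tv_nsec / 1e9;
  return 0;
}
```

Note that this still costs one call per path. Unlike FTS, it does not change how the directory is traversed, which is one reason FTS is the more portable option.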

@wlandau
Member Author

wlandau commented Dec 17, 2024

Could this arguably fit in autometric (on the grounds that it helps query log files)? Or even ps?

@wlandau
Member Author

wlandau commented Dec 17, 2024

FTS looks promising in my initial benchmarks: it's competitive with statx().

@wlandau
Member Author

wlandau commented Dec 18, 2024

A performant FTS implementation:

#include <R.h>
#include <Rinternals.h>
#include <fts.h>
#include <sys/stat.h>
#include <string.h>
#include <errno.h>

/* List every regular file under `directory` (a length-1 character vector)
 * and return list(path, size, mtime), with size in bytes and mtime in
 * whole seconds since the epoch. */
SEXP fts(SEXP directory) {
  const char *path_argv[] = {CHAR(STRING_ELT(directory, 0)), NULL};
  /* FTS_LOGICAL follows symbolic links and reports the files they point to. */
  FTS *fts = fts_open((char* const*) path_argv, FTS_LOGICAL, NULL);
  if (!fts) {
    Rf_error("fts_open() failed: %s", strerror(errno));
  }
  /* Preallocate the output vectors and double their length as needed. */
  int capacity = 2048;
  int count = 0;
  FTSENT *entry;
  SEXP path;
  SEXP size;
  SEXP mtime;
  PROTECT_INDEX index_path;
  PROTECT_INDEX index_size;
  PROTECT_INDEX index_mtime;
  PROTECT_WITH_INDEX(path = allocVector(STRSXP, capacity), &index_path);
  PROTECT_WITH_INDEX(size = allocVector(REALSXP, capacity), &index_size);
  PROTECT_WITH_INDEX(mtime = allocVector(REALSXP, capacity), &index_mtime);
  while ((entry = fts_read(fts)) != NULL) {
    /* Caveat: on a user interrupt, R longjmps out and fts_close() never runs. */
    R_CheckUserInterrupt();
    /* Keep regular files only; directories and entries that could not be
     * stat()ed (FTS_NS) are skipped. */
    if (entry->fts_info == FTS_F) {
      if (count == capacity) {
        capacity *= 2;
        REPROTECT(path = Rf_xlengthgets(path, capacity), index_path);
        REPROTECT(size = Rf_xlengthgets(size, capacity), index_size);
        REPROTECT(mtime = Rf_xlengthgets(mtime, capacity), index_mtime);
      }
      SET_STRING_ELT(path, count, mkChar(entry->fts_path));
      REAL(size)[count] = (double) entry->fts_statp->st_size;
      REAL(mtime)[count] = (double) entry->fts_statp->st_mtime;
      count++;
    }
  }
  fts_close(fts);
  /* Trim the vectors to the number of files actually found. */
  REPROTECT(path = Rf_xlengthgets(path, count), index_path);
  REPROTECT(size = Rf_xlengthgets(size, count), index_size);
  REPROTECT(mtime = Rf_xlengthgets(mtime, count), index_mtime);
  /* Assemble the named list(path, size, mtime). */
  SEXP result = PROTECT(allocVector(VECSXP, 3));
  SEXP names = PROTECT(allocVector(STRSXP, 3));
  SET_STRING_ELT(names, 0, mkChar("path"));
  SET_STRING_ELT(names, 1, mkChar("size"));
  SET_STRING_ELT(names, 2, mkChar("mtime"));
  SET_VECTOR_ELT(result, 0, path);
  SET_VECTOR_ELT(result, 1, size);
  SET_VECTOR_ELT(result, 2, mtime);
  setAttrib(result, R_NamesSymbol, names);
  UNPROTECT(5); /* path, size, mtime, result, names */
  return result;
}
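The implementation above records st_mtime in whole seconds and silently drops entries that FTS could not stat, whereas base::file.info() can report sub-second modification times on platforms that support them. The following is a minimal R-free sketch of the same FTS_LOGICAL walk that keeps the nanosecond part and reports errors instead of skipping them. It assumes Linux/glibc, where struct stat exposes the POSIX.1-2008 st_mtim field (macOS names it st_mtimespec). `walk_files()` is a hypothetical helper that returns a summary rather than per-file vectors to keep it short.

```c
/* Sketch only: assumes Linux/glibc (st_mtim available).
 * walk_files() is a hypothetical helper name. */
#define _DEFAULT_SOURCE
#include <sys/types.h>
#include <sys/stat.h>
#include <fts.h>
#include <errno.h>
#include <stddef.h>

/* Walk `root`, counting regular files, summing their sizes, and tracking
 * the newest mtime with sub-second precision. Returns 0 on success, or -1
 * if fts_open() fails or any entry could not be read or stat()ed. */
int walk_files(const char *root, size_t *n_files, double *total_size,
               double *newest_mtime) {
  char *argv[] = {(char *) root, NULL};
  /* Same option as the R version: follow symbolic links. */
  FTS *fts = fts_open(argv, FTS_LOGICAL, NULL);
  if (fts == NULL) return -1;
  *n_files = 0;
  *total_size = 0;
  *newest_mtime = 0;
  int status = 0;
  FTSENT *entry;
  while ((entry = fts_read(fts)) != NULL) {
    switch (entry->fts_info) {
      case FTS_F: {
        const struct stat *st = entry->fts_statp;
        double mtime = (double) st->st_mtim.tv_sec + st->st_mtim.tv_nsec / 1e9;
        (*n_files)++;
        *total_size += (double) st->st_size;
        if (mtime > *newest_mtime) *newest_mtime = mtime;
        break;
      }
      case FTS_NS:   /* stat() failed for this entry */
      case FTS_DNR:  /* a directory could not be read */
      case FTS_ERR:  /* other error; entry->fts_errno has the cause */
        status = -1;
        break;
      default:       /* directories, post-order visits, etc. */
        break;
    }
  }
  /* fts_read() returns NULL both at the end (errno == 0) and on error. */
  if (errno != 0) status = -1;
  fts_close(fts);
  return status;
}
```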

@wlandau
Member Author

wlandau commented Dec 18, 2024

The results depend heavily on the file system and on the state of its caches, but I'm seeing improvements. Here is a reprex on an old, slow file system on a shared RHEL 9 cluster:

directory <- "files"
if (!file.exists(directory)) {
  dir.create(directory)
}
files <- seq_len(5e4)
random_data <- function() {
  out <- list()
  rows <- sample(seq(from = 800, to = 1200), size = 1)
  for (name in paste0("x", seq_len(32L))) {
    out[[name]] <- runif(rows)
  }
  as.data.frame(out)
}
temp <- lapply(files, \(file) {
  if (!(file %% 100)) print(file)
  saveRDS(
    object = random_data(),
    file = file.path(directory, paste0("test-data-", file, ".rds")),
    compress = FALSE
  )
})

paths <- list.files("files", full.names = TRUE)
system.time(out_base <- file.info(paths, extra_cols = FALSE))
#>   user  system elapsed 
#>  0.033   0.345 102.515 

file_info_fts <- inline::cfunction(
  sig = c(directory = "character"),
  includes = c(
    "#include <R.h>",
    "#include <Rinternals.h>",
    "#include <fts.h>",
    "#include <sys/stat.h>",
    "#include <string.h>",
    "#include <errno.h>"
  ),
  body = "const char *path_argv[] = {CHAR(STRING_ELT(directory, 0)), NULL};
    FTS *fts = fts_open((char* const*) path_argv, FTS_LOGICAL, NULL);
    if (!fts) {
      Rf_error(\"fts_open() failed: %s\", strerror(errno));
    }
    int capacity = 2048;
    int count = 0;
    FTSENT *entry;
    SEXP path;
    SEXP size;
    SEXP mtime;
    PROTECT_INDEX index_path;
    PROTECT_INDEX index_size;
    PROTECT_INDEX index_mtime;
    PROTECT_WITH_INDEX(path = allocVector(STRSXP, capacity), &index_path);
    PROTECT_WITH_INDEX(size = allocVector(REALSXP, capacity), &index_size);
    PROTECT_WITH_INDEX(mtime = allocVector(REALSXP, capacity), &index_mtime);
    while ((entry = fts_read(fts)) != NULL) {
      R_CheckUserInterrupt();
      if (entry->fts_info == FTS_F) {
        if (count == capacity) {
          capacity *= 2;
          REPROTECT(path = Rf_xlengthgets(path, capacity), index_path);
          REPROTECT(size = Rf_xlengthgets(size, capacity), index_size);
          REPROTECT(mtime = Rf_xlengthgets(mtime, capacity), index_mtime);
        }
        SET_STRING_ELT(path, count, mkChar(entry->fts_path));
        REAL(size)[count] = (double) entry->fts_statp->st_size;
        REAL(mtime)[count] = (double) entry->fts_statp->st_mtime;
        count++;
      }
    }
    fts_close(fts);
    REPROTECT(path = Rf_xlengthgets(path, count), index_path);
    REPROTECT(size = Rf_xlengthgets(size, count), index_size);
    REPROTECT(mtime = Rf_xlengthgets(mtime, count), index_mtime);
    SEXP result = PROTECT(allocVector(VECSXP, 3));
    SEXP names = PROTECT(allocVector(STRSXP, 3));
    SET_STRING_ELT(names, 0, mkChar(\"path\"));
    SET_STRING_ELT(names, 1, mkChar(\"size\"));
    SET_STRING_ELT(names, 2, mkChar(\"mtime\"));
    SET_VECTOR_ELT(result, 0, path);
    SET_VECTOR_ELT(result, 1, size);
    SET_VECTOR_ELT(result, 2, mtime);
    setAttrib(result, R_NamesSymbol, names);
    UNPROTECT(5);
    return result;"
)

system.time(out_fts <- file_info_fts("files"))
#>   user  system elapsed 
#>  0.010   0.100   4.497 
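Because cache state matters so much, one way to separate the cost of the lookups from R overhead is to time a C-level per-path baseline on the same paths, in both orders relative to the FTS walk. The sketch below issues one independent stat() per path, which is roughly the pattern base::file.info() follows for a vector of paths. `stat_each_path()` is a hypothetical helper, not part of base R or targets, and it assumes a POSIX system with CLOCK_MONOTONIC.

```c
/* Sketch only: a per-path stat() baseline with wall-clock timing.
 * stat_each_path() is a hypothetical helper name. */
#define _POSIX_C_SOURCE 200809L
#include <sys/stat.h>
#include <stddef.h>
#include <time.h>

/* Fill sizes[i] with the size in bytes of paths[i], or -1 if stat() fails.
 * Returns the elapsed wall-clock time in seconds. */
double stat_each_path(const char *const *paths, size_t n, double *sizes) {
  struct timespec start, end;
  clock_gettime(CLOCK_MONOTONIC, &start);
  for (size_t i = 0; i < n; i++) {
    struct stat st;
    /* One independent lookup per path, in the order the caller supplies. */
    sizes[i] = (stat(paths[i], &st) == 0) ? (double) st.st_size : -1.0;
  }
  clock_gettime(CLOCK_MONOTONIC, &end);
  return (double) (end.tv_sec - start.tv_sec) +
         (double) (end.tv_nsec - start.tv_nsec) / 1e9;
}
```

If this baseline is also slow on the same file system, the gap is mostly in the lookups rather than in R.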

@wlandau
Member Author

wlandau commented Dec 20, 2024

I proposed a custom implementation in wlandau/autometric#5 and wlandau/autometric#6. Ideally, though, this should have a robust home in fs: r-lib/fs#484.
