Skip to content

Commit 2fe1d65

Browse files
committed
Implement other ToilFsAccess operations without local copies
1 parent c5d5167 commit 2fe1d65

File tree

6 files changed

+210
-113
lines changed

6 files changed

+210
-113
lines changed

src/toil/cwl/cwltoil.py

Lines changed: 104 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1293,26 +1293,26 @@ def open(self, fn: str, mode: str) -> IO[Any]:
12931293
if parse.scheme in ["", "file"]:
12941294
# Handle local files
12951295
return open(self._abs(path), mode)
1296-
elif parse.scheme in ["toilfile", "toildir"]:
1296+
elif parse.scheme == "toildir":
1297+
contents, subpath, cache_key = decode_directory(path)
1298+
if cache_key in self.dir_to_download:
1299+
# This is already available locally, so fall back on the local copy
1300+
return open(self._abs(path), mode)
1301+
else:
1302+
# We need to get the URI out of the virtual directory
1303+
if subpath is None:
1304+
raise RuntimeError(f"{fn} is a toildir directory")
1305+
uri = get_from_structure(contents, subpath)
1306+
if not isinstance(uri, str):
1307+
raise RuntimeError(f"{fn} does not point to a file")
1308+
# Recurse on that URI
1309+
return self.open(uri, mode)
1310+
elif parse.scheme == "toilfile":
12971311
if self.file_store is None:
12981312
raise RuntimeError("URL requires a file store: " + fn)
1299-
1300-
if parse.scheme == "toildir":
1301-
contents, subpath, cache_key = decode_directory(path)
1302-
if cache_key in self.dir_to_download:
1303-
# This is already available locally, so fall back on the local copy
1304-
return open(self._abs(path), mode)
1305-
else:
1306-
# We need to get the URI out of the virtual directory
1307-
uri = get_from_structure(contents, subpath)
1308-
if not isinstance(uri, str):
1309-
raise RuntimeError(f"{fn} does not point to a file")
1310-
# Recurse on that URI
1311-
return self.open(uri, mode)
1312-
elif parse.scheme == "toilfile":
1313-
file_id = FileID.unpack(fn[len("toilfile:") :])
1314-
encoding = None if "b" in mode else "utf-8"
1315-
return self.file_store.readGlobalFileStream(file_id, encoding)
1313+
file_id = FileID.unpack(fn[len("toilfile:") :])
1314+
encoding = None if "b" in mode else "utf-8"
1315+
return self.file_store.readGlobalFileStream(file_id, encoding)
13161316
else:
13171317
# This should be supported by a job store.
13181318
byte_stream = AbstractJobStore.open_url(fn)
@@ -1325,61 +1325,102 @@ def open(self, fn: str, mode: str) -> IO[Any]:
13251325

13261326
def exists(self, path: str) -> bool:
    """
    Test for file existence.

    Dispatches on the URI scheme of ``path``:

    * no scheme / ``file``: checks the local filesystem via ``_abs()``.
    * ``toildir``: looks the sub-path up in the decoded directory
      structure; file entries are re-checked through their own URI.
    * ``toilfile``: assumed to always exist (see the TODO below).
    * anything else: delegated to the job store URL support.

    :param path: Local path or URI to test.
    :returns: True if the target exists, False otherwise.
    """
    # Every branch must use the ``path`` parameter; this method has no
    # ``fn`` name in scope.
    parse = urlparse(path)
    if parse.scheme in ["", "file"]:
        # Handle local files
        # toil's _abs() throws errors when files are not found and cwltool's _abs() does not
        try:
            return os.path.exists(self._abs(path))
        except NoSuchFileException:
            return False
    elif parse.scheme == "toildir":
        contents, subpath, _ = decode_directory(path)
        if subpath is None:
            # The toildir directory itself exists
            return True
        found = get_from_structure(contents, subpath)
        if found is None:
            # It's not in the virtual directory, so it doesn't exist
            return False
        if isinstance(found, dict):
            # A dict entry is a subdirectory of the virtual directory; it
            # has no URI of its own to poll, but it does exist.
            return True
        # We recurse and poll the URI directly to make sure it really exists
        return self.exists(found)
    elif parse.scheme == "toilfile":
        # TODO: we assume CWL can't call deleteGlobalFile and so the file always exists
        return True
    else:
        # This should be supported by a job store.
        return AbstractJobStore.url_exists(path)
13341353

13351354
def size(self, path: str) -> int:
    """
    Return the size in bytes of the file that ``path`` refers to.

    :param path: Local path, or a ``file:``, ``toildir:``, ``toilfile:`` or
        other job-store-supported URI.
    :returns: The size of the file.
    :raises RuntimeError: if ``path`` is a ``toildir:`` directory rather than
        a file inside one, if a ``toilfile:`` URI is used without a file
        store, or if the job store cannot report a size.
    """
    scheme = urlparse(path).scheme

    if scheme in ["", "file"]:
        return os.stat(self._abs(path)).st_size

    if scheme == "toildir":
        # Decode the directory contents, the path inside it to the file
        # (if any), and the key used for caching the directory.
        contents, subpath, cache_key = decode_directory(path)

        # A bare directory has no size we can report.
        if subpath is None:
            raise RuntimeError(f"Attempted to check size of directory {path}")

        target = get_from_structure(contents, subpath)

        # Only a string entry is a URI we can measure.
        if not isinstance(target, str):
            raise RuntimeError(f"Did not find a file at {path}")
        return self.size(target)

    if scheme == "toilfile":
        if self.file_store is None:
            raise RuntimeError("URL requires a file store: " + path)
        file_id = FileID.unpack(path[len("toilfile:") :])
        return self.file_store.getGlobalFileSize(file_id)

    # This should be supported by a job store.
    reported = AbstractJobStore.get_size(path)
    if reported is None:
        # get_size can be unimplemented or unavailable
        raise RuntimeError(f"Could not get size of {path}")
    return reported
13691386

13701387
def isfile(self, fn: str) -> bool:
    """
    Test whether ``fn`` refers to an existing file.

    :param fn: Local path, or a ``file:``, ``toildir:``, ``toilfile:`` or
        other job-store-supported URI.
    :returns: True if ``fn`` exists and is a file, False otherwise.
    """
    if not self.exists(fn):
        # Nonexistent things aren't files
        return False
    parse = urlparse(fn)
    if parse.scheme in ["file", ""]:
        # Use the ``fn`` parameter; this method has no ``path`` in scope.
        return os.path.isfile(self._abs(fn))
    elif parse.scheme == "toilfile":
        return True
    elif parse.scheme == "toildir":
        contents, subpath, _ = decode_directory(fn)
        if subpath is None:
            # This is the toildir directory itself
            return False
        found = get_from_structure(contents, subpath)
        # If we find a string, that's a file
        return isinstance(found, str)
    else:
        return not AbstractJobStore.get_is_directory(fn)
13771406

13781407
def isdir(self, fn: str) -> bool:
    """
    Test whether ``fn`` refers to an existing directory.

    :param fn: Local path, or a ``file:``, ``toildir:``, ``toilfile:`` or
        other job-store-supported URI.
    :returns: True if ``fn`` exists and is a directory, False otherwise.
    """
    if not self.exists(fn):
        # Nonexistent things aren't directories
        return False
    parse = urlparse(fn)
    if parse.scheme in ["file", ""]:
        # Use the ``fn`` parameter; this method has no ``path`` in scope.
        return os.path.isdir(self._abs(fn))
    elif parse.scheme == "toilfile":
        return False
    elif parse.scheme == "toildir":
        contents, subpath, _ = decode_directory(fn)
        if subpath is None:
            # This is the toildir directory itself
            return True
        found = get_from_structure(contents, subpath)
        # If we find a dict, that's a directory
        return isinstance(found, dict)
    else:
        return AbstractJobStore.get_is_directory(fn)
13851426

@@ -1389,12 +1430,19 @@ def listdir(self, fn: str) -> List[str]:
13891430
logger.debug("ToilFsAccess listing %s", fn)
13901431

13911432
parse = urlparse(fn)
1392-
if parse.scheme in ["toilfile", "toildir", "file", ""]:
1393-
# Download the file or directory to a local path
1433+
if parse.scheme in ["file", ""]:
1434+
# Find the local path
13941435
directory = self._abs(fn)
1395-
13961436
# Now list it (it is probably a directory)
13971437
return [abspath(quote(entry), fn) for entry in os.listdir(directory)]
1438+
elif parse.scheme == "toilfile":
1439+
raise RuntimeError(f"Cannot list a file: {fn}")
1440+
elif parse.scheme == "toildir":
1441+
here, subpath, cache_key = decode_directory(path)
1442+
if subpath is not None:
1443+
here = get_from_structure(contents, here)
1444+
# List all the things in here and make full URIs to them
1445+
return [os.path.join(fn, k) for k in here.keys()]
13981446
else:
13991447
return [
14001448
os.path.join(fn, entry.rstrip("/"))

0 commit comments

Comments
 (0)