Skip to content

New AWS jobstore. #5123

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .gitlab-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,6 @@ py313_main:
- make test threads="${TEST_THREADS}" tests="src/toil/test/src src/toil/test/utils"
- TOIL_SKIP_DOCKER=true make test threads="${TEST_THREADS}" tests=src/toil/test/lib


slurm_test:
rules:
- if: $CI_PIPELINE_SOURCE == "schedule"
Expand Down
4 changes: 2 additions & 2 deletions contrib/admin/mypy-with-ignore.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ def main():

# TODO: Remove these paths as typing is added and mypy conflicts are addressed.
# These are handled as path prefixes.
ignore_paths = [os.path.abspath(f) for f in [

ignore_paths = [os.path.join(pkg_root, f) for f in [
'docs/_build',
'docker/Dockerfile.py',
'docs/conf.py',
Expand Down Expand Up @@ -66,7 +67,6 @@ def main():
'src/toil/jobStores/aws/__init__.py',
'src/toil/utils/__init__.py',
'src/toil/lib/throttle.py',
'src/toil/lib/iterables.py',
'src/toil/lib/bioio.py',
'src/toil/lib/ec2.py',
'src/toil/lib/expando.py',
Expand Down
1 change: 0 additions & 1 deletion src/toil/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,6 @@ class JobDescription(Requirer):
Subclassed into variants for checkpoint jobs and service jobs that have
their specific parameters.
"""

def __init__(
self,
requirements: Mapping[str, Union[int, str, bool]],
Expand Down
35 changes: 18 additions & 17 deletions src/toil/jobStores/abstractJobStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
Optional,
Union,
cast,
overload,
overload
)
from urllib.error import HTTPError
from urllib.parse import ParseResult, urlparse
Expand Down Expand Up @@ -88,7 +88,7 @@ def __init__(self, url: ParseResult) -> None:
class NoSuchJobException(Exception):
"""Indicates that the specified job does not exist."""

def __init__(self, jobStoreID: FileID):
def __init__(self, jobStoreID: Union[FileID, str]):
"""
:param str jobStoreID: the jobStoreID that was mistakenly assumed to exist
"""
Expand All @@ -98,7 +98,7 @@ def __init__(self, jobStoreID: FileID):
class ConcurrentFileModificationException(Exception):
"""Indicates that the file was attempted to be modified by multiple processes at once."""

def __init__(self, jobStoreFileID: FileID):
def __init__(self, jobStoreFileID: Union[FileID, str]):
"""
:param jobStoreFileID: the ID of the file that was modified by multiple workers
or processes concurrently
Expand All @@ -110,7 +110,7 @@ class NoSuchFileException(Exception):
"""Indicates that the specified file does not exist."""

def __init__(
self, jobStoreFileID: FileID, customName: Optional[str] = None, *extra: Any
self, jobStoreFileID: Union[FileID, str], customName: Optional[str] = None, *extra: Any
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a good reason to allow this to be a plain string now? Is there something in the new implementation that makes it too hard to carry the typed file ID object through?

When this is a string, is it meant to be the string-packed version of the file ID, or just the ID part without, e.g., the size packed in?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess some of the user-facing file ID API still accepts file IDs typed as strings, so maybe we do still need to be able to take them here.

):
"""
:param jobStoreFileID: the ID of the file that was mistakenly assumed to exist
Expand Down Expand Up @@ -138,11 +138,12 @@ class NoSuchJobStoreException(LocatorException):
def __init__(self, locator: str, prefix: str):
"""
:param str locator: The location of the job store
:param str prefix: The type of job store
"""
super().__init__(
"The job store '%s' does not exist, so there is nothing to restart.",
locator,
prefix,
prefix
)


Expand All @@ -157,7 +158,7 @@ def __init__(self, locator: str, prefix: str):
"The job store '%s' already exists. Use --restart to resume the workflow, or remove "
"the job store with 'toil clean' to start the workflow from scratch.",
locator,
prefix,
prefix
)


Expand Down Expand Up @@ -224,7 +225,7 @@ def write_config(self) -> None:
) as fileHandle:
pickle.dump(self.__config, fileHandle, pickle.HIGHEST_PROTOCOL)

def resume(self) -> None:
def resume(self, sse_key_path: Optional[str] = None) -> None:
"""
Connect this instance to the physical storage it represents and load the Toil configuration
into the :attr:`AbstractJobStore.config` attribute.
Expand Down Expand Up @@ -748,7 +749,6 @@ def _open_url(cls, url: ParseResult) -> IO[bytes]:
"""
raise NotImplementedError(f"No implementation for {url}")

@classmethod
@abstractmethod
def _write_to_url(
cls,
Expand Down Expand Up @@ -1155,15 +1155,6 @@ def assign_job_id(self, job_description: JobDescription) -> None:
"""
raise NotImplementedError()

@contextmanager
def batch(self) -> Iterator[None]:
"""
If supported by the batch system, calls to create() with this context
manager active will be performed in a batch after the context manager
is released.
"""
yield

@deprecated(new_function_name="create_job")
def create(self, jobDescription: JobDescription) -> JobDescription:
return self.create_job(jobDescription)
Expand Down Expand Up @@ -1261,6 +1252,15 @@ def load_job(self, job_id: str) -> JobDescription:
def update(self, jobDescription: JobDescription) -> None:
return self.update_job(jobDescription)

@contextmanager
def batch(self) -> Iterator[None]:
"""
If supported by the batch system, calls to create() with this context
manager active will be performed in a batch after the context manager
is released.
"""
yield

@abstractmethod
def update_job(self, job_description: JobDescription) -> None:
"""
Expand Down Expand Up @@ -1502,6 +1502,7 @@ def read_file_stream(
) -> ContextManager[IO[str]]: ...

@abstractmethod
@contextmanager # type: ignore
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we really only need/want `@contextmanager` on the implementation? It doesn't help on the unimplemented abstract stub, and it apparently forces us to opt out of type checking with `# type: ignore`.

def read_file_stream(
self,
file_id: Union[FileID, str],
Expand Down
Loading
Loading