Log more usefully for CWL workflows #4736

Merged: 10 commits, Jan 16, 2024
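The change is twofold: the file store's logToMaster() method is renamed to log_to_leader() (the old name survives as a deprecated alias), and the CWL runner gains INFO-level progress messages for importing inputs, staging outputs, and the major workflow phases. As orientation, here is a minimal sketch of the renamed call from a user-defined job; the job body and job store path are illustrative, not taken from this PR:

import logging

from toil.common import Toil
from toil.job import Job


def hello(job):
    # New snake_case spelling; the message is relayed to the leader's log.
    job.fileStore.log_to_leader("Starting work", level=logging.INFO)
    # The old spelling still works, but now routes through a deprecated shim:
    # job.fileStore.logToMaster("Starting work")


if __name__ == "__main__":
    options = Job.Runner.getDefaultOptions("file:my-jobstore")
    with Toil(options) as toil:
        toil.start(Job.wrapJobFn(hello))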
8 changes: 4 additions & 4 deletions attic/toil-sort-example.py
@@ -14,7 +14,7 @@ def setup(job, input_file_id, n, down_checkpoints):
Returns the FileID of the sorted file
"""
# Write the input file to the file store
job.fileStore.logToMaster("Starting the merge sort")
job.fileStore.log_to_leader("Starting the merge sort")
return job.addChildJobFn(down,
input_file_id, n,
down_checkpoints=down_checkpoints,
@@ -33,7 +33,7 @@ def down(job, input_file_id, n, down_checkpoints):
length = os.path.getsize(input_file)
if length > n:
# We will subdivide the file
job.fileStore.logToMaster("Splitting file: %s of size: %s"
job.fileStore.log_to_leader("Splitting file: %s of size: %s"
% (input_file_id, length), level=logging.CRITICAL)
# Split the file into two copies
mid_point = get_midpoint(input_file, 0, length)
@@ -52,7 +52,7 @@ def down(job, input_file_id, n, down_checkpoints):
memory='600M').rv()).rv()
else:
# We can sort this bit of the file
job.fileStore.logToMaster("Sorting file: %s of size: %s"
job.fileStore.log_to_leader("Sorting file: %s of size: %s"
% (input_file_id, length), level=logging.CRITICAL)
# Sort the copy and write back to the fileStore
output_file = job.fileStore.getLocalTempFile()
@@ -66,7 +66,7 @@ def up(job, input_file_id_1, input_file_id_2):
with job.fileStore.writeGlobalFileStream() as (fileHandle, output_id):
with job.fileStore.readGlobalFileStream(input_file_id_1) as inputFileHandle1:
with job.fileStore.readGlobalFileStream(input_file_id_2) as inputFileHandle2:
job.fileStore.logToMaster("Merging %s and %s to %s"
job.fileStore.log_to_leader("Merging %s and %s to %s"
% (input_file_id_1, input_file_id_2, output_id))
merge(inputFileHandle1, inputFileHandle2, fileHandle)

58 changes: 48 additions & 10 deletions src/toil/cwl/cwltoil.py
@@ -1719,6 +1719,7 @@ def import_files(
skip_broken: bool = False,
skip_remote: bool = False,
bypass_file_store: bool = False,
+log_level: int = logging.DEBUG
) -> None:
"""
Prepare all files and directories.
@@ -1761,12 +1762,21 @@

:param bypass_file_store: If True, leave file:// URIs in place instead of
importing files and directories.

+:param log_level: Log imported files at the given level.
"""
tool_id = cwl_object.get("id", str(cwl_object)) if cwl_object else ""

logger.debug("Importing files for %s", tool_id)
logger.debug("Importing files in %s", cwl_object)

+def import_and_log(url: str) -> FileID:
+    """
+    Upload a file and log that we are doing so.
+    """
+    logger.log(log_level, "Loading %s...", url)
+    return import_function(url)

# We need to upload all files to the Toil filestore, and encode structure
# recursively into all Directories' locations. But we cannot safely alter
# the listing fields of Directory objects, because the handling required by
@@ -1865,7 +1875,7 @@ def visit_file_or_directory_up(

# Upload the file itself, which will adjust its location.
upload_file(
-import_function, fileindex, existing, rec, skip_broken=skip_broken, skip_remote=skip_remote
+import_and_log, fileindex, existing, rec, skip_broken=skip_broken, skip_remote=skip_remote
)

# Make a record for this file under its name
@@ -2116,12 +2126,15 @@ def toilStageFiles(
cwljob: Union[CWLObjectType, List[CWLObjectType]],
outdir: str,
destBucket: Union[str, None] = None,
+log_level: int = logging.DEBUG
) -> None:
"""
Copy input files out of the global file store and update location and path.

:param destBucket: If set, export to this base URL instead of to the local
filesystem.

+:param log_level: Log each file transferred at the given level.
"""

def _collectDirEntries(
@@ -2161,7 +2174,6 @@ def _realpath(
stage_listing=True,
)
for _, p in pm.items():
logger.debug("Staging output: %s", p)
if p.staged:
# We're supposed to copy/expose something.
# Note that we have to handle writable versions of everything
@@ -2213,14 +2225,19 @@

if file_id_or_contents.startswith("toilfile:"):
# This is something we can export
destUrl = "/".join(s.strip("/") for s in [destBucket, baseName])
# TODO: Do we need to urlencode the parts before sending them to S3?
dest_url = "/".join(s.strip("/") for s in [destBucket, baseName])
logger.log(log_level, "Saving %s...", dest_url)
toil.export_file(
FileID.unpack(file_id_or_contents[len("toilfile:") :]),
-destUrl,
+dest_url,
)
+# TODO: can a toildir: "file" get here?
else:
-# We are saving to the filesystem so we only really need export_file for actual files.
+# We are saving to the filesystem.
+dest_url = "file://" + quote(p.target)
+
+# We only really need export_file for actual files.
if not os.path.exists(p.target) and p.type in [
"Directory",
"WritableDirectory",
@@ -2229,6 +2246,7 @@
if p.type in ["File", "WritableFile"]:
if p.resolved.startswith("/"):
# Probably staging and bypassing file store. Just copy.
logger.log(log_level, "Saving %s...", dest_url)
os.makedirs(os.path.dirname(p.target), exist_ok=True)
shutil.copyfile(p.resolved, p.target)
else:
@@ -2241,16 +2259,18 @@
)

# Actually export from the file store
logger.log(log_level, "Saving %s...", dest_url)
os.makedirs(os.path.dirname(p.target), exist_ok=True)
toil.export_file(
FileID.unpack(uri[len("toilfile:") :]),
"file://" + p.target,
dest_url,
)
if p.type in [
"CreateFile",
"CreateWritableFile",
]:
# We just need to make a file with particular contents
logger.log(log_level, "Saving %s...", dest_url)
os.makedirs(os.path.dirname(p.target), exist_ok=True)
with open(p.target, "wb") as n:
n.write(p.resolved.encode("utf-8"))
@@ -2635,6 +2655,8 @@ def run(self, file_store: AbstractFileStore) -> Any:

logger.debug("Emitting output: %s", output)

file_store.log_to_leader(f"CWL step complete: {runtime_context.name}")

# metadata[process_uuid] = {
# 'started_at': started_at,
# 'ended_at': ended_at,
@@ -3767,9 +3789,10 @@ def main(args: Optional[List[str]] = None, stdout: TextIO = sys.stdout) -> int:
Callable[[str], FileID],
functools.partial(toil.import_file, symlink=True),
)

# Import all the input files, some of which may be missing optional
# files.
logger.info("Importing input files...")
fs_access = ToilFsAccess(options.basedir)
import_files(
file_import_function,
@@ -3780,10 +3803,12 @@
skip_broken=True,
skip_remote=options.reference_inputs,
bypass_file_store=options.bypass_file_store,
+log_level=logging.INFO,
)
# Import all the files associated with tools (binaries, etc.).
# Not sure why you would have an optional secondary file here, but
# the spec probably needs us to support them.
logger.info("Importing tool-associated files...")
visitSteps(
tool,
functools.partial(
@@ -3795,6 +3820,7 @@
skip_broken=True,
skip_remote=options.reference_inputs,
bypass_file_store=options.bypass_file_store,
+log_level=logging.INFO,
),
)

@@ -3807,7 +3833,8 @@
# were required.
rm_unprocessed_secondary_files(param_value)

logger.debug("tool %s", tool)
logger.info("Creating root job")
logger.debug("Root tool: %s", tool)
try:
wf1, _ = makeJob(
tool=tool,
@@ -3820,6 +3847,7 @@
logging.error(err)
return CWL_UNSUPPORTED_REQUIREMENT_EXIT_CODE
wf1.cwljob = initialized_job_order
logger.info("Starting workflow")
try:
outobj = toil.start(wf1)
except FailedJobsException as err:
@@ -3835,13 +3863,20 @@

# Now the workflow has completed. We need to make sure the outputs (and
# inputs) end up where the user wants them to be.

logger.info("Collecting workflow outputs...")
outobj = resolve_dict_w_promises(outobj)

# Stage files. Specify destination bucket if specified in CLI
# options. If destination bucket not passed in,
# options.destBucket's value will be None.
-toilStageFiles(toil, outobj, outdir, destBucket=options.destBucket)
+toilStageFiles(
+    toil,
+    outobj,
+    outdir,
+    destBucket=options.destBucket,
+    log_level=logging.INFO
+)
+logger.info("Stored workflow outputs")

if runtime_context.research_obj is not None:
cwltool.cwlprov.writablebagfile.create_job(
@@ -3879,6 +3914,7 @@ def remove_at_id(doc: Any) -> None:
)

if not options.destBucket and options.compute_checksum:
logger.info("Computing output file checksums...")
visit_class(
outobj,
("File",),
@@ -3887,6 +3923,8 @@

visit_class(outobj, ("File",), MutationManager().unset_generation)
stdout.write(json.dumps(outobj, indent=4, default=str))
+stdout.write("\n")
+logger.info("CWL run complete!")

return 0

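The recurring idiom in these cwltoil.py changes is to take the log level as a parameter and emit through logger.log, so the same helper is chatty at INFO when driven by the CLI path but stays at the quiet DEBUG default when reused internally. A self-contained sketch of that idiom, with illustrative names:

import logging

logger = logging.getLogger(__name__)


def stage_file(url: str, log_level: int = logging.DEBUG) -> None:
    # Same message either way; the caller picks the verbosity.
    logger.log(log_level, "Saving %s...", url)
    # ...the actual transfer would happen here...


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    stage_file("file:///tmp/out.txt", log_level=logging.INFO)  # visible
    stage_file("file:///tmp/scratch.txt")  # suppressed at the INFO threshold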
8 changes: 7 additions & 1 deletion src/toil/fileStores/abstractFileStore.py
@@ -602,7 +602,7 @@ def write(self, fileName: str) -> None:
os.rename(fileName + '.tmp', fileName)

# Functions related to logging
-def logToMaster(self, text: str, level: int = logging.INFO) -> None:
+def log_to_leader(self, text: str, level: int = logging.INFO) -> None:
"""
Send a logging message to the leader. The message will also be \
logged by the worker at the same level.
@@ -613,6 +613,12 @@ def logToMaster(self, text: str, level: int = logging.INFO) -> None:
logger.log(level=level, msg=("LOG-TO-MASTER: " + text))
self.loggingMessages.append(dict(text=text, level=level))


+@deprecated(new_function_name='log_to_leader')
+def logToMaster(self, text: str, level: int = logging.INFO) -> None:
+    self.log_to_leader(text, level)


# Functions run after the completion of the job.
@abstractmethod
def startCommit(self, jobState: bool = False) -> None:
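The old camelCase method is kept as a thin shim that forwards to the new one, so existing workflows keep running while being nudged to migrate. A sketch of what a caller observes after upgrading, assuming Toil's deprecated decorator issues a DeprecationWarning as is conventional (hypothetical job body):

import logging
import warnings


def old_style(job):
    with warnings.catch_warnings(record=True) as caught:
        # DeprecationWarning is hidden by default, so opt in for the check.
        warnings.simplefilter("always")
        job.fileStore.logToMaster("still works", level=logging.INFO)
    # The call is forwarded to log_to_leader() and flagged as deprecated.
    assert any(issubclass(w.category, DeprecationWarning) for w in caught)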
6 changes: 3 additions & 3 deletions src/toil/fileStores/cachingFileStore.py
@@ -1065,11 +1065,11 @@ def open(self, job: Job) -> Generator[None, None, None]:
disk_usage: str = (f"Job {self.jobName} used {percent:.2f}% disk ({bytes2human(disk)}B [{disk}B] used, "
f"{bytes2human(self.jobDiskBytes)}B [{self.jobDiskBytes}B] requested).")
if disk > self.jobDiskBytes:
self.logToMaster("Job used more disk than requested. For CWL, consider increasing the outdirMin "
self.log_to_leader("Job used more disk than requested. For CWL, consider increasing the outdirMin "
f"requirement, otherwise, consider increasing the disk requirement. {disk_usage}",
level=logging.WARNING)
else:
-self.logToMaster(disk_usage, level=logging.DEBUG)
+self.log_to_leader(disk_usage, level=logging.DEBUG)

# Go back up to the per-worker local temp directory.
os.chdir(startingDir)
@@ -1811,7 +1811,7 @@ def deleteGlobalFile(self, fileStoreID):
# Add the file to the list of files to be deleted from the job store
# once the run method completes.
self.filesToDelete.add(str(fileStoreID))
-self.logToMaster('Added file with ID \'%s\' to the list of files to be' % fileStoreID +
+self.log_to_leader('Added file with ID \'%s\' to the list of files to be' % fileStoreID +
' globally deleted.', level=logging.DEBUG)

@deprecated(new_function_name='export_file')
4 changes: 2 additions & 2 deletions src/toil/fileStores/nonCachingFileStore.py
@@ -121,11 +121,11 @@ def open(self, job: Job) -> Generator[None, None, None]:
disk_usage = (f"Job {self.jobName} used {percent:.2f}% disk ({bytes2human(disk)}B [{disk}B] used, "
f"{bytes2human(jobReqs)}B [{jobReqs}B] requested).")
if disk > jobReqs:
self.logToMaster("Job used more disk than requested. For CWL, consider increasing the outdirMin "
self.log_to_leader("Job used more disk than requested. For CWL, consider increasing the outdirMin "
f"requirement, otherwise, consider increasing the disk requirement. {disk_usage}",
level=logging.WARNING)
else:
-self.logToMaster(disk_usage, level=logging.DEBUG)
+self.log_to_leader(disk_usage, level=logging.DEBUG)
os.chdir(startingDir)
# Finally delete the job from the worker
self.check_for_state_corruption()
12 changes: 6 additions & 6 deletions src/toil/job.py
@@ -1820,8 +1820,8 @@ def tempDir(self) -> str:
return self._tempDir

def log(self, text: str, level=logging.INFO) -> None:
"""Log using :func:`fileStore.logToMaster`."""
self._fileStore.logToMaster(text, level)
"""Log using :func:`fileStore.log_to_leader`."""
self._fileStore.log_to_leader(text, level)

@staticmethod
def wrapFn(fn, *args, **kwargs) -> "FunctionWrappingJob":
@@ -2606,7 +2606,7 @@ def _saveJobGraph(self, jobStore: "AbstractJobStore", saveSelf: bool = False, re
# Set up to save last job first, so promises flow the right way
ordering.reverse()

logger.info("Saving graph of %d jobs, %d non-service, %d new", len(allJobs), len(ordering), len(fakeToReal))
logger.debug("Saving graph of %d jobs, %d non-service, %d new", len(allJobs), len(ordering), len(fakeToReal))

# Make sure we're the root
if ordering[-1] != self:
@@ -2626,17 +2626,17 @@
self._fulfillPromises(returnValues, jobStore)

for job in ordering:
logger.info("Processing job %s", job.description)
logger.debug("Processing job %s", job.description)
for serviceBatch in reversed(list(job.description.serviceHostIDsInBatches())):
# For each batch of service host jobs in reverse order they start
for serviceID in serviceBatch:
logger.info("Processing service %s", serviceID)
logger.debug("Processing service %s", serviceID)
if serviceID in self._registry:
# It's a new service

# Find the actual job
serviceJob = self._registry[serviceID]
logger.info("Saving service %s", serviceJob.description)
logger.debug("Saving service %s", serviceJob.description)
# Pickle the service body, which triggers all the promise stuff
serviceJob.saveBody(jobStore)
if job != self or saveSelf:
2 changes: 1 addition & 1 deletion src/toil/statsAndLogging.py
@@ -153,7 +153,7 @@ def callback(fileHandle: Union[IO[bytes], IO[str]]) -> None:
try:
logs = stats.workers.logsToMaster
except AttributeError:
-# To be expected if there were no calls to logToMaster()
+# To be expected if there were no calls to log_to_leader()
pass
else:
for message in logs:
2 changes: 1 addition & 1 deletion src/toil/test/cwl/cwlTest.py
@@ -1362,7 +1362,7 @@ def test_workflow_echo_string():
cmd = [toil, jobstore, option_1, option_2, option_3, cwl]
p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
assert stdout == b"{}", f"Got wrong output: {stdout}\nWith error: {stderr}"
assert stdout.decode("utf-8").strip() == "{}", f"Got wrong output: {stdout}\nWith error: {stderr}"
assert b"Finished toil run successfully" in stderr
assert p.returncode == 0

Empty file modified src/toil/test/cwl/whale.txt (mode 100644 → 100755)
4 changes: 2 additions & 2 deletions src/toil/test/docs/scripts/tutorial_cwlexample.py
@@ -7,11 +7,11 @@


def initialize_jobs(job):
-job.fileStore.logToMaster('initialize_jobs')
+job.fileStore.log_to_leader('initialize_jobs')


def runQC(job, cwl_file, cwl_filename, yml_file, yml_filename, outputs_dir, output_num):
job.fileStore.logToMaster("runQC")
job.fileStore.log_to_leader("runQC")
tempDir = job.fileStore.getLocalTempDir()

cwl = job.fileStore.readGlobalFile(cwl_file, userPath=os.path.join(tempDir, cwl_filename))
4 changes: 2 additions & 2 deletions src/toil/test/mesos/helloWorld.py
@@ -25,7 +25,7 @@

def hello_world(job):

-job.fileStore.logToMaster(parentMessage)
+job.fileStore.log_to_leader(parentMessage)
with open('foo_bam.txt', 'w') as handle:
handle.write('\nThis is a triumph...\n')

@@ -39,7 +39,7 @@ def hello_world(job):
def hello_world_child(job, hw):

path = job.fileStore.readGlobalFile(hw)
-job.fileStore.logToMaster(childMessage)
+job.fileStore.log_to_leader(childMessage)
# NOTE: path and the updated file are stored to /tmp
# If we want to SAVE our changes to this tmp file, we must write it out.
with open(path) as r:
Expand Down