Skip to content

Commit

Permalink
Added the no-progress flag for project.py status to hide the progress… (
Browse files Browse the repository at this point in the history
#685)

* Added the no-progress flag for project.py status to hide the progress bars

* Implement unit test for appearance of progress bar

* Update changelog.txt

* partial change progress to no progress label

* added documentation for variables

* Update flow/util/misc.py

Co-authored-by: Brandon Butler <[email protected]>

* change format of if statements

* added thread parallelization and serial tests

* Changed from no_progress to hide_progress.

* Update flow/project.py

Co-authored-by: Bradley Dice <[email protected]>

* Update tests/test_status.py

Co-authored-by: Bradley Dice <[email protected]>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Update changelog.txt

* changed all the progress variables to hide-progress

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* fix: no-progress process parallization pickling.

We need to use cloudpickle to serialize/deserialize functions for
process parallelization.

* fix: thread parallelization for _get_parallel_executor

Converts the generator to a list which is what print_status expects.

* test: Fix no progress bar tests for signac 2.0

* Remove newline.

* fix: Revert breaking changes to execution progress bar

* doc: fix formatting

Co-authored-by: Bradley Dice <[email protected]>

* Simplify parallel_executor logic.

* fix: Changes to _get_parallel_executor

Still results in shorter clearer code.

* fix: Conditional string in _get_parallel_executor

* fix: ordering in _get_parallel_executor

---------

Co-authored-by: Brandon Butler <[email protected]>
Co-authored-by: Bradley Dice <[email protected]>
Co-authored-by: Kelly Wang <[email protected]>
Co-authored-by: Kelly Wang <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
6 people authored Sep 5, 2023
1 parent 5093591 commit c65cc0f
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 34 deletions.
1 change: 1 addition & 0 deletions changelog.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ Version 0.26
Added
+++++

- Added feature to hide the status of the progress bar (#685).
- ``test-workflow`` CLI option for testing template environments/submission scripts (#747).
- Frontier environment and template (#743).
- Added ``-o`` / ``--operation`` flag to report project status information for specific operations (#725).
Expand Down
32 changes: 24 additions & 8 deletions flow/project.py
Original file line number Diff line number Diff line change
Expand Up @@ -2602,6 +2602,7 @@ def _fetch_status(
err,
ignore_errors,
status_parallelization="none",
hide_progress=False,
names=None,
):
"""Fetch status for the provided aggregates / jobs.
Expand All @@ -2617,6 +2618,8 @@ def _fetch_status(
status_parallelization : str
Parallelization mode for fetching the status. Allowed values are
"thread", "process", or "none". (Default value = "none")
hide_progress : bool
Hide the progress bar when printing status output (Default value = False).
names : iterable of :class:`str`
Only show status for operations that match the provided set of names
(interpreted as regular expressions), or all if the argument is
Expand Down Expand Up @@ -2653,7 +2656,9 @@ def _fetch_status(
"Valid choices are 'thread', 'process', or 'none'."
)

parallel_executor = _get_parallel_executor(status_parallelization)
parallel_executor = _get_parallel_executor(
status_parallelization, hide_progress
)

# Update the project's status cache
scheduler_info = self._query_scheduler_status(
Expand Down Expand Up @@ -2750,11 +2755,13 @@ def compute_status(data):
self._get_job_labels,
ignore_errors=ignore_errors,
)
job_labels = parallel_executor(
compute_labels,
individual_jobs,
desc="Fetching labels",
file=err,
job_labels = list(
parallel_executor(
compute_labels,
individual_jobs,
desc="Fetching labels",
file=err,
)
)

def combine_group_and_operation_status(aggregate_status_results):
Expand Down Expand Up @@ -2795,7 +2802,6 @@ def combine_group_and_operation_status(aggregate_status_results):
"_error": error_message,
}
)

return status_results_combined, job_labels, individual_jobs

PRINT_STATUS_ALL_VARYING_PARAMETERS = True
Expand Down Expand Up @@ -2824,6 +2830,7 @@ def print_status(
profile=False,
eligible_jobs_max_lines=None,
output_format="terminal",
hide_progress=False,
operation=None,
):
"""Print the status of the project.
Expand Down Expand Up @@ -2875,6 +2882,8 @@ def print_status(
output_format : str
Status output format, supports:
'terminal' (default), 'markdown' or 'html'.
hide_progress : bool
Hide the progress bar from the status output. (Default value = False)
operation : iterable of :class:`str`
Show status of operations that match the provided set of names
(interpreted as regular expressions), or all if the argument is
Expand Down Expand Up @@ -2923,6 +2932,7 @@ def print_status(
err=err,
ignore_errors=ignore_errors,
status_parallelization=status_parallelization,
hide_progress=hide_progress,
names=operation,
)

Expand Down Expand Up @@ -3003,6 +3013,7 @@ def print_status(
err=err,
ignore_errors=ignore_errors,
status_parallelization=status_parallelization,
hide_progress=hide_progress,
names=operation,
)
profiling_results = None
Expand Down Expand Up @@ -3557,7 +3568,7 @@ def run(
will not exceed this argument. The default is 1, there is no limit
if this argument is None.
progress : bool
Show a progress bar during execution. (Default value = False)
Show a progress bar during execution (Default value = False).
order : str, callable, or None
Specify the order of operations. Possible values are:
Expand Down Expand Up @@ -5001,6 +5012,11 @@ class MyProject(FlowProject):
"to show result for. Defaults to the main module. "
"(requires pprofile)",
)
parser_status.add_argument(
"--hide-progress",
action="store_true",
help="Hide the progress bar",
)
parser_status.add_argument(
"-o",
"--operation",
Expand Down
63 changes: 37 additions & 26 deletions flow/util/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import os
import warnings
from collections.abc import MutableMapping
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from contextlib import contextmanager
from functools import lru_cache, partial
from itertools import cycle, islice
Expand Down Expand Up @@ -335,7 +336,7 @@ def _run_cloudpickled_func(func, *args):
return unpickled_func(*args)


def _get_parallel_executor(parallelization="none"):
def _get_parallel_executor(parallelization="none", hide_progress=False):
"""Get an executor for the desired parallelization strategy.
This executor shows a progress bar while executing a function over an
Expand All @@ -346,50 +347,60 @@ def _get_parallel_executor(parallelization="none"):
(see :meth:`concurrent.futures.Executor.map`). All other ``**kwargs`` are
passed to the tqdm progress bar.
Warning
-------
We ignore key word arguments when ``hide_progress == True``.
Parameters
----------
parallelization : str
Parallelization mode. Allowed values are "thread", "process", or
"none". (Default value = "none")
hide_progress : bool
Hide the progress bar when printing status output (Default value = False).
Returns
-------
callable
A callable with signature ``func, iterable, **kwargs``.
A callable with signature ``func, iterable, **kwargs`` which returns an interator.
"""
if parallelization == "thread":
if parallelization == "process":
executor = ProcessPoolExecutor().map
if not hide_progress:
executor = partial(process_map, tqdm_class=tqdm)

def parallel_executor(func, iterable, **kwargs):
return thread_map(func, iterable, tqdm_class=tqdm, **kwargs)

elif parallelization == "process":
# The top-level function called on each process cannot be a local function, it must be a
# module-level function. Creating a partial here allows us to use the passed function
# "func" regardless of whether it is a local function.
func = partial(_run_cloudpickled_func, cloudpickle.dumps(func))
# The tqdm progress bar requires a total. We compute the total in advance because a map
# iterable (which has no total) is passed to process_map.
kwargs.setdefault("total", len(iterable))
iterable = map(cloudpickle.dumps, iterable)
if hide_progress:
return executor(func, iterable)
return executor(func, iterable, **kwargs)

elif parallelization == "thread":
executor = ThreadPoolExecutor().map
if not hide_progress:
executor = partial(thread_map, tqdm_class=tqdm)

def parallel_executor(func, iterable, **kwargs):
# The tqdm progress bar requires a total. We compute the total in
# advance because a map iterable (which has no total) is passed to
# process_map.
if "total" not in kwargs:
kwargs["total"] = len(iterable)

return process_map(
# The top-level function called on each process cannot be a
# local function, it must be a module-level function. Creating
# a partial here allows us to use the passed function "func"
# regardless of whether it is a local function.
partial(_run_cloudpickled_func, cloudpickle.dumps(func)),
map(cloudpickle.dumps, iterable),
tqdm_class=tqdm,
**kwargs,
)
if hide_progress:
return executor(func, iterable)
return executor(func, iterable, **kwargs)

else:
executor = map if hide_progress else partial(tmap, tqdm_class=tqdm)

def parallel_executor(func, iterable, **kwargs):
if "chunksize" in kwargs:
# Chunk size only applies to thread/process parallel executors
del kwargs["chunksize"]
return list(tmap(func, iterable, tqdm_class=tqdm, **kwargs))
if hide_progress:
return executor(func, iterable)
kwargs.pop("chunksize", None)
return executor(func, iterable, **kwargs)

return parallel_executor

Expand Down
26 changes: 26 additions & 0 deletions tests/test_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,35 @@
import sys

import generate_status_reference_data as gen
import pytest
import signac


@pytest.fixture(params=[True, False])
def hide_progress_bar(request):
return request.param


@pytest.fixture(params=["thread", "process", "none"])
def parallelization(request):
return request.param


def test_hide_progress_bar(hide_progress_bar, parallelization):
with signac.TemporaryProject() as p, signac.TemporaryProject() as status_pr:
gen.init(p)
fp = gen._TestProject.get_project(path=p.path)
fp._flow_config["status_parallelization"] = parallelization
status_pr.import_from(origin=gen.ARCHIVE_PATH)
for job in status_pr:
kwargs = job.statepoint()
tmp_err = io.TextIOWrapper(io.BytesIO(), sys.stderr.encoding)
fp.print_status(**kwargs, err=tmp_err, hide_progress=hide_progress_bar)
tmp_err.seek(0)
generated_tqdm = tmp_err.read()
assert ("Fetching status" not in generated_tqdm) == hide_progress_bar


def test_print_status():
# Must import the data into the project.
with signac.TemporaryProject() as p, signac.TemporaryProject() as status_pr:
Expand Down

0 comments on commit c65cc0f

Please sign in to comment.