Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions lib/galaxy/config/schemas/config_schema.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3914,6 +3914,27 @@ mapping:
order in the UI. If false, the order of tools in the toolbox will be
preserved as they are loaded from the tool config files.

parallel_tool_loading:
type: bool
default: false
required: false
desc: |
If true, tool XML source files will be parsed in parallel during Galaxy startup.
This pre-parses tool XML and expands macros before the serial tool loading phase,
reducing I/O latency on slow filesystems (e.g., CVMFS with cold cache). This does
not parallelize Tool object creation, only XML source parsing. Set to false if
you experience issues with tool loading.

parallel_tool_loading_workers:
type: int
default: 4
required: false
desc: |
Number of worker threads to use for parallel tool XML source parsing.
Only used if parallel_tool_loading is true. Higher values (8-16) may
improve startup time on high-latency filesystems but provide diminishing
returns on local disk. Default of 4 is a safe choice for most deployments.

tool_filters:
type: str
required: false
Expand Down
61 changes: 61 additions & 0 deletions lib/galaxy/tool_util/toolbox/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,12 @@ def _init_tools_from_configs(self, config_filenames: List[str]) -> None:
directory_contents = sorted(os.listdir(config_directory))
directory_config_files = [config_file for config_file in directory_contents if config_file.endswith(".xml")]
config_filenames.extend(directory_config_files)

# Pre-parse tool XML sources in parallel if enabled
parallel_tool_loading = getattr(self.app.config, "parallel_tool_loading", False)
if parallel_tool_loading:
self._preload_tool_sources_parallel(config_filenames)

for config_filename in config_filenames:
if not self.can_load_config_file(config_filename):
continue
Expand All @@ -318,8 +324,63 @@ def _init_tools_from_configs(self, config_filenames: List[str]) -> None:
raise
except Exception:
log.exception("Error loading tools defined in config %s", config_filename)

# Clear pre-loaded tool sources cache to free memory
self._clear_preloaded_tool_sources()

log.debug("Reading tools from config files finished %s", execution_timer)

def _preload_tool_sources_parallel(self, config_filenames: List[str]) -> None:
"""No-op. Subclasses may override to pre-parse tool XML sources in parallel."""
pass

def _clear_preloaded_tool_sources(self) -> None:
"""No-op. Subclasses may override to clear pre-loaded tool sources cache."""
pass

def _collect_tool_paths_for_preload(self, config_filenames: List[str]) -> List[str]:
"""Best-effort collection of tool file paths for parallel pre-parsing.

This method collects tool paths from top-level tools and tools one level
deep in sections. Deeply nested sections are not traversed. This is
intentional - tools not collected here will still be parsed on-demand
during normal loading. This is an optimization, not a complete enumeration.
"""
tool_paths: List[str] = []
for config_filename in config_filenames:
if not self.can_load_config_file(config_filename):
continue
try:
tool_conf_source = get_toolbox_parser(config_filename)
tool_path = tool_conf_source.parse_tool_path()
tool_path = self.__resolve_tool_path(tool_path, config_filename)

for item in tool_conf_source.parse_items():
item = ensure_tool_conf_item(item)
if item.type == "tool":
path_template = item.get("file")
if path_template:
template_kwds = self._path_template_kwds()
path = string.Template(path_template).safe_substitute(**template_kwds)
concrete_path = os.path.join(tool_path, path)
if os.path.exists(concrete_path):
tool_paths.append(concrete_path)
elif item.type == "section":
# Collect tools from within sections
for section_elem in item.items:
section_elem = ensure_tool_conf_item(section_elem)
if section_elem.type == "tool":
path_template = section_elem.get("file")
if path_template:
template_kwds = self._path_template_kwds()
path = string.Template(path_template).safe_substitute(**template_kwds)
concrete_path = os.path.join(tool_path, path)
if os.path.exists(concrete_path):
tool_paths.append(concrete_path)
except Exception:
log.exception(f"Error collecting tools from config {config_filename}")
return tool_paths

def _init_tools_from_config(self, config_filename: str) -> None:
"""
Read the configuration file and load each tool. The following tags are currently supported:
Expand Down
58 changes: 58 additions & 0 deletions lib/galaxy/tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,7 @@ def __init__(
) -> None:
self._reload_count = 0
self.tool_location_fetcher = ToolLocationFetcher()
self._preloaded_tool_sources: dict[str, ToolSource] = {}
# This is here to deal with the old default value, which doesn't make
# sense in an "installed Galaxy" world.
# FIXME: ./
Expand Down Expand Up @@ -600,11 +601,68 @@ def tools_by_id(self):
# Deprecated method, TODO - eliminate calls to this in test/.
return self._tools_by_id

def _preload_tool_sources_parallel(self, config_filenames: list[str]) -> None:
"""Pre-parse tool XML sources in parallel to warm cache before serial tool creation.

This is a best-effort optimization that pre-parses tool XML files in parallel
to reduce I/O latency during the serial tool loading phase. Tools not pre-parsed
here will be parsed on-demand during normal loading.
"""
from concurrent.futures import (
as_completed,
ThreadPoolExecutor,
)

tool_paths = self._collect_tool_paths_for_preload(config_filenames)
if not tool_paths:
return

num_workers = getattr(self.app.config, "parallel_tool_loading_workers", 4)
log.debug(f"Pre-parsing {len(tool_paths)} tool XML sources in parallel with {num_workers} workers")

enable_beta_formats = getattr(self.app.config, "enable_beta_tool_formats", False)

def parse_tool_source(tool_path: str) -> tuple[str, Optional[ToolSource]]:
"""Parse a single tool XML file. Returns (path, source) tuple."""
try:
source = get_tool_source(
tool_path,
enable_beta_formats=enable_beta_formats,
tool_location_fetcher=self.tool_location_fetcher,
)
return (tool_path, source)
except Exception:
log.debug(f"Error pre-parsing tool XML: {tool_path}", exc_info=True)
return (tool_path, None)

with ThreadPoolExecutor(max_workers=num_workers) as executor:
futures = [executor.submit(parse_tool_source, tp) for tp in tool_paths]
for future in as_completed(futures):
try:
tool_path, tool_source = future.result()
if tool_source is not None:
self._preloaded_tool_sources[os.path.realpath(tool_path)] = tool_source
except Exception:
log.debug("Error in parallel tool source pre-parsing", exc_info=True)

log.debug(f"Pre-parsed {len(self._preloaded_tool_sources)} tool sources")

def _clear_preloaded_tool_sources(self) -> None:
"""Clear pre-loaded tool sources cache to free memory after tool loading completes."""
if self._preloaded_tool_sources:
log.debug(f"Clearing {len(self._preloaded_tool_sources)} unused pre-loaded tool sources")
self._preloaded_tool_sources.clear()

def create_tool(self, config_file: StrPath, **kwds) -> "Tool":
tool_source = self.get_expanded_tool_source(config_file)
return self._create_tool_from_source(tool_source, config_file=config_file, **kwds)

def get_expanded_tool_source(self, config_file: StrPath) -> ToolSource:
# Check if this tool source was pre-parsed during parallel loading
config_file_real = os.path.realpath(config_file)
if config_file_real in self._preloaded_tool_sources:
return self._preloaded_tool_sources[config_file_real]

try:
return get_tool_source(
config_file,
Expand Down
147 changes: 147 additions & 0 deletions test/unit/app/tools/test_parallel_tool_loading.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
"""Tests for parallel tool loading feature."""

import os
import string

from galaxy.app_unittest_utils.toolbox_support import BaseToolBoxTestCase

TOOL_TEMPLATE = string.Template("""<tool id="${tool_id}" name="Test Tool ${tool_id}" version="1.0" profile="16.01">
<command>echo "test"</command>
<inputs>
<param type="text" name="param1" value="" />
</inputs>
<outputs>
<data name="out1" format="data" />
</outputs>
</tool>
""")


class TestParallelToolLoading(BaseToolBoxTestCase):
"""Test that parallel and serial tool loading produce identical results."""

def _create_tool_file(self, tool_id):
"""Create a tool XML file and return its filename."""
tool_contents = TOOL_TEMPLATE.safe_substitute(tool_id=tool_id)
tool_path = os.path.join(self.test_directory, f"{tool_id}.xml")
with open(tool_path, "w") as f:
f.write(tool_contents)
return f"{tool_id}.xml"

def _reset_toolbox(self):
"""Reset toolbox state for fresh load."""
self._toolbox = None
self.app._toolbox = None # type: ignore[assignment]
self.app.tool_cache.cleanup()

def _load_tools_serial(self):
"""Load tools with serial loading and return tool IDs."""
self.app.config.parallel_tool_loading = False # type: ignore[attr-defined]
return set(self.toolbox._tools_by_id.keys())

def _load_tools_parallel(self, workers=2):
"""Load tools with parallel loading and return tool IDs."""
self.app.config.parallel_tool_loading = True # type: ignore[attr-defined]
self.app.config.parallel_tool_loading_workers = workers # type: ignore[attr-defined]
return set(self.toolbox._tools_by_id.keys())

def test_parallel_loading_produces_same_tools(self):
"""Verify parallel and serial loading produce identical tool sets."""
tool_ids = [f"test_tool_{i}" for i in range(5)]
for tool_id in tool_ids:
self._create_tool_file(tool_id)

tool_refs = "\n".join(f'<tool file="{tool_id}.xml" />' for tool_id in tool_ids)
self._add_config(f"""<toolbox tool_path="{self.test_directory}">
{tool_refs}
</toolbox>""")

serial_tool_ids = self._load_tools_serial()
self._reset_toolbox()
parallel_tool_ids = self._load_tools_parallel()

assert serial_tool_ids == parallel_tool_ids
for tool_id in tool_ids:
assert tool_id in parallel_tool_ids

def test_parallel_loading_with_sections(self):
"""Verify parallel loading works with tools in sections."""
tool_ids = ["section_tool_1", "section_tool_2", "top_level_tool"]
for tool_id in tool_ids:
self._create_tool_file(tool_id)

self._add_config(f"""<toolbox tool_path="{self.test_directory}">
<section id="test_section" name="Test Section">
<tool file="section_tool_1.xml" />
<tool file="section_tool_2.xml" />
</section>
<tool file="top_level_tool.xml" />
</toolbox>""")

serial_tool_ids = self._load_tools_serial()
self._reset_toolbox()
parallel_tool_ids = self._load_tools_parallel()

assert serial_tool_ids == parallel_tool_ids
for tool_id in tool_ids:
assert tool_id in parallel_tool_ids

def test_parallel_loading_handles_invalid_tool(self):
"""Verify parallel loading handles invalid tools gracefully."""
self._create_tool_file("valid_tool")

invalid_path = os.path.join(self.test_directory, "invalid_tool.xml")
with open(invalid_path, "w") as f:
f.write("not valid xml <><><<")

self._add_config(f"""<toolbox tool_path="{self.test_directory}">
<tool file="valid_tool.xml" />
<tool file="invalid_tool.xml" />
</toolbox>""")

parallel_tool_ids = self._load_tools_parallel()
assert "valid_tool" in parallel_tool_ids

def test_parallel_loading_clears_cache(self):
"""Verify pre-loaded tool sources cache is cleared after loading completes."""
tool_ids = [f"cache_tool_{i}" for i in range(3)]
for tool_id in tool_ids:
self._create_tool_file(tool_id)

tool_refs = "\n".join(f'<tool file="{tool_id}.xml" />' for tool_id in tool_ids)
self._add_config(f"""<toolbox tool_path="{self.test_directory}">
{tool_refs}
</toolbox>""")

self.app.config.parallel_tool_loading = True # type: ignore[attr-defined]
self.app.config.parallel_tool_loading_workers = 2 # type: ignore[attr-defined]
toolbox = self.toolbox

# Cache should be empty after loading completes
assert len(toolbox._preloaded_tool_sources) == 0

def test_parallel_loading_with_nested_sections(self):
"""Verify tools in nested sections load correctly (via on-demand parsing)."""
tool_ids = ["nested_tool_1", "nested_tool_2", "outer_tool"]
for tool_id in tool_ids:
self._create_tool_file(tool_id)

# Nested sections are not traversed by pre-loading, but tools should still load
self._add_config(f"""<toolbox tool_path="{self.test_directory}">
<section id="outer" name="Outer Section">
<section id="inner" name="Inner Section">
<tool file="nested_tool_1.xml" />
<tool file="nested_tool_2.xml" />
</section>
</section>
<tool file="outer_tool.xml" />
</toolbox>""")

serial_tool_ids = self._load_tools_serial()
self._reset_toolbox()
parallel_tool_ids = self._load_tools_parallel()

# All tools should load regardless of nesting depth
assert serial_tool_ids == parallel_tool_ids
for tool_id in tool_ids:
assert tool_id in parallel_tool_ids
Loading