
Add domain to ExperimentData when overwriting data and adding cluster_parallel mode
mpvanderschelling committed Mar 26, 2024
1 parent ad13821 commit 641c725
Showing 1 changed file with 72 additions and 0 deletions.
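As a usage note: the new `cluster_parallel` mode is selected through the same `evaluate` entry point as the existing modes. Below is a hedged sketch of how a caller would reach the code added in this commit; only `ExperimentData.from_file`, `evaluate`, and the `'cluster_parallel'` mode string come from the diff itself, while the import paths and the `DataGenerator` subclass with an `execute` hook are assumptions about the surrounding f3dasm API.

```python
from f3dasm import ExperimentData                # import path assumed
from f3dasm.datageneration import DataGenerator  # import path assumed


class MySimulator(DataGenerator):
    """Hypothetical data generator; the execute() hook name is assumed."""

    def execute(self) -> None:
        ...  # evaluate one experiment and store its output


data = ExperimentData.from_file('my_project')    # from_file appears in the diff
data.evaluate(data_generator=MySimulator(),
              mode='cluster_parallel',           # mode added by this commit
              kwargs={})
```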
src/f3dasm/_src/experimentdata/experimentdata.py (+72 −0)
@@ -670,6 +670,7 @@ def add_experiments(self,

        if isinstance(experiment_sample, ExperimentData):
            experiment_sample._reset_index()
            self.domain += experiment_sample.domain

        self._input_data += experiment_sample._input_data
        self._output_data += experiment_sample._output_data
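An editor's note on the one-line addition above: `add_experiments` previously concatenated the input and output tables but kept only `self`'s domain, so columns contributed by the other object had no domain entry. A minimal plain-Python analogy of why data and metadata must be merged together (illustrative only, not the f3dasm API):

```python
# Merging two experiment tables without merging their domains would leave
# column 'z' undescribed; the added self.domain += other.domain fixes that.
table_a = {'x': [0.1], 'y': [0.2]}
domain_a = {'x': 'float', 'y': 'float'}

table_b = {'x': [0.3], 'y': [0.4], 'z': [0.5]}
domain_b = {'x': 'float', 'y': 'float', 'z': 'float'}

table_a = {k: table_a.get(k, []) + v for k, v in table_b.items()}
domain_a = {**domain_a, **domain_b}  # analogue of the added line
assert set(table_a) == set(domain_a)
```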
@@ -765,6 +766,22 @@ def _overwrite_experiments(
        self._jobs.overwrite(
            indices=indices, other=experiment_sample._jobs)

        if isinstance(experiment_sample, ExperimentData):
            self.domain += experiment_sample.domain

    @_access_file
    def overwrite_disk(
            self, indices: Iterable[int],
            domain: Optional[Domain] = None,
            input_data: Optional[DataTypes] = None,
            output_data: Optional[DataTypes] = None,
            jobs: Optional[Path | str] = None,
            add_if_not_exist: bool = False
    ) -> None:
        self.overwrite(indices=indices, domain=domain, input_data=input_data,
                       output_data=output_data, jobs=jobs,
                       add_if_not_exist=add_if_not_exist)

    def add_input_parameter(
            self, name: str,
            type: Literal['float', 'int', 'category', 'constant'],
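A note on the hunk above: `overwrite_disk` just forwards to `overwrite`, but the `@_access_file` decorator (defined elsewhere, not shown in this diff) evidently guards the call with the on-disk lock file that the last hunk of this commit cleans up. Here is a plausible sketch of that mechanism using the `filelock` package — an assumption about how `_access_file` works, not its actual implementation, and the constant values are placeholders:

```python
from functools import wraps
from pathlib import Path

from filelock import FileLock  # third-party package; locking mechanism assumed

EXPERIMENTDATA_SUBFOLDER = 'experiment_data'  # placeholder; real value not in the diff
LOCK_FILENAME = 'lock'                        # placeholder; real value not in the diff


def _access_file(method):
    """Hypothetical stand-in for f3dasm's _access_file decorator."""
    @wraps(method)
    def wrapper(self, *args, **kwargs):
        lock = (Path(self.project_dir) / EXPERIMENTDATA_SUBFOLDER
                / LOCK_FILENAME).with_suffix('.lock')
        with FileLock(lock):  # serialize concurrent access to the stored data
            return method(self, *args, **kwargs)
    return wrapper
```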
@@ -1084,6 +1101,8 @@ def evaluate(self, data_generator: DataGenerator, mode: str = 'sequential',
            return self._run_multiprocessing(data_generator, kwargs)
        elif mode.lower() == "cluster":
            return self._run_cluster(data_generator, kwargs)
        elif mode.lower() == "cluster_parallel":
            return self._run_cluster_parallel(data_generator, kwargs)
        else:
            raise ValueError("Invalid parallelization mode specified.")

@@ -1219,6 +1238,59 @@ def _run_cluster(self, data_generator: DataGenerator, kwargs: dict):
        (self.project_dir / EXPERIMENTDATA_SUBFOLDER / LOCK_FILENAME
         ).with_suffix('.lock').unlink(missing_ok=True)

    def _run_cluster_parallel(
            self, data_generator: DataGenerator, kwargs: dict):
        """Run the operation on the cluster, parallelized over the cores
        of each node.

        Parameters
        ----------
        data_generator : DataGenerator
            data generator that is executed for every entry in the
            ExperimentData object
        kwargs : dict
            Any keyword arguments that need to be supplied to the function

        Raises
        ------
        NoOpenJobsError
            Raised when there are no open jobs left
        """
        # Retrieve the updated experimentdata object from disk
        try:
            self = self.from_file(self.project_dir)
        except FileNotFoundError:  # If not found, store current
            self.store()

        no_jobs = False

        while True:
            es_list = []
            for core in range(mp.cpu_count()):
                try:
                    es_list.append(self._get_open_job_data())
                except NoOpenJobsError:
                    logger.debug("No open jobs left!")
                    no_jobs = True
                    break

            d = self.select([e.job_number for e in es_list])

            d.evaluate(data_generator=data_generator, mode='parallel',
                       kwargs=kwargs)

            # TODO access resource first!
            self.overwrite_disk(
                indices=d.index, input_data=d._input_data,
                output_data=d._output_data, jobs=d._jobs,
                domain=d.domain, add_if_not_exist=False)

            if no_jobs:
                break

        self = self.from_file(self.project_dir)
        # Remove the lockfile from disk
        (self.project_dir / EXPERIMENTDATA_SUBFOLDER / LOCK_FILENAME
         ).with_suffix('.lock').unlink(missing_ok=True)

    # Optimization
    # =========================================================================
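Stepping back from the diff: `_run_cluster_parallel` implements a claim/evaluate/write-back loop. Each node repeatedly claims up to `mp.cpu_count()` open jobs, evaluates that batch locally via `evaluate(..., mode='parallel')`, writes the results back through the lock-guarded `overwrite_disk`, and exits once `NoOpenJobsError` signals that nothing is left. A minimal self-contained sketch of that pattern in plain Python (the three callables are hypothetical stand-ins, not f3dasm API):

```python
import multiprocessing as mp


def run_node(claim_job, run_batch, write_back):
    """Claim up to cpu_count() jobs, evaluate them in parallel, write back."""
    no_jobs = False
    while not no_jobs:
        batch = []
        for _ in range(mp.cpu_count()):
            job = claim_job()  # returns None once no open jobs remain
            if job is None:
                no_jobs = True
                break
            batch.append(job)
        if batch:
            results = run_batch(batch)  # local parallel evaluation
            write_back(results)         # must happen under the shared file lock
```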
