Skip to content

Commit 608bf16

Browse files
committed
add functionality for aggregating different runs
1 parent b86633c commit 608bf16

File tree

6 files changed

+113
-68
lines changed

6 files changed

+113
-68
lines changed

src/itwinai/cli.py

Lines changed: 55 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ def generate_scalability_report(
5151
)
5252
),
5353
] = False,
54+
run_id: str | None = None,
5455
backup_root_dir: Annotated[
5556
str, typer.Option(help=("Which directory to store the backup files in."))
5657
] = "backup-scalability-metrics/",
@@ -96,45 +97,65 @@ def generate_scalability_report(
9697
plot_dir_path = Path(plot_dir)
9798
plot_dir_path.mkdir(exist_ok=True, parents=True)
9899

99-
report_dirs = {
100-
"Epoch Time": {
101-
"dir": log_dir_path / "epoch-time",
102-
"func": epoch_time_report,
103-
},
104-
"GPU Data": {
105-
"dir": log_dir_path / "gpu-energy-data",
106-
"func": gpu_data_report,
107-
},
108-
"Communication Data": {
109-
"dir": log_dir_path / "communication-data",
110-
"func": communication_data_report,
111-
},
112-
}
100+
# Finding all the appropriate paths
101+
epoch_time_logdirs = []
102+
gpu_data_logdirs = []
103+
comm_time_logdirs = []
104+
if run_id is None:
105+
print("run_id was not passed, so will aggregate data from all runs in given directory!")
106+
for path_elem in log_dir_path.iterdir():
107+
print(f"Adding data from {path_elem}!")
108+
if not path_elem.is_dir():
109+
raise ValueError(
110+
f"Found element in logdir that was not itself a directory: "
111+
f"{path_elem.resolve()}"
112+
)
113+
epoch_time_logdirs.append(path_elem / "epoch-time")
114+
gpu_data_logdirs.append(path_elem / "gpu-energy-data")
115+
comm_time_logdirs.append(path_elem / "communication-data")
116+
print()
117+
else:
118+
epoch_time_logdirs.append(log_dir_path / run_id / "epoch-time")
119+
gpu_data_logdirs.append(log_dir_path / run_id / "gpu-energy-data")
120+
comm_time_logdirs.append(log_dir_path / run_id / "communication-data")
113121

122+
# TODO: Decide how run_id should be incorporated into the backup directory path
114123
# Setting the backup directory from the backup root dir and the experiment name
115124
experiment_name = experiment_name or f"exp_{uuid.uuid4().hex[:6]}"
116125
backup_dir = Path(backup_root_dir) / experiment_name
117126

118-
# Creating reports from dictionary
119-
for report_name, details in report_dirs.items():
120-
report_dir = details["dir"]
121-
report_func = details["func"]
122-
123-
if report_dir.exists():
124-
print("#" * 8, f"{report_name} Report", "#" * 8)
125-
report_func(
126-
report_dir,
127-
plot_dir=plot_dir_path,
128-
backup_dir=backup_dir,
129-
do_backup=do_backup,
130-
plot_file_suffix=plot_file_suffix,
131-
)
132-
print()
133-
else:
134-
print(
135-
f"No report was created for {report_name} as '{report_dir.resolve()}' does "
136-
f"not exist."
137-
)
127+
epoch_time_table = epoch_time_report(
128+
log_dirs=epoch_time_logdirs,
129+
plot_dir=plot_dir_path,
130+
backup_dir=backup_dir,
131+
do_backup=do_backup,
132+
plot_file_suffix=plot_file_suffix,
133+
)
134+
gpu_data_table = gpu_data_report(
135+
log_dirs=gpu_data_logdirs,
136+
plot_dir=plot_dir_path,
137+
backup_dir=backup_dir,
138+
do_backup=do_backup,
139+
plot_file_suffix=plot_file_suffix,
140+
)
141+
communication_data_table = communication_data_report(
142+
log_dirs=comm_time_logdirs,
143+
plot_dir=plot_dir_path,
144+
backup_dir=backup_dir,
145+
do_backup=do_backup,
146+
plot_file_suffix=plot_file_suffix,
147+
)
148+
149+
print()
150+
print("#" * 8, "Epoch Time Report", "#" * 8)
151+
print(epoch_time_table)
152+
print()
153+
print("#" * 8, "GPU Data Report", "#" * 8)
154+
print(gpu_data_table)
155+
print()
156+
print("#" * 8, "Communication Data Report", "#" * 8)
157+
print(communication_data_table)
158+
print()
138159

139160

140161
@app.command()

src/itwinai/scalability_report/data.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,7 @@ def read_scalability_metrics_from_csv(
5454
dataframes = []
5555
for file_path in file_paths:
5656
df = pd.read_csv(file_path)
57-
check_contains_columns(
58-
df=df, expected_columns=expected_columns, file_path=file_path
59-
)
57+
check_contains_columns(df=df, expected_columns=expected_columns, file_path=file_path)
6058
dataframes.append(df)
6159

6260
return pd.concat(dataframes)

src/itwinai/scalability_report/reports.py

Lines changed: 43 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
# --------------------------------------------------------------------------------------
1010

1111
from pathlib import Path
12+
from typing import List
13+
import pandas as pd
1214

1315
from itwinai.scalability_report.data import read_scalability_metrics_from_csv
1416
from itwinai.scalability_report.plot import (
@@ -24,37 +26,41 @@
2426

2527

2628
def epoch_time_report(
27-
epoch_time_dir: Path | str,
29+
log_dirs: List[Path] | List[str],
2830
plot_dir: Path | str,
2931
backup_dir: Path,
3032
do_backup: bool = False,
3133
plot_file_suffix: str = ".png",
32-
) -> None:
34+
) -> str:
3335
"""Generates reports and plots for epoch training times across distributed training
3436
strategies, including a log-log plot of absolute average epoch times against the
3537
number of GPUs and a log-log plot of relative speedup as more GPUs are added. The
3638
function optionally creates backups of the data.
3739
3840
Args:
39-
epoch_time_dir (Path | str): Path to the directory containing CSV files with
40-
epoch time metrics. The files must include the columns "name", "nodes",
41-
"epoch_id", and "time".
41+
log_dirs (List[Path] | List[str]): Paths to the directories containing CSV files
42+
with epoch time metrics; data from all of them is concatenated. The files must
43+
include the columns "name", "nodes", "epoch_id", and "time".
4244
plot_dir (Path | str): Path to the directory where the generated plots will
4345
be saved.
4446
backup_dir (Path): Path to the directory where backups of the data will be stored
4547
if `do_backup` is True.
4648
do_backup (bool): Whether to create a backup of the epoch time data in the
4749
`backup_dir`. Defaults to False.
4850
"""
49-
if isinstance(epoch_time_dir, str):
50-
epoch_time_dir = Path(epoch_time_dir)
5151
if isinstance(plot_dir, str):
5252
plot_dir = Path(plot_dir)
5353

5454
epoch_time_expected_columns = {"name", "nodes", "epoch_id", "time"}
55-
epoch_time_df = read_scalability_metrics_from_csv(
56-
data_dir=epoch_time_dir, expected_columns=epoch_time_expected_columns
57-
)
55+
56+
# Reading data from all the logdirs and concatenating the results
57+
dataframes = []
58+
for log_dir in log_dirs:
59+
temp_df = read_scalability_metrics_from_csv(
60+
data_dir=log_dir, expected_columns=epoch_time_expected_columns
61+
)
62+
dataframes.append(temp_df)
63+
epoch_time_df = pd.concat(dataframes)
5864

5965
# Calculate the average time per epoch for each strategy and number of nodes
6066
avg_epoch_time_df = (
@@ -66,7 +72,7 @@ def epoch_time_report(
6672
# Print the resulting table
6773
formatters = {"avg_epoch_time": "{:.2f} s".format}
6874
epoch_time_table = avg_epoch_time_df.to_string(index=False, formatters=formatters)
69-
print(epoch_time_table)
75+
# print(epoch_time_table)
7076

7177
# Create and save the figures
7278
absolute_fig, _ = absolute_avg_epoch_time_plot(avg_epoch_time_df=avg_epoch_time_df)
@@ -81,21 +87,22 @@ def epoch_time_report(
8187
print(f"Saved relative average time plot at '{relative_speedup_plot_path.resolve()}'.")
8288

8389
if not do_backup:
84-
return
90+
return epoch_time_table
8591

8692
backup_dir.mkdir(exist_ok=True, parents=True)
8793
backup_path = backup_dir / "epoch_time_data.csv"
8894
epoch_time_df.to_csv(backup_path)
8995
print(f"Storing backup file at '{backup_path.resolve()}'.")
96+
return epoch_time_table
9097

9198

9299
def gpu_data_report(
93-
gpu_data_dir: Path | str,
100+
log_dirs: List[Path] | List[str],
94101
plot_dir: Path | str,
95102
backup_dir: Path,
96103
do_backup: bool = False,
97104
plot_file_suffix: str = ".png",
98-
) -> None:
105+
) -> str:
99106
"""Generates reports and plots for GPU energy consumption and utilization across
100107
distributed training strategies. Includes bar plots for energy consumption and GPU
101108
utilization by strategy and number of GPUs. The function optionally creates backups
@@ -115,6 +122,7 @@ def gpu_data_report(
115122
"""
116123
if isinstance(plot_dir, str):
117124
plot_dir = Path(plot_dir)
125+
118126
gpu_data_expected_columns = {
119127
"sample_idx",
120128
"utilization",
@@ -125,9 +133,14 @@ def gpu_data_report(
125133
"strategy",
126134
"probing_interval",
127135
}
128-
gpu_data_df = read_scalability_metrics_from_csv(
129-
data_dir=gpu_data_dir, expected_columns=gpu_data_expected_columns
130-
)
136+
dataframes = []
137+
for log_dir in log_dirs:
138+
temp_df = read_scalability_metrics_from_csv(
139+
data_dir=log_dir, expected_columns=gpu_data_expected_columns
140+
)
141+
dataframes.append(temp_df)
142+
gpu_data_df = pd.concat(dataframes)
143+
131144
gpu_data_statistics_df = calculate_gpu_statistics(
132145
gpu_data_df=gpu_data_df, expected_columns=gpu_data_expected_columns
133146
)
@@ -136,7 +149,6 @@ def gpu_data_report(
136149
"utilization": "{:.2f} %".format,
137150
}
138151
gpu_data_table = gpu_data_statistics_df.to_string(index=False, formatters=formatters)
139-
print(gpu_data_table)
140152

141153
energy_plot_path = plot_dir / ("gpu_energy_plot" + plot_file_suffix)
142154
utilization_plot_path = plot_dir / ("utilization_plot" + plot_file_suffix)
@@ -158,21 +170,22 @@ def gpu_data_report(
158170
print(f"Saved utilization plot at '{utilization_plot_path.resolve()}'.")
159171

160172
if not do_backup:
161-
return
173+
return gpu_data_table
162174

163175
backup_dir.mkdir(exist_ok=True, parents=True)
164176
backup_path = backup_dir / "gpu_data.csv"
165177
gpu_data_df.to_csv(backup_path)
166178
print(f"Storing backup file at '{backup_path.resolve()}'.")
179+
return gpu_data_table
167180

168181

169182
def communication_data_report(
170-
communication_data_dir: Path | str,
183+
log_dirs: List[Path] | List[str],
171184
plot_dir: Path | str,
172185
backup_dir: Path,
173186
do_backup: bool = False,
174187
plot_file_suffix: str = ".png",
175-
) -> None:
188+
) -> str:
176189
"""Generates reports and plots for communication and computation fractions across
177190
distributed training strategies. Includes a bar plot showing the fraction of time
178191
spent on computation vs communication for each strategy and GPU count. The function
@@ -199,17 +212,19 @@ def communication_data_report(
199212
"name",
200213
"self_cuda_time_total",
201214
}
202-
communication_data_df = read_scalability_metrics_from_csv(
203-
data_dir=communication_data_dir,
204-
expected_columns=communication_data_expected_columns,
205-
)
215+
dataframes = []
216+
for log_dir in log_dirs:
217+
temp_df = read_scalability_metrics_from_csv(
218+
data_dir=log_dir, expected_columns=communication_data_expected_columns
219+
)
220+
dataframes.append(temp_df)
221+
communication_data_df = pd.concat(dataframes)
206222
computation_fraction_df = get_computation_fraction_data(communication_data_df)
207223

208224
formatters = {"computation_fraction": lambda x: "{:.2f} %".format(x * 100)}
209225
communication_data_table = computation_fraction_df.to_string(
210226
index=False, formatters=formatters
211227
)
212-
print(communication_data_table)
213228

214229
computation_fraction_plot_path = plot_dir / (
215230
"computation_fraction_plot" + plot_file_suffix
@@ -219,9 +234,10 @@ def communication_data_report(
219234
print(f"Saved computation fraction plot at '{computation_fraction_plot_path.resolve()}'.")
220235

221236
if not do_backup:
222-
return
237+
return communication_data_table
223238

224239
backup_dir.mkdir(exist_ok=True, parents=True)
225240
backup_path = backup_dir / "communication_data.csv"
226241
communication_data_df.to_csv(backup_path)
227242
print(f"Storing backup file at '{backup_path.resolve()}'.")
243+
return communication_data_table

src/itwinai/torch/monitoring/monitoring.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ def measured_method(self: 'TorchTrainer', *args, **kwargs) -> Any:
175175

176176
global_utilization_log = strategy.gather_obj(local_utilization_log, dst_rank=0)
177177
if strategy.is_main_worker:
178-
output_dir = Path("scalability-metrics/gpu-energy-data")
178+
output_dir = Path(f"scalability-metrics/{self.run_id}/gpu-energy-data")
179179
output_dir.mkdir(exist_ok=True, parents=True)
180180
output_path = output_dir / f"{strategy_name}_{num_global_gpus}.csv"
181181

src/itwinai/torch/profiling/profiler.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ def profiled_method(self: 'TorchTrainer', *args, **kwargs) -> Any:
124124
profiling_dataframe["num_gpus"] = num_gpus_global
125125
profiling_dataframe["global_rank"] = global_rank
126126

127-
profiling_log_dir = Path("scalability-metrics/communication-data")
127+
profiling_log_dir = Path(f"scalability-metrics/{self.run_id}/communication-data")
128128
profiling_log_dir.mkdir(parents=True, exist_ok=True)
129129

130130
filename = f"{strategy_name}_{num_gpus_global}_{global_rank}.csv"

src/itwinai/torch/trainer.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,13 @@ class TorchTrainer(Trainer, LogMixin):
123123
#: PyTorch Profiler for communication vs. computation comparison
124124
profiler: Any | None
125125

126+
#: Toggles for the profilers
126127
measure_gpu_data: bool = False
128+
measure_communication_overhead: bool = False
129+
measure_epoch_time: bool = False
130+
131+
#: Run ID
132+
run_id: str
127133

128134
def __init__(
129135
self,
@@ -144,7 +150,8 @@ def __init__(
144150
profiling_warmup_epochs: int = 2,
145151
measure_gpu_data: bool = False,
146152
measure_communication_overhead: bool = False,
147-
measure_epoch_time: bool = False
153+
measure_epoch_time: bool = False,
154+
run_id: str | None = None
148155
) -> None:
149156
super().__init__(name)
150157
self.save_parameters(**self.locals2params(locals()))
@@ -174,6 +181,9 @@ def __init__(
174181
self.measure_communication_overhead = measure_communication_overhead
175182
self.measure_epoch_time = measure_epoch_time
176183

184+
if run_id is None:
185+
run_id = "run0"
186+
self.run_id = run_id
177187

178188
@property
179189
def strategy(self) -> TorchDistributedStrategy:
@@ -565,7 +575,7 @@ def train(self):
565575
" when running distributed training!"
566576
)
567577
num_nodes = int(os.environ["SLURM_NNODES"])
568-
epoch_time_output_dir = Path("scalability-metrics/epoch-time")
578+
epoch_time_output_dir = Path(f"scalability-metrics/{self.run_id}/epoch-time")
569579
epoch_time_file_name = f"epochtime_{self.strategy.name}_{num_nodes}N.csv"
570580
epoch_time_output_path = epoch_time_output_dir / epoch_time_file_name
571581

0 commit comments

Comments
 (0)