This repository has been archived by the owner on Dec 27, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathminituna_v2.py
226 lines (175 loc) · 7.44 KB
/
minituna_v2.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
import abc
import copy
import math
import random
from typing import Any
from typing import Callable
from typing import cast
from typing import Dict
from typing import List
from typing import Literal
from typing import Optional
from typing import Union
# Lifecycle states of a trial; any state other than "running" counts as finished
# (see FrozenTrial.is_finished).
TrialStateType = Literal["running", "completed", "failed"]
# Value types accepted as entries of a categorical parameter's choice list.
CategoricalChoiceType = Union[None, bool, int, float, str]
class BaseDistribution(abc.ABC):
    """Abstract description of the search space of a single parameter.

    Concrete distributions translate between the value handed to the
    objective function (external representation) and the float that is
    kept in storage (internal representation).
    """

    @abc.abstractmethod
    def to_internal_repr(self, external_repr: Any) -> float:
        """Convert a user-facing parameter value into its stored float form."""

    @abc.abstractmethod
    def to_external_repr(self, internal_repr: float) -> Any:
        """Convert a stored float back into the user-facing parameter value."""
class UniformDistribution(BaseDistribution):
    """Continuous parameter range described by ``low`` and ``high`` bounds."""

    def __init__(self, low: float, high: float) -> None:
        self.low, self.high = low, high

    def to_internal_repr(self, external_repr: Any) -> float:
        """Return the value as-is; no conversion is applied."""
        stored: float = external_repr
        return stored

    def to_external_repr(self, internal_repr: float) -> Any:
        """Return the stored value as-is; no conversion is applied."""
        exposed: Any = internal_repr
        return exposed
class LogUniformDistribution(BaseDistribution):
    """Continuous parameter range, ``low`` to ``high``, for log-scale sampling.

    The class only carries the bounds; the log-space draw itself happens in
    ``Sampler.sample_independent``.
    """

    def __init__(self, low: float, high: float) -> None:
        self.low, self.high = low, high

    def to_internal_repr(self, external_repr: Any) -> float:
        """Return the value as-is; no conversion is applied."""
        stored: float = external_repr
        return stored

    def to_external_repr(self, internal_repr: float) -> Any:
        """Return the stored value as-is; no conversion is applied."""
        exposed: Any = internal_repr
        return exposed
class IntUniformDistribution(BaseDistribution):
    """Integer parameter range described by ``low`` and ``high`` bounds."""

    def __init__(self, low: int, high: int) -> None:
        self.low, self.high = low, high

    def to_internal_repr(self, external_repr: Any) -> float:
        """Widen the value to ``float`` for storage."""
        as_float = float(external_repr)
        return as_float

    def to_external_repr(self, internal_repr: float) -> Any:
        """Convert the stored float back with ``int()``."""
        as_int = int(internal_repr)
        return as_int
class CategoricalDistribution(BaseDistribution):
    """Parameter taking one value out of a fixed list of ``choices``.

    The internal representation of a choice is its position in the list.
    """

    def __init__(self, choices: List[CategoricalChoiceType]) -> None:
        self.choices = choices

    def to_internal_repr(self, external_repr: Any) -> float:
        """Return the position of ``external_repr`` within ``choices``.

        ``list.index`` raises ``ValueError`` if the value is not a choice.
        """
        position = self.choices.index(external_repr)
        return position

    def to_external_repr(self, internal_repr: float) -> Any:
        """Return the choice found at position ``int(internal_repr)``."""
        position = int(internal_repr)
        return self.choices[position]
class FrozenTrial:
    """Record of one trial: id, state, objective value and sampled parameters."""

    def __init__(self, trial_id: int, state: TrialStateType) -> None:
        self.trial_id = trial_id
        self.state = state
        # Stays None until a value is assigned to the trial.
        self.value: Optional[float] = None
        # Both mappings are keyed by parameter name.
        self.internal_params: Dict[str, float] = {}
        self.distributions: Dict[str, BaseDistribution] = {}

    @property
    def is_finished(self) -> bool:
        """Whether the trial has left the ``"running"`` state."""
        return not (self.state == "running")

    @property
    def params(self) -> Dict[str, Any]:
        """Parameters converted to external form by their own distributions."""
        return {
            name: self.distributions[name].to_external_repr(stored)
            for name, stored in self.internal_params.items()
        }
class Storage:
    """In-memory trial store.

    Trials are kept in a list and a trial's id is its position in that list.
    Read methods return deep copies, so mutating a returned trial does not
    affect the stored one.
    """

    def __init__(self) -> None:
        self.trials: List[FrozenTrial] = []

    def _get_unfinished_trial(self, trial_id: int) -> FrozenTrial:
        """Return the stored (not copied) trial, asserting it is still running."""
        trial = self.trials[trial_id]
        assert not trial.is_finished, "cannot update finished trials"
        return trial

    def create_new_trial(self) -> int:
        """Append a new trial in the ``"running"`` state and return its id."""
        trial_id = len(self.trials)
        trial = FrozenTrial(trial_id=trial_id, state="running")
        self.trials.append(trial)
        return trial_id

    def get_trial(self, trial_id: int) -> FrozenTrial:
        """Return a deep copy of the trial stored under ``trial_id``."""
        return copy.deepcopy(self.trials[trial_id])

    def get_best_trial(self) -> Optional[FrozenTrial]:
        """Return a deep copy of the completed trial with the smallest value.

        Returns:
            The best completed trial, or ``None`` when no trial has
            completed yet.
        """
        completed_trials = [t for t in self.trials if t.state == "completed"]
        if not completed_trials:
            # min() raises ValueError on an empty sequence; the Optional
            # return type promises None in that situation instead.
            return None
        best_trial = min(completed_trials, key=lambda t: cast(float, t.value))
        return copy.deepcopy(best_trial)

    def set_trial_value(self, trial_id: int, value: float) -> None:
        """Record the objective value of a trial that is still running."""
        self._get_unfinished_trial(trial_id).value = value

    def set_trial_state(self, trial_id: int, state: TrialStateType) -> None:
        """Change the state of a trial that is still running."""
        self._get_unfinished_trial(trial_id).state = state

    def set_trial_param(
        self, trial_id: int, name: str, distribution: BaseDistribution, value: float
    ) -> None:
        """Store a parameter of a running trial.

        Args:
            trial_id: Id of the trial to update.
            name: Parameter name.
            distribution: Distribution the parameter was drawn from.
            value: Parameter value in internal (float) representation.
        """
        trial = self._get_unfinished_trial(trial_id)
        trial.distributions[name] = distribution
        trial.internal_params[name] = value
class Trial:
    """Handle given to the objective function for requesting parameter values."""

    def __init__(self, study: "Study", trial_id: int):
        self.study = study
        self.trial_id = trial_id
        self.state = "running"

    def _suggest(self, name: str, distribution: BaseDistribution) -> Any:
        """Sample a value for ``name``, persist it, and return it.

        The sampler receives a copy of the trial as currently stored; the
        sampled value is written to storage in internal representation and
        returned to the caller in external representation.
        """
        study = self.study
        store = study.storage
        frozen = store.get_trial(self.trial_id)
        sampled = study.sampler.sample_independent(study, frozen, name, distribution)
        store.set_trial_param(
            self.trial_id,
            name,
            distribution,
            distribution.to_internal_repr(sampled),
        )
        return sampled

    def suggest_uniform(self, name: str, low: float, high: float) -> float:
        """Suggest a float from a ``UniformDistribution`` over ``low``..``high``."""
        distribution = UniformDistribution(low=low, high=high)
        return self._suggest(name, distribution)

    def suggest_loguniform(self, name: str, low: float, high: float) -> float:
        """Suggest a float from a ``LogUniformDistribution`` over ``low``..``high``."""
        distribution = LogUniformDistribution(low=low, high=high)
        return self._suggest(name, distribution)

    def suggest_int(self, name: str, low: int, high: int) -> int:
        """Suggest an int from an ``IntUniformDistribution`` over ``low``..``high``."""
        distribution = IntUniformDistribution(low=low, high=high)
        return self._suggest(name, distribution)

    def suggest_categorical(
        self, name: str, choices: List[CategoricalChoiceType]
    ) -> CategoricalChoiceType:
        """Suggest one entry of ``choices`` via a ``CategoricalDistribution``."""
        distribution = CategoricalDistribution(choices=choices)
        return self._suggest(name, distribution)
class Sampler:
    """Random sampler that draws every parameter independently."""

    def __init__(self, seed: Optional[int] = None) -> None:
        """Create the sampler's private random number generator.

        Args:
            seed: Seed forwarded to ``random.Random``. The default ``None``
                is forwarded unchanged, hence the ``Optional`` annotation.
        """
        self.rng = random.Random(seed)

    def sample_independent(
        self,
        study: "Study",
        trial: FrozenTrial,
        name: str,
        distribution: BaseDistribution,
    ) -> Any:
        """Draw one value, in external representation, from ``distribution``.

        ``study``, ``trial`` and ``name`` are accepted but not used by this
        sampler; only ``distribution`` influences the result.

        Raises:
            ValueError: If ``distribution`` is not one of the four supported
                distribution classes.
        """
        if isinstance(distribution, UniformDistribution):
            return self.rng.uniform(distribution.low, distribution.high)
        if isinstance(distribution, LogUniformDistribution):
            # Draw uniformly in log space, then map back with exp().
            log_low = math.log(distribution.low)
            log_high = math.log(distribution.high)
            return math.exp(self.rng.uniform(log_low, log_high))
        if isinstance(distribution, IntUniformDistribution):
            # randint() includes both endpoints.
            return self.rng.randint(distribution.low, distribution.high)
        if isinstance(distribution, CategoricalDistribution):
            index = self.rng.randint(0, len(distribution.choices) - 1)
            return distribution.choices[index]
        raise ValueError("unsupported distribution")
class Study:
    """Couples a storage with a sampler and runs the optimization loop."""

    def __init__(self, storage: Storage, sampler: Sampler) -> None:
        self.storage = storage
        self.sampler = sampler

    def _run_trial(self, objective: Callable[[Trial], float]) -> None:
        """Create one trial, evaluate ``objective`` on it and record the outcome."""
        trial_id = self.storage.create_new_trial()
        trial = Trial(self, trial_id)
        try:
            value = objective(trial)
            self.storage.set_trial_value(trial_id, value)
            self.storage.set_trial_state(trial_id, "completed")
            print(f"trial_id={trial_id} is completed with value={value}")
        except Exception as e:
            # A failing trial is recorded and reported, never re-raised, so
            # the remaining trials still get to run.
            self.storage.set_trial_state(trial_id, "failed")
            print(f"trial_id={trial_id} is failed by {e}")

    def optimize(self, objective: Callable[[Trial], float], n_trials: int) -> None:
        """Run ``objective`` on ``n_trials`` fresh trials, one after another.

        Each trial ends up ``"completed"`` with its value stored, or
        ``"failed"`` if an ``Exception`` was raised; either outcome is printed.
        """
        for _ in range(n_trials):
            self._run_trial(objective)

    @property
    def best_trial(self) -> Optional[FrozenTrial]:
        """Whatever the storage's ``get_best_trial()`` returns."""
        return self.storage.get_best_trial()
def create_study(
    storage: Optional[Storage] = None,
    sampler: Optional[Sampler] = None,
) -> Study:
    """Build a ``Study``, filling in defaults for omitted collaborators.

    Args:
        storage: Storage to use; a new ``Storage()`` is created when ``None``.
        sampler: Sampler to use; a new ``Sampler()`` is created when ``None``.

    Returns:
        A ``Study`` wired to the chosen storage and sampler.
    """
    # Test against None explicitly: a truthiness test would silently replace
    # a caller-supplied object that happens to be falsy (for example a
    # subclass that defines __len__ or __bool__).
    if storage is None:
        storage = Storage()
    if sampler is None:
        sampler = Sampler()
    return Study(storage=storage, sampler=sampler)