-
Notifications
You must be signed in to change notification settings - Fork 0
/
calc.py
240 lines (206 loc) · 9.39 KB
/
calc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
import numpy as np
import pandas as pd
from copy import copy
from flirt.stats.common import FUNCTIONS as flirt_functions
# keep only a selected number of functions to reduce data size
# NOTE(review): these deletions mutate flirt's module-level FUNCTIONS dict,
# so every other importer of flirt.stats.common sees the pruned set too
del flirt_functions['ptp'] # remove because of multicollinearity
del flirt_functions['skewness'], flirt_functions['kurtosis'] # redundant
del flirt_functions['lineintegral'] # mostly nan
del flirt_functions['n_above_mean'], flirt_functions['n_below_mean'],\
    flirt_functions['n_sign_changes'], flirt_functions['iqr_5_95'], \
    flirt_functions['pct_5'], flirt_functions['pct_95'] # redundant
# the following functions should be left: mean, std, min, max, sum, energy, iqr, line_integral
def timestamp_to_seconds(timestamp:pd.Series) -> pd.Series:
    """
    Convert timestamps to the number of seconds elapsed since midnight.

    Parameters
    ----------
    timestamp : pd.Series
        Timestamps (or strings parseable by pd.to_datetime).

    Returns
    -------
    pd.Series of integer seconds since the start of each timestamp's day.
    """
    # parse once (the original converted the same data three times);
    # normalize() truncates to midnight, the remaining delta is the time of day
    ts = pd.to_datetime(timestamp)
    return (ts - ts.dt.normalize()).dt.seconds
# glucose levels in mg/dL
# ranges follow the international consensus on time-in-range reporting
glucose_levels = {'hypo L2': (0,53),
                  'hypo L1': (54,69),
                  'target' : (70,180),
                  'hyper L1': (181,250),
                  'hyper L2': (251,10000)}
# perform some adjustments to the glucose levels, so that floats don't fall in between glucose levels
# e.g. 69.5 falls in between 69 and 70
# hypo ranges get their upper bound raised by (1 - 1e-8) and hyper ranges get
# their lower bound lowered by (1 - 1e-8); the 1e-8 keeps adjacent ranges disjoint
glucose_levels_ = {level: (lmin-(1-1e-8), lmax) if level.startswith('hyper') else (
    (lmin, lmax+(1-1e-8)) if level.startswith('hypo') else (
    (lmin, lmax))) for level, (lmin, lmax) in glucose_levels.items()}
# extended set: aggregate 'hypo' (L2 through L1) and 'hyper' (L1 through L2) ranges
glucose_levels_ext = copy(glucose_levels_)
glucose_levels_ext['hypo'] = (glucose_levels_['hypo L2'][0], glucose_levels_['hypo L1'][1])
glucose_levels_ext['hyper'] = (glucose_levels_['hyper L1'][0], glucose_levels_['hyper L2'][1])
# conversion factors between mmol/L and mg/dL for glucose
mmoll_mgdl = 18.018
mgdl_mmoll = 1/mmoll_mgdl
def minimum_eventtime(X:pd.Series, mintime:str):
    """
    Set event to false if minimum event time is violated.

    Note: a pandas rolling function does not work here, as it would not take
    into account the entire event.

    Parameters
    ----------
    X : pd.Series
        dtype int indicating event (1 or 0), with timestamp indices.
        Modified in place.
    mintime : str
        Minimum event time, parseable by pd.to_timedelta (e.g. '14min45S').

    Returns
    -------
    The same series with too-short events zeroed out.
    """
    min_duration = pd.to_timedelta(mintime)  # hoisted out of the loop
    starts = list(X[X.diff() == 1].index)
    ends = list(X[X.diff().shift(-1) == -1].index)
    # diff() misses events touching the series boundaries: an event already
    # active at the first sample has no 0->1 transition, and one still active
    # at the last sample has no 1->0 transition. Without this padding the zip
    # below silently mispairs (or drops) boundary events.
    if len(X) > 0:
        if X.iloc[0] == 1:
            starts.insert(0, X.index[0])
        if X.iloc[-1] == 1:
            ends.append(X.index[-1])
    for start, end in zip(starts, ends):
        assert start <= end, f"Oops, there is something wrong with your event list. Start {start} is larger than end {end}."
        if end - start < min_duration:
            X.loc[(X.index >= start) & (X.index <= end)] = 0
    return X
def hypo(X:pd.Series, mintime:str='14min45S', unit:str='mgdl'):
    """
    Calculate hypo according to definition of https://doi.org/10.2337/dc17-1600

    Note: make sure your data has timestamp indices.

    Parameters
    ----------
    X : pd.Series
        Glucose readings with timestamp indices.
    mintime : str
        Minimum duration for an excursion to count as an event.
    unit : str
        Unit of X: 'mgdl' or 'mmoll'.

    Returns
    -------
    pd.Series of 0/1 event flags, NaN where X is NaN.

    Raises
    ------
    ValueError if unit is not recognised (previously this fell through to a
    confusing NameError on the undefined result).
    """
    if unit == 'mmoll':
        res = (X * mmoll_mgdl < glucose_levels['target'][0]).astype(int)
    elif unit == 'mgdl':
        res = (X < glucose_levels['target'][0]).astype(int)
    else:
        raise ValueError(f"Unknown unit: {unit} (expected 'mgdl' or 'mmoll')")
    res = minimum_eventtime(res, mintime)
    res.loc[X.isna()] = np.nan
    return res
def hyper(X:pd.Series, mintime:str='14min45S', unit:str='mgdl'):
    """
    Calculate hyper according to definition of https://doi.org/10.2337/dc17-1600

    Note: make sure your data has timestamp indices.

    Parameters
    ----------
    X : pd.Series
        Glucose readings with timestamp indices.
    mintime : str
        Minimum duration for an excursion to count as an event.
    unit : str
        Unit of X: 'mgdl' or 'mmoll'.

    Returns
    -------
    pd.Series of 0/1 event flags, NaN where X is NaN.

    Raises
    ------
    ValueError if unit is not recognised (previously this fell through to a
    confusing NameError on the undefined result).
    """
    if unit == 'mmoll':
        res = (X * mmoll_mgdl > glucose_levels['target'][1]).astype(int)
    elif unit == 'mgdl':
        res = (X > glucose_levels['target'][1]).astype(int)
    else:
        raise ValueError(f"Unknown unit: {unit} (expected 'mgdl' or 'mmoll')")
    res = minimum_eventtime(res, mintime)
    res.loc[X.isna()] = np.nan
    return res
def symmetric_scale(X, unit='mgdl'):
    """
    Symmetric (risk-space) scaling for blood glucose (Kovatchev transform).

    Maps glucose values onto a scale that is symmetric around ~112.5 mg/dL,
    so hypo- and hyperglycemic deviations are weighted comparably.

    Parameters
    ----------
    X : scalar or array-like of glucose values
    unit : str
        'mgdl' or 'mmoll' — selects the constant set for the unit of X.

    Returns
    -------
    Scaled value(s); negative below target center, positive above.

    Raises
    ------
    ValueError for an unknown unit (previously returned None silently).
    """
    if unit == 'mgdl':
        return 1.509*(np.log(X)**1.084 - 5.381)
    elif unit == 'mmoll':
        return 1.794*(np.log(X)**1.026 - 1.861)
    raise ValueError(f"Unknown unit: {unit} (expected 'mgdl' or 'mmoll')")
def LBGI(X):
    """
    Low blood glucose index, per https://doi.org/10.2337/diacare.21.11.1870

    Mean of 10*r^2 over the negative (hypoglycemic) part of the risk scale.
    """
    scaled = symmetric_scale(X)
    risk = scaled.apply(lambda v: 10*v**2 if v < 0 else 0)
    return risk.mean()
def HBGI(X):
    """
    High blood glucose index, per https://doi.org/10.2337/dc06-1085

    Mean of 10*r^2 over the non-negative (hyperglycemic) part of the risk scale.
    """
    scaled = symmetric_scale(X)
    risk = scaled.apply(lambda v: 10*v**2 if v >= 0 else 0)
    return risk.mean()
def time_in_level(x, l, extend=True):
    """
    Count readings of x that fall inside glucose level l (inclusive bounds).

    extend=True looks the level up in the extended table (which also has the
    aggregate 'hypo'/'hyper' ranges); otherwise the plain table is used.
    """
    table = glucose_levels_ext if extend else glucose_levels_
    lower, upper = table[l]
    return ((x >= lower) & (x <= upper)).sum()
def perc_in_level(x, l, extend=True):
    """
    Percentage of non-missing readings of x inside glucose level l.

    Delegates the range test to time_in_level so the two functions cannot
    drift apart (previously the level lookup and mask were duplicated here).
    """
    return time_in_level(x, l, extend) / x.count() * 100
def calc_hr_zones(LTHR:float) -> list:
    """
    Coggan heart-rate zone boundaries from lactate threshold heart rate (LTHR).

    https://www.trainingpeaks.com/blog/power-training-levels/
    https://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.111.3820&rep=rep1&type=pdf

    Zones: 1 Active Recovery, 2 Endurance, 3 Tempo, 4 Lactate Threshold, 5 VO2Max.
    Returns the four boundary values separating the five zones.
    """
    return [factor*LTHR for factor in (0.69, 0.84, 0.95, 1.06)]
def calc_power_zones(FTP:float) -> list:
    """
    Coggan power zone boundaries from functional threshold power (FTP).

    https://www.trainingpeaks.com/blog/power-training-levels/
    https://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.111.3820&rep=rep1&type=pdf

    Zones: 1 Active Recovery, 2 Endurance, 3 Tempo, 4 Lactate Threshold,
    5 VO2Max, 6 Anaerobic Capacity.
    Note: the returned numbers define the starts of zones 2-6.
    """
    return [factor*FTP for factor in (0.56, 0.76, 0.91, 1.06, 1.21)]
def time_in_zone(X:pd.Series, zones:list) -> list:
    """
    Count samples of X falling in each zone for one training session.

    zones holds the lower bounds of zones 2..N; zone 1 starts at 0 and the
    last zone is capped at 1e6 (effectively open-ended).
    """
    lower_bounds = [0] + zones
    upper_bounds = zones + [1e6]
    counts = [((X >= lo) & (X < hi)).sum() for lo, hi in zip(lower_bounds, upper_bounds)]
    return np.array(counts, dtype=float)
def combine_pedal_smoothness(left, right, balance):
    """
    Blend left and right pedal smoothness, weighted by the L/R balance.

    balance is clipped to [0, 100] and interpreted as the left-side percentage.
    """
    left_fraction = balance.clip(0, 100) / 100
    return left*left_fraction + right*(1 - left_fraction)
def elevation_gain(altitude: pd.Series):
    """Total elevation gained during a workout: sum of positive altitude steps."""
    step = altitude.diff()
    return step[step > 0].sum()
def elevation_loss(altitude: pd.Series):
    """Total elevation lost during a workout: sum of negative altitude steps (non-positive result)."""
    step = altitude.diff()
    return step[step < 0].sum()
def normalised_power(power: pd.Series) -> float:
    """
    Normalised power (NP) for a single training session.

    power must be a pandas Series with a monotonic DatetimeIndex; a 30-second
    rolling mean is raised to the 4th power, averaged, then 4th-rooted.
    """
    # TODO: should I ignore 0s?
    rolling_mean = power.rolling('30s', min_periods=30).mean()
    return rolling_mean.pow(4).mean() ** 0.25
def intensity_factor(NP, FTP) -> float:
    """Intensity factor (IF): ratio of normalised power (NP) to functional threshold power (FTP)."""
    return NP / FTP
def training_stress_score(T, IF) -> float:
    """Training stress score (TSS) from workout duration T (seconds) and intensity factor IF."""
    hours = T/3600
    return 100 * hours * IF**2
def variability_index(NP:float, power: pd.Series) -> float:
    """Variability index (VI) for one session: normalised power (NP) over mean power."""
    mean_power = power.mean()
    return NP / mean_power
def efficiency_factor(NP:float, heart_rate: pd.Series) -> float:
    """Efficiency factor (EF) for one session: normalised power (NP) over mean heart rate."""
    mean_hr = heart_rate.mean()
    return NP / mean_hr
def chronic_training_load(TSS: pd.Series):
    """Chronic training load (CTL, 'fitness'): 42-sample exponentially weighted mean of TSS."""
    smoother = TSS.ewm(span=42)
    return smoother.mean()
def acute_training_load(TSS: pd.Series):
    """Acute training load (ATL, 'fatigue'): 7-sample exponentially weighted mean of TSS."""
    smoother = TSS.ewm(span=7)
    return smoother.mean()
def training_stress_balance(CTL, ATL):
    """Training stress balance (TSB, 'form'): chronic load (CTL) minus acute load (ATL)."""
    return CTL - ATL
def agg_power(X:pd.DataFrame, FTP):
    """
    Compute the power summary metrics for one training session.

    Parameters
    ----------
    X : pd.DataFrame with 'power' and 'heart_rate' columns
    FTP : functional threshold power of the athlete

    Returns
    -------
    pd.Series with NP, IF, TSS, VI and EF for the session.
    """
    # duration proxy: sample count — assumes 1 Hz sampling, TODO confirm
    duration = len(X)
    norm_power = normalised_power(X['power'])
    intensity = intensity_factor(norm_power, FTP)
    metrics = {
        'normalised_power' : norm_power,
        'intensity_factor' : intensity,
        'training_stress_score' : training_stress_score(duration, intensity),
        'variability_index' : variability_index(norm_power, X['power']),
        'efficiency_factor' : efficiency_factor(norm_power, X['heart_rate']),
    }
    return pd.Series(metrics)
def agg_zones(X:pd.DataFrame, hr_zones:list, power_zones:list):
    """
    Summarise time spent in each heart-rate and power zone for one session.

    Returns a pd.Series keyed 'time_in_hr_zone<k>' / 'time_in_power_zone<k>'
    with k starting at 1.
    """
    summary = {}
    for k, t in enumerate(time_in_zone(X['heart_rate'], hr_zones), start=1):
        summary['time_in_hr_zone%s' % k] = t
    for k, t in enumerate(time_in_zone(X['power'], power_zones), start=1):
        summary['time_in_power_zone%s' % k] = t
    return pd.Series(summary)
def stats_cgm(x, sec='', col='Glucose Value (mg/dL)'):
    """
    Compute CGM summary statistics for one group/window of readings.

    Parameters
    ----------
    x : DataFrame-like group; besides `col` it must provide the columns
        'glucose_rate', 'timestamp', 'hypo' and 'hyper' (inferred from use
        below — verify against the caller).
    sec : str
        Suffix appended to every output key (section/window label).
    col : str
        Name of the glucose column, in mg/dL.

    Returns
    -------
    dict with time and percentage in each glycemic range, mean/std/CV of
    glucose, mean rate of change, completeness, sample count, LBGI/HBGI,
    AUC and hypo/hyper event flags.
    """
    return {'time_in_hypo_'+sec : time_in_level(x[col], 'hypo'),
            'time_in_hypoL2_'+sec : time_in_level(x[col], 'hypo L2'),
            'time_in_hypoL1_'+sec : time_in_level(x[col], 'hypo L1'),
            'time_in_target_'+sec : time_in_level(x[col], 'target'),
            'time_in_hyper_'+sec : time_in_level(x[col], 'hyper'),
            'time_in_hyperL1_'+sec : time_in_level(x[col], 'hyper L1'),
            'time_in_hyperL2_'+sec : time_in_level(x[col], 'hyper L2'),
            'perc_in_hypo_'+sec : perc_in_level(x[col], 'hypo'),
            'perc_in_hypoL2_'+sec : perc_in_level(x[col], 'hypo L2'),
            'perc_in_hypoL1_'+sec : perc_in_level(x[col], 'hypo L1'),
            'perc_in_target_'+sec : perc_in_level(x[col], 'target'),
            'perc_in_hyper_'+sec : perc_in_level(x[col], 'hyper'),
            'perc_in_hyperL1_'+sec : perc_in_level(x[col], 'hyper L1'),
            'perc_in_hyperL2_'+sec : perc_in_level(x[col], 'hyper L2'),
            'glucose_mean_'+sec : x[col].mean(),
            'glucose_std_'+sec : x[col].std(),
            'glucose_cv_'+sec : x[col].std() / x[col].mean() * 100, # coefficient of variation (%)
            'glucose_rate_'+sec : x['glucose_rate'].mean(),
            'completeness_'+sec : x[col].count() / x['timestamp'].count(),
            'count_'+sec : x[col].count(),
            'LBGI_'+sec : LBGI(x[col]),
            'HBGI_'+sec : HBGI(x[col]),
            'AUC_'+sec : np.trapz(y=x[col], x=x['timestamp']) / np.timedelta64(5, 'm'), # trapezoidal AUC in 5-min units — presumably 5-min CGM cadence, TODO confirm
            'hypo_'+sec : x['hypo'].any(),
            'hyper_'+sec : x['hyper'].any()}
def agg_stats(X:pd.DataFrame):
    """
    Apply the selected flirt statistics to every column of the data.

    Returns a pd.Series with a (column, statistic) MultiIndex, built by
    concatenating one per-column Series of statistic values.
    """
    summaries = {
        column: pd.Series({stat: fn(X[column]) for stat, fn in flirt_functions.items()})
        for column in X
    }
    return pd.concat(summaries)