-
Notifications
You must be signed in to change notification settings - Fork 65
/
fl_evaluate.py
140 lines (125 loc) · 4.75 KB
/
fl_evaluate.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# Ke Chen
# HTS-AT: A HIERARCHICAL TOKEN-SEMANTIC AUDIO TRANSFORMER FOR SOUND CLASSIFICATION AND DETECTION
# Evaluate the localization performance in DESED
import numpy as np
import argparse
import os
import matplotlib.pyplot as plt
import config
from utils import int16_to_float32
import soundfile as sf
import h5py
import pandas as pd
from tqdm import tqdm
from sklearn.metrics import average_precision_score
from sklearn.metrics import roc_auc_score
import math
def scratch_optim(preds, threshold, sec_threshold, mode):
    """Rewrite scores that fall close to ``threshold``.

    Only scores strictly inside the open band
    ``(threshold - sec_threshold, threshold + sec_threshold)`` are touched;
    every other score is copied through unchanged.  Inside the band:

    * a score below ``threshold`` becomes ``2 * threshold - score``
      (only when ``mode > 1``);
    * a score at or above ``threshold`` becomes ``2 * score - threshold``
      (only when ``mode < 3``).

    Hence ``mode <= 1`` rewrites only the upper half of the band,
    ``mode >= 3`` only the lower half, and a mode strictly between 1 and 3
    rewrites both.

    Args:
        preds: Array of scores.  Any shape is accepted; the rewrite is
            applied element-wise.
        threshold: Centre of the band.  ``0`` disables the rewrite.
        sec_threshold: Half-width of the band around ``threshold``.
        mode: Selects which half of the band is rewritten (see above).

    Returns:
        ``preds`` itself (not a copy) when ``threshold == 0``; otherwise a
        new float64 array with the same shape as ``preds``.  The input is
        never modified.
    """
    # Check the "disabled" case first so no output buffer is allocated
    # when the input is handed straight back.
    if threshold == 0:
        return preds

    scores = np.asarray(preds)
    # astype() copies, so the caller's array is left untouched.
    new_preds = scores.astype(np.float64)

    near = (threshold - sec_threshold < scores) & (scores < threshold + sec_threshold)
    below = scores < threshold

    if mode > 1:
        lower = near & below
        new_preds[lower] = 2 * threshold - scores[lower]
    if mode < 3:
        upper = near & ~below
        new_preds[upper] = 2 * scores[upper] - threshold
    return new_preds
def label_enhance(preds):
    """Spread each class score onto the classes listed for it in the class map.

    The class map is loaded from ``config.class_map_path``.  For every row
    and every class ``j`` the score ``preds[row, j]`` is added to the
    output at ``j`` and, split evenly, to every index listed in
    ``class_map[j][2]`` (presumably related classes - confirm against the
    file that produces the class map).

    Args:
        preds: 2-D score array indexed as ``preds[row, class]``.

    Returns:
        A new float64 array of the same shape holding the accumulated scores.
    """
    # NOTE(review): allow_pickle=True unpickles arbitrary objects; only load
    # class-map files from a trusted source.
    class_map = np.load(config.class_map_path, allow_pickle=True)
    enhanced = np.zeros(preds.shape)
    num_classes = preds.shape[-1]
    for row in tqdm(range(len(preds))):
        for cls_idx in range(num_classes):
            score = preds[row, cls_idx]
            targets = class_map[cls_idx][2]
            enhanced[row, cls_idx] += score
            for target in targets:
                # each listed class receives an equal share of the score
                enhanced[row, target] += score / len(targets)
    return enhanced
def process(f_dir, f_name, f_class, f_map):
    """Turn saved heatmap shards into event and metadata TSV files.

    Shards named ``<f_name>_cuda:<k>.npy`` are read from ``f_dir`` for
    ``k = 0, 1, 2, ...`` until the first missing index, then concatenated.
    Every record must provide ``"audio_name"``, ``"real_len"`` and
    ``"heatmap"``.  Each heatmap is truncated, thresholded with
    ``fl_mapping`` and converted to segments with ``draw_timeline``.

    Two tab-separated files are written to ``f_dir``:

    * ``<f_name>_outputmap.tsv`` - columns filename, onset, offset,
      event_label (one row per segment);
    * ``<f_name>_meta.tsv`` - columns filename, duration (one row per
      record, duration being ``real_len / config.sample_rate``).

    Args:
        f_dir: Directory holding the shards; also receives the output.
        f_name: Common prefix of the shard and output file names.
        f_class: Class labels, forwarded to ``draw_timeline``.
        f_map: Class index mapping, forwarded to ``fl_mapping``.
    """
    # load data
    # NOTE(review): allow_pickle=True unpickles arbitrary objects; only point
    # this at shard files you produced yourself.
    shards = []
    shard_idx = 0
    while True:
        shard_path = os.path.join(f_dir, f_name + "_cuda:" + str(shard_idx) + ".npy")
        if not os.path.exists(shard_path):
            break
        shards.append(np.load(shard_path, allow_pickle=True))
        print("Load:", shard_path)
        shard_idx += 1
    records = np.concatenate(shards, axis=0)
    print(len(records))
    print(records[0]["heatmap"].shape)

    events = {"filename": [], "onset": [], "offset": [], "event_label": []}
    meta = {"filename": [], "duration": []}
    for record in tqdm(records):
        clip_name = record["audio_name"]
        sample_count = record["real_len"]
        # floor-divide by the hop size first, then apply the 1.024 ratio
        kept_rows = math.ceil(sample_count // config.hop_size * 1.024)  # add ratio
        class_map = fl_mapping(record["heatmap"][:kept_rows], f_map)
        for onset, offset, label in draw_timeline(class_map, f_class):
            events["filename"].append(clip_name)
            events["onset"].append(onset)
            events["offset"].append(offset)
            events["event_label"].append(label)
        meta["filename"].append(clip_name)
        meta["duration"].append(sample_count / config.sample_rate)

    events_path = os.path.join(f_dir, f_name + "_outputmap.tsv")
    meta_path = os.path.join(f_dir, f_name + "_meta.tsv")
    pd.DataFrame(events).to_csv(events_path, index=False, sep="\t")
    pd.DataFrame(meta).to_csv(meta_path, index=False, sep="\t")
def fl_mapping(heatmap, f_map, thres=None):
    """Threshold a score heatmap into a binary map over the target classes.

    For every row of ``heatmap`` and every source index ``dd`` whose score
    exceeds the smallest threshold, the groups in ``f_map`` are scanned in
    order.  The first group that contains ``dd`` and whose own threshold is
    exceeded by the score gets a ``1`` in the output; the scan for that
    ``dd`` then stops, so one source index activates at most one group.

    Args:
        heatmap: Sequence of 1-D score vectors (e.g. a 2-D array), indexed
            as ``heatmap[row][source_index]``.
        f_map: Sequence of groups; ``f_map[k]`` holds the source indices
            that map to target class ``k``.
        thres: Optional per-target-class thresholds, ``thres[k]`` applying
            to ``f_map[k]``.  Must provide at least ``len(f_map)`` entries.
            Defaults to the built-in "tscam" set below.

    Returns:
        Float array of shape ``(len(heatmap), len(f_map))`` holding 0/1.
    """
    if thres is None:
        # Alternative sets that were used with other models:
        #   tscam + attn: [0.25, 0.1, 0.25, 0.15, 0.4, 0.1, 0.1, 0.4, 0.3, 0.1]
        #   pann:         [0.28, 0.20, 0.05, 0.1, 0.2, 0.2, 0.2, 0.2, 0.2, 0.05]
        thres = [0.275, 0.2, 0.20, 0.23, 0.48, 0.20, 0.18, 0.51, 0.39, 0.14]  # tscam

    output = np.zeros((len(heatmap), len(f_map)))
    # A score at or below the smallest threshold can never pass any
    # per-class test, so it is filtered out once up front.
    floor = min(thres)
    for hidx, frame in enumerate(heatmap):
        for dd in np.where(frame > floor)[0]:
            for fidx, ff_map in enumerate(f_map):
                if dd in ff_map and frame[dd] > thres[fidx]:
                    output[hidx, fidx] = 1
                    break
    return output
def draw_timeline(heatmap, f_class, step=None):
    """Convert a per-row class map into ``[onset, offset, label]`` segments.

    For each class column, consecutive rows holding the same value form a
    run.  Every run whose value is non-zero is emitted as
    ``[first_row * step, (last_row + 1) * step, label]``.  A change from
    one non-zero value to a different non-zero value also ends a run.

    Args:
        heatmap: 2-D array indexed as ``heatmap[row, class_index]``.
        f_class: Sequence of class labels; position ``k`` labels column
            ``k`` of ``heatmap``.
        step: Multiplier that turns a row index into a time value.
            Defaults to
            ``config.hop_size / config.sample_rate * 1000 / 1024``.

    Returns:
        List of ``[onset, offset, label]`` lists, grouped by class in
        ``f_class`` order and by ascending onset within a class.  Empty
        when ``heatmap`` has no rows.
    """
    if step is None:
        step = config.hop_size / config.sample_rate * 1000 / 1024  # add ratio

    output = []
    n_rows = len(heatmap)
    if n_rows == 0:
        # Nothing to segment; reading heatmap[0, fidx] below would raise.
        return output

    for fidx, cls in enumerate(f_class):
        sta = 0
        prev = heatmap[sta, fidx]
        for i in range(1, n_rows):
            cur = heatmap[i, fidx]
            if cur != prev:
                if prev != 0:
                    output.append([sta * step, i * step, cls])
                prev = cur
                sta = i
        # close the run that is still open at the last row
        if prev != 0:
            output.append([sta * step, n_rows * step, cls])
    return output
def main():
    """Report the configured inputs, then run ``process`` on them.

    Everything is read from the ``config`` module: the heatmap directory,
    the file-group prefix, the class labels and the class index mapping.
    """
    # default settings
    file_group = config.test_file
    class_labels = config.fl_class_num
    print("Load File Group:", file_group)
    print("Class:", class_labels)
    process(
        f_dir=config.heatmap_dir,
        f_name=file_group,
        f_class=class_labels,
        f_map=config.fl_audioset_mapping,
    )
# Run the evaluation only when executed as a script, not when imported.
if __name__ == '__main__':
    main()