forked from ronghuaiyang/arcface-pytorch
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest.py
329 lines (264 loc) · 12.9 KB
/
test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
# -*- coding: utf-8 -*-
"""
Created on 18-5-30 下午4:55
@author: ronghuaiyang
"""
from __future__ import print_function
import argparse
import logging
import os
import random
import time
import numpy as np
import torch
from torch.utils.data import DataLoader
import utils
from config import Config
from utils.dataset import get_dataset
logger = logging.getLogger(__name__)
def get_tester(type, opt, device):
# 准备数据,如果mode是"mnist",使用MNIST数据集
# 可视化,其实就是使用MNIST数据集,训练一个2维向量
# mnist数据,用于可视化的测试
if type.startswith("mnist"):
logger.info("构建Mnist测试类")
tester = MnistTester(opt, device)
else:
logger.info("构建人脸测试类")
tester = FaceTester(device)
return tester
class Tester():
"""
为了适配2种测试:一种是人脸的,一种是实验用的MNIST(为了可视化)
"""
def acc(self, model, metric, opt):
pass
def calculate_features(self, model, opt):
pass
class MnistTester(Tester):
def __init__(self, opt, device):
dataset = get_dataset(train=False, type='mnist', opt=opt)
self.data_loader = DataLoader(dataset,
batch_size=32, # 测试 = 3 | 32
shuffle=True,
num_workers=0)
self.device = device
def acc(self, model, opt):
correct = 0
start = time.time()
for index, data in enumerate(self.data_loader):
imgs_of_batch, label = data
# bugfix:...found at least two devices, cuda:0 and cpu!
imgs_of_batch, label = imgs_of_batch.to(self.device), label.to(self.device)
# 预测
with torch.no_grad():
output = model.predict(imgs_of_batch)
# 本来还想要再经过一下arcface的metrics,也就是论文的那个s*cos(θ+m),
# 但是,突然反思了一下,觉得不对,因为那个是需要同时传入label,我靠,我用网络就是为了argmax得到label,你让我传给你label,什么鬼?
# 显然我是理解错了,对比看了真实人脸的acc代码,在下面FaceTest.acc的实现里,test_performance方法里,
# 那个根本没有用metrics(也就是arcface的loss),而是直接用resnet的输出,算出两个不同的x1、x2的夹角,
# 且通过一堆人脸(6000个)得到一个阈值,来判断是不是同一人脸,人家是在做这事!
#
# 而我们这个acc,就是要简单的判断是哪个数字,不是要判断2张图是不是同一数字啊。
# 我只要看从resnet出来的向量就可以,argmax的那个就是最像的类别(不用softmax了,softmax只是为了放大而已)
pred = output.max(1, keepdim=True)[1]
correct += pred.eq(label.view_as(pred)).sum().item()
acc = correct / (index * self.data_loader.batch_size)
logger.info("测试了%d条,正确%d条,正确率:%.4f,耗时:%.2f",
index * self.data_loader.batch_size,
correct, acc, time.time() - start)
return acc
def calculate_features(self, model, image_paths):
features = None
labels = None
for data, label in self.data_loader:
data = data.to(self.device) # 放到显存中,用于加速
# you don't need to calculate gradients for forward and backward phase.防止OOM
with torch.no_grad():
__features = model.extract_feature(data)
__features = __features.cpu() # 用cpu()值替换掉原引用,导致旧引用回收=>GPU内存回收,解决OOM问题
if features is None:
features = __features.numpy()
else:
features = np.concatenate((features, __features.numpy()))
if labels is None:
labels = label.numpy()
else:
labels = np.concatenate((labels, label.numpy()))
return features, labels
class FaceTester(Tester):
def __init__(self, device):
self.device = device
def calculate_features(self, model, dir, face_image_names):
"""
image_paths: 所有的图片的路径(全路径)
"""
image_feature_dict = {}
# image_name 带着1层目录名
for i, image_name in enumerate(face_image_names):
image_full_path = os.path.join(dir,image_name)
image = utils.load_image(image_full_path)
if image is None: continue
# 转成 cuda tensor
data = np.array([image])
data = torch.from_numpy(data)
data = data.to(self.device)
# logger.debug("推断要求输入:%r", list(model.parameters())[0].shape)
# logger.debug("推断实际输入:%r", data.shape)
feature = model.extract_feature(data)[0]
feature = feature.cpu().detach().numpy() # cpu(显存=>内存),detach(去掉梯度), numpy(tensor转成numpy)
# logger.debug("推断实际输出(%s):%r", image_name, feature.shape)
# logger.debug("推断实际输出:%r", feature)
image_feature_dict[image_name] = feature
return image_feature_dict
def load_model(self, model, model_path):
model_dict = model.state_dict()
pretrained_dict = torch.load(model_path)
pretrained_dict = {k: v for k, v in pretrained_dict.items() if k in model_dict}
model_dict.update(pretrained_dict)
model.load_state_dict(model_dict)
def cosin_metric(self, x1, x2):
"""
x1 。x2
--------- ==> 这个就是x1,x2的夹角θ的cos值,arcface的θ,是和权重W_i的角度,不是两张脸的features:x1、x2的角度,
|x1|*|x2| 但因为模型导致某一类他们都压缩成一个聚集的类了,变相地导致了,不同类之间的角度就很大,
"""
return np.dot(x1, x2) / (np.linalg.norm(x1) * np.linalg.norm(x2))
def cal_accuracy(self, y_score, y_true):
"""
从一堆的 脸脸对,得到cosθ差异值,
然后用每一个cosθ差异值当阈值,来算正确率,
对应最好的正确率的那个阈值,当做最好的阈值。
这个算法有点意思。
y_score: 是x1,x2之间的夹角θ的cosθ值,
一共有多少个呢?有很多个,一对就是一个,我记得测试数据有6000个,3000个同一个人的脸,3000个不同人的脸,
"""
y_score = np.asarray(y_score)
y_true = np.asarray(y_true)
best_acc = 0
best_th = 0
for i in range(len(y_score)):
th = y_score[i]
# y_score是夹角的余弦值,你可以理解成一个夹角,大于这个值(也就是小于某个夹角,余弦是递减的),就是同一人
# 然后,我用所有的比对的cos值,都试验一遍,效果最好的那个值(也就是某个夹角角度),就是最好的阈值
y_test = (y_score >= th)
acc = np.mean((y_test == y_true).astype(int))
if acc > best_acc:
logger.debug("更好的acc:%r > %r(旧)", acc, best_acc)
logger.debug("更好的阈值:%r vs %r(旧)", th, best_th)
best_acc = acc
best_th = th
logger.debug("最好的阈值: %r,最好的ACC:%r", best_th, best_acc)
return (best_acc, best_th)
def test_performance(self, feature_dict, pairs):
sims = []
labels = []
for face1, face2, label in pairs:
feature_1 = feature_dict.get(face1, None)
feature_2 = feature_dict.get(face2, None)
if feature_1 is None or feature_2 is None:
continue
sim = self.cosin_metric(feature_1, feature_2) # 计算cosθ
sims.append(sim)
labels.append(label)
acc, th = self.cal_accuracy(sims, labels)
return acc, th
def extract_face_images(self, face1_face2_label_list):
face_image_paths = []
for face1, face2, _ in face1_face2_label_list:
if face1 not in face_image_paths:
face_image_paths.append(face1)
if face1 not in face_image_paths:
face_image_paths.append(face2)
return face_image_paths
def acc(self, model, opt):
"""
重构后的测试入口,它去加载 形如 "xxx.jpg xxx.jpg 1"的lfw的测试文件,0/1表示是不是同一个人的脸,
"""
model.eval()
# 加载所有的face1\face2\是否同一人
face1_face2_label_list = self.load_test_pairs(opt.lfw_test_pair_path, opt.test_pair_size)
# 得到所有的人脸图片名字(包含1层目录)
face_image_names = self.extract_face_images(face1_face2_label_list)
s = time.time()
image_feature_dicts = self.calculate_features(model, opt.lfw_root, face_image_names)
# logger.debug("人脸的特征维度:%r", len(image_feature_dicts))
t = time.time() - s
logger.info('[验证]耗时: %.2f秒, 每张耗时:%.4f秒', t, t / len(image_feature_dicts))
acc, th = self.test_performance(image_feature_dicts, face1_face2_label_list)
logger.info("[验证]测试%d对人脸,(最好)正确率%.2f,(适配出来的最好的阈值%.2f)", len(face1_face2_label_list), acc, th)
return acc
def caculate_samples(self, model, opt):
"""
重构后的测试入口,它去加载 形如 "xxx.jpg xxx.jpg 1"的lfw的测试文件,0/1表示是不是同一个人的脸,
"""
# 仅装载前10个人的脸
face_dirs = self.load_samples(opt.lfw_root, opt.test_classes)
start = time.time()
different_faces = []
count = 0
for face_dir, file_num in face_dirs:
# 一个人脸文件夹,包含多个人脸
file_names = os.listdir(face_dir)
count += len(file_names)
full_paths = [os.path.join(face_dir, file_name) for file_name in file_names]
image_feature_dicts = self.calculate_features(model, full_paths)
different_faces.append(list(image_feature_dicts.values())) # 只保留value数据,多个人脸
t = time.time() - start
logger.info('[计算%d个人的%d张人脸] 耗时: %.2f秒, 每张耗时:%.4f秒', opt.len(face_dirs), count, t, t / count)
return different_faces
def load_test_pairs(self, test_file_path, pair_size):
"""
各加载pair_size的一半的比较对
"""
fd = open(test_file_path, 'r')
lines = fd.readlines()
fd.close()
random.shuffle(lines) # shuffle一下
positive_list = []
negtive_list = []
face1_face2_label_list = []
half_size = pair_size // 2
for line in lines:
line = line.strip()
pairs = line.split()
face1 = pairs[0]
face2 = pairs[1]
label = int(pairs[2])
if label == 1:
positive_list.append([face1, face2, label])
if label == 0:
negtive_list.append([face1, face2, label])
face1_face2_label_list += positive_list[:half_size]
face1_face2_label_list += negtive_list[:half_size]
logger.info("从[%s]加载比较对%d个", test_file_path, len(face1_face2_label_list))
return face1_face2_label_list
def load_samples(self, dir, size):
"""
加载测试集中,人脸最多的前N个人,这个用于embedding显示
"""
dirs = os.listdir(dir)
dirs = [os.path.join(dir, sub_dir) for sub_dir in dirs]
dirs = [dir for dir in dirs if os.path.isdir(dir)]
logger.debug("从目录[%s],加载测试文件夹:%d 个", dir, len(dirs))
dir_files = {}
for dir in dirs:
dir_files[dir] = len(os.listdir(dir))
sored_dir_files = [[k, v] for k, v in sorted(dir_files.items(), key=lambda item: item[1])]
# sored_dir_files = sored_dir_files[-size:]
sored_dir_files = sored_dir_files[:3]
logger.debug("过滤后,剩余%d个文件夹", len(sored_dir_files))
return sored_dir_files
# bin/train.docker 1 test --model arcface_e39_s246948_202110121044_l0.51_a0.00.model
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument("--model", default=None, type=str)
args = parser.parse_args()
logger.info("参数配置:%r", args)
utils.init_log()
opt = Config()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # torch.device代表将torch.Tensor分配到的设备的对象
tester = FaceTester(device)
model = utils.load_model(args.model,device,opt)
acc = tester.acc(model, opt)
logger.info("测试acc:%r", acc)