fun.py
import numpy as np
import scipy as sp
import scipy.stats  # load the stats submodule explicitly so sp.stats works on all SciPy versions
import pandas as pd
from numba import njit
from time import time
# run a function and return its result together with the wall-clock computing time
def measure_runtime(func, *args, **kwargs):
    start_time = time()
    result = func(*args, **kwargs)
    runtime = time() - start_time
    return result, runtime
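# Illustrative use of measure_runtime (the solver name and its arguments below are hypothetical):
#   result, runtime = measure_runtime(some_solver, X, Y, tau=0.5)
#   print(f'solver finished in {runtime:.3f} seconds')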
# selection and estimation metrics; method_result holds 'index' (selected variables)
# and 'beta_value' (estimated coefficients); beta_index / beta_value are the truth
def method_metrics(method_result, beta_index, beta_value):
    SC = int(np.all(np.in1d(beta_index, method_result['index'])))    # 1 if every true variable is selected
    CF = int(np.array_equal(method_result['index'], beta_index))     # 1 if the selected set equals the true support
    AMS = len(method_result['index'])                                 # selected model size
    PSR = len(np.intersect1d(method_result['index'], beta_index)) / len(beta_index)            # positive selection rate
    FDR = len(np.setdiff1d(method_result['index'], beta_index)) / len(method_result['index'])  # false discovery rate
    ME = np.linalg.norm(beta_value - method_result['beta_value'])     # l2 estimation error
    return SC, CF, AMS, PSR, FDR, ME
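# Illustrative call (hypothetical values, shown only to document the expected input format):
#   fit = {'index': np.array([0, 3]), 'beta_value': beta_hat}   # beta_hat: estimated coefficient vector
#   SC, CF, AMS, PSR, FDR, ME = method_metrics(fit, beta_index=np.array([0, 3, 7]), beta_value=beta_true)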
# column-wise mean and standard deviation over replications
def mean_std(method_index):
    mean_values = np.mean(method_index, axis=0)
    std_values = np.std(method_index, axis=0)
    # collect them in a DataFrame with 'mean' and 'std' columns
    result = pd.DataFrame({
        'mean': mean_values,
        'std': std_values
    })
    return result
# row labels used downstream, e.g. index=['sc', 'cf', 'ams', 'psr', 'fdr', 'aee', 'time']
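# Assumed layout (based on axis=0 above): each row of method_index is one replication and each
# column one metric, so a typical call is
#   summary = mean_std(np.vstack(all_runs))   # all_runs: list of per-replication metric tuples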
# quantile check loss 𝜌_𝜏(x) = x * (tau - 1{x < 0}) = |x|/2 + (tau - 1/2) * x
def check_fun(tau, x):
    return np.abs(x) / 2 + (tau - 0.5) * x
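# Worked example: with tau = 0.7, check_fun(0.7, 2.0) = 1.0 + 0.4 = 1.4 = 0.7 * 2.0 and
# check_fun(0.7, -1.0) = 0.5 - 0.2 = 0.3 = (0.7 - 1) * (-1.0), i.e. the usual quantile loss.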
# average of the check loss over the m machines (composite version when tau holds several levels)
# X, Y are length-m lists of per-machine design matrices / responses;
# tau is a 1-d array of quantile levels and beta a column vector
def cqr_check_sum(X, Y, m, tau, beta):
    machine_index = np.zeros(m)
    for i in range(m):
        residuals = Y[i] - X[i] @ beta
        machine_index[i] = np.mean( check_fun(tau, residuals) )
    return np.mean(machine_index)
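# Illustrative shapes (assumed from the broadcasting above): with Y[i] of shape (n, 1), beta of
# shape (p, 1) and tau of shape (K,), check_fun(tau, residuals) has shape (n, K), so the returned
# value averages over machines, samples and quantile levels, e.g.
#   loss = cqr_check_sum(X, Y, m=len(X), tau=np.array([0.25, 0.5, 0.75]), beta=beta0.reshape(-1, 1))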
# kernel function K(u); supported kernels: Gaussian, Laplacian, Logistic, Uniform, Epanechnikov
def kernel_fun(x, kernel):
    if kernel == 'Gaussian':
        return sp.stats.norm.pdf(x)
    elif kernel == 'Laplacian':
        return np.exp(- np.abs(x)) / 2
    elif kernel == 'Logistic':
        return 1 / (np.exp(x) + np.exp(-x) + 2)
    elif kernel == 'Uniform':
        return np.where(abs(x) <= 1, 0.5, 0)
    elif kernel == 'Epanechnikov':
        return np.where( abs(x) <= 1, 0.75 * (1 - x ** 2), 0 )
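# Quick sanity-check sketch (illustrative only): each kernel above is a symmetric density, so a
# crude Riemann sum should be close to 1, e.g.
#   grid = np.linspace(-10, 10, 100001)
#   (kernel_fun(grid, 'Epanechnikov') * (grid[1] - grid[0])).sum()   # approximately 1.0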
# smoothed check loss obtained by convolution: 𝓁_h(u) = (𝜌_𝜏 * K_h)(u) with K_h(u) = K(u/h)/h
# tau is a 1-d array
def convu_fun(h, tau, x, kernel):
    if kernel == 'Gaussian':
        # G(v) = 2*phi(v) + v*(1 - 2*Phi(-v)) is the Gaussian-smoothed absolute value
        G = lambda x: 2 * sp.stats.norm.pdf(x) + x * ( 1 - 2 * sp.stats.norm.cdf(-x) )
        return h / 2 * G(x / h) + (tau - 0.5) * x
    elif kernel == 'Laplacian':
        return check_fun(tau, x) + h / 2 * np.exp( -np.abs(x)/h )
    elif kernel == 'Logistic':
        return tau * x + h * np.log( 1 + np.exp(-x/h) )
    elif kernel == 'Uniform':
        U = lambda x: np.where( abs(x) <= 1, 0.5 + 0.5 * x ** 2, np.abs(x) )
        return h / 2 * U(x / h) + (tau - 0.5) * x
    elif kernel == 'Epanechnikov':
        E = lambda x: np.where( abs(x) <= 1, 0.75 * x ** 2 - (1/8) * x ** 4 + (3/8), np.abs(x) )
        return h / 2 * E(x / h) + (tau - 0.5) * x
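# Note: for every kernel above, d/du 𝓁_h(u) = kernel_integral(u/h, kernel) + tau - 1; since the
# kernels are symmetric, the gradient routines further below can therefore be written entirely in
# terms of kernel_integral((X @ beta - Y) / h, kernel) - tau.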
# integrated kernel function: \int_{−∞}^{u} K(t) dt
def kernel_integral(x, kernel):
    if kernel == 'Gaussian':
        return sp.stats.norm.cdf(x)
    elif kernel == 'Laplacian':
        return 0.5 + 0.5 * np.sign(x) * ( 1 - np.exp(-np.abs(x)) )
    elif kernel == 'Logistic':
        return 1 / (1 + np.exp(-x))
    elif kernel == 'Uniform':
        return np.where(x > 1, 1, 0) + np.where(abs(x) <= 1, 0.5 * (1 + x), 0)
    elif kernel == 'Epanechnikov':
        # CDF of the Epanechnikov kernel on [-1, 1], consistent with kernel_fun and convu_fun above
        return np.where(x > 1, 1, 0) + np.where(abs(x) <= 1, 0.25 * (2 + 3 * x - x ** 3), 0)
# hard thresholding: keep the k_retain entries of x that are largest in absolute value
def hard_thresholding(x, k_retain):
    ordered = np.sort(np.abs(x), axis=None)[::-1]        # |x| sorted in decreasing order
    a = x * ( np.abs(x) >= ordered[k_retain - 1] )       # zero out entries below the k-th largest magnitude
    return a
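# Worked example: hard_thresholding(np.array([0.5, -2.0, 0.1, 1.5]), k_retain=2)
# keeps the two largest magnitudes and returns array([ 0. , -2. ,  0. ,  1.5]).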
# soft thresholding (the proximal operator of lambda_value * ||.||_1)
def soft_thresholding(x, lambda_value):
    return np.sign(x) * np.maximum(np.abs(x) - lambda_value, 0)
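# Worked example: soft_thresholding(np.array([0.5, -2.0, 0.1]), 0.3)
# shrinks every entry towards zero by 0.3 and returns array([ 0.2, -1.7,  0. ]).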
# per-machine gradients of the smoothed loss; returns a length-m list, each element a vector in R^p
def deriv_i(X, Y, m, n, h, tau, beta, kernel):
    gradient_beta = [0] * m
    kernel_term = [0] * m
    for i in range(m):
        kernel_term[i] = kernel_integral( (X[i] @ beta - Y[i]) / h, kernel ) - tau
        gradient_beta[i] = X[i].T @ kernel_term[i] / n
    return gradient_beta
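# Illustrative shapes (assumed to match the loop above): X is a length-m list of (n, p) design
# matrices and Y a length-m list of responses, so each gradient_beta[i] is a vector in R^p:
#   grads = deriv_i(X, Y, m=len(X), n=X[0].shape[0], h=0.5, tau=0.5, beta=beta0, kernel='Gaussian')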
# objective function with gradient correction, evaluated on the first machine's data; X and Y are lists
def obj_fun(X, Y, m, n, h, tau, beta, kernel, deriv_first_initial, deriv_total_initial):
residuals = Y[0] - X[0] @ beta
first_term = np.sum( convu_fun(h, tau, residuals, kernel) ) / n
second_term = beta @ (deriv_first_initial[0] - deriv_total_initial)
return first_term - second_term
# gradient of the objective function above with respect to beta; X and Y are lists
def deriv_obj_fun(X, Y, m, n, h, tau, beta, kernel, deriv_first_initial, deriv_total_initial):
kernel_term = kernel_integral((X[0] @ beta - Y[0]) / h, kernel) - tau
first_term = X[0].T @ kernel_term / n
second_term = deriv_first_initial[0] - deriv_total_initial
return first_term - second_term
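# A minimal end-to-end sketch of how these helpers might be wired together (illustrative only:
# the simulated data, the single refinement round, and the use of scipy.optimize.minimize with
# BFGS are assumptions, not part of the original pipeline).
if __name__ == '__main__':
    from scipy.optimize import minimize
    rng = np.random.default_rng(0)
    m, n, p, tau, h = 4, 200, 10, 0.5, 0.5
    beta_true = np.zeros(p)
    beta_true[:3] = [1.0, -1.0, 0.5]
    X = [rng.standard_normal((n, p)) for _ in range(m)]
    Y = [X_i @ beta_true + rng.standard_normal(n) for X_i in X]
    beta0 = np.zeros(p)                                       # crude initial value
    grads = deriv_i(X, Y, m, n, h, tau, beta0, 'Gaussian')    # per-machine gradients at beta0
    deriv_total = sum(grads) / m                              # averaged gradient across machines
    fun = lambda b: obj_fun(X, Y, m, n, h, tau, b, 'Gaussian', grads, deriv_total)
    jac = lambda b: deriv_obj_fun(X, Y, m, n, h, tau, b, 'Gaussian', grads, deriv_total)
    res, runtime = measure_runtime(minimize, fun, beta0, jac=jac, method='BFGS')
    print('runtime:', runtime)
    print('estimate:', np.round(res.x, 3))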