-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils_eval.py
More file actions
executable file
·130 lines (102 loc) · 3.84 KB
/
utils_eval.py
File metadata and controls
executable file
·130 lines (102 loc) · 3.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import numpy as np
import torch
from lifelines import KaplanMeierFitter
### C(t)-INDEX CALCULATION
def c_index(Prediction, Time_survival, Death, Time):
'''
This is a cause-specific c(t)-index
- Prediction : risk at Time (higher --> more risky)
- Time_survival : survival/censoring time
- Death :
> 1: death
> 0: censored (including death from other causes)
- Time : time of evaluation (time-horizon when evaluating C-index)
'''
N = len(Prediction)
A = np.zeros((N, N))
Q = np.zeros((N, N))
N_t = np.zeros((N, N))
Num = 0
Den = 0
for i in range(N):
A[i, np.where(Time_survival[i] < Time_survival)] = 1
Q[i, np.where(Prediction[i] > Prediction)] = 1
if (Time_survival[i] <= Time and Death[i] == 1):
N_t[i, :] = 1
Num = np.sum(((A) * N_t) * Q)
Den = np.sum((A) * N_t)
if Num == 0 and Den == 0:
result = -1 # Not able to compute c-index!
else:
result = float(Num / Den)
return result
### BRIER-SCORE
def brier_score(Prediction, Time_survival, Death, Time):
N = len(Prediction)
y_true = ((Time_survival <= Time) * Death).astype(float)
return np.mean((Prediction - y_true) ** 2)
##### WEIGHTED C-INDEX & BRIER-SCORE
def CensoringProb(Y, T):
T = T.reshape([-1]) # (N,) - np array
Y = Y.reshape([-1]) # (N,) - np array
kmf = KaplanMeierFitter()
kmf.fit(T, event_observed=(Y == 0).astype(int)) # Censoring prob = survival probability of event "censoring"
G = np.asarray(kmf.survival_function_.reset_index()).transpose()
# Fill 0 with ZoH (to prevent NaN values)
G[1, G[1, :] == 0] = G[1, G[1, :] != 0][-1]
return G
### C(t)-INDEX CALCULATION: this accounts for the weighted average for unbiased estimation
def weighted_c_index(T_train, Y_train, Prediction, T_test, Y_test, Time):
'''
This is a cause-specific c(t)-index
- Prediction : risk at Time (higher --> more risky)
- Time_survival : survival/censoring time
- Death :
> 1: death
> 0: censored (including death from other cause)
- Time : time of evaluation (time-horizon when evaluating C-index)
'''
G = CensoringProb(Y_train, T_train)
N = len(Prediction)
A = np.zeros((N, N))
Q = np.zeros((N, N))
N_t = np.zeros((N, N))
Num = 0
Den = 0
for i in range(N):
tmp_idx = np.where(G[0, :] >= T_test[i])[0]
if len(tmp_idx) == 0:
W = (1. / G[1, -1]) ** 2
else:
W = (1. / G[1, tmp_idx[0]]) ** 2
A[i, np.where(T_test[i] < T_test)] = 1. * W
Q[i, np.where(Prediction[i] > Prediction)] = 1. # Give weights
if (T_test[i] <= Time and Y_test[i] == 1):
N_t[i, :] = 1.
Num = np.sum(((A) * N_t) * Q)
Den = np.sum((A) * N_t)
if Num == 0 and Den == 0:
result = -1 # Not able to compute c-index!
else:
result = float(Num / Den)
return result
### WEIGHTED BRIER SCORE: This accounts for the weighted average for unbiased estimation
def weighted_brier_score(T_train, Y_train, Prediction, T_test, Y_test, Time):
G = CensoringProb(Y_train, T_train)
N = len(Prediction)
W = np.zeros(len(Y_test))
Y_tilde = (T_test > Time).astype(float)
for i in range(N):
tmp_idx1 = np.where(G[0, :] >= T_test[i])[0]
tmp_idx2 = np.where(G[0, :] >= Time)[0]
if len(tmp_idx1) == 0:
G1 = G[1, -1]
else:
G1 = G[1, tmp_idx1[0]]
if len(tmp_idx2) == 0:
G2 = G[1, -1]
else:
G2 = G[1, tmp_idx2[0]]
W[i] = (1. - Y_tilde[i]) * float(Y_test[i]) / G1 + Y_tilde[i] / G2
y_true = ((T_test <= Time) * Y_test).astype(float)
return np.mean(W * (Y_tilde - (1. - Prediction)) ** 2)