-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathstage1.py
More file actions
126 lines (105 loc) · 6.16 KB
/
stage1.py
File metadata and controls
126 lines (105 loc) · 6.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
import preprocessing_utils as utils
import os
import SimpleITK as sitk
import logging
import sys
# Set this to True to skip patients that have already been pre-processed
# (checks whether the expected output files already exist in the output directory).
skip_existing = True
# File name of the registered input image written for each task.
# Task 1 uses the MR FOV routine and is saved as MR; task 2 uses the CBCT FOV routine and is saved as CBCT.
_INPUT_OUTPUT_NAMES = {1: 'mr_s1.nii.gz', 2: 'cbct_s1.nii.gz'}


def _setup_logging(log_file: str = 'stage1.log') -> logging.Logger:
    """Configure the root logger to emit INFO and above to `log_file` and to the console.

    The log file is opened in append mode, so consecutive runs accumulate in the same file.
    Both handlers share the same message format.
    """
    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    for handler in (logging.FileHandler(log_file, mode='a'), logging.StreamHandler()):
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    return logger


def _outputs_exist(patient) -> bool:
    """Return True if the input image, CT and FOV mask of stage 1 already exist for `patient`.

    A patient with an unknown task is never considered pre-processed.
    """
    input_name = _INPUT_OUTPUT_NAMES.get(patient['task'])
    if input_name is None:
        return False
    return all(
        os.path.isfile(os.path.join(patient['output_dir'], name))
        for name in (input_name, 'ct_s1.nii.gz', 'fov_s1.nii.gz')
    )


def _process_patient(pat, patient, logger) -> None:
    """Run stage 1 pre-processing for one case and write the results to its output directory.

    Steps: load input (MR or CBCT) and CT, correct the input image properties, compute the
    FOV mask, rigidly register the input to the CT, optionally deface, resample, and save
    the images, the FOV mask, the transform, optional structures and an overview image.

    Args:
        pat: Identifier of the case (key of the configuration dictionary), used for logging.
        patient: Configuration of the case as produced by `utils.csv_to_dict`.
        logger: Logger passed on to the pre-processing utilities.

    Raises:
        ValueError: If `patient['task']` is neither 1 nor 2. Without this check the FOV mask
            would be undefined and no input image would be saved.
    """
    task = patient['task']
    if task not in _INPUT_OUTPUT_NAMES:
        raise ValueError(f'Patient {pat}: unsupported task {task!r}, expected 1 (MR) or 2 (CBCT)')
    output_dir = patient['output_dir']

    # log patient details
    logger.info(
        f"Processing case {pat}:\n"
        f"Input: {patient['input_path']}\n"
        f"CT: {patient['ct_path']}\n"
        f"Output: {output_dir}"
    )

    # Load input (CBCT or MRI) and CT as sitk images; for DICOM provide the path to the
    # directory, for other file types provide the path to the file
    input_image = utils.read_image(patient['input_path'], log=logger)
    ct = utils.read_image(patient['ct_path'], log=logger)

    # if necessary correct orientation of input/MRI (swap or flip axes)
    input_image = utils.correct_image_properties(
        input_image=input_image,
        order=patient['order'],
        flip=patient['flip'],
        mr_overlap_correction=patient['mr_overlap_correction'],
        intensity_shift=patient['intensity_shift'],
        log=logger,
    )

    # calculate CBCT/MRI FOV mask (task was validated above, so it is either 1 or 2)
    if task == 1:
        fov_mask = utils.get_mr_fov(input_image)
    else:
        fov_mask = utils.get_cbct_fov(input_image, background=patient['background'], log=logger)

    # Register CBCT/MRI to CT, restricting the registration to the FOV mask only if requested
    registration_kwargs = {}
    if patient['reg_fovmask']:
        logger.info('Registering input to CT using FOV mask...')
        registration_kwargs['mask'] = fov_mask
    else:
        logger.info('Registering input to CT without FOV mask...')
    input_image, transform = utils.rigid_registration(
        ct,
        input_image,
        patient['registration'],
        default_value=patient['background'],
        log=logger,
        **registration_kwargs,
    )

    # Apply transformation to CBCT/MRI FOV mask
    fov_mask_reg = utils.apply_transform(fov_mask, transform, ct)

    # Deface CBCT/MRI and CT
    defacing_mask = None
    if patient['defacing']:
        # Get structures for defacing
        brain, skull = utils.segment_defacing(ct, log=logger)
        defacing_mask = utils.defacing(brain, skull, log=logger)
        ct = sitk.Mask(ct, defacing_mask, outsideValue=-1024, maskingValue=1)
        input_image = sitk.Mask(input_image, defacing_mask,
                                outsideValue=patient['background'], maskingValue=1)

    # Resample everything
    spacing = patient['resample']
    input_image = utils.resample_image(input_image, new_spacing=spacing, log=logger)
    ct = utils.resample_image(ct, new_spacing=spacing, log=logger)
    fov_mask_reg = utils.resample_image(fov_mask_reg, new_spacing=spacing, log=logger)
    if patient['defacing']:
        defacing_mask = utils.resample_image(defacing_mask, new_spacing=spacing, log=logger)

    # Save registered, defaced CBCT/MRI and CT, input/MRI FOV mask and transform
    if not os.path.isdir(output_dir):
        # makedirs (unlike mkdir) also creates missing parent directories
        os.makedirs(output_dir, exist_ok=True)
        logger.info('Creating output directory...')
    else:
        logger.warning('Output directory already exists. Overwriting existing files...')
    utils.save_image(input_image, os.path.join(output_dir, _INPUT_OUTPUT_NAMES[task]))
    utils.save_image(fov_mask_reg, os.path.join(output_dir, 'fov_s1.nii.gz'))
    utils.save_image(ct, os.path.join(output_dir, 'ct_s1.nii.gz'))
    if patient['defacing']:
        utils.save_image(defacing_mask, os.path.join(output_dir, 'defacing_mask.nii.gz'))
    sitk.WriteTransform(transform, os.path.join(output_dir, 'transform.tfm'))

    # convert rtstruct to nrrd
    if patient['struct_path'] != '':
        utils.convert_rtstruct_to_nrrd(patient['struct_path'], output_dir, log=logger)

    # generate overview png
    utils.generate_overview_stage1(ct, input_image, output_dir)


def main(argv=None) -> int:
    """Run stage 1 pre-processing for every case listed in the configuration .csv file.

    Args:
        argv: Command line, program name first. Defaults to `sys.argv`; the configuration
            file is expected at index 1.

    Returns:
        Process exit code: 0 on success, 1 if the configuration file argument is missing
        or is not a .csv file.
    """
    if argv is None:
        argv = sys.argv

    ## set up logging to console and file
    logger = _setup_logging()
    logger.info('Starting stage 1 preprocessing')

    ## load pre-processing configuration from .csv file (argv[1])
    if len(argv) < 2:
        logger.error('Missing input file. Usage: python stage1.py <configuration.csv>')
        return 1
    config_file = argv[1]
    if not config_file.endswith('.csv'):
        logger.error('Input file must be a csv file')
        return 1
    patient_dict = utils.csv_to_dict(config_file)

    for pat in patient_dict:
        patient = patient_dict[pat]
        # check if output files already exist and skip if flag is set
        if skip_existing and _outputs_exist(patient):
            logger.info(f'Patient {pat} already pre-processed. Skipping...')
            continue
        _process_patient(pat, patient, logger)
    return 0


if __name__ == "__main__":
    sys.exit(main())