-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
146 lines (117 loc) · 4.47 KB
/
utils.py
File metadata and controls
146 lines (117 loc) · 4.47 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
"""
File containing some helper functions.
"""
from pathlib import Path
from typing import Callable
from functools import wraps
import time
import yaml
import pandas as pd
import SimpleITK as sitk
def parse_cfg(cfg_path: Path) -> dict:
    """
    Read a yaml configuration file and return its contents as a dictionary.

    Args:
        cfg_path (Path): Path to the yaml configuration file

    Returns:
        dict: Dictionary containing the configuration information
    """
    # safe_load avoids executing arbitrary yaml tags from the config file.
    with open(cfg_path, "r") as handle:
        return yaml.safe_load(handle)
def generate_input_csv_dicom(
    root_path: Path, subjects: list[str], output_path: Path = None
) -> pd.DataFrame:
    """
    Creates an input file for the pipeline for dicom images.

    Walks ``root_path`` recursively, keeps the directories whose name appears
    in ``subjects``, and records for every subject each directory that
    contains at least one ``.dcm`` file. The resulting table is written to
    ``dicom_input_csv.csv`` in ``output_path`` and returned.

    Args:
        root_path (Path): Root path of the folder containing the dicom folders
        subjects (list[str]): List of subjects for which to look for dicom folders.
        output_path (Path, optional): Output directory of the csv file.
            Defaults to None, in which case ``root_path`` is used.

    Returns:
        pd.DataFrame: Dataframe with columns ``subjects`` and ``scan_paths``
    """
    if not output_path:
        output_path = root_path
    # Fix: the glob patterns were f-strings with no placeholders (ruff F541);
    # plain string literals are used instead. "**/" matches every directory
    # below root_path, recursively.
    subject_paths = [
        subject_path
        for subject_path in root_path.glob("**/")
        if subject_path.name in subjects
    ]
    subject_scan_tuples = []
    for subject_path in subject_paths:
        # Record the *parent* directory of each .dcm file, i.e. the scan folder.
        subject_scan_tuples += [
            (subject_path.name, scan_path.parent)
            for scan_path in subject_path.glob("**/*.dcm")
        ]
    df = pd.DataFrame(subject_scan_tuples, columns=["subjects", "scan_paths"])
    # A scan folder holds many .dcm files; keep each (subject, folder) pair once.
    df = df.drop_duplicates(ignore_index=True)
    df.to_csv(str(output_path / "dicom_input_csv.csv"), index=False)
    return df
def generate_input_csv_nifti(
    root_path: Path, output_path: Path = None, keyword: str = ""
) -> pd.DataFrame:
    """
    Creates an input file for the pipeline for nifti images.

    Recursively collects every ``.nii`` file under ``root_path`` whose path
    contains ``keyword``, writes the table to ``nifti_input_csv.csv`` and
    returns it.

    Args:
        root_path (Path): Root path of the folder containing the nifti images
        output_path (Path, optional): Output path of the csv file. Defaults to None,
            in which case ``root_path`` is used.
        keyword (str, optional): keyword used to select the nifti files. Defaults to "".

    Returns:
        pd.DataFrame: Dataframe containing the file paths
    """
    output_path = output_path or root_path
    # An empty keyword is a substring of every path, so by default all .nii
    # files are kept.
    matching = filter(
        lambda candidate: keyword in str(candidate), root_path.glob("**/*.nii")
    )
    df = pd.DataFrame({"scan_paths": list(matching)})
    df.to_csv(str(output_path / "nifti_input_csv.csv"), index=False)
    return df
def time_func(func: Callable) -> Callable:
    """
    Decorator that measures and prints the wall-clock duration of each call.

    Args:
        func (Callable): The function that should be timed.

    Returns:
        Callable: Wrapped function that prints its runtime on every call.
    """

    @wraps(func)
    def time_wrapper(*args, **kwargs):
        # perf_counter gives the highest-resolution monotonic clock available.
        tic = time.perf_counter()
        result = func(*args, **kwargs)
        total_time = time.perf_counter() - tic
        print(f"Function {func.__name__}{args} {kwargs} Took {total_time:.4f} seconds")
        return result

    return time_wrapper
def log_nifti_info(log_dict: dict, nifti_img: sitk.Image, tag: str) -> None:
    """Logs useful information about a nifti image to the supplied log_dict.

    Stores the image size, voxel spacing, and pixel type under keys prefixed
    with ``tag`` (e.g. ``input_shape``).

    Args:
        log_dict (dict): the dictionary where the info should be logged to.
        nifti_img (sitk.Image): the nifti image whose information should be logged
        tag (str): tag to add to the logged value (i.e. input/output)
    """
    # Table-driven so the tag-prefixed key scheme is defined in one place.
    for suffix, value in (
        ("shape", nifti_img.GetSize()),
        ("spacing", nifti_img.GetSpacing()),
        ("pixel", nifti_img.GetPixelIDTypeAsString()),
    ):
        log_dict[f"{tag}_{suffix}"] = value
def get_max_dimensions(data_path: Path):
    """Gets the maximum dimensions of images in a dataset. Requires them to have a change_log.yaml,
    generated by the processing pipeline.

    Args:
        data_path (Path): Root path of the processed files.

    Returns:
        Tuple[int, int, int]: Tuple with the maximum sizes in x, y and z directions
    """
    max_dims = [0, 0, 0]
    for log_path in data_path.glob("**/change_log.yaml"):
        log_dict = parse_cfg(log_path)
        # The log records its final pipeline step; that step's entry holds the
        # output shape (defaults to (0, 0, 0) when the shape was not logged).
        last_step = log_dict["last_step"]
        output_shape = log_dict[last_step].get("output_shape", (0, 0, 0))
        for axis in range(3):
            max_dims[axis] = max(max_dims[axis], output_shape[axis])
    return (max_dims[0], max_dims[1], max_dims[2])