|
12 | 12 |
|
13 | 13 | from spikeinterface.qualitymetrics.utils import create_ground_truth_pc_distributions |
14 | 14 |
|
| 15 | +from spikeinterface.qualitymetrics.quality_metric_list import ( |
| 16 | +    _misc_metric_name_to_func, |
| 17 | +) |
15 | 18 |
|
16 | 19 | from spikeinterface.qualitymetrics import ( |
| 20 | +    get_quality_metric_list, |
17 | 21 |     mahalanobis_metrics, |
18 | 22 |     lda_metrics, |
19 | 23 |     nearest_neighbors_metrics, |
|
34 | 38 |     compute_amplitude_cv_metrics, |
35 | 39 |     compute_sd_ratio, |
36 | 40 |     get_synchrony_counts, |
| 41 | +    compute_quality_metrics, |
37 | 42 | ) |
38 | 43 |
|
39 | 44 | from spikeinterface.core.basesorting import minimum_spike_dtype |
|
42 | 47 | job_kwargs = dict(n_jobs=2, progress_bar=True, chunk_duration="1s") |
43 | 48 |
|
44 | 49 |
|
| 50 | +def _small_sorting_analyzer(): |
| 51 | +    recording, sorting = generate_ground_truth_recording( |
| 52 | +        durations=[2.0], |
| 53 | +        num_units=4, |
| 54 | +        seed=1205, |
| 55 | +    ) |
| 56 | + |
| 57 | +    sorting = sorting.select_units([3, 2, 0], ["#3", "#9", "#4"]) |
| 58 | + |
| 59 | +    sorting_analyzer = create_sorting_analyzer(recording=recording, sorting=sorting, format="memory") |
| 60 | + |
| 61 | +    extensions_to_compute = { |
| 62 | +        "random_spikes": {"seed": 1205}, |
| 63 | +        "noise_levels": {"seed": 1205}, |
| 64 | +        "waveforms": {}, |
| 65 | +        "templates": {}, |
| 66 | +        "spike_amplitudes": {}, |
| 67 | +        "spike_locations": {}, |
| 68 | +        "principal_components": {}, |
| 69 | +    } |
| 70 | + |
| 71 | +    sorting_analyzer.compute(extensions_to_compute) |
| 72 | + |
| 73 | +    return sorting_analyzer |
| 74 | + |
| 75 | + |
| 76 | +@pytest.fixture(scope="module") |
| 77 | +def small_sorting_analyzer(): |
| 78 | +    return _small_sorting_analyzer() |
| 79 | + |
| 80 | + |
| 81 | +def test_unit_structure_in_output(small_sorting_analyzer): |
| 82 | +    """Check that metric outputs are keyed by unit id and agree when computed for a subset of units.""" |
| 83 | +    qm_params = { |
| 84 | +        "presence_ratio": {"bin_duration_s": 0.1}, |
| 85 | +        "amplitude_cutoff": {"num_histogram_bins": 3}, |
| 86 | +        "amplitude_cv": {"average_num_spikes_per_bin": 7, "min_num_bins": 3}, |
| 87 | +        "firing_range": {"bin_size_s": 1}, |
| 88 | +        "isi_violation": {"isi_threshold_ms": 10}, |
| 89 | +        "drift": {"interval_s": 1, "min_spikes_per_interval": 5}, |
| 90 | +        "sliding_rp_violation": {"max_ref_period_ms": 50, "bin_size_ms": 0.15}, |
| 91 | +        "rp_violation": {"refractory_period_ms": 10.0, "censored_period_ms": 0.0}, |
| 92 | +    } |
| 93 | + |
| 94 | +    for metric_name in get_quality_metric_list(): |
| 95 | + |
| 96 | +        try: |
| 97 | +            qm_param = qm_params[metric_name] |
| 98 | +        except KeyError: |
| 99 | +            qm_param = {} |
| 100 | + |
| 101 | +        result_all = _misc_metric_name_to_func[metric_name](sorting_analyzer=small_sorting_analyzer, **qm_param) |
| 102 | +        result_sub = _misc_metric_name_to_func[metric_name]( |
| 103 | +            sorting_analyzer=small_sorting_analyzer, unit_ids=["#4", "#9"], **qm_param |
| 104 | +        ) |
| 105 | + |
| 106 | +        if isinstance(result_all, dict): |
| 107 | +            assert list(result_all.keys()) == ["#3", "#9", "#4"] |
| 108 | +            assert list(result_sub.keys()) == ["#4", "#9"] |
| 109 | +            assert result_sub["#9"] == result_all["#9"] |
| 110 | +            assert result_sub["#4"] == result_all["#4"] |
| 111 | + |
| 112 | +        else: |
| 113 | +            for result_ind, result in enumerate(result_sub): |
| 114 | + |
| 115 | +                assert list(result_all[result_ind].keys()) == ["#3", "#9", "#4"] |
| 116 | +                assert result_sub[result_ind].keys() == {"#4", "#9"} |
| 117 | + |
| 118 | +                assert result_sub[result_ind]["#9"] == result_all[result_ind]["#9"] |
| 119 | +                assert result_sub[result_ind]["#4"] == result_all[result_ind]["#4"] |
| 120 | + |
| 121 | + |
| 122 | +def test_unit_id_order_independence(small_sorting_analyzer): |
| 123 | +    """ |
| 124 | +    Creates a second, almost-identical sorting_analyzer whose unit_ids are reordered and relabelled, |
| 125 | +    and checks that the calculated quality metrics are independent of the ordering and labelling. |
| 126 | +    """ |
| 127 | + |
| 128 | +    recording = small_sorting_analyzer.recording |
| 129 | +    sorting = small_sorting_analyzer.sorting.select_units(["#4", "#9", "#3"], [0, 2, 3]) |
| 130 | + |
| 131 | +    small_sorting_analyzer_2 = create_sorting_analyzer(recording=recording, sorting=sorting, format="memory") |
| 132 | + |
| 133 | +    extensions_to_compute = { |
| 134 | +        "random_spikes": {"seed": 1205}, |
| 135 | +        "noise_levels": {"seed": 1205}, |
| 136 | +        "waveforms": {}, |
| 137 | +        "templates": {}, |
| 138 | +        "spike_amplitudes": {}, |
| 139 | +        "spike_locations": {}, |
| 140 | +        "principal_components": {}, |
| 141 | +    } |
| 142 | + |
| 143 | +    small_sorting_analyzer_2.compute(extensions_to_compute) |
| 144 | + |
| 145 | +    # need special params to get non-nan results on a short recording |
| 146 | +    qm_params = { |
| 147 | +        "presence_ratio": {"bin_duration_s": 0.1}, |
| 148 | +        "amplitude_cutoff": {"num_histogram_bins": 3}, |
| 149 | +        "amplitude_cv": {"average_num_spikes_per_bin": 7, "min_num_bins": 3}, |
| 150 | +        "firing_range": {"bin_size_s": 1}, |
| 151 | +        "isi_violation": {"isi_threshold_ms": 10}, |
| 152 | +        "drift": {"interval_s": 1, "min_spikes_per_interval": 5}, |
| 153 | +        "sliding_rp_violation": {"max_ref_period_ms": 50, "bin_size_ms": 0.15}, |
| 154 | +    } |
| 155 | + |
| 156 | +    quality_metrics_1 = compute_quality_metrics( |
| 157 | +        small_sorting_analyzer, metric_names=get_quality_metric_list(), qm_params=qm_params |
| 158 | +    ) |
| 159 | +    quality_metrics_2 = compute_quality_metrics( |
| 160 | +        small_sorting_analyzer_2, metric_names=get_quality_metric_list(), qm_params=qm_params |
| 161 | +    ) |
| 162 | + |
| 163 | +    for metric, metric_1_data in quality_metrics_1.items(): |
| 164 | +        assert quality_metrics_2[metric][3] == metric_1_data["#3"] |
| 165 | +        assert quality_metrics_2[metric][2] == metric_1_data["#9"] |
| 166 | +        assert quality_metrics_2[metric][0] == metric_1_data["#4"] |
| 167 | + |
| 168 | + |
45 | 169 | def _sorting_analyzer_simple(): |
46 | 170 |     recording, sorting = generate_ground_truth_recording( |
47 | 171 |         durations=[ |
|