-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtriangulation_dev.py
More file actions
1111 lines (929 loc) · 57 KB
/
triangulation_dev.py
File metadata and controls
1111 lines (929 loc) · 57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
###########################################################################
## ROBUST TRIANGULATION AND PARAMETER OPTIMIZATION OF 2D COORDINATES ##
###########################################################################
This module performs robust triangulation of 2D JSON coordinates to generate
a .trc file readable by OpenSim. It includes functionality to test and optimize
triangulation parameters, ensuring the best results for specific scenarios.
### Key Features:
1. **Weighted Triangulation**:
- Triangulation is weighted by the likelihood of each detected 2D keypoint.
- Points below a likelihood threshold are excluded.
2. **Reprojection Error Handling**:
- If the reprojection error exceeds a threshold, left and right sides are swapped.
- If the error persists, cameras are excluded iteratively until the threshold is met.
- If too many cameras are excluded, the point is skipped for the frame.
3. **Missing Value Interpolation**:
- Non-triangulated frames are filled using interpolation for continuity.
4. **Multi-Person Compatibility**:
- If multiple subjects are detected, the `personAssociation` module should
first be run to associate people across frames.
5. **Parameter Optimization (New Feature)**:
- Enables testing different triangulation parameters (e.g., reprojection error
threshold, likelihood threshold, minimum cameras).
- Results for each set of parameters are stored in a CSV file for analysis.
- Facilitates optimization to determine the best parameters for robust triangulation
in complex scenarios.
### INPUTS:
- A calibration file (.toml extension).
- JSON files for each camera, containing the detected 2D coordinates for one person.
- A `Config.toml` file with parameters for triangulation.
- A skeleton model defining the structure of the keypoints.
### OUTPUTS:
- A `.trc` file with 3D coordinates in the Y-up system.
- A CSV file (`parameters_optimisation_triangulation.csv`) with statistics for
each set of tested parameters, including reprojection errors, excluded cameras,
and sequence continuity.
### Usage:
This script is especially useful for optimizing the triangulation process in cases
where high accuracy is required, such as biomechanical analysis or motion tracking
in sports and clinical studies.
'''
## INIT
import os
import glob
import fnmatch
import re
import numpy as np
import json
import itertools as it
import pandas as pd
import cv2
import toml
from tqdm import tqdm
from scipy import interpolate
from collections import Counter
from anytree import RenderTree
from anytree.importer import DictImporter
import logging
import csv
from Pose2Sim.common import retrieve_calib_params, computeP, weighted_triangulation, \
reprojection, euclidean_distance, sort_stringlist_by_last_number, \
min_with_single_indices, zup2yup, convert_to_c3d
from Pose2Sim.skeletons import *
## AUTHORSHIP INFORMATION
__author__ = "David Pagnon"
__copyright__ = "Copyright 2021, Pose2Sim"
__credits__ = ["David Pagnon"]
__license__ = "BSD 3-Clause License"
__version__ = "0.9.4"
__maintainer__ = "David Pagnon"
__email__ = "contact@david-pagnon.com"
__status__ = "Development"
## FUNCTIONS
def interpolate_zeros_nans(col, *args):
'''
Interpolate missing points (of value zero),
unless more than N contiguous values are missing.
INPUTS:
- col: pandas column of coordinates
- args[0] = N: max number of contiguous bad values, above which they won't be interpolated
- args[1] = kind: 'linear', 'slinear', 'quadratic', 'cubic'. Default: 'cubic'
OUTPUT:
- col_interp: interpolated pandas column
'''
if len(args)==2:
N, kind = args
if len(args)==1:
N = np.inf
kind = args[0]
if not args:
N = np.inf
# Interpolate nans
mask = ~(np.isnan(col) | col.eq(0)) # true where nans or zeros
idx_good = np.where(mask)[0]
if len(idx_good) <= 4:
return col
if 'kind' not in locals(): # 'linear', 'slinear', 'quadratic', 'cubic'
f_interp = interpolate.interp1d(idx_good, col[idx_good], kind="linear", bounds_error=False)
else:
f_interp = interpolate.interp1d(idx_good, col[idx_good], kind=kind, fill_value='extrapolate', bounds_error=False)
col_interp = np.where(mask, col, f_interp(col.index)) #replace at false index with interpolated values
# Reintroduce nans if length of sequence > N
idx_notgood = np.where(~mask)[0]
gaps = np.where(np.diff(idx_notgood) > 1)[0] + 1 # where the indices of true are not contiguous
sequences = np.split(idx_notgood, gaps)
if sequences[0].size>0:
for seq in sequences:
if len(seq) > N: # values to exclude from interpolation are set to false when they are too long
col_interp[seq] = np.nan
return col_interp
def count_persons_in_json(file_path):
with open(file_path, 'r') as file:
data = json.load(file)
return len(data.get('people', []))
def sort_people(Q_kpt_old, Q_kpt):
'''
Associate persons across frames
Persons' indices are sometimes swapped when changing frame
A person is associated to another in the next frame when they are at a small distance
INPUTS:
- Q_kpt_old: list of arrays of 3D coordinates [X, Y, Z, 1.] for the previous frame
- Q_kpt: idem Q_kpt_old, for current frame
OUTPUT:
- Q_kpt_new: array with reordered persons
- personsIDs_sorted: index of reordered persons
'''
# Generate possible person correspondences across frames
if len(Q_kpt_old) < len(Q_kpt):
Q_kpt_old = np.concatenate((Q_kpt_old, [[0., 0., 0., 1.]]*(len(Q_kpt)-len(Q_kpt_old))))
if len(Q_kpt) < len(Q_kpt_old):
Q_kpt = np.concatenate((Q_kpt, [[0., 0., 0., 1.]]*(len(Q_kpt_old)-len(Q_kpt))))
personsIDs_comb = sorted(list(it.product(range(len(Q_kpt_old)),range(len(Q_kpt)))))
# Compute distance between persons from one frame to another
frame_by_frame_dist = []
for comb in personsIDs_comb:
frame_by_frame_dist += [euclidean_distance(Q_kpt_old[comb[0]],Q_kpt[comb[1]])]
frame_by_frame_dist = np.mean(frame_by_frame_dist, axis=1)
# sort correspondences by distance
minL, _, associated_tuples = min_with_single_indices(frame_by_frame_dist, personsIDs_comb)
# print('Distances :', minL)
# associate 3D points to same index across frames, nan if no correspondence
Q_kpt_new, personsIDs_sorted = [], []
for i in range(len(Q_kpt_old)):
id_in_old = associated_tuples[:,1][associated_tuples[:,0] == i].tolist()
# print('id_in_old ', i, id_in_old)
if len(id_in_old) > 0:
personsIDs_sorted += id_in_old
Q_kpt_new += [Q_kpt[id_in_old[0]]]
else:
personsIDs_sorted += [-1]
Q_kpt_new += [Q_kpt_old[i]]
return Q_kpt_new, personsIDs_sorted, associated_tuples
def make_trc(config_dict, Q, keypoints_names, f_range, id_person=-1):
'''
Make Opensim compatible trc file from a dataframe with 3D coordinates
INPUT:
- config_dict: dictionary of configuration parameters
- Q: pandas dataframe with 3D coordinates as columns, frame number as rows
- keypoints_names: list of strings
- f_range: list of two numbers. Range of frames
OUTPUT:
- trc file
'''
# Read config_dict
project_dir = config_dict.get('project').get('project_dir')
multi_person = config_dict.get('project').get('multi_person')
pose_model = config_dict.get('pose').get('pose_model')
if multi_person:
seq_name = f'{os.path.basename(os.path.realpath(project_dir))}_P{id_person+1}'
else:
seq_name = f'{os.path.basename(os.path.realpath(project_dir))}'
if pose_model == "CUSTOM":
pose3d_dir = os.path.join(project_dir, 'pose-3d-custom')
else:
pose3d_dir = os.path.join(project_dir, 'pose-3d')
#pose3d_dir = os.path.join(project_dir, 'pose-3d')
# Get frame_rate
video_dir = os.path.join(project_dir, 'videos')
vid_img_extension = config_dict['pose']['vid_img_extension']
video_files = glob.glob(os.path.join(video_dir, '*'+vid_img_extension))
frame_rate = config_dict.get('project').get('frame_rate')
if frame_rate == 'auto':
try:
cap = cv2.VideoCapture(video_files[0])
cap.read()
if cap.read()[0] == False:
raise
frame_rate = int(cap.get(cv2.CAP_PROP_FPS))
except:
frame_rate = 60
trc_f = f'{seq_name}_{f_range[0]}-{f_range[1]}.trc'
#Header
DataRate = CameraRate = OrigDataRate = frame_rate
NumFrames = len(Q)
NumMarkers = len(keypoints_names)
header_trc = ['PathFileType\t4\t(X/Y/Z)\t' + trc_f,
'DataRate\tCameraRate\tNumFrames\tNumMarkers\tUnits\tOrigDataRate\tOrigDataStartFrame\tOrigNumFrames',
'\t'.join(map(str,[DataRate, CameraRate, NumFrames, NumMarkers, 'm', OrigDataRate, f_range[0], f_range[1]])),
'Frame#\tTime\t' + '\t\t\t'.join(keypoints_names) + '\t\t',
'\t\t'+'\t'.join([f'X{i+1}\tY{i+1}\tZ{i+1}' for i in range(len(keypoints_names))])]
# Zup to Yup coordinate system
Q = zup2yup(Q)
#Add Frame# and Time columns
Q.index = np.array(range(f_range[0], f_range[1]))
Q.insert(0, 't', Q.index/ frame_rate)
# Q = Q.fillna(' ')
#Write file
if not os.path.exists(pose3d_dir): os.mkdir(pose3d_dir)
trc_path = os.path.realpath(os.path.join(pose3d_dir, trc_f))
with open(trc_path, 'w') as trc_o:
[trc_o.write(line+'\n') for line in header_trc]
Q.to_csv(trc_o, sep='\t', index=True, header=None, lineterminator='\n')
return trc_path
def retrieve_right_trc_order(trc_paths):
'''
Lets the user input which static file correspond to each generated trc file.
INPUT:
- trc_paths: list of strings
OUTPUT:
- trc_id: list of integers
'''
logging.info('\n\nReordering trc file IDs:')
logging.info(f'\nPlease visualize the generated trc files in Blender or OpenSim.\nTrc files are stored in {os.path.dirname(trc_paths[0])}.\n')
retry = True
while retry:
retry = False
logging.info('List of trc files:')
[logging.info(f'#{t_list}: {os.path.basename(trc_list)}') for t_list, trc_list in enumerate(trc_paths)]
trc_id = []
for t, trc_p in enumerate(trc_paths):
logging.info(f'\nStatic trial #{t} corresponds to trc number:')
trc_id += [input('Enter ID:')]
# Check non int and duplicates
try:
trc_id = [int(t) for t in trc_id]
duplicates_in_input = (len(trc_id) != len(set(trc_id)))
if duplicates_in_input:
retry = True
print('\n\nWARNING: Same ID entered twice: please check IDs again.\n')
except:
print('\n\nWARNING: The ID must be an integer: please check IDs again.\n')
retry = True
return trc_id
def recap_triangulate(config_dict, error_tot, nb_cams_excluded_tot, keypoints_names, cam_excluded_count, count_nan_frames_per_kpt, average_sequence_lengths_per_kpt, longest_sequence_lengths_per_kpt, interp_frames, non_interp_frames, trc_paths):
'''
Print a message giving statistics on reprojection errors (in pixel and in meters),
as well as the number of cameras that had to be excluded to reach threshold
conditions. Also stores results in User/logs.txt and parameters_optimisation_triangulation.csv.
INPUT:
- config_dict: Configuration dictionary parsed from `Config.toml`.
- error_tot: List of DataFrames containing reprojection errors for each person.
- nb_cams_excluded_tot: List of DataFrames with excluded camera counts.
- keypoints_names: List of keypoint names (strings).
- count_nan_frames_per_kpt: List of counts for frames with non-triangulated keypoints.
- average_sequence_lengths_per_kpt: List of average sequence lengths per keypoint.
- longest_sequence_lengths_per_kpt: List of longest sequence lengths per keypoint.
OUTPUT:
- Console logs with reprojection and triangulation statistics.
- CSV file with triangulation optimization parameters.
'''
# Read necessary paths and parameters from the config
project_dir = config_dict.get('project').get('project_dir')
session_dir = os.path.realpath(os.path.join(project_dir, '..'))
session_dir = session_dir if 'Config.toml' in os.listdir(session_dir) else os.getcwd()
# Locate the calibration directory and file
calib_dir = [os.path.join(session_dir, c) for c in os.listdir(session_dir) if os.path.isdir(os.path.join(session_dir, c)) and 'calib' in c.lower()][0]
calib_file = glob.glob(os.path.join(calib_dir, '*.toml'))[0]
calib = toml.load(calib_file)
# Read pose model and triangulation thresholds
pose_model = config_dict.get('pose').get('pose_model')
error_threshold_triangulation = config_dict.get('triangulation').get('reproj_error_threshold_triangulation')
likelihood_threshold = config_dict.get('triangulation').get('likelihood_threshold_triangulation')
min_cameras_value = config_dict.get('triangulation').get('min_cameras_for_triangulation')
# Set up the output directory for 3D poses
if pose_model == "CUSTOM":
pose_3d_dir = os.path.join(project_dir, 'pose-3d-custom')
else:
pose_3d_dir = os.path.join(project_dir, 'pose-3d')
os.makedirs(pose_3d_dir, exist_ok=True)
# Prepare the CSV file to store results
csv_file = os.path.join(pose_3d_dir, 'parameters_optimisation_triangulation.csv')
csv_headers = [
"Participant", "Keypoint", "Mean Error (px)", "Mean Error (m)",
"Excluded Cameras (avg)", "Frames Non-Triangulated",
"Non-Triangulated (%)", "Average Seq. Length", "Longest Seq. Length", "|",
"Reprojection Error Threshold (px)", "Likelihood Threshold", "Minimum Cameras for Triangulation"
]
if not os.path.exists(csv_file):
# Write headers if the file does not exist
with open(csv_file, mode='w', newline='') as file:
writer = csv.writer(file)
writer.writerow(csv_headers)
# Compute calibration matrix and translation for conversion to meters
calib_cam1 = calib[list(calib.keys())[0]]
fm = calib_cam1['matrix'][0][0] # Focal length
Dm = euclidean_distance(calib_cam1['translation'], [0, 0, 0]) # Distance to the origin
logging.info('') # Blank line for separation
nb_persons_to_detect = len(error_tot) # Number of participants
# Loop over each participant
for n in range(nb_persons_to_detect):
if nb_persons_to_detect > 1:
logging.info(f'\n\nPARTICIPANT {n+1}\n') # Log participant header
total_frames = len(error_tot[n]) # Total number of frames in the trial
# Dictionary to store statistics for all keypoints
all_keypoints_stats = {
"mean_error_px": [],
"mean_error_m": [],
"mean_cam_excluded": [],
"frames_non_triangulated": [],
"non_triangulated_ratio": [],
"avg_seq_length": [],
"max_seq_length": []
}
# Loop over each keypoint
for idx, name in enumerate(keypoints_names):
# Calculate per-keypoint statistics
mean_error_keypoint_px = np.around(error_tot[n].iloc[:, idx].mean(), decimals=1)
mean_error_keypoint_m = np.around(mean_error_keypoint_px * Dm / fm, decimals=3)
mean_cam_excluded_keypoint = np.around(nb_cams_excluded_tot[n].iloc[:, idx].mean(), decimals=2)
frames_non_triangulated = count_nan_frames_per_kpt[n][idx]
non_triangulated_ratio = np.around((frames_non_triangulated / total_frames) * 100, decimals=1)
avg_seq_length = np.around(average_sequence_lengths_per_kpt[n][idx], decimals=1)
max_seq_length = longest_sequence_lengths_per_kpt[n][idx]
# Store statistics for "All Keypoints" calculation
all_keypoints_stats["mean_error_px"].append(mean_error_keypoint_px)
all_keypoints_stats["mean_error_m"].append(mean_error_keypoint_m)
all_keypoints_stats["mean_cam_excluded"].append(mean_cam_excluded_keypoint)
all_keypoints_stats["frames_non_triangulated"].append(frames_non_triangulated)
all_keypoints_stats["non_triangulated_ratio"].append(non_triangulated_ratio)
all_keypoints_stats["avg_seq_length"].append(avg_seq_length)
all_keypoints_stats["max_seq_length"].append(max_seq_length)
# Log statistics for the current keypoint
logging.info(
f'Mean reprojection error for {name} is {mean_error_keypoint_px} px (~ {mean_error_keypoint_m} m), '
f'reached with {mean_cam_excluded_keypoint} excluded cameras, {frames_non_triangulated} frames non-triangulated ({non_triangulated_ratio}% of trial), '
f'average sequence length {avg_seq_length}, longest sequence {max_seq_length}.'
)
# Write statistics to the CSV file
with open(csv_file, mode='a', newline='') as file:
writer = csv.writer(file)
writer.writerow([
f"Participant {n+1}", name, mean_error_keypoint_px, mean_error_keypoint_m,
mean_cam_excluded_keypoint, frames_non_triangulated, non_triangulated_ratio,
avg_seq_length, max_seq_length, "|", error_threshold_triangulation, likelihood_threshold, min_cameras_value
])
# Compute overall statistics for "All Keypoints"
mean_error_px_all = np.mean(all_keypoints_stats["mean_error_px"])
mean_error_m_all = np.mean(all_keypoints_stats["mean_error_m"])
mean_cam_excluded_all = np.mean(all_keypoints_stats["mean_cam_excluded"])
frames_non_triangulated_all = np.mean(all_keypoints_stats["frames_non_triangulated"])
non_triangulated_ratio_all = np.mean(all_keypoints_stats["non_triangulated_ratio"])
avg_seq_length_all = np.mean(all_keypoints_stats["avg_seq_length"])
max_seq_length_all = np.max(all_keypoints_stats["max_seq_length"])
# Log overall statistics
logging.info(
f'\n--> Mean reprojection error for all points is {mean_error_px_all:.1f} px (~ {mean_error_m_all:.3f} m), '
f'{frames_non_triangulated_all:.1f} frames non-triangulated ({non_triangulated_ratio_all:.1f}% of trial), '
f'average sequence length {avg_seq_length_all:.1f}, longest sequence {max_seq_length_all}.'
)
# Write overall statistics to the CSV file
with open(csv_file, mode='a', newline='') as file:
writer = csv.writer(file)
writer.writerow([
f"Participant {n+1}", "All Keypoints", mean_error_px_all, mean_error_m_all,
mean_cam_excluded_all, frames_non_triangulated_all, non_triangulated_ratio_all,
avg_seq_length_all, max_seq_length_all, "|", error_threshold_triangulation, likelihood_threshold, min_cameras_value
])
logging.info('\n\n') # Blank line at the end
def triangulation_from_best_cameras(config_dict, coords_2D_kpt, coords_2D_kpt_swapped, projection_matrices, calib_params):
'''
Triangulates 2D keypoint coordinates. If reprojection error is above threshold,
tries swapping left and right sides. If still above, removes a camera until error
is below threshold unless the number of remaining cameras is below a predefined number.
1. Creates subset with N cameras excluded
2. Tries all possible triangulations
3. Chooses the one with smallest reprojection error
If error too big, take off one more camera.
If then below threshold, retain result.
If better but still too big, take off one more camera.
INPUTS:
- a Config.toml file
- coords_2D_kpt: (x,y,likelihood) * ncams array
- coords_2D_kpt_swapped: (x,y,likelihood) * ncams array with left/right swap
- projection_matrices: list of arrays
OUTPUTS:
- Q: array of triangulated point (x,y,z,1.)
- error_min: float
- nb_cams_excluded: int
'''
# Read config_dict
error_threshold_triangulation = config_dict.get('triangulation').get('reproj_error_threshold_triangulation')
min_cameras_for_triangulation = config_dict.get('triangulation').get('min_cameras_for_triangulation')
handle_LR_swap = config_dict.get('triangulation').get('handle_LR_swap')
undistort_points = config_dict.get('triangulation').get('undistort_points')
if undistort_points:
calib_params_K = calib_params['K']
calib_params_dist = calib_params['dist']
calib_params_R = calib_params['R']
calib_params_T = calib_params['T']
# Initialize
x_files, y_files, likelihood_files = coords_2D_kpt
x_files_swapped, y_files_swapped, likelihood_files_swapped = coords_2D_kpt_swapped
n_cams = len(x_files)
error_min = np.inf
nb_cams_off = 0 # cameras will be taken-off until reprojection error is under threshold
# print('\n')
while error_min > error_threshold_triangulation and n_cams - nb_cams_off >= min_cameras_for_triangulation:
# print("error min ", error_min, "thresh ", error_threshold_triangulation, 'nb_cams_off ', nb_cams_off)
# Create subsets with "nb_cams_off" cameras excluded
id_cams_off = np.array(list(it.combinations(range(n_cams), nb_cams_off)))
if undistort_points:
calib_params_K_filt = [calib_params_K]*len(id_cams_off)
calib_params_dist_filt = [calib_params_dist]*len(id_cams_off)
calib_params_R_filt = [calib_params_R]*len(id_cams_off)
calib_params_T_filt = [calib_params_T]*len(id_cams_off)
projection_matrices_filt = [projection_matrices]*len(id_cams_off)
x_files_filt = np.vstack([x_files.copy()]*len(id_cams_off))
y_files_filt = np.vstack([y_files.copy()]*len(id_cams_off))
x_files_swapped_filt = np.vstack([x_files_swapped.copy()]*len(id_cams_off))
y_files_swapped_filt = np.vstack([y_files_swapped.copy()]*len(id_cams_off))
likelihood_files_filt = np.vstack([likelihood_files.copy()]*len(id_cams_off))
if nb_cams_off > 0:
for i in range(len(id_cams_off)):
x_files_filt[i][id_cams_off[i]] = np.nan
y_files_filt[i][id_cams_off[i]] = np.nan
x_files_swapped_filt[i][id_cams_off[i]] = np.nan
y_files_swapped_filt[i][id_cams_off[i]] = np.nan
likelihood_files_filt[i][id_cams_off[i]] = np.nan
# Excluded cameras index and count
id_cams_off_tot_new = [np.argwhere(np.isnan(x)).ravel() for x in likelihood_files_filt]
nb_cams_excluded_filt = [np.count_nonzero(np.nan_to_num(x)==0) for x in likelihood_files_filt] # count nans and zeros
nb_cams_off_tot = max(nb_cams_excluded_filt)
# print('likelihood_files_filt ',likelihood_files_filt)
# print('nb_cams_excluded_filt ', nb_cams_excluded_filt, 'nb_cams_off_tot ', nb_cams_off_tot)
if nb_cams_off_tot > n_cams - min_cameras_for_triangulation:
break
id_cams_off_tot = id_cams_off_tot_new
# print('still in loop')
if undistort_points:
calib_params_K_filt = [ [ c[i] for i in range(n_cams) if not np.isnan(likelihood_files_filt[j][i]) and not likelihood_files_filt[j][i]==0. ] for j, c in enumerate(calib_params_K_filt) ]
calib_params_dist_filt = [ [ c[i] for i in range(n_cams) if not np.isnan(likelihood_files_filt[j][i]) and not likelihood_files_filt[j][i]==0. ] for j, c in enumerate(calib_params_dist_filt) ]
calib_params_R_filt = [ [ c[i] for i in range(n_cams) if not np.isnan(likelihood_files_filt[j][i]) and not likelihood_files_filt[j][i]==0. ] for j, c in enumerate(calib_params_R_filt) ]
calib_params_T_filt = [ [ c[i] for i in range(n_cams) if not np.isnan(likelihood_files_filt[j][i]) and not likelihood_files_filt[j][i]==0. ] for j, c in enumerate(calib_params_T_filt) ]
projection_matrices_filt = [ [ p[i] for i in range(n_cams) if not np.isnan(likelihood_files_filt[j][i]) and not likelihood_files_filt[j][i]==0. ] for j, p in enumerate(projection_matrices_filt) ]
# print('\nnb_cams_off', repr(nb_cams_off), 'nb_cams_excluded', repr(nb_cams_excluded_filt))
# print('likelihood_files ', repr(likelihood_files))
# print('y_files ', repr(y_files))
# print('x_files ', repr(x_files))
# print('x_files_swapped ', repr(x_files_swapped))
# print('likelihood_files_filt ', repr(likelihood_files_filt))
# print('x_files_filt ', repr(x_files_filt))
# print('id_cams_off_tot ', id_cams_off_tot)
x_files_filt = [ np.array([ xx for ii, xx in enumerate(x) if not np.isnan(likelihood_files_filt[i][ii]) and not likelihood_files_filt[i][ii]==0. ]) for i,x in enumerate(x_files_filt) ]
y_files_filt = [ np.array([ xx for ii, xx in enumerate(x) if not np.isnan(likelihood_files_filt[i][ii]) and not likelihood_files_filt[i][ii]==0. ]) for i,x in enumerate(y_files_filt) ]
x_files_swapped_filt = [ np.array([ xx for ii, xx in enumerate(x) if not np.isnan(likelihood_files_filt[i][ii]) and not likelihood_files_filt[i][ii]==0. ]) for i,x in enumerate(x_files_swapped_filt) ]
y_files_swapped_filt = [ np.array([ xx for ii, xx in enumerate(x) if not np.isnan(likelihood_files_filt[i][ii]) and not likelihood_files_filt[i][ii]==0. ]) for i,x in enumerate(y_files_swapped_filt) ]
likelihood_files_filt = [ np.array([ xx for ii, xx in enumerate(x) if not np.isnan(xx) and not xx==0. ]) for x in likelihood_files_filt ]
# print('y_files_filt ', repr(y_files_filt))
# print('x_files_filt ', repr(x_files_filt))
# Triangulate 2D points
Q_filt = [weighted_triangulation(projection_matrices_filt[i], x_files_filt[i], y_files_filt[i], likelihood_files_filt[i]) for i in range(len(id_cams_off))]
# Reprojection
if undistort_points:
coords_2D_kpt_calc_filt = [np.array([cv2.projectPoints(np.array(Q_filt[i][:-1]), calib_params_R_filt[i][j], calib_params_T_filt[i][j], calib_params_K_filt[i][j], calib_params_dist_filt[i][j])[0].ravel()
for j in range(n_cams-nb_cams_excluded_filt[i])])
for i in range(len(id_cams_off))]
coords_2D_kpt_calc_filt = [[coords_2D_kpt_calc_filt[i][:,0], coords_2D_kpt_calc_filt[i][:,1]] for i in range(len(id_cams_off))]
else:
coords_2D_kpt_calc_filt = [reprojection(projection_matrices_filt[i], Q_filt[i]) for i in range(len(id_cams_off))]
coords_2D_kpt_calc_filt = np.array(coords_2D_kpt_calc_filt, dtype=object)
x_calc_filt = coords_2D_kpt_calc_filt[:,0]
# print('x_calc_filt ', x_calc_filt)
y_calc_filt = coords_2D_kpt_calc_filt[:,1]
# Reprojection error
error = []
for config_off_id in range(len(x_calc_filt)):
q_file = [(x_files_filt[config_off_id][i], y_files_filt[config_off_id][i]) for i in range(len(x_files_filt[config_off_id]))]
q_calc = [(x_calc_filt[config_off_id][i], y_calc_filt[config_off_id][i]) for i in range(len(x_calc_filt[config_off_id]))]
error.append( np.mean( [euclidean_distance(q_file[i], q_calc[i]) for i in range(len(q_file))] ) )
# print('error ', error)
# Choosing best triangulation (with min reprojection error)
# print('\n', error)
# print('len(error) ', len(error))
# print('len(x_calc_filt) ', len(x_calc_filt))
# print('len(likelihood_files_filt) ', len(likelihood_files_filt))
# print('len(id_cams_off_tot) ', len(id_cams_off_tot))
# print('min error ', np.nanmin(error))
# print('argmin error ', np.nanargmin(error))
error_min = np.nanmin(error)
# print(error_min)
best_cams = np.nanargmin(error)
nb_cams_excluded = nb_cams_excluded_filt[best_cams]
Q = Q_filt[best_cams][:-1]
# Swap left and right sides if reprojection error still too high
if handle_LR_swap and error_min > error_threshold_triangulation:
# print('handle')
n_cams_swapped = 1
error_off_swap_min = error_min
while error_off_swap_min > error_threshold_triangulation and n_cams_swapped < (n_cams - nb_cams_off_tot) / 2: # more than half of the cameras switched: may triangulate twice the same side
# print('SWAP: nb_cams_off ', nb_cams_off, 'n_cams_swapped ', n_cams_swapped, 'nb_cams_off_tot ', nb_cams_off_tot)
# Create subsets
id_cams_swapped = np.array(list(it.combinations(range(n_cams-nb_cams_off_tot), n_cams_swapped)))
# print('id_cams_swapped ', id_cams_swapped)
x_files_filt_off_swap = [[x] * len(id_cams_swapped) for x in x_files_filt]
y_files_filt_off_swap = [[y] * len(id_cams_swapped) for y in y_files_filt]
# print('x_files_filt_off_swap ', x_files_filt_off_swap)
# print('y_files_filt_off_swap ', y_files_filt_off_swap)
for id_off in range(len(id_cams_off)): # for each configuration with nb_cams_off_tot removed
for id_swapped, config_swapped in enumerate(id_cams_swapped): # for each of these configurations, test all subconfigurations with with n_cams_swapped swapped
# print('id_off ', id_off, 'id_swapped ', id_swapped, 'config_swapped ', config_swapped)
x_files_filt_off_swap[id_off][id_swapped][config_swapped] = x_files_swapped_filt[id_off][config_swapped]
y_files_filt_off_swap[id_off][id_swapped][config_swapped] = y_files_swapped_filt[id_off][config_swapped]
# Triangulate 2D points
Q_filt_off_swap = np.array([[weighted_triangulation(projection_matrices_filt[id_off], x_files_filt_off_swap[id_off][id_swapped], y_files_filt_off_swap[id_off][id_swapped], likelihood_files_filt[id_off])
for id_swapped in range(len(id_cams_swapped))]
for id_off in range(len(id_cams_off))] )
# Reprojection
if undistort_points:
coords_2D_kpt_calc_off_swap = [np.array([[cv2.projectPoints(np.array(Q_filt_off_swap[id_off][id_swapped][:-1]), calib_params_R_filt[id_off][j], calib_params_T_filt[id_off][j], calib_params_K_filt[id_off][j], calib_params_dist_filt[id_off][j])[0].ravel()
for j in range(n_cams-nb_cams_off_tot)]
for id_swapped in range(len(id_cams_swapped))])
for id_off in range(len(id_cams_off))]
coords_2D_kpt_calc_off_swap = np.array([[[coords_2D_kpt_calc_off_swap[id_off][id_swapped,:,0], coords_2D_kpt_calc_off_swap[id_off][id_swapped,:,1]]
for id_swapped in range(len(id_cams_swapped))]
for id_off in range(len(id_cams_off))])
else:
coords_2D_kpt_calc_off_swap = [np.array([reprojection(projection_matrices_filt[id_off], Q_filt_off_swap[id_off][id_swapped])
for id_swapped in range(len(id_cams_swapped))])
for id_off in range(len(id_cams_off))]
# print(repr(coords_2D_kpt_calc_off_swap))
x_calc_off_swap = [c[:,0] for c in coords_2D_kpt_calc_off_swap]
y_calc_off_swap = [c[:,1] for c in coords_2D_kpt_calc_off_swap]
# Reprojection error
# print('x_files_filt_off_swap ', x_files_filt_off_swap)
# print('x_calc_off_swap ', x_calc_off_swap)
error_off_swap = []
for id_off in range(len(id_cams_off)):
error_percam = []
for id_swapped, config_swapped in enumerate(id_cams_swapped):
# print(id_off,id_swapped,n_cams,nb_cams_off)
# print(repr(x_files_filt_off_swap))
q_file_off_swap = [(x_files_filt_off_swap[id_off][id_swapped][i], y_files_filt_off_swap[id_off][id_swapped][i]) for i in range(n_cams - nb_cams_off_tot)]
q_calc_off_swap = [(x_calc_off_swap[id_off][id_swapped][i], y_calc_off_swap[id_off][id_swapped][i]) for i in range(n_cams - nb_cams_off_tot)]
error_percam.append( np.mean( [euclidean_distance(q_file_off_swap[i], q_calc_off_swap[i]) for i in range(len(q_file_off_swap))] ) )
error_off_swap.append(error_percam)
error_off_swap = np.array(error_off_swap)
# print('error_off_swap ', error_off_swap)
# Choosing best triangulation (with min reprojection error)
error_off_swap_min = np.min(error_off_swap)
best_off_swap_config = np.unravel_index(error_off_swap.argmin(), error_off_swap.shape)
id_off_cams = best_off_swap_config[0]
id_swapped_cams = id_cams_swapped[best_off_swap_config[1]]
Q_best = Q_filt_off_swap[best_off_swap_config][:-1]
n_cams_swapped += 1
if error_off_swap_min < error_min:
error_min = error_off_swap_min
best_cams = id_off_cams
Q = Q_best
# print(error_min)
nb_cams_off += 1
# Index of excluded cams for this keypoint
# print('Loop ended')
if 'best_cams' in locals():
# print(id_cams_off_tot)
# print('len(id_cams_off_tot) ', len(id_cams_off_tot))
# print('id_cams_off_tot ', id_cams_off_tot)
id_excluded_cams = id_cams_off_tot[best_cams]
# print('id_excluded_cams ', id_excluded_cams)
else:
id_excluded_cams = list(range(n_cams))
nb_cams_excluded = n_cams
# print('id_excluded_cams ', id_excluded_cams)
# If triangulation not successful, error = nan, and 3D coordinates as missing values
if error_min > error_threshold_triangulation:
error_min = np.nan
Q = np.array([np.nan, np.nan, np.nan])
return Q, error_min, nb_cams_excluded, id_excluded_cams
def extract_files_frame_f(json_tracked_files_f, keypoints_ids, nb_persons_to_detect):
'''
Extract data from json files for frame f,
in the order of the body model hierarchy.
INPUTS:
- json_tracked_files_f: list of str. Paths of json_files for frame f.
- keypoints_ids: list of int. Keypoints IDs in the order of the hierarchy.
- nb_persons_to_detect: int
OUTPUTS:
- x_files, y_files, likelihood_files: [[[list of coordinates] * n_cams ] * nb_persons_to_detect]
'''
n_cams = len(json_tracked_files_f)
x_files = [[] for n in range(nb_persons_to_detect)]
y_files = [[] for n in range(nb_persons_to_detect)]
likelihood_files = [[] for n in range(nb_persons_to_detect)]
for n in range(nb_persons_to_detect):
for cam_nb in range(n_cams):
x_files_cam, y_files_cam, likelihood_files_cam = [], [], []
try:
with open(json_tracked_files_f[cam_nb], 'r') as json_f:
js = json.load(json_f)
for keypoint_id in keypoints_ids:
try:
x_files_cam.append( js['people'][n]['pose_keypoints_2d'][keypoint_id*3] )
y_files_cam.append( js['people'][n]['pose_keypoints_2d'][keypoint_id*3+1] )
likelihood_files_cam.append( js['people'][n]['pose_keypoints_2d'][keypoint_id*3+2] )
except:
x_files_cam.append( np.nan )
y_files_cam.append( np.nan )
likelihood_files_cam.append( np.nan )
except:
x_files_cam = [np.nan] * len(keypoints_ids)
y_files_cam = [np.nan] * len(keypoints_ids)
likelihood_files_cam = [np.nan] * len(keypoints_ids)
x_files[n].append(x_files_cam)
y_files[n].append(y_files_cam)
likelihood_files[n].append(likelihood_files_cam)
x_files = np.array(x_files)
y_files = np.array(y_files)
likelihood_files = np.array(likelihood_files)
return x_files, y_files, likelihood_files
# Function to calculate the average and maximum length of contiguous sequences in a frame array
def calculate_sequence_stats(frame_array):
"""
Calculate the average and maximum length of contiguous sequences of frames in a given array.
INPUT:
- frame_array: A numpy array containing frame indices.
OUTPUT:
- mean_length: Average length of contiguous frame sequences.
- max_length: Maximum length of any contiguous frame sequence.
If the input array is empty, the function returns 0 for both the mean and max lengths.
"""
# Return 0 for both statistics if the array is empty
if len(frame_array) == 0:
return 0, 0
# Calculate differences between consecutive elements to find gaps
diffs = np.diff(frame_array)
# Identify the indices where the difference is not equal to 1 (indicating a break in continuity)
breaks = np.where(diffs != 1)[0]
# Calculate the lengths of contiguous sequences
# Use concatenation to account for the start and end of the array
sequence_lengths = np.diff(np.concatenate(([-1], breaks, [len(frame_array) - 1])))
# Calculate the mean length of sequences
mean_length = sequence_lengths.mean()
# Find the maximum sequence length, or return 0 if there are no sequences
max_length = sequence_lengths.max() if len(sequence_lengths) > 0 else 0
return mean_length, max_length
def triangulate_all(config_dict):
'''
For each frame
For each keypoint
- Triangulate keypoint
- Reproject it on all cameras
- Take off cameras until requirements are met
Interpolate missing values
Create trc file
Print recap message
INPUTS:
- a calibration file (.toml extension)
- json files for each camera with indices matching the detected persons
- a Config.toml file
- a skeleton model
OUTPUTS:
- a .trc file with 3D coordinates in Y-up system coordinates
'''
# Read config_dict
project_dir = config_dict.get('project').get('project_dir')
# if batch
session_dir = os.path.realpath(os.path.join(project_dir, '..'))
# if single trial
session_dir = session_dir if 'Config.toml' in os.listdir(session_dir) else os.getcwd()
multi_person = config_dict.get('project').get('multi_person')
pose_model = config_dict.get('pose').get('pose_model')
frame_range = config_dict.get('project').get('frame_range')
likelihood_threshold = config_dict.get('triangulation').get('likelihood_threshold_triangulation')
interpolation_kind = config_dict.get('triangulation').get('interpolation')
interp_gap_smaller_than = config_dict.get('triangulation').get('interp_if_gap_smaller_than')
fill_large_gaps_with = config_dict.get('triangulation').get('fill_large_gaps_with')
show_interp_indices = config_dict.get('triangulation').get('show_interp_indices')
undistort_points = config_dict.get('triangulation').get('undistort_points')
make_c3d = config_dict.get('triangulation').get('make_c3d')
try:
calib_dir = [os.path.join(session_dir, c) for c in os.listdir(session_dir) if os.path.isdir(os.path.join(session_dir, c)) and 'calib' in c.lower()][0]
except:
raise Exception(f'No .toml calibration direcctory found.')
try:
calib_file = glob.glob(os.path.join(calib_dir, '*.toml'))[0] # lastly created calibration file
except:
raise Exception(f'No .toml calibration file found in the {calib_dir}.')
if pose_model == "CUSTOM":
pose_dir = os.path.join(project_dir, 'pose-custom')
poseTracked_dir = os.path.join(project_dir, 'pose-associated-custom')
else:
pose_dir = os.path.join(project_dir, 'pose')
poseTracked_dir = os.path.join(project_dir, 'pose-associated')
#pose_dir = os.path.join(project_dir, 'pose')
poseSync_dir = os.path.join(project_dir, 'pose-sync')
#poseTracked_dir = os.path.join(project_dir, 'pose-associated')
# Projection matrix from toml calibration file
P = computeP(calib_file, undistort=undistort_points)
calib_params = retrieve_calib_params(calib_file)
# Retrieve keypoints from model
try: # from skeletons.py
model = eval(pose_model)
except:
try: # from Config.toml
model = DictImporter().import_(config_dict.get('pose').get(pose_model))
if model.id == 'None':
model.id = None
except:
raise NameError('Model not found in skeletons.py nor in Config.toml')
keypoints_ids = [node.id for _, _, node in RenderTree(model) if node.id!=None]
keypoints_names = [node.name for _, _, node in RenderTree(model) if node.id!=None]
keypoints_idx = list(range(len(keypoints_ids)))
keypoints_nb = len(keypoints_ids)
# for pre, _, node in RenderTree(model):
# print(f'{pre}{node.name} id={node.id}')
# left/right swapped keypoints
keypoints_names_swapped = [keypoint_name.replace('R', 'L') if keypoint_name.startswith('R') else keypoint_name.replace('L', 'R') if keypoint_name.startswith('L') else keypoint_name for keypoint_name in keypoints_names]
keypoints_names_swapped = [keypoint_name_swapped.replace('right', 'left') if keypoint_name_swapped.startswith('right') else keypoint_name_swapped.replace('left', 'right') if keypoint_name_swapped.startswith('left') else keypoint_name_swapped for keypoint_name_swapped in keypoints_names_swapped]
keypoints_idx_swapped = [keypoints_names.index(keypoint_name_swapped) for keypoint_name_swapped in keypoints_names_swapped] # find index of new keypoint_name
# 2d-pose files selection
try:
pose_listdirs_names = next(os.walk(pose_dir))[1]
os.listdir(os.path.join(pose_dir, pose_listdirs_names[0]))[0]
except:
raise ValueError(f'No json files found in {pose_dir} subdirectories. Make sure you run Pose2Sim.poseEstimation() first.')
pose_listdirs_names = sort_stringlist_by_last_number(pose_listdirs_names)
json_dirs_names = [k for k in pose_listdirs_names if 'json' in k]
n_cams = len(json_dirs_names)
try:
json_files_names = [fnmatch.filter(os.listdir(os.path.join(poseTracked_dir, js_dir)), '*.json') for js_dir in json_dirs_names]
pose_dir = poseTracked_dir
except:
try:
json_files_names = [fnmatch.filter(os.listdir(os.path.join(poseSync_dir, js_dir)), '*.json') for js_dir in json_dirs_names]
pose_dir = poseSync_dir
except:
try:
json_files_names = [fnmatch.filter(os.listdir(os.path.join(pose_dir, js_dir)), '*.json') for js_dir in json_dirs_names]
except:
raise Exception(f'No json files found in {pose_dir}, {poseSync_dir}, nor {poseTracked_dir} subdirectories. Make sure you run Pose2Sim.poseEstimation() first.')
json_files_names = [sort_stringlist_by_last_number(js) for js in json_files_names]
# frame range selection
f_range = [[0,max([len(j) for j in json_files_names])] if frame_range==[] else frame_range][0]
frame_nb = f_range[1] - f_range[0]
# Check that camera number is consistent between calibration file and pose folders
if n_cams != len(P):
raise Exception(f'Error: The number of cameras is not consistent:\
Found {len(P)} cameras in the calibration file,\
and {n_cams} cameras based on the number of pose folders.')
# Triangulation
if multi_person:
nb_persons_to_detect = max(max(count_persons_in_json(os.path.join(pose_dir, json_dirs_names[c], json_fname)) for json_fname in json_files_names[c]) for c in range(n_cams))
else:
nb_persons_to_detect = 1
Q = [[[np.nan]*3]*keypoints_nb for n in range(nb_persons_to_detect)]
Q_old = [[[np.nan]*3]*keypoints_nb for n in range(nb_persons_to_detect)]
error = [[] for n in range(nb_persons_to_detect)]
nb_cams_excluded = [[] for n in range(nb_persons_to_detect)]
id_excluded_cams = [[] for n in range(nb_persons_to_detect)]
Q_tot, error_tot, nb_cams_excluded_tot,id_excluded_cams_tot = [], [], [], []
for f in tqdm(range(*f_range)):
# print(f'\nFrame {f}:')
# Get x,y,likelihood values from files
json_files_names_f = [[j for j in json_files_names[c] if int(re.split(r'(\d+)',j)[-2])==f] for c in range(n_cams)]
json_files_names_f = [j for j_list in json_files_names_f for j in (j_list or ['none'])]
json_files_f = [os.path.join(pose_dir, json_dirs_names[c], json_files_names_f[c]) for c in range(n_cams)]
x_files, y_files, likelihood_files = extract_files_frame_f(json_files_f, keypoints_ids, nb_persons_to_detect)
# [[[list of coordinates] * n_cams ] * nb_persons_to_detect]
# vs. [[list of coordinates] * n_cams ]
# undistort points
if undistort_points:
for n in range(nb_persons_to_detect):
points = [np.array(tuple(zip(x_files[n][i],y_files[n][i]))).reshape(-1, 1, 2).astype('float32') for i in range(n_cams)]
undistorted_points = [cv2.undistortPoints(points[i], calib_params['K'][i], calib_params['dist'][i], None, calib_params['optim_K'][i]) for i in range(n_cams)]
x_files[n] = np.array([[u[i][0][0] for i in range(len(u))] for u in undistorted_points])
y_files[n] = np.array([[u[i][0][1] for i in range(len(u))] for u in undistorted_points])
# This is good for slight distortion. For fisheye camera, the model does not work anymore. See there for an example https://github.com/lambdaloop/aniposelib/blob/d03b485c4e178d7cff076e9fe1ac36837db49158/aniposelib/cameras.py#L301
# Replace likelihood by 0 if under likelihood_threshold
with np.errstate(invalid='ignore'):
for n in range(nb_persons_to_detect):
x_files[n][likelihood_files[n] < likelihood_threshold] = np.nan
y_files[n][likelihood_files[n] < likelihood_threshold] = np.nan
likelihood_files[n][likelihood_files[n] < likelihood_threshold] = np.nan
# Q_old = Q except when it has nan, otherwise it takes the Q_old value
nan_mask = np.isnan(Q)
Q_old = np.where(nan_mask, Q_old, Q)
Q = [[] for n in range(nb_persons_to_detect)]
error = [[] for n in range(nb_persons_to_detect)]
nb_cams_excluded = [[] for n in range(nb_persons_to_detect)]
id_excluded_cams = [[] for n in range(nb_persons_to_detect)]
for n in range(nb_persons_to_detect):
for keypoint_idx in keypoints_idx:
# keypoints_nb = 2
# for keypoint_idx in range(2):
# Triangulate cameras with min reprojection error
# print('\n', keypoints_names[keypoint_idx])
coords_2D_kpt = np.array( (x_files[n][:, keypoint_idx], y_files[n][:, keypoint_idx], likelihood_files[n][:, keypoint_idx]) )
coords_2D_kpt_swapped = np.array(( x_files[n][:, keypoints_idx_swapped[keypoint_idx]], y_files[n][:, keypoints_idx_swapped[keypoint_idx]], likelihood_files[n][:, keypoints_idx_swapped[keypoint_idx]] ))
Q_kpt, error_kpt, nb_cams_excluded_kpt, id_excluded_cams_kpt = triangulation_from_best_cameras(config_dict, coords_2D_kpt, coords_2D_kpt_swapped, P, calib_params) # P has been modified if undistort_points=True
Q[n].append(Q_kpt)
error[n].append(error_kpt)
nb_cams_excluded[n].append(nb_cams_excluded_kpt)
id_excluded_cams[n].append(id_excluded_cams_kpt)
if multi_person:
# reID persons across frames by checking the distance from one frame to another
# print('Q before ordering ', np.array(Q)[:,:2])
if f !=0:
Q, personsIDs_sorted, associated_tuples = sort_people(Q_old, Q)
# print('Q after ordering ', personsIDs_sorted, associated_tuples, np.array(Q)[:,:2])
error_sorted, nb_cams_excluded_sorted, id_excluded_cams_sorted = [], [], []
for i in range(len(Q)):
id_in_old = associated_tuples[:,1][associated_tuples[:,0] == i].tolist()
if len(id_in_old) > 0:
personsIDs_sorted += id_in_old
error_sorted += [error[id_in_old[0]]]
nb_cams_excluded_sorted += [nb_cams_excluded[id_in_old[0]]]
id_excluded_cams_sorted += [id_excluded_cams[id_in_old[0]]]
else:
personsIDs_sorted += [-1]
error_sorted += [error[i]]
nb_cams_excluded_sorted += [nb_cams_excluded[i]]
id_excluded_cams_sorted += [id_excluded_cams[i]]
error, nb_cams_excluded, id_excluded_cams = error_sorted, nb_cams_excluded_sorted, id_excluded_cams_sorted
# TODO: if distance > threshold, new person
# Add triangulated points, errors and excluded cameras to pandas dataframes
Q_tot.append([np.concatenate(Q[n]) for n in range(nb_persons_to_detect)])
error_tot.append([error[n] for n in range(nb_persons_to_detect)])
nb_cams_excluded_tot.append([nb_cams_excluded[n] for n in range(nb_persons_to_detect)])
id_excluded_cams = [[id_excluded_cams[n][k] for k in range(keypoints_nb)] for n in range(nb_persons_to_detect)]
id_excluded_cams_tot.append(id_excluded_cams)
# fill values for if a person that was not initially detected has entered the frame
Q_tot = [list(tpl) for tpl in zip(*it.zip_longest(*Q_tot, fillvalue=[np.nan]*keypoints_nb*3))]
error_tot = [list(tpl) for tpl in zip(*it.zip_longest(*error_tot, fillvalue=[np.nan]*keypoints_nb*3))]
nb_cams_excluded_tot = [list(tpl) for tpl in zip(*it.zip_longest(*nb_cams_excluded_tot, fillvalue=[np.nan]*keypoints_nb*3))]
id_excluded_cams_tot = [list(tpl) for tpl in zip(*it.zip_longest(*id_excluded_cams_tot, fillvalue=[np.nan]*keypoints_nb*3))]