-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcapping.py
More file actions
173 lines (152 loc) · 5.44 KB
/
capping.py
File metadata and controls
173 lines (152 loc) · 5.44 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable, List, Optional, Tuple
from urllib.request import urlretrieve
import numpy as np
from PIL import Image
# Configuration for frame extraction
VIDEO_EXTS = {".mp4", ".mov", ".mkv"}
FPS = 8 # export 8 frames per second to reduce duplicates and keep outputs lean
JPEG_QUALITY = 2 # lower is better quality for ffmpeg qscale (2 is near-lossless)
FACE_SAMPLE_STRIDE = 3 # analyze every Nth frame for face crops
MAX_FACE_CROPS = 200 # per video cap to avoid explosion
FACE_PAD_RATIO = 0.2 # 20% padding around detected box
# Pretrained YOLO face-detection weights, fetched on first use by ensure_face_model()
FACE_MODEL_URL = "https://github.com/YapaLab/yolo-face/releases/download/v0.0.0/yolov12n-face.pt"
# Local cache location for the downloaded weights (relative to this file)
FACE_MODEL_PATH = Path(__file__).resolve().parent / "_system" / "models" / "face" / "yolov12n-face.pt"
@dataclass
class Box:
    """Axis-aligned bounding box in pixel coordinates.

    (x1, y1) is the top-left corner, (x2, y2) the bottom-right; consumers
    (crop_faces) pass these to PIL's (left, upper, right, lower) crop box.
    """
    x1: float
    y1: float
    x2: float
    y2: float
def iter_videos(root: Path) -> Iterable[Path]:
    """Recursively walk *root* and yield every file whose suffix is in VIDEO_EXTS."""
    for candidate in root.rglob("*"):
        if not candidate.is_file():
            continue
        if candidate.suffix.lower() not in VIDEO_EXTS:
            continue
        yield candidate
def cap_video(src: Path, out_dir: Path, fps: int = FPS, jpeg_quality: int = JPEG_QUALITY) -> None:
    """
    Dump frames of *src* into *out_dir* as numbered JPEGs via ffmpeg.

    Creates out_dir if needed. If out_dir already contains anything the video
    is treated as already processed and nothing is done.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    # Non-empty directory => a previous run already exported these frames.
    if next(out_dir.iterdir(), None) is not None:
        print(f"[skip] frames already exist: {out_dir}")
        return
    print(f"[cap ] {src} -> {out_dir}")
    subprocess.run(
        [
            "ffmpeg",
            "-loglevel", "warning",
            "-i", str(src),
            "-vf", f"fps={fps}",
            "-qscale:v", str(jpeg_quality),
            str(out_dir / "%06d.jpg"),
        ],
        check=True,
    )
def ensure_face_model() -> Optional[Path]:
    """
    Return the local path of the YOLO face weights, downloading on first use.

    The download goes to a temporary ``.part`` sibling and is renamed into
    place only on success; previously, an interrupted urlretrieve could leave
    a truncated file at FACE_MODEL_PATH that later runs would mistake for
    valid weights (the exists() check would pass).

    Returns:
        Path to the weights file, or None if the download failed.
    """
    FACE_MODEL_PATH.parent.mkdir(parents=True, exist_ok=True)
    if FACE_MODEL_PATH.exists():
        return FACE_MODEL_PATH
    tmp_path = FACE_MODEL_PATH.with_suffix(FACE_MODEL_PATH.suffix + ".part")
    try:
        print(f"[face] downloading face model to {FACE_MODEL_PATH}")
        urlretrieve(FACE_MODEL_URL, tmp_path)
        # Atomic on POSIX when source and destination share a filesystem.
        tmp_path.replace(FACE_MODEL_PATH)
        return FACE_MODEL_PATH
    except Exception as e:
        print(f"[face] download failed: {e}")
        tmp_path.unlink(missing_ok=True)  # discard any partial download
        return None
def load_face_detector():
    """
    Construct the YOLO face detector, or return None if any prerequisite is
    missing: ultralytics not importable, weights unavailable, or model load
    failure.
    """
    try:
        from ultralytics import YOLO
    except Exception:
        return None
    weights = ensure_face_model()
    if weights is None:
        return None
    try:
        detector = YOLO(weights)
    except Exception as e:
        print(f"[face] failed to load detector: {e}")
        return None
    return detector
def detect_faces(detector, image: Image.Image) -> list[Box]:
    """
    Run *detector* on a PIL image and return face boxes with confidence
    >= 0.25. Returns an empty list when detector is None.
    """
    if detector is None:
        return []
    found: list[Box] = []
    # ultralytics accepts a numpy array directly
    results = detector.predict(np.array(image), verbose=False)
    for result in results:
        if not hasattr(result, "boxes"):
            continue
        for det in result.boxes:
            conf = det.conf
            if conf is not None and float(conf) < 0.25:
                continue
            coords = det.xyxy[0].tolist()
            found.append(Box(*coords))
    return found
def crop_faces(detector, frames_dir: Path, stride: int = FACE_SAMPLE_STRIDE, max_crops: int = MAX_FACE_CROPS) -> int:
    """
    Scan frames in frames_dir for faces (sampling every Nth frame) and write
    padded crops to frames_dir/face.

    Args:
        detector: YOLO face model as returned by load_face_detector(), or None.
        frames_dir: directory of previously extracted frames.
        stride: analyze every ``stride``-th frame.
        max_crops: hard per-directory cap on the number of crops written.

    Returns:
        Number of crop files written (0 when detector is None).
    """
    face_dir = frames_dir / "face"
    face_dir.mkdir(parents=True, exist_ok=True)
    if detector is None:
        # detect_faces would return [] for every frame anyway; skip the
        # pointless open/scan of every sampled image.
        return 0
    written = 0
    frames = sorted(p for p in frames_dir.iterdir() if p.is_file() and p.suffix.lower() in {".jpg", ".jpeg", ".png", ".webp", ".bmp"})
    for idx, frame_path in enumerate(frames):
        if idx % stride != 0:
            continue
        if written >= max_crops:
            break
        try:
            img = Image.open(frame_path)
        except Exception:
            # unreadable/corrupt frame: skip, as before
            continue
        # Context manager closes the file handle; the original leaked one
        # open handle per frame (Image.open is lazy and was never closed).
        with img:
            boxes = detect_faces(detector, img)
            w, h = img.size  # hoisted: identical for every box of this frame
            for b in boxes:
                if written >= max_crops:
                    break
                pad_x = (b.x2 - b.x1) * FACE_PAD_RATIO
                pad_y = (b.y2 - b.y1) * FACE_PAD_RATIO
                x1 = max(0, int(b.x1 - pad_x))
                y1 = max(0, int(b.y1 - pad_y))
                x2 = min(w, int(b.x2 + pad_x))
                y2 = min(h, int(b.y2 + pad_y))
                crop = img.crop((x1, y1, x2, y2))
                out_name = face_dir / f"{frame_path.stem}_face{written+1}{frame_path.suffix.lower()}"
                try:
                    crop.save(out_name)
                    written += 1
                except Exception as e:
                    # best-effort: one bad crop should not abort the scan,
                    # but don't swallow the error silently anymore
                    print(f"[face] failed to save crop {out_name}: {e}")
                    continue
    return written
def cap_all(
    source_root: Path,
    capping_root: Path,
    facecap: bool = False,
    fps: int = FPS,
    jpeg_quality: int = JPEG_QUALITY,
) -> List[Path]:
    """
    Export frames for every video under source_root into capping_root,
    mirroring the source folder structure; optionally run face cropping on
    each produced frame directory.

    Returns:
        List of frame directories that were processed.
    """
    detector = load_face_detector() if facecap else None
    frame_dirs: List[Path] = []
    for video in sorted(iter_videos(source_root)):
        relative = video.parent.relative_to(source_root)
        frames_dir = capping_root / relative / video.stem
        cap_video(video, frames_dir, fps=fps, jpeg_quality=jpeg_quality)
        if facecap:
            n_crops = crop_faces(detector, frames_dir)
            if n_crops:
                print(f"[face] {video} -> {n_crops} crops")
        frame_dirs.append(frames_dir)
    return frame_dirs
# Library module: there is no CLI here; workflow.py is the supported entry point.
if __name__ == "__main__":
    raise SystemExit("Use this module from workflow.py")