diff --git a/README.md b/README.md index 454d8e1..e8e501a 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,47 @@ Want to predict the result of a wheel spin before the winner is announced? Use t ![preview.gif](./preview.gif) +## How it works + +The solution is entirely built on collected heuristics and on standard human behavioral patterns. The implementation relies on screen parsing via OpenCV, text recognition via Tesseract OCR engine and basic math computations. + +The program workflow: + +1. Detecting the initial spin frame + + - The program processes frames sequentially until it finds one that meets the following criteria: + - The current frame contains a wheel (identified as a large circle) and has the text "Winner" above it. + - The next frame shows the wheel in motion (determined by comparing the angular difference between frames). + +2. Determining the total spin duration + + - The simplest approach is detecting a user-entered duration on the screen (by parsing an image and extracting a numerical value) + - If the duration box is unrecognized, hidden, or spin duration is randomized, a mathematical approach is applied. The program estimates the duration based on observed wheel movement. The wheel spin follows a custom easing function implemented with [GSAP](https://gsap.com/docs/v3/Eases/CustomEase/), which optimizes Bézier curves by generating interpolated `x` and `y` values for improved calculation accuracy. However, this implementation introduces slight deviations near the interpolated points. These deviations help eliminate unsuitable duration candidates, typically narrowing the range to a 1-4 second window. Post-processing further refines the estimate. This approximation is sufficient for our needs at this stage + - The fallback strategy: if all else fails, the program prompts the user for input + +3. Calculating the destination angle + + Picking a frame from the spinning sequence: + + - $x_{i} = \frac{d_{i}}{d}$, where `dᵢ` is the elapsed time, and `d` is the total spin duration + - $y_{i} = \frac{a_{i}}{a}$, where `aᵢ` is the elapsed angular displacement (including full rotations) from initial state, and `a` is the target angle + - $y_{i} = bezier(x_{i})$, where the `bezier` function maps the elapsed time to the corresponding elapsed angle, both normalized to a scale from 0 to 1 + - The target angle is computed as: $a = \frac{a_{i}}{bezier(\frac{d_{i}}{d})}$ + +4. Collecting lot names and their positions on the wheel + + - The initial frame is used to determine sector boundaries (collected in format start_angle and end_angle) + - The initial wheel spin is analyzed to extract sector names (obtained from the text displayed above the wheel) + +5. Refining calculations for greater accuracy + + - Avoiding early-stage angle measurements due to high error margins + - Smoothing data by filtering out spikes, duplicates, and inconsistencies + - Discarding invalid duration candidates based on discrepancies between computed and expected target angles + - Converting the elliptical wheel projection to a circular model for more precise measurements (not yet implemented) + - Extending the spin analysis window to improve text recognition of lot names + - Implementing a voting system to enhance accuracy in determining the winning sector + ## Installation 1. Install the project: @@ -34,7 +75,7 @@ Want to predict the result of a wheel spin before the winner is announced? Use t ## Run -Run the program before the wheel spin starts: +Run the program before the wheel starts to spin: ```shell @@ -42,6 +83,9 @@ streamlink --twitch-low-latency --stdout best | python main.py wi # or you can use a shorthand ./utils run + +# or you can analyze your video snippet +cat vod.ts | python main.py winner ``` ## Lint diff --git a/config/__init__.py b/config/__init__.py index aff6b92..d91ddd2 100644 --- a/config/__init__.py +++ b/config/__init__.py @@ -1,19 +1,18 @@ from .config import ( - ANGLE_WINDOW_LEN, - ASK_LENGTH, + ANGLE_WINDOW_SIZE, + PROMPT_FOR_DURATION, CALCULATION_STEP, - EXCLUDE_SECONDS_RANGE, - FRAMES_STEP_FOR_LENGTH_DETECTION, - LOOK_BEHIND_FRAMES_FOR_LENGTH_DETECTION, - MAX_MEAN_ANGLE_DELTA, - MIN_SKIP_OF_WHEEL_SPIN, - MIN_SKIP_SEC, - NASTY_OPTIMIZATION, + DURATION_DETECTION_TIME_RANGE, + DURATION_DETECTION_FRAME_STEP, + MAX_ANGLE_DELTA, + MIN_WHEEL_SPIN_SKIP_RATIO, + MIN_SKIP_DURATION, + SPECULATIVE_OPTIMIZATION, READ_STEP, SPIN_BUFFER_SIZE, - SPIN_BUFFER_SIZE_FOR_LENGTH_DETECTION, - SPIN_DETECT_LENGTH, - TESSERACT_LANG, + DURATION_DETECTION_MAX_SPINS, + DURATION_DETECTION_MAX_FRAMES, + TESSERACT_LANGUAGE, VISUALIZATION_ENABLED, ) from .logger_config import add_global_event_time, setup_logger, update_global_event_time @@ -23,19 +22,18 @@ "update_global_event_time", "add_global_event_time", "READ_STEP", - "ANGLE_WINDOW_LEN", + "ANGLE_WINDOW_SIZE", "CALCULATION_STEP", "SPIN_BUFFER_SIZE", - "MIN_SKIP_OF_WHEEL_SPIN", - "MIN_SKIP_SEC", - "MAX_MEAN_ANGLE_DELTA", - "NASTY_OPTIMIZATION", - "SPIN_DETECT_LENGTH", - "SPIN_BUFFER_SIZE_FOR_LENGTH_DETECTION", - "FRAMES_STEP_FOR_LENGTH_DETECTION", - "LOOK_BEHIND_FRAMES_FOR_LENGTH_DETECTION", - "EXCLUDE_SECONDS_RANGE", - "ASK_LENGTH", - "TESSERACT_LANG", + "MIN_WHEEL_SPIN_SKIP_RATIO", + "MIN_SKIP_DURATION", + "MAX_ANGLE_DELTA", + "SPECULATIVE_OPTIMIZATION", + "DURATION_DETECTION_MAX_SPINS", + "DURATION_DETECTION_MAX_FRAMES", + "DURATION_DETECTION_FRAME_STEP", + "DURATION_DETECTION_TIME_RANGE", + "PROMPT_FOR_DURATION", + "TESSERACT_LANGUAGE", "VISUALIZATION_ENABLED", ] diff --git a/config/config.py b/config/config.py index b9c1448..608a0fa 100644 --- a/config/config.py +++ b/config/config.py @@ -1,21 +1,21 @@ READ_STEP = 2 -ANGLE_WINDOW_LEN = 30 -CALCULATION_STEP = ANGLE_WINDOW_LEN * 3 +ANGLE_WINDOW_SIZE = 30 +CALCULATION_STEP = ANGLE_WINDOW_SIZE * 3 SPIN_BUFFER_SIZE = 3 -# The close to accurate is 1/4. The most accurate 1/3. -MIN_SKIP_OF_WHEEL_SPIN = 1 / 3 -MIN_SKIP_SEC = 10 -MAX_MEAN_ANGLE_DELTA = 10.0 -TESSERACT_LANG = "rus+eng" # "eng" +# The calculations are most accurate at 1/3 and fairly accurate at 1/4. +MIN_WHEEL_SPIN_SKIP_RATIO = 1 / 3 +MIN_SKIP_DURATION = 10 +MAX_ANGLE_DELTA = 10.0 -NASTY_OPTIMIZATION = True +TESSERACT_LANGUAGE = "rus+eng" # "eng" -SPIN_DETECT_LENGTH = True -SPIN_BUFFER_SIZE_FOR_LENGTH_DETECTION = 10 -FRAMES_STEP_FOR_LENGTH_DETECTION = 10 -LOOK_BEHIND_FRAMES_FOR_LENGTH_DETECTION = 5 -EXCLUDE_SECONDS_RANGE = (30, 180) -ASK_LENGTH = True +SPECULATIVE_OPTIMIZATION = True +DURATION_DETECTION_MAX_SPINS = 10 +DURATION_DETECTION_MAX_FRAMES = 600 +DURATION_DETECTION_FRAME_STEP = 10 +DURATION_DETECTION_TIME_RANGE = (30, 180) + +PROMPT_FOR_DURATION = True VISUALIZATION_ENABLED = False diff --git a/stream/frame.py b/stream/frame.py index 3cd8218..3347ce6 100644 --- a/stream/frame.py +++ b/stream/frame.py @@ -24,7 +24,7 @@ class Frame: _wheel: Optional[np.ndarray] _lot_name: Optional[str] _rotation_angle: Optional[float] - _init_text = ["победитель", "winner"] + _initial_text = ["winner", "победитель"] def __init__(self, frame: np.ndarray, index: int): self._frame = frame @@ -71,7 +71,7 @@ def detect_wheel(self) -> np.ndarray: self._wheel = max(circles[0], key=lambda c: c[2]) return self._wheel - raise Exception("cannot detect wheel") + raise Exception("Cannot detect the wheel") def detect_lot_name(self) -> str: if self._lot_name is not None: @@ -89,16 +89,16 @@ def detect_lot_name(self) -> str: utils.visualize(text_roi, "text roi before preprocessing") - text = pytesseract.image_to_string(thresh, lang=config.TESSERACT_LANG, config="--psm 6") - logger.info("Lot name was detected", extra={"text": text.strip(), "frame": {self._index}}) + text = pytesseract.image_to_string(thresh, lang=config.TESSERACT_LANGUAGE, config="--psm 6") + logger.info("Lot name detected", extra={"text": text.strip(), "frame": {self._index}}) self._lot_name = text.strip() return self._lot_name - def _find_length_section(self, block_roi: np.ndarray) -> np.ndarray: + def _find_duration_block(self, block_roi: np.ndarray) -> np.ndarray: gray = cv2.cvtColor(block_roi, cv2.COLOR_BGR2GRAY) - # 50 and 100 — heuristics value. For someone's stream 5 and 10 should be used + # 50 and 100 are heuristics value. For some streams, 5 and 10 should be used instead candidates = [(50, 100), (5, 10)] for threshold1, threshold2 in candidates: edges = cv2.Canny(gray, threshold1, threshold2) @@ -107,11 +107,11 @@ def _find_length_section(self, block_roi: np.ndarray) -> np.ndarray: rectangles = [] for contour in contours: x, y, w, h = cv2.boundingRect(contour) - # 70 (50 for the range length) and 28 the min size of the target rectangle + # 70 (50 for the range length) and 28 are the minimal dimensions of the target rectangle if w >= 50 and h >= 28: rectangles.append((x, y, w, h)) - # drop all inner rectangles + # Drop all inner rectangles filtered_rectangles = [] for i, (x1, y1, w1, h1) in enumerate(rectangles): is_inner = False @@ -122,8 +122,8 @@ def _find_length_section(self, block_roi: np.ndarray) -> np.ndarray: if not is_inner: filtered_rectangles.append((x1, y1, w1, h1)) - # drop all rectangles that intersect others - # keep only with the biggest area. + # Drop all rectangles that intersect others + # Keep only the one with the largest area. filtered_rectangles = sorted(filtered_rectangles, key=lambda item: item[2] * item[3], reverse=True) final_rectangles = [] for rect in filtered_rectangles: @@ -134,20 +134,19 @@ def _find_length_section(self, block_roi: np.ndarray) -> np.ndarray: ): final_rectangles.append(rect) - # keep rectangles from the left to the right + # Keep rectangles from left to right final_rectangles = sorted(final_rectangles, key=lambda item: item[0]) - # if there are only 3 rectangles: Крутимся, От и До, then it was not successful + # If there are only 3 rectangles: Spin, From и To, then the operation was not successful if len(final_rectangles) == 3: - raise Exception("range length was detected") + raise Exception("Range duration detected") - # if there are only 2 rectangles: Крутимся и Длительность, then it was successful + # If there are only 2 rectangles: Spinning и Duration, then the operation was successful if len(final_rectangles) == 2: - # x, y, w, h = max(final_rectangles, key=lambda item: item[1]) x, y, w, h = final_rectangles[1] return block_roi[y + 8 : y + h - 8, x + 5 : x + w - 5] - # if there is only one rectangle: Крутимся, then look to the left + # If there is one rectangle: Spinning, look to the left if len(final_rectangles) == 1: x, y, w, h = final_rectangles[0] if ( @@ -158,9 +157,9 @@ def _find_length_section(self, block_roi: np.ndarray) -> np.ndarray: ): return block_roi[y + 8 : y + h - 8, x + w + 20 : x + w + 20 + 50] - raise Exception("length roi wasn't found") + raise Exception("Duration roi not found") - def _detect_length(self, roi: np.ndarray) -> int: + def _detect_duration(self, roi: np.ndarray) -> int: candidates = [] for threshold in range(170, 140, -5): _, thresh = cv2.threshold(roi, threshold, 255, cv2.THRESH_BINARY) @@ -173,16 +172,16 @@ def _detect_length(self, roi: np.ndarray) -> int: candidates.append(int(matches[0])) if len(candidates) == 0: - raise Exception(f"no candidates") + raise Exception("No candidates") - length, total = collections.Counter(candidates).most_common(1)[0] - logger.info("Length candidates", extra={"candidates": candidates}) + duration, total = collections.Counter(candidates).most_common(1)[0] + logger.info("Duration candidates", extra={"candidates": candidates}) if total < 2: - raise Exception(f"not enough the candidates of {length}") + raise Exception(f"Not enough candidates of {duration}") - return length + return duration - def detect_length(self) -> int: + def detect_duration(self) -> int: circle = self.detect_wheel() center_x, center_y, radius = circle @@ -192,34 +191,34 @@ def detect_length(self) -> int: roi_x_end = min(self._frame.shape[1], center_x + radius + radius) block_roi = self._frame[roi_y_start:roi_y_end, roi_x_start:roi_x_end] - utils.visualize(block_roi, "Length section") - rect = self._find_length_section(block_roi) - utils.visualize(rect, "Cropped length section") + utils.visualize(block_roi, "Duration section") + rect = self._find_duration_block(block_roi) + utils.visualize(rect, "Cropped duration section") - length = self._detect_length(rect) + duration = self._detect_duration(rect) - return length + return duration def _extract_raw_lines(self) -> np.ndarray: image = self.extract_circle_content(True) height, width = image.shape[:2] mask = np.ones((height, width), dtype=np.uint8) * 255 - # fill out the text on the wheel + # Fill out the text on the wheel gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) _, thresh = cv2.threshold(gray, 215, 255, cv2.THRESH_BINARY) contours, _ = cv2.findContours(thresh, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE) for i, contour in enumerate(contours): x, y, w, h = cv2.boundingRect(contour) - # filter out the contour that may contain text + # Filter out contours that may contain text if w + h < 60: cv2.drawContours(mask, contours, i, 0, thickness=cv2.FILLED) - # fill out the circle center on the wheel and around that center space to avoid collisions + # Fill in the circle center on the wheel and the surrounding space to avoid collisions _, _, radius = self._wheel cv2.circle(mask, (radius, radius), int(0.5 * radius), 0, thickness=cv2.FILLED) - # mask is ready, apply to the image to extact lines + # Mask is ready, apply it to the image to extract lines masked = cv2.bitwise_and(image, image, mask=mask) masked_gray = cv2.cvtColor(masked, cv2.COLOR_BGR2GRAY) _, masked_thresh = cv2.threshold(masked_gray, 215, 255, cv2.THRESH_BINARY) @@ -282,7 +281,8 @@ def _uniq_lines( uniq_lines_: List[tuple[np.ndarray, float, float, np.ndarray]] = [] for group in groups: - closest_line = min(group, key=lambda p: p[1]) # Line with the smallest distance + # Choose the line with the smallest distance to the center + closest_line = min(group, key=lambda p: p[1]) a, b, c = closest_line d = _to_prolong_line(a) uniq_lines_.append((a, b, c, d)) @@ -311,7 +311,7 @@ def extract_sectors(self) -> List[tuple[float, float]]: start_angle = angles[i] end_angle = angles[(i + 1) % len(angles)] - # normalize angle relatively to North (counterclockwise) + # Normalize the angle relative to North (counterclockwise) normalized_start_angle = (3 * math.pi / 2 - start_angle) % (2 * math.pi) normalized_end_angle_deg = (3 * math.pi / 2 - end_angle) % (2 * math.pi) @@ -326,28 +326,28 @@ def extract_sectors(self) -> List[tuple[float, float]]: return sectors - def is_init_frame(self) -> bool: + def is_initial_frame(self) -> bool: try: text = self.detect_lot_name() except pytesseract.TesseractNotFoundError: - logger.error("Tesseract was not found") + logger.error("Tesseract not found") raise except Exception: return False - # search substring 'победитель' или 'winner' in the frame. This prevents some false parsing - return any(substring in text.lower() for substring in self._init_text) + # Searching for the substring 'winner' in the frame to prevent false parsing + return any(substring in text.lower() for substring in self._initial_text) def is_spin_frame(self) -> bool: try: text = self.detect_lot_name() except pytesseract.TesseractNotFoundError: - logger.error("Tesseract was not found") + logger.error("Tesseract not found") raise except Exception as e: return False - return not any(substring in text.lower() for substring in self._init_text) + return not any(substring in text.lower() for substring in self._initial_text) # TODO: doesn't work right. It finds a strange contours def is_circle(self): @@ -364,7 +364,6 @@ def is_circle(self): ellipse = cv2.fitEllipse(contour) (x, y), (major_axis, minor_axis), angle = ellipse return abs(major_axis - minor_axis) / max(major_axis, minor_axis) < 0.5 - # return abs(major_axis - minor_axis) / max(major_axis, minor_axis) < 0.01 def extract_circle_content(self, keep_center: bool) -> np.ndarray: center_x, center_y, radius = self.wheel @@ -405,11 +404,11 @@ def calculate_rotation_with(self, another_frame: stream.Frame) -> float: points2 = np.float64([keypoints2[m.trainIdx].pt for m in matches]) if len(points1) < 4: - raise Exception("Not enough matches to compute homography.") + raise Exception("Not enough matches to compute homography") matrix, _ = cv2.estimateAffinePartial2D(points1, points2) if matrix is None: - raise Exception("Homography matrix could not be computed.") + raise Exception("Homography matrix could not be computed") angle = np.arctan2(matrix[1, 0], matrix[0, 0]) * (180 / np.pi) another_frame._rotation_angle = float(angle if angle >= 0 else angle + 360) diff --git a/stream/reader.py b/stream/reader.py index be8ce50..af1359d 100644 --- a/stream/reader.py +++ b/stream/reader.py @@ -54,7 +54,7 @@ def read(self, sec: int) -> List[stream.Frame]: frame = self._stream.read() if frame is None: self._can_read = False - logger.error("No frame was read", extra={"frame": self._frame_index}) + logger.error("No frame read", extra={"frame": self._frame_index}) break frame_buffer.append(stream.Frame(frame, self._frame_index)) @@ -75,11 +75,11 @@ def read_until_spin_found(self, sec: int) -> List[stream.Frame]: if last_frame.is_spin_frame(): if len(buffer) < 3: raise Exception("Buffer has less than 3 frames") - # Take a frame before the previous one, to avoid a case when two serial frames are the same frames. + # Take the frame before the previous one to avoid the case when two consecutive frames are identical prev_frame = buffer[-3] try: diff = last_frame.calculate_rotation_with(prev_frame) - # else delta is so small then it's not spin + # Otherwise, if delta is too small, it's not considered a spin if min(diff, abs(diff - 360)) > 0.01: break @@ -91,10 +91,6 @@ def read_until_spin_found(self, sec: int) -> List[stream.Frame]: pass logger.warning(f"Frame on {last_frame.index / self._fps}s ({last_frame.index}f) is not a spin") - # Nasty optimization for all other wheels - # for frame in buffer: - # frame.force_set_wheel(last_frame.wheel) - return buffer def skip(self, sec: int) -> None: diff --git a/stream/segment.py b/stream/segment.py index ad32ed5..c1f0629 100644 --- a/stream/segment.py +++ b/stream/segment.py @@ -24,7 +24,7 @@ class Segment: def __init__(self, reader: stream.Reader): self._reader = reader self._first_spins_buffer: List[stream.Frame] = [] - self._init_frame: Optional[stream.Frame] = None + self._initial_frame: Optional[stream.Frame] = None self._circle_sectors: Optional[stream.CircleSectors] = None def _binary_search(self, segment: List[stream.Frame]) -> Optional[int]: @@ -33,7 +33,7 @@ def _binary_search(self, segment: List[stream.Frame]) -> Optional[int]: while low <= high: mid = (low + high) // 2 - if segment[mid].is_init_frame(): + if segment[mid].is_initial_frame(): if mid == len(segment) - 1 or segment[mid + 1].is_spin_frame(): return mid low = mid + 1 @@ -42,115 +42,89 @@ def _binary_search(self, segment: List[stream.Frame]) -> Optional[int]: return None - def _detect_length(self) -> int | List[int]: + def _detect_duration(self) -> int | List[int]: self._populate_first_spins() try: - # The first spin frame. - length = self._first_spins_buffer[0].detect_length() - logger.info(f"Length was detected via screen parsing: {length}") - return length + duration = self._first_spins_buffer[0].detect_duration() + logger.info(f"Duration detected via screen parsing: {duration}") + return duration except Exception as e: - logger.error(f"The length wasn't detected via screen parsing: {e}") + logger.error(f"Duration not detected via screen parsing: {e}") - if config.SPIN_DETECT_LENGTH: - try: - length = self._populate_first_n_spins(config.SPIN_BUFFER_SIZE_FOR_LENGTH_DETECTION) - logger.info(f"Length was detected via wheel spin: {length}") - return length - except Exception as e: - logger.error(f"The length wasn't detected via wheel spin: {e}") + try: + duration = self._populate_first_n_spins() + logger.info(f"Duration detected via wheel spin: {duration}") + return duration + except Exception as e: + logger.error(f"Duration not detected via wheel spin: {e}") - if config.ASK_LENGTH: + if config.PROMPT_FOR_DURATION: # Manual enter with open("/dev/tty", "r+") as tty: - tty.write("Cannot detect length. Manual enter: ") + tty.write("Cannot detect duration. Manual enter: ") tty.flush() response = tty.readline().strip() tty.write("") if response.isdigit(): - length = int(response) - logger.info(f"Length was set to {length}") + duration = int(response) + logger.info(f"Duration set to {duration}") - return length + return duration - raise Exception("cannot detect the length") + raise Exception("Cannot detect the duration") def _populate_first_spins(self) -> None: if len(self._first_spins_buffer) != 0: return buffer = self._reader.read_until_spin_found(config.READ_STEP) - logger.info("Spin with a lot name was found") + logger.info("Spin with the lot name found") - _init_frame_index = self._binary_search(buffer) - if _init_frame_index is None: - raise Exception("no init frame") - self._first_spins_buffer = buffer[_init_frame_index + 1 :] - self._init_frame = buffer[_init_frame_index] - logger.info("Init frame was found", extra={"frame": {self._init_frame.index}}) - if self._init_frame.is_circle(): - logger.error("Circle is ellipse? The result could be vary") + _initial_frame_index = self._binary_search(buffer) + if _initial_frame_index is None: + raise Exception("No initial frame found") + self._first_spins_buffer = buffer[_initial_frame_index + 1 :] + self._initial_frame = buffer[_initial_frame_index] + logger.info("Initial frame found", extra={"frame": {self._initial_frame.index}}) + if self._initial_frame.is_circle(): + logger.error("Circle is an ellipse. Result may vary") # TODO: why was it added? - self._init_frame.wheel[2] -= 1 - - # if config.NASTY_OPTIMIZATION: - # [frame.force_set_wheel(self._init_frame.wheel) for frame in self._first_spins_buffer] + self._initial_frame.wheel[2] -= 1 - def _populate_first_spins_with_length(self, length: int) -> None: + def _populate_first_spins_with_duration(self, duration: int) -> None: if len(self._first_spins_buffer) == 0: self._populate_first_spins() - min_range, max_range = utils.range(length) + min_range, max_range = utils.range(duration) first_wheel_frames = math.ceil( - utils.calculate_x(config.SPIN_BUFFER_SIZE * 360.0 / min_range) * self._reader.fps * length + utils.calculate_x(config.SPIN_BUFFER_SIZE * 360.0 / min_range) * self._reader.fps * duration ) sec = math.ceil(max(first_wheel_frames - len(self._first_spins_buffer), 0) / self._reader.fps) self._first_spins_buffer.extend(self._reader.read(sec)) - def _populate_first_n_spins(self, max_spins: int) -> int: - def _binary_search(part: List[stream.Frame]) -> Optional[int]: - low = 0 - high = len(part) - 1 - while low <= high: - mid = (low + high) // 2 - # if mid == len(part) - 1: - # return mid - if mid == 0: - return None - angle = self._init_frame.calculate_rotation_with(part[mid]) - prev_angle = self._init_frame.calculate_rotation_with(part[mid - 1]) - if angle < prev_angle and abs(360.0 + angle - prev_angle) % 360.0 < 60.0: - return mid - - if angle > self._init_frame.calculate_rotation_with(part[-1]): - low = mid + 1 - else: - high = mid - 1 - - return None - + def _populate_first_n_spins(self) -> int: def _find_new_spin_frame(part: List[stream.Frame], reverse: bool) -> int: iteration = range(1, len(part)) if reverse is False else range(len(part) - 1, 0, -1) for i in iteration: - angle = self._init_frame.calculate_rotation_with(part[i]) - prev_angle = self._init_frame.calculate_rotation_with(part[i - 1]) + angle = self._initial_frame.calculate_rotation_with(part[i]) + prev_angle = self._initial_frame.calculate_rotation_with(part[i - 1]) if angle < prev_angle and abs(360.0 + angle - prev_angle) % 360.0 < 60.0: return i - raise Exception("there is no new spin") + raise Exception("No new spins") - def _exclude_not_matched_second(buffer: List[stream.Frame], index: int, idx: int, seconds: List[int]): - part_behind = buffer[max(0, index - config.FRAMES_STEP_FOR_LENGTH_DETECTION) : index + 1] + def _exclude_not_matched_seconds(buffer: List[stream.Frame], index: int, idx: int, seconds: List[int]): + part_behind = buffer[max(0, index - config.DURATION_DETECTION_FRAME_STEP) : index + 1] new_spin_index = _find_new_spin_frame(part_behind, True) new_spin_idx = idx - (len(part_behind) - new_spin_index - 1) to_remove = [] for sec in seconds: if spins > round(sec * 270 / 360): - logger.error("there is no point to look further", extra={"sec": sec, "spins": spins}) + logger.error("No point to look further", extra={"sec": sec, "spins": spins}) continue min_range, max_range = utils.range(sec) @@ -174,7 +148,7 @@ def _exclude_not_matched_second(buffer: List[stream.Frame], index: int, idx: int idx = 0 spins = 0 prev_angle = 0.0 - starts, ends = config.EXCLUDE_SECONDS_RANGE + starts, ends = config.DURATION_DETECTION_TIME_RANGE seconds = np.arange(starts, ends + 1).tolist() for index, frame in enumerate(self._first_spins_buffer): @@ -182,43 +156,43 @@ def _exclude_not_matched_second(buffer: List[stream.Frame], index: int, idx: int self._first_spins_buffer.extend(self._reader.read(config.READ_STEP)) idx += 1 - if idx % config.FRAMES_STEP_FOR_LENGTH_DETECTION != 0: + if idx % config.DURATION_DETECTION_FRAME_STEP != 0: continue try: - if config.NASTY_OPTIMIZATION: - frame.force_set_wheel(self._init_frame.wheel) - angle = self._init_frame.calculate_rotation_with(frame) + if config.SPECULATIVE_OPTIMIZATION: + frame.force_set_wheel(self._initial_frame.wheel) + angle = self._initial_frame.calculate_rotation_with(frame) except Exception as e: - logger.error("cannot calculate angle", extra={"frame_id": idx, "e": e}) + logger.error("Cannot calculate angle", extra={"frame_id": idx, "e": e}) continue if angle < prev_angle: spins += 1 - _exclude_not_matched_second(self._first_spins_buffer, index, idx, seconds) - if spins > max_spins or idx > 600: + _exclude_not_matched_seconds(self._first_spins_buffer, index, idx, seconds) + if spins > config.DURATION_DETECTION_MAX_SPINS or idx > config.DURATION_DETECTION_MAX_FRAMES: break if len(seconds) == 1: - logger.info("Only one length candidate has left") + logger.info("One duration candidate left") break prev_angle = angle if len(seconds) == 0: - raise Exception(f"There are no length candidates") + raise Exception("No duration candidates") seconds.sort() return seconds - def _add_lot_names_around(self, angle: float, length: int) -> None: - min_range, max_range = utils.range(length) + def _add_lot_names_around(self, angle: float, duration: int) -> None: + min_range, max_range = utils.range(duration) angles = [angle + spins * 360.0 for spins in range(0, config.SPIN_BUFFER_SIZE)] task_queue = queue.Queue() for angle in angles: - start_frame_id = math.floor(utils.calculate_x_gsap(angle / max_range) * self._reader.fps * length) - end_frame_id = math.ceil(utils.calculate_x_gsap(angle / min_range) * self._reader.fps * length) + start_frame_id = math.floor(utils.calculate_x_gsap(angle / max_range) * self._reader.fps * duration) + end_frame_id = math.ceil(utils.calculate_x_gsap(angle / min_range) * self._reader.fps * duration) for frame_id in range(start_frame_id, end_frame_id): frame = self._first_spins_buffer[frame_id] @@ -228,7 +202,7 @@ def _worker(task_queue: queue.Queue) -> None: while not task_queue.empty(): frame = task_queue.get() lot_name = frame.detect_lot_name() - frame_angle = self._init_frame.calculate_rotation_with(frame) + frame_angle = self._initial_frame.calculate_rotation_with(frame) self._circle_sectors.add_lot_name(frame_angle, lot_name) task_queue.task_done() @@ -249,7 +223,7 @@ def _filter_anomalies_linear(self, predicted_angles: List[float], deviation: flo filtered.append(predicted_angles[j]) if len(filtered) == 0: - raise Exception(f"mean is nan. angle set: {predicted_angles}") + raise Exception(f"Mean is NaN. Angle set: {predicted_angles}") return float(np.mean(filtered)) @@ -259,12 +233,12 @@ def _format_lot_name(self, lot: tuple[float, float, str, float]) -> str: return f"{B}[{percent}%{synthetic_suffix}]{E} {G}{lot_name} ({lot_percent}%){E}" - def _calc_mean_angle(self, length: int, angles_window: List[(int, float)]) -> Optional[float]: + def _calc_mean_angle(self, duration: int, angles_window: List[(int, float)]) -> Optional[float]: predicted_angles: List[float] = [] for idx, angle in angles_window: - x = idx / (self._reader.fps * length) + x = idx / (self._reader.fps * duration) y = utils.calculate_y_gsap(x) - min_range, max_range = utils.range(length) + min_range, max_range = utils.range(duration) predicted_spins = math.ceil((y * min_range - angle) / 360) predicted_angle = ((angle + predicted_spins * 360) / y) % 360 predicted_angles.append(predicted_angle) @@ -273,45 +247,39 @@ def _calc_mean_angle(self, length: int, angles_window: List[(int, float)]) -> Op mean_angle = self._filter_anomalies_linear(predicted_angles, 2.5) return mean_angle except Exception as e: - logger.warning(f"[{length}s] fail to calculate the mean", extra={"e": e}) + logger.warning(f"[{duration}s] mean calculation failed", extra={"e": e}) return None def detect_winner(self): - length = self._detect_length() - min_length, max_length, length_candidates = length, length, [length] - if not isinstance(length, int): - min_length, max_length = min(length), max(length) - length_candidates = length - length_candidates = sorted(set(l for length in length_candidates for l in range(length - 1, length + 1))) - logger.info(f"Speculate the possible length: {length_candidates}") + duration = self._detect_duration() + min_duration, max_duration, duration_candidates = duration, duration, [duration] + if not isinstance(duration, int): + min_duration, max_duration = min(duration), max(duration) + duration_candidates = duration + duration_candidates = sorted(set(l for d in duration_candidates for l in range(d - 1, d + 1))) + logger.info(f"Speculate possible duration: {duration_candidates}") - self._populate_first_spins_with_length(max_length) + self._populate_first_spins_with_duration(max_duration) - sectors = self._init_frame.extract_sectors() + sectors = self._initial_frame.extract_sectors() self._circle_sectors = stream.CircleSectors(sectors) logger.info(f"Total lots: {len(sectors)}") idx = 0 - # min_range, max_range = utils.range(length) - max_read_frames = min_length * self._reader.fps + max_read_frames = min_duration * self._reader.fps max_skip_frames = max( - self._reader.fps * config.MIN_SKIP_OF_WHEEL_SPIN * max_length, self._reader.fps * config.MIN_SKIP_SEC + self._reader.fps * config.MIN_WHEEL_SPIN_SKIP_RATIO * max_duration, + self._reader.fps * config.MIN_SKIP_DURATION, ) skipped_frames = 0 buffer = self._first_spins_buffer - # predicted_angles = {length: [] for length in length_candidates} - # predicted_angles = {} angles_window: List[(int, float)] = [] prev_mean_angle = {} logger.info(f"Skipping {round(max_skip_frames / self._reader.fps, 2)}s") - # optimization - # self._reader.skip(math.ceil(max_skip_frames / self._reader.fps)) - # skipped_frames = max_skip_frames - while True: for frame in buffer: idx += 1 @@ -319,58 +287,50 @@ def detect_winner(self): skipped_frames += 1 continue - # Collect a window with angles. The size of window is config.ANGLE_WINDOW_LEN + # Collect the window of angles try: - if config.NASTY_OPTIMIZATION: - frame.force_set_wheel(self._init_frame.wheel) - angle = self._init_frame.calculate_rotation_with(frame) - angles_window.append( - ( - idx, - angle, - ) - ) + if config.SPECULATIVE_OPTIMIZATION: + frame.force_set_wheel(self._initial_frame.wheel) + angle = self._initial_frame.calculate_rotation_with(frame) + angles_window.append((idx, angle)) except Exception as e: - logger.error("cannot calculate angle", extra={"frame_id": idx, "e": e}) + logger.error("Cannot calculate angle", extra={"frame_id": idx, "e": e}) angles_window = [] max_skip_frames += config.CALCULATION_STEP continue - if len(angles_window) <= config.ANGLE_WINDOW_LEN: + if len(angles_window) <= config.ANGLE_WINDOW_SIZE: continue - # Examine each length candidate and drop - for length in length_candidates: - mean_angle = self._calc_mean_angle(length, angles_window) + # Examine each duration candidate separatly + for duration in duration_candidates: + mean_angle = self._calc_mean_angle(duration, angles_window) if mean_angle is None: continue - # Remove the candidate if it doesn't fit to the final angle - if length in prev_mean_angle: - diff = abs(mean_angle - prev_mean_angle[length]) - if min(diff, 360.0 - diff) > config.MAX_MEAN_ANGLE_DELTA: - length_candidates.remove(length) - logger.error( - f"Looks like the length {length}s was detected incorrectly. Remove it from the candidates." - ) - # if only one left, use it as the main length - if len(length_candidates) == 1: - max_read_frames = length_candidates[0] * self._reader.fps - logger.info(f"The length is: {length_candidates[0]}") - prev_mean_angle[length] = mean_angle - - if len(length_candidates) == 0: - raise Exception("no length candidates have left") - - # if only one length candidate has left use it as the main length - if len(length_candidates) == 1: - # dirty hack - if length_candidates[0] not in prev_mean_angle: + # Remove the candidate if it doesn't match to the target angle + if duration in prev_mean_angle: + diff = abs(mean_angle - prev_mean_angle[duration]) + if min(diff, 360.0 - diff) > config.MAX_ANGLE_DELTA: + duration_candidates.remove(duration) + logger.error(f"Duration {duration}s detected incorrectly. Remove it from the candidates") + if len(duration_candidates) == 1: + max_read_frames = duration_candidates[0] * self._reader.fps + logger.info(f"Duration is: {duration_candidates[0]}") + prev_mean_angle[duration] = mean_angle + + if len(duration_candidates) == 0: + raise Exception("No duration candidates left") + + # If only one duration candidate is left, use it as the main duration + if len(duration_candidates) == 1: + # hack + if duration_candidates[0] not in prev_mean_angle: continue - mean_angle = prev_mean_angle[length_candidates[0]] + mean_angle = prev_mean_angle[duration_candidates[0]] - self._add_lot_names_around(mean_angle, length) + self._add_lot_names_around(mean_angle, duration) self._circle_sectors.vote(mean_angle) most_voted = self._circle_sectors.most_voted() most_voted_formatted = "\n".join( @@ -381,7 +341,7 @@ def detect_winner(self): logger.info( f"Last voted: {self._format_lot_name(by_angle)}\nMost voted:\n{most_voted_formatted}", extra={ - "sec": f"{int(idx / self._reader.fps)} ({round(idx / (self._reader.fps * length) * 100, 2)}%)", + "sec": f"{int(idx / self._reader.fps)} ({round(idx / (self._reader.fps * duration) * 100, 2)}%)", "angle": f"{round(mean_angle, 2)}", }, ) diff --git a/tests/stream/test_length.py b/tests/stream/test_length.py index 5d39783..79a220d 100644 --- a/tests/stream/test_length.py +++ b/tests/stream/test_length.py @@ -24,7 +24,7 @@ def test_detect_length(self): image_path = os.path.join(os.path.dirname(__file__), "testdata/detect_length", obj["path"]) raw = cv2.imread(image_path, cv2.IMREAD_COLOR) frame = stream.Frame(raw, 0) - length = frame.detect_length() + length = frame.detect_duration() print(f"Length: {obj['length']}. Result: {length}") self.assertEqual(obj["length"], length) diff --git a/tests/stream/test_sectors.py b/tests/stream/test_sectors.py index b5dc00c..a4e3014 100644 --- a/tests/stream/test_sectors.py +++ b/tests/stream/test_sectors.py @@ -21,8 +21,6 @@ def test_extract_sectors(self): if "_comment" in obj: self.skipTest(obj["_comment"]) - # if obj['path'] != 'a1.png': - # self.skipTest('Looking for another') print(f"start:{obj['path']}") image_path = os.path.join(os.path.dirname(__file__), "testdata/extract_sectors", obj["path"]) raw = cv2.imread(image_path, cv2.IMREAD_COLOR) diff --git a/utils.sh b/utils.sh index a22d63b..fa10fea 100755 --- a/utils.sh +++ b/utils.sh @@ -265,7 +265,7 @@ run_vod() { streamlink --hls-start-offset "$offset" --hls-duration "$duration" --stdout "$link" best | python main.py winner } -[[ $# -lt 1 ]] && echo "No function was passed" && exit 1 +[[ $# -lt 1 ]] && echo "No function passed" && exit 1 fn="$1" shift "$fn" "$@" diff --git a/utils/spin.py b/utils/spin.py index 8495d53..fb945fb 100644 --- a/utils/spin.py +++ b/utils/spin.py @@ -1,12 +1,12 @@ -def range(length: int) -> tuple[float, float]: +def range(duration: int) -> tuple[float, float]: # https://github.com/Poíntauc/poíntauc_frontend/blob/310893f1b9e58068a9ece793a4b71a6cd11baea1/src/components/BaseWheel/BaseWheel.tsx#L147 - min_range = round(length * 270 / 360) * 360 + min_range = round(duration * 270 / 360) * 360 max_range = min_range + 360 return float(min_range), float(max_range) -def range_with_angle(angle: float, length: int) -> float: - min_range, _ = range(length) +def range_with_angle(angle: float, duration: int) -> float: + min_range, _ = range(duration) return min_range + angle diff --git a/utils/visualizer.py b/utils/visualizer.py index a7757c8..dcdfcc9 100644 --- a/utils/visualizer.py +++ b/utils/visualizer.py @@ -37,7 +37,7 @@ def draw_lines(frame: np.ndarray, lines: np.ndarray, title: str = "Lines") -> No elif len(line) == 4: x1, y1, x2, y2 = line else: - raise Exception("invalid format") + raise Exception("Invalid format") cv2.line(image, (x1, y1), (x2, y2), colors[i], 1) @@ -89,7 +89,7 @@ def _denormalize_angle(angle: float) -> float: deferred_calls = [] for i, sector in enumerate(sectors): start_angle, end_angle = sector - # Make a text before normalization + # Create the text before normalization text = f"{round(start_angle, 2)}-{round(end_angle, 2)}" denormalized_end_angle_deg = _denormalize_angle(start_angle) @@ -102,7 +102,7 @@ def _denormalize_angle(angle: float) -> float: y = int(radius + radius * 0.8 * np.sin(np.radians(mid_angle))) text_angle = -mid_angle if mid_angle <= 90 or mid_angle >= 270 else (180 - mid_angle) - # draw a conu part + # Draw the cone part cv2.ellipse( image, (radius, radius), @@ -114,7 +114,7 @@ def _denormalize_angle(angle: float) -> float: -1, cv2.LINE_AA, ) - # the text will be drawn latter + # The text will be drawn latter deferred_calls.append(partial(_draw_rotated_text, image, text, (x, y), text_angle)) [call() for call in deferred_calls]