From 50b5d4a4a915bb2490da5183b1b038a578f5d58d Mon Sep 17 00:00:00 2001 From: Robo Loop Date: Mon, 17 Mar 2025 19:52:13 +0000 Subject: [PATCH 1/2] Upgraded README with details on how it works Rewroted log and exception messages --- README.md | 46 +++++++++++++++- config/__init__.py | 2 + config/config.py | 3 +- stream/frame.py | 54 +++++++++--------- stream/reader.py | 10 +--- stream/segment.py | 103 +++++++++++------------------------ tests/stream/test_sectors.py | 2 - utils/visualizer.py | 8 +-- 8 files changed, 115 insertions(+), 113 deletions(-) diff --git a/README.md b/README.md index 454d8e1..d61ad60 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,47 @@ Want to predict the result of a wheel spin before the winner is announced? Use t ![preview.gif](./preview.gif) +## How it works + +The solution is entirely built on collected heuristics and on standard human behavioral patterns. The implementation relies on screen parsing via OpenCV, text recognition via Tesseract OCR engine and basic math computations. + +The program workflow: + +1. Detecting the initial spin frame + + - The program processes frames sequentially until it finds one that meets the following criteria: + - The current frame contains a wheel (identified as a large circle) and has the text "Winner" above it. + - The next frame shows the wheel in motion (determined by comparing the angular difference between frames). + +2. Determining the total spin duration + + - The simplest approach is detecting a user-entered duration on the screen (by parsing an image and extracting a numerical value) + - If the duration box is unrecognized, hidden, or spin duration is randomized, a mathematical approach is applied. The program estimates the duration based on observed wheel movement. The wheel spin follows a custom easing function implemented with [GSAP](https://gsap.com/docs/v3/Eases/CustomEase/), which optimizes Bézier curves by generating interpolated `x` and `y` values for improved calculation accuracy. However, this implementation introduces slight deviations near the interpolated points. These deviations help eliminate unsuitable duration candidates, typically narrowing the range to a 1-4 second window. Post-processing further refines the estimate. This approximation is sufficient for our needs at this stage + - The fallback strategy: if all else fails, the program prompts the user for input + +3. Calculating the destination angle + + Picking a frame from the spinning sequence: + + - $x_{i} = \frac{l_{i}}{l}$, where `lᵢ` is the elapsed time, and `l` is the total spin duration + - $y_{i} = \frac{a_{i}}{a}$, where `aᵢ` is the elapsed angular displacement (including full rotations) from initial state, and `a` is the target angle + - $y_{i} = bezier(x_{i})$, where the `bezier` function maps the elapsed time to the corresponding elapsed angle, both normalized to a scale from 0 to 1 + - The target angle is computed as: $a = \frac{a_{i}}{bezier(\frac{l_{i}}{l})}$ + +4. Collecting lot names and their positions on the wheel + + - The initial frame is used to determine sector boundaries (collected in format start_angle and end_angle) + - The first wheel spin is analyzed to extract sector names (obtained from the text displayed above the wheel) + +5. Refining calculations for greater accuracy + + - Avoiding early-stage angle measurements due to high error margins + - Smoothing data by filtering out spikes, duplicates, and inconsistencies + - Discarding invalid duration candidates based on discrepancies between computed and expected target angles + - Converting the elliptical wheel projection to a circular model for more precise measurements (not yet implemented) + - Extending the spin analysis window to improve text recognition of lot names + - Implementing a voting system to enhance accuracy in determining the winning sector + ## Installation 1. Install the project: @@ -34,7 +75,7 @@ Want to predict the result of a wheel spin before the winner is announced? Use t ## Run -Run the program before the wheel spin starts: +Run the program before the wheel starts to spin: ```shell @@ -42,6 +83,9 @@ streamlink --twitch-low-latency --stdout best | python main.py wi # or you can use a shorthand ./utils run + +# or you can analyze your video snippet +cat vod.ts | python main.py winner ``` ## Lint diff --git a/config/__init__.py b/config/__init__.py index aff6b92..cc2ccea 100644 --- a/config/__init__.py +++ b/config/__init__.py @@ -12,6 +12,7 @@ READ_STEP, SPIN_BUFFER_SIZE, SPIN_BUFFER_SIZE_FOR_LENGTH_DETECTION, + SPIN_DETECTION_FRAMES_LIMIT, SPIN_DETECT_LENGTH, TESSERACT_LANG, VISUALIZATION_ENABLED, @@ -32,6 +33,7 @@ "NASTY_OPTIMIZATION", "SPIN_DETECT_LENGTH", "SPIN_BUFFER_SIZE_FOR_LENGTH_DETECTION", + "SPIN_DETECTION_FRAMES_LIMIT", "FRAMES_STEP_FOR_LENGTH_DETECTION", "LOOK_BEHIND_FRAMES_FOR_LENGTH_DETECTION", "EXCLUDE_SECONDS_RANGE", diff --git a/config/config.py b/config/config.py index b9c1448..5ed668c 100644 --- a/config/config.py +++ b/config/config.py @@ -2,7 +2,7 @@ ANGLE_WINDOW_LEN = 30 CALCULATION_STEP = ANGLE_WINDOW_LEN * 3 SPIN_BUFFER_SIZE = 3 -# The close to accurate is 1/4. The most accurate 1/3. +# The calculations are most accurate at 1/3 and fairly accurate at 1/4. MIN_SKIP_OF_WHEEL_SPIN = 1 / 3 MIN_SKIP_SEC = 10 MAX_MEAN_ANGLE_DELTA = 10.0 @@ -12,6 +12,7 @@ SPIN_DETECT_LENGTH = True SPIN_BUFFER_SIZE_FOR_LENGTH_DETECTION = 10 +SPIN_DETECTION_FRAMES_LIMIT = 600 FRAMES_STEP_FOR_LENGTH_DETECTION = 10 LOOK_BEHIND_FRAMES_FOR_LENGTH_DETECTION = 5 EXCLUDE_SECONDS_RANGE = (30, 180) diff --git a/stream/frame.py b/stream/frame.py index 3cd8218..cbc480f 100644 --- a/stream/frame.py +++ b/stream/frame.py @@ -71,7 +71,7 @@ def detect_wheel(self) -> np.ndarray: self._wheel = max(circles[0], key=lambda c: c[2]) return self._wheel - raise Exception("cannot detect wheel") + raise Exception("Cannot detect the wheel") def detect_lot_name(self) -> str: if self._lot_name is not None: @@ -90,7 +90,7 @@ def detect_lot_name(self) -> str: utils.visualize(text_roi, "text roi before preprocessing") text = pytesseract.image_to_string(thresh, lang=config.TESSERACT_LANG, config="--psm 6") - logger.info("Lot name was detected", extra={"text": text.strip(), "frame": {self._index}}) + logger.info("Lot name detected", extra={"text": text.strip(), "frame": {self._index}}) self._lot_name = text.strip() @@ -98,7 +98,7 @@ def detect_lot_name(self) -> str: def _find_length_section(self, block_roi: np.ndarray) -> np.ndarray: gray = cv2.cvtColor(block_roi, cv2.COLOR_BGR2GRAY) - # 50 and 100 — heuristics value. For someone's stream 5 and 10 should be used + # 50 and 100 are heuristics value. For some streams, 5 and 10 should be used instead candidates = [(50, 100), (5, 10)] for threshold1, threshold2 in candidates: edges = cv2.Canny(gray, threshold1, threshold2) @@ -107,11 +107,11 @@ def _find_length_section(self, block_roi: np.ndarray) -> np.ndarray: rectangles = [] for contour in contours: x, y, w, h = cv2.boundingRect(contour) - # 70 (50 for the range length) and 28 the min size of the target rectangle + # 70 (50 for the range length) and 28 are the minimal dimensions of the target rectangle if w >= 50 and h >= 28: rectangles.append((x, y, w, h)) - # drop all inner rectangles + # Drop all inner rectangles filtered_rectangles = [] for i, (x1, y1, w1, h1) in enumerate(rectangles): is_inner = False @@ -122,8 +122,8 @@ def _find_length_section(self, block_roi: np.ndarray) -> np.ndarray: if not is_inner: filtered_rectangles.append((x1, y1, w1, h1)) - # drop all rectangles that intersect others - # keep only with the biggest area. + # Drop all rectangles that intersect others + # Keep only the one with the largest area. filtered_rectangles = sorted(filtered_rectangles, key=lambda item: item[2] * item[3], reverse=True) final_rectangles = [] for rect in filtered_rectangles: @@ -134,20 +134,19 @@ def _find_length_section(self, block_roi: np.ndarray) -> np.ndarray: ): final_rectangles.append(rect) - # keep rectangles from the left to the right + # Keep rectangles from left to right final_rectangles = sorted(final_rectangles, key=lambda item: item[0]) - # if there are only 3 rectangles: Крутимся, От и До, then it was not successful + # If there are only 3 rectangles: Spin, From и To, then the operation was not successful if len(final_rectangles) == 3: - raise Exception("range length was detected") + raise Exception("Range length detected") - # if there are only 2 rectangles: Крутимся и Длительность, then it was successful + # If there are only 2 rectangles: Spinning и Duration, then the operation was successful if len(final_rectangles) == 2: - # x, y, w, h = max(final_rectangles, key=lambda item: item[1]) x, y, w, h = final_rectangles[1] return block_roi[y + 8 : y + h - 8, x + 5 : x + w - 5] - # if there is only one rectangle: Крутимся, then look to the left + # If there is one rectangle: Spinning, look to the left if len(final_rectangles) == 1: x, y, w, h = final_rectangles[0] if ( @@ -158,7 +157,7 @@ def _find_length_section(self, block_roi: np.ndarray) -> np.ndarray: ): return block_roi[y + 8 : y + h - 8, x + w + 20 : x + w + 20 + 50] - raise Exception("length roi wasn't found") + raise Exception("Length roi not found") def _detect_length(self, roi: np.ndarray) -> int: candidates = [] @@ -173,12 +172,12 @@ def _detect_length(self, roi: np.ndarray) -> int: candidates.append(int(matches[0])) if len(candidates) == 0: - raise Exception(f"no candidates") + raise Exception("No candidates") length, total = collections.Counter(candidates).most_common(1)[0] logger.info("Length candidates", extra={"candidates": candidates}) if total < 2: - raise Exception(f"not enough the candidates of {length}") + raise Exception(f"Not enough candidates of {length}") return length @@ -205,21 +204,21 @@ def _extract_raw_lines(self) -> np.ndarray: height, width = image.shape[:2] mask = np.ones((height, width), dtype=np.uint8) * 255 - # fill out the text on the wheel + # Fill out the text on the wheel gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY) _, thresh = cv2.threshold(gray, 215, 255, cv2.THRESH_BINARY) contours, _ = cv2.findContours(thresh, cv2.RETR_TREE, cv2.CHAIN_APPROX_SIMPLE) for i, contour in enumerate(contours): x, y, w, h = cv2.boundingRect(contour) - # filter out the contour that may contain text + # Filter out contours that may contain text if w + h < 60: cv2.drawContours(mask, contours, i, 0, thickness=cv2.FILLED) - # fill out the circle center on the wheel and around that center space to avoid collisions + # Fill in the circle center on the wheel and the surrounding space to avoid collisions _, _, radius = self._wheel cv2.circle(mask, (radius, radius), int(0.5 * radius), 0, thickness=cv2.FILLED) - # mask is ready, apply to the image to extact lines + # Mask is ready, apply it to the image to extract lines masked = cv2.bitwise_and(image, image, mask=mask) masked_gray = cv2.cvtColor(masked, cv2.COLOR_BGR2GRAY) _, masked_thresh = cv2.threshold(masked_gray, 215, 255, cv2.THRESH_BINARY) @@ -282,7 +281,7 @@ def _uniq_lines( uniq_lines_: List[tuple[np.ndarray, float, float, np.ndarray]] = [] for group in groups: - closest_line = min(group, key=lambda p: p[1]) # Line with the smallest distance + closest_line = min(group, key=lambda p: p[1]) # Choose the line with the smallest distance to the center a, b, c = closest_line d = _to_prolong_line(a) uniq_lines_.append((a, b, c, d)) @@ -311,7 +310,7 @@ def extract_sectors(self) -> List[tuple[float, float]]: start_angle = angles[i] end_angle = angles[(i + 1) % len(angles)] - # normalize angle relatively to North (counterclockwise) + # Normalize the angle relative to North (counterclockwise) normalized_start_angle = (3 * math.pi / 2 - start_angle) % (2 * math.pi) normalized_end_angle_deg = (3 * math.pi / 2 - end_angle) % (2 * math.pi) @@ -330,19 +329,19 @@ def is_init_frame(self) -> bool: try: text = self.detect_lot_name() except pytesseract.TesseractNotFoundError: - logger.error("Tesseract was not found") + logger.error("Tesseract not found") raise except Exception: return False - # search substring 'победитель' или 'winner' in the frame. This prevents some false parsing + # Searching for the substring 'winner' in the frame to prevent false parsing return any(substring in text.lower() for substring in self._init_text) def is_spin_frame(self) -> bool: try: text = self.detect_lot_name() except pytesseract.TesseractNotFoundError: - logger.error("Tesseract was not found") + logger.error("Tesseract not found") raise except Exception as e: return False @@ -364,7 +363,6 @@ def is_circle(self): ellipse = cv2.fitEllipse(contour) (x, y), (major_axis, minor_axis), angle = ellipse return abs(major_axis - minor_axis) / max(major_axis, minor_axis) < 0.5 - # return abs(major_axis - minor_axis) / max(major_axis, minor_axis) < 0.01 def extract_circle_content(self, keep_center: bool) -> np.ndarray: center_x, center_y, radius = self.wheel @@ -405,11 +403,11 @@ def calculate_rotation_with(self, another_frame: stream.Frame) -> float: points2 = np.float64([keypoints2[m.trainIdx].pt for m in matches]) if len(points1) < 4: - raise Exception("Not enough matches to compute homography.") + raise Exception("Not enough matches to compute homography") matrix, _ = cv2.estimateAffinePartial2D(points1, points2) if matrix is None: - raise Exception("Homography matrix could not be computed.") + raise Exception("Homography matrix could not be computed") angle = np.arctan2(matrix[1, 0], matrix[0, 0]) * (180 / np.pi) another_frame._rotation_angle = float(angle if angle >= 0 else angle + 360) diff --git a/stream/reader.py b/stream/reader.py index be8ce50..af1359d 100644 --- a/stream/reader.py +++ b/stream/reader.py @@ -54,7 +54,7 @@ def read(self, sec: int) -> List[stream.Frame]: frame = self._stream.read() if frame is None: self._can_read = False - logger.error("No frame was read", extra={"frame": self._frame_index}) + logger.error("No frame read", extra={"frame": self._frame_index}) break frame_buffer.append(stream.Frame(frame, self._frame_index)) @@ -75,11 +75,11 @@ def read_until_spin_found(self, sec: int) -> List[stream.Frame]: if last_frame.is_spin_frame(): if len(buffer) < 3: raise Exception("Buffer has less than 3 frames") - # Take a frame before the previous one, to avoid a case when two serial frames are the same frames. + # Take the frame before the previous one to avoid the case when two consecutive frames are identical prev_frame = buffer[-3] try: diff = last_frame.calculate_rotation_with(prev_frame) - # else delta is so small then it's not spin + # Otherwise, if delta is too small, it's not considered a spin if min(diff, abs(diff - 360)) > 0.01: break @@ -91,10 +91,6 @@ def read_until_spin_found(self, sec: int) -> List[stream.Frame]: pass logger.warning(f"Frame on {last_frame.index / self._fps}s ({last_frame.index}f) is not a spin") - # Nasty optimization for all other wheels - # for frame in buffer: - # frame.force_set_wheel(last_frame.wheel) - return buffer def skip(self, sec: int) -> None: diff --git a/stream/segment.py b/stream/segment.py index ad32ed5..62bb247 100644 --- a/stream/segment.py +++ b/stream/segment.py @@ -46,20 +46,19 @@ def _detect_length(self) -> int | List[int]: self._populate_first_spins() try: - # The first spin frame. length = self._first_spins_buffer[0].detect_length() - logger.info(f"Length was detected via screen parsing: {length}") + logger.info(f"Length detected via screen parsing: {length}") return length except Exception as e: - logger.error(f"The length wasn't detected via screen parsing: {e}") + logger.error(f"The length not detected via screen parsing: {e}") if config.SPIN_DETECT_LENGTH: try: - length = self._populate_first_n_spins(config.SPIN_BUFFER_SIZE_FOR_LENGTH_DETECTION) - logger.info(f"Length was detected via wheel spin: {length}") + length = self._populate_first_n_spins() + logger.info(f"Length detected via wheel spin: {length}") return length except Exception as e: - logger.error(f"The length wasn't detected via wheel spin: {e}") + logger.error(f"The length not detected via wheel spin: {e}") if config.ASK_LENGTH: # Manual enter @@ -71,33 +70,31 @@ def _detect_length(self) -> int | List[int]: if response.isdigit(): length = int(response) - logger.info(f"Length was set to {length}") + logger.info(f"Length set to {length}") return length - raise Exception("cannot detect the length") + raise Exception("Cannot detect the length") def _populate_first_spins(self) -> None: if len(self._first_spins_buffer) != 0: return buffer = self._reader.read_until_spin_found(config.READ_STEP) - logger.info("Spin with a lot name was found") + logger.info("Spin with the lot name found") _init_frame_index = self._binary_search(buffer) if _init_frame_index is None: - raise Exception("no init frame") + raise Exception("No inital frame") self._first_spins_buffer = buffer[_init_frame_index + 1 :] self._init_frame = buffer[_init_frame_index] - logger.info("Init frame was found", extra={"frame": {self._init_frame.index}}) + logger.info("Initial frame found", extra={"frame": {self._init_frame.index}}) if self._init_frame.is_circle(): - logger.error("Circle is ellipse? The result could be vary") + logger.error("Circle is an ellipse. Result may vary") # TODO: why was it added? self._init_frame.wheel[2] -= 1 - # if config.NASTY_OPTIMIZATION: - # [frame.force_set_wheel(self._init_frame.wheel) for frame in self._first_spins_buffer] def _populate_first_spins_with_length(self, length: int) -> None: if len(self._first_spins_buffer) == 0: @@ -110,28 +107,7 @@ def _populate_first_spins_with_length(self, length: int) -> None: sec = math.ceil(max(first_wheel_frames - len(self._first_spins_buffer), 0) / self._reader.fps) self._first_spins_buffer.extend(self._reader.read(sec)) - def _populate_first_n_spins(self, max_spins: int) -> int: - def _binary_search(part: List[stream.Frame]) -> Optional[int]: - low = 0 - high = len(part) - 1 - while low <= high: - mid = (low + high) // 2 - # if mid == len(part) - 1: - # return mid - if mid == 0: - return None - angle = self._init_frame.calculate_rotation_with(part[mid]) - prev_angle = self._init_frame.calculate_rotation_with(part[mid - 1]) - if angle < prev_angle and abs(360.0 + angle - prev_angle) % 360.0 < 60.0: - return mid - - if angle > self._init_frame.calculate_rotation_with(part[-1]): - low = mid + 1 - else: - high = mid - 1 - - return None - + def _populate_first_n_spins(self) -> int: def _find_new_spin_frame(part: List[stream.Frame], reverse: bool) -> int: iteration = range(1, len(part)) if reverse is False else range(len(part) - 1, 0, -1) for i in iteration: @@ -140,9 +116,9 @@ def _find_new_spin_frame(part: List[stream.Frame], reverse: bool) -> int: if angle < prev_angle and abs(360.0 + angle - prev_angle) % 360.0 < 60.0: return i - raise Exception("there is no new spin") + raise Exception("No new spins") - def _exclude_not_matched_second(buffer: List[stream.Frame], index: int, idx: int, seconds: List[int]): + def _exclude_not_matched_seconds(buffer: List[stream.Frame], index: int, idx: int, seconds: List[int]): part_behind = buffer[max(0, index - config.FRAMES_STEP_FOR_LENGTH_DETECTION) : index + 1] new_spin_index = _find_new_spin_frame(part_behind, True) new_spin_idx = idx - (len(part_behind) - new_spin_index - 1) @@ -150,7 +126,7 @@ def _exclude_not_matched_second(buffer: List[stream.Frame], index: int, idx: int to_remove = [] for sec in seconds: if spins > round(sec * 270 / 360): - logger.error("there is no point to look further", extra={"sec": sec, "spins": spins}) + logger.error("No point to look further", extra={"sec": sec, "spins": spins}) continue min_range, max_range = utils.range(sec) @@ -190,21 +166,21 @@ def _exclude_not_matched_second(buffer: List[stream.Frame], index: int, idx: int frame.force_set_wheel(self._init_frame.wheel) angle = self._init_frame.calculate_rotation_with(frame) except Exception as e: - logger.error("cannot calculate angle", extra={"frame_id": idx, "e": e}) + logger.error("Cannot calculate angle", extra={"frame_id": idx, "e": e}) continue if angle < prev_angle: spins += 1 - _exclude_not_matched_second(self._first_spins_buffer, index, idx, seconds) - if spins > max_spins or idx > 600: + _exclude_not_matched_seconds(self._first_spins_buffer, index, idx, seconds) + if spins > config.SPIN_BUFFER_SIZE_FOR_LENGTH_DETECTION or idx > config.SPIN_DETECTION_FRAMES_LIMIT: break if len(seconds) == 1: - logger.info("Only one length candidate has left") + logger.info("One length candidate left") break prev_angle = angle if len(seconds) == 0: - raise Exception(f"There are no length candidates") + raise Exception("No length candidates") seconds.sort() @@ -249,7 +225,7 @@ def _filter_anomalies_linear(self, predicted_angles: List[float], deviation: flo filtered.append(predicted_angles[j]) if len(filtered) == 0: - raise Exception(f"mean is nan. angle set: {predicted_angles}") + raise Exception(f"Mean is NaN. Angle set: {predicted_angles}") return float(np.mean(filtered)) @@ -273,7 +249,7 @@ def _calc_mean_angle(self, length: int, angles_window: List[(int, float)]) -> Op mean_angle = self._filter_anomalies_linear(predicted_angles, 2.5) return mean_angle except Exception as e: - logger.warning(f"[{length}s] fail to calculate the mean", extra={"e": e}) + logger.warning(f"[{length}s] mean calculation failed", extra={"e": e}) return None @@ -284,7 +260,7 @@ def detect_winner(self): min_length, max_length = min(length), max(length) length_candidates = length length_candidates = sorted(set(l for length in length_candidates for l in range(length - 1, length + 1))) - logger.info(f"Speculate the possible length: {length_candidates}") + logger.info(f"Speculate possible length: {length_candidates}") self._populate_first_spins_with_length(max_length) @@ -293,7 +269,6 @@ def detect_winner(self): logger.info(f"Total lots: {len(sectors)}") idx = 0 - # min_range, max_range = utils.range(length) max_read_frames = min_length * self._reader.fps max_skip_frames = max( self._reader.fps * config.MIN_SKIP_OF_WHEEL_SPIN * max_length, self._reader.fps * config.MIN_SKIP_SEC @@ -301,17 +276,11 @@ def detect_winner(self): skipped_frames = 0 buffer = self._first_spins_buffer - # predicted_angles = {length: [] for length in length_candidates} - # predicted_angles = {} angles_window: List[(int, float)] = [] prev_mean_angle = {} logger.info(f"Skipping {round(max_skip_frames / self._reader.fps, 2)}s") - # optimization - # self._reader.skip(math.ceil(max_skip_frames / self._reader.fps)) - # skipped_frames = max_skip_frames - while True: for frame in buffer: idx += 1 @@ -319,19 +288,14 @@ def detect_winner(self): skipped_frames += 1 continue - # Collect a window with angles. The size of window is config.ANGLE_WINDOW_LEN + # Collect the window of angles try: if config.NASTY_OPTIMIZATION: frame.force_set_wheel(self._init_frame.wheel) angle = self._init_frame.calculate_rotation_with(frame) - angles_window.append( - ( - idx, - angle, - ) - ) + angles_window.append((idx, angle)) except Exception as e: - logger.error("cannot calculate angle", extra={"frame_id": idx, "e": e}) + logger.error("Cannot calculate angle", extra={"frame_id": idx, "e": e}) angles_window = [] max_skip_frames += config.CALCULATION_STEP @@ -340,32 +304,31 @@ def detect_winner(self): if len(angles_window) <= config.ANGLE_WINDOW_LEN: continue - # Examine each length candidate and drop + # Examine each length candidate separatly for length in length_candidates: mean_angle = self._calc_mean_angle(length, angles_window) if mean_angle is None: continue - # Remove the candidate if it doesn't fit to the final angle + # Remove the candidate if it doesn't match to the target angle if length in prev_mean_angle: diff = abs(mean_angle - prev_mean_angle[length]) if min(diff, 360.0 - diff) > config.MAX_MEAN_ANGLE_DELTA: length_candidates.remove(length) logger.error( - f"Looks like the length {length}s was detected incorrectly. Remove it from the candidates." + f"Length {length}s detected incorrectly. Remove it from the candidates" ) - # if only one left, use it as the main length if len(length_candidates) == 1: max_read_frames = length_candidates[0] * self._reader.fps - logger.info(f"The length is: {length_candidates[0]}") + logger.info(f"Length is: {length_candidates[0]}") prev_mean_angle[length] = mean_angle if len(length_candidates) == 0: - raise Exception("no length candidates have left") + raise Exception("No length candidates left") - # if only one length candidate has left use it as the main length + # If only one length candidate is left, use it as the main length if len(length_candidates) == 1: - # dirty hack + # hack if length_candidates[0] not in prev_mean_angle: continue mean_angle = prev_mean_angle[length_candidates[0]] diff --git a/tests/stream/test_sectors.py b/tests/stream/test_sectors.py index b5dc00c..a4e3014 100644 --- a/tests/stream/test_sectors.py +++ b/tests/stream/test_sectors.py @@ -21,8 +21,6 @@ def test_extract_sectors(self): if "_comment" in obj: self.skipTest(obj["_comment"]) - # if obj['path'] != 'a1.png': - # self.skipTest('Looking for another') print(f"start:{obj['path']}") image_path = os.path.join(os.path.dirname(__file__), "testdata/extract_sectors", obj["path"]) raw = cv2.imread(image_path, cv2.IMREAD_COLOR) diff --git a/utils/visualizer.py b/utils/visualizer.py index a7757c8..dcdfcc9 100644 --- a/utils/visualizer.py +++ b/utils/visualizer.py @@ -37,7 +37,7 @@ def draw_lines(frame: np.ndarray, lines: np.ndarray, title: str = "Lines") -> No elif len(line) == 4: x1, y1, x2, y2 = line else: - raise Exception("invalid format") + raise Exception("Invalid format") cv2.line(image, (x1, y1), (x2, y2), colors[i], 1) @@ -89,7 +89,7 @@ def _denormalize_angle(angle: float) -> float: deferred_calls = [] for i, sector in enumerate(sectors): start_angle, end_angle = sector - # Make a text before normalization + # Create the text before normalization text = f"{round(start_angle, 2)}-{round(end_angle, 2)}" denormalized_end_angle_deg = _denormalize_angle(start_angle) @@ -102,7 +102,7 @@ def _denormalize_angle(angle: float) -> float: y = int(radius + radius * 0.8 * np.sin(np.radians(mid_angle))) text_angle = -mid_angle if mid_angle <= 90 or mid_angle >= 270 else (180 - mid_angle) - # draw a conu part + # Draw the cone part cv2.ellipse( image, (radius, radius), @@ -114,7 +114,7 @@ def _denormalize_angle(angle: float) -> float: -1, cv2.LINE_AA, ) - # the text will be drawn latter + # The text will be drawn latter deferred_calls.append(partial(_draw_rotated_text, image, text, (x, y), text_angle)) [call() for call in deferred_calls] From 732037acfbb861d7dffc280a6d7739fb8caa7d6a Mon Sep 17 00:00:00 2001 From: Robo Loop Date: Mon, 17 Mar 2025 21:00:06 +0000 Subject: [PATCH 2/2] Ubiquitous naming --- README.md | 6 +- config/__init__.py | 48 +++++----- config/config.py | 27 +++--- stream/frame.py | 41 +++++---- stream/segment.py | 179 ++++++++++++++++++------------------ tests/stream/test_length.py | 2 +- utils.sh | 2 +- utils/spin.py | 8 +- 8 files changed, 153 insertions(+), 160 deletions(-) diff --git a/README.md b/README.md index d61ad60..e8e501a 100644 --- a/README.md +++ b/README.md @@ -30,15 +30,15 @@ The program workflow: Picking a frame from the spinning sequence: - - $x_{i} = \frac{l_{i}}{l}$, where `lᵢ` is the elapsed time, and `l` is the total spin duration + - $x_{i} = \frac{d_{i}}{d}$, where `dᵢ` is the elapsed time, and `d` is the total spin duration - $y_{i} = \frac{a_{i}}{a}$, where `aᵢ` is the elapsed angular displacement (including full rotations) from initial state, and `a` is the target angle - $y_{i} = bezier(x_{i})$, where the `bezier` function maps the elapsed time to the corresponding elapsed angle, both normalized to a scale from 0 to 1 - - The target angle is computed as: $a = \frac{a_{i}}{bezier(\frac{l_{i}}{l})}$ + - The target angle is computed as: $a = \frac{a_{i}}{bezier(\frac{d_{i}}{d})}$ 4. Collecting lot names and their positions on the wheel - The initial frame is used to determine sector boundaries (collected in format start_angle and end_angle) - - The first wheel spin is analyzed to extract sector names (obtained from the text displayed above the wheel) + - The initial wheel spin is analyzed to extract sector names (obtained from the text displayed above the wheel) 5. Refining calculations for greater accuracy diff --git a/config/__init__.py b/config/__init__.py index cc2ccea..d91ddd2 100644 --- a/config/__init__.py +++ b/config/__init__.py @@ -1,20 +1,18 @@ from .config import ( - ANGLE_WINDOW_LEN, - ASK_LENGTH, + ANGLE_WINDOW_SIZE, + PROMPT_FOR_DURATION, CALCULATION_STEP, - EXCLUDE_SECONDS_RANGE, - FRAMES_STEP_FOR_LENGTH_DETECTION, - LOOK_BEHIND_FRAMES_FOR_LENGTH_DETECTION, - MAX_MEAN_ANGLE_DELTA, - MIN_SKIP_OF_WHEEL_SPIN, - MIN_SKIP_SEC, - NASTY_OPTIMIZATION, + DURATION_DETECTION_TIME_RANGE, + DURATION_DETECTION_FRAME_STEP, + MAX_ANGLE_DELTA, + MIN_WHEEL_SPIN_SKIP_RATIO, + MIN_SKIP_DURATION, + SPECULATIVE_OPTIMIZATION, READ_STEP, SPIN_BUFFER_SIZE, - SPIN_BUFFER_SIZE_FOR_LENGTH_DETECTION, - SPIN_DETECTION_FRAMES_LIMIT, - SPIN_DETECT_LENGTH, - TESSERACT_LANG, + DURATION_DETECTION_MAX_SPINS, + DURATION_DETECTION_MAX_FRAMES, + TESSERACT_LANGUAGE, VISUALIZATION_ENABLED, ) from .logger_config import add_global_event_time, setup_logger, update_global_event_time @@ -24,20 +22,18 @@ "update_global_event_time", "add_global_event_time", "READ_STEP", - "ANGLE_WINDOW_LEN", + "ANGLE_WINDOW_SIZE", "CALCULATION_STEP", "SPIN_BUFFER_SIZE", - "MIN_SKIP_OF_WHEEL_SPIN", - "MIN_SKIP_SEC", - "MAX_MEAN_ANGLE_DELTA", - "NASTY_OPTIMIZATION", - "SPIN_DETECT_LENGTH", - "SPIN_BUFFER_SIZE_FOR_LENGTH_DETECTION", - "SPIN_DETECTION_FRAMES_LIMIT", - "FRAMES_STEP_FOR_LENGTH_DETECTION", - "LOOK_BEHIND_FRAMES_FOR_LENGTH_DETECTION", - "EXCLUDE_SECONDS_RANGE", - "ASK_LENGTH", - "TESSERACT_LANG", + "MIN_WHEEL_SPIN_SKIP_RATIO", + "MIN_SKIP_DURATION", + "MAX_ANGLE_DELTA", + "SPECULATIVE_OPTIMIZATION", + "DURATION_DETECTION_MAX_SPINS", + "DURATION_DETECTION_MAX_FRAMES", + "DURATION_DETECTION_FRAME_STEP", + "DURATION_DETECTION_TIME_RANGE", + "PROMPT_FOR_DURATION", + "TESSERACT_LANGUAGE", "VISUALIZATION_ENABLED", ] diff --git a/config/config.py b/config/config.py index 5ed668c..608a0fa 100644 --- a/config/config.py +++ b/config/config.py @@ -1,22 +1,21 @@ READ_STEP = 2 -ANGLE_WINDOW_LEN = 30 -CALCULATION_STEP = ANGLE_WINDOW_LEN * 3 +ANGLE_WINDOW_SIZE = 30 +CALCULATION_STEP = ANGLE_WINDOW_SIZE * 3 SPIN_BUFFER_SIZE = 3 # The calculations are most accurate at 1/3 and fairly accurate at 1/4. -MIN_SKIP_OF_WHEEL_SPIN = 1 / 3 -MIN_SKIP_SEC = 10 -MAX_MEAN_ANGLE_DELTA = 10.0 -TESSERACT_LANG = "rus+eng" # "eng" +MIN_WHEEL_SPIN_SKIP_RATIO = 1 / 3 +MIN_SKIP_DURATION = 10 +MAX_ANGLE_DELTA = 10.0 -NASTY_OPTIMIZATION = True +TESSERACT_LANGUAGE = "rus+eng" # "eng" -SPIN_DETECT_LENGTH = True -SPIN_BUFFER_SIZE_FOR_LENGTH_DETECTION = 10 -SPIN_DETECTION_FRAMES_LIMIT = 600 -FRAMES_STEP_FOR_LENGTH_DETECTION = 10 -LOOK_BEHIND_FRAMES_FOR_LENGTH_DETECTION = 5 -EXCLUDE_SECONDS_RANGE = (30, 180) -ASK_LENGTH = True +SPECULATIVE_OPTIMIZATION = True +DURATION_DETECTION_MAX_SPINS = 10 +DURATION_DETECTION_MAX_FRAMES = 600 +DURATION_DETECTION_FRAME_STEP = 10 +DURATION_DETECTION_TIME_RANGE = (30, 180) + +PROMPT_FOR_DURATION = True VISUALIZATION_ENABLED = False diff --git a/stream/frame.py b/stream/frame.py index cbc480f..3347ce6 100644 --- a/stream/frame.py +++ b/stream/frame.py @@ -24,7 +24,7 @@ class Frame: _wheel: Optional[np.ndarray] _lot_name: Optional[str] _rotation_angle: Optional[float] - _init_text = ["победитель", "winner"] + _initial_text = ["winner", "победитель"] def __init__(self, frame: np.ndarray, index: int): self._frame = frame @@ -89,14 +89,14 @@ def detect_lot_name(self) -> str: utils.visualize(text_roi, "text roi before preprocessing") - text = pytesseract.image_to_string(thresh, lang=config.TESSERACT_LANG, config="--psm 6") + text = pytesseract.image_to_string(thresh, lang=config.TESSERACT_LANGUAGE, config="--psm 6") logger.info("Lot name detected", extra={"text": text.strip(), "frame": {self._index}}) self._lot_name = text.strip() return self._lot_name - def _find_length_section(self, block_roi: np.ndarray) -> np.ndarray: + def _find_duration_block(self, block_roi: np.ndarray) -> np.ndarray: gray = cv2.cvtColor(block_roi, cv2.COLOR_BGR2GRAY) # 50 and 100 are heuristics value. For some streams, 5 and 10 should be used instead candidates = [(50, 100), (5, 10)] @@ -139,7 +139,7 @@ def _find_length_section(self, block_roi: np.ndarray) -> np.ndarray: # If there are only 3 rectangles: Spin, From и To, then the operation was not successful if len(final_rectangles) == 3: - raise Exception("Range length detected") + raise Exception("Range duration detected") # If there are only 2 rectangles: Spinning и Duration, then the operation was successful if len(final_rectangles) == 2: @@ -157,9 +157,9 @@ def _find_length_section(self, block_roi: np.ndarray) -> np.ndarray: ): return block_roi[y + 8 : y + h - 8, x + w + 20 : x + w + 20 + 50] - raise Exception("Length roi not found") + raise Exception("Duration roi not found") - def _detect_length(self, roi: np.ndarray) -> int: + def _detect_duration(self, roi: np.ndarray) -> int: candidates = [] for threshold in range(170, 140, -5): _, thresh = cv2.threshold(roi, threshold, 255, cv2.THRESH_BINARY) @@ -174,14 +174,14 @@ def _detect_length(self, roi: np.ndarray) -> int: if len(candidates) == 0: raise Exception("No candidates") - length, total = collections.Counter(candidates).most_common(1)[0] - logger.info("Length candidates", extra={"candidates": candidates}) + duration, total = collections.Counter(candidates).most_common(1)[0] + logger.info("Duration candidates", extra={"candidates": candidates}) if total < 2: - raise Exception(f"Not enough candidates of {length}") + raise Exception(f"Not enough candidates of {duration}") - return length + return duration - def detect_length(self) -> int: + def detect_duration(self) -> int: circle = self.detect_wheel() center_x, center_y, radius = circle @@ -191,13 +191,13 @@ def detect_length(self) -> int: roi_x_end = min(self._frame.shape[1], center_x + radius + radius) block_roi = self._frame[roi_y_start:roi_y_end, roi_x_start:roi_x_end] - utils.visualize(block_roi, "Length section") - rect = self._find_length_section(block_roi) - utils.visualize(rect, "Cropped length section") + utils.visualize(block_roi, "Duration section") + rect = self._find_duration_block(block_roi) + utils.visualize(rect, "Cropped duration section") - length = self._detect_length(rect) + duration = self._detect_duration(rect) - return length + return duration def _extract_raw_lines(self) -> np.ndarray: image = self.extract_circle_content(True) @@ -281,7 +281,8 @@ def _uniq_lines( uniq_lines_: List[tuple[np.ndarray, float, float, np.ndarray]] = [] for group in groups: - closest_line = min(group, key=lambda p: p[1]) # Choose the line with the smallest distance to the center + # Choose the line with the smallest distance to the center + closest_line = min(group, key=lambda p: p[1]) a, b, c = closest_line d = _to_prolong_line(a) uniq_lines_.append((a, b, c, d)) @@ -325,7 +326,7 @@ def extract_sectors(self) -> List[tuple[float, float]]: return sectors - def is_init_frame(self) -> bool: + def is_initial_frame(self) -> bool: try: text = self.detect_lot_name() except pytesseract.TesseractNotFoundError: @@ -335,7 +336,7 @@ def is_init_frame(self) -> bool: return False # Searching for the substring 'winner' in the frame to prevent false parsing - return any(substring in text.lower() for substring in self._init_text) + return any(substring in text.lower() for substring in self._initial_text) def is_spin_frame(self) -> bool: try: @@ -346,7 +347,7 @@ def is_spin_frame(self) -> bool: except Exception as e: return False - return not any(substring in text.lower() for substring in self._init_text) + return not any(substring in text.lower() for substring in self._initial_text) # TODO: doesn't work right. It finds a strange contours def is_circle(self): diff --git a/stream/segment.py b/stream/segment.py index 62bb247..c1f0629 100644 --- a/stream/segment.py +++ b/stream/segment.py @@ -24,7 +24,7 @@ class Segment: def __init__(self, reader: stream.Reader): self._reader = reader self._first_spins_buffer: List[stream.Frame] = [] - self._init_frame: Optional[stream.Frame] = None + self._initial_frame: Optional[stream.Frame] = None self._circle_sectors: Optional[stream.CircleSectors] = None def _binary_search(self, segment: List[stream.Frame]) -> Optional[int]: @@ -33,7 +33,7 @@ def _binary_search(self, segment: List[stream.Frame]) -> Optional[int]: while low <= high: mid = (low + high) // 2 - if segment[mid].is_init_frame(): + if segment[mid].is_initial_frame(): if mid == len(segment) - 1 or segment[mid + 1].is_spin_frame(): return mid low = mid + 1 @@ -42,39 +42,38 @@ def _binary_search(self, segment: List[stream.Frame]) -> Optional[int]: return None - def _detect_length(self) -> int | List[int]: + def _detect_duration(self) -> int | List[int]: self._populate_first_spins() try: - length = self._first_spins_buffer[0].detect_length() - logger.info(f"Length detected via screen parsing: {length}") - return length + duration = self._first_spins_buffer[0].detect_duration() + logger.info(f"Duration detected via screen parsing: {duration}") + return duration except Exception as e: - logger.error(f"The length not detected via screen parsing: {e}") + logger.error(f"Duration not detected via screen parsing: {e}") - if config.SPIN_DETECT_LENGTH: - try: - length = self._populate_first_n_spins() - logger.info(f"Length detected via wheel spin: {length}") - return length - except Exception as e: - logger.error(f"The length not detected via wheel spin: {e}") + try: + duration = self._populate_first_n_spins() + logger.info(f"Duration detected via wheel spin: {duration}") + return duration + except Exception as e: + logger.error(f"Duration not detected via wheel spin: {e}") - if config.ASK_LENGTH: + if config.PROMPT_FOR_DURATION: # Manual enter with open("/dev/tty", "r+") as tty: - tty.write("Cannot detect length. Manual enter: ") + tty.write("Cannot detect duration. Manual enter: ") tty.flush() response = tty.readline().strip() tty.write("") if response.isdigit(): - length = int(response) - logger.info(f"Length set to {length}") + duration = int(response) + logger.info(f"Duration set to {duration}") - return length + return duration - raise Exception("Cannot detect the length") + raise Exception("Cannot detect the duration") def _populate_first_spins(self) -> None: if len(self._first_spins_buffer) != 0: @@ -83,26 +82,25 @@ def _populate_first_spins(self) -> None: buffer = self._reader.read_until_spin_found(config.READ_STEP) logger.info("Spin with the lot name found") - _init_frame_index = self._binary_search(buffer) - if _init_frame_index is None: - raise Exception("No inital frame") - self._first_spins_buffer = buffer[_init_frame_index + 1 :] - self._init_frame = buffer[_init_frame_index] - logger.info("Initial frame found", extra={"frame": {self._init_frame.index}}) - if self._init_frame.is_circle(): + _initial_frame_index = self._binary_search(buffer) + if _initial_frame_index is None: + raise Exception("No initial frame found") + self._first_spins_buffer = buffer[_initial_frame_index + 1 :] + self._initial_frame = buffer[_initial_frame_index] + logger.info("Initial frame found", extra={"frame": {self._initial_frame.index}}) + if self._initial_frame.is_circle(): logger.error("Circle is an ellipse. Result may vary") # TODO: why was it added? - self._init_frame.wheel[2] -= 1 - + self._initial_frame.wheel[2] -= 1 - def _populate_first_spins_with_length(self, length: int) -> None: + def _populate_first_spins_with_duration(self, duration: int) -> None: if len(self._first_spins_buffer) == 0: self._populate_first_spins() - min_range, max_range = utils.range(length) + min_range, max_range = utils.range(duration) first_wheel_frames = math.ceil( - utils.calculate_x(config.SPIN_BUFFER_SIZE * 360.0 / min_range) * self._reader.fps * length + utils.calculate_x(config.SPIN_BUFFER_SIZE * 360.0 / min_range) * self._reader.fps * duration ) sec = math.ceil(max(first_wheel_frames - len(self._first_spins_buffer), 0) / self._reader.fps) self._first_spins_buffer.extend(self._reader.read(sec)) @@ -111,15 +109,15 @@ def _populate_first_n_spins(self) -> int: def _find_new_spin_frame(part: List[stream.Frame], reverse: bool) -> int: iteration = range(1, len(part)) if reverse is False else range(len(part) - 1, 0, -1) for i in iteration: - angle = self._init_frame.calculate_rotation_with(part[i]) - prev_angle = self._init_frame.calculate_rotation_with(part[i - 1]) + angle = self._initial_frame.calculate_rotation_with(part[i]) + prev_angle = self._initial_frame.calculate_rotation_with(part[i - 1]) if angle < prev_angle and abs(360.0 + angle - prev_angle) % 360.0 < 60.0: return i raise Exception("No new spins") def _exclude_not_matched_seconds(buffer: List[stream.Frame], index: int, idx: int, seconds: List[int]): - part_behind = buffer[max(0, index - config.FRAMES_STEP_FOR_LENGTH_DETECTION) : index + 1] + part_behind = buffer[max(0, index - config.DURATION_DETECTION_FRAME_STEP) : index + 1] new_spin_index = _find_new_spin_frame(part_behind, True) new_spin_idx = idx - (len(part_behind) - new_spin_index - 1) @@ -150,7 +148,7 @@ def _exclude_not_matched_seconds(buffer: List[stream.Frame], index: int, idx: in idx = 0 spins = 0 prev_angle = 0.0 - starts, ends = config.EXCLUDE_SECONDS_RANGE + starts, ends = config.DURATION_DETECTION_TIME_RANGE seconds = np.arange(starts, ends + 1).tolist() for index, frame in enumerate(self._first_spins_buffer): @@ -158,13 +156,13 @@ def _exclude_not_matched_seconds(buffer: List[stream.Frame], index: int, idx: in self._first_spins_buffer.extend(self._reader.read(config.READ_STEP)) idx += 1 - if idx % config.FRAMES_STEP_FOR_LENGTH_DETECTION != 0: + if idx % config.DURATION_DETECTION_FRAME_STEP != 0: continue try: - if config.NASTY_OPTIMIZATION: - frame.force_set_wheel(self._init_frame.wheel) - angle = self._init_frame.calculate_rotation_with(frame) + if config.SPECULATIVE_OPTIMIZATION: + frame.force_set_wheel(self._initial_frame.wheel) + angle = self._initial_frame.calculate_rotation_with(frame) except Exception as e: logger.error("Cannot calculate angle", extra={"frame_id": idx, "e": e}) continue @@ -172,29 +170,29 @@ def _exclude_not_matched_seconds(buffer: List[stream.Frame], index: int, idx: in if angle < prev_angle: spins += 1 _exclude_not_matched_seconds(self._first_spins_buffer, index, idx, seconds) - if spins > config.SPIN_BUFFER_SIZE_FOR_LENGTH_DETECTION or idx > config.SPIN_DETECTION_FRAMES_LIMIT: + if spins > config.DURATION_DETECTION_MAX_SPINS or idx > config.DURATION_DETECTION_MAX_FRAMES: break if len(seconds) == 1: - logger.info("One length candidate left") + logger.info("One duration candidate left") break prev_angle = angle if len(seconds) == 0: - raise Exception("No length candidates") + raise Exception("No duration candidates") seconds.sort() return seconds - def _add_lot_names_around(self, angle: float, length: int) -> None: - min_range, max_range = utils.range(length) + def _add_lot_names_around(self, angle: float, duration: int) -> None: + min_range, max_range = utils.range(duration) angles = [angle + spins * 360.0 for spins in range(0, config.SPIN_BUFFER_SIZE)] task_queue = queue.Queue() for angle in angles: - start_frame_id = math.floor(utils.calculate_x_gsap(angle / max_range) * self._reader.fps * length) - end_frame_id = math.ceil(utils.calculate_x_gsap(angle / min_range) * self._reader.fps * length) + start_frame_id = math.floor(utils.calculate_x_gsap(angle / max_range) * self._reader.fps * duration) + end_frame_id = math.ceil(utils.calculate_x_gsap(angle / min_range) * self._reader.fps * duration) for frame_id in range(start_frame_id, end_frame_id): frame = self._first_spins_buffer[frame_id] @@ -204,7 +202,7 @@ def _worker(task_queue: queue.Queue) -> None: while not task_queue.empty(): frame = task_queue.get() lot_name = frame.detect_lot_name() - frame_angle = self._init_frame.calculate_rotation_with(frame) + frame_angle = self._initial_frame.calculate_rotation_with(frame) self._circle_sectors.add_lot_name(frame_angle, lot_name) task_queue.task_done() @@ -235,12 +233,12 @@ def _format_lot_name(self, lot: tuple[float, float, str, float]) -> str: return f"{B}[{percent}%{synthetic_suffix}]{E} {G}{lot_name} ({lot_percent}%){E}" - def _calc_mean_angle(self, length: int, angles_window: List[(int, float)]) -> Optional[float]: + def _calc_mean_angle(self, duration: int, angles_window: List[(int, float)]) -> Optional[float]: predicted_angles: List[float] = [] for idx, angle in angles_window: - x = idx / (self._reader.fps * length) + x = idx / (self._reader.fps * duration) y = utils.calculate_y_gsap(x) - min_range, max_range = utils.range(length) + min_range, max_range = utils.range(duration) predicted_spins = math.ceil((y * min_range - angle) / 360) predicted_angle = ((angle + predicted_spins * 360) / y) % 360 predicted_angles.append(predicted_angle) @@ -249,29 +247,30 @@ def _calc_mean_angle(self, length: int, angles_window: List[(int, float)]) -> Op mean_angle = self._filter_anomalies_linear(predicted_angles, 2.5) return mean_angle except Exception as e: - logger.warning(f"[{length}s] mean calculation failed", extra={"e": e}) + logger.warning(f"[{duration}s] mean calculation failed", extra={"e": e}) return None def detect_winner(self): - length = self._detect_length() - min_length, max_length, length_candidates = length, length, [length] - if not isinstance(length, int): - min_length, max_length = min(length), max(length) - length_candidates = length - length_candidates = sorted(set(l for length in length_candidates for l in range(length - 1, length + 1))) - logger.info(f"Speculate possible length: {length_candidates}") + duration = self._detect_duration() + min_duration, max_duration, duration_candidates = duration, duration, [duration] + if not isinstance(duration, int): + min_duration, max_duration = min(duration), max(duration) + duration_candidates = duration + duration_candidates = sorted(set(l for d in duration_candidates for l in range(d - 1, d + 1))) + logger.info(f"Speculate possible duration: {duration_candidates}") - self._populate_first_spins_with_length(max_length) + self._populate_first_spins_with_duration(max_duration) - sectors = self._init_frame.extract_sectors() + sectors = self._initial_frame.extract_sectors() self._circle_sectors = stream.CircleSectors(sectors) logger.info(f"Total lots: {len(sectors)}") idx = 0 - max_read_frames = min_length * self._reader.fps + max_read_frames = min_duration * self._reader.fps max_skip_frames = max( - self._reader.fps * config.MIN_SKIP_OF_WHEEL_SPIN * max_length, self._reader.fps * config.MIN_SKIP_SEC + self._reader.fps * config.MIN_WHEEL_SPIN_SKIP_RATIO * max_duration, + self._reader.fps * config.MIN_SKIP_DURATION, ) skipped_frames = 0 @@ -290,9 +289,9 @@ def detect_winner(self): # Collect the window of angles try: - if config.NASTY_OPTIMIZATION: - frame.force_set_wheel(self._init_frame.wheel) - angle = self._init_frame.calculate_rotation_with(frame) + if config.SPECULATIVE_OPTIMIZATION: + frame.force_set_wheel(self._initial_frame.wheel) + angle = self._initial_frame.calculate_rotation_with(frame) angles_window.append((idx, angle)) except Exception as e: logger.error("Cannot calculate angle", extra={"frame_id": idx, "e": e}) @@ -301,39 +300,37 @@ def detect_winner(self): max_skip_frames += config.CALCULATION_STEP continue - if len(angles_window) <= config.ANGLE_WINDOW_LEN: + if len(angles_window) <= config.ANGLE_WINDOW_SIZE: continue - # Examine each length candidate separatly - for length in length_candidates: - mean_angle = self._calc_mean_angle(length, angles_window) + # Examine each duration candidate separatly + for duration in duration_candidates: + mean_angle = self._calc_mean_angle(duration, angles_window) if mean_angle is None: continue # Remove the candidate if it doesn't match to the target angle - if length in prev_mean_angle: - diff = abs(mean_angle - prev_mean_angle[length]) - if min(diff, 360.0 - diff) > config.MAX_MEAN_ANGLE_DELTA: - length_candidates.remove(length) - logger.error( - f"Length {length}s detected incorrectly. Remove it from the candidates" - ) - if len(length_candidates) == 1: - max_read_frames = length_candidates[0] * self._reader.fps - logger.info(f"Length is: {length_candidates[0]}") - prev_mean_angle[length] = mean_angle - - if len(length_candidates) == 0: - raise Exception("No length candidates left") - - # If only one length candidate is left, use it as the main length - if len(length_candidates) == 1: + if duration in prev_mean_angle: + diff = abs(mean_angle - prev_mean_angle[duration]) + if min(diff, 360.0 - diff) > config.MAX_ANGLE_DELTA: + duration_candidates.remove(duration) + logger.error(f"Duration {duration}s detected incorrectly. Remove it from the candidates") + if len(duration_candidates) == 1: + max_read_frames = duration_candidates[0] * self._reader.fps + logger.info(f"Duration is: {duration_candidates[0]}") + prev_mean_angle[duration] = mean_angle + + if len(duration_candidates) == 0: + raise Exception("No duration candidates left") + + # If only one duration candidate is left, use it as the main duration + if len(duration_candidates) == 1: # hack - if length_candidates[0] not in prev_mean_angle: + if duration_candidates[0] not in prev_mean_angle: continue - mean_angle = prev_mean_angle[length_candidates[0]] + mean_angle = prev_mean_angle[duration_candidates[0]] - self._add_lot_names_around(mean_angle, length) + self._add_lot_names_around(mean_angle, duration) self._circle_sectors.vote(mean_angle) most_voted = self._circle_sectors.most_voted() most_voted_formatted = "\n".join( @@ -344,7 +341,7 @@ def detect_winner(self): logger.info( f"Last voted: {self._format_lot_name(by_angle)}\nMost voted:\n{most_voted_formatted}", extra={ - "sec": f"{int(idx / self._reader.fps)} ({round(idx / (self._reader.fps * length) * 100, 2)}%)", + "sec": f"{int(idx / self._reader.fps)} ({round(idx / (self._reader.fps * duration) * 100, 2)}%)", "angle": f"{round(mean_angle, 2)}", }, ) diff --git a/tests/stream/test_length.py b/tests/stream/test_length.py index 5d39783..79a220d 100644 --- a/tests/stream/test_length.py +++ b/tests/stream/test_length.py @@ -24,7 +24,7 @@ def test_detect_length(self): image_path = os.path.join(os.path.dirname(__file__), "testdata/detect_length", obj["path"]) raw = cv2.imread(image_path, cv2.IMREAD_COLOR) frame = stream.Frame(raw, 0) - length = frame.detect_length() + length = frame.detect_duration() print(f"Length: {obj['length']}. Result: {length}") self.assertEqual(obj["length"], length) diff --git a/utils.sh b/utils.sh index a22d63b..fa10fea 100755 --- a/utils.sh +++ b/utils.sh @@ -265,7 +265,7 @@ run_vod() { streamlink --hls-start-offset "$offset" --hls-duration "$duration" --stdout "$link" best | python main.py winner } -[[ $# -lt 1 ]] && echo "No function was passed" && exit 1 +[[ $# -lt 1 ]] && echo "No function passed" && exit 1 fn="$1" shift "$fn" "$@" diff --git a/utils/spin.py b/utils/spin.py index 8495d53..fb945fb 100644 --- a/utils/spin.py +++ b/utils/spin.py @@ -1,12 +1,12 @@ -def range(length: int) -> tuple[float, float]: +def range(duration: int) -> tuple[float, float]: # https://github.com/Poíntauc/poíntauc_frontend/blob/310893f1b9e58068a9ece793a4b71a6cd11baea1/src/components/BaseWheel/BaseWheel.tsx#L147 - min_range = round(length * 270 / 360) * 360 + min_range = round(duration * 270 / 360) * 360 max_range = min_range + 360 return float(min_range), float(max_range) -def range_with_angle(angle: float, length: int) -> float: - min_range, _ = range(length) +def range_with_angle(angle: float, duration: int) -> float: + min_range, _ = range(duration) return min_range + angle