Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 17 additions & 8 deletions inference/core/interfaces/camera/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import time
from copy import copy
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
Expand Down Expand Up @@ -151,21 +150,31 @@ def retrieve_frames_from_sources(
)
else:
batch_timeout_moment = None
for source_ord, (source, source_should_reconnect) in enumerate(
zip(self._video_sources.all_sources, self._video_sources.allow_reconnection)
):

all_sources = self._video_sources.all_sources
allow_reconnection = self._video_sources.allow_reconnection
total_sources = len(all_sources)

now = datetime.now if batch_timeout_moment is not None else None

for source_ord in range(total_sources):
if self._external_should_stop():
self.join_all_reconnection_threads(include_not_finished=True)
return None
if self._is_source_inactive(source_ord=source_ord):

# Inline _is_source_inactive for loop hot path
if (
source_ord in self._ended_sources
or source_ord in self._reconnection_threads
):
continue
batch_time_left = (
None
if batch_timeout_moment is None
else max((batch_timeout_moment - datetime.now()).total_seconds(), 0.0)
else max((batch_timeout_moment - now()).total_seconds(), 0.0)
)
try:
frame = source.read_frame(timeout=batch_time_left)
frame = all_sources[source_ord].read_frame(timeout=batch_time_left)
if frame is not None:
batch_frames.append(frame)
except EndOfStreamError:
Expand All @@ -178,7 +187,7 @@ def all_sources_ended(self) -> bool:
return len(self._ended_sources) >= len(self._video_sources.all_sources)

def join_all_reconnection_threads(self, include_not_finished: bool = False) -> None:
for source_ord in copy(self._threads_to_join):
for source_ord in set(self._threads_to_join):
self._purge_reconnection_thread(source_ord=source_ord)
if not include_not_finished:
return None
Expand Down
2 changes: 1 addition & 1 deletion inference/core/interfaces/camera/video_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,7 @@ def __init__(
stream_reference: VideoSourceIdentifier,
frames_buffer: Queue,
status_update_handlers: List[Callable[[StatusUpdate], None]],
buffer_consumption_strategy: Optional[BufferConsumptionStrategy],
buffer_consumption_strategy: Optional["BufferConsumptionStrategy"],
video_consumer: "VideoConsumer",
video_source_properties: Optional[Dict[str, float]],
source_id: Optional[int],
Expand Down
Loading