Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 43 additions & 3 deletions aiopppp/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,23 +64,34 @@ class VideoQueueMixin:
def __init__(self, *args, **kwargs):
super().__init__()
self.video_chunk_queue = asyncio.Queue()
self.audio_chunk_queue = asyncio.Queue()
self.frame_buffer = SharedFrameBuffer()
self.process_video_task = None
self.process_audio_task = None
self.last_drw_pkt_idx = 0
self.video_epoch = 0 # number of overflows over 0xffff DRW index

self.video_received = {}
self.video_boundaries = set()
self.last_video_frame = -1
self.last_audio_frame = -1

async def process_video_queue(self):
while True:
pkt_epoch, pkt = await self.video_chunk_queue.get()
await self.handle_incoming_video_packet(pkt_epoch, pkt)

async def process_audio_queue(self):
while True:
pkt_epoch, pkt = await self.audio_chunk_queue.get()
await self.handle_incoming_audio_packet(pkt_epoch, pkt)

def start_video_queue(self):
self.process_video_task = asyncio.create_task(self.process_video_queue())

def start_audio_queue(self):
self.process_audio_task = asyncio.create_task(self.process_audio_queue())

async def handle_incoming_video_packet(self, pkt_epoch, pkt):
video_payload = pkt.get_drw_payload()
# logger.info(f'- video frame {pkt._cmd_idx}')
Expand All @@ -96,6 +107,18 @@ async def handle_incoming_video_packet(self, pkt_epoch, pkt):
self.video_received[video_chunk_idx] = video_payload
await self.process_video_frame()

async def handle_incoming_audio_packet(self, pkt_epoch, pkt):
audio_payload = pkt.get_drw_payload()
audio_marker = b'\x55\xaa\x15\xa8'

if audio_payload.startswith(audio_marker):
actual_payload = audio_payload[0x20:]
else:
actual_payload = audio_payload

# Guess is now: a-law 8000Hz
open('/tmp/test.raw', 'ab').write(actual_payload)

async def process_video_frame(self):
if len(self.video_boundaries) <= 1:
return
Expand Down Expand Up @@ -219,6 +242,7 @@ async def start_video(self):
logger.info('Start video')
self.last_drw_pkt_at = datetime.datetime.now()
await self._request_video(1)
await self._request_audio(1)
self.is_video_requested = True

async def stop_video(self):
Expand All @@ -231,7 +255,10 @@ async def stop_video(self):
self.last_video_frame = -1
while not self.video_chunk_queue.empty():
self.video_chunk_queue.get_nowait()
while not self.audio_chunk_queue.empty():
self.audio_chunk_queue.get_nowait()
await self._request_video(0)
await self._request_audio(0)

async def _request_video(self, mode):
"""
Expand Down Expand Up @@ -319,11 +346,12 @@ def start(self):
self.device_is_ready.clear()
self.start_packet_queue()
self.start_video_queue()
self.start_audio_queue()
self.main_task = asyncio.create_task(self._run())
return self.main_task

def running_tasks(self):
return tuple(x for x in (self.main_task, self.process_packet_task, self.process_video_task) if x)
return tuple(x for x in (self.main_task, self.process_packet_task, self.process_video_task, self.process_audio_task) if x)

def _on_device_lost(self):
logger.warning('Device %s lost', self.dev.dev_id)
Expand All @@ -338,6 +366,7 @@ def stop(self):
self.device_is_ready.set()
self.process_packet_task.cancel()
self.process_video_task.cancel()
self.process_audio_task.cancel()
self.main_task.cancel()
self.transport.close()
self.transport = None
Expand Down Expand Up @@ -417,7 +446,8 @@ async def handle_drw(self, drw_pkt):
self.video_stale_at = None
self.video_chunk_queue.put_nowait((pkt_epoch, drw_pkt))
elif drw_pkt._channel == Channel.Audio:
pass
# logger.debug(f'Got audio data {drw_pkt.get_drw_payload()}')
self.audio_chunk_queue.put_nowait((pkt_epoch, drw_pkt))
elif drw_pkt._channel == Channel.Command:
await self.handle_incoming_command_packet(drw_pkt)

Expand Down Expand Up @@ -482,6 +512,7 @@ async def loop_step(self):
self.video_stale_at = self.last_drw_pkt_at
logger.info('No video for 5 seconds. Re-request video ')
await self._request_video(1)
await self._request_audio(1)
if self.video_stale_at and (datetime.datetime.now() - self.video_stale_at).total_seconds() > 10:
# camera disconnected
logger.warning('No video for 10 seconds. Disconnecting')
Expand Down Expand Up @@ -546,6 +577,8 @@ class BinarySession(Session):
BinaryCommands.CMD_PEER_VIDEOPARAM_SET: BinaryCommands.ACK_PEER_VIDEOPARAM_SET,
BinaryCommands.CMD_PEER_LIVEVIDEO_START: BinaryCommands.ACK_PEER_LIVEVIDEO_START,
BinaryCommands.CMD_PEER_LIVEVIDEO_STOP: BinaryCommands.ACK_PEER_LIVEVIDEO_STOP,
BinaryCommands.CMD_PEER_LIVEAUDIO_START: BinaryCommands.ACK_PEER_LIVEAUDIO_START,
BinaryCommands.CMD_PEER_LIVEAUDIO_STOP: BinaryCommands.ACK_PEER_LIVEAUDIO_STOP,
BinaryCommands.CMD_SYSTEM_STATUS_GET: BinaryCommands.ACK_SYSTEM_STATUS_GET,
}
REV_ACKS = {v: k for k, v in ACKS.items()}
Expand Down Expand Up @@ -583,7 +616,7 @@ async def handle_drw(self, drw_pkt):
self.video_stale_at = None
self.video_chunk_queue.put_nowait((pkt_epoch, drw_pkt))
elif drw_pkt._channel == Channel.Audio:
pass
self.audio_chunk_queue.put_nowait((pkt_epoch, drw_pkt))
elif drw_pkt._channel == Channel.Command:
await self.handle_incoming_command_packet(drw_pkt)

Expand Down Expand Up @@ -678,6 +711,13 @@ async def _request_video(self, mode):
else:
await self.send_command(BinaryCommands.CMD_PEER_LIVEVIDEO_STOP, b'', with_response=True)

async def _request_audio(self, mode):
logger.info('Request audio %s', mode)
if mode:
await self.send_command(BinaryCommands.CMD_PEER_LIVEAUDIO_START, b'', with_response=True)
else:
await self.send_command(BinaryCommands.CMD_PEER_LIVEAUDIO_STOP, b'', with_response=True)

async def login(self):
# type is char account[0x20]; char password[0x80];
payload = struct.pack('>32s128s', self.auth_login.encode('utf-8'), self.auth_password.encode('utf-8'))
Expand Down