diff --git a/aiopppp/session.py b/aiopppp/session.py index 7607923..5e24476 100644 --- a/aiopppp/session.py +++ b/aiopppp/session.py @@ -64,23 +64,34 @@ class VideoQueueMixin: def __init__(self, *args, **kwargs): super().__init__() self.video_chunk_queue = asyncio.Queue() + self.audio_chunk_queue = asyncio.Queue() self.frame_buffer = SharedFrameBuffer() self.process_video_task = None + self.process_audio_task = None self.last_drw_pkt_idx = 0 self.video_epoch = 0 # number of overflows over 0xffff DRW index self.video_received = {} self.video_boundaries = set() self.last_video_frame = -1 + self.last_audio_frame = -1 async def process_video_queue(self): while True: pkt_epoch, pkt = await self.video_chunk_queue.get() await self.handle_incoming_video_packet(pkt_epoch, pkt) + async def process_audio_queue(self): + while True: + pkt_epoch, pkt = await self.audio_chunk_queue.get() + await self.handle_incoming_audio_packet(pkt_epoch, pkt) + def start_video_queue(self): self.process_video_task = asyncio.create_task(self.process_video_queue()) + def start_audio_queue(self): + self.process_audio_task = asyncio.create_task(self.process_audio_queue()) + async def handle_incoming_video_packet(self, pkt_epoch, pkt): video_payload = pkt.get_drw_payload() # logger.info(f'- video frame {pkt._cmd_idx}') @@ -96,6 +107,18 @@ async def handle_incoming_video_packet(self, pkt_epoch, pkt): self.video_received[video_chunk_idx] = video_payload await self.process_video_frame() + async def handle_incoming_audio_packet(self, pkt_epoch, pkt): + audio_payload = pkt.get_drw_payload() + audio_marker = b'\x55\xaa\x15\xa8' + + if audio_payload.startswith(audio_marker): + actual_payload = audio_payload[0x20:] + else: + actual_payload = audio_payload + + # Guess is now: a-law 8000Hz + open('/tmp/test.raw', 'ab').write(actual_payload) + async def process_video_frame(self): if len(self.video_boundaries) <= 1: return @@ -219,6 +242,7 @@ async def start_video(self): logger.info('Start video') self.last_drw_pkt_at = datetime.datetime.now() await self._request_video(1) + await self._request_audio(1) self.is_video_requested = True async def stop_video(self): @@ -231,7 +255,10 @@ async def stop_video(self): self.last_video_frame = -1 while not self.video_chunk_queue.empty(): self.video_chunk_queue.get_nowait() + while not self.audio_chunk_queue.empty(): + self.audio_chunk_queue.get_nowait() await self._request_video(0) + await self._request_audio(0) async def _request_video(self, mode): """ @@ -319,11 +346,12 @@ def start(self): self.device_is_ready.clear() self.start_packet_queue() self.start_video_queue() + self.start_audio_queue() self.main_task = asyncio.create_task(self._run()) return self.main_task def running_tasks(self): - return tuple(x for x in (self.main_task, self.process_packet_task, self.process_video_task) if x) + return tuple(x for x in (self.main_task, self.process_packet_task, self.process_video_task, self.process_audio_task) if x) def _on_device_lost(self): logger.warning('Device %s lost', self.dev.dev_id) @@ -338,6 +366,7 @@ def stop(self): self.device_is_ready.set() self.process_packet_task.cancel() self.process_video_task.cancel() + self.process_audio_task.cancel() self.main_task.cancel() self.transport.close() self.transport = None @@ -417,7 +446,8 @@ async def handle_drw(self, drw_pkt): self.video_stale_at = None self.video_chunk_queue.put_nowait((pkt_epoch, drw_pkt)) elif drw_pkt._channel == Channel.Audio: - pass + # logger.debug(f'Got audio data {drw_pkt.get_drw_payload()}') + self.audio_chunk_queue.put_nowait((pkt_epoch, drw_pkt)) elif drw_pkt._channel == Channel.Command: await self.handle_incoming_command_packet(drw_pkt) @@ -482,6 +512,7 @@ async def loop_step(self): self.video_stale_at = self.last_drw_pkt_at logger.info('No video for 5 seconds. Re-request video ') await self._request_video(1) + await self._request_audio(1) if self.video_stale_at and (datetime.datetime.now() - self.video_stale_at).total_seconds() > 10: # camera disconnected logger.warning('No video for 10 seconds. Disconnecting') @@ -546,6 +577,8 @@ class BinarySession(Session): BinaryCommands.CMD_PEER_VIDEOPARAM_SET: BinaryCommands.ACK_PEER_VIDEOPARAM_SET, BinaryCommands.CMD_PEER_LIVEVIDEO_START: BinaryCommands.ACK_PEER_LIVEVIDEO_START, BinaryCommands.CMD_PEER_LIVEVIDEO_STOP: BinaryCommands.ACK_PEER_LIVEVIDEO_STOP, + BinaryCommands.CMD_PEER_LIVEAUDIO_START: BinaryCommands.ACK_PEER_LIVEAUDIO_START, + BinaryCommands.CMD_PEER_LIVEAUDIO_STOP: BinaryCommands.ACK_PEER_LIVEAUDIO_STOP, BinaryCommands.CMD_SYSTEM_STATUS_GET: BinaryCommands.ACK_SYSTEM_STATUS_GET, } REV_ACKS = {v: k for k, v in ACKS.items()} @@ -583,7 +616,7 @@ async def handle_drw(self, drw_pkt): self.video_stale_at = None self.video_chunk_queue.put_nowait((pkt_epoch, drw_pkt)) elif drw_pkt._channel == Channel.Audio: - pass + self.audio_chunk_queue.put_nowait((pkt_epoch, drw_pkt)) elif drw_pkt._channel == Channel.Command: await self.handle_incoming_command_packet(drw_pkt) @@ -678,6 +711,13 @@ async def _request_video(self, mode): else: await self.send_command(BinaryCommands.CMD_PEER_LIVEVIDEO_STOP, b'', with_response=True) + async def _request_audio(self, mode): + logger.info('Request audio %s', mode) + if mode: + await self.send_command(BinaryCommands.CMD_PEER_LIVEAUDIO_START, b'', with_response=True) + else: + await self.send_command(BinaryCommands.CMD_PEER_LIVEAUDIO_STOP, b'', with_response=True) + async def login(self): # type is char account[0x20]; char password[0x80]; payload = struct.pack('>32s128s', self.auth_login.encode('utf-8'), self.auth_password.encode('utf-8'))