From 6620e4651fc679f221c06a8a0a15dc7ca81cdde5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andra=C5=BE=20Cuderman?= Date: Thu, 9 Sep 2021 12:52:24 +0200 Subject: [PATCH 1/3] feat(IDENT-3326): encode external queue payload --- aries_cloudagent/core/conductor.py | 4 ++- .../transport/outbound/manager.py | 31 +++++++++++++++++++ 2 files changed, 34 insertions(+), 1 deletion(-) diff --git a/aries_cloudagent/core/conductor.py b/aries_cloudagent/core/conductor.py index 951e7d785a..d8c4743f0b 100644 --- a/aries_cloudagent/core/conductor.py +++ b/aries_cloudagent/core/conductor.py @@ -531,9 +531,11 @@ async def _queue_external( targets = ( [outbound.target] if outbound.target else (outbound.target_list or []) ) + queue = await self.outbound_transport_manager.encode_message_external_queue(profile, outbound) + for target in targets: await self.outbound_queue.enqueue_message( - outbound.payload, target.endpoint + queue.payload, target.endpoint ) return OutboundSendStatus.SENT_TO_EXTERNAL_QUEUE diff --git a/aries_cloudagent/transport/outbound/manager.py b/aries_cloudagent/transport/outbound/manager.py index b9086445cb..780205233f 100644 --- a/aries_cloudagent/transport/outbound/manager.py +++ b/aries_cloudagent/transport/outbound/manager.py @@ -234,6 +234,37 @@ def get_transport_instance(self, transport_id: str) -> BaseOutboundTransport: """Get an instance of a running transport by ID.""" return self.running_transports[transport_id] + + async def encode_message_external_queue(self, profile: Profile, outbound: OutboundMessage): + """ + Add an outbound message to the queue. + + Args: + profile: The active profile for the request + outbound: The outbound message to deliver + """ + targets = [outbound.target] if outbound.target else (outbound.target_list or []) + transport_id = None + for target in targets: + endpoint = target.endpoint + try: + transport_id = self.get_running_transport_for_endpoint(endpoint) + except OutboundDeliveryError: + pass + if transport_id: + break + if not transport_id: + raise OutboundDeliveryError("No supported transport for outbound message") + + queued = QueuedOutboundMessage(profile, outbound, target, None) + + if queued.message and queued.message.enc_payload: + queued.payload = queued.message.enc_payload + else: + await self.perform_encode(queued) + + return queued + def enqueue_message(self, profile: Profile, outbound: OutboundMessage): """ Add an outbound message to the queue. From c701c6a89518a9c14f50e401db11cfd0a8ad1e1c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andra=C5=BE=20Cuderman?= Date: Thu, 9 Sep 2021 13:01:36 +0200 Subject: [PATCH 2/3] fix(IDENT-3326): Remove transport id check --- aries_cloudagent/transport/outbound/manager.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/aries_cloudagent/transport/outbound/manager.py b/aries_cloudagent/transport/outbound/manager.py index 780205233f..6cf03f9696 100644 --- a/aries_cloudagent/transport/outbound/manager.py +++ b/aries_cloudagent/transport/outbound/manager.py @@ -247,14 +247,6 @@ async def encode_message_external_queue(self, profile: Profile, outbound: Outbou transport_id = None for target in targets: endpoint = target.endpoint - try: - transport_id = self.get_running_transport_for_endpoint(endpoint) - except OutboundDeliveryError: - pass - if transport_id: - break - if not transport_id: - raise OutboundDeliveryError("No supported transport for outbound message") queued = QueuedOutboundMessage(profile, outbound, target, None) From 46a1a3c1e92f4fb836f31a0a43e7d1ecf37d0bdb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andra=C5=BE=20Cuderman?= Date: Thu, 9 Sep 2021 13:12:23 +0200 Subject: [PATCH 3/3] fix(IDENT-3326): Comment out transport search --- aries_cloudagent/transport/outbound/manager.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/aries_cloudagent/transport/outbound/manager.py b/aries_cloudagent/transport/outbound/manager.py index 6cf03f9696..2ec5f26d0f 100644 --- a/aries_cloudagent/transport/outbound/manager.py +++ b/aries_cloudagent/transport/outbound/manager.py @@ -446,8 +446,9 @@ def encode_queued_message(self, queued: QueuedOutboundMessage) -> asyncio.Task: async def perform_encode(self, queued: QueuedOutboundMessage): """Perform message encoding.""" - transport = self.get_transport_instance(queued.transport_id) - wire_format = transport.wire_format or self.context.inject(BaseWireFormat) + #transport = self.get_transport_instance(queued.transport_id) + #wire_format = transport.wire_format or self.context.inject(BaseWireFormat) + wire_format = self.context.inject(BaseWireFormat) session = await queued.profile.session() queued.payload = await wire_format.encode_message( session,