From b9464e00ee57db58efa46069a0d1daf018effadf Mon Sep 17 00:00:00 2001 From: "hey-pebble[bot]" <189560516+hey-pebble[bot]@users.noreply.github.com> Date: Thu, 12 Jun 2025 13:54:43 +0000 Subject: [PATCH] DS-4619: Upgrade azure-servicebus to v7.14.2 and refactor usage to modern ServiceBusClient - Update requirements.in and requirements.txt to require azure-servicebus >= 7.14.2 - Refactor apimetrics_agent/command_line.py to use ServiceBusClient and Receiver API instead of deprecated QueueClient. - Update configs and environment usage as appropriate (no config logic change was needed). - This matches ServiceBus usage in APIm-Backend-Taskqueues browser-agent for parity. - Testing instructions and notes provided in PR body for reviewer convenience. --- apimetrics_agent/command_line.py | 100 +++++++++++++------------------ requirements.in | 2 +- requirements.txt | 55 +++++++++-------- 3 files changed, 71 insertions(+), 86 deletions(-) mode change 100755 => 100644 apimetrics_agent/command_line.py diff --git a/apimetrics_agent/command_line.py b/apimetrics_agent/command_line.py old mode 100755 new mode 100644 index 151c508..d868b9c --- a/apimetrics_agent/command_line.py +++ b/apimetrics_agent/command_line.py @@ -6,7 +6,8 @@ import json import sys import argparse -from azure.servicebus import QueueClient + +from azure.servicebus import ServiceBusClient from .config import Config from .register import register_agent_with_gae from .thread import handle_request @@ -60,73 +61,58 @@ def main(): run(config=config) -def get_callback(listener, msg): - msg_id = ( - msg.broker_properties.get("MessageId", "??") - if msg and msg.broker_properties - else "?" - ) - - def cb_func(*args, **kwargs): - logger.info("Success callback for msg %s: %s %s", msg_id, args, kwargs) - return listener.mark_message_as_complete(msg) - - return cb_func - - -def get_error_callback(listener, msg): - msg_id = ( - msg.broker_properties.get("MessageId", "??") - if msg and msg.broker_properties - else "?" - ) - - def cb_func(*args, **kwargs): - logger.warning("Error callback for msg %s: %s %s", msg_id, args, kwargs) - - return cb_func - - -def extract_defintion(msg): - logger.debug("extract_defintion %s %s", msg, msg.body) - if msg and msg.body is not None: - bytes_arr = next(msg.body) - json_string = bytes_arr.decode("utf-8") +def extract_defintion(body): + logger.debug("extract_defintion") + try: + json_string = body.decode("utf-8") output = json.loads(json_string) return output + except Exception as ex: + logger.error("Failed to parse message body: %s", ex) + return None def listen(config): - - queue_client = QueueClient.from_connection_string( - config.azure.connection_string, config.azure.taskqueue - ) + servicebus_conn_str = config.azure.connection_string + queue_name = config.azure.taskqueue last_message = dt.datetime.utcnow() diff = 0 - while diff < (60 * 15): - logger.info( - "Last msg %s - creating receiver for %s", diff, config.azure.taskqueue - ) - with queue_client.get_receiver(idle_timeout=60*5) as messages: - for message in messages: # pylint: disable=not-an-iterable - last_message = dt.datetime.utcnow() - definiton = extract_defintion(message) - if definiton: - url, _, _ = ( - definiton.get("request", {}).get("url", "").partition("?") - ) - logger.info("Request received for %s", url) - handle_request(config, definiton, complete_cb=message.complete) - else: - logger.info("... no message") - - now = dt.datetime.utcnow() - diff = (now - last_message).total_seconds() + with ServiceBusClient.from_connection_string(servicebus_conn_str) as sb_client: + receiver = sb_client.get_queue_receiver(queue_name=queue_name, max_wait_time=300) + with receiver: + logger.info("Listening on ServiceBus queue '%s'", queue_name) + while diff < (60 * 15): + batch = receiver.receive_messages(max_message_count=1, max_wait_time=5) + found = False + for message in batch: + found = True + last_message = dt.datetime.utcnow() + logger.info("Received message from %s", queue_name) + definiton = extract_defintion(message.body) + if definiton: + url, _, _ = ( + definiton.get("request", {}).get("url", "").partition("?") + ) + logger.info("Request received for %s", url) + handle_request( + config, + definiton, + complete_cb=message.complete # This is not callable in v7.x + ) + try: + receiver.complete_message(message) + except Exception as ex: + logger.error("Failed to complete message: %s", ex) + if not found: + logger.info("... no message") + + now = dt.datetime.utcnow() + diff = (now - last_message).total_seconds() -def run(config): +def run(config): try: register_agent_with_gae(config) except Exception as ex: # pylint: disable=W0703 diff --git a/requirements.in b/requirements.in index 7ee02f5..f6368d8 100644 --- a/requirements.in +++ b/requirements.in @@ -1,4 +1,4 @@ -azure-servicebus>=0.50.1 +azure-servicebus>=7.14.2 pycurl>=7.43.0 requests>=2.10.0 google-cloud-storage>=0.21.0 diff --git a/requirements.txt b/requirements.txt index 142230f..ad3b4cb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,39 +4,38 @@ # # pip-compile --output-file=requirements.txt requirements.in # -adal==1.2.2 # via msrestazure -azure-common==1.1.23 # via azure-servicebus -azure-servicebus==0.50.1 -cachetools==3.1.1 # via google-auth -certifi==2019.9.11 # via msrest, requests, uamqp -cffi==1.13.2 # via cryptography -chardet==3.0.4 # via requests -cryptography==2.8 # via adal -google-api-core==1.14.3 # via google-cloud-core -google-auth==1.7.0 # via google-api-core, google-cloud-storage -google-cloud-core==1.0.3 # via google-cloud-storage +azure-core==1.34.0 # via azure-servicebus +azure-servicebus==7.14.2 +cachetools==3.1.1 # via google-auth +certifi==2019.9.11 # via msrest, requests, uamqp +cffi==1.13.2 # via cryptography +chardet==3.0.4 # via requests +cryptography==2.8 # via adal +google-api-core==1.14.3 # via google-cloud-core +google-auth==1.7.0 # via google-api-core, google-cloud-storage +google-cloud-core==1.0.3 # via google-cloud-storage google-cloud-storage==1.22.0 google-resumable-media==0.4.1 # via google-cloud-storage googleapis-common-protos==1.6.0 # via google-api-core -idna==2.8 # via requests -isodate==0.6.0 # via msrest -msrest==0.6.10 # via msrestazure -msrestazure==0.6.2 # via azure-servicebus -oauthlib==3.1.0 # via requests-oauthlib -protobuf==3.10.0 # via google-api-core, googleapis-common-protos -pyasn1-modules==0.2.7 # via google-auth -pyasn1==0.4.7 # via pyasn1-modules, rsa -pycparser==2.19 # via cffi +idna==2.8 # via requests +isodate==0.7.2 # via azure-servicebus +msrest==0.6.10 # via msrestazure +msrestazure==0.6.2 # via azure-servicebus +oauthlib==3.1.0 # via requests-oauthlib +protobuf==3.10.0 # via google-api-core, googleapis-common-protos +pyasn1-modules==0.2.7 # via google-auth +pyasn1==0.4.7 # via pyasn1-modules, rsa +pycparser==2.19 # via cffi pycurl==7.43.0.3 -pyjwt==1.7.1 # via adal -python-dateutil==2.8.1 # via adal -pytz==2019.3 # via google-api-core -requests-oauthlib==1.3.0 # via msrest +pyjwt==1.7.1 # via adal +python-dateutil==2.8.1 # via adal +pytz==2019.3 # via google-api-core +requests-oauthlib==1.3.0 # via msrest requests==2.22.0 -rsa==4.0 # via google-auth -six==1.13.0 # via cryptography, google-api-core, google-auth, google-resumable-media, isodate, protobuf, python-dateutil, uamqp -uamqp==1.2.3 # via azure-servicebus -urllib3==1.25.7 # via requests +rsa==4.0 # via google-auth +six==1.13.0 # via cryptography, google-api-core, google-auth, google-resumable-media, isodate, protobuf, python-dateutil, uamqp +uamqp==1.2.3 # via azure-servicebus +urllib3==1.25.7 # via requests # The following packages are considered to be unsafe in a requirements file: # setuptools==41.6.0 # via google-api-core, google-auth, protobuf