Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 43 additions & 57 deletions apimetrics_agent/command_line.py
100755 → 100644
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
import json
import sys
import argparse
from azure.servicebus import QueueClient

from azure.servicebus import ServiceBusClient
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These changes are in the wrong repository. Can you do this in APIm-Agent instead?

from .config import Config
from .register import register_agent_with_gae
from .thread import handle_request
Expand Down Expand Up @@ -60,73 +61,58 @@ def main():
run(config=config)


def get_callback(listener, msg):
msg_id = (
msg.broker_properties.get("MessageId", "??")
if msg and msg.broker_properties
else "?"
)

def cb_func(*args, **kwargs):
logger.info("Success callback for msg %s: %s %s", msg_id, args, kwargs)
return listener.mark_message_as_complete(msg)

return cb_func


def get_error_callback(listener, msg):
msg_id = (
msg.broker_properties.get("MessageId", "??")
if msg and msg.broker_properties
else "?"
)

def cb_func(*args, **kwargs):
logger.warning("Error callback for msg %s: %s %s", msg_id, args, kwargs)

return cb_func


def extract_defintion(msg):
logger.debug("extract_defintion %s %s", msg, msg.body)
if msg and msg.body is not None:
bytes_arr = next(msg.body)
json_string = bytes_arr.decode("utf-8")
def extract_defintion(body):
logger.debug("extract_defintion")
try:
json_string = body.decode("utf-8")
output = json.loads(json_string)
return output
except Exception as ex:
logger.error("Failed to parse message body: %s", ex)
return None


def listen(config):

queue_client = QueueClient.from_connection_string(
config.azure.connection_string, config.azure.taskqueue
)
servicebus_conn_str = config.azure.connection_string
queue_name = config.azure.taskqueue

last_message = dt.datetime.utcnow()
diff = 0
while diff < (60 * 15):
logger.info(
"Last msg %s - creating receiver for %s", diff, config.azure.taskqueue
)
with queue_client.get_receiver(idle_timeout=60*5) as messages:
for message in messages: # pylint: disable=not-an-iterable
last_message = dt.datetime.utcnow()
definiton = extract_defintion(message)
if definiton:
url, _, _ = (
definiton.get("request", {}).get("url", "").partition("?")
)
logger.info("Request received for %s", url)
handle_request(config, definiton, complete_cb=message.complete)
else:
logger.info("... no message")

now = dt.datetime.utcnow()
diff = (now - last_message).total_seconds()

with ServiceBusClient.from_connection_string(servicebus_conn_str) as sb_client:
receiver = sb_client.get_queue_receiver(queue_name=queue_name, max_wait_time=300)
with receiver:
logger.info("Listening on ServiceBus queue '%s'", queue_name)
while diff < (60 * 15):
batch = receiver.receive_messages(max_message_count=1, max_wait_time=5)
found = False
for message in batch:
found = True
last_message = dt.datetime.utcnow()
logger.info("Received message from %s", queue_name)
definiton = extract_defintion(message.body)
if definiton:
url, _, _ = (
definiton.get("request", {}).get("url", "").partition("?")
)
logger.info("Request received for %s", url)
handle_request(
config,
definiton,
complete_cb=message.complete # This is not callable in v7.x
)
try:
receiver.complete_message(message)
except Exception as ex:
logger.error("Failed to complete message: %s", ex)
if not found:
logger.info("... no message")

now = dt.datetime.utcnow()
diff = (now - last_message).total_seconds()

def run(config):

def run(config):
try:
register_agent_with_gae(config)
except Exception as ex: # pylint: disable=W0703
Expand Down
2 changes: 1 addition & 1 deletion requirements.in
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
azure-servicebus>=0.50.1
azure-servicebus>=7.14.2
pycurl>=7.43.0
requests>=2.10.0
google-cloud-storage>=0.21.0
55 changes: 27 additions & 28 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,39 +4,38 @@
#
# pip-compile --output-file=requirements.txt requirements.in
#
adal==1.2.2 # via msrestazure
azure-common==1.1.23 # via azure-servicebus
azure-servicebus==0.50.1
cachetools==3.1.1 # via google-auth
certifi==2019.9.11 # via msrest, requests, uamqp
cffi==1.13.2 # via cryptography
chardet==3.0.4 # via requests
cryptography==2.8 # via adal
google-api-core==1.14.3 # via google-cloud-core
google-auth==1.7.0 # via google-api-core, google-cloud-storage
google-cloud-core==1.0.3 # via google-cloud-storage
azure-core==1.34.0 # via azure-servicebus
azure-servicebus==7.14.2
cachetools==3.1.1 # via google-auth
certifi==2019.9.11 # via msrest, requests, uamqp
cffi==1.13.2 # via cryptography
chardet==3.0.4 # via requests
cryptography==2.8 # via adal
google-api-core==1.14.3 # via google-cloud-core
google-auth==1.7.0 # via google-api-core, google-cloud-storage
google-cloud-core==1.0.3 # via google-cloud-storage
google-cloud-storage==1.22.0
google-resumable-media==0.4.1 # via google-cloud-storage
googleapis-common-protos==1.6.0 # via google-api-core
idna==2.8 # via requests
isodate==0.6.0 # via msrest
msrest==0.6.10 # via msrestazure
msrestazure==0.6.2 # via azure-servicebus
oauthlib==3.1.0 # via requests-oauthlib
protobuf==3.10.0 # via google-api-core, googleapis-common-protos
pyasn1-modules==0.2.7 # via google-auth
pyasn1==0.4.7 # via pyasn1-modules, rsa
pycparser==2.19 # via cffi
idna==2.8 # via requests
isodate==0.7.2 # via azure-servicebus
msrest==0.6.10 # via msrestazure
msrestazure==0.6.2 # via azure-servicebus
oauthlib==3.1.0 # via requests-oauthlib
protobuf==3.10.0 # via google-api-core, googleapis-common-protos
pyasn1-modules==0.2.7 # via google-auth
pyasn1==0.4.7 # via pyasn1-modules, rsa
pycparser==2.19 # via cffi
pycurl==7.43.0.3
pyjwt==1.7.1 # via adal
python-dateutil==2.8.1 # via adal
pytz==2019.3 # via google-api-core
requests-oauthlib==1.3.0 # via msrest
pyjwt==1.7.1 # via adal
python-dateutil==2.8.1 # via adal
pytz==2019.3 # via google-api-core
requests-oauthlib==1.3.0 # via msrest
requests==2.22.0
rsa==4.0 # via google-auth
six==1.13.0 # via cryptography, google-api-core, google-auth, google-resumable-media, isodate, protobuf, python-dateutil, uamqp
uamqp==1.2.3 # via azure-servicebus
urllib3==1.25.7 # via requests
rsa==4.0 # via google-auth
six==1.13.0 # via cryptography, google-api-core, google-auth, google-resumable-media, isodate, protobuf, python-dateutil, uamqp
uamqp==1.2.3 # via azure-servicebus
urllib3==1.25.7 # via requests

# The following packages are considered to be unsafe in a requirements file:
# setuptools==41.6.0 # via google-api-core, google-auth, protobuf