Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
165 changes: 165 additions & 0 deletions awscli/customizations/ecs/expressgateway/display_strategy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.

"""Display strategy implementations for ECS Express Gateway Service monitoring."""

import asyncio
import time

from botocore.exceptions import ClientError

from awscli.customizations.utils import uni_print


class DisplayStrategy:
"""Base class for display strategies.

Each strategy controls its own execution model, timing, and output format.
"""

def execute_monitoring(self, collector, start_time, timeout_minutes):
"""Execute the monitoring loop.

Args:
collector: ServiceViewCollector instance for data fetching
start_time: Start timestamp for timeout calculation
timeout_minutes: Maximum monitoring duration in minutes
"""
raise NotImplementedError


class InteractiveDisplayStrategy(DisplayStrategy):
"""Interactive display strategy with async spinner and keyboard navigation.

Uses dual async tasks:
- Data task: Polls ECS APIs every 5 seconds
- Spinner task: Updates display every 100ms with rotating spinner
"""

def __init__(self, display, use_color):
"""Initialize the interactive display strategy.

Args:
display: Display instance from prompt_toolkit_display module
providing the interactive terminal interface
use_color: Whether to use colored output
"""
self.display = display
self.use_color = use_color

def execute_monitoring(self, collector, start_time, timeout_minutes):
"""Execute async monitoring with spinner and keyboard controls."""
final_output, timed_out = asyncio.run(
self._execute_async(collector, start_time, timeout_minutes)
)
if timed_out:
uni_print(final_output + "\nMonitoring timed out!\n")
else:
uni_print(final_output + "\nMonitoring Complete!\n")

async def _execute_async(self, collector, start_time, timeout_minutes):
"""Async execution with dual tasks for data and spinner."""
spinner_chars = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏"
spinner_index = 0
current_output = "Waiting for initial data"
timed_out = False

async def update_data():
nonlocal current_output, timed_out
while True:
current_time = time.time()
if current_time - start_time > timeout_minutes * 60:
timed_out = True
self.display.app.exit()
break

try:
loop = asyncio.get_event_loop()
new_output = await loop.run_in_executor(
None, collector.get_current_view, "{SPINNER}"
)
current_output = new_output
except ClientError as e:
if (
e.response.get('Error', {}).get('Code')
== 'InvalidParameterException'
):
error_message = e.response.get('Error', {}).get(
'Message', ''
)
if (
"Cannot call DescribeServiceRevisions for a service that is INACTIVE"
in error_message
):
current_output = "Service is inactive"
else:
raise
else:
raise

await asyncio.sleep(5.0)

async def update_spinner():
nonlocal spinner_index
while True:
spinner_char = spinner_chars[spinner_index]
display_output = current_output.replace(
"{SPINNER}", spinner_char
)
status_text = f"Getting updates... {spinner_char} | up/down to scroll, q to quit"
self.display.display(display_output, status_text)
spinner_index = (spinner_index + 1) % len(spinner_chars)
await asyncio.sleep(0.1)

data_task = asyncio.create_task(update_data())
spinner_task = asyncio.create_task(update_spinner())
display_task = None

try:
display_task = asyncio.create_task(self.display.run())

done, pending = await asyncio.wait(
[display_task, data_task], return_when=asyncio.FIRST_COMPLETED
)

if data_task in done:
# Retrieve and re-raise any exception from the task.
# asyncio.wait() doesn't retrieve exceptions itself.
exc = data_task.exception()
if exc:
raise exc

# Cancel pending tasks
for task in pending:
task.cancel()
# Await cancelled task to ensure proper cleanup and prevent
# warnings about unawaited tasks
try:
await task
except asyncio.CancelledError:
pass

finally:
# Ensure display app is properly shut down
self.display.app.exit()
spinner_task.cancel()
if display_task is not None and not display_task.done():
display_task.cancel()
# Await cancelled task to ensure proper cleanup and prevent
# warnings about unawaited tasks
try:
await display_task
except asyncio.CancelledError:
pass

return current_output.replace("{SPINNER}", ""), timed_out
83 changes: 10 additions & 73 deletions awscli/customizations/ecs/monitorexpressgatewayservice.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,16 @@
aws ecs monitor-express-gateway-service --service-arn <arn> [--resource-view RESOURCE|DEPLOYMENT]
"""

import asyncio
import sys
import threading
import time

from botocore.exceptions import ClientError

from awscli.customizations.commands import BasicCommand
from awscli.customizations.ecs.exceptions import MonitoringError
from awscli.customizations.ecs.expressgateway.display_strategy import (
InteractiveDisplayStrategy,
)
from awscli.customizations.ecs.prompt_toolkit_display import Display
from awscli.customizations.ecs.serviceviewcollector import ServiceViewCollector
from awscli.customizations.utils import uni_print
Expand Down Expand Up @@ -185,7 +186,7 @@ def __init__(
service_arn,
mode,
timeout_minutes=30,
display=None,
display_strategy=None,
use_color=True,
collector=None,
):
Expand All @@ -195,7 +196,9 @@ def __init__(
self.timeout_minutes = timeout_minutes
self.start_time = time.time()
self.use_color = use_color
self.display = display or Display()
self.display_strategy = display_strategy or InteractiveDisplayStrategy(
display=Display(), use_color=use_color
)
self.collector = collector or ServiceViewCollector(
client, service_arn, mode, use_color
)
Expand All @@ -207,72 +210,6 @@ def is_monitoring_available():

def exec(self):
"""Start monitoring the express gateway service with progress display."""

def monitor_service(spinner_char):
return self.collector.get_current_view(spinner_char)

asyncio.run(self._execute_with_progress_async(monitor_service, 100))

async def _execute_with_progress_async(
self, execution, progress_refresh_millis, execution_refresh_millis=5000
):
"""Execute monitoring loop with animated progress display."""
spinner_chars = "⠋⠙⠹⠸⠼⠴⠦⠧⠇⠏"
spinner_index = 0

current_output = "Waiting for initial data"

async def update_data():
nonlocal current_output
while True:
current_time = time.time()
if current_time - self.start_time > self.timeout_minutes * 60:
break
try:
loop = asyncio.get_event_loop()
new_output = await loop.run_in_executor(
None, execution, "{SPINNER}"
)
current_output = new_output
except ClientError as e:
if (
e.response.get('Error', {}).get('Code')
== 'InvalidParameterException'
):
error_message = e.response.get('Error', {}).get(
'Message', ''
)
if (
"Cannot call DescribeServiceRevisions for a service that is INACTIVE"
in error_message
):
current_output = "Service is inactive"
else:
raise
else:
raise
await asyncio.sleep(execution_refresh_millis / 1000.0)

async def update_spinner():
nonlocal spinner_index
while True:
spinner_char = spinner_chars[spinner_index]
display_output = current_output.replace(
"{SPINNER}", spinner_char
)
status_text = f"Getting updates... {spinner_char} | up/down to scroll, q to quit"
self.display.display(display_output, status_text)
spinner_index = (spinner_index + 1) % len(spinner_chars)
await asyncio.sleep(progress_refresh_millis / 1000.0)

# Start both tasks
data_task = asyncio.create_task(update_data())
spinner_task = asyncio.create_task(update_spinner())

try:
await self.display.run()
finally:
data_task.cancel()
spinner_task.cancel()
final_output = current_output.replace("{SPINNER}", "")
uni_print(final_output + "\nMonitoring Complete!\n")
self.display_strategy.execute_monitoring(
self.collector, self.timeout_minutes, self.start_time
)
Loading
Loading