Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
288 changes: 191 additions & 97 deletions src/azure-cli-core/azure/cli/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import os
import sys
import timeit
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor

from knack.cli import CLI
from knack.commands import CLICommandsLoader
Expand All @@ -34,6 +36,10 @@
ALWAYS_LOADED_MODULES = []
# Extensions that will always be loaded if installed. They don't expose commands but hook into CLI core.
ALWAYS_LOADED_EXTENSIONS = ['azext_ai_examples', 'azext_next']
# Timeout (in seconds) for loading a single module. Acts as a safety valve to prevent indefinite hangs
MODULE_LOAD_TIMEOUT_SECONDS = 30
# Maximum number of worker threads for parallel module loading.
MAX_WORKER_THREAD_COUNT = 4


def _configure_knack():
Expand Down Expand Up @@ -197,6 +203,17 @@ def _configure_style(self):
format_styled_text.theme = theme


class ModuleLoadResult: # pylint: disable=too-few-public-methods
def __init__(self, module_name, command_table, group_table, elapsed_time, error=None, traceback_str=None, command_loader=None):
self.module_name = module_name
self.command_table = command_table
self.group_table = group_table
self.elapsed_time = elapsed_time
self.error = error
self.traceback_str = traceback_str
self.command_loader = command_loader


class MainCommandsLoader(CLICommandsLoader):

# Format string for pretty-print the command module table
Expand Down Expand Up @@ -241,11 +258,11 @@ def load_command_table(self, args):
import pkgutil
import traceback
from azure.cli.core.commands import (
_load_module_command_loader, _load_extension_command_loader, BLOCKED_MODS, ExtensionCommandSource)
_load_extension_command_loader, ExtensionCommandSource)
from azure.cli.core.extension import (
get_extensions, get_extension_path, get_extension_modname)
from azure.cli.core.breaking_change import (
import_core_breaking_changes, import_module_breaking_changes, import_extension_breaking_changes)
import_core_breaking_changes, import_extension_breaking_changes)

def _update_command_table_from_modules(args, command_modules=None):
"""Loads command tables from modules and merge into the main command table.
Expand Down Expand Up @@ -273,38 +290,10 @@ def _update_command_table_from_modules(args, command_modules=None):
except ImportError as e:
logger.warning(e)

count = 0
cumulative_elapsed_time = 0
cumulative_group_count = 0
cumulative_command_count = 0
logger.debug("Loading command modules:")
logger.debug(self.header_mod)
results = self._load_modules(args, command_modules)

for mod in [m for m in command_modules if m not in BLOCKED_MODS]:
try:
start_time = timeit.default_timer()
module_command_table, module_group_table = _load_module_command_loader(self, args, mod)
import_module_breaking_changes(mod)
for cmd in module_command_table.values():
cmd.command_source = mod
self.command_table.update(module_command_table)
self.command_group_table.update(module_group_table)

elapsed_time = timeit.default_timer() - start_time
logger.debug(self.item_format_string, mod, elapsed_time,
len(module_group_table), len(module_command_table))
count += 1
cumulative_elapsed_time += elapsed_time
cumulative_group_count += len(module_group_table)
cumulative_command_count += len(module_command_table)
except Exception as ex: # pylint: disable=broad-except
# Changing this error message requires updating CI script that checks for failed
# module loading.
from azure.cli.core import telemetry
logger.error("Error loading command module '%s': %s", mod, ex)
telemetry.set_exception(exception=ex, fault_type='module-load-error-' + mod,
summary='Error loading module: {}'.format(mod))
logger.debug(traceback.format_exc())
count, cumulative_elapsed_time, cumulative_group_count, cumulative_command_count = \
self._process_results_with_timing(results)
# Summary line
logger.debug(self.item_format_string,
"Total ({})".format(count), cumulative_elapsed_time,
Expand Down Expand Up @@ -345,70 +334,80 @@ def _filter_modname(extensions):
return filtered_extensions

extensions = get_extensions()
if extensions:
if extension_modname is not None:
extension_modname.extend(ALWAYS_LOADED_EXTENSIONS)
extensions = _filter_modname(extensions)
allowed_extensions = _handle_extension_suppressions(extensions)
module_commands = set(self.command_table.keys())

count = 0
cumulative_elapsed_time = 0
cumulative_group_count = 0
cumulative_command_count = 0
logger.debug("Loading extensions:")
logger.debug(self.header_ext)

for ext in allowed_extensions:
try:
# Import in the `for` loop because `allowed_extensions` can be []. In such case we
# don't need to import `check_version_compatibility` at all.
from azure.cli.core.extension.operations import check_version_compatibility
check_version_compatibility(ext.get_metadata())
except CLIError as ex:
# issue warning and skip loading extensions that aren't compatible with the CLI core
logger.warning(ex)
continue
ext_name = ext.name
ext_dir = ext.path or get_extension_path(ext_name)
sys.path.append(ext_dir)
try:
ext_mod = get_extension_modname(ext_name, ext_dir=ext_dir)
# Add to the map. This needs to happen before we load commands as registering a command
# from an extension requires this map to be up-to-date.
# self._mod_to_ext_map[ext_mod] = ext_name
start_time = timeit.default_timer()
extension_command_table, extension_group_table = \
_load_extension_command_loader(self, args, ext_mod)
import_extension_breaking_changes(ext_mod)

for cmd_name, cmd in extension_command_table.items():
cmd.command_source = ExtensionCommandSource(
extension_name=ext_name,
overrides_command=cmd_name in module_commands,
preview=ext.preview,
experimental=ext.experimental)

self.command_table.update(extension_command_table)
self.command_group_table.update(extension_group_table)

elapsed_time = timeit.default_timer() - start_time
logger.debug(self.item_ext_format_string, ext_name, elapsed_time,
len(extension_group_table), len(extension_command_table),
ext_dir)
count += 1
cumulative_elapsed_time += elapsed_time
cumulative_group_count += len(extension_group_table)
cumulative_command_count += len(extension_command_table)
except Exception as ex: # pylint: disable=broad-except
self.cli_ctx.raise_event(EVENT_FAILED_EXTENSION_LOAD, extension_name=ext_name)
logger.warning("Unable to load extension '%s: %s'. Use --debug for more information.",
ext_name, ex)
logger.debug(traceback.format_exc())
# Summary line
logger.debug(self.item_ext_format_string,
"Total ({})".format(count), cumulative_elapsed_time,
cumulative_group_count, cumulative_command_count, "")
if not extensions:
return

if extension_modname is not None:
extension_modname.extend(ALWAYS_LOADED_EXTENSIONS)
extensions = _filter_modname(extensions)
allowed_extensions = _handle_extension_suppressions(extensions)
module_commands = set(self.command_table.keys())

count = 0
cumulative_elapsed_time = 0
cumulative_group_count = 0
cumulative_command_count = 0
logger.debug("Loading extensions:")
logger.debug(self.header_ext)

for ext in allowed_extensions:
try:
# Import in the `for` loop because `allowed_extensions` can be []. In such case we
# don't need to import `check_version_compatibility` at all.
from azure.cli.core.extension.operations import check_version_compatibility
check_version_compatibility(ext.get_metadata())
except CLIError as ex:
# issue warning and skip loading extensions that aren't compatible with the CLI core
logger.warning(ex)
continue
ext_name = ext.name
ext_dir = ext.path or get_extension_path(ext_name)
sys.path.append(ext_dir)
try:
ext_mod = get_extension_modname(ext_name, ext_dir=ext_dir)
# Add to the map. This needs to happen before we load commands as registering a command
# from an extension requires this map to be up-to-date.
# self._mod_to_ext_map[ext_mod] = ext_name
start_time = timeit.default_timer()
extension_command_table, extension_group_table, extension_command_loader = \
_load_extension_command_loader(self, args, ext_mod)
import_extension_breaking_changes(ext_mod)

for cmd_name, cmd in extension_command_table.items():
cmd.command_source = ExtensionCommandSource(
extension_name=ext_name,
overrides_command=cmd_name in module_commands,
preview=ext.preview,
experimental=ext.experimental)

# Populate cmd_to_loader_map for extension commands
if extension_command_loader:
self.loaders.append(extension_command_loader)
for cmd_name in extension_command_table:
if cmd_name not in self.cmd_to_loader_map:
self.cmd_to_loader_map[cmd_name] = []
self.cmd_to_loader_map[cmd_name].append(extension_command_loader)

self.command_table.update(extension_command_table)
self.command_group_table.update(extension_group_table)

elapsed_time = timeit.default_timer() - start_time
logger.debug(self.item_ext_format_string, ext_name, elapsed_time,
len(extension_group_table), len(extension_command_table),
ext_dir)
count += 1
cumulative_elapsed_time += elapsed_time
cumulative_group_count += len(extension_group_table)
cumulative_command_count += len(extension_command_table)
except Exception as ex: # pylint: disable=broad-except
self.cli_ctx.raise_event(EVENT_FAILED_EXTENSION_LOAD, extension_name=ext_name)
logger.warning("Unable to load extension '%s: %s'. Use --debug for more information.",
ext_name, ex)
logger.debug(traceback.format_exc())
# Summary line
logger.debug(self.item_ext_format_string,
"Total ({})".format(count), cumulative_elapsed_time,
cumulative_group_count, cumulative_command_count, "")

def _wrap_suppress_extension_func(func, ext):
""" Wrapper method to handle centralization of log messages for extension filters """
Expand Down Expand Up @@ -587,6 +586,101 @@ def load_arguments(self, command=None):
self.extra_argument_registry.update(loader.extra_argument_registry)
loader._update_command_definitions() # pylint: disable=protected-access

def _load_modules(self, args, command_modules):
"""Load command modules using ThreadPoolExecutor with timeout protection."""
from azure.cli.core.commands import BLOCKED_MODS

results = []
with ThreadPoolExecutor(max_workers=MAX_WORKER_THREAD_COUNT) as executor:
future_to_module = {executor.submit(self._load_single_module, mod, args): mod
for mod in command_modules if mod not in BLOCKED_MODS}

for future in concurrent.futures.as_completed(future_to_module):
try:
result = future.result(timeout=MODULE_LOAD_TIMEOUT_SECONDS)
results.append(result)
except concurrent.futures.TimeoutError:
mod = future_to_module[future]
logger.warning("Module '%s' load timeout after %s seconds", mod, MODULE_LOAD_TIMEOUT_SECONDS)
results.append(ModuleLoadResult(mod, {}, {}, 0,
Exception(f"Module '{mod}' load timeout")))
except (ImportError, AttributeError, TypeError, ValueError) as ex:
mod = future_to_module[future]
logger.warning("Module '%s' load failed: %s", mod, ex)
results.append(ModuleLoadResult(mod, {}, {}, 0, ex))
except Exception as ex: # pylint: disable=broad-exception-caught
mod = future_to_module[future]
logger.warning("Module '%s' load failed with unexpected exception: %s", mod, ex)
results.append(ModuleLoadResult(mod, {}, {}, 0, ex))

return results

def _load_single_module(self, mod, args):
from azure.cli.core.breaking_change import import_module_breaking_changes
from azure.cli.core.commands import _load_module_command_loader
import traceback
try:
start_time = timeit.default_timer()
module_command_table, module_group_table, command_loader = _load_module_command_loader(self, args, mod)
import_module_breaking_changes(mod)
elapsed_time = timeit.default_timer() - start_time
return ModuleLoadResult(mod, module_command_table, module_group_table, elapsed_time, command_loader=command_loader)
except Exception as ex: # pylint: disable=broad-except
tb_str = traceback.format_exc()
return ModuleLoadResult(mod, {}, {}, 0, ex, tb_str)

def _handle_module_load_error(self, result):
"""Handle errors that occurred during module loading."""
from azure.cli.core import telemetry

logger.error("Error loading command module '%s': %s", result.module_name, result.error)
telemetry.set_exception(exception=result.error,
fault_type='module-load-error-' + result.module_name,
summary='Error loading module: {}'.format(result.module_name))
if result.traceback_str:
logger.debug(result.traceback_str)

def _process_successful_load(self, result):
"""Process successfully loaded module results."""
if result.command_loader:
self.loaders.append(result.command_loader)

for cmd in result.command_table:
if cmd not in self.cmd_to_loader_map:
self.cmd_to_loader_map[cmd] = []
self.cmd_to_loader_map[cmd].append(result.command_loader)

for cmd in result.command_table.values():
cmd.command_source = result.module_name

self.command_table.update(result.command_table)
self.command_group_table.update(result.group_table)

logger.debug(self.item_format_string, result.module_name, result.elapsed_time,
len(result.group_table), len(result.command_table))

def _process_results_with_timing(self, results):
"""Process pre-loaded module results with timing and progress reporting."""
logger.debug("Loading command modules:")
logger.debug(self.header_mod)

count = 0
cumulative_elapsed_time = 0
cumulative_group_count = 0
cumulative_command_count = 0

for result in results:
if result.error:
self._handle_module_load_error(result)
else:
self._process_successful_load(result)
count += 1
cumulative_elapsed_time += result.elapsed_time
cumulative_group_count += len(result.group_table)
cumulative_command_count += len(result.command_table)

return count, cumulative_elapsed_time, cumulative_group_count, cumulative_command_count


class CommandIndex:

Expand Down
13 changes: 4 additions & 9 deletions src/azure-cli-core/azure/cli/core/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1134,22 +1134,17 @@ def _load_command_loader(loader, args, name, prefix):
logger.debug("Module '%s' is missing `get_command_loader` entry.", name)

command_table = {}
command_loader = None

if loader_cls:
command_loader = loader_cls(cli_ctx=loader.cli_ctx)
loader.loaders.append(command_loader) # This will be used by interactive
if command_loader.supported_resource_type():
command_table = command_loader.load_command_table(args)
if command_table:
for cmd in list(command_table.keys()):
# TODO: If desired to for extension to patch module, this can be uncommented
# if loader.cmd_to_loader_map.get(cmd):
# loader.cmd_to_loader_map[cmd].append(command_loader)
# else:
loader.cmd_to_loader_map[cmd] = [command_loader]
else:
logger.debug("Module '%s' is missing `COMMAND_LOADER_CLS` entry.", name)
return command_table, command_loader.command_group_table

group_table = command_loader.command_group_table if command_loader else {}
return command_table, group_table, command_loader


def _load_extension_command_loader(loader, args, ext):
Expand Down
Loading