Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion utils/update_checkout/tests/test_locked_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ def _update_arguments_with_fake_path(repo_name: str, path: str) -> UpdateArgumen
reset_to_remote=False,
clean=False,
stash=False,
cross_repos_pr=False,
cross_repos_pr={},
output_prefix="",
verbose=False,
)
Expand Down
62 changes: 47 additions & 15 deletions utils/update_checkout/update_checkout/git_command.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,44 @@
import os
import shlex
import subprocess
import sys
from typing import List, Any, Optional, Dict
from typing import List, Any, Optional, Dict, Tuple


class GitException(Exception):
"""
Exception raised when a Git command execution fails.

Attributes
----------
returncode : int
The return code from the failed Git command.
command : List[str]
The Git command that was executed.
repo_name : str
The name of the Git repository.
stderr : str
The output of the failed Git command.
"""

def __init__(
self,
returncode: int,
command: List[str],
repo_name: str,
output: str,
):
super().__init__()
self.returncode = returncode
self.command = command
self.repo_name = repo_name
self.stderr = output

def __str__(self):
return (
f"[{self.repo_name}] '{Git._quote_command(self.command)}' "
f"returned ({self.returncode}) with the following {self.stderr}."
)


class Git:
Expand All @@ -15,9 +52,9 @@ def run(
allow_non_zero_exit: bool = False,
fatal: bool = False,
**kwargs,
):
) -> Tuple[str, int, List[str]]:
command = Git._build_command(args)

output = ""
try:
result = subprocess.run(
command,
Expand All @@ -38,20 +75,15 @@ def run(
if fatal:
sys.exit(
f"command `{command}` terminated with a non-zero exit "
f"status {str(e.returncode)}, aborting")
eout = Exception(
f"[{repo_path}] '{Git._quote_command(command)}' failed with '{output}'"
f"status {str(e.returncode)}, aborting"
)
raise GitException(
e.returncode, command, os.path.dirname(repo_path), output
)
eout.ret = e.returncode
eout.arguments = command
eout.repo_path = repo_path
eout.stderr = output
raise eout
except OSError as e:
if fatal:
sys.exit(
f"could not execute '{Git._quote_command(command)}': "
f"{e.strerror}"
f"could not execute '{Git._quote_command(command)}': {e.strerror}"
)
return (output.strip(), result.returncode, command)

Expand All @@ -73,7 +105,7 @@ def _echo_command(
print(f"{prefix}+ {' '.join(command_str)}", file=sys.stderr)
if output:
for line in output.splitlines():
print(prefix+line)
print(prefix + line)
sys.stdout.flush()
sys.stderr.flush()

Expand All @@ -86,5 +118,5 @@ def _quote(arg: Any) -> str:
return shlex.quote(str(arg))

@staticmethod
def _quote_command(command: Any) -> str:
def _quote_command(command: List[Any]) -> str:
return " ".join(Git._quote(arg) for arg in command)
53 changes: 27 additions & 26 deletions utils/update_checkout/update_checkout/parallel_runner.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import sys
from multiprocessing import cpu_count
import time
from typing import Callable, List, Any, Tuple, Union
from typing import Callable, List, Any, Optional, Tuple, Union
from threading import Lock, Thread, Event
from concurrent.futures import ThreadPoolExecutor
import shutil

from .runner_arguments import RunnerArguments, AdditionalSwiftSourcesArguments
from .git_command import GitException

from .runner_arguments import RunnerArguments, AdditionalSwiftSourcesArguments, UpdateArguments


class TaskTracker:
Expand Down Expand Up @@ -50,32 +52,38 @@ def done_task_counter(self) -> int:
class MonitoredFunction:
def __init__(
self,
fn: Callable,
fn: Callable[..., Union[Exception]],
task_tracker: TaskTracker,
):
self.fn = fn
self._fn = fn
self._task_tracker = task_tracker

def __call__(self, *args: Union[RunnerArguments, AdditionalSwiftSourcesArguments]):
task_name = args[0].repo_name
self._task_tracker.mark_task_as_running(task_name)
result = None
try:
result = self.fn(*args)
result = self._fn(*args)
except Exception as e:
print(e)
finally:
self._task_tracker.mark_task_as_done(task_name)
return result


class ParallelRunner:
class ParallelRunner():
def __init__(
self,
fn: Callable,
pool_args: List[Union[RunnerArguments, AdditionalSwiftSourcesArguments]],
fn: Callable[..., None],
pool_args: Union[List[UpdateArguments], List[AdditionalSwiftSourcesArguments]],
n_threads: int = 0,
):
def run_safely(*args, **kwargs):
try:
fn(*args, **kwargs)
except GitException as e:
return e

if n_threads == 0:
# Limit the number of threads as the performance regresses if the
# number is too high.
Expand All @@ -84,7 +92,8 @@ def __init__(
self._monitor_polling_period = 0.1
self._terminal_width = shutil.get_terminal_size().columns
self._pool_args = pool_args
self._fn = fn
self._fn_name = fn.__name__
self._fn = run_safely
self._output_prefix = pool_args[0].output_prefix
self._nb_repos = len(pool_args)
self._stop_event = Event()
Expand All @@ -93,8 +102,8 @@ def __init__(
self._task_tracker = TaskTracker()
self._monitored_fn = MonitoredFunction(self._fn, self._task_tracker)

def run(self) -> List[Any]:
print(f"Running ``{self._fn.__name__}`` with up to {self._n_threads} processes.")
def run(self) -> List[Union[None, Exception]]:
print(f"Running ``{self._fn_name}`` with up to {self._n_threads} processes.")
if self._verbose:
with ThreadPoolExecutor(max_workers=self._n_threads) as pool:
results = list(pool.map(self._fn, self._pool_args, timeout=1800))
Expand Down Expand Up @@ -129,13 +138,10 @@ def _monitor(self):
sys.stdout.flush()

@staticmethod
def check_results(results, op) -> int:
"""Function used to check the results of ParallelRunner.

NOTE: This function was originally located in the shell module of
swift_build_support and should eventually be replaced with a better
parallel implementation.
"""
def check_results(
results: Optional[List[Union[GitException, Exception, Any]]], operation: str
) -> int:
"""Check the results of ParallelRunner and print the failures."""

fail_count = 0
if results is None:
Expand All @@ -144,15 +150,10 @@ def check_results(results, op) -> int:
if r is None:
continue
if fail_count == 0:
print("======%s FAILURES======" % op)
print(f"======{operation} FAILURES======")
fail_count += 1
if isinstance(r, str):
if isinstance(r, (GitException, Exception)):
print(r)
continue
if not hasattr(r, "repo_path"):
# TODO: create a proper Exception class with these attributes
continue
print("%s failed (ret=%d): %s" % (r.repo_path, r.ret, r))
if r.stderr:
print(r.stderr.decode())
print(r)
return fail_count
6 changes: 3 additions & 3 deletions utils/update_checkout/update_checkout/runner_arguments.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dataclasses import dataclass
from typing import Any, Dict, List
from typing import Any, Dict, List, Optional

from .cli_arguments import CliArguments

Expand All @@ -15,12 +15,12 @@ class UpdateArguments(RunnerArguments):
source_root: str
config: Dict[str, Any]
scheme_map: Any
tag: str
tag: Optional[str]
timestamp: Any
reset_to_remote: bool
clean: bool
stash: bool
cross_repos_pr: bool
cross_repos_pr: Dict[str, str]

@dataclass
class AdditionalSwiftSourcesArguments(RunnerArguments):
Expand Down
Loading