diff --git a/emu/_version.py b/emu/_version.py index 827d351f..e1c1290d 100644 --- a/emu/_version.py +++ b/emu/_version.py @@ -15,6 +15,7 @@ import re import subprocess import sys +from typing import Any, Callable, Dict, List, Mapping, Optional, Tuple def get_keywords(): @@ -32,6 +33,12 @@ def get_keywords(): class VersioneerConfig: """Container for Versioneer configuration parameters.""" + VCS: str + style: str + tag_prefix: str + parentdir_prefix: str + versionfile_source: str + verbose: bool def get_config(): @@ -56,9 +63,9 @@ class NotThisMethod(Exception): HANDLERS = {} -def register_vcs_handler(vcs, method): # decorator +def register_vcs_handler(vcs: str, method: str): # decorator """Decorator to mark a method as the handler for a particular VCS.""" - def decorate(f): + def decorate(f: Any): """Store f in HANDLERS[vcs][method].""" if vcs not in HANDLERS: HANDLERS[vcs] = {} @@ -67,11 +74,12 @@ def decorate(f): return decorate -def run_command(commands, args, cwd=None, verbose=False, hide_stderr=False, - env=None): +def run_command(commands: List[Any], args: List[str], cwd: Optional[str] = None, verbose: bool = False, hide_stderr: bool = False, + env: Optional[Mapping[str, Any]] = None): """Call the given command(s).""" assert isinstance(commands, list) p = None + dispcmd = None for c in commands: try: dispcmd = str([c] + args) @@ -81,8 +89,7 @@ def run_command(commands, args, cwd=None, verbose=False, hide_stderr=False, stderr=(subprocess.PIPE if hide_stderr else None)) break - except EnvironmentError: - e = sys.exc_info()[1] + except EnvironmentError as e: if e.errno == errno.ENOENT: continue if verbose: @@ -104,16 +111,16 @@ def run_command(commands, args, cwd=None, verbose=False, hide_stderr=False, return stdout, p.returncode -def versions_from_parentdir(parentdir_prefix, root, verbose): +def versions_from_parentdir(parentdir_prefix: str, root: str, verbose: bool): """Try to determine the version from the parent directory name. Source tarballs conventionally unpack into a directory that includes both the project name and a version string. We will also support searching up two directory levels for an appropriately named parent directory """ - rootdirs = [] + rootdirs: List[str] = [] - for i in range(3): + for _ in range(3): dirname = os.path.basename(root) if dirname.startswith(parentdir_prefix): return {"version": dirname[len(parentdir_prefix):], @@ -130,13 +137,13 @@ def versions_from_parentdir(parentdir_prefix, root, verbose): @register_vcs_handler("git", "get_keywords") -def git_get_keywords(versionfile_abs): +def git_get_keywords(versionfile_abs: str): """Extract version information from the given file.""" # the code embedded in _version.py can just fetch the value of these # keywords. When used from setup.py, we don't want to import _version.py, # so we do it with a regexp instead. This function is not used from # _version.py. - keywords = {} + keywords: Dict[str, str] = {} try: f = open(versionfile_abs, "r") for line in f.readlines(): @@ -159,7 +166,7 @@ def git_get_keywords(versionfile_abs): @register_vcs_handler("git", "keywords") -def git_versions_from_keywords(keywords, tag_prefix, verbose): +def git_versions_from_keywords(keywords: Mapping[str, Any], tag_prefix: str, verbose: bool): """Get version information from git keywords.""" if not keywords: raise NotThisMethod("no keywords at all, weird") @@ -214,7 +221,7 @@ def git_versions_from_keywords(keywords, tag_prefix, verbose): @register_vcs_handler("git", "pieces_from_vcs") -def git_pieces_from_vcs(tag_prefix, root, verbose, run_command=run_command): +def git_pieces_from_vcs(tag_prefix: str, root: str, verbose: bool, run_command: Callable[..., Tuple[Any, Any]] = run_command): """Get version from 'git describe' in the root of the source tree. This only gets called if the git-archive 'subst' keywords were *not* @@ -225,8 +232,8 @@ def git_pieces_from_vcs(tag_prefix, root, verbose, run_command=run_command): if sys.platform == "win32": GITS = ["git.cmd", "git.exe"] - out, rc = run_command(GITS, ["rev-parse", "--git-dir"], cwd=root, - hide_stderr=True) + _, rc = run_command(GITS, ["rev-parse", "--git-dir"], cwd=root, + hide_stderr=True) if rc != 0: if verbose: print("Directory %s not under git control" % root) @@ -247,7 +254,7 @@ def git_pieces_from_vcs(tag_prefix, root, verbose, run_command=run_command): raise NotThisMethod("'git rev-parse' failed") full_out = full_out.strip() - pieces = {} + pieces: Dict[str, Any] = {} pieces["long"] = full_out pieces["short"] = full_out[:7] # maybe improved later pieces["error"] = None @@ -305,14 +312,14 @@ def git_pieces_from_vcs(tag_prefix, root, verbose, run_command=run_command): return pieces -def plus_or_dot(pieces): +def plus_or_dot(pieces: Mapping[str, Any]): """Return a + if we don't already have one, else return a .""" if "+" in pieces.get("closest-tag", ""): return "." return "+" -def render_pep440(pieces): +def render_pep440(pieces: Mapping[str, Any]): """Build up version string, with post-release "local version identifier". Our goal: TAG[+DISTANCE.gHEX[.dirty]] . Note that if you @@ -337,7 +344,7 @@ def render_pep440(pieces): return rendered -def render_pep440_pre(pieces): +def render_pep440_pre(pieces: Mapping[str, Any]): """TAG[.post.devDISTANCE] -- No -dirty. Exceptions: @@ -353,7 +360,7 @@ def render_pep440_pre(pieces): return rendered -def render_pep440_post(pieces): +def render_pep440_post(pieces: Mapping[str, Any]): """TAG[.postDISTANCE[.dev0]+gHEX] . The ".dev0" means dirty. Note that .dev0 sorts backwards @@ -380,7 +387,7 @@ def render_pep440_post(pieces): return rendered -def render_pep440_old(pieces): +def render_pep440_old(pieces: Mapping[str, Any]): """TAG[.postDISTANCE[.dev0]] . The ".dev0" means dirty. @@ -402,7 +409,7 @@ def render_pep440_old(pieces): return rendered -def render_git_describe(pieces): +def render_git_describe(pieces: Mapping[str, Any]): """TAG[-DISTANCE-gHEX][-dirty]. Like 'git describe --tags --dirty --always'. @@ -422,7 +429,7 @@ def render_git_describe(pieces): return rendered -def render_git_describe_long(pieces): +def render_git_describe_long(pieces: Mapping[str, Any]): """TAG-DISTANCE-gHEX[-dirty]. Like 'git describe --tags --dirty --always -long'. @@ -442,7 +449,7 @@ def render_git_describe_long(pieces): return rendered -def render(pieces, style): +def render(pieces: Mapping[str, Any], style: str): """Render the given version pieces into the requested style.""" if pieces["error"]: return {"version": "unknown", @@ -482,7 +489,7 @@ def get_versions(): # case we can only use expanded keywords. cfg = get_config() - verbose = cfg.verbose + verbose: bool = cfg.verbose try: return git_versions_from_keywords(get_keywords(), cfg.tag_prefix, @@ -495,7 +502,7 @@ def get_versions(): # versionfile_source is the relative path from the top of the source # tree (where the .git directory might live) to this file. Invert # this to find the root from __file__. - for i in cfg.versionfile_source.split('/'): + for _ in cfg.versionfile_source.split('/'): root = os.path.dirname(root) except NameError: return {"version": "0+unknown", "full-revisionid": None, diff --git a/emu/android_release_zip.py b/emu/android_release_zip.py index 8f655411..8eca7639 100644 --- a/emu/android_release_zip.py +++ b/emu/android_release_zip.py @@ -11,10 +11,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from _typeshed import StrPath import collections import logging import os import shutil +from typing import DefaultDict, Dict, Union import zipfile from tqdm import tqdm @@ -30,28 +32,30 @@ class AndroidReleaseZip(object): about the contents of the zip. """ - def __init__(self, file_name): + def __init__(self, file_name: StrPath): self.file_name = file_name if not zipfile.is_zipfile(file_name): raise Exception("{} is not a zipfile!".format(file_name)) with zipfile.ZipFile(file_name, "r") as zip_file: - self.props = collections.defaultdict(set) - files = [x for x in zip_file.infolist() if "source.properties" in x.filename or "build.prop" in x.filename] + self.props: DefaultDict[str, str + ] = collections.defaultdict(str) + files = [x for x in zip_file.infolist( + ) if "source.properties" in x.filename or "build.prop" in x.filename] for file in files: for key, value in self._unpack_properties(zip_file, file).items(): self.props[key] = value - def _unpack_properties(self, zip_file, zip_info): + def _unpack_properties(self, zip_file: zipfile.ZipFile, zip_info: zipfile.ZipInfo): prop = zip_file.read(zip_info).decode("utf-8").splitlines() - res = dict([a.split("=") for a in prop if "=" in a]) + res: Dict[str, str] = dict([a.split("=") for a in prop if "=" in a]) return res def __str__(self): return "{}-{}".format(self.description(), self.revision()) - def description(self): + def description(self) -> Union[str, set[str]]: """Descripton of this release.""" - return self.props.get("Pkg.Desc") + return self.props.get("Pkg.Desc", "") def revision(self): """The revision of this release.""" @@ -71,7 +75,7 @@ def is_emulator(self): """True if this zip file contains the android emulator.""" return "Android Emulator" in self.description() - def copy(self, destination): + def copy(self, destination: str): """Copy the zipfile to the given destination. If the destination is the same as this zipfile the current path @@ -89,7 +93,7 @@ def copy(self, destination): logging.warning("Will not copy to itself, ignoring..") return self.file_name - def extract(self, destination): + def extract(self, destination: str): """Extract this release zip to the given destination Args: @@ -100,7 +104,7 @@ def extract(self, destination): print("Extracting: {} -> {}".format(self.file_name, destination)) for info in tqdm(iterable=zip_file.infolist(), total=len(zip_file.infolist())): filename = zip_file.extract(info, path=destination) - mode = info.external_attr >> 16 + mode: int = info.external_attr >> 16 if mode: os.chmod(filename, mode) @@ -108,8 +112,10 @@ def extract(self, destination): class SystemImageReleaseZip(AndroidReleaseZip): """An Android Release Zipfile containing an emulator system image.""" - ABI_CPU_MAP = {"armeabi-v7a": "arm", "arm64-v8a": "arm64", "x86_64": "x86_64", "x86": "x86"} - SHORT_MAP = {"armeabi-v7a": "a32", "arm64-v8a": "a64", "x86_64": "x64", "x86": "x86"} + ABI_CPU_MAP = {"armeabi-v7a": "arm", "arm64-v8a": "arm64", + "x86_64": "x86_64", "x86": "x86"} + SHORT_MAP = {"armeabi-v7a": "a32", + "arm64-v8a": "a64", "x86_64": "x64", "x86": "x86"} SHORT_TAG = { "android": "aosp", "google_apis": "google", @@ -118,10 +124,11 @@ class SystemImageReleaseZip(AndroidReleaseZip): "android-tv": "tv", } - def __init__(self, file_name): + def __init__(self, file_name: str): super().__init__(file_name) if not self.is_system_image(): - raise Exception("{} is not a zip file with a system image".format(file_name)) + raise Exception( + "{} is not a zip file with a system image".format(file_name)) self.props["qemu.cpu"] = self.qemu_cpu() self.props["qemu.tag"] = self.tag() diff --git a/emu/cloud_build.py b/emu/cloud_build.py index 456a7e88..bbc8573e 100644 --- a/emu/cloud_build.py +++ b/emu/cloud_build.py @@ -11,12 +11,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from _typeshed import StrPath +import argparse import itertools import logging import os import re +from typing import Any, Dict, List, Set import yaml +from emu.containers.docker_container import DockerContainer import emu.emu_downloads_menu as emu_downloads_menu from emu.template_writer import TemplateWriter @@ -27,7 +31,7 @@ from emu.utils import mkdir_p -def git_commit_and_push(dest): +def git_commit_and_push(dest: str): """Commit and pushes this cloud build to the git repo. Note that this can be *EXTREMELY* slow as you will likely @@ -41,7 +45,7 @@ def git_commit_and_push(dest): run(["git", "push"], dest) -def create_build_step(for_container, destination): +def create_build_step(for_container: DockerContainer, destination: StrPath) -> Dict[str, Any]: build_destination = os.path.join(destination, for_container.image_name()) logging.info("Generating %s", build_destination) for_container.write(build_destination) @@ -56,7 +60,15 @@ def create_build_step(for_container, destination): return step -def cloud_build(args): +class ArgsCreateCloudBuildDistribuition(argparse.Namespace): + dest: str + img: str + repo: str + sys: bool + emuzip: str + + +def cloud_build(args: ArgsCreateCloudBuildDistribuition): """Prepares the cloud build yaml and all its dependencies. The cloud builder will generate a single cloudbuild.yaml and generates the build @@ -76,15 +88,16 @@ def cloud_build(args): emulator_zip = [args.emuzip] if emulator_zip[0] in ["stable", "canary", "all"]: - emulator_zip = [x.download() for x in emu_downloads_menu.find_emulator(emulator_zip[0])] + emulator_zip = [x.download() + for x in emu_downloads_menu.find_emulator(emulator_zip[0])] elif re.match(r"\d+", emulator_zip[0]): # We must be looking for a build id logging.warning("Treating %s as a build id", emulator_zip[0]) emulator_zip = [emu_downloads_menu.download_build(emulator_zip[0])] - steps = [] - images = [] - emulators = set() + steps: List[Dict[str, Any]] = [] + images: List[str] = [] + emulators: Set[str] = set() emulator_images = [] for (img, emu) in itertools.product(image_zip, emulator_zip): @@ -94,7 +107,8 @@ def cloud_build(args): steps.append(create_build_step(system_container, args.dest)) else: for metrics in [True, False]: - emulator_container = EmulatorContainer(emu, system_container, args.repo, metrics) + emulator_container = EmulatorContainer( + emu, system_container, args.repo, metrics) emulators.add(emulator_container.props["emu_build_id"]) steps.append(create_build_step(emulator_container, args.dest)) images.append(emulator_container.full_name()) @@ -110,7 +124,8 @@ def cloud_build(args): writer = TemplateWriter(args.dest) writer.write_template( "cloudbuild.README.MD", - {"emu_version": ", ".join(emulators), "emu_images": "\n".join(["* {}".format(x) for x in emulator_images])}, + {"emu_version": ", ".join(emulators), "emu_images": "\n".join( + ["* {}".format(x) for x in emulator_images])}, rename_as="README.MD", ) writer.write_template( diff --git a/emu/containers/docker_container.py b/emu/containers/docker_container.py index f714c0f3..bc0d6632 100644 --- a/emu/containers/docker_container.py +++ b/emu/containers/docker_container.py @@ -17,8 +17,11 @@ import sys import shutil import abc +from typing import Any, Dict, List, Mapping, Optional import docker +from docker.client import DockerClient +from docker.models.images import Image from tqdm import tqdm from emu.utils import mkdir_p @@ -28,14 +31,14 @@ class ProgressTracker(object): def __init__(self): # This tracks the information for a given layer id. - self.progress = {} + self.progress: Dict[Any, Any] = {} self.idx = -1 def __del__(self): for k in self.progress: self.progress[k]["tqdm"].close() - def update(self, entry): + def update(self, entry: Dict[Any, Any]): """Update the progress bars given a an entry..""" if "id" not in entry: return @@ -44,7 +47,8 @@ def update(self, entry): if identity not in self.progress: self.idx += 1 self.progress[identity] = { - "tqdm": tqdm(total=0, position=self.idx, unit="B", unit_scale=True), # The progress bar + # The progress bar + "tqdm": tqdm(total=0, position=self.idx, unit="B", unit_scale=True), "total": 0, # Total of bytes we are shipping "status": "", # Status message. "current": 0, # Current of total already send. @@ -57,14 +61,15 @@ def update(self, entry): prog["total"] = total prog["tqdm"].reset(total=total) if prog["status"] != entry["status"]: - prog["tqdm"].set_description("{0} {1}".format(entry.get("status"), identity)) + prog["tqdm"].set_description( + "{0} {1}".format(entry.get("status"), identity)) if current != 0: diff = current - prog["current"] prog["current"] = current prog["tqdm"].update(diff) -class DockerContainer(object): +class DockerContainer: """A Docker Device is capable of creating and launching docker images. In order to successfully create and launch a docker image you must either @@ -72,13 +77,14 @@ class DockerContainer(object): """ TAG_REGEX = re.compile(r"[a-zA-Z0-9][a-zA-Z0-9._-]*:?[a-zA-Z0-9._-]*") + repo: Optional[str] - def __init__(self, repo=None): + def __init__(self, repo: Optional[str] = None): if repo and repo[-1] != "/": repo += "/" self.repo = repo - def get_client(self): + def get_client(self) -> DockerClient: return docker.from_env() def get_api_client(self): @@ -87,7 +93,8 @@ def get_api_client(self): logging.info(api_client.version()) return api_client except: - logging.exception("Failed to create default client, trying domain socket.", exc_info=True) + logging.exception( + "Failed to create default client, trying domain socket.", exc_info=True) api_client = docker.APIClient(base_url="unix://var/run/docker.sock") logging.info(api_client.version()) @@ -95,11 +102,13 @@ def get_api_client(self): def push(self): image = self.full_name() - print("Pushing docker image: {}.. be patient this can take a while!".format(self.full_name())) + print("Pushing docker image: {}.. be patient this can take a while!".format( + self.full_name())) tracker = ProgressTracker() try: - client = docker.from_env() - result = client.images.push(image, "latest", stream=True, decode=True) + client: DockerClient = docker.from_env() + result = client.images.push( + image, "latest", stream=True, decode=True) for entry in result: tracker.update(entry) self.docker_image().tag("{}{}:latest".format(self.repo, self.image_name())) @@ -108,7 +117,7 @@ def push(self): logging.warning("You can manually push the image as follows:") logging.warning("docker push %s", image) - def launch(self, port_map): + def launch(self, port_map: Mapping[str, int]): """Launches the container with the given sha, publishing abd on port, and gRPC on port 8554 Returns the container. @@ -132,15 +141,17 @@ def launch(self, port_map): print("Unable to start the container, try running it as:") print("./run.sh ", image_sha) - def create_container(self, dest): + def create_container(self, dest: str): """Creates the docker container, returning the sha of the container, or None in case of failure.""" identity = None image_tag = self.full_name() print("docker build {} -t {}".format(dest, image_tag)) try: api_client = self.get_api_client() - logging.info("build(path=%s, tag=%s, rm=True, decode=True)", dest, image_tag) - result = api_client.build(path=dest, tag=image_tag, rm=True, decode=True) + logging.info( + "build(path=%s, tag=%s, rm=True, decode=True)", dest, image_tag) + result = api_client.build( + path=dest, tag=image_tag, rm=True, decode=True) for entry in result: if "stream" in entry: sys.stdout.write(entry["stream"]) @@ -151,17 +162,18 @@ def create_container(self, dest): image.tag(self.repo + self.image_name(), "latest") except: logging.exception("Failed to create container.", exc_info=True) - logging.warning("You can manually create the container as follows:") + logging.warning( + "You can manually create the container as follows:") logging.warning("docker build -t %s %s", image_tag, dest) return identity - def clean(self, dest): + def clean(self, dest: str): if os.path.exists(dest): shutil.rmtree(dest) mkdir_p(dest) - def pull(self, image, tag): + def pull(self, image: str, tag: str): """Tries to retrieve the given image and tag. Return True if succeeded, False when failed. @@ -173,7 +185,8 @@ def pull(self, image, tag): for entry in result: tracker.update(entry) except: - logging.info("Failed to retrieve image, this is not uncommon.", exc_info=True) + logging.info( + "Failed to retrieve image, this is not uncommon.", exc_info=True) return False return True @@ -188,7 +201,7 @@ def latest_name(self): return "{}{}:{}".format(self.repo, self.image_name(), "latest") return (self.image_name(), "latest") - def create_cloud_build_step(self, dest): + def create_cloud_build_step(self, dest: str) -> Dict[str, Any]: return { "name": "gcr.io/cloud-builders/docker", "args": [ @@ -201,24 +214,25 @@ def create_cloud_build_step(self, dest): ], } - def docker_image(self): + def docker_image(self) -> Optional[Image]: """The docker local docker image if any Returns: {docker.models.images.Image}: A docker image object, or None. """ client = self.get_client() - for img in client.images.list(): + images: List[Image] = client.images.list() + for img in images: for tag in img.tags: if self.image_name() in tag: return img return None - def available(self): + def available(self) -> bool: """True if this container image is locally available.""" - return self.docker_image() != None + return self.docker_image() is not None - def build(self, dest): + def build(self, dest: str): self.write(dest) return self.create_container(dest) @@ -227,7 +241,7 @@ def can_pull(self): return self.pull(self.image_name(), self.docker_tag()) @abc.abstractmethod - def write(self, destination): + def write(self, destination: str) -> None: """Method responsible for writing the Dockerfile and all necessary files to build a container. Args: @@ -239,7 +253,7 @@ def write(self, destination): raise NotImplementedError() @abc.abstractmethod - def image_name(self): + def image_name(self) -> str: """The image name without the tag used to uniquely identify this image. Raises: @@ -248,11 +262,11 @@ def image_name(self): raise NotImplementedError() @abc.abstractmethod - def docker_tag(self): + def docker_tag(self) -> Any: raise NotImplementedError() @abc.abstractmethod - def depends_on(self): + def depends_on(self) -> Any: """Name of the system image this container is build on.""" raise NotImplementedError() diff --git a/emu/containers/emulator_container.py b/emu/containers/emulator_container.py index 22beb31c..c8563e28 100644 --- a/emu/containers/emulator_container.py +++ b/emu/containers/emulator_container.py @@ -11,12 +11,15 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from _typeshed import StrPath import os import shutil +from typing import Any, Dict, Optional import emu from emu.android_release_zip import AndroidReleaseZip from emu.containers.docker_container import DockerContainer +from emu.containers.system_image_container import SystemImageContainer from emu.template_writer import TemplateWriter @@ -30,7 +33,7 @@ class EmulatorContainer(DockerContainer): """ NO_METRICS_MESSAGE = "No metrics are collected when running this container." - def __init__(self, emulator, system_image_container, repository=None, metrics=False, extra=""): + def __init__(self, emulator: StrPath, system_image_container: SystemImageContainer, repository: Optional[str] = None, metrics: bool = False, extra: str = ""): self.emulator_zip = AndroidReleaseZip(emulator) self.system_image_container = system_image_container self.metrics = metrics @@ -38,7 +41,7 @@ def __init__(self, emulator, system_image_container, repository=None, metrics=Fa if type(extra) is list: extra = " ".join(extra) - cpu = system_image_container.image_labels()["ro.product.cpu.abi"] + cpu: str = system_image_container.image_labels()["ro.product.cpu.abi"] self.extra = self._logger_flags(cpu) + " " + extra metrics_msg = EmulatorContainer.NO_METRICS_MESSAGE @@ -46,7 +49,7 @@ def __init__(self, emulator, system_image_container, repository=None, metrics=Fa self.extra += " -metrics-collection" metrics_msg = EmulatorContainer.METRICS_MESSAGE - self.props = system_image_container.image_labels() + self.props: Dict[str, Any] = system_image_container.image_labels() self.props["playstore"] = self.props["qemu.tag"] == "google_apis_playstore" self.props["metrics"] = metrics_msg self.props["emu_build_id"] = self.emulator_zip.build_id() @@ -59,21 +62,22 @@ def __init__(self, emulator, system_image_container, repository=None, metrics=Fa "qemu.short_abi", "ro.product.cpu.abi", ]: - assert expect in self.props, "{} is not in {}".format(expect, self.props) + assert expect in self.props, "{} is not in {}".format( + expect, self.props) super().__init__(repository) - def _logger_flags(self, cpu): + def _logger_flags(self, cpu: str): if "arm" in cpu: return "-logcat *:V -show-kernel" else: return "-shell-serial file:/tmp/android-unknown/kernel.log -logcat-output /tmp/android-unknown/logcat.log" - def write(self, dest): + def write(self, destination: str): # Make sure the destination directory is empty. - self.clean(dest) + self.clean(destination) - writer = TemplateWriter(dest) + writer = TemplateWriter(destination) writer.write_template("avd/Pixel2.ini", self.props) writer.write_template("avd/Pixel2.avd/config.ini", self.props) @@ -84,7 +88,8 @@ def write(self, dest): rename_as="README.MD", ) - writer.write_template("launch-emulator.sh", {"extra": self.extra, "version": emu.__version__}) + writer.write_template("launch-emulator.sh", + {"extra": self.extra, "version": emu.__version__}) writer.write_template("default.pa", {}) writer.write_template( @@ -93,7 +98,7 @@ def write(self, dest): rename_as="Dockerfile", ) - self.emulator_zip.extract(os.path.join(dest, "emu")) + self.emulator_zip.extract(os.path.join(destination, "emu")) def image_name(self): name = "{}-{}-{}".format( diff --git a/emu/containers/system_image_container.py b/emu/containers/system_image_container.py index 2169c2d8..e2c4429d 100644 --- a/emu/containers/system_image_container.py +++ b/emu/containers/system_image_container.py @@ -11,9 +11,11 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from _typeshed import StrPath import logging import os import shutil +from typing import Union from emu.android_release_zip import SystemImageReleaseZip from emu.platform_tools import PlatformTools @@ -23,7 +25,7 @@ class SystemImageContainer(DockerContainer): - def __init__(self, sort, repo="us-docker.pkg.dev/android-emulator-268719/images"): + def __init__(self, sort: Union[SysImgInfo, StrPath], repo: str = "us-docker.pkg.dev/android-emulator-268719/images"): super().__init__(repo) self.system_image_zip = None self.system_image_info = None @@ -34,23 +36,24 @@ def __init__(self, sort, repo="us-docker.pkg.dev/android-emulator-268719/images" self.system_image_zip = SystemImageReleaseZip(sort) assert "ro.build.version.incremental" in self.system_image_zip.props - def _copy_adb_to(self, dest): + def _copy_adb_to(self, dest: str): """Find adb, or download it if needed.""" logging.info("Retrieving platform-tools") tools = PlatformTools() tools.extract_adb(dest) - def write(self, dest): + def write(self, destination: str): # We do not really want to overwrite if the files already exist. # Make sure the destination directory is empty. if self.system_image_zip is None: - self.system_image_zip = SystemImageReleaseZip(self.system_image_info.download(dest)) + self.system_image_zip = SystemImageReleaseZip( + self.system_image_info.download(destination)) - writer = TemplateWriter(dest) - self._copy_adb_to(dest) + writer = TemplateWriter(destination) + self._copy_adb_to(destination) props = self.system_image_zip.props - dest_zip = os.path.basename(self.system_image_zip.copy(dest)) + dest_zip = os.path.basename(self.system_image_zip.copy(destination)) props["system_image_zip"] = dest_zip writer.write_template( "Dockerfile.system_image", @@ -58,12 +61,13 @@ def write(self, dest): rename_as="Dockerfile", ) - def image_name(self): + def image_name(self) -> str: if self.system_image_info: return self.system_image_info.image_name() if self.system_image_zip: return "sys-{}-{}-{}".format( - self.system_image_zip.api(), self.system_image_zip.short_tag(), self.system_image_zip.short_abi() + self.system_image_zip.api(), self.system_image_zip.short_tag( + ), self.system_image_zip.short_abi() ) def docker_tag(self): diff --git a/emu/docker_config.py b/emu/docker_config.py index 67baf231..bb1b2149 100644 --- a/emu/docker_config.py +++ b/emu/docker_config.py @@ -17,10 +17,7 @@ from appdirs import user_config_dir import os -try: - from configparser import ConfigParser -except: - import ConfigParser +from configparser import ConfigParser class DockerConfig(object): @@ -36,29 +33,29 @@ def __init__(self): def collect_metrics(self): return self._cfg_true("metrics") - def set_collect_metrics(self, to_collect): + def set_collect_metrics(self, to_collect: bool): self._set_cfg("metrics", str(to_collect)) def decided_on_metrics(self): return self._has_cfg("metrics") - def accepted_license(self, license): + def accepted_license(self, license: str): return self._cfg_true(license) - def accept_license(self, license): + def accept_license(self, license: str): self._set_cfg(license) - def _cfg_true(self, label): + def _cfg_true(self, label: str): if self._has_cfg(label): return "True" in self.cfg["DEFAULT"][label] return False - def _set_cfg(self, label, state="True"): + def _set_cfg(self, label: str, state: str = "True"): self._load_config() self.cfg["DEFAULT"][label] = state self._save_config() - def _has_cfg(self, label): + def _has_cfg(self, label: str): return label in self.cfg["DEFAULT"] def _save_config(self): diff --git a/emu/emu_docker.py b/emu/emu_docker.py index 633e2284..cbe982af 100644 --- a/emu/emu_docker.py +++ b/emu/emu_docker.py @@ -18,34 +18,52 @@ import itertools import logging import os -import sys import re +import sys +from typing import List import click import colorlog import emu import emu.emu_downloads_menu as emu_downloads_menu -from emu.docker_config import DockerConfig -from emu.cloud_build import cloud_build +from emu.cloud_build import ArgsCreateCloudBuildDistribuition, cloud_build from emu.containers.emulator_container import EmulatorContainer from emu.containers.system_image_container import SystemImageContainer +from emu.docker_config import DockerConfig + + +class ArgsListImages(argparse.Namespace): + arm: bool + + +class ArgsAcceptLicenses(argparse.Namespace): + accept: bool -def list_images(args): +class ArgsCreateDockerImage(argparse.Namespace): + imgzip: str + emuzip: str + + +class ArgsCreateDockerImageInteractive(argparse.Namespace): + pass + + +def list_images(args: ArgsListImages): """Lists all the publicly available system and emlator images.""" emu_downloads_menu.list_all_downloads(args.arm) -def accept_licenses(args): +def accept_licenses(args: ArgsAcceptLicenses): emu_downloads_menu.accept_licenses(args.accept) -def create_cloud_build_distribuition(args): +def create_cloud_build_distribuition(args: ArgsCreateCloudBuildDistribuition): cloud_build(args) -def metrics_config(args): +def metrics_config(args: ArgsCreateDockerImage): cfg = DockerConfig() if args.metrics: cfg.set_collect_metrics(True) @@ -62,25 +80,26 @@ def metrics_config(args): return cfg -def create_docker_image(args): +def create_docker_image(args: ArgsCreateDockerImage): """Create a directory containing all the necessary ingredients to construct a docker image. Returns the created DockerDevice objects. """ cfg = metrics_config(args) - imgzip = [args.imgzip] + imgzip: List[str] = [args.imgzip] if not os.path.exists(imgzip[0]): imgzip = emu_downloads_menu.find_image(imgzip[0]) - emuzip = [args.emuzip] + emuzip: List[str] = [args.emuzip] if emuzip[0] in ["stable", "canary", "all"]: - emuzip = [x.download() for x in emu_downloads_menu.find_emulator(emuzip[0])] + emuzip = [x.download() + for x in emu_downloads_menu.find_emulator(emuzip[0])] elif re.match(r"\d+", emuzip[0]): # We must be looking for a build id logging.info("Treating %s as a build id", emuzip[0]) emuzip = [emu_downloads_menu.download_build(emuzip[0])] - devices = [] + devices: List[EmulatorContainer] = [] logging.info("Using repo %s", args.repo) for (img, emu) in itertools.product(imgzip, emuzip): logging.info("Processing %s, %s", img, emu) @@ -95,7 +114,8 @@ def create_docker_image(args): if args.sys: continue - emu_docker = EmulatorContainer(emu, sys_docker, args.repo, cfg.collect_metrics(), args.extra) + emu_docker = EmulatorContainer( + emu, sys_docker, args.repo, cfg.collect_metrics(), args.extra) emu_docker.build(args.dest) if args.start: @@ -108,7 +128,7 @@ def create_docker_image(args): return devices -def create_docker_image_interactive(args): +def create_docker_image_interactive(args: ArgsCreateDockerImage): """Interactively create a docker image by selecting the desired combination from a menu.""" img = emu_downloads_menu.select_image(args.arm) or sys.exit(1) emulator = emu_downloads_menu.select_emulator() or sys.exit(1) @@ -140,10 +160,12 @@ def main(): """Entry point that parses the argument, and invokes the proper functions.""" parser = argparse.ArgumentParser( - description="List and create emulator docker containers ({}).".format(emu.__version__), + description="List and create emulator docker containers ({}).".format( + emu.__version__), formatter_class=argparse.ArgumentDefaultsHelpFormatter, ) - parser.add_argument("-v", "--verbose", dest="verbose", action="store_true", help="Set verbose logging") + parser.add_argument("-v", "--verbose", dest="verbose", + action="store_true", help="Set verbose logging", type=bool) subparsers = parser.add_subparsers() @@ -155,13 +177,15 @@ def main(): "--arm", action="store_true", help="Display arm images. Note that arm images are not hardware accelerated and are *extremely* slow.", + type=bool ) list_parser.set_defaults(func=list_images) license_parser = subparsers.add_parser( "licenses", help="Lists all licenses and gives you a chance to accept or reject them." ) - license_parser.add_argument("--accept", action="store_true", help="Accept all licensens after displaying them.") + license_parser.add_argument( + "--accept", action="store_true", help="Accept all licensens after displaying them.") license_parser.set_defaults(func=accept_licenses) create_parser = subparsers.add_parser( @@ -192,7 +216,8 @@ def main(): create_parser.add_argument( "--dest", default=os.path.join(os.getcwd(), "src"), help="Destination for the generated docker files" ) - create_parser.add_argument("--tag", default="", help="Docker tag, defaults to the emulator build id") + create_parser.add_argument( + "--tag", default="", help="Docker tag, defaults to the emulator build id") create_parser.add_argument( "--repo", default="us-docker.pkg.dev/android-emulator-268719/images", @@ -212,14 +237,16 @@ def main(): action="store_true", help="When enabled, the emulator will send usage metrics to Google when the container exists gracefully.", ) - create_parser.add_argument("--no-metrics", action="store_true", help="Disables the collection of usage metrics.") + create_parser.add_argument( + "--no-metrics", action="store_true", help="Disables the collection of usage metrics.") create_parser.add_argument( "--start", action="store_true", help="Starts the container after creating it. " "All exposed ports are forwarded, and your private adbkey (if available) is injected but not stored.", ) - create_parser.add_argument("--sys", action="store_true", help="Process system image layer only.") + create_parser.add_argument( + "--sys", action="store_true", help="Process system image layer only.") create_parser.set_defaults(func=create_docker_image) create_inter = subparsers.add_parser( @@ -269,7 +296,8 @@ def main(): dist_parser.add_argument( "--dest", default=os.path.join(os.getcwd(), "src"), help="Destination for the generated docker files" ) - dist_parser.add_argument("--git", action="store_true", help="Create a git commit, and push to destination.") + dist_parser.add_argument("--git", action="store_true", + help="Create a git commit, and push to destination.") dist_parser.add_argument( "--sys", action="store_true", help="Write system image steps, otherwise write emulator steps." ) @@ -291,7 +319,8 @@ def main(): # Configure logger. lvl = logging.DEBUG if args.verbose else logging.WARNING handler = colorlog.StreamHandler() - handler.setFormatter(colorlog.ColoredFormatter("%(log_color)s%(levelname)s:%(message)s")) + handler.setFormatter(colorlog.ColoredFormatter( + "%(log_color)s%(levelname)s:%(message)s")) logging.root = colorlog.getLogger("root") logging.root.addHandler(handler) logging.root.setLevel(lvl) diff --git a/emu/emu_downloads_menu.py b/emu/emu_downloads_menu.py index 70063fd1..8d978839 100644 --- a/emu/emu_downloads_menu.py +++ b/emu/emu_downloads_menu.py @@ -18,25 +18,29 @@ import logging import os import re +from typing import Dict, List, Optional import xml.etree.ElementTree as ET -import zipfile import click import requests from consolemenu import SelectionMenu -from emu.utils import download -from emu.docker_config import DockerConfig +from .utils import download +from .docker_config import DockerConfig + +ANDROID_REPOSITORY = os.environ.get( + "ANDROID_REPOSITORY", "https://dl.google.com/") SYSIMG_REPOS = [ - "https://dl.google.com/android/repository/sys-img/android/sys-img2-1.xml", - "https://dl.google.com/android/repository/sys-img/google_apis/sys-img2-1.xml", - "https://dl.google.com/android/repository/sys-img/google_apis_playstore/sys-img2-1.xml", - "https://dl.google.com/android/repository/sys-img/android-tv/sys-img2-1.xml", + f"{ANDROID_REPOSITORY}/android/repository/sys-img/android/sys-img2-1.xml", + f"{ANDROID_REPOSITORY}/android/repository/sys-img/google_apis/sys-img2-1.xml", + f"{ANDROID_REPOSITORY}/android/repository/sys-img/google_apis_playstore/sys-img2-1.xml", + f"{ANDROID_REPOSITORY}/android/repository/sys-img/android-tv/sys-img2-1.xml", ] -EMU_REPOS = ["https://dl.google.com/android/repository/repository2-1.xml"] +EMU_REPOS = [f"{ANDROID_REPOSITORY}/android/repository/repository2-1.xml"] -CHANNEL_MAPPING = {"channel-0": "stable", "channel-1": "beta", "channel-2": "dev", "channel-3": "canary"} +CHANNEL_MAPPING = {"channel-0": "stable", "channel-1": "beta", + "channel-2": "dev", "channel-3": "canary"} API_LETTER_MAPPING = { "10": "G", @@ -64,10 +68,12 @@ class License(object): """Represents a license.""" + text: str + name: str - def __init__(self, license): + def __init__(self, license: ET.Element): self.name = license.attrib["id"] - self.text = license.text + self.text = license.text if license.text is not None else "" self.cfg = DockerConfig() def accept(self): @@ -92,26 +98,33 @@ def __str__(self): def __hash__(self): return hash(self.name) - def __eq__(self, other): - return self.__class__ == other.__class__ and self.name == other.name + def __eq__(self, other: object): + if not isinstance(other, self.__class__): + return False + + return self.name == other.name class LicensedObject(object): """A dowloadable object for which a license needs to be accepted.""" - def __init__(self, pkg, licenses): - self.license = licenses[pkg.find("uses-license").attrib["ref"]] + def __init__(self, pkg: ET.Element, licenses: Dict[str, License]): + if lic := pkg.find("uses-license"): + self.license = licenses[lic.attrib["ref"]] - def download(self, url, dest): + def download(self, url: str, dest: str): """"Downloads the released pacakage to the dest.""" if self.license.accept(): return download(url, dest) + raise Exception("License not accepted") + class SysImgInfo(LicensedObject): """Provides information about a released system image.""" - SHORT_MAP = {"armeabi-v7a": "a32", "arm64-v8a": "a64", "x86_64": "x64", "x86": "x86"} + SHORT_MAP = {"armeabi-v7a": "a32", + "arm64-v8a": "a64", "x86_64": "x64", "x86": "x86"} SHORT_TAG = { "android": "aosp", "google_apis": "google", @@ -120,140 +133,173 @@ class SysImgInfo(LicensedObject): "android-tv": "tv", } - def __init__(self, pkg, licenses): + def __init__(self, pkg: ET.Element, licenses: Dict[str, License]): super(SysImgInfo, self).__init__(pkg, licenses) details = pkg.find("type-details") - self.api = details.find("api-level").text + + if not details: + raise Exception + + if apilevel := details.find("api-level"): + self.api = apilevel.text codename = details.find("codename") if codename is None: - if self.api in API_LETTER_MAPPING: + if self.api and self.api in API_LETTER_MAPPING: self.letter = API_LETTER_MAPPING[self.api] else: self.letter = "A" # A indicates unknown code. else: self.letter = codename.text - self.tag = details.find("tag").find("id").text + if (tag := details.find("tag")) and (tag_id := tag.find("id")): + self.tag = tag_id.text if self.tag == "default": self.tag = "android" - self.abi = details.find("abi").text + if abi := details.find("abi"): + self.abi = abi.text # prefer a url for a Linux host in case there are multiple url_element = pkg.find(".//archive[host-os='linux']/complete/url") # fallback is to pick the first url if url_element is None: url_element = pkg.find(".//url") + assert url_element is not None self.zip = url_element.text - self.url = "https://dl.google.com/android/repository/sys-img/%s/%s" % (self.tag, self.zip) + self.url = f"{ANDROID_REPOSITORY}/android/repository/sys-img/{self.tag}/{self.zip}" def short_tag(self): + if self.tag is None: + raise Exception + return self.SHORT_TAG[self.tag] def short_abi(self): + if self.abi is None: + raise Exception + return self.SHORT_MAP[self.abi] - def image_name(self): + def image_name(self) -> str: return "sys-{}-{}-{}".format(self.api, self.short_tag(), self.short_abi()) def download_name(self): return "sys-img-{}-{}-{}-{}.zip".format(self.tag, self.api, self.letter, self.abi) - def download(self, dest=None): + def download(self, dest: Optional[str] = None): dest = os.path.join(dest or os.getcwd(), self.download_name()) - print("Downloading system image: {} {} {} {} to {}".format(self.tag, self.api, self.letter, self.abi, dest)) + print("Downloading system image: {} {} {} {} to {}".format( + self.tag, self.api, self.letter, self.abi, dest)) return super(SysImgInfo, self).download(self.url, dest) def __str__(self): - return "{} {} {}".format(self.letter, self.tag, self.abi) + return f"{self.letter} {self.tag} {self.abi}" class EmuInfo(LicensedObject): """Provides information about a released emulator.""" - def __init__(self, pkg, licenses): + def __init__(self, pkg: ET.Element, licenses: Dict[str, License]): super(EmuInfo, self).__init__(pkg, licenses) rev = pkg.find("revision") - rev_major = rev.find("major").text - rev_minor = rev.find("minor").text - rev_micro = rev.find("micro").text + if rev is None: + raise Exception + + rev_major = major.text if (major := rev.find("major")) else "0" + rev_minor = minor.text if (minor := rev.find("minor")) else "0" + rev_micro = micro.text if (micro := rev.find("micro")) else "0" archives = pkg.find("archives") channel = pkg.find("channelRef") - self.channel = CHANNEL_MAPPING[channel.attrib["ref"]] + self.channel = CHANNEL_MAPPING[channel.attrib["ref"] + ] if channel is not None else None self.version = "%s.%s.%s" % (rev_major, rev_minor, rev_micro) - self.urls = {} + self.urls: Dict[str, str] = {} - for archive in archives: - url = archive.find(".//url").text - hostos = archive.find("host-os").text - self.urls[hostos] = "https://dl.google.com/android/repository/%s" % url + if archives is not None: + for archive in archives: + url = archive_url.text if ( + archive_url := archive.find(".//url")) else None + hostos = host_os.text if ( + host_os := archive.find("host-os")) else None + if hostos is not None: + self.urls[hostos] = f"{ANDROID_REPOSITORY}/android/repository/{url}" def download_name(self): return "emulator-{}.zip".format(self.version) - def download(self, hostos="linux", dest=None): + def download(self, hostos: str = "linux", dest: Optional[str] = None): """"Downloads the released pacakage for the given os to the dest.""" dest = dest or os.path.join(os.getcwd(), self.download_name()) - print("Downloading emulator: {} {} to {}".format(self.channel, self.version, dest)) + print("Downloading emulator: {} {} to {}".format( + self.channel, self.version, dest)) return super(EmuInfo, self).download(self.urls[hostos], dest) def __str__(self): return "{} {}".format(self.channel, self.version) -def get_images_info(arm=False): +def get_images_info(arm: bool = False): """Gets all the publicly available system images from the Android Image Repos. Returns a list of AndroidSystemImages that were found and (hopefully) can boot.""" - xml = [] + _xml: List[bytes] = [] for url in SYSIMG_REPOS: response = requests.get(url) if response.status_code == 200: - xml.append(response.content) + _xml.append(response.content) - licenses = [License(p) for x in xml for p in ET.fromstring(x).findall("license")] + licenses = [License(p) + for x in _xml for p in ET.fromstring(x).findall("license")] licenses = dict([(x.name, x) for x in [y for y in licenses]]) - xml = [ET.fromstring(x).findall("remotePackage") for x in xml] + xml = [ET.fromstring(x).findall("remotePackage") for x in _xml] # Flatten the list of lists into a system image objects. infos = [SysImgInfo(item, licenses) for sublist in xml for item in sublist] # Filter only for intel images that we know that work - x86_64_imgs = [info for info in infos if info.abi == "x86_64" and info.letter >= MIN_REL_X64] - x86_imgs = [info for info in infos if info.abi == "x86" and info.letter >= MIN_REL_I386] + x86_64_imgs = [info for info in infos if info.abi == + "x86_64" and info.letter and info.letter >= MIN_REL_X64] + x86_imgs = [info for info in infos if info.abi == + "x86" and info.letter and info.letter >= MIN_REL_I386] slow = [] if arm: - slow = [info for info in infos if info.abi.startswith("arm")] - all_imgs = sorted(x86_64_imgs + x86_imgs + slow, key=lambda x: x.api + x.tag) + slow = [ + info for info in infos if info.abi and info.abi.startswith("arm")] + all_imgs = sorted(x86_64_imgs + x86_imgs + slow, + key=lambda x: (x.api or "") + (x.tag or "")) # Filter out windows/darwin images. return [i for i in all_imgs if "windows" not in i.url and "darwin" not in i.url] -def find_image(regexpr): +def find_image(regexpr: str): reg = re.compile(regexpr) all_images = get_images_info(True) matches = [img for img in all_images if reg.match(str(img))] logging.info( - "Found %s matching images: %s from %s", regexpr, [str(x) for x in matches], [str(x) for x in all_images] + "Found %s matching images: %s from %s", regexpr, [str(x) for x in matches], [ + str(x) for x in all_images] ) if not matches: raise Exception( - "No system image found matching {}. Run the list command to list available images".format(regexpr) + "No system image found matching {}. Run the list command to list available images".format( + regexpr) ) return matches -def find_emulator(channel): +def find_emulator(channel: str): """Displayes an interactive menu to select a released emulator binary. Returns a ImuInfo object with the choice or None if the user aborts.""" - emu_infos = [x for x in get_emus_info() if "linux" in x.urls and (channel == "all" or x.channel == channel)] - logging.info("Found %s matching images: %s", channel, [str(x) for x in emu_infos]) + emu_infos = [x for x in get_emus_info() if "linux" in x.urls and ( + channel == "all" or x.channel == channel)] + logging.info("Found %s matching images: %s", + channel, [str(x) for x in emu_infos]) if not emu_infos: raise Exception("No emulator found in channel {}".format(channel)) return emu_infos @@ -263,21 +309,23 @@ def get_emus_info(): """Gets all the publicly available emulator builds. Returns a list of EmuInfo items that were found.""" - xml = [] + _xml: List[bytes] = [] for url in EMU_REPOS: response = requests.get(url) if response.status_code == 200: - xml.append(response.content) + _xml.append(response.content) - licenses = [License(p) for x in xml for p in ET.fromstring(x).findall("license")] + licenses = [License(p) + for x in _xml for p in ET.fromstring(x).findall("license")] licenses = dict([(x.name, x) for x in [y for y in licenses]]) - xml = [[p for p in ET.fromstring(x).findall("remotePackage") if "emulator" == p.attrib["path"]] for x in xml] + xml = [[p for p in ET.fromstring(x).findall( + "remotePackage") if "emulator" == p.attrib["path"]] for x in _xml] # Flatten the list of lists into a system image objects. infos = [EmuInfo(item, licenses) for sublist in xml for item in sublist] return infos -def select_image(arm): +def select_image(arm: bool): """Displayes an interactive menu to select a released system image. Returns a SysImgInfo object with the choice or None if the user aborts.""" @@ -285,7 +333,9 @@ def select_image(arm): display = [ "{} {} {} ({})".format(img_info.api, img_info.letter, img_info.tag, img_info.abi) for img_info in img_infos ] - selection = SelectionMenu.get_selection(display, title="Select the system image you wish to use:") + selection: int = SelectionMenu.get_selection( + display, title="Select the system image you wish to use:") + return img_infos[selection] if selection < len(img_infos) else None @@ -294,40 +344,47 @@ def select_emulator(): Returns a ImuInfo object with the choice or None if the user aborts.""" emu_infos = [x for x in get_emus_info() if "linux" in x.urls] - display = ["EMU {} {}".format(emu_info.channel, emu_info.version) for emu_info in emu_infos] - selection = SelectionMenu.get_selection(display, title="Select the emulator you wish to use:") + display = ["EMU {} {}".format( + emu_info.channel, emu_info.version) for emu_info in emu_infos] + selection: int = SelectionMenu.get_selection( + display, title="Select the emulator you wish to use:") return emu_infos[selection] if selection < len(emu_infos) else None -def list_all_downloads(arm): +def list_all_downloads(arm: bool): """Lists all available downloads that can be used to construct a Docker image.""" img_infos = get_images_info(arm) emu_infos = get_emus_info() for img_info in img_infos: - print("SYSIMG {} {} {} {} {}".format(img_info.letter, img_info.tag, img_info.abi, img_info.api, img_info.url)) + print("SYSIMG {} {} {} {} {}".format(img_info.letter, + img_info.tag, img_info.abi, img_info.api, img_info.url)) for emu_info in emu_infos: for (hostos, url) in list(emu_info.urls.items()): - print("EMU {} {} {} {}".format(emu_info.channel, emu_info.version, hostos, url)) + print("EMU {} {} {} {}".format( + emu_info.channel, emu_info.version, hostos, url)) -def download_build(build_id, dest=None): +def download_build(build_id: str, dest: Optional[str] = None): """Download a public build with the given build id.""" - dest = dest or os.path.join(os.getcwd(), "sdk-repo-linux-emulator-{}.zip".format(build_id)) + dest = dest or os.path.join( + os.getcwd(), "sdk-repo-linux-emulator-{}.zip".format(build_id)) uri = ( "https://ci.android.com/builds/submitted/{0}/sdk_tools_linux/latest/raw/sdk-repo-linux-emulator-{0}.zip".format( build_id ) ) print("Downloading emulator build {} ({}) to {}".format(build_id, uri, dest)) - logging.warning("Downloading build from ci server, these builds might not have been tested extensively.") + logging.warning( + "Downloading build from ci server, these builds might not have been tested extensively.") download(uri, dest) return dest -def accept_licenses(force_accept): - licenses = set([x.license for x in get_emus_info()] + [x.license for x in get_images_info()]) +def accept_licenses(force_accept: bool): + licenses = set([x.license for x in get_emus_info()] + + [x.license for x in get_images_info()]) to_accept = [x for x in licenses if not x.is_accepted()] diff --git a/emu/platform_tools.py b/emu/platform_tools.py index 4bd3a396..f9143eb7 100644 --- a/emu/platform_tools.py +++ b/emu/platform_tools.py @@ -12,27 +12,29 @@ # See the License for the specific language governing permissions and # limitations under the License. import os +from typing import Optional import zipfile from emu.utils import download # Platform tools, needed to get adb. -PLATFORM_TOOLS_URL = "https://dl.google.com/android/repository/platform-tools_r29.0.5-linux.zip" +PLATFORM_TOOLS_URL = f'{os.environ.get("ANDROID_REPOSITORY", "https://dl.google.com")}/android/repository/platform-tools_r29.0.5-linux.zip' class PlatformTools(object): """The platform tools zip file. It will be downloaded on demand.""" - def __init__(self, fname=None): + def __init__(self, fname: Optional[str] = None): self.platform = fname - def extract_adb(self, dest): + def extract_adb(self, dest: str): if not self.platform: self.platform = self.download() with zipfile.ZipFile(self.platform, "r") as plzip: plzip.extract("platform-tools/adb", dest) - def download(self, dest=None): - dest = dest or os.path.join(os.getcwd(), "platform-tools-latest-linux.zip") + def download(self, dest: Optional[str] = None): + dest = dest or os.path.join( + os.getcwd(), "platform-tools-latest-linux.zip") print("Downloading platform tools to {}".format(dest)) return download(PLATFORM_TOOLS_URL, dest) diff --git a/emu/process.py b/emu/process.py index d9216a31..79edf470 100644 --- a/emu/process.py +++ b/emu/process.py @@ -15,17 +15,14 @@ # limitations under the License. import os import logging -import platform +from typing import IO, Any, List, Mapping, Optional, Tuple, Union -try: - from queue import Queue -except ImportError: - from Queue import Queue +from queue import Queue import subprocess from threading import Thread -def _reader(pipe, queue): +def _reader(pipe: IO[bytes], queue: Queue[Union[Tuple[IO[bytes], bytes], None]]): try: with pipe: for line in iter(pipe.readline, b""): @@ -34,9 +31,9 @@ def _reader(pipe, queue): queue.put(None) -def log_std_out(proc): +def log_std_out(proc: subprocess.Popen[bytes]): """Logs the output of the given process.""" - q = Queue() + q: Queue[Union[Tuple[IO[bytes], bytes], None]] = Queue() Thread(target=_reader, args=[proc.stdout, q]).start() Thread(target=_reader, args=[proc.stderr, q]).start() for _ in range(2): @@ -47,16 +44,18 @@ def log_std_out(proc): pass -def run(cmd, cwd=None, extra_env=None): +def run(cmd: List[Any], cwd: Optional[str] = None, extra_env: Optional[Mapping[str, Any]] = None): if cwd: cwd = os.path.abspath(cwd) cmd = [str(c) for c in cmd] logging.info("Running: %s in %s", " ".join(cmd), cwd) - proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=cwd, env=extra_env) + proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, + stderr=subprocess.PIPE, cwd=cwd, env=extra_env) log_std_out(proc) proc.wait() if proc.returncode != 0: - raise Exception("Failed to run %s - %s" % (" ".join(cmd), proc.returncode)) + raise Exception("Failed to run %s - %s" % + (" ".join(cmd), proc.returncode)) diff --git a/emu/template_writer.py b/emu/template_writer.py index d7f2b3ce..ebf88a7b 100644 --- a/emu/template_writer.py +++ b/emu/template_writer.py @@ -11,29 +11,23 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import errno import logging import os +from typing import Any, Dict, Optional -import docker from jinja2 import Environment, PackageLoader -from packaging import version from emu.utils import mkdir_p -import emu -from emu.android_release_zip import AndroidReleaseZip -from emu.platform_tools import PlatformTools - -class TemplateWriter(object): +class TemplateWriter: """A Template writer uses jinja to fill in templates. All the templates should live in the ./emu/templates directory. """ - def __init__(self, out_dir): + def __init__(self, out_dir: str): """Creates a template writer that writes templates to the out_dir The out directory will be created if needed. @@ -41,19 +35,19 @@ def __init__(self, out_dir): self.env = Environment(loader=PackageLoader("emu", "templates")) self.dest = out_dir - def _jinja_safe_dict(self, props): + def _jinja_safe_dict(self, props: Dict[str, Any]): """Replace all the . with _ in the keys.""" - normalized = {} + normalized: Dict[str, Any] = {} for k, v in props.items(): normalized[k.replace(".", "_")] = v return normalized - def write_template(self, template_file, template_dict, rename_as=None): + def write_template(self, template_file: str, template_dict: Dict[str, Any], rename_as: Optional[str] = None): """Fill out the given template, writing it to the destination directory.""" dest_name = rename_as if rename_as else template_file return self._write_template_to(template_file, os.path.join(self.dest, dest_name), template_dict) - def _write_template_to(self, tmpl_file, dest_file, template_dict): + def _write_template_to(self, tmpl_file: str, dest_file: str, template_dict: Dict[str, Any]): """Loads the the given template, writing it to the dest_file Note: the template will be written {dest_dir}/{tmpl_file}, @@ -62,6 +56,7 @@ def _write_template_to(self, tmpl_file, dest_file, template_dict): template = self.env.get_template(tmpl_file) safe_dict = self._jinja_safe_dict(template_dict) mkdir_p(os.path.dirname(dest_file)) - logging.info("Writing: %s -> %s with %s", tmpl_file, dest_file, safe_dict) + logging.info("Writing: %s -> %s with %s", + tmpl_file, dest_file, safe_dict) with open(dest_file, "wb") as dfile: dfile.write(template.render(safe_dict).encode("utf-8")) diff --git a/emu/utils.py b/emu/utils.py index 90de6622..7f9d6283 100644 --- a/emu/utils.py +++ b/emu/utils.py @@ -38,13 +38,13 @@ } -def mkdir_p(path): +def mkdir_p(path: str): """Make directories recursively if path not exists.""" if not os.path.exists(path): os.makedirs(path) -def download(url, dest): +def download(url: str, dest: str): """Downloads the given url to the given destination with a progress bar. This function will immediately return if the file already exists. @@ -63,7 +63,7 @@ def download(url, dest): return dest -def api_codename(api): +def api_codename(api: str): """First letter of the desert, if any.""" if api in API_LETTER_MAPPING: return API_LETTER_MAPPING[api]