diff --git a/src/gcpath/cache.py b/src/gcpath/cache.py index 95cc46b..492cd18 100644 --- a/src/gcpath/cache.py +++ b/src/gcpath/cache.py @@ -19,7 +19,7 @@ CACHE_DIR = Path.home() / ".gcpath" CACHE_FILE = CACHE_DIR / "cache.json" -CACHE_VERSION = 1 +CACHE_VERSION = 2 DEFAULT_CACHE_TTL_HOURS = 72 @@ -38,7 +38,9 @@ class CacheInfo: scope: Optional[str] = None -def _hierarchy_to_dict(hierarchy: Hierarchy, scope: Optional[str] = None) -> Dict[str, Any]: +def _hierarchy_to_dict( + hierarchy: Hierarchy, scope: Optional[str] = None +) -> Dict[str, Any]: """Serializes the Hierarchy object to a dictionary.""" organizations_data = [] @@ -53,6 +55,8 @@ def _hierarchy_to_dict(hierarchy: Hierarchy, scope: Optional[str] = None) -> Dic "display_name": project.display_name, "parent": project.parent, "folder_name": project.folder.name if project.folder else None, + "labels": project.labels, + "tags": project.tags, } if project.organization: @@ -76,6 +80,8 @@ def _hierarchy_to_dict(hierarchy: Hierarchy, scope: Optional[str] = None) -> Dic "display_name": folder.display_name, "ancestors": folder.ancestors, "parent": folder.parent, + "labels": folder.labels, + "tags": folder.tags, } for name, folder in org_node.folders.items() } @@ -101,7 +107,7 @@ def _dict_to_hierarchy(data: Dict[str, Any]) -> Optional[Hierarchy]: """Deserializes a dictionary to a Hierarchy object.""" if data.get("version") != CACHE_VERSION: logger.warning( - f"Cache version mismatch (expected {CACHE_VERSION}, got {data.get('version')}). Ignoring cache." + "Cache version mismatch (expected %s). Ignoring cache.", CACHE_VERSION ) return None @@ -130,6 +136,8 @@ def _dict_to_hierarchy(data: Dict[str, Any]) -> Optional[Hierarchy]: ancestors=folder_data["ancestors"], parent=folder_data["parent"], organization=node, + labels=folder_data.get("labels", {}), + tags=folder_data.get("tags", {}), ) node.folders[folder_name] = folder @@ -145,6 +153,8 @@ def _dict_to_hierarchy(data: Dict[str, Any]) -> Optional[Hierarchy]: parent=p_data["parent"], organization=node, folder=parent_folder, + labels=p_data.get("labels", {}), + tags=p_data.get("tags", {}), ) projects.append(project) @@ -157,6 +167,8 @@ def _dict_to_hierarchy(data: Dict[str, Any]) -> Optional[Hierarchy]: parent=p_data["parent"], organization=None, folder=None, + labels=p_data.get("labels", {}), + tags=p_data.get("tags", {}), ) projects.append(project) @@ -257,9 +269,7 @@ def get_cache_info( if timestamp_str: try: cached_time = datetime.fromisoformat(timestamp_str) - age_seconds = ( - datetime.now(timezone.utc) - cached_time - ).total_seconds() + age_seconds = (datetime.now(timezone.utc) - cached_time).total_seconds() except (ValueError, TypeError): pass diff --git a/src/gcpath/cli.py b/src/gcpath/cli.py index 54afbfd..bfad247 100644 --- a/src/gcpath/cli.py +++ b/src/gcpath/cli.py @@ -58,7 +58,11 @@ _RESOURCE_PREFIX_PROJECTS = "projects/" _RESOURCE_PREFIX_FOLDERS = "folders/" _RESOURCE_PREFIX_ORGS = "organizations/" -_RESOURCE_PREFIXES = (_RESOURCE_PREFIX_ORGS, _RESOURCE_PREFIX_FOLDERS, _RESOURCE_PREFIX_PROJECTS) +_RESOURCE_PREFIXES = ( + _RESOURCE_PREFIX_ORGS, + _RESOURCE_PREFIX_FOLDERS, + _RESOURCE_PREFIX_PROJECTS, +) _REFRESH_HELP = "Force a refresh of the cache from the GCP API" _VALID_TYPE_FILTERS = ("folder", "project", "organization") @@ -82,9 +86,45 @@ def _matches_type( return get_resource_type(obj) == type_filter +def _parse_label_filter(label_str: str) -> tuple: + """Parse a label filter string 'key=value' into (key, value) tuple.""" + if "=" not in label_str: + return (label_str, None) + key, _, value = label_str.partition("=") + return (key, value) + + +def _matches_metadata( + obj: Union[OrganizationNode, Folder, Project], + filters: List[str], + attr: str, +) -> bool: + """Check if a resource matches ALL filters (ANDed) for a given metadata attribute.""" + if not filters: + return True + metadata = getattr(obj, attr, {}) + for f in filters: + key, value = _parse_label_filter(f) + if value is None: + if key not in metadata: + return False + elif metadata.get(key) != value: + return False + return True + + +def _format_metadata(obj: Union[OrganizationNode, Folder, Project], attr: str) -> str: + """Format a metadata dict attribute as comma-separated key=value string.""" + metadata = getattr(obj, attr, {}) + if not metadata: + return "" + return ", ".join(f"{k}={v}" for k, v in sorted(metadata.items())) + + @dataclass class _ScopeResult: """Result of resolving a resource scope argument.""" + target_resource_name: Optional[str] target_org_name: Optional[str] filter_orgs: Optional[List[str]] @@ -127,6 +167,7 @@ def _resolve_scope( filter_orgs=filter_orgs, ) + app = typer.Typer( name="gcpath", help="Google Cloud Platform resource hierarchy utility", @@ -210,7 +251,9 @@ def main( gcpath - Google Cloud Platform resource hierarchy utility """ if json_output and yaml_output: - error_console.print("[red]Error:[/red] --json and --yaml are mutually exclusive.") + error_console.print( + "[red]Error:[/red] --json and --yaml are mutually exclusive." + ) raise typer.Exit(code=1) ctx.ensure_object(dict) @@ -363,7 +406,9 @@ def _try_read_cache( info = get_cache_info() age_str = _format_age(info.age_seconds) if info.age_seconds else "unknown" - error_console.print(f"[dim]Using cached data ({age_str} ago). Use -F to refresh.[/dim]") + error_console.print( + f"[dim]Using cached data ({age_str} ago). Use -F to refresh.[/dim]" + ) # Apply org filter to cached data if filter_orgs: @@ -381,6 +426,8 @@ def _load_hierarchy( recursive: bool, force_refresh: bool, filter_orgs: Optional[List[str]] = None, + include_labels: bool = False, + include_tags: bool = False, ) -> Hierarchy: """Helper to load hierarchy with cache orchestration. @@ -409,6 +456,8 @@ def _load_hierarchy( via_resource_manager=not ctx.obj["use_asset_api"], scope_resource=scope_resource, recursive=effective_recursive, + include_labels=include_labels, + include_tags=include_tags, ) if is_cacheable: @@ -428,6 +477,93 @@ class _HierarchyCommandContext: target_path: Optional[str] +@dataclass +class _ParsedResource: + """Result of parsing a resource argument for tree-like commands.""" + + target_resource_name: Optional[str] + target_org_name: Optional[str] + target_path: Optional[str] + + +def _parse_resource_arg(effective_resource: str, command_name: str) -> _ParsedResource: + """Parse a resource argument, resolving its path and org name. + + Raises typer.Exit(code=1) if the resource is a project. + """ + if effective_resource.startswith(_RESOURCE_PREFIX_PROJECTS): + rprint( + f"[red]Error:[/red] '{command_name}' command does not support starting " + "from a project (projects are leaf nodes)." + ) + raise typer.Exit(code=1) + + target_org_name = None + target_resource_name = None + target_path = None + + try: + target_path = _clean_resolve_path( + Hierarchy.resolve_ancestry(effective_resource) + ) + if target_path.startswith("//"): + path_parts = target_path[2:].split("/") + if path_parts: + target_org_name = unquote(path_parts[0]) + + if effective_resource.startswith( + (_RESOURCE_PREFIX_FOLDERS, _RESOURCE_PREFIX_ORGS) + ): + target_resource_name = effective_resource + except Exception: + if effective_resource.startswith("//"): + target_path = effective_resource + else: + raise + + return _ParsedResource(target_resource_name, target_org_name, target_path) + + +def _find_target_nodes( + hierarchy: Hierarchy, + target_resource_name: str, + target_path: Optional[str], +) -> List[Union[OrganizationNode, Folder]]: + """Find nodes matching target_resource_name in the hierarchy. + + Creates a synthetic folder if the target folder is not found but the + hierarchy has organizations. + """ + if target_resource_name.startswith(_RESOURCE_PREFIX_ORGS): + for o in hierarchy.organizations: + if o.organization.name == target_resource_name: + return [o] + return [] + + # Folder lookup + for o in hierarchy.organizations: + if target_resource_name in o.folders: + return [o.folders[target_resource_name]] + + # Create synthetic folder from resolved path + if not target_path or not hierarchy.organizations: + return [] + + path_parts = target_path[2:].split("/") if target_path.startswith("//") else [] + display_name = path_parts[-1] if path_parts else target_resource_name.split("/")[-1] + + org_node = hierarchy.organizations[0] + synthetic_folder = Folder( + name=target_resource_name, + display_name=display_name, + ancestors=[target_resource_name, org_node.organization.name], + organization=org_node, + parent=org_node.organization.name, + ) + org_node.folders[target_resource_name] = synthetic_folder + return [synthetic_folder] + + def _prepare_hierarchy_command( ctx: typer.Context, command_name: str, @@ -435,17 +571,14 @@ def _prepare_hierarchy_command( level: Optional[int], yes: bool, force_refresh: bool, + include_labels: bool = False, + include_tags: bool = False, ) -> Optional[_HierarchyCommandContext]: """Shared setup for tree-like commands (tree, diagram). - Handles confirmation prompting, resource argument parsing, hierarchy loading, - nodes_to_process construction (including synthetic folders), and - projects_by_parent building. - Returns None if the user declines the confirmation prompt. Raises typer.Exit(code=1) for invalid or not-found resources. """ - # Inject entrypoint when no explicit resource is given ep = ctx.obj.get("entrypoint") effective_resource = resource or ep @@ -453,131 +586,46 @@ def _prepare_hierarchy_command( if not yes and effective_resource is None: cache_info = get_cache_info() if not cache_info.fresh and (level is None or level >= 4): - confirm = typer.confirm( + if not typer.confirm( "This will load all folders and projects in the hierarchy, " "which may take a long time. Continue?" - ) - if not confirm: + ): return None # Parse resource argument - target_org_name = None - target_resource_name = None - target_path = None - + parsed = _ParsedResource(None, None, None) if effective_resource: - logger.debug(f"{command_name} command: processing resource argument {effective_resource}") - if effective_resource.startswith(_RESOURCE_PREFIX_PROJECTS): - rprint( - f"[red]Error:[/red] '{command_name}' command does not support starting " - "from a project (projects are leaf nodes)." - ) - raise typer.Exit(code=1) - - try: - target_path = _clean_resolve_path( - Hierarchy.resolve_ancestry(effective_resource) - ) - if target_path.startswith("//"): - path_parts = target_path[2:].split("/") - if path_parts: - target_org_name = unquote(path_parts[0]) - - if effective_resource.startswith(_RESOURCE_PREFIX_FOLDERS) or effective_resource.startswith( - _RESOURCE_PREFIX_ORGS - ): - target_resource_name = effective_resource - except Exception: - if effective_resource.startswith("//"): - target_path = effective_resource - else: - raise + parsed = _parse_resource_arg(effective_resource, command_name) # Skip org filtering when using entrypoint without explicit resource - if not resource and ep and target_org_name: + if not resource and ep and parsed.target_org_name: filter_orgs = None else: - filter_orgs = [target_org_name] if target_org_name else None - logger.debug( - f"{command_name}: loading hierarchy for resource='{resource}', filter_orgs={filter_orgs}" - ) + filter_orgs = [parsed.target_org_name] if parsed.target_org_name else None hierarchy = _load_hierarchy( ctx, - scope_resource=target_resource_name, + scope_resource=parsed.target_resource_name, recursive=True, force_refresh=force_refresh, filter_orgs=filter_orgs, - ) - - logger.debug( - f"{command_name}: hierarchy loaded with {len(hierarchy.organizations)} orgs, " - f"{len(hierarchy.projects)} projects, {len(hierarchy.folders)} folders" + include_labels=include_labels, + include_tags=include_tags, ) # Build nodes_to_process - nodes_to_process: List[Union[OrganizationNode, Folder]] = [] - if target_resource_name: - logger.debug( - f"{command_name} command: looking for target resource {target_resource_name}" + if parsed.target_resource_name: + nodes_to_process = _find_target_nodes( + hierarchy, + parsed.target_resource_name, + parsed.target_path, ) - if target_resource_name.startswith(_RESOURCE_PREFIX_ORGS): - for o in hierarchy.organizations: - if o.organization.name == target_resource_name: - logger.debug(f"{command_name} command: found target organization") - nodes_to_process = [o] - break - elif target_resource_name.startswith(_RESOURCE_PREFIX_FOLDERS): - for o in hierarchy.organizations: - if target_resource_name in o.folders: - logger.debug( - f"{command_name} command: found target folder in organization" - ) - nodes_to_process = [o.folders[target_resource_name]] - break - - # If not found in loaded hierarchy, create a synthetic folder from resolved path - if not nodes_to_process and target_path and hierarchy.organizations: - logger.debug( - f"{command_name} command: creating synthetic folder node for scope" - ) - path_parts = ( - target_path[2:].split("/") - if target_path.startswith("//") - else [] - ) - display_name = ( - path_parts[-1] - if path_parts - else target_resource_name.split("/")[-1] - ) - - org_node = hierarchy.organizations[0] - synthetic_folder = Folder( - name=target_resource_name, - display_name=display_name, - ancestors=[target_resource_name, org_node.organization.name], - organization=org_node, - parent=org_node.organization.name, - ) - org_node.folders[target_resource_name] = synthetic_folder - nodes_to_process = [synthetic_folder] - logger.debug( - f"{command_name} command: created synthetic folder {synthetic_folder.name}" - ) - if not nodes_to_process: - logger.warning( - f"{command_name} command: target resource '{target_resource_name}' not found" - ) rprint( - f"[red]Error:[/red] Target resource '{target_resource_name}' not found." + f"[red]Error:[/red] Target resource '{parsed.target_resource_name}' not found." ) raise typer.Exit(code=1) else: - logger.debug( - f"{command_name} command: processing all {len(hierarchy.organizations)} organizations" - ) nodes_to_process = list(hierarchy.organizations) # Build projects_by_parent mapping @@ -589,9 +637,152 @@ def _prepare_hierarchy_command( hierarchy=hierarchy, nodes_to_process=nodes_to_process, projects_by_parent=projects_by_parent, - target_resource_name=target_resource_name, - target_path=target_path, + target_resource_name=parsed.target_resource_name, + target_path=parsed.target_path, + ) + + +def _handle_empty_hierarchy(dumper) -> None: + """Display message when no organizations or projects are found.""" + if dumper: + print(dumper([])) + return + + import google.auth + + account_msg = "" + try: + credentials, _ = google.auth.default() + if hasattr(credentials, "account") and credentials.account: + if credentials.account.endswith("@gmail.com"): + account_msg = f" (Account: {credentials.account})" + except Exception: + pass + + rprint( + f"[yellow]No organizations or projects found accessible to your account{account_msg}.[/yellow]" + ) + if not account_msg: + rprint( + "[dim]Hint: You might not have access to any organizations. " + "Projects without organizations are shown with //_ prefix.[/dim]" + ) + + +def _resolve_target_path_prefix(target_resource_name: Optional[str]) -> str: + """Resolve the display path prefix for a target resource.""" + if not target_resource_name: + return "" + try: + return _clean_resolve_path(Hierarchy.resolve_ancestry(target_resource_name)) + except Exception as e: + logger.warning(f"Could not resolve target path: {e}") + return "" + + +def _get_resource_name(obj: Union[OrganizationNode, Folder, Project]) -> str: + """Get the resource name string for any resource type.""" + if isinstance(obj, OrganizationNode): + return obj.organization.name + return obj.name + + +def _apply_ls_filters( + items: list, + resource_type: Optional[str], + label_filters: Optional[List[str]], + tag_filters: Optional[List[str]], +) -> list: + """Apply type, label, and tag filters to items list.""" + if resource_type: + items = [(p, obj) for p, obj in items if _matches_type(obj, resource_type)] + if label_filters: + items = [ + (p, obj) + for p, obj in items + if _matches_metadata(obj, label_filters, "labels") + ] + if tag_filters: + items = [ + (p, obj) for p, obj in items if _matches_metadata(obj, tag_filters, "tags") + ] + return items + + +def _apply_depth_limit( + items: list, + level: Optional[int], + recursive: bool, + target_path_prefix: str, +) -> list: + """Apply depth limit for recursive listing.""" + if level is None or not recursive: + return items + base_segments = len(target_path_prefix.split("/")) - 3 if target_path_prefix else 0 + return [ + (p, obj) for p, obj in items if len(p.split("/")) - 3 - base_segments <= level + ] + + +def _build_ls_items( + hierarchy: Hierarchy, + target_resource_name: Optional[str], + target_path_prefix: str, + recursive: bool, + resource_type: Optional[str], + label_filters: Optional[List[str]], + tag_filters: Optional[List[str]], + level: Optional[int], +) -> list: + """Build, filter, and sort the items list for ls output.""" + current_folders, current_projects = filter_direct_children( + hierarchy, target_resource_name + ) + items = build_items_list( + hierarchy, + current_folders, + current_projects, + target_path_prefix, + target_resource_name, + recursive, + ) + items = sort_resources(items) + items = _apply_ls_filters(items, resource_type, label_filters, tag_filters) + items = _apply_depth_limit(items, level, recursive, target_path_prefix) + return items + + +def _display_ls_items( + items: list, + long: bool, + show_labels: bool, + show_tags: bool, +) -> None: + """Display ls items in either long or short format.""" + if not long: + for path, _ in items: + print(path) + return + + table = Table( + show_header=True, header_style="bold magenta", box=None, padding=(0, 1) ) + table.add_column("Path", overflow="fold") + table.add_column("Resource Name", overflow="fold") + if show_labels: + table.add_column("Labels", overflow="fold") + if show_tags: + table.add_column("Tags", overflow="fold") + + for path, obj in items: + row = [path, _get_resource_name(obj)] + if show_labels: + row.append(_format_metadata(obj, "labels")) + if show_tags: + row.append(_format_metadata(obj, "tags")) + table.add_row(*row) + + console.print(table) @app.command() @@ -610,7 +801,10 @@ def ls( False, "--recursive", "-R", help="List resources recursively" ), resource_type: Optional[str] = typer.Option( - None, "--type", "-t", help="Filter by resource type: folder, project, organization" + None, + "--type", + "-t", + help="Filter by resource type: folder, project, organization", ), level: Optional[int] = typer.Option( None, "--level", "-L", help="Max depth for recursive listing (requires -R)" @@ -621,6 +815,18 @@ def ls( "-F", help=_REFRESH_HELP, ), + show_labels: bool = typer.Option( + False, "--show-labels", help="Display GCP labels on resources" + ), + show_tags: bool = typer.Option( + False, "--show-tags", help="Display GCP resource tags" + ), + label_filters: Optional[List[str]] = typer.Option( + None, "--label", help="Filter by label (key=value). Repeatable, ANDed together" + ), + tag_filters: Optional[List[str]] = typer.Option( + None, "--tag", help="Filter by tag (key=value). Repeatable, ANDed together" + ), ) -> None: """ List folders and projects. Defaults to the root organization. @@ -628,145 +834,97 @@ def ls( try: _validate_type_filter(resource_type) + # Implicitly enable label/tag fetching when filters are specified + include_labels = show_labels or bool(label_filters) + include_tags = show_tags or bool(tag_filters) + ep = ctx.obj.get("entrypoint") scope = _resolve_scope(resource, ep) target_resource_name = scope.target_resource_name - logger.debug( - f"ls: loading hierarchy for resource='{resource}', filter_orgs={scope.filter_orgs}, recursive={recursive}" - ) - hierarchy = _load_hierarchy( ctx, scope_resource=target_resource_name, recursive=recursive, force_refresh=force_refresh, filter_orgs=scope.filter_orgs, - ) - - logger.debug( - f"ls: hierarchy loaded with {len(hierarchy.organizations)} orgs, {len(hierarchy.projects)} projects, {len(hierarchy.folders)} folders" + include_labels=include_labels, + include_tags=include_tags, ) dumper = _get_dumper(ctx.obj.get("output_format", "text")) if not hierarchy.organizations and not hierarchy.projects: - if dumper: - print(dumper([])) - return - - # Check if it looks like a personal account - import google.auth - - account_msg = "" - try: - credentials, _ = google.auth.default() - if hasattr(credentials, "account") and credentials.account: - if credentials.account.endswith("@gmail.com"): - account_msg = f" (Account: {credentials.account})" - except Exception: - pass - - rprint( - f"[yellow]No organizations or projects found accessible to your account{account_msg}.[/yellow]" - ) - if not account_msg: - rprint( - "[dim]Hint: You might not have access to any organizations. Projects without organizations are shown with //_ prefix.[/dim]" - ) + _handle_empty_hierarchy(dumper) return - # If a specific resource was targeted, we list its children - # Get the target path prefix for proper path display - target_path_prefix = "" - if target_resource_name: - logger.debug( - f"ls command: targeting specific resource {target_resource_name}" - ) - if target_resource_name.startswith(_RESOURCE_PREFIX_PROJECTS): - # projects don't have children in this context - return - - # Get the full path for the target resource to use as prefix - try: - target_path_prefix = _clean_resolve_path( - Hierarchy.resolve_ancestry(target_resource_name) - ) - logger.debug(f"ls: target path prefix: {target_path_prefix}") - except Exception as e: - logger.warning(f"Could not resolve target path: {e}") + if target_resource_name and target_resource_name.startswith( + _RESOURCE_PREFIX_PROJECTS + ): + return - # Filter to get direct children - current_folders, current_projects = filter_direct_children( - hierarchy, target_resource_name - ) + target_path_prefix = _resolve_target_path_prefix(target_resource_name) - # Build items list for display - items = build_items_list( + items = _build_ls_items( hierarchy, - current_folders, - current_projects, - target_path_prefix, target_resource_name, + target_path_prefix, recursive, + resource_type, + label_filters, + tag_filters, + level, ) - # Sort items by path - items = sort_resources(items) - - # Apply type filter - if resource_type: - items = [(p, obj) for p, obj in items if _matches_type(obj, resource_type)] - - # Apply depth limit for recursive listing - if level is not None and recursive: - # Paths look like "//example.com/f1/f2". Splitting on "/" gives - # ["", "", "example.com", "f1", "f2"], so subtract 2 for the - # leading empty segments and 1 for the org root to get the - # folder/project depth (e.g. "//o/f1" → 3 parts after split → depth 0). - if target_path_prefix: - base_segments = len(target_path_prefix.split("/")) - 2 - 1 - else: - base_segments = 0 - items = [ - (p, obj) for p, obj in items - if len(p.split("/")) - 2 - 1 - base_segments <= level - ] - - logger.debug(f"ls: found {len(items)} items to display") - if dumper: print(dumper(serialize_ls(items))) return - if long: - table = Table( - show_header=True, header_style="bold magenta", box=None, padding=(0, 1) - ) - table.add_column("Path", overflow="fold") - table.add_column("Resource Name", overflow="fold") - - for path, obj in items: - resource_name = "" - - if isinstance(obj, OrganizationNode): - resource_name = obj.organization.name - elif isinstance(obj, Folder): - resource_name = obj.name - elif isinstance(obj, Project): - resource_name = obj.name - - table.add_row(path, resource_name) - - console.print(table) - else: - for path, _ in items: - print(path) + _display_ls_items(items, long, show_labels, show_tags) except Exception as e: handle_error(e) +def _get_orgless_projects(hctx: _HierarchyCommandContext) -> Optional[List[Project]]: + """Get organizationless projects if not targeting a specific resource.""" + if hctx.target_resource_name: + return None + orgless = [p for p in hctx.hierarchy.projects if not p.organization] + return orgless or None + + +def _tree_root_label( + node: Union[OrganizationNode, Folder], + is_targeted: Optional[str], +) -> tuple[str, str]: + """Build root tree label and node_id for a tree node.""" + if isinstance(node, OrganizationNode): + safe_path = f"//{path_escape(node.organization.display_name)}" + color = "cyan" if is_targeted else "magenta" + return f"[bold {color}]{safe_path}[/bold {color}]", node.organization.name + return f"[bold cyan]{node.path}[/bold cyan]", node.name + + +def _add_orgless_tree_nodes( + root_tree, + orgless_projects: Optional[List[Project]], + resource_type: Optional[str], + level: Optional[int], + show_ids: bool, + show_labels: bool, + show_tags: bool, +) -> None: + """Add organizationless projects section to tree.""" + if not orgless_projects or resource_type == "folder": + return + orgless_node = root_tree.add("[bold yellow](organizationless)[/bold yellow]") + if level is not None and level < 1: + return + for p in sorted(orgless_projects, key=lambda x: x.display_name): + orgless_node.add(format_tree_label(p, show_ids, show_labels, show_tags)) + + @app.command() def tree( ctx: typer.Context, @@ -795,6 +953,18 @@ def tree( "-F", help=_REFRESH_HELP, ), + show_labels: bool = typer.Option( + False, "--show-labels", help="Display GCP labels on resources" + ), + show_tags: bool = typer.Option( + False, "--show-tags", help="Display GCP resource tags" + ), + label_filters: Optional[List[str]] = typer.Option( + None, "--label", help="Filter by label (key=value). Repeatable, ANDed together" + ), + tag_filters: Optional[List[str]] = typer.Option( + None, "--tag", help="Filter by tag (key=value). Repeatable, ANDed together" + ), ) -> None: """ Display the resource hierarchy in a tree format. @@ -804,20 +974,26 @@ def tree( try: _validate_type_filter(resource_type) + include_labels = show_labels or bool(label_filters) + include_tags = show_tags or bool(tag_filters) + hctx = _prepare_hierarchy_command( - ctx, "tree", resource, level, yes, force_refresh + ctx, + "tree", + resource, + level, + yes, + force_refresh, + include_labels=include_labels, + include_tags=include_tags, ) if hctx is None: return + orgless_projects = _get_orgless_projects(hctx) + dumper = _get_dumper(ctx.obj.get("output_format", "text")) if dumper: - orgless_projects = None - if not hctx.target_resource_name: - orgless = [p for p in hctx.hierarchy.projects if not p.organization] - if orgless: - orgless_projects = orgless - data = serialize_tree( hctx.nodes_to_process, hctx.projects_by_parent, @@ -834,19 +1010,8 @@ def tree( else "[bold cyan]GCP Hierarchy[/bold cyan]" ) - # Add root nodes to tree for node in hctx.nodes_to_process: - if isinstance(node, OrganizationNode): - node_id = node.organization.name - if hctx.target_resource_name: - safe_path = f"//{path_escape(node.organization.display_name)}" - label = f"[bold cyan]{safe_path}[/bold cyan]" - else: - label = f"[bold magenta]//{path_escape(node.organization.display_name)}[/bold magenta]" - else: - node_id = node.name - label = f"[bold cyan]{node.path}[/bold cyan]" - + label, node_id = _tree_root_label(node, hctx.target_resource_name) if show_ids: label += f" [dim]({node_id})[/dim]" @@ -860,23 +1025,19 @@ def tree( 0, show_ids, type_filter=resource_type, + show_labels=show_labels, + show_tags=show_tags, ) - # Organizationless projects - if not hctx.target_resource_name and resource_type != "folder" and any( - not p.organization for p in hctx.hierarchy.projects - ): - orgless_node = root_tree.add( - "[bold yellow](organizationless)[/bold yellow]" - ) - if level is None or level >= 1: - orgless_projs = [ - p for p in hctx.hierarchy.projects if not p.organization - ] - orgless_projs.sort(key=lambda x: x.display_name) - for p in orgless_projs: - label = format_tree_label(p, show_ids) - orgless_node.add(label) + _add_orgless_tree_nodes( + root_tree, + orgless_projects, + resource_type, + level, + show_ids, + show_labels, + show_tags, + ) console.print(root_tree) @@ -995,7 +1156,7 @@ def stats( raise typer.Exit(code=1) elif any( effective_resource.startswith(p) - for p in ["organizations/", "folders/"] + for p in [_RESOURCE_PREFIX_ORGS, _RESOURCE_PREFIX_FOLDERS] ): target_resource_name = effective_resource else: @@ -1022,7 +1183,9 @@ def stats( table.add_column("Resource", style="bold") table.add_column("Count", justify="right") - if not target_resource_name or target_resource_name.startswith("organizations/"): + if not target_resource_name or target_resource_name.startswith( + _RESOURCE_PREFIX_ORGS + ): table.add_row("Organizations", str(len(hierarchy.organizations))) table.add_row("Folders", str(folder_count)) table.add_row("Projects", str(project_count)) @@ -1165,17 +1328,28 @@ def _search_hierarchy( @app.command() def find( ctx: typer.Context, - pattern: Annotated[str, typer.Argument(help="Name pattern to search (glob syntax: *, ?)")], + pattern: Annotated[ + str, typer.Argument(help="Name pattern to search (glob syntax: *, ?)") + ], resource: Annotated[ Optional[str], typer.Argument(help="Resource to scope search within (e.g. folders/123)"), ] = None, resource_type: Optional[str] = typer.Option( - None, "--type", "-t", help="Filter by resource type: folder, project, organization" + None, + "--type", + "-t", + help="Filter by resource type: folder, project, organization", ), force_refresh: bool = typer.Option( False, "--force-refresh", "-F", help=_REFRESH_HELP ), + label_filters: Optional[List[str]] = typer.Option( + None, "--label", help="Filter by label (key=value). Repeatable, ANDed together" + ), + tag_filters: Optional[List[str]] = typer.Option( + None, "--tag", help="Filter by tag (key=value). Repeatable, ANDed together" + ), ) -> None: """ Search for resources by display name pattern (glob syntax). @@ -1183,6 +1357,9 @@ def find( try: _validate_type_filter(resource_type) + include_labels = bool(label_filters) + include_tags = bool(tag_filters) + ep = ctx.obj.get("entrypoint") scope = _resolve_scope(resource, ep) @@ -1192,10 +1369,26 @@ def find( recursive=True, force_refresh=force_refresh, filter_orgs=scope.filter_orgs, + include_labels=include_labels, + include_tags=include_tags, ) items = sort_resources(_search_hierarchy(hierarchy, pattern, resource_type)) + # Apply label/tag filters + if label_filters: + items = [ + (p, obj) + for p, obj in items + if _matches_metadata(obj, label_filters, "labels") + ] + if tag_filters: + items = [ + (p, obj) + for p, obj in items + if _matches_metadata(obj, tag_filters, "tags") + ] + dumper = _get_dumper(ctx.obj.get("output_format", "text")) if dumper: print(dumper(serialize_ls(items))) @@ -1237,7 +1430,9 @@ def ancestors( print(dumper(serialize_ancestors(chain))) return - table = Table(show_header=True, header_style="bold magenta", box=None, padding=(0, 1)) + table = Table( + show_header=True, header_style="bold magenta", box=None, padding=(0, 1) + ) table.add_column("Resource Name", overflow="fold") table.add_column("Display Name", overflow="fold") table.add_column("Type", overflow="fold") diff --git a/src/gcpath/config.py b/src/gcpath/config.py index b3970d5..1869009 100644 --- a/src/gcpath/config.py +++ b/src/gcpath/config.py @@ -36,7 +36,9 @@ def write_config(config: Dict[str, Any]) -> None: def _validate_entrypoint(resource: str) -> None: """Validate that the entrypoint is a valid resource name.""" - if not resource.startswith("organizations/") and not resource.startswith("folders/"): + if not resource.startswith("organizations/") and not resource.startswith( + "folders/" + ): raise ValueError( f"Entrypoint must start with 'organizations/' or 'folders/', got '{resource}'" ) diff --git a/src/gcpath/core.py b/src/gcpath/core.py index 00c10de..9d8c6e3 100644 --- a/src/gcpath/core.py +++ b/src/gcpath/core.py @@ -12,6 +12,8 @@ load_projects_asset, load_scope_folder, load_organizationless_projects, + load_tags_asset, + apply_tags, ) # We use a logger but don't configure it here. @@ -90,6 +92,8 @@ class Folder: parent: str = ( "" # Parent resource name (e.g., 'organizations/123' or 'folders/456') ) + labels: Dict[str, str] = field(default_factory=dict) + tags: Dict[str, str] = field(default_factory=dict) def is_path_match(self, path_parts: List[str]) -> bool: # path matching logic @@ -133,6 +137,8 @@ class Project: parent: str organization: Optional["OrganizationNode"] folder: Optional[Folder] + labels: Dict[str, str] = field(default_factory=dict) + tags: Dict[str, str] = field(default_factory=dict) @property def path(self) -> str: @@ -169,6 +175,8 @@ def load( via_resource_manager: bool = True, scope_resource: Optional[str] = None, recursive: bool = False, + include_labels: bool = False, + include_tags: bool = False, ) -> "Hierarchy": """Load the GCP resource hierarchy from GCP APIs. @@ -180,6 +188,8 @@ def load( If None, defaults to loading from organization level. recursive: If True, load all descendants. If False, only load direct children. Only applies when via_resource_manager=False (Asset API mode). + include_labels: If True, fetch GCP labels for folders and projects. + include_tags: If True, fetch GCP resource tag bindings. """ logger.debug("Loading hierarchy from GCP API.") org_client = resourcemanager_v3.OrganizationsClient() @@ -187,7 +197,12 @@ def load( # Load Organizations org_nodes = cls._load_organizations( - org_client, display_names, via_resource_manager, scope_resource, recursive + org_client, + display_names, + via_resource_manager, + scope_resource, + recursive, + include_labels=include_labels, ) # Fallback: if no orgs found and scope is a folder, try folder-scoped loading @@ -200,15 +215,51 @@ def load( f"No organizations found, falling back to folder-scoped loading for {scope_resource}" ) return cls._load_from_folder_scope( - scope_resource, via_resource_manager, recursive + scope_resource, + via_resource_manager, + recursive, + include_labels=include_labels, + include_tags=include_tags, ) # Load Projects all_projects = cls._load_all_projects( - project_client, org_nodes, via_resource_manager, scope_resource, recursive + project_client, + org_nodes, + via_resource_manager, + scope_resource, + recursive, + include_labels=include_labels, ) - return cls(organizations=org_nodes, projects=all_projects) + hierarchy = cls(organizations=org_nodes, projects=all_projects) + + # Load tags if requested (separate Asset API query) + if include_tags and not via_resource_manager: + tag_scopes = ( + [scope_resource] + if scope_resource + else [org_node.organization.name for org_node in org_nodes] + ) + for tag_scope in tag_scopes: + tags_map = load_tags_asset(tag_scope) + apply_tags(hierarchy, tags_map) + + return hierarchy + + @classmethod + def _search_organizations(cls, org_client) -> List[resourcemanager_v3.Organization]: + """Search for accessible organizations.""" + try: + page_result = org_client.search_organizations( + request=resourcemanager_v3.SearchOrganizationsRequest() + ) + return list(page_result) + except exceptions.PermissionDenied: + logger.warning("Permission denied searching organizations") + except Exception as e: + logger.error(f"Error searching organizations: {e}") + return [] @classmethod def _load_organizations( @@ -218,44 +269,35 @@ def _load_organizations( via_resource_manager: bool, scope_resource: Optional[str], recursive: bool, + include_labels: bool = False, ) -> List[OrganizationNode]: """Load organizations and their folders.""" + display_names_set = set(display_names) if display_names else None org_nodes = [] - try: - logger.debug( - f"Calling search_organizations() with display_names filter: {display_names}" - ) - page_result = org_client.search_organizations( - request=resourcemanager_v3.SearchOrganizationsRequest() - ) - logger.debug("search_organizations() returned successfully") - - for org in page_result: - if display_names and org.display_name not in display_names: - logger.debug( - f"Skipping organization '{org.display_name}' (not in filter)" - ) - continue + for org in cls._search_organizations(org_client): + if display_names_set and org.display_name not in display_names_set: logger.debug( - f"Processing organization: {org.display_name} (name: {org.name})" + f"Skipping organization '{org.display_name}' (not in filter)" ) - node = OrganizationNode(organization=org) - org_nodes.append(node) + continue - # Load folders for this organization - cls._load_folders_for_org( - node, via_resource_manager, scope_resource, recursive - ) - - logger.debug( - f"Loaded {len(node.folders)} folders for org {node.organization.display_name}" - ) + logger.debug( + f"Processing organization: {org.display_name} (name: {org.name})" + ) + node = OrganizationNode(organization=org) + org_nodes.append(node) - except exceptions.PermissionDenied: - logger.warning("Permission denied searching organizations") - except Exception as e: - logger.error(f"Error searching organizations: {e}") + cls._load_folders_for_org( + node, + via_resource_manager, + scope_resource, + recursive, + include_labels=include_labels, + ) + logger.debug( + f"Loaded {len(node.folders)} folders for org {node.organization.display_name}" + ) return org_nodes @@ -265,6 +307,8 @@ def _load_from_folder_scope( scope_resource: str, via_resource_manager: bool, recursive: bool, + include_labels: bool = False, + include_tags: bool = False, ) -> "Hierarchy": """Load hierarchy rooted at a folder when org access is unavailable. @@ -339,6 +383,7 @@ def _load_from_folder_scope( recursive=recursive, query_parent=scope_resource, root_ancestor=scope_resource, + include_labels=include_labels, ) # Load projects @@ -346,16 +391,27 @@ def _load_from_folder_scope( if via_resource_manager: # RM path: search_projects returns all accessible, filter by parent project_client = resourcemanager_v3.ProjectsClient() - all_projects = cls._load_projects_rm(project_client, [node]) + all_projects = cls._load_projects_rm( + project_client, + [node], + include_labels=include_labels, + ) else: all_projects = cls._load_projects_asset_all_orgs( [node], scope_resource=scope_resource, recursive=recursive, query_parent=scope_resource, + include_labels=include_labels, ) - return cls(organizations=[node], projects=all_projects) + hierarchy = cls(organizations=[node], projects=all_projects) + + if include_tags and not via_resource_manager: + tags_map = load_tags_asset(scope_resource) + apply_tags(hierarchy, tags_map) + + return hierarchy @classmethod def _load_folders_for_org( @@ -366,6 +422,7 @@ def _load_folders_for_org( recursive: bool, query_parent: Optional[str] = None, root_ancestor: Optional[str] = None, + include_labels: bool = False, ): """Load folders for a single organization.""" if via_resource_manager: @@ -390,6 +447,7 @@ def _load_folders_for_org( ancestors_filter=folder_ancestors_filter, query_parent=query_parent, root_ancestor=root_ancestor, + include_labels=include_labels, ) # Load scope folder separately if needed (for recursive scoped loads) @@ -404,16 +462,24 @@ def _load_all_projects( via_resource_manager: bool, scope_resource: Optional[str], recursive: bool, + include_labels: bool = False, ) -> List[Project]: """Load all projects across all organizations.""" all_projects = [] if via_resource_manager: - all_projects = cls._load_projects_rm(project_client, org_nodes) + all_projects = cls._load_projects_rm( + project_client, + org_nodes, + include_labels=include_labels, + ) else: # Asset API mode all_projects = cls._load_projects_asset_all_orgs( - org_nodes, scope_resource, recursive + org_nodes, + scope_resource, + recursive, + include_labels=include_labels, ) # Load organizationless projects @@ -425,7 +491,10 @@ def _load_all_projects( @classmethod def _load_projects_rm( - cls, project_client, org_nodes: List[OrganizationNode] + cls, + project_client, + org_nodes: List[OrganizationNode], + include_labels: bool = False, ) -> List[Project]: """Load projects using Resource Manager API.""" all_projects = [] @@ -452,6 +521,10 @@ def _load_projects_rm( parent_org = o break + labels = ( + dict(p_proto.labels) if include_labels and p_proto.labels else {} + ) + proj = Project( name=p_proto.name, project_id=p_proto.project_id, @@ -459,6 +532,7 @@ def _load_projects_rm( parent=p_proto.parent, organization=parent_org, folder=parent_folder, + labels=labels, ) all_projects.append(proj) @@ -476,6 +550,7 @@ def _load_projects_asset_all_orgs( scope_resource: Optional[str], recursive: bool, query_parent: Optional[str] = None, + include_labels: bool = False, ) -> List[Project]: """Load projects for all organizations using Asset API.""" all_projects = [] @@ -498,6 +573,7 @@ def _load_projects_asset_all_orgs( parent_filter=project_parent_filter, ancestors_filter=project_ancestors_filter, query_parent=query_parent, + include_labels=include_labels, ) all_projects.extend(org_projects) @@ -577,132 +653,103 @@ def get_path_by_resource_name(self, resource_name: str) -> str: raise ResourceNotFoundError(f"Unsupported resource name '{resource_name}'") @staticmethod - def resolve_ancestry(resource_name: str) -> str: - """ - Resolves the path for a given resource name by traversing up the hierarchy. - This avoids loading the entire hierarchy. - """ - # Lazily initialize clients only when needed to avoid triggering - # credential lookup for unused client types - _folders_client = None - _projects_client = None - _org_client = None - - def folders_client(): - nonlocal _folders_client - if _folders_client is None: - _folders_client = resourcemanager_v3.FoldersClient() - return _folders_client - - def projects_client(): - nonlocal _projects_client - if _projects_client is None: - _projects_client = resourcemanager_v3.ProjectsClient() - return _projects_client - - def org_client(): - nonlocal _org_client - if _org_client is None: - _org_client = resourcemanager_v3.OrganizationsClient() - return _org_client - - segments: List[str] = [] - current_resource_name = resource_name + def _get_resource_info( + name: str, + folders_client, + projects_client, + org_client, + ) -> tuple[str, Optional[str]]: + """Fetch display name and parent for a resource. - # First, allow organizations/ID directly - if current_resource_name.startswith(_PREFIX_ORGS): + Returns (display_name, parent_name_or_None). + Raises ResourceNotFoundError on permission issues. + """ + if name.startswith(_PREFIX_PROJECTS): try: - org = org_client().get_organization(name=current_resource_name) - logger.debug( - f"GCP API: get_organization({current_resource_name}) returned" - ) - return "//" + path_escape(org.display_name) + p = projects_client.get_project(name=name) + logger.debug(f"GCP API: get_project({name}) returned") + return (p.display_name or p.project_id), p.parent except exceptions.PermissionDenied: - logger.warning( - f"Permission denied accessing organization {current_resource_name}" + raise ResourceNotFoundError( + f"Permission denied accessing project {name}" ) - return f"//_unknown_org_({current_resource_name})" # Fallback - except Exception as e: - logger.error( - f"Error fetching organization {current_resource_name}: {e}" + + if name.startswith(_PREFIX_FOLDERS): + try: + f = folders_client.get_folder(name=name) + logger.debug(f"GCP API: get_folder({name}) returned") + return f.display_name, f.parent + except exceptions.PermissionDenied: + raise ResourceNotFoundError( + f"Permission denied accessing folder {name}" ) - raise - # Helper to fetch display name and parent - def get_resource_info(name: str): - if name.startswith(_PREFIX_PROJECTS): - try: - p = projects_client().get_project(name=name) - logger.debug(f"GCP API: get_project({name}) returned") - # Project display_name is optional, fallback to projectId - d_name = p.display_name or p.project_id - return d_name, p.parent - except exceptions.PermissionDenied: - # If we can't see the project, we can't resolve its path - raise ResourceNotFoundError( - f"Permission denied accessing project {name}" - ) + if name.startswith(_PREFIX_ORGS): + try: + o = org_client.get_organization(name=name) + logger.debug(f"GCP API: get_organization({name}) returned") + return o.display_name, None + except exceptions.PermissionDenied: + return f"_unknown_org_({name})", None - elif name.startswith(_PREFIX_FOLDERS): - try: - f = folders_client().get_folder(name=name) - logger.debug(f"GCP API: get_folder({name}) returned") - return f.display_name, f.parent - except exceptions.PermissionDenied: - raise ResourceNotFoundError( - f"Permission denied accessing folder {name}" - ) + raise ResourceNotFoundError(f"Unknown resource type: {name}") - elif name.startswith(_PREFIX_ORGS): - try: - o = org_client().get_organization(name=name) - logger.debug(f"GCP API: get_organization({name}) returned") - return o.display_name, None - except exceptions.PermissionDenied: - # This might happen at the top of the chain - return f"_unknown_org_({name})", None + @staticmethod + def _resolve_org_directly(resource_name: str) -> Optional[str]: + """Resolve an organization resource name to its path. Returns None for non-org resources.""" + if not resource_name.startswith(_PREFIX_ORGS): + return None + try: + org_client = resourcemanager_v3.OrganizationsClient() + org = org_client.get_organization(name=resource_name) + logger.debug(f"GCP API: get_organization({resource_name}) returned") + return "//" + path_escape(org.display_name) + except exceptions.PermissionDenied: + logger.warning(f"Permission denied accessing organization {resource_name}") + return f"//_unknown_org_({resource_name})" + + @staticmethod + def resolve_ancestry(resource_name: str) -> str: + """ + Resolves the path for a given resource name by traversing up the hierarchy. + This avoids loading the entire hierarchy. + """ + # Handle organization directly + org_path = Hierarchy._resolve_org_directly(resource_name) + if org_path is not None: + return org_path + + # Lazy client initialization + folders_client = resourcemanager_v3.FoldersClient() + projects_client = resourcemanager_v3.ProjectsClient() + org_client = resourcemanager_v3.OrganizationsClient() - raise ResourceNotFoundError(f"Unknown resource type: {name}") + segments: List[str] = [] + current = resource_name - # Traverse up - while current_resource_name: + while current: try: - display_name, parent = get_resource_info(current_resource_name) - # We build the path relevant to the resource itself, - # but we need to handle the root (Org). - # If it's an organization, it becomes the prefix //Org - if current_resource_name.startswith(_PREFIX_ORGS): - # We reached the top - path_prefix = "//" + path_escape(display_name) - # Prepend prefix to existing segments - full_path = path_prefix + ( - ("/" + "/".join(segments)) if segments else "" - ) - return full_path + display_name, parent = Hierarchy._get_resource_info( + current, + folders_client, + projects_client, + org_client, + ) + except exceptions.NotFound: + raise ResourceNotFoundError(f"Resource not found: {current}") - # If it's a project or folder, add to segments - # Note: We are traversing UP, so we are collecting child -> parent - # We insert at the beginning of the list later or just reverse. - # Actually simpler: append to a list and reverse at the end? - # But we build segments usually as [Folder, Subfolder, Resource] - # Here we get Resource, then Parent (Folder), etc. - segments.insert(0, path_escape(display_name)) + if current.startswith(_PREFIX_ORGS): + prefix = "//" + path_escape(display_name) + return prefix + ("/" + "/".join(segments) if segments else "") - # Check for organizationless project - if not parent: - # Missing parent usually implies Organizationless (or error) - # If we are at a project and it has no parent or parent is not org/folder - # (though get_resource_info handles standard types) - return "//_/" + "/".join(segments) + segments.insert(0, path_escape(display_name)) - current_resource_name = parent + if not parent: + return "//_/" + "/".join(segments) - except exceptions.NotFound: - raise ResourceNotFoundError( - f"Resource not found: {current_resource_name}" - ) + current = parent - return "//?/" + "/".join(segments) # Should not be reached ideally + return "//?/" + "/".join(segments) @staticmethod def _fetch_chain_link( diff --git a/src/gcpath/formatters.py b/src/gcpath/formatters.py index 09f2a29..0791fa9 100644 --- a/src/gcpath/formatters.py +++ b/src/gcpath/formatters.py @@ -9,6 +9,34 @@ from gcpath.core import OrganizationNode, Folder, Project, path_escape, Hierarchy +def _children_of_target( + hierarchy: Hierarchy, target: str +) -> Tuple[List[Folder], List[Project]]: + """Return folders and projects whose parent matches the target resource.""" + folders = [f for f in hierarchy.folders if f.parent == target] + projects = [p for p in hierarchy.projects if p.parent == target] + return folders, projects + + +def _org_level_children( + hierarchy: Hierarchy, +) -> Tuple[List[Folder], List[Project]]: + """Return org-level folders and projects (including organizationless).""" + org_names = {org.organization.name for org in hierarchy.organizations} + folders = [ + f + for org in hierarchy.organizations + for f in org.folders.values() + if f.parent == org.organization.name + ] + projects = [ + p + for p in hierarchy.projects + if (p.organization and p.parent in org_names) or not p.organization + ] + return folders, projects + + def filter_direct_children( hierarchy: Hierarchy, target_resource_name: Optional[str] = None ) -> Tuple[List[Folder], List[Project]]: @@ -21,30 +49,9 @@ def filter_direct_children( Returns: Tuple of (folders, projects) that are direct children """ - current_folders = [] - current_projects = [] - if target_resource_name: - # Find direct children of the target resource - for f in hierarchy.folders: - if f.parent == target_resource_name: - current_folders.append(f) - for p in hierarchy.projects: - if p.parent == target_resource_name: - current_projects.append(p) - else: - # No target: show org-level resources - for org in hierarchy.organizations: - for f in org.folders.values(): - if f.parent == org.organization.name: - current_folders.append(f) - for p in hierarchy.projects: - if p.organization and p.parent == p.organization.organization.name: - current_projects.append(p) - # Add organizationless projects - current_projects.extend([p for p in hierarchy.projects if not p.organization]) - - return current_folders, current_projects + return _children_of_target(hierarchy, target_resource_name) + return _org_level_children(hierarchy) def get_display_path( @@ -82,6 +89,52 @@ def get_display_path( return "" +def _collect_recursive_items( + hierarchy: Hierarchy, + target_path_prefix: str, + target_resource_name: Optional[str], +) -> List[Tuple[str, Union[OrganizationNode, Folder, Project]]]: + """Collect all items for recursive listing.""" + + def _path(item, is_direct=False): + return get_display_path( + item, target_path_prefix, target_resource_name, is_direct, True + ) + + items: List[Tuple[str, Union[OrganizationNode, Folder, Project]]] = [] + if target_resource_name: + items.extend((_path(f), f) for f in hierarchy.folders) + items.extend((_path(p), p) for p in hierarchy.projects) + else: + for org in hierarchy.organizations: + items.append((_path(org), org)) + items.extend((_path(f), f) for f in org.folders.values()) + items.extend((_path(p), p) for p in hierarchy.projects) + return items + + +def _collect_nonrecursive_items( + hierarchy: Hierarchy, + current_folders: List[Folder], + current_projects: List[Project], + target_path_prefix: str, + target_resource_name: Optional[str], +) -> List[Tuple[str, Union[OrganizationNode, Folder, Project]]]: + """Collect items for non-recursive (direct children) listing.""" + + def _path(item, is_direct=False): + return get_display_path( + item, target_path_prefix, target_resource_name, is_direct, False + ) + + items: List[Tuple[str, Union[OrganizationNode, Folder, Project]]] = [] + if not target_resource_name: + items.extend((_path(org), org) for org in hierarchy.organizations) + items.extend((_path(f, True), f) for f in current_folders) + items.extend((_path(p, True), p) for p in current_projects) + return items + + def build_items_list( hierarchy: Hierarchy, current_folders: List[Folder], @@ -103,124 +156,17 @@ def build_items_list( Returns: List of (path, resource) tuples """ - items: List[Tuple[str, Union[OrganizationNode, Folder, Project]]] = [] - if recursive: - # Recursive listing - list everything under the target - if target_resource_name: - # All loaded folders and projects are descendants - for f in hierarchy.folders: - items.append( - ( - get_display_path( - f, - target_path_prefix, - target_resource_name, - is_direct_child=False, - recursive=True, - ), - f, - ) - ) - for p in hierarchy.projects: - items.append( - ( - get_display_path( - p, - target_path_prefix, - target_resource_name, - is_direct_child=False, - recursive=True, - ), - p, - ) - ) - else: - # Full recursive list - for org in hierarchy.organizations: - items.append( - ( - get_display_path( - org, - target_path_prefix, - target_resource_name, - is_direct_child=False, - recursive=True, - ), - org, - ) - ) - for f in org.folders.values(): - items.append( - ( - get_display_path( - f, - target_path_prefix, - target_resource_name, - is_direct_child=False, - recursive=True, - ), - f, - ) - ) - for p in hierarchy.projects: - items.append( - ( - get_display_path( - p, - target_path_prefix, - target_resource_name, - is_direct_child=False, - recursive=True, - ), - p, - ) - ) - else: - # Non-recursive - only direct children - if not target_resource_name: - for org in hierarchy.organizations: - items.append( - ( - get_display_path( - org, - target_path_prefix, - target_resource_name, - is_direct_child=False, - recursive=False, - ), - org, - ) - ) - - for f in current_folders: - items.append( - ( - get_display_path( - f, - target_path_prefix, - target_resource_name, - is_direct_child=True, - recursive=False, - ), - f, - ) - ) - for p in current_projects: - items.append( - ( - get_display_path( - p, - target_path_prefix, - target_resource_name, - is_direct_child=True, - recursive=False, - ), - p, - ) - ) - - return items + return _collect_recursive_items( + hierarchy, target_path_prefix, target_resource_name + ) + return _collect_nonrecursive_items( + hierarchy, + current_folders, + current_projects, + target_path_prefix, + target_resource_name, + ) def sort_resources(items: List[Tuple[str, Any]]) -> List[Tuple[str, Any]]: @@ -235,12 +181,37 @@ def sort_resources(items: List[Tuple[str, Any]]) -> List[Tuple[str, Any]]: return sorted(items, key=lambda x: x[0]) -def format_tree_label(item: Union[Folder, Project], show_ids: bool = False) -> str: +def _format_metadata_suffix( + item: Union[Folder, Project], + show_labels: bool = False, + show_tags: bool = False, +) -> str: + """Build a Rich markup suffix for labels and tags.""" + parts = [] + if show_labels and hasattr(item, "labels") and item.labels: + label_str = ", ".join(f"{k}={v}" for k, v in sorted(item.labels.items())) + parts.append(f"[dim]labels: {label_str}[/dim]") + if show_tags and hasattr(item, "tags") and item.tags: + tag_str = ", ".join(f"{k}={v}" for k, v in sorted(item.tags.items())) + parts.append(f"[dim]tags: {tag_str}[/dim]") + if not parts: + return "" + return " [" + " | ".join(parts) + "]" + + +def format_tree_label( + item: Union[Folder, Project], + show_ids: bool = False, + show_labels: bool = False, + show_tags: bool = False, +) -> str: """Format label for tree display. Args: item: The resource to format show_ids: Whether to include resource IDs + show_labels: Whether to include GCP labels + show_tags: Whether to include GCP tags Returns: Formatted label string with rich markup @@ -249,11 +220,13 @@ def format_tree_label(item: Union[Folder, Project], show_ids: bool = False) -> s label = f"[bold blue]{item.display_name}[/bold blue]" if show_ids: label += f" [dim]({item.name})[/dim]" + label += _format_metadata_suffix(item, show_labels, show_tags) return label elif isinstance(item, Project): label = f"[green]{item.display_name}[/green]" if show_ids: label += f" [dim]({item.name})[/dim]" + label += _format_metadata_suffix(item, show_labels, show_tags) return label return "" @@ -286,6 +259,8 @@ def build_tree_view( current_depth: int = 0, show_ids: bool = False, type_filter: Optional[str] = None, + show_labels: bool = False, + show_tags: bool = False, ): """Recursively build tree view of resources. @@ -299,18 +274,31 @@ def build_tree_view( show_ids: Whether to show resource IDs type_filter: If set, only show resources of this type ("folder" or "project"). Folders are always recursed into to find matching descendants. + show_labels: Whether to display GCP labels + show_tags: Whether to display GCP tags """ if level is not None and current_depth >= level: return parent_name = _get_node_parent_name(current_node) - recurse_args = (hierarchy, projects_by_parent, level, current_depth + 1, show_ids, type_filter) + recurse_args = ( + hierarchy, + projects_by_parent, + level, + current_depth + 1, + show_ids, + type_filter, + show_labels, + show_tags, + ) for f in _get_child_folders(current_node, parent_name): if type_filter == "project": build_tree_view(tree_node, f, *recurse_args) else: - sub_node = tree_node.add(format_tree_label(f, show_ids)) + sub_node = tree_node.add( + format_tree_label(f, show_ids, show_labels, show_tags) + ) build_tree_view(sub_node, f, *recurse_args) if type_filter != "folder": @@ -318,7 +306,7 @@ def build_tree_view( projects_by_parent.get(parent_name, []), key=lambda x: x.display_name ) for p in children_projects: - tree_node.add(format_tree_label(p, show_ids)) + tree_node.add(format_tree_label(p, show_ids, show_labels, show_tags)) # --- Diagram generation (Mermaid / D2) --- @@ -415,7 +403,7 @@ def _format_mermaid(labels: Dict[str, str], edges: List[Tuple[str, str]]) -> str """Format collected nodes and edges as a Mermaid flowchart.""" lines = ["graph TD"] for node_id, label in labels.items(): - safe_label = label.replace('"', '#quot;') + safe_label = label.replace('"', "#quot;") lines.append(f' {node_id}["{safe_label}"]') for parent_id, child_id in edges: lines.append(f" {parent_id} --> {child_id}") diff --git a/src/gcpath/loaders.py b/src/gcpath/loaders.py index ba6931f..281b4c0 100644 --- a/src/gcpath/loaders.py +++ b/src/gcpath/loaders.py @@ -22,19 +22,23 @@ def build_folder_sql_query( - parent_filter: Optional[str] = None, ancestors_filter: Optional[str] = None + parent_filter: Optional[str] = None, + ancestors_filter: Optional[str] = None, + include_labels: bool = False, ) -> str: """Build SQL query for loading folders from Asset API. Args: parent_filter: Only load folders directly under this parent ancestors_filter: Only load folders with this resource in their ancestors + include_labels: If True, include resource.data.labels in SELECT Returns: SQL query string for Asset API """ + labels_col = ", resource.data.labels" if include_labels else "" base_query = ( - "SELECT name, resource.data.displayName, resource.data.parent, ancestors " + f"SELECT name, resource.data.displayName, resource.data.parent, ancestors{labels_col} " "FROM `cloudresourcemanager_googleapis_com_Folder` " "WHERE resource.data.lifecycleState = 'ACTIVE'" ) @@ -57,20 +61,24 @@ def build_folder_sql_query( def build_project_sql_query( - parent_filter: Optional[str] = None, ancestors_filter: Optional[str] = None + parent_filter: Optional[str] = None, + ancestors_filter: Optional[str] = None, + include_labels: bool = False, ) -> str: """Build SQL query for loading projects from Asset API. Args: parent_filter: Only load projects directly under this parent ancestors_filter: Only load projects with this resource in their ancestors + include_labels: If True, include resource.data.labels in SELECT Returns: SQL query string for Asset API """ + labels_col = ", resource.data.labels" if include_labels else "" base_query = ( "SELECT name, resource.data.projectNumber, resource.data.projectId, " - "resource.data.parent, ancestors " + f"resource.data.parent, ancestors{labels_col} " "FROM `cloudresourcemanager_googleapis_com_Project` " "WHERE resource.data.lifecycleState = 'ACTIVE'" ) @@ -277,6 +285,7 @@ def load_folders_asset( ancestors_filter: Optional[str] = None, query_parent: Optional[str] = None, root_ancestor: Optional[str] = None, + include_labels: bool = False, ): """Load folders from Asset API. @@ -288,6 +297,7 @@ def load_folders_asset( Defaults to node.organization.name. root_ancestor: Override for the root ancestor in ancestor chains. Defaults to node.organization.name. + include_labels: If True, fetch labels for each folder. Note: parent_filter and ancestors_filter are mutually exclusive. If neither is provided, loads ALL folders under the org. @@ -297,7 +307,9 @@ def load_folders_asset( root = root_ancestor or node.organization.name # Build SQL query - statement = build_folder_sql_query(parent_filter, ancestors_filter) + statement = build_folder_sql_query( + parent_filter, ancestors_filter, include_labels=include_labels + ) logger.debug(f"Folders query: {statement}") query_request = asset_v1.QueryAssetsRequest( @@ -319,7 +331,7 @@ def load_folders_asset( for row in response.query_result.rows: try: # Parse the folder row using parsers module - folder_data = parse_folder_row(row) + folder_data = parse_folder_row(row, has_labels=include_labels) # Get the parent - either from the API response or from parent_filter if folder_data["parent"]: @@ -344,6 +356,7 @@ def load_folders_asset( ancestors=ancestors, organization=node, parent=folder_parent, + labels=folder_data.get("labels", {}), ) node.folders[f.name] = f @@ -355,11 +368,37 @@ def load_folders_asset( fix_folder_ancestors(node, root_ancestor=root) +def _resolve_project_parent( + project_data: dict, + parent_filter: Optional[str], + fallback_parent: str, +) -> str: + """Determine the parent resource name for a project from Asset API data. + + Priority: explicit parent > ancestors > parent_filter > org fallback. + """ + if project_data["parent"]: + return project_data["parent"] + + ancestors = project_data["ancestors"] + if not ancestors: + return parent_filter or fallback_parent + + # If first ancestor is the project itself, use second ancestor as parent + if ancestors[0] == project_data["name"]: + if len(ancestors) > 1: + return ancestors[1] + return parent_filter or fallback_parent + + return ancestors[0] + + def load_projects_asset( node, parent_filter: Optional[str] = None, ancestors_filter: Optional[str] = None, query_parent: Optional[str] = None, + include_labels: bool = False, ): """Load projects from Asset API. @@ -369,6 +408,7 @@ def load_projects_asset( ancestors_filter: Only load projects with this resource in their ancestors query_parent: Override for QueryAssetsRequest.parent (e.g., a folder). Defaults to node.organization.name. + include_labels: If True, fetch labels for each project. Returns: List of Project objects @@ -382,9 +422,9 @@ def load_projects_asset( api_parent = query_parent or node.organization.name projects: List[Project] = [] - # Build SQL query - statement = build_project_sql_query(parent_filter, ancestors_filter) - + statement = build_project_sql_query( + parent_filter, ancestors_filter, include_labels=include_labels + ) logger.debug(f"Projects query: {statement}") query_request = asset_v1.QueryAssetsRequest( parent=api_parent, @@ -393,50 +433,25 @@ def load_projects_asset( try: response = asset_client.query_assets(request=query_request) - logger.debug( - f"GCP API: query_assets(projects) returned for {api_parent}" - ) + logger.debug(f"GCP API: query_assets(projects) returned for {api_parent}") - # Iterate directly over the response if not response.query_result or not response.query_result.rows: logger.debug("No project rows returned from Asset API") - logger.debug(f"Query result: {response.query_result}") return projects - # Import Project class locally to avoid circular dependency - from gcpath.core import Project - for row in response.query_result.rows: try: - # Parse the project row using parsers module - project_data = parse_project_row(row) - - # Determine parent - prefer from API, then ancestors, then fallback - if project_data["parent"]: - parent_res = project_data["parent"] - elif not project_data["ancestors"]: - # No ancestors and no parent from API - use parent_filter if set, otherwise org - if parent_filter: - parent_res = parent_filter - else: - parent_res = node.organization.name - elif ( - project_data["ancestors"] - and project_data["ancestors"][0] == project_data["name"] - ): - if len(project_data["ancestors"]) > 1: - parent_res = project_data["ancestors"][1] - elif parent_filter: - parent_res = parent_filter - else: - parent_res = node.organization.name - else: - # ancestors is guaranteed non-empty here (checked in elif above) - parent_res = project_data["ancestors"][0] - - parent_folder = None - if parent_res.startswith(_FOLDER_PREFIX): - parent_folder = node.folders.get(parent_res) + project_data = parse_project_row(row, has_labels=include_labels) + parent_res = _resolve_project_parent( + project_data, + parent_filter, + node.organization.name, + ) + parent_folder = ( + node.folders.get(parent_res) + if parent_res.startswith(_FOLDER_PREFIX) + else None + ) proj = Project( name=project_data["name"], @@ -445,6 +460,7 @@ def load_projects_asset( parent=parent_res, organization=node, folder=parent_folder, + labels=project_data.get("labels", {}), ) logger.debug( f"Added project {project_data['project_id']} to hierarchy " @@ -520,3 +536,95 @@ def load_organizationless_projects(existing_project_names: set): logger.error(f"Error searching organizationless projects: {e}") return projects + + +def _parse_tag_binding_row(row) -> Optional[tuple]: + """Parse a single TagBinding row from Asset API. + + Returns (parent_resource_name, tag_key, tag_value) or None on error. + """ + from gcpath.parsers import extract_value, clean_asset_name + + try: + row_dict = dict(row) + f_list = row_dict.get("f", []) + if len(f_list) < 4: + return None + + parent_val = extract_value(f_list[1]) + tag_key = extract_value(f_list[2]) + tag_value = extract_value(f_list[3]) + + if not parent_val or not tag_key or not tag_value: + return None + + parent_name = clean_asset_name(str(parent_val)) + return (parent_name, str(tag_key), str(tag_value)) + except (TypeError, AttributeError, KeyError) as e: + logger.warning(f"Error parsing tag binding row: {e}") + return None + + +def load_tags_asset(parent: str) -> Dict[str, Dict[str, str]]: + """Load tag bindings from Asset API. + + Queries the TagBinding asset type to get all tag bindings for resources + under the given parent scope. + + Args: + parent: The scope to query (e.g., 'organizations/123' or 'folders/456') + + Returns: + Dict mapping resource names to their tags: {resource_name: {tag_key: tag_value}} + """ + asset_client = asset_v1.AssetServiceClient() + tags_map: Dict[str, Dict[str, str]] = {} + + statement = ( + "SELECT name, resource.data.parent, resource.data.tagKey, resource.data.tagValue " + "FROM `cloudresourcemanager_googleapis_com_TagBinding`" + ) + + logger.debug(f"Tags query: {statement}") + query_request = asset_v1.QueryAssetsRequest( + parent=parent, + statement=statement, + ) + + try: + response = asset_client.query_assets(request=query_request) + logger.debug(f"GCP API: query_assets(tags) returned for {parent}") + + if not response.query_result or not response.query_result.rows: + logger.debug("No tag binding rows returned from Asset API") + return tags_map + + for row in response.query_result.rows: + parsed = _parse_tag_binding_row(row) + if parsed: + parent_name, tag_key, tag_value = parsed + if parent_name not in tags_map: + tags_map[parent_name] = {} + tags_map[parent_name][tag_key] = tag_value + + except exceptions.PermissionDenied: + logger.warning("Permission denied querying tag bindings") + except Exception as e: + logger.error(f"Error querying tag bindings via Asset API: {e}") + + return tags_map + + +def apply_tags(hierarchy, tags_map: Dict[str, Dict[str, str]]) -> None: + """Apply tag bindings to resources in a hierarchy. + + Args: + hierarchy: Hierarchy object to update + tags_map: Dict mapping resource names to their tags + """ + for folder in hierarchy.folders: + if folder.name in tags_map: + folder.tags = tags_map[folder.name] + for project in hierarchy.projects: + if project.name in tags_map: + project.tags = tags_map[project.name] diff --git a/src/gcpath/parsers.py b/src/gcpath/parsers.py index 45ab528..a8a64f9 100644 --- a/src/gcpath/parsers.py +++ b/src/gcpath/parsers.py @@ -60,6 +60,34 @@ def extract_list_values(ancestors_wrapper: Any) -> List[str]: ] +def extract_labels(labels_col: Any) -> Dict[str, str]: + """Extract labels from Asset API response column. + + The Asset API returns labels as a MapComposite/dict-like structure + where keys and values are strings. + + Args: + labels_col: Labels column from Asset API row + + Returns: + Dict of label key-value pairs, empty dict if no labels + """ + raw = extract_value(labels_col) + if not raw: + return {} + + try: + # MapComposite behaves like a dict + if hasattr(raw, "items"): + return {str(k): str(v) for k, v in raw.items()} + if isinstance(raw, dict): + return {str(k): str(v) for k, v in raw.items()} + except (TypeError, AttributeError) as e: + logger.warning(f"Error extracting labels: {e}") + + return {} + + def parse_parent_struct(parent_col: Any) -> Optional[str]: """Parse parent STRUCT from Asset API response. @@ -130,21 +158,24 @@ def validate_row_structure(row: Any, expected_columns: int, row_type: str) -> bo return False -def parse_project_row(row: Any) -> Dict[str, Any]: +def parse_project_row(row: Any, has_labels: bool = False) -> Dict[str, Any]: """Parse a project row from Asset API. Expected columns: name, projectNumber, projectId, parent (STRUCT), ancestors + With labels: name, projectNumber, projectId, parent (STRUCT), ancestors, labels Args: row: Project row from Asset API response + has_labels: If True, expect labels column at index 5 Returns: - Dict with keys: name, project_id, display_name, parent, ancestors + Dict with keys: name, project_id, display_name, parent, ancestors[, labels] Raises: ValueError: If row structure is invalid """ - if not validate_row_structure(row, 5, "project"): + expected_cols = 6 if has_labels else 5 + if not validate_row_structure(row, expected_cols, "project"): raise ValueError("Invalid project row structure") row_dict = dict(row) @@ -159,12 +190,7 @@ def parse_project_row(row: Any) -> Dict[str, Any]: name = clean_asset_name(str(name_val)) raw_ancestors = extract_list_values(ancestors_wrapper) - logger.debug( - f"Parsed project from Asset API: project_id={project_id}, name={name}, " - f"parent_from_api={parent_from_api}, ancestors={raw_ancestors}" - ) - - return { + result: Dict[str, Any] = { "name": name, "project_id": str(project_id), "display_name": str(project_id), # Use projectId as displayName @@ -172,22 +198,35 @@ def parse_project_row(row: Any) -> Dict[str, Any]: "ancestors": raw_ancestors, } + if has_labels: + result["labels"] = extract_labels(f_list[5]) -def parse_folder_row(row: Any) -> Dict[str, Any]: + logger.debug( + f"Parsed project from Asset API: project_id={project_id}, name={name}, " + f"parent_from_api={parent_from_api}, ancestors={raw_ancestors}" + ) + + return result + + +def parse_folder_row(row: Any, has_labels: bool = False) -> Dict[str, Any]: """Parse a folder row from Asset API. Expected columns: name, displayName, parent, ancestors + With labels: name, displayName, parent, ancestors, labels Args: row: Folder row from Asset API response + has_labels: If True, expect labels column at index 4 Returns: - Dict with keys: name, display_name, parent, ancestors + Dict with keys: name, display_name, parent, ancestors[, labels] Raises: ValueError: If row structure is invalid or missing required fields """ - if not validate_row_structure(row, 4, "folder"): + expected_cols = 5 if has_labels else 4 + if not validate_row_structure(row, expected_cols, "folder"): raise ValueError("Invalid folder row structure") row_dict = dict(row) @@ -206,18 +245,56 @@ def parse_folder_row(row: Any) -> Dict[str, Any]: parent = str(parent_val) if parent_val else None raw_ancestors = extract_list_values(ancestors_wrapper) - logger.debug( - f"Parsed folder from Asset API: name={name}, display_name={display_name}, " - f"parent={parent}, ancestors={raw_ancestors}" - ) - - return { + result: Dict[str, Any] = { "name": name, "display_name": display_name, "parent": parent, "ancestors": raw_ancestors, } + if has_labels: + result["labels"] = extract_labels(f_list[4]) + + logger.debug( + f"Parsed folder from Asset API: name={name}, display_name={display_name}, " + f"parent={parent}, ancestors={raw_ancestors}" + ) + + return result + + +def _ensure_org_tail(ancestors: List[str], org_name: str) -> None: + """Append org_name to ancestors if the chain doesn't already end with an organization.""" + if not ancestors or not ancestors[-1].startswith("organizations/"): + ancestors.append(org_name) + + +def _build_chain_from_parent( + name: str, + parent: str, + loaded_folders: Dict[str, Any], + org_name: str, +) -> List[str]: + """Build ancestor chain starting from name, traversing through parent.""" + ancestors = [name] + if not (parent and parent.startswith("folders/")): + _ensure_org_tail(ancestors, org_name) + return ancestors + + ancestors.append(parent) + if parent in loaded_folders: + parent_folder = loaded_folders[parent] + ancestors_set = set(ancestors) + for anc in parent_folder.ancestors: + if anc not in ancestors_set: + ancestors.append(anc) + ancestors_set.add(anc) + else: + ancestors.append(org_name) + + _ensure_org_tail(ancestors, org_name) + return ancestors + def build_folder_ancestors( name: str, @@ -241,37 +318,13 @@ def build_folder_ancestors( Complete ancestor chain from folder to organization """ # Ensure ancestors start with self - if not raw_ancestors or raw_ancestors[0] != name: - ancestors = [name] + raw_ancestors - else: + if raw_ancestors and raw_ancestors[0] == name: ancestors = raw_ancestors + else: + ancestors = [name] + raw_ancestors - # If we have empty or single-item ancestors, build the full chain - if not ancestors or (len(ancestors) == 1 and ancestors[0] == name): - ancestors = [name] - current_parent = parent - - # Traverse up the parent chain - while current_parent and current_parent.startswith("folders/"): - ancestors.append(current_parent) - - # Check if this parent is already loaded - if current_parent in loaded_folders: - parent_folder = loaded_folders[current_parent] - # Add remaining ancestors from the parent (excluding duplicates) - for anc in parent_folder.ancestors: - if anc != current_parent and anc not in ancestors: - ancestors.append(anc) - break - else: - # Parent not loaded yet, add org and break - ancestors.append(org_name) - break - - # If we didn't find any folders in the chain, add org - if len(ancestors) == 1 or ( - len(ancestors) > 1 and not ancestors[-1].startswith("organizations/") - ): - ancestors.append(org_name) + # If ancestors are incomplete (just [self] or empty), build the full chain + if len(ancestors) <= 1: + return _build_chain_from_parent(name, parent, loaded_folders, org_name) return ancestors diff --git a/src/gcpath/serializers.py b/src/gcpath/serializers.py index 279f322..800025e 100644 --- a/src/gcpath/serializers.py +++ b/src/gcpath/serializers.py @@ -37,10 +37,18 @@ def serialize_resource( elif isinstance(item, Folder): d["resource_name"] = item.name d["display_name"] = item.display_name + if item.labels: + d["labels"] = item.labels + if item.tags: + d["tags"] = item.tags elif isinstance(item, Project): d["resource_name"] = item.name d["display_name"] = item.display_name d["project_id"] = item.project_id + if item.labels: + d["labels"] = item.labels + if item.tags: + d["tags"] = item.tags return d @@ -69,7 +77,9 @@ def _node_to_dict(node: Union[OrganizationNode, Folder]) -> Tuple[str, Dict[str, } -def _get_child_folders(node: Union[OrganizationNode, Folder], parent_name: str) -> List[Folder]: +def _get_child_folders( + node: Union[OrganizationNode, Folder], parent_name: str +) -> List[Folder]: """Get sorted direct child folders of a node.""" org_ref = node if isinstance(node, OrganizationNode) else node.organization if not org_ref: @@ -101,14 +111,18 @@ def serialize_tree_node( children: List[Dict[str, Any]] = [] for f in _get_child_folders(node, parent_name): - sub = serialize_tree_node(f, projects_by_parent, level, current_depth + 1, type_filter) + sub = serialize_tree_node( + f, projects_by_parent, level, current_depth + 1, type_filter + ) if type_filter == "project": children.extend(sub.get("children", [])) else: children.append(sub) if type_filter != "folder": - for p in sorted(projects_by_parent.get(parent_name, []), key=lambda x: x.display_name): + for p in sorted( + projects_by_parent.get(parent_name, []), key=lambda x: x.display_name + ): children.append(serialize_resource(p.path, p)) d["children"] = children @@ -126,7 +140,9 @@ def serialize_tree( result = [] for node in nodes_to_process: result.append( - serialize_tree_node(node, projects_by_parent, level, type_filter=type_filter) + serialize_tree_node( + node, projects_by_parent, level, type_filter=type_filter + ) ) if orgless_projects and type_filter != "folder": @@ -153,8 +169,7 @@ def serialize_ancestors( chain: List of (resource_name, display_name, type) tuples from root to leaf. """ return [ - {"resource_name": name, "display_name": dn, "type": t} - for name, dn, t in chain + {"resource_name": name, "display_name": dn, "type": t} for name, dn, t in chain ] @@ -185,4 +200,6 @@ def dump_json(data: Any) -> str: def dump_yaml(data: Any) -> str: """Serialize data to YAML string.""" - return yaml.dump(data, default_flow_style=False, allow_unicode=True, sort_keys=False) + return yaml.dump( + data, default_flow_style=False, allow_unicode=True, sort_keys=False + ) diff --git a/tests/fixtures/sample_cache_v2.json b/tests/fixtures/sample_cache_v2.json new file mode 100644 index 0000000..3f548d5 --- /dev/null +++ b/tests/fixtures/sample_cache_v2.json @@ -0,0 +1,43 @@ +{ + "version": 2, + "timestamp": "2023-10-27T10:00:00Z", + "organizations": [ + { + "organization": { + "name": "organizations/123", + "display_name": "example.com" + }, + "folders": { + "folders/456": { + "name": "folders/456", + "display_name": "Engineering", + "ancestors": ["folders/456", "organizations/123"], + "parent": "organizations/123", + "labels": {"env": "prod", "team": "eng"}, + "tags": {} + } + }, + "projects": [ + { + "name": "projects/789", + "project_id": "test-project", + "display_name": "Test Project", + "parent": "folders/456", + "folder_name": "folders/456", + "labels": {"env": "staging"}, + "tags": {"org/environment": "staging"} + } + ] + } + ], + "organizationless_projects": [ + { + "name": "projects/000", + "project_id": "orphan-project", + "display_name": "Orphan Project", + "parent": "organizations/0", + "labels": {}, + "tags": {} + } + ] +} diff --git a/tests/test_cache.py b/tests/test_cache.py index de27b44..0d4f933 100644 --- a/tests/test_cache.py +++ b/tests/test_cache.py @@ -25,7 +25,8 @@ from gcpath.core import Hierarchy, OrganizationNode, Folder, Project FIXTURES_DIR = Path(__file__).parent / "fixtures" -SAMPLE_CACHE_FILE = FIXTURES_DIR / "sample_cache_v1.json" +SAMPLE_CACHE_FILE = FIXTURES_DIR / "sample_cache_v2.json" +SAMPLE_CACHE_V1_FILE = FIXTURES_DIR / "sample_cache_v1.json" @pytest.fixture @@ -125,6 +126,28 @@ def test_load_from_fixture(): assert len(hierarchy.organizations) == 1 assert len(hierarchy.projects) == 2 + # Verify labels/tags round-trip + folder = hierarchy.organizations[0].folders["folders/456"] + assert folder.labels == {"env": "prod", "team": "eng"} + assert folder.tags == {} + + project = next(p for p in hierarchy.projects if p.name == "projects/789") + assert project.labels == {"env": "staging"} + assert project.tags == {"org/environment": "staging"} + + orphan = next(p for p in hierarchy.projects if p.name == "projects/000") + assert orphan.labels == {} + assert orphan.tags == {} + + +def test_load_v1_fixture_rejected(): + """Test that v1 cache fixtures are rejected due to version mismatch.""" + with open(SAMPLE_CACHE_V1_FILE, "r") as f: + data = json.load(f) + + hierarchy = _dict_to_hierarchy(data) + assert hierarchy is None + @patch("gcpath.cache.CACHE_FILE") def test_read_cache_not_found(mock_cache_file): @@ -386,7 +409,9 @@ def test_read_cache_scope_mismatch(mock_json_load, mock_open, mock_cache_file): @patch("gcpath.cache.CACHE_FILE") @patch("builtins.open") @patch("json.load") -def test_read_cache_old_cache_no_scope_field(mock_json_load, mock_open, mock_cache_file): +def test_read_cache_old_cache_no_scope_field( + mock_json_load, mock_open, mock_cache_file +): """Old cache without scope field works for unscoped loads (scope=None).""" mock_cache_file.exists.return_value = True mock_json_load.return_value = { @@ -404,7 +429,9 @@ def test_read_cache_old_cache_no_scope_field(mock_json_load, mock_open, mock_cac @patch("gcpath.cache.CACHE_FILE") @patch("builtins.open") @patch("json.load") -def test_read_cache_old_cache_rejected_for_scoped(mock_json_load, mock_open, mock_cache_file): +def test_read_cache_old_cache_rejected_for_scoped( + mock_json_load, mock_open, mock_cache_file +): """Old cache without scope field is rejected for scoped loads.""" mock_cache_file.exists.return_value = True mock_json_load.return_value = { diff --git a/tests/test_cli.py b/tests/test_cli.py index c3df131..7e755d6 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -4,6 +4,7 @@ import yaml from typer.testing import CliRunner from unittest.mock import patch, MagicMock +from conftest import make_test_hierarchy from gcpath.cli import app from gcpath.core import OrganizationNode, Hierarchy, Project, GCPathError from gcpath.cache import CacheInfo @@ -76,7 +77,9 @@ def test_ls_long_format_shows_org_resource_names(mock_load, mock_hierarchy): @patch("gcpath.core.Hierarchy.load") @patch("gcpath.cli.Hierarchy.resolve_ancestry") -def test_ls_long_format_shows_folder_resource_names(mock_resolve, mock_load, mock_hierarchy): +def test_ls_long_format_shows_folder_resource_names( + mock_resolve, mock_load, mock_hierarchy +): """Verify folder resource names appear in long format""" mock_load.return_value = mock_hierarchy mock_resolve.return_value = "//example.com" @@ -87,7 +90,9 @@ def test_ls_long_format_shows_folder_resource_names(mock_resolve, mock_load, moc @patch("gcpath.core.Hierarchy.load") @patch("gcpath.cli.Hierarchy.resolve_ancestry") -def test_ls_long_format_shows_project_resource_names(mock_resolve, mock_load, mock_hierarchy): +def test_ls_long_format_shows_project_resource_names( + mock_resolve, mock_load, mock_hierarchy +): """Verify project resource names appear in long format""" mock_load.return_value = mock_hierarchy mock_resolve.return_value = "//example.com/f1" @@ -135,8 +140,14 @@ def test_tree_prompts_on_unlimited_load( ): """Test that tree prompts when loading full org tree without limit""" mock_cache_info.return_value = CacheInfo( - exists=False, fresh=False, age_seconds=None, size_bytes=None, - version=None, org_count=0, folder_count=0, project_count=0 + exists=False, + fresh=False, + age_seconds=None, + size_bytes=None, + version=None, + org_count=0, + folder_count=0, + project_count=0, ) mock_confirm.return_value = True mock_load.return_value = mock_hierarchy @@ -153,8 +164,14 @@ def test_tree_prompts_on_large_level( ): """Test that tree prompts when level >= 4""" mock_cache_info.return_value = CacheInfo( - exists=False, fresh=False, age_seconds=None, size_bytes=None, - version=None, org_count=0, folder_count=0, project_count=0 + exists=False, + fresh=False, + age_seconds=None, + size_bytes=None, + version=None, + org_count=0, + folder_count=0, + project_count=0, ) mock_confirm.return_value = True mock_load.return_value = mock_hierarchy @@ -813,8 +830,14 @@ def test_try_read_cache_applies_org_filter(mock_rprint, mock_get_info, mock_read mock_read_cache.return_value = mock_hierarchy mock_get_info.return_value = CacheInfo( - exists=True, fresh=True, age_seconds=60.0, - size_bytes=100, version=1, org_count=2, folder_count=0, project_count=0 + exists=True, + fresh=True, + age_seconds=60.0, + size_bytes=100, + version=1, + org_count=2, + folder_count=0, + project_count=0, ) # Filter to only org1 @@ -1056,6 +1079,7 @@ def test_tree_type_folder_json(mock_confirm, mock_load, mock_hierarchy): result = runner.invoke(app, ["--json", "tree", "--type", "folder"]) assert result.exit_code == 0 data = json.loads(result.stdout) + # Check that no project children appear def check_no_projects(nodes): for node in nodes: @@ -1063,6 +1087,7 @@ def check_no_projects(nodes): for child in node["children"]: assert child.get("type") != "project" check_no_projects([child] if "children" in child else []) + check_no_projects(data) @@ -1242,3 +1267,99 @@ def test_ancestors_invalid_resource(): result = runner.invoke(app, ["ancestors", "invalid/123"]) assert result.exit_code == 1 assert "Invalid resource format" in result.output + + +# --- Label/tag CLI tests --- + + +@patch("gcpath.core.Hierarchy.load") +def test_ls_show_labels(mock_load): + """ls --show-labels --long should show a Labels column.""" + hierarchy = make_test_hierarchy() + f1 = hierarchy.organizations[0].folders["folders/1"] + f1.labels = {"env": "prod"} + mock_load.return_value = hierarchy + + result = runner.invoke(app, ["ls", "--show-labels", "-l"]) + assert result.exit_code == 0 + assert "Labels" in result.stdout + assert "env=prod" in result.stdout + + +@patch("gcpath.core.Hierarchy.load") +def test_ls_label_filter(mock_load): + """ls --label env=prod should only show matching resources.""" + hierarchy = make_test_hierarchy() + f1 = hierarchy.organizations[0].folders["folders/1"] + f1.labels = {"env": "prod"} + f11 = hierarchy.organizations[0].folders["folders/11"] + f11.labels = {"env": "dev"} + mock_load.return_value = hierarchy + + result = runner.invoke(app, ["ls", "-R", "--label", "env=prod"]) + assert result.exit_code == 0 + assert "f1" in result.stdout + # f11 has env=dev, should be filtered out + assert "f11" not in result.stdout + + +@patch("gcpath.core.Hierarchy.load") +def test_ls_label_filter_key_only(mock_load): + """ls --label env should match any resource with the env label key.""" + hierarchy = make_test_hierarchy() + f1 = hierarchy.organizations[0].folders["folders/1"] + f1.labels = {"env": "prod"} + f11 = hierarchy.organizations[0].folders["folders/11"] + f11.labels = {} + mock_load.return_value = hierarchy + + result = runner.invoke(app, ["ls", "-R", "--label", "env"]) + assert result.exit_code == 0 + assert "f1" in result.stdout + assert "f11" not in result.stdout + + +@patch("gcpath.core.Hierarchy.load") +def test_ls_show_tags_long(mock_load): + """ls --show-tags --long should show a Tags column.""" + hierarchy = make_test_hierarchy() + p1 = next(p for p in hierarchy.projects if p.name == "projects/p1") + p1.tags = {"org/env": "production"} + mock_load.return_value = hierarchy + + result = runner.invoke(app, ["ls", "--show-tags", "-l", "-R"]) + assert result.exit_code == 0 + assert "Tags" in result.stdout + assert "org/env=production" in result.stdout + + +@patch("gcpath.core.Hierarchy.load") +def test_ls_json_with_labels(mock_load): + """ls --json with labels should include labels in JSON output.""" + hierarchy = make_test_hierarchy() + f1 = hierarchy.organizations[0].folders["folders/1"] + f1.labels = {"env": "prod"} + mock_load.return_value = hierarchy + + result = runner.invoke(app, ["--json", "ls", "-R", "--show-labels"]) + assert result.exit_code == 0 + data = json.loads(result.stdout) + labeled = [item for item in data if item.get("labels")] + assert len(labeled) >= 1 + assert labeled[0]["labels"] == {"env": "prod"} + + +@patch("gcpath.core.Hierarchy.load") +def test_find_with_label_filter(mock_load): + """find --label should filter search results by label.""" + hierarchy = make_test_hierarchy() + f1 = hierarchy.organizations[0].folders["folders/1"] + f1.labels = {"env": "prod"} + f11 = hierarchy.organizations[0].folders["folders/11"] + f11.labels = {} + mock_load.return_value = hierarchy + + result = runner.invoke(app, ["find", "f*", "--label", "env=prod"]) + assert result.exit_code == 0 + assert "f1" in result.stdout + assert "f11" not in result.stdout diff --git a/tests/test_config.py b/tests/test_config.py index 5500656..9c808b2 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -16,8 +16,9 @@ def tmp_config(tmp_path): """Patch CONFIG_FILE to a temporary location.""" config_file = tmp_path / "config.json" - with patch("gcpath.config.CONFIG_FILE", config_file), patch( - "gcpath.config.CACHE_DIR", tmp_path + with ( + patch("gcpath.config.CONFIG_FILE", config_file), + patch("gcpath.config.CACHE_DIR", tmp_path), ): yield config_file diff --git a/tests/test_core.py b/tests/test_core.py index b4a6690..150d4e2 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -340,9 +340,7 @@ def test_hierarchy_get_path_errors(): @patch("gcpath.loaders.resourcemanager_v3") @patch("gcpath.loaders.asset_v1") @patch("gcpath.core.resourcemanager_v3") -def test_hierarchy_load_asset_api( - mock_core_rm, mock_loaders_asset, mock_loaders_rm -): +def test_hierarchy_load_asset_api(mock_core_rm, mock_loaders_asset, mock_loaders_rm): # Use the same mocks for both core and loaders mock_asset = mock_loaders_asset mock_rm = mock_core_rm @@ -526,7 +524,9 @@ def test_find_orgless_project_not_found(): """Test _find_orgless_project helper raises error when not found.""" h = Hierarchy([], []) - with pytest.raises(ResourceNotFoundError, match="not found in organizationless scope"): + with pytest.raises( + ResourceNotFoundError, match="not found in organizationless scope" + ): h._find_orgless_project("//_/NonExistent") diff --git a/tests/test_formatters.py b/tests/test_formatters.py index 450356e..d774ea4 100644 --- a/tests/test_formatters.py +++ b/tests/test_formatters.py @@ -351,7 +351,9 @@ def test_get_node_label_project_with_ids(mock_project): # Test Mermaid diagram generation -def test_build_diagram_mermaid(mock_org_node, mock_folder, mock_project, mock_hierarchy): +def test_build_diagram_mermaid( + mock_org_node, mock_folder, mock_project, mock_hierarchy +): """Test Mermaid diagram generation.""" mock_org_node.folders = {"folders/456": mock_folder} projects_by_parent = {"folders/456": [mock_project]} @@ -395,7 +397,9 @@ def test_build_diagram_d2(mock_org_node, mock_folder, mock_project, mock_hierarc assert "TestProject" in result -def test_build_diagram_with_ids(mock_org_node, mock_folder, mock_project, mock_hierarchy): +def test_build_diagram_with_ids( + mock_org_node, mock_folder, mock_project, mock_hierarchy +): """Test diagram generation with resource IDs in labels.""" mock_org_node.folders = {"folders/456": mock_folder} projects_by_parent = {"folders/456": [mock_project]} @@ -472,7 +476,9 @@ def test_build_diagram_unsupported_format(mock_org_node, mock_hierarchy): ) -def test_build_diagram_folder_root(mock_org_node, mock_folder, mock_project, mock_hierarchy): +def test_build_diagram_folder_root( + mock_org_node, mock_folder, mock_project, mock_hierarchy +): """Test diagram generation with a folder as root node.""" mock_org_node.folders = {"folders/456": mock_folder} projects_by_parent = {"folders/456": [mock_project]} @@ -539,3 +545,90 @@ def test_build_tree_view_type_filter_project( # Projects from inside folders should be added directly to root assert len(root.children) == 1 # The project, bubbled up + + +# --- Label/tag display tests --- + + +def test_format_tree_label_with_labels(): + """Test tree label with labels displayed.""" + org = OrganizationNode( + organization=resourcemanager_v3.Organization( + name="organizations/1", display_name="org" + ) + ) + folder = Folder( + name="folders/1", + display_name="TestFolder", + ancestors=["folders/1", "organizations/1"], + organization=org, + parent="organizations/1", + labels={"env": "prod", "team": "infra"}, + ) + + label = format_tree_label(folder, show_labels=True) + assert "TestFolder" in label + assert "env=prod" in label + assert "team=infra" in label + + +def test_format_tree_label_with_tags(): + """Test tree label with tags displayed.""" + org = OrganizationNode( + organization=resourcemanager_v3.Organization( + name="organizations/1", display_name="org" + ) + ) + project = Project( + name="projects/p1", + project_id="p1", + display_name="MyProject", + parent="organizations/1", + organization=org, + folder=None, + tags={"org/env": "production"}, + ) + + label = format_tree_label(project, show_tags=True) + assert "MyProject" in label + assert "org/env=production" in label + + +def test_format_tree_label_no_labels_no_tags(): + """Test tree label without labels or tags when not requested.""" + org = OrganizationNode( + organization=resourcemanager_v3.Organization( + name="organizations/1", display_name="org" + ) + ) + folder = Folder( + name="folders/1", + display_name="TestFolder", + ancestors=["folders/1", "organizations/1"], + organization=org, + parent="organizations/1", + labels={"env": "prod"}, + ) + + label = format_tree_label(folder, show_labels=False) + assert "env=prod" not in label + + +def test_format_tree_label_empty_labels(): + """Test tree label when labels are empty (no suffix added).""" + org = OrganizationNode( + organization=resourcemanager_v3.Organization( + name="organizations/1", display_name="org" + ) + ) + folder = Folder( + name="folders/1", + display_name="TestFolder", + ancestors=["folders/1", "organizations/1"], + organization=org, + parent="organizations/1", + labels={}, + ) + + label = format_tree_label(folder, show_labels=True) + assert "labels:" not in label diff --git a/tests/test_loaders.py b/tests/test_loaders.py index a7d15db..80e9e9e 100644 --- a/tests/test_loaders.py +++ b/tests/test_loaders.py @@ -625,7 +625,9 @@ def test_build_single_ancestor_chain_missing_parent(mock_org_node): @patch("google.cloud.asset_v1.AssetServiceClient") -def test_load_folders_asset_no_parent_uses_parent_filter(mock_asset_client_cls, mock_org_node): +def test_load_folders_asset_no_parent_uses_parent_filter( + mock_asset_client_cls, mock_org_node +): """Test that folders with no parent from API fall back to parent_filter.""" mock_client = mock_asset_client_cls.return_value @@ -653,7 +655,9 @@ def test_load_folders_asset_no_parent_uses_parent_filter(mock_asset_client_cls, @patch("google.cloud.asset_v1.AssetServiceClient") -def test_load_folders_asset_no_parent_no_filter_uses_root(mock_asset_client_cls, mock_org_node): +def test_load_folders_asset_no_parent_no_filter_uses_root( + mock_asset_client_cls, mock_org_node +): """Test that folders with no parent and no parent_filter fall back to root (org name).""" mock_client = mock_asset_client_cls.return_value @@ -680,7 +684,9 @@ def test_load_folders_asset_no_parent_no_filter_uses_root(mock_asset_client_cls, @patch("google.cloud.asset_v1.AssetServiceClient") -def test_load_projects_asset_no_parent_no_ancestors_with_filter(mock_asset_client_cls, mock_org_node): +def test_load_projects_asset_no_parent_no_ancestors_with_filter( + mock_asset_client_cls, mock_org_node +): """Test project with no parent and no ancestors falls back to parent_filter.""" mock_client = mock_asset_client_cls.return_value @@ -707,7 +713,9 @@ def test_load_projects_asset_no_parent_no_ancestors_with_filter(mock_asset_clien @patch("google.cloud.asset_v1.AssetServiceClient") -def test_load_projects_asset_no_parent_no_ancestors_no_filter(mock_asset_client_cls, mock_org_node): +def test_load_projects_asset_no_parent_no_ancestors_no_filter( + mock_asset_client_cls, mock_org_node +): """Test project with no parent and no ancestors falls back to org name.""" mock_client = mock_asset_client_cls.return_value @@ -734,7 +742,9 @@ def test_load_projects_asset_no_parent_no_ancestors_no_filter(mock_asset_client_ @patch("google.cloud.asset_v1.AssetServiceClient") -def test_load_projects_asset_self_in_ancestors_with_more(mock_asset_client_cls, mock_org_node): +def test_load_projects_asset_self_in_ancestors_with_more( + mock_asset_client_cls, mock_org_node +): """Test project where ancestors[0] == name and len > 1 uses ancestors[1].""" mock_client = mock_asset_client_cls.return_value @@ -777,7 +787,9 @@ def test_load_projects_asset_self_in_ancestors_with_more(mock_asset_client_cls, @patch("google.cloud.asset_v1.AssetServiceClient") -def test_load_projects_asset_self_in_ancestors_only_with_filter(mock_asset_client_cls, mock_org_node): +def test_load_projects_asset_self_in_ancestors_only_with_filter( + mock_asset_client_cls, mock_org_node +): """Test project where ancestors has only self, falls back to parent_filter.""" mock_client = mock_asset_client_cls.return_value @@ -808,7 +820,9 @@ def test_load_projects_asset_self_in_ancestors_only_with_filter(mock_asset_clien @patch("google.cloud.asset_v1.AssetServiceClient") -def test_load_projects_asset_self_in_ancestors_only_no_filter(mock_asset_client_cls, mock_org_node): +def test_load_projects_asset_self_in_ancestors_only_no_filter( + mock_asset_client_cls, mock_org_node +): """Test project where ancestors has only self and no filter, falls back to org.""" mock_client = mock_asset_client_cls.return_value @@ -839,7 +853,9 @@ def test_load_projects_asset_self_in_ancestors_only_no_filter(mock_asset_client_ @patch("google.cloud.asset_v1.AssetServiceClient") -def test_load_projects_asset_ancestors_first_not_self(mock_asset_client_cls, mock_org_node): +def test_load_projects_asset_ancestors_first_not_self( + mock_asset_client_cls, mock_org_node +): """Test project where ancestors[0] != name uses ancestors[0] as parent.""" mock_client = mock_asset_client_cls.return_value @@ -880,7 +896,9 @@ def test_load_projects_asset_ancestors_first_not_self(mock_asset_client_cls, moc @patch("google.cloud.asset_v1.AssetServiceClient") -def test_load_projects_asset_ancestors_first_not_self_with_filter(mock_asset_client_cls, mock_org_node): +def test_load_projects_asset_ancestors_first_not_self_with_filter( + mock_asset_client_cls, mock_org_node +): """Test else branch with empty ancestors falls back to parent_filter.""" mock_client = mock_asset_client_cls.return_value @@ -925,3 +943,88 @@ def test_load_projects_asset_ancestors_first_not_self_with_filter(mock_asset_cli assert len(projects) == 1 # ancestors[0] is folders/f4, so that wins over parent_filter assert projects[0].parent == "folders/f4" + + +# --- Label support tests --- + + +def test_build_folder_sql_query_with_labels(): + """Test that include_labels adds resource.data.labels to SELECT.""" + query = build_folder_sql_query(include_labels=True) + assert "resource.data.labels" in query + assert "lifecycleState = 'ACTIVE'" in query + + +def test_build_folder_sql_query_without_labels(): + """Test that include_labels=False does not add labels to SELECT.""" + query = build_folder_sql_query(include_labels=False) + assert "resource.data.labels" not in query + + +def test_build_project_sql_query_with_labels(): + """Test that include_labels adds resource.data.labels to SELECT.""" + query = build_project_sql_query(include_labels=True) + assert "resource.data.labels" in query + assert "lifecycleState = 'ACTIVE'" in query + + +def test_build_project_sql_query_without_labels(): + """Test that include_labels=False does not add labels to SELECT.""" + query = build_project_sql_query(include_labels=False) + assert "resource.data.labels" not in query + + +@patch("google.cloud.asset_v1.AssetServiceClient") +def test_load_folders_asset_with_labels(mock_asset_client_cls, mock_org_node): + """Test that labels are parsed when include_labels is True.""" + mock_client = mock_asset_client_cls.return_value + + row = { + "f": [ + {"v": "//cloudresourcemanager.googleapis.com/folders/1"}, + {"v": "f1"}, + {"v": "organizations/123"}, + {"v": [{"v": "folders/1"}, {"v": "organizations/123"}]}, + {"v": {"env": "prod", "team": "eng"}}, + ] + } + + mock_query_result = MagicMock() + mock_query_result.rows = [row] + mock_response = MagicMock() + mock_response.query_result = mock_query_result + mock_client.query_assets.return_value = mock_response + + load_folders_asset(mock_org_node, include_labels=True) + + assert "folders/1" in mock_org_node.folders + f = mock_org_node.folders["folders/1"] + assert f.labels == {"env": "prod", "team": "eng"} + + +@patch("google.cloud.asset_v1.AssetServiceClient") +def test_load_projects_asset_with_labels(mock_asset_client_cls, mock_org_node): + """Test that labels are parsed when include_labels is True.""" + mock_client = mock_asset_client_cls.return_value + + row = { + "f": [ + {"v": "//cloudresourcemanager.googleapis.com/projects/p1"}, + {"v": "12345"}, + {"v": "p1"}, + {"v": {"f": [{"v": "organization"}, {"v": "123"}]}}, + {"v": [{"v": "projects/p1"}, {"v": "organizations/123"}]}, + {"v": {"env": "dev"}}, + ] + } + + mock_query_result = MagicMock() + mock_query_result.rows = [row] + mock_response = MagicMock() + mock_response.query_result = mock_query_result + mock_client.query_assets.return_value = mock_response + + projects = load_projects_asset(mock_org_node, include_labels=True) + + assert len(projects) == 1 + assert projects[0].labels == {"env": "dev"} diff --git a/tests/test_parsers.py b/tests/test_parsers.py index d539820..e4f0970 100644 --- a/tests/test_parsers.py +++ b/tests/test_parsers.py @@ -5,6 +5,7 @@ clean_asset_name, extract_value, extract_list_values, + extract_labels, parse_parent_struct, validate_row_structure, parse_project_row, @@ -326,3 +327,93 @@ def test_build_folder_ancestors_org_parent(): name, raw_ancestors, parent, loaded_folders, org_name ) assert result == ["folders/1", "organizations/123"] + + +# Test extract_labels +def test_extract_labels_from_dict(): + """Test extracting labels from a dict-wrapped value.""" + labels_col = {"v": {"env": "prod", "team": "infra"}} + result = extract_labels(labels_col) + assert result == {"env": "prod", "team": "infra"} + + +def test_extract_labels_empty(): + """Test extracting labels when value is None.""" + labels_col = {"v": None} + result = extract_labels(labels_col) + assert result == {} + + +def test_extract_labels_no_labels(): + """Test extracting labels from empty dict.""" + labels_col = {"v": {}} + result = extract_labels(labels_col) + assert result == {} + + +# Test parse_folder_row with labels +def test_parse_folder_row_with_labels(): + """Test parsing a folder row with labels column.""" + row = { + "f": [ + {"v": "//cloudresourcemanager.googleapis.com/folders/456"}, + {"v": "Test Folder"}, + {"v": "folders/123"}, + {"v": [{"v": "folders/456"}, {"v": "organizations/123"}]}, + {"v": {"env": "prod"}}, + ] + } + + result = parse_folder_row(row, has_labels=True) + assert result["name"] == "folders/456" + assert result["labels"] == {"env": "prod"} + + +def test_parse_folder_row_without_labels_flag(): + """Test that labels are not parsed when has_labels=False.""" + row = { + "f": [ + {"v": "//cloudresourcemanager.googleapis.com/folders/456"}, + {"v": "Test Folder"}, + {"v": "folders/123"}, + {"v": [{"v": "folders/456"}]}, + ] + } + + result = parse_folder_row(row, has_labels=False) + assert "labels" not in result + + +# Test parse_project_row with labels +def test_parse_project_row_with_labels(): + """Test parsing a project row with labels column.""" + row = { + "f": [ + {"v": "//cloudresourcemanager.googleapis.com/projects/p1"}, + {"v": "12345"}, + {"v": "p1"}, + {"v": {"f": [{"v": "folder"}, {"v": "456"}]}}, + {"v": [{"v": "projects/p1"}]}, + {"v": {"team": "backend", "env": "dev"}}, + ] + } + + result = parse_project_row(row, has_labels=True) + assert result["name"] == "projects/p1" + assert result["labels"] == {"team": "backend", "env": "dev"} + + +def test_parse_project_row_without_labels_flag(): + """Test that labels are not parsed when has_labels=False.""" + row = { + "f": [ + {"v": "//cloudresourcemanager.googleapis.com/projects/p1"}, + {"v": "12345"}, + {"v": "p1"}, + {"v": {"f": [{"v": "folder"}, {"v": "456"}]}}, + {"v": [{"v": "projects/p1"}]}, + ] + } + + result = parse_project_row(row, has_labels=False) + assert "labels" not in result diff --git a/tests/test_serializers.py b/tests/test_serializers.py index 46f6544..d168d9a 100644 --- a/tests/test_serializers.py +++ b/tests/test_serializers.py @@ -107,7 +107,8 @@ def test_with_orgless(self): _, org_node, _, p1, orgless_p = _h() projects_by_parent = {"folders/1": [p1]} result = serialize_tree( - [org_node], projects_by_parent, + [org_node], + projects_by_parent, orgless_projects=[orgless_p], ) assert len(result) == 2 @@ -197,7 +198,8 @@ def test_folder_filter_excludes_orgless(self): _, org_node, _, p1, orgless_p = _h() projects_by_parent = {"folders/1": [p1]} result = serialize_tree( - [org_node], projects_by_parent, + [org_node], + projects_by_parent, orgless_projects=[orgless_p], type_filter="folder", ) @@ -215,9 +217,55 @@ def test_basic(self): ] result = serialize_ancestors(chain) assert len(result) == 3 - assert result[0] == {"resource_name": "organizations/123", "display_name": "example.com", "type": "organization"} - assert result[1] == {"resource_name": "folders/456", "display_name": "engineering", "type": "folder"} - assert result[2] == {"resource_name": "projects/p1", "display_name": "my-project", "type": "project"} + assert result[0] == { + "resource_name": "organizations/123", + "display_name": "example.com", + "type": "organization", + } + assert result[1] == { + "resource_name": "folders/456", + "display_name": "engineering", + "type": "folder", + } + assert result[2] == { + "resource_name": "projects/p1", + "display_name": "my-project", + "type": "project", + } def test_empty(self): assert serialize_ancestors([]) == [] + + +class TestSerializeResourceWithLabelsAndTags: + def test_folder_with_labels(self): + hierarchy = make_test_hierarchy() + org_node = hierarchy.organizations[0] + f1 = org_node.folders["folders/1"] + f1.labels = {"env": "prod", "team": "eng"} + d = serialize_resource(f1.path, f1) + assert d["labels"] == {"env": "prod", "team": "eng"} + assert "tags" not in d # Empty tags should not appear + + def test_project_with_tags(self): + hierarchy = make_test_hierarchy() + p1 = next(p for p in hierarchy.projects if p.name == "projects/p1") + p1.tags = {"org/env": "production"} + d = serialize_resource(p1.path, p1) + assert d["tags"] == {"org/env": "production"} + assert "labels" not in d # Empty labels should not appear + + def test_no_labels_no_tags(self): + hierarchy = make_test_hierarchy() + org_node = hierarchy.organizations[0] + f1 = org_node.folders["folders/1"] + d = serialize_resource(f1.path, f1) + assert "labels" not in d + assert "tags" not in d + + def test_organization_has_no_labels(self): + hierarchy = make_test_hierarchy() + org_node = hierarchy.organizations[0] + d = serialize_resource("//example.com", org_node) + assert "labels" not in d + assert "tags" not in d diff --git a/uv.lock b/uv.lock index f6d300f..ee66554 100644 --- a/uv.lock +++ b/uv.lock @@ -179,7 +179,7 @@ wheels = [ [[package]] name = "gcpath" -version = "0.8.0" +version = "0.9.0" source = { editable = "." } dependencies = [ { name = "google-cloud-asset" },