Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions demos/discovery.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
from __future__ import annotations

import importlib
import os
import pkgutil
import sys
from pathlib import Path
from types import ModuleType

PLUGIN_PACKAGES_ENV = "POP_PLUGIN_PACKAGES"
PLUGIN_PACKAGE_PATHS_ENV = "POP_PLUGIN_PACKAGE_PATHS"


def split_package_names(value: str) -> tuple[str, ...]:
return tuple(part.strip() for part in value.split(",") if part.strip())


def split_search_paths(value: str) -> tuple[Path, ...]:
return tuple(
Path(part).expanduser().resolve()
for part in value.split(os.pathsep)
if part.strip()
)


def configure_plugin_search_paths(paths: tuple[Path, ...]) -> tuple[Path, ...]:
configured: list[Path] = []
for path in paths:
if not path.exists():
raise FileNotFoundError(f"Plugin package path does not exist: {path}")
path_str = str(path)
if path_str not in sys.path:
sys.path.append(path_str)
configured.append(path)
return tuple(configured)


def configured_package_names(
default_packages: tuple[str, ...] = ("demos",),
extra_packages: tuple[str, ...] = (),
) -> tuple[str, ...]:
plugin_paths = split_search_paths(os.environ.get(PLUGIN_PACKAGE_PATHS_ENV, ""))
configure_plugin_search_paths(plugin_paths)
env_packages = split_package_names(os.environ.get(PLUGIN_PACKAGES_ENV, ""))

ordered_packages: list[str] = []
seen: set[str] = set()
for package_name in (*default_packages, *extra_packages, *env_packages):
if package_name and package_name not in seen:
ordered_packages.append(package_name)
seen.add(package_name)
return tuple(ordered_packages)


def import_matching_modules(
package_names: str | tuple[str, ...],
exact_names: tuple[str, ...] = (),
prefixes: tuple[str, ...] = (),
) -> list[ModuleType]:
if isinstance(package_names, str):
package_names = (package_names,)

imported_modules: list[ModuleType] = []
seen_module_names: set[str] = set()

for package_name in package_names:
package = importlib.import_module(package_name)
module_names: set[str] = set()
package_paths = getattr(package, "__path__", None)

if package_paths is not None:
for _, name, _ in pkgutil.iter_modules(package_paths):
if name in exact_names or any(name.startswith(prefix) for prefix in prefixes):
module_names.add(name)
else:
for name in exact_names:
qualified_name = f"{package_name}.{name}"
if importlib.util.find_spec(qualified_name) is not None:
module_names.add(name)

for module_name in sorted(module_names):
qualified_name = f"{package_name}.{module_name}"
if qualified_name in seen_module_names:
continue
imported_modules.append(importlib.import_module(qualified_name))
seen_module_names.add(qualified_name)

return imported_modules


def collect_module_exports(
package_names: str | tuple[str, ...],
export_name: str,
exact_names: tuple[str, ...] = (),
prefixes: tuple[str, ...] = (),
) -> tuple[object, ...]:
exports: list[object] = []
for module in import_matching_modules(package_names, exact_names, prefixes):
exports.extend(getattr(module, export_name, ()))
return tuple(exports)
22 changes: 22 additions & 0 deletions demos/persona_definitions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from __future__ import annotations

from demos.task_registry import PersonaDefinition


PERSONA_DEFINITIONS = (
PersonaDefinition(
persona_id="design_persona",
file_path="personas/design_persona.json",
),
PersonaDefinition(
persona_id="research_persona",
file_path="personas/researcher_persona.json",
),
PersonaDefinition(
persona_id="marketing_persona",
file_path="personas/marketing_persona.json",
),
)


BUILTIN_PERSONAS = PERSONA_DEFINITIONS
13 changes: 10 additions & 3 deletions demos/persona_workflow_demo.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
from demos.task_registry import (
build_deliverable,
build_stage_output,
persona_path_for,
resolve_persona_path,
stage_handler_id_for,
stage_sequence_for,
supported_task_types,
)
Expand Down Expand Up @@ -44,6 +45,7 @@ def load_persona_loader_module() -> Any:
def snapshot_result(result: dict[str, object]) -> dict[str, object]:
return {
"stage_name": result.get("stage_name"),
"handler_id": result.get("handler_id"),
"persona_name": result.get("persona_name"),
"role": result.get("role"),
"task_output": result.get("task_output"),
Expand Down Expand Up @@ -153,7 +155,7 @@ def load_personas_for_task(task_type: str) -> dict[str, Persona]:
for stage in stage_sequence_for(task_type):
if stage.persona_id in personas:
continue
persona_path = PROJECT_ROOT / persona_path_for(stage.persona_id)
persona_path = resolve_persona_path(stage.persona_id, PROJECT_ROOT)
personas[stage.persona_id] = load_persona(persona_path)
return personas

Expand Down Expand Up @@ -199,6 +201,7 @@ def run_stage(
)
result = {
"stage_name": stage_definition.stage_name,
"handler_id": stage_handler_id_for(task_type, stage_definition.stage_name),
"persona_id": stage_definition.persona_id,
"persona_name": persona.name,
"role": persona.role,
Expand Down Expand Up @@ -269,7 +272,11 @@ def workflow(task_input_path: Path, output_path: Path | None = None) -> Path:

def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Run the persona workflow demo with dynamic task input."
description=(
"Run the persona workflow demo with dynamic task input. "
"External plugin packages can be added with "
"POP_PLUGIN_PACKAGES and POP_PLUGIN_PACKAGE_PATHS."
)
)
parser.add_argument(
"--task-input",
Expand Down
Loading
Loading