From 41f54f9f4fec1d833884ff2650410c93c80245a2 Mon Sep 17 00:00:00 2001 From: BIN Zhang Date: Mon, 16 Mar 2026 00:48:39 +0800 Subject: [PATCH 1/2] feat: register stage handler plugins --- demos/persona_workflow_demo.py | 3 + demos/stage_handlers.py | 313 +++++++++++++++++++++++++++++++ demos/task_registry.py | 326 ++++++--------------------------- 3 files changed, 370 insertions(+), 272 deletions(-) create mode 100644 demos/stage_handlers.py diff --git a/demos/persona_workflow_demo.py b/demos/persona_workflow_demo.py index 6b5a8fa..46b1b6d 100644 --- a/demos/persona_workflow_demo.py +++ b/demos/persona_workflow_demo.py @@ -17,6 +17,7 @@ build_deliverable, build_stage_output, persona_path_for, + stage_handler_id_for, stage_sequence_for, supported_task_types, ) @@ -44,6 +45,7 @@ def load_persona_loader_module() -> Any: def snapshot_result(result: dict[str, object]) -> dict[str, object]: return { "stage_name": result.get("stage_name"), + "handler_id": result.get("handler_id"), "persona_name": result.get("persona_name"), "role": result.get("role"), "task_output": result.get("task_output"), @@ -199,6 +201,7 @@ def run_stage( ) result = { "stage_name": stage_definition.stage_name, + "handler_id": stage_handler_id_for(task_type, stage_definition.stage_name), "persona_id": stage_definition.persona_id, "persona_name": persona.name, "role": persona.role, diff --git a/demos/stage_handlers.py b/demos/stage_handlers.py new file mode 100644 index 0000000..3a7fb63 --- /dev/null +++ b/demos/stage_handlers.py @@ -0,0 +1,313 @@ +from __future__ import annotations + +from typing import Any + +from demos.task_registry import StageHandlerDefinition, register_stage_handler + + +def subject_inputs(task_input: dict[str, Any]) -> dict[str, Any]: + return task_input.get("inputs", {}) + + +def summarize_subject(task_input: dict[str, Any]) -> str: + inputs = subject_inputs(task_input) + feature_list = ", ".join(inputs.get("subject_attributes", [])) or "core attributes" + return ( + f"{inputs.get('subject_name', 'Unknown subject')} in " + f"{inputs.get('subject_category', 'Unknown')}" + f" with {feature_list}" + ) + + +def with_indefinite_article(text: str) -> str: + article = "an" if text[:1].lower() in {"a", "e", "i", "o", "u"} else "a" + return f"{article} {text}" + + +def infer_market_trends(task_input: dict[str, Any]) -> list[str]: + inputs = subject_inputs(task_input) + features = {str(feature).lower() for feature in inputs.get("subject_attributes", [])} + trends: list[str] = [] + if "5g" in features: + trends.append("5G adoption remains a purchase driver in premium devices") + if "fast charging" in features: + trends.append("Battery convenience influences upgrade decisions") + if "touchscreen" in features: + trends.append("Large responsive displays remain central to daily usage") + if str(inputs.get("subject_category", "")).lower() == "electronics": + trends.append("Consumers compare feature density with price sensitivity") + return trends or ["General market demand should be validated with recent signals"] + + +def infer_design_constraints(task_input: dict[str, Any]) -> list[str]: + inputs = subject_inputs(task_input) + constraints = inputs.get("constraints", []) + if constraints: + return constraints + return [ + "Balance concept ambition with implementation feasibility", + "Keep differentiation obvious to the target audience", + ] + + +def infer_ux_findings(task_input: dict[str, Any]) -> list[str]: + inputs = subject_inputs(task_input) + attributes = {str(item).lower() for item in inputs.get("subject_attributes", [])} + findings: list[str] = [] + if "form abandonment" in attributes: + findings.append("Long form steps likely interrupt momentum before payment.") + if "payment trust" in attributes: + findings.append("Trust cues near payment decisions need to be more visible.") + if "mobile checkout" in attributes: + findings.append("Mobile users need fewer taps and clearer progress markers.") + return findings or [ + "Review friction around the primary user path and prioritize the highest drop-off points." + ] + + +def objective_for( + task_input: dict[str, Any], + index: int, + default: str, +) -> str: + objectives = task_input.get("objectives", []) + if index < len(objectives): + return str(objectives[index]) + return default + + +def build_market_research_design_output(task_input: dict[str, Any]) -> dict[str, Any]: + return { + "summary": f"Design a concept for {summarize_subject(task_input)}", + "objective": objective_for(task_input, 0, "Design a product concept"), + "subject": subject_inputs(task_input), + "focus": [ + "clarify the value proposition", + "connect concept choices to buyer expectations", + "prepare a market-aware design brief", + ], + } + + +def build_market_research_research_output(task_input: dict[str, Any]) -> dict[str, Any]: + inputs = subject_inputs(task_input) + return { + "summary": f"Research market demand for {inputs['subject_name']}", + "objective": objective_for(task_input, 1, "Research market trends"), + "market_trends": infer_market_trends(task_input), + "focus": [ + "collect supporting market signals", + "distill concise findings", + "translate evidence into technical context", + ], + } + + +def build_market_research_marketing_output(task_input: dict[str, Any]) -> dict[str, Any]: + inputs = subject_inputs(task_input) + attributes = ", ".join(inputs["subject_attributes"]) + hero_attribute = ( + inputs["subject_attributes"][0] + if inputs["subject_attributes"] + else "the clearest differentiator" + ) + return { + "summary": f"Create marketing strategy for {inputs['subject_name']}", + "objective": objective_for(task_input, 2, "Create marketing strategy"), + "positioning": ( + f"{inputs['subject_name']} is positioned as " + f"{with_indefinite_article(inputs['subject_category'].lower())} offer that " + f"combines {attributes} into one concise value story." + ), + "campaign_hooks": [ + f"Lead with {hero_attribute} as the hero differentiator", + "Use research-backed proof points in launch messaging", + "Connect product utility to everyday buyer outcomes", + ], + "focus": [ + "turn research into positioning", + "shape a campaign-ready narrative", + "package the final deliverable", + ], + } + + +def build_product_design_design_output(task_input: dict[str, Any]) -> dict[str, Any]: + inputs = subject_inputs(task_input) + return { + "summary": ( + f"Shape the product direction for {inputs['subject_name']} with " + "attention to concept clarity and buildability" + ), + "objective": objective_for(task_input, 0, "Shape the product concept"), + "concept_direction": { + "subject": inputs["subject_name"], + "category": inputs["subject_category"], + "hero_attributes": inputs["subject_attributes"][:3], + }, + "focus": [ + "define the concept direction", + "prioritize user-facing differentiators", + "keep the concept buildable under constraints", + ], + } + + +def build_product_design_research_output(task_input: dict[str, Any]) -> dict[str, Any]: + inputs = subject_inputs(task_input) + return { + "summary": f"Research constraints and adjacent patterns for {inputs['subject_name']}", + "objective": objective_for( + task_input, + 1, + "Review design constraints and competitive cues", + ), + "design_constraints": infer_design_constraints(task_input), + "focus": [ + "compare adjacent product patterns", + "identify feasibility and usability constraints", + "surface practical tradeoffs for the concept", + ], + } + + +def build_product_design_marketing_output(task_input: dict[str, Any]) -> dict[str, Any]: + inputs = subject_inputs(task_input) + attributes = ", ".join(inputs["subject_attributes"]) + return { + "summary": f"Prepare launch framing for the {inputs['subject_name']} concept", + "objective": objective_for( + task_input, + 2, + "Prepare launch messaging for the concept", + ), + "launch_story": ( + f"Present {inputs['subject_name']} as a concept that turns {attributes} " + "into a coherent premium product direction." + ), + "stakeholder_hooks": [ + "Frame the concept as feasible, differentiated, and buyer-relevant", + "Use constraints to explain why the chosen direction is disciplined", + ], + "focus": [ + "translate the concept into launch language", + "align story with buyer expectations", + "package the concept as a reviewable brief", + ], + } + + +def build_ux_review_design_output(task_input: dict[str, Any]) -> dict[str, Any]: + inputs = subject_inputs(task_input) + return { + "summary": ( + f"Define an improved experience for {inputs['subject_name']} in " + f"{inputs['subject_category']}" + ), + "objective": objective_for(task_input, 0, "Define a better checkout experience"), + "experience_map": { + "subject": inputs["subject_name"], + "primary_friction_signals": inputs["subject_attributes"][:3], + "target_audience": inputs.get("target_audience"), + }, + "focus": [ + "map the intended user journey", + "reduce points of hesitation", + "prepare a concise improvement concept", + ], + } + + +def build_ux_review_research_output(task_input: dict[str, Any]) -> dict[str, Any]: + inputs = subject_inputs(task_input) + return { + "summary": f"Research UX friction patterns for {inputs['subject_name']}", + "objective": objective_for(task_input, 1, "Identify friction and usage patterns"), + "ux_findings": infer_ux_findings(task_input), + "focus": [ + "identify friction signals", + "connect issues to user behavior patterns", + "prioritize what should change first", + ], + } + + +def build_ux_review_marketing_output(task_input: dict[str, Any]) -> dict[str, Any]: + inputs = subject_inputs(task_input) + return { + "summary": f"Package the UX rollout plan for {inputs['subject_name']}", + "objective": objective_for( + task_input, + 2, + "Package the UX improvement plan for rollout", + ), + "rollout_story": ( + f"Position the {inputs['subject_name']} update as a friction-reduction effort " + "that improves confidence and completion rates." + ), + "stakeholder_hooks": [ + "Tie each improvement to a visible user pain point", + "Explain the rollout in terms of reduced friction and clearer trust cues", + ], + "focus": [ + "frame the UX work for stakeholders", + "turn findings into an adoption story", + "package a rollout-ready improvement plan", + ], + } + + +register_stage_handler( + StageHandlerDefinition( + handler_id="market_research.design", + builder=build_market_research_design_output, + ) +) +register_stage_handler( + StageHandlerDefinition( + handler_id="market_research.research", + builder=build_market_research_research_output, + ) +) +register_stage_handler( + StageHandlerDefinition( + handler_id="market_research.marketing", + builder=build_market_research_marketing_output, + ) +) +register_stage_handler( + StageHandlerDefinition( + handler_id="product_design.design", + builder=build_product_design_design_output, + ) +) +register_stage_handler( + StageHandlerDefinition( + handler_id="product_design.research", + builder=build_product_design_research_output, + ) +) +register_stage_handler( + StageHandlerDefinition( + handler_id="product_design.marketing", + builder=build_product_design_marketing_output, + ) +) +register_stage_handler( + StageHandlerDefinition( + handler_id="ux_review.design", + builder=build_ux_review_design_output, + ) +) +register_stage_handler( + StageHandlerDefinition( + handler_id="ux_review.research", + builder=build_ux_review_research_output, + ) +) +register_stage_handler( + StageHandlerDefinition( + handler_id="ux_review.marketing", + builder=build_ux_review_marketing_output, + ) +) diff --git a/demos/task_registry.py b/demos/task_registry.py index d59e522..07caf74 100644 --- a/demos/task_registry.py +++ b/demos/task_registry.py @@ -12,6 +12,12 @@ class PersonaDefinition: file_path: str +@dataclass(frozen=True) +class StageHandlerDefinition: + handler_id: str + builder: StageOutputBuilder + + @dataclass(frozen=True) class StageDefinition: stage_name: str @@ -26,10 +32,11 @@ class TaskTypeDefinition: task_type: str deliverable_type: str stage_sequence: tuple[StageDefinition, ...] - handlers: dict[str, StageOutputBuilder] + stage_handlers: dict[str, str] PERSONA_REGISTRY: dict[str, PersonaDefinition] = {} +STAGE_HANDLER_REGISTRY: dict[str, StageHandlerDefinition] = {} TASK_REGISTRY: dict[str, TaskTypeDefinition] = {} @@ -51,6 +58,24 @@ def persona_path_for(persona_id: str) -> str: return get_persona_definition(persona_id).file_path +def register_stage_handler(definition: StageHandlerDefinition) -> None: + STAGE_HANDLER_REGISTRY[definition.handler_id] = definition + + +def get_stage_handler_definition(handler_id: str) -> StageHandlerDefinition: + try: + return STAGE_HANDLER_REGISTRY[handler_id] + except KeyError as exc: + available = ", ".join(sorted(STAGE_HANDLER_REGISTRY)) + raise ValueError( + f"Unknown stage handler `{handler_id}`. Expected one of: {available}." + ) from exc + + +def registered_stage_handler_ids() -> frozenset[str]: + return frozenset(STAGE_HANDLER_REGISTRY) + + def register_task_type(definition: TaskTypeDefinition) -> None: TASK_REGISTRY[definition.task_type] = definition @@ -69,19 +94,24 @@ def get_task_definition(task_type: str) -> TaskTypeDefinition: ) from exc -def build_stage_output( - task_type: str, - stage_name: str, - task_input: dict[str, Any], -) -> dict[str, Any]: +def stage_handler_id_for(task_type: str, stage_name: str) -> str: definition = get_task_definition(task_type) try: - builder = definition.handlers[stage_name] + return definition.stage_handlers[stage_name] except KeyError as exc: raise ValueError( f"Task type `{task_type}` does not define a `{stage_name}` handler." ) from exc - return builder(task_input) + + +def build_stage_output( + task_type: str, + stage_name: str, + task_input: dict[str, Any], +) -> dict[str, Any]: + handler_id = stage_handler_id_for(task_type, stage_name) + handler = get_stage_handler_definition(handler_id) + return handler.builder(task_input) def stage_sequence_for(task_type: str) -> tuple[StageDefinition, ...]: @@ -108,6 +138,7 @@ def build_deliverable(task_type: str, context: Any) -> dict[str, Any]: "deliverable_type": definition.deliverable_type, "inputs": context.task_input.get("inputs", {}), "objectives": context.task_input.get("objectives", []), + "stage_handlers": dict(definition.stage_handlers), } for stage in definition.stage_sequence: @@ -128,258 +159,6 @@ def build_deliverable(task_type: str, context: Any) -> dict[str, Any]: return deliverable -def subject_inputs(task_input: dict[str, Any]) -> dict[str, Any]: - return task_input.get("inputs", {}) - - -def summarize_subject(task_input: dict[str, Any]) -> str: - inputs = subject_inputs(task_input) - feature_list = ", ".join(inputs.get("subject_attributes", [])) or "core attributes" - return ( - f"{inputs.get('subject_name', 'Unknown subject')} in " - f"{inputs.get('subject_category', 'Unknown')}" - f" with {feature_list}" - ) - - -def with_indefinite_article(text: str) -> str: - article = "an" if text[:1].lower() in {"a", "e", "i", "o", "u"} else "a" - return f"{article} {text}" - - -def infer_market_trends(task_input: dict[str, Any]) -> list[str]: - inputs = subject_inputs(task_input) - features = {str(feature).lower() for feature in inputs.get("subject_attributes", [])} - trends: list[str] = [] - if "5g" in features: - trends.append("5G adoption remains a purchase driver in premium devices") - if "fast charging" in features: - trends.append("Battery convenience influences upgrade decisions") - if "touchscreen" in features: - trends.append("Large responsive displays remain central to daily usage") - if str(inputs.get("subject_category", "")).lower() == "electronics": - trends.append("Consumers compare feature density with price sensitivity") - return trends or ["General market demand should be validated with recent signals"] - - -def infer_design_constraints(task_input: dict[str, Any]) -> list[str]: - inputs = subject_inputs(task_input) - constraints = inputs.get("constraints", []) - if constraints: - return constraints - return [ - "Balance concept ambition with implementation feasibility", - "Keep differentiation obvious to the target audience", - ] - - -def infer_ux_findings(task_input: dict[str, Any]) -> list[str]: - inputs = subject_inputs(task_input) - attributes = {str(item).lower() for item in inputs.get("subject_attributes", [])} - findings: list[str] = [] - if "form abandonment" in attributes: - findings.append("Long form steps likely interrupt momentum before payment.") - if "payment trust" in attributes: - findings.append("Trust cues near payment decisions need to be more visible.") - if "mobile checkout" in attributes: - findings.append("Mobile users need fewer taps and clearer progress markers.") - return findings or [ - "Review friction around the primary user path and prioritize the highest drop-off points." - ] - - -def objective_for( - task_input: dict[str, Any], - index: int, - default: str, -) -> str: - objectives = task_input.get("objectives", []) - if index < len(objectives): - return str(objectives[index]) - return default - - -def build_market_research_design_output(task_input: dict[str, Any]) -> dict[str, Any]: - return { - "summary": f"Design a concept for {summarize_subject(task_input)}", - "objective": objective_for(task_input, 0, "Design a product concept"), - "subject": subject_inputs(task_input), - "focus": [ - "clarify the value proposition", - "connect concept choices to buyer expectations", - "prepare a market-aware design brief", - ], - } - - -def build_market_research_research_output(task_input: dict[str, Any]) -> dict[str, Any]: - inputs = subject_inputs(task_input) - return { - "summary": f"Research market demand for {inputs['subject_name']}", - "objective": objective_for(task_input, 1, "Research market trends"), - "market_trends": infer_market_trends(task_input), - "focus": [ - "collect supporting market signals", - "distill concise findings", - "translate evidence into technical context", - ], - } - - -def build_market_research_marketing_output(task_input: dict[str, Any]) -> dict[str, Any]: - inputs = subject_inputs(task_input) - attributes = ", ".join(inputs["subject_attributes"]) - hero_attribute = ( - inputs["subject_attributes"][0] - if inputs["subject_attributes"] - else "the clearest differentiator" - ) - return { - "summary": f"Create marketing strategy for {inputs['subject_name']}", - "objective": objective_for(task_input, 2, "Create marketing strategy"), - "positioning": ( - f"{inputs['subject_name']} is positioned as " - f"{with_indefinite_article(inputs['subject_category'].lower())} offer that " - f"combines {attributes} into one concise value story." - ), - "campaign_hooks": [ - f"Lead with {hero_attribute} as the hero differentiator", - "Use research-backed proof points in launch messaging", - "Connect product utility to everyday buyer outcomes", - ], - "focus": [ - "turn research into positioning", - "shape a campaign-ready narrative", - "package the final deliverable", - ], - } - - -def build_product_design_design_output(task_input: dict[str, Any]) -> dict[str, Any]: - inputs = subject_inputs(task_input) - return { - "summary": ( - f"Shape the product direction for {inputs['subject_name']} with " - "attention to concept clarity and buildability" - ), - "objective": objective_for(task_input, 0, "Shape the product concept"), - "concept_direction": { - "subject": inputs["subject_name"], - "category": inputs["subject_category"], - "hero_attributes": inputs["subject_attributes"][:3], - }, - "focus": [ - "define the concept direction", - "prioritize user-facing differentiators", - "keep the concept buildable under constraints", - ], - } - - -def build_product_design_research_output(task_input: dict[str, Any]) -> dict[str, Any]: - inputs = subject_inputs(task_input) - return { - "summary": f"Research constraints and adjacent patterns for {inputs['subject_name']}", - "objective": objective_for( - task_input, - 1, - "Review design constraints and competitive cues", - ), - "design_constraints": infer_design_constraints(task_input), - "focus": [ - "compare adjacent product patterns", - "identify feasibility and usability constraints", - "surface practical tradeoffs for the concept", - ], - } - - -def build_product_design_marketing_output(task_input: dict[str, Any]) -> dict[str, Any]: - inputs = subject_inputs(task_input) - attributes = ", ".join(inputs["subject_attributes"]) - return { - "summary": f"Prepare launch framing for the {inputs['subject_name']} concept", - "objective": objective_for( - task_input, - 2, - "Prepare launch messaging for the concept", - ), - "launch_story": ( - f"Present {inputs['subject_name']} as a concept that turns {attributes} " - "into a coherent premium product direction." - ), - "stakeholder_hooks": [ - "Frame the concept as feasible, differentiated, and buyer-relevant", - "Use constraints to explain why the chosen direction is disciplined", - ], - "focus": [ - "translate the concept into launch language", - "align story with buyer expectations", - "package the concept as a reviewable brief", - ], - } - - -def build_ux_review_design_output(task_input: dict[str, Any]) -> dict[str, Any]: - inputs = subject_inputs(task_input) - return { - "summary": ( - f"Define an improved experience for {inputs['subject_name']} in " - f"{inputs['subject_category']}" - ), - "objective": objective_for(task_input, 0, "Define a better checkout experience"), - "experience_map": { - "subject": inputs["subject_name"], - "primary_friction_signals": inputs["subject_attributes"][:3], - "target_audience": inputs.get("target_audience"), - }, - "focus": [ - "map the intended user journey", - "reduce points of hesitation", - "prepare a concise improvement concept", - ], - } - - -def build_ux_review_research_output(task_input: dict[str, Any]) -> dict[str, Any]: - inputs = subject_inputs(task_input) - return { - "summary": f"Research UX friction patterns for {inputs['subject_name']}", - "objective": objective_for(task_input, 1, "Identify friction and usage patterns"), - "ux_findings": infer_ux_findings(task_input), - "focus": [ - "identify friction signals", - "connect issues to user behavior patterns", - "prioritize what should change first", - ], - } - - -def build_ux_review_marketing_output(task_input: dict[str, Any]) -> dict[str, Any]: - inputs = subject_inputs(task_input) - return { - "summary": f"Package the UX rollout plan for {inputs['subject_name']}", - "objective": objective_for( - task_input, - 2, - "Package the UX improvement plan for rollout", - ), - "rollout_story": ( - f"Position the {inputs['subject_name']} update as a friction-reduction effort " - "that improves confidence and completion rates." - ), - "stakeholder_hooks": [ - "Tie each improvement to a visible user pain point", - "Explain the rollout in terms of reduced friction and clearer trust cues", - ], - "focus": [ - "frame the UX work for stakeholders", - "turn findings into an adoption story", - "package a rollout-ready improvement plan", - ], - } - - COMMON_STAGE_SEQUENCE = ( StageDefinition( stage_name="design", @@ -424,15 +203,18 @@ def build_ux_review_marketing_output(task_input: dict[str, Any]) -> dict[str, An ) +from demos import stage_handlers as _stage_handlers + + register_task_type( TaskTypeDefinition( task_type="market_research", deliverable_type="market_strategy_report", stage_sequence=COMMON_STAGE_SEQUENCE, - handlers={ - "design": build_market_research_design_output, - "research": build_market_research_research_output, - "marketing": build_market_research_marketing_output, + stage_handlers={ + "design": "market_research.design", + "research": "market_research.research", + "marketing": "market_research.marketing", }, ) ) @@ -442,10 +224,10 @@ def build_ux_review_marketing_output(task_input: dict[str, Any]) -> dict[str, An task_type="product_design", deliverable_type="product_concept_brief", stage_sequence=COMMON_STAGE_SEQUENCE, - handlers={ - "design": build_product_design_design_output, - "research": build_product_design_research_output, - "marketing": build_product_design_marketing_output, + stage_handlers={ + "design": "product_design.design", + "research": "product_design.research", + "marketing": "product_design.marketing", }, ) ) @@ -455,10 +237,10 @@ def build_ux_review_marketing_output(task_input: dict[str, Any]) -> dict[str, An task_type="ux_review", deliverable_type="ux_improvement_plan", stage_sequence=COMMON_STAGE_SEQUENCE, - handlers={ - "design": build_ux_review_design_output, - "research": build_ux_review_research_output, - "marketing": build_ux_review_marketing_output, + stage_handlers={ + "design": "ux_review.design", + "research": "ux_review.research", + "marketing": "ux_review.marketing", }, ) ) From acaa8fdca8ff52fc98794a604d683a117dc7eb68 Mon Sep 17 00:00:00 2001 From: Bin Zhang <138868899+joy7758@users.noreply.github.com> Date: Mon, 16 Mar 2026 01:27:40 +0800 Subject: [PATCH 2/2] feat: split builtin task type definitions (#17) * feat: split builtin task type definitions * feat: split builtin persona definitions (#18) * feat: split builtin persona definitions * feat: add registry discovery entrypoint (#19) * feat: add registry discovery entrypoint * feat: support external plugin packages (#20) --- demos/discovery.py | 100 +++++++++++++++++++++ demos/persona_definitions.py | 22 +++++ demos/persona_workflow_demo.py | 10 ++- demos/stage_handlers.py | 41 +++------ demos/task_registry.py | 153 ++++++++++++++++----------------- demos/task_types.py | 64 ++++++++++++++ 6 files changed, 282 insertions(+), 108 deletions(-) create mode 100644 demos/discovery.py create mode 100644 demos/persona_definitions.py create mode 100644 demos/task_types.py diff --git a/demos/discovery.py b/demos/discovery.py new file mode 100644 index 0000000..40622e1 --- /dev/null +++ b/demos/discovery.py @@ -0,0 +1,100 @@ +from __future__ import annotations + +import importlib +import os +import pkgutil +import sys +from pathlib import Path +from types import ModuleType + +PLUGIN_PACKAGES_ENV = "POP_PLUGIN_PACKAGES" +PLUGIN_PACKAGE_PATHS_ENV = "POP_PLUGIN_PACKAGE_PATHS" + + +def split_package_names(value: str) -> tuple[str, ...]: + return tuple(part.strip() for part in value.split(",") if part.strip()) + + +def split_search_paths(value: str) -> tuple[Path, ...]: + return tuple( + Path(part).expanduser().resolve() + for part in value.split(os.pathsep) + if part.strip() + ) + + +def configure_plugin_search_paths(paths: tuple[Path, ...]) -> tuple[Path, ...]: + configured: list[Path] = [] + for path in paths: + if not path.exists(): + raise FileNotFoundError(f"Plugin package path does not exist: {path}") + path_str = str(path) + if path_str not in sys.path: + sys.path.append(path_str) + configured.append(path) + return tuple(configured) + + +def configured_package_names( + default_packages: tuple[str, ...] = ("demos",), + extra_packages: tuple[str, ...] = (), +) -> tuple[str, ...]: + plugin_paths = split_search_paths(os.environ.get(PLUGIN_PACKAGE_PATHS_ENV, "")) + configure_plugin_search_paths(plugin_paths) + env_packages = split_package_names(os.environ.get(PLUGIN_PACKAGES_ENV, "")) + + ordered_packages: list[str] = [] + seen: set[str] = set() + for package_name in (*default_packages, *extra_packages, *env_packages): + if package_name and package_name not in seen: + ordered_packages.append(package_name) + seen.add(package_name) + return tuple(ordered_packages) + + +def import_matching_modules( + package_names: str | tuple[str, ...], + exact_names: tuple[str, ...] = (), + prefixes: tuple[str, ...] = (), +) -> list[ModuleType]: + if isinstance(package_names, str): + package_names = (package_names,) + + imported_modules: list[ModuleType] = [] + seen_module_names: set[str] = set() + + for package_name in package_names: + package = importlib.import_module(package_name) + module_names: set[str] = set() + package_paths = getattr(package, "__path__", None) + + if package_paths is not None: + for _, name, _ in pkgutil.iter_modules(package_paths): + if name in exact_names or any(name.startswith(prefix) for prefix in prefixes): + module_names.add(name) + else: + for name in exact_names: + qualified_name = f"{package_name}.{name}" + if importlib.util.find_spec(qualified_name) is not None: + module_names.add(name) + + for module_name in sorted(module_names): + qualified_name = f"{package_name}.{module_name}" + if qualified_name in seen_module_names: + continue + imported_modules.append(importlib.import_module(qualified_name)) + seen_module_names.add(qualified_name) + + return imported_modules + + +def collect_module_exports( + package_names: str | tuple[str, ...], + export_name: str, + exact_names: tuple[str, ...] = (), + prefixes: tuple[str, ...] = (), +) -> tuple[object, ...]: + exports: list[object] = [] + for module in import_matching_modules(package_names, exact_names, prefixes): + exports.extend(getattr(module, export_name, ())) + return tuple(exports) diff --git a/demos/persona_definitions.py b/demos/persona_definitions.py new file mode 100644 index 0000000..e335c08 --- /dev/null +++ b/demos/persona_definitions.py @@ -0,0 +1,22 @@ +from __future__ import annotations + +from demos.task_registry import PersonaDefinition + + +PERSONA_DEFINITIONS = ( + PersonaDefinition( + persona_id="design_persona", + file_path="personas/design_persona.json", + ), + PersonaDefinition( + persona_id="research_persona", + file_path="personas/researcher_persona.json", + ), + PersonaDefinition( + persona_id="marketing_persona", + file_path="personas/marketing_persona.json", + ), +) + + +BUILTIN_PERSONAS = PERSONA_DEFINITIONS diff --git a/demos/persona_workflow_demo.py b/demos/persona_workflow_demo.py index 46b1b6d..267ed1d 100644 --- a/demos/persona_workflow_demo.py +++ b/demos/persona_workflow_demo.py @@ -16,7 +16,7 @@ from demos.task_registry import ( build_deliverable, build_stage_output, - persona_path_for, + resolve_persona_path, stage_handler_id_for, stage_sequence_for, supported_task_types, @@ -155,7 +155,7 @@ def load_personas_for_task(task_type: str) -> dict[str, Persona]: for stage in stage_sequence_for(task_type): if stage.persona_id in personas: continue - persona_path = PROJECT_ROOT / persona_path_for(stage.persona_id) + persona_path = resolve_persona_path(stage.persona_id, PROJECT_ROOT) personas[stage.persona_id] = load_persona(persona_path) return personas @@ -272,7 +272,11 @@ def workflow(task_input_path: Path, output_path: Path | None = None) -> Path: def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser( - description="Run the persona workflow demo with dynamic task input." + description=( + "Run the persona workflow demo with dynamic task input. " + "External plugin packages can be added with " + "POP_PLUGIN_PACKAGES and POP_PLUGIN_PACKAGE_PATHS." + ) ) parser.add_argument( "--task-input", diff --git a/demos/stage_handlers.py b/demos/stage_handlers.py index 3a7fb63..166d908 100644 --- a/demos/stage_handlers.py +++ b/demos/stage_handlers.py @@ -2,7 +2,7 @@ from typing import Any -from demos.task_registry import StageHandlerDefinition, register_stage_handler +from demos.task_registry import StageHandlerDefinition def subject_inputs(task_input: dict[str, Any]) -> dict[str, Any]: @@ -257,57 +257,44 @@ def build_ux_review_marketing_output(task_input: dict[str, Any]) -> dict[str, An } -register_stage_handler( +STAGE_HANDLER_DEFINITIONS = ( StageHandlerDefinition( handler_id="market_research.design", builder=build_market_research_design_output, - ) -) -register_stage_handler( + ), StageHandlerDefinition( handler_id="market_research.research", builder=build_market_research_research_output, - ) -) -register_stage_handler( + ), StageHandlerDefinition( handler_id="market_research.marketing", builder=build_market_research_marketing_output, - ) -) -register_stage_handler( + ), StageHandlerDefinition( handler_id="product_design.design", builder=build_product_design_design_output, - ) -) -register_stage_handler( + ), StageHandlerDefinition( handler_id="product_design.research", builder=build_product_design_research_output, - ) -) -register_stage_handler( + ), StageHandlerDefinition( handler_id="product_design.marketing", builder=build_product_design_marketing_output, - ) -) -register_stage_handler( + ), StageHandlerDefinition( handler_id="ux_review.design", builder=build_ux_review_design_output, - ) -) -register_stage_handler( + ), StageHandlerDefinition( handler_id="ux_review.research", builder=build_ux_review_research_output, - ) -) -register_stage_handler( + ), StageHandlerDefinition( handler_id="ux_review.marketing", builder=build_ux_review_marketing_output, - ) + ), ) + + +BUILTIN_STAGE_HANDLERS = STAGE_HANDLER_DEFINITIONS diff --git a/demos/task_registry.py b/demos/task_registry.py index 07caf74..2813570 100644 --- a/demos/task_registry.py +++ b/demos/task_registry.py @@ -1,6 +1,8 @@ from __future__ import annotations +import importlib from dataclasses import dataclass +from pathlib import Path from typing import Any, Callable StageOutputBuilder = Callable[[dict[str, Any]], dict[str, Any]] @@ -10,6 +12,7 @@ class PersonaDefinition: persona_id: str file_path: str + package_name: str | None = None @dataclass(frozen=True) @@ -38,6 +41,7 @@ class TaskTypeDefinition: PERSONA_REGISTRY: dict[str, PersonaDefinition] = {} STAGE_HANDLER_REGISTRY: dict[str, StageHandlerDefinition] = {} TASK_REGISTRY: dict[str, TaskTypeDefinition] = {} +LOADED_REGISTRY_PACKAGES: tuple[str, ...] = () def register_persona(definition: PersonaDefinition) -> None: @@ -58,6 +62,26 @@ def persona_path_for(persona_id: str) -> str: return get_persona_definition(persona_id).file_path +def resolve_persona_path(persona_id: str, project_root: str | Path) -> Path: + definition = get_persona_definition(persona_id) + persona_path = Path(definition.file_path).expanduser() + if persona_path.is_absolute(): + return persona_path + if definition.package_name: + package = importlib.import_module(definition.package_name) + package_paths = list(getattr(package, "__path__", [])) + if package_paths: + return Path(package_paths[0]) / persona_path + package_file = getattr(package, "__file__", None) + if package_file is not None: + return Path(package_file).resolve().parent / persona_path + return Path(project_root) / persona_path + + +def registered_persona_ids() -> frozenset[str]: + return frozenset(PERSONA_REGISTRY) + + def register_stage_handler(definition: StageHandlerDefinition) -> None: STAGE_HANDLER_REGISTRY[definition.handler_id] = definition @@ -76,6 +100,16 @@ def registered_stage_handler_ids() -> frozenset[str]: return frozenset(STAGE_HANDLER_REGISTRY) +def reset_registry() -> None: + PERSONA_REGISTRY.clear() + STAGE_HANDLER_REGISTRY.clear() + TASK_REGISTRY.clear() + + +def loaded_registry_packages() -> tuple[str, ...]: + return LOADED_REGISTRY_PACKAGES + + def register_task_type(definition: TaskTypeDefinition) -> None: TASK_REGISTRY[definition.task_type] = definition @@ -159,88 +193,51 @@ def build_deliverable(task_type: str, context: Any) -> dict[str, Any]: return deliverable -COMMON_STAGE_SEQUENCE = ( - StageDefinition( - stage_name="design", - persona_id="design_persona", - progress_label="Designing", - deliverable_section="design_brief", - ), - StageDefinition( - stage_name="research", - persona_id="research_persona", - progress_label="Researching", - deliverable_section="research_summary", - depends_on=("design",), - ), - StageDefinition( - stage_name="marketing", - persona_id="marketing_persona", - progress_label="Marketing", - deliverable_section="marketing_plan", - depends_on=("design", "research"), - ), -) - - -register_persona( - PersonaDefinition( - persona_id="design_persona", - file_path="personas/design_persona.json", +def load_registry( + extra_packages: tuple[str, ...] = (), + reset: bool = True, +) -> tuple[str, ...]: + from demos.discovery import collect_module_exports, configured_package_names + + package_names = configured_package_names( + default_packages=("demos",), + extra_packages=extra_packages, ) -) -register_persona( - PersonaDefinition( - persona_id="research_persona", - file_path="personas/researcher_persona.json", + if reset: + reset_registry() + + handlers = collect_module_exports( + package_names, + "STAGE_HANDLER_DEFINITIONS", + exact_names=("stage_handlers",), + prefixes=("stage_handlers_",), ) -) -register_persona( - PersonaDefinition( - persona_id="marketing_persona", - file_path="personas/marketing_persona.json", + personas = collect_module_exports( + package_names, + "PERSONA_DEFINITIONS", + exact_names=("persona_definitions",), + prefixes=("persona_definitions_",), + ) + task_types = collect_module_exports( + package_names, + "TASK_TYPE_DEFINITIONS", + exact_names=("task_types",), + prefixes=("task_types_",), ) -) + for handler in handlers: + register_stage_handler(handler) + for persona in personas: + register_persona(persona) + for task_type in task_types: + register_task_type(task_type) + global LOADED_REGISTRY_PACKAGES + LOADED_REGISTRY_PACKAGES = package_names + return package_names -from demos import stage_handlers as _stage_handlers +def load_builtin_registry() -> tuple[str, ...]: + return load_registry() -register_task_type( - TaskTypeDefinition( - task_type="market_research", - deliverable_type="market_strategy_report", - stage_sequence=COMMON_STAGE_SEQUENCE, - stage_handlers={ - "design": "market_research.design", - "research": "market_research.research", - "marketing": "market_research.marketing", - }, - ) -) - -register_task_type( - TaskTypeDefinition( - task_type="product_design", - deliverable_type="product_concept_brief", - stage_sequence=COMMON_STAGE_SEQUENCE, - stage_handlers={ - "design": "product_design.design", - "research": "product_design.research", - "marketing": "product_design.marketing", - }, - ) -) - -register_task_type( - TaskTypeDefinition( - task_type="ux_review", - deliverable_type="ux_improvement_plan", - stage_sequence=COMMON_STAGE_SEQUENCE, - stage_handlers={ - "design": "ux_review.design", - "research": "ux_review.research", - "marketing": "ux_review.marketing", - }, - ) -) + +load_registry() diff --git a/demos/task_types.py b/demos/task_types.py new file mode 100644 index 0000000..0aec595 --- /dev/null +++ b/demos/task_types.py @@ -0,0 +1,64 @@ +from __future__ import annotations + +from demos.task_registry import StageDefinition, TaskTypeDefinition + + +COMMON_STAGE_SEQUENCE = ( + StageDefinition( + stage_name="design", + persona_id="design_persona", + progress_label="Designing", + deliverable_section="design_brief", + ), + StageDefinition( + stage_name="research", + persona_id="research_persona", + progress_label="Researching", + deliverable_section="research_summary", + depends_on=("design",), + ), + StageDefinition( + stage_name="marketing", + persona_id="marketing_persona", + progress_label="Marketing", + deliverable_section="marketing_plan", + depends_on=("design", "research"), + ), +) + + +TASK_TYPE_DEFINITIONS = ( + TaskTypeDefinition( + task_type="market_research", + deliverable_type="market_strategy_report", + stage_sequence=COMMON_STAGE_SEQUENCE, + stage_handlers={ + "design": "market_research.design", + "research": "market_research.research", + "marketing": "market_research.marketing", + }, + ), + TaskTypeDefinition( + task_type="product_design", + deliverable_type="product_concept_brief", + stage_sequence=COMMON_STAGE_SEQUENCE, + stage_handlers={ + "design": "product_design.design", + "research": "product_design.research", + "marketing": "product_design.marketing", + }, + ), + TaskTypeDefinition( + task_type="ux_review", + deliverable_type="ux_improvement_plan", + stage_sequence=COMMON_STAGE_SEQUENCE, + stage_handlers={ + "design": "ux_review.design", + "research": "ux_review.research", + "marketing": "ux_review.marketing", + }, + ), +) + + +BUILTIN_TASK_TYPES = TASK_TYPE_DEFINITIONS