1 change: 1 addition & 0 deletions .env.example
@@ -1 +1,2 @@
GROQ_API_KEY=<key>
AFK_INCOMING_ROLE=user
12 changes: 6 additions & 6 deletions afk/agents/__init__.py
@@ -1,15 +1,15 @@
from .base import BaseAgent, load_chain_of_thought_prompt
from .pm import PMAgent, pm_manifest
from .analyst import AnalystAgent, analyst_manifest
from .qa import QAAgent, qa_manifest
from .pm import PMAgent, create_pm_manifest
from .analyst import AnalystAgent, create_analyst_manifest
from .qa import QAAgent, create_qa_manifest

__all__ = [
"BaseAgent",
"PMAgent",
"AnalystAgent",
"QAAgent",
"pm_manifest",
"analyst_manifest",
"qa_manifest",
"create_pm_manifest",
"create_analyst_manifest",
"create_qa_manifest",
"load_chain_of_thought_prompt",
]
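
The module-level manifest singletons become factory functions so that every agent instance can carry its own id. A minimal sketch of the new import surface, assuming the `afk.agents` package layout shown in this diff:

```python
from afk.agents import create_analyst_manifest

# Each call returns a fresh AgentManifest bound to the id passed in, so two
# analysts no longer share one module-level manifest object.
manifest_a = create_analyst_manifest("analyst-1")
manifest_b = create_analyst_manifest("analyst-2")
assert manifest_a.id == "analyst-1" and manifest_b.id == "analyst-2"
```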
21 changes: 11 additions & 10 deletions afk/agents/analyst.py
@@ -16,16 +16,17 @@
"Critique your findings and produce a thorough markdown report before delegating to the QA agent for verification."
)

analyst_manifest = AgentManifest(
id="analyst",
system_prompt=ANALYST_SYSTEM_PROMPT,
allowed_agents=["qa"],
delegate_rules={"qa": DelegateRule(when_to_delegate="When analysis is complete")},
overseer="qa",
mcp_bindings=ANALYST_BINDINGS,
)
def create_analyst_manifest(agent_id: str) -> AgentManifest:
return AgentManifest(
id=agent_id,
system_prompt=ANALYST_SYSTEM_PROMPT,
allowed_agents=["qa"],
delegate_rules={"qa": DelegateRule(when_to_delegate="When analysis is complete")},
overseer="qa",
mcp_bindings=ANALYST_BINDINGS,
)


class AnalystAgent(BaseAgent):
def __init__(self, orchestrator: Optional[Any] = None) -> None:
super().__init__(analyst_manifest, orchestrator)
def __init__(self, agent_id: str = "analyst", orchestrator: Optional[Any] = None) -> None:
super().__init__(create_analyst_manifest(agent_id), orchestrator, agent_type="analyst")
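
The constructor now separates the instance id from the agent type. A short usage sketch with illustrative ids (in practice the orchestrator generates them):

```python
from afk.agents import AnalystAgent

default_analyst = AnalystAgent()                  # manifest.id == "analyst"
spawned_analyst = AnalystAgent(agent_id="4f2c9")  # e.g. an orchestrator-assigned id

assert default_analyst.agent_type == "analyst"
assert spawned_analyst.agent_type == "analyst"    # type stays "analyst" regardless of id
```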
71 changes: 61 additions & 10 deletions afk/agents/base.py
@@ -216,20 +216,24 @@ class BaseAgent:
"parameters": {
"type": "object",
"properties": {
"recipient_id": {"type": "string"},
"agent_type": {"type": "string"},
"context_summary": {"type": "string"},
},
"required": ["recipient_id", "context_summary"],
"required": ["agent_type", "context_summary"],
"additionalProperties": False,
},
},
},
]

def __init__(
self, manifest: AgentManifest, orchestrator: Optional[Any] = None
self,
manifest: AgentManifest,
orchestrator: Optional[Any] = None,
agent_type: Optional[str] = None,
) -> None:
self.manifest = manifest
self.agent_type = agent_type or manifest.id
self.orchestrator = orchestrator
self.session = Session()
self.history: List[Dict[str, str]] = []
@@ -491,6 +495,58 @@ def stream(self, user_message: str) -> Iterator[StreamChunk]:
if assistant_content:
self.history.append({"role": "assistant", "content": assistant_content})

def respond(self, user_message: str) -> Iterator[StreamChunk]:
"""Stream a response and automatically re-ping until complete."""
keep_pinging = True
in_thought = False
orig_thinking = False
content = user_message

while keep_pinging:
keep_pinging = False
ended = False
had_post_end = False
call_seen = False
rollback_len = len(self.history)
try:
for c in self.stream(content):
orig_thinking = c.get("thinking", orig_thinking)
if c.get("status") == "start_thinking":
in_thought = True
if c.get("status") == "end_thinking":
ended = True
elif ended and (c.get("text") or c.get("call")):
had_post_end = True
if c.get("text"):
in_thought = False
if in_thought:
c["thinking"] = True
yield c
if c.get("call"):
call_seen = True
except openai.OpenAIError:
self.history = self.history[:rollback_len]
raise

rollback_len = len(self.history)
if call_seen:
keep_pinging = True
content = ""
in_thought = True
elif ended and not had_post_end:
keep_pinging = True
content = ""
in_thought = True
elif orig_thinking:
keep_pinging = True
content = ""
in_thought = True
orig_thinking = False
else:
content = ""
in_thought = False
orig_thinking = False

# ------------------------------------------------------------------
# IPython helpers
# ------------------------------------------------------------------
@@ -518,16 +574,11 @@ def _handle_cap_function(self, name: str, args: Dict[str, Any]) -> str:
self.orchestrator.dispatch_message(msg)
return "message sent"
elif name == "delegate_task":
from ..cap.models import DelegationNotice

notice = DelegationNotice(
task_id=self.orchestrator.next_task_id(),
return self.orchestrator.delegate_task(
sender_id=self.manifest.id,
recipient_id=args["recipient_id"],
agent_type=args["agent_type"],
context_summary=args["context_summary"],
)
self.orchestrator.dispatch_message(notice)
return "task delegated"
return ""

# ------------------------------------------------------------------
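`respond()` lifts the re-ping loop out of the orchestrator: it wraps `stream()` and keeps prompting with an empty message while tool calls or unterminated thinking blocks are outstanding, rolling history back on API errors. A hedged sketch of a caller, assuming chunks carry the `"thinking"`/`"text"`/`"call"` keys used above:

```python
# Illustrative consumer; `agent` is any constructed BaseAgent subclass.
def print_reply(agent, user_message: str) -> None:
    for chunk in agent.respond(user_message):
        if chunk.get("thinking"):
            continue                              # hide chain-of-thought chunks
        if chunk.get("text"):
            print(chunk["text"], end="", flush=True)
        if chunk.get("call"):
            print(f"\n[tool call] {chunk['call']}")
```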
21 changes: 11 additions & 10 deletions afk/agents/pm.py
@@ -17,16 +17,17 @@
"Delegate analysis to the analyst agent when ready and oversee results via the QA agent."
)

pm_manifest = AgentManifest(
id="pm",
system_prompt=PM_SYSTEM_PROMPT,
allowed_agents=["analyst"],
delegate_rules={"analyst": DelegateRule(when_to_delegate="When tasks require analysis")},
overseer="qa",
mcp_bindings=PM_BINDINGS,
)
def create_pm_manifest(agent_id: str) -> AgentManifest:
return AgentManifest(
id=agent_id,
system_prompt=PM_SYSTEM_PROMPT,
allowed_agents=["analyst"],
delegate_rules={"analyst": DelegateRule(when_to_delegate="When tasks require analysis")},
overseer="qa",
mcp_bindings=PM_BINDINGS,
)


class PMAgent(BaseAgent):
def __init__(self, orchestrator: Optional[Any] = None) -> None:
super().__init__(pm_manifest, orchestrator)
def __init__(self, agent_id: str = "pm", orchestrator: Optional[Any] = None) -> None:
super().__init__(create_pm_manifest(agent_id), orchestrator, agent_type="pm")
21 changes: 11 additions & 10 deletions afk/agents/qa.py
@@ -15,16 +15,17 @@
"Communicate any issues back to the analyst and confirm when the analysis looks sound before responding to the PM."
)

qa_manifest = AgentManifest(
id="qa",
system_prompt=QA_SYSTEM_PROMPT,
allowed_agents=[],
delegate_rules={},
overseer=None,
mcp_bindings=QA_BINDINGS,
)
def create_qa_manifest(agent_id: str) -> AgentManifest:
return AgentManifest(
id=agent_id,
system_prompt=QA_SYSTEM_PROMPT,
allowed_agents=[],
delegate_rules={},
overseer=None,
mcp_bindings=QA_BINDINGS,
)


class QAAgent(BaseAgent):
def __init__(self, orchestrator: Optional[Any] = None) -> None:
super().__init__(qa_manifest, orchestrator)
def __init__(self, agent_id: str = "qa", orchestrator: Optional[Any] = None) -> None:
super().__init__(create_qa_manifest(agent_id), orchestrator, agent_type="qa")
1 change: 1 addition & 0 deletions afk/cap/models.py
@@ -38,6 +38,7 @@ class DelegationNotice(BaseMessage):
"""Handover message for delegation."""

type: Literal["delegation_notice"] = "delegation_notice"
agent_type: str
context_summary: str


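`DelegationNotice` now records which agent type the delegation targets in addition to the handover summary. A minimal construction sketch; the field names come from the diffs in this PR, the id values are placeholders:

```python
from afk.cap.models import DelegationNotice

notice = DelegationNotice(
    task_id="task-7",
    sender_id="pm",
    recipient_id="4f2c9",   # id of the freshly spawned recipient agent
    agent_type="analyst",   # new field: the builder key used to spawn it
    context_summary="Investigate the spike in failed checkout requests.",
)
assert notice.type == "delegation_notice"
```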
105 changes: 56 additions & 49 deletions afk/cap/orchestrator.py
@@ -2,11 +2,15 @@

from __future__ import annotations

from typing import Dict, Iterator, List, Tuple
from typing import Dict, Iterator, List, Tuple, Callable
import uuid

from ..config import settings

import openai

from ..agents.base import BaseAgent, StreamChunk
from ..agents import PMAgent, AnalystAgent, QAAgent
from .models import BaseMessage, DelegationNotice


@@ -15,13 +19,20 @@ class Orchestrator:

def __init__(self) -> None:
self.agents: Dict[str, BaseAgent] = {}
self.agent_types: Dict[str, str] = {}
self.task_counter = 0
self.queue: List[BaseMessage] = []
self.user_messages: List[BaseMessage] = []
self.agent_builders: Dict[str, Callable[[str, "Orchestrator"], BaseAgent]] = {
"pm": PMAgent,
"analyst": AnalystAgent,
"qa": QAAgent,
}

def register_agent(self, agent: BaseAgent) -> None:
"""Register an agent instance."""
self.agents[agent.manifest.id] = agent
self.agent_types[agent.manifest.id] = agent.agent_type

def next_task_id(self) -> str:
self.task_counter += 1
@@ -36,6 +47,7 @@ def handle_user_message(self, content: str) -> None:
task_id=self.next_task_id(),
sender_id="user",
recipient_id=recipient,
agent_type="pm",
context_summary=content,
)
self.dispatch_message(notice)
@@ -47,6 +59,27 @@ def dispatch_message(self, message: BaseMessage) -> None:
else:
self.queue.append(message)

def _create_agent(self, agent_type: str, agent_id: str) -> BaseAgent:
builder = self.agent_builders.get(agent_type)
if not builder:
raise ValueError(f"Unknown agent type: {agent_type}")
return builder(agent_id, self)

def delegate_task(self, sender_id: str, agent_type: str, context_summary: str) -> str:
"""Spawn a new agent and dispatch a delegation notice."""
agent_id = uuid.uuid4().hex[:5]
agent = self._create_agent(agent_type, agent_id)
self.register_agent(agent)
notice = DelegationNotice(
task_id=self.next_task_id(),
sender_id=sender_id,
recipient_id=agent_id,
context_summary=context_summary,
agent_type=agent_type,
)
self.dispatch_message(notice)
return agent_id

def run(self) -> Iterator[Tuple[str, StreamChunk]]:
"""Process queued messages yielding agent outputs."""
while self.queue:
@@ -56,51 +89,25 @@ def run(self) -> Iterator[Tuple[str, StreamChunk]]:
continue
content = msg.content or getattr(msg, "context_summary", "")

keep_pinging = True
in_thought = False
orig_thinking = False
while keep_pinging:
keep_pinging = False
ended = False
had_post_end = False
call_seen = False
history_len = len(agent.history)
try:
for chunk in agent.stream(content):
c = dict(chunk)
orig_thinking = c.get("thinking", orig_thinking)
if c.get("status") == "start_thinking":
in_thought = True
if c.get("status") == "end_thinking":
ended = True
elif ended and (c.get("text") or c.get("call")):
had_post_end = True
if c.get("text"):
in_thought = False
if in_thought:
c["thinking"] = True
yield agent.manifest.id, c
if c.get("call"):
call_seen = True
except openai.OpenAIError as e:
self.queue.insert(0, msg)
agent.history = agent.history[:history_len]
yield agent.manifest.id, {"error": str(e), "body": getattr(e, "body", None)}
return

if call_seen:
keep_pinging = True
content = ""
in_thought = True
elif ended and not had_post_end:
keep_pinging = True
content = ""
in_thought = True
elif orig_thinking:
keep_pinging = True
content = ""
in_thought = True
else:
content = ""
in_thought = False
orig_thinking = False
rollback_len = len(agent.history)

if msg.sender_id != "user":
sender_type = self.agent_types.get(msg.sender_id, "unknown")
prefix = (
f"Agent {sender_type} (id: {msg.sender_id}) sent the following message:\n"
)
role = settings.incoming_role
agent.history.append({"role": role, "content": prefix + content})
content = ""
else:
agent.history.append({"role": "user", "content": content})
content = ""

try:
for chunk in agent.respond(content):
yield agent.manifest.id, chunk
except openai.OpenAIError as e:
self.queue.insert(0, msg)
agent.history = agent.history[:rollback_len]
yield agent.manifest.id, {"error": str(e), "body": getattr(e, "body", None)}
return
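
Dynamic spawning replaces the fixed agent graph: `delegate_task()` looks up a builder in `agent_builders`, registers the new agent under a short random id, and queues a `DelegationNotice` for `run()` to process. An end-to-end sketch, assuming `handle_user_message` routes the first message to a registered `"pm"` agent (as the `agent_type="pm"` default above suggests):

```python
from afk.agents import PMAgent
from afk.cap.orchestrator import Orchestrator

orch = Orchestrator()
orch.register_agent(PMAgent(orchestrator=orch))

orch.handle_user_message("Summarise last week's incident reports.")
for agent_id, chunk in orch.run():
    if chunk.get("error"):
        print(f"[{agent_id}] error: {chunk['error']}")
        break
    if chunk.get("text"):
        print(f"[{agent_id}] {chunk['text']}", end="")

# When a model emits a delegate_task tool call, BaseAgent forwards it to
# orchestrator.delegate_task(sender_id, agent_type, context_summary), which
# spawns a fresh agent of that type under a 5-character uuid id, registers it,
# and queues a DelegationNotice for the next pass of run().
```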
1 change: 1 addition & 0 deletions afk/config.py
@@ -17,6 +17,7 @@ class Settings:
def __init__(self) -> None:
self.debug = get_env("AFK_DEBUG", "false").lower() == "true"
self.default_model = get_env("AFK_MODEL", "llama-3.3-70b-versatile")
self.incoming_role = get_env("AFK_INCOMING_ROLE", "user")


settings = Settings()
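
`settings.incoming_role` decides which chat role the orchestrator uses when it writes forwarded agent-to-agent messages into the recipient's history (see `run()` above). A small sketch of overriding the default, assuming the variable is read from the environment when `Settings` is constructed:

```python
import os

# Record forwarded agent-to-agent messages as "assistant" turns instead of "user".
os.environ["AFK_INCOMING_ROLE"] = "assistant"

from afk.config import Settings

assert Settings().incoming_role == "assistant"
```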