Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
fastapi
pydantic
uvicorn
pyzmq
pytest
pytest-asyncio
httpx
Empty file.
Empty file.
95 changes: 95 additions & 0 deletions src/marketing_organism/agents/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
from abc import ABC, abstractmethod
import asyncio
import logging
import uuid
from typing import Any, Dict, List, Optional

from src.marketing_organism.exceptions import AgentExecutionError

logger = logging.getLogger(__name__)

class BaseAgent(ABC):
    """Asynchronous perceive/decide/act agent with bounded event memory.

    Subclasses implement ``decide`` and ``act``. The event bus calls
    ``perceive`` to enqueue events; an internal loop drains the queue,
    updates memory, and runs the decide/act cycle.
    """

    def __init__(self, agent_id: Optional[str] = None, max_memory_events: int = 100):
        # Stable identity used for logging/registration; auto-generated when absent.
        self.agent_id = agent_id or str(uuid.uuid4())
        self.memory: Dict[str, Any] = {}
        self.state: str = "initialized"
        self._running = False
        self._task: Optional[asyncio.Task] = None
        self.event_queue: asyncio.Queue = asyncio.Queue()
        # Cap for memory["recent_events"]; see _process_event for eviction.
        self.max_memory_events = max_memory_events
        self._consecutive_errors = 0

    async def perceive(self, event):
        """Called by event bus when subscribed events occur; enqueues the event."""
        await self.event_queue.put(event)

    @abstractmethod
    async def decide(self) -> Any:
        """Evaluate internal state and memory to decide the next action.

        Returns a truthy action object to execute, or a falsy value to idle.
        """

    @abstractmethod
    async def act(self, action: Any):
        """Execute the decided action."""

    async def _loop(self):
        """Main perception-decision-action loop with exponential error backoff."""
        while self._running:
            try:
                # Perceive: wait for an event with a timeout so that the agent
                # still re-evaluates its state periodically when idle.
                try:
                    event = await asyncio.wait_for(self.event_queue.get(), timeout=1.0)
                    self._process_event(event)
                    self.event_queue.task_done()
                except asyncio.TimeoutError:
                    pass

                # Decide: runs on every iteration, event or not.
                action = await self.decide()

                # Act only when decide() produced something truthy.
                if action:
                    await self.act(action)

                self._consecutive_errors = 0  # reset on success
            except asyncio.CancelledError:
                break
            except Exception as e:
                # Exponential backoff capped at 60s so a persistently failing
                # agent does not spin the loop at full speed.
                self._consecutive_errors += 1
                backoff_time = min(60, 2 ** self._consecutive_errors)
                logger.error(
                    "AgentExecutionError: Error in agent loop for %s: %s. Backing off for %ss",
                    self.agent_id,
                    e,
                    backoff_time,
                    exc_info=True,
                )
                await asyncio.sleep(backoff_time)

    def _process_event(self, event):
        """Append the event to memory["recent_events"], evicting the oldest past the cap."""
        # Derived classes can override to format the event for memory.
        events = self.memory.setdefault("recent_events", [])
        events.append(event)

        # Enforce memory eviction policy (FIFO based on configured limit).
        if len(events) > self.max_memory_events:
            self.memory["recent_events"] = events[-self.max_memory_events:]

    def start(self):
        """Start the agent loop as a background task; no-op if already running."""
        if not self._running:
            self.state = "running"
            self._running = True
            self._task = asyncio.create_task(self._loop())

    async def stop(self):
        """Cancel the loop task and wait for it to finish; safe to call twice."""
        self.state = "stopped"
        self._running = False
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None
37 changes: 37 additions & 0 deletions src/marketing_organism/agents/lifecycle.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from typing import Dict, Type, Any, Optional
from .base import BaseAgent

class AgentManager:
    """Registry for live agents: spawning, retirement, and performance tracking."""

    def __init__(self):
        self.active_agents: Dict[str, BaseAgent] = {}
        self.performance_metrics: Dict[str, float] = {}
        self.config: Dict[str, Any] = {}

    def spawn_agent(self, agent_class: Type[BaseAgent], agent_id: Optional[str] = None, config: Optional[Dict[str, Any]] = None) -> BaseAgent:
        """Dynamically instantiates and starts an agent."""
        new_agent = agent_class(agent_id=agent_id)
        if config:
            new_agent.memory.update(config)

        new_agent.start()
        self.active_agents[new_agent.agent_id] = new_agent
        # Every agent begins at full performance until measured otherwise.
        self.performance_metrics[new_agent.agent_id] = 1.0

        return new_agent

    async def retire_agent(self, agent_id: str):
        """Gracefully stops and removes an underperforming or obsolete agent."""
        if agent_id not in self.active_agents:
            return
        await self.active_agents[agent_id].stop()
        del self.active_agents[agent_id]
        self.performance_metrics.pop(agent_id, None)

    def evaluate_agents(self, threshold: float = 0.5):
        """Identify agents below the performance threshold for potential retirement."""
        return [
            agent_id
            for agent_id, score in self.performance_metrics.items()
            if score < threshold
        ]
Empty file.
111 changes: 111 additions & 0 deletions src/marketing_organism/event_bus/bus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import asyncio
import logging
import fnmatch
from typing import Callable, Dict, List

from src.marketing_organism.exceptions import EventBusError
from .events import BaseEvent

logger = logging.getLogger(__name__)

class TopicRouter:
    """Maps glob-style topic patterns (fnmatch syntax) to subscriber callbacks."""

    def __init__(self):
        # pattern -> ordered list of callbacks registered under that pattern
        self.subscriptions: Dict[str, List[Callable]] = {}

    def subscribe(self, topic_pattern: str, callback: Callable):
        """Register callback under topic_pattern; duplicates allowed, order preserved."""
        self.subscriptions.setdefault(topic_pattern, []).append(callback)

    def unsubscribe(self, topic_pattern: str, callback: Callable):
        """Remove one registration of callback; no-op when pattern or callback is absent."""
        subs = self.subscriptions.get(topic_pattern)
        if subs is None:
            return
        try:
            subs.remove(callback)
        except ValueError:
            pass
        # Drop the pattern entirely once it has no subscribers so that
        # get_callbacks stops matching topics against dead patterns.
        if not subs:
            del self.subscriptions[topic_pattern]

    def get_callbacks(self, topic: str) -> List[Callable]:
        """Return all callbacks whose pattern fnmatch-matches the concrete topic."""
        matched: List[Callable] = []
        for pattern, subs in self.subscriptions.items():
            if fnmatch.fnmatch(topic, pattern):
                matched.extend(subs)
        return matched

class EventBus:
    """Async pub/sub bus with glob topic routing and a dead-letter queue (DLQ).

    Failed deliveries are recorded in the DLQ as (topic, event, error_string)
    tuples; when the DLQ is full, the failure is logged and dropped.
    """

    def __init__(self, dlq_max_size: int = 1000):
        self.router = TopicRouter()
        self.queue: asyncio.Queue = asyncio.Queue()
        # Bounded so a sustained stream of failures cannot grow memory forever.
        self.dlq: asyncio.Queue = asyncio.Queue(maxsize=dlq_max_size)
        self._running = False
        self._task = None

    def subscribe(self, topic_pattern: str, callback: Callable):
        """Register callback for topics matching topic_pattern (fnmatch syntax)."""
        self.router.subscribe(topic_pattern, callback)

    def unsubscribe(self, topic_pattern: str, callback: Callable):
        """Remove a previously registered callback; no-op when absent."""
        self.router.unsubscribe(topic_pattern, callback)

    async def publish(self, topic: str, event: BaseEvent):
        """Enqueue event for asynchronous delivery to matching subscribers."""
        await self.queue.put((topic, event))

    def _to_dlq(self, topic, event, error: str, kind: str):
        """Best-effort DLQ insert; logs and drops the record when the DLQ is full."""
        try:
            self.dlq.put_nowait((topic, event, error))
        except asyncio.QueueFull:
            logger.warning("EventBusError: DLQ is full. Dropping failed %s event.", kind)

    async def _process_events(self):
        """Drain the queue, fanning each event out to matching callbacks.

        Async callbacks run concurrently; sync callbacks run inline. A failing
        callback never prevents delivery to the remaining callbacks.
        """
        while self._running:
            try:
                topic, event = await self.queue.get()
                try:
                    callbacks = self.router.get_callbacks(topic)

                    # Execute callbacks concurrently
                    if callbacks:
                        tasks = []
                        for cb in callbacks:
                            if asyncio.iscoroutinefunction(cb):
                                # Default args bind the *current* cb/topic/event;
                                # each async callback catches its own exception
                                # and routes the failure to the DLQ.
                                async def safe_cb(callback=cb, t=topic, e=event):
                                    try:
                                        await callback(t, e)
                                    except Exception as err:
                                        logger.error(
                                            "EventBusError: Async callback for %s failed: %s",
                                            t, err, exc_info=True,
                                        )
                                        self._to_dlq(t, e, str(err), "async")
                                tasks.append(asyncio.create_task(safe_cb()))
                            else:
                                # Sync callback: call inline, isolate its failure.
                                try:
                                    cb(topic, event)
                                except Exception as err_sync:
                                    logger.error(
                                        "EventBusError: Sync callback for %s failed: %s",
                                        topic, err_sync, exc_info=True,
                                    )
                                    self._to_dlq(topic, event, str(err_sync), "sync")

                        if tasks:
                            await asyncio.gather(*tasks, return_exceptions=True)
                finally:
                    # Always mark the item done so queue.join() cannot hang
                    # when dispatch raises unexpectedly.
                    self.queue.task_done()
            except asyncio.CancelledError:
                break
            except Exception as e:
                logger.error("EventBusError: Critical failure in event loop: %s", e, exc_info=True)
                # topic/event may be unavailable here (failure during get()),
                # so record a placeholder topic.
                try:
                    self.dlq.put_nowait(("unknown_topic", None, str(e)))
                except asyncio.QueueFull:
                    logger.warning("EventBusError: Unable to place critical loop failure in DLQ.")

    def start(self):
        """Start the dispatch loop as a background task; no-op if already running."""
        if not self._running:
            self._running = True
            self._task = asyncio.create_task(self._process_events())

    async def stop(self):
        """Cancel the dispatch loop and wait for it to finish; safe to call twice."""
        self._running = False
        if self._task:
            self._task.cancel()
            try:
                await self._task
            except asyncio.CancelledError:
                pass
            self._task = None
48 changes: 48 additions & 0 deletions src/marketing_organism/event_bus/events.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import hashlib
import json
from pydantic import BaseModel, Field, model_validator
from typing import Any, Dict, Optional
import uuid
from datetime import datetime, timezone

class BaseEvent(BaseModel):
    """Base class for all bus events: identity, UTC timestamp, and content hash.

    The hash is computed once at construction over id/timestamp/source/metadata
    plus any subclass-declared fields, giving a tamper-evident fingerprint.
    """

    id: str = Field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    source: str
    metadata: Dict[str, Any] = Field(default_factory=dict)
    cryptographic_hash: str = Field(default="")

    @model_validator(mode='after')
    def compute_hash(self) -> 'BaseEvent':
        """Populate cryptographic_hash unless the caller already supplied one."""
        if not self.cryptographic_hash:
            # Create a deterministic representation for hashing
            data_to_hash = {
                "id": self.id,
                "timestamp": self.timestamp.isoformat(),
                "source": self.source,
                "metadata": self.metadata
            }
            # For subclasses, add their specific fields to the hash.
            # model_fields is accessed on the class: instance access is
            # deprecated in Pydantic >= 2.11.
            for field in type(self).model_fields.keys():
                if field not in ["id", "timestamp", "source", "metadata", "cryptographic_hash"]:
                    data_to_hash[field] = getattr(self, field)

            # NOTE(review): json.dumps raises TypeError for non-JSON-serializable
            # metadata values — presumably callers only pass plain JSON types; verify.
            encoded = json.dumps(data_to_hash, sort_keys=True).encode('utf-8')
            self.cryptographic_hash = hashlib.sha256(encoded).hexdigest()
        return self

class PerformanceAnomalyEvent(BaseEvent):
    """Signals that a tracked metric deviated from its expected value."""
    metric: str  # name of the metric that moved
    deviation: float  # size of the deviation (units/scale defined by producer — TODO confirm)
    direction: str  # e.g., "up", "down"
    context: Optional[str] = None  # optional free-text detail about the anomaly

class AudienceSignalEvent(BaseEvent):
    """Carries an observed audience behavior signal for a segment."""
    signal_type: str  # producer-defined category of the signal
    confidence: float  # confidence score; range not enforced here — presumably 0..1, verify
    segment: str  # audience segment identifier

class CapabilityGapEvent(BaseEvent):
    """Reports a capability the organism lacks, for agent spawning/evolution."""
    gap_type: str  # producer-defined category of the gap
    description: str  # human-readable explanation of what is missing
    priority: int  # relative urgency; ordering semantics defined by consumer — TODO confirm
Empty file.
71 changes: 71 additions & 0 deletions src/marketing_organism/evolution/genome.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
import uuid
import random
import copy
import hashlib
import json
from typing import Dict, Any

class StrategyGenome:
    """Evolvable marketing-strategy parameter set with hash-chained lineage.

    Each genome carries four genes (objective weights, audience targeting,
    budget allocation, adaptation rate), a fitness history, and a SHA-256
    hash over id/genes/lineage so ancestry can be verified.
    """

    def __init__(self, parameters: Dict[str, Any] = None, lineage: list = None):
        """Build a genome from parameters, filling defaults for missing genes.

        parameters: optional gene overrides; lineage: parent hashes, oldest first.
        """
        self.genome_id = str(uuid.uuid4())
        # NOTE(review): nested values are shared with the caller's dict;
        # mutate()/crossover() deepcopy before modifying, so offspring are isolated.
        self.parameters = parameters or {}
        # Core genes representation
        self.genes = {
            "objective_weights": self.parameters.get("objective_weights", [1.0, 0.5, 0.2]),
            "audience_targeting": self.parameters.get("audience_targeting", ["segment_a"]),
            "budget_allocation": self.parameters.get("budget_allocation", 100.0),
            "adaptation_rate": self.parameters.get("adaptation_rate", 0.05),
        }
        self.fitness_history: list = []
        self.lineage = lineage or []
        self.cryptographic_hash = self._compute_hash()

    def _compute_hash(self) -> str:
        """SHA-256 over a canonical JSON dump of id, genes, and lineage."""
        data = {
            "genome_id": self.genome_id,
            "genes": self.genes,
            "lineage": self.lineage,
        }
        encoded = json.dumps(data, sort_keys=True).encode('utf-8')
        return hashlib.sha256(encoded).hexdigest()

    def mutate(self, mutation_rate: float = 0.1):
        """Return an offspring with point mutations applied at mutation_rate.

        The parent is left unchanged; the child's lineage extends the parent's
        with the parent's hash.
        """
        mutated_genes = copy.deepcopy(self.genes)

        # Point mutations: each weight perturbed independently.
        for i in range(len(mutated_genes["objective_weights"])):
            if random.random() < mutation_rate:
                mutated_genes["objective_weights"][i] += random.uniform(-0.1, 0.1)

        if random.random() < mutation_rate:
            mutated_genes["budget_allocation"] *= random.uniform(0.9, 1.1)

        if random.random() < mutation_rate:
            # Floor keeps adaptation_rate strictly positive.
            mutated_genes["adaptation_rate"] = max(
                0.01, mutated_genes["adaptation_rate"] + random.uniform(-0.02, 0.02)
            )

        return StrategyGenome(
            parameters=mutated_genes,
            lineage=self.lineage + [self.cryptographic_hash],
        )

    def crossover(self, other_genome: 'StrategyGenome') -> 'StrategyGenome':
        """Return a child taking each gene from self or other_genome at 50/50.

        Genes are deep-copied so neither parent shares state with the child;
        the child's lineage is [self_hash, other_hash].
        """
        child_genes = {}
        for key in self.genes:
            donor = self if random.random() < 0.5 else other_genome
            child_genes[key] = copy.deepcopy(donor.genes[key])

        return StrategyGenome(
            parameters=child_genes,
            lineage=[self.cryptographic_hash, other_genome.cryptographic_hash],
        )

    def update_fitness(self, score: float):
        """Record a new observed fitness score."""
        self.fitness_history.append(score)

    @property
    def current_fitness(self) -> float:
        """Mean of the most recent (up to 5) fitness scores; 0.0 when unscored."""
        recent = self.fitness_history[-5:]
        if not recent:
            return 0.0
        return sum(recent) / len(recent)
Loading