"""Event module."""
import asyncio
import inspect
from dataclasses import dataclass, field
from typing import (
Any,
Awaitable,
Callable,
Dict,
List,
Optional,
Protocol,
TypeVar,
runtime_checkable,
)
from pepperpy.core import PepperpyError
from pepperpy.module import BaseModule, ModuleConfig
[docs]
class EventError(PepperpyError):
    """Event-related error that can carry the offending event's type and ID."""

    def __init__(
        self,
        message: str,
        context: Optional[Dict[str, Any]] = None,
        cause: Optional[Exception] = None,
        event_type: Optional[str] = None,
        event_id: Optional[str] = None,
    ) -> None:
        """Build the error and record the optional event details.

        Args:
            message: Error message, forwarded to the base error.
            context: Error context, forwarded to the base error.
            cause: Error cause, forwarded to the base error.
            event_type: Event type, stored on the instance.
            event_id: Event ID, stored on the instance.
        """
        super().__init__(message, context, cause)
        # Event-specific details are kept as plain attributes.
        self.event_id = event_id
        self.event_type = event_type
# Unconstrained generic type variable.
T = TypeVar("T")
[docs]
@dataclass
class Event:
    """A named event with an optional payload and metadata.

    Attributes:
        name: Event name; must be a non-empty string.
        data: Payload of the event; defaults to ``None``.
        metadata: Extra key/value information; each instance gets its own
            empty dict by default.
    """

    name: str
    data: Any = None
    metadata: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self) -> None:
        """Check the event name once the dataclass fields are set.

        Raises:
            ValueError: If the name is not a string or is empty.
        """
        event_name = self.name
        if isinstance(event_name, str):
            if event_name:
                return
            raise ValueError("Event name cannot be empty")
        raise ValueError("Event name must be a string")
[docs]
@dataclass
class EventListener:
    """Registration record binding a handler to an event name.

    Attributes:
        event_name: Name of the event the handler is registered for.
        handler: Callable that takes the event and returns an awaitable.
        priority: Ordering key; defaults to ``0``.
    """

    event_name: str
    handler: Callable[[Event], Awaitable[None]]
    priority: int = 0
[docs]
@runtime_checkable
class EventMiddleware(Protocol):
    """Structural interface for hooks that run around event processing."""

    async def before_event(self, event: Event) -> None:
        """Hook invoked before the event is processed.

        Args:
            event: Event about to be processed.
        """
        ...

    async def after_event(self, event: Event) -> None:
        """Hook invoked after the event has been processed.

        Args:
            event: Event that was processed.
        """
        ...
[docs]
@dataclass
class EventBusConfig(ModuleConfig):
    """Configuration values for the event bus.

    Attributes:
        name: Module name; defaults to ``"event_bus"``.
        max_listeners: Optional listener limit; ``None`` by default.
        metadata: Extra key/value information; each instance gets its own
            empty dict by default.
    """

    name: str = "event_bus"
    max_listeners: Optional[int] = None
    metadata: Dict[str, Any] = field(default_factory=dict)
[docs]
class EventBus(BaseModule[EventBusConfig]):
    """Asynchronous event bus with per-event listeners and middleware hooks.

    Listeners are stored per event name, sorted by descending priority.
    Middleware hooks run before and after the handlers of every emitted
    event that has at least one listener.
    """

    def __init__(self, config: Optional[EventBusConfig] = None) -> None:
        """Initialize event bus.

        Args:
            config: Optional event bus configuration; a default
                ``EventBusConfig`` is used when omitted.
        """
        super().__init__(config or EventBusConfig())
        self._handlers: Dict[str, List[EventListener]] = {}
        self._middleware: List[EventMiddleware] = []
        # Guards mutation and snapshotting of the handler registry.
        self._lock = asyncio.Lock()
        self._stats: Dict[str, int] = {"events_processed": 0}

    async def _setup(self) -> None:
        """Set up event bus (nothing to do)."""

    async def _teardown(self) -> None:
        """Clean up event bus by clearing handlers, middleware and stats."""
        await self.clear()

    async def add_listener(
        self,
        event_name: str,
        handler: Callable[[Event], Awaitable[None]],
        priority: int = 0,
    ) -> None:
        """Add event listener.

        Args:
            event_name: Event name
            handler: Event handler; must be a coroutine function
            priority: Handler priority (higher priority handlers are
                called first)

        Raises:
            EventError: If handler is not async or max listeners reached
        """
        if not inspect.iscoroutinefunction(handler):
            raise EventError(
                "Event handler must be async",
                {"event_name": event_name},
            )

        async with self._lock:
            max_listeners = self.config.max_listeners
            # Check the limit BEFORE creating the per-event list so that a
            # rejected registration never leaves an empty entry behind.
            registered = len(self._handlers.get(event_name, ()))
            if max_listeners is not None and registered >= max_listeners:
                raise EventError(
                    "Max listeners reached",
                    {
                        "event_name": event_name,
                        "max_listeners": max_listeners,
                    },
                )

            listeners = self._handlers.setdefault(event_name, [])
            listeners.append(
                EventListener(
                    event_name=event_name, handler=handler, priority=priority
                )
            )
            # The sort is stable, so listeners with equal priority keep
            # their registration order.
            listeners.sort(key=lambda item: item.priority, reverse=True)

    async def remove_listener(
        self, event_name: str, handler: Callable[[Event], Awaitable[None]]
    ) -> None:
        """Remove event listener.

        Every registration of ``handler`` for ``event_name`` is removed.
        Unknown event names and handlers are ignored.

        Args:
            event_name: Event name
            handler: Event handler
        """
        async with self._lock:
            if event_name not in self._handlers:
                return
            remaining = [
                listener
                for listener in self._handlers[event_name]
                if listener.handler != handler
            ]
            if remaining:
                self._handlers[event_name] = remaining
            else:
                # Drop the key entirely once the last listener is gone.
                del self._handlers[event_name]

    async def add_middleware(self, middleware: EventMiddleware) -> None:
        """Add event middleware.

        Args:
            middleware: Event middleware
        """
        self._middleware.append(middleware)

    async def remove_middleware(self, middleware: EventMiddleware) -> None:
        """Remove event middleware.

        Middleware that is not registered is ignored.

        Args:
            middleware: Event middleware
        """
        if middleware in self._middleware:
            self._middleware.remove(middleware)

    async def emit(self, event: Event) -> None:
        """Emit event.

        Does nothing when no listener is registered for the event name.
        Otherwise runs all ``before_event`` hooks, awaits all handlers via
        ``asyncio.gather``, runs all ``after_event`` hooks and increments
        the processed-events counter. If a hook or handler raises, the
        exception propagates and the remaining steps are skipped.

        Args:
            event: Event to emit
        """
        async with self._lock:
            listeners = list(self._handlers.get(event.name, ()))
            # Snapshot middleware together with the listeners: the hooks
            # are awaited one by one, so iterating the live list would let
            # a concurrent add/remove/clear skip hooks or unbalance the
            # before/after pairing for this event.
            middleware = list(self._middleware)

        if not listeners:
            return

        # Call middleware before event
        for hook in middleware:
            await hook.before_event(event)

        # Call handlers; the lock is not held here, so handlers may safely
        # register or remove listeners themselves.
        await asyncio.gather(*[listener.handler(event) for listener in listeners])

        # Call middleware after event
        for hook in middleware:
            await hook.after_event(event)

        # Update stats
        self._stats["events_processed"] += 1

    def get_listeners(self, event_name: str) -> List[EventListener]:
        """Get event listeners.

        Args:
            event_name: Event name

        Returns:
            Copy of the listener list for the event (empty if none)
        """
        return list(self._handlers.get(event_name, ()))

    def get_stats(self) -> Dict[str, Any]:
        """Get event bus stats.

        Returns:
            Copy of the event bus stats
        """
        return dict(self._stats)

    async def clear(self) -> None:
        """Clear all event handlers and middleware, and reset the stats."""
        async with self._lock:
            self._handlers.clear()
            self._middleware.clear()
            self._stats["events_processed"] = 0
# Names exported by a star-import of this module.
__all__ = [
"Event",
"EventBus",
"EventBusConfig",
"EventError",
"EventListener",
"EventMiddleware",
]