Overview
PyNest has no event system. When one service needs to notify another that something happened (e.g., UserCreated, OrderShipped), developers currently call methods directly — creating tight coupling between modules that should be independent.
This feature request proposes an EventEmitterModule modeled after NestJS's @nestjs/event-emitter, enabling decoupled pub/sub within the application process.
Motivation
```python
# Today: tight coupling
@Injectable
class OrderService:
    def __init__(self, email_service: EmailService, audit_service: AuditService):
        self.email = email_service
        self.audit = audit_service

    async def create_order(self, data: CreateOrderDto):
        order = await self.repo.create(data)
        await self.email.send_confirmation(order)  # direct call
        await self.audit.log(order)  # direct call
        return order


# With EventEmitter: no direct dependency on the listeners
@Injectable
class OrderService:
    def __init__(self, event_emitter: EventEmitter):
        self.events = event_emitter

    async def create_order(self, data: CreateOrderDto):
        order = await self.repo.create(data)
        self.events.emit("order.created", OrderCreatedEvent(order))
        return order


# EmailService and AuditService react independently
@Injectable
class EmailService:
    @OnEvent("order.created")
    async def handle_order_created(self, event: OrderCreatedEvent):
        await self.send_confirmation(event.order)
```
Proposed API
EventEmitterModule.for_root()
```python
@Module(
    imports=[
        EventEmitterModule.for_root(
            wildcard=True,  # support "order.*" patterns
            delimiter=".",  # event name delimiter
            global_=True,  # make EventEmitter injectable everywhere
            max_listeners=10,
            ignore_errors=False,
        )
    ],
    # ...
)
class AppModule:
    pass
```
EventEmitter — injectable service
```python
@Injectable
class OrderService:
    def __init__(self, event_emitter: EventEmitter):
        self.events = event_emitter

    async def create_order(self, dto: CreateOrderDto):
        order = await self.repo.create(dto)
        self.events.emit("order.created", OrderCreatedEvent(order=order))
        return order
```
@OnEvent(event_name) — listener decorator
```python
from nest.common.events import OnEvent


@Injectable
class NotificationService:
    @OnEvent("order.created")
    async def on_order_created(self, event: OrderCreatedEvent):
        await self.send_email(event.order.user_email)

    @OnEvent("order.*")  # wildcard: fires for any order event
    async def on_any_order(self, event: BaseOrderEvent):
        await self.audit_log(event)

    @OnEvent("user.created", once=True)  # fires only once
    async def on_first_user(self, event: UserCreatedEvent):
        await self.send_welcome_campaign()
```
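One way `@OnEvent` could work internally is sketched below, under assumptions: the decorator only tags the function with metadata, and a later registration step scans an instance for tagged methods and subscribes them. The attribute name `__on_event__`, the helper `register_listeners`, and the `TinyEmitter` stand-in are all hypothetical and are not existing PyNest APIs.

```python
import inspect
from collections import defaultdict
from typing import Any, Callable

_META = "__on_event__"  # hypothetical attribute used to tag handlers


def OnEvent(event_name: str, once: bool = False) -> Callable:
    """Tag a method with the event it listens to; registration happens later."""
    def decorator(func: Callable) -> Callable:
        tags = list(getattr(func, _META, []))
        tags.append((event_name, once))
        setattr(func, _META, tags)
        return func
    return decorator


class TinyEmitter:
    """Stand-in emitter (exact-match only), just enough to show registration."""

    def __init__(self) -> None:
        self.handlers: dict[str, list[Callable]] = defaultdict(list)

    def on(self, event: str, listener: Callable) -> None:
        self.handlers[event].append(listener)


def register_listeners(instance: Any, emitter: TinyEmitter) -> int:
    """Subscribe every tagged bound method of `instance`; return the count."""
    count = 0
    for _, method in inspect.getmembers(instance, predicate=inspect.ismethod):
        # A fuller version would call a once() method when `_once` is true.
        for event_name, _once in getattr(method, _META, []):
            emitter.on(event_name, method)
            count += 1
    return count


class NotificationService:
    @OnEvent("order.created")
    async def on_order_created(self, event: Any) -> None:
        ...

    @OnEvent("order.*")
    async def on_any_order(self, event: Any) -> None:
        ...
```

Keeping the decorator free of side effects means the class can be imported without an emitter existing yet; the dependency-injection container would call something like `register_listeners` once the provider instance has been created.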
Event Classes (Recommended Pattern)
```python
from dataclasses import dataclass, field
from datetime import datetime

from nest.common.events import BaseEvent


@dataclass
class OrderCreatedEvent(BaseEvent):
    order_id: str
    user_id: str
    total: float
    created_at: datetime = field(default_factory=datetime.utcnow)
```
Async vs Sync Emission
Fire-and-forget (default)
```python
self.events.emit("order.created", event) # schedules as asyncio task
```
Await all listeners
```python
await self.events.emit_async("order.created", event) # awaits all handlers
```
Synchronous (for non-async contexts)
```python
self.events.emit_sync("order.created", event) # runs synchronous handlers only
```
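The three emission modes above could differ roughly as in the following sketch. `SketchEmitter` is a hypothetical, exact-match-only stand-in, not the proposed implementation; it assumes `emit()` is called while an event loop is running whenever async listeners are registered.

```python
import asyncio
import inspect
from typing import Any, Callable


class SketchEmitter:
    """Exact-match emitter showing only the three emission modes."""

    def __init__(self) -> None:
        self._listeners: dict[str, list[Callable[..., Any]]] = {}
        # Keep references so scheduled tasks are not garbage-collected early.
        self._tasks: set = set()

    def on(self, event: str, listener: Callable[..., Any]) -> None:
        self._listeners.setdefault(event, []).append(listener)

    def emit(self, event: str, *payload: Any) -> bool:
        """Fire-and-forget: sync handlers run inline, async ones become tasks."""
        handlers = list(self._listeners.get(event, []))
        for handler in handlers:
            result = handler(*payload)
            if inspect.isawaitable(result):
                task = asyncio.ensure_future(result)
                self._tasks.add(task)
                task.add_done_callback(self._tasks.discard)
        return bool(handlers)

    async def emit_async(self, event: str, *payload: Any) -> None:
        """Call every handler and wait for the async ones to finish."""
        pending = []
        for handler in list(self._listeners.get(event, [])):
            result = handler(*payload)
            if inspect.isawaitable(result):
                pending.append(result)
        await asyncio.gather(*pending)

    def emit_sync(self, event: str, *payload: Any) -> bool:
        """Run only plain (non-coroutine) handlers; async ones are skipped."""
        handlers = [
            h for h in self._listeners.get(event, [])
            if not inspect.iscoroutinefunction(h)
        ]
        for handler in handlers:
            handler(*payload)
        return bool(handlers)
```

In this sketch the `bool` return value means "at least one listener was invoked", mirroring the Node.js `EventEmitter.emit` convention; the proposal does not state what the return value should mean.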
Error Handling
```python
EventEmitterModule.for_root(
    ignore_errors=False,  # re-raise listener errors (default: True, which swallows them)
)

# Per-listener error handling:
@OnEvent("order.created")
async def handle(self, event: OrderCreatedEvent):
    try:
        await self.notify(event)
    except EmailException as e:
        logger.error(f"Email failed: {e}")  # don't crash other listeners
```
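The `ignore_errors` switch could be implemented along the lines of the sketch below, which isolates listener failures with `asyncio.gather(..., return_exceptions=True)` so that one failing listener never cancels the others. The `dispatch` function and the choice to re-raise only the first error are assumptions made for illustration.

```python
import asyncio
import logging
from typing import Any, Awaitable, Callable

logger = logging.getLogger("events")

AsyncListener = Callable[[Any], Awaitable[None]]


async def dispatch(
    listeners: list[AsyncListener], event: Any, ignore_errors: bool
) -> list[BaseException]:
    """Run all listeners concurrently and collect their failures."""
    results = await asyncio.gather(
        *(listener(event) for listener in listeners),
        return_exceptions=True,  # failures come back as values, not raises
    )
    errors = [r for r in results if isinstance(r, BaseException)]
    for error in errors:
        logger.error("Listener failed: %r", error)
    if errors and not ignore_errors:
        raise errors[0]  # assumption: surface the first failure to the caller
    return errors
```

Either way, every listener runs to completion before the error policy is applied, so the per-listener `try`/`except` shown above becomes optional rather than required.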
Wildcard Patterns
```python
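# Exact match
@OnEvent("user.created")
async def on_user_created(self, event): ...

# Single-level wildcard (matches "order.created", "order.cancelled")
@OnEvent("order.*")
async def on_order_event(self, event): ...

# Multi-level wildcard (matches "user.profile.updated", "user.address.updated")
@OnEvent("user.**")
async def on_user_event(self, event): ...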
```
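A minimal matcher for these patterns might look like the sketch below. It assumes that `*` matches exactly one segment and that `**` matches one or more segments; the proposal does not pin down whether `**` may also match zero segments, so that choice is an assumption. The function name `matches` is hypothetical.

```python
def matches(pattern: str, event: str, delimiter: str = ".") -> bool:
    """Return True if `event` matches `pattern`.

    Assumed semantics: "*" matches exactly one segment,
    "**" matches one or more segments.
    """
    def match(p: list[str], e: list[str]) -> bool:
        if not p:
            return not e  # pattern exhausted: match only if event is too
        head, rest = p[0], p[1:]
        if head == "**":
            # Try letting the multi-level wildcard consume 1..len(e) segments.
            return any(match(rest, e[i:]) for i in range(1, len(e) + 1))
        if not e:
            return False
        if head == "*" or head == e[0]:
            return match(rest, e[1:])
        return False

    return match(pattern.split(delimiter), event.split(delimiter))


print(matches("order.*", "order.created"))          # True
print(matches("order.*", "order.item.added"))       # False
print(matches("user.**", "user.profile.updated"))   # True
```

With `wildcard=False`, the emitter could skip this function entirely and use a plain dictionary lookup on the event name.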
EventEmitter Full Interface
```python
from typing import Callable, Optional


class EventEmitter:
    def emit(self, event: str, *payload) -> bool: ...
    async def emit_async(self, event: str, *payload) -> None: ...
    def emit_sync(self, event: str, *payload) -> bool: ...
    def on(self, event: str, listener: Callable) -> None: ...
    def once(self, event: str, listener: Callable) -> None: ...
    def off(self, event: str, listener: Callable) -> None: ...
    def remove_all_listeners(self, event: Optional[str] = None) -> None: ...
    def listener_count(self, event: str) -> int: ...
    def listeners(self, event: str) -> list[Callable]: ...
```
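The bookkeeping half of this interface (`on`, `once`, `off`, and the introspection helpers) could be backed by a structure like the one sketched below. `ListenerRegistry` is a hypothetical name, matching is exact-only, and only a synchronous emit is included to demonstrate the `once` behavior.

```python
from typing import Any, Callable, Optional

Listener = Callable[..., Any]


class ListenerRegistry:
    """Exact-match registry covering on/once/off and the introspection helpers."""

    def __init__(self) -> None:
        # Each entry is (listener, once_flag); registration order is preserved.
        self._entries: dict[str, list[tuple[Listener, bool]]] = {}

    def on(self, event: str, listener: Listener) -> None:
        self._entries.setdefault(event, []).append((listener, False))

    def once(self, event: str, listener: Listener) -> None:
        self._entries.setdefault(event, []).append((listener, True))

    def off(self, event: str, listener: Listener) -> None:
        kept = [(l, o) for (l, o) in self._entries.get(event, []) if l != listener]
        self._entries[event] = kept

    def remove_all_listeners(self, event: Optional[str] = None) -> None:
        if event is None:
            self._entries.clear()
        else:
            self._entries.pop(event, None)

    def listener_count(self, event: str) -> int:
        return len(self._entries.get(event, []))

    def listeners(self, event: str) -> list[Listener]:
        return [listener for listener, _ in self._entries.get(event, [])]

    def emit_sync(self, event: str, *payload: Any) -> bool:
        entries = list(self._entries.get(event, []))
        if entries:
            # Drop one-shot listeners before calling them, so a re-entrant
            # emit from inside a handler cannot fire them a second time.
            self._entries[event] = [(l, o) for (l, o) in entries if not o]
        for listener, _ in entries:
            listener(*payload)
        return bool(entries)
```

Storing the `once` flag next to the listener, rather than wrapping the listener in a self-removing closure, keeps `off()` and `listeners()` working with the original callable.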
Acceptance Criteria
EventEmitterModule with for_root(wildcard, delimiter, global_, max_listeners, ignore_errors) in nest/common/events/
EventEmitter injectable service with emit(), emit_async(), emit_sync(), on(), once(), off()
@OnEvent(event_name, once=False) method decorator on @Injectable classes
Wildcard pattern support (* single-level, ** multi-level) when wildcard=True
once=True on @OnEvent auto-removes listener after first call
Async listeners properly scheduled as asyncio tasks (non-blocking emit)
emit_async() awaits all listeners before returning
ignore_errors option controls whether listener exceptions propagate
global_=True makes EventEmitter injectable without importing EventEmitterModule
BaseEvent dataclass with common fields (event_name, timestamp)
Unit tests for emit/listen, wildcards, once, error handling
Documentation page with the order-service decoupling example
Agentic Coding Value
This feature is especially valuable in agentic architectures where multiple AI-driven services need to react to the same events (e.g., tool.called, agent.response_received, context.updated) without forming explicit dependency chains. An agent-generated service can subscribe to events from any other module without modifying the emitting module.
Related