Skip to content

Feature: Event Emitter Module (@OnEvent, EventEmitter, decoupled pub/sub) #119

@ItayTheDar

Description

@ItayTheDar

Overview

PyNest has no event system. When one service needs to notify another that something happened (e.g., UserCreated, OrderShipped), developers currently call methods directly — creating tight coupling between modules that should be independent.

This feature request proposes an EventEmitterModule modeled after NestJS's @nestjs/event-emitter, enabling decoupled pub/sub within the application process.


Motivation

```python

Today — tight coupling:

@Injectable
class OrderService:
def init(self, email_service: EmailService, audit_service: AuditService):
self.email = email_service
self.audit = audit_service

async def create_order(self, data: CreateOrderDto):
    order = await self.repo.create(data)
    await self.email.send_confirmation(order)  # direct call
    await self.audit.log(order)                # direct call
    return order

With EventEmitter — zero coupling:

@Injectable
class OrderService:
def init(self, event_emitter: EventEmitter):
self.events = event_emitter

async def create_order(self, data: CreateOrderDto):
    order = await self.repo.create(data)
    self.events.emit("order.created", OrderCreatedEvent(order))
    return order

EmailService and AuditService react independently:

@Injectable
class EmailService:
@onevent("order.created")
async def handle_order_created(self, event: OrderCreatedEvent):
await self.send_confirmation(event.order)
```


Proposed API

EventEmitterModule.for_root()

```python
@module(
imports=[
EventEmitterModule.for_root(
wildcard=True, # support "order.*" patterns
delimiter=".", # event name delimiter
global_=True, # make EventEmitter injectable everywhere
max_listeners=10,
ignore_errors=False,
)
],
...
)
class AppModule:
pass
```

EventEmitter — injectable service

```python
@Injectable
class OrderService:
def init(self, event_emitter: EventEmitter):
self.events = event_emitter

async def create_order(self, dto: CreateOrderDto):
    order = await self.repo.create(dto)
    self.events.emit("order.created", OrderCreatedEvent(order=order))
    return order

```

@OnEvent(event_name) — listener decorator

```python
from nest.common.events import OnEvent

@Injectable
class NotificationService:

@OnEvent("order.created")
async def on_order_created(self, event: OrderCreatedEvent):
    await self.send_email(event.order.user_email)

@OnEvent("order.*")           # wildcard — fires for any order event
async def on_any_order(self, event: BaseOrderEvent):
    await self.audit_log(event)

@OnEvent("user.created", once=True)   # fires only once
async def on_first_user(self, event: UserCreatedEvent):
    await self.send_welcome_campaign()

```


Event Classes (Recommended Pattern)

```python
from dataclasses import dataclass
from nest.common.events import BaseEvent

@DataClass
class OrderCreatedEvent(BaseEvent):
order_id: str
user_id: str
total: float
created_at: datetime = field(default_factory=datetime.utcnow)
```


Async vs Sync Emission

Fire-and-forget (default)

```python
self.events.emit("order.created", event) # schedules as asyncio task
```

Await all listeners

```python
await self.events.emit_async("order.created", event) # awaits all handlers
```

Synchronous (for non-async contexts)

```python
self.events.emit_sync("order.created", event) # runs synchronous handlers only
```


Error Handling

```python
EventEmitterModule.for_root(
ignore_errors=False # re-raise errors from listeners (default: True = swallow)
)

Per-listener error handling:

@onevent("order.created")
async def handle(self, event: OrderCreatedEvent):
try:
await self.notify(event)
except EmailException as e:
logger.error(f"Email failed: {e}") # don't crash other listeners
```


Wildcard Patterns

```python

Exact:

@onevent("user.created")

Single-level wildcard (matches "order.created", "order.cancelled"):

@onevent("order.*")

Multi-level wildcard (matches "user.profile.updated", "user.address.updated"):

@onevent("user.**")
```


EventEmitter Full Interface

```python
class EventEmitter:
def emit(self, event: str, *payload) -> bool: ...
async def emit_async(self, event: str, *payload) -> None: ...
def emit_sync(self, event: str, *payload) -> bool: ...
def on(self, event: str, listener: callable) -> None: ...
def once(self, event: str, listener: callable) -> None: ...
def off(self, event: str, listener: callable) -> None: ...
def remove_all_listeners(self, event: str = None) -> None: ...
def listener_count(self, event: str) -> int: ...
def listeners(self, event: str) -> list[callable]: ...
```


Acceptance Criteria

  • EventEmitterModule with for_root(wildcard, delimiter, global_, max_listeners, ignore_errors) in nest/common/events/
  • EventEmitter injectable service with emit(), emit_async(), emit_sync(), on(), once(), off()
  • @OnEvent(event_name, once=False) method decorator on @Injectable classes
  • Wildcard pattern support (* single-level, ** multi-level) when wildcard=True
  • once=True on @OnEvent auto-removes listener after first call
  • Async listeners properly scheduled as asyncio tasks (non-blocking emit)
  • emit_async() awaits all listeners before returning
  • ignore_errors option controls whether listener exceptions propagate
  • global_=True makes EventEmitter injectable without importing EventEmitterModule
  • BaseEvent dataclass with common fields (event_name, timestamp)
  • Unit tests for emit/listen, wildcards, once, error handling
  • Documentation page with the order-service decoupling example

Agentic Coding Value

This feature is especially valuable in agentic architectures where multiple AI-driven services need to react to the same events (e.g., tool.called, agent.response_received, context.updated) without forming explicit dependency chains. An agent-generated service can subscribe to events from any other module without modifying the emitting module.


Related

Metadata

Metadata

Assignees

No one assigned

    Labels

    enhancementNew feature or request

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions