Skip to content

Feature: WebSocket Gateway (@WebSocketGateway, @SubscribeMessage, real-time communication) #121

@ItayTheDar

Description

@ItayTheDar

Overview

PyNest has no WebSocket support. Issue #104 shows the community is actively looking for it. Real-time bi-directional communication is a table-stakes feature for modern APIs — especially for agentic systems where agents need to stream responses, receive live events, and maintain persistent sessions.

This feature request proposes NestJS-compatible WebSocket Gateways as a first-class PyNest primitive, built on FastAPI's native WebSocket support and optionally python-socketio.


Motivation

Use cases that require WebSockets:

  • Streaming AI model responses to clients in real time
  • Live dashboards with push updates
  • Multi-agent coordination (agent A notifies agent B via WS)
  • Chat applications
  • Collaborative editing
  • Background job progress tracking

Without WebSocket support, PyNest developers must add raw FastAPI WebSocket routes outside the module system, losing DI, guards, and structured event handling.


Proposed API

@WebSocketGateway(port?, namespace?, options?)

```python
from nest.websockets import WebSocketGateway, SubscribeMessage, ConnectedSocket, MessageBody
from nest.websockets import OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect

@WebSocketGateway(namespace="/chat")
class ChatGateway(OnGatewayConnection, OnGatewayDisconnect):

def __init__(self, chat_service: ChatService):  # DI works!
    self.service = chat_service

async def on_connection(self, client: WebSocket) -> None:
    print(f"Client connected: {client.client}")

async def on_disconnect(self, client: WebSocket) -> None:
    print(f"Client disconnected")

@SubscribeMessage("send_message")
async def handle_message(
    self,
    @MessageBody() data: SendMessageDto,
    @ConnectedSocket() client: WebSocket,
) -> MessageAckDto:
    message = await self.service.save(data)
    return MessageAckDto(id=message.id, status="delivered")

```

@SubscribeMessage(event) — handle incoming WS events

```python
@SubscribeMessage("join_room")
async def handle_join(self, @MessageBody() room: JoinRoomDto, @ConnectedSocket() client: WebSocket):
await self.rooms.add(client, room.name)
await client.send_json({"event": "joined", "room": room.name})

@SubscribeMessage("ping")
async def handle_ping(self, @ConnectedSocket() client: WebSocket):
return {"event": "pong", "data": {}} # auto-sent back to client
```


Lifecycle Interfaces

```python
from nest.websockets import OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect

class OnGatewayInit:
async def after_init(self, server: WebSocketServer) -> None: ...

class OnGatewayConnection:
async def on_connection(self, client: WebSocket, *args) -> None: ...

class OnGatewayDisconnect:
async def on_disconnect(self, client: WebSocket) -> None: ...
```


WebSocketServer — broadcast to rooms/all clients

```python
from nest.websockets import WebSocketGateway, WebSocketServer

@WebSocketGateway(namespace="/events")
class EventsGateway(OnGatewayInit):
server: WebSocketServer # auto-injected after after_init()

async def after_init(self, server: WebSocketServer):
    self.server = server

async def broadcast_update(self, payload: dict):
    await self.server.emit("update", payload)           # all clients

async def send_to_room(self, room: str, payload: dict):
    await self.server.to(room).emit("update", payload)  # room only

async def send_to_client(self, client_id: str, payload: dict):
    await self.server.to(client_id).emit("update", payload)

```


Guards on WebSocket Events

```python
@WebSocketGateway(namespace="/private")
@UseGuards(WsJwtGuard) # guard applies to all messages
class PrivateGateway:

@SubscribeMessage("secret")
@UseGuards(AdminWsGuard)    # route-level guard
async def handle_secret(self, @MessageBody() data: dict):
    ...

```

WS-specific guards receive a WsArgumentsHost (via context.switch_to_ws()):

```python
class WsJwtGuard(BaseGuard):
async def can_activate(self, context: ExecutionContext) -> bool:
ws_host = context.switch_to_ws()
client = ws_host.get_client()
token = client.headers.get("authorization")
return await self.jwt.verify(token)
```


Module Registration

```python
@module(
providers=[ChatService, ChatGateway], # Gateway is a provider
controllers=[],
)
class ChatModule:
pass
```

Gateways are registered as providers — they participate in DI and their WebSocket routes are mounted alongside the HTTP routes automatically.


Streaming Support (Agent-Friendly)

```python
@WebSocketGateway(namespace="/ai")
class AgentGateway:

def __init__(self, llm_service: LLMService):
    self.llm = llm_service

@SubscribeMessage("prompt")
async def handle_prompt(
    self,
    @MessageBody() data: PromptDto,
    @ConnectedSocket() client: WebSocket,
):
    # Stream tokens back to the client as they arrive
    async for token in self.llm.stream(data.prompt):
        await client.send_json({"event": "token", "data": token})
    await client.send_json({"event": "done", "data": {}})

```


Transport Adapters

Adapter Protocol Use Case
FastAPIAdapter Native WS (RFC 6455) Default, no extra deps
SocketIOAdapter Socket.IO Browser clients, rooms, reconnect

```python

Default (FastAPI native WS):

@WebSocketGateway(namespace="/ws")
class MyGateway: ...

Socket.IO adapter (optional):

@WebSocketGateway(namespace="/socket.io", adapter=SocketIOAdapter)
class MySocketGateway: ...
```


pynest generate gateway CLI

```bash
pynest generate gateway chat

Creates:

chat/

chat.gateway.py

chat.module.py (with gateway registered)

```


Acceptance Criteria

  • @WebSocketGateway(namespace?, options?) class decorator in nest/websockets/
  • @SubscribeMessage(event) method decorator
  • @ConnectedSocket() and @MessageBody() param decorators for WS handlers
  • OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect lifecycle interfaces
  • WebSocketServer with emit(), to(room).emit(), broadcast() API
  • Gateways register as providers and participate in DI
  • Guards work on WS events (@UseGuards, context.switch_to_ws(), WsArgumentsHost)
  • Interceptors work on WS events (Feature update docs #6 integration)
  • Exception Filters work on WS events (Feature update readme #1 integration)
  • Native FastAPI WebSocket adapter (default, zero extra deps)
  • Optional Socket.IO adapter (pip install pynest-api[socketio])
  • Token streaming pattern documented and tested
  • pynest generate gateway <name> CLI command
  • Unit tests for message routing, guards, and lifecycle hooks
  • Integration test with an async WS client
  • Full documentation page

Dependencies


Related

Metadata

Metadata

Assignees

No one assigned

    Labels

    enhancementNew feature or request

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions