fix(realtime): make AsyncRealtimeChannel.subscribe() await server acknowledgement#1394
fix(realtime): make AsyncRealtimeChannel.subscribe() await server acknowledgement#1394kelvinvelasquez-SDE wants to merge 4 commits intosupabase:mainfrom
Conversation
📝 WalkthroughWalkthroughSubscribe now awaits server confirmation using a Python Future and optional timeout; an internal callback resolves or rejects the Future based on SUBSCRIBED, CHANNEL_ERROR, or timeout. Join/config payload construction and server binding reconciliation were adjusted. Tests added/updated for success, error, and timeout flows. Changes
Sequence Diagram(s)sequenceDiagram
participant Client
participant Channel as AsyncRealtimeChannel
participant Future as SubscriptionFuture
participant Server as RealtimeServer
Client->>Channel: subscribe(callback, timeout)
Channel->>Future: create Future
Channel->>Channel: setup _internal_callback
Channel->>Server: send join_push (join/config)
activate Channel
Channel->>Future: await (timeout)
deactivate Channel
alt Server responds SUBSCRIBED and bindings ok
Server->>Channel: join_push SUCCESS (ReplyPostgresChanges)
Channel->>Channel: _internal_callback(SUBSCRIBED)
Channel->>Future: set_result(channel)
Future-->>Client: returns AsyncRealtimeChannel
else Server responds ERROR or binding mismatch
Server->>Channel: join_push ERROR or invalid bindings
Channel->>Channel: _internal_callback(CHANNEL_ERROR, exception)
Channel->>Future: set_exception(exception)
Future-->>Client: raises Exception
else Timeout
Future->>Channel: wait_for times out
Channel->>Future: cancel / set_exception(TimeoutError)
Future-->>Client: raises TimeoutError
end
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Suggested reviewers
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Comment |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@src/realtime/src/realtime/_async/channel.py`:
- Around line 240-276: In on_join_push_ok, don't let an empty or None
server_postgres_changes silently clear client callbacks; after retrieving
server_postgres_changes check if self.postgres_changes_callbacks is non-empty
but server_postgres_changes is empty/None or has a different length/content, and
in that case call asyncio.create_task(self.unsubscribe()), invoke
_internal_callback(RealtimeSubscribeStates.CHANNEL_ERROR, Exception("mismatch
between server and client bindings for postgres changes")), and return;
otherwise proceed with the existing reconciliation loop that compares
server_binding fields to each postgres_callback and builds new_postgres_bindings
before assigning self.postgres_changes_callbacks and calling
_internal_callback(RealtimeSubscribeStates.SUBSCRIBED, None).
…client has bindings
There was a problem hiding this comment.
Actionable comments posted: 3
🤖 Fix all issues with AI agents
In `@src/realtime/src/realtime/_async/channel.py`:
- Around line 197-210: The _internal_callback function can exit early if the
user-supplied callback raises, causing subscribe() to hang; wrap the callback
invocation in a try/except block around the call to callback(status, err) inside
_internal_callback so any exception from the user callback is caught and ignored
(or captured separately) and does not prevent the subsequent future resolution
logic for RealtimeSubscribeStates.SUBSCRIBED, CHANNEL_ERROR, or TIMED_OUT;
ensure you still use the original err when calling future.set_exception for
CHANNEL_ERROR and that future.set_result/future.set_exception always runs even
if the callback raised.
- Around line 305-314: The timeout handler after await asyncio.wait_for(future,
timeout=wait_timeout) cancels the python future but leaves the channel's join
push active, allowing late ACKs to mark the channel joined; update the except
asyncio.TimeoutError block in the subscribe/join routine to also locate the join
push object created earlier (e.g., the local variable often named push or
join_push) and explicitly clean it up before raising: if the push exists and is
not yet resolved, call its cleanup API (e.g., unsubscribe(), cancel(), or
destroy() depending on the Push class implementation) or send a
leave/unsubscribe on the channel, then cancel the future and raise the
TimeoutError as before. Ensure you reference the same push variable used when
creating the join push so the join is properly torn down on python-level
timeouts.
- Around line 182-185: The push() method's docstring wrongly states the timeout
is in milliseconds; update the docstring to state timeout is in seconds to match
AsyncPush (which uses asyncio.sleep(self.timeout)) and the module's socket
timeout convention, and ensure any related mentions (e.g., in AsyncPush or
Channel.push references) consistently document "seconds" for the timeout
parameter.
…ck errors, clean up join_push on timeout, fix docstring
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@src/realtime/src/realtime/_async/channel.py`:
- Around line 310-322: The timeout selection uses a falsy check so a
caller-passed 0 is ignored; change the logic to use an explicit None check
(e.g., set wait_timeout = timeout if timeout is not None else self.timeout) in
the method in realtime/_async/channel.py where wait_timeout is computed (the
block referencing future, self.timeout, and self.join_push.destroy()). This
preserves intentional 0 or other falsy timeouts while still defaulting to
self.timeout when timeout is not provided.
- Around line 245-291: The on_join_push_ok handler does not detect when
server_postgres_changes contains more bindings than the client registered
(postgres_changes_callbacks), so add an explicit length check and treat any size
mismatch as an error: inside on_join_push_ok (the function handling the join
reply), after reading server_postgres_changes and before assigning
self.postgres_changes_callbacks/new_postgres_bindings, compare
len(server_postgres_changes) to len(self.postgres_changes_callbacks) (or
len(new_postgres_bindings) vs original callbacks) and if server has extra
entries call asyncio.create_task(self.unsubscribe()) and invoke
_internal_callback(RealtimeSubscribeStates.CHANNEL_ERROR, Exception(...)) to
surface the mismatch, keeping the existing mismatch message and flow used
elsewhere in this function.
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@src/realtime/src/realtime/_async/channel.py`:
- Around line 223-227: The bug is that the bound method
self.presence._has_callback_attached is referenced instead of called, so
presence_enabled is always truthy; update the assignment in the Channel (or
relevant class) where presence_enabled is computed to call the method
(self.presence._has_callback_attached()) and combine its boolean result with
presence.get("enabled", False) before setting presence["enabled"]; ensure you
reference the existing symbols presence_enabled,
self.presence._has_callback_attached, and presence in your fix so behavior only
enables presence when the callback is actually attached or the config enables
it.
| presence_enabled = self.presence._has_callback_attached or presence.get( | ||
| "enabled", False | ||
| ) | ||
| presence["enabled"] = presence_enabled | ||
|
|
There was a problem hiding this comment.
Call _has_callback_attached(); current code always enables presence.
Line 223 references the bound method instead of invoking it, so presence_enabled is always truthy and presence gets enabled for all channels.
🐛 Proposed fix
- presence_enabled = self.presence._has_callback_attached or presence.get(
+ presence_enabled = self.presence._has_callback_attached() or presence.get(
"enabled", False
)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| presence_enabled = self.presence._has_callback_attached or presence.get( | |
| "enabled", False | |
| ) | |
| presence["enabled"] = presence_enabled | |
| presence_enabled = self.presence._has_callback_attached() or presence.get( | |
| "enabled", False | |
| ) | |
| presence["enabled"] = presence_enabled |
🤖 Prompt for AI Agents
In `@src/realtime/src/realtime/_async/channel.py` around lines 223 - 227, The bug
is that the bound method self.presence._has_callback_attached is referenced
instead of called, so presence_enabled is always truthy; update the assignment
in the Channel (or relevant class) where presence_enabled is computed to call
the method (self.presence._has_callback_attached()) and combine its boolean
result with presence.get("enabled", False) before setting presence["enabled"];
ensure you reference the existing symbols presence_enabled,
self.presence._has_callback_attached, and presence in your fix so behavior only
enables presence when the callback is actually attached or the config enables
it.
🚀 Description
This PR addresses a critical race condition in the
realtime-pyclient whereAsyncRealtimeChannel.subscribe()would return immediately without waiting for the server's acknowledgement (ACK).The Problem
Previously,
await channel.subscribe()initiated the subscription request generally but did not await confirmation. This meant that subsequent operations (like broadcasting a message) could fail or be dropped if executed immediately after "subscribing", because the channel was not yet actually joined on the server side.The Solution
I have enhanced
subscribe()to be fully awaitable and blocking (asynchronously) until a definitive response is received from the server.Key Implementation Details:
asyncio.Futurethat tracks the subscription state.SUBSCRIBEDand rejects onCHANNEL_ERRORorTIMED_OUT.timeoutparameter tosubscribe(), allowing callers to override the default socket timeout.callbackparameter is still supported and functions as before, ensuring no breaking changes for users relying on the callback pattern.Type of Change
🔗 Related Issue
Fixes #1209
📸 Screenshots (if UI related)
N/A
✅ Checklist:
🧪 Testing Strategy
tests/test_issue_1209.pycovering:tests/test_connection.pyto ensure existing tests correctly simulate the server ACK, preventing regressions in the test suite.Summary by CodeRabbit
New Features
Bug Fixes
Tests