Skip to content

feat: add Telnyx STT and TTS plugins#4665

Open
fmv1992 wants to merge 11 commits intolivekit:mainfrom
team-telnyx:add_telnyx_vendor
Open

feat: add Telnyx STT and TTS plugins#4665
fmv1992 wants to merge 11 commits intolivekit:mainfrom
team-telnyx:add_telnyx_vendor

Conversation

@fmv1992
Copy link

@fmv1992 fmv1992 commented Jan 30, 2026

PR Description

Summary

Adds support for Telnyx as a supported vendor for Speech-to-Text (STT) and Text-to-Speech (TTS) within the LiveKit Agents framework. This includes a new dedicated plugin package and an example voice agent demonstrating the integration.

New Features

  • Telnyx STT Plugin: Implements streaming speech-to-text using Telnyx's standalone API, supporting interim and final results with configurable transcription engines (Telnyx, Google, Deepgram, Azure).
  • Telnyx TTS Plugin: Implements a streaming text-to-speech plugin supporting high-definition NaturalHD voices and PCM/MP3 decoding.
  • Example Voice Agent: Provides a complete reference implementation in examples/voice_agents/telnyx_voice_agent.py using Telnyx for both STT and TTS alongside OpenAI for reasoning.

Implementation Details

  • Added livekit-plugins-telnyx package under livekit-plugins/.
  • Created SessionManager in common.py to handle shared aiohttp client sessions and API key resolution.
  • Implemented custom WAV header generation for STT streaming and audio stream decoding for TTS MP3-to-PCM conversion.
  • Registered the new plugin in the root pyproject.toml workspace.

Documentation

Summary by CodeRabbit

  • New Features

    • Telnyx voice agent example for real-time voice interactions with STT and TTS support
    • Sample callable tools for current time and weather lookup
    • Usage metrics collection and session lifecycle logging
  • Integrations

    • Telnyx plugin added to provide STT/TTS providers and plugin registration
  • Chores

    • New package configuration for Telnyx plugin distribution and versioning

✏️ Tip: You can customize this high-level summary in your review settings.


Open with Devin

@CLAassistant
Copy link

CLAassistant commented Jan 30, 2026

CLA assistant check
All committers have signed the CLA.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Jan 30, 2026

📝 Walkthrough

Walkthrough

Adds a Telnyx plugin and example voice agent: streaming STT and TTS backends, plugin registration, session management utilities, project packaging, and an examples/voice_agents/telnyx_voice_agent.py demonstrating RTC session wiring, VAD prewarm, function tools, metrics collection, and a CLI entrypoint.

Changes

Cohort / File(s) Summary
Plugin package & registration
livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/__init__.py, .../version.py, .../log.py
Adds Telnyx plugin registration, exposes STT, TTS, __version__, and initializes module logger.
Common utilities
livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/common.py
Adds API endpoint constants, audio params, get_api_key helper, and SessionManager for managing aiohttp sessions.
Telnyx STT implementation
livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/stt.py
Implements streaming STT: STT class, _STTOptions, SpeechStream, WAV header framing, websocket send/recv tasks, event parsing (interim/final/start/end), session management, and graceful close.
Telnyx TTS implementation
livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/tts.py
Implements streaming TTS: TTS backend, SynthesizeStream, websocket interaction, MP3 base64 decoding, AudioStreamDecoder integration, segmentation, error mapping, and cleanup.
Example voice agent
examples/voice_agents/telnyx_voice_agent.py
New TelnyxVoiceAgent example: agent class, function_tools (get_current_time, lookup_weather), server prewarm for Silero VAD, RTC session entrypoint wiring Telnyx STT/TTS + OpenAI LLM, usage collection, and CLI launch.
Packaging & workspace
livekit-plugins/.../pyproject.toml, pyproject.toml
Adds pyproject for the plugin package and registers livekit-plugins-telnyx as a workspace source.

Sequence Diagram(s)

sequenceDiagram
    participant Client as AgentServer/Agent
    participant STT as Telnyx STT
    participant Stream as SpeechStream
    participant WS as Telnyx WebSocket

    Client->>STT: recognize(audio_stream)
    STT->>Stream: create stream()
    Stream->>WS: connect (wss + auth)
    loop sending audio
        Client->>Stream: push(audio_frame)
        Stream->>WS: send WAV header / chunk
    end
    Client->>Stream: end_input
    loop receive events
        WS-->>Stream: JSON event (interim/final)
        Stream->>Client: emit INTERIM/FINAL transcript
    end
    WS-->>Stream: stream_finished
    Stream->>STT: close()
Loading
sequenceDiagram
    participant Client as AgentServer/Agent
    participant TTS as Telnyx TTS
    participant Stream as SynthesizeStream
    participant WS as Telnyx WebSocket
    participant Decoder as MP3 Decoder

    Client->>TTS: stream()
    TTS->>Stream: new SynthesizeStream()
    loop send text segments
        Client->>Stream: push_text(segment)
    end
    Client->>Stream: flush()
    Stream->>WS: connect (wss + auth) and send text
    loop receive audio chunks
        WS-->>Stream: JSON with base64 MP3
        Stream->>Decoder: feed MP3 bytes
        Decoder-->>Stream: PCM frames
        Stream->>Client: emit PCM frame
    end
    WS-->>Stream: stream_finished
    Stream->>TTS: close()
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

Possibly related PRs

Suggested reviewers

  • davidzhao
  • longcw
  • tinalenguyen

Poem

🐰 I hopped to the WebSocket and gave it a cheer,
Voices and bytes now whispering near,
Frames hop in rhythm, VAD guides the way,
Telnyx sings back what the agent will say,
Little rabbit claps — streaming’s here today! 🎧✨

🚥 Pre-merge checks | ✅ 2 | ❌ 1
❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 3.57% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'feat: add Telnyx STT and TTS plugins' accurately and concisely summarizes the main changes in the PR, which introduces new Telnyx plugins for Speech-to-Text and Text-to-Speech capabilities.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

coderabbitai[bot]

This comment was marked as resolved.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 4

🤖 Fix all issues with AI agents
In `@livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/stt.py`:
- Around line 221-225: The current debug log prints the entire STT payload
(logger.debug("Telnyx STT received: %s", data)), which can expose PII; change
the logging in the WebSocket TEXT handler to avoid dumping raw transcripts by
creating a redacted summary before logging: inspect the parsed data (variable
data from json.loads(msg.data) used before calling _process_stream_event),
replace or omit fields like "transcript", "text", "alternatives", or "raw" with
either a redaction token (e.g. "[REDACTED]") or metadata such as their lengths,
and then call logger.debug with that summary (only metadata/keys/timestamps),
not the full payload. Ensure _process_stream_event continues to receive the
original data unmodified.
- Around line 203-219: The recv_task can falsely raise APIStatusError because
closing_ws is only set after the 1s sleep and close; set closing_ws = True
before awaiting the delay/closing so the recv_task sees the flag if the server
closes the connection in response to our shutdown. Update the shutdown sequence
in the function that calls ws.close() (the block that currently does await
asyncio.sleep(1.0); closing_ws = True; await ws.close()) to assign closing_ws =
True first, then await asyncio.sleep(1.0) and finally await ws.close(), so
recv_task (which checks closing_ws) will treat server-side closes as expected
shutdowns.

In `@livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/tts.py`:
- Around line 125-134: The generic exception handler in _run currently wraps all
errors from _run_ws as APIConnectionError, losing APIStatusError and
APITimeoutError details; change the except Exception as e block in _run to
re-raise those known API exceptions (APITimeoutError, APIStatusError) untouched
(raise) and only wrap other unknown exceptions as APIConnectionError (raise
APIConnectionError() from e), so that _run preserves diagnostics and retry
semantics coming from _run_ws.
- Around line 139-212: The code currently calls
output_emitter.start_segment(segment_id=segment_id) before the WebSocket work
but only calls output_emitter.end_segment() after the try/except that can be
bypassed on exceptions; move the end_segment() call into a finally that always
runs (alongside the existing decoder.aclose() cleanup) so that
output_emitter.end_segment() executes regardless of WS/connect/handler failures
(update references around ensure_session().ws_connect, the ws async-with block,
and the finally that currently awaits decoder.aclose()).
🧹 Nitpick comments (2)
livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/stt.py (1)

44-301: Add Google-style docstrings for the public STT surface.

STT, SpeechStream, and _create_streaming_wav_header are public/central APIs but lack Google-style docstrings. Please add concise docstrings with Args/Returns where applicable.
As per coding guidelines: Use Google-style docstrings.

livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/tts.py (1)

37-212: Add Google-style docstrings for the public TTS surface.

TTS and SynthesizeStream are public APIs but lack Google-style docstrings. Please add docstrings with Args/Returns where applicable.
As per coding guidelines: Use Google-style docstrings.

📜 Review details

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 1a17c6a and cdfaca5.

⛔ Files ignored due to path filters (1)
  • uv.lock is excluded by !**/*.lock
📒 Files selected for processing (5)
  • livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/__init__.py
  • livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/py.typed
  • livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/stt.py
  • livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/tts.py
  • livekit-plugins/livekit-plugins-telnyx/pyproject.toml
🚧 Files skipped from review as they are similar to previous changes (1)
  • livekit-plugins/livekit-plugins-telnyx/pyproject.toml
🧰 Additional context used
📓 Path-based instructions (1)
**/*.py

📄 CodeRabbit inference engine (AGENTS.md)

**/*.py: Format code with ruff
Run ruff linter and auto-fix issues
Run mypy type checker in strict mode
Maintain line length of 100 characters maximum
Ensure Python 3.9+ compatibility
Use Google-style docstrings

Files:

  • livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/tts.py
  • livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/__init__.py
  • livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/stt.py
🧠 Learnings (5)
📚 Learning: 2026-01-30T12:53:12.738Z
Learnt from: milanperovic
Repo: livekit/agents PR: 4660
File: livekit-plugins/livekit-plugins-personaplex/livekit/plugins/personaplex/__init__.py:19-21
Timestamp: 2026-01-30T12:53:12.738Z
Learning: In plugin __init__.py files under the livekit-plugins or similar plugin directories, place internal imports (for example, from .log import logger) after the __all__ definition. These imports are used for plugin registration and are not part of the public API. This pattern is used across plugins (e.g., openai, deepgram, ultravox) and helps avoid E402 violations while keeping the public API surface clean.

Applied to files:

  • livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/tts.py
  • livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/__init__.py
  • livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/stt.py
📚 Learning: 2026-01-16T07:44:56.353Z
Learnt from: CR
Repo: livekit/agents PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-16T07:44:56.353Z
Learning: Applies to **/*.py : Run ruff linter and auto-fix issues

Applied to files:

  • livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/tts.py
  • livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/stt.py
📚 Learning: 2026-01-16T07:44:56.353Z
Learnt from: CR
Repo: livekit/agents PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-16T07:44:56.353Z
Learning: Applies to **/*.py : Format code with ruff

Applied to files:

  • livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/tts.py
  • livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/stt.py
📚 Learning: 2026-01-16T07:44:56.353Z
Learnt from: CR
Repo: livekit/agents PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-16T07:44:56.353Z
Learning: Implement Model Interface Pattern for STT, TTS, LLM, and Realtime models with provider-agnostic interfaces, fallback adapters for resilience, and stream adapters for different streaming patterns

Applied to files:

  • livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/tts.py
  • livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/stt.py
📚 Learning: 2026-01-16T07:44:56.353Z
Learnt from: CR
Repo: livekit/agents PR: 0
File: AGENTS.md:0-0
Timestamp: 2026-01-16T07:44:56.353Z
Learning: Follow the Plugin System pattern where plugins in livekit-plugins/ are separate packages registered via the Plugin base class

Applied to files:

  • livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/__init__.py
🧬 Code graph analysis (3)
livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/tts.py (6)
livekit-agents/livekit/agents/_exceptions.py (3)
  • APIConnectionError (84-88)
  • APIStatusError (45-81)
  • APITimeoutError (91-95)
livekit-agents/livekit/agents/types.py (1)
  • APIConnectOptions (54-88)
livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/common.py (4)
  • SessionManager (22-36)
  • get_api_key (15-19)
  • close (33-36)
  • ensure_session (27-31)
livekit-agents/livekit/agents/tts/tts.py (3)
  • TTSCapabilities (47-51)
  • num_channels (121-122)
  • exception (213-214)
livekit-agents/livekit/agents/utils/aio/channel.py (1)
  • Chan (49-178)
livekit-agents/livekit/agents/utils/codecs/decoder.py (1)
  • AudioStreamDecoder (119-339)
livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/__init__.py (3)
livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/stt.py (1)
  • STT (44-132)
livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/tts.py (1)
  • TTS (37-87)
livekit-agents/livekit/agents/plugin.py (2)
  • Plugin (13-56)
  • register_plugin (31-36)
livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/stt.py (5)
livekit-agents/livekit/agents/types.py (1)
  • APIConnectOptions (54-88)
livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/common.py (4)
  • SessionManager (22-36)
  • get_api_key (15-19)
  • close (33-36)
  • ensure_session (27-31)
livekit-agents/livekit/agents/stt/stt.py (4)
  • SpeechEvent (70-74)
  • SpeechEventType (32-49)
  • SpeechData (53-61)
  • RecognizeStream (252-469)
livekit-agents/livekit/agents/utils/log.py (1)
  • log_exceptions (9-41)
livekit-agents/livekit/agents/utils/audio.py (1)
  • AudioByteStream (41-157)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
  • GitHub Check: type-check (3.9)
  • GitHub Check: unit-tests
  • GitHub Check: type-check (3.13)
🔇 Additional comments (1)
livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/__init__.py (1)

1-18: LGTM — plugin registration and exports are wired correctly.

✏️ Tip: You can disable this entire section by setting review_details to false in your review settings.

Comment on lines 139 to 212
segment_id = utils.shortuuid()
output_emitter.start_segment(segment_id=segment_id)

url = f"{self._tts._opts.base_url}?voice={self._tts._opts.voice}"
headers = {"Authorization": f"Bearer {self._tts._opts.api_key}"}

decoder = utils.codecs.AudioStreamDecoder(
sample_rate=SAMPLE_RATE,
num_channels=NUM_CHANNELS,
format="audio/mp3",
)

async def send_task(ws: aiohttp.ClientWebSocketResponse) -> None:
await ws.send_str(json.dumps({"text": " "}))
self._mark_started()
await ws.send_str(json.dumps({"text": text}))
await ws.send_str(json.dumps({"text": ""}))

async def recv_task(ws: aiohttp.ClientWebSocketResponse) -> None:
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
try:
data = json.loads(msg.data)
audio_data = data.get("audio")
if audio_data:
audio_bytes = base64.b64decode(audio_data)
if audio_bytes:
decoder.push(audio_bytes)
except json.JSONDecodeError:
logger.warning("Telnyx TTS: Received invalid JSON")

elif msg.type in (
aiohttp.WSMsgType.CLOSE,
aiohttp.WSMsgType.CLOSED,
aiohttp.WSMsgType.CLOSING,
):
break
elif msg.type == aiohttp.WSMsgType.ERROR:
logger.error(f"Telnyx TTS WebSocket error: {ws.exception()}")
break

decoder.end_input()

async def decode_task() -> None:
async for frame in decoder:
output_emitter.push(frame.data.tobytes())

try:
ws = await asyncio.wait_for(
self._tts._session_manager.ensure_session().ws_connect(url, headers=headers),
self._conn_options.timeout,
)
async with ws:
tasks = [
asyncio.create_task(send_task(ws)),
asyncio.create_task(recv_task(ws)),
asyncio.create_task(decode_task()),
]
try:
await asyncio.gather(*tasks)
finally:
await utils.aio.gracefully_cancel(*tasks)
except asyncio.TimeoutError:
raise APITimeoutError() from None
except aiohttp.ClientResponseError as e:
raise APIStatusError(
message=e.message, status_code=e.status, request_id=None, body=None
) from None
except Exception as e:
raise APIConnectionError() from e
finally:
await decoder.aclose()

output_emitter.end_segment()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Ensure end_segment() is emitted even on WS failures.

start_segment() is called before the WS connect. If an exception is raised, end_segment() is skipped and downstream consumers may wait indefinitely. Move end_segment() into a finally block.

🧹 Always end the segment
-        try:
-            ws = await asyncio.wait_for(
-                self._tts._session_manager.ensure_session().ws_connect(url, headers=headers),
-                self._conn_options.timeout,
-            )
-            async with ws:
-                tasks = [
-                    asyncio.create_task(send_task(ws)),
-                    asyncio.create_task(recv_task(ws)),
-                    asyncio.create_task(decode_task()),
-                ]
-                try:
-                    await asyncio.gather(*tasks)
-                finally:
-                    await utils.aio.gracefully_cancel(*tasks)
-        except asyncio.TimeoutError:
-            raise APITimeoutError() from None
-        except aiohttp.ClientResponseError as e:
-            raise APIStatusError(
-                message=e.message, status_code=e.status, request_id=None, body=None
-            ) from None
-        except Exception as e:
-            raise APIConnectionError() from e
-        finally:
-            await decoder.aclose()
-
-        output_emitter.end_segment()
+        try:
+            ws = await asyncio.wait_for(
+                self._tts._session_manager.ensure_session().ws_connect(url, headers=headers),
+                self._conn_options.timeout,
+            )
+            async with ws:
+                tasks = [
+                    asyncio.create_task(send_task(ws)),
+                    asyncio.create_task(recv_task(ws)),
+                    asyncio.create_task(decode_task()),
+                ]
+                try:
+                    await asyncio.gather(*tasks)
+                finally:
+                    await utils.aio.gracefully_cancel(*tasks)
+        except asyncio.TimeoutError:
+            raise APITimeoutError() from None
+        except aiohttp.ClientResponseError as e:
+            raise APIStatusError(
+                message=e.message, status_code=e.status, request_id=None, body=None
+            ) from None
+        except Exception as e:
+            raise APIConnectionError() from e
+        finally:
+            await decoder.aclose()
+            output_emitter.end_segment()
🤖 Prompt for AI Agents
In `@livekit-plugins/livekit-plugins-telnyx/livekit/plugins/telnyx/tts.py` around
lines 139 - 212, The code currently calls
output_emitter.start_segment(segment_id=segment_id) before the WebSocket work
but only calls output_emitter.end_segment() after the try/except that can be
bypassed on exceptions; move the end_segment() call into a finally that always
runs (alongside the existing decoder.aclose() cleanup) so that
output_emitter.end_segment() executes regardless of WS/connect/handler failures
(update references around ensure_session().ws_connect, the ws async-with block,
and the finally that currently awaits decoder.aclose()).

@sonamg-droid
Copy link

@theomonnom Hi Théo, would love it if you or someone from your team could take a look at this PR and help review/approve it so we can get it merged and make the integration official.

We’re contributing this from Telnyx and are excited to formally plug into the LiveKit ecosystem. There are already several companies in the ecosystem here (e.g. Twilio, Deepgram, Gladia), and we think this fits naturally alongside those and makes the platform more complete.

Happy to make any changes or iterate based on feedback - just let us know

devin-ai-integration[bot]

This comment was marked as resolved.

Copy link
Contributor

@devin-ai-integration devin-ai-integration bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Devin Review found 1 new potential issue.

View issue and 6 additional flags in Devin Review.

Open in Devin Review

Comment on lines +248 to +252
params = {
"transcription_engine": opts.transcription_engine,
"language": self._language,
"input_format": "wav",
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 interim_results STT option is stored but never sent to the Telnyx API

The interim_results parameter is accepted in the STT constructor (line 50) and stored in _STTOptions (line 67), but it is never included in the WebSocket connection parameters sent to the Telnyx API.

Click to expand

Issue Details

In _connect_ws (lines 248-254), the query parameters only include:

params = {
    "transcription_engine": opts.transcription_engine,
    "language": self._language,
    "input_format": "wav",
}

The opts.interim_results value is never used, meaning users who set interim_results=False expecting to only receive final transcripts will still receive interim results from the API (if Telnyx supports this parameter).

Impact

Users cannot control whether interim results are returned from the STT stream, despite the option being exposed in the API. The STTCapabilities is correctly set with the interim_results value, but the actual API behavior won't match this capability declaration.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Copy link
Contributor

@devin-ai-integration devin-ai-integration bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Devin Review found 1 new potential issue.

View 15 additional findings in Devin Review.

Open in Devin Review

Comment on lines +127 to +134
except asyncio.TimeoutError:
raise APITimeoutError() from None
except aiohttp.ClientResponseError as e:
raise APIStatusError(
message=e.message, status_code=e.status, request_id=request_id, body=None
) from None
except Exception as e:
raise APIConnectionError() from e
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 TTS _run exception handlers swallow specific error types from _run_ws, downgrading them to generic APIConnectionError

The _run method's exception handling re-wraps errors already converted by _run_ws, losing specific error type information.

Root Cause

_run_ws (lines 201-208) correctly catches asyncio.TimeoutError and aiohttp.ClientResponseError and converts them to APITimeoutError and APIStatusError respectively. However, these API exception classes (APITimeoutErrorAPIConnectionErrorAPIErrorException) are not subclasses of asyncio.TimeoutError or aiohttp.ClientResponseError.

When these converted exceptions propagate through _run_segmentsasyncio.gather back into _run, the outer handler at lines 127-134:

except asyncio.TimeoutError:      # never matches API errors
    raise APITimeoutError() from None
except aiohttp.ClientResponseError as e:  # never matches API errors
    raise APIStatusError(...) from None
except Exception as e:            # catches ALL API errors here
    raise APIConnectionError() from e

The first two except blocks are dead code for exceptions originating from _run_ws. All API errors fall through to except Exception and get re-wrapped as generic APIConnectionError, discarding the specific error type (timeout vs status error) and associated data (status codes, etc.).

Impact: Timeout errors from the Telnyx TTS WebSocket are reported as connection errors, and status errors lose their HTTP status codes. This degrades error diagnostics and could affect retry logic if callers make decisions based on error type.

Suggested change
except asyncio.TimeoutError:
raise APITimeoutError() from None
except aiohttp.ClientResponseError as e:
raise APIStatusError(
message=e.message, status_code=e.status, request_id=request_id, body=None
) from None
except Exception as e:
raise APIConnectionError() from e
except asyncio.TimeoutError:
raise APITimeoutError() from None
except aiohttp.ClientResponseError as e:
raise APIStatusError(
message=e.message, status_code=e.status, request_id=request_id, body=None
) from None
except APIConnectionError:
raise
except Exception as e:
raise APIConnectionError() from e
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

fmv1992 and others added 5 commits February 13, 2026 06:16
Ensures output_emitter.end_segment() is always called even when
exceptions occur during WebSocket handling. Fixes segment count
mismatch error on TTS failures.

Addresses review comment from Devin AI.
Wraps stream usage in try/finally to ensure aclose() is called,
preventing WebSocket connection leaks.

Addresses review comment from Devin AI.
- STT: redact transcript content from debug logs to avoid PII exposure
- TTS: use lazy % formatting instead of f-string in logger.error
Copy link
Contributor

@devin-ai-integration devin-ai-integration bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Devin Review found 1 new potential issue.

View 16 additional findings in Devin Review.

Open in Devin Review

def __init__(self, *, tts: TTS, conn_options: APIConnectOptions = DEFAULT_API_CONNECT_OPTIONS):
super().__init__(tts=tts, conn_options=conn_options)
self._tts: TTS = tts
self._segments_ch = utils.aio.Chan[str]()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 TTS _segments_ch created in __init__ is not re-created in _run, breaking retries

The _segments_ch channel is created once in SynthesizeStream.__init__ (line 94) but is closed at the end of _collect_segments inside _run (line 115). The base class _main_task (livekit-agents/livekit/agents/tts/tts.py:450) retries by calling _run again in a loop. On retry, _segments_ch is already closed, so _collect_segments calls send_nowait on a closed channel, and _run_segments exits immediately since iteration over a closed channel yields nothing.

Root Cause and Impact

The base class retry loop at livekit-agents/livekit/agents/tts/tts.py:450-494 calls self._run(output_emitter) up to max_retry + 1 times. On the first call, _collect_segments closes self._segments_ch at line 115. On the second call, the closed channel means no segments are forwarded to _run_ws, so no audio is ever produced.

Impact: Any transient error that triggers a retry will result in silent failure — no audio output on subsequent attempts.

Prompt for agents
Move the creation of `_segments_ch` from `__init__` into the beginning of the `_run` method, so that a fresh channel is created for each retry attempt. Specifically, remove `self._segments_ch = utils.aio.Chan[str]()` from line 94 of tts.py and add it as the first line inside `_run` (before `request_id = utils.shortuuid()` on line 97). This ensures each call to `_run` gets a fresh, open channel.
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

@j0nscalet
Copy link

The failing CI checks (unit-tests, type-check) appear to be pre-existing issues in the main codebase, not related to this Telnyx plugin PR. The plugin-specific code passes ruff and has been E2E tested locally.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's remove this example. it doesn't offer any additional value to readers other than the new addition of the model.. which will be available on our docs site

for frame in audio_bstream.flush():
await ws.send_bytes(frame.data.tobytes())

await asyncio.sleep(1.0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is it necessary to sleep here?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not, I have removed this. Thanks for the review.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants

Comments