diff --git a/apps/docs/integrations/cartesia.mdx b/apps/docs/integrations/cartesia.mdx new file mode 100644 index 00000000..fb13f7a1 --- /dev/null +++ b/apps/docs/integrations/cartesia.mdx @@ -0,0 +1,346 @@ +--- +title: "Cartesia" +sidebarTitle: "Cartesia (Voice)" +description: "Integrate Supermemory with Cartesia for conversational memory in voice AI agents" +icon: "/images/cartesia.svg" +--- + +Supermemory integrates with [Cartesia](https://cartesia.ai/agents), providing long-term memory capabilities for voice AI agents. Your Cartesia applications will remember past conversations and provide personalized responses based on user history. + +## Installation + +To use Supermemory with Cartesia, install the required dependencies: + +```bash +pip install supermemory-cartesia +``` + +Set up your API key as an environment variable: + +```bash +export SUPERMEMORY_API_KEY=your_supermemory_api_key +``` + +You can obtain an API key from [console.supermemory.ai](https://console.supermemory.ai). + +## Configuration + +Supermemory integration is provided through the `SupermemoryCartesiaAgent` wrapper class: + +```python +from supermemory_cartesia import SupermemoryCartesiaAgent +from line.llm_agent import LlmAgent, LlmConfig + +# Create base LLM agent +base_agent = LlmAgent( + model="anthropic/claude-haiku-4-5-20251001", + api_key=os.getenv("ANTHROPIC_API_KEY"), + config=LlmConfig( + system_prompt="""You are a helpful voice assistant with memory.""", + introduction="Hello! 
Great to talk with you again!", + ), +) + +# Wrap with Supermemory +memory_agent = SupermemoryCartesiaAgent( + agent=base_agent, + api_key=os.getenv("SUPERMEMORY_API_KEY"), + container_tag="user-123", + session_id="session-456", + config=SupermemoryCartesiaAgent.MemoryConfig( + mode="full", # "profile" | "query" | "full" + search_limit=10, # Max memories to retrieve + search_threshold=0.3, # Relevance threshold (0.0-1.0) + ), +) +``` + +## Agent Wrapper Pattern + +The `SupermemoryCartesiaAgent` wraps your existing `LlmAgent` to add memory capabilities: + +```python +from line.voice_agent_app import VoiceAgentApp + +async def get_agent(env, call_request): + # Extract container_tag from call metadata (typically user ID) + container_tag = call_request.metadata.get("user_id", "default-user") + + # Create base agent + base_agent = LlmAgent(...) + + # Wrap with memory + memory_agent = SupermemoryCartesiaAgent( + agent=base_agent, + container_tag=container_tag, + session_id=call_request.call_id, + ) + + return memory_agent + +# Create voice agent app +app = VoiceAgentApp(get_agent=get_agent) +``` + +## How It Works + +When integrated with Cartesia Line, Supermemory provides two key functionalities: + +### 1. Memory Retrieval + +When a `UserTurnEnded` event is detected, Supermemory retrieves relevant memories: + +- **Static Profile**: Persistent facts about the user +- **Dynamic Profile**: Recent context and preferences +- **Search Results**: Semantically relevant past memories + +### 2. Context Enhancement + +Retrieved memories are formatted and injected into the agent's system prompt before processing, giving the model awareness of past conversations. + +### 3. Background Storage + +Conversations are automatically stored in Supermemory (non-blocking) for future retrieval. 
+ +## Memory Modes + +| Mode | Static Profile | Dynamic Profile | Search Results | Use Case | +| ----------- | -------------- | --------------- | -------------- | ------------------------------ | +| `"profile"` | Yes | Yes | No | Personalization without search | +| `"query"` | No | No | Yes | Finding relevant past context | +| `"full"` | Yes | Yes | Yes | Complete memory (default) | + +## Configuration Options + +You can customize how memories are retrieved and used: + +### MemoryConfig + +```python +SupermemoryCartesiaAgent.MemoryConfig( + mode="full", # Memory mode (default: "full") + search_limit=10, # Max memories to retrieve (default: 10) + search_threshold=0.1, # Similarity threshold 0.0-1.0 (default: 0.1) + system_prompt="Based on previous conversations:\n\n", +) +``` + +| Parameter | Type | Default | Description | +| ------------------ | ----- | -------------------------------------- | ---------------------------------------------------------- | +| `search_limit` | int | 10 | Maximum number of memories to retrieve per query | +| `search_threshold` | float | 0.1 | Minimum similarity threshold for memory retrieval | +| `mode` | str | "full" | Memory retrieval mode: `"profile"`, `"query"`, or `"full"` | +| `system_prompt` | str | "Based on previous conversations:\n\n" | Prefix text for memory context | + +### Agent Parameters + +```python +SupermemoryCartesiaAgent( + agent=base_agent, # Required: Cartesia Line LlmAgent + container_tag="user-123", # Required: Primary container tag (e.g., user ID) + session_id="session-456", # Optional: Session/conversation ID + container_tags=["org-acme", "prod"], # Optional: Additional tags + custom_id="conversation-789", # Optional: Groups all messages in same document + api_key=os.getenv("SUPERMEMORY_API_KEY"), # Optional: defaults to env var + config=MemoryConfig(...), # Optional: memory configuration + base_url=None, # Optional: custom API endpoint +) +``` + +| Parameter | Type | Required | Description | +| --------------- 
| ------------ | -------- | ------------------------------------------------------------------ | +| `agent` | LlmAgent | **Yes** | The Cartesia Line agent to wrap | +| `container_tag` | str | **Yes** | Primary container tag for memory scoping (e.g., user ID) | +| `session_id` | str | No | Session/conversation ID for grouping memories | +| `container_tags`| List[str] | No | Additional container tags for organization (e.g., ["org", "prod"]) | +| `custom_id` | str | No | Custom ID to store all messages in the same document (e.g., conversation ID) | +| `api_key` | str | No | Supermemory API key (or set `SUPERMEMORY_API_KEY` env var) | +| `config` | MemoryConfig | No | Advanced configuration | +| `base_url` | str | No | Custom API endpoint | + +## Container Tags + +Container tags allow you to organize memories across multiple dimensions: + +```python +memory_agent = SupermemoryCartesiaAgent( + agent=base_agent, + container_tag="user-alice", # Primary: user ID + container_tags=["org-acme", "prod"], # Additional: organization, environment +) +``` + +Memories are stored with all tags: +```json +{ + "content": "User: What's the weather?\nAssistant: It's sunny today!", + "container_tags": ["user-alice", "org-acme", "prod"], + "metadata": { "platform": "cartesia" } +} +``` + +## Automatic Document Grouping + +The SDK **automatically groups all messages from the same session** into a single Supermemory document using `session_id`: + +```python +memory_agent = SupermemoryCartesiaAgent( + agent=base_agent, + container_tag="user-alice", + session_id=call_request.call_id, # Automatically groups all messages together +) +``` + +**How it works:** +- When you provide `session_id`, it's automatically used as the `custom_id` internally +- All messages from that session are appended to the same document +- This ensures conversation continuity without any extra configuration + +**Advanced:** If you need custom grouping (e.g., grouping multiple sessions together), you can explicitly set 
`custom_id` to override this behavior. + +## Example: Basic Voice Agent with Memory + +Here's a complete example of a Cartesia Line voice agent with Supermemory integration: + +```python +import os +from line.llm_agent import LlmAgent, LlmConfig +from line.voice_agent_app import VoiceAgentApp +from supermemory_cartesia import SupermemoryCartesiaAgent + +async def get_agent(env, call_request): + # Extract container_tag from call metadata (typically user ID) + container_tag = call_request.metadata.get("user_id", "default-user") + + # Create base LLM agent + base_agent = LlmAgent( + model="anthropic/claude-haiku-4-5-20251001", + api_key=os.getenv("ANTHROPIC_API_KEY"), + config=LlmConfig( + system_prompt="""You are a helpful voice assistant with memory.""", + introduction="Hello! Great to talk with you again!", + ), + ) + + # Wrap with Supermemory + memory_agent = SupermemoryCartesiaAgent( + agent=base_agent, + api_key=os.getenv("SUPERMEMORY_API_KEY"), + container_tag=container_tag, + session_id=call_request.call_id, # Automatically groups all messages + ) + + return memory_agent + +# Create voice agent app +app = VoiceAgentApp(get_agent=get_agent) + +if __name__ == "__main__": + app.run(host="0.0.0.0", port=8000) +``` + +## Example: Advanced Agent with Tools + +Here's an example with custom tools and multi-tag support: + +```python +import os +from line.llm_agent import LlmAgent, LlmConfig +from line.tools import LoopbackTool +from line.voice_agent_app import VoiceAgentApp +from supermemory_cartesia import SupermemoryCartesiaAgent + +# Define custom tool +async def get_weather(location: str) -> str: + return f"The weather in {location} is sunny, 72°F" + +weather_tool = LoopbackTool( + name="get_weather", + description="Get current weather for a location", + function=get_weather +) + +async def get_agent(env, call_request): + container_tag = call_request.metadata.get("user_id", "default-user") + org_id = call_request.metadata.get("org_id") + + # Create LLM agent with 
tools + base_agent = LlmAgent( + model="gemini/gemini-2.5-flash-preview-09-2025", + tools=[weather_tool], + config=LlmConfig( + system_prompt="You are a personal assistant with memory and tools.", + introduction="Hi! How can I help you today?" + ) + ) + + # Wrap with Supermemory + memory_agent = SupermemoryCartesiaAgent( + agent=base_agent, + api_key=os.getenv("SUPERMEMORY_API_KEY"), + container_tag=container_tag, + session_id=call_request.call_id, # Automatically groups all messages + container_tags=[org_id] if org_id else None, + config=SupermemoryCartesiaAgent.MemoryConfig( + mode="full", + search_limit=15, + search_threshold=0.15, + ) + ) + + return memory_agent + +app = VoiceAgentApp(get_agent=get_agent) +``` + +## Deployment + +To deploy to Cartesia Line, create a `main.py` file in your project root: + +```python +import os +import sys + +# Add src to path for local imports +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "src")) + +from line.llm_agent import LlmAgent, LlmConfig +from line.voice_agent_app import VoiceAgentApp +from supermemory_cartesia import SupermemoryCartesiaAgent + +async def get_agent(env, call_request): + """Create a memory-enabled voice agent.""" + container_tag = call_request.metadata.get("user_id", "default-user") + + base_agent = LlmAgent( + model="anthropic/claude-haiku-4-5-20251001", + api_key=os.getenv("ANTHROPIC_API_KEY"), + config=LlmConfig( + system_prompt="""You are a helpful voice assistant with memory. + You remember past conversations and can reference them naturally. + Keep responses brief and conversational.""", + introduction="Hello! 
Great to talk with you again!", + ), + ) + + memory_agent = SupermemoryCartesiaAgent( + agent=base_agent, + api_key=os.getenv("SUPERMEMORY_API_KEY"), + container_tag=container_tag, + session_id=call_request.call_id, # Automatically groups all messages + ) + + return memory_agent + +app = VoiceAgentApp(get_agent=get_agent) +``` + +Then deploy with: + +```bash +cartesia deploy +``` + +Make sure to set these environment variables in your Cartesia deployment: +- `SUPERMEMORY_API_KEY` - Your Supermemory API key +- `ANTHROPIC_API_KEY` - Your Anthropic API key (or the key for your chosen LLM provider) diff --git a/packages/cartesia-sdk-python/README.md b/packages/cartesia-sdk-python/README.md new file mode 100644 index 00000000..3ba7533f --- /dev/null +++ b/packages/cartesia-sdk-python/README.md @@ -0,0 +1,222 @@ +# Supermemory Cartesia SDK + +Memory-enhanced voice agents with [Supermemory](https://supermemory.ai) and [Cartesia Line](https://cartesia.ai/agents). + +## Installation + +```bash +pip install supermemory-cartesia +``` + +## Quick Start + +```python +import os +from line.llm_agent import LlmAgent, LlmConfig +from line.voice_agent_app import VoiceAgentApp +from supermemory_cartesia import SupermemoryCartesiaAgent + +async def get_agent(env, call_request): + # Extract container_tag from call metadata (typically user ID) + container_tag = call_request.metadata.get("user_id", "default-user") + + # Create base LLM agent + base_agent = LlmAgent( + model="gemini/gemini-2.5-flash-preview-09-2025", + config=LlmConfig( + system_prompt="You are a helpful voice assistant with memory.", + introduction="Hello! Great to talk with you again!" 
+ ) + ) + + # Wrap with Supermemory + memory_agent = SupermemoryCartesiaAgent( + agent=base_agent, + api_key=os.getenv("SUPERMEMORY_API_KEY"), + container_tag=container_tag, + session_id=call_request.call_id, + ) + + return memory_agent + +# Create voice agent app +app = VoiceAgentApp(get_agent=get_agent) + +if __name__ == "__main__": + app.run(host="0.0.0.0", port=8000) +``` + +## Configuration + +### Parameters + +| Parameter | Type | Required | Description | +| --------------- | ------------ | -------- | ------------------------------------------------------------------ | +| `agent` | LlmAgent | **Yes** | The Cartesia Line agent to wrap | +| `container_tag` | str | **Yes** | Primary container tag for memory scoping (e.g., user ID) | +| `session_id` | str | No | Session/conversation ID for grouping memories | +| `container_tags`| List[str] | No | Additional container tags for organization (e.g., ["org", "prod"]) | +| `api_key` | str | No | Supermemory API key (or set `SUPERMEMORY_API_KEY` env var) | +| `config` | MemoryConfig | No | Advanced configuration | +| `base_url` | str | No | Custom API endpoint | + +### Advanced Configuration + +```python +from supermemory_cartesia import SupermemoryCartesiaAgent + +memory_agent = SupermemoryCartesiaAgent( + agent=base_agent, + container_tag="user-123", + session_id="session-456", + container_tags=["org-acme", "prod"], # Optional: additional tags + config=SupermemoryCartesiaAgent.MemoryConfig( + search_limit=10, # Max memories to retrieve + search_threshold=0.1, # Similarity threshold + mode="full", # "profile", "query", or "full" + system_prompt="Based on previous conversations, I recall:\n\n", + ), +) +``` + +### Memory Modes + +| Mode | Static Profile | Dynamic Profile | Search Results | +| ----------- | -------------- | --------------- | -------------- | +| `"profile"` | Yes | Yes | No | +| `"query"` | No | No | Yes | +| `"full"` | Yes | Yes | Yes | + +## How It Works + +1. 
**Intercepts events** - Listens for `UserTurnEnded` events from Cartesia Line +2. **Retrieves memories** - Queries Supermemory `/v4/profile` API with user's message +3. **Enriches context** - Adds memories to event history as system message +4. **Stores messages** - Sends conversation to Supermemory (background, non-blocking) +5. **Passes to agent** - Forwards enriched event to wrapped LlmAgent + +### What Gets Stored + +User and assistant messages are sent to Supermemory: + +```json +{ + "content": "User: What's the weather?\nAssistant: It's sunny today!", + "container_tags": ["user-123", "org-acme", "prod"], + "metadata": { "platform": "cartesia" } +} +``` + +## Architecture + +Cartesia Line uses an event-driven architecture: + +``` +User Speaks (Audio) + ↓ +[Ink STT] → Automatic speech recognition + ↓ +UserTurnEnded Event {content: "user message", history: [...]} + ↓ +┌──────────────────────────────────────────────┐ +│ SUPERMEMORY CARTESIA AGENT (Wrapper) │ +│ │ +│ process(env, event): │ +│ 1. Intercept UserTurnEnded │ +│ 2. Extract user message │ +│ 3. Query Supermemory API │ +│ 4. Enrich event.history with memories │ +│ 5. Pass to wrapped LlmAgent │ +│ 6. 
Store conversation (async background) │ +└──────────────────────────────────────────────┘ + ↓ +AgentSendText Event {text: "response"} + ↓ +[Sonic TTS] → Ultra-fast speech synthesis + ↓ +Audio Output +``` + +## Comparison with Pipecat SDK + +| Aspect | Pipecat | Cartesia Line | +| ----------------------- | ------------------------------ | ---------------------------- | +| **Integration Pattern** | Extends `FrameProcessor` | Wrapper around `LlmAgent` | +| **Event Handling** | `process_frame()` method | `process()` method | +| **Events** | `LLMContextFrame`, `LLMMessagesFrame` | `UserTurnEnded`, `CallStarted` | +| **Context Object** | `LLMContext.get_messages()` | `event.history` | +| **Memory Injection** | Modify `context.add_message()` | Modify `event.history` | + +## Full Example with Tools + +```python +import os +from line.llm_agent import LlmAgent, LlmConfig +from line.tools import LoopbackTool +from line.voice_agent_app import VoiceAgentApp +from supermemory_cartesia import SupermemoryCartesiaAgent + +# Define custom tools +async def get_weather(location: str) -> str: + return f"The weather in {location} is sunny, 72°F" + +weather_tool = LoopbackTool( + name="get_weather", + description="Get current weather for a location", + function=get_weather +) + +async def get_agent(env, call_request): + container_tag = call_request.metadata.get("user_id", "default-user") + org_id = call_request.metadata.get("org_id") + + # Create LLM agent with tools + base_agent = LlmAgent( + model="gemini/gemini-2.5-flash-preview-09-2025", + tools=[weather_tool], + config=LlmConfig( + system_prompt="You are a personal assistant with memory and tools.", + introduction="Hi! How can I help you today?" 
+ ) + ) + + # Wrap with Supermemory + memory_agent = SupermemoryCartesiaAgent( + agent=base_agent, + api_key=os.getenv("SUPERMEMORY_API_KEY"), + container_tag=container_tag, + session_id=call_request.call_id, + container_tags=[org_id] if org_id else None, + config=SupermemoryCartesiaAgent.MemoryConfig( + mode="full", + search_limit=15, + search_threshold=0.15, + ) + ) + + return memory_agent + +app = VoiceAgentApp(get_agent=get_agent) +``` + +## Development + +```bash +# Clone repository +git clone https://github.com/supermemoryai/supermemory +cd supermemory/packages/cartesia-sdk-python + +# Install in development mode +pip install -e ".[dev]" + +# Run tests +pytest + +# Format code +black . +isort . +``` + +## License + +MIT diff --git a/packages/cartesia-sdk-python/pyproject.toml b/packages/cartesia-sdk-python/pyproject.toml new file mode 100644 index 00000000..18b3b299 --- /dev/null +++ b/packages/cartesia-sdk-python/pyproject.toml @@ -0,0 +1,78 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "supermemory-cartesia" +version = "0.1.0" +description = "Supermemory integration for Cartesia Line - memory-enhanced voice agents" +readme = "README.md" +license = "MIT" +requires-python = ">=3.10" +authors = [ + { name = "Supermemory", email = "support@supermemory.ai" } +] +keywords = [ + "supermemory", + "cartesia", + "line", + "memory", + "conversational-ai", + "llm", + "voice-ai", +] +classifiers = [ + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "License :: OSI Approved :: MIT License", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Topic :: Scientific/Engineering :: Artificial Intelligence", +] +dependencies = [ + "supermemory>=3.16.0", + "cartesia-line>=0.2.0", + "pydantic>=2.10.0", + "loguru>=0.7.3", +] + +[project.optional-dependencies] +dev = [ + 
"pytest>=8.3.5", + "pytest-asyncio>=0.24.0", + "mypy>=1.14.1", + "black>=24.8.0", + "isort>=5.13.2", +] + +[project.urls] +Homepage = "https://supermemory.ai" +Documentation = "https://docs.supermemory.ai" +Repository = "https://github.com/supermemoryai/supermemory" + +[tool.hatch.build.targets.wheel] +packages = ["src/supermemory_cartesia"] + +[tool.hatch.build.targets.sdist] +include = [ + "/src", + "/tests", + "/README.md", + "/LICENSE", +] + +[tool.black] +line-length = 100 +target-version = ["py310"] + +[tool.isort] +profile = "black" +line_length = 100 + +[tool.mypy] +python_version = "3.10" +warn_return_any = true +warn_unused_configs = true +disallow_untyped_defs = true diff --git a/packages/cartesia-sdk-python/requirements.txt b/packages/cartesia-sdk-python/requirements.txt new file mode 100644 index 00000000..dd4b8fe5 --- /dev/null +++ b/packages/cartesia-sdk-python/requirements.txt @@ -0,0 +1,5 @@ +# Core dependencies for supermemory-cartesia +supermemory>=3.16.0 +pydantic>=2.10.0 +loguru>=0.7.3 +cartesia-line>=0.2.0 diff --git a/packages/cartesia-sdk-python/src/supermemory_cartesia/__init__.py b/packages/cartesia-sdk-python/src/supermemory_cartesia/__init__.py new file mode 100644 index 00000000..b21511d9 --- /dev/null +++ b/packages/cartesia-sdk-python/src/supermemory_cartesia/__init__.py @@ -0,0 +1,67 @@ +"""Supermemory Cartesia SDK - Memory-enhanced voice agents with Cartesia Line. + +This package provides seamless integration between Supermemory and Cartesia Line, +enabling persistent memory and context enhancement for voice AI applications. + +Example: + ```python + from supermemory_cartesia import SupermemoryCartesiaAgent, MemoryConfig + from line.llm_agent import LlmAgent, LlmConfig + + # Create base LLM agent + base_agent = LlmAgent( + model="gemini/gemini-2.5-flash-preview-09-2025", + config=LlmConfig( + system_prompt="You are a helpful assistant.", + introduction="Hello!" 
+ ) + ) + + # Wrap with Supermemory + memory_agent = SupermemoryCartesiaAgent( + agent=base_agent, + api_key=os.getenv("SUPERMEMORY_API_KEY"), + container_tag="user-123", + ) + ``` +""" + +from .agent import SupermemoryCartesiaAgent + +# Export MemoryConfig as a top-level class for convenience +MemoryConfig = SupermemoryCartesiaAgent.MemoryConfig + +from .exceptions import ( + APIError, + ConfigurationError, + MemoryRetrievalError, + MemoryStorageError, + NetworkError, + SupermemoryCartesiaError, +) +from .utils import ( + deduplicate_memories, + format_memories_to_text, + format_relative_time, + get_last_user_message, +) + +__version__ = "0.1.0" + +__all__ = [ + # Main agent + "SupermemoryCartesiaAgent", + "MemoryConfig", + # Exceptions + "SupermemoryCartesiaError", + "ConfigurationError", + "MemoryRetrievalError", + "MemoryStorageError", + "APIError", + "NetworkError", + # Utilities + "get_last_user_message", + "deduplicate_memories", + "format_memories_to_text", + "format_relative_time", +] diff --git a/packages/cartesia-sdk-python/src/supermemory_cartesia/agent.py b/packages/cartesia-sdk-python/src/supermemory_cartesia/agent.py new file mode 100644 index 00000000..e6337b23 --- /dev/null +++ b/packages/cartesia-sdk-python/src/supermemory_cartesia/agent.py @@ -0,0 +1,431 @@ +"""Supermemory Cartesia Line agent integration. + +This module provides a memory-enhanced agent wrapper that integrates with +Cartesia Line voice agents, adding persistent memory and context enrichment. 
+""" + +import asyncio +import os +import re +from typing import Any, AsyncGenerator, Dict, List, Literal, Optional + +from loguru import logger +from pydantic import BaseModel, Field + +from .exceptions import ConfigurationError, MemoryRetrievalError +from .utils import deduplicate_memories, format_memories_to_text + +try: + import supermemory +except ImportError: + supermemory = None # type: ignore + +try: + from line.events import Event +except ImportError: + Event = Any # type: ignore + +# XML tags for memory injection +MEMORY_TAG_START = "" +MEMORY_TAG_END = "" + + +class SupermemoryCartesiaAgent: + """Memory-enhanced wrapper for Cartesia Line agents. + + This wrapper intercepts UserTurnEnded events, retrieves relevant memories + from Supermemory, and enriches the conversation history before passing to + the wrapped agent. + + Example: + ```python + from line.llm_agent import LlmAgent, LlmConfig + from supermemory_cartesia import SupermemoryCartesiaAgent + + base_agent = LlmAgent( + model="anthropic/claude-haiku-4-5-20251001", + config=LlmConfig( + system_prompt="You are a helpful assistant.", + introduction="Hello! How can I help you today?" + ) + ) + + memory_agent = SupermemoryCartesiaAgent( + agent=base_agent, + api_key=os.getenv("SUPERMEMORY_API_KEY"), + container_tag="user-123", + ) + ``` + """ + + class MemoryConfig(BaseModel): + """Configuration for memory retrieval. + + Attributes: + search_limit: Maximum memories to retrieve per query. + search_threshold: Minimum similarity threshold (0.0-1.0). + system_prompt: Prefix text for memory context. + mode: "profile", "query", or "full". 
+ """ + + search_limit: int = Field(default=10, ge=1) + search_threshold: float = Field(default=0.1, ge=0.0, le=1.0) + system_prompt: str = Field(default="Based on previous conversations:\n\n") + mode: Literal["profile", "query", "full"] = Field(default="full") + + def __init__( + self, + *, + agent: Any, + api_key: Optional[str] = None, + container_tag: str, + session_id: Optional[str] = None, + container_tags: Optional[List[str]] = None, + custom_id: Optional[str] = None, + config: Optional[MemoryConfig] = None, + base_url: Optional[str] = None, + ): + """Initialize the Supermemory Cartesia agent wrapper. + + Args: + agent: The inner Cartesia Line agent to wrap. + api_key: Supermemory API key (or SUPERMEMORY_API_KEY env var). + container_tag: Primary container tag for memory scoping (e.g., user ID). + session_id: Optional session ID for grouping memories. + container_tags: Optional list of additional container tags for + organization/categorization (e.g., ["org-acme", "prod"]). + custom_id: Optional custom ID to store all conversation messages in the same document. + Useful for grouping multi-turn conversations (e.g., session ID, conversation ID). + config: Memory retrieval configuration. + base_url: Optional custom Supermemory API URL. + + Raises: + ConfigurationError: If API key or container_tag is missing. + """ + self.agent = agent + self.container_tag = container_tag + self.session_id = session_id + self.custom_id = custom_id + + # Build container tags list: primary tag first, then additional tags + self.container_tags = [container_tag] + if container_tags: + self.container_tags.extend(container_tags) + + self.config = config or SupermemoryCartesiaAgent.MemoryConfig() + + self.api_key = api_key or os.getenv("SUPERMEMORY_API_KEY") + if not self.api_key: + raise ConfigurationError( + "API key required. Set SUPERMEMORY_API_KEY or pass api_key." 
+ ) + + if not container_tag: + raise ConfigurationError("container_tag is required") + + self._supermemory_client = None + if supermemory is not None: + try: + self._supermemory_client = supermemory.AsyncSupermemory( + api_key=self.api_key, + base_url=base_url, + ) + logger.info(f"[Supermemory] Initialized client for container_tag={container_tag}, all_tags={self.container_tags}") + except Exception as e: + logger.error(f"[Supermemory] Failed to initialize client: {e}") + + self._messages_sent_count: int = 0 + self._last_query: Optional[str] = None + + async def _retrieve_memories(self, query: str) -> Dict[str, Any]: + """Retrieve memories from Supermemory.""" + if self._supermemory_client is None: + raise MemoryRetrievalError("Supermemory client not initialized") + + try: + # Use primary container tag for profile retrieval + kwargs: Dict[str, Any] = {"container_tag": self.container_tags[0]} + + if self.config.mode != "profile" and query: + kwargs["q"] = query + kwargs["threshold"] = self.config.search_threshold + kwargs["extra_body"] = {"limit": self.config.search_limit} + + logger.info(f"[Supermemory] Retrieving memories for query: {query[:50]}...") + + response = await asyncio.wait_for( + self._supermemory_client.profile(**kwargs), + timeout=10.0 + ) + + static_count = len(response.profile.static) if response.profile.static else 0 + dynamic_count = len(response.profile.dynamic) if response.profile.dynamic else 0 + search_count = len(response.search_results.results) if response.search_results and response.search_results.results else 0 + + logger.info(f"[Supermemory] Retrieved memories - static: {static_count}, dynamic: {dynamic_count}, search: {search_count}") + + search_results = [] + if response.search_results and response.search_results.results: + search_results = response.search_results.results + + return { + "profile": { + "static": response.profile.static or [], + "dynamic": response.profile.dynamic or [], + }, + "search_results": search_results, + } + + 
except asyncio.TimeoutError: + logger.warning("[Supermemory] Profile API timed out after 10s") + raise MemoryRetrievalError("Profile API timed out") + except Exception as e: + logger.error(f"[Supermemory] Error retrieving memories: {e}") + raise MemoryRetrievalError("Failed to retrieve memories", e) + + async def _store_messages(self, messages: List[Dict[str, Any]]) -> None: + """Store messages in Supermemory.""" + if self._supermemory_client is None or not messages: + return + + try: + # Format as conversation transcript + lines = [] + for msg in messages: + role = msg.get("role", "") + content = msg.get("content", "") + if role == "user": + lines.append(f"User: {content}") + elif role == "assistant": + lines.append(f"Assistant: {content}") + + logger.info(f"[Supermemory] Storing {len(messages)} messages to containers={self.container_tags}") + + # Build kwargs for add() call + add_kwargs: Dict[str, Any] = { + "content": "\n".join(lines), + "container_tags": self.container_tags, + "metadata": {"platform": "cartesia"}, + } + + # Add custom_id if provided, fallback to session_id for document grouping + if self.custom_id: + add_kwargs["custom_id"] = self.custom_id + logger.info(f"[Supermemory] Using custom_id={self.custom_id} for document grouping") + elif self.session_id: + add_kwargs["custom_id"] = self.session_id + logger.info(f"[Supermemory] Using session_id={self.session_id} as custom_id for document grouping") + + await self._supermemory_client.add(**add_kwargs) + + logger.info(f"[Supermemory] Successfully stored {len(messages)} messages") + + except Exception as e: + logger.error(f"[Supermemory] Error storing messages: {e}") + + def _build_memory_message(self, memories_data: Dict[str, Any]) -> Optional[str]: + """Build memory context from retrieved data.""" + profile = memories_data["profile"] + deduplicated = deduplicate_memories( + static=profile["static"], + dynamic=profile["dynamic"], + search_results=memories_data["search_results"], + ) + + total = ( + 
            # Count memories across all three buckets so we can bail out
            # early when retrieval produced nothing.
            len(deduplicated["static"])
            + len(deduplicated["dynamic"])
            + len(deduplicated["search_results"])
        )

        if total == 0:
            return None

        # config.mode gates which sections are rendered:
        #   "profile" -> static/dynamic profile sections only
        #   "query"   -> semantic search results only
        #   "full"    -> both
        include_profile = self.config.mode in ("profile", "full")
        include_search = self.config.mode in ("query", "full")

        memory_text = format_memories_to_text(
            deduplicated,
            system_prompt=self.config.system_prompt,
            include_static=include_profile,
            include_dynamic=include_profile,
            include_search=include_search,
        )

        if not memory_text:
            return None

        # Wrap in sentinel tags so previously injected context can be found
        # and stripped from the system prompt before re-injection.
        return f"{MEMORY_TAG_START}\n{memory_text}\n{MEMORY_TAG_END}"

    def _extract_user_message(self, event: Any) -> Optional[str]:
        """Extract user text from a UserTurnEnded event.

        Handles the content shapes the event may carry: a plain string, a
        list of items (each a string or an object with a string ``.content``
        attribute), or a single object exposing ``.content``.

        Returns:
            The extracted text, or None when no content is present.
        """
        if not hasattr(event, 'content'):
            return None

        content = event.content

        if isinstance(content, str):
            return content

        if isinstance(content, list):
            # Collect every string-like piece and join with spaces.
            texts = []
            for item in content:
                if hasattr(item, 'content') and isinstance(item.content, str):
                    texts.append(item.content)
                elif isinstance(item, str):
                    texts.append(item)
            return " ".join(texts) if texts else None

        if hasattr(content, 'content'):
            return str(content.content)

        # Last resort: stringify whatever the event carried.
        return str(content)

    def _extract_conversation_from_history(self, history: list) -> List[Dict[str, str]]:
        """Extract messages from Cartesia event history.

        Normalizes heterogeneous history entries (plain role/content dicts or
        typed event objects) into ``{"role": ..., "content": ...}`` dicts,
        de-duplicating by exact content string.

        Args:
            history: Sequence of history items; each may be a dict or an
                event object exposing ``type`` / ``content`` attributes.

        Returns:
            Ordered list of unique user/assistant message dicts.
        """
        messages = []
        # Content strings already emitted; repeated turns are dropped.
        seen = set()

        for item in history:
            # Case 1: entry is already a chat-style dict.
            if isinstance(item, dict):
                if item.get("role") in ("user", "assistant"):
                    content = item.get("content", "")
                    if content and content not in seen:
                        messages.append(item)
                        seen.add(content)
                continue

            # Case 2: event object -- dispatch on its declared type, falling
            # back to the class name when no ``type`` attribute is set.
            event_type = getattr(item, 'type', None) or type(item).__name__

            if event_type in ('user_turn_ended', 'UserTurnEnded'):
                # Each nested string item becomes its own user message.
                nested = getattr(item, 'content', [])
                if isinstance(nested, list):
                    for n in nested:
                        if hasattr(n, 'content') and isinstance(n.content, str):
                            if n.content not in seen:
                                messages.append({"role": "user", "content": n.content})
                                seen.add(n.content)

            elif event_type in ('agent_turn_ended', 'AgentTurnEnded'):
                # Agent turns are joined into a single assistant message.
                nested = getattr(item, 'content', [])
                if isinstance(nested, list):
                    texts = [n.content for n in nested if hasattr(n, 'content') and isinstance(n.content, str)]
                    if texts:
                        content = " ".join(texts)
                        if content not in seen:
                            messages.append({"role": "assistant", "content": content})
                            seen.add(content)

            elif event_type in ('user_text_sent', 'UserTextSent'):
                content = getattr(item, 'content', '')
                if content and isinstance(content, str) and content not in seen:
                    messages.append({"role": "user", "content": content})
                    seen.add(content)

            elif event_type in ('agent_text_sent', 'AgentTextSent'):
                content = getattr(item, 'content', '')
                if content and isinstance(content, str) and content not in seen:
                    messages.append({"role": "assistant", "content": content})
                    seen.add(content)

        return messages

    async def _enrich_event_with_memories(self, event: Any) -> tuple[Any, Optional[str]]:
        """Enrich event by retrieving memories.

        Returns:
            Tuple of (event, memory_context) - memory_context is None if no memories found.
            The event is returned unchanged; memory injection happens at the agent level.
+ """ + user_message = self._extract_user_message(event) + + if not user_message: + logger.warning("[Supermemory] Could not extract user message from event") + return event, None + + if user_message == self._last_query: + return event, None + + self._last_query = user_message + logger.info(f"[Supermemory] Processing user message: {user_message[:50]}...") + + try: + memories_data = await self._retrieve_memories(user_message) + memory_context = self._build_memory_message(memories_data) + + if not memory_context: + logger.info("[Supermemory] No memories found for context injection") + return event, None + + logger.info("[Supermemory] Retrieved memory context for injection") + return event, memory_context + + except MemoryRetrievalError as e: + logger.warning(f"[Supermemory] Memory retrieval failed: {e}") + return event, None + except Exception as e: + logger.error(f"[Supermemory] Error in memory enrichment: {e}") + return event, None + + async def process(self, env: Any, event: Event) -> AsyncGenerator[Event, None]: + """Process events with memory enrichment. + + Args: + env: Turn environment from Cartesia Line. + event: Input event to process. + + Yields: + Output events from the wrapped agent. 
+ """ + try: + if type(event).__name__ == "UserTurnEnded": + logger.info("[Supermemory] Processing UserTurnEnded event") + event, memory_context = await self._enrich_event_with_memories(event) + + # Inject memory context into the agent's system prompt + if memory_context and hasattr(self.agent, 'config'): + original_prompt = getattr(self.agent.config, 'system_prompt', '') + # Remove old memory context if present + if MEMORY_TAG_START in original_prompt: + original_prompt = re.sub( + rf'{re.escape(MEMORY_TAG_START)}.*?{re.escape(MEMORY_TAG_END)}\s*', + '', + original_prompt, + flags=re.DOTALL + ) + # Prepend new memory context + self.agent.config.system_prompt = f"{memory_context}\n\n{original_prompt}" + logger.info("[Supermemory] Injected memory context into system prompt") + + # Store conversation in background + if hasattr(event, 'history') and event.history: + messages = self._extract_conversation_from_history(event.history) + unsent = messages[self._messages_sent_count:] + if unsent: + logger.info(f"[Supermemory] Queuing {len(unsent)} messages for storage") + asyncio.create_task(self._store_messages(unsent)) + self._messages_sent_count = len(messages) + else: + # No history yet, store just the current user message + user_content = self._extract_user_message(event) + if user_content: + logger.info(f"[Supermemory] No history, storing current user message: {user_content[:50]}...") + asyncio.create_task(self._store_messages([{"role": "user", "content": user_content}])) + self._messages_sent_count = 1 # CRITICAL: Increment counter to prevent duplicate storage + + async for output in self.agent.process(env, event): + yield output + else: + async for output in self.agent.process(env, event): + yield output + + except Exception as e: + logger.error(f"[Supermemory] Error in process: {e}") + async for output in self.agent.process(env, event): + yield output + + def reset_memory_tracking(self) -> None: + """Reset memory tracking for a new conversation.""" + 
        # Forget what has been persisted and what was last queried so the
        # next conversation starts from a clean slate.
        self._messages_sent_count = 0
        self._last_query = None
        logger.info("[Supermemory] Reset memory tracking state")
diff --git a/packages/cartesia-sdk-python/src/supermemory_cartesia/exceptions.py b/packages/cartesia-sdk-python/src/supermemory_cartesia/exceptions.py
new file mode 100644
index 00000000..25cd2814
--- /dev/null
+++ b/packages/cartesia-sdk-python/src/supermemory_cartesia/exceptions.py
@@ -0,0 +1,58 @@
"""Custom exceptions for Supermemory Cartesia integration."""

from typing import Optional


class SupermemoryCartesiaError(Exception):
    """Base exception for all Supermemory Cartesia errors."""

    def __init__(self, message: str, original_error: Optional[Exception] = None):
        """Store the message and, optionally, the causing exception."""
        super().__init__(message)
        self.message = message
        # Underlying exception that triggered this error, if any.
        self.original_error = original_error

    def __str__(self) -> str:
        # Append the original cause when one was captured.
        if self.original_error:
            return f"{self.message}: {self.original_error}"
        return self.message


class ConfigurationError(SupermemoryCartesiaError):
    """Raised when there are configuration issues (e.g., missing API key, invalid params)."""


class MemoryRetrievalError(SupermemoryCartesiaError):
    """Raised when memory retrieval operations fail."""


class MemoryStorageError(SupermemoryCartesiaError):
    """Raised when memory storage operations fail."""


class APIError(SupermemoryCartesiaError):
    """Raised when Supermemory API requests fail."""

    def __init__(
        self,
        message: str,
        status_code: Optional[int] = None,
        response_text: Optional[str] = None,
        original_error: Optional[Exception] = None,
    ):
        """Capture HTTP status and response body alongside the base fields."""
        super().__init__(message, original_error)
        self.status_code = status_code
        self.response_text = response_text

    def __str__(self) -> str:
        # Join only the parts that were actually provided.
        parts = [self.message]
        if self.status_code:
            parts.append(f"Status: {self.status_code}")
        if self.response_text:
            parts.append(f"Response: {self.response_text}")
        if self.original_error:
            parts.append(f"Cause: {self.original_error}")
        return " | ".join(parts)


class NetworkError(SupermemoryCartesiaError):
    """Raised when network operations fail."""
diff --git a/packages/cartesia-sdk-python/src/supermemory_cartesia/utils.py b/packages/cartesia-sdk-python/src/supermemory_cartesia/utils.py
new file mode 100644
index 00000000..eb366426
--- /dev/null
+++ b/packages/cartesia-sdk-python/src/supermemory_cartesia/utils.py
@@ -0,0 +1,134 @@
"""Utility functions for Supermemory Cartesia integration."""

from datetime import datetime, timezone
from typing import Any, Dict, List, Union


def get_last_user_message(messages: List[Dict[str, str]]) -> str | None:
    """Extract the last user message content from a list of messages.

    Returns:
        The content of the most recent message with role "user",
        or None when no user message exists.
    """
    # NOTE(review): assumes every message dict has a "role" key; entries
    # without one raise KeyError -- confirm callers guarantee this.
    for msg in reversed(messages):
        if msg["role"] == "user":
            return msg["content"]
    return None


def format_relative_time(iso_timestamp: str) -> str:
    """Convert ISO timestamp to relative time string.

    Format rules:
    - [just now] - within 30 minutes
    - [Xmins ago] - 30-60 minutes
    - [X hrs ago] - less than 1 day
    - [Xd ago] - less than 1 week
    - [X Jul] - more than 1 week, same year
    - [X Jul, 2023] - different year

    Returns an empty string when the timestamp cannot be parsed.
    """
    try:
        # Accept trailing "Z" (Zulu/UTC), which fromisoformat rejects
        # on Python < 3.11.
        dt = datetime.fromisoformat(iso_timestamp.replace("Z", "+00:00"))
        now = datetime.now(timezone.utc)
        diff = now - dt

        seconds = diff.total_seconds()
        minutes = seconds / 60
        hours = seconds / 3600
        days = seconds / 86400

        if minutes < 30:
            return "just now"
        elif minutes < 60:
            return f"{int(minutes)}mins ago"
        elif hours < 24:
            return f"{int(hours)} hrs ago"
        elif days < 7:
            return f"{int(days)}d ago"
        elif dt.year == now.year:
            return f"{dt.day} {dt.strftime('%b')}"
        else:
            return f"{dt.day} {dt.strftime('%b')}, {dt.year}"
    except Exception:
        # Malformed or naive/aware-mismatched timestamps degrade to "".
        return ""


def deduplicate_memories(
    static: List[str],
    dynamic: List[str],
    search_results: List[Dict[str, Any]],
) -> Dict[str, Union[List[str], List[Dict[str, Any]]]]:
    """Deduplicate memories. Priority: static > dynamic > search.

    Args:
        static: List of static memory strings.
        dynamic: List of dynamic memory strings.
        search_results: List of search result dicts with 'memory' and 'updatedAt'.

    Returns:
        Dict with the same three keys; each list is filtered so that no
        memory string appears more than once across the three buckets,
        with earlier buckets taking priority.
    """
    # Shared across both helpers so priority order (static, then dynamic,
    # then search) decides which bucket keeps a duplicated string.
    seen = set()

    def unique_strings(memories: List[str]) -> List[str]:
        # Preserve order while dropping strings already seen in an
        # earlier (higher-priority) bucket.
        out = []
        for m in memories:
            if m not in seen:
                seen.add(m)
                out.append(m)
        return out

    def unique_search(results: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        # Search results dedupe on their 'memory' text; entries with an
        # empty 'memory' are dropped entirely.
        out = []
        for r in results:
            memory = r.get("memory", "")
            if memory and memory not in seen:
                seen.add(memory)
                out.append(r)
        return out

    return {
        "static": unique_strings(static),
        "dynamic": unique_strings(dynamic),
        "search_results": unique_search(search_results),
    }


def format_memories_to_text(
    memories: Dict[str, Union[List[str], List[Dict[str, Any]]]],
    system_prompt: str = "Based on previous conversations, I recall:\n\n",
    include_static: bool = True,
    include_dynamic: bool = True,
    include_search: bool = True,
) -> str:
    """Format deduplicated memories into a text string for injection.

    Search results include temporal context (e.g., '3d ago') from updatedAt.

    Args:
        memories: Output of deduplicate_memories with keys 'static',
            'dynamic', and 'search_results'.
        system_prompt: Preamble placed before the rendered sections.
        include_static: Render the '## User Profile (Persistent)' section.
        include_dynamic: Render the '## Recent Context' section.
        include_search: Render the '## Relevant Memories' section.

    Returns:
        The formatted text, or an empty string when no section has content.
    """
    sections = []

    static = memories["static"]
    dynamic = memories["dynamic"]
    search_results = memories["search_results"]

    if include_static and static:
        sections.append("## User Profile (Persistent)")
        sections.append("\n".join(f"- {item}" for item in static))

    if include_dynamic and dynamic:
        sections.append("## Recent Context")
        sections.append("\n".join(f"- {item}" for item in dynamic))

    if include_search and search_results:
        sections.append("## Relevant Memories")
        lines = []
        for item in search_results:
            if isinstance(item, dict):
                memory = item.get("memory", "")
                updated_at = item.get("updatedAt", "")
                # Prefix with relative age (e.g. "[3d ago]") when a
                # parseable timestamp is present.
                time_str = format_relative_time(updated_at) if updated_at else ""
                if time_str:
                    lines.append(f"- [{time_str}] {memory}")
                else:
                    lines.append(f"- {memory}")
            else:
                # Tolerate bare strings in search_results.
                lines.append(f"- {item}")
        sections.append("\n".join(lines))

    # Nothing enabled or nothing retrieved: signal "no context" upstream.
    if not sections:
        return ""

    return f"{system_prompt}\n" + "\n\n".join(sections)