diff --git a/apps/cache-sidecar/package.json b/apps/cache-sidecar/package.json new file mode 100644 index 000000000..c9ae25cfa --- /dev/null +++ b/apps/cache-sidecar/package.json @@ -0,0 +1,32 @@ +{ + "name": "@stripe/sync-cache-sidecar", + "version": "0.1.0", + "private": true, + "description": "HTTP sidecar for optimistic balance enforcement over synced Metronome data", + "type": "module", + "exports": { + ".": { + "bun": "./src/index.ts", + "types": "./dist/index.d.ts", + "import": "./dist/index.js" + } + }, + "scripts": { + "build": "tsc", + "dev": "tsx --watch --conditions bun src/index.ts", + "start": "node dist/index.js", + "test": "vitest run", + "test:watch": "vitest" + }, + "dependencies": { + "@hono/node-server": "^1", + "hono": "^4", + "ioredis": "^5", + "zod": "^4.3.6" + }, + "devDependencies": { + "@types/node": "^24.10.1", + "tsx": "^4", + "vitest": "^3.2.4" + } +} diff --git a/apps/cache-sidecar/src/config.ts b/apps/cache-sidecar/src/config.ts new file mode 100644 index 000000000..a0d3a58e9 --- /dev/null +++ b/apps/cache-sidecar/src/config.ts @@ -0,0 +1,22 @@ +import { z } from 'zod' + +const ConfigSchema = z.object({ + REDIS_URL: z.string().default('redis://localhost:56379'), + CHECKPOINT_PREFIX: z.string().default('sync:'), + OPTIMISTIC_PREFIX: z.string().default('optimistic:'), + CREDIT_TYPE_ID: z.string(), + PORT: z.coerce.number().default(4100), + WATERMARK_BUFFER_MS: z.coerce.number().default(10000), + FIXED_EVENT_COST: z.coerce.number().positive().default(1), +}) + +export type Config = z.infer + +export function loadConfig(): Config { + const result = ConfigSchema.safeParse(process.env) + if (!result.success) { + console.error('Invalid configuration:', result.error.format()) + process.exit(1) + } + return result.data +} diff --git a/apps/cache-sidecar/src/index.ts b/apps/cache-sidecar/src/index.ts new file mode 100644 index 000000000..aad7760f0 --- /dev/null +++ b/apps/cache-sidecar/src/index.ts @@ -0,0 +1,31 @@ +import { serve } from '@hono/node-server' +import { loadConfig } from './config.js' +import { FixedCostPricing } from './pricing.js' +import { createRedisClient } from './redis.js' +import { createApp } from './server.js' + +const config = loadConfig() +const redis = createRedisClient(config) +const pricing = new FixedCostPricing(config.FIXED_EVENT_COST) + +const app = createApp({ redis, config, pricing }) + +const server = serve({ fetch: app.fetch, port: config.PORT }, (info) => { + console.log( + JSON.stringify({ + msg: 'cache-sidecar started', + port: info.port, + redis_url: config.REDIS_URL, + }) + ) +}) + +function shutdown() { + console.log(JSON.stringify({ msg: 'shutting down' })) + redis.disconnect() + server.close() + process.exit(0) +} + +process.on('SIGTERM', shutdown) +process.on('SIGINT', shutdown) diff --git a/apps/cache-sidecar/src/pricing.ts b/apps/cache-sidecar/src/pricing.ts new file mode 100644 index 000000000..585ffa1a3 --- /dev/null +++ b/apps/cache-sidecar/src/pricing.ts @@ -0,0 +1,15 @@ +export interface PricingStrategy { + estimateCost(eventType: string, properties?: Record): number +} + +/** + * Fixed-cost pricing: every event costs the same amount of credits. + * Good enough for MVP — swap in a lookup-based strategy later. + */ +export class FixedCostPricing implements PricingStrategy { + constructor(private readonly cost: number) {} + + estimateCost(_eventType: string, _properties?: Record): number { + return this.cost + } +} diff --git a/apps/cache-sidecar/src/redis.ts b/apps/cache-sidecar/src/redis.ts new file mode 100644 index 000000000..57a2379c1 --- /dev/null +++ b/apps/cache-sidecar/src/redis.ts @@ -0,0 +1,65 @@ +import { Redis } from 'ioredis' +import type { Config } from './config.js' + +export interface CheckpointData { + balance: number + customer_id: string + credit_type_id: string + _synced_at: number +} + +export interface PendingEvent { + event_id: string + event_type: string + estimated_cost: number + timestamp: number + properties?: Record +} + +export type { Redis } + +export function createRedisClient(config: Config): Redis { + const client = new Redis(config.REDIS_URL, { maxRetriesPerRequest: 3 }) + client.on('error', (err) => { + console.error(JSON.stringify({ msg: 'redis error', error: err.message })) + }) + return client +} + +export function checkpointKey(config: Config, customerId: string): string { + return `${config.CHECKPOINT_PREFIX}net_balance:${customerId}:${config.CREDIT_TYPE_ID}` +} + +export function pendingSetKey(config: Config, customerId: string): string { + return `${config.OPTIMISTIC_PREFIX}pending:${customerId}` +} + +export async function getCheckpoint( + redis: Redis, + config: Config, + customerId: string +): Promise { + const raw = await redis.get(checkpointKey(config, customerId)) + if (!raw) return null + return JSON.parse(raw) as CheckpointData +} + +export async function getPendingEvents( + redis: Redis, + config: Config, + customerId: string +): Promise { + const members = await redis.zrange(pendingSetKey(config, customerId), 0, -1) + return members.map((m) => JSON.parse(m) as PendingEvent) +} + +export async function sumPendingCosts( + redis: Redis, + config: Config, + customerId: string +): Promise<{ total: number; count: number }> { + const events = await getPendingEvents(redis, config, customerId) + // Sub-cent precision; round to avoid IEEE-754 drift + const total = Math.round(events.reduce((sum, e) => sum + e.estimated_cost, 0) * 100) / 100 + return { total, count: events.length } +} diff --git a/apps/cache-sidecar/src/server.test.ts b/apps/cache-sidecar/src/server.test.ts new file mode 100644 index 000000000..c862f5abc --- /dev/null +++ b/apps/cache-sidecar/src/server.test.ts @@ -0,0 +1,412 @@ +import { describe, it, expect, beforeAll, beforeEach, afterAll } from 'vitest' +import { Redis } from 'ioredis' +import { createApp } from './server.js' +import { FixedCostPricing } from './pricing.js' +import type { Config } from './config.js' +import type { PendingEvent } from './redis.js' +import { checkpointKey, pendingSetKey } from './redis.js' + +// Use a unique prefix per test run to avoid collisions +const TEST_RUN_ID = `test_${Date.now()}_` + +const config: Config = { + REDIS_URL: 'redis://localhost:56379', + CHECKPOINT_PREFIX: `${TEST_RUN_ID}sync:`, + OPTIMISTIC_PREFIX: `${TEST_RUN_ID}optimistic:`, + CREDIT_TYPE_ID: 'test-credit-type-001', + PORT: 4199, + WATERMARK_BUFFER_MS: 10_000, + FIXED_EVENT_COST: 1, +} + +let redis: Redis +let app: ReturnType + +beforeAll(async () => { + redis = new Redis(config.REDIS_URL, { maxRetriesPerRequest: 3 }) + await redis.ping() + const pricing = new FixedCostPricing(config.FIXED_EVENT_COST) + app = createApp({ redis, config, pricing }) +}) + +beforeEach(async () => { + const patterns = [ + `${config.CHECKPOINT_PREFIX}*`, + `${config.OPTIMISTIC_PREFIX}*`, + ] + for (const pattern of patterns) { + let cursor = '0' + do { + const [next, keys] = await redis.scan(cursor, 'MATCH', pattern, 'COUNT', 200) + cursor = next + if (keys.length > 0) { + await redis.del(...keys) + } + } while (cursor !== '0') + } +}) + +afterAll(async () => { + const patterns = [ + `${config.CHECKPOINT_PREFIX}*`, + `${config.OPTIMISTIC_PREFIX}*`, + ] + for (const pattern of patterns) { + let cursor = '0' + do { + const [next, keys] = await redis.scan(cursor, 'MATCH', pattern, 'COUNT', 200) + cursor = next + if (keys.length > 0) { + await redis.del(...keys) + } + } while (cursor !== '0') + } + await redis.quit() +}) + +// ─── Helpers ───────────────────────────────────────────────────────────────── + +async function seedCheckpoint(customerId: string, balance: number, syncedAtSec?: number) { + const checkpoint = { + balance, + customer_id: customerId, + credit_type_id: config.CREDIT_TYPE_ID, + _synced_at: syncedAtSec ?? Math.floor(Date.now() / 1000), + } + const key = checkpointKey(config, customerId) + await redis.set(key, JSON.stringify(checkpoint)) + return checkpoint +} + +function request(method: string, path: string, body?: unknown) { + const init: RequestInit = { method } + if (body) { + init.body = JSON.stringify(body) + init.headers = { 'Content-Type': 'application/json' } + } + return app.request(path, init) +} + +// ─── GET /v1/health ────────────────────────────────────────────────────────── + +describe('GET /v1/health', () => { + it('returns ok:true when Redis is connected', async () => { + const res = await request('GET', '/v1/health') + expect(res.status).toBe(200) + const json = await res.json() + expect(json.ok).toBe(true) + expect(json.redis).toBe('connected') + }) + + it('reports last_checkpoint_age_ms when a checkpoint exists', async () => { + const now = Math.floor(Date.now() / 1000) + await seedCheckpoint('cust_health_test', 100, now) + + const res = await request('GET', '/v1/health') + const json = await res.json() + expect(json.ok).toBe(true) + expect(json.last_checkpoint_age_ms).toBeTypeOf('number') + expect(json.last_checkpoint_age_ms).toBeLessThan(5000) + }) + + it('returns 503 when Redis is unreachable', async () => { + const badRedis = new Redis('redis://localhost:1', { + maxRetriesPerRequest: 0, + lazyConnect: true, + connectTimeout: 100, + }) + const badApp = createApp({ + redis: badRedis, + config, + pricing: new FixedCostPricing(1), + }) + + const res = await badApp.request('/v1/health', { method: 'GET' }) + expect(res.status).toBe(503) + const json = await res.json() + expect(json.ok).toBe(false) + expect(json.redis).toBe('disconnected') + + badRedis.disconnect() + }) +}) + +// ─── GET /v1/balance/:customer_id ──────────────────────────────────────────── + +describe('GET /v1/balance/:customer_id', () => { + it('returns 503 when no checkpoint exists', async () => { + const res = await request('GET', '/v1/balance/cust_nonexistent') + expect(res.status).toBe(503) + const json = await res.json() + expect(json.error).toContain('No checkpoint') + }) + + it('returns checkpoint balance with no pending events', async () => { + const now = Math.floor(Date.now() / 1000) + await seedCheckpoint('cust_balance_1', 500, now) + + const res = await request('GET', '/v1/balance/cust_balance_1') + expect(res.status).toBe(200) + const json = await res.json() + expect(json.checkpoint_balance).toBe(500) + expect(json.optimistic_balance).toBe(500) + expect(json.pending_events).toBe(0) + expect(json.confidence).toBe('high') + }) + + it('returns optimistic balance when pending events exist', async () => { + const now = Math.floor(Date.now() / 1000) + await seedCheckpoint('cust_balance_2', 100, now) + + const key = pendingSetKey(config, 'cust_balance_2') + for (let i = 0; i < 3; i++) { + const member = JSON.stringify({ + event_id: `evt_test_${i}`, + event_type: 'api_call', + estimated_cost: 1, + }) + await redis.zadd(key, String(Date.now() + i), member) + } + + const res = await request('GET', '/v1/balance/cust_balance_2') + expect(res.status).toBe(200) + const json = await res.json() + expect(json.checkpoint_balance).toBe(100) + expect(json.optimistic_balance).toBe(97) + expect(json.pending_events).toBe(3) + }) + + it('prunes stale events on read (inline reconciliation)', async () => { + const now = Math.floor(Date.now() / 1000) + await seedCheckpoint('cust_inline_recon', 100, now) + + const key = pendingSetKey(config, 'cust_inline_recon') + + // Old event (before cutoff: _synced_at*1000 - WATERMARK_BUFFER_MS) + const oldTs = (now * 1000) - config.WATERMARK_BUFFER_MS - 5000 + await redis.zadd(key, String(oldTs), JSON.stringify({ + event_id: 'evt_old', + event_type: 'api_call', + estimated_cost: 1, + })) + + // Recent event (after cutoff) + await redis.zadd(key, String(Date.now()), JSON.stringify({ + event_id: 'evt_new', + event_type: 'api_call', + estimated_cost: 1, + })) + + const res = await request('GET', '/v1/balance/cust_inline_recon') + const json = await res.json() + expect(json.optimistic_balance).toBe(99) // only new event counted + expect(json.pending_events).toBe(1) + + // Verify old event was actually removed from Redis + const remaining = await redis.zrange(key, 0, -1) + expect(remaining).toHaveLength(1) + expect(JSON.parse(remaining[0]!).event_id).toBe('evt_new') + }) + + it('confidence is "high" when checkpoint is fresh (<30s)', async () => { + const now = Math.floor(Date.now() / 1000) + await seedCheckpoint('cust_conf_high', 100, now) + + const res = await request('GET', '/v1/balance/cust_conf_high') + const json = await res.json() + expect(json.confidence).toBe('high') + }) + + it('confidence is "medium" when checkpoint is 30s-120s old', async () => { + const sixtySecondsAgo = Math.floor(Date.now() / 1000) - 60 + await seedCheckpoint('cust_conf_med', 100, sixtySecondsAgo) + + const res = await request('GET', '/v1/balance/cust_conf_med') + const json = await res.json() + expect(json.confidence).toBe('medium') + }) + + it('confidence is "low" when checkpoint is >120s old', async () => { + const threeMinutesAgo = Math.floor(Date.now() / 1000) - 180 + await seedCheckpoint('cust_conf_low', 100, threeMinutesAgo) + + const res = await request('GET', '/v1/balance/cust_conf_low') + const json = await res.json() + expect(json.confidence).toBe('low') + }) +}) + +// ─── POST /v1/events ───────────────────────────────────────────────────────── + +describe('POST /v1/events', () => { + it('rejects missing customer_id', async () => { + const res = await request('POST', '/v1/events', { event_type: 'api_call' }) + expect(res.status).toBe(400) + const json = await res.json() + expect(json.error).toContain('customer_id') + }) + + it('rejects missing event_type', async () => { + const res = await request('POST', '/v1/events', { customer_id: 'cust_1' }) + expect(res.status).toBe(400) + const json = await res.json() + expect(json.error).toContain('event_type') + }) + + it('generates event_id and returns estimated cost', async () => { + await seedCheckpoint('cust_evt_1', 50) + + const res = await request('POST', '/v1/events', { + customer_id: 'cust_evt_1', + event_type: 'api_call', + }) + expect(res.status).toBe(200) + const json = await res.json() + expect(json.event_id).toMatch(/^evt_/) + expect(json.estimated_cost).toBe(1) + expect(json.optimistic_balance).toBe(49) + expect(json.pending_events).toBe(1) + }) + + it('uses idempotency_key as event_id when provided', async () => { + await seedCheckpoint('cust_evt_2', 50) + + const res = await request('POST', '/v1/events', { + customer_id: 'cust_evt_2', + event_type: 'api_call', + idempotency_key: 'my-custom-key-123', + }) + expect(res.status).toBe(200) + const json = await res.json() + expect(json.event_id).toBe('my-custom-key-123') + }) + + it('decrements balance with each event', async () => { + await seedCheckpoint('cust_evt_3', 10) + + await request('POST', '/v1/events', { + customer_id: 'cust_evt_3', + event_type: 'api_call', + }) + await request('POST', '/v1/events', { + customer_id: 'cust_evt_3', + event_type: 'api_call', + }) + + const res = await request('GET', '/v1/balance/cust_evt_3') + const json = await res.json() + expect(json.optimistic_balance).toBe(8) + expect(json.pending_events).toBe(2) + }) + + it('works even without a checkpoint (balance defaults to 0)', async () => { + const res = await request('POST', '/v1/events', { + customer_id: 'cust_no_checkpoint', + event_type: 'api_call', + }) + expect(res.status).toBe(200) + const json = await res.json() + expect(json.optimistic_balance).toBe(-1) + }) + + it('deduplicates events by idempotency_key', async () => { + await seedCheckpoint('cust_dedup', 100) + + const res1 = await request('POST', '/v1/events', { + customer_id: 'cust_dedup', + event_type: 'api_call', + idempotency_key: 'same_key', + }) + const json1 = await res1.json() + expect(json1.optimistic_balance).toBe(99) + + const res2 = await request('POST', '/v1/events', { + customer_id: 'cust_dedup', + event_type: 'api_call', + idempotency_key: 'same_key', + }) + const json2 = await res2.json() + expect(json2.duplicate).toBe(true) + expect(json2.optimistic_balance).toBe(99) + expect(json2.pending_events).toBe(1) + }) + + it('passes properties through to the pending event', async () => { + await seedCheckpoint('cust_props', 50) + + await request('POST', '/v1/events', { + customer_id: 'cust_props', + event_type: 'api_call', + properties: { model: 'gpt-4', tokens: 500 }, + }) + + const key = pendingSetKey(config, 'cust_props') + const members = await redis.zrange(key, 0, -1) + expect(members).toHaveLength(1) + const stored = JSON.parse(members[0]!) + expect(stored.properties).toEqual({ model: 'gpt-4', tokens: 500 }) + }) +}) + +// ─── FixedCostPricing ──────────────────────────────────────────────────────── + +describe('FixedCostPricing', () => { + it('returns configured cost for any event type', () => { + const pricing = new FixedCostPricing(5) + expect(pricing.estimateCost('api_call')).toBe(5) + expect(pricing.estimateCost('webhook')).toBe(5) + expect(pricing.estimateCost('anything', { foo: 'bar' })).toBe(5) + }) + + it('supports fractional cost', () => { + const pricing = new FixedCostPricing(0.25) + expect(pricing.estimateCost('api_call')).toBe(0.25) + }) +}) + +// ─── Full Flow ─────────────────────────────────────────────────────────────── + +describe('full flow: checkpoint -> events -> balance -> new checkpoint -> prune', () => { + it('end-to-end optimistic balance lifecycle', async () => { + const customerId = 'cust_e2e_flow' + const now = Math.floor(Date.now() / 1000) + + // Step 1: No checkpoint — balance returns 503 + const r1 = await request('GET', `/v1/balance/${customerId}`) + expect(r1.status).toBe(503) + + // Step 2: Checkpoint arrives + await seedCheckpoint(customerId, 1000, now) + + // Step 3: Balance shows full checkpoint amount + const r2 = await request('GET', `/v1/balance/${customerId}`) + const b2 = await r2.json() + expect(b2.checkpoint_balance).toBe(1000) + expect(b2.optimistic_balance).toBe(1000) + + // Step 4: User sends 3 events + for (let i = 0; i < 3; i++) { + await request('POST', '/v1/events', { + customer_id: customerId, + event_type: 'api_call', + }) + } + + // Step 5: Balance reflects pending deductions + const r3 = await request('GET', `/v1/balance/${customerId}`) + const b3 = await r3.json() + expect(b3.optimistic_balance).toBe(997) + expect(b3.pending_events).toBe(3) + + // Step 6: New checkpoint arrives (Metronome processed the events) + const laterSyncedAt = now + 30 + await seedCheckpoint(customerId, 997, laterSyncedAt) + + // Step 7: GET /v1/balance triggers inline reconciliation — prunes old events + const r4 = await request('GET', `/v1/balance/${customerId}`) + const b4 = await r4.json() + expect(b4.checkpoint_balance).toBe(997) + expect(b4.optimistic_balance).toBe(997) + expect(b4.pending_events).toBe(0) + }) +}) diff --git a/apps/cache-sidecar/src/server.ts b/apps/cache-sidecar/src/server.ts new file mode 100644 index 000000000..78d7d81f4 --- /dev/null +++ b/apps/cache-sidecar/src/server.ts @@ -0,0 +1,153 @@ +import { Hono } from 'hono' +import type { Config } from './config.js' +import type { Redis } from './redis.js' +import type { PricingStrategy } from './pricing.js' +import { + getCheckpoint, + pendingSetKey, + sumPendingCosts, +} from './redis.js' +import type { PendingEvent } from './redis.js' + +export interface ServerDeps { + redis: Redis + config: Config + pricing: PricingStrategy +} + +export function createApp(deps: ServerDeps): Hono { + const { redis, config, pricing } = deps + const app = new Hono() + + // Health check + app.get('/v1/health', async (c) => { + try { + await redis.ping() + let lastCheckpointAgeMs: number | null = null + const pattern = `${config.CHECKPOINT_PREFIX}net_balance:*` + let cursor = '0' + let foundKey: string | null = null + do { + const [nextCursor, keys] = await redis.scan(cursor, 'MATCH', pattern, 'COUNT', 100) + cursor = nextCursor + if (keys.length > 0) { + foundKey = keys[0]! + break + } + } while (cursor !== '0') + + if (foundKey) { + const raw = await redis.get(foundKey) + if (raw) { + const data = JSON.parse(raw) + const syncedAtMs = Number(data._synced_at) * 1000 + lastCheckpointAgeMs = Date.now() - syncedAtMs + } + } + return c.json({ + ok: true, + redis: 'connected', + last_checkpoint_age_ms: lastCheckpointAgeMs, + }) + } catch { + return c.json({ ok: false, redis: 'disconnected', last_checkpoint_age_ms: null }, 503) + } + }) + + // Get balance (inline reconciliation: prune stale events, compute fresh) + app.get('/v1/balance/:customer_id', async (c) => { + const customerId = c.req.param('customer_id') + const checkpoint = await getCheckpoint(redis, config, customerId) + + if (!checkpoint) { + return c.json( + { error: 'No checkpoint available. Start the sync engine.' }, + 503 + ) + } + + const syncedAtMs = checkpoint._synced_at * 1000 + const cutoff = syncedAtMs - config.WATERMARK_BUFFER_MS + + // Prune events older than cutoff (inline reconciliation) + const key = pendingSetKey(config, customerId) + await redis.zremrangebyscore(key, '-inf', cutoff) + + // Compute balance from remaining pending events + const { total, count } = await sumPendingCosts(redis, config, customerId) + const optimisticBalance = checkpoint.balance - total + + const ageMs = Date.now() - syncedAtMs + let confidence: 'high' | 'medium' | 'low' + if (ageMs < 30_000) { + confidence = 'high' + } else if (ageMs < 120_000) { + confidence = 'medium' + } else { + confidence = 'low' + } + + return c.json({ + checkpoint_balance: checkpoint.balance, + optimistic_balance: optimisticBalance, + pending_events: count, + last_checkpoint_at: Math.floor(syncedAtMs / 1000), + confidence, + }) + }) + + // Record event + app.post('/v1/events', async (c) => { + const body = await c.req.json() + const { customer_id, event_type, properties, idempotency_key } = body + + if (!customer_id) { + return c.json({ error: 'customer_id is required' }, 400) + } + if (!event_type) { + return c.json({ error: 'event_type is required' }, 400) + } + + const eventId = + idempotency_key || `evt_${Date.now()}_${Math.random().toString(36).slice(2, 10)}` + const estimatedCost = pricing.estimateCost(event_type, properties) + const timestamp = Date.now() + + const key = pendingSetKey(config, customer_id) + // Member is deterministic by event_id — excludes timestamp so retries with same + // idempotency_key don't create duplicates regardless of when they arrive + const member = JSON.stringify({ + event_id: eventId, + event_type, + estimated_cost: estimatedCost, + properties, + }) + + const added = await redis.zadd(key, 'NX', timestamp, member) + if (added === 0) { + const checkpoint = await getCheckpoint(redis, config, customer_id) + const { total, count } = await sumPendingCosts(redis, config, customer_id) + return c.json({ + event_id: eventId, + estimated_cost: estimatedCost, + optimistic_balance: (checkpoint?.balance ?? 0) - total, + pending_events: count, + duplicate: true, + }) + } + + const checkpoint = await getCheckpoint(redis, config, customer_id) + const checkpointBalance = checkpoint?.balance ?? 0 + const { total, count } = await sumPendingCosts(redis, config, customer_id) + const optimisticBalance = checkpointBalance - total + + return c.json({ + event_id: eventId, + estimated_cost: estimatedCost, + optimistic_balance: optimisticBalance, + pending_events: count, + }) + }) + + return app +} diff --git a/apps/cache-sidecar/tsconfig.json b/apps/cache-sidecar/tsconfig.json new file mode 100644 index 000000000..5edad813e --- /dev/null +++ b/apps/cache-sidecar/tsconfig.json @@ -0,0 +1,9 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "outDir": "dist", + "rootDir": "src" + }, + "include": ["src"], + "exclude": ["src/**/*.test.ts"] +} diff --git a/apps/cache-sidecar/vitest.config.ts b/apps/cache-sidecar/vitest.config.ts new file mode 100644 index 000000000..84e97a269 --- /dev/null +++ b/apps/cache-sidecar/vitest.config.ts @@ -0,0 +1,8 @@ +import { defineConfig } from 'vitest/config' + +export default defineConfig({ + test: { + testTimeout: 30_000, + hookTimeout: 15_000, + }, +}) diff --git a/apps/service/package.json b/apps/service/package.json index 5b8e571aa..0c359c7f9 100644 --- a/apps/service/package.json +++ b/apps/service/package.json @@ -32,12 +32,14 @@ "@scalar/hono-api-reference": "^0.6", "@stripe/sync-destination-google-sheets": "workspace:*", "@stripe/sync-destination-postgres": "workspace:*", + "@stripe/sync-destination-redis": "workspace:*", "@stripe/sync-destination-sqlite": "workspace:*", "@stripe/sync-destination-stripe": "workspace:*", "@stripe/sync-engine": "workspace:*", "@stripe/sync-hono-zod-openapi": "workspace:*", "@stripe/sync-logger": "workspace:*", "@stripe/sync-protocol": "workspace:*", + "@stripe/sync-source-metronome": "workspace:*", "@stripe/sync-source-postgres": "workspace:*", "@stripe/sync-source-stripe": "workspace:*", "@stripe/sync-ts-cli": "workspace:*", diff --git a/apps/service/src/cli.ts b/apps/service/src/cli.ts index a000e7204..cd27d7ac2 100644 --- a/apps/service/src/cli.ts +++ b/apps/service/src/cli.ts @@ -8,8 +8,10 @@ import { createPrettyFormatter } from './cli/pretty-output.js' import { serve } from '@hono/node-server' import { createConnectorResolver, startApiServer, type ApiServerHandle } from '@stripe/sync-engine' import sourceStripe from '@stripe/sync-source-stripe' +import sourceMetronome from '@stripe/sync-source-metronome' import sourcePostgres from '@stripe/sync-source-postgres' import destinationPostgres from '@stripe/sync-destination-postgres' +import destinationRedis from '@stripe/sync-destination-redis' import destinationSqlite from '@stripe/sync-destination-sqlite' import destinationGoogleSheets from '@stripe/sync-destination-google-sheets' import destinationStripe from '@stripe/sync-destination-stripe' @@ -29,9 +31,10 @@ import { log } from './logger.js' const defaultDataDir = process.env.DATA_DIR ?? `${homedir()}/.stripe-sync` const resolverPromise = createConnectorResolver({ - sources: { stripe: sourceStripe, postgres: sourcePostgres }, + sources: { stripe: sourceStripe, metronome: sourceMetronome, postgres: sourcePostgres }, destinations: { postgres: destinationPostgres, + redis: destinationRedis, sqlite: destinationSqlite, google_sheets: destinationGoogleSheets, stripe: destinationStripe, diff --git a/docs/plans/2026-05-11-cache-sidecar-architecture.md b/docs/plans/2026-05-11-cache-sidecar-architecture.md new file mode 100644 index 000000000..99d9d46a9 --- /dev/null +++ b/docs/plans/2026-05-11-cache-sidecar-architecture.md @@ -0,0 +1,274 @@ +# Cache Sidecar Architecture + +**Status:** Prototype (working MVP) +**Date:** 2026-05-11 +**Context:** Standalone sidecar for real-time UBB entitlement enforcement with optimistic writes + +--- + +## TL;DR + +- Standalone Node.js HTTP process (`apps/cache-sidecar`) sits between apps and Redis, providing sub-100ms entitlement checks by merging sync engine checkpoints with locally tracked usage events. +- Two Redis namespaces with strict ownership: `sync:*` (sync engine writes, sidecar reads) and `optimistic:*` (sidecar writes, apps never touch). +- Ships Metronome UBB-specific. Not general-purpose until a second use case proves the abstraction. + +--- + +## Problem + +Apps need real-time "can this customer do X?" answers for usage-based billing. The sync engine pipeline (source-metronome -> destination-redis) gives periodic balance snapshots, but: + +1. **Staleness gap.** Checkpoints arrive every 30-60s. Customers can blow past limits between updates. +2. **No write path.** Apps emit usage events that affect balance immediately. Checkpoints don't reflect pending work. +3. **Complexity leaks.** If every app reads Redis and does its own optimistic math, you get N inconsistent implementations with different failure modes. + +--- + +## Solution + +A speculative balance cache with bounded staleness. The sidecar reads immutable checkpoints from the sync pipeline, accepts usage events from apps via HTTP, and computes a merged balance accounting for both. Reconciliation happens inline on each balance read — prunes events older than the checkpoint's processing window, recomputes the optimistic delta. + +``` + +------------------+ + | Metronome API | + +--------+---------+ + | + (sync engine pipeline) + | + v + +--------+---------+ + | destination-redis| + | sync:* ns | + +--------+---------+ + | + (sidecar reads on demand) + | + v ++----------+ +---------+----------+ +----------+ +| App A |--HTTP----->| cache-sidecar |<--HTTP----| App B | ++----------+ | | +----------+ + | optimistic:* ns | + | (own Redis writes) | + +--------------------+ +``` + +Apps never touch Redis or Metronome directly. The sidecar is the single entitlement authority. + +--- + +## Redis Schema (as implemented) + +### `sync:*` namespace (owned by sync engine, read-only to sidecar) + +| Key Pattern | Type | Value | +|-------------|------|-------| +| `sync:net_balance:{customer_id}:{credit_type_id}` | String (JSON) | `{ balance, customer_id, credit_type_id, _synced_at }` | + +`_synced_at` is a unix timestamp (seconds) added by source-metronome when it polls getNetBalance. + +### `optimistic:*` namespace (owned by sidecar, never touched by sync engine) + +| Key Pattern | Type | Members | +|-------------|------|---------| +| `optimistic:pending:{customer_id}` | Sorted Set | score = timestamp (ms), member = JSON `{ event_id, event_type, estimated_cost, properties }` | + +Design notes: +- Sorted sets give O(log N) range deletion during reconciliation. +- Member is deterministic by event_id — retries with same idempotency_key don't create duplicates. +- The sidecar NEVER writes to `sync:*`. The sync engine NEVER writes to `optimistic:*`. + +--- + +## API Surface (as implemented) + +| Method | Path | Purpose | +|--------|------|---------| +| `GET` | `/v1/health` | Liveness + last checkpoint age | +| `GET` | `/v1/balance/:customer_id` | Speculative balance (inline reconciliation) | +| `POST` | `/v1/events` | Record pending usage event | + +### `GET /v1/health` + +```json +{ "ok": true, "redis": "connected", "last_checkpoint_age_ms": 12345 } +``` + +### `GET /v1/balance/:customer_id` + +Performs inline reconciliation (prunes stale events), then returns merged balance. + +```json +{ + "checkpoint_balance": 498.75, + "optimistic_balance": 497.73, + "pending_events": 3, + "last_checkpoint_at": 1715000000, + "confidence": "high" +} +``` + +Confidence: `high` < 30s stale, `medium` < 120s, `low` otherwise. + +### `POST /v1/events` + +Request: +```json +{ "customer_id": "cus_X", "event_type": "api_call", "properties": { "model": "gpt-4" }, "idempotency_key": "evt_Z" } +``` + +Response: +```json +{ "event_id": "evt_Z", "estimated_cost": 1, "optimistic_balance": 496.73, "pending_events": 4 } +``` + +Deduplication via `idempotency_key` — if already seen, returns `"duplicate": true` with current balance. + +--- + +## Reconciliation (as implemented) + +Reconciliation happens **inline on every GET /balance request**, not via a background loop. + +Logic: +1. Read checkpoint for customer (from `sync:net_balance:{customer_id}:{credit_type_id}`) +2. Compute cutoff: `checkpoint._synced_at * 1000 - WATERMARK_BUFFER_MS` +3. `ZREMRANGEBYSCORE` to prune all pending events with timestamp <= cutoff +4. Sum remaining pending costs, subtract from checkpoint balance + +`WATERMARK_BUFFER_MS` (default 10s) is a safety buffer accounting for Metronome processing lag — events within this window are kept even if they're older than the checkpoint, because we can't be sure Metronome has processed them yet. + +### Known limitation + +The current approach is time-based pruning with a fixed buffer. There is no way to know definitively which events are reflected in a given checkpoint value. See Reconciliation Strategies below for the full analysis. + +--- + +## Reconciliation Strategies + +**Core framing:** For paying customers, denying valid usage is WORSE than allowing brief overage. Overage is billable and self-corrects on the next checkpoint. Denial loses revenue and erodes trust. Exception: free-tier and fraud-prevention contexts need strict enforcement (never permissive). + +**Background:** Event processing has variable lag and out-of-order delivery is real. External apps may send events the sidecar never sees. + +### Comparison + +| Strategy | Overage Risk (brief permissive) | False Denial Risk | Deploy Time | Requires Metronome? | +|----------|--------------------------------|-------------------|-------------|---------------------| +| Time-based pruning + TTL floor | Low, bounded by buffer+TTL tuning | Low in lenient mode; near-zero in strict | Days | No | +| Balance-delta pruning | Medium, on refunds/credits/external events | Medium, same non-monotonic scenarios | Days | No | +| Event confirmation | None | None | Weeks | No (uses existing API) | +| Watermark-based | None (strict mode) | None | Blocked | Yes (not shipped) | + +### Time-based pruning with TTL floor + +Merged strategy. Prune an event only when BOTH conditions hold: `event_timestamp < checkpoint._synced_at - buffer` AND `event_timestamp < now() - TTL`. The checkpoint is the primary signal -- it reflects Metronome's confirmed state, so we prune based on what Metronome has actually processed. The TTL is a safety net -- it bounds accumulation when the sync engine is down or checkpoints stop arriving. + +- **Overage risk:** Low. Brief permissive window possible if buffer is too short relative to partition lag. Tunable per-customer. +- **False denial risk:** Low in lenient mode (short buffer, moderate TTL). Near-zero in strict mode (long TTL, conservative buffer) at the cost of pessimistic drift. +- **Right choice:** Default for paying customers (lenient). Also works for free-tier/fraud with conservative tuning (long TTL, short buffer, strict mode). +- **Deploy:** Days. No external dependencies. Current implementation uses the checkpoint-relative prune only (no TTL floor yet). + +### Balance-delta pruning + +Compare consecutive checkpoint values. The delta implies consumed credits. Remove oldest pending events summing to that delta. + +- **Overage risk:** Medium. Breaks on refunds, credit grants, plan changes, or external event sources -- any non-monotonic balance movement. +- **False denial risk:** Medium. Same non-monotonic scenarios can cause under-pruning or over-pruning in either direction. +- **Right choice:** Single-source monotonic meters with no manual adjustments. Narrow use case. +- **Deploy:** Days. No external dependencies. + +### Event confirmation (usage query) + +Query Metronome's usage API to confirm specific events appear in aggregated usage before pruning. Deterministic: only prune what you can prove is processed. + +- **Overage risk:** None. +- **False denial risk:** None. +- **Right choice:** High-value VIP customers where even brief pessimistic drift is unacceptable and API cost is justified. +- **Deploy:** Weeks. Requires usage API integration + rate limit management. No Metronome changes. + +### Watermark-based + +Metronome exposes `watermark_low` / `watermark_high` in balance responses. Prune events <= `watermark_low` (confirmed processed). Events between low and high are ambiguous. + +- **Overage risk:** None in strict mode. Tiny in lenient mode (ambiguous-zone events). +- **False denial risk:** None. +- **Right choice:** Long-term production steady-state. Precise, minimal drift, no polling overhead. +- **Deploy:** Blocked. Watermark metadata in balance responses is pending vendor support. Once available, days to integrate. + +--- + +## Pricing Strategy (as implemented) + +Current MVP uses `FIXED_EVENT_COST` (default: 1). This is known to be inaccurate — real unit cost observed via Preview Events API is 0.01 (sub-cent per api_call). + +See the main EP doc for the full pricing strategy comparison. + +--- + +## Configuration + +| Env Var | Default | Purpose | +|---------|---------|---------| +| `REDIS_URL` | `redis://localhost:56379` | Redis connection | +| `CHECKPOINT_PREFIX` | `sync:` | Key prefix for sync engine checkpoints | +| `OPTIMISTIC_PREFIX` | `optimistic:` | Key prefix for sidecar state | +| `CREDIT_TYPE_ID` | (required) | Metronome credit type to track | +| `PORT` | `4100` | HTTP server port | +| `WATERMARK_BUFFER_MS` | `10000` | Safety buffer for time-based pruning (ms) | +| `FIXED_EVENT_COST` | `1` | Cost deducted per event (known inaccurate, see pricing strategies) | + +--- + +## Data Flow + +End-to-end sequence (as implemented): + +``` +1. App emits event to Metronome (normal ingest path, unchanged) +2. App calls sidecar: POST /v1/events { customer_id, event_type, idempotency_key } +3. Sidecar estimates cost (fixed cost strategy) +4. Sidecar: ZADD optimistic:pending:{customer_id} +5. Sidecar reads checkpoint + sums pending, returns optimistic balance +6. App calls: GET /v1/balance/{customer_id} -> inline reconciliation + fresh balance + + ... 30-60s pass ... + +7. Sync engine polls Metronome getNetBalance API +8. destination-redis writes sync:net_balance:{cid}:{credit_type_id} +9. Next GET /balance reads fresh checkpoint, prunes stale events, returns reconciled balance +``` + +--- + +## What the prototype proves + +- Optimistic balance converges to checkpoint value once pending events are reconciled +- Deduplication works via Redis sorted set (ZADD NX by event_id) +- Sync engine pipeline runs e2e with real Metronome data +- Sub-50ms response times on balance reads and event writes + +--- + +## Gaps + +1. **Pricing accuracy.** Fixed cost is 100x off from reality (1 vs 0.01). Need rate card sync or Preview API integration. +2. **Reconciliation accuracy.** Time-based buffer is a guess. No way to know which events a checkpoint reflects. +3. **No TTL floor.** If sync engine stops, pending events accumulate forever. +4. **Single credit type.** Config takes one CREDIT_TYPE_ID. Multi-credit-type customers need multiple instances or config changes. + +--- + +## Open Questions + +1. **Preview API latency budget.** Preview Events API exists but throughput constraints are TBD. If p99 > 200ms, do we block the POST /events caller or return immediately with cached/estimated price and reconcile later? +2. **Watermark availability.** Watermark metadata in balance responses is pending vendor support. If delayed, we stay on time-based pruning. +3. **Tier boundary pricing.** If a customer is near a tier edge, the price-per-unit changes. Where does tier config live? +4. **Multi-instance coordination.** If we run N sidecar instances, is Redis the coordination layer (idempotent writes by event_id), or do we need leader election? + +--- + +## Non-Goals + +- **Not a general-purpose optimistic cache framework.** Metronome UBB only. Extract generic patterns after second use case. +- **Not inside the sync engine protocol.** Sync engine is unidirectional (source -> destination). The sidecar is a separate process reading sync output. +- **Not a replacement for Metronome.** Sidecar provides speculative answers between checkpoints. Metronome remains source of truth. +- **Not handling billing or invoicing.** We report balances and enforce limits. We don't compute invoices. diff --git a/examples/pixel-app/server.js b/examples/pixel-app/server.js index 5f01736be..c61fdf7d0 100644 --- a/examples/pixel-app/server.js +++ b/examples/pixel-app/server.js @@ -1,23 +1,18 @@ /** - * PixelDraw — Metronome + Redis entitlement demo. - * - * Each pixel drawn sends a usage event to Metronome (color = event type). - * Credit balance is checked in Redis — synced from Metronome via sync-engine. - * NO local state in Redis. The only data is replicated from Metronome. + * PixelDraw — Metronome + Cache Sidecar entitlement demo. * * Architecture: - * Browser → POST /api/draw → check Metronome-synced Redis balance → send usage to Metronome - * Metronome → webhook → source-metronome → destination-redis (keeps Redis fresh) + * Browser → POST /api/draw → check balance via cache-sidecar → sidecar records event + forwards to Metronome + * Sync Engine (separate process) keeps Redis fresh via source-metronome → destination-redis pipeline + * Cache Sidecar manages optimistic balance enforcement over the synced data * * Env vars: - * METRONOME_API_TOKEN — Metronome bearer token * METRONOME_CUSTOMER_ID — Customer ID in Metronome - * REDIS_URL — Redis connection (default: redis://localhost:56379) + * SIDECAR_URL — Cache sidecar URL (default: http://localhost:4100) * PORT — Server port (default: 4000) */ import express from 'express' -import { Redis } from 'ioredis' import { fileURLToPath } from 'node:url' import { dirname, join } from 'node:path' @@ -28,113 +23,69 @@ app.use(express.json()) app.use(express.static(join(__dirname, 'public'))) const PORT = process.env.PORT || 4000 -const METRONOME_API_TOKEN = process.env.METRONOME_API_TOKEN const METRONOME_CUSTOMER_ID = process.env.METRONOME_CUSTOMER_ID -const METRONOME_BASE_URL = process.env.METRONOME_BASE_URL || 'https://api.metronome.com' -const REDIS_URL = process.env.REDIS_URL || 'redis://localhost:56379' -const KEY_PREFIX = process.env.KEY_PREFIX || 'sync:' +const SIDECAR_URL = process.env.SIDECAR_URL || 'http://localhost:4100' -if (!METRONOME_API_TOKEN) { - console.error('ERROR: Set METRONOME_API_TOKEN') - process.exit(1) -} if (!METRONOME_CUSTOMER_ID) { console.error('ERROR: Set METRONOME_CUSTOMER_ID') process.exit(1) } -const redis = new Redis(REDIS_URL) - -// ---- Redis reads (Metronome-synced data only) ---- - -/** Get credit balance from Metronome-synced grant data in Redis */ -async function getCreditBalance() { - const keys = await scanKeys(`${KEY_PREFIX}credit_grants:*`) - let balance = 0 - - for (const key of keys) { - const raw = await redis.get(key) - if (!raw) continue - const grant = JSON.parse(raw) - if (grant.customer_id !== METRONOME_CUSTOMER_ID) continue - balance += grant.balance?.including_pending ?? 0 - } - - return balance -} +// ---- Sidecar calls ---- -/** Get entitlement for a specific product from Redis */ -async function getEntitlement(productName) { - const keys = await scanKeys(`${KEY_PREFIX}entitlements:${METRONOME_CUSTOMER_ID}:*`) - for (const key of keys) { - const raw = await redis.get(key) - if (!raw) continue - const ent = JSON.parse(raw) - if (ent.product_name === productName) { - return ent - } +async function getBalance() { + const res = await fetch(`${SIDECAR_URL}/v1/balance/${METRONOME_CUSTOMER_ID}`) + if (!res.ok) { + if (res.status === 503) return null + const text = await res.text() + throw new Error(`Sidecar balance failed: ${res.status} ${text}`) } - return null + return await res.json() } -/** Scan Redis keys matching a pattern */ -async function scanKeys(pattern) { - const keys = [] - let cursor = '0' - do { - const [next, batch] = await redis.scan(cursor, 'MATCH', pattern, 'COUNT', 100) - cursor = next - keys.push(...batch) - } while (cursor !== '0') - return keys -} - -// ---- Metronome usage ingestion ---- - -async function ingestUsage(color) { - const res = await fetch(`${METRONOME_BASE_URL}/v1/ingest`, { +async function recordEvent(color) { + const res = await fetch(`${SIDECAR_URL}/v1/events`, { method: 'POST', - headers: { - Authorization: `Bearer ${METRONOME_API_TOKEN}`, - 'Content-Type': 'application/json', - }, - body: JSON.stringify([ - { - customer_id: METRONOME_CUSTOMER_ID, - event_type: 'pixel_draw', - timestamp: new Date().toISOString(), - transaction_id: `px_${Date.now()}_${Math.random().toString(36).slice(2, 8)}`, - properties: { color }, - }, - ]), + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ + customer_id: METRONOME_CUSTOMER_ID, + event_type: 'pixel_draw', + properties: { color }, + }), }) - if (!res.ok) { const text = await res.text() - throw new Error(`Metronome ingest failed: ${res.status} ${text}`) + throw new Error(`Sidecar event failed: ${res.status} ${text}`) } + return await res.json() } // ---- API routes ---- -/** Health check */ +/** Health check — proxies to sidecar */ app.get('/api/health', async (_req, res) => { try { - await redis.ping() - res.json({ ok: true, redis: 'connected' }) + const sidecarRes = await fetch(`${SIDECAR_URL}/v1/health`) + const data = await sidecarRes.json() + res.json({ ok: data.ok, sidecar: 'connected', redis: data.redis }) } catch { - res.status(503).json({ ok: false, redis: 'disconnected' }) + res.status(503).json({ ok: false, sidecar: 'disconnected' }) } }) -/** Get current credit balance + entitlements from Metronome-synced Redis */ +/** Get current credit balance via sidecar */ app.get('/api/credits', async (_req, res) => { - const balance = await getCreditBalance() - const entitlement = await getEntitlement('API Access') + const data = await getBalance() + if (data === null) { + return res.status(503).json({ + error: 'Balance not available. Start the sync engine to populate Redis.', + }) + } res.json({ - balance, - entitled: entitlement?.entitled ?? false, - product: entitlement?.product_name ?? null, + balance: data.optimistic_balance, + checkpoint_balance: data.checkpoint_balance, + pending_events: data.pending_events, + confidence: data.confidence, }) }) @@ -145,9 +96,15 @@ app.post('/api/draw', async (req, res) => { return res.status(400).json({ error: 'color, x, y required' }) } - // 1. Check Metronome-synced credit balance in Redis - const balance = await getCreditBalance() - if (balance <= 0) { + // 1. Check optimistic balance via sidecar + const balanceData = await getBalance() + if (balanceData === null) { + return res.status(503).json({ + allowed: false, + error: 'Balance not available. Start the sync engine to populate Redis.', + }) + } + if (balanceData.optimistic_balance <= 0) { return res.status(402).json({ allowed: false, error: 'Out of credits', @@ -155,18 +112,28 @@ app.post('/api/draw', async (req, res) => { }) } - // 2. Send usage event to Metronome (async, don't block response) - ingestUsage(color).catch((err) => { - console.error('Usage ingest error:', err.message) - }) - - res.json({ - allowed: true, - balance, - color, - x, - y, - }) + // 2. Record event via sidecar (handles Metronome forwarding internally) + try { + const eventResult = await recordEvent(color) + res.json({ + allowed: true, + balance: eventResult.optimistic_balance, + pending_events: eventResult.pending_events, + color, + x, + y, + }) + } catch (err) { + console.error('Event recording error:', err.message) + // Still allow the draw since balance was positive + res.json({ + allowed: true, + balance: balanceData.optimistic_balance, + color, + x, + y, + }) + } }) // ---- Start ---- @@ -176,9 +143,7 @@ app.listen(PORT, () => { +==================================================+ | PixelDraw — http://localhost:${PORT} | | Metronome customer: ${METRONOME_CUSTOMER_ID.slice(0, 20)}... | -| Redis: ${REDIS_URL.padEnd(42)}| -| Balance: Metronome-synced only (no local state) | -| Usage sent to Metronome (async) | +| Sidecar: ${SIDECAR_URL.padEnd(39)}| +==================================================+ `) }) diff --git a/packages/source-metronome/src/client.ts b/packages/source-metronome/src/client.ts index e12a9f9d5..2da86b14d 100644 --- a/packages/source-metronome/src/client.ts +++ b/packages/source-metronome/src/client.ts @@ -99,7 +99,8 @@ export class MetronomeClient { method: 'GET' | 'POST', path: string, body?: Record, - startCursor?: string | null + startCursor?: string | null, + options?: { skipLimit?: boolean } ): AsyncGenerator> { let nextPage: string | undefined = startCursor ?? undefined @@ -113,8 +114,8 @@ export class MetronomeClient { } else { const reqBody: Record = { ...(body ?? {}), - limit: PAGE_SIZE, } + if (!options?.skipLimit) reqBody['limit'] = PAGE_SIZE if (nextPage) reqBody['next_page'] = nextPage page = await this.post>(path, reqBody) } diff --git a/packages/source-metronome/src/index.ts b/packages/source-metronome/src/index.ts index 5de8794a2..09683be27 100644 --- a/packages/source-metronome/src/index.ts +++ b/packages/source-metronome/src/index.ts @@ -34,13 +34,8 @@ function buildCatalog(): CatalogPayload { } } -/** Event types that affect credit balances or entitlements */ -const ENTITLEMENT_EVENT_TYPES = new Set([ - 'contract.create', - 'contract.start', - 'contract.edit', - 'contract.end', - 'contract.archive', +/** Event types that affect credit/commit balances */ +const BALANCE_EVENT_TYPES = new Set([ 'commit.create', 'commit.edit', 'commit.segment.start', @@ -51,10 +46,20 @@ const ENTITLEMENT_EVENT_TYPES = new Set([ 'credit.segment.end', ]) +/** Event types that affect contract entitlements */ +const ENTITLEMENT_EVENT_TYPES = new Set([ + 'contract.create', + 'contract.start', + 'contract.edit', + 'contract.end', + 'contract.archive', +]) + +/** All webhook event types we handle */ +const WEBHOOK_EVENT_TYPES = new Set([...BALANCE_EVENT_TYPES, ...ENTITLEMENT_EVENT_TYPES]) + /** * On a webhook event, re-fetch affected data from Metronome and yield updated records. - * For credit events: re-fetch credit grants for the customer. - * For contract events: re-fetch entitlements (rate schedule) for the customer's contracts. */ async function* processWebhookEvent( event: MetronomeWebhookEvent, @@ -74,23 +79,21 @@ async function* processWebhookEvent( const now = Math.floor(Date.now() / 1000) - // Re-fetch credit grants for this customer - if (configuredStreamNames.has('credit_grants')) { - for await (const page of client.paginate('POST', '/v1/credits/listGrants', { - customer_ids: [customerId], - })) { - for (const grant of page.data) { - yield msg.record({ - stream: 'credit_grants', - data: { ...(grant as Record), _synced_at: now }, - emitted_at: new Date().toISOString(), - }) - } - } + // Re-fetch net balance only for balance-affecting events + if (configuredStreamNames.has('net_balance') && BALANCE_EVENT_TYPES.has(event.type)) { + const result = await client.post<{ data: Record }>( + '/v1/contracts/customerBalances/getNetBalance', + { customer_id: customerId } + ) + yield msg.record({ + stream: 'net_balance', + data: { ...result.data, customer_id: customerId, _synced_at: now }, + emitted_at: new Date().toISOString(), + }) } - // Re-fetch entitlements (rate schedules) for this customer's contracts - if (configuredStreamNames.has('entitlements')) { + // Re-fetch entitlements only for contract-affecting events + if (configuredStreamNames.has('entitlements') && ENTITLEMENT_EVENT_TYPES.has(event.type)) { const contractId = event.contract_id ?? (event.properties?.contract_id as string | undefined) const contractIds: string[] = [] @@ -261,24 +264,46 @@ const source: Source = { // Per-customer: iterate customers const custIds = await ensureCustomerIds() - outer: for (const customerId of custIds) { - for await (const page of client.paginate(resource.method, resource.endpoint, { - customer_id: customerId, - })) { - for (const record of page.data) { - const data = { - ...(record as Record), - _synced_at: Math.floor(Date.now() / 1000), + if (resource.singleObject) { + // Endpoint returns a single object, not a paginated list + outer_single: for (const customerId of custIds) { + const result = await client.post<{ data: Record }>( + resource.endpoint, + { customer_id: customerId } + ) + const data = { + ...result.data, + customer_id: customerId, + _synced_at: Math.floor(Date.now() / 1000), + } + yield msg.record({ + stream: streamName, + data, + emitted_at: new Date().toISOString(), + }) + recordCount++ + if (config.backfill_limit && recordCount >= config.backfill_limit) break outer_single + } + } else { + outer: for (const customerId of custIds) { + for await (const page of client.paginate(resource.method, resource.endpoint, { + customer_id: customerId, + }, undefined, { skipLimit: resource.skipLimit })) { + for (const record of page.data) { + const data = { + ...(record as Record), + _synced_at: Math.floor(Date.now() / 1000), + } + yield msg.record({ + stream: streamName, + data, + emitted_at: new Date().toISOString(), + }) + recordCount++ + if (config.backfill_limit && recordCount >= config.backfill_limit) break outer } - yield msg.record({ - stream: streamName, - data, - emitted_at: new Date().toISOString(), - }) - recordCount++ - if (config.backfill_limit && recordCount >= config.backfill_limit) break outer + if (config.backfill_limit && recordCount >= config.backfill_limit) break } - if (config.backfill_limit && recordCount >= config.backfill_limit) break } } } else { @@ -286,7 +311,8 @@ const source: Source = { resource.method, resource.endpoint, undefined, - startCursor + startCursor, + { skipLimit: resource.skipLimit } )) { for (const record of page.data) { const data = { @@ -341,8 +367,8 @@ const source: Source = { let waiter: ((item: QueueItem) => void) | null = null const server = startWebhookServer(config.webhook_port, config.webhook_secret, (input) => { - if (!ENTITLEMENT_EVENT_TYPES.has(input.event.type)) { - log.debug({ eventType: input.event.type }, 'metronome: ignoring non-entitlement event') + if (!WEBHOOK_EVENT_TYPES.has(input.event.type)) { + log.debug({ eventType: input.event.type }, 'metronome: ignoring unhandled event type') return } const { promise, resolve } = Promise.withResolvers() @@ -372,10 +398,10 @@ const source: Source = { try { yield* processWebhookEvent(item.event, client, configuredStreamNames) // Emit state checkpoint so destination flushes immediately - if (configuredStreamNames.has('credit_grants')) { + if (configuredStreamNames.has('net_balance')) { yield msg.source_state({ state_type: 'stream', - stream: 'credit_grants', + stream: 'net_balance', data: { next_page: null }, }) } diff --git a/packages/source-metronome/src/resources.ts b/packages/source-metronome/src/resources.ts index 03fcf4ce5..4ef022011 100644 --- a/packages/source-metronome/src/resources.ts +++ b/packages/source-metronome/src/resources.ts @@ -13,6 +13,10 @@ export interface ResourceDefinition { perCustomer?: boolean /** If true, requires iterating parent customers AND their contracts */ perContract?: boolean + /** If true, endpoint returns a single object (not a paginated list) */ + singleObject?: boolean + /** If true, do not send `limit` in pagination requests */ + skipLimit?: boolean } export const resources: ResourceDefinition[] = [ @@ -112,6 +116,7 @@ export const resources: ResourceDefinition[] = [ endpoint: '/v1/contract-pricing/rate-cards/list', method: 'POST', primaryKey: [['id']], + skipLimit: true, jsonSchema: { type: 'object', properties: { @@ -124,44 +129,18 @@ export const resources: ResourceDefinition[] = [ }, }, { - name: 'credit_grants', - endpoint: '/v1/credits/listGrants', + name: 'net_balance', + endpoint: '/v1/contracts/customerBalances/getNetBalance', method: 'POST', - primaryKey: [['id']], - jsonSchema: { - type: 'object', - properties: { - id: { type: 'string' }, - name: { type: 'string' }, - customer_id: { type: 'string' }, - reason: { type: ['string', 'null'] }, - effective_at: { type: 'string' }, - expires_at: { type: ['string', 'null'] }, - priority: { type: 'number' }, - credit_grant_type: { type: ['string', 'null'] }, - balance: { type: 'object' }, - custom_fields: { type: 'object' }, - _synced_at: { type: 'integer' }, - }, - }, - }, - { - name: 'invoices', - endpoint: '/v1/invoices', - method: 'GET', - primaryKey: [['id']], + primaryKey: [['customer_id'], ['credit_type_id']], + perCustomer: true, + singleObject: true, jsonSchema: { type: 'object', properties: { - id: { type: 'string' }, customer_id: { type: 'string' }, - status: { type: 'string' }, - total: { type: 'number' }, - credit_type: { type: 'object' }, - start_timestamp: { type: 'string' }, - end_timestamp: { type: 'string' }, - line_items: { type: 'array' }, - custom_fields: { type: 'object' }, + balance: { type: 'number' }, + credit_type_id: { type: 'string' }, _synced_at: { type: 'integer' }, }, }, diff --git a/packages/source-metronome/src/webhook.ts b/packages/source-metronome/src/webhook.ts index 78b9bd5fc..a46c4a5c1 100644 --- a/packages/source-metronome/src/webhook.ts +++ b/packages/source-metronome/src/webhook.ts @@ -38,7 +38,7 @@ export interface WebhookInput { verified: boolean } -export type WebhookPushFn = (input: WebhookInput) => void +export type WebhookPushFn = (input: WebhookInput) => void | Promise /** * Start an HTTP server that receives Metronome webhook events. @@ -57,7 +57,7 @@ export function startWebhookServer( const chunks: Buffer[] = [] req.on('data', (chunk: Buffer) => chunks.push(chunk)) - req.on('end', () => { + req.on('end', async () => { const body = Buffer.concat(chunks).toString('utf8') try { @@ -79,7 +79,10 @@ export function startWebhookServer( 'metronome: webhook received' ) - push({ event, raw_body: body, verified }) + const result = push({ event, raw_body: body, verified }) + if (result && typeof result.then === 'function') { + await result + } res.writeHead(200).end('{"received":true}') } catch (err) { const message = err instanceof Error ? err.message : String(err) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index d2955c56b..96bc57767 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -65,6 +65,31 @@ importers: specifier: ^7.1.11 version: 7.2.2(@types/node@25.5.0)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1) + apps/cache-sidecar: + dependencies: + '@hono/node-server': + specifier: ^1 + version: 1.19.11(hono@4.12.8) + hono: + specifier: ^4 + version: 4.12.8 + ioredis: + specifier: ^5 + version: 5.10.1 + zod: + specifier: ^4.3.6 + version: 4.3.6 + devDependencies: + '@types/node': + specifier: ^24.10.1 + version: 24.10.1 + tsx: + specifier: ^4 + version: 4.21.0 + vitest: + specifier: ^3.2.4 + version: 3.2.4(@types/node@24.10.1)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1) + apps/dashboard: dependencies: '@radix-ui/react-accordion': @@ -164,9 +189,15 @@ importers: '@stripe/sync-destination-postgres': specifier: workspace:* version: link:../../packages/destination-postgres + '@stripe/sync-destination-redis': + specifier: workspace:* + version: link:../../packages/destination-redis '@stripe/sync-destination-sqlite': specifier: workspace:* version: link:../../packages/destination-sqlite + '@stripe/sync-destination-stripe': + specifier: workspace:* + version: link:../../packages/destination-stripe '@stripe/sync-hono-zod-openapi': specifier: workspace:* version: link:../../packages/hono-zod-openapi @@ -176,6 +207,12 @@ importers: '@stripe/sync-protocol': specifier: workspace:* version: link:../../packages/protocol + '@stripe/sync-source-metronome': + specifier: workspace:* + version: link:../../packages/source-metronome + '@stripe/sync-source-postgres': + specifier: workspace:* + version: link:../../packages/source-postgres '@stripe/sync-source-stripe': specifier: workspace:* version: link:../../packages/source-stripe @@ -252,9 +289,15 @@ importers: '@stripe/sync-destination-postgres': specifier: workspace:* version: link:../../packages/destination-postgres + '@stripe/sync-destination-redis': + specifier: workspace:* + version: link:../../packages/destination-redis '@stripe/sync-destination-sqlite': specifier: workspace:* version: link:../../packages/destination-sqlite + '@stripe/sync-destination-stripe': + specifier: workspace:* + version: link:../../packages/destination-stripe '@stripe/sync-engine': specifier: workspace:* version: link:../engine @@ -267,6 +310,12 @@ importers: '@stripe/sync-protocol': specifier: workspace:* version: link:../../packages/protocol + '@stripe/sync-source-metronome': + specifier: workspace:* + version: link:../../packages/source-metronome + '@stripe/sync-source-postgres': + specifier: workspace:* + version: link:../../packages/source-postgres '@stripe/sync-source-stripe': specifier: workspace:* version: link:../../packages/source-stripe @@ -436,6 +485,9 @@ importers: '@stripe/sync-destination-postgres': specifier: workspace:* version: link:../packages/destination-postgres + '@stripe/sync-destination-stripe': + specifier: workspace:* + version: link:../packages/destination-stripe '@stripe/sync-engine': specifier: workspace:* version: link:../apps/engine @@ -448,6 +500,9 @@ importers: '@stripe/sync-service': specifier: workspace:* version: link:../apps/service + '@stripe/sync-source-postgres': + specifier: workspace:* + version: link:../packages/source-postgres '@stripe/sync-source-stripe': specifier: workspace:* version: link:../packages/source-stripe @@ -535,6 +590,28 @@ importers: specifier: ^3.2.4 version: 3.2.4(@types/node@25.5.0)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1) + packages/destination-redis: + dependencies: + '@stripe/sync-logger': + specifier: workspace:* + version: link:../logger + '@stripe/sync-protocol': + specifier: workspace:* + version: link:../protocol + ioredis: + specifier: ^5.6.1 + version: 5.10.1 + zod: + specifier: ^4.3.6 + version: 4.3.6 + devDependencies: + '@types/node': + specifier: ^24.5.0 + version: 24.10.1 + vitest: + specifier: ^3.2.4 + version: 3.2.4(@types/node@24.10.1)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1) + packages/destination-sqlite: dependencies: '@stripe/sync-logger': @@ -547,6 +624,28 @@ importers: specifier: ^4.3.6 version: 4.3.6 + packages/destination-stripe: + dependencies: + '@stripe/sync-logger': + specifier: workspace:* + version: link:../logger + '@stripe/sync-openapi': + specifier: workspace:* + version: link:../openapi + '@stripe/sync-protocol': + specifier: workspace:* + version: link:../protocol + zod: + specifier: ^4.3.6 + version: 4.3.6 + devDependencies: + '@types/node': + specifier: ^24.10.1 + version: 24.10.1 + vitest: + specifier: ^3.2.4 + version: 3.2.4(@types/node@24.10.1)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1) + packages/hono-zod-openapi: dependencies: '@hono/zod-validator': @@ -626,6 +725,50 @@ importers: specifier: ^3.2.1 version: 3.2.4(@types/node@24.10.1)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1) + packages/source-metronome: + dependencies: + '@stripe/sync-logger': + specifier: workspace:* + version: link:../logger + '@stripe/sync-protocol': + specifier: workspace:* + version: link:../protocol + zod: + specifier: ^4.3.6 + version: 4.3.6 + devDependencies: + '@types/node': + specifier: ^24.5.0 + version: 24.10.1 + vitest: + specifier: ^3.2.4 + version: 3.2.4(@types/node@24.10.1)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1) + + packages/source-postgres: + dependencies: + '@stripe/sync-logger': + specifier: workspace:* + version: link:../logger + '@stripe/sync-protocol': + specifier: workspace:* + version: link:../protocol + '@stripe/sync-util-postgres': + specifier: workspace:* + version: link:../util-postgres + pg: + specifier: ^8.16.3 + version: 8.16.3 + zod: + specifier: ^4.3.6 + version: 4.3.6 + devDependencies: + '@types/pg': + specifier: ^8.15.5 + version: 8.20.0 + vitest: + specifier: ^3.2.4 + version: 3.2.4(@types/node@25.5.0)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1) + packages/source-stripe: dependencies: '@stripe/sync-logger': @@ -1409,6 +1552,9 @@ packages: cpu: [x64] os: [win32] + '@ioredis/commands@1.5.1': + resolution: {integrity: sha512-JH8ZL/ywcJyR9MmJ5BNqZllXNZQqQbnVZOqpPQqE1vHiFgAw4NHbvE0FOduNU8IX9babitBT46571OnPTT0Zcw==} + '@isaacs/balanced-match@4.0.1': resolution: {integrity: sha512-yzMTt9lEb8Gv7zRioUilSglI0c0smZ9k5D65677DLWLtWJaXIS3CqcGyUFByYKlnUj6TkjLVs54fBl6+TiGQDQ==} engines: {node: 20 || >=22} @@ -3023,6 +3169,10 @@ packages: resolution: {integrity: sha512-eYm0QWBtUrBWZWG0d386OGAw16Z995PiOVo2B7bjWSbHedGl5e0ZWaq65kOGgUSNesEIDkB9ISbTg/JK9dhCZA==} engines: {node: '>=6'} + cluster-key-slot@1.1.2: + resolution: {integrity: sha512-RMr0FhtfXemyinomL4hrWcYJxmX6deFdCxpJzhDttxgO1+bcCnkk+9drydLVDmAMG7NE6aN/fl4F7ucU/90gAA==} + engines: {node: '>=0.10.0'} + code-excerpt@4.0.0: resolution: {integrity: sha512-xxodCmBen3iy2i0WtAK8FlFNrRzjUqjRsMfho58xT/wvZU1YTM3fCnRjcy1gJPMepaRlgm/0e6w8SpWHpn3/cA==} engines: {node: ^12.20.0 || ^14.13.1 || >=16.0.0} @@ -3134,6 +3284,10 @@ packages: resolution: {integrity: sha512-ZySD7Nf91aLB0RxL4KGrKHBXl7Eds1DAmEdcoVawXnLD7SDhpNgtuII2aAkg7a7QS41jxPSZ17p4VdGnMHk3MQ==} engines: {node: '>=0.4.0'} + denque@2.1.0: + resolution: {integrity: sha512-HVQE3AAb/pxF8fQAoiqpvg9i3evqug3hoiwakOyZAwJm+6vZehbkYXZ0l4JxS+I3QxM97v5aaRNhj8v5oBhekw==} + engines: {node: '>=0.10'} + detect-libc@2.1.2: resolution: {integrity: sha512-Btj2BOOO83o3WyH59e8MgXsxEQVcarkUOpEYrubB0urwnN10yQ364rsiByU11nZlqWYZm05i/of7io4mzihBtQ==} engines: {node: '>=8'} @@ -3600,6 +3754,10 @@ packages: react-devtools-core: optional: true + ioredis@5.10.1: + resolution: {integrity: sha512-HuEDBTI70aYdx1v6U97SbNx9F1+svQKBDo30o0b9fw055LMepzpOOd0Ccg9Q6tbqmBSJaMuY0fB7yw9/vjBYCA==} + engines: {node: '>=12.22.0'} + is-extglob@2.1.1: resolution: {integrity: sha512-SbKbANkN603Vi4jEZv49LeVJMn4yGwsbzZworEoyEiutsN3nJYdbO36zfhGJ6QEDpOZIFkDtnq5JRxmvl3jsoQ==} engines: {node: '>=0.10.0'} @@ -3801,6 +3959,12 @@ packages: lodash.camelcase@4.3.0: resolution: {integrity: sha512-TwuEnCnxbc3rAvhf/LbG7tJUDzhqXyFnv3dtzLOPgCG/hODL7WFnsbwktkD7yUV0RrreP/l1PALq/YSg6VvjlA==} + lodash.defaults@4.2.0: + resolution: {integrity: sha512-qjxPLHd3r5DnsdGacqOMU6pb/avJzdh9tFX2ymgoZE27BmjXrNy/y4LoaiTeAb+O3gL8AfpJGtqfX/ae2leYYQ==} + + lodash.isarguments@3.1.0: + resolution: {integrity: sha512-chi4NHZlZqZD18a0imDHnZPrDeBbTtVN7GXMwuGdRH9qotxAjYs3aVLKc7zNOG9eddR5Ksd8rvFEBc9SsggPpg==} + lodash.merge@4.6.2: resolution: {integrity: sha512-0KpjqXRVvrYyCsX1swR/XTK0va6VQkQM6MNo7PqW77ByjAhoARA8EfrP1N4+KlKj8YS0ZUCtRT/YUuhyYDujIQ==} @@ -4243,6 +4407,14 @@ packages: resolution: {integrity: sha512-57frrGM/OCTLqLOAh0mhVA9VBMHd+9U7Zb2THMGdBUoZVOtGbJzjxsYGDJ3A9AYYCP4hn6y1TVbaOfzWtm5GFg==} engines: {node: '>= 12.13.0'} + redis-errors@1.2.0: + resolution: {integrity: sha512-1qny3OExCf0UvUV/5wpYKf2YwPcOqXzkwKKSmKHiE6ZMQs5heeE/c8eXK+PNllPvmjgAbfnsbpkGZWy8cBpn9w==} + engines: {node: '>=4'} + + redis-parser@3.0.0: + resolution: {integrity: sha512-DJnGAeenTdpMEH6uAJRK/uiyEIH9WVsUmoLwzudwGJUwZPp80PDBWPHXSAGNPwNvIXAbe7MSUB1zQFugFml66A==} + engines: {node: '>=4'} + require-directory@2.1.1: resolution: {integrity: sha512-fGxEI7+wsG9xrvdjsrlmL22OMTTiHRwAMroiEeMgq8gzoLC/PQr7RsRDSTLUg/bZAZtF+TVIkHc6/4RIKrui+Q==} engines: {node: '>=0.10.0'} @@ -4398,6 +4570,9 @@ packages: stackback@0.0.2: resolution: {integrity: sha512-1XMJE5fQo1jGH6Y/7ebnwPOBEkIEnT4QF32d5R1+VXdXveM0IBMJt8zfaxX1P3QhVwrYe+576+jkANtSS2mBbw==} + standard-as-callback@2.1.0: + resolution: {integrity: sha512-qoRRSyROncaz1z0mvYqIE4lCd9p2R90i6GxW3uZv5ucSu8tU7B5HXUP1gG8pVZsYNVaXjk8ClXHPttLyxAL48A==} + std-env@3.9.0: resolution: {integrity: sha512-UGvjygr6F6tpH7o2qyqR6QYpwraIjKSdtzyBdyytFOHmPZY917kwdwLG0RbOjWOnKmnm3PeHjaoLLMie7kPLQw==} @@ -4679,6 +4854,7 @@ packages: uuid@9.0.1: resolution: {integrity: sha512-b+1eJOlsR9K8HJpow9Ok3fiWOWSIcIzXodvv0rQjVoOVNpWMpxf1wZNpt4y9h10odCNrqnYp1OBzRktckBe3sA==} + deprecated: uuid@10 and below is no longer supported. For ESM codebases, update to uuid@latest. For CommonJS codebases, use uuid@11 (but be aware this version will likely be deprecated in 2028). hasBin: true vite-node@3.2.4: @@ -5861,6 +6037,8 @@ snapshots: '@img/sharp-win32-x64@0.34.5': optional: true + '@ioredis/commands@1.5.1': {} + '@isaacs/balanced-match@4.0.1': {} '@isaacs/brace-expansion@5.0.0': @@ -7279,6 +7457,14 @@ snapshots: chai: 5.2.0 tinyrainbow: 2.0.0 + '@vitest/mocker@3.2.4(vite@7.2.2(@types/node@24.10.1)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1))': + dependencies: + '@vitest/spy': 3.2.4 + estree-walker: 3.0.3 + magic-string: 0.30.21 + optionalDependencies: + vite: 7.2.2(@types/node@24.10.1)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1) + '@vitest/mocker@3.2.4(vite@7.2.2(@types/node@25.5.0)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1))': dependencies: '@vitest/spy': 3.2.4 @@ -7611,6 +7797,8 @@ snapshots: clsx@2.1.1: {} + cluster-key-slot@1.1.2: {} + code-excerpt@4.0.0: dependencies: convert-to-spaces: 2.0.1 @@ -7698,6 +7886,8 @@ snapshots: delayed-stream@1.0.0: {} + denque@2.1.0: {} + detect-libc@2.1.2: {} detect-node-es@1.1.0: {} @@ -8231,6 +8421,20 @@ snapshots: - bufferutil - utf-8-validate + ioredis@5.10.1: + dependencies: + '@ioredis/commands': 1.5.1 + cluster-key-slot: 1.1.2 + debug: 4.4.3(supports-color@10.2.2) + denque: 2.1.0 + lodash.defaults: 4.2.0 + lodash.isarguments: 3.1.0 + redis-errors: 1.2.0 + redis-parser: 3.0.0 + standard-as-callback: 2.1.0 + transitivePeerDependencies: + - supports-color + is-extglob@2.1.1: {} is-fullwidth-code-point@3.0.0: {} @@ -8385,6 +8589,10 @@ snapshots: lodash.camelcase@4.3.0: {} + lodash.defaults@4.2.0: {} + + lodash.isarguments@3.1.0: {} + lodash.merge@4.6.2: {} long@5.3.2: {} @@ -8805,6 +9013,12 @@ snapshots: real-require@0.2.0: {} + redis-errors@1.2.0: {} + + redis-parser@3.0.0: + dependencies: + redis-errors: 1.2.0 + require-directory@2.1.1: {} require-from-string@2.0.2: {} @@ -9018,6 +9232,8 @@ snapshots: stackback@0.0.2: {} + standard-as-callback@2.1.0: {} + std-env@3.9.0: {} string-width@4.2.3: @@ -9341,7 +9557,7 @@ snapshots: dependencies: '@types/chai': 5.2.2 '@vitest/expect': 3.2.4 - '@vitest/mocker': 3.2.4(vite@7.2.2(@types/node@25.5.0)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1)) + '@vitest/mocker': 3.2.4(vite@7.2.2(@types/node@24.10.1)(jiti@2.6.1)(lightningcss@1.32.0)(terser@5.46.1)(tsx@4.21.0)(yaml@2.8.1)) '@vitest/pretty-format': 3.2.4 '@vitest/runner': 3.2.4 '@vitest/snapshot': 3.2.4 diff --git a/scripts/e2e-metronome-redis.sh b/scripts/e2e-metronome-redis.sh index b748d811a..2571fcbf3 100755 --- a/scripts/e2e-metronome-redis.sh +++ b/scripts/e2e-metronome-redis.sh @@ -2,7 +2,7 @@ # End-to-end test: Metronome → source-metronome → destination-redis # # Proves the full pipeline works with real data: -# 1. Backfill credit grants + entitlements to Redis +# 1. Backfill net_balance + entitlements to Redis # 2. Start webhook listener # 3. Simulate customer usage (send events to Metronome ingest API) # 4. Fire a webhook event → source re-fetches → Redis updates @@ -18,8 +18,8 @@ set -euo pipefail : "${METRONOME_API_TOKEN:?Set METRONOME_API_TOKEN}" -CUSTOMER_ID="1a6de34e-ec68-46b0-a1c3-bb3d49f66bb3" -GRANT_ID="30ec9faa-3c5d-4cea-9e2a-b44a4e4446bd" +CUSTOMER_ID="${CUSTOMER_ID:?Set CUSTOMER_ID env var}" +CREDIT_TYPE_ID="${CREDIT_TYPE_ID:?Set CREDIT_TYPE_ID env var}" REDIS_PORT=56379 WEBHOOK_PORT=4243 KEY_PREFIX="sync:" @@ -35,7 +35,7 @@ fi redis-cli -p "$REDIS_PORT" FLUSHDB >/dev/null -CATALOG='{"streams":[{"stream":{"name":"credit_grants","primary_key":[["id"]],"newer_than_field":"_synced_at","json_schema":{}},"sync_mode":"full_refresh","destination_sync_mode":"append_dedup"},{"stream":{"name":"entitlements","primary_key":[["customer_id"],["contract_id"],["product_id"]],"newer_than_field":"_synced_at","json_schema":{}},"sync_mode":"full_refresh","destination_sync_mode":"append_dedup"}]}' +CATALOG='{"streams":[{"stream":{"name":"net_balance","primary_key":[["customer_id"],["credit_type_id"]],"newer_than_field":"_synced_at","json_schema":{}},"sync_mode":"full_refresh","destination_sync_mode":"append_dedup"},{"stream":{"name":"entitlements","primary_key":[["customer_id"],["contract_id"],["product_id"]],"newer_than_field":"_synced_at","json_schema":{}},"sync_mode":"full_refresh","destination_sync_mode":"append_dedup"}]}' SOURCE_CONFIG="{\"api_key\": \"$METRONOME_API_TOKEN\", \"webhook_port\": $WEBHOOK_PORT}" DEST_CONFIG="{\"url\":\"redis://localhost:$REDIS_PORT\",\"key_prefix\":\"$KEY_PREFIX\",\"batch_size\":1}" @@ -54,10 +54,10 @@ echo "" # Step 2: Check initial state echo "Step 2: Initial Redis state after backfill:" -BALANCE_BEFORE=$(redis-cli -p "$REDIS_PORT" GET "${KEY_PREFIX}credit_grants:$GRANT_ID" | python3 -c "import sys,json; d=json.load(sys.stdin); print(d['balance']['including_pending'])") -SYNCED_BEFORE=$(redis-cli -p "$REDIS_PORT" GET "${KEY_PREFIX}credit_grants:$GRANT_ID" | python3 -c "import sys,json; print(json.load(sys.stdin)['_synced_at'])") -echo " Credit balance: $BALANCE_BEFORE" -echo " Synced at: $SYNCED_BEFORE" +BALANCE_BEFORE=$(redis-cli -p "$REDIS_PORT" GET "${KEY_PREFIX}net_balance:$CUSTOMER_ID:$CREDIT_TYPE_ID" | python3 -c "import sys,json; d=json.load(sys.stdin); print(d['balance'])") +SYNCED_BEFORE=$(redis-cli -p "$REDIS_PORT" GET "${KEY_PREFIX}net_balance:$CUSTOMER_ID:$CREDIT_TYPE_ID" | python3 -c "import sys,json; print(json.load(sys.stdin)['_synced_at'])") +echo " Net balance: $BALANCE_BEFORE" +echo " Synced at: $SYNCED_BEFORE" echo "" # Step 3: Simulate customer usage @@ -91,17 +91,17 @@ echo "" # Step 5: Verify Redis updated echo "Step 5: Redis state after webhook refresh:" -BALANCE_AFTER=$(redis-cli -p "$REDIS_PORT" GET "${KEY_PREFIX}credit_grants:$GRANT_ID" | python3 -c "import sys,json; d=json.load(sys.stdin); print(d['balance']['including_pending'])") -SYNCED_AFTER=$(redis-cli -p "$REDIS_PORT" GET "${KEY_PREFIX}credit_grants:$GRANT_ID" | python3 -c "import sys,json; print(json.load(sys.stdin)['_synced_at'])") -echo " Credit balance: $BALANCE_AFTER" -echo " Synced at: $SYNCED_AFTER" +BALANCE_AFTER=$(redis-cli -p "$REDIS_PORT" GET "${KEY_PREFIX}net_balance:$CUSTOMER_ID:$CREDIT_TYPE_ID" | python3 -c "import sys,json; d=json.load(sys.stdin); print(d['balance'])") +SYNCED_AFTER=$(redis-cli -p "$REDIS_PORT" GET "${KEY_PREFIX}net_balance:$CUSTOMER_ID:$CREDIT_TYPE_ID" | python3 -c "import sys,json; print(json.load(sys.stdin)['_synced_at'])") +echo " Net balance: $BALANCE_AFTER" +echo " Synced at: $SYNCED_AFTER" echo "" # Step 6: Verify timestamp changed (proves webhook triggered a re-fetch) if [ "$SYNCED_AFTER" -gt "$SYNCED_BEFORE" ]; then - echo "✓ SUCCESS: Redis was updated by webhook (synced_at $SYNCED_BEFORE → $SYNCED_AFTER)" + echo "SUCCESS: Redis was updated by webhook (synced_at $SYNCED_BEFORE → $SYNCED_AFTER)" else - echo "✗ FAIL: Redis was NOT updated by webhook" + echo "FAIL: Redis was NOT updated by webhook" exit 1 fi