Skip to content

feat(collector): implement customer upsert functionality in OpenMeter collector#39

Open
eliteprox wants to merge 1 commit into
feat/identity-webhookfrom
feat/openmeter-provision
Open

feat(collector): implement customer upsert functionality in OpenMeter collector#39
eliteprox wants to merge 1 commit into
feat/identity-webhookfrom
feat/openmeter-provision

Conversation

@eliteprox

@eliteprox eliteprox commented Jun 19, 2026

Copy link
Copy Markdown
Collaborator

Problem

The collector previously assumed customers already existed in Konnect/OpenMeter before events arrived. If a create_signed_ticket event reached the collector before the customer was provisioned (e.g. identity webhook race, cold start, or new deployment), ingest would fail silently with a "no customer found for event subject" error. There was also a gap where billable_usd_micros was never emitted to the billing meters.

Solution

Add a lightweight Go provision sidecar that runs co-located with the Benthos collector inside the same container. Before each event is ingested, the sidecar idempotently ensures the corresponding Konnect customer and subscription exist. If ingest still fails, the collector self-heals by ensuring again and retrying once.

Presumptions

  • If we are getting ticket events from the signer for a customer that doesn't exist, we should always create a customer record in openmeter if they don't exist, to avoid billing issues later
  • Customers created this way are assigned a default plan subscription

Changes

New: deploy/openmeter-collector/provision/ — Go provision sidecar

  • Exposes POST /ensure (and GET /health) on 127.0.0.1:8091
  • Accepts client_id + external_user_id (or auth_id in clientID:externalUserID colon-split form) from flexible key aliases in both top-level fields and a nested data map
  • Idempotently creates a Konnect customer (with matching subjectKey for usage attribution) and subscribes it to OPENMETER_DEFAULT_PLAN_KEY — resolves the latest active plan version automatically
  • Race-safe: on concurrent duplicate creates, falls back to lookup rather than propagating the conflict error
  • Uses Kong sdk-konnect-go with the same kpat_… PAT already required by the collector; the identity webhook no longer needs OpenMeter credentials

Updated: Dockerfile — multi-stage build

  • Stage 1: builds the Go sidecar (CGO_ENABLED=0, trimpath, stripped)
  • Stage 2: assembles final image from alpine:3.21 with the benthos binary, sidecar, and config copied in
  • Adds a HEALTHCHECK against 127.0.0.1:8091/health

New: entrypoint.sh

  • Starts the provision sidecar in the background with trap-based cleanup on exit/signal
  • Polls /health up to 30 × 0.2 s before launching Benthos, ensuring the sidecar is ready before any events are processed

Updated: collector.yaml

  • Adds billable_usd_micros to the CloudEvent mapping (initially equal to network_fee_usd_micros)
  • Adds a branch processor before ingest that calls /ensure with the event's identity fields (CloudEvent body is preserved via result_map: "")
  • Replaces the single http_client output with a fallback: primary ingest attempt → on failure, ensure again + retry once

Updated: docker-compose.yml / deploy/README.md

  • Threads OPENMETER_DEFAULT_PLAN_KEY through compose (default: clearinghouse_default_ppu)
  • Documents the new customer-upsert flow, the separation of concerns between the sidecar and the bootstrap CLI, and a forward-looking note on a future billing-gateway service for OAuth-protected admin/query access

Architecture note

The collector holds the only OpenMeter admin credentials (kpat_…). The identity webhook, clearinghouse API, and any future caller talk to OpenMeter through the collector's sidecar (or, eventually, a dedicated billing-gateway). This keeps credential scope narrow and makes the event pipeline self-healing without adding a hard boot-time dependency on external provisioning state.

…llector

Added a local Go provision sidecar to handle idempotent customer upsert requests. Updated Dockerfile and entrypoint script to support the new provision server. Enhanced the collector's configuration to include the OPENMETER_DEFAULT_PLAN_KEY environment variable and updated the collector's pipeline to ensure customer creation before event ingestion.
@eliteprox eliteprox requested a review from rickstaa June 19, 2026 01:01
@eliteprox eliteprox marked this pull request as ready for review June 19, 2026 01:01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant