Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 25 additions & 4 deletions deploy/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ docker compose -f deploy/docker-compose.yml --env-file deploy/.env port remote-s
| `OPENMETER_URL` | yes | — | OpenMeter / Konnect base URL (from bootstrap) |
| `OPENMETER_INGEST_URL` | yes | — | Ingest endpoint (`${OPENMETER_URL}/events` for Konnect) |
| `OPENMETER_API_KEY` | yes | — | Konnect PAT (`kpat_…`) (from bootstrap) |
| `OPENMETER_DEFAULT_PLAN_KEY` | no | `clearinghouse_default_ppu` | Plan subscribed on customer upsert |
| `ETH_USD_PRICE` | no | `3500` | ETH/USD rate for Wei→USD micros conversion |
| `AUTH0_PUBLIC_CLIENT_ID` | no | — | Auth0 public client id (from bootstrap) |

Expand Down Expand Up @@ -109,8 +110,28 @@ Signer computed_fee (wei)

Markup rules are defined in the bootstrap CLI catalog. Collector
pipeline config: [`deploy/openmeter-collector/collector.yaml`](openmeter-collector/collector.yaml).
The collector does not yet emit `billable_usd_micros` (phase 2); until then the billable meter
stays empty while the catalog is ready.

### Customer upsert (collector self-heal)

The collector runs a local Go provision sidecar (`deploy/openmeter-collector/provision`, Kong `sdk-konnect-go`) that holds the
OpenMeter admin credentials — the identity webhook does **not** need them.

For each `create_signed_ticket` event:

1. Benthos maps the CloudEvent (including `billable_usd_micros`, initially equal to `network_fee_usd_micros`).
2. `POST http://127.0.0.1:8091/ensure` idempotently creates customer + subscription (`OPENMETER_DEFAULT_PLAN_KEY`).
3. Event is ingested to Konnect.
4. On ingest failure (e.g. `no customer found for event subject`), the collector ensures again and retries once.

### Future admin/query boundary (OAuth later)

When an admin/query API is added, introduce a small internal **billing-gateway** service:

- Move ensure-customer and usage-query logic behind that gateway.
- Protect caller-to-gateway with OAuth (client credentials / service-to-service).
- Keep gateway-to-OpenMeter on backend machine credentials (`kpat_…`).

The collector provision sidecar is the thin local equivalent until that gateway exists.

### API-key identity contract

Expand All @@ -121,5 +142,5 @@ stays empty while the catalog is ready.

Example customer key: `demo-client:demo-user`

Per-customer provisioning (Konnect customer + subscription) is a follow-up — not
yet implemented in the Go CLI.
Customer upsert is handled by the collector provision sidecar (see above). The Go bootstrap CLI
still provisions meters/features/plans; per-event customer+subscription ensure runs in the collector.
1 change: 1 addition & 0 deletions deploy/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ services:
OPENMETER_URL: ${OPENMETER_URL}
OPENMETER_INGEST_URL: ${OPENMETER_INGEST_URL}
OPENMETER_API_KEY: ${OPENMETER_API_KEY}
OPENMETER_DEFAULT_PLAN_KEY: ${OPENMETER_DEFAULT_PLAN_KEY:-clearinghouse_default_ppu}
ETH_USD_PRICE: ${ETH_USD_PRICE:-3500}

# Signer datadir is bind-mounted from deploy/data (see SIGNER_DATA_DIR).
Expand Down
30 changes: 28 additions & 2 deletions deploy/openmeter-collector/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,32 @@
# OpenMeter Benthos collector: Kafka create_signed_ticket events -> Konnect/OpenMeter ingest.
FROM ghcr.io/openmeterio/benthos-collector:latest
# Includes a local Go provision sidecar for idempotent customer upsert (Kong sdk-konnect-go).
FROM golang:1.25.11-alpine AS provision-build

WORKDIR /src
COPY deploy/openmeter-collector/provision/go.mod deploy/openmeter-collector/provision/go.sum ./
RUN go mod download
COPY deploy/openmeter-collector/provision/ ./
RUN CGO_ENABLED=0 go build -trimpath -ldflags="-s -w" -o /provision-server .

FROM ghcr.io/openmeterio/benthos-collector:latest AS benthos

FROM alpine:3.21

RUN apk add --no-cache wget ca-certificates

WORKDIR /app

COPY --from=provision-build /provision-server /app/provision-server
COPY deploy/openmeter-collector/entrypoint.sh /entrypoint.sh
COPY deploy/openmeter-collector/collector.yaml /config.yaml
COPY --from=benthos /usr/local/bin/benthos /usr/local/bin/benthos

RUN chmod +x /entrypoint.sh /app/provision-server

ENV PROVISION_PORT=8091
ENV OPENMETER_DEFAULT_PLAN_KEY=clearinghouse_default_ppu

HEALTHCHECK --interval=10s --timeout=3s --retries=6 --start-period=10s \
CMD wget -q -O- http://127.0.0.1:8091/health || exit 1

CMD ["-c", "/config.yaml"]
ENTRYPOINT ["/entrypoint.sh"]
66 changes: 56 additions & 10 deletions deploy/openmeter-collector/collector.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
# - Uses first-colon split semantics to preserve compatibility with builder-sdk.
# - Konnect ingest: POST {OPENMETER_INGEST_URL} (set to ${OPENMETER_URL}/events).
# For self-hosted OpenMeter set OPENMETER_INGEST_URL to ${OPENMETER_URL}/api/v1/events.
# - Customer upsert: local Go provision sidecar on :8091/ensure (Kong sdk-konnect-go).

input:
kafka:
Expand Down Expand Up @@ -46,6 +47,7 @@ pipeline:
"client_id": $client_id,
"external_user_id": $external_user_id,
"network_fee_usd_micros": $fee_usd_micros,
"billable_usd_micros": $fee_usd_micros,
"pipeline": if $data.pipeline != "" && $data.pipeline != null { $data.pipeline } else { "unknown" },
"model_id": if $data.model_id != "" && $data.model_id != null { $data.model_id } else { "unknown" },
"pixels": $data.pixels.string().or("0"),
Expand All @@ -61,14 +63,58 @@ pipeline:
message: "signed_ticket mapping failed: ${! error() }"
- mapping: root = deleted()

# Proactive idempotent customer ensure before ingest (preserve CloudEvent body).
- branch:
request_map: |
root.client_id = this.data.client_id
root.external_user_id = this.data.external_user_id
root.auth_id = this.subject
processors:
- http:
url: http://127.0.0.1:8091/ensure
verb: POST
headers:
Content-Type: application/json
timeout: 10s
result_map: ""

output:
http_client:
url: ${OPENMETER_INGEST_URL}
verb: POST
headers:
Authorization: "Bearer ${OPENMETER_API_KEY}"
Content-Type: application/cloudevents+json
successful_on:
- 200
- 202
- 204
fallback:
- http_client:
url: ${OPENMETER_INGEST_URL}
verb: POST
headers:
Authorization: "Bearer ${OPENMETER_API_KEY}"
Content-Type: application/cloudevents+json
successful_on:
- 200
- 202
- 204
# Self-heal: on ingest failure (e.g. no customer), ensure then retry once.
- processors:
- log:
level: WARN
message: "ingest failed, ensuring customer then retrying: ${! error() }"
- branch:
request_map: |
root.client_id = this.data.client_id
root.external_user_id = this.data.external_user_id
root.auth_id = this.subject
processors:
- http:
url: http://127.0.0.1:8091/ensure
verb: POST
headers:
Content-Type: application/json
timeout: 10s
result_map: ""
http_client:
url: ${OPENMETER_INGEST_URL}
verb: POST
headers:
Authorization: "Bearer ${OPENMETER_API_KEY}"
Content-Type: application/cloudevents+json
successful_on:
- 200
- 202
- 204
27 changes: 27 additions & 0 deletions deploy/openmeter-collector/entrypoint.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#!/bin/sh
set -eu

if [ -z "${OPENMETER_URL:-}" ] || [ -z "${OPENMETER_API_KEY:-}" ]; then
echo "entrypoint: OPENMETER_URL and OPENMETER_API_KEY are required" >&2
exit 1
fi

/app/provision-server &
PROVISION_PID=$!

cleanup() {
if kill -0 "$PROVISION_PID" 2>/dev/null; then
kill "$PROVISION_PID" 2>/dev/null || true
fi
}
trap cleanup EXIT INT TERM

# Wait for provision sidecar before Benthos starts posting events.
for _ in $(seq 1 30); do
if wget -q -O- http://127.0.0.1:${PROVISION_PORT:-8091}/health >/dev/null 2>&1; then
break
fi
sleep 0.2
done

exec /usr/local/bin/benthos -c /config.yaml
11 changes: 11 additions & 0 deletions deploy/openmeter-collector/provision/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
module github.com/livepeer/clearinghouse/deploy/openmeter-collector/provision

go 1.25.10

require github.com/Kong/sdk-konnect-go v0.39.0

require (
github.com/itchyny/gojq v0.12.17 // indirect
github.com/itchyny/timefmt-go v0.1.6 // indirect
github.com/spyzhov/ajson v0.8.0 // indirect
)
16 changes: 16 additions & 0 deletions deploy/openmeter-collector/provision/go.sum
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
github.com/Kong/sdk-konnect-go v0.39.0 h1:fQhfBsMOERUYkfxi5/cmXaiKC508ZsT+pWA6u7o56qE=
github.com/Kong/sdk-konnect-go v0.39.0/go.mod h1:rDWJcOR3KBkB4Mmrc+2RThAzlNZmcvGVdGpESfgTK9A=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/itchyny/gojq v0.12.17 h1:8av8eGduDb5+rvEdaOO+zQUjA04MS0m3Ps8HiD+fceg=
github.com/itchyny/gojq v0.12.17/go.mod h1:WBrEMkgAfAGO1LUcGOckBl5O726KPp+OlkKug0I/FEY=
github.com/itchyny/timefmt-go v0.1.6 h1:ia3s54iciXDdzWzwaVKXZPbiXzxxnv1SPGFfM/myJ5Q=
github.com/itchyny/timefmt-go v0.1.6/go.mod h1:RRDZYC5s9ErkjQvTvvU7keJjxUYzIISJGxm9/mAERQg=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/spyzhov/ajson v0.8.0 h1:sFXyMbi4Y/BKjrsfkUZHSjA2JM1184enheSjjoT/zCc=
github.com/spyzhov/ajson v0.8.0/go.mod h1:63V+CGM6f1Bu/p4nLIN8885ojBdt88TbLoSFzyqMuVA=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
166 changes: 166 additions & 0 deletions deploy/openmeter-collector/provision/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package main

import (
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"strings"
)

type ensureRequest struct {
ClientID string `json:"client_id"`
ClientIDAlt string `json:"clientId"`
ExternalUserID string `json:"external_user_id"`
ExternalUserAlt string `json:"externalUserId"`
AuthID string `json:"auth_id"`
AuthIDAlt string `json:"authId"`
Subject string `json:"subject"`
Data map[string]any `json:"data"`
}

func requiredEnv(name string) (string, error) {
value := strings.TrimSpace(os.Getenv(name))
if value == "" {
return "", fmt.Errorf("%s is required", name)
}
return value, nil
}

func stringField(body map[string]any, keys ...string) string {
for _, key := range keys {
if raw, ok := body[key]; ok {
if value, ok := raw.(string); ok {
if trimmed := strings.TrimSpace(value); trimmed != "" {
return trimmed
}
}
}
}
return ""
}

func parseIdentity(body ensureRequest) (clientID, externalUserID string, ok bool) {
data := body.Data
if data == nil {
data = map[string]any{}
}

clientID = firstNonEmpty(
body.ClientID,
body.ClientIDAlt,
stringField(data, "client_id", "clientId"),
)
externalUserID = firstNonEmpty(
body.ExternalUserID,
body.ExternalUserAlt,
stringField(data, "external_user_id", "externalUserId"),
)
if clientID != "" && externalUserID != "" {
return clientID, externalUserID, true
}

authID := firstNonEmpty(
body.AuthID,
body.AuthIDAlt,
stringField(data, "auth_id", "authId"),
body.Subject,
)
colon := strings.Index(authID, ":")
if colon > 0 && colon < len(authID)-1 {
return authID[:colon], authID[colon+1:], true
}
return "", "", false
}

func firstNonEmpty(values ...string) string {
for _, value := range values {
if trimmed := strings.TrimSpace(value); trimmed != "" {
return trimmed
}
}
return ""
}

func writeJSON(w http.ResponseWriter, status int, body any) {
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(status)
_ = json.NewEncoder(w).Encode(body)
}

func main() {
port := strings.TrimSpace(os.Getenv("PROVISION_PORT"))
if port == "" {
port = "8091"
}
planKey := strings.TrimSpace(os.Getenv("OPENMETER_DEFAULT_PLAN_KEY"))
if planKey == "" {
planKey = "clearinghouse_default_ppu"
}

baseURL, err := requiredEnv("OPENMETER_URL")
if err != nil {
log.Fatal(err)
}
apiKey, err := requiredEnv("OPENMETER_API_KEY")
if err != nil {
log.Fatal(err)
}

provisioner := NewProvisioner(baseURL, apiKey, planKey)

mux := http.NewServeMux()
mux.HandleFunc("/health", func(w http.ResponseWriter, _ *http.Request) {
w.Header().Set("Content-Type", "text/plain")
w.WriteHeader(http.StatusOK)
_, _ = w.Write([]byte("ok"))
})
mux.HandleFunc("/ensure", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.NotFound(w, r)
return
}

raw, err := io.ReadAll(r.Body)
if err != nil {
writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid request body"})
return
}

var body ensureRequest
if len(raw) > 0 {
if err := json.Unmarshal(raw, &body); err != nil {
writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid request json"})
return
}
}

clientID, externalUserID, ok := parseIdentity(body)
if !ok {
writeJSON(w, http.StatusBadRequest, map[string]string{
"error": "client_id and external_user_id (or auth_id) are required",
})
return
}

result, err := provisioner.Ensure(r.Context(), ProvisionInput{
ClientID: clientID,
ExternalUserID: externalUserID,
DisplayName: fmt.Sprintf("%s:%s", clientID, externalUserID),
})
if err != nil {
log.Printf("provision-server ensure failed: %v", err)
writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
return
}
writeJSON(w, http.StatusOK, result)
})

addr := "127.0.0.1:" + port
log.Printf("provision-server listening on %s plan=%s", addr, planKey)
if err := http.ListenAndServe(addr, mux); err != nil {
log.Fatal(err)
}
}
Loading