diff --git a/deploy/README.md b/deploy/README.md index 00f69e7..f7be033 100644 --- a/deploy/README.md +++ b/deploy/README.md @@ -76,6 +76,7 @@ docker compose -f deploy/docker-compose.yml --env-file deploy/.env port remote-s | `OPENMETER_URL` | yes | — | OpenMeter / Konnect base URL (from bootstrap) | | `OPENMETER_INGEST_URL` | yes | — | Ingest endpoint (`${OPENMETER_URL}/events` for Konnect) | | `OPENMETER_API_KEY` | yes | — | Konnect PAT (`kpat_…`) (from bootstrap) | +| `OPENMETER_DEFAULT_PLAN_KEY` | no | `clearinghouse_default_ppu` | Plan subscribed on customer upsert | | `ETH_USD_PRICE` | no | `3500` | ETH/USD rate for Wei→USD micros conversion | | `AUTH0_PUBLIC_CLIENT_ID` | no | — | Auth0 public client id (from bootstrap) | @@ -109,8 +110,28 @@ Signer computed_fee (wei) Markup rules are defined in the bootstrap CLI catalog. Collector pipeline config: [`deploy/openmeter-collector/collector.yaml`](openmeter-collector/collector.yaml). -The collector does not yet emit `billable_usd_micros` (phase 2); until then the billable meter -stays empty while the catalog is ready. + +### Customer upsert (collector self-heal) + +The collector runs a local Go provision sidecar (`deploy/openmeter-collector/provision`, Kong `sdk-konnect-go`) that holds the +OpenMeter admin credentials — the identity webhook does **not** need them. + +For each `create_signed_ticket` event: + +1. Benthos maps the CloudEvent (including `billable_usd_micros`, initially equal to `network_fee_usd_micros`). +2. `POST http://127.0.0.1:8091/ensure` idempotently creates customer + subscription (`OPENMETER_DEFAULT_PLAN_KEY`). +3. Event is ingested to Konnect. +4. On ingest failure (e.g. `no customer found for event subject`), the collector ensures again and retries once. + +### Future admin/query boundary (OAuth later) + +When an admin/query API is added, introduce a small internal **billing-gateway** service: + +- Move ensure-customer and usage-query logic behind that gateway. +- Protect caller-to-gateway with OAuth (client credentials / service-to-service). +- Keep gateway-to-OpenMeter on backend machine credentials (`kpat_…`). + +The collector provision sidecar is the thin local equivalent until that gateway exists. ### API-key identity contract @@ -121,5 +142,5 @@ stays empty while the catalog is ready. Example customer key: `demo-client:demo-user` -Per-customer provisioning (Konnect customer + subscription) is a follow-up — not -yet implemented in the Go CLI. +Customer upsert is handled by the collector provision sidecar (see above). The Go bootstrap CLI +still provisions meters/features/plans; per-event customer+subscription ensure runs in the collector. diff --git a/deploy/docker-compose.yml b/deploy/docker-compose.yml index 5a44ef1..629b8d3 100644 --- a/deploy/docker-compose.yml +++ b/deploy/docker-compose.yml @@ -90,6 +90,7 @@ services: OPENMETER_URL: ${OPENMETER_URL} OPENMETER_INGEST_URL: ${OPENMETER_INGEST_URL} OPENMETER_API_KEY: ${OPENMETER_API_KEY} + OPENMETER_DEFAULT_PLAN_KEY: ${OPENMETER_DEFAULT_PLAN_KEY:-clearinghouse_default_ppu} ETH_USD_PRICE: ${ETH_USD_PRICE:-3500} # Signer datadir is bind-mounted from deploy/data (see SIGNER_DATA_DIR). diff --git a/deploy/openmeter-collector/Dockerfile b/deploy/openmeter-collector/Dockerfile index 1a79f70..af6fb31 100644 --- a/deploy/openmeter-collector/Dockerfile +++ b/deploy/openmeter-collector/Dockerfile @@ -1,6 +1,32 @@ # OpenMeter Benthos collector: Kafka create_signed_ticket events -> Konnect/OpenMeter ingest. -FROM ghcr.io/openmeterio/benthos-collector:latest +# Includes a local Go provision sidecar for idempotent customer upsert (Kong sdk-konnect-go). +FROM golang:1.25.11-alpine AS provision-build +WORKDIR /src +COPY deploy/openmeter-collector/provision/go.mod deploy/openmeter-collector/provision/go.sum ./ +RUN go mod download +COPY deploy/openmeter-collector/provision/ ./ +RUN CGO_ENABLED=0 go build -trimpath -ldflags="-s -w" -o /provision-server . + +FROM ghcr.io/openmeterio/benthos-collector:latest AS benthos + +FROM alpine:3.21 + +RUN apk add --no-cache wget ca-certificates + +WORKDIR /app + +COPY --from=provision-build /provision-server /app/provision-server +COPY deploy/openmeter-collector/entrypoint.sh /entrypoint.sh COPY deploy/openmeter-collector/collector.yaml /config.yaml +COPY --from=benthos /usr/local/bin/benthos /usr/local/bin/benthos + +RUN chmod +x /entrypoint.sh /app/provision-server + +ENV PROVISION_PORT=8091 +ENV OPENMETER_DEFAULT_PLAN_KEY=clearinghouse_default_ppu + +HEALTHCHECK --interval=10s --timeout=3s --retries=6 --start-period=10s \ + CMD wget -q -O- http://127.0.0.1:8091/health || exit 1 -CMD ["-c", "/config.yaml"] +ENTRYPOINT ["/entrypoint.sh"] diff --git a/deploy/openmeter-collector/collector.yaml b/deploy/openmeter-collector/collector.yaml index d520b94..4c25e37 100644 --- a/deploy/openmeter-collector/collector.yaml +++ b/deploy/openmeter-collector/collector.yaml @@ -6,6 +6,7 @@ # - Uses first-colon split semantics to preserve compatibility with builder-sdk. # - Konnect ingest: POST {OPENMETER_INGEST_URL} (set to ${OPENMETER_URL}/events). # For self-hosted OpenMeter set OPENMETER_INGEST_URL to ${OPENMETER_URL}/api/v1/events. +# - Customer upsert: local Go provision sidecar on :8091/ensure (Kong sdk-konnect-go). input: kafka: @@ -46,6 +47,7 @@ pipeline: "client_id": $client_id, "external_user_id": $external_user_id, "network_fee_usd_micros": $fee_usd_micros, + "billable_usd_micros": $fee_usd_micros, "pipeline": if $data.pipeline != "" && $data.pipeline != null { $data.pipeline } else { "unknown" }, "model_id": if $data.model_id != "" && $data.model_id != null { $data.model_id } else { "unknown" }, "pixels": $data.pixels.string().or("0"), @@ -61,14 +63,58 @@ pipeline: message: "signed_ticket mapping failed: ${! error() }" - mapping: root = deleted() + # Proactive idempotent customer ensure before ingest (preserve CloudEvent body). + - branch: + request_map: | + root.client_id = this.data.client_id + root.external_user_id = this.data.external_user_id + root.auth_id = this.subject + processors: + - http: + url: http://127.0.0.1:8091/ensure + verb: POST + headers: + Content-Type: application/json + timeout: 10s + result_map: "" + output: - http_client: - url: ${OPENMETER_INGEST_URL} - verb: POST - headers: - Authorization: "Bearer ${OPENMETER_API_KEY}" - Content-Type: application/cloudevents+json - successful_on: - - 200 - - 202 - - 204 + fallback: + - http_client: + url: ${OPENMETER_INGEST_URL} + verb: POST + headers: + Authorization: "Bearer ${OPENMETER_API_KEY}" + Content-Type: application/cloudevents+json + successful_on: + - 200 + - 202 + - 204 + # Self-heal: on ingest failure (e.g. no customer), ensure then retry once. + - processors: + - log: + level: WARN + message: "ingest failed, ensuring customer then retrying: ${! error() }" + - branch: + request_map: | + root.client_id = this.data.client_id + root.external_user_id = this.data.external_user_id + root.auth_id = this.subject + processors: + - http: + url: http://127.0.0.1:8091/ensure + verb: POST + headers: + Content-Type: application/json + timeout: 10s + result_map: "" + http_client: + url: ${OPENMETER_INGEST_URL} + verb: POST + headers: + Authorization: "Bearer ${OPENMETER_API_KEY}" + Content-Type: application/cloudevents+json + successful_on: + - 200 + - 202 + - 204 diff --git a/deploy/openmeter-collector/entrypoint.sh b/deploy/openmeter-collector/entrypoint.sh new file mode 100644 index 0000000..aa08e18 --- /dev/null +++ b/deploy/openmeter-collector/entrypoint.sh @@ -0,0 +1,27 @@ +#!/bin/sh +set -eu + +if [ -z "${OPENMETER_URL:-}" ] || [ -z "${OPENMETER_API_KEY:-}" ]; then + echo "entrypoint: OPENMETER_URL and OPENMETER_API_KEY are required" >&2 + exit 1 +fi + +/app/provision-server & +PROVISION_PID=$! + +cleanup() { + if kill -0 "$PROVISION_PID" 2>/dev/null; then + kill "$PROVISION_PID" 2>/dev/null || true + fi +} +trap cleanup EXIT INT TERM + +# Wait for provision sidecar before Benthos starts posting events. +for _ in $(seq 1 30); do + if wget -q -O- http://127.0.0.1:${PROVISION_PORT:-8091}/health >/dev/null 2>&1; then + break + fi + sleep 0.2 +done + +exec /usr/local/bin/benthos -c /config.yaml diff --git a/deploy/openmeter-collector/provision/go.mod b/deploy/openmeter-collector/provision/go.mod new file mode 100644 index 0000000..63eaf45 --- /dev/null +++ b/deploy/openmeter-collector/provision/go.mod @@ -0,0 +1,11 @@ +module github.com/livepeer/clearinghouse/deploy/openmeter-collector/provision + +go 1.25.10 + +require github.com/Kong/sdk-konnect-go v0.39.0 + +require ( + github.com/itchyny/gojq v0.12.17 // indirect + github.com/itchyny/timefmt-go v0.1.6 // indirect + github.com/spyzhov/ajson v0.8.0 // indirect +) diff --git a/deploy/openmeter-collector/provision/go.sum b/deploy/openmeter-collector/provision/go.sum new file mode 100644 index 0000000..b34824b --- /dev/null +++ b/deploy/openmeter-collector/provision/go.sum @@ -0,0 +1,16 @@ +github.com/Kong/sdk-konnect-go v0.39.0 h1:fQhfBsMOERUYkfxi5/cmXaiKC508ZsT+pWA6u7o56qE= +github.com/Kong/sdk-konnect-go v0.39.0/go.mod h1:rDWJcOR3KBkB4Mmrc+2RThAzlNZmcvGVdGpESfgTK9A= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/itchyny/gojq v0.12.17 h1:8av8eGduDb5+rvEdaOO+zQUjA04MS0m3Ps8HiD+fceg= +github.com/itchyny/gojq v0.12.17/go.mod h1:WBrEMkgAfAGO1LUcGOckBl5O726KPp+OlkKug0I/FEY= +github.com/itchyny/timefmt-go v0.1.6 h1:ia3s54iciXDdzWzwaVKXZPbiXzxxnv1SPGFfM/myJ5Q= +github.com/itchyny/timefmt-go v0.1.6/go.mod h1:RRDZYC5s9ErkjQvTvvU7keJjxUYzIISJGxm9/mAERQg= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/spyzhov/ajson v0.8.0 h1:sFXyMbi4Y/BKjrsfkUZHSjA2JM1184enheSjjoT/zCc= +github.com/spyzhov/ajson v0.8.0/go.mod h1:63V+CGM6f1Bu/p4nLIN8885ojBdt88TbLoSFzyqMuVA= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/deploy/openmeter-collector/provision/main.go b/deploy/openmeter-collector/provision/main.go new file mode 100644 index 0000000..8f4d5b7 --- /dev/null +++ b/deploy/openmeter-collector/provision/main.go @@ -0,0 +1,166 @@ +package main + +import ( + "encoding/json" + "fmt" + "io" + "log" + "net/http" + "os" + "strings" +) + +type ensureRequest struct { + ClientID string `json:"client_id"` + ClientIDAlt string `json:"clientId"` + ExternalUserID string `json:"external_user_id"` + ExternalUserAlt string `json:"externalUserId"` + AuthID string `json:"auth_id"` + AuthIDAlt string `json:"authId"` + Subject string `json:"subject"` + Data map[string]any `json:"data"` +} + +func requiredEnv(name string) (string, error) { + value := strings.TrimSpace(os.Getenv(name)) + if value == "" { + return "", fmt.Errorf("%s is required", name) + } + return value, nil +} + +func stringField(body map[string]any, keys ...string) string { + for _, key := range keys { + if raw, ok := body[key]; ok { + if value, ok := raw.(string); ok { + if trimmed := strings.TrimSpace(value); trimmed != "" { + return trimmed + } + } + } + } + return "" +} + +func parseIdentity(body ensureRequest) (clientID, externalUserID string, ok bool) { + data := body.Data + if data == nil { + data = map[string]any{} + } + + clientID = firstNonEmpty( + body.ClientID, + body.ClientIDAlt, + stringField(data, "client_id", "clientId"), + ) + externalUserID = firstNonEmpty( + body.ExternalUserID, + body.ExternalUserAlt, + stringField(data, "external_user_id", "externalUserId"), + ) + if clientID != "" && externalUserID != "" { + return clientID, externalUserID, true + } + + authID := firstNonEmpty( + body.AuthID, + body.AuthIDAlt, + stringField(data, "auth_id", "authId"), + body.Subject, + ) + colon := strings.Index(authID, ":") + if colon > 0 && colon < len(authID)-1 { + return authID[:colon], authID[colon+1:], true + } + return "", "", false +} + +func firstNonEmpty(values ...string) string { + for _, value := range values { + if trimmed := strings.TrimSpace(value); trimmed != "" { + return trimmed + } + } + return "" +} + +func writeJSON(w http.ResponseWriter, status int, body any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(status) + _ = json.NewEncoder(w).Encode(body) +} + +func main() { + port := strings.TrimSpace(os.Getenv("PROVISION_PORT")) + if port == "" { + port = "8091" + } + planKey := strings.TrimSpace(os.Getenv("OPENMETER_DEFAULT_PLAN_KEY")) + if planKey == "" { + planKey = "clearinghouse_default_ppu" + } + + baseURL, err := requiredEnv("OPENMETER_URL") + if err != nil { + log.Fatal(err) + } + apiKey, err := requiredEnv("OPENMETER_API_KEY") + if err != nil { + log.Fatal(err) + } + + provisioner := NewProvisioner(baseURL, apiKey, planKey) + + mux := http.NewServeMux() + mux.HandleFunc("/health", func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "text/plain") + w.WriteHeader(http.StatusOK) + _, _ = w.Write([]byte("ok")) + }) + mux.HandleFunc("/ensure", func(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + http.NotFound(w, r) + return + } + + raw, err := io.ReadAll(r.Body) + if err != nil { + writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid request body"}) + return + } + + var body ensureRequest + if len(raw) > 0 { + if err := json.Unmarshal(raw, &body); err != nil { + writeJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid request json"}) + return + } + } + + clientID, externalUserID, ok := parseIdentity(body) + if !ok { + writeJSON(w, http.StatusBadRequest, map[string]string{ + "error": "client_id and external_user_id (or auth_id) are required", + }) + return + } + + result, err := provisioner.Ensure(r.Context(), ProvisionInput{ + ClientID: clientID, + ExternalUserID: externalUserID, + DisplayName: fmt.Sprintf("%s:%s", clientID, externalUserID), + }) + if err != nil { + log.Printf("provision-server ensure failed: %v", err) + writeJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()}) + return + } + writeJSON(w, http.StatusOK, result) + }) + + addr := "127.0.0.1:" + port + log.Printf("provision-server listening on %s plan=%s", addr, planKey) + if err := http.ListenAndServe(addr, mux); err != nil { + log.Fatal(err) + } +} diff --git a/deploy/openmeter-collector/provision/provision.go b/deploy/openmeter-collector/provision/provision.go new file mode 100644 index 0000000..d6f2d78 --- /dev/null +++ b/deploy/openmeter-collector/provision/provision.go @@ -0,0 +1,307 @@ +package main + +import ( + "context" + "fmt" + "strings" + + sdkkonnectgo "github.com/Kong/sdk-konnect-go" + "github.com/Kong/sdk-konnect-go/models/components" + "github.com/Kong/sdk-konnect-go/models/operations" +) + +type Provisioner struct { + sdk *sdkkonnectgo.SDK + planKey string +} + +type ProvisionInput struct { + ClientID string + ExternalUserID string + DisplayName string +} + +type ProvisionResult struct { + CustomerKey string `json:"customerKey"` + CustomerID string `json:"customerId"` + SubscriptionID string `json:"subscriptionId"` + PlanKey string `json:"planKey"` + Status string `json:"status"` + Created struct { + Customer bool `json:"customer"` + Subscription bool `json:"subscription"` + } `json:"created"` +} + +func buildCustomerKey(clientID, externalUserID string) string { + return strings.TrimSpace(clientID) + ":" + strings.TrimSpace(externalUserID) +} + +func normalizeOpenMeterURL(baseURL string) string { + url := strings.TrimSpace(baseURL) + url = strings.TrimSuffix(url, "/") + url = strings.TrimSuffix(url, "/events") + url = strings.TrimSuffix(url, "/openmeter") + url = strings.TrimSuffix(url, "/v3") + return url +} + +func NewProvisioner(baseURL, apiKey, planKey string) *Provisioner { + url := normalizeOpenMeterURL(baseURL) + opts := []sdkkonnectgo.SDKOption{ + sdkkonnectgo.WithSecurity(components.Security{ + PersonalAccessToken: sdkkonnectgo.Pointer(apiKey), + }), + } + if url != "" { + opts = append(opts, sdkkonnectgo.WithServerURL(url)) + } else { + opts = append(opts, sdkkonnectgo.WithServerIndex(1)) + } + return &Provisioner{ + sdk: sdkkonnectgo.New(opts...), + planKey: strings.TrimSpace(planKey), + } +} + +func (p *Provisioner) Ensure(ctx context.Context, input ProvisionInput) (*ProvisionResult, error) { + clientID := strings.TrimSpace(input.ClientID) + externalUserID := strings.TrimSpace(input.ExternalUserID) + if clientID == "" || externalUserID == "" { + return nil, fmt.Errorf("clientId and externalUserId must be non-empty") + } + if p.planKey == "" { + return nil, fmt.Errorf("plan key must be non-empty") + } + + customerKey := buildCustomerKey(clientID, externalUserID) + displayName := strings.TrimSpace(input.DisplayName) + if displayName == "" { + displayName = customerKey + } + + customerID, customerCreated, err := p.ensureCustomer(ctx, customerKey, displayName) + if err != nil { + return nil, err + } + + subscriptionID, status, subscriptionCreated, err := p.ensureSubscription(ctx, customerID) + if err != nil { + return nil, err + } + + result := &ProvisionResult{ + CustomerKey: customerKey, + CustomerID: customerID, + SubscriptionID: subscriptionID, + PlanKey: p.planKey, + Status: status, + } + result.Created.Customer = customerCreated + result.Created.Subscription = subscriptionCreated + return result, nil +} + +func (p *Provisioner) ensureCustomer(ctx context.Context, customerKey, displayName string) (string, bool, error) { + if existing, err := p.findCustomerByKey(ctx, customerKey); err != nil { + return "", false, err + } else if existing != "" { + return existing, false, nil + } + + res, err := p.sdk.OpenMeterCustomers.CreateCustomer(ctx, components.CreateCustomerRequest{ + Key: customerKey, + Name: displayName, + UsageAttribution: &components.UsageAttribution{ + SubjectKeys: []string{customerKey}, + }, + }) + if err != nil { + if existing, findErr := p.findCustomerByKey(ctx, customerKey); findErr == nil && existing != "" { + return existing, false, nil + } + return "", false, fmt.Errorf("creating customer %s: %w", customerKey, err) + } + if res.BillingCustomer == nil || res.BillingCustomer.ID == "" { + return "", false, fmt.Errorf("creating customer %s: empty response", customerKey) + } + if res.StatusCode < 200 || res.StatusCode >= 300 { + return "", false, fmt.Errorf("creating customer %s: status %d", customerKey, res.StatusCode) + } + return res.BillingCustomer.ID, true, nil +} + +func (p *Provisioner) findCustomerByKey(ctx context.Context, customerKey string) (string, error) { + res, err := p.sdk.OpenMeterCustomers.ListCustomers(ctx, operations.ListCustomersRequest{ + Filter: &components.ListCustomersParamsFilter{ + Key: &components.StringFieldFilter{ + Eq: sdkkonnectgo.Pointer(customerKey), + }, + }, + Page: &components.PagePaginationQuery{ + Number: sdkkonnectgo.Pointer(int64(1)), + Size: sdkkonnectgo.Pointer(int64(100)), + }, + }) + if err != nil { + return "", fmt.Errorf("listing customers for key %s: %w", customerKey, err) + } + if res.CustomerPagePaginatedResponse != nil { + for _, customer := range res.CustomerPagePaginatedResponse.Data { + if customer.Key == customerKey && customer.ID != "" { + return customer.ID, nil + } + } + } + + getRes, err := p.sdk.OpenMeterCustomers.GetCustomer(ctx, customerKey) + if err == nil && getRes.BillingCustomer != nil && getRes.BillingCustomer.ID != "" { + return getRes.BillingCustomer.ID, nil + } + if err == nil && (getRes.StatusCode == 404) { + return "", nil + } + if err == nil && getRes.StatusCode >= 200 && getRes.StatusCode < 300 { + return "", nil + } + if err != nil { + return "", nil + } + return "", nil +} + +func subscriptionStatusActive(status components.BillingSubscriptionStatus) bool { + switch status { + case components.BillingSubscriptionStatusActive, + components.BillingSubscriptionStatusScheduled: + return true + default: + return false + } +} + +func planStatusUsable(status components.BillingPlanStatus) bool { + switch status { + case components.BillingPlanStatusActive, + components.BillingPlanStatusScheduled: + return true + default: + return false + } +} + +func (p *Provisioner) resolveActivePlan(ctx context.Context) (components.Plan, error) { + const pageSize = int64(50) + page := int64(1) + var best *components.BillingPlan + + for { + res, err := p.sdk.OpenMeterProductCatalog.ListPlans(ctx, operations.ListPlansRequest{ + Filter: &components.ListPlansParamsFilter{ + Key: &components.StringFieldFilter{ + Eq: sdkkonnectgo.Pointer(p.planKey), + }, + }, + Page: &components.PagePaginationQuery{ + Number: sdkkonnectgo.Pointer(page), + Size: sdkkonnectgo.Pointer(pageSize), + }, + }) + if err != nil { + return components.Plan{}, fmt.Errorf("listing plans for key %s: %w", p.planKey, err) + } + if res.PlanPagePaginatedResponse == nil { + break + } + for i := range res.PlanPagePaginatedResponse.Data { + plan := res.PlanPagePaginatedResponse.Data[i] + if plan.DeletedAt != nil || !planStatusUsable(plan.Status) || plan.ID == "" { + continue + } + if best == nil || planVersion(plan) > planVersion(*best) { + copy := plan + best = © + } + } + if len(res.PlanPagePaginatedResponse.Data) < int(pageSize) { + break + } + page++ + } + + if best == nil { + return components.Plan{}, fmt.Errorf("no active plan found for key %s (latest version may be deleted; re-run clearinghouse-bootstrap --skip-auth0)", p.planKey) + } + + return components.Plan{ + ID: sdkkonnectgo.Pointer(best.ID), + Key: sdkkonnectgo.Pointer(p.planKey), + Version: best.Version, + }, nil +} + +func planVersion(plan components.BillingPlan) int64 { + if plan.Version == nil { + return 0 + } + return *plan.Version +} + +func (p *Provisioner) ensureSubscription(ctx context.Context, customerID string) (string, string, bool, error) { + const pageSize = int64(100) + page := int64(1) + + for { + customerFilter := components.CreateULIDFieldFilterStr(customerID) + res, err := p.sdk.OpenMeterSubscriptions.ListSubscriptions(ctx, operations.ListSubscriptionsRequest{ + Filter: &components.ListSubscriptionsParamsFilter{ + CustomerID: &customerFilter, + PlanKey: &components.StringFieldFilterExact{ + Eq: sdkkonnectgo.Pointer(p.planKey), + }, + }, + Page: &components.PagePaginationQuery{ + Number: sdkkonnectgo.Pointer(page), + Size: sdkkonnectgo.Pointer(pageSize), + }, + }) + if err != nil { + return "", "", false, fmt.Errorf("listing subscriptions for customer %s: %w", customerID, err) + } + if res.SubscriptionPagePaginatedResponse != nil { + for _, sub := range res.SubscriptionPagePaginatedResponse.Data { + if sub.ID != "" && subscriptionStatusActive(sub.Status) { + return sub.ID, string(sub.Status), false, nil + } + } + if len(res.SubscriptionPagePaginatedResponse.Data) < int(pageSize) { + break + } + } else { + break + } + page++ + } + + planRef, err := p.resolveActivePlan(ctx) + if err != nil { + return "", "", false, err + } + + createRes, err := p.sdk.OpenMeterSubscriptions.CreateSubscription(ctx, components.BillingSubscriptionCreate{ + Customer: components.BillingSubscriptionCreateCustomer{ + ID: sdkkonnectgo.Pointer(customerID), + }, + Plan: planRef, + }) + if err != nil { + return "", "", false, fmt.Errorf("creating subscription for customer %s plan %s: %w", customerID, p.planKey, err) + } + if createRes.BillingSubscription == nil || createRes.BillingSubscription.ID == "" { + return "", "", false, fmt.Errorf("creating subscription for customer %s plan %s: empty response", customerID, p.planKey) + } + if createRes.StatusCode < 200 || createRes.StatusCode >= 300 { + return "", "", false, fmt.Errorf("creating subscription for customer %s plan %s: status %d", customerID, p.planKey, createRes.StatusCode) + } + return createRes.BillingSubscription.ID, string(createRes.BillingSubscription.Status), true, nil +}