diff --git a/test/e2e/apisix/e2e_test.go b/test/e2e/apisix/e2e_test.go index fde91636..d6657b63 100644 --- a/test/e2e/apisix/e2e_test.go +++ b/test/e2e/apisix/e2e_test.go @@ -45,3 +45,8 @@ func TestAPISIXE2E(t *testing.T) { _, _ = fmt.Fprintf(GinkgoWriter, "Starting APISIX standalone e2e suite\n") RunSpecs(t, "apisix standalone e2e suite") } + +// Tear down any prewarmed environments left in the pools when the suite ends. +var _ = AfterSuite(func() { + scaffold.ShutdownAllPools() +}) diff --git a/test/e2e/crds/v2/route.go b/test/e2e/crds/v2/route.go index d228885d..213128d4 100644 --- a/test/e2e/crds/v2/route.go +++ b/test/e2e/crds/v2/route.go @@ -2201,9 +2201,19 @@ spec: &apiv2.ApisixRoute{}, fmt.Sprintf(apisixRouteSpec, s.Namespace())) By("check upstreams") - upstreams, err := s.DefaultDataplaneResource().Upstream().List(context.Background()) - Expect(err).ShouldNot(HaveOccurred()) - Expect(upstreams).Should(HaveLen(4)) + // Poll instead of asserting once: the controller syncs the four + // upstreams to the data plane asynchronously after the ApisixRoute is + // applied, so a single immediate check can race and observe fewer. + s.RetryAssertion(func() error { + upstreams, err := s.DefaultDataplaneResource().Upstream().List(context.Background()) + if err != nil { + return err + } + if len(upstreams) != 4 { + return fmt.Errorf("expected 4 upstreams, got %d", len(upstreams)) + } + return nil + }).ShouldNot(HaveOccurred(), "waiting for 4 upstreams to be synced") By("verify ApisixRoute works") s.RequestAssert(&scaffold.RequestAssert{ diff --git a/test/e2e/framework/k8s.go b/test/e2e/framework/k8s.go index c901332d..efadfae7 100644 --- a/test/e2e/framework/k8s.go +++ b/test/e2e/framework/k8s.go @@ -74,6 +74,13 @@ func (f *Framework) ensureService(name, namespace string, desiredEndpoints int) return f.ensureServiceWithTimeout(name, namespace, desiredEndpoints, 120) } +// EnsureServiceReadyE waits until the named Service has the desired number of +// ready endpoints, returning an error instead of failing the test. It is used by +// the prewarm pool, whose background workers must not call Ginkgo assertions. +func (f *Framework) EnsureServiceReadyE(namespace, name string, desiredEndpoints int) error { + return f.ensureService(name, namespace, desiredEndpoints) +} + func (f *Framework) ensureServiceWithTimeout(name, namespace string, desiredEndpoints, timeout int) error { backoff := wait.Backoff{ Duration: 6 * time.Second, @@ -290,10 +297,14 @@ func WaitPodsAvailable(t testing.TestingT, kubeOps *k8s.KubectlOptions, opts met } func waitExponentialBackoff(condFunc func() (bool, error)) error { + // Poll at a fixed 2s interval up to ~180s. The previous exponential schedule + // (500ms, factor 2, 8 steps) polled at 7.5/15.5/31.5/63.5s, i.e. sparsely + // exactly during the 10-30s window when pods usually become ready, adding up + // to ~15s of needless waiting per call. backoff := wait.Backoff{ - Duration: 500 * time.Millisecond, - Factor: 2, - Steps: 8, + Duration: 2 * time.Second, + Factor: 1, + Steps: 90, } return wait.ExponentialBackoff(backoff, condFunc) } diff --git a/test/e2e/scaffold/apisix_deployer.go b/test/e2e/scaffold/apisix_deployer.go index 07504946..f68b99e2 100644 --- a/test/e2e/scaffold/apisix_deployer.go +++ b/test/e2e/scaffold/apisix_deployer.go @@ -59,6 +59,34 @@ func NewAPISIXDeployer(s *Scaffold) Deployer { } func (s *APISIXDeployer) BeforeEach() { + // Fast path: pick up a prewarmed environment so the deploy/readiness + // latency is paid by a background worker (overlapping the previous spec) + // instead of on this spec's critical path. Only the default profile is + // pooled; anything else, or any provisioning failure, falls back to the + // synchronous deploy below. + if prewarmEnabled() && isPoolable(s.opts) && specPrewarmable() { + fw, opts := s.Framework, s.opts + pool := getOrStartPool(profileKey(opts), prewarmDepth(), func() *pooledEnv { + return provisionAPISIXEnv(fw, opts) + }) + env := pool.acquire() + if env != nil && env.err == nil { + if err := s.loadPooledEnv(env); err != nil { + s.Logf("prewarm tunnel setup failed, falling back to synchronous deploy: %v", err) + destroyPooledEnv(env) + } else { + return + } + } else if env != nil && env.err != nil { + s.Logf("prewarm provision failed, falling back to synchronous deploy: %v", env.err) + destroyPooledEnv(env) + } + } + + s.beforeEachSync() +} + +func (s *APISIXDeployer) beforeEachSync() { s.runtimeOpts = s.opts s.namespace = fmt.Sprintf("ingress-apisix-e2e-tests-%s-%d", s.runtimeOpts.Name, time.Now().Nanosecond()) s.kubectlOptions = &k8s.KubectlOptions{ diff --git a/test/e2e/scaffold/apisix_prewarm.go b/test/e2e/scaffold/apisix_prewarm.go new file mode 100644 index 00000000..03091736 --- /dev/null +++ b/test/e2e/scaffold/apisix_prewarm.go @@ -0,0 +1,273 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package scaffold + +import ( + "bytes" + "fmt" + "os" + "strings" + "sync/atomic" + "time" + + "github.com/gruntwork-io/terratest/modules/k8s" + . "github.com/onsi/ginkgo/v2" //nolint:staticcheck + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/utils/ptr" + + "github.com/apache/apisix-ingress-controller/test/e2e/framework" +) + +var nsCounter int64 + +// defaultProfileName is the scaffold Options.Name used by the default profile. +const defaultProfileName = "default" + +// streamRouteLabels marks Gateway API stream-route specs (TCP/TLS/UDP). +var streamRouteLabels = map[string]struct{}{ + "tcproute": {}, + "tlsroute": {}, + "udproute": {}, +} + +// specPrewarmable reports whether the currently running spec may be served from +// the prewarm pool. Stream-route specs (Gateway API TCP/TLS/UDP routes and the +// ApisixRoute stream-route suite) are excluded and fall back to synchronous +// deployment: under apisix-standalone a stream route served from a prewarmed +// data plane is flaky, because the data plane's stream subsystem races with the +// concurrent background provisioning of the next pooled environment, whereas +// HTTP routes tolerate that contention. +func specPrewarmable() bool { + report := CurrentSpecReport() + for _, label := range report.Labels() { + if _, ok := streamRouteLabels[label]; ok { + return false + } + } + for _, text := range report.ContainerHierarchyTexts { + if strings.Contains(text, "StreamRoute") { + return false + } + } + return true +} + +// isPoolable reports whether an environment with these options can be served +// from the prewarm pool. Only the default profile is pooled; webhook-enabled +// and custom-keyed environments fall back to synchronous deployment. +func isPoolable(o Options) bool { + return !o.SkipHooks && + !o.EnableWebhook && + o.ControllerName == "" && + o.APISIXAdminAPIKey == "" +} + +// profileKey identifies the pool an environment belongs to. Within a process +// all default scaffolds share one pool. +func profileKey(o Options) string { + name := o.Name + if name == "" { + name = defaultProfileName + } + return "name=" + name +} + +func formatRegistry(workloadTemplate string) string { + if customRegistry, ok := os.LookupEnv("REGISTRY"); ok { + return strings.ReplaceAll(workloadTemplate, "127.0.0.1:5000", customRegistry) + } + return workloadTemplate +} + +// provisionAPISIXEnv builds a complete default-profile environment using +// error-returning primitives only, so it is safe to run in a background +// goroutine. Any failure is captured in pooledEnv.err for the caller to handle. +func provisionAPISIXEnv(fw *framework.Framework, opts Options) *pooledEnv { + t := &bgTestingT{} + env := &pooledEnv{} + + name := opts.Name + if name == "" { + name = defaultProfileName + } + ns := fmt.Sprintf("ingress-apisix-e2e-tests-%s-p%d-%d", + name, GinkgoParallelProcess(), atomic.AddInt64(&nsCounter, 1)) + env.namespace = ns + env.kubectlOptions = &k8s.KubectlOptions{ + ConfigPath: GetKubeconfig(), + Namespace: ns, + } + env.adminKey = getEnvOrDefault("APISIX_ADMIN_KEY", "edd1c9f034335f136f87ad84b625c8f1") + env.controllerName = fmt.Sprintf("%s/%s", DefaultControllerName, ns) + + if err := k8s.CreateNamespaceE(t, env.kubectlOptions, ns); err != nil { + env.err = fmt.Errorf("creating namespace: %w", err) + return env + } + + // 1) Data plane (APISIX, plus etcd when the provider needs it). + svc, err := provisionDataplane(t, env, opts) + if err != nil { + env.err = err + return env + } + env.dataplaneService = svc + + // 2) Ingress controller. + if err := provisionIngress(t, env); err != nil { + env.err = err + return env + } + + // 3) httpbin test backend. + httpbinSvc, err := provisionHTTPBIN(fw, t, env) + if err != nil { + env.err = err + return env + } + env.httpbinService = httpbinSvc + + return env +} + +func provisionDataplane(t *bgTestingT, env *pooledEnv, _ Options) (*corev1.Service, error) { + serviceName := framework.ProviderType + configProvider := framework.ConfigProviderTypeYaml + if framework.ProviderType == framework.ProviderTypeAPISIX { + configProvider = framework.ConfigProviderTypeEtcd + } + + if configProvider == framework.ConfigProviderTypeEtcd { + if err := k8s.KubectlApplyFromStringE(t, env.kubectlOptions, framework.EtcdSpec); err != nil { + return nil, fmt.Errorf("applying etcd: %w", err) + } + if err := framework.WaitPodsAvailable(t, env.kubectlOptions, metav1.ListOptions{ + LabelSelector: "app=etcd", + }); err != nil { + return nil, fmt.Errorf("waiting for etcd pod: %w", err) + } + } + + deployOpts := APISIXDeployOptions{ + Namespace: env.namespace, + AdminKey: env.adminKey, + ServiceName: serviceName, + ServiceHTTPPort: 9080, + ServiceHTTPSPort: 9443, + ConfigProvider: configProvider, + Replicas: ptr.To(1), + } + buf := bytes.NewBuffer(nil) + if err := framework.APISIXStandaloneTpl.Execute(buf, &deployOpts); err != nil { + return nil, fmt.Errorf("rendering apisix template: %w", err) + } + if err := k8s.KubectlApplyFromStringE(t, env.kubectlOptions, buf.String()); err != nil { + return nil, fmt.Errorf("applying apisix: %w", err) + } + if err := framework.WaitPodsAvailable(t, env.kubectlOptions, metav1.ListOptions{ + LabelSelector: "app.kubernetes.io/name=apisix", + }); err != nil { + return nil, fmt.Errorf("waiting for apisix pod: %w", err) + } + + svc, err := k8s.GetServiceE(t, env.kubectlOptions, serviceName) + if err != nil { + return nil, fmt.Errorf("getting dataplane service: %w", err) + } + return svc, nil +} + +func provisionIngress(t *bgTestingT, env *pooledEnv) error { + opts := framework.IngressDeployOpts{ + ControllerName: env.controllerName, + ProviderType: framework.ProviderType, + ProviderSyncPeriod: 1 * time.Hour, + Namespace: env.namespace, + Replicas: ptr.To(1), + WebhookEnable: false, + } + buf := bytes.NewBuffer(nil) + if err := framework.IngressSpecTpl.Execute(buf, opts); err != nil { + return fmt.Errorf("rendering ingress template: %w", err) + } + if err := k8s.KubectlApplyFromStringE(t, env.kubectlOptions, buf.String()); err != nil { + return fmt.Errorf("applying ingress controller: %w", err) + } + if err := framework.WaitPodsAvailable(t, env.kubectlOptions, metav1.ListOptions{ + LabelSelector: "control-plane=controller-manager", + }); err != nil { + return fmt.Errorf("waiting for controller pod: %w", err) + } + return nil +} + +func provisionHTTPBIN(fw *framework.Framework, t *bgTestingT, env *pooledEnv) (*corev1.Service, error) { + deployment := fmt.Sprintf(formatRegistry(_httpbinDeploymentTemplate), 1) + if err := k8s.KubectlApplyFromStringE(t, env.kubectlOptions, deployment); err != nil { + return nil, fmt.Errorf("applying httpbin deployment: %w", err) + } + if err := k8s.KubectlApplyFromStringE(t, env.kubectlOptions, _httpService); err != nil { + return nil, fmt.Errorf("applying httpbin service: %w", err) + } + if err := fw.EnsureServiceReadyE(env.namespace, HTTPBinServiceName, 1); err != nil { + return nil, fmt.Errorf("waiting for httpbin endpoints: %w", err) + } + svc, err := k8s.GetServiceE(t, env.kubectlOptions, HTTPBinServiceName) + if err != nil { + return nil, fmt.Errorf("getting httpbin service: %w", err) + } + return svc, nil +} + +// loadPooledEnv installs a prewarmed environment onto the deployer's scaffold so +// the rest of the spec behaves exactly as if it had been deployed synchronously. +// +// Port-forward tunnels are (re)created here, on the spec's critical path, rather +// than reused from the background prewarm worker. A port-forward opened during +// prewarm can be left in a broken state when it is established before the data +// plane's listener is fully serving (e.g. the TLS stream listener) and then sits +// idle in the pool buffer until a spec picks it up, surfacing as an EOF on first +// use. Recreating them here against a fully-provisioned data plane matches the +// synchronous path exactly; tunnel setup is cheap (~1-2s) relative to the +// deploy/readiness latency that prewarm hides. +func (s *APISIXDeployer) loadPooledEnv(env *pooledEnv) error { + s.runtimeOpts = s.opts + s.namespace = env.namespace + s.kubectlOptions = env.kubectlOptions + s.runtimeOpts.ControllerName = env.controllerName + s.runtimeOpts.APISIXAdminAPIKey = env.adminKey + s.dataplaneService = env.dataplaneService + s.httpbinService = env.httpbinService + s.finalizers = nil + s.additionalGateways = make(map[string]*GatewayResources) + + apisixTunnels, err := s.createDataplaneTunnels(env.dataplaneService, s.kubectlOptions, env.dataplaneService.Name) + if err != nil { + return fmt.Errorf("creating dataplane tunnels: %w", err) + } + s.apisixTunnels = apisixTunnels + + adminTunnel, err := s.createAdminTunnel(env.dataplaneService) + if err != nil { + return fmt.Errorf("creating admin tunnel: %w", err) + } + s.adminTunnel = adminTunnel + + return nil +} diff --git a/test/e2e/scaffold/envpool.go b/test/e2e/scaffold/envpool.go new file mode 100644 index 00000000..5517cd7f --- /dev/null +++ b/test/e2e/scaffold/envpool.go @@ -0,0 +1,210 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package scaffold + +import ( + "fmt" + "os" + "strconv" + "sync" + + "github.com/gruntwork-io/terratest/modules/k8s" + corev1 "k8s.io/api/core/v1" +) + +// pooledEnv is a fully provisioned, ready-to-use test environment produced by +// the prewarm pool ahead of time, so a spec's BeforeEach can pick it up without +// paying the deploy/readiness latency on its critical path. +type pooledEnv struct { + namespace string + kubectlOptions *k8s.KubectlOptions + controllerName string + adminKey string + dataplaneService *corev1.Service + httpbinService *corev1.Service + + // err is non-nil if provisioning failed; the consumer falls back to a + // synchronous deploy and discards this env. + err error +} + +// bgTestingT is a minimal terratest testing.TestingT implementation usable from +// background goroutines (the prewarm workers run outside any Ginkgo spec, so +// Ginkgo's GinkgoT/Expect must not be used there). Fatal/FailNow abort the +// current provision via panic, which safeProvision recovers into pooledEnv.err. +type bgTestingT struct{} + +type bgAbort struct{ msg string } + +func (bgAbort) Error() string { return "prewarm provision aborted" } + +func (t *bgTestingT) Fail() {} +func (t *bgTestingT) FailNow() { panic(bgAbort{msg: "FailNow"}) } +func (t *bgTestingT) Fatal(args ...any) { panic(bgAbort{msg: fmt.Sprint(args...)}) } +func (t *bgTestingT) Fatalf(f string, args ...any) { panic(bgAbort{msg: fmt.Sprintf(f, args...)}) } +func (t *bgTestingT) Error(args ...any) {} +func (t *bgTestingT) Errorf(f string, args ...any) {} +func (t *bgTestingT) Name() string { return "prewarm" } + +// envPool maintains a small set of prewarmed environments for one profile. +// A buffered channel of capacity `depth` holds ready environments; `depth` +// worker goroutines keep refilling it. With depth=1 this is a double buffer: +// one env is ready while the next is being built concurrently with the +// currently running spec. +type envPool struct { + ch chan *pooledEnv + stop chan struct{} + stopOnce sync.Once + wg sync.WaitGroup + provision func() *pooledEnv +} + +func newEnvPool(depth int, provision func() *pooledEnv) *envPool { + if depth < 1 { + depth = 1 + } + p := &envPool{ + ch: make(chan *pooledEnv, depth), + stop: make(chan struct{}), + provision: provision, + } + for i := 0; i < depth; i++ { + p.wg.Add(1) + go p.worker() + } + return p +} + +func (p *envPool) worker() { + defer p.wg.Done() + for { + select { + case <-p.stop: + return + default: + } + env := safeProvision(p.provision) + select { + case p.ch <- env: + case <-p.stop: + // Shutting down before this env was consumed: clean it up. + destroyPooledEnv(env) + return + } + } +} + +func (p *envPool) acquire() *pooledEnv { + select { + case env := <-p.ch: + return env + case <-p.stop: + return nil + } +} + +func (p *envPool) shutdown() { + p.stopOnce.Do(func() { close(p.stop) }) + p.wg.Wait() + // Drain and destroy any environments still buffered. + for { + select { + case env := <-p.ch: + destroyPooledEnv(env) + default: + return + } + } +} + +// safeProvision runs a provision function, converting any panic (including a +// bgTestingT Fatal/FailNow) into a pooledEnv carrying the error. +func safeProvision(provision func() *pooledEnv) (env *pooledEnv) { + defer func() { + if r := recover(); r != nil { + if env == nil { + env = &pooledEnv{} + } + env.err = fmt.Errorf("panic during prewarm provision: %v", r) + } + }() + return provision() +} + +// destroyPooledEnv tears down an environment that will not be used by a spec by +// deleting its namespace. Tunnels are only created once an env is handed to a +// spec (see loadPooledEnv), so an unused pooled env owns no port-forwards. +func destroyPooledEnv(env *pooledEnv) { + if env == nil { + return + } + if env.namespace != "" && env.kubectlOptions != nil { + _ = k8s.RunKubectlE(&bgTestingT{}, env.kubectlOptions, "delete", "namespace", env.namespace, "--wait=false") + } +} + +// --- process-global pool registry, keyed by profile ------------------------- + +var ( + poolsMu sync.Mutex + pools = map[string]*envPool{} +) + +func getOrStartPool(key string, depth int, provision func() *pooledEnv) *envPool { + poolsMu.Lock() + defer poolsMu.Unlock() + if p, ok := pools[key]; ok { + return p + } + p := newEnvPool(depth, provision) + pools[key] = p + return p +} + +// ShutdownAllPools stops every prewarm pool and tears down any environments +// still buffered. It is registered as an AfterSuite by the e2e suite root, not +// here, so that suites which merely import this package (e.g. the benchmark +// suite, which declares its own AfterSuite) are not given a duplicate node. +func ShutdownAllPools() { + poolsMu.Lock() + ps := pools + pools = map[string]*envPool{} + poolsMu.Unlock() + for _, p := range ps { + p.shutdown() + } +} + +// --- knobs ------------------------------------------------------------------ + +// prewarmEnabled reports whether the prewarm pool is active. It is on by +// default and can be disabled with E2E_PREWARM=false. +func prewarmEnabled() bool { + return getEnvOrDefault("E2E_PREWARM", "true") != "false" +} + +// prewarmDepth is the pool depth (ready + in-flight environments) per profile +// per process. Default 1 (double buffer). Override with E2E_PREWARM_DEPTH. +func prewarmDepth() int { + if v := os.Getenv("E2E_PREWARM_DEPTH"); v != "" { + if n, err := strconv.Atoi(v); err == nil && n > 0 { + return n + } + } + return 1 +} diff --git a/test/e2e/scaffold/scaffold.go b/test/e2e/scaffold/scaffold.go index 17e2a152..45a4c96b 100644 --- a/test/e2e/scaffold/scaffold.go +++ b/test/e2e/scaffold/scaffold.go @@ -145,7 +145,7 @@ func (s *Scaffold) AdminKey() string { // NewScaffold creates an e2e test scaffold. func NewScaffold(o Options) *Scaffold { if o.Name == "" { - o.Name = "default" + o.Name = defaultProfileName } if o.Kubeconfig == "" { o.Kubeconfig = GetKubeconfig()