Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
318 changes: 318 additions & 0 deletions background_worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,318 @@
package frankenphp

// #include "frankenphp.h"
import "C"
import (
"fmt"
"log/slog"
"strings"
"sync"
"sync/atomic"
"unsafe"
)

// defaultMaxBackgroundWorkers is the default safety cap for catch-all
// background workers when the user doesn't set max_threads. Caps the
// number of distinct lazy-started instances from a single catch-all.
const defaultMaxBackgroundWorkers = 16

// BackgroundScope identifies an isolation boundary for background workers.
// Each php_server block uses a distinct scope so that two blocks can
// declare workers with the same name without conflict. The zero value is
// the global/embed scope (used when no per-block scope was assigned).
// Representation is opaque; obtain values via NextBackgroundWorkerScope.
type BackgroundScope int

var backgroundScopeCounter atomic.Uint64

// NextBackgroundWorkerScope returns a unique scope for background worker
// isolation. Each php_server block should call this once during
// provisioning.
func NextBackgroundWorkerScope() BackgroundScope {
return BackgroundScope(backgroundScopeCounter.Add(1))
}

// backgroundLookups maps scopes to their background worker lookup.
// Scope 0 is the global/embed scope; each php_server block gets its own.
var backgroundLookups map[BackgroundScope]*backgroundWorkerLookup

// backgroundWorkerLookup maps worker names to their registry, with a
// separate slot for the catch-all (name-less) declaration.
type backgroundWorkerLookup struct {
byName map[string]*backgroundWorkerRegistry
catchAll *backgroundWorkerRegistry
}

func newBackgroundWorkerLookup() *backgroundWorkerLookup {
return &backgroundWorkerLookup{
byName: make(map[string]*backgroundWorkerRegistry),
}
}

// resolve returns the registry for the given name, falling back to
// catch-all. Returns nil if neither matches.
func (l *backgroundWorkerLookup) resolve(name string) *backgroundWorkerRegistry {
if r, ok := l.byName[name]; ok {
return r
}
return l.catchAll
}

// backgroundWorkerRegistry tracks the template options from a single
// declaration plus the live instances started from it. Named declarations
// have at most one entry keyed by their declared name; the catch-all can
// have many, up to maxWorkers.
type backgroundWorkerRegistry struct {
entrypoint string
maxWorkers int // cap for catch-all instances; 0 = unlimited (named workers)

mu sync.Mutex
workers map[string]*worker

// Template options preserved so lazy-started workers inherit the same
// scope/env/watch/failure policy as their eagerly-started siblings.
scope BackgroundScope
env PreparedEnv
watch []string
maxConsecutiveFailures int
requestOptions []RequestOption

// declared is the pre-existing *worker for a named declaration. It
// lets lazy-start (num=0 named) reuse the already-built worker
// struct without scanning the global workersByName map (which is not
// scope-aware: scoped bg workers sharing a user-facing name would
// otherwise collide). nil for catch-all registries (each lazy-
// started name gets a fresh worker).
declared *worker
}

func newBackgroundWorkerRegistry(entrypoint string) *backgroundWorkerRegistry {
return &backgroundWorkerRegistry{
entrypoint: entrypoint,
workers: make(map[string]*worker),
maxConsecutiveFailures: -1,
}
}

// buildBackgroundWorkerLookups constructs a scope->lookup map from declared
// worker options. Each scope (php_server block, or 0 for global/embed)
// gets its own lookup so workers declared with the same name in different
// blocks don't collide. Each declaration gets its own registry so shared-
// entrypoint declarations keep their own template options.
func buildBackgroundWorkerLookups(workers []*worker, opts []workerOpt) map[BackgroundScope]*backgroundWorkerLookup {
lookups := make(map[BackgroundScope]*backgroundWorkerLookup)

for i, o := range opts {
if !o.isBackgroundWorker {
continue
}

scope := o.backgroundScope
lookup, ok := lookups[scope]
if !ok {
lookup = newBackgroundWorkerLookup()
lookups[scope] = lookup
}

registry := newBackgroundWorkerRegistry(o.fileName)
registry.scope = scope
registry.env = o.env
registry.watch = o.watch
registry.maxConsecutiveFailures = o.maxConsecutiveFailures
registry.requestOptions = o.requestOptions

w := workers[i]
w.backgroundScope = scope
phpName := strings.TrimPrefix(w.name, "m#")
if phpName != "" && phpName != w.fileName {
lookup.byName[phpName] = registry
// Named declaration: remember the *worker so lazy-start can
// reuse it instead of scanning workersByName (which is not
// scope-aware for bg workers).
registry.declared = w
// Eagerly-started named workers (num > 0) register themselves
// so a later ensure() observes the live instance instead of
// trying to spawn a duplicate. Lazy named workers (num == 0)
// stay unregistered until ensure() schedules them.
if w.num > 0 {
registry.workers[phpName] = w
}
} else {
maxW := defaultMaxBackgroundWorkers
if o.maxThreads > 1 {
maxW = o.maxThreads
}
registry.maxWorkers = maxW
lookup.catchAll = registry
}

w.backgroundRegistry = registry
}

if len(lookups) == 0 {
return nil
}
return lookups
}

// getLookup returns the background-worker lookup for the given thread.
// The scope is resolved from the thread's handler (for worker threads
// inheriting their worker's scope) or from the request context (for
// regular HTTP threads with WithRequestBackgroundScope).
//
// If the resolved scope has no workers declared (its lookup is nil), the
// caller falls through to the global/embed scope (0) so that globally-
// declared workers remain reachable from scoped requests. Scopes that
// declared their own workers stay strictly isolated because their lookup
// is non-nil.
func getLookup(thread *phpThread) *backgroundWorkerLookup {
if backgroundLookups == nil {
return nil
}
var scope BackgroundScope
if thread != nil {
switch handler := thread.handler.(type) {
case *workerThread:
scope = handler.worker.backgroundScope
case *backgroundWorkerThread:
scope = handler.worker.backgroundScope
default:
if fc, ok := fromContext(thread.context()); ok {
scope = fc.backgroundScope
}
}
}
if scope != 0 {
if l := backgroundLookups[scope]; l != nil {
return l
}
}
return backgroundLookups[0]
}

// ensureBackgroundWorker lazy-starts the named worker if it is not already
// running. Fire-and-forget: returns once the bg worker thread has been
// launched, without waiting for the PHP script to reach any particular
// state. Safe to call concurrently; only the first caller actually
// allocates the instance, the rest see the existing one.
func ensureBackgroundWorker(thread *phpThread, bgWorkerName string) error {
if bgWorkerName == "" {
return fmt.Errorf("background worker name must not be empty")
}
lookup := getLookup(thread)
if lookup == nil {
return fmt.Errorf("no background worker configured")
}
registry := lookup.resolve(bgWorkerName)
if registry == nil || registry.entrypoint == "" {
return fmt.Errorf("no background worker configured for name %q", bgWorkerName)
}

registry.mu.Lock()
if _, ok := registry.workers[bgWorkerName]; ok {
registry.mu.Unlock()
return nil
}

if registry.maxWorkers > 0 && len(registry.workers) >= registry.maxWorkers {
registry.mu.Unlock()
return fmt.Errorf("cannot start background worker %q: limit of %d reached (increase max_threads on the catch-all background worker or declare it as a named worker)", bgWorkerName, registry.maxWorkers)
}

// A num=0 named declaration already created a worker struct at init
// time; reuse it instead of creating a duplicate. Named bg workers
// across distinct scopes share the user-facing PHP name but each has
// its own *worker struct, so we look it up via registry.declared
// rather than the global workersByName map (which is not scope-aware
// for bg workers). For catch-all (registry.declared == nil), each
// lazy-started name gets a fresh *worker.
var w *worker
freshlyBuilt := false
if registry.declared != nil {
w = registry.declared
} else {
// Clone env and slices: newWorker mutates env (writes
// FRANKENPHP_WORKER) and appends to requestOptions, so sharing
// these across lazy-started instances would race with HTTP
// threads reading the originals.
env := make(PreparedEnv, len(registry.env)+1)
for k, v := range registry.env {
env[k] = v
}
watch := append([]string(nil), registry.watch...)
requestOptions := append([]RequestOption(nil), registry.requestOptions...)

var err error
w, err = newWorker(workerOpt{
name: bgWorkerName,
fileName: registry.entrypoint,
num: 1,
isBackgroundWorker: true,
backgroundScope: registry.scope,
env: env,
watch: watch,
maxConsecutiveFailures: registry.maxConsecutiveFailures,
requestOptions: requestOptions,
})
if err != nil {
registry.mu.Unlock()
return fmt.Errorf("failed to create background worker: %w", err)
}
freshlyBuilt = true
}
w.backgroundRegistry = registry
w.backgroundScope = registry.scope
registry.workers[bgWorkerName] = w
registry.mu.Unlock()

t := getInactivePHPThread()
if t == nil {
registry.mu.Lock()
delete(registry.workers, bgWorkerName)
registry.mu.Unlock()
return fmt.Errorf("no available PHP thread for background worker (increase max_threads)")
}

if freshlyBuilt {
scalingMu.Lock()
workers = append(workers, w)
scalingMu.Unlock()
// Intentionally NOT registered in workersByName: bg workers are
// resolved per-scope via backgroundLookups, not via the global
// name map, so two scopes can share a user-facing name.
}

convertToBackgroundWorkerThread(t, w)

if globalLogger.Enabled(globalCtx, slog.LevelInfo) {
globalLogger.LogAttrs(globalCtx, slog.LevelInfo, "background worker started",
slog.String("name", bgWorkerName))
}

return nil
}

// go_frankenphp_ensure_background_worker declares a dependency on one or
// more background workers by name. Each named worker is lazy-started if
// not already running. Fire-and-forget: there is no readiness signal in
// this step, so callers cannot block on the workers reaching any state.
// The C side has already validated that names is non-empty and that every
// element is a non-empty unique string.
//
//export go_frankenphp_ensure_background_worker
func go_frankenphp_ensure_background_worker(threadIndex C.uintptr_t, names **C.char, nameLens *C.size_t, nameCount C.int) *C.char {
thread := phpThreads[threadIndex]
n := int(nameCount)
if n <= 0 {
return nil
}
nameSlice := unsafe.Slice(names, n)
nameLenSlice := unsafe.Slice(nameLens, n)
for i := 0; i < n; i++ {
goName := C.GoStringN(nameSlice[i], C.int(nameLenSlice[i]))
if err := ensureBackgroundWorker(thread, goName); err != nil {
return C.CString(err.Error())
}
}
return nil
}
Loading
Loading