diff --git a/SKILL.md b/SKILL.md index 6c36f07..39a4bf2 100644 --- a/SKILL.md +++ b/SKILL.md @@ -1,6 +1,6 @@ --- name: temporal-developer -description: This skill should be used when the user asks to "create a Temporal workflow", "write a Temporal activity", "debug stuck workflow", "fix non-determinism error", "Temporal Python", "Temporal TypeScript", "workflow replay", "activity timeout", "signal workflow", "query workflow", "worker not starting", "activity keeps retrying", "Temporal heartbeat", "continue-as-new", "child workflow", "saga pattern", "workflow versioning", "durable execution", "reliable distributed systems", or mentions Temporal SDK development. +description: This skill should be used when the user asks to "create a Temporal workflow", "write a Temporal activity", "debug stuck workflow", "fix non-determinism error", "Temporal Python", "Temporal TypeScript", "Temporal Go", "Temporal Golang", "workflow replay", "activity timeout", "signal workflow", "query workflow", "worker not starting", "activity keeps retrying", "Temporal heartbeat", "continue-as-new", "child workflow", "saga pattern", "workflow versioning", "durable execution", "reliable distributed systems", or mentions Temporal SDK development. version: 1.0.0 --- @@ -8,7 +8,7 @@ version: 1.0.0 ## Overview -Temporal is a durable execution platform that makes workflows survive failures automatically. This skill provides guidance for building Temporal applications in Python and TypeScript. +Temporal is a durable execution platform that makes workflows survive failures automatically. This skill provides guidance for building Temporal applications in Python, TypeScript, and Go. ## Core Architecture @@ -92,6 +92,7 @@ Once you've downloaded the file, extract the downloaded archive and add the temp 1. 
First, read the getting started guide for the language you are working in:
   - Python -> read `references/python/python.md`
   - TypeScript -> read `references/typescript/typescript.md`
+  - Go -> read `references/go/go.md`
2. Second, read appropriate `core` and language-specific references for the task at hand.

diff --git a/references/core/determinism.md b/references/core/determinism.md
index bf4f1ec..af824d2 100644
--- a/references/core/determinism.md
+++ b/references/core/determinism.md
@@ -78,9 +78,11 @@ For a few simple cases, like timestamps, random values, UUIDs, etc. the Temporal
 ## SDK Protection Mechanisms

 Each Temporal SDK language provides a protection mechanism to make it easier to catch non-determinism errors earlier in development:
-- Python: The Python SDK runs workflows in a sandbox that intercepts and aborts non-deterministic calls at runtime.
+- Python: The Python SDK runs workflows in a sandbox that intercepts and aborts non-deterministic calls early at runtime.
 - TypeScript: The TypeScript SDK runs workflows in an isolated V8 sandbox, intercepting many common sources of non-determinism and replacing them automatically with deterministic variants.
+- Go: The Go SDK has no runtime sandbox. Therefore, non-determinism bugs are often not immediately apparent, and are usually only observable during replay. The optional `workflowcheck` static analysis tool can be used to check for many sources of non-determinism at compile time.
+Regardless of which SDK you are using, it is your responsibility to ensure that workflow code does not contain sources of non-determinism. Use SDK-specific tools as well as replay tests for doing so.
## Detecting Non-Determinism diff --git a/references/core/patterns.md b/references/core/patterns.md index 93f774d..566e6f8 100644 --- a/references/core/patterns.md +++ b/references/core/patterns.md @@ -76,6 +76,7 @@ Client Workflow - Synchronous - caller waits for completion - Can mutate state AND return values - Supports validators to reject invalid updates before they even get persisted into history +- **Validators must NOT mutate workflow state or block** (no activities, sleeps, or commands) — they are read-only, similar to query handlers - Recorded in history **Example Flow**: @@ -424,6 +425,7 @@ Activity calls heartbeat() - Less visibility in Temporal UI (no separate task) - Must complete on the same worker - Not suitable for long-running operations +- **Risk with consecutive local activities:** Local activity completions are only persisted when the current Workflow Task completes. Calling multiple local activities in a row (with nothing in between to yield the Workflow Task) increases the risk of losing work if the worker crashes mid-sequence. If you need a chain of operations with durable checkpoints between each step, use regular activities instead. ## Choosing Between Patterns diff --git a/references/go/advanced-features.md b/references/go/advanced-features.md new file mode 100644 index 0000000..55e4e57 --- /dev/null +++ b/references/go/advanced-features.md @@ -0,0 +1,187 @@ +# Go SDK Advanced Features + +## Schedules + +Create recurring workflow executions using the Schedule API. 
+
+```go
+scheduleHandle, err := c.ScheduleClient().Create(ctx, client.ScheduleOptions{
+	ID: "daily-report",
+	Spec: client.ScheduleSpec{
+		CronExpressions: []string{"0 9 * * *"},
+	},
+	Action: &client.ScheduleWorkflowAction{
+		ID:        "daily-report-workflow",
+		Workflow:  DailyReportWorkflow,
+		TaskQueue: "reports",
+	},
+})
+```
+
+Using intervals instead of cron:
+
+```go
+scheduleHandle, err := c.ScheduleClient().Create(ctx, client.ScheduleOptions{
+	ID: "hourly-sync",
+	Spec: client.ScheduleSpec{
+		Intervals: []client.ScheduleIntervalSpec{
+			{Every: time.Hour},
+		},
+	},
+	Action: &client.ScheduleWorkflowAction{
+		ID:        "hourly-sync-workflow",
+		Workflow:  SyncWorkflow,
+		TaskQueue: "sync",
+	},
+})
+```
+
+Manage schedules:
+
+```go
+handle := c.ScheduleClient().GetHandle(ctx, "daily-report")
+
+// Pause / unpause
+handle.Pause(ctx, client.SchedulePauseOptions{Note: "Maintenance window"})
+handle.Unpause(ctx, client.ScheduleUnpauseOptions{Note: "Maintenance complete"})
+
+// Trigger immediately
+handle.Trigger(ctx, client.ScheduleTriggerOptions{})
+
+// Describe
+desc, err := handle.Describe(ctx)
+
+// Delete
+handle.Delete(ctx)
+```
+
+## Async Activity Completion
+
+Use async completion for activities that complete outside the worker (e.g., human tasks, external callbacks).
+If you configure a `HeartbeatTimeout` on this activity, the external completer is responsible for sending heartbeats via the async handle.
+If you do NOT set a `HeartbeatTimeout`, no heartbeats are required.
+
+**Note:** If the external system that completes the asynchronous action can reliably be trusted to do the task and Signal back with the result, and it doesn't need to Heartbeat or receive Cancellation, then consider using **signals** instead.
+ +**Step 1: Return `activity.ErrResultPending` from the activity.** + +```go +func RequestApproval(ctx context.Context, requestID string) (string, error) { + activityInfo := activity.GetInfo(ctx) + taskToken := activityInfo.TaskToken + + // Store taskToken externally (e.g., database) for later completion + err := storeTaskToken(requestID, taskToken) + if err != nil { + return "", err + } + + // Signal that this activity will be completed externally + return "", activity.ErrResultPending +} +``` + +**Step 2: Complete from another process using the task token.** + +```go +temporalClient, err := client.Dial(client.Options{}) + +// Complete the activity +err = temporalClient.CompleteActivity(ctx, taskToken, "approved", nil) + +// Or fail it +err = temporalClient.CompleteActivity(ctx, taskToken, nil, errors.New("rejected")) +``` + +Or complete by ID (no task token needed): + +```go +err = temporalClient.CompleteActivityByID(ctx, namespace, workflowID, runID, activityID, "approved", nil) +``` + +## Worker Tuning + +Configure `worker.Options` for production workloads: + +```go +w := worker.New(c, "my-task-queue", worker.Options{ + // Max concurrent activity executions (default: 1000) + MaxConcurrentActivityExecutionSize: 500, + + // Max concurrent workflow task executions (default: 1000) + MaxConcurrentWorkflowTaskExecutionSize: 500, + + // Max concurrent activity task pollers (default: 2) + MaxConcurrentActivityTaskPollers: 4, + + // Max concurrent workflow task pollers (default: 2) + MaxConcurrentWorkflowTaskPollers: 4, + + // Graceful shutdown timeout (default: 0) + WorkerStopTimeout: 30 * time.Second, +}) +``` + +Scale pollers based on task queue throughput. If you observe high schedule-to-start latency, increase the number of pollers or add more workers. + +## Sessions + +Go-specific feature for routing multiple activities to the same worker. All activities using the session context execute on the same worker host. 
+ +**Enable on the worker:** + +```go +w := worker.New(c, "fileprocessing", worker.Options{ + EnableSessionWorker: true, + MaxConcurrentSessionExecutionSize: 100, // default: 1000 +}) +``` + +**Use in a workflow:** + +```go +func FileProcessingWorkflow(ctx workflow.Context, file FileParam) error { + ao := workflow.ActivityOptions{ + StartToCloseTimeout: time.Minute, + } + ctx = workflow.WithActivityOptions(ctx, ao) + + sessionCtx, err := workflow.CreateSession(ctx, &workflow.SessionOptions{ + CreationTimeout: time.Minute, + ExecutionTimeout: 10 * time.Minute, + }) + if err != nil { + return err + } + defer workflow.CompleteSession(sessionCtx) + + // All three activities run on the same worker + var downloadResult string + err = workflow.ExecuteActivity(sessionCtx, DownloadFile, file.URL).Get(sessionCtx, &downloadResult) + if err != nil { + return err + } + + var processResult string + err = workflow.ExecuteActivity(sessionCtx, ProcessFile, downloadResult).Get(sessionCtx, &processResult) + if err != nil { + return err + } + + err = workflow.ExecuteActivity(sessionCtx, UploadFile, processResult).Get(sessionCtx, nil) + return err +} +``` + +Key points: +- `workflow.ErrSessionFailed` is returned if the worker hosting the session dies +- `CompleteSession` releases resources -- always call it (use `defer`) +- Use case: file processing (download, process, upload on same host), GPU workloads, or any pipeline needing local state +- `MaxConcurrentSessionExecutionSize` on `worker.Options` limits how many sessions a single worker can handle + +**Limitations:** +- Sessions do not survive worker process restarts — if the worker dies, the session fails and activities must be retried from the workflow level +- There is no server-side support for sessions — the Go SDK implements them entirely client-side using internal task queue routing +- Session concurrency limiting is per-process, not per-host — only one worker process per host if you rely on this + +**Relationship to 
worker-specific task queues:** Sessions are essentially a convenience API over the "worker-specific task queue" pattern, where each worker creates a unique task queue and routes activities to it. For simple cases where you don't need separate activities (e.g., download + process + upload can be one unit), consider using a single long-running activity with heartbeating instead. diff --git a/references/go/data-handling.md b/references/go/data-handling.md new file mode 100644 index 0000000..e887e7b --- /dev/null +++ b/references/go/data-handling.md @@ -0,0 +1,262 @@ +# Go SDK Data Handling + +## Overview + +The Go SDK uses the `converter.DataConverter` interface to serialize/deserialize workflow inputs, outputs, and activity parameters. The default converter converts values to JSON. + +## Default Data Converter + +The default `CompositeDataConverter` applies converters in order until one returns a non-nil Payload: + +1. `converter.NewNilPayloadConverter()` -- nil values +2. `converter.NewByteSlicePayloadConverter()` -- `[]byte` +3. `converter.NewProtoJSONPayloadConverter()` -- Protobuf messages as JSON +4. `converter.NewProtoPayloadConverter()` -- Protobuf messages as binary +5. `converter.NewJSONPayloadConverter()` -- anything JSON-serializable + +Structs must have exported fields to be serialized. + +## Custom Data Converter + +In most cases you don't implement the full `DataConverter` interface directly. Instead, implement a **`PayloadConverter`** for your specific type and insert it into a `CompositeDataConverter`. The `PayloadConverter` interface has four methods: + +```go +type PayloadConverter interface { + ToPayload(value interface{}) (*commonpb.Payload, error) // return nil if this type isn't handled + FromPayload(payload *commonpb.Payload, valuePtr interface{}) error + ToString(payload *commonpb.Payload) string + Encoding() string // e.g. 
"binary/msgpack"
+}
+```
+
+**Example — custom msgpack PayloadConverter:**
+
+```go
+import (
+	"encoding/json"
+	"fmt"
+
+	commonpb "go.temporal.io/api/common/v1"
+	"go.temporal.io/sdk/converter"
+	"github.com/vmihailenco/msgpack/v5"
+)
+
+const encodingMsgpack = "binary/msgpack"
+
+type MsgpackPayloadConverter struct{}
+
+func (c *MsgpackPayloadConverter) Encoding() string {
+	return encodingMsgpack
+}
+
+func (c *MsgpackPayloadConverter) ToPayload(value interface{}) (*commonpb.Payload, error) {
+	if value == nil {
+		return nil, nil
+	}
+	data, err := msgpack.Marshal(value)
+	if err != nil {
+		return nil, fmt.Errorf("msgpack marshal: %w", err)
+	}
+	return &commonpb.Payload{
+		Metadata: map[string][]byte{
+			converter.MetadataEncoding: []byte(encodingMsgpack),
+		},
+		Data: data,
+	}, nil
+}
+
+func (c *MsgpackPayloadConverter) FromPayload(payload *commonpb.Payload, valuePtr interface{}) error {
+	if string(payload.GetMetadata()[converter.MetadataEncoding]) != encodingMsgpack {
+		return fmt.Errorf("unsupported encoding")
+	}
+	return msgpack.Unmarshal(payload.Data, valuePtr)
+}
+
+func (c *MsgpackPayloadConverter) ToString(payload *commonpb.Payload) string {
+	// Decode to a map for human-readable display
+	var v interface{}
+	if err := msgpack.Unmarshal(payload.Data, &v); err != nil {
+		return fmt.Sprintf("<msgpack decode failed: %v>", err)
+	}
+	b, _ := json.Marshal(v)
+	return string(b)
+}
+```
+
+**Register in a CompositeDataConverter and pass to the client:**
+
+```go
+dataConverter := converter.NewCompositeDataConverter(
+	converter.NewNilPayloadConverter(),
+	converter.NewByteSlicePayloadConverter(),
+	&MsgpackPayloadConverter{}, // handles your type; falls through to JSON for everything else
+	converter.NewJSONPayloadConverter(),
+)
+
+c, err := client.Dial(client.Options{
+	DataConverter: dataConverter,
+})
+```
+
+**Per-activity/child-workflow override** — use a different converter for specific calls:
+
+```go
+actCtx := workflow.WithDataConverter(ctx, mySpecialConverter)
+workflow.ExecuteActivity(actCtx, SensitiveActivity, input) +``` + +**Note:** If your converter makes remote calls (e.g., to a KMS for encryption), wrap it with `workflow.DataConverterWithoutDeadlockDetection` to avoid deadlock detection timeouts in workflow code. + +## Composition of Payload Converters + +Use `converter.NewCompositeDataConverter` to chain type-specific converters. The first converter that can handle the type wins. + +```go +dataConverter := converter.NewCompositeDataConverter( + converter.NewNilPayloadConverter(), + converter.NewByteSlicePayloadConverter(), + converter.NewProtoJSONPayloadConverter(), + converter.NewProtoPayloadConverter(), + YourCustomPayloadConverter(), + converter.NewJSONPayloadConverter(), +) +``` + +## Protobuf Support + +Binary protobuf: +```go +converter.NewProtoPayloadConverter() +``` + +JSON protobuf: +```go +converter.NewProtoJSONPayloadConverter() +``` + +Both are included in the default data converter. SDK v1.26.0 (March 2024) migrated from gogo/protobuf to google/protobuf. If you need backward compatibility with older payloads encoded with gogo, use the `LegacyTemporalProtoCompat` option. + +## Payload Encryption + +Implement the `converter.PayloadCodec` interface (`Encode` and `Decode`) and wrap the default data converter: + +```go +// Codec implements converter.PayloadCodec for encryption. 
+type Codec struct{} + +func (Codec) Encode(payloads []*commonpb.Payload) ([]*commonpb.Payload, error) { + result := make([]*commonpb.Payload, len(payloads)) + for i, p := range payloads { + origBytes, err := p.Marshal() + if err != nil { + return payloads, err + } + encrypted := encrypt(origBytes) // your encryption logic + result[i] = &commonpb.Payload{ + Metadata: map[string][]byte{converter.MetadataEncoding: []byte("binary/encrypted")}, + Data: encrypted, + } + } + return result, nil +} + +func (Codec) Decode(payloads []*commonpb.Payload) ([]*commonpb.Payload, error) { + result := make([]*commonpb.Payload, len(payloads)) + for i, p := range payloads { + if string(p.Metadata[converter.MetadataEncoding]) != "binary/encrypted" { + result[i] = p + continue + } + decrypted := decrypt(p.Data) // your decryption logic + result[i] = &commonpb.Payload{} + err := result[i].Unmarshal(decrypted) + if err != nil { + return payloads, err + } + } + return result, nil +} +``` + +Wrap with `CodecDataConverter` and pass to client: + +```go +var DataConverter = converter.NewCodecDataConverter( + converter.GetDefaultDataConverter(), + &Codec{}, +) + +c, err := client.Dial(client.Options{ + DataConverter: DataConverter, +}) +``` + +## Search Attributes + +Set at workflow start: + +```go +handle, err := c.ExecuteWorkflow(ctx, client.StartWorkflowOptions{ + ID: "order-123", + TaskQueue: "orders", + SearchAttributes: map[string]interface{}{ + "OrderStatus": "pending", + "CustomerId": "cust-456", + }, +}, OrderWorkflow, input) +``` + +Upsert from within a workflow: + +```go +err := workflow.UpsertSearchAttributes(ctx, map[string]interface{}{ + "OrderStatus": "completed", +}) +``` + +Typed search attributes (v1.26.0+, preferred): + +```go +var OrderStatusKey = temporal.NewSearchAttributeKeyKeyword("OrderStatus") + +err := workflow.UpsertTypedSearchAttributes(ctx, OrderStatusKey.ValueSet("completed")) +``` + +Query workflows by search attributes: + +```go +resp, err := 
c.ListWorkflow(ctx, &workflowservice.ListWorkflowExecutionsRequest{
+	Query: `OrderStatus = "pending" AND CustomerId = "cust-456"`,
+})
+```
+
+## Workflow Memo
+
+Set in start options:
+
+```go
+handle, err := c.ExecuteWorkflow(ctx, client.StartWorkflowOptions{
+	ID:        "order-123",
+	TaskQueue: "orders",
+	Memo: map[string]interface{}{
+		"customerName": "Alice",
+		"notes":        "Priority customer",
+	},
+}, OrderWorkflow, input)
+```
+
+Read memo from workflow info. Upsert memo (Go SDK only):
+
+```go
+err := workflow.UpsertMemo(ctx, map[string]interface{}{
+	"notes": "Updated notes",
+})
+```
+
+## Best Practices
+
+1. Use structs with exported fields for inputs and outputs
+2. Prefer JSON for readability during development, protobuf for performance in production
+3. Keep payloads small -- see `references/core/gotchas.md` for limits
+4. Use `PayloadCodec` for encryption; never store sensitive data unencrypted
+5. Configure the same data converter on both client and worker
diff --git a/references/go/determinism-protection.md b/references/go/determinism-protection.md
new file mode 100644
index 0000000..4a6f5f4
--- /dev/null
+++ b/references/go/determinism-protection.md
@@ -0,0 +1,98 @@
+# Go Workflow Determinism Protection
+
+## Overview
+
+The Go SDK has no runtime sandbox. Determinism is enforced by **developer convention** and **optional static analysis**. Unlike the Python and TypeScript SDKs, the Go SDK will not intercept or replace non-deterministic calls at runtime. The Go SDK does perform a limited runtime command-ordering check, but catching non-deterministic code before deployment requires the `workflowcheck` tool and testing, in particular replay tests (see `references/go/testing.md`).
+
+## workflowcheck Static Analysis
+
+### Install
+
+```bash
+go install go.temporal.io/sdk/contrib/tools/workflowcheck@latest
+```
+
+### Run
+
+```bash
+workflowcheck ./...
+```
+
+No output means all registered workflows are deterministic.
Non-deterministic code produces hierarchical output showing the call chain to the offending code. + +Use `-show-pos` for exact file positions: + +```bash +workflowcheck -show-pos ./... +``` + +### What It Detects + +**Non-deterministic functions/variables:** +- `time.Now` -- obtaining current time +- `time.Sleep` -- sleeping +- `crypto/rand.Reader` -- crypto random reader +- `math/rand.globalRand` -- global pseudorandom +- `os.Stdin`, `os.Stdout`, `os.Stderr` -- standard I/O streams + +**Non-deterministic Go constructs:** +- Starting a goroutine (`go func()`) +- Sending to a channel +- Receiving from a channel +- Iterating over a channel via `range` +- Iterating over a map via `range` + +### Limitations + +`workflowcheck` cannot catch everything. It does **not** detect: +- Global variable mutation +- Non-determinism via reflection +- Runtime-conditional non-determinism + +### Suppressing False Positives + +Add `//workflowcheck:ignore` on or directly above the offending line: + +```go +now := time.Now() //workflowcheck:ignore +``` + +For broader suppression, use a YAML config file: + +```yaml +# workflowcheck.config.yaml +decls: + path/to/package.MyDeterministicFunc: false +``` + +```bash +workflowcheck -config workflowcheck.config.yaml ./... +``` + +## Determinism Rules + +**You must:** +- Use `workflow.Go(ctx, func(ctx workflow.Context) { ... 
})` instead of `go` +- Use `workflow.NewChannel(ctx)` instead of `chan` +- Use `workflow.NewSelector(ctx)` instead of `select` +- Use `workflow.Sleep(ctx, duration)` instead of `time.Sleep()` +- Use `workflow.Now(ctx)` instead of `time.Now()` +- Use `workflow.GetLogger(ctx)` instead of `fmt.Println` / `log.Println` +- Sort map keys before iterating, or use `workflow.SideEffect` / an activity + +**You must not:** +- Start native goroutines +- Use native channels or `select` +- Call `time.Now()` or `time.Sleep()` +- Use `math/rand` global functions or `crypto/rand.Reader` +- Access `os.Stdin`, `os.Stdout`, or `os.Stderr` +- Mutate global variables +- Make network calls, file I/O, or database queries (use activities) + +## Best Practices + +1. **Run `workflowcheck` in CI / pre-commit** -- catch non-deterministic code before it reaches production +2. **Keep workflow code thin** -- workflows should orchestrate; delegate all I/O and non-deterministic work to activities +3. **Use struct methods for activities** -- keeps imports clean and avoids pulling non-deterministic dependencies into workflow files +4. **Separate workflow and activity files** -- reduces the surface area that `workflowcheck` needs to analyze and keeps concerns isolated +5. **Test with replay** after any workflow code change to verify backward compatibility diff --git a/references/go/determinism.md b/references/go/determinism.md new file mode 100644 index 0000000..0cff905 --- /dev/null +++ b/references/go/determinism.md @@ -0,0 +1,52 @@ +# Go SDK Determinism + +## Overview + +The Go SDK has NO runtime sandbox (unlike Python/TypeScript). Workflows must be deterministic for replay, and determinism is enforced entirely by developer convention and optional static analysis via the `workflowcheck` tool (see `references/go/determinism-protection.md`). + +## Why Determinism Matters: History Replay + +Temporal provides durable execution through **History Replay**. 
When a Worker restores workflow state, it re-executes workflow code from the beginning. This requires the code to be **deterministic**. See `references/core/determinism.md` for a deep explanation. + +## Forbidden Operations + +Do not use any of the following in workflow code: + +- **Native goroutines** (`go func()`) -- use `workflow.Go()` instead +- **Native channels** (`chan`, send, receive, `range` over channel) -- use `workflow.Channel` instead +- **Native `select`** -- use `workflow.Selector` instead +- **`time.Now()`** -- use `workflow.Now(ctx)` instead +- **`time.Sleep()`** -- use `workflow.Sleep(ctx, duration)` instead +- **`math/rand` global** (e.g., `rand.Intn()`) -- use `workflow.SideEffect` instead +- **`crypto/rand.Reader`** -- use an activity instead +- **`os.Stdin` / `os.Stdout` / `os.Stderr`** -- use `workflow.GetLogger(ctx)` for logging +- **Map range iteration** (`for k, v := range myMap`) -- sort keys first, then iterate +- **Mutating global variables** -- use local state or `workflow.SideEffect` +- **Anonymous functions as local activities** -- the name is derived from the function and will be non-deterministic across replays; always use named functions for local activities + +## Safe Builtin Alternatives + +| Instead of | Use | +|---|---| +| `go func() { ... }()` | `workflow.Go(ctx, func(ctx workflow.Context) { ... })` | +| `chan T` | `workflow.NewChannel(ctx)` / `workflow.NewBufferedChannel(ctx, size)` | +| `select { ... }` | `workflow.NewSelector(ctx)` | +| `time.Now()` | `workflow.Now(ctx)` | +| `time.Sleep(d)` | `workflow.Sleep(ctx, d)` | +| `rand.Intn(100)` | `workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} { return rand.Intn(100) })` | +| `uuid.New()` | `workflow.SideEffect` or pass as activity result | +| `log.Println(...)` | `workflow.GetLogger(ctx).Info(...)` | + +## Testing Replay Compatibility + +Use `worker.WorkflowReplayer` to verify code changes are compatible with existing histories. 
See the Workflow Replay Testing section of `references/go/testing.md`.
+
+## Best Practices
+
+1. Run `workflowcheck ./...` in CI to catch non-deterministic code early
+2. Always use `workflow.*` APIs instead of native Go concurrency and time primitives
+3. Move all I/O operations (network, filesystem, database) into activities
+4. Sort map keys before iterating if you must iterate over a map in workflow code
+5. Use `workflow.GetLogger(ctx)` instead of `fmt.Println` or `log.Println` for replay-safe logging
+6. Keep workflow code focused on orchestration; delegate non-deterministic work to activities
+7. Test with replay after making changes to workflow definitions
diff --git a/references/go/error-handling.md b/references/go/error-handling.md
new file mode 100644
index 0000000..92a856b
--- /dev/null
+++ b/references/go/error-handling.md
@@ -0,0 +1,184 @@
+# Go SDK Error Handling
+
+## Overview
+
+The Go SDK uses error return values (not exceptions). All Temporal errors implement the `error` interface. Activity errors returned to workflows are wrapped in `*temporal.ActivityError`; use `errors.As` to unwrap them.
+
+## Application Errors
+
+```go
+import "go.temporal.io/sdk/temporal"
+
+func ValidateOrder(ctx context.Context, order Order) error {
+	if !order.IsValid() {
+		return temporal.NewApplicationError(
+			"Invalid order",
+			"ValidationError",
+		)
+	}
+	return nil
+}
+```
+
+`temporal.NewApplicationError(message, errType, details...)` creates a retryable `*temporal.ApplicationError`. Use `NewApplicationErrorWithCause` to include a wrapped cause.
+
+## Non-Retryable Errors
+
+```go
+func ChargeCard(ctx context.Context, input ChargeCardInput) (string, error) {
+	if !isValidCard(input.CardNumber) {
+		return "", temporal.NewNonRetryableApplicationError(
+			"Permanent failure - invalid credit card",
+			"PaymentError",
+			nil, // cause
+		)
+	}
+	return processPayment(input.CardNumber, input.Amount)
+}
+```
+
+`temporal.NewNonRetryableApplicationError(message, errType, cause, details...)` is always non-retryable regardless of RetryPolicy. You can also mark error types as non-retryable in the RetryPolicy instead:
+
+```go
+RetryPolicy: &temporal.RetryPolicy{
+	NonRetryableErrorTypes: []string{"PaymentError", "ValidationError"},
+},
+```
+
+## Handling Activity Errors in Workflows
+
+```go
+import (
+	"errors"
+
+	enumspb "go.temporal.io/api/enums/v1"
+	"go.temporal.io/sdk/temporal"
+	"go.temporal.io/sdk/workflow"
+)
+
+func MyWorkflow(ctx workflow.Context) (string, error) {
+	var result string
+	err := workflow.ExecuteActivity(ctx, RiskyActivity).Get(ctx, &result)
+	if err != nil {
+		var applicationErr *temporal.ApplicationError
+		if errors.As(err, &applicationErr) {
+			switch applicationErr.Type() {
+			case "ValidationError":
+				// handle validation error
+			case "PaymentError":
+				// handle payment error
+			default:
+				// handle unknown error type
+			}
+		}
+
+		var timeoutErr *temporal.TimeoutError
+		if errors.As(err, &timeoutErr) {
+			switch timeoutErr.TimeoutType() {
+			case enumspb.TIMEOUT_TYPE_START_TO_CLOSE:
+				// handle start-to-close timeout
+			case enumspb.TIMEOUT_TYPE_HEARTBEAT:
+				// handle heartbeat timeout
+			}
+		}
+
+		var canceledErr *temporal.CanceledError
+		if errors.As(err, &canceledErr) {
+			// handle cancellation
+		}
+
+		var panicErr *temporal.PanicError
+		if errors.As(err, &panicErr) {
+			// panicErr.Error() and panicErr.StackTrace()
+		}
+
+		return "", err
+	}
+	return result, nil
+}
+```
+
+## Retry Configuration
+
+```go
+import (
+	"time"
+
+	"go.temporal.io/sdk/temporal"
+	"go.temporal.io/sdk/workflow"
+)
+
+func MyWorkflow(ctx workflow.Context) error {
+	ao := workflow.ActivityOptions{
+		StartToCloseTimeout: 10 * time.Minute,
+		RetryPolicy: &temporal.RetryPolicy{
+			InitialInterval:        time.Second,
+			BackoffCoefficient:     2.0,
+			MaximumInterval:        time.Minute,
+			MaximumAttempts:        5,
+			NonRetryableErrorTypes: []string{"ValidationError", "PaymentError"},
+		},
+	}
+	ctx = workflow.WithActivityOptions(ctx, ao)
+	return workflow.ExecuteActivity(ctx, MyActivity).Get(ctx, nil)
+}
+```
+
+Only set options such as `MaximumInterval`, `MaximumAttempts`, etc. if you have a domain-specific reason to. If not, prefer to leave them at their defaults.
+
+## Timeout Configuration
+
+```go
+ao := workflow.ActivityOptions{
+	StartToCloseTimeout:    5 * time.Minute,  // Single attempt max duration
+	ScheduleToCloseTimeout: 30 * time.Minute, // Total time including retries
+	ScheduleToStartTimeout: 10 * time.Minute, // Time waiting in task queue
+	HeartbeatTimeout:       2 * time.Minute,  // Between heartbeats
+}
+ctx = workflow.WithActivityOptions(ctx, ao)
+```
+
+- **StartToCloseTimeout**: Max time for a single Activity Task Execution. Prefer this over ScheduleToCloseTimeout.
+- **ScheduleToCloseTimeout**: Total time including retries.
+- **ScheduleToStartTimeout**: Time an Activity Task can wait in the Task Queue before a Worker picks it up. Rarely needed.
+- **HeartbeatTimeout**: Max time between heartbeats. Required for long-running activities to detect failures.
+
+Either `StartToCloseTimeout` or `ScheduleToCloseTimeout` must be set.
+
+## Workflow Failure
+
+Returning any error from a workflow function fails the execution. Return `nil` for success.
+
+**Important Go-specific behavior:** In the Go SDK, returning any error from a workflow fails the workflow execution by default — there is no automatic retry. This differs from other SDKs (Python, TypeScript) where non-`ApplicationError` exceptions cause the workflow task to retry indefinitely.
In Go, if you want workflow-level retries, you must explicitly set a `RetryPolicy` on the `StartWorkflowOptions`.
+
+```go
+func MyWorkflow(ctx workflow.Context) (string, error) {
+	if someCondition {
+		return "", temporal.NewApplicationError(
+			"Cannot process order",
+			"BusinessError",
+		)
+	}
+	return "success", nil
+}
+```
+
+If a workflow-level `RetryPolicy` is set, return a non-retryable error to prevent retries:
+
+```go
+return "", temporal.NewNonRetryableApplicationError(
+	"Unrecoverable failure",
+	"FatalError",
+	nil,
+)
+```
+
+**Note:** If an activity returns a non-retryable error, the workflow receives a `*temporal.ActivityError` wrapping it. To fail the workflow without retry, wrap it in a new `NewNonRetryableApplicationError`.
+
+## Best Practices
+
+1. Use specific error types for different failure modes
+2. Mark permanent failures as non-retryable
+3. Set appropriate timeouts; prefer `StartToCloseTimeout` over `ScheduleToCloseTimeout`
+4. Let Temporal handle retries via RetryPolicy rather than implementing retry logic yourself
+5. Use `errors.As` to unwrap and inspect specific error types
+6. Design activities to be idempotent for safe retries (see `references/core/patterns.md`)
diff --git a/references/go/go.md b/references/go/go.md
new file mode 100644
index 0000000..cc87a6a
--- /dev/null
+++ b/references/go/go.md
@@ -0,0 +1,242 @@
+# Temporal Go SDK Reference
+
+## Overview
+
+The Temporal Go SDK (`go.temporal.io/sdk`) provides a strongly-typed, idiomatic Go approach to building durable workflows. Workflows are regular exported Go functions. The Go SDK does not have an automatic sandbox -- determinism is the developer's responsibility, aided by the `workflowcheck` static analysis tool.
+ +## Quick Start + +**Add Dependency:** In your Go module, add the Temporal SDK: +```bash +go get go.temporal.io/sdk +``` + +**workflows/greeting.go** - Workflow definition: +```go +package workflows + +import ( + "time" + + "go.temporal.io/sdk/workflow" +) + +func GreetingWorkflow(ctx workflow.Context, name string) (string, error) { + ao := workflow.ActivityOptions{ + StartToCloseTimeout: time.Minute, + } + ctx = workflow.WithActivityOptions(ctx, ao) + + var result string + err := workflow.ExecuteActivity(ctx, "Greet", name).Get(ctx, &result) + if err != nil { + return "", err + } + return result, nil +} +``` + +**activities/greet.go** - Activity definition: +```go +package activities + +import ( + "context" + "fmt" +) + +type Activities struct{} + +func (a *Activities) Greet(ctx context.Context, name string) (string, error) { + return fmt.Sprintf("Hello, %s!", name), nil +} +``` + +**worker/main.go** - Worker setup: +```go +package main + +import ( + "log" + + "yourmodule/activities" + "yourmodule/workflows" + + "go.temporal.io/sdk/client" + "go.temporal.io/sdk/worker" +) + +func main() { + c, err := client.Dial(client.Options{}) + if err != nil { + log.Fatalln("Unable to create client", err) + } + defer c.Close() + + w := worker.New(c, "my-task-queue", worker.Options{}) + + w.RegisterWorkflow(workflows.GreetingWorkflow) + w.RegisterActivity(&activities.Activities{}) + + err = w.Run(worker.InterruptCh()) + if err != nil { + log.Fatalln("Unable to start worker", err) + } +} +``` + +**Start the dev server:** Start `temporal server start-dev` in the background. + +**Start the worker:** Run `go run worker/main.go` in the background. 
+ +**starter/main.go** - Start a workflow execution: +```go +package main + +import ( + "context" + "fmt" + "log" + + "yourmodule/workflows" + + "github.com/google/uuid" + "go.temporal.io/sdk/client" +) + +func main() { + c, err := client.Dial(client.Options{}) + if err != nil { + log.Fatalln("Unable to create client", err) + } + defer c.Close() + + options := client.StartWorkflowOptions{ + ID: uuid.NewString(), + TaskQueue: "my-task-queue", + } + + we, err := c.ExecuteWorkflow(context.Background(), options, workflows.GreetingWorkflow, "my name") + if err != nil { + log.Fatalln("Unable to execute workflow", err) + } + + var result string + err = we.Get(context.Background(), &result) + if err != nil { + log.Fatalln("Unable to get workflow result", err) + } + + fmt.Println("Result:", result) +} +``` + +**Run the workflow:** Run `go run starter/main.go`. Should output: `Result: Hello, my name!`. + +## Key Concepts + +### Workflow Definition +- Exported function with `workflow.Context` as the first parameter +- Returns `(ResultType, error)` or just `error` +- Signature: `func MyWorkflow(ctx workflow.Context, input MyInput) (MyOutput, error)` +- Use `workflow.SetQueryHandler()`, `workflow.SetUpdateHandler()` for handlers +- Register with `w.RegisterWorkflow(MyWorkflow)` + +### Activity Definition +- Regular function or struct methods with `context.Context` as the first parameter +- Struct methods are preferred for dependency injection +- Signature: `func (a *Activities) MyActivity(ctx context.Context, input string) (string, error)` +- Register struct with `w.RegisterActivity(&Activities{})` (registers all exported methods) + +### Worker Setup +- Create client with `client.Dial(client.Options{})` +- Create worker with `worker.New(c, "task-queue", worker.Options{})` +- Register workflows and activities +- Run with `w.Run(worker.InterruptCh())` + +### Determinism + +**Workflow code must be deterministic!** The Go SDK has no sandbox -- determinism is enforced by convention 
and tooling. + +Use Temporal replacements instead of native Go constructs: +- `workflow.Go()` instead of `go` (goroutines) +- `workflow.Channel` instead of `chan` +- `workflow.Selector` instead of `select` +- `workflow.Sleep()` instead of `time.Sleep()` +- `workflow.Now()` instead of `time.Now()` +- `workflow.GetLogger()` instead of `log` / `fmt.Println` for replay-safe logging + +Use the **`workflowcheck`** static analysis tool to catch non-deterministic code: +```bash +go install go.temporal.io/sdk/contrib/tools/workflowcheck@latest +workflowcheck ./... +``` + +Read `references/core/determinism.md` and `references/go/determinism.md` to understand more. + +## File Organization Best Practice + +**Use separate packages for workflows, activities, and worker.** Activities as struct methods enable dependency injection at the worker level. + +``` +myapp/ +├── workflows/ +│ └── greeting.go # Only Workflow functions +├── activities/ +│ └── greet.go # Activity struct and methods +├── worker/ +│ └── main.go # Worker setup, imports both +└── starter/ + └── main.go # Client code to start workflows +``` + +**Activities as struct methods for dependency injection:** +```go +// activities/greet.go +type Activities struct { + HTTPClient *http.Client + DB *sql.DB +} + +func (a *Activities) FetchData(ctx context.Context, url string) (string, error) { + // Use a.HTTPClient, a.DB, etc. +} +``` + +```go +// worker/main.go - inject dependencies at worker startup +activities := &activities.Activities{ + HTTPClient: http.DefaultClient, + DB: db, +} +w.RegisterActivity(activities) +``` + +## Common Pitfalls + +1. **Using native goroutines/channels/select** - Use `workflow.Go()`, `workflow.Channel`, `workflow.Selector` +2. **Using `time.Sleep` or `time.Now`** - Use `workflow.Sleep()` and `workflow.Now()` +3. **Iterating over maps with `range`** - Map iteration order is non-deterministic; sort keys first +4. 
**Forgetting to register workflows/activities** - Worker will fail tasks for unregistered types
+5. **Registering activity functions instead of struct** - Use `w.RegisterActivity(&Activities{})` not `w.RegisterActivity(a.MyMethod)`
+6. **Forgetting to heartbeat** - Long-running activities need `activity.RecordHeartbeat(ctx, details)`
+7. **Using `fmt.Println` in workflows** - Use `workflow.GetLogger(ctx)` for replay-safe logging
+8. **Not setting Activity timeouts** - `StartToCloseTimeout` or `ScheduleToCloseTimeout` is required in `ActivityOptions`
+
+## Writing Tests
+
+See `references/go/testing.md` for info on writing tests.
+
+## Additional Resources
+
+### Reference Files
+- **`references/go/patterns.md`** - Signals, queries, child workflows, saga pattern, etc.
+- **`references/go/determinism.md`** - Determinism rules, the **`workflowcheck`** static analysis tool, safe alternatives
+- **`references/go/gotchas.md`** - Go-specific mistakes and anti-patterns
+- **`references/go/error-handling.md`** - ApplicationError, retry policies, non-retryable errors
+- **`references/go/observability.md`** - Logging, metrics, tracing, Search Attributes
+- **`references/go/testing.md`** - TestWorkflowEnvironment, time-skipping, activity mocking
+- **`references/go/advanced-features.md`** - Schedules, worker tuning, and more
+- **`references/go/data-handling.md`** - Data converters, payload codecs, encryption
+- **`references/go/versioning.md`** - Patching API (`workflow.GetVersion`), Worker Versioning
diff --git a/references/go/gotchas.md b/references/go/gotchas.md
new file mode 100644
index 0000000..4b7ddf3
--- /dev/null
+++ b/references/go/gotchas.md
@@ -0,0 +1,290 @@
+# Go Gotchas
+
+Go-specific mistakes and anti-patterns. See also [Common Gotchas](references/core/gotchas.md) for language-agnostic concepts.
+ +## Goroutines and Concurrency + +### Using Native Go Concurrency Primitives + +**The Problem**: Native `go`, `chan`, and `select` are non-deterministic and will cause replay failures. + +```go +// BAD - Native goroutine +func MyWorkflow(ctx workflow.Context) error { + go func() { // Non-deterministic! + // do work + }() + return nil +} + +// GOOD - Use workflow.Go +func MyWorkflow(ctx workflow.Context) error { + workflow.Go(ctx, func(gCtx workflow.Context) { + // do work + }) + return nil +} +``` + +```go +// BAD - Native channel +func MyWorkflow(ctx workflow.Context) error { + ch := make(chan string) // Non-deterministic! + return nil +} + +// GOOD - Use workflow.Channel +func MyWorkflow(ctx workflow.Context) error { + ch := workflow.NewChannel(ctx) + return nil +} +``` + +```go +// BAD - Native select +select { +case val := <-ch1: + // handle +case val := <-ch2: + // handle +} + +// GOOD - Use workflow.Selector +selector := workflow.NewSelector(ctx) +selector.AddReceive(ch1, func(c workflow.ReceiveChannel, more bool) { + var val string + c.Receive(ctx, &val) + // handle +}) +selector.AddReceive(ch2, func(c workflow.ReceiveChannel, more bool) { + var val string + c.Receive(ctx, &val) + // handle +}) +selector.Select(ctx) +``` + +## Non-Deterministic Operations + +### Map Iteration + +```go +// BAD - Map range order is randomized +for k, v := range myMap { + // Non-deterministic order! 
+} + +// GOOD - Sort keys first +keys := make([]string, 0, len(myMap)) +for k := range myMap { + keys = append(keys, k) +} +sort.Strings(keys) +for _, k := range keys { + v := myMap[k] + // Deterministic order +} +``` + +### Time and Randomness + +```go +// BAD +t := time.Now() // System clock, non-deterministic +time.Sleep(time.Second) // Not replay-safe +r := rand.Intn(100) // Non-deterministic + +// GOOD +t := workflow.Now(ctx) // Deterministic +workflow.Sleep(ctx, time.Second) // Durable timer +encoded := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} { + return rand.Intn(100) +}) +var r int +encoded.Get(&r) +``` + +Use the `workflowcheck` static analysis tool to catch non-deterministic calls. For false positives, annotate with `//workflowcheck:ignore` on the line above. + +### Anonymous Functions as Local Activities + +**The Problem**: The Go SDK derives the local activity name from the function. Anonymous functions get a non-deterministic name that can change across builds, causing replay failures. + +```go +// BAD - anonymous function: name is non-deterministic +workflow.ExecuteLocalActivity(ctx, func(ctx context.Context) (string, error) { + return "result", nil +}) + +// GOOD - named function: stable, deterministic name +func QuickLookup(ctx context.Context) (string, error) { + return "result", nil +} + +workflow.ExecuteLocalActivity(ctx, QuickLookup) +``` + +Always use named functions for local activities (and regular activities). + +## Wrong Retry Classification + +**Example:** Transient network errors should be retried. Authentication errors should not be. +See `references/go/error-handling.md` for detailed guidance on error classification and retry policies. 
+ +## Heartbeating + +### Forgetting to Heartbeat Long Activities + +```go +// BAD - No heartbeat, can't detect stuck activities or receive cancellation +func ProcessLargeFile(ctx context.Context, path string) error { + for _, chunk := range readChunks(path) { + process(chunk) // Takes hours, no heartbeat + } + return nil +} + +// GOOD - Regular heartbeats with progress +func ProcessLargeFile(ctx context.Context, path string) error { + for i, chunk := range readChunks(path) { + activity.RecordHeartbeat(ctx, fmt.Sprintf("Processing chunk %d", i)) + process(chunk) + } + return nil +} +``` + +### Heartbeat Timeout Too Short + +```go +// BAD - Heartbeat timeout shorter than processing time +ao := workflow.ActivityOptions{ + StartToCloseTimeout: 30 * time.Minute, + HeartbeatTimeout: 10 * time.Second, // Too short! +} + +// GOOD - Heartbeat timeout allows for processing variance +ao := workflow.ActivityOptions{ + StartToCloseTimeout: 30 * time.Minute, + HeartbeatTimeout: 2 * time.Minute, +} +``` + +Set heartbeat timeout as high as acceptable for your use case -- each heartbeat counts as an action. + +## Cancellation + +### Not Handling Workflow Cancellation + +```go +// BAD - Cleanup doesn't run on cancellation +func BadWorkflow(ctx workflow.Context) error { + _ = workflow.ExecuteActivity(ctx, AcquireResource).Get(ctx, nil) + _ = workflow.ExecuteActivity(ctx, DoWork).Get(ctx, nil) + _ = workflow.ExecuteActivity(ctx, ReleaseResource).Get(ctx, nil) // Never runs if cancelled! 
+ return nil +} + +// GOOD - Use defer with NewDisconnectedContext for cleanup +func GoodWorkflow(ctx workflow.Context) error { + defer func() { + if !errors.Is(ctx.Err(), workflow.ErrCanceled) { + return + } + newCtx, _ := workflow.NewDisconnectedContext(ctx) + _ = workflow.ExecuteActivity(newCtx, ReleaseResource).Get(newCtx, nil) + }() + + err := workflow.ExecuteActivity(ctx, AcquireResource).Get(ctx, nil) + if err != nil { + return err + } + return workflow.ExecuteActivity(ctx, DoWork).Get(ctx, nil) +} +``` + +### Not Handling Activity Cancellation + +Activities must **opt in** to receive cancellation. This requires: +1. **Heartbeating** - Cancellation is delivered via heartbeat +2. **Checking ctx.Done()** - Detect when cancellation arrives + +```go +// BAD - Activity ignores cancellation +func LongActivity(ctx context.Context) error { + doExpensiveWork() // Runs to completion even if cancelled + return nil +} + +// GOOD - Heartbeat and check ctx.Done() +func LongActivity(ctx context.Context) error { + for i, item := range items { + select { + case <-ctx.Done(): + cleanup() + return ctx.Err() + default: + activity.RecordHeartbeat(ctx, fmt.Sprintf("Processing item %d", i)) + process(item) + } + } + return nil +} +``` + +## Testing + +### Not Testing Failures + +It is important to make sure workflows work as expected under failure paths in addition to happy paths. Please see `references/go/testing.md` for more info. + +### Not Testing Replay + +Replay tests help you test that you do not have hidden sources of non-determinism bugs in your workflow code, and should be considered in addition to standard testing. Please see `references/go/testing.md` for more info. + +## Timers and Sleep + +### Using time.Sleep Instead of workflow.Sleep + +```go +// BAD: time.Sleep is not deterministic during replay +func BadWorkflow(ctx workflow.Context) error { + time.Sleep(60 * time.Second) // Non-deterministic! 
+ return nil +} + +// GOOD: Use workflow.Sleep for deterministic timers +func GoodWorkflow(ctx workflow.Context) error { + workflow.Sleep(ctx, 60*time.Second) // Deterministic + return nil +} +``` + +### Using time.After Instead of workflow.NewTimer + +```go +// BAD: time.After is not replay-safe +func BadWorkflow(ctx workflow.Context) error { + <-time.After(5 * time.Minute) // Non-deterministic! + return nil +} + +// GOOD: Use workflow.NewTimer for durable timers +func GoodWorkflow(ctx workflow.Context) error { + timer := workflow.NewTimer(ctx, 5*time.Minute) + _ = timer.Get(ctx, nil) // Deterministic, durable + return nil +} +``` + +### Using time.Now() Instead of workflow.Now() + +```go +// BAD: time.Now() differs between execution and replay +deadline := time.Now().Add(24 * time.Hour) + +// GOOD: workflow.Now() is replay-safe +deadline := workflow.Now(ctx).Add(24 * time.Hour) +``` + +**Why this matters:** `time.Now()`, `time.Sleep()`, and `time.After()` use the system clock, which differs between original execution and replay. The `workflow.*` equivalents create durable, deterministic entries in the event history. diff --git a/references/go/observability.md b/references/go/observability.md new file mode 100644 index 0000000..ba55140 --- /dev/null +++ b/references/go/observability.md @@ -0,0 +1,153 @@ +# Go SDK Observability + +## Overview + +The Go SDK provides replay-safe logging via `workflow.GetLogger`, metrics via the Tally library with Prometheus export, and tracing via OpenTelemetry, OpenTracing, or Datadog. + +## Logging / Replay-Aware Logging + +### Workflow Logging + +Use `workflow.GetLogger(ctx)` for replay-safe logging. This logger automatically suppresses duplicate messages during replay. 
+ +```go +func MyWorkflow(ctx workflow.Context, input string) (string, error) { + logger := workflow.GetLogger(ctx) + logger.Info("Workflow started", "input", input) + + var result string + err := workflow.ExecuteActivity(ctx, MyActivity, input).Get(ctx, &result) + if err != nil { + logger.Error("Activity failed", "error", err) + return "", err + } + + logger.Info("Workflow completed", "result", result) + return result, nil +} +``` + +The workflow logger automatically: +- Suppresses duplicate logs during replay +- Includes workflow context (workflow ID, run ID, etc.) + +### Activity Logging + +Use `activity.GetLogger(ctx)` for context-aware activity logging: + +```go +func MyActivity(ctx context.Context, input string) (string, error) { + logger := activity.GetLogger(ctx) + logger.Info("Processing input", "input", input) + // ... + return "done", nil +} +``` + +Activity logger includes: +- Activity ID, type, and task queue +- Workflow ID and run ID +- Attempt number (for retries) + +### Adding Persistent Fields + +Use `log.With` to create a logger with key-value pairs included in every entry: + +```go +logger := log.With(workflow.GetLogger(ctx), "orderId", orderId, "customerId", customerId) +logger.Info("Processing order") // includes orderId and customerId +``` + +## Customizing the Logger + +Set a custom logger via `client.Options{Logger: myLogger}`. Implement the `log.Logger` interface (Debug, Info, Warn, Error methods). + +### Using slog (Go 1.21+) + +```go +import ( + "log/slog" + "os" + + tlog "go.temporal.io/sdk/log" +) + +slogHandler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}) +logger := tlog.NewStructuredLogger(slog.New(slogHandler)) + +c, err := client.Dial(client.Options{ + Logger: logger, +}) +``` + +### Using Third-Party Loggers (Logrus, Zap, etc.) 
+ +Use the [logur](https://github.com/logur/logur) adapter package: + +```go +import ( + "github.com/sirupsen/logrus" + logrusadapter "logur.dev/adapter/logrus" + "logur.dev/logur" +) + +logger := logur.LoggerToKV(logrusadapter.New(logrus.New())) +c, err := client.Dial(client.Options{ + Logger: logger, +}) +``` + +## Metrics + +Use the Tally library (`go.temporal.io/sdk/contrib/tally`) with Prometheus: + +```go +import ( + sdktally "go.temporal.io/sdk/contrib/tally" + "github.com/uber-go/tally/v4" + "github.com/uber-go/tally/v4/prometheus" +) + +func newPrometheusScope(c prometheus.Configuration) tally.Scope { + reporter, err := c.NewReporter( + prometheus.ConfigurationOptions{}, + ) + if err != nil { + log.Fatalln("error creating prometheus reporter", err) + } + scopeOpts := tally.ScopeOptions{ + CacheReporter: reporter, + Separator: "_", + SanitizeOptions: &sdktally.PrometheusSanitizeOptions, + } + scope, _ := tally.NewRootScope(scopeOpts, time.Second) + scope = sdktally.NewPrometheusNamingScope(scope) + return scope +} + +c, err := client.Dial(client.Options{ + MetricsHandler: sdktally.NewMetricsHandler(newPrometheusScope(prometheus.Configuration{ + ListenAddress: "0.0.0.0:9090", + TimerType: "histogram", + })), +}) +``` + +Key SDK metrics: +- `temporal_workflow_task_execution_latency` -- Workflow task processing time +- `temporal_activity_execution_latency` -- Activity execution time +- `temporal_workflow_task_replay_latency` -- Replay duration +- `temporal_request` -- Client requests to server +- `temporal_activity_schedule_to_start_latency` -- Time from scheduling to start + +## Search Attributes (Visibility) + +See the Search Attributes section of `references/go/data-handling.md` + +## Best Practices + +1. Always use `workflow.GetLogger(ctx)` in workflows -- never `fmt.Println` or `log.Println` (they produce duplicates on replay) +2. Use `activity.GetLogger(ctx)` in activities for structured context +3. Set up Prometheus metrics in production +4. 
Use search attributes for operational visibility and debugging +5. Use `workflow.IsReplaying(ctx)` only for custom side-effect-free logging -- the built-in logger handles replay suppression automatically diff --git a/references/go/patterns.md b/references/go/patterns.md new file mode 100644 index 0000000..732083f --- /dev/null +++ b/references/go/patterns.md @@ -0,0 +1,536 @@ +# Go SDK Patterns + +## Signals + +In Go, signals are received via channels, not handler functions. + +```go +func OrderWorkflow(ctx workflow.Context) (string, error) { + approved := false + var items []string + + approveCh := workflow.GetSignalChannel(ctx, "approve") + addItemCh := workflow.GetSignalChannel(ctx, "add-item") + + // Listen for signals in a goroutine so workflow can proceed + workflow.Go(ctx, func(ctx workflow.Context) { + for { + selector := workflow.NewSelector(ctx) + selector.AddReceive(approveCh, func(c workflow.ReceiveChannel, more bool) { + c.Receive(ctx, &approved) + }) + selector.AddReceive(addItemCh, func(c workflow.ReceiveChannel, more bool) { + var item string + c.Receive(ctx, &item) + items = append(items, item) + }) + selector.Select(ctx) + } + }) + + // Wait for approval + workflow.Await(ctx, func() bool { return approved }) + return fmt.Sprintf("Processed %d items", len(items)), nil +} +``` + +### Blocking receive from a single channel + +When waiting on a single signal, no Selector is needed: + +```go +var approveInput ApproveInput +workflow.GetSignalChannel(ctx, "approve").Receive(ctx, &approveInput) +``` + +## Queries + +**Important:** Queries must NOT modify workflow state. Query handlers run outside workflow context -- do not call `workflow.Go()`, `workflow.NewChannel()`, or any blocking workflow functions. 
+ +```go +func StatusWorkflow(ctx workflow.Context) error { + currentState := "started" + progress := 0 + + err := workflow.SetQueryHandler(ctx, "get-status", func() (string, error) { + return currentState, nil + }) + if err != nil { + return err + } + + err = workflow.SetQueryHandler(ctx, "get-progress", func() (int, error) { + return progress, nil + }) + if err != nil { + return err + } + + // Workflow logic updates currentState and progress as it runs + currentState = "running" + for i := 0; i < 100; i++ { + progress = i + err := workflow.ExecuteActivity( + workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: time.Minute, + }), + ProcessItem, i, + ).Get(ctx, nil) + if err != nil { + currentState = "failed" + return err + } + } + currentState = "done" + return nil +} +``` + +## Updates + +```go +func OrderWorkflow(ctx workflow.Context) (int, error) { + var items []string + + err := workflow.SetUpdateHandlerWithOptions( + ctx, + "add-item", + func(ctx workflow.Context, item string) (int, error) { + // Handler can mutate workflow state and return a value + items = append(items, item) + return len(items), nil + }, + workflow.UpdateHandlerOptions{ + Validator: func(ctx workflow.Context, item string) error { + if item == "" { + return fmt.Errorf("item cannot be empty") + } + if len(items) >= 100 { + return fmt.Errorf("order is full") + } + return nil + }, + }, + ) + if err != nil { + return 0, err + } + + // Block until cancelled + _ = ctx.Done().Receive(ctx, nil) + return len(items), nil +} +``` + +**Important:** Validators must NOT mutate workflow state or do anything blocking (no activities, sleeps, or other commands). They are read-only, similar to query handlers. Return an error to reject the update; return `nil` to accept. 
+ +## Child Workflows + +```go +func ParentWorkflow(ctx workflow.Context, orders []Order) ([]string, error) { + cwo := workflow.ChildWorkflowOptions{ + WorkflowExecutionTimeout: 30 * time.Minute, + } + ctx = workflow.WithChildOptions(ctx, cwo) + + var results []string + for _, order := range orders { + var result string + err := workflow.ExecuteChildWorkflow(ctx, ProcessOrderWorkflow, order).Get(ctx, &result) + if err != nil { + return nil, err + } + results = append(results, result) + } + return results, nil +} +``` + +### Child Workflow Options + +```go +import enumspb "go.temporal.io/api/enums/v1" + +cwo := workflow.ChildWorkflowOptions{ + WorkflowID: fmt.Sprintf("child-%s", workflow.GetInfo(ctx).WorkflowExecution.ID), + + // ParentClosePolicy - what happens to child when parent closes + // PARENT_CLOSE_POLICY_TERMINATE (default), PARENT_CLOSE_POLICY_ABANDON, PARENT_CLOSE_POLICY_REQUEST_CANCEL + ParentClosePolicy: enumspb.PARENT_CLOSE_POLICY_ABANDON, + + WorkflowExecutionTimeout: 10 * time.Minute, + WorkflowTaskTimeout: time.Minute, +} +ctx = workflow.WithChildOptions(ctx, cwo) + +future := workflow.ExecuteChildWorkflow(ctx, ChildWorkflow, input) + +// Wait for child to start (important for ABANDON policy) +if err := future.GetChildWorkflowExecution().Get(ctx, nil); err != nil { + return err +} +``` + +## Handles to External Workflows + +```go +func CoordinatorWorkflow(ctx workflow.Context, targetWorkflowID string) error { + // Signal an external workflow + err := workflow.SignalExternalWorkflow(ctx, targetWorkflowID, "", "data-ready", payload).Get(ctx, nil) + if err != nil { + return err + } + + // Cancel an external workflow + err = workflow.RequestCancelExternalWorkflow(ctx, targetWorkflowID, "").Get(ctx, nil) + return err +} +``` + +## Parallel Execution + +Use `workflow.Go` to launch parallel work and `workflow.Selector` to collect results. 
+ +```go +func ParallelWorkflow(ctx workflow.Context, items []string) ([]string, error) { + actCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 5 * time.Minute, + }) + + // Launch activities in parallel + futures := make([]workflow.Future, len(items)) + for i, item := range items { + futures[i] = workflow.ExecuteActivity(actCtx, ProcessItem, item) + } + + // Collect all results + results := make([]string, len(items)) + for i, future := range futures { + if err := future.Get(ctx, &results[i]); err != nil { + return nil, err + } + } + return results, nil +} +``` + +### Using workflow.Go for background goroutines + +```go +ch := workflow.NewChannel(ctx) + +workflow.Go(ctx, func(ctx workflow.Context) { + // Background work + var result string + _ = workflow.ExecuteActivity(actCtx, SomeActivity).Get(ctx, &result) + ch.Send(ctx, result) +}) + +var result string +ch.Receive(ctx, &result) +``` + +## Selector Pattern + +`workflow.Selector` replaces Go's native `select` -- required for deterministic workflow execution. Use it to wait on multiple channels, futures, and timers simultaneously. 
+ +```go +func ApprovalWorkflow(ctx workflow.Context) (string, error) { + actCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 5 * time.Minute, + }) + + var outcome string + signalCh := workflow.GetSignalChannel(ctx, "approve") + actFuture := workflow.ExecuteActivity(actCtx, AutoReviewActivity) + + // Cancel timer if signal or activity wins + timerCtx, cancelTimer := workflow.WithCancel(ctx) + timer := workflow.NewTimer(timerCtx, 24*time.Hour) + + selector := workflow.NewSelector(ctx) + + // Branch 1: Signal received + selector.AddReceive(signalCh, func(c workflow.ReceiveChannel, more bool) { + var approved bool + c.Receive(ctx, &approved) + cancelTimer() + if approved { + outcome = "approved-by-signal" + } else { + outcome = "rejected-by-signal" + } + }) + + // Branch 2: Activity completed + selector.AddFuture(actFuture, func(f workflow.Future) { + var result string + _ = f.Get(ctx, &result) + cancelTimer() + outcome = result + }) + + // Branch 3: Timeout + selector.AddFuture(timer, func(f workflow.Future) { + if err := f.Get(ctx, nil); err == nil { + outcome = "timed-out" + } + // If timer was cancelled, err is CanceledError -- ignore + }) + + selector.Select(ctx) // Blocks until one branch fires + return outcome, nil +} +``` + +Key points: +- `AddReceive(channel, callback)` -- fires when a channel has a message (must consume with `c.Receive`) +- `AddFuture(future, callback)` -- fires when a future resolves (once per Selector) +- `AddDefault(callback)` -- fires immediately if nothing else is ready +- `Select(ctx)` -- blocks until one branch fires; call multiple times to process multiple events + +## Continue-as-New + +```go +func LongRunningWorkflow(ctx workflow.Context, state WorkflowState) (string, error) { + for { + state = processBatch(ctx, state) + + if state.IsComplete { + return "done", nil + } + + // Check if history is getting large + if workflow.GetInfo(ctx).GetContinueAsNewSuggested() { + return "", 
workflow.NewContinueAsNewError(ctx, LongRunningWorkflow, state) + } + } +} +``` + +Drain signals before continue-as-new to avoid signal loss: + +```go +for { + var signalVal string + ok := signalChan.ReceiveAsync(&signalVal) + if !ok { + break + } + // process signal +} +return "", workflow.NewContinueAsNewError(ctx, LongRunningWorkflow, state) +``` + +## Cancellation Handling + +Use `ctx.Done()` to detect cancellation and `workflow.NewDisconnectedContext` for cleanup that must run even after cancellation. + +```go +func MyWorkflow(ctx workflow.Context) error { + actCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: time.Hour, + }) + + err := workflow.ExecuteActivity(actCtx, LongRunningActivity).Get(ctx, nil) + if err != nil && temporal.IsCanceledError(ctx.Err()) { + // Workflow was cancelled -- run cleanup with a disconnected context + workflow.GetLogger(ctx).Info("Workflow cancelled, running cleanup") + disconnectedCtx, _ := workflow.NewDisconnectedContext(ctx) + disconnectedCtx = workflow.WithActivityOptions(disconnectedCtx, workflow.ActivityOptions{ + StartToCloseTimeout: 5 * time.Minute, + }) + _ = workflow.ExecuteActivity(disconnectedCtx, CleanupActivity).Get(disconnectedCtx, nil) + return err // Return CanceledError + } + return err +} +``` + +## Saga Pattern (Compensations) + +**Important:** Compensation activities should be idempotent -- they may be retried (as with ALL activities). + +Use `workflow.NewDisconnectedContext` when running compensations so they execute even if the workflow is cancelled. + +```go +func OrderWorkflow(ctx workflow.Context, order Order) (string, error) { + actCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ + StartToCloseTimeout: 5 * time.Minute, + }) + + var compensations []func(ctx workflow.Context) error + + // Helper to run all compensations in reverse, using a disconnected context + // so compensations run even if the workflow is cancelled. 
+ runCompensations := func() { + disconnectedCtx, _ := workflow.NewDisconnectedContext(ctx) + compCtx := workflow.WithActivityOptions(disconnectedCtx, workflow.ActivityOptions{ + StartToCloseTimeout: 5 * time.Minute, + }) + for i := len(compensations) - 1; i >= 0; i-- { + if err := compensations[i](compCtx); err != nil { + workflow.GetLogger(ctx).Error("Compensation failed", "error", err) + } + } + } + + // Register compensation BEFORE running the activity. + // If the activity completes the effect but fails on return, + // we still need the compensation. + compensations = append(compensations, func(ctx workflow.Context) error { + return workflow.ExecuteActivity(ctx, ReleaseInventoryIfReserved, order).Get(ctx, nil) + }) + if err := workflow.ExecuteActivity(actCtx, ReserveInventory, order).Get(ctx, nil); err != nil { + runCompensations() + return "", err + } + + compensations = append(compensations, func(ctx workflow.Context) error { + return workflow.ExecuteActivity(ctx, RefundPaymentIfCharged, order).Get(ctx, nil) + }) + if err := workflow.ExecuteActivity(actCtx, ChargePayment, order).Get(ctx, nil); err != nil { + runCompensations() + return "", err + } + + if err := workflow.ExecuteActivity(actCtx, ShipOrder, order).Get(ctx, nil); err != nil { + runCompensations() + return "", err + } + + return "Order completed", nil +} +``` + +## Wait Condition with Timeout + +```go +func ApprovalWorkflow(ctx workflow.Context) (string, error) { + approved := false + + // Set up signal handler + workflow.Go(ctx, func(ctx workflow.Context) { + workflow.GetSignalChannel(ctx, "approve").Receive(ctx, &approved) + }) + + // Wait with 24-hour timeout -- returns (conditionMet, error) + conditionMet, err := workflow.AwaitWithTimeout(ctx, 24*time.Hour, func() bool { + return approved + }) + if err != nil { + return "", err + } + + if conditionMet { + return "approved", nil + } + return "auto-rejected due to timeout", nil +} +``` + +Without timeout: + +```go +err := workflow.Await(ctx, 
func() bool { return ready }) +``` + +## Waiting for All Handlers to Finish + +Signal and update handlers may run activities asynchronously. Use `workflow.Await` with `workflow.AllHandlersFinished` before completing or continuing-as-new to prevent the workflow from closing while handlers are still running. + +```go +func MyWorkflow(ctx workflow.Context) (string, error) { + // ... register handlers, main workflow logic ... + + // Before exiting, wait for all handlers to finish + err := workflow.Await(ctx, func() bool { + return workflow.AllHandlersFinished(ctx) + }) + if err != nil { + return "", err + } + return "done", nil +} +``` + +## Activity Heartbeat Details + +### WHY: +- **Support activity cancellation** -- Cancellations are delivered via heartbeat; activities that don't heartbeat won't know they've been cancelled +- **Resume progress after worker failure** -- Heartbeat details persist across retries + +### WHEN: +- **Cancellable activities** -- Any activity that should respond to cancellation +- **Long-running activities** -- Track progress for resumability +- **Checkpointing** -- Save progress periodically + +```go +func ProcessLargeFile(ctx context.Context, filePath string) (string, error) { + // Recover from previous attempt + startIdx := 0 + if activity.HasHeartbeatDetails(ctx) { + if err := activity.GetHeartbeatDetails(ctx, &startIdx); err == nil { + startIdx++ // Resume from next item + } + } + + lines := readFileLines(filePath) + + for i := startIdx; i < len(lines); i++ { + processLine(lines[i]) + + // Heartbeat with progress -- if cancelled, ctx will be cancelled + activity.RecordHeartbeat(ctx, i) + + if ctx.Err() != nil { + // Activity was cancelled + cleanup() + return "", ctx.Err() + } + } + + return "completed", nil +} +``` + +## Timers + +```go +func TimerWorkflow(ctx workflow.Context) (string, error) { + // Simple sleep + err := workflow.Sleep(ctx, time.Hour) + if err != nil { + return "", err + } + + // Timer as a Future -- for use with 
Selector
+	timerCtx, cancelTimer := workflow.WithCancel(ctx)
+	timer := workflow.NewTimer(timerCtx, 30*time.Minute)
+
+	selector := workflow.NewSelector(ctx)
+	selector.AddFuture(timer, func(f workflow.Future) {
+		// Timer fired
+	})
+	selector.Select(ctx)
+
+	// Cancel the timer context once the timer is no longer needed
+	cancelTimer()
+
+	return "Timer fired", nil
+}
+```
+
+## Local Activities
+
+**Purpose**: Reduce latency for short, lightweight operations by skipping the task queue. ONLY use these when necessary for performance. Do NOT use these by default: because they bypass the task queue, they give up the durability and load-distribution guarantees of normal Activities.
+
+```go
+func MyWorkflow(ctx workflow.Context) (string, error) {
+	lao := workflow.LocalActivityOptions{
+		StartToCloseTimeout: 5 * time.Second,
+	}
+	ctx = workflow.WithLocalActivityOptions(ctx, lao)
+
+	var result string
+	err := workflow.ExecuteLocalActivity(ctx, QuickLookup, "key").Get(ctx, &result)
+	if err != nil {
+		return "", err
+	}
+	return result, nil
+}
+```
diff --git a/references/go/testing.md b/references/go/testing.md
new file mode 100644
index 0000000..ab74bbd
--- /dev/null
+++ b/references/go/testing.md
@@ -0,0 +1,238 @@
+# Go SDK Testing
+
+## Overview
+
+The Go SDK provides the `testsuite` package for testing Workflows and Activities. It uses the [testify](https://github.com/stretchr/testify) library for assertions (`assert`/`require`) and mocking (`mock`). The test environment supports automatic time-skipping for Workflows with timers.
+
+## Test Environment Setup
+
+Two approaches: struct-based with `suite.Suite` or function-based with `testsuite.NewTestWorkflowEnvironment()`.
+
+**Approach 1: Struct-based (testify suite)**
+
+```go
+package sample
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/suite"
+
+	"go.temporal.io/sdk/testsuite"
+)
+
+type UnitTestSuite struct {
+	suite.Suite
+	testsuite.WorkflowTestSuite
+
+	env *testsuite.TestWorkflowEnvironment
+}
+
+func (s *UnitTestSuite) SetupTest() {
+	s.env = s.NewTestWorkflowEnvironment()
+}
+
+func (s *UnitTestSuite) AfterTest(suiteName, testName string) {
+	s.env.AssertExpectations(s.T())
+}
+
+func (s *UnitTestSuite) Test_MyWorkflow_Success() {
+	s.env.ExecuteWorkflow(MyWorkflow, "input")
+
+	s.True(s.env.IsWorkflowCompleted())
+	s.NoError(s.env.GetWorkflowError())
+}
+
+func TestUnitTestSuite(t *testing.T) {
+	suite.Run(t, new(UnitTestSuite))
+}
+```
+
+(Import `github.com/stretchr/testify/mock` once the suite contains `OnActivity` mock expectations; it is omitted above because Go rejects unused imports.)
+
+**Approach 2: Function-based**
+
+```go
+package sample
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"go.temporal.io/sdk/testsuite"
+)
+
+func Test_MyWorkflow(t *testing.T) {
+	testSuite := &testsuite.WorkflowTestSuite{}
+	env := testSuite.NewTestWorkflowEnvironment()
+	env.RegisterActivity(MyActivity)
+
+	env.ExecuteWorkflow(MyWorkflow, "input")
+	assert.True(t, env.IsWorkflowCompleted())
+	assert.NoError(t, env.GetWorkflowError())
+
+	var result string
+	assert.NoError(t, env.GetWorkflowResult(&result))
+	assert.Equal(t, "expected", result)
+}
+```
+
+You must register all Activity Definitions used by the Workflow with `env.RegisterActivity(ActivityFunc)`. The Workflow itself does not need to be registered.
+
+## Activity Mocking
+
+Mock activities with `env.OnActivity()` to test Workflow logic in isolation.
+ +**Return mock values:** + +```go +env.OnActivity(MyActivity, mock.Anything, mock.Anything).Return("mock_result", nil) +``` + +**Return a function replacement** (for parameter validation or custom logic): + +```go +env.OnActivity(MyActivity, mock.Anything, mock.Anything).Return( + func(ctx context.Context, input string) (string, error) { + // Custom logic, assertions, etc. + return "computed_result", nil + }, +) +``` + +**Match specific arguments:** + +```go +env.OnActivity(MyActivity, mock.Anything, "specific_input").Return("result", nil) +``` + +When using mocks, you do not need to call `env.RegisterActivity()` for that Activity. The mock signature must match the original Activity function signature. + +## Testing Signals and Queries + +Use `RegisterDelayedCallback` to send Signals during Workflow execution. Use `QueryWorkflow` to test query handlers. + +```go +func (s *UnitTestSuite) Test_SignalsAndQueries() { + // Register a delayed callback to send a signal after 5 seconds + s.env.RegisterDelayedCallback(func() { + s.env.SignalWorkflow("approve", SignalData{Approved: true}) + }, time.Second*5) + + s.env.ExecuteWorkflow(ApprovalWorkflow, input) + + s.True(s.env.IsWorkflowCompleted()) + s.NoError(s.env.GetWorkflowError()) +} +``` + +**Query a running Workflow** (must be called inside `RegisterDelayedCallback` or after `ExecuteWorkflow`): + +```go +s.env.RegisterDelayedCallback(func() { + res, err := s.env.QueryWorkflow("getProgress") + s.NoError(err) + + var progress int + err = res.Get(&progress) + s.NoError(err) + s.Equal(50, progress) +}, time.Second*10+time.Millisecond) +``` + +`QueryWorkflow` returns a `converter.EncodedValue`. Use `.Get(&result)` to decode the value. + +For "Signal-With-Start" testing, set the delay to `0`. 
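+
+A minimal sketch of that case, assuming a hypothetical Workflow that is normally Signal-With-Started with a `"start"` Signal (signal name and payload here are illustrative) -- a zero delay delivers the Signal before the first Workflow Task runs:
+
+```go
+// Zero delay: the signal is queued before the workflow's first task,
+// mirroring Signal-With-Start semantics in the test environment.
+s.env.RegisterDelayedCallback(func() {
+	s.env.SignalWorkflow("start", "initial-data")
+}, 0)
+
+s.env.ExecuteWorkflow(MyWorkflow, "input")
+s.True(s.env.IsWorkflowCompleted())
+```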
+
+## Testing Failure Cases
+
+```go
+func (s *UnitTestSuite) Test_WorkflowFailure() {
+	// Mock activity to return an error
+	s.env.OnActivity(MyActivity, mock.Anything, mock.Anything).Return(
+		"", errors.New("activity failed"))
+
+	s.env.ExecuteWorkflow(MyWorkflow, "input")
+
+	s.True(s.env.IsWorkflowCompleted())
+
+	err := s.env.GetWorkflowError()
+	s.Error(err)
+
+	var applicationErr *temporal.ApplicationError
+	s.True(errors.As(err, &applicationErr))
+	s.Equal("activity failed", applicationErr.Message())
+}
+```
+
+`env.GetWorkflowError()` returns the Workflow error. Use `errors.As(err, &applicationErr)` to check the error type, and `applicationErr.Message()` to read the original message (`Error()` appends type and retryability details). Mock activities returning errors to test Workflow error-handling paths.
+
+## Replay Testing
+
+Use `worker.NewWorkflowReplayer()` to verify that code changes do not break determinism. Load history from a JSON file exported via the Temporal CLI or Web UI.
+
+```go
+package sample
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"go.temporal.io/sdk/worker"
+)
+
+func Test_ReplayFromFile(t *testing.T) {
+	replayer := worker.NewWorkflowReplayer()
+	replayer.RegisterWorkflow(MyWorkflow)
+
+	err := replayer.ReplayWorkflowHistoryFromJSONFile(nil, "my_workflow_history.json")
+	assert.NoError(t, err)
+}
+```
+
+Export history via CLI: `temporal workflow show --workflow-id <workflow-id> --output json > history.json`
+
+**Replay from a programmatically fetched history:**
+
+```go
+func Test_ReplayFromServer(t *testing.T) {
+	// GetWorkflowHistory is a user-defined helper that fetches and assembles
+	// the execution's history (e.g., via client.GetWorkflowHistory's iterator)
+	hist, err := GetWorkflowHistory(ctx, client, workflowID, runID)
+	assert.NoError(t, err)
+
+	replayer := worker.NewWorkflowReplayer()
+	replayer.RegisterWorkflow(MyWorkflow)
+
+	err = replayer.ReplayWorkflowHistory(nil, hist)
+	assert.NoError(t, err)
+}
+```
+
+## Activity Testing
+
+Test Activities in isolation using `TestActivityEnvironment`. No Worker or Workflow needed.
+ +```go +func Test_MyActivity(t *testing.T) { + testSuite := &testsuite.WorkflowTestSuite{} + env := testSuite.NewTestActivityEnvironment() + env.RegisterActivity(MyActivity) + + val, err := env.ExecuteActivity(MyActivity, "input") + assert.NoError(t, err) + + var result string + assert.NoError(t, val.Get(&result)) + assert.Equal(t, "expected_output", result) +} +``` + +`ExecuteActivity` returns `(converter.EncodedValue, error)`. Use `val.Get(&result)` to extract the typed result. The Activity executes synchronously in the calling goroutine. + +## Best Practices + +1. Register all Activities used by the Workflow with `env.RegisterActivity()`, unless you mock them with `env.OnActivity()` +2. Use mocks to isolate Workflow logic from Activity implementations +3. Test failure paths by mocking Activities that return errors +4. Use replay testing before deploying Workflow code changes to catch non-determinism errors +5. Use unique task queues per test when running integration tests +6. Call `env.AssertExpectations(s.T())` in `AfterTest` to verify all mocks were called diff --git a/references/go/versioning.md b/references/go/versioning.md new file mode 100644 index 0000000..b6b6c27 --- /dev/null +++ b/references/go/versioning.md @@ -0,0 +1,232 @@ +# Go SDK Versioning + +For conceptual overview and guidance on choosing an approach, see `references/core/versioning.md`. + +## GetVersion API + +`workflow.GetVersion` safely performs backwards-incompatible changes to Workflow Definitions. It returns the version to branch on, recording the result as a marker in the Event History. 
+ +```go +v := workflow.GetVersion(ctx, "changeID", workflow.DefaultVersion, maxSupported) +``` + +- `changeID`: unique string identifying the change +- `minSupported`: oldest version still supported (`workflow.DefaultVersion` is `-1`) +- `maxSupported`: current/newest version +- Returns `maxSupported` for new executions; returns the recorded version on replay + +### Three-Step Lifecycle + +**Step 1: Add GetVersion with both code paths** + +Original code calls `ActivityA`. You want to replace it with `ActivityC`: + +```go +v := workflow.GetVersion(ctx, "Step1", workflow.DefaultVersion, 1) +if v == workflow.DefaultVersion { + // Old code path (for replay of existing workflows) + err = workflow.ExecuteActivity(ctx, ActivityA, data).Get(ctx, &result1) +} else { + // New code path + err = workflow.ExecuteActivity(ctx, ActivityC, data).Get(ctx, &result1) +} +``` + +For new executions, `GetVersion` returns `1` and records a marker. For replay of pre-change workflows (no marker), it returns `DefaultVersion` (`-1`). + +**Step 2: Remove old branch (increase minSupported)** + +After all `DefaultVersion` Workflow Executions have completed: + +```go +v := workflow.GetVersion(ctx, "Step1", 1, 1) +// Only the new code path remains +err = workflow.ExecuteActivity(ctx, ActivityC, data).Get(ctx, &result1) +``` + +Keep the `GetVersion` call even with a single branch. This ensures: +1. If an older execution replays on this code, it fails fast instead of proceeding incorrectly +2. 
If you need further changes, you just bump `maxSupported` + +**Step 3: Further changes (bump maxSupported)** + +Later, replace `ActivityC` with `ActivityD`: + +```go +v := workflow.GetVersion(ctx, "Step1", 1, 2) +if v == 1 { + err = workflow.ExecuteActivity(ctx, ActivityC, data).Get(ctx, &result1) +} else { + err = workflow.ExecuteActivity(ctx, ActivityD, data).Get(ctx, &result1) +} +``` + +After all version-1 executions complete, collapse again: + +```go +_ = workflow.GetVersion(ctx, "Step1", 2, 2) +err = workflow.ExecuteActivity(ctx, ActivityD, data).Get(ctx, &result1) +``` + +### Using GetVersion in Loops + +The return value for a given `changeID` is immutable once recorded. In loops, append the iteration number to the `changeID`: + +```go +for i := 0; i < 10; i++ { + v := workflow.GetVersion(ctx, fmt.Sprintf("myChange-%d", i), workflow.DefaultVersion, 1) + if v == workflow.DefaultVersion { + // old path + } else { + // new path + } +} +``` + +## Workflow Type Versioning + +Create a new Workflow Type for incompatible changes: + +```go +// Original +func MyWorkflow(ctx workflow.Context, input Input) (string, error) { + // v1 implementation +} + +// New version +func MyWorkflowV2(ctx workflow.Context, input Input) (string, error) { + // v2 implementation +} +``` + +Register both with the Worker: + +```go +w := worker.New(c, "my-task-queue", worker.Options{}) +w.RegisterWorkflow(MyWorkflow) +w.RegisterWorkflow(MyWorkflowV2) +``` + +Route new executions to the new type. Old workflows continue on the old type. Check for open executions before removing the old type: + +```bash +temporal workflow list --query 'WorkflowType = "MyWorkflow" AND ExecutionStatus = "Running"' +``` + +## Worker Versioning + +Worker Versioning manages versions at the deployment level, allowing multiple Worker versions to run simultaneously. + +### Key Concepts + +**Worker Deployment**: A logical service grouping similar Workers together (e.g., "loan-processor"). 
All versions of your code live under this umbrella. + +**Worker Deployment Version**: A specific snapshot of your code identified by a deployment name and Build ID (e.g., "loan-processor:v1.0" or "loan-processor:abc123"). + +### Configuring Workers for Versioning + +```go +w := worker.New(c, "my-task-queue", worker.Options{ + DeploymentOptions: worker.DeploymentOptions{ + UseVersioning: true, + Version: worker.WorkerDeploymentVersion{ + DeploymentName: "my-service", + BuildId: "v1.0.0", // or git commit hash + }, + DefaultVersioningBehavior: workflow.VersioningBehaviorPinned, + }, +}) +``` + +**Configuration fields:** +- `UseVersioning`: enables Worker Versioning +- `Version`: identifies the Worker Deployment Version (deployment name + build ID) +- `DefaultVersioningBehavior`: `VersioningBehaviorPinned` or `VersioningBehaviorAutoUpgrade` +- Build ID: typically a git commit hash, version number, or timestamp + +### PINNED vs AUTO_UPGRADE Behaviors + +**PINNED Behavior** + +Workflows stay locked to their original Worker version. + +**When to use PINNED:** +- Short-running workflows (minutes to hours) +- Consistency is critical (e.g., financial transactions) +- You want to eliminate version compatibility complexity +- Building new applications and want simplest development experience + +**AUTO_UPGRADE Behavior** + +Workflows can move to newer versions. + +**When to use AUTO_UPGRADE:** +- Long-running workflows (weeks or months) +- Workflows need to benefit from bug fixes during execution +- Migrating from traditional rolling deployments +- You are already using GetVersion for version transitions + +**Important:** AUTO_UPGRADE workflows still need GetVersion to handle version transitions safely since they can move between Worker versions. 
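+
+As a sketch of what that means in practice (the Activity names here are hypothetical), an AUTO_UPGRADE Workflow still branches with `GetVersion` around any replaced command:
+
+```go
+// Running under VersioningBehaviorAutoUpgrade: this Workflow may replay on a
+// newer Worker version, so the change is still guarded by GetVersion.
+v := workflow.GetVersion(ctx, "notify-change", workflow.DefaultVersion, 1)
+if v == workflow.DefaultVersion {
+	err = workflow.ExecuteActivity(actCtx, SendEmail, input).Get(ctx, nil)
+} else {
+	err = workflow.ExecuteActivity(actCtx, SendPush, input).Get(ctx, nil)
+}
+```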
+ +### Worker Configuration with Default Behavior + +```go +// For short-running workflows, prefer PINNED +w := worker.New(c, "orders-task-queue", worker.Options{ + DeploymentOptions: worker.DeploymentOptions{ + UseVersioning: true, + Version: worker.WorkerDeploymentVersion{ + DeploymentName: "order-service", + BuildId: os.Getenv("BUILD_ID"), + }, + DefaultVersioningBehavior: workflow.VersioningBehaviorPinned, + }, +}) +``` + +### Deployment Strategies + +**Blue-Green Deployments** + +Maintain two environments and switch traffic between them: +1. Deploy new code to idle environment +2. Run tests and validation +3. Switch traffic to new environment +4. Keep old environment for instant rollback + +**Rainbow Deployments** + +Multiple versions run simultaneously: +- New workflows use latest version +- Existing workflows complete on their original version +- Add new versions alongside existing ones +- Gradually sunset old versions as workflows complete + +This works well with Kubernetes where you manage multiple ReplicaSets running different Worker versions. + +Deploy a new version, then set it as current: + +```bash +temporal worker deployment set-current-version \ + --deployment-name my-service \ + --build-id v2.0.0 +``` + +### Querying Workflows by Worker Version + +```bash +# Find workflows on a specific Worker version +temporal workflow list --query \ + 'TemporalWorkerDeploymentVersion = "my-service:v1.0.0" AND ExecutionStatus = "Running"' +``` + +## Best Practices + +1. **Keep GetVersion calls** even when only a single branch remains -- it guards against stale replays and simplifies future changes +2. **Use `TemporalChangeVersion` search attribute** to find Workflows running on old versions: + ```bash + temporal workflow list --query \ + 'WorkflowType = "MyWorkflow" AND ExecutionStatus = "Running" AND TemporalChangeVersion = "Step1"' + ``` +3. **Test with replay** before removing old branches to verify determinism is preserved +4. 
**Prefer Worker Versioning** for large-scale deployments to avoid accumulating patching branches diff --git a/references/python/patterns.md b/references/python/patterns.md index 762977b..91ea80a 100644 --- a/references/python/patterns.md +++ b/references/python/patterns.md @@ -106,6 +106,8 @@ class OrderWorkflow: raise ValueError("Order is full") ``` +**Important:** Validators must NOT mutate workflow state or do anything blocking (no activities, sleeps, or other commands). They are read-only, similar to query handlers. Raise an exception to reject the update; return `None` to accept. + ## Child Workflows ```python diff --git a/references/typescript/patterns.md b/references/typescript/patterns.md index 878f9f0..fd7afb8 100644 --- a/references/typescript/patterns.md +++ b/references/typescript/patterns.md @@ -132,6 +132,8 @@ export async function orderWorkflow(): Promise { } ``` +**Important:** Validators must NOT mutate workflow state or do anything blocking (no activities, sleeps, or other commands). They are read-only, similar to query handlers. Throw an error to reject the update; return normally to accept. + ## Child Workflows ```typescript