-
Notifications
You must be signed in to change notification settings - Fork 147
Open
Labels
bug (Something isn't working)
Description
What are you really trying to do?
I have a parent workflow that spawns multiple child workflows. I want to write a test to make sure that cancelling one child workflow doesn't affect the execution of the others. I also have an activity that runs when a child workflow fails (excluding cancellation errors), and I want to test that as well.
Describe the bug
When I try to cancel a child workflow, I keep getting a warning from temporal_sdk_core about an invalid state transition:
2026-01-16T16:29:02.805388Z WARN temporal_sdk_core::worker::workflow::managed_run: Error while updating workflow error=Fatal("ActivityMachine in state ScheduledActivityCancelCommandCreated says the transition is invalid during event EventInfo { event_id: 10, event_type: ActivityTaskStarted }")
2026-01-16T16:29:02.805454Z WARN temporal_sdk_core::worker::workflow: Failing workflow task run_id=61d7166e-1f0f-4f45-8646-2ef967ee51c0 failure=Failure { failure: Some(Failure { message: "Fatal error in workflow machines: ActivityMachine in state ScheduledActivityCancelCommandCreated says the transition is invalid during event EventInfo { event_id: 10, event_type: ActivityTaskStarted }", source: "", stack_trace: "", encoded_attributes: None, cause: None, failure_info: Some(ApplicationFailureInfo(ApplicationFailureInfo { r#type: "", non_retryable: false, details: None, next_retry_delay: None })) }), force_cause: Unspecified }
As a result, the workflow is not cancelled, yet the `await handle.cancel()` call doesn't raise any exception.
Minimal Reproduction
I wrote the following reproducer based on your samples. It should fail on every run, although it may be a little flaky.
import asyncio
import logging
import pytest
import uuid
from dataclasses import dataclass
from datetime import timedelta
from temporalio import activity, workflow
from temporalio.client import WorkflowExecutionStatus
from temporalio.exceptions import (
ChildWorkflowError,
is_cancelled_exception,
)
from temporalio.service import RPCError
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker
@activity.defn
async def get_string() -> str:
    """Return a fixed placeholder string (awaited by the child workflow)."""
    placeholder = "blah"
    return placeholder
@dataclass()
class ComposeGreetingInput:
    """Argument bundle for ``ComposeGreetingWorkflow1.run``.

    Positional order is ``greeting`` first, then ``name``.
    """

    greeting: str  # e.g. "hello", "ciao", "hola"
    name: str  # who is being greeted
@workflow.defn
class ComposeGreetingWorkflow1:
    """Child workflow: await the ``get_string`` activity, then build a greeting."""

    @workflow.run
    async def run(self, input: ComposeGreetingInput) -> str:
        """Return ``"<greeting>, <name>!"``.

        The ``get_string`` activity result is only logged; it does not affect
        the returned value.

        Args:
            input: Greeting word and name to combine.
        """
        # workflow.logger is the SDK's replay-aware logger; the root `logging`
        # module would re-emit these lines every time the workflow replays.
        workflow.logger.info("start child: %s", input.greeting)
        r = await workflow.execute_activity(
            get_string,
            start_to_close_timeout=timedelta(seconds=10),
        )
        workflow.logger.info("%s: %s", input.greeting, r)
        return f"{input.greeting}, {input.name}!"
@dataclass()
class ErrorInput:
    """Single argument of the ``return_errors`` activity."""

    # Errors collected from failed child workflows. The instance is sent as an
    # activity argument, so its elements must be encodable by the worker's
    # data converter.
    errors: list
@activity.defn
async def return_errors(input: ErrorInput) -> None:
    """Log the child-workflow errors collected by the parent workflow.

    Despite its name, the activity returns nothing; it only logs.

    Args:
        input: Wrapper around the list of collected errors.
    """
    # Pass arguments to the logger lazily instead of pre-formatting with `%`,
    # and send both lines through activity.logger rather than mixing it with
    # the root `logging` module.
    activity.logger.info("Running activity with parameter %s", input)
    activity.logger.info("Errors: %s", input.errors)
@workflow.defn
class GreetingWorkflow:
    """Parent workflow: start one child per greeting and report child failures.

    Each child gets the fixed ID ``"<greeting>-wf-id"`` and the
    ``REQUEST_CANCEL`` parent-close policy. Children that end because they were
    cancelled are not treated as errors. Any other child failure is recorded and
    passed to the ``return_errors`` activity.
    """

    @workflow.run
    async def run(self, name: str) -> None:
        """Fan out the children, collect their outcomes and log the results.

        Args:
            name: Name placed in every child's ``ComposeGreetingInput``.
        """
        workflow.logger.info("start parent wf")
        results = []
        errors = []
        greetings = ("hello", "ciao", "hola")
        wfs = [
            workflow.execute_child_workflow(
                ComposeGreetingWorkflow1.run,
                arg=ComposeGreetingInput(greeting, name),
                id=f"{greeting}-wf-id",
                parent_close_policy=workflow.ParentClosePolicy.REQUEST_CANCEL,
            )
            for greeting in greetings
        ]
        # Process the children in the order in which they finish.
        for wf_execution in workflow.as_completed(wfs):
            try:
                result = await wf_execution
                results.append(result)
            except ChildWorkflowError as ex:
                workflow.logger.info("Child workflow ended with error: %s", ex)
                # A cancelled child is an expected outcome, not an error.
                if not is_cancelled_exception(ex):
                    # Exception objects cannot be encoded by the default JSON
                    # payload converter, so putting them into the activity
                    # argument would fail the workflow task. Send text instead.
                    errors.append(f"{ex} (cause: {ex.__cause__})")
        if errors:
            await workflow.execute_activity(
                return_errors,
                arg=ErrorInput(errors),
                start_to_close_timeout=timedelta(seconds=10),
            )
        workflow.logger.info("Results: %s", results)
@pytest.mark.asyncio
class TestChildWorkflowCancellation:
    """Cancelling one child workflow must not disturb its siblings."""

    async def test_wf_cancellation(self):
        """Cancel the ``ciao`` child and expect ``hello``/``hola`` to complete."""
        async with await WorkflowEnvironment.start_time_skipping() as env:
            client = env.client
            worker = Worker(
                client,
                task_queue="main",
                workflows=[GreetingWorkflow, ComposeGreetingWorkflow1],
                activities=[get_string, return_errors],
            )
            async with worker:
                parent = await client.start_workflow(
                    GreetingWorkflow.run,
                    "World",
                    id=str(uuid.uuid4()),
                    task_queue="main",
                )
                # Handles are bound by ID only; the children may not exist yet.
                survivors = [
                    client.get_workflow_handle(child_id)
                    for child_id in ("hello-wf-id", "hola-wf-id")
                ]
                victim = client.get_workflow_handle("ciao-wf-id")

                await asyncio.wait_for(
                    cancel_workflow_immediately(victim), timeout=5
                )
                await parent.result()

                for child in survivors:
                    final_status = await get_workflow_status(child)
                    assert final_status == WorkflowExecutionStatus.COMPLETED
async def cancel_workflow_immediately(handle, *, poll_interval: float = 0.05):
    """Cancel ``handle``'s workflow once it runs and assert it ends CANCELED.

    1. Poll until the workflow is RUNNING. ``get_workflow_status`` keeps
       retrying while the workflow cannot be described yet. If the workflow
       has already finished, skip straight to the final assertion instead of
       waiting for a RUNNING state that will never come.
    2. Call ``handle.cancel()``, retrying on ``RPCError``, until it succeeds.
    3. Poll until the workflow leaves RUNNING, because cancellation is not
       instantaneous. Then assert that the final status is CANCELED.

    Use it as ``asyncio.wait_for(cancel_workflow_immediately(h), timeout=X)``
    so that a workflow that never reaches the expected state cannot make the
    caller wait forever.

    Args:
        handle: Handle of the workflow to cancel.
        poll_interval: Seconds to wait between consecutive polls/retries, so
            the loops do not flood the server with back-to-back requests.

    Raises:
        AssertionError: If the workflow does not end in the CANCELED status.
    """
    running = WorkflowExecutionStatus.RUNNING
    # Statuses from which a workflow can never become RUNNING again.
    finished = {
        WorkflowExecutionStatus.COMPLETED,
        WorkflowExecutionStatus.FAILED,
        WorkflowExecutionStatus.CANCELED,
        WorkflowExecutionStatus.TERMINATED,
        WorkflowExecutionStatus.CONTINUED_AS_NEW,
        WorkflowExecutionStatus.TIMED_OUT,
    }

    status = await get_workflow_status(handle)
    while status != running and status not in finished:
        await asyncio.sleep(poll_interval)
        status = await get_workflow_status(handle)

    if status == running:
        while True:
            try:
                await handle.cancel()
                break
            except RPCError:
                await asyncio.sleep(poll_interval)
        # Cancellation is asynchronous: wait until the workflow stops running.
        while (status := await get_workflow_status(handle)) == running:
            await asyncio.sleep(poll_interval)

    assert status == WorkflowExecutionStatus.CANCELED, (
        f"Workflow not in Canceled status. Status: {status}"
    )
async def get_workflow_status(
    handle, *, poll_interval: float = 0.05
) -> WorkflowExecutionStatus:
    """Return the status reported by ``handle.describe()``, retrying on ``RPCError``.

    ``describe()`` can raise ``RPCError``, for example while a child workflow
    has not been started by its parent yet. The call is retried, with a
    ``poll_interval``-second pause between attempts instead of back-to-back
    requests.

    NOTE(review): every ``RPCError`` is retried indefinitely. Bound the caller
    with ``asyncio.wait_for`` if the server may be unreachable.

    Args:
        handle: Handle of the workflow to describe.
        poll_interval: Seconds to wait before retrying after an ``RPCError``.
    """
    while True:
        try:
            return (await handle.describe()).status
        except RPCError:
            await asyncio.sleep(poll_interval)
# Environment/Versions
- OS and processor: Linux
- Python SDK version: 1.8.0
- Are you using Docker or Kubernetes or building Temporal from source? No
I'm not sure whether this issue belongs here or in the https://github.com/temporalio/sdk-core repository.
Metadata
Metadata
Assignees
Labels
bug (Something isn't working)