Skip to content

[Bug] Unable to test child workflow cancellation: Invalid transition in activity #1280

@alemar99

Description

@alemar99

What are you really trying to do?

I have a parent workflow that spawns multiple child workflows. I want to write a test to make sure that cancelling one child workflow doesn't affect the execution of the others. I also have an activity that runs when child workflows fail (cancellations excluded), and I want to test that as well.

Describe the bug

When trying to cancel a workflow, I keep getting a warning from temporal_sdk_core about an invalid state transition:

2026-01-16T16:29:02.805388Z  WARN temporal_sdk_core::worker::workflow::managed_run: Error while updating workflow error=Fatal("ActivityMachine in state ScheduledActivityCancelCommandCreated says the transition is invalid during event EventInfo { 
event_id: 10, event_type: ActivityTaskStarted }")
2026-01-16T16:29:02.805454Z  WARN temporal_sdk_core::worker::workflow: Failing workflow task run_id=61d7166e-1f0f-4f45-8646-2ef967ee51c0 failure=Failure { failure: Some(Failure { message: "Fatal error in workflow machines: ActivityMachine in stat
e ScheduledActivityCancelCommandCreated says the transition is invalid during event EventInfo { event_id: 10, event_type: ActivityTaskStarted }", source: "", stack_trace: "", encoded_attributes: None, cause: None, failure_info: Some(ApplicationFa
ilureInfo(ApplicationFailureInfo { r#type: "", non_retryable: false, details: None, next_retry_delay: None })) }), force_cause: Unspecified }

As a result, the workflow is not cancelled and the await handle.cancel() call doesn't raise any exception.

Minimal Reproduction

I wrote the following reproducer based on your samples. It should fail on every run, although it may be slightly flaky.

import asyncio
import logging
import pytest
import uuid
from dataclasses import dataclass
from datetime import timedelta

from temporalio import activity, workflow
from temporalio.client import WorkflowExecutionStatus
from temporalio.exceptions import (
    ChildWorkflowError,
    is_cancelled_exception,
)
from temporalio.service import RPCError
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import Worker


@activity.defn
async def get_string() -> str:
    """Activity that always yields the same placeholder string."""
    placeholder = "blah"
    return placeholder


@dataclass
class ComposeGreetingInput:
    """Input payload for ``ComposeGreetingWorkflow1.run``."""

    # Greeting word placed at the start of the composed message.
    greeting: str
    # Name of the person being greeted; follows the greeting in the message.
    name: str


@workflow.defn
class ComposeGreetingWorkflow1:
    """Child workflow that runs one activity and then builds a greeting."""

    @workflow.run
    async def run(self, input: ComposeGreetingInput) -> str:
        """Run ``get_string`` and return ``"<greeting>, <name>!"``.

        The activity result is only logged; it is not part of the returned
        greeting.
        """
        activity_timeout = timedelta(seconds=10)

        logging.info(f"start child: {input.greeting}")
        r = await workflow.execute_activity(
            get_string, start_to_close_timeout=activity_timeout
        )
        logging.info(f"{input.greeting}: {r}")

        composed = f"{input.greeting}, {input.name}!"
        return composed


@dataclass
class ErrorInput:
    """Argument passed to the ``return_errors`` activity."""

    # Errors gathered by the parent workflow from child workflows that failed
    # for a reason other than cancellation.
    # NOTE(review): the elements presumably need to be serializable by the
    # configured data converter, since this is an activity input - confirm.
    errors: list


@activity.defn
async def return_errors(input: ErrorInput):
    """Activity that logs the errors collected by the parent workflow.

    The payload is logged once through the activity logger and once through
    the root ``logging`` module; nothing is returned.
    """
    # Hand the argument to the logger instead of pre-formatting with ``%``;
    # the emitted message is the same.
    activity.logger.info("Running activity with parameter %s", input)
    logging.info(f"Errors: {input.errors}")


@workflow.defn
class GreetingWorkflow:
    """Parent workflow that fans out one child workflow per greeting.

    Each child is awaited independently, so a failure or cancellation of one
    child does not stop the others from being collected. Child failures that
    are not cancellations are reported through the ``return_errors`` activity.
    """

    @workflow.run
    async def run(self, name: str) -> None:
        """Start all children, gather their outcomes, and report failures.

        Args:
            name: Name forwarded to every child workflow's input.
        """
        logging.info("start parent wf")
        results = []
        errors = []
        greetings = ("hello", "ciao", "hola")
        # Start every child up front so they run concurrently; the fixed IDs
        # let the test look the children up by name.
        wfs = [
            workflow.execute_child_workflow(
                ComposeGreetingWorkflow1.run,
                arg=ComposeGreetingInput(greeting, name),
                id=f"{greeting}-wf-id",
                parent_close_policy=workflow.ParentClosePolicy.REQUEST_CANCEL,
            )
            for greeting in greetings
        ]

        for wf_execution in workflow.as_completed(wfs):
            try:
                results.append(await wf_execution)
            except ChildWorkflowError as ex:
                logging.info(ex)
                if not is_cancelled_exception(ex):
                    # Store a textual description rather than the exception
                    # object: the list is sent as an activity argument, and
                    # exception instances cannot be encoded by the default
                    # JSON payload converter.
                    cause = ex.__cause__
                    errors.append(
                        f"{ex}: {cause}" if cause is not None else str(ex)
                    )

        if errors:
            await workflow.execute_activity(
                return_errors,
                arg=ErrorInput(errors),
                start_to_close_timeout=timedelta(seconds=10),
            )

        logging.info(f"Results: {results}")


@pytest.mark.asyncio
class TestChildWorkflowCancellation:
    """Checks that cancelling one child leaves its sibling children intact."""

    async def test_wf_cancellation(self):
        """Cancel the "ciao" child and expect "hello" and "hola" to complete."""
        queue = "main"
        async with (
            await WorkflowEnvironment.start_time_skipping()
        ) as env, Worker(
            env.client,
            task_queue=queue,
            workflows=[GreetingWorkflow, ComposeGreetingWorkflow1],
            activities=[get_string, return_errors],
        ):
            parent = await env.client.start_workflow(
                GreetingWorkflow.run,
                "World",
                id=str(uuid.uuid4()),
                task_queue=queue,
            )
            # Child IDs are fixed by the parent workflow, so handles can be
            # created by name without waiting for the children to exist.
            children = {
                greeting: env.client.get_workflow_handle(f"{greeting}-wf-id")
                for greeting in ("hello", "hola", "ciao")
            }

            await asyncio.wait_for(
                cancel_workflow_immediately(children["ciao"]), timeout=5
            )

            await parent.result()

            # The children that were not cancelled must have finished normally.
            for survivor in ("hello", "hola"):
                final_status = await get_workflow_status(children[survivor])
                assert final_status == WorkflowExecutionStatus.COMPLETED


async def cancel_workflow_immediately(handle):
    """Cancel the workflow behind ``handle`` as soon as it is running.

    The coroutine polls until the workflow reports ``RUNNING``, retries
    ``handle.cancel()`` until the call stops raising ``RPCError``, and then
    polls again until the workflow has left ``RUNNING``. It finally asserts
    that the resulting status is ``CANCELED``.

    None of the loops has a bound of its own, so wrap the call in
    ``asyncio.wait_for(cancel_workflow_immediately(h), timeout=X)`` to keep it
    from running forever.
    """
    running = WorkflowExecutionStatus.RUNNING

    # Phase 1: wait for the workflow to be reported as running.
    while True:
        status = await get_workflow_status(handle)
        if status == running:
            break

    # Phase 2: keep requesting cancellation until the RPC goes through.
    cancel_sent = False
    while not cancel_sent:
        try:
            await handle.cancel()
        except RPCError:
            continue
        cancel_sent = True

    # Phase 3: cancellation is asynchronous, so wait for the workflow to
    # actually leave the running state.
    while True:
        status = await get_workflow_status(handle)
        if status != running:
            break

    assert status == WorkflowExecutionStatus.CANCELED, (
        f"Workflow not in Canceled status. Status: {status}"
    )


async def get_workflow_status(handle) -> WorkflowExecutionStatus:
    """Return the workflow's current status, retrying on ``RPCError``.

    ``handle.describe()`` is called repeatedly, with no delay and no retry
    limit, until it succeeds; callers that need a bound must apply their own
    timeout.
    """
    while True:
        try:
            description = await handle.describe()
        except RPCError:
            # Describe failed; try again straight away.
            continue
        else:
            return description.status

Environment/Versions

  • OS and processor: Linux
  • Python SDK version: 1.8.0
  • Are you using Docker or Kubernetes or building Temporal from source? No

I don't know whether this issue belongs here or in the https://github.com/temporalio/sdk-core repository.

Metadata

Metadata

Assignees

No one assigned

    Labels

    bug: Something isn't working

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions