-
Notifications
You must be signed in to change notification settings - Fork 17.3k
Add Execution API endpoints for callback lifecycle and token swap #66141
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| # Licensed to the Apache Software Foundation (ASF) under one | ||
| # or more contributor license agreements. See the NOTICE file | ||
| # distributed with this work for additional information | ||
| # regarding copyright ownership. The ASF licenses this file | ||
| # to you under the Apache License, Version 2.0 (the | ||
| # "License"); you may not use this file except in compliance | ||
| # with the License. You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, | ||
| # software distributed under the License is distributed on an | ||
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| # KIND, either express or implied. See the License for the | ||
| # specific language governing permissions and limitations | ||
| # under the License. | ||
| from __future__ import annotations | ||
|
|
||
| from enum import Enum | ||
|
|
||
| from airflow.api_fastapi.core_api.base import StrictBaseModel | ||
|
|
||
|
|
||
| class CallbackTerminalState(str, Enum): | ||
| """Terminal states a callback can transition to from RUNNING.""" | ||
|
|
||
| SUCCESS = "success" | ||
| FAILED = "failed" | ||
|
|
||
|
|
||
| class CallbackTerminalStatePayload(StrictBaseModel): | ||
| """Payload for transitioning a callback from RUNNING to a terminal state.""" | ||
|
|
||
| state: CallbackTerminalState | ||
| output: str | None = None |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,153 @@ | ||
| # Licensed to the Apache Software Foundation (ASF) under one | ||
| # or more contributor license agreements. See the NOTICE file | ||
| # distributed with this work for additional information | ||
| # regarding copyright ownership. The ASF licenses this file | ||
| # to you under the Apache License, Version 2.0 (the | ||
| # "License"); you may not use this file except in compliance | ||
| # with the License. You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, | ||
| # software distributed under the License is distributed on an | ||
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| # KIND, either express or implied. See the License for the | ||
| # specific language governing permissions and limitations | ||
| # under the License. | ||
| from __future__ import annotations | ||
|
|
||
| from typing import Annotated | ||
| from uuid import UUID | ||
|
|
||
| import structlog | ||
| from cadwyn import VersionedAPIRouter | ||
| from fastapi import Body, HTTPException, Response, Security, status | ||
| from structlog.contextvars import bind_contextvars | ||
|
|
||
| from airflow.api_fastapi.auth.tokens import JWTGenerator | ||
| from airflow.api_fastapi.common.db.common import SessionDep | ||
| from airflow.api_fastapi.execution_api.datamodels.callback import CallbackTerminalStatePayload | ||
| from airflow.api_fastapi.execution_api.datamodels.token import TIToken | ||
| from airflow.api_fastapi.execution_api.deps import DepContainer | ||
| from airflow.api_fastapi.execution_api.security import CurrentTIToken, ExecutionAPIRoute, require_auth | ||
| from airflow.models.callback import Callback | ||
| from airflow.utils.state import CallbackState | ||
|
|
||
| log = structlog.get_logger(__name__) | ||
|
|
||
| router = VersionedAPIRouter(route_class=ExecutionAPIRoute) | ||
|
|
||
|
|
||
| def _require_self(token: TIToken, callback_id: UUID) -> None: | ||
| """Mirror the ``ti:self`` enforcement from security.py for callback routes.""" | ||
| if str(token.id) != str(callback_id): | ||
| raise HTTPException( | ||
| status_code=status.HTTP_403_FORBIDDEN, | ||
| detail="Token subject does not match callback id", | ||
| ) | ||
|
|
||
|
|
||
| @router.post( | ||
| "/{callback_id}/run", | ||
| status_code=status.HTTP_204_NO_CONTENT, | ||
| dependencies=[Security(require_auth, scopes=["token:execution", "token:workload"])], | ||
| responses={ | ||
| status.HTTP_403_FORBIDDEN: {"description": "Token subject does not match callback id"}, | ||
| status.HTTP_404_NOT_FOUND: {"description": "Callback not found"}, | ||
| status.HTTP_409_CONFLICT: {"description": "Callback is not in a state that can be marked running"}, | ||
| }, | ||
| ) | ||
| def callback_run( | ||
| callback_id: UUID, | ||
| response: Response, | ||
| session: SessionDep, | ||
| services=DepContainer, | ||
| token: TIToken = CurrentTIToken, | ||
| ) -> Response: | ||
| """ | ||
| Mark a callback as RUNNING. | ||
|
|
||
| Mirrors ``PATCH /task-instances/{id}/run``: this is the single endpoint that | ||
| accepts a workload-scoped token and atomically (a) transitions the callback | ||
| from QUEUED to RUNNING and (b) issues a fresh execution-scoped token via the | ||
| ``Refreshed-API-Token`` response header. All subsequent supervisor calls hit | ||
| execution-only routes. | ||
| """ | ||
| bind_contextvars(callback_id=str(callback_id)) | ||
| _require_self(token, callback_id) | ||
|
|
||
| callback = session.get(Callback, callback_id) | ||
| if callback is None: | ||
| raise HTTPException( | ||
| status_code=status.HTTP_404_NOT_FOUND, | ||
| detail={"reason": "not_found", "message": "Callback not found"}, | ||
| ) | ||
|
|
||
| # Allow QUEUED → RUNNING transition; treat RUNNING as idempotent so a retried | ||
| # supervisor start does not 409. Anything else (PENDING / SCHEDULED / terminal) rejects. | ||
| if callback.state == CallbackState.RUNNING: | ||
| log.info("Duplicate start request received", callback_id=str(callback.id)) | ||
| elif callback.state == CallbackState.QUEUED: | ||
| callback.state = CallbackState.RUNNING | ||
| session.add(callback) | ||
| else: | ||
| raise HTTPException( | ||
| status_code=status.HTTP_409_CONFLICT, | ||
| detail={ | ||
| "reason": "invalid_state", | ||
| "message": "Callback was not in a state where it could be marked running", | ||
| "current_state": callback.state, | ||
| }, | ||
| ) | ||
|
|
||
| if token.claims.scope == "workload": | ||
| generator: JWTGenerator = services.get(JWTGenerator) | ||
| execution_token = generator.generate(extras={"sub": str(callback_id), "scope": "execution"}) | ||
| response.headers["Refreshed-API-Token"] = execution_token | ||
|
|
||
| response.status_code = status.HTTP_204_NO_CONTENT | ||
| return response | ||
|
|
||
|
|
||
| @router.patch( | ||
| "/{callback_id}/state", | ||
| status_code=status.HTTP_204_NO_CONTENT, | ||
| responses={ | ||
| status.HTTP_403_FORBIDDEN: {"description": "Token subject does not match callback id"}, | ||
| status.HTTP_404_NOT_FOUND: {"description": "Callback not found"}, | ||
| status.HTTP_409_CONFLICT: {"description": "Callback is not in RUNNING state"}, | ||
| }, | ||
| ) | ||
| def callback_update_state( | ||
| callback_id: UUID, | ||
| payload: Annotated[CallbackTerminalStatePayload, Body()], | ||
| session: SessionDep, | ||
| token: TIToken = CurrentTIToken, | ||
| ) -> Response: | ||
| """Mark a RUNNING callback as SUCCESS or FAILED.""" | ||
| bind_contextvars(callback_id=str(callback_id)) | ||
| _require_self(token, callback_id) | ||
|
|
||
| callback = session.get(Callback, callback_id) | ||
| if callback is None: | ||
| raise HTTPException( | ||
| status_code=status.HTTP_404_NOT_FOUND, | ||
| detail={"reason": "not_found", "message": "Callback not found"}, | ||
| ) | ||
|
|
||
| if callback.state != CallbackState.RUNNING: | ||
| raise HTTPException( | ||
| status_code=status.HTTP_409_CONFLICT, | ||
| detail={ | ||
| "reason": "invalid_state", | ||
| "message": "Callback was not in RUNNING state", | ||
| "current_state": callback.state, | ||
| }, | ||
| ) | ||
|
|
||
| callback.state = CallbackState(payload.state) | ||
| if payload.output is not None: | ||
| callback.output = payload.output | ||
| session.add(callback) | ||
|
|
||
| return Response(status_code=status.HTTP_204_NO_CONTENT) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -51,6 +51,7 @@ | |
| AddTeamNameField, | ||
| AddVariableKeysEndpoint, | ||
| ) | ||
| from airflow.api_fastapi.execution_api.versions.v2026_07_01 import AddCallbackEndpoints | ||
|
|
||
| bundle = VersionBundle( | ||
| HeadVersion(), | ||
|
|
@@ -66,6 +67,10 @@ | |
| AddAssetsByAliasEndpoint, | ||
| AddPartitionDateField, | ||
| ), | ||
| Version( | ||
| "2026-06-01", | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please reconcile before this merges: the version module is named
It works today (Cadwyn keys off the string, and |
||
| AddCallbackEndpoints, | ||
| ), | ||
| Version( | ||
| "2026-04-06", | ||
| AddPartitionKeyField, | ||
|
|
||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Since this has missed 3.3.0 we will want to future-date this version to ~2026-07-31 (and the release manager will fix the version when it's included in a release) |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,31 @@ | ||
| # Licensed to the Apache Software Foundation (ASF) under one | ||
| # or more contributor license agreements. See the NOTICE file | ||
| # distributed with this work for additional information | ||
| # regarding copyright ownership. The ASF licenses this file | ||
| # to you under the Apache License, Version 2.0 (the | ||
| # "License"); you may not use this file except in compliance | ||
| # with the License. You may obtain a copy of the License at | ||
| # | ||
| # http://www.apache.org/licenses/LICENSE-2.0 | ||
| # | ||
| # Unless required by applicable law or agreed to in writing, | ||
| # software distributed under the License is distributed on an | ||
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| # KIND, either express or implied. See the License for the | ||
| # specific language governing permissions and limitations | ||
| # under the License. | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| from cadwyn import VersionChange, endpoint | ||
|
|
||
|
|
||
| class AddCallbackEndpoints(VersionChange): | ||
|
wjddn279 marked this conversation as resolved.
|
||
| """Add the ``POST /callbacks/{callback_id}/run`` and ``PATCH /callbacks/{callback_id}/state`` endpoints.""" | ||
|
|
||
| description = __doc__ | ||
|
|
||
| instructions_to_migrate_to_previous_version = ( | ||
| endpoint("/callbacks/{callback_id}/run", ["POST"]).didnt_exist, | ||
| endpoint("/callbacks/{callback_id}/state", ["PATCH"]).didnt_exist, | ||
| ) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe this doc string is what shows up in the OpenAPI generated spec, and thus as inline docs for any clients generated off this.
As a result we should re-word this to make it be more user/client focused. (It's pretty close, I just don't think the "Mirrors PATCH" part is relevant to users. That can be a comment inside the fn itself if you think that is relevant.