diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 68536fe6e7418..6b0836ce2b909 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -162,8 +162,15 @@ Dockerfile.ci @potiuk @ashb @gopidesupavan @amoghrajesh @jscheffl @bugraoz93 @ja # Python SDK /task-sdk/ @ashb @kaxil @amoghrajesh +# AIP-108 - Coordinators +/task-sdk/src/airflow/sdk/coordinators/ @jason810496 @uranusjr +/task-sdk/src/airflow/sdk/execution_time/coordinator.py @jason810496 @uranusjr + # Golang SDK -/go-sdk/ @ashb @amoghrajesh +/go-sdk/ @ashb @amoghrajesh @jason810496 + +# Java SDK +/java-sdk/ @uranusjr @jason810496 # Shared Libraries /shared/ @ashb @amoghrajesh @potiuk diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 68a622477fc6a..f0337bb052b80 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -42,19 +42,6 @@ updates: patterns: - "*" - - package-ecosystem: "github-actions" - directory: "/" - cooldown: - default-days: 4 - schedule: - # Check for updates to GitHub Actions every week - interval: "weekly" - target-branch: v2-11-test - groups: - github-actions-updates: - patterns: - - "*" - - package-ecosystem: pip cooldown: default-days: 4 @@ -260,39 +247,6 @@ updates: - dependency-name: "*" update-types: ["version-update:semver-major"] - # Repeat dependency updates on 2.11 branch as well - - package-ecosystem: pip - cooldown: - default-days: 4 - directories: - - /clients/python - - /dev/breeze - - /docker_tests - - / - schedule: - interval: "weekly" - target-branch: v2-11-test - groups: - pip-dependency-updates: - patterns: - - "*" - - - package-ecosystem: npm - cooldown: - default-days: 4 - directories: - - /airflow/www/ - schedule: - interval: "weekly" - target-branch: v2-11-test - groups: - legacy-ui-package-updates: - patterns: - - "*" - ignore: - - dependency-name: "*" - update-types: ["version-update:semver-major"] - - package-ecosystem: "uv" cooldown: default-days: 4 diff --git a/.github/workflows/additional-ci-image-checks.yml 
b/.github/workflows/additional-ci-image-checks.yml index 6319967ae2285..f66a6627917b0 100644 --- a/.github/workflows/additional-ci-image-checks.yml +++ b/.github/workflows/additional-ci-image-checks.yml @@ -137,7 +137,7 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: "Install Breeze" diff --git a/.github/workflows/additional-prod-image-tests.yml b/.github/workflows/additional-prod-image-tests.yml index 85124ef3801cc..bfb21345c4d20 100644 --- a/.github/workflows/additional-prod-image-tests.yml +++ b/.github/workflows/additional-prod-image-tests.yml @@ -143,7 +143,7 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: fetch-depth: 2 persist-credentials: false @@ -181,7 +181,7 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: fetch-depth: 2 persist-credentials: false @@ -212,7 +212,7 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: fetch-depth: 2 persist-credentials: false @@ -372,7 +372,7 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: 
actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: fetch-depth: 2 persist-credentials: false diff --git a/.github/workflows/airflow-distributions-tests.yml b/.github/workflows/airflow-distributions-tests.yml index 6927c23d29933..d6eb98fddbae3 100644 --- a/.github/workflows/airflow-distributions-tests.yml +++ b/.github/workflows/airflow-distributions-tests.yml @@ -90,7 +90,7 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: "Prepare breeze & CI image: ${{ matrix.python-version }}" diff --git a/.github/workflows/airflow-e2e-tests.yml b/.github/workflows/airflow-e2e-tests.yml index f047d2a86809c..3a87ea525f7b5 100644 --- a/.github/workflows/airflow-e2e-tests.yml +++ b/.github/workflows/airflow-e2e-tests.yml @@ -100,7 +100,7 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: fetch-depth: 2 persist-credentials: false diff --git a/.github/workflows/asf-allowlist-check.yml b/.github/workflows/asf-allowlist-check.yml index a46b53dfe0a37..7a1d0b4d56f80 100644 --- a/.github/workflows/asf-allowlist-check.yml +++ b/.github/workflows/asf-allowlist-check.yml @@ -28,7 +28,7 @@ jobs: asf-allowlist-check: runs-on: ubuntu-latest steps: - - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + - uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - uses: apache/infrastructure-actions/allowlist-check@4e9c961f587f72b170874b6f5cd4ac15f7f26eb8 
# main diff --git a/.github/workflows/backport-cli.yml b/.github/workflows/backport-cli.yml index 170493b1bb7ce..46fbe0cae6f57 100644 --- a/.github/workflows/backport-cli.yml +++ b/.github/workflows/backport-cli.yml @@ -53,7 +53,7 @@ jobs: steps: - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" id: checkout-for-backport - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: true fetch-depth: 0 diff --git a/.github/workflows/basic-tests.yml b/.github/workflows/basic-tests.yml index c6a9f4684cdd0..714aef1139ca4 100644 --- a/.github/workflows/basic-tests.yml +++ b/.github/workflows/basic-tests.yml @@ -87,7 +87,7 @@ jobs: - name: "Cleanup repo" shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + - uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: # Need to fetch all history for selective checks tests fetch-depth: 0 @@ -106,7 +106,7 @@ jobs: - name: "Cleanup repo" shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + - uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: fetch-depth: 0 persist-credentials: false @@ -115,7 +115,7 @@ jobs: - name: "Install SVN" run: sudo apt-get update && sudo apt-get install -y subversion - name: "Install Java (for Apache RAT)" - uses: actions/setup-java@be666c2fcd27ec809703dec50e508c2fdc7f6654 # v5.2.0 + uses: actions/setup-java@ad2b38190b15e4d6bdf0c97fb4fca8412226d287 # v5.3.0 with: distribution: 'temurin' java-version: '17' @@ -134,7 +134,7 @@ jobs: runs-on: ${{ fromJSON(inputs.runners) }} steps: - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # 
v7.0.0 with: fetch-depth: 1 persist-credentials: false @@ -159,7 +159,7 @@ jobs: if: inputs.run-scripts-tests == 'true' steps: - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: fetch-depth: 1 persist-credentials: false @@ -188,11 +188,11 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: Setup pnpm - uses: pnpm/action-setup@0e279bb959325dab635dd2c09392533439d90093 # v6.0.8 + uses: pnpm/action-setup@0ebf47130e4866e96fce0953f49152a61190b271 # v6.0.9 with: version: 9 run_install: false @@ -250,7 +250,7 @@ jobs: runs-on: ${{ fromJSON(inputs.runners) }} steps: - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: "Install Breeze" @@ -271,7 +271,7 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: "Install Breeze" @@ -285,7 +285,7 @@ jobs: platform: ${{ inputs.platform }} save-cache: true - name: Fetch incoming commit ${{ github.sha }} with its parent - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: ref: ${{ github.sha }} fetch-depth: 2 @@ -306,7 +306,7 @@ jobs: runs-on: ["windows-2025"] steps: - name: 
"Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: fetch-depth: 2 persist-credentials: false @@ -327,7 +327,7 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: "Install Breeze" @@ -374,7 +374,7 @@ jobs: FORCE_COLOR: 1 steps: - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: "Install uv" diff --git a/.github/workflows/ci-amd.yml b/.github/workflows/ci-amd.yml index 43276945a9c69..a1e07e1f16c15 100644 --- a/.github/workflows/ci-amd.yml +++ b/.github/workflows/ci-amd.yml @@ -26,10 +26,11 @@ name: Tests (AMD) on: # yamllint disable-line rule:truthy schedule: - # Mirror of the previous AMD canary cron from before the AMD/ARM split (PR #66348), - # offset by 30 min from ARM's `:28` slot in `ci-arm.yml` so the two scheduled - # canaries don't compete for runners at exactly the same minute. - - cron: '58 1,7,13,19 * * *' + # AMD canary runs 2x/day (01:58, 13:58), interleaved with ARM's 2x/day (07:28, 19:28) + # in `ci-arm.yml` so a full-matrix canary still runs roughly every ~6h (alternating + # architecture) while halving the scheduled AMD compute. The `:58` vs ARM's `:28` + # offset keeps the two from competing for runners at exactly the same minute. 
+ - cron: '58 1,13 * * *' pull_request: branches: - main @@ -165,11 +166,11 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: Fetch incoming commit ${{ github.sha }} with its parent - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: ref: ${{ github.sha }} fetch-depth: 2 @@ -362,7 +363,7 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: "Prepare breeze & CI image: ${{ needs.build-info.outputs.default-python-version }}" @@ -404,7 +405,7 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: "Prepare breeze & CI image: ${{ needs.build-info.outputs.default-python-version }}" @@ -939,7 +940,7 @@ jobs: VERBOSE: "true" steps: - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false # keep this in sync with go.mod in go-sdk/ @@ -976,11 +977,11 @@ jobs: VERBOSE: "true" steps: - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: 
actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: Setup Java - uses: actions/setup-java@be666c2fcd27ec809703dec50e508c2fdc7f6654 # v5.2.0 + uses: actions/setup-java@ad2b38190b15e4d6bdf0c97fb4fca8412226d287 # v5.3.0 with: distribution: 'temurin' java-version: ${{ env.JAVA_VERSION }} @@ -1006,7 +1007,7 @@ jobs: VERBOSE: "true" steps: - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: "Install Breeze" @@ -1109,7 +1110,7 @@ jobs: runs-on: ["ubuntu-22.04"] steps: - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: "Get failing jobs" @@ -1222,7 +1223,7 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: "Free up disk space" diff --git a/.github/workflows/ci-arm.yml b/.github/workflows/ci-arm.yml index 688408aa8e49d..71847cc44b5c7 100644 --- a/.github/workflows/ci-arm.yml +++ b/.github/workflows/ci-arm.yml @@ -26,7 +26,11 @@ name: Tests (ARM) on: # yamllint disable-line rule:truthy schedule: - - cron: '28 1,3,7,9,13,15,19,21 * * *' + # ARM canary runs 2x/day (07:28, 19:28), interleaved with AMD's 2x/day (01:58, 13:58) + # in `ci-amd.yml` so a full-matrix canary still runs roughly every ~6h (alternating + # architecture). The `:28` vs AMD's `:58` offset keeps the two from competing for + # runners at exactly the same minute. 
+ - cron: '28 7,19 * * *' push: # Post-merge pushes to release-prep / providers branches run on both # AMD and ARM (the matching block lives in the other wrapper too — @@ -155,11 +159,11 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: Fetch incoming commit ${{ github.sha }} with its parent - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: ref: ${{ github.sha }} fetch-depth: 2 @@ -352,7 +356,7 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: "Prepare breeze & CI image: ${{ needs.build-info.outputs.default-python-version }}" @@ -394,7 +398,7 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: "Prepare breeze & CI image: ${{ needs.build-info.outputs.default-python-version }}" @@ -929,7 +933,7 @@ jobs: VERBOSE: "true" steps: - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false # keep this in sync with go.mod in go-sdk/ @@ -966,11 +970,11 @@ jobs: VERBOSE: "true" steps: - name: "Checkout ${{ github.ref }} ( ${{ 
github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: Setup Java - uses: actions/setup-java@be666c2fcd27ec809703dec50e508c2fdc7f6654 # v5.2.0 + uses: actions/setup-java@ad2b38190b15e4d6bdf0c97fb4fca8412226d287 # v5.3.0 with: distribution: 'temurin' java-version: ${{ env.JAVA_VERSION }} @@ -996,7 +1000,7 @@ jobs: VERBOSE: "true" steps: - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: "Install Breeze" @@ -1099,7 +1103,7 @@ jobs: runs-on: ["ubuntu-22.04"] steps: - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: "Get failing jobs" @@ -1212,7 +1216,7 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: "Free up disk space" diff --git a/.github/workflows/ci-duration-monitor.yml b/.github/workflows/ci-duration-monitor.yml index bc0f093203abe..2f3e1398d29ab 100644 --- a/.github/workflows/ci-duration-monitor.yml +++ b/.github/workflows/ci-duration-monitor.yml @@ -37,7 +37,7 @@ jobs: runs-on: ubuntu-latest steps: - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false diff --git 
a/.github/workflows/ci-image-build.yml b/.github/workflows/ci-image-build.yml index d5a638dd44add..7f1fa61fe609e 100644 --- a/.github/workflows/ci-image-build.yml +++ b/.github/workflows/ci-image-build.yml @@ -118,7 +118,7 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout target branch" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: "Free up disk space" diff --git a/.github/workflows/ci-image-checks.yml b/.github/workflows/ci-image-checks.yml index d678fc2c096ae..5eba03f950cae 100644 --- a/.github/workflows/ci-image-checks.yml +++ b/.github/workflows/ci-image-checks.yml @@ -130,7 +130,7 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: "Prepare breeze & CI image: ${{ inputs.default-python-version }}" @@ -183,7 +183,7 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: "Prepare breeze & CI image: ${{ inputs.default-python-version }}" @@ -351,7 +351,7 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: "Prepare breeze & CI image: ${{ inputs.default-python-version }}" @@ -476,12 +476,12 @@ jobs: shell: bash run: sudo rm -rf 
${GITHUB_WORKSPACE}/* - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: fetch-depth: 2 persist-credentials: false - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: repository: "apache/airflow-client-python" fetch-depth: 1 diff --git a/.github/workflows/ci-notification.yml b/.github/workflows/ci-notification.yml index 5ed31a99ff51e..c896a94a937b8 100644 --- a/.github/workflows/ci-notification.yml +++ b/.github/workflows/ci-notification.yml @@ -43,7 +43,7 @@ jobs: runs-on: ubuntu-latest steps: - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index 624368905e3f6..20dbac2c97e7d 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -110,13 +110,13 @@ jobs: security-events: write steps: - name: Checkout repository - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: Setup Java if: matrix.language == 'java' - uses: actions/setup-java@be666c2fcd27ec809703dec50e508c2fdc7f6654 # v5.2.0 + uses: actions/setup-java@ad2b38190b15e4d6bdf0c97fb4fca8412226d287 # v5.3.0 with: distribution: 'temurin' java-version: '11' diff --git a/.github/workflows/e2e-flaky-tests-report.yml b/.github/workflows/e2e-flaky-tests-report.yml index 4deb9f3cb762a..b4408985236ef 100644 --- 
a/.github/workflows/e2e-flaky-tests-report.yml +++ b/.github/workflows/e2e-flaky-tests-report.yml @@ -36,7 +36,7 @@ jobs: runs-on: ubuntu-latest steps: - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false diff --git a/.github/workflows/finalize-tests.yml b/.github/workflows/finalize-tests.yml index 7d8f479969588..e03d6f35e98fe 100644 --- a/.github/workflows/finalize-tests.yml +++ b/.github/workflows/finalize-tests.yml @@ -99,7 +99,7 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: # Needed to perform push action persist-credentials: false @@ -107,7 +107,7 @@ jobs: id: constraints-branch run: ./scripts/ci/constraints/ci_branch_constraints.sh >> ${GITHUB_OUTPUT} - name: Checkout ${{ steps.constraints-branch.outputs.branch }} - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: path: "constraints" ref: ${{ steps.constraints-branch.outputs.branch }} @@ -139,7 +139,7 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: "Prepare breeze & CI image: ${{ matrix.python-version }}" diff --git a/.github/workflows/generate-constraints.yml b/.github/workflows/generate-constraints.yml index 5b6a839a4f3d3..55604039f7734 100644 --- a/.github/workflows/generate-constraints.yml +++ 
b/.github/workflows/generate-constraints.yml @@ -77,7 +77,7 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: "Install prek" diff --git a/.github/workflows/helm-tests.yml b/.github/workflows/helm-tests.yml index 933728eb2b938..1f94a9a37fc56 100644 --- a/.github/workflows/helm-tests.yml +++ b/.github/workflows/helm-tests.yml @@ -76,7 +76,7 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: "Prepare breeze & CI image: ${{ inputs.default-python-version }}" @@ -107,7 +107,7 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: "Install Breeze" diff --git a/.github/workflows/integration-system-tests.yml b/.github/workflows/integration-system-tests.yml index a7c23458a74b2..466de7b0d0c31 100644 --- a/.github/workflows/integration-system-tests.yml +++ b/.github/workflows/integration-system-tests.yml @@ -98,7 +98,7 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: "Prepare breeze & CI image: ${{ inputs.default-python-version }}" @@ -149,7 +149,7 @@ jobs: 
shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: "Prepare breeze & CI image: ${{ inputs.default-python-version }}" @@ -194,7 +194,7 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: "Prepare breeze & CI image: ${{ inputs.default-python-version }}" diff --git a/.github/workflows/k8s-tests.yml b/.github/workflows/k8s-tests.yml index 34026d1393a75..04f587edf9640 100644 --- a/.github/workflows/k8s-tests.yml +++ b/.github/workflows/k8s-tests.yml @@ -81,7 +81,7 @@ jobs: echo "PYTHON_MAJOR_MINOR_VERSION=${KUBERNETES_COMBO}" | sed 's/-.*//' >> $GITHUB_ENV echo "KUBERNETES_VERSION=${KUBERNETES_COMBO}" | sed 's/=[^-]*-/=/' >> $GITHUB_ENV - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false # env.PYTHON_MAJOR_MINOR_VERSION, env.KUBERNETES_VERSION are set in the previous diff --git a/.github/workflows/milestone-tag-assistant.yml b/.github/workflows/milestone-tag-assistant.yml index 48d7af9fb1f8b..27f7b2030f91e 100644 --- a/.github/workflows/milestone-tag-assistant.yml +++ b/.github/workflows/milestone-tag-assistant.yml @@ -101,7 +101,7 @@ jobs: steps: - name: "Checkout repository" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false # Always checkout main to 
ensure Breeze with set-milestone command is available diff --git a/.github/workflows/notify-uv-lock-conflicts.yml b/.github/workflows/notify-uv-lock-conflicts.yml index 69889eb2363b0..b45ae572e5f8a 100644 --- a/.github/workflows/notify-uv-lock-conflicts.yml +++ b/.github/workflows/notify-uv-lock-conflicts.yml @@ -33,7 +33,7 @@ jobs: timeout-minutes: 10 steps: - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: "Install uv" diff --git a/.github/workflows/prod-image-build.yml b/.github/workflows/prod-image-build.yml index b7d1e60d7df22..a5a120ae127db 100644 --- a/.github/workflows/prod-image-build.yml +++ b/.github/workflows/prod-image-build.yml @@ -125,7 +125,7 @@ jobs: run: sudo rm -rf ${GITHUB_WORKSPACE}/* if: inputs.upload-package-artifact == 'true' - name: "Checkout target branch" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: "Make /mnt writeable" @@ -220,7 +220,7 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout target branch" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: "Make /mnt writeable" diff --git a/.github/workflows/publish-docs-to-s3.yml b/.github/workflows/publish-docs-to-s3.yml index 27cf7394cb59f..739be11a17379 100644 --- a/.github/workflows/publish-docs-to-s3.yml +++ b/.github/workflows/publish-docs-to-s3.yml @@ -116,7 +116,7 @@ jobs: ]'), github.event.sender.login) steps: - name: "Checkout for wave provider derivation" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: 
actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false ref: ${{ inputs.ref }} @@ -235,7 +235,7 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout current version first to clean-up stuff" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false path: current-version @@ -253,7 +253,7 @@ jobs: # registry buildx cache pushed by the last "main" Test workflow so that layer rebuilds # are avoided whenever the ref is close enough to main for the cache to apply. - name: "Checkout ${{ inputs.ref }} " - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false ref: ${{ inputs.ref }} @@ -362,7 +362,7 @@ jobs: built: ${{ steps.sdk-present.outputs.exists }} steps: - name: "Checkout ${{ inputs.ref }}" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false ref: ${{ inputs.ref }} @@ -427,7 +427,7 @@ jobs: # but it will build the CI image from the version of Airflow that is used to check out things # We also fetch the whole history to be able to prepare SBOM files - name: "Checkout current version with all history for SBOM" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false fetch-depth: 0 diff --git a/.github/workflows/push-image-cache.yml b/.github/workflows/push-image-cache.yml index f01ac884824a1..04e62eeb2b0f5 100644 --- a/.github/workflows/push-image-cache.yml +++ b/.github/workflows/push-image-cache.yml @@ -114,7 +114,7 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout ${{ 
github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: "Free up disk space" @@ -184,7 +184,7 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: "Free up disk space" diff --git a/.github/workflows/registry-backfill.yml b/.github/workflows/registry-backfill.yml index 0ce6cbee29895..6bd6493a09ee7 100644 --- a/.github/workflows/registry-backfill.yml +++ b/.github/workflows/registry-backfill.yml @@ -126,7 +126,7 @@ jobs: packages: read steps: - name: "Checkout repository" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false fetch-depth: 0 @@ -217,7 +217,7 @@ jobs: --provider "${PROVIDER}" ${VERSION_ARGS} - name: "Setup pnpm" - uses: pnpm/action-setup@0e279bb959325dab635dd2c09392533439d90093 # v6.0.8 + uses: pnpm/action-setup@0ebf47130e4866e96fce0953f49152a61190b271 # v6.0.9 with: version: 10 @@ -284,7 +284,7 @@ jobs: name: "Publish versions.json" steps: - name: "Checkout repository" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false diff --git a/.github/workflows/registry-build.yml b/.github/workflows/registry-build.yml index cd4294144108c..81847483ea448 100644 --- a/.github/workflows/registry-build.yml +++ b/.github/workflows/registry-build.yml @@ -112,7 +112,7 @@ jobs: contents: read steps: - name: "Checkout repository" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # 
v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false # Tags drive the phantom-version filter in extract_metadata.py @@ -231,7 +231,7 @@ jobs: fi - name: "Setup pnpm" - uses: pnpm/action-setup@0e279bb959325dab635dd2c09392533439d90093 # v6.0.8 + uses: pnpm/action-setup@0ebf47130e4866e96fce0953f49152a61190b271 # v6.0.9 with: version: 10 diff --git a/.github/workflows/registry-tests.yml b/.github/workflows/registry-tests.yml index 1faa918952ed1..7a0cb3c518eed 100644 --- a/.github/workflows/registry-tests.yml +++ b/.github/workflows/registry-tests.yml @@ -45,7 +45,7 @@ jobs: timeout-minutes: 5 steps: - name: "Checkout repository" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false diff --git a/.github/workflows/release_dockerhub_image.yml b/.github/workflows/release_dockerhub_image.yml index 293229a90b59a..4973425b06c67 100644 --- a/.github/workflows/release_dockerhub_image.yml +++ b/.github/workflows/release_dockerhub_image.yml @@ -86,7 +86,7 @@ jobs: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: "Install Breeze" diff --git a/.github/workflows/release_single_dockerhub_image.yml b/.github/workflows/release_single_dockerhub_image.yml index 0c50521bed653..c3926659bbc6d 100644 --- a/.github/workflows/release_single_dockerhub_image.yml +++ b/.github/workflows/release_single_dockerhub_image.yml @@ -77,7 +77,7 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: 
actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: "Install Breeze" @@ -171,7 +171,7 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: "Install Breeze" diff --git a/.github/workflows/run-unit-tests.yml b/.github/workflows/run-unit-tests.yml index 9417049bd5f5a..d1aa896615026 100644 --- a/.github/workflows/run-unit-tests.yml +++ b/.github/workflows/run-unit-tests.yml @@ -183,7 +183,7 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: "Make /mnt writeable" diff --git a/.github/workflows/scheduled-verify-release-calendar.yml b/.github/workflows/scheduled-verify-release-calendar.yml index fceb957cdaca6..36fd1b938ddf8 100644 --- a/.github/workflows/scheduled-verify-release-calendar.yml +++ b/.github/workflows/scheduled-verify-release-calendar.yml @@ -31,7 +31,7 @@ jobs: timeout-minutes: 10 steps: - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: "Install uv" diff --git a/.github/workflows/test-providers.yml b/.github/workflows/test-providers.yml index 81cc0d037d84a..d4dbe902a8d8e 100644 --- a/.github/workflows/test-providers.yml +++ b/.github/workflows/test-providers.yml @@ -89,7 +89,7 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - 
uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: "Install prek" @@ -199,7 +199,7 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: "Install prek" diff --git a/.github/workflows/ui-e2e-tests.yml b/.github/workflows/ui-e2e-tests.yml index 4057cda0533fc..ee0e417d9632d 100644 --- a/.github/workflows/ui-e2e-tests.yml +++ b/.github/workflows/ui-e2e-tests.yml @@ -103,7 +103,7 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: fetch-depth: 2 persist-credentials: false @@ -121,7 +121,7 @@ jobs: uses: ./.github/actions/breeze if: github.event_name == 'workflow_dispatch' - name: "Setup pnpm" - uses: pnpm/action-setup@0e279bb959325dab635dd2c09392533439d90093 # v6.0.8 + uses: pnpm/action-setup@0ebf47130e4866e96fce0953f49152a61190b271 # v6.0.9 with: version: 9 run_install: false diff --git a/.github/workflows/update-constraints-on-push-stable.yml b/.github/workflows/update-constraints-on-push-stable.yml index 384ad3bdb3842..2ad9cbf85bc0a 100644 --- a/.github/workflows/update-constraints-on-push-stable.yml +++ b/.github/workflows/update-constraints-on-push-stable.yml @@ -47,11 +47,11 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: 
persist-credentials: false - name: Fetch incoming commit ${{ github.sha }} with its parent - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: ref: ${{ github.sha }} fetch-depth: 2 @@ -127,14 +127,14 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: "Set constraints branch name" id: constraints-branch run: ./scripts/ci/constraints/ci_branch_constraints.sh >> ${GITHUB_OUTPUT} - name: Checkout ${{ steps.constraints-branch.outputs.branch }} - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: path: "constraints" ref: ${{ steps.constraints-branch.outputs.branch }} diff --git a/.github/workflows/update-constraints-on-push.yml b/.github/workflows/update-constraints-on-push.yml index 5e3edb2a50479..16460bd89cbb0 100644 --- a/.github/workflows/update-constraints-on-push.yml +++ b/.github/workflows/update-constraints-on-push.yml @@ -50,11 +50,11 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout ${{ github.ref }} ( ${{ github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: Fetch incoming commit ${{ github.sha }} with its parent - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: ref: ${{ github.sha }} fetch-depth: 2 @@ -130,14 +130,14 @@ jobs: shell: bash run: sudo rm -rf ${GITHUB_WORKSPACE}/* - name: "Checkout ${{ github.ref }} ( ${{ 
github.sha }} )" - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: persist-credentials: false - name: "Set constraints branch name" id: constraints-branch run: ./scripts/ci/constraints/ci_branch_constraints.sh >> ${GITHUB_OUTPUT} - name: Checkout ${{ steps.constraints-branch.outputs.branch }} - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: path: "constraints" ref: ${{ steps.constraints-branch.outputs.branch }} diff --git a/.github/workflows/upgrade-check.yml b/.github/workflows/upgrade-check.yml index fd416c9d1945c..cd3dce4d4e510 100644 --- a/.github/workflows/upgrade-check.yml +++ b/.github/workflows/upgrade-check.yml @@ -51,7 +51,7 @@ jobs: -u 0:0 bash -c "rm -rf /workspace/*" - name: >- [${{ inputs.target-branch }}] Checkout - uses: actions/checkout@df4cb1c069e1874edd31b4311f1884172cec0e10 # v6.0.3 + uses: actions/checkout@9c091bb21b7c1c1d1991bb908d89e4e9dddfe3e0 # v7.0.0 with: ref: ${{ inputs.target-branch }} fetch-depth: 0 diff --git a/RELEASE_NOTES.rst b/RELEASE_NOTES.rst index 345ddc1ce3b77..b50a38cc44dc7 100644 --- a/RELEASE_NOTES.rst +++ b/RELEASE_NOTES.rst @@ -24,28 +24,29 @@ .. towncrier release notes start -Airflow 3.3.0b2 (2026-07-06) ----------------------------- +Airflow 3.3.0 (2026-07-06) +-------------------------- Significant Changes ^^^^^^^^^^^^^^^^^^^ -Asset Partitioning -"""""""""""""""""" +Asset Partitioning (#64571, #65447, #66030, #66848, #67184, #67475, #67716, #68978) +""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""" Building on the asset partitioning introduced in 3.2.0, Airflow 3.3.0 substantially expands how a single upstream asset event fans out to partitioned downstream Dag runs. 
New partition mappers — -``FanOutMapper`` (one-to-many), and ``FixedKeyMapper`` + ``SegmentWindow`` (categorical rollup) — join -the existing ``RollupMapper``, and compose with time windows (day/week/month/quarter/year) and a +``RollupMapper`` (many-to-one), ``FanOutMapper`` (one-to-many), and ``FixedKeyMapper`` + +``SegmentWindow`` (categorical rollup) — compose with time windows (day/week/month/quarter/year) and a ``wait_policy`` (``WaitForAll`` or ``MinimumCount(n)``) to control when partitioned runs fire. Windows can fan out forward or backward in time, and total fan-out per upstream event is bounded by the new -``[scheduler] partition_mapper_max_downstream_keys`` config (configurable per mapper). -(#66030, #66848, #67184, #67475, #67716) +``[scheduler] partition_mapper_max_downstream_keys`` config (configurable per mapper). Airflow 3.3.0 +also adds the ``PartitionedAtRuntime`` timetable, which lets a Dag declare that its partition key(s) +are assigned when the run starts rather than mapped from an upstream event. For detailed usage instructions, see :doc:`/authoring-and-scheduling/assets`. -Task and Asset State Store -"""""""""""""""""""""""""" +Task and Asset State Store (#65759, #66073, #66160, #66463, #66586, #66859, #67041, #67292, #67319) +""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""" Airflow 3.3.0 introduces a first-class state store for tasks and assets (AIP-103). Tasks can persist arbitrary key-value state that survives across retries and runs via a new ``task_state_store`` accessor, and @@ -53,60 +54,54 @@ assets can carry their own state via ``asset_state_store`` — both available fr in the metadata database by default, or in a custom worker-side backend (``[workers] state_store_backend``), supports per-key retention with periodic garbage collection and an optional ``clear_on_success``, and is fully manageable through the Core API and Execution API. 
-(#65759, #66073, #66160, #66463, #66586, #66859, #67041, #67319) - -.. warning:: - Task and asset state storage is experimental in 3.3.0 and may change in future versions based on - user feedback. +For detailed usage instructions, see :doc:`/core-concepts/task-and-asset-state-store`. -Pluggable Retry Policies -"""""""""""""""""""""""" +Pluggable Retry Policies (#65474) +""""""""""""""""""""""""""""""""" Task retry behaviour is now pluggable (AIP-105). In addition to a fixed ``retries`` count, you can attach a custom retry policy that decides whether and when a task is retried, enabling strategies such -as retrying only on specific exceptions or backing off based on custom logic. (#65474) +as retrying only on specific exceptions or backing off based on custom logic. -Language Task SDK (Java and Go) -""""""""""""""""""""""""""""""" +For detailed usage instructions, see :ref:`concepts:retry-policies`. + +Language Task SDK (Java and Go) (#65958, #67161, #67635, #67699) +"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""" Airflow 3.3.0 adds a Coordinator layer (AIP-108) that lets individual task implementations be written in non-Python languages while the Dag and its scheduling stay in Python. A task is declared in the Dag with ``@task.stub(queue=...)``; the worker routes it to a configured coordinator (``JavaCoordinator`` for JVM languages, ``ExecutableCoordinator`` for self-contained native binaries such as Go) that runs the task in a language runtime and proxies Variables, Connections, and XComs back -through the Execution API. (#65958, #67118, #67161, #67635, #67699) - -.. warning:: +through the Execution API. - The Coordinator layer and the Java/Go SDKs are experimental in 3.3.0 and may change in future - versions based on user feedback. +For detailed usage instructions, see :doc:`/authoring-and-scheduling/language-sdks/index`. 
-DAG bundle version on clear, rerun, and backfill -"""""""""""""""""""""""""""""""""""""""""""""""" +Dag bundle version on clear, rerun, and backfill (#63884) +""""""""""""""""""""""""""""""""""""""""""""""""""""""""" The new ``rerun_with_latest_version`` setting controls whether a cleared, rerun, or backfilled Dag run uses the latest bundle version or the original version from the initial run. The default is resolved -by precedence: an explicit request parameter/CLI flag, then the DAG-level ``rerun_with_latest_version``, +by precedence: an explicit request parameter/CLI flag, then the Dag-level ``rerun_with_latest_version``, then ``[core] rerun_with_latest_version``, and finally ``False`` for clear/rerun and ``True`` for backfills (preserving historical behaviour). Airflow 2.x always reran with the latest code; 3.x introduced bundle versioning defaulting to the original version, and this setting gives users control. -(#63884) See :doc:`/administration-and-deployment/dag-bundles` for full details. -Provider example DAGs as dedicated bundles -"""""""""""""""""""""""""""""""""""""""""" +Provider example Dags as dedicated bundles (#66161) +""""""""""""""""""""""""""""""""""""""""""""""""""" -Example DAGs shipped by provider distributions are now discovered via ``ProvidersManager`` and +Example Dags shipped by provider distributions are now discovered via ``ProvidersManager`` and registered as their own Dag bundles — one per provider, named ``apache-airflow-providers-<provider>-example-dags`` (or ``<distribution>-example-dags`` for third-party providers). The ``[core] load_examples`` option still gates whether they are registered. -REST API clients that filtered ``bundle_name`` by ``"dags-folder"`` for provider-shipped example DAGs -must update to the new per-provider bundle names; DAG identifiers are unchanged.
(#66161) +REST API clients that filtered ``bundle_name`` by ``"dags-folder"`` for provider-shipped example Dags +must update to the new per-provider bundle names; Dag identifiers are unchanged. -Remote logging resolution decoupled from ``airflow.logging_config`` -""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""" +Remote logging resolution decoupled from ``airflow.logging_config`` (#67056) +"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""" Remote task log handler resolution is now owned by the shared ``airflow_shared.logging.factory`` module and applies a single, well-defined precedence: @@ -124,27 +119,27 @@ not implement ``from_config`` are skipped with a warning and fall through to the Migration: replace direct calls to ``airflow.logging_config.load_logging_config()`` with the new helpers, and have provider remote-log handler classes implement a no-argument ``from_config`` -classmethod that reads ``airflow.providers.common.compat.sdk.conf``. (#67056) +classmethod that reads ``airflow.providers.common.compat.sdk.conf``. -OpenTelemetry timer metrics now use Histogram -""""""""""""""""""""""""""""""""""""""""""""" +OpenTelemetry timer metrics now use Histogram (#64207) +"""""""""""""""""""""""""""""""""""""""""""""""""""""" OpenTelemetry timer and timing metrics are now recorded as Histograms instead of Gauges, preserving -count, sum, and bucket distribution across recordings. (#64207) +count, sum, and bucket distribution across recordings. -DAG-processing "seconds ago" metric is now tagged -""""""""""""""""""""""""""""""""""""""""""""""""" +Dag-processing "seconds ago" metric is now tagged (#62487) +"""""""""""""""""""""""""""""""""""""""""""""""""""""""""" ``dag_processing.last_run.seconds_ago.{dag_file}`` is now a legacy metric. The new ``dag_processing.last_run.seconds_ago`` is emitted with ``file_path``, ``bundle_name`` and ``file_name`` tags (``file_path`` + ``bundle_name`` uniquely identify the Dag file). 
The legacy metric -is still emitted by default and can be disabled via ``[metrics] legacy_names_on``. (#62487) +is still emitted by default and can be disabled via ``[metrics] legacy_names_on``. -New Deadlines page under Browse -""""""""""""""""""""""""""""""" +New Deadlines page under Browse (#67586) +"""""""""""""""""""""""""""""""""""""""" A new **Deadlines** page is available under the Browse menu, accessible to any role that already has -``can_read`` and ``menu_access`` on **DAG Runs**. (#67586) +``can_read`` and ``menu_access`` on **Dag Runs**. New Features @@ -179,7 +174,7 @@ New Features - Add ``awaiting_input`` task state for Human-in-the-Loop, running off the triggerer (#68028) - Add bulk API to mark Dag runs as success or failed (#67948) - Record writer info for every asset store write for better cross-linkage (#67902) -- Register XCom ``output_type`` classes from a worker-side DAG walk (#67875) +- Register XCom ``output_type`` classes from a worker-side Dag walk (#67875) - Add ``FixedKeyMapper`` and ``SegmentWindow`` for categorical asset-partition rollup (#67716) - Add a bulk ``POST /dags/{dag_id}/clearDagRuns`` API endpoint (#67709) - Return Pydantic model instances through XCom for structured output (#67644) @@ -231,22 +226,21 @@ New Features - UI: Show expected duration based on historical average in Dag Run details (#65722) - Add team name to the task context (#65617) - Add an ``on_kill()`` hook to ``BaseTrigger`` to handle user actions on triggers (#65590) -- Allow accessing a DAG's members via ``[]`` (#65586) +- Allow accessing a Dag's members via ``[]`` (#65586) - Add pluggable retry policies for Airflow tasks (AIP-105) (#65474) - Add support for ``format="Duration"`` in params (#65469) -- Add the ``PartitionAtRuntime`` authoring API to the Task SDK (AIP-76) (#65447) - UI: Add pagination to the grid view (#65388) - Add ``partition_key`` to the task context (#65359) - Make the blocked-thread warning threshold configurable (#65009) - Add name 
fields to SDK deadline alerts (#64926) - Add dynamic interval resolution support via Variables for deadline alerts (#64751) -- Add an ``is_backfillable`` property to DAG API responses (#64644) +- Add an ``is_backfillable`` property to Dag API responses (#64644) - Return dag-specified results in the dag run wait API (#64577) - Hold a Dag run until all upstream partitions arrive (AIP-76) (#64571) - Add a way to mark a return-value XCom as the dag result (#64522) - Allow accessing a TaskGroup's members via ``[]`` (#64430) - UI: Redo the Gantt chart (#64335) -- UI: Add task-level filters to the DAG graph tab (#64271) +- UI: Add task-level filters to the Dag graph tab (#64271) - UI: Add bulk Clear, Mark Success/Fail, and delete for multiple task instances (#64141) - Check that multi-team is enabled when a team name is provided to the API (#63994) - Add a ``DagRunType`` for operators (#63733) @@ -259,6 +253,9 @@ New Features - Re-enable the ``start_from_trigger`` feature with template-field rendering (#55068) - Backfill partitioned Dags by partition-date range (#67537) - Add a producer-side acknowledgement channel to shared-stream triggers (#67523) +- UI: Add ``partition_date`` to the Dag run detail page (#68977) +- Expose the upstream ``partition_key`` on ``triggering_asset_events`` and ``dag_run.consumed_asset_events`` (AIP-76) (#69120) +- Allow ``get``/``set``/``delete``/``clear`` of ``AssetStateStoreAccessor`` to run on the triggerer (#68966) Bug Fixes ^^^^^^^^^ @@ -320,7 +317,7 @@ Bug Fixes - Reject negative ``default_retention_days`` in the Task SDK and core API routes (#67890) - Fix ``none_failed_min_one_success`` trigger rule checks (#67873) - Remove trigger kwargs from the REST API response (#67868) -- UI: Fix long parameter names overflowing the Trigger DAG modal (#67859) +- UI: Fix long parameter names overflowing the Trigger Dag modal (#67859) - Fix misleading log message in the task runner clear-on-success block (#67836) - Fix scheduler crash when logging 
orphaned task resets (#67822) - UI: Fix dashboard pool summary showing incorrect deferred slot usage (#67818) @@ -333,14 +330,14 @@ Bug Fixes - Apply per-file authorization to the dag-source endpoint (#67662) - Fix ``airflow dags next-execution --table`` crash when no next run exists (#67642) - UI: Fix the time picker omitting seconds (#67636) -- Filter scheduling-dependencies graph edges by readable-DAG access (#67627) +- Filter scheduling-dependencies graph edges by readable-Dag access (#67627) - Mask per-key secrets-backend-kwarg overrides on the Config API (#67622) - Fix ``GET /auth/login`` missing a 400 response in the OpenAPI spec (#67571) - Fix ``GET /pools`` incorrectly documenting a 404 response in the OpenAPI spec (#67570) - Add a compatibility layer for import errors caused by ``AirflowSecretsBackendAccessDenied`` (#67560) - UI: Fix rendering of None child state (#67552) - Fix sort order for mapped task instances (#67551) -- Fix import errors total-entries count with multiple DAGs per file (#67550) +- Fix import errors total-entries count with multiple Dags per file (#67550) - UI: Prefer active over queued state for collapsed groups (#67543) - Fix callback state not updating from executor events due to a UUID type mismatch (#67542) - Reject wildcard origin in CORS config instead of toggling credentials (#67502) @@ -357,7 +354,7 @@ Bug Fixes - UI: Restore the Monaco find widget in the Dag Code view (#67391) - UI: Fix an HTTPException import that turned a 400 into a 500 in the dags endpoint (#67363) - Restore ``fail_fast`` handling when reschedule exceeds the MySQL ``TIMESTAMP`` limit (#67353) -- Fix the Triggered DAG button not being visible during queued/running state (#67327) +- Fix the Triggered Dag button not being visible during queued/running state (#67327) - Fix variables import with structured falsy values (#67060) - Avoid logging Execution API bearer credentials (#67059) - Sanitize Dag processor metric file names (#67029) @@ -365,9 +362,9 @@ Bug 
Fixes - Prevent ``AlreadyRunningBackfill`` error caused by an invalid date range request (#66874) - Restrict owner-link and extra-link ``href`` values to safe schemes (http, https, mailto, relative) (#66741) - Add a session parameter to the ``BaseStateBackend`` interface to fix custom backends (#66708) -- Allow deadline callbacks within the same DAG module (#66702) +- Allow deadline callbacks within the same Dag module (#66702) - UI: Fix relative React plugin bundle URLs in dev mode (#66618) -- Validate DAG trigger conf as a JSON object or null (#66617) +- Validate Dag trigger conf as a JSON object or null (#66617) - Require a trust sentinel for ``state.user`` injection in ``get_user()`` (#66562) - Use ``hmac.compare_digest`` for ``SimpleAuthManager`` password comparison (CWE-208) (#66556) - Set ``SameSite=Lax`` on the ``SimpleAuthManager`` all-admins login cookie (#66502) @@ -382,7 +379,7 @@ Bug Fixes - Fix ``resolve_xcom_backend`` to rely on the config schema default (#65938) - Fix a missing import cast error in the dag_run API route (#65748) - Mask Dag processor connection and variable responses (#65704) -- UI: Show import error for deactivated DAGs (#65687) +- UI: Show import error for deactivated Dags (#65687) - Forward MySQL SSL params from ``sql_alchemy_conn`` to ``airflow db shell`` (#65575) - Disable SQLite FK checks in the 0111 migration downgrade (#65545) - Handle Variable values that cannot be decrypted gracefully in the stable REST API (#65452) @@ -393,7 +390,7 @@ Bug Fixes - Fix ``task_defer`` with non-JSON ``next_kwargs`` in ``TaskInstance`` (#64714) - Add the error as ``context["exception"]`` in ``InProcessTestSupervisor`` (#64568) - Fix NPM security alerts in the simple auth manager (#64309) -- Fix DAG run trigger to surface errors instead of swallowing them (#64130) +- Fix Dag run trigger to surface errors instead of swallowing them (#64130) - Fix Task SDK Connection extras built from a URI constructor (#64120) - Add insert/update-on-conflict for 
rendered task instance fields (#63874) - Fix ``timeout_with_traceback`` crashes on Windows and non-main threads (#63664) @@ -425,6 +422,11 @@ Bug Fixes - Fix ``dag.test()`` not re-syncing sibling Dags across repeated calls (#66205) - UI: Invalidate per-attempt task instance caches after actions so logs and details are not stale (#67212) - Fix task runner failure on a duplicate task instance success-state update (#63355) +- Fix a race condition on the ``order_by`` parameter when listing Dag runs via the REST API (#68948) +- Exclude non-successful Dag runs from the ``DeadlineReference.AVERAGE_RUNTIME`` deadline calculation so failed runs no longer skew the computed deadline (#68949) +- Allow ``InProcessExecutionAPI`` to start without ``api_auth.jwt_secret`` configured (#68982) +- Make ``airflow dags test`` wait for Human-in-the-loop input instead of looping indefinitely on parked HITL tasks (#69104) +- Fix the Java coordinator rejecting macOS dual-stack loopback connections (#68973) Miscellaneous ^^^^^^^^^^^^^ @@ -484,11 +486,11 @@ Miscellaneous - Pass user teams to the ``create_asset_event`` endpoint (#66367) - Load ``USFederalHolidayCalendar`` lazily to reduce memory usage when loading examples (#66303) - Propagate task OpenTelemetry trace context through IPC into Execution API requests (#66151) -- Surface worker DAG parse duration in the task log (#66138) -- Skip deserializing ``trigger_kwargs`` when loading serialized DAGs (#66002) +- Surface worker Dag parse duration in the task log (#66138) +- Skip deserializing ``trigger_kwargs`` when loading serialized Dags (#66002) - Honor ``AUTH_ROLE_PUBLIC`` in the FastAPI API server (#65685) - Add extended sysinfo for the Edge worker (#65472) -- Clarify logs when a DAG is being processed in the Dag processor (#65196) +- Clarify logs when a Dag is being processed in the Dag processor (#65196) - Add indexes on ``task_instance.dag_version_id`` and ``dag_run.created_dag_version_id`` (#64818) - Improve creation of 
``RuntimeTaskInstance`` in ``TriggerRunner`` for ``start_for_trigger`` functionality (#64298) - Mark the Triggerer supervisor as a server context so it can read metastore connections (#64022) @@ -504,6 +506,7 @@ Miscellaneous - Allow synchronous deadline callbacks (``SyncCallback``) to access Connections and Variables (#65269) - Add a ``team_name`` tag to deadline metrics for multi-team deployments (#68589) - Add a ``team_name`` tag to scheduler metrics for multi-team deployments (#68594) +- Defer the Cadwyn import so FastAPI/Starlette stay off the Task SDK worker path, reducing per-worker memory (#69029) Doc Only Changes ^^^^^^^^^^^^^^^^ @@ -535,13 +538,17 @@ Doc Only Changes - Document ``on_kill()``/``cleanup()`` for triggers (#65671) - Explain ``xcom_pull`` behaviour without ``task_ids`` in the docs (#65406) - Improve standalone authentication documentation for Airflow 3.x (#65330) -- Clarify manual DAG run data interval semantics in Airflow 3 (#64740) -- Document and test ``xcom_pull`` ``run_id`` usage for triggered DAG runs (#63030) +- Clarify manual Dag run data interval semantics in Airflow 3 (#64740) +- Document and test ``xcom_pull`` ``run_id`` usage for triggered Dag runs (#63030) - Update params in the backfill documentation (#61821) - Document the ``apache-airflow-mypy`` package in the core docs (#68561) - Fix typos and formatting in the Fundamentals documentation (#68524) - Complete the Hindi (``hi``) UI translation (#68574) - Fill the Taiwanese Mandarin (``zh-TW``) UI translation gap (#68563) +- Document that Dag bundle ``kwargs`` should reference a Connection rather than inline credentials (#69105) +- Add example plugins and expand the asset-partitions documentation (#69017) +- Java SDK docs: ``JUL`` setup, pinning ``java_executable``, and a config-reload note (#69020) +- Correct the example config for the coordinators (#68940) Airflow 3.2.2 (2026-05-29) -------------------------- diff --git 
a/airflow-core/docs/administration-and-deployment/dag-bundles.rst b/airflow-core/docs/administration-and-deployment/dag-bundles.rst index 7e1eaf0123b20..354e6ecda1626 100644 --- a/airflow-core/docs/administration-and-deployment/dag-bundles.rst +++ b/airflow-core/docs/administration-and-deployment/dag-bundles.rst @@ -61,6 +61,24 @@ Configuring Dag bundles Dag bundles are configured in :ref:`config:dag_processor__dag_bundle_config_list`. You can add one or more Dag bundles here. +.. warning:: Reference credentials through a Connection — do not inline them + + Bundle ``kwargs`` are stored in the ``[dag_processor] dag_bundle_config_list`` + configuration, which Airflow exposes through the Config API when + :ref:`config:api__expose_config` is enabled. Any user authorized to read the + configuration can read these values verbatim, so they must not contain secrets. + + Do **not** embed credentials directly in bundle ``kwargs`` — for example a + token placed directly in a ``repo_url`` like + ``https://x-access-token:<token>@github.com/org/repo.git``. Instead reference + an Airflow :doc:`Connection <../authoring-and-scheduling/connections>` + (``git_conn_id`` for Git, ``aws_conn_id`` for S3, ``gcp_conn_id`` for GCS) and + keep the credential in your + :doc:`secrets backend <../security/secrets/secrets-backend/index>`. Connection + fields are resolved at runtime and are not written into + ``dag_bundle_config_list``. + + By default, Airflow adds a ``LocalDagBundle`` pointing at the configured Dags folder, maintaining the same behaviour as Airflow 2's Dags folder. The only kwarg is ``path``, which defaults to the value of :ref:`config:core__dags_folder` when omitted: ..
code-block:: ini diff --git a/airflow-core/docs/authoring-and-scheduling/assets.rst b/airflow-core/docs/authoring-and-scheduling/assets.rst index d039fbfd1ffd3..b5561392587db 100644 --- a/airflow-core/docs/authoring-and-scheduling/assets.rst +++ b/airflow-core/docs/authoring-and-scheduling/assets.rst @@ -523,6 +523,33 @@ creates asset events with a partition key on each run. Partitioned events are intended for partition-aware downstream scheduling, and do not trigger non-partition-aware Dags. +Pre-determined vs runtime partitioning +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Both kinds attach a partition key to a Dag run — the difference is *when* and +*by whom* the key is decided: + +* **Pre-determined partitioning** — the partition key is worked out before the + task runs, using the timetable's schedule cadence and partition mappers to match + upstream keys to downstream keys and trigger partition-based Dag runs. + :class:`~airflow.sdk.CronPartitionTimetable` uses this kind as a producer; + :class:`~airflow.sdk.PartitionedAssetTimetable` uses it as a consumer. + +* **Runtime partitioning** — the partition key is deferred to task runtime: + the producing task records key(s) via ``outlet_events[self].add_partitions(...)``. + :class:`~airflow.sdk.PartitionedAtRuntime` uses this kind and never schedules on its + own (``can_be_scheduled=False``); a schedulable timetable can also defer to runtime + by subclassing :class:`~airflow.timetables.trigger.CronTriggerTimetable` and setting + ``partitioned_at_runtime = True`` (see the custom plugin example below). + +A timetable uses one kind or the other, not both: it either resolves partitions +ahead of the run or defers them to task runtime. + +**Practical rule:** use :class:`~airflow.sdk.CronPartitionTimetable` when the +partition key follows from the schedule cadence; use :class:`~airflow.sdk.PartitionedAtRuntime` +when the key is only known once the task runs (e.g. 
a watermark from source data); +use :class:`~airflow.sdk.PartitionedAssetTimetable` downstream to consume either kind. + For downstream partition-aware scheduling, use ``PartitionedAssetTimetable``: .. code-block:: python @@ -753,6 +780,62 @@ so the run is held indefinitely) and the fall-back day has twenty-five (the repe hour is dropped). Use a UTC-based upstream mapper for any rollup that crosses a DST boundary; see the ``DayWindow`` class docstring for the full discussion. +Wait policies +~~~~~~~~~~~~~ + +:class:`~airflow.sdk.RollupMapper` accepts an optional ``wait_policy`` argument +that decides when the downstream Dag run fires given the expected window vs the +upstream keys that have actually arrived. + +* :class:`~airflow.sdk.WaitForAll` (the default) holds the run until every expected + upstream key in the window has arrived. +* :class:`~airflow.sdk.MinimumCount` ``(n)`` fires early once at least ``n`` of the + expected keys have arrived — useful to tolerate a slow or missing upstream partition + rather than holding the run indefinitely. + +.. code-block:: python + + from airflow.sdk import ( + DAG, + Asset, + FixedKeyMapper, + MinimumCount, + PartitionedAtRuntime, + PartitionedAssetTimetable, + RollupMapper, + SegmentWindow, + asset, + ) + + + @asset( + uri="file://incoming/player-stats/multi-region.csv", + schedule=PartitionedAtRuntime(), + ) + def multi_region_player_stats(self, outlet_events): + outlet_events[self].add_partitions(["us", "eu", "apac"]) + + + # Consumer: fires once at least two of the three declared region partitions arrive. + with DAG( + dag_id="segment_region_stats_early_rollup", + schedule=PartitionedAssetTimetable( + assets=Asset.ref(name="multi_region_player_stats"), + default_partition_mapper=RollupMapper( + upstream_mapper=FixedKeyMapper("all_regions"), + window=SegmentWindow(["us", "eu", "apac"]), + wait_policy=MinimumCount(2), + ), + ), + catchup=False, + ): + ... 
+ +``MinimumCount(-1)`` is the relative spelling of the same threshold — "at most one +missing" — and is equivalent to ``MinimumCount(2)`` for a three-member window. +Pass :class:`~airflow.sdk.WaitForAll` explicitly when you want to document intent +rather than relying on the default. + .. _segment-categorical-rollup: Segment (categorical) rollup @@ -781,7 +864,7 @@ semantics (the default). DAG, Asset, FixedKeyMapper, - PartitionAtRuntime, + PartitionedAtRuntime, PartitionedAssetTimetable, RollupMapper, SegmentWindow, @@ -792,7 +875,7 @@ semantics (the default). @asset( uri="file://incoming/player-stats/multi-region.csv", - schedule=PartitionAtRuntime(), + schedule=PartitionedAtRuntime(), ) def multi_region_player_stats(self, outlet_events): # Emit one event per region in a single run. @@ -836,17 +919,17 @@ Setting partition keys at runtime When the partition key is not known ahead of time (for example, a watermark discovered from the source data, a late-arriving file, or a backfill request), let the producing task decide it while it runs. Schedule the producer with -``PartitionAtRuntime()`` and record the key(s) on the emitted event with +``PartitionedAtRuntime()`` and record the key(s) on the emitted event with ``outlet_events[self].add_partitions(...)``: .. code-block:: python - from airflow.sdk import PartitionAtRuntime, asset + from airflow.sdk import PartitionedAtRuntime, asset @asset( uri="file://incoming/player-stats/live-region.csv", - schedule=PartitionAtRuntime(), + schedule=PartitionedAtRuntime(), ) def live_region_player_stats(self, outlet_events): # The key is only known once the task runs. 
@@ -862,7 +945,7 @@ keys collapse to a single event: @asset( uri="file://incoming/player-stats/multi-region.csv", - schedule=PartitionAtRuntime(), + schedule=PartitionedAtRuntime(), ) def multi_region_player_stats(self, outlet_events): outlet_events[self].add_partitions(["us", "eu", "apac"]) @@ -872,5 +955,189 @@ When a runtime run emits exactly one partition key, the producing these events the same way as timetable-produced partitions, through ``PartitionedAssetTimetable``. +Fan-out mappers +~~~~~~~~~~~~~~~ + +.. versionadded:: 3.3.0 + +:class:`~airflow.sdk.FanOutMapper` is the mirror of :class:`~airflow.sdk.RollupMapper`: +instead of holding many upstream events until one downstream run can fire, a single +upstream event fans *out* to one downstream Dag run per window member. It composes an +``upstream_mapper`` (which normalizes the upstream key to the window anchor) with a +:class:`~airflow.sdk.Window` that enumerates the downstream period, and an optional +``downstream_mapper`` that converts each window member into a downstream partition key +string. + +For temporal windows (:class:`~airflow.sdk.WeekWindow`, :class:`~airflow.sdk.MonthWindow`, +etc.) a default ``downstream_mapper`` is applied automatically — for example +:class:`~airflow.sdk.WeekWindow` defaults to :class:`~airflow.sdk.StartOfDayMapper`, +so each of the seven daily members is encoded as a ``YYYY-MM-DD`` string. For +:class:`~airflow.sdk.SegmentWindow` there is no default-table entry, so an explicit +``downstream_mapper`` is required. + +The following example fans a weekly model artifact out to seven daily inference runs — +one Dag run per day in the week: + +.. 
code-block:: python + + from airflow.sdk import ( + DAG, + Asset, + CronPartitionTimetable, + FanOutMapper, + PartitionedAssetTimetable, + StartOfWeekMapper, + WeekWindow, + task, + ) + + weekly_model_artifact = Asset(uri="file://artifacts/models/weekly.bin", name="weekly_model_artifact") + + # Producer: emits one partitioned event per week (key is the Monday date). + with DAG( + dag_id="train_weekly_model", + schedule=CronPartitionTimetable("0 0 * * 1", timezone="UTC"), + catchup=False, + ): + + @task(outlets=[weekly_model_artifact]) + def train_model(): + pass + + train_model() + + + # Consumer: one Dag run per day derived from the weekly upstream event. + with DAG( + dag_id="daily_inference", + schedule=PartitionedAssetTimetable( + assets=weekly_model_artifact, + default_partition_mapper=FanOutMapper( + upstream_mapper=StartOfWeekMapper(), + window=WeekWindow(), + max_downstream_keys=7, + ), + ), + catchup=False, + ): + + @task + def run_inference(dag_run=None): + # dag_run.partition_key is one daily key, e.g. "2026-03-10". + print(dag_run.partition_key) + + run_inference() + +``max_downstream_keys`` caps how many downstream Dag runs one upstream event may +create. When exceeded, the runs for that event are **not** queued and a "partition +fan-out exceeded" audit-log entry is recorded instead. Omitting it falls back to the +global ``[scheduler] partition_mapper_max_downstream_keys`` config (default 1000). +Set it explicitly to document intent and guard against accidental fan-out explosions. + +Window direction: FORWARD and BACKWARD +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Every :class:`~airflow.sdk.Window` supports a ``direction`` parameter that controls +which period the window enumerates relative to its anchor. + +* ``Window.Direction.FORWARD`` (the default) — yields the period *starting* at the + upstream key. For a weekly upstream key ``"2026-03-09"`` (Monday), + ``WeekWindow()`` yields the seven days ``2026-03-09`` through ``2026-03-15``. 
+* ``Window.Direction.BACKWARD`` — yields the trailing period *ending* at the key. + The same ``"2026-03-09"`` key with ``WeekWindow(direction=Window.Direction.BACKWARD)`` + yields the seven days ending on that Monday (``2026-03-03`` through ``2026-03-09``). + +.. code-block:: python + + from airflow.sdk import FanOutMapper, PartitionedAssetTimetable, StartOfWeekMapper, WeekWindow, Window + + PartitionedAssetTimetable( + assets=weekly_model_artifact, + default_partition_mapper=FanOutMapper( + upstream_mapper=StartOfWeekMapper(), + window=WeekWindow(direction=Window.Direction.BACKWARD), + ), + ) + +Direction applies to rollup windows too — ``RollupMapper`` uses the same window +classes, so ``DayWindow(direction=Window.Direction.BACKWARD)`` holds a downstream +run until the twenty-four hours *preceding* midnight of the downstream key have +all arrived. + +Custom partition mappers, windows, and timetables in plugins +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Custom :class:`~airflow.partition_mappers.base.PartitionMapper`, +:class:`~airflow.partition_mappers.window.Window`, and partition-aware +:class:`Timetable <airflow.timetables.base.Timetable>` classes can be shipped as +Airflow plugins by listing them in ``AirflowPlugin.partition_mappers``, +``.windows``, and ``.timetables`` respectively. Once a plugin is installed, +these classes become usable in :class:`~airflow.sdk.PartitionedAssetTimetable` +and :class:`~airflow.partition_mappers.base.RollupMapper` without modifying core +Airflow. + +**Custom partition mapper** — strip a namespace prefix so that upstream keys +like ``"eu::daily-sales"`` and ``"us::daily-sales"`` both collapse to the +downstream key ``"daily-sales"``: + +.. 
exampleinclude:: /../src/airflow/example_dags/plugins/custom_partition_mapper.py + :language: python + :start-after: [START custom_partition_mapper] + :end-before: [END custom_partition_mapper] + +**Custom rollup window** — yield only weekday period-starts from a calendar +month so a downstream asset waits only for business-day upstream partitions: + +.. exampleinclude:: /../src/airflow/example_dags/plugins/business_day_window.py + :language: python + :start-after: [START custom_window] + :end-before: [END custom_window] + +**Custom runtime-partitioned timetable** — a schedulable cron timetable that +defers the partition key to task runtime, so the producing task can check whether +the period's data exists before emitting a partition: + +.. exampleinclude:: /../src/airflow/example_dags/plugins/custom_partition_timetable.py + :language: python + :start-after: [START custom_partition_timetable] + :end-before: [END custom_partition_timetable] + +A producer Dag schedules on the timetable's cron cadence and decides the +partition key at runtime — emitting it only when the period's upstream data is +present. If the data has not arrived, the task simply does not call +``add_partitions`` (an empty or ``None`` key is rejected anyway), so no +partitioned event — and therefore no downstream ``PartitionedAssetTimetable`` +run — is produced for that period: + +.. code-block:: python + + from airflow.sdk import DAG, Asset, task + + # ScheduledRuntimePartitionTimetable is provided by the plugin registered above. + from my_plugin.plugins import ScheduledRuntimePartitionTimetable + + daily_export = Asset(uri="file://exports/daily.csv", name="daily_export") + + with DAG( + dag_id="export_when_ready", + schedule=ScheduledRuntimePartitionTimetable("0 6 * * *", timezone="UTC"), + catchup=False, + ): + + @task(outlets=[daily_export]) + def export(*, outlet_events): + # Fires every day at 06:00 UTC. 
The partition key is not fixed by the + # schedule — decide it at runtime from the data itself: find which + # day's source file has actually landed. + partition_key = latest_ready_day("s3://raw") # your own check; e.g. "2026-06-23" or None + if partition_key: + build_export(partition_key) + outlet_events[daily_export].add_partitions(partition_key) + # If nothing is ready, emit nothing: with no partition key recorded, + # no partitioned event is produced and no downstream + # PartitionedAssetTimetable run is triggered for this period. + + export() + For complete runnable examples, see ``airflow-core/src/airflow/example_dags/example_asset_partition.py``. diff --git a/airflow-core/docs/authoring-and-scheduling/language-sdks/java.rst b/airflow-core/docs/authoring-and-scheduling/language-sdks/java.rst index 25dac2e2e1043..55a80f9967fce 100644 --- a/airflow-core/docs/authoring-and-scheduling/language-sdks/java.rst +++ b/airflow-core/docs/authoring-and-scheduling/language-sdks/java.rst @@ -340,13 +340,15 @@ Add the artifact: implementation("org.apache.airflow:airflow-sdk-jul:${version}") -and call ``AirflowJulHandler.install()`` on startup to attach the handler to the -JUL root logger before any task runs: +and call ``AirflowJulHandler.setup()`` on startup, before any task runs. It clears the JUL root +logger's existing handlers (including the default ``ConsoleHandler``, whose stderr output Airflow +would otherwise capture as ``task.stderr`` at ERROR level, duplicating each record and mislabeling +its level) and installs ``AirflowJulHandler`` in their place: .. code-block:: java public static void main(String[] args) { - AirflowJulHandler.install(); + AirflowJulHandler.setup(); Server.create(args).serve(new MyBundle()); } @@ -669,6 +671,40 @@ All ``kwargs`` in the ``coordinators`` config entry are passed to the - Seconds to wait for the JVM subprocess to connect after launch. Increase this if your JVM startup is slow (e.g. 
on constrained hardware or with a large classpath). +.. note:: + + The ``[sdk]`` configuration is read at startup, so changes to ``coordinators`` or + ``queue_to_coordinator`` (for example adding ``jvm_args``) only take effect after you restart the + scheduler (or ``airflow standalone``). A rebuilt bundle JAR, by contrast, is picked up on the next + task launch without a restart, because a fresh JVM is spawned per task instance. + +.. _java-sdk/java-executable: + +Pinning the Java executable +--------------------------- + +As a general recommendation, set ``java_executable`` to an absolute path rather than relying on +``java`` resolving from ``$PATH``. This pins tasks to a known JDK, which matters most in production or +corporate environments where the Airflow admin may not control the system-wide ``java`` (the same +reasoning behind pinning a Python version). + +For example, if you install the JDK with Homebrew on macOS, its ``java`` is not on ``$PATH``, so +point ``java_executable`` at it explicitly: + +.. code-block:: ini + + [sdk] + coordinators = { + "java-jdk17": { + "classpath": "airflow.sdk.coordinators.java.JavaCoordinator", + "kwargs": { + "jars_root": ["/opt/airflow/jars"], + "java_executable": "/opt/homebrew/opt/openjdk@17/bin/java" + } + } + } + queue_to_coordinator = {"java": "java-jdk17"} + .. _java-sdk/limitations: Limitations diff --git a/airflow-core/docs/core-concepts/asset-state-store.rst b/airflow-core/docs/core-concepts/asset-state-store.rst index 1f019d89d224b..3aa7ff7719f8a 100644 --- a/airflow-core/docs/core-concepts/asset-state-store.rst +++ b/airflow-core/docs/core-concepts/asset-state-store.rst @@ -150,6 +150,80 @@ Deletes *all* asset state store keys for the asset. 
# Using context context["asset_state_store"][my_asset].clear() +Using ``asset_state_store`` inside a Watcher Trigger +----------------------------------------------------- + +:class:`~airflow.triggers.base.BaseEventTrigger` subclasses (watcher triggers) can read and write asset state store directly from within ``run()``. The triggerer injects ``self.asset_state_store`` before ``run()`` is called, scoped to the asset the trigger is watching. It is not available during ``__init__`` or ``serialize()``, only access it from within ``run()``. + +Unlike task-based access (where the asset is identified by an inlet or outlet declaration), the accessor in a watcher trigger is automatically bound to the watched asset, so no subscripting is needed. + +.. code-block:: python + + import asyncio + from collections.abc import AsyncIterator + from typing import Any + + from airflow.triggers.base import BaseEventTrigger, TriggerEvent + + + class PollEventsTrigger(BaseEventTrigger): + def __init__(self, source: str, waiter_delay: int, **kwargs): + super().__init__(**kwargs) + self.source = source + self.waiter_delay = waiter_delay + + def serialize(self) -> tuple[str, dict[str, Any]]: + return ( + f"{self.__class__.__module__}.{self.__class__.__qualname__}", + {"source": self.source, "waiter_delay": self.waiter_delay}, + ) + + def _poll_for_new_record(self, source: str, last_seen: str) -> str | None: + ... # Add logic for polling a certain source + return None + + async def run(self) -> AsyncIterator[TriggerEvent]: + while True: + last_seen = self.asset_state_store.get("last_seen_id", default=0) + new_id = self._poll_for_new_record( + source=self.source, + last_seen=last_seen, + ) + + if new_id is not None: + self.asset_state_store.set("last_seen_id", new_id) + yield TriggerEvent({"status": "success", "record_id": new_id}) + return + + await asyncio.sleep(self.waiter_delay) + +The corresponding :class:`~airflow.sdk.definitions.asset.AssetWatcher` wires the trigger to the asset: + +.. 
code-block:: python + + from airflow.sdk import Asset, AssetWatcher + + from my_dag.triggers import PollEventsTrigger + + my_asset = Asset( + name="orders_api", + watchers=[ + AssetWatcher( + name="orders_api_watcher", + trigger=PollEventsTrigger(source="orders", waiter_delay=30), + ) + ], + ) + + ... + +``self.asset_state_store`` behaves identically to the per-asset accessor described in the task sections above: ``get``, ``set``, ``delete``, and ``clear`` are all available. Values written by the trigger are visible to any task that declares ``my_asset`` as an inlet or outlet, and vice versa. + +.. note:: + + ``self.asset_state_store`` is only available inside :class:`~airflow.triggers.base.BaseEventTrigger` subclasses. Plain :class:`~airflow.triggers.base.BaseTrigger` subclasses (used for task deferral) do not have access to asset state store. + + Some Example Use cases ---------------------- diff --git a/airflow-core/docs/howto/deadline-alerts.rst b/airflow-core/docs/howto/deadline-alerts.rst index e36908009a0f1..8e729516fefcb 100644 --- a/airflow-core/docs/howto/deadline-alerts.rst +++ b/airflow-core/docs/howto/deadline-alerts.rst @@ -110,14 +110,14 @@ Airflow provides several built-in reference points that you can use with Deadlin Specifies a fixed point in time. Useful when Dags must complete by a specific time. ``DeadlineReference.AVERAGE_RUNTIME`` - Calculates deadlines based on the average runtime of previous Dag runs. This reference + Calculates deadlines based on the average runtime of previous successful Dag runs. This reference analyzes historical execution data to predict when the current run should complete. The deadline is set to the current time plus the calculated average runtime plus the interval. If insufficient historical data exists, no deadline is created. Parameters: - * ``max_runs`` (int, optional): Maximum number of recent Dag runs to analyze. Defaults to 10. 
- * ``min_runs`` (int, optional): Minimum number of completed runs required to calculate average. Defaults to same value as ``max_runs``. + * ``max_runs`` (int, optional): Maximum number of successful recent Dag runs to analyze. Defaults to 10. + * ``min_runs`` (int, optional): Minimum number of successful recent Dag runs required to calculate average. Defaults to same value as ``max_runs``. Example usage: diff --git a/airflow-core/docs/tutorial/hitl.rst b/airflow-core/docs/tutorial/hitl.rst index 8e89b49d88de4..aea843486f404 100644 --- a/airflow-core/docs/tutorial/hitl.rst +++ b/airflow-core/docs/tutorial/hitl.rst @@ -196,6 +196,37 @@ When the operator creates an HITL request that is waiting for a human response, :end-before: [END howto_hitl_entry_operator] +Testing HITL Dags locally +------------------------- + +``airflow dags test`` (and the underlying ``dag.test()``) supports HITL tasks. A task that reaches +the ``awaiting_input`` state stays parked -- the test run never resolves it itself -- and the run +waits, logging which tasks await input, until a response is recorded from outside. The response +goes through the same channels as on a real deployment: the Required Actions page or the HITL REST +API (``PATCH .../hitlDetails``) of an api-server sharing the metadata database (for example +``airflow standalone``, or a separately started ``airflow api-server``). Once the response lands, +the test run resumes the task and continues with downstream tasks. + +This also lets AI agents drive a HITL pipeline end-to-end locally: run ``airflow dags test``, watch +for the waiting log line, ask the human, and submit their answer through the HITL REST API. The two +calls involved (``~`` works as a wildcard for ``dag_id`` and ``dag_run_id``): + +.. 
code-block:: text + + # Discover pending requests (subject, options, params, run/task identifiers) + GET /api/v2/dags/~/dagRuns/~/hitlDetails?response_received=false + + # Submit the response; the test run resumes the task on its next poll. + # map_index is -1 for non-mapped tasks. + PATCH /api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/taskInstances/{task_id}/{map_index}/hitlDetails + {"chosen_options": ["Approve"], "params_input": {}} + +.. note:: + + ``response_timeout`` and timeout defaults are enforced by the scheduler, which does not run + under ``airflow dags test``. A parked task therefore waits indefinitely for a response; supply + one through the UI or REST API to let the run finish. + Benefits and Common Use Cases ----------------------------- diff --git a/airflow-core/src/airflow/api_fastapi/common/parameters.py b/airflow-core/src/airflow/api_fastapi/common/parameters.py index 17185fe20e251..935b547c1a7cf 100644 --- a/airflow-core/src/airflow/api_fastapi/common/parameters.py +++ b/airflow-core/src/airflow/api_fastapi/common/parameters.py @@ -718,7 +718,7 @@ def dynamic_depends(self, default: str | Sequence[str] | None = None) -> Callabl ) def inner(order_by: list[str] = _order_by_query) -> SortParam: - return self.set_value(order_by) + return SortParam(self.allowed_attrs, self.model, self.to_replace).set_value(order_by) return inner diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/app.py b/airflow-core/src/airflow/api_fastapi/execution_api/app.py index 449019db7998d..5a87cdb81e04c 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/app.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/app.py @@ -99,8 +99,11 @@ async def lifespan(app: FastAPI, registry: svcs.Registry): app.state.svcs_registry = registry registry.register_factory(JWTGenerator, _jwt_generator) - # Create an app scoped validator, so that we don't have to fetch it every time - registry.register_value(JWTValidator, _jwt_validator(), ping=JWTValidator.status) + 
+ # InProcessExecutionAPI stubs out JWTValidator: don't re-register in that case. + if JWTValidator not in registry: + # Create an app scoped validator, so that we don't have to fetch it every time + registry.register_value(JWTValidator, _jwt_validator(), ping=JWTValidator.status) yield @@ -282,7 +285,7 @@ def _inject_trace_context_dep(routes, mode: str) -> None: route.dependencies.append(dep) -def create_task_execution_api_app() -> FastAPI: +def create_task_execution_api_app(lifespan: svcs.fastapi.lifespan = lifespan) -> FastAPI: """Create FastAPI app for task execution API.""" from airflow.api_fastapi.execution_api.routes import execution_api_router from airflow.api_fastapi.execution_api.versions import bundle @@ -388,7 +391,15 @@ def app(self): from airflow.api_fastapi.execution_api.routes.xcoms import has_xcom_access from airflow.api_fastapi.execution_api.security import _jwt_bearer - self._app = create_task_execution_api_app() + # Give this app its own lifespan + services registry so that stubbing services + # (e.g. JWTValidator) doesn't affect the module-level ``lifespan.registry``. + registry = svcs.Registry() + private_lifespan = attrs.evolve(lifespan, registry=registry) + self._app = create_task_execution_api_app(lifespan=private_lifespan) + + # In-process callers don't need a real JWTValidator: auth is bypassed below via + # ``dependency_overrides``. 
+ registry.register_value(JWTValidator, None) # Set up dag_bag in app state for dependency injection self._app.state.dag_bag = create_dag_bag() diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py index bf20bcbc30c55..ad051b3e6d340 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/datamodels/taskinstance.py @@ -317,6 +317,7 @@ class AssetEventDagRunReference(StrictBaseModel): source_map_index: int | None source_aliases: list[AssetAliasReferenceAssetEventDagRun] timestamp: UtcDateTime + partition_key: str | None = None class DagRun(StrictBaseModel): diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_state_store.py b/airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_state_store.py index f602962528e7f..4424e4ad3f180 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_state_store.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/routes/asset_state_store.py @@ -30,6 +30,7 @@ import json from typing import Annotated +from uuid import UUID from cadwyn import VersionedAPIRouter from fastapi import HTTPException, Query, status @@ -49,6 +50,7 @@ from airflow.state.metastore import MetastoreBackend _TIWriterFields = tuple[str, str, str, int] +NULL_UUID = UUID(int=0) def _fetch_ti_writer_fields(token: TIToken, session: SessionDep) -> _TIWriterFields: @@ -130,18 +132,30 @@ def _put_asset_state_store( ) -> None: backend = get_state_backend() if isinstance(backend, MetastoreBackend): - dag_id, run_id, task_id, map_index = _fetch_ti_writer_fields(token, session) - backend.set_asset_state_store( - scope, - key, - json.dumps(body.value), - kind=AssetStateStoreWriterKind.TASK, - dag_id=dag_id, - run_id=run_id, - task_id=task_id, - map_index=map_index, - session=session, - ) + if token.id == 
NULL_UUID: + # Since the asset state store routes do not have `task_instance_id` in their path params, the default kicks in, which is "00000000-0000-0000-0000-000000000000" + backend.set_asset_state_store( + scope, + key, + json.dumps(body.value), + kind=AssetStateStoreWriterKind.WATCHER, + session=session, + ) + else: + ti_fields = _fetch_ti_writer_fields(token, session) + dag_id, run_id, task_id, map_index = ti_fields + + backend.set_asset_state_store( + scope, + key, + json.dumps(body.value), + kind=AssetStateStoreWriterKind.TASK, + dag_id=dag_id, + run_id=run_id, + task_id=task_id, + map_index=map_index, + session=session, + ) else: backend.set(scope, key, json.dumps(body.value), session=session) diff --git a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_06.py b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_06.py index 5a3b0f0f5fc2d..e3b995011f4cf 100644 --- a/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_06.py +++ b/airflow-core/src/airflow/api_fastapi/execution_api/versions/v2026_04_06.py @@ -29,6 +29,7 @@ ) from airflow.api_fastapi.execution_api.datamodels.dagrun import TriggerDAGRunPayload from airflow.api_fastapi.execution_api.datamodels.taskinstance import ( + AssetEventDagRunReference, DagRun, TIDeferredStatePayload, TIRunContext, @@ -43,6 +44,7 @@ class AddPartitionKeyField(VersionChange): instructions_to_migrate_to_previous_version = ( schema(DagRun).field("partition_key").didnt_exist, schema(AssetEventResponse).field("partition_key").didnt_exist, + schema(AssetEventDagRunReference).field("partition_key").didnt_exist, schema(TriggerDAGRunPayload).field("partition_key").didnt_exist, schema(DagRunAssetReference).field("partition_key").didnt_exist, ) @@ -50,8 +52,12 @@ class AddPartitionKeyField(VersionChange): @convert_response_to_previous_version_for(TIRunContext) # type: ignore[arg-type] def remove_partition_key_from_dag_run(response: ResponseInfo) -> None: # type: ignore[misc] 
"""Remove the `partition_key` field from the dag_run object when converting to the previous version.""" - if "dag_run" in response.body and isinstance(response.body["dag_run"], dict): - response.body["dag_run"].pop("partition_key", None) + dag_run = response.body.get("dag_run") + if isinstance(dag_run, dict): + dag_run.pop("partition_key", None) + for event in dag_run.get("consumed_asset_events") or (): + if isinstance(event, dict): + event.pop("partition_key", None) @convert_response_to_previous_version_for(AssetEventsResponse) # type: ignore[arg-type] def remove_partition_key_from_asset_events(response: ResponseInfo) -> None: # type: ignore[misc] diff --git a/airflow-core/src/airflow/config_templates/config.yml b/airflow-core/src/airflow/config_templates/config.yml index 29763f422feb7..975e6ba11213b 100644 --- a/airflow-core/src/airflow/config_templates/config.yml +++ b/airflow-core/src/airflow/config_templates/config.yml @@ -2091,6 +2091,7 @@ sdk: "jdk-17": { "classpath": "airflow.sdk.coordinators.java.JavaCoordinator", "kwargs": { + "jars_root": ["/opt/airflow/java-bundles"], "java_executable": "/usr/lib/jvm/java-17-openjdk/bin/java", "jvm_args": ["-Xmx1024m"] }, @@ -2099,6 +2100,12 @@ sdk: "worker_container_repository": "apache/airflow", "worker_container_tag": "3.3.0" } + }, + "go-sdk": { + "classpath": "airflow.sdk.coordinators.executable.ExecutableCoordinator", + "kwargs": { + "executables_root": ["/opt/airflow/executable-bundles"] + } } } default: ~ @@ -3274,11 +3281,11 @@ state_store: backend: description: | Full dotted path to the class that implements state storage for tasks and assets. - The class must be a subclass of ``BaseStateBackend``. + The class must be a subclass of ``BaseStoreBackend``. The default implementation persists state in the Airflow metadata database. 
version_added: 3.3.0 type: string - example: "mypackage.state.CustomStateBackend" + example: "mypackage.state.CustomStoreBackend" default: "airflow.state.metastore.MetastoreBackend" clear_on_success: description: | diff --git a/airflow-core/src/airflow/example_dags/example_asset_partition.py b/airflow-core/src/airflow/example_dags/example_asset_partition.py index a7929ed92581f..e7151680404ed 100644 --- a/airflow-core/src/airflow/example_dags/example_asset_partition.py +++ b/airflow-core/src/airflow/example_dags/example_asset_partition.py @@ -30,8 +30,8 @@ IdentityMapper, MinimumCount, MonthWindow, - PartitionAtRuntime, PartitionedAssetTimetable, + PartitionedAtRuntime, ProductMapper, RollupMapper, SegmentWindow, @@ -44,6 +44,7 @@ WeekWindow, Window, asset, + get_current_context, task, ) @@ -160,7 +161,7 @@ def check_partition_alignment(): with DAG( dag_id="ingest_regional_sales", - schedule=PartitionAtRuntime(), + schedule=PartitionedAtRuntime(), tags=["example", "sales", "ingestion"], ): """Produce regional sales data with composite ``region|timestamp`` partition keys at runtime.""" @@ -207,7 +208,7 @@ def aggregate_sales(dag_run=None): with DAG( dag_id="ingest_region_stats", - schedule=PartitionAtRuntime(), + schedule=PartitionedAtRuntime(), tags=["example", "player-stats", "regional"], ): """ @@ -217,9 +218,14 @@ def aggregate_sales(dag_run=None): """ @task(outlets=[region_raw_stats]) - def ingest_region(): + def ingest_region(dag_run=None): """Materialize player statistics for a single region partition.""" - pass + context = get_current_context() + if TYPE_CHECKING: + assert dag_run + print( + f"dag_run partition key {dag_run.partition_key} context partition key {context['partition_key']}" + ) ingest_region() @@ -244,14 +250,14 @@ def regional_stats_breakdown(): @asset( uri="file://incoming/player-stats/live-region.csv", - schedule=PartitionAtRuntime(), + schedule=PartitionedAtRuntime(), tags=["player-stats", "runtime"], ) def live_region_player_stats(self, 
outlet_events): """ Produce a single region partition whose key is decided at runtime. - This asset demonstrates PartitionAtRuntime, which records the partition key on the + This asset demonstrates PartitionedAtRuntime, which records the partition key on the emitted event with ``add_partitions`` while the task runs rather than from a timetable. """ outlet_events[self].add_partitions("us") @@ -281,7 +287,7 @@ def summarize_live_region(dag_run=None): @asset( uri="file://incoming/player-stats/multi-region.csv", - schedule=PartitionAtRuntime(), + schedule=PartitionedAtRuntime(), tags=["player-stats", "runtime"], ) def multi_region_player_stats(self, outlet_events): diff --git a/airflow-core/src/airflow/example_dags/example_skip_dag.py b/airflow-core/src/airflow/example_dags/example_skip_dag.py index a2fe914c0356d..f5eed85f2de17 100644 --- a/airflow-core/src/airflow/example_dags/example_skip_dag.py +++ b/airflow-core/src/airflow/example_dags/example_skip_dag.py @@ -48,7 +48,6 @@ def create_test_pipeline(suffix, trigger_rule): :param str suffix: Suffix to append to the operator task_ids :param str trigger_rule: TriggerRule for the join task - :param DAG dag_: The DAG to run the operators on """ skip_operator = EmptySkipOperator(task_id=f"skip_operator_{suffix}") always_true = EmptyOperator(task_id=f"always_true_{suffix}") diff --git a/airflow-core/src/airflow/example_dags/plugins/custom_partition_mapper.py b/airflow-core/src/airflow/example_dags/plugins/custom_partition_mapper.py new file mode 100644 index 0000000000000..ab6ff673044d7 --- /dev/null +++ b/airflow-core/src/airflow/example_dags/plugins/custom_partition_mapper.py @@ -0,0 +1,96 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from typing import TYPE_CHECKING + +from airflow.partition_mappers.base import PartitionMapper +from airflow.plugins_manager import AirflowPlugin + +if TYPE_CHECKING: + from typing import Any + + +# [START custom_partition_mapper] +class PrefixStripMapper(PartitionMapper): + """ + A partition mapper that strips a fixed namespace prefix from upstream keys. + + Upstream systems often qualify partition keys with a region or environment + prefix — for example ``"eu::daily-sales"`` or ``"us::daily-sales"``. A + downstream asset that aggregates across regions only cares about the base key + (``"daily-sales"``). ``PrefixStripMapper`` strips the given prefix (including + a configurable separator) so that all upstream namespaces collapse to the + same downstream partition key. + + If the upstream key does not start with the configured prefix the key is + returned unchanged, which is deliberate: keys that already live in the target + namespace pass through without modification. + + This class demonstrates registering a custom :class:`PartitionMapper + <airflow.partition_mappers.base.PartitionMapper>` subclass via the + ``AirflowPlugin.partition_mappers`` registry. Any plugin that lists it in + ``partition_mappers = [...]`` makes it available to + :class:`~airflow.sdk.PartitionedAssetTimetable` and + :class:`~airflow.partition_mappers.base.RollupMapper` without modifying core + Airflow.
+ + :param prefix: The namespace prefix to strip, e.g. ``"eu"``. + :param separator: The string that separates the prefix from the base key. + Defaults to ``"::"`` to match a common ``"region::key"`` convention. + """ + + def __init__( + self, + prefix: str, + *, + separator: str = "::", + max_downstream_keys: int | None = None, + ) -> None: + super().__init__(max_downstream_keys=max_downstream_keys) + if not prefix: + raise ValueError("prefix must be a non-empty string.") + self.prefix = prefix + self.separator = separator + + def to_downstream(self, key: str) -> str: + full_prefix = self.prefix + self.separator + if key.startswith(full_prefix): + return key[len(full_prefix) :] + return key + + def serialize(self) -> dict[str, Any]: + data: dict[str, Any] = {"prefix": self.prefix, "separator": self.separator} + if self.max_downstream_keys is not None: + data["max_downstream_keys"] = self.max_downstream_keys + return data + + @classmethod + def deserialize(cls, data: dict[str, Any]) -> PartitionMapper: + return cls( + prefix=data["prefix"], + separator=data.get("separator", "::"), + max_downstream_keys=data.get("max_downstream_keys"), + ) + + +class PrefixStripMapperPlugin(AirflowPlugin): + name = "prefix_strip_mapper_plugin" + partition_mappers = [PrefixStripMapper] + + +# [END custom_partition_mapper] diff --git a/airflow-core/src/airflow/example_dags/plugins/custom_partition_timetable.py b/airflow-core/src/airflow/example_dags/plugins/custom_partition_timetable.py new file mode 100644 index 0000000000000..f74bc99b62ebc --- /dev/null +++ b/airflow-core/src/airflow/example_dags/plugins/custom_partition_timetable.py @@ -0,0 +1,55 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. 
The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +from airflow.plugins_manager import AirflowPlugin +from airflow.timetables.trigger import CronTriggerTimetable + + +# [START custom_partition_timetable] +class ScheduledRuntimePartitionTimetable(CronTriggerTimetable): + """ + A schedulable timetable whose partition key is decided at task runtime. + + Runs fire on the given cron cadence, exactly like an ordinary + :class:`~airflow.timetables.trigger.CronTriggerTimetable`. The partition key, + however, is not derived from the schedule: it is set while the producing task + runs — typically after the task checks whether the period's source data has + arrived — by calling ``outlet_events[self].add_partitions(...)``. + + This uses runtime partitioning on a regular cron schedule: the timetable stays + schedulable (``can_be_scheduled`` is ``True``) yet sets + ``partitioned_at_runtime = True`` so the partition key is deferred to task + runtime. It differs from :class:`~airflow.sdk.PartitionedAtRuntime` (which also + defers the key to runtime but never schedules a run on its own) and from + :class:`~airflow.timetables.trigger.CronPartitionTimetable` (which works out the + partition key from the cadence ahead of the run). ``partitioned`` stays + ``False``: no partition key is worked out ahead of the run. 
+ + Registering it via the ``AirflowPlugin.timetables`` registry makes it usable + by Dag authors without modifying core Airflow. + """ + + partitioned_at_runtime = True + + +class ScheduledRuntimePartitionTimetablePlugin(AirflowPlugin): + name = "scheduled_runtime_partition_timetable_plugin" + timetables = [ScheduledRuntimePartitionTimetable] + + +# [END custom_partition_timetable] diff --git a/airflow-core/src/airflow/jobs/triggerer_job_runner.py b/airflow-core/src/airflow/jobs/triggerer_job_runner.py index b29978e52a943..eccd9d2fc048d 100644 --- a/airflow-core/src/airflow/jobs/triggerer_job_runner.py +++ b/airflow-core/src/airflow/jobs/triggerer_job_runner.py @@ -60,13 +60,20 @@ from airflow.sdk.api.datamodels._generated import HITLDetailResponse from airflow.sdk.definitions.asset import Asset from airflow.sdk.execution_time.comms import ( + AssetStateStoreResult, + ClearAssetStateStoreByName, + ClearAssetStateStoreByUri, CommsDecoder, ConnectionResult, DagRunStateResult, + DeleteAssetStateStoreByName, + DeleteAssetStateStoreByUri, DeleteVariable, DeleteXCom, DRCount, ErrorResponse, + GetAssetStateStoreByName, + GetAssetStateStoreByUri, GetConnection, GetDagRunState, GetDRCount, @@ -80,6 +87,8 @@ MaskSecret, OKResponse, PutVariable, + SetAssetStateStoreByName, + SetAssetStateStoreByUri, SetXCom, TaskStatesResult, TICount, @@ -92,8 +101,14 @@ ) from airflow.sdk.execution_time.context import AssetStateStoreAccessors from airflow.sdk.execution_time.request_handlers import ( + handle_clear_asset_state_store_by_name, + handle_clear_asset_state_store_by_uri, + handle_delete_asset_state_store_by_name, + handle_delete_asset_state_store_by_uri, handle_delete_variable, handle_delete_xcom, + handle_get_asset_state_store_by_name, + handle_get_asset_state_store_by_uri, handle_get_connection, handle_get_dag_run_state, handle_get_dr_count, @@ -105,6 +120,8 @@ handle_get_xcom, handle_mask_secret, handle_put_variable, + handle_set_asset_state_store_by_name, + 
handle_set_asset_state_store_by_uri, handle_set_xcom, ) from airflow.sdk.execution_time.supervisor import WatchedSubprocess, make_buffered_socket_reader @@ -353,6 +370,7 @@ def from_api_response(cls, response: HITLDetailResponse) -> HITLDetailResponseRe | DRCount | TICount | TaskStatesResult + | AssetStateStoreResult | HITLDetailResponseResult | ErrorResponse | OKResponse, @@ -376,6 +394,14 @@ def from_api_response(cls, response: HITLDetailResponse) -> HITLDetailResponseRe | SetXCom | GetTICount | GetTaskStates + | ClearAssetStateStoreByName + | ClearAssetStateStoreByUri + | DeleteAssetStateStoreByName + | DeleteAssetStateStoreByUri + | GetAssetStateStoreByName + | GetAssetStateStoreByUri + | SetAssetStateStoreByName + | SetAssetStateStoreByUri | GetDagRunState | GetDRCount | GetPreviousTI @@ -622,6 +648,28 @@ def _handle_request(self, msg: ToTriggerSupervisor, log: FilteringBoundLogger, r resp = HITLDetailResponseResult.from_api_response(response=api_resp) elif isinstance(msg, MaskSecret): handle_mask_secret(msg) + elif isinstance(msg, ClearAssetStateStoreByName): + handle_clear_asset_state_store_by_name(self.client, msg) + resp = OKResponse(ok=True) + elif isinstance(msg, ClearAssetStateStoreByUri): + handle_clear_asset_state_store_by_uri(self.client, msg) + resp = OKResponse(ok=True) + elif isinstance(msg, DeleteAssetStateStoreByName): + handle_delete_asset_state_store_by_name(self.client, msg) + resp = OKResponse(ok=True) + elif isinstance(msg, DeleteAssetStateStoreByUri): + handle_delete_asset_state_store_by_uri(self.client, msg) + resp = OKResponse(ok=True) + elif isinstance(msg, GetAssetStateStoreByName): + resp, dump_opts = handle_get_asset_state_store_by_name(self.client, msg) + elif isinstance(msg, GetAssetStateStoreByUri): + resp, dump_opts = handle_get_asset_state_store_by_uri(self.client, msg) + elif isinstance(msg, SetAssetStateStoreByName): + handle_set_asset_state_store_by_name(self.client, msg) + resp = OKResponse(ok=True) + elif isinstance(msg, 
SetAssetStateStoreByUri): + handle_set_asset_state_store_by_uri(self.client, msg) + resp = OKResponse(ok=True) else: raise ValueError(f"Unknown message type {type(msg)}") diff --git a/airflow-core/src/airflow/serialization/definitions/deadline.py b/airflow-core/src/airflow/serialization/definitions/deadline.py index 89e231cba24dc..20fac2b54e874 100644 --- a/airflow-core/src/airflow/serialization/definitions/deadline.py +++ b/airflow-core/src/airflow/serialization/definitions/deadline.py @@ -207,6 +207,7 @@ def _evaluate_with(self, *, session: Session, **kwargs: Any) -> datetime | None: from sqlalchemy import func, select, text from airflow.models import DagRun + from airflow.utils.state import DagRunState dag_id = kwargs["dag_id"] @@ -222,9 +223,17 @@ def _evaluate_with(self, *, session: Session, **kwargs: Any) -> datetime | None: else: raise ValueError(f"Unsupported database dialect: {dialect}") + # Only SUCCESSFUL runs represent a "normal" runtime. A run that failed fast or hung + # before failing would otherwise skew the average and produce a misleading deadline + # (too short -> spurious misses, or too long -> real slowness never trips it). 
query = ( select(duration_expr) - .filter(DagRun.dag_id == dag_id, DagRun.start_date.isnot(None), DagRun.end_date.isnot(None)) + .filter( + DagRun.dag_id == dag_id, + DagRun.state == DagRunState.SUCCESS, + DagRun.start_date.isnot(None), + DagRun.end_date.isnot(None), + ) .order_by(DagRun.logical_date.desc()) .limit(self.max_runs) ) diff --git a/airflow-core/src/airflow/serialization/encoders.py b/airflow-core/src/airflow/serialization/encoders.py index b31b7c828c68c..8ab533b888ae5 100644 --- a/airflow-core/src/airflow/serialization/encoders.py +++ b/airflow-core/src/airflow/serialization/encoders.py @@ -70,8 +70,8 @@ from airflow.sdk.definitions.asset import AssetRef from airflow.sdk.definitions.timetables.assets import ( AssetTriggeredTimetable, - PartitionAtRuntime, PartitionedAssetTimetable, + PartitionedAtRuntime, ) from airflow.sdk.definitions.timetables.simple import ContinuousTimetable, NullTimetable, OnceTimetable from airflow.sdk.definitions.timetables.trigger import CronPartitionTimetable @@ -329,7 +329,7 @@ class _Serializer: MultipleCronTriggerTimetable: "airflow.timetables.trigger.MultipleCronTriggerTimetable", NullTimetable: "airflow.timetables.simple.NullTimetable", OnceTimetable: "airflow.timetables.simple.OnceTimetable", - PartitionAtRuntime: "airflow.timetables.simple.PartitionAtRuntime", + PartitionedAtRuntime: "airflow.timetables.simple.PartitionedAtRuntime", PartitionedAssetTimetable: "airflow.timetables.simple.PartitionedAssetTimetable", } @@ -355,9 +355,9 @@ def serialize_timetable(self, timetable: BaseTimetable | CoreTimetable) -> dict[ @serialize_timetable.register(ContinuousTimetable) @serialize_timetable.register(NullTimetable) @serialize_timetable.register(OnceTimetable) - @serialize_timetable.register(PartitionAtRuntime) + @serialize_timetable.register(PartitionedAtRuntime) def _( - self, timetable: ContinuousTimetable | NullTimetable | OnceTimetable | PartitionAtRuntime + self, timetable: ContinuousTimetable | NullTimetable | 
OnceTimetable | PartitionedAtRuntime ) -> dict[str, Any]: return {} diff --git a/airflow-core/src/airflow/timetables/base.py b/airflow-core/src/airflow/timetables/base.py index 4bee4ab38a368..84d8aa2e46fee 100644 --- a/airflow-core/src/airflow/timetables/base.py +++ b/airflow-core/src/airflow/timetables/base.py @@ -240,7 +240,7 @@ class Timetable(Protocol): partitioned_at_runtime: bool = False """Whether this timetable defers partition selection to task runtime. - *True* for :class:`~airflow.timetables.simple.PartitionAtRuntime`; + *True* for :class:`~airflow.timetables.simple.PartitionedAtRuntime`; downstream code can branch on this flag instead of using ``isinstance``. """ diff --git a/airflow-core/src/airflow/timetables/simple.py b/airflow-core/src/airflow/timetables/simple.py index a8b499dbe50ba..87741a8eb0d7e 100644 --- a/airflow-core/src/airflow/timetables/simple.py +++ b/airflow-core/src/airflow/timetables/simple.py @@ -186,11 +186,11 @@ def next_dagrun_info( return DagRunInfo.interval(start, end) -class PartitionAtRuntime(NullTimetable): +class PartitionedAtRuntime(NullTimetable): """ Timetable that never schedules anything; partition keys are set at runtime. - This corresponds to ``schedule=PartitionAtRuntime()``. + This corresponds to ``schedule=PartitionedAtRuntime()``. A run's ``partition_key`` (run-level provenance) must be supplied at trigger time — for example via the REST API's ``partition_key`` field. 
Partition keys @@ -204,7 +204,7 @@ class PartitionAtRuntime(NullTimetable): @property def summary(self) -> str: - return "PartitionAtRuntime" + return "PartitionedAtRuntime" class AssetTriggeredTimetable(_TrivialTimetable): diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json index 3aaa5115bb2f5..7b5f37c9cfd3b 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/en/common.json @@ -72,6 +72,7 @@ "expectedDuration": "Expected Duration", "lastSchedulingDecision": "Last Scheduling Decision", "mappedPartitionKey": "Mapped Partition key", + "partitionDate": "Partition Date", "partitionKey": "Partition key", "queuedAt": "Queued At", "runAfter": "Run After", diff --git a/airflow-core/src/airflow/ui/public/i18n/locales/zh-TW/common.json b/airflow-core/src/airflow/ui/public/i18n/locales/zh-TW/common.json index 90debb241c24b..a98d200635ce3 100644 --- a/airflow-core/src/airflow/ui/public/i18n/locales/zh-TW/common.json +++ b/airflow-core/src/airflow/ui/public/i18n/locales/zh-TW/common.json @@ -72,6 +72,7 @@ "expectedDuration": "預計時長", "lastSchedulingDecision": "最後排程決策", "mappedPartitionKey": "映射分區鍵", + "partitionDate": "資產分區日期", "partitionKey": "資產分區鍵", "queuedAt": "開始排隊時間", "runAfter": "最早可執行時間", diff --git a/airflow-core/src/airflow/ui/src/pages/Run/Details.tsx b/airflow-core/src/airflow/ui/src/pages/Run/Details.tsx index e233c528b54cc..e7ec9fa836437 100644 --- a/airflow-core/src/airflow/ui/src/pages/Run/Details.tsx +++ b/airflow-core/src/airflow/ui/src/pages/Run/Details.tsx @@ -145,6 +145,14 @@ export const Details = () => {