Fix task state store Execution API rejecting keys that contain slashes#69178
Fix task state store Execution API rejecting keys that contain slashes#69178takayoshi-makabe wants to merge 5 commits into
Conversation
SameerMesiah97
left a comment
There was a problem hiding this comment.
Looks good but CI needs to be triggered.
ashb
left a comment
There was a problem hiding this comment.
Should we accept path, or should we make the client url encode the key it sends? (Or both?)
|
@ashb Client-side encoding could also be a good idea for added safety, but I think that can be treated as a separate concern. |
|
@ashb I think both is a good idea. @takayoshi-makabe Worth including in this PR since it's the same root cause. |
amoghrajesh
left a comment
There was a problem hiding this comment.
@takayoshi-makabe changes look fine, can you handle client side encoding too? I will approve once thats done
| assert "Task instance" in response.json()["detail"]["message"] | ||
|
|
||
| def test_get_key_with_slash(self, client: TestClient, create_task_instance: CreateTaskInstance): | ||
| """Keys containing slashes must be routed correctly — route uses {key:path}.""" |
There was a problem hiding this comment.
| """Keys containing slashes must be routed correctly — route uses {key:path}.""" |
| assert response.status_code == 404 | ||
|
|
||
| def test_put_key_with_slash(self, client: TestClient, create_task_instance: CreateTaskInstance): | ||
| """Keys containing slashes must be stored and retrieved correctly — route uses {key:path}.""" |
There was a problem hiding this comment.
| """Keys containing slashes must be stored and retrieved correctly — route uses {key:path}.""" |
| assert client.get(_api_url(ti.id, "checkpoint")).json() == {"value": "b"} | ||
|
|
||
| def test_delete_key_with_slash(self, client: TestClient, create_task_instance: CreateTaskInstance): | ||
| """Keys containing slashes must be deletable — route uses {key:path}.""" |
There was a problem hiding this comment.
| """Keys containing slashes must be deletable — route uses {key:path}.""" |
|
I added the client-side processing based on the code below.
commit: 2efa57a |
amoghrajesh
left a comment
There was a problem hiding this comment.
Please update the tests as suggested.
| def test_get_url_quotes_key_as_single_path_segment(self): | ||
| """Test that task state store keys cannot escape the store API path.""" | ||
| requests_seen = [] | ||
| crafted_key = "../../../variables/secret_key" | ||
|
|
||
| def handle_request(request: httpx.Request) -> httpx.Response: | ||
| requests_seen.append(request) | ||
| if request.url.path == "/variables/secret_key": | ||
| return httpx.Response( | ||
| status_code=200, | ||
| json={"key": "secret_key", "value": "super-secret-value"}, | ||
| ) | ||
| return httpx.Response( | ||
| status_code=404, | ||
| json={"detail": {"reason": "not_found", "message": "Task state key not found"}}, | ||
| ) | ||
|
|
||
| client = make_client(transport=httpx.MockTransport(handle_request)) | ||
| result = client.task_state_store.get(ti_id=self.TI_ID, key=crafted_key) | ||
|
|
||
| assert ( | ||
| requests_seen[0].url.raw_path | ||
| == f"/store/ti/{self.TI_ID}/..%2F..%2F..%2Fvariables%2Fsecret_key".encode() | ||
| ) | ||
| assert requests_seen[0].url.path != "/variables/secret_key" | ||
| assert isinstance(result, ErrorResponse) | ||
| assert "super-secret-value" not in str(result) |
There was a problem hiding this comment.
Doesn't have to be so complex, can simply be similar to:
def test_get_url_encodes_key_with_slash(self):
requests_seen = []
def handle_request(request: httpx.Request) -> httpx.Response:
requests_seen.append(request)
return httpx.Response(
status_code=200,
json={"value": "spark_app_001"},
)
client = make_client(transport=httpx.MockTransport(handle_request))
client.task_state_store.get(ti_id=self.TI_ID, key="spark/job_id")
assert b"%2F" in requests_seen[0].url.raw_path
assert requests_seen[0].url.raw_path == f"/store/ti/{self.TI_ID}/spark%2Fjob_id".encode()There was a problem hiding this comment.
Same for the other tests.
|
@amoghrajesh |
Keys containing
/(e.g."spark/job_id") silently fail with 404 when accessed through the Execution API used by running tasks. The routes were defined with{key}instead of{key:path}, so FastAPI treated the slash as a path separator and could not match the route.The Public API (used by UI/external clients), defined in
airflow-core/src/airflow/api_fastapi/core_api/routes/public/task_state_store.py, already uses/{key:path}and is correct. This PR aligns the Execution API (airflow-core/src/airflow/api_fastapi/execution_api/routes/task_state_store.py) with that existing pattern.A task managing multiple external jobs may naturally use slash-namespaced keys such as
"spark/job_id"or"bigquery/status"to group related state — these silently returned 404 before this fix.Was generative AI tooling used to co-author this PR?
Claude Code
{pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.