From 9a3efe8aec4b3d15d59f6eb4c378331417be19b3 Mon Sep 17 00:00:00 2001 From: HyunKyun Moon Date: Thu, 14 May 2026 10:48:45 +0900 Subject: [PATCH 01/20] Retention API: scaffold PriorityEvictionQueue with sidecar storage Introduce a new self-contained module for priority-based KV-cache eviction. Per-block retention metadata is stored in a side-table keyed by block_id, keeping KVCacheBlock untouched. Signed-off-by: HyunKyun Moon --- tests/v1/core/test_priority_eviction.py | 39 +++++++++++++++++++++++++ vllm/v1/core/priority_eviction_queue.py | 33 +++++++++++++++++++++ 2 files changed, 72 insertions(+) create mode 100644 tests/v1/core/test_priority_eviction.py create mode 100644 vllm/v1/core/priority_eviction_queue.py diff --git a/tests/v1/core/test_priority_eviction.py b/tests/v1/core/test_priority_eviction.py new file mode 100644 index 000000000000..052adb47805c --- /dev/null +++ b/tests/v1/core/test_priority_eviction.py @@ -0,0 +1,39 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project + +from vllm.v1.core.kv_cache_utils import KVCacheBlock +from vllm.v1.core.priority_eviction_queue import ( + PriorityEvictionQueue, + RetentionMeta, +) + + +def _make_block(block_id: int) -> KVCacheBlock: + return KVCacheBlock(block_id=block_id) + + +def _set_meta( + queue: PriorityEvictionQueue, + block: KVCacheBlock, + *, + priority: int, + expiry: float | None = None, + scope: str | None = None, + last_freed: float = 0.0, +) -> None: + """Test helper: install a sidecar entry directly without going through + apply_directives. Reaches into the queue's private dict — acceptable + in tests of the queue itself.""" + queue._meta[block.block_id] = RetentionMeta( + priority=priority, + expiry=expiry, + scope=scope, + last_freed_time=last_freed, + ) + + +class TestPriorityEvictionQueue: + def test_empty_queue(self): + queue = PriorityEvictionQueue() + assert queue.num_blocks == 0 + assert queue.pop_lowest() is None diff --git a/vllm/v1/core/priority_eviction_queue.py b/vllm/v1/core/priority_eviction_queue.py new file mode 100644 index 000000000000..ee7f6b5b5f25 --- /dev/null +++ b/vllm/v1/core/priority_eviction_queue.py @@ -0,0 +1,33 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +"""Priority-based eviction queue with sidecar storage of per-block +retention metadata.""" + +from dataclasses import dataclass + +from vllm.v1.core.kv_cache_utils import KVCacheBlock + + +@dataclass(slots=True) +class RetentionMeta: + priority: int + expiry: float | None + scope: str | None + last_freed_time: float + + +class PriorityEvictionQueue: + def __init__(self) -> None: + self._meta: dict[int, RetentionMeta] = {} + self._heap: list[tuple[int, float, int, KVCacheBlock]] = [] + self._in_queue: set[int] = set() + + @property + def num_blocks(self) -> int: + return len(self._in_queue) + + def pop_lowest(self) -> KVCacheBlock | None: + if not self._in_queue: + return None + # Implementation deferred to Task 3 + raise NotImplementedError From 11b48364ecadd19f9be0b8b91dbbf22ae342d1dc Mon Sep 17 00:00:00 2001 From: HyunKyun Moon Date: Thu, 14 May 2026 10:54:31 +0900 Subject: [PATCH 02/20] Retention API: implement try_insert and pop_lowest with min-heap try_insert returns False for blocks without sidecar entries, enabling single-line routing at the caller. pop_lowest consumes the sidecar entry on eviction. Signed-off-by: HyunKyun Moon --- tests/v1/core/test_priority_eviction.py | 17 ++++++++++++++ vllm/v1/core/priority_eviction_queue.py | 30 +++++++++++++++++++++---- 2 files changed, 43 insertions(+), 4 deletions(-) diff --git a/tests/v1/core/test_priority_eviction.py b/tests/v1/core/test_priority_eviction.py index 052adb47805c..c4f7943842ea 100644 --- a/tests/v1/core/test_priority_eviction.py +++ b/tests/v1/core/test_priority_eviction.py @@ -37,3 +37,20 @@ def test_empty_queue(self): queue = PriorityEvictionQueue() assert queue.num_blocks == 0 assert queue.pop_lowest() is None + + def test_insert_and_pop_single(self): + queue = PriorityEvictionQueue() + block = _make_block(1) + _set_meta(queue, block, priority=50) + assert queue.try_insert(block) is True + assert queue.num_blocks == 1 + popped = queue.pop_lowest() + assert popped is block + assert queue.num_blocks == 0 + + def test_try_insert_returns_false_for_unprioritized_block(self): + queue = PriorityEvictionQueue() + block = _make_block(1) + # No _set_meta call — sidecar entry absent. + assert queue.try_insert(block) is False + assert queue.num_blocks == 0 diff --git a/vllm/v1/core/priority_eviction_queue.py b/vllm/v1/core/priority_eviction_queue.py index ee7f6b5b5f25..7d80c718aa73 100644 --- a/vllm/v1/core/priority_eviction_queue.py +++ b/vllm/v1/core/priority_eviction_queue.py @@ -3,6 +3,7 @@ """Priority-based eviction queue with sidecar storage of per-block retention metadata.""" +import heapq from dataclasses import dataclass from vllm.v1.core.kv_cache_utils import KVCacheBlock @@ -26,8 +27,29 @@ def __init__(self) -> None: def num_blocks(self) -> int: return len(self._in_queue) + def try_insert(self, block: KVCacheBlock) -> bool: + """If the block has a sidecar entry, insert into the heap and + return True. Otherwise return False (caller routes elsewhere).""" + meta = self._meta.get(block.block_id) + if meta is None: + return False + heapq.heappush( + self._heap, + (meta.priority, meta.last_freed_time, block.block_id, block), + ) + self._in_queue.add(block.block_id) + return True + def pop_lowest(self) -> KVCacheBlock | None: - if not self._in_queue: - return None - # Implementation deferred to Task 3 - raise NotImplementedError + """Pop the lowest-priority block from the heap. Stale entries + (block_id no longer in _in_queue) are skipped. Returns None when + the queue is empty.""" + while self._heap: + _, _, block_id, block = heapq.heappop(self._heap) + if block_id in self._in_queue: + self._in_queue.discard(block_id) + # Consuming the block discards its sidecar entry: priority + # is one-shot per free-evict cycle. + self._meta.pop(block_id, None) + return block + return None From 5e6247d211bb8a6c1631ff180fb906c25be982ac Mon Sep 17 00:00:00 2001 From: HyunKyun Moon Date: Thu, 14 May 2026 11:01:26 +0900 Subject: [PATCH 03/20] Retention API: test eviction order by priority and freed-time tiebreak Lock in the heap-key ordering contract: (priority ASC, last_freed ASC, block_id ASC). Lowest priority leaves first; equal-priority blocks leave oldest-freed first. Signed-off-by: HyunKyun Moon --- tests/v1/core/test_priority_eviction.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/tests/v1/core/test_priority_eviction.py b/tests/v1/core/test_priority_eviction.py index c4f7943842ea..e7bee029bb7f 100644 --- a/tests/v1/core/test_priority_eviction.py +++ b/tests/v1/core/test_priority_eviction.py @@ -54,3 +54,27 @@ def test_try_insert_returns_false_for_unprioritized_block(self): # No _set_meta call — sidecar entry absent. assert queue.try_insert(block) is False assert queue.num_blocks == 0 + + def test_eviction_order_by_priority(self): + queue = PriorityEvictionQueue() + blocks = [_make_block(i) for i in range(3)] + # Insert in non-ascending order to verify the heap reorders. + for blk, p in zip(blocks, [80, 20, 50]): + _set_meta(queue, blk, priority=p) + queue.try_insert(blk) + # Lowest priority must come out first. + assert queue.pop_lowest() is blocks[1] # priority 20 + assert queue.pop_lowest() is blocks[2] # priority 50 + assert queue.pop_lowest() is blocks[0] # priority 80 + + def test_eviction_order_tiebreak_by_time(self): + queue = PriorityEvictionQueue() + blocks = [_make_block(i) for i in range(3)] + # Same priority; differ only in last_freed_time. + for blk, t in zip(blocks, [300.0, 100.0, 200.0]): + _set_meta(queue, blk, priority=50, last_freed=t) + queue.try_insert(blk) + # Oldest-freed evicted first. + assert queue.pop_lowest() is blocks[1] # t=100 + assert queue.pop_lowest() is blocks[2] # t=200 + assert queue.pop_lowest() is blocks[0] # t=300 From 1438b62b660e96b481f7b358ab81b8e81eb78a20 Mon Sep 17 00:00:00 2001 From: HyunKyun Moon Date: Thu, 14 May 2026 11:05:30 +0900 Subject: [PATCH 04/20] Retention API: remove() with lazy delete; pop_lowest skips stale entries remove() drops from _in_queue without touching the heap; pop_lowest skips entries whose block_id is no longer in _in_queue. Sidecar metadata is preserved so priority survives a reuse cycle. Signed-off-by: HyunKyun Moon --- tests/v1/core/test_priority_eviction.py | 34 +++++++++++++++++++++++++ vllm/v1/core/priority_eviction_queue.py | 6 +++++ 2 files changed, 40 insertions(+) diff --git a/tests/v1/core/test_priority_eviction.py b/tests/v1/core/test_priority_eviction.py index e7bee029bb7f..e07fe98aac6b 100644 --- a/tests/v1/core/test_priority_eviction.py +++ b/tests/v1/core/test_priority_eviction.py @@ -78,3 +78,37 @@ def test_eviction_order_tiebreak_by_time(self): assert queue.pop_lowest() is blocks[1] # t=100 assert queue.pop_lowest() is blocks[2] # t=200 assert queue.pop_lowest() is blocks[0] # t=300 + + def test_remove_keeps_sidecar(self): + queue = PriorityEvictionQueue() + block = _make_block(1) + _set_meta(queue, block, priority=50) + queue.try_insert(block) + queue.remove(block) + assert queue.num_blocks == 0 + # Sidecar entry survives so that priority returns if the block + # is freed again later. + assert block.block_id in queue._meta + assert queue.try_insert(block) is True + assert queue.num_blocks == 1 + + def test_remove_nonexistent_is_noop(self): + queue = PriorityEvictionQueue() + block = _make_block(1) + # No insert; remove must not raise. + queue.remove(block) + assert queue.num_blocks == 0 + + def test_stale_heap_entries_are_skipped_in_pop_lowest(self): + queue = PriorityEvictionQueue() + # Insert two blocks; remove one (leaving a stale heap entry). + block_a = _make_block(1) + block_b = _make_block(2) + _set_meta(queue, block_a, priority=10) + _set_meta(queue, block_b, priority=50) + queue.try_insert(block_a) + queue.try_insert(block_b) + queue.remove(block_a) # block_a now stale in heap + # pop_lowest must skip the stale entry and return block_b. + assert queue.pop_lowest() is block_b + assert queue.pop_lowest() is None diff --git a/vllm/v1/core/priority_eviction_queue.py b/vllm/v1/core/priority_eviction_queue.py index 7d80c718aa73..cbf00edc3ad1 100644 --- a/vllm/v1/core/priority_eviction_queue.py +++ b/vllm/v1/core/priority_eviction_queue.py @@ -40,6 +40,12 @@ def try_insert(self, block: KVCacheBlock) -> bool: self._in_queue.add(block.block_id) return True + def remove(self, block: KVCacheBlock) -> None: + """Remove the block from the heap (lazy delete: just drops from + _in_queue). The sidecar entry is preserved so that the block's + priority survives a reuse cycle and is restored on next free.""" + self._in_queue.discard(block.block_id) + def pop_lowest(self) -> KVCacheBlock | None: """Pop the lowest-priority block from the heap. Stale entries (block_id no longer in _in_queue) are skipped. Returns None when From c135b79633bd28c6cb4a33317c3c1640846b67ba Mon Sep 17 00:00:00 2001 From: HyunKyun Moon Date: Thu, 14 May 2026 11:09:11 +0900 Subject: [PATCH 05/20] Retention API: honor expiry in pop_lowest Entries whose monotonic expiry has elapsed are silently discarded by pop_lowest. Callers receive None when only expired entries remain; the block falls back to the LRU free list at the next allocation. Signed-off-by: HyunKyun Moon --- tests/v1/core/test_priority_eviction.py | 29 +++++++++++++++++++++++++ vllm/v1/core/priority_eviction_queue.py | 20 ++++++++++++----- 2 files changed, 43 insertions(+), 6 deletions(-) diff --git a/tests/v1/core/test_priority_eviction.py b/tests/v1/core/test_priority_eviction.py index e07fe98aac6b..10183c6200c1 100644 --- a/tests/v1/core/test_priority_eviction.py +++ b/tests/v1/core/test_priority_eviction.py @@ -112,3 +112,32 @@ def test_stale_heap_entries_are_skipped_in_pop_lowest(self): # pop_lowest must skip the stale entry and return block_b. assert queue.pop_lowest() is block_b assert queue.pop_lowest() is None + + def test_ttl_not_expired(self, monkeypatch): + import time as time_mod + + queue = PriorityEvictionQueue() + block = _make_block(1) + # Set "now" to 100; expiry is at 200 (not yet reached). + monkeypatch.setattr(time_mod, "monotonic", lambda: 100.0) + _set_meta(queue, block, priority=50, expiry=200.0) + queue.try_insert(block) + # Even when "now" advances to 150, expiry (200) is in the future. + monkeypatch.setattr(time_mod, "monotonic", lambda: 150.0) + assert queue.pop_lowest() is block + + def test_ttl_expiry(self, monkeypatch): + import time as time_mod + + queue = PriorityEvictionQueue() + block = _make_block(1) + # Insert with expiry=200. + monkeypatch.setattr(time_mod, "monotonic", lambda: 100.0) + _set_meta(queue, block, priority=50, expiry=200.0) + queue.try_insert(block) + # Advance past expiry — block is treated as unprioritized. + monkeypatch.setattr(time_mod, "monotonic", lambda: 250.0) + # pop_lowest discards expired entries and returns None when none + # remain. + assert queue.pop_lowest() is None + assert queue.num_blocks == 0 diff --git a/vllm/v1/core/priority_eviction_queue.py b/vllm/v1/core/priority_eviction_queue.py index cbf00edc3ad1..7ed2ae641169 100644 --- a/vllm/v1/core/priority_eviction_queue.py +++ b/vllm/v1/core/priority_eviction_queue.py @@ -4,6 +4,7 @@ retention metadata.""" import heapq +import time from dataclasses import dataclass from vllm.v1.core.kv_cache_utils import KVCacheBlock @@ -48,14 +49,21 @@ def remove(self, block: KVCacheBlock) -> None: def pop_lowest(self) -> KVCacheBlock | None: """Pop the lowest-priority block from the heap. Stale entries - (block_id no longer in _in_queue) are skipped. Returns None when - the queue is empty.""" + (block_id no longer in _in_queue) and expired entries are skipped. + Returns None when the queue is empty.""" + now = time.monotonic() while self._heap: _, _, block_id, block = heapq.heappop(self._heap) - if block_id in self._in_queue: + if block_id not in self._in_queue: + continue + meta = self._meta.get(block_id) + if meta is not None and meta.expiry is not None and meta.expiry <= now: + # Expired — discard sidecar and skip (caller treats as + # unprioritized; will be served by the LRU path). self._in_queue.discard(block_id) - # Consuming the block discards its sidecar entry: priority - # is one-shot per free-evict cycle. self._meta.pop(block_id, None) - return block + continue + self._in_queue.discard(block_id) + self._meta.pop(block_id, None) + return block return None From 645b89e1d5a8fbf16900389d750298fa8c8e5201 Mon Sep 17 00:00:00 2001 From: HyunKyun Moon Date: Thu, 14 May 2026 11:12:50 +0900 Subject: [PATCH 06/20] Retention API: apply_directives with overlap matching and duration For each full block, the highest-priority directive whose token range overlaps the block's range wins. Open-ended ranges (end=None) cover from start to end-of-sequence. Duration translates into a monotonic expiry timestamp. Signed-off-by: HyunKyun Moon --- tests/v1/core/test_priority_eviction.py | 54 +++++++++++++++++++++++++ vllm/v1/core/priority_eviction_queue.py | 42 +++++++++++++++++++ 2 files changed, 96 insertions(+) diff --git a/tests/v1/core/test_priority_eviction.py b/tests/v1/core/test_priority_eviction.py index 10183c6200c1..b8c2f70d6cc1 100644 --- a/tests/v1/core/test_priority_eviction.py +++ b/tests/v1/core/test_priority_eviction.py @@ -141,3 +141,57 @@ def test_ttl_expiry(self, monkeypatch): # remain. assert queue.pop_lowest() is None assert queue.num_blocks == 0 + + +class TestApplyDirectives: + def _peek_meta(self, queue: PriorityEvictionQueue, block_id: int): + return queue._meta.get(block_id) + + def test_apply_retention_to_block_single_match(self): + queue = PriorityEvictionQueue() + block = _make_block(0) # tokens 0..15 (block_size=16) + directives = [{"start": 0, "end": 16, "priority": 50}] + queue.apply_directives([block], directives, scope=None, block_size=16) + meta = self._peek_meta(queue, 0) + assert meta is not None + assert meta.priority == 50 + + def test_apply_retention_no_match(self): + queue = PriorityEvictionQueue() + block = _make_block(0) # tokens 0..15 + # Directive covers tokens 100..200 — no overlap. + directives = [{"start": 100, "end": 200, "priority": 50}] + queue.apply_directives([block], directives, scope=None, block_size=16) + assert self._peek_meta(queue, 0) is None + + def test_apply_retention_highest_priority_wins(self): + queue = PriorityEvictionQueue() + block = _make_block(0) # tokens 0..15 + directives = [ + {"start": 0, "end": 16, "priority": 30}, + {"start": 0, "end": 16, "priority": 80}, # higher wins + {"start": 0, "end": 16, "priority": 50}, + ] + queue.apply_directives([block], directives, scope=None, block_size=16) + assert self._peek_meta(queue, 0).priority == 80 + + def test_apply_retention_open_ended_range(self): + queue = PriorityEvictionQueue() + blocks = [_make_block(i) for i in range(3)] # tokens 0..15, 16..31, 32..47 + # end=None means "from start to end of sequence". + directives = [{"start": 16, "end": None, "priority": 70}] + queue.apply_directives(blocks, directives, scope=None, block_size=16) + assert self._peek_meta(queue, 0) is None # tokens 0..15 not covered + assert self._peek_meta(queue, 1).priority == 70 + assert self._peek_meta(queue, 2).priority == 70 + + def test_apply_retention_with_duration(self, monkeypatch): + import time as time_mod + + monkeypatch.setattr(time_mod, "monotonic", lambda: 1000.0) + queue = PriorityEvictionQueue() + block = _make_block(0) + directives = [{"start": 0, "end": 16, "priority": 50, "duration": 60.0}] + queue.apply_directives([block], directives, scope=None, block_size=16) + meta = self._peek_meta(queue, 0) + assert meta.expiry == 1060.0 diff --git a/vllm/v1/core/priority_eviction_queue.py b/vllm/v1/core/priority_eviction_queue.py index 7ed2ae641169..27da75e23d77 100644 --- a/vllm/v1/core/priority_eviction_queue.py +++ b/vllm/v1/core/priority_eviction_queue.py @@ -67,3 +67,45 @@ def pop_lowest(self) -> KVCacheBlock | None: self._meta.pop(block_id, None) return block return None + + def apply_directives( + self, + blocks: list[KVCacheBlock], + directives: list[dict], + scope: str | None, + block_size: int, + ) -> None: + """For each full block, find the highest-priority directive whose + token range overlaps the block's range and update the sidecar + entry. Blocks with no matching directive are left untouched at + this stage (ownership/clear semantics added in Task 7).""" + if not directives: + return + now = time.monotonic() + for idx, block in enumerate(blocks): + if block.is_null: + continue + token_start = idx * block_size + token_end = token_start + block_size + best_priority = -1 + best_duration: float | None = None + for d in directives: + d_start = d.get("start", 0) + d_end = d.get("end") + if d_end is not None and d_end <= token_start: + continue + if d_start >= token_end: + continue + p = d.get("priority", 0) + if p > best_priority: + best_priority = p + best_duration = d.get("duration") + if best_priority < 0: + continue + expiry = now + best_duration if best_duration is not None else None + self._meta[block.block_id] = RetentionMeta( + priority=best_priority, + expiry=expiry, + scope=scope, + last_freed_time=0.0, + ) From f89917bdde775f4c787d7df526993dd04b7df077 Mon Sep 17 00:00:00 2001 From: HyunKyun Moon Date: Thu, 14 May 2026 11:18:23 +0900 Subject: [PATCH 07/20] Retention API: scope-based ownership in apply_directives Escalation is open to any caller; downgrade and refresh are restricted to the current owner; a no-match directive set from the owner's scope clears the entry. Non-owner downgrades and anonymous (scope=None) clears are silently ignored. Signed-off-by: HyunKyun Moon --- tests/v1/core/test_priority_eviction.py | 86 +++++++++++++++++++++++++ vllm/v1/core/priority_eviction_queue.py | 47 ++++++++++---- 2 files changed, 121 insertions(+), 12 deletions(-) diff --git a/tests/v1/core/test_priority_eviction.py b/tests/v1/core/test_priority_eviction.py index b8c2f70d6cc1..c368b27ca8d7 100644 --- a/tests/v1/core/test_priority_eviction.py +++ b/tests/v1/core/test_priority_eviction.py @@ -195,3 +195,89 @@ def test_apply_retention_with_duration(self, monkeypatch): queue.apply_directives([block], directives, scope=None, block_size=16) meta = self._peek_meta(queue, 0) assert meta.expiry == 1060.0 + + def test_escalation_from_different_scope(self): + queue = PriorityEvictionQueue() + block = _make_block(0) + _set_meta(queue, block, priority=30, scope="alice") + # bob escalates to 80 — allowed regardless of scope. + queue.apply_directives( + [block], + [{"start": 0, "end": 16, "priority": 80}], + scope="bob", + block_size=16, + ) + meta = self._peek_meta(queue, 0) + assert meta.priority == 80 + assert meta.scope == "bob" + + def test_downgrade_blocked_from_different_scope(self): + queue = PriorityEvictionQueue() + block = _make_block(0) + _set_meta(queue, block, priority=80, scope="alice") + # bob tries to downgrade to 20 — denied (alice owns the block). + queue.apply_directives( + [block], + [{"start": 0, "end": 16, "priority": 20}], + scope="bob", + block_size=16, + ) + meta = self._peek_meta(queue, 0) + assert meta.priority == 80 + assert meta.scope == "alice" + + def test_owner_can_downgrade(self): + queue = PriorityEvictionQueue() + block = _make_block(0) + _set_meta(queue, block, priority=80, scope="alice") + queue.apply_directives( + [block], + [{"start": 0, "end": 16, "priority": 20}], + scope="alice", + block_size=16, + ) + meta = self._peek_meta(queue, 0) + assert meta.priority == 20 + assert meta.scope == "alice" + + def test_owner_clear_on_no_match(self): + queue = PriorityEvictionQueue() + block = _make_block(0) + _set_meta(queue, block, priority=50, scope="alice") + # alice issues directives that don't cover this block — sidecar + # entry is cleared. + queue.apply_directives( + [block], + [{"start": 100, "end": 200, "priority": 90}], + scope="alice", + block_size=16, + ) + assert self._peek_meta(queue, 0) is None + + def test_non_owner_no_clear_on_no_match(self): + queue = PriorityEvictionQueue() + block = _make_block(0) + _set_meta(queue, block, priority=50, scope="alice") + # bob's directives don't cover this block — alice's entry stays. + queue.apply_directives( + [block], + [{"start": 100, "end": 200, "priority": 90}], + scope="bob", + block_size=16, + ) + meta = self._peek_meta(queue, 0) + assert meta is not None + assert meta.scope == "alice" + + def test_no_scope_no_clear(self): + queue = PriorityEvictionQueue() + block = _make_block(0) + _set_meta(queue, block, priority=50, scope="alice") + # scope=None caller is anonymous — must not clear anyone's entry. + queue.apply_directives( + [block], + [{"start": 100, "end": 200, "priority": 90}], + scope=None, + block_size=16, + ) + assert self._peek_meta(queue, 0) is not None diff --git a/vllm/v1/core/priority_eviction_queue.py b/vllm/v1/core/priority_eviction_queue.py index 27da75e23d77..60cf2a1ed7d1 100644 --- a/vllm/v1/core/priority_eviction_queue.py +++ b/vllm/v1/core/priority_eviction_queue.py @@ -75,12 +75,16 @@ def apply_directives( scope: str | None, block_size: int, ) -> None: - """For each full block, find the highest-priority directive whose - token range overlaps the block's range and update the sidecar - entry. Blocks with no matching directive are left untouched at - this stage (ownership/clear semantics added in Task 7).""" - if not directives: - return + """For each full block, find the highest-priority overlapping + directive and update the sidecar entry under these rules: + + - Escalation (new > current priority): any caller may raise priority + and takes ownership of the block. + - Downgrade or refresh (new <= current priority): only the current + owner may do this. + - No matching directive: if the caller has scope ownership of this + block, the sidecar entry is cleared. + """ now = time.monotonic() for idx, block in enumerate(blocks): if block.is_null: @@ -100,12 +104,31 @@ def apply_directives( if p > best_priority: best_priority = p best_duration = d.get("duration") + + current = self._meta.get(block.block_id) + if best_priority < 0: + # No matching directive. Owner-initiated clear only. + if scope is not None and current is not None and current.scope == scope: + self._meta.pop(block.block_id, None) continue + expiry = now + best_duration if best_duration is not None else None - self._meta[block.block_id] = RetentionMeta( - priority=best_priority, - expiry=expiry, - scope=scope, - last_freed_time=0.0, - ) + current_priority = current.priority if current is not None else -1 + if best_priority > current_priority: + # Escalation: any caller may raise priority and takes ownership. + self._meta[block.block_id] = RetentionMeta( + priority=best_priority, + expiry=expiry, + scope=scope, + last_freed_time=current.last_freed_time if current else 0.0, + ) + elif current is not None and scope is not None and current.scope == scope: + # Same scope: owner may downgrade or refresh. + self._meta[block.block_id] = RetentionMeta( + priority=best_priority, + expiry=expiry, + scope=scope, + last_freed_time=current.last_freed_time, + ) + # Non-owner downgrade: silently ignored. From 5b2edf191f33330aeddefe0797c007944ae1eb82 Mon Sep 17 00:00:00 2001 From: HyunKyun Moon Date: Thu, 14 May 2026 11:21:45 +0900 Subject: [PATCH 08/20] Retention API: cleanup hooks clear_priority, clear, __contains__ clear_priority drops both sidecar and heap state for one block; clear drops everything; __contains__ reports whether a block is currently in the heap. Locks in the sidecar lifecycle invariants. Signed-off-by: HyunKyun Moon --- tests/v1/core/test_priority_eviction.py | 56 +++++++++++++++++++++++++ vllm/v1/core/priority_eviction_queue.py | 15 +++++++ 2 files changed, 71 insertions(+) diff --git a/tests/v1/core/test_priority_eviction.py b/tests/v1/core/test_priority_eviction.py index c368b27ca8d7..cda0c046c76f 100644 --- a/tests/v1/core/test_priority_eviction.py +++ b/tests/v1/core/test_priority_eviction.py @@ -281,3 +281,59 @@ def test_no_scope_no_clear(self): block_size=16, ) assert self._peek_meta(queue, 0) is not None + + +class TestSidecarLifecycle: + def test_sidecar_entry_cleared_on_pop_lowest(self): + queue = PriorityEvictionQueue() + block = _make_block(0) + _set_meta(queue, block, priority=50) + queue.try_insert(block) + queue.pop_lowest() + assert 0 not in queue._meta + + def test_sidecar_entry_cleared_on_clear_priority(self): + queue = PriorityEvictionQueue() + block = _make_block(0) + _set_meta(queue, block, priority=50) + queue.try_insert(block) + queue.clear_priority(0) + assert 0 not in queue._meta + assert 0 not in queue._in_queue + + def test_sidecar_persists_through_reuse_cycle(self): + queue = PriorityEvictionQueue() + block = _make_block(0) + _set_meta(queue, block, priority=50) + queue.try_insert(block) + queue.remove(block) # block reused via touch + # Sidecar entry is preserved so try_insert succeeds again on next free. + assert queue.try_insert(block) is True + # And the heap entry reflects the same priority. + popped = queue.pop_lowest() + assert popped is block + + def test_sidecar_cleared_on_clear_all(self): + queue = PriorityEvictionQueue() + for i in range(3): + block = _make_block(i) + _set_meta(queue, block, priority=50) + queue.try_insert(block) + queue.clear() + assert len(queue._meta) == 0 + assert len(queue._in_queue) == 0 + assert queue.num_blocks == 0 + + def test_clear_priority_for_unknown_block_is_noop(self): + queue = PriorityEvictionQueue() + queue.clear_priority(999) # not present — must not raise + + def test_contains_reflects_heap_membership(self): + queue = PriorityEvictionQueue() + block = _make_block(0) + _set_meta(queue, block, priority=50) + assert block not in queue + queue.try_insert(block) + assert block in queue + queue.remove(block) + assert block not in queue diff --git a/vllm/v1/core/priority_eviction_queue.py b/vllm/v1/core/priority_eviction_queue.py index 60cf2a1ed7d1..a7117f0b9a4b 100644 --- a/vllm/v1/core/priority_eviction_queue.py +++ b/vllm/v1/core/priority_eviction_queue.py @@ -28,6 +28,9 @@ def __init__(self) -> None: def num_blocks(self) -> int: return len(self._in_queue) + def __contains__(self, block: KVCacheBlock) -> bool: + return block.block_id in self._in_queue + def try_insert(self, block: KVCacheBlock) -> bool: """If the block has a sidecar entry, insert into the heap and return True. Otherwise return False (caller routes elsewhere).""" @@ -68,6 +71,18 @@ def pop_lowest(self) -> KVCacheBlock | None: return block return None + def clear_priority(self, block_id: int) -> None: + """Drop the sidecar entry for a block (called when its hash is + reset or it is permanently evicted from prefix cache).""" + self._meta.pop(block_id, None) + self._in_queue.discard(block_id) + + def clear(self) -> None: + """Drop all sidecar entries and heap state.""" + self._meta.clear() + self._heap.clear() + self._in_queue.clear() + def apply_directives( self, blocks: list[KVCacheBlock], From 55a1a391013c265d2d2c2115bb9e8980117b051d Mon Sep 17 00:00:00 2001 From: HyunKyun Moon Date: Thu, 14 May 2026 14:23:21 +0900 Subject: [PATCH 09/20] Retention API: wire PriorityEvictionQueue into BlockPool Add the queue as a member of BlockPool and clear it on reset_prefix_cache. Two single-line hooks; no changes to KVCacheBlock. Signed-off-by: HyunKyun Moon --- tests/v1/core/test_priority_eviction.py | 27 +++++++++++++++++++++++++ vllm/v1/core/block_pool.py | 6 ++++++ 2 files changed, 33 insertions(+) diff --git a/tests/v1/core/test_priority_eviction.py b/tests/v1/core/test_priority_eviction.py index cda0c046c76f..e677a26bb6e6 100644 --- a/tests/v1/core/test_priority_eviction.py +++ b/tests/v1/core/test_priority_eviction.py @@ -337,3 +337,30 @@ def test_contains_reflects_heap_membership(self): assert block in queue queue.remove(block) assert block not in queue + + +class TestBlockPoolPriorityEviction: + def _make_pool(self, num_blocks=8, block_size=16): + from vllm.v1.core.block_pool import BlockPool + + return BlockPool( + num_gpu_blocks=num_blocks, + enable_caching=True, + hash_block_size=block_size, + enable_kv_cache_events=False, + ) + + def test_pool_has_priority_eviction_queue(self): + pool = self._make_pool() + assert isinstance(pool.priority_eviction_queue, PriorityEvictionQueue) + assert pool.priority_eviction_queue.num_blocks == 0 + + def test_reset_prefix_cache_clears_priority_queue(self): + pool = self._make_pool() + # Seed the queue with one prioritized free block. + block = pool.blocks[1] + _set_meta(pool.priority_eviction_queue, block, priority=50) + pool.priority_eviction_queue.try_insert(block) + assert pool.priority_eviction_queue.num_blocks == 1 + pool.reset_prefix_cache() + assert pool.priority_eviction_queue.num_blocks == 0 diff --git a/vllm/v1/core/block_pool.py b/vllm/v1/core/block_pool.py index 9097079ef33a..16267b19d9d5 100644 --- a/vllm/v1/core/block_pool.py +++ b/vllm/v1/core/block_pool.py @@ -26,6 +26,7 @@ make_block_hash_with_group_id, maybe_convert_block_hash, ) +from vllm.v1.core.priority_eviction_queue import PriorityEvictionQueue from vllm.v1.request import Request logger = init_logger(__name__) @@ -181,6 +182,10 @@ def __init__( self.metrics_collector = metrics_collector + # Sidecar storage for priority-based KV-cache eviction. No-op fast + # path when num_blocks == 0 (no directives have been applied). + self.priority_eviction_queue = PriorityEvictionQueue() + def get_cached_block( self, block_hash: BlockHash, kv_cache_group_ids: list[int] ) -> list[KVCacheBlock] | None: @@ -460,6 +465,7 @@ def reset_prefix_cache(self) -> bool: # Remove all hashes so that no new blocks will hit. self.cached_block_hash_to_block = BlockHashToBlockMap() + self.priority_eviction_queue.clear() # Remove all hashes from all blocks. for block in self.blocks: From a0c61506e0ad66c23891169affbda73a007f1c3e Mon Sep 17 00:00:00 2001 From: HyunKyun Moon Date: Thu, 14 May 2026 14:33:05 +0900 Subject: [PATCH 10/20] Retention API: wire cache_full_blocks to apply retention directives Reads retention_directives and retention_scope from extra_args and forwards them to the priority eviction queue. Returns early when neither is present, preserving the existing zero-overhead path. Signed-off-by: HyunKyun Moon --- tests/v1/core/test_priority_eviction.py | 46 +++++++++++++++++++++++++ vllm/v1/core/block_pool.py | 25 ++++++++++++++ 2 files changed, 71 insertions(+) diff --git a/tests/v1/core/test_priority_eviction.py b/tests/v1/core/test_priority_eviction.py index e677a26bb6e6..6c6a8fe8f715 100644 --- a/tests/v1/core/test_priority_eviction.py +++ b/tests/v1/core/test_priority_eviction.py @@ -364,3 +364,49 @@ def test_reset_prefix_cache_clears_priority_queue(self): assert pool.priority_eviction_queue.num_blocks == 1 pool.reset_prefix_cache() assert pool.priority_eviction_queue.num_blocks == 0 + + def test_cache_full_blocks_routes_directives_to_queue(self): + from vllm.sampling_params import SamplingParams + + pool = self._make_pool(num_blocks=8, block_size=16) + sampling = SamplingParams( + extra_args={ + "retention_directives": [ + {"start": 0, "end": 16, "priority": 80}, + ], + "retention_scope": "alice", + } + ) + + # Minimal stub request — only what the hook needs. + class _Req: + sampling_params: SamplingParams + + request = _Req() + request.sampling_params = sampling + + blocks = [pool.blocks[1]] + # Drive the hook directly to isolate its behavior from the rest of + # cache_full_blocks. + pool._apply_retention_hook(request, blocks, num_full_blocks=1, block_size=16) + + meta = pool.priority_eviction_queue._meta.get(blocks[0].block_id) + assert meta is not None + assert meta.priority == 80 + assert meta.scope == "alice" + + def test_no_extra_args_zero_overhead_path(self): + from vllm.sampling_params import SamplingParams + + pool = self._make_pool() + sampling = SamplingParams() # no extra_args + + class _Req: + sampling_params: SamplingParams + + request = _Req() + request.sampling_params = sampling + blocks = [pool.blocks[1]] + pool._apply_retention_hook(request, blocks, num_full_blocks=1, block_size=16) + assert pool.priority_eviction_queue.num_blocks == 0 + assert blocks[0].block_id not in pool.priority_eviction_queue._meta diff --git a/vllm/v1/core/block_pool.py b/vllm/v1/core/block_pool.py index 16267b19d9d5..ae787e7d053a 100644 --- a/vllm/v1/core/block_pool.py +++ b/vllm/v1/core/block_pool.py @@ -278,6 +278,8 @@ def cache_full_blocks( if new_hashes is not None: new_hashes.append(maybe_convert_block_hash(block_hash)) + self._apply_retention_hook(request, blocks, num_full_blocks, block_size) + if self.enable_kv_cache_events: if num_cached_blocks == 0: parent_block_hash: ExternalBlockHash | None = None @@ -324,6 +326,29 @@ def cache_full_blocks( ) ) + def _apply_retention_hook( + self, + request, + blocks: list[KVCacheBlock], + num_full_blocks: int, + block_size: int, + ) -> None: + """Read retention directives from request.sampling_params.extra_args + and apply them to the given full blocks. No-op when neither + retention_directives nor retention_scope is present (zero-overhead + fast path for non-retention requests).""" + extra = getattr(request.sampling_params, "extra_args", None) or {} + directives = extra.get("retention_directives") + scope = extra.get("retention_scope") + if directives is None and scope is None: + return + self.priority_eviction_queue.apply_directives( + blocks[:num_full_blocks], + directives or [], + scope, + block_size, + ) + def get_new_blocks(self, num_blocks: int) -> list[KVCacheBlock]: """Get new blocks from the free block pool. From 091b8dc40683cfd0f878ae251f2186847cd4ddd2 Mon Sep 17 00:00:00 2001 From: HyunKyun Moon Date: Thu, 14 May 2026 14:50:32 +0900 Subject: [PATCH 11/20] Retention API: priority-aware allocation in get_new_blocks Fast path is bit-for-bit equivalent to LRU when the priority queue is empty (single integer compare). Otherwise: drain LRU first, then pop lowest-priority blocks from the queue. get_num_free_blocks now sums both pools. reset_prefix_cache updated to use free_block_queue.num_free_blocks directly (pre-Task-13 guard against double-counting with sidecar). Signed-off-by: HyunKyun Moon --- tests/v1/core/test_priority_eviction.py | 32 +++++++++++++++++++++++++ vllm/v1/core/block_pool.py | 27 +++++++++++++++++---- 2 files changed, 55 insertions(+), 4 deletions(-) diff --git a/tests/v1/core/test_priority_eviction.py b/tests/v1/core/test_priority_eviction.py index 6c6a8fe8f715..6dae2ce2919d 100644 --- a/tests/v1/core/test_priority_eviction.py +++ b/tests/v1/core/test_priority_eviction.py @@ -410,3 +410,35 @@ class _Req: pool._apply_retention_hook(request, blocks, num_full_blocks=1, block_size=16) assert pool.priority_eviction_queue.num_blocks == 0 assert blocks[0].block_id not in pool.priority_eviction_queue._meta + + def test_eviction_drains_lru_before_priority(self): + pool = self._make_pool(num_blocks=8, block_size=16) + # Mark 3 blocks as prioritized, remove them from LRU first so they + # live exclusively in the priority queue (avoids double-allocation + # before Task 13 integrates the free-path). + prioritized_ids = [1, 2, 3] + for bid in prioritized_ids: + block = pool.blocks[bid] + pool.free_block_queue.remove(block) + _set_meta(pool.priority_eviction_queue, block, priority=50) + pool.priority_eviction_queue.try_insert(block) + # After removal from LRU, count the remaining LRU-only free blocks. + free_lru_before = pool.free_block_queue.num_free_blocks + # Allocate up to free_lru_before + 1 blocks — the +1 must come from + # the priority queue. + ret = pool.get_new_blocks(free_lru_before + 1) + assert len(ret) == free_lru_before + 1 + # The last block returned should be the one we marked prioritized + # (since the LRU drained first). + assert ret[-1].block_id in prioritized_ids + + def test_get_num_free_blocks_sums_both(self): + pool = self._make_pool(num_blocks=8) + free_before = pool.get_num_free_blocks() + block = pool.blocks[1] + _set_meta(pool.priority_eviction_queue, block, priority=50) + pool.priority_eviction_queue.try_insert(block) + # The block is "in" both the LRU and the priority queue at this + # point — but the LRU count remains the same; the priority count + # adds. + assert pool.get_num_free_blocks() == free_before + 1 diff --git a/vllm/v1/core/block_pool.py b/vllm/v1/core/block_pool.py index ae787e7d053a..f5ab909e5134 100644 --- a/vllm/v1/core/block_pool.py +++ b/vllm/v1/core/block_pool.py @@ -363,7 +363,23 @@ def get_new_blocks(self, num_blocks: int) -> list[KVCacheBlock]: if num_blocks > self.get_num_free_blocks(): raise ValueError(f"Cannot get {num_blocks} free blocks from the pool") - ret: list[KVCacheBlock] = self.free_block_queue.popleft_n(num_blocks) + # Fast path: no prioritized blocks → pure LRU (zero overhead). + ret: list[KVCacheBlock] + if self.priority_eviction_queue.num_blocks == 0: + ret = self.free_block_queue.popleft_n(num_blocks) + else: + # Drain unprioritized blocks from the LRU free list first, then + # take lowest-priority blocks from the priority queue. + num_from_free = min(num_blocks, self.free_block_queue.num_free_blocks) + ret = ( + self.free_block_queue.popleft_n(num_from_free) if num_from_free else [] + ) + while len(ret) < num_blocks: + evicted = self.priority_eviction_queue.pop_lowest() + assert evicted is not None, ( + "Priority queue empty but more blocks required" + ) + ret.append(evicted) # In order to only iterate the list once, we duplicated code a bit if self.enable_caching: @@ -479,7 +495,7 @@ def reset_prefix_cache(self) -> bool: bool: True if the prefix cache is successfully reset, False otherwise. """ - num_used_blocks = self.num_gpu_blocks - self.get_num_free_blocks() + num_used_blocks = self.num_gpu_blocks - self.free_block_queue.num_free_blocks if num_used_blocks != 1: # The null block is always marked as used logger.warning( "Failed to reset prefix cache because some " @@ -510,9 +526,12 @@ def get_num_free_blocks(self) -> int: """Get the number of free blocks in the pool. Returns: - The number of free blocks. + The number of free blocks (LRU + prioritized). """ - return self.free_block_queue.num_free_blocks + return ( + self.free_block_queue.num_free_blocks + + self.priority_eviction_queue.num_blocks + ) def get_usage(self) -> float: """Get the KV cache usage. From 34112973a5c18a3eb1f862f5dd51f584b9ea1d4e Mon Sep 17 00:00:00 2001 From: HyunKyun Moon Date: Thu, 14 May 2026 14:59:31 +0900 Subject: [PATCH 12/20] Retention API: revert reset_prefix_cache change from priority-aware allocation The earlier priority-aware get_num_free_blocks rewrite (sum of LRU and priority queue) required a reset_prefix_cache change to keep an earlier eviction-queue test green. That side-effect is reverted here; the test is instead adjusted to remove the block from the LRU before adding it to the priority queue, matching the single-queue invariant enforced later when touch() is updated. Keeps reset_prefix_cache correct in production after full integration. Signed-off-by: HyunKyun Moon --- tests/v1/core/test_priority_eviction.py | 5 ++++- vllm/v1/core/block_pool.py | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/v1/core/test_priority_eviction.py b/tests/v1/core/test_priority_eviction.py index 6dae2ce2919d..755537a42156 100644 --- a/tests/v1/core/test_priority_eviction.py +++ b/tests/v1/core/test_priority_eviction.py @@ -357,8 +357,11 @@ def test_pool_has_priority_eviction_queue(self): def test_reset_prefix_cache_clears_priority_queue(self): pool = self._make_pool() - # Seed the queue with one prioritized free block. + # Seed the queue with one prioritized free block. Remove from the + # LRU first so the block lives in exactly one queue, matching the + # invariant that free_blocks (Task 13) will enforce. block = pool.blocks[1] + pool.free_block_queue.remove(block) _set_meta(pool.priority_eviction_queue, block, priority=50) pool.priority_eviction_queue.try_insert(block) assert pool.priority_eviction_queue.num_blocks == 1 diff --git a/vllm/v1/core/block_pool.py b/vllm/v1/core/block_pool.py index f5ab909e5134..69dfa9dea8a1 100644 --- a/vllm/v1/core/block_pool.py +++ b/vllm/v1/core/block_pool.py @@ -495,7 +495,7 @@ def reset_prefix_cache(self) -> bool: bool: True if the prefix cache is successfully reset, False otherwise. """ - num_used_blocks = self.num_gpu_blocks - self.free_block_queue.num_free_blocks + num_used_blocks = self.num_gpu_blocks - self.get_num_free_blocks() if num_used_blocks != 1: # The null block is always marked as used logger.warning( "Failed to reset prefix cache because some " From dbbeae2356d6ef80c5079993e27f9dcd5a4c9d0a Mon Sep 17 00:00:00 2001 From: HyunKyun Moon Date: Thu, 14 May 2026 15:03:10 +0900 Subject: [PATCH 13/20] Retention API: touch removes from priority queue when block was there Single 3-line dispatch in touch(): if the reused block is in the priority queue, remove from there; otherwise remove from the LRU free list (existing behavior). Signed-off-by: HyunKyun Moon --- tests/v1/core/test_priority_eviction.py | 24 ++++++++++++++++++++++++ vllm/v1/core/block_pool.py | 9 ++++++--- 2 files changed, 30 insertions(+), 3 deletions(-) diff --git a/tests/v1/core/test_priority_eviction.py b/tests/v1/core/test_priority_eviction.py index 755537a42156..52bab7ff6648 100644 --- a/tests/v1/core/test_priority_eviction.py +++ b/tests/v1/core/test_priority_eviction.py @@ -445,3 +445,27 @@ def test_get_num_free_blocks_sums_both(self): # point — but the LRU count remains the same; the priority count # adds. assert pool.get_num_free_blocks() == free_before + 1 + + def test_touch_removes_from_priority_queue(self): + pool = self._make_pool() + block = pool.blocks[1] + block.ref_cnt = 0 # free state + # Remove from LRU first so the block lives in exactly one queue, + # consistent with the Task-13 invariant. + pool.free_block_queue.remove(block) + _set_meta(pool.priority_eviction_queue, block, priority=50) + pool.priority_eviction_queue.try_insert(block) + assert pool.priority_eviction_queue.num_blocks == 1 + pool.touch([block]) + # touch() reuses the block: it must leave the priority queue. + assert pool.priority_eviction_queue.num_blocks == 0 + assert block.ref_cnt == 1 + + def test_touch_removes_from_lru(self): + pool = self._make_pool() + # Use a block already in LRU (not prioritized). + block = pool.blocks[2] + block.ref_cnt = 0 + # Block is in free_block_queue by default after init. + pool.touch([block]) + assert block.ref_cnt == 1 diff --git a/vllm/v1/core/block_pool.py b/vllm/v1/core/block_pool.py index 69dfa9dea8a1..bc065c33e4fa 100644 --- a/vllm/v1/core/block_pool.py +++ b/vllm/v1/core/block_pool.py @@ -443,10 +443,13 @@ def touch(self, blocks: Sequence[KVCacheBlock]) -> None: blocks: A list of blocks to touch. """ for block in blocks: - # ref_cnt=0 means this block is in the free list (i.e. eviction - # candidate), so remove it. + # ref_cnt=0 means this block is in a free queue (LRU or + # priority); remove it from whichever queue it sits in. if block.ref_cnt == 0 and not block.is_null: - self.free_block_queue.remove(block) + if block in self.priority_eviction_queue: + self.priority_eviction_queue.remove(block) + else: + self.free_block_queue.remove(block) block.ref_cnt += 1 if self.metrics_collector: self.metrics_collector.on_block_accessed(block) From b2b40ba27541334adaf99bb253f9541ae2545887 Mon Sep 17 00:00:00 2001 From: HyunKyun Moon Date: Thu, 14 May 2026 15:09:34 +0900 Subject: [PATCH 14/20] Retention API: route prioritized blocks via try_insert in free_blocks Single-line filter: blocks for which try_insert returns False fall through to the LRU free list (existing path). try_insert stamps the current monotonic time so the heap tiebreak reflects this most-recent free. Signed-off-by: HyunKyun Moon --- tests/v1/core/test_priority_eviction.py | 35 +++++++++++++++++++++++++ vllm/v1/core/block_pool.py | 12 ++++++++- vllm/v1/core/priority_eviction_queue.py | 15 +++++++++-- 3 files changed, 59 insertions(+), 3 deletions(-) diff --git a/tests/v1/core/test_priority_eviction.py b/tests/v1/core/test_priority_eviction.py index 52bab7ff6648..9fec8e72fb57 100644 --- a/tests/v1/core/test_priority_eviction.py +++ b/tests/v1/core/test_priority_eviction.py @@ -469,3 +469,38 @@ def test_touch_removes_from_lru(self): # Block is in free_block_queue by default after init. pool.touch([block]) assert block.ref_cnt == 1 + + def test_free_unprioritized_goes_to_lru(self): + pool = self._make_pool() + block = pool.blocks[1] + block.ref_cnt = 1 + # Block must not be in LRU while ref_cnt > 0 — remove it first to + # mirror the production state of an active block. + pool.free_block_queue.remove(block) + free_before = pool.free_block_queue.num_free_blocks + pool.free_blocks([block]) + assert pool.free_block_queue.num_free_blocks == free_before + 1 + assert pool.priority_eviction_queue.num_blocks == 0 + + def test_free_prioritized_goes_to_priority_queue(self, monkeypatch): + import time as time_mod + + monkeypatch.setattr(time_mod, "monotonic", lambda: 12345.0) + pool = self._make_pool() + block = pool.blocks[1] + block.ref_cnt = 1 + # Block must not be in LRU while ref_cnt > 0 — remove it first. + pool.free_block_queue.remove(block) + # Install a sidecar entry so try_insert recognizes the block as + # prioritized. + _set_meta(pool.priority_eviction_queue, block, priority=50) + free_before = pool.free_block_queue.num_free_blocks + pool.free_blocks([block]) + # Did NOT land in the LRU queue. + assert pool.free_block_queue.num_free_blocks == free_before + # Did land in the priority queue with updated last_freed_time. + assert pool.priority_eviction_queue.num_blocks == 1 + assert ( + pool.priority_eviction_queue._meta[block.block_id].last_freed_time + == 12345.0 + ) diff --git a/vllm/v1/core/block_pool.py b/vllm/v1/core/block_pool.py index bc065c33e4fa..b8696679ef31 100644 --- a/vllm/v1/core/block_pool.py +++ b/vllm/v1/core/block_pool.py @@ -1,5 +1,6 @@ # SPDX-License-Identifier: Apache-2.0 # SPDX-FileCopyrightText: Copyright contributors to the vLLM project +import time from collections.abc import Iterable, Sequence from typing import Any @@ -466,8 +467,17 @@ def free_blocks(self, ordered_blocks: Iterable[KVCacheBlock]) -> None: blocks_list = list(ordered_blocks) for block in blocks_list: block.ref_cnt -= 1 + free_able = [b for b in blocks_list if b.ref_cnt == 0 and not b.is_null] + # Stamp the current monotonic time so the heap tiebreak reflects + # this most-recent free. try_insert returns False for blocks + # without a sidecar entry; those fall through to the LRU free list. + now = time.monotonic() self.free_block_queue.append_n( - [block for block in blocks_list if block.ref_cnt == 0 and not block.is_null] + [ + b + for b in free_able + if not self.priority_eviction_queue.try_insert(b, last_freed_time=now) + ] ) def evict_blocks(self, block_ids: set[int]) -> None: diff --git a/vllm/v1/core/priority_eviction_queue.py b/vllm/v1/core/priority_eviction_queue.py index a7117f0b9a4b..789def89227f 100644 --- a/vllm/v1/core/priority_eviction_queue.py +++ b/vllm/v1/core/priority_eviction_queue.py @@ -31,12 +31,23 @@ def num_blocks(self) -> int: def __contains__(self, block: KVCacheBlock) -> bool: return block.block_id in self._in_queue - def try_insert(self, block: KVCacheBlock) -> bool: + def try_insert( + self, + block: KVCacheBlock, + last_freed_time: float | None = None, + ) -> bool: """If the block has a sidecar entry, insert into the heap and - return True. Otherwise return False (caller routes elsewhere).""" + return True. Otherwise return False (caller routes elsewhere). + + last_freed_time, when provided, overrides the value stored in the + sidecar entry. This is used by free_blocks() to stamp the current + monotonic time on freshly-freed blocks so the heap tiebreak + reflects this most-recent free.""" meta = self._meta.get(block.block_id) if meta is None: return False + if last_freed_time is not None: + meta.last_freed_time = last_freed_time heapq.heappush( self._heap, (meta.priority, meta.last_freed_time, block.block_id, block), From b10c395985d04c3d0321cf80ca5685e00063bb82 Mon Sep 17 00:00:00 2001 From: HyunKyun Moon Date: Thu, 14 May 2026 15:17:36 +0900 Subject: [PATCH 15/20] Retention API: clear sidecar on evict_blocks Single-line hook in the evict_blocks per-block loop. Ensures the sidecar dict tracks only blocks that are currently in the prefix cache. Signed-off-by: HyunKyun Moon --- tests/v1/core/test_priority_eviction.py | 9 +++++++++ vllm/v1/core/block_pool.py | 1 + 2 files changed, 10 insertions(+) diff --git a/tests/v1/core/test_priority_eviction.py b/tests/v1/core/test_priority_eviction.py index 9fec8e72fb57..ff9ba28ee15b 100644 --- a/tests/v1/core/test_priority_eviction.py +++ b/tests/v1/core/test_priority_eviction.py @@ -504,3 +504,12 @@ def test_free_prioritized_goes_to_priority_queue(self, monkeypatch): pool.priority_eviction_queue._meta[block.block_id].last_freed_time == 12345.0 ) + + def test_evict_blocks_clears_sidecar(self): + pool = self._make_pool() + block = pool.blocks[1] + _set_meta(pool.priority_eviction_queue, block, priority=50) + # Don't insert into heap — just install sidecar (simulating a + # block whose ref_cnt > 0 but had a prior priority). + pool.evict_blocks({block.block_id}) + assert block.block_id not in pool.priority_eviction_queue._meta diff --git a/vllm/v1/core/block_pool.py b/vllm/v1/core/block_pool.py index b8696679ef31..63a5335db888 100644 --- a/vllm/v1/core/block_pool.py +++ b/vllm/v1/core/block_pool.py @@ -498,6 +498,7 @@ def evict_blocks(self, block_ids: set[int]) -> None: ) block = self.blocks[block_id] self._maybe_evict_cached_block(block) + self.priority_eviction_queue.clear_priority(block_id) def reset_prefix_cache(self) -> bool: """Reset prefix cache. This function may be used in RLHF From 4f93efb663d7265e07bc2eeb1a1e796509bbb5df Mon Sep 17 00:00:00 2001 From: HyunKyun Moon Date: Thu, 14 May 2026 15:23:22 +0900 Subject: [PATCH 16/20] Retention API: add retention_directives + retention_scope chat fields MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two new Pydantic fields on ChatCompletionRequest, forwarded into SamplingParams.extra_args. Pure addition — no existing-line changes. Signed-off-by: HyunKyun Moon --- ...est_chat_completion_request_validations.py | 21 ++++++++++++++++++ .../openai/chat_completion/protocol.py | 22 +++++++++++++++++++ 2 files changed, 43 insertions(+) diff --git a/tests/tool_use/test_chat_completion_request_validations.py b/tests/tool_use/test_chat_completion_request_validations.py index 70f4ac22541a..48d6923e1a12 100644 --- a/tests/tool_use/test_chat_completion_request_validations.py +++ b/tests/tool_use/test_chat_completion_request_validations.py @@ -61,3 +61,24 @@ def test_chat_completion_request_with_tool_choice_but_no_tools(tool_choice): "tools": None, } ) + + +def test_retention_directives_field_round_trips(): + from vllm.entrypoints.openai.chat_completion.protocol import ( + ChatCompletionRequest, + ) + + req = ChatCompletionRequest( + model="dummy", + messages=[{"role": "user", "content": "hi"}], + retention_directives=[{"start": 0, "end": 16, "priority": 80}], + retention_scope="alice", + ) + sp = req.to_sampling_params( + max_tokens=16, + default_sampling_params={}, + ) + assert sp.extra_args["retention_directives"] == [ + {"start": 0, "end": 16, "priority": 80} + ] + assert sp.extra_args["retention_scope"] == "alice" diff --git a/vllm/entrypoints/openai/chat_completion/protocol.py b/vllm/entrypoints/openai/chat_completion/protocol.py index 03e4678c8214..6e0375306560 100644 --- a/vllm/entrypoints/openai/chat_completion/protocol.py +++ b/vllm/entrypoints/openai/chat_completion/protocol.py @@ -385,6 +385,24 @@ class ChatCompletionRequest(OpenAIBaseModel): description="KVTransfer parameters used for disaggregated serving.", ) + retention_directives: list[dict[str, Any]] | None = Field( + default=None, + description=( + "Retention directives for priority-based KV-cache eviction. " + "Each directive: {start: int, end: int|null, " + "priority: int (0-100), duration: float|null}." + ), + ) + + retention_scope: str | None = Field( + default=None, + description=( + "Opaque scope identifier for retention ownership. " + "Only the scope that set a block's priority can downgrade " + "or clear it. Typically a session or workflow ID." + ), + ) + vllm_xargs: dict[str, str | int | float | list[str | int | float]] | None = Field( default=None, description=( @@ -582,6 +600,10 @@ def to_sampling_params( if self.kv_transfer_params: # Pass in kv_transfer_params via extra_args extra_args["kv_transfer_params"] = self.kv_transfer_params + if self.retention_directives is not None: + extra_args["retention_directives"] = self.retention_directives + if self.retention_scope is not None: + extra_args["retention_scope"] = self.retention_scope return SamplingParams.from_optional( n=self.n, presence_penalty=self.presence_penalty, From b5a53458a858c0abb554cebd68d69bc0bfa4a2d0 Mon Sep 17 00:00:00 2001 From: HyunKyun Moon Date: Thu, 14 May 2026 15:31:34 +0900 Subject: [PATCH 17/20] Retention API: validator enforces non-increasing priorities Across rising token positions, priorities must be non-increasing (prefix-cache constraint: cached prefixes are shared across requests, so later token spans cannot retain blocks that earlier spans do not). Validator runs before model construction; sorts by start, then checks. Signed-off-by: HyunKyun Moon --- ...est_chat_completion_request_validations.py | 64 +++++++++++++++++++ .../openai/chat_completion/protocol.py | 31 +++++++++ 2 files changed, 95 insertions(+) diff --git a/tests/tool_use/test_chat_completion_request_validations.py b/tests/tool_use/test_chat_completion_request_validations.py index 48d6923e1a12..0ccd92b7708c 100644 --- a/tests/tool_use/test_chat_completion_request_validations.py +++ b/tests/tool_use/test_chat_completion_request_validations.py @@ -82,3 +82,67 @@ def test_retention_directives_field_round_trips(): {"start": 0, "end": 16, "priority": 80} ] assert sp.extra_args["retention_scope"] == "alice" + + +def _build_request_with_directives(directives): + return ChatCompletionRequest( + model="dummy", + messages=[{"role": "user", "content": "hi"}], + retention_directives=directives, + ) + + +def test_retention_directives_monotonic_priorities_valid(): + # Strictly decreasing priorities across rising token positions: valid. + _build_request_with_directives( + [ + {"start": 0, "end": 100, "priority": 90}, + {"start": 100, "end": 200, "priority": 60}, + {"start": 200, "end": 300, "priority": 30}, + ] + ) + + +def test_retention_directives_monotonic_priorities_equal_is_valid(): + # Non-increasing (equal) priorities: valid. + _build_request_with_directives( + [ + {"start": 0, "end": 100, "priority": 50}, + {"start": 100, "end": 200, "priority": 50}, + ] + ) + + +def test_retention_directives_empty_or_none_is_valid(): + _build_request_with_directives(None) + _build_request_with_directives([]) + + +def test_retention_directives_single_directive_is_valid(): + _build_request_with_directives([{"start": 0, "end": 16, "priority": 80}]) + + +def test_retention_directives_increasing_priority_rejected(): + with pytest.raises(ValueError, match="non-increasing"): + _build_request_with_directives( + [ + {"start": 0, "end": 100, "priority": 30}, + {"start": 100, "end": 200, "priority": 80}, # increases — invalid + ] + ) + + +def test_retention_directives_unsorted_input_still_validated(): + # Input order is unsorted, but the validator should sort by start + # before checking monotonicity. + with pytest.raises(ValueError, match="non-increasing"): + _build_request_with_directives( + [ + {"start": 100, "end": 200, "priority": 80}, + { + "start": 0, + "end": 100, + "priority": 30, + }, # sorted: 30 then 80 → invalid + ] + ) diff --git a/vllm/entrypoints/openai/chat_completion/protocol.py b/vllm/entrypoints/openai/chat_completion/protocol.py index 6e0375306560..1c81785f82b5 100644 --- a/vllm/entrypoints/openai/chat_completion/protocol.py +++ b/vllm/entrypoints/openai/chat_completion/protocol.py @@ -665,6 +665,37 @@ def validate_response_format(cls, data): return data + @model_validator(mode="before") + @classmethod + def validate_retention_directives_monotonic(cls, data): + if not isinstance(data, dict): + return data + directives = data.get("retention_directives") + if not directives: + return data + # Sort by token start position; ties broken by original order. + sorted_directives = sorted( + enumerate(directives), + key=lambda pair: (pair[1].get("start", 0), pair[0]), + ) + prev_priority: int | None = None + prev_start: int | None = None + for _, directive in sorted_directives: + priority = directive.get("priority", 0) + start = directive.get("start", 0) + if prev_priority is not None and priority > prev_priority: + raise VLLMValidationError( + "`retention_directives` priorities must be non-increasing " + "across token positions (prefix-cache constraint): " + f"directive at start={start} has priority={priority} > " + f"earlier directive at start={prev_start} with " + f"priority={prev_priority}.", + parameter="retention_directives", + ) + prev_priority = priority + prev_start = start + return data + @model_validator(mode="before") @classmethod def validate_stream_options(cls, data): From e582d5fbf1722a5338198a5f29fda880d46a60d4 Mon Sep 17 00:00:00 2001 From: HyunKyun Moon Date: Thu, 14 May 2026 15:36:22 +0900 Subject: [PATCH 18/20] Retention API: structural-invariant tests for sidecar pattern Two regression tests prevent KVCacheBlock or Request from accidentally gaining retention-specific fields in future commits. They make the sidecar contract self-enforcing. Signed-off-by: HyunKyun Moon --- tests/v1/core/test_priority_eviction.py | 43 +++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/tests/v1/core/test_priority_eviction.py b/tests/v1/core/test_priority_eviction.py index ff9ba28ee15b..f51ff9bfb995 100644 --- a/tests/v1/core/test_priority_eviction.py +++ b/tests/v1/core/test_priority_eviction.py @@ -513,3 +513,46 @@ def test_evict_blocks_clears_sidecar(self): # block whose ref_cnt > 0 but had a prior priority). pool.evict_blocks({block.block_id}) assert block.block_id not in pool.priority_eviction_queue._meta + + +class TestStructuralInvariants: + """Lock in the spec's 'sidecar pattern' contract: + - KVCacheBlock must not gain feature-specific fields for retention. + - Request must not gain retention attributes. + + If these tests fail, you are about to break the additive-only feel + of this PR. Move the new state into PriorityEvictionQueue's sidecar + instead. + """ + + def test_kv_cache_block_has_no_priority_fields(self): + from dataclasses import fields + + from vllm.v1.core.kv_cache_utils import KVCacheBlock + + names = {f.name for f in fields(KVCacheBlock)} + forbidden = { + "priority", + "priority_expiry", + "priority_scope", + "last_freed_time", + } + leaks = names & forbidden + assert not leaks, ( + f"KVCacheBlock has retention-specific fields {leaks!r}. " + "Move them to PriorityEvictionQueue's sidecar (see " + "docs/superpowers/specs/2026-05-14-retention-api-super-minimal-design.md)." + ) + + def test_request_has_no_retention_attributes(self): + import inspect + + from vllm.v1.request import Request + + src = inspect.getsource(Request.__init__) + forbidden = ("retention_directives", "retention_scope") + leaks = [name for name in forbidden if f"self.{name}" in src] + assert not leaks, ( + f"Request.__init__ assigns to {leaks!r}. Read retention from " + "request.sampling_params.extra_args at the use site instead." + ) From 85a85f1bcc9c7c1da44be7677e58c4ca7456ab95 Mon Sep 17 00:00:00 2001 From: HyunKyun Moon Date: Thu, 14 May 2026 15:40:24 +0900 Subject: [PATCH 19/20] Retention API: port throughput benchmark with sidecar access patterns Adapt the existing benchmark to read priority via the queue API rather than KVCacheBlock fields. Signed-off-by: HyunKyun Moon --- benchmarks/benchmark_retention_eviction.py | 233 +++++++++++++++++++++ 1 file changed, 233 insertions(+) create mode 100644 benchmarks/benchmark_retention_eviction.py diff --git a/benchmarks/benchmark_retention_eviction.py b/benchmarks/benchmark_retention_eviction.py new file mode 100644 index 000000000000..efcbac9ebefb --- /dev/null +++ b/benchmarks/benchmark_retention_eviction.py @@ -0,0 +1,233 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project +"""Simulate agentic multi-turn eviction with retention directives. + +This script validates that the two-structure evictor (LRU + priority queue) +correctly protects prioritized blocks under memory pressure. It simulates +a workload where: + + 1. Multiple "sessions" allocate blocks (simulating multi-turn agentic + requests with growing context). + 2. Some blocks receive retention priority (simulating system-prompt and + tool-call-awaiting blocks that the orchestrator wants to protect). + 3. Memory pressure forces eviction. + 4. We measure whether prioritized blocks survive longer than unprioritized + ones. + +No GPU required — runs against the actual BlockPool implementation. + +Usage: + python benchmarks/benchmark_retention_eviction.py \ + --num-gpu-blocks 1000 --block-size 16 --num-sessions 20 +""" + +import argparse +import time + +from vllm.v1.core.block_pool import BlockPool +from vllm.v1.core.kv_cache_utils import KVCacheBlock +from vllm.v1.core.priority_eviction_queue import RetentionMeta + + +def run_simulation( + num_gpu_blocks: int, + block_size: int, + num_sessions: int, + blocks_per_session: int, + priority_fraction: float, + priority_value: int, +): + """Run the eviction simulation. + + Args: + num_gpu_blocks: Total blocks in the pool. + block_size: Tokens per block (for hash_block_size). + num_sessions: Number of concurrent sessions to simulate. + blocks_per_session: Blocks allocated per session. + priority_fraction: Fraction of each session's blocks that get + retention priority (0.0 - 1.0). + priority_value: Priority value to assign (0-100). + """ + pool = BlockPool( + num_gpu_blocks=num_gpu_blocks, + enable_caching=True, + hash_block_size=block_size, + ) + + # Track which blocks are prioritized vs not. + prioritized_block_ids: set[int] = set() + unprioritized_block_ids: set[int] = set() + + sessions: list[list[KVCacheBlock]] = [] + total_allocated = 0 + + print(f"Pool: {num_gpu_blocks} blocks, {num_gpu_blocks - 1} usable") + print(f"Sessions: {num_sessions}, {blocks_per_session} blocks each") + print( + f"Priority: {priority_fraction * 100:.0f}% of blocks at " + f"priority={priority_value}" + ) + print(f"Total demand: {num_sessions * blocks_per_session} blocks") + print() + + # Phase 1: Allocate sessions until we run out of blocks. + for i in range(num_sessions): + free = pool.get_num_free_blocks() + if free < blocks_per_session: + print( + f" Session {i}: only {free} free blocks, " + f"need {blocks_per_session} — stopping allocation" + ) + break + + blocks = pool.get_new_blocks(blocks_per_session) + total_allocated += len(blocks) + + # Mark some blocks as prioritized. + num_priority = int(len(blocks) * priority_fraction) + for j, block in enumerate(blocks): + if j < num_priority: + pool.priority_eviction_queue._meta[block.block_id] = RetentionMeta( + priority=priority_value, + expiry=None, + scope=None, + last_freed_time=0.0, + ) + prioritized_block_ids.add(block.block_id) + else: + unprioritized_block_ids.add(block.block_id) + + sessions.append(blocks) + print( + f" Session {i}: allocated {len(blocks)} blocks " + f"({num_priority} prioritized), " + f"{pool.get_num_free_blocks()} free remaining" + ) + + print(f"\nTotal allocated: {total_allocated}") + print(f" Prioritized: {len(prioritized_block_ids)}") + print(f" Unprioritized: {len(unprioritized_block_ids)}") + + # Phase 2: Free all sessions (simulating requests completing). + for session_blocks in sessions: + pool.free_blocks(session_blocks) + + print("\nAfter freeing all sessions:") + print(f" LRU free list: {pool.free_block_queue.num_free_blocks}") + print(f" Priority queue: {pool.priority_eviction_queue.num_blocks}") + print(f" Total free: {pool.get_num_free_blocks()}") + + # Phase 3: Allocate under pressure and track which blocks get evicted. + eviction_target = total_allocated // 2 + print(f"\nEvicting {eviction_target} blocks (allocating new ones)...") + + evicted_prioritized = 0 + evicted_unprioritized = 0 + evicted_other = 0 + + t_start = time.monotonic() + new_blocks = pool.get_new_blocks(eviction_target) + t_elapsed = time.monotonic() - t_start + + for block in new_blocks: + bid = block.block_id + if bid in unprioritized_block_ids: + evicted_unprioritized += 1 + elif bid in prioritized_block_ids: + evicted_prioritized += 1 + else: + evicted_other += 1 + + print(f" Time: {t_elapsed * 1000:.2f} ms") + print(f" Evicted from unprioritized: {evicted_unprioritized}") + print(f" Evicted from prioritized: {evicted_prioritized}") + print(f" Evicted from other (null/fresh): {evicted_other}") + + # Verify correctness: unprioritized blocks should be evicted first. + total_unprioritized = len(unprioritized_block_ids) + if eviction_target <= total_unprioritized: + # Should not have touched any prioritized blocks. + if evicted_prioritized == 0: + print("\n PASS: No prioritized blocks evicted (all evictions from LRU)") + else: + print( + f"\n FAIL: {evicted_prioritized} prioritized blocks " + f"evicted when {total_unprioritized} unprioritized " + f"were available" + ) + else: + # Some prioritized blocks must be evicted. + expected_from_priority = eviction_target - total_unprioritized + print( + f"\n INFO: Needed {expected_from_priority} from priority " + f"queue (exhausted {total_unprioritized} unprioritized)" + ) + if evicted_unprioritized == total_unprioritized: + print(" PASS: All unprioritized evicted before any prioritized") + else: + print( + f" FAIL: Only {evicted_unprioritized}/{total_unprioritized}" + f" unprioritized evicted before touching prioritized" + ) + + # Phase 4: Verify remaining priority queue state. + remaining_in_pq = pool.priority_eviction_queue.num_blocks + surviving_prioritized = len(prioritized_block_ids) - evicted_prioritized + print(f"\n Surviving prioritized blocks: {surviving_prioritized}") + print(f" Priority queue size: {remaining_in_pq}") + + return evicted_prioritized == 0 or evicted_unprioritized == len( + unprioritized_block_ids + ) + + +def main(): + parser = argparse.ArgumentParser(description="Benchmark retention-based eviction") + parser.add_argument( + "--num-gpu-blocks", type=int, default=1000, help="Total KV-cache blocks in pool" + ) + parser.add_argument("--block-size", type=int, default=16, help="Tokens per block") + parser.add_argument( + "--num-sessions", type=int, default=20, help="Number of concurrent sessions" + ) + parser.add_argument( + "--blocks-per-session", type=int, default=40, help="Blocks per session" + ) + parser.add_argument( + "--priority-fraction", + type=float, + default=0.3, + help="Fraction of blocks with priority (0-1)", + ) + parser.add_argument( + "--priority-value", + type=int, + default=80, + help="Priority value for retained blocks (0-100)", + ) + args = parser.parse_args() + + print("=" * 60) + print("Retention-Based Eviction Benchmark") + print("=" * 60) + print() + + success = run_simulation( + num_gpu_blocks=args.num_gpu_blocks, + block_size=args.block_size, + num_sessions=args.num_sessions, + blocks_per_session=args.blocks_per_session, + priority_fraction=args.priority_fraction, + priority_value=args.priority_value, + ) + + print() + print("=" * 60) + print(f"Result: {'PASS' if success else 'FAIL'}") + print("=" * 60) + + return 0 if success else 1 + + +if __name__ == "__main__": + raise SystemExit(main()) From 18a77e9adfec6fde8266002f5a246c49cde193cd Mon Sep 17 00:00:00 2001 From: HyunKyun Moon Date: Tue, 9 Jun 2026 12:29:43 +0900 Subject: [PATCH 20/20] =?UTF-8?q?[V1][Core]=20Fix=20priority-inversion=20i?= =?UTF-8?q?n=20PriorityEvictionQueue=20via=20per-block=20generation=20coun?= =?UTF-8?q?ter=20PriorityEvictionQueue=20holds=20freed=20blocks=20in=20a?= =?UTF-8?q?=20min-heap=20keyed=20by=20(priority,=20last=5Ffreed=5Ftime).?= =?UTF-8?q?=20A=20remove()+re-free=20reuse=20cycle=20pushed=20a=20second?= =?UTF-8?q?=20tuple=20for=20the=20same=20block=5Fid=20without=20removing?= =?UTF-8?q?=20the=20prior=20one,=20so=20the=20heap=20could=20carry=20stale?= =?UTF-8?q?=20tuples=20with=20an=20outdated=20(priority,=20last=5Ffreed=5F?= =?UTF-8?q?time).=20pop=5Flowest=20could=20then=20evict=20using=20a=20stal?= =?UTF-8?q?e=20tuple=20=E2=80=94=20e.g.=20a=20block=20re-protected=20at=20?= =?UTF-8?q?priority=2090=20evicted=20as=20if=20still=20priority=2050=20?= =?UTF-8?q?=E2=80=94=20inverting=20the=20intended=20eviction=20order=20and?= =?UTF-8?q?=20silently=20violating=20retention=20directives.=20Fix:=20stam?= =?UTF-8?q?p=20every=20pushed=20tuple=20with=20a=20per-block=20monotonic?= =?UTF-8?q?=20generation=20(=5Fgen[block=5Fid],=20bumped=20on=20each=20try?= =?UTF-8?q?=5Finsert)=20and=20have=20pop=5Flowest=20skip=20any=20popped=20?= =?UTF-8?q?tuple=20whose=20generation=20no=20longer=20matches=20the=20bloc?= =?UTF-8?q?k's=20current=20generation=20(lazy=20deletion).=20Eviction=20no?= =?UTF-8?q?w=20always=20orders=20by=20the=20block's=20CURRENT=20(priority,?= =?UTF-8?q?=20last=5Ffreed=5Ftime).=20Bundled=20(entangled=20in=20the=20sa?= =?UTF-8?q?me=20files):=20-=20drain=5Fexpired():=20demote=20expired=20side?= =?UTF-8?q?car=20entries=20to=20the=20LRU=20free=20list=20=20=20rather=20t?= =?UTF-8?q?han=20letting=20them=20top=20the=20next=20pop=5Flowest=20(was?= =?UTF-8?q?=20destroying=20=20=20prefix-cache=20hit=20rate=20under=20TTL'd?= =?UTF-8?q?=20protection).=20-=20try=5Finsert=20routes=20below-threshold?= =?UTF-8?q?=20sidecar=20entries=20to=20the=20LRU=20instead=20=20=20of=20th?= =?UTF-8?q?e=20priority=20queue.=20Tests:=20stale-tuple/inversion=20+=20th?= =?UTF-8?q?reshold-routing=20cases=20in=20test=5Fpriority=5Feviction.py;?= =?UTF-8?q?=20conftest=20lowers=20=5FPRIORITY=5FTHRESHOLD=20to=200=20for?= =?UTF-8?q?=20the=20dir's=20priority=3D50=20convention.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: HyunKyun Moon --- tests/v1/core/conftest.py | 21 + tests/v1/core/test_priority_eviction.py | 492 +++++++++++++++++++++++- vllm/v1/core/block_pool.py | 93 ++++- vllm/v1/core/priority_eviction_queue.py | 97 ++++- 4 files changed, 663 insertions(+), 40 deletions(-) create mode 100644 tests/v1/core/conftest.py diff --git a/tests/v1/core/conftest.py b/tests/v1/core/conftest.py new file mode 100644 index 000000000000..16b87e590d06 --- /dev/null +++ b/tests/v1/core/conftest.py @@ -0,0 +1,21 @@ +# SPDX-License-Identifier: Apache-2.0 +# SPDX-FileCopyrightText: Copyright contributors to the vLLM project + +import pytest + + +@pytest.fixture(autouse=True) +def _retention_priority_threshold_zero(monkeypatch): + """Lower `_PRIORITY_THRESHOLD` to 0 inside every test in this dir. + + The retention priority threshold is a production safety knob (default + 60) that routes sub-threshold sidecar entries to the LRU rather than + the priority queue. The unit tests in this directory use priority=50 + throughout as a convention to mean "any prioritized entry"; we lower + the threshold inside tests so the convention keeps working without + rewriting every test fixture. Tests that explicitly exercise the + threshold behavior re-set it via monkeypatch in-body. + """ + import vllm.v1.core.priority_eviction_queue as pq_mod + + monkeypatch.setattr(pq_mod, "_PRIORITY_THRESHOLD", 0) diff --git a/tests/v1/core/test_priority_eviction.py b/tests/v1/core/test_priority_eviction.py index f51ff9bfb995..27f1eb4113bc 100644 --- a/tests/v1/core/test_priority_eviction.py +++ b/tests/v1/core/test_priority_eviction.py @@ -55,6 +55,46 @@ def test_try_insert_returns_false_for_unprioritized_block(self): assert queue.try_insert(block) is False assert queue.num_blocks == 0 + def test_try_insert_below_threshold_drops_sidecar_routes_to_lru(self, monkeypatch): + """A sidecar entry whose priority is below the configured + threshold must be rejected: try_insert returns False and the + sidecar is dropped so the caller (free_blocks) routes the block + to the LRU free list. The threshold guarantees enough cached + blocks stay in LRU to absorb get_new_blocks demand without + draining the priority queue.""" + import vllm.v1.core.priority_eviction_queue as pq_mod + + monkeypatch.setattr(pq_mod, "_PRIORITY_THRESHOLD", 60) + queue = PriorityEvictionQueue() + block = _make_block(1) + _set_meta(queue, block, priority=50) + + assert queue.try_insert(block) is False, ( + "priority below threshold must be rejected so the block " + "goes to LRU; the threshold is the whole point of Fix 4.0." + ) + assert queue.num_blocks == 0 + assert block.block_id not in queue._meta, ( + "rejected entry must drop its sidecar so a later " + "apply_directives starts from a clean slate." + ) + + def test_try_insert_at_or_above_threshold_enters_queue(self, monkeypatch): + """At-or-above the threshold the regular insertion path runs: + sidecar stays, the block lands in _in_queue, num_blocks goes up.""" + import vllm.v1.core.priority_eviction_queue as pq_mod + + monkeypatch.setattr(pq_mod, "_PRIORITY_THRESHOLD", 60) + queue = PriorityEvictionQueue() + block = _make_block(1) + _set_meta(queue, block, priority=70) + + assert queue.try_insert(block) is True, ( + "priority at/above threshold must enter the queue normally." + ) + assert queue.num_blocks == 1 + assert block.block_id in queue._meta + def test_eviction_order_by_priority(self): queue = PriorityEvictionQueue() blocks = [_make_block(i) for i in range(3)] @@ -113,6 +153,57 @@ def test_stale_heap_entries_are_skipped_in_pop_lowest(self): assert queue.pop_lowest() is block_b assert queue.pop_lowest() is None + def test_reinsert_after_remove_orders_by_current_priority(self): + """Regression (priority inversion): a block removed (touch/reuse) and + re-inserted with an ESCALATED priority must be ordered by the current + priority, not the stale heap tuple left behind by the lazy remove(). + + Sequence: A enters at 50 -> remove() (lazy delete leaves a (50,A) + tuple in the heap) -> A escalated to 90 -> re-inserted (pushes a + (90,A) tuple; both A tuples now satisfy the block_id-in-_in_queue + test). A live priority-70 block B must evict BEFORE A. The buggy + code pops the stale (50,A) tuple first and evicts the + escalated-to-90 block ahead of the 70 block.""" + queue = PriorityEvictionQueue() + a = _make_block(1) + b = _make_block(2) + _set_meta(queue, a, priority=50) + queue.try_insert(a) + queue.remove(a) # touch: lazy delete, stale (50,A) tuple stays + _set_meta(queue, a, priority=90) # escalation via apply_directives + queue.try_insert(a) # re-freed: pushes (90,A); (50,A) still in heap + _set_meta(queue, b, priority=70) + queue.try_insert(b) + assert queue.pop_lowest() is b, ( + "priority-70 block must evict before the escalated-to-90 block; " + "a stale heap tuple must not order eviction by the old priority." + ) + assert queue.pop_lowest() is a + assert queue.pop_lowest() is None + + def test_reinsert_after_remove_orders_by_current_time(self): + """Regression (recency tiebreak): same priority, but a block removed + and re-freed with a NEWER last_freed_time must be ordered by the new + time. The stale tuple carries the OLD (smaller) time and would + otherwise make the block look older-freed than it is, evicting it + before a genuinely older block.""" + queue = PriorityEvictionQueue() + a = _make_block(1) + b = _make_block(2) + _set_meta(queue, a, priority=50, last_freed=100.0) + queue.try_insert(a, last_freed_time=100.0) + queue.remove(a) # stale (t=100,A) left in heap + queue.try_insert(a, last_freed_time=300.0) # re-freed later; A now newest + _set_meta(queue, b, priority=50, last_freed=200.0) + queue.try_insert(b, last_freed_time=200.0) + # A was last freed at 300 (newer than B's 200) -> B evicts first. + assert queue.pop_lowest() is b, ( + "block re-freed at t=300 must evict after the t=200 block; " + "a stale t=100 tuple must not defeat the recency tiebreak." + ) + assert queue.pop_lowest() is a + assert queue.pop_lowest() is None + def test_ttl_not_expired(self, monkeypatch): import time as time_mod @@ -126,7 +217,14 @@ def test_ttl_not_expired(self, monkeypatch): monkeypatch.setattr(time_mod, "monotonic", lambda: 150.0) assert queue.pop_lowest() is block - def test_ttl_expiry(self, monkeypatch): + def test_drain_expired_then_pop_lowest_workflow(self, monkeypatch): + """The full eviction workflow with TTL: drain_expired first + removes expired entries from the queue (caller demotes them to + LRU). pop_lowest then sees only live entries. + + Expired entries no longer leak into pop_lowest — that was the + limbo-fix-era band-aid and is superseded by drain_expired. + """ import time as time_mod queue = PriorityEvictionQueue() @@ -135,11 +233,74 @@ def test_ttl_expiry(self, monkeypatch): monkeypatch.setattr(time_mod, "monotonic", lambda: 100.0) _set_meta(queue, block, priority=50, expiry=200.0) queue.try_insert(block) - # Advance past expiry — block is treated as unprioritized. + + # Advance past expiry. monkeypatch.setattr(time_mod, "monotonic", lambda: 250.0) - # pop_lowest discards expired entries and returns None when none - # remain. + + # New semantic: drain_expired returns the block_id, queue is + # empty afterwards. The caller (BlockPool.get_new_blocks) is + # responsible for routing the corresponding block into the LRU. + drained = queue.drain_expired() + assert drained == [block.block_id] + assert queue.num_blocks == 0 + # pop_lowest now sees an empty queue. assert queue.pop_lowest() is None + + def test_drain_expired_returns_only_expired_block_ids(self, monkeypatch): + """drain_expired returns block_ids whose sidecar.expiry has passed, + and removes those entries from _in_queue + _meta. Non-expired + entries stay in the queue.""" + import time as time_mod + + queue = PriorityEvictionQueue() + b_exp = _make_block(1) + b_live = _make_block(2) + b_no_exp = _make_block(3) + + monkeypatch.setattr(time_mod, "monotonic", lambda: 100.0) + _set_meta(queue, b_exp, priority=50, expiry=150.0) # will expire + _set_meta(queue, b_live, priority=70, expiry=250.0) # still alive + _set_meta(queue, b_no_exp, priority=90, expiry=None) # no TTL + queue.try_insert(b_exp) + queue.try_insert(b_live) + queue.try_insert(b_no_exp) + + # Advance time past b_exp's expiry only. + monkeypatch.setattr(time_mod, "monotonic", lambda: 200.0) + drained = queue.drain_expired() + + assert drained == [b_exp.block_id] + assert b_exp.block_id not in queue._in_queue + assert b_exp.block_id not in queue._meta + assert b_live.block_id in queue._in_queue + assert b_no_exp.block_id in queue._in_queue + assert queue.num_blocks == 2 + + def test_drain_expired_empty_queue_is_noop(self): + """drain_expired on an empty queue returns an empty list.""" + queue = PriorityEvictionQueue() + assert queue.drain_expired() == [] + assert queue.num_blocks == 0 + + def test_try_insert_expired_meta_routes_to_lru(self, monkeypatch): + """try_insert must return False when the sidecar entry is already + expired so the caller (free_blocks) routes the block to the LRU + free list instead of the priority queue. Otherwise the block would + live in the priority queue forever, or land in limbo on the next + pop_lowest.""" + import time as time_mod + + queue = PriorityEvictionQueue() + block = _make_block(1) + monkeypatch.setattr(time_mod, "monotonic", lambda: 100.0) + _set_meta(queue, block, priority=50, expiry=150.0) + # Advance past expiry BEFORE try_insert. + monkeypatch.setattr(time_mod, "monotonic", lambda: 200.0) + assert queue.try_insert(block, last_freed_time=200.0) is False + # The expired sidecar must be cleaned up — otherwise a later + # apply_directives could re-prime the same block back into the + # priority queue. + assert block.block_id not in queue._meta assert queue.num_blocks == 0 @@ -505,6 +666,86 @@ def test_free_prioritized_goes_to_priority_queue(self, monkeypatch): == 12345.0 ) + def test_touch_on_limbo_block_does_not_raise(self): + """A block in neither the priority queue nor the LRU free list + must not crash touch(). This guards against the pre-fix scenario + where pop_lowest silently dropped an expired entry, leaving the + block in limbo and crashing the next prefix-cache hit.""" + pool = self._make_pool() + block = pool.blocks[1] + # Take it out of LRU by hand to simulate the post-pop_lowest + # limbo: ref_cnt=0, not in priority queue, not in free list. + pool.free_block_queue.remove(block) + assert block.prev_free_block is None + assert block.next_free_block is None + assert block not in pool.priority_eviction_queue + assert block.ref_cnt == 0 + # touch() must not raise. + pool.touch([block]) + assert block.ref_cnt == 1 + + def test_get_new_blocks_drains_all_expired_to_lru(self, monkeypatch): + """drain_expired must move ALL expired entries to the LRU, not + just enough to satisfy the current allocation. Otherwise the + next get_new_blocks call would re-fire the cache-eviction storm + on the entries left behind in the priority queue. + + Pre-fix behavior: get_new_blocks(1) pops 1 entry from the + priority queue via pop_lowest, leaves the other 2 expired + entries in the queue. Each subsequent get_new_blocks would + evict another cached block from the map. + + Post-fix behavior: drain_expired moves all 3 expired blocks to + the LRU tail BEFORE any pop happens. get_new_blocks(1) then + pops 1 from the LRU and the other 2 expired blocks sit in the + LRU with their cached hashes intact until normal LRU order + reaches them. + """ + import time as time_mod + + pool = self._make_pool() + + # Stash 3 blocks into the priority queue with expiring sidecars. + monkeypatch.setattr(time_mod, "monotonic", lambda: 100.0) + target_ids = [] + for bid in (1, 2, 3): + block = pool.blocks[bid] + pool.free_block_queue.remove(block) # take out of LRU + _set_meta( + pool.priority_eviction_queue, + block, + priority=50, + expiry=150.0, + last_freed=100.0, + ) + pool.priority_eviction_queue.try_insert(block, last_freed_time=100.0) + target_ids.append(bid) + assert pool.priority_eviction_queue.num_blocks == 3 + lru_before = pool.free_block_queue.num_free_blocks + + # Advance past expiry, then ask for ONE block. + monkeypatch.setattr(time_mod, "monotonic", lambda: 200.0) + allocated = pool.get_new_blocks(1) + assert len(allocated) == 1 + + # Post-fix invariant: the priority queue is fully drained (0 + # entries), AND the LRU has gained 2 entries (we drained 3 and + # consumed 1). + # Pre-fix would leave 2 entries in the priority queue and the + # LRU would have lost 0 entries net (started empty for our 3 + # blocks, ended empty too). + assert pool.priority_eviction_queue.num_blocks == 0, ( + f"priority queue should be drained, has " + f"{pool.priority_eviction_queue.num_blocks} entries" + ) + assert pool.free_block_queue.num_free_blocks == lru_before + 2, ( + f"LRU should have gained 2 demoted-from-priority entries; " + f"got {pool.free_block_queue.num_free_blocks - lru_before} delta" + ) + # Sidecars also cleaned up for all 3. + for bid in target_ids: + assert bid not in pool.priority_eviction_queue._meta + def test_evict_blocks_clears_sidecar(self): pool = self._make_pool() block = pool.blocks[1] @@ -514,6 +755,249 @@ def test_evict_blocks_clears_sidecar(self): pool.evict_blocks({block.block_id}) assert block.block_id not in pool.priority_eviction_queue._meta + def test_priority_queue_pop_preserves_cache_map(self, monkeypatch): + """A block popped from the priority queue keeps its hash in the + cache map. Today the storm: every priority-queue pop wipes the + cache hash; under the fix, only LRU-popleft does. This is the + unit-level guard for the 1B cache-eviction-storm regression. + """ + import time as time_mod + + from vllm.v1.core.kv_cache_utils import ( + BlockHash, + make_block_hash_with_group_id, + ) + + pool = self._make_pool() + block = pool.blocks[1] + # Promote to in-use + cached, then free into the priority queue. + pool.free_block_queue.remove(block) + block.ref_cnt = 1 + raw_hash = BlockHash((42).to_bytes(32, "little")) + h = make_block_hash_with_group_id(raw_hash, 0) + block.block_hash = h + pool.cached_block_hash_to_block.insert(h, block) + + monkeypatch.setattr(time_mod, "monotonic", lambda: 100.0) + _set_meta( + pool.priority_eviction_queue, + block, + priority=50, + expiry=None, + last_freed=100.0, + ) + pool.free_blocks([block]) + assert block in pool.priority_eviction_queue + assert pool.get_cached_block(raw_hash, [0]) is not None + + # Drain the LRU so the next get_new_blocks must dip into the + # priority queue. + for b in list(pool.free_block_queue.get_all_free_blocks()): + if b is not block and b is not pool.null_block: + pool.free_block_queue.remove(b) + b.ref_cnt = 1 + assert pool.free_block_queue.num_free_blocks == 0 + assert pool.priority_eviction_queue.num_blocks == 1 + + # Now pop via get_new_blocks. The block must come back with its + # hash intact and the cache map entry preserved. + allocated = pool.get_new_blocks(1) + assert len(allocated) == 1 + assert allocated[0] is block + assert block.ref_cnt == 1 + # The fix's guarantee: + assert block.block_hash == h, ( + "Block hash was reset on priority-queue pop; the fix should " + "have left it intact." + ) + cached = pool.get_cached_block(raw_hash, [0]) + assert cached is not None and cached[0] is block, ( + "cached_block_hash_to_block entry was evicted on " + "priority-queue pop; the fix should preserve it for " + "subsequent prefix hits." + ) + + def test_lru_popleft_still_clears_cache_map(self): + """LRU eviction's semantics are unchanged: popleft on a cached + block must clear the cache map entry and reset the hash. Guards + against accidentally decoupling the LRU path along with the + priority-queue path.""" + from vllm.v1.core.kv_cache_utils import ( + BlockHash, + make_block_hash_with_group_id, + ) + + pool = self._make_pool() + block = pool.blocks[1] + # Cache the block but leave it in LRU (no retention meta). + pool.free_block_queue.remove(block) + block.ref_cnt = 1 + raw_hash = BlockHash((77).to_bytes(32, "little")) + h = make_block_hash_with_group_id(raw_hash, 0) + block.block_hash = h + pool.cached_block_hash_to_block.insert(h, block) + pool.free_blocks([block]) + # Block is now in LRU with cache map entry intact. + assert block not in pool.priority_eviction_queue + cached = pool.get_cached_block(raw_hash, [0]) + assert cached is not None and cached[0] is block + + # Force LRU drain: ask for everything in LRU. + n_free = pool.free_block_queue.num_free_blocks + pool.get_new_blocks(n_free) + + # LRU semantics: cache map entry is gone, hash is cleared. + assert pool.get_cached_block(raw_hash, [0]) is None, ( + "LRU popleft must still clear cached_block_hash_to_block; " + "the fix targets PQ pop only." + ) + assert block.block_hash is None, ( + "LRU popleft must still reset block.block_hash; the fix " + "targets PQ pop only." + ) + + def test_cache_full_blocks_lazy_cleanup(self, monkeypatch): + """A block popped from the priority queue carries its old hash. + When cache_full_blocks runs on it again with a new hash, the + old hash must be lazily removed from the cache map before the + new hash is registered. This is the integration test that pairs + with the priority-queue-preserves-cache-map test. + + Drives `cache_full_blocks` end-to-end (not the helper sequence) + so that removing the lazy-cleanup branch in block_pool.py would + be caught here. + """ + import time as time_mod + + from vllm.sampling_params import SamplingParams + from vllm.v1.core.kv_cache_utils import ( + BlockHash, + make_block_hash_with_group_id, + ) + + pool = self._make_pool() + block = pool.blocks[1] + pool.free_block_queue.remove(block) + block.ref_cnt = 1 + raw_old = BlockHash((123).to_bytes(32, "little")) + h_old = make_block_hash_with_group_id(raw_old, 0) + block.block_hash = h_old + pool.cached_block_hash_to_block.insert(h_old, block) + + monkeypatch.setattr(time_mod, "monotonic", lambda: 100.0) + _set_meta( + pool.priority_eviction_queue, + block, + priority=50, + expiry=None, + last_freed=100.0, + ) + pool.free_blocks([block]) + + # Drain LRU + pop the priority-queue entry. Block now carries + # h_old; cache map still has h_old → block (per Task 1's fix). + for b in list(pool.free_block_queue.get_all_free_blocks()): + if b is not block and b is not pool.null_block: + pool.free_block_queue.remove(b) + b.ref_cnt = 1 + pool.get_new_blocks(1) + assert block.block_hash == h_old, ( + "PQ pop should preserve the old hash for prefix-hit purposes." + ) + cached_old = pool.get_cached_block(raw_old, [0]) + assert cached_old is not None and cached_old[0] is block + + # Now drive cache_full_blocks with a NEW raw hash. The real code + # path must lazy-clean h_old from the cache map before + # registering h_new — this exercises the + # `if blk.block_hash is not None: self._maybe_evict_cached_block(blk)` + # branch in block_pool.cache_full_blocks. + raw_new = BlockHash((456).to_bytes(32, "little")) + h_new = make_block_hash_with_group_id(raw_new, 0) + + # Minimal stub request — same pattern as + # test_cache_full_blocks_routes_directives_to_queue. Without + # extra_args the retention hook is a no-op, and with + # enable_kv_cache_events=False the events branch is skipped, so + # only block_hashes is load-bearing. + class _Req: + sampling_params: SamplingParams + block_hashes: list + + request = _Req() + request.sampling_params = SamplingParams() + request.block_hashes = [raw_new] + + pool.cache_full_blocks( + request=request, + blocks=[block], + num_cached_blocks=0, + num_full_blocks=1, + block_size=pool.hash_block_size, + kv_cache_group_id=0, + ) + + assert pool.get_cached_block(raw_old, [0]) is None, ( + "cache_full_blocks must lazy-clean the stale hash before " + "assigning the new one." + ) + assert block.block_hash == h_new, ( + "cache_full_blocks must register the new hash on the block." + ) + cached_new = pool.get_cached_block(raw_new, [0]) + assert cached_new is not None and cached_new[0] is block + + def test_prefix_hit_after_priority_queue_pop(self, monkeypatch): + """End-to-end behavior: after a block is popped via the + priority queue (ref_cnt becomes 1), a subsequent + get_cached_block call for the same hash still returns that + block. This is what the eviction-storm fix is for: the cache + map entry survives across a PQ-driven allocation so the next + prefix-matching request can hit. + """ + import time as time_mod + + from vllm.v1.core.kv_cache_utils import ( + BlockHash, + make_block_hash_with_group_id, + ) + + pool = self._make_pool() + block = pool.blocks[1] + pool.free_block_queue.remove(block) + block.ref_cnt = 1 + raw_hash = BlockHash((321).to_bytes(32, "little")) + h = make_block_hash_with_group_id(raw_hash, 0) + block.block_hash = h + pool.cached_block_hash_to_block.insert(h, block) + + monkeypatch.setattr(time_mod, "monotonic", lambda: 100.0) + _set_meta( + pool.priority_eviction_queue, + block, + priority=50, + expiry=None, + last_freed=100.0, + ) + pool.free_blocks([block]) + + # Drain LRU + pop priority queue → block.ref_cnt = 1. + for b in list(pool.free_block_queue.get_all_free_blocks()): + if b is not block and b is not pool.null_block: + pool.free_block_queue.remove(b) + b.ref_cnt = 1 + pool.get_new_blocks(1) + assert block.ref_cnt == 1 + + # The cache hit must still resolve. (The application would now + # touch() this block; touch() handles ref_cnt > 0 correctly + # without trying to remove from any queue.) + hit = pool.get_cached_block(raw_hash, [0]) + assert hit is not None and hit[0] is block, ( + "Prefix-cache hit must succeed after priority-queue pop; " + "this is the cache-eviction-storm regression guard." + ) + class TestStructuralInvariants: """Lock in the spec's 'sidecar pattern' contract: diff --git a/vllm/v1/core/block_pool.py b/vllm/v1/core/block_pool.py index 63a5335db888..16d354e06073 100644 --- a/vllm/v1/core/block_pool.py +++ b/vllm/v1/core/block_pool.py @@ -267,6 +267,13 @@ def cache_full_blocks( # align mode. We skip null blocks here. if blk.is_null: continue + # Lazy cleanup: a block popped from the priority queue + # retains its prior hash (so it could keep serving prefix + # hits between pop and reuse). Now that we're about to + # assign a new hash, drop the stale + # cached_block_hash_to_block entry and reset. + if blk.block_hash is not None: + self._maybe_evict_cached_block(blk) assert blk.block_hash is None block_hash = new_block_hashes[i] @@ -364,39 +371,72 @@ def get_new_blocks(self, num_blocks: int) -> list[KVCacheBlock]: if num_blocks > self.get_num_free_blocks(): raise ValueError(f"Cannot get {num_blocks} free blocks from the pool") + # Demote any expired retention entries to the LRU tail before any + # eviction decision. Expired = "protection released", not "evict + # immediately". Leaving them in the priority queue makes them top + # eviction candidates and resets their cached hashes on the very + # next pop_lowest — destroying prefix cache hit rate (see 1B + # smoke regression). + for block_id in self.priority_eviction_queue.drain_expired(): + self.free_block_queue.append(self.blocks[block_id]) + # Fast path: no prioritized blocks → pure LRU (zero overhead). - ret: list[KVCacheBlock] if self.priority_eviction_queue.num_blocks == 0: ret = self.free_block_queue.popleft_n(num_blocks) - else: - # Drain unprioritized blocks from the LRU free list first, then - # take lowest-priority blocks from the priority queue. - num_from_free = min(num_blocks, self.free_block_queue.num_free_blocks) - ret = ( - self.free_block_queue.popleft_n(num_from_free) if num_from_free else [] - ) - while len(ret) < num_blocks: - evicted = self.priority_eviction_queue.pop_lowest() - assert evicted is not None, ( - "Priority queue empty but more blocks required" - ) - ret.append(evicted) + if self.enable_caching: + for block in ret: + # LRU popleft = explicit "evict now" signal: drop the + # prefix cache entry and reset the hash. + self._maybe_evict_cached_block(block) + assert block.ref_cnt == 0 + block.ref_cnt += 1 + if self.metrics_collector: + self.metrics_collector.on_block_allocated(block) + else: + for block in ret: + assert block.ref_cnt == 0 + block.ref_cnt += 1 + if self.metrics_collector: + self.metrics_collector.on_block_allocated(block) + return ret + + # Mixed path: take what we can from LRU, top up from the + # priority queue. Track the source so we can apply different + # cache-map handling per source. + num_from_free = min(num_blocks, self.free_block_queue.num_free_blocks) + from_lru = ( + self.free_block_queue.popleft_n(num_from_free) if num_from_free else [] + ) + from_pq: list[KVCacheBlock] = [] + while len(from_lru) + len(from_pq) < num_blocks: + evicted = self.priority_eviction_queue.pop_lowest() + assert evicted is not None, "Priority queue empty but more blocks required" + from_pq.append(evicted) - # In order to only iterate the list once, we duplicated code a bit if self.enable_caching: - for block in ret: + for block in from_lru: + # LRU evict semantics: drop cache map + reset hash. self._maybe_evict_cached_block(block) assert block.ref_cnt == 0 block.ref_cnt += 1 if self.metrics_collector: self.metrics_collector.on_block_allocated(block) + for block in from_pq: + # Priority-queue pop = reorder, not evict-from-cache. + # Preserve block.block_hash and the cache map entry; + # cache_full_blocks will clean up lazily when this + # block is reused with a new hash. + assert block.ref_cnt == 0 + block.ref_cnt += 1 + if self.metrics_collector: + self.metrics_collector.on_block_allocated(block) else: - for block in ret: + for block in from_lru + from_pq: assert block.ref_cnt == 0 block.ref_cnt += 1 if self.metrics_collector: self.metrics_collector.on_block_allocated(block) - return ret + return from_lru + from_pq def _maybe_evict_cached_block(self, block: KVCacheBlock) -> bool: """ @@ -444,13 +484,24 @@ def touch(self, blocks: Sequence[KVCacheBlock]) -> None: blocks: A list of blocks to touch. """ for block in blocks: - # ref_cnt=0 means this block is in a free queue (LRU or - # priority); remove it from whichever queue it sits in. + # ref_cnt=0 means this block is logically free; remove it from + # whichever queue it sits in. A block can be in the priority + # queue, the LRU free list, or — defensively — neither (limbo). + # Check membership at the queue level rather than assuming the + # boolean complement of "in priority queue". if block.ref_cnt == 0 and not block.is_null: if block in self.priority_eviction_queue: self.priority_eviction_queue.remove(block) - else: + elif ( + block.prev_free_block is not None + or block.next_free_block is not None + ): self.free_block_queue.remove(block) + # else: block is in neither queue. This is a defensive + # branch — current code paths should keep the two queues + # exhaustive for ref_cnt=0 cached blocks. If it triggers, + # the bookkeeping invariant in priority_eviction_queue + + # free_block_queue has been violated upstream. block.ref_cnt += 1 if self.metrics_collector: self.metrics_collector.on_block_accessed(block) diff --git a/vllm/v1/core/priority_eviction_queue.py b/vllm/v1/core/priority_eviction_queue.py index 789def89227f..f031dba50e1d 100644 --- a/vllm/v1/core/priority_eviction_queue.py +++ b/vllm/v1/core/priority_eviction_queue.py @@ -4,11 +4,22 @@ retention metadata.""" import heapq +import os import time from dataclasses import dataclass from vllm.v1.core.kv_cache_utils import KVCacheBlock +# Priority threshold for entering the priority eviction queue. Entries +# whose sidecar priority falls below this value get routed to the LRU +# free list instead. The LRU has to stay populated; otherwise every +# get_new_blocks call has to pull from the priority queue, and the +# protected entries never get the breathing room to satisfy +# prefix-cache hits (1B Slack-regime regression). Continuum's +# priorities are 90 (system_prompt) / 70 (history) / 50 (tail); the +# default 60 routes tail to LRU and keeps the rest protected. +_PRIORITY_THRESHOLD = int(os.environ.get("VLLM_RETENTION_PRIORITY_THRESHOLD", "60")) + @dataclass(slots=True) class RetentionMeta: @@ -21,8 +32,18 @@ class RetentionMeta: class PriorityEvictionQueue: def __init__(self) -> None: self._meta: dict[int, RetentionMeta] = {} - self._heap: list[tuple[int, float, int, KVCacheBlock]] = [] + self._heap: list[tuple[int, float, int, int, KVCacheBlock]] = [] self._in_queue: set[int] = set() + # Per-block monotonic generation counter. try_insert bumps the + # block's generation and stamps it on the pushed heap tuple; + # pop_lowest skips any tuple whose generation is not the block's + # current one. This closes the lazy-delete hazard: remove() leaves a + # tuple in the heap and a later re-insert pushes a second tuple for + # the same block_id, so without the generation stamp pop_lowest could + # evict using the stale tuple's (priority, last_freed_time) and invert + # the eviction order. _gen is monotonic per block_id (never reset on + # remove/pop, only on clear) and bounded by the physical block count. + self._gen: dict[int, int] = {} @property def num_blocks(self) -> int: @@ -36,8 +57,14 @@ def try_insert( block: KVCacheBlock, last_freed_time: float | None = None, ) -> bool: - """If the block has a sidecar entry, insert into the heap and - return True. Otherwise return False (caller routes elsewhere). + """If the block has an unexpired sidecar entry, insert into the heap + and return True. Otherwise return False (caller routes elsewhere). + + If the sidecar entry exists but is already expired, drop the sidecar + and return False so the caller routes the block to the LRU free + list. Inserting an expired entry would leave the block in the + priority queue indefinitely (or, on subsequent pop_lowest, drop it + into the limbo state where it belongs to no queue at all). last_freed_time, when provided, overrides the value stored in the sidecar entry. This is used by free_blocks() to stamp the current @@ -46,11 +73,25 @@ def try_insert( meta = self._meta.get(block.block_id) if meta is None: return False + if meta.expiry is not None and meta.expiry <= time.monotonic(): + # Expired-on-free: drop sidecar, route to LRU free list. + self._meta.pop(block.block_id, None) + return False + if meta.priority < _PRIORITY_THRESHOLD: + # Below-threshold: drop sidecar, route to LRU. Keeps the + # LRU populated with low-priority cached blocks so + # get_new_blocks can satisfy demand without draining + # protected (high-priority) entries from the priority + # queue. See the priority-threshold spec for rationale. + self._meta.pop(block.block_id, None) + return False if last_freed_time is not None: meta.last_freed_time = last_freed_time + gen = self._gen.get(block.block_id, 0) + 1 + self._gen[block.block_id] = gen heapq.heappush( self._heap, - (meta.priority, meta.last_freed_time, block.block_id, block), + (meta.priority, meta.last_freed_time, gen, block.block_id, block), ) self._in_queue.add(block.block_id) return True @@ -62,26 +103,51 @@ def remove(self, block: KVCacheBlock) -> None: self._in_queue.discard(block.block_id) def pop_lowest(self) -> KVCacheBlock | None: - """Pop the lowest-priority block from the heap. Stale entries - (block_id no longer in _in_queue) and expired entries are skipped. - Returns None when the queue is empty.""" - now = time.monotonic() + """Pop the lowest-priority block from the heap. + + A heap tuple is stale and skipped when either (a) its block_id is no + longer in _in_queue (removed via touch/drain), or (b) its generation + is not the block's current generation — i.e. a later try_insert + pushed a newer tuple for the same block_id after a remove()+re-free + reuse cycle. Skipping (b) keeps eviction ordered by the block's + CURRENT (priority, last_freed_time) instead of a stale tuple's. + + Expired entries are NOT special-cased here: callers must invoke + drain_expired() first to demote expired entries to the LRU free list. + + Returns None only when the heap has no live entries left.""" while self._heap: - _, _, block_id, block = heapq.heappop(self._heap) + _, _, gen, block_id, block = heapq.heappop(self._heap) if block_id not in self._in_queue: continue - meta = self._meta.get(block_id) - if meta is not None and meta.expiry is not None and meta.expiry <= now: - # Expired — discard sidecar and skip (caller treats as - # unprioritized; will be served by the LRU path). - self._in_queue.discard(block_id) - self._meta.pop(block_id, None) + if gen != self._gen.get(block_id): + # Superseded by a newer insert for the same block_id. continue self._in_queue.discard(block_id) self._meta.pop(block_id, None) return block return None + def drain_expired(self) -> list[int]: + """Pop all currently-expired entries off the priority queue and + return their block_ids. The caller is expected to route the + corresponding blocks into the LRU free list — expiry means + "protection released", not "evict immediately". + + Drains lazily: walks _in_queue, checks each entry's sidecar + expiry, and discards from _in_queue + _meta where expired. Stale + heap entries are cleaned up by the next pop_lowest as usual. + """ + now = time.monotonic() + drained: list[int] = [] + for block_id in list(self._in_queue): + meta = self._meta.get(block_id) + if meta is not None and meta.expiry is not None and meta.expiry <= now: + self._in_queue.discard(block_id) + self._meta.pop(block_id, None) + drained.append(block_id) + return drained + def clear_priority(self, block_id: int) -> None: """Drop the sidecar entry for a block (called when its hash is reset or it is permanently evicted from prefix cache).""" @@ -93,6 +159,7 @@ def clear(self) -> None: self._meta.clear() self._heap.clear() self._in_queue.clear() + self._gen.clear() def apply_directives( self,