Merged
131 commits
594a425
WIP
GeneDer Apr 1, 2025
e3fc0a9
Merge branch 'master' into prototype-custom-replica-scheduler
GeneDer Apr 1, 2025
854af70
WIP: refactor locality minxin
GeneDer Apr 1, 2025
4f44870
Merge branch 'master' into prototype-custom-replica-scheduler
GeneDer Apr 1, 2025
33dc3a3
use context vars for tracking flags
GeneDer Apr 1, 2025
603d074
serialize replica scheduler
GeneDer Apr 2, 2025
33d7a53
fix update mulitplexed mode id
GeneDer Apr 2, 2025
2fe7ddf
more refactor
GeneDer Apr 2, 2025
2b4a89e
WIP: still breaking, need to test passing the _RequestSchedulingConte…
GeneDer Apr 4, 2025
df1ac0d
fix bug
jujipotle Apr 22, 2025
762e7ce
Merge branch 'master' into prefix-aware-scheduler
jujipotle Apr 22, 2025
d69314d
Merge branch 'master' into prototype-custom-replica-scheduler
GeneDer Apr 22, 2025
70b1953
begin migrate prefix aware
jujipotle Apr 24, 2025
4105417
Opening up replica_scheduler
jujipotle Apr 25, 2025
9b5e440
implement get_deployment_config
jujipotle Apr 25, 2025
5b28af4
Minimal changes to swap in custom scheduler using llm config, since d…
jujipotle Apr 28, 2025
fea040a
Match requests based on internal_request_id
jujipotle Apr 29, 2025
b205643
Refactor prefix aware scheduler with newer design
jujipotle Apr 30, 2025
bf9d5c6
Wrap ObjectRefGenerator so callback can peek without iterating
jujipotle May 1, 2025
60c9975
Fix callback for update tree to work with streaming responses
jujipotle May 1, 2025
57dd69c
Clean up
jujipotle May 1, 2025
28fee7d
Merge branch 'master' into prototype-custom-replica-scheduler
GeneDer May 1, 2025
19eed43
Callback takes longer with streaming
jujipotle May 1, 2025
75512bf
add get_deployment_config api and use the replica scheduler set onto …
GeneDer May 1, 2025
632568e
lint
GeneDer May 1, 2025
6b88c51
fix getting deployment config
GeneDer May 1, 2025
3d58a07
fix the key passed into choose_replicas
GeneDer May 1, 2025
2ff36e6
WIP
GeneDer May 2, 2025
d0142f4
WIP: fix some tests
GeneDer May 2, 2025
da6d85b
fix all tests
GeneDer May 2, 2025
29886be
fix import
GeneDer May 2, 2025
9090deb
WIP
GeneDer May 5, 2025
dfd3113
refactor common scheduler logics
GeneDer May 5, 2025
db54615
fix
GeneDer May 5, 2025
b9e022d
enable user to pass scheduling stats
GeneDer May 5, 2025
60a31a4
try again
GeneDer May 5, 2025
5a659a7
create alias for public replica scheduler module
GeneDer May 6, 2025
b6315ee
fix choose_replicas interface and add scheduling_stats property
GeneDer May 6, 2025
40bcbc8
fix type
GeneDer May 6, 2025
da329b6
expose ReplicaID and ReplicaSchedulingInfo publically
GeneDer May 6, 2025
2993285
expose the replica scheduler fields publically and allow the deployme…
GeneDer May 7, 2025
9854c40
Merge branch 'master' into prototype-custom-replica-scheduler
GeneDer May 9, 2025
0d7ec47
allow to reconfigure replica scheduler for a given handle
GeneDer May 9, 2025
fb8d416
add on_request_scheduled callback
GeneDer May 9, 2025
34f86e0
allow the replica scheduler to configure to match on the exact request
GeneDer May 9, 2025
dad320c
refactor choose_replicas into choose_replicas_with_backoff and choose…
GeneDer May 12, 2025
cb03495
implement rank_replicas on multiplexed and locality mixin and update …
GeneDer May 12, 2025
2f49b2f
create FIFOMixin and change the default to OOO scheduling
GeneDer May 12, 2025
97c7b7c
add select_available_replicas to filter replicas unavailable to take …
GeneDer May 12, 2025
139fd33
uncomment commented code
GeneDer May 12, 2025
280d966
fix assign_request on LocalRouter
GeneDer May 12, 2025
e7e8983
Merge branch 'master' into prototype-custom-replica-scheduler
GeneDer May 12, 2025
f910031
call on_request_scheduled directly instead of allowing setting with .…
GeneDer May 12, 2025
07ceeab
fix on_request_scheduled interface
GeneDer May 12, 2025
bef7948
Merge branch 'master' into prefix-aware-scheduler
jujipotle May 12, 2025
0756a1c
Merge branch 'master' into prototype-custom-replica-scheduler
GeneDer May 12, 2025
d1dd98b
allow the controller to update scheduling stats
GeneDer May 12, 2025
60d1df4
Merge branch 'genesu-prototype-custom-replica-scheduler' into prefix-…
jujipotle May 12, 2025
5e58b11
fixes
GeneDer May 13, 2025
935cd1f
allow the controller to update scheduling stats
GeneDer May 12, 2025
587f0a6
fixes
GeneDer May 13, 2025
af8dbe1
update the choose_replicas to be a list of list instead of list of se…
GeneDer May 13, 2025
b0bd482
Clean up to match gene's work
jujipotle May 13, 2025
a49a794
add some newlines
jujipotle May 13, 2025
526a7cf
Merge remote-tracking branch 'upstream/genesu-prototype-custom-replic…
jujipotle May 13, 2025
7afc1bf
add requestSchedulingStatsPeriodS and requestSchedulingStatsTimeoutS …
GeneDer May 13, 2025
77f81f4
fix controller test
GeneDer May 13, 2025
60e0740
implement get_scheduling_stats onto MockReplicaActorWrapper
GeneDer May 13, 2025
bb84acc
update interface for ranking helpers
GeneDer May 13, 2025
b4b12d8
lint
GeneDer May 13, 2025
935bbad
Merge branch 'prototype-custom-replica-scheduler' into genesu-prototy…
GeneDer May 13, 2025
b1f1c6d
fix recording scheduling stats
GeneDer May 13, 2025
3befc66
Merge branch 'prototype-custom-replica-scheduler' into genesu-prototy…
GeneDer May 13, 2025
f48bc60
expoxe ReplicaResult publically
GeneDer May 14, 2025
8f3ebbe
Merge branch 'prototype-custom-replica-scheduler' into genesu-prototy…
GeneDer May 14, 2025
94e91c1
Get prefix aware scheduler working
jujipotle May 14, 2025
8f2798a
Merge remote-tracking branch 'upstream/genesu-prototype-custom-replic…
jujipotle May 14, 2025
67dea5c
small fixes
jujipotle May 14, 2025
eebe613
Make PrefixAware inherit Pow2, edit PrefixTree
jujipotle May 15, 2025
8e601c9
Apply chat template before prefix matching
jujipotle May 15, 2025
8b0d64e
Autoscaling, make vllm request processing an option and not default
jujipotle May 15, 2025
fe0f7bf
Eviction
jujipotle May 17, 2025
8943097
Add eviction loop, benchmark pow2 against prefix aware
jujipotle May 19, 2025
5b77a85
cleanup
jujipotle May 19, 2025
bcaa08c
Fix prompt extraction bug
jujipotle May 19, 2025
b242923
Linting
jujipotle May 19, 2025
12be5a7
Fix eviction loop to be non-blocking, add eviction threshold and targ…
jujipotle May 20, 2025
9186b4c
Move eviction loop to inside prefix tree, not scheduler. Add unit tes…
jujipotle May 20, 2025
0e5412d
Clean up prefix aware scheduler init
jujipotle May 22, 2025
37a098f
Merge branch 'master' into prefix-aware-scheduler
jujipotle May 22, 2025
cd655e2
E2E tests
jujipotle May 24, 2025
6720a7e
Remove vllm metrics hacking
jujipotle May 27, 2025
c970e55
Prevent repetitive eviction loops, add on_request_scheduled to pow2
jujipotle May 27, 2025
5cb7541
add E2E tests. Need to figure out why benchmark is slowing down.
jujipotle May 28, 2025
a0b9650
Fix benchmark overhead. Moved benchmarking scripts to ray.
jujipotle May 28, 2025
2fdc715
Delete large files
jujipotle May 28, 2025
81ffe6e
Delete some benchmarking results, start writing replication_tutorial.md
jujipotle May 29, 2025
748956f
Replication tutorial
jujipotle May 29, 2025
3b6bce4
Linting
jujipotle May 29, 2025
b903c5c
Clean up file directory structure; edit replication_tutorial instruct…
jujipotle May 30, 2025
75cf584
Edit replication_tutorial.md
jujipotle May 30, 2025
7f82e4f
Merge remote-tracking branch 'upstream/master' into prefix-aware-sche…
jujipotle May 30, 2025
6fd2e8c
Merge remote-tracking branch 'upstream/master' into prefix-aware-sche…
jujipotle May 30, 2025
0d35848
Manually make my PR similar to master
jujipotle May 30, 2025
da87c0f
Rename scheduler -> router
jujipotle May 30, 2025
85d1f35
Rename replica scheduling -> request routing
jujipotle May 30, 2025
a33282f
Remove logs
jujipotle May 30, 2025
598ce92
Update docs
jujipotle May 30, 2025
e1cb298
Explain how to visualize results
jujipotle May 30, 2025
37f445a
Fix import error that was preventing CI from passing
jujipotle May 31, 2025
109a23a
Merge master, resolve conflicts in test_prefix_tree.py and setup-dev.py
eicherseiji Jun 2, 2025
a12c869
Add type hints
eicherseiji Jun 2, 2025
4458cbc
Lint
eicherseiji Jun 2, 2025
361b381
Fix parameter name
eicherseiji Jun 3, 2025
dfb6166
Add back packages to link
eicherseiji Jun 3, 2025
667c055
Merge branch 'master' into prefix-aware-scheduler
eicherseiji Jun 3, 2025
aaadb12
Adapt to new choose_replicas signature
eicherseiji Jun 3, 2025
2efabd2
Move benchmark scripts to https://github.com/anyscale/serve-llm-repli…
eicherseiji Jun 4, 2025
a00c7dc
Remove injected stats logger
eicherseiji Jun 4, 2025
25c3cdf
Convert warning logs to debugs
eicherseiji Jun 4, 2025
5b21833
Expose prefix aware router via request_router __init__.py
eicherseiji Jun 4, 2025
ecaca59
Don't export PrefixAwareRequestRouter from serve since it's only used…
eicherseiji Jun 4, 2025
16b2426
Merge branch 'master' into prefix-aware-scheduler
eicherseiji Jun 5, 2025
62d9a47
Remove _track_metrics from PR
eicherseiji Jun 5, 2025
b4c3f2b
Update to call base class with warning that autoscaling not supported
eicherseiji Jun 5, 2025
0cc56e4
Change name to PrefixAwarePow2ReplicaScheduler
eicherseiji Jun 5, 2025
aa3b071
Use a detached actor to avoid issues with actor lifetime
eicherseiji Jun 6, 2025
e7c8a34
Change name to end with 'Router'
eicherseiji Jun 6, 2025
dae5f4e
Autoscaling is now handled by the prefix tree with detached lifetime
eicherseiji Jun 6, 2025
e5c39c7
Set default LLMServer router to PowerOfTwoChoicesRequestRouter
eicherseiji Jun 8, 2025
fd089ac
Don't override Ray Serve router class in LLMDeployment
eicherseiji Jun 9, 2025
@@ -694,7 +694,6 @@ def as_deployment(


@serve.deployment(
# TODO make this configurable
autoscaling_config={
"min_replicas": 1,
"initial_replicas": 1,
@@ -1,13 +1,17 @@
from __future__ import annotations

import asyncio
import logging
import os
from threading import RLock
from typing import Any, Dict, List, Optional, Tuple

import ray
from ray.serve._private.constants import (
SERVE_LOGGER_NAME,
)

logger = logging.getLogger(__name__)
logger = logging.getLogger(SERVE_LOGGER_NAME)


class Node:
@@ -86,12 +90,13 @@ def __init__(self) -> None:
# Root is always the head of the LRU list for each tenant.
self.root: Node = Node()

# Tracks total character count per tenant. Can be used by the replica scheduler to determine which tenant to evict, and by how much.
# Tracks total character count per tenant. Can be used by the replica request router to determine which tenant to evict, and by how much.
# Also uses the keys to track the active tenants in the tree.
self.tenant_to_char_count: Dict[str, int] = {}

# LRU tracking - root is always the head, tail is the least recently used.
self.tenant_to_lru_tail: Dict[str, Optional[Node]] = {}
self._eviction_task: Optional[asyncio.Task] = None

@staticmethod
def _shared_prefix_count(a: str, b: str) -> int:
@@ -113,34 +118,15 @@ def _get_lru_chain(self, tenant: str) -> List[Node]:
Note: This method is intended to be used only in tests.
"""
with self.lock:
if tenant not in self.tenant_to_char_count:
return []
nodes = []
current_node = self.root
while current_node:
nodes.append(current_node)
current_node = current_node.tenant_to_older_node.get(tenant)
return nodes

def _add_tenant(self, tenant: str) -> None:
"""
Add a new tenant to the tree.

If the tenant already exists, this is a no-op with a warning log.

Args:
tenant: Tenant to add
"""
with self.lock:
if tenant in self.tenant_to_char_count:
logger.warning(f"Tenant '{tenant}' already exists. No action taken.")
return

self.tenant_to_char_count[tenant] = 0
self.tenant_to_lru_tail[tenant] = self.root

# Initialize the root node as the head of the LRU list for this tenant
self.root.tenant_to_newer_node[tenant] = None
self.root.tenant_to_older_node[tenant] = None

def _insert_node_into_linked_list(
self,
node: Node,
@@ -153,7 +139,9 @@ def _insert_node_into_linked_list(
"""
with self.lock:
if tenant not in self.tenant_to_char_count:
logger.warning(f"Tenant '{tenant}' does not exist. No action taken.")
logger.debug(
f"[_insert_node_into_linked_list] Tenant '{tenant}' does not exist. No action taken."
)
return

# Skip if node is the root
@@ -178,7 +166,9 @@ def _remove_node_from_linked_list(self, node: Node, tenant: str) -> None:
"""
with self.lock:
if tenant not in self.tenant_to_char_count:
logger.warning(f"Tenant '{tenant}' does not exist. No action taken.")
logger.debug(
f"[_remove_node_from_linked_list] Tenant '{tenant}' does not exist. No action taken."
)
return

# Skip if node is the root
@@ -216,11 +206,13 @@ def _remove_tenant_single_node(self, tenant: str, node: Node) -> int:
"""
with self.lock:
if tenant not in self.tenant_to_char_count:
logger.warning(f"Tenant '{tenant}' does not exist. No action taken.")
logger.debug(
f"[_remove_tenant_single_node] Tenant '{tenant}' does not exist. No action taken."
)
return 0
if tenant not in node.tenant_to_last_access_time:
logger.warning(
f"Tenant '{tenant}' does not have node '{node.text}'. No action taken."
logger.debug(
f"[_remove_tenant_single_node] Tenant '{tenant}' does not have node '{node.text}'. No action taken."
)
return 0

@@ -239,11 +231,38 @@ def _remove_tenant_single_node(self, tenant: str, node: Node) -> int:

return removed_chars_len

def add_tenants(self, tenants: List[str], time_s: float) -> None:
"""
Add multiple new tenants to the tree. Also inserts an empty string for each tenant into the tree.

For each tenant that already exists, a debug message is logged and that tenant is skipped.

Args:
tenants: List of tenants to add
time_s: Current timestamp in seconds
"""
with self.lock:
for tenant in tenants:
if tenant in self.tenant_to_char_count:
logger.debug(
f"[_add_tenants] Tenant '{tenant}' already exists. Skipping."
)
continue

self.tenant_to_char_count[tenant] = 0
self.tenant_to_lru_tail[tenant] = self.root

# Initialize the root node as the head of the LRU list for this tenant
self.root.tenant_to_newer_node[tenant] = None
self.root.tenant_to_older_node[tenant] = None
self.insert("", tenant, time_s)

def insert(self, text: str, tenant: str, time_s: float) -> None:
"""
Insert text into tree for a specific tenant.
Insert text into tree for a specific tenant, but only if the tenant already exists.

If the tenant doesn't already exist in the tree, it will be automatically added.
If the tenant doesn't exist in the tree, this logs a debug message and returns without
inserting anything. Use add_tenants() first to add a new tenant.

Args:
text: Text to insert
@@ -263,7 +282,10 @@ def insert(self, text: str, tenant: str, time_s: float) -> None:
"""
with self.lock:
if tenant not in self.tenant_to_char_count:
self._add_tenant(tenant)
logger.debug(
f"[_insert] Tenant '{tenant}' does not exist. Use add_tenants() first."
)
return

curr_node: Node = self.root
i: int = 0
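
Note on the behavior change in this hunk: insert() no longer registers unknown tenants, so callers have to call add_tenants() first. A minimal sketch of that contract follows. `ToyTenantTexts` is a hypothetical stand-in that is not part of this PR: it keeps plain lists instead of a prefix tree, and its insert() returns a bool only so the outcome is visible (the method in the diff returns None).

```python
from typing import Dict, List


class ToyTenantTexts:
    """Hypothetical stand-in for the tree: tracks inserted texts per tenant."""

    def __init__(self) -> None:
        self.tenant_to_texts: Dict[str, List[str]] = {}

    def add_tenants(self, tenants: List[str]) -> None:
        for tenant in tenants:
            if tenant in self.tenant_to_texts:
                continue  # Already registered: skip, as in the diff.
            # Seed each new tenant with the empty string, mirroring
            # the insert("", tenant, time_s) call in add_tenants().
            self.tenant_to_texts[tenant] = [""]

    def insert(self, text: str, tenant: str) -> bool:
        if tenant not in self.tenant_to_texts:
            return False  # Unknown tenant: nothing is inserted.
        self.tenant_to_texts[tenant].append(text)
        return True


store = ToyTenantTexts()
dropped = store.insert("hello", "replica_a")   # tenant not registered yet
store.add_tenants(["replica_a"])
accepted = store.insert("hello", "replica_a")  # now accepted
```

Because add_tenants() seeds the empty string, a newly added tenant owns the root right away, which appears to be why the note about tenants being unreachable by prefix_match is removed in the next hunk.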
@@ -373,10 +395,6 @@ def prefix_match(
If the list of available tenants doesn't match any tenants in the tree: returns ("", None)
When no prefix match is found (does not traverse further than the root node): returns ("", list of available tenants)
When a prefix match is found: returns (matched_prefix, list of tenants that own the matched node)

Note:
A tenant is unable to be returned by prefix_match until it has inserted text into the tree, even if _add_tenant is called.
The replica scheduler is responsible for inserting text into new replicas; it should not only rely on prefix_match to select replicas.
"""
with self.lock:
if available_tenants:
@@ -433,38 +451,47 @@ def prefix_match(

return matched_text, matched_tenants

def remove_tenant(self, tenant: str) -> int:
def remove_tenants(self, tenants: List[str]) -> Dict[str, int]:
"""
Remove a tenant and all its nodes from the tree.
Time complexity: O(n) where n is the number of nodes owned by the tenant.
Remove multiple tenants and all their nodes from the tree.
Time complexity: O(n) where n is the total number of nodes owned by all tenants.

Args:
tenant: Tenant to remove
tenants: List of tenants to remove

Returns:
Number of characters removed (0 if tenant doesn't exist)
Dictionary mapping each tenant to the number of characters removed
(0 if tenant doesn't exist)
"""
chars_removed: Dict[str, int] = {}

with self.lock:
if tenant not in self.tenant_to_char_count:
logger.warning(f"Tenant '{tenant}' does not exist. No action taken.")
return 0
for tenant in tenants:
if tenant not in self.tenant_to_char_count:
logger.debug(
f"[_remove_tenants] Tenant '{tenant}' does not exist. Skipping."
)
chars_removed[tenant] = 0
continue

total_chars_removed: int = 0
tenant_chars_removed: int = 0

# Start from the tail and remove all nodes
current_tail = self.tenant_to_lru_tail.get(tenant)
while current_tail:
newer_neighbor = current_tail.tenant_to_newer_node.get(tenant)
total_chars_removed += self._remove_tenant_single_node(
tenant, current_tail
)
current_tail = newer_neighbor
# Start from the tail and remove all nodes
current_tail = self.tenant_to_lru_tail.get(tenant)
while current_tail:
newer_neighbor = current_tail.tenant_to_newer_node.get(tenant)
tenant_chars_removed += self._remove_tenant_single_node(
tenant, current_tail
)
current_tail = newer_neighbor

# Clean up tenant references
self.tenant_to_char_count.pop(tenant, None)
self.tenant_to_lru_tail.pop(tenant, None)
# Clean up tenant references
self.tenant_to_char_count.pop(tenant, None)
self.tenant_to_lru_tail.pop(tenant, None)

return total_chars_removed
chars_removed[tenant] = tenant_chars_removed

return chars_removed
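
For readers skimming the new return type: remove_tenants() reports one entry per requested tenant, with 0 for tenants that were never registered. A simplified sketch of that shape follows. The free function below is hypothetical and works on a plain dict; it assumes the tracked character count equals the characters removed, whereas the method in the diff walks the tenant's LRU list node by node.

```python
from typing import Dict, List


def remove_tenants_sketch(
    tenant_to_char_count: Dict[str, int], tenants: List[str]
) -> Dict[str, int]:
    chars_removed: Dict[str, int] = {}
    for tenant in tenants:
        # pop() with a default of 0 covers tenants that do not exist.
        chars_removed[tenant] = tenant_to_char_count.pop(tenant, 0)
    return chars_removed


counts = {"replica_a": 12, "replica_b": 7}
removed = remove_tenants_sketch(counts, ["replica_a", "replica_x"])
```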

def evict_tenant_by_lru(self, tenant: str, min_remove_size: int) -> int:
"""
@@ -485,14 +512,14 @@ def evict_tenant_by_lru(self, tenant: str, min_remove_size: int) -> int:
"""
with self.lock:
if tenant not in self.tenant_to_char_count:
logger.warning(
f"Cannot evict tenant '{tenant}': tenant does not exist. No action taken."
logger.debug(
f"[_evict_tenant_by_lru] Cannot evict tenant '{tenant}': tenant does not exist. No action taken."
)
return 0

if self.tenant_to_char_count[tenant] < min_remove_size:
logger.warning(
f"Cannot evict {min_remove_size} characters from tenant '{tenant}', which has only "
logger.debug(
f"[_evict_tenant_by_lru] Cannot evict {min_remove_size} characters from tenant '{tenant}', which has only "
f"{self.tenant_to_char_count[tenant]} characters. Will remove all available characters."
)
min_remove_size = self.tenant_to_char_count[tenant]
@@ -525,22 +552,65 @@ def evict_tenant_by_lru(self, tenant: str, min_remove_size: int) -> int:

return total_chars_removed

def get_smallest_tenant(self) -> Optional[str]:
def get_smallest_tenants(self) -> Optional[List[str]]:
"""
Get the tenant with the smallest total character count.
Get the tenants with the smallest total character count.

Returns:
Tenant with smallest character count, or None if no tenants
Tenants with smallest character count, or None if no tenants
"""
with self.lock:
if not self.tenant_to_char_count:
return None

return min(
self.tenant_to_char_count,
key=self.tenant_to_char_count.get,
default=None,
)
min_count = min(self.tenant_to_char_count.values())
return [
tenant
for tenant, count in self.tenant_to_char_count.items()
if count == min_count
]
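
The switch from a single tenant to a list matters when several tenants tie for the minimum. A small sketch of the new selection logic on a plain dict (the function name and the sample counts are made up for illustration):

```python
from typing import Dict, List, Optional


def smallest_tenants(tenant_to_char_count: Dict[str, int]) -> Optional[List[str]]:
    if not tenant_to_char_count:
        return None
    min_count = min(tenant_to_char_count.values())
    # Keep every tenant that ties for the minimum, in dict insertion order.
    return [t for t, c in tenant_to_char_count.items() if c == min_count]


counts = {"replica_a": 120, "replica_b": 40, "replica_c": 40}
tied = smallest_tenants(counts)
empty = smallest_tenants({})
```

How a caller breaks the tie (for example, picking one of the returned tenants at random) is not shown in this hunk and is left as an assumption here.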

def start_eviction_loop(
Review comment (Contributor): This should be more like a background thread. (event loop should not be kept busy because of eviction)

self, eviction_threshold: int, eviction_target: int, interval_secs: float
) -> bool:
"""Start a single eviction loop within the actor itself
Args:
eviction_threshold: A tenant is evicted only once its character count exceeds this value
eviction_target: The maximum number of characters a tenant should have after eviction
interval_secs: Number of seconds between eviction checks

Returns:
True if the loop was started, False if it was already running
"""
with self.lock:
if self._eviction_task is None:
self._eviction_task = asyncio.create_task(
self._run_eviction_loop(
eviction_threshold, eviction_target, interval_secs
)
)
return True
else:
logger.debug("[_start_eviction_loop] Eviction loop already running")
return False

async def _run_eviction_loop(
self, eviction_threshold, eviction_target, interval_secs
):
while True:
await asyncio.sleep(interval_secs)
with self.lock:
for tenant, char_count in self.tenant_to_char_count.items():
if char_count > eviction_threshold:
excess = char_count - eviction_target
self.evict_tenant_by_lru(tenant, excess)

def stop_eviction_loop(self):
with self.lock:
if self._eviction_task:
self._eviction_task.cancel()
# self._eviction_task.close()
self._eviction_task = None
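
One possible reading of the review comment on start_eviction_loop is to move the periodic check off the asyncio event loop and onto a daemon thread. The sketch below is an illustration of that idea, not what this PR implements: `ThreadedEvictor`, `check_once`, and the `evict` callback are hypothetical names, and the callback stands in for evict_tenant_by_lru. The threshold and target arithmetic matches _run_eviction_loop: with a threshold of 400 and a target of 300, a tenant holding 500 characters has an excess of 500 - 300 = 200.

```python
import threading
from typing import Any, Callable, Dict, Optional


class ThreadedEvictor:
    """Hypothetical helper (not in this PR): eviction checks on a daemon thread."""

    def __init__(
        self,
        char_counts: Dict[str, int],
        data_lock: Any,  # e.g. the RLock that guards the tree
        evict: Callable[[str, int], int],
        eviction_threshold: int,
        eviction_target: int,
        interval_secs: float,
    ) -> None:
        self._char_counts = char_counts
        self._data_lock = data_lock
        self._evict = evict
        self._threshold = eviction_threshold
        self._target = eviction_target
        self._interval_secs = interval_secs
        self._state_lock = threading.Lock()
        self._thread: Optional[threading.Thread] = None
        self._stop_event: Optional[threading.Event] = None

    def check_once(self) -> None:
        with self._data_lock:
            # Snapshot the items so the callback may update counts safely.
            for tenant, count in list(self._char_counts.items()):
                if count > self._threshold:
                    self._evict(tenant, count - self._target)

    def _run(self, stop_event: threading.Event) -> None:
        # Event.wait() returns False on timeout and True once stop() sets it.
        while not stop_event.wait(self._interval_secs):
            self.check_once()

    def start(self) -> bool:
        with self._state_lock:
            if self._thread is not None:
                return False  # Already running.
            self._stop_event = threading.Event()
            self._thread = threading.Thread(
                target=self._run, args=(self._stop_event,), daemon=True
            )
            self._thread.start()
            return True

    def stop(self) -> None:
        with self._state_lock:
            thread, stop_event = self._thread, self._stop_event
            self._thread = None
        if thread is not None and stop_event is not None:
            stop_event.set()
            thread.join()


counts = {"replica_a": 500, "replica_b": 100}


def evict(tenant: str, min_remove_size: int) -> int:
    counts[tenant] -= min_remove_size  # Stand-in for evict_tenant_by_lru.
    return min_remove_size


evictor = ThreadedEvictor(counts, threading.RLock(), evict, 400, 300, 0.01)
evictor.check_once()            # replica_a: 500 > 400, so evict 500 - 300 = 200
started = evictor.start()
started_again = evictor.start()
evictor.stop()
```

A per-start Event is used so that a stop() followed quickly by a new start() cannot leave the old thread running. Whether a thread is preferable to an asyncio task inside a Ray actor depends on the actor's concurrency setup, which this sketch does not address.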


@ray.remote
@@ -551,3 +621,6 @@ def getattr(self, attribute: str) -> Any:
Note: This method is intended to be used only in tests.
"""
return getattr(self, attribute)

def setattr(self, attribute: str, value: Any) -> None:
setattr(self, attribute, value)