
Commit 50cd76a

youkaichao authored and zhuohan123 committed
[core] simplify seq group code (vllm-project#9569)
Co-authored-by: Zhuohan Li <[email protected]>
1 parent fa75d40 commit 50cd76a

File tree

6 files changed: +62 −566 lines changed


tests/core/test_chunked_prefill_scheduler.py

Lines changed: 0 additions & 153 deletions

@@ -4,7 +4,6 @@
 import pytest  # noqa
 
 from vllm.config import CacheConfig, SchedulerConfig
-from vllm.core.interfaces import AllocStatus
 from vllm.core.scheduler import Scheduler
 from vllm.sequence import Logprob, SequenceGroup

@@ -347,158 +346,6 @@ def test_prompt_limit_exceed():
     assert out.ignored_seq_groups[0] == seq_group
 
 
-def test_swap():
-    """Verify swapping works with chunked prefill requests"""
-    block_size = 4
-    max_seqs = 30
-    max_model_len = 200
-    max_num_batched_tokens = 30
-    scheduler_config = SchedulerConfig(
-        "generate",
-        max_num_batched_tokens,
-        max_seqs,
-        max_model_len,
-        enable_chunked_prefill=True,
-    )
-    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
-    cache_config.num_cpu_blocks = 16
-    cache_config.num_gpu_blocks = 16
-    scheduler = Scheduler(scheduler_config, cache_config, None)
-
-    _, seq_group = create_dummy_prompt("1",
-                                       prompt_length=60,
-                                       best_of=2,
-                                       block_size=block_size)
-    scheduler.add_seq_group(seq_group)
-    _, out = schedule_and_update_computed_tokens(scheduler)
-    # The request is chunked.
-    # prefill scheduled now.
-    assert len(out.scheduled_seq_groups) == 1
-    assert out.num_prefill_groups == 1
-    assert seq_group.is_prefill()
-    assert out.num_batched_tokens == max_num_batched_tokens
-
-    # The last request should be swapped out.
-    scheduler.block_manager.can_append_slots = MagicMock()
-
-    def cannot_append_second_group(seq_group, num_lookahead_slots):
-        return seq_group.request_id != "1"
-
-    scheduler.block_manager.can_append_slots.side_effect = (
-        cannot_append_second_group)
-
-    # The running prefill is now swapped.
-    _, out = schedule_and_update_computed_tokens(scheduler)
-    assert len(out.scheduled_seq_groups) == 0
-    assert out.num_batched_tokens == 0
-    assert out.blocks_to_swap_out != []
-    assert out.blocks_to_swap_in == []
-
-    # Add 1 more task. Swap should be prioritized over new prefill.
-    _, seq_group = create_dummy_prompt("2", prompt_length=60)
-    scheduler.add_seq_group(seq_group)
-    _, out = schedule_and_update_computed_tokens(scheduler)
-    assert len(out.scheduled_seq_groups) == 1
-    # 3 decodes. It is swapped in.
-    assert out.num_batched_tokens == 30
-    assert out.blocks_to_swap_in != []
-    assert out.blocks_to_swap_out == []
-
-
-def test_running_prefill_prioritized_over_swap():
-    block_size = 4
-    max_seqs = 30
-    max_model_len = 200
-    max_num_batched_tokens = 30
-    scheduler_config = SchedulerConfig(
-        "generate",
-        max_num_batched_tokens,
-        max_seqs,
-        max_model_len,
-        enable_chunked_prefill=True,
-    )
-    cache_config = CacheConfig(block_size, 1.0, 1, "auto")
-    cache_config.num_cpu_blocks = 32
-    cache_config.num_gpu_blocks = 32
-    scheduler = Scheduler(scheduler_config, cache_config, None)
-
-    _, seq_group = create_dummy_prompt("1",
-                                       prompt_length=60,
-                                       best_of=2,
-                                       block_size=block_size)
-    scheduler.add_seq_group(seq_group)
-    _, out = schedule_and_update_computed_tokens(scheduler)
-    # The request is chunked.
-    # prefill scheduled now.
-    assert len(out.scheduled_seq_groups) == 1
-    assert out.num_prefill_groups == 1
-    assert seq_group.is_prefill()
-    assert out.num_batched_tokens == max_num_batched_tokens
-
-    # The request should be swapped out.
-    scheduler.block_manager.can_append_slots = MagicMock()
-
-    def cannot_append_second_group(seq_group, num_lookahead_slots):
-        return seq_group.request_id != "1"
-
-    scheduler.block_manager.can_append_slots.side_effect = (
-        cannot_append_second_group)
-
-    # The running prefill is now swapped.
-    _, out = schedule_and_update_computed_tokens(scheduler)
-    assert len(out.scheduled_seq_groups) == 0
-    assert out.num_batched_tokens == 0
-    assert out.blocks_to_swap_out != []
-    assert out.blocks_to_swap_in == []
-
-    # Add 1 more task. Swap is not possible, so prefill is running.
-    scheduler.block_manager.can_swap_in = MagicMock()
-    scheduler.block_manager.can_swap_in.return_value = AllocStatus.LATER
-
-    _, seq_group2 = create_dummy_prompt("2",
-                                        prompt_length=60,
-                                        block_size=block_size)
-    scheduler.add_seq_group(seq_group2)
-    _, out = schedule_and_update_computed_tokens(scheduler)
-    assert len(out.scheduled_seq_groups) == 1
-    # 3 decodes. It is swapped in.
-    assert out.num_batched_tokens == 30
-    assert out.blocks_to_swap_in == []
-    assert out.blocks_to_swap_out == []
-    assert out.scheduled_seq_groups[0].seq_group == seq_group2
-
-    # Now although swap is possible, running prefill is prioritized.
-    scheduler.block_manager.can_swap_in.return_value = AllocStatus.OK
-    _, out = schedule_and_update_computed_tokens(scheduler)
-    assert len(out.scheduled_seq_groups) == 1
-    # 3 decodes. It is swapped in.
-    assert out.num_batched_tokens == 30
-    assert out.blocks_to_swap_in == []
-    assert out.blocks_to_swap_out == []
-    assert not seq_group2.is_prefill()
-    assert out.scheduled_seq_groups[0].seq_group == seq_group2
-    append_new_token(seq_group2, 1)
-
-    # Decoding is prioritized.
-    _, out = schedule_and_update_computed_tokens(scheduler)
-    assert len(out.scheduled_seq_groups) == 1
-    # 3 decodes. It is swapped in.
-    assert out.num_batched_tokens == 1
-    assert out.blocks_to_swap_in == []
-    assert out.blocks_to_swap_out == []
-    assert not seq_group2.is_prefill()
-    assert out.scheduled_seq_groups[0].seq_group == seq_group2
-    append_new_token(seq_group2, 1)
-
-    # Since we abort the sequence group, we can finally swap.
-    scheduler.abort_seq_group(seq_group2.request_id)
-    _, out = schedule_and_update_computed_tokens(scheduler)
-    assert len(out.scheduled_seq_groups) == 1
-    assert out.num_batched_tokens == 30
-    assert out.blocks_to_swap_in != []
-    assert out.blocks_to_swap_out == []
-
-
 def test_chunked_prefill_preempt():
     """Verify preempt works with chunked prefill requests"""
     block_size = 4
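
All of the removed chunked-prefill swap tests lean on the same mocking idiom: block_manager.can_append_slots is replaced by a MagicMock whose side_effect denies slots to one chosen request, forcing the scheduler down its swap-out path for that request only. Below is a minimal standalone sketch of that idiom; FakeBlockManager and FakeSeqGroup are hypothetical stand-ins (not vLLM classes), and only the can_append_slots(seq_group, num_lookahead_slots) signature mirrors the real block manager.

from unittest.mock import MagicMock

class FakeSeqGroup:
    # Hypothetical stand-in for a vLLM sequence group; only request_id matters.
    def __init__(self, request_id):
        self.request_id = request_id

class FakeBlockManager:
    # Hypothetical stand-in for the scheduler's block manager.
    def can_append_slots(self, seq_group, num_lookahead_slots):
        return True

block_manager = FakeBlockManager()
block_manager.can_append_slots = MagicMock()

# The idiom from the deleted tests: deny slots only to request "1", which is
# what would push a scheduler consulting this mock into swapping it out.
def cannot_append_second_group(seq_group, num_lookahead_slots):
    return seq_group.request_id != "1"

block_manager.can_append_slots.side_effect = cannot_append_second_group

assert block_manager.can_append_slots(FakeSeqGroup("0"), 0) is True
assert block_manager.can_append_slots(FakeSeqGroup("1"), 0) is False

A side_effect callable keeps the original call signature while letting the test steer the decision per request, which is why each deleted test reuses it with a different request id.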

tests/core/test_scheduler.py

Lines changed: 1 addition & 203 deletions

@@ -10,7 +10,7 @@
 from vllm.core.interfaces import AllocStatus
 from vllm.core.scheduler import Scheduler, SchedulingBudget
 from vllm.lora.request import LoRARequest
-from vllm.sequence import SequenceGroup, SequenceStatus
+from vllm.sequence import SequenceGroup
 
 from .utils import (append_new_token, append_new_token_seq_group,
                     create_dummy_prompt, get_sequence_groups,

@@ -296,55 +296,6 @@ def test_scheduler_delay_factor():
     append_new_token(out, 1)
 
 
-def test_swapped_out_prioritized():
-    block_size = 4
-    scheduler = initialize_scheduler(max_num_seqs=6,
-                                     block_size=block_size,
-                                     num_cpu_blocks=64,
-                                     num_gpu_blocks=64)
-    # best_of=2 * 3 == 6 sequences.
-    for i in range(3):
-        _, seq_group = create_dummy_prompt(str(i),
-                                           prompt_length=60,
-                                           best_of=2,
-                                           block_size=block_size)
-        scheduler.add_seq_group(seq_group)
-    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
-    # prefill scheduled now.
-    assert len(out.scheduled_seq_groups) == 3
-    append_new_token(out, 1)
-
-    # The last request should be swapped out.
-    scheduler.block_manager.can_append_slots = MagicMock()
-
-    def cannot_append_second_group(seq_group, num_lookahead_slots):
-        return seq_group.request_id != "2"
-
-    scheduler.block_manager.can_append_slots.side_effect = (
-        cannot_append_second_group)
-
-    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
-    assert len(out.scheduled_seq_groups) == 2
-    assert out.num_batched_tokens == 2
-    assert out.blocks_to_swap_out != []
-    assert out.blocks_to_swap_in == []
-    append_new_token(out, 1)
-
-    # Add 1 more task. Swap should be prioritized over prefill.
-    _, seq_group = create_dummy_prompt(str(i),
-                                       prompt_length=60,
-                                       best_of=2,
-                                       block_size=block_size)
-    scheduler.add_seq_group(seq_group)
-    seq_group_meta, out = schedule_and_update_computed_tokens(scheduler)
-    append_new_token(out, 1)
-    assert len(out.scheduled_seq_groups) == 3
-    # 3 decodes. It is swapped in.
-    assert out.num_batched_tokens == 3
-    assert out.blocks_to_swap_in != []
-    assert out.blocks_to_swap_out == []
-
-
 def initialize_scheduler(
     *,
     max_num_seqs=1000,

@@ -646,60 +597,6 @@ def cannot_append_second_group(seq_group, num_lookahead_slots):
     assert output.blocks_to_copy == []
 
 
-def test_decode_swap_beam_search():
-    """
-    Test best_of > 1 swap out blocks
-    """
-    block_size = 4
-    scheduler = initialize_scheduler(block_size=block_size,
-                                     num_gpu_blocks=64,
-                                     num_cpu_blocks=64)
-    curr_loras = None
-    budget = create_token_budget()
-    for i in range(3):
-        _, seq_group = create_dummy_prompt(str(i),
-                                           prompt_length=60,
-                                           best_of=2,
-                                           block_size=block_size)
-        scheduler._allocate_and_set_running(seq_group)
-        scheduler._add_seq_group_to_running(seq_group)
-        append_new_token_seq_group(60, seq_group, 1)
-        budget.add_num_seqs(seq_group.request_id,
-                            seq_group.get_max_num_running_seqs())
-        budget.add_num_batched_tokens(
-            seq_group.request_id, seq_group.num_seqs(SequenceStatus.RUNNING))
-
-    # The last request should be swapped out.
-    scheduler.block_manager.can_append_slots = MagicMock()
-
-    def cannot_append_second_group(seq_group, num_lookahead_slots):
-        return seq_group.request_id != "2"
-
-    scheduler.block_manager.can_append_slots.side_effect = (
-        cannot_append_second_group)
-    scheduler.block_manager.swap_out = MagicMock()
-    expected_swap_mapping = [("5", "7")]
-    scheduler.block_manager.swap_out.return_value = expected_swap_mapping
-
-    output = scheduler._schedule_running(budget, curr_loras)
-    remainig_running = scheduler.running
-    assert len(remainig_running) == 0
-    assert len(output.decode_seq_groups) == 2
-    assert len(output.prefill_seq_groups) == 0
-    assert output.decode_seq_groups[0].seq_group.request_id == "0"
-    assert output.decode_seq_groups[1].seq_group.request_id == "1"
-    assert len(output.preempted) == 0
-    assert len(output.swapped_out) == 1
-    # Budget should reflect preempted requests.
-    assert budget.num_batched_tokens == 2
-    # since there are 2 sequences, 2 should be subtracted.
-    assert budget.num_curr_seqs == 4
-    # Both should be preempted, not swapped.
-    assert output.blocks_to_swap_out == expected_swap_mapping
-    # Nothing is copied.
-    assert output.blocks_to_copy == []
-
-
 def test_schedule_decode_blocks_to_copy_update():
     """
     Verify blocks_to_copy is updated.

@@ -736,105 +633,6 @@ def test_schedule_decode_blocks_to_copy_update():
     assert output.blocks_to_copy == [(2, 3)]
 
 
-def test_schedule_swapped_simple():
-    block_size = 4
-    scheduler = initialize_scheduler(block_size=block_size)
-    curr_loras = None
-    blocks_to_swap_out: List[Tuple[int, int]] = []
-    _, seq_group = create_dummy_prompt("1",
-                                       prompt_length=4,
-                                       best_of=2,
-                                       block_size=block_size)
-    scheduler._allocate_and_set_running(seq_group)
-    append_new_token_seq_group(4, seq_group, 1)
-    scheduler._swap_out(seq_group, blocks_to_swap_out)
-    scheduler._add_seq_group_to_swapped(seq_group)
-
-    budget = create_token_budget()
-    output = scheduler._schedule_swapped(budget, curr_loras)
-    remaining_swapped = scheduler.swapped
-    assert len(remaining_swapped) == 0
-    assert budget.num_batched_tokens == 1
-    assert budget.num_curr_seqs == 2
-    assert len(output.decode_seq_groups) == 1
-    assert len(output.prefill_seq_groups) == 0
-    # swap in is the reverse of swap out
-    blocks_to_swap_in_reverse = []
-    for swapin, swapout in output.blocks_to_swap_in:
-        blocks_to_swap_in_reverse.append((swapout, swapin))
-    assert blocks_to_swap_out == blocks_to_swap_in_reverse
-
-
-def test_schedule_swapped_max_token_budget():
-    block_size = 4
-    scheduler = initialize_scheduler(block_size=block_size,
-                                     num_cpu_blocks=32,
-                                     num_gpu_blocks=32)
-    curr_loras = None
-    blocks_to_swap_out: List[Tuple[int, int]] = []
-    for i in range(2):
-        _, seq_group = create_dummy_prompt(str(i), prompt_length=60, best_of=2)
-        scheduler._allocate_and_set_running(seq_group)
-        append_new_token_seq_group(60, seq_group, 1)
-        scheduler._swap_out(seq_group, blocks_to_swap_out)
-        scheduler._add_seq_group_to_swapped(seq_group)
-
-    budget = create_token_budget(token_budget=1)
-    output = scheduler._schedule_swapped(budget, curr_loras)
-    remaining_swapped = scheduler.swapped
-    assert len(remaining_swapped) == 1
-    assert budget.num_batched_tokens == 1
-    assert budget.num_curr_seqs == 2
-    assert len(output.decode_seq_groups) == 1
-    assert len(output.prefill_seq_groups) == 0
-
-    # Verify num_batched_tokens are respected.
-    budget = create_token_budget(token_budget=1)
-    add_token_budget(budget, 1, 0)
-    output = scheduler._schedule_swapped(budget, curr_loras)
-    remaining_swapped = scheduler.swapped
-    assert len(remaining_swapped) == 1
-    assert budget.num_batched_tokens == 1
-    assert budget.num_curr_seqs == 0
-    assert len(output.decode_seq_groups) == 0
-    assert len(output.prefill_seq_groups) == 0
-
-
-def test_schedule_swapped_max_seqs():
-    block_size = 4
-    scheduler = initialize_scheduler(block_size=block_size,
-                                     num_cpu_blocks=64,
-                                     num_gpu_blocks=64)
-    curr_loras = None
-    blocks_to_swap_out: List[Tuple[int, int]] = []
-    for i in range(4):
-        _, seq_group = create_dummy_prompt(str(i),
-                                           prompt_length=60,
-                                           block_size=4)
-        scheduler._allocate_and_set_running(seq_group)
-        append_new_token_seq_group(60, seq_group, 1)
-        scheduler._swap_out(seq_group, blocks_to_swap_out)
-        scheduler._add_seq_group_to_swapped(seq_group)
-
-    budget = create_token_budget(max_num_seqs=2)
-    output = scheduler._schedule_swapped(budget, curr_loras)
-    remaining_swapped = scheduler.swapped
-    assert len(remaining_swapped) == 2
-    assert budget.num_batched_tokens == 2
-    assert budget.num_curr_seqs == 2
-    assert len(output.decode_seq_groups) == 2
-    assert len(output.prefill_seq_groups) == 0
-
-    # Verify num_curr_seqs are respected.
-    output = scheduler._schedule_swapped(budget, curr_loras)
-    remaining_swapped = scheduler.swapped
-    assert len(remaining_swapped) == 2
-    assert budget.num_batched_tokens == 2
-    assert budget.num_curr_seqs == 2
-    assert len(output.decode_seq_groups) == 0
-    assert len(output.prefill_seq_groups) == 0
-
-
 def test_schedule_swapped_max_loras():
     block_size = 4
     lora_config = LoRAConfig(max_lora_rank=8, max_loras=1)
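
The deleted test_schedule_swapped_simple also documented a useful invariant: the block mapping produced by swap-in is the pairwise reverse of the mapping produced by the earlier swap-out. Here is a self-contained sketch of that check; the block numbers are made up for illustration, whereas the deleted test compared live scheduler output.

from typing import List, Tuple

# Made-up (source_block, destination_block) pairs standing in for a swap-out.
blocks_to_swap_out: List[Tuple[int, int]] = [(0, 8), (1, 9)]

# Swapping back in moves each block the other way, so every pair flips.
blocks_to_swap_in: List[Tuple[int, int]] = [(8, 0), (9, 1)]

# The invariant the deleted test asserted: swap-in is the reverse of swap-out.
blocks_to_swap_in_reverse = [(dst, src) for src, dst in blocks_to_swap_in]
assert blocks_to_swap_out == blocks_to_swap_in_reverse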

0 commit comments