11import asyncio
2- from datetime import datetime , timezone
2+ import typing
3+ from datetime import datetime , timedelta , timezone
34from typing import Any
45from unittest .mock import MagicMock
56
67import anyio
78import pytest
8- from taskiq import AsyncBroker , TaskiqScheduler
9+ from faststream .types import SendableMessage
10+ from freezegun import freeze_time
11+ from taskiq import AsyncBroker
912from taskiq .cli .scheduler .args import SchedulerArgs
1013from taskiq .cli .scheduler .run import run_scheduler
1114from taskiq .schedule_sources import LabelScheduleSource
1215
13- from taskiq_faststream import BrokerWrapper
16+ from taskiq_faststream import BrokerWrapper , StreamScheduler
17+ from tests import messages
1418
1519
1620@pytest .mark .anyio
@@ -53,7 +57,7 @@ async def handler(msg: str) -> None:
5357 task = asyncio .create_task (
5458 run_scheduler (
5559 SchedulerArgs (
56- scheduler = TaskiqScheduler (
60+ scheduler = StreamScheduler (
5761 broker = taskiq_broker ,
5862 sources = [LabelScheduleSource (taskiq_broker )],
5963 ),
@@ -67,3 +71,74 @@ async def handler(msg: str) -> None:
6771
6872 mock .assert_called_once_with ("Hi!" )
6973 task .cancel ()
74+
75+ @pytest .mark .parametrize (
76+ "msg" ,
77+ [
78+ messages .message , # regular msg
79+ messages .sync_callable_msg , # sync callable
80+ messages .async_callable_msg , # async callable
81+ messages .sync_generator_msg , # sync generator
82+ messages .async_generator_msg , # async generator
83+ messages .sync_callable_class_message , # sync callable class
84+ messages .async_callable_class_message , # async callable class
85+ ],
86+ )
87+ async def test_task_multiple_schedules_by_cron (
88+ self ,
89+ subject : str ,
90+ broker : Any ,
91+ event : asyncio .Event ,
92+ msg : typing .Union [
93+ None ,
94+ SendableMessage ,
95+ typing .Callable [[], SendableMessage ],
96+ typing .Callable [[], typing .Awaitable [SendableMessage ]],
97+ typing .Callable [[], typing .Generator [SendableMessage , None , None ]],
98+ typing .Callable [[], typing .AsyncGenerator [SendableMessage , None ]],
99+ ],
100+ ) -> None :
101+ """Test cron runs twice via StreamScheduler."""
102+ received_message = []
103+
104+ @broker .subscriber (subject )
105+ async def handler (message : str ) -> None :
106+ received_message .append (message )
107+ event .set ()
108+
109+ taskiq_broker = self .build_taskiq_broker (broker )
110+
111+ taskiq_broker .task (
112+ msg ,
113+ ** {self .subj_name : subject },
114+ schedule = [
115+ {
116+ "cron" : "* * * * *" ,
117+ },
118+ ],
119+ )
120+
121+ async with self .test_class (broker ):
122+ with freeze_time ("00:00:00" , tick = True ) as frozen_datetime :
123+ task = asyncio .create_task (
124+ run_scheduler (
125+ SchedulerArgs (
126+ scheduler = StreamScheduler (
127+ broker = taskiq_broker ,
128+ sources = [LabelScheduleSource (taskiq_broker )],
129+ ),
130+ modules = [],
131+ ),
132+ ),
133+ )
134+
135+ await asyncio .wait_for (event .wait (), 2.0 )
136+ event .clear ()
137+ frozen_datetime .tick (timedelta (minutes = 2 ))
138+ await asyncio .wait_for (event .wait (), 2.0 )
139+
140+ task .cancel ()
141+
142+ assert received_message == [messages .message , messages .message ], (
143+ received_message
144+ )
0 commit comments