diff --git a/rdev_requirements.txt b/rdev_requirements.txt index 47fa22662..235683508 100644 --- a/rdev_requirements.txt +++ b/rdev_requirements.txt @@ -4,7 +4,7 @@ packaging pydantic<2,!=1.10.20 pytest requests -setuptools +setuptools < 80.0 setuptools_scm >= 6.2, < 8 tomlkit tomli diff --git a/subprojects/robotpy-wpilib/tests/conftest.py b/subprojects/robotpy-wpilib/tests/conftest.py index 88728aa91..f7d7f9655 100644 --- a/subprojects/robotpy-wpilib/tests/conftest.py +++ b/subprojects/robotpy-wpilib/tests/conftest.py @@ -2,9 +2,13 @@ import pytest import ntcore -import wpilib from wpilib.simulation._simulation import _resetWpilibSimulationData +try: + import commands2 +except ImportError: + commands2 = None + @pytest.fixture def cfg_logging(caplog): diff --git a/subprojects/robotpy-wpilib/tests/requirements.txt b/subprojects/robotpy-wpilib/tests/requirements.txt index e079f8a60..30e882850 100644 --- a/subprojects/robotpy-wpilib/tests/requirements.txt +++ b/subprojects/robotpy-wpilib/tests/requirements.txt @@ -1 +1,2 @@ pytest +pytest-reraise diff --git a/subprojects/robotpy-wpilib/tests/test_poc_timedrobot.py b/subprojects/robotpy-wpilib/tests/test_poc_timedrobot.py new file mode 100644 index 000000000..86b5a29a6 --- /dev/null +++ b/subprojects/robotpy-wpilib/tests/test_poc_timedrobot.py @@ -0,0 +1,868 @@ +""" +Proof of concept to test TimedRobotPy using PyTest + +This POC was made by deconstructing pytest_plugin.py so that it is no longer a plugin but a class that provides +fixtures. + +To run / debug this: + +pytest subprojects/robotpy-wpilib/tests/test_poc_timedrobot.py --no-header -vvv -s + +""" + +from enum import Enum + +import pytest +import ntcore +import wpilib +from wpilib import RobotController +from wpilib.simulation._simulation import _resetWpilibSimulationData +from wpilib.simulation import pauseTiming, restartTiming + +import gc + +import weakref + +import hal +import hal.simulation +import wpilib.shuffleboard +import wpilib.simulation + +from wpilib.timedrobotpy import TimedRobotPy + +import contextlib +import typing +import threading + +from wpilib.simulation import DriverStationSim, stepTiming, stepTimingAsync + + +def nottest(obj): + obj.__test__ = False + return obj + + +@nottest +class TestController: + """ + Use this object to control the robot's state during tests + """ + + def __init__(self, reraise, robot: wpilib.RobotBase, expectFinished: bool): + self._reraise = reraise + + self._thread: typing.Optional[threading.Thread] = None + self._robot = robot + self._expectFinished = expectFinished + + self._cond = threading.Condition() + self._robot_started = False + self._robot_init_started = False + self._robot_finished = False + + def _on_robot__init_started(self): + with self._cond: + self._robot_init_started = True + self._cond.notify_all() + + def _robot_thread(self, robot): + with self._cond: + self._robot_started = True + self._cond.notify_all() + + with self._reraise(catch=True): + assert robot is not None # shouldn't happen... + + robot._TestRobot__robotInitStarted = self._on_robot__init_started + + try: + robot.startCompetition() + assert self._expectFinished == self._robot_finished + finally: + del robot + + @contextlib.contextmanager + def run_robot(self): + """ + Use this in a "with" statement to start your robot code at the + beginning of the with block, and end your robot code at the end + of the with block. + + Your `robotInit` function will not be called until this function + is called. + """ + + # remove robot reference so it gets cleaned up when gc.collect() is called + robot = self._robot + self._robot = None + + self._thread = th = threading.Thread( + target=self._robot_thread, args=(robot,), daemon=True + ) + th.start() + + with self._cond: + # make sure the thread didn't die + assert self._cond.wait_for(lambda: self._robot_started, timeout=1) + + # If your robotInit is taking more than 2 seconds in simulation, you're + # probably doing something wrong... but if not, please report a bug! + assert self._cond.wait_for(lambda: self._robot_init_started, timeout=2) + + try: + # in this block you should tell the sim to do sim things + yield + finally: + self._robot_finished = True + robot.endCompetition() + + if isinstance(self._reraise.exception, RuntimeError): + if str(self._reraise.exception).startswith( + "HAL: A handle parameter was passed incorrectly" + ): + msg = ( + "Do not reuse HAL objects in tests! This error may occur if you" + " stored a motor/sensor as a global or as a class variable" + " outside of a method." + ) + if hasattr(Exception, "add_note"): + self._reraise.exception.add_note(f"*** {msg}") + else: + e = self._reraise.exception + self._reraise.reset() + raise RuntimeError(msg) from e + + # Increment time by 1 second to ensure that any notifiers fire + stepTimingAsync(1.0) + + # the robot thread should exit quickly + th.join(timeout=1) + if th.is_alive(): + pytest.fail("robot did not exit within 2 seconds") + + self._thread = None + + @property + def robot_is_alive(self) -> bool: + """ + True if the robot code is alive + """ + th = self._thread + if not th: + return False + + return th.is_alive() + + def step_timing( + self, + *, + seconds: float, + autonomous: bool = False, + test: bool = False, + enabled: bool = False, + assert_alive: bool = True, + ) -> float: + """ + This utility will increment simulated time, while pretending that + there's a driver station connected and delivering new packets + every 0.2 seconds. + + :param seconds: Number of seconds to run (will step in increments of 0.2) + :param autonomous: Tell the robot that it is in autonomous mode + :param enabled: Tell the robot that it is enabled + + :returns: Number of seconds time was incremented + """ + + if self._expectFinished: + assert self.robot_is_alive, "did you call control.run_robot()?" + + assert seconds > 0 + + DriverStationSim.setDsAttached(True) + DriverStationSim.setAutonomous(autonomous) + DriverStationSim.setTest(test) + DriverStationSim.setEnabled(enabled) + + tm = 0.0 + + while tm < seconds: + DriverStationSim.notifyNewData() + stepTiming(0.001) + if assert_alive and self._expectFinished: + assert self.robot_is_alive + tm += 0.001 + + return tm + + +@pytest.fixture(scope="function") +def decorated_robot_class(myrobot_class) -> tuple: + # attach physics + + robotClass = myrobot_class + + # Tests need to know when robotInit is called, so override the robot + # to do that + class TestRobot(robotClass): + def robotInit(self): + self.__robotInitStarted() + super().robotInit() + + TestRobot.__name__ = robotClass.__name__ + TestRobot.__module__ = robotClass.__module__ + TestRobot.__qualname__ = robotClass.__qualname__ + + return TestRobot + + +@pytest.fixture(scope="function", autouse=False) +def robot_with_sim_setup_teardown(decorated_robot_class): + """ + Your robot instance + + .. note:: RobotPy/WPILib testing infrastructure is really sensitive + to ensuring that things get cleaned up properly. Make sure + that you don't store references to your robot or other + WPILib objects in a global or static context. + """ + + # + # This function needs to do the same things that RobotBase.main does + # plus some extra things needed for testing + # + # Previously this was separate from robot fixture, but we need to + # ensure that the robot cleanup happens deterministically relative to + # when handle cleanup/etc happens, otherwise unnecessary HAL errors will + # bubble up to the user + # + + nt_inst = ntcore.NetworkTableInstance.getDefault() + nt_inst.startLocal() + + pauseTiming() + restartTiming() + + wpilib.DriverStation.silenceJoystickConnectionWarning(True) + DriverStationSim.setAutonomous(False) + DriverStationSim.setEnabled(False) + DriverStationSim.notifyNewData() + + robot = decorated_robot_class() + + # Tests only get a proxy to ensure cleanup is more reliable + yield weakref.proxy(robot) + + # HACK: avoid motor safety deadlock + wpilib.simulation._simulation._resetMotorSafety() + + del robot + + # Double-check all objects are destroyed so that HAL handles are released + gc.collect() + + # shutdown networktables before other kinds of global cleanup + # -> some reset functions will re-register listeners, so it's important + # to do this before so that the listeners are active on the current + # NetworkTables instance + nt_inst.stopLocal() + nt_inst._reset() + + # Cleanup WPILib globals + # -> preferences, SmartDashboard, Shuffleboard, LiveWindow, MotorSafety + wpilib.simulation._simulation._resetWpilibSimulationData() + wpilib._wpilib._clearSmartDashboardData() + wpilib.shuffleboard._shuffleboard._clearShuffleboardData() + + # Cancel all periodic callbacks + hal.simulation.cancelAllSimPeriodicCallbacks() + + # Reset the HAL handles + hal.simulation.resetGlobalHandles() + + # Reset the HAL data + hal.simulation.resetAllSimData() + + # Don't call HAL shutdown! This is only used to cleanup HAL extensions, + # and functions will only be called the first time (unless re-registered) + # hal.shutdown() + + +@pytest.fixture(scope="function") +def getTestController( + reraise, robot_with_sim_setup_teardown: wpilib.RobotBase, expectFinished +) -> TestController: + """ + A pytest fixture that provides control over your robot_with_sim_setup_teardown + """ + return TestController(reraise, robot_with_sim_setup_teardown, expectFinished) + + +class TimedRobotPyExpectsException(TimedRobotPy): + + def __init__(self): + super().__init__() + self._callOrder = ":" + + def startCompetition(self) -> None: + hasAssertionError = False + try: + super().startCompetition() + except AssertionError: + hasAssertionError = True + print(f"TimedRobotPyExpectsException hasAssertionError={hasAssertionError}") + assert hasAssertionError + + +class TimedRobotPyDoNotExpectException(TimedRobotPy): + + def __init__(self): + super().__init__() + self._callOrder = ":" + + def startCompetition(self) -> None: + hasAssertionError = False + try: + super().startCompetition() + except AssertionError: + hasAssertionError = True + print(f"TimedRobotPyExpectsException hasAssertionError={hasAssertionError}") + assert not hasAssertionError + + +def printEntryAndExit(func): + def wrapper(*args, **kwargs): + # name = inspect.currentframe().f_code.co_name + name = func.__name__ + args[0]._callOrder += name + "+:" + print(f"{RobotController.getFPGATime()/1000_000.0:.3f}s:Enter:{name}") + result = func(*args, **kwargs) + args[0]._callOrder += name + "-:" + print(f"{RobotController.getFPGATime()/1000_000.0:.3f}s:Exit_:{name}") + return result + + return wrapper + + +class MyRobotRobotDefaultPass: + + @printEntryAndExit + def robotInit(self): + pass + + @printEntryAndExit + def robotPeriodic(self): + pass + + @printEntryAndExit + def autonomousInit(self): + pass + + @printEntryAndExit + def autonomousPeriodic(self): + pass + + @printEntryAndExit + def autonomousExit(self): + pass + + @printEntryAndExit + def teleopInit(self): + pass + + @printEntryAndExit + def teleopPeriodic(self): + pass + + @printEntryAndExit + def teleopExit(self): + pass + + @printEntryAndExit + def testInit(self): + pass + + @printEntryAndExit + def testPeriodic(self): + pass + + @printEntryAndExit + def testExit(self): + pass + + @printEntryAndExit + def disabledInit(self): + pass + + @printEntryAndExit + def disabledPeriodic(self): + pass + + @printEntryAndExit + def disabledExit(self): + pass + + @printEntryAndExit + def _simulationInit(self): + pass + + @printEntryAndExit + def _simulationPeriodic(self): + pass + + +class MyRobotRobotInitFails(MyRobotRobotDefaultPass): + @printEntryAndExit + def robotInit(self): + assert False + + +class MyRobotRobotPeriodicFails(MyRobotRobotDefaultPass): + @printEntryAndExit + def robotPeriodic(self): + assert False + + +class MyRobotAutonomousInitFails(MyRobotRobotDefaultPass): + @printEntryAndExit + def autonomousInit(self): + assert False + + +class MyRobotAutonomousPeriodicFails(MyRobotRobotDefaultPass): + @printEntryAndExit + def autonomousPeriodic(self): + assert False + + +class MyRobotAutonomousExitFails(MyRobotRobotDefaultPass): + @printEntryAndExit + def autonomousExit(self): + assert False + + +class MyRobotTeleopInitFails(MyRobotRobotDefaultPass): + @printEntryAndExit + def teleopInit(self): + assert False + + +class MyRobotTeleopPeriodicFails(MyRobotRobotDefaultPass): + @printEntryAndExit + def teleopPeriodic(self): + assert False + + +class MyRobotTeleopExitFails(MyRobotRobotDefaultPass): + @printEntryAndExit + def teleopExit(self): + assert False + + +class MyRobotDisabledInitFails(MyRobotRobotDefaultPass): + @printEntryAndExit + def disabledInit(self): + assert False + + +class MyRobotDisabledPeriodicFails(MyRobotRobotDefaultPass): + @printEntryAndExit + def disabledPeriodic(self): + assert False + + +class MyRobotDisabledExitFails(MyRobotRobotDefaultPass): + @printEntryAndExit + def disabledExit(self): + assert False + + +class MyRobotTestInitFails(MyRobotRobotDefaultPass): + @printEntryAndExit + def testInit(self): + assert False + + +class MyRobotTestPeriodicFails(MyRobotRobotDefaultPass): + @printEntryAndExit + def testPeriodic(self): + assert False + + +class MyRobotTestExitFails(MyRobotRobotDefaultPass): + @printEntryAndExit + def testExit(self): + assert False + + +class ExpectFinished(Enum): + kNotFinished = 0 + kFinished = 1 + + +class RobotMode(Enum): + kNone = 0 + kDisabled = 1 + kAutonomous = 2 + kTeleop = 3 + kTest = 4 + + def __init__(self, code): + self._code = code + + @property + def autonomous(self): + return self is RobotMode.kAutonomous + + @property + def test(self): + return self is RobotMode.kTest + + @property + def enabled(self): + return (self is not RobotMode.kDisabled) and (self is not RobotMode.kNone) + + +@pytest.fixture(scope="function", autouse=False) +def myrobot_class( + myRobotAddMethods, + timedRobotExpectation, + _expectFinished, + _robotMode, + _callSequenceStr, +) -> type[TimedRobotPy]: + class MyRobot(myRobotAddMethods, timedRobotExpectation): + + @printEntryAndExit + def startCompetition(self): + super().startCompetition() + + @printEntryAndExit + def endCompetition(self): + super().endCompetition() + + return MyRobot + + +@pytest.fixture(scope="function", autouse=False) +def expectFinished( + myRobotAddMethods, + timedRobotExpectation, + _expectFinished, + _robotMode, + _callSequenceStr, +) -> bool: + return _expectFinished is ExpectFinished.kFinished + + +@pytest.fixture(scope="function", autouse=False) +def robotModeFixture( + myRobotAddMethods, + timedRobotExpectation, + _expectFinished, + _robotMode, + _callSequenceStr, +) -> RobotMode: + return _robotMode + + +@pytest.fixture(scope="function", autouse=False) +def callSequenceStr( + myRobotAddMethods, + timedRobotExpectation, + _expectFinished, + _robotMode, + _callSequenceStr, +) -> str: + return _callSequenceStr + + +@pytest.mark.parametrize( + "myRobotAddMethods, timedRobotExpectation, _expectFinished, _robotMode, _callSequenceStr", + [ + ( + MyRobotRobotDefaultPass, + TimedRobotPyDoNotExpectException, + ExpectFinished.kFinished, + RobotMode.kAutonomous, + ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":autonomousInit+:autonomousInit-:autonomousPeriodic+:autonomousPeriodic-" + ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" + ":autonomousExit+:autonomousExit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:", + ), + ( + MyRobotRobotDefaultPass, + TimedRobotPyDoNotExpectException, + ExpectFinished.kFinished, + RobotMode.kTeleop, + ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":teleopInit+:teleopInit-:teleopPeriodic+:teleopPeriodic-" + ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" + ":teleopExit+:teleopExit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:", + ), + ( + MyRobotRobotDefaultPass, + TimedRobotPyDoNotExpectException, + ExpectFinished.kFinished, + RobotMode.kTest, + ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":testInit+:testInit-:testPeriodic+:testPeriodic-" + ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" + ":testExit+:testExit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:", + ), + ( + MyRobotRobotInitFails, + TimedRobotPyExpectsException, + ExpectFinished.kNotFinished, + RobotMode.kAutonomous, + ":startCompetition+:robotInit+:startCompetition-:", + ), + ( + MyRobotAutonomousInitFails, + TimedRobotPyExpectsException, + ExpectFinished.kNotFinished, + RobotMode.kAutonomous, + ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":autonomousInit+:startCompetition-:", + ), + ( + MyRobotAutonomousPeriodicFails, + TimedRobotPyExpectsException, + ExpectFinished.kNotFinished, + RobotMode.kAutonomous, + ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":autonomousInit+:autonomousInit-:autonomousPeriodic+:startCompetition-:", + ), + ( + MyRobotAutonomousExitFails, + TimedRobotPyExpectsException, + ExpectFinished.kNotFinished, + RobotMode.kAutonomous, + ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":autonomousInit+:autonomousInit-:autonomousPeriodic+:autonomousPeriodic-" + ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" + ":autonomousExit+:startCompetition-:", + ), + ( + MyRobotTeleopInitFails, + TimedRobotPyExpectsException, + ExpectFinished.kNotFinished, + RobotMode.kTeleop, + ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":teleopInit+:startCompetition-:", + ), + ( + MyRobotTeleopPeriodicFails, + TimedRobotPyExpectsException, + ExpectFinished.kNotFinished, + RobotMode.kTeleop, + ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":teleopInit+:teleopInit-:teleopPeriodic+:startCompetition-:", + ), + ( + MyRobotTeleopExitFails, + TimedRobotPyExpectsException, + ExpectFinished.kNotFinished, + RobotMode.kTeleop, + ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":teleopInit+:teleopInit-:teleopPeriodic+:teleopPeriodic-" + ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" + ":teleopExit+:startCompetition-:", + ), + ( + MyRobotTestInitFails, + TimedRobotPyExpectsException, + ExpectFinished.kNotFinished, + RobotMode.kTest, + ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":testInit+:startCompetition-:", + ), + ( + MyRobotTestPeriodicFails, + TimedRobotPyExpectsException, + ExpectFinished.kNotFinished, + RobotMode.kTest, + ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":testInit+:testInit-:testPeriodic+:startCompetition-:", + ), + ( + MyRobotTestExitFails, + TimedRobotPyExpectsException, + ExpectFinished.kNotFinished, + RobotMode.kTest, + ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":testInit+:testInit-:testPeriodic+:testPeriodic-" + ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" + ":testExit+:startCompetition-:", + ), + ], +) +class TestCanThrowExceptions: + + def test_robot_mode_with_exceptions( + self, + getTestController, + robot_with_sim_setup_teardown, + robotModeFixture, + callSequenceStr, + ): + with getTestController.run_robot(): + rmf = robotModeFixture + periodS = robot_with_sim_setup_teardown.getPeriod() + # Run disabled for a short period + print( + f"periodS={periodS} or {periodS*1.5} a={rmf.autonomous} t={rmf.test} e={rmf.enabled}" + ) + getTestController.step_timing( + seconds=periodS * 1.5, + autonomous=rmf.autonomous, + test=rmf.test, + enabled=False, + ) + + # Run in desired mode for 1 period + getTestController.step_timing( + seconds=periodS, + autonomous=rmf.autonomous, + test=rmf.test, + enabled=rmf.enabled, + ) + + # Disabled for 1 period + getTestController.step_timing( + seconds=periodS, autonomous=rmf.autonomous, test=rmf.test, enabled=False + ) + print(f"result={robot_with_sim_setup_teardown._callOrder}") + assert robot_with_sim_setup_teardown._callOrder == callSequenceStr + + +@pytest.mark.parametrize( + "myRobotAddMethods, timedRobotExpectation, _expectFinished, _robotMode, _callSequenceStr", + [ + ( + MyRobotRobotDefaultPass, + TimedRobotPyDoNotExpectException, + ExpectFinished.kFinished, + None, + None, + ), + ], +) +class TestSequenceThroughModes: + + def test_robot_mode_sequence( + self, + getTestController, + robot_with_sim_setup_teardown, + ): + callSequenceStr = ( + ":startCompetition+:robotInit+:robotInit-:_simulationInit+:_simulationInit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":autonomousInit+:autonomousInit-" + ":autonomousPeriodic+:autonomousPeriodic-" + ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" + ":autonomousPeriodic+:autonomousPeriodic-" + ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" + ":autonomousExit+:autonomousExit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":teleopInit+:teleopInit-" + ":teleopPeriodic+:teleopPeriodic-" + ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" + ":teleopPeriodic+:teleopPeriodic-" + ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" + ":teleopExit+:teleopExit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:disabledExit+:disabledExit-" + ":testInit+:testInit-" + ":testPeriodic+:testPeriodic-" + ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" + ":testPeriodic+:testPeriodic-" + ":robotPeriodic+:robotPeriodic-:_simulationPeriodic+:_simulationPeriodic-" + ":testExit+:testExit-" + ":disabledInit+:disabledInit-:disabledPeriodic+:disabledPeriodic-:robotPeriodic+:robotPeriodic-" + ":_simulationPeriodic+:_simulationPeriodic-:" + ) + + with getTestController.run_robot(): + periodS = robot_with_sim_setup_teardown.getPeriod() + # Run disabled for a short period + getTestController.step_timing( + seconds=periodS * 1.5, + autonomous=True, + test=False, + enabled=False, + ) + + # Run in autonomous mode for 2 periods + getTestController.step_timing( + seconds=periodS * 2, + autonomous=True, + test=False, + enabled=True, + ) + + # Disabled for 1 period + getTestController.step_timing( + seconds=periodS, autonomous=False, test=False, enabled=False + ) + + # Run in teleop mode for 2 periods + getTestController.step_timing( + seconds=periodS * 2, + autonomous=False, + test=False, + enabled=True, + ) + + # Disabled for 1 period + getTestController.step_timing( + seconds=periodS, autonomous=False, test=False, enabled=False + ) + + # Run in test mode for 2 periods + getTestController.step_timing( + seconds=periodS * 2, + autonomous=False, + test=True, + enabled=True, + ) + + # Disabled for 1 period + getTestController.step_timing( + seconds=periodS, autonomous=False, test=False, enabled=False + ) + + print(f"result={robot_with_sim_setup_teardown._callOrder}") + assert robot_with_sim_setup_teardown._callOrder == callSequenceStr diff --git a/subprojects/robotpy-wpilib/tests/test_timedrobot.py b/subprojects/robotpy-wpilib/tests/test_timedrobot.py new file mode 100644 index 000000000..168d5b878 --- /dev/null +++ b/subprojects/robotpy-wpilib/tests/test_timedrobot.py @@ -0,0 +1,41 @@ +import pytest + +from wpilib.timedrobotpy import _Callback + + +def test_calcFutureExpirationUs() -> None: + cb = _Callback(func=None, periodUs=20_000, expirationUs=100) + assert cb.calcFutureExpirationUs(100) == 20_100 + assert cb.calcFutureExpirationUs(101) == 20_100 + assert cb.calcFutureExpirationUs(20_099) == 20_100 + assert cb.calcFutureExpirationUs(20_100) == 40_100 + assert cb.calcFutureExpirationUs(20_101) == 40_100 + + cb = _Callback(func=None, periodUs=40_000, expirationUs=500) + assert cb.calcFutureExpirationUs(500) == 40_500 + assert cb.calcFutureExpirationUs(501) == 40_500 + assert cb.calcFutureExpirationUs(40_499) == 40_500 + assert cb.calcFutureExpirationUs(40_500) == 80_500 + assert cb.calcFutureExpirationUs(40_501) == 80_500 + + cb = _Callback(func=None, periodUs=1_000, expirationUs=0) + assert ( + cb.calcFutureExpirationUs(1_000_000_000_000_000_000) + == 1_000_000_000_000_001_000 + ) + assert ( + cb.calcFutureExpirationUs(1_000_000_000_000_000_001) + == 1_000_000_000_000_001_000 + ) + assert ( + cb.calcFutureExpirationUs(1_000_000_000_000_000_999) + == 1_000_000_000_000_001_000 + ) + assert ( + cb.calcFutureExpirationUs(1_000_000_000_000_001_000) + == 1_000_000_000_000_002_000 + ) + assert ( + cb.calcFutureExpirationUs(1_000_000_000_000_001_001) + == 1_000_000_000_000_002_000 + ) diff --git a/subprojects/robotpy-wpilib/wpilib/__init__.py b/subprojects/robotpy-wpilib/wpilib/__init__.py index 9ee20f8d9..ca43bbf55 100644 --- a/subprojects/robotpy-wpilib/wpilib/__init__.py +++ b/subprojects/robotpy-wpilib/wpilib/__init__.py @@ -233,6 +233,8 @@ from .cameraserver import CameraServer from .deployinfo import getDeployData +from .iterativerobotpy import IterativeRobotPy +from .timedrobotpy import TimedRobotPy try: from .version import version as __version__ @@ -241,4 +243,4 @@ from ._impl.main import run -__all__ += ["CameraServer", "run"] +__all__ += ["CameraServer", "run", "IterativeRobotPy", "TimedRobotPy"] diff --git a/subprojects/robotpy-wpilib/wpilib/iterativerobotpy.py b/subprojects/robotpy-wpilib/wpilib/iterativerobotpy.py new file mode 100644 index 000000000..3a572c92c --- /dev/null +++ b/subprojects/robotpy-wpilib/wpilib/iterativerobotpy.py @@ -0,0 +1,476 @@ +from enum import Enum + +from hal import ( + report, + tResourceType, + tInstances, + observeUserProgramDisabled, + observeUserProgramTest, + observeUserProgramAutonomous, + observeUserProgramTeleop, + simPeriodicBefore, + simPeriodicAfter, +) +from ntcore import NetworkTableInstance +from wpilib import ( + DriverStation, + DSControlWord, + Watchdog, + LiveWindow, + RobotBase, + SmartDashboard, + reportWarning, +) +from wpilib.shuffleboard import Shuffleboard +import wpimath.units + +_kResourceType_SmartDashboard = tResourceType.kResourceType_SmartDashboard +_kSmartDashboard_LiveWindow = tInstances.kSmartDashboard_LiveWindow + + +class IterativeRobotMode(Enum): + kNone = 0 + kDisabled = 1 + kAutonomous = 2 + kTeleop = 3 + kTest = 4 + + +# todo should this be IterativeRobotPy or IterativeRobotBase (replacing) or IterativeRobotBasePy +class IterativeRobotPy(RobotBase): + """ + IterativeRobotPy implements a specific type of robot program framework, + extending the RobotBase class. It provides similar functionality as + IterativeRobotBase does in a python wrapper of C++, but in pure python. + + The IterativeRobotPy class does not implement StartCompetition(), so it + should not be used by teams directly. + + This class provides the following functions which are called by the main + loop, StartCompetition(), at the appropriate times: + + robotInit() -- provide for initialization at robot power-on + + DriverStationConnected() -- provide for initialization the first time the DS + is connected + + Init() functions -- each of the following functions is called once when the + appropriate mode is entered: + + - disabledInit() -- called each and every time disabled is entered from another mode + - autonomousInit() -- called each and every time autonomous is entered from another mode + - teleopInit() -- called each and every time teleop is entered from another mode + - testInit() -- called each and every time test is entered from another mode + + Periodic() functions -- each of these functions is called on an interval: + + - robotPeriodic() + - disabledPeriodic() + - autonomousPeriodic() + - teleopPeriodic() + - testPeriodic() + + Exit() functions -- each of the following functions is called once when the + appropriate mode is exited: + + - disabledExit() -- called each and every time disabled is exited + - autonomousExit() -- called each and every time autonomous is exited + - teleopExit() -- called each and every time teleop is exited + - testExit() -- called each and every time test is exited + """ + + def __init__(self, period: wpimath.units.seconds) -> None: + """ + Constructor for IterativeRobotPy. + + :param period: period of the main robot periodic loop in seconds. + """ + super().__init__() + self._periodS = period + self.watchdog = Watchdog(self._periodS, self.printLoopOverrunMessage) + self._networkTableInstanceDefault = NetworkTableInstance.getDefault() + self._mode: IterativeRobotMode = IterativeRobotMode.kNone + self._lastMode: IterativeRobotMode = IterativeRobotMode.kNone + self._ntFlushEnabled: bool = True + self._lwEnabledInTest: bool = False + self._calledDsConnected: bool = False + self._reportedLw: bool = False + self._robotPeriodicHasRun: bool = False + self._simulationPeriodicHasRun: bool = False + self._disabledPeriodicHasRun: bool = False + self._autonomousPeriodicHasRun: bool = False + self._teleopPeriodicHasRun: bool = False + self._testPeriodicHasRun: bool = False + + def robotInit(self) -> None: + """ + Robot-wide initialization code should go here. + + Users should override this method for default Robot-wide initialization + which will be called when the robot is first powered on. It will be called + exactly one time. + + Note: This method is functionally identical to the class constructor so + that should be used instead. + """ + pass + + def driverStationConnected(self) -> None: + """ + Code that needs to know the DS state should go here. + + Users should override this method for initialization that needs to occur + after the DS is connected, such as needing the alliance information. + """ + pass + + def _simulationInit(self) -> None: + """ + Robot-wide simulation initialization code should go here. + + Users should override this method for default Robot-wide simulation + related initialization which will be called when the robot is first + started. It will be called exactly one time after robotInit is called + only when the robot is in simulation. + """ + pass + + def disabledInit(self) -> None: + """ + Initialization code for disabled mode should go here. + + Users should override this method for initialization code which will be + called each time + the robot enters disabled mode. + """ + pass + + def autonomousInit(self) -> None: + """ + Initialization code for autonomous mode should go here. + + Users should override this method for initialization code which will be + called each time the robot enters autonomous mode. + """ + pass + + def teleopInit(self) -> None: + """ + Initialization code for teleop mode should go here. + + Users should override this method for initialization code which will be + called each time the robot enters teleop mode. + """ + pass + + def testInit(self) -> None: + """ + Initialization code for test mode should go here. + + Users should override this method for initialization code which will be + called each time the robot enters test mode. + """ + pass + + def robotPeriodic(self) -> None: + """ + Periodic code for all modes should go here. + + This function is called each time a new packet is received from the driver + station. + """ + if not self._robotPeriodicHasRun: + print(f"Default robotPeriodic() method...Override me!") + self._robotPeriodicHasRun = True + + def _simulationPeriodic(self) -> None: + """ + Periodic simulation code should go here. + + This function is called in a simulated robot after user code executes. + """ + if not self._simulationPeriodicHasRun: + print(f"Default _simulationPeriodic() method...Override me!") + self._simulationPeriodicHasRun = True + + def disabledPeriodic(self) -> None: + """ + Periodic code for disabled mode should go here. + + Users should override this method for code which will be called each time a + new packet is received from the driver station and the robot is in disabled + mode. + """ + if not self._disabledPeriodicHasRun: + print(f"Default disabledPeriodic() method...Override me!") + self._disabledPeriodicHasRun = True + + def autonomousPeriodic(self) -> None: + """ + Periodic code for autonomous mode should go here. + + Users should override this method for code which will be called each time a + new packet is received from the driver station and the robot is in + autonomous mode. + """ + if not self._autonomousPeriodicHasRun: + print(f"Default autonomousPeriodic() method...Override me!") + self._autonomousPeriodicHasRun = True + + def teleopPeriodic(self) -> None: + """ + Periodic code for teleop mode should go here. + + Users should override this method for code which will be called each time a + new packet is received from the driver station and the robot is in teleop + mode. + """ + if not self._teleopPeriodicHasRun: + print(f"Default teleopPeriodic() method...Override me!") + self._teleopPeriodicHasRun = True + + def testPeriodic(self) -> None: + """ + Periodic code for test mode should go here. + + Users should override this method for code which will be called each time a + new packet is received from the driver station and the robot is in test + mode. + """ + if not self._testPeriodicHasRun: + print(f"Default testPeriodic() method...Override me!") + self._testPeriodicHasRun = True + + def disabledExit(self) -> None: + """ + Exit code for disabled mode should go here. + + Users should override this method for code which will be called each time + the robot exits disabled mode. + """ + pass + + def autonomousExit(self) -> None: + """ + Exit code for autonomous mode should go here. + + Users should override this method for code which will be called each time + the robot exits autonomous mode. + """ + pass + + def teleopExit(self) -> None: + """ + Exit code for teleop mode should go here. + + Users should override this method for code which will be called each time + the robot exits teleop mode. + """ + pass + + def testExit(self) -> None: + """ + Exit code for test mode should go here. + + Users should override this method for code which will be called each time + the robot exits test mode. + """ + pass + + def setNetworkTablesFlushEnabled(self, enabled: bool) -> None: + """ + Enables or disables flushing NetworkTables every loop iteration. + By default, this is enabled. + + :deprecated: Deprecated without replacement. + + :param enabled: True to enable, false to disable. + """ + + self._ntFlushEnabled = enabled + + def enableLiveWindowInTest(self, testLW: bool) -> None: + """ + Sets whether LiveWindow operation is enabled during test mode. + + :param testLW: True to enable, false to disable. Defaults to false. + @throws if called in test mode. + """ + if self.isTestEnabled(): + raise RuntimeError("Can't configure test mode while in test mode!") + if not self._reportedLw and testLW: + report(_kResourceType_SmartDashboard, _kSmartDashboard_LiveWindow) + self._reportedLw = True + self._lwEnabledInTest = testLW + + def isLiveWindowEnabledInTest(self) -> bool: + """ + Whether LiveWindow operation is enabled during test mode. + """ + return self._lwEnabledInTest + + def getPeriod(self) -> wpimath.units.seconds: + """ + Gets time period between calls to Periodic() functions. + """ + return self._periodS + + def _loopFunc(self) -> None: + """ + Loop function. + """ + DriverStation.refreshData() + self.watchdog.reset() + + isEnabled, isAutonomous, isTest = self.getControlState() + if not isEnabled: + self._mode = IterativeRobotMode.kDisabled + elif isAutonomous: + self._mode = IterativeRobotMode.kAutonomous + elif isTest: + self._mode = IterativeRobotMode.kTest + else: + self._mode = IterativeRobotMode.kTeleop + + if not self._calledDsConnected and DSControlWord().isDSAttached(): + self._calledDsConnected = True + self.driverStationConnected() + + # If self._mode changed, call self._mode exit and entry functions + if self._lastMode is not self._mode: + + if self._lastMode is IterativeRobotMode.kDisabled: + self.disabledExit() + elif self._lastMode is IterativeRobotMode.kAutonomous: + self.autonomousExit() + elif self._lastMode is IterativeRobotMode.kTeleop: + self.teleopExit() + elif self._lastMode is IterativeRobotMode.kTest: + if self._lwEnabledInTest: + LiveWindow.setEnabled(False) + Shuffleboard.disableActuatorWidgets() + self.testExit() + """ + todo switch to match statements when we don't build with python 3.9 + match self._lastMode: + case IterativeRobotMode.kDisabled: + self.disabledExit() + case IterativeRobotMode.kAutonomous: + self.autonomousExit() + case IterativeRobotMode.kTeleop: + self.teleopExit() + case IterativeRobotMode.kTest: + if self._lwEnabledInTest: + LiveWindow.setEnabled(False) + Shuffleboard.disableActuatorWidgets() + self.testExit() + """ + + if self._mode is IterativeRobotMode.kDisabled: + self.disabledInit() + self.watchdog.addEpoch("disabledInit()") + elif self._mode is IterativeRobotMode.kAutonomous: + self.autonomousInit() + self.watchdog.addEpoch("autonomousInit()") + elif self._mode is IterativeRobotMode.kTeleop: + self.teleopInit() + self.watchdog.addEpoch("teleopInit()") + elif self._mode is IterativeRobotMode.kTest: + if self._lwEnabledInTest: + LiveWindow.setEnabled(True) + Shuffleboard.enableActuatorWidgets() + self.testInit() + self.watchdog.addEpoch("testInit()") + """ + match self._mode: + case IterativeRobotMode.kDisabled: + self.disabledInit() + self._watchdog.addEpoch("disabledInit()") + case IterativeRobotMode.kAutonomous: + self.autonomousInit() + self._watchdog.addEpoch("autonomousInit()") + case IterativeRobotMode.kTeleop: + self.teleopInit() + self._watchdog.addEpoch("teleopInit()") + case IterativeRobotMode.kTest: + if self._lwEnabledInTest: + LiveWindow.setEnabled(True) + Shuffleboard.enableActuatorWidgets() + self.testInit() + self._watchdog.addEpoch("testInit()") + """ + self._lastMode = self._mode + + # Call the appropriate function depending upon the current robot mode + if self._mode is IterativeRobotMode.kDisabled: + observeUserProgramDisabled() + self.disabledPeriodic() + self.watchdog.addEpoch("disabledPeriodic()") + elif self._mode is IterativeRobotMode.kAutonomous: + observeUserProgramAutonomous() + self.autonomousPeriodic() + self.watchdog.addEpoch("autonomousPeriodic()") + elif self._mode is IterativeRobotMode.kTeleop: + observeUserProgramTeleop() + self.teleopPeriodic() + self.watchdog.addEpoch("teleopPeriodic()") + elif self._mode is IterativeRobotMode.kTest: + observeUserProgramTest() + self.testPeriodic() + self.watchdog.addEpoch("testPeriodic()") + """ + match self._mode: + case IterativeRobotMode.kDisabled: + observeUserProgramDisabled() + self.disabledPeriodic() + self.watchdog.addEpoch("disabledPeriodic()") + case IterativeRobotMode.kAutonomous: + observeUserProgramAutonomous() + self.autonomousPeriodic() + self.watchdog.addEpoch("autonomousPeriodic()") + case IterativeRobotMode.kTeleop: + observeUserProgramTeleop() + self.teleopPeriodic() + self.watchdog.addEpoch("teleopPeriodic()") + case IterativeRobotMode.kTest: + observeUserProgramTest() + self.testPeriodic() + self.watchdog.addEpoch("testPeriodic()") + """ + + self.robotPeriodic() + self.watchdog.addEpoch("robotPeriodic()") + + SmartDashboard.updateValues() + self.watchdog.addEpoch("SmartDashboard.updateValues()") + + LiveWindow.updateValues() + self.watchdog.addEpoch("LiveWindow.updateValues()") + + Shuffleboard.update() + self.watchdog.addEpoch("Shuffleboard.update()") + + if self.isSimulation(): + simPeriodicBefore() + self._simulationPeriodic() + simPeriodicAfter() + self.watchdog.addEpoch("_simulationPeriodic()") + + self.watchdog.disable() + + # Flush NetworkTables + if self._ntFlushEnabled: + self._networkTableInstanceDefault.flushLocal() + + # Warn on loop time overruns + if self.watchdog.isExpired(): + self.printWatchdogEpochs() + + def printLoopOverrunMessage(self) -> None: + reportWarning(f"Loop time of {self.watchdog.getTimeout()}s overrun", False) + + def printWatchdogEpochs(self) -> None: + """ + Prints list of epochs added so far and their times. + """ + self.watchdog.printEpochs() diff --git a/subprojects/robotpy-wpilib/wpilib/timedrobotpy.py b/subprojects/robotpy-wpilib/wpilib/timedrobotpy.py new file mode 100644 index 000000000..713fae6e6 --- /dev/null +++ b/subprojects/robotpy-wpilib/wpilib/timedrobotpy.py @@ -0,0 +1,272 @@ +from typing import Any, Callable, Iterable, ClassVar +from heapq import heappush, heappop +from hal import ( + report, + initializeNotifier, + setNotifierName, + observeUserProgramStarting, + updateNotifierAlarm, + waitForNotifierAlarm, + stopNotifier, + tResourceType, + tInstances, +) +from wpilib import RobotController +import wpimath.units + +from .iterativerobotpy import IterativeRobotPy + +_getFPGATime = RobotController.getFPGATime +_kResourceType_Framework = tResourceType.kResourceType_Framework +_kFramework_Timed = tInstances.kFramework_Timed + +microsecondsAsInt = int + + +class _Callback: + def __init__( + self, + func: Callable[[], None], + periodUs: microsecondsAsInt, + expirationUs: microsecondsAsInt, + ) -> None: + self.func = func + self._periodUs = periodUs + self.expirationUs = expirationUs + + @classmethod + def makeCallBack( + cls, + func: Callable[[], None], + startTimeUs: microsecondsAsInt, + periodUs: microsecondsAsInt, + offsetUs: microsecondsAsInt, + ) -> "_Callback": + + callback = _Callback(func=func, periodUs=periodUs, expirationUs=startTimeUs) + + currentTimeUs = _getFPGATime() + callback.expirationUs = offsetUs + callback.calcFutureExpirationUs( + currentTimeUs + ) + return callback + + def calcFutureExpirationUs( + self, currentTimeUs: microsecondsAsInt + ) -> microsecondsAsInt: + # increment the expiration time by the number of full periods it's behind + # plus one to avoid rapid repeat fires from a large loop overrun. + # + # This routine is called when either: + # this callback has never ran and self.expirationUs is + # TimedRobot._starttimeUs and we are calculating where the first + # expiration should be relative to TimedRobot._starttimeUs + # or: + # this call back has ran and self.expirationUs is when it was scheduled to + # expire and we are calculating when the next expirations should be relative + # to the last scheduled expiration + # + # We assume currentTime ≥ self.expirationUs rather than checking for it since the + # callback wouldn't be running otherwise. + # + # We take when we previously expired or when we started: self.expirationUs + # add + self._periodUs to get at least one period in the future + # then calculate how many whole periods we are behind: + # ((currentTimeUs - self.expirationUs) // self._periodUs) + # and multiply that by self._periodUs to calculate how much time in full + # periods we need to skip to catch up, and add that to the sum to calculate + # when we should run again. + return ( + self.expirationUs + + self._periodUs + + ((currentTimeUs - self.expirationUs) // self._periodUs) * self._periodUs + ) + + def setNextStartTimeUs(self, currentTimeUs: microsecondsAsInt) -> None: + self.expirationUs = self.calcFutureExpirationUs(currentTimeUs) + + def __lt__(self, other) -> bool: + return self.expirationUs < other.expirationUs + + def __bool__(self) -> bool: + return True + + +class _OrderedList: + def __init__(self) -> None: + self._data: list[Any] = [] + + def add(self, item: Any) -> None: + heappush(self._data, item) + + def pop(self) -> Any: + return heappop(self._data) + + def peek( + self, + ) -> Any: # todo change to Any | None when we don't build with python 3.9 + if self._data: + return self._data[0] + else: + return None + + def __len__(self) -> int: + return len(self._data) + + def __iter__(self) -> Iterable[Any]: + return iter(sorted(self._data)) + + def __contains__(self, item) -> bool: + return item in self._data + + def __str__(self) -> str: + return str(sorted(self._data)) + + +# todo what should the name of this class be? +class TimedRobotPy(IterativeRobotPy): + """ + TimedRobotPy implements the IterativeRobotBase robot program framework. + + The TimedRobotPy class is intended to be subclassed by a user creating a + robot program. + + Periodic() functions from the base class are called on an interval by a + Notifier instance. + """ + + kDefaultPeriod: ClassVar[wpimath.units.seconds] = ( + 0.020 # todo this is a change to keep consistent units in the API + ) + + def __init__(self, period: wpimath.units.seconds = kDefaultPeriod) -> None: + """ + Constructor for TimedRobotPy. + + :param period: period of the main robot periodic loop in seconds. + """ + super().__init__(period) + + # All periodic functions created by addPeriodic are relative + # to this self._startTimeUs + self._startTimeUs = _getFPGATime() + self._callbacks = _OrderedList() + self._loopStartTimeUs = 0 + self.addPeriodic(self._loopFunc, period=self._periodS) + + self._notifier, status = initializeNotifier() + print(f"{self._notifier}, {status} = initializeNotifier()") + if status != 0: + raise RuntimeError( + f"initializeNotifier() returned {self._notifier}, {status}" + ) + + status = setNotifierName(self._notifier, "TimedRobotPy") + if status != 0: + raise RuntimeError(f"setNotifierName() returned {status}") + + report(_kResourceType_Framework, _kFramework_Timed) + + def startCompetition(self) -> None: + """ + Provide an alternate "main loop" via startCompetition(). + """ + try: + self.robotInit() + + if self.isSimulation(): + self._simulationInit() + + # Tell the DS that the robot is ready to be enabled + print("********** Robot program startup complete **********", flush=True) + observeUserProgramStarting() + + # Loop forever, calling the appropriate mode-dependent function + # (really not forever, there is a check for a break) + while True: + # We don't have to check there's an element in the queue first because + # there's always at least one (the constructor adds one). It's re-enqueued + # at the end of the loop. + callback = self._callbacks.pop() + + status = updateNotifierAlarm(self._notifier, callback.expirationUs) + if status != 0: + raise RuntimeError(f"updateNotifierAlarm() returned {status}") + + currentTimeUs, status = waitForNotifierAlarm(self._notifier) + if status != 0: + raise RuntimeError( + f"waitForNotifierAlarm() returned currentTimeUs={currentTimeUs} status={status}" + ) + + if currentTimeUs == 0: + # when HAL_StopNotifier(self.notifier) is called the above waitForNotifierAlarm + # will return a currentTimeUs==0 and the API requires robots to stop any loops. + # See the API for waitForNotifierAlarm + break + + self._loopStartTimeUs = _getFPGATime() + self._runCallbackAndReschedule(callback, currentTimeUs) + + # Process all other callbacks that are ready to run + while self._callbacks.peek().expirationUs <= currentTimeUs: + callback = self._callbacks.pop() + self._runCallbackAndReschedule(callback, currentTimeUs) + finally: + self._stopNotifier() + + def _runCallbackAndReschedule( + self, callback: _Callback, currentTimeUs: microsecondsAsInt + ) -> None: + callback.func() + callback.setNextStartTimeUs(currentTimeUs) + self._callbacks.add(callback) + + def _stopNotifier(self): + stopNotifier(self._notifier) + + def endCompetition(self) -> None: + """ + Ends the main loop in startCompetition(). + """ + self._stopNotifier() + + def getLoopStartTime(self) -> microsecondsAsInt: + """ + Return the system clock time in microseconds + for the start of the current + periodic loop. This is in the same time base as Timer.getFPGATimestamp(), + but is stable through a loop. It is updated at the beginning of every + periodic callback (including the normal periodic loop). + + :returns: Robot running time in microseconds, + as of the start of the current + periodic function. + """ + + return self._loopStartTimeUs + + def addPeriodic( + self, + callback: Callable[[], None], + period: wpimath.units.seconds, + offset: wpimath.units.seconds = 0.0, + ) -> None: + """ + Add a callback to run at a specific period with a starting time offset. + + This is scheduled on TimedRobotPy's Notifier, so TimedRobotPy and the callback + run synchronously. Interactions between them are thread-safe. + + :param callback: The callback to run. + :param period: The period at which to run the callback. + :param offset: The offset from the common starting time. This is useful + for scheduling a callback in a different timeslot relative + to TimedRobotPy. + """ + + self._callbacks.add( + _Callback.makeCallBack( + callback, self._startTimeUs, int(period * 1e6), int(offset * 1e6) + ) + )