diff --git a/.bumpversion.toml b/.bumpversion.toml index 172ff0f..d7d3a3c 100644 --- a/.bumpversion.toml +++ b/.bumpversion.toml @@ -1,5 +1,5 @@ [tool.bumpversion] -current_version = "1.1.0" +current_version = "1.2.0" commit = true message = "Release v{new_version}" tag = true diff --git a/.gitignore b/.gitignore index 79db71f..d2f1286 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ __pycache__/ /.tox /docs/_build/ /dist +/.venv diff --git a/CHANGELOG.md b/CHANGELOG.md index 079388f..caad3fd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,9 @@ # Changelog +## v1.2.0 (2025-01-14) +- Add session mode to CLI. Control a session declaratively with a JSON file as input +- Updated docs for session mode usage + ## v1.1.0 (2024-08-17) - New random mode features: diff --git a/docs/cli.md b/docs/cli.md index 6483c9a..a58967d 100644 --- a/docs/cli.md +++ b/docs/cli.md @@ -159,6 +159,81 @@ can be used to customize the behavior (default values are shown): - `--spam-intensity`: Intensity for spam shocks (by default, the given `--intensity` is used) +## Session mode + +To schedule shock, spam, vibrate and beep events over a session, the session +command takes a json file as input allowing automation of multiple shockers for +completely handsfree use. + +```console +$ pishock session examples/session.json +✔ Validating events +✔ Event list is valid +▶ Session started +🟩 Count in mode is set to beep +🔔 Beeping shocker1 for 1s +🔔 Beeping shocker1 for 1s +🔔 Beeping shocker1 for 1s +🕐 Max runtime is 3600 seconds +🕐 Spam Cooldown is 120 seconds +⚡ Shocking shocker1 for 9s at 82% +⚡ Shocking shocker2 for 14s at 55% +``` + +Following similar field names to the random mode, the JSON format is as follows: + +```js +{ + "shocker_names": ["shocker-1", "shocker-2"], // cli shocker names + "max_runtime": "1h", // automatically end the session (default 1h inclusive of init_delay) + "init_delay": "2m", // delay the start of the script + "spam_cooldown": "2m", // limit how much you can be spammed + "count_in_mode": "beep", // if specified, the script will count down to session start + "events": [ // a list of events. add as many breakpoints as needed + { + "time": "0", // a time in seconds for when the session changes + "sync_mode": "sync", // random-shocker, sync, round-robin, dealers-choice + "break_duration": "1-10", // add spaces between shocker operations + "vibrate": { // by default, the program will vibe randomly between events + "intensity": "20-60", // intensity out of 100 + "duration": "1-6" // duration in seconds (1-15) + }, + "shock": { + "possibility": 15, // the percent chance of this event happening + "intensity": "3-8", + "duration": "1-6" + }, + "spam": { + "possibility": 1, + "operations": "10-20", // how many times to send the shocks consecutively + "intensity": "3-5", + "duration": "1-2", + "delay": 0.3 // delay in seconds between spammed shocks + }, + "beep": { + "possibility": 5, + "duration": "1-6" + } + }, + { + "time": "60s", // the session will change after the specified time + "sync_mode": "random-shocker", // change the way your shockers are chosen with each step + "vibrate": { + "possibility": 5, + "intensity": "70-90", + "duration": "3-6" + } + "shock": { + "possibility": 5, + "intensity": "1-2", + "duration": "12-15" + } + }, + ... // define as many events as needed + ] +} +``` + ## Serial usage When a PiShock is attached via USB, the commands `shock`, `vibrate`, `beep` and diff --git a/pyproject.toml b/pyproject.toml index 98a4b7d..ba66544 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "pishock" -version = "1.1.0" +version = "1.2.0" description = "The bestest Python PiShock API wrapper and CLI!" authors = ["Zerario "] readme = "README.md" diff --git a/src/pishock/__init__.py b/src/pishock/__init__.py index 932b8ad..8ff7e22 100644 --- a/src/pishock/__init__.py +++ b/src/pishock/__init__.py @@ -1,4 +1,4 @@ -__version__ = "1.1.0" +__version__ = "1.2.0" from .zap.core import ( Shocker as Shocker, diff --git a/src/pishock/zap/cli/__init__.py b/src/pishock/zap/cli/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/pishock/zap/cli/cli.py b/src/pishock/zap/cli/cli.py index 9276630..d3c3e6c 100644 --- a/src/pishock/zap/cli/cli.py +++ b/src/pishock/zap/cli/cli.py @@ -1,5 +1,6 @@ import contextlib import difflib +import json import pathlib import random import sys @@ -15,7 +16,7 @@ from typing_extensions import Annotated, TypeAlias from pishock.zap import serialapi, core, httpapi -from pishock.zap.cli import cli_random, cli_serial, cli_utils, cli_code +from pishock.zap.cli import cli_random, cli_serial, cli_utils, cli_code, cli_session """Command-line interface for PiShock.""" @@ -55,6 +56,12 @@ metavar="ID", ), ] +SessionFileArg: TypeAlias = Annotated[ + str, + typer.Argument( + help="A path to a JSON file containing the session format" + ) +] DurationOpt: TypeAlias = Annotated[ float, typer.Option("-d", "--duration", min=0, help="Duration in seconds."), @@ -66,7 +73,6 @@ ), ] - @contextlib.contextmanager def handle_errors(*args: Type[Exception]) -> Iterator[None]: try: @@ -300,6 +306,26 @@ def random_mode( ) random_shocker.run() +@app.command(name="session") +def session_mode( + ctx: typer.Context, + json_file: SessionFileArg) -> None: + """Use a session workflow to have haptic/shock/beep operations change over time""" + if not json_file: + raise typer.BadParameter("Must provide a session file path in JSON format") + + with open(json_file, 'r', encoding="UTF-8") as file: + data = json.load(file) + shocker_objs = [get_shocker(ctx.obj, shocker) for shocker in data.get("shocker_names")] + + session = cli_session.Session( + shockers=shocker_objs, + data=data + ) + + session.validate_events() + + session.run() @app.command(rich_help_panel="API credentials") def verify(ctx: typer.Context) -> None: diff --git a/src/pishock/zap/cli/cli_random.py b/src/pishock/zap/cli/cli_random.py index 98bc8f3..567ccd0 100644 --- a/src/pishock/zap/cli/cli_random.py +++ b/src/pishock/zap/cli/cli_random.py @@ -1,92 +1,14 @@ import contextlib import dataclasses import random -import re import time -from typing import Iterator, List, Optional, Union, Callable - -import click +from typing import Iterator, List, Optional import rich import typer from typing_extensions import Annotated, TypeAlias - from pishock.zap import httpapi, core from pishock.zap.cli import cli_utils as utils - -class RangeParser(click.ParamType): - name = "Range" - - def __init__( - self, min: int, max: Optional[int] = None, converter: Callable[[str], int] = int - ) -> None: - self.min = min - self.max = max - self.converter = converter - - def _parse_single(self, s: str) -> int: - try: - n = self.converter(s) - except ValueError: - self.fail(f"Value must be a {self.converter.__name__}: {s}") - - if self.max is None and n < self.min: - self.fail(f"Value must be at least {self.min}: {n}") - if self.max is not None and not (self.min <= n <= self.max): - self.fail(f"Value must be between {self.min} and {self.max}: {n}") - - return n - - def convert( - self, - value: Union[str, utils.Range], - param: Optional[click.Parameter], - ctx: Optional[click.Context], - ) -> utils.Range: - if isinstance(value, utils.Range): # default value - return value - - if "-" not in value: - n = self._parse_single(value) - return utils.Range(n, n) - - if value.count("-") > 1: - self.fail("Range must be in the form min-max.") - - a_str, b_str = value.split("-") - a = self._parse_single(a_str) - b = self._parse_single(b_str) - - try: - return utils.Range(a, b) - except ValueError as e: - self.fail(str(e)) - - -def parse_duration(duration: str) -> int: - """Parse duration in format XhYmZs into second duration.""" - if duration.isdigit(): - return int(duration) - - match = re.fullmatch( - r"(?P[0-9]+(\.[0-9])?h)?\s*" - r"(?P[0-9]+(\.[0-9])?m)?\s*" - r"(?P[0-9]+(\.[0-9])?s)?", - duration, - ) - if not match or not match.group(0): - raise ValueError( - f"Invalid duration: {duration} - " "expected XhYmZs or a number of seconds" - ) - seconds_string = match.group("seconds") if match.group("seconds") else "0" - seconds = float(seconds_string.rstrip("s")) - minutes_string = match.group("minutes") if match.group("minutes") else "0" - minutes = float(minutes_string.rstrip("m")) - hours_string = match.group("hours") if match.group("hours") else "0" - hours = float(hours_string.rstrip("h")) - return int(seconds + minutes * 60 + hours * 3600) - - @dataclasses.dataclass class SpamSettings: possibility: int @@ -221,7 +143,7 @@ def _tick(self) -> None: "Duration in seconds, as a single value or a min-max range (0-15 " "respectively)." ), - click_type=RangeParser(min=0, max=15), + click_type=utils.RangeParser(min=0, max=15), ), ] @@ -234,7 +156,7 @@ def _tick(self) -> None: "Intensity in percent, as a single value or min-max range (0-100 " "respectively)." ), - click_type=RangeParser(min=0, max=100), + click_type=utils.RangeParser(min=0, max=100), ), ] @@ -246,7 +168,7 @@ def _tick(self) -> None: help="Delay between operations, in seconds or a string like " "1h2m3s (with h/m being optional). With a min-max range of such values, " "picked randomly.", - click_type=RangeParser(min=0, converter=parse_duration), + click_type=utils.RangeParser(min=0, converter=utils.parse_duration), ), ] @@ -257,7 +179,7 @@ def _tick(self) -> None: help="Initial delay before the first operation, in seconds or a string like " "1h2m3s (with h/m being optional). With a min-max range of such values, " "picked randomly.", - click_type=RangeParser(min=0, converter=parse_duration), + click_type=utils.RangeParser(min=0, converter=utils.parse_duration), ), ] @@ -274,7 +196,7 @@ def _tick(self) -> None: utils.Range, typer.Option( help="Number of operations to spam, as a single value or min-max range.", - click_type=RangeParser(min=1), + click_type=utils.RangeParser(min=1), ), ] @@ -284,7 +206,7 @@ def _tick(self) -> None: help="Delay between spam operations, in seconds or a string like " "1h2m3s (with h/m being optional). With a min-max range of such values, " "picked randomly.", - click_type=RangeParser(min=0, converter=parse_duration), + click_type=utils.RangeParser(min=0, converter=utils.parse_duration), ), ] @@ -295,7 +217,7 @@ def _tick(self) -> None: "Duration of spam operations in seconds, as a single value or min-max " "range." ), - click_type=RangeParser(min=0, max=15), + click_type=utils.RangeParser(min=0, max=15), ), ] @@ -306,7 +228,7 @@ def _tick(self) -> None: "Intensity of spam operations in percent, as a single value or min-max " "range. If not given, normal intensity is used." ), - click_type=RangeParser(min=0, max=100), + click_type=utils.RangeParser(min=0, max=100), ), ] @@ -317,7 +239,7 @@ def _tick(self) -> None: "Maximum runtime in seconds or a string like 1h2m3s (with h/m being " "optional). With a min-max range of such values, picked randomly." ), - click_type=RangeParser(min=0, converter=parse_duration), + click_type=utils.RangeParser(min=0, converter=utils.parse_duration), ), ] @@ -328,7 +250,7 @@ def _tick(self) -> None: "Duration for vibration in seconds, as a single value or a min-max " "range (0-15 respectively). If not given, --duration is used." ), - click_type=RangeParser(min=0, max=15), + click_type=utils.RangeParser(min=0, max=15), ), ] @@ -339,7 +261,7 @@ def _tick(self) -> None: "Intensity in percent, as a single value or min-max range (0-100 " "respectively). If not given, --intensity is used." ), - click_type=RangeParser(min=0, max=100), + click_type=utils.RangeParser(min=0, max=100), ), ] diff --git a/src/pishock/zap/cli/cli_session.py b/src/pishock/zap/cli/cli_session.py new file mode 100644 index 0000000..4780cb3 --- /dev/null +++ b/src/pishock/zap/cli/cli_session.py @@ -0,0 +1,388 @@ +"""Session mode allows you to tailor automation of a longer session. +You can ramp up and down the intensity or use built-in randomizers +to choose between ranges of durations, intensity and how the shockers +interact over a given timeframe using JSON +""" +import contextlib +from enum import Enum +from functools import reduce +import math +import random +import time +from concurrent.futures import ThreadPoolExecutor +from typing import Iterator, List, Optional +from typing_extensions import TypeAlias +import rich +from pishock.zap import httpapi, core +from pishock.zap.cli import cli_utils as utils + +JSON: TypeAlias = dict[str, "JSON"] | list["JSON"] | str | int | float | bool | None + +allowed_sync_modes = [ + "random-shocker", + "round-robin", + "sync", + "dealers-choice" +] + +class ProgamModes(Enum): + """States for each tick""" + SHOCK = 1 + VIBRATE = 2 + BEEP = 3 + SPAM = 4 + +MINIMUM_DURATION_BETWEEN_EVENTS = 0.3 +DEFAULT_MAX_RUNTIME = "1h" +DEFAULT_JSON_BREAK_DURATION = "1-10" +DEFAULT_VIBRATION_DURATION_RANGE = "1-5" +DEFAULT_VIBRATION_INTENSITY_RANGE = "0-100" +DEFAULT_SYNC_MODE = "random-shocker" +MAX_THREADS = 8 # max supported shockers to sync + +class Session(): + """Session state handler""" + def __init__( + self, + shockers: List[core.Shocker], + data: List): + self.data = data + self.shockers = shockers + self.declared_shockers = None + self.init_delay = 0 + self.max_runtime = None + self.count_in_mode = None + self.start_time = time.monotonic() + self.sync_mode = "random-shocker" + self.spam_cooldown = 0 + self.spam_cooldown_timer = time.monotonic() + self.default_vibration_intensity = self._parse_duration(DEFAULT_VIBRATION_INTENSITY_RANGE) + self.default_vibration_duration = self._parse_duration(DEFAULT_VIBRATION_DURATION_RANGE) + + def _log(self, message: str) -> None: + rich.print(message) + + def _get_event_params(self, seconds): + """Get an event for the specific timeframe""" + pointer = 0 + items = self.data.get("events") + length = len(items) + prev_time = 0 + prev_val = None + + for v in items: + cur_time = utils.parse_duration(v.get("time")) + + if prev_time < seconds < cur_time: + if pointer == 0: + return v + + return prev_val + + if seconds > cur_time and pointer + 1 < length: + prev_time = cur_time + prev_val = v + pointer = pointer + 1 + continue + + return v + + return None + + @contextlib.contextmanager + def _handle_errors(self) -> Iterator[None]: + """Handle HTTP errors and bad input to the shockers""" + try: + yield + except (httpapi.APIError, ValueError) as e: + utils.print_exception(e) + + def _get_shockers_for_event(self) -> List[core.Shocker]: + if len(self.shockers) == 1: + return [self.shockers[0]] + + if self.sync_mode == "dealers-choice": + self.sync_mode = random.choice(allowed_sync_modes[:3]) + + match self.sync_mode: + case "sync": + return self.shockers + case "round-robin": + self.shockers = self.shockers[1:] + self.shockers[:1] + return [self.shockers[0]] + case "random-shocker": + return [random.choice(self.shockers)] + + def _shock(self, shocker: core.Shocker, duration: int, intensity: int) -> None: + """Wrap shock API with error handling and logging""" + self._log( + f":zap: [yellow]Shocking[/] [green]{shocker}[/] for [green]{duration}s[/] " + f"at [green]{intensity}%[/]." + ) + with self._handle_errors(): + shocker.shock(duration=duration, intensity=intensity) + + def _vibrate(self, shocker: core.Shocker, duration: int, intensity: int) -> None: + """Wrap vibrate API with error handling and logging""" + self._log( + f":vibration_mode: [cyan]Vibrating[/] [green]{shocker}[/] for " + f"[green]{duration}s[/] at [green]{intensity}%[/]." + ) + + with self._handle_errors(): + shocker.vibrate(duration=duration, intensity=intensity) + + def _beep(self, shocker: core.Shocker, duration: int) -> None: + """Wrap beep API with error handling and logging""" + self._log( + f":bell: [cyan]Beeping[/] [green]{shocker}[/] for " + f"[green]{duration}s[/]" + ) + with self._handle_errors(): + shocker.beep(duration=duration) + + def _spam(self, spam_event: dict, shockers: List[core.Shocker], executor: ThreadPoolExecutor) -> None: + """Send consecutive shocks to the user""" + self._log("[red bold]Spamming.[/]") + + operations = spam_event.get("operations") + intensity = spam_event.get("intensity") + duration = spam_event.get("duration") + delay = spam_event.get("delay") + + for _ in range(operations.pick()): + d = duration.pick() + i = intensity.pick() + + for shocker in shockers: + executor.submit(self._shock, shocker, d, i) + + time.sleep(d + delay + MINIMUM_DURATION_BETWEEN_EVENTS) + + def _execute_count_in(self) -> None: + """Alert user that the session is about to start using beep or vibration""" + if self.count_in_mode == "vibrate": + for _ in range(3): + self._vibrate(self.shockers[0], 1, 100) + time.sleep(MINIMUM_DURATION_BETWEEN_EVENTS) + else: + for _ in range(3): + self._beep(self.shockers[0], 1) + time.sleep(MINIMUM_DURATION_BETWEEN_EVENTS) + + def _parse_duration(self, val: str) -> utils.Range: + """Parse json duration ranges to a Range""" + return utils.RangeParser(min=0, converter=utils.parse_duration).convert(val, ctx=None, param=None) + + def _parse_intensity(self, val: str) -> utils.Range: + """Parse json 0-100 ranges into a Range""" + return utils.RangeParser(min=0, max=100).convert(val, ctx=None, param=None) + + def _parse_sync_mode(self, sync_mode:str) -> str: + """Convert shocker sync modes from JSON to domain values""" + if sync_mode: + if sync_mode in allowed_sync_modes: + return sync_mode + + raise ValueError(f"the JSON field: root.events.sync_mode: {sync_mode} is invalid") + + return DEFAULT_SYNC_MODE + + def _parse_shocker_info(self, cls: str, val: JSON) -> Optional[dict]: + """Get shared shocker characteristics""" + if not val: + return None + + intensity = duration = None + + if cls != "beep": + if val.get("intensity"): + intensity = self._parse_intensity(val.get("intensity")) + else: + raise ValueError(f"the JSON field: root.events.{cls}.intensity is missing") + + if val.get("duration"): + duration = self._parse_duration(val.get("duration")) + else: + raise ValueError(f"the JSON field: root.events.{cls}.duration is missing") + + return { + "possibility": val.get("possibility"), + "intensity": intensity, + "duration": duration + } + + def _parse_spam_info(self, cls: str, val: JSON) -> Optional[dict]: + """Spam requires some extra details beyond intensity, duration and possibility""" + if not val: + return None + + operations = delay = None + info = self._parse_shocker_info("spam", val) + + if val.get("operations"): + operations = self._parse_intensity(val.get("operations")) + else: + raise ValueError(f"the JSON field: root.events.{cls}.operations is missing") + + if val.get("delay"): + delay = float(val.get("delay")) + else: + raise ValueError(f"the JSON field: root.events.{cls}.delay is missing") + + info["operations"] = operations + info["delay"] = delay + + return info + + def _parse_event(self, val: JSON) -> dict: + """Create domain values from JSON""" + return { + "sync_mode": self._parse_sync_mode(val.get("sync_mode")), + "break_duration": self._parse_duration(val.get("break_duration", DEFAULT_JSON_BREAK_DURATION)), + "vibrate": self._parse_shocker_info("vibrate", val.get("vibrate")), + "shock": self._parse_shocker_info("shock", val.get("shock")), + "beep": self._parse_shocker_info("beep", val.get("beep")), + "spam": self._parse_spam_info("spam", val.get("spam")) + } + + def _get(self, dictionary:dict, keys:str, default=None) -> int: + return reduce(lambda d, key: d.get(key, default) if isinstance(d, dict) else default, keys.split("."), dictionary) + + def _get_int(self, dictionary:dict, keys:str) -> int: + val = self._get(dictionary, keys) + + if val is None: + return 0 + + return val + + def _get_possibility_map(self, event: dict): + """create a list of 100 operation possibilities based on the event data and then + shuffle it to simulate a percent chance when chosen at random""" + spam = self._get_int(event, "spam.possibility") + shock = self._get_int(event, "shock.possibility") + beep = self._get_int(event, "beep.possibility") + vibrate = self._get_int(event, "vibrate.possibility") + vibrate_possibility = self._get(event, "vibrate.possibility") + + if spam + shock + beep + vibrate > 100: + raise ValueError(f"the JSON field: root.events.evt.possibility exceeds 100%: {event}") + + if time.monotonic() < self.spam_cooldown_timer: + spam = 0 + + modes = [ProgamModes.SPAM]*spam\ + + [ProgamModes.SHOCK]*shock\ + + [ProgamModes.BEEP]*beep\ + + [ProgamModes.VIBRATE]*vibrate + + if vibrate_possibility is None: + pad = modes + [ProgamModes.VIBRATE]*(100-len(modes)) + else: + pad = modes + [None]*(100-len(modes)) + + random.shuffle(pad) + + return pad + + def _tick(self) -> None: + """event handler loop""" + event = self._parse_event(self._get_event_params(time.monotonic() - self.start_time)) + event_choice = random.choice(self._get_possibility_map(event)) + break_duration = event["break_duration"] + self.sync_mode = event["sync_mode"] + shockers = self._get_shockers_for_event() + with ThreadPoolExecutor(max_workers=MAX_THREADS) as executor: + match event_choice: + case(ProgamModes.SHOCK): + duration = event["shock"]["duration"].pick() + intensity = event["shock"]["intensity"].pick() + for shocker in shockers: + executor.submit(self._shock, + shocker, + duration, + intensity) + + + time.sleep(duration) + + case(ProgamModes.SPAM): + self.spam_cooldown_timer = time.monotonic() + self.spam_cooldown + self._spam(event["spam"], shockers, executor) + + case(ProgamModes.VIBRATE): + duration = self.default_vibration_duration.pick() + intensity = self.default_vibration_intensity.pick() + + if event.get("vibrate") is not None: + duration = event["vibrate"]["duration"].pick() + intensity = event["vibrate"]["intensity"].pick() + + for shocker in shockers: + executor.submit(self._vibrate, + shocker, + duration, + intensity) + + time.sleep(duration) + + case(ProgamModes.BEEP): + duration = event["beep"]["duration"].pick() + + for shocker in shockers: + executor.submit(self._beep, + shocker, + duration) + + time.sleep(duration) + + time.sleep(break_duration.pick() + MINIMUM_DURATION_BETWEEN_EVENTS) + + def run(self): + """entry point for the session command""" + self._log(":arrow_forward: [white]Session started[/]") + + self.declared_shockers = self.data.get("shocker_names") + if not self.declared_shockers or len(self.declared_shockers) == 0: + raise ValueError("the JSON field: root.shockers can not be blank or empty") + + self.count_in_mode = self.data.get("count_in_mode") + if self.count_in_mode is not None: + self._log(f":green_square: [blue]Count in mode[/] is set to [green]{self.count_in_mode}[/]") + + if self.data.get("init_delay"): + self.init_delay = self._parse_duration(self.data.get("init_delay", 0)).pick() + self._log(f":zzz: [blue]Initial delay[/] of [green]{self.init_delay}[/] seconds") + time.sleep(self.init_delay) + + if self.count_in_mode is not None: + self._execute_count_in() + + if self.data.get("max_runtime"): + self.max_runtime = self._parse_duration(self.data.get("max_runtime", DEFAULT_MAX_RUNTIME)).pick() + self._log(f":clock1: [blue]Max runtime[/] is [green]{self.max_runtime}[/] seconds") + + if self.data.get("spam_cooldown"): + self.spam_cooldown = self._parse_duration(self.data.get("spam_cooldown", 0)).pick() + self._log(f":clock1: [blue]Spam Cooldown[/] is [green]{self.spam_cooldown}[/] seconds") + + while self.max_runtime is None or (time.monotonic() - self.start_time) < self.max_runtime: + self._tick() + + self._log(f":checkered_flag: [white]Session ended at[/] [green]{math.ceil(time.monotonic()-self.start_time)}[/] seconds") + + def validate_events(self): + """Check the events list can be parsed beginning to end and output the plan""" + self._log(":heavy_check_mark: [blue]Validating events[/]") + + items = self.data.get("events") + + if not items: + raise ValueError("The JSON field: root.events cannot be empty and must be an array") + + for val in items: + event = self._parse_event(val) + self._get_possibility_map(event) + + self._log(":heavy_check_mark: [blue]Event list is valid[/]") diff --git a/src/pishock/zap/cli/cli_utils.py b/src/pishock/zap/cli/cli_utils.py index a9178fe..20d2284 100644 --- a/src/pishock/zap/cli/cli_utils.py +++ b/src/pishock/zap/cli/cli_utils.py @@ -5,9 +5,10 @@ import random import pathlib import json -from typing import Any +from typing import Any, Optional, Union, Callable import platformdirs +import click import rich import typer @@ -122,3 +123,77 @@ def bool_emoji(value: bool) -> str: def paused_emoji(is_paused: bool) -> str: return ":double_vertical_bar:" if is_paused else ":arrow_forward:" + +class RangeParser(click.ParamType): + name = "Range" + + def __init__( + self, min: int, max: Optional[int] = None, converter: Callable[[str], int] = int + ) -> None: + self.min = min + self.max = max + self.converter = converter + + def _parse_single(self, s: str) -> int: + try: + n = self.converter(s) + except ValueError: + self.fail(f"Value must be a {self.converter.__name__}: {s}") + + if self.max is None and n < self.min: + self.fail(f"Value must be at least {self.min}: {n}") + if self.max is not None and not (self.min <= n <= self.max): + self.fail(f"Value must be between {self.min} and {self.max}: {n}") + + return n + + def convert( + self, + value: Union[str, Range], + param: Optional[click.Parameter], + ctx: Optional[click.Context], + ) -> Range: + if isinstance(value, Range): # default value + return value + + if "-" not in value: + n = self._parse_single(value) + return Range(n, n) + + if value.count("-") > 1: + self.fail("Range must be in the form min-max.") + + a_str, b_str = value.split("-") + a = self._parse_single(a_str) + b = self._parse_single(b_str) + + try: + return Range(a, b) + except ValueError as e: + self.fail(str(e)) + +def parse_duration(duration: str) -> int: + """Parse duration in format XhYmZs into second duration.""" + if duration.isdigit(): + return int(duration) + + match = re.fullmatch( + r"(?P[0-9]+(\.[0-9])?h)?\s*" + r"(?P[0-9]+(\.[0-9])?m)?\s*" + r"(?P[0-9]+(\.[0-9])?s)?", + duration, + ) + + if not match or not match.group(0): + raise ValueError( + f"Invalid duration: {duration} - expected XhYmZs or a number of seconds" + ) + + seconds_string = match.group("seconds") if match.group("seconds") else "0" + seconds = float(seconds_string.rstrip("s")) + minutes_string = match.group("minutes") if match.group("minutes") else "0" + minutes = float(minutes_string.rstrip("m")) + hours_string = match.group("hours") if match.group("hours") else "0" + hours = float(hours_string.rstrip("h")) + + return int(seconds + minutes * 60 + hours * 3600)