Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ __pycache__/
.envrc
.coverage

.env
.env
.vscode/
1 change: 1 addition & 0 deletions .python-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
3.10
6 changes: 5 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,13 @@ Observe Pyth on-chain price feeds and run sanity checks on the data.

Container images are available at https://github.com/pyth-network/pyth-observer/pkgs/container/pyth-observer

To run Observer locally, make sure you have a recent version of [Poetry](https://python-poetry.org) installed and run:
To run Observer locally, you will need:
- Python 3.10 ([pyenv](https://github.com/pyenv/pyenv) is a nice way to manage Python installs, and once installed will automatically set the version to 3.10 for this project dir via the `.python-version` file).
- [Poetry](https://python-poetry.org), which handles package and virtualenv management.

Install dependencies and run the service:
```sh
$ poetry env use $(which python) # point Poetry to the pyenv python shim
$ poetry install
$ poetry run pyth-observer
```
Expand Down
69 changes: 67 additions & 2 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ throttler = "1.2.1"
types-pyyaml = "^6.0.12"
types-pytz = "^2022.4.0.0"
python-dotenv = "^1.0.1"
numpy = "^2.1.3"


[tool.poetry.group.dev.dependencies]
Expand Down
12 changes: 4 additions & 8 deletions pyth_observer/check/price_feed.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,13 @@ class PriceFeedState:

@runtime_checkable
class PriceFeedCheck(Protocol):
def __init__(self, state: PriceFeedState, config: PriceFeedCheckConfig):
...
def __init__(self, state: PriceFeedState, config: PriceFeedCheckConfig): ...

def state(self) -> PriceFeedState:
...
def state(self) -> PriceFeedState: ...

def run(self) -> bool:
...
def run(self) -> bool: ...

def error_message(self) -> dict:
...
def error_message(self) -> dict: ...


class PriceFeedOfflineCheck(PriceFeedCheck):
Expand Down
93 changes: 66 additions & 27 deletions pyth_observer/check/publisher.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,35 @@
from collections import defaultdict, deque
import time
from dataclasses import dataclass
from dataclasses import asdict, dataclass
from datetime import datetime
from typing import Dict, Protocol, runtime_checkable
from typing import Dict, List, Optional, Protocol, runtime_checkable
from zoneinfo import ZoneInfo

from loguru import logger
from pythclient.calendar import is_market_open
from pythclient.pythaccounts import PythPriceStatus
from pythclient.solana import SolanaPublicKey


@dataclass
class PriceUpdate:
"""Represents a single price with its timestamp (epoch seconds)."""

timestamp: int
price: float


PUBLISHER_EXCLUSION_DISTANCE = 25
PUBLISHER_CACHE_MAX_LEN = 30
"""Roughly 30 mins of updates, since the check runs about once a minute"""

PUBLISHER_CACHE = {}
PUBLISHER_CACHE: Dict[tuple[str, str], List[PriceUpdate]] = defaultdict(
lambda: deque(maxlen=PUBLISHER_CACHE_MAX_LEN)
)
"""
Cache that holds tuples of (price, timestamp) for publisher/feed combos as they stream in.
Entries longer than `PUBLISHER_CACHE_MAX_LEN` are automatically pruned.
Used by the PublisherStalledCheck to detect stalls in prices.
"""


@dataclass
Expand All @@ -35,17 +54,13 @@ class PublisherState:

@runtime_checkable
class PublisherCheck(Protocol):
def __init__(self, state: PublisherState, config: PublisherCheckConfig):
...
def __init__(self, state: PublisherState, config: PublisherCheckConfig): ...

def state(self) -> PublisherState:
...
def state(self) -> PublisherState: ...

def run(self) -> bool:
...
def run(self) -> bool: ...

def error_message(self) -> dict:
...
def error_message(self) -> dict: ...


class PublisherWithinAggregateConfidenceCheck(PublisherCheck):
Expand Down Expand Up @@ -240,6 +255,20 @@ def __init__(self, state: PublisherState, config: PublisherCheckConfig):
self.__abandoned_time_limit: int = int(config["abandoned_time_limit"])
self.__max_slot_distance: int = int(config["max_slot_distance"])

from pyth_observer.check.stall_detection import (
StallDetectionResult,
StallDetector,
) # noqa: deferred import to avoid circular import

self.__detector = StallDetector(
stall_time_limit=self.__stall_time_limit,
noise_threshold=float(config.get("noise_threshold")),
min_noise_samples=int(config.get("min_noise_samples")),
)

# Keep track of last analysis for error reporting
self.__last_analysis: Optional[StallDetectionResult] = None

def state(self) -> PublisherState:
return self.__state

Expand All @@ -254,36 +283,46 @@ def run(self) -> bool:

distance = self.__state.latest_block_slot - self.__state.slot

# Pass for redemption rates because they are expected to be static for long periods
if self.__state.asset_type == "Crypto Redemption Rate":
logger.info(f"Redemption rate: Skipping {self.__state.symbol}")
return True

# Pass when publisher is offline because PublisherOfflineCheck will be triggered
if distance >= self.__max_slot_distance:
return True

publisher_key = (self.__state.publisher_name, self.__state.symbol)
current_time = int(time.time())
previous_price, last_change_time = PUBLISHER_CACHE.get(
publisher_key, (None, None)
)

if previous_price is None or self.__state.price != previous_price:
PUBLISHER_CACHE[publisher_key] = (self.__state.price, current_time)
return True
publisher_key = (self.__state.publisher_name, self.__state.symbol)
PUBLISHER_CACHE[publisher_key].append(
PriceUpdate(current_time, self.__state.price)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on the logic in the StalledDetector that you have I think you need to append a price only it's different that the latest stored price, otherwise the exact stalled might never fire.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah yeah you're right, i was considering a few different ways to do this and looks like this slipped between the cracks during refactoring. test suite should have caught this -- let me fix and cover it with a test case

),
updates = PUBLISHER_CACHE[publisher_key]

time_since_last_change = current_time - last_change_time
if time_since_last_change > self.__stall_time_limit:
if time_since_last_change > self.__abandoned_time_limit:
return True # Abandon this check after the abandoned time limit
return False
# Analyze for stalls
result = self.__detector.analyze_updates(list(updates))
logger.debug(f"Stall detection result: {result}")

self.__last_analysis = result # For error logging

# If we've been stalled for too long, abandon this check
if result.is_stalled and result.duration > self.__abandoned_time_limit:
return True

return True
return not result.is_stalled

def error_message(self) -> dict:
stall_duration = f"{self.__last_analysis.duration:.1f} seconds"
return {
"msg": f"{self.__state.publisher_name} has been publishing the same price for too long.",
"msg": f"{self.__state.publisher_name} has been publishing the same price of {self.__state.symbol} for {stall_duration}",
"type": "PublisherStalledCheck",
"publisher": self.__state.publisher_name,
"symbol": self.__state.symbol,
"price": self.__state.price,
"stall_duration": f"{int(time.time()) - PUBLISHER_CACHE[(self.__state.publisher_name, self.__state.symbol)][1]} seconds",
"stall_type": self.__last_analysis.stall_type,
"stall_duration": stall_duration,
"analysis": asdict(self.__last_analysis),
}


Expand Down
Loading
Loading