Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ ignore_missing_imports = true

[tool.poetry]
name = "pyth-observer"
version = "0.2.0"
version = "0.2.1"
description = "Alerts and stuff"
authors = []
readme = "README.md"
Expand Down
85 changes: 39 additions & 46 deletions pyth_observer/check/price_feed.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import time
from dataclasses import dataclass
from datetime import datetime
from textwrap import dedent
from typing import Dict, Optional, Protocol, runtime_checkable
from zoneinfo import ZoneInfo

Expand Down Expand Up @@ -42,7 +41,7 @@ def state(self) -> PriceFeedState:
def run(self) -> bool:
...

def error_message(self) -> str:
def error_message(self) -> dict:
...


Expand Down Expand Up @@ -80,17 +79,15 @@ def run(self) -> bool:
# Fail
return False

def error_message(self) -> str:
def error_message(self) -> dict:
distance = self.__state.latest_block_slot - self.__state.latest_trading_slot
return dedent(
f"""
{self.__state.symbol} is offline (either non-trading/stale).
It is not updated for {distance} slots.

Latest trading slot: {self.__state.latest_trading_slot}
Block slot: {self.__state.latest_block_slot}
"""
).strip()
return {
"msg": f"{self.__state.symbol} is offline (either non-trading/stale). Last update {distance} slots ago.",
"type": "PriceFeedCheck",
"symbol": self.__state.symbol,
"latest_trading_slot": self.__state.latest_trading_slot,
"block_slot": self.__state.latest_block_slot,
}


class PriceFeedCoinGeckoCheck(PriceFeedCheck):
Expand Down Expand Up @@ -127,15 +124,14 @@ def run(self) -> bool:
# Fail
return False

def error_message(self) -> str:
return dedent(
f"""
{self.__state.symbol} is too far from Coingecko's price.

Pyth price: {self.__state.price_aggregate}
Coingecko price: {self.__state.coingecko_price}
"""
).strip()
def error_message(self) -> dict:
return {
"msg": f"{self.__state.symbol} is too far from Coingecko's price.",
"type": "PriceFeedCheck",
"symbol": self.__state.symbol,
"pyth_price": self.__state.price_aggregate,
"coingecko_price": self.__state.coingecko_price,
}


class PriceFeedConfidenceIntervalCheck(PriceFeedCheck):
Expand All @@ -158,14 +154,13 @@ def run(self) -> bool:
# Fail
return False

def error_message(self) -> str:
return dedent(
f"""
{self.__state.symbol} confidence interval is too low.

Confidence interval: {self.__state.confidence_interval_aggregate}
"""
).strip()
def error_message(self) -> dict:
return {
"msg": f"{self.__state.symbol} confidence interval is too low.",
"type": "PriceFeedCheck",
"symbol": self.__state.symbol,
"confidence_interval": self.__state.confidence_interval_aggregate,
}


class PriceFeedCrossChainOnlineCheck(PriceFeedCheck):
Expand Down Expand Up @@ -210,19 +205,18 @@ def run(self) -> bool:
# Fail
return False

def error_message(self) -> str:
def error_message(self) -> dict:
if self.__state.crosschain_price:
publish_time = arrow.get(self.__state.crosschain_price["publish_time"])
else:
publish_time = arrow.get(0)

return dedent(
f"""
{self.__state.symbol} isn't online at the price service.

Last publish time: {publish_time.format('YYYY-MM-DD HH:mm:ss ZZ')}
"""
).strip()
return {
"msg": f"{self.__state.symbol} isn't online at the price service.",
"type": "PriceFeedCheck",
"symbol": self.__state.symbol,
"last_publish_time": publish_time.format("YYYY-MM-DD HH:mm:ss ZZ"),
}


class PriceFeedCrossChainDeviationCheck(PriceFeedCheck):
Expand Down Expand Up @@ -270,21 +264,20 @@ def run(self) -> bool:
# Fail
return False

def error_message(self) -> str:
def error_message(self) -> dict:
# It can never happen because of the check logic but linter could not understand it.
price = (
self.__state.crosschain_price["price"]
if self.__state.crosschain_price
else None
)
return dedent(
f"""
{self.__state.symbol} is too far at the price service.

Price: {self.__state.price_aggregate}
Price at price service: {price}
"""
).strip()
return {
"msg": f"{self.__state.symbol} is too far at the price service.",
"type": "PriceFeedCheck",
"symbol": self.__state.symbol,
"price": self.__state.price_aggregate,
"price_at_price_service": price,
}


PRICE_FEED_CHECKS = [
Expand Down
84 changes: 38 additions & 46 deletions pyth_observer/check/publisher.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
from dataclasses import dataclass
from textwrap import dedent
from typing import Dict, Protocol, runtime_checkable

from pythclient.pythaccounts import PythPriceStatus
Expand Down Expand Up @@ -38,7 +37,7 @@ def state(self) -> PublisherState:
def run(self) -> bool:
...

def error_message(self) -> str:
def error_message(self) -> dict:
...


Expand Down Expand Up @@ -78,20 +77,17 @@ def run(self) -> bool:
# Fail
return False

def error_message(self) -> str:
def error_message(self) -> dict:
diff = self.__state.price - self.__state.price_aggregate
intervals_away = abs(diff / self.__state.confidence_interval_aggregate)

return dedent(
f"""
{self.__state.publisher_name} price not within aggregate confidence.
It is {intervals_away} times away from confidence.

Symbol: {self.__state.symbol}
Publisher price: {self.__state.price} ± {self.__state.confidence_interval}
Aggregate price: {self.__state.price_aggregate} ± {self.__state.confidence_interval_aggregate}
"""
).strip()
return {
"msg": f"{self.__state.publisher_name} price is {intervals_away} times away from confidence.",
"type": "PublisherCheck",
"publisher": self.__state.publisher_name,
"symbol": self.__state.symbol,
"publisher_price": f"{self.__state.price} ± {self.__state.confidence_interval}",
"aggregate_price": f"{self.__state.price_aggregate} ± {self.__state.confidence_interval_aggregate}",
}


class PublisherConfidenceIntervalCheck(PublisherCheck):
Expand Down Expand Up @@ -119,16 +115,15 @@ def run(self) -> bool:
# Fail
return False

def error_message(self) -> str:
return dedent(
f"""
{self.__state.publisher_name} confidence interval is too tight.

Symbol: {self.__state.symbol}
Price: {self.__state.price}
Confidence interval: {self.__state.confidence_interval}
"""
).strip()
def error_message(self) -> dict:
return {
"msg": f"{self.__state.publisher_name} confidence interval is too tight.",
"type": "PublisherCheck",
"publisher": self.__state.publisher_name,
"symbol": self.__state.symbol,
"price": self.__state.price,
"confidence_interval": self.__state.confidence_interval,
}


class PublisherOfflineCheck(PublisherCheck):
Expand All @@ -154,17 +149,16 @@ def run(self) -> bool:
# Fail
return False

def error_message(self) -> str:
def error_message(self) -> dict:
distance = self.__state.latest_block_slot - self.__state.slot
return dedent(
f"""
{self.__state.publisher_name} hasn't published recently for {distance} slots.

Symbol: {self.__state.symbol}
Publisher slot: {self.__state.slot}
Aggregate slot: {self.__state.aggregate_slot}
"""
).strip()
return {
"msg": f"{self.__state.publisher_name} hasn't published recently for {distance} slots.",
"type": "PublisherCheck",
"publisher": self.__state.publisher_name,
"symbol": self.__state.symbol,
"publisher_slot": self.__state.slot,
"aggregate_slot": self.__state.aggregate_slot,
}


class PublisherPriceCheck(PublisherCheck):
Expand Down Expand Up @@ -203,19 +197,17 @@ def run(self) -> bool:
# Fail
return False

def error_message(self) -> str:
def error_message(self) -> dict:
deviation = (self.ci_adjusted_price_diff() / self.__state.price_aggregate) * 100

return dedent(
f"""
{self.__state.publisher_name} price is too far from aggregate price.

Symbol: {self.__state.symbol}
Publisher price: {self.__state.price} ± {self.__state.confidence_interval}
Aggregate price: {self.__state.price_aggregate} ± {self.__state.confidence_interval_aggregate}
Deviation: {deviation}%
"""
).strip()
return {
"msg": f"{self.__state.publisher_name} price is too far from aggregate price.",
"type": "PublisherCheck",
"publisher": self.__state.publisher_name,
"symbol": self.__state.symbol,
"publisher_price": f"{self.__state.price} ± {self.__state.confidence_interval}",
"aggregate_price": f"{self.__state.price_aggregate} ± {self.__state.confidence_interval_aggregate}",
"deviation": deviation,
}

# Returns the distance between the aggregate price and the closest side of the publisher's confidence interval
# Returns 0 if the aggregate price is within the publisher's confidence interval.
Expand Down
19 changes: 8 additions & 11 deletions pyth_observer/event.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import os
from typing import Dict, Literal, Protocol, TypedDict, cast
from typing import Dict, Protocol, TypedDict, cast

import aiohttp
from datadog_api_client.api_client import AsyncApiClient as DatadogAPI
Expand Down Expand Up @@ -38,7 +39,7 @@ def __init__(self, check: Check, context: Context):
async def send(self):
# Publisher checks expect the key -> name mapping of publishers when
# generating the error title/message.
text = self.check.error_message()
event = self.check.error_message()

# An example is: PriceFeedOfflineCheck-Crypto.AAVE/USD
aggregation_key = f"{self.check.__class__.__name__}-{self.check.state().symbol}"
Expand All @@ -50,8 +51,8 @@ async def send(self):

event = EventCreateRequest(
aggregation_key=aggregation_key,
title=text.split("\n")[0],
text=text,
title=event["msg"],
text=json.dumps(event),
tags=[
"service:observer",
f"network:{self.context['network']}",
Expand Down Expand Up @@ -84,9 +85,6 @@ async def send(self):
)


LogEventLevel = Literal["DEBUG", "INFO", "WARNING", "ERROR"]


class LogEvent(Event):
def __init__(self, check: Check, context: Context):
self.check = check
Expand All @@ -95,10 +93,9 @@ def __init__(self, check: Check, context: Context):
async def send(self):
# Publisher checks expect the key -> name mapping of publishers when
# generating the error title/message.
text = self.check.error_message()

level = cast(LogEventLevel, os.environ.get("LOG_EVENT_LEVEL", "INFO"))
logger.log(level, text.replace("\n", ". "))
event = self.check.error_message()
with logger.contextualize(**event):
logger.info(event["msg"])


class TelegramEvent(Event):
Expand Down