Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added publisher/__init__.py
Empty file.
8 changes: 8 additions & 0 deletions publisher/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ class PythReplicatorConfig:
first_mapping: str
program_key: str
staleness_time_in_secs: int = ts.option(default=30)
# Manual aggregation is aggregating the prices of the publishers and ignoring
# the min_publishers when aggregate price status is not TRADING. This will improve
# the feed uptime but reduces the accuracy of the feed. One benefit of this feature
# is mirroring coming soon feeds on the target network that have some beta publishers.
manual_agg_enabled: bool = ts.option(default=True)
# The maximum slot difference to consider a publisher for manual aggregation
# when the aggregate price status is not TRADING.
manual_agg_max_slot_diff: int = ts.option(default=25)
account_update_interval_secs: int = ts.option(default=300)


Expand Down
71 changes: 65 additions & 6 deletions publisher/providers/pyth_replicator.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
from typing import Dict, List, Optional, Tuple
from pythclient.pythclient import PythClient
from pythclient.pythaccounts import PythPriceAccount
from pythclient.pythaccounts import PythPriceAccount, PythPriceStatus
import time


Expand Down Expand Up @@ -46,11 +46,47 @@ async def _update_loop(self) -> None:
if isinstance(update, PythPriceAccount):
symbol = update.product.symbol

self._prices[symbol] = [
update.aggregate_price,
update.aggregate_price_confidence_interval,
update.timestamp,
]
if self._prices.get(symbol) is None:
self._prices[symbol] = [None, None, None]

if update.aggregate_price_status == PythPriceStatus.TRADING:
self._prices[symbol] = [
update.aggregate_price,
update.aggregate_price_confidence_interval,
update.timestamp,
]
elif self._config.manual_agg_enabled:
# Do the manual aggregation based on the recent active publishers
# and their confidence intervals if possible. This will allow us to
# get an aggregate if there are some active publishers but they are
# not enough to reach the min_publishers threshold.
prices = []

current_slot = update.slot
for price_component in update.price_components:
price = price_component.latest_price_info
if (
price.price_status == PythPriceStatus.TRADING
and current_slot - price.pub_slot
<= self._config.manual_agg_max_slot_diff
):
prices.extend(
[
price.price - price.confidence_interval,
price.price,
price.price + price.confidence_interval,
]
)
break

if prices:
agg_price, agg_confidence_interval = manual_aggregate(prices)

self._prices[symbol] = [
agg_price,
agg_confidence_interval,
update.timestamp,
]

log.info(
"Received a price update", symbol=symbol, price=self._prices[symbol]
Expand Down Expand Up @@ -86,3 +122,26 @@ def latest_price(self, symbol: Symbol) -> Optional[Price]:
return None

return Price(price, conf)


def manual_aggregate(prices: List[float]) -> Tuple[float, float]:
"""
This function is used to manually aggregate the prices of the active publishers. This is a very simple
implementation that does not get the aggregate and confidence accurately but it is good enough for our use case.
On this implementation, if the aggregate or confidence are not an element of the list, then we consider the
rightmost element lower than them in the list. For example, if the list is [1, 2, 3, 4] instead of using
median 2.5 as aggregate we use 2.
"""
prices.sort()
num_prices = len(prices)

agg_price = prices[num_prices // 2]

agg_confidence_interval_left = agg_price - prices[num_prices // 4]
agg_confidence_interval_right = prices[num_prices * 3 // 4] - agg_price

agg_confidence_interval = max(
agg_confidence_interval_left, agg_confidence_interval_right
)

return agg_price, agg_confidence_interval
Empty file added publisher/tests/__init__.py
Empty file.
11 changes: 11 additions & 0 deletions publisher/tests/test_pyth_replicator_manual_aggregate.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import random
from ..providers.pyth_replicator import manual_aggregate


def test_manual_aggregate_works():
prices = [1, 2, 3, 4, 5, 6, 8, 10, 12, 14]
random.shuffle(prices)

agg_price, agg_confidence_interval = manual_aggregate(prices)
assert agg_price == 6
assert agg_confidence_interval == 4
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

setup(
name="example-publisher",
version="0.1.1",
version="1.0.0",
author="Pyth Data Association",
author_email="",
packages=find_packages(exclude=["tests"]),
Expand Down
2 changes: 0 additions & 2 deletions tests/test_publisher.py

This file was deleted.