From 64fc83936c0ad6f319093c77d1cbe421082a7eb7 Mon Sep 17 00:00:00 2001 From: Ali Behjati Date: Wed, 1 Mar 2023 18:35:12 +0000 Subject: [PATCH 1/3] Add manual aggregation to pyth replicator --- publisher/__init__.py | 0 publisher/config.py | 6 ++ publisher/providers/pyth_replicator.py | 69 +++++++++++++++++-- publisher/tests/__init__.py | 0 .../test_pyth_replicator_manual_aggregate.py | 11 +++ setup.py | 2 +- tests/test_publisher.py | 2 - 7 files changed, 81 insertions(+), 9 deletions(-) create mode 100644 publisher/__init__.py create mode 100644 publisher/tests/__init__.py create mode 100644 publisher/tests/test_pyth_replicator_manual_aggregate.py delete mode 100644 tests/test_publisher.py diff --git a/publisher/__init__.py b/publisher/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/publisher/config.py b/publisher/config.py index cc3fa36..172e591 100644 --- a/publisher/config.py +++ b/publisher/config.py @@ -32,6 +32,12 @@ class PythReplicatorConfig: first_mapping: str program_key: str staleness_time_in_secs: int = ts.option(default=30) + # Manual aggregation is aggregating the prices of the publishers and ignoring + # the min_publishers when aggregate price status is not TRADING. + manual_agg_enabled: bool = ts.option(default=True) + # The maximum slot difference to consider a publisher for manual aggregation + # when the aggregate price status is not TRADING. 
+ manual_agg_max_slot_diff: int = ts.option(default=25) account_update_interval_secs: int = ts.option(default=300) diff --git a/publisher/providers/pyth_replicator.py b/publisher/providers/pyth_replicator.py index bb06bce..99f1530 100644 --- a/publisher/providers/pyth_replicator.py +++ b/publisher/providers/pyth_replicator.py @@ -1,7 +1,7 @@ import asyncio from typing import Dict, List, Optional, Tuple from pythclient.pythclient import PythClient -from pythclient.pythaccounts import PythPriceAccount +from pythclient.pythaccounts import PythPriceAccount, PythPriceStatus import time @@ -46,11 +46,47 @@ async def _update_loop(self) -> None: if isinstance(update, PythPriceAccount): symbol = update.product.symbol - self._prices[symbol] = [ - update.aggregate_price, - update.aggregate_price_confidence_interval, - update.timestamp, - ] + if self._prices.get(symbol) is None: + self._prices[symbol] = [None, None, None] + + if update.aggregate_price_status == PythPriceStatus.TRADING: + self._prices[symbol] = [ + update.aggregate_price, + update.aggregate_price_confidence_interval, + update.timestamp, + ] + elif self._config.manual_agg_enabled: + # Do the manual aggregation based on the recent active publishers + # and their confidence intervals if possible. This will allow us to + # get an aggregate if there are some active publishers but they are + # not enough to reach the min_publishers threshold. 
+ prices = [] + + current_slot = update.slot + for price_component in update.price_components: + price = price_component.latest_price_info + if ( + price.price_status == PythPriceStatus.TRADING + and current_slot - price.pub_slot + <= self._config.manual_agg_max_slot_diff + ): + prices.extend( + [ + price.price - price.confidence_interval, + price.price, + price.price + price.confidence_interval, + ] + ) + break + + if prices: + agg_price, agg_confidence_interval = manual_aggregate(prices) + + self._prices[symbol] = [ + agg_price, + agg_confidence_interval, + update.timestamp, + ] log.info( "Received a price update", symbol=symbol, price=self._prices[symbol] @@ -86,3 +122,24 @@ def latest_price(self, symbol: Symbol) -> Optional[Price]: return None return Price(price, conf) + + +def manual_aggregate(prices: List[float]) -> Tuple[float, float]: + """ + This function is used to manually aggregate the prices of the active publishers. This is a very simple + implementation that does not get the median and confidence accurately but it is good enough for our use case. 
+ """ + prices.sort() + no_prices = len(prices) + + # Ensure the price never goes below zero + agg_price = max(0, prices[no_prices // 2]) + + agg_confidence_interval_left = agg_price - prices[no_prices // 4] + agg_confidence_interval_right = prices[no_prices * 3 // 4] - agg_price + + # Make sure agg_price - agg_confidence interval is never negative + agg_confidence_interval = min( + agg_price, max(agg_confidence_interval_left, agg_confidence_interval_right, 0) + ) + return agg_price, agg_confidence_interval diff --git a/publisher/tests/__init__.py b/publisher/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/publisher/tests/test_pyth_replicator_manual_aggregate.py b/publisher/tests/test_pyth_replicator_manual_aggregate.py new file mode 100644 index 0000000..45011f7 --- /dev/null +++ b/publisher/tests/test_pyth_replicator_manual_aggregate.py @@ -0,0 +1,11 @@ +import random +from ..providers.pyth_replicator import manual_aggregate + + +def test_manual_aggregate_works(): + prices = [1, 2, 3, 4, 5, 6, 8, 10, 12, 14] + random.shuffle(prices) + + agg_price, agg_confidence_interval = manual_aggregate(prices) + assert agg_price == 6 + assert agg_confidence_interval == 4 diff --git a/setup.py b/setup.py index ed55623..577c1b9 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ setup( name="example-publisher", - version="0.1.1", + version="1.0.0", author="Pyth Data Association", author_email="", packages=find_packages(exclude=["tests"]), diff --git a/tests/test_publisher.py b/tests/test_publisher.py deleted file mode 100644 index 201975f..0000000 --- a/tests/test_publisher.py +++ /dev/null @@ -1,2 +0,0 @@ -def test_placeholder(): - pass From f2f26515388155a295c352042459c7cdef320fb1 Mon Sep 17 00:00:00 2001 From: Ali Behjati Date: Wed, 1 Mar 2023 18:38:34 +0000 Subject: [PATCH 2/3] Update comment --- publisher/config.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/publisher/config.py b/publisher/config.py index 
172e591..0ea13d5 100644 --- a/publisher/config.py +++ b/publisher/config.py @@ -33,7 +33,9 @@ class PythReplicatorConfig: program_key: str staleness_time_in_secs: int = ts.option(default=30) # Manual aggregation is aggregating the prices of the publishers and ignoring - # the min_publishers when aggregate price status is not TRADING. + # the min_publishers when aggregate price status is not TRADING. This will improve + # the feed uptime but reduces the accuracy of the feed. One benefit of this feature + # is mirroring coming soon feeds on the target network that have some beta publishers. manual_agg_enabled: bool = ts.option(default=True) # The maximum slot difference to consider a publisher for manual aggregation # when the aggregate price status is not TRADING. From 19f435e84ae0df664c984bc5174ef76ead84217c Mon Sep 17 00:00:00 2001 From: Ali Behjati Date: Thu, 2 Mar 2023 09:19:29 +0000 Subject: [PATCH 3/3] Address comments --- publisher/providers/pyth_replicator.py | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/publisher/providers/pyth_replicator.py b/publisher/providers/pyth_replicator.py index 99f1530..9b2a163 100644 --- a/publisher/providers/pyth_replicator.py +++ b/publisher/providers/pyth_replicator.py @@ -127,19 +127,21 @@ def latest_price(self, symbol: Symbol) -> Optional[Price]: def manual_aggregate(prices: List[float]) -> Tuple[float, float]: """ This function is used to manually aggregate the prices of the active publishers. This is a very simple - implementation that does not get the median and confidence accurately but it is good enough for our use case. + implementation that does not get the aggregate and confidence accurately but it is good enough for our use case. + On this implementation, if the aggregate or confidence are not an element of the list, then we consider the + leftmost element greater than them in the list. 
For example, if the list is [1, 2, 3, 4] instead of using + median 2.5 as aggregate we use 3. """ prices.sort() - no_prices = len(prices) + num_prices = len(prices) - # Ensure the price never goes below zero - agg_price = max(0, prices[no_prices // 2]) + agg_price = prices[num_prices // 2] - agg_confidence_interval_left = agg_price - prices[no_prices // 4] - agg_confidence_interval_right = prices[no_prices * 3 // 4] - agg_price + agg_confidence_interval_left = agg_price - prices[num_prices // 4] + agg_confidence_interval_right = prices[num_prices * 3 // 4] - agg_price - # Make sure agg_price - agg_confidence interval is never negative - agg_confidence_interval = min( - agg_price, max(agg_confidence_interval_left, agg_confidence_interval_right, 0) + agg_confidence_interval = max( + agg_confidence_interval_left, agg_confidence_interval_right ) + return agg_price, agg_confidence_interval