|  | 
| 17 | 17 | ) | 
| 18 | 18 | from throttler import Throttler | 
| 19 | 19 | 
 | 
|  | 20 | +import pyth_observer.health_server as health_server | 
| 20 | 21 | from pyth_observer.check.price_feed import PriceFeedState | 
| 21 | 22 | from pyth_observer.check.publisher import PublisherState | 
| 22 | 23 | from pyth_observer.coingecko import Symbol, get_coingecko_prices | 
| @@ -72,98 +73,106 @@ def __init__( | 
| 72 | 73 | 
 | 
| 73 | 74 |     async def run(self): | 
| 74 | 75 |         while True: | 
| 75 |  | -            logger.info("Running checks") | 
| 76 |  | - | 
| 77 |  | -            products = await self.get_pyth_products() | 
| 78 |  | -            coingecko_prices, coingecko_updates = await self.get_coingecko_prices() | 
| 79 |  | -            crosschain_prices = await self.get_crosschain_prices() | 
| 80 |  | - | 
| 81 |  | -            for product in products: | 
| 82 |  | -                # Skip tombstone accounts with blank metadata | 
| 83 |  | -                if "base" not in product.attrs: | 
| 84 |  | -                    continue | 
| 85 |  | - | 
| 86 |  | -                if not product.first_price_account_key: | 
| 87 |  | -                    continue | 
| 88 |  | - | 
| 89 |  | -                # For each product, we build a list of price feed states (one | 
| 90 |  | -                # for each price account) and a list of publisher states (one | 
| 91 |  | -                # for each publisher). | 
| 92 |  | -                states = [] | 
| 93 |  | -                price_accounts = await self.get_pyth_prices(product) | 
| 94 |  | - | 
| 95 |  | -                crosschain_price = crosschain_prices.get( | 
| 96 |  | -                    b58decode(product.first_price_account_key.key).hex(), None | 
| 97 |  | -                ) | 
| 98 |  | - | 
| 99 |  | -                for _, price_account in price_accounts.items(): | 
| 100 |  | -                    # Handle potential None for min_publishers | 
| 101 |  | -                    if ( | 
| 102 |  | -                        price_account.min_publishers is None | 
| 103 |  | -                        # When min_publishers is high it means that the price is not production-ready | 
| 104 |  | -                        # yet and it is still being tested. We need no alerting for these prices. | 
| 105 |  | -                        or price_account.min_publishers >= 10 | 
| 106 |  | -                    ): | 
|  | 76 | +            try: | 
|  | 77 | +                logger.info("Running checks") | 
|  | 78 | + | 
|  | 79 | +                products = await self.get_pyth_products() | 
|  | 80 | +                coingecko_prices, coingecko_updates = await self.get_coingecko_prices() | 
|  | 81 | +                crosschain_prices = await self.get_crosschain_prices() | 
|  | 82 | + | 
|  | 83 | +                health_server.observer_ready = True | 
|  | 84 | + | 
|  | 85 | +                for product in products: | 
|  | 86 | +                    # Skip tombstone accounts with blank metadata | 
|  | 87 | +                    if "base" not in product.attrs: | 
|  | 88 | +                        continue | 
|  | 89 | + | 
|  | 90 | +                    if not product.first_price_account_key: | 
| 107 | 91 |                         continue | 
| 108 | 92 | 
 | 
| 109 |  | -                    # Ensure latest_block_slot is not None or provide a default value | 
| 110 |  | -                    latest_block_slot = ( | 
| 111 |  | -                        price_account.slot if price_account.slot is not None else -1 | 
|  | 93 | +                    # For each product, we build a list of price feed states (one | 
|  | 94 | +                    # for each price account) and a list of publisher states (one | 
|  | 95 | +                    # for each publisher). | 
|  | 96 | +                    states = [] | 
|  | 97 | +                    price_accounts = await self.get_pyth_prices(product) | 
|  | 98 | + | 
|  | 99 | +                    crosschain_price = crosschain_prices.get( | 
|  | 100 | +                        b58decode(product.first_price_account_key.key).hex(), None | 
| 112 | 101 |                     ) | 
| 113 | 102 | 
 | 
| 114 |  | -                    if not price_account.aggregate_price_status: | 
| 115 |  | -                        raise RuntimeError("Price account status is missing") | 
| 116 |  | - | 
| 117 |  | -                    if not price_account.aggregate_price_info: | 
| 118 |  | -                        raise RuntimeError("Aggregate price info is missing") | 
| 119 |  | - | 
| 120 |  | -                    states.append( | 
| 121 |  | -                        PriceFeedState( | 
| 122 |  | -                            symbol=product.attrs["symbol"], | 
| 123 |  | -                            asset_type=product.attrs["asset_type"], | 
| 124 |  | -                            schedule=MarketSchedule(product.attrs["schedule"]), | 
| 125 |  | -                            public_key=price_account.key, | 
| 126 |  | -                            status=price_account.aggregate_price_status, | 
| 127 |  | -                            # this is the solana block slot when price account was fetched | 
| 128 |  | -                            latest_block_slot=latest_block_slot, | 
| 129 |  | -                            latest_trading_slot=price_account.last_slot, | 
| 130 |  | -                            price_aggregate=price_account.aggregate_price_info.price, | 
| 131 |  | -                            confidence_interval_aggregate=price_account.aggregate_price_info.confidence_interval, | 
| 132 |  | -                            coingecko_price=coingecko_prices.get(product.attrs["base"]), | 
| 133 |  | -                            coingecko_update=coingecko_updates.get( | 
| 134 |  | -                                product.attrs["base"] | 
| 135 |  | -                            ), | 
| 136 |  | -                            crosschain_price=crosschain_price, | 
|  | 103 | +                    for _, price_account in price_accounts.items(): | 
|  | 104 | +                        # Handle potential None for min_publishers | 
|  | 105 | +                        if ( | 
|  | 106 | +                            price_account.min_publishers is None | 
|  | 107 | +                            # When min_publishers is high it means that the price is not production-ready | 
|  | 108 | +                            # yet and it is still being tested. We need no alerting for these prices. | 
|  | 109 | +                            or price_account.min_publishers >= 10 | 
|  | 110 | +                        ): | 
|  | 111 | +                            continue | 
|  | 112 | + | 
|  | 113 | +                        # Ensure latest_block_slot is not None or provide a default value | 
|  | 114 | +                        latest_block_slot = ( | 
|  | 115 | +                            price_account.slot if price_account.slot is not None else -1 | 
| 137 | 116 |                         ) | 
| 138 |  | -                    ) | 
| 139 | 117 | 
 | 
| 140 |  | -                    for component in price_account.price_components: | 
| 141 |  | -                        pub = self.publishers.get(component.publisher_key.key, None) | 
| 142 |  | -                        publisher_name = ( | 
| 143 |  | -                            (pub.name if pub else "") | 
| 144 |  | -                            + f" ({component.publisher_key.key})" | 
| 145 |  | -                        ).strip() | 
|  | 118 | +                        if not price_account.aggregate_price_status: | 
|  | 119 | +                            raise RuntimeError("Price account status is missing") | 
|  | 120 | + | 
|  | 121 | +                        if not price_account.aggregate_price_info: | 
|  | 122 | +                            raise RuntimeError("Aggregate price info is missing") | 
|  | 123 | + | 
| 146 | 124 |                         states.append( | 
| 147 |  | -                            PublisherState( | 
| 148 |  | -                                publisher_name=publisher_name, | 
|  | 125 | +                            PriceFeedState( | 
| 149 | 126 |                                 symbol=product.attrs["symbol"], | 
| 150 | 127 |                                 asset_type=product.attrs["asset_type"], | 
| 151 | 128 |                                 schedule=MarketSchedule(product.attrs["schedule"]), | 
| 152 |  | -                                public_key=component.publisher_key, | 
| 153 |  | -                                confidence_interval=component.latest_price_info.confidence_interval, | 
| 154 |  | -                                confidence_interval_aggregate=price_account.aggregate_price_info.confidence_interval, | 
| 155 |  | -                                price=component.latest_price_info.price, | 
| 156 |  | -                                price_aggregate=price_account.aggregate_price_info.price, | 
| 157 |  | -                                slot=component.latest_price_info.pub_slot, | 
| 158 |  | -                                aggregate_slot=price_account.last_slot, | 
|  | 129 | +                                public_key=price_account.key, | 
|  | 130 | +                                status=price_account.aggregate_price_status, | 
| 159 | 131 |                                 # this is the solana block slot when price account was fetched | 
| 160 | 132 |                                 latest_block_slot=latest_block_slot, | 
| 161 |  | -                                status=component.latest_price_info.price_status, | 
| 162 |  | -                                aggregate_status=price_account.aggregate_price_status, | 
|  | 133 | +                                latest_trading_slot=price_account.last_slot, | 
|  | 134 | +                                price_aggregate=price_account.aggregate_price_info.price, | 
|  | 135 | +                                confidence_interval_aggregate=price_account.aggregate_price_info.confidence_interval, | 
|  | 136 | +                                coingecko_price=coingecko_prices.get( | 
|  | 137 | +                                    product.attrs["base"] | 
|  | 138 | +                                ), | 
|  | 139 | +                                coingecko_update=coingecko_updates.get( | 
|  | 140 | +                                    product.attrs["base"] | 
|  | 141 | +                                ), | 
|  | 142 | +                                crosschain_price=crosschain_price, | 
| 163 | 143 |                             ) | 
| 164 | 144 |                         ) | 
| 165 | 145 | 
 | 
| 166 |  | -                await self.dispatch.run(states) | 
|  | 146 | +                        for component in price_account.price_components: | 
|  | 147 | +                            pub = self.publishers.get(component.publisher_key.key, None) | 
|  | 148 | +                            publisher_name = ( | 
|  | 149 | +                                (pub.name if pub else "") | 
|  | 150 | +                                + f" ({component.publisher_key.key})" | 
|  | 151 | +                            ).strip() | 
|  | 152 | +                            states.append( | 
|  | 153 | +                                PublisherState( | 
|  | 154 | +                                    publisher_name=publisher_name, | 
|  | 155 | +                                    symbol=product.attrs["symbol"], | 
|  | 156 | +                                    asset_type=product.attrs["asset_type"], | 
|  | 157 | +                                    schedule=MarketSchedule(product.attrs["schedule"]), | 
|  | 158 | +                                    public_key=component.publisher_key, | 
|  | 159 | +                                    confidence_interval=component.latest_price_info.confidence_interval, | 
|  | 160 | +                                    confidence_interval_aggregate=price_account.aggregate_price_info.confidence_interval, | 
|  | 161 | +                                    price=component.latest_price_info.price, | 
|  | 162 | +                                    price_aggregate=price_account.aggregate_price_info.price, | 
|  | 163 | +                                    slot=component.latest_price_info.pub_slot, | 
|  | 164 | +                                    aggregate_slot=price_account.last_slot, | 
|  | 165 | +                                    # this is the solana block slot when price account was fetched | 
|  | 166 | +                                    latest_block_slot=latest_block_slot, | 
|  | 167 | +                                    status=component.latest_price_info.price_status, | 
|  | 168 | +                                    aggregate_status=price_account.aggregate_price_status, | 
|  | 169 | +                                ) | 
|  | 170 | +                            ) | 
|  | 171 | + | 
|  | 172 | +                    await self.dispatch.run(states) | 
|  | 173 | +            except Exception as e: | 
|  | 174 | +                logger.error(f"Error in run loop: {e}") | 
|  | 175 | +                health_server.observer_ready = False | 
| 167 | 176 | 
 | 
| 168 | 177 |             logger.debug("Sleeping...") | 
| 169 | 178 |             await asyncio.sleep(5) | 
|  | 
0 commit comments