From ee30d75ee439ca01ab385ab00ea42cda44fb0870 Mon Sep 17 00:00:00 2001 From: PatrickNercessian Date: Thu, 16 May 2024 22:20:41 +0200 Subject: [PATCH 01/13] Poll Transfer events, populate daily_fil table, reformat endpoint handling --- observer/bin/spark-observer.js | 2 + observer/lib/config.js | 22 + stats/lib/abi.json | 763 ++++++++++++++++++++ stats/lib/handler.js | 43 +- stats/lib/platform-routes.js | 31 +- stats/lib/platform-stats-fetchers.js | 11 + stats/lib/platform-stats-generator.js | 14 + stats/test/platform-routes.test.js | 46 ++ stats/test/platform-stats-generator.test.js | 73 ++ 9 files changed, 957 insertions(+), 48 deletions(-) create mode 100644 stats/lib/abi.json create mode 100644 stats/lib/platform-stats-generator.js create mode 100644 stats/test/platform-stats-generator.test.js diff --git a/observer/bin/spark-observer.js b/observer/bin/spark-observer.js index 59c27e72..33e493f2 100644 --- a/observer/bin/spark-observer.js +++ b/observer/bin/spark-observer.js @@ -67,4 +67,6 @@ ieContract.on('Transfer', (to, amount, ...args) => { const blockNumber = args.pop() console.log('Transfer %s FIL to %s at epoch %s', amount, to, blockNumber) // TODO: update the database + const transferEvent = { to_address: to, amount } + updateDailyFilStats(pgPool, transferEvent) }) diff --git a/observer/lib/config.js b/observer/lib/config.js index d8aa67de..a138da86 100644 --- a/observer/lib/config.js +++ b/observer/lib/config.js @@ -1,4 +1,26 @@ export const { + IE_CONTRACT_ADDRESS = '0x8460766Edc62B525fc1FA4D628FC79229dC73031', + // FIXME Add back chain.love either when it's online or once onContractEvent + // supports rpc failover + // RPC_URLS = 'https://api.node.glif.io/rpc/v0,https://api.chain.love/rpc/v1', + RPC_URLS = 'https://api.node.glif.io/rpc/v0', + GLIF_TOKEN, // DATABASE_URL points to `spark_stats` database managed by this monorepo DATABASE_URL = 'postgres://localhost:5432/spark_stats' } = process.env + +const rpcUrls = RPC_URLS.split(',') +const RPC_URL = rpcUrls[Math.floor(Math.random() * rpcUrls.length)] +console.log(`Selected JSON-RPC endpoint ${RPC_URL}`) + +const rpcHeaders = {} +if (RPC_URL.includes('glif')) { + rpcHeaders.Authorization = `Bearer ${GLIF_TOKEN}` +} + +export { + IE_CONTRACT_ADDRESS, + RPC_URL, + DATABASE_URL, + rpcHeaders +} diff --git a/stats/lib/abi.json b/stats/lib/abi.json new file mode 100644 index 00000000..851fee03 --- /dev/null +++ b/stats/lib/abi.json @@ -0,0 +1,763 @@ +[ + { + "type": "constructor", + "inputs": [ + { + "internalType": "address", + "name": "admin", + "type": "address" + } + ] + }, + { + "type": "function", + "name": "DEFAULT_ADMIN_ROLE", + "inputs": [], + "outputs": [ + { + "internalType": "bytes32", + "name": "", + "type": "bytes32" + } + ], + "stateMutability": "view" + }, + { + "type": "function", + "name": "EVALUATE_ROLE", + "inputs": [], + "outputs": [ + { + "internalType": "bytes32", + "name": "", + "type": "bytes32" + } + ], + "stateMutability": "view" + }, + { + "type": "function", + "name": "MAX_SCORE", + "inputs": [], + "outputs": [ + { + "internalType": "uint256", + "name": "", + "type": "uint256" + } + ], + "stateMutability": "view" + }, + { + "type": "function", + "name": "MEASURE_ROLE", + "inputs": [], + "outputs": [ + { + "internalType": "bytes32", + "name": "", + "type": "bytes32" + } + ], + "stateMutability": "view" + }, + { + "type": "function", + "name": "addBalances", + "inputs": [ + { + "internalType": "address payable[]", + "name": "addresses", + "type": "address[]" + }, + { + "internalType": "uint256[]", + "name": "_balances", + "type": "uint256[]" + } + ], + "outputs": [], + "stateMutability": "payable" + }, + { + "type": "function", + "name": "addMeasurements", + "inputs": [ + { + "internalType": "string", + "name": "cid", + "type": "string" + } + ], + "outputs": [ + { + "internalType": "uint256", + "name": "", + "type": "uint256" + } + ], + "stateMutability": "nonpayable" + }, + { + "type": "function", + "name": "adminAdvanceRound", + "inputs": [], + "outputs": [], + "stateMutability": "nonpayable" + }, + { + "type": "function", + "name": "availableBalance", + "inputs": [], + "outputs": [ + { + "internalType": "uint256", + "name": "", + "type": "uint256" + } + ], + "stateMutability": "view" + }, + { + "type": "function", + "name": "balanceHeld", + "inputs": [], + "outputs": [ + { + "internalType": "uint256", + "name": "", + "type": "uint256" + } + ], + "stateMutability": "view" + }, + { + "type": "function", + "name": "balances", + "inputs": [ + { + "internalType": "address", + "name": "", + "type": "address" + } + ], + "outputs": [ + { + "internalType": "uint256", + "name": "", + "type": "uint256" + } + ], + "stateMutability": "view" + }, + { + "type": "function", + "name": "currentRoundEndBlockNumber", + "inputs": [], + "outputs": [ + { + "internalType": "uint256", + "name": "", + "type": "uint256" + } + ], + "stateMutability": "view" + }, + { + "type": "function", + "name": "currentRoundIndex", + "inputs": [], + "outputs": [ + { + "internalType": "uint256", + "name": "", + "type": "uint256" + } + ], + "stateMutability": "view" + }, + { + "type": "function", + "name": "currentRoundRoundReward", + "inputs": [], + "outputs": [ + { + "internalType": "uint256", + "name": "", + "type": "uint256" + } + ], + "stateMutability": "view" + }, + { + "type": "function", + "name": "disableWithdraw", + "inputs": [], + "outputs": [], + "stateMutability": "nonpayable" + }, + { + "type": "function", + "name": "getRoleAdmin", + "inputs": [ + { + "internalType": "bytes32", + "name": "role", + "type": "bytes32" + } + ], + "outputs": [ + { + "internalType": "bytes32", + "name": "", + "type": "bytes32" + } + ], + "stateMutability": "view" + }, + { + "type": "function", + "name": "grantRole", + "inputs": [ + { + "internalType": "bytes32", + "name": "role", + "type": "bytes32" + }, + { + "internalType": "address", + "name": "account", + "type": "address" + } + ], + "outputs": [], + "stateMutability": "nonpayable" + }, + { + "type": "function", + "name": "hasRole", + "inputs": [ + { + "internalType": "bytes32", + "name": "role", + "type": "bytes32" + }, + { + "internalType": "address", + "name": "account", + "type": "address" + } + ], + "outputs": [ + { + "internalType": "bool", + "name": "", + "type": "bool" + } + ], + "stateMutability": "view" + }, + { + "type": "function", + "name": "maxTransfersPerTx", + "inputs": [], + "outputs": [ + { + "internalType": "uint256", + "name": "", + "type": "uint256" + } + ], + "stateMutability": "view" + }, + { + "type": "function", + "name": "minBalanceForTransfer", + "inputs": [], + "outputs": [ + { + "internalType": "uint256", + "name": "", + "type": "uint256" + } + ], + "stateMutability": "view" + }, + { + "type": "function", + "name": "nextRoundLength", + "inputs": [], + "outputs": [ + { + "internalType": "uint256", + "name": "", + "type": "uint256" + } + ], + "stateMutability": "view" + }, + { + "type": "function", + "name": "participantCountReadyForTransfer", + "inputs": [], + "outputs": [ + { + "internalType": "uint256", + "name": "", + "type": "uint256" + } + ], + "stateMutability": "view" + }, + { + "type": "function", + "name": "participantCountScheduledForTransfer", + "inputs": [], + "outputs": [ + { + "internalType": "uint256", + "name": "", + "type": "uint256" + } + ], + "stateMutability": "view" + }, + { + "type": "function", + "name": "previousRoundIndex", + "inputs": [], + "outputs": [ + { + "internalType": "uint256", + "name": "", + "type": "uint256" + } + ], + "stateMutability": "view" + }, + { + "type": "function", + "name": "previousRoundRoundReward", + "inputs": [], + "outputs": [ + { + "internalType": "uint256", + "name": "", + "type": "uint256" + } + ], + "stateMutability": "view" + }, + { + "type": "function", + "name": "previousRoundTotalScores", + "inputs": [], + "outputs": [ + { + "internalType": "uint256", + "name": "", + "type": "uint256" + } + ], + "stateMutability": "view" + }, + { + "type": "function", + "name": "readyForTransfer", + "inputs": [ + { + "internalType": "uint256", + "name": "", + "type": "uint256" + } + ], + "outputs": [ + { + "internalType": "address payable", + "name": "", + "type": "address" + } + ], + "stateMutability": "view" + }, + { + "type": "function", + "name": "releaseRewards", + "inputs": [], + "outputs": [], + "stateMutability": "nonpayable" + }, + { + "type": "function", + "name": "renounceRole", + "inputs": [ + { + "internalType": "bytes32", + "name": "role", + "type": "bytes32" + }, + { + "internalType": "address", + "name": "callerConfirmation", + "type": "address" + } + ], + "outputs": [], + "stateMutability": "nonpayable" + }, + { + "type": "function", + "name": "revokeRole", + "inputs": [ + { + "internalType": "bytes32", + "name": "role", + "type": "bytes32" + }, + { + "internalType": "address", + "name": "account", + "type": "address" + } + ], + "outputs": [], + "stateMutability": "nonpayable" + }, + { + "type": "function", + "name": "rewardsScheduledFor", + "inputs": [ + { + "internalType": "address", + "name": "participant", + "type": "address" + } + ], + "outputs": [ + { + "internalType": "uint256", + "name": "", + "type": "uint256" + } + ], + "stateMutability": "view" + }, + { + "type": "function", + "name": "roundReward", + "inputs": [], + "outputs": [ + { + "internalType": "uint256", + "name": "", + "type": "uint256" + } + ], + "stateMutability": "view" + }, + { + "type": "function", + "name": "scheduledForTransfer", + "inputs": [ + { + "internalType": "uint256", + "name": "", + "type": "uint256" + } + ], + "outputs": [ + { + "internalType": "address payable", + "name": "", + "type": "address" + } + ], + "stateMutability": "view" + }, + { + "type": "function", + "name": "setMaxTransfersPerTx", + "inputs": [ + { + "internalType": "uint256", + "name": "_maxTransfersPerTx", + "type": "uint256" + } + ], + "outputs": [], + "stateMutability": "nonpayable" + }, + { + "type": "function", + "name": "setMinBalanceForTransfer", + "inputs": [ + { + "internalType": "uint256", + "name": "_minBalanceForTransfer", + "type": "uint256" + } + ], + "outputs": [], + "stateMutability": "nonpayable" + }, + { + "type": "function", + "name": "setNextRoundLength", + "inputs": [ + { + "internalType": "uint256", + "name": "_nextRoundLength", + "type": "uint256" + } + ], + "outputs": [], + "stateMutability": "nonpayable" + }, + { + "type": "function", + "name": "setRoundReward", + "inputs": [ + { + "internalType": "uint256", + "name": "_roundReward", + "type": "uint256" + } + ], + "outputs": [], + "stateMutability": "nonpayable" + }, + { + "type": "function", + "name": "setScores", + "inputs": [ + { + "internalType": "uint256", + "name": "roundIndex", + "type": "uint256" + }, + { + "internalType": "address payable[]", + "name": "addresses", + "type": "address[]" + }, + { + "internalType": "uint256[]", + "name": "scores", + "type": "uint256[]" + } + ], + "outputs": [], + "stateMutability": "nonpayable" + }, + { + "type": "function", + "name": "supportsInterface", + "inputs": [ + { + "internalType": "bytes4", + "name": "interfaceId", + "type": "bytes4" + } + ], + "outputs": [ + { + "internalType": "bool", + "name": "", + "type": "bool" + } + ], + "stateMutability": "view" + }, + { + "type": "function", + "name": "tick", + "inputs": [], + "outputs": [], + "stateMutability": "nonpayable" + }, + { + "type": "function", + "name": "withdraw", + "inputs": [ + { + "internalType": "address payable", + "name": "destination", + "type": "address" + } + ], + "outputs": [], + "stateMutability": "nonpayable" + }, + { + "type": "function", + "name": "withdrawDisabled", + "inputs": [], + "outputs": [ + { + "internalType": "bool", + "name": "", + "type": "bool" + } + ], + "stateMutability": "view" + }, + { + "type": "event", + "name": "MeasurementsAdded", + "inputs": [ + { + "name": "cid", + "type": "string", + "indexed": false + }, + { + "name": "roundIndex", + "type": "uint256", + "indexed": true + }, + { + "name": "sender", + "type": "address", + "indexed": true + } + ], + "anonymous": false + }, + { + "type": "event", + "name": "RoleAdminChanged", + "inputs": [ + { + "name": "role", + "type": "bytes32", + "indexed": true + }, + { + "name": "previousAdminRole", + "type": "bytes32", + "indexed": true + }, + { + "name": "newAdminRole", + "type": "bytes32", + "indexed": true + } + ], + "anonymous": false + }, + { + "type": "event", + "name": "RoleGranted", + "inputs": [ + { + "name": "role", + "type": "bytes32", + "indexed": true + }, + { + "name": "account", + "type": "address", + "indexed": true + }, + { + "name": "sender", + "type": "address", + "indexed": true + } + ], + "anonymous": false + }, + { + "type": "event", + "name": "RoleRevoked", + "inputs": [ + { + "name": "role", + "type": "bytes32", + "indexed": true + }, + { + "name": "account", + "type": "address", + "indexed": true + }, + { + "name": "sender", + "type": "address", + "indexed": true + } + ], + "anonymous": false + }, + { + "type": "event", + "name": "RoundStart", + "inputs": [ + { + "name": "roundIndex", + "type": "uint256", + "indexed": false + } + ], + "anonymous": false + }, + { + "type": "event", + "name": "Transfer", + "inputs": [ + { + "name": "to", + "type": "address", + "indexed": true + }, + { + "name": "amount", + "type": "uint256", + "indexed": false + } + ], + "anonymous": false + }, + { + "type": "event", + "name": "TransferFailed", + "inputs": [ + { + "name": "to", + "type": "address", + "indexed": true + }, + { + "name": "amount", + "type": "uint256", + "indexed": false + } + ], + "anonymous": false + }, + { + "type": "error", + "name": "AccessControlBadConfirmation", + "inputs": [] + }, + { + "type": "error", + "name": "AccessControlUnauthorizedAccount", + "inputs": [ + { + "internalType": "address", + "name": "account", + "type": "address" + }, + { + "internalType": "bytes32", + "name": "neededRole", + "type": "bytes32" + } + ] + }, + { + "type": "receive" + } + ] + \ No newline at end of file diff --git a/stats/lib/handler.js b/stats/lib/handler.js index 16a15151..0865da91 100644 --- a/stats/lib/handler.js +++ b/stats/lib/handler.js @@ -42,41 +42,24 @@ const handler = async (req, res, pgPool) => { // Caveat! `new URL('//foo', 'http://127.0.0.1')` would produce "http://foo/" - not what we want! const { pathname, searchParams } = new URL(`http://127.0.0.1${req.url}`) const segs = pathname.split('/').filter(Boolean) - if (req.method === 'GET' && segs[0] === 'retrieval-success-rate' && segs.length === 1) { - await getStatsWithFilterAndCaching( - pathname, - searchParams, - res, - pgPool, - fetchRetrievalSuccessRate) - } else if (req.method === 'GET' && segs[0] === 'participants' && segs[1] === 'daily' && segs.length === 2) { - await getStatsWithFilterAndCaching( - pathname, - searchParams, - res, - pgPool, - fetchDailyParticipants) - } else if (req.method === 'GET' && segs[0] === 'participants' && segs[1] === 'monthly' && segs.length === 2) { - await getStatsWithFilterAndCaching( - pathname, - searchParams, - res, - pgPool, - fetchMonthlyParticipants) - } else if (req.method === 'GET' && segs.join('/') === 'participants/change-rates') { - await getStatsWithFilterAndCaching( - pathname, - searchParams, - res, - pgPool, - fetchParticipantChangeRates) - } else if (req.method === 'GET' && segs.join('/') === 'miners/retrieval-success-rate/summary') { + + const fetchFunctionMap = { + 'retrieval-success-rate': fetchRetrievalSuccessRate, + 'participants/daily': fetchDailyParticipants, + 'participants/monthly': fetchMonthlyParticipants, + 'participants/change-rates': fetchParticipantChangeRates, + 'miners/retrieval-success-rate/summary': fetchMinersRSRSummary + } + + const fetchStatsFn = fetchFunctionMap[segs.join('/')] + if (req.method === 'GET' && fetchStatsFn) { await getStatsWithFilterAndCaching( pathname, searchParams, res, pgPool, - fetchMinersRSRSummary) + fetchStatsFn + ) } else if (await handlePlatformRoutes(req, res, pgPool)) { // no-op, request was handled by handlePlatformRoute } else { diff --git a/stats/lib/platform-routes.js b/stats/lib/platform-routes.js index 36ec1441..23fb38c1 100644 --- a/stats/lib/platform-routes.js +++ b/stats/lib/platform-routes.js @@ -2,6 +2,7 @@ import { getStatsWithFilterAndCaching } from './request-helpers.js' import { fetchDailyStationCount, fetchMonthlyStationCount, + fetchDailyFilSent, fetchDailyStationAcceptedMeasurementCount } from './platform-stats-fetchers.js' @@ -9,29 +10,23 @@ export const handlePlatformRoutes = async (req, res, pgPool) => { // Caveat! `new URL('//foo', 'http://127.0.0.1')` would produce "http://foo/" - not what we want! const { pathname, searchParams } = new URL(`http://127.0.0.1${req.url}`) const segs = pathname.split('/').filter(Boolean) - if (req.method === 'GET' && segs[0] === 'stations' && segs[1] === 'daily' && segs.length === 2) { - await getStatsWithFilterAndCaching( - pathname, - searchParams, - res, - pgPool, - fetchDailyStationCount) - return true - } else if (req.method === 'GET' && segs[0] === 'stations' && segs[1] === 'monthly' && segs.length === 2) { - await getStatsWithFilterAndCaching( - pathname, - searchParams, - res, - pgPool, - fetchMonthlyStationCount) - return true - } else if (req.method === 'GET' && segs[0] === 'measurements' && segs[1] === 'daily' && segs.length === 2) { + + const fetchFunctionMap = { + 'stations/daily': fetchDailyStationCount, + 'stations/monthly': fetchMonthlyStationCount, + 'measurements/daily': fetchDailyStationAcceptedMeasurementCount, + 'fil/daily': fetchDailyFilSent + } + + const fetchStatsFn = fetchFunctionMap[segs.join('/')] + if (req.method === 'GET' && fetchStatsFn) { await getStatsWithFilterAndCaching( pathname, searchParams, res, pgPool, - fetchDailyStationAcceptedMeasurementCount) + fetchStatsFn + ) return true } else if (req.method === 'GET' && segs.length === 0) { // health check - required by Grafana datasources diff --git a/stats/lib/platform-stats-fetchers.js b/stats/lib/platform-stats-fetchers.js index edb29e96..68eda328 100644 --- a/stats/lib/platform-stats-fetchers.js +++ b/stats/lib/platform-stats-fetchers.js @@ -40,3 +40,14 @@ export const fetchDailyStationAcceptedMeasurementCount = async (pgPool, filter) `, [filter.from, filter.to]) return rows } + +export const fetchDailyFilSent = async (pgPool, filter) => { + const { rows } = await pgPool.query(` + SELECT day::TEXT, SUM(amount) as amount + FROM daily_fil + WHERE day >= $1 AND day <= $2 + GROUP BY day + ORDER BY day + `, [filter.from, filter.to]) + return rows +} diff --git a/stats/lib/platform-stats-generator.js b/stats/lib/platform-stats-generator.js new file mode 100644 index 00000000..9e8fad7c --- /dev/null +++ b/stats/lib/platform-stats-generator.js @@ -0,0 +1,14 @@ +/** + * @param {import('pg').Client} pgClient + * @param {Object} filEvent + */ +export const updateDailyFilStats = async (pgClient, filEvent) => { + console.log('Event:', filEvent) + + await pgClient.query(` + INSERT INTO daily_fil (day, to_address, amount) + VALUES (now(), $1, $2) + ON CONFLICT (day, to_address) DO UPDATE + SET amount = daily_fil.amount + EXCLUDED.amount + `, [filEvent.to_address, filEvent.amount]) +} diff --git a/stats/test/platform-routes.test.js b/stats/test/platform-routes.test.js index c9fe5504..d37928eb 100644 --- a/stats/test/platform-routes.test.js +++ b/stats/test/platform-routes.test.js @@ -44,6 +44,7 @@ describe('Platform Routes HTTP request handler', () => { beforeEach(async () => { await pgPool.query('DELETE FROM daily_stations') + await pgPool.query('DELETE FROM daily_fil') }) describe('GET /stations/daily', () => { @@ -153,6 +154,39 @@ describe('Platform Routes HTTP request handler', () => { ]) }) }) + + describe('GET /fil/daily', () => { + it('returns daily total FIL sent for the given date range', async () => { + await givenDailyFilMetrics(pgPool, '2024-01-10', [ + { to_address: 'to1', amount: 100 } + ]) + await givenDailyFilMetrics(pgPool, '2024-01-11', [ + { to_address: 'to2', amount: 150 } + ]) + await givenDailyFilMetrics(pgPool, '2024-01-12', [ + { to_address: 'to2', amount: 300 }, + { to_address: 'to3', amount: 250 } + ]) + await givenDailyFilMetrics(pgPool, '2024-01-13', [ + { to_address: 'to1', amount: 100 } + ]) + + const res = await fetch( + new URL( + '/fil/daily?from=2024-01-11&to=2024-01-12', + baseUrl + ), { + redirect: 'manual' + } + ) + await assertResponseStatus(res, 200) + const metrics = await res.json() + assert.deepStrictEqual(metrics, [ + { day: '2024-01-11', amount: '150' }, + { day: '2024-01-12', amount: '550' } + ]) + }) + }) }) const givenDailyStationMetrics = async (pgPool, day, stationStats) => { @@ -166,3 +200,15 @@ const givenDailyStationMetrics = async (pgPool, day, stationStats) => { stationStats.map(s => s.accepted_measurement_count) ]) } + +const givenDailyFilMetrics = async (pgPool, day, filStats) => { + await pgPool.query(` + INSERT INTO daily_fil (day, to_address, amount) + SELECT $1 AS day, UNNEST($2::text[]) AS to_address, UNNEST($3::int[]) AS amount + ON CONFLICT DO NOTHING + `, [ + day, + filStats.map(s => s.to_address), + filStats.map(s => s.amount) + ]) +} diff --git a/stats/test/platform-stats-generator.test.js b/stats/test/platform-stats-generator.test.js new file mode 100644 index 00000000..2734d900 --- /dev/null +++ b/stats/test/platform-stats-generator.test.js @@ -0,0 +1,73 @@ +import assert from 'node:assert' +import pg from 'pg' +import { beforeEach, describe, it } from 'mocha' + +import { DATABASE_URL } from '../lib/config.js' +import { updateDailyFilStats } from '../lib/platform-stats-generator.js' + +const createPgClient = async () => { + const pgClient = new pg.Client({ connectionString: DATABASE_URL }) + await pgClient.connect() + return pgClient +} + +describe('platform-stats-generator', () => { + let pgClient + before(async () => { + pgClient = await createPgClient() + }) + + let today + beforeEach(async () => { + await pgClient.query('DELETE FROM daily_fil') + + // Run all tests inside a transaction to ensure `now()` always returns the same value + // See https://dba.stackexchange.com/a/63549/125312 + // This avoids subtle race conditions when the tests are executed around midnight. + await pgClient.query('BEGIN TRANSACTION') + today = await getCurrentDate() + }) + + afterEach(async () => { + await pgClient.query('END TRANSACTION') + }) + + after(async () => { + await pgClient.end() + }) + + describe('updateDailyFilStats', () => { + it('should correctly update daily FIL stats with new transfer events', async () => { + await updateDailyFilStats(pgClient, { to_address: 'address1', amount: 100 }) + await updateDailyFilStats(pgClient, { to_address: 'address1', amount: 200 }) + + const { rows } = await pgClient.query(` + SELECT day::TEXT, to_address, amount FROM daily_fil + WHERE to_address = $1 + `, ['address1']) + assert.strictEqual(rows.length, 1) + assert.deepStrictEqual(rows, [{ day: today, to_address: 'address1', amount: '300' }]) + }) + + it('should handle multiple addresses in daily FIL stats', async () => { + await updateDailyFilStats(pgClient, { to_address: 'address1', amount: 50 }) + await updateDailyFilStats(pgClient, { to_address: 'address2', amount: 150 }) + + const { rows } = await pgClient.query(` + SELECT day::TEXT, to_address, amount FROM daily_fil + ORDER BY to_address + `) + assert.strictEqual(rows.length, 2) + + assert.deepStrictEqual(rows, [ + { day: today, to_address: 'address1', amount: '50' }, + { day: today, to_address: 'address2', amount: '150' } + ]) + }) + }) + + const getCurrentDate = async () => { + const { rows: [{ today }] } = await pgClient.query('SELECT now()::DATE::TEXT as today') + return today + } +}) From f60d5b2723fb1ac7a3f3a62edf88e8ec700c92e1 Mon Sep 17 00:00:00 2001 From: PatrickNercessian Date: Thu, 30 May 2024 12:00:12 -0400 Subject: [PATCH 02/13] incomplete: use queryFilter for Event tracking history --- observer/bin/spark-observer.js | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/observer/bin/spark-observer.js b/observer/bin/spark-observer.js index 33e493f2..857b700a 100644 --- a/observer/bin/spark-observer.js +++ b/observer/bin/spark-observer.js @@ -62,11 +62,17 @@ await pgPool.query('SELECT 1') console.log('Listening for impact evaluator events') -ieContract.on('Transfer', (to, amount, ...args) => { - /** @type {number} */ - const blockNumber = args.pop() - console.log('Transfer %s FIL to %s at epoch %s', amount, to, blockNumber) - // TODO: update the database +// Listen for Transfer events from the IE contract +const onTransfer = async (to, amount, blockNumber) => { + // TODO update in database const transferEvent = { to_address: to, amount } updateDailyFilStats(pgPool, transferEvent) -}) +} +const lastCheckedBlock = 0 // TODO: get this from the database +ieContract.queryFilter(ieContract.filters.Transfer(), lastCheckedBlock) + .then(events => { + for (const event of events) { + console.log('Transfer %s FIL to %s at epoch %s', event.args.value, event.args.to, event.blockNumber) + onTransfer(event.args.to, event.args.value, event.blockNumber) + } +}) \ No newline at end of file From eb2a5c280dc332bb08077258f76049d3486ee28c Mon Sep 17 00:00:00 2001 From: PatrickNercessian Date: Thu, 30 May 2024 15:10:04 -0400 Subject: [PATCH 03/13] Merge with new repo structure, ensure total block history coverage --- migrations/001.do.daily-reward-transfers.sql | 11 + migrations/001.do.sql | 1 - observer/bin/spark-observer.js | 42 +- observer/lib/config.js | 4 +- observer/lib/platform-stats-generator.js | 23 + .../test/platform-stats-generator.test.js | 23 +- package-lock.json | 1 + stats/bin/spark-stats.js | 30 +- stats/lib/abi.json | 763 ------------------ stats/lib/handler.js | 17 +- stats/lib/platform-routes.js | 32 +- stats/lib/platform-stats-fetchers.js | 2 +- stats/lib/platform-stats-generator.js | 14 - stats/test/handler.test.js | 4 +- stats/test/platform-routes.test.js | 65 +- 15 files changed, 162 insertions(+), 870 deletions(-) create mode 100644 migrations/001.do.daily-reward-transfers.sql delete mode 100644 migrations/001.do.sql create mode 100644 observer/lib/platform-stats-generator.js rename {stats => observer}/test/platform-stats-generator.test.js (72%) delete mode 100644 stats/lib/abi.json delete mode 100644 stats/lib/platform-stats-generator.js diff --git a/migrations/001.do.daily-reward-transfers.sql b/migrations/001.do.daily-reward-transfers.sql new file mode 100644 index 00000000..8f1dc1b0 --- /dev/null +++ b/migrations/001.do.daily-reward-transfers.sql @@ -0,0 +1,11 @@ +CREATE TABLE daily_reward_transfers ( + day DATE NOT NULL, + to_address TEXT NOT NULL, + amount BIGINT NOT NULL, + PRIMARY KEY (day, to_address) +); + +CREATE TABLE reward_transfer_last_block ( + last_block INTEGER NOT NULL +); +INSERT INTO reward_transfer_last_block (last_block) VALUES (0); diff --git a/migrations/001.do.sql b/migrations/001.do.sql deleted file mode 100644 index 2e607d07..00000000 --- a/migrations/001.do.sql +++ /dev/null @@ -1 +0,0 @@ -SELECT now(); diff --git a/observer/bin/spark-observer.js b/observer/bin/spark-observer.js index 857b700a..3383ffad 100644 --- a/observer/bin/spark-observer.js +++ b/observer/bin/spark-observer.js @@ -2,22 +2,8 @@ import * as SparkImpactEvaluator from '@filecoin-station/spark-impact-evaluator' import { migrateWithPgClient } from '@filecoin-station/spark-stats-db-migrations' import { ethers } from 'ethers' import pg from 'pg' - -// TODO: move this to a config.js file -const { - DATABASE_URL = 'postgres://localhost:5432/spark_stats', - RPC_URLS = 'https://api.node.glif.io/rpc/v0', - GLIF_TOKEN -} = process.env - -const rpcUrls = RPC_URLS.split(',') -const RPC_URL = rpcUrls[Math.floor(Math.random() * rpcUrls.length)] -console.log(`Selected JSON-RPC endpoint ${RPC_URL}`) - -const rpcHeaders = {} -if (RPC_URL.includes('glif')) { - rpcHeaders.Authorization = `Bearer ${GLIF_TOKEN}` -} +import { RPC_URL, DATABASE_URL, rpcHeaders } from '../lib/config.js' +import { updateDailyFilStats } from '../lib/platform-stats-generator.js' // TODO: move this to a different file const fetchRequest = new ethers.FetchRequest(RPC_URL) @@ -62,17 +48,23 @@ await pgPool.query('SELECT 1') console.log('Listening for impact evaluator events') +// Get the last block we checked. Even though there should be only one row, use MAX just to be safe +const lastCheckedBlock = await pgPool.query( + 'SELECT MAX(last_block) AS last_block FROM reward_transfer_last_block' +).then(res => res.rows[0].last_block) + // Listen for Transfer events from the IE contract -const onTransfer = async (to, amount, blockNumber) => { - // TODO update in database - const transferEvent = { to_address: to, amount } - updateDailyFilStats(pgPool, transferEvent) -} -const lastCheckedBlock = 0 // TODO: get this from the database ieContract.queryFilter(ieContract.filters.Transfer(), lastCheckedBlock) .then(events => { for (const event of events) { - console.log('Transfer %s FIL to %s at epoch %s', event.args.value, event.args.to, event.blockNumber) - onTransfer(event.args.to, event.args.value, event.blockNumber) + console.log('%s FIL to %s at block %s', event.args.amount, event.args.to, event.blockNumber) + updateDailyFilStats( + pgPool, + { + to_address: event.args.to, + amount: event.args.amount, + blockNumber: event.blockNumber + } + ) } -}) \ No newline at end of file + }) diff --git a/observer/lib/config.js b/observer/lib/config.js index a138da86..89c1fde6 100644 --- a/observer/lib/config.js +++ b/observer/lib/config.js @@ -1,5 +1,4 @@ -export const { - IE_CONTRACT_ADDRESS = '0x8460766Edc62B525fc1FA4D628FC79229dC73031', +const { // FIXME Add back chain.love either when it's online or once onContractEvent // supports rpc failover // RPC_URLS = 'https://api.node.glif.io/rpc/v0,https://api.chain.love/rpc/v1', @@ -19,7 +18,6 @@ if (RPC_URL.includes('glif')) { } export { - IE_CONTRACT_ADDRESS, RPC_URL, DATABASE_URL, rpcHeaders diff --git a/observer/lib/platform-stats-generator.js b/observer/lib/platform-stats-generator.js new file mode 100644 index 00000000..d70e5cbe --- /dev/null +++ b/observer/lib/platform-stats-generator.js @@ -0,0 +1,23 @@ +/** + * @param {import('pg').Client} pgClient + * @param {Object} filEvent + * @param {string} filEvent.to_address + * @param {number} filEvent.amount + * @param {number} filEvent.blockNumber + */ +export const updateDailyFilStats = async (pgClient, filEvent) => { + await pgClient.query(` + INSERT INTO daily_reward_transfers (day, to_address, amount) + VALUES (now(), $1, $2) + ON CONFLICT (day, to_address) DO UPDATE + SET amount = daily_reward_transfers.amount + EXCLUDED.amount + `, [filEvent.to_address, filEvent.amount]) + + // Update the last_block in reward_transfer_last_block table + // For safety, only update if the new block number is greater than the existing one + await pgClient.query(` + UPDATE reward_transfer_last_block + SET last_block = $1 + WHERE $1 > last_block + `, [filEvent.blockNumber]) +} diff --git a/stats/test/platform-stats-generator.test.js b/observer/test/platform-stats-generator.test.js similarity index 72% rename from stats/test/platform-stats-generator.test.js rename to observer/test/platform-stats-generator.test.js index 2734d900..ea8f8531 100644 --- a/stats/test/platform-stats-generator.test.js +++ b/observer/test/platform-stats-generator.test.js @@ -19,7 +19,7 @@ describe('platform-stats-generator', () => { let today beforeEach(async () => { - await pgClient.query('DELETE FROM daily_fil') + await pgClient.query('DELETE FROM daily_reward_transfers') // Run all tests inside a transaction to ensure `now()` always returns the same value // See https://dba.stackexchange.com/a/63549/125312 @@ -38,11 +38,11 @@ describe('platform-stats-generator', () => { describe('updateDailyFilStats', () => { it('should correctly update daily FIL stats with new transfer events', async () => { - await updateDailyFilStats(pgClient, { to_address: 'address1', amount: 100 }) - await updateDailyFilStats(pgClient, { to_address: 'address1', amount: 200 }) + await updateDailyFilStats(pgClient, { to_address: 'address1', amount: 100, blockNumber: 1 }) + await updateDailyFilStats(pgClient, { to_address: 'address1', amount: 200, blockNumber: 1 }) const { rows } = await pgClient.query(` - SELECT day::TEXT, to_address, amount FROM daily_fil + SELECT day::TEXT, to_address, amount FROM daily_reward_transfers WHERE to_address = $1 `, ['address1']) assert.strictEqual(rows.length, 1) @@ -50,11 +50,11 @@ describe('platform-stats-generator', () => { }) it('should handle multiple addresses in daily FIL stats', async () => { - await updateDailyFilStats(pgClient, { to_address: 'address1', amount: 50 }) - await updateDailyFilStats(pgClient, { to_address: 'address2', amount: 150 }) + await updateDailyFilStats(pgClient, { to_address: 'address1', amount: 50, blockNumber: 1 }) + await updateDailyFilStats(pgClient, { to_address: 'address2', amount: 150, blockNumber: 1 }) const { rows } = await pgClient.query(` - SELECT day::TEXT, to_address, amount FROM daily_fil + SELECT day::TEXT, to_address, amount FROM daily_reward_transfers ORDER BY to_address `) assert.strictEqual(rows.length, 2) @@ -64,6 +64,15 @@ describe('platform-stats-generator', () => { { day: today, to_address: 'address2', amount: '150' } ]) }) + + it('should update the last block number', async () => { + await updateDailyFilStats(pgClient, { to_address: 'address1', amount: 100, blockNumber: 1 }) + await updateDailyFilStats(pgClient, { to_address: 'address2', amount: 200, blockNumber: 2 }) + + const { rows } = await pgClient.query('SELECT last_block FROM reward_transfer_last_block') + assert.strictEqual(rows.length, 1) + assert.strictEqual(rows[0].last_block, 2) + }) }) const getCurrentDate = async () => { diff --git a/package-lock.json b/package-lock.json index 862cdc8e..dea64a54 100644 --- a/package-lock.json +++ b/package-lock.json @@ -6056,6 +6056,7 @@ "pg": "^8.11.5" }, "devDependencies": { + "@filecoin-station/spark-stats-db-migrations": "^1.0.0", "mocha": "^10.4.0", "spark-evaluate": "filecoin-station/spark-evaluate#main", "standard": "^17.1.0" diff --git a/stats/bin/spark-stats.js b/stats/bin/spark-stats.js index 5fa903db..f7f9b7e8 100644 --- a/stats/bin/spark-stats.js +++ b/stats/bin/spark-stats.js @@ -3,7 +3,7 @@ import http from 'node:http' import { once } from 'node:events' import pg from 'pg' import { createHandler } from '../lib/handler.js' -import { EVALUATE_DB_URL } from '../lib/config.js' +import { DATABASE_URL, EVALUATE_DB_URL } from '../lib/config.js' const { PORT = 8080, @@ -11,8 +11,7 @@ const { REQUEST_LOGGING = 'true' } = process.env -const pgPool = new pg.Pool({ - connectionString: EVALUATE_DB_URL, +const pgPoolConfig = { // allow the pool to close all connections and become empty min: 0, // this values should correlate with service concurrency hard_limit configured in fly.toml @@ -23,17 +22,31 @@ const pgPool = new pg.Pool({ idleTimeoutMillis: 1000, // automatically close connections older than 60 seconds maxLifetimeSeconds: 60 -}) - -pgPool.on('error', err => { +} +const pgPoolErrFn = err => { // Prevent crashing the process on idle client errors, the pool will recover // itself. If all connections are lost, the process will still crash. // https://github.com/brianc/node-postgres/issues/1324#issuecomment-308778405 console.error('An idle client has experienced an error', err.stack) +} + +// Connect and set up the Evaluate DB +const pgPoolEvaluateDb = new pg.Pool({ + connectionString: EVALUATE_DB_URL, + ...pgPoolConfig }) +pgPoolEvaluateDb.on('error', pgPoolErrFn) +// Check that we can talk to the database +await pgPoolEvaluateDb.query('SELECT 1') +// Connect and set up the Stats DB +const pgPoolStatsDb = new pg.Pool({ + connectionString: DATABASE_URL, + ...pgPoolConfig +}) +pgPoolStatsDb.on('error', pgPoolErrFn) // Check that we can talk to the database -await pgPool.query('SELECT 1') +await pgPoolStatsDb.query('SELECT 1') const logger = { error: console.error, @@ -42,7 +55,8 @@ const logger = { } const handler = createHandler({ - pgPool, + pgPoolEvaluateDb, + pgPoolStatsDb, logger }) const server = http.createServer(handler) diff --git a/stats/lib/abi.json b/stats/lib/abi.json deleted file mode 100644 index 851fee03..00000000 --- a/stats/lib/abi.json +++ /dev/null @@ -1,763 +0,0 @@ -[ - { - "type": "constructor", - "inputs": [ - { - "internalType": "address", - "name": "admin", - "type": "address" - } - ] - }, - { - "type": "function", - "name": "DEFAULT_ADMIN_ROLE", - "inputs": [], - "outputs": [ - { - "internalType": "bytes32", - "name": "", - "type": "bytes32" - } - ], - "stateMutability": "view" - }, - { - "type": "function", - "name": "EVALUATE_ROLE", - "inputs": [], - "outputs": [ - { - "internalType": "bytes32", - "name": "", - "type": "bytes32" - } - ], - "stateMutability": "view" - }, - { - "type": "function", - "name": "MAX_SCORE", - "inputs": [], - "outputs": [ - { - "internalType": "uint256", - "name": "", - "type": "uint256" - } - ], - "stateMutability": "view" - }, - { - "type": "function", - "name": "MEASURE_ROLE", - "inputs": [], - "outputs": [ - { - "internalType": "bytes32", - "name": "", - "type": "bytes32" - } - ], - "stateMutability": "view" - }, - { - "type": "function", - "name": "addBalances", - "inputs": [ - { - "internalType": "address payable[]", - "name": "addresses", - "type": "address[]" - }, - { - "internalType": "uint256[]", - "name": "_balances", - "type": "uint256[]" - } - ], - "outputs": [], - "stateMutability": "payable" - }, - { - "type": "function", - "name": "addMeasurements", - "inputs": [ - { - "internalType": "string", - "name": "cid", - "type": "string" - } - ], - "outputs": [ - { - "internalType": "uint256", - "name": "", - "type": "uint256" - } - ], - "stateMutability": "nonpayable" - }, - { - "type": "function", - "name": "adminAdvanceRound", - "inputs": [], - "outputs": [], - "stateMutability": "nonpayable" - }, - { - "type": "function", - "name": "availableBalance", - "inputs": [], - "outputs": [ - { - "internalType": "uint256", - "name": "", - "type": "uint256" - } - ], - "stateMutability": "view" - }, - { - "type": "function", - "name": "balanceHeld", - "inputs": [], - "outputs": [ - { - "internalType": "uint256", - "name": "", - "type": "uint256" - } - ], - "stateMutability": "view" - }, - { - "type": "function", - "name": "balances", - "inputs": [ - { - "internalType": "address", - "name": "", - "type": "address" - } - ], - "outputs": [ - { - "internalType": "uint256", - "name": "", - "type": "uint256" - } - ], - "stateMutability": "view" - }, - { - "type": "function", - "name": "currentRoundEndBlockNumber", - "inputs": [], - "outputs": [ - { - "internalType": "uint256", - "name": "", - "type": "uint256" - } - ], - "stateMutability": "view" - }, - { - "type": "function", - "name": "currentRoundIndex", - "inputs": [], - "outputs": [ - { - "internalType": "uint256", - "name": "", - "type": "uint256" - } - ], - "stateMutability": "view" - }, - { - "type": "function", - "name": "currentRoundRoundReward", - "inputs": [], - "outputs": [ - { - "internalType": "uint256", - "name": "", - "type": "uint256" - } - ], - "stateMutability": "view" - }, - { - "type": "function", - "name": "disableWithdraw", - "inputs": [], - "outputs": [], - "stateMutability": "nonpayable" - }, - { - "type": "function", - "name": "getRoleAdmin", - "inputs": [ - { - "internalType": "bytes32", - "name": "role", - "type": "bytes32" - } - ], - "outputs": [ - { - "internalType": "bytes32", - "name": "", - "type": "bytes32" - } - ], - "stateMutability": "view" - }, - { - "type": "function", - "name": "grantRole", - "inputs": [ - { - "internalType": "bytes32", - "name": "role", - "type": "bytes32" - }, - { - "internalType": "address", - "name": "account", - "type": "address" - } - ], - "outputs": [], - "stateMutability": "nonpayable" - }, - { - "type": "function", - "name": "hasRole", - "inputs": [ - { - "internalType": "bytes32", - "name": "role", - "type": "bytes32" - }, - { - "internalType": "address", - "name": "account", - "type": "address" - } - ], - "outputs": [ - { - "internalType": "bool", - "name": "", - "type": "bool" - } - ], - "stateMutability": "view" - }, - { - "type": "function", - "name": "maxTransfersPerTx", - "inputs": [], - "outputs": [ - { - "internalType": "uint256", - "name": "", - "type": "uint256" - } - ], - "stateMutability": "view" - }, - { - "type": "function", - "name": "minBalanceForTransfer", - "inputs": [], - "outputs": [ - { - "internalType": "uint256", - "name": "", - "type": "uint256" - } - ], - "stateMutability": "view" - }, - { - "type": "function", - "name": "nextRoundLength", - "inputs": [], - "outputs": [ - { - "internalType": "uint256", - "name": "", - "type": "uint256" - } - ], - "stateMutability": "view" - }, - { - "type": "function", - "name": "participantCountReadyForTransfer", - "inputs": [], - "outputs": [ - { - "internalType": "uint256", - "name": "", - "type": "uint256" - } - ], - "stateMutability": "view" - }, - { - "type": "function", - "name": "participantCountScheduledForTransfer", - "inputs": [], - "outputs": [ - { - "internalType": "uint256", - "name": "", - "type": "uint256" - } - ], - "stateMutability": "view" - }, - { - "type": "function", - "name": "previousRoundIndex", - "inputs": [], - "outputs": [ - { - "internalType": "uint256", - "name": "", - "type": "uint256" - } - ], - "stateMutability": "view" - }, - { - "type": "function", - "name": "previousRoundRoundReward", - "inputs": [], - "outputs": [ - { - "internalType": "uint256", - "name": "", - "type": "uint256" - } - ], - "stateMutability": "view" - }, - { - "type": "function", - "name": "previousRoundTotalScores", - "inputs": [], - "outputs": [ - { - "internalType": "uint256", - "name": "", - "type": "uint256" - } - ], - "stateMutability": "view" - }, - { - "type": "function", - "name": "readyForTransfer", - "inputs": [ - { - "internalType": "uint256", - "name": "", - "type": "uint256" - } - ], - "outputs": [ - { - "internalType": "address payable", - "name": "", - "type": "address" - } - ], - "stateMutability": "view" - }, - { - "type": "function", - "name": "releaseRewards", - "inputs": [], - "outputs": [], - "stateMutability": "nonpayable" - }, - { - "type": "function", - "name": "renounceRole", - "inputs": [ - { - "internalType": "bytes32", - "name": "role", - "type": "bytes32" - }, - { - "internalType": "address", - "name": "callerConfirmation", - "type": "address" - } - ], - "outputs": [], - "stateMutability": "nonpayable" - }, - { - "type": "function", - "name": "revokeRole", - "inputs": [ - { - "internalType": "bytes32", - "name": "role", - "type": "bytes32" - }, - { - "internalType": "address", - "name": "account", - "type": "address" - } - ], - "outputs": [], - "stateMutability": "nonpayable" - }, - { - "type": "function", - "name": "rewardsScheduledFor", - "inputs": [ - { - "internalType": "address", - "name": "participant", - "type": "address" - } - ], - "outputs": [ - { - "internalType": "uint256", - "name": "", - "type": "uint256" - } - ], - "stateMutability": "view" - }, - { - "type": "function", - "name": "roundReward", - "inputs": [], - "outputs": [ - { - "internalType": "uint256", - "name": "", - "type": "uint256" - } - ], - "stateMutability": "view" - }, - { - "type": "function", - "name": "scheduledForTransfer", - "inputs": [ - { - "internalType": "uint256", - "name": "", - "type": "uint256" - } - ], - "outputs": [ - { - "internalType": "address payable", - "name": "", - "type": "address" - } - ], - "stateMutability": "view" - }, - { - "type": "function", - "name": "setMaxTransfersPerTx", - "inputs": [ - { - "internalType": "uint256", - "name": "_maxTransfersPerTx", - "type": "uint256" - } - ], - "outputs": [], - "stateMutability": "nonpayable" - }, - { - "type": "function", - "name": "setMinBalanceForTransfer", - "inputs": [ - { - "internalType": "uint256", - "name": "_minBalanceForTransfer", - "type": "uint256" - } - ], - "outputs": [], - "stateMutability": "nonpayable" - }, - { - "type": "function", - "name": "setNextRoundLength", - "inputs": [ - { - "internalType": "uint256", - "name": "_nextRoundLength", - "type": "uint256" - } - ], - "outputs": [], - "stateMutability": "nonpayable" - }, - { - "type": "function", - "name": "setRoundReward", - "inputs": [ - { - "internalType": "uint256", - "name": "_roundReward", - "type": "uint256" - } - ], - "outputs": [], - "stateMutability": "nonpayable" - }, - { - "type": "function", - "name": "setScores", - "inputs": [ - { - "internalType": "uint256", - "name": "roundIndex", - "type": "uint256" - }, - { - "internalType": "address payable[]", - "name": "addresses", - "type": "address[]" - }, - { - "internalType": "uint256[]", - "name": "scores", - "type": "uint256[]" - } - ], - "outputs": [], - "stateMutability": "nonpayable" - }, - { - "type": "function", - "name": "supportsInterface", - "inputs": [ - { - "internalType": "bytes4", - "name": "interfaceId", - "type": "bytes4" - } - ], - "outputs": [ - { - "internalType": "bool", - "name": "", - "type": "bool" - } - ], - "stateMutability": "view" - }, - { - "type": "function", - "name": "tick", - "inputs": [], - "outputs": [], - "stateMutability": "nonpayable" - }, - { - "type": "function", - "name": "withdraw", - "inputs": [ - { - "internalType": "address payable", - "name": "destination", - "type": "address" - } - ], - "outputs": [], - "stateMutability": "nonpayable" - }, - { - "type": "function", - "name": "withdrawDisabled", - "inputs": [], - "outputs": [ - { - "internalType": "bool", - "name": "", - "type": "bool" - } - ], - "stateMutability": "view" - }, - { - "type": "event", - "name": "MeasurementsAdded", - "inputs": [ - { - "name": "cid", - "type": "string", - "indexed": false - }, - { - "name": "roundIndex", - "type": "uint256", - "indexed": true - }, - { - "name": "sender", - "type": "address", - "indexed": true - } - ], - "anonymous": false - }, - { - "type": "event", - "name": "RoleAdminChanged", - "inputs": [ - { - "name": "role", - "type": "bytes32", - "indexed": true - }, - { - "name": "previousAdminRole", - "type": "bytes32", - "indexed": true - }, - { - "name": "newAdminRole", - "type": "bytes32", - "indexed": true - } - ], - "anonymous": false - }, - { - "type": "event", - "name": "RoleGranted", - "inputs": [ - { - "name": "role", - "type": "bytes32", - "indexed": true - }, - { - "name": "account", - "type": "address", - "indexed": true - }, - { - "name": "sender", - "type": "address", - "indexed": true - } - ], - "anonymous": false - }, - { - "type": "event", - "name": "RoleRevoked", - "inputs": [ - { - "name": "role", - "type": "bytes32", - "indexed": true - }, - { - "name": "account", - "type": "address", - "indexed": true - }, - { - "name": "sender", - "type": "address", - "indexed": true - } - ], - "anonymous": false - }, - { - "type": "event", - "name": "RoundStart", - "inputs": [ - { - "name": "roundIndex", - "type": "uint256", - "indexed": false - } - ], - "anonymous": false - }, - { - "type": "event", - "name": "Transfer", - "inputs": [ - { - "name": "to", - "type": "address", - "indexed": true - }, - { - "name": "amount", - "type": "uint256", - "indexed": false - } - ], - "anonymous": false - }, - { - "type": "event", - "name": "TransferFailed", - "inputs": [ - { - "name": "to", - "type": "address", - "indexed": true - }, - { - "name": "amount", - "type": "uint256", - "indexed": false - } - ], - "anonymous": false - }, - { - "type": "error", - "name": "AccessControlBadConfirmation", - "inputs": [] - }, - { - "type": "error", - "name": "AccessControlUnauthorizedAccount", - "inputs": [ - { - "internalType": "address", - "name": "account", - "type": "address" - }, - { - "internalType": "bytes32", - "name": "neededRole", - "type": "bytes32" - } - ] - }, - { - "type": "receive" - } - ] - \ No newline at end of file diff --git a/stats/lib/handler.js b/stats/lib/handler.js index 0865da91..d8b96a78 100644 --- a/stats/lib/handler.js +++ b/stats/lib/handler.js @@ -14,18 +14,20 @@ import { handlePlatformRoutes } from './platform-routes.js' /** * @param {object} args - * @param {import('pg').Pool} args.pgPool + * @param {import('pg').Pool} args.pgPoolEvaluateDb + * @param {import('pg').Pool} args.pgPoolStatsDb * @param {import('./typings').Logger} args.logger * @returns */ export const createHandler = ({ - pgPool, + pgPoolEvaluateDb, + pgPoolStatsDb, logger }) => { return (req, res) => { const start = new Date() logger.request(`${req.method} ${req.url} ...`) - handler(req, res, pgPool) + handler(req, res, pgPoolEvaluateDb, pgPoolStatsDb) .catch(err => errorHandler(res, err, logger)) .then(() => { logger.request(`${req.method} ${req.url} ${res.statusCode} (${new Date() - start}ms)`) @@ -36,9 +38,10 @@ export const createHandler = ({ /** * @param {import('node:http').IncomingMessage} req * @param {import('node:http').ServerResponse} res - * @param {import('pg').Pool} pgPool + * @param {import('pg').Pool} pgPoolEvaluateDb + * @param {import('pg').Pool} pgPoolStatsDb */ -const handler = async (req, res, pgPool) => { +const handler = async (req, res, pgPoolEvaluateDb, pgPoolStatsDb) => { // Caveat! `new URL('//foo', 'http://127.0.0.1')` would produce "http://foo/" - not what we want! const { pathname, searchParams } = new URL(`http://127.0.0.1${req.url}`) const segs = pathname.split('/').filter(Boolean) @@ -57,10 +60,10 @@ const handler = async (req, res, pgPool) => { pathname, searchParams, res, - pgPool, + pgPoolEvaluateDb, fetchStatsFn ) - } else if (await handlePlatformRoutes(req, res, pgPool)) { + } else if (await handlePlatformRoutes(req, res, pgPoolEvaluateDb, pgPoolStatsDb)) { // no-op, request was handled by handlePlatformRoute } else { notFound(res) diff --git a/stats/lib/platform-routes.js b/stats/lib/platform-routes.js index 23fb38c1..3f1bc469 100644 --- a/stats/lib/platform-routes.js +++ b/stats/lib/platform-routes.js @@ -6,26 +6,38 @@ import { fetchDailyStationAcceptedMeasurementCount } from './platform-stats-fetchers.js' -export const handlePlatformRoutes = async (req, res, pgPool) => { +export const handlePlatformRoutes = async (req, res, pgPoolEvaluateDb, pgPoolStatsDb) => { // Caveat! `new URL('//foo', 'http://127.0.0.1')` would produce "http://foo/" - not what we want! const { pathname, searchParams } = new URL(`http://127.0.0.1${req.url}`) const segs = pathname.split('/').filter(Boolean) - const fetchFunctionMap = { - 'stations/daily': fetchDailyStationCount, - 'stations/monthly': fetchMonthlyStationCount, - 'measurements/daily': fetchDailyStationAcceptedMeasurementCount, - 'fil/daily': fetchDailyFilSent + const routeHandlerInfoMap = { + 'stations/daily': { + fetchFunction: fetchDailyStationCount, + pgPool: pgPoolEvaluateDb + }, + 'stations/monthly': { + fetchFunction: fetchMonthlyStationCount, + pgPool: pgPoolEvaluateDb + }, + 'measurements/daily': { + fetchFunction: fetchDailyStationAcceptedMeasurementCount, + pgPool: pgPoolEvaluateDb + }, + 'fil/daily': { + fetchFunction: fetchDailyFilSent, + pgPool: pgPoolStatsDb + } } - const fetchStatsFn = fetchFunctionMap[segs.join('/')] - if (req.method === 'GET' && fetchStatsFn) { + const routeHandlerInfo = routeHandlerInfoMap[segs.join('/')] + if (req.method === 'GET' && routeHandlerInfo) { await getStatsWithFilterAndCaching( pathname, searchParams, res, - pgPool, - fetchStatsFn + routeHandlerInfo.pgPool, + routeHandlerInfo.fetchFunction ) return true } else if (req.method === 'GET' && segs.length === 0) { diff --git a/stats/lib/platform-stats-fetchers.js b/stats/lib/platform-stats-fetchers.js index 68eda328..af953647 100644 --- a/stats/lib/platform-stats-fetchers.js +++ b/stats/lib/platform-stats-fetchers.js @@ -44,7 +44,7 @@ export const fetchDailyStationAcceptedMeasurementCount = async (pgPool, filter) export const fetchDailyFilSent = async (pgPool, filter) => { const { rows } = await pgPool.query(` SELECT day::TEXT, SUM(amount) as amount - FROM daily_fil + FROM daily_reward_transfers WHERE day >= $1 AND day <= $2 GROUP BY day ORDER BY day diff --git a/stats/lib/platform-stats-generator.js b/stats/lib/platform-stats-generator.js deleted file mode 100644 index 9e8fad7c..00000000 --- a/stats/lib/platform-stats-generator.js +++ /dev/null @@ -1,14 +0,0 @@ -/** - * @param {import('pg').Client} pgClient - * @param {Object} filEvent - */ -export const updateDailyFilStats = async (pgClient, filEvent) => { - console.log('Event:', filEvent) - - await pgClient.query(` - INSERT INTO daily_fil (day, to_address, amount) - VALUES (now(), $1, $2) - ON CONFLICT (day, to_address) DO UPDATE - SET amount = daily_fil.amount + EXCLUDED.amount - `, [filEvent.to_address, filEvent.amount]) -} diff --git a/stats/test/handler.test.js b/stats/test/handler.test.js index 88257cb9..2bf0fd44 100644 --- a/stats/test/handler.test.js +++ b/stats/test/handler.test.js @@ -21,10 +21,12 @@ describe('HTTP request handler', () => { let baseUrl before(async () => { + // handler doesn't use Stats DB pgPool = new pg.Pool({ connectionString: EVALUATE_DB_URL }) const handler = createHandler({ - pgPool, + pgPoolEvaluateDb: pgPool, + pgPoolStatsDb: undefined, logger: { info: debug, error: console.error, diff --git a/stats/test/platform-routes.test.js b/stats/test/platform-routes.test.js index d37928eb..7ecb44eb 100644 --- a/stats/test/platform-routes.test.js +++ b/stats/test/platform-routes.test.js @@ -6,23 +6,27 @@ import createDebug from 'debug' import { assertResponseStatus } from './test-helpers.js' import { createHandler } from '../lib/handler.js' -import { EVALUATE_DB_URL } from '../lib/config.js' +import { DATABASE_URL, EVALUATE_DB_URL } from '../lib/config.js' const debug = createDebug('test') describe('Platform Routes HTTP request handler', () => { /** @type {pg.Pool} */ - let pgPool + let pgPoolEvaluateDb + /** @type {pg.Pool} */ + let pgPoolStatsDb /** @type {http.Server} */ let server /** @type {string} */ let baseUrl before(async () => { - pgPool = new pg.Pool({ connectionString: EVALUATE_DB_URL }) + pgPoolEvaluateDb = new pg.Pool({ connectionString: EVALUATE_DB_URL }) + pgPoolStatsDb = new pg.Pool({ connectionString: DATABASE_URL }) const handler = createHandler({ - pgPool, + pgPoolEvaluateDb, + pgPoolStatsDb, logger: { info: debug, error: console.error, @@ -39,27 +43,28 @@ describe('Platform Routes HTTP request handler', () => { after(async () => { server.closeAllConnections() server.close() - await pgPool.end() + await pgPoolEvaluateDb.end() + await pgPoolStatsDb.end() }) beforeEach(async () => { - await pgPool.query('DELETE FROM daily_stations') - await pgPool.query('DELETE FROM daily_fil') + await pgPoolEvaluateDb.query('DELETE FROM daily_stations') + await pgPoolStatsDb.query('DELETE FROM daily_reward_transfers') }) describe('GET /stations/daily', () => { it('returns daily station metrics for the given date range', async () => { - await givenDailyStationMetrics(pgPool, '2024-01-10', [ + await givenDailyStationMetrics(pgPoolEvaluateDb, '2024-01-10', [ { station_id: 'station1', accepted_measurement_count: 1 } ]) - await givenDailyStationMetrics(pgPool, '2024-01-11', [ + await givenDailyStationMetrics(pgPoolEvaluateDb, '2024-01-11', [ { station_id: 'station2', accepted_measurement_count: 1 } ]) - await givenDailyStationMetrics(pgPool, '2024-01-12', [ + await givenDailyStationMetrics(pgPoolEvaluateDb, '2024-01-12', [ { station_id: 'station2', accepted_measurement_count: 2 }, { station_id: 'station3', accepted_measurement_count: 1 } ]) - await givenDailyStationMetrics(pgPool, '2024-01-13', [ + await givenDailyStationMetrics(pgPoolEvaluateDb, '2024-01-13', [ { station_id: 'station1', accepted_measurement_count: 1 } ]) @@ -83,25 +88,25 @@ describe('Platform Routes HTTP request handler', () => { describe('GET /stations/monthly', () => { it('returns monthly station metrics for the given date range ignoring the day number', async () => { // before the date range - await givenDailyStationMetrics(pgPool, '2023-12-31', [ + await givenDailyStationMetrics(pgPoolEvaluateDb, '2023-12-31', [ { station_id: 'station1', accepted_measurement_count: 1 } ]) // in the date range - await givenDailyStationMetrics(pgPool, '2024-01-10', [ + await givenDailyStationMetrics(pgPoolEvaluateDb, '2024-01-10', [ { station_id: 'station1', accepted_measurement_count: 1 } ]) - await givenDailyStationMetrics(pgPool, '2024-01-11', [ + await givenDailyStationMetrics(pgPoolEvaluateDb, '2024-01-11', [ { station_id: 'station2', accepted_measurement_count: 1 } ]) - await givenDailyStationMetrics(pgPool, '2024-01-12', [ + await givenDailyStationMetrics(pgPoolEvaluateDb, '2024-01-12', [ { station_id: 'station2', accepted_measurement_count: 2 }, { station_id: 'station3', accepted_measurement_count: 1 } ]) - await givenDailyStationMetrics(pgPool, '2024-02-13', [ + await givenDailyStationMetrics(pgPoolEvaluateDb, '2024-02-13', [ { station_id: 'station1', accepted_measurement_count: 1 } ]) // after the date range - await givenDailyStationMetrics(pgPool, '2024-03-01', [ + await givenDailyStationMetrics(pgPoolEvaluateDb, '2024-03-01', [ { station_id: 'station1', accepted_measurement_count: 1 } ]) @@ -124,17 +129,17 @@ describe('Platform Routes HTTP request handler', () => { describe('GET /measurements/daily', () => { it('returns daily total accepted measurement count for the given date range', async () => { - await givenDailyStationMetrics(pgPool, '2024-01-10', [ + await givenDailyStationMetrics(pgPoolEvaluateDb, '2024-01-10', [ { station_id: 'station1', accepted_measurement_count: 1 } ]) - await givenDailyStationMetrics(pgPool, '2024-01-11', [ + await givenDailyStationMetrics(pgPoolEvaluateDb, '2024-01-11', [ { station_id: 'station2', accepted_measurement_count: 1 } ]) - await givenDailyStationMetrics(pgPool, '2024-01-12', [ + await givenDailyStationMetrics(pgPoolEvaluateDb, '2024-01-12', [ { station_id: 'station2', accepted_measurement_count: 2 }, { station_id: 'station3', accepted_measurement_count: 1 } ]) - await givenDailyStationMetrics(pgPool, '2024-01-13', [ + await givenDailyStationMetrics(pgPoolEvaluateDb, '2024-01-13', [ { station_id: 'station1', accepted_measurement_count: 1 } ]) @@ -157,17 +162,17 @@ describe('Platform Routes HTTP request handler', () => { describe('GET /fil/daily', () => { it('returns daily total FIL sent for the given date range', async () => { - await givenDailyFilMetrics(pgPool, '2024-01-10', [ + await givenDailyFilMetrics(pgPoolStatsDb, '2024-01-10', [ { to_address: 'to1', amount: 100 } ]) - await givenDailyFilMetrics(pgPool, '2024-01-11', [ + await givenDailyFilMetrics(pgPoolStatsDb, '2024-01-11', [ { to_address: 'to2', amount: 150 } ]) - await givenDailyFilMetrics(pgPool, '2024-01-12', [ + await givenDailyFilMetrics(pgPoolStatsDb, '2024-01-12', [ { to_address: 'to2', amount: 300 }, { to_address: 'to3', amount: 250 } ]) - await givenDailyFilMetrics(pgPool, '2024-01-13', [ + await givenDailyFilMetrics(pgPoolStatsDb, '2024-01-13', [ { to_address: 'to1', amount: 100 } ]) @@ -189,8 +194,8 @@ describe('Platform Routes HTTP request handler', () => { }) }) -const givenDailyStationMetrics = async (pgPool, day, stationStats) => { - await pgPool.query(` +const givenDailyStationMetrics = async (pgPoolEvaluateDb, day, stationStats) => { + await pgPoolEvaluateDb.query(` INSERT INTO daily_stations (day, station_id, accepted_measurement_count) SELECT $1 AS day, UNNEST($2::text[]) AS station_id, UNNEST($3::int[]) AS accepted_measurement_count ON CONFLICT DO NOTHING @@ -201,9 +206,9 @@ const givenDailyStationMetrics = async (pgPool, day, stationStats) => { ]) } -const givenDailyFilMetrics = async (pgPool, day, filStats) => { - await pgPool.query(` - INSERT INTO daily_fil (day, to_address, amount) +const givenDailyFilMetrics = async (pgPoolStatsDb, day, filStats) => { + await pgPoolStatsDb.query(` + INSERT INTO daily_reward_transfers (day, to_address, amount) SELECT $1 AS day, UNNEST($2::text[]) AS to_address, UNNEST($3::int[]) AS amount ON CONFLICT DO NOTHING `, [ From a819bcbcc71fcc11a4333753420668e669798f04 Mon Sep 17 00:00:00 2001 From: PatrickNercessian Date: Thu, 30 May 2024 15:27:47 -0400 Subject: [PATCH 04/13] migrating db before observer tests --- .../test/platform-stats-generator.test.js | 42 +++++++++---------- observer/test/smoke.test.js | 23 ---------- 2 files changed, 20 insertions(+), 45 deletions(-) delete mode 100644 observer/test/smoke.test.js diff --git a/observer/test/platform-stats-generator.test.js b/observer/test/platform-stats-generator.test.js index ea8f8531..6629cc0f 100644 --- a/observer/test/platform-stats-generator.test.js +++ b/observer/test/platform-stats-generator.test.js @@ -4,44 +4,42 @@ import { beforeEach, describe, it } from 'mocha' import { DATABASE_URL } from '../lib/config.js' import { updateDailyFilStats } from '../lib/platform-stats-generator.js' - -const createPgClient = async () => { - const pgClient = new pg.Client({ connectionString: DATABASE_URL }) - await pgClient.connect() - return pgClient -} +import { migrateWithPgClient } from '@filecoin-station/spark-stats-db-migrations' describe('platform-stats-generator', () => { - let pgClient + /** @type {pg.Pool} */ + let pgPool + before(async () => { - pgClient = await createPgClient() + pgPool = new pg.Pool({ connectionString: DATABASE_URL }) + await migrateWithPgClient(pgPool) }) let today beforeEach(async () => { - await pgClient.query('DELETE FROM daily_reward_transfers') + await pgPool.query('DELETE FROM daily_reward_transfers') // Run all tests inside a transaction to ensure `now()` always returns the same value // See https://dba.stackexchange.com/a/63549/125312 // This avoids subtle race conditions when the tests are executed around midnight. - await pgClient.query('BEGIN TRANSACTION') + await pgPool.query('BEGIN TRANSACTION') today = await getCurrentDate() }) afterEach(async () => { - await pgClient.query('END TRANSACTION') + await pgPool.query('END TRANSACTION') }) after(async () => { - await pgClient.end() + await pgPool.end() }) describe('updateDailyFilStats', () => { it('should correctly update daily FIL stats with new transfer events', async () => { - await updateDailyFilStats(pgClient, { to_address: 'address1', amount: 100, blockNumber: 1 }) - await updateDailyFilStats(pgClient, { to_address: 'address1', amount: 200, blockNumber: 1 }) + await updateDailyFilStats(pgPool, { to_address: 'address1', amount: 100, blockNumber: 1 }) + await updateDailyFilStats(pgPool, { to_address: 'address1', amount: 200, blockNumber: 1 }) - const { rows } = await pgClient.query(` + const { rows } = await pgPool.query(` SELECT day::TEXT, to_address, amount FROM daily_reward_transfers WHERE to_address = $1 `, ['address1']) @@ -50,10 +48,10 @@ describe('platform-stats-generator', () => { }) it('should handle multiple addresses in daily FIL stats', async () => { - await updateDailyFilStats(pgClient, { to_address: 'address1', amount: 50, blockNumber: 1 }) - await updateDailyFilStats(pgClient, { to_address: 'address2', amount: 150, blockNumber: 1 }) + await updateDailyFilStats(pgPool, { to_address: 'address1', amount: 50, blockNumber: 1 }) + await updateDailyFilStats(pgPool, { to_address: 'address2', amount: 150, blockNumber: 1 }) - const { rows } = await pgClient.query(` + const { rows } = await pgPool.query(` SELECT day::TEXT, to_address, amount FROM daily_reward_transfers ORDER BY to_address `) @@ -66,17 +64,17 @@ describe('platform-stats-generator', () => { }) it('should update the last block number', async () => { - await updateDailyFilStats(pgClient, { to_address: 'address1', amount: 100, blockNumber: 1 }) - await updateDailyFilStats(pgClient, { to_address: 'address2', amount: 200, blockNumber: 2 }) + await updateDailyFilStats(pgPool, { to_address: 'address1', amount: 100, blockNumber: 1 }) + await updateDailyFilStats(pgPool, { to_address: 'address2', amount: 200, blockNumber: 2 }) - const { rows } = await pgClient.query('SELECT last_block FROM reward_transfer_last_block') + const { rows } = await pgPool.query('SELECT last_block FROM reward_transfer_last_block') assert.strictEqual(rows.length, 1) assert.strictEqual(rows[0].last_block, 2) }) }) const getCurrentDate = async () => { - const { rows: [{ today }] } = await pgClient.query('SELECT now()::DATE::TEXT as today') + const { rows: [{ today }] } = await pgPool.query('SELECT now()::DATE::TEXT as today') return today } }) diff --git a/observer/test/smoke.test.js b/observer/test/smoke.test.js deleted file mode 100644 index b3b45f60..00000000 --- a/observer/test/smoke.test.js +++ /dev/null @@ -1,23 +0,0 @@ -// TODO: remove this file once we have real tests in place - -import { DATABASE_URL } from '../lib/config.js' -import { migrateWithPgClient } from '@filecoin-station/spark-stats-db-migrations' -import pg from 'pg' - -describe('spark-observer', () => { - /** @type {pg.Pool} */ - let pgPool - - before(async () => { - pgPool = new pg.Pool({ connectionString: DATABASE_URL }) - await migrateWithPgClient(pgPool) - }) - - after(async () => { - await pgPool.end() - }) - - it('works', async () => { - await import('../index.js') - }) -}) From 01e78d621058488c2f6c5918d3cee8f86f498f76 Mon Sep 17 00:00:00 2001 From: PatrickNercessian Date: Mon, 3 Jun 2024 17:50:37 -0400 Subject: [PATCH 05/13] Run in loop, modularize code, add dry-run --- migrations/001.do.daily-reward-transfers.sql | 2 +- observer/bin/dry-run.js | 24 ++++++ observer/bin/spark-observer.js | 74 +++---------------- observer/lib/config.js | 7 +- observer/lib/db.js | 34 +++++++++ observer/lib/observer.js | 47 ++++++++++++ observer/lib/platform-stats-generator.js | 23 ------ observer/lib/platform-stats.js | 15 ++++ ...nerator.test.js => platform-stats.test.js} | 25 ++----- 9 files changed, 146 insertions(+), 105 deletions(-) create mode 100644 observer/bin/dry-run.js create mode 100644 observer/lib/db.js create mode 100644 observer/lib/observer.js delete mode 100644 observer/lib/platform-stats-generator.js create mode 100644 observer/lib/platform-stats.js rename observer/test/{platform-stats-generator.test.js => platform-stats.test.js} (62%) diff --git a/migrations/001.do.daily-reward-transfers.sql b/migrations/001.do.daily-reward-transfers.sql index 8f1dc1b0..cacff3e8 100644 --- a/migrations/001.do.daily-reward-transfers.sql +++ b/migrations/001.do.daily-reward-transfers.sql @@ -1,7 +1,7 @@ CREATE TABLE daily_reward_transfers ( day DATE NOT NULL, to_address TEXT NOT NULL, - amount BIGINT NOT NULL, + amount NUMERIC NOT NULL, PRIMARY KEY (day, to_address) ); diff --git a/observer/bin/dry-run.js b/observer/bin/dry-run.js new file mode 100644 index 00000000..ca6cf457 --- /dev/null +++ b/observer/bin/dry-run.js @@ -0,0 +1,24 @@ +import * as SparkImpactEvaluator from '@filecoin-station/spark-impact-evaluator' +import { ethers } from 'ethers' + +import { RPC_URL, rpcHeaders } from '../lib/config.js' +import { observeTransferEvents } from '../lib/observer.js' +import { getPgPool } from '../lib/db.js' + +/** @type {pg.Pool} */ +const pgPool = await getPgPool() + +const fetchRequest = new ethers.FetchRequest(RPC_URL) +fetchRequest.setHeader('Authorization', rpcHeaders.Authorization || '') +const provider = new ethers.JsonRpcProvider(fetchRequest, null, { polling: true }) + +const ieContract = new ethers.Contract(SparkImpactEvaluator.ADDRESS, SparkImpactEvaluator.ABI, provider) + +await pgPool.query('DELETE FROM daily_reward_transfers') +await pgPool.query('DELETE FROM reward_transfer_last_block') +// Set the last block to -800 to simulate the observer starting from the beginning +await pgPool.query('INSERT INTO reward_transfer_last_block (last_block) VALUES (-800)') + +await observeTransferEvents(pgPool, ieContract, provider) + +await pgPool.end() diff --git a/observer/bin/spark-observer.js b/observer/bin/spark-observer.js index 3383ffad..89367afa 100644 --- a/observer/bin/spark-observer.js +++ b/observer/bin/spark-observer.js @@ -1,70 +1,20 @@ import * as SparkImpactEvaluator from '@filecoin-station/spark-impact-evaluator' -import { migrateWithPgClient } from '@filecoin-station/spark-stats-db-migrations' import { ethers } from 'ethers' -import pg from 'pg' -import { RPC_URL, DATABASE_URL, rpcHeaders } from '../lib/config.js' -import { updateDailyFilStats } from '../lib/platform-stats-generator.js' -// TODO: move this to a different file -const fetchRequest = new ethers.FetchRequest(RPC_URL) -fetchRequest.setHeader('Authorization', rpcHeaders.Authorization || '') -const provider = new ethers.JsonRpcProvider( - fetchRequest, - null, - { polling: true } -) - -const ieContract = new ethers.Contract( - SparkImpactEvaluator.ADDRESS, - SparkImpactEvaluator.ABI, - provider -) - -const pgPool = new pg.Pool({ - connectionString: DATABASE_URL, - // allow the pool to close all connections and become empty - min: 0, - // this values should correlate with service concurrency hard_limit configured in fly.toml - // and must take into account the connection limit of our PG server, see - // https://fly.io/docs/postgres/managing/configuration-tuning/ - max: 100, - // close connections that haven't been used for one second - idleTimeoutMillis: 1000, - // automatically close connections older than 60 seconds - maxLifetimeSeconds: 60 -}) +import { RPC_URL, rpcHeaders, OBSERVATION_INTERVAL_MS } from '../lib/config.js' +import { getPgPool } from '../lib/db.js' +import { observeTransferEvents } from '../lib/observer.js' -pgPool.on('error', err => { - // Prevent crashing the process on idle client errors, the pool will recover - // itself. If all connections are lost, the process will still crash. - // https://github.com/brianc/node-postgres/issues/1324#issuecomment-308778405 - console.error('An idle client has experienced an error', err.stack) -}) +const pgPool = await getPgPool() -await migrateWithPgClient(pgPool) - -// Check that we can talk to the database -await pgPool.query('SELECT 1') - -console.log('Listening for impact evaluator events') +const fetchRequest = new ethers.FetchRequest(RPC_URL) +fetchRequest.setHeader('Authorization', rpcHeaders.Authorization || '') +const provider = new ethers.JsonRpcProvider(fetchRequest, null, { polling: true }) -// Get the last block we checked. Even though there should be only one row, use MAX just to be safe -const lastCheckedBlock = await pgPool.query( - 'SELECT MAX(last_block) AS last_block FROM reward_transfer_last_block' -).then(res => res.rows[0].last_block) +const ieContract = new ethers.Contract(SparkImpactEvaluator.ADDRESS, SparkImpactEvaluator.ABI, provider) // Listen for Transfer events from the IE contract -ieContract.queryFilter(ieContract.filters.Transfer(), lastCheckedBlock) - .then(events => { - for (const event of events) { - console.log('%s FIL to %s at block %s', event.args.amount, event.args.to, event.blockNumber) - updateDailyFilStats( - pgPool, - { - to_address: event.args.to, - amount: event.args.amount, - blockNumber: event.blockNumber - } - ) - } - }) +while (true) { + observeTransferEvents(pgPool, ieContract, provider) + await new Promise(resolve => setTimeout(resolve, OBSERVATION_INTERVAL_MS)) +} diff --git a/observer/lib/config.js b/observer/lib/config.js index 89c1fde6..4bfb0f01 100644 --- a/observer/lib/config.js +++ b/observer/lib/config.js @@ -5,7 +5,9 @@ const { RPC_URLS = 'https://api.node.glif.io/rpc/v0', GLIF_TOKEN, // DATABASE_URL points to `spark_stats` database managed by this monorepo - DATABASE_URL = 'postgres://localhost:5432/spark_stats' + DATABASE_URL = 'postgres://localhost:5432/spark_stats', + // Sleep one hour between observations + OBSERVATION_INTERVAL_MS = 1000 * 60 * 60 } = process.env const rpcUrls = RPC_URLS.split(',') @@ -20,5 +22,6 @@ if (RPC_URL.includes('glif')) { export { RPC_URL, DATABASE_URL, - rpcHeaders + rpcHeaders, + OBSERVATION_INTERVAL_MS } diff --git a/observer/lib/db.js b/observer/lib/db.js new file mode 100644 index 00000000..1a74dd17 --- /dev/null +++ b/observer/lib/db.js @@ -0,0 +1,34 @@ +import { migrateWithPgClient } from '@filecoin-station/spark-stats-db-migrations' +import pg from 'pg' + +import { DATABASE_URL } from '../lib/config.js' + +export const getPgPool = async () => { + const pgPool = new pg.Pool({ + connectionString: DATABASE_URL, + // allow the pool to close all connections and become empty + min: 0, + // this values should correlate with service concurrency hard_limit configured in fly.toml + // and must take into account the connection limit of our PG server, see + // https://fly.io/docs/postgres/managing/configuration-tuning/ + max: 100, + // close connections that haven't been used for one second + idleTimeoutMillis: 1000, + // automatically close connections older than 60 seconds + maxLifetimeSeconds: 60 + }) + + pgPool.on('error', err => { + // Prevent crashing the process on idle client errors, the pool will recover + // itself. If all connections are lost, the process will still crash. + // https://github.com/brianc/node-postgres/issues/1324#issuecomment-308778405 + console.error('An idle client has experienced an error', err.stack) + }) + + await migrateWithPgClient(pgPool) + + // Check that we can talk to the database + await pgPool.query('SELECT 1') + + return pgPool +} diff --git a/observer/lib/observer.js b/observer/lib/observer.js new file mode 100644 index 00000000..6a08e3ad --- /dev/null +++ b/observer/lib/observer.js @@ -0,0 +1,47 @@ +import { updateDailyTransferStats } from './platform-stats.js' + +/** + * Observe the transfer events on the Filecoin blockchain + * @param {import('pg').Pool} pgPool + * @param {import('ethers').Contract} ieContract + * @param {import('ethers').Provider} provider + */ +export const observeTransferEvents = async (pgPool, ieContract, provider) => { + // Get the last checked block. Even though there should be only one row, use MAX just to be safe + const lastCheckedBlock = await pgPool.query( + 'SELECT MAX(last_block) AS last_block FROM reward_transfer_last_block' + ).then(res => res.rows[0].last_block) + + console.log('Querying impact evaluator Transfer events after block', lastCheckedBlock) + let events + try { + events = await ieContract.queryFilter(ieContract.filters.Transfer(), lastCheckedBlock) + } catch (error) { + console.error('Error querying impact evaluator Transfer events', error) + if (error.message.includes('bad tipset height')) { + console.log('Block number too old, GLIF only provides last 2000 blocks, querying from there') + events = await ieContract.queryFilter(ieContract.filters.Transfer(), -1999) + } else { + throw error + } + } + console.log(`Found ${events.length} Transfer events`) + for (const event of events) { + const transferEvent = { + to_address: event.args.to, + amount: event.args.amount + } + console.log('Transfer event:', transferEvent) + await updateDailyTransferStats(pgPool, transferEvent) + } + + // Get the current block number and update the last_block in reward_transfer_last_block table + // For safety, only update if the new block number is greater than the existing one + const blockNumber = await provider.getBlockNumber() + console.log('Current block number:', blockNumber) + await pgPool.query(` + UPDATE reward_transfer_last_block + SET last_block = $1 + WHERE $1 > last_block + `, [blockNumber]) +} diff --git a/observer/lib/platform-stats-generator.js b/observer/lib/platform-stats-generator.js deleted file mode 100644 index d70e5cbe..00000000 --- a/observer/lib/platform-stats-generator.js +++ /dev/null @@ -1,23 +0,0 @@ -/** - * @param {import('pg').Client} pgClient - * @param {Object} filEvent - * @param {string} filEvent.to_address - * @param {number} filEvent.amount - * @param {number} filEvent.blockNumber - */ -export const updateDailyFilStats = async (pgClient, filEvent) => { - await pgClient.query(` - INSERT INTO daily_reward_transfers (day, to_address, amount) - VALUES (now(), $1, $2) - ON CONFLICT (day, to_address) DO UPDATE - SET amount = daily_reward_transfers.amount + EXCLUDED.amount - `, [filEvent.to_address, filEvent.amount]) - - // Update the last_block in reward_transfer_last_block table - // For safety, only update if the new block number is greater than the existing one - await pgClient.query(` - UPDATE reward_transfer_last_block - SET last_block = $1 - WHERE $1 > last_block - `, [filEvent.blockNumber]) -} diff --git a/observer/lib/platform-stats.js b/observer/lib/platform-stats.js new file mode 100644 index 00000000..ec648097 --- /dev/null +++ b/observer/lib/platform-stats.js @@ -0,0 +1,15 @@ +/** + * @param {import('pg').Client} pgClient + * @param {Object} transferEvent + * @param {string} transferEvent.to_address + * @param {number} transferEvent.amount + * @param {number} transferEvent.blockNumber + */ +export const updateDailyTransferStats = async (pgClient, transferEvent) => { + await pgClient.query(` + INSERT INTO daily_reward_transfers (day, to_address, amount) + VALUES (now(), $1, $2) + ON CONFLICT (day, to_address) DO UPDATE + SET amount = daily_reward_transfers.amount + EXCLUDED.amount + `, [transferEvent.to_address, transferEvent.amount]) +} diff --git a/observer/test/platform-stats-generator.test.js b/observer/test/platform-stats.test.js similarity index 62% rename from observer/test/platform-stats-generator.test.js rename to observer/test/platform-stats.test.js index 6629cc0f..493288fb 100644 --- a/observer/test/platform-stats-generator.test.js +++ b/observer/test/platform-stats.test.js @@ -3,7 +3,7 @@ import pg from 'pg' import { beforeEach, describe, it } from 'mocha' import { DATABASE_URL } from '../lib/config.js' -import { updateDailyFilStats } from '../lib/platform-stats-generator.js' +import { updateDailyTransferStats } from '../lib/platform-stats.js' import { migrateWithPgClient } from '@filecoin-station/spark-stats-db-migrations' describe('platform-stats-generator', () => { @@ -34,10 +34,10 @@ describe('platform-stats-generator', () => { await pgPool.end() }) - describe('updateDailyFilStats', () => { - it('should correctly update daily FIL stats with new transfer events', async () => { - await updateDailyFilStats(pgPool, { to_address: 'address1', amount: 100, blockNumber: 1 }) - await updateDailyFilStats(pgPool, { to_address: 'address1', amount: 200, blockNumber: 1 }) + describe('updateDailyTransferStats', () => { + it('should correctly update daily Transfer stats with new transfer events', async () => { + await updateDailyTransferStats(pgPool, { to_address: 'address1', amount: 100, blockNumber: 1 }) + await updateDailyTransferStats(pgPool, { to_address: 'address1', amount: 200, blockNumber: 1 }) const { rows } = await pgPool.query(` SELECT day::TEXT, to_address, amount FROM daily_reward_transfers @@ -47,9 +47,9 @@ describe('platform-stats-generator', () => { assert.deepStrictEqual(rows, [{ day: today, to_address: 'address1', amount: '300' }]) }) - it('should handle multiple addresses in daily FIL stats', async () => { - await updateDailyFilStats(pgPool, { to_address: 'address1', amount: 50, blockNumber: 1 }) - await updateDailyFilStats(pgPool, { to_address: 'address2', amount: 150, blockNumber: 1 }) + it('should handle multiple addresses in daily Transfer stats', async () => { + await updateDailyTransferStats(pgPool, { to_address: 'address1', amount: 50, blockNumber: 1 }) + await updateDailyTransferStats(pgPool, { to_address: 'address2', amount: 150, blockNumber: 1 }) const { rows } = await pgPool.query(` SELECT day::TEXT, to_address, amount FROM daily_reward_transfers @@ -62,15 +62,6 @@ describe('platform-stats-generator', () => { { day: today, to_address: 'address2', amount: '150' } ]) }) - - it('should update the last block number', async () => { - await updateDailyFilStats(pgPool, { to_address: 'address1', amount: 100, blockNumber: 1 }) - await updateDailyFilStats(pgPool, { to_address: 'address2', amount: 200, blockNumber: 2 }) - - const { rows } = await pgPool.query('SELECT last_block FROM reward_transfer_last_block') - assert.strictEqual(rows.length, 1) - assert.strictEqual(rows[0].last_block, 2) - }) }) const getCurrentDate = async () => { From 7be3dcc4a2f034dacb836f3475d67936f6c97f7c Mon Sep 17 00:00:00 2001 From: PatrickNercessian Date: Mon, 3 Jun 2024 18:01:19 -0400 Subject: [PATCH 06/13] fix any missed renames for consistency --- stats/lib/platform-routes.js | 6 +++--- stats/lib/platform-stats-fetchers.js | 2 +- stats/test/platform-routes.test.js | 20 ++++++++++---------- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/stats/lib/platform-routes.js b/stats/lib/platform-routes.js index 3f1bc469..e2fcf434 100644 --- a/stats/lib/platform-routes.js +++ b/stats/lib/platform-routes.js @@ -2,7 +2,7 @@ import { getStatsWithFilterAndCaching } from './request-helpers.js' import { fetchDailyStationCount, fetchMonthlyStationCount, - fetchDailyFilSent, + fetchDailyRewardTransfers, fetchDailyStationAcceptedMeasurementCount } from './platform-stats-fetchers.js' @@ -24,8 +24,8 @@ export const handlePlatformRoutes = async (req, res, pgPoolEvaluateDb, pgPoolSta fetchFunction: fetchDailyStationAcceptedMeasurementCount, pgPool: pgPoolEvaluateDb }, - 'fil/daily': { - fetchFunction: fetchDailyFilSent, + 'transfers/daily': { + fetchFunction: fetchDailyRewardTransfers, pgPool: pgPoolStatsDb } } diff --git a/stats/lib/platform-stats-fetchers.js b/stats/lib/platform-stats-fetchers.js index af953647..fc68ddbd 100644 --- a/stats/lib/platform-stats-fetchers.js +++ b/stats/lib/platform-stats-fetchers.js @@ -41,7 +41,7 @@ export const fetchDailyStationAcceptedMeasurementCount = async (pgPool, filter) return rows } -export const fetchDailyFilSent = async (pgPool, filter) => { +export const fetchDailyRewardTransfers = async (pgPool, filter) => { const { rows } = await pgPool.query(` SELECT day::TEXT, SUM(amount) as amount FROM daily_reward_transfers diff --git a/stats/test/platform-routes.test.js b/stats/test/platform-routes.test.js index 7ecb44eb..24eba0e2 100644 --- a/stats/test/platform-routes.test.js +++ b/stats/test/platform-routes.test.js @@ -160,25 +160,25 @@ describe('Platform Routes HTTP request handler', () => { }) }) - describe('GET /fil/daily', () => { - it('returns daily total FIL sent for the given date range', async () => { - await givenDailyFilMetrics(pgPoolStatsDb, '2024-01-10', [ + describe('GET /transfers/daily', () => { + it('returns daily total Rewards sent for the given date range', async () => { + await givenDailyRewardTransferMetrics(pgPoolStatsDb, '2024-01-10', [ { to_address: 'to1', amount: 100 } ]) - await givenDailyFilMetrics(pgPoolStatsDb, '2024-01-11', [ + await givenDailyRewardTransferMetrics(pgPoolStatsDb, '2024-01-11', [ { to_address: 'to2', amount: 150 } ]) - await givenDailyFilMetrics(pgPoolStatsDb, '2024-01-12', [ + await givenDailyRewardTransferMetrics(pgPoolStatsDb, '2024-01-12', [ { to_address: 'to2', amount: 300 }, { to_address: 'to3', amount: 250 } ]) - await givenDailyFilMetrics(pgPoolStatsDb, '2024-01-13', [ + await givenDailyRewardTransferMetrics(pgPoolStatsDb, '2024-01-13', [ { to_address: 'to1', amount: 100 } ]) const res = await fetch( new URL( - '/fil/daily?from=2024-01-11&to=2024-01-12', + '/transfers/daily?from=2024-01-11&to=2024-01-12', baseUrl ), { redirect: 'manual' @@ -206,14 +206,14 @@ const givenDailyStationMetrics = async (pgPoolEvaluateDb, day, stationStats) => ]) } -const givenDailyFilMetrics = async (pgPoolStatsDb, day, filStats) => { +const givenDailyRewardTransferMetrics = async (pgPoolStatsDb, day, transferStats) => { await pgPoolStatsDb.query(` INSERT INTO daily_reward_transfers (day, to_address, amount) SELECT $1 AS day, UNNEST($2::text[]) AS to_address, UNNEST($3::int[]) AS amount ON CONFLICT DO NOTHING `, [ day, - filStats.map(s => s.to_address), - filStats.map(s => s.amount) + transferStats.map(s => s.to_address), + transferStats.map(s => s.amount) ]) } From 3939e8086fa6a0e472515946519f512c76283d07 Mon Sep 17 00:00:00 2001 From: PatrickNercessian Date: Thu, 6 Jun 2024 10:43:47 -0400 Subject: [PATCH 07/13] Loop error handling and transition to single-table approach --- migrations/001.do.daily-reward-transfers.sql | 11 ---------- migrations/001.do.sql | 1 + migrations/002.do.daily-reward-transfers.sql | 10 +++++++++ observer/bin/dry-run.js | 3 --- observer/bin/spark-observer.js | 8 ++++++- observer/lib/observer.js | 23 ++++++-------------- observer/lib/platform-stats.js | 15 +++++++------ observer/test/platform-stats.test.js | 14 ++++++------ stats/test/platform-routes.test.js | 17 ++++++++------- 9 files changed, 49 insertions(+), 53 deletions(-) delete mode 100644 migrations/001.do.daily-reward-transfers.sql create mode 100644 migrations/001.do.sql create mode 100644 migrations/002.do.daily-reward-transfers.sql diff --git a/migrations/001.do.daily-reward-transfers.sql b/migrations/001.do.daily-reward-transfers.sql deleted file mode 100644 index cacff3e8..00000000 --- a/migrations/001.do.daily-reward-transfers.sql +++ /dev/null @@ -1,11 +0,0 @@ -CREATE TABLE daily_reward_transfers ( - day DATE NOT NULL, - to_address TEXT NOT NULL, - amount NUMERIC NOT NULL, - PRIMARY KEY (day, to_address) -); - -CREATE TABLE reward_transfer_last_block ( - last_block INTEGER NOT NULL -); -INSERT INTO reward_transfer_last_block (last_block) VALUES (0); diff --git a/migrations/001.do.sql b/migrations/001.do.sql new file mode 100644 index 00000000..40a84eab --- /dev/null +++ b/migrations/001.do.sql @@ -0,0 +1 @@ +SELECT now(); \ No newline at end of file diff --git a/migrations/002.do.daily-reward-transfers.sql b/migrations/002.do.daily-reward-transfers.sql new file mode 100644 index 00000000..ecd9f7ec --- /dev/null +++ b/migrations/002.do.daily-reward-transfers.sql @@ -0,0 +1,10 @@ +CREATE TABLE daily_reward_transfers ( + day DATE NOT NULL, + to_address TEXT NOT NULL, + amount NUMERIC NOT NULL, + last_checked_block INTEGER NOT NULL, + PRIMARY KEY (day, to_address) +); + +CREATE INDEX daily_reward_transfers_day ON daily_reward_transfers (day); +CREATE INDEX daily_reward_transfers_last_block ON daily_reward_transfers (last_checked_block DESC); diff --git a/observer/bin/dry-run.js b/observer/bin/dry-run.js index ca6cf457..2617603b 100644 --- a/observer/bin/dry-run.js +++ b/observer/bin/dry-run.js @@ -15,9 +15,6 @@ const provider = new ethers.JsonRpcProvider(fetchRequest, null, { polling: true const ieContract = new ethers.Contract(SparkImpactEvaluator.ADDRESS, SparkImpactEvaluator.ABI, provider) await pgPool.query('DELETE FROM daily_reward_transfers') -await pgPool.query('DELETE FROM reward_transfer_last_block') -// Set the last block to -800 to simulate the observer starting from the beginning -await pgPool.query('INSERT INTO reward_transfer_last_block (last_block) VALUES (-800)') await observeTransferEvents(pgPool, ieContract, provider) diff --git a/observer/bin/spark-observer.js b/observer/bin/spark-observer.js index 89367afa..23dbfa58 100644 --- a/observer/bin/spark-observer.js +++ b/observer/bin/spark-observer.js @@ -1,5 +1,6 @@ import * as SparkImpactEvaluator from '@filecoin-station/spark-impact-evaluator' import { ethers } from 'ethers' +import * as Sentry from '@sentry/node' import { RPC_URL, rpcHeaders, OBSERVATION_INTERVAL_MS } from '../lib/config.js' import { getPgPool } from '../lib/db.js' @@ -15,6 +16,11 @@ const ieContract = new ethers.Contract(SparkImpactEvaluator.ADDRESS, SparkImpact // Listen for Transfer events from the IE contract while (true) { - observeTransferEvents(pgPool, ieContract, provider) + try { + await observeTransferEvents(pgPool, ieContract, provider) + } catch (e) { + console.error(e) + Sentry.captureException(e) + } await new Promise(resolve => setTimeout(resolve, OBSERVATION_INTERVAL_MS)) } diff --git a/observer/lib/observer.js b/observer/lib/observer.js index 6a08e3ad..e92c3840 100644 --- a/observer/lib/observer.js +++ b/observer/lib/observer.js @@ -7,10 +7,9 @@ import { updateDailyTransferStats } from './platform-stats.js' * @param {import('ethers').Provider} provider */ export const observeTransferEvents = async (pgPool, ieContract, provider) => { - // Get the last checked block. Even though there should be only one row, use MAX just to be safe const lastCheckedBlock = await pgPool.query( - 'SELECT MAX(last_block) AS last_block FROM reward_transfer_last_block' - ).then(res => res.rows[0].last_block) + 'SELECT MAX(last_checked_block) FROM daily_reward_transfers' + ).then(res => res.rows[0].last_checked_block) console.log('Querying impact evaluator Transfer events after block', lastCheckedBlock) let events @@ -19,12 +18,14 @@ export const observeTransferEvents = async (pgPool, ieContract, provider) => { } catch (error) { console.error('Error querying impact evaluator Transfer events', error) if (error.message.includes('bad tipset height')) { - console.log('Block number too old, GLIF only provides last 2000 blocks, querying from there') - events = await ieContract.queryFilter(ieContract.filters.Transfer(), -1999) + console.log('Block number too old, GLIF only provides last 2000 blocks, querying from -1900') + events = await ieContract.queryFilter(ieContract.filters.Transfer(), -1900) } else { throw error } } + const currentBlockNumber = await provider.getBlockNumber() + console.log('Current block number:', currentBlockNumber) console.log(`Found ${events.length} Transfer events`) for (const event of events) { const transferEvent = { @@ -32,16 +33,6 @@ export const observeTransferEvents = async (pgPool, ieContract, provider) => { amount: event.args.amount } console.log('Transfer event:', transferEvent) - await updateDailyTransferStats(pgPool, transferEvent) + await updateDailyTransferStats(pgPool, transferEvent, currentBlockNumber) } - - // Get the current block number and update the last_block in reward_transfer_last_block table - // For safety, only update if the new block number is greater than the existing one - const blockNumber = await provider.getBlockNumber() - console.log('Current block number:', blockNumber) - await pgPool.query(` - UPDATE reward_transfer_last_block - SET last_block = $1 - WHERE $1 > last_block - `, [blockNumber]) } diff --git a/observer/lib/platform-stats.js b/observer/lib/platform-stats.js index ec648097..a41a3f2e 100644 --- a/observer/lib/platform-stats.js +++ b/observer/lib/platform-stats.js @@ -3,13 +3,14 @@ * @param {Object} transferEvent * @param {string} transferEvent.to_address * @param {number} transferEvent.amount - * @param {number} transferEvent.blockNumber + * @param {number} currentBlockNumber */ -export const updateDailyTransferStats = async (pgClient, transferEvent) => { +export const updateDailyTransferStats = async (pgClient, transferEvent, currentBlockNumber) => { await pgClient.query(` - INSERT INTO daily_reward_transfers (day, to_address, amount) - VALUES (now(), $1, $2) - ON CONFLICT (day, to_address) DO UPDATE - SET amount = daily_reward_transfers.amount + EXCLUDED.amount - `, [transferEvent.to_address, transferEvent.amount]) + INSERT INTO daily_reward_transfers (day, to_address, amount, last_checked_block) + VALUES (now(), $1, $2, $3) + ON CONFLICT (day, to_address) DO UPDATE SET + amount = daily_reward_transfers.amount + EXCLUDED.amount, + last_checked_block = EXCLUDED.last_checked_block + `, [transferEvent.to_address, transferEvent.amount, currentBlockNumber]) } diff --git a/observer/test/platform-stats.test.js b/observer/test/platform-stats.test.js index 493288fb..a0f3adcf 100644 --- a/observer/test/platform-stats.test.js +++ b/observer/test/platform-stats.test.js @@ -36,8 +36,8 @@ describe('platform-stats-generator', () => { describe('updateDailyTransferStats', () => { it('should correctly update daily Transfer stats with new transfer events', async () => { - await updateDailyTransferStats(pgPool, { to_address: 'address1', amount: 100, blockNumber: 1 }) - await updateDailyTransferStats(pgPool, { to_address: 'address1', amount: 200, blockNumber: 1 }) + await updateDailyTransferStats(pgPool, { to_address: 'address1', amount: 100 }, 1) + await updateDailyTransferStats(pgPool, { to_address: 'address1', amount: 200 }, 1) const { rows } = await pgPool.query(` SELECT day::TEXT, to_address, amount FROM daily_reward_transfers @@ -48,18 +48,18 @@ describe('platform-stats-generator', () => { }) it('should handle multiple addresses in daily Transfer stats', async () => { - await updateDailyTransferStats(pgPool, { to_address: 'address1', amount: 50, blockNumber: 1 }) - await updateDailyTransferStats(pgPool, { to_address: 'address2', amount: 150, blockNumber: 1 }) + await updateDailyTransferStats(pgPool, { to_address: 'address1', amount: 50 }, 1) + await updateDailyTransferStats(pgPool, { to_address: 'address2', amount: 150 }, 1) const { rows } = await pgPool.query(` - SELECT day::TEXT, to_address, amount FROM daily_reward_transfers + SELECT day::TEXT, to_address, amount, last_checked_block FROM daily_reward_transfers ORDER BY to_address `) assert.strictEqual(rows.length, 2) assert.deepStrictEqual(rows, [ - { day: today, to_address: 'address1', amount: '50' }, - { day: today, to_address: 'address2', amount: '150' } + { day: today, to_address: 'address1', amount: '50', last_checked_block: 1 }, + { day: today, to_address: 'address2', amount: '150', last_checked_block: 1 } ]) }) }) diff --git a/stats/test/platform-routes.test.js b/stats/test/platform-routes.test.js index 24eba0e2..a276a835 100644 --- a/stats/test/platform-routes.test.js +++ b/stats/test/platform-routes.test.js @@ -163,17 +163,17 @@ describe('Platform Routes HTTP request handler', () => { describe('GET /transfers/daily', () => { it('returns daily total Rewards sent for the given date range', async () => { await givenDailyRewardTransferMetrics(pgPoolStatsDb, '2024-01-10', [ - { to_address: 'to1', amount: 100 } + { to_address: 'to1', amount: 100, last_checked_block: 1 } ]) await givenDailyRewardTransferMetrics(pgPoolStatsDb, '2024-01-11', [ - { to_address: 'to2', amount: 150 } + { to_address: 'to2', amount: 150, last_checked_block: 1 } ]) await givenDailyRewardTransferMetrics(pgPoolStatsDb, '2024-01-12', [ - { to_address: 'to2', amount: 300 }, - { to_address: 'to3', amount: 250 } + { to_address: 'to2', amount: 300, last_checked_block: 1 }, + { to_address: 'to3', amount: 250, last_checked_block: 1 } ]) await givenDailyRewardTransferMetrics(pgPoolStatsDb, '2024-01-13', [ - { to_address: 'to1', amount: 100 } + { to_address: 'to1', amount: 100, last_checked_block: 1 } ]) const res = await fetch( @@ -208,12 +208,13 @@ const givenDailyStationMetrics = async (pgPoolEvaluateDb, day, stationStats) => const givenDailyRewardTransferMetrics = async (pgPoolStatsDb, day, transferStats) => { await pgPoolStatsDb.query(` - INSERT INTO daily_reward_transfers (day, to_address, amount) - SELECT $1 AS day, UNNEST($2::text[]) AS to_address, UNNEST($3::int[]) AS amount + INSERT INTO daily_reward_transfers (day, to_address, amount, last_checked_block) + SELECT $1 AS day, UNNEST($2::text[]) AS to_address, UNNEST($3::int[]) AS amount, UNNEST($4::int[]) AS last_checked_block ON CONFLICT DO NOTHING `, [ day, transferStats.map(s => s.to_address), - transferStats.map(s => s.amount) + transferStats.map(s => s.amount), + transferStats.map(s => s.last_checked_block) ]) } From d87254c974f0988b6842acae7f4ef309a7b425f5 Mon Sep 17 00:00:00 2001 From: Julian Gruber Date: Fri, 7 Jun 2024 06:00:51 -0700 Subject: [PATCH 08/13] Update migrations/001.do.sql --- migrations/001.do.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/migrations/001.do.sql b/migrations/001.do.sql index 40a84eab..2e607d07 100644 --- a/migrations/001.do.sql +++ b/migrations/001.do.sql @@ -1 +1 @@ -SELECT now(); \ No newline at end of file +SELECT now(); From 79b92f6c7b7cfc3e016722d3c0a1183dc32e124f Mon Sep 17 00:00:00 2001 From: Julian Gruber Date: Fri, 7 Jun 2024 06:01:06 -0700 Subject: [PATCH 09/13] Update observer/bin/spark-observer.js --- observer/bin/spark-observer.js | 1 + 1 file changed, 1 insertion(+) diff --git a/observer/bin/spark-observer.js b/observer/bin/spark-observer.js index 23dbfa58..98524af2 100644 --- a/observer/bin/spark-observer.js +++ b/observer/bin/spark-observer.js @@ -1,6 +1,7 @@ import * as SparkImpactEvaluator from '@filecoin-station/spark-impact-evaluator' import { ethers } from 'ethers' import * as Sentry from '@sentry/node' +import timers from 'node:timers/promises' import { RPC_URL, rpcHeaders, OBSERVATION_INTERVAL_MS } from '../lib/config.js' import { getPgPool } from '../lib/db.js' From f26fb5b6c7825ab87d0534edc42aee7d5dde8217 Mon Sep 17 00:00:00 2001 From: Julian Gruber Date: Fri, 7 Jun 2024 06:01:18 -0700 Subject: [PATCH 10/13] Update observer/lib/observer.js --- observer/lib/observer.js | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/observer/lib/observer.js b/observer/lib/observer.js index e92c3840..a6c9970f 100644 --- a/observer/lib/observer.js +++ b/observer/lib/observer.js @@ -7,9 +7,10 @@ import { updateDailyTransferStats } from './platform-stats.js' * @param {import('ethers').Provider} provider */ export const observeTransferEvents = async (pgPool, ieContract, provider) => { - const lastCheckedBlock = await pgPool.query( + const rows = await pgPool.query( 'SELECT MAX(last_checked_block) FROM daily_reward_transfers' - ).then(res => res.rows[0].last_checked_block) + ) + const lastCheckedBlock = res.rows[0].last_checked_block console.log('Querying impact evaluator Transfer events after block', lastCheckedBlock) let events From a4c7f5107a2834892ba61c5f7cd44a4d9ce95ca1 Mon Sep 17 00:00:00 2001 From: Julian Gruber Date: Fri, 7 Jun 2024 06:01:34 -0700 Subject: [PATCH 11/13] Update observer/lib/platform-stats.js --- observer/lib/platform-stats.js | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/observer/lib/platform-stats.js b/observer/lib/platform-stats.js index a41a3f2e..445d29d5 100644 --- a/observer/lib/platform-stats.js +++ b/observer/lib/platform-stats.js @@ -10,7 +10,6 @@ export const updateDailyTransferStats = async (pgClient, transferEvent, currentB INSERT INTO daily_reward_transfers (day, to_address, amount, last_checked_block) VALUES (now(), $1, $2, $3) ON CONFLICT (day, to_address) DO UPDATE SET - amount = daily_reward_transfers.amount + EXCLUDED.amount, - last_checked_block = EXCLUDED.last_checked_block + amount = daily_reward_transfers.amount + EXCLUDED.amount `, [transferEvent.to_address, transferEvent.amount, currentBlockNumber]) } From 5aff42f070a846132a8151ec4bd36f514fe03622 Mon Sep 17 00:00:00 2001 From: Julian Gruber Date: Fri, 7 Jun 2024 06:03:31 -0700 Subject: [PATCH 12/13] Update observer/bin/spark-observer.js --- observer/bin/spark-observer.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/observer/bin/spark-observer.js b/observer/bin/spark-observer.js index 98524af2..66beec1b 100644 --- a/observer/bin/spark-observer.js +++ b/observer/bin/spark-observer.js @@ -23,5 +23,5 @@ while (true) { console.error(e) Sentry.captureException(e) } - await new Promise(resolve => setTimeout(resolve, OBSERVATION_INTERVAL_MS)) + await timers.setTimeout(OBSERVATION_INTERVAL_MS) } From 03cec3855969ce47ea14dd3d2a88191c83e59d12 Mon Sep 17 00:00:00 2001 From: Julian Gruber Date: Fri, 7 Jun 2024 06:05:53 -0700 Subject: [PATCH 13/13] Update observer/lib/observer.js --- observer/lib/observer.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/observer/lib/observer.js b/observer/lib/observer.js index a6c9970f..c1ef935a 100644 --- a/observer/lib/observer.js +++ b/observer/lib/observer.js @@ -7,10 +7,10 @@ import { updateDailyTransferStats } from './platform-stats.js' * @param {import('ethers').Provider} provider */ export const observeTransferEvents = async (pgPool, ieContract, provider) => { - const rows = await pgPool.query( + const { rows } = await pgPool.query( 'SELECT MAX(last_checked_block) FROM daily_reward_transfers' ) - const lastCheckedBlock = res.rows[0].last_checked_block + const lastCheckedBlock = rows[0].last_checked_block console.log('Querying impact evaluator Transfer events after block', lastCheckedBlock) let events