
Commit 246a3b6

Poll FIL events, populate daily_reward_transfers table, reformat endpoint handling (#102)
* Poll Transfer events, populate daily_fil table, reformat endpoint handling
* incomplete: use queryFilter for Event tracking history
* Merge with new repo structure, ensure total block history coverage
* migrating db before observer tests
* Run in loop, modularize code, add dry-run
* fix any missed renames for consistency
* Loop error handling and transition to single-table approach
* Update migrations/001.do.sql
* Update observer/bin/spark-observer.js
* Update observer/lib/observer.js
* Update observer/lib/platform-stats.js
* Update observer/bin/spark-observer.js
* Update observer/lib/observer.js

---------

Co-authored-by: Julian Gruber <[email protected]>
1 parent 1146262 commit 246a3b6

15 files changed: +395 -176 lines changed
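
The commit description above mentions the transition to a single-table approach: per-day, per-address reward totals accumulate in the new daily_reward_transfers table. A hedged worked example of that upsert behaviour follows (not part of the commit; the address, amounts, and block numbers are invented, the import path assumes the monorepo root, and it presumes the migration below has been applied and DATABASE_URL points at the spark_stats database):

import pg from 'pg'
import { updateDailyTransferStats } from './observer/lib/platform-stats.js'

const pgPool = new pg.Pool({ connectionString: process.env.DATABASE_URL })

// Two transfers to the same address on the same day...
await updateDailyTransferStats(pgPool, { to_address: '0xabc', amount: 100 }, 4200000)
await updateDailyTransferStats(pgPool, { to_address: '0xabc', amount: 200 }, 4200005)

// ...are upserted into a single row whose amount is the running total.
const { rows } = await pgPool.query(
  "SELECT day::TEXT, to_address, amount FROM daily_reward_transfers WHERE to_address = '0xabc'"
)
console.log(rows) // e.g. [{ day: '2024-04-16', to_address: '0xabc', amount: '300' }]

await pgPool.end()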
migrations/001.do.sql

Lines changed: 10 additions & 0 deletions

CREATE TABLE daily_reward_transfers (
  day DATE NOT NULL,
  to_address TEXT NOT NULL,
  amount NUMERIC NOT NULL,
  last_checked_block INTEGER NOT NULL,
  PRIMARY KEY (day, to_address)
);

CREATE INDEX daily_reward_transfers_day ON daily_reward_transfers (day);
CREATE INDEX daily_reward_transfers_last_block ON daily_reward_transfers (last_checked_block DESC);
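
A usage sketch, not part of the commit: because rows are keyed by (day, to_address), per-day totals across all recipients can be derived with a plain aggregate. Minimal example, assuming a pg.Pool connected to the same spark_stats database:

import pg from 'pg'

const pgPool = new pg.Pool({ connectionString: process.env.DATABASE_URL })

// Sum the reward transfers per day across all recipient addresses.
const { rows } = await pgPool.query(`
  SELECT day::TEXT, SUM(amount) AS total_amount
  FROM daily_reward_transfers
  GROUP BY day
  ORDER BY day
`)
console.log(rows) // e.g. [{ day: '2024-04-16', total_amount: '300' }]

await pgPool.end()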

observer/bin/dry-run.js

Lines changed: 21 additions & 0 deletions
import * as SparkImpactEvaluator from '@filecoin-station/spark-impact-evaluator'
import { ethers } from 'ethers'

import { RPC_URL, rpcHeaders } from '../lib/config.js'
import { observeTransferEvents } from '../lib/observer.js'
import { getPgPool } from '../lib/db.js'

/** @type {pg.Pool} */
const pgPool = await getPgPool()

const fetchRequest = new ethers.FetchRequest(RPC_URL)
fetchRequest.setHeader('Authorization', rpcHeaders.Authorization || '')
const provider = new ethers.JsonRpcProvider(fetchRequest, null, { polling: true })

const ieContract = new ethers.Contract(SparkImpactEvaluator.ADDRESS, SparkImpactEvaluator.ABI, provider)

await pgPool.query('DELETE FROM daily_reward_transfers')

await observeTransferEvents(pgPool, ieContract, provider)

await pgPool.end()

observer/bin/spark-observer.js

Lines changed: 20 additions & 63 deletions
@@ -1,70 +1,27 @@
 import * as SparkImpactEvaluator from '@filecoin-station/spark-impact-evaluator'
-import { migrateWithPgClient } from '@filecoin-station/spark-stats-db-migrations'
 import { ethers } from 'ethers'
-import pg from 'pg'
+import * as Sentry from '@sentry/node'
+import timers from 'node:timers/promises'

-// TODO: move this to a config.js file
-const {
-  DATABASE_URL = 'postgres://localhost:5432/spark_stats',
-  RPC_URLS = 'https://api.node.glif.io/rpc/v0,https://api.chain.love/rpc/v1,https://filecoin.chainup.net/rpc/v1',
-  GLIF_TOKEN
-} = process.env
+import { RPC_URL, rpcHeaders, OBSERVATION_INTERVAL_MS } from '../lib/config.js'
+import { getPgPool } from '../lib/db.js'
+import { observeTransferEvents } from '../lib/observer.js'

-const rpcUrls = RPC_URLS.split(',')
-const RPC_URL = rpcUrls[Math.floor(Math.random() * rpcUrls.length)]
-console.log(`Selected JSON-RPC endpoint ${RPC_URL}`)
+const pgPool = await getPgPool()

-const rpcHeaders = {}
-if (RPC_URL.includes('glif')) {
-  rpcHeaders.Authorization = `Bearer ${GLIF_TOKEN}`
-}
-
-// TODO: move this to a different file
 const fetchRequest = new ethers.FetchRequest(RPC_URL)
 fetchRequest.setHeader('Authorization', rpcHeaders.Authorization || '')
-const provider = new ethers.JsonRpcProvider(
-  fetchRequest,
-  null,
-  { polling: true }
-)
-
-const ieContract = new ethers.Contract(
-  SparkImpactEvaluator.ADDRESS,
-  SparkImpactEvaluator.ABI,
-  provider
-)
-
-const pgPool = new pg.Pool({
-  connectionString: DATABASE_URL,
-  // allow the pool to close all connections and become empty
-  min: 0,
-  // this values should correlate with service concurrency hard_limit configured in fly.toml
-  // and must take into account the connection limit of our PG server, see
-  // https://fly.io/docs/postgres/managing/configuration-tuning/
-  max: 100,
-  // close connections that haven't been used for one second
-  idleTimeoutMillis: 1000,
-  // automatically close connections older than 60 seconds
-  maxLifetimeSeconds: 60
-})
-
-pgPool.on('error', err => {
-  // Prevent crashing the process on idle client errors, the pool will recover
-  // itself. If all connections are lost, the process will still crash.
-  // https://github.com/brianc/node-postgres/issues/1324#issuecomment-308778405
-  console.error('An idle client has experienced an error', err.stack)
-})
-
-await migrateWithPgClient(pgPool)
-
-// Check that we can talk to the database
-await pgPool.query('SELECT 1')
-
-console.log('Listening for impact evaluator events')
-
-ieContract.on('Transfer', (to, amount, ...args) => {
-  /** @type {number} */
-  const blockNumber = args.pop()
-  console.log('Transfer %s FIL to %s at epoch %s', amount, to, blockNumber)
-  // TODO: update the database
-})
+const provider = new ethers.JsonRpcProvider(fetchRequest, null, { polling: true })
+
+const ieContract = new ethers.Contract(SparkImpactEvaluator.ADDRESS, SparkImpactEvaluator.ABI, provider)
+
+// Listen for Transfer events from the IE contract
+while (true) {
+  try {
+    await observeTransferEvents(pgPool, ieContract, provider)
+  } catch (e) {
+    console.error(e)
+    Sentry.captureException(e)
+  }
+  await timers.setTimeout(OBSERVATION_INTERVAL_MS)
+}
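
Design note, not from the commit text itself: the previous version registered a long-lived ieContract.on('Transfer', ...) listener and never wrote to the database (the TODO above). The new version polls in a loop instead: each pass calls observeTransferEvents, which re-queries Transfer events from the last checked block recorded in daily_reward_transfers, so a failed pass is reported to Sentry without killing the process and the next pass resumes from where the table left off, with OBSERVATION_INTERVAL_MS of sleep between passes. This is the "Run in loop" and "ensure total block history coverage" mentioned in the commit description.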

observer/lib/config.js

Lines changed: 25 additions & 2 deletions
@@ -1,4 +1,27 @@
-export const {
+const {
+  // FIXME Add back chain.love either when it's online or once onContractEvent
+  // supports rpc failover
+  // RPC_URLS = 'https://api.node.glif.io/rpc/v0,https://api.chain.love/rpc/v1',
+  RPC_URLS = 'https://api.node.glif.io/rpc/v0',
+  GLIF_TOKEN,
   // DATABASE_URL points to `spark_stats` database managed by this monorepo
-  DATABASE_URL = 'postgres://localhost:5432/spark_stats'
+  DATABASE_URL = 'postgres://localhost:5432/spark_stats',
+  // Sleep one hour between observations
+  OBSERVATION_INTERVAL_MS = 1000 * 60 * 60
 } = process.env
+
+const rpcUrls = RPC_URLS.split(',')
+const RPC_URL = rpcUrls[Math.floor(Math.random() * rpcUrls.length)]
+console.log(`Selected JSON-RPC endpoint ${RPC_URL}`)
+
+const rpcHeaders = {}
+if (RPC_URL.includes('glif')) {
+  rpcHeaders.Authorization = `Bearer ${GLIF_TOKEN}`
+}
+
+export {
+  RPC_URL,
+  DATABASE_URL,
+  rpcHeaders,
+  OBSERVATION_INTERVAL_MS
+}

observer/lib/db.js

Lines changed: 34 additions & 0 deletions
import { migrateWithPgClient } from '@filecoin-station/spark-stats-db-migrations'
import pg from 'pg'

import { DATABASE_URL } from '../lib/config.js'

export const getPgPool = async () => {
  const pgPool = new pg.Pool({
    connectionString: DATABASE_URL,
    // allow the pool to close all connections and become empty
    min: 0,
    // this values should correlate with service concurrency hard_limit configured in fly.toml
    // and must take into account the connection limit of our PG server, see
    // https://fly.io/docs/postgres/managing/configuration-tuning/
    max: 100,
    // close connections that haven't been used for one second
    idleTimeoutMillis: 1000,
    // automatically close connections older than 60 seconds
    maxLifetimeSeconds: 60
  })

  pgPool.on('error', err => {
    // Prevent crashing the process on idle client errors, the pool will recover
    // itself. If all connections are lost, the process will still crash.
    // https://github.com/brianc/node-postgres/issues/1324#issuecomment-308778405
    console.error('An idle client has experienced an error', err.stack)
  })

  await migrateWithPgClient(pgPool)

  // Check that we can talk to the database
  await pgPool.query('SELECT 1')

  return pgPool
}

observer/lib/observer.js

Lines changed: 39 additions & 0 deletions
import { updateDailyTransferStats } from './platform-stats.js'

/**
 * Observe the transfer events on the Filecoin blockchain
 * @param {import('pg').Pool} pgPool
 * @param {import('ethers').Contract} ieContract
 * @param {import('ethers').Provider} provider
 */
export const observeTransferEvents = async (pgPool, ieContract, provider) => {
  const { rows } = await pgPool.query(
    'SELECT MAX(last_checked_block) AS last_checked_block FROM daily_reward_transfers'
  )
  const lastCheckedBlock = rows[0].last_checked_block

  console.log('Querying impact evaluator Transfer events after block', lastCheckedBlock)
  let events
  try {
    events = await ieContract.queryFilter(ieContract.filters.Transfer(), lastCheckedBlock)
  } catch (error) {
    console.error('Error querying impact evaluator Transfer events', error)
    if (error.message.includes('bad tipset height')) {
      console.log('Block number too old, GLIF only provides last 2000 blocks, querying from -1900')
      events = await ieContract.queryFilter(ieContract.filters.Transfer(), -1900)
    } else {
      throw error
    }
  }
  const currentBlockNumber = await provider.getBlockNumber()
  console.log('Current block number:', currentBlockNumber)
  console.log(`Found ${events.length} Transfer events`)
  for (const event of events) {
    const transferEvent = {
      to_address: event.args.to,
      amount: event.args.amount
    }
    console.log('Transfer event:', transferEvent)
    await updateDailyTransferStats(pgPool, transferEvent, currentBlockNumber)
  }
}

observer/lib/platform-stats.js

Lines changed: 15 additions & 0 deletions
/**
 * @param {import('pg').Client} pgClient
 * @param {Object} transferEvent
 * @param {string} transferEvent.to_address
 * @param {number} transferEvent.amount
 * @param {number} currentBlockNumber
 */
export const updateDailyTransferStats = async (pgClient, transferEvent, currentBlockNumber) => {
  await pgClient.query(`
    INSERT INTO daily_reward_transfers (day, to_address, amount, last_checked_block)
    VALUES (now(), $1, $2, $3)
    ON CONFLICT (day, to_address) DO UPDATE SET
      amount = daily_reward_transfers.amount + EXCLUDED.amount
  `, [transferEvent.to_address, transferEvent.amount, currentBlockNumber])
}

observer/test/platform-stats.test.js

Lines changed: 71 additions & 0 deletions
import assert from 'node:assert'
import pg from 'pg'
import { after, afterEach, before, beforeEach, describe, it } from 'mocha'

import { DATABASE_URL } from '../lib/config.js'
import { updateDailyTransferStats } from '../lib/platform-stats.js'
import { migrateWithPgClient } from '@filecoin-station/spark-stats-db-migrations'

describe('platform-stats-generator', () => {
  /** @type {pg.Pool} */
  let pgPool

  before(async () => {
    pgPool = new pg.Pool({ connectionString: DATABASE_URL })
    await migrateWithPgClient(pgPool)
  })

  let today
  beforeEach(async () => {
    await pgPool.query('DELETE FROM daily_reward_transfers')

    // Run all tests inside a transaction to ensure `now()` always returns the same value
    // See https://dba.stackexchange.com/a/63549/125312
    // This avoids subtle race conditions when the tests are executed around midnight.
    await pgPool.query('BEGIN TRANSACTION')
    today = await getCurrentDate()
  })

  afterEach(async () => {
    await pgPool.query('END TRANSACTION')
  })

  after(async () => {
    await pgPool.end()
  })

  describe('updateDailyTransferStats', () => {
    it('should correctly update daily Transfer stats with new transfer events', async () => {
      await updateDailyTransferStats(pgPool, { to_address: 'address1', amount: 100 }, 1)
      await updateDailyTransferStats(pgPool, { to_address: 'address1', amount: 200 }, 1)

      const { rows } = await pgPool.query(`
        SELECT day::TEXT, to_address, amount FROM daily_reward_transfers
        WHERE to_address = $1
      `, ['address1'])
      assert.strictEqual(rows.length, 1)
      assert.deepStrictEqual(rows, [{ day: today, to_address: 'address1', amount: '300' }])
    })

    it('should handle multiple addresses in daily Transfer stats', async () => {
      await updateDailyTransferStats(pgPool, { to_address: 'address1', amount: 50 }, 1)
      await updateDailyTransferStats(pgPool, { to_address: 'address2', amount: 150 }, 1)

      const { rows } = await pgPool.query(`
        SELECT day::TEXT, to_address, amount, last_checked_block FROM daily_reward_transfers
        ORDER BY to_address
      `)
      assert.strictEqual(rows.length, 2)

      assert.deepStrictEqual(rows, [
        { day: today, to_address: 'address1', amount: '50', last_checked_block: 1 },
        { day: today, to_address: 'address2', amount: '150', last_checked_block: 1 }
      ])
    })
  })

  const getCurrentDate = async () => {
    const { rows: [{ today }] } = await pgPool.query('SELECT now()::DATE::TEXT as today')
    return today
  }
})

observer/test/smoke.test.js

Lines changed: 0 additions & 23 deletions
This file was deleted.
