Poll FIL events, populate daily_reward_transfers table, reformat endpoint handling #102

Merged
10 changes: 10 additions & 0 deletions migrations/002.do.daily-reward-transfers.sql
@@ -0,0 +1,10 @@
CREATE TABLE daily_reward_transfers (
day DATE NOT NULL,
to_address TEXT NOT NULL,
amount NUMERIC NOT NULL,
last_checked_block INTEGER NOT NULL,
PRIMARY KEY (day, to_address)
);

CREATE INDEX daily_reward_transfers_day ON daily_reward_transfers (day);
CREATE INDEX daily_reward_transfers_last_block ON daily_reward_transfers (last_checked_block DESC);
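(Not part of the diff.) For context, a minimal sketch of how a consumer such as the stats API might read daily totals out of this table; the pool setup and date range are illustrative assumptions, not code from this PR.

import pg from 'pg'

// Hypothetical reader: total FIL transferred per day over an assumed date range.
const pgPool = new pg.Pool({ connectionString: process.env.DATABASE_URL })
const { rows } = await pgPool.query(`
  SELECT day::TEXT, SUM(amount) AS amount
  FROM daily_reward_transfers
  WHERE day >= $1 AND day <= $2
  GROUP BY day
  ORDER BY day
`, ['2024-01-01', '2024-01-31'])
console.log(rows) // e.g. [{ day: '2024-01-01', amount: '300' }, ...]
await pgPool.end()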
21 changes: 21 additions & 0 deletions observer/bin/dry-run.js
@@ -0,0 +1,21 @@
import * as SparkImpactEvaluator from '@filecoin-station/spark-impact-evaluator'
import { ethers } from 'ethers'

import { RPC_URL, rpcHeaders } from '../lib/config.js'
import { observeTransferEvents } from '../lib/observer.js'
import { getPgPool } from '../lib/db.js'

/** @type {import('pg').Pool} */
const pgPool = await getPgPool()

const fetchRequest = new ethers.FetchRequest(RPC_URL)
fetchRequest.setHeader('Authorization', rpcHeaders.Authorization || '')
const provider = new ethers.JsonRpcProvider(fetchRequest, null, { polling: true })

const ieContract = new ethers.Contract(SparkImpactEvaluator.ADDRESS, SparkImpactEvaluator.ABI, provider)

await pgPool.query('DELETE FROM daily_reward_transfers')

await observeTransferEvents(pgPool, ieContract, provider)

await pgPool.end()
83 changes: 20 additions & 63 deletions observer/bin/spark-observer.js
@@ -1,70 +1,27 @@
import * as SparkImpactEvaluator from '@filecoin-station/spark-impact-evaluator'
import { migrateWithPgClient } from '@filecoin-station/spark-stats-db-migrations'
import { ethers } from 'ethers'
import pg from 'pg'
import * as Sentry from '@sentry/node'
import timers from 'node:timers/promises'

// TODO: move this to a config.js file
const {
DATABASE_URL = 'postgres://localhost:5432/spark_stats',
RPC_URLS = 'https://api.node.glif.io/rpc/v0,https://api.chain.love/rpc/v1,https://filecoin.chainup.net/rpc/v1',
GLIF_TOKEN
} = process.env
import { RPC_URL, rpcHeaders, OBSERVATION_INTERVAL_MS } from '../lib/config.js'
import { getPgPool } from '../lib/db.js'
import { observeTransferEvents } from '../lib/observer.js'

const rpcUrls = RPC_URLS.split(',')
const RPC_URL = rpcUrls[Math.floor(Math.random() * rpcUrls.length)]
console.log(`Selected JSON-RPC endpoint ${RPC_URL}`)
const pgPool = await getPgPool()

const rpcHeaders = {}
if (RPC_URL.includes('glif')) {
rpcHeaders.Authorization = `Bearer ${GLIF_TOKEN}`
}

// TODO: move this to a different file
const fetchRequest = new ethers.FetchRequest(RPC_URL)
fetchRequest.setHeader('Authorization', rpcHeaders.Authorization || '')
const provider = new ethers.JsonRpcProvider(
fetchRequest,
null,
{ polling: true }
)

const ieContract = new ethers.Contract(
SparkImpactEvaluator.ADDRESS,
SparkImpactEvaluator.ABI,
provider
)

const pgPool = new pg.Pool({
connectionString: DATABASE_URL,
// allow the pool to close all connections and become empty
min: 0,
// this values should correlate with service concurrency hard_limit configured in fly.toml
// and must take into account the connection limit of our PG server, see
// https://fly.io/docs/postgres/managing/configuration-tuning/
max: 100,
// close connections that haven't been used for one second
idleTimeoutMillis: 1000,
// automatically close connections older than 60 seconds
maxLifetimeSeconds: 60
})

pgPool.on('error', err => {
// Prevent crashing the process on idle client errors, the pool will recover
// itself. If all connections are lost, the process will still crash.
// https://github.com/brianc/node-postgres/issues/1324#issuecomment-308778405
console.error('An idle client has experienced an error', err.stack)
})

await migrateWithPgClient(pgPool)

// Check that we can talk to the database
await pgPool.query('SELECT 1')

console.log('Listening for impact evaluator events')

ieContract.on('Transfer', (to, amount, ...args) => {
/** @type {number} */
const blockNumber = args.pop()
console.log('Transfer %s FIL to %s at epoch %s', amount, to, blockNumber)
// TODO: update the database
})
const provider = new ethers.JsonRpcProvider(fetchRequest, null, { polling: true })

const ieContract = new ethers.Contract(SparkImpactEvaluator.ADDRESS, SparkImpactEvaluator.ABI, provider)

// Listen for Transfer events from the IE contract
while (true) {
try {
await observeTransferEvents(pgPool, ieContract, provider)
} catch (e) {
console.error(e)
Sentry.captureException(e)
}
await timers.setTimeout(OBSERVATION_INTERVAL_MS)
}
27 changes: 25 additions & 2 deletions observer/lib/config.js
@@ -1,4 +1,27 @@
const {
// FIXME Add back chain.love either when it's online or once onContractEvent
// supports rpc failover
// RPC_URLS = 'https://api.node.glif.io/rpc/v0,https://api.chain.love/rpc/v1',
RPC_URLS = 'https://api.node.glif.io/rpc/v0',
GLIF_TOKEN,
// DATABASE_URL points to `spark_stats` database managed by this monorepo
DATABASE_URL = 'postgres://localhost:5432/spark_stats',
// Sleep one hour between observations
OBSERVATION_INTERVAL_MS = 1000 * 60 * 60
} = process.env

const rpcUrls = RPC_URLS.split(',')
const RPC_URL = rpcUrls[Math.floor(Math.random() * rpcUrls.length)]
console.log(`Selected JSON-RPC endpoint ${RPC_URL}`)

const rpcHeaders = {}
if (RPC_URL.includes('glif')) {
rpcHeaders.Authorization = `Bearer ${GLIF_TOKEN}`
}

export {
RPC_URL,
DATABASE_URL,
rpcHeaders,
OBSERVATION_INTERVAL_MS
}
34 changes: 34 additions & 0 deletions observer/lib/db.js
@@ -0,0 +1,34 @@
import { migrateWithPgClient } from '@filecoin-station/spark-stats-db-migrations'
import pg from 'pg'

import { DATABASE_URL } from '../lib/config.js'

export const getPgPool = async () => {
const pgPool = new pg.Pool({
connectionString: DATABASE_URL,
// allow the pool to close all connections and become empty
min: 0,
// this value should correlate with the service concurrency hard_limit configured in fly.toml
// and must take into account the connection limit of our PG server, see
// https://fly.io/docs/postgres/managing/configuration-tuning/
max: 100,
// close connections that haven't been used for one second
idleTimeoutMillis: 1000,
// automatically close connections older than 60 seconds
maxLifetimeSeconds: 60
})

pgPool.on('error', err => {
// Prevent crashing the process on idle client errors, the pool will recover
// itself. If all connections are lost, the process will still crash.
// https://github.com/brianc/node-postgres/issues/1324#issuecomment-308778405
console.error('An idle client has experienced an error', err.stack)
})

await migrateWithPgClient(pgPool)

// Check that we can talk to the database
await pgPool.query('SELECT 1')

return pgPool
}
39 changes: 39 additions & 0 deletions observer/lib/observer.js
@@ -0,0 +1,39 @@
import { updateDailyTransferStats } from './platform-stats.js'

/**
* Observe the transfer events on the Filecoin blockchain
* @param {import('pg').Pool} pgPool
* @param {import('ethers').Contract} ieContract
* @param {import('ethers').Provider} provider
*/
export const observeTransferEvents = async (pgPool, ieContract, provider) => {
const { rows } = await pgPool.query(
'SELECT MAX(last_checked_block) AS last_checked_block FROM daily_reward_transfers'
)
const lastCheckedBlock = rows[0].last_checked_block

console.log('Querying impact evaluator Transfer events after block', lastCheckedBlock)
let events
try {
events = await ieContract.queryFilter(ieContract.filters.Transfer(), lastCheckedBlock)
} catch (error) {
console.error('Error querying impact evaluator Transfer events', error)
if (error.message.includes('bad tipset height')) {
console.log('Block number too old, GLIF only provides last 2000 blocks, querying from -1900')
events = await ieContract.queryFilter(ieContract.filters.Transfer(), -1900)
} else {
throw error
}
}
const currentBlockNumber = await provider.getBlockNumber()
console.log('Current block number:', currentBlockNumber)
console.log(`Found ${events.length} Transfer events`)
for (const event of events) {
const transferEvent = {
to_address: event.args.to,
amount: event.args.amount
}
console.log('Transfer event:', transferEvent)
await updateDailyTransferStats(pgPool, transferEvent, currentBlockNumber)
}
}
15 changes: 15 additions & 0 deletions observer/lib/platform-stats.js
@@ -0,0 +1,15 @@
/**
* @param {import('pg').Client} pgClient
* @param {Object} transferEvent
* @param {string} transferEvent.to_address
* @param {number} transferEvent.amount
* @param {number} currentBlockNumber
*/
export const updateDailyTransferStats = async (pgClient, transferEvent, currentBlockNumber) => {
await pgClient.query(`
INSERT INTO daily_reward_transfers (day, to_address, amount, last_checked_block)
VALUES (now(), $1, $2, $3)
ON CONFLICT (day, to_address) DO UPDATE SET
amount = daily_reward_transfers.amount + EXCLUDED.amount
`, [transferEvent.to_address, transferEvent.amount, currentBlockNumber])
}
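(Not part of the diff.) A small sketch illustrating the upsert semantics above: repeated transfers to the same address on the same day accumulate amount, while last_checked_block is only written when the row is first inserted, because the ON CONFLICT branch does not update it. The connection string, address, and numbers below are assumptions for illustration.

import pg from 'pg'
import { updateDailyTransferStats } from './platform-stats.js'

const pgPool = new pg.Pool({ connectionString: process.env.DATABASE_URL })

// First call inserts the row with last_checked_block = 100.
await updateDailyTransferStats(pgPool, { to_address: 'f1example', amount: 100 }, 100)
// Second call hits ON CONFLICT: amount becomes 250, last_checked_block stays 100.
await updateDailyTransferStats(pgPool, { to_address: 'f1example', amount: 150 }, 200)

const { rows } = await pgPool.query(
  'SELECT to_address, amount, last_checked_block FROM daily_reward_transfers'
)
console.log(rows) // [{ to_address: 'f1example', amount: '250', last_checked_block: 100 }]
await pgPool.end()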
71 changes: 71 additions & 0 deletions observer/test/platform-stats.test.js
@@ -0,0 +1,71 @@
import assert from 'node:assert'
import pg from 'pg'
import { after, afterEach, before, beforeEach, describe, it } from 'mocha'

import { DATABASE_URL } from '../lib/config.js'
import { updateDailyTransferStats } from '../lib/platform-stats.js'
import { migrateWithPgClient } from '@filecoin-station/spark-stats-db-migrations'

describe('platform-stats-generator', () => {
/** @type {pg.Pool} */
let pgPool

before(async () => {
pgPool = new pg.Pool({ connectionString: DATABASE_URL })
await migrateWithPgClient(pgPool)
})

let today
beforeEach(async () => {
await pgPool.query('DELETE FROM daily_reward_transfers')

// Run all tests inside a transaction to ensure `now()` always returns the same value
// See https://dba.stackexchange.com/a/63549/125312
// This avoids subtle race conditions when the tests are executed around midnight.
await pgPool.query('BEGIN TRANSACTION')
today = await getCurrentDate()
})

afterEach(async () => {
await pgPool.query('END TRANSACTION')
})

after(async () => {
await pgPool.end()
})

describe('updateDailyTransferStats', () => {
it('should correctly update daily Transfer stats with new transfer events', async () => {
await updateDailyTransferStats(pgPool, { to_address: 'address1', amount: 100 }, 1)
await updateDailyTransferStats(pgPool, { to_address: 'address1', amount: 200 }, 1)

const { rows } = await pgPool.query(`
SELECT day::TEXT, to_address, amount FROM daily_reward_transfers
WHERE to_address = $1
`, ['address1'])
assert.strictEqual(rows.length, 1)
assert.deepStrictEqual(rows, [{ day: today, to_address: 'address1', amount: '300' }])
})

it('should handle multiple addresses in daily Transfer stats', async () => {
await updateDailyTransferStats(pgPool, { to_address: 'address1', amount: 50 }, 1)
await updateDailyTransferStats(pgPool, { to_address: 'address2', amount: 150 }, 1)

const { rows } = await pgPool.query(`
SELECT day::TEXT, to_address, amount, last_checked_block FROM daily_reward_transfers
ORDER BY to_address
`)
assert.strictEqual(rows.length, 2)

assert.deepStrictEqual(rows, [
{ day: today, to_address: 'address1', amount: '50', last_checked_block: 1 },
{ day: today, to_address: 'address2', amount: '150', last_checked_block: 1 }
])
})
})

const getCurrentDate = async () => {
const { rows: [{ today }] } = await pgPool.query('SELECT now()::DATE::TEXT as today')
return today
}
})
23 changes: 0 additions & 23 deletions observer/test/smoke.test.js

This file was deleted.
