Overview
This example demonstrates building an indexer that:
- Backfills historical Transfer events from a specific block range
- Watches for new blocks in real-time
- Handles chain reorgs by removing re-orged transfers and adding new ones
- Parses ERC-20 Transfer events from transaction receipts
Full Example
import { BlockStream } from '@tevm/voltaire/BlockStream';
import { Keccak256 } from '@tevm/voltaire/Keccak256';
import { Hex } from '@tevm/voltaire/Hex';
import { Address } from '@tevm/voltaire/Address';
// ERC-20 Transfer(address indexed from, address indexed to, uint256 value)
const TRANSFER_TOPIC = Hex.fromBytes(Keccak256('Transfer(address,address,uint256)'));
// Simple in-memory database
interface Transfer {
  blockNumber: bigint;
  blockHash: string;
  transactionHash: string;
  logIndex: number;
  from: string;
  to: string;
  value: bigint;
  tokenAddress: string;
}
const transfersByBlock = new Map<string, Transfer[]>();
const allTransfers: Transfer[] = [];
/**
 * Parse ERC-20 Transfer events from a block's receipts
 */
function parseTransfers(block: {
  header: { number: bigint; hash: Uint8Array };
  receipts: readonly {
    transactionHash: Uint8Array;
    logs: readonly {
      address: Uint8Array;
      topics: readonly Uint8Array[];
      data: Uint8Array;
      logIndex?: number;
    }[];
  }[];
}): Transfer[] {
  const transfers: Transfer[] = [];
  const blockHash = Hex.fromBytes(block.header.hash);
  for (const receipt of block.receipts) {
    for (const log of receipt.logs) {
      // Check if this is a Transfer event (topic0 matches)
      if (log.topics.length < 3) continue;
      const topic0 = Hex.fromBytes(log.topics[0]);
      if (topic0 !== TRANSFER_TOPIC) continue;
      // Parse indexed parameters from topics
      // topic1 = from (address, padded to 32 bytes)
      // topic2 = to (address, padded to 32 bytes)
      const fromBytes = log.topics[1].slice(12); // Remove 12-byte padding
      const toBytes = log.topics[2].slice(12);
      // Parse value from data (uint256)
      const value = Hex.toBigInt(Hex.fromBytes(log.data));
      transfers.push({
        blockNumber: block.header.number,
        blockHash,
        transactionHash: Hex.fromBytes(receipt.transactionHash),
        logIndex: log.logIndex ?? 0,
        from: Address.toChecksummed(Address.fromBytes(fromBytes)),
        to: Address.toChecksummed(Address.fromBytes(toBytes)),
        value,
        tokenAddress: Address.toChecksummed(Address.fromBytes(log.address)),
      });
    }
  }
  return transfers;
}
/**
 * Add transfers to the database
 */
function addTransfers(transfers: Transfer[]) {
  for (const transfer of transfers) {
    allTransfers.push(transfer);
    const existing = transfersByBlock.get(transfer.blockHash) ?? [];
    existing.push(transfer);
    transfersByBlock.set(transfer.blockHash, existing);
  }
}
/**
 * Remove transfers from a re-orged block
 */
function removeTransfersByBlockHash(blockHash: string) {
  const removed = transfersByBlock.get(blockHash) ?? [];
  transfersByBlock.delete(blockHash);
  // Remove from allTransfers array
  for (const transfer of removed) {
    const idx = allTransfers.findIndex(
      t => t.blockHash === transfer.blockHash &&
        t.transactionHash === transfer.transactionHash &&
        t.logIndex === transfer.logIndex
    );
    if (idx !== -1) {
      allTransfers.splice(idx, 1);
    }
  }
  return removed;
}
/**
 * Run the indexer
 */
async function runIndexer(provider: any) {
  const stream = BlockStream({ provider });
  // Configuration
  const START_BLOCK = 18000000n;
  const BACKFILL_TO = 18001000n; // Backfill first 1000 blocks
  console.log(`Starting indexer from block ${START_BLOCK}`);
  // Phase 1: Backfill historical blocks
  console.log('Phase 1: Backfilling historical blocks...');
  for await (const event of stream.backfill({
    fromBlock: START_BLOCK,
    toBlock: BACKFILL_TO,
    include: 'receipts',
  })) {
    for (const block of event.blocks) {
      const transfers = parseTransfers(block);
      addTransfers(transfers);
      if (transfers.length > 0) {
        console.log(
          `Block ${block.header.number}: ${transfers.length} transfers`
        );
      }
    }
  }
  console.log(`Backfill complete. Total transfers: ${allTransfers.length}`);
  // Phase 2: Watch for new blocks
  console.log('Phase 2: Watching for new blocks...');
  const controller = new AbortController();
  // Handle graceful shutdown
  process.on('SIGINT', () => {
    console.log('\nShutting down...');
    controller.abort();
  });
  try {
    for await (const event of stream.watch({
      signal: controller.signal,
      include: 'receipts',
      fromBlock: BACKFILL_TO + 1n,
    })) {
      if (event.type === 'reorg') {
        // Handle chain reorganization
        console.log(`Reorg detected! ${event.removed.length} blocks removed`);
        // Remove transfers from re-orged blocks (newest to oldest)
        for (const block of event.removed) {
          const blockHash = Hex.fromBytes(block.hash);
          const removed = removeTransfersByBlockHash(blockHash);
          if (removed.length > 0) {
            console.log(
              ` Removed ${removed.length} transfers from block ${block.number}`
            );
          }
        }
        // Add transfers from new chain (oldest to newest)
        for (const block of event.added) {
          const transfers = parseTransfers(block);
          addTransfers(transfers);
          if (transfers.length > 0) {
            console.log(
              ` Added ${transfers.length} transfers from block ${block.header.number}`
            );
          }
        }
        console.log(`Reorg resolved. Total transfers: ${allTransfers.length}`);
      } else {
        // Normal new blocks
        for (const block of event.blocks) {
          const transfers = parseTransfers(block);
          addTransfers(transfers);
          console.log(
            `Block ${block.header.number}: ${transfers.length} transfers ` +
            `(total: ${allTransfers.length})`
          );
        }
      }
    }
  } catch (error) {
    if (error instanceof Error && error.name === 'AbortError') {
      console.log('Indexer stopped');
    } else {
      throw error;
    }
  }
}
// Example usage with a provider
// const provider = { request: async ({ method, params }) => { ... } };
// runIndexer(provider);
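// A minimal provider sketch, assuming a standard JSON-RPC HTTP endpoint.
// The URL and wrapper below are illustrative placeholders, not part of BlockStream:
//
// const provider = {
//   request: async ({ method, params }: { method: string; params?: unknown[] }) => {
//     const res = await fetch('https://mainnet.example-rpc.com', {
//       method: 'POST',
//       headers: { 'Content-Type': 'application/json' },
//       body: JSON.stringify({ jsonrpc: '2.0', id: 1, method, params }),
//     });
//     const { result, error } = await res.json();
//     if (error) throw new Error(error.message);
//     return result;
//   },
// };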
Key Patterns
Computing Event Signatures
Use Keccak256 to compute the event topic hash:
const TRANSFER_TOPIC = Hex.fromBytes(
  Keccak256('Transfer(address,address,uint256)')
);
// 0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef
Parsing Indexed Parameters
ERC-20 Transfer has two indexed address parameters stored in topics:
// Topics layout:
// topic[0] = event signature hash
// topic[1] = from address (32 bytes, left-padded)
// topic[2] = to address (32 bytes, left-padded)
const fromBytes = log.topics[1].slice(12); // Remove 12-byte padding
const toBytes = log.topics[2].slice(12);
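The non-indexed value parameter is not in the topics; it is ABI-encoded in the log's data field. The full example decodes it by converting the 32-byte data to a bigint:
// value (uint256) is not indexed, so it is read from log.data
const value = Hex.toBigInt(Hex.fromBytes(log.data));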
Handling Reorgs
BlockStream emits a reorg event with both removed and added blocks:
if (event.type === 'reorg') {
  // 1. Undo state for removed blocks (newest to oldest)
  for (const block of event.removed) {
    rollbackBlock(block);
  }
  // 2. Apply state for added blocks (oldest to newest)
  for (const block of event.added) {
    processBlock(block);
  }
}
Graceful Shutdown
Use AbortController to cleanly stop the watch loop:
const controller = new AbortController();
process.on('SIGINT', () => controller.abort());
for await (const event of stream.watch({ signal: controller.signal })) {
  // Process events...
}
Production Considerations
This example uses an in-memory database for simplicity. In production, you should:
- Use a persistent database (PostgreSQL, SQLite, etc.)
- Implement database transactions for atomicity during reorgs
- Add retry logic for RPC failures
- Implement checkpointing to resume from the last processed block
Database Transactions
Wrap reorg handling in a database transaction:
if (event.type === 'reorg') {
  await db.transaction(async (tx) => {
    // Delete re-orged data
    for (const block of event.removed) {
      await tx.execute('DELETE FROM transfers WHERE block_hash = ?', [
        Hex.fromBytes(block.hash)
      ]);
    }
    // Insert new data
    for (const block of event.added) {
      const transfers = parseTransfers(block);
      for (const transfer of transfers) {
        await tx.execute('INSERT INTO transfers ...', [...]);
      }
    }
  });
}
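Retry Logic
RPC calls can fail transiently, so wrap them with retries and backoff. A minimal sketch; the withRetry helper, its parameters, and the provider wrapper are illustrative, not part of BlockStream:
async function withRetry<T>(
  fn: () => Promise<T>,
  maxAttempts = 5,
  baseDelayMs = 500
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (error) {
      lastError = error;
      // Exponential backoff: 500ms, 1s, 2s, 4s, ...
      const delay = baseDelayMs * 2 ** attempt;
      await new Promise((resolve) => setTimeout(resolve, delay));
    }
  }
  throw lastError;
}
// Example: wrap the provider so every RPC request is retried
const retryingProvider = {
  request: (args: { method: string; params?: unknown[] }) =>
    withRetry(() => provider.request(args)),
};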
Checkpointing
Save progress to resume after restarts:
async function saveCheckpoint(blockNumber: bigint) {
  await db.execute(
    'UPDATE indexer_state SET last_block = ? WHERE id = 1',
    [blockNumber.toString()]
  );
}
async function loadCheckpoint(): Promise<bigint> {
  const result = await db.query(
    'SELECT last_block FROM indexer_state WHERE id = 1'
  );
  return BigInt(result.last_block ?? START_BLOCK);
}
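One way to wire the checkpoint into the watch loop from the full example; a sketch that assumes the saveCheckpoint and loadCheckpoint helpers above:
// Resume from the last processed block instead of a hard-coded start
const lastBlock = await loadCheckpoint();
for await (const event of stream.watch({
  fromBlock: lastBlock + 1n,
  include: 'receipts',
})) {
  if (event.type === 'reorg') {
    // On a reorg, rewind the checkpoint alongside the data rollback
    // so a restart re-processes the replaced blocks.
  } else {
    for (const block of event.blocks) {
      addTransfers(parseTransfers(block));
      // Persist progress after each block so a restart can resume here
      await saveCheckpoint(block.header.number);
    }
  }
}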

