Skip to main content
Build a production-ready ERC-20 transfer indexer that handles chain reorganizations correctly.

Overview

This example demonstrates building an indexer that:
  • Backfills historical Transfer events from a specific block range
  • Watches for new blocks in real-time
  • Handles chain reorgs by removing re-orged transfers and adding new ones
  • Parses ERC-20 Transfer events from transaction receipts

Full Example

import { BlockStream } from '@tevm/voltaire/BlockStream';
import { Keccak256 } from '@tevm/voltaire/Keccak256';
import { Hex } from '@tevm/voltaire/Hex';
import { Address } from '@tevm/voltaire/Address';

// ERC-20 Transfer(address indexed from, address indexed to, uint256 value)
const TRANSFER_TOPIC = Hex.fromBytes(Keccak256('Transfer(address,address,uint256)'));

// Simple in-memory database
interface Transfer {
  blockNumber: bigint;
  blockHash: string;
  transactionHash: string;
  logIndex: number;
  from: string;
  to: string;
  value: bigint;
  tokenAddress: string;
}

const transfersByBlock = new Map<string, Transfer[]>();
const allTransfers: Transfer[] = [];

/**
 * Parse ERC-20 Transfer events from a block's receipts
 */
function parseTransfers(block: {
  header: { number: bigint; hash: Uint8Array };
  receipts: readonly {
    transactionHash: Uint8Array;
    logs: readonly {
      address: Uint8Array;
      topics: readonly Uint8Array[];
      data: Uint8Array;
      logIndex?: number;
    }[]
  }[]
}): Transfer[] {
  const transfers: Transfer[] = [];
  const blockHash = Hex.fromBytes(block.header.hash);

  for (const receipt of block.receipts) {
    for (const log of receipt.logs) {
      // Check if this is a Transfer event (topic0 matches)
      if (log.topics.length < 3) continue;

      const topic0 = Hex.fromBytes(log.topics[0]);
      if (topic0 !== TRANSFER_TOPIC) continue;

      // Parse indexed parameters from topics
      // topic1 = from (address, padded to 32 bytes)
      // topic2 = to (address, padded to 32 bytes)
      const fromBytes = log.topics[1].slice(12); // Remove 12-byte padding
      const toBytes = log.topics[2].slice(12);

      // Parse value from data (uint256)
      const value = Hex.toBigInt(Hex.fromBytes(log.data));

      transfers.push({
        blockNumber: block.header.number,
        blockHash,
        transactionHash: Hex.fromBytes(receipt.transactionHash),
        logIndex: log.logIndex ?? 0,
        from: Address.toChecksummed(Address.fromBytes(fromBytes)),
        to: Address.toChecksummed(Address.fromBytes(toBytes)),
        value,
        tokenAddress: Address.toChecksummed(log.address),
      });
    }
  }

  return transfers;
}

/**
 * Add transfers to the database
 */
function addTransfers(transfers: Transfer[]) {
  for (const transfer of transfers) {
    allTransfers.push(transfer);

    const existing = transfersByBlock.get(transfer.blockHash) ?? [];
    existing.push(transfer);
    transfersByBlock.set(transfer.blockHash, existing);
  }
}

/**
 * Remove transfers from a re-orged block
 */
function removeTransfersByBlockHash(blockHash: string) {
  const removed = transfersByBlock.get(blockHash) ?? [];
  transfersByBlock.delete(blockHash);

  // Remove from allTransfers array
  for (const transfer of removed) {
    const idx = allTransfers.findIndex(
      t => t.blockHash === transfer.blockHash &&
           t.transactionHash === transfer.transactionHash &&
           t.logIndex === transfer.logIndex
    );
    if (idx !== -1) {
      allTransfers.splice(idx, 1);
    }
  }

  return removed;
}

/**
 * Run the indexer
 */
async function runIndexer(provider: any) {
  const stream = BlockStream({ provider });

  // Configuration
  const START_BLOCK = 18000000n;
  const BACKFILL_TO = 18001000n; // Backfill first 1000 blocks

  console.log(`Starting indexer from block ${START_BLOCK}`);

  // Phase 1: Backfill historical blocks
  console.log('Phase 1: Backfilling historical blocks...');

  for await (const event of stream.backfill({
    fromBlock: START_BLOCK,
    toBlock: BACKFILL_TO,
    include: 'receipts',
  })) {
    for (const block of event.blocks) {
      const transfers = parseTransfers(block);
      addTransfers(transfers);

      if (transfers.length > 0) {
        console.log(
          `Block ${block.header.number}: ${transfers.length} transfers`
        );
      }
    }
  }

  console.log(`Backfill complete. Total transfers: ${allTransfers.length}`);

  // Phase 2: Watch for new blocks
  console.log('Phase 2: Watching for new blocks...');

  const controller = new AbortController();

  // Handle graceful shutdown
  process.on('SIGINT', () => {
    console.log('\nShutting down...');
    controller.abort();
  });

  try {
    for await (const event of stream.watch({
      signal: controller.signal,
      include: 'receipts',
      fromBlock: BACKFILL_TO + 1n,
    })) {
      if (event.type === 'reorg') {
        // Handle chain reorganization
        console.log(`Reorg detected! ${event.removed.length} blocks removed`);

        // Remove transfers from re-orged blocks (newest to oldest)
        for (const block of event.removed) {
          const blockHash = Hex.fromBytes(block.hash);
          const removed = removeTransfersByBlockHash(blockHash);
          if (removed.length > 0) {
            console.log(
              `  Removed ${removed.length} transfers from block ${block.number}`
            );
          }
        }

        // Add transfers from new chain (oldest to newest)
        for (const block of event.added) {
          const transfers = parseTransfers(block);
          addTransfers(transfers);
          if (transfers.length > 0) {
            console.log(
              `  Added ${transfers.length} transfers from block ${block.header.number}`
            );
          }
        }

        console.log(`Reorg resolved. Total transfers: ${allTransfers.length}`);
      } else {
        // Normal new blocks
        for (const block of event.blocks) {
          const transfers = parseTransfers(block);
          addTransfers(transfers);

          console.log(
            `Block ${block.header.number}: ${transfers.length} transfers ` +
            `(total: ${allTransfers.length})`
          );
        }
      }
    }
  } catch (error) {
    if (error instanceof Error && error.name === 'AbortError') {
      console.log('Indexer stopped');
    } else {
      throw error;
    }
  }
}

// Example usage with a provider
// const provider = { request: async ({ method, params }) => { ... } };
// runIndexer(provider);

Key Patterns

Computing Event Signatures

Use Keccak256 to compute the event topic hash:
const TRANSFER_TOPIC = Hex.fromBytes(
  Keccak256('Transfer(address,address,uint256)')
);
// 0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef

Parsing Indexed Parameters

ERC-20 Transfer has two indexed address parameters stored in topics:
// Topics layout:
// topic[0] = event signature hash
// topic[1] = from address (32 bytes, left-padded)
// topic[2] = to address (32 bytes, left-padded)
const fromBytes = log.topics[1].slice(12); // Remove 12-byte padding
const toBytes = log.topics[2].slice(12);

Handling Reorgs

BlockStream emits a reorg event with both removed and added blocks:
if (event.type === 'reorg') {
  // 1. Undo state for removed blocks (newest to oldest)
  for (const block of event.removed) {
    rollbackBlock(block);
  }

  // 2. Apply state for added blocks (oldest to newest)
  for (const block of event.added) {
    processBlock(block);
  }
}

Graceful Shutdown

Use AbortController to cleanly stop the watch loop:
const controller = new AbortController();

process.on('SIGINT', () => controller.abort());

for await (const event of stream.watch({ signal: controller.signal })) {
  // Process events...
}

Production Considerations

This example uses an in-memory database for simplicity. In production, you should:
  • Use a persistent database (PostgreSQL, SQLite, etc.)
  • Implement database transactions for atomicity during reorgs
  • Add retry logic for RPC failures
  • Implement checkpointing to resume from the last processed block

Database Transactions

Wrap reorg handling in a database transaction:
if (event.type === 'reorg') {
  await db.transaction(async (tx) => {
    // Delete re-orged data
    for (const block of event.removed) {
      await tx.execute('DELETE FROM transfers WHERE block_hash = ?', [
        Hex.fromBytes(block.hash)
      ]);
    }

    // Insert new data
    for (const block of event.added) {
      const transfers = parseTransfers(block);
      for (const transfer of transfers) {
        await tx.execute('INSERT INTO transfers ...', [...]);
      }
    }
  });
}

Checkpointing

Save progress to resume after restarts:
async function saveCheckpoint(blockNumber: bigint) {
  await db.execute(
    'UPDATE indexer_state SET last_block = ? WHERE id = 1',
    [blockNumber.toString()]
  );
}

async function loadCheckpoint(): Promise<bigint> {
  const result = await db.query(
    'SELECT last_block FROM indexer_state WHERE id = 1'
  );
  return BigInt(result.last_block ?? START_BLOCK);
}