Overview

BlockStream provides robust block streaming with reorg detection. Inspired by Reth’s ExEx pattern, it tracks the chain state and emits events when blocks are added or removed due to reorganizations.

import { BlockStream } from '@voltaire/block'

const stream = BlockStream({ provider })

// Watch for new blocks with reorg handling
for await (const event of stream.watch()) {
  if (event.type === 'reorg') {
    for (const block of event.removed) {
      rollback(block.hash)
    }
  } else {
    for (const block of event.blocks) {
      process(block)
    }
  }
}

API

Factory

function BlockStream(options: {
  provider: TypedProvider
}): BlockStreamInstance

Returns an instance whose backfill() and watch() methods are async generators.

backfill

Stream historical blocks within a range.

for await (const { blocks, metadata } of stream.backfill({
  fromBlock: 18000000n,
  toBlock: 19000000n,
  include: 'transactions'
})) {
  for (const block of blocks) {
    indexBlock(block)
  }
}

Options:
  • fromBlock - Start block (inclusive, required)
  • toBlock - End block (inclusive, required)
  • include - Content level: 'header' | 'transactions' | 'receipts'
  • chunkSize - Blocks per batch (default: 100)
  • signal - AbortSignal for cancellation
  • retry - Retry configuration
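The chunkSize option bounds how many blocks are requested per batch. As an illustration of how an inclusive range splits into batches, a sketch of the arithmetic (chunkRange is a hypothetical helper, not part of the library):

```typescript
// Split an inclusive block range into [start, end] pairs of at most
// `chunkSize` blocks each, mirroring how backfill batches its requests.
function chunkRange(
  fromBlock: bigint,
  toBlock: bigint,
  chunkSize: bigint
): Array<[bigint, bigint]> {
  const chunks: Array<[bigint, bigint]> = []
  for (let start = fromBlock; start <= toBlock; start += chunkSize) {
    const end = start + chunkSize - 1n
    chunks.push([start, end < toBlock ? end : toBlock])
  }
  return chunks
}

// chunkRange(18000000n, 18000249n, 100n) covers the range in three batches,
// the last one shorter: [[18000000n, 18000099n], [18000100n, 18000199n], [18000200n, 18000249n]]
```

Both endpoints are inclusive, matching the fromBlock/toBlock semantics above.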

watch

Stream new blocks with reorg detection.

const controller = new AbortController()

for await (const event of stream.watch({
  signal: controller.signal,
  include: 'receipts'
})) {
  switch (event.type) {
    case 'blocks':
      for (const block of event.blocks) {
        processBlock(block)
      }
      break
    case 'reorg':
      // Undo removed blocks (newest first)
      for (const block of event.removed) {
        rollbackBlock(block.hash)
      }
      // Apply the new canonical blocks (oldest first)
      for (const block of event.added) {
        processBlock(block)
      }
      break
  }
}

Options:
  • fromBlock - Start watching from this block (default: current)
  • include - Content level: 'header' | 'transactions' | 'receipts'
  • pollingInterval - Poll interval in ms (default: 1000)
  • signal - AbortSignal for cancellation
  • retry - Retry configuration

Event Types

BlocksEvent

New canonical blocks added to the chain.

type BlocksEvent<TInclude> = {
  type: 'blocks'
  blocks: StreamBlock<TInclude>[]
  metadata: { chainHead: bigint }
}

ReorgEvent

Chain reorganization detected.

type ReorgEvent<TInclude> = {
  type: 'reorg'
  removed: LightBlock[]           // Blocks removed (newest first)
  added: StreamBlock<TInclude>[]  // New canonical blocks (oldest first)
  commonAncestor: LightBlock      // Last block before divergence
  metadata: { chainHead: bigint }
}
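Because removed is ordered newest first and added oldest first, a consumer can replay a reorg event directly against its own state. A minimal sketch against an in-memory, hash-keyed index (applyReorg and the index are illustrative, with block hashes simplified to strings):

```typescript
type Light = { number: bigint; hash: string }

// Replay a reorg event against a hash → block-number index:
// delete removed blocks (newest first), then insert added ones (oldest first).
function applyReorg(
  index: Map<string, bigint>,
  event: { removed: Light[]; added: Light[] }
): void {
  for (const block of event.removed) index.delete(block.hash)
  for (const block of event.added) index.set(block.hash, block.number)
}
```

After the call, every block above commonAncestor in the index belongs to the new canonical chain.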

LightBlock

Minimal block info for reorg tracking.

type LightBlock = {
  number: bigint
  hash: BlockHashType
  parentHash: BlockHashType
  timestamp: bigint
}
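The consumer-side finality example later on this page calls a toLightBlock helper. The library does not necessarily export one, but since a full block carries the same four header fields, it can be as simple as the following (with BlockHashType simplified to string):

```typescript
type LightBlock = {
  number: bigint
  hash: string
  parentHash: string
  timestamp: bigint
}

// Project a full block down to the four fields needed for reorg tracking,
// dropping transactions, receipts, and any other payload.
function toLightBlock(block: LightBlock & Record<string, unknown>): LightBlock {
  const { number, hash, parentHash, timestamp } = block
  return { number, hash, parentHash, timestamp }
}
```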

Include Levels

Control how much block data to fetch:

Level            Data                                Use Case
'header'         Block headers only                  Chain monitoring
'transactions'   Headers + full transactions         Transaction indexing
'receipts'       Headers + transactions + receipts   Event indexing

Reorg Detection

BlockStream tracks the parent hash chain to detect reorganizations:
  1. Block number regression - Block number goes backward
  2. Parent hash mismatch - New block’s parent doesn’t match last known block
  3. Deep reorg - Reorg extends beyond tracked history (throws UnrecoverableReorgError)

// Handle reorgs properly
for await (const event of stream.watch()) {
  if (event.type === 'reorg') {
    console.log(`Reorg detected: ${event.removed.length} blocks removed`)
    console.log(`Common ancestor: block ${event.commonAncestor.number}`)

    // Roll back state for removed blocks
    for (const block of event.removed) {
      await db.revertBlock(block.hash)
    }
  }
}
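Condensed into a single predicate, the three detection rules look roughly like this. This is a simplified sketch over tracked history, not the library's actual implementation (which also walks multi-block divergences back to the common ancestor):

```typescript
type Tracked = { number: bigint; hash: string }
type Head = { number: bigint; hash: string; parentHash: string }

// Classify an incoming head against tracked canonical history (oldest → newest).
function checkHead(
  history: Tracked[],
  head: Head
): 'ok' | 'reorg' | 'unrecoverable' {
  const last = history[history.length - 1]
  // Head extends the chain we know: its parent hash matches our tip.
  if (head.parentHash === last.hash) return 'ok'
  // Rules 1 and 2: number regression or parent-hash mismatch.
  // Recoverable if the fork point is still in tracked history.
  if (history.some((b) => b.hash === head.parentHash)) return 'reorg'
  // Rule 3: the fork point is older than anything we still track.
  return 'unrecoverable'
}
```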

Consumer-Side Finality

BlockStream doesn’t track finality internally. Implement finality tracking yourself:

const FINALITY_DEPTH = 64n // ~13 minutes on mainnet

const finalizedBlocks = new Map<bigint, LightBlock>()
const pendingBlocks: LightBlock[] = []

for await (const event of stream.watch()) {
  if (event.type === 'blocks') {
    for (const block of event.blocks) {
      pendingBlocks.push(toLightBlock(block))

      // Check if oldest pending block is now final
      const chainHead = event.metadata.chainHead
      while (pendingBlocks.length > 0) {
        const oldest = pendingBlocks[0]
        if (chainHead - oldest.number >= FINALITY_DEPTH) {
          finalizedBlocks.set(oldest.number, oldest)
          pendingBlocks.shift()
          // Safe to archive/compact finalized data
        } else {
          break
        }
      }
    }
  }
}
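The inner while-loop can also be factored into a pure helper, which keeps the depth rule easy to test in isolation (drainFinalized is illustrative, not part of the library):

```typescript
type LightBlock = { number: bigint; hash: string; parentHash: string; timestamp: bigint }

// Split pending blocks (ascending by number) into those at least `depth`
// blocks behind `chainHead` (now final) and those still subject to reorgs.
function drainFinalized(
  pending: LightBlock[],
  chainHead: bigint,
  depth: bigint
): { finalized: LightBlock[]; stillPending: LightBlock[] } {
  let i = 0
  while (i < pending.length && chainHead - pending[i].number >= depth) i++
  return { finalized: pending.slice(0, i), stillPending: pending.slice(i) }
}
```

A block is final exactly when chainHead - block.number >= depth, matching the condition in the loop above.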

Error Handling

import {
  BlockStreamAbortedError,
  UnrecoverableReorgError
} from '@voltaire/block'

try {
  for await (const event of stream.watch({ signal })) {
    // ...
  }
} catch (error) {
  if (error instanceof BlockStreamAbortedError) {
    console.log('Stream aborted')
  } else if (error instanceof UnrecoverableReorgError) {
    console.log('Deep reorg beyond tracked history, resync required')
  } else {
    throw error
  }
}

See Also