Overview
BlockStream provides robust block streaming with reorg detection. Inspired by Reth’s ExEx pattern, it tracks the chain state and emits events when blocks are added or removed due to reorganizations.
import { BlockStream } from '@voltaire/block'
const stream = BlockStream({ provider })
// Watch for new blocks with reorg handling.
// rollback and processBlock are your own application handlers.
for await (const event of stream.watch()) {
  if (event.type === 'reorg') {
    for (const block of event.removed) {
      rollback(block.hash)
    }
  } else {
    for (const block of event.blocks) {
      // Named processBlock rather than process to avoid shadowing Node's global `process`
      processBlock(block)
    }
  }
}
API
Factory
function BlockStream(options: {
  provider: TypedProvider
}): BlockStreamInstance
Returns an instance with backfill() and watch() async generators.
backfill
Stream historical blocks within a range.
for await (const { blocks, metadata } of stream.backfill({
  fromBlock: 18000000n,
  toBlock: 19000000n,
  include: 'transactions'
})) {
  for (const block of blocks) {
    indexBlock(block)
  }
}
Options:
- fromBlock - Start block (inclusive, required)
- toBlock - End block (inclusive, required)
- include - Content level: 'header' | 'transactions' | 'receipts'
- chunkSize - Blocks per batch (default: 100)
- signal - AbortSignal for cancellation
- retry - Retry configuration
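To make the interaction between the inclusive range and `chunkSize` concrete, here is a minimal sketch of how such a range could be split into batches. `chunkRanges` and `BlockRange` are hypothetical names used only for illustration; this is not the library's implementation, which may batch differently.

```typescript
// Hypothetical helper, not part of @voltaire/block.
// Splits an inclusive block range into consecutive inclusive sub-ranges
// containing at most `chunkSize` blocks each.
type BlockRange = { from: bigint; to: bigint }

function chunkRanges(
  fromBlock: bigint,
  toBlock: bigint,
  chunkSize: bigint = 100n
): BlockRange[] {
  if (chunkSize <= 0n) throw new RangeError('chunkSize must be positive')
  if (fromBlock > toBlock) throw new RangeError('fromBlock must not exceed toBlock')

  const ranges: BlockRange[] = []
  for (let start = fromBlock; start <= toBlock; start += chunkSize) {
    const end = start + chunkSize - 1n
    // Clamp the final chunk so it never runs past toBlock.
    ranges.push({ from: start, to: end < toBlock ? end : toBlock })
  }
  return ranges
}
```

Under this assumption, the example above (blocks 18000000 through 19000000, both inclusive) covers 1,000,001 blocks, so the last batch would hold a single block.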
watch
Stream new blocks with reorg detection.
const controller = new AbortController()
for await (const event of stream.watch({
  signal: controller.signal,
  include: 'receipts'
})) {
  switch (event.type) {
    case 'blocks':
      for (const block of event.blocks) {
        processBlock(block)
      }
      break
    case 'reorg':
      // Undo removed blocks (newest first)
      for (const block of event.removed) {
        rollbackBlock(block.hash)
      }
      // Added blocks will come in subsequent events
      break
  }
}
Options:
- fromBlock - Start watching from this block (default: current)
- include - Content level: 'header' | 'transactions' | 'receipts'
- pollingInterval - Poll interval in ms (default: 1000)
- signal - AbortSignal for cancellation
- retry - Retry configuration
Event Types
BlocksEvent
New canonical blocks added to the chain.
type BlocksEvent<TInclude> = {
  type: 'blocks'
  blocks: StreamBlock<TInclude>[]
  metadata: { chainHead: bigint }
}
ReorgEvent
Chain reorganization detected.
type ReorgEvent<TInclude> = {
  type: 'reorg'
  removed: LightBlock[]           // Blocks removed (newest first)
  added: StreamBlock<TInclude>[]  // New canonical blocks (oldest first)
  commonAncestor: LightBlock      // Last block before divergence
  metadata: { chainHead: bigint }
}
LightBlock
Minimal block info for reorg tracking.
type LightBlock = {
  number: bigint
  hash: BlockHashType
  parentHash: BlockHashType
  timestamp: bigint
}
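Because both event types carry a literal `type` field, TypeScript can narrow them as a discriminated union. The sketch below redeclares simplified local stand-ins for the shapes documented above so that it runs on its own: hashes are plain strings instead of `BlockHashType`, blocks are `LightBlock` instead of `StreamBlock<TInclude>`, and `describeEvent` is a hypothetical helper, not a library export.

```typescript
// Local stand-ins mirroring the documented shapes (simplified for illustration).
type LightBlock = { number: bigint; hash: string; parentHash: string; timestamp: bigint }
type BlocksEvent = {
  type: 'blocks'
  blocks: LightBlock[]
  metadata: { chainHead: bigint }
}
type ReorgEvent = {
  type: 'reorg'
  removed: LightBlock[]
  added: LightBlock[]
  commonAncestor: LightBlock
  metadata: { chainHead: bigint }
}
type StreamEvent = BlocksEvent | ReorgEvent

// Hypothetical helper: builds a one-line summary of an event for logging.
function describeEvent(event: StreamEvent): string {
  switch (event.type) {
    case 'blocks':
      return `blocks: +${event.blocks.length}, head ${event.metadata.chainHead}`
    case 'reorg':
      return `reorg: -${event.removed.length} +${event.added.length}, ancestor ${event.commonAncestor.number}`
    default: {
      // Compile-time exhaustiveness check: fails to type-check if a new
      // event type is added to the union without a matching case.
      const unreachable: never = event
      return unreachable
    }
  }
}
```

The `never` assignment in the default branch is a common way to have the compiler flag unhandled variants if the union grows later.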
Include Levels
Control how much block data to fetch:
| Level | Data | Use Case |
|---|---|---|
| 'header' | Block headers only | Chain monitoring |
| 'transactions' | Headers + full transactions | Transaction indexing |
| 'receipts' | Headers + transactions + receipts | Event indexing |
Reorg Detection
BlockStream tracks the parent hash chain to detect reorganizations:
- Block number regression - Block number goes backward
- Parent hash mismatch - New block’s parent doesn’t match last known block
- Deep reorg - Reorg extends beyond tracked history (throws UnrecoverableReorgError)
// Handle reorgs properly
for await (const event of stream.watch()) {
  if (event.type === 'reorg') {
    console.log(`Reorg detected: ${event.removed.length} blocks removed`)
    console.log(`Common ancestor: block ${event.commonAncestor.number}`)
    // Roll back state for removed blocks
    for (const block of event.removed) {
      await db.revertBlock(block.hash)
    }
  }
}
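The parent-hash check listed above can be illustrated with a small self-contained sketch. This is not BlockStream's actual implementation: `classifyBranch` and `Classification` are hypothetical names, hashes are plain strings, and the caller is assumed to have already fetched the new branch back to its oldest block that is not in the tracked history.

```typescript
// Local stand-in for the documented LightBlock; hashes are plain strings here.
type LightBlock = { number: bigint; hash: string; parentHash: string; timestamp: bigint }

type Classification =
  | { kind: 'extends' }
  | { kind: 'reorg'; commonAncestor: LightBlock; removed: LightBlock[] }
  | { kind: 'unrecoverable' }

// Hypothetical helper. `history` is the tracked canonical chain and `branch`
// is the new chain segment; both are ordered oldest first.
function classifyBranch(history: LightBlock[], branch: LightBlock[]): Classification {
  const first = branch[0]
  const tip = history[history.length - 1]
  // Nothing to compare, or the branch builds directly on the tracked tip.
  if (first === undefined || tip === undefined || first.parentHash === tip.hash) {
    return { kind: 'extends' }
  }
  // Parent hash mismatch: search tracked history for the fork point.
  for (let i = history.length - 1; i >= 0; i--) {
    const candidate = history[i]
    if (candidate !== undefined && candidate.hash === first.parentHash) {
      return {
        kind: 'reorg',
        commonAncestor: candidate,
        removed: history.slice(i + 1).reverse() // newest first
      }
    }
  }
  // The fork point is older than anything tracked.
  return { kind: 'unrecoverable' }
}
```

In this sketch the `unrecoverable` result corresponds to the situation in which the library throws `UnrecoverableReorgError`; how much history the library actually tracks is not specified here.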
Consumer-Side Finality
BlockStream doesn’t track finality internally. Implement finality tracking yourself:
const FINALITY_DEPTH = 64n // ~13 minutes on mainnet
const finalizedBlocks = new Map<bigint, LightBlock>()
const pendingBlocks: LightBlock[] = []
for await (const event of stream.watch()) {
  if (event.type === 'blocks') {
    for (const block of event.blocks) {
      // toLightBlock is your own helper that keeps number, hash, parentHash, and timestamp
      pendingBlocks.push(toLightBlock(block))
      // Check if oldest pending block is now final
      const chainHead = event.metadata.chainHead
      while (pendingBlocks.length > 0) {
        const oldest = pendingBlocks[0]
        if (chainHead - oldest.number >= FINALITY_DEPTH) {
          finalizedBlocks.set(oldest.number, oldest)
          pendingBlocks.shift()
          // Safe to archive/compact finalized data
        } else {
          break
        }
      }
    }
  }
}
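The loop above only reacts to `'blocks'` events, so a reorg could leave removed blocks in the pending list. One possible way to cover that case is to keep the bookkeeping in two pure functions, as in the sketch below. `dropRemoved` and `splitFinalized` are hypothetical helper names, hashes are plain strings, and the depth of 64 blocks is carried over from the example as an assumption rather than a protocol guarantee.

```typescript
// Local stand-in for the documented LightBlock; hashes are plain strings here.
type LightBlock = { number: bigint; hash: string; parentHash: string; timestamp: bigint }

// Hypothetical helper: drops blocks invalidated by a reorg from the pending list.
function dropRemoved(pending: LightBlock[], removed: LightBlock[]): LightBlock[] {
  const removedHashes = new Set(removed.map((b) => b.hash))
  return pending.filter((b) => !removedHashes.has(b.hash))
}

// Hypothetical helper: splits pending blocks into those at least `depth`
// blocks behind the chain head (treated as final) and those still pending.
function splitFinalized(
  pending: LightBlock[],
  chainHead: bigint,
  depth: bigint = 64n
): { finalized: LightBlock[]; stillPending: LightBlock[] } {
  const finalized = pending.filter((b) => chainHead - b.number >= depth)
  const stillPending = pending.filter((b) => chainHead - b.number < depth)
  return { finalized, stillPending }
}
```

In a consumer loop, `dropRemoved` would be called with `event.removed` on each `'reorg'` event and `splitFinalized` with `event.metadata.chainHead` on each `'blocks'` event.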
Error Handling
import {
  BlockStreamAbortedError,
  UnrecoverableReorgError
} from '@voltaire/block'
try {
  for await (const event of stream.watch({ signal })) {
    // ...
  }
} catch (error) {
  if (error instanceof BlockStreamAbortedError) {
    console.log('Stream aborted')
  } else if (error instanceof UnrecoverableReorgError) {
    console.log('Deep reorg beyond tracked history, resync required')
  } else {
    throw error
  }
}