Skip to main content
Source: batch.ts • Tests: batch.test.ts
This page is a placeholder. All examples on this page are currently AI-generated and are not correct. This documentation will be completed in the future with accurate, tested examples.

Batch Processing

Automatically batch similar operations for efficiency. Essential for optimizing RPC calls, reducing network overhead, and managing concurrent operations in Ethereum applications.

Overview

Two batching strategies:
  • BatchQueue: Automatically batch similar operations by size and time
  • AsyncQueue: Process operations with concurrency limit

BatchQueue

Automatically batch items when:
  • Batch size reaches maxBatchSize
  • maxWaitTime elapses since first item added
All items eventually processed, results returned individually.

Basic Usage

import { BatchQueue } from '@tevm/voltaire/utils';

const queue = new BatchQueue({
  maxBatchSize: 50,
  maxWaitTime: 100,
  processBatch: async (addresses) => {
    return Promise.all(
      addresses.map(addr => provider.eth_getBalance(addr))
    );
  }
});

// Add items - automatically batched
const balance1 = queue.add('0x123...');
const balance2 = queue.add('0x456...');
const balance3 = queue.add('0x789...');

// Results returned individually
console.log(await balance1); // Balance for 0x123...
console.log(await balance2); // Balance for 0x456...
console.log(await balance3); // Balance for 0x789...

Configuration

interface BatchQueueOptions<T, R> {
  maxBatchSize: number;                          // Max items per batch
  maxWaitTime: number;                           // Max wait in ms
  processBatch: (items: T[]) => Promise<R[]>;   // Batch processor
  onError?: (error: unknown, items: T[]) => void;
}

createBatchedFunction

Create batched version of function:
import { createBatchedFunction } from '@tevm/voltaire/utils';

const getBalance = createBatchedFunction(
  async (addresses: string[]) => {
    return Promise.all(
      addresses.map(addr => provider.eth_getBalance(addr))
    );
  },
  50,   // Batch up to 50 addresses
  100   // Wait max 100ms
);

// Use like normal function - batching automatic
const balance1 = getBalance('0x123...');
const balance2 = getBalance('0x456...');
const balance3 = getBalance('0x789...');

// All three batched into single call
console.log(await balance1);
console.log(await balance2);
console.log(await balance3);

AsyncQueue

Process items with concurrency limit.

Basic Usage

import { AsyncQueue } from '@tevm/voltaire/utils';

const processor = new AsyncQueue<string, Balance>(
  async (address) => provider.eth_getBalance(address),
  { concurrency: 3 }
);

// Add items
const results = await Promise.all([
  processor.add('0x123...'),
  processor.add('0x456...'),
  processor.add('0x789...'),
  processor.add('0xabc...'),
  processor.add('0xdef...'),
]);

// Only 3 execute concurrently, others wait

Monitor Queue

const processor = new AsyncQueue(
  processItem,
  { concurrency: 5 }
);

// Check status
console.log(`Queue: ${processor.size()}`);
console.log(`Active: ${processor.activeCount()}`);

// Wait for completion
await processor.drain();

Real-World Examples

Batch Balance Lookups

Efficiently fetch multiple balances:
const balanceQueue = new BatchQueue({
  maxBatchSize: 100,
  maxWaitTime: 50,
  processBatch: async (addresses: string[]) => {
    // Use multicall or parallel requests
    return Promise.all(
      addresses.map(addr => provider.eth_getBalance(addr))
    );
  }
});

// Fetch many balances
const addresses = [...]; // 1000 addresses
const balances = await Promise.all(
  addresses.map(addr => balanceQueue.add(addr))
);

Batch Contract Calls

Batch eth_call operations:
import { Abi } from '@tevm/voltaire/Abi';

const abi = [
  {
    type: 'function',
    name: 'balanceOf',
    inputs: [{ type: 'address' }],
    outputs: [{ type: 'uint256' }]
  }
] as const;

const callQueue = new BatchQueue({
  maxBatchSize: 50,
  maxWaitTime: 100,
  processBatch: async (addresses: string[]) => {
    return Promise.all(
      addresses.map(async (addr) => {
        const data = Abi.Function.encodeParams(abi[0], [addr]);
        const result = await provider.eth_call({
          to: tokenAddress,
          data
        });
        return Abi.Function.decodeResult(abi[0], result)[0];
      })
    );
  }
});

const balances = await Promise.all(
  holders.map(addr => callQueue.add(addr))
);

Rate-Limited Batching

Combine batching with rate limiting:
import { BatchQueue, RateLimiter } from '@tevm/voltaire/utils';

const limiter = new RateLimiter({
  maxRequests: 10,
  interval: 1000
});

const queue = new BatchQueue({
  maxBatchSize: 50,
  maxWaitTime: 100,
  processBatch: async (addresses) => {
    // Rate limit the batch
    return limiter.execute(() =>
      Promise.all(
        addresses.map(addr => provider.eth_getBalance(addr))
      )
    );
  }
});

Concurrency-Limited Processing

Process items with max concurrency:
const processor = new AsyncQueue<Transaction, Receipt>(
  async (tx) => {
    const signedTx = await wallet.signTransaction(tx);
    const txHash = await provider.eth_sendRawTransaction(signedTx);
    return pollForReceipt(txHash, provider.eth_getTransactionReceipt);
  },
  { concurrency: 3 } // Max 3 concurrent transactions
);

// Process many transactions
const receipts = await Promise.all(
  transactions.map(tx => processor.add(tx))
);

Error Handling

Handle batch errors gracefully:
const queue = new BatchQueue({
  maxBatchSize: 50,
  maxWaitTime: 100,
  processBatch: async (addresses) => {
    try {
      return await Promise.all(
        addresses.map(addr => provider.eth_getBalance(addr))
      );
    } catch (error) {
      // Retry or return defaults
      return addresses.map(() => 0n);
    }
  },
  onError: (error, items) => {
    console.error(`Batch failed for ${items.length} items:`, error);
  }
});

Heterogeneous Batching

Batch different operations:
type Operation =
  | { type: 'balance'; address: string }
  | { type: 'code'; address: string }
  | { type: 'nonce'; address: string };

const queue = new BatchQueue<Operation, any>({
  maxBatchSize: 100,
  maxWaitTime: 50,
  processBatch: async (operations) => {
    // Group by type
    const balances = operations.filter(op => op.type === 'balance');
    const codes = operations.filter(op => op.type === 'code');
    const nonces = operations.filter(op => op.type === 'nonce');

    // Batch each type
    const [balanceResults, codeResults, nonceResults] = await Promise.all([
      Promise.all(balances.map(op => provider.eth_getBalance(op.address))),
      Promise.all(codes.map(op => provider.eth_getCode(op.address))),
      Promise.all(nonces.map(op => provider.eth_getTransactionCount(op.address)))
    ]);

    // Reconstruct results in original order
    let bi = 0, ci = 0, ni = 0;
    return operations.map(op => {
      if (op.type === 'balance') return balanceResults[bi++];
      if (op.type === 'code') return codeResults[ci++];
      return nonceResults[ni++];
    });
  }
});

Manual Queue Control

Flush Queue

Force immediate processing:
const queue = new BatchQueue({
  maxBatchSize: 100,
  maxWaitTime: 1000,
  processBatch: async (items) => { ... }
});

// Add items
queue.add(item1);
queue.add(item2);

// Force flush immediately
await queue.flush();

Check Queue Size

console.log(`Pending items: ${queue.size()}`);

Clear Queue

queue.clear(); // Rejects all pending items

Wait for Completion

// Add items
queue.add(item1);
queue.add(item2);
queue.add(item3);

// Wait for all to complete
await queue.drain();

Batching Strategies

Time-Based Batching

Batch by time window:
const queue = new BatchQueue({
  maxBatchSize: 1000,  // High limit
  maxWaitTime: 100,    // 100ms window
  processBatch: async (items) => { ... }
});

// Items batch every 100ms regardless of count

Size-Based Batching

Batch by size:
const queue = new BatchQueue({
  maxBatchSize: 50,    // 50 items
  maxWaitTime: 60000,  // 1 minute max wait
  processBatch: async (items) => { ... }
});

// Flushes when 50 items accumulated

Hybrid Batching

Balance time and size:
const queue = new BatchQueue({
  maxBatchSize: 100,   // Max 100 items
  maxWaitTime: 100,    // Max 100ms wait
  processBatch: async (items) => { ... }
});

// Flushes on either condition

Performance Considerations

Batch Size

  • Too small: More overhead, less benefit
  • Too large: Higher latency, memory usage
  • Optimal: 20-100 items for most RPC calls

Wait Time

  • Too short: Small batches, less efficient
  • Too long: High latency for users
  • Optimal: 50-200ms for most applications

Concurrency

  • Too low: Underutilized resources
  • Too high: Rate limiting, resource exhaustion
  • Optimal: 3-10 concurrent operations

Combining with Other Utils

Batch + Rate Limit

import { BatchQueue, RateLimiter } from '@tevm/voltaire/utils';

const limiter = new RateLimiter({
  maxRequests: 10,
  interval: 1000
});

const queue = new BatchQueue({
  maxBatchSize: 50,
  maxWaitTime: 100,
  processBatch: (items) => limiter.execute(() => processBatch(items))
});

Batch + Retry

import { BatchQueue, retryWithBackoff } from '@tevm/voltaire/utils';

const queue = new BatchQueue({
  maxBatchSize: 50,
  maxWaitTime: 100,
  processBatch: (items) => retryWithBackoff(
    () => processBatch(items),
    { maxRetries: 3 }
  )
});

Batch + Timeout

import { BatchQueue, withTimeout } from '@tevm/voltaire/utils';

const queue = new BatchQueue({
  maxBatchSize: 50,
  maxWaitTime: 100,
  processBatch: (items) => withTimeout(
    processBatch(items),
    { ms: 10000 }
  )
});

Best Practices

Choose Appropriate Batch Sizes

  • Balance lookups: 50-100 addresses
  • Contract calls: 20-50 calls
  • Transaction submission: 5-10 transactions
  • Log queries: 10-20 queries

Monitor Queue Performance

const queue = new BatchQueue({
  maxBatchSize: 50,
  maxWaitTime: 100,
  processBatch: async (items) => {
    const start = Date.now();
    const results = await processBatch(items);
    const duration = Date.now() - start;

    console.log(`Batch: ${items.length} items in ${duration}ms`);
    return results;
  }
});

Handle Partial Failures

const queue = new BatchQueue({
  maxBatchSize: 50,
  maxWaitTime: 100,
  processBatch: async (items) => {
    const results = await Promise.allSettled(
      items.map(item => processItem(item))
    );

    return results.map((result, i) => {
      if (result.status === 'fulfilled') {
        return result.value;
      } else {
        console.error(`Item ${i} failed:`, result.reason);
        return null; // Or default value
      }
    });
  }
});

API Reference

BatchQueue

class BatchQueue<T, R> {
  constructor(options: BatchQueueOptions<T, R>)
  add(item: T): Promise<R>
  flush(): Promise<void>
  size(): number
  clear(): void
  drain(): Promise<void>
}

createBatchedFunction

function createBatchedFunction<T, R>(
  fn: (items: T[]) => Promise<R[]>,
  maxBatchSize: number,
  maxWaitTime: number
): (item: T) => Promise<R>

AsyncQueue

class AsyncQueue<T, R> {
  constructor(
    processFn: (item: T) => Promise<R>,
    options: { concurrency: number }
  )
  add(item: T): Promise<R>
  size(): number
  activeCount(): number
  drain(): Promise<void>
}

See Also