Use batch operations for efficient bulk data processing. OnDB’s store() method accepts arrays of records, and for large-scale imports you can partition data into chunks.

Basic Batch Storage

Store multiple records in a single call:
TypeScript
const result = await client.store(
  {
    collection: 'tweets',
    data: [
      { message: 'Tweet 1', author: 'alice' },
      { message: 'Tweet 2', author: 'bob' },
      { message: 'Tweet 3', author: 'charlie' }
    ]
  },
  async (quote) => {
    const txHash = await processPayment(quote);
    return { txHash, network: quote.network, sender: walletAddress, chainType: quote.chainType, paymentMethod: 'native' };
  },
  true
);

console.log('Stored at block:', result.block_height);

Large Dataset Processing

For large datasets, chunk your data into manageable batches:
TypeScript
async function importLargeDataset(records: any[]) {
  const BATCH_SIZE = 100;
  const results = [];

  for (let i = 0; i < records.length; i += BATCH_SIZE) {
    const chunk = records.slice(i, i + BATCH_SIZE);

    const result = await client.store(
      { collection: 'data', data: chunk },
      paymentCallback,
      true
    );

    results.push(result);

    const progress = Math.min(i + BATCH_SIZE, records.length);
    console.log(`Progress: ${progress}/${records.length}`);
  }

  return results;
}
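
The chunking arithmetic above can be factored into a small reusable helper. This is a generic sketch that runs independently of the OnDB client; `chunkArray` is an illustrative name, not part of the SDK:
TypeScript
// Split an array into consecutive chunks of at most `size` elements.
function chunkArray<T>(items: T[], size: number): T[][] {
  const chunks: T[][] = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

// 250 records with BATCH_SIZE = 100 yield chunks of 100, 100, and 50.
const chunkSizes = chunkArray(Array.from({ length: 250 }, (_, i) => i), 100)
  .map(chunk => chunk.length);
console.log(chunkSizes); // [ 100, 100, 50 ]

Because `slice` never reads past the end of the array, the final chunk is simply whatever remains, so no padding or special-casing is needed.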

Error Handling in Batches

Retry with Exponential Backoff

Transient failures (timeouts, temporary node unavailability) are worth retrying with an increasing delay, while validation and payment errors are permanent and should fail fast:
TypeScript
async function storeWithRetry(
  collection: string,
  data: any[],
  maxRetries: number = 3,
  delay: number = 1000
) {
  let lastError: Error | undefined;

  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      return await client.store(
        { collection, data },
        paymentCallback,
        true
      );
    } catch (error) {
      lastError = error as Error;

      if (error instanceof ValidationError) throw error;
      if (error instanceof PaymentRequiredError) throw error;

      console.log(`Attempt ${attempt} failed, retrying in ${delay}ms...`);
      await new Promise(resolve => setTimeout(resolve, delay));
      delay *= 2;
    }
  }

  throw lastError;
}
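
The delay sequence the loop above produces can be computed up front, which makes the backoff policy easy to test in isolation. A minimal sketch (the helper name is illustrative, not part of the SDK):
TypeScript
// Delays used by attempts 1..maxRetries with a doubling backoff.
function backoffSchedule(maxRetries: number, baseDelayMs: number): number[] {
  return Array.from({ length: maxRetries }, (_, attempt) => baseDelayMs * 2 ** attempt);
}

console.log(backoffSchedule(3, 1000)); // [ 1000, 2000, 4000 ]

In production you may also want to add random jitter to each delay so that many workers retrying at once do not hammer the service in lockstep.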

Distributed Bulk Operations

When processing large datasets across multiple OnDB client instances (e.g., multiple workers, microservices, or serverless functions), you need coordination for global progress tracking and deduplication.

Deduplication with Unique Indexes

Create a unique hash index on a natural key or a dedicated import_id field to prevent duplicate records when multiple workers process overlapping data:
const db = client.database('my-app');

// Unique index prevents duplicate inserts
await db.createIndex({
  name: 'idx_records_import_id',
  collection: 'records',
  field_name: 'import_id',
  index_type: 'hash',
  unique_constraint: true,
  store_values: true
});
With a unique index, duplicate records are rejected automatically. Workers can safely retry failed batches without risk of duplicates.
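
The idempotency this buys can be illustrated with an in-memory stand-in for the server-side unique index: a store that rejects duplicate `import_id` values ends up in the same state whether a batch runs once or is retried. This simulation is for illustration only and is not OnDB code:
TypeScript
// In-memory stand-in for a collection with a unique index on import_id.
class UniqueStore {
  private seen = new Set<string>();
  private rows: { import_id: string }[] = [];

  store(batch: { import_id: string }[]): number {
    let inserted = 0;
    for (const row of batch) {
      if (this.seen.has(row.import_id)) continue; // duplicate rejected
      this.seen.add(row.import_id);
      this.rows.push(row);
      inserted++;
    }
    return inserted;
  }

  get count(): number { return this.rows.length; }
}

const uniqueStore = new UniqueStore();
const retriedBatch = [{ import_id: 'a' }, { import_id: 'b' }];
uniqueStore.store(retriedBatch);
uniqueStore.store(retriedBatch); // retry after a partial failure
console.log(uniqueStore.count);  // 2 -- no duplicates

This is why the retry pattern above is safe to combine with unique indexes: replaying a batch is a no-op for records that already landed.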

Multi-Worker Partitioning

Split work across multiple clients by partitioning the input dataset. Each worker handles a disjoint range:
import { createClient } from '@ondb/sdk';

interface WorkerConfig {
  workerId: string;
  totalWorkers: number;
  records: any[];
}

async function processPartition(config: WorkerConfig) {
  const client = createClient({
    endpoint: 'https://api.ondb.io',
    appId: 'my-app',
    appKey: process.env.ONDB_APP_KEY!,
  });

  // Each worker processes its assigned partition
  const partition = config.records.filter(
    (_, i) => i % config.totalWorkers === parseInt(config.workerId, 10)
  );

  const BATCH_SIZE = 100;
  const results = [];

  for (let i = 0; i < partition.length; i += BATCH_SIZE) {
    const chunk = partition.slice(i, i + BATCH_SIZE);

    const result = await client.store(
      {
        collection: 'records',
        data: chunk.map(record => ({
          ...record,
          import_id: record.id,  // Natural key for deduplication
        }))
      },
      paymentCallback,
      true
    );

    results.push(result);

    // Report progress to coordination layer
    await reportProgress(config.workerId, i + chunk.length, partition.length);
  }

  return results;
}
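
The modulo partitioning above assigns every record to exactly one worker, so partitions are disjoint and together cover the whole input. A self-contained check of that property (no OnDB client involved):
TypeScript
// Assign record index i to worker (i % totalWorkers).
function partitionIndices(total: number, totalWorkers: number): number[][] {
  const partitions: number[][] = Array.from({ length: totalWorkers }, () => []);
  for (let i = 0; i < total; i++) {
    partitions[i % totalWorkers].push(i);
  }
  return partitions;
}

const parts = partitionIndices(10, 3);
console.log(parts.map(p => p.length));                  // [ 4, 3, 3 ]
console.log(parts.flat().sort((a, b) => a - b).length); // 10 -- full coverage

Index-based partitioning assumes every worker sees the records in the same order; if that is not guaranteed, partition on a stable field (such as a hash of `import_id`) instead.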

Global Progress Tracking

Use a shared coordination collection to track progress across all workers. Each worker writes its progress, and any client can query the aggregate state:
// Each worker reports progress to a shared collection
async function reportProgress(
  workerId: string,
  completed: number,
  total: number
) {
  await client.store({
    collection: 'import_progress',
    data: [{
      worker_id: workerId,
      job_id: 'import-2026-03-16',
      completed,
      total,
      updated_at: new Date().toISOString(),
    }]
  });
}

// Query global progress from any client
async function getGlobalProgress(jobId: string) {
  const progress = await client.queryBuilder()
    .collection('import_progress')
    .whereField('job_id').equals(jobId)
    .execute();

  const workers = progress.records;
  const globalCompleted = workers.reduce((sum, w) => sum + w.completed, 0);
  const globalTotal = workers.reduce((sum, w) => sum + w.total, 0);
  const percent = Math.round((globalCompleted / globalTotal) * 100);

  console.log(`Global progress: ${percent}% (${globalCompleted}/${globalTotal})`);
  console.log(`Active workers: ${workers.length}`);
  return { globalCompleted, globalTotal, percent, workers };
}
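
The aggregation inside getGlobalProgress reduces to a pure computation over worker rows, which is worth factoring out so it can be tested without a live client. The sketch below also guards against division by zero before any worker has reported (names are illustrative):
TypeScript
interface WorkerProgress { worker_id: string; completed: number; total: number; }

function aggregateProgress(workers: WorkerProgress[]) {
  const globalCompleted = workers.reduce((sum, w) => sum + w.completed, 0);
  const globalTotal = workers.reduce((sum, w) => sum + w.total, 0);
  const percent = globalTotal === 0 ? 0 : Math.round((globalCompleted / globalTotal) * 100);
  return { globalCompleted, globalTotal, percent };
}

console.log(aggregateProgress([
  { worker_id: 'w0', completed: 80, total: 100 },
  { worker_id: 'w1', completed: 40, total: 100 },
])); // { globalCompleted: 120, globalTotal: 200, percent: 60 }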

Using Sharding for Distributed Writes

For high-volume distributed ingestion, combine sharding with multi-worker writes so each worker targets a specific shard, avoiding write contention:
// Shard by tenant_id -- each worker handles specific tenants
await client.syncCollection({
  name: 'events',
  fields: {
    tenant_id: { type: 'string', index: true },
    event_type: { type: 'string', index: true },
    timestamp: { type: 'date', index: true },
    payload: { type: 'string' },
  },
  sharding: {
    keys: [
      { field: 'tenant_id', type: 'discrete' },
    ],
    enforce_in_queries: true,
  },
});

// Worker 1 handles tenants A-M, Worker 2 handles N-Z
// Each targets a different shard -- no contention
See Sharding for full configuration options.

Concurrency Tuning

When running multiple workers, adjust concurrency based on your needs:
Workers   Use Case
1-3       Rate-limited APIs, careful processing
5-10      Standard batch operations
10-20     High-throughput scenarios
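
A worker-count limit like the ones above can be enforced in-process with a small promise pool. This is a generic sketch, not an SDK feature: it caps how many async tasks (e.g. batch store calls) are in flight at once while preserving result order:
TypeScript
// Run async tasks with at most `limit` in flight at any time.
async function runWithConcurrency<T>(
  tasks: (() => Promise<T>)[],
  limit: number
): Promise<T[]> {
  const results: T[] = new Array(tasks.length);
  let next = 0;

  async function worker(): Promise<void> {
    while (next < tasks.length) {
      const i = next++; // safe: JS is single-threaded between awaits
      results[i] = await tasks[i]();
    }
  }

  await Promise.all(
    Array.from({ length: Math.min(limit, tasks.length) }, () => worker())
  );
  return results;
}

// Example: 6 dummy "batches" with at most 3 running concurrently.
const dummyTasks = Array.from({ length: 6 }, (_, i) => async () => i * 10);
runWithConcurrency(dummyTasks, 3).then(r => console.log(r)); // [ 0, 10, 20, 30, 40, 50 ]

Each of the `limit` workers pulls the next unclaimed task when it finishes its current one, so slow tasks never block unrelated ones and the pool stays fully utilized.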

Next Steps

Sharding

Partition collections for distributed writes

Task Management

Track async operations

Transaction Tracking

Monitor transaction status

Collections & Indexes

Unique indexes for deduplication