# Batch Operations

> Bulk data operations with progress tracking, distributed coordination, and deduplication

Use batch operations for efficient bulk data processing. OnDB's `store()` method accepts arrays of records, and for large-scale imports you can partition data into chunks.

## Basic Batch Storage

Store multiple records in a single call:

```typescript TypeScript
const result = await client.store(
  {
    collection: 'tweets',
    data: [
      { message: 'Tweet 1', author: 'alice' },
      { message: 'Tweet 2', author: 'bob' },
      { message: 'Tweet 3', author: 'charlie' }
    ]
  },
  async (quote) => {
    // processPayment and walletAddress come from your own payment integration
    const txHash = await processPayment(quote);
    return {
      txHash,
      network: quote.network,
      sender: walletAddress,
      chainType: quote.chainType,
      paymentMethod: 'native'
    };
  },
  true
);

console.log('Stored at block:', result.block_height);
```

## Large Dataset Processing

For large datasets, chunk your data into manageable batches:

```typescript TypeScript
async function importLargeDataset(records: any[]) {
  const BATCH_SIZE = 100;
  const results = [];

  for (let i = 0; i < records.length; i += BATCH_SIZE) {
    const chunk = records.slice(i, i + BATCH_SIZE);

    const result = await client.store(
      { collection: 'data', data: chunk },
      paymentCallback,
      true
    );

    results.push(result);

    const progress = Math.min(i + BATCH_SIZE, records.length);
    console.log(`Progress: ${progress}/${records.length}`);
  }

  return results;
}
```

## Error Handling in Batches

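### Retry with Exponential Backoff

Retry transient failures with a delay that doubles after each attempt. Validation and payment errors are rethrown immediately, because retrying the same request cannot fix them.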

```typescript TypeScript
async function storeWithRetry(
  collection: string,
  data: any[],
  maxRetries: number = 3,
  delay: number = 1000
) {
  // Caught values are typed `unknown` in strict TypeScript
  let lastError: unknown;

  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      return await client.store(
        { collection, data },
        paymentCallback,
        true
      );
    } catch (error) {
      lastError = error;

      // Non-retryable: the same request would fail again
      if (error instanceof ValidationError) throw error;
      if (error instanceof PaymentRequiredError) throw error;

      // Out of attempts: skip the final sleep and surface the error
      if (attempt === maxRetries) break;

      console.log(`Attempt ${attempt} failed, retrying in ${delay}ms...`);
      await new Promise(resolve => setTimeout(resolve, delay));
      delay *= 2;
    }
  }

  throw lastError;
}
```
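
In a long import, one batch that exhausts its retries does not have to abort the whole run. The following is a minimal sketch of collecting failed chunks for a later pass. It assumes a `storeChunk` function you supply (for example, a wrapper around `storeWithRetry`); `importWithFailureLog` and `FailedChunk` are illustrative names, not part of the OnDB SDK.

```typescript
// Hypothetical helper: records which chunks failed instead of throwing.
interface FailedChunk<T> {
  startIndex: number; // offset of the chunk's first record in the input
  records: T[];
  error: unknown;
}

async function importWithFailureLog<T>(
  records: T[],
  batchSize: number,
  storeChunk: (chunk: T[]) => Promise<unknown>
): Promise<{ stored: number; failed: FailedChunk<T>[] }> {
  if (!Number.isInteger(batchSize) || batchSize < 1) {
    throw new RangeError('batchSize must be a positive integer');
  }

  let stored = 0;
  const failed: FailedChunk<T>[] = [];

  for (let i = 0; i < records.length; i += batchSize) {
    const chunk = records.slice(i, i + batchSize);
    try {
      await storeChunk(chunk);
      stored += chunk.length;
    } catch (error) {
      // Keep going; the caller decides whether to retry or report these
      failed.push({ startIndex: i, records: chunk, error });
    }
  }

  return { stored, failed };
}
```

After the run, `failed` holds exactly the chunks that still need attention, so a follow-up pass can resubmit only those records.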

## Distributed Bulk Operations

When processing large datasets across multiple OnDB client instances (e.g., multiple workers, microservices, or serverless functions), you need coordination for global progress tracking and deduplication.

### Deduplication with Unique Indexes

Create a unique hash index on a natural key or a dedicated `import_id` field to prevent duplicate records when multiple workers process overlapping data:

```typescript
const db = client.database('my-app');

// Unique index prevents duplicate inserts
await db.createIndex({
  name: 'idx_records_import_id',
  collection: 'records',
  field_name: 'import_id',
  index_type: 'hash',
  unique_constraint: true,
  store_values: true
});
```

With a unique index, duplicate records are rejected automatically. Workers can safely retry failed batches without risk of duplicates.
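
When a dataset has no single natural key, a dedicated `import_id` can be derived from the fields that identify a record. The sketch below is one possible approach, assuming Node.js and its built-in `node:crypto` module; `deriveImportId` is a hypothetical helper, not an SDK function.

```typescript
import { createHash } from 'node:crypto';

// Hypothetical helper: derives a stable import_id from the fields that
// identify a record. The same key values produce the same id on any worker.
function deriveImportId(
  record: Record<string, unknown>,
  keyFields: string[]
): string {
  // Encode the selected values as a JSON array so field boundaries stay
  // unambiguous (["ab","c"] and ["a","bc"] hash differently).
  const keyMaterial = JSON.stringify(keyFields.map((f) => record[f] ?? null));
  return createHash('sha256').update(keyMaterial).digest('hex');
}

// Fields outside keyFields do not affect the id
const id = deriveImportId(
  { source: 'crm', id: 7, name: 'Alice' },
  ['source', 'id']
);
console.log(id.length); // 64 hex characters for SHA-256
```

Because the id depends only on the key fields, two workers that see the same source record produce the same `import_id`, which is what lets the unique index catch the duplicate.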

### Multi-Worker Partitioning

Split work across multiple clients by partitioning the input dataset. Each worker handles a disjoint subset, assigned round-robin by record index:

```typescript
import { createClient } from '@ondb/sdk';

interface WorkerConfig {
  workerId: string;
  totalWorkers: number;
  records: any[];
}

async function processPartition(config: WorkerConfig) {
  const client = createClient({
    endpoint: 'https://api.ondb.io',
    appId: 'my-app',
    appKey: process.env.ONDB_APP_KEY!,
  });

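  // Each worker processes its assigned partition.
  // workerId must be a zero-based index ("0" .. totalWorkers - 1).
  const workerIndex = Number.parseInt(config.workerId, 10);
  const partition = config.records.filter(
    (_, i) => i % config.totalWorkers === workerIndex
  );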

  const BATCH_SIZE = 100;
  const results = [];

  for (let i = 0; i < partition.length; i += BATCH_SIZE) {
    const chunk = partition.slice(i, i + BATCH_SIZE);

    const result = await client.store(
      {
        collection: 'records',
        data: chunk.map(record => ({
          ...record,
          import_id: record.id,  // Natural key for deduplication
        }))
      },
      paymentCallback,
      true
    );

    results.push(result);

    // Report progress to coordination layer (awaited so a failed write is not silently dropped)
    await reportProgress(config.workerId, i + chunk.length, partition.length);
  }

  return results;
}
```
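
The modulo filter only covers every record if worker IDs are exactly `0` through `totalWorkers - 1`. An ID outside that range yields an empty partition and leaves some records unprocessed. A small sketch of the same partitioning rule with input validation follows; `partitionFor` is a hypothetical helper name.

```typescript
// Hypothetical helper: same round-robin rule as the filter above,
// but it rejects worker indexes that would leave records unassigned.
function partitionFor<T>(
  records: T[],
  workerIndex: number,
  totalWorkers: number
): T[] {
  if (!Number.isInteger(totalWorkers) || totalWorkers < 1) {
    throw new RangeError('totalWorkers must be a positive integer');
  }
  if (
    !Number.isInteger(workerIndex) ||
    workerIndex < 0 ||
    workerIndex >= totalWorkers
  ) {
    throw new RangeError('workerIndex must be in the range 0..totalWorkers-1');
  }
  return records.filter((_, i) => i % totalWorkers === workerIndex);
}

// With 3 workers, record indexes 0..9 split as [0,3,6,9], [1,4,7], [2,5,8]
const sample = Array.from({ length: 10 }, (_, i) => i);
console.log(partitionFor(sample, 0, 3)); // [ 0, 3, 6, 9 ]
```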

### Global Progress Tracking

Use a shared coordination collection to track progress across all workers. Each worker writes its progress, and any client can query the aggregate state:

```typescript
// Each worker reports progress to a shared collection
async function reportProgress(
  workerId: string,
  completed: number,
  total: number
) {
  await client.store({
    collection: 'import_progress',
    data: [{
      worker_id: workerId,
      job_id: 'import-2026-03-16',
      completed,
      total,
      updated_at: new Date().toISOString(),
    }]
  });
}

// Query global progress from any client
async function getGlobalProgress(jobId: string) {
  const progress = await client.queryBuilder()
    .collection('import_progress')
    .whereField('job_id').equals(jobId)
    .execute();

  const workers = progress.records;
  const globalCompleted = workers.reduce((sum, w) => sum + w.completed, 0);
  const globalTotal = workers.reduce((sum, w) => sum + w.total, 0);
  // Guard against division by zero before any worker has reported
  const percent = globalTotal === 0
    ? 0
    : Math.round((globalCompleted / globalTotal) * 100);

  console.log(`Global progress: ${percent}% (${globalCompleted}/${globalTotal})`);
  console.log(`Active workers: ${workers.length}`);
  return { globalCompleted, globalTotal, percent, workers };
}
```
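
The reduction above sums every row returned for the job. This page does not state whether `store()` appends a new progress row on each call or overwrites the previous one. If rows are appended, each worker contributes several rows and the totals are inflated. The sketch below assumes the append case and keeps only the most recent row per worker before summing; `ProgressRow`, `latestPerWorker`, and `summarize` are illustrative names.

```typescript
// Shape of one row as written by reportProgress
interface ProgressRow {
  worker_id: string;
  completed: number;
  total: number;
  updated_at: string; // ISO 8601 UTC timestamp from Date.toISOString()
}

// Keep only the newest row for each worker
function latestPerWorker(rows: ProgressRow[]): ProgressRow[] {
  const latest = new Map<string, ProgressRow>();
  for (const row of rows) {
    const seen = latest.get(row.worker_id);
    // toISOString() output has a fixed width, so string comparison
    // orders timestamps chronologically
    if (!seen || row.updated_at > seen.updated_at) {
      latest.set(row.worker_id, row);
    }
  }
  return Array.from(latest.values());
}

function summarize(rows: ProgressRow[]) {
  const workers = latestPerWorker(rows);
  const completed = workers.reduce((sum, w) => sum + w.completed, 0);
  const total = workers.reduce((sum, w) => sum + w.total, 0);
  const percent = total === 0 ? 0 : Math.round((completed / total) * 100);
  return { completed, total, percent, workerCount: workers.length };
}
```

Passing `progress.records` through `summarize` instead of reducing it directly gives one contribution per worker regardless of how many updates each worker has written.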

### Using Sharding for Distributed Writes

For high-volume distributed ingestion, combine sharding with multi-worker writes so each worker targets a specific shard, avoiding write contention:

```typescript
// Shard by tenant_id -- each worker handles specific tenants
await client.syncCollection({
  name: 'events',
  fields: {
    tenant_id: { type: 'string', index: true },
    event_type: { type: 'string', index: true },
    timestamp: { type: 'date', index: true },
    payload: { type: 'string' },
  },
  sharding: {
    keys: [
      { field: 'tenant_id', type: 'discrete' },
    ],
    enforce_in_queries: true,
  },
});

// Worker 1 handles tenants A-M, Worker 2 handles N-Z
// Each targets a different shard -- no contention
```
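
The A-M / N-Z split in the comment above can be expressed as a small routing function that groups incoming events by worker before they are written. This is a sketch only, assuming tenant IDs start with a letter and that two workers are used; `workerForTenant` and `groupByWorker` are hypothetical names.

```typescript
// Hypothetical routing rule: tenants whose ID starts with A-M go to
// worker 1; everything else (N-Z, digits, empty IDs) goes to worker 2.
function workerForTenant(tenantId: string): 1 | 2 {
  const first = tenantId.charAt(0).toUpperCase();
  return first >= 'A' && first <= 'M' ? 1 : 2;
}

// Split a mixed list of events into one bucket per worker
function groupByWorker<T extends { tenant_id: string }>(
  events: T[]
): Map<number, T[]> {
  const buckets = new Map<number, T[]>();
  for (const event of events) {
    const worker = workerForTenant(event.tenant_id);
    const bucket = buckets.get(worker) ?? [];
    bucket.push(event);
    buckets.set(worker, bucket);
  }
  return buckets;
}
```

A fixed alphabetical split is easy to reason about but can be unbalanced if tenant names cluster at one end of the alphabet, so check the bucket sizes against your real tenant list.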

See [Sharding](/advanced/sharding) for full configuration options.

## Concurrency Tuning

When running multiple workers, choose a worker count that matches your workload:

| Workers | Use Case                              |
| ------- | ------------------------------------- |
| 1-3     | Rate-limited APIs, careful processing |
| 5-10    | Standard batch operations             |
| 10-20   | High-throughput scenarios             |
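
Within a single process, the worker count can be enforced by capping how many batches are in flight at once. The sketch below is one generic way to do that with plain promises; `runWithConcurrency` is a hypothetical helper, and `task` stands in for whatever stores one batch.

```typescript
// Hypothetical helper: runs `task` over `items` with at most `limit`
// calls in flight at any time. Results keep the input order.
async function runWithConcurrency<T, R>(
  items: T[],
  limit: number,
  task: (item: T, index: number) => Promise<R>
): Promise<R[]> {
  if (!Number.isInteger(limit) || limit < 1) {
    throw new RangeError('limit must be a positive integer');
  }

  const results: R[] = new Array(items.length);
  let next = 0; // index of the next unclaimed item

  // Each lane pulls the next unclaimed item until none remain.
  // JavaScript runs this code on a single thread, so `next++` cannot race.
  async function lane(): Promise<void> {
    while (next < items.length) {
      const index = next++;
      results[index] = await task(items[index], index);
    }
  }

  const laneCount = Math.min(limit, items.length);
  await Promise.all(Array.from({ length: laneCount }, () => lane()));
  return results;
}
```

If any task rejects, `Promise.all` rejects with that error while the other lanes finish their current item, so pair this with a retry wrapper if individual batch failures should not stop the run.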

## Next Steps

<CardGroup cols={2}>
  <Card title="Sharding" icon="layer-group" href="/advanced/sharding">
    Partition collections for distributed writes
  </Card>

  <Card title="Task Management" icon="list-check" href="/advanced/task-management">
    Track async operations
  </Card>

  <Card title="Transaction Tracking" icon="clock" href="/advanced/transaction-tracking">
    Monitor transaction status
  </Card>

  <Card title="Collections & Indexes" icon="database" href="/concepts/collections-indexes">
    Unique indexes for deduplication
  </Card>
</CardGroup>
