Advanced · 20 min read
Edit on GitHub

Production Patterns

Scale, monitor, and harden your blockchain integration for production use

Your integration works in development. Now let's make it production-ready with high availability, monitoring, reconciliation, and disaster recovery patterns.

High Availability

Running Multiple Instances

In production, you'll want multiple instances of your payment monitor for redundancy. The challenge: only one instance should process each block.

Solution: Leader Election with PostgreSQL Advisory Locks

class LeaderElection {
  private lockId = 12345  // Unique ID for your service (one per leader group)
  private isLeader = false
  private heartbeatInterval: NodeJS.Timeout | null = null
  // pg advisory locks are SESSION-scoped: they live exactly as long as the
  // database connection that acquired them. Routing lock/unlock through a
  // pool's query() can run them on DIFFERENT connections, so the lock may be
  // silently dropped when the pooled connection is recycled, and unlock may
  // be a no-op on the wrong session. Pin one dedicated client instead.
  // NOTE(review): assumes `db` is a node-postgres Pool (has connect()) — confirm.
  private lockClient: { query: (sql: string, params?: unknown[]) => Promise<any>; release: () => void } | null = null

  /** Try to acquire the advisory lock; returns true if this instance is leader. */
  async tryBecomeLeader(): Promise<boolean> {
    // Advisory locks are re-entrant per session: re-acquiring stacks the lock
    // count and a single unlock no longer releases it. Guard against that.
    if (this.isLeader) return true

    // Dedicated connection: the lock is tied to this session's lifetime.
    if (!this.lockClient) {
      this.lockClient = await db.connect()
    }

    const result = await this.lockClient.query(
      'SELECT pg_try_advisory_lock($1) as acquired',
      [this.lockId]
    )

    this.isLeader = result.rows[0].acquired

    if (this.isLeader) {
      console.log('Acquired leader lock')
      this.startHeartbeat()
    }

    return this.isLeader
  }

  /** Periodic liveness log; the lock itself is held by the open session. */
  private startHeartbeat(): void {
    // The lock survives as long as the dedicated connection is alive.
    // This interval only emits a status line for monitoring.
    this.heartbeatInterval = setInterval(() => {
      console.log('Leader heartbeat - still holding lock')
    }, 30_000)
  }

  /** Release the lock and return the dedicated connection; safe to call twice. */
  async releaseLeadership(): Promise<void> {
    if (this.heartbeatInterval) {
      clearInterval(this.heartbeatInterval)
      this.heartbeatInterval = null
    }

    if (this.isLeader && this.lockClient) {
      await this.lockClient.query('SELECT pg_advisory_unlock($1)', [this.lockId])
      this.isLeader = false
      console.log('Released leader lock')
    }

    // Returning the client to the pool also drops the lock if still held.
    this.lockClient?.release()
    this.lockClient = null
  }
}

// Usage
const leader = new LeaderElection()

/**
 * Entry point: become leader immediately if possible, otherwise poll.
 * Fix over the original: the standby interval is cleared once leadership is
 * won — the original kept polling forever, re-acquiring the (re-entrant)
 * advisory lock every 10s and potentially starting multiple sync loops.
 */
async function main() {
  if (await leader.tryBecomeLeader()) {
    // This instance processes blocks
    await startChainSync()
    return
  }

  // This instance waits for leadership
  console.log('Running as standby, waiting for leadership...')
  const poll = setInterval(async () => {
    if (await leader.tryBecomeLeader()) {
      clearInterval(poll)  // stop polling — we are the leader now
      await startChainSync()
    }
  }, 10_000)
}

// Graceful shutdown: release the lock promptly so a standby can take over.
process.on('SIGTERM', async () => {
  await leader.releaseLeadership()
  process.exit(0)
})

Connection Management

Implement robust reconnection for WebSocket connections:

class ResilientConnection {
  private ws: WebSocket | null = null
  private reconnectAttempts = 0
  private maxReconnectAttempts = 10
  private baseDelay = 1000
  private reconnecting = false  // guards against overlapping reconnect loops

  /**
   * Connect with exponential backoff. Resolves once a socket is open;
   * throws after maxReconnectAttempts consecutive failures.
   */
  async connect(): Promise<void> {
    if (this.reconnecting) return  // a retry loop is already in flight
    this.reconnecting = true
    try {
      while (this.reconnectAttempts < this.maxReconnectAttempts) {
        try {
          await this.doConnect()
          this.reconnectAttempts = 0  // Reset on success
          return
        } catch (error) {
          this.reconnectAttempts++
          const delay = this.calculateBackoff()
          console.log(`Connection failed, retry ${this.reconnectAttempts} in ${delay}ms`)
          await this.sleep(delay)
        }
      }
      throw new Error('Max reconnection attempts exceeded')
    } finally {
      this.reconnecting = false
    }
  }

  private calculateBackoff(): number {
    // Exponential backoff with jitter, capped at 60 seconds
    const exponential = this.baseDelay * Math.pow(2, this.reconnectAttempts)
    const jitter = Math.random() * 1000
    return Math.min(exponential + jitter, 60_000)
  }

  // The original called this.sleep() but never defined it — added here.
  private sleep(ms: number): Promise<void> {
    return new Promise(resolve => setTimeout(resolve, ms))
  }

  private async doConnect(): Promise<void> {
    return new Promise((resolve, reject) => {
      const ws = new WebSocket(`wss://api.nacho.builders/v1/ogmios?apikey=${API_KEY}`)
      this.ws = ws

      const timeout = setTimeout(() => {
        ws.close()
        reject(new Error('Connection timeout'))
      }, 10_000)

      ws.on('open', () => {
        clearTimeout(timeout)
        // Register the reconnect-on-close handler only AFTER a successful
        // open. The original registered it up front; since 'close' also
        // fires after a failed connect ('error' then 'close'), that spawned
        // a second reconnect loop concurrent with the retry loop above.
        ws.on('close', () => {
          console.log('Connection closed, will reconnect...')
          this.connect().catch(console.error)
        })
        resolve()
      })

      ws.on('error', (error) => {
        clearTimeout(timeout)
        reject(error)
      })
    })
  }
}

Health Checks

Expose health endpoints for load balancers and monitoring:

import express from 'express'

const app = express()

// Liveness: Is the process running? (Restart the container when this fails.)
app.get('/health/live', (req, res) => {
  res.json({ status: 'ok' })
})

// Readiness: Is the service ready to process? (Remove from LB rotation when not.)
app.get('/health/ready', async (req, res) => {
  try {
    // Check database connection
    await db.query('SELECT 1')

    // Check sync status: how far behind the chain tip are we?
    const syncState = await db.query('SELECT * FROM sync_state WHERE id = 1')
    const tip = await getCurrentTip()

    const syncLag = tip.height - syncState.rows[0].last_block_height

    // 100 blocks is an arbitrary readiness threshold — tune for your SLO.
    if (syncLag > 100) {
      return res.status(503).json({
        status: 'not_ready',
        reason: 'sync_lag',
        lag: syncLag
      })
    }

    res.json({
      status: 'ready',
      syncedTo: syncState.rows[0].last_block_height,
      tipHeight: tip.height,
      lag: syncLag
    })
  } catch (error) {
    // `error` is `unknown` under strict mode — narrow before reading .message
    const message = error instanceof Error ? error.message : String(error)
    res.status(503).json({
      status: 'error',
      error: message
    })
  }
})

app.listen(8080)

Monitoring and Alerting

Key Metrics to Track

import { Counter, Gauge, Histogram } from 'prom-client'

// Sync metrics
// Gauge: absolute chain position the monitor has synced to.
const syncHeight = new Gauge({
  name: 'payment_monitor_sync_height',
  help: 'Current synced block height'
})

// Gauge: blocks between our position and the chain tip; this is the series
// the alert rules below fire on.
// NOTE(review): declared but never set in this snippet — update it wherever
// the tip height is known (e.g. next to syncHeight.set()).
const syncLag = new Gauge({
  name: 'payment_monitor_sync_lag',
  help: 'Blocks behind chain tip'
})

// Payment metrics
// Counters are monotonic; query with rate()/increase() in PromQL.
const paymentsDetected = new Counter({
  name: 'payments_detected_total',
  help: 'Total payments detected',
  labelNames: ['reference_type']
})

const paymentsConfirmed = new Counter({
  name: 'payments_confirmed_total',
  help: 'Total payments confirmed',
  labelNames: ['reference_type']
})

// Histogram buckets span 1 ADA to 100k ADA in decade steps.
const paymentValue = new Histogram({
  name: 'payment_value_ada',
  help: 'Payment values in ADA',
  buckets: [1, 10, 100, 1000, 10000, 100000]
})

// Processing metrics
// Buckets from 10ms to 5s; values above 5s land in +Inf.
const blockProcessingTime = new Histogram({
  name: 'block_processing_seconds',
  help: 'Time to process each block',
  buckets: [0.01, 0.05, 0.1, 0.5, 1, 5]
})

// Update metrics in your code
// Wraps block processing with a wall-clock timer and records the new height.
async function processBlock(block: Block): Promise<void> {
  const start = Date.now()

  // ... process block ...

  blockProcessingTime.observe((Date.now() - start) / 1000)
  syncHeight.set(block.height)
}

Alert Rules (Prometheus/Grafana)

# Prometheus alerting rules for the payment monitor.
# `for:` means the condition must hold continuously for that duration
# before the alert fires, which suppresses brief spikes.
groups:
  - name: payment-monitor
    rules:
      # Sync falling behind (warning well below the readiness threshold)
      - alert: PaymentMonitorSyncLag
        expr: payment_monitor_sync_lag > 50
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Payment monitor is {{ $value }} blocks behind"

      # Critically behind — shorter tolerance window, pages on-call
      - alert: PaymentMonitorSyncLagCritical
        expr: payment_monitor_sync_lag > 200
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Payment monitor critically behind: {{ $value }} blocks"

      # No payments detected (might indicate issue)
      # NOTE: tune the 6h window to your real traffic pattern — a low-volume
      # service will false-positive overnight.
      - alert: NoPaymentsDetected
        expr: increase(payments_detected_total[1h]) == 0
        for: 6h
        labels:
          severity: warning
        annotations:
          summary: "No payments detected in 6 hours"

      # Hot wallet balance
      - alert: HotWalletLow
        expr: hot_wallet_balance_ada < 50000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Hot wallet balance low: {{ $value }} ADA"

      - alert: HotWalletCritical
        expr: hot_wallet_balance_ada < 10000
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Hot wallet critically low: {{ $value }} ADA"

Logging Best Practices

import pino from 'pino'

// Structured JSON logger; log level is driven by the environment (default: info).
const logger = pino({
  level: process.env.LOG_LEVEL || 'info',
  formatters: {
    level: (label) => ({ level: label })
  }
})

// Structured logging for easy querying

/** Emit one structured event when a payment is first seen on-chain. */
function logPaymentDetected(payment: Payment): void {
  const adaAmount = payment.amountLovelace / 1_000_000
  logger.info({
    event: 'payment_detected',
    tx_hash: payment.txHash,
    amount_ada: adaAmount,
    reference_type: payment.referenceType,
    reference_id: payment.referenceId,
    block_height: payment.blockHeight
  })
}

/** Emit one structured event once a payment reaches its confirmation target. */
function logPaymentConfirmed(payment: Payment): void {
  const latencyMs = Date.now() - payment.detectedAt.getTime()
  logger.info({
    event: 'payment_confirmed',
    tx_hash: payment.txHash,
    amount_ada: payment.amountLovelace / 1_000_000,
    confirmations: payment.confirmations,
    latency_seconds: latencyMs / 1000
  })
}

/** Rollbacks deserve warn level: normal chain behavior, but worth watching. */
function logRollback(blocksRolledBack: number, affectedPayments: number): void {
  logger.warn({
    event: 'rollback',
    blocks_rolled_back: blocksRolledBack,
    affected_payments: affectedPayments
  })
}

Reconciliation

Even with perfect code, periodically verify your records match chain state.

Daily Reconciliation Job

// Summary of one reconciliation pass across all payment addresses.
interface ReconciliationResult {
  matched: number      // UTxOs present both on-chain and in our DB
  mismatched: number   // NOTE(review): declared but never incremented below — confirm intent
  missing: number      // on-chain UTxOs we have no record of
  extra: number        // DB records with no matching on-chain UTxO
  discrepancies: Discrepancy[]  // Discrepancy is declared elsewhere in the project
}

// Compares on-chain UTxO state against our payment records, address by
// address, and alerts when they disagree. Intended to run as a daily job.
async function runReconciliation(): Promise<ReconciliationResult> {
  const result: ReconciliationResult = {
    matched: 0,
    mismatched: 0,
    missing: 0,
    extra: 0,
    discrepancies: []
  }

  // Get all payment addresses
  const addresses = await db.query('SELECT address FROM payment_addresses')

  for (const { address } of addresses.rows) {
    // Query chain state
    const chainUtxos = await queryChainUtxos(address)

    // Query our records (confirmed payments that haven't been spent)
    // NOTE(review): the comment above says "that haven't been spent" but the
    // query filters only on status = 'confirmed' — spent outputs would show
    // up as 'extra_in_db'. Verify whether a spent flag/status should be added.
    const ourRecords = await db.query(
      `SELECT tx_hash, output_index, amount_lovelace
       FROM incoming_payments
       WHERE address = $1 AND status = 'confirmed'`,
      [address]
    )

    // Compare using "txHash#index" strings as canonical UTxO identifiers
    const chainSet = new Set(chainUtxos.map(u => `${u.txHash}#${u.index}`))
    const ourSet = new Set(ourRecords.rows.map(r => `${r.tx_hash}#${r.output_index}`))

    // Find discrepancies: on-chain UTxOs absent from our DB
    for (const utxo of chainUtxos) {
      const key = `${utxo.txHash}#${utxo.index}`
      if (!ourSet.has(key)) {
        result.missing++
        result.discrepancies.push({
          type: 'missing_in_db',
          txHash: utxo.txHash,
          index: utxo.index,
          address,
          amount: utxo.amount
        })
      }
    }

    // DB records absent from the chain (or matched, counted here once each)
    for (const record of ourRecords.rows) {
      const key = `${record.tx_hash}#${record.output_index}`
      if (!chainSet.has(key)) {
        result.extra++
        result.discrepancies.push({
          type: 'extra_in_db',
          txHash: record.tx_hash,
          index: record.output_index,
          address
        })
      } else {
        result.matched++
      }
    }
  }

  // Log and alert on discrepancies
  if (result.discrepancies.length > 0) {
    logger.error({
      event: 'reconciliation_discrepancy',
      discrepancies: result.discrepancies
    })
    await sendAlert('reconciliation', `Found ${result.discrepancies.length} discrepancies`)
  } else {
    logger.info({
      event: 'reconciliation_success',
      matched: result.matched
    })
  }

  return result
}

// Run daily at 3am (cron: minute hour day month weekday)
import cron from 'node-cron'
cron.schedule('0 3 * * *', runReconciliation)

Balance Verification

/**
 * Cross-check every stored user balance against the balance recomputed
 * from the balance_transactions ledger, logging any mismatch.
 */
async function verifyUserBalances(): Promise<void> {
  // Only accounts that currently hold a positive balance need checking.
  const users = await db.query(
    `SELECT user_id, balance_lovelace FROM user_balances WHERE balance_lovelace > 0`
  )

  for (const { user_id, balance_lovelace } of users.rows) {
    // Recompute the expected balance from the transaction history.
    const payments = await db.query(
      `SELECT
         COALESCE(SUM(CASE WHEN type = 'credit' THEN amount ELSE 0 END), 0) as credits,
         COALESCE(SUM(CASE WHEN type = 'debit' THEN amount ELSE 0 END), 0) as debits
       FROM balance_transactions
       WHERE user_id = $1`,
      [user_id]
    )

    const { credits, debits } = payments.rows[0]
    // BigInt throughout: lovelace sums can exceed Number.MAX_SAFE_INTEGER.
    const expectedBalance = BigInt(credits) - BigInt(debits)

    if (expectedBalance !== BigInt(balance_lovelace)) {
      logger.error({
        event: 'balance_mismatch',
        user_id: user_id,
        stored_balance: balance_lovelace,
        calculated_balance: expectedBalance.toString()
      })
    }
  }
}

Disaster Recovery

Rebuilding from Chain

Your integration database can always be rebuilt from the blockchain:

// Disaster-recovery path: wipe derived tables and replay recent chain
// history to reconstruct payment state. payment_addresses is preserved —
// it cannot be derived from the chain.
async function rebuildFromChain(): Promise<void> {
  console.log('Starting full rebuild from chain...')

  // 1. Clear integration tables (keep payment_addresses)
  await db.query('TRUNCATE sync_state, blocks, incoming_payments')

  // 2. Get all addresses to monitor
  // NOTE(review): addressSet is built but never used below — presumably the
  // BlockProcessor should filter on it; verify against its implementation.
  const addresses = await db.query('SELECT address FROM payment_addresses')
  const addressSet = new Set(addresses.rows.map(r => r.address))

  // 3. Connect to chain sync from a known good point
  const chainSync = new ChainSyncClient()
  await chainSync.connect()

  // Start from recent history (e.g., 30 days ago)
  // You'd calculate the appropriate slot number
  // NOTE(review): 'known-block-hash' is a placeholder — an intersection needs
  // the real block hash at startSlot or findIntersection will fail.
  const startSlot = await calculateSlotDaysAgo(30)
  await chainSync.findIntersection([{ slot: startSlot, id: 'known-block-hash' }])

  // 4. Process all blocks, logging progress every 1000 blocks
  let blocksProcessed = 0
  const blockProcessor = new BlockProcessor(db)

  chainSync.onBlock(async (result) => {
    // Rollback events ('backward') are ignored here; during a rebuild we
    // only replay forward history.
    if (result.direction === 'forward' && result.block) {
      await blockProcessor.processBlock(result.block)
      blocksProcessed++

      if (blocksProcessed % 1000 === 0) {
        console.log(`Rebuilt ${blocksProcessed} blocks, height: ${result.block.height}`)
      }
    }

    chainSync.requestNext()
  })

  chainSync.requestNext()

  // Wait until caught up
  // ... implementation depends on your architecture
}

Backup Strategy

| Data | Backup Frequency | Retention | Notes |
| --- | --- | --- | --- |
| payment_addresses | Daily | 30 days | Critical — maps addresses to your entities |
| incoming_payments | Daily | 90 days | Can be rebuilt from chain if needed |
| user_balances | Hourly | 7 days | Critical business data |
| sync_state, blocks | Not needed | — | Derived from chain |
# Backup critical tables
pg_dump -t payment_addresses -t user_balances mydb > backup.sql

# Restore
psql mydb < backup.sql

Scaling Considerations

Horizontal Scaling

For very high throughput, partition your address space:

// Assign addresses to partitions based on hash
// SHA-256 gives a uniform distribution, so buckets stay evenly loaded.
function getPartition(address: string, totalPartitions: number): number {
  const digest = crypto.createHash('sha256').update(address).digest()
  // First 4 bytes, big-endian, reduced modulo the partition count.
  const bucket = digest.readUInt32BE(0)
  return bucket % totalPartitions
}

// Each instance only processes its partition
const MY_PARTITION = parseInt(process.env.PARTITION_ID || '0')
const TOTAL_PARTITIONS = parseInt(process.env.TOTAL_PARTITIONS || '1')

// True when the address hashes into this instance's partition.
async function isMyAddress(address: string): Promise<boolean> {
  const partition = getPartition(address, TOTAL_PARTITIONS)
  return partition === MY_PARTITION
}

Database Optimization

-- Partition incoming_payments by month so old partitions can be detached
-- or dropped cheaply instead of bulk-DELETEd.
CREATE TABLE incoming_payments (
  -- columns...
) PARTITION BY RANGE (created_at);

-- One partition per calendar month; the upper bound is exclusive.
CREATE TABLE incoming_payments_2024_01 PARTITION OF incoming_payments
  FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');

-- Add more partitions as needed

-- Efficient status queries with partial index: only 'pending' rows are
-- indexed, keeping the index small for the hot confirmation-poll query.
CREATE INDEX idx_pending_payments ON incoming_payments(created_at)
  WHERE status = 'pending';

-- Use connection pooling
-- Configure PgBouncer or similar

Caching

import Redis from 'ioredis'

const redis = new Redis(process.env.REDIS_URL)

// Cache address lookups (they rarely change)

/** Check whether an address is one of ours, consulting Redis before Postgres. */
async function isPaymentAddress(address: string): Promise<boolean> {
  const cacheKey = `addr:${address}`

  const cached = await redis.get(cacheKey)
  if (cached !== null) {
    return cached === '1'
  }

  // Cache miss — fall through to the database.
  const result = await db.query(
    'SELECT 1 FROM payment_addresses WHERE address = $1',
    [address]
  )
  const isPayment = result.rows.length > 0

  // Cache positive AND negative answers for one hour to bound DB load.
  await redis.setex(cacheKey, 3600, isPayment ? '1' : '0')

  return isPayment
}

Security Hardening

Network Security

# docker-compose.yml example
# The payment monitor never needs inbound internet traffic, so it lives on
# an internal-only network; only the API service is exposed.
services:
  payment-monitor:
    networks:
      - internal  # Only internal network
    environment:
      - DATABASE_URL=postgres://...  # Internal hostname
      - NACHO_API_KEY=${NACHO_API_KEY}

  api:
    networks:
      - internal
      - external  # Exposed to internet
    ports:
      - "8080:8080"

networks:
  internal:
    internal: true  # No external access
  external:

API Key Rotation

// Support multiple API keys for rotation
// A type-predicate filter is needed: .filter(Boolean) does not narrow
// (string | undefined)[] to string[] under strict mode.
const API_KEYS = [
  process.env.NACHO_API_KEY_PRIMARY,
  process.env.NACHO_API_KEY_SECONDARY
].filter((key): key is string => Boolean(key))

let currentKeyIndex = 0

/** Return the key currently in use; throws if none are configured. */
function getApiKey(): string {
  const key = API_KEYS[currentKeyIndex]
  if (key === undefined) {
    // Fail loudly at startup instead of hiding it behind a `!` assertion.
    throw new Error('No API keys configured (set NACHO_API_KEY_PRIMARY)')
  }
  return key
}

/** Advance to the next key in round-robin order. */
function rotateToNextKey(): void {
  currentKeyIndex = (currentKeyIndex + 1) % API_KEYS.length
  console.log(`Rotated to API key ${currentKeyIndex + 1}`)
}

// On auth failure, try the next key; rethrow anything that isn't an auth error.
async function connectWithRotation(): Promise<void> {
  for (let i = 0; i < API_KEYS.length; i++) {
    try {
      await connect(getApiKey())
      return
    } catch (error) {
      // `error` is `unknown` under strict mode — narrow before reading .message
      if (error instanceof Error && error.message.includes('unauthorized')) {
        rotateToNextKey()
      } else {
        throw error
      }
    }
  }
  throw new Error('All API keys failed')
}

Audit Logging

// Log all sensitive operations

/** Create an outbound payment and write a matching audit-log entry. */
async function createPayment(request: PaymentRequest): Promise<string> {
  const paymentId = await insertPayment(request)

  // Snapshot the sensitive parameters alongside who initiated the action.
  const details = JSON.stringify({
    amount: request.amountLovelace,
    destination: request.destinationAddress,
    reference: request.referenceId
  })

  // NOTE(review): this runs outside the payment's transaction — presumably
  // both inserts should share one transaction so the audit trail can't be
  // lost on a crash between them; confirm against insertPayment.
  await db.query(
    `INSERT INTO audit_log (action, entity_type, entity_id, actor, details)
     VALUES ($1, $2, $3, $4, $5)`,
    ['create_payment', 'outbound_payment', paymentId, request.requestedBy, details]
  )

  return paymentId
}

Congratulations!

You now have a complete understanding of production blockchain integration patterns. Your system can:

  • Detect and process payments in real-time
  • Send outbound payments securely
  • Handle failures and rollbacks gracefully
  • Scale to handle high transaction volumes
  • Monitor and alert on issues
  • Recover from disasters

Next Steps

Was this page helpful?