Production Patterns
Scale, monitor, and harden your blockchain integration for production use
Your integration works in development. Now let's make it production-ready with high availability, monitoring, reconciliation, and disaster recovery patterns.
High Availability
Running Multiple Instances
In production, you'll want multiple instances of your payment monitor for redundancy. The challenge: only one instance should process each block.
Solution: Leader Election with PostgreSQL Advisory Locks
/**
 * Leader election backed by a PostgreSQL session-level advisory lock.
 *
 * Only one database session can hold the lock at a time, so only one
 * instance of the service becomes the active block processor. If the
 * leader's session dies, PostgreSQL releases the lock automatically and
 * a standby can take over.
 */
class LeaderElection {
  // Unique ID for your service — every instance must use the same value.
  private lockId = 12345
  private isLeader = false
  // NodeJS.Timer is deprecated in @types/node; infer the platform's
  // interval-handle type instead so this compiles in both Node and DOM libs.
  private heartbeatInterval: ReturnType<typeof setInterval> | null = null

  /**
   * Attempt to acquire the advisory lock without blocking.
   * @returns true if this instance is now the leader.
   */
  async tryBecomeLeader(): Promise<boolean> {
    const result = await db.query(
      'SELECT pg_try_advisory_lock($1) as acquired',
      [this.lockId]
    )
    this.isLeader = result.rows[0].acquired
    if (this.isLeader) {
      console.log('Acquired leader lock')
      this.startHeartbeat()
    }
    return this.isLeader
  }

  /**
   * Periodically log that the lock is still held. The lock itself lives as
   * long as the database connection — this heartbeat is monitoring only.
   */
  private startHeartbeat(): void {
    this.heartbeatInterval = setInterval(() => {
      console.log('Leader heartbeat - still holding lock')
    }, 30_000)
  }

  /** Release the advisory lock and stop the heartbeat (e.g. on shutdown). */
  async releaseLeadership(): Promise<void> {
    if (this.heartbeatInterval) {
      clearInterval(this.heartbeatInterval)
      this.heartbeatInterval = null // make repeated calls safe
    }
    if (this.isLeader) {
      await db.query('SELECT pg_advisory_unlock($1)', [this.lockId])
      this.isLeader = false
      console.log('Released leader lock')
    }
  }
}
// Usage
const leader = new LeaderElection()

/**
 * Entry point: become the leader if possible, otherwise poll for
 * leadership every 10 seconds until the current leader goes away.
 */
async function main() {
  if (await leader.tryBecomeLeader()) {
    // This instance processes blocks
    await startChainSync()
    return
  }

  // This instance waits for leadership
  console.log('Running as standby, waiting for leadership...')
  const poll = setInterval(async () => {
    if (await leader.tryBecomeLeader()) {
      // Stop polling once we win: pg_try_advisory_lock is reentrant per
      // session, so calling it again while leader stacks lock counts
      // (a single unlock would no longer release it) and would also
      // re-run startChainSync on every tick.
      clearInterval(poll)
      await startChainSync()
    }
  }, 10_000)
}
// Graceful shutdown: release the advisory lock explicitly so a standby
// can take over immediately instead of waiting for the DB session to die.
process.on('SIGTERM', async () => {
await leader.releaseLeadership()
process.exit(0)
})

Connection Management
Implement robust reconnection for WebSocket connections:
// Wraps a WebSocket with automatic reconnection using exponential
// backoff plus jitter. All connection state lives on the instance.
class ResilientConnection {
private ws: WebSocket | null = null
private reconnectAttempts = 0 // consecutive failures since last success
private maxReconnectAttempts = 10
private baseDelay = 1000 // ms; doubled per attempt, capped at 60s
async connect(): Promise<void> {
while (this.reconnectAttempts < this.maxReconnectAttempts) {
try {
await this.doConnect()
this.reconnectAttempts = 0 // Reset on success
return
} catch (error) {
this.reconnectAttempts++
const delay = this.calculateBackoff()
console.log(`Connection failed, retry ${this.reconnectAttempts} in ${delay}ms`)
await this.sleep(delay)
}
}
throw new Error('Max reconnection attempts exceeded')
}
/**
 * Exponential backoff: baseDelay * 2^attempts plus up to 1s of random
 * jitter, with the whole result capped at 60 seconds.
 */
private calculateBackoff(): number {
  const withJitter = this.baseDelay * 2 ** this.reconnectAttempts + Math.random() * 1000
  return Math.min(withJitter, 60_000) // Max 60 seconds
}
/**
 * Open a single WebSocket connection. Resolves on 'open', rejects on
 * 'error' or after a 10s timeout.
 *
 * Auto-reconnect is only scheduled from the 'close' handler when the
 * connection had actually opened. A 'close' during the handshake follows
 * the 'error' event, and the retry loop in connect() already owns that
 * failure — reconnecting from here as well would start a second,
 * concurrent retry loop and leak sockets.
 */
private async doConnect(): Promise<void> {
  return new Promise((resolve, reject) => {
    this.ws = new WebSocket(`wss://api.nacho.builders/v1/ogmios?apikey=${API_KEY}`)
    let opened = false
    const timeout = setTimeout(() => {
      this.ws?.close()
      reject(new Error('Connection timeout'))
    }, 10_000)
    this.ws.on('open', () => {
      opened = true
      clearTimeout(timeout)
      resolve()
    })
    this.ws.on('error', (error) => {
      clearTimeout(timeout)
      reject(error)
    })
    this.ws.on('close', () => {
      if (!opened) return // handshake failure: connect()'s retry loop handles it
      console.log('Connection closed, will reconnect...')
      this.connect().catch(console.error)
    })
  })
}
}

Health Checks
Expose health endpoints for load balancers and monitoring:
import express from 'express'
const app = express()
// Liveness: Is the process running? Used by the orchestrator to decide
// whether to restart the process; intentionally checks nothing else.
app.get('/health/live', (req, res) => {
res.json({ status: 'ok' })
})
// Readiness: Is the service ready to process? Route traffic only when the
// monitor can reach the database and is near the chain tip.
app.get('/health/ready', async (req, res) => {
  try {
    // Check database connection
    await db.query('SELECT 1')
    // Check sync status
    const syncState = await db.query('SELECT * FROM sync_state WHERE id = 1')
    const syncRow = syncState.rows[0]
    if (!syncRow) {
      // No sync_state row yet: initial sync has not started.
      return res.status(503).json({
        status: 'not_ready',
        reason: 'no_sync_state'
      })
    }
    const tip = await getCurrentTip()
    const syncLag = tip.height - syncRow.last_block_height
    if (syncLag > 100) {
      return res.status(503).json({
        status: 'not_ready',
        reason: 'sync_lag',
        lag: syncLag
      })
    }
    res.json({
      status: 'ready',
      syncedTo: syncRow.last_block_height,
      tipHeight: tip.height,
      lag: syncLag
    })
  } catch (error) {
    // catch variables are `unknown` under strict TS — narrow before use.
    const message = error instanceof Error ? error.message : String(error)
    res.status(503).json({
      status: 'error',
      error: message
    })
  }
})
app.listen(8080)

Monitoring and Alerting
Key Metrics to Track
import { Counter, Gauge, Histogram } from 'prom-client'
// Sync metrics
const syncHeight = new Gauge({
name: 'payment_monitor_sync_height',
help: 'Current synced block height'
})
// Gauge (not Counter): lag can go down again as the monitor catches up.
const syncLag = new Gauge({
name: 'payment_monitor_sync_lag',
help: 'Blocks behind chain tip'
})
// Payment metrics
const paymentsDetected = new Counter({
name: 'payments_detected_total',
help: 'Total payments detected',
labelNames: ['reference_type']
})
const paymentsConfirmed = new Counter({
name: 'payments_confirmed_total',
help: 'Total payments confirmed',
labelNames: ['reference_type']
})
// Buckets span 1 to 100k ADA, one bucket per order of magnitude.
const paymentValue = new Histogram({
name: 'payment_value_ada',
help: 'Payment values in ADA',
buckets: [1, 10, 100, 1000, 10000, 100000]
})
// Processing metrics
// Buckets from 10ms to 5s; healthy blocks should land in the low buckets.
const blockProcessingTime = new Histogram({
name: 'block_processing_seconds',
help: 'Time to process each block',
buckets: [0.01, 0.05, 0.1, 0.5, 1, 5]
})
// Update metrics in your code
// Records wall-clock processing time and the latest synced height for each
// block handled. NOTE(review): the histogram is only observed on success —
// a throw inside processing skips the observation; wrap in try/finally if
// failure timings matter.
async function processBlock(block: Block): Promise<void> {
const start = Date.now()
// ... process block ...
blockProcessingTime.observe((Date.now() - start) / 1000)
syncHeight.set(block.height)
}

Alert Rules (Prometheus/Grafana)
# Prometheus alerting rules for the payment monitor.
# (Indentation restored — YAML nesting is significant.)
groups:
  - name: payment-monitor
    rules:
      # Sync falling behind
      - alert: PaymentMonitorSyncLag
        expr: payment_monitor_sync_lag > 50
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Payment monitor is {{ $value }} blocks behind"
      - alert: PaymentMonitorSyncLagCritical
        expr: payment_monitor_sync_lag > 200
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "Payment monitor critically behind: {{ $value }} blocks"
      # No payments detected (might indicate issue)
      - alert: NoPaymentsDetected
        expr: increase(payments_detected_total[1h]) == 0
        for: 6h
        labels:
          severity: warning
        annotations:
          summary: "No payments detected in 6 hours"
      # Hot wallet balance
      - alert: HotWalletLow
        expr: hot_wallet_balance_ada < 50000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Hot wallet balance low: {{ $value }} ADA"
      - alert: HotWalletCritical
        expr: hot_wallet_balance_ada < 10000
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Hot wallet critically low: {{ $value }} ADA"

Logging Best Practices
import pino from 'pino'
// JSON logger; level is environment-driven so production can run at
// 'info' while debugging runs at 'debug'.
const logger = pino({
level: process.env.LOG_LEVEL || 'info',
formatters: {
// Emit the level label ('info') instead of pino's numeric level value.
level: (label) => ({ level: label })
}
})
// Structured logging for easy querying
/** Emit a structured 'payment_detected' event for log aggregation. */
function logPaymentDetected(payment: Payment): void {
  const entry = {
    event: 'payment_detected',
    tx_hash: payment.txHash,
    amount_ada: payment.amountLovelace / 1_000_000,
    reference_type: payment.referenceType,
    reference_id: payment.referenceId,
    block_height: payment.blockHeight
  }
  logger.info(entry)
}
/** Emit a 'payment_confirmed' event including detection-to-confirmation latency. */
function logPaymentConfirmed(payment: Payment): void {
  const latencySeconds = (Date.now() - payment.detectedAt.getTime()) / 1000
  logger.info({
    event: 'payment_confirmed',
    tx_hash: payment.txHash,
    amount_ada: payment.amountLovelace / 1_000_000,
    confirmations: payment.confirmations,
    latency_seconds: latencySeconds
  })
}
// Rollbacks are normal on Cardano but worth surfacing: warn-level so
// alerting can count them without paging on every occurrence.
function logRollback(blocksRolledBack: number, affectedPayments: number): void {
logger.warn({
event: 'rollback',
blocks_rolled_back: blocksRolledBack,
affected_payments: affectedPayments
})
}

Reconciliation
Even with perfect code, periodically verify your records match chain state.
Daily Reconciliation Job
// Summary of one reconciliation pass comparing on-chain UTxOs against the
// incoming_payments table.
interface ReconciliationResult {
matched: number // present both on chain and in our records
mismatched: number // NOTE(review): declared but never incremented by runReconciliation — presumably reserved for amount mismatches
missing: number // on chain but absent from our DB
extra: number // in our DB but no longer unspent on chain
discrepancies: Discrepancy[]
}
// Compare chain UTxO state against our confirmed-payment records for every
// monitored address. Read-only: discrepancies are logged and alerted on,
// never auto-repaired.
async function runReconciliation(): Promise<ReconciliationResult> {
const result: ReconciliationResult = {
matched: 0,
mismatched: 0,
missing: 0,
extra: 0,
discrepancies: []
}
// Get all payment addresses
const addresses = await db.query('SELECT address FROM payment_addresses')
for (const { address } of addresses.rows) {
// Query chain state
const chainUtxos = await queryChainUtxos(address)
// Query our records (confirmed payments that haven't been spent)
const ourRecords = await db.query(
`SELECT tx_hash, output_index, amount_lovelace
FROM incoming_payments
WHERE address = $1 AND status = 'confirmed'`,
[address]
)
// Compare using "txHash#index" outpoint keys for O(1) membership checks
const chainSet = new Set(chainUtxos.map(u => `${u.txHash}#${u.index}`))
const ourSet = new Set(ourRecords.rows.map(r => `${r.tx_hash}#${r.output_index}`))
// Find discrepancies
// Pass 1: UTxOs on chain that we never recorded (missed payments)
for (const utxo of chainUtxos) {
const key = `${utxo.txHash}#${utxo.index}`
if (!ourSet.has(key)) {
result.missing++
result.discrepancies.push({
type: 'missing_in_db',
txHash: utxo.txHash,
index: utxo.index,
address,
amount: utxo.amount
})
}
}
// Pass 2: records we hold that are no longer unspent on chain
for (const record of ourRecords.rows) {
const key = `${record.tx_hash}#${record.output_index}`
if (!chainSet.has(key)) {
result.extra++
result.discrepancies.push({
type: 'extra_in_db',
txHash: record.tx_hash,
index: record.output_index,
address
})
} else {
result.matched++
}
}
// NOTE(review): result.mismatched is never updated — amount-level
// comparison (amount_lovelace vs utxo.amount) is not implemented here.
}
// Log and alert on discrepancies
if (result.discrepancies.length > 0) {
logger.error({
event: 'reconciliation_discrepancy',
discrepancies: result.discrepancies
})
await sendAlert('reconciliation', `Found ${result.discrepancies.length} discrepancies`)
} else {
logger.info({
event: 'reconciliation_success',
matched: result.matched
})
}
return result
}
// Run daily at 3am
import cron from 'node-cron'
cron.schedule('0 3 * * *', runReconciliation)

Balance Verification
// Recompute each user's balance from the balance_transactions ledger and
// compare with the stored user_balances value. Mismatches are logged for
// manual investigation (no automatic correction).
async function verifyUserBalances(): Promise<void> {
// Get all users with balances
const users = await db.query(
`SELECT user_id, balance_lovelace FROM user_balances WHERE balance_lovelace > 0`
)
for (const user of users.rows) {
// Calculate expected balance from payment history
const payments = await db.query(
`SELECT
COALESCE(SUM(CASE WHEN type = 'credit' THEN amount ELSE 0 END), 0) as credits,
COALESCE(SUM(CASE WHEN type = 'debit' THEN amount ELSE 0 END), 0) as debits
FROM balance_transactions
WHERE user_id = $1`,
[user.user_id]
)
// BigInt avoids precision loss: lovelace sums can exceed Number.MAX_SAFE_INTEGER
const expectedBalance = BigInt(payments.rows[0].credits) - BigInt(payments.rows[0].debits)
if (expectedBalance !== BigInt(user.balance_lovelace)) {
logger.error({
event: 'balance_mismatch',
user_id: user.user_id,
stored_balance: user.balance_lovelace,
calculated_balance: expectedBalance.toString()
})
}
}
}

Disaster Recovery
Rebuilding from Chain
Your integration database can always be rebuilt from the blockchain:
// Full disaster-recovery rebuild: wipe the derived tables and replay chain
// history through the normal block processor. Destructive — run only when
// the integration database is known to be bad.
async function rebuildFromChain(): Promise<void> {
console.log('Starting full rebuild from chain...')
// 1. Clear integration tables (keep payment_addresses)
await db.query('TRUNCATE sync_state, blocks, incoming_payments')
// 2. Get all addresses to monitor
const addresses = await db.query('SELECT address FROM payment_addresses')
const addressSet = new Set(addresses.rows.map(r => r.address))
// 3. Connect to chain sync from a known good point
const chainSync = new ChainSyncClient()
await chainSync.connect()
// Start from recent history (e.g., 30 days ago)
// You'd calculate the appropriate slot number
const startSlot = await calculateSlotDaysAgo(30)
await chainSync.findIntersection([{ slot: startSlot, id: 'known-block-hash' }])
// 4. Process all blocks
let blocksProcessed = 0
const blockProcessor = new BlockProcessor(db)
chainSync.onBlock(async (result) => {
if (result.direction === 'forward' && result.block) {
await blockProcessor.processBlock(result.block)
blocksProcessed++
// Progress log every 1000 blocks so long replays are observable
if (blocksProcessed % 1000 === 0) {
console.log(`Rebuilt ${blocksProcessed} blocks, height: ${result.block.height}`)
}
}
// Always request the next message, including rollback notifications
chainSync.requestNext()
})
chainSync.requestNext()
// Wait until caught up
// ... implementation depends on your architecture
// NOTE(review): addressSet is built but unused in this snippet —
// presumably BlockProcessor filters by address itself; confirm.
}

Backup Strategy
| Data | Backup Frequency | Retention | Notes |
|---|---|---|---|
payment_addresses | Daily | 30 days | Critical - maps addresses to your entities |
incoming_payments | Daily | 90 days | Can be rebuilt from chain if needed |
user_balances | Hourly | 7 days | Critical business data |
sync_state, blocks | Not needed | - | Derived from chain |
# Backup critical tables
pg_dump -t payment_addresses -t user_balances mydb > backup.sql
# Restore
psql mydb < backup.sql

Scaling Considerations
Horizontal Scaling
For very high throughput, partition your address space:
// Deterministically assign an address to a partition: SHA-256 the address
// and bucket the first 32 bits of the digest modulo the partition count.
function getPartition(address: string, totalPartitions: number): number {
  const digest = crypto.createHash('sha256').update(address).digest()
  const bucket = digest.readUInt32BE(0) % totalPartitions
  return bucket
}
// Each instance only processes its partition; partition assignment comes
// from the deployment environment.
const MY_PARTITION = parseInt(process.env.PARTITION_ID || '0')
const TOTAL_PARTITIONS = parseInt(process.env.TOTAL_PARTITIONS || '1')
// NOTE(review): async despite being pure computation — presumably to match
// the signature of other address checks (e.g. the Redis-backed one); confirm.
async function isMyAddress(address: string): Promise<boolean> {
return getPartition(address, TOTAL_PARTITIONS) === MY_PARTITION
}

Database Optimization
-- Partition incoming_payments by month
-- Range partitioning keeps recent (hot) payments in a small partition and
-- lets old months be detached or archived cheaply.
CREATE TABLE incoming_payments (
-- columns...
) PARTITION BY RANGE (created_at);
CREATE TABLE incoming_payments_2024_01 PARTITION OF incoming_payments
FOR VALUES FROM ('2024-01-01') TO ('2024-02-01');
-- Add more partitions as needed
-- Efficient status queries with partial index
-- The partial index stays tiny: it only contains rows still pending.
CREATE INDEX idx_pending_payments ON incoming_payments(created_at)
WHERE status = 'pending';
-- Use connection pooling
-- Configure PgBouncer or similar

Caching
import Redis from 'ioredis'
const redis = new Redis(process.env.REDIS_URL)
// Cache address lookups (they rarely change)
// Both positive and negative results are cached for 1 hour; values are
// stored as '1'/'0' strings since Redis has no boolean type.
async function isPaymentAddress(address: string): Promise<boolean> {
const cached = await redis.get(`addr:${address}`)
if (cached !== null) {
return cached === '1'
}
// Cache miss: fall back to the database, then populate the cache.
const result = await db.query(
'SELECT 1 FROM payment_addresses WHERE address = $1',
[address]
)
const isPayment = result.rows.length > 0
await redis.setex(`addr:${address}`, 3600, isPayment ? '1' : '0')
return isPayment
}

Security Hardening
Network Security
# docker-compose.yml example
# (Indentation restored — YAML nesting is significant.)
services:
  payment-monitor:
    networks:
      - internal # Only internal network
    environment:
      - DATABASE_URL=postgres://... # Internal hostname
      - NACHO_API_KEY=${NACHO_API_KEY}
  api:
    networks:
      - internal
      - external # Exposed to internet
    ports:
      - "8080:8080"
networks:
  internal:
    internal: true # No external access
  external:

API Key Rotation
// Support multiple API keys for rotation
// Keys are tried in order; filter(Boolean) drops unset env vars so a
// single-key deployment still works.
const API_KEYS = [
process.env.NACHO_API_KEY_PRIMARY,
process.env.NACHO_API_KEY_SECONDARY
].filter(Boolean)
// Index of the key currently in use (module-level mutable state).
let currentKeyIndex = 0
function getApiKey(): string {
return API_KEYS[currentKeyIndex]!
}
// Advance to the next configured key, wrapping around to the first.
function rotateToNextKey(): void {
currentKeyIndex = (currentKeyIndex + 1) % API_KEYS.length
console.log(`Rotated to API key ${currentKeyIndex + 1}`)
}
// On auth failure, try next key.
// Each configured key is tried at most once per call; non-auth errors
// propagate immediately since rotating keys would not help.
async function connectWithRotation(): Promise<void> {
  for (let i = 0; i < API_KEYS.length; i++) {
    try {
      await connect(getApiKey())
      return
    } catch (error) {
      // catch variables are `unknown` under strict TS — narrow before
      // reading .message.
      const message = error instanceof Error ? error.message : String(error)
      if (message.includes('unauthorized')) {
        rotateToNextKey()
      } else {
        throw error
      }
    }
  }
  throw new Error('All API keys failed')
}

Audit Logging
// Log all sensitive operations
// Creates the outbound payment, then writes an audit record of who
// requested it and what was sent.
async function createPayment(request: PaymentRequest): Promise<string> {
const paymentId = await insertPayment(request)
// NOTE(review): payment insert and audit insert are separate statements —
// if auditing must be atomic with creation, wrap both in one transaction.
await db.query(
`INSERT INTO audit_log (action, entity_type, entity_id, actor, details)
VALUES ($1, $2, $3, $4, $5)`,
[
'create_payment',
'outbound_payment',
paymentId,
request.requestedBy, // User or system that initiated
JSON.stringify({
amount: request.amountLovelace,
destination: request.destinationAddress,
reference: request.referenceId
})
]
)
return paymentId
}

Congratulations!
You now have a complete understanding of production blockchain integration patterns. Your system can:
- Detect and process payments in real-time
- Send outbound payments securely
- Handle failures and rollbacks gracefully
- Scale to handle high transaction volumes
- Monitor and alert on issues
- Recover from disasters
Next Steps
- Production Deployment Guide - Infrastructure setup
- Error Handling - Handle edge cases
- Troubleshooting - Debug common issues
- API Reference - Complete API documentation
Was this page helpful?