Intermediate · 25 min read

Payment Monitoring

Detect and process incoming ADA payments in real time using chain synchronization.

In this tutorial, you'll build a production-grade payment monitoring system that detects incoming ADA in real time and handles blockchain rollbacks safely.

What You'll Build

A payment monitor that:

  1. Connects to the blockchain via WebSocket
  2. Processes new blocks as they arrive
  3. Detects payments to your addresses
  4. Tracks confirmations
  5. Handles rollbacks without double-crediting

Use Cases

E-Commerce

Customer pays → Detect payment → Fulfill order

SaaS Billing

User tops up → Detect payment → Credit account

Gaming

Player deposits → Detect payment → Add in-game currency

Architecture

[Figure: Payment Monitor Architecture. Labeled elements: Payment Monitor Service, Chain Sync Connection, Block Processor, Payment Detector, PostgreSQL. Data flows: blocks, payments.]

The payment monitor sits between the blockchain and your application, translating raw blockchain data into actionable events.

Step 1: Database Schema

First, set up your tracking tables:

-- Track sync progress (where we are in the chain)
CREATE TABLE sync_state (
  id INTEGER PRIMARY KEY DEFAULT 1,
  last_block_hash TEXT NOT NULL,
  last_block_height INTEGER NOT NULL,
  last_slot INTEGER NOT NULL,
  updated_at TIMESTAMPTZ DEFAULT NOW()
);

-- Track blocks for rollback detection
CREATE TABLE blocks (
  hash TEXT PRIMARY KEY,
  height INTEGER NOT NULL,
  slot INTEGER NOT NULL,
  previous_hash TEXT,
  processed_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_blocks_height ON blocks(height DESC);

-- Addresses to monitor for incoming payments
CREATE TABLE payment_addresses (
  address TEXT PRIMARY KEY,
  reference_id TEXT NOT NULL,  -- Your order ID, user ID, etc.
  reference_type TEXT NOT NULL, -- 'order', 'user', 'invoice', etc.
  created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_payment_addresses_ref ON payment_addresses(reference_type, reference_id);

-- Detected incoming payments
CREATE TABLE incoming_payments (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  tx_hash TEXT NOT NULL,
  output_index INTEGER NOT NULL,
  address TEXT NOT NULL,
  reference_id TEXT NOT NULL,
  reference_type TEXT NOT NULL,
  amount_lovelace BIGINT NOT NULL,
  block_hash TEXT NOT NULL,  -- No FK to blocks: rolled-back blocks are deleted, but their payment rows are kept
  block_height INTEGER NOT NULL,
  status TEXT DEFAULT 'pending',
  -- Status: pending → confirmed → processed | rolled_back
  created_at TIMESTAMPTZ DEFAULT NOW(),
  confirmed_at TIMESTAMPTZ,
  processed_at TIMESTAMPTZ,
  UNIQUE(tx_hash, output_index)
);
CREATE INDEX idx_payments_status ON incoming_payments(status);
CREATE INDEX idx_payments_reference ON incoming_payments(reference_type, reference_id);

Why This Schema?

| Table | Purpose |
| --- | --- |
| sync_state | Resume from correct position after restart |
| blocks | Detect which payments to roll back |
| payment_addresses | Map blockchain addresses to your business objects |
| incoming_payments | Track payment lifecycle with explicit states |
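
The status lifecycle noted in the schema comment (pending → confirmed → processed | rolled_back) can also be checked in application code before a row is updated. The following is a minimal sketch, not part of the tutorial's code: `PaymentStatus`, `ALLOWED`, and `canTransition` are hypothetical names, and the transition from `rolled_back` back to `pending` is an assumption covering the case where a transaction is included again on the new fork.

```typescript
type PaymentStatus = 'pending' | 'confirmed' | 'processed' | 'rolled_back'

// Allowed status changes. Assumption: a rolled-back payment may return to
// 'pending' if its transaction is included again after the rollback.
const ALLOWED: Record<PaymentStatus, PaymentStatus[]> = {
  pending: ['confirmed', 'rolled_back'],
  confirmed: ['processed', 'rolled_back'],
  processed: [], // terminal: a reversal needs manual review
  rolled_back: ['pending'],
}

// Returns true when moving a payment from `from` to `to` is permitted.
function canTransition(from: PaymentStatus, to: PaymentStatus): boolean {
  return ALLOWED[from].indexOf(to) !== -1
}
```

Keeping `processed` terminal mirrors the rollback handler later in this tutorial, which alerts on processed payments instead of changing them.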

Step 2: Chain Sync Connection

Connect to the Ogmios chain sync protocol:

import WebSocket from 'ws'

const API_KEY = process.env.NACHO_API_KEY!
const WS_URL = 'wss://api.nacho.builders/v1/ogmios'

interface ChainPoint {
slot: number
id: string  // block hash
}

interface BlockResult {
direction: 'forward' | 'backward'
block?: Block
point?: ChainPoint
}

class ChainSyncClient {
private ws: WebSocket | null = null
private messageHandler: ((result: BlockResult) => Promise<void>) | null = null

async connect(): Promise<void> {
  return new Promise((resolve, reject) => {
    this.ws = new WebSocket(`${WS_URL}?apikey=${API_KEY}`)

    this.ws.on('open', () => {
      console.log('Connected to chain sync')
      resolve()
    })

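    this.ws.on('message', async (data) => {
      try {
        const msg = JSON.parse(data.toString())
        if (msg.error) {
          // JSON-RPC error responses carry `error` instead of `result`
          console.error('Chain sync request failed:', msg.error)
          return
        }
        if (msg.result && this.messageHandler) {
          await this.messageHandler(msg.result)
        }
      } catch (err) {
        // Without this, a failing handler becomes an unhandled rejection
        console.error('Failed to handle chain sync message:', err)
      }
    })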

    this.ws.on('error', reject)
    this.ws.on('close', () => {
      console.log('Chain sync disconnected')
      // Implement reconnection logic here
    })
  })
}

async findIntersection(points: (ChainPoint | 'origin')[]): Promise<ChainPoint> {
  return new Promise((resolve) => {
    const handler = this.messageHandler
    this.messageHandler = async (result: any) => {
      if (result.intersection) {
        this.messageHandler = handler
        resolve(result.intersection)
      }
    }

    this.ws?.send(JSON.stringify({
      jsonrpc: '2.0',
      method: 'findIntersection',
      params: { points },
      id: 'find-intersection'
    }))
  })
}

requestNext(): void {
  this.ws?.send(JSON.stringify({
    jsonrpc: '2.0',
    method: 'nextBlock',
    id: 'next'
  }))
}

onBlock(handler: (result: BlockResult) => Promise<void>): void {
  this.messageHandler = handler
}

close(): void {
  this.ws?.close()
}
}
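
The `close` handler above leaves reconnection logic open. One common approach is exponential backoff with a cap and random jitter. The sketch below only computes the delay; `backoffDelayMs`, `jitteredDelayMs`, and their default values are hypothetical and should be tuned for your deployment.

```typescript
// Delay before reconnect attempt `attempt` (0-based): base * 2^attempt, capped at maxMs.
function backoffDelayMs(attempt: number, baseMs = 1_000, maxMs = 60_000): number {
  const exponential = baseMs * Math.pow(2, Math.max(0, attempt))
  return Math.min(exponential, maxMs)
}

// Full jitter: pick a delay in [0, backoff) so that many clients do not
// reconnect at the same instant. `random` is injectable to make tests deterministic.
function jitteredDelayMs(attempt: number, random: () => number = Math.random): number {
  return Math.floor(random() * backoffDelayMs(attempt))
}
```

Inside the `close` handler, something like `setTimeout(() => this.connect(), jitteredDelayMs(attempt))` could schedule the retry. After a successful reconnect, the client would need to call `findIntersection` again from the last stored point before requesting blocks.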

Step 3: Block Processor

Process blocks and detect payments to your addresses:

import { Pool } from 'pg'

interface Block {
id: string       // block hash
height: number
slot: number
previousBlock: string
transactions: Transaction[]
}

interface Transaction {
id: string       // tx hash
outputs: Output[]
}

interface Output {
address: string
value: { ada: { lovelace: number } }
}

class BlockProcessor {
constructor(private db: Pool) {}

async processBlock(block: Block): Promise<void> {
  const client = await this.db.connect()

  try {
    await client.query('BEGIN')

    // 1. Store the block (for rollback tracking)
    await client.query(
      `INSERT INTO blocks (hash, height, slot, previous_hash)
       VALUES ($1, $2, $3, $4)
       ON CONFLICT (hash) DO NOTHING`,
      [block.id, block.height, block.slot, block.previousBlock]
    )

    // 2. Process each transaction for payments
    for (const tx of block.transactions || []) {
      await this.processTransaction(client, tx, block)
    }

    // 3. Update sync state
    await client.query(
      `INSERT INTO sync_state (id, last_block_hash, last_block_height, last_slot)
       VALUES (1, $1, $2, $3)
       ON CONFLICT (id) DO UPDATE SET
         last_block_hash = $1,
         last_block_height = $2,
         last_slot = $3,
         updated_at = NOW()`,
      [block.id, block.height, block.slot]
    )

    // 4. Confirm pending payments that are at least 15 blocks deep
    await client.query(
      `UPDATE incoming_payments
       SET status = 'confirmed',
           confirmed_at = COALESCE(confirmed_at, NOW())
       WHERE status = 'pending'
         AND $1 - block_height >= 15`,
      [block.height]
    )

    await client.query('COMMIT')
    console.log(`Block ${block.height}: ${block.transactions?.length || 0} txs`)

  } catch (error) {
    await client.query('ROLLBACK')
    throw error
  } finally {
    client.release()
  }
}

private async processTransaction(
  client: any,
  tx: Transaction,
  block: Block
): Promise<void> {
  for (let i = 0; i < tx.outputs.length; i++) {
    const output = tx.outputs[i]

    // Check if output is to one of our payment addresses
    const result = await client.query(
      `SELECT reference_id, reference_type
       FROM payment_addresses WHERE address = $1`,
      [output.address]
    )

    if (result.rows.length > 0) {
      const { reference_id, reference_type } = result.rows[0]
      const amount = output.value.ada.lovelace

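      // Record the payment. If the same output was seen before and then
      // rolled back, re-activate it on the new block instead of skipping it.
      await client.query(
        `INSERT INTO incoming_payments
         (tx_hash, output_index, address, reference_id, reference_type,
          amount_lovelace, block_hash, block_height)
         VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
         ON CONFLICT (tx_hash, output_index) DO UPDATE SET
           block_hash = EXCLUDED.block_hash,
           block_height = EXCLUDED.block_height,
           status = 'pending',
           confirmed_at = NULL
         WHERE incoming_payments.status = 'rolled_back'`,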
        [tx.id, i, output.address, reference_id, reference_type,
         amount, block.id, block.height]
      )

      console.log(`Payment detected: ${amount / 1_000_000} ADA for ${reference_type}:${reference_id}`)
    }
  }
}
}
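
`processTransaction` runs one query per output, which may become slow on busy blocks. A possible alternative is to keep the watched addresses in memory and match outputs without touching the database. This is a sketch under assumptions: `WatchedAddress`, `DetectedPayment`, and `detectPayments` are hypothetical names, the input shapes mirror the interfaces above, and the map would need refreshing whenever rows are added to `payment_addresses`.

```typescript
interface TxOutput { address: string; value: { ada: { lovelace: number } } }
interface Tx { id: string; outputs: TxOutput[] }

interface WatchedAddress { referenceId: string; referenceType: string }

interface DetectedPayment extends WatchedAddress {
  txHash: string
  outputIndex: number
  address: string
  lovelace: number
}

// Scan every output of every transaction and keep those paying a watched address.
function detectPayments(
  txs: Tx[],
  watched: Map<string, WatchedAddress>
): DetectedPayment[] {
  const found: DetectedPayment[] = []
  for (const tx of txs) {
    tx.outputs.forEach((output, outputIndex) => {
      const ref = watched.get(output.address)
      if (ref) {
        found.push({
          ...ref,
          txHash: tx.id,
          outputIndex,
          address: output.address,
          lovelace: output.value.ada.lovelace,
        })
      }
    })
  }
  return found
}
```

The returned list could then be written with the same `INSERT` statement used in `processTransaction`, so the database remains the source of truth for recorded payments.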

Step 4: Rollback Handler

Handle chain rollbacks safely:

class RollbackHandler {
constructor(private db: Pool) {}

async handleRollback(rollbackPoint: ChainPoint): Promise<void> {
  const client = await this.db.connect()

  try {
    await client.query('BEGIN')

    // 1. Find all blocks being rolled back
    const result = await client.query(
      `SELECT hash, height FROM blocks
       WHERE height > (SELECT height FROM blocks WHERE hash = $1)
       ORDER BY height DESC`,
      [rollbackPoint.id]
    )

    const rolledBackBlocks = result.rows

    if (rolledBackBlocks.length === 0) {
      await client.query('COMMIT')
      return
    }

    console.log(`Rolling back ${rolledBackBlocks.length} blocks`)

    // 2. Handle payments from rolled-back blocks
    for (const block of rolledBackBlocks) {
      // Mark payments as rolled back (unless already processed)
      await client.query(
        `UPDATE incoming_payments
         SET status = 'rolled_back'
         WHERE block_hash = $1 AND status IN ('pending', 'confirmed')`,
        [block.hash]
      )

      // Check for payments that were already processed (ALERT!)
      const processed = await client.query(
        `SELECT * FROM incoming_payments
         WHERE block_hash = $1 AND status = 'processed'`,
        [block.hash]
      )

      if (processed.rows.length > 0) {
        console.error('CRITICAL: Processed payments rolled back!', processed.rows)
        // Send alert to operations team immediately
        // These require manual review and possible reversal
      }
    }

    // 3. Delete rolled-back blocks
    await client.query(
      `DELETE FROM blocks
       WHERE height > (SELECT height FROM blocks WHERE hash = $1)`,
      [rollbackPoint.id]
    )

    // 4. Update sync state
    const rollbackBlock = await client.query(
      'SELECT height FROM blocks WHERE hash = $1',
      [rollbackPoint.id]
    )

    await client.query(
      `UPDATE sync_state SET
         last_block_hash = $1,
         last_block_height = $2,
         last_slot = $3,
         updated_at = NOW()`,
      [rollbackPoint.id, rollbackBlock.rows[0].height, rollbackPoint.slot]
    )

    await client.query('COMMIT')

  } catch (error) {
    await client.query('ROLLBACK')
    throw error
  } finally {
    client.release()
  }
}
}
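
The decision the handler makes for each affected payment can be isolated as a pure function, which is easier to unit-test than the SQL. The sketch below is illustrative only: `PaymentRow`, `RollbackPlan`, and `planRollback` are hypothetical names, and it assumes, as the queries above do, that every block with a height strictly above the rollback point is discarded.

```typescript
type Status = 'pending' | 'confirmed' | 'processed' | 'rolled_back'

interface PaymentRow { id: string; blockHeight: number; status: Status }

interface RollbackPlan {
  markRolledBack: string[] // safe to flip to 'rolled_back'
  needsReview: string[]    // already processed: alert a human
}

// Classify payments affected by a rollback to `rollbackHeight`.
function planRollback(payments: PaymentRow[], rollbackHeight: number): RollbackPlan {
  const plan: RollbackPlan = { markRolledBack: [], needsReview: [] }
  for (const p of payments) {
    if (p.blockHeight <= rollbackHeight) continue // block survives the rollback
    if (p.status === 'processed') {
      plan.needsReview.push(p.id)
    } else if (p.status !== 'rolled_back') {
      plan.markRolledBack.push(p.id)
    }
  }
  return plan
}
```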

Step 5: Main Monitor Loop

Tie it all together:

async function startPaymentMonitor() {
const db = new Pool({ connectionString: process.env.DATABASE_URL })
const chainSync = new ChainSyncClient()
const blockProcessor = new BlockProcessor(db)
const rollbackHandler = new RollbackHandler(db)

await chainSync.connect()

// Get last synced point from database
const syncState = await db.query('SELECT * FROM sync_state WHERE id = 1')
let startPoints: (ChainPoint | 'origin')[]

if (syncState.rows.length > 0) {
  startPoints = [{
    id: syncState.rows[0].last_block_hash,
    slot: syncState.rows[0].last_slot
  }, 'origin']  // Fallback to origin if our point is gone
} else {
  startPoints = ['origin']
}

// Find intersection with chain
const intersection = await chainSync.findIntersection(startPoints)
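// On a first run the intersection is the string 'origin', which has no `id`
const startLabel = typeof intersection === 'string' ? intersection : intersection.id.slice(0, 8)
console.log(`Starting sync from ${startLabel}...`)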

// Set up block handler
chainSync.onBlock(async (result) => {
  if (result.direction === 'forward' && result.block) {
    await blockProcessor.processBlock(result.block)
  } else if (result.direction === 'backward' && result.point) {
    await rollbackHandler.handleRollback(result.point)
  }
  chainSync.requestNext()
})

// Start the sync loop
chainSync.requestNext()
}

startPaymentMonitor().catch(console.error)
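
If the single stored point sat on a fork that has since been rolled back, the only remaining candidate is `origin`, which means re-syncing the whole chain. A possible mitigation, offered as a sketch, is to pass several recent points from the `blocks` table, newest first, since `findIntersection` already takes a list. `StoredBlock` and `buildStartPoints` are hypothetical names, and the default of 10 points is an arbitrary assumption.

```typescript
interface Point { slot: number; id: string }
interface StoredBlock { hash: string; slot: number; height: number }

// Build intersection candidates: the newest stored blocks first, then 'origin'.
function buildStartPoints(recent: StoredBlock[], maxPoints = 10): (Point | 'origin')[] {
  const result: (Point | 'origin')[] = recent
    .slice()                              // do not mutate the caller's array
    .sort((a, b) => b.height - a.height)  // newest first
    .slice(0, maxPoints)
    .map((b): Point => ({ slot: b.slot, id: b.hash }))
  result.push('origin')                   // last-resort fallback
  return result
}
```

The input could come from a query such as `SELECT hash, slot, height FROM blocks ORDER BY height DESC LIMIT 10`, which the `idx_blocks_height` index is suited to serve.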

Processing Confirmed Payments

Run a separate job to act on confirmed payments:

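async function processConfirmedPayments(db: Pool) {
  const client = await db.connect()

  try {
    // Row locks taken by FOR UPDATE last only until the transaction ends,
    // so the SELECT and the status updates must share one transaction.
    await client.query('BEGIN')

    // Get confirmed but not yet processed payments
    const result = await client.query(
      `SELECT * FROM incoming_payments
       WHERE status = 'confirmed'
       FOR UPDATE SKIP LOCKED`  // Parallel workers skip rows locked here
    )

    for (const payment of result.rows) {
      try {
        // Your business logic here (should be idempotent, since it may be retried)
        await handlePayment(payment)

        // Mark as processed
        await client.query(
          `UPDATE incoming_payments
           SET status = 'processed', processed_at = NOW()
           WHERE id = $1`,
          [payment.id]
        )

        // node-postgres returns BIGINT as a string, so convert before dividing
        console.log(`Processed: ${Number(payment.amount_lovelace) / 1_000_000} ADA for ${payment.reference_type}:${payment.reference_id}`)

      } catch (error) {
        console.error(`Failed to process payment ${payment.id}:`, error)
        // Leave as 'confirmed' for retry
      }
    }

    await client.query('COMMIT')
  } catch (error) {
    await client.query('ROLLBACK')
    throw error
  } finally {
    client.release()
  }
}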

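// Row shape read from incoming_payments. node-postgres returns BIGINT
// columns as strings by default, hence `amount_lovelace: string`.
interface IncomingPayment {
  id: string
  reference_id: string
  reference_type: string
  amount_lovelace: string
}

async function handlePayment(payment: IncomingPayment) {
  switch (payment.reference_type) {
    case 'order':
      await fulfillOrder(payment.reference_id, payment.amount_lovelace)
      break
    case 'user':
      await creditUserAccount(payment.reference_id, payment.amount_lovelace)
      break
    case 'invoice':
      await markInvoicePaid(payment.reference_id)
      break
    default:
      // Fail loudly so an unknown type is never silently marked as processed
      throw new Error(`Unknown reference_type: ${payment.reference_type}`)
  }
}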

Production Considerations

Production Checklist

  1. Health monitoring - Alert if sync falls behind tip by >100 blocks
  2. Reconnection logic - Auto-reconnect with exponential backoff
  3. Duplicate handling - The UNIQUE(tx_hash, output_index) constraint makes payment inserts idempotent
  4. Metrics - Track payments detected, confirmations, processing latency
  5. Backup verification - Periodically verify balances match chain state
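
For the first checklist item, sync lag can be computed from two heights: the chain tip and the last processed block. The sketch below assumes the tip height is available to the monitor (Ogmios chain sync responses are generally expected to carry a `tip` alongside each block, but verify this against the responses you actually receive). `syncLag` and `checkSyncHealth` are hypothetical names; the threshold of 100 blocks comes from the checklist.

```typescript
type SyncHealth = 'ok' | 'lagging'

// Number of blocks between the chain tip and the last block we processed.
function syncLag(tipHeight: number, lastProcessedHeight: number): number {
  return Math.max(0, tipHeight - lastProcessedHeight)
}

// Report 'lagging' when the monitor is more than `maxLag` blocks behind the tip.
function checkSyncHealth(
  tipHeight: number,
  lastProcessedHeight: number,
  maxLag = 100
): SyncHealth {
  return syncLag(tipHeight, lastProcessedHeight) > maxLag ? 'lagging' : 'ok'
}
```

`lastProcessedHeight` corresponds to `last_block_height` in `sync_state`, so a periodic job could read that row and raise an alert whenever the result is `'lagging'`.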

Testing Your Integration

Use the Preprod testnet for development:

# Use preprod endpoint
wss://api.nacho.builders/v1/preprod/ogmios

# Get testnet ADA from faucet
# https://docs.cardano.org/cardano-testnets/tools/faucet
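
To avoid editing `WS_URL` by hand when switching networks, the endpoint can be chosen from a setting. A minimal sketch: the two URLs are the ones used in this tutorial, while `Network`, `OGMIOS_URLS`, `ogmiosUrl`, and the `CARDANO_NETWORK` variable mentioned below are hypothetical names.

```typescript
type Network = 'mainnet' | 'preprod'

const OGMIOS_URLS: Record<Network, string> = {
  mainnet: 'wss://api.nacho.builders/v1/ogmios',
  preprod: 'wss://api.nacho.builders/v1/preprod/ogmios',
}

// Assumption: default to preprod so a missing or mistyped setting
// never points a development build at mainnet.
function ogmiosUrl(network: string | undefined): string {
  if (network === 'mainnet') return OGMIOS_URLS.mainnet
  return OGMIOS_URLS.preprod
}
```

With this in place, `const WS_URL = ogmiosUrl(process.env.CARDANO_NETWORK)` would select the endpoint at startup.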

Common Pitfalls

| Pitfall | Solution |
| --- | --- |
| Processing unconfirmed payments | Wait for 15+ confirmations before acting |
| Not handling rollbacks | Always implement rollback logic |
| Double-processing on restart | Use UNIQUE constraints and check before insert |
| Blocking on slow processing | Process blocks first, handle business logic async |
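
The confirmation rule from the first pitfall is simple arithmetic on block heights, the same comparison the SQL in Step 3 performs. A small sketch with hypothetical helper names (`confirmations`, `isConfirmed`); the threshold of 15 is the value used throughout this tutorial.

```typescript
const REQUIRED_CONFIRMATIONS = 15

// Blocks built on top of the block containing the payment.
// A payment in the current tip block has 0 confirmations.
function confirmations(tipHeight: number, paymentHeight: number): number {
  return Math.max(0, tipHeight - paymentHeight)
}

// True once the payment is buried under at least `required` blocks.
function isConfirmed(
  tipHeight: number,
  paymentHeight: number,
  required = REQUIRED_CONFIRMATIONS
): boolean {
  return confirmations(tipHeight, paymentHeight) >= required
}
```

For example, a payment in block 100 counts as confirmed once block 115 has been processed.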

Next Steps

Now that you can monitor incoming payments, learn to send payments:

Next: Sending Payments
