Payment Monitoring
Detect and process incoming ADA payments in real-time using chain synchronization
In this tutorial, you'll build a production-grade payment monitoring system that detects incoming ADA in real-time and handles blockchain rollbacks safely.
What You'll Build
A payment monitor that:
- Connects to the blockchain via WebSocket
- Processes new blocks as they arrive
- Detects payments to your addresses
- Tracks confirmations
- Handles rollbacks without double-crediting
Use Cases
E-Commerce
Customer pays → Detect payment → Fulfill order
SaaS Billing
User tops up → Detect payment → Credit account
Gaming
Player deposits → Detect payment → Add in-game currency
Architecture
[Figure: Payment Monitor Architecture]
The payment monitor sits between the blockchain and your application, translating raw blockchain data into actionable events.
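One way to picture this translation is as a pure function from a chain sync result to application-level events. The sketch below is illustrative only: the `Simple*` shapes, `MonitorEvent`, and `toEvents` are hypothetical names that do not appear elsewhere in this tutorial, and real chain sync messages carry more fields than shown here.

```typescript
// Hypothetical, simplified shapes (assumption): real messages carry more fields.
interface SimpleOutput { address: string; lovelace: number }
interface SimpleTx { id: string; outputs: SimpleOutput[] }
interface SimpleBlock { id: string; height: number; transactions: SimpleTx[] }

type ChainResult =
  | { direction: 'forward'; block: SimpleBlock }
  | { direction: 'backward'; point: { id: string; slot: number } }

// Events the application cares about (hypothetical names).
type MonitorEvent =
  | {
      kind: 'payment_detected'
      txHash: string
      outputIndex: number
      address: string
      lovelace: number
      blockHeight: number
    }
  | { kind: 'chain_rolled_back'; toBlockHash: string }

// Translate one chain sync result into zero or more application events.
function toEvents(result: ChainResult, watched: Set<string>): MonitorEvent[] {
  if (result.direction === 'backward') {
    return [{ kind: 'chain_rolled_back', toBlockHash: result.point.id }]
  }
  const block = result.block
  const events: MonitorEvent[] = []
  for (const tx of block.transactions) {
    tx.outputs.forEach((output, outputIndex) => {
      // Only outputs sent to a watched address become payment events.
      if (watched.has(output.address)) {
        events.push({
          kind: 'payment_detected',
          txHash: tx.id,
          outputIndex,
          address: output.address,
          lovelace: output.lovelace,
          blockHeight: block.height,
        })
      }
    })
  }
  return events
}
```

Keeping this step free of I/O makes it easy to unit test; the steps below perform the same translation but write directly to the database.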
Step 1: Database Schema
First, set up your tracking tables:
-- Track sync progress (where we are in the chain)
CREATE TABLE sync_state (
id INTEGER PRIMARY KEY DEFAULT 1,
last_block_hash TEXT NOT NULL,
last_block_height INTEGER NOT NULL,
last_slot INTEGER NOT NULL,
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- Track blocks for rollback detection
CREATE TABLE blocks (
hash TEXT PRIMARY KEY,
height INTEGER NOT NULL,
slot INTEGER NOT NULL,
previous_hash TEXT,
processed_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_blocks_height ON blocks(height DESC);
-- Addresses to monitor for incoming payments
CREATE TABLE payment_addresses (
address TEXT PRIMARY KEY,
reference_id TEXT NOT NULL, -- Your order ID, user ID, etc.
reference_type TEXT NOT NULL, -- 'order', 'user', 'invoice', etc.
created_at TIMESTAMPTZ DEFAULT NOW()
);
CREATE INDEX idx_payment_addresses_ref ON payment_addresses(reference_type, reference_id);
-- Detected incoming payments
CREATE TABLE incoming_payments (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tx_hash TEXT NOT NULL,
output_index INTEGER NOT NULL,
address TEXT NOT NULL,
reference_id TEXT NOT NULL,
reference_type TEXT NOT NULL,
amount_lovelace BIGINT NOT NULL,
block_hash TEXT NOT NULL, -- No foreign key: rolled-back blocks are deleted while their payment rows are kept
block_height INTEGER NOT NULL,
status TEXT DEFAULT 'pending',
-- Status: pending → confirmed → processed | rolled_back
created_at TIMESTAMPTZ DEFAULT NOW(),
confirmed_at TIMESTAMPTZ,
processed_at TIMESTAMPTZ,
UNIQUE(tx_hash, output_index)
);
CREATE INDEX idx_payments_status ON incoming_payments(status);
CREATE INDEX idx_payments_reference ON incoming_payments(reference_type, reference_id);
Why This Schema?
| Table | Purpose |
|---|---|
| sync_state | Resume from correct position after restart |
| blocks | Detect which payments to roll back |
| payment_addresses | Map blockchain addresses to your business objects |
| incoming_payments | Track payment lifecycle with explicit states |
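The explicit states in `incoming_payments` form a small state machine. As a minimal sketch, the allowed transitions can be written down and checked in application code. `PaymentStatus`, `ALLOWED_TRANSITIONS`, `canTransition`, and `transition` are hypothetical names, and the `rolled_back` to `pending` edge is an assumption: it models a transaction that is included again in a new block after a rollback.

```typescript
type PaymentStatus = 'pending' | 'confirmed' | 'processed' | 'rolled_back'

// Allowed status changes, following the schema comment:
//   pending -> confirmed -> processed | rolled_back
const ALLOWED_TRANSITIONS: Record<PaymentStatus, PaymentStatus[]> = {
  pending: ['confirmed', 'rolled_back'],
  confirmed: ['processed', 'rolled_back'],
  // Terminal: a rollback of an already processed payment needs manual review.
  processed: [],
  // Assumption: a transaction re-included after a rollback is tracked again.
  rolled_back: ['pending'],
}

function canTransition(from: PaymentStatus, to: PaymentStatus): boolean {
  return ALLOWED_TRANSITIONS[from].indexOf(to) !== -1
}

// Returns the new status, or throws if the change is not allowed.
function transition(from: PaymentStatus, to: PaymentStatus): PaymentStatus {
  if (!canTransition(from, to)) {
    throw new Error(`Illegal payment status change: ${from} -> ${to}`)
  }
  return to
}
```

A guard like this is optional, since the SQL statements in the later steps already filter on `status`, but it documents the intended lifecycle in one place.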
Step 2: Chain Sync Connection
Connect to the Ogmios chain sync protocol:
import WebSocket from 'ws'
const API_KEY = process.env.NACHO_API_KEY!
const WS_URL = 'wss://api.nacho.builders/v1/ogmios'
interface ChainPoint {
slot: number
id: string // block hash
}
interface BlockResult {
direction: 'forward' | 'backward'
block?: Block
point?: ChainPoint
}
class ChainSyncClient {
private ws: WebSocket | null = null
private messageHandler: ((result: BlockResult) => Promise<void>) | null = null
async connect(): Promise<void> {
return new Promise((resolve, reject) => {
this.ws = new WebSocket(`${WS_URL}?apikey=${API_KEY}`)
this.ws.on('open', () => {
console.log('Connected to chain sync')
resolve()
})
this.ws.on('message', async (data) => {
const msg = JSON.parse(data.toString())
if (msg.result && this.messageHandler) {
await this.messageHandler(msg.result)
}
})
this.ws.on('error', reject)
this.ws.on('close', () => {
console.log('Chain sync disconnected')
// Implement reconnection logic here
})
})
}
async findIntersection(points: (ChainPoint | 'origin')[]): Promise<ChainPoint> {
return new Promise((resolve) => {
const handler = this.messageHandler
this.messageHandler = async (result: any) => {
if (result.intersection) {
this.messageHandler = handler
resolve(result.intersection)
}
}
this.ws?.send(JSON.stringify({
jsonrpc: '2.0',
method: 'findIntersection',
params: { points },
id: 'find-intersection'
}))
})
}
requestNext(): void {
this.ws?.send(JSON.stringify({
jsonrpc: '2.0',
method: 'nextBlock',
id: 'next'
}))
}
onBlock(handler: (result: BlockResult) => Promise<void>): void {
this.messageHandler = handler
}
close(): void {
this.ws?.close()
}
}
Step 3: Block Processor
Process blocks and detect payments to your addresses:
import { Pool } from 'pg'
interface Block {
id: string // block hash
height: number
slot: number
previousBlock: string
transactions: Transaction[]
}
interface Transaction {
id: string // tx hash
outputs: Output[]
}
interface Output {
address: string
value: { ada: { lovelace: number } }
}
class BlockProcessor {
constructor(private db: Pool) {}
async processBlock(block: Block): Promise<void> {
const client = await this.db.connect()
try {
await client.query('BEGIN')
// 1. Store the block (for rollback tracking)
await client.query(
`INSERT INTO blocks (hash, height, slot, previous_hash)
VALUES ($1, $2, $3, $4)
ON CONFLICT (hash) DO NOTHING`,
[block.id, block.height, block.slot, block.previousBlock]
)
// 2. Process each transaction for payments
for (const tx of block.transactions || []) {
await this.processTransaction(client, tx, block)
}
// 3. Update sync state
await client.query(
`INSERT INTO sync_state (id, last_block_hash, last_block_height, last_slot)
VALUES (1, $1, $2, $3)
ON CONFLICT (id) DO UPDATE SET
last_block_hash = $1,
last_block_height = $2,
last_slot = $3,
updated_at = NOW()`,
[block.id, block.height, block.slot]
)
// 4. Update confirmations for pending payments
await client.query(
`UPDATE incoming_payments
SET status = CASE
WHEN $1 - block_height >= 15 THEN 'confirmed'
ELSE status
END,
confirmed_at = CASE
WHEN $1 - block_height >= 15 AND confirmed_at IS NULL THEN NOW()
ELSE confirmed_at
END
WHERE status = 'pending'`,
[block.height]
)
await client.query('COMMIT')
console.log(`Block ${block.height}: ${block.transactions?.length || 0} txs`)
} catch (error) {
await client.query('ROLLBACK')
throw error
} finally {
client.release()
}
}
private async processTransaction(
client: any,
tx: Transaction,
block: Block
): Promise<void> {
for (let i = 0; i < tx.outputs.length; i++) {
const output = tx.outputs[i]
// Check if output is to one of our payment addresses
const result = await client.query(
`SELECT reference_id, reference_type
FROM payment_addresses WHERE address = $1`,
[output.address]
)
if (result.rows.length > 0) {
const { reference_id, reference_type } = result.rows[0]
const amount = output.value.ada.lovelace
// Record the payment
await client.query(
`INSERT INTO incoming_payments
(tx_hash, output_index, address, reference_id, reference_type,
amount_lovelace, block_hash, block_height)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
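-- If this output was rolled back earlier and its transaction is now
-- included in a new block, track it again instead of ignoring it
ON CONFLICT (tx_hash, output_index) DO UPDATE SET
block_hash = EXCLUDED.block_hash,
block_height = EXCLUDED.block_height,
status = 'pending',
confirmed_at = NULL
WHERE incoming_payments.status = 'rolled_back'`,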
[tx.id, i, output.address, reference_id, reference_type,
amount, block.id, block.height]
)
console.log(`Payment detected: ${amount / 1_000_000} ADA for ${reference_type}:${reference_id}`)
}
}
}
}
Step 4: Rollback Handler
Handle chain rollbacks safely:
class RollbackHandler {
constructor(private db: Pool) {}
async handleRollback(rollbackPoint: ChainPoint): Promise<void> {
const client = await this.db.connect()
try {
await client.query('BEGIN')
// 1. Find all blocks being rolled back
const result = await client.query(
`SELECT hash, height FROM blocks
WHERE height > (SELECT height FROM blocks WHERE hash = $1)
ORDER BY height DESC`,
[rollbackPoint.id]
)
const rolledBackBlocks = result.rows
if (rolledBackBlocks.length === 0) {
await client.query('COMMIT')
return
}
console.log(`Rolling back ${rolledBackBlocks.length} blocks`)
// 2. Handle payments from rolled-back blocks
for (const block of rolledBackBlocks) {
// Mark payments as rolled back (unless already processed)
await client.query(
`UPDATE incoming_payments
SET status = 'rolled_back'
WHERE block_hash = $1 AND status IN ('pending', 'confirmed')`,
[block.hash]
)
// Check for payments that were already processed (ALERT!)
const processed = await client.query(
`SELECT * FROM incoming_payments
WHERE block_hash = $1 AND status = 'processed'`,
[block.hash]
)
if (processed.rows.length > 0) {
console.error('CRITICAL: Processed payments rolled back!', processed.rows)
// Send alert to operations team immediately
// These require manual review and possible reversal
}
}
// 3. Delete rolled-back blocks
await client.query(
`DELETE FROM blocks
WHERE height > (SELECT height FROM blocks WHERE hash = $1)`,
[rollbackPoint.id]
)
// 4. Update sync state
const rollbackBlock = await client.query(
'SELECT height FROM blocks WHERE hash = $1',
[rollbackPoint.id]
)
await client.query(
`UPDATE sync_state SET
last_block_hash = $1,
last_block_height = $2,
last_slot = $3,
updated_at = NOW()`,
[rollbackPoint.id, rollbackBlock.rows[0].height, rollbackPoint.slot]
)
await client.query('COMMIT')
} catch (error) {
await client.query('ROLLBACK')
throw error
} finally {
client.release()
}
}
}
Step 5: Main Monitor Loop
Tie it all together:
async function startPaymentMonitor() {
const db = new Pool({ connectionString: process.env.DATABASE_URL })
const chainSync = new ChainSyncClient()
const blockProcessor = new BlockProcessor(db)
const rollbackHandler = new RollbackHandler(db)
await chainSync.connect()
// Get last synced point from database
const syncState = await db.query('SELECT * FROM sync_state WHERE id = 1')
let startPoints: (ChainPoint | 'origin')[]
if (syncState.rows.length > 0) {
startPoints = [{
id: syncState.rows[0].last_block_hash,
slot: syncState.rows[0].last_slot
}, 'origin'] // Fallback to origin if our point is gone
} else {
startPoints = ['origin']
}
// Find intersection with chain
const intersection = await chainSync.findIntersection(startPoints)
console.log(`Starting sync from block ${intersection.id.slice(0, 8)}...`)
// Set up block handler
chainSync.onBlock(async (result) => {
if (result.direction === 'forward' && result.block) {
await blockProcessor.processBlock(result.block)
} else if (result.direction === 'backward' && result.point) {
await rollbackHandler.handleRollback(result.point)
}
chainSync.requestNext()
})
// Start the sync loop
chainSync.requestNext()
}
startPaymentMonitor().catch(console.error)
Processing Confirmed Payments
Run a separate job to act on confirmed payments:
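async function processConfirmedPayments(db: Pool) {
// Row locks only last until the end of a transaction, so claim and
// process the batch on one client inside BEGIN ... COMMIT
const client = await db.connect()
try {
await client.query('BEGIN')
// Get confirmed but not yet processed payments
const result = await client.query(
`SELECT * FROM incoming_payments
WHERE status = 'confirmed'
FOR UPDATE SKIP LOCKED` // Parallel workers skip rows locked by another worker
)
for (const payment of result.rows) {
try {
// Your business logic here (keep it idempotent, since it may be retried)
await handlePayment(payment)
// Mark as processed
await client.query(
`UPDATE incoming_payments
SET status = 'processed', processed_at = NOW()
WHERE id = $1`,
[payment.id]
)
// pg returns BIGINT columns as strings, so convert before dividing
console.log(`Processed: ${Number(payment.amount_lovelace) / 1_000_000} ADA for ${payment.reference_type}:${payment.reference_id}`)
} catch (error) {
console.error(`Failed to process payment ${payment.id}:`, error)
// Leave as 'confirmed' for retry
}
}
await client.query('COMMIT')
} catch (error) {
await client.query('ROLLBACK')
throw error
} finally {
client.release()
}
}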
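// Row shape for incoming_payments as returned by pg (BIGINT columns arrive as strings)
interface IncomingPayment {
id: string
reference_id: string
reference_type: string
amount_lovelace: string
}
async function handlePayment(payment: IncomingPayment) {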
switch (payment.reference_type) {
case 'order':
await fulfillOrder(payment.reference_id, payment.amount_lovelace)
break
case 'user':
await creditUserAccount(payment.reference_id, payment.amount_lovelace)
break
case 'invoice':
await markInvoicePaid(payment.reference_id)
break
}
}
Production Considerations
Production Checklist
- Health monitoring - Alert if sync falls behind tip by >100 blocks
- Reconnection logic - Auto-reconnect with exponential backoff
- Duplicate handling - The UNIQUE(tx_hash, output_index) constraint combined with ON CONFLICT makes payment inserts idempotent
- Metrics - Track payments detected, confirmations, processing latency
- Backup verification - Periodically verify balances match chain state
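For the reconnection item, the delay between attempts can be computed with capped exponential backoff plus jitter. This is a minimal sketch, not part of the tutorial's client: `backoffDelayMs` and its default values (1 second base, 60 second cap) are assumptions chosen for illustration, and the "full jitter" variant used here is one common choice among several.

```typescript
// Capped exponential backoff with full jitter (hypothetical helper).
// attempt: 0 for the first retry, 1 for the second, and so on.
// random is injectable so the function can be tested deterministically.
function backoffDelayMs(
  attempt: number,
  baseMs = 1_000,
  maxMs = 60_000,
  random: () => number = Math.random,
): number {
  // Double the ceiling on every attempt, but never exceed maxMs.
  const cap = Math.min(maxMs, baseMs * 2 ** attempt)
  // Pick a random delay in [0, cap) so many clients do not reconnect in lockstep.
  return Math.floor(random() * cap)
}
```

In the `close` handler of `ChainSyncClient`, the returned value could be passed to `setTimeout` before calling `connect()` again, with the attempt counter reset to 0 after a successful `open`. After reconnecting, call `findIntersection` with the point stored in `sync_state` so that syncing resumes where it stopped.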
Testing Your Integration
Use the Preprod testnet for development:
# Use preprod endpoint
wss://api.nacho.builders/v1/preprod/ogmios
# Get testnet ADA from faucet
# https://docs.cardano.org/cardano-testnets/tools/faucet
Common Pitfalls
| Pitfall | Solution |
|---|---|
| Processing unconfirmed payments | Wait for 15+ confirmations before acting |
| Not handling rollbacks | Always implement rollback logic |
| Double-processing on restart | Resume from the point saved in sync_state and rely on the UNIQUE constraint with ON CONFLICT when inserting |
| Blocking on slow processing | Process blocks first, handle business logic async |
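The first three pitfalls can be illustrated with a small in-memory model, which is handy for unit tests that should not touch a database. This is a sketch under stated assumptions: `PaymentLedger` and its methods are hypothetical names, and the class only mirrors the behavior of the SQL used in this tutorial (unique key on transaction hash and output index, a 15-block confirmation depth, rollback by block height).

```typescript
type LedgerStatus = 'pending' | 'confirmed' | 'rolled_back'

interface TrackedPayment {
  blockHeight: number
  status: LedgerStatus
}

// Confirmation depth used throughout this tutorial.
const REQUIRED_CONFIRMATIONS = 15

// Hypothetical in-memory stand-in for the incoming_payments table.
class PaymentLedger {
  private payments = new Map<string, TrackedPayment>()

  // Idempotent insert: mirrors UNIQUE(tx_hash, output_index).
  // Returns true only the first time a given output is recorded.
  record(txHash: string, outputIndex: number, blockHeight: number): boolean {
    const key = `${txHash}#${outputIndex}`
    if (this.payments.has(key)) return false
    this.payments.set(key, { blockHeight, status: 'pending' })
    return true
  }

  // Mirrors the confirmation UPDATE: tip height minus block height >= 15.
  onNewTip(tipHeight: number): void {
    this.payments.forEach((p) => {
      if (p.status === 'pending' && tipHeight - p.blockHeight >= REQUIRED_CONFIRMATIONS) {
        p.status = 'confirmed'
      }
    })
  }

  // Mirrors the rollback handler: everything above the rollback height is undone.
  rollbackTo(height: number): void {
    this.payments.forEach((p) => {
      if (p.blockHeight > height) p.status = 'rolled_back'
    })
  }

  statusOf(txHash: string, outputIndex: number): LedgerStatus | undefined {
    const p = this.payments.get(`${txHash}#${outputIndex}`)
    return p ? p.status : undefined
  }
}
```

For example, a payment recorded at height 100 stays `pending` when the tip is at 114 and becomes `confirmed` at 115; replaying the same block does not create a second entry.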
Next Steps
Now that you can monitor incoming payments, learn to send payments:
Next: Sending Payments