# Trading Bot Patterns

Build real-time trading bots with efficient pool monitoring using asset filters and chain synchronization.

This guide covers the optimal pattern for monitoring DEX pools in real time: combine asset-filtered UTxO queries with chain synchronization for true real-time state tracking.
## The Pattern: Initial Query + State Derivation
Trading bots need two things:
- Current pool state - What are the reserves right now?
- Real-time updates - When does the pool change?
The most efficient pattern:

1. Connect chainsync and get the current tip slot
2. Query initial state with an asset filter (instant from cache)
3. Derive all future state from chainsync block data (no re-queries!)
After initialization, state is derived purely from chainsync data. This gives you zero-latency updates with minimal API load - just one initial query plus the chainsync stream.
## Why State Derivation?
| Approach | Latency | API Load | Complexity |
|---|---|---|---|
| Polling every N seconds | N seconds | High | Low |
| Chainsync + re-query | ~20ms per change | Medium | Medium |
| Chainsync + derive state | ~0ms | Minimal | Higher |
Chainsync gives you complete transaction data:

- `inputs` - UTxOs being consumed (remove from state)
- `outputs` - New UTxOs created (add matching ones to state)
## Initialization Sequence
The prewarmed cache includes metadata about exactly which slot the data is from:
```json
{
  "result": [...utxos...],
  "_cache": {
    "slot": 142857000,
    "height": 11234567,
    "hash": "abc123...",
    "age": 45
  }
}
```

Use this metadata for precise chainsync alignment.
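As a sketch of that alignment (assuming the `_cache` response shape shown above; the helper name is hypothetical), build an Ogmios point from the cached `slot` and block `hash` and hand it to `findIntersection`:

```python
import json

# Hypothetical helper: build a findIntersection request from the `_cache`
# metadata, so chainsync starts exactly at the slot the cached UTxO snapshot
# reflects. The surrounding WebSocket plumbing is assumed.
def intersection_request(cache: dict) -> str:
    # An Ogmios "point" is {"slot": <absolute slot>, "id": <block header hash>}
    point = {"slot": cache["slot"], "id": cache["hash"]}
    return json.dumps({
        "jsonrpc": "2.0",
        "method": "findIntersection",
        "params": {"points": [point]},
        "id": "align",
    })

req = intersection_request({"slot": 142857000, "height": 11234567,
                            "hash": "abc123...", "age": 45})
```

Sending this before the first `nextBlock` pins the stream to the snapshot's slot, so no block between the query and the stream start can be missed or double-counted.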
## State Derivation Pattern

Deriving state from block data - rather than re-querying on every change - guarantees zero gaps and zero redundant processing.
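In outline, the derivation step looks like this (a simplified sketch; `matches` stands in for whatever filter predicate your bot uses, and the complete example below fills in the details):

```python
# Simplified sketch of state derivation. `utxos` maps "txid#index" -> UTxO data.
def derive(utxos: dict, block: dict, matches) -> dict:
    for tx in block.get("transactions", []):
        # Inputs consume UTxOs: drop them from state
        for inp in tx.get("inputs", []):
            utxos.pop(f"{inp['transaction']['id']}#{inp['index']}", None)
        # Outputs create UTxOs: keep only the ones matching our filter
        for idx, out in enumerate(tx.get("outputs", [])):
            if matches(out):
                utxos[f"{tx['id']}#{idx}"] = out
    return utxos

# Example: one transaction spends an old pool UTxO and creates a new one
state = {"aa#0": {"address": "pool"}}
block = {"transactions": [{
    "id": "bb",
    "inputs": [{"transaction": {"id": "aa"}, "index": 0}],
    "outputs": [{"address": "pool"}],
}]}
state = derive(state, block, lambda o: o.get("address") == "pool")
# state is now {"bb#0": {"address": "pool"}}
```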
## Complete Example: MIN/ADA Pool Monitor

```python
#!/usr/bin/env python3
"""
Real-time MIN/ADA pool monitor with state derivation from chainsync.
After initialization, state is derived purely from block data - no re-queries!

Requirements: pip install websockets requests certifi
Usage: export NACHO_API_KEY="napi_..." && python pool_monitor.py
"""
import asyncio
import json
import os
import ssl
from datetime import datetime
from typing import Dict, Optional

import certifi
import requests
import websockets

MINSWAP_ADDRESS = "addr1z84q0denmyep98ph3tmzwsmw0j7zau9ljmsqx6a4rvaau66j2c79gy9l76sdg0xwhd7r0c0kna0tycz4y5s6mlenh8pq777e2a"
MIN_POLICY = "29d222ce763455e3d7a09a665ce554f00ac89d2e99a1a83d267170c6"


class PoolState:
    """Manages pool UTxO state with derivation from chainsync."""

    def __init__(self):
        # Store UTxOs indexed by "txhash#outputindex" for O(1) lookups
        self.utxos: Dict[str, dict] = {}
        # Track which slot we initialized from to skip duplicate blocks
        self.initialized_at_slot: Optional[int] = None

    def load_initial(self, utxos: list, tip_slot: int):
        """Load initial state from a UTxO query."""
        # Build lookup table: "txhash#index" -> full UTxO data
        self.utxos = {
            f"{u['transaction']['id']}#{u['index']}": u
            for u in utxos
        }
        self.initialized_at_slot = tip_slot

    def apply_block(self, block: dict) -> bool:
        """
        Derive new state from block data. Returns True if any pool UTxO changed.

        Key insight: chainsync gives us complete transaction data, so we can
        maintain accurate state by processing inputs (removals) and outputs
        (additions).
        """
        changed = False
        for tx in block.get("transactions", []):
            # STEP 1: Remove consumed UTxOs (inputs being spent).
            # When a UTxO is used as an input, it's no longer unspent.
            for inp in tx.get("inputs", []):
                key = f"{inp['transaction']['id']}#{inp['index']}"
                if key in self.utxos:
                    del self.utxos[key]
                    changed = True

            # STEP 2: Add new UTxOs that match our filter criteria.
            # Only track outputs sent to the Minswap address with MIN tokens.
            tx_id = tx.get("id")
            for idx, out in enumerate(tx.get("outputs", [])):
                # Skip outputs not at our target DEX address
                if out.get("address") != MINSWAP_ADDRESS:
                    continue
                # Skip outputs that don't contain MIN tokens
                if MIN_POLICY not in out.get("value", {}):
                    continue
                # This is a new pool UTxO - add it to our state
                key = f"{tx_id}#{idx}"
                self.utxos[key] = {
                    "transaction": {"id": tx_id},
                    "index": idx,
                    "address": out["address"],
                    "value": out["value"],
                }
                changed = True
        return changed

    def calculate_price(self) -> float:
        """Calculate the MIN/ADA price from the largest pool by ADA reserves."""
        if not self.utxos:
            return 0.0
        # Find the pool with the most ADA (main liquidity pool, not fragment UTxOs)
        pool = max(self.utxos.values(), key=lambda u: u["value"]["ada"]["lovelace"])
        # Extract reserve amounts
        ada = pool["value"]["ada"]["lovelace"]
        min_qty = next(iter(pool["value"].get(MIN_POLICY, {}).values()), 0)
        # Price = ADA per MIN token (both assets use 6 decimal places)
        return (ada / 1e6) / (min_qty / 1e6) if min_qty else 0.0


class PoolMonitor:
    def __init__(self, api_key: str):
        self.api_key = api_key
        self.state = PoolState()
        self.last_price = 0.0  # Track previous price to detect changes

    def query_initial_utxos(self) -> list:
        """
        Query UTxOs with a server-side asset filter.

        This is much faster than fetching all UTxOs and filtering client-side.
        """
        resp = requests.post(
            "https://api.nacho.builders/v1/ogmios",
            json={
                "jsonrpc": "2.0",
                "method": "queryLedgerState/utxo",
                "params": {
                    "addresses": [MINSWAP_ADDRESS],       # Filter by DEX contract address
                    "assets": [{"policyId": MIN_POLICY}]  # Only UTxOs with MIN tokens
                },
                "id": 1
            },
            headers={"Content-Type": "application/json", "apikey": self.api_key},
            timeout=30
        )
        return resp.json()["result"]

    async def on_price_change(self, old: float, new: float, height: int):
        """Called whenever pool reserves change. Add your trading logic here."""
        change = ((new - old) / old) * 100 if old else 0.0
        ts = datetime.now().strftime("%H:%M:%S.%f")[:-3]
        print(f"[{ts}] Block {height}: {old:.8f} -> {new:.8f} ({change:+.4f}%)")
        # === YOUR TRADING LOGIC HERE ===
        # Example: if change < -5.0: execute_buy_order()

    async def monitor(self):
        """
        Main monitoring loop using the state derivation pattern:
        1. Connect to chainsync and intersect at the current tip
        2. Query initial pool state (one-time)
        3. Derive all future state changes from block data (no re-queries!)
        """
        uri = f"wss://api.nacho.builders/v1/ogmios?apikey={self.api_key}"
        # Create an SSL context with a proper certificate bundle
        ssl_ctx = ssl.create_default_context(cafile=certifi.where())
        async with websockets.connect(uri, ping_interval=30, ssl=ssl_ctx) as ws:
            # STEP 1: Find the current chain tip. Intersecting at "origin" always
            # succeeds and reports the tip; a second findIntersection then pins
            # the cursor to that tip so we don't replay the chain from genesis.
            await ws.send(json.dumps({
                "jsonrpc": "2.0", "method": "findIntersection",
                "params": {"points": ["origin"]}, "id": "find"
            }))
            resp = json.loads(await ws.recv())
            tip = resp.get("result", {}).get("tip", {})
            tip_slot = tip.get("slot", 0)
            await ws.send(json.dumps({
                "jsonrpc": "2.0", "method": "findIntersection",
                "params": {"points": [{"slot": tip_slot, "id": tip.get("id")}]},
                "id": "pin"
            }))
            await ws.recv()

            # STEP 2: Query initial state at this slot (the ONLY query we need!)
            self.state.load_initial(self.query_initial_utxos(), tip_slot)
            self.last_price = self.state.calculate_price()
            print(f"Initialized: {len(self.state.utxos)} UTxOs, price: {self.last_price:.8f}")

            # STEP 3: Start the block stream - all future state is derived, not queried
            await ws.send(json.dumps({
                "jsonrpc": "2.0", "method": "nextBlock", "id": "next"
            }))

            # Main event loop: process blocks as they arrive
            while True:
                data = json.loads(await ws.recv())
                result = data.get("result", {})
                if result.get("direction") == "forward":
                    # New block received - derive state changes from transactions
                    block = result.get("block", {})
                    # Skip blocks at or before our initial query (already in state)
                    if block.get("slot", 0) <= self.state.initialized_at_slot:
                        pass
                    elif self.state.apply_block(block):
                        # State changed - check whether the price moved
                        new_price = self.state.calculate_price()
                        if abs(self.last_price - new_price) > 1e-9:
                            await self.on_price_change(
                                self.last_price, new_price, block.get("height", 0)
                            )
                            self.last_price = new_price
                elif result.get("direction") == "backward":
                    # Chain rollback detected (rare, ~1-5 per day).
                    # Re-query state since we can't easily undo derived state.
                    slot = result.get("point", {}).get("slot", 0)
                    self.state.load_initial(self.query_initial_utxos(), slot)
                    self.last_price = self.state.calculate_price()
                # Request the next block (continues the stream)
                await ws.send(json.dumps({
                    "jsonrpc": "2.0", "method": "nextBlock", "id": "next"
                }))


if __name__ == "__main__":
    asyncio.run(PoolMonitor(os.environ["NACHO_API_KEY"]).monitor())
```

## How State Derivation Works
The key insight: chainsync provides complete transaction data, so you can maintain accurate state without any additional queries.
## Handling Rollbacks

Chainsync can report rollbacks (chain reorganizations). When a message arrives with `"direction": "backward"`:
```python
elif result.get("direction") == "backward":
    # Rollback detected - re-initialize from current state
    slot = result.get("point", {}).get("slot", 0)
    self.state.load_initial(self.query_initial_utxos(), slot)
```

Rollbacks are rare (a few per day at most), so the occasional re-query is acceptable.
For production systems, you could maintain historical state snapshots to handle rollbacks without re-querying. This adds complexity but eliminates all re-queries.
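One possible shape for such snapshots (a hypothetical sketch, not part of the monitor above): keep a bounded history of recent states and restore the newest one at or before the rollback point:

```python
from collections import deque
from typing import Optional

# Hypothetical snapshot store for rollback handling: keep recent copies of the
# UTxO map keyed by slot, and restore one instead of re-querying on a rollback.
class SnapshotStore:
    def __init__(self, depth: int = 2160):   # bounded history; 2160 is arbitrary
        self.snaps = deque(maxlen=depth)     # (slot, utxos) pairs, oldest first

    def save(self, slot: int, utxos: dict):
        self.snaps.append((slot, dict(utxos)))  # copy so later mutation is safe

    def restore(self, rollback_slot: int) -> Optional[dict]:
        # Newest snapshot at or before the rollback point, if any
        for slot, utxos in reversed(self.snaps):
            if slot <= rollback_slot:
                return dict(utxos)
        return None  # rollback deeper than our history: fall back to a re-query

store = SnapshotStore()
store.save(100, {"aa#0": {}})
store.save(110, {"bb#0": {}})
restored = store.restore(105)  # finds the slot-100 snapshot
```

In the monitor, you would call `save()` after each applied block and try `restore()` in the backward branch, re-querying only when it returns `None`.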
## Monitoring Multiple Pools
Track multiple tokens by expanding the filter:
```python
WATCHED_TOKENS = {
    "29d222ce763455e3d7a09a665ce554f00ac89d2e99a1a83d267170c6": "MIN",
    "279c909f348e533da5808898f87f9a14bb2c3dfbbacccd631d927a3f": "SNEK",
}

# Query with multiple asset filters
params = {
    "addresses": [MINSWAP_ADDRESS],
    "assets": [{"policyId": p} for p in WATCHED_TOKENS.keys()]
}
```

## Cost Analysis
| Operation | Frequency | Cost |
|---|---|---|
| Initial UTxO query | Once at startup | 1 request |
| Chainsync messages | ~2 per block (~20 sec) | Per-message billing |
| Rollback re-query | ~1-5 per day | 1 request each |
24-hour estimate:

- ~4,320 blocks/day (one block every ~20 sec) × 2 messages = ~8,640 chainsync messages
- ~3 rollback re-queries
- Total: ~8,643 billable operations
Compare to polling every 10 seconds: 8,640 queries/day with 3MB each = 25GB bandwidth.
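The arithmetic behind these estimates, assuming a ~20-second average block time and ~3 MB per polled response:

```python
# Back-of-envelope cost comparison for the two approaches.
SECONDS_PER_DAY = 86_400

# State derivation: chainsync messages plus rare rollback re-queries
blocks_per_day = SECONDS_PER_DAY // 20        # ~20 s average block time
chainsync_msgs = blocks_per_day * 2           # request + response per block
rollback_requeries = 3                        # generous daily estimate
derive_total = chainsync_msgs + rollback_requeries  # plus one initial query

# Polling: a full UTxO query every 10 seconds
polling_queries = SECONDS_PER_DAY // 10
polling_bandwidth_gb = polling_queries * 3 / 1024   # ~3 MB per response
```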
## Next Steps
- Querying DEX Pool Reserves: Get pool reserve data
- Chain Synchronization: Full chainsync documentation
- Submitting Transactions: Execute trades