
Trading Bot Patterns

Build real-time trading bots with efficient pool monitoring using asset filters and chain synchronization

This guide covers a pattern for monitoring DEX pools in real time: combine a single asset-filtered UTxO query with chain synchronization, then track pool state from the block stream.

The Pattern: Initial Query + State Derivation

Trading bots need two things:

  1. Current pool state - What are the reserves right now?
  2. Real-time updates - When does the pool change?

The most efficient pattern:

  1. Connect chainsync and get the current tip slot
  2. Query initial state with asset filter (instant from cache)
  3. Derive all future state from chainsync block data (no re-queries!)

After initialization, state is derived purely from chainsync data. Updates are applied as soon as a block arrives, with no extra query round trip, and API load stays minimal: one initial query plus the chainsync stream.

Why State Derivation?

| Approach | Latency | API Load | Complexity |
|---|---|---|---|
| Polling every N seconds | N seconds | High | Low |
| Chainsync + re-query | ~20ms per change | Medium | Medium |
| Chainsync + derive state | ~0ms | Minimal | Higher |

Chainsync gives you complete transaction data:

  • inputs - UTxOs being consumed (remove from state)
  • outputs - New UTxOs created (add matching ones to state)
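
The two rules above can be sketched as a small standalone function. This is a minimal illustration, not the full monitor: the helper name `apply_transaction`, the `is_tracked` predicate, and the toy transaction ids and addresses are all hypothetical.

```python
from typing import Callable, Dict


def apply_transaction(state: Dict[str, dict], tx: dict,
                      is_tracked: Callable[[dict], bool]) -> bool:
    """Apply one transaction to a UTxO map keyed by "txid#index".

    Returns True if the tracked state changed.
    """
    changed = False
    # Inputs: any tracked UTxO that gets spent leaves the state
    for inp in tx.get("inputs", []):
        key = f"{inp['transaction']['id']}#{inp['index']}"
        if state.pop(key, None) is not None:
            changed = True
    # Outputs: any new UTxO that passes the filter enters the state
    for idx, out in enumerate(tx.get("outputs", [])):
        if is_tracked(out):
            state[f"{tx['id']}#{idx}"] = out
            changed = True
    return changed


# Toy data: hypothetical ids and addresses, for illustration only
state = {"aaa#0": {"address": "pool", "value": {"ada": {"lovelace": 1_000_000}}}}
tx = {
    "id": "bbb",
    "inputs": [{"transaction": {"id": "aaa"}, "index": 0}],
    "outputs": [
        {"address": "pool", "value": {"ada": {"lovelace": 2_000_000}}},
        {"address": "someone-else", "value": {"ada": {"lovelace": 500_000}}},
    ],
}
changed = apply_transaction(state, tx, lambda out: out["address"] == "pool")
print(changed, sorted(state))
```

The spent UTxO `aaa#0` is removed and only the output that passes the filter (`bbb#0`) is added; the output to the other address is ignored.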

Initialization Sequence

The prewarmed cache includes metadata about exactly which slot the data is from:

{
  "result": [...utxos...],
  "_cache": {
    "slot": 142857000,
    "height": 11234567,
    "hash": "abc123...",
    "age": 45
  }
}

Use this for precise chainsync alignment:

State Derivation Pattern

  1. Query Initial State
  2. Connect ChainSync
  3. Derive from Blocks
  4. React to Changes

This guarantees zero gaps and zero redundant processing.
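
One way to use the cache metadata for alignment is to turn it into a chainsync intersection point. The sketch below assumes that `_cache.hash` is the block header hash and that it can be passed as the `id` of a `findIntersection` point; the helper name `intersection_request` and the placeholder hash are hypothetical.

```python
import json


def intersection_request(cache: dict) -> str:
    """Build a findIntersection message from the `_cache` metadata.

    Assumption: cache["hash"] is the header hash of the block at cache["slot"].
    """
    point = {"slot": cache["slot"], "id": cache["hash"]}
    return json.dumps({
        "jsonrpc": "2.0",
        "method": "findIntersection",
        "params": {"points": [point]},
        "id": "find",
    })


# Placeholder metadata: a real response carries the full block hash
cache = {
    "slot": 142857000,
    "height": 11234567,
    "hash": "<block hash from _cache>",
    "age": 45,
}
message = intersection_request(cache)
print(message)
```

Starting the stream from the same point the query result reflects means every block received afterwards is newer than the initial state, so nothing needs to be skipped or re-queried.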

Complete Example: MIN/ADA Pool Monitor

#!/usr/bin/env python3
"""
Real-time MIN/ADA pool monitor with state derivation from chainsync.

After initialization, state is derived purely from block data - no re-queries!

Requirements: pip install websockets requests certifi
Usage: export NACHO_API_KEY="napi_..." && python pool_monitor.py
"""

import asyncio
import json
import os
import ssl
import certifi
from datetime import datetime
from typing import Dict, Optional

import requests
import websockets

MINSWAP_ADDRESS = "addr1z84q0denmyep98ph3tmzwsmw0j7zau9ljmsqx6a4rvaau66j2c79gy9l76sdg0xwhd7r0c0kna0tycz4y5s6mlenh8pq777e2a"
MIN_POLICY = "29d222ce763455e3d7a09a665ce554f00ac89d2e99a1a83d267170c6"


class PoolState:
  """Manages pool UTxO state with derivation from chainsync."""

  def __init__(self):
      # Store UTxOs indexed by "txhash#outputindex" for O(1) lookups
      self.utxos: Dict[str, dict] = {}
      # Track which slot we initialized from to skip duplicate blocks
      self.initialized_at_slot: Optional[int] = None

  def load_initial(self, utxos: list, tip_slot: int):
      """Load initial state from UTxO query."""
      # Build lookup table: "txhash#index" -> full UTxO data
      self.utxos = {
          f"{u['transaction']['id']}#{u['index']}": u
          for u in utxos
      }
      self.initialized_at_slot = tip_slot

  def apply_block(self, block: dict) -> bool:
      """
      Derive new state from block data. Returns True if any pool UTxO changed.

      Key insight: Chainsync gives us complete transaction data, so we can
      maintain accurate state by processing inputs (removals) and outputs (additions).
      """
      changed = False

      for tx in block.get("transactions", []):
          # STEP 1: Remove consumed UTxOs (inputs being spent)
          # When a UTxO is used as an input, it's no longer unspent
          for inp in tx.get("inputs", []):
              key = f"{inp['transaction']['id']}#{inp['index']}"
              if key in self.utxos:
                  del self.utxos[key]
                  changed = True

          # STEP 2: Add new UTxOs that match our filter criteria
          # Only track outputs sent to Minswap address with MIN tokens
          tx_id = tx.get("id")
          for idx, out in enumerate(tx.get("outputs", [])):
              # Skip outputs not at our target DEX address
              if out.get("address") != MINSWAP_ADDRESS:
                  continue
              # Skip outputs that don't contain MIN tokens
              if MIN_POLICY not in out.get("value", {}):
                  continue

              # This is a new pool UTxO - add it to our state
              key = f"{tx_id}#{idx}"
              self.utxos[key] = {
                  "transaction": {"id": tx_id},
                  "index": idx,
                  "address": out["address"],
                  "value": out["value"],
              }
              changed = True

      return changed

  def calculate_price(self) -> float:
      """Calculate MIN/ADA price from largest pool by ADA reserves."""
      if not self.utxos:
          return 0.0
      # Find pool with most ADA (main liquidity pool, not fragment UTxOs)
      pool = max(self.utxos.values(), key=lambda u: u["value"]["ada"]["lovelace"])
      # Extract reserve amounts
      ada = pool["value"]["ada"]["lovelace"]
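      # Take the first asset under the MIN policy; default to 0 so a UTxO without MIN cannot raise IndexError
      min_qty = next(iter(pool["value"].get(MIN_POLICY, {}).values()), 0)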
      # Price = ADA per MIN token (convert lovelace to ADA)
      return (ada / 1e6) / (min_qty / 1e6) if min_qty else 0.0


class PoolMonitor:
  def __init__(self, api_key: str):
      self.api_key = api_key
      self.state = PoolState()
      self.last_price = 0.0  # Track previous price to detect changes

  def query_initial_utxos(self) -> list:
      """
      Query UTxOs with server-side asset filter.
      This is much faster than fetching all UTxOs and filtering client-side.
      """
      resp = requests.post(
          "https://api.nacho.builders/v1/ogmios",
          json={
              "jsonrpc": "2.0",
              "method": "queryLedgerState/utxo",
              "params": {
                  "addresses": [MINSWAP_ADDRESS],  # Filter by DEX contract address
                  "assets": [{"policyId": MIN_POLICY}]  # Only UTxOs with MIN tokens
              },
              "id": 1
          },
          headers={"Content-Type": "application/json", "apikey": self.api_key},
          timeout=30
      )
      # Fail loudly on HTTP errors instead of a confusing KeyError below
      resp.raise_for_status()
      return resp.json()["result"]

  async def on_price_change(self, old: float, new: float, height: int):
      """Called whenever pool reserves change. Add your trading logic here."""
      # Guard against division by zero when no previous price is known
      change = ((new - old) / old) * 100 if old else 0.0
      ts = datetime.now().strftime("%H:%M:%S.%f")[:-3]
      print(f"[{ts}] Block {height}: {old:.8f} -> {new:.8f} ({change:+.4f}%)")

      # === YOUR TRADING LOGIC HERE ===
      # Example: if change < -5.0: execute_buy_order()

  async def monitor(self):
      """
      Main monitoring loop using the state derivation pattern:
      1. Connect to chainsync and get current tip
      2. Query initial pool state (one-time)
      3. Derive all future state changes from block data (no re-queries!)
      """
      uri = f"wss://api.nacho.builders/v1/ogmios?apikey={self.api_key}"
      # Create SSL context with proper certificate bundle for secure connection
      ssl_ctx = ssl.create_default_context(cafile=certifi.where())

      async with websockets.connect(uri, ping_interval=30, ssl=ssl_ctx) as ws:
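          # STEP 1: Intersect at "origin" to learn the current chain tip
          await ws.send(json.dumps({
              "jsonrpc": "2.0", "method": "findIntersection",
              "params": {"points": ["origin"]}, "id": "find"
          }))
          resp = json.loads(await ws.recv())
          tip = resp.get("result", {}).get("tip", {})
          tip_slot = tip.get("slot", 0)
          # Move the cursor to the tip so nextBlock does not replay from genesis
          # (assumes standard Ogmios v6 semantics: tip carries "slot" and "id")
          if tip.get("id"):
              await ws.send(json.dumps({
                  "jsonrpc": "2.0", "method": "findIntersection",
                  "params": {"points": [{"slot": tip_slot, "id": tip["id"]}]}, "id": "find"
              }))
              await ws.recv()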

          # STEP 2: Query initial state at this slot (this is the ONLY query we need!)
          self.state.load_initial(self.query_initial_utxos(), tip_slot)
          self.last_price = self.state.calculate_price()
          print(f"Initialized: {len(self.state.utxos)} UTxOs, price: {self.last_price:.8f}")

          # STEP 3: Start the block stream - all future state is derived, not queried
          await ws.send(json.dumps({
              "jsonrpc": "2.0", "method": "nextBlock", "id": "next"
          }))

          # Main event loop: process blocks as they arrive
          while True:
              data = json.loads(await ws.recv())
              result = data.get("result", {})

              if result.get("direction") == "forward":
                  # New block received - derive state changes from transactions
                  block = result.get("block", {})

                  # Skip blocks at or before our initial query (already in state)
                  if block.get("slot", 0) <= self.state.initialized_at_slot:
                      pass
                  elif self.state.apply_block(block):
                      # State changed - check if price moved
                      new_price = self.state.calculate_price()
                      if abs(self.last_price - new_price) > 1e-9:
                          await self.on_price_change(
                              self.last_price, new_price, block.get("height", 0)
                          )
                          self.last_price = new_price

              elif result.get("direction") == "backward":
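                  # Chain rollback detected (rare, ~1-5 per day)
                  # Re-query state since we can't easily undo derived state
                  # The rollback point may be the string "origin" rather than an object
                  point = result.get("point")
                  slot = point.get("slot", 0) if isinstance(point, dict) else 0
                  self.state.load_initial(self.query_initial_utxos(), slot)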
                  self.last_price = self.state.calculate_price()

              # Request the next block (continues the stream)
              await ws.send(json.dumps({
                  "jsonrpc": "2.0", "method": "nextBlock", "id": "next"
              }))


if __name__ == "__main__":
  asyncio.run(PoolMonitor(os.environ["NACHO_API_KEY"]).monitor())
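
To make the price arithmetic concrete, here is a small worked example with made-up reserves. It is a sketch under assumptions: the helper name `price_from_value` is hypothetical, the reserve figures are invented, and it sums every asset under the policy rather than picking a single one.

```python
MIN_POLICY = "29d222ce763455e3d7a09a665ce554f00ac89d2e99a1a83d267170c6"


def price_from_value(value: dict, policy_id: str) -> float:
    """Return ADA per token for one UTxO value, or 0.0 if it holds no tokens."""
    lovelace = value.get("ada", {}).get("lovelace", 0)
    # Sum all asset quantities under the policy (assumption: one tradable asset)
    token_units = sum(value.get(policy_id, {}).values())
    if token_units == 0:
        return 0.0
    # Both ADA and MIN use 6 decimal places, so scale each side by 1e6
    return (lovelace / 1e6) / (token_units / 1e6)


# Hypothetical reserves: 50,000 ADA against 2,000,000 MIN
value = {
    "ada": {"lovelace": 50_000 * 1_000_000},
    MIN_POLICY: {"4d494e": 2_000_000 * 1_000_000},  # "4d494e" is hex for "MIN"
}
price = price_from_value(value, MIN_POLICY)
print(f"{price:.8f}")  # 0.02500000
```

With these numbers one MIN costs 0.025 ADA. A token with a different number of decimals would need its own scaling factor.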

How State Derivation Works

State Derivation

  1. Block Arrives
  2. Remove Inputs - UTxOs being spent
  3. Add Outputs - Matching filter
  4. State Updated - Zero latency

The key insight: chainsync provides complete transaction data, so you can maintain accurate state without any additional queries.

Handling Rollbacks

Chainsync can report rollbacks (chain reorganizations). When a result has direction == "backward", re-initialize the state:

elif result.get("direction") == "backward":
    # Rollback detected - re-initialize from current state
    slot = result.get("point", {}).get("slot", 0)
    self.state.load_initial(self.query_initial_utxos(), slot)

Rollbacks are rare (a few per day at most), so the occasional re-query is acceptable.

For production systems, you could maintain historical state snapshots to handle rollbacks without re-querying. This adds complexity but eliminates all re-queries.
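
The snapshot idea could look roughly like the sketch below. The class `SnapshotHistory`, its method names, and the default history size are hypothetical, and it assumes a snapshot is recorded after every applied block.

```python
from collections import deque
from typing import Deque, Dict, Optional, Tuple


class SnapshotHistory:
    """Keeps recent (slot, utxos) snapshots so rollbacks can be undone locally."""

    def __init__(self, max_snapshots: int = 100):
        # Bounded history: the oldest snapshots are dropped automatically
        self._snapshots: Deque[Tuple[int, Dict[str, dict]]] = deque(maxlen=max_snapshots)

    def record(self, slot: int, utxos: Dict[str, dict]) -> None:
        """Store a shallow copy of the state as of `slot`."""
        self._snapshots.append((slot, dict(utxos)))

    def rollback_to(self, slot: int) -> Optional[Dict[str, dict]]:
        """Drop snapshots newer than `slot` and return the latest remaining state.

        Returns None when the rollback is deeper than the stored history,
        in which case the caller should fall back to a fresh query.
        """
        while self._snapshots and self._snapshots[-1][0] > slot:
            self._snapshots.pop()
        if not self._snapshots:
            return None
        return dict(self._snapshots[-1][1])


history = SnapshotHistory()
history.record(100, {"a#0": {}})
history.record(120, {"b#0": {}})
history.record(140, {"c#0": {}})
restored = history.rollback_to(125)
print(sorted(restored))  # ['b#0']
```

The snapshots are shallow copies, which is enough as long as stored UTxO dicts are never mutated in place. The history size trades memory for the depth of rollback that can be handled without re-querying.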

Monitoring Multiple Pools

Track multiple tokens by expanding the filter:

WATCHED_TOKENS = {
    "29d222ce763455e3d7a09a665ce554f00ac89d2e99a1a83d267170c6": "MIN",
    "279c909f348e533da5808898f87f9a14bb2c3dfbbacccd631d927a3f": "SNEK",
}

# Query with multiple asset filters
params = {
    "addresses": [MINSWAP_ADDRESS],
    "assets": [{"policyId": p} for p in WATCHED_TOKENS.keys()]
}
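
When state is derived from blocks, the output filter has to match the same set of policies. A minimal sketch, assuming outputs have the same `value` shape as in the single-pool example; the helper names `watched_tokens_in` and `group_by_token` and the sample UTxOs are hypothetical.

```python
from typing import Dict, Iterable, List

WATCHED_TOKENS: Dict[str, str] = {
    "29d222ce763455e3d7a09a665ce554f00ac89d2e99a1a83d267170c6": "MIN",
    "279c909f348e533da5808898f87f9a14bb2c3dfbbacccd631d927a3f": "SNEK",
}


def watched_tokens_in(output: dict) -> List[str]:
    """Return the names of watched tokens present in an output's value."""
    value = output.get("value", {})
    return [name for policy, name in WATCHED_TOKENS.items() if policy in value]


def group_by_token(utxos: Iterable[dict]) -> Dict[str, List[dict]]:
    """Group UTxOs by watched token so each pool can be priced separately."""
    groups: Dict[str, List[dict]] = {name: [] for name in WATCHED_TOKENS.values()}
    for utxo in utxos:
        for name in watched_tokens_in(utxo):
            groups[name].append(utxo)
    return groups


# Hypothetical UTxOs; "534e454b" is hex for "SNEK"
snek_policy = "279c909f348e533da5808898f87f9a14bb2c3dfbbacccd631d927a3f"
utxos = [
    {"value": {"ada": {"lovelace": 5_000_000}, snek_policy: {"534e454b": 1000}}},
    {"value": {"ada": {"lovelace": 2_000_000}}},
]
groups = group_by_token(utxos)
print({name: len(items) for name, items in groups.items()})
```

In `apply_block`, the single-policy check would be replaced by `if not watched_tokens_in(out): continue`, with the address check left unchanged.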

Cost Analysis

| Operation | Frequency | Cost |
|---|---|---|
| Initial UTxO query | Once at startup | 1 request |
| Chainsync messages | ~2 per block (~20 sec) | Per-message billing |
| Rollback re-query | ~1-5 per day | 1 request each |

24-hour estimate:

  • ~4,320 blocks/day (one block every ~20 seconds) × 2 messages = ~8,640 chainsync messages
  • ~3 rollback re-queries
  • Total: ~8,643 billable operations

Compare to polling every 10 seconds: 8,640 queries/day at 3MB each is roughly 26GB of bandwidth.
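
The estimate can be reproduced with a few lines of arithmetic. This sketch assumes an average block interval of 20 seconds, 2 messages per block, and a 3MB polling response; the function names are hypothetical, and the results change with whatever interval you plug in.

```python
SECONDS_PER_DAY = 86_400


def chainsync_messages_per_day(block_interval_s: float,
                               messages_per_block: int = 2) -> int:
    """Estimate daily chainsync messages for a given average block interval."""
    blocks = round(SECONDS_PER_DAY / block_interval_s)
    return blocks * messages_per_block


def polling_cost_per_day(poll_interval_s: float, response_mb: float):
    """Return (queries per day, bandwidth in GB) for fixed-interval polling."""
    queries = round(SECONDS_PER_DAY / poll_interval_s)
    return queries, queries * response_mb / 1000  # decimal GB


messages = chainsync_messages_per_day(20)
queries, gigabytes = polling_cost_per_day(10, 3)
print(messages, queries, round(gigabytes, 1))  # 8640 8640 25.9
```

Both approaches send a similar number of messages per day under these assumptions; the difference is that chainsync messages carry one block each, while every polling response repeats the full multi-megabyte UTxO set.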
