Beginner10 min read
Edit on GitHub

Python

Build Cardano applications with PyCardano and Nacho API

Python is an excellent choice for backend services, automation scripts, and data analysis on Cardano. This guide shows you how to integrate Nacho API with PyCardano, the most popular Python library for Cardano development.

Installation

pip install pycardano aiohttp

For async support (recommended for production):

pip install pycardano aiohttp asyncio

Creating a Chain Context

PyCardano uses a "ChainContext" to interact with the blockchain. Here's a custom context for Nacho API:

import json
from typing import Dict, List, Optional, Union
from dataclasses import dataclass

import aiohttp
from pycardano import (
    ChainContext,
    ProtocolParameters,
    UTxO,
    TransactionInput,
    TransactionOutput,
    TransactionId,
    Address,
    Value,
    MultiAsset,
    AssetName,
    ScriptHash,
)


class NachoChainContext(ChainContext):
    """Custom ChainContext that uses Nacho API as the backend."""

    def __init__(self, api_key: str, network: str = "mainnet"):
        self.api_key = api_key
        self.network = network
        self.base_url = "https://api.nacho.builders/v1/ogmios"
        self._protocol_params: Optional[ProtocolParameters] = None

    async def _rpc(self, method: str, params: dict = None) -> dict:
        """Make a JSON-RPC call to Nacho API."""
        async with aiohttp.ClientSession() as session:
            payload = {
                "jsonrpc": "2.0",
                "method": method,
                "params": params or {},
                "id": 1
            }
            headers = {
                "Content-Type": "application/json",
                "Authorization": f"Bearer {self.api_key}"
            }

            async with session.post(self.base_url, json=payload, headers=headers) as response:
                data = await response.json()
                if "error" in data:
                    raise Exception(f"RPC Error: {data['error']['message']}")
                return data.get("result", {})

    def _rpc_sync(self, method: str, params: dict = None) -> dict:
        """Synchronous RPC call for compatibility."""
        import asyncio
        return asyncio.get_event_loop().run_until_complete(self._rpc(method, params))

    @property
    def protocol_param(self) -> ProtocolParameters:
        """Get current protocol parameters."""
        if self._protocol_params is None:
            result = self._rpc_sync("queryLedgerState/protocolParameters")
            self._protocol_params = self._map_protocol_params(result)
        return self._protocol_params

    def _map_protocol_params(self, ogmios_params: dict) -> ProtocolParameters:
        """Map Ogmios protocol parameters to PyCardano format."""
        return ProtocolParameters(
            min_fee_constant=ogmios_params["minFeeConstant"]["ada"]["lovelace"],
            min_fee_coefficient=ogmios_params["minFeeCoefficient"],
            max_tx_size=ogmios_params["maxTransactionSize"]["bytes"],
            max_val_size=ogmios_params["maxValueSize"]["bytes"],
            key_deposit=ogmios_params["stakeCredentialDeposit"]["ada"]["lovelace"],
            pool_deposit=ogmios_params["stakePoolDeposit"]["ada"]["lovelace"],
            coins_per_utxo_byte=ogmios_params["minUtxoDepositCoefficient"],
            collateral_percent=ogmios_params["collateralPercentage"],
            max_collateral_inputs=ogmios_params["maxCollateralInputs"],
            price_mem=ogmios_params["scriptExecutionPrices"]["memory"],
            price_step=ogmios_params["scriptExecutionPrices"]["cpu"],
            max_tx_ex_mem=ogmios_params["maxExecutionUnitsPerTransaction"]["memory"],
            max_tx_ex_steps=ogmios_params["maxExecutionUnitsPerTransaction"]["cpu"],
        )

    def utxos(self, address: Union[str, Address]) -> List[UTxO]:
        """Get UTxOs for an address."""
        addr_str = str(address) if isinstance(address, Address) else address
        result = self._rpc_sync("queryLedgerState/utxo", {"addresses": [addr_str]})
        return [self._map_utxo(u) for u in result]

    def _map_utxo(self, ogmios_utxo: dict) -> UTxO:
        """Map Ogmios UTxO to PyCardano format."""
        tx_id = TransactionId.from_primitive(ogmios_utxo["transaction"]["id"])
        tx_in = TransactionInput(tx_id, ogmios_utxo["index"])

        # Build value
        lovelace = ogmios_utxo["value"]["ada"]["lovelace"]
        multi_asset = None

        if "assets" in ogmios_utxo["value"] and ogmios_utxo["value"]["assets"]:
            multi_asset = MultiAsset()
            for policy_id, tokens in ogmios_utxo["value"]["assets"].items():
                script_hash = ScriptHash.from_primitive(policy_id)
                for token_name, amount in tokens.items():
                    asset_name = AssetName(bytes.fromhex(token_name) if token_name else b"")
                    multi_asset[script_hash] = multi_asset.get(script_hash, {})
                    multi_asset[script_hash][asset_name] = amount

        value = Value(lovelace, multi_asset)
        address = Address.from_primitive(ogmios_utxo["address"])
        tx_out = TransactionOutput(address, value)

        return UTxO(tx_in, tx_out)

    def submit_tx(self, tx_cbor: bytes) -> str:
        """Submit a transaction."""
        result = self._rpc_sync("submitTransaction", {
            "transaction": {"cbor": tx_cbor.hex()}
        })
        return result["transaction"]["id"]

    async def submit_tx_async(self, tx_cbor: bytes) -> str:
        """Submit a transaction asynchronously."""
        result = await self._rpc("submitTransaction", {
            "transaction": {"cbor": tx_cbor.hex()}
        })
        return result["transaction"]["id"]

Basic Usage

Query UTxOs

import os
from pycardano import Address

# Initialize context
api_key = os.environ["NACHO_API_KEY"]
context = NachoChainContext(api_key, network="mainnet")

# Query UTxOs
address = Address.from_primitive("addr1qy2kp7ux2qx7g9h6...")
utxos = context.utxos(address)

# Print balance
total_lovelace = sum(utxo.output.amount.coin for utxo in utxos)
print(f"Balance: {total_lovelace / 1_000_000} ADA")

Build and Submit a Transaction

from pycardano import (
    TransactionBuilder,
    PaymentSigningKey,
    PaymentVerificationKey,
    Address,
)

# Load your signing key
signing_key = PaymentSigningKey.load("payment.skey")
verification_key = PaymentVerificationKey.from_signing_key(signing_key)

# Build sender address
sender_address = Address(verification_key.hash(), network=1)  # 1 = mainnet

# Create transaction builder
builder = TransactionBuilder(context)
builder.add_input_address(sender_address)
builder.add_output(
    TransactionOutput(
        Address.from_primitive("addr1...recipient"),
        5_000_000  # 5 ADA in lovelace
    )
)

# Build and sign
signed_tx = builder.build_and_sign(
    signing_keys=[signing_key],
    change_address=sender_address
)

# Submit
tx_id = context.submit_tx(signed_tx.to_cbor())
print(f"Transaction submitted: {tx_id}")

Async Usage

For high-performance applications, use async/await:

import asyncio
from typing import List


class AsyncNachoClient:
    """Async client for Nacho API."""

    def __init__(self, api_key: str):
        self.api_key = api_key
        self.base_url = "https://api.nacho.builders/v1/ogmios"
        self._session: Optional[aiohttp.ClientSession] = None

    async def __aenter__(self):
        self._session = aiohttp.ClientSession(
            headers={
                "Content-Type": "application/json",
                "Authorization": f"Bearer {self.api_key}"
            }
        )
        return self

    async def __aexit__(self, *args):
        if self._session:
            await self._session.close()

    async def rpc(self, method: str, params: dict = None) -> dict:
        payload = {
            "jsonrpc": "2.0",
            "method": method,
            "params": params or {},
            "id": 1
        }

        async with self._session.post(self.base_url, json=payload) as response:
            data = await response.json()
            if "error" in data:
                raise Exception(f"RPC Error: {data['error']['message']}")
            return data.get("result", {})

    async def query_tip(self) -> dict:
        """Get current blockchain tip."""
        return await self.rpc("queryNetwork/tip")

    async def query_utxos(self, addresses: List[str]) -> list:
        """Query UTxOs for multiple addresses."""
        return await self.rpc("queryLedgerState/utxo", {"addresses": addresses})

    async def submit_transaction(self, tx_cbor: str) -> str:
        """Submit a signed transaction."""
        result = await self.rpc("submitTransaction", {
            "transaction": {"cbor": tx_cbor}
        })
        return result["transaction"]["id"]


# Usage
async def main():
    api_key = os.environ["NACHO_API_KEY"]

    async with AsyncNachoClient(api_key) as client:
        # Get current tip
        tip = await client.query_tip()
        print(f"Current block: {tip['height']}")

        # Query multiple addresses concurrently
        addresses = [
            "addr1qy2kp7ux2qx7g9h6...",
            "addr1qx3mp8vx3rx8h0j7...",
            "addr1qz4np9wy4sx9j1k8...",
        ]

        utxos = await client.query_utxos(addresses)
        print(f"Found {len(utxos)} UTxOs")


if __name__ == "__main__":
    asyncio.run(main())

WebSocket Chain Sync

For real-time blockchain monitoring:

import asyncio
import websockets
import json


async def chain_sync(api_key: str, start_point: dict = None):
    """
    Synchronize with the blockchain from a given point.

    Args:
        api_key: Your Nacho API key
        start_point: Optional starting point {"slot": int, "id": str}
    """
    url = f"wss://api.nacho.builders/v1/ogmios?apiKey={api_key}"

    async with websockets.connect(url) as ws:
        # Find intersection (where to start syncing from)
        if start_point:
            await ws.send(json.dumps({
                "jsonrpc": "2.0",
                "method": "findIntersection",
                "params": {"points": [start_point]},
                "id": 1
            }))
            response = await ws.recv()
            print(f"Intersection: {response}")

        # Request next block
        await ws.send(json.dumps({
            "jsonrpc": "2.0",
            "method": "nextBlock",
            "id": 2
        }))

        while True:
            response = await ws.recv()
            data = json.loads(response)

            if "result" in data:
                result = data["result"]

                if result.get("direction") == "forward":
                    block = result["block"]
                    print(f"New block: {block['height']} - {len(block.get('transactions', []))} transactions")

                    # Process transactions
                    for tx in block.get("transactions", []):
                        process_transaction(tx)

                elif result.get("direction") == "backward":
                    # Rollback - block was rolled back
                    print(f"Rollback to: {result['point']}")

            # Request next block
            await ws.send(json.dumps({
                "jsonrpc": "2.0",
                "method": "nextBlock",
                "id": data["id"] + 1
            }))


def process_transaction(tx: dict):
    """Process a transaction from a block."""
    tx_id = tx["id"]
    # Your transaction processing logic here
    pass


# Run the sync
if __name__ == "__main__":
    api_key = os.environ["NACHO_API_KEY"]
    asyncio.run(chain_sync(api_key))

Error Handling

Robust error handling for production:

import logging
from dataclasses import dataclass
from enum import Enum
from typing import Optional

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class ErrorCategory(Enum):
    NETWORK = "network"
    AUTHENTICATION = "authentication"
    RATE_LIMIT = "rate_limit"
    VALIDATION = "validation"
    SERVER = "server"


@dataclass
class NachoError(Exception):
    message: str
    category: ErrorCategory
    retryable: bool
    original_error: Optional[Exception] = None


async def robust_rpc_call(client: AsyncNachoClient, method: str, params: dict = None, max_retries: int = 3):
    """Make an RPC call with automatic retry and error categorization."""
    last_error = None

    for attempt in range(max_retries):
        try:
            return await client.rpc(method, params)

        except aiohttp.ClientConnectionError as e:
            last_error = NachoError(
                message=f"Connection failed: {e}",
                category=ErrorCategory.NETWORK,
                retryable=True,
                original_error=e
            )
            await asyncio.sleep(2 ** attempt)  # Exponential backoff

        except aiohttp.ClientResponseError as e:
            if e.status == 401:
                raise NachoError(
                    message="Invalid API key",
                    category=ErrorCategory.AUTHENTICATION,
                    retryable=False,
                    original_error=e
                )
            elif e.status == 429:
                retry_after = int(e.headers.get("Retry-After", 60))
                last_error = NachoError(
                    message=f"Rate limited, retry after {retry_after}s",
                    category=ErrorCategory.RATE_LIMIT,
                    retryable=True,
                    original_error=e
                )
                await asyncio.sleep(retry_after)
            elif e.status >= 500:
                last_error = NachoError(
                    message=f"Server error: {e.status}",
                    category=ErrorCategory.SERVER,
                    retryable=True,
                    original_error=e
                )
                await asyncio.sleep(2 ** attempt)
            else:
                raise NachoError(
                    message=f"Request failed: {e}",
                    category=ErrorCategory.VALIDATION,
                    retryable=False,
                    original_error=e
                )

        except Exception as e:
            logger.error(f"Unexpected error: {e}")
            raise

    if last_error:
        raise last_error

Best Practices

Use Connection Pooling

# Create a session once and reuse it
session = aiohttp.ClientSession(
    connector=aiohttp.TCPConnector(limit=10),  # Max 10 connections
    timeout=aiohttp.ClientTimeout(total=30)
)

Batch Requests When Possible

# Instead of N separate calls
for address in addresses:
    utxos = await client.query_utxos([address])

# Make one call with all addresses
utxos = await client.query_utxos(addresses)

Cache Protocol Parameters

from functools import lru_cache
from datetime import datetime, timedelta


class CachedNachoClient(AsyncNachoClient):
    _protocol_params_cache = None
    _cache_timestamp = None
    _cache_ttl = timedelta(minutes=5)

    async def get_protocol_params(self):
        now = datetime.now()
        if (
            self._protocol_params_cache is None or
            self._cache_timestamp is None or
            now - self._cache_timestamp > self._cache_ttl
        ):
            self._protocol_params_cache = await self.rpc("queryLedgerState/protocolParameters")
            self._cache_timestamp = now

        return self._protocol_params_cache

Next Steps

Was this page helpful?