Beginner10 min read
Edit on GitHubPython
Build Cardano applications with PyCardano and Nacho API
Python is an excellent choice for backend services, automation scripts, and data analysis on Cardano. This guide shows you how to integrate Nacho API with PyCardano, the most popular Python library for Cardano development.
Installation
pip install pycardano aiohttpFor async support (recommended for production):
pip install pycardano aiohttp asyncioCreating a Chain Context
PyCardano uses a "ChainContext" to interact with the blockchain. Here's a custom context for Nacho API:
import json
from typing import Dict, List, Optional, Union
from dataclasses import dataclass
import aiohttp
from pycardano import (
ChainContext,
ProtocolParameters,
UTxO,
TransactionInput,
TransactionOutput,
TransactionId,
Address,
Value,
MultiAsset,
AssetName,
ScriptHash,
)
class NachoChainContext(ChainContext):
"""Custom ChainContext that uses Nacho API as the backend."""
def __init__(self, api_key: str, network: str = "mainnet"):
self.api_key = api_key
self.network = network
self.base_url = "https://api.nacho.builders/v1/ogmios"
self._protocol_params: Optional[ProtocolParameters] = None
async def _rpc(self, method: str, params: dict = None) -> dict:
"""Make a JSON-RPC call to Nacho API."""
async with aiohttp.ClientSession() as session:
payload = {
"jsonrpc": "2.0",
"method": method,
"params": params or {},
"id": 1
}
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.api_key}"
}
async with session.post(self.base_url, json=payload, headers=headers) as response:
data = await response.json()
if "error" in data:
raise Exception(f"RPC Error: {data['error']['message']}")
return data.get("result", {})
def _rpc_sync(self, method: str, params: dict = None) -> dict:
"""Synchronous RPC call for compatibility."""
import asyncio
return asyncio.get_event_loop().run_until_complete(self._rpc(method, params))
@property
def protocol_param(self) -> ProtocolParameters:
"""Get current protocol parameters."""
if self._protocol_params is None:
result = self._rpc_sync("queryLedgerState/protocolParameters")
self._protocol_params = self._map_protocol_params(result)
return self._protocol_params
def _map_protocol_params(self, ogmios_params: dict) -> ProtocolParameters:
"""Map Ogmios protocol parameters to PyCardano format."""
return ProtocolParameters(
min_fee_constant=ogmios_params["minFeeConstant"]["ada"]["lovelace"],
min_fee_coefficient=ogmios_params["minFeeCoefficient"],
max_tx_size=ogmios_params["maxTransactionSize"]["bytes"],
max_val_size=ogmios_params["maxValueSize"]["bytes"],
key_deposit=ogmios_params["stakeCredentialDeposit"]["ada"]["lovelace"],
pool_deposit=ogmios_params["stakePoolDeposit"]["ada"]["lovelace"],
coins_per_utxo_byte=ogmios_params["minUtxoDepositCoefficient"],
collateral_percent=ogmios_params["collateralPercentage"],
max_collateral_inputs=ogmios_params["maxCollateralInputs"],
price_mem=ogmios_params["scriptExecutionPrices"]["memory"],
price_step=ogmios_params["scriptExecutionPrices"]["cpu"],
max_tx_ex_mem=ogmios_params["maxExecutionUnitsPerTransaction"]["memory"],
max_tx_ex_steps=ogmios_params["maxExecutionUnitsPerTransaction"]["cpu"],
)
def utxos(self, address: Union[str, Address]) -> List[UTxO]:
"""Get UTxOs for an address."""
addr_str = str(address) if isinstance(address, Address) else address
result = self._rpc_sync("queryLedgerState/utxo", {"addresses": [addr_str]})
return [self._map_utxo(u) for u in result]
def _map_utxo(self, ogmios_utxo: dict) -> UTxO:
"""Map Ogmios UTxO to PyCardano format."""
tx_id = TransactionId.from_primitive(ogmios_utxo["transaction"]["id"])
tx_in = TransactionInput(tx_id, ogmios_utxo["index"])
# Build value
lovelace = ogmios_utxo["value"]["ada"]["lovelace"]
multi_asset = None
if "assets" in ogmios_utxo["value"] and ogmios_utxo["value"]["assets"]:
multi_asset = MultiAsset()
for policy_id, tokens in ogmios_utxo["value"]["assets"].items():
script_hash = ScriptHash.from_primitive(policy_id)
for token_name, amount in tokens.items():
asset_name = AssetName(bytes.fromhex(token_name) if token_name else b"")
multi_asset[script_hash] = multi_asset.get(script_hash, {})
multi_asset[script_hash][asset_name] = amount
value = Value(lovelace, multi_asset)
address = Address.from_primitive(ogmios_utxo["address"])
tx_out = TransactionOutput(address, value)
return UTxO(tx_in, tx_out)
def submit_tx(self, tx_cbor: bytes) -> str:
"""Submit a transaction."""
result = self._rpc_sync("submitTransaction", {
"transaction": {"cbor": tx_cbor.hex()}
})
return result["transaction"]["id"]
async def submit_tx_async(self, tx_cbor: bytes) -> str:
"""Submit a transaction asynchronously."""
result = await self._rpc("submitTransaction", {
"transaction": {"cbor": tx_cbor.hex()}
})
return result["transaction"]["id"]Basic Usage
Query UTxOs
import os
from pycardano import Address
# Initialize context
api_key = os.environ["NACHO_API_KEY"]
context = NachoChainContext(api_key, network="mainnet")
# Query UTxOs
address = Address.from_primitive("addr1qy2kp7ux2qx7g9h6...")
utxos = context.utxos(address)
# Print balance
total_lovelace = sum(utxo.output.amount.coin for utxo in utxos)
print(f"Balance: {total_lovelace / 1_000_000} ADA")Build and Submit a Transaction
from pycardano import (
TransactionBuilder,
PaymentSigningKey,
PaymentVerificationKey,
Address,
)
# Load your signing key
signing_key = PaymentSigningKey.load("payment.skey")
verification_key = PaymentVerificationKey.from_signing_key(signing_key)
# Build sender address
sender_address = Address(verification_key.hash(), network=1) # 1 = mainnet
# Create transaction builder
builder = TransactionBuilder(context)
builder.add_input_address(sender_address)
builder.add_output(
TransactionOutput(
Address.from_primitive("addr1...recipient"),
5_000_000 # 5 ADA in lovelace
)
)
# Build and sign
signed_tx = builder.build_and_sign(
signing_keys=[signing_key],
change_address=sender_address
)
# Submit
tx_id = context.submit_tx(signed_tx.to_cbor())
print(f"Transaction submitted: {tx_id}")Async Usage
For high-performance applications, use async/await:
import asyncio
from typing import List
class AsyncNachoClient:
"""Async client for Nacho API."""
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = "https://api.nacho.builders/v1/ogmios"
self._session: Optional[aiohttp.ClientSession] = None
async def __aenter__(self):
self._session = aiohttp.ClientSession(
headers={
"Content-Type": "application/json",
"Authorization": f"Bearer {self.api_key}"
}
)
return self
async def __aexit__(self, *args):
if self._session:
await self._session.close()
async def rpc(self, method: str, params: dict = None) -> dict:
payload = {
"jsonrpc": "2.0",
"method": method,
"params": params or {},
"id": 1
}
async with self._session.post(self.base_url, json=payload) as response:
data = await response.json()
if "error" in data:
raise Exception(f"RPC Error: {data['error']['message']}")
return data.get("result", {})
async def query_tip(self) -> dict:
"""Get current blockchain tip."""
return await self.rpc("queryNetwork/tip")
async def query_utxos(self, addresses: List[str]) -> list:
"""Query UTxOs for multiple addresses."""
return await self.rpc("queryLedgerState/utxo", {"addresses": addresses})
async def submit_transaction(self, tx_cbor: str) -> str:
"""Submit a signed transaction."""
result = await self.rpc("submitTransaction", {
"transaction": {"cbor": tx_cbor}
})
return result["transaction"]["id"]
# Usage
async def main():
api_key = os.environ["NACHO_API_KEY"]
async with AsyncNachoClient(api_key) as client:
# Get current tip
tip = await client.query_tip()
print(f"Current block: {tip['height']}")
# Query multiple addresses concurrently
addresses = [
"addr1qy2kp7ux2qx7g9h6...",
"addr1qx3mp8vx3rx8h0j7...",
"addr1qz4np9wy4sx9j1k8...",
]
utxos = await client.query_utxos(addresses)
print(f"Found {len(utxos)} UTxOs")
if __name__ == "__main__":
asyncio.run(main())WebSocket Chain Sync
For real-time blockchain monitoring:
import asyncio
import websockets
import json
async def chain_sync(api_key: str, start_point: dict = None):
"""
Synchronize with the blockchain from a given point.
Args:
api_key: Your Nacho API key
start_point: Optional starting point {"slot": int, "id": str}
"""
url = f"wss://api.nacho.builders/v1/ogmios?apiKey={api_key}"
async with websockets.connect(url) as ws:
# Find intersection (where to start syncing from)
if start_point:
await ws.send(json.dumps({
"jsonrpc": "2.0",
"method": "findIntersection",
"params": {"points": [start_point]},
"id": 1
}))
response = await ws.recv()
print(f"Intersection: {response}")
# Request next block
await ws.send(json.dumps({
"jsonrpc": "2.0",
"method": "nextBlock",
"id": 2
}))
while True:
response = await ws.recv()
data = json.loads(response)
if "result" in data:
result = data["result"]
if result.get("direction") == "forward":
block = result["block"]
print(f"New block: {block['height']} - {len(block.get('transactions', []))} transactions")
# Process transactions
for tx in block.get("transactions", []):
process_transaction(tx)
elif result.get("direction") == "backward":
# Rollback - block was rolled back
print(f"Rollback to: {result['point']}")
# Request next block
await ws.send(json.dumps({
"jsonrpc": "2.0",
"method": "nextBlock",
"id": data["id"] + 1
}))
def process_transaction(tx: dict):
"""Process a transaction from a block."""
tx_id = tx["id"]
# Your transaction processing logic here
pass
# Run the sync
if __name__ == "__main__":
api_key = os.environ["NACHO_API_KEY"]
asyncio.run(chain_sync(api_key))Error Handling
Robust error handling for production:
import logging
from dataclasses import dataclass
from enum import Enum
from typing import Optional
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class ErrorCategory(Enum):
NETWORK = "network"
AUTHENTICATION = "authentication"
RATE_LIMIT = "rate_limit"
VALIDATION = "validation"
SERVER = "server"
@dataclass
class NachoError(Exception):
message: str
category: ErrorCategory
retryable: bool
original_error: Optional[Exception] = None
async def robust_rpc_call(client: AsyncNachoClient, method: str, params: dict = None, max_retries: int = 3):
"""Make an RPC call with automatic retry and error categorization."""
last_error = None
for attempt in range(max_retries):
try:
return await client.rpc(method, params)
except aiohttp.ClientConnectionError as e:
last_error = NachoError(
message=f"Connection failed: {e}",
category=ErrorCategory.NETWORK,
retryable=True,
original_error=e
)
await asyncio.sleep(2 ** attempt) # Exponential backoff
except aiohttp.ClientResponseError as e:
if e.status == 401:
raise NachoError(
message="Invalid API key",
category=ErrorCategory.AUTHENTICATION,
retryable=False,
original_error=e
)
elif e.status == 429:
retry_after = int(e.headers.get("Retry-After", 60))
last_error = NachoError(
message=f"Rate limited, retry after {retry_after}s",
category=ErrorCategory.RATE_LIMIT,
retryable=True,
original_error=e
)
await asyncio.sleep(retry_after)
elif e.status >= 500:
last_error = NachoError(
message=f"Server error: {e.status}",
category=ErrorCategory.SERVER,
retryable=True,
original_error=e
)
await asyncio.sleep(2 ** attempt)
else:
raise NachoError(
message=f"Request failed: {e}",
category=ErrorCategory.VALIDATION,
retryable=False,
original_error=e
)
except Exception as e:
logger.error(f"Unexpected error: {e}")
raise
if last_error:
raise last_errorBest Practices
Use Connection Pooling
# Create a session once and reuse it
session = aiohttp.ClientSession(
connector=aiohttp.TCPConnector(limit=10), # Max 10 connections
timeout=aiohttp.ClientTimeout(total=30)
)Batch Requests When Possible
# Instead of N separate calls
for address in addresses:
utxos = await client.query_utxos([address])
# Make one call with all addresses
utxos = await client.query_utxos(addresses)Cache Protocol Parameters
from functools import lru_cache
from datetime import datetime, timedelta
class CachedNachoClient(AsyncNachoClient):
_protocol_params_cache = None
_cache_timestamp = None
_cache_ttl = timedelta(minutes=5)
async def get_protocol_params(self):
now = datetime.now()
if (
self._protocol_params_cache is None or
self._cache_timestamp is None or
now - self._cache_timestamp > self._cache_ttl
):
self._protocol_params_cache = await self.rpc("queryLedgerState/protocolParameters")
self._cache_timestamp = now
return self._protocol_params_cacheNext Steps
- Production Deployment - Best practices for going live
- Chain Synchronization - Real-time blockchain monitoring
- API Reference - Complete endpoint documentation
Was this page helpful?