ResilientDB Python Cache
ResilientDB Python Cache is a library that provides real-time synchronization between ResilientDB and MongoDB. It’s designed to act as a caching layer for blockchain data, making it easier to query and work with data from ResilientDB in your applications.
Features
- Real-time synchronization between ResilientDB and MongoDB
- Automatic WebSocket connection management
- Configurable batch processing
- Event-driven architecture
- MongoDB querying capabilities for blockchain data
Installation
The library can be installed directly from PyPI using pip:
pip install resilient-python-cache
Complete Example
Below is a complete example that demonstrates how to set up real-time synchronization between ResilientDB and MongoDB. This example shows:
- How to configure MongoDB and ResilientDB connections
- How to set up event handlers for different scenarios
- How to handle errors and cleanup
- How to run the synchronization indefinitely
import asyncio
from resilient_python_cache import ResilientPythonCache, MongoConfig, ResilientDBConfig

async def main():
    # Configure MongoDB connection
    mongo_config = MongoConfig(
        uri="mongodb://localhost:27017",
        db_name="myDatabase",
        collection_name="myCollection"
    )

    # Configure ResilientDB connection
    resilient_db_config = ResilientDBConfig(
        base_url="resilientdb://crow.resilientdb.com",
        http_secure=True,
        ws_secure=True
    )

    # Initialize the cache
    cache = ResilientPythonCache(mongo_config, resilient_db_config)

    # Set up event handlers for different scenarios
    cache.on("connected", lambda: print("WebSocket connected."))
    cache.on("data", lambda new_blocks: print("Received new blocks:", new_blocks))
    cache.on("error", lambda error: print("Error:", error))
    cache.on("closed", lambda: print("Connection closed."))

    try:
        # Start the synchronization process
        await cache.initialize()
        print("Synchronization initialized.")
        try:
            # Keep the process running until interrupted
            await asyncio.Future()
        except asyncio.CancelledError:
            pass
    except Exception as error:
        print("Error during sync initialization:", error)
    finally:
        # Ensure proper cleanup of resources
        await cache.close()

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("Interrupted by user")
Configuration Options
MongoDB Configuration
The MongoDB configuration specifies where and how to store the blockchain data. Here’s what each parameter means:
- uri (required): The MongoDB connection string (supports both local and remote MongoDB instances)
- db_name (required): The name of the database where blocks will be stored
- collection_name (required): The name of the collection within the database where blocks will be stored
mongo_config = MongoConfig(
    uri="mongodb://localhost:27017",  # MongoDB connection string
    db_name="myDatabase",             # Database name
    collection_name="myCollection"    # Collection name
)
ResilientDB Configuration
The ResilientDB configuration controls how the library connects to ResilientDB nodes. Here’s what each parameter does:
- base_url (required): The URL of your ResilientDB node
- http_secure: Whether to use HTTPS for REST API calls
- ws_secure: Whether to use WSS for WebSocket connections
- reconnect_interval (optional): How often, in milliseconds, to attempt reconnection if the connection is lost
- fetch_interval (optional): How often, in milliseconds, to sync data from ResilientDB
resilient_db_config = ResilientDBConfig(
    base_url="resilientdb://crow.resilientdb.com",  # ResilientDB node URL
    http_secure=True,                               # Use HTTPS for REST API
    ws_secure=True,                                 # Use WSS for WebSocket
    reconnect_interval=5000,                        # Reconnection interval (ms)
    fetch_interval=30000                            # Periodic sync interval (ms)
)
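In a deployment you may prefer to keep these values out of the source code. The following is a minimal sketch, assuming the settings are supplied through environment variables. The function name and every variable name (MONGO_URI, RESILIENTDB_BASE_URL, and so on) are hypothetical, and the fallback intervals are simply copied from the example above rather than taken from the library's documented defaults.

```python
import os


def load_cache_settings(env=None):
    """Build keyword arguments for MongoConfig and ResilientDBConfig.

    The environment variable names below are illustrative assumptions,
    not names defined by the library.
    """
    env = os.environ if env is None else env

    # Map each required parameter to the (hypothetical) variable that supplies it.
    required = {
        "uri": "MONGO_URI",
        "db_name": "MONGO_DB_NAME",
        "collection_name": "MONGO_COLLECTION_NAME",
        "base_url": "RESILIENTDB_BASE_URL",
    }

    # Fail early with a clear message if any required value is missing or empty.
    missing = [var for var in required.values() if not env.get(var)]
    if missing:
        raise ValueError("Missing required settings: " + ", ".join(missing))

    mongo_kwargs = {
        key: env[required[key]] for key in ("uri", "db_name", "collection_name")
    }
    resilient_kwargs = {
        "base_url": env[required["base_url"]],
        # Intervals are in milliseconds; fallbacks mirror the example values.
        "reconnect_interval": int(env.get("RESILIENTDB_RECONNECT_MS", "5000")),
        "fetch_interval": int(env.get("RESILIENTDB_FETCH_MS", "30000")),
    }
    return mongo_kwargs, resilient_kwargs
```

The two dictionaries can then be unpacked into the constructors, for example MongoConfig(**mongo_kwargs) and ResilientDBConfig(**resilient_kwargs, http_secure=True, ws_secure=True).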
Event Handling
The library uses an event-driven architecture to notify your application about various states and data updates. Here are the key events you can listen to:
- connected: Fired when the WebSocket connection is established
- closed: Fired when the connection is closed
- data: Fired when new blocks are received
- error: Fired when an error occurs
# WebSocket connection events
cache.on("connected", lambda: print("WebSocket connected"))
cache.on("closed", lambda: print("Connection closed"))
# Data events
cache.on("data", lambda new_blocks: print("New blocks received:", new_blocks))
# Error handling
cache.on("error", lambda error: print("Error occurred:", error))
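Lambdas are convenient for logging, but named handler functions are easier to test and can keep state. The sketch below illustrates the on()/emit pattern with a small stand-in class. TinyEmitter is hypothetical and is not the library's implementation; it only mimics the on(event, handler) interface shown above so the example can run without a ResilientDB node.

```python
from collections import defaultdict


class TinyEmitter:
    """Hypothetical stand-in that mimics an on()/emit() interface."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def on(self, event, handler):
        # Register a handler; several handlers may listen to the same event.
        self._handlers[event].append(handler)

    def emit(self, event, *args):
        # Call every handler registered for this event, in registration order.
        for handler in list(self._handlers[event]):
            handler(*args)


received = []


def handle_data(new_blocks):
    # A named handler can accumulate state, which a print-only lambda cannot.
    received.extend(new_blocks)


emitter = TinyEmitter()
emitter.on("data", handle_data)
emitter.emit("data", [{"id": "block_1"}, {"id": "block_2"}])
print(len(received))  # prints 2
```

With the real cache object, the same handler would be registered as cache.on("data", handle_data).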
MongoDB Collection Structure
The library stores blockchain data in MongoDB using the following structure. Each document represents a block:
- id: Unique identifier for the block
- createdAt: Timestamp when the block was created
- transactions: Array of transactions in the block, each containing:
  - Transaction ID
  - Input data
  - Output data
  - Associated metadata
{
  "id": "block_identifier",
  "createdAt": ISODate("2025-04-27T..."),
  "transactions": [
    {
      "id": "transaction_id",
      "inputs": [...],
      "outputs": [...],
      "metadata": {...}
    }
    // ... more transactions
  ]
}
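Once a block document has been read from MongoDB, it behaves like an ordinary Python dictionary. The following sketch, assuming the field names shown above, summarizes one block. The summarize_block helper and the sample values are made up for illustration.

```python
from datetime import datetime, timezone


def summarize_block(block):
    """Return a small summary of a block document (hypothetical helper)."""
    transactions = block.get("transactions", [])
    return {
        "block_id": block["id"],
        "created_at": block["createdAt"],
        "transaction_ids": [tx["id"] for tx in transactions],
        "transaction_count": len(transactions),
    }


# Sample document shaped like the structure above; all values are invented.
example_block = {
    "id": "block_identifier",
    "createdAt": datetime(2025, 4, 27, tzinfo=timezone.utc),
    "transactions": [
        {"id": "tx_1", "inputs": [], "outputs": [], "metadata": {}},
        {"id": "tx_2", "inputs": [], "outputs": [], "metadata": {}},
    ],
}

summary = summarize_block(example_block)
print(summary["transaction_ids"])  # prints ['tx_1', 'tx_2']
```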
Best Practices
Error Handling
Proper error handling is crucial for maintaining a reliable synchronization process. Wrap initialization in a try/except block and release resources in a finally clause so that the cache is closed whether or not an error occurs:
try:
    await cache.initialize()
except ConnectionError as e:
    print("Failed to connect:", e)
except Exception as e:
    print("Unexpected error:", e)
finally:
    await cache.close()
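If the first connection attempt can fail transiently, retrying before giving up may be more useful than reporting the error immediately. The following is a minimal sketch of retry with exponential backoff. The initialize_with_retry helper and its parameters are hypothetical, and it assumes that a failed connection surfaces as ConnectionError, as in the example above.

```python
import asyncio


async def initialize_with_retry(initialize, attempts=3, base_delay=1.0):
    """Await initialize(), retrying on ConnectionError with exponential backoff.

    `initialize` is any zero-argument coroutine function, for example
    `cache.initialize`. This helper is illustrative, not part of the library.
    """
    if attempts < 1:
        raise ValueError("attempts must be at least 1")

    for attempt in range(1, attempts + 1):
        try:
            return await initialize()
        except ConnectionError:
            if attempt == attempts:
                # Out of retries: let the caller's except/finally handle it.
                raise
            # Wait base_delay, then 2x, 4x, ... before the next attempt.
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))
```

Inside the try block above, await cache.initialize() would become await initialize_with_retry(cache.initialize); the finally clause still closes the cache.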
Advanced Usage
Custom Batch Processing
You can customize how batches of blocks are processed by extending the ResilientPythonCache class. This is useful when you need to:
- Apply custom transformations to the data
- Perform additional validation
- Add custom logging or monitoring
class CustomCache(ResilientPythonCache):
    async def process_batch(self, blocks):
        # Custom processing logic
        for block in blocks:
            # Process each block
            await self.process_single_block(block)
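As one example of the additional validation mentioned above, a custom batch step could separate well-formed blocks from malformed ones before storing them. This sketch assumes the block fields described under MongoDB Collection Structure. The split_valid_blocks helper is hypothetical and independent of the library, so it can be tested on plain dictionaries.

```python
REQUIRED_BLOCK_KEYS = ("id", "createdAt", "transactions")


def split_valid_blocks(blocks):
    """Split blocks into (valid, rejected) by checking for required keys.

    Each rejected entry is a (block, missing_keys) pair so the caller can
    log exactly what was wrong.
    """
    valid, rejected = [], []
    for block in blocks:
        missing = [key for key in REQUIRED_BLOCK_KEYS if key not in block]
        if missing:
            rejected.append((block, missing))
        else:
            valid.append(block)
    return valid, rejected


batch = [
    {"id": "b1", "createdAt": "2025-04-27", "transactions": []},
    {"id": "b2"},  # malformed: no createdAt or transactions
]
valid, rejected = split_valid_blocks(batch)
print(len(valid), len(rejected))  # prints 1 1
```

An overridden process_batch could call this helper first, log the rejected entries, and pass only the valid blocks on for processing.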
Query Examples
Here are some common MongoDB queries you might use to access your blockchain data. These examples show how to:
- Retrieve specific transactions by ID
- Get blocks within a time range
- Use MongoDB’s query operators effectively
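# Database and collection names, matching the MongoConfig used for the cache
db_name = "myDatabase"
collection_name = "myCollection"

# Get the block that contains a transaction with the given ID.
# mongo_client is assumed to be an async client (for example Motor's
# AsyncIOMotorClient). The query returns the whole block document, so the
# caller still has to pick the matching entry out of "transactions".
async def get_transaction_by_id(mongo_client, tx_id):
    result = await mongo_client[db_name][collection_name].find_one({
        "transactions.id": tx_id
    })
    return result

# Get blocks in a time range (inclusive on both ends)
async def get_blocks_by_time(mongo_client, start_time, end_time):
    results = await mongo_client[db_name][collection_name].find({
        "createdAt": {
            "$gte": start_time,
            "$lte": end_time
        }
    }).to_list(length=None)
    return results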
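The time-range filter can also be built separately from the database call, which makes it easy to validate and reuse. The sketch below assumes createdAt is stored as a date value, as the ISODate in the structure above suggests. Both helper names are hypothetical.

```python
from datetime import datetime, timedelta, timezone


def time_range_filter(start_time, end_time):
    """Build a MongoDB filter for blocks created between two datetimes."""
    if start_time > end_time:
        raise ValueError("start_time must not be later than end_time")
    return {"createdAt": {"$gte": start_time, "$lte": end_time}}


def last_hours_filter(hours, now=None):
    """Build a filter covering the last `hours` hours up to `now` (UTC by default)."""
    now = now or datetime.now(timezone.utc)
    return time_range_filter(now - timedelta(hours=hours), now)


# Example: blocks created in the 24 hours before a fixed reference time.
reference = datetime(2025, 4, 27, 12, 0, tzinfo=timezone.utc)
query = last_hours_filter(24, now=reference)
print(query["createdAt"]["$gte"].isoformat())  # prints 2025-04-26T12:00:00+00:00
```

The resulting dictionary can be passed directly to find(), in place of the inline filter used in get_blocks_by_time.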