UBEC DAO Protocol Suite - Technical Operator Onboarding Guide
Version: 1.0
Last Updated: November 2, 2025
Target Audience: Database Administrators, Blockchain Operators, Systems Engineers
Estimated Onboarding Time: 6-8 hours
Document Overview
This guide provides comprehensive technical onboarding for operators responsible for database management, blockchain operations, and system-level troubleshooting of the UBEC (Ubuntu Bioregional Economic Commons) DAO Protocol Suite. This documentation assumes advanced technical knowledge and focuses on operational procedures, architecture internals, and hands-on troubleshooting.
Attribution: This project uses the services of Claude and Anthropic PBC to inform our decisions and recommendations. This project was made possible with the assistance of Claude and Anthropic PBC.
Table of Contents
- Technical Architecture Deep Dive
- Database Operations
- Blockchain Integration
- Service Registry Internals
- Data Synchronization
- Performance Tuning
- Advanced Troubleshooting
- System Internals
- Development Operations
- Emergency Procedures
Technical Architecture Deep Dive
Design Principles
The UBEC Protocol Suite is built on 12 core design principles that govern all architectural decisions:
- Modular Design: Self-contained holonic components with clear boundaries
- Service Pattern: Only
main.pyexecutes; all modules are services - Service Registry: Centralized dependency injection with topological sorting
- Single Source of Truth: Database-backed configuration, no duplication
- Strict Async: 100% async/await, zero blocking operations
- No Sync Fallbacks: Forward-looking codebase only
- Per-Asset Monitoring: Individual tracking with execution minimums
- No Duplicate Configuration: Each parameter defined exactly once
- Integrated Rate Limiting: Built-in protection for all external APIs
- Clear Separation of Concerns: Layered architecture
- Comprehensive Documentation: Complete docstrings in all modules
- Method Singularity: Each method implemented once (zero code duplication)
Async-Only Architecture
Critical: The entire codebase operates on asyncio patterns. There are NO synchronous fallbacks.
# Service Registry Pattern - All services async
async with registry:
db = await registry.get('database')
result = await db.execute("SELECT * FROM accounts")
# Database Operations - Always async
async def fetch_accounts(self):
async with self.pool.acquire() as conn:
return await conn.fetch("SELECT * FROM stellar_accounts")
# Stellar Operations - Always async
async def get_account(self, account_id: str):
return await self.server.accounts().account_id(account_id).call()
Why This Matters: - No blocking I/O anywhere in the system - Concurrent operations by default - Efficient resource utilization - Predictable performance characteristics
Service Dependency Graph
The service registry uses topological sorting to resolve dependencies:
database (no dependencies)
├── config (requires: database)
│ ├── stellar_client (requires: config)
│ ├── air (requires: database, config)
│ ├── water (requires: database, config)
│ ├── earth (requires: database, config)
│ ├── fire (requires: database, config)
│ └── monitoring (requires: database, config)
│
├── synchronizer (requires: database, stellar_client)
├── analytics (requires: database)
├── distribution (requires: database)
├── distribution_evaluator (requires: database, distribution)
├── holonic_evaluator (requires: database)
├── visualizer (requires: database, analytics)
└── audit (requires: database)
Initialization Order: Services initialize in dependency order automatically. Manual initialization is never required.
Component Communication
Inter-Service Communication Pattern:
# Services communicate through registry only
# NO direct imports between service modules
# ✅ CORRECT: Via registry
synchronizer = await registry.get('synchronizer')
stellar = await registry.get('stellar_client')
result = await synchronizer.sync_account(account_id)
# ⌠INCORRECT: Direct import
from modules.synchronizer import UBECDataSynchronizer
sync = UBECDataSynchronizer() # NEVER DO THIS
Database as Message Bus:
The system uses the database as the authoritative communication layer: - No in-memory state required - Services can restart without state loss - Multi-instance deployment possible (with coordination) - Audit trail for all operations
Database Operations
Schema Architecture
Primary Schema: ubec_main
Analytics Schema: phenomenal (optional, for advanced metrics)
Table Categories:
| Category | Tables | Purpose |
|---|---|---|
| Blockchain Mirror | stellar_accounts, stellar_transactions, stellar_operations, stellar_ledgers | Direct Stellar blockchain data |
| Token Balances | ubec_balances, ubec_balance_history | Token holdings and changes |
| Protocol Data | ubec_air_metrics, ubec_water_flows, ubec_earth_distributions, ubec_fire_transformations | Element-specific tracking |
| Evaluation | ubec_holonic_metrics, ubec_holonic_history | Ubuntu principle assessments |
| Distribution | ubec_distributions, ubec_distribution_states, ubec_distribution_events | Tokenomics compliance |
| System | ubec_config, ubec_audit_log, ubec_system_state | Configuration and auditing |
Connection Pool Management
Pool Configuration:
# From .env
DB_POOL_MIN=5 # Minimum connections
DB_POOL_MAX=20 # Maximum connections
DB_POOL_TIMEOUT=30 # Connection timeout (seconds)
DB_COMMAND_TIMEOUT=60 # Query timeout (seconds)
Pool Monitoring:
# Check pool status
python main.py database --pool-status
# Output:
# Pool Status:
# • Size: 15 connections
# • Active: 12 connections
# • Idle: 3 connections
# • Max: 20 connections
# • Utilization: 60%
Managing Pool Exhaustion:
-- View active connections
SELECT
pid,
usename,
application_name,
client_addr,
state,
query_start,
state_change,
LEFT(query, 50) as query_preview
FROM pg_stat_activity
WHERE datname = 'ubec'
ORDER BY query_start DESC;
-- Terminate long-running queries (use with caution)
SELECT pg_terminate_backend(pid)
FROM pg_stat_activity
WHERE datname = 'ubec'
AND state = 'active'
AND query_start < NOW() - INTERVAL '10 minutes'
AND pid <> pg_backend_pid();
Database Maintenance
Vacuum Operations:
-- Standard vacuum (can run during operations)
VACUUM ANALYZE ubec_main.stellar_transactions;
-- Full vacuum (requires exclusive lock, schedule during maintenance window)
VACUUM FULL ANALYZE ubec_main.stellar_transactions;
-- Auto-vacuum configuration
ALTER TABLE ubec_main.stellar_transactions
SET (autovacuum_vacuum_scale_factor = 0.1);
-- Check vacuum statistics
SELECT
schemaname,
relname,
last_vacuum,
last_autovacuum,
last_analyze,
last_autoanalyze,
n_tup_ins,
n_tup_upd,
n_tup_del
FROM pg_stat_user_tables
WHERE schemaname = 'ubec_main'
ORDER BY n_tup_upd DESC;
Index Maintenance:
-- Find unused indexes
SELECT
schemaname,
tablename,
indexname,
idx_scan,
idx_tup_read,
idx_tup_fetch,
pg_size_pretty(pg_relation_size(indexrelid)) as index_size
FROM pg_stat_user_indexes
WHERE schemaname = 'ubec_main'
AND idx_scan = 0
ORDER BY pg_relation_size(indexrelid) DESC;
-- Rebuild bloated indexes
REINDEX INDEX CONCURRENTLY ubec_main.idx_transactions_account_id;
-- Check index usage
SELECT
schemaname,
tablename,
indexname,
idx_scan as scans,
pg_size_pretty(pg_relation_size(indexrelid)) as size
FROM pg_stat_user_indexes
WHERE schemaname = 'ubec_main'
ORDER BY idx_scan DESC
LIMIT 20;
Query Performance Analysis:
-- Enable query logging (postgresql.conf)
-- log_min_duration_statement = 1000 # Log queries > 1 second
-- Find slow queries
SELECT
userid::regrole,
dbid,
query,
calls,
total_time,
mean_time,
max_time,
stddev_time
FROM pg_stat_statements
WHERE dbid = (SELECT oid FROM pg_database WHERE datname = 'ubec')
ORDER BY mean_time DESC
LIMIT 20;
-- Analyze specific query
EXPLAIN (ANALYZE, BUFFERS, VERBOSE)
SELECT * FROM ubec_main.stellar_transactions
WHERE account_id = 'GABC...'
ORDER BY created_at DESC
LIMIT 100;
Data Integrity Checks
Foreign Key Validation:
-- Check orphaned records (transactions without accounts)
SELECT COUNT(*) as orphaned_transactions
FROM ubec_main.stellar_transactions t
LEFT JOIN ubec_main.stellar_accounts a ON t.account_id = a.account_id
WHERE a.account_id IS NULL;
-- Check balance consistency
SELECT
a.account_id,
a.xlm_balance as account_xlm,
COALESCE(SUM(b.balance), 0) as sum_balances
FROM ubec_main.stellar_accounts a
LEFT JOIN ubec_main.ubec_balances b ON a.account_id = b.account_id
GROUP BY a.account_id, a.xlm_balance
HAVING ABS(a.xlm_balance - COALESCE(SUM(b.balance), 0)) > 0.0001;
Data Verification:
-- Verify transaction count matches blockchain
SELECT
COUNT(*) as local_count,
MAX(created_at) as latest_transaction
FROM ubec_main.stellar_transactions;
-- Check for gaps in sequence numbers
SELECT
t1.sequence + 1 as missing_start,
MIN(t2.sequence) - 1 as missing_end
FROM ubec_main.stellar_transactions t1
LEFT JOIN ubec_main.stellar_transactions t2 ON t1.sequence + 1 = t2.sequence
WHERE t2.sequence IS NULL
GROUP BY t1.sequence
HAVING MIN(t2.sequence) IS NOT NULL
ORDER BY missing_start;
Backup and Recovery Operations
Hot Backup (No Downtime):
#!/bin/bash
# Continuous archiving with WAL shipping
# postgresql.conf settings:
# wal_level = replica
# archive_mode = on
# archive_command = 'cp %p /var/backups/ubec/wal/%f'
# Perform base backup
pg_basebackup -D /var/backups/ubec/base -F tar -z -P -U ubec_admin
# Backup script with rotation
BACKUP_DIR=/var/backups/ubec
TIMESTAMP=$(date +%Y%m%d-%H%M%S)
pg_dump -U ubec_admin -F c -b -v -f $BACKUP_DIR/ubec-${TIMESTAMP}.dump ubec
gzip $BACKUP_DIR/ubec-${TIMESTAMP}.dump
# Verify backup
pg_restore --list $BACKUP_DIR/ubec-${TIMESTAMP}.dump.gz | head -20
Point-in-Time Recovery (PITR):
# Restore base backup
cd /var/lib/postgresql/15/main
rm -rf *
tar -xzf /var/backups/ubec/base/base.tar.gz
# Create recovery.conf (PostgreSQL 12+: postgresql.auto.conf)
cat > recovery.signal << EOF
restore_command = 'cp /var/backups/ubec/wal/%f %p'
recovery_target_time = '2025-11-02 10:30:00 UTC'
EOF
# Start PostgreSQL (will perform recovery)
sudo systemctl start postgresql
# Verify recovery
psql -U ubec_admin -d ubec -c "SELECT NOW();"
Incremental Backup with pg_dump:
# Schema-only backup (fast, small)
pg_dump -U ubec_admin -s -f ubec-schema-$(date +%Y%m%d).sql ubec
# Data-only backup by table
pg_dump -U ubec_admin -t ubec_main.stellar_transactions -a -f transactions.dump ubec
# Parallel dump (faster for large databases)
pg_dump -U ubec_admin -F d -j 4 -f ubec-parallel ubec
Blockchain Integration
Stellar Network Architecture
Horizon API Endpoints:
| Network | Horizon URL | Purpose |
|---|---|---|
| Mainnet | https://horizon.stellar.org | Production operations |
| Testnet | https://horizon-testnet.stellar.org | Testing |
API Rate Limits: - Public Horizon: 3,600 requests/hour (72 requests/minute) - Custom Horizon: Configurable (requires infrastructure)
Rate Limiting Implementation
Circuit Breaker Pattern:
class StellarClientService:
def __init__(self):
self._rate_limiter = RateLimiter(
max_requests=3000,
window_seconds=3600
)
self._circuit_breaker = CircuitBreaker(
failure_threshold=5,
recovery_timeout=60
)
async def _make_request(self, operation):
# Wait for rate limit clearance
await self._rate_limiter.acquire()
try:
# Check circuit breaker
if self._circuit_breaker.is_open():
raise ServiceUnavailableError("Circuit breaker open")
# Make request
result = await operation()
# Record success
self._circuit_breaker.record_success()
return result
except Exception as e:
# Record failure
self._circuit_breaker.record_failure()
raise
Rate Limit Monitoring:
# Check current rate limit status
python main.py stellar --rate-limit-status
# Output:
# Stellar Horizon Rate Limit Status:
# • Current: 2,456 / 3,000 requests used
# • Remaining: 544 requests
# • Window: 3,600 seconds
# • Resets at: 2025-11-02 11:00:00 UTC
# • Utilization: 81.9%
# • Estimated time to exhaustion: 12 minutes
# View rate limit history
python main.py stellar --rate-limit-history --hours 24
Handling Rate Limit Exhaustion:
# Automatic backoff strategy (built into client)
async def fetch_with_backoff(self, operation, max_retries=3):
"""Exponential backoff with jitter for rate limit handling."""
for attempt in range(max_retries):
try:
return await operation()
except RateLimitExceeded as e:
if attempt == max_retries - 1:
raise
# Exponential backoff: 2^attempt * base_delay + jitter
base_delay = 60 # 1 minute
max_delay = 900 # 15 minutes
jitter = random.uniform(0, 30)
delay = min(
(2 ** attempt) * base_delay + jitter,
max_delay
)
logger.warning(
f"Rate limit exceeded, retrying in {delay:.0f}s "
f"(attempt {attempt + 1}/{max_retries})"
)
await asyncio.sleep(delay)
Transaction Processing
Transaction States:
-- Transaction lifecycle states
CREATE TYPE transaction_state AS ENUM (
'pending', -- Submitted to blockchain
'confirmed', -- Included in ledger
'failed', -- Transaction failed
'timeout' -- No confirmation received
);
-- Monitor transaction states
SELECT
state,
COUNT(*) as count,
AVG(EXTRACT(EPOCH FROM (confirmed_at - created_at))) as avg_confirm_seconds
FROM ubec_main.stellar_transactions
WHERE created_at > NOW() - INTERVAL '24 hours'
GROUP BY state;
Transaction Monitoring:
# Monitor pending transactions
python main.py transactions --pending --watch
# Check transaction status
python main.py transactions --hash <transaction_hash>
# Retry failed transactions
python main.py transactions --retry-failed --since "1 hour ago"
Streaming Blockchain Events:
# Built-in streaming capability
async def stream_operations(self, account_id: str, cursor: str = 'now'):
"""Stream operations for an account in real-time."""
async for operation in self.server.operations() \
.for_account(account_id) \
.cursor(cursor) \
.stream():
# Process operation
await self.process_operation(operation)
# Update cursor for resume capability
await self.db.execute(
"UPDATE ubec_main.sync_state SET last_cursor = $1",
operation['paging_token']
)
Account Management
Account Discovery:
# Discover accounts by asset
python main.py accounts --discover --asset UBEC
# Output: List of all accounts holding UBEC token
# Sync discovered accounts
python main.py sync --accounts <discovered-list>
Account Health Monitoring:
-- Check account health
SELECT
a.account_id,
a.xlm_balance,
a.trustline_count,
COUNT(DISTINCT b.token_code) as token_count,
SUM(b.balance) as total_balance,
MAX(t.created_at) as last_transaction
FROM ubec_main.stellar_accounts a
LEFT JOIN ubec_main.ubec_balances b ON a.account_id = b.account_id
LEFT JOIN ubec_main.stellar_transactions t ON a.account_id = t.account_id
GROUP BY a.account_id, a.xlm_balance, a.trustline_count
HAVING a.xlm_balance < 1.5 -- Minimum reserve warning
ORDER BY a.xlm_balance ASC;
Trustline Management:
# Check trustline status
async def check_trustlines(self, account_id: str) -> Dict[str, bool]:
"""Verify account has trustlines for all UBEC tokens."""
account = await self.stellar.get_account(account_id)
required_assets = ['UBEC', 'UBECrc', 'UBECgpi', 'UBECtt']
trustlines = {}
for balance in account['balances']:
if balance['asset_type'] != 'native':
asset_code = balance['asset_code']
if asset_code in required_assets:
trustlines[asset_code] = True
return {asset: trustlines.get(asset, False) for asset in required_assets}
Service Registry Internals
Service Lifecycle Management
Service States:
class ServiceState(Enum):
UNINITIALIZED = "uninitialized"
INITIALIZING = "initializing"
READY = "ready"
ERROR = "error"
SHUTDOWN = "shutdown"
Initialization Flow:
async def get(self, service_name: str):
"""Get service instance, initializing if necessary."""
# Check if already initialized
if service_name in self._services:
return self._services[service_name]
# Check if service is registered
if service_name not in self._factories:
raise ServiceNotFoundError(f"Service '{service_name}' not registered")
# Mark as initializing
self._states[service_name] = ServiceState.INITIALIZING
try:
# Get factory and dependencies
factory = self._factories[service_name]
deps = self._dependencies.get(service_name, [])
# Initialize dependencies first (recursive)
dep_services = {}
for dep in deps:
dep_services[dep] = await self.get(dep)
# Create service instance
service = await factory(**dep_services)
# Store and mark ready
self._services[service_name] = service
self._states[service_name] = ServiceState.READY
return service
except Exception as e:
self._states[service_name] = ServiceState.ERROR
raise ServiceInitializationError(f"Failed to initialize '{service_name}': {e}")
Health Check Patterns
Three Health Check Patterns:
- Database-Dependent Services:
async def health_check(self):
return await ServiceHealthCheck.database_dependent_health(
service_name='synchronizer',
db_manager=self.db,
is_initialized=self._initialized,
operation_count=self._ops_count,
error_count=self._error_count
)
- API-Dependent Services:
async def health_check(self):
return await ServiceHealthCheck.api_dependent_health(
service_name='stellar_client',
is_initialized=self._initialized,
last_request_time=self._last_request,
rate_limiter=self._rate_limiter
)
- Config-Only Services:
def health_check(self): # Note: Synchronous
return ServiceHealthCheck.sync_basic_health_check(
service_name='config',
is_initialized=True,
settings_count=len(self._settings)
)
Health Check Aggregation:
# System-wide health check
async def health_check(self, detailed=False):
"""Aggregate health from all services."""
health_results = {}
# Collect health from all initialized services
for service_name, service in self._services.items():
try:
# Attempt async health check first
if hasattr(service, 'health_check'):
health_method = getattr(service, 'health_check')
if asyncio.iscoroutinefunction(health_method):
# Async health check with timeout
health = await asyncio.wait_for(
health_method(),
timeout=5.0
)
else:
# Sync health check (rare, config-only services)
health = health_method()
health_results[service_name] = health
except asyncio.TimeoutError:
health_results[service_name] = {
'status': 'unhealthy',
'message': 'Health check timed out'
}
except Exception as e:
health_results[service_name] = {
'status': 'unhealthy',
'message': str(e)
}
# Calculate overall status
statuses = [h.get('status', 'unknown') for h in health_results.values()]
if all(s == 'healthy' for s in statuses):
overall = 'healthy'
elif any(s == 'unhealthy' for s in statuses):
overall = 'unhealthy'
else:
overall = 'degraded'
return {
'overall_status': overall,
'services': health_results,
'summary': {
'total': len(health_results),
'healthy': sum(1 for s in statuses if s == 'healthy'),
'unhealthy': sum(1 for s in statuses if s == 'unhealthy'),
'degraded': sum(1 for s in statuses if s == 'degraded')
}
}
Dependency Resolution
Topological Sorting:
def _resolve_dependencies(self):
"""Resolve service initialization order using topological sort."""
# Build adjacency list
graph = defaultdict(list)
in_degree = defaultdict(int)
for service, deps in self._dependencies.items():
for dep in deps:
graph[dep].append(service)
in_degree[service] += 1
# Kahn's algorithm for topological sort
queue = deque([s for s in self._factories if in_degree[s] == 0])
order = []
while queue:
service = queue.popleft()
order.append(service)
for dependent in graph[service]:
in_degree[dependent] -= 1
if in_degree[dependent] == 0:
queue.append(dependent)
# Check for circular dependencies
if len(order) != len(self._factories):
raise CircularDependencyError("Circular dependency detected")
return order
Data Synchronization
Synchronization Architecture
Three Synchronization Modes:
- Incremental Sync: Updates since last sync (default)
- Full Sync: Complete resync from genesis
- Targeted Sync: Specific accounts or time ranges
Sync State Management:
-- Sync state tracking
CREATE TABLE ubec_main.sync_state (
id SERIAL PRIMARY KEY,
account_id TEXT,
last_sync_ledger INTEGER,
last_sync_cursor TEXT,
last_sync_time TIMESTAMPTZ,
sync_status TEXT CHECK (sync_status IN ('synced', 'syncing', 'error')),
error_count INTEGER DEFAULT 0,
last_error TEXT,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- Check sync status
SELECT
account_id,
last_sync_time,
sync_status,
AGE(NOW(), last_sync_time) as sync_age,
error_count
FROM ubec_main.sync_state
WHERE sync_status != 'synced'
OR last_sync_time < NOW() - INTERVAL '1 hour'
ORDER BY last_sync_time ASC;
Synchronization Procedures
Incremental Sync:
async def sync_incremental(self, account_id: str):
"""Perform incremental sync from last known cursor."""
# Get last cursor
cursor = await self.db.fetchval(
"SELECT last_sync_cursor FROM ubec_main.sync_state WHERE account_id = $1",
account_id
)
# Default to 'now' if no cursor (first sync)
cursor = cursor or 'now'
# Fetch operations since cursor
operations = await self.stellar.operations() \
.for_account(account_id) \
.cursor(cursor) \
.order('asc') \
.limit(200) \
.call()
# Process each operation
for op in operations['_embedded']['records']:
await self.process_operation(op)
# Update cursor after each operation
await self.db.execute(
"UPDATE ubec_main.sync_state SET last_sync_cursor = $1 WHERE account_id = $2",
op['paging_token'],
account_id
)
# Mark sync complete
await self.db.execute(
"""
UPDATE ubec_main.sync_state
SET last_sync_time = NOW(),
sync_status = 'synced',
error_count = 0
WHERE account_id = $1
""",
account_id
)
Full Sync Strategy:
async def sync_full(self, account_id: str):
"""Perform complete resync from account creation."""
logger.info(f"Starting full sync for {account_id}")
# Clear existing data (with transaction)
async with self.db.transaction():
await self.db.execute(
"DELETE FROM ubec_main.stellar_operations WHERE account_id = $1",
account_id
)
await self.db.execute(
"DELETE FROM ubec_main.stellar_transactions WHERE account_id = $1",
account_id
)
await self.db.execute(
"DELETE FROM ubec_main.ubec_balances WHERE account_id = $1",
account_id
)
# Sync from beginning
cursor = '0' # Start from genesis
total_ops = 0
while True:
# Fetch batch of operations
operations = await self.stellar.operations() \
.for_account(account_id) \
.cursor(cursor) \
.order('asc') \
.limit(200) \
.call()
records = operations['_embedded']['records']
if not records:
break
# Process batch
for op in records:
await self.process_operation(op)
cursor = op['paging_token']
total_ops += 1
# Log progress
if total_ops % 1000 == 0:
logger.info(f"Synced {total_ops} operations for {account_id}")
# Rate limiting pause
await asyncio.sleep(0.1)
logger.info(f"Full sync complete: {total_ops} operations for {account_id}")
Handling Sync Failures
Error Recovery:
async def sync_with_retry(self, account_id: str, max_retries=3):
"""Sync with automatic retry and exponential backoff."""
for attempt in range(max_retries):
try:
# Mark as syncing
await self.db.execute(
"UPDATE ubec_main.sync_state SET sync_status = 'syncing' WHERE account_id = $1",
account_id
)
# Perform sync
await self.sync_incremental(account_id)
return # Success
except Exception as e:
logger.error(f"Sync failed for {account_id} (attempt {attempt + 1}): {e}")
# Update error count
await self.db.execute(
"""
UPDATE ubec_main.sync_state
SET sync_status = 'error',
error_count = error_count + 1,
last_error = $1
WHERE account_id = $2
""",
str(e),
account_id
)
if attempt < max_retries - 1:
# Exponential backoff
delay = (2 ** attempt) * 10
logger.info(f"Retrying in {delay}s...")
await asyncio.sleep(delay)
else:
raise
Monitoring Sync Performance:
-- Sync performance metrics
SELECT
DATE_TRUNC('hour', last_sync_time) as sync_hour,
COUNT(*) as accounts_synced,
AVG(EXTRACT(EPOCH FROM (updated_at - created_at))) as avg_sync_duration,
SUM(CASE WHEN sync_status = 'error' THEN 1 ELSE 0 END) as error_count
FROM ubec_main.sync_state
WHERE last_sync_time > NOW() - INTERVAL '24 hours'
GROUP BY sync_hour
ORDER BY sync_hour DESC;
Performance Tuning
Database Optimization
Connection Pool Tuning:
# Optimal pool size calculation
# Formula: (2 * CPU_CORES) + effective_spindle_count
# For 8-core CPU with SSD:
DB_POOL_MIN = 5
DB_POOL_MAX = 20 # (2 * 8) + 4
# Monitor pool utilization
async def monitor_pool():
pool_size = len(pool._holders)
pool_free = pool.get_size() - pool.get_idle_size()
utilization = (pool_free / pool_size) * 100
if utilization > 80:
logger.warning(f"Pool utilization high: {utilization:.1f}%")
Query Optimization:
-- Add appropriate indexes
CREATE INDEX CONCURRENTLY idx_transactions_account_created
ON ubec_main.stellar_transactions (account_id, created_at DESC);
CREATE INDEX CONCURRENTLY idx_operations_type_created
ON ubec_main.stellar_operations (operation_type, created_at DESC);
-- Partial indexes for common queries
CREATE INDEX CONCURRENTLY idx_active_accounts
ON ubec_main.stellar_accounts (account_id)
WHERE last_modified > NOW() - INTERVAL '30 days';
-- Expression indexes
CREATE INDEX CONCURRENTLY idx_balances_significant
ON ubec_main.ubec_balances (account_id, token_code)
WHERE balance > 0.01;
Configuration Tuning:
# postgresql.conf optimizations for UBEC workload
# Memory settings (for 16GB RAM server)
shared_buffers = 4GB # 25% of RAM
effective_cache_size = 12GB # 75% of RAM
maintenance_work_mem = 1GB # For vacuum/index operations
work_mem = 64MB # Per query operation
# Checkpoint settings
checkpoint_completion_target = 0.9 # Spread out checkpoint writes
wal_buffers = 16MB
max_wal_size = 4GB
min_wal_size = 1GB
# Query planning
random_page_cost = 1.1 # For SSD
effective_io_concurrency = 200 # SSD concurrency
# Connections
max_connections = 100 # Application pool + admin
# Logging
log_min_duration_statement = 1000 # Log slow queries > 1s
log_line_prefix = '%t [%p]: [%l-1] user=%u,db=%d,app=%a,client=%h '
Application Performance
Async Batch Operations:
async def batch_sync_accounts(self, account_ids: List[str], concurrency=5):
"""Sync multiple accounts concurrently with controlled parallelism."""
semaphore = asyncio.Semaphore(concurrency)
async def sync_with_semaphore(account_id):
async with semaphore:
return await self.sync_incremental(account_id)
# Execute in parallel batches
tasks = [sync_with_semaphore(aid) for aid in account_ids]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Check for errors
errors = [r for r in results if isinstance(r, Exception)]
if errors:
logger.error(f"Batch sync had {len(errors)} errors")
return results
Database Query Batching:
async def fetch_balances_batch(self, account_ids: List[str]):
"""Fetch balances for multiple accounts in single query."""
# Single query instead of N queries
query = """
SELECT account_id, token_code, balance, last_modified
FROM ubec_main.ubec_balances
WHERE account_id = ANY($1)
"""
rows = await self.db.fetch(query, account_ids)
# Group by account
balances = defaultdict(list)
for row in rows:
balances[row['account_id']].append(dict(row))
return balances
Caching Strategy:
from functools import lru_cache
from datetime import datetime, timedelta
class CachedDataService:
def __init__(self):
self._cache = {}
self._cache_ttl = 300 # 5 minutes
async def get_token_metrics(self, token_code: str):
"""Get token metrics with caching."""
cache_key = f"metrics:{token_code}"
# Check cache
if cache_key in self._cache:
cached_data, cached_time = self._cache[cache_key]
if datetime.now() - cached_time < timedelta(seconds=self._cache_ttl):
return cached_data
# Cache miss - fetch from database
metrics = await self._fetch_metrics_from_db(token_code)
# Store in cache
self._cache[cache_key] = (metrics, datetime.now())
return metrics
Monitoring Performance
Application Metrics:
import time
from contextlib import asynccontextmanager
class PerformanceMonitor:
def __init__(self):
self._metrics = defaultdict(list)
@asynccontextmanager
async def measure(self, operation_name: str):
"""Context manager for timing operations."""
start = time.perf_counter()
try:
yield
finally:
duration = time.perf_counter() - start
self._metrics[operation_name].append(duration)
# Log slow operations
if duration > 1.0:
logger.warning(f"Slow operation: {operation_name} took {duration:.2f}s")
def get_stats(self, operation_name: str):
"""Get performance statistics."""
timings = self._metrics[operation_name]
if not timings:
return None
return {
'count': len(timings),
'min': min(timings),
'max': max(timings),
'avg': sum(timings) / len(timings),
'p95': sorted(timings)[int(len(timings) * 0.95)],
'p99': sorted(timings)[int(len(timings) * 0.99)]
}
# Usage
monitor = PerformanceMonitor()
async with monitor.measure('sync_account'):
await sync_account(account_id)
# Get stats
stats = monitor.get_stats('sync_account')
print(f"Average sync time: {stats['avg']:.3f}s, p95: {stats['p95']:.3f}s")
Advanced Troubleshooting
Diagnostic Tools
Comprehensive System Diagnostic:
# Run full diagnostic
python main.py diagnostic --comprehensive --output /tmp/diagnostic/
# Output includes:
# - System health snapshot
# - Database statistics
# - Blockchain connectivity
# - Service states
# - Recent errors
# - Performance metrics
Database Diagnostics:
-- Long-running queries
SELECT
pid,
now() - query_start as duration,
state,
query
FROM pg_stat_activity
WHERE state != 'idle'
AND query_start < now() - interval '30 seconds'
ORDER BY duration DESC;
-- Blocking queries
SELECT
blocked_locks.pid AS blocked_pid,
blocked_activity.usename AS blocked_user,
blocking_locks.pid AS blocking_pid,
blocking_activity.usename AS blocking_user,
blocked_activity.query AS blocked_statement,
blocking_activity.query AS blocking_statement
FROM pg_catalog.pg_locks blocked_locks
JOIN pg_catalog.pg_stat_activity blocked_activity ON blocked_activity.pid = blocked_locks.pid
JOIN pg_catalog.pg_locks blocking_locks ON blocking_locks.locktype = blocked_locks.locktype
JOIN pg_catalog.pg_stat_activity blocking_activity ON blocking_activity.pid = blocking_locks.pid
WHERE NOT blocked_locks.granted
AND blocking_locks.granted;
-- Table bloat
SELECT
schemaname,
tablename,
pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename)) AS size,
pg_size_pretty(pg_total_relation_size(schemaname||'.'||tablename) - pg_relation_size(schemaname||'.'||tablename)) AS external_size
FROM pg_tables
WHERE schemaname = 'ubec_main'
ORDER BY pg_total_relation_size(schemaname||'.'||tablename) DESC
LIMIT 20;
Network Troubleshooting
Stellar Horizon Connectivity:
# Test Horizon connectivity
curl -v https://horizon.stellar.org/
# Check Horizon health
curl https://horizon.stellar.org/ | jq '.horizon_version, .core_version'
# Test account endpoint
curl https://horizon.stellar.org/accounts/<ACCOUNT_ID> | jq '.balances'
# Stream operations (test streaming)
curl -N https://horizon.stellar.org/operations?cursor=now
Network Latency Analysis:
# Measure Horizon API latency
for i in {1..10}; do
curl -w "@curl-format.txt" -o /dev/null -s https://horizon.stellar.org/
done
# curl-format.txt:
# time_namelookup: %{time_namelookup}\n
# time_connect: %{time_connect}\n
# time_starttransfer: %{time_starttransfer}\n
# time_total: %{time_total}\n
# Database latency
psql -U ubec_admin -d ubec -c "\timing on" -c "SELECT COUNT(*) FROM ubec_main.stellar_accounts;"
Error Analysis
Log Aggregation:
# Extract and categorize errors
grep ERROR /var/log/ubec/application.log | \
awk '{print $5}' | \
sort | uniq -c | sort -rn
# Top error messages
grep ERROR /var/log/ubec/application.log | \
cut -d' ' -f6- | \
sort | uniq -c | sort -rn | head -20
# Error timeline
grep ERROR /var/log/ubec/application.log | \
awk '{print $1}' | \
cut -d'T' -f1 | \
sort | uniq -c
# Service-specific errors
grep ERROR /var/log/ubec/application.log | grep "service=synchronizer"
Database Error Tracking:
-- Application errors from audit log
SELECT
DATE_TRUNC('hour', timestamp) as error_hour,
action,
COUNT(*) as error_count,
array_agg(DISTINCT error_message) as error_types
FROM ubec_main.ubec_audit_log
WHERE action LIKE '%error%'
AND timestamp > NOW() - INTERVAL '24 hours'
GROUP BY error_hour, action
ORDER BY error_hour DESC, error_count DESC;
-- Failed transactions
SELECT
DATE_TRUNC('hour', created_at) as failure_hour,
COUNT(*) as failed_count,
array_agg(DISTINCT error_message) as error_types
FROM ubec_main.stellar_transactions
WHERE state = 'failed'
AND created_at > NOW() - INTERVAL '24 hours'
GROUP BY failure_hour
ORDER BY failure_hour DESC;
Resolving Common Issues
Issue: Database Deadlocks
-- Identify deadlock patterns
SELECT
query,
COUNT(*) as occurrence_count
FROM pg_stat_activity
WHERE wait_event_type = 'Lock'
GROUP BY query
ORDER BY occurrence_count DESC;
-- Solution: Ensure consistent lock ordering
-- BAD: Different order in different transactions
BEGIN;
UPDATE ubec_main.stellar_accounts SET ... WHERE account_id = 'A';
UPDATE ubec_main.ubec_balances SET ... WHERE account_id = 'B';
COMMIT;
-- GOOD: Same order in all transactions
BEGIN;
UPDATE ubec_main.stellar_accounts SET ... WHERE account_id IN ('A', 'B') ORDER BY account_id;
UPDATE ubec_main.ubec_balances SET ... WHERE account_id IN ('A', 'B') ORDER BY account_id;
COMMIT;
Issue: Memory Leaks
# Monitor memory usage
import tracemalloc
import psutil
def monitor_memory():
process = psutil.Process()
memory_info = process.memory_info()
print(f"RSS: {memory_info.rss / 1024 / 1024:.2f} MB")
print(f"VMS: {memory_info.vms / 1024 / 1024:.2f} MB")
# Tracemalloc for Python memory
snapshot = tracemalloc.take_snapshot()
top_stats = snapshot.statistics('lineno')
for stat in top_stats[:10]:
print(stat)
# Check for circular references
import gc
gc.collect()
print(f"Unreachable objects: {gc.collect()}")
# Common causes in async code:
# 1. Unclosed connections
# 2. Tasks not properly cancelled
# 3. Circular references in callbacks
# Solution: Use context managers and proper cleanup
async with registry:
# Services auto-cleanup
pass
System Internals
File Structure
ubec-protocol/
├── main.py # SOLE ENTRY POINT
├── core/
│ ├── service_registry.py # Service management
│ ├── db/
│ │ ├── database_manager.py
│ │ ├── config_manager.py
│ │ └── ubec_data_synchronizer.py
│ ├── stellar/
│ │ ├── stellar_client.py
│ │ └── rate_limiter.py
│ ├── protocols/
│ │ ├── UBEC_protocol.py # Air
│ │ ├── UBECrc_protocol.py # Water
│ │ ├── UBECgpi_protocol.py # Earth
│ │ └── UBECtt_protocol.py # Fire
│ ├── analytics/
│ │ ├── analytics_service.py
│ │ ├── holonic_evaluator.py
│ │ └── distribution_manager.py
│ └── utils/
│ ├── service_health.py
│ └── logging_config.py
├── database/
│ └── schema/
│ ├── ubec_main_schema.sql
│ └── phenomenal_schema.sql
├── .env # Configuration
└── requirements.txt
Configuration Management
Database-Backed Configuration:
-- Configuration table
CREATE TABLE ubec_main.ubec_config (
id SERIAL PRIMARY KEY,
config_key TEXT UNIQUE NOT NULL,
config_value TEXT NOT NULL,
config_type TEXT NOT NULL, -- 'string', 'integer', 'boolean', 'json'
description TEXT,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
-- Example configurations
INSERT INTO ubec_main.ubec_config (config_key, config_value, config_type, description)
VALUES
('sync.batch_size', '200', 'integer', 'Number of operations per sync batch'),
('stellar.rate_limit', '3000', 'integer', 'Stellar API rate limit per hour'),
('evaluation.threshold', '0.5', 'float', 'Minimum holonic score threshold');
-- Retrieve configuration
SELECT config_value::integer
FROM ubec_main.ubec_config
WHERE config_key = 'sync.batch_size';
Configuration Service:
class ConfigManager:
"""Database-backed configuration manager (Principle #4: Single Source of Truth)."""
async def get(self, key: str, default=None):
"""Get configuration value."""
row = await self.db.fetchrow(
"SELECT config_value, config_type FROM ubec_main.ubec_config WHERE config_key = $1",
key
)
if not row:
return default
# Type conversion
value = row['config_value']
config_type = row['config_type']
if config_type == 'integer':
return int(value)
elif config_type == 'float':
return float(value)
elif config_type == 'boolean':
return value.lower() in ('true', '1', 'yes')
elif config_type == 'json':
return json.loads(value)
else:
return value
async def set(self, key: str, value, config_type: str, description: str = None):
"""Set configuration value."""
await self.db.execute(
"""
INSERT INTO ubec_main.ubec_config (config_key, config_value, config_type, description)
VALUES ($1, $2, $3, $4)
ON CONFLICT (config_key)
DO UPDATE SET
config_value = EXCLUDED.config_value,
updated_at = NOW()
""",
key,
str(value),
config_type,
description
)
Logging Architecture
Structured Logging:
import logging
import json
from datetime import datetime
class StructuredLogger:
"""Structured logging for operational analysis."""
def __init__(self, name: str):
self.logger = logging.getLogger(name)
def log(self, level: str, message: str, **kwargs):
"""Log structured message."""
log_entry = {
'timestamp': datetime.utcnow().isoformat(),
'level': level,
'service': self.logger.name,
'message': message,
**kwargs
}
# Log as JSON
self.logger.log(
getattr(logging, level.upper()),
json.dumps(log_entry)
)
def operation(self, operation: str, duration: float, success: bool, **kwargs):
"""Log operation completion."""
self.log(
'INFO' if success else 'ERROR',
f"Operation completed: {operation}",
operation=operation,
duration_ms=duration * 1000,
success=success,
**kwargs
)
# Usage
logger = StructuredLogger('synchronizer')
logger.operation(
operation='sync_account',
duration=1.23,
success=True,
account_id='GABC...',
operations_synced=150
)
Log Analysis:
# Parse JSON logs
cat /var/log/ubec/application.log | \
jq -r 'select(.level == "ERROR") | "\(.timestamp) \(.service) \(.message)"'
# Operation performance
cat /var/log/ubec/application.log | \
jq -r 'select(.operation) | [.operation, .duration_ms, .success] | @csv'
# Service health
cat /var/log/ubec/application.log | \
jq -r 'select(.service) | .service' | \
sort | uniq -c | sort -rn
Development Operations
Testing Framework
Unit Tests:
# tests/test_synchronizer.py
import pytest
from unittest.mock import AsyncMock, MagicMock
@pytest.mark.asyncio
async def test_sync_account():
"""Test account synchronization."""
# Mock dependencies
mock_db = AsyncMock()
mock_stellar = AsyncMock()
# Create synchronizer
sync = UBECDataSynchronizer(db=mock_db, stellar=mock_stellar)
# Mock responses
mock_stellar.operations.return_value.for_account.return_value.cursor.return_value.call = AsyncMock(
return_value={'_embedded': {'records': []}}
)
# Execute test
await sync.sync_incremental('GABC...')
# Verify
assert mock_stellar.operations.called
assert mock_db.execute.called
Integration Tests:
@pytest.mark.integration
@pytest.mark.asyncio
async def test_database_sync_integration():
"""Test full database synchronization workflow."""
# Use test database
db = await DatabaseManager.create(
host='localhost',
database='ubec_test',
user='ubec_test'
)
try:
# Initialize services
async with ServiceRegistry() as registry:
registry.register_instance('database', db)
sync = await registry.get('synchronizer')
# Perform sync
result = await sync.sync_incremental('GABC...')
# Verify data in database
count = await db.fetchval(
"SELECT COUNT(*) FROM ubec_main.stellar_operations WHERE account_id = $1",
'GABC...'
)
assert count > 0
finally:
await db.close()
Running Tests:
# Run all tests
pytest tests/
# Run specific test file
pytest tests/test_synchronizer.py
# Run with coverage
pytest --cov=core tests/
# Run integration tests only
pytest -m integration tests/
# Run with verbose output
pytest -v tests/
Code Quality Tools
Linting:
# Flake8
flake8 core/ --max-line-length=100
# PyLint
pylint core/ --disable=C0111,R0903
# Black (code formatting)
black core/ --line-length=100
# isort (import sorting)
isort core/ --profile black
Type Checking:
# MyPy
mypy core/ --strict
# Type hints example
from typing import List, Dict, Optional
import asyncpg
async def fetch_accounts(
db: asyncpg.Pool,
account_ids: List[str]
) -> Dict[str, Dict[str, any]]:
"""Fetch multiple accounts.
Args:
db: Database connection pool
account_ids: List of account IDs to fetch
Returns:
Dictionary mapping account_id to account data
"""
# Implementation
pass
Deployment Procedures
Pre-Deployment Checklist:
# 1. Run tests
pytest tests/ --cov=core
# 2. Check code quality
flake8 core/
pylint core/
# 3. Verify database migrations
psql -U ubec_admin -d ubec -f database/migrations/latest.sql --dry-run
# 4. Backup current deployment
tar -czf ubec-backup-$(date +%Y%m%d-%H%M%S).tar.gz /opt/ubec-protocol/
# 5. Backup database
pg_dump -U ubec_admin -F c ubec > ubec-backup-$(date +%Y%m%d-%H%M%S).dump
Deployment Process:
# 1. Pull latest code
cd /opt/ubec-protocol
git pull origin main
# 2. Update dependencies
source venv/bin/activate
pip install -r requirements.txt --upgrade
# 3. Run database migrations
psql -U ubec_admin -d ubec -f database/migrations/latest.sql
# 4. Restart services
sudo systemctl restart ubec-monitor.service
# 5. Verify health
python main.py health --detailed
# 6. Monitor logs
tail -f /var/log/ubec/application.log
Rollback Procedure:
# 1. Stop services
sudo systemctl stop ubec-monitor.service
# 2. Restore code
cd /opt/ubec-protocol
git checkout <previous-commit>
# 3. Restore dependencies
pip install -r requirements.txt
# 4. Restore database (if necessary)
pg_restore -U ubec_admin -d ubec -c ubec-backup-<timestamp>.dump
# 5. Restart services
sudo systemctl start ubec-monitor.service
# 6. Verify
python main.py health --detailed
Emergency Procedures
Critical Failure Response
Incident Response Flowchart:
1. DETECT: Alert triggered or user report
↓
2. ASSESS: Determine severity (P0-P3)
↓
3. NOTIFY: Alert on-call engineer
↓
4. INVESTIGATE: Gather diagnostics
↓
5. MITIGATE: Implement temporary fix
↓
6. RESOLVE: Deploy permanent solution
↓
7. DOCUMENT: Post-mortem analysis
Severity Levels:
| Priority | Description | Response Time | Example |
|---|---|---|---|
| P0 | System down | 15 minutes | Database unreachable |
| P1 | Critical degradation | 1 hour | Sync lag >4 hours |
| P2 | Partial degradation | 4 hours | Single service failing |
| P3 | Minor issue | 1 day | Slow query performance |
Emergency Commands
Database Emergency:
# Check if PostgreSQL is running
sudo systemctl status postgresql
# Force restart PostgreSQL
sudo systemctl restart postgresql
# Check connections
psql -U postgres -c "SELECT * FROM pg_stat_activity WHERE datname = 'ubec';"
# Kill all connections (emergency only)
psql -U postgres -c "SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname = 'ubec' AND pid <> pg_backend_pid();"
# Start emergency read-only mode
psql -U postgres -c "ALTER DATABASE ubec SET default_transaction_read_only = on;"
Application Emergency:
# Force stop all Python processes
pkill -9 -f "python.*main.py"
# Check for zombie processes
ps aux | grep defunct
# Clear connection pool
python main.py database --reset-pool
# Emergency health check
python main.py health --emergency
Blockchain Emergency:
# Check Stellar network status
curl https://status.stellar.org/api/v2/status.json
# Switch to backup Horizon (if available)
# Update .env:
STELLAR_HORIZON_URL=https://horizon-backup.stellar.org
# Test new connection
curl https://horizon-backup.stellar.org/
# Restart with new configuration
sudo systemctl restart ubec-monitor.service
Data Recovery
Transaction Recovery:
-- Find missing transactions
SELECT generate_series AS expected_id
FROM generate_series(
(SELECT MIN(id) FROM ubec_main.stellar_transactions),
(SELECT MAX(id) FROM ubec_main.stellar_transactions)
)
EXCEPT
SELECT id FROM ubec_main.stellar_transactions
ORDER BY expected_id;
-- Recover from backup
-- 1. Identify time range of missing data
SELECT MIN(created_at), MAX(created_at)
FROM ubec_main.stellar_transactions
WHERE created_at > '2025-11-01'
AND created_at < '2025-11-02';
-- 2. Extract from backup
pg_restore -U ubec_admin -d ubec -t stellar_transactions -a backup.dump
-- 3. Verify recovery
SELECT COUNT(*) FROM ubec_main.stellar_transactions WHERE created_at BETWEEN '2025-11-01' AND '2025-11-02';
Full System Recovery:
#!/bin/bash
# Emergency full system recovery
echo "Starting emergency recovery..."
# 1. Stop all services
sudo systemctl stop ubec-monitor.service
# 2. Backup current state
timestamp=$(date +%Y%m%d-%H%M%S)
pg_dump -U ubec_admin -F c ubec > /tmp/pre-recovery-${timestamp}.dump
# 3. Restore from last known good backup
pg_restore -U ubec_admin -d ubec -c /var/backups/ubec/ubec-last-good.dump
# 4. Run database integrity check
psql -U ubec_admin -d ubec -c "SELECT COUNT(*) FROM ubec_main.stellar_accounts;"
# 5. Resync blockchain data
python main.py sync --full --accounts all
# 6. Verify health
python main.py health --detailed
# 7. Restart services
sudo systemctl start ubec-monitor.service
echo "Recovery complete. Check logs for verification."
Appendix A: Database Schema Reference
Core Tables
stellar_accounts:
CREATE TABLE ubec_main.stellar_accounts (
id SERIAL PRIMARY KEY,
account_id TEXT UNIQUE NOT NULL,
sequence BIGINT,
xlm_balance NUMERIC(20,7),
trustline_count INTEGER,
last_modified TIMESTAMPTZ,
created_at TIMESTAMPTZ DEFAULT NOW(),
INDEX idx_account_id (account_id),
INDEX idx_last_modified (last_modified)
);
stellar_transactions:
CREATE TABLE ubec_main.stellar_transactions (
id SERIAL PRIMARY KEY,
transaction_hash TEXT UNIQUE NOT NULL,
account_id TEXT REFERENCES ubec_main.stellar_accounts(account_id),
sequence BIGINT,
fee INTEGER,
operation_count INTEGER,
created_at TIMESTAMPTZ,
state TEXT CHECK (state IN ('pending', 'confirmed', 'failed')),
INDEX idx_tx_account (account_id, created_at),
INDEX idx_tx_hash (transaction_hash),
INDEX idx_tx_state (state)
);
ubec_balances:
CREATE TABLE ubec_main.ubec_balances (
id SERIAL PRIMARY KEY,
account_id TEXT REFERENCES ubec_main.stellar_accounts(account_id),
token_code TEXT NOT NULL,
issuer TEXT NOT NULL,
balance NUMERIC(20,7),
last_modified TIMESTAMPTZ,
created_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE(account_id, token_code, issuer),
INDEX idx_balance_account (account_id),
INDEX idx_balance_token (token_code, balance)
);
Appendix B: API Reference
Service Registry API
# Get service instance
service = await registry.get('service_name')
# Register service factory
registry.register_factory('name', factory_func, dependencies=['dep1', 'dep2'])
# Check health
health = await registry.health_check(detailed=True)
# Context manager
async with registry:
# Auto-initialize and cleanup
pass
Database Manager API
# Execute query
await db.execute("INSERT INTO ...")
# Fetch single row
row = await db.fetchrow("SELECT * FROM ...")
# Fetch multiple rows
rows = await db.fetch("SELECT * FROM ...")
# Fetch single value
count = await db.fetchval("SELECT COUNT(*) FROM ...")
# Transaction
async with db.transaction():
await db.execute("INSERT ...")
await db.execute("UPDATE ...")
Synchronizer API
# Incremental sync
await synchronizer.sync_incremental(account_id)
# Full sync
await synchronizer.sync_full(account_id)
# Batch sync
await synchronizer.sync_accounts([id1, id2, id3])
# Check sync status
status = await synchronizer.get_sync_status(account_id)
Document History
| Version | Date | Author | Changes |
|---|---|---|---|
| 1.0 | 2025-11-02 | UBEC DevOps | Initial release |
End of Technical Operator Onboarding Guide