Real-time Data Synchronization¶
This guide explains how to set up continuous real-time data synchronization between external systems and the Open PaaS Platform. Follow these steps to configure change data capture, define sync rules, and handle real-time events effectively.
Before You Begin¶
Make sure you have the following:
- Access to Open PaaS Platform with real-time sync permissions
- External system with change data capture (CDC) or webhook capabilities
- Network connectivity between systems (consider firewall rules)
- Python 3.x and the Open PaaS Platform SDK installed
- Understanding of your data schema and sync requirements
- Familiarity with Create Your First Connector
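Before creating a connector, you can sanity-check the Python side of these prerequisites programmatically. A minimal sketch, assuming the SDK's package name matches the `openpaas_sdk` import used in the examples below:

```python
import importlib.util
import sys

def check_prerequisites(min_version=(3, 8)):
    """Return a list of human-readable problems; an empty list means ready."""
    problems = []
    if sys.version_info < min_version:
        problems.append(
            f"Python {min_version[0]}.{min_version[1]}+ required, "
            f"found {sys.version_info.major}.{sys.version_info.minor}"
        )
    # Package name assumed from the `from openpaas_sdk import ...` examples.
    if importlib.util.find_spec("openpaas_sdk") is None:
        problems.append("openpaas_sdk is not installed")
    return problems

for problem in check_prerequisites():
    print(f"Missing prerequisite: {problem}")
```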
Create a Real-time Sync Connector¶
- Configure Your Data Source: Set up the source system for real-time data capture:

  ```python
  from openpaas_sdk import OpenPaaSClient, RealtimeSync

  # Initialize the platform client
  client = OpenPaaSClient(api_key="your_api_key")

  # Create real-time sync for database changes
  sync_connector = RealtimeSync(
      name="customer_realtime_sync",
      source_type="postgresql_cdc",
      config={
          "host": "db.yourcompany.com",
          "port": 5432,
          "database": "production_db",
          "username": "sync_user",
          "password": "secure_password",
          "tables": ["customers", "orders", "products"],
          "ssl_mode": "require",
      },
  )
  ```
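Hardcoding credentials as in the example above is fine for experimentation, but production configs usually read them from the environment. A small helper sketch; the config keys mirror the example, while the helper name and the `SYNC_DB_USER`/`SYNC_DB_PASSWORD` variable names are illustrative:

```python
import os

def build_cdc_config(tables, host, database, port=5432, ssl_mode="require"):
    """Assemble the CDC config dict, reading credentials from the environment."""
    username = os.environ.get("SYNC_DB_USER")
    password = os.environ.get("SYNC_DB_PASSWORD")
    if not username or not password:
        raise RuntimeError("SYNC_DB_USER and SYNC_DB_PASSWORD must be set")
    return {
        "host": host,
        "port": port,
        "database": database,
        "username": username,
        "password": password,
        "tables": list(tables),
        "ssl_mode": ssl_mode,
    }

# For demonstration only; in real use these come from your deployment environment.
os.environ.setdefault("SYNC_DB_USER", "sync_user")
os.environ.setdefault("SYNC_DB_PASSWORD", "secure_password")

config = build_cdc_config(
    ["customers"], host="db.yourcompany.com", database="production_db"
)
print(config["username"])  # sync_user
```

The resulting dict can be passed directly as the `config` argument shown above.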
- Define Sync Rules: Configure what data to sync and how to handle changes:

  ```python
  # Set up change detection rules
  sync_connector.add_sync_rule(
      table="customers",
      operations=["INSERT", "UPDATE", "DELETE"],
      filters={
          "status": "active",
          "updated_at": "> YESTERDAY",
      },
  )

  # Configure field mappings
  sync_connector.add_field_mapping("customers", {
      "customer_id": "id",
      "full_name": "name",
      "email_address": "email",
      "last_modified": "updated_at",
  })

  # Set up conflict resolution
  sync_connector.set_conflict_resolution({
      "strategy": "timestamp_wins",
      "timestamp_field": "updated_at",
  })
  ```
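The `timestamp_wins` strategy can be pictured as: when the same record changes on both sides, keep the version whose configured timestamp field is newer. A self-contained sketch of that rule, not the platform's internal implementation:

```python
from datetime import datetime

def resolve_conflict(local, remote, timestamp_field="updated_at"):
    """Return whichever record has the newer timestamp; ties keep the local copy."""
    local_ts = datetime.fromisoformat(local[timestamp_field])
    remote_ts = datetime.fromisoformat(remote[timestamp_field])
    return remote if remote_ts > local_ts else local

local = {"id": "c1", "name": "Ada", "updated_at": "2024-01-01T10:00:00"}
remote = {"id": "c1", "name": "Ada L.", "updated_at": "2024-01-02T09:00:00"}
print(resolve_conflict(local, remote)["name"])  # Ada L.
```

Note that this strategy silently drops the older write, which is usually acceptable for reference data but worth reviewing for financial or audit-sensitive tables.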
- Handle Real-time Events: Define how to process incoming change events:

  ```python
  # Handle customer updates
  @sync_connector.on_change("customers")
  def handle_customer_change(event):
      operation = event.operation  # INSERT, UPDATE, or DELETE
      old_data = event.old_data
      new_data = event.new_data

      if operation == "INSERT":
          print(f"New customer: {new_data['name']}")
          # Trigger welcome workflow
          client.trigger_workflow("customer_welcome", new_data)
      elif operation == "UPDATE":
          print(f"Customer updated: {new_data['name']}")
          # Update search index
          client.update_search_index("customers", new_data)
      elif operation == "DELETE":
          print(f"Customer deleted: {old_data['name']}")
          # Archive customer data
          client.archive_record("customers", old_data["id"])

      return {"status": "processed", "operation": operation}
  ```
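Conceptually, `@sync_connector.on_change("customers")` registers one handler per table, and the connector routes each incoming event to the matching handler. A stripped-down sketch of that dispatch pattern, illustrative only and not the SDK's actual internals:

```python
class ChangeDispatcher:
    """Route change events to handlers registered per table."""

    def __init__(self):
        self._handlers = {}

    def on_change(self, table):
        # Decorator factory: registers func as the handler for `table`.
        def register(func):
            self._handlers[table] = func
            return func
        return register

    def dispatch(self, event):
        handler = self._handlers.get(event["table"])
        if handler is None:
            return {"status": "ignored", "table": event["table"]}
        return handler(event)

dispatcher = ChangeDispatcher()

@dispatcher.on_change("customers")
def handle_customer_change(event):
    return {"status": "processed", "operation": event["operation"]}

print(dispatcher.dispatch({"table": "customers", "operation": "INSERT"}))
```

Events for tables without a registered handler are ignored rather than raising, which keeps the stream moving when new tables are added to the source.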
- Test Your Sync Setup: Verify that your real-time sync is working correctly with test data:

  ```python
  # Test the sync connector
  test_result = sync_connector.test_connection()
  if test_result.success:
      print("Sync connection successful. Ready to process changes.")
      print(f"Connected tables: {test_result.tables}")
  else:
      print(f"Sync connection failed: {test_result.error}")

  # Simulate a test change event
  test_event = {
      "operation": "INSERT",
      "table": "customers",
      "new_data": {
          "id": "test_123",
          "name": "Test Customer",
          "email": "test@example.com",
          "created_at": "2024-01-01T00:00:00Z",
      },
  }

  # Process the test event
  result = sync_connector.process_test_event(test_event)
  print(f"Test event result: {result}")
  ```
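Beyond a one-off connection test, real deployments must decide what happens when a handler fails mid-stream. One common pattern is to retry the event a few times with exponential backoff before routing it to a dead-letter store; a minimal sketch, where the handler and the dead-letter list are stand-ins for your own components:

```python
import time

def process_with_retry(event, handler, max_attempts=3, base_delay=0.01, dead_letter=None):
    """Try handler(event) up to max_attempts times, backing off exponentially."""
    for attempt in range(max_attempts):
        try:
            return handler(event)
        except Exception:
            if attempt == max_attempts - 1:
                # Out of attempts: park the event for manual inspection.
                if dead_letter is not None:
                    dead_letter.append(event)
                return {"status": "dead_letter"}
            time.sleep(base_delay * (2 ** attempt))

flaky_calls = {"count": 0}

def flaky_handler(event):
    # Fails twice, then succeeds, simulating a transient network error.
    flaky_calls["count"] += 1
    if flaky_calls["count"] < 3:
        raise ConnectionError("transient failure")
    return {"status": "processed"}

print(process_with_retry({"table": "customers"}, flaky_handler))  # {'status': 'processed'}
```

Dead-lettered events can then be replayed once the underlying fault is fixed, so a transient outage does not silently drop changes.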
Troubleshooting¶
If you encounter any issues with real-time sync, refer to the Troubleshooting Guide.
See Also¶
- Data Ingestion - Batch data processing
- Webhook Processing - Handle incoming webhooks