
Data Synchronisation Setup

Data synchronisation connects field systems operating intermittently with central infrastructure, ensuring data collected offline reaches headquarters and reference data flows to field locations. This task establishes the synchronisation architecture, configures server and client components, sets schedules appropriate to available bandwidth, and enables conflict detection for cases where the same record changes in multiple locations before sync completes.

Prerequisites

| Requirement | Detail |
| --- | --- |
| Network assessment | Documented bandwidth, latency, and availability for each field site |
| Storage capacity | Server: 3x expected sync data volume for versioning. Client: 2x local data volume |
| Conflict strategy | Documented rules for handling concurrent edits (see Sync Conflict Resolution) |
| Identity configuration | Service accounts for sync processes with appropriate permissions |
| Database access | Read/write access to source and target databases |
| Time | 4-8 hours for initial setup; 1-2 hours per additional site |

Verify network characteristics before proceeding. Measure actual bandwidth rather than relying on nominal connection speeds:

# Measure bandwidth to central server from field location
iperf3 -c sync.example.org -t 30 -P 4
# Expected output shows bandwidth in Mbits/sec
# Example: 2.45 Mbits/sec sender, 2.41 Mbits/sec receiver
# Measure latency
ping -c 100 sync.example.org | tail -1
# Example: rtt min/avg/max/mdev = 180.234/195.678/210.456/8.234 ms

Record these values for each field site. Bandwidth below 1 Mbps or latency above 500ms requires specific configuration adjustments documented in the procedure.
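As a rough illustration of how measured values feed into client settings (the function, parameter names, and thresholds here are assumptions for the sketch, not part of the procedure), the adjustments amount to shrinking batches on slow links and lengthening timeouts on high-latency ones:

```python
# Sketch: derive sync client settings from measured link characteristics.
# Thresholds and parameter names are illustrative assumptions.

def tune_sync_settings(bandwidth_mbps: float, latency_ms: float) -> dict:
    """Return conservative batch size and timeout for a measured link."""
    settings = {'batch_size': 500, 'timeout_s': 60}
    if bandwidth_mbps < 1.0:
        # Slow links: smaller batches so a dropped connection loses less work
        settings['batch_size'] = 100
    if latency_ms > 500:
        # High-latency links (e.g. satellite): allow slower round trips
        settings['timeout_s'] = 180
    return settings

print(tune_sync_settings(2.45, 195))   # healthy link keeps defaults
print(tune_sync_settings(0.5, 800))    # constrained link: smaller batch, longer timeout
```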

Synchronisation architecture

Before configuring components, select an architecture appropriate to your data patterns and infrastructure. Three primary architectures serve different requirements.

ARCHITECTURE A: HUB-AND-SPOKE (Centralised)

                    +------------------+
                    |                  |
                    |  Central Server  |
                    |    (sync hub)    |
                    |                  |
                    +--------+---------+
                             |
        +--------------------+--------------------+
        |                    |                    |
+-------v-------+    +-------v-------+    +-------v-------+
|               |    |               |    |               |
| Field Site A  |    | Field Site B  |    | Field Site C  |
|    (spoke)    |    |    (spoke)    |    |    (spoke)    |
|               |    |               |    |               |
+---------------+    +---------------+    +---------------+

- All sync traffic routes through central server
- Field sites never sync directly with each other
- Central server holds authoritative copy
- Simplest conflict resolution (server wins or manual)

ARCHITECTURE B: MULTI-MASTER (Distributed)

+---------------+            +---------------+
|               |            |               |
| Region Hub 1  |<---------->| Region Hub 2  |
|   (master)    |            |   (master)    |
|               |            |               |
+-------+-------+            +-------+-------+
        |                            |
+-------v-------+            +-------v-------+
|               |            |               |
|  Field Sites  |            |  Field Sites  |
|  (replicas)   |            |  (replicas)   |
|               |            |               |
+---------------+            +---------------+

- Multiple authoritative nodes sync with each other
- Regional hubs reduce latency for field sites
- Requires vector clocks or similar for ordering
- Complex conflict resolution required

ARCHITECTURE C: EVENT-SOURCED (Append-only)

+---------------+    +---------------+    +---------------+
|               |    |               |    |               |
|  Field Site   |    |  Field Site   |    |  Field Site   |
|   (events)    |    |   (events)    |    |   (events)    |
|               |    |               |    |               |
+-------+-------+    +-------+-------+    +-------+-------+
        |                    |                    |
        +--------------------+--------------------+
                             |
                    +--------v---------+
                    |                  |
                    |   Event Store    |
                    |  (append-only)   |
                    |                  |
                    +--------+---------+
                             |
                    +--------v---------+
                    |                  |
                    |   Materialised   |
                    |      Views       |
                    |                  |
                    +------------------+

- Changes recorded as immutable events
- No overwrites, only new events
- Conflicts resolved by event ordering rules
- Full audit trail inherent in design

Figure 1: Three synchronisation architectures with different trade-offs

Select architecture based on these criteria:

| Criterion | Hub-and-Spoke | Multi-Master | Event-Sourced |
| --- | --- | --- | --- |
| Setup complexity | Low | High | Medium |
| Conflict handling | Simple | Complex | Rule-based |
| Bandwidth efficiency | Medium | Low (cross-sync) | High (deltas only) |
| Offline tolerance | Hours to days | Days to weeks | Weeks to months |
| Audit requirements | Add-on | Add-on | Built-in |
| Staff expertise needed | Basic SQL | Distributed systems | Event modelling |

For organisations with a single IT person or limited database expertise, hub-and-spoke provides the most maintainable option. The remainder of this procedure documents hub-and-spoke configuration; multi-master and event-sourced architectures require specialist implementation covered in Data Pipeline Design.

Procedure

Server-side configuration

  1. Create the synchronisation database schema. This schema tracks sync state independently from application data, enabling recovery without affecting production tables.
-- Create sync tracking schema
CREATE SCHEMA IF NOT EXISTS sync_control;
-- Track sync state per client
CREATE TABLE sync_control.client_state (
client_id VARCHAR(64) PRIMARY KEY,
client_name VARCHAR(255) NOT NULL,
last_sync_timestamp TIMESTAMP WITH TIME ZONE,
last_sync_version BIGINT DEFAULT 0,
sync_status VARCHAR(20) DEFAULT 'pending',
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
-- Track individual table sync progress
CREATE TABLE sync_control.table_sync_state (
client_id VARCHAR(64) NOT NULL,
table_name VARCHAR(128) NOT NULL,
last_synced_version BIGINT DEFAULT 0,
last_synced_at TIMESTAMP WITH TIME ZONE,
row_count_synced BIGINT DEFAULT 0,
PRIMARY KEY (client_id, table_name),
FOREIGN KEY (client_id) REFERENCES sync_control.client_state(client_id)
);
-- Log sync operations for troubleshooting
CREATE TABLE sync_control.sync_log (
log_id BIGSERIAL PRIMARY KEY,
client_id VARCHAR(64) NOT NULL,
operation VARCHAR(50) NOT NULL,
table_name VARCHAR(128),
rows_affected BIGINT,
duration_ms INTEGER,
status VARCHAR(20),
error_message TEXT,
created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
-- Index for performance
CREATE INDEX idx_sync_log_client_time
ON sync_control.sync_log(client_id, created_at DESC);

Verify schema creation:

SELECT table_name FROM information_schema.tables
WHERE table_schema = 'sync_control';

Expected output shows three tables: client_state, table_sync_state, sync_log.

  2. Add version tracking columns to tables requiring synchronisation. The sync version column enables incremental sync by identifying changed rows since last sync.
-- Add version tracking to existing tables
-- Example for a beneficiary registration table
ALTER TABLE programme.beneficiaries
ADD COLUMN IF NOT EXISTS sync_version BIGINT DEFAULT 0,
ADD COLUMN IF NOT EXISTS sync_modified_at TIMESTAMP WITH TIME ZONE
DEFAULT CURRENT_TIMESTAMP,
ADD COLUMN IF NOT EXISTS sync_deleted BOOLEAN DEFAULT FALSE,
ADD COLUMN IF NOT EXISTS sync_origin_client VARCHAR(64);
-- Create a shared sequence and trigger to assign a new version on changes.
-- A sequence avoids the race condition of SELECT MAX(...)+1 under concurrent
-- writes, and lets the same trigger function serve every synced table.
CREATE SEQUENCE IF NOT EXISTS sync_control.sync_version_seq;
CREATE OR REPLACE FUNCTION sync_control.update_sync_version()
RETURNS TRIGGER AS $$
BEGIN
NEW.sync_version := nextval('sync_control.sync_version_seq');
NEW.sync_modified_at := CURRENT_TIMESTAMP;
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER trg_beneficiaries_sync_version
BEFORE INSERT OR UPDATE ON programme.beneficiaries
FOR EACH ROW EXECUTE FUNCTION sync_control.update_sync_version();
-- Index for efficient delta queries
CREATE INDEX idx_beneficiaries_sync_version
ON programme.beneficiaries(sync_version);

Repeat for each table requiring synchronisation. A typical field deployment synchronises 5-15 tables; adding version tracking to all tables takes 30-60 minutes.
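Applying the same columns, trigger, and index to many tables is repetitive; a small generator script can emit the DDL for each table and be piped into psql. This is a sketch — the schema name and table list are placeholders to adapt to your deployment:

```python
# Sketch: generate the version-tracking DDL for a list of tables.
# Schema and table names below are illustrative placeholders.

DDL_TEMPLATE = """
ALTER TABLE {schema}.{table}
ADD COLUMN IF NOT EXISTS sync_version BIGINT DEFAULT 0,
ADD COLUMN IF NOT EXISTS sync_modified_at TIMESTAMP WITH TIME ZONE
    DEFAULT CURRENT_TIMESTAMP,
ADD COLUMN IF NOT EXISTS sync_deleted BOOLEAN DEFAULT FALSE,
ADD COLUMN IF NOT EXISTS sync_origin_client VARCHAR(64);

CREATE TRIGGER trg_{table}_sync_version
BEFORE INSERT OR UPDATE ON {schema}.{table}
FOR EACH ROW EXECUTE FUNCTION sync_control.update_sync_version();

CREATE INDEX idx_{table}_sync_version ON {schema}.{table}(sync_version);
"""

def generate_tracking_ddl(tables, schema='programme'):
    """Return the combined DDL for all listed tables."""
    return '\n'.join(DDL_TEMPLATE.format(schema=schema, table=t) for t in tables)

if __name__ == '__main__':
    # Pipe the output into psql to apply it
    print(generate_tracking_ddl(['beneficiaries', 'distributions', 'assessments']))
```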

  3. Create the sync API endpoints. This example uses a REST API; adapt to your application framework.
# sync_api.py - Server-side sync endpoints
from flask import Flask, request, jsonify
from datetime import datetime
import psycopg2
import os

app = Flask(__name__)

# Configuration
BATCH_SIZE = 500          # Rows per sync batch
MAX_SYNC_DURATION = 300   # 5 minutes max per sync session
ALLOWED_TABLES = ['beneficiaries', 'distributions', 'assessments']

def get_db_connection():
    # Connection details supplied via environment (e.g. a libpq DSN)
    return psycopg2.connect(os.environ['SYNC_DB_DSN'])

@app.route('/api/sync/changes/<table_name>', methods=['GET'])
def get_changes(table_name):
    """Return rows changed since client's last sync version."""
    client_id = request.headers.get('X-Client-ID')
    since_version = request.args.get('since_version', 0, type=int)
    # Validate table name against allowlist
    if table_name not in ALLOWED_TABLES:
        return jsonify({'error': 'Table not syncable'}), 400
    conn = get_db_connection()
    cursor = conn.cursor()
    # Fetch changed rows
    cursor.execute(f"""
        SELECT * FROM programme.{table_name}
        WHERE sync_version > %s
        ORDER BY sync_version ASC
        LIMIT %s
    """, (since_version, BATCH_SIZE))
    rows = cursor.fetchall()
    columns = [desc[0] for desc in cursor.description]
    # Get max version in result set
    max_version = max(row[columns.index('sync_version')]
                      for row in rows) if rows else since_version
    # Log the sync operation
    cursor.execute("""
        INSERT INTO sync_control.sync_log
            (client_id, operation, table_name, rows_affected, status)
        VALUES (%s, 'pull', %s, %s, 'success')
    """, (client_id, table_name, len(rows)))
    conn.commit()
    return jsonify({
        'table': table_name,
        'rows': [dict(zip(columns, row)) for row in rows],
        'max_version': max_version,
        'has_more': len(rows) == BATCH_SIZE
    })

@app.route('/api/sync/push/<table_name>', methods=['POST'])
def push_changes(table_name):
    """Receive rows from client and merge into server database."""
    client_id = request.headers.get('X-Client-ID')
    # Validate table name against allowlist (also guards the f-string SQL below)
    if table_name not in ALLOWED_TABLES:
        return jsonify({'error': 'Table not syncable'}), 400
    data = request.json
    conn = get_db_connection()
    cursor = conn.cursor()
    merged = 0
    conflicts = []
    for row in data['rows']:
        # Check for conflicts (server version newer than client's base)
        cursor.execute(f"""
            SELECT sync_version FROM programme.{table_name}
            WHERE id = %s
        """, (row['id'],))
        server_row = cursor.fetchone()
        if server_row and server_row[0] > row.get('base_version', 0):
            # Conflict detected - record for resolution
            conflicts.append({
                'id': row['id'],
                'server_version': server_row[0],
                'client_version': row.get('base_version', 0)
            })
            continue
        # No conflict - merge the row
        # Use UPSERT to handle both new and updated rows, stripping the
        # sync metadata the client adds ('base_version', '_operation')
        columns = [k for k in row.keys()
                   if k != 'base_version' and not k.startswith('_')]
        values = [row[k] for k in columns]
        placeholders = ', '.join(['%s'] * len(columns))
        update_clause = ', '.join([f"{c} = EXCLUDED.{c}" for c in columns
                                   if c != 'id'])
        cursor.execute(f"""
            INSERT INTO programme.{table_name} ({', '.join(columns)})
            VALUES ({placeholders})
            ON CONFLICT (id) DO UPDATE SET {update_clause}
        """, values)
        merged += 1
    # Log the push so monitoring can count conflicts server-side
    cursor.execute("""
        INSERT INTO sync_control.sync_log
            (client_id, operation, table_name, rows_affected, status)
        VALUES (%s, 'push', %s, %s, %s)
    """, (client_id, table_name, merged,
          'conflict' if conflicts else 'success'))
    # Update client sync state
    cursor.execute("""
        UPDATE sync_control.client_state
        SET last_sync_timestamp = %s, sync_status = 'active'
        WHERE client_id = %s
    """, (datetime.utcnow(), client_id))
    conn.commit()
    return jsonify({
        'merged': merged,
        'conflicts': conflicts,
        'status': 'partial' if conflicts else 'success'
    })

Deploy the API behind HTTPS with authentication. The sync endpoints require the client to authenticate; configure your identity provider to issue tokens for sync service accounts.
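Token validation itself depends on your identity provider. As a framework-neutral sketch (the in-memory token store is a stand-in, not a real provider API), the endpoints could share a helper like this before doing any work:

```python
# Sketch: shared bearer-token check for the sync endpoints.
# VALID_TOKENS is a stand-in for a lookup against your identity provider.

import hmac

VALID_TOKENS = {
    'site-nairobi-01': 'example-token-nairobi',   # issued per service account
}

def is_authorised(client_id: str, auth_header: str) -> bool:
    """Verify a 'Bearer <token>' header against the client's issued token."""
    if not auth_header or not auth_header.startswith('Bearer '):
        return False
    presented = auth_header[len('Bearer '):]
    expected = VALID_TOKENS.get(client_id)
    # Constant-time comparison avoids leaking token contents via timing
    return expected is not None and hmac.compare_digest(presented, expected)

# In each Flask endpoint, before touching the database:
#   if not is_authorised(request.headers.get('X-Client-ID'),
#                        request.headers.get('Authorization', '')):
#       return jsonify({'error': 'unauthorised'}), 401
```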

  4. Register field site clients. Each field location receives a unique client identifier used for tracking sync state and attributing data origin.
-- Register field sites
INSERT INTO sync_control.client_state (client_id, client_name)
VALUES
('site-nairobi-01', 'Nairobi Field Office'),
('site-kampala-01', 'Kampala Field Office'),
('site-juba-01', 'Juba Field Office');
-- Initialise table sync state for each client
INSERT INTO sync_control.table_sync_state (client_id, table_name)
SELECT c.client_id, t.table_name
FROM sync_control.client_state c
CROSS JOIN (VALUES
('beneficiaries'),
('distributions'),
('assessments')
) AS t(table_name);

Verify registration:

SELECT client_id, client_name, sync_status
FROM sync_control.client_state;

Output shows all registered clients with pending status until first sync completes.

Client-side configuration

  1. Install the sync client software on field devices. The client maintains a local database, queues changes during offline periods, and executes sync when connectivity allows.
# Create sync directory structure
mkdir -p /opt/field-sync/{data,logs,queue}
# Set permissions for sync service account
chown -R sync-service:sync-service /opt/field-sync
chmod 750 /opt/field-sync
# Install dependencies (Debian/Ubuntu)
apt-get update
apt-get install -y sqlite3 python3 python3-pip
pip3 install requests schedule
  2. Configure the local database. SQLite provides reliable local storage without requiring a database server on field devices.
-- /opt/field-sync/data/local.db
-- Initialise local database with sync tracking
CREATE TABLE IF NOT EXISTS sync_queue (
queue_id INTEGER PRIMARY KEY AUTOINCREMENT,
table_name TEXT NOT NULL,
row_id TEXT NOT NULL,
operation TEXT NOT NULL, -- 'insert', 'update', 'delete'
row_data TEXT NOT NULL, -- JSON serialised row
base_version INTEGER,
queued_at TEXT DEFAULT (datetime('now')),
sync_attempts INTEGER DEFAULT 0,
last_attempt TEXT,
status TEXT DEFAULT 'pending'
);
CREATE TABLE IF NOT EXISTS sync_state (
table_name TEXT PRIMARY KEY,
last_synced_version INTEGER DEFAULT 0,
last_synced_at TEXT,
rows_pending INTEGER DEFAULT 0
);
CREATE INDEX idx_queue_status ON sync_queue(status, table_name);
-- Initialise sync state for each table
INSERT OR IGNORE INTO sync_state (table_name)
VALUES ('beneficiaries'), ('distributions'), ('assessments');
  3. Create the sync client script. This script handles bidirectional synchronisation, managing both push (local changes to server) and pull (server changes to local).
/opt/field-sync/sync_client.py
#!/usr/bin/env python3
import sqlite3
import requests
import json
import logging
from datetime import datetime
import os

# Configuration
CONFIG = {
    'server_url': 'https://sync.example.org/api/sync',
    'client_id': 'site-nairobi-01',   # Set per installation
    'api_token': os.environ.get('SYNC_API_TOKEN'),
    'local_db': '/opt/field-sync/data/local.db',
    'batch_size': 100,
    'timeout': 60,
    'max_retries': 3
}

logging.basicConfig(
    filename='/opt/field-sync/logs/sync.log',
    level=logging.INFO,
    format='%(asctime)s %(levelname)s: %(message)s'
)

def get_local_db():
    return sqlite3.connect(CONFIG['local_db'])

def push_changes(table_name):
    """Push queued local changes to server."""
    conn = get_local_db()
    cursor = conn.cursor()
    # Get pending changes
    cursor.execute("""
        SELECT queue_id, row_id, operation, row_data, base_version
        FROM sync_queue
        WHERE table_name = ? AND status = 'pending'
        ORDER BY queued_at ASC
        LIMIT ?
    """, (table_name, CONFIG['batch_size']))
    pending = cursor.fetchall()
    if not pending:
        return {'pushed': 0, 'status': 'no_changes'}
    # Prepare batch for server
    rows = []
    queue_map = {}   # row_id -> queue_id, to map server results back to queue entries
    for queue_id, row_id, operation, row_data, base_version in pending:
        row = json.loads(row_data)
        row['base_version'] = base_version
        row['_operation'] = operation
        rows.append(row)
        queue_map[row_id] = queue_id
    # Send to server
    try:
        response = requests.post(
            f"{CONFIG['server_url']}/push/{table_name}",
            json={'rows': rows},
            headers={
                'X-Client-ID': CONFIG['client_id'],
                'Authorization': f"Bearer {CONFIG['api_token']}"
            },
            timeout=CONFIG['timeout']
        )
        response.raise_for_status()
        result = response.json()
        # Mark successfully pushed rows: everything sent except conflicted ids.
        # (Conflicts can occur anywhere in the batch, so counting the first N
        # queue entries as merged would mis-mark conflicted rows.)
        conflict_ids = {c['id'] for c in result.get('conflicts', [])}
        synced_ids = [qid for rid, qid in queue_map.items()
                      if rid not in conflict_ids]
        if synced_ids:
            placeholders = ','.join('?' * len(synced_ids))
            cursor.execute(f"""
                UPDATE sync_queue SET status = 'synced'
                WHERE queue_id IN ({placeholders})
            """, synced_ids)
        # Mark conflicts for resolution
        for conflict in result.get('conflicts', []):
            cursor.execute("""
                UPDATE sync_queue SET status = 'conflict'
                WHERE table_name = ? AND row_id = ?
            """, (table_name, conflict['id']))
        conn.commit()
        logging.info(f"Pushed {result['merged']} rows to {table_name}, "
                     f"{len(result.get('conflicts', []))} conflicts")
        return result
    except requests.RequestException as e:
        # Update retry count
        queue_ids = list(queue_map.values())
        placeholders = ','.join('?' * len(queue_ids))
        cursor.execute(f"""
            UPDATE sync_queue
            SET sync_attempts = sync_attempts + 1,
                last_attempt = datetime('now')
            WHERE queue_id IN ({placeholders})
        """, queue_ids)
        conn.commit()
        logging.error(f"Push failed for {table_name}: {e}")
        return {'pushed': 0, 'status': 'error', 'error': str(e)}

def pull_changes(table_name):
    """Pull changes from server to local database."""
    conn = get_local_db()
    cursor = conn.cursor()
    # Get current sync position
    cursor.execute("""
        SELECT last_synced_version FROM sync_state
        WHERE table_name = ?
    """, (table_name,))
    row = cursor.fetchone()
    since_version = row[0] if row else 0
    total_applied = 0
    try:
        # Loop rather than recurse: an initial sync can span thousands of batches
        while True:
            response = requests.get(
                f"{CONFIG['server_url']}/changes/{table_name}",
                params={'since_version': since_version},
                headers={
                    'X-Client-ID': CONFIG['client_id'],
                    'Authorization': f"Bearer {CONFIG['api_token']}"
                },
                timeout=CONFIG['timeout']
            )
            response.raise_for_status()
            result = response.json()
            # Apply changes to local application database
            # This requires application-specific logic
            total_applied += apply_server_changes(table_name, result['rows'])
            # Update sync state
            cursor.execute("""
                UPDATE sync_state
                SET last_synced_version = ?,
                    last_synced_at = datetime('now')
                WHERE table_name = ?
            """, (result['max_version'], table_name))
            conn.commit()
            logging.info(f"Pulled {total_applied} rows for {table_name}, "
                         f"version now {result['max_version']}")
            if not result.get('has_more'):
                break
            since_version = result['max_version']
        return {'pulled': total_applied, 'status': 'success'}
    except requests.RequestException as e:
        logging.error(f"Pull failed for {table_name}: {e}")
        return {'pulled': total_applied, 'status': 'error', 'error': str(e)}

def run_sync():
    """Execute full sync cycle for all tables."""
    tables = ['beneficiaries', 'distributions', 'assessments']
    results = {}
    for table in tables:
        # Push first to minimise conflict window
        push_result = push_changes(table)
        pull_result = pull_changes(table)
        results[table] = {
            'push': push_result,
            'pull': pull_result
        }
    return results

if __name__ == '__main__':
    result = run_sync()
    print(json.dumps(result, indent=2))
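The pull path calls apply_server_changes(), which the procedure leaves application-specific. A minimal sketch for a SQLite client table — assuming each synced table has an id primary key, that pulled rows arrive as column-to-value dicts, and taking the connection explicitly rather than opening it inside — might look like:

```python
# Sketch of the application-specific apply_server_changes() hook.
# Assumes each synced table exists locally with an 'id' primary key
# and that server rows arrive as dicts of column -> value.

import sqlite3

def apply_server_changes(conn: sqlite3.Connection, table_name: str, rows: list) -> int:
    """Upsert pulled server rows into the local application table."""
    applied = 0
    cursor = conn.cursor()
    for row in rows:
        if row.get('sync_deleted'):
            # Tombstoned on the server: remove locally
            cursor.execute(f"DELETE FROM {table_name} WHERE id = ?", (row['id'],))
        else:
            columns = list(row.keys())
            placeholders = ', '.join('?' * len(columns))
            updates = ', '.join(f"{c} = excluded.{c}" for c in columns if c != 'id')
            # SQLite upsert (requires SQLite 3.24+)
            cursor.execute(
                f"INSERT INTO {table_name} ({', '.join(columns)}) "
                f"VALUES ({placeholders}) "
                f"ON CONFLICT(id) DO UPDATE SET {updates}",
                [row[c] for c in columns]
            )
        applied += 1
    conn.commit()
    return applied
```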
  4. Configure the sync schedule. Schedule sync to run during periods of expected connectivity, avoiding peak bandwidth competition.
/etc/cron.d/field-sync
# Run sync every 2 hours during working hours
0 6,8,10,12,14,16,18 * * * sync-service /opt/field-sync/sync_client.py >> /opt/field-sync/logs/cron.log 2>&1
# Run comprehensive sync overnight when bandwidth is available
0 2 * * * sync-service /opt/field-sync/sync_client.py --full >> /opt/field-sync/logs/cron.log 2>&1
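The overnight job passes a --full flag that the client script does not yet parse. One way to support it (a sketch — resetting every table's sync position forces a complete re-pull on the next cycle) is a small argument handler in sync_client.py:

```python
# Sketch: parse the --full flag used by the overnight cron entry.
# --full resets each table's sync position so the next pull re-fetches everything.

import argparse
import sqlite3

def parse_args(argv=None):
    parser = argparse.ArgumentParser(description='Field sync client')
    parser.add_argument('--full', action='store_true',
                        help='reset sync positions and re-pull all rows')
    return parser.parse_args(argv)

def reset_sync_positions(conn: sqlite3.Connection):
    """Zero every table's pull position in the local sync_state table."""
    conn.execute("UPDATE sync_state SET last_synced_version = 0")
    conn.commit()

# In sync_client.py's __main__ block:
#   args = parse_args()
#   if args.full:
#       reset_sync_positions(get_local_db())
#   result = run_sync()
```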

For sites with unpredictable connectivity, configure opportunistic sync using a connectivity monitor:

/opt/field-sync/connectivity_monitor.py
#!/usr/bin/env python3
import subprocess
import time

CHECK_INTERVAL = 60        # seconds
MIN_BANDWIDTH_KBPS = 256   # minimum bandwidth to trigger sync
SYNC_COOLDOWN = 1800       # 30 minutes between sync attempts

last_sync = 0

def check_connectivity():
    """Test if sync server is reachable with adequate bandwidth."""
    try:
        # Quick ping test
        result = subprocess.run(
            ['ping', '-c', '3', '-W', '5', 'sync.example.org'],
            capture_output=True, timeout=20
        )
        if result.returncode != 0:
            return False
        # Basic bandwidth test (download small file)
        start = time.time()
        result = subprocess.run(
            ['curl', '-s', '-o', '/dev/null', '-w', '%{size_download}',
             'https://sync.example.org/api/health'],
            capture_output=True, timeout=30
        )
        elapsed = max(time.time() - start, 0.001)   # guard against division by zero
        bytes_downloaded = int(result.stdout)
        bandwidth_kbps = (bytes_downloaded * 8) / elapsed / 1000
        return bandwidth_kbps >= MIN_BANDWIDTH_KBPS
    except Exception:
        return False

def run_if_connected():
    global last_sync
    current_time = time.time()
    if current_time - last_sync < SYNC_COOLDOWN:
        return
    if check_connectivity():
        subprocess.run(['/opt/field-sync/sync_client.py'])
        last_sync = current_time

if __name__ == '__main__':
    while True:
        run_if_connected()
        time.sleep(CHECK_INTERVAL)
  5. Configure bandwidth management. Prevent sync from consuming all available bandwidth by implementing rate limiting and transfer windows.
# Add to sync_client.py CONFIG
BANDWIDTH_CONFIG = {
    'max_upload_kbps': 512,        # Limit upload to 512 Kbps
    'max_download_kbps': 1024,     # Limit download to 1 Mbps
    'chunk_size_kb': 64,           # Transfer in 64KB chunks
    'inter_chunk_delay_ms': 50,    # Delay between chunks
    'peak_hours': [(9, 12), (14, 17)],   # Reduce during peak
    'peak_reduction_pct': 50       # Use 50% of limits during peak
}

def get_current_limits():
    """Return bandwidth limits adjusted for time of day."""
    from datetime import datetime
    hour = datetime.now().hour
    in_peak = any(start <= hour < end
                  for start, end in BANDWIDTH_CONFIG['peak_hours'])
    multiplier = ((100 - BANDWIDTH_CONFIG['peak_reduction_pct']) / 100
                  if in_peak else 1.0)
    return {
        'upload': int(BANDWIDTH_CONFIG['max_upload_kbps'] * multiplier),
        'download': int(BANDWIDTH_CONFIG['max_download_kbps'] * multiplier)
    }

Conflict detection configuration

  1. Configure conflict detection rules on the server. The conflict detection strategy determines how the system identifies when the same record changed in multiple locations.
-- Add conflict detection configuration table
CREATE TABLE sync_control.conflict_rules (
table_name VARCHAR(128) PRIMARY KEY,
detection_method VARCHAR(50) NOT NULL,
resolution_strategy VARCHAR(50) NOT NULL,
priority_field VARCHAR(64),
merge_fields TEXT[], -- Fields that can be auto-merged
notify_on_conflict BOOLEAN DEFAULT TRUE
);
-- Configure per-table rules
INSERT INTO sync_control.conflict_rules VALUES
-- Beneficiary data: server wins, notify for review
('beneficiaries', 'version_compare', 'server_wins', NULL,
ARRAY['phone_secondary', 'notes'], TRUE),
-- Distributions: last-write-wins by timestamp
('distributions', 'timestamp_compare', 'last_write_wins',
'sync_modified_at', NULL, FALSE),
-- Assessments: require manual resolution
('assessments', 'version_compare', 'manual', NULL, NULL, TRUE);
  2. Implement conflict detection in the push handler. Extend the server API to apply configured rules:
def detect_conflict(table_name, client_row, server_row, cursor):
    """Determine if a conflict exists and how to handle it."""
    # Get conflict rules for this table
    cursor.execute("""
        SELECT detection_method, resolution_strategy,
               priority_field, merge_fields
        FROM sync_control.conflict_rules
        WHERE table_name = %s
    """, (table_name,))
    rule = cursor.fetchone()
    if not rule:
        # Default: version comparison, server wins
        if server_row['sync_version'] > client_row.get('base_version', 0):
            return {'is_conflict': True, 'strategy': 'server_wins'}
        return {'is_conflict': False}
    method, strategy, priority_field, merge_fields = rule
    if method == 'version_compare':
        if server_row['sync_version'] > client_row.get('base_version', 0):
            return {'is_conflict': True, 'strategy': strategy}
        return {'is_conflict': False}
    elif method == 'timestamp_compare':
        server_time = server_row[priority_field]
        client_time = client_row.get(priority_field)
        if server_time and client_time and server_time > client_time:
            return {'is_conflict': True, 'strategy': strategy}
        return {'is_conflict': False}
    return {'is_conflict': False}

def resolve_conflict(strategy, client_row, server_row, merge_fields):
    """Apply resolution strategy and return merged row."""
    if strategy == 'server_wins':
        return server_row
    elif strategy == 'client_wins':
        return client_row
    elif strategy == 'last_write_wins':
        if client_row.get('sync_modified_at', '') > server_row.get('sync_modified_at', ''):
            return client_row
        return server_row
    elif strategy == 'field_merge' and merge_fields:
        # Merge specified fields from client, keep rest from server
        merged = server_row.copy()
        for field in merge_fields:
            if field in client_row and client_row[field]:
                merged[field] = client_row[field]
        return merged
    elif strategy == 'manual':
        # Record for manual resolution
        return None
    return server_row   # Default fallback
  3. Configure conflict notifications. Alert staff when conflicts require attention:
# Add to sync_api.py (requires json imported at the top of the module)
import json

def notify_conflict(table_name, record_id, client_id, conflict_details):
    """Send notification for manual conflict resolution."""
    notification = {
        'type': 'sync_conflict',
        'table': table_name,
        'record_id': record_id,
        'client': client_id,
        'server_version': conflict_details['server_version'],
        'client_version': conflict_details['client_version'],
        'timestamp': datetime.utcnow().isoformat()
    }
    # Store in notification queue
    conn = get_db_connection()
    cursor = conn.cursor()
    cursor.execute("""
        INSERT INTO sync_control.conflict_notifications
            (table_name, record_id, client_id, details, status)
        VALUES (%s, %s, %s, %s, 'pending')
    """, (table_name, record_id, client_id, json.dumps(notification)))
    conn.commit()
    # Send email to data manager (send_email is an application-specific helper)
    send_email(
        to='data-manager@example.org',
        subject=f'Sync conflict requires resolution: {table_name}',
        body=f"""
        A sync conflict occurred that requires manual resolution.
        Table: {table_name}
        Record ID: {record_id}
        Field Client: {client_id}
        Please review in the data management interface.
        """
    )
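The handler writes to sync_control.conflict_notifications, which the server schema created earlier does not define. A table definition consistent with that insert (the column types here are reasonable assumptions) would be:

```sql
-- Backing table for queued conflict notifications
CREATE TABLE IF NOT EXISTS sync_control.conflict_notifications (
    notification_id BIGSERIAL PRIMARY KEY,
    table_name VARCHAR(128) NOT NULL,
    record_id VARCHAR(64) NOT NULL,
    client_id VARCHAR(64) NOT NULL,
    details JSONB,
    status VARCHAR(20) DEFAULT 'pending',
    created_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
);
```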

Priority and selective sync

  1. Configure table priority for sync ordering. Higher priority tables sync first, ensuring critical data transfers even when connectivity drops mid-sync.
-- Add priority configuration
ALTER TABLE sync_control.table_sync_state
ADD COLUMN sync_priority INTEGER DEFAULT 50,
ADD COLUMN sync_direction VARCHAR(20) DEFAULT 'bidirectional',
ADD COLUMN sync_filter TEXT;
-- Set priorities (lower number = higher priority)
UPDATE sync_control.table_sync_state SET sync_priority = 10
WHERE table_name = 'beneficiaries'; -- Critical registration data
UPDATE sync_control.table_sync_state SET sync_priority = 20
WHERE table_name = 'distributions'; -- Distribution records
UPDATE sync_control.table_sync_state SET sync_priority = 30
WHERE table_name = 'assessments'; -- Assessment forms
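The client's run_sync() iterates a hard-coded table list; to honour sync_priority, the order should come from the server's configuration. A sketch of the ordering step (here the priority pairs are shown as plain data, where in practice they would be queried from sync_control.table_sync_state for this client):

```python
# Sketch: order tables by sync_priority before syncing.
# In practice the (table, priority) pairs come from
# sync_control.table_sync_state for the current client.

def order_tables_by_priority(priorities: dict) -> list:
    """Return table names sorted by priority (lower number syncs first)."""
    return sorted(priorities, key=lambda t: priorities[t])

# Values mirror the UPDATE statements above
PRIORITIES = {'assessments': 30, 'beneficiaries': 10, 'distributions': 20}

print(order_tables_by_priority(PRIORITIES))
# ['beneficiaries', 'distributions', 'assessments']
```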
  2. Configure selective sync filters to reduce data volume. Field sites synchronise only relevant subsets:
-- Sync only active beneficiaries for each site's region
UPDATE sync_control.table_sync_state
SET sync_filter = 'region_code = ''NAIROBI'' AND status = ''active'''
WHERE client_id = 'site-nairobi-01' AND table_name = 'beneficiaries';
-- Sync only recent distributions (last 90 days)
UPDATE sync_control.table_sync_state
SET sync_filter = 'distribution_date > CURRENT_DATE - INTERVAL ''90 days'''
WHERE table_name = 'distributions';

Apply filters in the server sync endpoint:

def get_changes_with_filter(table_name, client_id, since_version):
    cursor = get_db_connection().cursor()
    # Get client-specific filter
    cursor.execute("""
        SELECT sync_filter FROM sync_control.table_sync_state
        WHERE client_id = %s AND table_name = %s
    """, (client_id, table_name))
    row = cursor.fetchone()
    # Filters are admin-configured SQL fragments interpolated into the query;
    # only trusted staff may set them
    filter_clause = f"AND ({row[0]})" if row and row[0] else ""
    query = f"""
        SELECT * FROM programme.{table_name}
        WHERE sync_version > %s {filter_clause}
        ORDER BY sync_version ASC
        LIMIT %s
    """
    cursor.execute(query, (since_version, BATCH_SIZE))
    return cursor.fetchall()

Monitoring and alerting

  1. Create monitoring views for sync health:
-- View: Current sync status across all clients
-- (Queue depth lives in each client's local sync_queue table and is not
--  visible server-side; health here is derived from sync_log instead.)
CREATE VIEW sync_control.sync_health AS
SELECT
cs.client_id,
cs.client_name,
cs.last_sync_timestamp,
EXTRACT(EPOCH FROM (NOW() - cs.last_sync_timestamp))/3600 AS hours_since_sync,
cs.sync_status,
COUNT(CASE WHEN sl.status = 'error'
AND sl.created_at > NOW() - INTERVAL '24 hours' THEN 1 END) AS recent_errors,
COUNT(CASE WHEN sl.status = 'conflict'
AND sl.created_at > NOW() - INTERVAL '7 days' THEN 1 END) AS unresolved_conflicts
FROM sync_control.client_state cs
LEFT JOIN sync_control.sync_log sl ON cs.client_id = sl.client_id
GROUP BY cs.client_id, cs.client_name, cs.last_sync_timestamp, cs.sync_status;
-- View: Sync performance metrics
CREATE VIEW sync_control.sync_metrics AS
SELECT
client_id,
DATE(created_at) AS sync_date,
COUNT(*) AS sync_operations,
SUM(rows_affected) AS total_rows,
AVG(duration_ms) AS avg_duration_ms,
COUNT(CASE WHEN status = 'error' THEN 1 END) AS errors
FROM sync_control.sync_log
WHERE created_at > NOW() - INTERVAL '30 days'
GROUP BY client_id, DATE(created_at);
  2. Configure alerts for sync failures:
-- Function to check sync health and generate alerts
CREATE OR REPLACE FUNCTION sync_control.check_sync_alerts()
RETURNS TABLE (
alert_type VARCHAR,
client_id VARCHAR,
message TEXT
) AS $$
BEGIN
-- Alert: Client not synced in 24 hours (or registered but never synced)
RETURN QUERY
SELECT
'sync_stale'::VARCHAR,
cs.client_id,
format('Client %s has not synced in %s hours',
cs.client_name,
ROUND(EXTRACT(EPOCH FROM (NOW() - COALESCE(cs.last_sync_timestamp, cs.created_at)))/3600))
FROM sync_control.client_state cs
WHERE COALESCE(cs.last_sync_timestamp, cs.created_at) < NOW() - INTERVAL '24 hours';
-- Alert: High conflict rate
RETURN QUERY
SELECT
'high_conflicts'::VARCHAR,
sl.client_id,
format('Client %s has %s unresolved conflicts', sl.client_id, conflict_count)
FROM (
SELECT client_id, COUNT(*) AS conflict_count
FROM sync_control.sync_log
WHERE status = 'conflict'
AND created_at > NOW() - INTERVAL '7 days'
GROUP BY client_id
HAVING COUNT(*) > 10
) sl;
-- Alert: Repeated sync failures
RETURN QUERY
SELECT
'sync_errors'::VARCHAR,
sl.client_id,
format('Client %s has %s sync errors in last 24 hours', sl.client_id, error_count)
FROM (
SELECT client_id, COUNT(*) AS error_count
FROM sync_control.sync_log
WHERE status = 'error'
AND created_at > NOW() - INTERVAL '24 hours'
GROUP BY client_id
HAVING COUNT(*) > 5
) sl;
END;
$$ LANGUAGE plpgsql;
  3. Create a monitoring dashboard query for operations staff:
-- Daily sync health dashboard
SELECT
client_name,
CASE
WHEN hours_since_sync < 4 THEN 'healthy'
WHEN hours_since_sync < 24 THEN 'warning'
ELSE 'critical'
END AS status,
hours_since_sync,
recent_errors,
unresolved_conflicts
FROM sync_control.sync_health
ORDER BY hours_since_sync DESC;

Verification

After completing configuration, verify synchronisation operates correctly.

Server verification:

-- Verify sync schema exists
SELECT COUNT(*) FROM information_schema.tables
WHERE table_schema = 'sync_control';
-- Expected: 4 (client_state, table_sync_state, sync_log, conflict_rules)
-- Verify clients registered
SELECT client_id, sync_status FROM sync_control.client_state;
-- Expected: All configured clients listed with 'pending' status
-- Verify version tracking on application tables
SELECT column_name FROM information_schema.columns
WHERE table_name = 'beneficiaries' AND column_name LIKE 'sync_%';
-- Expected: sync_version, sync_modified_at, sync_deleted, sync_origin_client

Client verification:

# Verify sync client installation
ls -la /opt/field-sync/
# Expected: data/, logs/, queue/ directories present
# Verify local database
sqlite3 /opt/field-sync/data/local.db ".tables"
# Expected: sync_queue, sync_state
# Test connectivity to server
curl -s -o /dev/null -w "%{http_code}" \
-H "Authorization: Bearer $SYNC_API_TOKEN" \
https://sync.example.org/api/health
# Expected: 200
# Run manual sync and verify
/opt/field-sync/sync_client.py
# Expected: JSON output showing sync results for each table

End-to-end verification:

# Create test record on server
psql -c "INSERT INTO programme.beneficiaries (id, name) VALUES ('test-001', 'Test Record');"
# Wait for sync cycle or trigger manually on client
/opt/field-sync/sync_client.py
# Verify record appears in client local database
sqlite3 /opt/field-sync/data/local.db "SELECT * FROM beneficiaries WHERE id = 'test-001';"
# Expected: Test record visible
# Modify on client
sqlite3 /opt/field-sync/data/local.db "UPDATE beneficiaries SET name = 'Modified' WHERE id = 'test-001';"
# Queue and sync
/opt/field-sync/sync_client.py
# Verify modification on server
psql -c "SELECT name FROM programme.beneficiaries WHERE id = 'test-001';"
# Expected: 'Modified'
# Clean up test data
psql -c "DELETE FROM programme.beneficiaries WHERE id = 'test-001';"

Troubleshooting

| Symptom | Cause | Resolution |
| --- | --- | --- |
| Sync client reports "Connection refused" | Server not running or firewall blocking | Verify server process is running: systemctl status sync-api. Check firewall allows port 443 from client IP |
| "Authentication failed" in sync log | Invalid or expired API token | Regenerate token: ./generate_sync_token.py --client site-nairobi-01. Update SYNC_API_TOKEN on client |
| Sync completes but no data transferred | Client sync position ahead of actual data | Reset sync position: UPDATE sync_state SET last_synced_version = 0 WHERE table_name = 'beneficiaries'; |
| "Version mismatch" errors | Database schema differs between server and client | Verify schemas match. Run migrations on both server and client local database |
| Sync takes hours to complete | Initial sync of large dataset over slow connection | Enable selective sync filters. Consider physical data transfer for the initial load |
| High conflict rate (>5% of records) | Multiple sites editing same records | Review data ownership model. Implement record locking or partition data by site |
| Sync queue growing continuously | Push failures not clearing queue | Check server logs for rejection reason. Verify queue entries have valid data |
| "Disk full" on client | Sync queue or local database consuming space | Purge old sync queue entries: DELETE FROM sync_queue WHERE status = 'synced' AND queued_at < date('now', '-7 days'); |
| Duplicate records appearing | Conflict resolution creating new records instead of updating | Verify primary key handling in upsert logic. Check ON CONFLICT clause specifies the correct constraint |
| Sync log shows success but application data not updated | Sync client not updating application database | Verify the apply_server_changes() implementation. Check application database connection |
| Performance degrades over time | Sync tables lacking indexes on version columns | Add indexes: CREATE INDEX IF NOT EXISTS idx_sync_version ON programme.beneficiaries(sync_version); |
| Clients show "stale" in monitoring but are syncing | System clock drift between client and server | Configure NTP on both systems. Verify timezone settings match |
| Bandwidth limits not being applied | Rate limiting code not executing | Verify get_current_limits() returns expected values. Check bandwidth config loaded correctly |
| Selective sync filter not working | Filter syntax error or invalid column reference | Test the filter manually in SQL. Verify column names match the schema |

Data integrity

Never manually modify sync control tables while sync processes are running. Stop sync services on all clients before making schema changes or resetting sync positions.

See also