Data Migration
Data migration transfers data from source systems to target systems as a one-time or time-bounded operation, distinct from ongoing data pipelines that move data continuously. You perform migrations when replacing applications, consolidating databases, moving to cloud platforms, or decommissioning legacy systems. A successful migration preserves data integrity, maintains business continuity, and provides rollback capability until the target system is validated.
Prerequisites
Before beginning migration, gather the following information and secure the necessary access.
Source system requirements
| Requirement | Detail |
|---|---|
| Read access | Database credentials with SELECT permissions on all migration-scope tables |
| Schema documentation | Entity-relationship diagrams or data dictionary for source system |
| Data volumes | Row counts and storage sizes for each table in scope |
| Data profiling results | Quality assessment identifying nulls, duplicates, format variations |
| Change velocity | Records created or modified per hour during business operations |
| Maintenance windows | Periods when source system changes can be frozen |
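The profiling results called for above are straightforward to produce with a short script. A minimal pandas sketch (column names and the sample frame are illustrative, not from a real source system):

```python
import pandas as pd

def profile(df: pd.DataFrame) -> dict:
    """Summarise nulls, duplicates, and format variations per column."""
    return {
        "row_count": len(df),
        "null_counts": df.isnull().sum().to_dict(),
        "duplicate_rows": int(df.duplicated().sum()),
        # Distinct string lengths per column hint at format variations
        # (e.g. mixed D/M/YYYY and ISO 8601 dates)
        "value_length_variants": {
            col: df[col].dropna().astype(str).str.len().nunique()
            for col in df.columns
        },
    }

# Illustrative sample: one null date, one exact duplicate row, two date formats
sample = pd.DataFrame({
    "id": [1, 2, 2, 3],
    "registration_date": ["1/2/2021", "2021-02-03", "2021-02-03", None],
})
report = profile(sample)
print(report["null_counts"], report["duplicate_rows"])
```

Feeding the full extract through a function like this gives the null, duplicate, and format figures the requirements table asks for in one pass.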
Extract source row counts for baseline reconciliation:
```sql
-- PostgreSQL: Generate row counts for all tables in schema
SELECT schemaname, relname AS table_name, n_live_tup AS row_count
FROM pg_stat_user_tables
WHERE schemaname = 'public'
ORDER BY n_live_tup DESC;

-- MySQL: Generate row counts for all tables in database
SELECT table_name, table_rows AS estimated_row_count,
       ROUND(data_length / 1024 / 1024, 2) AS data_size_mb
FROM information_schema.tables
WHERE table_schema = 'production_db'
ORDER BY table_rows DESC;
```

Target system requirements
| Requirement | Detail |
|---|---|
| Write access | Database credentials with INSERT, UPDATE, DELETE, and DDL permissions |
| Schema deployed | Target tables, indexes, and constraints created (constraints may be disabled during load) |
| Storage capacity | 2.5x the source data size to accommodate staging, transformation, and final tables |
| Compute capacity | Sufficient CPU and memory for transformation processing |
| Network bandwidth | Minimum 100 Mbps between source and target for datasets under 100 GB |
Documentation and approvals
Obtain written sign-off from data owners for each dataset in scope. Document the migration scope in a migration specification that includes source-to-target field mappings, transformation rules, and acceptance criteria. The specification serves as the authoritative reference throughout the migration and forms the basis for validation queries.
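One way to keep the specification verifiable is to hold the field mappings as structured data and check them against both schemas before extraction starts. A minimal sketch, with hypothetical mapping entries and column sets:

```python
# Field mappings from a hypothetical migration specification:
# each entry maps a source column to a target column and names its rule.
FIELD_MAPPINGS = [
    {"source": "beneficiary_name", "target": "beneficiary_name", "rule": "copy"},
    {"source": "registration_date", "target": "registration_date", "rule": "DD/MM/YYYY -> ISO 8601"},
    {"source": "location_code", "target": "location_code", "rule": "legacy code -> ISO 3166-2"},
]

def check_mappings(mappings, source_columns, target_columns):
    """Return mapping entries that reference columns missing from either schema."""
    problems = []
    for m in mappings:
        if m["source"] not in source_columns:
            problems.append(f"missing in source: {m['source']}")
        if m["target"] not in target_columns:
            problems.append(f"missing in target: {m['target']}")
    return problems

source_cols = {"beneficiary_name", "registration_date", "location_code", "programme_id"}
target_cols = {"beneficiary_name", "registration_date"}  # location_code not yet deployed
problems = check_mappings(FIELD_MAPPINGS, source_cols, target_cols)
print(problems)  # flags the unmapped target column
```

Running a check like this as part of the prerequisites catches specification drift before any data moves.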
Tools and environment
Install migration tooling in an environment with network access to both source and target systems:
```shell
# Install common migration tools
pip install pandas sqlalchemy psycopg2-binary pymysql --break-system-packages
```
```shell
# For large-scale migrations, install Apache Spark
# Requires Java 11+ runtime
wget https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
tar -xzf spark-3.5.0-bin-hadoop3.tgz
export SPARK_HOME=$(pwd)/spark-3.5.0-bin-hadoop3
export PATH=$SPARK_HOME/bin:$PATH
```

Verify connectivity to both systems:
```shell
# Test source connection (PostgreSQL example)
psql -h source-db.example.org -U migration_user -d source_db -c "SELECT 1;"
```
```shell
# Test target connection
psql -h target-db.example.org -U migration_user -d target_db -c "SELECT 1;"
```

Procedure
Phase 1: Extraction
- Create extraction queries for each source table. Include all columns required by the target schema plus any columns needed for transformation logic. Add a checksum column for row-level validation:
```sql
-- Extract with checksum for validation
SELECT id, beneficiary_name, registration_date, location_code, programme_id,
       created_at, updated_at,
       MD5(CONCAT_WS('|', id, beneficiary_name, registration_date,
                     location_code, programme_id)) AS row_checksum
FROM beneficiaries
WHERE registration_date >= '2020-01-01';
```

- Execute extraction to staging files. For datasets under 10 million rows, CSV format with gzip compression balances portability and performance:
```shell
# Extract to compressed CSV
psql -h source-db.example.org -U migration_user -d source_db \
  -c "\COPY (SELECT * FROM beneficiaries WHERE registration_date >= '2020-01-01') TO STDOUT WITH CSV HEADER" \
  | gzip > beneficiaries_extract_$(date +%Y%m%d).csv.gz
```
```shell
# Record extraction metadata
echo "Extraction completed: $(date)" >> migration_log.txt
echo "Source row count: $(zcat beneficiaries_extract_*.csv.gz | wc -l)" >> migration_log.txt
```

- For datasets exceeding 10 million rows, use parallel extraction by partitioning on a distribution key:
```shell
# Parallel extraction using ID ranges (4 parallel streams)
for i in {0..3}; do
  psql -h source-db.example.org -U migration_user -d source_db \
    -c "\COPY (SELECT * FROM beneficiaries WHERE id % 4 = $i) TO STDOUT WITH CSV HEADER" \
    | gzip > beneficiaries_part${i}_$(date +%Y%m%d).csv.gz &
done
wait
echo "Parallel extraction complete"
```

- Validate extraction completeness by comparing row counts:
```shell
# Count extracted rows (subtract 1 for header)
EXTRACTED=$(zcat beneficiaries_extract_*.csv.gz | wc -l)
EXTRACTED=$((EXTRACTED - 1))

# Compare to source count
SOURCE_COUNT=$(psql -h source-db.example.org -U migration_user -d source_db -t \
  -c "SELECT COUNT(*) FROM beneficiaries WHERE registration_date >= '2020-01-01'")

if [ "$EXTRACTED" -eq "$SOURCE_COUNT" ]; then
  echo "Extraction validated: $EXTRACTED rows"
else
  echo "ERROR: Count mismatch. Source: $SOURCE_COUNT, Extracted: $EXTRACTED"
  exit 1
fi
```

Phase 2: Transformation
Transformation converts extracted data to match target schema requirements. Create transformation scripts that are idempotent and produce identical output when run multiple times on the same input.
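The "identical output on the same input" property can be verified mechanically: run the transformation twice over the same extract and compare a content hash of each output. A sketch, where the trivial `transform` stands in for the real rule set:

```python
import hashlib
import pandas as pd

def transform(df: pd.DataFrame) -> pd.DataFrame:
    # Stand-in for the real transformation rules: normalise dates to ISO 8601
    out = df.copy()
    out["registration_date"] = pd.to_datetime(
        out["registration_date"], format="%d/%m/%Y"
    ).dt.strftime("%Y-%m-%d")
    return out

def content_hash(df: pd.DataFrame) -> str:
    # Hash a canonical CSV rendering so column order is fixed
    canonical = df.sort_index(axis=1).to_csv(index=False)
    return hashlib.md5(canonical.encode()).hexdigest()

extract = pd.DataFrame({"registration_date": ["15/01/2024", "02/03/2023"]})
assert content_hash(transform(extract)) == content_hash(transform(extract))
print("transformation is deterministic for this input")
```

A check like this belongs in the transformation script's test suite; non-deterministic steps (random sampling, wall-clock timestamps, unseeded UUIDs) must be isolated and excluded from the hash.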
- Load extracted data into a transformation environment. Python with pandas handles datasets up to 5 GB in memory; larger datasets require Spark or database-based transformation:
```python
import pandas as pd
import hashlib

# Load extracted data
df = pd.read_csv('beneficiaries_extract_20240115.csv.gz', compression='gzip')
print(f"Loaded {len(df)} rows")

# Document initial state
initial_count = len(df)
initial_nulls = df.isnull().sum().to_dict()
```

- Apply field-level transformations according to the mapping specification. Document each transformation with the business rule it implements:
```python
# Transformation 1: Standardise location codes to ISO 3166-2
# Business rule: Legacy 4-digit codes map to ISO format per reference table
location_mapping = pd.read_csv('location_code_mapping.csv')
df = df.merge(location_mapping, left_on='location_code',
              right_on='legacy_code', how='left')
df['location_code'] = df['iso_code'].fillna(df['location_code'])
df = df.drop(columns=['legacy_code', 'iso_code'])

# Transformation 2: Convert registration_date from DD/MM/YYYY to ISO 8601
# Business rule: All dates stored in ISO 8601 format
df['registration_date'] = pd.to_datetime(
    df['registration_date'], format='%d/%m/%Y'
).dt.strftime('%Y-%m-%d')

# Transformation 3: Generate surrogate key for target system
# Business rule: Target uses UUID primary keys
import uuid
df['target_id'] = [str(uuid.uuid4()) for _ in range(len(df))]

# Transformation 4: Map programme IDs to new reference data
# Business rule: Programme consolidation merged 3 legacy programmes into 1
programme_mapping = {101: 'PROG-2024-001', 102: 'PROG-2024-001',
                     103: 'PROG-2024-001', 104: 'PROG-2024-002'}
df['programme_id'] = df['programme_id'].map(programme_mapping)
```

- Handle data quality issues discovered during profiling. Log all modifications for audit:
```python
# Track modifications
modifications = []

# Handle null beneficiary names (reject - required field)
null_names = df[df['beneficiary_name'].isnull()]
if len(null_names) > 0:
    modifications.append(f"Rejected {len(null_names)} rows with null beneficiary_name")
    df = df[df['beneficiary_name'].notnull()]

# Handle duplicate source IDs (keep most recent by updated_at)
duplicates = df[df.duplicated(subset=['id'], keep=False)]
if len(duplicates) > 0:
    df = df.sort_values('updated_at', ascending=False).drop_duplicates(subset=['id'], keep='first')
    modifications.append(f"Deduplicated {len(duplicates)} rows, kept most recent")

# Handle invalid location codes (map to 'UNKNOWN' for manual review)
# Compute the mask once so both assignments act on the same rows
valid_locations = set(location_mapping['iso_code'].dropna())
invalid_mask = ~df['location_code'].isin(valid_locations)
df['requires_review'] = False
if invalid_mask.any():
    df.loc[invalid_mask, 'location_code'] = 'UNKNOWN'
    df.loc[invalid_mask, 'requires_review'] = True
    modifications.append(f"Flagged {int(invalid_mask.sum())} rows with invalid location codes")

# Write modification log
with open('transformation_log.txt', 'w') as f:
    f.write(f"Transformation completed: {pd.Timestamp.now()}\n")
    f.write(f"Input rows: {initial_count}\n")
    f.write(f"Output rows: {len(df)}\n")
    for mod in modifications:
        f.write(f"  - {mod}\n")
```

- Recalculate checksums after transformation for target validation:
```python
# Generate new checksum based on target field values
def generate_checksum(row):
    values = '|'.join([str(row['target_id']), str(row['beneficiary_name']),
                       str(row['registration_date']), str(row['location_code']),
                       str(row['programme_id'])])
    return hashlib.md5(values.encode()).hexdigest()

df['target_checksum'] = df.apply(generate_checksum, axis=1)
```

- Export transformed data for loading:
```python
# Export to CSV for loading
output_columns = ['target_id', 'beneficiary_name', 'registration_date',
                  'location_code', 'programme_id', 'requires_review', 'target_checksum']
df[output_columns].to_csv('beneficiaries_transformed.csv', index=False)
print(f"Exported {len(df)} rows for loading")
```

Phase 3: Loading
Load transformed data into the target system. The loading approach depends on data volume, acceptable downtime, and target database capabilities.
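Where the native bulk facility is unavailable or unsuitable, a batched insert with periodic commits is a common fallback. A sketch using SQLite as a stand-in target (the in-memory connection, table, and batch size are illustrative; real batch sizes are typically 100,000 rows):

```python
import csv
import io
import sqlite3

# Stand-in target: in a real migration this is the target database connection
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE beneficiaries (id TEXT PRIMARY KEY, beneficiary_name TEXT)")

# Stand-in for the transformed CSV produced in Phase 2
transformed_csv = io.StringIO("id,beneficiary_name\nu1,Alice\nu2,Bob\nu3,Carol\n")

BATCH_SIZE = 2  # tune upward for real volumes
batch = []
for row in csv.DictReader(transformed_csv):
    batch.append((row["id"], row["beneficiary_name"]))
    if len(batch) >= BATCH_SIZE:
        conn.executemany("INSERT INTO beneficiaries VALUES (?, ?)", batch)
        conn.commit()  # periodic commits bound transaction log growth
        batch.clear()
if batch:  # flush the final partial batch
    conn.executemany("INSERT INTO beneficiaries VALUES (?, ?)", batch)
    conn.commit()

print(conn.execute("SELECT COUNT(*) FROM beneficiaries").fetchone()[0])  # 3
```

Batched commits trade a little throughput for bounded transaction size and resumability; the native COPY path shown below remains faster for large volumes.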
- Disable constraints and indexes on target tables to accelerate bulk loading:
```sql
-- Disable all triggers, including foreign key enforcement (PostgreSQL)
ALTER TABLE beneficiaries DISABLE TRIGGER ALL;

-- Drop indexes (recreate after load)
DROP INDEX IF EXISTS idx_beneficiaries_location;
DROP INDEX IF EXISTS idx_beneficiaries_programme;
DROP INDEX IF EXISTS idx_beneficiaries_registration_date;
```

- Load data using the target database's bulk import facility:
```shell
# PostgreSQL COPY (fastest for large datasets)
# Columns are matched by position and the list must cover every column in the
# exported CSV: the CSV's target_id loads into id, and target_checksum assumes
# a checksum column retained on the table for validation
psql -h target-db.example.org -U migration_user -d target_db \
  -c "\COPY beneficiaries(id, beneficiary_name, registration_date, location_code, programme_id, requires_review, target_checksum) FROM 'beneficiaries_transformed.csv' WITH CSV HEADER"

# Record load completion
echo "Load completed: $(date)" >> migration_log.txt
```

For datasets exceeding 50 GB, use parallel loading with table partitioning or multiple connections:
```shell
# Strip the header row, then split for parallel loading; otherwise the header
# lands in the first chunk and is loaded as data
tail -n +2 beneficiaries_transformed.csv > beneficiaries_data.csv
split -l 1000000 beneficiaries_data.csv beneficiaries_chunk_

# Load chunks in parallel (4 concurrent connections)
for chunk in beneficiaries_chunk_*; do
  psql -h target-db.example.org -U migration_user -d target_db \
    -c "\COPY beneficiaries FROM '$chunk' WITH CSV" &
  # Limit concurrent connections
  while [ $(jobs -r | wc -l) -ge 4 ]; do sleep 1; done
done
wait
```

- Re-enable constraints and rebuild indexes:
```sql
-- Re-enable triggers
ALTER TABLE beneficiaries ENABLE TRIGGER ALL;

-- Rebuild indexes
CREATE INDEX idx_beneficiaries_location ON beneficiaries(location_code);
CREATE INDEX idx_beneficiaries_programme ON beneficiaries(programme_id);
CREATE INDEX idx_beneficiaries_registration_date ON beneficiaries(registration_date);

-- Update statistics for query optimiser
ANALYZE beneficiaries;
```

- Verify load completeness:
```sql
-- Count loaded rows
SELECT COUNT(*) AS loaded_count FROM beneficiaries;

-- Compare to transformation output
-- Expected: matches row count from transformation phase
```

Phase 4: Validation
Validation confirms that migrated data matches source data and meets quality requirements. Run validation before decommissioning source systems or enabling production traffic on target systems.
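Count reconciliation is worth scripting so the expected variance from documented rejections and deduplication is applied the same way for every table. A sketch, using illustrative figures that match the worked example in this phase:

```python
def reconcile(source_count: int, target_count: int, adjustments: dict) -> dict:
    """Compare target count to source count after documented adjustments.

    adjustments maps a reason (e.g. 'rejected_nulls') to a signed row delta.
    """
    expected = source_count + sum(adjustments.values())
    return {
        "expected_target": expected,
        "actual_target": target_count,
        "variance": target_count - expected,
        "status": "PASS" if target_count == expected else "FAIL",
    }

result = reconcile(
    source_count=1_247_893,
    target_count=1_244_096,
    adjustments={"rejected_nulls": -2_341, "deduplicated": -1_456},
)
print(result["status"], result["variance"])  # PASS 0
```

Any non-zero variance means either an undocumented data loss or an adjustment figure that does not match the transformation log, and both require investigation before sign-off.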
- Execute row count reconciliation across all migrated tables:
```sql
-- Source count (run on source database)
SELECT 'beneficiaries' AS table_name, COUNT(*) AS source_count
FROM beneficiaries
WHERE registration_date >= '2020-01-01';

-- Target count (run on target database)
SELECT 'beneficiaries' AS table_name, COUNT(*) AS target_count
FROM beneficiaries;
```

Calculate expected variance based on transformation rules:
```
Source count:      1,247,893
Rejected (nulls):     -2,341
Deduplicated:         -1,456
Expected target:   1,244,096
Actual target:     1,244,096
Variance:                  0  ✓
```

- Execute aggregate validation to detect systematic transformation errors:
```sql
-- Source aggregates
SELECT location_code,
       COUNT(*) AS record_count,
       MIN(registration_date) AS earliest_date,
       MAX(registration_date) AS latest_date
FROM beneficiaries
WHERE registration_date >= '2020-01-01'
GROUP BY location_code;

-- Target aggregates (location codes will differ due to mapping)
SELECT location_code,
       COUNT(*) AS record_count,
       MIN(registration_date) AS earliest_date,
       MAX(registration_date) AS latest_date
FROM beneficiaries
GROUP BY location_code;
```

- Execute sample-based record validation. Select a random sample and compare field values:
```python
import pandas as pd
import random

# Load source IDs and select random sample
source_ids = pd.read_sql(
    "SELECT id FROM beneficiaries WHERE registration_date >= '2020-01-01'",
    source_connection
)
sample_ids = random.sample(list(source_ids['id']), min(1000, len(source_ids)))

# Fetch source records
source_sample = pd.read_sql(
    f"SELECT * FROM beneficiaries WHERE id IN ({','.join(map(str, sample_ids))})",
    source_connection
)

# Fetch corresponding target records (using ID mapping table)
# Select m.source_id as well, so target rows can be matched back to source rows
target_sample = pd.read_sql(
    f"""SELECT t.*, m.source_id
        FROM beneficiaries t
        JOIN id_mapping m ON t.id = m.target_id
        WHERE m.source_id IN ({','.join(map(str, sample_ids))})""",
    target_connection
)

# Compare field values after applying inverse transformation
mismatches = []
for _, source_row in source_sample.iterrows():
    target_row = target_sample[target_sample['source_id'] == source_row['id']]
    if len(target_row) == 0:
        mismatches.append({'source_id': source_row['id'], 'error': 'Missing in target'})
    else:
        # Compare transformed values
        if source_row['beneficiary_name'] != target_row.iloc[0]['beneficiary_name']:
            mismatches.append({'source_id': source_row['id'],
                               'field': 'beneficiary_name',
                               'source': source_row['beneficiary_name'],
                               'target': target_row.iloc[0]['beneficiary_name']})

print(f"Sample validation: {len(mismatches)} mismatches out of {len(sample_ids)} records")
```

- Execute checksum validation for critical fields:
```sql
-- Compare checksums between source and target
-- Requires checksums stored during extraction and transformation, computed
-- over the same canonical field set and representation on both sides
SELECT s.id AS source_id,
       s.row_checksum AS source_checksum,
       t.target_checksum,
       CASE WHEN s.row_checksum = t.target_checksum
            THEN 'MATCH' ELSE 'MISMATCH' END AS status
FROM source_checksums s
JOIN target_checksums t ON s.id = t.source_id
WHERE s.row_checksum != t.target_checksum;
```

- Validate referential integrity in the target system:
```sql
-- Check for orphaned foreign keys
SELECT b.id, b.programme_id
FROM beneficiaries b
LEFT JOIN programmes p ON b.programme_id = p.id
WHERE p.id IS NULL;

-- Expected: 0 rows (all programme references valid)
```

- Generate validation report documenting results:
```python
import json

validation_report = {
    'migration_id': 'MIG-2024-001',
    'execution_date': '2024-01-15',
    'tables_migrated': 12,
    'total_source_rows': 1247893,
    'total_target_rows': 1244096,
    'expected_variance': 3797,
    'actual_variance': 0,
    'sample_validation': {'sample_size': 1000, 'mismatches': 0},
    'referential_integrity': 'PASSED',
    'status': 'VALIDATED'
}

with open('validation_report.json', 'w') as f:
    json.dump(validation_report, f, indent=2)
```

Phase 5: Cutover
Cutover transitions production operations from source to target systems. The cutover approach determines acceptable downtime and risk exposure.
Big bang cutover migrates all data and switches all users at once. This approach minimises the synchronisation complexity but requires a maintenance window during which neither system is available for updates. Use big bang cutover when downtime of 4 to 24 hours is acceptable and the dataset can be fully migrated within that window.
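Whether the dataset fits the window can be estimated up front from measured extract/load throughput. A rough sketch (the throughput and validation figures are illustrative; substitute measurements from a rehearsal run):

```python
def estimated_migration_hours(data_gb: float, throughput_gb_per_hour: float,
                              validation_hours: float = 1.0) -> float:
    """Rough end-to-end estimate: transfer time plus fixed validation time."""
    return data_gb / throughput_gb_per_hour + validation_hours

window_hours = 4.0
# Hypothetical rehearsal measurement: 60 GB at 30 GB/hour sustained
estimate = estimated_migration_hours(data_gb=60, throughput_gb_per_hour=30)
print(f"estimated {estimate:.1f}h against a {window_hours:.0f}h window")
assert estimate <= window_hours  # 3.0h fits the 4h window
```

If the estimate approaches the window, either shrink the delta (freeze earlier, migrate more data ahead of time) or switch to a phased approach.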
```
+------------------------------------------------------------------+
|                    BIG BANG CUTOVER TIMELINE                     |
+------------------------------------------------------------------+
|                                                                  |
|  T-4h       T-2h       T=0        T+2h       T+4h                |
|   |          |          |          |          |                  |
|   v          v          v          v          v                  |
| +-------+  +-------+  +-------+  +-------+  +-------+            |
| |Freeze |  |Extract|  |Load   |  |Valid- |  |Go-Live|            |
| |Source |  |Delta  |  |Delta  |  |ate    |  |Target |            |
| +-------+  +-------+  +-------+  +-------+  +-------+            |
|     |                                           |                |
|     +-------------------------------------------+                |
|              Source system read-only                             |
|                                                                  |
+------------------------------------------------------------------+
```

Figure 1: Big bang cutover with 4-hour maintenance window
Phased cutover migrates data incrementally, typically by organisational unit, geographic region, or functional area. Users transition in groups while both systems operate simultaneously. This approach extends the migration period but reduces risk by limiting the blast radius of issues.
Parallel run cutover operates both systems simultaneously with data synchronised between them. Users can fall back to the source system if target system issues arise. This approach requires bidirectional synchronisation capability and doubles operational overhead during the parallel period.
- Execute final delta extraction capturing changes since the initial extraction:
```sql
-- Extract records modified since initial extraction
SELECT *
FROM beneficiaries
WHERE updated_at > '2024-01-15 00:00:00'  -- Initial extraction timestamp
ORDER BY updated_at;
```

- Apply delta transformations using the same rules as the initial transformation.
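Keeping the rules in a single function shared by the initial run and every delta run prevents the two from drifting apart. A sketch, with a reduced rule set standing in for the full specification:

```python
import pandas as pd

def apply_rules(df: pd.DataFrame) -> pd.DataFrame:
    """Transformation rules shared by the initial load and every delta."""
    out = df.copy()
    # Rule 1: normalise dates to ISO 8601
    out["registration_date"] = pd.to_datetime(
        out["registration_date"], format="%d/%m/%Y"
    ).dt.strftime("%Y-%m-%d")
    # Rule 2: upper-case location codes (illustrative)
    out["location_code"] = out["location_code"].str.upper()
    return out

initial = pd.DataFrame({"registration_date": ["15/01/2024"], "location_code": ["ke-01"]})
delta = pd.DataFrame({"registration_date": ["16/01/2024"], "location_code": ["ug-02"]})

# Same function, both phases: no drift between initial and delta rules
print(apply_rules(initial)["registration_date"].iloc[0])  # 2024-01-15
print(apply_rules(delta)["location_code"].iloc[0])        # UG-02
```

If the initial run was a one-off script, extract its rules into an importable module before the delta run rather than copying code.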
- Load delta records using upsert logic to handle both inserts and updates:
```sql
-- PostgreSQL upsert
INSERT INTO beneficiaries (id, beneficiary_name, registration_date, location_code, programme_id)
SELECT id, beneficiary_name, registration_date, location_code, programme_id
FROM beneficiaries_delta
ON CONFLICT (id) DO UPDATE SET
    beneficiary_name  = EXCLUDED.beneficiary_name,
    registration_date = EXCLUDED.registration_date,
    location_code     = EXCLUDED.location_code,
    programme_id      = EXCLUDED.programme_id,
    updated_at        = NOW();
```

- Validate delta load completeness.
- Execute application cutover by updating connection strings, DNS records, or load balancer configuration:
```shell
# Update application configuration
sed -i 's/source-db.example.org/target-db.example.org/g' /etc/app/database.conf

# Restart application to pick up new configuration
systemctl restart application

# Verify application connects to target
curl -s http://localhost:8080/health | jq '.database.host'
# Expected: "target-db.example.org"
```

- Monitor target system for errors during initial production traffic:
```shell
# Watch application logs for database errors
tail -f /var/log/application/error.log | grep -i "database\|sql\|connection"

# Monitor target database for unusual activity
psql -h target-db.example.org -U monitor -d target_db \
  -c "SELECT pid, query, state, wait_event FROM pg_stat_activity WHERE state != 'idle';"
```

Rollback
Maintain rollback capability until the migration is formally accepted. Rollback returns operations to the source system, abandoning target system changes.
Document rollback decision criteria before cutover. Define thresholds that trigger rollback:
| Condition | Threshold | Action |
|---|---|---|
| Data validation failures | More than 0.1% of records | Immediate rollback |
| Application errors | Error rate exceeds 5% | Investigate; roll back if not resolved in 30 minutes |
| Performance degradation | Response time exceeds 2x baseline | Investigate; roll back if not resolved in 1 hour |
| User-reported issues | More than 10 critical issues | Investigate; roll back if not resolved in 2 hours |

Execute rollback by reverting connection configuration:
```shell
# Revert to source database
sed -i 's/target-db.example.org/source-db.example.org/g' /etc/app/database.conf
systemctl restart application

# Verify application connects to source
curl -s http://localhost:8080/health | jq '.database.host'
# Expected: "source-db.example.org"
```

- If rollback occurs after users have modified data in the target system, extract those changes for manual reconciliation:
```sql
-- Extract records created or modified in target during production use
SELECT *
FROM beneficiaries
WHERE created_at > '2024-01-15 12:00:00'  -- Cutover timestamp
   OR updated_at > '2024-01-15 12:00:00';
```

- Document rollback in the migration log with root cause analysis.
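Extracted target-side changes can then be compared to the source record by record to build the manual reconciliation worklist. A minimal sketch of the comparison (field list and records are illustrative):

```python
def diff_records(source: dict, target: dict, fields: list) -> dict:
    """Return the fields whose values diverge between source and target copies."""
    return {
        f: {"source": source.get(f), "target": target.get(f)}
        for f in fields
        if source.get(f) != target.get(f)
    }

# Hypothetical pair: the target copy was edited during production use
src = {"id": 42, "beneficiary_name": "A. Mwangi", "location_code": "KE-01"}
tgt = {"id": 42, "beneficiary_name": "Anne Mwangi", "location_code": "KE-01"}

changed = diff_records(src, tgt, ["beneficiary_name", "location_code"])
print(changed)  # only the edited field is reported
```

Each divergent field goes to the data owner with both values, since after a rollback the target-side edit may still be the correct one to re-apply in the source system.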
Verification
After cutover and before decommissioning the source system, execute final verification to confirm migration success.
Run production validation queries comparing live data between systems:
```sql
-- Final row count comparison
-- When source and target are separate database instances, run each count on
-- its own system or use a foreign data wrapper / dblink
SELECT (SELECT COUNT(*) FROM source_db.beneficiaries) AS source_count,
       (SELECT COUNT(*) FROM target_db.beneficiaries) AS target_count;
```

Verify application functionality through user acceptance testing. Confirm that key workflows complete successfully:
```shell
# Execute automated acceptance tests against target system
./run_acceptance_tests.sh --environment=production --database=target

# Expected output:
# Tests run: 147
# Passed: 147
# Failed: 0
```

Confirm reporting and analytics produce consistent results. Compare a representative set of reports generated from both source and target data:
```python
# Generate comparison report
source_report = generate_monthly_summary(source_connection, '2024-01')
target_report = generate_monthly_summary(target_connection, '2024-01')

# Compare key metrics
for metric in ['total_beneficiaries', 'active_programmes', 'registrations_this_month']:
    source_value = source_report[metric]
    target_value = target_report[metric]
    variance_pct = abs(source_value - target_value) / source_value * 100
    status = 'PASS' if variance_pct < 0.01 else 'FAIL'
    print(f"{metric}: Source={source_value}, Target={target_value}, "
          f"Variance={variance_pct:.4f}%, Status={status}")
```

Obtain formal sign-off from data owners confirming migration acceptance. Document the acceptance in the migration record:
```json
{
  "migration_id": "MIG-2024-001",
  "acceptance_date": "2024-01-18",
  "accepted_by": [
    {"name": "Data Owner Name", "role": "Programme Data Owner", "date": "2024-01-18"},
    {"name": "Technical Lead Name", "role": "IT Technical Lead", "date": "2024-01-18"}
  ],
  "source_decommission_approved": true,
  "retention_period_days": 90
}
```

Troubleshooting
| Symptom | Cause | Resolution |
|---|---|---|
| Extraction query times out | Query lacks appropriate indexes or scans full table | Add index on extraction filter columns; extract in date-range batches of 100,000 rows |
| Extraction produces fewer rows than expected | WHERE clause excludes valid records or connection drops during extraction | Review filter criteria; implement extraction checkpointing to resume from last successful batch |
| Transformation runs out of memory | Dataset exceeds available RAM | Switch from pandas to Spark or database-based transformation; process in chunks of 1 million rows |
| Transformation produces duplicate target records | Deduplication logic fails on edge cases | Add explicit deduplication step after all transformations; sort by deterministic key before deduplicating |
| Character encoding errors during transformation | Source and target use different encodings | Explicitly specify encoding in read operations; convert to UTF-8 during extraction |
| Load fails with constraint violation | Target constraints stricter than source or transformation introduced invalid values | Temporarily disable constraints; load data; identify violations with validation queries; remediate or reject |
| Load performance degrades over time | Indexes maintained during load; transaction log growth | Disable indexes before load, rebuild after; increase checkpoint frequency; batch commits every 100,000 rows |
| Target row count lower than expected | Records rejected during load or transformation removed more than documented | Compare transformation output count to load count; check database error logs for rejected rows |
| Checksum validation fails for specific records | Transformation applied inconsistently or source data changed between extraction and validation | Re-extract and re-transform affected records; implement change data capture for delta synchronisation |
| Referential integrity validation fails | Parent records not migrated before child records or mapping errors | Load parent tables first; verify foreign key mappings; correct mapping table entries |
| Application errors after cutover | Connection pooling issues, query syntax differences, or missing data | Review application logs; compare query execution plans; verify all dependent data migrated |
| Performance degradation in target | Missing indexes, outdated statistics, or query plan differences | Rebuild indexes; run ANALYZE; compare query plans between source and target; add missing indexes |
| Reports show different totals | Aggregation logic differences or timezone handling | Compare raw data underlying reports; verify transformation of date/time fields; check for precision loss |
Data loss prevention
Never drop source tables or disable source system access until formal migration acceptance is documented and the retention period specified in the migration plan has elapsed. Minimum retention is 90 days after cutover.