Data Migration

Data migration transfers data from source systems to target systems as a one-time or time-bounded operation, distinct from ongoing data pipelines that move data continuously. You perform migrations when replacing applications, consolidating databases, moving to cloud platforms, or decommissioning legacy systems. A successful migration preserves data integrity, maintains business continuity, and provides rollback capability until the target system is validated.

Prerequisites

Before beginning migration, gather the following information and secure the necessary access.

Source system requirements

| Requirement | Detail |
| --- | --- |
| Read access | Database credentials with SELECT permissions on all migration-scope tables |
| Schema documentation | Entity-relationship diagrams or data dictionary for the source system |
| Data volumes | Row counts and storage sizes for each table in scope |
| Data profiling results | Quality assessment identifying nulls, duplicates, format variations |
| Change velocity | Records created or modified per hour during business operations |
| Maintenance windows | Periods when source system changes can be frozen |

Extract source row counts for baseline reconciliation:

```sql
-- PostgreSQL: generate row counts for all tables in a schema.
-- Note: n_live_tup is an estimate; use COUNT(*) where the baseline must be exact.
SELECT
    schemaname,
    relname AS table_name,
    n_live_tup AS row_count
FROM pg_stat_user_tables
WHERE schemaname = 'public'
ORDER BY n_live_tup DESC;
```

```sql
-- MySQL: generate estimated row counts and data sizes for all tables in a database.
-- Note: table_rows is an estimate for InnoDB; use COUNT(*) where the baseline must be exact.
SELECT
    table_name,
    table_rows AS estimated_row_count,
    ROUND(data_length / 1024 / 1024, 2) AS data_size_mb
FROM information_schema.tables
WHERE table_schema = 'production_db'
ORDER BY table_rows DESC;
```

Target system requirements

| Requirement | Detail |
| --- | --- |
| Write access | Database credentials with INSERT, UPDATE, DELETE, and DDL permissions |
| Schema deployed | Target tables, indexes, and constraints created (constraints may be disabled during load) |
| Storage capacity | 2.5x the source data size to accommodate staging, transformation, and final tables |
| Compute capacity | Sufficient CPU and memory for transformation processing |
| Network bandwidth | Minimum 100 Mbps between source and target for datasets under 100 GB |
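
The 2.5x storage sizing rule translates into a one-line capacity check. A trivial sketch; the function name and the 80 GB figure are illustrative:

```python
def required_target_gb(source_gb, factor=2.5):
    """Staging + transformation + final copies per the 2.5x sizing rule."""
    return source_gb * factor

print(required_target_gb(80))
# 200.0
```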

Documentation and approvals

Obtain written sign-off from data owners for each dataset in scope. Document the migration scope in a migration specification that includes source-to-target field mappings, transformation rules, and acceptance criteria. The specification serves as the authoritative reference throughout the migration and forms the basis for validation queries.
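
The specification can also be captured as structured data, so coverage checks and validation queries can be generated from it rather than maintained by hand. A minimal sketch in which every field name and rule is illustrative, not a prescribed schema:

```python
# Illustrative migration specification; all names and rules below are
# hypothetical examples, not a prescribed format.
migration_spec = {
    "migration_id": "MIG-2024-001",
    "source_table": "beneficiaries",
    "target_table": "beneficiaries",
    "field_mappings": [
        {"source": "id", "target": "target_id",
         "rule": "generate UUID surrogate key; record pairing in id_mapping"},
        {"source": "location_code", "target": "location_code",
         "rule": "map legacy 4-digit code to ISO 3166-2 via reference table"},
        {"source": "registration_date", "target": "registration_date",
         "rule": "convert DD/MM/YYYY to ISO 8601"},
    ],
    "acceptance_criteria": {
        "max_unexplained_variance_pct": 0.0,
        "sample_size": 1000,
        "max_sample_mismatches": 0,
    },
}

def mapped_sources(spec):
    """Return the set of source fields covered by the field mappings."""
    return {m["source"] for m in spec["field_mappings"]}

print(sorted(mapped_sources(migration_spec)))
# ['id', 'location_code', 'registration_date']
```

Keeping the specification machine-readable makes it straightforward to verify that every in-scope source column has a mapping before extraction begins.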

Tools and environment

Install migration tooling in an environment with network access to both source and target systems:

```sh
# Install common migration tools
pip install pandas sqlalchemy psycopg2-binary pymysql --break-system-packages

# For large-scale migrations, install Apache Spark (requires a Java 11+ runtime)
wget https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
tar -xzf spark-3.5.0-bin-hadoop3.tgz
export SPARK_HOME=$(pwd)/spark-3.5.0-bin-hadoop3
export PATH=$SPARK_HOME/bin:$PATH
```

Verify connectivity to both systems:

```sh
# Test the source connection (PostgreSQL example)
psql -h source-db.example.org -U migration_user -d source_db -c "SELECT 1;"

# Test the target connection
psql -h target-db.example.org -U migration_user -d target_db -c "SELECT 1;"
```

Procedure

Phase 1: Extraction

  1. Create extraction queries for each source table. Include all columns required by the target schema plus any columns needed for transformation logic. Add a checksum column for row-level validation:

```sql
-- Extract with a per-row checksum for later validation
SELECT
    id,
    beneficiary_name,
    registration_date,
    location_code,
    programme_id,
    created_at,
    updated_at,
    MD5(CONCAT_WS('|', id, beneficiary_name, registration_date, location_code, programme_id)) AS row_checksum
FROM beneficiaries
WHERE registration_date >= '2020-01-01';
```
  2. Execute the extraction to staging files. For datasets under 10 million rows, CSV with gzip compression balances portability and performance:

```sh
# Extract to compressed CSV
psql -h source-db.example.org -U migration_user -d source_db \
  -c "\COPY (SELECT * FROM beneficiaries WHERE registration_date >= '2020-01-01') TO STDOUT WITH CSV HEADER" \
  | gzip > beneficiaries_extract_$(date +%Y%m%d).csv.gz

# Record extraction metadata (the line count includes the CSV header row)
echo "Extraction completed: $(date)" >> migration_log.txt
echo "Source row count: $(zcat beneficiaries_extract_*.csv.gz | wc -l)" >> migration_log.txt
```
  3. For datasets exceeding 10 million rows, extract in parallel by partitioning on a distribution key:

```sh
# Parallel extraction using ID ranges (4 parallel streams)
for i in {0..3}; do
  psql -h source-db.example.org -U migration_user -d source_db \
    -c "\COPY (SELECT * FROM beneficiaries WHERE id % 4 = $i) TO STDOUT WITH CSV HEADER" \
    | gzip > beneficiaries_part${i}_$(date +%Y%m%d).csv.gz &
done
wait
echo "Parallel extraction complete"
```
  4. Validate extraction completeness by comparing row counts:

```sh
# Count extracted rows (subtract 1 for the header)
EXTRACTED=$(zcat beneficiaries_extract_*.csv.gz | wc -l)
EXTRACTED=$((EXTRACTED - 1))

# Compare to the source count (tr strips psql's whitespace padding)
SOURCE_COUNT=$(psql -h source-db.example.org -U migration_user -d source_db -t \
  -c "SELECT COUNT(*) FROM beneficiaries WHERE registration_date >= '2020-01-01'" | tr -d '[:space:]')

if [ "$EXTRACTED" -eq "$SOURCE_COUNT" ]; then
  echo "Extraction validated: $EXTRACTED rows"
else
  echo "ERROR: Count mismatch. Source: $SOURCE_COUNT, Extracted: $EXTRACTED"
  exit 1
fi
```

Phase 2: Transformation

Transformation converts extracted data to match target schema requirements. Create transformation scripts that are idempotent and produce identical output when run multiple times on the same input.
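
Idempotence can be verified mechanically: run the transformation twice on the same input and compare a content hash of the outputs. A pure-Python sketch with a made-up transformation; the real scripts in this phase use pandas, but the technique is the same:

```python
import hashlib
import json

def transform(records):
    """Example deterministic transformation: trim names, uppercase location codes."""
    return [
        {**r,
         "beneficiary_name": r["beneficiary_name"].strip(),
         "location_code": r["location_code"].upper()}
        for r in records
    ]

def content_hash(records):
    """Hash a canonical serialisation so identical outputs hash identically."""
    return hashlib.md5(json.dumps(records, sort_keys=True).encode()).hexdigest()

sample = [
    {"beneficiary_name": "  Amina ", "location_code": "ke-01"},
    {"beneficiary_name": "Joseph", "location_code": "ug-02"},
]

# Running the transformation once or twice must yield identical output
assert content_hash(transform(sample)) == content_hash(transform(transform(sample)))
```

Note that randomly generated surrogate keys are not idempotent on their own; derive them deterministically (for example `uuid.uuid5` over the source ID) or persist the ID mapping produced by the first run.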

  1. Load the extracted data into a transformation environment. Python with pandas handles datasets up to 5 GB in memory; larger datasets require Spark or database-side transformation:

```python
import pandas as pd
import hashlib

# Load extracted data
df = pd.read_csv('beneficiaries_extract_20240115.csv.gz', compression='gzip')
print(f"Loaded {len(df)} rows")

# Document the initial state for the transformation log
initial_count = len(df)
initial_nulls = df.isnull().sum().to_dict()
```
  2. Apply field-level transformations according to the mapping specification. Document each transformation with the business rule it implements:

```python
import uuid

# Transformation 1: standardise location codes to ISO 3166-2
# Business rule: legacy 4-digit codes map to ISO format per reference table
location_mapping = pd.read_csv('location_code_mapping.csv')
df = df.merge(location_mapping, left_on='location_code', right_on='legacy_code', how='left')
df['location_code'] = df['iso_code'].fillna(df['location_code'])
df = df.drop(columns=['legacy_code', 'iso_code'])

# Transformation 2: convert registration_date from DD/MM/YYYY to ISO 8601
# Business rule: all dates stored in ISO 8601 format
df['registration_date'] = pd.to_datetime(df['registration_date'], format='%d/%m/%Y').dt.strftime('%Y-%m-%d')

# Transformation 3: generate surrogate key for the target system
# Business rule: target uses UUID primary keys
df['target_id'] = [str(uuid.uuid4()) for _ in range(len(df))]

# Transformation 4: map programme IDs to new reference data
# Business rule: programme consolidation merged three legacy programmes into one
programme_mapping = {101: 'PROG-2024-001', 102: 'PROG-2024-001', 103: 'PROG-2024-001', 104: 'PROG-2024-002'}
df['programme_id'] = df['programme_id'].map(programme_mapping)
```
  3. Handle data quality issues discovered during profiling. Log all modifications for audit:

```python
# Track modifications; initialise the review flag so it exists for every exported row
modifications = []
df['requires_review'] = False

# Handle null beneficiary names (reject: required field)
null_names = df[df['beneficiary_name'].isnull()]
if len(null_names) > 0:
    modifications.append(f"Rejected {len(null_names)} rows with null beneficiary_name")
    df = df[df['beneficiary_name'].notnull()]

# Handle duplicate source IDs (keep the most recent by updated_at)
duplicates = df[df.duplicated(subset=['id'], keep=False)]
if len(duplicates) > 0:
    before = len(df)
    df = df.sort_values('updated_at', ascending=False).drop_duplicates(subset=['id'], keep='first')
    modifications.append(f"Deduplicated: removed {before - len(df)} of {len(duplicates)} duplicated rows, kept most recent")

# Handle invalid location codes (map to 'UNKNOWN' for manual review)
valid_locations = set(location_mapping['iso_code'].dropna())
invalid_mask = ~df['location_code'].isin(valid_locations)
if invalid_mask.any():
    # Compute the mask once: rewriting location_code first would change its result
    df.loc[invalid_mask, 'requires_review'] = True
    df.loc[invalid_mask, 'location_code'] = 'UNKNOWN'
    modifications.append(f"Flagged {invalid_mask.sum()} rows with invalid location codes")

# Write the modification log
with open('transformation_log.txt', 'w') as f:
    f.write(f"Transformation completed: {pd.Timestamp.now()}\n")
    f.write(f"Input rows: {initial_count}\n")
    f.write(f"Output rows: {len(df)}\n")
    for mod in modifications:
        f.write(f" - {mod}\n")
```
  4. Recalculate checksums after transformation for target-side validation:

```python
# Generate a new checksum over the target field values
def generate_checksum(row):
    values = '|'.join([str(row['target_id']), str(row['beneficiary_name']),
                       str(row['registration_date']), str(row['location_code']),
                       str(row['programme_id'])])
    return hashlib.md5(values.encode()).hexdigest()

df['target_checksum'] = df.apply(generate_checksum, axis=1)
```
  5. Export the transformed data for loading:

```python
# Export to CSV for loading
output_columns = ['target_id', 'beneficiary_name', 'registration_date',
                  'location_code', 'programme_id', 'requires_review', 'target_checksum']
df[output_columns].to_csv('beneficiaries_transformed.csv', index=False)
print(f"Exported {len(df)} rows for loading")
```

Phase 3: Loading

Load transformed data into the target system. The loading approach depends on data volume, acceptable downtime, and target database capabilities.

  1. Disable constraints and indexes on target tables to accelerate bulk loading:

```sql
-- Disable triggers, including foreign key constraint triggers (PostgreSQL)
ALTER TABLE beneficiaries DISABLE TRIGGER ALL;

-- Drop indexes (recreate after the load)
DROP INDEX IF EXISTS idx_beneficiaries_location;
DROP INDEX IF EXISTS idx_beneficiaries_programme;
DROP INDEX IF EXISTS idx_beneficiaries_registration_date;
```
  2. Load the data using the target database's bulk import facility. The column list must match the transformed CSV exactly; target_id from the extract loads into the target's id column:

```sh
# PostgreSQL COPY (fastest for large datasets)
psql -h target-db.example.org -U migration_user -d target_db \
  -c "\COPY beneficiaries(id, beneficiary_name, registration_date, location_code, programme_id, requires_review, target_checksum) FROM 'beneficiaries_transformed.csv' WITH CSV HEADER"

# Record load completion
echo "Load completed: $(date)" >> migration_log.txt
```

For datasets exceeding 50 GB, use parallel loading with table partitioning or multiple connections:

```sh
# Strip the header first: split would otherwise leave it in the first chunk,
# where \COPY ... WITH CSV (no HEADER) would load it as data
tail -n +2 beneficiaries_transformed.csv > beneficiaries_noheader.csv
split -l 1000000 beneficiaries_noheader.csv beneficiaries_chunk_

# Load chunks in parallel (4 concurrent connections)
for chunk in beneficiaries_chunk_*; do
  psql -h target-db.example.org -U migration_user -d target_db \
    -c "\COPY beneficiaries FROM '$chunk' WITH CSV" &
  # Limit concurrent connections
  while [ $(jobs -r | wc -l) -ge 4 ]; do sleep 1; done
done
wait
```
  3. Re-enable constraints and rebuild indexes:

```sql
-- Re-enable triggers
ALTER TABLE beneficiaries ENABLE TRIGGER ALL;

-- Rebuild indexes
CREATE INDEX idx_beneficiaries_location ON beneficiaries(location_code);
CREATE INDEX idx_beneficiaries_programme ON beneficiaries(programme_id);
CREATE INDEX idx_beneficiaries_registration_date ON beneficiaries(registration_date);

-- Update statistics for the query optimiser
ANALYZE beneficiaries;
```

  4. Verify load completeness:

```sql
-- Count loaded rows; expected to match the row count from the transformation phase
SELECT COUNT(*) AS loaded_count FROM beneficiaries;
```

Phase 4: Validation

Validation confirms that migrated data matches source data and meets quality requirements. Run validation before decommissioning source systems or enabling production traffic on target systems.

  1. Execute row count reconciliation across all migrated tables:

```sql
-- Source count (run on the source database)
SELECT 'beneficiaries' AS table_name, COUNT(*) AS source_count
FROM beneficiaries WHERE registration_date >= '2020-01-01';

-- Target count (run on the target database)
SELECT 'beneficiaries' AS table_name, COUNT(*) AS target_count
FROM beneficiaries;
```

Calculate the expected variance from the documented transformation rules:

```
Source count:      1,247,893
Rejected (nulls):     -2,341
Deduplicated:         -1,456
Expected target:   1,244,096
Actual target:     1,244,096
Variance:                  0 ✓
```
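
The same arithmetic can be scripted so reconciliation reports are reproducible. A small helper using the figures above; the adjustment labels are illustrative:

```python
def expected_target_count(source_count, adjustments):
    """Apply documented adjustments (negative for rejected or removed rows)."""
    return source_count + sum(adjustments.values())

# Figures from the reconciliation above
adjustments = {"rejected_null_names": -2341, "deduplicated": -1456}
expected = expected_target_count(1_247_893, adjustments)
actual = 1_244_096

print(expected, actual - expected)
# 1244096 0
```

Any non-zero difference between expected and actual counts is unexplained variance and must be investigated before sign-off.
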
  2. Execute aggregate validation to detect systematic transformation errors:

```sql
-- Source aggregates
SELECT
    location_code,
    COUNT(*) AS record_count,
    MIN(registration_date) AS earliest_date,
    MAX(registration_date) AS latest_date
FROM beneficiaries
WHERE registration_date >= '2020-01-01'
GROUP BY location_code;

-- Target aggregates (location codes will differ due to mapping)
SELECT
    location_code,
    COUNT(*) AS record_count,
    MIN(registration_date) AS earliest_date,
    MAX(registration_date) AS latest_date
FROM beneficiaries
GROUP BY location_code;
```
  3. Execute sample-based record validation. Select a random sample and compare field values:

```python
import pandas as pd
import random

# Load source IDs and select a random sample
source_ids = pd.read_sql(
    "SELECT id FROM beneficiaries WHERE registration_date >= '2020-01-01'",
    source_connection
)
sample_ids = random.sample(list(source_ids['id']), min(1000, len(source_ids)))

# Fetch source records
source_sample = pd.read_sql(
    f"SELECT * FROM beneficiaries WHERE id IN ({','.join(map(str, sample_ids))})",
    source_connection
)

# Fetch corresponding target records; select m.source_id so rows can be matched back
target_sample = pd.read_sql(
    f"""SELECT t.*, m.source_id FROM beneficiaries t
        JOIN id_mapping m ON t.id = m.target_id
        WHERE m.source_id IN ({','.join(map(str, sample_ids))})""",
    target_connection
)

# Compare field values after applying the inverse transformation
mismatches = []
for _, source_row in source_sample.iterrows():
    target_row = target_sample[target_sample['source_id'] == source_row['id']]
    if len(target_row) == 0:
        mismatches.append({'source_id': source_row['id'], 'error': 'Missing in target'})
    elif source_row['beneficiary_name'] != target_row.iloc[0]['beneficiary_name']:
        mismatches.append({'source_id': source_row['id'], 'field': 'beneficiary_name',
                           'source': source_row['beneficiary_name'],
                           'target': target_row.iloc[0]['beneficiary_name']})

print(f"Sample validation: {len(mismatches)} mismatches out of {len(sample_ids)} records")
```
  4. Execute checksum validation for critical fields:

```sql
-- Compare checksums between source and target.
-- Checksums are only comparable when computed over the same field values:
-- recompute the source-side checksum over post-transformation values, or
-- restrict both to fields the transformation leaves unchanged.
SELECT
    s.id AS source_id,
    s.row_checksum AS source_checksum,
    t.target_checksum
FROM source_checksums s
JOIN target_checksums t ON s.id = t.source_id
WHERE s.row_checksum != t.target_checksum;
-- Expected: 0 rows
```
  5. Validate referential integrity in the target system:

```sql
-- Check for orphaned foreign keys
SELECT b.id, b.programme_id
FROM beneficiaries b
LEFT JOIN programmes p ON b.programme_id = p.id
WHERE p.id IS NULL;
-- Expected: 0 rows (all programme references valid)
```
  6. Generate a validation report documenting the results:

```python
import json

validation_report = {
    'migration_id': 'MIG-2024-001',
    'execution_date': '2024-01-15',
    'tables_migrated': 12,
    'total_source_rows': 1247893,
    'total_target_rows': 1244096,
    'expected_variance': 3797,    # rejected + deduplicated rows
    'unexplained_variance': 0,    # actual target count vs expected
    'sample_validation': {'sample_size': 1000, 'mismatches': 0},
    'referential_integrity': 'PASSED',
    'status': 'VALIDATED'
}

with open('validation_report.json', 'w') as f:
    json.dump(validation_report, f, indent=2)
```

Phase 5: Cutover

Cutover transitions production operations from source to target systems. The cutover approach determines acceptable downtime and risk exposure.

Big bang cutover migrates all data and switches all users at once. This approach minimises synchronisation complexity but requires a maintenance window during which neither system accepts updates. Use big bang cutover when 4 to 24 hours of downtime is acceptable and the dataset can be fully migrated within that window.

```
+--------------------------------------------------------------+
|                  BIG BANG CUTOVER TIMELINE                   |
+--------------------------------------------------------------+
|                                                              |
|   T-4h      T-2h      T=0       T+2h      T+4h               |
|     |         |         |         |         |                |
|     v         v         v         v         v                |
| +-------+ +-------+ +-------+ +-------+ +-------+            |
| |Freeze | |Extract| |Load   | |Valid- | |Go-Live|            |
| |Source | |Delta  | |Delta  | |ate    | |Target |            |
| +-------+ +-------+ +-------+ +-------+ +-------+            |
|     |                                       |                |
|     +---------------------------------------+                |
|          Source system read-only                             |
|                                                              |
+--------------------------------------------------------------+
```

Figure 1: Big bang cutover with 4-hour maintenance window

Phased cutover migrates data incrementally, typically by organisational unit, geographic region, or functional area. Users transition in groups while both systems operate simultaneously. This approach extends the migration period but reduces risk by limiting the blast radius of issues.

Parallel run cutover operates both systems simultaneously with data synchronised between them. Users can fall back to the source system if target system issues arise. This approach requires bidirectional synchronisation capability and doubles operational overhead during the parallel period.

  1. Execute a final delta extraction capturing changes since the initial extraction:

```sql
-- Extract records modified since the initial extraction
SELECT * FROM beneficiaries
WHERE updated_at > '2024-01-15 00:00:00'  -- initial extraction timestamp
ORDER BY updated_at;
```
  2. Apply delta transformations using the same rules as the initial transformation.

  3. Load delta records using upsert logic to handle both inserts and updates:

```sql
-- PostgreSQL upsert
INSERT INTO beneficiaries (id, beneficiary_name, registration_date, location_code, programme_id)
SELECT id, beneficiary_name, registration_date, location_code, programme_id
FROM beneficiaries_delta
ON CONFLICT (id) DO UPDATE SET
    beneficiary_name = EXCLUDED.beneficiary_name,
    registration_date = EXCLUDED.registration_date,
    location_code = EXCLUDED.location_code,
    programme_id = EXCLUDED.programme_id,
    updated_at = NOW();
```
  4. Validate delta load completeness.

  5. Execute the application cutover by updating connection strings, DNS records, or load balancer configuration:

```sh
# Update application configuration
sed -i 's/source-db.example.org/target-db.example.org/g' /etc/app/database.conf

# Restart the application to pick up the new configuration
systemctl restart application

# Verify the application connects to the target
curl -s http://localhost:8080/health | jq '.database.host'
# Expected: "target-db.example.org"
```
  6. Monitor the target system for errors during initial production traffic:

```sh
# Watch application logs for database errors
tail -f /var/log/application/error.log | grep -i "database\|sql\|connection"

# Monitor the target database for unusual activity
psql -h target-db.example.org -U monitor -d target_db \
  -c "SELECT pid, query, state, wait_event FROM pg_stat_activity WHERE state != 'idle';"
```

Rollback

Maintain rollback capability until the migration is formally accepted. Rollback returns operations to the source system, abandoning target system changes.

  1. Document rollback decision criteria before cutover. Define thresholds that trigger rollback:

     | Condition | Threshold | Action |
     | --- | --- | --- |
     | Data validation failures | More than 0.1% of records | Immediate rollback |
     | Application errors | Error rate exceeds 5% | Investigate, rollback if not resolved in 30 minutes |
     | Performance degradation | Response time exceeds 2x baseline | Investigate, rollback if not resolved in 1 hour |
     | User-reported issues | More than 10 critical issues | Investigate, rollback if not resolved in 2 hours |
  2. Execute rollback by reverting the connection configuration:

```sh
# Revert to the source database
sed -i 's/target-db.example.org/source-db.example.org/g' /etc/app/database.conf
systemctl restart application

# Verify the application connects to the source
curl -s http://localhost:8080/health | jq '.database.host'
# Expected: "source-db.example.org"
```
  3. If rollback occurs after users have modified data in the target system, extract those changes for manual reconciliation:

```sql
-- Extract records created or modified in the target during production use
SELECT * FROM beneficiaries
WHERE created_at > '2024-01-15 12:00:00'  -- cutover timestamp
   OR updated_at > '2024-01-15 12:00:00';
```
  4. Document the rollback in the migration log with root cause analysis.
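
The decision criteria above can be encoded so that monitoring output is evaluated consistently during the cutover window rather than ad hoc. A hypothetical sketch; the metric names are illustrative, and the thresholds mirror the table in step 1:

```python
# Thresholds from the rollback decision criteria table (names are illustrative)
ROLLBACK_CRITERIA = {
    "validation_failure_pct": 0.1,    # immediate rollback above this
    "app_error_rate_pct": 5.0,        # investigate, rollback if unresolved
    "response_time_multiplier": 2.0,  # vs. pre-cutover baseline
    "critical_user_issues": 10,
}

def rollback_actions(metrics):
    """Return the list of breached criteria for the cutover log."""
    breaches = []
    for name, threshold in ROLLBACK_CRITERIA.items():
        if metrics.get(name, 0) > threshold:
            breaches.append(name)
    return breaches

print(rollback_actions({"validation_failure_pct": 0.3, "app_error_rate_pct": 1.2}))
# ['validation_failure_pct']
```

An empty result means no criterion is breached; any non-empty result is recorded with the decision taken, whether or not rollback follows.
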

Verification

After cutover and before decommissioning the source system, execute final verification to confirm migration success.

Run production validation queries comparing live data between systems:

```sql
-- Final row count comparison.
-- PostgreSQL cannot query two databases in one statement without a foreign
-- data wrapper or dblink; otherwise run each COUNT on its own system.
SELECT
    (SELECT COUNT(*) FROM source_db.beneficiaries) AS source_count,
    (SELECT COUNT(*) FROM target_db.beneficiaries) AS target_count;
```

Verify application functionality through user acceptance testing. Confirm that key workflows complete successfully:

```sh
# Execute automated acceptance tests against the target system
./run_acceptance_tests.sh --environment=production --database=target
# Expected output:
#   Tests run: 147
#   Passed: 147
#   Failed: 0
```

Confirm reporting and analytics produce consistent results. Compare a representative set of reports generated from both source and target data:

```python
# Generate comparison reports from both systems
source_report = generate_monthly_summary(source_connection, '2024-01')
target_report = generate_monthly_summary(target_connection, '2024-01')

# Compare key metrics
for metric in ['total_beneficiaries', 'active_programmes', 'registrations_this_month']:
    source_value = source_report[metric]
    target_value = target_report[metric]
    variance_pct = abs(source_value - target_value) / source_value * 100
    status = 'PASS' if variance_pct < 0.01 else 'FAIL'
    print(f"{metric}: Source={source_value}, Target={target_value}, Variance={variance_pct:.4f}%, Status={status}")
```

Obtain formal sign-off from data owners confirming migration acceptance. Document the acceptance in the migration record:

```json
{
  "migration_id": "MIG-2024-001",
  "acceptance_date": "2024-01-18",
  "accepted_by": [
    {"name": "Data Owner Name", "role": "Programme Data Owner", "date": "2024-01-18"},
    {"name": "Technical Lead Name", "role": "IT Technical Lead", "date": "2024-01-18"}
  ],
  "source_decommission_approved": true,
  "retention_period_days": 90
}
```

Troubleshooting

| Symptom | Cause | Resolution |
| --- | --- | --- |
| Extraction query times out | Query lacks appropriate indexes or scans full table | Add index on extraction filter columns; extract in date-range batches of 100,000 rows |
| Extraction produces fewer rows than expected | WHERE clause excludes valid records or connection drops during extraction | Review filter criteria; implement extraction checkpointing to resume from last successful batch |
| Transformation runs out of memory | Dataset exceeds available RAM | Switch from pandas to Spark or database-based transformation; process in chunks of 1 million rows |
| Transformation produces duplicate target records | Deduplication logic fails on edge cases | Add explicit deduplication step after all transformations; sort by deterministic key before deduplicating |
| Character encoding errors during transformation | Source and target use different encodings | Explicitly specify encoding in read operations; convert to UTF-8 during extraction |
| Load fails with constraint violation | Target constraints stricter than source or transformation introduced invalid values | Temporarily disable constraints; load data; identify violations with validation queries; remediate or reject |
| Load performance degrades over time | Indexes maintained during load; transaction log growth | Disable indexes before load, rebuild after; increase checkpoint frequency; batch commits every 100,000 rows |
| Target row count lower than expected | Records rejected during load or transformation removed more than documented | Compare transformation output count to load count; check database error logs for rejected rows |
| Checksum validation fails for specific records | Transformation applied inconsistently or source data changed between extraction and validation | Re-extract and re-transform affected records; implement change data capture for delta synchronisation |
| Referential integrity validation fails | Parent records not migrated before child records, or mapping errors | Load parent tables first; verify foreign key mappings; correct mapping table entries |
| Application errors after cutover | Connection pooling issues, query syntax differences, or missing data | Review application logs; compare query execution plans; verify all dependent data migrated |
| Performance degradation in target | Missing indexes, outdated statistics, or query plan differences | Rebuild indexes; run ANALYZE; compare query plans between source and target; add missing indexes |
| Reports show different totals | Aggregation logic differences or timezone handling | Compare raw data underlying reports; verify transformation of date/time fields; check for precision loss |
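
The chunked-processing remedies in the table above can be sketched with the standard-library csv module; with pandas the equivalent is `pd.read_csv(..., chunksize=1_000_000)`. The column names and chunk size here are illustrative, and StringIO stands in for the real extract file:

```python
import csv
import io
from itertools import islice

def iter_chunks(reader, size):
    """Yield lists of at most `size` rows from a csv.DictReader."""
    while True:
        chunk = list(islice(reader, size))
        if not chunk:
            return
        yield chunk

# StringIO stands in for a multi-gigabyte extract file
raw = io.StringIO("id,beneficiary_name\n1, Amina \n2,Joseph\n3, Li \n")
reader = csv.DictReader(raw)

rows_out = 0
for chunk in iter_chunks(reader, 2):
    for row in chunk:
        # Per-chunk transformation keeps memory use bounded by the chunk size
        row["beneficiary_name"] = row["beneficiary_name"].strip()
    rows_out += len(chunk)

print(rows_out)
# 3
```

Because each chunk is transformed and released before the next is read, peak memory is governed by the chunk size rather than the file size.
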

Data loss prevention

Never drop source tables or disable source system access until formal migration acceptance is documented and the retention period specified in the migration plan has elapsed. Minimum retention is 90 days after cutover.

See also