Implementación de la replicación lógica en PostgreSQL: multimaestro y sincronización selectiva de datos
Configura replicación lógica de PostgreSQL para sincronización selectiva, actualizaciones, copias de informes y diseños bidireccionales cuidadosamente delimitados.
Implementación de Replicación Lógica en PostgreSQL: Sincronización Multimaestro y Selectiva de Datos
La replicación lógica en PostgreSQL ayuda cuando necesitas copiar tablas seleccionadas en lugar de un clúster completo. Es útil para bases de datos de informes, actualizaciones de versión, copias de lectura regionales y sincronización selectiva de datos, pero la replicación lógica nativa de PostgreSQL no es un sistema multimaestro sin conflictos listo para usar.
Replicación Lógica vs Física
Replicación Física por Streaming
- Replica todo el clúster de bases de datos
- Replicación a nivel binario
- Réplicas de solo lectura
- Se requiere la misma versión de PostgreSQL
- Menor sobrecarga
Replicación Lógica
- Replicación selectiva de tablas/filas
- Compatible entre versiones
- Suscriptores con capacidad de escritura
- Mayor sobrecarga
- Distribución flexible de datos
Casos de Uso para Replicación Lógica
- Distribución Selectiva de Datos: Replicar tablas específicas a diferentes regiones
- Experimentos de Sincronización Bidireccional: Escrituras cuidadosamente delimitadas entre bases de datos, generalmente con reglas de conflicto a nivel de aplicación
- Actualizaciones entre Versiones: Replicar desde versiones antiguas a nuevas de PostgreSQL
- Agregación de Datos: Consolidar datos de múltiples fuentes
- Cumplimiento GDPR: Replicar solo columnas no sensibles
Requisitos Previos y Configuración
Requisitos de Configuración
En el Publicador (Origen):
# postgresql.conf
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10
En el Suscriptor (Destino):
# postgresql.conf
max_replication_slots = 10
max_logical_replication_workers = 10
max_worker_processes = 16
Reiniciar PostgreSQL
sudo systemctl restart postgresql
Configuración de Red
Asegúrate de que las bases de datos puedan comunicarse:
# Probar conexión desde el suscriptor al publicador
psql -h publisher.example.com -U replication_user -d source_db
Configurar pg_hba.conf en el publicador:
# Permitir conexiones de replicación
host source_db replication_user subscriber_ip/32 scram-sha-256
Configuración Básica de Replicación Lógica
Paso 1: Crear Usuario de Replicación
En el Publicador:
CREATE ROLE replication_user WITH REPLICATION LOGIN PASSWORD 'secure_password';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO replication_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO replication_user;
Paso 2: Crear Tablas de Origen
En el Publicador:
CREATE TABLE users (
id SERIAL PRIMARY KEY,
username VARCHAR(50) NOT NULL,
email VARCHAR(100) NOT NULL,
status VARCHAR(20) DEFAULT 'active',
created_at TIMESTAMP DEFAULT NOW()
);
CREATE TABLE orders (
id SERIAL PRIMARY KEY,
user_id INTEGER REFERENCES users(id),
amount DECIMAL(10,2),
status VARCHAR(20),
created_at TIMESTAMP DEFAULT NOW()
);
INSERT INTO users (username, email) VALUES
('alice', '[email protected]'),
('bob', '[email protected]');
Paso 3: Crear Publicación
En el Publicador:
-- Publicar todas las tablas
CREATE PUBLICATION my_publication FOR ALL TABLES;
-- O publicar tablas específicas
CREATE PUBLICATION my_publication FOR TABLE users, orders;
-- O con filtros de fila en tablas publicadas
CREATE PUBLICATION active_users FOR TABLE users WHERE (status = 'active');
Ver publicaciones:
SELECT * FROM pg_publication;
SELECT * FROM pg_publication_tables;
Paso 4: Crear Tablas Réplica
En el Suscriptor:
-- Las tablas deben tener estructura idéntica
CREATE TABLE users (
id SERIAL PRIMARY KEY,
username VARCHAR(50) NOT NULL,
email VARCHAR(100) NOT NULL,
status VARCHAR(20) DEFAULT 'active',
created_at TIMESTAMP DEFAULT NOW()
);
CREATE TABLE orders (
id SERIAL PRIMARY KEY,
user_id INTEGER REFERENCES users(id),
amount DECIMAL(10,2),
status VARCHAR(20),
created_at TIMESTAMP DEFAULT NOW()
);
Paso 5: Crear Suscripción
En el Suscriptor:
CREATE SUBSCRIPTION my_subscription
CONNECTION 'host=publisher.example.com port=5432 dbname=source_db user=replication_user password=secure_password'
PUBLICATION my_publication;
Paso 6: Verificar Replicación
En el Publicador:
SELECT * FROM pg_stat_replication;
SELECT * FROM pg_replication_slots;
En el Suscriptor:
SELECT * FROM pg_stat_subscription;
SELECT * FROM users; -- Debería ver los datos replicados
Configuración Avanzada
Replicación a Nivel de Columna (PostgreSQL 15+)
Replicar solo columnas específicas:
-- En el Publicador: Replicar solo columnas no sensibles
CREATE TABLE customers (
id SERIAL PRIMARY KEY,
name VARCHAR(100),
email VARCHAR(100),
credit_card VARCHAR(20), -- No se replicará
created_at TIMESTAMP
);
CREATE PUBLICATION customer_basic
FOR TABLE customers (id, name, email, created_at);
Filtrado de Filas
Replicar solo registros activos:
CREATE PUBLICATION active_data
FOR TABLE orders WHERE (status IN ('pending', 'processing'));
Distribución regional de datos:
CREATE PUBLICATION us_customers
FOR TABLE customers WHERE (country = 'US');
CREATE PUBLICATION eu_customers
FOR TABLE customers WHERE (country IN ('UK', 'DE', 'FR'));
Múltiples Publicaciones
-- Publicador: Crear múltiples publicaciones
CREATE PUBLICATION oltp_data FOR TABLE users, orders;
CREATE PUBLICATION analytics_data FOR TABLE logs, metrics;
-- Suscriptor: Suscribirse a múltiples publicaciones
CREATE SUBSCRIPTION multi_sub
CONNECTION 'host=publisher port=5432 dbname=mydb user=repuser password=pass'
PUBLICATION oltp_data, analytics_data;
Advertencias sobre Replicación Bidireccional
La replicación lógica nativa puede configurarse en ambas direcciones, pero PostgreSQL no fusiona automáticamente escrituras conflictivas. Usa este patrón solo cuando cada fila tenga un único escritor, las claves no puedan colisionar y tu aplicación pueda manejar conflictos.
Ejemplo de Esqueleto de Sincronización Bidireccional
Configuración de Base de Datos A:
-- Crear publicación
CREATE PUBLICATION db_a_pub FOR TABLE shared_table;
-- Suscribirse a Base de Datos B
CREATE SUBSCRIPTION db_a_sub
CONNECTION 'host=db-b.example.com dbname=mydb user=repuser'
PUBLICATION db_b_pub
WITH (origin = none); -- Previene bucles de replicación
Configuración de Base de Datos B:
-- Crear publicación
CREATE PUBLICATION db_b_pub FOR TABLE shared_table;
-- Suscribirse a Base de Datos A
CREATE SUBSCRIPTION db_b_sub
CONNECTION 'host=db-a.example.com dbname=mydb user=repuser'
PUBLICATION db_a_pub
WITH (origin = none);
Manejo de Conflictos
La replicación lógica no proporciona resolución automática de conflictos "última escritura gana". Las inserciones conflictivas, filas faltantes durante actualizaciones, violaciones de restricciones o claves duplicadas pueden detener los trabajadores de aplicación hasta que arregles el problema de datos.
-- Establecer identidad de réplica para rastrear conflictos
ALTER TABLE shared_table REPLICA IDENTITY FULL;
Estrategias de reducción de conflictos:
- Verificaciones de propiedad basadas en marcas de tiempo: Agrega
updated_aty un identificador de escritor para que tu aplicación pueda rechazar escrituras obsoletas.
CREATE TABLE shared_table (
id SERIAL PRIMARY KEY,
data TEXT,
updated_at TIMESTAMP DEFAULT NOW()
);
CREATE OR REPLACE FUNCTION update_timestamp()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = NOW();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER update_shared_table_timestamp
BEFORE UPDATE ON shared_table
FOR EACH ROW
EXECUTE FUNCTION update_timestamp();
- Numeración de versiones: Incrementa una versión en la aplicación y rechaza actualizaciones basadas en versiones obsoletas.
CREATE TABLE shared_table (
id SERIAL PRIMARY KEY,
data TEXT,
version INTEGER DEFAULT 1
);
Opciones de Sincronización Inicial de Datos
Opción 1: Copia Automática (Predeterminada)
-- El suscriptor copia automáticamente los datos existentes
CREATE SUBSCRIPTION my_sub
CONNECTION 'host=publisher dbname=mydb user=repuser'
PUBLICATION my_pub
WITH (copy_data = true); -- Predeterminado
Opción 2: Sincronización Inicial Manual
Para conjuntos de datos grandes, usa pg_dump:
# Volcar tablas específicas del publicador
pg_dump -h publisher.example.com -U postgres -d source_db \
-t users -t orders --no-owner --no-acl > initial_data.sql
# Cargar en el suscriptor
psql -h subscriber.example.com -U postgres -d target_db < initial_data.sql
# Crear suscripción sin copia inicial
psql -h subscriber.example.com -U postgres -d target_db -c "
CREATE SUBSCRIPTION my_sub
CONNECTION 'host=publisher dbname=source_db user=repuser'
PUBLICATION my_pub
WITH (copy_data = false);
"
Opción 3: Transmitir Transacciones Grandes en Progreso
-- Transmitir transacciones grandes en progreso en lugar de esperar hasta el commit.
CREATE SUBSCRIPTION fast_sync
CONNECTION 'host=publisher dbname=mydb user=repuser'
PUBLICATION my_pub
WITH (
copy_data = true,
streaming = on,
synchronous_commit = off
);
Monitoreo de Replicación Lógica
Monitoreo del Publicador
-- Ver espacios de replicación
SELECT
slot_name,
plugin,
active,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag_size
FROM pg_replication_slots;
-- Ver conexiones de replicación activas
SELECT
pid,
usename,
application_name,
client_addr,
state,
sync_state,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), sent_lsn)) AS send_lag
FROM pg_stat_replication;
Monitoreo del Suscriptor
-- Ver estado de la suscripción
SELECT
subname,
pid,
received_lsn,
latest_end_lsn,
pg_size_pretty(pg_wal_lsn_diff(latest_end_lsn, received_lsn)) AS lag
FROM pg_stat_subscription;
-- Verificar errores de replicación
SELECT * FROM pg_stat_subscription WHERE last_msg_receipt_time < NOW() - INTERVAL '5 minutes';
Script de Monitoreo
#!/bin/bash
# logical-replication-monitor.sh
echo "=== Estado del Publicador ==="
psql -h publisher -d mydb -c "
SELECT slot_name, active,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag
FROM pg_replication_slots;"
echo ""
echo "=== Estado del Suscriptor ==="
psql -h subscriber -d mydb -c "
SELECT subname, pid,
pg_size_pretty(pg_wal_lsn_diff(latest_end_lsn, received_lsn)) AS lag
FROM pg_stat_subscription;"
Solución de Problemas
Problema 1: La Suscripción No Recibe Datos
Verificar estado de la suscripción:
SELECT subname, subenabled, subconninfo FROM pg_subscription;
SELECT subname, pid, received_lsn, latest_end_lsn FROM pg_stat_subscription;
Habilitar suscripción si está deshabilitada:
ALTER SUBSCRIPTION my_sub ENABLE;
Verificar errores:
SELECT * FROM pg_stat_subscription;
Problema 2: Retraso de Replicación Creciente
Identificar tablas lentas:
SELECT schemaname, tablename, n_tup_ins, n_tup_upd, n_tup_del
FROM pg_stat_user_tables
ORDER BY (n_tup_ins + n_tup_upd + n_tup_del) DESC;
Aumentar procesos de trabajo:
# postgresql.conf en el suscriptor
max_logical_replication_workers = 20
max_worker_processes = 30
Problema 3: Hinchazón del Espacio de Replicación
Verificar uso del espacio:
SELECT
slot_name,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;
Eliminar espacios inactivos:
SELECT pg_drop_replication_slot('inactive_slot_name');
Problema 4: Fallo en la Sincronización Inicial
Reiniciar sincronización inicial:
-- Eliminar y recrear suscripción
DROP SUBSCRIPTION my_sub;
CREATE SUBSCRIPTION my_sub
CONNECTION 'host=publisher dbname=mydb user=repuser'
PUBLICATION my_pub
WITH (copy_data = true);
Optimización del Rendimiento
1. Usar Identidad de Réplica Apropiada
-- Predeterminado: Solo CLAVE PRIMARIA
ALTER TABLE my_table REPLICA IDENTITY DEFAULT;
-- Completa: Todas las columnas (mayor sobrecarga)
ALTER TABLE my_table REPLICA IDENTITY FULL;
-- Índice: Usar índice único específico
CREATE UNIQUE INDEX replica_idx ON my_table(col1, col2);
ALTER TABLE my_table REPLICA IDENTITY USING INDEX replica_idx;
2. Aplicación Paralela para Transacciones Grandes
CREATE SUBSCRIPTION parallel_sub
CONNECTION 'host=publisher dbname=mydb user=repuser'
PUBLICATION my_pub
WITH (streaming = parallel);
Mejores Prácticas
- Usar siempre identidad de réplica: Asegúrate de que las tablas tengan CLAVE PRIMARIA o restricción ÚNICA
- Monitorear el retraso de replicación: Configura alertas basadas en tus necesidades de recuperación y presupuesto de retención de WAL
- Usar filtros de fila con cuidado: Los filtros se evalúan en el publicador, afectando el rendimiento
- Planificar para conflictos: Implementa detección de conflictos antes de intentar escrituras bidireccionales
- Probar procedimientos de conmutación por error: Practica el cambio entre bases de datos
- Mantenimiento regular: Limpia espacios de replicación antiguos
Conclusión
Comienza con replicación lógica unidireccional. Es confiable para sincronización selectiva de tablas, copias de informes y muchos flujos de trabajo de actualización. Trata la replicación bidireccional como un problema de diseño avanzado, no como una casilla de verificación: define la propiedad de las filas, la generación de claves y el manejo de conflictos antes de escribir en ambos lados.