Implementando Replicação Lógica no PostgreSQL: Multi-Master e Sincronização Seletiva de Dados

Configure a replicação lógica do PostgreSQL para sincronização seletiva, upgrades, cópias de relatórios e designs bidirecionais cuidadosamente escopados.

Implementando Replicação Lógica no PostgreSQL: Sincronização Seletiva e Multi-Mestre

A replicação lógica no PostgreSQL ajuda quando você precisa copiar tabelas selecionadas em vez de um cluster inteiro. É útil para bancos de dados de relatórios, upgrades de versão, cópias de leitura regionais e sincronização seletiva de dados, mas a replicação lógica nativa do PostgreSQL não é um sistema multi-mestre sem conflitos pronto para uso.

Replicação Lógica vs Física

Replicação Física por Streaming

  • Replica todo o cluster de banco de dados
  • Replicação em nível binário
  • Réplicas somente leitura
  • Requer mesma versão do PostgreSQL
  • Menor overhead

Replicação Lógica

  • Replicação seletiva de tabelas/linhas
  • Compatível entre versões
  • Assinantes graváveis
  • Maior overhead
  • Distribuição flexível de dados

Casos de Uso para Replicação Lógica

  1. Distribuição Seletiva de Dados: Replicar tabelas específicas para diferentes regiões
  2. Experimentos de Sincronização Bidirecional: Gravações cuidadosamente escopadas entre bancos de dados, geralmente com regras de conflito em nível de aplicação
  3. Upgrades entre Versões: Replicar de versões antigas para novas do PostgreSQL
  4. Agregação de Dados: Consolidar dados de múltiplas fontes
  5. Conformidade com LGPD: Replicar apenas colunas não sensíveis

Pré-requisitos e Configuração

Requisitos de Configuração

No Publicador (Origem):

# postgresql.conf
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10

No Assinante (Destino):

# postgresql.conf
max_replication_slots = 10
max_logical_replication_workers = 10
max_worker_processes = 16

Reiniciar o PostgreSQL

sudo systemctl restart postgresql

Configuração de Rede

Garanta que os bancos de dados possam se comunicar:

# Testar conexão do assinante ao publicador
psql -h publisher.example.com -U replication_user -d source_db

Configurar pg_hba.conf no publicador:

# Permitir conexões de replicação
host    source_db    replication_user    subscriber_ip/32    scram-sha-256

Configuração Básica de Replicação Lógica

Passo 1: Criar Usuário de Replicação

No Publicador:

CREATE ROLE replication_user WITH REPLICATION LOGIN PASSWORD 'secure_password';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO replication_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO replication_user;

Passo 2: Criar Tabelas de Origem

No Publicador:

CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(50) NOT NULL,
    email VARCHAR(100) NOT NULL,
    status VARCHAR(20) DEFAULT 'active',
    created_at TIMESTAMP DEFAULT NOW()
);

CREATE TABLE orders (
    id SERIAL PRIMARY KEY,
    user_id INTEGER REFERENCES users(id),
    amount DECIMAL(10,2),
    status VARCHAR(20),
    created_at TIMESTAMP DEFAULT NOW()
);

INSERT INTO users (username, email) VALUES 
    ('alice', '[email protected]'),
    ('bob', '[email protected]');

Passo 3: Criar Publicação

No Publicador:

-- Publicar todas as tabelas
CREATE PUBLICATION my_publication FOR ALL TABLES;

-- Ou publicar tabelas específicas
CREATE PUBLICATION my_publication FOR TABLE users, orders;

-- Ou com filtros de linha nas tabelas publicadas
CREATE PUBLICATION active_users FOR TABLE users WHERE (status = 'active');

Visualizar publicações:

SELECT * FROM pg_publication;
SELECT * FROM pg_publication_tables;

Passo 4: Criar Tabelas Réplica

No Assinante:

-- As tabelas devem ter estrutura idêntica
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(50) NOT NULL,
    email VARCHAR(100) NOT NULL,
    status VARCHAR(20) DEFAULT 'active',
    created_at TIMESTAMP DEFAULT NOW()
);

CREATE TABLE orders (
    id SERIAL PRIMARY KEY,
    user_id INTEGER REFERENCES users(id),
    amount DECIMAL(10,2),
    status VARCHAR(20),
    created_at TIMESTAMP DEFAULT NOW()
);

Passo 5: Criar Assinatura

No Assinante:

CREATE SUBSCRIPTION my_subscription
    CONNECTION 'host=publisher.example.com port=5432 dbname=source_db user=replication_user password=secure_password'
    PUBLICATION my_publication;

Passo 6: Verificar Replicação

No Publicador:

SELECT * FROM pg_stat_replication;
SELECT * FROM pg_replication_slots;

No Assinante:

SELECT * FROM pg_stat_subscription;
SELECT * FROM users;  -- Deve mostrar dados replicados

Configuração Avançada

Replicação em Nível de Coluna (PostgreSQL 15+)

Replicar apenas colunas específicas:

-- No Publicador: Replicar apenas colunas não sensíveis
CREATE TABLE customers (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100),
    email VARCHAR(100),
    credit_card VARCHAR(20),  -- Não será replicado
    created_at TIMESTAMP
);

CREATE PUBLICATION customer_basic 
    FOR TABLE customers (id, name, email, created_at);

Filtragem de Linhas

Replicar apenas registros ativos:

CREATE PUBLICATION active_data 
    FOR TABLE orders WHERE (status IN ('pending', 'processing'));

Distribuição regional de dados:

CREATE PUBLICATION us_customers 
    FOR TABLE customers WHERE (country = 'US');

CREATE PUBLICATION eu_customers 
    FOR TABLE customers WHERE (country IN ('UK', 'DE', 'FR'));

Múltiplas Publicações

-- Publicador: Criar múltiplas publicações
CREATE PUBLICATION oltp_data FOR TABLE users, orders;
CREATE PUBLICATION analytics_data FOR TABLE logs, metrics;

-- Assinante: Assinar múltiplas publicações
CREATE SUBSCRIPTION multi_sub
    CONNECTION 'host=publisher port=5432 dbname=mydb user=repuser password=pass'
    PUBLICATION oltp_data, analytics_data;

Advertências sobre Replicação Bidirecional

A replicação lógica nativa pode ser configurada em ambas as direções, mas o PostgreSQL não mescla automaticamente gravações conflitantes. Use este padrão apenas quando cada linha tiver um único escritor, chaves não puderem colidir e sua aplicação puder lidar com conflitos.

Exemplo de Estrutura de Sincronização Bidirecional

Configuração do Banco de Dados A:

-- Criar publicação
CREATE PUBLICATION db_a_pub FOR TABLE shared_table;

-- Assinar o Banco de Dados B
CREATE SUBSCRIPTION db_a_sub
    CONNECTION 'host=db-b.example.com dbname=mydb user=repuser'
    PUBLICATION db_b_pub
    WITH (origin = none);  -- Previne loops de replicação

Configuração do Banco de Dados B:

-- Criar publicação
CREATE PUBLICATION db_b_pub FOR TABLE shared_table;

-- Assinar o Banco de Dados A
CREATE SUBSCRIPTION db_b_sub
    CONNECTION 'host=db-a.example.com dbname=mydb user=repuser'
    PUBLICATION db_a_pub
    WITH (origin = none);

Tratamento de Conflitos

A replicação lógica não fornece resolução automática de conflitos "última gravação vence". Inserções conflitantes, linhas ausentes durante atualizações, violações de restrição ou chaves duplicadas podem parar os workers de aplicação até que você corrija o problema de dados.

-- Definir identidade da réplica para rastrear conflitos
ALTER TABLE shared_table REPLICA IDENTITY FULL;

Estratégias de redução de conflitos:

  1. Verificações de propriedade baseadas em timestamp: Adicione updated_at e um identificador de escritor para que sua aplicação possa rejeitar gravações desatualizadas.
CREATE TABLE shared_table (
    id SERIAL PRIMARY KEY,
    data TEXT,
    updated_at TIMESTAMP DEFAULT NOW()
);

CREATE OR REPLACE FUNCTION update_timestamp()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = NOW();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER update_shared_table_timestamp
    BEFORE UPDATE ON shared_table
    FOR EACH ROW
    EXECUTE FUNCTION update_timestamp();
  1. Numeração de versão: Incremente uma versão na aplicação e rejeite atualizações baseadas em versões desatualizadas.
CREATE TABLE shared_table (
    id SERIAL PRIMARY KEY,
    data TEXT,
    version INTEGER DEFAULT 1
);

Opções de Sincronização Inicial de Dados

Opção 1: Cópia Automática (Padrão)

-- Assinante copia automaticamente os dados existentes
CREATE SUBSCRIPTION my_sub
    CONNECTION 'host=publisher dbname=mydb user=repuser'
    PUBLICATION my_pub
    WITH (copy_data = true);  -- Padrão

Opção 2: Sincronização Inicial Manual

Para grandes conjuntos de dados, use pg_dump:

# Exportar tabelas específicas do publicador
pg_dump -h publisher.example.com -U postgres -d source_db \
    -t users -t orders --no-owner --no-acl > initial_data.sql

# Carregar no assinante
psql -h subscriber.example.com -U postgres -d target_db < initial_data.sql

# Criar assinatura sem cópia inicial
psql -h subscriber.example.com -U postgres -d target_db -c "
    CREATE SUBSCRIPTION my_sub
    CONNECTION 'host=publisher dbname=source_db user=repuser'
    PUBLICATION my_pub
    WITH (copy_data = false);
"

Opção 3: Transmitir Grandes Transações em Andamento

-- Transmitir grandes transações em andamento em vez de esperar até o commit.
CREATE SUBSCRIPTION fast_sync
    CONNECTION 'host=publisher dbname=mydb user=repuser'
    PUBLICATION my_pub
    WITH (
        copy_data = true,
        streaming = on,
        synchronous_commit = off
    );

Monitorando a Replicação Lógica

Monitoramento do Publicador

-- Visualizar slots de replicação
SELECT 
    slot_name,
    plugin,
    active,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag_size
FROM pg_replication_slots;

-- Visualizar conexões de replicação ativas
SELECT 
    pid,
    usename,
    application_name,
    client_addr,
    state,
    sync_state,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), sent_lsn)) AS send_lag
FROM pg_stat_replication;

Monitoramento do Assinante

-- Visualizar status da assinatura
SELECT 
    subname,
    pid,
    received_lsn,
    latest_end_lsn,
    pg_size_pretty(pg_wal_lsn_diff(latest_end_lsn, received_lsn)) AS lag
FROM pg_stat_subscription;

-- Verificar erros de replicação
SELECT * FROM pg_stat_subscription WHERE last_msg_receipt_time < NOW() - INTERVAL '5 minutes';

Script de Monitoramento

#!/bin/bash
# logical-replication-monitor.sh

echo "=== Status do Publicador ==="
psql -h publisher -d mydb -c "
    SELECT slot_name, active, 
           pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag
    FROM pg_replication_slots;"

echo ""
echo "=== Status do Assinante ==="
psql -h subscriber -d mydb -c "
    SELECT subname, pid, 
           pg_size_pretty(pg_wal_lsn_diff(latest_end_lsn, received_lsn)) AS lag
    FROM pg_stat_subscription;"

Solução de Problemas

Problema 1: Assinatura Não Está Recebendo Dados

Verificar status da assinatura:

SELECT subname, subenabled, subconninfo FROM pg_subscription;
SELECT subname, pid, received_lsn, latest_end_lsn FROM pg_stat_subscription;

Habilitar assinatura se desabilitada:

ALTER SUBSCRIPTION my_sub ENABLE;

Verificar erros:

SELECT * FROM pg_stat_subscription;

Problema 2: Atraso de Replicação Crescendo

Identificar tabelas lentas:

SELECT schemaname, tablename, n_tup_ins, n_tup_upd, n_tup_del
FROM pg_stat_user_tables
ORDER BY (n_tup_ins + n_tup_upd + n_tup_del) DESC;

Aumentar processos de worker:

# postgresql.conf no assinante
max_logical_replication_workers = 20
max_worker_processes = 30

Problema 3: Inchaço do Slot de Replicação

Verificar uso do slot:

SELECT 
    slot_name,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;

Remover slots inativos:

SELECT pg_drop_replication_slot('inactive_slot_name');

Problema 4: Falha na Sincronização Inicial

Reiniciar sincronização inicial:

-- Remover e recriar assinatura
DROP SUBSCRIPTION my_sub;

CREATE SUBSCRIPTION my_sub
    CONNECTION 'host=publisher dbname=mydb user=repuser'
    PUBLICATION my_pub
    WITH (copy_data = true);

Otimização de Performance

1. Usar Identidade de Réplica Apropriada

-- Padrão: Apenas PRIMARY KEY
ALTER TABLE my_table REPLICA IDENTITY DEFAULT;

-- Completa: Todas as colunas (maior overhead)
ALTER TABLE my_table REPLICA IDENTITY FULL;

-- Índice: Usar índice único específico
CREATE UNIQUE INDEX replica_idx ON my_table(col1, col2);
ALTER TABLE my_table REPLICA IDENTITY USING INDEX replica_idx;

2. Aplicação Paralela para Grandes Transações

CREATE SUBSCRIPTION parallel_sub
    CONNECTION 'host=publisher dbname=mydb user=repuser'
    PUBLICATION my_pub
    WITH (streaming = parallel);

Melhores Práticas

  1. Sempre usar identidade de réplica: Garanta que as tabelas tenham PRIMARY KEY ou restrição UNIQUE
  2. Monitorar atraso de replicação: Configure alertas com base nas suas necessidades de recuperação e orçamento de retenção de WAL
  3. Usar filtros de linha com cuidado: Filtros são avaliados no publicador, afetando a performance
  4. Planejar para conflitos: Implemente detecção de conflitos antes de tentar gravações bidirecionais
  5. Testar procedimentos de failover: Pratique a troca entre bancos de dados
  6. Manutenção regular: Limpe slots de replicação antigos

Conclusão

Comece com replicação lógica unidirecional. Ela é confiável para sincronização seletiva de tabelas, cópias de relatórios e muitos fluxos de trabalho de upgrade. Trate a replicação bidirecional como um problema de design avançado, não como uma caixa de seleção: defina a propriedade das linhas, geração de chaves e tratamento de conflitos antes de escrever em ambos os lados.