Implementierung von logischer Replikation in PostgreSQL: Multi-Master und selektive Datensynchronisation

Richten Sie die logische PostgreSQL-Replikation für selektive Synchronisation, Upgrades, Berichtskopien und sorgfältig abgesteckte bidirektionale Designs ein.

Implementierung logischer Replikation in PostgreSQL: Multi-Master und selektive Datensynchronisation

Die logische Replikation in PostgreSQL hilft, wenn Sie ausgewählte Tabellen anstelle eines gesamten Clusters kopieren müssen. Sie ist nützlich für Berichtsdatenbanken, Versionsupgrades, regionale Lesekopien und selektive Datensynchronisation, aber die native logische PostgreSQL-Replikation ist kein schlüsselfertiges konfliktfreies Multi-Master-System.

Logische vs. physische Replikation

Physische Streaming-Replikation

  • Repliziert den gesamten Datenbankcluster
  • Replikation auf Binärebene
  • Schreibgeschützte Replikate
  • Gleiche PostgreSQL-Version erforderlich
  • Geringerer Overhead

Logische Replikation

  • Selektive Tabellen-/Zeilenreplikation
  • Versionsübergreifend kompatibel
  • Beschreibbare Abonnenten
  • Höherer Overhead
  • Flexible Datenverteilung

Anwendungsfälle für logische Replikation

  1. Selektive Datenverteilung: Replizieren Sie bestimmte Tabellen in verschiedene Regionen
  2. Bidirektionale Synchronisationsexperimente: Sorgfältig abgesteckte Schreibvorgänge zwischen Datenbanken, normalerweise mit Anwendungskonfliktregeln
  3. Versionsübergreifende Upgrades: Replizieren Sie von alten zu neuen PostgreSQL-Versionen
  4. Datenaggregation: Konsolidieren Sie Daten aus mehreren Quellen
  5. DSGVO-Konformität: Replizieren Sie nur nicht sensible Spalten

Voraussetzungen und Einrichtung

Konfigurationsanforderungen

Auf dem Publisher (Quelle):

# postgresql.conf
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10

Auf dem Abonnenten (Ziel):

# postgresql.conf
max_replication_slots = 10
max_logical_replication_workers = 10
max_worker_processes = 16

PostgreSQL neu starten

sudo systemctl restart postgresql

Netzwerkkonfiguration

Stellen Sie sicher, dass die Datenbanken kommunizieren können:

# Testverbindung vom Abonnenten zum Publisher
psql -h publisher.example.com -U replication_user -d source_db

Konfigurieren Sie pg_hba.conf auf dem Publisher:

# Replikationsverbindungen zulassen
host    source_db    replication_user    subscriber_ip/32    scram-sha-256

Grundlegende Einrichtung der logischen Replikation

Schritt 1: Replikationsbenutzer erstellen

Auf dem Publisher:

CREATE ROLE replication_user WITH REPLICATION LOGIN PASSWORD 'secure_password';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO replication_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO replication_user;

Schritt 2: Quelltabellen erstellen

Auf dem Publisher:

CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(50) NOT NULL,
    email VARCHAR(100) NOT NULL,
    status VARCHAR(20) DEFAULT 'active',
    created_at TIMESTAMP DEFAULT NOW()
);

CREATE TABLE orders (
    id SERIAL PRIMARY KEY,
    user_id INTEGER REFERENCES users(id),
    amount DECIMAL(10,2),
    status VARCHAR(20),
    created_at TIMESTAMP DEFAULT NOW()
);

INSERT INTO users (username, email) VALUES 
    ('alice', '[email protected]'),
    ('bob', '[email protected]');

Schritt 3: Publikation erstellen

Auf dem Publisher:

-- Alle Tabellen veröffentlichen
CREATE PUBLICATION my_publication FOR ALL TABLES;

-- Oder bestimmte Tabellen veröffentlichen
CREATE PUBLICATION my_publication FOR TABLE users, orders;

-- Oder mit Zeilenfiltern auf veröffentlichten Tabellen
CREATE PUBLICATION active_users FOR TABLE users WHERE (status = 'active');

Publikationen anzeigen:

SELECT * FROM pg_publication;
SELECT * FROM pg_publication_tables;

Schritt 4: Replikatabellen erstellen

Auf dem Abonnenten:

-- Tabellen müssen identische Struktur haben
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(50) NOT NULL,
    email VARCHAR(100) NOT NULL,
    status VARCHAR(20) DEFAULT 'active',
    created_at TIMESTAMP DEFAULT NOW()
);

CREATE TABLE orders (
    id SERIAL PRIMARY KEY,
    user_id INTEGER REFERENCES users(id),
    amount DECIMAL(10,2),
    status VARCHAR(20),
    created_at TIMESTAMP DEFAULT NOW()
);

Schritt 5: Abonnement erstellen

Auf dem Abonnenten:

CREATE SUBSCRIPTION my_subscription
    CONNECTION 'host=publisher.example.com port=5432 dbname=source_db user=replication_user password=secure_password'
    PUBLICATION my_publication;

Schritt 6: Replikation überprüfen

Auf dem Publisher:

SELECT * FROM pg_stat_replication;
SELECT * FROM pg_replication_slots;

Auf dem Abonnenten:

SELECT * FROM pg_stat_subscription;
SELECT * FROM users;  -- Sollte replizierte Daten anzeigen

Erweiterte Konfiguration

Spaltenebene-Replikation (PostgreSQL 15+)

Nur bestimmte Spalten replizieren:

-- Auf dem Publisher: Nur nicht sensible Spalten replizieren
CREATE TABLE customers (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100),
    email VARCHAR(100),
    credit_card VARCHAR(20),  -- Wird nicht repliziert
    created_at TIMESTAMP
);

CREATE PUBLICATION customer_basic 
    FOR TABLE customers (id, name, email, created_at);

Zeilenfilterung

Nur aktive Datensätze replizieren:

CREATE PUBLICATION active_data 
    FOR TABLE orders WHERE (status IN ('pending', 'processing'));

Regionale Datenverteilung:

CREATE PUBLICATION us_customers 
    FOR TABLE customers WHERE (country = 'US');

CREATE PUBLICATION eu_customers 
    FOR TABLE customers WHERE (country IN ('UK', 'DE', 'FR'));

Mehrere Publikationen

-- Publisher: Mehrere Publikationen erstellen
CREATE PUBLICATION oltp_data FOR TABLE users, orders;
CREATE PUBLICATION analytics_data FOR TABLE logs, metrics;

-- Abonnent: Mehrere Publikationen abonnieren
CREATE SUBSCRIPTION multi_sub
    CONNECTION 'host=publisher port=5432 dbname=mydb user=repuser password=pass'
    PUBLICATION oltp_data, analytics_data;

Fallstricke bei bidirektionaler Replikation

Die native logische Replikation kann in beide Richtungen verdrahtet werden, aber PostgreSQL führt keine automatische Zusammenführung widersprüchlicher Schreibvorgänge durch. Verwenden Sie dieses Muster nur, wenn jede Zeile einen einzelnen Schreiber hat, Schlüssel nicht kollidieren können und Ihre Anwendung Konflikte behandeln kann.

Beispiel für ein zweiseitiges Synchronisationsgerüst

Datenbank A Konfiguration:

-- Publikation erstellen
CREATE PUBLICATION db_a_pub FOR TABLE shared_table;

-- Datenbank B abonnieren
CREATE SUBSCRIPTION db_a_sub
    CONNECTION 'host=db-b.example.com dbname=mydb user=repuser'
    PUBLICATION db_b_pub
    WITH (origin = none);  -- Verhindert Replikationsschleifen

Datenbank B Konfiguration:

-- Publikation erstellen
CREATE PUBLICATION db_b_pub FOR TABLE shared_table;

-- Datenbank A abonnieren
CREATE SUBSCRIPTION db_b_sub
    CONNECTION 'host=db-a.example.com dbname=mydb user=repuser'
    PUBLICATION db_a_pub
    WITH (origin = none);

Konflikthandhabung

Die logische Replikation bietet keine automatische "Letzter Schreibvorgang gewinnt"-Konfliktauflösung. Widersprüchliche Einfügungen, fehlende Zeilen während Aktualisierungen, Einschränkungsverletzungen oder doppelte Schlüssel können die Anwendung von Workern stoppen, bis Sie das Datenproblem behoben haben.

-- Replikatidentität setzen, um Konflikte zu verfolgen
ALTER TABLE shared_table REPLICA IDENTITY FULL;

Strategien zur Konfliktreduzierung:

  1. Zeitstempelbasierte Besitzprüfungen: Fügen Sie updated_at und eine Schreiberkennung hinzu, damit Ihre Anwendung veraltete Schreibvorgänge ablehnen kann.
CREATE TABLE shared_table (
    id SERIAL PRIMARY KEY,
    data TEXT,
    updated_at TIMESTAMP DEFAULT NOW()
);

CREATE OR REPLACE FUNCTION update_timestamp()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = NOW();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER update_shared_table_timestamp
    BEFORE UPDATE ON shared_table
    FOR EACH ROW
    EXECUTE FUNCTION update_timestamp();
  1. Versionsnummerierung: Erhöhen Sie eine Version in der Anwendung und lehnen Sie Aktualisierungen basierend auf veralteten Versionen ab.
CREATE TABLE shared_table (
    id SERIAL PRIMARY KEY,
    data TEXT,
    version INTEGER DEFAULT 1
);

Optionen für die anfängliche Datensynchronisation

Option 1: Automatisches Kopieren (Standard)

-- Abonnent kopiert automatisch vorhandene Daten
CREATE SUBSCRIPTION my_sub
    CONNECTION 'host=publisher dbname=mydb user=repuser'
    PUBLICATION my_pub
    WITH (copy_data = true);  -- Standard

Option 2: Manuelle anfängliche Synchronisation

Für große Datensätze pg_dump verwenden:

# Bestimmte Tabellen vom Publisher dumpen
pg_dump -h publisher.example.com -U postgres -d source_db \
    -t users -t orders --no-owner --no-acl > initial_data.sql

# In Abonnenten laden
psql -h subscriber.example.com -U postgres -d target_db < initial_data.sql

# Abonnement ohne anfängliches Kopieren erstellen
psql -h subscriber.example.com -U postgres -d target_db -c "
    CREATE SUBSCRIPTION my_sub
    CONNECTION 'host=publisher dbname=source_db user=repuser'
    PUBLICATION my_pub
    WITH (copy_data = false);
"

Option 3: Große laufende Transaktionen streamen

-- Große laufende Transaktionen streamen, anstatt auf den Commit zu warten.
CREATE SUBSCRIPTION fast_sync
    CONNECTION 'host=publisher dbname=mydb user=repuser'
    PUBLICATION my_pub
    WITH (
        copy_data = true,
        streaming = on,
        synchronous_commit = off
    );

Überwachung der logischen Replikation

Publisher-Überwachung

-- Replikationsslots anzeigen
SELECT 
    slot_name,
    plugin,
    active,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag_size
FROM pg_replication_slots;

-- Aktive Replikationsverbindungen anzeigen
SELECT 
    pid,
    usename,
    application_name,
    client_addr,
    state,
    sync_state,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), sent_lsn)) AS send_lag
FROM pg_stat_replication;

Abonnenten-Überwachung

-- Abonnementstatus anzeigen
SELECT 
    subname,
    pid,
    received_lsn,
    latest_end_lsn,
    pg_size_pretty(pg_wal_lsn_diff(latest_end_lsn, received_lsn)) AS lag
FROM pg_stat_subscription;

-- Auf Replikationsfehler prüfen
SELECT * FROM pg_stat_subscription WHERE last_msg_receipt_time < NOW() - INTERVAL '5 minutes';

Überwachungsskript

#!/bin/bash
# logical-replication-monitor.sh

echo "=== Publisher-Status ==="
psql -h publisher -d mydb -c "
    SELECT slot_name, active, 
           pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag
    FROM pg_replication_slots;"

echo ""
echo "=== Abonnenten-Status ==="
psql -h subscriber -d mydb -c "
    SELECT subname, pid, 
           pg_size_pretty(pg_wal_lsn_diff(latest_end_lsn, received_lsn)) AS lag
    FROM pg_stat_subscription;"

Fehlerbehebung

Problem 1: Abonnement empfängt keine Daten

Abonnementstatus prüfen:

SELECT subname, subenabled, subconninfo FROM pg_subscription;
SELECT subname, pid, received_lsn, latest_end_lsn FROM pg_stat_subscription;

Abonnement aktivieren, wenn deaktiviert:

ALTER SUBSCRIPTION my_sub ENABLE;

Auf Fehler prüfen:

SELECT * FROM pg_stat_subscription;

Problem 2: Replikationsverzögerung wächst

Langsame Tabellen identifizieren:

SELECT schemaname, tablename, n_tup_ins, n_tup_upd, n_tup_del
FROM pg_stat_user_tables
ORDER BY (n_tup_ins + n_tup_upd + n_tup_del) DESC;

Worker-Prozesse erhöhen:

# postgresql.conf auf dem Abonnenten
max_logical_replication_workers = 20
max_worker_processes = 30

Problem 3: Replikationsslot-Aufblähung

Slot-Nutzung prüfen:

SELECT 
    slot_name,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;

Inaktive Slots löschen:

SELECT pg_drop_replication_slot('inactive_slot_name');

Problem 4: Fehler bei der anfänglichen Synchronisation

Anfängliche Synchronisation neu starten:

-- Abonnement löschen und neu erstellen
DROP SUBSCRIPTION my_sub;

CREATE SUBSCRIPTION my_sub
    CONNECTION 'host=publisher dbname=mydb user=repuser'
    PUBLICATION my_pub
    WITH (copy_data = true);

Leistungsoptimierung

1. Geeignete Replikatidentität verwenden

-- Standard: Nur PRIMARY KEY
ALTER TABLE my_table REPLICA IDENTITY DEFAULT;

-- Vollständig: Alle Spalten (höherer Overhead)
ALTER TABLE my_table REPLICA IDENTITY FULL;

-- Index: Bestimmten eindeutigen Index verwenden
CREATE UNIQUE INDEX replica_idx ON my_table(col1, col2);
ALTER TABLE my_table REPLICA IDENTITY USING INDEX replica_idx;

2. Paralleles Anwenden für große Transaktionen

CREATE SUBSCRIPTION parallel_sub
    CONNECTION 'host=publisher dbname=mydb user=repuser'
    PUBLICATION my_pub
    WITH (streaming = parallel);

Best Practices

  1. Immer Replikatidentität verwenden: Stellen Sie sicher, dass Tabellen PRIMARY KEY oder UNIQUE-Einschränkung haben
  2. Replikationsverzögerung überwachen: Richten Sie Warnungen basierend auf Ihren Wiederherstellungsanforderungen und dem WAL-Aufbewahrungsbudget ein
  3. Zeilenfilter sorgfältig verwenden: Filter werden auf dem Publisher ausgewertet und beeinflussen die Leistung
  4. Konflikte planen: Implementieren Sie Konflikterkennung, bevor Sie bidirektionale Schreibvorgänge versuchen
  5. Failover-Verfahren testen: Üben Sie das Wechseln zwischen Datenbanken
  6. Regelmäßige Wartung: Bereinigen Sie alte Replikationsslots

Fazit

Beginnen Sie mit der einseitigen logischen Replikation. Sie ist zuverlässig für selektive Tabellensynchronisation, Berichtskopien und viele Upgrade-Workflows. Behandeln Sie die bidirektionale Replikation als fortgeschrittenes Designproblem, nicht als einfache Option: Definieren Sie Zeilenbesitz, Schlüsselgenerierung und Konflikthandhabung, bevor Sie auf beide Seiten schreiben.