Реализация логической репликации в PostgreSQL: Multi-Master и выборочная синхронизация данных
Введение
Логическая репликация в PostgreSQL позволяет реплицировать определенные таблицы, строки или даже столбцы между базами данных, обеспечивая сложные паттерны распределения данных. В отличие от физической потоковой репликации, которая копирует весь кластер базы данных, логическая репликация предоставляет детализированный контроль над тем, какие данные реплицируются и куда.
Логическая против физической репликации
Физическая потоковая репликация
- Реплицирует весь кластер базы данных
- Репликация на бинарном уровне
- Реплики только для чтения
- Требуется одна и та же версия PostgreSQL
- Более низкие накладные расходы
Логическая репликация
- Выборочная репликация таблиц/строк
- Совместимость между разными версиями
- Подписчики с возможностью записи
- Более высокие накладные расходы
- Гибкое распределение данных
Кейсы использования логической репликации
- Выборочное распределение данных: Репликация определенных таблиц в разные регионы
- Настройки Multi-Master: Несколько баз данных с возможностью записи и двунаправленной синхронизацией
- Обновление между версиями: Репликация со старых версий PostgreSQL на новые
- Агрегация данных: Консолидация данных из нескольких источников
- Соответствие GDPR: Репликация только нечувствительных столбцов
Предварительные требования и настройка
Требования к конфигурации
На издателе (Источник):
# postgresql.conf
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10
На подписчике (Цель):
# postgresql.conf
max_replication_slots = 10
max_logical_replication_workers = 10
max_worker_processes = 16
Перезапуск PostgreSQL
sudo systemctl restart postgresql
Сетевая конфигурация
Убедитесь, что базы данных могут взаимодействовать:
# Тест соединения от подписчика к издателю
psql -h publisher.example.com -U replication_user -d source_db
Настройте pg_hba.conf на издателе:
# Разрешить соединения для репликации
host all replication_user subscriber_ip/32 md5
Базовая настройка логической репликации
Шаг 1: Создание пользователя репликации
На издателе:
CREATE ROLE replication_user WITH REPLICATION LOGIN PASSWORD 'secure_password';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO replication_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO replication_user;
Шаг 2: Создание исходных таблиц
На издателе:
CREATE TABLE users (
id SERIAL PRIMARY KEY,
username VARCHAR(50) NOT NULL,
email VARCHAR(100) NOT NULL,
created_at TIMESTAMP DEFAULT NOW()
);
CREATE TABLE orders (
id SERIAL PRIMARY KEY,
user_id INTEGER REFERENCES users(id),
amount DECIMAL(10,2),
status VARCHAR(20),
created_at TIMESTAMP DEFAULT NOW()
);
INSERT INTO users (username, email) VALUES
('alice', '[email protected]'),
('bob', '[email protected]');
Шаг 3: Создание публикации
На издателе:
-- Публикация всех таблиц
CREATE PUBLICATION my_publication FOR ALL TABLES;
-- Или публикация конкретных таблиц
CREATE PUBLICATION my_publication FOR TABLE users, orders;
-- Или с фильтрами строк (PostgreSQL 15+)
CREATE PUBLICATION active_users FOR TABLE users WHERE (status = 'active');
Просмотр публикаций:
SELECT * FROM pg_publication;
SELECT * FROM pg_publication_tables;
Шаг 4: Создание таблиц реплики
На подписчике:
-- Таблицы должны иметь идентичную структуру
CREATE TABLE users (
id SERIAL PRIMARY KEY,
username VARCHAR(50) NOT NULL,
email VARCHAR(100) NOT NULL,
created_at TIMESTAMP DEFAULT NOW()
);
CREATE TABLE orders (
id SERIAL PRIMARY KEY,
user_id INTEGER REFERENCES users(id),
amount DECIMAL(10,2),
status VARCHAR(20),
created_at TIMESTAMP DEFAULT NOW()
);
Шаг 5: Создание подписки
На подписчике:
CREATE SUBSCRIPTION my_subscription
CONNECTION 'host=publisher.example.com port=5432 dbname=source_db user=replication_user password=secure_password'
PUBLICATION my_publication;
Шаг 6: Проверка репликации
На издателе:
SELECT * FROM pg_stat_replication;
SELECT * FROM pg_replication_slots;
На подписчике:
SELECT * FROM pg_stat_subscription;
SELECT * FROM users; -- Вы должны увидеть реплицированные данные
Расширенная конфигурация
Репликация на уровне столбцов (PostgreSQL 15+)
Репликация только определенных столбцов:
-- На издателе: Репликация только нечувствительных столбцов
CREATE TABLE customers (
id SERIAL PRIMARY KEY,
name VARCHAR(100),
email VARCHAR(100),
credit_card VARCHAR(20), -- Не будет реплицироваться
created_at TIMESTAMP
);
CREATE PUBLICATION customer_basic
FOR TABLE customers (id, name, email, created_at);
Фильтрация строк
Репликация только активных записей:
CREATE PUBLICATION active_data
FOR TABLE orders WHERE (status IN ('pending', 'processing'));
Региональное распределение данных:
CREATE PUBLICATION us_customers
FOR TABLE customers WHERE (country = 'US');
CREATE PUBLICATION eu_customers
FOR TABLE customers WHERE (country IN ('UK', 'DE', 'FR'));
Несколько публикаций
-- Издатель: Создание нескольких публикаций
CREATE PUBLICATION oltp_data FOR TABLE users, orders;
CREATE PUBLICATION analytics_data FOR TABLE logs, metrics;
-- Подписчик: Подписка на несколько публикаций
CREATE SUBSCRIPTION multi_sub
CONNECTION 'host=publisher port=5432 dbname=mydb user=repuser password=pass'
PUBLICATION oltp_data, analytics_data;
Двунаправленная репликация (Multi-Master)
Настройка двухсторонней синхронизации
Конфигурация базы данных A:
-- Создание публикации
CREATE PUBLICATION db_a_pub FOR TABLE shared_table;
-- Подписка на базу данных B
CREATE SUBSCRIPTION db_a_sub
CONNECTION 'host=db-b.example.com dbname=mydb user=repuser'
PUBLICATION db_b_pub
WITH (origin = none); -- Предотвращает циклы репликации
Конфигурация базы данных B:
-- Создание публикации
CREATE PUBLICATION db_b_pub FOR TABLE shared_table;
-- Подписка на базу данных A
CREATE SUBSCRIPTION db_b_sub
CONNECTION 'host=db-a.example.com dbname=mydb user=repuser'
PUBLICATION db_a_pub
WITH (origin = none);
Разрешение конфликтов
Логическая репликация по умолчанию использует правило «последняя запись побеждает» (last write wins):
-- Установите идентичность реплики для отслеживания конфликтов
ALTER TABLE shared_table REPLICA IDENTITY FULL;
Стратегии обнаружения конфликтов:
- На основе временной метки: Добавьте столбец updated_at
CREATE TABLE shared_table (
id SERIAL PRIMARY KEY,
data TEXT,
updated_at TIMESTAMP DEFAULT NOW()
);
CREATE OR REPLACE FUNCTION update_timestamp()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = NOW();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER update_shared_table_timestamp
BEFORE UPDATE ON shared_table
FOR EACH ROW
EXECUTE FUNCTION update_timestamp();
- Нумерация версий:
CREATE TABLE shared_table (
id SERIAL PRIMARY KEY,
data TEXT,
version INTEGER DEFAULT 1
);
Варианты начальной синхронизации данных
Вариант 1: Автоматическое копирование (по умолчанию)
-- Подписчик автоматически копирует существующие данные
CREATE SUBSCRIPTION my_sub
CONNECTION 'host=publisher dbname=mydb user=repuser'
PUBLICATION my_pub
WITH (copy_data = true); -- По умолчанию
Вариант 2: Ручная начальная синхронизация
Для больших наборов данных используйте pg_dump:
# Дамп определенных таблиц с издателя
pg_dump -h publisher.example.com -U postgres -d source_db \
-t users -t orders --no-owner --no-acl > initial_data.sql
# Загрузка в подписчика
psql -h subscriber.example.com -U postgres -d target_db < initial_data.sql
# Создание подписки без начального копирования
psql -h subscriber.example.com -U postgres -d target_db -c "
CREATE SUBSCRIPTION my_sub
CONNECTION 'host=publisher dbname=source_db user=repuser'
PUBLICATION my_pub
WITH (copy_data = false);
"
Вариант 3: Параллельная начальная синхронизация
-- Использование нескольких воркеров для ускорения начальной синхронизации
CREATE SUBSCRIPTION fast_sync
CONNECTION 'host=publisher dbname=mydb user=repuser'
PUBLICATION my_pub
WITH (
copy_data = true,
streaming = on,
synchronous_commit = off
);
Мониторинг логической репликации
Мониторинг на издателе
-- Просмотр слотов репликации
SELECT
slot_name,
plugin,
active,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag_size
FROM pg_replication_slots;
-- Просмотр активных соединений репликации
SELECT
pid,
usename,
application_name,
client_addr,
state,
sync_state,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), sent_lsn)) AS send_lag
FROM pg_stat_replication;
Мониторинг на подписчике
-- Просмотр статуса подписки
SELECT
subname,
pid,
received_lsn,
latest_end_lsn,
pg_size_pretty(pg_wal_lsn_diff(latest_end_lsn, received_lsn)) AS lag
FROM pg_stat_subscription;
-- Проверка ошибок репликации
SELECT * FROM pg_stat_subscription WHERE last_msg_receipt_time < NOW() - INTERVAL '5 minutes';
Скрипт мониторинга
#!/bin/bash
# logical-replication-monitor.sh
echo "=== Статус издателя ==="
psql -h publisher -d mydb -c "
SELECT slot_name, active,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag
FROM pg_replication_slots;"
echo ""
echo "=== Статус подписчика ==="
psql -h subscriber -d mydb -c "
SELECT subname, pid,
pg_size_pretty(pg_wal_lsn_diff(latest_end_lsn, received_lsn)) AS lag
FROM pg_stat_subscription;"
Устранение неполадок
Проблема 1: Подписка не получает данные
Проверьте статус подписки:
SELECT subname, pid, subenabled, subconninfo FROM pg_subscription;
Включите подписку, если она отключена:
ALTER SUBSCRIPTION my_sub ENABLE;
Проверьте ошибки:
SELECT * FROM pg_stat_subscription;
Проблема 2: Растет задержка репликации (lag)
Определите медленные таблицы:
SELECT schemaname, tablename, n_tup_ins, n_tup_upd, n_tup_del
FROM pg_stat_user_tables
ORDER BY (n_tup_ins + n_tup_upd + n_tup_del) DESC;
Увеличьте количество процессов-воркеров:
# postgresql.conf на подписчике
max_logical_replication_workers = 20
max_worker_processes = 30
Проблема 3: Раздувание слота репликации (bloat)
Проверьте использование слота:
SELECT
slot_name,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;
Удалите неактивные слоты:
SELECT pg_drop_replication_slot('inactive_slot_name');
Проблема 4: Сбой начальной синхронизации
Перезапустите начальную синхронизацию:
-- Удаление и повторное создание подписки
DROP SUBSCRIPTION my_sub;
CREATE SUBSCRIPTION my_sub
CONNECTION 'host=publisher dbname=mydb user=repuser'
PUBLICATION my_pub
WITH (copy_data = true);
Оптимизация производительности
1. Используйте подходящую идентичность реплики (Replica Identity)
-- По умолчанию: только PRIMARY KEY
ALTER TABLE my_table REPLICA IDENTITY DEFAULT;
-- Full: Все столбцы (более высокие накладные расходы)
ALTER TABLE my_table REPLICA IDENTITY FULL;
-- Index: Использование конкретного уникального индекса
CREATE UNIQUE INDEX replica_idx ON my_table(col1, col2);
ALTER TABLE my_table REPLICA IDENTITY USING INDEX replica_idx;
2. Отключите ограничения во время начальной синхронизации
-- Временно отключите триггеры для ускорения начальной загрузки
ALTER TABLE my_table DISABLE TRIGGER ALL;
-- После завершения начальной синхронизации
ALTER TABLE my_table ENABLE TRIGGER ALL;
3. Параллельное применение (PostgreSQL 16+)
CREATE SUBSCRIPTION parallel_sub
CONNECTION 'host=publisher dbname=mydb user=repuser'
PUBLICATION my_pub
WITH (streaming = parallel);
Лучшие практики
- Всегда используйте идентичность реплики: Убедитесь, что у таблиц есть PRIMARY KEY или ограничение UNIQUE
- Мониторьте задержку репликации: Настройте алерты, если лаг > 100 МБ или 5 минут
- Используйте фильтры строк осторожно: Фильтры вычисляются на стороне издателя, что влияет на производительность
- Планируйте разрешение конфликтов: Внедряйте механизмы обнаружения конфликтов в multi-master системах
- Тестируйте процедуры переключения (failover): Практикуйте переключение между базами данных
- Регулярное обслуживание: Очищайте старые слоты репликации
Заключение
Логическая репликация предоставляет мощные возможности распределения данных для PostgreSQL. Ключевые преимущества:
- Выборочная репликация таблиц и строк
- Совместимость между версиями
- Двунаправленные multi-master конфигурации
- Детализированный контроль приватности данных
Начните с простой однонаправленной репликации, тщательно протестируйте её, а затем расширяйте до более сложных топологий по мере необходимости.