Внедрение логической репликации в PostgreSQL: мультимастер и селективная синхронизация данных
Настройка логической репликации PostgreSQL для выборочной синхронизации, обновлений, копий для отчетности и тщательно ограниченных двунаправленных схем.
Реализация логической репликации в PostgreSQL: мульти-мастер и выборочная синхронизация данных
Логическая репликация в PostgreSQL помогает, когда нужно скопировать выбранные таблицы вместо всего кластера. Она полезна для баз данных отчетности, обновления версий, региональных копий для чтения и выборочной синхронизации данных, но нативная логическая репликация PostgreSQL не является готовой системой мульти-мастер без конфликтов.
Логическая vs Физическая репликация
Физическая потоковая репликация
- Реплицирует весь кластер базы данных
- Репликация на бинарном уровне
- Реплики только для чтения
- Требуется одинаковая версия PostgreSQL
- Меньшие накладные расходы
Логическая репликация
- Выборочная репликация таблиц/строк
- Совместимость между версиями
- Подписчики с возможностью записи
- Более высокие накладные расходы
- Гибкое распределение данных
Варианты использования логической репликации
- Выборочное распределение данных: Репликация определенных таблиц в разные регионы
- Эксперименты с двунаправленной синхронизацией: Тщательно ограниченные записи между базами данных, обычно с правилами разрешения конфликтов на уровне приложения
- Обновления между версиями: Репликация со старой версии PostgreSQL на новую
- Агрегация данных: Консолидация данных из нескольких источников
- Соответствие GDPR: Репликация только нечувствительных столбцов
Предварительные требования и настройка
Требования к конфигурации
На издателе (Источник):
# postgresql.conf
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10
На подписчике (Цель):
# postgresql.conf
max_replication_slots = 10
max_logical_replication_workers = 10
max_worker_processes = 16
Перезапуск PostgreSQL
sudo systemctl restart postgresql
Сетевая конфигурация
Убедитесь, что базы данных могут взаимодействовать:
# Проверка соединения от подписчика к издателю
psql -h publisher.example.com -U replication_user -d source_db
Настройка pg_hba.conf на издателе:
# Разрешить репликационные соединения
host source_db replication_user subscriber_ip/32 scram-sha-256
Базовая настройка логической репликации
Шаг 1: Создание пользователя репликации
На издателе:
CREATE ROLE replication_user WITH REPLICATION LOGIN PASSWORD 'secure_password';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO replication_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO replication_user;
Шаг 2: Создание исходных таблиц
На издателе:
CREATE TABLE users (
id SERIAL PRIMARY KEY,
username VARCHAR(50) NOT NULL,
email VARCHAR(100) NOT NULL,
status VARCHAR(20) DEFAULT 'active',
created_at TIMESTAMP DEFAULT NOW()
);
CREATE TABLE orders (
id SERIAL PRIMARY KEY,
user_id INTEGER REFERENCES users(id),
amount DECIMAL(10,2),
status VARCHAR(20),
created_at TIMESTAMP DEFAULT NOW()
);
INSERT INTO users (username, email) VALUES
('alice', '[email protected]'),
('bob', '[email protected]');
Шаг 3: Создание публикации
На издателе:
-- Опубликовать все таблицы
CREATE PUBLICATION my_publication FOR ALL TABLES;
-- Или опубликовать определенные таблицы
CREATE PUBLICATION my_publication FOR TABLE users, orders;
-- Или с фильтрами строк для опубликованных таблиц
CREATE PUBLICATION active_users FOR TABLE users WHERE (status = 'active');
Просмотр публикаций:
SELECT * FROM pg_publication;
SELECT * FROM pg_publication_tables;
Шаг 4: Создание таблиц-реплик
На подписчике:
-- Таблицы должны иметь идентичную структуру
CREATE TABLE users (
id SERIAL PRIMARY KEY,
username VARCHAR(50) NOT NULL,
email VARCHAR(100) NOT NULL,
status VARCHAR(20) DEFAULT 'active',
created_at TIMESTAMP DEFAULT NOW()
);
CREATE TABLE orders (
id SERIAL PRIMARY KEY,
user_id INTEGER REFERENCES users(id),
amount DECIMAL(10,2),
status VARCHAR(20),
created_at TIMESTAMP DEFAULT NOW()
);
Шаг 5: Создание подписки
На подписчике:
CREATE SUBSCRIPTION my_subscription
CONNECTION 'host=publisher.example.com port=5432 dbname=source_db user=replication_user password=secure_password'
PUBLICATION my_publication;
Шаг 6: Проверка репликации
На издателе:
SELECT * FROM pg_stat_replication;
SELECT * FROM pg_replication_slots;
На подписчике:
SELECT * FROM pg_stat_subscription;
SELECT * FROM users; -- Должны увидеть реплицированные данные
Расширенная конфигурация
Репликация на уровне столбцов (PostgreSQL 15+)
Репликация только определенных столбцов:
-- На издателе: Репликация только нечувствительных столбцов
CREATE TABLE customers (
id SERIAL PRIMARY KEY,
name VARCHAR(100),
email VARCHAR(100),
credit_card VARCHAR(20), -- Не будет реплицироваться
created_at TIMESTAMP
);
CREATE PUBLICATION customer_basic
FOR TABLE customers (id, name, email, created_at);
Фильтрация строк
Репликация только активных записей:
CREATE PUBLICATION active_data
FOR TABLE orders WHERE (status IN ('pending', 'processing'));
Региональное распределение данных:
CREATE PUBLICATION us_customers
FOR TABLE customers WHERE (country = 'US');
CREATE PUBLICATION eu_customers
FOR TABLE customers WHERE (country IN ('UK', 'DE', 'FR'));
Несколько публикаций
-- Издатель: Создание нескольких публикаций
CREATE PUBLICATION oltp_data FOR TABLE users, orders;
CREATE PUBLICATION analytics_data FOR TABLE logs, metrics;
-- Подписчик: Подписка на несколько публикаций
CREATE SUBSCRIPTION multi_sub
CONNECTION 'host=publisher port=5432 dbname=mydb user=repuser password=pass'
PUBLICATION oltp_data, analytics_data;
Предостережения по двунаправленной репликации
Нативную логическую репликацию можно настроить в обоих направлениях, но PostgreSQL не объединяет конфликтующие записи автоматически. Используйте этот шаблон только когда каждая строка имеет одного писателя, ключи не могут конфликтовать, и ваше приложение может обрабатывать конфликты.
Пример скелета двусторонней синхронизации
Конфигурация базы данных A:
-- Создание публикации
CREATE PUBLICATION db_a_pub FOR TABLE shared_table;
-- Подписка на базу данных B
CREATE SUBSCRIPTION db_a_sub
CONNECTION 'host=db-b.example.com dbname=mydb user=repuser'
PUBLICATION db_b_pub
WITH (origin = none); -- Предотвращает циклы репликации
Конфигурация базы данных B:
-- Создание публикации
CREATE PUBLICATION db_b_pub FOR TABLE shared_table;
-- Подписка на базу данных A
CREATE SUBSCRIPTION db_b_sub
CONNECTION 'host=db-a.example.com dbname=mydb user=repuser'
PUBLICATION db_a_pub
WITH (origin = none);
Обработка конфликтов
Логическая репликация не предоставляет автоматического разрешения конфликтов "последняя запись побеждает". Конфликтующие вставки, отсутствующие строки при обновлениях, нарушения ограничений или дублирующиеся ключи могут остановить рабочие процессы применения, пока вы не исправите проблему с данными.
-- Установка идентификатора реплики для отслеживания конфликтов
ALTER TABLE shared_table REPLICA IDENTITY FULL;
Стратегии уменьшения конфликтов:
- Проверки владения на основе временных меток: Добавьте
updated_atи идентификатор писателя, чтобы ваше приложение могло отклонять устаревшие записи.
CREATE TABLE shared_table (
id SERIAL PRIMARY KEY,
data TEXT,
updated_at TIMESTAMP DEFAULT NOW()
);
CREATE OR REPLACE FUNCTION update_timestamp()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = NOW();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER update_shared_table_timestamp
BEFORE UPDATE ON shared_table
FOR EACH ROW
EXECUTE FUNCTION update_timestamp();
- Нумерация версий: Увеличивайте версию в приложении и отклоняйте обновления на основе устаревших версий.
CREATE TABLE shared_table (
id SERIAL PRIMARY KEY,
data TEXT,
version INTEGER DEFAULT 1
);
Варианты начальной синхронизации данных
Вариант 1: Автоматическое копирование (по умолчанию)
-- Подписчик автоматически копирует существующие данные
CREATE SUBSCRIPTION my_sub
CONNECTION 'host=publisher dbname=mydb user=repuser'
PUBLICATION my_pub
WITH (copy_data = true); -- По умолчанию
Вариант 2: Ручная начальная синхронизация
Для больших наборов данных используйте pg_dump:
# Дамп определенных таблиц с издателя
pg_dump -h publisher.example.com -U postgres -d source_db \
-t users -t orders --no-owner --no-acl > initial_data.sql
# Загрузка на подписчика
psql -h subscriber.example.com -U postgres -d target_db < initial_data.sql
# Создание подписки без начального копирования
psql -h subscriber.example.com -U postgres -d target_db -c "
CREATE SUBSCRIPTION my_sub
CONNECTION 'host=publisher dbname=source_db user=repuser'
PUBLICATION my_pub
WITH (copy_data = false);
"
Вариант 3: Потоковая передача больших незавершенных транзакций
-- Потоковая передача больших незавершенных транзакций вместо ожидания фиксации.
CREATE SUBSCRIPTION fast_sync
CONNECTION 'host=publisher dbname=mydb user=repuser'
PUBLICATION my_pub
WITH (
copy_data = true,
streaming = on,
synchronous_commit = off
);
Мониторинг логической репликации
Мониторинг издателя
-- Просмотр слотов репликации
SELECT
slot_name,
plugin,
active,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag_size
FROM pg_replication_slots;
-- Просмотр активных репликационных соединений
SELECT
pid,
usename,
application_name,
client_addr,
state,
sync_state,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), sent_lsn)) AS send_lag
FROM pg_stat_replication;
Мониторинг подписчика
-- Просмотр статуса подписки
SELECT
subname,
pid,
received_lsn,
latest_end_lsn,
pg_size_pretty(pg_wal_lsn_diff(latest_end_lsn, received_lsn)) AS lag
FROM pg_stat_subscription;
-- Проверка ошибок репликации
SELECT * FROM pg_stat_subscription WHERE last_msg_receipt_time < NOW() - INTERVAL '5 minutes';
Скрипт мониторинга
#!/bin/bash
# logical-replication-monitor.sh
echo "=== Статус издателя ==="
psql -h publisher -d mydb -c "
SELECT slot_name, active,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag
FROM pg_replication_slots;"
echo ""
echo "=== Статус подписчика ==="
psql -h subscriber -d mydb -c "
SELECT subname, pid,
pg_size_pretty(pg_wal_lsn_diff(latest_end_lsn, received_lsn)) AS lag
FROM pg_stat_subscription;"
Устранение неполадок
Проблема 1: Подписка не получает данные
Проверка статуса подписки:
SELECT subname, subenabled, subconninfo FROM pg_subscription;
SELECT subname, pid, received_lsn, latest_end_lsn FROM pg_stat_subscription;
Включение подписки, если она отключена:
ALTER SUBSCRIPTION my_sub ENABLE;
Проверка ошибок:
SELECT * FROM pg_stat_subscription;
Проблема 2: Растущее отставание репликации
Определение медленных таблиц:
SELECT schemaname, tablename, n_tup_ins, n_tup_upd, n_tup_del
FROM pg_stat_user_tables
ORDER BY (n_tup_ins + n_tup_upd + n_tup_del) DESC;
Увеличение рабочих процессов:
# postgresql.conf на подписчике
max_logical_replication_workers = 20
max_worker_processes = 30
Проблема 3: Раздувание слотов репликации
Проверка использования слотов:
SELECT
slot_name,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;
Удаление неактивных слотов:
SELECT pg_drop_replication_slot('inactive_slot_name');
Проблема 4: Сбой начальной синхронизации
Перезапуск начальной синхронизации:
-- Удаление и повторное создание подписки
DROP SUBSCRIPTION my_sub;
CREATE SUBSCRIPTION my_sub
CONNECTION 'host=publisher dbname=mydb user=repuser'
PUBLICATION my_pub
WITH (copy_data = true);
Оптимизация производительности
1. Используйте соответствующий идентификатор реплики
-- По умолчанию: только PRIMARY KEY
ALTER TABLE my_table REPLICA IDENTITY DEFAULT;
-- Полный: все столбцы (более высокие накладные расходы)
ALTER TABLE my_table REPLICA IDENTITY FULL;
-- Индекс: используйте определенный уникальный индекс
CREATE UNIQUE INDEX replica_idx ON my_table(col1, col2);
ALTER TABLE my_table REPLICA IDENTITY USING INDEX replica_idx;
2. Параллельное применение для больших транзакций
CREATE SUBSCRIPTION parallel_sub
CONNECTION 'host=publisher dbname=mydb user=repuser'
PUBLICATION my_pub
WITH (streaming = parallel);
Лучшие практики
- Всегда используйте идентификатор реплики: Убедитесь, что таблицы имеют PRIMARY KEY или UNIQUE ограничение
- Мониторьте отставание репликации: Настройте оповещения на основе ваших потребностей восстановления и бюджета хранения WAL
- Используйте фильтры строк осторожно: Фильтры вычисляются на издателе, влияя на производительность
- Планируйте конфликты: Внедрите обнаружение конфликтов перед попыткой двунаправленной записи
- Тестируйте процедуры отработки отказа: Практикуйте переключение между базами данных
- Регулярное обслуживание: Очищайте старые слоты репликации
Вывод
Начните с однонаправленной логической репликации. Она надежна для выборочной синхронизации таблиц, копий для отчетности и многих рабочих процессов обновления. Относитесь к двунаправленной репликации как к продвинутой задаче проектирования, а не просто галочке: определите владение строками, генерацию ключей и обработку конфликтов до того, как начнете запись в обе стороны.