在 PostgreSQL 中实现逻辑复制:多主架构与选择性数据同步
设置PostgreSQL逻辑复制,用于选择性同步、升级、报表副本以及精心设计的双向复制方案。
在PostgreSQL中实现逻辑复制:多主与选择性数据同步
PostgreSQL中的逻辑复制在需要复制选定表而非整个集群时非常有用。它适用于报表数据库、版本升级、区域读取副本和选择性数据同步,但原生PostgreSQL逻辑复制并非开箱即用的无冲突多主系统。
逻辑复制与物理复制
物理流复制
- 复制整个数据库集群
- 二进制级别复制
- 只读副本
- 需要相同PostgreSQL版本
- 较低开销
逻辑复制
- 选择性表/行复制
- 跨版本兼容
- 可写订阅者
- 较高开销
- 灵活的数据分布
逻辑复制的用例
- 选择性数据分发:将特定表复制到不同区域
- 双向同步实验:在数据库之间进行精心设计的写入,通常带有应用级冲突规则
- 跨版本升级:从旧版PostgreSQL复制到新版
- 数据聚合:整合来自多个来源的数据
- GDPR合规:仅复制非敏感列
前提条件和设置
配置要求
在发布者(源)上:
# postgresql.conf
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10
在订阅者(目标)上:
# postgresql.conf
max_replication_slots = 10
max_logical_replication_workers = 10
max_worker_processes = 16
重启PostgreSQL
sudo systemctl restart postgresql
网络配置
确保数据库可以通信:
# 从订阅者测试连接到发布者
psql -h publisher.example.com -U replication_user -d source_db
在发布者上配置pg_hba.conf:
# 允许复制连接
host source_db replication_user subscriber_ip/32 scram-sha-256
基本逻辑复制设置
步骤1:创建复制用户
在发布者上:
CREATE ROLE replication_user WITH REPLICATION LOGIN PASSWORD 'secure_password';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO replication_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO replication_user;
步骤2:创建源表
在发布者上:
CREATE TABLE users (
id SERIAL PRIMARY KEY,
username VARCHAR(50) NOT NULL,
email VARCHAR(100) NOT NULL,
status VARCHAR(20) DEFAULT 'active',
created_at TIMESTAMP DEFAULT NOW()
);
CREATE TABLE orders (
id SERIAL PRIMARY KEY,
user_id INTEGER REFERENCES users(id),
amount DECIMAL(10,2),
status VARCHAR(20),
created_at TIMESTAMP DEFAULT NOW()
);
INSERT INTO users (username, email) VALUES
('alice', '[email protected]'),
('bob', '[email protected]');
步骤3:创建发布
在发布者上:
-- 发布所有表
CREATE PUBLICATION my_publication FOR ALL TABLES;
-- 或发布特定表
CREATE PUBLICATION my_publication FOR TABLE users, orders;
-- 或带行过滤的发布
CREATE PUBLICATION active_users FOR TABLE users WHERE (status = 'active');
查看发布:
SELECT * FROM pg_publication;
SELECT * FROM pg_publication_tables;
步骤4:创建副本表
在订阅者上:
-- 表必须具有相同的结构
CREATE TABLE users (
id SERIAL PRIMARY KEY,
username VARCHAR(50) NOT NULL,
email VARCHAR(100) NOT NULL,
status VARCHAR(20) DEFAULT 'active',
created_at TIMESTAMP DEFAULT NOW()
);
CREATE TABLE orders (
id SERIAL PRIMARY KEY,
user_id INTEGER REFERENCES users(id),
amount DECIMAL(10,2),
status VARCHAR(20),
created_at TIMESTAMP DEFAULT NOW()
);
步骤5:创建订阅
在订阅者上:
CREATE SUBSCRIPTION my_subscription
CONNECTION 'host=publisher.example.com port=5432 dbname=source_db user=replication_user password=secure_password'
PUBLICATION my_publication;
步骤6:验证复制
在发布者上:
SELECT * FROM pg_stat_replication;
SELECT * FROM pg_replication_slots;
在订阅者上:
SELECT * FROM pg_stat_subscription;
SELECT * FROM users; -- 应看到复制的数据
高级配置
列级复制(PostgreSQL 15+)
仅复制特定列:
-- 在发布者上:仅复制非敏感列
CREATE TABLE customers (
id SERIAL PRIMARY KEY,
name VARCHAR(100),
email VARCHAR(100),
credit_card VARCHAR(20), -- 不会被复制
created_at TIMESTAMP
);
CREATE PUBLICATION customer_basic
FOR TABLE customers (id, name, email, created_at);
行过滤
仅复制活动记录:
CREATE PUBLICATION active_data
FOR TABLE orders WHERE (status IN ('pending', 'processing'));
区域数据分发:
CREATE PUBLICATION us_customers
FOR TABLE customers WHERE (country = 'US');
CREATE PUBLICATION eu_customers
FOR TABLE customers WHERE (country IN ('UK', 'DE', 'FR'));
多个发布
-- 发布者:创建多个发布
CREATE PUBLICATION oltp_data FOR TABLE users, orders;
CREATE PUBLICATION analytics_data FOR TABLE logs, metrics;
-- 订阅者:订阅多个发布
CREATE SUBSCRIPTION multi_sub
CONNECTION 'host=publisher port=5432 dbname=mydb user=repuser password=pass'
PUBLICATION oltp_data, analytics_data;
双向复制注意事项
原生逻辑复制可以双向配置,但PostgreSQL不会自动合并冲突写入。仅当每行有单一写入者、键不会冲突且应用程序能处理冲突时,才使用此模式。
示例双向同步框架
数据库A配置:
-- 创建发布
CREATE PUBLICATION db_a_pub FOR TABLE shared_table;
-- 订阅数据库B
CREATE SUBSCRIPTION db_a_sub
CONNECTION 'host=db-b.example.com dbname=mydb user=repuser'
PUBLICATION db_b_pub
WITH (origin = none); -- 防止复制循环
数据库B配置:
-- 创建发布
CREATE PUBLICATION db_b_pub FOR TABLE shared_table;
-- 订阅数据库A
CREATE SUBSCRIPTION db_b_sub
CONNECTION 'host=db-a.example.com dbname=mydb user=repuser'
PUBLICATION db_a_pub
WITH (origin = none);
冲突处理
逻辑复制不提供自动的“最后写入胜出”冲突解决。冲突的插入、更新时缺失行、约束违规或重复键会停止应用工作进程,直到你修复数据问题。
-- 设置副本标识以跟踪冲突
ALTER TABLE shared_table REPLICA IDENTITY FULL;
冲突减少策略:
- 基于时间戳的所有权检查:添加
updated_at和写入者标识符,使应用程序能拒绝过时的写入。
CREATE TABLE shared_table (
id SERIAL PRIMARY KEY,
data TEXT,
updated_at TIMESTAMP DEFAULT NOW()
);
CREATE OR REPLACE FUNCTION update_timestamp()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = NOW();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER update_shared_table_timestamp
BEFORE UPDATE ON shared_table
FOR EACH ROW
EXECUTE FUNCTION update_timestamp();
- 版本编号:在应用程序中递增版本,并基于过时版本拒绝更新。
CREATE TABLE shared_table (
id SERIAL PRIMARY KEY,
data TEXT,
version INTEGER DEFAULT 1
);
初始数据同步选项
选项1:自动复制(默认)
-- 订阅者自动复制现有数据
CREATE SUBSCRIPTION my_sub
CONNECTION 'host=publisher dbname=mydb user=repuser'
PUBLICATION my_pub
WITH (copy_data = true); -- 默认
选项2:手动初始同步
对于大数据集,使用pg_dump:
# 从发布者转储特定表
pg_dump -h publisher.example.com -U postgres -d source_db \
-t users -t orders --no-owner --no-acl > initial_data.sql
# 加载到订阅者
psql -h subscriber.example.com -U postgres -d target_db < initial_data.sql
# 创建不带初始复制的订阅
psql -h subscriber.example.com -U postgres -d target_db -c "
CREATE SUBSCRIPTION my_sub
CONNECTION 'host=publisher dbname=source_db user=repuser'
PUBLICATION my_pub
WITH (copy_data = false);
"
选项3:流式传输大型进行中事务
-- 流式传输大型进行中事务,而不是等待提交。
CREATE SUBSCRIPTION fast_sync
CONNECTION 'host=publisher dbname=mydb user=repuser'
PUBLICATION my_pub
WITH (
copy_data = true,
streaming = on,
synchronous_commit = off
);
监控逻辑复制
发布者监控
-- 查看复制槽
SELECT
slot_name,
plugin,
active,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag_size
FROM pg_replication_slots;
-- 查看活动复制连接
SELECT
pid,
usename,
application_name,
client_addr,
state,
sync_state,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), sent_lsn)) AS send_lag
FROM pg_stat_replication;
订阅者监控
-- 查看订阅状态
SELECT
subname,
pid,
received_lsn,
latest_end_lsn,
pg_size_pretty(pg_wal_lsn_diff(latest_end_lsn, received_lsn)) AS lag
FROM pg_stat_subscription;
-- 检查复制错误
SELECT * FROM pg_stat_subscription WHERE last_msg_receipt_time < NOW() - INTERVAL '5 minutes';
监控脚本
#!/bin/bash
# logical-replication-monitor.sh
echo "=== 发布者状态 ==="
psql -h publisher -d mydb -c "
SELECT slot_name, active,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag
FROM pg_replication_slots;"
echo ""
echo "=== 订阅者状态 ==="
psql -h subscriber -d mydb -c "
SELECT subname, pid,
pg_size_pretty(pg_wal_lsn_diff(latest_end_lsn, received_lsn)) AS lag
FROM pg_stat_subscription;"
故障排除
问题1:订阅未接收数据
检查订阅状态:
SELECT subname, subenabled, subconninfo FROM pg_subscription;
SELECT subname, pid, received_lsn, latest_end_lsn FROM pg_stat_subscription;
如果订阅被禁用,启用它:
ALTER SUBSCRIPTION my_sub ENABLE;
检查错误:
SELECT * FROM pg_stat_subscription;
问题2:复制延迟增长
识别慢表:
SELECT schemaname, tablename, n_tup_ins, n_tup_upd, n_tup_del
FROM pg_stat_user_tables
ORDER BY (n_tup_ins + n_tup_upd + n_tup_del) DESC;
增加工作进程:
# 订阅者上的postgresql.conf
max_logical_replication_workers = 20
max_worker_processes = 30
问题3:复制槽膨胀
检查槽使用情况:
SELECT
slot_name,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;
删除非活动槽:
SELECT pg_drop_replication_slot('inactive_slot_name');
问题4:初始同步失败
重新启动初始同步:
-- 删除并重新创建订阅
DROP SUBSCRIPTION my_sub;
CREATE SUBSCRIPTION my_sub
CONNECTION 'host=publisher dbname=mydb user=repuser'
PUBLICATION my_pub
WITH (copy_data = true);
性能优化
1. 使用适当的副本标识
-- 默认:仅主键
ALTER TABLE my_table REPLICA IDENTITY DEFAULT;
-- 完整:所有列(较高开销)
ALTER TABLE my_table REPLICA IDENTITY FULL;
-- 索引:使用特定唯一索引
CREATE UNIQUE INDEX replica_idx ON my_table(col1, col2);
ALTER TABLE my_table REPLICA IDENTITY USING INDEX replica_idx;
2. 大型事务的并行应用
CREATE SUBSCRIPTION parallel_sub
CONNECTION 'host=publisher dbname=mydb user=repuser'
PUBLICATION my_pub
WITH (streaming = parallel);
最佳实践
- 始终使用副本标识:确保表具有主键或唯一约束
- 监控复制延迟:根据恢复需求和WAL保留预算设置警报
- 谨慎使用行过滤:过滤在发布者上评估,影响性能
- 规划冲突:在尝试双向写入之前实现冲突检测
- 测试故障转移过程:练习在数据库之间切换
- 定期维护:清理旧的复制槽
总结
从单向逻辑复制开始。它对于选择性表同步、报表副本和许多升级工作流是可靠的。将双向复制视为高级设计问题,而非一个复选框:在向两侧写入之前,定义行所有权、键生成和冲突处理。