在 PostgreSQL 中实现逻辑复制:多主复制与选择性数据同步
简介
PostgreSQL 中的逻辑复制允许您在数据库之间复制特定的表、行甚至列,从而实现复杂的数据分布模式。与复制整个数据库集群的物理流复制不同,逻辑复制提供了对哪些数据被复制以及复制到何处的细粒度控制。
逻辑复制 vs 物理复制
物理流复制
- 复制整个数据库集群
- 二进制级复制
- 只读副本
- 要求相同的 PostgreSQL 版本
- 开销较低
逻辑复制
- 选择性表/行复制
- 跨版本兼容
- 可写订阅者
- 开销较高
- 灵活的数据分布
逻辑复制的使用场景
- 选择性数据分布:将特定表复制到不同区域
- 多主配置:具有双向同步的多个可写数据库
- 跨版本升级:从旧版 PostgreSQL 复制到新版本
- 数据汇聚:合并来自多个源的数据
- 符合 GDPR 合规性:仅复制非敏感列
前提条件与环境搭建
配置要求
在发布者(源端)上:
# postgresql.conf
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10
在订阅者(目标端)上:
# postgresql.conf
max_replication_slots = 10
max_logical_replication_workers = 10
max_worker_processes = 16
重启 PostgreSQL
sudo systemctl restart postgresql
网络配置
确保数据库之间可以通信:
# 测试从订阅者到发布者的连接
psql -h publisher.example.com -U replication_user -d source_db
在发布者上配置 pg_hba.conf:
# 允许复制连接
host all replication_user subscriber_ip/32 md5
基础逻辑复制设置
第 1 步:创建复制用户
在发布者上:
CREATE ROLE replication_user WITH REPLICATION LOGIN PASSWORD 'secure_password';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO replication_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO replication_user;
第 2 步:创建源表
在发布者上:
CREATE TABLE users (
id SERIAL PRIMARY KEY,
username VARCHAR(50) NOT NULL,
email VARCHAR(100) NOT NULL,
created_at TIMESTAMP DEFAULT NOW()
);
CREATE TABLE orders (
id SERIAL PRIMARY KEY,
user_id INTEGER REFERENCES users(id),
amount DECIMAL(10,2),
status VARCHAR(20),
created_at TIMESTAMP DEFAULT NOW()
);
INSERT INTO users (username, email) VALUES
('alice', '[email protected]'),
('bob', '[email protected]');
第 3 步:创建发布
在发布者上:
-- 发布所有表
CREATE PUBLICATION my_publication FOR ALL TABLES;
-- 或发布特定表
CREATE PUBLICATION my_publication FOR TABLE users, orders;
-- 或使用行过滤(PostgreSQL 15+)
CREATE PUBLICATION active_users FOR TABLE users WHERE (status = 'active');
查看发布:
SELECT * FROM pg_publication;
SELECT * FROM pg_publication_tables;
第 4 步:创建副本表
在订阅者上:
-- 表必须具有相同的结构
CREATE TABLE users (
id SERIAL PRIMARY KEY,
username VARCHAR(50) NOT NULL,
email VARCHAR(100) NOT NULL,
created_at TIMESTAMP DEFAULT NOW()
);
CREATE TABLE orders (
id SERIAL PRIMARY KEY,
user_id INTEGER REFERENCES users(id),
amount DECIMAL(10,2),
status VARCHAR(20),
created_at TIMESTAMP DEFAULT NOW()
);
第 5 步:创建订阅
在订阅者上:
CREATE SUBSCRIPTION my_subscription
CONNECTION 'host=publisher.example.com port=5432 dbname=source_db user=replication_user password=secure_password'
PUBLICATION my_publication;
第 6 步:验证复制
在发布者上:
SELECT * FROM pg_stat_replication;
SELECT * FROM pg_replication_slots;
在订阅者上:
SELECT * FROM pg_stat_subscription;
SELECT * FROM users; -- 应该能看到已复制的数据
高级配置
列级复制(PostgreSQL 15+)
仅复制特定列:
-- 在发布者上:仅复制非敏感列
CREATE TABLE customers (
id SERIAL PRIMARY KEY,
name VARCHAR(100),
email VARCHAR(100),
credit_card VARCHAR(20), -- 不会被复制
created_at TIMESTAMP
);
CREATE PUBLICATION customer_basic
FOR TABLE customers (id, name, email, created_at);
行过滤
仅复制活跃记录:
CREATE PUBLICATION active_data
FOR TABLE orders WHERE (status IN ('pending', 'processing'));
区域数据分布:
CREATE PUBLICATION us_customers
FOR TABLE customers WHERE (country = 'US');
CREATE PUBLICATION eu_customers
FOR TABLE customers WHERE (country IN ('UK', 'DE', 'FR'));
多个发布
-- 发布者:创建多个发布
CREATE PUBLICATION oltp_data FOR TABLE users, orders;
CREATE PUBLICATION analytics_data FOR TABLE logs, metrics;
-- 订阅者:订阅多个发布
CREATE SUBSCRIPTION multi_sub
CONNECTION 'host=publisher port=5432 dbname=mydb user=repuser password=pass'
PUBLICATION oltp_data, analytics_data;
双向复制(多主)
设置双向同步
数据库 A 配置:
-- 创建发布
CREATE PUBLICATION db_a_pub FOR TABLE shared_table;
-- 订阅数据库 B
CREATE SUBSCRIPTION db_a_sub
CONNECTION 'host=db-b.example.com dbname=mydb user=repuser'
PUBLICATION db_b_pub
WITH (origin = none); -- 防止复制回环
数据库 B 配置:
-- 创建发布
CREATE PUBLICATION db_b_pub FOR TABLE shared_table;
-- 订阅数据库 A
CREATE SUBSCRIPTION db_b_sub
CONNECTION 'host=db-a.example.com dbname=mydb user=repuser'
PUBLICATION db_a_pub
WITH (origin = none);
冲突解决
逻辑复制默认使用“最后写入者胜出”原则:
-- 设置复制标识以跟踪冲突
ALTER TABLE shared_table REPLICA IDENTITY FULL;
冲突检测策略:
- 基于时间戳:添加 updated_at 列
CREATE TABLE shared_table (
id SERIAL PRIMARY KEY,
data TEXT,
updated_at TIMESTAMP DEFAULT NOW()
);
CREATE OR REPLACE FUNCTION update_timestamp()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = NOW();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER update_shared_table_timestamp
BEFORE UPDATE ON shared_table
FOR EACH ROW
EXECUTE FUNCTION update_timestamp();
- 版本号控制:
CREATE TABLE shared_table (
id SERIAL PRIMARY KEY,
data TEXT,
version INTEGER DEFAULT 1
);
初始数据同步选项
选项 1:自动复制(默认)
-- 订阅者自动复制现有数据
CREATE SUBSCRIPTION my_sub
CONNECTION 'host=publisher dbname=mydb user=repuser'
PUBLICATION my_pub
WITH (copy_data = true); -- 默认值
选项 2:手动初始同步
对于大数据集,使用 pg_dump:
# 从发布者导出特定表
pg_dump -h publisher.example.com -U postgres -d source_db \
-t users -t orders --no-owner --no-acl > initial_data.sql
# 加载到订阅者
psql -h subscriber.example.com -U postgres -d target_db < initial_data.sql
# 创建订阅,不进行初始复制
psql -h subscriber.example.com -U postgres -d target_db -c "
CREATE SUBSCRIPTION my_sub
CONNECTION 'host=publisher dbname=source_db user=repuser'
PUBLICATION my_pub
WITH (copy_data = false);
"
选项 3:并行初始同步
-- 使用多个工作进程进行更快的初始同步
CREATE SUBSCRIPTION fast_sync
CONNECTION 'host=publisher dbname=mydb user=repuser'
PUBLICATION my_pub
WITH (
copy_data = true,
streaming = on,
synchronous_commit = off
);
监控逻辑复制
发布者监控
-- 查看复制槽
SELECT
slot_name,
plugin,
active,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag_size
FROM pg_replication_slots;
-- 查看活跃的复制连接
SELECT
pid,
usename,
application_name,
client_addr,
state,
sync_state,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), sent_lsn)) AS send_lag
FROM pg_stat_replication;
订阅者监控
-- 查看订阅状态
SELECT
subname,
pid,
received_lsn,
latest_end_lsn,
pg_size_pretty(pg_wal_lsn_diff(latest_end_lsn, received_lsn)) AS lag
FROM pg_stat_subscription;
-- 检查复制错误
SELECT * FROM pg_stat_subscription WHERE last_msg_receipt_time < NOW() - INTERVAL '5 minutes';
监控脚本
#!/bin/bash
# logical-replication-monitor.sh
echo "=== 发布者状态 ==="
psql -h publisher -d mydb -c "
SELECT slot_name, active,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag
FROM pg_replication_slots;"
echo ""
echo "=== 订阅者状态 ==="
psql -h subscriber -d mydb -c "
SELECT subname, pid,
pg_size_pretty(pg_wal_lsn_diff(latest_end_lsn, received_lsn)) AS lag
FROM pg_stat_subscription;"
常见问题排除
问题 1:订阅未接收数据
检查订阅状态:
SELECT subname, pid, subenabled, subconninfo FROM pg_subscription;
如果订阅被禁用,请启用:
ALTER SUBSCRIPTION my_sub ENABLE;
检查错误:
SELECT * FROM pg_stat_subscription;
问题 2:复制延迟持续增长
识别慢表:
SELECT schemaname, tablename, n_tup_ins, n_tup_upd, n_tup_del
FROM pg_stat_user_tables
ORDER BY (n_tup_ins + n_tup_upd + n_tup_del) DESC;
增加工作进程:
# 订阅者上的 postgresql.conf
max_logical_replication_workers = 20
max_worker_processes = 30
问题 3:复制槽占用空间过大(膨胀)
检查槽位使用情况:
SELECT
slot_name,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;
删除不活跃的槽位:
SELECT pg_drop_replication_slot('inactive_slot_name');
问题 4:初始同步失败
重启初始同步:
-- 删除并重建订阅
DROP SUBSCRIPTION my_sub;
CREATE SUBSCRIPTION my_sub
CONNECTION 'host=publisher dbname=mydb user=repuser'
PUBLICATION my_pub
WITH (copy_data = true);
性能优化
1. 使用适当的复制标识
-- 默认:仅主键
ALTER TABLE my_table REPLICA IDENTITY DEFAULT;
-- 全部:所有列(开销较高)
ALTER TABLE my_table REPLICA IDENTITY FULL;
-- 索引:使用特定的唯一索引
CREATE UNIQUE INDEX replica_idx ON my_table(col1, col2);
ALTER TABLE my_table REPLICA IDENTITY USING INDEX replica_idx;
2. 初始同步期间禁用约束
-- 临时禁用触发器以加快初始加载速度
ALTER TABLE my_table DISABLE TRIGGER ALL;
-- 初始同步完成后
ALTER TABLE my_table ENABLE TRIGGER ALL;
3. 并行应用(PostgreSQL 16+)
CREATE SUBSCRIPTION parallel_sub
CONNECTION 'host=publisher dbname=mydb user=repuser'
PUBLICATION my_pub
WITH (streaming = parallel);
最佳实践
- 始终使用复制标识:确保表具有主键或唯一约束。
- 监控复制延迟:为延迟超过 100MB 或 5 分钟的情况设置告警。
- 谨慎使用行过滤器:过滤器在发布端评估,会影响性能。
- 规划冲突处理:在多主配置中实现冲突检测。
- 测试故障转移流程:演练如何在数据库之间切换。
- 定期维护:清理旧的复制槽。
结论
逻辑复制为 PostgreSQL 提供了强大的数据分布能力。其关键优势包括:
- 选择性的表和行复制
- 跨版本兼容性
- 双向多主配置
- 细粒度的数据隐私控制
建议从简单的单向复制开始,经过彻底测试后,再根据需要扩展到更复杂的拓扑结构。