在 PostgreSQL 中实现逻辑复制:多主架构与选择性数据同步

设置PostgreSQL逻辑复制,用于选择性同步、升级、报表副本以及精心设计的双向复制方案。

在PostgreSQL中实现逻辑复制:多主与选择性数据同步

PostgreSQL中的逻辑复制在需要复制选定表而非整个集群时非常有用。它适用于报表数据库、版本升级、区域读取副本和选择性数据同步,但原生PostgreSQL逻辑复制并非开箱即用的无冲突多主系统。

逻辑复制与物理复制

物理流复制

  • 复制整个数据库集群
  • 二进制级别复制
  • 只读副本
  • 需要相同PostgreSQL版本
  • 较低开销

逻辑复制

  • 选择性表/行复制
  • 跨版本兼容
  • 可写订阅者
  • 较高开销
  • 灵活的数据分布

逻辑复制的用例

  1. 选择性数据分发:将特定表复制到不同区域
  2. 双向同步实验:在数据库之间进行精心设计的写入,通常带有应用级冲突规则
  3. 跨版本升级:从旧版PostgreSQL复制到新版
  4. 数据聚合:整合来自多个来源的数据
  5. GDPR合规:仅复制非敏感列

前提条件和设置

配置要求

在发布者(源)上:

# postgresql.conf
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10

在订阅者(目标)上:

# postgresql.conf
max_replication_slots = 10
max_logical_replication_workers = 10
max_worker_processes = 16

重启PostgreSQL

sudo systemctl restart postgresql

网络配置

确保数据库可以通信:

# 从订阅者测试连接到发布者
psql -h publisher.example.com -U replication_user -d source_db

在发布者上配置pg_hba.conf:

# 允许复制连接
host    source_db    replication_user    subscriber_ip/32    scram-sha-256

基本逻辑复制设置

步骤1:创建复制用户

在发布者上:

CREATE ROLE replication_user WITH REPLICATION LOGIN PASSWORD 'secure_password';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO replication_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO replication_user;

步骤2:创建源表

在发布者上:

CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(50) NOT NULL,
    email VARCHAR(100) NOT NULL,
    status VARCHAR(20) DEFAULT 'active',
    created_at TIMESTAMP DEFAULT NOW()
);

CREATE TABLE orders (
    id SERIAL PRIMARY KEY,
    user_id INTEGER REFERENCES users(id),
    amount DECIMAL(10,2),
    status VARCHAR(20),
    created_at TIMESTAMP DEFAULT NOW()
);

INSERT INTO users (username, email) VALUES 
    ('alice', '[email protected]'),
    ('bob', '[email protected]');

步骤3:创建发布

在发布者上:

-- 发布所有表
CREATE PUBLICATION my_publication FOR ALL TABLES;

-- 或发布特定表
CREATE PUBLICATION my_publication FOR TABLE users, orders;

-- 或带行过滤的发布
CREATE PUBLICATION active_users FOR TABLE users WHERE (status = 'active');

查看发布:

SELECT * FROM pg_publication;
SELECT * FROM pg_publication_tables;

步骤4:创建副本表

在订阅者上:

-- 表必须具有相同的结构
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(50) NOT NULL,
    email VARCHAR(100) NOT NULL,
    status VARCHAR(20) DEFAULT 'active',
    created_at TIMESTAMP DEFAULT NOW()
);

CREATE TABLE orders (
    id SERIAL PRIMARY KEY,
    user_id INTEGER REFERENCES users(id),
    amount DECIMAL(10,2),
    status VARCHAR(20),
    created_at TIMESTAMP DEFAULT NOW()
);

步骤5:创建订阅

在订阅者上:

CREATE SUBSCRIPTION my_subscription
    CONNECTION 'host=publisher.example.com port=5432 dbname=source_db user=replication_user password=secure_password'
    PUBLICATION my_publication;

步骤6:验证复制

在发布者上:

SELECT * FROM pg_stat_replication;
SELECT * FROM pg_replication_slots;

在订阅者上:

SELECT * FROM pg_stat_subscription;
SELECT * FROM users;  -- 应看到复制的数据

高级配置

列级复制(PostgreSQL 15+)

仅复制特定列:

-- 在发布者上:仅复制非敏感列
CREATE TABLE customers (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100),
    email VARCHAR(100),
    credit_card VARCHAR(20),  -- 不会被复制
    created_at TIMESTAMP
);

CREATE PUBLICATION customer_basic 
    FOR TABLE customers (id, name, email, created_at);

行过滤

仅复制活动记录:

CREATE PUBLICATION active_data 
    FOR TABLE orders WHERE (status IN ('pending', 'processing'));

区域数据分发:

CREATE PUBLICATION us_customers 
    FOR TABLE customers WHERE (country = 'US');

CREATE PUBLICATION eu_customers 
    FOR TABLE customers WHERE (country IN ('UK', 'DE', 'FR'));

多个发布

-- 发布者:创建多个发布
CREATE PUBLICATION oltp_data FOR TABLE users, orders;
CREATE PUBLICATION analytics_data FOR TABLE logs, metrics;

-- 订阅者:订阅多个发布
CREATE SUBSCRIPTION multi_sub
    CONNECTION 'host=publisher port=5432 dbname=mydb user=repuser password=pass'
    PUBLICATION oltp_data, analytics_data;

双向复制注意事项

原生逻辑复制可以双向配置,但PostgreSQL不会自动合并冲突写入。仅当每行有单一写入者、键不会冲突且应用程序能处理冲突时,才使用此模式。

示例双向同步框架

数据库A配置:

-- 创建发布
CREATE PUBLICATION db_a_pub FOR TABLE shared_table;

-- 订阅数据库B
CREATE SUBSCRIPTION db_a_sub
    CONNECTION 'host=db-b.example.com dbname=mydb user=repuser'
    PUBLICATION db_b_pub
    WITH (origin = none);  -- 防止复制循环

数据库B配置:

-- 创建发布
CREATE PUBLICATION db_b_pub FOR TABLE shared_table;

-- 订阅数据库A
CREATE SUBSCRIPTION db_b_sub
    CONNECTION 'host=db-a.example.com dbname=mydb user=repuser'
    PUBLICATION db_a_pub
    WITH (origin = none);

冲突处理

逻辑复制不提供自动的“最后写入胜出”冲突解决。冲突的插入、更新时缺失行、约束违规或重复键会停止应用工作进程,直到你修复数据问题。

-- 设置副本标识以跟踪冲突
ALTER TABLE shared_table REPLICA IDENTITY FULL;

冲突减少策略:

  1. 基于时间戳的所有权检查:添加updated_at和写入者标识符,使应用程序能拒绝过时的写入。
CREATE TABLE shared_table (
    id SERIAL PRIMARY KEY,
    data TEXT,
    updated_at TIMESTAMP DEFAULT NOW()
);

CREATE OR REPLACE FUNCTION update_timestamp()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = NOW();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER update_shared_table_timestamp
    BEFORE UPDATE ON shared_table
    FOR EACH ROW
    EXECUTE FUNCTION update_timestamp();
  1. 版本编号:在应用程序中递增版本,并基于过时版本拒绝更新。
CREATE TABLE shared_table (
    id SERIAL PRIMARY KEY,
    data TEXT,
    version INTEGER DEFAULT 1
);

初始数据同步选项

选项1:自动复制(默认)

-- 订阅者自动复制现有数据
CREATE SUBSCRIPTION my_sub
    CONNECTION 'host=publisher dbname=mydb user=repuser'
    PUBLICATION my_pub
    WITH (copy_data = true);  -- 默认

选项2:手动初始同步

对于大数据集,使用pg_dump:

# 从发布者转储特定表
pg_dump -h publisher.example.com -U postgres -d source_db \
    -t users -t orders --no-owner --no-acl > initial_data.sql

# 加载到订阅者
psql -h subscriber.example.com -U postgres -d target_db < initial_data.sql

# 创建不带初始复制的订阅
psql -h subscriber.example.com -U postgres -d target_db -c "
    CREATE SUBSCRIPTION my_sub
    CONNECTION 'host=publisher dbname=source_db user=repuser'
    PUBLICATION my_pub
    WITH (copy_data = false);
"

选项3:流式传输大型进行中事务

-- 流式传输大型进行中事务,而不是等待提交。
CREATE SUBSCRIPTION fast_sync
    CONNECTION 'host=publisher dbname=mydb user=repuser'
    PUBLICATION my_pub
    WITH (
        copy_data = true,
        streaming = on,
        synchronous_commit = off
    );

监控逻辑复制

发布者监控

-- 查看复制槽
SELECT 
    slot_name,
    plugin,
    active,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag_size
FROM pg_replication_slots;

-- 查看活动复制连接
SELECT 
    pid,
    usename,
    application_name,
    client_addr,
    state,
    sync_state,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), sent_lsn)) AS send_lag
FROM pg_stat_replication;

订阅者监控

-- 查看订阅状态
SELECT 
    subname,
    pid,
    received_lsn,
    latest_end_lsn,
    pg_size_pretty(pg_wal_lsn_diff(latest_end_lsn, received_lsn)) AS lag
FROM pg_stat_subscription;

-- 检查复制错误
SELECT * FROM pg_stat_subscription WHERE last_msg_receipt_time < NOW() - INTERVAL '5 minutes';

监控脚本

#!/bin/bash
# logical-replication-monitor.sh

echo "=== 发布者状态 ==="
psql -h publisher -d mydb -c "
    SELECT slot_name, active, 
           pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag
    FROM pg_replication_slots;"

echo ""
echo "=== 订阅者状态 ==="
psql -h subscriber -d mydb -c "
    SELECT subname, pid, 
           pg_size_pretty(pg_wal_lsn_diff(latest_end_lsn, received_lsn)) AS lag
    FROM pg_stat_subscription;"

故障排除

问题1:订阅未接收数据

检查订阅状态:

SELECT subname, subenabled, subconninfo FROM pg_subscription;
SELECT subname, pid, received_lsn, latest_end_lsn FROM pg_stat_subscription;

如果订阅被禁用,启用它:

ALTER SUBSCRIPTION my_sub ENABLE;

检查错误:

SELECT * FROM pg_stat_subscription;

问题2:复制延迟增长

识别慢表:

SELECT schemaname, tablename, n_tup_ins, n_tup_upd, n_tup_del
FROM pg_stat_user_tables
ORDER BY (n_tup_ins + n_tup_upd + n_tup_del) DESC;

增加工作进程:

# 订阅者上的postgresql.conf
max_logical_replication_workers = 20
max_worker_processes = 30

问题3:复制槽膨胀

检查槽使用情况:

SELECT 
    slot_name,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;

删除非活动槽:

SELECT pg_drop_replication_slot('inactive_slot_name');

问题4:初始同步失败

重新启动初始同步:

-- 删除并重新创建订阅
DROP SUBSCRIPTION my_sub;

CREATE SUBSCRIPTION my_sub
    CONNECTION 'host=publisher dbname=mydb user=repuser'
    PUBLICATION my_pub
    WITH (copy_data = true);

性能优化

1. 使用适当的副本标识

-- 默认:仅主键
ALTER TABLE my_table REPLICA IDENTITY DEFAULT;

-- 完整:所有列(较高开销)
ALTER TABLE my_table REPLICA IDENTITY FULL;

-- 索引:使用特定唯一索引
CREATE UNIQUE INDEX replica_idx ON my_table(col1, col2);
ALTER TABLE my_table REPLICA IDENTITY USING INDEX replica_idx;

2. 大型事务的并行应用

CREATE SUBSCRIPTION parallel_sub
    CONNECTION 'host=publisher dbname=mydb user=repuser'
    PUBLICATION my_pub
    WITH (streaming = parallel);

最佳实践

  1. 始终使用副本标识:确保表具有主键或唯一约束
  2. 监控复制延迟:根据恢复需求和WAL保留预算设置警报
  3. 谨慎使用行过滤:过滤在发布者上评估,影响性能
  4. 规划冲突:在尝试双向写入之前实现冲突检测
  5. 测试故障转移过程:练习在数据库之间切换
  6. 定期维护:清理旧的复制槽

总结

从单向逻辑复制开始。它对于选择性表同步、报表副本和许多升级工作流是可靠的。将双向复制视为高级设计问题,而非一个复选框:在向两侧写入之前,定义行所有权、键生成和冲突处理。