在 PostgreSQL 中实现逻辑复制:多主架构与选择性数据同步

深入掌握 PostgreSQL 逻辑复制,实现选择性数据同步、多主配置以及跨版本升级。包含真实案例与故障排除的完整指南。

23 浏览量

在 PostgreSQL 中实现逻辑复制:多主复制与选择性数据同步

简介

PostgreSQL 中的逻辑复制允许您在数据库之间复制特定的表、行甚至列,从而实现复杂的数据分布模式。与复制整个数据库集群的物理流复制不同,逻辑复制提供了对哪些数据被复制以及复制到何处的细粒度控制。

逻辑复制 vs 物理复制

物理流复制

  • 复制整个数据库集群
  • 二进制级复制
  • 只读副本
  • 要求相同的 PostgreSQL 版本
  • 开销较低

逻辑复制

  • 选择性表/行复制
  • 跨版本兼容
  • 可写订阅者
  • 开销较高
  • 灵活的数据分布

逻辑复制的使用场景

  1. 选择性数据分布:将特定表复制到不同区域
  2. 多主配置:具有双向同步的多个可写数据库
  3. 跨版本升级:从旧版 PostgreSQL 复制到新版本
  4. 数据汇聚:合并来自多个源的数据
  5. 符合 GDPR 合规性:仅复制非敏感列

前提条件与环境搭建

配置要求

在发布者(源端)上:

# postgresql.conf
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10

在订阅者(目标端)上:

# postgresql.conf
max_replication_slots = 10
max_logical_replication_workers = 10
max_worker_processes = 16

重启 PostgreSQL

sudo systemctl restart postgresql

网络配置

确保数据库之间可以通信:

# 测试从订阅者到发布者的连接
psql -h publisher.example.com -U replication_user -d source_db

在发布者上配置 pg_hba.conf:

# 允许复制连接
host    all    replication_user    subscriber_ip/32    md5

基础逻辑复制设置

第 1 步:创建复制用户

在发布者上:

CREATE ROLE replication_user WITH REPLICATION LOGIN PASSWORD 'secure_password';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO replication_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO replication_user;

第 2 步:创建源表

在发布者上:

CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(50) NOT NULL,
    email VARCHAR(100) NOT NULL,
    created_at TIMESTAMP DEFAULT NOW()
);

CREATE TABLE orders (
    id SERIAL PRIMARY KEY,
    user_id INTEGER REFERENCES users(id),
    amount DECIMAL(10,2),
    status VARCHAR(20),
    created_at TIMESTAMP DEFAULT NOW()
);

INSERT INTO users (username, email) VALUES 
    ('alice', '[email protected]'),
    ('bob', '[email protected]');

第 3 步:创建发布

在发布者上:

-- 发布所有表
CREATE PUBLICATION my_publication FOR ALL TABLES;

-- 或发布特定表
CREATE PUBLICATION my_publication FOR TABLE users, orders;

-- 或使用行过滤(PostgreSQL 15+)
CREATE PUBLICATION active_users FOR TABLE users WHERE (status = 'active');

查看发布:

SELECT * FROM pg_publication;
SELECT * FROM pg_publication_tables;

第 4 步:创建副本表

在订阅者上:

-- 表必须具有相同的结构
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(50) NOT NULL,
    email VARCHAR(100) NOT NULL,
    created_at TIMESTAMP DEFAULT NOW()
);

CREATE TABLE orders (
    id SERIAL PRIMARY KEY,
    user_id INTEGER REFERENCES users(id),
    amount DECIMAL(10,2),
    status VARCHAR(20),
    created_at TIMESTAMP DEFAULT NOW()
);

第 5 步:创建订阅

在订阅者上:

CREATE SUBSCRIPTION my_subscription
    CONNECTION 'host=publisher.example.com port=5432 dbname=source_db user=replication_user password=secure_password'
    PUBLICATION my_publication;

第 6 步:验证复制

在发布者上:

SELECT * FROM pg_stat_replication;
SELECT * FROM pg_replication_slots;

在订阅者上:

SELECT * FROM pg_stat_subscription;
SELECT * FROM users;  -- 应该能看到已复制的数据

高级配置

列级复制(PostgreSQL 15+)

仅复制特定列:

-- 在发布者上:仅复制非敏感列
CREATE TABLE customers (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100),
    email VARCHAR(100),
    credit_card VARCHAR(20),  -- 不会被复制
    created_at TIMESTAMP
);

CREATE PUBLICATION customer_basic 
    FOR TABLE customers (id, name, email, created_at);

行过滤

仅复制活跃记录:

CREATE PUBLICATION active_data 
    FOR TABLE orders WHERE (status IN ('pending', 'processing'));

区域数据分布:

CREATE PUBLICATION us_customers 
    FOR TABLE customers WHERE (country = 'US');

CREATE PUBLICATION eu_customers 
    FOR TABLE customers WHERE (country IN ('UK', 'DE', 'FR'));

多个发布

-- 发布者:创建多个发布
CREATE PUBLICATION oltp_data FOR TABLE users, orders;
CREATE PUBLICATION analytics_data FOR TABLE logs, metrics;

-- 订阅者:订阅多个发布
CREATE SUBSCRIPTION multi_sub
    CONNECTION 'host=publisher port=5432 dbname=mydb user=repuser password=pass'
    PUBLICATION oltp_data, analytics_data;

双向复制(多主)

设置双向同步

数据库 A 配置:

-- 创建发布
CREATE PUBLICATION db_a_pub FOR TABLE shared_table;

-- 订阅数据库 B
CREATE SUBSCRIPTION db_a_sub
    CONNECTION 'host=db-b.example.com dbname=mydb user=repuser'
    PUBLICATION db_b_pub
    WITH (origin = none);  -- 防止复制回环

数据库 B 配置:

-- 创建发布
CREATE PUBLICATION db_b_pub FOR TABLE shared_table;

-- 订阅数据库 A
CREATE SUBSCRIPTION db_b_sub
    CONNECTION 'host=db-a.example.com dbname=mydb user=repuser'
    PUBLICATION db_a_pub
    WITH (origin = none);

冲突解决

逻辑复制默认使用“最后写入者胜出”原则:

-- 设置复制标识以跟踪冲突
ALTER TABLE shared_table REPLICA IDENTITY FULL;

冲突检测策略:

  1. 基于时间戳:添加 updated_at 列
CREATE TABLE shared_table (
    id SERIAL PRIMARY KEY,
    data TEXT,
    updated_at TIMESTAMP DEFAULT NOW()
);

CREATE OR REPLACE FUNCTION update_timestamp()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = NOW();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER update_shared_table_timestamp
    BEFORE UPDATE ON shared_table
    FOR EACH ROW
    EXECUTE FUNCTION update_timestamp();
  1. 版本号控制
CREATE TABLE shared_table (
    id SERIAL PRIMARY KEY,
    data TEXT,
    version INTEGER DEFAULT 1
);

初始数据同步选项

选项 1:自动复制(默认)

-- 订阅者自动复制现有数据
CREATE SUBSCRIPTION my_sub
    CONNECTION 'host=publisher dbname=mydb user=repuser'
    PUBLICATION my_pub
    WITH (copy_data = true);  -- 默认值

选项 2:手动初始同步

对于大数据集,使用 pg_dump:

# 从发布者导出特定表
pg_dump -h publisher.example.com -U postgres -d source_db \
    -t users -t orders --no-owner --no-acl > initial_data.sql

# 加载到订阅者
psql -h subscriber.example.com -U postgres -d target_db < initial_data.sql

# 创建订阅,不进行初始复制
psql -h subscriber.example.com -U postgres -d target_db -c "
    CREATE SUBSCRIPTION my_sub
    CONNECTION 'host=publisher dbname=source_db user=repuser'
    PUBLICATION my_pub
    WITH (copy_data = false);
"

选项 3:并行初始同步

-- 使用多个工作进程进行更快的初始同步
CREATE SUBSCRIPTION fast_sync
    CONNECTION 'host=publisher dbname=mydb user=repuser'
    PUBLICATION my_pub
    WITH (
        copy_data = true,
        streaming = on,
        synchronous_commit = off
    );

监控逻辑复制

发布者监控

-- 查看复制槽
SELECT 
    slot_name,
    plugin,
    active,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag_size
FROM pg_replication_slots;

-- 查看活跃的复制连接
SELECT 
    pid,
    usename,
    application_name,
    client_addr,
    state,
    sync_state,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), sent_lsn)) AS send_lag
FROM pg_stat_replication;

订阅者监控

-- 查看订阅状态
SELECT 
    subname,
    pid,
    received_lsn,
    latest_end_lsn,
    pg_size_pretty(pg_wal_lsn_diff(latest_end_lsn, received_lsn)) AS lag
FROM pg_stat_subscription;

-- 检查复制错误
SELECT * FROM pg_stat_subscription WHERE last_msg_receipt_time < NOW() - INTERVAL '5 minutes';

监控脚本

#!/bin/bash
# logical-replication-monitor.sh

echo "=== 发布者状态 ==="
psql -h publisher -d mydb -c "
    SELECT slot_name, active, 
           pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag
    FROM pg_replication_slots;"

echo ""
echo "=== 订阅者状态 ==="
psql -h subscriber -d mydb -c "
    SELECT subname, pid, 
           pg_size_pretty(pg_wal_lsn_diff(latest_end_lsn, received_lsn)) AS lag
    FROM pg_stat_subscription;"

常见问题排除

问题 1:订阅未接收数据

检查订阅状态:

SELECT subname, pid, subenabled, subconninfo FROM pg_subscription;

如果订阅被禁用,请启用:

ALTER SUBSCRIPTION my_sub ENABLE;

检查错误:

SELECT * FROM pg_stat_subscription;

问题 2:复制延迟持续增长

识别慢表:

SELECT schemaname, tablename, n_tup_ins, n_tup_upd, n_tup_del
FROM pg_stat_user_tables
ORDER BY (n_tup_ins + n_tup_upd + n_tup_del) DESC;

增加工作进程:

# 订阅者上的 postgresql.conf
max_logical_replication_workers = 20
max_worker_processes = 30

问题 3:复制槽占用空间过大(膨胀)

检查槽位使用情况:

SELECT 
    slot_name,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;

删除不活跃的槽位:

SELECT pg_drop_replication_slot('inactive_slot_name');

问题 4:初始同步失败

重启初始同步:

-- 删除并重建订阅
DROP SUBSCRIPTION my_sub;

CREATE SUBSCRIPTION my_sub
    CONNECTION 'host=publisher dbname=mydb user=repuser'
    PUBLICATION my_pub
    WITH (copy_data = true);

性能优化

1. 使用适当的复制标识

-- 默认:仅主键
ALTER TABLE my_table REPLICA IDENTITY DEFAULT;

-- 全部:所有列(开销较高)
ALTER TABLE my_table REPLICA IDENTITY FULL;

-- 索引:使用特定的唯一索引
CREATE UNIQUE INDEX replica_idx ON my_table(col1, col2);
ALTER TABLE my_table REPLICA IDENTITY USING INDEX replica_idx;

2. 初始同步期间禁用约束

-- 临时禁用触发器以加快初始加载速度
ALTER TABLE my_table DISABLE TRIGGER ALL;

-- 初始同步完成后
ALTER TABLE my_table ENABLE TRIGGER ALL;

3. 并行应用(PostgreSQL 16+)

CREATE SUBSCRIPTION parallel_sub
    CONNECTION 'host=publisher dbname=mydb user=repuser'
    PUBLICATION my_pub
    WITH (streaming = parallel);

最佳实践

  1. 始终使用复制标识:确保表具有主键或唯一约束。
  2. 监控复制延迟:为延迟超过 100MB 或 5 分钟的情况设置告警。
  3. 谨慎使用行过滤器:过滤器在发布端评估,会影响性能。
  4. 规划冲突处理:在多主配置中实现冲突检测。
  5. 测试故障转移流程:演练如何在数据库之间切换。
  6. 定期维护:清理旧的复制槽。

结论

逻辑复制为 PostgreSQL 提供了强大的数据分布能力。其关键优势包括:

  • 选择性的表和行复制
  • 跨版本兼容性
  • 双向多主配置
  • 细粒度的数据隐私控制

建议从简单的单向复制开始,经过彻底测试后,再根据需要扩展到更复杂的拓扑结构。