PostgreSQLにおける論理レプリケーションの実装:マルチマスターと選択的データ同期

選択的データ同期、マルチマスターコンフィグレーション、およびクロスバージョンアップグレードのためのPostgreSQL論理レプリケーションをマスターします。実世界の例とトラブルシューティングを含む完全ガイド。

28 ビュー

PostgreSQLでの論理レプリケーション実装:マルチマスターと選択的なデータ同期

はじめに

PostgreSQLの論理レプリケーションにより、データベース間で特定のテーブル、行、さらには列を複製し、高度なデータ配布パターンを実現できます。データベース全体のクラスターをコピーする物理ストリーミングレプリケーションとは異なり、論理レプリケーションは、どこに、どのデータをレプリケーションするかを細かく制御できます。

論理レプリケーションと物理レプリケーション

物理ストリーミングレプリケーション

  • データベース全体のクラスターをレプリケーション
  • バイナリレベルのレプリケーション
  • 読み取り専用のレプリカ
  • 同じPostgreSQLバージョンが必要
  • オーバーヘッドが低い

論理レプリケーション

  • 選択的なテーブル/行のレプリケーション
  • バージョン間互換性
  • 書き込み可能なサブスクライバー
  • オーヘッドが高い
  • 柔軟なデータ配布

論理レプリケーションの使用例

  1. 選択的なデータ配布: 特定のテーブルを異なる地域にレプリケーション
  2. マルチマスターセットアップ: 複数の書き込み可能データベースと双方向同期
  3. バージョン間アップグレード: 古いバージョンから新しいPostgreSQLバージョンへレプリケーション
  4. データ集約: 複数のソースからデータを統合
  5. GDPR対応: 機密でない列のみをレプリケーション

前提条件と設定

設定要件

パブリッシャー(送信元):

# postgresql.conf
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10

サブスクライバー(送信先):

# postgresql.conf
max_replication_slots = 10
max_logical_replication_workers = 10
max_worker_processes = 16

PostgreSQLの再起動

sudo systemctl restart postgresql

ネットワーク設定

データベース間の通信を確認:

# サブスクライバーからパブリッシャーへの接続テスト
psql -h publisher.example.com -U replication_user -d source_db

パブリッシャーでpg_hba.confを設定:

# レプリケーション接続を許可
host    all    replication_user    subscriber_ip/32    md5

基本的な論理レプリケーション設定

ステップ1: レプリケーションユーザーの作成

パブリッシャーで:

CREATE ROLE replication_user WITH REPLICATION LOGIN PASSWORD 'secure_password';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO replication_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO replication_user;

ステップ2: ソーステーブルの作成

パブリッシャーで:

CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(50) NOT NULL,
    email VARCHAR(100) NOT NULL,
    created_at TIMESTAMP DEFAULT NOW()
);

CREATE TABLE orders (
    id SERIAL PRIMARY KEY,
    user_id INTEGER REFERENCES users(id),
    amount DECIMAL(10,2),
    status VARCHAR(20),
    created_at TIMESTAMP DEFAULT NOW()
);

INSERT INTO users (username, email) VALUES 
    ('alice', '[email protected]'),
    ('bob', '[email protected]');

ステップ3: パブリケーションの作成

パブリッシャーで:

-- すべてのテーブルをパブリッシュ
CREATE PUBLICATION my_publication FOR ALL TABLES;

-- または特定のテーブルをパブリッシュ
CREATE PUBLICATION my_publication FOR TABLE users, orders;

-- または行フィルタ付き(PostgreSQL 15+)
CREATE PUBLICATION active_users FOR TABLE users WHERE (status = 'active');

パブリケーションの表示:

SELECT * FROM pg_publication;
SELECT * FROM pg_publication_tables;

ステップ4: レプリカテーブルの作成

サブスクライバーで:

-- テーブルは同一の構造である必要がある
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(50) NOT NULL,
    email VARCHAR(100) NOT NULL,
    created_at TIMESTAMP DEFAULT NOW()
);

CREATE TABLE orders (
    id SERIAL PRIMARY KEY,
    user_id INTEGER REFERENCES users(id),
    amount DECIMAL(10,2),
    status VARCHAR(20),
    created_at TIMESTAMP DEFAULT NOW()
);

ステップ5: サブスクリプションの作成

サブスクライバーで:

CREATE SUBSCRIPTION my_subscription
    CONNECTION 'host=publisher.example.com port=5432 dbname=source_db user=replication_user password=secure_password'
    PUBLICATION my_publication;

ステップ6: レプリケーションの検証

パブリッシャーで:

SELECT * FROM pg_stat_replication;
SELECT * FROM pg_replication_slots;

サブスクライバーで:

SELECT * FROM pg_stat_subscription;
SELECT * FROM users;  -- レプリケーションされたデータが表示される

高度な設定

列レベルのレプリケーション(PostgreSQL 15+)

特定の列のみをレプリケーション:

-- パブリッシャーで: 機密でない列のみをレプリケーション
CREATE TABLE customers (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100),
    email VARCHAR(100),
    credit_card VARCHAR(20),  -- レプリケーションされない
    created_at TIMESTAMP
);

CREATE PUBLICATION customer_basic 
    FOR TABLE customers (id, name, email, created_at);

行フィルタリング

アクティブなレコードのみをレプリケーション:

CREATE PUBLICATION active_data 
    FOR TABLE orders WHERE (status IN ('pending', 'processing'));

地域データ配布:

CREATE PUBLICATION us_customers 
    FOR TABLE customers WHERE (country = 'US');

CREATE PUBLICATION eu_customers 
    FOR TABLE customers WHERE (country IN ('UK', 'DE', 'FR'));

複数のパブリケーション

-- パブリッシャー: 複数のパブリケーションを作成
CREATE PUBLICATION oltp_data FOR TABLE users, orders;
CREATE PUBLICATION analytics_data FOR TABLE logs, metrics;

-- サブスクライバー: 複数のパブリケーションを購読
CREATE SUBSCRIPTION multi_sub
    CONNECTION 'host=publisher port=5432 dbname=mydb user=repuser password=pass'
    PUBLICATION oltp_data, analytics_data;

双方向レプリケーション(マルチマスター)

2方向同期の設定

データベースAの設定:

-- パブリケーション作成
CREATE PUBLICATION db_a_pub FOR TABLE shared_table;

-- データベースBを購読
CREATE SUBSCRIPTION db_a_sub
    CONNECTION 'host=db-b.example.com dbname=mydb user=repuser'
    PUBLICATION db_b_pub
    WITH (origin = none);  -- レプリケーションループを防止

データベースBの設定:

-- パブリケーション作成
CREATE PUBLICATION db_b_pub FOR TABLE shared_table;

-- データベースAを購読
CREATE SUBSCRIPTION db_b_sub
    CONNECTION 'host=db-a.example.com dbname=mydb user=repuser'
    PUBLICATION db_a_pub
    WITH (origin = none);

競合解決

論理レプリケーションはデフォルトで「最後の書き込みが勝つ」方式を使用します:

-- レプリカIDを設定して競合を追跡
ALTER TABLE shared_table REPLICA IDENTITY FULL;

競合検出戦略:

  1. タイムスタンププベース: updated_atカラムを追加
CREATE TABLE shared_table (
    id SERIAL PRIMARY KEY,
    data TEXT,
    updated_at TIMESTAMP DEFAULT NOW()
);

CREATE OR REPLACE FUNCTION update_timestamp()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = NOW();
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER update_shared_table_timestamp
    BEFORE UPDATE ON shared_table
    FOR EACH ROW
    EXECUTE FUNCTION update_timestamp();
  1. バージョン番号:
CREATE TABLE shared_table (
    id SERIAL PRIMARY KEY,
    data TEXT,
    version INTEGER DEFAULT 1
);

初期データ同期オプション

オプション1: 自動コピー(デフォルト)

-- サブスクライバーは自動的に既存データをコピー
CREATE SUBSCRIPTION my_sub
    CONNECTION 'host=publisher dbname=mydb user=repuser'
    PUBLICATION my_pub
    WITH (copy_data = true);  -- デフォルト

オプション2: 手動初期同期

大規模データセットの場合はpg_dumpを使用:

# パブリッシャーから特定のテーブルをダンプ
pg_dump -h publisher.example.com -U postgres -d source_db \n    -t users -t orders --no-owner --no-acl > initial_data.sql

# サブスクライバーに読み込む
psql -h subscriber.example.com -U postgres -d target_db < initial_data.sql

# 初期コピーなしでサブスクリプションを作成
psql -h subscriber.example.com -U postgres -d target_db -c "
    CREATE SUBSCRIPTION my_sub
    CONNECTION 'host=publisher dbname=source_db user=repuser'
    PUBLICATION my_pub
    WITH (copy_data = false);
"

オプション3: 並列初期同期

-- 複数のワーカーを使用して高速初期同期
CREATE SUBSCRIPTION fast_sync
    CONNECTION 'host=publisher dbname=mydb user=repuser'
    PUBLICATION my_pub
    WITH (
        copy_data = true,
        streaming = on,
        synchronous_commit = off
    );

論理レプリケーションの監視

パブリッシャーの監視

-- レプリケーションスロットの表示
SELECT 
    slot_name,
    plugin,
    active,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag_size
FROM pg_replication_slots;

-- アクティブなレプリケーション接続の表示
SELECT 
    pid,
    usename,
    application_name,
    client_addr,
    state,
    sync_state,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), sent_lsn)) AS send_lag
FROM pg_stat_replication;

サブスクライバーの監視

-- サブスクリプションステータスの表示
SELECT 
    subname,
    pid,
    received_lsn,
    latest_end_lsn,
    pg_size_pretty(pg_wal_lsn_diff(latest_end_lsn, received_lsn)) AS lag
FROM pg_stat_subscription;

-- レプリケーションエラーの確認
SELECT * FROM pg_stat_subscription WHERE last_msg_receipt_time < NOW() - INTERVAL '5 minutes';

監視スクリプト

#!/bin/bash
# logical-replication-monitor.sh

echo "=== Publisher Status ==="
psql -h publisher -d mydb -c "
    SELECT slot_name, active, 
           pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag
    FROM pg_replication_slots;"

echo ""
echo "=== Subscriber Status ==="
psql -h subscriber -d mydb -c "
    SELECT subname, pid, 
           pg_size_pretty(pg_wal_lsn_diff(latest_end_lsn, received_lsn)) AS lag
    FROM pg_stat_subscription;"

トラブルシューティング

問題1: サブスクリプションがデータを受信しない

サブスクリプションステータスを確認:

SELECT subname, pid, subenabled, subconninfo FROM pg_subscription;

無効になっている場合は有効化:

ALTER SUBSCRIPTION my_sub ENABLE;

エラーを確認:

SELECT * FROM pg_stat_subscription;

問題2: レプリケーションラグが増加している

遅いテーブルを特定:

SELECT schemaname, tablename, n_tup_ins, n_tup_upd, n_tup_del
FROM pg_stat_user_tables
ORDER BY (n_tup_ins + n_tup_upd + n_tup_del) DESC;

ワーカープロセスを増やす:

# サブスクライバーのpostgresql.conf
max_logical_replication_workers = 20
max_worker_processes = 30

問題3: レプリケーションスロットの肥大化

スロット使用状況を確認:

SELECT 
    slot_name,
    pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;

非アクティブなスロットを削除:

SELECT pg_drop_replication_slot('inactive_slot_name');

問題4: 初期同期の失敗

初期同期を再起動:

-- サブスクリプションを削除して再作成
DROP SUBSCRIPTION my_sub;

CREATE SUBSCRIPTION my_sub
    CONNECTION 'host=publisher dbname=mydb user=repuser'
    PUBLICATION my_pub
    WITH (copy_data = true);

パフォーマンス最適化

1. 適切なレプリカIDを使用する

-- デフォルト: プライマリキーのみ
ALTER TABLE my_table REPLICA IDENTITY DEFAULT;

-- フル: すべての列(オーバーヘッドが高い)
ALTER TABLE my_table REPLICA IDENTITY FULL;

-- インデックス: 特定のユニークインデックスを使用
CREATE UNIQUE INDEX replica_idx ON my_table(col1, col2);
ALTER TABLE my_table REPLICA IDENTITY USING INDEX replica_idx;

2. 初期同期中は制約を無効にする

-- 初期読み込みを高速化するためにトリガーを一時的に無効化
ALTER TABLE my_table DISABLE TRIGGER ALL;

-- 初期同期完了後
ALTER TABLE my_table ENABLE TRIGGER ALL;

3. 並列適用(PostgreSQL 16+)

CREATE SUBSCRIPTION parallel_sub
    CONNECTION 'host=publisher dbname=mydb user=repuser'
    PUBLICATION my_pub
    WITH (streaming = parallel);

ベストプラクティス

  1. 常にレプリカIDを使用: テーブルにプライマリキーやユニーク制約があることを確認
  2. レプリケーションラグを監視: 100MBまたは5分以上のラグに対してアラートを設定
  3. 行フィルタを慎重に使用: フィルタはパブリッシャーで評価され、パフォーマンスに影響
  4. 競合に備える: マルチマスターセットアップで競合検出を実装
  5. フェイルオーバー手順をテスト: データベース間の切り替えを練習
  6. 定期的なメンテナンス: 古いレプリケーションスロットをクリーンアップ

まとめ

論理レプリケーションはPostgreSQLに強力なデータ配布機能を提供します。主な利点:

  • 選択的なテーブルと行のレプリケーション
  • バージョン間互換性
  • 双方向マルチマスター設定
  • 詳細なデータプライバシー制御

まずは単純な片方向レプリケーションから始め、徹底的にテストしてから、必要に応じてより複雑なトポロジーに拡張してください。