PostgreSQLでの論理レプリケーション実装:マルチマスターと選択的なデータ同期
はじめに
PostgreSQLの論理レプリケーションにより、データベース間で特定のテーブル、行、さらには列を複製し、高度なデータ配布パターンを実現できます。データベース全体のクラスターをコピーする物理ストリーミングレプリケーションとは異なり、論理レプリケーションは、どこに、どのデータをレプリケーションするかを細かく制御できます。
論理レプリケーションと物理レプリケーション
物理ストリーミングレプリケーション
- データベース全体のクラスターをレプリケーション
- バイナリレベルのレプリケーション
- 読み取り専用のレプリカ
- 同じPostgreSQLバージョンが必要
- オーバーヘッドが低い
論理レプリケーション
- 選択的なテーブル/行のレプリケーション
- バージョン間互換性
- 書き込み可能なサブスクライバー
- オーヘッドが高い
- 柔軟なデータ配布
論理レプリケーションの使用例
- 選択的なデータ配布: 特定のテーブルを異なる地域にレプリケーション
- マルチマスターセットアップ: 複数の書き込み可能データベースと双方向同期
- バージョン間アップグレード: 古いバージョンから新しいPostgreSQLバージョンへレプリケーション
- データ集約: 複数のソースからデータを統合
- GDPR対応: 機密でない列のみをレプリケーション
前提条件と設定
設定要件
パブリッシャー(送信元):
# postgresql.conf
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10
サブスクライバー(送信先):
# postgresql.conf
max_replication_slots = 10
max_logical_replication_workers = 10
max_worker_processes = 16
PostgreSQLの再起動
sudo systemctl restart postgresql
ネットワーク設定
データベース間の通信を確認:
# サブスクライバーからパブリッシャーへの接続テスト
psql -h publisher.example.com -U replication_user -d source_db
パブリッシャーでpg_hba.confを設定:
# レプリケーション接続を許可
host all replication_user subscriber_ip/32 md5
基本的な論理レプリケーション設定
ステップ1: レプリケーションユーザーの作成
パブリッシャーで:
CREATE ROLE replication_user WITH REPLICATION LOGIN PASSWORD 'secure_password';
GRANT SELECT ON ALL TABLES IN SCHEMA public TO replication_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO replication_user;
ステップ2: ソーステーブルの作成
パブリッシャーで:
CREATE TABLE users (
id SERIAL PRIMARY KEY,
username VARCHAR(50) NOT NULL,
email VARCHAR(100) NOT NULL,
created_at TIMESTAMP DEFAULT NOW()
);
CREATE TABLE orders (
id SERIAL PRIMARY KEY,
user_id INTEGER REFERENCES users(id),
amount DECIMAL(10,2),
status VARCHAR(20),
created_at TIMESTAMP DEFAULT NOW()
);
INSERT INTO users (username, email) VALUES
('alice', '[email protected]'),
('bob', '[email protected]');
ステップ3: パブリケーションの作成
パブリッシャーで:
-- すべてのテーブルをパブリッシュ
CREATE PUBLICATION my_publication FOR ALL TABLES;
-- または特定のテーブルをパブリッシュ
CREATE PUBLICATION my_publication FOR TABLE users, orders;
-- または行フィルタ付き(PostgreSQL 15+)
CREATE PUBLICATION active_users FOR TABLE users WHERE (status = 'active');
パブリケーションの表示:
SELECT * FROM pg_publication;
SELECT * FROM pg_publication_tables;
ステップ4: レプリカテーブルの作成
サブスクライバーで:
-- テーブルは同一の構造である必要がある
CREATE TABLE users (
id SERIAL PRIMARY KEY,
username VARCHAR(50) NOT NULL,
email VARCHAR(100) NOT NULL,
created_at TIMESTAMP DEFAULT NOW()
);
CREATE TABLE orders (
id SERIAL PRIMARY KEY,
user_id INTEGER REFERENCES users(id),
amount DECIMAL(10,2),
status VARCHAR(20),
created_at TIMESTAMP DEFAULT NOW()
);
ステップ5: サブスクリプションの作成
サブスクライバーで:
CREATE SUBSCRIPTION my_subscription
CONNECTION 'host=publisher.example.com port=5432 dbname=source_db user=replication_user password=secure_password'
PUBLICATION my_publication;
ステップ6: レプリケーションの検証
パブリッシャーで:
SELECT * FROM pg_stat_replication;
SELECT * FROM pg_replication_slots;
サブスクライバーで:
SELECT * FROM pg_stat_subscription;
SELECT * FROM users; -- レプリケーションされたデータが表示される
高度な設定
列レベルのレプリケーション(PostgreSQL 15+)
特定の列のみをレプリケーション:
-- パブリッシャーで: 機密でない列のみをレプリケーション
CREATE TABLE customers (
id SERIAL PRIMARY KEY,
name VARCHAR(100),
email VARCHAR(100),
credit_card VARCHAR(20), -- レプリケーションされない
created_at TIMESTAMP
);
CREATE PUBLICATION customer_basic
FOR TABLE customers (id, name, email, created_at);
行フィルタリング
アクティブなレコードのみをレプリケーション:
CREATE PUBLICATION active_data
FOR TABLE orders WHERE (status IN ('pending', 'processing'));
地域データ配布:
CREATE PUBLICATION us_customers
FOR TABLE customers WHERE (country = 'US');
CREATE PUBLICATION eu_customers
FOR TABLE customers WHERE (country IN ('UK', 'DE', 'FR'));
複数のパブリケーション
-- パブリッシャー: 複数のパブリケーションを作成
CREATE PUBLICATION oltp_data FOR TABLE users, orders;
CREATE PUBLICATION analytics_data FOR TABLE logs, metrics;
-- サブスクライバー: 複数のパブリケーションを購読
CREATE SUBSCRIPTION multi_sub
CONNECTION 'host=publisher port=5432 dbname=mydb user=repuser password=pass'
PUBLICATION oltp_data, analytics_data;
双方向レプリケーション(マルチマスター)
2方向同期の設定
データベースAの設定:
-- パブリケーション作成
CREATE PUBLICATION db_a_pub FOR TABLE shared_table;
-- データベースBを購読
CREATE SUBSCRIPTION db_a_sub
CONNECTION 'host=db-b.example.com dbname=mydb user=repuser'
PUBLICATION db_b_pub
WITH (origin = none); -- レプリケーションループを防止
データベースBの設定:
-- パブリケーション作成
CREATE PUBLICATION db_b_pub FOR TABLE shared_table;
-- データベースAを購読
CREATE SUBSCRIPTION db_b_sub
CONNECTION 'host=db-a.example.com dbname=mydb user=repuser'
PUBLICATION db_a_pub
WITH (origin = none);
競合解決
論理レプリケーションはデフォルトで「最後の書き込みが勝つ」方式を使用します:
-- レプリカIDを設定して競合を追跡
ALTER TABLE shared_table REPLICA IDENTITY FULL;
競合検出戦略:
- タイムスタンププベース: updated_atカラムを追加
CREATE TABLE shared_table (
id SERIAL PRIMARY KEY,
data TEXT,
updated_at TIMESTAMP DEFAULT NOW()
);
CREATE OR REPLACE FUNCTION update_timestamp()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = NOW();
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
CREATE TRIGGER update_shared_table_timestamp
BEFORE UPDATE ON shared_table
FOR EACH ROW
EXECUTE FUNCTION update_timestamp();
- バージョン番号:
CREATE TABLE shared_table (
id SERIAL PRIMARY KEY,
data TEXT,
version INTEGER DEFAULT 1
);
初期データ同期オプション
オプション1: 自動コピー(デフォルト)
-- サブスクライバーは自動的に既存データをコピー
CREATE SUBSCRIPTION my_sub
CONNECTION 'host=publisher dbname=mydb user=repuser'
PUBLICATION my_pub
WITH (copy_data = true); -- デフォルト
オプション2: 手動初期同期
大規模データセットの場合はpg_dumpを使用:
# パブリッシャーから特定のテーブルをダンプ
pg_dump -h publisher.example.com -U postgres -d source_db \n -t users -t orders --no-owner --no-acl > initial_data.sql
# サブスクライバーに読み込む
psql -h subscriber.example.com -U postgres -d target_db < initial_data.sql
# 初期コピーなしでサブスクリプションを作成
psql -h subscriber.example.com -U postgres -d target_db -c "
CREATE SUBSCRIPTION my_sub
CONNECTION 'host=publisher dbname=source_db user=repuser'
PUBLICATION my_pub
WITH (copy_data = false);
"
オプション3: 並列初期同期
-- 複数のワーカーを使用して高速初期同期
CREATE SUBSCRIPTION fast_sync
CONNECTION 'host=publisher dbname=mydb user=repuser'
PUBLICATION my_pub
WITH (
copy_data = true,
streaming = on,
synchronous_commit = off
);
論理レプリケーションの監視
パブリッシャーの監視
-- レプリケーションスロットの表示
SELECT
slot_name,
plugin,
active,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag_size
FROM pg_replication_slots;
-- アクティブなレプリケーション接続の表示
SELECT
pid,
usename,
application_name,
client_addr,
state,
sync_state,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), sent_lsn)) AS send_lag
FROM pg_stat_replication;
サブスクライバーの監視
-- サブスクリプションステータスの表示
SELECT
subname,
pid,
received_lsn,
latest_end_lsn,
pg_size_pretty(pg_wal_lsn_diff(latest_end_lsn, received_lsn)) AS lag
FROM pg_stat_subscription;
-- レプリケーションエラーの確認
SELECT * FROM pg_stat_subscription WHERE last_msg_receipt_time < NOW() - INTERVAL '5 minutes';
監視スクリプト
#!/bin/bash
# logical-replication-monitor.sh
echo "=== Publisher Status ==="
psql -h publisher -d mydb -c "
SELECT slot_name, active,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS lag
FROM pg_replication_slots;"
echo ""
echo "=== Subscriber Status ==="
psql -h subscriber -d mydb -c "
SELECT subname, pid,
pg_size_pretty(pg_wal_lsn_diff(latest_end_lsn, received_lsn)) AS lag
FROM pg_stat_subscription;"
トラブルシューティング
問題1: サブスクリプションがデータを受信しない
サブスクリプションステータスを確認:
SELECT subname, pid, subenabled, subconninfo FROM pg_subscription;
無効になっている場合は有効化:
ALTER SUBSCRIPTION my_sub ENABLE;
エラーを確認:
SELECT * FROM pg_stat_subscription;
問題2: レプリケーションラグが増加している
遅いテーブルを特定:
SELECT schemaname, tablename, n_tup_ins, n_tup_upd, n_tup_del
FROM pg_stat_user_tables
ORDER BY (n_tup_ins + n_tup_upd + n_tup_del) DESC;
ワーカープロセスを増やす:
# サブスクライバーのpostgresql.conf
max_logical_replication_workers = 20
max_worker_processes = 30
問題3: レプリケーションスロットの肥大化
スロット使用状況を確認:
SELECT
slot_name,
pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots;
非アクティブなスロットを削除:
SELECT pg_drop_replication_slot('inactive_slot_name');
問題4: 初期同期の失敗
初期同期を再起動:
-- サブスクリプションを削除して再作成
DROP SUBSCRIPTION my_sub;
CREATE SUBSCRIPTION my_sub
CONNECTION 'host=publisher dbname=mydb user=repuser'
PUBLICATION my_pub
WITH (copy_data = true);
パフォーマンス最適化
1. 適切なレプリカIDを使用する
-- デフォルト: プライマリキーのみ
ALTER TABLE my_table REPLICA IDENTITY DEFAULT;
-- フル: すべての列(オーバーヘッドが高い)
ALTER TABLE my_table REPLICA IDENTITY FULL;
-- インデックス: 特定のユニークインデックスを使用
CREATE UNIQUE INDEX replica_idx ON my_table(col1, col2);
ALTER TABLE my_table REPLICA IDENTITY USING INDEX replica_idx;
2. 初期同期中は制約を無効にする
-- 初期読み込みを高速化するためにトリガーを一時的に無効化
ALTER TABLE my_table DISABLE TRIGGER ALL;
-- 初期同期完了後
ALTER TABLE my_table ENABLE TRIGGER ALL;
3. 並列適用(PostgreSQL 16+)
CREATE SUBSCRIPTION parallel_sub
CONNECTION 'host=publisher dbname=mydb user=repuser'
PUBLICATION my_pub
WITH (streaming = parallel);
ベストプラクティス
- 常にレプリカIDを使用: テーブルにプライマリキーやユニーク制約があることを確認
- レプリケーションラグを監視: 100MBまたは5分以上のラグに対してアラートを設定
- 行フィルタを慎重に使用: フィルタはパブリッシャーで評価され、パフォーマンスに影響
- 競合に備える: マルチマスターセットアップで競合検出を実装
- フェイルオーバー手順をテスト: データベース間の切り替えを練習
- 定期的なメンテナンス: 古いレプリケーションスロットをクリーンアップ
まとめ
論理レプリケーションはPostgreSQLに強力なデータ配布機能を提供します。主な利点:
- 選択的なテーブルと行のレプリケーション
- バージョン間互換性
- 双方向マルチマスター設定
- 詳細なデータプライバシー制御
まずは単純な片方向レプリケーションから始め、徹底的にテストしてから、必要に応じてより複雑なトポロジーに拡張してください。