高级 PostgreSQL 基于角色的访问控制:行级安全与策略管理
简介
PostgreSQL 行级安全 (Row-Level Security, RLS) 提供了行级别的细粒度访问控制,使您能够直接在数据库中强制执行复杂的授权规则。这消除了对应用层过滤的需求,并确保无论应用程序如何访问数据库,数据安全性都能得到保障。
为什么使用行级安全?
传统方法的缺陷
应用层过滤:
# 每一个查询都必须记得添加过滤条件
query = "SELECT * FROM documents WHERE tenant_id = %s"
results = db.execute(query, [current_user_tenant_id])
问题:
- 容易忘记添加过滤器(产生安全漏洞)
- 代码在整个应用程序中存在重复
- 无法保护直接访问数据库的行为
- 复杂的审计需求
行级安全的优势
- 自动强制执行:透明地应用策略
- 集中化规则:安全逻辑统一管理
- 多租户隔离:完美适用于 SaaS 应用
- 审计合规性:内置的安全保证
- 性能:高效的查询规划
基础 RLS 设置
第 1 步:在表上启用 RLS
CREATE TABLE documents (
id SERIAL PRIMARY KEY,
title VARCHAR(200),
content TEXT,
owner_id INTEGER NOT NULL,
department VARCHAR(50),
is_public BOOLEAN DEFAULT false,
created_at TIMESTAMP DEFAULT NOW()
);
-- 启用行级安全
ALTER TABLE documents ENABLE ROW LEVEL SECURITY;
第 2 步:创建角色
-- 创建不同的用户角色
CREATE ROLE regular_user;
CREATE ROLE department_manager;
CREATE ROLE admin;
-- 授予表访问权限
GRANT SELECT, INSERT, UPDATE, DELETE ON documents TO regular_user;
GRANT SELECT, INSERT, UPDATE, DELETE ON documents TO department_manager;
GRANT ALL ON documents TO admin;
第 3 步:创建策略
-- 策略:用户可以看到自己的文档
CREATE POLICY user_own_documents ON documents
FOR SELECT
TO regular_user
USING (owner_id = current_setting('app.user_id')::INTEGER);
-- 策略:用户可以插入属于自己的文档
CREATE POLICY user_insert_own ON documents
FOR INSERT
TO regular_user
WITH CHECK (owner_id = current_setting('app.user_id')::INTEGER);
-- 策略:用户可以更新自己的文档
CREATE POLICY user_update_own ON documents
FOR UPDATE
TO regular_user
USING (owner_id = current_setting('app.user_id')::INTEGER)
WITH CHECK (owner_id = current_setting('app.user_id')::INTEGER);
第 4 步:设置会话变量
在应用程序代码中:
import psycopg2
# 以特定角色连接
conn = psycopg2.connect(
host="localhost",
database="mydb",
user="regular_user",
password="password"
)
# 设置用户上下文
cursor = conn.cursor()
cursor.execute("SET app.user_id = %s", [current_user_id])
# 现在查询会自动被过滤
cursor.execute("SELECT * FROM documents")
# 仅返回 owner_id = current_user_id 的文档
多租户 SaaS 应用程序
完整多租户设置
-- 创建租户表
CREATE TABLE tenants (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
created_at TIMESTAMP DEFAULT NOW()
);
-- 创建用户表
CREATE TABLE users (
id SERIAL PRIMARY KEY,
username VARCHAR(50) NOT NULL,
tenant_id INTEGER REFERENCES tenants(id),
role VARCHAR(20) NOT NULL,
created_at TIMESTAMP DEFAULT NOW()
);
-- 创建业务数据表
CREATE TABLE orders (
id SERIAL PRIMARY KEY,
tenant_id INTEGER NOT NULL REFERENCES tenants(id),
customer_name VARCHAR(100),
amount DECIMAL(10,2),
status VARCHAR(20),
created_by INTEGER REFERENCES users(id),
created_at TIMESTAMP DEFAULT NOW()
);
-- 启用 RLS
ALTER TABLE orders ENABLE ROW LEVEL SECURITY;
-- 策略:用户只能看到所属租户的订单
CREATE POLICY tenant_isolation ON orders
FOR ALL
TO public
USING (tenant_id = current_setting('app.tenant_id')::INTEGER)
WITH CHECK (tenant_id = current_setting('app.tenant_id')::INTEGER);
应用程序集成
Django 示例:
from django.db import connection
class TenantMiddleware:
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
if request.user.is_authenticated:
with connection.cursor() as cursor:
cursor.execute(
"SET app.tenant_id = %s",
[request.user.tenant_id]
)
response = self.get_response(request)
return response
Node.js 示例:
const { Pool } = require('pg');
const pool = new Pool();
async function setTenantContext(tenantId) {
const client = await pool.connect();
try {
await client.query('SET app.tenant_id = $1', [tenantId]);
return client;
} catch (err) {
client.release();
throw err;
}
}
// 使用方法
app.use(async (req, res, next) => {
if (req.user) {
req.dbClient = await setTenantContext(req.user.tenantId);
}
next();
});
高级策略模式
1. 层级访问控制
-- 用户可以看到所属部门的文档以及公开文档
CREATE POLICY department_access ON documents
FOR SELECT
TO regular_user
USING (
department = current_setting('app.user_department', true)
OR is_public = true
OR owner_id = current_setting('app.user_id')::INTEGER
);
2. 基于时间的访问
CREATE TABLE subscriptions (
id SERIAL PRIMARY KEY,
user_id INTEGER NOT NULL,
content TEXT,
valid_from TIMESTAMP NOT NULL,
valid_until TIMESTAMP NOT NULL
);
ALTER TABLE subscriptions ENABLE ROW LEVEL SECURITY;
-- 策略:用户只能看到活跃的订阅
CREATE POLICY active_subscriptions ON subscriptions
FOR SELECT
TO public
USING (
user_id = current_setting('app.user_id')::INTEGER
AND NOW() BETWEEN valid_from AND valid_until
);
3. 带有复杂逻辑的基于角色的访问
CREATE TABLE projects (
id SERIAL PRIMARY KEY,
name VARCHAR(100),
owner_id INTEGER,
team_id INTEGER,
visibility VARCHAR(20) -- 'private', 'team', 'public'
);
ALTER TABLE projects ENABLE ROW LEVEL SECURITY;
-- 复杂的可见性策略
CREATE POLICY project_visibility ON projects
FOR SELECT
TO public
USING (
CASE current_setting('app.user_role', true)
WHEN 'admin' THEN true
WHEN 'manager' THEN (
team_id = current_setting('app.user_team_id')::INTEGER
OR visibility = 'public'
)
ELSE (
owner_id = current_setting('app.user_id')::INTEGER
OR visibility = 'public'
)
END
);
4. 共享访问列表
CREATE TABLE documents (
id SERIAL PRIMARY KEY,
title VARCHAR(200),
owner_id INTEGER NOT NULL
);
CREATE TABLE document_shares (
document_id INTEGER REFERENCES documents(id),
shared_with_user_id INTEGER,
permission VARCHAR(20), -- 'read', 'write'
PRIMARY KEY (document_id, shared_with_user_id)
);
ALTER TABLE documents ENABLE ROW LEVEL SECURITY;
-- 策略:用户可以看到拥有的或共享给自己的文档
CREATE POLICY document_access ON documents
FOR SELECT
TO public
USING (
owner_id = current_setting('app.user_id')::INTEGER
OR id IN (
SELECT document_id
FROM document_shares
WHERE shared_with_user_id = current_setting('app.user_id')::INTEGER
)
);
-- 策略:用户可以修改拥有的或被授予写入权限的共享文档
CREATE POLICY document_modify ON documents
FOR UPDATE
TO public
USING (
owner_id = current_setting('app.user_id')::INTEGER
OR id IN (
SELECT document_id
FROM document_shares
WHERE shared_with_user_id = current_setting('app.user_id')::INTEGER
AND permission = 'write'
)
);
为管理任务绕过 RLS
选项 1:BYPASSRLS 角色属性
-- 授予类似超级用户的访问权限
CREATE ROLE admin_user WITH LOGIN PASSWORD 'secure_password';
ALTER ROLE admin_user BYPASSRLS;
-- 管理员可以无视策略看到所有行
选项 2:为管理员创建宽松策略
-- 为 admin 角色创建宽松策略
CREATE POLICY admin_all_access ON documents
FOR ALL
TO admin
USING (true)
WITH CHECK (true);
选项 3:临时禁用 RLS
-- 用于维护脚本
BEGIN;
ALTER TABLE documents DISABLE ROW LEVEL SECURITY;
-- 执行管理操作
UPDATE documents SET archived = true WHERE created_at < NOW() - INTERVAL '1 year';
ALTER TABLE documents ENABLE ROW LEVEL SECURITY;
COMMIT;
策略类型:USING 与 WITH CHECK
USING 子句
控制哪些现有的行是可见或可修改的:
CREATE POLICY select_own ON documents
FOR SELECT
USING (owner_id = current_user_id());
-- 用户只能 SELECT owner_id 匹配的行
WITH CHECK 子句
控制允许哪些新增或更新的行:
CREATE POLICY insert_own ON documents
FOR INSERT
WITH CHECK (owner_id = current_user_id());
-- 用户只能 INSERT owner_id 匹配的行
组合策略
CREATE POLICY update_own ON documents
FOR UPDATE
USING (owner_id = current_user_id()) -- 只能更新自己的行
WITH CHECK (owner_id = current_user_id()); -- 且不能更改所有权
性能注意事项
1. 为策略列创建索引
-- 为策略中使用的列创建索引
CREATE INDEX idx_documents_owner ON documents(owner_id);
CREATE INDEX idx_documents_tenant ON documents(tenant_id);
CREATE INDEX idx_documents_department ON documents(department);
2. 分析查询计划
EXPLAIN ANALYZE
SELECT * FROM documents
WHERE title LIKE '%%report%%';
-- 检查策略过滤器是否被高效应用
3. 避免在策略中使用子查询
不好 - 每一行都会执行子查询:
CREATE POLICY slow_policy ON documents
USING (
id IN (SELECT document_id FROM shares WHERE user_id = current_user_id())
);
较好 - 使用 EXISTS:
CREATE POLICY faster_policy ON documents
USING (
EXISTS (
SELECT 1 FROM shares
WHERE document_id = documents.id
AND user_id = current_user_id()
)
);
最好 - 在应用程序中使用 JOIN 或使用物化视图:
CREATE MATERIALIZED VIEW user_accessible_documents AS
SELECT d.*, s.user_id AS shared_with
FROM documents d
LEFT JOIN shares s ON d.id = s.document_id;
CREATE INDEX ON user_accessible_documents(shared_with);
监控与审计
查看活动策略
-- 列出所有 RLS 策略
SELECT
schemaname,
tablename,
policyname,
permissive,
roles,
cmd,
qual,
with_check
FROM pg_policies
ORDER BY tablename, policyname;
审计策略变更
-- 为策略变更创建审计日志
CREATE TABLE policy_audit_log (
id SERIAL PRIMARY KEY,
table_name VARCHAR(100),
policy_name VARCHAR(100),
action VARCHAR(20),
changed_by VARCHAR(50),
changed_at TIMESTAMP DEFAULT NOW()
);
-- 用于记录变更的触发器函数
CREATE OR REPLACE FUNCTION log_policy_changes()
RETURNS event_trigger AS $$
BEGIN
INSERT INTO policy_audit_log (table_name, policy_name, action, changed_by)
SELECT
objid::regclass::text,
object_identity,
'CREATED',
current_user
FROM pg_event_trigger_ddl_commands()
WHERE command_tag = 'CREATE POLICY';
END;
$$ LANGUAGE plpgsql;
CREATE EVENT TRIGGER log_policy_create
ON ddl_command_end
WHEN TAG IN ('CREATE POLICY')
EXECUTE FUNCTION log_policy_changes();
测试策略有效性
-- 创建测试函数
CREATE OR REPLACE FUNCTION test_user_access(
test_user_id INTEGER,
test_tenant_id INTEGER
)
RETURNS TABLE(document_id INTEGER, title VARCHAR) AS $$
BEGIN
-- 设置会话变量
PERFORM set_config('app.user_id', test_user_id::text, true);
PERFORM set_config('app.tenant_id', test_tenant_id::text, true);
-- 返回可访问的文档
RETURN QUERY SELECT id, documents.title FROM documents;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;
-- 测试访问
SELECT * FROM test_user_access(123, 5);
故障排除
问题 1:策略未应用
检查 RLS 是否启用:
SELECT tablename, rowsecurity
FROM pg_tables
WHERE schemaname = 'public' AND tablename = 'documents';
如果未启用则启用它:
ALTER TABLE documents ENABLE ROW LEVEL SECURITY;
问题 2:会话变量未设置
调试当前设置:
SHOW app.user_id;
SHOW app.tenant_id;
-- 或使用 current_setting
SELECT current_setting('app.user_id', true);
问题 3:性能下降
识别慢速策略:
EXPLAIN (ANALYZE, BUFFERS)
SELECT * FROM documents WHERE title = 'test';
-- 查找带有策略条件的 "Filter" 操作
解决方案:简化策略或增加索引
问题 4:意外的访问拒绝
以特定用户身份测试:
SET ROLE regular_user;
SET app.user_id = '123';
SELECT * FROM documents;
-- 显示该用户实际能看到的内容
RESET ROLE;
最佳实践
- 始终设置会话变量:确保
current_setting()的值已填充 - 为策略列创建索引:在 USING/WITH CHECK 中使用的所有列上创建索引
- 彻底测试:使用不同的角色和场景验证策略
- 保持策略简单:复杂的逻辑会影响性能
- 使用 EXPLAIN:分析启用 RLS 后的查询计划
- 记录策略文档:为策略逻辑添加注释以便于维护
- 职责分离:为 SELECT, INSERT, UPDATE, DELETE 使用不同的策略
- 监控性能:为慢查询设置警报
结论
PostgreSQL 行级安全提供了强大的、由数据库强制执行的访问控制:
- 自动过滤:透明地应用安全规则
- 多租户隔离:SaaS 应用程序的完美选择
- 灵活的策略:支持复杂的授权规则
- 集中化安全:逻辑存在于数据库中,而非分散在代码中
建议从简单的租户隔离开始,测试性能影响,然后根据需要扩展到更复杂的授权模式。