高级 PostgreSQL 基于角色的访问控制:行级安全性与策略管理

深入掌握 PostgreSQL 行级安全性 (RLS),实现细粒度访问控制、多租户隔离及基于策略的授权。包含实际应用模式的完整指南。

31 浏览量

高级 PostgreSQL 基于角色的访问控制:行级安全与策略管理

简介

PostgreSQL 行级安全 (Row-Level Security, RLS) 提供了行级别的细粒度访问控制,使您能够直接在数据库中强制执行复杂的授权规则。这消除了对应用层过滤的需求,并确保无论应用程序如何访问数据库,数据安全性都能得到保障。

为什么使用行级安全?

传统方法的缺陷

应用层过滤:

# 每一个查询都必须记得添加过滤条件
query = "SELECT * FROM documents WHERE tenant_id = %s"
results = db.execute(query, [current_user_tenant_id])

问题:
- 容易忘记添加过滤器(产生安全漏洞)
- 代码在整个应用程序中存在重复
- 无法保护直接访问数据库的行为
- 复杂的审计需求

行级安全的优势

  • 自动强制执行:透明地应用策略
  • 集中化规则:安全逻辑统一管理
  • 多租户隔离:完美适用于 SaaS 应用
  • 审计合规性:内置的安全保证
  • 性能:高效的查询规划

基础 RLS 设置

第 1 步:在表上启用 RLS

CREATE TABLE documents (
    id SERIAL PRIMARY KEY,
    title VARCHAR(200),
    content TEXT,
    owner_id INTEGER NOT NULL,
    department VARCHAR(50),
    is_public BOOLEAN DEFAULT false,
    created_at TIMESTAMP DEFAULT NOW()
);

-- 启用行级安全
ALTER TABLE documents ENABLE ROW LEVEL SECURITY;

第 2 步:创建角色

-- 创建不同的用户角色
CREATE ROLE regular_user;
CREATE ROLE department_manager;
CREATE ROLE admin;

-- 授予表访问权限
GRANT SELECT, INSERT, UPDATE, DELETE ON documents TO regular_user;
GRANT SELECT, INSERT, UPDATE, DELETE ON documents TO department_manager;
GRANT ALL ON documents TO admin;

第 3 步:创建策略

-- 策略:用户可以看到自己的文档
CREATE POLICY user_own_documents ON documents
    FOR SELECT
    TO regular_user
    USING (owner_id = current_setting('app.user_id')::INTEGER);

-- 策略:用户可以插入属于自己的文档
CREATE POLICY user_insert_own ON documents
    FOR INSERT
    TO regular_user
    WITH CHECK (owner_id = current_setting('app.user_id')::INTEGER);

-- 策略:用户可以更新自己的文档
CREATE POLICY user_update_own ON documents
    FOR UPDATE
    TO regular_user
    USING (owner_id = current_setting('app.user_id')::INTEGER)
    WITH CHECK (owner_id = current_setting('app.user_id')::INTEGER);

第 4 步:设置会话变量

在应用程序代码中:

import psycopg2

# 以特定角色连接
conn = psycopg2.connect(
    host="localhost",
    database="mydb",
    user="regular_user",
    password="password"
)

# 设置用户上下文
cursor = conn.cursor()
cursor.execute("SET app.user_id = %s", [current_user_id])

# 现在查询会自动被过滤
cursor.execute("SELECT * FROM documents")
# 仅返回 owner_id = current_user_id 的文档

多租户 SaaS 应用程序

完整多租户设置

-- 创建租户表
CREATE TABLE tenants (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    created_at TIMESTAMP DEFAULT NOW()
);

-- 创建用户表
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(50) NOT NULL,
    tenant_id INTEGER REFERENCES tenants(id),
    role VARCHAR(20) NOT NULL,
    created_at TIMESTAMP DEFAULT NOW()
);

-- 创建业务数据表
CREATE TABLE orders (
    id SERIAL PRIMARY KEY,
    tenant_id INTEGER NOT NULL REFERENCES tenants(id),
    customer_name VARCHAR(100),
    amount DECIMAL(10,2),
    status VARCHAR(20),
    created_by INTEGER REFERENCES users(id),
    created_at TIMESTAMP DEFAULT NOW()
);

-- 启用 RLS
ALTER TABLE orders ENABLE ROW LEVEL SECURITY;

-- 策略:用户只能看到所属租户的订单
CREATE POLICY tenant_isolation ON orders
    FOR ALL
    TO public
    USING (tenant_id = current_setting('app.tenant_id')::INTEGER)
    WITH CHECK (tenant_id = current_setting('app.tenant_id')::INTEGER);

应用程序集成

Django 示例:

from django.db import connection

class TenantMiddleware:
    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        if request.user.is_authenticated:
            with connection.cursor() as cursor:
                cursor.execute(
                    "SET app.tenant_id = %s",
                    [request.user.tenant_id]
                )

        response = self.get_response(request)
        return response

Node.js 示例:

const { Pool } = require('pg');
const pool = new Pool();

async function setTenantContext(tenantId) {
    const client = await pool.connect();
    try {
        await client.query('SET app.tenant_id = $1', [tenantId]);
        return client;
    } catch (err) {
        client.release();
        throw err;
    }
}

// 使用方法
app.use(async (req, res, next) => {
    if (req.user) {
        req.dbClient = await setTenantContext(req.user.tenantId);
    }
    next();
});

高级策略模式

1. 层级访问控制

-- 用户可以看到所属部门的文档以及公开文档
CREATE POLICY department_access ON documents
    FOR SELECT
    TO regular_user
    USING (
        department = current_setting('app.user_department', true)
        OR is_public = true
        OR owner_id = current_setting('app.user_id')::INTEGER
    );

2. 基于时间的访问

CREATE TABLE subscriptions (
    id SERIAL PRIMARY KEY,
    user_id INTEGER NOT NULL,
    content TEXT,
    valid_from TIMESTAMP NOT NULL,
    valid_until TIMESTAMP NOT NULL
);

ALTER TABLE subscriptions ENABLE ROW LEVEL SECURITY;

-- 策略:用户只能看到活跃的订阅
CREATE POLICY active_subscriptions ON subscriptions
    FOR SELECT
    TO public
    USING (
        user_id = current_setting('app.user_id')::INTEGER
        AND NOW() BETWEEN valid_from AND valid_until
    );

3. 带有复杂逻辑的基于角色的访问

CREATE TABLE projects (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100),
    owner_id INTEGER,
    team_id INTEGER,
    visibility VARCHAR(20) -- 'private', 'team', 'public'
);

ALTER TABLE projects ENABLE ROW LEVEL SECURITY;

-- 复杂的可见性策略
CREATE POLICY project_visibility ON projects
    FOR SELECT
    TO public
    USING (
        CASE current_setting('app.user_role', true)
            WHEN 'admin' THEN true
            WHEN 'manager' THEN (
                team_id = current_setting('app.user_team_id')::INTEGER
                OR visibility = 'public'
            )
            ELSE (
                owner_id = current_setting('app.user_id')::INTEGER
                OR visibility = 'public'
            )
        END
    );

4. 共享访问列表

CREATE TABLE documents (
    id SERIAL PRIMARY KEY,
    title VARCHAR(200),
    owner_id INTEGER NOT NULL
);

CREATE TABLE document_shares (
    document_id INTEGER REFERENCES documents(id),
    shared_with_user_id INTEGER,
    permission VARCHAR(20), -- 'read', 'write'
    PRIMARY KEY (document_id, shared_with_user_id)
);

ALTER TABLE documents ENABLE ROW LEVEL SECURITY;

-- 策略:用户可以看到拥有的或共享给自己的文档
CREATE POLICY document_access ON documents
    FOR SELECT
    TO public
    USING (
        owner_id = current_setting('app.user_id')::INTEGER
        OR id IN (
            SELECT document_id 
            FROM document_shares 
            WHERE shared_with_user_id = current_setting('app.user_id')::INTEGER
        )
    );

-- 策略:用户可以修改拥有的或被授予写入权限的共享文档
CREATE POLICY document_modify ON documents
    FOR UPDATE
    TO public
    USING (
        owner_id = current_setting('app.user_id')::INTEGER
        OR id IN (
            SELECT document_id 
            FROM document_shares 
            WHERE shared_with_user_id = current_setting('app.user_id')::INTEGER
              AND permission = 'write'
        )
    );

为管理任务绕过 RLS

选项 1:BYPASSRLS 角色属性

-- 授予类似超级用户的访问权限
CREATE ROLE admin_user WITH LOGIN PASSWORD 'secure_password';
ALTER ROLE admin_user BYPASSRLS;

-- 管理员可以无视策略看到所有行

选项 2:为管理员创建宽松策略

-- 为 admin 角色创建宽松策略
CREATE POLICY admin_all_access ON documents
    FOR ALL
    TO admin
    USING (true)
    WITH CHECK (true);

选项 3:临时禁用 RLS

-- 用于维护脚本
BEGIN;
    ALTER TABLE documents DISABLE ROW LEVEL SECURITY;
    -- 执行管理操作
    UPDATE documents SET archived = true WHERE created_at < NOW() - INTERVAL '1 year';
    ALTER TABLE documents ENABLE ROW LEVEL SECURITY;
COMMIT;

策略类型:USING 与 WITH CHECK

USING 子句

控制哪些现有的行是可见或可修改的:

CREATE POLICY select_own ON documents
    FOR SELECT
    USING (owner_id = current_user_id());

-- 用户只能 SELECT owner_id 匹配的行

WITH CHECK 子句

控制允许哪些新增或更新的行:

CREATE POLICY insert_own ON documents
    FOR INSERT
    WITH CHECK (owner_id = current_user_id());

-- 用户只能 INSERT owner_id 匹配的行

组合策略

CREATE POLICY update_own ON documents
    FOR UPDATE
    USING (owner_id = current_user_id())       -- 只能更新自己的行
    WITH CHECK (owner_id = current_user_id()); -- 且不能更改所有权

性能注意事项

1. 为策略列创建索引

-- 为策略中使用的列创建索引
CREATE INDEX idx_documents_owner ON documents(owner_id);
CREATE INDEX idx_documents_tenant ON documents(tenant_id);
CREATE INDEX idx_documents_department ON documents(department);

2. 分析查询计划

EXPLAIN ANALYZE
SELECT * FROM documents
WHERE title LIKE '%%report%%';

-- 检查策略过滤器是否被高效应用

3. 避免在策略中使用子查询

不好 - 每一行都会执行子查询:

CREATE POLICY slow_policy ON documents
    USING (
        id IN (SELECT document_id FROM shares WHERE user_id = current_user_id())
    );

较好 - 使用 EXISTS:

CREATE POLICY faster_policy ON documents
    USING (
        EXISTS (
            SELECT 1 FROM shares 
            WHERE document_id = documents.id 
              AND user_id = current_user_id()
        )
    );

最好 - 在应用程序中使用 JOIN 或使用物化视图:

CREATE MATERIALIZED VIEW user_accessible_documents AS
SELECT d.*, s.user_id AS shared_with
FROM documents d
LEFT JOIN shares s ON d.id = s.document_id;

CREATE INDEX ON user_accessible_documents(shared_with);

监控与审计

查看活动策略

-- 列出所有 RLS 策略
SELECT 
    schemaname,
    tablename,
    policyname,
    permissive,
    roles,
    cmd,
    qual,
    with_check
FROM pg_policies
ORDER BY tablename, policyname;

审计策略变更

-- 为策略变更创建审计日志
CREATE TABLE policy_audit_log (
    id SERIAL PRIMARY KEY,
    table_name VARCHAR(100),
    policy_name VARCHAR(100),
    action VARCHAR(20),
    changed_by VARCHAR(50),
    changed_at TIMESTAMP DEFAULT NOW()
);

-- 用于记录变更的触发器函数
CREATE OR REPLACE FUNCTION log_policy_changes()
RETURNS event_trigger AS $$
BEGIN
    INSERT INTO policy_audit_log (table_name, policy_name, action, changed_by)
    SELECT 
        objid::regclass::text,
        object_identity,
        'CREATED',
        current_user
    FROM pg_event_trigger_ddl_commands()
    WHERE command_tag = 'CREATE POLICY';
END;
$$ LANGUAGE plpgsql;

CREATE EVENT TRIGGER log_policy_create
    ON ddl_command_end
    WHEN TAG IN ('CREATE POLICY')
    EXECUTE FUNCTION log_policy_changes();

测试策略有效性

-- 创建测试函数
CREATE OR REPLACE FUNCTION test_user_access(
    test_user_id INTEGER,
    test_tenant_id INTEGER
)
RETURNS TABLE(document_id INTEGER, title VARCHAR) AS $$
BEGIN
    -- 设置会话变量
    PERFORM set_config('app.user_id', test_user_id::text, true);
    PERFORM set_config('app.tenant_id', test_tenant_id::text, true);

    -- 返回可访问的文档
    RETURN QUERY SELECT id, documents.title FROM documents;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER;

-- 测试访问
SELECT * FROM test_user_access(123, 5);

故障排除

问题 1:策略未应用

检查 RLS 是否启用:

SELECT tablename, rowsecurity 
FROM pg_tables 
WHERE schemaname = 'public' AND tablename = 'documents';

如果未启用则启用它:

ALTER TABLE documents ENABLE ROW LEVEL SECURITY;

问题 2:会话变量未设置

调试当前设置:

SHOW app.user_id;
SHOW app.tenant_id;

-- 或使用 current_setting
SELECT current_setting('app.user_id', true);

问题 3:性能下降

识别慢速策略:

EXPLAIN (ANALYZE, BUFFERS) 
SELECT * FROM documents WHERE title = 'test';

-- 查找带有策略条件的 "Filter" 操作

解决方案:简化策略或增加索引

问题 4:意外的访问拒绝

以特定用户身份测试:

SET ROLE regular_user;
SET app.user_id = '123';

SELECT * FROM documents;
-- 显示该用户实际能看到的内容

RESET ROLE;

最佳实践

  1. 始终设置会话变量:确保 current_setting() 的值已填充
  2. 为策略列创建索引:在 USING/WITH CHECK 中使用的所有列上创建索引
  3. 彻底测试:使用不同的角色和场景验证策略
  4. 保持策略简单:复杂的逻辑会影响性能
  5. 使用 EXPLAIN:分析启用 RLS 后的查询计划
  6. 记录策略文档:为策略逻辑添加注释以便于维护
  7. 职责分离:为 SELECT, INSERT, UPDATE, DELETE 使用不同的策略
  8. 监控性能:为慢查询设置警报

结论

PostgreSQL 行级安全提供了强大的、由数据库强制执行的访问控制:

  • 自动过滤:透明地应用安全规则
  • 多租户隔离:SaaS 应用程序的完美选择
  • 灵活的策略:支持复杂的授权规则
  • 集中化安全:逻辑存在于数据库中,而非分散在代码中

建议从简单的租户隔离开始,测试性能影响,然后根据需要扩展到更复杂的授权模式。