高级 PostgreSQL 基于角色的访问控制:行级安全性与策略管理

使用PostgreSQL行级安全与RBAC策略实现租户隔离、所有权检查以及更安全的应用程序访问。

高级PostgreSQL基于角色的访问控制:行级安全与策略管理

PostgreSQL行级安全(RLS)允许您的数据库根据当前角色和会话上下文过滤行。如果您的应用程序服务于多个租户,或者混合了管理员、经理和普通用户的访问权限,RLS可以防止一个遗漏的WHERE tenant_id = ...变成数据泄露。

RLS并不能替代应用程序授权。它为您提供了数据库级别的安全网,用于保护最敏感的边界:连接可以读取或写入哪些行。

为什么需要行级安全?

传统方法的问题

应用层过滤:

# 每个查询都必须记住要过滤
query = "SELECT * FROM documents WHERE tenant_id = %s"
results = db.execute(query, [current_user_tenant_id])

问题:

  • 容易忘记过滤条件(安全漏洞)
  • 应用程序中代码重复
  • 无法保护直接数据库访问
  • 复杂的审计要求

行级安全的优势

  • 自动执行:策略透明地应用
  • 集中化规则:安全逻辑集中在一处
  • 多租户隔离:非常适合SaaS应用程序
  • 审计合规:内置安全保证
  • 性能:高效的查询计划

基本RLS设置

步骤1:在表上启用RLS

CREATE TABLE documents (
    id SERIAL PRIMARY KEY,
    title VARCHAR(200),
    content TEXT,
    owner_id INTEGER NOT NULL,
    department VARCHAR(50),
    is_public BOOLEAN DEFAULT false,
    created_at TIMESTAMP DEFAULT NOW()
);

-- 启用行级安全
ALTER TABLE documents ENABLE ROW LEVEL SECURITY;

步骤2:创建角色

-- 创建不同的用户角色
CREATE ROLE regular_user;
CREATE ROLE department_manager;
CREATE ROLE admin;

-- 授予表访问权限
GRANT SELECT, INSERT, UPDATE, DELETE ON documents TO regular_user;
GRANT SELECT, INSERT, UPDATE, DELETE ON documents TO department_manager;
GRANT ALL ON documents TO admin;

步骤3:创建策略

-- 策略:用户只能看到自己的文档
CREATE POLICY user_own_documents ON documents
    FOR SELECT
    TO regular_user
    USING (owner_id = current_setting('app.user_id')::INTEGER);

-- 策略:用户可以插入自己拥有的文档
CREATE POLICY user_insert_own ON documents
    FOR INSERT
    TO regular_user
    WITH CHECK (owner_id = current_setting('app.user_id')::INTEGER);

-- 策略:用户可以更新自己的文档
CREATE POLICY user_update_own ON documents
    FOR UPDATE
    TO regular_user
    USING (owner_id = current_setting('app.user_id')::INTEGER)
    WITH CHECK (owner_id = current_setting('app.user_id')::INTEGER);

步骤4:设置会话变量

在应用程序代码中:

import psycopg2

# 以角色身份连接
conn = psycopg2.connect(
    host="localhost",
    database="mydb",
    user="regular_user",
    password="password"
)

# 设置用户上下文
cursor = conn.cursor()
cursor.execute("SELECT set_config('app.user_id', %s, false)", [str(current_user_id)])

# 现在查询会自动过滤
cursor.execute("SELECT * FROM documents")
# 只返回owner_id = current_user_id的文档

多租户SaaS应用程序

完整的多租户设置

-- 创建租户表
CREATE TABLE tenants (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100) NOT NULL,
    created_at TIMESTAMP DEFAULT NOW()
);

-- 创建用户表
CREATE TABLE users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(50) NOT NULL,
    tenant_id INTEGER REFERENCES tenants(id),
    role VARCHAR(20) NOT NULL,
    created_at TIMESTAMP DEFAULT NOW()
);

-- 创建应用程序数据表
CREATE TABLE orders (
    id SERIAL PRIMARY KEY,
    tenant_id INTEGER NOT NULL REFERENCES tenants(id),
    customer_name VARCHAR(100),
    amount DECIMAL(10,2),
    status VARCHAR(20),
    created_by INTEGER REFERENCES users(id),
    created_at TIMESTAMP DEFAULT NOW()
);

-- 启用RLS
ALTER TABLE orders ENABLE ROW LEVEL SECURITY;

-- 策略:用户只能看到自己租户的订单
CREATE POLICY tenant_isolation ON orders
    FOR ALL
    TO public
    USING (tenant_id = current_setting('app.tenant_id')::INTEGER)
    WITH CHECK (tenant_id = current_setting('app.tenant_id')::INTEGER);

应用程序集成

Django示例:

from django.db import connection

class TenantMiddleware:
    def __init__(self, get_response):
        self.get_response = get_response

    def __call__(self, request):
        if request.user.is_authenticated:
            with connection.cursor() as cursor:
                cursor.execute(
                    "SELECT set_config('app.tenant_id', %s, false)",
                    [str(request.user.tenant_id)]
                )
        
        response = self.get_response(request)
        return response

Node.js示例:

const { Pool } = require('pg');
const pool = new Pool();

async function setTenantContext(tenantId) {
    const client = await pool.connect();
    try {
        await client.query('SELECT set_config($1, $2, false)', ['app.tenant_id', String(tenantId)]);
        return client;
    } catch (err) {
        client.release();
        throw err;
    }
}

// 使用
app.use(async (req, res, next) => {
    if (req.user) {
        req.dbClient = await setTenantContext(req.user.tenantId);
    }
    next();
});

高级策略模式

1. 分层访问控制

-- 用户可以查看自己部门的文档以及公共文档
CREATE POLICY department_access ON documents
    FOR SELECT
    TO regular_user
    USING (
        department = current_setting('app.user_department', true)
        OR is_public = true
        OR owner_id = current_setting('app.user_id')::INTEGER
    );

2. 基于时间的访问

CREATE TABLE subscriptions (
    id SERIAL PRIMARY KEY,
    user_id INTEGER NOT NULL,
    content TEXT,
    valid_from TIMESTAMP NOT NULL,
    valid_until TIMESTAMP NOT NULL
);

ALTER TABLE subscriptions ENABLE ROW LEVEL SECURITY;

-- 策略:用户只能看到有效的订阅
CREATE POLICY active_subscriptions ON subscriptions
    FOR SELECT
    TO public
    USING (
        user_id = current_setting('app.user_id')::INTEGER
        AND NOW() BETWEEN valid_from AND valid_until
    );

3. 基于角色的访问与复杂逻辑

CREATE TABLE projects (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100),
    owner_id INTEGER,
    team_id INTEGER,
    visibility VARCHAR(20) -- 'private', 'team', 'public'
);

ALTER TABLE projects ENABLE ROW LEVEL SECURITY;

-- 复杂可见性策略
CREATE POLICY project_visibility ON projects
    FOR SELECT
    TO public
    USING (
        CASE current_setting('app.user_role', true)
            WHEN 'admin' THEN true
            WHEN 'manager' THEN (
                team_id = current_setting('app.user_team_id')::INTEGER
                OR visibility = 'public'
            )
            ELSE (
                owner_id = current_setting('app.user_id')::INTEGER
                OR visibility = 'public'
            )
        END
    );

4. 共享访问列表

CREATE TABLE documents (
    id SERIAL PRIMARY KEY,
    title VARCHAR(200),
    owner_id INTEGER NOT NULL
);

CREATE TABLE document_shares (
    document_id INTEGER REFERENCES documents(id),
    shared_with_user_id INTEGER,
    permission VARCHAR(20), -- 'read', 'write'
    PRIMARY KEY (document_id, shared_with_user_id)
);

ALTER TABLE documents ENABLE ROW LEVEL SECURITY;

-- 策略:用户可以查看自己拥有或共享的文档
CREATE POLICY document_access ON documents
    FOR SELECT
    TO public
    USING (
        owner_id = current_setting('app.user_id')::INTEGER
        OR id IN (
            SELECT document_id 
            FROM document_shares 
            WHERE shared_with_user_id = current_setting('app.user_id')::INTEGER
        )
    );

-- 策略:用户可以修改自己拥有或具有写权限的共享文档
CREATE POLICY document_modify ON documents
    FOR UPDATE
    TO public
    USING (
        owner_id = current_setting('app.user_id')::INTEGER
        OR id IN (
            SELECT document_id 
            FROM document_shares 
            WHERE shared_with_user_id = current_setting('app.user_id')::INTEGER
              AND permission = 'write'
        )
    );

管理访问与RLS

表所有者和超级用户可以绕过RLS,除非您强制启用。将管理路径视为一个独立的设计决策,而不是事后才考虑的事情。

选项1:BYPASSRLS角色属性

CREATE ROLE admin_user WITH LOGIN PASSWORD 'secure_password';
ALTER ROLE admin_user BYPASSRLS;

请谨慎使用。BYPASSRLS功能强大,不应授予普通应用程序角色。

选项2:为管理员设置允许策略

-- 为管理员角色创建允许策略
CREATE POLICY admin_all_access ON documents
    FOR ALL
    TO admin
    USING (true)
    WITH CHECK (true);

选项3:为表所有者强制RLS

ALTER TABLE documents FORCE ROW LEVEL SECURITY;

FORCE ROW LEVEL SECURITY使表所有者也必须遵循策略。对于应用程序拥有的表,这通常更安全。对于维护工作,请使用单独受控的角色,而不是在临时脚本中禁用RLS。

策略类型:USING vs WITH CHECK

USING子句

控制哪些现有行可见/可修改:

CREATE POLICY select_own ON documents
    FOR SELECT
    USING (owner_id = current_setting('app.user_id')::INTEGER);

-- 用户只能SELECT owner_id匹配的行

WITH CHECK子句

控制哪些新行/更新行被允许:

CREATE POLICY insert_own ON documents
    FOR INSERT
    WITH CHECK (owner_id = current_setting('app.user_id')::INTEGER);

-- 用户只能INSERT owner_id匹配的行

组合策略

CREATE POLICY update_own ON documents
    FOR UPDATE
    USING (owner_id = current_setting('app.user_id')::INTEGER)
    WITH CHECK (owner_id = current_setting('app.user_id')::INTEGER);

性能考虑

1. 为策略列创建索引

-- 为策略中使用的列创建索引
CREATE INDEX idx_documents_owner ON documents(owner_id);
CREATE INDEX idx_documents_tenant ON documents(tenant_id);
CREATE INDEX idx_documents_department ON documents(department);

2. 分析查询计划

EXPLAIN ANALYZE
SELECT * FROM documents
WHERE title LIKE '%report%';

-- 检查策略过滤是否高效应用

3. 谨慎使用策略中的子查询

子查询有时是共享访问的正确模型,但它们需要索引和测试。一个为每个候选行探测另一个表的策略可能会变得昂贵。

没有索引时风险较大:

CREATE POLICY slow_policy ON documents
    USING (
        id IN (
            SELECT document_id
            FROM shares
            WHERE user_id = current_setting('app.user_id')::INTEGER
        )
    );

通常配合支持索引更清晰:

CREATE POLICY faster_policy ON documents
    USING (
        EXISTS (
            SELECT 1 FROM shares 
            WHERE document_id = documents.id 
              AND user_id = current_setting('app.user_id')::INTEGER
        )
    );

创建诸如shares(user_id, document_id)的索引,并使用EXPLAIN (ANALYZE, BUFFERS)检查计划。

监控与审计

查看活动策略

-- 列出所有RLS策略
SELECT 
    schemaname,
    tablename,
    policyname,
    permissive,
    roles,
    cmd,
    qual,
    with_check
FROM pg_policies
ORDER BY tablename, policyname;

审计策略变更

在迁移和数据库审计日志中跟踪策略DDL。如果您的环境使用事件触发器,请根据您的PostgreSQL版本仔细测试它们,并包括CREATE POLICYALTER POLICYDROP POLICY事件。不要依赖未经测试的触发器片段作为合规证据。

测试策略有效性

-- 创建测试函数
CREATE OR REPLACE FUNCTION test_user_access(
    test_user_id INTEGER,
    test_tenant_id INTEGER
)
RETURNS TABLE(document_id INTEGER, title VARCHAR) AS $$
BEGIN
    -- 设置会话变量
    PERFORM set_config('app.user_id', test_user_id::text, false);
    PERFORM set_config('app.tenant_id', test_tenant_id::text, false);
    
    -- 返回可访问的文档
    RETURN QUERY SELECT id, documents.title FROM documents;
END;
$$ LANGUAGE plpgsql SECURITY INVOKER;

-- 测试访问
SELECT * FROM test_user_access(123, 5);

故障排除

问题1:策略未应用

检查RLS是否启用:

SELECT tablename, rowsecurity 
FROM pg_tables 
WHERE schemaname = 'public' AND tablename = 'documents';

如果禁用则启用:

ALTER TABLE documents ENABLE ROW LEVEL SECURITY;

问题2:会话变量未设置

调试当前设置:

SHOW app.user_id;
SHOW app.tenant_id;

-- 或使用current_setting
SELECT current_setting('app.user_id', true);

问题3:性能下降

识别慢策略:

EXPLAIN (ANALYZE, BUFFERS) 
SELECT * FROM documents WHERE title = 'test';

-- 查找带有策略条件的"Filter"操作

解决方案:简化策略或添加索引

问题4:意外拒绝访问

以特定用户身份测试:

SET ROLE regular_user;
SELECT set_config('app.user_id', '123', false);

SELECT * FROM documents;
-- 显示该用户实际能看到的内容

RESET ROLE;

最佳实践

  1. 始终设置会话变量:确保current_setting()值已填充
  2. 为策略列创建索引:在USING/WITH CHECK中使用的所有列上创建索引
  3. 彻底测试:使用不同角色和场景验证策略
  4. 保持策略简单:复杂逻辑会影响性能
  5. 使用EXPLAIN:分析启用RLS的查询计划
  6. 记录策略:为策略逻辑添加注释以便维护
  7. 分离关注点:为SELECT、INSERT、UPDATE、DELETE使用不同策略
  8. 监控性能:为慢查询设置警报

要点总结

从一个简单的策略开始:对最高风险的表进行租户隔离。使用set_config设置会话上下文,为策略使用的每一列创建索引,并以真实应用程序角色测试读写操作。

一旦基本边界正常工作,仅在产品需要时添加所有权、经理或共享访问策略。小而经过测试的策略比一个试图编码整个权限模型的巧妙规则更容易推理。