Rego 策略语言语法

Rego 是 OPA 的策略语言,基于 Datalog 扩展而来。如果你是第一次看到 Rego 代码,可能会觉得它像伪代码又像 Python——确实,Rego 的设计目标之一就是让策略既可读又可执行。

但 Rego 的强大之处在于它的声明式特性:你只需要描述「什么是正确的」,而不需要描述「如何计算正确」

一、Rego 基础

1.1 Hello World

最简单的策略)
package example

# 默认拒绝
default allow = false

# 如果用户是管理员,则允许
allow {
    input.user.role == "admin"
}

1.2 核心概念

概念说明
package命名空间,类似 Java 的 package
import导入外部数据
rule规则,定义策略逻辑
default默认值
every集合遍历

1.3 包声明

package app.authz

package kubernetes.admission

package terraform.sentinel.v2

二、规则结构

2.1 规则语法

<variable> {
    <body>
}

等价于:

<variable> {
    <condition1>
    <condition2>
    ...
}

2.2 简单规则

# 允许管理员访问
allow {
    input.user.role == "admin"
}

2.3 多条件规则

# 允许在工作时间访问机密文档
allow {
    input.user.clearance_level >= 3
    input.resource.classification == "confidential"
    input.environment.hour >= 9
    input.environment.hour <= 18
}

2.4 带赋值的规则

# 如果用户属于工程部门,则允许访问工程文档
allow {
    team := input.user.department
    doc_department := input.resource.department
    
    team == doc_department
}

三、集合操作

3.1 集合推导

# 提取所有活跃用户
active_users := [user | data.users[user_id].status == "active"; user := data.users[user_id]]
# 提取所有机密文档
confidential_docs := [doc |
    data.documents[doc_id]
    doc := data.documents[doc_id]
    doc.classification in ["confidential", "secret", "top_secret"]
]

3.2 集合 Membership

# 检查用户是否在允许列表中
allow {
    input.user.email in data.config.allowed_users
}

3.3 Some 声明

# 遍历并检查
allow {
    some i
    data.roles[i] == "admin"
    input.user.id == data.user_roles[i].user_id
}

3.4 Every 遍历

# 所有资源标签必须存在
valid_labels {
    every key := input.required_labels[_] {
        input.resource.labels[key]
    }
}

四、复合条件

4.1 否定

# 非管理员不能访问
deny[msg] {
    input.user.role != "admin"
    msg := "Only administrators can perform this action"
}

4.2 或条件

# 满足任一条件即可
allow {
    input.user.role == "admin"
}

allow {
    input.user.role == "editor"
    input.action == "read"
}

使用 | 表示或:

allow {
    input.user.role == "admin"
    input.user.role == "editor"
}

更好的写法:

allow {
    input.user.role == "admin"
}

allow {
    input.user.role == "editor"
    input.action == "read"
}

4.3 嵌套条件

# 复杂嵌套逻辑
allow {
    # 如果是机密资源
    input.resource.classification == "confidential"
    
    # 则必须满足以下任一条件
    {
        input.user.clearance_level >= 4
    } {
        input.user.department == input.resource.owner
        input.action == "read"
    }
}

五、内置函数

5.1 字符串函数

# 字符串前缀检查
startswith(s, prefix)

# 字符串包含
contains(s, substring)

# 字符串小写
lower(s)

# 字符串分割
split(s, separator)

# 正则匹配
re_match(pattern, s)
# 示例
allow {
    startswith(input.user.email, "admin@")
}

allow {
    contains(input.resource.name, "prod")
}

5.2 集合函数

# 集合交集
intersection(sets)

# 集合并集
union(sets)

# 集合差集
array.slice(a, start_index, stop_index)

5.3 聚合函数

# 计数
count(collection)

# 求和
sum(collection)

# 最大值
max(collection)

# 最小值
min(collection)
# 用户拥有多少个管理员角色
admin_role_count := count([role |
    data.user_roles[role_id].user_id == input.user.id
    data.roles[role_id].name == "admin"
])

# 如果超过 1 个管理员角色,拒绝
deny[msg] {
    admin_role_count > 1
    msg := "User has too many admin roles"
}

5.4 时间函数

# 当前时间(UTC)
time.now_ns()  # 纳秒时间戳

# 格式化时间
time.format(t, layout)
# 只允许工作时间内访问
working_hours {
    hour := to_number(time.clock(time.now_ns(), "America/New_York")[0])
    hour >= 9
    hour < 17
}

5.5 类型检查

# 类型检查
is_number(x)
is_string(x)
is_array(x)
is_object(x)
null(x)
# 输入必须包含有效用户
deny[msg] {
    not is_string(input.user.id)
    msg := "User ID must be a string"
}

六、虚拟文档

6.1 什么是虚拟文档

虚拟文档是 Rego 的核心特性,允许你定义「派生数据」:

# 定义虚拟文档
names["alice"] = true
names["bob"] = true

# 使用虚拟文档
allowed {
    names[input.user.name]
}

6.2 动态虚拟文档

# 基于外部数据生成虚拟文档
resource_permissions[resource_id] = permissions {
    resource := data.resources[resource_id]
    permissions := [perm |
        role_id := data.resource_roles[resource_id][_]
        role := data.roles[role_id]
        perm := role.permissions[_]
    ]
}

6.3 链式虚拟文档

# 第一层
managers[user_id] {
    user := data.users[user_id]
    user.title == "Manager"
}

# 第二层:经理的下属
subordinates[manager_id][subordinate_id] {
    manager := data.users[manager_id]
    subordinate := data.users[subordinate_id]
    subordinate.manager_id == manager_id
}

七、调试 Rego

7.1 print 语句

allow {
    print("Checking access for:", input.user.id)
    
    # ... 调试输出
    
    print("Result:", input.user.role == "admin")
    
    input.user.role == "admin"
}

7.2 trace 函数

deny[msg] {
    trace("Entering deny rule")
    
    # ... 
    
    trace("Exiting deny rule")
    msg := "Access denied"
}

7.3 解释模式

# 启用跟踪
opa eval --explain full --format pretty -d policy.rego "data.app.allow"

# 部分跟踪
opa eval --explain=notes -d policy.rego "data.app.allow"

八、常用模式

8.1 默认拒绝

package app.authz

default allow = false

allow {
    input.user.role == "admin"
}

allow {
    input.user.department == input.resource.department
    input.action == "read"
}

8.2 拒绝覆盖

# 定义多个拒绝条件
deny[msg] {
    not is_working_hours
    msg := "Access only allowed during working hours"
}

deny[msg] {
    input.user.clearance_level < input.resource.required_clearance
    msg := sprintf("User clearance (%d) insufficient for resource (requires %d)", 
        [input.user.clearance_level, input.resource.required_clearance])
}

# 允许:没有拒绝条件
allow {
    count(deny) == 0
}

8.3 资源层级

# 用户可以访问其部门或其父部门的资源
allow {
    user_dept := data.users[input.user.id].department
    resource_dept := input.resource.department
    
    # 直接匹配
    user_dept == resource_dept
} {
    # 或通过父部门
    parent_dept := data.departments[user_dept].parent
    parent_dept == resource_dept
}

8.4 条件组合

# 多条件组合判断
access_level = "full" {
    input.user.role == "admin"
    input.user.clearance >= 4
}

access_level = "limited" {
    input.user.role == "user"
    input.user.clearance >= 2
}

access_level = "none" {
    not access_level == "full"
    not access_level == "limited"
}

九、性能优化

9.1 避免重复计算

# 不好:每次条件都重新计算
deny[msg] {
    data.users[input.user.id].department == "engineering"
    data.users[input.user.id].clearance >= 3
    msg := "..."
}

# 好:使用变量缓存
deny[msg] {
    user := data.users[input.user.id]
    user.department == "engineering"
    user.clearance >= 3
    msg := "..."
}

9.2 限制搜索空间

# 不好:遍历所有用户
deny[msg] {
    data.users[user_id].suspicious == true
    user_id == input.user.id
}

# 好:直接定位
deny[msg] {
    user := data.users[input.user.id]
    user.suspicious == true
    msg := "User is flagged as suspicious"
}

9.3 早退出

# 将最可能为真的条件放前面
allow {
    # 简单检查放前面(快速失败)
    input.user.status == "active"
    
    # 复杂检查放后面
    resource := data.resources[input.resource.id]
    resource.owner == input.user.id
}

9.4 避免深度嵌套

# 不好:深层嵌套
deny[msg] {
    org := data.orgs[input.user.org_id]
    dept := org.departments[input.user.dept_id]
    team := dept.teams[input.user.team_id]
    team.name == "security"
    msg := "..."
}

# 好:扁平化 + 提前检查
deny[msg] {
    input.user.team_name == "security"
    msg := "Security team has restricted access"
}

十、测试 Rego

10.1 单元测试

authz_test.rego)
package app.authz

# 测试管理员总是被允许
test_allow_admin {
    allow with input as {
        "user": {"role": "admin", "id": "user-1"},
        "resource": {"type": "document"},
        "action": "delete"
    }
}

# 测试普通用户不能删除
test_deny_delete_for_non_admin {
    not allow with input as {
        "user": {"role": "user", "id": "user-2"},
        "resource": {"type": "document"},
        "action": "delete"
    }
}

# 测试用户可以读取自己的资源
test_allow_read_own_resource {
    allow with input as {
        "user": {"role": "user", "id": "user-2"},
        "resource": {"type": "document", "owner": "user-2"},
        "action": "read"
    }
}

10.2 运行测试

# 运行所有测试
opa test ./...

# 详细输出
opa test -v ./...

# 只运行特定包的测试
opa test -v ./policy/authz_test.rego
核心原则

编写 Rego 的黄金法则:先让它正确,再让它高效。声明式语言的性能优化往往来自重写逻辑,而非微调细节。

思考题

问题 1:Rego 的「声明式」特性意味着你描述结果而非过程。请分析以下两种 Rego 写法哪种更好,为什么?

# 写法 A
allow {
    data.roles[input.user.role].permissions[_] == input.action
}

# 写法 B
allow {
    some i
    role := data.roles[input.user.role]
    role.permissions[i] == input.action
}
参考答案

写法 B 更好

原因分析:

  1. 明确性:写法 B 明确引用了 role 对象,后续可以访问其他属性
  2. 可扩展性:如果需要检查权限的其他属性(如 scope),写法 B 更容易扩展
  3. 性能:写法 B 避免了 _ 通配符的直接比较,在大数据集上可能更快

改进后的最佳实践:

allow {
    role := data.roles[input.user.role]
    role.enabled == true
    role.permissions[_] == input.action
}

问题 2:在 Rego 中实现「职责分离」(SOD)策略,需要注意哪些问题?

参考答案

职责分离(SOD)的 Rego 实现要点:

1. 检查互斥角色

deny[msg] {
    # 不能同时持有这两个角色
    data.user_roles[input.user.id][_] == "approver"
    data.user_roles[input.user.id][_] == "executor"
    msg := "Separation of duties violation: cannot be both approver and executor"
}

2. 检查操作与角色组合

deny[msg] {
    # 如果是审批操作,检查用户是否也是执行者
    input.action == "approve"
    expense := data.expenses[input.resource.id]
    expense.submitted_by == input.user.id
    msg := "Cannot approve your own expense"
}

3. 注意事项

  • 使用 count() 聚合检查更高效
  • 考虑时间窗口(最近 N 小时内)
  • 支持豁免机制(emergency override)
  • 完整的审计日志记录