Rego 策略语言语法#
Rego 是 OPA 的策略语言,基于 Datalog 扩展而来。如果你是第一次看到 Rego 代码,可能会觉得它像伪代码又像 Python——确实,Rego 的设计目标之一就是让策略既可读又可执行。
但 Rego 的强大之处在于它的声明式特性:你只需要描述「什么是正确的」,而不需要描述「如何计算正确」。
#一、Rego 基础
#1.1 Hello World
最简单的策略)
package example
# 默认拒绝
default allow = false
# 如果用户是管理员,则允许
allow {
input.user.role == "admin"
}#1.2 核心概念
| 概念 | 说明 |
|---|---|
package | 命名空间,类似 Java 的 package |
import | 导入外部数据 |
rule | 规则,定义策略逻辑 |
default | 默认值 |
every | 集合遍历 |
#1.3 包声明
package app.authz
package kubernetes.admission
package terraform.sentinel.v2#二、规则结构
#2.1 规则语法
<variable> {
<body>
}等价于:
<variable> {
<condition1>
<condition2>
...
}#2.2 简单规则
# 允许管理员访问
allow {
input.user.role == "admin"
}#2.3 多条件规则
# 允许在工作时间访问机密文档
allow {
input.user.clearance_level >= 3
input.resource.classification == "confidential"
input.environment.hour >= 9
input.environment.hour <= 18
}#2.4 带赋值的规则
# 如果用户属于工程部门,则允许访问工程文档
allow {
team := input.user.department
doc_department := input.resource.department
team == doc_department
}#三、集合操作
#3.1 集合推导
# 提取所有活跃用户
active_users := [user | data.users[user_id].status == "active"; user := data.users[user_id]]# 提取所有机密文档
confidential_docs := [doc |
data.documents[doc_id]
doc := data.documents[doc_id]
doc.classification in ["confidential", "secret", "top_secret"]
]#3.2 集合 Membership
# 检查用户是否在允许列表中
allow {
input.user.email in data.config.allowed_users
}#3.3 Some 声明
# 遍历并检查
allow {
some i
data.roles[i] == "admin"
input.user.id == data.user_roles[i].user_id
}#3.4 Every 遍历
# 所有资源标签必须存在
valid_labels {
every key := input.required_labels[_] {
input.resource.labels[key]
}
}#四、复合条件
#4.1 否定
# 非管理员不能访问
deny[msg] {
input.user.role != "admin"
msg := "Only administrators can perform this action"
}#4.2 或条件
# 满足任一条件即可
allow {
input.user.role == "admin"
}
allow {
input.user.role == "editor"
input.action == "read"
}使用 | 表示或:
allow {
input.user.role == "admin"
input.user.role == "editor"
}更好的写法:
allow {
input.user.role == "admin"
}
allow {
input.user.role == "editor"
input.action == "read"
}#4.3 嵌套条件
# 复杂嵌套逻辑
allow {
# 如果是机密资源
input.resource.classification == "confidential"
# 则必须满足以下任一条件
{
input.user.clearance_level >= 4
} {
input.user.department == input.resource.owner
input.action == "read"
}
}#五、内置函数
#5.1 字符串函数
# 字符串前缀检查
startswith(s, prefix)
# 字符串包含
contains(s, substring)
# 字符串小写
lower(s)
# 字符串分割
split(s, separator)
# 正则匹配
re_match(pattern, s)# 示例
allow {
startswith(input.user.email, "admin@")
}
allow {
contains(input.resource.name, "prod")
}#5.2 集合函数
# 集合交集
intersection(sets)
# 集合并集
union(sets)
# 集合差集
array.slice(a, start_index, stop_index)#5.3 聚合函数
# 计数
count(collection)
# 求和
sum(collection)
# 最大值
max(collection)
# 最小值
min(collection)# 用户拥有多少个管理员角色
admin_role_count := count([role |
data.user_roles[role_id].user_id == input.user.id
data.roles[role_id].name == "admin"
])
# 如果超过 1 个管理员角色,拒绝
deny[msg] {
admin_role_count > 1
msg := "User has too many admin roles"
}#5.4 时间函数
# 当前时间(UTC)
time.now_ns() # 纳秒时间戳
# 格式化时间
time.format(t, layout)# 只允许工作时间内访问
working_hours {
hour := to_number(time.clock(time.now_ns(), "America/New_York")[0])
hour >= 9
hour < 17
}#5.5 类型检查
# 类型检查
is_number(x)
is_string(x)
is_array(x)
is_object(x)
null(x)# 输入必须包含有效用户
deny[msg] {
not is_string(input.user.id)
msg := "User ID must be a string"
}#六、虚拟文档
#6.1 什么是虚拟文档
虚拟文档是 Rego 的核心特性,允许你定义「派生数据」:
# 定义虚拟文档
names["alice"] = true
names["bob"] = true
# 使用虚拟文档
allowed {
names[input.user.name]
}#6.2 动态虚拟文档
# 基于外部数据生成虚拟文档
resource_permissions[resource_id] = permissions {
resource := data.resources[resource_id]
permissions := [perm |
role_id := data.resource_roles[resource_id][_]
role := data.roles[role_id]
perm := role.permissions[_]
]
}#6.3 链式虚拟文档
# 第一层
managers[user_id] {
user := data.users[user_id]
user.title == "Manager"
}
# 第二层:经理的下属
subordinates[manager_id][subordinate_id] {
manager := data.users[manager_id]
subordinate := data.users[subordinate_id]
subordinate.manager_id == manager_id
}#七、调试 Rego
#7.1 print 语句
allow {
print("Checking access for:", input.user.id)
# ... 调试输出
print("Result:", input.user.role == "admin")
input.user.role == "admin"
}#7.2 trace 函数
deny[msg] {
trace("Entering deny rule")
# ...
trace("Exiting deny rule")
msg := "Access denied"
}#7.3 解释模式
# 启用跟踪
opa eval --explain full --format pretty -d policy.rego "data.app.allow"
# 部分跟踪
opa eval --explain=notes -d policy.rego "data.app.allow"#八、常用模式
#8.1 默认拒绝
package app.authz
default allow = false
allow {
input.user.role == "admin"
}
allow {
input.user.department == input.resource.department
input.action == "read"
}#8.2 拒绝覆盖
# 定义多个拒绝条件
deny[msg] {
not is_working_hours
msg := "Access only allowed during working hours"
}
deny[msg] {
input.user.clearance_level < input.resource.required_clearance
msg := sprintf("User clearance (%d) insufficient for resource (requires %d)",
[input.user.clearance_level, input.resource.required_clearance])
}
# 允许:没有拒绝条件
allow {
count(deny) == 0
}#8.3 资源层级
# 用户可以访问其部门或其父部门的资源
allow {
user_dept := data.users[input.user.id].department
resource_dept := input.resource.department
# 直接匹配
user_dept == resource_dept
} {
# 或通过父部门
parent_dept := data.departments[user_dept].parent
parent_dept == resource_dept
}#8.4 条件组合
# 多条件组合判断
access_level = "full" {
input.user.role == "admin"
input.user.clearance >= 4
}
access_level = "limited" {
input.user.role == "user"
input.user.clearance >= 2
}
access_level = "none" {
not access_level == "full"
not access_level == "limited"
}#九、性能优化
#9.1 避免重复计算
# 不好:每次条件都重新计算
deny[msg] {
data.users[input.user.id].department == "engineering"
data.users[input.user.id].clearance >= 3
msg := "..."
}
# 好:使用变量缓存
deny[msg] {
user := data.users[input.user.id]
user.department == "engineering"
user.clearance >= 3
msg := "..."
}#9.2 限制搜索空间
# 不好:遍历所有用户
deny[msg] {
data.users[user_id].suspicious == true
user_id == input.user.id
}
# 好:直接定位
deny[msg] {
user := data.users[input.user.id]
user.suspicious == true
msg := "User is flagged as suspicious"
}#9.3 早退出
# 将最可能为真的条件放前面
allow {
# 简单检查放前面(快速失败)
input.user.status == "active"
# 复杂检查放后面
resource := data.resources[input.resource.id]
resource.owner == input.user.id
}#9.4 避免深度嵌套
# 不好:深层嵌套
deny[msg] {
org := data.orgs[input.user.org_id]
dept := org.departments[input.user.dept_id]
team := dept.teams[input.user.team_id]
team.name == "security"
msg := "..."
}
# 好:扁平化 + 提前检查
deny[msg] {
input.user.team_name == "security"
msg := "Security team has restricted access"
}#十、测试 Rego
#10.1 单元测试
authz_test.rego)
package app.authz
# 测试管理员总是被允许
test_allow_admin {
allow with input as {
"user": {"role": "admin", "id": "user-1"},
"resource": {"type": "document"},
"action": "delete"
}
}
# 测试普通用户不能删除
test_deny_delete_for_non_admin {
not allow with input as {
"user": {"role": "user", "id": "user-2"},
"resource": {"type": "document"},
"action": "delete"
}
}
# 测试用户可以读取自己的资源
test_allow_read_own_resource {
allow with input as {
"user": {"role": "user", "id": "user-2"},
"resource": {"type": "document", "owner": "user-2"},
"action": "read"
}
}#10.2 运行测试
# 运行所有测试
opa test ./...
# 详细输出
opa test -v ./...
# 只运行特定包的测试
opa test -v ./policy/authz_test.rego核心原则
编写 Rego 的黄金法则:先让它正确,再让它高效。声明式语言的性能优化往往来自重写逻辑,而非微调细节。
#思考题
问题 1:Rego 的「声明式」特性意味着你描述结果而非过程。请分析以下两种 Rego 写法哪种更好,为什么?
# 写法 A
allow {
data.roles[input.user.role].permissions[_] == input.action
}
# 写法 B
allow {
some i
role := data.roles[input.user.role]
role.permissions[i] == input.action
}参考答案
写法 B 更好。
原因分析:
- 明确性:写法 B 明确引用了
role对象,后续可以访问其他属性 - 可扩展性:如果需要检查权限的其他属性(如
scope),写法 B 更容易扩展 - 性能:写法 B 避免了
_通配符的直接比较,在大数据集上可能更快
改进后的最佳实践:
allow {
role := data.roles[input.user.role]
role.enabled == true
role.permissions[_] == input.action
}问题 2:在 Rego 中实现「职责分离」(SOD)策略,需要注意哪些问题?
参考答案
职责分离(SOD)的 Rego 实现要点:
1. 检查互斥角色
deny[msg] {
# 不能同时持有这两个角色
data.user_roles[input.user.id][_] == "approver"
data.user_roles[input.user.id][_] == "executor"
msg := "Separation of duties violation: cannot be both approver and executor"
}2. 检查操作与角色组合
deny[msg] {
# 如果是审批操作,检查用户是否也是执行者
input.action == "approve"
expense := data.expenses[input.resource.id]
expense.submitted_by == input.user.id
msg := "Cannot approve your own expense"
}3. 注意事项
- 使用
count()聚合检查更高效 - 考虑时间窗口(最近 N 小时内)
- 支持豁免机制(emergency override)
- 完整的审计日志记录