Policy Management API
Complete API reference for creating, managing, and inspecting authorization policies in the Taruvi Platform using Cerbos.
Overview
The Policy Management API provides endpoints for managing fine-grained authorization policies that control access to resources in your application. The API supports four types of policies:
- Resource Policies: Define what actions can be performed on specific resources
- Principal Policies: Define what a specific user/principal can do
- Role Policies: Define role-based permissions with inheritance
- Derived Role Policies: Define dynamic roles with conditions and variables
Base URLs
Standard (tenant from connection context):
/api/apps/{app_slug}/policies/
Site-scoped (explicit tenant override):
/site/{site_slug}/api/apps/{app_slug}/policies/
Site-scoped URLs allow you to explicitly specify the tenant context via the URL path. The SiteSwitchingMiddleware handles tenant switching based on the {site_slug} parameter. This is useful for:
- Cross-tenant operations (with proper permissions)
- Admin interfaces managing multiple tenants
- API clients that need explicit tenant control
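As a minimal sketch of how a client might choose between the two URL forms, the helper below builds the policies URL from an app slug and an optional site slug. The function name `policies_url` and the `base_url` argument are illustrative assumptions, not part of the platform.

```python
from typing import Optional


def policies_url(base_url: str, app_slug: str, site_slug: Optional[str] = None) -> str:
    """Return the policies collection URL for an app.

    When site_slug is given, the site-scoped form is used so the tenant is
    taken from the URL path instead of the connection context.
    """
    root = base_url.rstrip("/")
    if site_slug:
        return f"{root}/site/{site_slug}/api/apps/{app_slug}/policies/"
    return f"{root}/api/apps/{app_slug}/policies/"


# Standard form (tenant from connection context)
print(policies_url("http://localhost:8000", "crm"))
# Site-scoped form (explicit tenant override); "acme" is a made-up site slug
print(policies_url("http://localhost:8000", "crm", site_slug="acme"))
```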
Authentication
All endpoints require JWT authentication:
Authorization: Bearer <your_jwt_token>
To obtain a JWT token:
POST /api/cloud/auth/jwt/token/
Content-Type: application/json
{
"username": "user@example.com",
"password": "your_password"
}
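The two steps above (request a token, then send it as a Bearer header) could be prepared in Python roughly as follows. This is a sketch using only the standard library; it builds the request without sending it, because the shape of the token response is not described here. The helper names are hypothetical.

```python
import json
import urllib.request


def build_token_request(base_url: str, username: str, password: str) -> urllib.request.Request:
    """Prepare (but do not send) the POST request for the JWT token endpoint."""
    body = json.dumps({"username": username, "password": password}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/api/cloud/auth/jwt/token/",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def bearer_headers(token: str) -> dict:
    """Headers to attach to every Policy Management API call."""
    return {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
    }


request = build_token_request("http://localhost:8000", "user@example.com", "your_password")
print(request.get_method(), request.full_url)
print(bearer_headers("<your_jwt_token>"))
```

Sending the request (for example with `urllib.request.urlopen`) and reading the token out of the response is left to the caller, since the response field names are an unknown here.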
Multi-Tenant Isolation
All policy operations are automatically scoped to:
- The current tenant (from connection context or site_slug)
- The specified app (from URL path)
This ensures complete data isolation between tenants and apps.
Policy Management Endpoints (REST API)
1. Create or Update Policy
Create a new authorization policy or update an existing one. This endpoint is idempotent: Cerbos uses add_or_update internally, so the same request can safely be sent multiple times.
POST /api/apps/{app_slug}/policies/
POST /site/{site_slug}/api/apps/{app_slug}/policies/
Request Headers
Authorization: Bearer <jwt_token>
Content-Type: application/json
Request Body - Resource Policy
{
"policy_type": "resource",
"name": "sales_invoices",
"entity_type": "invoice",
"import_derived_roles": ["common_roles"],
"rules": [
{
"actions": ["read", "update"],
"effect": "EFFECT_ALLOW",
"roles": ["admin", "manager"]
},
{
"actions": ["delete"],
"effect": "EFFECT_DENY",
"roles": ["guest"]
},
{
"actions": ["read", "update"],
"effect": "EFFECT_ALLOW",
"derived_roles": ["owner"],
"condition": {
"match": {
"expr": "R.attr.status != 'archived'"
}
}
}
],
"metadata": {
"description": "Sales invoices access policy",
"tags": ["finance", "sales-team"]
}
}
Request Body - Principal Policy
{
"policy_type": "principal",
"name": "john_doe",
"rules": [
{
"resource": "invoice-sales_invoices",
"actions": [
{"action": "read", "effect": "EFFECT_ALLOW"},
{"action": "create", "effect": "EFFECT_ALLOW"},
{"action": "delete", "effect": "EFFECT_DENY"}
]
}
]
}
Request Body - Role Policy
{
"policy_type": "role",
"name": "admin",
"parent_roles": ["user"],
"rules": [
{
"resource": "invoice-sales_invoices",
"allowActions": ["create", "read", "update", "delete"]
}
]
}
Request Body - Derived Role Policy
{
"policy_type": "derived_role",
"name": "common_roles",
"definitions": [
{
"name": "owner",
"parentRoles": ["user"],
"condition": {
"match": {
"expr": "R.attr.owner_id == P.id"
}
}
},
{
"name": "manager",
"parentRoles": ["owner"],
"condition": {
"match": {
"expr": "P.attr.role == 'manager'"
}
}
}
],
"variables": {
"import": ["shared_variables"],
"local": {
"is_owner": "R.attr.owner_id == P.id"
}
},
"constants": {
"import": ["shared_constants"],
"local": {
"max_items": 100
}
}
}
Derived role names are automatically prefixed with {tenant_id}_{app_slug}_ for tenant isolation. When referencing them in import_derived_roles, use the unprefixed name (e.g., "common_roles" not "public_crm_common_roles").
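The prefixing rule can be illustrated with a small sketch. The function names are hypothetical and only mirror the naming convention described above; the actual prefixing happens server-side.

```python
def scope_for(tenant_id: str, app_slug: str) -> str:
    """Scope string shared by all policies of one tenant/app pair."""
    return f"{tenant_id}_{app_slug}"


def stored_derived_role_name(tenant_id: str, app_slug: str, name: str) -> str:
    """Name under which a derived role policy ends up stored."""
    return f"{scope_for(tenant_id, app_slug)}_{name}"


def unprefixed_name(tenant_id: str, app_slug: str, stored_name: str) -> str:
    """Recover the name to use in import_derived_roles from a stored name."""
    prefix = f"{scope_for(tenant_id, app_slug)}_"
    if stored_name.startswith(prefix):
        return stored_name[len(prefix):]
    return stored_name


print(stored_derived_role_name("public", "crm", "common_roles"))
print(unprefixed_name("public", "crm", "public_crm_common_roles"))
```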
Request Parameters
| Field | Type | Required | Description |
|---|---|---|---|
| policy_type | string | No (default: "resource") | Type: "resource", "principal", "role", or "derived_role" |
| name | string | Yes | Policy name (alphanumeric + underscore/hyphen, max 200 chars) |
| entity_type | string | Required for resource policies | Resource type (e.g., "invoice", "project") |
| parent_roles | string[] | No (role policies only) | Parent roles for inheritance |
| import_derived_roles | string[] | No (resource policies only) | Derived roles to import (unprefixed names) |
| rules | array | Yes (except derived_role) | Policy rules (min 1, max 50) |
| definitions | array | Yes for derived_role | Role definitions (derived_role only) |
| variables | object | No (derived_role only) | Policy variables with import/local structure |
| constants | object | No (derived_role only) | Policy constants with import/local structure |
| metadata | object | No | Optional metadata |
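A client can catch most malformed payloads before calling the endpoint. The sketch below encodes only the constraints listed in the table above; it is an illustration, not the server's validation logic, and the function name is an assumption.

```python
import re

# Alphanumeric plus underscore/hyphen, 1 to 200 characters (from the table above)
NAME_PATTERN = re.compile(r"[A-Za-z0-9_-]{1,200}")
POLICY_TYPES = {"resource", "principal", "role", "derived_role"}


def validate_policy_payload(payload: dict) -> list:
    """Return a list of problems; an empty list means the payload looks valid."""
    problems = []
    policy_type = payload.get("policy_type", "resource")  # documented default
    if policy_type not in POLICY_TYPES:
        problems.append(f"unknown policy_type: {policy_type!r}")

    name = payload.get("name")
    if not isinstance(name, str) or not NAME_PATTERN.fullmatch(name):
        problems.append("name must be 1-200 chars of letters, digits, '_' or '-'")

    if policy_type == "resource" and not payload.get("entity_type"):
        problems.append("entity_type is required for resource policies")

    if policy_type == "derived_role":
        if not payload.get("definitions"):
            problems.append("definitions is required for derived_role policies")
    else:
        rules = payload.get("rules") or []
        if not 1 <= len(rules) <= 50:
            problems.append("rules must contain between 1 and 50 entries")
    return problems


print(validate_policy_payload({"name": "bad name!", "rules": []}))
```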
Success Response (201 Created)
{
"success": true,
"message": "Policy created successfully",
"status_code": 201
}
cURL Example
curl -X POST "http://localhost:8000/api/apps/crm/policies/" \
-H "Authorization: Bearer YOUR_JWT_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"policy_type": "resource",
"name": "sales_invoices",
"entity_type": "invoice",
"rules": [
{
"actions": ["read", "update"],
"effect": "EFFECT_ALLOW",
"roles": ["admin"]
}
]
}'
2. List Policies
Retrieve all policies for the current app and tenant with optional filtering.
GET /api/apps/{app_slug}/policies/
GET /site/{site_slug}/api/apps/{app_slug}/policies/
Query Parameters
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| name_regexp | string | No | .* | Regex filter for policy names |
| scope_regexp | string | No | Auto-filtered by tenant | Regex filter for scope |
| version_regexp | string | No | None | Regex filter for version |
| include_disabled | boolean | No | false | Include disabled policies |
Success Response (200 OK)
{
"success": true,
"message": "Policies retrieved successfully",
"status_code": 200,
"data": [
{
"apiVersion": "api.cerbos.dev/v1",
"resourcePolicy": {
"resource": "invoice-sales_invoices",
"version": "default",
"scope": "public_crm",
"rules": [...]
}
}
],
"total": 1
}
cURL Example
curl -X GET "http://localhost:8000/api/apps/crm/policies/?name_regexp=invoice" \
-H "Authorization: Bearer YOUR_JWT_TOKEN"
3. Retrieve Policy
Get a specific policy by its full Cerbos policy ID.
GET /api/apps/{app_slug}/policies/?id={policy_id}
GET /site/{site_slug}/api/apps/{app_slug}/policies/?id={policy_id}
Query Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| id | string | Yes | Full policy ID in Cerbos internal format |
Policy ID Formats
Use the policy ID exactly as returned from the /inspect endpoint:
| Policy Type | Format | Example |
|---|---|---|
| Resource | resource.{name}.{version}/{scope} | resource.invoice-sales_invoices.default/public_crm |
| Role | role.{name}/{scope} | role.admin/public_crm |
| Principal | principal.{name}.{version}/{scope} | principal.john_doe.default/public_crm |
| Derived Role | derived_roles.{name} | derived_roles.public_crm_common_roles |
Success Response (200 OK)
{
"success": true,
"message": "Policy retrieved successfully",
"status_code": 200,
"data": {
"apiVersion": "api.cerbos.dev/v1",
"resourcePolicy": {
"resource": "invoice-sales_invoices",
"version": "default",
"scope": "public_crm",
"rules": [...]
}
}
}
cURL Example
curl -X GET "http://localhost:8000/api/apps/crm/policies/?id=resource.invoice-sales_invoices.default/public_crm" \
-H "Authorization: Bearer YOUR_JWT_TOKEN"
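The ID formats in the table above can be assembled programmatically. The following sketch builds an ID for each policy type and a retrieval URL; the helper names are hypothetical. Percent-encoding the `/` in the ID is a conservative choice made here, since the cURL example passes the ID unencoded.

```python
from typing import Optional
from urllib.parse import quote


def build_policy_id(
    policy_type: str,
    name: str,
    scope: str,
    version: str = "default",
    entity_type: Optional[str] = None,
) -> str:
    """Assemble a policy ID following the documented formats."""
    if policy_type == "resource":
        if not entity_type:
            raise ValueError("entity_type is required for resource policies")
        return f"resource.{entity_type}-{name}.{version}/{scope}"
    if policy_type == "principal":
        return f"principal.{name}.{version}/{scope}"
    if policy_type == "role":
        return f"role.{name}/{scope}"
    if policy_type == "derived_role":
        # Derived roles carry the scope as a name prefix instead of a suffix
        return f"derived_roles.{scope}_{name}"
    raise ValueError(f"unknown policy_type: {policy_type!r}")


def retrieve_url(base_url: str, app_slug: str, policy_id: str) -> str:
    """URL for fetching one policy by ID, with the ID percent-encoded."""
    encoded = quote(policy_id, safe="")
    return f"{base_url.rstrip('/')}/api/apps/{app_slug}/policies/?id={encoded}"


pid = build_policy_id("resource", "sales_invoices", "public_crm", entity_type="invoice")
print(pid)
print(retrieve_url("http://localhost:8000", "crm", pid))
```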
4. Delete Policy
Delete (soft-delete/disable) a policy by its full Cerbos policy ID.
DELETE /api/apps/{app_slug}/policies/?id={policy_id}
DELETE /site/{site_slug}/api/apps/{app_slug}/policies/?id={policy_id}
Query Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| id | string | Yes | Full policy ID in Cerbos internal format |
Success Response (200 OK)
{
"success": true,
"message": "Policy deleted successfully",
"status_code": 200
}
Cerbos uses soft delete - policies are disabled, not permanently removed, to maintain audit history.
cURL Example
curl -X DELETE "http://localhost:8000/api/apps/crm/policies/?id=resource.invoice-sales_invoices.default/public_crm" \
-H "Authorization: Bearer YOUR_JWT_TOKEN"
5. List Derived Roles
List derived role policies for the current tenant and app.
GET /api/apps/{app_slug}/policies/derived-roles/
GET /site/{site_slug}/api/apps/{app_slug}/policies/derived-roles/
Query Parameters
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| name_regexp | string | No | .* | Additional regex filter for role names |
| include_disabled | boolean | No | false | Include disabled policies |
Success Response (200 OK)
{
"success": true,
"message": "Derived roles retrieved successfully",
"status_code": 200,
"data": [
{
"apiVersion": "api.cerbos.dev/v1",
"derivedRoles": {
"name": "public_crm_common_roles",
"definitions": [...]
}
}
],
"total": 1
}
cURL Example
curl -X GET "http://localhost:8000/api/apps/crm/policies/derived-roles/" \
-H "Authorization: Bearer YOUR_JWT_TOKEN"
6. Inspect Policies
Inspect policies to retrieve detailed metadata including actions, variables, attributes, derived roles, and constants.
GET /api/apps/{app_slug}/policies/inspect/
GET /site/{site_slug}/api/apps/{app_slug}/policies/inspect/
Query Parameters
| Parameter | Type | Required | Default | Description |
|---|---|---|---|---|
| policy_id | string (repeatable) | No | None | Specific policy IDs to inspect |
| name_regexp | string | No | .* | Regex filter for policy names |
| scope_regexp | string | No | Auto-filtered by tenant | Regex filter for scope |
| version_regexp | string | No | None | Regex filter for version |
| include_disabled | boolean | No | false | Include disabled policies |
Success Response (200 OK)
{
"success": true,
"message": "Policies inspected successfully",
"status_code": 200,
"data": {
"results": {
"resource.invoice-sales_invoices.default/public_crm": {
"policyId": "resource.invoice-sales_invoices.default/public_crm",
"actions": ["read", "create", "update", "delete"],
"derivedRoles": [
{"name": "owner", "kind": "IMPORTED", "source": "common_roles"}
],
"variables": [
{"name": "is_owner", "kind": "LOCAL", "value": "R.attr.owner_id == P.id"}
],
"attributes": [
{"name": "tenant_id", "kind": "PRINCIPAL_ATTRIBUTE"},
{"name": "status", "kind": "RESOURCE_ATTRIBUTE"}
]
}
}
},
"total": 1
}
cURL Example
curl -X GET "http://localhost:8000/api/apps/crm/policies/inspect/?name_regexp=invoice" \
-H "Authorization: Bearer YOUR_JWT_TOKEN"
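Inspect responses are keyed by policy ID, which makes them easy to condense. As a sketch, assuming the response shape shown above, the helper below (a hypothetical name) extracts the actions, imported derived roles, and attribute names per policy.

```python
def summarize_inspection(response: dict) -> dict:
    """Condense an inspect response into a per-policy summary."""
    results = response.get("data", {}).get("results", {})
    summary = {}
    for policy_id, details in results.items():
        attributes = details.get("attributes", [])
        summary[policy_id] = {
            "actions": sorted(details.get("actions", [])),
            "derived_roles": [d["name"] for d in details.get("derivedRoles", [])],
            "principal_attrs": [
                a["name"] for a in attributes if a.get("kind") == "PRINCIPAL_ATTRIBUTE"
            ],
            "resource_attrs": [
                a["name"] for a in attributes if a.get("kind") == "RESOURCE_ATTRIBUTE"
            ],
        }
    return summary


# Trimmed copy of the sample response above
SAMPLE = {
    "data": {
        "results": {
            "resource.invoice-sales_invoices.default/public_crm": {
                "actions": ["read", "create", "update", "delete"],
                "derivedRoles": [{"name": "owner", "kind": "IMPORTED", "source": "common_roles"}],
                "attributes": [
                    {"name": "tenant_id", "kind": "PRINCIPAL_ATTRIBUTE"},
                    {"name": "status", "kind": "RESOURCE_ATTRIBUTE"},
                ],
            }
        }
    }
}
print(summarize_inspection(SAMPLE))
```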
Authorization Endpoints (Django Ninja API)
These endpoints are used for runtime authorization checks. They use the Cerbos gRPC and HTTP APIs.
1. Check Resources (gRPC)
Check if a principal has permissions to perform actions on resources.
POST /api/apps/{app_slug}/check/resources
POST /site/{site_slug}/api/apps/{app_slug}/check/resources
Request Body
{
"principal": {
"id": "user_123",
"roles": ["admin"],
"attr": {"department": "sales"}
},
"resources": [
{
"resource": {
"kind": "invoice-sales_invoices",
"id": "inv_001",
"attr": {"owner_id": "user_456"}
},
"actions": ["read", "update", "delete"]
}
]
}
Response
{
"requestId": "test",
"results": [
{
"resource": {
"id": "inv_001",
"kind": "invoice-sales_invoices",
"policyVersion": "default",
"scope": "public_crm"
},
"actions": {
"read": "EFFECT_ALLOW",
"update": "EFFECT_ALLOW",
"delete": "EFFECT_DENY"
},
"meta": {
"effectiveDerivedRoles": ["owner"]
}
}
],
"cerbosCallId": "01HHENANTHFD5DV3HZGDKB87PJ"
}
cURL Example
curl -X POST "http://localhost:8000/api/apps/crm/check/resources" \
-H "Authorization: Bearer YOUR_JWT_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"principal": {"id": "user_123", "roles": ["admin"]},
"resources": [
{
"resource": {"kind": "invoice-sales_invoices", "id": "inv_001"},
"actions": ["read", "update"]
}
]
}'
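Callers usually want a simple yes/no per action rather than the full response. A minimal sketch, assuming the response shape shown above, is a helper that returns the set of allowed actions for one resource ID. The function name is illustrative.

```python
def allowed_actions(response: dict, resource_id: str) -> set:
    """Return the actions with EFFECT_ALLOW for the given resource ID."""
    for result in response.get("results", []):
        if result.get("resource", {}).get("id") == resource_id:
            return {
                action
                for action, effect in result.get("actions", {}).items()
                if effect == "EFFECT_ALLOW"
            }
    # Unknown resource: treat as nothing allowed
    return set()


# Trimmed copy of the sample response above
SAMPLE = {
    "results": [
        {
            "resource": {"id": "inv_001", "kind": "invoice-sales_invoices"},
            "actions": {
                "read": "EFFECT_ALLOW",
                "update": "EFFECT_ALLOW",
                "delete": "EFFECT_DENY",
            },
        }
    ]
}
print(sorted(allowed_actions(SAMPLE, "inv_001")))
```

Returning an empty set for an unknown resource keeps the helper deny-by-default.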
2. Plan Resources (gRPC)
Get a query plan from Cerbos for row-level filtering. Returns filter conditions that can be applied to database queries.
POST /api/apps/{app_slug}/plan/resources
POST /site/{site_slug}/api/apps/{app_slug}/plan/resources
Request Body
{
"principal": {
"id": "user_123",
"roles": ["viewer"],
"attr": {"department": "sales"}
},
"resource": {
"kind": "invoice-sales_invoices",
"policy_version": "default"
},
"action": "read"
}
Response
{
"filter_kind": "CONDITIONAL",
"condition": {
"expression": {
"operator": "eq",
"operands": [
{"variable": "request.resource.attr.owner_id"},
{"value": "user_123"}
]
}
}
}
Filter Kinds:
- ALWAYS_ALLOWED: No filtering needed, user has full access
- CONDITIONAL: Apply the returned condition to filter results
- ALWAYS_DENIED: User has no access to any resources
cURL Example
curl -X POST "http://localhost:8000/api/apps/crm/plan/resources" \
-H "Authorization: Bearer YOUR_JWT_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"principal": {"id": "user_123", "roles": ["viewer"]},
"resource": {"kind": "invoice-sales_invoices"},
"action": "read"
}'
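To show how the three filter kinds might be consumed, the sketch below turns a plan response into an in-memory predicate over dictionaries. It is an illustration only: it handles the `eq` operator from the sample response plus `ne`, `and`, and `or`, whose names are assumptions here. A real integration would more likely translate the condition into a database query.

```python
from typing import Any, Callable, Dict

Row = Dict[str, Any]
ATTR_PREFIX = "request.resource.attr."


def _operand_value(operand: dict, row: Row) -> Any:
    """Resolve one operand: a resource attribute, a literal, or a nested expression."""
    if "variable" in operand:
        name = operand["variable"]
        if name.startswith(ATTR_PREFIX):
            name = name[len(ATTR_PREFIX):]
        return row.get(name)
    if "value" in operand:
        return operand["value"]
    if "expression" in operand:
        return _evaluate(operand["expression"], row)
    raise ValueError(f"unsupported operand: {operand!r}")


def _evaluate(expression: dict, row: Row) -> Any:
    operator = expression["operator"]
    values = [_operand_value(o, row) for o in expression["operands"]]
    if operator == "eq":
        return values[0] == values[1]
    if operator == "ne":
        return values[0] != values[1]
    if operator == "and":
        return all(values)
    if operator == "or":
        return any(values)
    raise NotImplementedError(f"operator not handled in this sketch: {operator}")


def plan_to_predicate(plan: dict) -> Callable[[Row], bool]:
    """Map a plan response to a function that accepts or rejects a row."""
    kind = plan["filter_kind"]
    if kind == "ALWAYS_ALLOWED":
        return lambda row: True
    if kind == "ALWAYS_DENIED":
        return lambda row: False
    if kind == "CONDITIONAL":
        expression = plan["condition"]["expression"]
        return lambda row: bool(_evaluate(expression, row))
    raise ValueError(f"unknown filter_kind: {kind!r}")


PLAN = {
    "filter_kind": "CONDITIONAL",
    "condition": {
        "expression": {
            "operator": "eq",
            "operands": [
                {"variable": "request.resource.attr.owner_id"},
                {"value": "user_123"},
            ],
        }
    },
}
rows = [{"id": 1, "owner_id": "user_123"}, {"id": 2, "owner_id": "user_456"}]
keep = plan_to_predicate(PLAN)
print([row["id"] for row in rows if keep(row)])
```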
3. AuthZEN Access Evaluation (HTTP Proxy)
Single access evaluation using the AuthZEN standard format. This endpoint proxies directly to Cerbos's native AuthZEN HTTP endpoint, only injecting the tenant scope.
POST /api/apps/{app_slug}/access/v1/evaluation
POST /site/{site_slug}/api/apps/{app_slug}/access/v1/evaluation
Request Body (AuthZEN Standard)
{
"subject": {
"type": "user",
"id": "user_123",
"properties": {
"cerbos.roles": ["admin"],
"department": "sales"
}
},
"resource": {
"type": "invoice-sales_invoices",
"id": "inv_001",
"properties": {
"owner_id": "user_456"
}
},
"action": {
"name": "read"
}
}
Response (from Cerbos)
{
"decision": true,
"context": {
"id": "01HHENANTHFD5DV3HZGDKB87PJ",
"reason_admin": {
"effectiveDerivedRoles": ["owner"]
}
}
}
cURL Example
curl -X POST "http://localhost:8000/api/apps/crm/access/v1/evaluation" \
-H "Authorization: Bearer YOUR_JWT_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"subject": {
"type": "user",
"id": "user_123",
"properties": {"cerbos.roles": ["admin"]}
},
"resource": {
"type": "invoice-sales_invoices",
"id": "inv_001"
},
"action": {"name": "read"}
}'
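The AuthZEN body carries the same information as the Check Resources body, with roles moved into the `cerbos.roles` subject property. As a sketch, a converter between the two shapes might look like this. The function name is hypothetical, and the fixed `"type": "user"` mirrors the example above rather than a documented requirement.

```python
def to_authzen_request(principal: dict, resource: dict, action: str) -> dict:
    """Convert Check Resources style inputs into an AuthZEN evaluation body."""
    subject_properties = dict(principal.get("attr", {}))
    # Roles travel as a namespaced property in the AuthZEN format
    subject_properties["cerbos.roles"] = list(principal.get("roles", []))
    return {
        "subject": {
            "type": "user",
            "id": principal["id"],
            "properties": subject_properties,
        },
        "resource": {
            "type": resource["kind"],
            "id": resource["id"],
            "properties": dict(resource.get("attr", {})),
        },
        "action": {"name": action},
    }


body = to_authzen_request(
    principal={"id": "user_123", "roles": ["admin"], "attr": {"department": "sales"}},
    resource={"kind": "invoice-sales_invoices", "id": "inv_001", "attr": {"owner_id": "user_456"}},
    action="read",
)
print(body)
```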
4. AuthZEN Batch Evaluation (HTTP Proxy)
Batch access evaluation for multiple actions/resources. Proxies directly to Cerbos's native AuthZEN HTTP endpoint.
POST /api/apps/{app_slug}/access/v1/evaluations
POST /site/{site_slug}/api/apps/{app_slug}/access/v1/evaluations
Request Body (Cerbos AuthZEN Batch)
{
"subject": {
"type": "user",
"id": "user_123",
"properties": {
"cerbos.policyVersion": "default",
"cerbos.roles": ["admin"]
}
},
"resource": {
"type": "invoice-sales_invoices",
"id": "inv_001",
"properties": {}
},
"context": {
"cerbos.requestId": "batch_001",
"cerbos.includeMeta": true
},
"evaluations": [
{"action": {"name": "read"}},
{"action": {"name": "update"}},
{
"resource": {"type": "invoice-sales_invoices", "id": "inv_002"},
"action": {"name": "delete"}
}
],
"options": {
"evaluations_semantic": "execute_all"
}
}
Response (from Cerbos)
{
"evaluations": [
{"decision": true, "context": {...}},
{"decision": true, "context": {...}},
{"decision": false, "context": {...}}
]
}
cURL Example
curl -X POST "http://localhost:8000/api/apps/crm/access/v1/evaluations" \
-H "Authorization: Bearer YOUR_JWT_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"subject": {
"type": "user",
"id": "user_123",
"properties": {"cerbos.roles": ["admin"]}
},
"resource": {"type": "invoice-sales_invoices", "id": "inv_001"},
"evaluations": [
{"action": {"name": "read"}},
{"action": {"name": "update"}}
]
}'
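Because the batch response is a plain list of decisions, the caller has to match each decision back to the evaluation that produced it. The sketch below assumes decisions come back in request order and that top-level `resource` and `action` act as defaults for entries that omit them. The helper name is illustrative.

```python
def pair_decisions(request_body: dict, response_body: dict) -> list:
    """Return (resource_id, action_name, decision) tuples in request order."""
    default_resource = request_body.get("resource", {})
    default_action = request_body.get("action", {})
    evaluations = request_body.get("evaluations", [])
    decisions = response_body.get("evaluations", [])
    if len(evaluations) != len(decisions):
        raise ValueError("request and response have different evaluation counts")

    paired = []
    for item, outcome in zip(evaluations, decisions):
        resource = item.get("resource", default_resource)
        action = item.get("action", default_action)
        paired.append((resource.get("id"), action.get("name"), bool(outcome.get("decision"))))
    return paired


REQUEST = {
    "resource": {"type": "invoice-sales_invoices", "id": "inv_001"},
    "evaluations": [
        {"action": {"name": "read"}},
        {"action": {"name": "update"}},
        {
            "resource": {"type": "invoice-sales_invoices", "id": "inv_002"},
            "action": {"name": "delete"},
        },
    ],
}
RESPONSE = {"evaluations": [{"decision": True}, {"decision": True}, {"decision": False}]}
for row in pair_decisions(REQUEST, RESPONSE):
    print(row)
```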
Using Policy Management in Internal Functions
This section explains how to use policy management functions directly within your internal code (views, signals, services, Celery tasks).
Architecture Overview
cloud_site/policy/views.py (PolicyViewSet - REST API)
↓
cloud_site/policy/services/policy_service.py (PolicyService)
↓
core/authorization/services/cerbos_service.py (CerbosService)
↓
core/authorization/clients/cerbos_admin_client.py (CerbosAdminClient)
↓
Cerbos SDK → Cerbos Server
For internal use, interact with the PolicyService layer.
Step 1: Import Required Components
from core.authorization.context import AuthorizationContext, build_context_from_request
from cloud_site.policy.services import get_policy_service
Step 2: Build Authorization Context
Option A: From HTTP Request (Views, API Endpoints)
from core.authorization.context import build_context_from_request
def my_view(request):
    # Automatically extracts tenant_id, app_slug, user_id, roles
    context = build_context_from_request(request)
Option B: Manual Construction (Signals, Celery Tasks, Management Commands)
from django.db import connection
from core.authorization.context import AuthorizationContext
# From current tenant context
context = AuthorizationContext(
tenant_id=connection.tenant.schema_name,
app_slug="my_app",
user_id="user_123",
roles=("admin", "user"),
)
# For system/background tasks
context = AuthorizationContext(
tenant_id="public",
app_slug="crm",
user_id="system",
roles=("admin",),
)
Step 3: Get Policy Service Instance
from cloud_site.policy.services import get_policy_service
policy_service = get_policy_service()
Step 4: Create or Update Policies
The create_or_update_policy() method follows one of three scenarios, depending on the policy type and on whether rules are provided:
Scenario 1: No Rules + Resource/Role Type
Creates unrestricted policies at BOTH default and scoped levels:
# No rules provided - creates unrestricted at both levels
policy_data = {
"policy_type": "resource",
"entity_type": "datatable",
"name": "users"
# No rules - will create unrestricted policies
}
result = policy_service.create_or_update_policy(
policy_data=policy_data,
context=context,
)
# Creates TWO policies:
# 1. Default level (scope=None): resource.datatable-users.default (DENY all)
# 2. Scoped level: resource.datatable-users.default/public_crm (ALLOW all)
Scenario 2: Rules + Resource/Role Type
Creates an unrestricted base policy at the default level plus a custom policy at the scoped level:
# Rules provided - creates unrestricted base + scoped with custom rules
policy_data = {
"policy_type": "resource",
"entity_type": "invoice",
"name": "sales_invoices",
"rules": [
{
"actions": ["read", "create", "update"],
"effect": "EFFECT_ALLOW",
"roles": ["admin", "manager"]
},
{
"actions": ["read"],
"effect": "EFFECT_ALLOW",
"derived_roles": ["owner"]
}
],
"metadata": {
"description": "Sales invoices access control"
}
}
result = policy_service.create_or_update_policy(
policy_data=policy_data,
context=context,
)
# Creates TWO policies:
# 1. Default level (scope=None): resource.invoice-sales_invoices.default (DENY all - unrestricted)
# 2. Scoped level: resource.invoice-sales_invoices.default/public_crm (your custom rules)
print(f"Created policy: {result['policy_id']}")
# Output: Created policy: resource.invoice-sales_invoices.default/public_crm
Scenario 3: Principal/Derived Role Type
Creates only a scoped policy, containing your custom rules:
# Principal or Derived Role - creates scoped policy only
policy_data = {
"policy_type": "principal",
"name": "user_123",
"rules": [
{
"resource": "invoice:*",
"actions": [
{
"action": "read",
"effect": "EFFECT_ALLOW"
}
]
}
]
}
result = policy_service.create_or_update_policy(
policy_data=policy_data,
context=context,
)
# Creates ONE policy:
# - Scoped level only: principal.user_123.default/public_crm (custom rules)
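The three scenarios can be summarized as a small decision function. This is a sketch of the behavior described above, not the service's implementation. The function name and the returned labels are made up for illustration.

```python
def planned_policies(policy_data: dict) -> list:
    """Predict which policy levels a create_or_update_policy() call produces.

    Returns a list of (level, content) tuples, where level is "default" or
    "scoped" and content describes what the policy at that level holds.
    """
    policy_type = policy_data.get("policy_type", "resource")
    has_rules = bool(policy_data.get("rules"))

    if policy_type in ("resource", "role"):
        # Scenarios 1 and 2: a base policy at the default level is always created
        scoped_content = "custom rules" if has_rules else "unrestricted"
        return [("default", "unrestricted"), ("scoped", scoped_content)]

    # Scenario 3: principal and derived_role policies exist at the scoped level only
    return [("scoped", "custom rules")]


print(planned_policies({"policy_type": "resource", "entity_type": "datatable", "name": "users"}))
print(planned_policies({"policy_type": "principal", "name": "user_123", "rules": [{}]}))
```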
Step 5: Delete Policies
Using delete_policy() for Custom Policies
# Use full Cerbos policy ID (get from inspect API or create result)
policy_id = "resource.invoice-sales_invoices.default/public_crm"
policy_service.delete_policy(
policy_id=policy_id,
context=context,
)
Using delete_system_policy() for Unrestricted Policies
Use delete_system_policy() to delete system policies that were created without rules (unrestricted policies). This method deletes BOTH default and scoped levels.
Method Signature:
def delete_system_policy(
    self,
    policy_type: str,  # "resource" or "role"
    name: str,  # Policy name
    entity_type: str | None,  # Required for resource, None for role
    context: AuthorizationContext
) -> dict[str, Any]
When to Use:
- Use delete_system_policy(): For policies created without rules (Scenario 1)
- Use delete_policy(): For policies created with custom rules (Scenarios 2 and 3)
Example 1: Delete Resource System Policy
from cloud_site.policy.services import get_policy_service
from core.authorization.context import AuthorizationContext
policy_service = get_policy_service()
context = AuthorizationContext(
tenant_id="public",
app_slug="crm",
user_id="admin",
roles=("admin",)
)
# Delete resource system policy (deletes both default and scoped)
result = policy_service.delete_system_policy(
policy_type="resource",
entity_type="datatable",
name="users",
context=context
)
# Deletes TWO policies:
# 1. resource.datatable-users.default (default level)
# 2. resource.datatable-users.default/public_crm (scoped level)
print(result)
# Output: {"deleted_policies": [...], "errors": []}
Example 2: Delete Role System Policy
# Delete role system policy (deletes both default and scoped)
result = policy_service.delete_system_policy(
policy_type="role",
name="admin",
entity_type=None, # Not required for role policies
context=context
)
# Deletes TWO policies:
# 1. role.admin (default level)
# 2. role.admin/public_crm (scoped level)
Complete Examples
Example 1: Creating Policy in a Django View
from django.http import JsonResponse
from core.authorization.context import build_context_from_request
from cloud_site.policy.services import get_policy_service
from base.responses import AppDataResponse
def create_invoice_policy(request):
    try:
        context = build_context_from_request(request)
        policy_data = {
            "policy_type": "resource",
            "entity_type": "invoice",
            "name": "sales_invoices",
            "rules": [
                {
                    "actions": ["*"],
                    "effect": "EFFECT_ALLOW",
                    "roles": ["admin"]
                },
                {
                    "actions": ["read", "update"],
                    "effect": "EFFECT_ALLOW",
                    "roles": ["manager"],
                    "condition": {
                        "match": {
                            "expr": "R.attr.department == P.attr.department"
                        }
                    }
                }
            ]
        }
        policy_service = get_policy_service()
        result = policy_service.create_or_update_policy(
            policy_data=policy_data,
            context=context,
        )
        return AppDataResponse.created(
            data={"policy_id": result["policy_id"]},
            message="Policy created successfully"
        )
    except Exception as e:
        return AppDataResponse.error(
            error_key="UNKNOWN_ERROR",
            message=f"Failed to create policy: {str(e)}"
        )
Example 2: Auto-Create Policy in Django Signals
from django.db.models.signals import post_save
from django.dispatch import receiver
from django.db import connection
from core.authorization.context import AuthorizationContext
from cloud_site.policy.services import get_policy_service
from cloud_site.data.models import DataTable
@receiver(post_save, sender=DataTable)
def create_datatable_policy(sender, instance, created, **kwargs):
    if not created:
        return
    try:
        context = AuthorizationContext(
            tenant_id=connection.tenant.schema_name,
            app_slug=instance.app.slug,
            user_id="system",
            roles=("admin",),
        )
        policy_data = {
            "policy_type": "resource",
            "entity_type": "datatable",
            "name": instance.name,
            "rules": [
                {"actions": ["*"], "effect": "EFFECT_ALLOW", "roles": ["admin"]},
                {"actions": ["read"], "effect": "EFFECT_ALLOW", "roles": ["viewer"]}
            ]
        }
        policy_service = get_policy_service()
        result = policy_service.create_or_update_policy(policy_data, context)
        print(f"✅ Created policy: {result['policy_id']}")
    except Exception as e:
        print(f"❌ Failed to create policy: {e}")
Example 3: Creating Derived Role Policies
from core.authorization.context import build_context_from_request
from cloud_site.policy.services import get_policy_service
def setup_ownership_roles(request):
    context = build_context_from_request(request)
    policy_service = get_policy_service()
    # Step 1: Create derived role policy
    derived_role_data = {
        "policy_type": "derived_role",
        "name": "ownership_roles",  # Will be prefixed with tenant_id_app_slug_
        "definitions": [
            {
                "name": "owner",
                "parentRoles": ["user"],
                "condition": {"match": {"expr": "R.attr.owner_id == P.id"}}
            },
            {
                "name": "collaborator",
                "parentRoles": ["user"],
                "condition": {"match": {"expr": "P.id in R.attr.collaborator_ids"}}
            }
        ]
    }
    result1 = policy_service.create_or_update_policy(derived_role_data, context)
    # Step 2: Create resource policy that imports these derived roles
    resource_policy_data = {
        "policy_type": "resource",
        "entity_type": "invoice",
        "name": "sales_invoices",
        "import_derived_roles": ["ownership_roles"],  # Use unprefixed name!
        "rules": [
            {
                "actions": ["read", "update"],
                "effect": "EFFECT_ALLOW",
                "derived_roles": ["owner"]
            },
            {
                "actions": ["read"],
                "effect": "EFFECT_ALLOW",
                "derived_roles": ["collaborator"]
            }
        ]
    }
    result2 = policy_service.create_or_update_policy(resource_policy_data, context)
    return {
        "derived_role_policy": result1["policy_id"],
        "resource_policy": result2["policy_id"]
    }
Example 4: Celery Background Task
from celery import shared_task
from django_tenants.utils import schema_context
from core.authorization.context import AuthorizationContext
from cloud_site.policy.services import get_policy_service
@shared_task
def create_policy_task(tenant_id, app_slug, policy_data):
    with schema_context(tenant_id):
        context = AuthorizationContext(
            tenant_id=tenant_id,
            app_slug=app_slug,
            user_id="celery-worker",
            roles=("admin",)
        )
        policy_service = get_policy_service()
        result = policy_service.create_or_update_policy(
            policy_data=policy_data,
            context=context
        )
        return {"success": True, "policy_id": result["policy_id"]}
# Usage:
# create_policy_task.delay("public", "crm", {...})
Example 5: List and Inspect Policies
from core.authorization.context import build_context_from_request
from cloud_site.policy.services import get_policy_service
def get_all_invoice_policies(request):
    context = build_context_from_request(request)
    policy_service = get_policy_service()
    # List policies with filters
    policies = policy_service.list_policies(
        filters={"name_regexp": "invoice.*"},
        context=context
    )
    # Inspect policies for detailed metadata
    inspection = policy_service.inspect_policies(
        filters={"name_regexp": "invoice.*"},
        context=context
    )
    # List derived roles
    derived_roles = policy_service.list_derived_roles(
        filters={"include_disabled": False},
        context=context
    )
    return {
        "policies": policies,
        "inspection": inspection,
        "derived_roles": derived_roles
    }
Validation Before Creation
Use serializers for input validation:
from cloud_site.policy.serializers import PolicyCreateSerializer
from cloud_site.policy.services import get_policy_service
from core.authorization.context import build_context_from_request
def create_validated_policy(request, policy_data):
    # Validate input
    serializer = PolicyCreateSerializer(data=policy_data)
    serializer.is_valid(raise_exception=True)
    # Build context and create
    context = build_context_from_request(request)
    policy_service = get_policy_service()
    return policy_service.create_or_update_policy(
        policy_data=serializer.validated_data,
        context=context
    )
Error Handling
from base.responses.exceptions import (
ValidationException,
ResourceNotFoundException,
AppException
)
try:
    result = policy_service.create_or_update_policy(policy_data, context)
except ValidationException as e:
    print(f"Validation error: {e.message}")
except ResourceNotFoundException as e:
    print(f"Resource not found: {e.message}")
except AppException as e:
    print(f"Error: {e.message}, Status: {e.status_code}")
Important Notes
- Idempotency: create_or_update_policy() is idempotent, so it is safe to call multiple times
- Auto Base Policy: Resource policies automatically create an unrestricted base policy with scope=None and DENY rules for proper Cerbos policy chain completion
- Scope Injection: Scope is automatically set to {tenant_id}_{app_slug} (e.g., public_crm)
- Derived Role Naming: Names are prefixed with {tenant_id}_{app_slug}_ for tenant isolation
- Policy IDs: Use the full Cerbos format for deletions (get it from the inspect API or the create result)
Policy Types Reference
Resource Policy
Controls what actions can be performed on specific resources.
{
"policy_type": "resource",
"name": "resource_name",
"entity_type": "entity_type",
"import_derived_roles": ["role1", "role2"],
"rules": [
{
"actions": ["action1", "action2"],
"effect": "EFFECT_ALLOW|EFFECT_DENY",
"roles": ["role1"],
"derived_roles": ["derived_role1"],
"condition": {"match": {"expr": "..."}}
}
]
}
Policy ID Format: resource.{entity_type}-{name}.default/{scope}
Principal Policy
Controls what a specific user/principal can do.
{
"policy_type": "principal",
"name": "principal_id",
"rules": [
{
"resource": "entity_type-resource_name",
"actions": [
{"action": "action_name", "effect": "EFFECT_ALLOW|EFFECT_DENY"}
]
}
]
}
Policy ID Format: principal.{name}.default/{scope}
Role Policy
Defines role-based permissions with inheritance.
{
"policy_type": "role",
"name": "role_name",
"parent_roles": ["parent_role1"],
"rules": [
{
"resource": "entity_type-resource_name",
"allowActions": ["action1", "action2"]
}
]
}
Policy ID Format: role.{name}/{scope}
Derived Role Policy
Defines dynamic roles with conditions.
{
"policy_type": "derived_role",
"name": "role_set_name",
"definitions": [
{
"name": "role_name",
"parentRoles": ["parent_role"],
"condition": {"match": {"expr": "..."}}
}
],
"variables": {"local": {"var_name": "expression"}},
"constants": {"local": {"const_name": value}}
}
Policy ID Format: derived_roles.{tenant_id}_{app_slug}_{name}
Error Responses
Standard Error Format
{
"success": false,
"message": "Error description",
"status_code": 400,
"errors": {"detail": "Detailed error message"}
}
HTTP Status Codes
| Code | Description |
|---|---|
| 200 | Success (GET, DELETE) |
| 201 | Created (POST) |
| 400 | Bad Request (validation error) |
| 401 | Unauthorized (missing/invalid JWT) |
| 403 | Forbidden (permission denied) |
| 404 | Not Found (policy doesn't exist) |
| 500 | Internal Server Error |
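On the client side, the standard envelope can be unwrapped in one place so that callers either get the `data` payload or an exception. The sketch below assumes the envelope fields shown above. `PolicyApiError` and `unwrap` are hypothetical names.

```python
class PolicyApiError(Exception):
    """Raised when a response envelope reports success == false."""

    def __init__(self, status_code: int, message: str, errors: dict):
        super().__init__(f"{status_code}: {message}")
        self.status_code = status_code
        self.message = message
        self.errors = errors


def unwrap(envelope: dict):
    """Return the data payload of a successful envelope, or raise PolicyApiError."""
    if envelope.get("success"):
        # Some success responses (create, delete) carry no data field
        return envelope.get("data")
    raise PolicyApiError(
        status_code=envelope.get("status_code", 500),
        message=envelope.get("message", "Unknown error"),
        errors=envelope.get("errors", {}),
    )


try:
    unwrap({
        "success": False,
        "message": "Error description",
        "status_code": 400,
        "errors": {"detail": "Detailed error message"},
    })
except PolicyApiError as exc:
    print(exc.status_code, exc.errors)
```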
Best Practices
1. Use Meaningful Policy Names
{
"name": "sales_invoices",
"entity_type": "invoice"
}
2. Use Specific Actions (Avoid Wildcards)
{"actions": ["read", "update"]} // Good
{"actions": ["*"]} // Avoid in production
3. Leverage Derived Roles for Dynamic Permissions
{
"import_derived_roles": ["ownership_roles"],
"rules": [
{"actions": ["read", "update"], "derived_roles": ["owner"]}
]
}
4. Use Conditions for Fine-Grained Control
{
"condition": {
"match": {
"expr": "R.attr.department == P.attr.department && R.attr.status != 'archived'"
}
}
}
5. Use Inspection for Debugging
GET /api/apps/crm/policies/inspect/?name_regexp=invoice