Skip to main content

Policy Management API

Complete API reference for creating, managing, and inspecting authorization policies in the Taruvi Platform using Cerbos.

Overview

The Policy Management API provides endpoints for managing fine-grained authorization policies that control access to resources in your application. The API supports four types of policies:

  • Resource Policies: Define what actions can be performed on specific resources
  • Principal Policies: Define what a specific user/principal can do
  • Role Policies: Define role-based permissions with inheritance
  • Derived Role Policies: Define dynamic roles with conditions and variables

Base URLs

Standard (tenant from connection context):

/api/apps/{app_slug}/policies/

Site-scoped (explicit tenant override):

/site/{site_slug}/api/apps/{app_slug}/policies/
Site-Scoped URLs

Site-scoped URLs allow you to explicitly specify the tenant context via the URL path. The SiteSwitchingMiddleware handles tenant switching based on the {site_slug} parameter. This is useful for:

  • Cross-tenant operations (with proper permissions)
  • Admin interfaces managing multiple tenants
  • API clients that need explicit tenant control

Authentication

All endpoints require JWT authentication:

Authorization: Bearer <your_jwt_token>

To obtain a JWT token:

POST /api/cloud/auth/jwt/token/
Content-Type: application/json

{
"username": "[email protected]",
"password": "your_password"
}

Multi-Tenant Isolation

All policy operations are automatically scoped to:

  • The current tenant (from connection context or site_slug)
  • The specified app (from URL path)

This ensures complete data isolation between tenants and apps.


Policy Management Endpoints (REST API)

1. Create or Update Policy

Create a new authorization policy or update an existing one. This endpoint is idempotent - Cerbos uses add_or_update internally, so the same request can safely be called multiple times.

POST /api/apps/{app_slug}/policies/
POST /site/{site_slug}/api/apps/{app_slug}/policies/

Request Headers

Authorization: Bearer <jwt_token>
Content-Type: application/json

Request Body - Resource Policy

{
"policy_type": "resource",
"name": "sales_invoices",
"entity_type": "invoice",
"import_derived_roles": ["common_roles"],
"rules": [
{
"actions": ["read", "update"],
"effect": "EFFECT_ALLOW",
"roles": ["admin", "manager"]
},
{
"actions": ["delete"],
"effect": "EFFECT_DENY",
"roles": ["guest"]
},
{
"actions": ["read", "update"],
"effect": "EFFECT_ALLOW",
"derived_roles": ["owner"],
"condition": {
"match": {
"expr": "R.attr.status != 'archived'"
}
}
}
],
"metadata": {
"description": "Sales invoices access policy",
"tags": ["finance", "sales-team"]
}
}

Request Body - Principal Policy

{
"policy_type": "principal",
"name": "john_doe",
"rules": [
{
"resource": "invoice-sales_invoices",
"actions": [
{"action": "read", "effect": "EFFECT_ALLOW"},
{"action": "create", "effect": "EFFECT_ALLOW"},
{"action": "delete", "effect": "EFFECT_DENY"}
]
}
]
}

Request Body - Role Policy

{
"policy_type": "role",
"name": "admin",
"parent_roles": ["user"],
"rules": [
{
"resource": "invoice-sales_invoices",
"allowActions": ["create", "read", "update", "delete"]
}
]
}

Request Body - Derived Role Policy

{
"policy_type": "derived_role",
"name": "common_roles",
"definitions": [
{
"name": "owner",
"parentRoles": ["user"],
"condition": {
"match": {
"expr": "R.attr.owner_id == P.id"
}
}
},
{
"name": "manager",
"parentRoles": ["owner"],
"condition": {
"match": {
"expr": "P.attr.role == 'manager'"
}
}
}
],
"variables": {
"import": ["shared_variables"],
"local": {
"is_owner": "R.attr.owner_id == P.id"
}
},
"constants": {
"import": ["shared_constants"],
"local": {
"max_items": 100
}
}
}
Derived Role Naming

Derived role names are automatically prefixed with {tenant_id}_{app_slug}_ for tenant isolation. When referencing them in import_derived_roles, use the unprefixed name (e.g., "common_roles" not "public_crm_common_roles").

Request Parameters

FieldTypeRequiredDescription
policy_typestringNo (default: "resource")Type: "resource", "principal", "role", or "derived_role"
namestringYesPolicy name (alphanumeric + underscore/hyphen, max 200 chars)
entity_typestringRequired for resource policiesResource type (e.g., "invoice", "project")
parent_rolesstring[]No (role policies only)Parent roles for inheritance
import_derived_rolesstring[]No (resource policies only)Derived roles to import (unprefixed names)
rulesarrayYes (except derived_role)Policy rules (min 1, max 50)
definitionsarrayYes for derived_roleRole definitions (derived_role only)
variablesobjectNo (derived_role only)Policy variables with import/local structure
constantsobjectNo (derived_role only)Policy constants with import/local structure
metadataobjectNoOptional metadata

Success Response (201 Created)

{
"success": true,
"message": "Policy created successfully",
"status_code": 201
}

cURL Example

curl -X POST "http://localhost:8000/api/apps/crm/policies/" \
-H "Authorization: Bearer YOUR_JWT_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"policy_type": "resource",
"name": "sales_invoices",
"entity_type": "invoice",
"rules": [
{
"actions": ["read", "update"],
"effect": "EFFECT_ALLOW",
"roles": ["admin"]
}
]
}'

2. List Policies

Retrieve all policies for the current app and tenant with optional filtering.

GET /api/apps/{app_slug}/policies/
GET /site/{site_slug}/api/apps/{app_slug}/policies/

Query Parameters

ParameterTypeRequiredDefaultDescription
name_regexpstringNo.*Regex filter for policy names
scope_regexpstringNoAuto-filtered by tenantRegex filter for scope
version_regexpstringNoNoneRegex filter for version
include_disabledbooleanNofalseInclude disabled policies

Success Response (200 OK)

{
"success": true,
"message": "Policies retrieved successfully",
"status_code": 200,
"data": [
{
"apiVersion": "api.cerbos.dev/v1",
"resourcePolicy": {
"resource": "invoice-sales_invoices",
"version": "default",
"scope": "public_crm",
"rules": [...]
}
}
],
"total": 1
}

cURL Example

curl -X GET "http://localhost:8000/api/apps/crm/policies/?name_regexp=invoice" \
-H "Authorization: Bearer YOUR_JWT_TOKEN"

3. Retrieve Policy

Get a specific policy by its full Cerbos policy ID.

GET /api/apps/{app_slug}/policies/?id={policy_id}
GET /site/{site_slug}/api/apps/{app_slug}/policies/?id={policy_id}

Query Parameters

ParameterTypeRequiredDescription
idstringYesFull policy ID in Cerbos internal format

Policy ID Formats

Use the policy ID exactly as returned from the /inspect endpoint:

Policy TypeFormatExample
Resourceresource.{name}.{version}/{scope}resource.invoice-sales_invoices.default/public_crm
Rolerole.{name}/{scope}role.admin/public_crm
Principalprincipal.{name}.{version}/{scope}principal.john_doe.default/public_crm
Derived Rolederived_roles.{name}derived_roles.public_crm_common_roles

Success Response (200 OK)

{
"success": true,
"message": "Policy retrieved successfully",
"status_code": 200,
"data": {
"apiVersion": "api.cerbos.dev/v1",
"resourcePolicy": {
"resource": "invoice-sales_invoices",
"version": "default",
"scope": "public_crm",
"rules": [...]
}
}
}

cURL Example

curl -X GET "http://localhost:8000/api/apps/crm/policies/?id=resource.invoice-sales_invoices.default/public_crm" \
-H "Authorization: Bearer YOUR_JWT_TOKEN"

4. Delete Policy

Delete (soft-delete/disable) a policy by its full Cerbos policy ID.

DELETE /api/apps/{app_slug}/policies/?id={policy_id}
DELETE /site/{site_slug}/api/apps/{app_slug}/policies/?id={policy_id}

Query Parameters

ParameterTypeRequiredDescription
idstringYesFull policy ID in Cerbos internal format

Success Response (200 OK)

{
"success": true,
"message": "Policy deleted successfully",
"status_code": 200
}
Soft Delete

Cerbos uses soft delete - policies are disabled, not permanently removed, to maintain audit history.

cURL Example

curl -X DELETE "http://localhost:8000/api/apps/crm/policies/?id=resource.invoice-sales_invoices.default/public_crm" \
-H "Authorization: Bearer YOUR_JWT_TOKEN"

5. List Derived Roles

List derived role policies for the current tenant and app.

GET /api/apps/{app_slug}/policies/derived-roles/
GET /site/{site_slug}/api/apps/{app_slug}/policies/derived-roles/

Query Parameters

ParameterTypeRequiredDefaultDescription
name_regexpstringNo.*Additional regex filter for role names
include_disabledbooleanNofalseInclude disabled policies

Success Response (200 OK)

{
"success": true,
"message": "Derived roles retrieved successfully",
"status_code": 200,
"data": [
{
"apiVersion": "api.cerbos.dev/v1",
"derivedRoles": {
"name": "public_crm_common_roles",
"definitions": [...]
}
}
],
"total": 1
}

cURL Example

curl -X GET "http://localhost:8000/api/apps/crm/policies/derived-roles/" \
-H "Authorization: Bearer YOUR_JWT_TOKEN"

6. Inspect Policies

Inspect policies to retrieve detailed metadata including actions, variables, attributes, derived roles, and constants.

GET /api/apps/{app_slug}/policies/inspect/
GET /site/{site_slug}/api/apps/{app_slug}/policies/inspect/

Query Parameters

ParameterTypeRequiredDefaultDescription
policy_idstring (repeatable)NoNoneSpecific policy IDs to inspect
name_regexpstringNo.*Regex filter for policy names
scope_regexpstringNoAuto-filtered by tenantRegex filter for scope
version_regexpstringNoNoneRegex filter for version
include_disabledbooleanNofalseInclude disabled policies

Success Response (200 OK)

{
"success": true,
"message": "Policies inspected successfully",
"status_code": 200,
"data": {
"results": {
"resource.invoice-sales_invoices.default/public_crm": {
"policyId": "resource.invoice-sales_invoices.default/public_crm",
"actions": ["read", "create", "update", "delete"],
"derivedRoles": [
{"name": "owner", "kind": "IMPORTED", "source": "common_roles"}
],
"variables": [
{"name": "is_owner", "kind": "LOCAL", "value": "R.attr.owner_id == P.id"}
],
"attributes": [
{"name": "tenant_id", "kind": "PRINCIPAL_ATTRIBUTE"},
{"name": "status", "kind": "RESOURCE_ATTRIBUTE"}
]
}
}
},
"total": 1
}

cURL Example

curl -X GET "http://localhost:8000/api/apps/crm/policies/inspect/?name_regexp=invoice" \
-H "Authorization: Bearer YOUR_JWT_TOKEN"

Authorization Endpoints (Django Ninja API)

These endpoints are used for runtime authorization checks. They use the Cerbos gRPC and HTTP APIs.

1. Check Resources (gRPC)

Check if a principal has permissions to perform actions on resources.

POST /api/apps/{app_slug}/check/resources
POST /site/{site_slug}/api/apps/{app_slug}/check/resources

Request Body

{
"principal": {
"id": "user_123",
"roles": ["admin"],
"attr": {"department": "sales"}
},
"resources": [
{
"resource": {
"kind": "invoice-sales_invoices",
"id": "inv_001",
"attr": {"owner_id": "user_456"}
},
"actions": ["read", "update", "delete"]
}
]
}

Response

{
"requestId": "test",
"results": [
{
"resource": {
"id": "inv_001",
"kind": "invoice-sales_invoices",
"policyVersion": "default",
"scope": "public_crm"
},
"actions": {
"read": "EFFECT_ALLOW",
"update": "EFFECT_ALLOW",
"delete": "EFFECT_DENY"
},
"meta": {
"effectiveDerivedRoles": ["owner"]
}
}
],
"cerbosCallId": "01HHENANTHFD5DV3HZGDKB87PJ"
}

cURL Example

curl -X POST "http://localhost:8000/api/apps/crm/check/resources" \
-H "Authorization: Bearer YOUR_JWT_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"principal": {"id": "user_123", "roles": ["admin"]},
"resources": [
{
"resource": {"kind": "invoice-sales_invoices", "id": "inv_001"},
"actions": ["read", "update"]
}
]
}'

2. Plan Resources (gRPC)

Get a query plan from Cerbos for row-level filtering. Returns filter conditions that can be applied to database queries.

POST /api/apps/{app_slug}/plan/resources
POST /site/{site_slug}/api/apps/{app_slug}/plan/resources

Request Body

{
"principal": {
"id": "user_123",
"roles": ["viewer"],
"attr": {"department": "sales"}
},
"resource": {
"kind": "invoice-sales_invoices",
"policy_version": "default"
},
"action": "read"
}

Response

{
"filter_kind": "CONDITIONAL",
"condition": {
"expression": {
"operator": "eq",
"operands": [
{"variable": "request.resource.attr.owner_id"},
{"value": "user_123"}
]
}
}
}

Filter Kinds:

  • ALWAYS_ALLOWED: No filtering needed, user has full access
  • CONDITIONAL: Apply the returned condition to filter results
  • ALWAYS_DENIED: User has no access to any resources

cURL Example

curl -X POST "http://localhost:8000/api/apps/crm/plan/resources" \
-H "Authorization: Bearer YOUR_JWT_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"principal": {"id": "user_123", "roles": ["viewer"]},
"resource": {"kind": "invoice-sales_invoices"},
"action": "read"
}'

3. AuthZEN Access Evaluation (HTTP Proxy)

Single access evaluation using the AuthZEN standard format. This endpoint proxies directly to Cerbos's native AuthZEN HTTP endpoint, only injecting the tenant scope.

POST /api/apps/{app_slug}/access/v1/evaluation
POST /site/{site_slug}/api/apps/{app_slug}/access/v1/evaluation

Request Body (AuthZEN Standard)

{
"subject": {
"type": "user",
"id": "user_123",
"properties": {
"cerbos.roles": ["admin"],
"department": "sales"
}
},
"resource": {
"type": "invoice-sales_invoices",
"id": "inv_001",
"properties": {
"owner_id": "user_456"
}
},
"action": {
"name": "read"
}
}

Response (from Cerbos)

{
"decision": true,
"context": {
"id": "01HHENANTHFD5DV3HZGDKB87PJ",
"reason_admin": {
"effectiveDerivedRoles": ["owner"]
}
}
}

cURL Example

curl -X POST "http://localhost:8000/api/apps/crm/access/v1/evaluation" \
-H "Authorization: Bearer YOUR_JWT_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"subject": {
"type": "user",
"id": "user_123",
"properties": {"cerbos.roles": ["admin"]}
},
"resource": {
"type": "invoice-sales_invoices",
"id": "inv_001"
},
"action": {"name": "read"}
}'

4. AuthZEN Batch Evaluation (HTTP Proxy)

Batch access evaluation for multiple actions/resources. Proxies directly to Cerbos's native AuthZEN HTTP endpoint.

POST /api/apps/{app_slug}/access/v1/evaluations
POST /site/{site_slug}/api/apps/{app_slug}/access/v1/evaluations

Request Body (Cerbos AuthZEN Batch)

{
"subject": {
"type": "user",
"id": "user_123",
"properties": {
"cerbos.policyVersion": "default",
"cerbos.roles": ["admin"]
}
},
"resource": {
"type": "invoice-sales_invoices",
"id": "inv_001",
"properties": {}
},
"context": {
"cerbos.requestId": "batch_001",
"cerbos.includeMeta": true
},
"evaluations": [
{"action": {"name": "read"}},
{"action": {"name": "update"}},
{
"resource": {"type": "invoice-sales_invoices", "id": "inv_002"},
"action": {"name": "delete"}
}
],
"options": {
"evaluations_semantic": "execute_all"
}
}

Response (from Cerbos)

{
"evaluations": [
{"decision": true, "context": {...}},
{"decision": true, "context": {...}},
{"decision": false, "context": {...}}
]
}

cURL Example

curl -X POST "http://localhost:8000/api/apps/crm/access/v1/evaluations" \
-H "Authorization: Bearer YOUR_JWT_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"subject": {
"type": "user",
"id": "user_123",
"properties": {"cerbos.roles": ["admin"]}
},
"resource": {"type": "invoice-sales_invoices", "id": "inv_001"},
"evaluations": [
{"action": {"name": "read"}},
{"action": {"name": "update"}}
]
}'

Using Policy Management in Internal Functions

This section explains how to use policy management functions directly within your internal code (views, signals, services, Celery tasks).

Architecture Overview

cloud_site/policy/views.py (PolicyViewSet - REST API)

cloud_site/policy/services/policy_service.py (PolicyService)

core/authorization/services/cerbos_service.py (CerbosService)

core/authorization/clients/cerbos_admin_client.py (CerbosAdminClient)

Cerbos SDK → Cerbos Server

For internal use, interact with the PolicyService layer.

Step 1: Import Required Components

from core.authorization.context import AuthorizationContext, build_context_from_request
from cloud_site.policy.services import get_policy_service

Step 2: Build Authorization Context

Option A: From HTTP Request (Views, API Endpoints)

from core.authorization.context import build_context_from_request

def my_view(request):
# Automatically extracts tenant_id, app_slug, user_id, roles
context = build_context_from_request(request)

Option B: Manual Construction (Signals, Celery Tasks, Management Commands)

from django.db import connection
from core.authorization.context import AuthorizationContext

# From current tenant context
context = AuthorizationContext(
tenant_id=connection.tenant.schema_name,
app_slug="my_app",
user_id="user_123",
roles=("admin", "user"),
)

# For system/background tasks
context = AuthorizationContext(
tenant_id="public",
app_slug="crm",
user_id="system",
roles=("admin",),
)

Step 3: Get Policy Service Instance

from cloud_site.policy.services import get_policy_service

policy_service = get_policy_service()

Step 4: Create or Update Policies

The create_or_update_policy() method follows 3 conditional scenarios based on the presence of rules and policy type:

Scenario 1: No Rules + Resource/Role Type

Creates unrestricted policies at BOTH default and scoped levels:

# No rules provided - creates unrestricted at both levels
policy_data = {
"policy_type": "resource",
"entity_type": "datatable",
"name": "users"
# No rules - will create unrestricted policies
}

result = policy_service.create_or_update_policy(
policy_data=policy_data,
context=context,
)

# Creates TWO policies:
# 1. Default level (scope=None): resource.datatable-users.default (DENY all)
# 2. Scoped level: resource.datatable-users.default/public_crm (ALLOW all)

Scenario 2: Rules + Resource/Role Type

Creates unrestricted base at default level + custom policy at scoped level:

# Rules provided - creates unrestricted base + scoped with custom rules
policy_data = {
"policy_type": "resource",
"entity_type": "invoice",
"name": "sales_invoices",
"rules": [
{
"actions": ["read", "create", "update"],
"effect": "EFFECT_ALLOW",
"roles": ["admin", "manager"]
},
{
"actions": ["read"],
"effect": "EFFECT_ALLOW",
"derived_roles": ["owner"]
}
],
"metadata": {
"description": "Sales invoices access control"
}
}

result = policy_service.create_or_update_policy(
policy_data=policy_data,
context=context,
)

# Creates TWO policies:
# 1. Default level (scope=None): resource.invoice-sales_invoices.default (DENY all - unrestricted)
# 2. Scoped level: resource.invoice-sales_invoices.default/public_crm (your custom rules)

print(f"Created policy: {result['policy_id']}")
# Output: Created policy: invoice-sales_invoices.default/public_crm

Scenario 3: Principal/Derived Role Type

Creates scoped policy only with custom rules:

# Principal or Derived Role - creates scoped policy only
policy_data = {
"policy_type": "principal",
"principal": "user:123",
"rules": [
{
"resource": "invoice:*",
"actions": [
{
"action": "read",
"effect": "EFFECT_ALLOW"
}
]
}
]
}

result = policy_service.create_or_update_policy(
policy_data=policy_data,
context=context,
)

# Creates ONE policy:
# - Scoped level only: principal.user_123.default/public_crm (custom rules)

Step 5: Delete Policies

Using delete_policy() for Custom Policies

# Use full Cerbos policy ID (get from inspect API or create result)
policy_id = "resource.invoice-sales_invoices.default/public_crm"

policy_service.delete_policy(
policy_id=policy_id,
context=context,
)

Using delete_system_policy() for Unrestricted Policies

Use delete_system_policy() to delete system policies that were created without rules (unrestricted policies). This method deletes BOTH default and scoped levels.

Method Signature:

def delete_system_policy(
self,
policy_type: str, # "resource" or "role"
name: str, # Policy name
entity_type: str | None, # Required for resource, None for role
context: AuthorizationContext
) -> dict[str, Any]

When to Use:

  • Use delete_system_policy(): For policies created without rules (Scenario 1)
  • Use delete_policy(): For policies created with custom rules (Scenario 2 & 3)

Example 1: Delete Resource System Policy

from cloud_site.policy.services import get_policy_service
from core.authorization.context import AuthorizationContext

policy_service = get_policy_service()
context = AuthorizationContext(
tenant_id="public",
app_slug="crm",
user_id="admin",
roles=("admin",)
)

# Delete resource system policy (deletes both default and scoped)
result = policy_service.delete_system_policy(
policy_type="resource",
entity_type="datatable",
name="users",
context=context
)

# Deletes TWO policies:
# 1. resource.datatable-users.default (default level)
# 2. resource.datatable-users.default/public_crm (scoped level)

print(result)
# Output: {"deleted_policies": [...], "errors": []}

Example 2: Delete Role System Policy

# Delete role system policy (deletes both default and scoped)
result = policy_service.delete_system_policy(
policy_type="role",
name="admin",
entity_type=None, # Not required for role policies
context=context
)

# Deletes TWO policies:
# 1. role.admin.default (default level)
# 2. role.admin.default/public_crm (scoped level)

Complete Examples

Example 1: Creating Policy in a Django View

from django.http import JsonResponse
from core.authorization.context import build_context_from_request
from cloud_site.policy.services import get_policy_service
from base.responses import AppDataResponse

def create_invoice_policy(request):
try:
context = build_context_from_request(request)

policy_data = {
"policy_type": "resource",
"entity_type": "invoice",
"name": "sales_invoices",
"rules": [
{
"actions": ["*"],
"effect": "EFFECT_ALLOW",
"roles": ["admin"]
},
{
"actions": ["read", "update"],
"effect": "EFFECT_ALLOW",
"roles": ["manager"],
"condition": {
"match": {
"expr": "R.attr.department == P.attr.department"
}
}
}
]
}

policy_service = get_policy_service()
result = policy_service.create_or_update_policy(
policy_data=policy_data,
context=context,
)

return AppDataResponse.created(
data={"policy_id": result["policy_id"]},
message="Policy created successfully"
)

except Exception as e:
return AppDataResponse.error(
error_key="UNKNOWN_ERROR",
message=f"Failed to create policy: {str(e)}"
)

Example 2: Auto-Create Policy in Django Signals

from django.db.models.signals import post_save
from django.dispatch import receiver
from django.db import connection
from core.authorization.context import AuthorizationContext
from cloud_site.policy.services import get_policy_service
from cloud_site.data.models import DataTable

@receiver(post_save, sender=DataTable)
def create_datatable_policy(sender, instance, created, **kwargs):
if not created:
return

try:
context = AuthorizationContext(
tenant_id=connection.tenant.schema_name,
app_slug=instance.app.slug,
user_id="system",
roles=("admin",),
)

policy_data = {
"policy_type": "resource",
"entity_type": "datatable",
"name": instance.name,
"rules": [
{"actions": ["*"], "effect": "EFFECT_ALLOW", "roles": ["admin"]},
{"actions": ["read"], "effect": "EFFECT_ALLOW", "roles": ["viewer"]}
]
}

policy_service = get_policy_service()
result = policy_service.create_or_update_policy(policy_data, context)
print(f"✅ Created policy: {result['policy_id']}")

except Exception as e:
print(f"❌ Failed to create policy: {e}")

Example 3: Creating Derived Role Policies

from core.authorization.context import build_context_from_request
from cloud_site.policy.services import get_policy_service

def setup_ownership_roles(request):
context = build_context_from_request(request)
policy_service = get_policy_service()

# Step 1: Create derived role policy
derived_role_data = {
"policy_type": "derived_role",
"name": "ownership_roles", # Will be prefixed with tenant_id_app_slug_
"definitions": [
{
"name": "owner",
"parentRoles": ["user"],
"condition": {"match": {"expr": "R.attr.owner_id == P.id"}}
},
{
"name": "collaborator",
"parentRoles": ["user"],
"condition": {"match": {"expr": "P.id in R.attr.collaborator_ids"}}
}
]
}

result1 = policy_service.create_or_update_policy(derived_role_data, context)

# Step 2: Create resource policy that imports these derived roles
resource_policy_data = {
"policy_type": "resource",
"entity_type": "invoice",
"name": "sales_invoices",
"import_derived_roles": ["ownership_roles"], # Use unprefixed name!
"rules": [
{
"actions": ["read", "update"],
"effect": "EFFECT_ALLOW",
"derived_roles": ["owner"]
},
{
"actions": ["read"],
"effect": "EFFECT_ALLOW",
"derived_roles": ["collaborator"]
}
]
}

result2 = policy_service.create_or_update_policy(resource_policy_data, context)

return {
"derived_role_policy": result1["policy_id"],
"resource_policy": result2["policy_id"]
}

Example 4: Celery Background Task

from celery import shared_task
from django_tenants.utils import schema_context
from core.authorization.context import AuthorizationContext
from cloud_site.policy.services import get_policy_service

@shared_task
def create_policy_task(tenant_id, app_slug, policy_data):
with schema_context(tenant_id):
context = AuthorizationContext(
tenant_id=tenant_id,
app_slug=app_slug,
user_id="celery-worker",
roles=("admin",)
)

policy_service = get_policy_service()
result = policy_service.create_or_update_policy(
policy_data=policy_data,
context=context
)

return {"success": True, "policy_id": result["policy_id"]}

# Usage:
# create_policy_task.delay("public", "crm", {...})

Example 5: List and Inspect Policies

from core.authorization.context import build_context_from_request
from cloud_site.policy.services import get_policy_service

def get_all_invoice_policies(request):
context = build_context_from_request(request)
policy_service = get_policy_service()

# List policies with filters
policies = policy_service.list_policies(
filters={"name_regexp": "invoice.*"},
context=context
)

# Inspect policies for detailed metadata
inspection = policy_service.inspect_policies(
filters={"name_regexp": "invoice.*"},
context=context
)

# List derived roles
derived_roles = policy_service.list_derived_roles(
filters={"include_disabled": False},
context=context
)

return {
"policies": policies,
"inspection": inspection,
"derived_roles": derived_roles
}

Validation Before Creation

Use serializers for input validation:

from cloud_site.policy.serializers import PolicyCreateSerializer
from cloud_site.policy.services import get_policy_service
from core.authorization.context import build_context_from_request

def create_validated_policy(request, policy_data):
# Validate input
serializer = PolicyCreateSerializer(data=policy_data)
serializer.is_valid(raise_exception=True)

# Build context and create
context = build_context_from_request(request)
policy_service = get_policy_service()

return policy_service.create_or_update_policy(
policy_data=serializer.validated_data,
context=context
)

Error Handling

from base.responses.exceptions import (
ValidationException,
ResourceNotFoundException,
AppException
)

try:
result = policy_service.create_or_update_policy(policy_data, context)
except ValidationException as e:
print(f"Validation error: {e.message}")
except ResourceNotFoundException as e:
print(f"Resource not found: {e.message}")
except AppException as e:
print(f"Error: {e.message}, Status: {e.status_code}")

Important Notes

  1. Idempotency: create_or_update_policy() is idempotent - safe to call multiple times
  2. Auto Base Policy: Resource policies automatically create an unrestricted base policy with scope=None and DENY rules for proper Cerbos policy chain completion
  3. Scope Injection: Scope is automatically set to {tenant_id}_{app_slug} (e.g., public_crm)
  4. Derived Role Naming: Names are prefixed with {tenant_id}_{app_slug}_ for tenant isolation
  5. Policy IDs: Use full Cerbos format for deletions (get from inspect API or create result)

Policy Types Reference

Resource Policy

Controls what actions can be performed on specific resources.

{
"policy_type": "resource",
"name": "resource_name",
"entity_type": "entity_type",
"import_derived_roles": ["role1", "role2"],
"rules": [
{
"actions": ["action1", "action2"],
"effect": "EFFECT_ALLOW|EFFECT_DENY",
"roles": ["role1"],
"derived_roles": ["derived_role1"],
"condition": {"match": {"expr": "..."}}
}
]
}

Policy ID Format: resource.{entity_type}-{name}.default/{scope}

Principal Policy

Controls what a specific user/principal can do.

{
"policy_type": "principal",
"name": "principal_id",
"rules": [
{
"resource": "entity_type-resource_name",
"actions": [
{"action": "action_name", "effect": "EFFECT_ALLOW|EFFECT_DENY"}
]
}
]
}

Policy ID Format: principal.{name}.default/{scope}

Role Policy

Defines role-based permissions with inheritance.

{
"policy_type": "role",
"name": "role_name",
"parent_roles": ["parent_role1"],
"rules": [
{
"resource": "entity_type-resource_name",
"allowActions": ["action1", "action2"]
}
]
}

Policy ID Format: role.{name}/{scope}

Derived Role Policy

Defines dynamic roles with conditions.

{
"policy_type": "derived_role",
"name": "role_set_name",
"definitions": [
{
"name": "role_name",
"parentRoles": ["parent_role"],
"condition": {"match": {"expr": "..."}}
}
],
"variables": {"local": {"var_name": "expression"}},
"constants": {"local": {"const_name": value}}
}

Policy ID Format: derived_roles.{tenant_id}_{app_slug}_{name}


Error Responses

Standard Error Format

{
"success": false,
"message": "Error description",
"status_code": 400,
"errors": {"detail": "Detailed error message"}
}

HTTP Status Codes

CodeDescription
200Success (GET, DELETE)
201Created (POST)
400Bad Request (validation error)
401Unauthorized (missing/invalid JWT)
403Forbidden (permission denied)
404Not Found (policy doesn't exist)
500Internal Server Error

Best Practices

1. Use Meaningful Policy Names

{
"name": "sales_invoices",
"entity_type": "invoice"
}

2. Use Specific Actions (Avoid Wildcards)

{"actions": ["read", "update"]}  // Good
{"actions": ["*"]} // Avoid in production

3. Leverage Derived Roles for Dynamic Permissions

{
"import_derived_roles": ["ownership_roles"],
"rules": [
{"actions": ["read", "update"], "derived_roles": ["owner"]}
]
}

4. Use Conditions for Fine-Grained Control

{
"condition": {
"match": {
"expr": "R.attr.department == P.attr.department && R.attr.status != 'archived'"
}
}
}

5. Use Inspection for Debugging

GET /api/apps/crm/policies/inspect/?name_regexp=invoice

See Also