Taruvi SDK Guide for APP Mode Functions
Complete reference for using the Taruvi SDK within APP mode serverless functions.
Function Signature
APP mode functions must use this exact signature:
def main(params, user_data, sdk_client):
    """APP mode function entry point.

    Args:
        params: User-provided parameters, plus two reserved keys:
            params['__function__'] holds function metadata and
            params['request'] holds HTTP request metadata (an empty
            dict for scheduled/event-triggered functions).
        user_data: Dict with id, username, email, full_name.
        sdk_client: Pre-authenticated Taruvi SDK client.

    Returns:
        Any JSON-serializable data.
    """
    return None
Parameters Explained
params - Dictionary containing:
- User-provided parameters from function invocation
- params['__function__'] - Function metadata (name, slug, execution_mode, etc.)
- params['request'] - HTTP request metadata (method, path, full_path, query_params, headers, content_type, scheme, host, origin, remote_addr, body). Empty dict {} for scheduled/event-triggered functions. Sensitive headers (Authorization, Cookie, X-Session-Token) are excluded.
user_data - Dictionary containing authenticated user info:
{
"id": 123,
"username": "alice",
"email": "alice@example.com",
"full_name": "Alice Smith"
}
sdk_client - Pre-authenticated Taruvi SDK client
- Already authenticated with current user context
- Ready to use - no need to call auth methods
- Provides access to all platform features
Database Module
CRUD operations on DataTables.
Methods
sdk_client.database.from_(table_name) # Returns QueryBuilder (chainable)
sdk_client.database.get(table_name, record_id) # Get single record
sdk_client.database.create(table_name, data) # Create record
sdk_client.database.update(table_name, record_id, data) # Update record
sdk_client.database.delete(table_name, record_id) # Delete record
QueryBuilder Chaining
.filter(field, operator, value) # Operators: eq, ne, gt, gte, lt, lte, in, contains
.sort(field, direction) # direction: "asc" or "desc"
.page_size(limit) # Limit results
.page(number) # Page number
.populate(*fields) # Eager load relationships
.search(query) # Full-text search
.aggregate(*expressions) # e.g., "sum(price)", "count(*)"
.group_by(*fields) # Group aggregation results
.having(condition) # Filter aggregated results
.edges() # Target edges table for relationships
.format(type) # "tree" or "graph" for hierarchical data
.include(direction) # "descendants", "ancestors", or "both"
.depth(n) # Max traversal depth
.types(relationship_types) # Filter by relationship types
.execute() # Returns {"data": [...], "total": N}
.first() # Returns first record or None
.count() # Returns count of matching records
Examples
Simple Query:
def main(params, user_data, sdk_client):
    """Return every record of the "users" table along with the total."""
    users_query = sdk_client.database.from_("users")
    response = users_query.execute()
    return {
        "users": response["data"],
        "total": response["total"],
    }
Query with Filters:
def main(params, user_data, sdk_client):
    """Return up to 10 active users aged 18+, most recently created first."""
    # Parentheses replace the backslash continuations; the chain is unchanged.
    query = (
        sdk_client.database.from_("users")
        .filter("status", "eq", "active")
        .filter("age", "gte", 18)
        .sort("created_at", "desc")
        .page_size(10)
    )
    page = query.execute()
    return {"users": page["data"], "total": page["total"]}
Get Single Record:
def main(params, user_data, sdk_client):
    """Fetch one "users" record identified by the ``user_id`` parameter."""
    record = sdk_client.database.get("users", params.get("user_id"))
    return {"user": record}
Create Record:
def main(params, user_data, sdk_client):
    """Create an active "users" record from the username/email parameters."""
    payload = {
        "username": params.get("username"),
        "email": params.get("email"),
        "status": "active",
    }
    created = sdk_client.database.create("users", payload)
    return {"created": True, "user": created}
Update Record:
def main(params, user_data, sdk_client):
    """Mark the "users" record named by ``user_id`` as inactive."""
    target_id = params.get("user_id")
    changes = {"status": "inactive"}
    record = sdk_client.database.update("users", target_id, changes)
    return {"updated": True, "user": record}
Delete Record:
def main(params, user_data, sdk_client):
    """Delete the "users" record named by ``user_id`` and echo the id back."""
    target_id = params.get("user_id")
    sdk_client.database.delete("users", record_id=target_id)
    return {"deleted": True, "user_id": target_id}
Functions Module
Execute other serverless functions and retrieve results.
Methods
sdk_client.functions.execute(function_slug, params, is_async=False, timeout=None)
sdk_client.functions.get_result(task_id)
sdk_client.functions.list(limit=None, offset=None)
sdk_client.functions.get(function_slug)
sdk_client.functions.list_invocations(function_slug, status=None, limit=None, offset=None)
Examples
Execute Sync Function:
def main(params, user_data, sdk_client):
    """Invoke the "send-email" function synchronously and return its result.

    Returns:
        Dict with ``email_sent`` set to True and the raw ``result`` returned
        by ``functions.execute``.
    """
    result = sdk_client.functions.execute(
        "send-email",
        params={
            # Placeholder address on the reserved example.com domain.
            "to": "user@example.com",
            "subject": "Welcome",
        },
    )
    return {"email_sent": True, "result": result}
Execute Async Function:
def main(params, user_data, sdk_client):
    """Start "process-large-dataset" asynchronously and return its task id."""
    response = sdk_client.functions.execute(
        "process-large-dataset",
        params={"dataset_id": 123},
        is_async=True,
    )
    # The task id is nested under the invocation record of the response.
    return {
        "task_started": True,
        "task_id": response["invocation"]["celery_task_id"],
    }
Get Async Result:
def main(params, user_data, sdk_client):
    """Report the status (and result, if present) of an async task."""
    outcome = sdk_client.functions.get_result(params.get("task_id"))
    payload = outcome["data"]
    # "result" may be absent, so it is read with .get().
    return {"status": payload["status"], "result": payload.get("result")}
List Functions:
def main(params, user_data, sdk_client):
    """List up to 10 functions and return the ``data`` part of the response."""
    listing = sdk_client.functions.list(limit=10)
    return {"functions": listing["data"]}
Secrets Module
Access secrets securely without hardcoding credentials.
Methods
sdk_client.secrets.get(key, app=None, tags=None)
sdk_client.secrets.list(keys=None, search=None, app=None, tags=None, secret_type=None, include_metadata=False, page=None, page_size=None)
Examples
Get Single Secret:
def main(params, user_data, sdk_client):
    """Read the SendGrid secret and pass its value to the email helper."""
    secret = sdk_client.secrets.get("SENDGRID_API_KEY")
    # send_email_with_api_key is not defined in this snippet; supply your own.
    send_email_with_api_key(secret["value"])
    return {"sent": True}
Get Multiple Secrets:
def main(params, user_data, sdk_client):
    """Load several secrets in one call and read two of their values."""
    wanted = ["SENDGRID_API_KEY", "STRIPE_SECRET_KEY", "AWS_ACCESS_KEY"]
    loaded = sdk_client.secrets.list(keys=wanted)
    # NOTE(review): the result is indexed by key name here, while the
    # search-based list() example in this guide reads ['data'] - confirm the
    # return shape when `keys` is passed.
    sendgrid_key = loaded["SENDGRID_API_KEY"]["value"]
    stripe_key = loaded["STRIPE_SECRET_KEY"]["value"]
    return {"keys_loaded": True}
List Secrets:
def main(params, user_data, sdk_client):
    """Search secrets matching "API" that carry the "production" tag."""
    matches = sdk_client.secrets.list(search="API", tags=["production"])
    return {"secrets": matches["data"]}
⚠️ Critical Rules
- Never hardcode secrets in function code
- Always use secrets.get() to retrieve credentials
- Handle missing secrets gracefully with try/except
- Never log secret values - only log keys
Analytics Module
Execute saved analytics queries with parameters.
Methods
sdk_client.analytics.execute(query_slug, params=None)
Returns
{
"data": [...], # Query results
"total": 100, # Row count
"execution_key": "..." # Execution tracking key
}
Examples
Simple Query Execution:
def main(params, user_data, sdk_client):
    """Run the saved "monthly-revenue" query and return its rows."""
    report = sdk_client.analytics.execute("monthly-revenue")
    return {"revenue": report["data"]}
Query with Parameters:
def main(params, user_data, sdk_client):
    """Run "user-signups" for calendar year 2024, grouped by month."""
    query_params = {
        "start_date": "2024-01-01",
        "end_date": "2024-12-31",
        "group_by": "month",
    }
    response = sdk_client.analytics.execute("user-signups", params=query_params)
    return {"signups": response["data"], "total": response["total"]}
Query with Dynamic Filters:
def main(params, user_data, sdk_client):
    """Run "sales-by-region" with caller-supplied region and category.

    ``region`` falls back to "US" and ``category`` to "all" when omitted.
    """
    filters = {
        "region": params.get("region", "US"),
        "product_category": params.get("category", "all"),
    }
    response = sdk_client.analytics.execute("sales-by-region", params=filters)
    return response["data"]
Notes
- Queries must be created first (use MCP analytics tools or API)
- Supports Jinja2 templating in query definitions
- Uses secrets for database credentials
- Results are cached based on execution_key
Storage Module
Upload, download, and manage files in storage buckets.
Methods
# Bucket management
sdk_client.storage.list_buckets(search=None, visibility=None, app_category=None, ordering=None, page=None, page_size=None)
sdk_client.storage.create_bucket(name, slug=None, visibility="private", file_size_limit=None, allowed_mime_types=None, app_category=None, max_size_bytes=None, max_objects=None)
sdk_client.storage.get_bucket(slug)
sdk_client.storage.update_bucket(slug, name=None, visibility=None, file_size_limit=None, allowed_mime_types=None, app_category=None, max_size_bytes=None, max_objects=None)
sdk_client.storage.delete_bucket(slug)
# File operations (chainable via from_)
sdk_client.storage.from_(bucket_name).upload(files, paths, metadatas=None)
sdk_client.storage.from_(bucket_name).download(file_path)
sdk_client.storage.from_(bucket_name).list()
sdk_client.storage.from_(bucket_name).filter(search=None, mimetype=None, visibility=None, ordering=None, page=None, page_size=None).list()
sdk_client.storage.from_(bucket_name).update(file_path, metadata=None, visibility=None)
sdk_client.storage.from_(bucket_name).delete(paths)
sdk_client.storage.from_(bucket_name).copy_object(source_path, destination_path, destination_bucket=None)
sdk_client.storage.from_(bucket_name).move_object(source_path, destination_path)
Upload Parameters
The upload() method uses the batch upload endpoint internally:
| Parameter | Type | Required | Description |
|---|---|---|---|
| files | list[tuple[str, BinaryIO]] | Yes | List of (filename, file_object) tuples |
| paths | list[str] | Yes | Storage paths for each file (must match files length) |
| metadatas | list[dict] | No | Optional metadata dict per file (max 2KB each per S3 limits) |
Constraints: Max 10 files per request. Each file validated against bucket's file_size_limit and allowed_mime_types.
Response: Returns the data field from the API response containing uploaded_count, failed_count, successful, and failed arrays.
Examples
Upload File:
def main(params, user_data, sdk_client):
    """Upload the local report.pdf to the "documents" bucket with metadata."""
    # The file stays open only for the duration of the upload call.
    with open("report.pdf", "rb") as pdf:
        outcome = sdk_client.storage.from_("documents").upload(
            files=[("report.pdf", pdf)],
            paths=["uploads/report.pdf"],
            metadatas=[{"department": "finance"}],
        )
    return {"uploaded": True, "result": outcome}
Upload Multiple Files:
def main(params, user_data, sdk_client):
    """Upload two local photos to the "images" bucket in one batch call.

    Returns:
        Dict with ``uploaded_count`` taken from the upload response
        (0 when the response has no such key).
    """
    # Open both files in one `with` so they are closed even if the upload
    # raises; bare open() calls would leak the handles.
    with open("photo1.jpg", "rb") as photo1, open("photo2.jpg", "rb") as photo2:
        result = sdk_client.storage.from_("images").upload(
            files=[("photo1.jpg", photo1), ("photo2.jpg", photo2)],
            paths=["gallery/photo1.jpg", "gallery/photo2.jpg"],
            metadatas=[{"album": "vacation"}, {"album": "vacation"}],
        )
    return {"uploaded_count": result.get("uploaded_count", 0)}
Download File:
def main(params, user_data, sdk_client):
    """Download a file from the "documents" bucket and report its size."""
    path = params.get("file_path")
    blob = sdk_client.storage.from_("documents").download(path)
    # download() hands back the object's raw bytes, so len() is its size.
    return {"downloaded": True, "size": len(blob)}
List and Filter Files:
def main(params, user_data, sdk_client):
    """List all "documents" files, plus a filtered page of "uploads" images."""
    # Unfiltered listing of one bucket.
    all_documents = sdk_client.storage.from_("documents").list()

    # Filtered listing: ordering "-created_at", 20 per page.
    # NOTE(review): the documented filter() signature lists `mimetype`, not
    # `mimetype_category` - confirm this keyword is accepted.
    uploads_bucket = sdk_client.storage.from_("uploads")
    image_filter = uploads_bucket.filter(
        mimetype_category="image",
        ordering="-created_at",
        page_size=20,
    )
    recent_images = image_filter.list()

    return {"files": all_documents, "images": recent_images}
Update File Metadata:
def main(params, user_data, sdk_client):
    """Set review metadata on reports/q1.pdf and make it public."""
    documents = sdk_client.storage.from_("documents")
    review_info = {"status": "reviewed", "reviewer": "alice"}
    outcome = documents.update(
        "reports/q1.pdf",
        metadata=review_info,
        visibility="public",
    )
    return {"updated": outcome}
Delete Files:
def main(params, user_data, sdk_client):
    """Delete the given paths from "documents" and report how many were sent.

    ``count`` is the number of paths requested, not a figure from the API.
    """
    targets = params.get("paths", [])
    sdk_client.storage.from_("documents").delete(targets)
    return {"deleted": True, "count": len(targets)}
Copy Object:
def main(params, user_data, sdk_client):
    """Copy one object within a bucket, then another across buckets.

    Only the result of the second (cross-bucket) copy is returned.
    """
    # Same-bucket copy; its result is not part of the return value.
    sdk_client.storage.from_("images").copy_object(
        "users/123/avatar.jpg",
        "users/456/avatar.jpg",
    )

    # Cross-bucket copy into "archives".
    cross_bucket_copy = sdk_client.storage.from_("uploads").copy_object(
        "temp/file.pdf",
        "archive/file.pdf",
        destination_bucket="archives",
    )
    return {"copied": cross_bucket_copy}
Move/Rename Object:
def main(params, user_data, sdk_client):
    """Move temp/draft.pdf to final/report-2024.pdf inside "documents"."""
    source_path = "temp/draft.pdf"
    destination_path = "final/report-2024.pdf"
    outcome = sdk_client.storage.from_("documents").move_object(
        source_path, destination_path
    )
    return {"moved": outcome}
Create Bucket with Quotas:
def main(params, user_data, sdk_client):
    """Create a private "user-uploads" bucket with size and count quotas."""
    settings = {
        "name": "User Uploads",
        "slug": "user-uploads",
        "visibility": "private",
        "file_size_limit": 10485760,  # 10MB per file
        "allowed_mime_types": ["image/*", "application/pdf"],
        "app_category": "attachments",
        "max_size_bytes": 1073741824,  # 1GB total bucket quota
        "max_objects": 1000,  # Max 1000 files
    }
    new_bucket = sdk_client.storage.create_bucket(**settings)
    return {"created": True, "bucket": new_bucket}
Users Module
Manage users and retrieve user information.
Methods
sdk_client.users.get(username) # Get user by username
sdk_client.users.create(data) # Create user (dict with username, email, password, confirm_password, ...)
sdk_client.users.update(username, data) # Update user (dict with fields to update)
sdk_client.users.delete(username) # Delete user
sdk_client.users.list(search=None, is_active=None, is_staff=None, roles=None, page=None, page_size=None, **kwargs) # List users (kwargs for reference attribute filters)
sdk_client.users.apps(username) # Get user's apps
sdk_client.users.assign_roles(roles, usernames, expires_at=None) # Bulk assign roles
sdk_client.users.revoke_roles(roles, usernames) # Bulk revoke roles
Examples
Get User:
def main(params, user_data, sdk_client):
    """Look up a user by username and return the ``data`` of the response."""
    response = sdk_client.users.get(params.get("username"))
    return {"user": response["data"]}
List Users:
def main(params, user_data, sdk_client):
    """Return the first page (10 per page) of active users matching "alice"."""
    criteria = {"search": "alice", "is_active": True, "page": 1, "page_size": 10}
    listing = sdk_client.users.list(**criteria)
    return {"users": listing["data"], "total": listing["total"]}
List Users by Reference Attribute:
def main(params, user_data, sdk_client):
    """List active users filtered by the ``department_id`` parameter."""
    # department_id is passed as an extra keyword filter to users.list().
    listing = sdk_client.users.list(
        department_id=params.get("department_id"),
        is_active=True,
    )
    return {"users": listing["data"], "total": listing["total"]}
Create User:
def main(params, user_data, sdk_client):
    """Create an active user from the supplied account parameters."""
    account = {
        "username": params.get("username"),
        "email": params.get("email"),
        "password": params.get("password"),
        # The confirmation field is filled from the same "password" parameter.
        "confirm_password": params.get("password"),
        "first_name": params.get("first_name"),
        "last_name": params.get("last_name"),
        "is_active": True,
    }
    created = sdk_client.users.create(account)
    return {"created": True, "user": created}
Assign Roles:
def main(params, user_data, sdk_client):
    """Assign the admin and editor roles to alice and bob in one call."""
    role_names = ["admin", "editor"]
    members = ["alice", "bob"]
    outcome = sdk_client.users.assign_roles(roles=role_names, usernames=members)
    return {"message": outcome["message"]}
Get User's Apps:
def main(params, user_data, sdk_client):
    """Return the apps of the user named by the ``username`` parameter."""
    user_apps = sdk_client.users.apps(params.get("username"))
    return {"apps": user_apps}
Update User:
def main(params, user_data, sdk_client):
    """Update a user's first name and set the account active."""
    target = params.get("username")
    changes = {
        "first_name": params.get("first_name"),
        "is_active": True,
    }
    record = sdk_client.users.update(target, changes)
    return {"updated": True, "user": record}
Policy Module
Check authorization permissions and filter allowed resources using Cerbos-based policies.
Methods
sdk_client.policy.check_resources(resources, principal=None, aux_data=None)
sdk_client.policy.filter_allowed(resources, actions, principal=None, aux_data=None)
sdk_client.policy.get_allowed_actions(resource, actions=None, principal=None, aux_data=None)
Resource Kind Format
The kind field in resources follows a specific format based on the entity type:
Format: {entity_type}:{resource_name}
| Entity Type | Format | Example |
|---|---|---|
| DataTable | datatable:{table_name} | datatable:users, datatable:orders |
| Function | function:{function_slug} | function:send-email, function:process-data |
| Storage | storage:{bucket_name} | storage:documents, storage:images |
| Query | query:{query_slug} | query:monthly-revenue, query:user-signups |
| Custom | {custom_type}:{name} | invoice:sales_invoices, project:active_projects |
System Entity Types and Actions
| Entity Type | Available Actions |
|---|---|
| datatable | read, update, create, delete |
| function | execute |
| query | execute |
| storage | read, update, create, delete |
Principal Parameter
⚠️ Important: In APP mode functions, the principal parameter should be set to None. The principal is automatically populated from the API key/JWT token used to authenticate the SDK client.
# ✅ CORRECT - Let the system auto-populate principal from API key
result = sdk_client.policy.check_resources(
resources=[...],
principal=None # Auto-populated from authenticated context
)
# ❌ WRONG - Don't manually specify principal in APP mode
result = sdk_client.policy.check_resources(
resources=[...],
principal={"id": user_data["id"], "roles": ["user"]} # Not needed!
)
Examples
Check Permissions on DataTable:
def main(params, user_data, sdk_client):
    """Check read/update/delete permission on the "users" DataTable."""
    users_table = {
        "kind": "datatable:users",  # Format: entity_type:resource_name
        "id": "users",
        "attr": {"owner_id": user_data["id"]},  # Optional resource attributes
    }
    check = sdk_client.policy.check_resources(
        resources=[{
            "resource": users_table,
            "actions": ["read", "update", "delete"],
        }],
        principal=None,  # Auto-populated from API key
    )
    # One resource was submitted, so its effects are in results[0].
    effects = check["results"][0]["actions"]
    return {
        "can_read": effects["read"] == "EFFECT_ALLOW",
        "can_update": effects["update"] == "EFFECT_ALLOW",
        "can_delete": effects["delete"] == "EFFECT_ALLOW",
    }
Check Function Execution Permission:
def main(params, user_data, sdk_client):
    """Check whether the caller may execute the "send-email" function."""
    request = {
        "resource": {
            "kind": "function:send-email",  # Format: function:{slug}
            "id": "send-email",
        },
        "actions": ["execute"],
    }
    check = sdk_client.policy.check_resources(
        resources=[request],
        principal=None,  # Auto-populated from API key
    )
    effect = check["results"][0]["actions"]["execute"]
    return {"can_execute": effect == "EFFECT_ALLOW"}
Check Storage Permissions:
def main(params, user_data, sdk_client):
    """Check read/create/delete permission on the "documents" bucket."""
    bucket_resource = {
        "kind": "storage:documents",  # Format: storage:{bucket_name}
        "id": "documents",
        "attr": {"visibility": "private"},
    }
    check = sdk_client.policy.check_resources(
        resources=[{
            "resource": bucket_resource,
            "actions": ["read", "create", "delete"],
        }],
        principal=None,  # Auto-populated from API key
    )
    effects = check["results"][0]["actions"]
    allowed = "EFFECT_ALLOW"
    return {
        "can_read": effects["read"] == allowed,
        "can_create": effects["create"] == allowed,
        "can_delete": effects["delete"] == allowed,
    }
Check Custom Resource Type:
def main(params, user_data, sdk_client):
    """Check permissions on a custom (non-system) resource type.

    Custom entity types use the same ``type:name`` kind format as system ones.
    """
    invoice = {
        "kind": "invoice:sales_invoices",  # Custom type: invoice
        "id": "inv_001",
        "attr": {
            "owner_id": user_data["id"],
            "status": "pending",
            "amount": 1500.00,
        },
    }
    check = sdk_client.policy.check_resources(
        resources=[{
            "resource": invoice,
            "actions": ["read", "update", "approve", "delete"],
        }],
        principal=None,  # Auto-populated from API key
    )
    return {"permissions": check["results"][0]["actions"]}
Check Multiple Resources at Once:
def main(params, user_data, sdk_client):
    """Check three resources in a single policy call.

    Results are read back by position, in the order the resources were sent.
    """
    requests = [
        {
            "resource": {"kind": "datatable:users", "id": "users"},
            "actions": ["read", "update"],
        },
        {
            "resource": {"kind": "datatable:orders", "id": "orders"},
            "actions": ["read", "create"],
        },
        {
            "resource": {"kind": "function:process-order", "id": "process-order"},
            "actions": ["execute"],
        },
    ]
    check = sdk_client.policy.check_resources(
        resources=requests,
        principal=None,  # Auto-populated from API key
    )
    outcomes = check["results"]
    return {
        "users_permissions": outcomes[0]["actions"],
        "orders_permissions": outcomes[1]["actions"],
        "function_permissions": outcomes[2]["actions"],
    }
Filter Allowed Resources:
def main(params, user_data, sdk_client):
    """Filter three DataTables by the "read" and "update" actions."""
    candidates = [
        {"kind": f"datatable:{table}", "id": table}
        for table in ("users", "orders", "admin_logs")
    ]
    permitted = sdk_client.policy.filter_allowed(
        resources=candidates,
        actions=["read", "update"],
        principal=None,  # Auto-populated from API key
    )
    return {"allowed_tables": permitted}
Using Auxiliary Data:
def main(params, user_data, sdk_client):
    """Check read access to sensitive data, supplying auxiliary context."""
    # aux_data provides additional context for policy evaluation.
    context = {
        "ip_address": params.get("client_ip", "unknown"),
        "time_of_day": "business_hours",
        "device_type": "desktop",
    }
    request = {
        "resource": {
            "kind": "datatable:sensitive_data",
            "id": "sensitive_data",
            "attr": {"classification": "confidential"},
        },
        "actions": ["read"],
    }
    check = sdk_client.policy.check_resources(
        resources=[request],
        principal=None,  # Auto-populated from API key
        aux_data=context,
    )
    read_effect = check["results"][0]["actions"]["read"]
    return {"can_access": read_effect == "EFFECT_ALLOW"}
Response Format
The check_resources method returns a response in this format:
{
"requestId": "unique-request-id",
"results": [
{
"resource": {
"id": "users",
"kind": "datatable:users",
"policyVersion": "default",
"scope": "tenant_id_app_slug"
},
"actions": {
"read": "EFFECT_ALLOW",
"update": "EFFECT_ALLOW",
"delete": "EFFECT_DENY"
},
"validationErrors": [],
"meta": {
"actions": {...},
"effectiveDerivedRoles": [...]
}
}
],
"cerbosCallId": "..."
}
Notes
- Scope Injection: The system automatically injects the scope ({tenant_id}_{app_slug}) into all resource checks for multi-tenant isolation
- Policy Version: Defaults to "default" unless specified otherwise
- Resource Attributes: Use attr to pass resource-specific attributes that policies can evaluate (e.g., owner_id, status, department)
- Effect Values: Actions return either "EFFECT_ALLOW" or "EFFECT_DENY"
Auth Module
ℹ️ Usually not needed in APP mode functions
The sdk_client is already authenticated with the current user context.
Methods
sdk_client.auth.signInWithPassword(username, password)
sdk_client.auth.signInWithToken(token, token_type) # token_type: 'jwt', 'api_key', 'session_token'
sdk_client.auth.refreshToken(refresh_token)
sdk_client.auth.signOut()
sdk_client.is_authenticated # Property on the client, not auth module
Typical Usage (Recommended)
In most cases, you don't need auth methods because sdk_client is already authenticated:
def main(params, user_data, sdk_client):
    """Query "users" directly with the already-authenticated client."""
    # No auth call is needed: sdk_client already acts as the calling user.
    response = sdk_client.database.from_("users").execute()
    return response
Advanced Usage (If Needed)
If you need to authenticate as a different user or service account:
def main(params, user_data, sdk_client):
    """Read "admin_logs" through a client signed in as a service account."""
    # The service account's API key is read from secrets.
    key_secret = sdk_client.secrets.get("SERVICE_ACCOUNT_KEY")

    # signInWithToken's return value is used as a separate client from here on.
    privileged = sdk_client.auth.signInWithToken(
        token=key_secret["value"],
        token_type='api_key',
    )

    logs = privileged.database.from_("admin_logs").execute()
    return {"admin_data": logs}
External SDK Usage
Auth methods are primarily for external SDK usage (scripts, CLIs, standalone apps):
# External Python script (NOT an APP mode function)
import os

from taruvi import Client

client = Client(api_url='https://api.taruvi.cloud', app_slug='my-app')

# Authenticate (returns NEW client instance). Credentials are read from the
# environment so they are never hardcoded in the script; the variable names
# below are placeholders - use whatever your deployment defines.
auth_client = client.auth.signInWithPassword(
    username=os.environ["TARUVI_USERNAME"],
    password=os.environ["TARUVI_PASSWORD"],
)

# Use authenticated client
result = auth_client.database.from_("users").execute()
users = result["data"]  # execute() returns {"data": [...], "total": N}
Complete Examples
Email Notification Function
def main(params, user_data, sdk_client):
    """Send a notification email to the user named by ``user_id``.

    Looks up the recipient record, reads the SendGrid secret, and delegates
    delivery to the "send-email" function.
    """
    target_id = params.get("user_id")
    body_text = params.get("message")

    # Recipient record supplies the destination address.
    recipient = sdk_client.database.get("users", target_id)

    # Credential comes from the secrets module, not from code.
    sendgrid = sdk_client.secrets.get("SENDGRID_API_KEY")

    # NOTE(review): the secret value is forwarded as a function parameter -
    # confirm invocation params are not logged or persisted.
    email_params = {
        "to": recipient["email"],
        "subject": "Notification",
        "body": body_text,
        "api_key": sendgrid["value"],
    }
    outcome = sdk_client.functions.execute("send-email", params=email_params)

    return {
        "sent": True,
        "recipient": recipient["email"],
        "message_id": outcome.get("message_id"),
    }
Data Processing Pipeline
def main(params, user_data, sdk_client):
    """Queue asynchronous processing of a dataset the caller may read.

    Args:
        params: Must contain ``dataset_id``.
        user_data: Authenticated user info (unused here; the policy
            principal is auto-populated).
        sdk_client: Pre-authenticated Taruvi SDK client.

    Returns:
        ``{"error": ...}`` when ``dataset_id`` is missing or read permission
        is denied; otherwise a dict with ``processing``, the ``task_id`` of
        the queued invocation, and the dataset's ``name``.
    """
    # Validate input before touching the database.
    dataset_id = params.get("dataset_id")
    if dataset_id is None:
        return {"error": "dataset_id is required"}

    # Fetch dataset (its owner_id feeds the policy check below)
    dataset = sdk_client.database.get("datasets", dataset_id)

    # Check permissions - principal auto-populated from API key
    can_process = sdk_client.policy.check_resources(
        resources=[{
            "resource": {
                "kind": "datatable:datasets",  # Format: entity_type:resource_name
                "id": str(dataset_id),
                "attr": {"owner_id": dataset.get("owner_id")},
            },
            "actions": ["read", "update"],  # Use valid datatable actions
        }],
        principal=None,  # Auto-populated from API key
    )
    if can_process["results"][0]["actions"]["read"] != "EFFECT_ALLOW":
        return {"error": "Permission denied"}

    # Execute processing function asynchronously
    result = sdk_client.functions.execute(
        "process-dataset",
        params={"dataset_id": dataset_id},
        is_async=True,
    )

    # Read the id of the queued task so the caller can poll for its result.
    task_id = result['invocation']['celery_task_id']
    return {
        "processing": True,
        "task_id": task_id,
        "dataset": dataset["name"],
    }
Report Generation Function
def main(params, user_data, sdk_client):
    """Build a revenue/signup report for a date range and upload it.

    Args:
        params: Expects ``start_date`` and ``end_date``.
        user_data: Authenticated user info (unused here).
        sdk_client: Pre-authenticated Taruvi SDK client.

    Returns:
        Dict with ``report_generated``, ``total_revenue`` (sum of the
        ``revenue`` field over the query rows) and ``new_signups`` (the
        signup query's ``total``).
    """
    import io

    # Get date range from params
    start_date = params.get("start_date")
    end_date = params.get("end_date")

    # Execute analytics query
    revenue_data = sdk_client.analytics.execute(
        "monthly-revenue",
        params={
            "start_date": start_date,
            "end_date": end_date,
            "group_by": "month",
        },
    )

    # Get user signups
    signup_data = sdk_client.analytics.execute(
        "user-signups",
        params={
            "start_date": start_date,
            "end_date": end_date,
        },
    )

    # Compute the totals once; they feed both the report and the response.
    total_revenue = sum(r['revenue'] for r in revenue_data['data'])
    new_signups = signup_data['total']

    # Generate report content
    report_content = (
        "\n"
        "Revenue Report\n"
        f"Period: {start_date} to {end_date}\n"
        f"Total Revenue: ${total_revenue}\n"
        f"New Signups: {new_signups}\n"
    )

    # Upload from an in-memory buffer: upload() takes binary file objects, so
    # no shared /tmp file is needed (avoids clashes between concurrent runs).
    with io.BytesIO(report_content.encode("utf-8")) as report_file:
        sdk_client.storage.from_("reports").upload(
            files=[("report.txt", report_file)],
            paths=[f"reports/{start_date}_to_{end_date}.txt"],
        )

    return {
        "report_generated": True,
        "total_revenue": total_revenue,
        "new_signups": new_signups,
    }
Critical Rules
❌ Don't Do This
- Don't create new Client() - Use provided sdk_client
- Don't call auth methods - sdk_client is already authenticated
- Don't hardcode secrets - Always use secrets.get()
- Don't skip error handling - User input can be anything
- Don't assume records exist - Always check before operations
✅ Do This
- Validate all params - Check for required fields
- Handle exceptions gracefully - Use try/except blocks
- Return meaningful data - Include success flags and error messages
- Use secrets for credentials - Never hardcode API keys
- Check permissions - Use policy module when needed
- Log important events - For debugging and audit trails
Error Handling Pattern
def main(params, user_data, sdk_client):
    """Template showing validation plus a catch-all error response.

    Every failure is reported as a dict with ``success`` False rather than
    raised, so the caller always receives a JSON-serializable result.
    """
    try:
        # Reject the request early when the required id is absent.
        user_id = params.get("user_id")
        if not user_id:
            return {"success": False, "error": "user_id is required"}

        record = sdk_client.database.get("users", user_id)
        secret = sdk_client.secrets.get("API_KEY")

        # perform_operation is not defined in this snippet; supply your own.
        outcome = perform_operation(record, secret["value"])
        return {"success": True, "result": outcome}
    except Exception as exc:
        # Function boundary: turn any failure into a structured response.
        failure = {
            "success": False,
            "error": str(exc),
            "error_type": type(exc).__name__,
        }
        return failure
Related Documentation
- Full SDK Docs: SDK Overview
- API Reference: REST API Documentation
- Authentication: Auth Guide
- Data Service: Data Service API
- Storage: Storage API
- Functions: Functions API