Serverless Deployment
Deploy Tork governance in serverless environments. Run AI governance on AWS Lambda, Google Cloud Functions, Azure Functions, and Vercel Edge with optimized cold start performance.
Platform Overview
Supported serverless platforms
Tork is optimized for serverless environments with minimal dependencies, fast initialization, and efficient memory usage. The governance engine loads in under 100ms on warm starts.
AWS Lambda
Deploy Tork governance as Lambda functions
Deploy Tork as an AWS Lambda function for on-demand AI governance. Use with API Gateway for HTTP endpoints or invoke directly from other AWS services.
"""
AWS Lambda handler for Tork AI Governance.
Provides serverless governance evaluation for AI requests.
"""
import json
import os
from typing import Any
from tork import TorkClient
from tork.core.engine import GovernanceEngine
from tork.detectors.pii import PIIDetector
# Initialize outside handler for warm start reuse
# These persist across invocations in the same container
tork_client = None
governance_engine = None
def initialize():
    """Create the shared Tork singletons on first call.

    Runs once per Lambda container; warm invocations find the
    module-level globals already populated and skip the setup cost.
    """
    global tork_client, governance_engine

    if governance_engine is None:
        governance_engine = GovernanceEngine(
            policies_path=os.environ.get("TORK_POLICIES_PATH", "/opt/policies"),
        )

    if tork_client is None:
        tork_client = TorkClient(
            api_key=os.environ.get("TORK_API_KEY"),
            # Caching and metrics add startup overhead with no benefit
            # in short-lived Lambda containers.
            enable_caching=False,
            enable_metrics=False,
        )

    # Exercise the PII detector once so its setup cost is paid during
    # initialization rather than on the first real request.
    PIIDetector().detect("warmup")
def lambda_handler(event: dict, context: Any) -> dict:
    """
    Main Lambda handler for governance evaluation.

    Supports two invocation modes: API Gateway proxy events (detected by
    the presence of a "body" key) and direct invocations, where the event
    itself is the request. Supported actions: "evaluate", "redact",
    "health".

    Args:
        event: Lambda event (API Gateway or direct invocation)
        context: Lambda context object
    Returns:
        API Gateway response or direct result
    """
    # Idempotent: only builds clients on the first (cold) invocation.
    initialize()

    # Parse request body
    if "body" in event:
        # API Gateway invocation: body may arrive as a JSON string or,
        # for some integrations, an already-parsed dict.
        try:
            body = json.loads(event["body"]) if isinstance(event["body"], str) else event["body"]
        except json.JSONDecodeError:
            return {
                "statusCode": 400,
                "headers": {"Content-Type": "application/json"},
                "body": json.dumps({"error": "Invalid JSON body"}),
            }
    else:
        # Direct invocation
        body = event

    # Extract governance request
    action = body.get("action", "evaluate")
    payload = body.get("payload", {})
    agent_id = body.get("agent_id", "lambda-default")

    try:
        if action == "evaluate":
            # Evaluate payload against governance policies
            result = governance_engine.evaluate(
                payload=payload,
                agent_id=agent_id,
                context={
                    "lambda_function": context.function_name,
                    "request_id": context.aws_request_id,
                },
            )
            response_body = {
                "decision": result.decision,
                "violations": result.violations,
                "modified_payload": result.modified_payload,
                "pii_found": [
                    {"type": p.type, "redacted": p.redacted}
                    for p in result.pii_matches
                ],
                "request_id": context.aws_request_id,
            }
        elif action == "redact":
            # PII redaction only.
            # NOTE(review): a fresh PIIDetector is built per request here,
            # bypassing the warm-start reuse — consider a module-level
            # instance; confirm the detector is stateless first.
            text = body.get("text", "")
            detector = PIIDetector()
            matches = detector.detect(text)
            redacted = detector.redact(text, matches)
            response_body = {
                "original": text,
                "redacted": redacted,
                "matches": [
                    {"type": m.type, "start": m.start, "end": m.end}
                    for m in matches
                ],
            }
        elif action == "health":
            response_body = {
                "status": "healthy",
                "function": context.function_name,
                "memory_mb": context.memory_limit_in_mb,
                "remaining_time_ms": context.get_remaining_time_in_millis(),
            }
        else:
            return {
                "statusCode": 400,
                "headers": {"Content-Type": "application/json"},
                "body": json.dumps({"error": f"Unknown action: {action}"}),
            }

        # Return the response in the shape the caller expects.
        if "body" in event:
            # API Gateway format
            return {
                "statusCode": 200,
                "headers": {
                    "Content-Type": "application/json",
                    "X-Request-Id": context.aws_request_id,
                },
                "body": json.dumps(response_body),
            }
        else:
            # Direct invocation
            return response_body
    except Exception as e:
        error_response = {
            "error": str(e),
            "error_type": type(e).__name__,
            "request_id": context.aws_request_id,
        }
        if "body" in event:
            return {
                "statusCode": 500,
                "headers": {"Content-Type": "application/json"},
                "body": json.dumps(error_response),
            }
        else:
            # Direct invocation: re-raise so Lambda records the failure.
            raise

# SAM Template
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Tork AI Governance Lambda Function

# Defaults applied to every function in this template.
Globals:
  Function:
    Timeout: 30
    MemorySize: 512
    Runtime: python3.11
    Architectures:
      - arm64 # Graviton2 for better price/performance

Parameters:
  TorkApiKey:
    Type: String
    NoEcho: true  # keep the key out of console/CLI describe output
    Description: Tork API Key

Resources:
  TorkGovernanceFunction:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: tork-governance
      CodeUri: src/
      Handler: lambda_handler.lambda_handler
      Description: AI governance evaluation function
      Environment:
        Variables:
          TORK_API_KEY: !Ref TorkApiKey
          # Layers are mounted under /opt, so policies packaged in the
          # layer below are readable at this path.
          TORK_POLICIES_PATH: /opt/policies
      Layers:
        - !Ref TorkLayer
      Events:
        ApiGateway:
          Type: Api
          Properties:
            Path: /governance/{proxy+}
            Method: ANY
            RestApiId: !Ref TorkApi
      Policies:
        - Version: '2012-10-17'
          Statement:
            - Effect: Allow
              Action:
                - logs:CreateLogGroup
                - logs:CreateLogStream
                - logs:PutLogEvents
              Resource: '*'

  # Shared dependency layer so multiple functions reuse one Tork install.
  TorkLayer:
    Type: AWS::Serverless::LayerVersion
    Properties:
      LayerName: tork-dependencies
      Description: Tork governance dependencies
      ContentUri: layer/
      CompatibleRuntimes:
        - python3.11
      CompatibleArchitectures:
        - arm64
    Metadata:
      BuildMethod: python3.11

  TorkApi:
    Type: AWS::Serverless::Api
    Properties:
      Name: tork-governance-api
      StageName: prod
      Cors:
        AllowOrigin: "'*'"
        AllowMethods: "'POST,GET,OPTIONS'"
        AllowHeaders: "'Content-Type,Authorization'"

Outputs:
  ApiEndpoint:
    Description: API Gateway endpoint URL
    Value: !Sub "https://${TorkApi}.execute-api.${AWS::Region}.amazonaws.com/prod/"
  FunctionArn:
    Description: Lambda function ARN
    Value: !GetAtt TorkGovernanceFunction.Arn

# Deploy with SAM
# Build the SAM application and deploy it (prompts for stack settings).
sam build
sam deploy --guided

# Test the function with a direct invocation (bypasses API Gateway).
aws lambda invoke \
  --function-name tork-governance \
  --payload '{"action": "evaluate", "payload": {"message": "My SSN is 123-45-6789"}}' \
  response.json
cat response.json

# Lambda Layer Creation
Package Tork as a reusable Lambda layer
Create a Lambda Layer containing Tork and its dependencies for reuse across multiple functions. This reduces deployment size and improves cold start times.
#!/bin/bash
# Build Tork Lambda Layer for ARM64 (Graviton2)
set -e

LAYER_DIR="layer"
PYTHON_VERSION="3.11"

echo "Building Tork Lambda Layer..."

# Clean previous build
rm -rf $LAYER_DIR
mkdir -p $LAYER_DIR/python

# Install dependencies into layer.
# --platform/--implementation/--only-binary force pip to fetch prebuilt
# manylinux aarch64 wheels so the layer matches the Lambda arm64 runtime
# even when this script runs on an x86 build machine.
pip install \
  --platform manylinux2014_aarch64 \
  --implementation cp \
  --python-version $PYTHON_VERSION \
  --only-binary=:all: \
  --target $LAYER_DIR/python \
  tork

# Add governance policies (served from /opt/policies at runtime)
mkdir -p $LAYER_DIR/policies
cp policies/*.yaml $LAYER_DIR/policies/

# Create layer zip
cd $LAYER_DIR
zip -r ../tork-layer.zip .
cd ..

echo "Layer built: tork-layer.zip"
echo "Size: $(du -h tork-layer.zip | cut -f1)"

# Publish layer
aws lambda publish-layer-version \
  --layer-name tork-governance \
  --description "Tork AI Governance Library" \
  --zip-file fileb://tork-layer.zip \
  --compatible-runtimes python3.11 \
  --compatible-architectures arm64

# Optimized requirements.txt
# Minimal dependencies for Lambda
tork>=1.0.0
# Exclude optional heavy dependencies
# --no-deps for tork, then add only what's needed:
pydantic>=2.0.0
regex>=2023.0.0
# Don't include: torch, transformers (use API-based detection instead)

Cold Start Optimization
Minimize serverless startup latency
Cold starts can add 500ms-3s to Lambda invocations. Use these strategies to minimize startup time.
"""
Cold-start optimized Lambda handler.
"""
import os
# Module-level initialization (persists across warm starts)
_tork_client = None
_pii_detector = None
def get_tork_client():
    """Return the module-wide Tork client, creating it on first use.

    The SDK import is deferred so cold starts that never evaluate a
    payload skip the cost of loading it entirely.
    """
    global _tork_client
    if _tork_client is not None:
        return _tork_client

    from tork import TorkClient

    _tork_client = TorkClient(
        api_key=os.environ.get("TORK_API_KEY"),
        # Lightweight mode skips features unsuited to serverless.
        mode="lightweight",
    )
    return _tork_client
def get_pii_detector():
    """Return the shared PII detector, creating it lazily on first use."""
    global _pii_detector
    if _pii_detector is not None:
        return _pii_detector

    from tork.detectors.pii import PIIDetector

    # Regex-based detection avoids loading ML models, which dominates
    # cold-start time.
    _pii_detector = PIIDetector(
        use_ml=False,
    )
    return _pii_detector
def lambda_handler(event, context):
"""Handler with lazy loading."""
action = event.get("action", "evaluate")
if action == "evaluate":
client = get_tork_client()
return client.evaluate(event.get("payload", {}))
elif action == "redact":
detector = get_pii_detector()
text = event.get("text", "")
matches = detector.detect(text)
return {"redacted": detector.redact(text, matches)}
return {"error": "Unknown action"}Enable Provisioned Concurrency
# In SAM template
TorkGovernanceFunction:
Type: AWS::Serverless::Function
Properties:
# ... other properties
ProvisionedConcurrencyConfig:
ProvisionedConcurrentExecutions: 5
AutoPublishAlias: live
DeploymentPreference:
Type: AllAtOnce

Google Cloud Functions
Deploy on Google Cloud Platform
Deploy Tork as a Google Cloud Function (Gen 2) with HTTP triggers and Pub/Sub integration.
"""
Google Cloud Function for Tork AI Governance.
"""
import functions_framework
import json
import os
from flask import jsonify, Request
from tork import TorkClient
from tork.core.engine import GovernanceEngine
# Initialize globally for instance reuse
tork = TorkClient(api_key=os.environ.get("TORK_API_KEY"))
engine = GovernanceEngine()
@functions_framework.http
def governance_handler(request: Request):
    """
    HTTP Cloud Function for governance evaluation.

    Supported actions (from the JSON body): "evaluate", "redact",
    "health". Every response carries a permissive CORS origin header.

    Args:
        request: Flask request object
    Returns:
        JSON response with governance result
    """
    # Answer CORS preflight before any body parsing.
    if request.method == "OPTIONS":
        headers = {
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Allow-Methods": "POST, GET, OPTIONS",
            "Access-Control-Allow-Headers": "Content-Type, Authorization",
            "Access-Control-Max-Age": "3600",
        }
        return ("", 204, headers)

    headers = {"Access-Control-Allow-Origin": "*"}

    try:
        # silent=True returns None instead of raising on bad/missing JSON.
        request_json = request.get_json(silent=True)
        if not request_json:
            return jsonify({"error": "No JSON body provided"}), 400, headers

        action = request_json.get("action", "evaluate")
        payload = request_json.get("payload", {})

        if action == "evaluate":
            result = engine.evaluate(
                payload=payload,
                agent_id=request_json.get("agent_id", "gcf-default"),
            )
            return jsonify({
                "decision": result.decision,
                "violations": result.violations,
                "modified_payload": result.modified_payload,
                "pii_found": len(result.pii_matches),
            }), 200, headers
        elif action == "redact":
            text = request_json.get("text", "")
            result = tork.redact(text)
            return jsonify(result), 200, headers
        elif action == "health":
            return jsonify({"status": "healthy"}), 200, headers
        else:
            return jsonify({"error": f"Unknown action: {action}"}), 400, headers
    except Exception as e:
        # Catch-all so callers always receive JSON rather than an HTML 500.
        return jsonify({
            "error": str(e),
            "error_type": type(e).__name__,
        }), 500, headers
@functions_framework.cloud_event
def pubsub_governance(cloud_event):
    """
    Pub/Sub triggered governance for async processing.

    The message data is a base64-encoded JSON payload; the governance
    decision is emitted to stdout (captured by Cloud Logging).
    """
    import base64

    raw = cloud_event.data["message"]["data"]
    payload = json.loads(base64.b64decode(raw).decode())

    result = engine.evaluate(
        payload=payload,
        agent_id=payload.get("agent_id", "pubsub-default"),
    )

    summary = {
        "event_id": cloud_event["id"],
        "decision": result.decision,
        "violations": result.violations,
    }
    print(json.dumps(summary))

# Deploy HTTP function
gcloud functions deploy tork-governance \
--gen2 \
--runtime=python311 \
--region=us-central1 \
--source=. \
--entry-point=governance_handler \
--trigger-http \
--allow-unauthenticated \
--memory=512MB \
--timeout=60s \
--min-instances=1 \
--max-instances=100 \
--set-env-vars="TORK_API_KEY=$TORK_API_KEY"
# Deploy Pub/Sub function
gcloud functions deploy tork-governance-pubsub \
--gen2 \
--runtime=python311 \
--region=us-central1 \
--source=. \
--entry-point=pubsub_governance \
--trigger-topic=ai-governance-requests \
--memory=512MB \
--set-env-vars="TORK_API_KEY=$TORK_API_KEY"Azure Functions
Deploy on Microsoft Azure
Deploy Tork as an Azure Function with the Python v2 programming model.
"""
Azure Functions app for Tork AI Governance.
"""
import azure.functions as func
import json
import os
import logging
from tork import TorkClient
from tork.core.engine import GovernanceEngine
app = func.FunctionApp(http_auth_level=func.AuthLevel.FUNCTION)
# Initialize Tork
tork = TorkClient(api_key=os.environ.get("TORK_API_KEY"))
engine = GovernanceEngine()
@app.route(route="governance", methods=["POST"])
def governance_http(req: func.HttpRequest) -> func.HttpResponse:
"""
HTTP trigger for governance evaluation.
"""
logging.info("Governance evaluation request received")
try:
req_body = req.get_json()
except ValueError:
return func.HttpResponse(
json.dumps({"error": "Invalid JSON body"}),
status_code=400,
mimetype="application/json",
)
action = req_body.get("action", "evaluate")
payload = req_body.get("payload", {})
try:
if action == "evaluate":
result = engine.evaluate(
payload=payload,
agent_id=req_body.get("agent_id", "azure-default"),
)
return func.HttpResponse(
json.dumps({
"decision": result.decision,
"violations": result.violations,
"modified_payload": result.modified_payload,
}),
status_code=200,
mimetype="application/json",
)
elif action == "redact":
text = req_body.get("text", "")
result = tork.redact(text)
return func.HttpResponse(
json.dumps(result),
status_code=200,
mimetype="application/json",
)
else:
return func.HttpResponse(
json.dumps({"error": f"Unknown action: {action}"}),
status_code=400,
mimetype="application/json",
)
except Exception as e:
logging.error(f"Governance error: {e}")
return func.HttpResponse(
json.dumps({"error": str(e)}),
status_code=500,
mimetype="application/json",
)
@app.blob_trigger(
    arg_name="blob",
    path="ai-requests/{name}",
    connection="AzureWebJobsStorage"
)
def governance_blob(blob: func.InputStream):
    """
    Blob trigger for batch governance processing.

    Evaluates every AI request in a JSON file uploaded to blob storage.
    """
    logging.info(f"Processing blob: {blob.name}")

    batch = json.loads(blob.read().decode("utf-8"))

    results = []
    for item in batch:
        verdict = engine.evaluate(
            payload=item.get("payload", {}),
            agent_id=item.get("agent_id", "blob-batch"),
        )
        results.append({
            "id": item.get("id"),
            "decision": verdict.decision,
            "violations": verdict.violations,
        })

    logging.info(f"Processed {len(results)} requests")
    # Results can be written to another blob or queue
@app.queue_trigger(
arg_name="msg",
queue_name="governance-queue",
connection="AzureWebJobsStorage"
)
def governance_queue(msg: func.QueueMessage):
"""
Queue trigger for async governance processing.
"""
payload = json.loads(msg.get_body().decode("utf-8"))
result = engine.evaluate(
payload=payload.get("payload", {}),
agent_id=payload.get("agent_id", "queue-default"),
)
logging.info(f"Queue message processed: decision={result.decision}"){
"version": "2.0",
"logging": {
"applicationInsights": {
"samplingSettings": {
"isEnabled": true,
"excludedTypes": "Request"
}
}
},
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[4.*, 5.0.0)"
},
"functionTimeout": "00:05:00"
}

# Deploy to Azure
func azure functionapp publish tork-governance-app \
--python
# Set environment variables
az functionapp config appsettings set \
--name tork-governance-app \
--resource-group tork-rg \
--settings TORK_API_KEY=$TORK_API_KEY

Vercel Edge Functions
Deploy at the edge for lowest latency
Deploy Tork governance at the edge using Vercel Edge Functions. Ideal for global applications requiring low-latency governance checks.
/**
* Vercel Edge Function for Tork AI Governance
*
* Runs at the edge for low-latency governance evaluation.
*/
import { NextRequest, NextResponse } from 'next/server';
// Edge runtime configuration
export const runtime = 'edge';
export const preferredRegion = ['iad1', 'sfo1', 'fra1']; // Multi-region
// Tork governance client (edge-compatible)
const TORK_API_URL = process.env.TORK_API_URL || 'https://api.tork.ai';
const TORK_API_KEY = process.env.TORK_API_KEY;
interface GovernanceRequest {
action: 'evaluate' | 'redact' | 'health';
payload?: Record<string, unknown>;
text?: string;
agent_id?: string;
}
interface GovernanceResult {
decision: 'allow' | 'block' | 'modify';
violations: string[];
modified_payload?: Record<string, unknown>;
pii_found?: Array<{ type: string; redacted: boolean }>;
}
async function evaluateGovernance(
  payload: Record<string, unknown>,
  agentId: string
): Promise<GovernanceResult> {
  // Edge runtimes cannot run the Python SDK, so governance is delegated
  // to the hosted Tork evaluate endpoint over HTTPS.
  const res = await fetch(`${TORK_API_URL}/v1/evaluate`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${TORK_API_KEY}`,
    },
    body: JSON.stringify({ payload, agent_id: agentId }),
  });

  if (!res.ok) {
    throw new Error(`Tork API error: ${res.status}`);
  }
  return res.json();
}
async function redactPII(text: string): Promise<{ redacted: string; matches: unknown[] }> {
  // Redaction is delegated to the Tork API; no detection logic lives
  // at the edge.
  const init = {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${TORK_API_KEY}`,
    },
    body: JSON.stringify({ text }),
  };
  const res = await fetch(`${TORK_API_URL}/v1/redact`, init);

  if (!res.ok) {
    throw new Error(`Tork API error: ${res.status}`);
  }
  return res.json();
}
/**
 * POST handler — governance entry point for the edge function.
 *
 * Body: { action, payload?, text?, agent_id? }. "evaluate" and "redact"
 * proxy to the Tork HTTP API; "health" is answered locally at the edge.
 * Every response includes latency/region metadata under `_meta`.
 */
export async function POST(request: NextRequest) {
  const startTime = Date.now();
  try {
    const body: GovernanceRequest = await request.json();
    const { action = 'evaluate', payload, text, agent_id = 'vercel-edge' } = body;
    let result: unknown;
    switch (action) {
      case 'evaluate':
        if (!payload) {
          return NextResponse.json(
            { error: 'Missing payload for evaluate action' },
            { status: 400 }
          );
        }
        result = await evaluateGovernance(payload, agent_id);
        break;
      case 'redact':
        if (!text) {
          return NextResponse.json(
            { error: 'Missing text for redact action' },
            { status: 400 }
          );
        }
        result = await redactPII(text);
        break;
      case 'health':
        // No upstream call — latency here reflects edge processing only.
        result = {
          status: 'healthy',
          region: process.env.VERCEL_REGION || 'unknown',
          latency_ms: Date.now() - startTime,
        };
        break;
      default:
        return NextResponse.json(
          { error: `Unknown action: ${action}` },
          { status: 400 }
        );
    }
    return NextResponse.json({
      ...result as object,
      _meta: {
        latency_ms: Date.now() - startTime,
        region: process.env.VERCEL_REGION,
      },
    });
  } catch (error) {
    console.error('Governance error:', error);
    return NextResponse.json(
      {
        error: error instanceof Error ? error.message : 'Unknown error',
        _meta: {
          latency_ms: Date.now() - startTime,
          region: process.env.VERCEL_REGION,
        },
      },
      { status: 500 }
    );
  }
}
export async function GET() {
return NextResponse.json({
status: 'healthy',
service: 'tork-governance-edge',
region: process.env.VERCEL_REGION,
});
}Middleware for Automatic Governance
/**
* Next.js Middleware for automatic AI API governance.
*
* Intercepts requests to /api/ai/* and applies governance
* before forwarding to the AI provider.
*/
import { NextRequest, NextResponse } from 'next/server';
export const config = {
matcher: '/api/ai/:path*',
};
const TORK_API_URL = process.env.TORK_API_URL || 'https://api.tork.ai';
const TORK_API_KEY = process.env.TORK_API_KEY;
/**
 * Middleware entry point: governs every matched POST under /api/ai/*.
 *
 * Design choice: fails OPEN — if the governance API is unreachable or
 * errors, the request proceeds ungoverned rather than breaking the app.
 */
export async function middleware(request: NextRequest) {
  // Only process POST requests with JSON body
  if (request.method !== 'POST') {
    return NextResponse.next();
  }
  try {
    const body = await request.json();
    // Evaluate against governance policies
    const governanceResponse = await fetch(`${TORK_API_URL}/v1/evaluate`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': `Bearer ${TORK_API_KEY}`,
      },
      body: JSON.stringify({
        payload: body,
        agent_id: 'vercel-middleware',
        context: {
          path: request.nextUrl.pathname,
          method: request.method,
        },
      }),
    });
    if (!governanceResponse.ok) {
      // Fail open on upstream errors.
      console.error('Governance API error:', governanceResponse.status);
      return NextResponse.next();
    }
    const result = await governanceResponse.json();
    // Block if governance denies
    if (result.decision === 'block') {
      return NextResponse.json(
        {
          error: 'Request blocked by governance policy',
          violations: result.violations,
        },
        { status: 403 }
      );
    }
    // Forward with modified payload if needed.
    // NOTE(review): NextResponse.next({ request }) is documented to
    // override request *headers* only — confirm the modified body is
    // actually forwarded on the deployed Next.js version.
    if (result.decision === 'modify' && result.modified_payload) {
      const modifiedRequest = new NextRequest(request.url, {
        method: request.method,
        headers: request.headers,
        body: JSON.stringify(result.modified_payload),
      });
      return NextResponse.next({ request: modifiedRequest });
    }
    return NextResponse.next();
  } catch (error) {
    console.error('Middleware error:', error);
    // Fail open - allow request to proceed
    return NextResponse.next();
  }
}

// Event-Driven Governance
Trigger governance from AWS events
Use event-driven patterns to govern AI data flows. Trigger governance checks from S3 uploads, DynamoDB streams, and other AWS events.
S3 Trigger for Document Governance
"""
S3-triggered Lambda for document governance.
Scans uploaded documents for sensitive content before
they're processed by AI systems.
"""
import json
import boto3
import urllib.parse
from tork import TorkClient
from tork.scanner import ContentScanner
s3 = boto3.client("s3")
tork = TorkClient()
scanner = ContentScanner()
def lambda_handler(event, context):
    """
    Process S3 upload events for governance.

    Triggered when files are uploaded to the AI input bucket. Scans
    content and routes each object to `<bucket>-approved`,
    `<bucket>-processed` (redacted copy), or `<bucket>-quarantine`
    based on the governance decision, then deletes the original.
    """
    results = []

    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        # S3 event keys are URL-encoded (e.g. spaces arrive as '+').
        key = urllib.parse.unquote_plus(record["s3"]["object"]["key"])

        print(f"Processing: s3://{bucket}/{key}")

        # Download file content
        response = s3.get_object(Bucket=bucket, Key=key)
        content = response["Body"].read()

        # Determine content type
        content_type = response.get("ContentType", "application/octet-stream")

        # Scan for sensitive content
        if content_type.startswith("text/") or key.endswith((".json", ".txt", ".md")):
            # NOTE(review): assumes text objects are UTF-8; a mis-encoded
            # file raises here and fails the whole batch — confirm.
            text_content = content.decode("utf-8")
            scan_result = scanner.scan(text_content)
            governance_result = tork.evaluate({
                "content": text_content,
                "filename": key,
                "content_type": content_type,
            })
        else:
            # Binary file - check metadata only
            governance_result = tork.evaluate({
                "filename": key,
                "content_type": content_type,
                "size_bytes": len(content),
            })
            scan_result = {"findings": []}

        # Determine action based on governance
        if governance_result.decision == "block":
            # Move to quarantine bucket
            destination_bucket = f"{bucket}-quarantine"
            action = "quarantined"
        elif governance_result.decision == "modify":
            # Redact and save modified version
            destination_bucket = f"{bucket}-processed"
            if governance_result.modified_payload:
                content = json.dumps(governance_result.modified_payload).encode()
            action = "modified"
        else:
            # Move to approved bucket
            destination_bucket = f"{bucket}-approved"
            action = "approved"

        # Copy to destination, carrying the decision in object metadata.
        s3.put_object(
            Bucket=destination_bucket,
            Key=key,
            Body=content,
            Metadata={
                "governance-decision": governance_result.decision,
                "governance-violations": ",".join(governance_result.violations),
            },
        )

        # Delete from source
        s3.delete_object(Bucket=bucket, Key=key)

        results.append({
            "key": key,
            "action": action,
            "decision": governance_result.decision,
            "violations": governance_result.violations,
            "findings": len(scan_result.get("findings", [])),
        })

        print(f"Processed {key}: {action}")

    return {
        "processed": len(results),
        "results": results,
    }

# API Gateway Integration
# SAM template for API Gateway with Lambda authorizer
Resources:
GovernanceApi:
Type: AWS::Serverless::Api
Properties:
StageName: prod
Auth:
DefaultAuthorizer: TorkAuthorizer
Authorizers:
TorkAuthorizer:
FunctionArn: !GetAtt AuthorizerFunction.Arn
AuthorizerFunction:
Type: AWS::Serverless::Function
Properties:
Handler: authorizer.lambda_handler
Runtime: python3.11
CodeUri: src/
Environment:
Variables:
TORK_API_KEY: !Ref TorkApiKey
# Authorizer that checks governance before allowing request
# authorizer.py content:
#
# def lambda_handler(event, context):
# token = event.get("authorizationToken", "")
# method_arn = event["methodArn"]
#
# # Validate token and check governance
# if is_valid_and_governed(token):
# return generate_policy("user", "Allow", method_arn)
# else:
# return generate_policy("user", "Deny", method_arn)
AIProxyFunction:
Type: AWS::Serverless::Function
Properties:
Handler: proxy.lambda_handler
Runtime: python3.11
CodeUri: src/
Timeout: 120
MemorySize: 1024
Events:
ProxyApi:
Type: Api
Properties:
RestApiId: !Ref GovernanceApi
Path: /ai/{proxy+}
Method: ANY

DynamoDB Streams for Audit Logging
"""
DynamoDB Streams handler for governance audit logging.
Captures all governance decisions and stores them for
compliance and analytics.
"""
import json
import boto3
from datetime import datetime
from tork.audit import AuditLogger
dynamodb = boto3.resource("dynamodb")
audit_table = dynamodb.Table("tork-governance-audit")
audit_logger = AuditLogger()
def lambda_handler(event, context):
    """
    Process DynamoDB stream events for audit logging.

    Mirrors governance decisions from the stream into the audit table
    and forwards them to the external audit logger. Only INSERT and
    MODIFY events carry a decision to record; others are skipped.
    """
    from datetime import datetime, timezone

    for record in event["Records"]:
        if record["eventName"] not in ["INSERT", "MODIFY"]:
            continue

        # Extract governance decision from new image
        new_image = record["dynamodb"].get("NewImage", {})

        # Parse DynamoDB format to regular dict
        decision_data = parse_dynamodb_item(new_image)

        audit_record = {
            # Request id + stream event id keeps retries idempotent per
            # event instead of overwriting one another.
            "audit_id": f"audit-{context.aws_request_id}-{record['eventID']}",
            # Timezone-aware UTC: datetime.utcnow() is deprecated and
            # yields naive timestamps, which are ambiguous in audit trails.
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "event_type": record["eventName"],
            "decision": decision_data.get("decision"),
            "agent_id": decision_data.get("agent_id"),
            "violations": decision_data.get("violations", []),
            "pii_detected": decision_data.get("pii_count", 0),
            "source_table": record["eventSourceARN"].split("/")[1],
        }

        # Store in audit table
        audit_table.put_item(Item=audit_record)

        # Send to external audit system if configured
        audit_logger.log(audit_record)

    return {"processed": len(event["Records"])}
def parse_dynamodb_item(item):
    """Convert a DynamoDB-typed item to a plain Python dict.

    Handles the S, N, BOOL, NULL, L, and M attribute-value descriptors.
    N values stay int when integral and fall back to float otherwise
    (the previous int() cast raised ValueError on decimal numbers).
    List elements are decoded recursively (previously non-S elements
    were dropped or mis-wrapped). Attributes with unsupported
    descriptors (e.g. B, SS) are omitted, preserving the original
    skip-unknown behavior.
    """
    _unsupported = object()  # sentinel: descriptor we do not handle

    def _parse_value(value):
        """Decode one attribute-value descriptor, or return the sentinel."""
        if "S" in value:
            return value["S"]
        if "N" in value:
            # DynamoDB transmits numbers as decimal strings.
            number = value["N"]
            try:
                return int(number)
            except ValueError:
                return float(number)
        if "BOOL" in value:
            return value["BOOL"]
        if "NULL" in value:
            return None
        if "L" in value:
            return [
                parsed
                for parsed in (_parse_value(v) for v in value["L"])
                if parsed is not _unsupported
            ]
        if "M" in value:
            return parse_dynamodb_item(value["M"])
        return _unsupported

    result = {}
    for key, value in item.items():
        parsed = _parse_value(value)
        if parsed is not _unsupported:
            result[key] = parsed
    return result

# Cost Optimization
Minimize serverless costs
Optimize your serverless deployment for cost-efficiency without sacrificing performance.
| Strategy | Savings | Trade-off |
|---|---|---|
| ARM64 architecture | 20% | Must verify dependency compatibility |
| Right-size memory | 10-50% | May increase duration |
| Provisioned concurrency | -10 to +30% | Fixed cost vs per-request |
| Lambda Layers | 5-10% | Version management complexity |
| Async processing | 20-40% | Not suitable for real-time |
| Request batching | 30-60% | Increased latency |
"""
Batch processor for cost-efficient governance.
Processes multiple requests in a single Lambda invocation
to reduce per-request overhead.
"""
import json
from concurrent.futures import ThreadPoolExecutor
from tork import TorkClient
tork = TorkClient()
def lambda_handler(event, context):
    """
    Evaluate a batch of governance requests in one invocation.

    Fans the requests out across a thread pool and returns per-request
    results plus aggregate decision counts.
    """
    requests = event.get("requests", [])
    if not requests:
        return {"error": "No requests provided", "processed": 0}

    # executor.map preserves input order, matching the original
    # submit-then-collect loop.
    with ThreadPoolExecutor(max_workers=10) as executor:
        results = list(
            executor.map(
                lambda pair: process_request(pair[1], pair[0]),
                enumerate(requests),
            )
        )

    # Aggregate decision counts for the summary.
    counts = {"allow": 0, "block": 0, "modify": 0}
    for entry in results:
        if entry["decision"] in counts:
            counts[entry["decision"]] += 1

    return {
        "processed": len(results),
        "summary": {
            "allowed": counts["allow"],
            "blocked": counts["block"],
            "modified": counts["modify"],
        },
        "results": results,
    }
def process_request(request, index):
    """Evaluate one request; failures become error records, never raises."""
    try:
        verdict = tork.evaluate(
            payload=request.get("payload", {}),
            agent_id=request.get("agent_id", f"batch-{index}"),
        )
    except Exception as exc:
        return {
            "index": index,
            "decision": "error",
            "error": str(exc),
            "success": False,
        }
    return {
        "index": index,
        "decision": verdict.decision,
        "violations": verdict.violations,
        "success": True,
    }

# Monitoring & Observability
Track serverless governance metrics
Monitor your serverless Tork deployment with CloudWatch metrics and custom dashboards.
"""
Lambda handler with CloudWatch metrics integration.
"""
import os
import time
import boto3
from tork import TorkClient
cloudwatch = boto3.client("cloudwatch")
tork = TorkClient()
NAMESPACE = "Tork/Governance"
def put_metric(name, value, unit="Count", dimensions=None):
    """Publish a single data point to the Tork CloudWatch namespace.

    dimensions: optional {name: value} mapping, converted to the
    CloudWatch Dimensions list format.
    """
    datum = {"MetricName": name, "Value": value, "Unit": unit}
    if dimensions:
        datum["Dimensions"] = [
            {"Name": dim_name, "Value": dim_value}
            for dim_name, dim_value in dimensions.items()
        ]
    cloudwatch.put_metric_data(Namespace=NAMESPACE, MetricData=[datum])
def lambda_handler(event, context):
    """Evaluate governance for one payload, publishing CloudWatch metrics.

    Emits invocation count, duration, per-decision counters, and
    violation/PII counts under the Tork/Governance namespace. Errors
    increment an Errors metric and re-raise so Lambda records the
    failure.
    """
    start_time = time.time()
    try:
        payload = event.get("payload", {})
        agent_id = event.get("agent_id", "default")

        # Evaluate governance
        result = tork.evaluate(payload=payload, agent_id=agent_id)

        # Record metrics.
        # NOTE(review): each put_metric call is a separate CloudWatch API
        # request (adds latency + cost per invocation); consider batching
        # them into one put_metric_data call.
        duration_ms = (time.time() - start_time) * 1000
        put_metric("Invocations", 1, dimensions={"AgentId": agent_id})
        put_metric("Duration", duration_ms, unit="Milliseconds")
        put_metric(f"Decision_{result.decision.title()}", 1, dimensions={"AgentId": agent_id})
        put_metric("ViolationCount", len(result.violations))
        put_metric("PIIDetected", len(result.pii_matches))

        if result.decision == "block":
            put_metric("BlockedRequests", 1, dimensions={"AgentId": agent_id})

        return {
            "decision": result.decision,
            "violations": result.violations,
            "duration_ms": duration_ms,
        }
    except Exception as e:
        put_metric("Errors", 1)
        # Re-raise so the invocation is marked failed by Lambda.
        raise

# CloudWatch Dashboard
{
"widgets": [
{
"type": "metric",
"properties": {
"title": "Governance Decisions",
"metrics": [
["Tork/Governance", "Decision_Allow", {"color": "#22c55e"}],
[".", "Decision_Block", {"color": "#ef4444"}],
[".", "Decision_Modify", {"color": "#f59e0b"}]
],
"period": 60,
"stat": "Sum"
}
},
{
"type": "metric",
"properties": {
"title": "Invocation Latency",
"metrics": [
["Tork/Governance", "Duration", {"stat": "p50"}],
[".", ".", {"stat": "p99"}]
],
"period": 60
}
},
{
"type": "metric",
"properties": {
"title": "Cold Starts",
"metrics": [
["AWS/Lambda", "ConcurrentExecutions", "FunctionName", "tork-governance"],
[".", "Invocations", ".", "."]
],
"period": 60
}
},
{
"type": "metric",
"properties": {
"title": "Error Rate",
"metrics": [
["Tork/Governance", "Errors"],
["AWS/Lambda", "Errors", "FunctionName", "tork-governance"]
],
"period": 60
}
}
]
}

Troubleshooting
Common issues and solutions
Next Steps
Now that your serverless deployment is configured, explore these resources: