
Serverless Deployment

Deploy Tork governance in serverless environments. Run AI governance on AWS Lambda, Google Cloud Functions, Azure Functions, and Vercel Edge with optimized cold start performance.

Platform Overview

Supported serverless platforms:

AWS Lambda: Python 3.11+ runtime
Google Cloud Functions: Gen 2 with Python
Azure Functions: Python v2 programming model
Vercel Edge Functions: Edge runtime

Tork is optimized for serverless environments with minimal dependencies, fast initialization, and efficient memory usage. The governance engine loads in under 100ms on warm starts.

AWS Lambda

Deploy Tork governance as Lambda functions

Deploy Tork as an AWS Lambda function for on-demand AI governance. Use with API Gateway for HTTP endpoints or invoke directly from other AWS services.

lambda_handler.py
"""
AWS Lambda handler for Tork AI Governance.

Provides serverless governance evaluation for AI requests.
"""

import json
import os
from typing import Any

from tork import TorkClient
from tork.core.engine import GovernanceEngine
from tork.detectors.pii import PIIDetector


# Initialize outside handler for warm start reuse
# These persist across invocations in the same container
tork_client = None
governance_engine = None


def initialize():
    """Initialize Tork components (called once per container)."""
    global tork_client, governance_engine

    if tork_client is None:
        tork_client = TorkClient(
            api_key=os.environ.get("TORK_API_KEY"),
            # Disable features not needed in Lambda
            enable_caching=False,
            enable_metrics=False,
        )

    if governance_engine is None:
        governance_engine = GovernanceEngine(
            policies_path=os.environ.get("TORK_POLICIES_PATH", "/opt/policies"),
        )
        # Pre-warm the PII detector
        PIIDetector().detect("warmup")


def lambda_handler(event: dict, context: Any) -> dict:
    """
    Main Lambda handler for governance evaluation.

    Args:
        event: Lambda event (API Gateway or direct invocation)
        context: Lambda context object

    Returns:
        API Gateway response or direct result
    """
    initialize()

    # Parse request body
    if "body" in event:
        # API Gateway invocation
        try:
            body = json.loads(event["body"]) if isinstance(event["body"], str) else event["body"]
        except json.JSONDecodeError:
            return {
                "statusCode": 400,
                "headers": {"Content-Type": "application/json"},
                "body": json.dumps({"error": "Invalid JSON body"}),
            }
    else:
        # Direct invocation
        body = event

    # Extract governance request
    action = body.get("action", "evaluate")
    payload = body.get("payload", {})
    agent_id = body.get("agent_id", "lambda-default")

    try:
        if action == "evaluate":
            # Evaluate payload against governance policies
            result = governance_engine.evaluate(
                payload=payload,
                agent_id=agent_id,
                context={
                    "lambda_function": context.function_name,
                    "request_id": context.aws_request_id,
                },
            )

            response_body = {
                "decision": result.decision,
                "violations": result.violations,
                "modified_payload": result.modified_payload,
                "pii_found": [
                    {"type": p.type, "redacted": p.redacted}
                    for p in result.pii_matches
                ],
                "request_id": context.aws_request_id,
            }

        elif action == "redact":
            # PII redaction only
            text = body.get("text", "")
            detector = PIIDetector()
            matches = detector.detect(text)
            redacted = detector.redact(text, matches)

            response_body = {
                "original": text,
                "redacted": redacted,
                "matches": [
                    {"type": m.type, "start": m.start, "end": m.end}
                    for m in matches
                ],
            }

        elif action == "health":
            response_body = {
                "status": "healthy",
                "function": context.function_name,
                "memory_mb": context.memory_limit_in_mb,
                "remaining_time_ms": context.get_remaining_time_in_millis(),
            }

        else:
            return {
                "statusCode": 400,
                "headers": {"Content-Type": "application/json"},
                "body": json.dumps({"error": f"Unknown action: {action}"}),
            }

        # Return response
        if "body" in event:
            # API Gateway format
            return {
                "statusCode": 200,
                "headers": {
                    "Content-Type": "application/json",
                    "X-Request-Id": context.aws_request_id,
                },
                "body": json.dumps(response_body),
            }
        else:
            # Direct invocation
            return response_body

    except Exception as e:
        error_response = {
            "error": str(e),
            "error_type": type(e).__name__,
            "request_id": context.aws_request_id,
        }

        if "body" in event:
            return {
                "statusCode": 500,
                "headers": {"Content-Type": "application/json"},
                "body": json.dumps(error_response),
            }
        else:
            raise

SAM Template

template.yaml
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Tork AI Governance Lambda Function

Globals:
  Function:
    Timeout: 30
    MemorySize: 512
    Runtime: python3.11
    Architectures:
      - arm64  # Graviton2 for better price/performance

Parameters:
  TorkApiKey:
    Type: String
    NoEcho: true
    Description: Tork API Key

Resources:
  TorkGovernanceFunction:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: tork-governance
      CodeUri: src/
      Handler: lambda_handler.lambda_handler
      Description: AI governance evaluation function
      Environment:
        Variables:
          TORK_API_KEY: !Ref TorkApiKey
          TORK_POLICIES_PATH: /opt/policies
      Layers:
        - !Ref TorkLayer
      Events:
        ApiGateway:
          Type: Api
          Properties:
            Path: /governance/{proxy+}
            Method: ANY
            RestApiId: !Ref TorkApi
      Policies:
        - Version: '2012-10-17'
          Statement:
            - Effect: Allow
              Action:
                - logs:CreateLogGroup
                - logs:CreateLogStream
                - logs:PutLogEvents
              Resource: '*'

  TorkLayer:
    Type: AWS::Serverless::LayerVersion
    Properties:
      LayerName: tork-dependencies
      Description: Tork governance dependencies
      ContentUri: layer/
      CompatibleRuntimes:
        - python3.11
      CompatibleArchitectures:
        - arm64
    Metadata:
      BuildMethod: python3.11

  TorkApi:
    Type: AWS::Serverless::Api
    Properties:
      Name: tork-governance-api
      StageName: prod
      Cors:
        AllowOrigin: "'*'"
        AllowMethods: "'POST,GET,OPTIONS'"
        AllowHeaders: "'Content-Type,Authorization'"

Outputs:
  ApiEndpoint:
    Description: API Gateway endpoint URL
    Value: !Sub "https://${TorkApi}.execute-api.${AWS::Region}.amazonaws.com/prod/"
  FunctionArn:
    Description: Lambda function ARN
    Value: !GetAtt TorkGovernanceFunction.Arn
bash
# Deploy with SAM
sam build
sam deploy --guided

# Test the function
# (AWS CLI v2 treats --payload as base64 by default; pass raw JSON explicitly)
aws lambda invoke \
  --function-name tork-governance \
  --cli-binary-format raw-in-base64-out \
  --payload '{"action": "evaluate", "payload": {"message": "My SSN is 123-45-6789"}}' \
  response.json

cat response.json
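The handler above accepts both API Gateway events (JSON body as a string under "body") and direct invocations (the request dict itself). That dual parsing is easy to unit-test locally; the `parse_event` helper below is an illustrative standalone sketch of that logic, not part of the Tork package:

```python
import json


def parse_event(event: dict) -> dict:
    """Extract the request body from either an API Gateway event
    (JSON string under "body") or a direct-invocation event."""
    if "body" in event:
        raw = event["body"]
        return json.loads(raw) if isinstance(raw, str) else raw
    return event


# API Gateway invocation wraps the JSON body as a string.
api_event = {"body": '{"action": "redact", "text": "hello"}'}
assert parse_event(api_event) == {"action": "redact", "text": "hello"}

# Direct invocation passes the request dict as-is.
direct_event = {"action": "health"}
assert parse_event(direct_event) == {"action": "health"}
```

Keeping this branch in a small pure function makes the handler testable without mocking API Gateway.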

Lambda Layer Creation

Package Tork as a reusable Lambda layer

Create a Lambda Layer containing Tork and its dependencies for reuse across multiple functions. This reduces deployment size and improves cold start times.

build-layer.sh
#!/bin/bash
# Build Tork Lambda Layer for ARM64 (Graviton2)

set -e

LAYER_DIR="layer"
PYTHON_VERSION="3.11"

echo "Building Tork Lambda Layer..."

# Clean previous build
rm -rf $LAYER_DIR
mkdir -p $LAYER_DIR/python

# Install dependencies into layer
pip install \
  --platform manylinux2014_aarch64 \
  --implementation cp \
  --python-version $PYTHON_VERSION \
  --only-binary=:all: \
  --target $LAYER_DIR/python \
  tork

# Add governance policies
mkdir -p $LAYER_DIR/policies
cp policies/*.yaml $LAYER_DIR/policies/

# Create layer zip
cd $LAYER_DIR
zip -r ../tork-layer.zip .
cd ..

echo "Layer built: tork-layer.zip"
echo "Size: $(du -h tork-layer.zip | cut -f1)"

# Publish layer
aws lambda publish-layer-version \
  --layer-name tork-governance \
  --description "Tork AI Governance Library" \
  --zip-file fileb://tork-layer.zip \
  --compatible-runtimes python3.11 \
  --compatible-architectures arm64

Optimized requirements.txt

requirements.txt
# Minimal dependencies for Lambda
tork>=1.0.0
# Exclude optional heavy dependencies
# --no-deps for tork, then add only what's needed:
pydantic>=2.0.0
regex>=2023.0.0
# Don't include: torch, transformers (use API-based detection instead)
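Lambda rejects functions whose unzipped size (code plus layers) exceeds 250 MB, so it is worth measuring the installed footprint before zipping. A small sketch; in practice you would point `dir_size_mb` at the `layer/python` directory produced by the build script (the 1 MiB placeholder below exists only to make the demo self-contained):

```python
import os
import tempfile


def dir_size_mb(path: str) -> float:
    """Total size of all files under path, in megabytes."""
    total = 0
    for root, _dirs, files in os.walk(path):
        for name in files:
            total += os.path.getsize(os.path.join(root, name))
    return total / (1024 * 1024)


# Demo on a throwaway directory; substitute "layer/python" before zipping.
with tempfile.TemporaryDirectory() as d:
    with open(os.path.join(d, "pkg.bin"), "wb") as f:
        f.write(b"\0" * 1024 * 1024)  # 1 MiB placeholder file
    size = dir_size_mb(d)
    assert 0.9 < size < 1.1
    # Lambda's unzipped limit (code + all layers) is 250 MB.
    assert size < 250
```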

Cold Start Optimization

Minimize serverless startup latency

Cold starts can add 500ms-3s to Lambda invocations. Use these strategies to minimize startup time.

Initialize outside the handler: Move Tork client initialization to module level so it persists across warm invocations.
Use provisioned concurrency: Pre-warm Lambda instances for consistent low latency; ideal for production workloads.
Lazy-load heavy modules: Import PII detectors and ML models only when needed, not at module load time.
Use ARM64 (Graviton2): Roughly 20% better price/performance and often faster cold starts than x86.
Minimize package size: Use Lambda Layers, exclude dev dependencies, and strip debug symbols.
Enable SnapStart (Java): For Java runtimes, SnapStart can reduce cold starts from seconds to milliseconds.
optimized_handler.py
"""
Cold-start optimized Lambda handler.
"""

import os

# Module-level initialization (persists across warm starts)
_tork_client = None
_pii_detector = None


def get_tork_client():
    """Lazy initialization of Tork client."""
    global _tork_client
    if _tork_client is None:
        from tork import TorkClient
        _tork_client = TorkClient(
            api_key=os.environ.get("TORK_API_KEY"),
            # Use lightweight mode for serverless
            mode="lightweight",
        )
    return _tork_client


def get_pii_detector():
    """Lazy initialization of PII detector."""
    global _pii_detector
    if _pii_detector is None:
        from tork.detectors.pii import PIIDetector
        _pii_detector = PIIDetector(
            # Use regex-based detection (faster than ML)
            use_ml=False,
        )
    return _pii_detector


def lambda_handler(event, context):
    """Handler with lazy loading."""
    action = event.get("action", "evaluate")

    if action == "evaluate":
        client = get_tork_client()
        return client.evaluate(event.get("payload", {}))

    elif action == "redact":
        detector = get_pii_detector()
        text = event.get("text", "")
        matches = detector.detect(text)
        return {"redacted": detector.redact(text, matches)}

    return {"error": "Unknown action"}

Enable Provisioned Concurrency

yaml
# In SAM template
TorkGovernanceFunction:
  Type: AWS::Serverless::Function
  Properties:
    # ... other properties
    ProvisionedConcurrencyConfig:
      ProvisionedConcurrentExecutions: 5
    AutoPublishAlias: live
    DeploymentPreference:
      Type: AllAtOnce
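Before deciding what to lazy-load, it helps to measure where cold-start time actually goes. A minimal profiling sketch; the module names are stand-ins, and in a real handler you would time `tork` and its detector imports instead:

```python
import importlib
import time


def time_import(module_name: str) -> float:
    """Wall-clock seconds spent importing a module.

    Modules already in sys.modules return almost instantly, which
    mirrors the difference between a cold and a warm start.
    """
    start = time.perf_counter()
    importlib.import_module(module_name)
    return time.perf_counter() - start


# Compare a cheap stdlib import against a slightly heavier one.
cheap = time_import("string")
heavier = time_import("email.mime.multipart")
print(f"string: {cheap * 1000:.3f} ms, email.mime.multipart: {heavier * 1000:.3f} ms")
```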

Google Cloud Functions

Deploy on Google Cloud Platform

Deploy Tork as a Google Cloud Function (Gen 2) with HTTP triggers and Pub/Sub integration.

main.py
"""
Google Cloud Function for Tork AI Governance.
"""

import functions_framework
import json
import os
from flask import jsonify, Request

from tork import TorkClient
from tork.core.engine import GovernanceEngine


# Initialize globally for instance reuse
tork = TorkClient(api_key=os.environ.get("TORK_API_KEY"))
engine = GovernanceEngine()


@functions_framework.http
def governance_handler(request: Request):
    """
    HTTP Cloud Function for governance evaluation.

    Args:
        request: Flask request object

    Returns:
        JSON response with governance result
    """
    # Handle CORS
    if request.method == "OPTIONS":
        headers = {
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Allow-Methods": "POST, GET, OPTIONS",
            "Access-Control-Allow-Headers": "Content-Type, Authorization",
            "Access-Control-Max-Age": "3600",
        }
        return ("", 204, headers)

    headers = {"Access-Control-Allow-Origin": "*"}

    try:
        request_json = request.get_json(silent=True)

        if not request_json:
            return jsonify({"error": "No JSON body provided"}), 400, headers

        action = request_json.get("action", "evaluate")
        payload = request_json.get("payload", {})

        if action == "evaluate":
            result = engine.evaluate(
                payload=payload,
                agent_id=request_json.get("agent_id", "gcf-default"),
            )

            return jsonify({
                "decision": result.decision,
                "violations": result.violations,
                "modified_payload": result.modified_payload,
                "pii_found": len(result.pii_matches),
            }), 200, headers

        elif action == "redact":
            text = request_json.get("text", "")
            result = tork.redact(text)
            return jsonify(result), 200, headers

        elif action == "health":
            return jsonify({"status": "healthy"}), 200, headers

        else:
            return jsonify({"error": f"Unknown action: {action}"}), 400, headers

    except Exception as e:
        return jsonify({
            "error": str(e),
            "error_type": type(e).__name__,
        }), 500, headers


@functions_framework.cloud_event
def pubsub_governance(cloud_event):
    """
    Pub/Sub triggered governance for async processing.

    Triggered by messages to a Pub/Sub topic for batch
    governance evaluation.
    """
    import base64

    # Decode Pub/Sub message
    message_data = base64.b64decode(cloud_event.data["message"]["data"]).decode()
    payload = json.loads(message_data)

    # Evaluate governance
    result = engine.evaluate(
        payload=payload,
        agent_id=payload.get("agent_id", "pubsub-default"),
    )

    # Log result (or publish to another topic)
    print(json.dumps({
        "event_id": cloud_event["id"],
        "decision": result.decision,
        "violations": result.violations,
    }))
deploy.sh
# Deploy HTTP function
gcloud functions deploy tork-governance \
  --gen2 \
  --runtime=python311 \
  --region=us-central1 \
  --source=. \
  --entry-point=governance_handler \
  --trigger-http \
  --allow-unauthenticated \
  --memory=512MB \
  --timeout=60s \
  --min-instances=1 \
  --max-instances=100 \
  --set-env-vars="TORK_API_KEY=$TORK_API_KEY"

# Deploy Pub/Sub function
gcloud functions deploy tork-governance-pubsub \
  --gen2 \
  --runtime=python311 \
  --region=us-central1 \
  --source=. \
  --entry-point=pubsub_governance \
  --trigger-topic=ai-governance-requests \
  --memory=512MB \
  --set-env-vars="TORK_API_KEY=$TORK_API_KEY"
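Pub/Sub delivers the message payload base64-encoded inside an envelope, which is what the `pubsub_governance` handler decodes. A standalone sketch of that round trip (the envelope shape matches the `cloud_event.data` structure used above):

```python
import base64
import json


def decode_pubsub_message(event_data: dict) -> dict:
    """Decode the base64 payload of a Pub/Sub event envelope."""
    raw = base64.b64decode(event_data["message"]["data"]).decode("utf-8")
    return json.loads(raw)


# Build an envelope the way Pub/Sub would deliver it.
payload = {"agent_id": "pubsub-default", "message": "hello"}
envelope = {
    "message": {
        "data": base64.b64encode(json.dumps(payload).encode("utf-8")).decode("ascii")
    }
}
assert decode_pubsub_message(envelope) == payload
```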

Azure Functions

Deploy on Microsoft Azure

Deploy Tork as an Azure Function with the Python v2 programming model.

function_app.py
"""
Azure Functions app for Tork AI Governance.
"""

import azure.functions as func
import json
import os
import logging

from tork import TorkClient
from tork.core.engine import GovernanceEngine


app = func.FunctionApp(http_auth_level=func.AuthLevel.FUNCTION)

# Initialize Tork
tork = TorkClient(api_key=os.environ.get("TORK_API_KEY"))
engine = GovernanceEngine()


@app.route(route="governance", methods=["POST"])
def governance_http(req: func.HttpRequest) -> func.HttpResponse:
    """
    HTTP trigger for governance evaluation.
    """
    logging.info("Governance evaluation request received")

    try:
        req_body = req.get_json()
    except ValueError:
        return func.HttpResponse(
            json.dumps({"error": "Invalid JSON body"}),
            status_code=400,
            mimetype="application/json",
        )

    action = req_body.get("action", "evaluate")
    payload = req_body.get("payload", {})

    try:
        if action == "evaluate":
            result = engine.evaluate(
                payload=payload,
                agent_id=req_body.get("agent_id", "azure-default"),
            )

            return func.HttpResponse(
                json.dumps({
                    "decision": result.decision,
                    "violations": result.violations,
                    "modified_payload": result.modified_payload,
                }),
                status_code=200,
                mimetype="application/json",
            )

        elif action == "redact":
            text = req_body.get("text", "")
            result = tork.redact(text)
            return func.HttpResponse(
                json.dumps(result),
                status_code=200,
                mimetype="application/json",
            )

        else:
            return func.HttpResponse(
                json.dumps({"error": f"Unknown action: {action}"}),
                status_code=400,
                mimetype="application/json",
            )

    except Exception as e:
        logging.error(f"Governance error: {e}")
        return func.HttpResponse(
            json.dumps({"error": str(e)}),
            status_code=500,
            mimetype="application/json",
        )


@app.blob_trigger(
    arg_name="blob",
    path="ai-requests/{name}",
    connection="AzureWebJobsStorage"
)
def governance_blob(blob: func.InputStream):
    """
    Blob trigger for batch governance processing.

    Processes AI request files uploaded to blob storage.
    """
    logging.info(f"Processing blob: {blob.name}")

    content = blob.read().decode("utf-8")
    requests = json.loads(content)

    results = []
    for req in requests:
        result = engine.evaluate(
            payload=req.get("payload", {}),
            agent_id=req.get("agent_id", "blob-batch"),
        )
        results.append({
            "id": req.get("id"),
            "decision": result.decision,
            "violations": result.violations,
        })

    logging.info(f"Processed {len(results)} requests")
    # Results can be written to another blob or queue


@app.queue_trigger(
    arg_name="msg",
    queue_name="governance-queue",
    connection="AzureWebJobsStorage"
)
def governance_queue(msg: func.QueueMessage):
    """
    Queue trigger for async governance processing.
    """
    payload = json.loads(msg.get_body().decode("utf-8"))

    result = engine.evaluate(
        payload=payload.get("payload", {}),
        agent_id=payload.get("agent_id", "queue-default"),
    )

    logging.info(f"Queue message processed: decision={result.decision}")
host.json
{
  "version": "2.0",
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[4.*, 5.0.0)"
  },
  "functionTimeout": "00:05:00"
}
bash
# Deploy to Azure
func azure functionapp publish tork-governance-app \
  --python

# Set environment variables
az functionapp config appsettings set \
  --name tork-governance-app \
  --resource-group tork-rg \
  --settings TORK_API_KEY=$TORK_API_KEY

Vercel Edge Functions

Deploy at the edge for lowest latency

Deploy Tork governance at the edge using Vercel Edge Functions. Ideal for global applications requiring low-latency governance checks.

app/api/governance/route.ts
/**
 * Vercel Edge Function for Tork AI Governance
 *
 * Runs at the edge for low-latency governance evaluation.
 */

import { NextRequest, NextResponse } from 'next/server';

// Edge runtime configuration
export const runtime = 'edge';
export const preferredRegion = ['iad1', 'sfo1', 'fra1']; // Multi-region

// Tork governance client (edge-compatible)
const TORK_API_URL = process.env.TORK_API_URL || 'https://api.tork.ai';
const TORK_API_KEY = process.env.TORK_API_KEY;

interface GovernanceRequest {
  action: 'evaluate' | 'redact' | 'health';
  payload?: Record<string, unknown>;
  text?: string;
  agent_id?: string;
}

interface GovernanceResult {
  decision: 'allow' | 'block' | 'modify';
  violations: string[];
  modified_payload?: Record<string, unknown>;
  pii_found?: Array<{ type: string; redacted: boolean }>;
}

async function evaluateGovernance(
  payload: Record<string, unknown>,
  agentId: string
): Promise<GovernanceResult> {
  const response = await fetch(`${TORK_API_URL}/v1/evaluate`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${TORK_API_KEY}`,
    },
    body: JSON.stringify({
      payload,
      agent_id: agentId,
    }),
  });

  if (!response.ok) {
    throw new Error(`Tork API error: ${response.status}`);
  }

  return response.json();
}

async function redactPII(text: string): Promise<{ redacted: string; matches: unknown[] }> {
  const response = await fetch(`${TORK_API_URL}/v1/redact`, {
    method: 'POST',
    headers: {
      'Content-Type': 'application/json',
      'Authorization': `Bearer ${TORK_API_KEY}`,
    },
    body: JSON.stringify({ text }),
  });

  if (!response.ok) {
    throw new Error(`Tork API error: ${response.status}`);
  }

  return response.json();
}

export async function POST(request: NextRequest) {
  const startTime = Date.now();

  try {
    const body: GovernanceRequest = await request.json();
    const { action = 'evaluate', payload, text, agent_id = 'vercel-edge' } = body;

    let result: unknown;

    switch (action) {
      case 'evaluate':
        if (!payload) {
          return NextResponse.json(
            { error: 'Missing payload for evaluate action' },
            { status: 400 }
          );
        }
        result = await evaluateGovernance(payload, agent_id);
        break;

      case 'redact':
        if (!text) {
          return NextResponse.json(
            { error: 'Missing text for redact action' },
            { status: 400 }
          );
        }
        result = await redactPII(text);
        break;

      case 'health':
        result = {
          status: 'healthy',
          region: process.env.VERCEL_REGION || 'unknown',
          latency_ms: Date.now() - startTime,
        };
        break;

      default:
        return NextResponse.json(
          { error: `Unknown action: ${action}` },
          { status: 400 }
        );
    }

    return NextResponse.json({
      ...result as object,
      _meta: {
        latency_ms: Date.now() - startTime,
        region: process.env.VERCEL_REGION,
      },
    });
  } catch (error) {
    console.error('Governance error:', error);
    return NextResponse.json(
      {
        error: error instanceof Error ? error.message : 'Unknown error',
        _meta: {
          latency_ms: Date.now() - startTime,
          region: process.env.VERCEL_REGION,
        },
      },
      { status: 500 }
    );
  }
}

export async function GET() {
  return NextResponse.json({
    status: 'healthy',
    service: 'tork-governance-edge',
    region: process.env.VERCEL_REGION,
  });
}

Middleware for Automatic Governance

middleware.ts
/**
 * Next.js Middleware for automatic AI API governance.
 *
 * Intercepts requests to /api/ai/* and applies governance
 * before forwarding to the AI provider.
 */

import { NextRequest, NextResponse } from 'next/server';

export const config = {
  matcher: '/api/ai/:path*',
};

const TORK_API_URL = process.env.TORK_API_URL || 'https://api.tork.ai';
const TORK_API_KEY = process.env.TORK_API_KEY;

export async function middleware(request: NextRequest) {
  // Only process POST requests with JSON body
  if (request.method !== 'POST') {
    return NextResponse.next();
  }

  try {
    const body = await request.json();

    // Evaluate against governance policies
    const governanceResponse = await fetch(`${TORK_API_URL}/v1/evaluate`, {
      method: 'POST',
      headers: {
        'Content-Type': 'application/json',
        'Authorization': `Bearer ${TORK_API_KEY}`,
      },
      body: JSON.stringify({
        payload: body,
        agent_id: 'vercel-middleware',
        context: {
          path: request.nextUrl.pathname,
          method: request.method,
        },
      }),
    });

    if (!governanceResponse.ok) {
      console.error('Governance API error:', governanceResponse.status);
      return NextResponse.next();
    }

    const result = await governanceResponse.json();

    // Block if governance denies
    if (result.decision === 'block') {
      return NextResponse.json(
        {
          error: 'Request blocked by governance policy',
          violations: result.violations,
        },
        { status: 403 }
      );
    }

    // Forward with modified payload if needed.
    // Next.js middleware cannot replace the request body directly, so pass
    // the modified payload to the downstream route handler via a request header.
    if (result.decision === 'modify' && result.modified_payload) {
      const requestHeaders = new Headers(request.headers);
      requestHeaders.set(
        'x-tork-modified-payload',
        JSON.stringify(result.modified_payload)
      );
      return NextResponse.next({ request: { headers: requestHeaders } });
    }

    return NextResponse.next();
  } catch (error) {
    console.error('Middleware error:', error);
    // Fail open - allow request to proceed
    return NextResponse.next();
  }
}

Event-Driven Governance

Trigger governance from AWS events

Use event-driven patterns to govern AI data flows. Trigger governance checks from S3 uploads, DynamoDB streams, and other AWS events.

S3 Trigger for Document Governance

s3_governance.py
"""
S3-triggered Lambda for document governance.

Scans uploaded documents for sensitive content before
they're processed by AI systems.
"""

import json
import boto3
import urllib.parse

from tork import TorkClient
from tork.scanner import ContentScanner


s3 = boto3.client("s3")
tork = TorkClient()
scanner = ContentScanner()


def lambda_handler(event, context):
    """
    Process S3 upload events for governance.

    Triggered when files are uploaded to the AI input bucket.
    Scans content and moves to appropriate destination.
    """
    results = []

    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        key = urllib.parse.unquote_plus(record["s3"]["object"]["key"])

        print(f"Processing: s3://{bucket}/{key}")

        # Download file content
        response = s3.get_object(Bucket=bucket, Key=key)
        content = response["Body"].read()

        # Determine content type
        content_type = response.get("ContentType", "application/octet-stream")

        # Scan for sensitive content
        if content_type.startswith("text/") or key.endswith((".json", ".txt", ".md")):
            text_content = content.decode("utf-8")
            scan_result = scanner.scan(text_content)

            governance_result = tork.evaluate({
                "content": text_content,
                "filename": key,
                "content_type": content_type,
            })
        else:
            # Binary file - check metadata only
            governance_result = tork.evaluate({
                "filename": key,
                "content_type": content_type,
                "size_bytes": len(content),
            })
            scan_result = {"findings": []}

        # Determine action based on governance
        if governance_result.decision == "block":
            # Move to quarantine bucket
            destination_bucket = f"{bucket}-quarantine"
            action = "quarantined"
        elif governance_result.decision == "modify":
            # Redact and save modified version
            destination_bucket = f"{bucket}-processed"
            if governance_result.modified_payload:
                content = json.dumps(governance_result.modified_payload).encode()
            action = "modified"
        else:
            # Move to approved bucket
            destination_bucket = f"{bucket}-approved"
            action = "approved"

        # Copy to destination
        s3.put_object(
            Bucket=destination_bucket,
            Key=key,
            Body=content,
            Metadata={
                "governance-decision": governance_result.decision,
                "governance-violations": ",".join(governance_result.violations),
            },
        )

        # Delete from source
        s3.delete_object(Bucket=bucket, Key=key)

        results.append({
            "key": key,
            "action": action,
            "decision": governance_result.decision,
            "violations": governance_result.violations,
            "findings": len(scan_result.get("findings", [])),
        })

        print(f"Processed {key}: {action}")

    return {
        "processed": len(results),
        "results": results,
    }

API Gateway Integration

api-gateway.yaml
# SAM template for API Gateway with Lambda authorizer

Resources:
  GovernanceApi:
    Type: AWS::Serverless::Api
    Properties:
      StageName: prod
      Auth:
        DefaultAuthorizer: TorkAuthorizer
        Authorizers:
          TorkAuthorizer:
            FunctionArn: !GetAtt AuthorizerFunction.Arn

  AuthorizerFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: authorizer.lambda_handler
      Runtime: python3.11
      CodeUri: src/
      Environment:
        Variables:
          TORK_API_KEY: !Ref TorkApiKey

  # Authorizer that checks governance before allowing request
  # authorizer.py content:
  #
  # def lambda_handler(event, context):
  #     token = event.get("authorizationToken", "")
  #     method_arn = event["methodArn"]
  #
  #     # Validate token and check governance
  #     if is_valid_and_governed(token):
  #         return generate_policy("user", "Allow", method_arn)
  #     else:
  #         return generate_policy("user", "Deny", method_arn)

  AIProxyFunction:
    Type: AWS::Serverless::Function
    Properties:
      Handler: proxy.lambda_handler
      Runtime: python3.11
      CodeUri: src/
      Timeout: 120
      MemorySize: 1024
      Events:
        ProxyApi:
          Type: Api
          Properties:
            RestApiId: !Ref GovernanceApi
            Path: /ai/{proxy+}
            Method: ANY

DynamoDB Streams for Audit Logging

dynamodb_audit.py
"""
DynamoDB Streams handler for governance audit logging.

Captures all governance decisions and stores them for
compliance and analytics.
"""

import json
import boto3
from datetime import datetime

from tork.audit import AuditLogger


dynamodb = boto3.resource("dynamodb")
audit_table = dynamodb.Table("tork-governance-audit")
audit_logger = AuditLogger()


def lambda_handler(event, context):
    """
    Process DynamoDB stream events for audit logging.
    """
    for record in event["Records"]:
        if record["eventName"] not in ["INSERT", "MODIFY"]:
            continue

        # Extract governance decision from new image
        new_image = record["dynamodb"].get("NewImage", {})

        # Parse DynamoDB format to regular dict
        decision_data = parse_dynamodb_item(new_image)

        # Create audit record
        audit_record = {
            "audit_id": f"audit-{context.aws_request_id}-{record['eventID']}",
            "timestamp": datetime.utcnow().isoformat(),
            "event_type": record["eventName"],
            "decision": decision_data.get("decision"),
            "agent_id": decision_data.get("agent_id"),
            "violations": decision_data.get("violations", []),
            "pii_detected": decision_data.get("pii_count", 0),
            "source_table": record["eventSourceARN"].split("/")[1],
        }

        # Store in the audit table (make sure the stream source is a
        # different table, or this handler will re-trigger itself)
        audit_table.put_item(Item=audit_record)

        # Send to external audit system if configured
        audit_logger.log(audit_record)

    return {"processed": len(event["Records"])}


def parse_dynamodb_item(item):
    """Convert DynamoDB attribute-value format to a regular dict."""
    return {key: parse_dynamodb_value(value) for key, value in item.items()}


def parse_dynamodb_value(value):
    """Convert a single DynamoDB attribute value to a Python value."""
    if "S" in value:
        return value["S"]
    if "N" in value:
        # Numbers arrive as strings; preserve floats as well as ints
        num = value["N"]
        return float(num) if "." in num else int(num)
    if "BOOL" in value:
        return value["BOOL"]
    if "NULL" in value:
        return None
    if "L" in value:
        return [parse_dynamodb_value(v) for v in value["L"]]
    if "M" in value:
        return parse_dynamodb_item(value["M"])
    return value

Cost Optimization

Minimize serverless costs

Optimize your serverless deployment for cost-efficiency without sacrificing performance.

Strategy                   Savings        Trade-off
ARM64 architecture         20%            Must verify dependency compatibility
Right-size memory          10-50%         May increase duration
Provisioned concurrency    -10 to +30%    Fixed cost vs per-request
Lambda Layers              5-10%          Version management complexity
Async processing           20-40%         Not suitable for real-time
Request batching           30-60%         Increased latency
pythonbatch_processor.py
"""
Batch processor for cost-efficient governance.

Processes multiple requests in a single Lambda invocation
to reduce per-request overhead.
"""

from concurrent.futures import ThreadPoolExecutor

from tork import TorkClient


tork = TorkClient()


def lambda_handler(event, context):
    """
    Process batch of governance requests.

    Accepts array of requests and processes them
    in parallel for efficiency.
    """
    requests = event.get("requests", [])

    if not requests:
        return {"error": "No requests provided", "processed": 0}

    # Process in parallel with thread pool
    results = []
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = [
            executor.submit(process_request, req, i)
            for i, req in enumerate(requests)
        ]

        for future in futures:
            results.append(future.result())

    # Aggregate statistics
    allowed = sum(1 for r in results if r["decision"] == "allow")
    blocked = sum(1 for r in results if r["decision"] == "block")
    modified = sum(1 for r in results if r["decision"] == "modify")

    return {
        "processed": len(results),
        "summary": {
            "allowed": allowed,
            "blocked": blocked,
            "modified": modified,
        },
        "results": results,
    }


def process_request(request, index):
    """Process a single governance request."""
    try:
        result = tork.evaluate(
            payload=request.get("payload", {}),
            agent_id=request.get("agent_id", f"batch-{index}"),
        )
        return {
            "index": index,
            "decision": result.decision,
            "violations": result.violations,
            "success": True,
        }
    except Exception as e:
        return {
            "index": index,
            "decision": "error",
            "error": str(e),
            "success": False,
        }
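Synchronous Lambda invocations cap the request payload at 6 MB, so callers with large workloads typically split the request list into fixed-size batches before invoking the function above. A minimal client-side sketch; the batch size of 100 is an illustrative assumption, not a Tork default:

```python
def chunk_requests(requests: list, batch_size: int = 100) -> list:
    """Split a request list into batches suitable for separate invocations."""
    return [requests[i:i + batch_size] for i in range(0, len(requests), batch_size)]


# Each chunk becomes one invocation payload of the shape the handler expects
payloads = [{"requests": batch} for batch in chunk_requests([{"payload": {}}] * 250)]
```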

Monitoring & Observability

Track serverless governance metrics

Monitor your serverless Tork deployment with CloudWatch metrics and custom dashboards.

pythonmetrics_handler.py
"""
Lambda handler with CloudWatch metrics integration.
"""

import time

import boto3

from tork import TorkClient


cloudwatch = boto3.client("cloudwatch")
tork = TorkClient()

NAMESPACE = "Tork/Governance"


def put_metric(name, value, unit="Count", dimensions=None):
    """Publish metric to CloudWatch."""
    metric_data = {
        "MetricName": name,
        "Value": value,
        "Unit": unit,
    }
    if dimensions:
        metric_data["Dimensions"] = [
            {"Name": k, "Value": v} for k, v in dimensions.items()
        ]

    cloudwatch.put_metric_data(
        Namespace=NAMESPACE,
        MetricData=[metric_data],
    )


def lambda_handler(event, context):
    """Handler with metrics instrumentation."""
    start_time = time.time()

    try:
        payload = event.get("payload", {})
        agent_id = event.get("agent_id", "default")

        # Evaluate governance
        result = tork.evaluate(payload=payload, agent_id=agent_id)

        # Record metrics
        duration_ms = (time.time() - start_time) * 1000

        put_metric("Invocations", 1, dimensions={"AgentId": agent_id})
        put_metric("Duration", duration_ms, unit="Milliseconds")
        put_metric(f"Decision_{result.decision.title()}", 1, dimensions={"AgentId": agent_id})
        put_metric("ViolationCount", len(result.violations))
        put_metric("PIIDetected", len(result.pii_matches))

        if result.decision == "block":
            put_metric("BlockedRequests", 1, dimensions={"AgentId": agent_id})

        return {
            "decision": result.decision,
            "violations": result.violations,
            "duration_ms": duration_ms,
        }

    except Exception:
        put_metric("Errors", 1)
        raise
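CloudWatch bills per `PutMetricData` call, and the handler above makes five or more calls per invocation. A single call accepts multiple metric entries, so buffering them and flushing once is cheaper. A hedged sketch of the builder; a plain dict stands in for the Tork result object here, and the flush call is commented out because it needs AWS credentials:

```python
def build_metric_data(result: dict, duration_ms: float, agent_id: str) -> list:
    """Collect all governance metrics into one MetricData list for a single call."""
    dims = [{"Name": "AgentId", "Value": agent_id}]
    return [
        {"MetricName": "Invocations", "Value": 1, "Unit": "Count", "Dimensions": dims},
        {"MetricName": "Duration", "Value": duration_ms, "Unit": "Milliseconds"},
        {"MetricName": f"Decision_{result['decision'].title()}", "Value": 1,
         "Unit": "Count", "Dimensions": dims},
        {"MetricName": "ViolationCount", "Value": len(result["violations"]),
         "Unit": "Count"},
    ]


# One API call instead of several:
# cloudwatch.put_metric_data(Namespace=NAMESPACE,
#                            MetricData=build_metric_data(result, duration_ms, agent_id))
```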

CloudWatch Dashboard

jsondashboard.json
{
  "widgets": [
    {
      "type": "metric",
      "properties": {
        "title": "Governance Decisions",
        "metrics": [
          ["Tork/Governance", "Decision_Allow", {"color": "#22c55e"}],
          [".", "Decision_Block", {"color": "#ef4444"}],
          [".", "Decision_Modify", {"color": "#f59e0b"}]
        ],
        "period": 60,
        "stat": "Sum"
      }
    },
    {
      "type": "metric",
      "properties": {
        "title": "Invocation Latency",
        "metrics": [
          ["Tork/Governance", "Duration", {"stat": "p50"}],
          [".", ".", {"stat": "p99"}]
        ],
        "period": 60
      }
    },
    {
      "type": "metric",
      "properties": {
        "title": "Concurrency & Invocations",
        "metrics": [
          ["AWS/Lambda", "ConcurrentExecutions", "FunctionName", "tork-governance"],
          [".", "Invocations", ".", "."]
        ],
        "period": 60
      }
    },
    {
      "type": "metric",
      "properties": {
        "title": "Error Rate",
        "metrics": [
          ["Tork/Governance", "Errors"],
          ["AWS/Lambda", "Errors", "FunctionName", "tork-governance"]
        ],
        "period": 60
      }
    }
  ]
}
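The dashboard definition above can also be deployed programmatically. A hedged sketch that loads and validates the JSON before handing it to CloudWatch; the API call itself is commented out since it needs AWS credentials, and the `tork-governance` dashboard name is an illustrative assumption:

```python
import json


def load_dashboard_body(path: str = "dashboard.json") -> str:
    """Read and validate the dashboard definition before deploying it."""
    with open(path) as f:
        body = json.load(f)
    assert "widgets" in body, "dashboard must define widgets"
    return json.dumps(body)


# boto3.client("cloudwatch").put_dashboard(
#     DashboardName="tork-governance",
#     DashboardBody=load_dashboard_body(),
# )
```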

Troubleshooting

Common issues and solutions

Cold start timeout
Increase timeout to 30s+, use provisioned concurrency, or reduce package size with Lambda Layers.
Memory errors during PII detection
Increase memory to 512MB+. PII regex patterns require more memory. Use API-based detection for large texts.
Module import errors
Ensure Lambda Layer includes all dependencies. Verify Python version compatibility (3.11+).
API Gateway timeout
API Gateway has 29s max timeout. Use async invocation with callback for long operations.
Inconsistent latency
Cold starts cause variance. Use provisioned concurrency or implement request warming.
Permission denied errors
Grant the function's execution role only the permissions it needs: logs:CreateLogGroup, logs:CreateLogStream, logs:PutLogEvents for logging; s3:GetObject for policy files; secretsmanager:GetSecretValue for API keys.
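The request warming mentioned above can be as simple as a scheduled EventBridge rule that pings the function, with the handler short-circuiting before any governance work so the container (and its module-level Tork client) stays warm. A minimal sketch; the `{"warmup": true}` event shape and the `handle_governance_request` helper are illustrative assumptions:

```python
def lambda_handler(event, context):
    """Short-circuit scheduled warm-up pings before doing any real work."""
    if event.get("warmup"):
        # No evaluation performed; the invocation only keeps the container warm
        return {"warmed": True}
    return handle_governance_request(event)


def handle_governance_request(event):
    # Placeholder for the real evaluation path shown earlier on this page
    return {"decision": "allow"}
```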

Next Steps

Now that your serverless deployment is configured, explore these resources:

Documentation

Learn to integrate Tork

Support

Get help from our team