HTTP Proxy Pattern
Deploy Tork as an HTTP proxy between your application and AI APIs. Intercept, govern, and forward requests to OpenAI, Anthropic, and other providers. Works with any language or framework.
Architecture Overview
How the proxy pattern works
The HTTP proxy pattern allows you to intercept all AI API calls, apply governance policies, and forward approved requests to the upstream provider. This works with any language since you simply point your API client to the proxy URL instead of the provider's URL.
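For example, a raw chat-completion request changes only its host; the path, headers, and body are identical. A minimal sketch with httpx, assuming the FastAPI proxy from the next section is listening on localhost:8080:
import httpx

# Direct: httpx.post("https://api.openai.com/v1/chat/completions", ...)
# Proxied: the same request, pointed at the governance proxy instead.
resp = httpx.post(
    "http://localhost:8080/chat/completions",
    headers={"Authorization": "Bearer your-openai-api-key"},
    json={"model": "gpt-4", "messages": [{"role": "user", "content": "Hello"}]},
)
print(resp.json())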
FastAPI Proxy Service
Complete proxy implementation with Tork governance
This FastAPI service acts as a transparent proxy that intercepts OpenAI API calls, applies Tork governance, and forwards approved requests to OpenAI.
"""
Tork Governance Proxy for OpenAI API
A transparent HTTP proxy that applies governance policies
to all OpenAI API requests before forwarding them.
"""
import os
import json
import time
import httpx
from typing import Optional
from fastapi import FastAPI, Request, Response, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
from tork import TorkClient
app = FastAPI(title="Tork OpenAI Proxy")
tork = TorkClient(api_key=os.environ["TORK_API_KEY"])
# Target API configuration
OPENAI_BASE_URL = "https://api.openai.com/v1"
class GovernanceResult(BaseModel):
"""Result of governance evaluation."""
allowed: bool
modified_payload: Optional[dict] = None
violations: list[str] = []
pii_redacted: list[str] = []
async def evaluate_request(payload: dict, endpoint: str) -> GovernanceResult:
"""
Evaluate an API request against Tork governance policies.
Args:
payload: The request body
endpoint: The API endpoint being called
Returns:
GovernanceResult with decision and any modifications
"""
result = await tork.evaluate(
payload=payload,
context={
"endpoint": endpoint,
"timestamp": time.time(),
}
)
return GovernanceResult(
allowed=result.decision == "allow",
modified_payload=result.modified_payload,
violations=result.violations,
pii_redacted=result.pii_found,
)
async def evaluate_response(response_data: dict, endpoint: str) -> GovernanceResult:
"""
Evaluate an API response against output policies.
Args:
response_data: The response body
endpoint: The API endpoint
Returns:
GovernanceResult with any required modifications
"""
result = await tork.evaluate(
payload={"response": response_data},
context={
"endpoint": endpoint,
"direction": "output",
}
)
return GovernanceResult(
allowed=result.decision == "allow",
modified_payload=result.modified_payload.get("response") if result.modified_payload else None,
violations=result.violations,
)
# Health check must be registered before the catch-all route below,
# which would otherwise shadow it and proxy GET /health upstream.
@app.get("/health")
async def health():
    """Health check endpoint."""
    return {"status": "healthy", "proxy": "tork-openai"}
@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
async def proxy(request: Request, path: str):
"""
Proxy all requests to OpenAI with governance.
This endpoint intercepts all HTTP methods and paths,
applies governance policies, and forwards to OpenAI.
"""
# Build target URL
target_url = f"{OPENAI_BASE_URL}/{path}"
# Get request body for POST/PUT/PATCH
body = None
if request.method in ["POST", "PUT", "PATCH"]:
body = await request.body()
if body:
try:
payload = json.loads(body)
# Evaluate request against governance policies
governance = await evaluate_request(payload, path)
if not governance.allowed:
raise HTTPException(
status_code=403,
detail={
"error": "Request blocked by governance policy",
"violations": governance.violations,
}
)
# Use modified payload if PII was redacted
if governance.modified_payload:
body = json.dumps(governance.modified_payload).encode()
            except (json.JSONDecodeError, UnicodeDecodeError):
                pass  # Non-JSON body, forward as-is
    # Forward headers. Drop host, and drop content-length: the body may
    # have been rewritten, and httpx sets the correct length itself.
    headers = dict(request.headers)
    headers.pop("host", None)
    headers.pop("content-length", None)
# Check for streaming request
is_streaming = False
if body:
try:
payload = json.loads(body)
is_streaming = payload.get("stream", False)
        except (json.JSONDecodeError, UnicodeDecodeError):
            pass
    async with httpx.AsyncClient() as client:
        if is_streaming:
            # Handle streaming responses. The generator opens its own
            # client: the handler's `client` is closed as soon as this
            # function returns, which would be before the first chunk is
            # read. Note that chunks are forwarded as-is -- output
            # policies are not applied to streamed responses here.
            async def stream_with_governance():
                async with httpx.AsyncClient() as stream_client:
                    async with stream_client.stream(
                        request.method,
                        target_url,
                        headers=headers,
                        content=body,
                        timeout=120.0,
                    ) as response:
                        async for chunk in response.aiter_bytes():
                            yield chunk
            return StreamingResponse(
                stream_with_governance(),
                media_type="text/event-stream",
            )
else:
# Handle regular responses
response = await client.request(
request.method,
target_url,
headers=headers,
content=body,
timeout=60.0,
)
# Evaluate response if JSON
response_body = response.content
try:
response_data = json.loads(response_body)
governance = await evaluate_response(response_data, path)
if not governance.allowed:
raise HTTPException(
status_code=403,
detail={
"error": "Response blocked by governance policy",
"violations": governance.violations,
}
)
if governance.modified_payload:
response_body = json.dumps(governance.modified_payload).encode()
except json.JSONDecodeError:
pass
            # Strip headers that no longer match the body: httpx has
            # already decompressed the content, and governance may have
            # rewritten it, so length/encoding headers would be stale.
            fwd_headers = {
                k: v for k, v in response.headers.items()
                if k.lower() not in {"content-length", "content-encoding", "transfer-encoding", "connection"}
            }
            return Response(
                content=response_body,
                status_code=response.status_code,
                headers=fwd_headers,
            )
@app.get("/health")
async def health():
"""Health check endpoint."""
return {"status": "healthy", "proxy": "tork-openai"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8080)Run the proxy:
# Install dependencies
pip install fastapi uvicorn httpx tork
# Set environment variables
export TORK_API_KEY="your-tork-api-key"
export OPENAI_API_KEY="your-openai-api-key"
# Start the proxy server
uvicorn tork_proxy:app --host 0.0.0.0 --port 8080
Client Configuration
Point your AI clients to the proxy
Configure your AI SDK or HTTP client to use the Tork proxy URL instead of the default API URL. All requests will automatically be governed.
from openai import OpenAI
# Point to Tork proxy instead of api.openai.com
client = OpenAI(
api_key="your-openai-api-key",
base_url="http://localhost:8080", # Tork proxy URL
)
# All requests now go through governance
response = client.chat.completions.create(
model="gpt-4",
messages=[
{"role": "user", "content": "Hello, my SSN is 123-45-6789"}
]
)
# PII is automatically redacted before reaching OpenAI
print(response.choices[0].message.content)
Multi-Provider Proxy
Single proxy for multiple AI providers
Extend the proxy to support multiple AI providers with unified governance. Route requests based on the URL path, as below, or on a request header (see the sketch after the client example).
"""
Multi-Provider Tork Governance Proxy
Routes requests to OpenAI, Anthropic, or other providers
with unified governance policies.
"""
import os
import json
import httpx
from fastapi import FastAPI, Request, Response, HTTPException
from fastapi.responses import StreamingResponse
from tork import TorkClient
app = FastAPI(title="Tork Multi-Provider Proxy")
tork = TorkClient(api_key=os.environ["TORK_API_KEY"])
# Provider configurations
PROVIDERS = {
"openai": {
"base_url": "https://api.openai.com/v1",
"auth_header": "Authorization",
"auth_env": "OPENAI_API_KEY",
},
"anthropic": {
"base_url": "https://api.anthropic.com/v1",
"auth_header": "x-api-key",
"auth_env": "ANTHROPIC_API_KEY",
},
"azure": {
"base_url": os.environ.get("AZURE_OPENAI_ENDPOINT", ""),
"auth_header": "api-key",
"auth_env": "AZURE_OPENAI_API_KEY",
},
}
async def apply_governance(payload: dict, provider: str, direction: str) -> dict:
"""Apply Tork governance to request/response."""
result = await tork.evaluate(
payload=payload,
context={
"provider": provider,
"direction": direction,
}
)
if result.decision != "allow":
raise HTTPException(
status_code=403,
detail={
"error": f"{direction.title()} blocked by governance",
"violations": result.violations,
}
)
return result.modified_payload or payload
@app.api_route("/v1/{provider}/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def proxy(request: Request, provider: str, path: str):
"""
Route requests to the appropriate provider.
URL format: /v1/{provider}/{path}
Examples:
/v1/openai/chat/completions
/v1/anthropic/messages
/v1/azure/deployments/gpt-4/chat/completions
"""
if provider not in PROVIDERS:
raise HTTPException(status_code=400, detail=f"Unknown provider: {provider}")
config = PROVIDERS[provider]
target_url = f"{config['base_url']}/{path}"
# Get and govern request body
body = None
if request.method in ["POST", "PUT", "PATCH"]:
body = await request.body()
if body:
try:
payload = json.loads(body)
governed = await apply_governance(payload, provider, "input")
body = json.dumps(governed).encode()
            except (json.JSONDecodeError, UnicodeDecodeError):
                pass
    # Build headers with provider auth (drop host and the now-stale
    # content-length; httpx recomputes the length from the body)
    headers = dict(request.headers)
    headers.pop("host", None)
    headers.pop("content-length", None)
# Add provider-specific authentication
if config["auth_env"] in os.environ:
api_key = os.environ[config["auth_env"]]
if config["auth_header"] == "Authorization":
headers["Authorization"] = f"Bearer {api_key}"
else:
headers[config["auth_header"]] = api_key
# Forward request
async with httpx.AsyncClient() as client:
response = await client.request(
request.method,
target_url,
headers=headers,
content=body,
timeout=60.0,
)
# Govern response
response_body = response.content
try:
response_data = json.loads(response_body)
governed = await apply_governance(response_data, provider, "output")
response_body = json.dumps(governed).encode()
except json.JSONDecodeError:
pass
    return Response(
        content=response_body,
        status_code=response.status_code,
        media_type=response.headers.get("content-type"),
    )
@app.get("/health")
async def health():
"""Health check endpoint."""
return {
"status": "healthy",
"providers": list(PROVIDERS.keys()),
}
Client usage with multi-provider proxy:
from openai import OpenAI
import anthropic
# OpenAI through proxy
openai_client = OpenAI(
api_key="your-key",
base_url="http://localhost:8080/v1/openai",
)
# Anthropic through proxy
anthropic_client = anthropic.Anthropic(
api_key="your-key",
base_url="http://localhost:8080/v1/anthropic",
)
# Both governed by the same Tork policies!
openai_response = openai_client.chat.completions.create(
model="gpt-4",
messages=[{"role": "user", "content": "Hello"}],
)
anthropic_response = anthropic_client.messages.create(
model="claude-3-opus-20240229",
max_tokens=1024,
messages=[{"role": "user", "content": "Hello"}],
)
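The routes above select the provider from the URL path. Routing on a request header works the same way; a hypothetical sketch that reuses the path-based handler (the X-Tork-Provider header name is an illustration, not part of the Tork API):
# Hypothetical header-based routing built on the handler above.
@app.api_route("/by-header/{path:path}", methods=["GET", "POST", "PUT", "DELETE", "PATCH"])
async def proxy_by_header(request: Request, path: str):
    provider = request.headers.get("x-tork-provider", "openai")
    return await proxy(request, provider, path)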
Nginx Reverse Proxy
Production deployment with Nginx
For production deployments, use Nginx as a reverse proxy in front of the Tork proxy service for SSL termination, load balancing, and caching.
# Nginx configuration for Tork AI Governance Proxy
upstream tork_proxy {
    # Multiple Tork proxy instances for load balancing. Adjust the
    # hostnames to your deployment; with Docker Compose you can also
    # point at the single service name (tork-proxy:8080) and let
    # Docker DNS round-robin across replicas.
    server tork-proxy-1:8080 weight=1;
    server tork-proxy-2:8080 weight=1;
    server tork-proxy-3:8080 weight=1;
    # Reuse upstream connections
    keepalive 32;
}
server {
listen 443 ssl http2;
server_name ai-proxy.example.com;
# SSL configuration
ssl_certificate /etc/nginx/ssl/cert.pem;
ssl_certificate_key /etc/nginx/ssl/key.pem;
ssl_protocols TLSv1.2 TLSv1.3;
ssl_ciphers ECDHE-ECDSA-AES128-GCM-SHA256:ECDHE-RSA-AES128-GCM-SHA256;
# Logging
access_log /var/log/nginx/tork_access.log;
error_log /var/log/nginx/tork_error.log;
# Timeouts for AI API calls (can be long)
proxy_connect_timeout 60s;
proxy_send_timeout 120s;
proxy_read_timeout 300s;
# Buffer settings
proxy_buffering off; # Required for streaming
proxy_buffer_size 128k;
proxy_buffers 4 256k;
proxy_busy_buffers_size 256k;
# Health check endpoint
location /health {
proxy_pass http://tork_proxy/health;
proxy_http_version 1.1;
proxy_set_header Connection "";
}
# Main proxy location
location / {
proxy_pass http://tork_proxy;
proxy_http_version 1.1;
# Headers
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
# WebSocket/SSE support for streaming
proxy_set_header Connection "";
proxy_set_header Upgrade $http_upgrade;
# Don't buffer streaming responses
proxy_buffering off;
proxy_cache off;
# Rate limiting (optional)
# limit_req zone=ai_api burst=20 nodelay;
}
}
# Rate limiting zone (optional)
# limit_req_zone $binary_remote_addr zone=ai_api:10m rate=10r/s;
# HTTP to HTTPS redirect
server {
listen 80;
server_name ai-proxy.example.com;
return 301 https://$server_name$request_uri;
}
Docker Compose Deployment
Complete containerized setup
Deploy the complete Tork proxy stack with Docker Compose, including the proxy service, Nginx, and optional Redis for caching.
version: '3.8'
services:
# Tork Governance Proxy
tork-proxy:
build:
context: .
dockerfile: Dockerfile.proxy
environment:
- TORK_API_KEY=${TORK_API_KEY}
- OPENAI_API_KEY=${OPENAI_API_KEY}
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
- REDIS_URL=redis://redis:6379
expose:
- "8080"
deploy:
replicas: 3
resources:
limits:
cpus: '1'
memory: 512M
    healthcheck:
      # python:3.11-slim ships without curl; use the stdlib for the check
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8080/health')"]
interval: 30s
timeout: 10s
retries: 3
depends_on:
- redis
# Nginx Load Balancer
nginx:
image: nginx:alpine
ports:
- "443:443"
- "80:80"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf:ro
- ./ssl:/etc/nginx/ssl:ro
depends_on:
- tork-proxy
healthcheck:
test: ["CMD", "nginx", "-t"]
interval: 30s
timeout: 10s
retries: 3
# Redis for caching (optional)
redis:
image: redis:7-alpine
expose:
- "6379"
volumes:
- redis_data:/data
command: redis-server --appendonly yes
volumes:
  redis_data:
Dockerfile.proxy:
FROM python:3.11-slim
WORKDIR /app
# Install dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy proxy code
COPY tork_proxy.py .
# Run with uvicorn
CMD ["uvicorn", "tork_proxy:app", "--host", "0.0.0.0", "--port", "8080", "--workers", "4"]# Start the stack
docker-compose up -d
# Scale proxy instances
docker-compose up -d --scale tork-proxy=5
# View logs
docker-compose logs -f tork-proxy
# Health check
curl https://ai-proxy.example.com/health
Kubernetes Deployment
Production Kubernetes configuration
Deploy the Tork proxy to Kubernetes with autoscaling, health checks, and ingress configuration.
apiVersion: apps/v1
kind: Deployment
metadata:
name: tork-proxy
labels:
app: tork-proxy
spec:
replicas: 3
selector:
matchLabels:
app: tork-proxy
template:
metadata:
labels:
app: tork-proxy
spec:
containers:
- name: tork-proxy
image: your-registry/tork-proxy:latest
ports:
- containerPort: 8080
env:
- name: TORK_API_KEY
valueFrom:
secretKeyRef:
name: tork-secrets
key: api-key
- name: OPENAI_API_KEY
valueFrom:
secretKeyRef:
name: openai-secrets
key: api-key
resources:
requests:
cpu: "250m"
memory: "256Mi"
limits:
cpu: "1000m"
memory: "512Mi"
livenessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 10
periodSeconds: 30
readinessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 5
periodSeconds: 10
---
apiVersion: v1
kind: Service
metadata:
name: tork-proxy
spec:
selector:
app: tork-proxy
ports:
- port: 80
targetPort: 8080
type: ClusterIP
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: tork-proxy-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: tork-proxy
minReplicas: 3
maxReplicas: 20
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
---
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: tork-proxy-ingress
annotations:
cert-manager.io/cluster-issuer: letsencrypt-prod
nginx.ingress.kubernetes.io/proxy-read-timeout: "300"
nginx.ingress.kubernetes.io/proxy-send-timeout: "120"
spec:
  ingressClassName: nginx
tls:
- hosts:
- ai-proxy.example.com
secretName: tork-proxy-tls
rules:
- host: ai-proxy.example.com
http:
paths:
- path: /
pathType: Prefix
backend:
service:
name: tork-proxy
port:
number: 80
Governance Policies
Configure what the proxy enforces
Configure Tork policies to control what passes through the proxy. These policies apply to all requests regardless of the client language or framework.
# Tork Governance Policies for AI Proxy
version: "1.0"
name: "ai-proxy-policies"
policies:
# PII Protection - redact sensitive data
- name: "pii-protection"
type: "pii"
action: "redact"
enabled: true
config:
types:
- ssn
- credit_card
- phone_number
- email
- address
direction: "both" # Apply to input and output
# Content Moderation
- name: "content-safety"
type: "content"
action: "block"
enabled: true
config:
block_categories:
- hate_speech
- violence
- illegal_activities
sensitivity: "medium"
# Prompt Injection Protection
- name: "injection-protection"
type: "injection"
action: "block"
enabled: true
config:
patterns:
- "ignore previous instructions"
- "disregard all prior"
- "you are now"
block_on_match: true
# Rate Limiting per API Key
- name: "rate-limits"
type: "rate_limit"
action: "throttle"
enabled: true
config:
requests_per_minute: 60
tokens_per_minute: 100000
by: "api_key"
# Model Access Control
- name: "model-allowlist"
type: "allowlist"
action: "block"
enabled: true
config:
allowed_models:
- "gpt-4"
- "gpt-4-turbo"
- "gpt-3.5-turbo"
- "claude-3-opus"
- "claude-3-sonnet"
block_unknown: true
# Cost Control
- name: "cost-limits"
type: "cost"
action: "block"
enabled: true
config:
max_tokens_per_request: 4000
max_daily_cost_usd: 100.00
warn_at_percent: 80
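When a policy blocks a request, the proxy responds with HTTP 403 and carries the violation list in the detail body (see the HTTPException raised in the proxy code above). A minimal client-side sketch of handling that outcome, assuming the single-provider proxy on localhost:8080:
import httpx

resp = httpx.post(
    "http://localhost:8080/chat/completions",
    headers={"Authorization": "Bearer your-openai-api-key"},
    json={"model": "gpt-4", "messages": [{"role": "user", "content": "..."}]},
    timeout=60.0,
)
if resp.status_code == 403:
    # detail carries the governance error message and violated policies
    detail = resp.json().get("detail", {})
    print("Blocked:", detail.get("error"))
    print("Violations:", detail.get("violations"))
else:
    resp.raise_for_status()
    print(resp.json()["choices"][0]["message"]["content"])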
Monitoring & Observability
Track proxy metrics and audit logs
Add metrics and structured logging to monitor proxy performance and governance decisions.
"""
Tork Proxy with Prometheus metrics and structured logging.
"""
import os
import time
import structlog
from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST
from fastapi import FastAPI, Request, Response
from fastapi.responses import PlainTextResponse
from tork import TorkClient
# Configure structured logging
structlog.configure(
processors=[
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.JSONRenderer(),
]
)
logger = structlog.get_logger()
# Prometheus metrics
REQUEST_COUNT = Counter(
"tork_proxy_requests_total",
"Total proxy requests",
["provider", "endpoint", "status"]
)
GOVERNANCE_DECISIONS = Counter(
"tork_proxy_governance_decisions_total",
"Governance decisions made",
["provider", "decision", "policy"]
)
REQUEST_LATENCY = Histogram(
"tork_proxy_request_duration_seconds",
"Request latency in seconds",
["provider", "endpoint"]
)
PII_REDACTIONS = Counter(
"tork_proxy_pii_redactions_total",
"PII entities redacted",
["type"]
)
app = FastAPI(title="Tork Proxy with Metrics")
tork = TorkClient(api_key=os.environ["TORK_API_KEY"])
@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
"""Add metrics to all requests."""
start_time = time.time()
response = await call_next(request)
# Record metrics
duration = time.time() - start_time
path_parts = request.url.path.split("/")
provider = path_parts[2] if len(path_parts) > 2 else "unknown"
endpoint = "/".join(path_parts[3:]) if len(path_parts) > 3 else "root"
REQUEST_COUNT.labels(
provider=provider,
endpoint=endpoint,
status=response.status_code
).inc()
REQUEST_LATENCY.labels(
provider=provider,
endpoint=endpoint
).observe(duration)
# Structured logging
logger.info(
"request_completed",
provider=provider,
endpoint=endpoint,
status=response.status_code,
duration_ms=round(duration * 1000, 2),
client_ip=request.client.host if request.client else None,
)
return response
def log_governance_decision(provider: str, result, direction: str):
"""Log governance decisions with metrics."""
decision = result.decision
# Update metrics
GOVERNANCE_DECISIONS.labels(
provider=provider,
decision=decision,
        policy=getattr(result, "policy_name", "default"),
).inc()
# Track PII redactions
for pii in result.pii_found:
PII_REDACTIONS.labels(type=pii.get("type", "unknown")).inc()
# Structured log
logger.info(
"governance_decision",
provider=provider,
direction=direction,
decision=decision,
violations=result.violations,
pii_redacted=len(result.pii_found),
)
if decision != "allow":
logger.warning(
"request_blocked",
provider=provider,
direction=direction,
violations=result.violations,
)
@app.get("/metrics")
async def metrics():
"""Prometheus metrics endpoint."""
    return PlainTextResponse(
        generate_latest(),
        media_type=CONTENT_TYPE_LATEST,
    )
@app.get("/health")
async def health():
"""Health check with detailed status."""
return {
"status": "healthy",
"tork_connected": await tork.health_check(),
"timestamp": time.time(),
}
Prometheus scrape config:
scrape_configs:
- job_name: 'tork-proxy'
static_configs:
- targets: ['tork-proxy:8080']
metrics_path: /metrics
scrape_interval: 15s
Environment Variables
Configuration reference
| Variable | Description | Required |
|---|---|---|
| TORK_API_KEY | Your Tork API key for governance | Required |
| OPENAI_API_KEY | OpenAI API key (if proxying OpenAI) | Optional |
| ANTHROPIC_API_KEY | Anthropic API key (if proxying Anthropic) | Optional |
| AZURE_OPENAI_ENDPOINT | Azure OpenAI endpoint URL | Optional |
| AZURE_OPENAI_API_KEY | Azure OpenAI API key | Optional |
| REDIS_URL | Redis URL for caching | Optional |
| LOG_LEVEL | Logging level (DEBUG, INFO, WARNING, ERROR) | Optional |
| PROXY_TIMEOUT | Request timeout in seconds (default: 120) | Optional |
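A minimal sketch of reading these variables at startup, with the defaults from the table (required keys fail fast; the names mirror the table above):
import os

TORK_API_KEY = os.environ["TORK_API_KEY"]  # required: fail fast if missing
PROXY_TIMEOUT = float(os.environ.get("PROXY_TIMEOUT", "120"))
LOG_LEVEL = os.environ.get("LOG_LEVEL", "INFO")
REDIS_URL = os.environ.get("REDIS_URL")  # None disables caching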
Troubleshooting
Common issues and solutions
Next Steps
Now that your proxy is set up, explore the rest of the Tork documentation.