FastAPI for Internal Automation Services: Building Robust Backends for Workflow Orchestration
Meta Description: Learn how to build scalable, type-safe REST APIs with FastAPI for internal automation services. This guide covers Python backend development, microservices architecture, n8n integration, authentication, and best practices for automation engineers.
Keywords: fastapi, python, rest api, backend, microservices, automation, n8n, workflow, api design, backend services, python backend, automation api, internal tools, microservices architecture
Category: Backend Development • Date: February 27, 2026 • Read Time: 15-18 minutes
As automation engineers, we often need to build internal APIs that serve as the backbone for our workflow orchestration. FastAPI is well suited to this job: it combines Python's simplicity with strong performance, automatic documentation, and type safety. This guide walks through building production-ready automation services that integrate with n8n and other workflow tools.
Why FastAPI is Perfect for Automation Backends
When building internal automation services, you need a framework that's:
1. Fast to develop - Automation projects often have tight deadlines
2. Easy to maintain - Internal tools need to be reliable with minimal oversight
3. Well-documented - Team members should understand APIs without digging through code
4. Performant - Automation backends handle concurrent workflows efficiently
5. Type-safe - Catch errors before they break production workflows
FastAPI delivers on all these requirements while providing automatic OpenAPI documentation, dependency injection, and async support out of the box.
The Automation Backend Stack
python
# Typical FastAPI automation service architecture
from fastapi import FastAPI, Depends, HTTPException
from pydantic import BaseModel
from typing import List, Optional
import asyncio
from datetime import datetime


# Your automation logic here
class AutomationService:
    def __init__(self):
        self.running_workflows = {}

    async def trigger_workflow(self, workflow_id: str, data: dict):
        """Trigger an automation workflow"""
        # Implementation
        pass
Setting Up Your FastAPI Automation Project
Project Structure for Automation Services
automation-backend/
├── app/
│   ├── __init__.py
│   ├── main.py                  # FastAPI app instance
│   ├── api/
│   │   ├── __init__.py
│   │   ├── v1/
│   │   │   ├── __init__.py
│   │   │   ├── endpoints/
│   │   │   │   ├── workflows.py
│   │   │   │   ├── triggers.py
│   │   │   │   └── tasks.py
│   │   │   └── dependencies.py
│   ├── core/
│   │   ├── config.py            # Configuration management
│   │   ├── security.py          # Authentication & authorization
│   │   └── database.py          # Database connections
│   ├── models/
│   │   ├── schemas.py           # Pydantic models
│   │   └── database.py          # SQLAlchemy models
│   ├── services/
│   │   ├── workflow_service.py
│   │   ├── task_service.py
│   │   └── notification_service.py
│   └── utils/
│       ├── logging.py
│       └── helpers.py
├── tests/
├── requirements.txt
├── Dockerfile
└── docker-compose.yml
Installation and Basic Setup
bash
# Create virtual environment
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate

# Install FastAPI and dependencies (extras are quoted so shells like zsh do not expand the brackets)
pip install "fastapi[all]" uvicorn sqlalchemy pydantic "python-jose[cryptography]" "passlib[bcrypt]"

# For production
pip install gunicorn httpx redis celery
Minimal Automation API Example
python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Optional
from datetime import datetime

app = FastAPI(
    title="Automation Service API",
    description="Internal API for workflow automation",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc"
)


class WorkflowTrigger(BaseModel):
    workflow_id: str
    trigger_data: dict
    priority: Optional[str] = "normal"
    scheduled_for: Optional[datetime] = None


class WorkflowResponse(BaseModel):
    execution_id: str
    status: str
    started_at: datetime
    # Explicit default so the field is optional under Pydantic v2 as well
    estimated_completion: Optional[datetime] = None


@app.post("/trigger-workflow", response_model=WorkflowResponse)
async def trigger_workflow(trigger: WorkflowTrigger):
    """
    Trigger an automation workflow with provided data.

    - workflow_id: ID of the workflow to trigger
    - trigger_data: JSON data to pass to the workflow
    - priority: Execution priority (low/normal/high)
    - scheduled_for: Optional future execution time
    """
    # Simulate workflow execution
    execution_id = f"exec_{datetime.now().timestamp()}"
    return WorkflowResponse(
        execution_id=execution_id,
        status="queued",
        started_at=datetime.now(),
        estimated_completion=datetime.now()
    )


@app.get("/workflow-status/{execution_id}")
async def get_workflow_status(execution_id: str):
    """Get the current status of a workflow execution."""
    # Implementation would check database or task queue
    return {
        "execution_id": execution_id,
        "status": "completed",
        "progress": 100,
        "result": {"message": "Workflow executed successfully"}
    }
Building Automation-Focused API Endpoints
Workflow Management Endpoints
python
from datetime import datetime  # needed for started_at below
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from typing import List
from app.core.database import get_db
from app.models.schemas import Workflow, WorkflowCreate, WorkflowUpdate
from app.services.workflow_service import WorkflowService

router = APIRouter(prefix="/workflows", tags=["workflows"])


@router.get("/", response_model=List[Workflow])
async def list_workflows(
    skip: int = 0,
    limit: int = 100,
    db: Session = Depends(get_db),
    service: WorkflowService = Depends()
):
    """List all available automation workflows."""
    return service.get_workflows(db, skip=skip, limit=limit)


@router.post("/", response_model=Workflow, status_code=status.HTTP_201_CREATED)
async def create_workflow(
    workflow: WorkflowCreate,
    db: Session = Depends(get_db),
    service: WorkflowService = Depends()
):
    """Register a new automation workflow."""
    return service.create_workflow(db, workflow)


@router.post("/{workflow_id}/execute")
async def execute_workflow(
    workflow_id: str,
    input_data: dict,
    db: Session = Depends(get_db),
    service: WorkflowService = Depends()
):
    """Execute a specific workflow with input data."""
    result = service.execute_workflow(db, workflow_id, input_data)
    if not result:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Workflow {workflow_id} not found"
        )
    return {
        "execution_id": result.execution_id,
        "status": "started",
        "workflow": workflow_id,
        "started_at": datetime.now().isoformat()
    }
Task Queue Management
python
from fastapi import BackgroundTasks
from celery import Celery
import json

# Celery configuration for async task processing
celery_app = Celery(
    'automation_tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0'
)


@router.post("/tasks/async")
async def create_async_task(
    task_type: str,
    parameters: dict,
    background_tasks: BackgroundTasks
):
    """
    Create an asynchronous automation task.

    Useful for long-running operations like:
    - Data processing
    - File generation
    - External API calls
    - Batch operations
    """
    # Option 1: FastAPI background tasks (for lightweight, in-process operations)
    background_tasks.add_task(process_automation_task, task_type, parameters)

    # Option 2: Celery for distributed task processing.
    # Both options are shown for comparison; a real endpoint would pick one,
    # otherwise the task runs twice.
    celery_task = celery_app.send_task(
        'process_automation',
        args=[task_type, parameters]
    )
    return {
        "task_id": celery_task.id,
        "status": "queued",
        "message": f"Task {task_type} queued for processing"
    }


async def process_automation_task(task_type: str, parameters: dict):
    """Background task processor"""
    # Your automation logic here
    print(f"Processing {task_type} with params: {parameters}")
Database Integration for Automation State
SQLAlchemy Models for Automation
python
from sqlalchemy import Column, String, DateTime, JSON, Integer, Boolean
from sqlalchemy.orm import declarative_base  # SQLAlchemy 1.4+ location
from datetime import datetime

Base = declarative_base()


class WorkflowExecution(Base):
    __tablename__ = "workflow_executions"

    id = Column(Integer, primary_key=True, index=True)
    execution_id = Column(String, unique=True, index=True)
    workflow_id = Column(String, index=True)
    status = Column(String, default="pending")  # pending, running, completed, failed
    input_data = Column(JSON)
    output_data = Column(JSON, nullable=True)
    started_at = Column(DateTime, default=datetime.utcnow)
    completed_at = Column(DateTime, nullable=True)
    error_message = Column(String, nullable=True)
    # Queried by the idempotent trigger endpoint in the best-practices section
    idempotency_key = Column(String, unique=True, index=True, nullable=True)


class AutomationTask(Base):
    __tablename__ = "automation_tasks"

    id = Column(Integer, primary_key=True, index=True)
    task_id = Column(String, unique=True, index=True)
    task_type = Column(String)  # email, webhook, data_transform, etc.
    parameters = Column(JSON)
    status = Column(String, default="queued")
    priority = Column(Integer, default=0)
    scheduled_for = Column(DateTime, nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
Pydantic Schemas for API Validation
python
from pydantic import BaseModel, ConfigDict, Field
from typing import Optional, Dict, Any
from datetime import datetime


class WorkflowExecutionBase(BaseModel):
    workflow_id: str = Field(..., description="ID of the workflow to execute")
    trigger_data: Dict[str, Any] = Field(default_factory=dict)
    priority: Optional[str] = "normal"
    scheduled_for: Optional[datetime] = None


class WorkflowExecutionCreate(WorkflowExecutionBase):
    pass


class WorkflowExecution(WorkflowExecutionBase):
    # Pydantic v2 syntax; on Pydantic v1 use `class Config: orm_mode = True`
    model_config = ConfigDict(from_attributes=True)

    execution_id: str
    status: str
    started_at: datetime
    completed_at: Optional[datetime] = None
    output_data: Optional[Dict[str, Any]] = None


class TaskStatus(BaseModel):
    task_id: str
    status: str
    progress: Optional[int] = None
    result: Optional[Dict[str, Any]] = None
    error: Optional[str] = None
    created_at: datetime
    updated_at: datetime
Authentication and Authorization for Internal Services
API Key Authentication
python
from datetime import datetime  # needed for the expiration check
from fastapi import Depends, HTTPException, status
from fastapi.security import APIKeyHeader
from sqlalchemy.orm import Session
from app.core.database import get_db
from app.models.database import APIKey

api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)


async def get_api_key(
    api_key: str = Depends(api_key_header),
    db: Session = Depends(get_db)
):
    """Validate API key for internal service access."""
    if not api_key:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="API key required"
        )
    # Check if API key exists and is valid
    db_key = db.query(APIKey).filter(
        APIKey.key == api_key,
        APIKey.is_active.is_(True)
    ).first()
    if not db_key:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid API key"
        )
    # Check expiration
    if db_key.expires_at and db_key.expires_at < datetime.utcnow():
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="API key expired"
        )
    return db_key


# Usage in endpoints
@router.post("/secure-trigger")
async def secure_workflow_trigger(
    trigger: WorkflowTrigger,
    api_key: APIKey = Depends(get_api_key)
):
    """Secure endpoint requiring API key authentication."""
    # Only accessible with valid API key
    return await trigger_workflow(trigger)
Role-Based Access Control
python
from enum import Enum
from functools import wraps
from fastapi import Depends, HTTPException, status


class UserRole(str, Enum):
    VIEWER = "viewer"
    OPERATOR = "operator"
    ADMIN = "admin"
    SUPER_ADMIN = "super_admin"


# Role hierarchy: a higher number grants more privileges
ROLE_HIERARCHY = {
    UserRole.VIEWER: 1,
    UserRole.OPERATOR: 2,
    UserRole.ADMIN: 3,
    UserRole.SUPER_ADMIN: 4
}


def require_role(required_role: UserRole):
    """Decorator to require specific user role."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Get current user from dependency (the endpoint must declare
            # a `current_user` parameter for this lookup to work)
            current_user = kwargs.get("current_user")
            if not current_user:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Authentication required"
                )
            user_role_level = ROLE_HIERARCHY.get(current_user.role, 0)
            required_role_level = ROLE_HIERARCHY[required_role]
            if user_role_level < required_role_level:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail=f"Requires {required_role.value} role or higher"
                )
            return await func(*args, **kwargs)
        return wrapper
    return decorator


# Usage (User and get_current_user are assumed to be defined in app.core.security)
@router.post("/admin/workflows")
@require_role(UserRole.ADMIN)
async def create_admin_workflow(
    workflow: WorkflowCreate,
    current_user: User = Depends(get_current_user)
):
    """Create workflow (admin only)."""
    # Admin-only logic here
    pass
Integrating with n8n and Other Automation Tools
Webhook Endpoints for n8n Integration
python
import hashlib
import hmac
import os
from typing import Optional
from fastapi import Header, HTTPException, Request, status


@router.post("/webhooks/n8n/{workflow_id}")
async def n8n_webhook(
    workflow_id: str,
    request: Request,
    x_n8n_signature: Optional[str] = Header(None)
):
    """
    Webhook endpoint for n8n workflow triggers.

    n8n can call this endpoint to:
    - Trigger internal workflows
    - Pass data between n8n and custom services
    - Handle complex logic outside n8n
    """
    # Verify webhook signature (if configured).
    # Note: requests without the header skip this check; reject them
    # instead if signing is mandatory in your setup.
    if x_n8n_signature:
        body = await request.body()
        secret = os.getenv("N8N_WEBHOOK_SECRET", "").encode()
        expected_signature = hmac.new(
            secret,
            body,
            hashlib.sha256
        ).hexdigest()
        if not hmac.compare_digest(x_n8n_signature, expected_signature):
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid webhook signature"
            )
    # Parse incoming data
    data = await request.json()
    # Process based on workflow ID (handlers are assumed to be defined elsewhere)
    if workflow_id == "data-processing":
        result = await process_data_pipeline(data)
    elif workflow_id == "notification":
        result = await send_notifications(data)
    else:
        result = {"status": "unknown_workflow", "data": data}
    return {
        "webhook_received": True,
        "workflow_id": workflow_id,
        "processed": True,
        "result": result
    }
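The signature check above only works if the caller computes the header the same way. Here is a minimal sketch of both sides using only the standard library. The helper names `sign_payload` and `verify_signature` and the example secret are hypothetical, and the scheme (hex-encoded HMAC-SHA256 over the raw request body) mirrors what the endpoint above assumes rather than anything n8n itself mandates.

```python
import hashlib
import hmac
import json


def sign_payload(secret: bytes, body: bytes) -> str:
    """Return the hex HMAC-SHA256 digest of the raw request body."""
    return hmac.new(secret, body, hashlib.sha256).hexdigest()


def verify_signature(secret: bytes, body: bytes, signature: str) -> bool:
    """Compare the received signature with the expected one in constant time."""
    expected = sign_payload(secret, body)
    return hmac.compare_digest(signature, expected)


# The sender serializes the payload once and signs exactly those bytes.
secret = b"example-shared-secret"  # assumption: shared out of band
body = json.dumps({"order_id": 42}).encode()
signature = sign_payload(secret, body)
```

The receiver has to verify against the raw bytes it received, not against re-serialized JSON, because whitespace or key order may differ after a round trip.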
Calling External APIs from FastAPI
python
import httpx
from fastapi import Depends, HTTPException, status


class ExternalAPIService:
    def __init__(self):
        self.client = httpx.AsyncClient(timeout=30.0)

    async def call_n8n_webhook(self, webhook_url: str, data: dict):
        """Call n8n webhook from FastAPI service."""
        try:
            response = await self.client.post(
                webhook_url,
                json=data,
                headers={"Content-Type": "application/json"}
            )
            response.raise_for_status()
            return response.json()
        except httpx.HTTPError as e:
            # httpx.HTTPError covers both transport failures (RequestError)
            # and the HTTPStatusError raised by raise_for_status()
            raise HTTPException(
                status_code=status.HTTP_502_BAD_GATEWAY,
                detail=f"Failed to call n8n webhook: {str(e)}"
            )

    async def trigger_n8n_workflow(self, workflow_id: str, data: dict):
        """Trigger a specific n8n workflow."""
        # Assuming n8n is running locally on port 5678
        n8n_url = f"http://localhost:5678/webhook/{workflow_id}"
        return await self.call_n8n_webhook(n8n_url, data)


# Usage in endpoint
@router.post("/trigger-n8n-workflow")
async def trigger_external_n8n_workflow(
    workflow_id: str,
    trigger_data: dict,
    api_service: ExternalAPIService = Depends()
):
    """Trigger an n8n workflow from FastAPI."""
    result = await api_service.trigger_n8n_workflow(workflow_id, trigger_data)
    return {
        "message": f"n8n workflow {workflow_id} triggered",
        "n8n_response": result
    }
Error Handling and Retry Logic
Robust Error Handling for Automation Services
python
from fastapi import HTTPException, status
from tenacity import retry, retry_if_exception, stop_after_attempt, wait_exponential
import logging

logger = logging.getLogger(__name__)


class AutomationError(Exception):
    """Custom exception for automation failures."""
    def __init__(self, message: str, retryable: bool = True):
        self.message = message
        self.retryable = retryable
        super().__init__(self.message)


def is_retryable(exc: BaseException) -> bool:
    """Retry everything except an AutomationError marked retryable=False."""
    return not (isinstance(exc, AutomationError) and not exc.retryable)


@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
    retry=retry_if_exception(is_retryable),
    reraise=True  # surface the original exception instead of tenacity's RetryError
)
async def execute_with_retry(workflow_func, *args, **kwargs):
    """Execute automation function with retry logic."""
    try:
        return await workflow_func(*args, **kwargs)
    except Exception as e:
        logger.error(f"Automation failed: {str(e)}")
        # Re-raise; the retry predicate above decides whether to try again
        raise


@router.post("/robust-workflow")
async def robust_workflow_execution(trigger: WorkflowTrigger):
    """Execute workflow with built-in retry logic."""
    try:
        result = await execute_with_retry(
            process_complex_workflow,
            trigger.workflow_id,
            trigger.trigger_data
        )
        return {"status": "success", "result": result}
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Workflow failed after retries: {str(e)}"
        )
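To see what the retry policy amounts to without any third-party library, here is a minimal standard-library sketch of exponential backoff that gives up immediately on non-retryable errors. `NonRetryableError`, `retry_async`, and `flaky` are hypothetical names for illustration, and the delays are shortened so the example finishes quickly.

```python
import asyncio


class NonRetryableError(Exception):
    """Hypothetical marker for failures that should not be retried."""


async def retry_async(func, *args, attempts=3, base_delay=0.01, **kwargs):
    """Call an async function, backing off exponentially between failures."""
    for attempt in range(1, attempts + 1):
        try:
            return await func(*args, **kwargs)
        except NonRetryableError:
            raise  # give up immediately
        except Exception:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))


calls = {"count": 0}


async def flaky():
    """Fails twice, then succeeds (stands in for a real workflow step)."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("transient failure")
    return "ok"


result = asyncio.run(retry_async(flaky))
```

Retries are only safe when the wrapped step is idempotent or the failure happened before any side effect, which is why input-validation errors are usually worth marking as non-retryable.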
Monitoring and Logging
Structured Logging for Automation Services
python
import structlog
from datetime import datetime
from typing import Optional

# Configure structured logging
structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer()
    ]
)
logger = structlog.get_logger()


class WorkflowLogger:
    def __init__(self, execution_id: str):
        self.execution_id = execution_id
        self.logger = logger.bind(execution_id=execution_id)

    def log_step(self, step_name: str, data: Optional[dict] = None):
        """Log a workflow step with execution context."""
        log_data = {
            "step": step_name,
            "timestamp": datetime.utcnow().isoformat()
        }
        if data:
            log_data.update(data)
        self.logger.info("workflow_step", **log_data)

    def log_error(self, error: Exception, context: Optional[dict] = None):
        """Log workflow error with context."""
        error_data = {
            "error_type": type(error).__name__,
            "error_message": str(error),
            "timestamp": datetime.utcnow().isoformat()
        }
        if context:
            error_data.update(context)
        self.logger.error("workflow_error", **error_data)


# Usage in workflow service
async def execute_workflow_with_logging(workflow_id: str, data: dict):
    workflow_logger = WorkflowLogger(f"exec_{workflow_id}")
    try:
        workflow_logger.log_step("start", {"input_data": data})
        # Process workflow steps
        workflow_logger.log_step("data_validation")
        validated_data = validate_data(data)
        workflow_logger.log_step("external_api_call")
        api_result = await call_external_api(validated_data)
        workflow_logger.log_step("complete", {"result": api_result})
        return api_result
    except Exception as e:
        workflow_logger.log_error(e, {"workflow_id": workflow_id})
        raise
Performance Optimization
Async Database Operations
python
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
import asyncio
from app.models.database import Workflow

# Async database engine
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/automation_db"
engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)


async def get_async_db():
    """Async database session dependency."""
    async with AsyncSessionLocal() as session:
        try:
            yield session
        finally:
            await session.close()


@router.get("/workflows/async")
async def list_workflows_async(
    skip: int = 0,
    limit: int = 100,
    db: AsyncSession = Depends(get_async_db)
):
    """Async endpoint for listing workflows."""
    stmt = select(Workflow).offset(skip).limit(limit)
    result = await db.execute(stmt)
    workflows = result.scalars().all()
    return workflows
Caching for Performance
python
from typing import Optional
from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
from fastapi_cache.decorator import cache
from redis import asyncio as aioredis  # the Redis backend expects an async client

# Initialize cache (in a real app, do this in the startup/lifespan hook)
redis_client = aioredis.from_url("redis://localhost:6379/0")
FastAPICache.init(RedisBackend(redis_client), prefix="automation-cache")


@router.get("/workflows/cached")
@cache(expire=300, namespace="workflows")  # Cache for 5 minutes
async def get_cached_workflows(
    db: Session = Depends(get_db),
    service: WorkflowService = Depends()
):
    """Get workflows with Redis caching."""
    return service.get_workflows(db)


@router.post("/workflows/invalidate-cache")
async def invalidate_workflow_cache(
    workflow_id: Optional[str] = None
):
    """Invalidate workflow cache entries."""
    if workflow_id:
        # Invalidate specific workflow
        await FastAPICache.clear(key=f"workflow:{workflow_id}")
    else:
        # Invalidate all workflow caches
        await FastAPICache.clear(namespace="workflows")
    return {"message": "Cache invalidated"}
Deployment and DevOps
Docker Configuration
dockerfile
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY . .

# Create non-root user
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser

# Run application
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
Docker Compose for Development
yaml
# docker-compose.yml
version: '3.8'

services:
  automation-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://postgres:password@db/automation_db
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - db
      - redis
    volumes:
      - ./app:/app/app
      - ./logs:/app/logs

  db:
    image: postgres:15
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=automation_db
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  n8n:
    image: n8nio/n8n
    ports:
      - "5678:5678"
    environment:
      - N8N_BASIC_AUTH_ACTIVE=true
      - N8N_BASIC_AUTH_USER=admin
      - N8N_BASIC_AUTH_PASSWORD=password
    volumes:
      - n8n_data:/home/node/.n8n

volumes:
  postgres_data:
  n8n_data:
Testing Automation APIs
Unit Tests with pytest
python
# tests/test_workflows.py
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from app.main import app
from app.core.database import Base, get_db

# Test database
SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)


# Override get_db dependency
def override_get_db():
    # Create the session before entering try so `db` always exists in finally
    db = TestingSessionLocal()
    try:
        yield db
    finally:
        db.close()


app.dependency_overrides[get_db] = override_get_db
client = TestClient(app)


def test_trigger_workflow():
    """Test workflow triggering endpoint."""
    response = client.post(
        "/trigger-workflow",
        json={
            "workflow_id": "test-workflow",
            "trigger_data": {"test": "data"},
            "priority": "high"
        }
    )
    assert response.status_code == 200
    data = response.json()
    assert "execution_id" in data
    assert data["status"] == "queued"


def test_get_workflow_status():
    """Test workflow status endpoint."""
    # First trigger a workflow
    trigger_response = client.post(
        "/trigger-workflow",
        json={"workflow_id": "status-test", "trigger_data": {}}
    )
    execution_id = trigger_response.json()["execution_id"]
    # Then check status
    status_response = client.get(f"/workflow-status/{execution_id}")
    assert status_response.status_code == 200
    assert "status" in status_response.json()


@pytest.mark.asyncio
async def test_async_workflow_execution():
    """Test async workflow execution."""
    from app.services.workflow_service import WorkflowService
    service = WorkflowService()
    result = await service.execute_async_workflow(
        "async-test",
        {"data": "test"}
    )
    assert result is not None
    assert "task_id" in result
Best Practices for Automation Backends
1. Idempotency for Reliable Automation
python
import hashlib
import json  # needed by generate_idempotency_key
from typing import Optional
from fastapi import Depends, Header
from sqlalchemy.orm import Session


def generate_idempotency_key(workflow_id: str, data: dict) -> str:
    """Generate idempotency key from workflow and data."""
    data_str = json.dumps(data, sort_keys=True)
    key_string = f"{workflow_id}:{data_str}"
    return hashlib.sha256(key_string.encode()).hexdigest()


@router.post("/idempotent-workflow")
async def idempotent_workflow_trigger(
    trigger: WorkflowTrigger,
    idempotency_key: Optional[str] = Header(None),
    db: Session = Depends(get_db)
):
    """
    Idempotent workflow trigger.
    Same request with same idempotency key returns same result.
    """
    # Generate or use provided idempotency key
    if not idempotency_key:
        idempotency_key = generate_idempotency_key(
            trigger.workflow_id,
            trigger.trigger_data
        )
    # Check if this request was already processed
    # (assumes the WorkflowExecution model has an idempotency_key column)
    existing = db.query(WorkflowExecution).filter(
        WorkflowExecution.idempotency_key == idempotency_key
    ).first()
    if existing:
        # Return existing result
        return {
            "execution_id": existing.execution_id,
            "status": existing.status,
            "idempotent": True,
            "previous_execution": existing.started_at.isoformat()
        }
    # Process new execution
    execution = await process_workflow(trigger)
    execution.idempotency_key = idempotency_key
    db.add(execution)
    db.commit()
    return {
        "execution_id": execution.execution_id,
        "status": execution.status,
        "idempotent": False
    }
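The key derivation is easiest to check in isolation. The following sketch replaces the database with a hypothetical in-memory store (`InMemoryExecutions`, not part of the service above) to show that a replayed payload, even with its keys in a different order, maps to the same execution.

```python
import hashlib
import json


def idempotency_key(workflow_id: str, data: dict) -> str:
    """Hash the workflow ID plus canonical JSON (keys sorted) of the payload."""
    canonical = json.dumps(data, sort_keys=True)
    return hashlib.sha256(f"{workflow_id}:{canonical}".encode()).hexdigest()


class InMemoryExecutions:
    """Hypothetical stand-in for the workflow_executions table."""

    def __init__(self):
        self._by_key = {}

    def trigger(self, workflow_id: str, data: dict) -> dict:
        key = idempotency_key(workflow_id, data)
        if key in self._by_key:
            # Replay: return the stored execution instead of running again.
            return {**self._by_key[key], "idempotent": True}
        execution = {"execution_id": f"exec_{len(self._by_key) + 1}", "status": "queued"}
        self._by_key[key] = execution
        return {**execution, "idempotent": False}


store = InMemoryExecutions()
first = store.trigger("invoice-sync", {"customer": "a1", "month": 3})
# Same payload with keys in a different order maps to the same key.
second = store.trigger("invoice-sync", {"month": 3, "customer": "a1"})
```

In a database-backed version, a unique constraint on the key column is one way to close the race between the lookup and the insert when two identical requests arrive at the same time.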
2. Rate Limiting for Internal Services
python
from fastapi import Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

# Initialize rate limiter
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)


@router.post("/rate-limited-trigger")
@limiter.limit("10/minute")
async def rate_limited_workflow(
    request: Request,  # slowapi requires the Request argument on limited endpoints
    trigger: WorkflowTrigger
):
    """Workflow trigger with rate limiting."""
    return await trigger_workflow(trigger)
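For internal services, the choice of limiter key matters: when every call comes from the same n8n host or reverse proxy, keying on the remote address puts all callers in one bucket. The sketch below illustrates the underlying idea with a small sliding-window limiter keyed on an arbitrary string such as an API key ID. `SlidingWindowLimiter` is a hypothetical class for illustration and does not describe how slowapi is implemented.

```python
import time
from collections import defaultdict, deque


class SlidingWindowLimiter:
    """Allow at most `limit` calls per `window_seconds` for each key."""

    def __init__(self, limit: int, window_seconds: float, clock=time.monotonic):
        self.limit = limit
        self.window = window_seconds
        self._clock = clock
        self._hits = defaultdict(deque)  # key -> timestamps of recent calls

    def allow(self, key: str) -> bool:
        now = self._clock()
        hits = self._hits[key]
        # Drop timestamps that have fallen out of the window.
        while hits and now - hits[0] >= self.window:
            hits.popleft()
        if len(hits) >= self.limit:
            return False
        hits.append(now)
        return True


# Two calls per 60 seconds, tracked separately per API key.
limiter_demo = SlidingWindowLimiter(limit=2, window_seconds=60.0)
decisions = [limiter_demo.allow("key-a") for _ in range(3)]
other_caller = limiter_demo.allow("key-b")
```

This version keeps state in process memory, so it only suits a single worker; a shared store such as Redis would be needed once the service runs several processes.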
3. Circuit Breaker Pattern
python
import httpx
from pybreaker import CircuitBreaker

# Circuit breaker for external dependencies
workflow_breaker = CircuitBreaker(
    fail_max=5,        # 5 failures before opening
    reset_timeout=60   # 60 seconds before half-open
)


# Caveat: verify that your pybreaker version counts failures raised inside
# coroutines; its decorator was designed around synchronous callables.
@workflow_breaker
async def call_external_service(url: str, data: dict):
    """Call external service with circuit breaker protection."""
    async with httpx.AsyncClient() as client:
        response = await client.post(url, json=data, timeout=10.0)
        response.raise_for_status()
        return response.json()


@router.post("/circuit-breaker-workflow")
async def circuit_breaker_workflow(trigger: WorkflowTrigger):
    """Workflow with circuit breaker for external dependencies."""
    try:
        result = await call_external_service(
            "https://external-service.com/api",
            trigger.trigger_data
        )
        return {"status": "success", "result": result}
    except Exception as e:
        # process_locally is assumed to be a fallback defined elsewhere
        return {
            "status": "partial_failure",
            "message": "External service unavailable, using fallback",
            "fallback_result": process_locally(trigger.trigger_data)
        }
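Circuit breaker libraries differ in how well they handle coroutines, so it helps to see the state machine spelled out. Below is a minimal asyncio-aware sketch with the same two knobs (`fail_max`, `reset_timeout`). `AsyncCircuitBreaker` and `CircuitOpenError` are hypothetical names, and the class is an illustration of the pattern rather than a replacement for a maintained library.

```python
import asyncio
import time


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class AsyncCircuitBreaker:
    def __init__(self, fail_max=5, reset_timeout=60.0, clock=time.monotonic):
        self.fail_max = fail_max
        self.reset_timeout = reset_timeout
        self._clock = clock
        self._failures = 0
        self._opened_at = None  # None means the circuit is closed

    async def call(self, func, *args, **kwargs):
        if self._opened_at is not None:
            if self._clock() - self._opened_at < self.reset_timeout:
                raise CircuitOpenError("circuit open; call skipped")
            # Timeout elapsed: half-open, let this one call through as a trial.
        try:
            result = await func(*args, **kwargs)
        except Exception:
            self._failures += 1
            if self._opened_at is not None or self._failures >= self.fail_max:
                self._opened_at = self._clock()  # open (or re-open) the circuit
            raise
        # Success closes the circuit and resets the failure count.
        self._failures = 0
        self._opened_at = None
        return result


async def failing():
    """Stands in for an external service that is down."""
    raise ConnectionError("service down")


async def demo():
    breaker = AsyncCircuitBreaker(fail_max=2, reset_timeout=60.0)
    outcomes = []
    for _ in range(3):
        try:
            await breaker.call(failing)
        except CircuitOpenError:
            outcomes.append("rejected")
        except ConnectionError:
            outcomes.append("failed")
    return outcomes


outcomes = asyncio.run(demo())
```

After two real failures the third call is rejected without touching the external service, which is the point of the pattern: a struggling dependency gets time to recover while callers fail fast and fall back.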
Real-World Use Cases
1. Data Pipeline Orchestration
python
@router.post("/data-pipeline")
async def trigger_data_pipeline(pipeline_config: dict):
    """
    Orchestrate complex data pipelines.

    Example pipeline:
    1. Extract data from source
    2. Transform/clean data
    3. Load to destination
    4. Send notifications
    5. Update metadata
    """
    pipeline_id = pipeline_config.get("pipeline_id")
    steps = pipeline_config.get("steps", [])
    results = []
    for step in steps:
        step_result = await execute_pipeline_step(step)
        results.append({
            "step": step["name"],
            "status": "completed" if step_result else "failed",
            "result": step_result
        })
    # Send completion notification
    await send_pipeline_notification(pipeline_id, results)
    return {
        "pipeline_id": pipeline_id,
        "status": "completed",
        "steps_executed": len(results),
        "results": results
    }
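The endpoint above runs every step and reports an overall status of "completed" even when a step fails. Where later steps depend on earlier ones, a stop-on-first-failure variant may be preferable. This is a minimal sketch under that assumption; `run_pipeline` and `fake_step` are hypothetical names, and the step executor is passed in so the example runs on its own.

```python
import asyncio


async def run_pipeline(steps, execute_step):
    """Run steps in order; stop at the first failure and report it."""
    results = []
    for step in steps:
        try:
            output = await execute_step(step)
            results.append({"step": step["name"], "status": "completed", "result": output})
        except Exception as exc:
            results.append({"step": step["name"], "status": "failed", "error": str(exc)})
            break  # later steps depend on earlier ones, so do not continue
    failed = any(r["status"] == "failed" for r in results)
    return {"status": "failed" if failed else "completed", "results": results}


async def fake_step(step):
    """Hypothetical executor: fails on the step named 'transform'."""
    if step["name"] == "transform":
        raise ValueError("bad row")
    return f"{step['name']} done"


summary = asyncio.run(
    run_pipeline([{"name": "extract"}, {"name": "transform"}, {"name": "load"}], fake_step)
)
```

Here the "load" step never runs, and the overall status reflects the failure, so a caller such as n8n can branch on it instead of inspecting every step result.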
2. Scheduled Task Management
python
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger

scheduler = AsyncIOScheduler()


@router.post("/schedule-task")
async def schedule_automation_task(
    task_config: dict,
    cron_expression: str
):
    """Schedule recurring automation task."""
    task_id = task_config.get("task_id")
    # Add to scheduler
    scheduler.add_job(
        execute_scheduled_task,
        CronTrigger.from_crontab(cron_expression),
        args=[task_config],
        id=task_id,
        replace_existing=True
    )
    if not scheduler.running:
        scheduler.start()
    return {
        "task_id": task_id,
        "scheduled": True,
        "cron_expression": cron_expression,
        "next_run": scheduler.get_job(task_id).next_run_time.isoformat()
    }
3. Webhook Aggregation Service
python
@router.post("/webhook-aggregator")
async def webhook_aggregator(
    source: str,
    event_type: str,
    payload: dict,
    background_tasks: BackgroundTasks
):
    """
    Aggregate webhooks from multiple sources.

    - Receive webhooks from various services
    - Normalize data format
    - Route to appropriate handlers
    - Store for audit trail
    """