FastAPI for Internal Automation Services: Building Robust Backends for Workflow Orchestration

Meta Description: Learn how to build scalable, type-safe REST APIs with FastAPI for internal automation services. This comprehensive guide covers Python backend development, microservices architecture, n8n integration, authentication, and best practices for automation engineers.

Keywords: fastapi, python, rest api, backend, microservices, automation, n8n, workflow, api design, backend services, python backend, automation api, internal tools, microservices architecture

---

Article Hero

Category: Backend Development • Date: February 27, 2026 • Read Time: 15-18 minutes

Lead Paragraph: As automation engineers, we often need to build internal APIs that serve as the backbone for our workflow orchestration. FastAPI emerges as the perfect tool for this job, combining Python's simplicity with enterprise-grade performance, automatic documentation, and type safety. This guide walks through building production-ready automation services that integrate seamlessly with n8n and other workflow tools.

---

Why FastAPI is Perfect for Automation Backends

When building internal automation services, you need a framework that's:

1. Fast to develop - Automation projects often have tight deadlines
2. Easy to maintain - Internal tools need to be reliable with minimal oversight
3. Well-documented - Team members should understand APIs without digging through code
4. Performant - Automation backends must handle many concurrent workflows efficiently
5. Type-safe - Catch errors before they break production workflows

FastAPI delivers on all these requirements while providing automatic OpenAPI documentation, dependency injection, and async support out of the box.

The Automation Backend Stack

```python
# Typical FastAPI automation service architecture
from fastapi import FastAPI, Depends, HTTPException
from pydantic import BaseModel
from typing import List, Optional
import asyncio
from datetime import datetime

# Your automation logic here
class AutomationService:
    def __init__(self):
        self.running_workflows = {}

    async def trigger_workflow(self, workflow_id: str, data: dict):
        """Trigger an automation workflow."""
        # Implementation
        pass
```

Setting Up Your FastAPI Automation Project

Project Structure for Automation Services


```
automation-backend/
├── app/
│   ├── __init__.py
│   ├── main.py              # FastAPI app instance
│   ├── api/
│   │   ├── __init__.py
│   │   ├── v1/
│   │   │   ├── __init__.py
│   │   │   ├── endpoints/
│   │   │   │   ├── workflows.py
│   │   │   │   ├── triggers.py
│   │   │   │   └── tasks.py
│   │   │   └── dependencies.py
│   ├── core/
│   │   ├── config.py        # Configuration management
│   │   ├── security.py      # Authentication & authorization
│   │   └── database.py      # Database connections
│   ├── models/
│   │   ├── schemas.py       # Pydantic models
│   │   └── database.py      # SQLAlchemy models
│   ├── services/
│   │   ├── workflow_service.py
│   │   ├── task_service.py
│   │   └── notification_service.py
│   └── utils/
│       ├── logging.py
│       └── helpers.py
├── tests/
├── requirements.txt
├── Dockerfile
└── docker-compose.yml
```

Installation and Basic Setup

```bash
# Create virtual environment
python -m venv venv
source venv/bin/activate   # On Windows: venv\Scripts\activate

# Install FastAPI and dependencies (quote extras so zsh doesn't expand the brackets)
pip install "fastapi[all]" uvicorn sqlalchemy pydantic "python-jose[cryptography]" "passlib[bcrypt]"

# For production
pip install gunicorn httpx redis celery
```
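For reproducible builds, the same dependencies belong in a `requirements.txt` that the Dockerfile later in this guide copies in. A minimal sketch mirroring the pip commands above (pin exact versions for your own deploys):

```
fastapi[all]
uvicorn
sqlalchemy
pydantic
python-jose[cryptography]
passlib[bcrypt]
# production extras
gunicorn
httpx
redis
celery
```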

Minimal Automation API Example

```python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from typing import List, Optional
from datetime import datetime

app = FastAPI(
    title="Automation Service API",
    description="Internal API for workflow automation",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc",
)

class WorkflowTrigger(BaseModel):
    workflow_id: str
    trigger_data: dict
    priority: Optional[str] = "normal"
    scheduled_for: Optional[datetime] = None

class WorkflowResponse(BaseModel):
    execution_id: str
    status: str
    started_at: datetime
    estimated_completion: Optional[datetime]

@app.post("/trigger-workflow", response_model=WorkflowResponse)
async def trigger_workflow(trigger: WorkflowTrigger):
    """
    Trigger an automation workflow with provided data.

    - workflow_id: ID of the workflow to trigger
    - trigger_data: JSON data to pass to the workflow
    - priority: Execution priority (low/normal/high)
    - scheduled_for: Optional future execution time
    """
    # Simulate workflow execution
    execution_id = f"exec_{datetime.now().timestamp()}"
    return WorkflowResponse(
        execution_id=execution_id,
        status="queued",
        started_at=datetime.now(),
        estimated_completion=datetime.now(),
    )

@app.get("/workflow-status/{execution_id}")
async def get_workflow_status(execution_id: str):
    """Get the current status of a workflow execution."""
    # Implementation would check database or task queue
    return {
        "execution_id": execution_id,
        "status": "completed",
        "progress": 100,
        "result": {"message": "Workflow executed successfully"},
    }
```

Building Automation-Focused API Endpoints

Workflow Management Endpoints

```python
from datetime import datetime
from typing import List

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session

from app.core.database import get_db
from app.models.schemas import Workflow, WorkflowCreate, WorkflowUpdate
from app.services.workflow_service import WorkflowService

router = APIRouter(prefix="/workflows", tags=["workflows"])

@router.get("/", response_model=List[Workflow])
async def list_workflows(
    skip: int = 0,
    limit: int = 100,
    db: Session = Depends(get_db),
    service: WorkflowService = Depends(),
):
    """List all available automation workflows."""
    return service.get_workflows(db, skip=skip, limit=limit)

@router.post("/", response_model=Workflow, status_code=status.HTTP_201_CREATED)
async def create_workflow(
    workflow: WorkflowCreate,
    db: Session = Depends(get_db),
    service: WorkflowService = Depends(),
):
    """Register a new automation workflow."""
    return service.create_workflow(db, workflow)

@router.post("/{workflow_id}/execute")
async def execute_workflow(
    workflow_id: str,
    input_data: dict,
    db: Session = Depends(get_db),
    service: WorkflowService = Depends(),
):
    """Execute a specific workflow with input data."""
    result = service.execute_workflow(db, workflow_id, input_data)
    if not result:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Workflow {workflow_id} not found",
        )
    return {
        "execution_id": result.execution_id,
        "status": "started",
        "workflow": workflow_id,
        "started_at": datetime.now().isoformat(),
    }
```

Task Queue Management

```python
from fastapi import BackgroundTasks
from celery import Celery
import json

# Celery configuration for async task processing
celery_app = Celery(
    'automation_tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0',
)

@router.post("/tasks/async")
async def create_async_task(
    task_type: str,
    parameters: dict,
    background_tasks: BackgroundTasks,
):
    """
    Create an asynchronous automation task. Useful for long-running operations like:

    - Data processing
    - File generation
    - External API calls
    - Batch operations
    """
    # Add to background tasks (for lightweight operations)
    background_tasks.add_task(process_automation_task, task_type, parameters)

    # Or use Celery for distributed task processing
    celery_task = celery_app.send_task(
        'process_automation',
        args=[task_type, parameters],
    )

    return {
        "task_id": celery_task.id,
        "status": "queued",
        "message": f"Task {task_type} queued for processing",
    }

async def process_automation_task(task_type: str, parameters: dict):
    """Background task processor."""
    # Your automation logic here
    print(f"Processing {task_type} with params: {parameters}")
```

Database Integration for Automation State

SQLAlchemy Models for Automation

```python
from datetime import datetime

from sqlalchemy import Column, String, DateTime, JSON, Integer, Boolean
from sqlalchemy.orm import declarative_base  # moved here from sqlalchemy.ext.declarative in SQLAlchemy 2.0

Base = declarative_base()

class WorkflowExecution(Base):
    __tablename__ = "workflow_executions"

    id = Column(Integer, primary_key=True, index=True)
    execution_id = Column(String, unique=True, index=True)
    workflow_id = Column(String, index=True)
    status = Column(String, default="pending")  # pending, running, completed, failed
    input_data = Column(JSON)
    output_data = Column(JSON, nullable=True)
    started_at = Column(DateTime, default=datetime.utcnow)
    completed_at = Column(DateTime, nullable=True)
    error_message = Column(String, nullable=True)

class AutomationTask(Base):
    __tablename__ = "automation_tasks"

    id = Column(Integer, primary_key=True, index=True)
    task_id = Column(String, unique=True, index=True)
    task_type = Column(String)  # email, webhook, data_transform, etc.
    parameters = Column(JSON)
    status = Column(String, default="queued")
    priority = Column(Integer, default=0)
    scheduled_for = Column(DateTime, nullable=True)
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
```
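To see these models become actual tables, `Base.metadata.create_all` is enough for local experiments. A minimal self-contained sketch with an in-memory SQLite engine (in the real project you would import `Base` and the models from `app.models.database` rather than redefining them):

```python
from sqlalchemy import create_engine, inspect, Column, Integer, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()

# A minimal stand-in model, purely for illustration
class WorkflowExecution(Base):
    __tablename__ = "workflow_executions"
    id = Column(Integer, primary_key=True)
    execution_id = Column(String, unique=True, index=True)

# In-memory SQLite keeps the demo self-contained
engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)

print(inspect(engine).get_table_names())  # ['workflow_executions']
```

For production schemas you would typically manage this with Alembic migrations instead of `create_all`.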

Pydantic Schemas for API Validation

```python
from datetime import datetime
from typing import Optional, Dict, Any

from pydantic import BaseModel, Field

class WorkflowExecutionBase(BaseModel):
    workflow_id: str = Field(..., description="ID of the workflow to execute")
    trigger_data: Dict[str, Any] = Field(default_factory=dict)
    priority: Optional[str] = "normal"
    scheduled_for: Optional[datetime] = None

class WorkflowExecutionCreate(WorkflowExecutionBase):
    pass

class WorkflowExecution(WorkflowExecutionBase):
    execution_id: str
    status: str
    started_at: datetime
    completed_at: Optional[datetime]
    output_data: Optional[Dict[str, Any]]

    class Config:
        orm_mode = True  # Pydantic v2: replace with `from_attributes = True`

class TaskStatus(BaseModel):
    task_id: str
    status: str
    progress: Optional[int] = None
    result: Optional[Dict[str, Any]] = None
    error: Optional[str] = None
    created_at: datetime
    updated_at: datetime
```

Authentication and Authorization for Internal Services

API Key Authentication

```python
from datetime import datetime

from fastapi import Depends, HTTPException, status
from fastapi.security import APIKeyHeader
from sqlalchemy.orm import Session

from app.core.database import get_db
from app.models.database import APIKey

api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)

async def get_api_key(
    api_key: str = Depends(api_key_header),
    db: Session = Depends(get_db),
):
    """Validate API key for internal service access."""
    if not api_key:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="API key required",
        )

    # Check if API key exists and is valid
    db_key = db.query(APIKey).filter(
        APIKey.key == api_key,
        APIKey.is_active == True,
    ).first()
    if not db_key:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid API key",
        )

    # Check expiration
    if db_key.expires_at and db_key.expires_at < datetime.utcnow():
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="API key expired",
        )

    return db_key

# Usage in endpoints
@router.post("/secure-trigger")
async def secure_workflow_trigger(
    trigger: WorkflowTrigger,
    api_key: APIKey = Depends(get_api_key),
):
    """Secure endpoint requiring API key authentication."""
    # Only accessible with valid API key
    return await trigger_workflow(trigger)
```

Role-Based Access Control

```python
from enum import Enum
from functools import wraps

class UserRole(str, Enum):
    VIEWER = "viewer"
    OPERATOR = "operator"
    ADMIN = "admin"
    SUPER_ADMIN = "super_admin"

def require_role(required_role: UserRole):
    """Decorator to require a specific user role."""
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Get current user from dependency
            current_user = kwargs.get("current_user")
            if not current_user:
                raise HTTPException(
                    status_code=status.HTTP_401_UNAUTHORIZED,
                    detail="Authentication required",
                )

            # Check role hierarchy
            role_hierarchy = {
                UserRole.VIEWER: 1,
                UserRole.OPERATOR: 2,
                UserRole.ADMIN: 3,
                UserRole.SUPER_ADMIN: 4,
            }
            user_role_level = role_hierarchy.get(current_user.role, 0)
            required_role_level = role_hierarchy[required_role]
            if user_role_level < required_role_level:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail=f"Requires {required_role.value} role or higher",
                )

            return await func(*args, **kwargs)
        return wrapper
    return decorator

# Usage
@router.post("/admin/workflows")
@require_role(UserRole.ADMIN)
async def create_admin_workflow(
    workflow: WorkflowCreate,
    current_user: User = Depends(get_current_user),
):
    """Create workflow (admin only)."""
    # Admin-only logic here
    pass
```

Integrating with n8n and Other Automation Tools

Webhook Endpoints for n8n Integration

```python
import hashlib
import hmac
import os
from typing import Optional

from fastapi import Header, Request

@router.post("/webhooks/n8n/{workflow_id}")
async def n8n_webhook(
    workflow_id: str,
    request: Request,
    x_n8n_signature: Optional[str] = Header(None),
):
    """
    Webhook endpoint for n8n workflow triggers.

    n8n can call this endpoint to:
    - Trigger internal workflows
    - Pass data between n8n and custom services
    - Handle complex logic outside n8n
    """
    # Verify webhook signature (if configured)
    if x_n8n_signature:
        body = await request.body()
        secret = os.getenv("N8N_WEBHOOK_SECRET", "").encode()
        expected_signature = hmac.new(secret, body, hashlib.sha256).hexdigest()
        if not hmac.compare_digest(x_n8n_signature, expected_signature):
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid webhook signature",
            )

    # Parse incoming data
    data = await request.json()

    # Process based on workflow ID
    if workflow_id == "data-processing":
        result = await process_data_pipeline(data)
    elif workflow_id == "notification":
        result = await send_notifications(data)
    else:
        result = {"status": "unknown_workflow", "data": data}

    return {
        "webhook_received": True,
        "workflow_id": workflow_id,
        "processed": True,
        "result": result,
    }
```
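On the sending side, the signature this endpoint verifies is simply an HMAC-SHA256 over the raw request body, keyed by the shared secret. A stdlib-only sketch (the secret value and payload are illustrative):

```python
import hashlib
import hmac
import json

secret = b"my-shared-secret"  # must match N8N_WEBHOOK_SECRET on the server
body = json.dumps({"event": "build.finished"}).encode()

# Sender computes the signature over the exact bytes it will POST
signature = hmac.new(secret, body, hashlib.sha256).hexdigest()

# Receiver recomputes it and compares in constant time
expected = hmac.new(secret, body, hashlib.sha256).hexdigest()
assert hmac.compare_digest(signature, expected)
```

The key detail is signing the exact bytes on the wire: re-serializing the JSON on the receiving side can change whitespace or key order and break verification, which is why the endpoint above reads `request.body()` rather than the parsed JSON.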

Calling External APIs from FastAPI

```python
import httpx
from fastapi import HTTPException

class ExternalAPIService:
    def __init__(self):
        self.client = httpx.AsyncClient(timeout=30.0)

    async def call_n8n_webhook(self, webhook_url: str, data: dict):
        """Call an n8n webhook from the FastAPI service."""
        try:
            response = await self.client.post(
                webhook_url,
                json=data,
                headers={"Content-Type": "application/json"},
            )
            response.raise_for_status()
            return response.json()
        except httpx.RequestError as e:
            raise HTTPException(
                status_code=status.HTTP_502_BAD_GATEWAY,
                detail=f"Failed to call n8n webhook: {str(e)}",
            )

    async def trigger_n8n_workflow(self, workflow_id: str, data: dict):
        """Trigger a specific n8n workflow."""
        # Assuming n8n is running locally on port 5678
        n8n_url = f"http://localhost:5678/webhook/{workflow_id}"
        return await self.call_n8n_webhook(n8n_url, data)

# Usage in endpoint
@router.post("/trigger-n8n-workflow")
async def trigger_external_n8n_workflow(
    workflow_id: str,
    trigger_data: dict,
    api_service: ExternalAPIService = Depends(),
):
    """Trigger an n8n workflow from FastAPI."""
    result = await api_service.trigger_n8n_workflow(workflow_id, trigger_data)
    return {
        "message": f"n8n workflow {workflow_id} triggered",
        "n8n_response": result,
    }
```

Error Handling and Retry Logic

Robust Error Handling for Automation Services

```python
import logging

from fastapi import HTTPException
from tenacity import retry, retry_if_exception, stop_after_attempt, wait_exponential

logger = logging.getLogger(__name__)

class AutomationError(Exception):
    """Custom exception for automation failures."""
    def __init__(self, message: str, retryable: bool = True):
        self.message = message
        self.retryable = retryable
        super().__init__(self.message)

def _is_retryable(exc: BaseException) -> bool:
    """Only retry errors that are not explicitly marked non-retryable."""
    return not (isinstance(exc, AutomationError) and not exc.retryable)

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=4, max=10),
    retry=retry_if_exception(_is_retryable),
)
async def execute_with_retry(workflow_func, *args, **kwargs):
    """Execute an automation function with retry logic."""
    try:
        return await workflow_func(*args, **kwargs)
    except Exception as e:
        logger.error(f"Automation failed: {e}")
        raise  # tenacity decides whether to retry based on _is_retryable

@router.post("/robust-workflow")
async def robust_workflow_execution(trigger: WorkflowTrigger):
    """Execute workflow with built-in retry logic."""
    try:
        result = await execute_with_retry(
            process_complex_workflow,
            trigger.workflow_id,
            trigger.trigger_data,
        )
        return {"status": "success", "result": result}
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=f"Workflow failed after retries: {str(e)}",
        )
```
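To build intuition for the `wait_exponential` parameters used above, here is a stdlib-only sketch of a capped exponential backoff schedule (a generic approximation for illustration, not tenacity's exact internal formula):

```python
def backoff_delays(attempts: int, multiplier: float = 1, min_s: float = 4, max_s: float = 10):
    """Exponential delays clamped to [min_s, max_s], one per retry attempt."""
    return [min(max_s, max(min_s, multiplier * 2 ** n)) for n in range(attempts)]

print(backoff_delays(5))  # [4, 4, 4, 8, 10]
```

The clamp matters in practice: the floor keeps rapid-fire retries from hammering a struggling dependency, and the ceiling keeps workflow latency bounded.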

Monitoring and Logging

Structured Logging for Automation Services

```python
from datetime import datetime

import structlog

# Configure structured logging
structlog.configure(
    processors=[
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ]
)

logger = structlog.get_logger()

class WorkflowLogger:
    def __init__(self, execution_id: str):
        self.execution_id = execution_id
        self.logger = logger.bind(execution_id=execution_id)

    def log_step(self, step_name: str, data: dict = None):
        """Log a workflow step with execution context."""
        log_data = {
            "step": step_name,
            "timestamp": datetime.utcnow().isoformat(),
        }
        if data:
            log_data.update(data)
        self.logger.info("workflow_step", **log_data)

    def log_error(self, error: Exception, context: dict = None):
        """Log a workflow error with context."""
        error_data = {
            "error_type": type(error).__name__,
            "error_message": str(error),
            "timestamp": datetime.utcnow().isoformat(),
        }
        if context:
            error_data.update(context)
        self.logger.error("workflow_error", **error_data)

# Usage in workflow service
async def execute_workflow_with_logging(workflow_id: str, data: dict):
    workflow_logger = WorkflowLogger(f"exec_{workflow_id}")
    try:
        workflow_logger.log_step("start", {"input_data": data})

        # Process workflow steps
        workflow_logger.log_step("data_validation")
        validated_data = validate_data(data)

        workflow_logger.log_step("external_api_call")
        api_result = await call_external_api(validated_data)

        workflow_logger.log_step("complete", {"result": api_result})
        return api_result
    except Exception as e:
        workflow_logger.log_error(e, {"workflow_id": workflow_id})
        raise
```

Performance Optimization

Async Database Operations

```python
import asyncio

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker

from app.models.database import Workflow

# Async database engine
DATABASE_URL = "postgresql+asyncpg://user:password@localhost/automation_db"
engine = create_async_engine(DATABASE_URL, echo=True)
AsyncSessionLocal = sessionmaker(
    engine, class_=AsyncSession, expire_on_commit=False
)

async def get_async_db():
    """Async database session dependency."""
    async with AsyncSessionLocal() as session:
        try:
            yield session
        finally:
            await session.close()

@router.get("/workflows/async")
async def list_workflows_async(
    skip: int = 0,
    limit: int = 100,
    db: AsyncSession = Depends(get_async_db),
):
    """Async endpoint for listing workflows."""
    stmt = select(Workflow).offset(skip).limit(limit)
    result = await db.execute(stmt)
    return result.scalars().all()
```

Caching for Performance

```python
from typing import Optional

from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
from fastapi_cache.decorator import cache
from redis import asyncio as aioredis

# Initialize cache (RedisBackend expects an async Redis client)
redis_client = aioredis.from_url("redis://localhost:6379/0")
FastAPICache.init(RedisBackend(redis_client), prefix="automation-cache")

@router.get("/workflows/cached")
@cache(expire=300)  # Cache for 5 minutes
async def get_cached_workflows(
    db: Session = Depends(get_db),
    service: WorkflowService = Depends(),
):
    """Get workflows with Redis caching."""
    return service.get_workflows(db)

@router.post("/workflows/invalidate-cache")
async def invalidate_workflow_cache(workflow_id: Optional[str] = None):
    """Invalidate workflow cache entries."""
    if workflow_id:
        # Invalidate a specific workflow
        await FastAPICache.clear(key=f"workflow:{workflow_id}")
    else:
        # Invalidate all workflow caches
        await FastAPICache.clear(namespace="workflows")
    return {"message": "Cache invalidated"}
```
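When Redis is overkill, say a single-process internal tool, the same idea can be sketched as an in-process TTL cache with the stdlib alone:

```python
import time

class TTLCache:
    """Tiny in-process cache: values expire after ttl seconds."""
    def __init__(self, ttl: float):
        self.ttl = ttl
        self._store = {}

    def get(self, key):
        entry = self._store.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if time.monotonic() >= expires_at:
            del self._store[key]  # lazily evict stale entries on read
            return None
        return value

    def set(self, key, value):
        self._store[key] = (value, time.monotonic() + self.ttl)

cache = TTLCache(ttl=300)
cache.set("workflows", [{"id": "wf-1"}])
print(cache.get("workflows"))  # [{'id': 'wf-1'}]
```

The trade-off versus Redis: this cache is per-process, so it cannot be shared or invalidated across multiple uvicorn workers or replicas.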

Deployment and DevOps

Docker Configuration

```dockerfile
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements first to leverage layer caching
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy application
COPY . .

# Create non-root user
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser

# Run application
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
```

Docker Compose for Development

```yaml
# docker-compose.yml
version: '3.8'

services:
  automation-api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://postgres:password@db/automation_db
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - db
      - redis
    volumes:
      - ./app:/app/app
      - ./logs:/app/logs

  db:
    image: postgres:15
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=automation_db
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  n8n:
    image: n8nio/n8n
    ports:
      - "5678:5678"
    environment:
      - N8N_BASIC_AUTH_ACTIVE=true
      - N8N_BASIC_AUTH_USER=admin
      - N8N_BASIC_AUTH_PASSWORD=password
    volumes:
      - n8n_data:/home/node/.n8n

volumes:
  postgres_data:
  n8n_data:
```

Testing Automation APIs

Unit Tests with pytest

```python
# tests/test_workflows.py
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from app.main import app
from app.core.database import Base, get_db

# Test database
SQLALCHEMY_DATABASE_URL = "sqlite:///./test.db"
engine = create_engine(
    SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False}
)
TestingSessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base.metadata.create_all(bind=engine)

# Override get_db dependency
def override_get_db():
    try:
        db = TestingSessionLocal()
        yield db
    finally:
        db.close()

app.dependency_overrides[get_db] = override_get_db

client = TestClient(app)

def test_trigger_workflow():
    """Test workflow triggering endpoint."""
    response = client.post(
        "/trigger-workflow",
        json={
            "workflow_id": "test-workflow",
            "trigger_data": {"test": "data"},
            "priority": "high",
        },
    )
    assert response.status_code == 200
    data = response.json()
    assert "execution_id" in data
    assert data["status"] == "queued"

def test_get_workflow_status():
    """Test workflow status endpoint."""
    # First trigger a workflow
    trigger_response = client.post(
        "/trigger-workflow",
        json={"workflow_id": "status-test", "trigger_data": {}},
    )
    execution_id = trigger_response.json()["execution_id"]

    # Then check its status
    status_response = client.get(f"/workflow-status/{execution_id}")
    assert status_response.status_code == 200
    assert "status" in status_response.json()

@pytest.mark.asyncio
async def test_async_workflow_execution():
    """Test async workflow execution."""
    from app.services.workflow_service import WorkflowService

    service = WorkflowService()
    result = await service.execute_async_workflow(
        "async-test", {"data": "test"}
    )
    assert result is not None
    assert "task_id" in result
```

Best Practices for Automation Backends

1. Idempotency for Reliable Automation

```python
import hashlib
import json
from typing import Optional

from fastapi import Header

def generate_idempotency_key(workflow_id: str, data: dict) -> str:
    """Generate an idempotency key from workflow ID and payload."""
    data_str = json.dumps(data, sort_keys=True)
    key_string = f"{workflow_id}:{data_str}"
    return hashlib.sha256(key_string.encode()).hexdigest()

@router.post("/idempotent-workflow")
async def idempotent_workflow_trigger(
    trigger: WorkflowTrigger,
    idempotency_key: Optional[str] = Header(None),
    db: Session = Depends(get_db),
):
    """
    Idempotent workflow trigger.
    The same request with the same idempotency key returns the same result.
    """
    # Generate or use the provided idempotency key
    if not idempotency_key:
        idempotency_key = generate_idempotency_key(
            trigger.workflow_id, trigger.trigger_data
        )

    # Check if this request was already processed
    existing = db.query(WorkflowExecution).filter(
        WorkflowExecution.idempotency_key == idempotency_key
    ).first()
    if existing:
        # Return the existing result
        return {
            "execution_id": existing.execution_id,
            "status": existing.status,
            "idempotent": True,
            "previous_execution": existing.started_at.isoformat(),
        }

    # Process new execution
    execution = await process_workflow(trigger)
    execution.idempotency_key = idempotency_key
    db.add(execution)
    db.commit()

    return {
        "execution_id": execution.execution_id,
        "status": execution.status,
        "idempotent": False,
    }
```
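The key generation is stable under dict-key reordering because `json.dumps(..., sort_keys=True)` canonicalizes the payload before hashing. A quick stdlib check (payload values are illustrative):

```python
import hashlib
import json

def generate_idempotency_key(workflow_id: str, data: dict) -> str:
    """Canonicalize the payload, then hash it with the workflow ID."""
    data_str = json.dumps(data, sort_keys=True)
    return hashlib.sha256(f"{workflow_id}:{data_str}".encode()).hexdigest()

a = generate_idempotency_key("sync-users", {"batch": 7, "dry_run": False})
b = generate_idempotency_key("sync-users", {"dry_run": False, "batch": 7})
assert a == b        # same payload, different key order -> same key
assert len(a) == 64  # hex-encoded SHA-256
```

Note that canonicalization only covers key order: semantically equal payloads that serialize differently (e.g. `7` vs `7.0`) still produce different keys, which is why letting clients supply an explicit `Idempotency-Key` header, as the endpoint above allows, is the more robust option.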

2. Rate Limiting for Internal Services

```python
from fastapi import Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

# Initialize rate limiter
limiter = Limiter(key_func=get_remote_address)
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@router.post("/rate-limited-trigger")
@limiter.limit("10/minute")
async def rate_limited_workflow(request: Request, trigger: WorkflowTrigger):
    """Workflow trigger with rate limiting."""
    return await trigger_workflow(trigger)
```

3. Circuit Breaker Pattern

```python
import httpx
from pybreaker import CircuitBreaker

# Circuit breaker for external dependencies
workflow_breaker = CircuitBreaker(
    fail_max=5,        # 5 failures before opening
    reset_timeout=60,  # 60 seconds before half-open
)

@workflow_breaker
async def call_external_service(url: str, data: dict):
    """Call an external service with circuit breaker protection."""
    async with httpx.AsyncClient() as client:
        response = await client.post(url, json=data, timeout=10.0)
        response.raise_for_status()
        return response.json()

@router.post("/circuit-breaker-workflow")
async def circuit_breaker_workflow(trigger: WorkflowTrigger):
    """Workflow with circuit breaker for external dependencies."""
    try:
        result = await call_external_service(
            "https://external-service.com/api",
            trigger.trigger_data,
        )
        return {"status": "success", "result": result}
    except Exception as e:
        return {
            "status": "partial_failure",
            "message": "External service unavailable, using fallback",
            "fallback_result": process_locally(trigger.trigger_data),
        }
```
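If pulling in pybreaker feels heavy for an internal tool, the core state machine is small enough to sketch with the stdlib: closed until `fail_max` consecutive failures, then open (fail fast), then half-open after `reset_timeout`. A minimal synchronous sketch:

```python
import time

class SimpleCircuitBreaker:
    """Minimal circuit breaker: closed -> open -> half-open."""
    def __init__(self, fail_max=5, reset_timeout=60.0):
        self.fail_max = fail_max
        self.reset_timeout = reset_timeout
        self.failures = 0
        self.opened_at = None

    @property
    def state(self):
        if self.opened_at is None:
            return "closed"
        if time.monotonic() - self.opened_at >= self.reset_timeout:
            return "half-open"  # allow one trial call through
        return "open"

    def call(self, func, *args, **kwargs):
        if self.state == "open":
            raise RuntimeError("circuit open: failing fast")
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.fail_max:
                self.opened_at = time.monotonic()
            raise
        # Any success resets the breaker
        self.failures = 0
        self.opened_at = None
        return result

breaker = SimpleCircuitBreaker(fail_max=2, reset_timeout=30)

def flaky():
    raise ConnectionError("down")

for _ in range(2):
    try:
        breaker.call(flaky)
    except ConnectionError:
        pass
print(breaker.state)  # open
```

The point of failing fast while open is to stop queueing workflow executions behind a dependency that is already down, so the rest of the automation keeps moving.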

Real-World Use Cases

1. Data Pipeline Orchestration

```python
@router.post("/data-pipeline")
async def trigger_data_pipeline(pipeline_config: dict):
    """
    Orchestrate complex data pipelines.

    Example pipeline:
    1. Extract data from source
    2. Transform/clean data
    3. Load to destination
    4. Send notifications
    5. Update metadata
    """
    pipeline_id = pipeline_config.get("pipeline_id")
    steps = pipeline_config.get("steps", [])

    results = []
    for step in steps:
        step_result = await execute_pipeline_step(step)
        results.append({
            "step": step["name"],
            "status": "completed" if step_result else "failed",
            "result": step_result
        })

    # Send completion notification
    await send_pipeline_notification(pipeline_id, results)

    return {
        "pipeline_id": pipeline_id,
        "status": "completed",
        "steps_executed": len(results),
        "results": results
    }
```

2. Scheduled Task Management

```python
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.cron import CronTrigger

scheduler = AsyncIOScheduler()

@router.post("/schedule-task")
async def schedule_automation_task(task_config: dict, cron_expression: str):
    """Schedule a recurring automation task."""
    task_id = task_config.get("task_id")

    # Add to scheduler
    scheduler.add_job(
        execute_scheduled_task,
        CronTrigger.from_crontab(cron_expression),
        args=[task_config],
        id=task_id,
        replace_existing=True,
    )
    if not scheduler.running:
        scheduler.start()

    return {
        "task_id": task_id,
        "scheduled": True,
        "cron_expression": cron_expression,
        "next_run": scheduler.get_job(task_id).next_run_time.isoformat(),
    }
```

3. Webhook Aggregation Service

```python
@router.post("/webhook-aggregator")
async def webhook_aggregator(
    source: str,
    event_type: str,
    payload: dict,
    background_tasks: BackgroundTasks,
):
    """
    Aggregate webhooks from multiple sources.

    - Receive webhooks from various services
    - Normalize data format
    - Route to appropriate handlers
    - Store for audit trail
    """
    # Normalize incoming webhook
    normalized_event = normalize_webhook(source, event_type, payload)

    # Store for audit
    background_tasks.add_task(store_webhook_audit, normalized_event)

    # Route to the appropriate handler
    if event_type == "user.created":
        background_tasks.add_task(handle_user_creation, normalized_event)
    elif event_type == "order.completed":
        background_tasks.add_task(handle_order_completion, normalized_event)
    elif event_type == "payment.failed":
        background_tasks.add_task(handle_payment_failure, normalized_event)

    return {
        "received": True,
        "source": source,
        "event_type": event_type,
    }
```
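`normalize_webhook` is left undefined above. A minimal sketch of what it might do, assuming each source only needs wrapping in a common envelope (the field names here are illustrative, not a fixed schema):

```python
from datetime import datetime, timezone

def normalize_webhook(source: str, event_type: str, payload: dict) -> dict:
    """Wrap a raw webhook payload in a common envelope for downstream handlers."""
    return {
        "source": source,
        "event_type": event_type,
        "received_at": datetime.now(timezone.utc).isoformat(),
        "payload": payload,  # original body kept intact for the audit trail
    }

event = normalize_webhook("stripe", "payment.failed", {"invoice": "in_123"})
print(event["event_type"])  # payment.failed
```

Keeping the raw payload inside the envelope, rather than flattening it, means the audit trail always preserves exactly what the external service sent, even when a handler only reads a couple of fields.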