Build a Document Pipeline in Python: From Database to PDF

A complete tutorial for building a Python document pipeline that queries a database, formats data with Jinja2, generates PDFs via API, and delivers them via email or S3.

By LightningPDF Team · 3 min read

I needed to generate 2,000 personalized reports from a PostgreSQL database every Monday morning. Each report pulled different data for a different client, used a shared HTML template, and had to land in the client's inbox by 9 AM.

The first version was a mess — a 400-line script with SQL queries mixed into string concatenation, error handling via try/except: pass, and a Gmail SMTP connection that dropped after 500 sends. It took 45 minutes to run and failed silently about 10% of the time.

The second version was 120 lines, ran in 8 minutes, and never failed silently. Here's how to build it.

What we're building

A Python pipeline with four stages:

  1. Query — Pull data from a database
  2. Render — Inject data into an HTML template using Jinja2
  3. Generate — Convert HTML to PDF via LightningPDF's API
  4. Deliver — Email the PDF or upload to S3

Each stage is a function. You can test them independently, swap implementations, and add retry logic where it matters.

Project setup

mkdir doc-pipeline && cd doc-pipeline
python -m venv venv
source venv/bin/activate
pip install requests jinja2 psycopg2-binary boto3

The dependencies:

  • requests — HTTP client for the PDF API
  • jinja2 — template engine
  • psycopg2-binary — PostgreSQL driver (swap for mysql-connector-python, sqlite3, etc.)
  • boto3 — AWS S3 uploads (optional, for archiving)

Stage 1: Query the database

Let's say you're generating monthly account statements. Each client gets a summary of their transactions for the previous month.

# pipeline/query.py
import psycopg2
from dataclasses import dataclass, field
from datetime import date
from decimal import Decimal

@dataclass
class Transaction:
    date: date
    description: str
    amount: Decimal
    category: str

@dataclass
class ClientReport:
    client_id: int
    client_name: str
    client_email: str
    account_number: str
    period_start: date
    period_end: date
    transactions: list[Transaction] = field(default_factory=list)

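    @property
    def total_credits(self) -> Decimal:
        # Start from Decimal("0") so an empty list still yields a Decimal, not int 0
        return sum((t.amount for t in self.transactions if t.amount > 0), Decimal("0"))

    @property
    def total_debits(self) -> Decimal:
        return sum((t.amount for t in self.transactions if t.amount < 0), Decimal("0"))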

    @property
    def net(self) -> Decimal:
        return self.total_credits + self.total_debits


def fetch_client_reports(dsn: str, period_start: date, period_end: date) -> list[ClientReport]:
    conn = psycopg2.connect(dsn)
    cursor = conn.cursor()

    # Get all active clients
    cursor.execute("""
        SELECT id, name, email, account_number
        FROM clients
        WHERE active = true
        ORDER BY name
    """)
    clients = cursor.fetchall()

    reports = []
    for client_id, name, email, account_num in clients:
        # Fetch transactions for this client in the period
        cursor.execute("""
            SELECT txn_date, description, amount, category
            FROM transactions
            WHERE client_id = %s
              AND txn_date >= %s
              AND txn_date <= %s
            ORDER BY txn_date
        """, (client_id, period_start, period_end))

        transactions = [
            Transaction(
                date=row[0],
                description=row[1],
                amount=Decimal(str(row[2])),
                category=row[3],
            )
            for row in cursor.fetchall()
        ]

        # Skip clients with no transactions
        if not transactions:
            continue

        reports.append(ClientReport(
            client_id=client_id,
            client_name=name,
            client_email=email,
            account_number=account_num,
            period_start=period_start,
            period_end=period_end,
            transactions=transactions,
        ))

    conn.close()
    return reports

A few things to note:

One query per client vs. one big query. For 2,000 clients, running 2,000 individual queries is slow. In production, I'd use a single query with a JOIN and group in Python:

def fetch_all_reports_fast(dsn: str, period_start: date, period_end: date) -> list[ClientReport]:
    conn = psycopg2.connect(dsn)
    cursor = conn.cursor()

    cursor.execute("""
        SELECT
            c.id, c.name, c.email, c.account_number,
            t.txn_date, t.description, t.amount, t.category
        FROM clients c
        JOIN transactions t ON t.client_id = c.id
        WHERE c.active = true
          AND t.txn_date >= %s
          AND t.txn_date <= %s
        ORDER BY c.id, t.txn_date
    """, (period_start, period_end))

    reports = {}
    for row in cursor.fetchall():
        cid = row[0]
        if cid not in reports:
            reports[cid] = ClientReport(
                client_id=cid,
                client_name=row[1],
                client_email=row[2],
                account_number=row[3],
                period_start=period_start,
                period_end=period_end,
            )
        reports[cid].transactions.append(Transaction(
            date=row[4],
            description=row[5],
            amount=Decimal(str(row[6])),
            category=row[7],
        ))

    conn.close()
    return list(reports.values())

This runs one query instead of 2,001. On a 50k-row transactions table, it finishes in ~200ms instead of ~8 seconds.
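
If you want to try the JOIN-and-group pattern without a PostgreSQL server, here is a minimal sketch against an in-memory SQLite database. The schema is trimmed to the columns the grouping needs, the sample rows are invented, and the helper names are hypothetical. SQLite uses ? placeholders where psycopg2 uses %s, and amounts are stored as text here so they convert to Decimal exactly.

```python
import sqlite3
from collections import defaultdict
from decimal import Decimal


def make_demo_db() -> sqlite3.Connection:
    """Build a throwaway database with made-up clients and transactions."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE clients (id INTEGER PRIMARY KEY, name TEXT, active INTEGER);
        CREATE TABLE transactions (client_id INTEGER, txn_date TEXT, amount TEXT);
        INSERT INTO clients VALUES (1, 'Alice', 1), (2, 'Bob', 1), (3, 'Carol', 0);
        INSERT INTO transactions VALUES
            (1, '2026-03-05', '3200.00'),
            (1, '2026-03-08', '-89.42'),
            (2, '2026-02-27', '10.00'),
            (3, '2026-03-10', '55.00');
    """)
    return conn


def group_transactions(conn, period_start: str, period_end: str) -> dict[int, list[Decimal]]:
    """Run one JOIN, then group the amounts by client id in Python."""
    rows = conn.execute(
        """
        SELECT c.id, t.amount
        FROM clients c
        JOIN transactions t ON t.client_id = c.id
        WHERE c.active = 1
          AND t.txn_date >= ?
          AND t.txn_date <= ?
        ORDER BY c.id, t.txn_date
        """,
        (period_start, period_end),
    )
    grouped = defaultdict(list)
    for client_id, amount in rows:
        grouped[client_id].append(Decimal(amount))
    return dict(grouped)


conn = make_demo_db()
result = group_transactions(conn, "2026-03-01", "2026-03-31")
conn.close()
print(result)  # only Alice: Bob is out of period, Carol is inactive
```

Clients with no rows in the period never appear in the result, which matches the "skip clients with no transactions" behavior of the per-client version.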

Use Decimal for money. Always. With floats, 0.1 + 0.2 evaluates to 0.30000000000000004. Your finance team will not find this funny.
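
A quick way to see the difference for yourself, as a small sketch with made-up amounts. It also shows why the query code builds each value with Decimal(str(...)) rather than passing a float straight in:

```python
from decimal import Decimal

# Binary floats cannot represent 0.1 or 0.2 exactly, so the sum drifts.
float_total = 0.1 + 0.2

# Decimals built from strings keep the exact decimal digits.
decimal_total = Decimal("0.10") + Decimal("0.20")

print(float_total == 0.3)                # False
print(decimal_total == Decimal("0.30"))  # True

# Pitfall: Decimal(0.1) inherits the float's error, so construct from a string.
print(Decimal(0.1) == Decimal("0.1"))    # False
```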

Stage 2: Render HTML with Jinja2

Create a template that turns a ClientReport into styled HTML.

# pipeline/render.py
from jinja2 import Environment, FileSystemLoader
from pathlib import Path

# Load templates from a directory
env = Environment(
    loader=FileSystemLoader(Path(__file__).parent / "templates"),
    autoescape=True,
)

def render_report(report) -> str:
    template = env.get_template("account_statement.html")
    return template.render(report=report)

And the template itself:

{# pipeline/templates/account_statement.html #}
<!DOCTYPE html>
<html>
<head>
<style>
  * { margin: 0; padding: 0; box-sizing: border-box; }
  body {
    font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', sans-serif;
    color: #1e293b;
    font-size: 12px;
    line-height: 1.5;
    padding: 40px;
  }
  .header {
    display: flex;
    justify-content: space-between;
    align-items: flex-start;
    border-bottom: 3px solid #0f172a;
    padding-bottom: 16px;
    margin-bottom: 24px;
  }
  .header h1 { font-size: 22px; color: #0f172a; }
  .header .period { color: #64748b; font-size: 13px; }
  .client-info {
    background: #f8fafc;
    padding: 16px;
    border-radius: 6px;
    margin-bottom: 24px;
  }
  .client-info h2 { font-size: 16px; margin-bottom: 4px; }
  .client-info p { color: #64748b; }
  .summary {
    display: flex;
    gap: 16px;
    margin-bottom: 24px;
  }
  .summary-box {
    flex: 1;
    padding: 16px;
    border-radius: 6px;
    text-align: center;
  }
  .summary-box.credits { background: #f0fdf4; border: 1px solid #bbf7d0; }
  .summary-box.debits { background: #fef2f2; border: 1px solid #fecaca; }
  .summary-box.net { background: #eff6ff; border: 1px solid #bfdbfe; }
  .summary-box .label { font-size: 11px; text-transform: uppercase; color: #64748b; }
  .summary-box .amount { font-size: 20px; font-weight: 700; margin-top: 4px; }
  table { width: 100%; border-collapse: collapse; }
  thead th {
    text-align: left;
    padding: 8px 10px;
    font-size: 10px;
    text-transform: uppercase;
    color: #94a3b8;
    border-bottom: 2px solid #e2e8f0;
  }
  thead th:nth-child(3) { text-align: right; }
  tbody td { padding: 7px 10px; border-bottom: 1px solid #f1f5f9; }
  tbody td:nth-child(3) { text-align: right; font-family: 'SF Mono', monospace; }
  .credit { color: #16a34a; }
  .debit { color: #dc2626; }
  .footer {
    margin-top: 32px;
    padding-top: 16px;
    border-top: 1px solid #e2e8f0;
    color: #94a3b8;
    font-size: 10px;
    text-align: center;
  }
  @media print {
    body { padding: 0; }
  }
</style>
</head>
<body>

<div class="header">
  <div>
    <h1>Account Statement</h1>
    <p>Acme Financial Services</p>
  </div>
  <div style="text-align: right;">
    <p class="period">
      {{ report.period_start.strftime('%B %d, %Y') }} &mdash;
      {{ report.period_end.strftime('%B %d, %Y') }}
    </p>
    <p style="color: #94a3b8; font-size: 11px;">
      Account {{ report.account_number }}
    </p>
  </div>
</div>

<div class="client-info">
  <h2>{{ report.client_name }}</h2>
  <p>{{ report.client_email }}</p>
</div>

<div class="summary">
  <div class="summary-box credits">
    <div class="label">Credits</div>
    <div class="amount credit">${{ "%.2f"|format(report.total_credits) }}</div>
  </div>
  <div class="summary-box debits">
    <div class="label">Debits</div>
    <div class="amount debit">${{ "%.2f"|format(report.total_debits|abs) }}</div>
  </div>
  <div class="summary-box net">
    <div class="label">Net</div>
    <div class="amount">${{ "%.2f"|format(report.net) }}</div>
  </div>
</div>

<table>
  <thead>
    <tr>
      <th>Date</th>
      <th>Description</th>
      <th>Amount</th>
      <th>Category</th>
    </tr>
  </thead>
  <tbody>
    {% for txn in report.transactions %}
    <tr>
      <td>{{ txn.date.strftime('%b %d') }}</td>
      <td>{{ txn.description }}</td>
      <td class="{{ 'credit' if txn.amount > 0 else 'debit' }}">
        {{ "$%.2f"|format(txn.amount) }}
      </td>
      <td>{{ txn.category }}</td>
    </tr>
    {% endfor %}
  </tbody>
</table>

<div class="footer">
  <p>This statement was generated automatically on {{ now.strftime('%B %d, %Y at %I:%M %p') }}.</p>
  <p>For questions, contact support@acmefinancial.com</p>
</div>

</body>
</html>

The footer uses a now variable that render_report doesn't pass yet, so update the render function to supply the current datetime:

from datetime import datetime

def render_report(report) -> str:
    template = env.get_template("account_statement.html")
    return template.render(report=report, now=datetime.now())
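
One cosmetic detail: the template's "$%.2f"|format(txn.amount) renders a debit as $-89.42. If you'd rather see -$89.42 with thousands separators, a small custom filter is one option. This is a sketch, not part of the pipeline above: format_money and the money filter name are hypothetical, while registering a function through env.filters is standard Jinja2.

```python
from decimal import Decimal


def format_money(amount: Decimal) -> str:
    """Format a Decimal as dollars with the sign ahead of the currency symbol."""
    sign = "-" if amount < 0 else ""
    return f"{sign}${abs(amount):,.2f}"


# Hypothetical wiring in pipeline/render.py:
#   env.filters["money"] = format_money
# and in the template:
#   {{ txn.amount|money }}

print(format_money(Decimal("3200.00")))  # $3,200.00
print(format_money(Decimal("-89.42")))   # -$89.42
```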

Stage 3: Generate PDFs

Send the rendered HTML to LightningPDF's API and get back PDF bytes.

# pipeline/generate.py
import requests
import time
from typing import Optional

API_URL = "https://lightningpdf.dev/api/v1/pdf/generate"

class PDFGenerator:
    def __init__(self, api_key: str, timeout: int = 30):
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        })
        self.timeout = timeout

    def generate(self, html: str, retries: int = 2) -> bytes:
        payload = {
            "html": html,
            "options": {
                "format": "A4",
                "print_background": True,
                "margin": {
                    "top": "0.4in",
                    "right": "0.4in",
                    "bottom": "0.4in",
                    "left": "0.4in",
                }
            }
        }

        last_error: Optional[Exception] = None
        for attempt in range(retries + 1):
            try:
                resp = self.session.post(API_URL, json=payload, timeout=self.timeout)
                resp.raise_for_status()
                return resp.content
            except requests.exceptions.RequestException as e:
                last_error = e
                if attempt < retries:
                    wait = 2 ** attempt  # 1s, 2s
                    time.sleep(wait)

        raise RuntimeError(f"PDF generation failed after {retries + 1} attempts: {last_error}")

I'm using a requests.Session instead of plain requests.post. The session reuses TCP connections, which saves ~50ms per request when generating hundreds of PDFs. Over 2,000 documents, that's 100 seconds saved.

The retry logic uses exponential backoff. Network blips happen. A 1-second retry catches most transient failures without hammering the API.
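
One caveat: the loop above retries every RequestException, and that includes the HTTPError that raise_for_status() raises for a 4xx response. A bad API key won't fix itself on the second attempt. Below is a minimal, library-free sketch of the same backoff with an escape hatch for permanent failures. PermanentError, is_retryable_status, and retry_with_backoff are hypothetical names, and treating 429 and 5xx as the only transient statuses is my assumption, not documented LightningPDF behavior.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class PermanentError(Exception):
    """Hypothetical marker for failures that retrying cannot fix (e.g. HTTP 401)."""


def is_retryable_status(status_code: int) -> bool:
    # Assumption: 429 and 5xx are transient, every other status is permanent.
    return status_code == 429 or 500 <= status_code < 600


def retry_with_backoff(
    func: Callable[[], T],
    retries: int = 2,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying transient errors with delays of 1s, 2s, 4s, ..."""
    last_error: Optional[Exception] = None
    for attempt in range(retries + 1):
        try:
            return func()
        except PermanentError:
            raise  # don't spend attempts on errors that will not go away
        except Exception as exc:
            last_error = exc
            if attempt < retries:
                sleep(base_delay * 2 ** attempt)
    raise RuntimeError(f"failed after {retries + 1} attempts: {last_error}")
```

Passing sleep in as a parameter keeps the helper testable: a test can hand in list.append and assert on the delays instead of actually waiting.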

Stage 4: Deliver

Two options: email and S3 archival. In production, I do both — email the client, archive in S3.

# pipeline/deliver.py
import smtplib
import boto3
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email.mime.text import MIMEText
from email import encoders
from pathlib import Path

class EmailDelivery:
    def __init__(self, smtp_host: str, smtp_port: int, username: str, password: str):
        self.smtp_host = smtp_host
        self.smtp_port = smtp_port
        self.username = username
        self.password = password

    def send(self, to_email: str, subject: str, body: str,
             attachment: bytes, filename: str):
        msg = MIMEMultipart()
        msg["From"] = self.username
        msg["To"] = to_email
        msg["Subject"] = subject
        msg.attach(MIMEText(body, "plain"))

        part = MIMEBase("application", "pdf")
        part.set_payload(attachment)
        encoders.encode_base64(part)
        # Passing filename as a keyword lets the email package quote it correctly
        part.add_header("Content-Disposition", "attachment", filename=filename)
        msg.attach(part)

        with smtplib.SMTP(self.smtp_host, self.smtp_port) as server:
            server.starttls()
            server.login(self.username, self.password)
            server.send_message(msg)


class S3Archiver:
    def __init__(self, bucket: str, prefix: str = "reports"):
        self.s3 = boto3.client("s3")
        self.bucket = bucket
        self.prefix = prefix

    def upload(self, key: str, data: bytes, metadata: dict = None):
        self.s3.put_object(
            Bucket=self.bucket,
            Key=f"{self.prefix}/{key}",
            Body=data,
            ContentType="application/pdf",
            Metadata=metadata or {},
        )
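
The MIME assembly in EmailDelivery.send can also be written with the standard library's newer email.message.EmailMessage API, which handles the base64 encoding and filename quoting for you. This is an alternative sketch rather than what the pipeline above uses, and build_statement_email is a hypothetical helper name.

```python
from email.message import EmailMessage


def build_statement_email(sender: str, to_email: str, subject: str,
                          body: str, attachment: bytes, filename: str) -> EmailMessage:
    """Assemble a plain-text email with one PDF attachment."""
    msg = EmailMessage()
    msg["From"] = sender
    msg["To"] = to_email
    msg["Subject"] = subject
    msg.set_content(body)
    # add_attachment base64-encodes the bytes and sets Content-Disposition.
    msg.add_attachment(attachment, maintype="application",
                       subtype="pdf", filename=filename)
    return msg


msg = build_statement_email(
    "reports@example.com", "alice@example.com",
    "Your Account Statement", "Statement attached.",
    b"%PDF-1.4 placeholder bytes", "statement-ACC-001234-202603.pdf",
)
print(msg.get_content_type())  # multipart/mixed
```

The resulting object can be handed to server.send_message(msg) exactly like the MIMEMultipart version.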

The orchestrator

Now wire the four stages together:

# pipeline/main.py
import os
import logging
from datetime import date, timedelta
from dataclasses import dataclass

from pipeline.query import fetch_all_reports_fast
from pipeline.render import render_report
from pipeline.generate import PDFGenerator
from pipeline.deliver import EmailDelivery, S3Archiver

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)
log = logging.getLogger(__name__)

@dataclass
class PipelineConfig:
    db_dsn: str
    pdf_api_key: str
    smtp_host: str
    smtp_port: int
    smtp_user: str
    smtp_pass: str
    s3_bucket: str

def load_config() -> PipelineConfig:
    return PipelineConfig(
        db_dsn=os.environ["DATABASE_URL"],
        pdf_api_key=os.environ["LIGHTNINGPDF_KEY"],
        smtp_host=os.environ.get("SMTP_HOST", "smtp.example.com"),
        smtp_port=int(os.environ.get("SMTP_PORT", "587")),
        smtp_user=os.environ["SMTP_USER"],
        smtp_pass=os.environ["SMTP_PASS"],
        s3_bucket=os.environ.get("S3_BUCKET", "client-reports"),
    )

def run_pipeline():
    config = load_config()

    # Calculate the previous month
    today = date.today()
    period_end = today.replace(day=1) - timedelta(days=1)
    period_start = period_end.replace(day=1)

    log.info(f"Generating reports for {period_start} to {period_end}")

    # Stage 1: Query
    reports = fetch_all_reports_fast(config.db_dsn, period_start, period_end)
    log.info(f"Found {len(reports)} clients with transactions")

    # Initialize services
    pdf_gen = PDFGenerator(config.pdf_api_key)
    email = EmailDelivery(config.smtp_host, config.smtp_port,
                          config.smtp_user, config.smtp_pass)
    archiver = S3Archiver(config.s3_bucket)

    sent = 0
    failed = 0

    for report in reports:
        try:
            # Stage 2: Render
            html = render_report(report)

            # Stage 3: Generate PDF
            pdf_bytes = pdf_gen.generate(html)

            # Stage 4: Deliver
            filename = f"statement-{report.account_number}-{period_start.strftime('%Y%m')}.pdf"

            email.send(
                to_email=report.client_email,
                subject=f"Your Account Statement — {period_start.strftime('%B %Y')}",
                body=f"Hi {report.client_name},\n\nPlease find your account statement for "
                     f"{period_start.strftime('%B %Y')} attached.\n\nBest regards,\nAcme Financial",
                attachment=pdf_bytes,
                filename=filename,
            )

            archiver.upload(
                key=f"{period_start.strftime('%Y/%m')}/{filename}",
                data=pdf_bytes,
                metadata={
                    "client_id": str(report.client_id),
                    "period": period_start.strftime("%Y-%m"),
                },
            )

            sent += 1
            log.info(f"  [{sent}/{len(reports)}] Sent to {report.client_email}")

        except Exception as e:
            failed += 1
            log.error(f"  FAILED {report.client_name}: {e}")

    log.info(f"Done: {sent} sent, {failed} failed out of {len(reports)}")

if __name__ == "__main__":
    run_pipeline()
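
The previous-month calculation inside run_pipeline is the kind of date logic that is worth pulling into its own function so it can be checked against edge cases such as January and leap years. A minimal sketch, where previous_month_bounds is a hypothetical helper name and the arithmetic is the same as above:

```python
from datetime import date, timedelta


def previous_month_bounds(today: date) -> tuple[date, date]:
    """Return the first and last day of the month before `today`."""
    # Step back one day from the 1st of this month to land on last month's final day.
    period_end = today.replace(day=1) - timedelta(days=1)
    period_start = period_end.replace(day=1)
    return period_start, period_end


print(previous_month_bounds(date(2026, 4, 1)))   # (datetime.date(2026, 3, 1), datetime.date(2026, 3, 31))
print(previous_month_bounds(date(2026, 1, 15)))  # (datetime.date(2025, 12, 1), datetime.date(2025, 12, 31))
print(previous_month_bounds(date(2024, 3, 31)))  # (datetime.date(2024, 2, 1), datetime.date(2024, 2, 29))
```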

Running it

Set your environment variables and run:

export DATABASE_URL="postgresql://user:pass@localhost:5432/acme"
export LIGHTNINGPDF_KEY="lpdf_your_api_key"
export SMTP_USER="reports@acmefinancial.com"
export SMTP_PASS="smtp_password"
export SMTP_HOST="smtp.resend.com"
export SMTP_PORT="587"
export S3_BUCKET="acme-client-reports"

python -m pipeline.main

Output:

2026-04-01 09:00:01 INFO Generating reports for 2026-03-01 to 2026-03-31
2026-04-01 09:00:01 INFO Found 847 clients with transactions
2026-04-01 09:00:02 INFO   [1/847] Sent to alice@example.com
2026-04-01 09:00:03 INFO   [2/847] Sent to bob@example.com
...
2026-04-01 09:07:44 INFO Done: 843 sent, 4 failed out of 847

847 reports processed in under 8 minutes: 843 delivered and 4 logged as failures to follow up on. Each delivered statement was queried from a database, rendered from a template, generated as a PDF, emailed to the client, and archived in S3.

Adding concurrency

The single-threaded version handles reports one at a time, which works out to roughly two per second in the run above. If you need to go faster, use concurrent.futures:

from concurrent.futures import ThreadPoolExecutor, as_completed

def process_one(report, pdf_gen, email, archiver, period_start):
    html = render_report(report)
    pdf_bytes = pdf_gen.generate(html)
    filename = f"statement-{report.account_number}-{period_start.strftime('%Y%m')}.pdf"

    email.send(
        to_email=report.client_email,
        subject=f"Your Account Statement — {period_start.strftime('%B %Y')}",
        body=f"Hi {report.client_name},\n\nYour statement is attached.\n\nBest,\nAcme Financial",
        attachment=pdf_bytes,
        filename=filename,
    )
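    archiver.upload(
        key=f"{period_start.strftime('%Y/%m')}/{filename}",
        data=pdf_bytes,
        # Same metadata as the sequential version, so archived objects stay consistent
        metadata={"client_id": str(report.client_id), "period": period_start.strftime("%Y-%m")},
    )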
    return report.client_email

# In the main pipeline, replace the for loop with:
with ThreadPoolExecutor(max_workers=10) as pool:
    futures = {
        pool.submit(process_one, r, pdf_gen, email, archiver, period_start): r
        for r in reports
    }
    for future in as_completed(futures):
        report = futures[future]
        try:
            email_addr = future.result()
            sent += 1
            log.info(f"  [{sent}/{len(reports)}] Sent to {email_addr}")
        except Exception as e:
            failed += 1
            log.error(f"  FAILED {report.client_name}: {e}")

With 10 workers, the same 847 reports finish in about 90 seconds instead of 8 minutes. Don't go above 10-15 concurrent workers — you'll hit SMTP rate limits or the PDF API's concurrency cap for your plan.
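
If the SMTP side turns out to be the tighter limit, you don't have to shrink the whole pool. One option is to cap only the email step with a semaphore while PDF generation keeps all 10 workers busy. The sketch below uses only the standard library. ConcurrencyLimiter and deliver are hypothetical names, the limit of 3 is an arbitrary assumption, and the real email.send call is replaced with a placeholder.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


class ConcurrencyLimiter:
    """Context manager that lets at most `limit` threads into the block at once."""

    def __init__(self, limit: int):
        self._sem = threading.BoundedSemaphore(limit)
        self._lock = threading.Lock()
        self._active = 0
        self.peak = 0  # highest concurrency observed, handy for tests

    def __enter__(self):
        self._sem.acquire()
        with self._lock:
            self._active += 1
            self.peak = max(self.peak, self._active)
        return self

    def __exit__(self, exc_type, exc, tb):
        with self._lock:
            self._active -= 1
        self._sem.release()
        return False  # never swallow exceptions from the guarded block


smtp_limit = ConcurrencyLimiter(3)  # assumption: the provider tolerates 3 parallel sends


def deliver(report_id: int) -> int:
    # PDF generation would run here at full pool width...
    with smtp_limit:
        pass  # ...while email.send(...) would run here, at most 3 at a time
    return report_id


with ThreadPoolExecutor(max_workers=10) as pool:
    results = list(pool.map(deliver, range(50)))

print(len(results), smtp_limit.peak <= 3)
```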

Testing without a database

For local development, mock the data layer:

# test_pipeline.py
from datetime import date
from decimal import Decimal
from pipeline.query import ClientReport, Transaction
from pipeline.render import render_report
from pipeline.generate import PDFGenerator

def make_test_report() -> ClientReport:
    return ClientReport(
        client_id=1,
        client_name="Test User",
        client_email="test@example.com",
        account_number="ACC-001234",
        period_start=date(2026, 3, 1),
        period_end=date(2026, 3, 31),
        transactions=[
            Transaction(date(2026, 3, 5), "Direct deposit", Decimal("3200.00"), "Income"),
            Transaction(date(2026, 3, 8), "Grocery store", Decimal("-89.42"), "Food"),
            Transaction(date(2026, 3, 12), "Electric bill", Decimal("-142.00"), "Utilities"),
            Transaction(date(2026, 3, 15), "Freelance payment", Decimal("750.00"), "Income"),
            Transaction(date(2026, 3, 20), "Restaurant", Decimal("-67.30"), "Food"),
            Transaction(date(2026, 3, 25), "Gas station", Decimal("-45.18"), "Transport"),
        ],
    )

if __name__ == "__main__":
    report = make_test_report()
    html = render_report(report)

    # Save HTML for browser preview
    with open("test_statement.html", "w") as f:
        f.write(html)
    print("Saved test_statement.html — open in browser to preview")

    # Generate PDF if API key is set
    import os
    api_key = os.environ.get("LIGHTNINGPDF_KEY")
    if api_key:
        gen = PDFGenerator(api_key)
        pdf = gen.generate(html)
        with open("test_statement.pdf", "wb") as f:
            f.write(pdf)
        print(f"Saved test_statement.pdf ({len(pdf)} bytes)")
    else:
        print("Set LIGHTNINGPDF_KEY to generate a PDF")

This lets you iterate on the HTML template by opening it in a browser, then generate the actual PDF once the layout looks right.
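
The same idea extends to the PDF stage. Because the pipeline only ever calls generate(html) on the generator, you can swap in a stand-in that never touches the network when testing the orchestration logic. FakePDFGenerator and process are hypothetical names for this sketch, and the returned bytes are a placeholder, not a valid PDF.

```python
class FakePDFGenerator:
    """Hypothetical stand-in for PDFGenerator that makes no HTTP requests."""

    def __init__(self):
        self.calls: list[str] = []  # every HTML document we were asked to convert

    def generate(self, html: str, retries: int = 2) -> bytes:
        self.calls.append(html)
        # Placeholder bytes only; enough for downstream code to attach or upload.
        return b"%PDF-1.4\n% fake document\n"


def process(html: str, pdf_gen) -> bytes:
    # Works with the real PDFGenerator or the fake: both expose generate().
    return pdf_gen.generate(html)


fake = FakePDFGenerator()
pdf = process("<h1>Account Statement</h1>", fake)
print(len(fake.calls), pdf[:8])
```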

Project structure

The final file layout:

doc-pipeline/
  pipeline/
    __init__.py
    query.py       # Stage 1: database queries
    render.py      # Stage 2: Jinja2 rendering
    generate.py    # Stage 3: PDF API client
    deliver.py     # Stage 4: email + S3
    main.py        # Orchestrator
    templates/
      account_statement.html
  test_pipeline.py
  requirements.txt

And requirements.txt:

requests>=2.31
jinja2>=3.1
psycopg2-binary>=2.9
boto3>=1.34

Four stage modules, one orchestrator, one clear flow. Each stage can be unit-tested independently. The orchestrator ties them together and handles logging. The LightningPDF API handles the hard part, turning HTML into a properly rendered PDF with correct fonts, page breaks, and print styles, so your pipeline stays focused on data and business logic.

Grab an API key at lightningpdf.dev to try it. The free tier gives you 100 PDFs per month, which is plenty to build and test the whole pipeline before you scale it up.
