Build a Document Pipeline in Python: From Database to PDF
A complete tutorial for building a Python document pipeline that queries a database, formats data with Jinja2, generates PDFs via API, and delivers them via email or S3.
I needed to generate 2,000 personalized reports from a PostgreSQL database every Monday morning. Each report pulled different data for a different client, used a shared HTML template, and had to land in the client's inbox by 9 AM.
The first version was a mess — a 400-line script with SQL queries mixed into string concatenation, error handling via try/except: pass, and a Gmail SMTP connection that dropped after 500 sends. It took 45 minutes to run and failed silently about 10% of the time.
The second version was 120 lines, ran in 8 minutes, and never failed silently. Here's how to build it.
What we're building
A Python pipeline with four stages:
- Query — Pull data from a database
- Render — Inject data into an HTML template using Jinja2
- Generate — Convert HTML to PDF via LightningPDF's API
- Deliver — Email the PDF or upload to S3
Each stage is a function. You can test them independently, swap implementations, and add retry logic where it matters.
Project setup
mkdir doc-pipeline && cd doc-pipeline
python -m venv venv
source venv/bin/activate
pip install requests jinja2 psycopg2-binary boto3
The dependencies:
- requests: HTTP client for the PDF API
- jinja2: template engine
- psycopg2-binary: PostgreSQL driver (swap for mysql-connector-python, sqlite3, etc.)
- boto3: AWS S3 uploads (optional, for archiving)
Stage 1: Query the database
Let's say you're generating monthly account statements. Each client gets a summary of their transactions for the previous month.
# pipeline/query.py
import psycopg2
from dataclasses import dataclass, field
from datetime import date
from decimal import Decimal


@dataclass
class Transaction:
    date: date
    description: str
    amount: Decimal
    category: str


@dataclass
class ClientReport:
    client_id: int
    client_name: str
    client_email: str
    account_number: str
    period_start: date
    period_end: date
    transactions: list[Transaction] = field(default_factory=list)

    @property
    def total_credits(self) -> Decimal:
        # Start from Decimal("0") so an empty list still returns a Decimal, not int 0
        return sum((t.amount for t in self.transactions if t.amount > 0), Decimal("0"))

    @property
    def total_debits(self) -> Decimal:
        return sum((t.amount for t in self.transactions if t.amount < 0), Decimal("0"))

    @property
    def net(self) -> Decimal:
        return self.total_credits + self.total_debits


def fetch_client_reports(dsn: str, period_start: date, period_end: date) -> list[ClientReport]:
    conn = psycopg2.connect(dsn)
    cursor = conn.cursor()

    # Get all active clients
    cursor.execute("""
        SELECT id, name, email, account_number
        FROM clients
        WHERE active = true
        ORDER BY name
    """)
    clients = cursor.fetchall()

    reports = []
    for client_id, name, email, account_num in clients:
        # Fetch transactions for this client in the period
        cursor.execute("""
            SELECT txn_date, description, amount, category
            FROM transactions
            WHERE client_id = %s
              AND txn_date >= %s
              AND txn_date <= %s
            ORDER BY txn_date
        """, (client_id, period_start, period_end))

        transactions = [
            Transaction(
                date=row[0],
                description=row[1],
                amount=Decimal(str(row[2])),
                category=row[3],
            )
            for row in cursor.fetchall()
        ]

        # Skip clients with no transactions
        if not transactions:
            continue

        reports.append(ClientReport(
            client_id=client_id,
            client_name=name,
            client_email=email,
            account_number=account_num,
            period_start=period_start,
            period_end=period_end,
            transactions=transactions,
        ))

    conn.close()
    return reports
A few things to note:
One query per client vs. one big query. For 2,000 clients, running 2,000 individual queries is slow. In production, I'd use a single query with a JOIN and group in Python:
def fetch_all_reports_fast(dsn: str, period_start: date, period_end: date) -> list[ClientReport]:
    conn = psycopg2.connect(dsn)
    cursor = conn.cursor()

    cursor.execute("""
        SELECT
            c.id, c.name, c.email, c.account_number,
            t.txn_date, t.description, t.amount, t.category
        FROM clients c
        JOIN transactions t ON t.client_id = c.id
        WHERE c.active = true
          AND t.txn_date >= %s
          AND t.txn_date <= %s
        ORDER BY c.id, t.txn_date
    """, (period_start, period_end))

    # Group rows by client id; dicts preserve insertion order
    reports = {}
    for row in cursor.fetchall():
        cid = row[0]
        if cid not in reports:
            reports[cid] = ClientReport(
                client_id=cid,
                client_name=row[1],
                client_email=row[2],
                account_number=row[3],
                period_start=period_start,
                period_end=period_end,
            )
        reports[cid].transactions.append(Transaction(
            date=row[4],
            description=row[5],
            amount=Decimal(str(row[6])),
            category=row[7],
        ))

    conn.close()
    return list(reports.values())
This runs one query instead of 2,001. On a 50k-row transactions table, it finishes in ~200ms instead of ~8 seconds.
Use Decimal for money. Always. float(0.1) + float(0.2) == 0.30000000000000004. Your finance team will not find this funny.
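To make the float problem concrete, here is a small standalone sketch using only the standard library. The to_cents helper and its half-up rounding policy are assumptions for illustration, not part of the pipeline above; use whatever rounding rule your finance team specifies.

```python
from decimal import Decimal, ROUND_HALF_UP

# Floats accumulate binary rounding error.
float_total = 0.1 + 0.2
print(float_total == 0.3)  # False

# Decimals built from strings stay exact.
decimal_total = Decimal("0.1") + Decimal("0.2")
print(decimal_total == Decimal("0.3"))  # True


def to_cents(amount: Decimal) -> Decimal:
    """Round to two decimal places using half-up rounding (an assumed policy)."""
    return amount.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)


print(to_cents(Decimal("89.425")))  # 89.43
```

Note that the Decimal values are built from strings. Decimal(0.1) would carry the float's error into the Decimal, which is why the query code wraps each amount in str() first.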
Stage 2: Render HTML with Jinja2
Create a template that turns a ClientReport into styled HTML.
# pipeline/render.py
from jinja2 import Environment, FileSystemLoader
from pathlib import Path

# Load templates from a directory
env = Environment(
    loader=FileSystemLoader(Path(__file__).parent / "templates"),
    autoescape=True,
)


def render_report(report) -> str:
    template = env.get_template("account_statement.html")
    return template.render(report=report)
And the template itself:
{# pipeline/templates/account_statement.html #}
<!DOCTYPE html>
<html>
<head>
<style>
* { margin: 0; padding: 0; box-sizing: border-box; }
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', sans-serif;
color: #1e293b;
font-size: 12px;
line-height: 1.5;
padding: 40px;
}
.header {
display: flex;
justify-content: space-between;
align-items: flex-start;
border-bottom: 3px solid #0f172a;
padding-bottom: 16px;
margin-bottom: 24px;
}
.header h1 { font-size: 22px; color: #0f172a; }
.header .period { color: #64748b; font-size: 13px; }
.client-info {
background: #f8fafc;
padding: 16px;
border-radius: 6px;
margin-bottom: 24px;
}
.client-info h2 { font-size: 16px; margin-bottom: 4px; }
.client-info p { color: #64748b; }
.summary {
display: flex;
gap: 16px;
margin-bottom: 24px;
}
.summary-box {
flex: 1;
padding: 16px;
border-radius: 6px;
text-align: center;
}
.summary-box.credits { background: #f0fdf4; border: 1px solid #bbf7d0; }
.summary-box.debits { background: #fef2f2; border: 1px solid #fecaca; }
.summary-box.net { background: #eff6ff; border: 1px solid #bfdbfe; }
.summary-box .label { font-size: 11px; text-transform: uppercase; color: #64748b; }
.summary-box .amount { font-size: 20px; font-weight: 700; margin-top: 4px; }
table { width: 100%; border-collapse: collapse; }
thead th {
text-align: left;
padding: 8px 10px;
font-size: 10px;
text-transform: uppercase;
color: #94a3b8;
border-bottom: 2px solid #e2e8f0;
}
thead th:nth-child(3) { text-align: right; }
tbody td { padding: 7px 10px; border-bottom: 1px solid #f1f5f9; }
tbody td:nth-child(3) { text-align: right; font-family: 'SF Mono', monospace; }
.credit { color: #16a34a; }
.debit { color: #dc2626; }
.footer {
margin-top: 32px;
padding-top: 16px;
border-top: 1px solid #e2e8f0;
color: #94a3b8;
font-size: 10px;
text-align: center;
}
@media print {
body { padding: 0; }
}
</style>
</head>
<body>
<div class="header">
<div>
<h1>Account Statement</h1>
<p>Acme Financial Services</p>
</div>
<div style="text-align: right;">
<p class="period">
{{ report.period_start.strftime('%B %d, %Y') }} —
{{ report.period_end.strftime('%B %d, %Y') }}
</p>
<p style="color: #94a3b8; font-size: 11px;">
Account {{ report.account_number }}
</p>
</div>
</div>
<div class="client-info">
<h2>{{ report.client_name }}</h2>
<p>{{ report.client_email }}</p>
</div>
<div class="summary">
<div class="summary-box credits">
<div class="label">Credits</div>
<div class="amount credit">${{ "%.2f"|format(report.total_credits) }}</div>
</div>
<div class="summary-box debits">
<div class="label">Debits</div>
<div class="amount debit">${{ "%.2f"|format(report.total_debits|abs) }}</div>
</div>
<div class="summary-box net">
<div class="label">Net</div>
<div class="amount">${{ "%.2f"|format(report.net) }}</div>
</div>
</div>
<table>
<thead>
<tr>
<th>Date</th>
<th>Description</th>
<th>Amount</th>
<th>Category</th>
</tr>
</thead>
<tbody>
{% for txn in report.transactions %}
<tr>
<td>{{ txn.date.strftime('%b %d') }}</td>
<td>{{ txn.description }}</td>
<td class="{{ 'credit' if txn.amount > 0 else 'debit' }}">
{{ "$%.2f"|format(txn.amount) }}
</td>
<td>{{ txn.category }}</td>
</tr>
{% endfor %}
</tbody>
</table>
<div class="footer">
<p>This statement was generated automatically on {{ now.strftime('%B %d, %Y at %I:%M %p') }}.</p>
<p>For questions, contact support@acmefinancial.com</p>
</div>
</body>
</html>
Update the render function to pass the current datetime:
from datetime import datetime


def render_report(report) -> str:
    template = env.get_template("account_statement.html")
    return template.render(report=report, now=datetime.now())
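One detail worth knowing: the template's "$%.2f"|format(txn.amount) renders a debit as $-89.42, with the sign after the dollar symbol. If you would rather show -$89.42, one option is a small formatting function registered as a custom Jinja2 filter. The sketch below is an assumption, not part of the original pipeline; format_money and the filter name money are hypothetical.

```python
from decimal import Decimal


def format_money(amount: Decimal) -> str:
    """Format a Decimal as US dollars, placing any minus sign before the symbol."""
    sign = "-" if amount < 0 else ""
    # ",.2f" adds thousands separators and fixes two decimal places
    return f"{sign}${abs(amount):,.2f}"


# Hypothetical registration on the Environment from render.py:
#   env.filters["money"] = format_money
# The template could then use {{ txn.amount|money }}.

print(format_money(Decimal("3200.00")))  # $3,200.00
print(format_money(Decimal("-89.42")))   # -$89.42
```

Formatting the Decimal directly also avoids the float conversion that %-style formatting performs.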
Stage 3: Generate PDFs
Send the rendered HTML to LightningPDF's API and get back PDF bytes.
# pipeline/generate.py
import requests
import time
from typing import Optional

API_URL = "https://lightningpdf.dev/api/v1/pdf/generate"


class PDFGenerator:
    def __init__(self, api_key: str, timeout: int = 30):
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json",
        })
        self.timeout = timeout

    def generate(self, html: str, retries: int = 2) -> bytes:
        payload = {
            "html": html,
            "options": {
                "format": "A4",
                "print_background": True,
                "margin": {
                    "top": "0.4in",
                    "right": "0.4in",
                    "bottom": "0.4in",
                    "left": "0.4in",
                }
            }
        }

        last_error: Optional[Exception] = None
        for attempt in range(retries + 1):
            try:
                resp = self.session.post(API_URL, json=payload, timeout=self.timeout)
                resp.raise_for_status()
                return resp.content
            except requests.exceptions.RequestException as e:
                last_error = e
                if attempt < retries:
                    wait = 2 ** attempt  # 1s, 2s
                    time.sleep(wait)

        raise RuntimeError(f"PDF generation failed after {retries + 1} attempts: {last_error}")
I'm using a requests.Session instead of plain requests.post. The session reuses TCP connections, which saves ~50ms per request when generating hundreds of PDFs. Over 2,000 documents, that's 100 seconds saved.
The retry logic uses exponential backoff. Network blips happen. A 1-second retry catches most transient failures without hammering the API.
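One refinement to consider: the loop above retries every RequestException, including the HTTPError that raise_for_status() raises for a 4xx response such as a rejected API key, which will not succeed on a second try. A minimal sketch of a retry decision follows. The helper names and the set of retryable status codes are assumptions, not documented LightningPDF behavior.

```python
from typing import Optional

# Assumed set of transient status codes; adjust to what the API documents.
RETRYABLE_STATUS = {429, 500, 502, 503, 504}


def should_retry(status_code: Optional[int]) -> bool:
    """Return True for failures that are likely transient.

    status_code is None when no response arrived at all
    (timeout, connection reset), which is worth retrying.
    """
    if status_code is None:
        return True
    return status_code in RETRYABLE_STATUS


def backoff_delay(attempt: int, base: float = 1.0, cap: float = 30.0) -> float:
    """Exponential backoff: base * 2**attempt seconds, capped at `cap`."""
    return min(cap, base * (2 ** attempt))


print(should_retry(503), should_retry(401))  # True False
print([backoff_delay(n) for n in range(3)])  # [1.0, 2.0, 4.0]
```

Inside the except block, the status could be read with e.response.status_code if e.response is not None else None, and the loop could re-raise immediately when should_retry returns False.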
Stage 4: Deliver
Two options: email and S3 archival. In production, I do both — email the client, archive in S3.
# pipeline/deliver.py
import smtplib
import boto3
from email.mime.multipart import MIMEMultipart
from email.mime.base import MIMEBase
from email.mime.text import MIMEText
from email import encoders
from typing import Optional


class EmailDelivery:
    def __init__(self, smtp_host: str, smtp_port: int, username: str, password: str):
        self.smtp_host = smtp_host
        self.smtp_port = smtp_port
        self.username = username
        self.password = password

    def send(self, to_email: str, subject: str, body: str,
             attachment: bytes, filename: str):
        msg = MIMEMultipart()
        msg["From"] = self.username
        msg["To"] = to_email
        msg["Subject"] = subject
        msg.attach(MIMEText(body, "plain"))

        part = MIMEBase("application", "pdf")
        part.set_payload(attachment)
        encoders.encode_base64(part)
        # Pass filename as a parameter so it is quoted correctly in the header
        part.add_header("Content-Disposition", "attachment", filename=filename)
        msg.attach(part)

        # Opens a fresh SMTP connection for each message
        with smtplib.SMTP(self.smtp_host, self.smtp_port) as server:
            server.starttls()
            server.login(self.username, self.password)
            server.send_message(msg)


class S3Archiver:
    def __init__(self, bucket: str, prefix: str = "reports"):
        self.s3 = boto3.client("s3")
        self.bucket = bucket
        self.prefix = prefix

    def upload(self, key: str, data: bytes, metadata: Optional[dict] = None):
        self.s3.put_object(
            Bucket=self.bucket,
            Key=f"{self.prefix}/{key}",
            Body=data,
            ContentType="application/pdf",
            Metadata=metadata or {},
        )
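As an alternative to the MIME classes, the standard library's newer email.message.EmailMessage API builds the same kind of message with less ceremony. This is a sketch of a swapped-in approach, not the code the pipeline uses; build_message and the sample addresses are hypothetical.

```python
from email.message import EmailMessage


def build_message(sender: str, to_email: str, subject: str,
                  body: str, attachment: bytes, filename: str) -> EmailMessage:
    """Build a plain-text email with a single PDF attachment."""
    msg = EmailMessage()
    msg["From"] = sender
    msg["To"] = to_email
    msg["Subject"] = subject
    msg.set_content(body)
    # add_attachment handles base64 encoding and the Content-Disposition header
    msg.add_attachment(
        attachment,
        maintype="application",
        subtype="pdf",
        filename=filename,
    )
    return msg


msg = build_message(
    "reports@example.com", "test@example.com",
    "Statement", "See attached.",
    b"%PDF-1.4 test", "statement.pdf",
)
attachments = list(msg.iter_attachments())
print(attachments[0].get_filename())  # statement.pdf
```

The resulting object can be passed to server.send_message(msg) exactly as before, and building the message in a separate function makes it testable without an SMTP server.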
The orchestrator
Now wire the four stages together:
# pipeline/main.py
import os
import logging
from datetime import date, timedelta
from dataclasses import dataclass

from pipeline.query import fetch_all_reports_fast
from pipeline.render import render_report
from pipeline.generate import PDFGenerator
from pipeline.deliver import EmailDelivery, S3Archiver

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)
log = logging.getLogger(__name__)


@dataclass
class PipelineConfig:
    db_dsn: str
    pdf_api_key: str
    smtp_host: str
    smtp_port: int
    smtp_user: str
    smtp_pass: str
    s3_bucket: str


def load_config() -> PipelineConfig:
    return PipelineConfig(
        db_dsn=os.environ["DATABASE_URL"],
        pdf_api_key=os.environ["LIGHTNINGPDF_KEY"],
        smtp_host=os.environ.get("SMTP_HOST", "smtp.example.com"),
        smtp_port=int(os.environ.get("SMTP_PORT", "587")),
        smtp_user=os.environ["SMTP_USER"],
        smtp_pass=os.environ["SMTP_PASS"],
        s3_bucket=os.environ.get("S3_BUCKET", "client-reports"),
    )


def run_pipeline():
    config = load_config()

    # Calculate the previous month
    today = date.today()
    period_end = today.replace(day=1) - timedelta(days=1)
    period_start = period_end.replace(day=1)
    log.info(f"Generating reports for {period_start} to {period_end}")

    # Stage 1: Query
    reports = fetch_all_reports_fast(config.db_dsn, period_start, period_end)
    log.info(f"Found {len(reports)} clients with transactions")

    # Initialize services
    pdf_gen = PDFGenerator(config.pdf_api_key)
    email = EmailDelivery(config.smtp_host, config.smtp_port,
                          config.smtp_user, config.smtp_pass)
    archiver = S3Archiver(config.s3_bucket)

    sent = 0
    failed = 0
    for report in reports:
        try:
            # Stage 2: Render
            html = render_report(report)

            # Stage 3: Generate PDF
            pdf_bytes = pdf_gen.generate(html)

            # Stage 4: Deliver
            filename = f"statement-{report.account_number}-{period_start.strftime('%Y%m')}.pdf"
            email.send(
                to_email=report.client_email,
                subject=f"Your Account Statement — {period_start.strftime('%B %Y')}",
                body=f"Hi {report.client_name},\n\nPlease find your account statement for "
                     f"{period_start.strftime('%B %Y')} attached.\n\nBest regards,\nAcme Financial",
                attachment=pdf_bytes,
                filename=filename,
            )
            archiver.upload(
                key=f"{period_start.strftime('%Y/%m')}/{filename}",
                data=pdf_bytes,
                metadata={
                    "client_id": str(report.client_id),
                    "period": period_start.strftime("%Y-%m"),
                },
            )

            sent += 1
            log.info(f" [{sent}/{len(reports)}] Sent to {report.client_email}")
        except Exception as e:
            # One client's failure is logged and counted, never swallowed
            failed += 1
            log.error(f" FAILED {report.client_name}: {e}")

    log.info(f"Done: {sent} sent, {failed} failed out of {len(reports)}")


if __name__ == "__main__":
    run_pipeline()
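The previous-month calculation in run_pipeline is compact enough to deserve a closer look. Pulled out into a function (previous_month is a hypothetical name, not part of the original code), it can be checked against year boundaries and leap years without touching the database:

```python
from datetime import date, timedelta


def previous_month(today: date) -> tuple[date, date]:
    """Return the first and last day of the month before `today`."""
    # Step back one day from the 1st of this month to land on
    # the last day of the previous month.
    period_end = today.replace(day=1) - timedelta(days=1)
    # Then snap to the 1st of that month.
    period_start = period_end.replace(day=1)
    return period_start, period_end


print(previous_month(date(2026, 4, 1)))
# (datetime.date(2026, 3, 1), datetime.date(2026, 3, 31))
print(previous_month(date(2026, 1, 15)))
# (datetime.date(2025, 12, 1), datetime.date(2025, 12, 31))
```

Because the function takes today as a parameter instead of calling date.today() itself, a test can pin the date and a rerun for an earlier period is a one-argument change.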
Running it
Set your environment variables and run:
export DATABASE_URL="postgresql://user:pass@localhost:5432/acme"
export LIGHTNINGPDF_KEY="lpdf_your_api_key"
export SMTP_USER="reports@acmefinancial.com"
export SMTP_PASS="smtp_password"
export SMTP_HOST="smtp.resend.com"
export SMTP_PORT="587"
export S3_BUCKET="acme-client-reports"
python -m pipeline.main
Output:
2026-04-01 09:00:01 INFO Generating reports for 2026-03-01 to 2026-03-31
2026-04-01 09:00:01 INFO Found 847 clients with transactions
2026-04-01 09:00:02 INFO [1/847] Sent to alice@example.com
2026-04-01 09:00:03 INFO [2/847] Sent to bob@example.com
...
2026-04-01 09:07:44 INFO Done: 843 sent, 4 failed out of 847
847 personalized PDFs in under 8 minutes. Each one queried from a database, rendered from a template, generated as a PDF, emailed to the client, and archived in S3.
Adding concurrency
The single-threaded version processes reports one at a time; the run above averaged roughly half a second per report (847 reports in just under 8 minutes). If you need to go faster, use concurrent.futures:
from concurrent.futures import ThreadPoolExecutor, as_completed


def process_one(report, pdf_gen, email, archiver, period_start):
    html = render_report(report)
    pdf_bytes = pdf_gen.generate(html)
    filename = f"statement-{report.account_number}-{period_start.strftime('%Y%m')}.pdf"
    email.send(
        to_email=report.client_email,
        subject=f"Your Account Statement — {period_start.strftime('%B %Y')}",
        body=f"Hi {report.client_name},\n\nYour statement is attached.\n\nBest,\nAcme Financial",
        attachment=pdf_bytes,
        filename=filename,
    )
    archiver.upload(
        key=f"{period_start.strftime('%Y/%m')}/{filename}",
        data=pdf_bytes,
    )
    return report.client_email


# In the main pipeline, replace the for loop with:
with ThreadPoolExecutor(max_workers=10) as pool:
    futures = {
        pool.submit(process_one, r, pdf_gen, email, archiver, period_start): r
        for r in reports
    }
    # Results are collected in the main thread, so the counters need no lock
    for future in as_completed(futures):
        report = futures[future]
        try:
            email_addr = future.result()
            sent += 1
            log.info(f" [{sent}/{len(reports)}] Sent to {email_addr}")
        except Exception as e:
            failed += 1
            log.error(f" FAILED {report.client_name}: {e}")
With 10 workers, the same 847 reports finish in about 90 seconds instead of 8 minutes. Don't go above 10-15 concurrent workers — you'll hit SMTP rate limits or the PDF API's concurrency cap for your plan.
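One caveat with the threaded version: all workers share a single PDFGenerator, and therefore a single requests.Session, which is not documented as safe to share across threads. A cautious option is to give each worker thread its own client via threading.local. The sketch below uses only the standard library; get_client and the factory argument are hypothetical, and in the pipeline the factory might be something like lambda: PDFGenerator(api_key).

```python
import threading
from concurrent.futures import ThreadPoolExecutor

_local = threading.local()


def get_client(factory):
    """Return this thread's client, creating it on first use."""
    client = getattr(_local, "client", None)
    if client is None:
        client = factory()
        _local.client = client
    return client


def worker(_):
    # `object` stands in for a real client factory in this demo
    client = get_client(object)
    return id(client), threading.get_ident()


with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(worker, range(20)))

# Group the client ids seen by each thread
clients_per_thread = {}
for client_id, thread_id in results:
    clients_per_thread.setdefault(thread_id, set()).add(client_id)

# Every thread reused exactly one client across all its tasks
print(all(len(ids) == 1 for ids in clients_per_thread.values()))  # True
```

Each thread still reuses its own connection across requests, so the connection-reuse benefit of the session is kept.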
Testing without a database
For local development, mock the data layer:
# test_pipeline.py
import os
from datetime import date
from decimal import Decimal

from pipeline.query import ClientReport, Transaction
from pipeline.render import render_report
from pipeline.generate import PDFGenerator


def make_test_report() -> ClientReport:
    return ClientReport(
        client_id=1,
        client_name="Test User",
        client_email="test@example.com",
        account_number="ACC-001234",
        period_start=date(2026, 3, 1),
        period_end=date(2026, 3, 31),
        transactions=[
            Transaction(date(2026, 3, 5), "Direct deposit", Decimal("3200.00"), "Income"),
            Transaction(date(2026, 3, 8), "Grocery store", Decimal("-89.42"), "Food"),
            Transaction(date(2026, 3, 12), "Electric bill", Decimal("-142.00"), "Utilities"),
            Transaction(date(2026, 3, 15), "Freelance payment", Decimal("750.00"), "Income"),
            Transaction(date(2026, 3, 20), "Restaurant", Decimal("-67.30"), "Food"),
            Transaction(date(2026, 3, 25), "Gas station", Decimal("-45.18"), "Transport"),
        ],
    )


if __name__ == "__main__":
    report = make_test_report()
    html = render_report(report)

    # Save HTML for browser preview
    with open("test_statement.html", "w") as f:
        f.write(html)
    print("Saved test_statement.html — open in browser to preview")

    # Generate PDF if API key is set
    api_key = os.environ.get("LIGHTNINGPDF_KEY")
    if api_key:
        gen = PDFGenerator(api_key)
        pdf = gen.generate(html)
        with open("test_statement.pdf", "wb") as f:
            f.write(pdf)
        print(f"Saved test_statement.pdf ({len(pdf)} bytes)")
    else:
        print("Set LIGHTNINGPDF_KEY to generate a PDF")
This lets you iterate on the HTML template by opening it in a browser, then generate the actual PDF once the layout looks right.
Project structure
The final file layout:
doc-pipeline/
    pipeline/
        __init__.py
        query.py        # Stage 1: database queries
        render.py       # Stage 2: Jinja2 rendering
        generate.py     # Stage 3: PDF API client
        deliver.py      # Stage 4: email + S3
        main.py         # Orchestrator
        templates/
            account_statement.html
    test_pipeline.py
    requirements.txt
And requirements.txt:
requests>=2.31
jinja2>=3.1
psycopg2-binary>=2.9
boto3>=1.34
Four stage modules, one orchestrator, one clear flow. Each stage can be unit-tested independently. The orchestrator ties them together and handles logging. The LightningPDF API handles the hard part (turning HTML into a properly rendered PDF with correct fonts, page breaks, and print styles), so your pipeline stays focused on data and business logic.
Grab an API key at lightningpdf.dev to try it. The free tier gives you 100 PDFs per month, which is plenty to build and test the whole pipeline before you scale it up.
LightningPDF Team
Building fast, reliable PDF generation tools for developers.