Invoice Data Extraction: How to Get JSON via GPT-4o Vision
Ship a production-ready, template-less invoice data extractor: jsonschema-validated JSON, retryable GPT-4o Vision, OCR fallback, and idempotent Postgres upserts with logs.
Alright, this is going to be a long one. I'm going to show you how to build something that actually works. An invoice extraction pipeline that takes those messy PDFs and images and turns them into clean, validated JSON. The whole thing runs in Colab, persists to Postgres, and logs everything so you can see what's happening under the hood.
I've been working on this kind of system for a while now, and let me tell you, it's way trickier than it looks at first. You've got GPT-4o Vision doing the heavy lifting, sure, but when that fails (and trust me, it will), we fall back to good old OCR. Plus, we validate everything against a strict schema and fix errors automatically. Here's the kicker: the whole thing is idempotent, so you can run it multiple times without creating duplicates. That's something I learned the hard way in a previous project where duplicate processing was eating up our database.

Prerequisites: Grab yourself an OpenAI API key and a Postgres DSN. You can get a free-tier managed instance from Neon or ElephantSQL if you don't have one lying around. Should take about 30 minutes to run through everything. Just so we're clear, this tutorial is about the extraction and validation pipeline. Deployment and UI? That's a whole other story that I'll probably tackle in another article.
How It Works (High-Level Overview)
File ingestion – Load up those PDF or image invoices and convert them to images
Image preprocessing – Maybe resize and enhance them for OCR if needed
Vision extraction – Hit GPT-4o Vision to pull out structured JSON
Schema validation – Check against JSON Schema and collect every single error
Repair loop – When validation fails, send those errors back to the model with retry logic
Numeric consistency check – Make sure line item totals actually match the subtotal and tax
OCR fallback – If vision craps out, preprocess the image, run Tesseract, extract from text
Postgres upsert – Save document and line items idempotently using hash
Logging – Track latency, token usage, errors for every single run
Verification – Query your results and make sure idempotency works on re-run
Setup: Install Dependencies
First things first, let's get the Python packages installed:
!pip install --quiet openai pytesseract Pillow opencv-python pdf2image jsonschema psycopg2-binary tenacity python-dotenv
Now the system dependencies for Tesseract and Poppler. You need these for PDF rendering:
!apt-get update && apt-get install -y tesseract-ocr poppler-utils
Configure Credentials
Time to set up your API key and database connection:
import os
from getpass import getpass
if "OPENAI_API_KEY" not in os.environ:
os.environ["OPENAI_API_KEY"] = getpass("Enter your OpenAI API key: ")
if "POSTGRES_DSN" not in os.environ:
os.environ["POSTGRES_DSN"] = getpass("Enter your Postgres DSN (postgresql://user:pass@host:port/db): ")
# Verify credentials are set
assert os.environ.get("OPENAI_API_KEY"), "OPENAI_API_KEY is required"
assert os.environ.get("POSTGRES_DSN"), "POSTGRES_DSN is required"
print("✓ Credentials configured")
Actually, if you're using Colab, there's another way that I've found pretty handy. You can use from google.colab import userdata and stick your secrets in Colab's secret manager. Works pretty well actually, and you don't have to worry about accidentally committing your keys to GitHub.
Initialize Database Schema
Let's create the tables for documents, line items, and processing logs:
import psycopg2
conn = psycopg2.connect(os.environ["POSTGRES_DSN"])
cur = conn.cursor()
cur.execute("""
CREATE TABLE IF NOT EXISTS documents (
id SERIAL PRIMARY KEY,
document_hash TEXT UNIQUE NOT NULL,
filename TEXT,
vendor TEXT,
invoice_number TEXT,
invoice_date DATE,
currency TEXT,
subtotal NUMERIC(12,2),
tax NUMERIC(12,2),
total NUMERIC(12,2),
data JSONB,
created_at TIMESTAMP DEFAULT NOW(),
updated_at TIMESTAMP DEFAULT NOW()
)
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS line_items (
id SERIAL PRIMARY KEY,
document_id INTEGER REFERENCES documents(id) ON DELETE CASCADE,
line_number INTEGER,
description TEXT,
quantity NUMERIC(12,3),
unit_price NUMERIC(12,2),
line_total NUMERIC(12,2)
)
""")
cur.execute("""
CREATE TABLE IF NOT EXISTS processing_logs (
id SERIAL PRIMARY KEY,
document_hash TEXT,
request_id TEXT,
status TEXT,
latency_ms INTEGER,
prompt_tokens INTEGER,
completion_tokens INTEGER,
total_tokens INTEGER,
error_message TEXT,
created_at TIMESTAMP DEFAULT NOW()
)
""")
conn.commit()
cur.close()
conn.close()
print("✓ Database schema initialized")
Provision Sample Invoices
Download some sample invoices to /content/invoices:
import os
import urllib.request
os.makedirs("/content/invoices", exist_ok=True)
samples = [
("https://templates.invoicehome.com/invoice-template-us-neat-750px.png", "sample1.png"),
("https://www.freshbooks.com/wp-content/uploads/2021/10/invoice-sample.jpg", "sample2.jpg"),
]
for url, filename in samples:
path = f"/content/invoices/{filename}"
if not os.path.exists(path):
urllib.request.urlretrieve(url, path)
print(f"✓ Downloaded {filename}")
else:
print(f"✓ {filename} already exists")
Note: Feel free to swap these out with your own invoice files. Actually, the more variety you throw at this thing, the better. I once tested with just one type of invoice and thought everything was perfect. Then production hit and, well, let's just say I learned about edge cases real quick.
Define the Invoice JSON Schema
Here's where we get strict. Required fields, specific types, the works:
invoice_schema = {
"$schema": "https://json-schema.org/draft/2020-12/schema",
"title": "Invoice",
"type": "object",
"required": ["vendor", "invoice_number", "invoice_date", "currency", "subtotal", "tax", "total", "line_items"],
"properties": {
"vendor": {"type": "string", "minLength": 1},
"invoice_number": {"type": "string", "pattern": "^[A-Za-z0-9._\-/]+$"},
"invoice_date": {"type": "string", "pattern": "^\\d{4}-\\d{2}-\\d{2}$"},
"due_date": {"type": "string", "pattern": "^\\d{4}-\\d{2}-\\d{2}$"},
"currency": {"type": "string", "pattern": "^[A-Z]{3}$"},
"subtotal": {"type": "number"},
"tax": {"type": "number"},
"total": {"type": "number"},
"vendor_address": {"type": "string"},
"customer": {"type": "string"},
"customer_address": {"type": "string"},
"notes": {"type": "string"},
"line_items": {
"type": "array",
"minItems": 1,
"items": {
"type": "object",
"required": ["description", "quantity", "unit_price", "line_total"],
"properties": {
"description": {"type": "string", "minLength": 1},
"quantity": {"type": "number"},
"unit_price": {"type": "number"},
"line_total": {"type": "number"},
"sku": {"type": "string"}
},
"additionalProperties": False
}
}
},
"additionalProperties": False
}
Why this design: Setting additionalProperties: false is crucial. It stops the model from making stuff up, which believe me, it loves to do. And those patterns? They make sure dates and currency codes are actually valid. I spent way too much time debugging weird date formats before I added these.
Compute Document Hash for Idempotency
We hash each file to catch duplicates:
import hashlib
def compute_document_hash(file_path: str) -> str:
"""Compute SHA-256 hash of a file for idempotency."""
h = hashlib.sha256()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(8192), b""):
h.update(chunk)
return h.hexdigest()
Why this design: This is how we avoid reprocessing the same file over and over. The hash enables those ON CONFLICT upserts later. Simple but effective.
Convert Files to Images
Handle both PDFs and regular image formats:
from PIL import Image
from pdf2image import convert_from_path
def file_to_images(file_path: str):
"""Convert a file to a list of PIL images."""
ext = os.path.splitext(file_path)[1].lower()
if ext in [".png", ".jpg", ".jpeg", ".webp", ".tiff", ".bmp"]:
return [Image.open(file_path).convert("RGB")]
elif ext == ".pdf":
pages = convert_from_path(file_path, dpi=300)
return [p.convert("RGB") for p in pages]
else:
raise ValueError(f"Unsupported file type: {ext}")
Why this design: DPI 300 is the sweet spot I've found. Good quality without blowing up your token costs. For multi-page PDFs, we're just doing the first page in this tutorial. You could extend it though. Actually, in one of my experiments, I tried processing all pages and the costs got... interesting.
Resize Images to Control Token Usage
Keep those API costs down by limiting image size:
def resize_image_for_vision(img: Image.Image, max_long_edge: int = 1600) -> Image.Image:
"""Resize image if longest edge exceeds max_long_edge."""
w, h = img.size
if max(w, h) <= max_long_edge:
return img
scale = max_long_edge / max(w, h)
new_w, new_h = int(w * scale), int(h * scale)
return img.resize((new_w, new_h), Image.LANCZOS)
Trade-off: Look, smaller images save money but they might hurt OCR accuracy on tiny fonts. I've played with max_long_edge a lot, and 2048 seems to work for most invoices. But honestly, you might need to tweak this based on what you're dealing with.
Preprocess Images for OCR
Make them more readable with grayscale, thresholding, and deskew:
import cv2
import numpy as np
def preprocess_for_ocr(pil_img: Image.Image) -> Image.Image:
"""Preprocess image for OCR by applying grayscale, thresholding, and deskew."""
img = np.array(pil_img)
gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
th = cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
cv2.THRESH_BINARY, 35, 11)
# Deskew via moments (guard against empty image)
coords = np.column_stack(np.where(th > 0))
if len(coords) == 0:
return Image.fromarray(th)
angle = cv2.minAreaRect(coords)[-1]
if angle < -45:
angle = -(90 + angle)
else:
angle = -angle
(h, w) = th.shape[:2]
M = cv2.getRotationMatrix2D((w // 2, h // 2), angle, 1.0)
rotated = cv2.warpAffine(th, M, (w, h), flags=cv2.INTER_CUBIC, borderMode=cv2.BORDER_REPLICATE)
return Image.fromarray(rotated)
Why this design: Adaptive thresholding is great for uneven lighting. Deskew fixes rotation issues. And that guard? Prevents crashes when you accidentally feed it a blank image. Ask me how I know that one.
Run Tesseract OCR
Our fallback text extraction:
import pytesseract
def ocr_extract_text(pil_img: Image.Image) -> str:
"""Extract text from an image using Tesseract OCR."""
return pytesseract.image_to_string(pil_img, lang="eng", config="--psm 6 --oem 1")
Why this design: --psm 6 assumes you've got uniform text blocks. --oem 1 uses LSTM which, honestly, gives better accuracy most of the time. I tried all the different modes and this combo just works.
Convert Image to Data URL
Encode the image for the Vision API:
import base64
from io import BytesIO
def pil_to_data_url(img: Image.Image) -> str:
"""Convert a PIL image to a data URL."""
buf = BytesIO()
img.save(buf, format="PNG")
b64 = base64.b64encode(buf.getvalue()).decode("utf-8")
return f"data:image/png;base64,{b64}"
Define System Prompt and Few-Shot Examples
Guide the model with clear instructions and a solid example:
SYSTEM_PROMPT = (
"You are a careful invoicing parser. Extract fields strictly as JSON. "
"Do not include any text outside the JSON. If a value is missing, infer cautiously "
"from context; otherwise omit the field. Use ISO 8601 dates (YYYY-MM-DD) and ISO 4217 currency."
)
def schema_summary(schema: dict) -> str:
"""Summarize required fields and types for the prompt."""
req = schema.get("required", [])
props = schema.get("properties", {})
lines = ["Required fields and types:"]
for k in req:
t = props.get(k, {}).get("type", "any")
lines.append(f"- {k}: {t}")
lines.append("Line item fields: description (string), quantity (number), unit_price (number), line_total (number).")
return "\n".join(lines)
FEW_SHOT_USER = (
"Example: Extract JSON from this text-only invoice:\n"
"ACME Corp\nInvoice INV-123\nDate 2024-06-01\n"
"1x Widget A @ 10.00\n2x Widget B @ 5.00\nSubtotal 20.00\nTax 2.00\nTotal 22.00\n"
)
FEW_SHOT_ASSISTANT = """{
"vendor": "ACME Corp",
"invoice_number": "INV-123",
"invoice_date": "2024-06-01",
"currency": "USD",
"subtotal": 20.0,
"tax": 2.0,
"total": 22.0,
"line_items": [
{"description": "Widget A", "quantity": 1, "unit_price": 10.0, "line_total": 10.0},
{"description": "Widget B", "quantity": 2, "unit_price": 5.0, "line_total": 10.0}
]
}"""
Why this design: Few-shot examples are gold. They show the model exactly what format you want and really cut down on hallucinations. I used to skip this step. Big mistake.
Call GPT-4o Vision with Retry Logic
Extract invoice data from the image, with automatic retries when things go wrong:
import json
import time
from openai import OpenAI
from tenacity import retry, stop_after_attempt, wait_exponential
client = OpenAI()
@retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=8))
def call_gpt4o_vision(img: Image.Image, schema: dict):
"""Call GPT-4o Vision API to extract invoice data as JSON."""
data_url = pil_to_data_url(img)
start = time.time()
resp = client.chat.completions.create(
model="gpt-4o",
response_format={"type": "json_object"},
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": FEW_SHOT_USER},
{"role": "assistant", "content": FEW_SHOT_ASSISTANT},
{
"role": "user",
"content": [
{"type": "text", "text": "Extract an invoice as JSON. Follow this contract:\n" + schema_summary(schema)},
{"type": "image_url", "image_url": {"url": data_url}}
]
}
],
temperature=0.2
)
latency_ms = int((time.time() - start) * 1000)
text = resp.choices[0].message.content
usage = resp.usage
request_id = resp.id
return json.loads(text), latency_ms, usage, request_id
Why this design: That response_format=json_object is key. Forces JSON output every time. And the retry logic? Handles those annoying transient API errors that always seem to happen at the worst possible moment.
Validate Against JSON Schema
Collect all the validation errors for targeted repair:
from jsonschema import Draft202012Validator, ValidationError
def validate_against_schema(data: dict, schema: dict):
"""Validate data against the JSON schema and return all errors."""
validator = Draft202012Validator(schema)
errors = []
for e in validator.iter_errors(data):
path = "/".join(map(str, e.path)) or "$"
errors.append(f"path={path}: {e.message}")
return len(errors) == 0, errors
Why this design: Getting all errors at once is important. Gives the model the full picture of what needs fixing instead of playing whack-a-mole with individual issues.
Repair JSON with Targeted Feedback
Send those validation errors back to the model:
@retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=8))
def repair_with_gpt(img: Image.Image, schema: dict, prev: dict, errors: list):
"""Repair JSON data using GPT-4o Vision based on validation errors."""
data_url = pil_to_data_url(img)
prompt = "The previous JSON failed validation. Fix strictly per errors:\n" + "\n".join(f"- {err}" for err in errors)
resp = client.chat.completions.create(
model="gpt-4o",
response_format={"type": "json_object"},
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": FEW_SHOT_USER},
{"role": "assistant", "content": FEW_SHOT_ASSISTANT},
{"role": "user", "content": [
{"type": "text", "text": "Original image for reference:"},
{"type": "image_url", "image_url": {"url": data_url}}
]},
{"role": "user", "content": f"Previous JSON:\n{json.dumps(prev, ensure_ascii=False)}"},
{"role": "user", "content": prompt},
],
temperature=0.1
)
fixed = json.loads(resp.choices[0].message.content)
return fixed, resp
Why this design: Lower temperature (0.1) keeps the model from getting too creative during repairs. And showing the original image again? Keeps everything in context. The model needs to see what it's working with.
Check Numeric Consistency
Make sure the math actually adds up:
def numeric_consistency_ok(data: dict, tol: float = 0.05):
"""Check if numeric fields in the data are consistent within a tolerance."""
try:
subtotal = float(data.get("subtotal", 0))
tax = float(data.get("tax", 0))
total = float(data.get("total", 0))
li_sum = sum(float(it.get("line_total", 0)) for it in data.get("line_items", []))
return (abs(li_sum - subtotal) <= tol and
abs(subtotal + tax - total) <= tol)
except Exception:
return False
def build_numeric_errors(data: dict):
"""Build error messages for numeric inconsistencies."""
subtotal = data.get("subtotal", None)
tax = data.get("tax", None)
total = data.get("total", None)
li_sum = sum(float(it.get("line_total", 0)) for it in data.get("line_items", []))
msgs = []
msgs.append(f"Sum(line_items.line_total)={li_sum}, subtotal={subtotal}, tax={tax}, total={total}.")
msgs.append("Enforce: sum(line_items) ≈ subtotal; subtotal + tax ≈ total; update fields minimally to satisfy.")
return msgs
Why this design: The tolerance handles rounding issues. And those explicit error messages? They tell the model exactly what arithmetic to fix. I've seen some wild math from these models when they're left to their own devices.
Fallback to OCR and Text-Only Extraction
When vision fails, preprocess and extract from text:
@retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=8))
def call_gpt_text_only(ocr_text: str, schema: dict, extra_instructions: str = ""):
"""Call GPT-4o using OCR text to extract invoice data as JSON."""
resp = client.chat.completions.create(
model="gpt-4o",
response_format={"type": "json_object"},
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": FEW_SHOT_USER},
{"role": "assistant", "content": FEW_SHOT_ASSISTANT},
{"role": "user", "content": "Extract invoice JSON strictly per schema summary:\n" + schema_summary(schema)},
{"role": "user", "content": f"Invoice text:\n{ocr_text}"},
{"role": "user", "content": extra_instructions} if extra_instructions else {"role": "user", "content": ""}
],
temperature=0.2
)
data = json.loads(resp.choices[0].message.content)
return data, resp
Why this design: Passing numeric errors separately keeps the invoice text clean. Makes the repair signal much clearer. This whole fallback mechanism saved my bacon more than once.
Persist Results to Postgres
Upsert the document and replace line items idempotently:
def upsert_document(conn, doc_hash: str, filename: str, data: dict):
"""Upsert document data into Postgres and replace line items."""
with conn.cursor() as cur:
cur.execute("""
INSERT INTO documents (document_hash, filename, vendor, invoice_number, invoice_date, currency,
subtotal, tax, total, data, created_at, updated_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, NOW(), NOW())
ON CONFLICT (document_hash) DO UPDATE SET
filename = EXCLUDED.filename,
vendor = EXCLUDED.vendor,
invoice_number = EXCLUDED.invoice_number,
invoice_date = EXCLUDED.invoice_date,
currency = EXCLUDED.currency,
subtotal = EXCLUDED.subtotal,
tax = EXCLUDED.tax,
total = EXCLUDED.total,
data = EXCLUDED.data,
updated_at = NOW()
RETURNING id
""", (
doc_hash, filename,
data.get("vendor"),
data.get("invoice_number"),
data.get("invoice_date"),
data.get("currency"),
data.get("subtotal"),
data.get("tax"),
data.get("total"),
json.dumps(data),
))
doc_id = cur.fetchone()[0]
# Replace line items
cur.execute("DELETE FROM line_items WHERE document_id = %s", (doc_id,))
for idx, it in enumerate(data.get("line_items", []), start=1):
cur.execute("""
INSERT INTO line_items (document_id, line_number, description, quantity, unit_price, line_total)
VALUES (%s, %s, %s, %s, %s, %s)
""", (
doc_id, idx,
it.get("description"),
it.get("quantity"),
it.get("unit_price"),
it.get("line_total")
))
conn.commit()
return doc_id
Why this design: ON CONFLICT makes sure re-runs just update existing records. And deleting then reinserting line items? Keeps everything in sync. Not the most elegant solution maybe, but it works reliably.
Log Processing Metadata
Track everything for observability:
def log_processing(conn, doc_hash: str, request_id: str, status: str, latency_ms: int,
usage=None, error_message: str = None):
"""Log processing details into the database for observability."""
# Sanitize error message to avoid logging large payloads
if error_message and len(error_message) > 500:
error_message = error_message[:500] + "... (truncated)"
with conn.cursor() as cur:
cur.execute("""
INSERT INTO processing_logs (document_hash, request_id, status, latency_ms,
prompt_tokens, completion_tokens, total_tokens, error_message, created_at)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, NOW())
""", (
doc_hash,
request_id,
status,
latency_ms,
getattr(usage, "prompt_tokens", None) if usage else None,
getattr(usage, "completion_tokens", None) if usage else None,
getattr(usage, "total_tokens", None) if usage else None,
error_message
))
conn.commit()
Why this design: Truncating error messages is smart. Prevents PII leakage and keeps your logs from getting huge. Learned that one after accidentally logging full invoice text for a week.
Orchestrate the End-to-End Pipeline
Process a single invoice file through all the steps:
import glob
def process_invoice_file(file_path: str):
"""Process an invoice file through the full pipeline."""
doc_hash = compute_document_hash(file_path)
images = file_to_images(file_path)
first_img = resize_image_for_vision(images[0])
data = None
usage = None
request_id = None
status = "ok"
error_message = None
start_time = time.time()
try:
# Attempt Vision extraction
data, _, usage, request_id = call_gpt4o_vision(first_img, invoice_schema)
ok, errs = validate_against_schema(data, invoice_schema)
# Repair if validation fails
if not ok:
data, repair_resp = repair_with_gpt(first_img, invoice_schema, data, errs)
usage = repair_resp.usage
request_id = repair_resp.id
ok, errs = validate_against_schema(data, invoice_schema)
# Numeric consistency repair
if ok and not numeric_consistency_ok(data):
numeric_errs = build_numeric_errors(data)
data, repair_resp2 = repair_with_gpt(first_img, invoice_schema, data, numeric_errs)
usage = repair_resp2.usage
request_id = repair_resp2.id
ok, errs = validate_against_schema(data, invoice_schema)
# OCR fallback
if not ok:
pre = preprocess_for_ocr(first_img)
ocr_text = ocr_extract_text(pre)
data, text_resp = call_gpt_text_only(ocr_text, invoice_schema)
usage = text_resp.usage
request_id = text_resp.id
ok, errs = validate_against_schema(data, invoice_schema)
# Numeric repair for OCR path
if ok and not numeric_consistency_ok(data):
numeric_errs = build_numeric_errors(data)
data, repair_resp3 = call_gpt_text_only(ocr_text, invoice_schema, "\n".join(numeric_errs))
usage = repair_resp3.usage
request_id = repair_resp3.id
ok, errs = validate_against_schema(data, invoice_schema)
if not ok:
status = "failed"
error_message = "Validation failed after OCR fallback: " + "; ".join(errs)
# Persist if successful
latency_ms = int((time.time() - start_time) * 1000)
if status == "ok":
with psycopg2.connect(os.environ["POSTGRES_DSN"]) as conn:
upsert_document(conn, doc_hash, os.path.basename(file_path), data)
log_processing(conn, doc_hash, request_id, status, latency_ms, usage, None)
else:
with psycopg2.connect(os.environ["POSTGRES_DSN"]) as conn:
log_processing(conn, doc_hash, request_id, status, latency_ms, usage, error_message)
return {"file": file_path, "hash": doc_hash, "status": status, "errors": None if status == "ok" else error_message}
except Exception as e:
status = "error"
error_message = str(e)
latency_ms = int((time.time() - start_time) * 1000)
with psycopg2.connect(os.environ["POSTGRES_DSN"]) as conn:
log_processing(conn, doc_hash, request_id, status, latency_ms, usage, error_message)
return {"file": file_path, "hash": doc_hash, "status": status, "errors": error_message}
Why this design: Tracking start_time right at the top captures true end-to-end latency. Each path, whether it's vision, repair, or OCR, updates usage and request_id for accurate logging. You need this visibility when things go sideways.
Run the Pipeline on Sample Files
Process everything in your input directory:
input_dir = "/content/invoices"
files = sorted([p for p in glob.glob(os.path.join(input_dir, "*")) if os.path.isfile(p)])
results = []
for f in files:
res = process_invoice_file(f)
print(res)
results.append(res)
summary = {
"processed": len(results),
"ok": sum(1 for r in results if r["status"] == "ok"),
"failed": sum(1 for r in results if r["status"] == "failed"),
"error": sum(1 for r in results if r["status"] == "error"),
}
print("Summary:", summary)
Verify Idempotency
Run it again and make sure you don't get duplicates:
# Count documents before re-run
with psycopg2.connect(os.environ["POSTGRES_DSN"]) as conn:
with conn.cursor() as cur:
cur.execute("SELECT COUNT(*) FROM documents")
count_before = cur.fetchone()[0]
# Re-run processing
for f in files:
res = process_invoice_file(f)
print("Re-run:", res)
# Count documents after re-run
with psycopg2.connect(os.environ["POSTGRES_DSN"]) as conn:
with conn.cursor() as cur:
cur.execute("SELECT COUNT(*) FROM documents")
count_after = cur.fetchone()[0]
print(f"Documents before: {count_before}, after: {count_after}")
assert count_before == count_after, "Idempotency check failed: duplicate documents created"
print("✓ Idempotency verified")
Why this design: If the counts are equal, you know ON CONFLICT is working. Only thing that should change is updated_at. This check has caught so many bugs for me.
Query and Inspect Results
Take a look at what you extracted:
with psycopg2.connect(os.environ["POSTGRES_DSN"]) as conn:
with conn.cursor() as cur:
cur.execute("""
SELECT id, filename, vendor, invoice_number, invoice_date, currency, subtotal, tax, total
FROM documents ORDER BY id DESC LIMIT 5
""")
print("Documents:")
for row in cur.fetchall():
print(row)
cur.execute("""
SELECT d.invoice_number, li.line_number, li.description, li.quantity, li.unit_price, li.line_total
FROM line_items li
JOIN documents d ON li.document_id = d.id
ORDER BY d.id DESC, li.line_number ASC
LIMIT
Wrapping Up
So there you have it. A complete invoice extraction pipeline that actually handles the messy reality of real-world documents. It's not perfect, nothing ever is, but it's robust enough to handle most of what you'll throw at it. The combination of vision AI with OCR fallback, strict validation, and automatic repair gives you something you can actually rely on.
The best part? You can extend this in so many ways. Add support for multiple pages, integrate with your accounting system, build a nice UI on top. But start here, get this working, and then iterate. That's how I've always approached these systems, and it's served me well.