Extract structured data from documents using Pydantic schemas and an LLM. Extract from a single parse result, or use a vector store for RAG-enhanced extraction on large documents.
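
The snippets in this guide reference schema classes such as MySchema, Schema, and InvoiceData without defining them. They all play the same role: a Pydantic model whose fields describe the data you want extracted. A minimal sketch follows; the class and field names here are illustrative, not part of the SDK:

from pydantic import BaseModel, Field

class MySchema(BaseModel):
    vendor_name: str = Field(description="Name of the vendor issuing the document")
    total_amount: float = Field(description="Total amount due")
    due_date: str = Field(description="Payment due date, if present")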

Extract from Parse Result

extract_result = await parse_result.extract(
    extraction_schema=MySchema,
    user_prompt="Extract the data",
    model="openai/gpt-4o",
    generate_citations=True,   # include source locations for each extracted field
    generate_confidence=True,  # include per-field confidence scores
)

Extract from Vector Store

For large documents or multi-file analysis, use RAG-enhanced extraction from a vector store. This improves accuracy by retrieving relevant chunks before extraction.
result = await vector_store.extract(
    extraction_schema=MySchema,
    user_prompt="Extract from all documents",
    model="openai/gpt-4o",
)

Extract with Citations

result = await parse_result.extract(
    extraction_schema=Schema,
    user_prompt="Extract data",
    model="openai/gpt-4o",
    generate_citations=True,  # Get source locations
)

for field_name, field in result.result.data.items():
    if field.citations:
        print(f"{field_name} found on page {field.citations[0].page}")

Filter Low Confidence Results

result = await parse_result.extract(
    extraction_schema=Schema,
    user_prompt="Extract data",
    model="openai/gpt-4o",
    generate_confidence=True,
)

high_confidence = {
    k: v for k, v in result.result.data.items()
    if v.confidence and v.confidence > 0.8
}

Batch Extraction

Start extract jobs for multiple documents, then monitor and collect results. This pattern gives you control over job status and lets you handle failures per document.
import asyncio
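
# InvoiceData is referenced below but not defined elsewhere in this guide.
# This is an illustrative Pydantic model; the field names are assumptions.
from pydantic import BaseModel, Field

class InvoiceData(BaseModel):
    invoice_number: str = Field(description="Invoice identifier")
    total: float = Field(description="Total amount due")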

# Start extract jobs for all parse results (returns immediately)
extract_parameters = ExtractionParameters(
    model="openai/gpt-4o",
    extraction_schema=InvoiceData.model_json_schema(),
    user_prompt="Extract invoice details.",
    generate_citations=True,
    generate_confidence=True,
)

extract_jobs = [
    await parse_result.start_extract_job(extract_parameters)
    for parse_result in parse_results
]

# Monitor jobs until all complete
while any(
    job.data.status not in [JobStatus.SUCCEEDED, JobStatus.FAILED]
    for job in extract_jobs
):
    await asyncio.sleep(1)
    await asyncio.gather(*[job.refresh() for job in extract_jobs])

# Collect results from successful jobs
extractions = []
for i, job in enumerate(extract_jobs):
    if job.data.status == JobStatus.SUCCEEDED:
        extraction = await job.get_result()
        extractions.append(extraction)
        print(f"Document {i+1}: {extraction.result.data}")
    else:
        print(f"Document {i+1} failed: {job.data.error}")

To wait for all jobs and collect results in one call, use asyncio.gather with get_result:
# Start all jobs in parallel
extract_jobs = await asyncio.gather(*[
    parse_result.start_extract_job(extract_parameters)
    for parse_result in parse_results
])

# Wait for all to complete and collect results
extractions = await asyncio.gather(*[job.get_result() for job in extract_jobs])

for i, extraction in enumerate(extractions):
    print(f"Document {i+1}: {extraction.result.data}")

Appendix: Essential Imports

from pydantic import BaseModel, Field
from dex_sdk.types import ExtractionParameters, JobStatus
