Documentation Index
Fetch the complete documentation index at: https://docs.gp.scale.com/llms.txt
Use this file to discover all available pages before exploring further.
Extract structured data from documents using Pydantic schemas and an LLM of your choice. You can extract from a single parse result, or use a vector store for RAG-enhanced extraction on large documents.
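The examples below pass a Pydantic model as the extraction schema. A minimal sketch of such a model (the field names here are illustrative, not part of the SDK):

from pydantic import BaseModel, Field

class MySchema(BaseModel):
    # Illustrative fields; describe whatever you want pulled from the document
    title: str = Field(description="Document title")
    total_amount: float = Field(description="Total amount due, as a number")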
extract_result = await parse_result.extract(
    extraction_schema=MySchema,
    user_prompt="Extract the data",
    model="openai/gpt-4o",
    generate_citations=True,
    generate_confidence=True,
)
For large documents or multi-file analysis, use RAG-enhanced extraction from a vector store. This improves accuracy by retrieving relevant chunks before extraction.
result = await vector_store.extract(
    extraction_schema=MySchema,
    user_prompt="Extract from all documents",
    model="openai/gpt-4o",
)
Get Source Citations
result = await parse_result.extract(
    extraction_schema=MySchema,
    user_prompt="Extract data",
    model="openai/gpt-4o",
    generate_citations=True,  # Get source locations
)
# Each extracted field carries citations pointing back to the source document
for field_name, field in result.result.data.items():
    if field.citations:
        print(f"{field_name} found on page {field.citations[0].page}")
Filter Low Confidence Results
result = await parse_result.extract(
    extraction_schema=MySchema,
    user_prompt="Extract data",
    model="openai/gpt-4o",
    generate_confidence=True,
)
# Keep only fields extracted with high confidence
high_confidence = {
    k: v for k, v in result.result.data.items()
    if v.confidence and v.confidence > 0.8
}
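The complement is often just as useful: fields that fall below the threshold can be routed for manual review. A quick sketch against the same result object:

needs_review = [
    k for k, v in result.result.data.items()
    if not v.confidence or v.confidence <= 0.8
]
print(f"Fields needing manual review: {needs_review}")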
Run Batch Extract Jobs
Start extract jobs for multiple documents, then monitor and collect results. This pattern gives you control over job status and lets you handle failures per document.
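The batch example below references an InvoiceData schema. A minimal sketch of what it might look like (fields are illustrative):

from pydantic import BaseModel, Field

class InvoiceData(BaseModel):
    invoice_number: str = Field(description="Invoice number")
    vendor_name: str = Field(description="Name of the issuing vendor")
    total: float = Field(description="Invoice total amount")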
import asyncio

# Start extract jobs for all parse results (returns immediately)
extract_parameters = ExtractionParameters(
    model="openai/gpt-4o",
    extraction_schema=InvoiceData.model_json_schema(),
    user_prompt="Extract invoice details.",
    generate_citations=True,
    generate_confidence=True,
)
extract_jobs = [
    await parse_result.start_extract_job(extract_parameters)
    for parse_result in parse_results
]

# Monitor jobs until all complete
while any(
    job.data.status not in [JobStatus.SUCCEEDED, JobStatus.FAILED]
    for job in extract_jobs
):
    await asyncio.sleep(1)
    await asyncio.gather(*[job.refresh() for job in extract_jobs])

# Collect results from successful jobs
extractions = []
for i, job in enumerate(extract_jobs):
    if job.data.status == JobStatus.SUCCEEDED:
        extraction = await job.get_result()
        extractions.append(extraction)
        print(f"Document {i+1}: {extraction.result.data}")
    else:
        print(f"Document {i+1} failed: {job.data.error}")
To wait for all jobs and collect results in one call, use asyncio.gather with get_result:
# Start all jobs in parallel
extract_jobs = await asyncio.gather(*[
    parse_result.start_extract_job(extract_parameters)
    for parse_result in parse_results
])

# Wait for all to complete and collect results
extractions = await asyncio.gather(*[job.get_result() for job in extract_jobs])
for i, extraction in enumerate(extractions):
    print(f"Document {i+1}: {extraction.result.data}")
Appendix: Essential Imports
from pydantic import BaseModel, Field
from dex_sdk.types import ExtractionParameters, JobStatus
Next Steps