Skip to main content

SDK Methods

Complete reference for the DocuDevs Python SDK (docudevs-sdk).

Client Initialization

DocuDevsClient

Initialize the client with your API key.

from docudevs.docudevs_client import DocuDevsClient
import os

client = DocuDevsClient(
api_url="https://api.docudevs.ai", # Optional, defaults to production
token=os.getenv("DOCUDEVS_API_KEY") # Required
)

Schema Helpers

Convenience functions for building JSON Schema strings from Pydantic models or dicts.

json_schema

Convert a Pydantic model or dict to a JSON Schema string. Use for single-object extraction.

from docudevs import json_schema
from pydantic import BaseModel, Field

class Invoice(BaseModel):
number: str = Field(description="Invoice number")
total: float

# From a Pydantic model
schema = json_schema(Invoice)

# From a dict
schema = json_schema({"type": "object", "properties": {"name": {"type": "string"}}})

array_schema

Wrap a Pydantic model or dict as an array schema. Use for map-reduce extraction where each chunk produces multiple items.

from docudevs import array_schema
from pydantic import BaseModel

class LineItem(BaseModel):
sku: str
description: str
quantity: int

# Produces {"type": "array", "items": <LineItem schema>}
schema = array_schema(LineItem)

# From a dict
schema = array_schema({"type": "object", "properties": {"sku": {"type": "string"}}})

Both helpers accept:

  • Pydantic model classes — calls model_json_schema() automatically
  • Dicts — used as-is

Document Processing

submit_and_process_document

Upload and process a document for structured data extraction. Use extract_figures=True to store figure images and metadata. For fillable PDFs, acro_form_metadata=True stores AcroForm metadata as a separate job artifact that you can fetch later with get_acroform_metadata(...).

job_guid = await client.submit_and_process_document(
document=document_bytes,
document_mime_type="application/pdf",
prompt="Extract invoice data",
schema={...}, # Optional JSON schema
ocr="PREMIUM", # Optional: DEFAULT, PREMIUM, LOW
llm="HIGH", # Optional: DEFAULT, MINI, HIGH
extract_figures=True,
acro_form_metadata=True,
source_locations=True,
source_location_granularity="block",
)

submit_and_process_document_with_configuration

Process a document using a saved configuration.

job_guid = await client.submit_and_process_document_with_configuration(
document=document_bytes,
document_mime_type="application/pdf",
configuration_name="invoice-config"
)

submit_and_ocr_document

Process a document with OCR only (no structured extraction). Use extract_figures=True to store figure images and metadata.

job_guid = await client.submit_and_ocr_document(
document=document_bytes,
document_mime_type="application/pdf",
ocr="PREMIUM",
ocr_format="markdown", # markdown, plain, jsonl (for Excel)
describe_figures=True,
extract_figures=True
)

analyze_document

Analyze document structure and return a job GUID.

job_guid = await client.analyze_document(
document=document_bytes,
document_mime_type="application/pdf",
ocr="PREMIUM"
)

wait_until_ready

Wait for a job to complete and retrieve the result.

result = await client.wait_until_ready(
guid=job_guid,
timeout=180,
poll_interval=5.0,
result_format="json" # json, csv, excel, or None (legacy object)
)

Pipeline Processing

Pipeline mode uploads one document, runs one shared OCR pass, and executes a graph of nodes. Use the SDK builder so dependencies, source paths, and final outputs refer to previous nodes directly.

process_pipeline_document

Upload and process a document with a pipeline builder definition.

from docudevs import P, Pipeline

pipeline = Pipeline().ocr(mode="AUTO")
extract_data = pipeline.extract(
"extract_data",
source=P.ocr.content,
prompt="Extract the requested fields.",
schema={
"type": "object",
"properties": {"name": {"type": "string"}},
},
)
pipeline.final(
"final_result",
depends_on=[extract_data],
output=extract_data.result,
)

job_guid = await client.process_pipeline_document(
document=document_bytes,
document_mime_type="application/pdf",
pipeline=pipeline,
ocr="AUTO",
trace=True
)

result = await client.wait_until_ready(job_guid, result_format="json")
nodes = await client.get_pipeline_nodes(job_guid)

process_uploaded_pipeline_document

Run pipeline processing on a document that was already uploaded.

await client.process_uploaded_pipeline_document(
guid=uploaded_guid,
pipeline=pipeline,
mime_type="application/pdf",
ocr="AUTO"
)

save_pipeline_configuration and get_pipeline_configuration

Save and retrieve named pipeline configurations.

await client.save_pipeline_configuration(
"router-pipeline",
pipeline=pipeline,
mime_type="application/pdf",
ocr="AUTO"
)

saved_pipeline = await client.get_pipeline_configuration("router-pipeline")

get_pipeline_nodes

Read node-level runtime status from a pipeline job.

nodes = await client.get_pipeline_nodes(job_guid)
for node in nodes:
print(node["id"], node["status"])

For the full pipeline JSON contract, node types, and branching examples, see Pipeline Mode.

Batch Processing

submit_and_process_batch

Upload and process multiple documents as a batch.

batch_guid = await client.submit_and_process_batch(
documents=[file1_bytes, file2_bytes],
document_mime_type="application/pdf",
prompt="Extract data",
max_concurrency=5
)

Configurations

save_configuration

Save a named configuration.

from docudevs.models import UploadCommand

config = UploadCommand(
prompt="Extract invoice data",
ocr="PREMIUM"
)
await client.save_configuration("invoice-config", config)

list_configurations

List all saved configurations.

configs = await client.list_configurations()

get_configuration

Get details of a specific configuration.

config = await client.get_configuration("invoice-config")

delete_configuration

Delete a configuration.

await client.delete_configuration("invoice-config")

Templates

upload_template

Upload a new document template.

with open("form.pdf", "rb") as f:
response = await client.upload_template(
name="form-template",
document=f.read(),
file_name="form.pdf",
mime_type="application/pdf"
)

list_templates

List all available templates.

templates = await client.list_templates()

metadata

Get metadata (fields) for a template.

For PDF templates uploaded moments earlier, the raw endpoint may still be preparing metadata. In that case prefer wait_for_template_metadata(...).

meta = await client.metadata("form-template")

wait_for_template_metadata

Wait until PDF template metadata is ready and return the decoded field list.

fields = await client.wait_for_template_metadata(
"form-template",
timeout=60,
poll_interval=1,
)

fill

Fill a template with data.

from docudevs.models import TemplateFillRequest

request = TemplateFillRequest(fields={"name": "John"})
response = await client.fill("form-template", request)

fill_with_retry

Fill a template while retrying transient readiness errors that can happen immediately after upload.

request = TemplateFillRequest(fields={"name": "John"})
response = await client.fill_with_retry(
"form-template",
request,
timeout=30,
poll_interval=1,
)

delete_template

Delete a template.

await client.delete_template("form-template")

AcroForm PDF Metadata

extract_acroform_metadata

Upload a PDF directly to /document/acroform-metadata and return the rich AcroForm metadata structure without creating a job GUID.

metadata = await client.extract_acroform_metadata(
document=pdf_bytes,
file_name="fillable-form.pdf",
mime_type="application/pdf",
)

get_acroform_metadata

Fetch the stored AcroForm metadata artifact from an async processed job or a pdf-acroform operation job.

job_guid = await client.submit_and_process_document(
document=pdf_bytes,
document_mime_type="application/pdf",
acro_form_metadata=True,
)

metadata = await client.get_acroform_metadata(job_guid)

The direct helper is for metadata-only PDF uploads. The async helper is for workflows where the same job also needs source locations, rendered page images, extraction, or overlays.

For generated AcroForm PDFs, this metadata reflects the widgets embedded in the output PDF. Use get_pdf_acroform_field_definitions(...) when you need the normalized editable boxes that were used to generate those widgets.

Agent Chat

agent_chat

Send a chat message to the agent and receive a job GUID.

response = await client.agent_chat(
messages=[{"role": "user", "content": "Help me extract invoice line items"}],
session_id="session-123"
)

agent_status

Check the status of an agent chat job.

status = await client.agent_status(response["jobGuid"])

agent_chat_and_wait

Send a message and wait for the completed response.

result = await client.agent_chat_and_wait(
messages=[{"role": "user", "content": "Create a schema for insurance claims"}],
session_id="session-123"
)
print(result["response"]["message"])

Cases

create_case

Create a new case (collection of documents).

from docudevs.models import CreateCaseBody

case = await client.create_case(
body=CreateCaseBody(name="Q1 Invoices")
)

list_cases

List all cases.

cases = await client.list_cases()

upload_case_document

Upload a document to a case.

from docudevs.models import UploadCaseDocumentBody
from docudevs.types import File

await client.upload_case_document(
case_id=123,
body=UploadCaseDocumentBody(
document=File(payload=data, file_name="doc.pdf")
)
)

get_document_summary

Get the AI-generated summary for a specific document in a case.

result = await client.get_document_summary(case_id=123, document_id="doc-uuid")
summary = result.parsed
print(summary["filename"]) # "invoice.pdf"
print(summary["summary"]["document_type"]) # "invoice"

list_case_summaries

List AI-generated summaries for all documents in a case.

result = await client.list_case_summaries(case_id=123)
for doc in result.parsed:
print(f"{doc['filename']}: {doc['summary']['document_type']}")

Operations

submit_and_wait_for_error_analysis

Run error analysis on a completed job.

analysis = await client.submit_and_wait_for_error_analysis(job_guid)

submit_and_wait_for_generative_task

Run a generative AI task on a completed job.

task = await client.submit_and_wait_for_generative_task(
parent_job_id=job_guid,
prompt="Summarize this document",
model="DEFAULT"
)
# Result is in task.result (JSON string)

submit_and_wait_for_operation_with_parameters

Run an operation with custom parameters.

result = await client.submit_and_wait_for_operation_with_parameters(
job_guid=job_guid,
operation_type="error-analysis",
llm_type="HIGH",
custom_parameters={"focus": "dates"}
)

submit_and_wait_for_image_selection

Select relevant figures from a completed job that extracted figures.

selection = await client.submit_and_wait_for_image_selection(
job_guid,
prompt="Return all diagrams from the document",
top_k=5,
match_mode="all",
use_vision=False
)

import json
selection_payload = json.loads(selection.result)
selected = selection_payload.get("selected", [])

submit_pdf_acroform_operation

Queue pdf-acroform on a completed PDF job.

response = await client.submit_pdf_acroform_operation(
job_guid=parent_job_guid,
llm_type="DEFAULT",
ocr="PREMIUM",
page_range=[1, 2],
min_confidence=0.35,
max_fields_per_page=300,
)

print(response.parsed["jobGuid"])

submit_and_wait_for_pdf_acroform_operation

Run pdf-acroform, wait for completion, and download the generated fillable PDF.

conversion = await client.submit_and_wait_for_pdf_acroform_operation(
parent_job_guid,
ocr="PREMIUM",
min_confidence=0.35,
max_fields_per_page=300,
save_to="fillable.pdf",
)

print(conversion.operation_job_guid)

get_pdf_acroform

Download the generated PDF bytes for a pdf-acroform or pdf-acroform-apply operation job.

pdf_bytes = await client.get_pdf_acroform(operation_job_guid, save_to="fillable.pdf")

get_pdf_acroform_field_definitions

Fetch the normalized editable field definitions for a pdf-acroform operation.

entryBbox uses normalized page-image coordinates from 0..1, which makes this artifact the correct input for review tooling and manual edits.

definitions = await client.get_pdf_acroform_field_definitions(operation_job_guid)
fields = definitions["fieldDefinitions"] if definitions else []

submit_pdf_acroform_apply_operation

Queue reviewed field definitions to regenerate the PDF without rerunning visual detection.

response = await client.submit_pdf_acroform_apply_operation(
parent_job_guid,
field_definitions=edited_fields,
source_operation_guid=operation_job_guid,
)

print(response.parsed["jobGuid"])

submit_and_wait_for_pdf_acroform_apply_operation

Submit reviewed field definitions, wait for completion, and download the regenerated PDF.

applied = await client.submit_and_wait_for_pdf_acroform_apply_operation(
parent_job_guid,
field_definitions=edited_fields,
source_operation_guid=operation_job_guid,
save_to="fillable-reviewed.pdf",
)

print(applied.operation_job_guid)

Map-Reduce Helpers

submit_and_process_document_map_reduce

Process large documents using map-reduce strategy.

job_guid = await client.submit_and_process_document_map_reduce(
document=doc_bytes,
document_mime_type="application/pdf",
prompt="Extract line items",
split_type="page",
pages_per_chunk=5,
overlap_pages=1,
dedup_key="sku",
parallel_processing=True
)

submit_and_wait_for_map_reduce

Run map-reduce on an already-processed job without re-uploading or re-running OCR. This is the map-reduce equivalent of submit_and_wait_for_generative_task.

# First: process a document normally (OCR runs here)
job_guid = await client.submit_and_process_document(
document=doc_bytes,
document_mime_type="application/pdf",
prompt="Extract summary",
)
await client.wait_until_ready(job_guid)

# Later: re-run with map-reduce — no new upload, reuses OCR from parent job
result = await client.submit_and_wait_for_map_reduce(
parent_job_id=job_guid,
prompt="Extract all line items (sku, description, quantity, total)",
schema='{"type":"array","items":{"type":"object"}}',
split_type="page",
pages_per_chunk=5,
overlap_pages=1,
dedup_key="sku",
parallel_processing=True,
timeout=300,
result_format="json"
)
print(result["records"])

Parameters:

ParameterTypeDefaultDescription
parent_job_idstrrequiredGUID of the completed job whose document to re-process.
promptstr""Extraction instructions.
schemastr""JSON schema for structured extraction.
split_typestr"page"Chunk strategy: "page" or "markdown_header".
split_header_levelint2 (markdown mode)Header level (1 or 2) used when split_type="markdown_header".
pages_per_chunkint1Pages per chunk.
overlap_pagesint0Overlapping pages between chunks.
dedup_keystrNoneRequired when overlap_pages > 0.
parallel_processingboolFalseRun chunks in parallel.
mime_typestr"application/pdf"Document MIME type.
timeoutint180Max seconds to wait.
poll_intervalfloat5.0Seconds between status polls.
result_formatstr"json""json", "csv", "excel", or None.

When split_type="markdown_header", overlap_pages and dedup_key are not supported.

All other map-reduce parameters (header_options, header_schema, header_prompt, stop_when_empty, empty_chunk_grace, ocr, llm, trace, page_range, tools, etc.) are also accepted.

LLM Tracing

get_trace

Get the LLM trace for a completed job (only available if trace=True was set).

trace = await client.get_trace(job_guid)
if trace:
print(f"Total tokens: {trace['total_tokens']}")
print(f"LLM calls: {trace['total_llm_calls']}")
for event in trace['events']:
print(f" {event['type']}: {event['name']}")

get_image

Get a page thumbnail image from a processed job.

image_bytes = await client.get_image(job_guid, page_index=0)
if image_bytes:
with open("page_0.png", "wb") as f:
f.write(image_bytes)

get_figures_metadata

Get extracted figure metadata for a job.

metadata = await client.get_figures_metadata(job_guid)
images = metadata.get("images", []) if metadata else []

get_figure_image

Download a figure image by ID.

figure_id = images[0]["id"]
image_bytes = await client.get_figure_image(job_guid, figure_id)
if image_bytes:
with open("figure_0.png", "wb") as f:
f.write(image_bytes)

Job Management

delete_job

Delete a job and its associated data. Jobs must be in a terminal state (COMPLETED, ERROR, TIMEOUT, or PARTIAL). Jobs older than 14 days are automatically purged, so this method is primarily for cleaning up recent jobs.

result = await client.delete_job(job_guid)
if result.status_code == 200:
print(f"Deleted {result.parsed['jobsDeleted']} job(s)")

Parameters:

  • guid (str): The job GUID to delete

Returns: A response object with:

  • status_code (int): HTTP status (200 on success, 404 if not found)
  • parsed (dict): Contains jobsDeleted, errors on success

Note: Deleting a job removes all associated data including uploaded documents, OCR results, and extracted data. Usage/billing records are preserved but disassociated from the deleted job.

Enabling Tracing

Pass trace=True to any processing method:

job_guid = await client.submit_and_process_document(
document=doc_bytes,
document_mime_type="application/pdf",
prompt="Extract data",
trace=True # Enable LLM tracing
)

Error Handling

The SDK raises exceptions for API errors.

try:
await client.get_configuration("non-existent")
except Exception as e:
print(f"Error: {e}")