Skip to main content

SDK Methods

Complete reference for the DocuDevs Python SDK (docudevs-sdk).

Client Initialization

DocuDevsClient

Initialize the client with your API key.

from docudevs.docudevs_client import DocuDevsClient
import os

client = DocuDevsClient(
    api_url="https://api.docudevs.ai",  # Optional, defaults to production
    token=os.getenv("DOCUDEVS_API_KEY")  # Required
)

Document Processing

submit_and_process_document

Upload and process a document for structured data extraction. Use extract_figures=True to store figure images and metadata.

job_guid = await client.submit_and_process_document(
    document=document_bytes,
    document_mime_type="application/pdf",
    prompt="Extract invoice data",
    schema={...},  # Optional JSON schema
    ocr="PREMIUM",  # Optional: DEFAULT, PREMIUM, LOW
    llm="HIGH",  # Optional: DEFAULT, MINI, HIGH
    extract_figures=True
)

submit_and_process_document_with_configuration

Process a document using a saved configuration.

job_guid = await client.submit_and_process_document_with_configuration(
    document=document_bytes,
    document_mime_type="application/pdf",
    configuration_name="invoice-config"
)

submit_and_ocr_document

Process a document with OCR only (no structured extraction). Use extract_figures=True to store figure images and metadata.

job_guid = await client.submit_and_ocr_document(
    document=document_bytes,
    document_mime_type="application/pdf",
    ocr="PREMIUM",
    ocr_format="markdown",  # markdown, plain, jsonl (for Excel)
    describe_figures=True,
    extract_figures=True
)

analyze_document

Analyze document structure and return a job GUID.

job_guid = await client.analyze_document(
    document=document_bytes,
    document_mime_type="application/pdf",
    ocr="PREMIUM"
)

wait_until_ready

Wait for a job to complete and retrieve the result.

result = await client.wait_until_ready(
    guid=job_guid,
    timeout=180,
    poll_interval=5.0,
    result_format="json"  # json, csv, excel, or None (legacy object)
)

Batch Processing

submit_and_process_batch

Upload and process multiple documents as a batch.

batch_guid = await client.submit_and_process_batch(
    documents=[file1_bytes, file2_bytes],
    document_mime_type="application/pdf",
    prompt="Extract data",
    max_concurrency=5
)

Configurations

save_configuration

Save a named configuration.

from docudevs.models import UploadCommand

config = UploadCommand(
    prompt="Extract invoice data",
    ocr="PREMIUM"
)
await client.save_configuration("invoice-config", config)

list_configurations

List all saved configurations.

configs = await client.list_configurations()

get_configuration

Get details of a specific configuration.

config = await client.get_configuration("invoice-config")

delete_configuration

Delete a configuration.

await client.delete_configuration("invoice-config")

Templates

upload_template

Upload a new document template.

with open("form.pdf", "rb") as f:
    await client.upload_template(
        name="form-template",
        document=f,
        mime_type="application/pdf"
    )

list_templates

List all available templates.

templates = await client.list_templates()

metadata

Get metadata (fields) for a template.

meta = await client.metadata("form-template")

fill

Fill a template with data.

from docudevs.models import TemplateFillRequest

request = TemplateFillRequest(fields={"name": "John"})
response = await client.fill("form-template", request)

delete_template

Delete a template.

await client.delete_template("form-template")

Agent Chat

agent_chat

Send a chat message to the agent and receive a job GUID.

response = await client.agent_chat(
    messages=[{"role": "user", "content": "Help me extract invoice line items"}],
    session_id="session-123"
)

agent_status

Check the status of an agent chat job.

status = await client.agent_status(response["jobGuid"])

agent_chat_and_wait

Send a message and wait for the completed response.

result = await client.agent_chat_and_wait(
    messages=[{"role": "user", "content": "Create a schema for insurance claims"}],
    session_id="session-123"
)
print(result["response"]["message"])

Cases

create_case

Create a new case (collection of documents).

from docudevs.models import CreateCaseBody

case = await client.create_case(
    body=CreateCaseBody(name="Q1 Invoices")
)

list_cases

List all cases.

cases = await client.list_cases()

upload_case_document

Upload a document to a case.

from docudevs.models import UploadCaseDocumentBody
from docudevs.types import File

await client.upload_case_document(
    case_id=123,
    body=UploadCaseDocumentBody(
        document=File(payload=data, file_name="doc.pdf")
    )
)

Operations

submit_and_wait_for_error_analysis

Run error analysis on a completed job.

analysis = await client.submit_and_wait_for_error_analysis(job_guid)

submit_and_wait_for_generative_task

Run a generative AI task on a completed job.

task = await client.submit_and_wait_for_generative_task(
    parent_job_id=job_guid,
    prompt="Summarize this document",
    model="DEFAULT"
)
# Result is in task.result (JSON string)

submit_and_wait_for_operation_with_parameters

Run an operation with custom parameters.

result = await client.submit_and_wait_for_operation_with_parameters(
    job_guid=job_guid,
    operation_type="error-analysis",
    llm_type="HIGH",
    custom_parameters={"focus": "dates"}
)

submit_and_wait_for_image_selection

Select relevant figures from a completed job that extracted figures.

selection = await client.submit_and_wait_for_image_selection(
    job_guid,
    prompt="Return all diagrams from the document",
    top_k=5,
    match_mode="all",
    use_vision=False
)

import json
selection_payload = json.loads(selection.result)
selected = selection_payload.get("selected", [])

Map-Reduce Helpers

submit_and_process_document_map_reduce

Process large documents using map-reduce strategy.

job_guid = await client.submit_and_process_document_map_reduce(
    document=doc_bytes,
    document_mime_type="application/pdf",
    prompt="Extract line items",
    split_type="page",
    pages_per_chunk=5,
    overlap_pages=1,
    dedup_key="sku",
    parallel_processing=True
)

submit_and_wait_for_map_reduce

Run map-reduce on an already-processed job without re-uploading or re-running OCR. This is the map-reduce equivalent of submit_and_wait_for_generative_task.

# First: process a document normally (OCR runs here)
job_guid = await client.submit_and_process_document(
    document=doc_bytes,
    document_mime_type="application/pdf",
    prompt="Extract summary",
)
await client.wait_until_ready(job_guid)

# Later: re-run with map-reduce — no new upload, reuses OCR from parent job
result = await client.submit_and_wait_for_map_reduce(
    parent_job_id=job_guid,
    prompt="Extract all line items (sku, description, quantity, total)",
    schema='{"type":"array","items":{"type":"object"}}',
    split_type="page",
    pages_per_chunk=5,
    overlap_pages=1,
    dedup_key="sku",
    parallel_processing=True,
    timeout=300,
    result_format="json"
)
print(result["records"])

Parameters:

| Parameter | Type | Default | Description |
|---|---|---|---|
| parent_job_id | str | required | GUID of the completed job whose document to re-process. |
| prompt | str | "" | Extraction instructions. |
| schema | str | "" | JSON schema for structured extraction. |
| split_type | str | "page" | Chunk strategy: "page" or "markdown_header". |
| split_header_level | int | 2 (markdown mode) | Header level (1 or 2) used when split_type="markdown_header". |
| pages_per_chunk | int | 1 | Pages per chunk. |
| overlap_pages | int | 0 | Overlapping pages between chunks. |
| dedup_key | str | None | Required when overlap_pages > 0. |
| parallel_processing | bool | False | Run chunks in parallel. |
| mime_type | str | "application/pdf" | Document MIME type. |
| timeout | int | 180 | Max seconds to wait. |
| poll_interval | float | 5.0 | Seconds between status polls. |
| result_format | str | "json" | "json", "csv", "excel", or None. |

When split_type="markdown_header", overlap_pages and dedup_key are not supported.

All other map-reduce parameters (header_options, header_schema, header_prompt, stop_when_empty, empty_chunk_grace, ocr, llm, trace, page_range, tools, etc.) are also accepted.

LLM Tracing

get_trace

Get the LLM trace for a completed job (only available if trace=True was set).

trace = await client.get_trace(job_guid)
if trace:
    print(f"Total tokens: {trace['total_tokens']}")
    print(f"LLM calls: {trace['total_llm_calls']}")
    for event in trace['events']:
        print(f" {event['type']}: {event['name']}")

get_image

Get a page thumbnail image from a processed job.

image_bytes = await client.get_image(job_guid, page_index=0)
if image_bytes:
    with open("page_0.png", "wb") as f:
        f.write(image_bytes)

get_figures_metadata

Get extracted figure metadata for a job.

metadata = await client.get_figures_metadata(job_guid)
images = metadata.get("images", []) if metadata else []

get_figure_image

Download a figure image by ID.

figure_id = images[0]["id"]
image_bytes = await client.get_figure_image(job_guid, figure_id)
if image_bytes:
    with open("figure_0.png", "wb") as f:
        f.write(image_bytes)

Job Management

delete_job

Delete a job and its associated data. Jobs must be in a terminal state (COMPLETED, ERROR, TIMEOUT, or PARTIAL). Jobs older than 14 days are automatically purged, so this method is primarily for cleaning up recent jobs.

result = await client.delete_job(job_guid)
if result.status_code == 200:
    print(f"Deleted {result.parsed['jobsDeleted']} job(s)")

Parameters:

  • guid (str): The job GUID to delete

Returns: A response object with:

  • status_code (int): HTTP status (200 on success, 404 if not found)
  • parsed (dict): Contains jobsDeleted, errors on success

Note: Deleting a job removes all associated data including uploaded documents, OCR results, and extracted data. Usage/billing records are preserved but disassociated from the deleted job.

Enabling Tracing

Pass trace=True to any processing method:

job_guid = await client.submit_and_process_document(
    document=doc_bytes,
    document_mime_type="application/pdf",
    prompt="Extract data",
    trace=True  # Enable LLM tracing
)

Error Handling

The SDK raises exceptions for API errors.

try:
    await client.get_configuration("non-existent")
except Exception as e:
    print(f"Error: {e}")