SDK Method Reference

Complete reference for all DocuDevs Python SDK methods.

Client Initialization

DocuDevsClient

from docudevs.docudevs_client import DocuDevsClient

client = DocuDevsClient(
    api_url="https://api.docudevs.ai",  # Optional, defaults to production
    token="your-api-key"  # Required
)

Configuration Management

list_configurations()

List all saved configurations.

configurations = await client.list_configurations()

Returns: List of configuration objects

get_configuration(name)

Get a specific configuration by name.

config = await client.get_configuration("my-config")

Parameters:

  • name (str): Configuration name

Returns: Configuration object

save_configuration(name, body)

Save a new configuration.

from docudevs.models import UploadCommand

body = UploadCommand(
    instructions="Extract invoice data",
    llm="gpt-4"
)

response = await client.save_configuration("invoice-config", body)

Parameters:

  • name (str): Configuration name
  • body (UploadCommand): Configuration details

delete_configuration(name)

Delete a configuration.

response = await client.delete_configuration("old-config")

Document Upload and Processing

upload_files(body)

Upload files with processing parameters.

from docudevs.models import UploadFilesBody
from docudevs.types import File

document_file = File(
    payload=document_bytes,
    file_name="document.pdf",
    mime_type="application/pdf"
)

body = UploadFilesBody(
    files=[document_file],
    instructions="Extract key information"
)

response = await client.upload_files(body)

upload_document(body)

Upload a single document.

from docudevs.models import UploadDocumentBody

body = UploadDocumentBody(document=document_file)

response = await client.upload_document(body)

submit_and_process_document()

Convenience method to upload and process a document for structured extraction in one call. Returns a job GUID.

import json

job_id = await client.submit_and_process_document(
    document=document_bytes,
    document_mime_type="application/pdf",
    prompt="Extract invoice data",
    llm="gpt-4"
)

result = await client.wait_until_ready(job_id, result_format="json")
print(json.dumps(result, indent=2))

Parameters:

  • document (bytes): Document content
  • document_mime_type (str): MIME type
  • prompt (str, optional): Processing instructions
  • llm (str, optional): LLM to use
  • schema (dict, optional): JSON schema for structured output (see the sketch after this list)
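
A minimal sketch of schema-constrained extraction; the schema shape below is illustrative, not a DocuDevs requirement:

invoice_schema = {
    "type": "object",
    "properties": {
        "invoice_number": {"type": "string"},
        "total": {"type": "number"}
    }
}

job_id = await client.submit_and_process_document(
    document=document_bytes,
    document_mime_type="application/pdf",
    prompt="Extract invoice data",
    schema=invoice_schema
)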

Job Management

status(uuid)

Get job status.

status_response = await client.status(uuid="job-guid")

result(uuid)

Get job result.

result_response = await client.result(uuid="job-guid")

wait_until_ready()

Wait for a job to complete and return the result.

result = await client.wait_until_ready(
    guid="job-guid",
    timeout=60,
    poll_interval=1.0,
    result_format="json"
)

Parameters:

  • guid (str): Job GUID
  • timeout (int): Maximum wait time in seconds
  • poll_interval (float): Polling interval in seconds
  • result_format (str | None): Optional explicit format ("json", "csv", "excel"). When omitted, the call returns the legacy object mode result.
  • excel_save_to (str | None): Optional file path to persist Excel output when result_format="excel" (see the sketch after this list).
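
A minimal sketch of the Excel path; the output file name is illustrative:

await client.wait_until_ready(
    guid="job-guid",
    result_format="excel",
    excel_save_to="results.xlsx"  # persists the workbook to disk
)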

Cases Management

create_case()

Create a new document case.

case = await client.create_case(
    name="Invoice Processing Q1",
    description="Q1 2024 invoices"
)

Parameters:

  • name (str): Case name
  • description (str, optional): Case description

list_cases()

List all cases.

cases = await client.list_cases()

get_case(case_id)

Get case details.

case = await client.get_case(case_id=123)

upload_case_document()

Upload a document to a case.

response = await client.upload_case_document(
    case_id=123,
    document=document_bytes,
    document_mime_type="application/pdf",
    filename="invoice.pdf"
)

Operations

submit_operation()

Submit an operation for a completed job.

response = await client.submit_operation(
    job_guid="completed-job-guid",
    operation_type="error-analysis"
)

submit_operation_with_parameters()

Submit operation with custom parameters.

response = await client.submit_operation_with_parameters(
    job_guid="job-guid",
    operation_type="error-analysis",
    llm_type="HIGH",
    custom_parameters={"focus": "numerical_data"}
)

get_operation_status()

Get status of operations for a job.

status = await client.get_operation_status(job_guid="job-guid")

get_operation_result()

Get result of a specific operation.

result = await client.get_operation_result(
    job_guid="job-guid",
    operation_type="error-analysis"
)

submit_and_wait_for_operation()

Submit operation and wait for completion.

result = await client.submit_and_wait_for_operation(
    job_guid="job-guid",
    operation_type="error-analysis",
    timeout=120,
    poll_interval=2.0
)

submit_and_wait_for_error_analysis()

Convenience method for error analysis.

analysis = await client.submit_and_wait_for_error_analysis(
    job_guid="job-guid",
    timeout=120
)

Generative Tasks

create_generative_task()

Create a generative task for a completed job.

response = await client.create_generative_task(
    parent_job_id="completed-job-guid",
    prompt="Summarize this document and explain its main purpose",
    model="DEFAULT",
    temperature=0.7,
    max_tokens=500
)

Parameters:

  • parent_job_id (str): The parent job GUID (must be completed)
  • prompt (str): The prompt to send to the AI model
  • model (str, optional): AI model to use
  • temperature (float, optional): Temperature parameter (0.0 to 1.0)
  • max_tokens (int, optional): Maximum tokens to generate

Returns: Generative task creation response

submit_and_wait_for_generative_task()

Create a generative task and wait for completion.

import json

result = await client.submit_and_wait_for_generative_task(
    parent_job_id="completed-job-guid",
    prompt="Summarize this document and explain its main purpose",
    model="DEFAULT",
    temperature=0.7,
    max_tokens=500,
    timeout=120,
    poll_interval=2.0
)

# Parse the result JSON
result_data = json.loads(result.result)
generated_text = result_data["generated_text"]

Parameters:

  • parent_job_id (str): The parent job GUID (must be completed)
  • prompt (str): The prompt to send to the AI model
  • model (str, optional): AI model to use
  • temperature (float, optional): Temperature parameter (0.0 to 1.0)
  • max_tokens (int, optional): Maximum tokens to generate
  • timeout (int): Maximum time to wait in seconds (default: 120)
  • poll_interval (float): Time between status checks in seconds (default: 2.0)

Returns: The generative task result once complete

Raises (see the handling sketch after this list):

  • TimeoutError: If the operation doesn't complete within the timeout
  • Exception: If the operation fails or errors occur
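
A sketch of handling these failure modes:

try:
    result = await client.submit_and_wait_for_generative_task(
        parent_job_id="completed-job-guid",
        prompt="Summarize this document",
        timeout=120
    )
except TimeoutError:
    print("Generative task did not complete within 120 seconds")
except Exception as e:
    print(f"Generative task failed: {e}")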

OCR Processing

submit_and_ocr_document()

Convenience method to upload and process a document with OCR only.

guid = await client.submit_and_ocr_document(
    document=document_bytes,
    document_mime_type="application/pdf",
    ocr="PREMIUM",
    ocr_format="markdown",
    describe_figures=True
)

Parameters:

  • document (BytesIO): Document content
  • document_mime_type (str): MIME type
  • ocr (str, optional): OCR type ("DEFAULT", "LOW", "PREMIUM", "AUTO")
  • ocr_format (str, optional): OCR output format ("plain", "markdown")
  • describe_figures (bool, optional): Whether to describe figures in detail (ignored when ocr is "LOW")

Returns: Job GUID for the OCR processing job
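
To retrieve the OCR text, pair the returned GUID with wait_until_ready in legacy object mode, as described under Result Handling:

ocr_result = await client.wait_until_ready(guid)
print(ocr_result.result)  # plain text or markdown, per ocr_format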

Map-Reduce Convenience Methods

Use these helpers to process large documents in overlapping page chunks with deduplication.

submit_and_process_document_map_reduce()

job_id = await client.submit_and_process_document_map_reduce(
    document=document_bytes,
    document_mime_type="application/pdf",
    prompt="Extract invoice line items (sku, description, qty, unit_price, total)",
    pages_per_chunk=4,
    overlap_pages=1,
    dedup_key="items.sku"
)

result = await client.wait_until_ready(job_id, result_format="json")

Parameters:

  • pages_per_chunk (int, default 1): size of each window
  • overlap_pages (int, default 0): pages re-used from the previous window
  • dedup_key (str, optional): path for cross-chunk deduplication (required if overlap > 0)

Validation rules enforced client-side (see the sketch after this list):

  • pages_per_chunk >= 1
  • 0 <= overlap_pages < pages_per_chunk
  • dedup_key required when overlap_pages > 0
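
Because these checks run client-side, invalid combinations fail fast before any upload. A sketch; the exact exception type is an assumption, so catch broadly if unsure:

try:
    await client.submit_and_process_document_map_reduce(
        document=document_bytes,
        document_mime_type="application/pdf",
        prompt="Extract line items",
        pages_per_chunk=4,
        overlap_pages=1  # no dedup_key, so this violates the rules above
    )
except ValueError as e:  # assumed exception type
    print(f"Invalid map-reduce parameters: {e}")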

process_document_map_reduce()

Process a previously uploaded document (one that already has a GUID) using map-reduce parameters.

response = await client.process_document_map_reduce(
    guid="uploaded-guid",
    prompt="Extract contract parties and obligations",
    pages_per_chunk=5,
    overlap_pages=1,
    dedup_key="parties.name"
)

build_upload_command_map_reduce()

Builds the lower-level command object; primarily useful for advanced batching scenarios.

cmd = client.build_upload_command_map_reduce(
    prompt="Extract table rows",
    pages_per_chunk=3,
    overlap_pages=1,
    dedup_key="rows.id"
)

See also: Map-Reduce Extraction for strategy and tuning guidance.

Templates

list_templates()

List all templates.

templates = await client.list_templates()

upload_template()

Upload a new template.

response = await client.upload_template(
    name="invoice-template",
    template_file=template_bytes,
    instructions="Extract invoice data"
)

metadata(template_id)

Get template metadata.

metadata = await client.metadata(template_id="template-name")

fill_template()

Fill a template with data.

from docudevs.models import TemplateFillRequest

request = TemplateFillRequest(
    document_data={"name": "John Doe", "amount": 1000}
)

response = await client.fill_template(
    template_id="template-name",
    request=request
)

delete_template()

Delete a template.

response = await client.delete_template(name="old-template")

Schema Generation

generate_schema()

Generate JSON schema from a document.

response = await client.generate_schema(
    document=document_bytes,
    document_mime_type="application/pdf",
    instructions="Focus on financial data"
)
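
The generated schema can then drive structured extraction. A sketch, assuming the schema is exposed on the parsed response; the attribute name below is an assumption, so inspect the response object in your environment:

schema = response.parsed.schema  # hypothetical attribute
job_id = await client.submit_and_process_document(
    document=document_bytes,
    document_mime_type="application/pdf",
    schema=schema
)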

Processing Methods

process_document()

Process an uploaded document.

response = await client.process_document(
    guid="uploaded-doc-guid",
    instructions="Extract all data",
    llm="gpt-4"
)

process_document_with_configuration()

Process using a saved configuration.

response = await client.process_document_with_configuration(
    guid="uploaded-doc-guid",
    configuration_name="invoice-config"
)

ocr_document()

Process document with OCR only.

from docudevs.models import OcrCommand

ocr_body = OcrCommand(
    ocr="PREMIUM",
    ocr_format="markdown",
    mime_type="application/pdf"
)

response = await client.ocr_document(
    guid="uploaded-doc-guid",
    body=ocr_body
)

Error Handling

All SDK methods can raise exceptions. Common patterns:

import json

try:
    job_id = await client.submit_and_process_document(
        document=document_bytes,
        document_mime_type="application/pdf"
    )
    result = await client.wait_until_ready(job_id, result_format="json")
    print(json.dumps(result, indent=2))
except TimeoutError as e:
    print(f"Processing timed out: {e}")
except Exception as e:
    print(f"Processing failed: {e}")

Response Objects

Standard Response Structure

Most methods return response objects with:

response.status_code  # HTTP status code
response.content      # Raw response content
response.parsed       # Parsed response object (if successful)
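
A defensive access pattern based on the structure above; upload_document is used here as an example of one such method:

response = await client.upload_document(body)
if response.status_code == 200 and response.parsed is not None:
    print(response.parsed.guid)
else:
    print(f"Request failed with status {response.status_code}")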

Common Response Types

UploadResponse:

response.parsed.guid    # Job GUID
response.parsed.status  # Job status

Job Status:

status.status      # PENDING, PROCESSING, COMPLETED, FAILED
status.created_at  # Creation timestamp
status.updated_at  # Last update timestamp

Job Result:

result.result    # Extracted data (JSON string)
result.metadata  # Processing metadata

Result Handling

How to read results depends on the workflow:

  • Extraction (structured data)
    • Use submit_and_process_document(...) to submit.
    • Call wait_until_ready(job_id, result_format="json") to fetch the canonical JSON payload directly.
    • The return value is a Python dict for single-document jobs or a list of dicts for batch jobs.
# Extraction flow (single document)
import json

job_id = await client.submit_and_process_document(
    document=document_bytes,
    document_mime_type="application/pdf",
    prompt="Extract invoice data"
)
data = await client.wait_until_ready(job_id, result_format="json")
print(json.dumps(data, indent=2))
  • Batch extraction
    • Submit with the batch helpers, then request JSON using the same result_format flag.
    • The API returns a list aligned to the uploaded document order. Each entry is either a dict matching your schema or None for failed/missing items.
batch_guid = await client.submit_and_process_batch(
    documents=file_paths,
    document_mime_type="application/pdf",
    prompt="Extract invoice data"
)
batch_results = await client.wait_until_ready(batch_guid, result_format="json")
for index, item in enumerate(batch_results):
    if item:
        print(f"Document {index}:", json.dumps(item, indent=2))
    else:
        print(f"Document {index}: no structured result")
  • Legacy object mode

    • If you omit result_format, wait_until_ready retains backward compatibility by returning a SimpleNamespace-like object.
    • Structured extraction jobs expose their JSON in result.parsed; batch jobs now surface a list in result.result (see the sketch below).
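A sketch of legacy-mode access per the bullets above:

# Legacy object mode flow (omit result_format)
legacy = await client.wait_until_ready("job-guid")
print(legacy.parsed)  # structured extraction: parsed JSON payload
print(legacy.result)  # batch jobs: list of per-document results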
  • OCR-only (plain text or markdown)

    • Use submit_and_ocr_document(...) to submit.
    • wait_until_ready(job_id) (no result_format) returns text content in result.result.
# OCR flow
job_id = await client.submit_and_ocr_document(
    document=document_bytes,
    document_mime_type="application/pdf",
    ocr="PREMIUM",
    ocr_format="markdown"
)
ocr_result = await client.wait_until_ready(job_id)
print(ocr_result.result)
  • Generative tasks (JSON string)
    • submit_and_wait_for_generative_task(...) returns a job whose result.result is a JSON string.
    • Parse with json.loads(...) then access generated_text.
# Generative task flow
gen = await client.submit_and_wait_for_generative_task(
    parent_job_id=parent_job_id,
    prompt="Summarize this document"
)
data = json.loads(gen.result)
print(data["generated_text"])

Constants and Enums

LLM Types

  • "gpt-3.5-turbo"
  • "gpt-4"
  • "gpt-4-turbo"

OCR Types

  • "DEFAULT"
  • "LOW"
  • "PREMIUM"
  • "AUTO"

OCR Formats

  • "plain"
  • "markdown"
  • "json"

Operation Types

  • "error-analysis"
  • "generative-task"

Generative Task Models

  • "DEFAULT" - Standard AI model, good balance of speed and quality
  • "HIGH" - Premium AI model, slower but higher quality responses

Best Practices

Authentication

import os

# Use environment variables for API keys
client = DocuDevsClient(
    token=os.getenv("DOCUDEVS_API_KEY")
)

Error Handling (Best Practices)

async def safe_process_document(document_data, mime_type):
    try:
        job_id = await client.submit_and_process_document(
            document=document_data,
            document_mime_type=mime_type
        )
        result = await client.wait_until_ready(job_id, timeout=120, result_format="json")
        return result, None
    except Exception as e:
        return None, str(e)

Batch Processing

import asyncio

async def process_documents_batch(documents):
    # Submit all jobs concurrently
    submit_tasks = [
        client.submit_and_process_document(document=doc, document_mime_type=mime)
        for doc, mime in documents
    ]
    job_ids = await asyncio.gather(*submit_tasks)

    # Wait for all jobs to complete
    wait_tasks = [client.wait_until_ready(job_id, result_format="json") for job_id in job_ids]
    return await asyncio.gather(*wait_tasks)
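
Usage, assuming documents is a list of (bytes, mime_type) pairs; the variable names are illustrative:

results = await process_documents_batch([
    (invoice_bytes, "application/pdf"),
    (receipt_bytes, "image/png"),
])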

Generative Task Workflows

import asyncio
import json

async def analyze_document_with_ai(document_data, mime_type):
    """Complete workflow: OCR + multiple generative tasks"""

    # Step 1: Process with OCR
    ocr_job_id = await client.submit_and_ocr_document(
        document=document_data,
        document_mime_type=mime_type,
        ocr="PREMIUM",
        ocr_format="markdown"
    )

    # Wait for OCR to complete
    await client.wait_until_ready(ocr_job_id)

    # Step 2: Generate summary
    summary_task = client.submit_and_wait_for_generative_task(
        parent_job_id=ocr_job_id,
        prompt="Provide a concise summary of this document."
    )

    # Step 3: Extract key information
    key_info_task = client.submit_and_wait_for_generative_task(
        parent_job_id=ocr_job_id,
        prompt="List the most important facts, dates, and numbers from this document."
    )

    # Step 4: Categorize document
    category_task = client.submit_and_wait_for_generative_task(
        parent_job_id=ocr_job_id,
        prompt="What type of document is this? (e.g., invoice, contract, report, etc.)"
    )

    # Wait for all generative tasks to complete
    summary, key_info, category = await asyncio.gather(
        summary_task, key_info_task, category_task
    )

    # Parse results
    return {
        'summary': json.loads(summary.result)['generated_text'],
        'key_info': json.loads(key_info.result)['generated_text'],
        'category': json.loads(category.result)['generated_text']
    }

# Usage
analysis = await analyze_document_with_ai(document_bytes, "application/pdf")
print(f"Document Type: {analysis['category']}")
print(f"Summary: {analysis['summary']}")
print(f"Key Information: {analysis['key_info']}")