SDK Method Reference
Complete reference for all DocuDevs Python SDK methods.
Client Initialization
DocuDevsClient
from docudevs.docudevs_client import DocuDevsClient
client = DocuDevsClient(
api_url="https://api.docudevs.ai", # Optional, defaults to production
token="your-api-key" # Required
)
Configuration Management
list_configurations()
List all saved configurations.
configurations = await client.list_configurations()
Returns: List of configuration objects
get_configuration(name)
Get a specific configuration by name.
config = await client.get_configuration("my-config")
Parameters:
- name (str): Configuration name
Returns: Configuration object
save_configuration(name, body)
Save a new configuration.
from docudevs.models import UploadCommand
body = UploadCommand(
instructions="Extract invoice data",
llm="gpt-4"
)
response = await client.save_configuration("invoice-config", body)
Parameters:
- name (str): Configuration name
- body (UploadCommand): Configuration details
delete_configuration(name)
Delete a configuration.
response = await client.delete_configuration("old-config")
Document Upload and Processing
upload_files(body)
Upload files with processing parameters.
from docudevs.models import UploadFilesBody
from docudevs.types import File
document_file = File(
payload=document_bytes,
file_name="document.pdf",
mime_type="application/pdf"
)
body = UploadFilesBody(
files=[document_file],
instructions="Extract key information"
)
response = await client.upload_files(body)
upload_document(body)
Upload a single document.
from docudevs.models import UploadDocumentBody
body = UploadDocumentBody(document=document_file)
response = await client.upload_document(body)
submit_and_process_document()
Convenience method to upload and process a document for structured extraction in one call. Returns a job GUID.
import json
job_id = await client.submit_and_process_document(
document=document_bytes,
document_mime_type="application/pdf",
prompt="Extract invoice data",
llm="gpt-4"
)
result = await client.wait_until_ready(job_id, result_format="json")
print(json.dumps(result, indent=2))
Parameters:
- document (bytes): Document content
- document_mime_type (str): MIME type
- prompt (str, optional): Processing instructions
- llm (str, optional): LLM to use
- schema (dict, optional): JSON schema for structured output
Job Management
status(uuid)
Get job status.
status_response = await client.status(uuid="job-guid")
result(uuid)
Get job result.
result_response = await client.result(uuid="job-guid")
wait_until_ready()
Wait for a job to complete and return the result.
result = await client.wait_until_ready(
guid="job-guid",
timeout=60,
poll_interval=1.0,
result_format="json"
)
Parameters:
- guid (str): Job GUID
- timeout (int): Maximum wait time in seconds
- poll_interval (float): Polling interval in seconds
- result_format (str | None): Optional explicit format ("json", "csv", "excel"). When omitted, returns legacy object mode.
- excel_save_to (str | None): Optional file path to persist Excel output when result_format="excel".
Case Management
create_case()
Create a new document case.
# Create a new document case
case = await client.create_case(
name="Invoice Processing Q1",
description="Q1 2024 invoices"
)
Parameters:
- name (str): Case name
- description (str, optional): Case description
list_cases()
List all cases.
cases = await client.list_cases()
get_case(case_id)
Get case details.
case = await client.get_case(case_id=123)
upload_case_document()
Upload a document to a case.
response = await client.upload_case_document(
case_id=123,
document=document_bytes,
document_mime_type="application/pdf",
filename="invoice.pdf"
)
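A minimal end-to-end sketch combining the case methods above; note that reading the new case's identifier as case.id is an assumption inferred from get_case(case_id=...):
case = await client.create_case(
    name="Invoice Processing Q1",
    description="Q1 2024 invoices"
)
response = await client.upload_case_document(
    case_id=case.id,  # assumption: the created case exposes its identifier as `id`
    document=document_bytes,
    document_mime_type="application/pdf",
    filename="invoice.pdf"
)
details = await client.get_case(case_id=case.id)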
Operations
submit_operation()
Submit an operation for a completed job.
response = await client.submit_operation(
job_guid="completed-job-guid",
operation_type="error-analysis"
)
submit_operation_with_parameters()
Submit operation with custom parameters.
response = await client.submit_operation_with_parameters(
job_guid="job-guid",
operation_type="error-analysis",
llm_type="HIGH",
custom_parameters={"focus": "numerical_data"}
)
get_operation_status()
Get status of operations for a job.
status = await client.get_operation_status(job_guid="job-guid")
get_operation_result()
Get result of a specific operation.
result = await client.get_operation_result(
job_guid="job-guid",
operation_type="error-analysis"
)
submit_and_wait_for_operation()
Submit operation and wait for completion.
result = await client.submit_and_wait_for_operation(
job_guid="job-guid",
operation_type="error-analysis",
timeout=120,
poll_interval=2.0
)
submit_and_wait_for_error_analysis()
Convenience method for error analysis.
analysis = await client.submit_and_wait_for_error_analysis(
job_guid="job-guid",
timeout=120
)
Generative Tasks
create_generative_task()
Create a generative task for a completed job.
response = await client.create_generative_task(
parent_job_id="completed-job-guid",
prompt="Summarize this document and explain its main purpose",
model="DEFAULT",
temperature=0.7,
max_tokens=500
)
Parameters:
- parent_job_id (str): The parent job GUID (must be completed)
- prompt (str): The prompt to send to the AI model
- model (str, optional): AI model to use
- temperature (float, optional): Temperature parameter (0.0 to 1.0)
- max_tokens (int, optional): Maximum tokens to generate
Returns: Generative task creation response
submit_and_wait_for_generative_task()
Create a generative task and wait for completion.
result = await client.submit_and_wait_for_generative_task(
parent_job_id="completed-job-guid",
prompt="Summarize this document and explain its main purpose",
model="DEFAULT",
temperature=0.7,
max_tokens=500,
timeout=120,
poll_interval=2.0
)
# Parse the result JSON
import json
result_data = json.loads(result.result)
generated_text = result_data['generated_text']
Parameters:
- parent_job_id (str): The parent job GUID (must be completed)
- prompt (str): The prompt to send to the AI model
- model (str, optional): AI model to use
- temperature (float, optional): Temperature parameter (0.0 to 1.0)
- max_tokens (int, optional): Maximum tokens to generate
- timeout (int): Maximum time to wait in seconds (default: 120)
- poll_interval (float): Time between status checks in seconds (default: 2.0)
Returns: The generative task result once complete
Raises:
- TimeoutError: If the operation doesn't complete within the timeout
- Exception: If the operation fails or errors occur
OCR Processing
submit_and_ocr_document()
Convenience method to upload and process a document with OCR only.
guid = await client.submit_and_ocr_document(
document=document_bytes,
document_mime_type="application/pdf",
ocr="PREMIUM",
ocr_format="markdown",
describe_figures=True
)
Parameters:
- document (BytesIO): Document content
- document_mime_type (str): MIME type
- ocr (str, optional): OCR type ("DEFAULT", "LOW", "PREMIUM", "AUTO")
- ocr_format (str, optional): OCR output format ("plain", "markdown")
- describe_figures (bool, optional): Whether to describe figures in detail (ignored when ocr is "LOW")
Returns: Job GUID for the OCR processing job
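The returned GUID plugs straight into wait_until_ready; with no result_format, the OCR text is returned in result.result (see Result Handling below):
ocr_result = await client.wait_until_ready(guid)
print(ocr_result.result)  # plain text or markdown, depending on ocr_format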
Map-Reduce Convenience Methods
Use these helpers to process large documents in overlapping page chunks with deduplication.
submit_and_process_document_map_reduce()
Upload and process a document in overlapping page chunks in one call; returns a job GUID.
job_id = await client.submit_and_process_document_map_reduce(
document=document_bytes,
document_mime_type="application/pdf",
prompt="Extract invoice line items (sku, description, qty, unit_price, total)",
pages_per_chunk=4,
overlap_pages=1,
dedup_key="items.sku"
)
result = await client.wait_until_ready(job_id, result_format="json")
Parameters:
- pages_per_chunk (int, default 1): Size of each window
- overlap_pages (int, default 0): Pages re-used from the previous window
- dedup_key (str, optional): Path for cross-chunk deduplication (required if overlap > 0)
Validation rules enforced client-side:
- pages_per_chunk >= 1
- 0 <= overlap_pages < pages_per_chunk
- dedup_key required when overlap_pages > 0
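To make the windowing concrete, here is an illustrative standalone sketch of how overlapping windows advance under these rules; it is not the SDK's implementation, and assumes 1-based page numbers:
def chunk_windows(total_pages, pages_per_chunk, overlap_pages):
    # Each window re-uses `overlap_pages` pages from the previous one.
    assert pages_per_chunk >= 1 and 0 <= overlap_pages < pages_per_chunk
    step = pages_per_chunk - overlap_pages
    start = 1
    while start <= total_pages:
        yield (start, min(start + pages_per_chunk - 1, total_pages))
        start += step
print(list(chunk_windows(10, 4, 1)))  # [(1, 4), (4, 7), (7, 10), (10, 10)]
With overlap, the same page can appear in two chunks, which is why dedup_key is required to merge duplicate records across chunk results.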
process_document_map_reduce()
Process a previously uploaded document (one that already has a GUID) using map-reduce parameters.
response = await client.process_document_map_reduce(
guid="uploaded-guid",
prompt="Extract contract parties and obligations",
pages_per_chunk=5,
overlap_pages=1,
dedup_key="parties.name"
)
build_upload_command_map_reduce()
Builds the lower-level command object; primarily useful for advanced batching scenarios.
cmd = client.build_upload_command_map_reduce(
prompt="Extract table rows",
pages_per_chunk=3,
overlap_pages=1,
dedup_key="rows.id"
)
See also: Map-Reduce Extraction for strategy and tuning guidance.
Templates
list_templates()
List all templates.
templates = await client.list_templates()
upload_template()
Upload a new template.
response = await client.upload_template(
name="invoice-template",
template_file=template_bytes,
instructions="Extract invoice data"
)
metadata(template_id)
Get template metadata.
metadata = await client.metadata(template_id="template-name")
fill_template()
Fill a template with data.
from docudevs.models import TemplateFillRequest
request = TemplateFillRequest(
document_data={"name": "John Doe", "amount": 1000}
)
response = await client.fill_template(
template_id="template-name",
request=request
)
delete_template()
Delete a template.
response = await client.delete_template(name="old-template")
Schema Generation
generate_schema()
Generate JSON schema from a document.
response = await client.generate_schema(
document=document_bytes,
document_mime_type="application/pdf",
instructions="Focus on financial data"
)
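A generated schema is typically fed back into structured extraction via the schema parameter of submit_and_process_document. Exactly where the schema dict lives on the response is an assumption here (response.parsed is the pattern shown under Response Objects below):
schema = response.parsed  # assumption: the parsed response carries the generated JSON schema as a dict
job_id = await client.submit_and_process_document(
    document=document_bytes,
    document_mime_type="application/pdf",
    prompt="Extract all data",
    schema=schema
)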
Processing Methods
process_document()
Process an uploaded document.
response = await client.process_document(
guid="uploaded-doc-guid",
instructions="Extract all data",
llm="gpt-4"
)
process_document_with_configuration()
Process using a saved configuration.
response = await client.process_document_with_configuration(
guid="uploaded-doc-guid",
configuration_name="invoice-config"
)
ocr_document()
Process document with OCR only.
from docudevs.models import OcrCommand
ocr_body = OcrCommand(
ocr="advanced",
ocr_format="markdown",
mime_type="application/pdf"
)
response = await client.ocr_document(
guid="uploaded-doc-guid",
body=ocr_body
)
Error Handling
All SDK methods can raise exceptions. Common patterns:
import json
try:
    job_id = await client.submit_and_process_document(
        document=document_bytes,
        document_mime_type="application/pdf"
    )
    result = await client.wait_until_ready(job_id, result_format="json")
    print(json.dumps(result, indent=2))
except TimeoutError as e:
    print(f"Processing timed out: {e}")
except Exception as e:
    print(f"Processing failed: {e}")
Response Objects
Standard Response Structure
Most methods return response objects with:
response.status_code # HTTP status code
response.content # Raw response content
response.parsed # Parsed response object (if successful)
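For example, a guarded upload check using these attributes (assuming 200 as the success status code):
response = await client.upload_files(body)
if response.status_code == 200 and response.parsed:
    guid = response.parsed.guid
else:
    print("Upload failed:", response.content)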
Common Response Types
UploadResponse:
response.parsed.guid # Job GUID
response.parsed.status # Job status
Job Status:
status.status # PENDING, PROCESSING, COMPLETED, FAILED
status.created_at # Creation timestamp
status.updated_at # Last update timestamp
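If you need custom polling instead of wait_until_ready, a minimal sketch over these fields (assuming status() returns the object shown here) might look like:
import asyncio
async def poll_job(guid, interval=1.0):
    while True:
        status = await client.status(uuid=guid)
        if status.status in ("COMPLETED", "FAILED"):
            return status
        await asyncio.sleep(interval)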
Job Result:
result.result # Extracted data (JSON string)
result.metadata # Processing metadata
Result Handling
How to read results depending on the workflow:
- Extraction (structured data)
  - Use submit_and_process_document(...) to submit.
  - Call wait_until_ready(job_id, result_format="json") to fetch the canonical JSON payload directly.
  - The return value is a Python dict for single-document jobs or a list of dicts for batch jobs.
# Extraction flow (single document)
import json
job_id = await client.submit_and_process_document(
document=document_bytes,
document_mime_type="application/pdf",
prompt="Extract invoice data"
)
data = await client.wait_until_ready(job_id, result_format="json")
print(json.dumps(data, indent=2))
- Batch extraction
  - Submit with the batch helpers, then request JSON using the same result_format flag.
  - The API returns a list aligned to the uploaded document order. Each entry is either a dict matching your schema or null for failed/missing items.
import json
batch_guid = await client.submit_and_process_batch(
    documents=file_paths,
    document_mime_type="application/pdf",
    prompt="Extract invoice data"
)
batch_results = await client.wait_until_ready(batch_guid, result_format="json")
for index, item in enumerate(batch_results):
    if item:
        print(f"Document {index}:", json.dumps(item, indent=2))
    else:
        print(f"Document {index}: no structured result")
- Legacy object mode
  - If you omit result_format, wait_until_ready retains backward compatibility by returning a SimpleNamespace-like object.
  - Structured extraction jobs expose their JSON in result.parsed; batch jobs now surface a list in result.result.
- OCR-only (plain text or markdown)
  - Use submit_and_ocr_document(...) to submit.
  - wait_until_ready(job_id) (no result_format) returns text content in result.result.
# OCR flow
job_id = await client.submit_and_ocr_document(
document=document_bytes,
document_mime_type="application/pdf",
ocr="PREMIUM",
ocr_format="markdown"
)
ocr_result = await client.wait_until_ready(job_id)
print(ocr_result.result)
- Generative tasks (JSON string)
  - submit_and_wait_for_generative_task(...) returns a job whose result.result is a JSON string.
  - Parse with json.loads(...), then access generated_text.
# Generative task flow
import json
gen = await client.submit_and_wait_for_generative_task(
parent_job_id=parent_job_id,
prompt="Summarize this document"
)
data = json.loads(gen.result)
print(data["generated_text"])
Constants and Enums
LLM Types
- "gpt-3.5-turbo"
- "gpt-4"
- "gpt-4-turbo"
OCR Types
- "DEFAULT"
- "LOW"
- "PREMIUM"
- "AUTO"
OCR Formats
- "plain"
- "markdown"
- "json"
Operation Types
- "error-analysis"
- "generative-task"
Generative Task Models
- "DEFAULT" - Standard AI model, good balance of speed and quality
- "HIGH" - Premium AI model, slower but higher quality responses
Best Practices
Authentication
import os
# Use environment variables for API keys
client = DocuDevsClient(
token=os.getenv("DOCUDEVS_API_KEY")
)
Error Handling (Best Practices)
async def safe_process_document(document_data, mime_type):
    try:
        job_id = await client.submit_and_process_document(
            document=document_data,
            document_mime_type=mime_type
        )
        result = await client.wait_until_ready(job_id, timeout=120, result_format="json")
        return result, None
    except Exception as e:
        return None, str(e)
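The (result, error) tuple pattern keeps call sites simple:
result, error = await safe_process_document(document_bytes, "application/pdf")
if error:
    print(f"Processing failed: {error}")
else:
    print(result)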
Batch Processing
import asyncio
async def process_documents_batch(documents):
    # Submit all jobs concurrently
    submit_tasks = [
        client.submit_and_process_document(document=doc, document_mime_type=mime)
        for doc, mime in documents
    ]
    job_ids = await asyncio.gather(*submit_tasks)
    # Wait for all to complete
    wait_tasks = [client.wait_until_ready(job_id, result_format="json") for job_id in job_ids]
    return await asyncio.gather(*wait_tasks)
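The helper expects (document_bytes, mime_type) pairs; invoice_bytes and receipt_bytes below are placeholder variables:
docs = [(invoice_bytes, "application/pdf"), (receipt_bytes, "image/png")]
results = await process_documents_batch(docs)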
Generative Task Workflows
import asyncio
import json
async def analyze_document_with_ai(document_data, mime_type):
    """Complete workflow: OCR + multiple generative tasks"""
    # Step 1: Process with OCR
    ocr_job_id = await client.submit_and_ocr_document(
        document=document_data,
        document_mime_type=mime_type,
        ocr="PREMIUM",
        ocr_format="markdown"
    )
    # Wait for OCR to complete
    await client.wait_until_ready(ocr_job_id)
    # Step 2: Generate summary
    summary_task = client.submit_and_wait_for_generative_task(
        parent_job_id=ocr_job_id,
        prompt="Provide a concise summary of this document."
    )
    # Step 3: Extract key information
    key_info_task = client.submit_and_wait_for_generative_task(
        parent_job_id=ocr_job_id,
        prompt="List the most important facts, dates, and numbers from this document."
    )
    # Step 4: Categorize document
    category_task = client.submit_and_wait_for_generative_task(
        parent_job_id=ocr_job_id,
        prompt="What type of document is this? (e.g., invoice, contract, report, etc.)"
    )
    # Wait for all generative tasks to complete
    summary, key_info, category = await asyncio.gather(
        summary_task, key_info_task, category_task
    )
    # Parse results
    return {
        'summary': json.loads(summary.result)['generated_text'],
        'key_info': json.loads(key_info.result)['generated_text'],
        'category': json.loads(category.result)['generated_text']
    }
# Usage
analysis = await analyze_document_with_ai(document_bytes, "application/pdf")
print(f"Document Type: {analysis['category']}")
print(f"Summary: {analysis['summary']}")
print(f"Key Information: {analysis['key_info']}")