Documentation/Pipeline

Data Intelligence Pipeline

Core

End-to-end document processing with structured Data Packages

Simplest Usage

One call. Smart defaults handle the rest. You get back a composed, structured Data Package.

Smart Defaults -- files only
from latence import Latence

client = Latence(api_key="lat_xxx")

# Submit files -- smart defaults handle the rest
job = client.pipeline.run(files=["contract.pdf"])

# Wait for the composed Data Package
pkg = job.wait_for_completion()

# Structured, summarized results
print(pkg.document.markdown)                       # Clean extracted text
print(pkg.entities.summary)                        # {"total": 142, "by_type": {"PERSON": 23, ...}}
print(pkg.knowledge_graph.summary.total_relations)  # 156
print(pkg.quality.confidence.entity_avg_confidence)  # 0.87

# Download as organized ZIP archive
pkg.download_archive("./output/contract_results.zip")

# Or save automatically when waiting:
pkg = job.wait_for_completion(save_to_disk="./output/contract_results.zip")

When you provide only files with no steps parameter, the intelligent default pipeline runs automatically: Document Intelligence → Entity Extraction → Knowledge Graph. No configuration required.

The Data Package

Every pipeline returns a DataPackage -- a structured, composed result with organized sections and quality metrics. This is not a raw JSON dump; it is a curated data deliverable.

Section             | Contents                                                  | Present when
pkg.document        | Markdown text, per-page breakdowns, document metadata     | Document Intelligence ran
pkg.entities        | Entity list + summary (total, by_type, avg_confidence)    | Entity Extraction ran
pkg.knowledge_graph | Entities, relations, graph summary, type distributions    | Relation Extraction ran
pkg.redaction       | Redacted text, PII list, summary by type                  | Redaction ran
pkg.quality         | Per-stage report, confidence scores, processing time, cost | Always

ZIP Archive

Download all results as an organized archive:

Download archive
# Download after completion
pkg.download_archive("./results.zip")

# Or save automatically during wait
pkg = job.wait_for_completion(save_to_disk="./results.zip")
Archive structure
Legal_Contract_Ingestion/
  README.md              # Human-readable processing summary
  document.md            # Full markdown text
  entities.json          # Entity list + summary
  knowledge_graph.json   # Graph data + summary
  redaction.json         # Redaction results (if run)
  quality_report.json    # Processing report & confidence
  metadata.json          # Pipeline config & timing
  pages/
    page_001.md
    page_002.md
    ...
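For local tooling, an archive with this layout can be assembled with the standard library's zipfile module. A minimal sketch -- build_archive and the file contents below are illustrative, not part of the SDK:

```python
import io
import zipfile

def build_archive(job_name, sections, pages):
    """Assemble an in-memory ZIP mirroring the documented layout."""
    buf = io.BytesIO()
    with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
        # Top-level deliverables (README.md, document.md, *.json, ...)
        for filename, content in sections.items():
            zf.writestr(f"{job_name}/{filename}", content)
        # Per-page markdown under pages/
        for page, text in pages.items():
            zf.writestr(f"{job_name}/pages/{page}", text)
    return buf.getvalue()

data = build_archive(
    "Legal_Contract_Ingestion",
    {"README.md": "# Processing summary", "document.md": "full text", "entities.json": "{}"},
    {"page_001.md": "page 1", "page_002.md": "page 2"},
)
names = zipfile.ZipFile(io.BytesIO(data)).namelist()
```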

Data Consolidation

Merge all pipeline outputs into a single, document-centric JSON with zero redundancy. Markdown text appears exactly once; every service's output is nested under its document.

Merge into consolidated JSON
# Merge all outputs into a single document-centric JSON
merged = pkg.merge(save_to="./output/consolidated.json")

# Access consolidated data
for doc in merged["documents"]:
    print(doc["filename"])
    print(doc["markdown"][:200])           # Extracted text (once, no redundancy)
    print(doc["entities"])                  # Entities for this document
    print(doc["knowledge_graph"])           # Graph for this document

# Global summary
print(merged["summary"]["entities"]["total"])
print(merged["summary"]["relations"]["total"])
print(merged["summary"]["cost_usd"])

merge() runs entirely client-side. It produces a single JSON ready for downstream systems -- no repeated inputs, no wasted tokens.
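The document-centric shape that merge() produces can be sketched in plain Python. consolidate and the simplified per-stage fields below are hypothetical stand-ins, not SDK internals:

```python
def consolidate(doc_results):
    """Nest each service's output under its document; markdown appears once."""
    documents, total_entities = [], 0
    for result in doc_results:
        entities = result.get("entities", [])
        total_entities += len(entities)
        documents.append({
            "filename": result["filename"],
            "markdown": result["markdown"],  # stored exactly once per document
            "entities": entities,
            "knowledge_graph": result.get("knowledge_graph", {}),
        })
    return {"documents": documents,
            "summary": {"entities": {"total": total_entities}}}

merged = consolidate([
    {"filename": "contract.pdf", "markdown": "# Contract", "entities": [{"type": "PERSON"}]},
    {"filename": "nda.pdf", "markdown": "# NDA", "entities": []},
])
```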

Smart Defaults

When you provide only files with no steps parameter, the intelligent default pipeline runs automatically:

1. Document Intelligence
2. Entity Extraction
3. Knowledge Graph

This covers the most common use case: turning documents into structured, searchable knowledge. Override with explicit steps when you need different behavior.

Step Configuration

Use friendly step names. Steps are automatically sorted into the correct execution order.

Step name       | Aliases                    | Description
doc_intel       | document_intelligence, ocr | Document processing with layout detection, table extraction, and chart recognition
redaction       | redact                     | PII detection and masking
extraction      | extract                    | Zero-shot entity extraction
knowledge_graph | graph, ontology            | Relation extraction and knowledge graph construction with entity resolution
compression     | compress                   | Token compression for cost-efficient downstream use
embedding       | --                         | Dense vector embeddings (Matryoshka dimensions)
colbert         | --                         | Token-level ColBERT embeddings for neural retrieval
colpali         | --                         | Vision-language ColPali embeddings
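The automatic alias resolution and ordering can be sketched as a lookup plus a canonical sort. The alias map and ordering below are taken from the step table; normalize_steps itself is an illustrative stand-in, not the SDK's implementation:

```python
# Aliases and canonical execution order, mirroring the step table.
ALIASES = {
    "document_intelligence": "doc_intel", "ocr": "doc_intel",
    "redact": "redaction", "extract": "extraction",
    "graph": "knowledge_graph", "ontology": "knowledge_graph",
    "compress": "compression",
}
ORDER = ["doc_intel", "redaction", "extraction", "knowledge_graph",
         "compression", "embedding", "colbert", "colpali"]

def normalize_steps(steps):
    """Map friendly names to canonical ones and sort into execution order."""
    canonical = {ALIASES.get(name, name) for name in steps}
    return [s for s in ORDER if s in canonical]

ordered = normalize_steps({"graph": {}, "ocr": {}, "extract": {}})
```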
Explicit step configuration
job = client.pipeline.run(
    name="Legal Contract Ingestion",
    files=["contract.pdf"],
    steps={
        "doc_intel": {"mode": "performance"},
        "redaction": {"mode": "balanced", "threshold": 0.5},
        "extraction": {"threshold": 0.3, "user_labels": ["party", "clause"]},
        "knowledge_graph": {"resolve_entities": True},
    },
)
pkg = job.wait_for_completion(poll_interval=5.0)

DAG Execution Model

The pipeline executes services as a directed acyclic graph (DAG), not a linear chain. Independent branches run in parallel for maximum throughput.

Execution order
document_intelligence ─┬─── extraction ──── ontology
                       ├─── redaction
                       ├─── compression
                       ├─── embedding
                       ├─── colbert
                       └─── colpali

You declare which services you want. The pipeline handles ordering, dependency injection, and parallel execution automatically. For example, extraction and compression run in parallel since both depend only on document_intelligence.
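Wave-based scheduling over such a DAG can be sketched with a Kahn-style loop. The dependency map below is read off the diagram; execution_waves is an illustrative stand-in for the backend scheduler:

```python
# Dependencies from the diagram: every branch hangs off document_intelligence,
# and the knowledge graph additionally needs extraction.
DEPS = {
    "document_intelligence": set(),
    "extraction": {"document_intelligence"},
    "knowledge_graph": {"extraction"},
    "redaction": {"document_intelligence"},
    "compression": {"document_intelligence"},
}

def execution_waves(services):
    """Group services into waves; everything in one wave can run in parallel."""
    done, waves, remaining = set(), [], set(services)
    while remaining:
        ready = {s for s in remaining if DEPS[s] <= done}
        if not ready:
            raise ValueError("cyclic or unsatisfied dependencies")
        waves.append(ready)
        done |= ready
        remaining -= ready
    return waves

waves = execution_waves({"document_intelligence", "extraction",
                         "compression", "knowledge_graph"})
```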

Fluent Builder

For power users who prefer a fluent API:

PipelineBuilder
from latence import PipelineBuilder

config = (
    PipelineBuilder()
    .doc_intel(mode="performance")
    .redaction(mode="balanced")
    .extraction(threshold=0.3, user_labels=["person", "org"])
    .ontology(resolve_entities=True)
    .build()
)

job = client.pipeline.submit(config, files=["contract.pdf"], name="My Pipeline")
pkg = job.wait_for_completion()

YAML Configuration

Load pipeline configuration from a YAML file for version-controlled, reproducible pipelines:

pipeline.yaml
steps:
  document_intelligence:
    mode: performance
  extraction:
    label_mode: generated
    user_labels: [person, organization]
  ontology:
    resolve_entities: true
Load and run
from latence import PipelineBuilder

config = PipelineBuilder.from_yaml("pipeline.yaml")
job = client.pipeline.submit(config, files=["contract.pdf"])
pkg = job.wait_for_completion()

Async Job Handling

Pipelines are async by nature. Submit and come back later. Data flows from service to service on the backend without you waiting.

Fire and Forget

pipeline.run() returns a Job handle immediately. No blocking.

Status Tracking

job.status() shows current stage, progress, and per-stage details.

Collect Results

job.wait_for_completion() blocks until done and returns a DataPackage.

Fire and check later
# Submit and return immediately
job = client.pipeline.run(files=["big_archive.pdf"])
print(f"Submitted: {job.id}")  # pipe_abc123

# ... do other work ...

# Check progress
status = job.status()
print(f"Stage: {status.current_service} ({status.stages_completed}/{status.total_stages})")

# When ready, collect results
pkg = job.wait_for_completion()

# Or cancel if no longer needed
job.cancel()
Async/await
from latence import AsyncLatence

async with AsyncLatence(api_key="lat_xxx") as client:
    job = await client.pipeline.run(files=["contract.pdf"])
    pkg = await job.wait_for_completion(save_to_disk="./results.zip")
    print(pkg.document.markdown)

Job Statuses

Pipeline jobs transition through these statuses:

Status          | Meaning
QUEUED          | Waiting to start
IN_PROGRESS     | Processing -- check current_service for the stage
COMPLETED       | All stages finished successfully
CACHED / PULLED | Results served from cache or storage
RESUMABLE       | Failed mid-pipeline; completed stages are checkpointed
FAILED          | Pipeline failed (not resumable)
CANCELLED       | Cancelled by user

Resumable Pipelines

If a pipeline fails partway through, completed stages are checkpointed. The job enters RESUMABLE status, and you can restart from the last checkpoint instead of re-processing everything.

Resume from checkpoint
from latence import JobError

try:
    pkg = job.wait_for_completion()
except JobError as e:
    if e.is_resumable:
        # Restart from the last completed checkpoint
        pkg = job.resume().wait_for_completion()
    else:
        raise

Progress Callbacks

Track pipeline progress in real time with the on_progress callback:

on_progress callback
pkg = job.wait_for_completion(
    poll_interval=5.0,
    timeout=1800.0,
    on_progress=lambda status, elapsed: print(f"  {status} ({elapsed:.0f}s)"),
)

The callback receives the current status string and elapsed seconds on each poll.
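A polling loop of this shape can be sketched against a mocked status source. The real SDK handles this internally; wait_for_completion below is a simplified stand-in:

```python
import time

def wait_for_completion(get_status, poll_interval=0.01, timeout=5.0, on_progress=None):
    """Poll a status source until a terminal state, reporting progress each cycle."""
    start = time.monotonic()
    while True:
        status = get_status()
        elapsed = time.monotonic() - start
        if on_progress:
            on_progress(status, elapsed)
        if status in ("COMPLETED", "FAILED", "CANCELLED"):
            return status
        if elapsed > timeout:
            raise TimeoutError("pipeline did not finish in time")
        time.sleep(poll_interval)

statuses = iter(["QUEUED", "IN_PROGRESS", "COMPLETED"])  # stands in for job.status()
seen = []
final = wait_for_completion(lambda: next(statuses),
                            on_progress=lambda s, e: seen.append(s))
```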

Error Handling

The SDK provides a structured exception hierarchy for pipeline errors:

Exception handling
from latence import (
    AuthenticationError,
    InsufficientCreditsError,
    RateLimitError,
    JobError,
    JobTimeoutError,
    TransportError,
    PipelineValidationError,
)

try:
    job = client.pipeline.run(files=["doc.pdf"])
    pkg = job.wait_for_completion(timeout=600)
except AuthenticationError:
    print("Invalid API key")
except InsufficientCreditsError:
    print("No credits remaining")
except RateLimitError as e:
    print(f"Rate limited -- retry after {e.retry_after}s")
except JobTimeoutError as e:
    print(f"Pipeline {e.job_id} did not finish in time")
except JobError as e:
    if e.is_resumable:
        pkg = job.resume().wait_for_completion()
    else:
        print(f"Pipeline failed: {e.message}")
except TransportError:
    print("Network / connection error")
except PipelineValidationError as e:
    print(f"Invalid pipeline config: {e.errors}")

The SDK automatically retries on HTTP 429 and 5xx with exponential backoff and jitter. TransportError is the base class for APIConnectionError and APITimeoutError.
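Full-jitter exponential backoff, the usual shape of such a retry schedule, can be sketched as follows. The base delay, cap, and attempt count here are assumptions for illustration, not documented SDK defaults:

```python
import random

def backoff_delays(base=0.5, cap=30.0, attempts=5, seed=None):
    """Exponential backoff with full jitter: delay ~ U(0, min(cap, base * 2**n))."""
    rng = random.Random(seed)
    return [rng.uniform(0, min(cap, base * 2 ** n)) for n in range(attempts)]

delays = backoff_delays(seed=42)
```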

Pipeline Patterns

Document → Knowledge Graph (Default)
Document Intelligence → Entity Extraction → Knowledge Graph

PDF/image → text → entities → relations and knowledge graph. This is the smart default when you provide only files.

Compliance Pipeline
Document Intelligence → Redaction → Entity Extraction → Knowledge Graph

Process documents, remove PII first, then extract entities and build graphs from clean text.

Text → Knowledge Graph
Entity Extraction → Knowledge Graph

For text input (no document processing needed). Extract entities and build a knowledge graph directly.

Invalid: Missing Document Processing
Entity Extraction → Knowledge Graph (with file input)

Entity Extraction expects text, not files. Add doc_intel first, or use smart defaults which handle this automatically.

Service Compatibility

Each service has specific input/output types. The pipeline validates chains before execution.

Service               | Input           | Output
document_intelligence | file or image   | text
redaction             | text            | text
extraction            | text            | entities
ontology              | text + entities | knowledge_graph
compression           | text            | text
embedding             | text            | vectors
colbert               | text            | token_vectors
colpali               | text or image   | vision_vectors

Dependency Rule

knowledge_graph (relation extraction) requires extraction to run first -- it needs entities as input. The pipeline validates this before execution. With smart defaults, this is handled automatically.
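Chain validation of this kind can be sketched from the compatibility table. IO_TYPES and validate_chain below are illustrative simplifications (for instance, ontology is modeled as accepting either text or entities rather than requiring both):

```python
# Accepted inputs / produced output per service, simplified from the table.
IO_TYPES = {
    "document_intelligence": ({"file", "image"}, "text"),
    "redaction": ({"text"}, "text"),
    "extraction": ({"text"}, "entities"),
    "ontology": ({"text", "entities"}, "knowledge_graph"),
}

def validate_chain(services, input_type):
    """Return a list of type errors; an empty list means the chain is valid."""
    errors, available = [], {input_type}
    for svc in services:
        accepted, produced = IO_TYPES[svc]
        if not accepted & available:
            errors.append(f"{svc} expects one of {sorted(accepted)}, "
                          f"but only {sorted(available)} is available")
        available.add(produced)
    return errors

bad = validate_chain(["extraction", "ontology"], "file")  # extraction can't take a file
ok = validate_chain(["document_intelligence", "extraction", "ontology"], "file")
```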

API Reference

POST /api/v1/pipeline/execute
Request Body
{
  "services": [
    {"service": "document_intelligence", "config": {"mode": "performance"}},
    {"service": "extraction", "config": {"threshold": 0.3}},
    {"service": "ontology"}
  ],
  "input": {
    "file_base64": "...",
    "filename": "document.pdf"
  },
  "store_intermediate": true,
  "name": "Legal Contract Ingestion"
}
Response
{
  "job_id": "pipe_abc123def456",
  "poll_url": "/api/v1/pipeline/pipe_abc123def456",
  "services": ["document_intelligence", "extraction", "ontology"],
  "name": "Legal Contract Ingestion",
  "message": "Pipeline submitted. Poll the status URL for progress.",
  "retention": "48 hours"
}
GET /api/v1/pipeline/{job_id}
Response (in progress)
{
  "job_id": "pipe_abc123def456",
  "status": "IN_PROGRESS",
  "current_stage": 1,
  "current_service": "extraction",
  "stages_completed": 1,
  "total_stages": 3
}
GET /api/v1/pipeline/{job_id}/result
Response (completed)
{
  "job_id": "pipe_abc123def456",
  "status": "COMPLETED",
  "output_url": "https://...",
  "download_url": "https://...",
  "output_size_bytes": 145280,
  "execution_summary": {
    "total_stages": 3,
    "completed_stages": 3,
    "total_cost_usd": 0.16
  }
}
DELETE /api/v1/pipeline/{job_id}

Cancel a running pipeline job. Already-completed stages are not rolled back.

Response
{
  "job_id": "pipe_abc123def456",
  "status": "CANCELLED",
  "message": "Pipeline job cancelled"
}

Troubleshooting

Validation Error: Input type mismatch

Cause: Your input type doesn't match the first service's expected input.

Fix: Use client.pipeline.run(files=[...]) with smart defaults, which handles this automatically. Or add doc_intel as the first step for file input.

Validation Error: Missing dependency

Cause: knowledge_graph requires extraction.

Fix: Add extraction before knowledge_graph in your steps, or use smart defaults which include both.

JobTimeoutError

Cause: Pipeline did not complete within the timeout period (default: 30 minutes).

Fix: Increase timeout: job.wait_for_completion(timeout=3600). For very large documents, consider splitting into batches.

Pricing

Pipeline processing is billed per page. The cost includes all stages in the pipeline.

Per-page pricing: Each page processed through the pipeline is billed at a flat per-page rate. This includes document intelligence, entity extraction, knowledge graph construction, and any other stages you configure. No additional orchestration fees. Check your dashboard for current rates and usage.

latence-python SDK v0.2

Pipeline-first SDK with DataPackage, Job handles, and smart defaults

View on GitHub

Next Step: Dataset Intelligence

Feed pipeline outputs into Dataset Intelligence for corpus-level analysis -- entity resolution, knowledge graph construction with RotatE link prediction, and ontology induction. Supports incremental ingestion so you can append new documents without reprocessing.

Tier   | Service         | Description                                                  | Price
Tier 1 | Enrichment      | Semantic feature vectors via EmbeddingGemma. CPU-only, fast. | $1.00 / 1K pages
Tier 2 | Knowledge Graph | Entity resolution, deduplication, RotatE link prediction.    | $10.00 / 1K pages
Tier 3 | Ontology        | Concept clustering, hierarchy induction, SHACL shapes.       | $50.00 / 1K pages
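Applying these per-1K-page rates is simple arithmetic; tier_cost below is a hypothetical helper, not an SDK function:

```python
# Rates from the tier list above, in USD per 1,000 pages.
RATES_PER_1K_PAGES = {"enrichment": 1.00, "knowledge_graph": 10.00, "ontology": 50.00}

def tier_cost(tier, pages):
    """Cost in USD for running `pages` pages through a Dataset Intelligence tier."""
    return RATES_PER_1K_PAGES[tier] * pages / 1000

cost = tier_cost("knowledge_graph", 5000)  # 5,000 pages through Tier 2
```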

# Pipeline → Dataset Intelligence
di = client.experimental.dataset_intelligence_service

# Create a new dataset from pipeline output
job = di.run(input_data=pipeline_output, return_job=True)
# Poll status at GET /api/v1/pipeline/{job.job_id}

# Append new documents later (incremental)
delta = di.run(input_data=new_output, dataset_id="ds_...", return_job=True)

Ready to build your pipeline?

Install the SDK and submit your first documents in under a minute.