Python SDK

The official Python SDK for InCheck: one client for the proprietary document-grounding engine and the EMS knowledge layer, with typed responses and sync, async, and streaming support. It is not generic RAG; the engine is our own IP, and it is materially more accurate than off-the-shelf retrieval pipelines.

pip install incheck
# or
uv add incheck

Requires Python 3.10+.

Configure

export INCHECK_API_KEY="incheck_prod_..."

# Pick an environment (production is the default):
export INCHECK_ENVIRONMENT="staging"        # api-acceptance.incheck.ai
# or override fully:
# export INCHECK_BASE_URL="https://my-internal-proxy.example/incheck"

Resolution priority for the base URL (high → low):

  1. base_url= passed to Client(...)
  2. INCHECK_BASE_URL env var
  3. environment= passed to Client(...)
  4. INCHECK_ENVIRONMENT env var
  5. Default https://api.incheck.ai
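The precedence above can be sketched as a plain resolution function. This is an illustrative stand-in, not the SDK's internals; the staging host comes from the export comment above, and the environment-name-to-host map beyond that is an assumption:

```python
import os

# Hypothetical mirror of the SDK's base-URL resolution.
# "staging" -> api-acceptance.incheck.ai is taken from the docs above;
# treating "production" as the default-host name is an assumption.
ENVIRONMENT_HOSTS = {
    "production": "https://api.incheck.ai",
    "staging": "https://api-acceptance.incheck.ai",
}

def resolve_base_url(base_url=None, environment=None):
    """Apply the documented precedence: explicit base_url, then
    INCHECK_BASE_URL, then explicit environment, then
    INCHECK_ENVIRONMENT, then the production default."""
    if base_url:
        return base_url
    if os.environ.get("INCHECK_BASE_URL"):
        return os.environ["INCHECK_BASE_URL"]
    env = environment or os.environ.get("INCHECK_ENVIRONMENT") or "production"
    return ENVIRONMENT_HOSTS[env]
```

Note that an explicit `base_url=` always wins, even over `INCHECK_BASE_URL`, so a hard-coded proxy URL in code cannot be silently redirected by the environment.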

EMS mode — no setup, just chat

from incheck import Client

with Client() as client:
    reply = client.chat.send(
        "Adult dose of atropine for symptomatic bradycardia?",
        scope="ALS",
        state="Massachusetts",
    )
    print(reply.content)

That's it — no org_id, no Pod, no onboarding. The model answers from general EMS knowledge under the scope/state you specify.

Unified mode — chat with your Pod

Onboard one or more documents into a Pod (one Pod per org_id), then chat against it.

from incheck import Client

with Client() as client:
    # list_orgs() reports the namespace your API key is scoped to;
    # every org_id you create must start with that prefix.
    namespace = client.documents.list_orgs().filtered_by
    org_id = f"{namespace}_dispatch"

    # Onboard the Pod — initiate → upload → complete → poll, in one call.
    status = client.documents.upload(
        org_id,
        files=["./dispatch_sop.pdf", "./policies.docx"],
    )
    print("processed:", status.progress.processed_pages, "pages")

    # Chat against the Pod
    reply = client.chat.send(
        "What's our hazmat escalation policy?",
        org_id=org_id,
        user_id="alice@hospital.org",
    )
    print(reply.content)

A Pod holds multiple files, all queried together. Add or replace files later with another documents.upload(...) call, or use the explicit initiate_update / complete_update pair for finer control.

How does the document grounding work?

The engine — extraction, structuring, grounding, retrieval-time decisioning — is our IP. It is not RAG; it materially outperforms off-the-shelf retrieval pipelines on accuracy and faithfulness. The public contract you see (upload, poll, query) is the whole surface. For deeper guarantees, custom evaluations, or a tuned pipeline for your domain, talk to us.

Streaming

Both modes support streaming. The generator yields a ChatChunk per SSE event and terminates on type='complete'.

for chunk in client.chat.stream("Summarize the SOP.", org_id=org_id):
    if chunk.content:
        print(chunk.content, end="", flush=True)

EMS streaming is identical — just omit org_id:

for chunk in client.chat.stream("List three scene-safety bullets."):
    if chunk.content:
        print(chunk.content, end="", flush=True)
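Under the hood each chunk corresponds to one SSE event. A minimal sketch of that framing, assuming a JSON payload with `type` and `content` keys (the real wire shape may differ):

```python
import json

def iter_chat_chunks(sse_lines):
    """Hypothetical sketch of the stream loop: decode each SSE `data:`
    line as a JSON chunk and stop once the server signals
    type='complete'."""
    for line in sse_lines:
        if not line.startswith("data:"):
            continue  # skip blank keep-alives and non-data fields
        chunk = json.loads(line[len("data:"):].strip())
        yield chunk
        if chunk.get("type") == "complete":
            return

wire = [
    'data: {"type": "delta", "content": "Scene "}',
    'data: {"type": "delta", "content": "safety."}',
    'data: {"type": "complete", "content": ""}',
]
text = "".join(c["content"] for c in iter_chat_chunks(wire))
# text == "Scene safety."
```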

Async

import asyncio
from incheck import AsyncClient

async def main():
    async with AsyncClient() as client:
        # EMS
        r = await client.chat.send("Adult dose of epinephrine for anaphylaxis?")
        print(r.content)

        # Unified
        await client.documents.upload("acme_dispatch", ["./sop.pdf"])
        r = await client.chat.send("Summarize.", org_id="acme_dispatch")
        print(r.content)

asyncio.run(main())

AsyncClient mirrors Client one-for-one — every method has the same signature, just await it.
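Because every method is awaitable, independent questions can be fanned out concurrently with asyncio.gather. A sketch with a stubbed coroutine standing in for `client.chat.send` (the stub's sleep just simulates network latency):

```python
import asyncio

async def answer(question: str) -> str:
    # Stand-in for `await client.chat.send(question)`.
    await asyncio.sleep(0.01)
    return f"reply to: {question}"

async def ask_all(questions):
    # gather returns results in input order, regardless of which
    # request finishes first.
    return await asyncio.gather(*(answer(q) for q in questions))

replies = asyncio.run(ask_all([
    "Adult dose of atropine for symptomatic bradycardia?",
    "Adult dose of epinephrine for anaphylaxis?",
]))
```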

Document onboarding

The convenience helper handles initiate → upload to the presigned S3 URLs → complete → poll. Pass any mix of paths and (filename, file_like) tuples:

status = client.documents.upload(
    org_id,
    files=[
        "./sop.pdf",
        ("policies.docx", open("./policies.docx", "rb")),
    ],
    batch_size=6,          # chunk batch size (1-20)
    wait=True,             # block until the processing job is terminal
    timeout=600,           # seconds
    poll_interval=10,      # seconds
)
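One way the mixed files= list might be normalized internally, sketched here for illustration (the helper name and behavior are assumptions, not the SDK's actual code):

```python
import io
from pathlib import Path

def normalize_files(files):
    """Turn a mixed list of paths and (filename, file_like) tuples
    into uniform (filename, file_like) pairs: paths are opened in
    binary mode and named by their basename; tuples pass through."""
    normalized = []
    for f in files:
        if isinstance(f, tuple):
            normalized.append(f)
        else:
            path = Path(f)
            normalized.append((path.name, path.open("rb")))
    return normalized
```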

For a lower-level flow — for example to surface upload progress in a UI — drive the three steps yourself:

initiated = client.documents.initiate_upload(org_id, ["sop.pdf"])
# POST each file to its presigned URL …
client.documents.complete_upload(initiated.job_id, ["sop.pdf"])
status = client.documents.wait_for_job(initiated.job_id, timeout=600)
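The final polling step amounts to a deadline loop. A self-contained sketch with the status fetcher injected so it runs without the SDK; the terminal state names ("completed", "failed") are assumptions:

```python
import time

class JobTimeout(Exception):
    """Stand-in for the SDK's JobTimeoutError."""

def wait_for_job(fetch_status, timeout=600, poll_interval=10, sleep=time.sleep):
    """Poll fetch_status() until the job reaches a terminal state or
    the deadline passes. The SDK would fetch the job over HTTP; here
    the fetcher is injected to keep the sketch testable."""
    deadline = time.monotonic() + timeout
    while True:
        status = fetch_status()
        if status in ("completed", "failed"):
            return status
        if time.monotonic() >= deadline:
            raise JobTimeout(f"last status: {status}")
        sleep(poll_interval)
```

Checking the deadline after each fetch (rather than before) guarantees at least one poll even with a tiny timeout.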

Errors

Every non-2xx response becomes a typed exception:

from incheck import (
    Client,
    AuthenticationError,
    PermissionError,  # note: shadows Python's builtin PermissionError
    ValidationError,
    JobFailedError,
    JobTimeoutError,
    RateLimitError,
)

with Client() as client:
    try:
        client.documents.upload("royal_dispatch", ["./sop.pdf"])
    except PermissionError as e:
        print("namespace mismatch:", e)
    except ValidationError as e:
        print("bad request:", e)
    except JobFailedError as e:
        print("job failed:", e.job_id, e.status)
    except JobTimeoutError as e:
        print("still pending:", e.last_status)
    except RateLimitError as e:
        print(f"slow down; retry after {e.retry_after}s")
    except AuthenticationError:
        print("check your API key")

Full hierarchy:

IncheckError
├── AuthenticationError      (401)
├── PermissionError          (403)
├── NotFoundError            (404)
├── ValidationError          (400, 422)
├── RateLimitError           (429, has .retry_after)
├── APIError                 (5xx)
├── APIConnectionError       (network)
├── JobFailedError           (.job_id, .status)
└── JobTimeoutError          (.job_id, .last_status)
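Because RateLimitError carries .retry_after, a retry wrapper can honor the server-suggested delay. A sketch with a local stand-in exception so it runs without the SDK:

```python
import time

class RateLimited(Exception):
    """Stand-in for the SDK's RateLimitError (carries .retry_after)."""
    def __init__(self, retry_after):
        super().__init__(f"retry after {retry_after}s")
        self.retry_after = retry_after

def with_rate_limit_retry(call, max_attempts=3, sleep=time.sleep):
    """Invoke call(); on RateLimited, sleep for the server-suggested
    delay and retry, re-raising after max_attempts."""
    for attempt in range(1, max_attempts + 1):
        try:
            return call()
        except RateLimited as e:
            if attempt == max_attempts:
                raise
            sleep(e.retry_after)
```

Only 429s are worth retrying blindly; 4xx validation and permission errors will fail the same way every time.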

Coverage

Surface             Methods
client.chat         send, stream (both modes; org_id keyword-only, optional)
client.documents    list_orgs, list, version, upload, initiate_upload, complete_upload, initiate_update, complete_update, job, wait_for_job, delete, delete_version

Full type reference →
HTTP API reference →