# Python SDK
The official Python SDK for InCheck. One client over the proprietary document-grounding engine plus the EMS knowledge layer — typed responses, sync + async + streaming. It is not RAG; it is our IP, and it is materially more accurate than generic retrieval pipelines.
Requires Python 3.10+.
## Configure
```shell
export INCHECK_API_KEY="incheck_prod_..."

# Pick an environment (production is the default):
export INCHECK_ENVIRONMENT="staging"  # api-acceptance.incheck.ai

# Or override the base URL entirely:
# export INCHECK_BASE_URL="https://my-internal-proxy.example/incheck"
```
Resolution priority for the base URL (high → low):

1. `base_url=` passed to `Client(...)`
2. `INCHECK_BASE_URL` env var
3. `environment=` passed to `Client(...)`
4. `INCHECK_ENVIRONMENT` env var
5. Default: `https://api.incheck.ai`
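The priority order above can be sketched as a small resolver. This is illustrative, not the SDK's internal code; the `env` dict stands in for `os.environ`, and the staging hostname is taken from the comment in the config example:

```python
DEFAULT_URL = "https://api.incheck.ai"

# Known environment hostnames (staging hostname from the config example above).
ENVIRONMENT_URLS = {
    "production": "https://api.incheck.ai",
    "staging": "https://api-acceptance.incheck.ai",
}

def resolve_base_url(env, base_url=None, environment=None):
    """Apply the documented priority: explicit base_url=, then
    INCHECK_BASE_URL, then environment=, then INCHECK_ENVIRONMENT,
    then the production default."""
    if base_url:
        return base_url
    if env.get("INCHECK_BASE_URL"):
        return env["INCHECK_BASE_URL"]
    if environment:
        return ENVIRONMENT_URLS[environment]
    if env.get("INCHECK_ENVIRONMENT"):
        return ENVIRONMENT_URLS[env["INCHECK_ENVIRONMENT"]]
    return DEFAULT_URL
```

Note that an explicit `base_url=` argument beats the `INCHECK_BASE_URL` env var, which in turn beats any `environment=` setting.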
## EMS mode — no setup, just chat
```python
from incheck import Client

with Client() as client:
    reply = client.chat.send(
        "Adult dose of atropine for symptomatic bradycardia?",
        scope="ALS",
        state="Massachusetts",
    )
    print(reply.content)
```
That's it — no org_id, no Pod, no onboarding. The model answers from
general EMS knowledge under the scope/state you specify.
## Unified mode — chat with your Pod
Onboard one or more documents into a Pod (one Pod per org_id), then
chat against it.
```python
from incheck import Client

with Client() as client:
    # org_ids live under your key's namespace
    namespace = client.documents.list_orgs().filtered_by
    org_id = f"{namespace}_dispatch"

    # Onboard the Pod: initiate → upload → complete → poll, in one call.
    status = client.documents.upload(
        org_id,
        files=["./dispatch_sop.pdf", "./policies.docx"],
    )
    print("processed:", status.progress.processed_pages, "pages")

    # Chat against the Pod
    reply = client.chat.send(
        "What's our hazmat escalation policy?",
        org_id=org_id,
        user_id="alice@hospital.org",
    )
    print(reply.content)
```
A Pod holds multiple files, all queried together. Add or replace
files later with another documents.upload(...) call, or use the
explicit initiate_update / complete_update pair for finer control.
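The Pod semantics just described (one Pod per `org_id`, many files queried together, later uploads adding or replacing files) can be modeled with a toy in-memory store. This is purely illustrative and is not the SDK's implementation; it only encodes what the prose above states:

```python
# Toy model: one Pod per org_id, each holding multiple named files.
pods: dict[str, dict[str, bytes]] = {}

def toy_upload(org_id: str, files: list[tuple[str, bytes]]) -> list[str]:
    """Add files to the org's Pod; re-uploading an existing filename
    replaces it. Returns the Pod's current file names, sorted."""
    pod = pods.setdefault(org_id, {})
    for name, data in files:
        pod[name] = data  # new name adds, existing name replaces
    return sorted(pod)
```

Whether a real re-upload versions or overwrites server-side state is governed by the API (see `version` / `delete_version` in the coverage table); this sketch only captures the add-or-replace behavior described here.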
### How does the document grounding work?
The engine — extraction, structuring, grounding, retrieval-time decisioning — is our IP. It is not RAG; it materially outperforms off-the-shelf retrieval pipelines on accuracy and faithfulness. The public contract you see (upload, poll, query) is the whole surface. For deeper guarantees, custom evaluations, or a tuned pipeline for your domain, talk to us.
## Streaming
Both modes support streaming. The generator yields a ChatChunk per
SSE event and terminates on type='complete'.
```python
for chunk in client.chat.stream("Summarize the SOP.", org_id=org_id):
    if chunk.content:
        print(chunk.content, end="", flush=True)
```
EMS streaming is identical — just omit org_id:
```python
for chunk in client.chat.stream("List three scene-safety bullets."):
    if chunk.content:
        print(chunk.content, end="", flush=True)
```
## Async
```python
import asyncio

from incheck import AsyncClient

async def main():
    async with AsyncClient() as client:
        # EMS
        r = await client.chat.send("Adult dose of epinephrine for anaphylaxis?")
        print(r.content)

        # Unified
        await client.documents.upload("acme_dispatch", ["./sop.pdf"])
        r = await client.chat.send("Summarize.", org_id="acme_dispatch")
        print(r.content)

asyncio.run(main())
```
AsyncClient mirrors Client one-for-one — every method has the same
signature, just await it.
## Document onboarding
The convenience helper handles initiate → S3 → complete → poll. Pass
any mix of paths and (filename, file_like) tuples:
```python
status = client.documents.upload(
    org_id,
    files=[
        "./sop.pdf",
        ("policies.docx", open("./policies.docx", "rb")),
    ],
    batch_size=6,      # chunk batch size (1-20)
    wait=True,         # block until the processing job is terminal
    timeout=600,       # seconds
    poll_interval=10,  # seconds
)
```
For a lower-level flow — for example to surface upload progress in a UI — drive the three steps yourself:
```python
initiated = client.documents.initiate_upload(org_id, ["sop.pdf"])
# POST each file to its presigned URL …
client.documents.complete_upload(initiated.job_id, ["sop.pdf"])
status = client.documents.wait_for_job(initiated.job_id, timeout=600)
```
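The `wait_for_job` step is a timeout-bounded polling loop. A minimal self-contained sketch of that pattern — `fetch_status` stands in for the SDK's `job(...)` call, and the terminal status names are assumptions, not documented values:

```python
import time

# Assumed terminal states; the real names come from the API.
TERMINAL = {"completed", "failed"}

def wait_for_job_sketch(fetch_status, timeout=600, poll_interval=10,
                        sleep=time.sleep, clock=time.monotonic):
    """Poll fetch_status() until it returns a terminal state or the
    timeout elapses (raising TimeoutError, analogous to the SDK's
    JobTimeoutError)."""
    deadline = clock() + timeout
    while True:
        status = fetch_status()
        if status in TERMINAL:
            return status
        if clock() >= deadline:
            raise TimeoutError(f"job still {status!r} after {timeout}s")
        sleep(poll_interval)
```

Injecting `sleep` and `clock` keeps the loop testable; the real SDK helper takes the same `timeout` / `poll_interval` knobs shown in the `upload(...)` example above.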
## Errors
Every non-2xx response becomes a typed exception:
```python
from incheck import (
    Client,
    AuthenticationError,
    PermissionError,
    ValidationError,
    JobFailedError,
    JobTimeoutError,
    RateLimitError,
)

with Client() as client:
    try:
        client.documents.upload("royal_dispatch", ["./sop.pdf"])
    except PermissionError as e:
        print("namespace mismatch:", e)
    except ValidationError as e:
        print("bad request:", e)
    except JobFailedError as e:
        print("job failed:", e.job_id, e.status)
    except JobTimeoutError as e:
        print("still pending:", e.last_status)
    except RateLimitError as e:
        print(f"slow down; retry after {e.retry_after}s")
    except AuthenticationError:
        print("check your API key")
```
Full hierarchy:
```text
IncheckError
├── AuthenticationError   (401)
├── PermissionError       (403)
├── NotFoundError         (404)
├── ValidationError       (400, 422)
├── RateLimitError        (429, has .retry_after)
├── APIError              (5xx)
├── APIConnectionError    (network)
├── JobFailedError        (.job_id, .status)
└── JobTimeoutError       (.job_id, .last_status)
```
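Since `RateLimitError` carries `.retry_after`, a thin retry wrapper is a natural pattern. A self-contained sketch — the `RateLimitError` defined here is a stand-in for the SDK's class, assuming only the documented `.retry_after` attribute:

```python
import time

class RateLimitError(Exception):
    """Stand-in for incheck.RateLimitError; .retry_after (seconds)
    is the only attribute taken from the documentation above."""
    def __init__(self, retry_after: float):
        super().__init__(f"rate limited; retry after {retry_after}s")
        self.retry_after = retry_after

def with_rate_limit_retry(call, max_attempts=3, sleep=time.sleep):
    """Invoke call(), sleeping for the server-suggested retry_after
    between attempts; re-raise once max_attempts is exhausted."""
    for attempt in range(1, max_attempts + 1):
        try:
            return call()
        except RateLimitError as e:
            if attempt == max_attempts:
                raise
            sleep(e.retry_after)
```

Honoring the server's `retry_after` instead of a fixed backoff keeps you inside your quota without over-waiting.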
## Coverage
| Surface | Methods |
|---|---|
| `client.chat` | `send`, `stream` (both modes; `org_id` keyword-only, optional) |
| `client.documents` | `list_orgs`, `list`, `version`, `upload`, `initiate_upload`, `complete_upload`, `initiate_update`, `complete_update`, `job`, `wait_for_job`, `delete`, `delete_version` |