
Appendix A: Reference Implementation Notes

This appendix documents implementation details of the medlit reference project: the ingestion pipeline's work queue, artifact files, parallelism, and shared code. These details are specific to one implementation and will evolve; the principles behind them are in Chapters 11 and 12.

Ingestion Pipeline: Work Queue

A Postgres table coordinates all work:

```sql
CREATE TABLE ingest_jobs (
    pmcid          TEXT PRIMARY KEY,
    status         TEXT NOT NULL DEFAULT 'pending',
                   -- pending → fetching → fetched
                   --        → extracting → extracted
                   --        → ingesting → done | failed
    raw_text       TEXT,
    raw_extraction JSONB,
    error          TEXT,
    attempts       INT DEFAULT 0,
    updated_at     TIMESTAMPTZ DEFAULT now()
);
```

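Fetch workers claim jobs with the query below. Extract and ingest workers run the same query against status = 'fetched' and status = 'extracted', respectively:

```sql
-- Run inside a transaction: the claimed row stays locked until COMMIT or ROLLBACK.
SELECT * FROM ingest_jobs
WHERE status = 'pending'
FOR UPDATE SKIP LOCKED
LIMIT 1;
```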

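SKIP LOCKED provides a distributed work queue with no additional infrastructure: no races, crash-safe, and naturally load-balanced across any number of workers or server instances. A crashed worker releases its lock when its connection drops, and the job is retried by the next available worker. This holds as long as the worker keeps its claiming transaction open while it works. If a worker instead commits an in-progress status such as 'fetching' before doing the work, a crash leaves the row in that status, and a sweep that returns stale rows (judged by updated_at) to their input status is needed. The attempts counter enables dead-lettering after N failures, leaving the job at status='failed' with the error recorded.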
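The status lifecycle in the schema comments, together with dead-lettering, can be captured as a pure transition function that a worker consults when it finishes a job. This is a minimal sketch, not the project's code: STAGES, MAX_ATTEMPTS (standing in for N), claimable_status, and next_status are hypothetical names, and the sketch assumes attempts has already been incremented for the current try.

```python
"""Hypothetical sketch of the ingest_jobs status lifecycle with dead-lettering."""

# (input status, in-progress status, output status) per stage, taken from
# the status comments in the ingest_jobs schema.
STAGES = {
    "fetch": ("pending", "fetching", "fetched"),
    "extract": ("fetched", "extracting", "extracted"),
    "ingest": ("extracted", "ingesting", "done"),
}

MAX_ATTEMPTS = 3  # hypothetical value of N


def claimable_status(stage: str) -> str:
    """Status that this stage's workers select on in their claim query."""
    return STAGES[stage][0]


def next_status(stage: str, ok: bool, attempts: int,
                max_attempts: int = MAX_ATTEMPTS) -> str:
    """Status to write back when a worker finishes a job.

    `attempts` is the count after the current attempt has been recorded.
    """
    input_status, _, output_status = STAGES[stage]
    if ok:
        return output_status
    if attempts >= max_attempts:
        return "failed"  # dead-lettered; the error column keeps the reason
    return input_status  # back in the queue for the next available worker
```

Keeping the transitions in one table means each stage's claim query and completion update agree on status names by construction.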

Progress is trivially observable:

```sql
SELECT status, count(*) FROM ingest_jobs GROUP BY status;
```

Paper Artifact Files

After extract_stage stores raw_extraction in Postgres, it also writes a per-paper artifact file using atomic write-then-rename:

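```python
import json
import os

# Temp file in the same directory, so the rename stays on one filesystem.
tmp = artifact_dir / f".tmp_{pmcid}_{os.getpid()}.json"
record = {"pmcid": pmcid, "raw_text": raw_text, "raw_extraction": raw_extraction}
with open(tmp, "w", encoding="utf-8") as f:
    json.dump(record, f, indent=2)
    f.flush()
    os.fsync(f.fileno())  # contents reach disk before the name becomes visible
# Path.replace is atomic on POSIX and overwrites an artifact from an earlier run.
tmp.replace(artifact_dir / f"paper_{pmcid}.json")
```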

Including both raw_text and raw_extraction makes each file a self-contained record of everything that happened for that paper.

These files serve three purposes:

Recovery. If Postgres is lost, the artifact directory is sufficient to repopulate the graph. For extraction-side schema changes (new entity types, revised prompts), reset to status='fetched' and re-run from raw_text. For ingest-side changes only, reset to status='extracted' and re-run from raw_extraction. In both cases the identity server runs fresh and rebuilds all merges correctly.

Auditability. A human-readable record of what was fetched and what the LLM extracted, independent of any subsequent graph operations.

Retraction support. When a paper is retracted, its artifact file provides an exact record of every claim it contributed to the graph. Provenance-aware graph queries can then down-weight or exclude retracted sources. The artifact is the supporting record for that process; the mechanism is a retracted flag on ingest_jobs, a column not included in the schema shown above.
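One way a provenance-aware query could use the retracted flag is to discount each claim's support per source. The sketch below is illustrative only: it assumes each graph claim carries the list of PMCIDs that contributed it, and claim_support, surviving_claims, and the weighting scheme are hypothetical.

```python
"""Hypothetical provenance-aware scoring that discounts retracted sources."""


def claim_support(sources, retracted, retracted_weight=0.0):
    """Sum per-paper weights for one claim, counting each paper once.

    retracted_weight=0.0 excludes retracted papers; a value in (0, 1)
    down-weights them instead.
    """
    return sum(retracted_weight if pmcid in retracted else 1.0
               for pmcid in set(sources))


def surviving_claims(claims, retracted, retracted_weight=0.0, min_support=1.0):
    """Keep claims whose discounted support still meets min_support."""
    return [c for c in claims
            if claim_support(c["sources"], retracted, retracted_weight) >= min_support]
```

Setting retracted_weight to 0.0 corresponds to exclusion; any positive value corresponds to down-weighting.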

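The artifact directory can be a local volume, NFS mount, or S3-compatible store. An object store has no rename; there, a single PUT of the finished record serves the same purpose, since Amazon S3 does not expose partially uploaded objects.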

Repopulation from artifacts:

```bash
python -m medlit.scripts.ingest \
    --from-artifacts ./artifacts/ --ingest-workers 16
```

The script reads paper_*.json from the directory and inserts each paper into ingest_jobs, with fields populated from the file and the status for the chosen restart point ('fetched' or 'extracted', as described under Recovery). INSERT ... ON CONFLICT DO NOTHING skips papers already in the table. The normal worker pool then handles the rest.
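The directory-scan half of --from-artifacts can be sketched as a generator of rows for ingest_jobs. This is a hypothetical outline that assumes the artifact layout written by extract_stage and a psycopg-style %s paramstyle; rows_from_artifacts, restart_from, and INSERT_SQL are illustrative names, not the script's actual interface.

```python
"""Hypothetical sketch of the directory scan behind --from-artifacts."""
import json
from pathlib import Path

# Statement the rows would feed; ON CONFLICT keeps jobs already in the table.
INSERT_SQL = """
INSERT INTO ingest_jobs (pmcid, status, raw_text, raw_extraction)
VALUES (%s, %s, %s, %s::jsonb)
ON CONFLICT (pmcid) DO NOTHING
"""


def rows_from_artifacts(artifact_dir, restart_from="extracted"):
    """Yield (pmcid, status, raw_text, raw_extraction_json) per artifact.

    restart_from="fetched" re-runs extraction from raw_text (raw_extraction
    stays NULL); "extracted" re-runs only the ingest stage.
    """
    if restart_from not in ("fetched", "extracted"):
        raise ValueError(f"unsupported restart point: {restart_from!r}")
    # "paper_*.json" never matches ".tmp_*" files left by interrupted writes.
    for path in sorted(Path(artifact_dir).glob("paper_*.json")):
        record = json.loads(path.read_text(encoding="utf-8"))
        extraction = None
        if restart_from == "extracted":
            extraction = json.dumps(record["raw_extraction"])
        yield record["pmcid"], restart_from, record["raw_text"], extraction
```

With a psycopg cursor, cur.executemany(INSERT_SQL, rows_from_artifacts(path)) would load the table, after which the workers pick up rows at the chosen status.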

Parallelism

Different stages have different bottlenecks and should have independently tunable worker counts:

| Stage   | Bottleneck                       |
|---------|----------------------------------|
| Fetch   | PMC API rate limits              |
| Extract | LLM token budget / cost          |
| Ingest  | Postgres / identity server locks |

The batch runner accepts separate concurrency limits per stage. Running multiple instances of the batch runner across machines is safe; all instances share the same ingest_jobs table and coordinate via SKIP LOCKED.
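Independent pool sizes per stage can be illustrated in-process with one asyncio.Queue per stage boundary. This is a hypothetical sketch only: the actual runner coordinates through ingest_jobs and SKIP LOCKED rather than in-memory queues, and run_pipeline, _worker, and the workers mapping are names invented for illustration.

```python
"""Hypothetical in-process sketch: one worker pool per stage, sized independently."""
import asyncio


async def _worker(inbox, outbox, work, errors):
    # Pull a job, run this stage on it, and hand the result to the next stage.
    while True:
        job = await inbox.get()
        try:
            outbox.put_nowait(await work(job))  # unbounded queue, never blocks
        except Exception as exc:
            errors.append((job, exc))  # a real worker would record status='failed'
        finally:
            inbox.task_done()


async def run_pipeline(pmcids, stages, workers):
    """stages: ordered (name, async fn) pairs; workers: stage name -> pool size."""
    queues = [asyncio.Queue() for _ in range(len(stages) + 1)]
    errors = []
    tasks = [
        asyncio.create_task(_worker(queues[i], queues[i + 1], work, errors))
        for i, (name, work) in enumerate(stages)
        for _ in range(workers[name])
    ]
    for pmcid in pmcids:
        queues[0].put_nowait(pmcid)
    # Once stage i's queue is joined, everything it produced is already in
    # stage i + 1's queue, so joining in order drains the whole pipeline.
    for q in queues[:-1]:
        await q.join()
    for t in tasks:
        t.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    results = []
    while not queues[-1].empty():
        results.append(queues[-1].get_nowait())
    return results, errors
```

Changing one entry in workers changes only that stage's concurrency. Each pool size must be at least 1, since a stage with no workers would never drain.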

Batch Ingestion

The batch pipeline is driven by a shell script that sequences the four stages in order:

```bash