The Pipeline
This page describes each stage in detail: parsing (and chunking), extraction (entities, then relationships), deduplication, resolution (to canonical or provisional entities), and bundle building. The framework defines an interface for each stage; domain pipelines (e.g. medlit) implement them.
Two-pass architecture
The knowledge graph ingestion pipeline uses a two-pass architecture to transform raw documents into structured knowledge:
- Pass 1 (Entity Extraction): Parse documents, extract entity mentions, and resolve them to canonical or provisional entities.
- Pass 2 (Relationship Extraction): Identify relationships (edges) between resolved entities within each document.
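This separation lets the system build a consistent entity vocabulary before relationship extraction begins. Relationships are then extracted between already-resolved entities rather than raw text spans. This improves accuracy and lets mentions in different documents link to the same entity.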
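The sketch below illustrates why the two passes are separated. It is a toy example that uses hypothetical stand-in classes (Entity, Relationship), a hard-coded name vocabulary, and an illustrative "co_occurs_with" predicate. It does not use the kgraph interfaces. Pass 1 resolves every document's mentions into one shared entity table. Pass 2 then links only resolved entities, so the same ID ties together relationships found in different documents.

```python
from dataclasses import dataclass


# Hypothetical stand-ins for illustration; not the kgraph classes.
@dataclass(frozen=True)
class Entity:
    entity_id: str
    name: str


@dataclass(frozen=True)
class Relationship:
    subject_id: str
    predicate: str
    object_id: str
    document_id: str


def pass1_resolve(documents: dict[str, str], vocabulary: dict[str, str]) -> dict[str, Entity]:
    """Pass 1: resolve known names in every document into one shared entity table."""
    entities: dict[str, Entity] = {}
    for text in documents.values():
        lowered = text.lower()
        for name, entity_id in vocabulary.items():
            if name in lowered:
                # The same surface form in any document maps to the same entity.
                entities.setdefault(entity_id, Entity(entity_id, name))
    return entities


def pass2_relationships(
    documents: dict[str, str], entities: dict[str, Entity], predicate: str = "co_occurs_with"
) -> list[Relationship]:
    """Pass 2: link resolved entities that co-occur within a single document."""
    relationships = []
    for doc_id, text in documents.items():
        lowered = text.lower()
        present = sorted(e.entity_id for e in entities.values() if e.name in lowered)
        for i, subject_id in enumerate(present):
            for object_id in present[i + 1:]:
                relationships.append(Relationship(subject_id, predicate, object_id, doc_id))
    return relationships


docs = {"d1": "Holmes met Watson.", "d2": "Watson visited Baker Street."}
vocab = {"holmes": "E1", "watson": "E2", "baker street": "E3"}
rels = pass2_relationships(docs, pass1_resolve(docs, vocab))
# Both relationships share E2, which links the two documents through one entity.
```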
The pipeline consists of pluggable components for parsing, extraction, resolution, and embedding generation. Each component is defined as an abstract interface, allowing domain-specific implementations.
For a complete, real-world example, see the examples/sherlock directory.
Component Interfaces
Each interface is designed to be stateless and async-first, enabling parallel processing and easy testing with mock implementations.
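An extractor holds no per-document state, so one instance can process many documents concurrently. The sketch below shows this with asyncio.gather. MockEntityExtractor is hypothetical: for brevity it takes plain strings instead of BaseDocument, and it treats capitalized words as mentions purely for illustration.

```python
import asyncio


class MockEntityExtractor:
    """Hypothetical stateless mock: treats capitalized words as mentions."""

    async def extract(self, text: str) -> list[str]:
        await asyncio.sleep(0)  # yield control, standing in for real I/O such as an LLM call
        return [w.strip(".,") for w in text.split() if w[:1].isupper()]


async def extract_all(extractor: MockEntityExtractor, texts: list[str]) -> list[list[str]]:
    # The extractor keeps no per-call state, so a single instance can serve
    # every document concurrently; gather returns results in input order.
    return await asyncio.gather(*(extractor.extract(t) for t in texts))


results = asyncio.run(extract_all(MockEntityExtractor(), ["Holmes met Watson.", "no names here"]))
```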
DocumentParserInterface
Converts raw document bytes into structured BaseDocument instances. This is the entry point for document ingestion—the parser handles format detection, content extraction, and metadata identification.
Parsers are responsible for:
- Decoding raw bytes based on content type (UTF-8 text, PDF binary, HTML, etc.)
- Extracting document structure (title, sections, paragraphs)
- Identifying metadata (author, date, source identifiers)
- Normalizing content for downstream extraction
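The first responsibility, decoding by content type, might look like the sketch below for text formats. It uses only the standard library. decode_content and _TextCollector are hypothetical helpers, not part of kgraph. PDF is deliberately rejected because it needs a dedicated extraction library.

```python
from html.parser import HTMLParser


class _TextCollector(HTMLParser):
    """Collects visible text nodes, skipping <script> and <style> contents."""

    def __init__(self) -> None:
        super().__init__()
        self.parts: list[str] = []
        self._skip = 0

    def handle_starttag(self, tag, attrs):
        if tag in ("script", "style"):
            self._skip += 1

    def handle_endtag(self, tag):
        if tag in ("script", "style") and self._skip:
            self._skip -= 1

    def handle_data(self, data):
        if not self._skip and data.strip():
            self.parts.append(data.strip())


def decode_content(raw: bytes, content_type: str) -> str:
    """Decode raw bytes to text according to a MIME type such as 'text/html; charset=utf-8'."""
    mime, _, params = content_type.partition(";")
    mime = mime.strip().lower()
    charset = "utf-8"  # default when no charset parameter is given
    for param in params.split(";"):
        key, _, value = param.strip().partition("=")
        if key.lower() == "charset" and value:
            charset = value.strip('"')
    if mime == "text/html":
        collector = _TextCollector()
        collector.feed(raw.decode(charset, errors="replace"))
        collector.close()
        return " ".join(collector.parts)
    if mime.startswith("text/"):
        return raw.decode(charset, errors="replace")
    raise ValueError(f"unsupported content type: {mime}")
```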
```python
from abc import ABC

from kgraph.document import BaseDocument

# Reference signature; import the real class with:
# from kgraph.pipeline import DocumentParserInterface


class DocumentParserInterface(ABC):
    async def parse(
        self,
        raw_content: bytes,
        content_type: str,
        source_uri: str | None = None,
    ) -> BaseDocument: ...
```
Example implementation using an LLM for structure extraction:
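```python
import uuid
from datetime import datetime, timezone

from kgraph.document import BaseDocument
from kgraph.pipeline import DocumentParserInterface


class LLMDocumentParser(DocumentParserInterface):
    def __init__(self, llm_client, document_class: type[BaseDocument]):
        # llm_client: an async client whose complete() is assumed to return
        # an object with .title and .sections (e.g. a structured-output wrapper).
        self._llm = llm_client
        self._doc_class = document_class

    async def parse(
        self,
        raw_content: bytes,
        content_type: str,
        source_uri: str | None = None,
    ) -> BaseDocument:
        # Plain text only; PDF or HTML input needs a format-specific decoder first.
        text = raw_content.decode("utf-8")
        # Use the LLM to extract the title and section structure
        response = await self._llm.complete(
            f"Extract title and sections from this document:\n\n{text}"
        )
        return self._doc_class(
            document_id=str(uuid.uuid4()),
            title=response.title,
            content=text,
            content_type=content_type,
            source_uri=source_uri,
            created_at=datetime.now(timezone.utc),
            metadata={"sections": response.sections},
        )
```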
EntityExtractorInterface
Extracts entity mentions from documents. This is the core of Pass 1—identifying text spans that refer to entities of interest.
The extractor produces EntityMention objects representing raw extractions that have not yet been resolved to canonical entities. This separation of extraction and resolution allows different strategies for each phase and enables batch processing optimizations.
```python
from abc import ABC

from kgraph import EntityMention
from kgraph.document import BaseDocument

# Reference signature; the real class lives in kgraph.pipeline.
class EntityExtractorInterface(ABC):
    async def extract(self, document: BaseDocument) -> list[EntityMention]: ...
```
EntityMention captures raw extractions before resolution:
```python
from kgraph import EntityMention

mention = EntityMention(
    text="aspirin",  # Exact text span
    entity_type="drug",  # Domain-specific type
    start_offset=145,  # Position in document
    end_offset=152,
    confidence=0.95,  # Extraction confidence
    context="patients taking aspirin showed...",  # Surrounding text
    metadata={"extraction_model": "gpt-4"},
)
```
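The offsets in the example fit an end-exclusive span: 145 to 152 covers the seven letters of "aspirin". That means content[start_offset:end_offset] should reproduce the mention text. Below is a dictionary (gazetteer) extractor that builds mentions this way. It is a sketch only: Mention is a hypothetical dataclass that mirrors the EntityMention fields above, not the real class, and the 20-character context window is arbitrary.

```python
import re
from dataclasses import dataclass, field


@dataclass
class Mention:  # hypothetical stand-in mirroring the EntityMention fields
    text: str
    entity_type: str
    start_offset: int
    end_offset: int  # exclusive, like Python slicing
    confidence: float
    context: str = ""
    metadata: dict = field(default_factory=dict)


def extract_mentions(content: str, gazetteer: dict[str, str], window: int = 20) -> list[Mention]:
    """Find whole-word, case-insensitive matches of each gazetteer term."""
    mentions = []
    for term, entity_type in gazetteer.items():
        pattern = re.compile(rf"\b{re.escape(term)}\b", re.IGNORECASE)
        for match in pattern.finditer(content):
            start, end = match.span()
            mentions.append(Mention(
                text=match.group(0),
                entity_type=entity_type,
                start_offset=start,
                end_offset=end,
                confidence=1.0,  # exact dictionary match
                context=content[max(0, start - window): end + window],
                metadata={"extraction_model": "gazetteer"},
            ))
    return sorted(mentions, key=lambda m: m.start_offset)
```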
Example NER-based extractor:
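```python
from kgraph import EntityMention
from kgraph.document import BaseDocument
from kgraph.pipeline import EntityExtractorInterface


class SpacyEntityExtractor(EntityExtractorInterface):
    def __init__(self, nlp, type_mapping: dict[str, str]):
        self._nlp = nlp  # a loaded spaCy pipeline, e.g. spacy.load("en_core_web_sm")
        self._type_map = type_mapping  # spaCy label -> domain type

    async def extract(self, document: BaseDocument) -> list[EntityMention]:
        # spaCy runs synchronously; for large documents consider
        # asyncio.to_thread so the event loop is not blocked.
        doc = self._nlp(document.content)
        mentions = []
        for ent in doc.ents:
            if ent.label_ in self._type_map:
                mentions.append(EntityMention(
                    text=ent.text,
                    entity_type=self._type_map[ent.label_],
                    start_offset=ent.start_char,
                    end_offset=ent.end_char,
                    # spaCy's default NER gives no per-entity score, so use a fixed value
                    confidence=0.9,
                ))
        return mentions
```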
EntityResolverInterface
Maps mentions to canonical or provisional entities. Resolution is the critical step that transforms raw text spans into structured knowledge graph nodes.
The resolver must decide whether a mention refers to:
- An existing canonical entity already in the knowledge graph
- A new canonical entity that can be linked to an external authority (UMLS, DBPedia, etc.)
- A provisional entity that requires further evidence before promotion
The returned confidence score enables quality filtering and informs the entity promotion process.
```python
from __future__ import annotations

from abc import ABC
from collections.abc import Sequence

from kgraph import EntityMention
from kgraph.storage import EntityStorageInterface

# Reference signature; the real class lives in kgraph.pipeline.
# BaseEntity is the framework's entity base class.
class EntityResolverInterface(ABC):
    async def resolve(
        self,
        mention: EntityMention,
        existing_storage: EntityStorageInterface,
    ) -> tuple[BaseEntity, float]: ...

    async def resolve_batch(
        self,
        mentions: Sequence[EntityMention],
        existing_storage: EntityStorageInterface,
    ) -> list[tuple[BaseEntity, float]]: ...
```
Resolution strategy:
- Search existing entities by name/synonym
- Search by embedding similarity if available
- Query an external authority (UMLS, DBPedia, etc.) using CanonicalIdLookupInterface
- Create a provisional entity if no match is found
The resolver can use a CanonicalIdLookupInterface implementation to look up canonical IDs from external authorities. See Canonical IDs and Entity Resolution for details.
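```python
from __future__ import annotations

from collections.abc import Sequence

from kgraph import EntityMention
from kgraph.pipeline import EntityResolverInterface
from kgraph.pipeline.embedding import EmbeddingGeneratorInterface
from kgraph.storage import EntityStorageInterface

# BaseEntity (used in annotations) is the framework's entity base class.


class HybridEntityResolver(EntityResolverInterface):
    def __init__(
        self,
        embedding_gen: EmbeddingGeneratorInterface,
        authority_client,  # External ID authority (a CanonicalIdLookupInterface)
        entity_factory,  # Creates domain-specific entities
    ):
        self._embedder = embedding_gen
        self._authority = authority_client
        self._factory = entity_factory

    async def resolve(
        self,
        mention: EntityMention,
        existing_storage: EntityStorageInterface,
    ) -> tuple[BaseEntity, float]:
        # 1. Name/synonym match against entities already in storage
        existing = await existing_storage.find_by_name(
            mention.text, mention.entity_type, limit=1
        )
        if existing:
            return existing[0], 0.95

        # 2. Embedding similarity; results are (entity, similarity) pairs
        embedding = await self._embedder.generate(mention.text)
        similar = await existing_storage.find_by_embedding(
            embedding, threshold=0.9, limit=1
        )
        if similar:
            entity, similarity = similar[0]
            return entity, similarity

        # 3. External authority lookup (CanonicalIdLookupInterface)
        canonical_id_obj = await self._authority.lookup(
            mention.text, mention.entity_type
        )
        if canonical_id_obj:
            entity = self._factory.create_canonical(
                canonical_id=canonical_id_obj.id,
                name=mention.text,  # Use mention text as name
                entity_type=mention.entity_type,
                embedding=embedding,
            )
            return entity, mention.confidence

        # 4. No match anywhere: create a provisional entity
        entity = self._factory.create_provisional(
            name=mention.text,
            entity_type=mention.entity_type,
            embedding=embedding,
            confidence=mention.confidence,
        )
        return entity, mention.confidence

    async def resolve_batch(
        self,
        mentions: Sequence[EntityMention],
        existing_storage: EntityStorageInterface,
    ) -> list[tuple[BaseEntity, float]]:
        # Simple sequential fallback; a real resolver could batch
        # the embedding calls with generate_batch().
        return [await self.resolve(m, existing_storage) for m in mentions]
```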
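The resolver above appears to treat find_by_embedding as returning (entity, similarity) pairs at or above a threshold, best first. The sketch below shows one way an in-memory backend could meet that contract using brute-force cosine similarity. InMemoryEmbeddingIndex is hypothetical: it stores entity IDs instead of entity objects, and it is synchronous, whereas the storage interface is awaited.

```python
import math
from collections.abc import Sequence


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity; returns 0.0 if either vector is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


class InMemoryEmbeddingIndex:
    """Hypothetical brute-force index keyed by entity ID."""

    def __init__(self) -> None:
        self._items: list[tuple[str, tuple[float, ...]]] = []

    def add(self, entity_id: str, embedding: Sequence[float]) -> None:
        self._items.append((entity_id, tuple(embedding)))

    def find_by_embedding(
        self, embedding: Sequence[float], threshold: float = 0.9, limit: int = 1
    ) -> list[tuple[str, float]]:
        # Score every stored vector, keep those at or above the threshold, best first.
        scored = [(eid, cosine(embedding, vec)) for eid, vec in self._items]
        hits = [pair for pair in scored if pair[1] >= threshold]
        hits.sort(key=lambda pair: pair[1], reverse=True)
        return hits[:limit]


index = InMemoryEmbeddingIndex()
index.add("E1", (1.0, 0.0))
index.add("E2", (0.0, 1.0))
hits = index.find_by_embedding((0.99, 0.1))
```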
RelationshipExtractorInterface
Extracts relationships between entities. This is Pass 2 of the pipeline—identifying the edges that connect entity nodes in the knowledge graph.
Relationship extraction operates on resolved entities, not raw mentions, which enables:
- Consistent entity references across relationships
- Cross-document relationship aggregation
- Confidence scoring based on entity resolution quality
The predicate vocabulary is typically domain-specific (e.g., 'treats', 'causes' for medical; 'cites', 'amends' for legal).
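```python
from __future__ import annotations

from abc import ABC
from collections.abc import Sequence

from kgraph.document import BaseDocument

# Reference signature; the real class lives in kgraph.pipeline.
class RelationshipExtractorInterface(ABC):
    async def extract(
        self,
        document: BaseDocument,
        entities: Sequence[BaseEntity],
    ) -> list[BaseRelationship]: ...
```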
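A domain may want more than a flat list of predicate names. It may also want to constrain which entity types each predicate can connect. The sketch below assumes such subject/object type constraints, using the medical predicates named above. The PREDICATES table and the is_valid helper are hypothetical.

```python
# Hypothetical vocabulary: predicate -> (allowed subject types, allowed object types)
PREDICATES: dict[str, tuple[set[str], set[str]]] = {
    "treats": ({"drug"}, {"disease"}),
    "causes": ({"drug"}, {"disease", "symptom"}),
}


def is_valid(subject_type: str, predicate: str, object_type: str) -> bool:
    """Accept a triple only if the predicate is known and both endpoint types fit."""
    if predicate not in PREDICATES:
        return False
    subject_types, object_types = PREDICATES[predicate]
    return subject_type in subject_types and object_type in object_types
```

A check like this can run on every extracted relationship, rejecting reversed or out-of-vocabulary triples before they are stored.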
Example LLM-based extractor:
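```python
from __future__ import annotations
from collections.abc import Sequence

from kgraph.document import BaseDocument
from kgraph.pipeline import RelationshipExtractorInterface


class LLMRelationshipExtractor(RelationshipExtractorInterface):
    def __init__(self, llm_client, relationship_factory, valid_predicates: list[str]):
        self._llm = llm_client  # assumed: complete(prompt) returns the reply as text
        self._factory = relationship_factory
        self._predicates = valid_predicates

    async def extract(
        self,
        document: BaseDocument,
        entities: Sequence[BaseEntity],
    ) -> list[BaseRelationship]:
        # Build entity context for the LLM
        entity_list = "\n".join(
            f"- {e.name} ({e.get_entity_type()}): {e.entity_id}" for e in entities
        )
        # Only the first 2,000 characters are sent; longer documents need chunking.
        prompt = (
            f"Given these entities:\n{entity_list}\n\n"
            f"And this document:\n{document.content[:2000]}\n\n"
            f"Extract relationships using these predicates: {', '.join(self._predicates)}\n"
            "Format: subject_id | predicate | object_id | confidence"
        )
        response = await self._llm.complete(prompt)
        known_ids = {e.entity_id for e in entities}
        return self._parse_relationships(response, document.document_id, known_ids)

    def _parse_relationships(self, response: str, document_id: str, known_ids: set[str]):
        relationships = []
        for line in response.splitlines():
            parts = [p.strip() for p in line.split("|")]
            if len(parts) != 4:
                continue  # skip prose, headers, and malformed lines
            subject_id, predicate, object_id, confidence = parts
            # Drop unknown predicates and entity IDs the LLM may have invented
            if predicate not in self._predicates or not {subject_id, object_id} <= known_ids:
                continue
            try:
                score = float(confidence)
            except ValueError:
                continue
            relationships.append(self._factory.create(  # hypothetical factory method
                subject_id=subject_id, predicate=predicate, object_id=object_id,
                confidence=score, document_id=document_id))
        return relationships
```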
EmbeddingGeneratorInterface
Generates semantic vector embeddings for entity similarity. Embeddings enable:
- Entity resolution: Finding existing entities with similar meaning but different surface forms
- Duplicate detection: Identifying canonical entities that should be merged
- Semantic search: Querying the knowledge graph by meaning rather than exact text
The dimension property is critical for storage backends that need to pre-allocate vector columns or configure similarity indices.
```python
from abc import ABC
from collections.abc import Sequence

# Reference signature; the real class lives in kgraph.pipeline.embedding.
class EmbeddingGeneratorInterface(ABC):
    @property
    def dimension(self) -> int: ...

    async def generate(self, text: str) -> tuple[float, ...]: ...

    async def generate_batch(
        self, texts: Sequence[str]
    ) -> list[tuple[float, ...]]: ...
```
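Duplicate detection, the second use in the list above, can be sketched as a pairwise cosine comparison. The sketch also checks every vector against the generator's declared dimension. merge_candidates and its 0.95 threshold are assumptions for illustration. A storage backend with a vector index would avoid this quadratic loop.

```python
import math
from itertools import combinations


def cosine(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b) if norm_a and norm_b else 0.0


def merge_candidates(
    embeddings: dict[str, tuple[float, ...]], dimension: int, threshold: float = 0.95
) -> list[tuple[str, str, float]]:
    """Return (id_a, id_b, similarity) for canonical entities that may be duplicates."""
    # Reject vectors that don't match the generator's declared dimension.
    for entity_id, vector in embeddings.items():
        if len(vector) != dimension:
            raise ValueError(f"{entity_id}: expected {dimension} dims, got {len(vector)}")
    pairs = []
    # Compare every unordered pair once.
    for (id_a, a), (id_b, b) in combinations(sorted(embeddings.items()), 2):
        score = cosine(a, b)
        if score >= threshold:
            pairs.append((id_a, id_b, score))
    return sorted(pairs, key=lambda p: p[2], reverse=True)
```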
Example OpenAI implementation:
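```python
from collections.abc import Sequence

from openai import AsyncOpenAI

from kgraph.pipeline.embedding import EmbeddingGeneratorInterface

# Default output sizes of OpenAI embedding models.
_MODEL_DIMENSIONS = {
    "text-embedding-3-small": 1536,
    "text-embedding-3-large": 3072,
    "text-embedding-ada-002": 1536,
}


class OpenAIEmbedding(EmbeddingGeneratorInterface):
    def __init__(self, client: AsyncOpenAI, model: str = "text-embedding-3-small"):
        self._client = client  # must be the async client: the calls below are awaited
        self._model = model
        self._dim = _MODEL_DIMENSIONS[model]  # model-dependent; KeyError for unlisted models

    @property
    def dimension(self) -> int:
        return self._dim

    async def generate(self, text: str) -> tuple[float, ...]:
        response = await self._client.embeddings.create(
            model=self._model,
            input=text,
        )
        return tuple(response.data[0].embedding)

    async def generate_batch(
        self, texts: Sequence[str]
    ) -> list[tuple[float, ...]]:
        # One request for the whole batch; order results by their index field.
        response = await self._client.embeddings.create(
            model=self._model,
            input=list(texts),
        )
        return [tuple(d.embedding) for d in sorted(response.data, key=lambda d: d.index)]
```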
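For the easy testing with mock implementations mentioned earlier, a deterministic stand-in avoids network calls. FakeEmbedding is a hypothetical test double with the same dimension/generate/generate_batch shape. In real code it would subclass EmbeddingGeneratorInterface. Its vectors come from a SHA-256 hash, so they are stable across runs but carry no semantic meaning. Use it only to exercise pipeline wiring, not resolution quality.

```python
import asyncio
import hashlib
import math
from collections.abc import Sequence


class FakeEmbedding:
    """Hypothetical deterministic test double; vectors are hash-derived, not semantic."""

    def __init__(self, dimension: int = 8) -> None:
        self._dim = dimension

    @property
    def dimension(self) -> int:
        return self._dim

    async def generate(self, text: str) -> tuple[float, ...]:
        digest = hashlib.sha256(text.encode("utf-8")).digest()  # 32 bytes
        # Centre each byte around zero (never exactly zero), cycling the digest if needed.
        raw = [digest[i % len(digest)] - 127.5 for i in range(self._dim)]
        norm = math.sqrt(sum(x * x for x in raw))
        return tuple(x / norm for x in raw)  # unit length, so cosine == dot product

    async def generate_batch(self, texts: Sequence[str]) -> list[tuple[float, ...]]:
        return [await self.generate(t) for t in texts]
```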
Assembling the Pipeline
The IngestionOrchestrator connects all pipeline components and manages the two-pass ingestion workflow. It handles:
- Document parsing and storage
- Pass 1: Entity extraction, resolution, and storage
- Pass 2: Relationship extraction and storage
- Entity promotion (provisional → canonical based on usage thresholds)
Connect components via IngestionOrchestrator:
```python
import asyncio

from kgraph import IngestionOrchestrator


async def main() -> None:
    # my_domain, my_parser, etc. are your domain's implementations of the
    # interfaces above; the *_storage objects are your storage backends.
    orchestrator = IngestionOrchestrator(
        domain=my_domain,
        parser=my_parser,
        entity_extractor=my_entity_extractor,
        entity_resolver=my_resolver,
        relationship_extractor=my_rel_extractor,
        embedding_generator=my_embedder,
        entity_storage=entity_storage,
        relationship_storage=relationship_storage,
        document_storage=document_storage,
    )

    # Ingest a single document
    result = await orchestrator.ingest_document(
        raw_content=document_bytes,
        content_type="text/plain",
        source_uri="https://example.com/doc.txt",
    )

    # Batch ingestion: each item is (raw_content, content_type, source_uri)
    results = await orchestrator.ingest_batch([
        (doc1_bytes, "text/plain", "source1"),
        (doc2_bytes, "application/pdf", "source2"),
    ])

    # Run promotion after ingestion
    promoted = await orchestrator.run_promotion()


asyncio.run(main())
```
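This page does not specify the promotion criteria beyond "usage thresholds". The sketch below assumes promotion requires a minimum number of supporting documents and a minimum confidence. Both thresholds are hypothetical, and ProvisionalEntity is a stand-in rather than the framework's class.

```python
from dataclasses import dataclass


@dataclass
class ProvisionalEntity:
    """Hypothetical stand-in; the real provisional entity class will differ."""
    entity_id: str
    usage_count: int  # e.g. number of documents that mention the entity
    confidence: float


def select_for_promotion(
    entities: list[ProvisionalEntity], min_usage: int = 3, min_confidence: float = 0.8
) -> list[ProvisionalEntity]:
    # Both thresholds are illustrative defaults, not kgraph's values.
    return [e for e in entities if e.usage_count >= min_usage and e.confidence >= min_confidence]
```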