Part III: Building a Backend¶
Chapter 8: The GraphDbInterface ABC¶
There is a tension in interface design between expressiveness and simplicity. An expressive interface gives the caller more power -- more ways to phrase a request, more control over execution, more options to tune. A simple interface gives the implementor less to build and the caller fewer things to get wrong. The usual engineering response is to find a balance, to offer enough expressiveness without crossing into complexity.
BFS-QL resolves this tension by separating it into two layers. The
GraphDbInterface abstract base class is the implementor-facing interface.
It is deliberately primitive -- eight methods, all basic graph navigation,
no traversal intelligence. The BFS-QL server layer is the caller-facing
interface, where all the expressiveness lives: multi-seed BFS, stub/full
filtering, topology mode, caching, the full six-tool surface. These are
not in tension because they are in different places. The ABC is simple so
that backends are easy to write. The server is expressive because it has to
be. Neither layer compromises the other.
Eight Methods¶
The complete interface:
class GraphDbInterface(ABC):
@abstractmethod
async def search_entities(
self, query: str, node_types: list[str] | None = None
) -> list[EntityStub]:
"""Resolve a name or alias to candidate stubs."""
@abstractmethod
async def edges_from(self, entity_id: str) -> list[Edge]:
"""Return all outgoing edges from the given entity."""
@abstractmethod
async def edges_to(self, entity_id: str) -> list[Edge]:
"""Return all incoming edges to the given entity."""
@abstractmethod
async def get_node(self, entity_id: str) -> Node:
"""Return the node record for the given entity ID."""
@abstractmethod
async def metadata_for_node(
self, entity_id: str
) -> dict[str, Any]:
"""Return all available metadata for the entity."""
@abstractmethod
async def metadata_for_edge(
self, edge: Edge
) -> dict[str, Any]:
"""Return full metadata, including provenance."""
@abstractmethod
async def entity_types(self) -> list[str]:
"""Return valid entity type names in this graph."""
@abstractmethod
async def predicates(self) -> list[str]:
"""Return valid predicate names in this graph."""
Three pairs and two singletons. edges_from / edges_to are the traversal
primitives -- directed graph navigation in both directions. get_node /
metadata_for_node separate identity from detail: the first returns just
the entity ID and type, the second returns everything else. metadata_for_edge
pairs with the traversal primitives to provide full provenance. Then the two
singletons: search_entities maps natural-language names to canonical IDs,
and entity_types / predicates expose the graph's own vocabulary.
The separation of get_node from metadata_for_node is deliberate. During
BFS expansion, the engine calls both concurrently for nodes that need full
records, but calls only get_node for nodes that will become stubs. A backend
that fetches metadata lazily -- or from a separate service -- can implement
both cheaply without conflating the two concerns.
What the Interface Does Not Contain¶
There is no bfs_query method. There is no count_neighbors method. There
is no find_shortest_path method. There is no filter parameter, no hop limit,
no traversal mode.
All of that is in the server layer. The bfs_query function in engine.py
implements multi-seed BFS entirely in terms of edges_from, edges_to,
get_node, and metadata_for_node. The stub/full decision is made at the
server layer. The topology-only mode is a serialization choice at the
server layer. The caching layer wraps the backend transparently.
The consequence is that a backend implementor answers only one question: how do I perform these eight operations against this particular graph store? Not: how do I implement BFS? Not: how do I cache edge lists efficiently? Not: how do I serialize results? Those questions are already answered, once, in the server layer, and the answers apply to every backend automatically.
The Caching Layer¶
CachedGraphDb wraps any backend in a dict-keyed cache at the primitive
level. Every call to edges_from(id) checks a dict before hitting the backend.
Every metadata_for_node(id) is cached after the first fetch. entity_types
and predicates are cached indefinitely -- they are stable for the lifetime
of a session.
class CachedGraphDb(GraphDbInterface):
def __init__(
self, backend: GraphDbInterface, maxsize: int = 1024
) -> None:
self._backend = backend
self._edges_from_cache: dict[str, list[Edge]] = {}
self._node_meta_cache: dict[str, dict[str, Any]] = {}
# ... per-method dicts for all eight methods
async def edges_from(self, entity_id: str) -> list[Edge]:
if entity_id not in self._edges_from_cache:
res = await self._backend.edges_from(entity_id)
self._edges_from_cache[entity_id] = res
return self._edges_from_cache[entity_id]
The critical property is that caching operates at the level where it pays. BFS traversal at depth 2 may visit the same node from multiple directions. Without caching, each visit triggers a backend round-trip. With primitive-level caching, the second visit is a dict lookup. The backend sees each distinct call at most once per session. A multi-hop traversal over a well-connected graph can reduce backend round-trips by an order of magnitude.
Because the cache is transparent -- CachedGraphDb implements
GraphDbInterface -- backends do not implement caching themselves. They
return fresh data on every call. The server layer decides caching policy.
Backends stay simple.
All Methods Are Async¶
Every method in GraphDbInterface is async. This is not a formality.
BFS expansion calls edges_from and edges_to for every node in the
current frontier concurrently, via asyncio.gather. A 2-hop BFS over
a frontier of 40 nodes issues 80 concurrent edge queries. Against a
Postgres backend, these resolve as concurrent connection pool requests.
Against a SPARQL endpoint, they resolve as concurrent HTTP requests.
Against a Neo4j backend, they resolve as concurrent Bolt protocol calls.
A synchronous interface would serialize this work unnecessarily. The async
design is a performance contract: backends that can serve concurrent queries
concurrently will. Backends that cannot (in-memory dicts, file-backed stores)
pay no penalty -- async def with no await inside is just a regular
function in async clothing.
Chapter 9: The Postgres Backend¶
The Postgres backend is the primary BFS-QL backend, not because Postgres is
the only viable graph store -- it is not -- but because it is the natural
target for graphs built with the companion volume's extraction pipeline.
Kgraph writes entities, relationships, and embeddings into Postgres. BFS-QL
reads them through PostgresBackend. The two are designed as a pair; this
chapter covers the reading side.
The Schema¶
PostgresBackend expects the kgraph schema:
entitytable:entity_id(canonical ID),entity_type,name,embedding(pgvector float array),properties(JSON),status,confidence,synonyms(JSON),source,canonical_url.relationshiptable:subject_id,predicate,object_id,confidence,source_documents(JSON),properties(JSON).bundle_evidencetable:relationship_key(string FK of the formsubject_id:predicate:object_id),text_span,confidence,document_id,section.
Entities with status = 'merged' are excluded from all queries. The
kgraph pipeline resolves duplicate entities by merging them into a canonical
representative; merged entities are kept in the table for audit purposes
but are not exposed through BFS-QL.
Connection Pool and Initialization¶
PostgresBackend uses asyncpg for async I/O and manages a connection pool:
@classmethod
async def create(
cls, dsn: str | None = None, ...
) -> "PostgresBackend":
dsn = dsn or os.environ["DATABASE_URL"]
async def _init_conn(conn):
await conn.execute(
"CREATE EXTENSION IF NOT EXISTS vector"
)
await conn.set_type_codec(
"jsonb",
encoder=json.dumps,
decoder=json.loads,
schema="pg_catalog",
)
pool = await asyncpg.create_pool(
dsn, min_size=2, max_size=10, init=_init_conn
)
return cls(pool, embedding_fn)
The _init_conn callback runs on every new connection in the pool. It
installs the pgvector extension if not present and registers a type codec
that automatically decodes JSONB columns to Python dicts. The vector
type for embeddings is handled separately by casting to text in queries.
PostgresBackend.create is an async classmethod, not __init__. This is
a Python idiom for async initialization: __init__ cannot be async, so
the factory pattern is the clean solution. The create_server() function
accepts PostgresBackend.create as a factory callable -- the server's
lifespan handler calls it inside the running event loop, ensuring the pool
is created in the same loop that will use it.
Entity Search¶
search_entities has two implementations selected at pool creation time:
Vector search (when an embedding_fn is provided):
async def _search_by_vector(
self, query: str, limit: int = 10
) -> list[EntityStub]:
embedding = await self._embedding_fn(query)
embedding_str = (
"[" + ",".join(str(x) for x in embedding) + "]"
)
rows = await conn.fetch(
"""
SELECT entity_id, entity_type
FROM entity
WHERE embedding IS NOT NULL
AND (status IS NULL OR status != 'merged')
ORDER BY embedding::vector <=> $1::vector
LIMIT $2
""",
embedding_str, limit,
)
The <=> operator is pgvector's cosine distance operator. The query embeds
the search string and finds the nearest entity embeddings. Embedding model
consistency — using the same model at ingest time and query time — is
critical; mismatched models produce meaningless distances. This is guaranteed
automatically when the identity server owns all embeddings: it uses the same
model for both, and the query layer never needs to know which one.
Name search (fallback when no embedding_fn is provided):
async def _search_by_name(
self, query: str, limit: int = 10
) -> list[EntityStub]:
pattern = f"%{query}%"
rows = await conn.fetch(
"SELECT entity_id, entity_type FROM entity "
"WHERE name ILIKE $1 "
"AND (status IS NULL OR status != 'merged') "
"ORDER BY name LIMIT $2",
pattern, limit,
)
ILIKE is case-insensitive substring matching. It is less precise than vector search -- a query for "Cushing" matches "Cushing disease," "Cushing syndrome," and any entity whose name contains the substring. For the medlit demo graph, where entity names are short and specific, this is acceptable. For large general-purpose graphs, vector search is preferred.
Edge Traversal¶
The traversal methods are straightforward:
async def edges_from(self, entity_id: str) -> list[Edge]:
rows = await conn.fetch(
"SELECT subject_id, predicate, object_id "
"FROM relationship "
"WHERE subject_id = $1",
entity_id,
)
return [
Edge(
subject=r["subject_id"],
predicate=r["predicate"],
object=r["object_id"]
) for r in rows
]
edges_to is identical with WHERE object_id = $1. Both are called
concurrently for every node in the BFS frontier. The connection pool
handles concurrent acquisition; asyncpg's pool is safe for concurrent use.
Metadata and Evidence¶
metadata_for_edge is the most complex method in the backend. An edge
record in the relationship table carries confidence, source_documents,
and properties. Evidence provenance is in bundle_evidence, keyed by
the string subject_id:predicate:object_id:
async def metadata_for_edge(
self, edge: Edge
) -> dict[str, Any]:
rel_key = f"{edge.subject}:{edge.predicate}:{edge.object}"
evidence_rows = await _fetch_evidence(
conn, rel_row["id"], rel_key
)
...
The _fetch_evidence helper tries two schemas: the test schema (an evidence
table with a UUID foreign key) and the kgserver schema (bundle_evidence
with a string relationship key). This dual-schema support exists because the
integration tests build a minimal schema from scratch while the production
demo data uses the full kgserver schema. Both schemas are handled
transparently.
The resulting metadata dict for a full edge includes confidence, source
document IDs, any relationship properties, and a provenance list with
text spans, confidence scores, and document references. This provenance
is stripped by _slim_result in the server layer before returning BFS
results -- it is available only through describe_entity on a specific
node, which fetches the edge metadata directly.
Chapter 10: The SPARQL Backend¶
The Postgres backend covers kgraph-derived graphs -- graphs that were built by the extraction pipeline and live in a database the developer controls. But there are thousands of knowledge graphs that predate BFS-QL, that were built by other teams for other purposes, and that expose themselves through SPARQL 1.1 endpoints. DBpedia, Wikidata, UniProt, ChEMBL, the Gene Ontology, the NCI Thesaurus -- these are public graphs with public endpoints, accumulated over decades, containing knowledge that no extraction pipeline will soon replicate. A SPARQL backend makes all of them accessible through the same six-tool interface.
The SPARQL Endpoint Model¶
A SPARQL 1.1 endpoint accepts HTTP POST requests with a query parameter
containing a SPARQL query string and returns results as JSON, XML, or CSV.
The endpoint URL is the only configuration. The backend sends queries over
HTTP and parses the JSON binding response format.
The edges_from and edges_to methods translate directly to SPARQL
property path queries:
-- edges_from(entity_id)
SELECT ?predicate ?object WHERE {
<{entity_id}> ?predicate ?object .
FILTER(!isBlank(?object))
}
LIMIT 500
-- edges_to(entity_id)
SELECT ?subject ?predicate WHERE {
?subject ?predicate <{entity_id}> .
FILTER(!isBlank(?subject))
}
LIMIT 500
The FILTER(!isBlank(?object)) clause excludes blank nodes -- anonymous
intermediate nodes that appear in RDF data but have no canonical ID and
cannot be meaningfully referenced in BFS-QL. Blank nodes are a modeling
convenience in RDF; they are a navigational dead end for graph traversal.
URI Normalization¶
SPARQL endpoints represent entities as URIs:
BFS-QL canonical IDs are strings. The SPARQL backend must map between them.
The mapping strategy is endpoint-specific. For DBpedia, the URI prefix
http://dbpedia.org/resource/ maps to the prefix DBpedia:. For Wikidata,
http://www.wikidata.org/entity/ maps to Wikidata:. The backend's
initialization takes a prefix map:
backend = SparqlBackend(
endpoint="https://dbpedia.org/sparql",
prefixes={
"DBpedia": "http://dbpedia.org/resource/",
"DBpedia-owl": "http://dbpedia.org/ontology/",
}
)
Outgoing URIs are expanded to full form before insertion into SPARQL queries. Incoming URIs are compressed using the prefix map. URIs that match no known prefix are included as-is -- they are valid canonical IDs, just opaque to the user.
Schema Discovery¶
entity_types and predicates are the BFS-QL methods that return the
graph's vocabulary. For SPARQL endpoints, these translate to SELECT DISTINCT
queries:
-- entity_types()
SELECT DISTINCT ?type WHERE {
?s a ?type .
}
ORDER BY ?type
LIMIT 200
-- predicates()
SELECT DISTINCT ?pred WHERE {
?s ?pred ?o .
FILTER(?pred != rdf:type)
}
ORDER BY ?pred
LIMIT 500
These queries can be slow on large endpoints -- Wikidata has hundreds of
millions of triples and SELECT DISTINCT over all predicates is not
instantaneous. The CachedGraphDb wrapper handles this: both methods are
cached indefinitely after the first call. The server's lifespan handler
calls them once at startup and caches the results in _state.
For endpoints where SELECT DISTINCT is prohibitively slow, an alternative
is to probe from a known seed: start from a well-connected entity and collect
the entity types and predicates that appear in its neighborhood. This
produces a partial schema -- sufficient for the BFS-QL server to inject
into tool descriptions -- without scanning the entire graph.
search_entities Against SPARQL¶
SPARQL endpoints vary in their full-text search support. Virtuoso (which
backs DBpedia) supports bif:contains for full-text matching. GraphDB
supports Lucene-backed text search. Many endpoints support no full-text
search at all.
The most portable approach is rdfs:label matching with FILTER(CONTAINS(...)):
SELECT ?entity ?type WHERE {
?entity rdfs:label ?label ;
a ?type .
FILTER(CONTAINS(LCASE(?label), LCASE("{query}")))
}
LIMIT 20
This is not fast on large graphs, but it works everywhere and avoids endpoint-specific extensions. For production use against a specific endpoint, the backend should be configured with that endpoint's preferred search mechanism.
Handling Endpoint Variance¶
SPARQL 1.1 is a standard, but implementations differ. Virtuoso requires
DEFINE sql:describe-mode "CBD" for some queries. GraphDB has different
timeout behavior. Stardog enforces stricter blank node handling. Amazon
Neptune does not support all property path expressions.
The SPARQL backend handles this through a small set of configuration knobs:
query timeout (in seconds), result set size limit (LIMIT clause), and a
flag for whether SELECT DISTINCT over the full graph is safe to issue.
These are set at initialization and applied to all generated queries.
The abstraction boundary is clean: an LLM querying a BFS-QL server backed by Virtuoso, GraphDB, or Neptune sees identical behavior. The endpoint variance is confined entirely to the backend implementation.
Chapter 11: The Neo4j Backend¶
Neo4j is a property graph database, not an RDF store. The distinction matters for the implementation, though not for the BFS-QL interface. Where RDF graphs represent everything as triples of URIs and literals, property graphs attach key-value pairs directly to nodes and relationships. A node in Neo4j has a label (or multiple labels) and a set of properties. A relationship has a type and a set of properties. There are no blank nodes; everything is either a node or a named relationship.
For BFS-QL, the mapping from the property graph model to the
GraphDbInterface is direct:
- Node labels →
entity_type - Relationship types → predicates
- Node identity (Neo4j's internal ID or a canonical ID property) → entity ID
- Node properties →
metadata_for_node - Relationship properties →
metadata_for_edge
The implementation requires one configuration decision: which node property
holds the canonical ID. In a kgraph-derived Neo4j graph, this would be
entity_id. In a general Neo4j graph, it might be id, uri, name,
or something domain-specific. The backend is initialized with the canonical
ID property name.
Cypher Traversal¶
edges_from and edges_to are natural Cypher traversals:
-- edges_from(entity_id)
MATCH (n {entity_id: $id})-[r]->(m)
RETURN n.entity_id AS subject,
type(r) AS predicate,
m.entity_id AS object
-- edges_to(entity_id)
MATCH (n)-[r]->(m {entity_id: $id})
RETURN n.entity_id AS subject,
type(r) AS predicate,
m.entity_id AS object
type(r) returns the relationship type as a string, which becomes the
predicate. Neo4j relationship types are uppercase by convention (TREATS,
INHIBITS, ASSOCIATED_WITH); BFS-QL predicates are lowercase by
convention. The backend normalizes to lowercase at query time.
Full-Text Search¶
search_entities in Neo4j requires a full-text index. Unlike Postgres
(which can fall back to ILIKE) or a SPARQL endpoint (which can use
CONTAINS on labels), Neo4j has no built-in substring search on node
properties. A full-text index must be created at graph construction time:
With the index in place, search_entities becomes:
CALL db.index.fulltext.queryNodes("entity_names", $query)
YIELD node, score
RETURN node.entity_id AS id, labels(node)[0] AS entity_type
ORDER BY score DESC
LIMIT 10
The index requirement is a constraint on graph construction, not on
BFS-QL. A Neo4j graph served through BFS-QL must have the index; graphs
without it cannot support search_entities. The backend checks for index
existence at initialization and raises a clear error if it is missing,
rather than failing silently at query time.
entity_types and predicates¶
-- entity_types()
CALL db.labels() YIELD label RETURN label ORDER BY label
-- predicates()
CALL db.relationshipTypes() YIELD relationshipType
RETURN relationshipType ORDER BY relationshipType
Neo4j's db.labels() and db.relationshipTypes() procedures return the
complete label and relationship type vocabularies without scanning the
graph. They are fast, stable, and the natural implementation of
entity_types and predicates. No SELECT DISTINCT required.
Chapter 12: Writing Your Own Backend¶
The eight-method contract is a complete specification. If you can answer each of the eight questions for a given graph store, you can write a BFS-QL backend for it, and everything above that layer -- traversal, filtering, caching, the six-tool MCP interface -- comes for free.
What "Correct" Means for Each Method¶
search_entities(query): Return a ranked list of EntityStub records
whose names or aliases match the query string. "Ranked" means most-likely
matches first. "Match" is implementation-defined: substring, vector
similarity, full-text score, or exact match are all valid. Return at most
10-20 candidates. Do not filter by entity type here -- that is the caller's
job. Return an empty list, not an error, if nothing matches.
edges_from(entity_id) / edges_to(entity_id): Return all outgoing
or incoming edges for the entity. "All" means all -- do not apply relevance
filters. BFS traversal needs complete topology; filtering happens at the
server layer. Return Edge records with canonical IDs for subject and
object; do not return metadata here. Raise KeyError if the entity does
not exist; return [] if it exists but has no edges.
get_node(entity_id): Return a Node record with the entity's ID
and type. This is the identity call, not the metadata call. It is fast.
Raise KeyError if the entity does not exist or is inaccessible.
metadata_for_node(entity_id): Return a dict of all available
metadata for the entity. Keys and types are backend-defined. Include
everything: names, synonyms, descriptions, external links, confidence
scores. The server passes this dict to the LLM as-is; the LLM decides
what is relevant. Do not omit fields to save space -- the topology mode
handles that at the server layer.
metadata_for_edge(edge): Return a dict of all available edge metadata.
Include provenance: text spans, source documents, extraction confidence,
creation timestamps. The server strips verbose provenance fields from BFS
results (returning them only through describe_entity) but the backend
should return everything and let the server decide what to expose.
entity_types() / predicates(): Return complete, stable lists. The
server caches these indefinitely; they must not change during a session.
Return them in a consistent order (alphabetical is conventional). Return
an empty list if the graph has no schema (though this makes describe_schema
useless and should be avoided).
A Worked Example: JSON-LD REST API Backend¶
To make the contract concrete, consider a JSON-LD REST API as a backend.
The API exposes entities at /entities/{id} and their relationships at
/entities/{id}/relations. A schema endpoint at /schema returns the
vocabulary.
class JsonLdBackend(GraphDbInterface):
def __init__(self, base_url: str) -> None:
self._base = base_url.rstrip("/")
self._session: aiohttp.ClientSession | None = None
async def _get(self, path: str) -> dict:
url = f"{self._base}{path}"
async with self._session.get(url) as resp:
return await resp.json()
async def get_node(self, entity_id: str) -> Node:
data = await self._get(f"/entities/{entity_id}")
return Node(id=data["id"], entity_type=data["@type"])
async def metadata_for_node(
self, entity_id: str
) -> dict[str, Any]:
data = await self._get(f"/entities/{entity_id}")
return {
k: v for k, v in data.items()
if k not in ("id", "@type", "@context")
}
async def edges_from(self, entity_id: str) -> list[Edge]:
path = f"/entities/{entity_id}/relations?direction=out"
data = await self._get(path)
return [
Edge(
subject=r["subject"],
predicate=r["predicate"],
object=r["object"]
) for r in data["relations"]
]
async def edges_to(self, entity_id: str) -> list[Edge]:
path = f"/entities/{entity_id}/relations?direction=in"
data = await self._get(path)
return [
Edge(
subject=r["subject"],
predicate=r["predicate"],
object=r["object"]
) for r in data["relations"]
]
async def search_entities(
self, query: str, node_types: list[str] | None = None
) -> list[EntityStub]:
params = f"?q={query}&limit=10"
if node_types:
params += "&types=" + ",".join(node_types)
data = await self._get(f"/entities{params}")
return [EntityStub(id=e["id"], entity_type=e["@type"])
for e in data["results"]]
async def metadata_for_edge(
self, edge: Edge
) -> dict[str, Any]:
path = (
f"/relations?subject={edge.subject}"
f"&predicate={edge.predicate}"
f"&object={edge.object}"
)
data = await self._get(path)
return data.get("metadata", {})
async def entity_types(self) -> list[str]:
data = await self._get("/schema")
return sorted(data["entity_types"])
async def predicates(self) -> list[str]:
data = await self._get("/schema")
return sorted(data["predicates"])
This is approximately 60 lines. It is incomplete -- there is no session
management, no error handling for missing entities, no JSON-LD context
resolution. But it illustrates the contract. Once these eight methods work
correctly, the backend can be passed to create_server() and immediately
served through the full BFS-QL interface: six tools, stub/full filtering,
multi-seed BFS, topology mode, LRU caching. None of that is in the backend.
All of it comes for free.
The Bar Is Low; the Payoff Is Immediate¶
The eight-method interface is deliberately small. Its purpose is not to constrain what backends can do -- they can expose arbitrary metadata, use any storage technology, call any external service. Its purpose is to define the minimum surface that the BFS-QL server needs to function.
A backend that correctly implements all eight methods gets, automatically:
- BFS traversal to any depth with concurrency across the frontier
- Stub/full node and edge filtering based on the caller's
node_typesandpredicatesparameters - Topology mode: pure structural skeleton with no metadata
- Multi-seed union: BFS from multiple seeds simultaneously
- LRU caching at the primitive level: no repeated round-trips for the same entity or edge within a session
- The full six-tool MCP interface (all six BFS-QL tools)
- Schema injection: valid
node_typesandpredicatesinjected into thebfs_querytool description when the schema is small enough
The cost is eight method implementations. The payoff is a fully functional LLM graph interface against any data store you can navigate.