Part III: Building a Backend

Chapter 8: The GraphDbInterface ABC

There is a tension in interface design between expressiveness and simplicity. An expressive interface gives the caller more power -- more ways to phrase a request, more control over execution, more options to tune. A simple interface gives the implementor less to build and the caller fewer things to get wrong. The usual engineering response is to find a balance, to offer enough expressiveness without crossing into complexity.

BFS-QL resolves this tension by separating it into two layers. The GraphDbInterface abstract base class is the implementor-facing interface. It is deliberately primitive -- eight methods, all basic graph navigation, no traversal intelligence. The BFS-QL server layer is the caller-facing interface, where all the expressiveness lives: multi-seed BFS, stub/full filtering, topology mode, caching, the full six-tool surface. These are not in tension because they are in different places. The ABC is simple so that backends are easy to write. The server is expressive because it has to be. Neither layer compromises the other.

Eight Methods

The complete interface:

class GraphDbInterface(ABC):

    @abstractmethod
    async def search_entities(
        self, query: str, node_types: list[str] | None = None
    ) -> list[EntityStub]:
        """Resolve a name or alias to candidate stubs."""

    @abstractmethod
    async def edges_from(self, entity_id: str) -> list[Edge]:
        """Return all outgoing edges from the given entity."""

    @abstractmethod
    async def edges_to(self, entity_id: str) -> list[Edge]:
        """Return all incoming edges to the given entity."""

    @abstractmethod
    async def get_node(self, entity_id: str) -> Node:
        """Return the node record for the given entity ID."""

    @abstractmethod
    async def metadata_for_node(
        self, entity_id: str
    ) -> dict[str, Any]:
        """Return all available metadata for the entity."""

    @abstractmethod
    async def metadata_for_edge(
        self, edge: Edge
    ) -> dict[str, Any]:
        """Return full metadata, including provenance."""

    @abstractmethod
    async def entity_types(self) -> list[str]:
        """Return valid entity type names in this graph."""

    @abstractmethod
    async def predicates(self) -> list[str]:
        """Return valid predicate names in this graph."""

Three pairs and two singletons. edges_from / edges_to are the traversal primitives -- directed graph navigation in both directions. get_node / metadata_for_node separate identity from detail: the first returns just the entity ID and type, the second returns everything else. entity_types / predicates expose the graph's own vocabulary. Then the two singletons: search_entities maps natural-language names to canonical IDs, and metadata_for_edge complements the traversal primitives by supplying full provenance for any edge they return.

The separation of get_node from metadata_for_node is deliberate. During BFS expansion, the engine calls both concurrently for nodes that need full records, but calls only get_node for nodes that will become stubs. A backend that fetches metadata lazily -- or from a separate service -- can implement both cheaply without conflating the two concerns.

What the Interface Does Not Contain

There is no bfs_query method. There is no count_neighbors method. There is no find_shortest_path method. There is no filter parameter, no hop limit, no traversal mode.

All of that is in the server layer. The bfs_query function in engine.py implements multi-seed BFS entirely in terms of edges_from, edges_to, get_node, and metadata_for_node. The stub/full decision is made at the server layer. The topology-only mode is a serialization choice at the server layer. The caching layer wraps the backend transparently.
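To make the layering concrete, here is a minimal sketch of a multi-seed breadth-first expansion written purely against edges_from and edges_to. It is not the engine.py implementation: the Edge dataclass, DictBackend, and the bfs function are hypothetical stand-ins, and the real engine also calls get_node and metadata_for_node, which are omitted here.

```python
import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class Edge:
    # Stand-in for the Edge record (shape assumed for illustration).
    subject: str
    predicate: str
    object: str


class DictBackend:
    """Hypothetical in-memory backend exposing only the traversal primitives."""

    def __init__(self, edges: list[Edge]) -> None:
        self._edges = edges

    async def edges_from(self, entity_id: str) -> list[Edge]:
        return [e for e in self._edges if e.subject == entity_id]

    async def edges_to(self, entity_id: str) -> list[Edge]:
        return [e for e in self._edges if e.object == entity_id]


async def bfs(
    backend, seeds: list[str], max_hops: int
) -> tuple[set[str], set[Edge]]:
    """Hop limits and seed handling live here, not in the backend."""
    visited: set[str] = set(seeds)
    seen_edges: set[Edge] = set()
    frontier: list[str] = sorted(visited)
    for _ in range(max_hops):
        # Query both directions for every frontier node concurrently.
        calls = [backend.edges_from(n) for n in frontier]
        calls += [backend.edges_to(n) for n in frontier]
        results = await asyncio.gather(*calls)
        next_frontier: set[str] = set()
        for edge_list in results:
            for edge in edge_list:
                seen_edges.add(edge)
                for node in (edge.subject, edge.object):
                    if node not in visited:
                        visited.add(node)
                        next_frontier.add(node)
        if not next_frontier:
            break
        frontier = sorted(next_frontier)
    return visited, seen_edges
```

Nothing in bfs knows how the edges are stored, so the same function would run unchanged against any object that provides the two coroutines.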

The consequence is that a backend implementor answers only one question: how do I perform these eight operations against this particular graph store? Not: how do I implement BFS? Not: how do I cache edge lists efficiently? Not: how do I serialize results? Those questions are already answered, once, in the server layer, and the answers apply to every backend automatically.

The Caching Layer

CachedGraphDb wraps any backend in a dict-keyed cache at the primitive level. Every call to edges_from(id) checks a dict before hitting the backend. Every metadata_for_node(id) is cached after the first fetch. entity_types and predicates are cached indefinitely -- they are stable for the lifetime of a session.

class CachedGraphDb(GraphDbInterface):
    def __init__(
        self, backend: GraphDbInterface, maxsize: int = 1024
    ) -> None:
        self._backend = backend
        self._edges_from_cache: dict[str, list[Edge]] = {}
        self._node_meta_cache: dict[str, dict[str, Any]] = {}
        # ... per-method dicts for all eight methods

    async def edges_from(self, entity_id: str) -> list[Edge]:
        if entity_id not in self._edges_from_cache:
            res = await self._backend.edges_from(entity_id)
            self._edges_from_cache[entity_id] = res
        return self._edges_from_cache[entity_id]

The critical property is that caching operates at the level where it pays. BFS traversal at depth 2 may visit the same node from multiple directions. Without caching, each visit triggers a backend round-trip. With primitive-level caching, the second visit is a dict lookup. The backend sees each distinct call at most once per session. On a multi-hop traversal over a well-connected graph, this can reduce backend round-trips by an order of magnitude.

Because the cache is transparent -- CachedGraphDb implements GraphDbInterface -- backends do not implement caching themselves. They return fresh data on every call. The server layer decides caching policy. Backends stay simple.
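The excerpt above accepts a maxsize argument but shows only plain dicts. One way such a bound could be enforced is least-recently-used eviction, sketched below for a single method. This is an assumption about how a bounded cache might work, not the CachedGraphDb implementation; LruEdgeCache and CountingBackend are hypothetical names, and edges are represented as plain strings to keep the sketch short.

```python
import asyncio
from collections import OrderedDict


class CountingBackend:
    """Hypothetical backend that records how many round-trips it receives."""

    def __init__(self) -> None:
        self.calls = 0

    async def edges_from(self, entity_id: str) -> list[str]:
        self.calls += 1
        return [f"{entity_id}->neighbor"]


class LruEdgeCache:
    """Caches edges_from results, evicting the least recently used entry."""

    def __init__(self, backend, maxsize: int = 1024) -> None:
        self._backend = backend
        self._maxsize = maxsize
        self._cache: OrderedDict[str, list[str]] = OrderedDict()

    async def edges_from(self, entity_id: str) -> list[str]:
        if entity_id in self._cache:
            self._cache.move_to_end(entity_id)  # mark as recently used
            return self._cache[entity_id]
        result = await self._backend.edges_from(entity_id)
        self._cache[entity_id] = result
        if len(self._cache) > self._maxsize:
            self._cache.popitem(last=False)  # drop the oldest entry
        return result
```

With a bound in place, "at most once per session" holds only while an entry stays in the cache; an evicted entity costs one more round-trip the next time it is visited.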

All Methods Are Async

Every method in GraphDbInterface is async. This is not a formality. BFS expansion calls edges_from and edges_to for every node in the current frontier concurrently, via asyncio.gather. Expanding a frontier of 40 nodes issues 80 concurrent edge queries: one edges_from and one edges_to per node. Against a Postgres backend, these resolve as concurrent connection pool requests. Against a SPARQL endpoint, they resolve as concurrent HTTP requests. Against a Neo4j backend, they resolve as concurrent Bolt protocol calls.

A synchronous interface would serialize this work unnecessarily. The async design is a performance contract: backends that can serve concurrent queries concurrently will. Backends that cannot (in-memory dicts, file-backed stores) pay almost nothing -- an async def with no await inside still produces a coroutine, but it runs to completion without ever yielding to the event loop.
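The arithmetic of frontier expansion can be checked with a small simulation. The sketch below assumes a hypothetical SlowBackend whose calls sleep briefly in place of real I/O, and records how many calls are in flight at once; expand_frontier is an illustrative name, not a function from the BFS-QL codebase.

```python
import asyncio


class SlowBackend:
    """Hypothetical backend whose every call waits on simulated I/O."""

    def __init__(self, latency: float = 0.01) -> None:
        self.latency = latency
        self.calls = 0
        self.in_flight = 0
        self.peak_in_flight = 0

    async def _query(self, entity_id: str) -> list[str]:
        self.calls += 1
        self.in_flight += 1
        self.peak_in_flight = max(self.peak_in_flight, self.in_flight)
        await asyncio.sleep(self.latency)  # stands in for a network round-trip
        self.in_flight -= 1
        return []

    async def edges_from(self, entity_id: str) -> list[str]:
        return await self._query(entity_id)

    async def edges_to(self, entity_id: str) -> list[str]:
        return await self._query(entity_id)


async def expand_frontier(backend, frontier: list[str]) -> int:
    """Issue one edges_from and one edges_to per frontier node, all at once."""
    calls = [backend.edges_from(n) for n in frontier]
    calls += [backend.edges_to(n) for n in frontier]
    results = await asyncio.gather(*calls)
    return len(results)
```

For a 40-node frontier this issues 80 calls, and because every call is started before any of them finishes, the whole expansion waits roughly one latency period rather than eighty.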

Chapter 9: The Postgres Backend

The Postgres backend is the primary BFS-QL backend, not because Postgres is the only viable graph store -- it is not -- but because it is the natural target for graphs built with the companion volume's extraction pipeline. Kgraph writes entities, relationships, and embeddings into Postgres. BFS-QL reads them through PostgresBackend. The two are designed as a pair; this chapter covers the reading side.

The Schema

PostgresBackend expects the kgraph schema:

  • entity table: entity_id (canonical ID), entity_type, name, embedding (pgvector float array), properties (JSON), status, confidence, synonyms (JSON), source, canonical_url.
  • relationship table: subject_id, predicate, object_id, confidence, source_documents (JSON), properties (JSON).
  • bundle_evidence table: relationship_key (string FK of the form subject_id:predicate:object_id), text_span, confidence, document_id, section.

Entities with status = 'merged' are excluded from all queries. The kgraph pipeline resolves duplicate entities by merging them into a canonical representative; merged entities are kept in the table for audit purposes but are not exposed through BFS-QL.

Connection Pool and Initialization

PostgresBackend uses asyncpg for async I/O and manages a connection pool:

@classmethod
async def create(
    cls, dsn: str | None = None, ...
) -> "PostgresBackend":
    dsn = dsn or os.environ["DATABASE_URL"]

    async def _init_conn(conn):
        await conn.execute(
            "CREATE EXTENSION IF NOT EXISTS vector"
        )
        await conn.set_type_codec(
            "jsonb",
            encoder=json.dumps,
            decoder=json.loads,
            schema="pg_catalog",
        )

    pool = await asyncpg.create_pool(
        dsn, min_size=2, max_size=10, init=_init_conn
    )
    return cls(pool, embedding_fn)

The _init_conn callback runs on every new connection in the pool. It enables the pgvector extension in the database if it is not already enabled, and registers a type codec that automatically decodes JSONB columns to Python dicts and lists. The vector type for embeddings is handled separately: embeddings are passed as text and cast to vector inside the queries.

PostgresBackend.create is an async classmethod, not __init__. This is a Python idiom for async initialization: __init__ cannot be async, so the factory pattern is the clean solution. The create_server() function accepts PostgresBackend.create as a factory callable -- the server's lifespan handler calls it inside the running event loop, ensuring the pool is created in the same loop that will use it.
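The factory idiom can be shown without a database. The sketch below uses a hypothetical FakePool in place of asyncpg and a hypothetical lifespan function in place of the server's handler; it only demonstrates that the resource ends up bound to the loop that awaits the factory.

```python
import asyncio


class FakePool:
    """Hypothetical stand-in for a pool that must be opened asynchronously."""

    def __init__(self) -> None:
        self.loop = None

    async def open(self) -> None:
        # A real pool would open connections here; we only record the loop.
        self.loop = asyncio.get_running_loop()


class Backend:
    def __init__(self, pool: FakePool) -> None:
        # __init__ stays synchronous: it only stores already-built resources.
        self.pool = pool

    @classmethod
    async def create(cls) -> "Backend":
        pool = FakePool()
        await pool.open()  # async setup happens inside the running loop
        return cls(pool)


async def lifespan(factory) -> bool:
    """Await the factory in the serving loop and confirm the pool belongs to it."""
    backend = await factory()
    return backend.pool.loop is asyncio.get_running_loop()
```

Passing Backend.create (the callable, not its result) is what lets the caller decide when, and in which loop, the pool is built.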

search_entities has two implementations selected at pool creation time:

Vector search (when an embedding_fn is provided):

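async def _search_by_vector(
    self, query: str, limit: int = 10
) -> list[EntityStub]:
    embedding = await self._embedding_fn(query)
    # pgvector accepts the text form "[x1,x2,...]"
    embedding_str = (
        "[" + ",".join(str(x) for x in embedding) + "]"
    )
    # self._pool is an assumed attribute name for the
    # asyncpg pool handed to the constructor
    async with self._pool.acquire() as conn:
        rows = await conn.fetch(
            """
            SELECT entity_id, entity_type
            FROM entity
            WHERE embedding IS NOT NULL
              AND (status IS NULL OR status != 'merged')
            ORDER BY embedding::vector <=> $1::vector
            LIMIT $2
            """,
            embedding_str, limit,
        )
    return [
        EntityStub(id=r["entity_id"], entity_type=r["entity_type"])
        for r in rows
    ]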

The <=> operator is pgvector's cosine distance operator. The query embeds the search string and finds the nearest entity embeddings. Embedding model consistency -- using the same model at ingest time and query time -- is critical; mismatched models produce meaningless distances. This is guaranteed automatically when the identity server owns all embeddings: it uses the same model for both, and the query layer never needs to know which one.
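For readers who want to see what the ordering expression computes, the following sketch reproduces cosine distance (one minus cosine similarity) in plain Python, along with the text formatting used for the query parameter. The function names are illustrative, and the code assumes non-zero vectors of equal length.

```python
import math


def to_pgvector_literal(embedding: list[float]) -> str:
    """Format an embedding as the text form '[x1,x2,...]' used in the query."""
    return "[" + ",".join(str(x) for x in embedding) + "]"


def cosine_distance(a: list[float], b: list[float]) -> float:
    """1 - cosine similarity; 0 means same direction, 2 means opposite."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)


def nearest(query_vec: list[float], candidates: dict[str, list[float]]) -> list[str]:
    """Rank candidate IDs by ascending distance, like ORDER BY ... <=> ..."""
    return sorted(candidates, key=lambda k: cosine_distance(query_vec, candidates[k]))
```

Because the measure depends only on direction, two vectors produced by different embedding models have no meaningful angle between them, which is why mixing models breaks the ranking.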

Name search (fallback when no embedding_fn is provided):

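async def _search_by_name(
    self, query: str, limit: int = 10
) -> list[EntityStub]:
    pattern = f"%{query}%"
    # self._pool is an assumed attribute name for the
    # asyncpg pool handed to the constructor
    async with self._pool.acquire() as conn:
        rows = await conn.fetch(
            "SELECT entity_id, entity_type FROM entity "
            "WHERE name ILIKE $1 "
            "AND (status IS NULL OR status != 'merged') "
            "ORDER BY name LIMIT $2",
            pattern, limit,
        )
    return [
        EntityStub(id=r["entity_id"], entity_type=r["entity_type"])
        for r in rows
    ]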

ILIKE is case-insensitive substring matching. It is less precise than vector search -- a query for "Cushing" matches "Cushing disease," "Cushing syndrome," and any entity whose name contains the substring. For the medlit demo graph, where entity names are short and specific, this is acceptable. For large general-purpose graphs, vector search is preferred.
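One detail the pattern construction leaves open is what happens when the search text itself contains % or _, which ILIKE treats as wildcards. A minimal sketch of escaping them follows; ilike_pattern is a hypothetical helper, and it relies on backslash being the default escape character for LIKE and ILIKE in Postgres.

```python
def ilike_pattern(query: str) -> str:
    """Build a '%...%' substring pattern with LIKE wildcards escaped."""
    escaped = (
        query.replace("\\", "\\\\")  # escape the escape character first
        .replace("%", "\\%")
        .replace("_", "\\_")
    )
    return f"%{escaped}%"
```

The result is still passed as a bound parameter, so this affects only how the pattern is interpreted, not how the SQL is assembled.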

Edge Traversal

The traversal methods are straightforward:

async def edges_from(self, entity_id: str) -> list[Edge]:
    # self._pool is an assumed attribute name for the
    # asyncpg pool handed to the constructor
    async with self._pool.acquire() as conn:
        rows = await conn.fetch(
            "SELECT subject_id, predicate, object_id "
            "FROM relationship "
            "WHERE subject_id = $1",
            entity_id,
        )
    return [
        Edge(
            subject=r["subject_id"],
            predicate=r["predicate"],
            object=r["object_id"],
        ) for r in rows
    ]

edges_to is identical with WHERE object_id = $1. Both are called concurrently for every node in the BFS frontier. The connection pool handles concurrent acquisition; asyncpg's pool is safe for concurrent use.

Metadata and Evidence

metadata_for_edge is the most complex method in the backend. An edge record in the relationship table carries confidence, source_documents, and properties. Evidence provenance is in bundle_evidence, keyed by the string subject_id:predicate:object_id:

async def metadata_for_edge(
    self, edge: Edge
) -> dict[str, Any]:
    rel_key = f"{edge.subject}:{edge.predicate}:{edge.object}"
    evidence_rows = await _fetch_evidence(
        conn, rel_row["id"], rel_key
    )
    ...

The _fetch_evidence helper tries two schemas: the test schema (an evidence table with a UUID foreign key) and the kgserver schema (bundle_evidence with a string relationship key). This dual-schema support exists because the integration tests build a minimal schema from scratch while the production demo data uses the full kgserver schema. Both schemas are handled transparently.

The resulting metadata dict for a full edge includes confidence, source document IDs, any relationship properties, and a provenance list with text spans, confidence scores, and document references. This provenance is stripped by _slim_result in the server layer before returning BFS results -- it is available only through describe_entity on a specific node, which fetches the edge metadata directly.
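The shape of that dict, and the effect of slimming it, can be sketched as follows. The column names come from the schema listed earlier in this chapter; the output key names, build_edge_metadata, and slim_for_bfs are assumptions made for illustration and are not the backend's or _slim_result's actual code.

```python
from typing import Any


def relationship_key(subject: str, predicate: str, obj: str) -> str:
    """String key of the form subject_id:predicate:object_id."""
    return f"{subject}:{predicate}:{obj}"


def build_edge_metadata(
    rel_row: dict[str, Any], evidence_rows: list[dict[str, Any]]
) -> dict[str, Any]:
    """Combine a relationship row with its evidence rows (key names assumed)."""
    return {
        "confidence": rel_row.get("confidence"),
        "source_documents": rel_row.get("source_documents") or [],
        "properties": rel_row.get("properties") or {},
        "provenance": [
            {
                "text_span": ev["text_span"],
                "confidence": ev["confidence"],
                "document_id": ev["document_id"],
                "section": ev.get("section"),
            }
            for ev in evidence_rows
        ],
    }


def slim_for_bfs(metadata: dict[str, Any]) -> dict[str, Any]:
    """Drop the verbose provenance list, keeping the compact fields."""
    return {k: v for k, v in metadata.items() if k != "provenance"}
```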

Chapter 10: The SPARQL Backend

The Postgres backend covers kgraph-derived graphs -- graphs that were built by the extraction pipeline and live in a database the developer controls. But there are thousands of knowledge graphs that predate BFS-QL, that were built by other teams for other purposes, and that expose themselves through SPARQL 1.1 endpoints. DBpedia, Wikidata, UniProt, ChEMBL, the Gene Ontology, the NCI Thesaurus -- these are public graphs with public endpoints, accumulated over decades, containing knowledge that no extraction pipeline will soon replicate. A SPARQL backend makes all of them accessible through the same six-tool interface.

The SPARQL Endpoint Model

A SPARQL 1.1 endpoint accepts HTTP POST requests with a query parameter containing a SPARQL query string and returns results as JSON, XML, or CSV. The endpoint URL is the only required configuration. The backend sends queries over HTTP and parses the bindings in the JSON results format.

The edges_from and edges_to methods translate directly to SPARQL queries built from a single triple pattern:

# edges_from(entity_id)
SELECT ?predicate ?object WHERE {
    <{entity_id}> ?predicate ?object .
    FILTER(!isBlank(?object))
}
LIMIT 500

# edges_to(entity_id)
SELECT ?subject ?predicate WHERE {
    ?subject ?predicate <{entity_id}> .
    FILTER(!isBlank(?subject))
}
LIMIT 500

The FILTER(!isBlank(?object)) clause excludes blank nodes -- anonymous intermediate nodes that appear in RDF data but have no canonical ID and cannot be meaningfully referenced in BFS-QL. Blank nodes are a modeling convenience in RDF; they are a navigational dead end for graph traversal.
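On the response side, the SPARQL 1.1 JSON results format wraps each value in an object with a type ("uri", "literal", or "bnode") and a value. The sketch below turns an edges_from response into edge records. The Edge dataclass and function name are stand-ins, and skipping literal-valued objects is an assumption of this sketch rather than documented backend behavior.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class Edge:
    # Stand-in for the Edge record (shape assumed for illustration).
    subject: str
    predicate: str
    object: str


def edges_from_bindings(entity_uri: str, response: dict[str, Any]) -> list[Edge]:
    """Convert a SPARQL JSON result for the edges_from query into edges."""
    edges: list[Edge] = []
    for row in response["results"]["bindings"]:
        obj = row["object"]
        if obj["type"] != "uri":
            # Literals are attribute values, not nodes; blank nodes that an
            # endpoint returns despite the FILTER are dropped here as well.
            continue
        edges.append(
            Edge(
                subject=entity_uri,
                predicate=row["predicate"]["value"],
                object=obj["value"],
            )
        )
    return edges
```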

URI Normalization

SPARQL endpoints represent entities as URIs:

<http://dbpedia.org/resource/Cushing%27s_disease>
<http://www.wikidata.org/entity/Q183417>

BFS-QL canonical IDs are strings. The SPARQL backend must map between them.

The mapping strategy is endpoint-specific. For DBpedia, the URI prefix http://dbpedia.org/resource/ maps to the prefix DBpedia:. For Wikidata, http://www.wikidata.org/entity/ maps to Wikidata:. The backend's initialization takes a prefix map:

backend = SparqlBackend(
    endpoint="https://dbpedia.org/sparql",
    prefixes={
        "DBpedia": "http://dbpedia.org/resource/",
        "DBpedia-owl": "http://dbpedia.org/ontology/",
    }
)

Canonical IDs are expanded to full URIs before insertion into SPARQL queries. URIs coming back in query results are compressed using the prefix map. URIs that match no known prefix are passed through as-is -- they are valid canonical IDs, just opaque to the user.
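The two directions of the mapping can be sketched as a pair of pure functions over the prefix map. compress and expand are hypothetical names, and trying the longest namespace first is a design choice of this sketch, made so that nested namespaces resolve to the most specific prefix.

```python
def compress(uri: str, prefixes: dict[str, str]) -> str:
    """Turn a full URI into 'Prefix:local' when a known namespace matches."""
    by_length = sorted(prefixes.items(), key=lambda kv: len(kv[1]), reverse=True)
    for name, base in by_length:
        if uri.startswith(base):
            return f"{name}:{uri[len(base):]}"
    return uri  # unknown namespace: the URI itself is the canonical ID


def expand(canonical_id: str, prefixes: dict[str, str]) -> str:
    """Turn 'Prefix:local' back into a full URI; pass other IDs through."""
    name, sep, local = canonical_id.partition(":")
    if sep and name in prefixes:
        return prefixes[name] + local
    return canonical_id
```

A full URI such as http://example.org/x survives a round trip untouched, because its scheme ("http") is not a key in the prefix map.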

Schema Discovery

entity_types and predicates are the BFS-QL methods that return the graph's vocabulary. For SPARQL endpoints, these translate to SELECT DISTINCT queries:

# entity_types()
SELECT DISTINCT ?type WHERE {
    ?s a ?type .
}
ORDER BY ?type
LIMIT 200

# predicates()
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
SELECT DISTINCT ?pred WHERE {
    ?s ?pred ?o .
    FILTER(?pred != rdf:type)
}
ORDER BY ?pred
LIMIT 500

These queries can be slow on large endpoints -- Wikidata holds billions of triples, and SELECT DISTINCT over all predicates is not instantaneous. The CachedGraphDb wrapper handles this: both methods are cached indefinitely after the first call. The server's lifespan handler calls them once at startup and caches the results in _state.

For endpoints where SELECT DISTINCT is prohibitively slow, an alternative is to probe from a known seed: start from a well-connected entity and collect the entity types and predicates that appear in its neighborhood. This produces a partial schema -- sufficient for the BFS-QL server to inject into tool descriptions -- without scanning the entire graph.
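Seed probing needs nothing beyond the interface's own primitives. The following sketch walks a small neighborhood and collects the types and predicates it meets. Edge, Node, TinyBackend, and probe_schema are all hypothetical names for illustration; the hop count is a tuning assumption.

```python
import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class Edge:
    subject: str
    predicate: str
    object: str


@dataclass(frozen=True)
class Node:
    id: str
    entity_type: str


class TinyBackend:
    """Hypothetical in-memory backend with just the methods the probe needs."""

    def __init__(self, types: dict[str, str], edges: list[Edge]) -> None:
        self._types = types
        self._edges = edges

    async def get_node(self, entity_id: str) -> Node:
        return Node(id=entity_id, entity_type=self._types[entity_id])

    async def edges_from(self, entity_id: str) -> list[Edge]:
        return [e for e in self._edges if e.subject == entity_id]

    async def edges_to(self, entity_id: str) -> list[Edge]:
        return [e for e in self._edges if e.object == entity_id]


async def probe_schema(backend, seed: str, hops: int = 2) -> tuple[list[str], list[str]]:
    """Collect entity types and predicates seen within `hops` of the seed."""
    seen = {seed}
    frontier = [seed]
    predicates: set[str] = set()
    for _ in range(hops):
        batches = await asyncio.gather(
            *[backend.edges_from(n) for n in frontier],
            *[backend.edges_to(n) for n in frontier],
        )
        next_frontier: set[str] = set()
        for edge in (e for batch in batches for e in batch):
            predicates.add(edge.predicate)
            for node in (edge.subject, edge.object):
                if node not in seen:
                    seen.add(node)
                    next_frontier.add(node)
        frontier = sorted(next_frontier)
    nodes = await asyncio.gather(*[backend.get_node(n) for n in sorted(seen)])
    return sorted({n.entity_type for n in nodes}), sorted(predicates)
```

The result is partial by construction: anything outside the probed neighborhood is simply absent from the lists.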

search_entities Against SPARQL

SPARQL endpoints vary in their full-text search support. Virtuoso (which backs DBpedia) supports bif:contains for full-text matching. GraphDB supports Lucene-backed text search. Many endpoints support no full-text search at all.

The most portable approach is rdfs:label matching with FILTER(CONTAINS(...)):

PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
SELECT ?entity ?type WHERE {
    ?entity rdfs:label ?label ;
            a ?type .
    FILTER(CONTAINS(LCASE(?label), LCASE("{query}")))
}
LIMIT 20

This is not fast on large graphs, but it works everywhere and avoids endpoint-specific extensions. For production use against a specific endpoint, the backend should be configured with that endpoint's preferred search mechanism.
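Because the search text is spliced into a quoted SPARQL string, characters such as double quotes and backslashes need escaping before the query is sent. A minimal sketch of that step follows; sparql_string_literal and label_search_query are hypothetical helpers, not part of the backend as described.

```python
def sparql_string_literal(text: str) -> str:
    """Quote text as a SPARQL string literal, escaping special characters."""
    escaped = (
        text.replace("\\", "\\\\")  # backslash first, so later escapes survive
        .replace('"', '\\"')
        .replace("\n", "\\n")
        .replace("\r", "\\r")
    )
    return f'"{escaped}"'


def label_search_query(query: str, limit: int = 20) -> str:
    """Build the portable rdfs:label search with the query safely quoted."""
    literal = sparql_string_literal(query)
    return (
        "PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>\n"
        "SELECT ?entity ?type WHERE {\n"
        "    ?entity rdfs:label ?label ;\n"
        "            a ?type .\n"
        f"    FILTER(CONTAINS(LCASE(?label), LCASE({literal})))\n"
        "}\n"
        f"LIMIT {int(limit)}"
    )
```

Without the escaping, a query containing a double quote would terminate the literal early and either break the request or change its meaning.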

Handling Endpoint Variance

SPARQL 1.1 is a standard, but implementations differ. Virtuoso requires DEFINE sql:describe-mode "CBD" for some queries. GraphDB has different timeout behavior. Stardog enforces stricter blank node handling. Amazon Neptune does not support all property path expressions.

The SPARQL backend handles this through a small set of configuration knobs: query timeout (in seconds), result set size limit (LIMIT clause), and a flag for whether SELECT DISTINCT over the full graph is safe to issue. These are set at initialization and applied to all generated queries.

The abstraction boundary is clean: an LLM querying a BFS-QL server backed by Virtuoso, GraphDB, or Neptune sees identical behavior. The endpoint variance is confined entirely to the backend implementation.

Chapter 11: The Neo4j Backend

Neo4j is a property graph database, not an RDF store. The distinction matters for the implementation, though not for the BFS-QL interface. Where RDF graphs represent everything as triples of URIs and literals, property graphs attach key-value pairs directly to nodes and relationships. A node in Neo4j has a label (or multiple labels) and a set of properties. A relationship has a type and a set of properties. There are no blank nodes; everything is either a node or a named relationship.

For BFS-QL, the mapping from the property graph model to the GraphDbInterface is direct:

  • Node labels → entity_type
  • Relationship types → predicates
  • Node identity (Neo4j's internal ID or a canonical ID property) → entity ID
  • Node properties → metadata_for_node
  • Relationship properties → metadata_for_edge

The implementation requires one configuration decision: which node property holds the canonical ID. In a kgraph-derived Neo4j graph, this would be entity_id. In a general Neo4j graph, it might be id, uri, name, or something domain-specific. The backend is initialized with the canonical ID property name.

Cypher Traversal

edges_from and edges_to are natural Cypher traversals:

// edges_from(entity_id)
MATCH (n {entity_id: $id})-[r]->(m)
RETURN n.entity_id AS subject,
       type(r) AS predicate,
       m.entity_id AS object

// edges_to(entity_id)
MATCH (n)-[r]->(m {entity_id: $id})
RETURN n.entity_id AS subject,
       type(r) AS predicate,
       m.entity_id AS object

type(r) returns the relationship type as a string, which becomes the predicate. Neo4j relationship types are uppercase by convention (TREATS, INHIBITS, ASSOCIATED_WITH); BFS-QL predicates are lowercase by convention. The backend normalizes to lowercase at query time.
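The normalization itself is a one-line transformation applied to each returned record. The sketch below assumes records arrive as dicts with the subject, predicate, and object aliases used in the queries above; Edge, record_to_edge, and predicate_to_relationship_type are illustrative names.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Edge:
    # Stand-in for the Edge record (shape assumed for illustration).
    subject: str
    predicate: str
    object: str


def record_to_edge(record: dict) -> Edge:
    """Map one Cypher result row to an edge with a lowercase predicate."""
    return Edge(
        subject=record["subject"],
        predicate=record["predicate"].lower(),
        object=record["object"],
    )


def predicate_to_relationship_type(predicate: str) -> str:
    """Reverse mapping, valid only when the graph follows the uppercase convention."""
    return predicate.upper()
```

Lowercasing is lossy for graphs that use mixed-case relationship types, so the reverse mapping holds only where the uppercase convention is actually followed.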

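search_entities in the Neo4j backend relies on a full-text index. Cypher does offer substring predicates (CONTAINS, STARTS WITH), but they are case-sensitive, return unranked matches, and without a suitable index scan every candidate node. Unlike Postgres (which can fall back to ILIKE) or a SPARQL endpoint (which can use CONTAINS on labels), the Neo4j backend therefore has no unindexed fallback. A full-text index must be created at graph construction time: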

CREATE FULLTEXT INDEX entity_names
  FOR (n:Entity) ON EACH [n.name, n.synonyms]

With the index in place, search_entities becomes:

CALL db.index.fulltext.queryNodes("entity_names", $query)
YIELD node, score
RETURN node.entity_id AS id, labels(node)[0] AS entity_type
ORDER BY score DESC
LIMIT 10

The index requirement is a constraint on graph construction, not on BFS-QL. A Neo4j graph served through BFS-QL must have the index; graphs without it cannot support search_entities. The backend checks for index existence at initialization and raises a clear error if it is missing, rather than failing silently at query time.

entity_types and predicates

// entity_types()
CALL db.labels() YIELD label RETURN label ORDER BY label

// predicates()
CALL db.relationshipTypes() YIELD relationshipType
RETURN relationshipType ORDER BY relationshipType

Neo4j's db.labels() and db.relationshipTypes() procedures return the complete label and relationship type vocabularies without scanning the graph. They are fast, stable, and the natural implementation of entity_types and predicates. No SELECT DISTINCT required.

Chapter 12: Writing Your Own Backend

The eight-method contract is a complete specification. If you can answer each of the eight questions for a given graph store, you can write a BFS-QL backend for it, and everything above that layer -- traversal, filtering, caching, the six-tool MCP interface -- comes for free.

What "Correct" Means for Each Method

search_entities(query, node_types): Return a ranked list of EntityStub records whose names or aliases match the query string. "Ranked" means most-likely matches first. "Match" is implementation-defined: substring, vector similarity, full-text score, or exact match are all valid. Return at most 10-20 candidates. If node_types is provided, restrict candidates to those types; otherwise apply no type filter. Return an empty list, not an error, if nothing matches.

edges_from(entity_id) / edges_to(entity_id): Return all outgoing or incoming edges for the entity. "All" means all -- do not apply relevance filters. BFS traversal needs complete topology; filtering happens at the server layer. Return Edge records with canonical IDs for subject and object; do not return metadata here. Raise KeyError if the entity does not exist; return [] if it exists but has no edges.

get_node(entity_id): Return a Node record with the entity's ID and type. This is the identity call, not the metadata call. It is fast. Raise KeyError if the entity does not exist or is inaccessible.

metadata_for_node(entity_id): Return a dict of all available metadata for the entity. Keys and types are backend-defined. Include everything: names, synonyms, descriptions, external links, confidence scores. The server passes this dict to the LLM as-is; the LLM decides what is relevant. Do not omit fields to save space -- the topology mode handles that at the server layer.

metadata_for_edge(edge): Return a dict of all available edge metadata. Include provenance: text spans, source documents, extraction confidence, creation timestamps. The server strips verbose provenance fields from BFS results (returning them only through describe_entity) but the backend should return everything and let the server decide what to expose.

entity_types() / predicates(): Return complete, stable lists. The server caches these indefinitely; they must not change during a session. Return them in a consistent order (alphabetical is conventional). Return an empty list if the graph has no schema (though this makes describe_schema useless and should be avoided).
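Several of these rules are mechanical enough to check automatically. The sketch below is one possible conformance check covering a few of them, run against a hypothetical dict-backed MemoryBackend; check_contract and its parameters are illustrative and are not part of BFS-QL.

```python
import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class Edge:
    subject: str
    predicate: str
    object: str


class MemoryBackend:
    """Hypothetical dict-backed store with only the methods checked below."""

    def __init__(self, types: dict[str, str], edges: list[Edge]) -> None:
        self._types = types
        self._edges = edges

    async def edges_from(self, entity_id: str) -> list[Edge]:
        if entity_id not in self._types:
            raise KeyError(entity_id)
        return [e for e in self._edges if e.subject == entity_id]

    async def entity_types(self) -> list[str]:
        return sorted(set(self._types.values()))

    async def predicates(self) -> list[str]:
        return sorted({e.predicate for e in self._edges})


async def check_contract(
    backend, known_id: str, isolated_id: str, missing_id: str
) -> list[str]:
    """Return contract violations; an empty list means every check passed."""
    problems: list[str] = []
    try:
        await backend.edges_from(missing_id)
        problems.append("edges_from should raise KeyError for a missing entity")
    except KeyError:
        pass
    if await backend.edges_from(isolated_id) != []:
        problems.append("edges_from should return [] for an entity with no edges")
    if not await backend.edges_from(known_id):
        problems.append("edges_from returned nothing for a connected entity")
    for name in ("entity_types", "predicates"):
        first = await getattr(backend, name)()
        second = await getattr(backend, name)()
        if first != second or first != sorted(first):
            problems.append(f"{name} should be stable and alphabetically ordered")
    return problems
```

A check like this could run in a backend's test suite with three fixture IDs: one connected entity, one with no outgoing edges, and one that does not exist.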

A Worked Example: JSON-LD REST API Backend

To make the contract concrete, consider a JSON-LD REST API as a backend. The API exposes entities at /entities/{id} and their relationships at /entities/{id}/relations. A schema endpoint at /schema returns the vocabulary.

class JsonLdBackend(GraphDbInterface):
    def __init__(self, base_url: str) -> None:
        self._base = base_url.rstrip("/")
        self._session: aiohttp.ClientSession | None = None

    async def _get(self, path: str) -> dict:
        url = f"{self._base}{path}"
        async with self._session.get(url) as resp:
            return await resp.json()

    async def get_node(self, entity_id: str) -> Node:
        data = await self._get(f"/entities/{entity_id}")
        return Node(id=data["id"], entity_type=data["@type"])

    async def metadata_for_node(
        self, entity_id: str
    ) -> dict[str, Any]:
        data = await self._get(f"/entities/{entity_id}")
        return {
            k: v for k, v in data.items()
            if k not in ("id", "@type", "@context")
        }

    async def edges_from(self, entity_id: str) -> list[Edge]:
        path = f"/entities/{entity_id}/relations?direction=out"
        data = await self._get(path)
        return [
            Edge(
                subject=r["subject"],
                predicate=r["predicate"],
                object=r["object"]
            ) for r in data["relations"]
        ]

    async def edges_to(self, entity_id: str) -> list[Edge]:
        path = f"/entities/{entity_id}/relations?direction=in"
        data = await self._get(path)
        return [
            Edge(
                subject=r["subject"],
                predicate=r["predicate"],
                object=r["object"]
            ) for r in data["relations"]
        ]

    async def search_entities(
        self, query: str, node_types: list[str] | None = None
    ) -> list[EntityStub]:
        # urlencode (from urllib.parse) escapes spaces, "&",
        # and other reserved characters in user-supplied text
        params = {"q": query, "limit": 10}
        if node_types:
            params["types"] = ",".join(node_types)
        data = await self._get(f"/entities?{urlencode(params)}")
        return [EntityStub(id=e["id"], entity_type=e["@type"])
                for e in data["results"]]

    async def metadata_for_edge(
        self, edge: Edge
    ) -> dict[str, Any]:
        params = urlencode({
            "subject": edge.subject,
            "predicate": edge.predicate,
            "object": edge.object,
        })
        data = await self._get(f"/relations?{params}")
        return data.get("metadata", {})

    async def entity_types(self) -> list[str]:
        data = await self._get("/schema")
        return sorted(data["entity_types"])

    async def predicates(self) -> list[str]:
        data = await self._get("/schema")
        return sorted(data["predicates"])

This is approximately 60 lines. It is incomplete -- there is no session management, no error handling for missing entities, no JSON-LD context resolution. But it illustrates the contract. Once these eight methods work correctly, the backend can be passed to create_server() and immediately served through the full BFS-QL interface: six tools, stub/full filtering, multi-seed BFS, topology mode, LRU caching. None of that is in the backend. All of it comes for free.

The Bar Is Low; the Payoff Is Immediate

The eight-method interface is deliberately small. Its purpose is not to constrain what backends can do -- they can expose arbitrary metadata, use any storage technology, call any external service. Its purpose is to define the minimum surface that the BFS-QL server needs to function.

A backend that correctly implements all eight methods gets, automatically:

  • BFS traversal to any depth with concurrency across the frontier
  • Stub/full node and edge filtering based on the caller's node_types and predicates parameters
  • Topology mode: pure structural skeleton with no metadata
  • Multi-seed union: BFS from multiple seeds simultaneously
  • LRU caching at the primitive level: no repeated round-trips for the same entity or edge within a session
  • The full six-tool MCP interface
  • Schema injection: valid node_types and predicates injected into the bfs_query tool description when the schema is small enough

The cost is eight method implementations. The payoff is a fully functional LLM graph interface against any data store you can navigate.