Chapter 9: The Postgres Backend

The Postgres backend is the primary BFS-QL backend, not because Postgres is the only viable graph store -- it is not -- but because it is the natural target for graphs built with the companion volume's extraction pipeline. Kgraph writes entities, relationships, and embeddings into Postgres. BFS-QL reads them through PostgresBackend. The two are designed as a pair; this chapter covers the reading side.

The Schema

PostgresBackend expects the kgraph schema:

  • entity table: entity_id (canonical ID), entity_type, name, embedding (pgvector float array), properties (JSON), status, confidence, synonyms (JSON), source, canonical_url.
  • relationship table: subject_id, predicate, object_id, confidence, source_documents (JSON), properties (JSON).
  • bundle_evidence table: relationship_key (string FK of the form subject_id:predicate:object_id), text_span, confidence, document_id, section.

Entities with status = 'merged' are excluded from all queries. The kgraph pipeline resolves duplicate entities by merging them into a canonical representative; merged entities are kept in the table for audit purposes but are not exposed through BFS-QL.
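The queries later in this chapter filter with status IS NULL OR status != 'merged' rather than a bare status != 'merged'. The following sketch illustrates why the NULL branch matters. It uses Python's built-in sqlite3 module as a stand-in for Postgres (both treat a comparison against NULL as unknown), and the table layout and the 'active' status value are illustrative assumptions, not part of the kgraph schema.

```python
import sqlite3

# Stand-in for the Postgres entity table; only the columns needed here.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE entity (entity_id TEXT, status TEXT)")
conn.executemany(
    "INSERT INTO entity VALUES (?, ?)",
    # 'active' is a hypothetical status value used only for illustration.
    [("e1", None), ("e2", "active"), ("e3", "merged")],
)


def visible_ids(where: str) -> list[str]:
    """Return the entity IDs that satisfy the given WHERE clause, sorted."""
    rows = conn.execute(
        f"SELECT entity_id FROM entity WHERE {where} ORDER BY entity_id"
    )
    return [r[0] for r in rows]


# NULL != 'merged' evaluates to NULL, so e1 is silently dropped.
naive = visible_ids("status != 'merged'")

# The explicit IS NULL branch keeps entities that have no status at all.
guarded = visible_ids("status IS NULL OR status != 'merged'")
```

Here naive contains only e2, while guarded contains e1 and e2; the merged entity e3 is excluded in both cases.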

Connection Pool and Initialization

PostgresBackend uses asyncpg for async I/O and manages a connection pool:

@classmethod
async def create(
    cls, dsn: str | None = None, ...
) -> "PostgresBackend":
    dsn = dsn or os.environ["DATABASE_URL"]

    async def _init_conn(conn):
        await conn.execute(
            "CREATE EXTENSION IF NOT EXISTS vector"
        )
        await conn.set_type_codec(
            "jsonb",
            encoder=json.dumps,
            decoder=json.loads,
            schema="pg_catalog",
        )

    pool = await asyncpg.create_pool(
        dsn, min_size=2, max_size=10, init=_init_conn
    )
    return cls(pool, embedding_fn)

The _init_conn callback runs once for each new connection the pool opens. It creates the pgvector extension if it is not already installed and registers a type codec so that JSONB values are encoded from and decoded to Python objects (dicts and lists) automatically. The vector type used for embeddings is handled separately: embeddings travel as text and are cast in the query.
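As a rough illustration of the text-cast approach for vectors, the sketch below converts between a Python list of floats and the bracketed text form that pgvector reads and prints (for example "[0.25,-1.0,2.0]"). The helper names are hypothetical and do not come from PostgresBackend.

```python
def to_vector_literal(embedding: list[float]) -> str:
    """Format floats as a pgvector text literal, e.g. "[0.25,-1.0,2.0]"."""
    return "[" + ",".join(repr(float(x)) for x in embedding) + "]"


def from_vector_literal(text: str) -> list[float]:
    """Parse the bracketed text form back into a list of floats."""
    body = text.strip().removeprefix("[").removesuffix("]")
    return [float(part) for part in body.split(",") if part.strip()]
```

A value produced by to_vector_literal can be bound as an ordinary text parameter and cast with ::vector in the query; a column selected with ::text can be parsed with from_vector_literal.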

PostgresBackend.create is an async classmethod, not __init__. This is a Python idiom for async initialization: __init__ cannot be async, so the factory pattern is the clean solution. The create_server() function accepts PostgresBackend.create as a factory callable -- the server's lifespan handler calls it inside the running event loop, ensuring the pool is created in the same loop that will use it.
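The async factory idiom can be sketched without a database. In the example below, FakePool, open_pool, Backend, and start_backend are all hypothetical stand-ins: FakePool replaces the asyncpg pool, and start_backend plays the role of a server lifespan hook that receives the factory and awaits it inside the running loop.

```python
import asyncio


class FakePool:
    """Stand-in for a real connection pool so the sketch runs anywhere."""

    def __init__(self, dsn: str) -> None:
        self.dsn = dsn


async def open_pool(dsn: str) -> FakePool:
    await asyncio.sleep(0)  # placeholder for real async connection setup
    return FakePool(dsn)


class Backend:
    def __init__(self, pool: FakePool) -> None:
        # __init__ stays synchronous: it only stores ready-made resources.
        self._pool = pool

    @classmethod
    async def create(cls, dsn: str) -> "Backend":
        # All awaiting happens here, before the instance exists.
        pool = await open_pool(dsn)
        return cls(pool)


async def start_backend(factory, dsn: str):
    # Awaited from inside the running loop, as a lifespan hook would do,
    # so the pool belongs to the loop that later uses it.
    return await factory(dsn)


def main() -> Backend:
    return asyncio.run(start_backend(Backend.create, "postgresql://example/demo"))
```

Passing Backend.create itself (not its result) is what lets the caller decide when, and in which event loop, the pool is opened.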

search_entities has two implementations. Which one is used is fixed when the backend is created, depending on whether an embedding_fn was supplied:

Vector search (when an embedding_fn is provided):

async def _search_by_vector(
    self, query: str, limit: int = 10
) -> list[EntityStub]:
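    # Embed the query text; must be the same model used at ingest time.
    embedding = await self._embedding_fn(query)
    # pgvector accepts a bracketed text literal such as "[0.1,0.2]".
    embedding_str = (
        "[" + ",".join(str(x) for x in embedding) + "]"
    )
    # conn: a connection acquired from the pool (acquisition elided here).
    rows = await conn.fetch(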
        """
        SELECT entity_id, entity_type
        FROM entity
        WHERE embedding IS NOT NULL
          AND (status IS NULL OR status != 'merged')
        ORDER BY embedding::vector <=> $1::vector
        LIMIT $2
        """,
        embedding_str, limit,
    )

The <=> operator is pgvector's cosine distance operator; smaller values mean more similar vectors, so the ascending ORDER BY returns the closest matches first. The query embeds the search string and finds the nearest entity embeddings. Embedding model consistency -- using the same model at ingest time and query time -- is critical; mismatched models produce meaningless distances. This consistency is guaranteed automatically when the identity server owns all embeddings: it uses the same model for both, and the query layer never needs to know which one.
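To make the ordering concrete, here is a small pure-Python sketch of cosine distance (one minus cosine similarity) and a nearest-neighbour ranking over an in-memory dictionary. The function names and the toy two-dimensional vectors are illustrative assumptions; in the backend, pgvector performs this computation inside Postgres.

```python
import math


def cosine_distance(a: list[float], b: list[float]) -> float:
    """Return 1 - cos(angle between a and b); 0 means same direction."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)


def nearest(
    query: list[float], candidates: dict[str, list[float]], limit: int
) -> list[str]:
    """Rank candidate IDs by ascending distance, like ORDER BY ... LIMIT."""
    ranked = sorted(
        candidates, key=lambda key: cosine_distance(query, candidates[key])
    )
    return ranked[:limit]
```

Identical directions give a distance of 0, orthogonal vectors give 1, and opposite vectors give 2. Because the measure depends only on direction, vectors produced by two different models share no common geometry and their distances carry no meaning.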

Name search (fallback when no embedding_fn is provided):

async def _search_by_name(
    self, query: str, limit: int = 10
) -> list[EntityStub]:
    pattern = f"%{query}%"
    rows = await conn.fetch(
        "SELECT entity_id, entity_type FROM entity "
        "WHERE name ILIKE $1 "
        "AND (status IS NULL OR status != 'merged') "
        "ORDER BY name LIMIT $2",
        pattern, limit,
    )

ILIKE is case-insensitive substring matching. It is less precise than vector search -- a query for "Cushing" matches "Cushing disease," "Cushing syndrome," and any entity whose name contains the substring. For the medlit demo graph, where entity names are short and specific, this is acceptable. For large general-purpose graphs, vector search is preferred.
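One detail worth keeping in mind with substring patterns: % and _ are wildcards inside a LIKE or ILIKE pattern, so a query string that contains them matches more than its literal text. The sketch below shows one way to escape them before wrapping the query, relying on backslash being the default LIKE escape character in Postgres. The helper names are hypothetical and are not part of PostgresBackend.

```python
def escape_like(text: str, escape: str = "\\") -> str:
    """Escape LIKE wildcards so the text matches only itself."""
    return (
        text.replace(escape, escape + escape)  # escape the escape char first
        .replace("%", escape + "%")
        .replace("_", escape + "_")
    )


def substring_pattern(query: str) -> str:
    """Build a pattern that matches names containing the literal query."""
    return f"%{escape_like(query)}%"
```

With this helper, substring_pattern("Cushing") is still "%Cushing%", while a query such as "50%_a" becomes a pattern in which the percent sign and underscore match only themselves.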

Edge Traversal

The traversal methods are straightforward:

async def edges_from(self, entity_id: str) -> list[Edge]:
    rows = await conn.fetch(
        "SELECT subject_id, predicate, object_id "
        "FROM relationship "
        "WHERE subject_id = $1",
        entity_id,
    )
    return [
        Edge(
            subject=r["subject_id"],
            predicate=r["predicate"],
            object=r["object_id"]
        ) for r in rows
    ]

edges_to is identical except that it filters on object_id = $1 instead of subject_id. Both are called concurrently for every node in the BFS frontier. The connection pool handles concurrent acquisition; asyncpg's pool is safe for concurrent use.
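The concurrent frontier expansion can be sketched with asyncio.gather. The in-memory backend below is a hypothetical stand-in that mimics the two traversal methods without a database, and expand_frontier is an assumed name for illustration; the actual BFS loop lives elsewhere.

```python
import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class Edge:
    subject: str
    predicate: str
    object: str


class InMemoryBackend:
    """Hypothetical backend holding edges in a list instead of Postgres."""

    def __init__(self, edges: list[Edge]) -> None:
        self._edges = edges

    async def edges_from(self, entity_id: str) -> list[Edge]:
        await asyncio.sleep(0)  # yield to the loop, as a real query would
        return [e for e in self._edges if e.subject == entity_id]

    async def edges_to(self, entity_id: str) -> list[Edge]:
        await asyncio.sleep(0)
        return [e for e in self._edges if e.object == entity_id]


async def expand_frontier(backend, frontier: list[str]) -> list[Edge]:
    """Fetch outgoing and incoming edges for every frontier node at once."""
    calls = []
    for node in frontier:
        calls.append(backend.edges_from(node))
        calls.append(backend.edges_to(node))
    batches = await asyncio.gather(*calls)  # results keep call order
    seen: dict[Edge, None] = {}  # dict preserves first-seen order
    for batch in batches:
        for edge in batch:
            seen.setdefault(edge, None)
    return list(seen)
```

An edge between two frontier nodes is returned by both edges_from and edges_to, which is why the sketch deduplicates before returning. With a real pool, the number of queries in flight is bounded by the pool's max_size; additional calls wait for a free connection.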

Metadata and Evidence

metadata_for_edge is the most complex method in the backend. An edge record in the relationship table carries confidence, source_documents, and properties. Evidence provenance is in bundle_evidence, keyed by the string subject_id:predicate:object_id:

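async def metadata_for_edge(
    self, edge: Edge
) -> dict[str, Any]:
    # Evidence rows in bundle_evidence are keyed by this string.
    rel_key = f"{edge.subject}:{edge.predicate}:{edge.object}"
    # conn and rel_row (the matching relationship row) come from
    # steps elided in this excerpt.
    evidence_rows = await _fetch_evidence(
        conn, rel_row["id"], rel_key
    )
    ...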

The _fetch_evidence helper tries two schemas: the test schema (an evidence table with a UUID foreign key) and the kgserver schema (bundle_evidence with a string relationship key). This dual-schema support exists because the integration tests build a minimal schema from scratch while the production demo data uses the full kgserver schema. Both schemas are handled transparently.

The resulting metadata dict for a full edge includes confidence, source document IDs, any relationship properties, and a provenance list with text spans, confidence scores, and document references. This provenance is stripped by _slim_result in the server layer before returning BFS results -- it is available only through describe_entity on a specific node, which fetches the edge metadata directly.
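The shape of such a metadata dict, and the effect of slimming it, might look roughly like the sketch below. The key names ("provenance" in particular), the placeholder values, and the slim_metadata helper are assumptions made for illustration; the real _slim_result may differ in both name handling and scope.

```python
from typing import Any

# Placeholder values only; none of these come from real graph data.
full_metadata: dict[str, Any] = {
    "confidence": 0.5,
    "source_documents": ["doc-1"],
    "properties": {},
    "provenance": [
        {
            "text_span": "example sentence supporting the edge",
            "confidence": 0.5,
            "document_id": "doc-1",
            "section": "results",
        }
    ],
}


def slim_metadata(
    meta: dict[str, Any], drop: tuple[str, ...] = ("provenance",)
) -> dict[str, Any]:
    """Return a copy of meta without the bulky keys listed in drop."""
    return {key: value for key, value in meta.items() if key not in drop}


slim = slim_metadata(full_metadata)
```

The helper returns a new dict and leaves the original untouched, so the full record remains available to a caller that asks for it explicitly.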