Chapter 9: The Postgres Backend
The Postgres backend is the primary BFS-QL backend, not because Postgres is
the only viable graph store -- it is not -- but because it is the natural
target for graphs built with the companion volume's extraction pipeline.
Kgraph writes entities, relationships, and embeddings into Postgres. BFS-QL
reads them through PostgresBackend. The two are designed as a pair; this
chapter covers the reading side.
The Schema
PostgresBackend expects the kgraph schema:
- entity table: entity_id (canonical ID), entity_type, name, embedding (pgvector float array), properties (JSON), status, confidence, synonyms (JSON), source, canonical_url.
- relationship table: subject_id, predicate, object_id, confidence, source_documents (JSON), properties (JSON).
- bundle_evidence table: relationship_key (string FK of the form subject_id:predicate:object_id), text_span, confidence, document_id, section.
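The bundle_evidence table joins to relationship through a composite string key rather than a numeric foreign key. The sketch below shows how that key is built; the helper name relationship_key and the example IDs are hypothetical. One caveat: if canonical IDs can themselves contain colons (as prefixed identifiers often do), the key cannot be split back into its three parts unambiguously, so it is safest to treat it as an opaque lookup string that is only ever built, never parsed.

```python
def relationship_key(subject_id: str, predicate: str, object_id: str) -> str:
    """Build the subject_id:predicate:object_id string that bundle_evidence stores."""
    return f"{subject_id}:{predicate}:{object_id}"


# Illustrative IDs only. Building the key is unambiguous, but parsing it is
# not once the IDs themselves contain colons.
key = relationship_key("EX:0001", "treats", "EX:0002")
print(key)                  # EX:0001:treats:EX:0002
print(len(key.split(":")))  # 5 parts, not 3
```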
Entities with status = 'merged' are excluded from all queries. The
kgraph pipeline resolves duplicate entities by merging them into a canonical
representative; merged entities are kept in the table for audit purposes
but are not exposed through BFS-QL.
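The queries below express this filter as status IS NULL OR status != 'merged' rather than just status != 'merged'. The extra clause is needed because of SQL's three-valued logic: comparing NULL with anything yields NULL, and a WHERE clause keeps a row only when its condition is TRUE. The following is a small Python model of that behavior, using None to stand in for SQL NULL; it is an illustration of the SQL semantics, not code from the backend.

```python
from typing import Optional


def sql_ne(a: Optional[str], b: Optional[str]) -> Optional[bool]:
    """Model SQL's != operator: any NULL operand yields NULL (None)."""
    if a is None or b is None:
        return None
    return a != b


def sql_or(x: Optional[bool], y: Optional[bool]) -> Optional[bool]:
    """Model SQL's OR: TRUE wins, then NULL, then FALSE."""
    if x is True or y is True:
        return True
    if x is None or y is None:
        return None
    return False


def where_keeps(condition: Optional[bool]) -> bool:
    """A WHERE clause keeps a row only when its condition is exactly TRUE."""
    return condition is True


statuses = [None, "active", "merged"]

# status != 'merged' alone: rows with no status silently disappear.
naive = [s for s in statuses if where_keeps(sql_ne(s, "merged"))]
# status IS NULL OR status != 'merged' (IS NULL itself never returns NULL).
guarded = [s for s in statuses if where_keeps(sql_or(s is None, sql_ne(s, "merged")))]

print(naive)    # ['active']
print(guarded)  # [None, 'active']
```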
Connection Pool and Initialization
PostgresBackend uses asyncpg for async I/O and manages a connection pool:
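```python
import json
import os

import asyncpg


class PostgresBackend:
    @classmethod
    async def create(
        cls, dsn: str | None = None, ...  # remaining parameters (including embedding_fn) elided
    ) -> "PostgresBackend":
        dsn = dsn or os.environ["DATABASE_URL"]

        async def _init_conn(conn):
            # Runs once for every new connection the pool opens.
            await conn.execute(
                "CREATE EXTENSION IF NOT EXISTS vector"
            )
            # Convert JSONB values to and from Python objects automatically.
            await conn.set_type_codec(
                "jsonb",
                encoder=json.dumps,
                decoder=json.loads,
                schema="pg_catalog",
            )

        pool = await asyncpg.create_pool(
            dsn, min_size=2, max_size=10, init=_init_conn
        )
        return cls(pool, embedding_fn)
```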
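The _init_conn callback runs on every new connection the pool opens. It
creates the pgvector extension if it is not already installed (IF NOT EXISTS
makes the repeated statement harmless) and registers a type codec that
converts JSONB values to and from Python objects, so JSON columns arrive as
dicts. The vector type for embeddings is handled separately: queries pass
embeddings as text literals and cast them to vector in SQL, so no vector
codec is registered.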
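The vector search query below sends the query embedding as a string in pgvector's text input format (square brackets around comma-separated numbers) and casts it with $1::vector. This is a standalone sketch of that formatting step; the function name to_pgvector_literal is hypothetical, and the string-building expression mirrors the one in _search_by_vector.

```python
from collections.abc import Sequence


def to_pgvector_literal(embedding: Sequence[float]) -> str:
    """Format an embedding in pgvector's text input form, e.g. '[0.25,-1.0]'."""
    return "[" + ",".join(str(x) for x in embedding) + "]"


literal = to_pgvector_literal([0.25, -1.0, 3])
print(literal)  # [0.25,-1.0,3]

# The format is simple enough to parse back into floats.
parsed = [float(v) for v in literal.strip("[]").split(",")]
print(parsed)  # [0.25, -1.0, 3.0]
```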
PostgresBackend.create is an async classmethod, not __init__. This is
a Python idiom for async initialization: __init__ cannot be async, so
the factory pattern is the clean solution. The create_server() function
accepts PostgresBackend.create as a factory callable -- the server's
lifespan handler calls it inside the running event loop, ensuring the pool
is created in the same loop that will use it.
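Here is the same async-factory idiom with asyncpg removed: __init__ only stores resources that already exist, and the async classmethod does all the awaiting. FakePool, open_pool, Backend, and run_lifespan are stand-ins invented for this sketch (they are not asyncpg or BFS-QL APIs), so the example runs without a database.

```python
import asyncio
from typing import Awaitable, Callable


class FakePool:
    """Stand-in for an asyncpg pool; it only remembers its DSN."""

    def __init__(self, dsn: str) -> None:
        self.dsn = dsn


async def open_pool(dsn: str) -> FakePool:
    """Hypothetical async setup step, playing the role of asyncpg.create_pool()."""
    await asyncio.sleep(0)  # yield to the event loop, as real I/O would
    return FakePool(dsn)


class Backend:
    def __init__(self, pool: FakePool) -> None:
        # Synchronous: only stores a resource that was created elsewhere.
        self._pool = pool

    @classmethod
    async def create(cls, dsn: str) -> "Backend":
        # All awaiting happens here, inside whichever event loop calls create().
        pool = await open_pool(dsn)
        return cls(pool)


async def run_lifespan(factory: Callable[[], Awaitable[Backend]]) -> Backend:
    """A server lifespan hook can take the factory and await it in its own loop."""
    return await factory()


backend = asyncio.run(run_lifespan(lambda: Backend.create("postgresql://example")))
print(backend._pool.dsn)  # postgresql://example
```

Passing the factory instead of a ready-made instance is what lets the server delay pool creation until its own event loop is running.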
Entity Search
search_entities has two implementations, chosen by whether an embedding_fn was supplied when the backend was created:
Vector search (when an embedding_fn is provided):
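```python
async def _search_by_vector(
    self, query: str, limit: int = 10
) -> list[EntityStub]:
    embedding = await self._embedding_fn(query)
    # pgvector text input format, e.g. "[0.12,-0.5,0.3]"
    embedding_str = (
        "[" + ",".join(str(x) for x in embedding) + "]"
    )
    # Acquire a pooled connection (the pool attribute name is assumed).
    async with self._pool.acquire() as conn:
        rows = await conn.fetch(
            """
            SELECT entity_id, entity_type
            FROM entity
            WHERE embedding IS NOT NULL
              AND (status IS NULL OR status != 'merged')
            ORDER BY embedding::vector <=> $1::vector
            LIMIT $2
            """,
            embedding_str, limit,
        )
    ...  # conversion of rows to EntityStub objects elided
```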
The <=> operator is pgvector's cosine distance operator. The query embeds
the search string and returns the entities whose embeddings lie nearest to
it. Embedding model consistency (using the same model at ingest time and at
query time) is critical: vectors from different models live in different
spaces, so distances between them are meaningless. This is guaranteed
automatically when the identity server owns all embeddings: it uses the same
model for both, and the query layer never needs to know which one.
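To make the ORDER BY concrete: cosine distance is one minus cosine similarity, so smaller means closer. This pure-Python sketch computes the same ranking over an in-memory dict; the entity IDs and vectors are made up for illustration, and a real deployment leaves this work to Postgres.

```python
import math


def cosine_distance(a: list[float], b: list[float]) -> float:
    """1 - cosine similarity, the quantity pgvector's <=> operator returns.

    Assumes neither vector is all zeros.
    """
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)


def nearest(query: list[float], entities: dict[str, list[float]], limit: int) -> list[str]:
    """In-memory equivalent of ORDER BY embedding <=> query LIMIT limit."""
    ranked = sorted(entities, key=lambda eid: cosine_distance(entities[eid], query))
    return ranked[:limit]


entities = {
    "E1": [1.0, 0.0],
    "E2": [0.0, 1.0],
    "E3": [0.7, 0.7],
}
print(nearest([1.0, 0.1], entities, limit=2))  # ['E1', 'E3']
```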
Name search (fallback when no embedding_fn is provided):
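```python
async def _search_by_name(
    self, query: str, limit: int = 10
) -> list[EntityStub]:
    # Substring match: % wildcards on both sides of the query.
    pattern = f"%{query}%"
    async with self._pool.acquire() as conn:  # pool attribute name assumed
        rows = await conn.fetch(
            "SELECT entity_id, entity_type FROM entity "
            "WHERE name ILIKE $1 "
            "AND (status IS NULL OR status != 'merged') "
            "ORDER BY name LIMIT $2",
            pattern, limit,
        )
    ...  # conversion of rows to EntityStub objects elided
```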
ILIKE performs case-insensitive pattern matching; wrapping the query in % wildcards turns it into a substring match. It is cruder than vector search: a query for "Cushing" matches "Cushing disease," "Cushing syndrome," and any other entity whose name contains that substring, while a name that means the same thing but shares no substring is missed entirely. For the medlit demo graph, where entity names are short and specific, this is acceptable. For large general-purpose graphs, vector search is preferred.
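A related wrinkle in the name fallback: % and _ inside the user's query are ILIKE wildcards too, so a query for "5_HT" also matches "5-HT". If that matters, the query can be escaped before it is wrapped, because backslash is PostgreSQL's default LIKE escape character. The escape_like helper below is a hypothetical addition, not part of the backend as shown. The ilike function is a rough Python model of the operator, included only so the effect can be checked without a database.

```python
import re


def escape_like(text: str) -> str:
    """Escape LIKE/ILIKE metacharacters with PostgreSQL's default escape, backslash."""
    return text.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")


def ilike(value: str, pattern: str) -> bool:
    """Rough Python model of `value ILIKE pattern` with backslash escapes."""
    regex = []
    i = 0
    while i < len(pattern):
        ch = pattern[i]
        if ch == "\\" and i + 1 < len(pattern):
            regex.append(re.escape(pattern[i + 1]))  # escaped char is literal
            i += 2
            continue
        if ch == "%":
            regex.append(".*")  # any run of characters
        elif ch == "_":
            regex.append(".")  # exactly one character
        else:
            regex.append(re.escape(ch))
        i += 1
    return re.fullmatch("".join(regex), value, flags=re.IGNORECASE | re.DOTALL) is not None


query = "5_HT"
unescaped = f"%{query}%"
escaped = f"%{escape_like(query)}%"
print(ilike("5-HT receptor", unescaped))  # True: _ matched "-"
print(ilike("5-HT receptor", escaped))    # False
print(ilike("5_ht agonist", escaped))     # True: literal match, case-insensitive
```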
Edge Traversal
The traversal methods are straightforward:
```python
async def edges_from(self, entity_id: str) -> list[Edge]:
    # Each call acquires its own pooled connection (attribute name assumed).
    async with self._pool.acquire() as conn:
        rows = await conn.fetch(
            "SELECT subject_id, predicate, object_id "
            "FROM relationship "
            "WHERE subject_id = $1",
            entity_id,
        )
    return [
        Edge(
            subject=r["subject_id"],
            predicate=r["predicate"],
            object=r["object_id"],
        )
        for r in rows
    ]
```
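edges_to is identical except that it filters on WHERE object_id = $1. Both
are called concurrently for every node in the BFS frontier. This is safe
because each call acquires its own connection from the pool: a single asyncpg
connection cannot run two queries at once, but the pool hands separate
connections to concurrent tasks, and with max_size=10 any calls beyond ten
in flight wait until a connection is released.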
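This sketch shows one BFS frontier expansion under these constraints. An asyncio.Semaphore sized like the pool's max_size stands in for connection acquisition, and an in-memory edge list stands in for the relationship table. InMemoryBackend and expand_frontier are hypothetical names; this is not the BFS-QL traversal code itself.

```python
import asyncio
from dataclasses import dataclass


@dataclass(frozen=True)
class Edge:
    subject: str
    predicate: str
    object: str


# Hypothetical in-memory stand-in for the relationship table.
EDGES = [
    Edge("A", "treats", "B"),
    Edge("B", "causes", "C"),
    Edge("D", "inhibits", "B"),
]


class InMemoryBackend:
    def __init__(self, edges: list[Edge], max_connections: int) -> None:
        self._edges = edges
        # Plays the role of the pool: at most max_connections queries in flight.
        self._slots = asyncio.Semaphore(max_connections)

    async def edges_from(self, entity_id: str) -> list[Edge]:
        async with self._slots:
            await asyncio.sleep(0)  # simulate query I/O
            return [e for e in self._edges if e.subject == entity_id]

    async def edges_to(self, entity_id: str) -> list[Edge]:
        async with self._slots:
            await asyncio.sleep(0)
            return [e for e in self._edges if e.object == entity_id]


async def expand_frontier(backend: InMemoryBackend, frontier: set[str]) -> set[Edge]:
    """Fetch outgoing and incoming edges for every frontier node concurrently."""
    tasks = []
    for node in frontier:
        tasks.append(backend.edges_from(node))
        tasks.append(backend.edges_to(node))
    results = await asyncio.gather(*tasks)
    return {edge for edges in results for edge in edges}


async def main() -> set[Edge]:
    # Create the semaphore inside the running event loop.
    backend = InMemoryBackend(EDGES, max_connections=10)
    return await expand_frontier(backend, {"B"})


found = asyncio.run(main())
print(sorted((e.subject, e.predicate, e.object) for e in found))
```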
Metadata and Evidence
metadata_for_edge is the most complex method in the backend. An edge
record in the relationship table carries confidence, source_documents,
and properties. Evidence provenance is in bundle_evidence, keyed by
the string subject_id:predicate:object_id:
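```python
from typing import Any


async def metadata_for_edge(
    self, edge: Edge
) -> dict[str, Any]:
    # conn and rel_row (the matching relationship row) come from code elided here.
    rel_key = f"{edge.subject}:{edge.predicate}:{edge.object}"
    evidence_rows = await _fetch_evidence(
        conn, rel_row["id"], rel_key
    )
    ...
```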
The _fetch_evidence helper tries two schemas: the test schema (an evidence
table with a UUID foreign key) and the kgserver schema (bundle_evidence
with a string relationship key). This dual-schema support exists because the
integration tests build a minimal schema from scratch while the production
demo data uses the full kgserver schema. The fallback is transparent:
callers of metadata_for_edge never need to know which schema is present.
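One way to picture the fallback is to try each lookup in order and use the first one whose table exists. The sketch below is hypothetical throughout. It uses in-memory dicts for the two evidence tables, a synchronous function where the real helper is async, and a MissingTable exception for whatever signal the real _fetch_evidence uses; with asyncpg, a missing table surfaces as asyncpg.exceptions.UndefinedTableError, which is one plausible signal. The order (test schema first) simply follows the order in which the text lists the schemas.

```python
from typing import Any, Optional


class MissingTable(Exception):
    """Stand-in for the error raised when a table does not exist."""


def fetch_from_test_schema(tables: dict[str, Any], rel_id: str) -> list[dict]:
    # Test schema: evidence rows point at the relationship by ID.
    if "evidence" not in tables:
        raise MissingTable("evidence")
    return [row for row in tables["evidence"] if row["relationship_id"] == rel_id]


def fetch_from_kgserver_schema(tables: dict[str, Any], rel_key: str) -> list[dict]:
    # kgserver schema: bundle_evidence rows use the string relationship key.
    if "bundle_evidence" not in tables:
        raise MissingTable("bundle_evidence")
    return [row for row in tables["bundle_evidence"] if row["relationship_key"] == rel_key]


def fetch_evidence(tables: dict[str, Any], rel_id: Optional[str], rel_key: str) -> list[dict]:
    """Try the ID-keyed test schema first, then the string-keyed kgserver schema."""
    if rel_id is not None:
        try:
            return fetch_from_test_schema(tables, rel_id)
        except MissingTable:
            pass
    try:
        return fetch_from_kgserver_schema(tables, rel_key)
    except MissingTable:
        return []


kgserver_db = {
    "bundle_evidence": [
        {"relationship_key": "EX:1:treats:EX:2", "text_span": "example span", "confidence": 0.9},
    ]
}
rows = fetch_evidence(kgserver_db, rel_id="r1", rel_key="EX:1:treats:EX:2")
print(len(rows))  # 1
```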
The resulting metadata dict for a full edge includes confidence, source
document IDs, any relationship properties, and a provenance list with
text spans, confidence scores, and document references. This provenance
is stripped by _slim_result in the server layer before returning BFS
results -- it is available only through describe_entity on a specific
node, which fetches the edge metadata directly.
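Here is a sketch of the slimming step. It assumes the edge metadata is a plain dict that holds its provenance list under a "provenance" key; that key name, the other key names, and the function slim_edge_metadata are all illustrative stand-ins, since _slim_result's actual signature is not shown here.

```python
from typing import Any


def slim_edge_metadata(metadata: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of edge metadata without the bulky provenance list."""
    return {key: value for key, value in metadata.items() if key != "provenance"}


full = {
    "confidence": 0.92,
    "source_documents": ["doc-1", "doc-2"],
    "properties": {},
    "provenance": [
        {"text_span": "example span", "confidence": 0.9, "document_id": "doc-1"},
    ],
}
slim = slim_edge_metadata(full)
print(sorted(slim))          # ['confidence', 'properties', 'source_documents']
print("provenance" in full)  # True: the original dict is left untouched
```

Returning a copy rather than deleting in place means a cached full record could still serve a later describe_entity call. Whether the real server caches anything is not stated here.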