core / bus.py — MemoryBus
In-memory pub/sub façade over Vector / Graph / Skill / Profile / Episodic stores. Hydrates context with Reciprocal Rank Fusion; publishes facts in parallel.
Public surface
| Member | Signature | Purpose |
|---|---|---|
| __init__ | (vector_memory, graph_memory, skill_memory, profile_memory, episodic_memory) | All five tiers are optional; missing ones are skipped at fan-out time. |
| async hydrate_context | (query, max_chars=6000, rrf_k=60, context_budget=0, llm_client=None) -> str | Fetch all tiers in parallel, fuse with RRF, return sectioned markdown for prompt injection. |
| async publish_fact | (event_type, fact_data) -> Dict[str, Any] | Fan-out write to vector, graph, profile, skill stores in parallel. Dedup-LRU gate prevents replays. |
RRF (Reciprocal Rank Fusion)
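For each document, the bus sums a weighted reciprocal-rank term over every ranker (vector, graph, skill, episodic) that returned it:
score(doc) = Σ_ranker weight_ranker / (k + rank_in_ranker + 1)
Here rank_in_ranker is the document's position in that ranker's result list (the + 1 suggests positions are zero-based), and k is the rrf_k parameter, which defaults to 60 (lines in the 130s). The per-source weights depend on a query intent classification: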
| Intent | graph | vector | skill | episodic |
|---|---|---|---|---|
| factual | 2.0 | 1.0 | 0.5 | 0.3 |
| procedural | 0.5 | 1.0 | 2.0 | 1.5 |
| contextual | 1.0 | 1.5 | 1.0 | 1.0 |
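The formula and weight table above can be sketched in a few lines. This is an illustrative reconstruction, not the code in bus.py: the names INTENT_WEIGHTS and rrf_fuse are hypothetical, ranks are assumed to be zero-based, and a source missing from the table is assumed to get weight 1.0.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# Weights copied from the intent table above.
INTENT_WEIGHTS: Dict[str, Dict[str, float]] = {
    "factual":    {"graph": 2.0, "vector": 1.0, "skill": 0.5, "episodic": 0.3},
    "procedural": {"graph": 0.5, "vector": 1.0, "skill": 2.0, "episodic": 1.5},
    "contextual": {"graph": 1.0, "vector": 1.5, "skill": 1.0, "episodic": 1.0},
}


def rrf_fuse(
    rankings: Dict[str, List[str]], intent: str, k: int = 60
) -> List[Tuple[str, float]]:
    """Fuse per-source ranked lists of document ids into one scored list.

    `rankings` maps a source name to its documents, best first.
    """
    weights = INTENT_WEIGHTS[intent]
    scores: Dict[str, float] = defaultdict(float)
    for source, docs in rankings.items():
        weight = weights.get(source, 1.0)  # assumption: unknown sources weigh 1.0
        for rank, doc in enumerate(docs):  # assumption: zero-based rank
            scores[doc] += weight / (k + rank + 1)
    # Highest fused score first.
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)
```

With rankings {"graph": ["a", "b"], "vector": ["b", "a"]}, a factual intent puts "a" first because the graph ranker carries weight 2.0, while a procedural intent puts "b" first because the vector ranker then outweighs the graph ranker.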
Section budgets
Total max_chars is split across sources before truncation:
- graph — 25 %
- vector — 40 %
- skill — 20 %
- episodic — 15 %
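As a rough illustration of the split above, the sketch below converts the percentages into per-section character budgets and truncates each section to its share. The helper names split_budget and truncate_sections are hypothetical, and plain slicing stands in for whatever truncation the bus actually applies.

```python
from typing import Dict

# Percentages copied from the list above.
SECTION_PERCENT: Dict[str, int] = {
    "graph": 25,
    "vector": 40,
    "skill": 20,
    "episodic": 15,
}


def split_budget(max_chars: int = 6000) -> Dict[str, int]:
    """Return the character budget of each section (integer arithmetic)."""
    return {name: max_chars * pct // 100 for name, pct in SECTION_PERCENT.items()}


def truncate_sections(sections: Dict[str, str], max_chars: int = 6000) -> Dict[str, str]:
    """Cut each section's text down to its share of max_chars."""
    budgets = split_budget(max_chars)
    # Assumption: a section with no configured share gets a budget of 0.
    return {name: text[: budgets.get(name, 0)] for name, text in sections.items()}
```

With the default max_chars=6000, this gives 1500 characters to graph, 2400 to vector, 1200 to skill, and 900 to episodic.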
Dedup LRU
_DEDUP_LRU_MAX = 256. Each publish_fact hashes (event_type, fact_data) with MD5 before the fan-out — a recently-seen signature short-circuits and writes nothing. Prevents re-emission storms when the LLM repeats itself.
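A minimal sketch of such a gate follows. The DedupGate class and its method names are hypothetical, and serializing the payload as key-sorted JSON before hashing is an assumption; the source only states that (event_type, fact_data) is hashed with MD5 and that the LRU holds 256 entries.

```python
import hashlib
import json
from collections import OrderedDict
from typing import Any, Dict

_DEDUP_LRU_MAX = 256


class DedupGate:
    """Remembers the most recent fact signatures and flags repeats."""

    def __init__(self, max_size: int = _DEDUP_LRU_MAX) -> None:
        self._seen: "OrderedDict[str, None]" = OrderedDict()
        self._max_size = max_size

    @staticmethod
    def signature(event_type: str, fact_data: Dict[str, Any]) -> str:
        # Assumption: key-sorted JSON makes the hash independent of dict order.
        payload = json.dumps([event_type, fact_data], sort_keys=True, default=str)
        return hashlib.md5(payload.encode("utf-8")).hexdigest()

    def is_duplicate(self, event_type: str, fact_data: Dict[str, Any]) -> bool:
        sig = self.signature(event_type, fact_data)
        if sig in self._seen:
            self._seen.move_to_end(sig)  # refresh recency
            return True
        self._seen[sig] = None
        if len(self._seen) > self._max_size:
            self._seen.popitem(last=False)  # evict the least recently seen
        return False
```

A caller would check is_duplicate before the fan-out and return early on True, so a repeated fact never reaches the stores. MD5 serves here only as a cheap fingerprint, not as a security measure.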
Fan-out diagram
Figure 5 — Memory-bus hydration: parallel fetch across tiers, RRF fusion at the end.
Concurrency
asyncio.gather drives the parallel fetches; sync-only memory stores are wrapped via asyncio.to_thread.
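The pattern described above might look roughly like this. It is a sketch under assumptions: the fetch_all helper and the search method name are hypothetical stand-ins for the stores' real interface, and turning a failed fetch into an empty result is one possible policy, not necessarily what the bus does.

```python
import asyncio
import inspect
from typing import Any, Dict, Optional


async def fetch_all(stores: Dict[str, Optional[Any]], query: str) -> Dict[str, Any]:
    """Query every configured store concurrently and collect the results."""

    async def fetch_one(store: Any) -> Any:
        if inspect.iscoroutinefunction(store.search):
            return await store.search(query)
        # Sync-only store: run it in a worker thread so the event loop stays free.
        return await asyncio.to_thread(store.search, query)

    # Tiers that were not supplied (None) are skipped.
    active = {name: store for name, store in stores.items() if store is not None}
    results = await asyncio.gather(
        *(fetch_one(store) for store in active.values()),
        return_exceptions=True,
    )
    # Assumption: a failing tier contributes nothing rather than aborting the rest.
    return {
        name: ([] if isinstance(result, Exception) else result)
        for name, result in zip(active, results)
    }
```

asyncio.to_thread requires Python 3.9 or later; on older interpreters, loop.run_in_executor serves the same purpose.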
For the detailed RRF derivation, see algorithms / memory hydration.