core / bus.py — MemoryBus

In-memory pub/sub façade over Vector / Graph / Skill / Profile / Episodic stores. Hydrates context with Reciprocal Rank Fusion; publishes facts in parallel.

Public surface

| Member | Signature | Purpose |
| --- | --- | --- |
| __init__ | (vector_memory, graph_memory, skill_memory, profile_memory, episodic_memory) | All five tiers are optional; missing ones are skipped at fan-out time. |
| async hydrate_context | (query, max_chars=6000, rrf_k=60, context_budget=0, llm_client=None) -> str | Fetch all tiers in parallel, fuse with RRF, and return sectioned markdown for prompt injection. |
| async publish_fact | (event_type, fact_data) -> Dict[str, Any] | Fan-out write to the vector, graph, profile, and skill stores in parallel. A dedup-LRU gate prevents replays. |

RRF (Reciprocal Rank Fusion)

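For each candidate document, the bus sums one weighted reciprocal-rank term over every ranker (vector, graph, skill, episodic) that returned it, where rank_in_ranker is the document's position in that ranker's result list: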

score(doc) = Σ_ranker  weight_ranker / (k + rank_in_ranker + 1)
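As a worked instance, take k = 60, set every weight to 1, and treat rank_in_ranker as 0-based (as the + 1 suggests). A document ranked first (rank 0) by one ranker and third (rank 2) by another scores:

$$
\text{score}(d) = \frac{1}{60 + 0 + 1} + \frac{1}{60 + 2 + 1} = \frac{1}{61} + \frac{1}{63} \approx 0.0323
$$

A document ranked first by only one ranker scores 1/61 ≈ 0.0164, so agreement across rankers outweighs a single top hit.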

k = rrf_k, default 60 (defined around line 130 of bus.py). The per-source weights depend on how the query's intent is classified:

| Intent | graph | vector | skill | episodic |
| --- | --- | --- | --- | --- |
| factual | 2.0 | 1.0 | 0.5 | 0.3 |
| procedural | 0.5 | 1.0 | 2.0 | 1.5 |
| contextual | 1.0 | 1.5 | 1.0 | 1.0 |
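A minimal sketch of the weighted fusion, assuming each ranker returns a best-first list of document IDs and that the intent label has already been classified (the classifier itself is not shown). The weights dict mirrors the intent table above; weighted_rrf and INTENT_WEIGHTS are hypothetical names, not the bus's internals.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# Per-source weights, copied from the intent table above.
INTENT_WEIGHTS: Dict[str, Dict[str, float]] = {
    "factual": {"graph": 2.0, "vector": 1.0, "skill": 0.5, "episodic": 0.3},
    "procedural": {"graph": 0.5, "vector": 1.0, "skill": 2.0, "episodic": 1.5},
    "contextual": {"graph": 1.0, "vector": 1.5, "skill": 1.0, "episodic": 1.0},
}


def weighted_rrf(
    rankings: Dict[str, List[str]], intent: str, k: int = 60
) -> List[Tuple[str, float]]:
    """Fuse best-first lists of doc IDs per source with intent-weighted RRF."""
    weights = INTENT_WEIGHTS[intent]
    scores: Dict[str, float] = defaultdict(float)
    for source, docs in rankings.items():
        weight = weights[source]
        # enumerate() yields a 0-based rank, hence the + 1 in the denominator.
        for rank, doc_id in enumerate(docs):
            scores[doc_id] += weight / (k + rank + 1)
    # Highest fused score first.
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)
```

With the rankings {"graph": ["a", "b"], "vector": ["b", "c"]}, the factual intent places a above c (graph weight 2.0), while the procedural intent flips them because the graph weight drops to 0.5. Document b wins in both cases since two rankers return it.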

Section budgets

Total max_chars is split across sources before truncation:
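A minimal sketch of budget-then-truncate, assuming a proportional split driven by a caller-supplied shares mapping (hypothetical; the bus's real per-source split may differ) and plain prefix truncation. apply_section_budgets is an illustrative name only.

```python
from typing import Dict


def apply_section_budgets(
    sections: Dict[str, str],
    shares: Dict[str, float],
    max_chars: int = 6000,
) -> Dict[str, str]:
    """Split max_chars across sources by relative share, then truncate.

    `shares` is hypothetical and caller-supplied; a source missing from it
    gets a zero budget and its section is emptied.
    """
    total = sum(shares.values())
    if total <= 0:
        raise ValueError("shares must sum to a positive number")
    budgets = {src: int(max_chars * share / total) for src, share in shares.items()}
    # Plain prefix truncation; the real bus may cut on item boundaries instead.
    return {src: text[: budgets.get(src, 0)] for src, text in sections.items()}
```

Splitting before truncating means one verbose tier cannot crowd the others out of the prompt, whatever the exact shares are.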

Dedup LRU

The dedup LRU holds up to _DEDUP_LRU_MAX = 256 recent signatures. Before the fan-out, each publish_fact call hashes (event_type, fact_data) with MD5; if that signature was seen recently, the call short-circuits and writes nothing. This prevents re-emission storms when the LLM repeats itself.
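A minimal sketch of the dedup gate, assuming the signature is the MD5 of a canonical JSON encoding of (event_type, fact_data) with sorted keys (non-JSON values fall back to str()). DedupLRU and its methods are hypothetical names. An OrderedDict gives O(1) recency updates and O(1) eviction of the least recently seen signature.

```python
import hashlib
import json
from collections import OrderedDict
from typing import Any, Dict

_DEDUP_LRU_MAX = 256


class DedupLRU:
    """Bounded LRU of recently published fact signatures (sketch)."""

    def __init__(self, max_size: int = _DEDUP_LRU_MAX) -> None:
        self.max_size = max_size
        self._seen: "OrderedDict[str, None]" = OrderedDict()

    @staticmethod
    def signature(event_type: str, fact_data: Dict[str, Any]) -> str:
        # Assumption: canonical JSON (sorted keys) so dicts with the same
        # content but different insertion order hash identically.
        payload = json.dumps([event_type, fact_data], sort_keys=True, default=str)
        return hashlib.md5(payload.encode("utf-8")).hexdigest()

    def seen_recently(self, event_type: str, fact_data: Dict[str, Any]) -> bool:
        """True means duplicate: skip the fan-out write."""
        sig = self.signature(event_type, fact_data)
        if sig in self._seen:
            self._seen.move_to_end(sig)  # a hit refreshes recency
            return True
        self._seen[sig] = None
        if len(self._seen) > self.max_size:
            self._seen.popitem(last=False)  # evict the least recently seen
        return False
```

In publish_fact, a True result would short-circuit before any store is touched. MD5 is used here only as a cheap fingerprint, not for security.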

Fan-out diagram

user query → decompose into 2-3 sub-queries → parallel fetch from VectorMemory, GraphMemory, SkillMemory, EpisodicMemory → RRF fusion + intent weights → markdown

Figure 5 — Memory-bus hydration: parallel fetch across tiers, RRF fusion at the end.

Concurrency

asyncio.gather runs the per-tier fetches concurrently; sync-only memory stores are wrapped with asyncio.to_thread so their blocking calls run in a worker thread instead of stalling the event loop.
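A minimal sketch of the fan-out, assuming every store exposes a search(query) method that is either a coroutine function or a plain blocking function (a hypothetical interface; the real tier APIs may differ). Tiers passed as None are skipped, mirroring the optional constructor arguments. return_exceptions=True is also an assumption, so one failing tier does not make the whole hydration raise.

```python
import asyncio
import inspect
from typing import Any, Dict, List, Optional


async def _call_search(store: Any, query: str) -> List[str]:
    """Await async stores directly; push sync-only stores onto a worker thread."""
    # Hypothetical interface: every store is assumed to expose search(query).
    if inspect.iscoroutinefunction(store.search):
        return await store.search(query)
    return await asyncio.to_thread(store.search, query)


async def fan_out(
    stores: Dict[str, Optional[Any]], query: str
) -> Dict[str, List[str]]:
    """Query every configured tier concurrently; None tiers are skipped."""
    active = {name: store for name, store in stores.items() if store is not None}
    results = await asyncio.gather(
        *(_call_search(store, query) for store in active.values()),
        return_exceptions=True,  # assumption: a failing tier is dropped, not fatal
    )
    # Keep only tiers that returned successfully.
    return {
        name: result
        for name, result in zip(active, results)
        if not isinstance(result, BaseException)
    }


# Toy stores for illustration only (hypothetical, not the real tiers).
class SyncStore:
    def __init__(self, hits: List[str]) -> None:
        self.hits = hits

    def search(self, query: str) -> List[str]:
        return [f"{hit}:{query}" for hit in self.hits]


class AsyncStore:
    def __init__(self, hits: List[str]) -> None:
        self.hits = hits

    async def search(self, query: str) -> List[str]:
        await asyncio.sleep(0)  # yield to the loop like a real I/O call would
        return [f"{hit}:{query}" for hit in self.hits]


if __name__ == "__main__":
    tiers = {"vector": AsyncStore(["v1"]), "graph": SyncStore(["g1"]), "skill": None}
    print(asyncio.run(fan_out(tiers, "q")))
```

asyncio.to_thread requires Python 3.9 or later.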

For the detailed RRF derivation, see algorithms / memory hydration.