From 136c15a0c3eeae03796ac6bd0cab43c4e0764817 Mon Sep 17 00:00:00 2001 From: qin-ctx Date: Thu, 5 Mar 2026 14:35:32 +0800 Subject: [PATCH] fix(session): trigger semantic indexing for parent directory after memory extraction Extracted memories were not searchable because _index_memory() only enqueued the memory file (level 2) to EmbeddingQueue, without triggering SemanticQueue for the parent directory. The hierarchical retriever searches level 0/1 first, so without parent directory .abstract.md/.overview.md being generated and indexed, the search could never discover the directory containing new memories. - Add _enqueue_semantic_for_parent() call in _index_memory() to generate parent directory summaries after every memory indexing operation - Remove duplicate _enqueue_semantic_for_parent() calls from _merge_tool_memory() and _merge_skill_memory() since _index_memory() now handles this uniformly Closes #422 Co-Authored-By: Claude Opus 4.6 --- openviking/session/compressor.py | 20 ++++++++++++-------- openviking/session/memory_extractor.py | 19 +++++++------------ 2 files changed, 19 insertions(+), 20 deletions(-) diff --git a/openviking/session/compressor.py b/openviking/session/compressor.py index 5a43412a..927d54c6 100644 --- a/openviking/session/compressor.py +++ b/openviking/session/compressor.py @@ -67,13 +67,15 @@ def __init__( self.extractor = MemoryExtractor() self.deduplicator = MemoryDeduplicator(vikingdb=vikingdb) - async def _index_memory(self, memory: Context) -> bool: - """Add memory to vectorization queue.""" + async def _index_memory(self, memory: Context, ctx: RequestContext) -> bool: + """Add memory to vectorization queue and trigger parent directory semantic generation.""" from openviking.storage.queuefs.embedding_msg_converter import EmbeddingMsgConverter embedding_msg = EmbeddingMsgConverter.from_context(memory) await self.vikingdb.enqueue_embedding_msg(embedding_msg) logger.info(f"Enqueued memory for vectorization: {memory.uri}") + + await self.extractor._enqueue_semantic_for_parent(memory.uri, ctx) return True async def _merge_into_existing( @@ -106,7 +108,7 @@ async def _merge_into_existing( "Merged memory %s with abstract %s", target_memory.uri, target_memory.abstract ) target_memory.set_vectorize(Vectorize(text=payload.content)) - await self._index_memory(target_memory) + await self._index_memory(target_memory, ctx) return True except Exception as e: logger.error(f"Failed to merge memory {target_memory.uri}: {e}") @@ -162,7 +164,7 @@ async def extract_long_term_memories( if memory: memories.append(memory) stats.created += 1 - await self._index_memory(memory) + await self._index_memory(memory, ctx) else: stats.skipped += 1 continue @@ -170,7 +172,9 @@ async def extract_long_term_memories( # Tool/Skill Memory: 特殊合并逻辑 if candidate.category in TOOL_SKILL_CATEGORIES: if isinstance(candidate, ToolSkillCandidateMemory): - tool_name, skill_name, tool_status = self._get_tool_skill_info(candidate, tool_parts) + tool_name, skill_name, tool_status = self._get_tool_skill_info( + candidate, tool_parts + ) candidate.tool_status = tool_status if skill_name: memory = await self.extractor._merge_skill_memory( @@ -187,7 +191,7 @@ async def extract_long_term_memories( if memory: memories.append(memory) stats.merged += 1 - await self._index_memory(memory) + await self._index_memory(memory, ctx) continue # Dedup check for other categories @@ -250,7 +254,7 @@ async def extract_long_term_memories( if memory: memories.append(memory) stats.created += 1 - await self._index_memory(memory) + await self._index_memory(memory, ctx) else: stats.skipped += 1 @@ -316,7 +320,6 @@ def _get_tool_skill_info( calibrated_tool = candidate_tool else: calibrated_tool = part.tool_name - if calibrated_skill: return ("", calibrated_skill, tool_status) @@ -346,6 +349,7 @@ def _is_similar_name(self, name1: str, name2: str) -> bool: return True from difflib import SequenceMatcher + ratio = SequenceMatcher(None, n1, n2).ratio() return ratio >= 0.7 diff --git a/openviking/session/memory_extractor.py b/openviking/session/memory_extractor.py index cc3da3c5..4d411b6c 100644 --- a/openviking/session/memory_extractor.py +++ b/openviking/session/memory_extractor.py @@ -66,8 +66,8 @@ class ToolSkillCandidateMemory(CandidateMemory): duration_ms: int = 0 # 执行耗时(毫秒) prompt_tokens: int = 0 # 输入 Token completion_tokens: int = 0 # 输出 Token - call_time: int = 0 # 调用次数 - success_time: int = 0 # 成功调用次数 + call_time: int = 0 # 调用次数 + success_time: int = 0 # 成功调用次数 @dataclass @@ -241,7 +241,6 @@ async def extract( messages = context["messages"] - tool_stats_map = self._collect_tool_stats_from_messages(messages) # logger.warning(f"tool_stats_map={tool_stats_map}") @@ -315,8 +314,8 @@ async def extract( language=output_language, tool_name=tool_name, skill_name=skill_name, - call_time=stats.get("call_count",0), - success_time=stats.get("success_time",0), + call_time=stats.get("call_count", 0), + success_time=stats.get("success_time", 0), duration_ms=stats.get("duration_ms", 0), prompt_tokens=stats.get("prompt_tokens", 0), completion_tokens=stats.get("completion_tokens", 0), @@ -563,7 +562,7 @@ async def _merge_tool_memory( new_stats = self._parse_tool_statistics(candidate.content) if new_stats["total_calls"] == 0: new_stats["total_calls"] = 1 - tool_status = getattr(candidate, 'tool_status', 'completed') + tool_status = getattr(candidate, "tool_status", "completed") if tool_status == "error": new_stats["fail_count"] = 1 new_stats["success_count"] = 0 @@ -575,14 +574,12 @@ async def _merge_tool_memory( merged_stats = self._compute_statistics_derived(new_stats) merged_content = self._generate_tool_memory_content(tool_name, merged_stats, candidate) await viking_fs.write_file(uri=uri, content=merged_content, ctx=ctx) - await self._enqueue_semantic_for_parent(uri, ctx) return self._create_tool_context(uri, candidate, ctx) existing_stats = self._parse_tool_statistics(existing) merged_stats = self._merge_tool_statistics(existing_stats, new_stats) merged_content = self._generate_tool_memory_content(tool_name, merged_stats, candidate) await viking_fs.write_file(uri=uri, content=merged_content, ctx=ctx) - await self._enqueue_semantic_for_parent(uri, ctx) return self._create_tool_context(uri, candidate, ctx) async def _enqueue_semantic_for_parent(self, file_uri: str, ctx: "RequestContext") -> None: @@ -680,11 +677,11 @@ def _format_ms(self, value_ms: float) -> str: first_nonzero = -1 s = f"{value_ms:.20f}" for i, c in enumerate(s): - if c not in ('0', '.'): + if c not in ("0", "."): first_nonzero = i break if first_nonzero > 0: - decimals_needed = first_nonzero - s.index('.') + 1 + decimals_needed = first_nonzero - s.index(".") + 1 formatted = f"{value_ms:.{decimals_needed}f}" return f"{formatted}ms" @@ -760,14 +757,12 @@ async def _merge_skill_memory( skill_name, merged_stats, candidate ) await viking_fs.write_file(uri=uri, content=merged_content, ctx=ctx) - await self._enqueue_semantic_for_parent(uri, ctx) return self._create_skill_context(uri, candidate, ctx) existing_stats = self._parse_skill_statistics(existing) merged_stats = self._merge_skill_statistics(existing_stats, new_stats) merged_content = self._generate_skill_memory_content(skill_name, merged_stats, candidate) await viking_fs.write_file(uri=uri, content=merged_content, ctx=ctx) - await self._enqueue_semantic_for_parent(uri, ctx) return self._create_skill_context(uri, candidate, ctx) def _compute_skill_statistics_derived(self, stats: dict) -> dict: