Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 12 additions & 8 deletions openviking/session/compressor.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,15 @@ def __init__(
self.extractor = MemoryExtractor()
self.deduplicator = MemoryDeduplicator(vikingdb=vikingdb)

async def _index_memory(self, memory: Context) -> bool:
"""Add memory to vectorization queue."""
async def _index_memory(self, memory: Context, ctx: RequestContext) -> bool:
"""Add memory to vectorization queue and trigger parent directory semantic generation.

Args:
memory: Context to index. It is converted to an embedding message, and its
``uri`` is used both in the log line and for the parent semantic enqueue.
ctx: Request context forwarded unchanged to the extractor's
``_enqueue_semantic_for_parent`` call.

Returns:
Always ``True``. Nothing is caught here, so errors raised by the awaited
calls propagate to the caller.
"""
# Function-scope import (presumably to avoid an import cycle — TODO confirm).
from openviking.storage.queuefs.embedding_msg_converter import EmbeddingMsgConverter

embedding_msg = EmbeddingMsgConverter.from_context(memory)
await self.vikingdb.enqueue_embedding_msg(embedding_msg)
logger.info(f"Enqueued memory for vectorization: {memory.uri}")

# Request semantic generation for the parent of this memory's URI only after
# the embedding message has been enqueued.
await self.extractor._enqueue_semantic_for_parent(memory.uri, ctx)
return True

async def _merge_into_existing(
Expand Down Expand Up @@ -106,7 +108,7 @@ async def _merge_into_existing(
"Merged memory %s with abstract %s", target_memory.uri, target_memory.abstract
)
target_memory.set_vectorize(Vectorize(text=payload.content))
await self._index_memory(target_memory)
await self._index_memory(target_memory, ctx)
return True
except Exception as e:
logger.error(f"Failed to merge memory {target_memory.uri}: {e}")
Expand Down Expand Up @@ -162,15 +164,17 @@ async def extract_long_term_memories(
if memory:
memories.append(memory)
stats.created += 1
await self._index_memory(memory)
await self._index_memory(memory, ctx)
else:
stats.skipped += 1
continue

# Tool/Skill Memory: special merge logic
if candidate.category in TOOL_SKILL_CATEGORIES:
if isinstance(candidate, ToolSkillCandidateMemory):
tool_name, skill_name, tool_status = self._get_tool_skill_info(candidate, tool_parts)
tool_name, skill_name, tool_status = self._get_tool_skill_info(
candidate, tool_parts
)
candidate.tool_status = tool_status
if skill_name:
memory = await self.extractor._merge_skill_memory(
Expand All @@ -187,7 +191,7 @@ async def extract_long_term_memories(
if memory:
memories.append(memory)
stats.merged += 1
await self._index_memory(memory)
await self._index_memory(memory, ctx)
continue

# Dedup check for other categories
Expand Down Expand Up @@ -250,7 +254,7 @@ async def extract_long_term_memories(
if memory:
memories.append(memory)
stats.created += 1
await self._index_memory(memory)
await self._index_memory(memory, ctx)
else:
stats.skipped += 1

Expand Down Expand Up @@ -316,7 +320,6 @@ def _get_tool_skill_info(
calibrated_tool = candidate_tool
else:
calibrated_tool = part.tool_name


if calibrated_skill:
return ("", calibrated_skill, tool_status)
Expand Down Expand Up @@ -346,6 +349,7 @@ def _is_similar_name(self, name1: str, name2: str) -> bool:
return True

from difflib import SequenceMatcher

ratio = SequenceMatcher(None, n1, n2).ratio()
return ratio >= 0.7

Expand Down
19 changes: 7 additions & 12 deletions openviking/session/memory_extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ class ToolSkillCandidateMemory(CandidateMemory):
duration_ms: int = 0 # 执行耗时(毫秒)
prompt_tokens: int = 0 # 输入 Token
completion_tokens: int = 0 # 输出 Token
call_time: int = 0 # 调用次数
success_time: int = 0 # 成功调用次数
call_time: int = 0 # 调用次数
success_time: int = 0 # 成功调用次数


@dataclass
Expand Down Expand Up @@ -241,7 +241,6 @@ async def extract(

messages = context["messages"]


tool_stats_map = self._collect_tool_stats_from_messages(messages)

# logger.warning(f"tool_stats_map={tool_stats_map}")
Expand Down Expand Up @@ -315,8 +314,8 @@ async def extract(
language=output_language,
tool_name=tool_name,
skill_name=skill_name,
call_time=stats.get("call_count",0),
success_time=stats.get("success_time",0),
call_time=stats.get("call_count", 0),
success_time=stats.get("success_time", 0),
duration_ms=stats.get("duration_ms", 0),
prompt_tokens=stats.get("prompt_tokens", 0),
completion_tokens=stats.get("completion_tokens", 0),
Expand Down Expand Up @@ -563,7 +562,7 @@ async def _merge_tool_memory(
new_stats = self._parse_tool_statistics(candidate.content)
if new_stats["total_calls"] == 0:
new_stats["total_calls"] = 1
tool_status = getattr(candidate, 'tool_status', 'completed')
tool_status = getattr(candidate, "tool_status", "completed")
if tool_status == "error":
new_stats["fail_count"] = 1
new_stats["success_count"] = 0
Expand All @@ -575,14 +574,12 @@ async def _merge_tool_memory(
merged_stats = self._compute_statistics_derived(new_stats)
merged_content = self._generate_tool_memory_content(tool_name, merged_stats, candidate)
await viking_fs.write_file(uri=uri, content=merged_content, ctx=ctx)
await self._enqueue_semantic_for_parent(uri, ctx)
return self._create_tool_context(uri, candidate, ctx)

existing_stats = self._parse_tool_statistics(existing)
merged_stats = self._merge_tool_statistics(existing_stats, new_stats)
merged_content = self._generate_tool_memory_content(tool_name, merged_stats, candidate)
await viking_fs.write_file(uri=uri, content=merged_content, ctx=ctx)
await self._enqueue_semantic_for_parent(uri, ctx)
return self._create_tool_context(uri, candidate, ctx)

async def _enqueue_semantic_for_parent(self, file_uri: str, ctx: "RequestContext") -> None:
Expand Down Expand Up @@ -680,11 +677,11 @@ def _format_ms(self, value_ms: float) -> str:
first_nonzero = -1
s = f"{value_ms:.20f}"
for i, c in enumerate(s):
if c not in ('0', '.'):
if c not in ("0", "."):
first_nonzero = i
break
if first_nonzero > 0:
decimals_needed = first_nonzero - s.index('.') + 1
decimals_needed = first_nonzero - s.index(".") + 1
formatted = f"{value_ms:.{decimals_needed}f}"
return f"{formatted}ms"

Expand Down Expand Up @@ -760,14 +757,12 @@ async def _merge_skill_memory(
skill_name, merged_stats, candidate
)
await viking_fs.write_file(uri=uri, content=merged_content, ctx=ctx)
await self._enqueue_semantic_for_parent(uri, ctx)
return self._create_skill_context(uri, candidate, ctx)

existing_stats = self._parse_skill_statistics(existing)
merged_stats = self._merge_skill_statistics(existing_stats, new_stats)
merged_content = self._generate_skill_memory_content(skill_name, merged_stats, candidate)
await viking_fs.write_file(uri=uri, content=merged_content, ctx=ctx)
await self._enqueue_semantic_for_parent(uri, ctx)
return self._create_skill_context(uri, candidate, ctx)

def _compute_skill_statistics_derived(self, stats: dict) -> dict:
Expand Down
Loading