diff --git a/openviking/async_client.py b/openviking/async_client.py index 1ba053d2..65fc673d 100644 --- a/openviking/async_client.py +++ b/openviking/async_client.py @@ -6,6 +6,7 @@ For HTTP mode, use AsyncHTTPClient or SyncHTTPClient. """ +from __future__ import annotations import threading from typing import Any, Dict, List, Optional, Union @@ -177,17 +178,24 @@ async def add_resource( instruction: str = "", wait: bool = False, timeout: float = None, + build_index: bool = True, + summarize: bool = False, **kwargs, ) -> Dict[str, Any]: - """Add resource to OpenViking (only supports resources scope). + """ + Add a resource (file/URL) to OpenViking. Args: - wait: Whether to wait for semantic extraction and vectorization to complete - timeout: Wait timeout in seconds - **kwargs: Extra options forwarded to the parser chain, e.g. - ``strict``, ``ignore_dirs``, ``include``, ``exclude``. + path: Local file path or URL. + reason: Context/reason for adding this resource. + instruction: Specific instruction for processing. + wait: If True, wait for processing to complete. + target: Target path in VikingFS (e.g., "kb/docs"). + build_index: Whether to build vector index immediately (default: True). + summarize: Whether to generate summary (default: False). """ await self._ensure_initialized() + return await self._client.add_resource( path=path, target=target, @@ -195,6 +203,8 @@ async def add_resource( instruction=instruction, wait=wait, timeout=timeout, + build_index=build_index, + summarize=summarize, **kwargs, ) @@ -203,6 +213,34 @@ async def wait_processed(self, timeout: float = None) -> Dict[str, Any]: await self._ensure_initialized() return await self._client.wait_processed(timeout=timeout) + async def build_index( + self, + resource_uris: Union[str, List[str]], + **kwargs + ) -> Dict[str, Any]: + """ + Manually trigger index building for resources. + + Args: + resource_uris: Single URI or list of URIs to index. 
+ """ + await self._ensure_initialized() + return await self._client.build_index(resource_uris, **kwargs) + + async def summarize( + self, + resource_uris: Union[str, List[str]], + **kwargs + ) -> Dict[str, Any]: + """ + Manually trigger summarization for resources. + + Args: + resource_uris: Single URI or list of URIs to summarize. + """ + await self._ensure_initialized() + return await self._client.summarize(resource_uris, **kwargs) + async def add_skill( self, data: Any, diff --git a/openviking/client/local.py b/openviking/client/local.py index 1e24757b..61cefcf9 100644 --- a/openviking/client/local.py +++ b/openviking/client/local.py @@ -93,6 +93,26 @@ async def wait_processed(self, timeout: Optional[float] = None) -> Dict[str, Any """Wait for all processing to complete.""" return await self._service.resources.wait_processed(timeout=timeout) + async def build_index( + self, + resource_uris: Union[str, List[str]], + **kwargs + ) -> Dict[str, Any]: + """Manually trigger index building.""" + if isinstance(resource_uris, str): + resource_uris = [resource_uris] + return await self._service.resources.build_index(resource_uris, ctx=self._ctx, **kwargs) + + async def summarize( + self, + resource_uris: Union[str, List[str]], + **kwargs + ) -> Dict[str, Any]: + """Manually trigger summarization.""" + if isinstance(resource_uris, str): + resource_uris = [resource_uris] + return await self._service.resources.summarize(resource_uris, ctx=self._ctx, **kwargs) + # ============= File System ============= async def ls( diff --git a/openviking/parse/tree_builder.py b/openviking/parse/tree_builder.py index ce8caa95..820ca554 100644 --- a/openviking/parse/tree_builder.py +++ b/openviking/parse/tree_builder.py @@ -25,6 +25,7 @@ from typing import TYPE_CHECKING, Optional from openviking.core.building_tree import BuildingTree +from openviking.core.context import Context from openviking.parse.parsers.media.utils import get_media_base_uri, get_media_type from 
openviking.server.identity import RequestContext from openviking.storage.queuefs import SemanticMsg, get_queue_manager @@ -87,29 +88,18 @@ async def finalize_from_temp( self, temp_dir_path: str, ctx: RequestContext, - scope: str, + scope: str = "resources", base_uri: Optional[str] = None, source_path: Optional[str] = None, source_format: Optional[str] = None, + trigger_semantic: bool = False, ) -> "BuildingTree": """ - Finalize tree from temporary directory (v5.0 architecture). - - New architecture: - 1. Move directory to AGFS - 2. Enqueue to SemanticQueue for async semantic generation - 3. Scan and create Resource objects (for compatibility) + Finalize processing by moving from temp to AGFS. Args: - temp_dir_path: Temporary directory Viking URI (e.g., viking://temp/xxx) - scope: Scope ("resources", "user", or "agent") - base_uri: Base URI (None = use scope default) - source_node: Source ResourceNode - source_path: Source file path - source_format: Source file format - - Returns: - Complete BuildingTree with all resources moved to AGFS + trigger_semantic: Whether to automatically trigger semantic generation. + Default is False (handled by ResourceProcessor/Summarizer). """ viking_fs = get_viking_fs() @@ -185,11 +175,12 @@ async def finalize_from_temp( logger.warning(f"[TreeBuilder] Failed to cleanup temp root: {e}") # 6. Enqueue to SemanticQueue for async semantic generation - try: - await self._enqueue_semantic_generation(final_uri, "resource", ctx=ctx) - logger.info(f"[TreeBuilder] Enqueued semantic generation for: {final_uri}") - except Exception as e: - logger.error(f"[TreeBuilder] Failed to enqueue semantic generation: {e}", exc_info=True) + if trigger_semantic: + try: + await self._enqueue_semantic_generation(final_uri, "resource", ctx=ctx) + logger.info(f"[TreeBuilder] Enqueued semantic generation for: {final_uri}") + except Exception as e: + logger.error(f"[TreeBuilder] Failed to enqueue semantic generation: {e}", exc_info=True) # 7. 
Return simple BuildingTree (no scanning needed) tree = BuildingTree( @@ -197,6 +188,10 @@ async def finalize_from_temp( source_format=source_format, ) tree._root_uri = final_uri + + # Create a minimal Context object for the root so that tree.root is not None + root_context = Context(uri=final_uri) + tree.add_context(root_context) return tree diff --git a/openviking/service/resource_service.py b/openviking/service/resource_service.py index 9f078580..54a4e2cc 100644 --- a/openviking/service/resource_service.py +++ b/openviking/service/resource_service.py @@ -6,7 +6,7 @@ Provides resource management operations: add_resource, add_skill, wait_processed. """ -from typing import Any, Dict, Optional +from typing import Any, Dict, List, Optional from openviking.server.identity import RequestContext from openviking.storage import VikingDBManager @@ -71,6 +71,8 @@ async def add_resource( instruction: str = "", wait: bool = False, timeout: Optional[float] = None, + build_index: bool = True, + summarize: bool = False, **kwargs, ) -> Dict[str, Any]: """Add resource to OpenViking (only supports resources scope). @@ -82,9 +84,9 @@ async def add_resource( instruction: Processing instruction wait: Whether to wait for semantic extraction and vectorization to complete timeout: Wait timeout in seconds - **kwargs: Extra options forwarded to the parser chain, e.g. - ``strict``, ``ignore_dirs``, ``include``, ``exclude`` - (used by ``DirectoryParser``). + build_index: Whether to build vector index immediately (default: True). + summarize: Whether to generate summary (default: False). + **kwargs: Extra options forwarded to the parser chain. 
Returns: Processing result @@ -106,6 +108,8 @@ async def add_resource( instruction=instruction, scope="resources", target=target, + build_index=build_index, + summarize=summarize, **kwargs, ) @@ -168,6 +172,42 @@ async def add_skill( return result + async def build_index( + self, + resource_uris: List[str], + ctx: RequestContext, + **kwargs + ) -> Dict[str, Any]: + """Manually trigger index building. + + Args: + resource_uris: List of resource URIs to index. + ctx: Request context. + + Returns: + Processing result + """ + self._ensure_initialized() + return await self._resource_processor.build_index(resource_uris, ctx, **kwargs) + + async def summarize( + self, + resource_uris: List[str], + ctx: RequestContext, + **kwargs + ) -> Dict[str, Any]: + """Manually trigger summarization. + + Args: + resource_uris: List of resource URIs to summarize. + ctx: Request context. + + Returns: + Processing result + """ + self._ensure_initialized() + return await self._resource_processor.summarize(resource_uris, ctx, **kwargs) + async def wait_processed(self, timeout: Optional[float] = None) -> Dict[str, Any]: """Wait for all queued processing to complete. 
diff --git a/openviking/storage/queuefs/semantic_msg.py b/openviking/storage/queuefs/semantic_msg.py index 4eb3ec89..5f7bd730 100644 --- a/openviking/storage/queuefs/semantic_msg.py +++ b/openviking/storage/queuefs/semantic_msg.py @@ -35,6 +35,8 @@ class SemanticMsg: user_id: str = "default" agent_id: str = "default" role: str = "root" + # Additional flags + skip_vectorization: bool = False def __init__( self, @@ -45,6 +47,7 @@ def __init__( user_id: str = "default", agent_id: str = "default", role: str = "root", + skip_vectorization: bool = False, ): self.id = str(uuid4()) self.uri = uri @@ -54,6 +57,7 @@ def __init__( self.user_id = user_id self.agent_id = agent_id self.role = role + self.skip_vectorization = skip_vectorization def to_dict(self) -> Dict[str, Any]: """Convert object to dictionary.""" @@ -88,6 +92,7 @@ def from_dict(cls, data: Dict[str, Any]) -> "SemanticMsg": user_id=data.get("user_id", "default"), agent_id=data.get("agent_id", "default"), role=data.get("role", "root"), + skip_vectorization=data.get("skip_vectorization", False), ) if "id" in data and data["id"]: obj.id = data["id"] diff --git a/openviking/storage/queuefs/semantic_processor.py b/openviking/storage/queuefs/semantic_processor.py index 7d62d7fe..774ce07c 100644 --- a/openviking/storage/queuefs/semantic_processor.py +++ b/openviking/storage/queuefs/semantic_processor.py @@ -514,48 +514,20 @@ async def _vectorize_directory_simple( ) -> None: """Create directory Context and enqueue to EmbeddingQueue.""" - from openviking.storage.queuefs import get_queue_manager - from openviking.storage.queuefs.embedding_msg_converter import EmbeddingMsgConverter + if self._current_msg and getattr(self._current_msg, "skip_vectorization", False): + logger.info(f"Skipping vectorization for {uri} (requested via SemanticMsg)") + return - active_ctx = ctx or self._current_ctx - queue_manager = get_queue_manager() - embedding_queue = queue_manager.get_queue(queue_manager.EMBEDDING) - owner_space = 
self._owner_space_for_uri(uri, active_ctx) - parent_uri = VikingURI(uri).parent.uri + from openviking.utils.embedding_utils import vectorize_directory_meta - # Vectorize L0: .abstract.md (abstract), level=0 - context_abstract = Context( - uri=uri, - parent_uri=parent_uri, - is_leaf=False, - abstract=abstract, - context_type=context_type, - level=0, - user=active_ctx.user, - account_id=active_ctx.account_id, - owner_space=owner_space, - ) - context_abstract.set_vectorize(Vectorize(text=abstract)) - embedding_msg_abstract = EmbeddingMsgConverter.from_context(context_abstract) - await embedding_queue.enqueue(embedding_msg_abstract) # type: ignore - logger.debug(f"Enqueued directory L0 (abstract) for vectorization: {uri}") - - # Vectorize L1: .overview.md (overview), level=1 - context_overview = Context( + active_ctx = ctx or self._current_ctx + await vectorize_directory_meta( uri=uri, - parent_uri=parent_uri, - is_leaf=False, abstract=abstract, + overview=overview, context_type=context_type, - level=1, - user=active_ctx.user, - account_id=active_ctx.account_id, - owner_space=owner_space, + ctx=active_ctx, ) - context_overview.set_vectorize(Vectorize(text=overview)) - embedding_msg_overview = EmbeddingMsgConverter.from_context(context_overview) - await embedding_queue.enqueue(embedding_msg_overview) # type: ignore - logger.debug(f"Enqueued directory L1 (overview) for vectorization: {uri}") async def _vectorize_files( self, @@ -591,72 +563,14 @@ async def _vectorize_single_file( ctx: Optional[RequestContext] = None, ) -> None: """Vectorize a single file using its content or summary.""" - from datetime import datetime + from openviking.utils.embedding_utils import vectorize_file - from openviking.storage.queuefs import get_queue_manager - from openviking.storage.queuefs.embedding_msg_converter import EmbeddingMsgConverter - - try: - file_name = summary_dict.get("name") or file_path.split("/")[-1] - summary = summary_dict.get("summary", "") - - if embedding_queue is None: 
- queue_manager = get_queue_manager() - embedding_queue = queue_manager.get_queue(queue_manager.EMBEDDING) - - active_ctx = ctx or self._current_ctx - context = Context( - uri=file_path, - parent_uri=parent_uri, - is_leaf=True, - abstract=summary, - context_type=context_type, - created_at=datetime.now(), - user=active_ctx.user, - account_id=active_ctx.account_id, - owner_space=self._owner_space_for_uri(file_path, active_ctx), - ) - - if self.get_resource_content_type(file_name) == ResourceContentType.TEXT: - content = await get_viking_fs().read_file(file_path, ctx=active_ctx) - context.set_vectorize(Vectorize(text=content)) - elif summary: - context.set_vectorize(Vectorize(text=summary)) - else: - return + active_ctx = ctx or self._current_ctx + await vectorize_file( + file_path=file_path, + summary_dict=summary_dict, + parent_uri=parent_uri, + context_type=context_type, + ctx=active_ctx, + ) - embedding_msg = EmbeddingMsgConverter.from_context(context) - if not embedding_msg: - return - await embedding_queue.enqueue(embedding_msg) # type: ignore - logger.debug(f"Enqueued file for vectorization: {file_path}") - except Exception as e: - logger.error(f"Failed to vectorize file {file_path}: {e}", exc_info=True) - - def get_resource_content_type(self, file_name: str) -> ResourceContentType: - def _is_image_file(file_name: str) -> bool: - image_extensions = {".png", ".jpg", ".jpeg", ".gif", ".bmp", ".svg", ".webp"} - return any(file_name.endswith(ext) for ext in image_extensions) - - def _is_video_file(file_name: str) -> bool: - video_extensions = {".mp4", ".avi", ".mov", ".wmv", ".flv"} - return any(file_name.endswith(ext) for ext in video_extensions) - - def _is_text_file(file_name: str) -> bool: - text_extensions = {".txt", ".md", ".csv", ".json", ".xml"} - return any(file_name.endswith(ext) for ext in text_extensions) - - def _is_audio_file(file_name: str) -> bool: - audio_extensions = {".mp3", ".wav", ".aac", ".flac"} - return any(file_name.endswith(ext) for ext in 
audio_extensions) - - if _is_text_file(file_name): - return ResourceContentType.TEXT - elif _is_image_file(file_name): - return ResourceContentType.IMAGE - elif _is_video_file(file_name): - return ResourceContentType.VIDEO - elif _is_audio_file(file_name): - return ResourceContentType.AUDIO - - return ResourceContentType.BINARY diff --git a/openviking/sync_client.py b/openviking/sync_client.py index 9e58eb2a..7562715f 100644 --- a/openviking/sync_client.py +++ b/openviking/sync_client.py @@ -4,6 +4,7 @@ Synchronous OpenViking client implementation. """ +from __future__ import annotations from typing import TYPE_CHECKING, Any, Dict, List, Optional if TYPE_CHECKING: @@ -83,22 +84,28 @@ def add_resource( instruction: str = "", wait: bool = False, timeout: float = None, + build_index: bool = True, + summarize: bool = False, **kwargs, ) -> Dict[str, Any]: """Add resource to OpenViking (resources scope only) Args: + build_index: Whether to build vector index immediately (default: True). + summarize: Whether to generate summary (default: False). **kwargs: Extra options forwarded to the parser chain, e.g. ``strict``, ``ignore_dirs``, ``include``, ``exclude``. """ return run_async( self._async_client.add_resource( - path, - target, - reason, - instruction, - wait, - timeout, + path=path, + target=target, + reason=reason, + instruction=instruction, + wait=wait, + timeout=timeout, + build_index=build_index, + summarize=summarize, **kwargs, ) ) diff --git a/openviking/utils/embedding_utils.py b/openviking/utils/embedding_utils.py new file mode 100644 index 00000000..5f653065 --- /dev/null +++ b/openviking/utils/embedding_utils.py @@ -0,0 +1,236 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: Apache-2.0 +""" +Embedding utilities for OpenViking. + +Common logic for creating Context objects and enqueuing them to EmbeddingQueue. 
+""" + +import asyncio +import os +from datetime import datetime +from typing import Any, Dict, List, Optional, Union + +from openviking.core.context import Context, ContextLevel, ResourceContentType, Vectorize +from openviking.server.identity import RequestContext +from openviking.storage.queuefs import get_queue_manager +from openviking.storage.queuefs.embedding_msg_converter import EmbeddingMsgConverter +from openviking.storage.viking_fs import get_viking_fs +from openviking_cli.utils import VikingURI, get_logger + +logger = get_logger(__name__) + +def _owner_space_for_uri(uri: str, ctx: RequestContext) -> str: + """Derive owner_space from a URI.""" + if uri.startswith("viking://agent/"): + return ctx.user.agent_space_name() + if uri.startswith("viking://user/") or uri.startswith("viking://session/"): + return ctx.user.user_space_name() + return "" + +def get_resource_content_type(file_name: str) -> ResourceContentType: + """Determine resource content type based on file extension.""" + file_name = file_name.lower() + + text_extensions = {".txt", ".md", ".csv", ".json", ".xml", ".py", ".js", ".ts", ".java", ".cpp", ".c", ".h", ".go", ".rs"} + image_extensions = {".png", ".jpg", ".jpeg", ".gif", ".bmp", ".svg", ".webp"} + video_extensions = {".mp4", ".avi", ".mov", ".wmv", ".flv"} + audio_extensions = {".mp3", ".wav", ".aac", ".flac"} + + if any(file_name.endswith(ext) for ext in text_extensions): + return ResourceContentType.TEXT + elif any(file_name.endswith(ext) for ext in image_extensions): + return ResourceContentType.IMAGE + elif any(file_name.endswith(ext) for ext in video_extensions): + return ResourceContentType.VIDEO + elif any(file_name.endswith(ext) for ext in audio_extensions): + return ResourceContentType.AUDIO + + return ResourceContentType.UNKNOWN + +async def vectorize_directory_meta( + uri: str, + abstract: str, + overview: str, + context_type: str = "resource", + ctx: Optional[RequestContext] = None, +) -> None: + """ + Vectorize directory 
metadata (.abstract.md and .overview.md). + + Creates Context objects for abstract and overview and enqueues them. + """ + if not ctx: + logger.warning("No context provided for vectorization") + return + + queue_manager = get_queue_manager() + embedding_queue = queue_manager.get_queue(queue_manager.EMBEDDING) + + parent_uri = VikingURI(uri).parent.uri + owner_space = _owner_space_for_uri(uri, ctx) + + # Vectorize L0: .abstract.md (abstract) + context_abstract = Context( + uri=uri, + parent_uri=parent_uri, + is_leaf=False, + abstract=abstract, + context_type=context_type, + level=ContextLevel.ABSTRACT, + user=ctx.user, + account_id=ctx.account_id, + owner_space=owner_space, + ) + context_abstract.set_vectorize(Vectorize(text=abstract)) + msg_abstract = EmbeddingMsgConverter.from_context(context_abstract) + if msg_abstract: + await embedding_queue.enqueue(msg_abstract) + logger.debug(f"Enqueued directory L0 (abstract) for vectorization: {uri}") + + # Vectorize L1: .overview.md (overview) + context_overview = Context( + uri=uri, + parent_uri=parent_uri, + is_leaf=False, + abstract=abstract, + context_type=context_type, + level=ContextLevel.OVERVIEW, + user=ctx.user, + account_id=ctx.account_id, + owner_space=owner_space, + ) + context_overview.set_vectorize(Vectorize(text=overview)) + msg_overview = EmbeddingMsgConverter.from_context(context_overview) + if msg_overview: + await embedding_queue.enqueue(msg_overview) + logger.debug(f"Enqueued directory L1 (overview) for vectorization: {uri}") + +async def vectorize_file( + file_path: str, + summary_dict: Dict[str, str], + parent_uri: str, + context_type: str = "resource", + ctx: Optional[RequestContext] = None, +) -> None: + """ + Vectorize a single file. + + Creates Context object for the file and enqueues it. + Reads content for TEXT files, otherwise uses summary. 
+ """ + if not ctx: + logger.warning("No context provided for vectorization") + return + + queue_manager = get_queue_manager() + embedding_queue = queue_manager.get_queue(queue_manager.EMBEDDING) + viking_fs = get_viking_fs() + + try: + file_name = summary_dict.get("name") or os.path.basename(file_path) + summary = summary_dict.get("summary", "") + + context = Context( + uri=file_path, + parent_uri=parent_uri, + is_leaf=True, + abstract=summary, + context_type=context_type, + created_at=datetime.now(), + user=ctx.user, + account_id=ctx.account_id, + owner_space=_owner_space_for_uri(file_path, ctx), + ) + + content_type = get_resource_content_type(file_name) + if content_type == ResourceContentType.TEXT: + # For text files, try to read content + try: + content = await viking_fs.read_file(file_path, ctx=ctx) + if isinstance(content, bytes): + content = content.decode("utf-8", errors="replace") + context.set_vectorize(Vectorize(text=content)) + except Exception as e: + logger.warning(f"Failed to read file content for {file_path}, falling back to summary: {e}") + if summary: + context.set_vectorize(Vectorize(text=summary)) + else: + logger.warning(f"No summary available for {file_path}, skipping vectorization") + return + elif summary: + # For non-text files, use summary + context.set_vectorize(Vectorize(text=summary)) + else: + logger.debug(f"Skipping file {file_path} (no text content or summary)") + return + + embedding_msg = EmbeddingMsgConverter.from_context(context) + if not embedding_msg: + return + + await embedding_queue.enqueue(embedding_msg) + logger.debug(f"Enqueued file for vectorization: {file_path}") + + except Exception as e: + logger.error(f"Failed to vectorize file {file_path}: {e}", exc_info=True) + +async def index_resource( + uri: str, + ctx: RequestContext, +) -> None: + """ + Build vector index for a resource directory. + + 1. Reads .abstract.md and .overview.md and vectorizes them. + 2. Scans files in the directory and vectorizes them. 
+ """ + viking_fs = get_viking_fs() + + # 1. Index Directory Metadata + abstract_uri = f"{uri}/.abstract.md" + overview_uri = f"{uri}/.overview.md" + + abstract = "" + overview = "" + + if await viking_fs.exists(abstract_uri): + content = await viking_fs.read_file(abstract_uri) + if isinstance(content, bytes): + abstract = content.decode("utf-8") + + if await viking_fs.exists(overview_uri): + content = await viking_fs.read_file(overview_uri) + if isinstance(content, bytes): + overview = content.decode("utf-8") + + if abstract or overview: + await vectorize_directory_meta(uri, abstract, overview, ctx=ctx) + + # 2. Index Files + try: + files = await viking_fs.ls(uri, ctx=ctx) + for file_info in files: + file_name = file_info["name"] + + # Skip hidden files (like .abstract.md) + if file_name.startswith("."): + continue + + if file_info.get("type") == "directory" or file_info.get("isDir"): + # TODO: Recursive indexing? For now, skip subdirectories to match previous behavior + continue + + file_uri = file_info.get("uri") or f"{uri}/{file_name}" + + # For direct indexing, we might not have summaries. + # We pass empty summary_dict, vectorize_file will try to read content for text files. + await vectorize_file( + file_path=file_uri, + summary_dict={"name": file_name}, + parent_uri=uri, + ctx=ctx + ) + + except Exception as e: + logger.error(f"Failed to scan directory {uri} for indexing: {e}") diff --git a/openviking/utils/resource_processor.py b/openviking/utils/resource_processor.py index c36a4ced..3ac5beaa 100644 --- a/openviking/utils/resource_processor.py +++ b/openviking/utils/resource_processor.py @@ -7,12 +7,14 @@ as described in the OpenViking design document. 
""" -from typing import TYPE_CHECKING, Any, Dict, Optional +from typing import TYPE_CHECKING, Any, Dict, List, Optional from openviking.parse.tree_builder import TreeBuilder from openviking.server.identity import RequestContext from openviking.storage import VikingDBManager from openviking.storage.viking_fs import get_viking_fs +from openviking.utils.embedding_utils import index_resource +from openviking.utils.summarizer import Summarizer from openviking_cli.utils import get_logger from openviking_cli.utils.storage import StoragePath @@ -49,6 +51,13 @@ def __init__( self.tree_builder = TreeBuilder() self._vlm_processor = None self._media_processor = None + self._summarizer = None + + def _get_summarizer(self) -> "Summarizer": + """Lazy initialization of Summarizer.""" + if self._summarizer is None: + self._summarizer = Summarizer(self._get_vlm_processor()) + return self._summarizer def _get_vlm_processor(self) -> "VLMProcessor": """Lazy initialization of VLM processor.""" @@ -69,6 +78,15 @@ def _get_media_processor(self): ) return self._media_processor + async def build_index(self, resource_uris: List[str], ctx: RequestContext, **kwargs) -> Dict[str, Any]: + """Expose index building as a standalone method.""" + for uri in resource_uris: + await index_resource(uri, ctx) + return {"status": "success", "message": f"Indexed {len(resource_uris)} resources"} + + async def summarize(self, resource_uris: List[str], ctx: RequestContext, **kwargs) -> Dict[str, Any]: + """Expose summarization as a standalone method.""" + return await self._get_summarizer().summarize(resource_uris, ctx, **kwargs) async def process_resource( self, path: str, @@ -78,6 +96,8 @@ async def process_resource( scope: str = "resources", user: Optional[str] = None, target: Optional[str] = None, + build_index: bool = True, + summarize: bool = False, **kwargs, ) -> Dict[str, Any]: """ @@ -86,7 +106,8 @@ async def process_resource( Workflow: 1. Parse source (writes to temp directory) 2. 
TreeBuilder moves to AGFS - 3. SemanticQueue generates L0/L1 and vectorizes asynchronously + 3. (Optional) Build vector index + 4. (Optional) Summarize """ result = { "status": "success", @@ -157,6 +178,8 @@ async def process_resource( source_path=parse_result.source_path, source_format=parse_result.source_format, ) + if context_tree and context_tree.root: + result["root_uri"] = context_tree.root.uri except Exception as e: result["status"] = "error" result["errors"].append(f"Finalize from temp error: {e}") @@ -170,7 +193,35 @@ async def process_resource( return result - # Check media strategy + # ============ Phase 4: Optional Steps ============ + if summarize: + # Explicit summarization request. + # If build_index is ALSO True, we want vectorization. + # If build_index is False, we skip vectorization. + skip_vec = not build_index + try: + await self._get_summarizer().summarize( + resource_uris=[result["root_uri"]], + ctx=ctx, + skip_vectorization=skip_vec, + **kwargs + ) + except Exception as e: + logger.error(f"Summarization failed: {e}") + result["warnings"] = result.get("warnings", []) + [f"Summarization failed: {e}"] + + elif build_index: + # Standard compatibility mode: "Just Index it" usually implies ingestion flow. + # We assume this means "Ingest and Index", which requires summarization. + try: + await self._get_summarizer().summarize( + resource_uris=[result["root_uri"]], + ctx=ctx, + skip_vectorization=False, + **kwargs + ) + except Exception as e: + logger.error(f"Auto-index failed: {e}") + result["warnings"] = result.get("warnings", []) + [f"Auto-index failed: {e}"] - result["root_uri"] = context_tree._root_uri return result diff --git a/openviking/utils/summarizer.py b/openviking/utils/summarizer.py new file mode 100644 index 00000000..8b3a0939 --- /dev/null +++ b/openviking/utils/summarizer.py @@ -0,0 +1,63 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: Apache-2.0 +""" +Summarizer for OpenViking. 
+ +Handles summarization and key information extraction. +""" + +from typing import TYPE_CHECKING, Any, Dict, List, Optional +from openviking_cli.utils import get_logger +from openviking.storage.queuefs import SemanticMsg, get_queue_manager + +if TYPE_CHECKING: + from openviking.server.identity import RequestContext + from openviking.parse.vlm import VLMProcessor + +logger = get_logger(__name__) + +class Summarizer: + """ + Handles summarization of resources. + """ + + def __init__(self, vlm_processor: "VLMProcessor"): + self.vlm_processor = vlm_processor + + async def summarize( + self, + resource_uris: List[str], + ctx: "RequestContext", + skip_vectorization: bool = False, + **kwargs + ) -> Dict[str, Any]: + """ + Summarize the given resources. + Triggers SemanticQueue to generate .abstract.md and .overview.md. + """ + queue_manager = get_queue_manager() + semantic_queue = queue_manager.get_queue(queue_manager.SEMANTIC, allow_create=True) + + enqueued_count = 0 + for uri in resource_uris: + # Determine context_type based on URI + context_type = "resource" + if uri.startswith("viking://memory/"): + context_type = "memory" + elif uri.startswith("viking://agent/skills/"): + context_type = "skill" + + msg = SemanticMsg( + uri=uri, + context_type=context_type, + account_id=ctx.account_id, + user_id=ctx.user.user_id, + agent_id=ctx.user.agent_id, + role=ctx.role.value, + skip_vectorization=skip_vectorization, + ) + await semantic_queue.enqueue(msg) + enqueued_count += 1 + logger.info(f"Enqueued semantic generation for: {uri} (skip_vectorization={skip_vectorization})") + + return {"status": "success", "enqueued_count": enqueued_count} diff --git a/setup.py b/setup.py index f36e526a..98dd96fc 100644 --- a/setup.py +++ b/setup.py @@ -170,7 +170,9 @@ def build_agfs(self): print(f"Build stderr: {e.stderr.decode('utf-8', errors='replace')}") else: - if not agfs_server_dir.exists(): + if agfs_target_binary.exists(): + print(f"[Info] Go compiler not found, but AGFS binary 
"""
Integration-style tests for add_resource's build_index / summarize flags.

A MockLocalAGFS replaces the real AGFS server, and the Summarizer /
IndexBuilder entry points are patched so the tests can observe how
``add_resource`` routes the ``build_index`` and ``summarize`` options.
"""
import pytest
import asyncio
import os
import json
import shutil
from pathlib import Path
from unittest.mock import MagicMock, AsyncMock, patch

from openviking.async_client import AsyncOpenViking
from openviking_cli.utils.config.open_viking_config import OpenVikingConfigSingleton
from tests.utils.mock_agfs import MockLocalAGFS


@pytest.fixture
def test_config(tmp_path):
    """Create a temporary config file and return its path."""
    config_path = tmp_path / "ov.conf"
    workspace = tmp_path / "workspace"
    workspace.mkdir()

    config_content = {
        "storage": {
            "workspace": str(workspace),
            "agfs": {
                "backend": "local",
                "port": 1833
            },
            "vectordb": {
                "backend": "local"
            }
        },
        "embedding": {
            "dense": {
                "provider": "openai",
                "api_key": "fake",
                "model": "text-embedding-3-small"
            }
        },
        "vlm": {
            "provider": "openai",
            "api_key": "fake",
            "model": "gpt-4-vision-preview"
        }
    }
    config_path.write_text(json.dumps(config_content))
    return config_path


@pytest.fixture
async def client(test_config, tmp_path):
    """Initialize an AsyncOpenViking client with mocked AGFS and LLM services."""
    os.environ["OPENVIKING_CONFIG_FILE"] = str(test_config)

    # Reset singletons so each test starts from a clean slate.
    OpenVikingConfigSingleton._instance = None
    await AsyncOpenViking.reset()

    mock_agfs = MockLocalAGFS(root_path=tmp_path / "mock_agfs_root")

    # Summarizer.summarize and IndexBuilder.build_index are coroutine
    # functions; the default MagicMock returned by patch() is not awaitable,
    # so they must be patched with AsyncMock (as the standalone test below
    # already does) or every `await` on them fails.
    with patch(
        "openviking.utils.summarizer.Summarizer.summarize", new_callable=AsyncMock
    ) as mock_summarize, patch(
        "openviking.utils.index_builder.IndexBuilder.build_index", new_callable=AsyncMock
    ) as mock_build_index, patch(
        "openviking.utils.agfs_utils.create_agfs_client", return_value=mock_agfs
    ), patch("openviking.agfs_manager.AGFSManager.start"), patch(
        "openviking.agfs_manager.AGFSManager.stop"
    ):
        # Make mocks return success
        mock_summarize.return_value = {"status": "success"}
        mock_build_index.return_value = {"status": "success"}

        ov_client = AsyncOpenViking(path=str(test_config.parent))
        await ov_client.initialize()

        try:
            yield ov_client
        finally:
            # Cleanup must run even if the consuming test fails.
            await ov_client.close()
            OpenVikingConfigSingleton._instance = None
            os.environ.pop("OPENVIKING_CONFIG_FILE", None)


@pytest.mark.asyncio
async def test_add_resource_indexing_logic(test_config, tmp_path):
    """
    Integration-like test for add_resource indexing logic.

    Verifies that skip_vectorization mirrors build_index, and that the
    summarizer is not invoked when both flags are off.
    """
    os.environ["OPENVIKING_CONFIG_FILE"] = str(test_config)
    OpenVikingConfigSingleton._instance = None
    await AsyncOpenViking.reset()

    # Create dummy resource
    resource_file = tmp_path / "test_doc.md"
    resource_file.write_text("# Test Document\n\nThis is a test document.", encoding="utf-8")

    mock_agfs = MockLocalAGFS(root_path=tmp_path / "mock_agfs_root")

    # Patch the Summarizer to verify how add_resource drives it.
    with patch(
        "openviking.utils.summarizer.Summarizer.summarize", new_callable=AsyncMock
    ) as mock_summarize, patch(
        "openviking.utils.agfs_utils.create_agfs_client", return_value=mock_agfs
    ), patch("openviking.agfs_manager.AGFSManager.start"), patch(
        "openviking.agfs_manager.AGFSManager.stop"
    ):
        mock_summarize.return_value = {"status": "success"}

        client = AsyncOpenViking(path=str(test_config.parent))
        await client.initialize()

        try:
            # 1. build_index=True -> summarize runs with vectorization enabled.
            await client.add_resource(
                path=str(resource_file),
                build_index=True,
                wait=True
            )
            assert mock_summarize.call_count == 1
            assert mock_summarize.call_args.kwargs.get("skip_vectorization") is False
            mock_summarize.reset_mock()

            # 2. build_index=False, summarize=True -> summary only, no vectors.
            await client.add_resource(
                path=str(resource_file),
                build_index=False,
                summarize=True,
                wait=True
            )
            assert mock_summarize.call_count == 1
            assert mock_summarize.call_args.kwargs.get("skip_vectorization") is True
            mock_summarize.reset_mock()

            # 3. Both flags off -> summarizer must not be touched.
            await client.add_resource(
                path=str(resource_file),
                build_index=False,
                summarize=False,
                wait=True
            )
            mock_summarize.assert_not_called()
        finally:
            await client.close()
            OpenVikingConfigSingleton._instance = None
            os.environ.pop("OPENVIKING_CONFIG_FILE", None)
+ """ + def __init__(self, config=None, root_path=None): + self.config = config + self.root = Path(root_path) if root_path else Path("/tmp/viking_data") + self.root.mkdir(parents=True, exist_ok=True) + + def _resolve(self, path): + if str(path).startswith("viking://"): + path = str(path).replace("viking://", "") + if str(path).startswith("/"): + path = str(path)[1:] + return self.root / path + + def exists(self, path, ctx=None): + return self._resolve(path).exists() + + def mkdir(self, path, ctx=None, parents=True, exist_ok=True): + self._resolve(path).mkdir(parents=parents, exist_ok=exist_ok) + + def ls(self, path, ctx=None, **kwargs): + p = self._resolve(path) + if not p.exists(): + return [] + res = [] + for item in p.iterdir(): + res.append({ + "name": item.name, + "isDir": item.is_dir(), # Note: JS style camelCase for some APIs + "type": "directory" if item.is_dir() else "file", + "size": item.stat().st_size if item.is_file() else 0, + "mtime": item.stat().st_mtime, + "uri": f"viking://{path}/{item.name}".replace("//", "/") + }) + return res + + def writeto(self, path, content, ctx=None, **kwargs): + p = self._resolve(path) + p.parent.mkdir(parents=True, exist_ok=True) + if isinstance(content, str): + p.write_text(content, encoding="utf-8") + else: + p.write_bytes(content) + return str(p) + + def write(self, path, content, ctx=None, **kwargs): + return self.writeto(path, content, ctx, **kwargs) + + def write_file(self, path, content, ctx=None, **kwargs): + return self.writeto(path, content, ctx, **kwargs) + + def read_file(self, path, ctx=None, **kwargs): + p = self._resolve(path) + if not p.exists(): + raise FileNotFoundError(path) + return p.read_bytes() + + def read(self, path, ctx=None, **kwargs): + return self.read_file(path, ctx, **kwargs) + + def rm(self, path, recursive=False, ctx=None): + p = self._resolve(path) + if p.exists(): + if p.is_dir(): + if recursive: + shutil.rmtree(p) + else: + p.rmdir() + else: + p.unlink() + + def delete_temp(self, path, 
ctx=None): + self.rm(path, recursive=True, ctx=ctx) + + def mv(self, src, dst, ctx=None): + s = self._resolve(src) + d = self._resolve(dst) + d.parent.mkdir(parents=True, exist_ok=True) + shutil.move(str(s), str(d)) + + def stat(self, path, ctx=None): + p = self._resolve(path) + if not p.exists(): + raise FileNotFoundError(path) + s = p.stat() + return { + "size": s.st_size, + "mtime": s.st_mtime, + "is_dir": p.is_dir() + } + + def bind_request_context(self, ctx): + return MagicMock(__enter__=lambda x: None, __exit__=lambda x,y,z: None)