Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 43 additions & 5 deletions openviking/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
For HTTP mode, use AsyncHTTPClient or SyncHTTPClient.
"""

from __future__ import annotations
import threading
from typing import Any, Dict, List, Optional, Union

Expand Down Expand Up @@ -177,24 +178,33 @@ async def add_resource(
instruction: str = "",
wait: bool = False,
timeout: float = None,
build_index: bool = True,
summarize: bool = False,
**kwargs,
) -> Dict[str, Any]:
"""Add resource to OpenViking (only supports resources scope).
"""
Add a resource (file/URL) to OpenViking.
Args:
wait: Whether to wait for semantic extraction and vectorization to complete
timeout: Wait timeout in seconds
**kwargs: Extra options forwarded to the parser chain, e.g.
``strict``, ``ignore_dirs``, ``include``, ``exclude``.
path: Local file path or URL.
reason: Context/reason for adding this resource.
instruction: Specific instruction for processing.
wait: If True, wait for processing to complete.
target: Target path in VikingFS (e.g., "kb/docs").
build_index: Whether to build vector index immediately (default: True).
summarize: Whether to generate summary (default: False).
"""
await self._ensure_initialized()

return await self._client.add_resource(
path=path,
target=target,
reason=reason,
instruction=instruction,
wait=wait,
timeout=timeout,
build_index=build_index,
summarize=summarize,
**kwargs,
)

Expand All @@ -203,6 +213,34 @@ async def wait_processed(self, timeout: float = None) -> Dict[str, Any]:
await self._ensure_initialized()
return await self._client.wait_processed(timeout=timeout)

async def build_index(
self,
resource_uris: Union[str, List[str]],
**kwargs
) -> Dict[str, Any]:
"""
Manually trigger index building for resources.
Args:
resource_uris: Single URI or list of URIs to index.
"""
await self._ensure_initialized()
return await self._client.build_index(resource_uris, **kwargs)

async def summarize(
self,
resource_uris: Union[str, List[str]],
**kwargs
) -> Dict[str, Any]:
"""
Manually trigger summarization for resources.
Args:
resource_uris: Single URI or list of URIs to summarize.
"""
await self._ensure_initialized()
return await self._client.summarize(resource_uris, **kwargs)

async def add_skill(
self,
data: Any,
Expand Down
20 changes: 20 additions & 0 deletions openviking/client/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,26 @@ async def wait_processed(self, timeout: Optional[float] = None) -> Dict[str, Any
"""Wait for all processing to complete."""
return await self._service.resources.wait_processed(timeout=timeout)

async def build_index(
self,
resource_uris: Union[str, List[str]],
**kwargs
) -> Dict[str, Any]:
"""Manually trigger index building."""
if isinstance(resource_uris, str):
resource_uris = [resource_uris]
return await self._service.resources.build_index(resource_uris, ctx=self._ctx, **kwargs)

async def summarize(
self,
resource_uris: Union[str, List[str]],
**kwargs
) -> Dict[str, Any]:
"""Manually trigger summarization."""
if isinstance(resource_uris, str):
resource_uris = [resource_uris]
return await self._service.resources.summarize(resource_uris, ctx=self._ctx, **kwargs)

# ============= File System =============

async def ls(
Expand Down
37 changes: 16 additions & 21 deletions openviking/parse/tree_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from typing import TYPE_CHECKING, Optional

from openviking.core.building_tree import BuildingTree
from openviking.core.context import Context
from openviking.parse.parsers.media.utils import get_media_base_uri, get_media_type
from openviking.server.identity import RequestContext
from openviking.storage.queuefs import SemanticMsg, get_queue_manager
Expand Down Expand Up @@ -87,29 +88,18 @@ async def finalize_from_temp(
self,
temp_dir_path: str,
ctx: RequestContext,
scope: str,
scope: str = "resources",
base_uri: Optional[str] = None,
source_path: Optional[str] = None,
source_format: Optional[str] = None,
trigger_semantic: bool = False,
) -> "BuildingTree":
"""
Finalize tree from temporary directory (v5.0 architecture).
New architecture:
1. Move directory to AGFS
2. Enqueue to SemanticQueue for async semantic generation
3. Scan and create Resource objects (for compatibility)
Finalize processing by moving from temp to AGFS.
Args:
temp_dir_path: Temporary directory Viking URI (e.g., viking://temp/xxx)
scope: Scope ("resources", "user", or "agent")
base_uri: Base URI (None = use scope default)
source_node: Source ResourceNode
source_path: Source file path
source_format: Source file format
Returns:
Complete BuildingTree with all resources moved to AGFS
trigger_semantic: Whether to automatically trigger semantic generation.
Default is False (handled by ResourceProcessor/Summarizer).
"""

viking_fs = get_viking_fs()
Expand Down Expand Up @@ -185,18 +175,23 @@ async def finalize_from_temp(
logger.warning(f"[TreeBuilder] Failed to cleanup temp root: {e}")

# 6. Enqueue to SemanticQueue for async semantic generation
try:
await self._enqueue_semantic_generation(final_uri, "resource", ctx=ctx)
logger.info(f"[TreeBuilder] Enqueued semantic generation for: {final_uri}")
except Exception as e:
logger.error(f"[TreeBuilder] Failed to enqueue semantic generation: {e}", exc_info=True)
if trigger_semantic:
try:
await self._enqueue_semantic_generation(final_uri, "resource", ctx=ctx)
logger.info(f"[TreeBuilder] Enqueued semantic generation for: {final_uri}")
except Exception as e:
logger.error(f"[TreeBuilder] Failed to enqueue semantic generation: {e}", exc_info=True)

# 7. Return simple BuildingTree (no scanning needed)
tree = BuildingTree(
source_path=source_path,
source_format=source_format,
)
tree._root_uri = final_uri

# Create a minimal Context object for the root so that tree.root is not None
root_context = Context(uri=final_uri)
tree.add_context(root_context)

return tree

Expand Down
48 changes: 44 additions & 4 deletions openviking/service/resource_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
Provides resource management operations: add_resource, add_skill, wait_processed.
"""

from typing import Any, Dict, Optional
from typing import Any, Dict, List, Optional

from openviking.server.identity import RequestContext
from openviking.storage import VikingDBManager
Expand Down Expand Up @@ -71,6 +71,8 @@ async def add_resource(
instruction: str = "",
wait: bool = False,
timeout: Optional[float] = None,
build_index: bool = True,
summarize: bool = False,
**kwargs,
) -> Dict[str, Any]:
"""Add resource to OpenViking (only supports resources scope).
Expand All @@ -82,9 +84,9 @@ async def add_resource(
instruction: Processing instruction
wait: Whether to wait for semantic extraction and vectorization to complete
timeout: Wait timeout in seconds
**kwargs: Extra options forwarded to the parser chain, e.g.
``strict``, ``ignore_dirs``, ``include``, ``exclude``
(used by ``DirectoryParser``).
build_index: Whether to build vector index immediately (default: True).
summarize: Whether to generate summary (default: False).
**kwargs: Extra options forwarded to the parser chain.
Returns:
Processing result
Expand All @@ -106,6 +108,8 @@ async def add_resource(
instruction=instruction,
scope="resources",
target=target,
build_index=build_index,
summarize=summarize,
**kwargs,
)

Expand Down Expand Up @@ -168,6 +172,42 @@ async def add_skill(

return result

async def build_index(
self,
resource_uris: List[str],
ctx: RequestContext,
**kwargs
) -> Dict[str, Any]:
"""Manually trigger index building.
Args:
resource_uris: List of resource URIs to index.
ctx: Request context.
Returns:
Processing result
"""
self._ensure_initialized()
return await self._resource_processor.build_index(resource_uris, ctx, **kwargs)

async def summarize(
self,
resource_uris: List[str],
ctx: RequestContext,
**kwargs
) -> Dict[str, Any]:
"""Manually trigger summarization.
Args:
resource_uris: List of resource URIs to summarize.
ctx: Request context.
Returns:
Processing result
"""
self._ensure_initialized()
return await self._resource_processor.summarize(resource_uris, ctx, **kwargs)

async def wait_processed(self, timeout: Optional[float] = None) -> Dict[str, Any]:
"""Wait for all queued processing to complete.
Expand Down
5 changes: 5 additions & 0 deletions openviking/storage/queuefs/semantic_msg.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ class SemanticMsg:
user_id: str = "default"
agent_id: str = "default"
role: str = "root"
# Additional flags
skip_vectorization: bool = False

def __init__(
self,
Expand All @@ -45,6 +47,7 @@ def __init__(
user_id: str = "default",
agent_id: str = "default",
role: str = "root",
skip_vectorization: bool = False,
):
self.id = str(uuid4())
self.uri = uri
Expand All @@ -54,6 +57,7 @@ def __init__(
self.user_id = user_id
self.agent_id = agent_id
self.role = role
self.skip_vectorization = skip_vectorization

def to_dict(self) -> Dict[str, Any]:
"""Convert object to dictionary."""
Expand Down Expand Up @@ -88,6 +92,7 @@ def from_dict(cls, data: Dict[str, Any]) -> "SemanticMsg":
user_id=data.get("user_id", "default"),
agent_id=data.get("agent_id", "default"),
role=data.get("role", "root"),
skip_vectorization=data.get("skip_vectorization", False),
)
if "id" in data and data["id"]:
obj.id = data["id"]
Expand Down
Loading