Skip to content

Commit cc274da

Browse files
Jay-ju and Trae AI authored
feat: add index control to add_resource and refactor embedding logic (#401)
Co-authored-by: Trae AI <trae@bytedance.com>
1 parent 59d8bae commit cc274da

File tree

14 files changed: +771 additions, -144 deletions

openviking/async_client.py

Lines changed: 43 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -6,6 +6,7 @@
66
For HTTP mode, use AsyncHTTPClient or SyncHTTPClient.
77
"""
88

9+
from __future__ import annotations
910
import threading
1011
from typing import Any, Dict, List, Optional, Union
1112

@@ -177,24 +178,33 @@ async def add_resource(
177178
instruction: str = "",
178179
wait: bool = False,
179180
timeout: float = None,
181+
build_index: bool = True,
182+
summarize: bool = False,
180183
**kwargs,
181184
) -> Dict[str, Any]:
182-
"""Add resource to OpenViking (only supports resources scope).
185+
"""
186+
Add a resource (file/URL) to OpenViking.
183187
184188
Args:
185-
wait: Whether to wait for semantic extraction and vectorization to complete
186-
timeout: Wait timeout in seconds
187-
**kwargs: Extra options forwarded to the parser chain, e.g.
188-
``strict``, ``ignore_dirs``, ``include``, ``exclude``.
189+
path: Local file path or URL.
190+
reason: Context/reason for adding this resource.
191+
instruction: Specific instruction for processing.
192+
wait: If True, wait for processing to complete.
193+
target: Target path in VikingFS (e.g., "kb/docs").
194+
build_index: Whether to build vector index immediately (default: True).
195+
summarize: Whether to generate summary (default: False).
189196
"""
190197
await self._ensure_initialized()
198+
191199
return await self._client.add_resource(
192200
path=path,
193201
target=target,
194202
reason=reason,
195203
instruction=instruction,
196204
wait=wait,
197205
timeout=timeout,
206+
build_index=build_index,
207+
summarize=summarize,
198208
**kwargs,
199209
)
200210

@@ -203,6 +213,34 @@ async def wait_processed(self, timeout: float = None) -> Dict[str, Any]:
203213
await self._ensure_initialized()
204214
return await self._client.wait_processed(timeout=timeout)
205215

216+
async def build_index(
217+
self,
218+
resource_uris: Union[str, List[str]],
219+
**kwargs
220+
) -> Dict[str, Any]:
221+
"""
222+
Manually trigger index building for resources.
223+
224+
Args:
225+
resource_uris: Single URI or list of URIs to index.
226+
"""
227+
await self._ensure_initialized()
228+
return await self._client.build_index(resource_uris, **kwargs)
229+
230+
async def summarize(
231+
self,
232+
resource_uris: Union[str, List[str]],
233+
**kwargs
234+
) -> Dict[str, Any]:
235+
"""
236+
Manually trigger summarization for resources.
237+
238+
Args:
239+
resource_uris: Single URI or list of URIs to summarize.
240+
"""
241+
await self._ensure_initialized()
242+
return await self._client.summarize(resource_uris, **kwargs)
243+
206244
async def add_skill(
207245
self,
208246
data: Any,

openviking/client/local.py

Lines changed: 20 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -93,6 +93,26 @@ async def wait_processed(self, timeout: Optional[float] = None) -> Dict[str, Any
9393
"""Wait for all processing to complete."""
9494
return await self._service.resources.wait_processed(timeout=timeout)
9595

96+
async def build_index(
97+
self,
98+
resource_uris: Union[str, List[str]],
99+
**kwargs
100+
) -> Dict[str, Any]:
101+
"""Manually trigger index building."""
102+
if isinstance(resource_uris, str):
103+
resource_uris = [resource_uris]
104+
return await self._service.resources.build_index(resource_uris, ctx=self._ctx, **kwargs)
105+
106+
async def summarize(
107+
self,
108+
resource_uris: Union[str, List[str]],
109+
**kwargs
110+
) -> Dict[str, Any]:
111+
"""Manually trigger summarization."""
112+
if isinstance(resource_uris, str):
113+
resource_uris = [resource_uris]
114+
return await self._service.resources.summarize(resource_uris, ctx=self._ctx, **kwargs)
115+
96116
# ============= File System =============
97117

98118
async def ls(

openviking/parse/tree_builder.py

Lines changed: 16 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -25,6 +25,7 @@
2525
from typing import TYPE_CHECKING, Optional
2626

2727
from openviking.core.building_tree import BuildingTree
28+
from openviking.core.context import Context
2829
from openviking.parse.parsers.media.utils import get_media_base_uri, get_media_type
2930
from openviking.server.identity import RequestContext
3031
from openviking.storage.queuefs import SemanticMsg, get_queue_manager
@@ -87,29 +88,18 @@ async def finalize_from_temp(
8788
self,
8889
temp_dir_path: str,
8990
ctx: RequestContext,
90-
scope: str,
91+
scope: str = "resources",
9192
base_uri: Optional[str] = None,
9293
source_path: Optional[str] = None,
9394
source_format: Optional[str] = None,
95+
trigger_semantic: bool = False,
9496
) -> "BuildingTree":
9597
"""
96-
Finalize tree from temporary directory (v5.0 architecture).
97-
98-
New architecture:
99-
1. Move directory to AGFS
100-
2. Enqueue to SemanticQueue for async semantic generation
101-
3. Scan and create Resource objects (for compatibility)
98+
Finalize processing by moving from temp to AGFS.
10299
103100
Args:
104-
temp_dir_path: Temporary directory Viking URI (e.g., viking://temp/xxx)
105-
scope: Scope ("resources", "user", or "agent")
106-
base_uri: Base URI (None = use scope default)
107-
source_node: Source ResourceNode
108-
source_path: Source file path
109-
source_format: Source file format
110-
111-
Returns:
112-
Complete BuildingTree with all resources moved to AGFS
101+
trigger_semantic: Whether to automatically trigger semantic generation.
102+
Default is False (handled by ResourceProcessor/Summarizer).
113103
"""
114104

115105
viking_fs = get_viking_fs()
@@ -185,18 +175,23 @@ async def finalize_from_temp(
185175
logger.warning(f"[TreeBuilder] Failed to cleanup temp root: {e}")
186176

187177
# 6. Enqueue to SemanticQueue for async semantic generation
188-
try:
189-
await self._enqueue_semantic_generation(final_uri, "resource", ctx=ctx)
190-
logger.info(f"[TreeBuilder] Enqueued semantic generation for: {final_uri}")
191-
except Exception as e:
192-
logger.error(f"[TreeBuilder] Failed to enqueue semantic generation: {e}", exc_info=True)
178+
if trigger_semantic:
179+
try:
180+
await self._enqueue_semantic_generation(final_uri, "resource", ctx=ctx)
181+
logger.info(f"[TreeBuilder] Enqueued semantic generation for: {final_uri}")
182+
except Exception as e:
183+
logger.error(f"[TreeBuilder] Failed to enqueue semantic generation: {e}", exc_info=True)
193184

194185
# 7. Return simple BuildingTree (no scanning needed)
195186
tree = BuildingTree(
196187
source_path=source_path,
197188
source_format=source_format,
198189
)
199190
tree._root_uri = final_uri
191+
192+
# Create a minimal Context object for the root so that tree.root is not None
193+
root_context = Context(uri=final_uri)
194+
tree.add_context(root_context)
200195

201196
return tree
202197

openviking/service/resource_service.py

Lines changed: 44 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -6,7 +6,7 @@
66
Provides resource management operations: add_resource, add_skill, wait_processed.
77
"""
88

9-
from typing import Any, Dict, Optional
9+
from typing import Any, Dict, List, Optional
1010

1111
from openviking.server.identity import RequestContext
1212
from openviking.storage import VikingDBManager
@@ -71,6 +71,8 @@ async def add_resource(
7171
instruction: str = "",
7272
wait: bool = False,
7373
timeout: Optional[float] = None,
74+
build_index: bool = True,
75+
summarize: bool = False,
7476
**kwargs,
7577
) -> Dict[str, Any]:
7678
"""Add resource to OpenViking (only supports resources scope).
@@ -82,9 +84,9 @@ async def add_resource(
8284
instruction: Processing instruction
8385
wait: Whether to wait for semantic extraction and vectorization to complete
8486
timeout: Wait timeout in seconds
85-
**kwargs: Extra options forwarded to the parser chain, e.g.
86-
``strict``, ``ignore_dirs``, ``include``, ``exclude``
87-
(used by ``DirectoryParser``).
87+
build_index: Whether to build vector index immediately (default: True).
88+
summarize: Whether to generate summary (default: False).
89+
**kwargs: Extra options forwarded to the parser chain.
8890
8991
Returns:
9092
Processing result
@@ -106,6 +108,8 @@ async def add_resource(
106108
instruction=instruction,
107109
scope="resources",
108110
target=target,
111+
build_index=build_index,
112+
summarize=summarize,
109113
**kwargs,
110114
)
111115

@@ -168,6 +172,42 @@ async def add_skill(
168172

169173
return result
170174

175+
async def build_index(
176+
self,
177+
resource_uris: List[str],
178+
ctx: RequestContext,
179+
**kwargs
180+
) -> Dict[str, Any]:
181+
"""Manually trigger index building.
182+
183+
Args:
184+
resource_uris: List of resource URIs to index.
185+
ctx: Request context.
186+
187+
Returns:
188+
Processing result
189+
"""
190+
self._ensure_initialized()
191+
return await self._resource_processor.build_index(resource_uris, ctx, **kwargs)
192+
193+
async def summarize(
194+
self,
195+
resource_uris: List[str],
196+
ctx: RequestContext,
197+
**kwargs
198+
) -> Dict[str, Any]:
199+
"""Manually trigger summarization.
200+
201+
Args:
202+
resource_uris: List of resource URIs to summarize.
203+
ctx: Request context.
204+
205+
Returns:
206+
Processing result
207+
"""
208+
self._ensure_initialized()
209+
return await self._resource_processor.summarize(resource_uris, ctx, **kwargs)
210+
171211
async def wait_processed(self, timeout: Optional[float] = None) -> Dict[str, Any]:
172212
"""Wait for all queued processing to complete.
173213

openviking/storage/queuefs/semantic_msg.py

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -35,6 +35,8 @@ class SemanticMsg:
3535
user_id: str = "default"
3636
agent_id: str = "default"
3737
role: str = "root"
38+
# Additional flags
39+
skip_vectorization: bool = False
3840

3941
def __init__(
4042
self,
@@ -45,6 +47,7 @@ def __init__(
4547
user_id: str = "default",
4648
agent_id: str = "default",
4749
role: str = "root",
50+
skip_vectorization: bool = False,
4851
):
4952
self.id = str(uuid4())
5053
self.uri = uri
@@ -54,6 +57,7 @@ def __init__(
5457
self.user_id = user_id
5558
self.agent_id = agent_id
5659
self.role = role
60+
self.skip_vectorization = skip_vectorization
5761

5862
def to_dict(self) -> Dict[str, Any]:
5963
"""Convert object to dictionary."""
@@ -88,6 +92,7 @@ def from_dict(cls, data: Dict[str, Any]) -> "SemanticMsg":
8892
user_id=data.get("user_id", "default"),
8993
agent_id=data.get("agent_id", "default"),
9094
role=data.get("role", "root"),
95+
skip_vectorization=data.get("skip_vectorization", False),
9196
)
9297
if "id" in data and data["id"]:
9398
obj.id = data["id"]

Comments (0): there are no comments on this commit.