Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion openviking/session/memory_deduplicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ async def _find_similar_memories(
# Build filter by memory scope + uri prefix (schema does not have category field yet).
filter_conds = [
{"field": "context_type", "op": "must", "conds": ["memory"]},
{"field": "is_leaf", "op": "must", "conds": [True]},
{"field": "level", "op": "must", "conds": [2]},
]
owner = candidate.user
if hasattr(owner, "account_id"):
Expand Down
22 changes: 22 additions & 0 deletions openviking/storage/viking_vector_index_backend.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,23 @@ def _update_meta_data_cache(self, collection_name: str, coll: Collection):
meta_data = coll.get_meta_data()
self._meta_data_cache[collection_name] = meta_data

@staticmethod
def _restore_uri_fields(record: Dict[str, Any]) -> Dict[str, Any]:
"""Restore viking:// prefix on uri/parent_uri fields read from VikingDB.

The volcengine backend sanitizes URIs to /path/ format on write;
this reverses that transformation so the rest of the system sees
the canonical viking:// scheme. Idempotent for values that
already carry the prefix (local/http backends).
"""
for key in ("uri", "parent_uri"):
val = record.get(key)
if isinstance(val, str) and not val.startswith("viking://"):
restored = val.strip("/")
if restored:
record[key] = f"viking://{restored}"
return record

# =========================================================================
# Collection/Table Management
# =========================================================================
Expand Down Expand Up @@ -445,6 +462,7 @@ async def get(self, collection: str, ids: List[str]) -> List[Dict[str, Any]]:
for item in result.items:
record = dict(item.fields) if item.fields else {}
record["id"] = item.id
self._restore_uri_fields(record)
records.append(record)
return records
elif isinstance(result, dict):
Expand All @@ -454,6 +472,7 @@ async def get(self, collection: str, ids: List[str]) -> List[Dict[str, Any]]:
record = dict(item.get("fields", {})) if item.get("fields") else {}
record["id"] = item.get("id")
if record["id"]:
self._restore_uri_fields(record)
records.append(record)
return records
else:
Expand All @@ -476,6 +495,7 @@ async def fetch_by_uri(self, collection: str, uri: str) -> Optional[Dict[str, An
for item in result.data:
record = dict(item.fields) if item.fields else {}
record["id"] = item.id
self._restore_uri_fields(record)
records.append(record)
if len(records) > 1:
raise ValueError(f"Duplicate records found for URI: {uri}")
Expand Down Expand Up @@ -670,6 +690,7 @@ async def search(
record = dict(item.fields) if item.fields else {}
record["id"] = item.id
record["_score"] = item.score if item.score is not None else 0.0
self._restore_uri_fields(record)

if not with_vector:
if "vector" in record:
Expand Down Expand Up @@ -734,6 +755,7 @@ async def filter(
for item in result.data:
record = dict(item.fields) if item.fields else {}
record["id"] = item.id
self._restore_uri_fields(record)
records.append(record)

return records
Expand Down
56 changes: 56 additions & 0 deletions third_party/agfs/agfs-server/pkg/plugins/s3fs/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,62 @@ func (c *S3Client) DirectoryExists(ctx context.Context, path string) (bool, erro
return len(result.Contents) > 0 || len(result.CommonPrefixes) > 0, nil
}

// CopyObject copies an object within the same bucket.
func (c *S3Client) CopyObject(ctx context.Context, srcPath, dstPath string) error {
	source := c.buildKey(srcPath)
	target := c.buildKey(dstPath)

	// NOTE(review): per the S3 API, CopySource should be URL-encoded;
	// keys containing special characters (e.g. '+', '?') may fail here.
	// Confirm and escape with net/url if such keys can occur.
	input := &s3.CopyObjectInput{
		Bucket:     aws.String(c.bucket),
		CopySource: aws.String(fmt.Sprintf("%s/%s", c.bucket, source)),
		Key:        aws.String(target),
	}
	if _, err := c.client.CopyObject(ctx, input); err != nil {
		return fmt.Errorf("failed to copy object %s -> %s: %w", source, target, err)
	}
	return nil
}

// ListAllObjects lists all objects (recursively) under a given prefix.
// Unlike ListObjects which only lists immediate children, this returns
// every object in the subtree. Returned keys are relative to the prefix;
// entries whose relative key ends in "/" are directory markers.
func (c *S3Client) ListAllObjects(ctx context.Context, path string) ([]S3Object, error) {
	keyPrefix := c.buildKey(path)
	if keyPrefix != "" && !strings.HasSuffix(keyPrefix, "/") {
		keyPrefix += "/"
	}

	// Omitting Delimiter makes S3 return the entire subtree instead of
	// only the immediate children.
	input := &s3.ListObjectsV2Input{
		Bucket: aws.String(c.bucket),
		Prefix: aws.String(keyPrefix),
	}
	pager := s3.NewListObjectsV2Paginator(c.client, input)

	var results []S3Object
	for pager.HasMorePages() {
		page, err := pager.NextPage(ctx)
		if err != nil {
			return nil, fmt.Errorf("failed to list all objects: %w", err)
		}
		for _, entry := range page.Contents {
			if entry.Key == nil {
				continue
			}
			rel := strings.TrimPrefix(*entry.Key, keyPrefix)
			results = append(results, S3Object{
				Key:          rel,
				Size:         aws.ToInt64(entry.Size),
				LastModified: aws.ToTime(entry.LastModified),
				IsDir:        strings.HasSuffix(rel, "/"),
			})
		}
	}

	return results, nil
}

// getParentPath returns the parent directory path
func getParentPath(path string) string {
if path == "" || path == "/" {
Expand Down
75 changes: 60 additions & 15 deletions third_party/agfs/agfs-server/pkg/plugins/s3fs/s3fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,34 +428,38 @@ func (fs *S3FS) Rename(oldPath, newPath string) error {
fs.mu.Lock()
defer fs.mu.Unlock()

// Check if old path exists
exists, err := fs.client.ObjectExists(ctx, oldPath)
// Try as file first
fileExists, err := fs.client.ObjectExists(ctx, oldPath)
if err != nil {
return fmt.Errorf("failed to check source: %w", err)
}
if !exists {
return filesystem.ErrNotFound

if fileExists {
return fs.renameSingleObject(ctx, oldPath, newPath)
}

// Get the object
data, err := fs.client.GetObject(ctx, oldPath)
// Try as directory
dirExists, err := fs.client.DirectoryExists(ctx, oldPath)
if err != nil {
return fmt.Errorf("failed to read source: %w", err)
return fmt.Errorf("failed to check source directory: %w", err)
}
if !dirExists {
return filesystem.ErrNotFound
}

// Put to new location
err = fs.client.PutObject(ctx, newPath, data)
if err != nil {
return fmt.Errorf("failed to write destination: %w", err)
return fs.renameDirectory(ctx, oldPath, newPath)
}

// renameSingleObject moves a single S3 object via copy + delete.
func (fs *S3FS) renameSingleObject(ctx context.Context, oldPath, newPath string) error {
if err := fs.client.CopyObject(ctx, oldPath, newPath); err != nil {
return fmt.Errorf("failed to copy source: %w", err)
}

// Delete old object
err = fs.client.DeleteObject(ctx, oldPath)
if err != nil {
if err := fs.client.DeleteObject(ctx, oldPath); err != nil {
return fmt.Errorf("failed to delete source: %w", err)
}

// Invalidate caches
oldParent := getParentPath(oldPath)
newParent := getParentPath(newPath)
fs.dirCache.Invalidate(oldParent)
Expand All @@ -466,6 +470,47 @@ func (fs *S3FS) Rename(oldPath, newPath string) error {
return nil
}

// renameDirectory moves an entire directory subtree by copying every object
// under oldPath to newPath and then deleting the originals.
//
// S3 has no native rename, so this is copy+delete per object. The operation
// is not atomic: a failure partway through can leave objects present under
// both prefixes.
func (fs *S3FS) renameDirectory(ctx context.Context, oldPath, newPath string) error {
	// List every object (recursively) under oldPath; keys are relative
	// to oldPath, and the directory's own marker appears as the empty key.
	objects, err := fs.client.ListAllObjects(ctx, oldPath)
	if err != nil {
		return fmt.Errorf("failed to list source directory: %w", err)
	}

	// Copy each object to the new prefix.
	for _, obj := range objects {
		// Skip the source directory's own marker: ListAllObjects trims
		// the prefix from keys, so the marker shows up as "" and would
		// otherwise produce a bogus "oldPath/" -> "newPath/" copy.
		// CreateDirectory below recreates the marker at the destination.
		if obj.Key == "" {
			continue
		}
		if err := fs.client.CopyObject(ctx, oldPath+"/"+obj.Key, newPath+"/"+obj.Key); err != nil {
			return fmt.Errorf("failed to copy %s: %w", obj.Key, err)
		}
	}

	// Create the new directory marker.
	if err := fs.client.CreateDirectory(ctx, newPath); err != nil {
		// Ignore if already exists (implicit from copied children).
		log.Debugf("[s3fs] CreateDirectory %s (may already exist): %v", newPath, err)
	}

	// Delete old directory tree (marker + all children).
	if err := fs.client.DeleteDirectory(ctx, oldPath); err != nil {
		return fmt.Errorf("failed to delete source directory: %w", err)
	}

	// Invalidate caches for both parents and both subtrees.
	oldParent := getParentPath(oldPath)
	newParent := getParentPath(newPath)
	fs.dirCache.Invalidate(oldParent)
	fs.dirCache.Invalidate(newParent)
	fs.dirCache.InvalidatePrefix(oldPath)
	fs.dirCache.InvalidatePrefix(newPath)
	fs.statCache.InvalidatePrefix(oldPath)
	fs.statCache.InvalidatePrefix(newPath)

	return nil
}

func (fs *S3FS) Chmod(path string, mode uint32) error {
// S3 doesn't support Unix permissions
// This is a no-op for compatibility
Expand Down
Loading