diff --git a/.gitignore b/.gitignore index c0e20a0e..90296c16 100644 --- a/.gitignore +++ b/.gitignore @@ -167,6 +167,5 @@ test_db_*/ test_project_root/ benchmark_stress_db/ examples/data/ -third_party/agfs/bin/ openviking/_version.py specs/ diff --git a/README.md b/README.md index 862d23bf..bdb3349b 100644 --- a/README.md +++ b/README.md @@ -349,6 +349,23 @@ set "OPENVIKING_CONFIG_FILE=%USERPROFILE%\.openviking\ov.conf" > 💡 **Tip**: You can also place the configuration file in other locations, just specify the correct path in the environment variable. +#### AGFS Binding Mode (High Performance) + +For better performance, you can use the Python Binding mode to bypass HTTP overhead by directly calling the AGFS core. + +**Update your `ov.conf`**: + Add `mode` to the `agfs` section: + ```json + { + "storage": { + "agfs": { + "mode": "binding-client", + "backend": "local" + } + } + } + ``` + ### 4. Run Your First Example > 📝 **Prerequisite**: Ensure you have completed the environment configuration in the previous step. diff --git a/docs/en/guides/01-configuration.md b/docs/en/guides/01-configuration.md index 2275b70f..b44dab58 100644 --- a/docs/en/guides/01-configuration.md +++ b/docs/en/guides/01-configuration.md @@ -323,6 +323,79 @@ Storage backend configuration. } ``` +#### agfs + +| Parameter | Type | Description | Default | +|-----------|------|-------------|---------| +| `mode` | str | `"http-client"` or `"binding-client"` | `"http-client"` | +| `backend` | str | `"local"`, `"s3"`, or `"memory"` | `"local"` | +| `path` | str | Local directory path for `local` backend | `"./data"` | +| `url` | str | AGFS service URL for `http-client` mode | `"http://localhost:1833"` | +| `timeout` | float | Request timeout in seconds | `10.0` | + +**Configuration Examples** + +
+HTTP Client (Default) + +Connects to a remote or local AGFS service via HTTP. + +```json +{ + "storage": { + "agfs": { + "mode": "http-client", + "url": "http://localhost:1833", + "timeout": 10.0 + } + } +} +``` + +
+ +
+Binding Client (High Performance) + +Directly uses the AGFS Go implementation through a shared library. + +**Config**: +```json +{ + "storage": { + "agfs": { + "mode": "binding-client", + "backend": "local", + "path": "./data" + } + } +} +``` + +
+ +**S3 Backend** + +```json +{ + "storage": { + "agfs": { + "backend": "s3", + "s3": { + "bucket": "my-bucket", + "endpoint": "s3.amazonaws.com", + "region": "us-east-1", + "access_key": "your-ak", + "secret_key": "your-sk" + } + } + } +} +``` + +#### vectordb + + ## Config Files OpenViking uses two config files: diff --git a/docs/zh/guides/01-configuration.md b/docs/zh/guides/01-configuration.md index 983b269a..e8376043 100644 --- a/docs/zh/guides/01-configuration.md +++ b/docs/zh/guides/01-configuration.md @@ -330,6 +330,78 @@ OpenViking 使用 JSON 配置文件(`ov.conf`)进行设置。配置文件支 } ``` +#### agfs + +| 参数 | 类型 | 说明 | 默认值 | +|------|------|------|--------| +| `mode` | str | `"http-client"` 或 `"binding-client"` | `"http-client"` | +| `backend` | str | `"local"`、`"s3"` 或 `"memory"` | `"local"` | +| `path` | str | `local` 后端的本地目录路径 | `"./data"` | +| `url` | str | `http-client` 模式下的 AGFS 服务地址 | `"http://localhost:1833"` | +| `timeout` | float | 请求超时时间(秒) | `10.0` | + +**配置示例** + +
+HTTP Client(默认) + +通过 HTTP 连接到远程或本地的 AGFS 服务。 + +```json +{ + "storage": { + "agfs": { + "mode": "http-client", + "url": "http://localhost:1833", + "timeout": 10.0 + } + } +} +``` + +
+ +
+Binding Client(高性能) + +通过共享库直接使用 AGFS 的 Go 实现。 + +**配置**: +```json +{ + "storage": { + "agfs": { + "mode": "binding-client", + "backend": "local", + "path": "./data" + } + } +} +``` + +
+ +**S3 后端** + +```json +{ + "storage": { + "agfs": { + "backend": "s3", + "s3": { + "bucket": "my-bucket", + "endpoint": "s3.amazonaws.com", + "region": "us-east-1", + "access_key": "your-ak", + "secret_key": "your-sk" + } + } + } +} +``` + +#### vectordb + ## 配置文件 OpenViking 使用两个配置文件: diff --git a/openviking/eval/ragas/playback.py b/openviking/eval/ragas/playback.py index bab5327a..6ec9e10c 100644 --- a/openviking/eval/ragas/playback.py +++ b/openviking/eval/ragas/playback.py @@ -88,9 +88,7 @@ def to_dict(self) -> Dict[str, Any]: ) agfs_fs_total = self.agfs_fs_success_count + self.agfs_fs_error_count agfs_fs_success_rate = ( - self.agfs_fs_success_count / agfs_fs_total * 100 - if agfs_fs_total > 0 - else 0 + self.agfs_fs_success_count / agfs_fs_total * 100 if agfs_fs_total > 0 else 0 ) avg_agfs_calls = ( self.total_agfs_calls / self.total_viking_fs_operations @@ -217,29 +215,26 @@ def _init_backends(self) -> None: from openviking.agfs_manager import AGFSManager from openviking.storage.viking_fs import init_viking_fs from openviking.storage.viking_vector_index_backend import VikingVectorIndexBackend + from openviking.utils.agfs_utils import create_agfs_client from openviking_cli.utils.config import get_openviking_config from openviking_cli.utils.config.vectordb_config import VectorDBBackendConfig config = get_openviking_config() - - agfs_url = config.storage.agfs.url + agfs_config = config.storage.agfs agfs_manager = None - if config.storage.agfs.backend == "local": - agfs_manager = AGFSManager(config=config.storage.agfs) + # Determine if we need to start AGFSManager for HTTP mode + mode = getattr(agfs_config, "mode", "http-client") + if mode == "http-client": + agfs_manager = AGFSManager(config=agfs_config) agfs_manager.start() - agfs_url = agfs_manager.url - logger.info(f"[IOPlayback] Started AGFS at {agfs_url}") - elif config.storage.agfs.backend in ["s3", "memory"]: - agfs_manager = AGFSManager(config=config.storage.agfs) - agfs_manager.start() - agfs_url = 
agfs_manager.url logger.info( - f"[IOPlayback] Started AGFS with {config.storage.agfs.backend} backend at {agfs_url}" + f"[IOPlayback] Started AGFS manager in HTTP mode at {agfs_manager.url} " + f"with {agfs_config.backend} backend" ) - elif not agfs_url: - agfs_url = "http://localhost:8080" - logger.warning(f"[IOPlayback] No AGFS URL configured, using default: {agfs_url}") + + # Create AGFS client using utility + agfs_client = create_agfs_client(agfs_config) vector_store = None if self.enable_vikingdb: @@ -255,8 +250,9 @@ def _init_backends(self) -> None: vector_store = VikingVectorIndexBackend(config=backend_config) if self.enable_fs: + # Use init_viking_fs which handles mode (HTTP/Binding) automatically based on agfs_config self._viking_fs = init_viking_fs( - agfs_url=agfs_url, + agfs=agfs_client, vector_store=vector_store, ) self._vector_store = vector_store @@ -332,7 +328,9 @@ def process_arg(arg: Any) -> Any: return result - def _compare_agfs_calls(self, recorded_calls: List[Any], actual_calls: List[Dict[str, Any]]) -> bool: + def _compare_agfs_calls( + self, recorded_calls: List[Any], actual_calls: List[Dict[str, Any]] + ) -> bool: """ Compare recorded AGFS calls with actual AGFS calls. 
@@ -365,14 +363,10 @@ def _compare_agfs_calls(self, recorded_calls: List[Any], actual_calls: List[Dict ) return False if recorded_req != actual_call["request"]: - logger.warning( - f"AGFS request mismatch for operation {recorded_op}" - ) + logger.warning(f"AGFS request mismatch for operation {recorded_op}") return False if recorded_success != actual_call["success"]: - logger.warning( - f"AGFS success status mismatch for operation {recorded_op}" - ) + logger.warning(f"AGFS success status mismatch for operation {recorded_op}") return False return True diff --git a/openviking/retrieve/hierarchical_retriever.py b/openviking/retrieve/hierarchical_retriever.py index d49f2bc6..7f92e1df 100644 --- a/openviking/retrieve/hierarchical_retriever.py +++ b/openviking/retrieve/hierarchical_retriever.py @@ -12,6 +12,7 @@ from typing import Any, Dict, List, Optional, Tuple from openviking.models.embedder.base import EmbedResult +from openviking.retrieve.memory_lifecycle import hotness_score from openviking.server.identity import RequestContext, Role from openviking.storage import VikingDBInterface from openviking.storage.viking_fs import get_viking_fs @@ -22,7 +23,6 @@ RelatedContext, TypedQuery, ) -from openviking.retrieve.memory_lifecycle import hotness_score from openviking_cli.utils.config import RerankConfig from openviking_cli.utils.logger import get_logger diff --git a/openviking/service/core.py b/openviking/service/core.py index be603058..31f4c3a7 100644 --- a/openviking/service/core.py +++ b/openviking/service/core.py @@ -68,6 +68,7 @@ def __init__( # Infrastructure self._agfs_manager: Optional[AGFSManager] = None self._agfs_url: Optional[str] = None + self._agfs_client: Optional[Any] = None self._queue_manager: Optional[QueueManager] = None self._vikingdb_manager: Optional[VikingDBManager] = None self._viking_fs: Optional[VikingFS] = None @@ -108,21 +109,30 @@ def _init_storage( max_concurrent_semantic: int = 100, ) -> None: """Initialize storage resources.""" - 
self._agfs_manager = AGFSManager(config=config.agfs) - self._agfs_manager.start() - self._agfs_url = self._agfs_manager.url - config.agfs.url = self._agfs_url + from openviking.utils.agfs_utils import create_agfs_client + + mode = getattr(config.agfs, "mode", "http-client") + if mode == "http-client" and config.agfs.backend == "local": + self._agfs_manager = AGFSManager(config=config.agfs) + self._agfs_manager.start() + self._agfs_url = self._agfs_manager.url + config.agfs.url = self._agfs_url + else: + self._agfs_url = config.agfs.url + + # Create AGFS client using utility + self._agfs_client = create_agfs_client(config.agfs) - # Initialize QueueManager - if self._agfs_url: + # Initialize QueueManager with agfs_client + if self._agfs_client: self._queue_manager = init_queue_manager( - agfs_url=self._agfs_url, + agfs=self._agfs_client, timeout=config.agfs.timeout, max_concurrent_embedding=max_concurrent_embedding, max_concurrent_semantic=max_concurrent_semantic, ) else: - logger.warning("AGFS URL not configured, skipping queue manager initialization") + logger.warning("AGFS client not initialized, skipping queue manager") # Initialize VikingDBManager with QueueManager self._vikingdb_manager = VikingDBManager( @@ -221,11 +231,10 @@ async def initialize(self) -> None: await init_context_collection(self._vikingdb_manager) self._viking_fs = init_viking_fs( - agfs_url=self._agfs_url or "http://localhost:8080", + agfs=self._agfs_client, query_embedder=self._embedder, rerank_config=config.rerank, vector_store=self._vikingdb_manager, - timeout=config.storage.agfs.timeout, enable_recorder=enable_recorder, ) if enable_recorder: diff --git a/openviking/storage/queuefs/queue_manager.py b/openviking/storage/queuefs/queue_manager.py index 3c5de9c0..dc9aeb57 100644 --- a/openviking/storage/queuefs/queue_manager.py +++ b/openviking/storage/queuefs/queue_manager.py @@ -12,8 +12,6 @@ import traceback from typing import Any, Dict, Optional, Set, Union -from pyagfs import AGFSClient - 
from openviking_cli.utils.logger import get_logger from .embedding_queue import EmbeddingQueue @@ -27,16 +25,24 @@ def init_queue_manager( - agfs_url: str = "http://localhost:8080", + agfs: Any, timeout: int = 10, mount_point: str = "/queue", max_concurrent_embedding: int = 10, max_concurrent_semantic: int = 100, ) -> "QueueManager": - """Initialize QueueManager singleton.""" + """Initialize QueueManager singleton. + + Args: + agfs: Pre-initialized AGFS client (HTTP or Binding). + timeout: Request timeout in seconds. + mount_point: Path where QueueFS is mounted. + max_concurrent_embedding: Max concurrent embedding tasks. + max_concurrent_semantic: Max concurrent semantic tasks. + """ global _instance _instance = QueueManager( - agfs_url=agfs_url, + agfs=agfs, timeout=timeout, mount_point=mount_point, max_concurrent_embedding=max_concurrent_embedding, @@ -65,19 +71,18 @@ class QueueManager: def __init__( self, - agfs_url: str = "http://localhost:8080", + agfs: Any, timeout: int = 10, mount_point: str = "/queue", max_concurrent_embedding: int = 10, max_concurrent_semantic: int = 100, ): """Initialize QueueManager.""" - self._agfs_url = agfs_url + self._agfs = agfs self.timeout = timeout self.mount_point = mount_point self._max_concurrent_embedding = max_concurrent_embedding self._max_concurrent_semantic = max_concurrent_semantic - self._agfs: Optional[Any] = None self._queues: Dict[str, NamedQueue] = {} self._started = False self._queue_threads: Dict[str, threading.Thread] = {} @@ -86,15 +91,14 @@ def __init__( atexit.register(self.stop) logger.info( - f"[QueueManager] Initialized with agfs_url={agfs_url}, mount_point={mount_point}" + f"[QueueManager] Initialized with agfs={type(agfs).__name__}, mount_point={mount_point}" ) def start(self) -> None: - """Start QueueManager, establish connection and ensure queuefs is mounted.""" + """Start QueueManager workers.""" if self._started: return - self._agfs = AGFSClient(api_base_url=self._agfs_url, timeout=self.timeout) 
self._started = True # Start queue workers for existing queues diff --git a/openviking/storage/vectordb/collection/local_collection.py b/openviking/storage/vectordb/collection/local_collection.py index 87914eec..dc84d39f 100644 --- a/openviking/storage/vectordb/collection/local_collection.py +++ b/openviking/storage/vectordb/collection/local_collection.py @@ -337,22 +337,22 @@ def search_by_id( ) -> SearchResult: if not self.store_mgr: raise RuntimeError("Store manager is not initialized") - + # Validate input ID if id is None: return SearchResult() - + # Handle empty string IDs if isinstance(id, str) and not id.strip(): return SearchResult() - + try: pk = self.meta.primary_key label = str_to_uint64(str(id)) if pk != AUTO_ID_KEY else int(id) - except (ValueError, OverflowError) as e: + except (ValueError, OverflowError): # Invalid ID format - return empty result instead of crashing return SearchResult() - + cands_list: List[CandidateData] = self.store_mgr.fetch_cands_data([label]) if not cands_list or cands_list[0] is None: return SearchResult() diff --git a/openviking/storage/viking_fs.py b/openviking/storage/viking_fs.py index 2c42bc12..cfc7b971 100644 --- a/openviking/storage/viking_fs.py +++ b/openviking/storage/viking_fs.py @@ -22,7 +22,6 @@ from pathlib import PurePath from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union -from pyagfs import AGFSClient from pyagfs.exceptions import AGFSHTTPError from openviking.server.identity import RequestContext, Role @@ -69,30 +68,29 @@ def from_dict(data: Dict[str, Any]) -> "RelationEntry": def init_viking_fs( - agfs_url: str = "http://localhost:8080", + agfs: Any, query_embedder: Optional[Any] = None, rerank_config: Optional["RerankConfig"] = None, vector_store: Optional["VikingDBInterface"] = None, - timeout: int = 10, enable_recorder: bool = False, ) -> "VikingFS": """Initialize VikingFS singleton. 
Args: - agfs_url: AGFS service URL + agfs: Pre-initialized AGFS client (HTTP or Binding) + agfs_config: AGFS configuration object for backend settings query_embedder: Embedder instance rerank_config: Rerank configuration vector_store: Vector store instance - timeout: Request timeout in seconds enable_recorder: Whether to enable IO recording """ global _instance + _instance = VikingFS( - agfs_url=agfs_url, + agfs=agfs, query_embedder=query_embedder, rerank_config=rerank_config, vector_store=vector_store, - timeout=timeout, ) if enable_recorder: @@ -153,24 +151,26 @@ class VikingFS: APIs are divided into two categories: - AGFS basic commands (direct forwarding): read, ls, write, mkdir, rm, mv, grep, stat - VikingFS specific capabilities: abstract, overview, find, search, relations, link, unlink + + Supports two modes: + - HTTP mode: Use AGFSClient to connect to AGFS server via HTTP + - Binding mode: Use AGFSBindingClient to directly use AGFS implementation """ def __init__( self, - agfs_url: str = "http://localhost:8080", + agfs: Any, query_embedder: Optional[Any] = None, rerank_config: Optional["RerankConfig"] = None, vector_store: Optional["VikingDBInterface"] = None, - timeout: int = 10, ): - self.agfs = AGFSClient(api_base_url=agfs_url, timeout=timeout) + self.agfs = agfs self.query_embedder = query_embedder self.rerank_config = rerank_config self.vector_store = vector_store self._bound_ctx: contextvars.ContextVar[Optional[RequestContext]] = contextvars.ContextVar( "vikingfs_bound_ctx", default=None ) - logger.info(f"[VikingFS] Initialized with agfs_url={agfs_url}") @staticmethod def _default_ctx() -> RequestContext: diff --git a/openviking/storage/vikingdb_manager.py b/openviking/storage/vikingdb_manager.py index 1321756e..2b68b6a1 100644 --- a/openviking/storage/vikingdb_manager.py +++ b/openviking/storage/vikingdb_manager.py @@ -11,7 +11,6 @@ from openviking.storage.queuefs.queue_manager import QueueManager from openviking.storage.viking_vector_index_backend 
import VikingVectorIndexBackend from openviking_cli.utils import get_logger -from openviking_cli.utils.config.agfs_config import AGFSConfig from openviking_cli.utils.config.vectordb_config import VectorDBBackendConfig logger = get_logger(__name__) diff --git a/openviking/utils/agfs_utils.py b/openviking/utils/agfs_utils.py new file mode 100644 index 00000000..c1b0da8e --- /dev/null +++ b/openviking/utils/agfs_utils.py @@ -0,0 +1,130 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: Apache-2.0 +""" +AGFS Client utilities for creating and configuring AGFS clients. +""" + +import os +from typing import Any + +from pyagfs import AGFSBindingClient, AGFSClient + +from openviking_cli.utils.logger import get_logger + +logger = get_logger(__name__) + + +def create_agfs_client(agfs_config: Any) -> Any: + """ + Create an AGFS client based on the provided configuration. + + Args: + agfs_config: AGFS configuration object containing mode and other settings. + + Returns: + An AGFSClient or AGFSBindingClient instance. 
+ """ + # Ensure agfs_config is not None + if agfs_config is None: + raise ValueError("agfs_config cannot be None") + mode = getattr(agfs_config, "mode", "http-client") + + if mode == "binding-client": + # Setup library path if needed + lib_path = getattr(agfs_config, "lib_path", None) + if lib_path and lib_path not in ["1", "default"]: + os.environ["AGFS_LIB_PATH"] = lib_path + + client = AGFSBindingClient() + logger.info(f"[AGFSUtils] Created AGFSBindingClient (lib_path={lib_path})") + + # Automatically mount backend for binding client + mount_agfs_backend(client, agfs_config) + + return client + else: + # Default to http-client + url = getattr(agfs_config, "url", "http://localhost:8080") + timeout = getattr(agfs_config, "timeout", 10) + client = AGFSClient(api_base_url=url, timeout=timeout) + logger.info(f"[AGFSUtils] Created AGFSClient at {url}") + return client + + +def mount_agfs_backend(agfs: Any, agfs_config: Any) -> None: + """ + Mount backend filesystem for an AGFS client based on configuration. + + Args: + agfs: AGFS client instance (HTTP or Binding). + agfs_config: AGFS configuration object containing backend settings. + """ + from pyagfs import AGFSBindingClient + + # Only binding-client needs manual mounting, but we can also do it for HTTP client + # if it supports it. Usually, HTTP server handles its own mounting. + if not isinstance(agfs, AGFSBindingClient): + return + + # 1. Mount standard plugins to align with HTTP server behavior + # serverinfofs: /serverinfo + try: + agfs.unmount("/serverinfo") + except Exception: + pass + try: + agfs.mount("serverinfofs", "/serverinfo", {"version": "1.0.0"}) + except Exception as e: + logger.warning(f"[AGFSUtils] Failed to mount serverinfofs at /serverinfo: {e}") + + # queuefs: /queue + try: + agfs.unmount("/queue") + except Exception: + pass + try: + agfs.mount("queuefs", "/queue", {}) + except Exception as e: + logger.warning(f"[AGFSUtils] Failed to mount queuefs at /queue: {e}") + + # 2. 
Mount primary storage backend to /local + backend = getattr(agfs_config, "backend", "local") + mount_path = "/local" + config = {} + + if backend == "local": + path = getattr(agfs_config, "path", "./data") + config = {"local_dir": str(path)} + elif backend == "s3": + s3_config = getattr(agfs_config, "s3", None) + if s3_config: + config = { + "bucket": s3_config.bucket, + "region": s3_config.region, + "access_key_id": s3_config.access_key, + "secret_access_key": s3_config.secret_key, + "endpoint": s3_config.endpoint, + "prefix": s3_config.prefix or "", + "disable_ssl": not s3_config.use_ssl, + "use_path_style": s3_config.use_path_style, + } + elif backend == "memory": + # memfs plugin + config = {} + + fstype = f"{backend}fs" + if backend == "memory": + fstype = "memfs" + + # Try to unmount existing mount at /local to allow backend switching + try: + agfs.unmount(mount_path) + except Exception: + pass + + try: + agfs.mount(fstype, mount_path, config) + logger.info(f"[AGFSUtils] Mounted {fstype} at {mount_path} with config={config}") + except Exception as e: + logger.error(f"[AGFSUtils] Failed to mount {fstype} at {mount_path}: {e}") + raise e diff --git a/openviking_cli/utils/config/agfs_config.py b/openviking_cli/utils/config/agfs_config.py index a152042e..2f2df7c8 100644 --- a/openviking_cli/utils/config/agfs_config.py +++ b/openviking_cli/utils/config/agfs_config.py @@ -84,6 +84,11 @@ class AGFSConfig(BaseModel): default="http://localhost:1833", description="AGFS service URL for service mode" ) + mode: str = Field( + default="http-client", + description="AGFS client mode: 'http-client' | 'binding-client'", + ) + backend: str = Field( default="local", description="AGFS storage backend: 'local' | 's3' | 'memory'" ) @@ -97,6 +102,12 @@ class AGFSConfig(BaseModel): description="Enable/Disable SSL (HTTPS) for AGFS service. 
Set to False for local testing without HTTPS.", ) + lib_path: Optional[str] = Field( + default=None, + description="Path to AGFS binding shared library. If set, use python binding instead of HTTP client. " + "Default: third_party/agfs/bin/libagfsbinding.{so,dylib}", + ) + # S3 backend configuration # These settings are used when backend is set to 's3'. # AGFS will act as a gateway to the specified S3 bucket. @@ -107,6 +118,11 @@ class AGFSConfig(BaseModel): @model_validator(mode="after") def validate_config(self): """Validate configuration completeness and consistency""" + if self.mode not in ["http-client", "binding-client"]: + raise ValueError( + f"Invalid AGFS mode: '{self.mode}'. Must be one of: 'http-client', 'binding-client'" + ) + if self.backend not in ["local", "s3", "memory"]: raise ValueError( f"Invalid AGFS backend: '{self.backend}'. Must be one of: 'local', 's3', 'memory'" diff --git a/setup.py b/setup.py index 364d78dd..8a8089c4 100644 --- a/setup.py +++ b/setup.py @@ -33,21 +33,31 @@ def _copy_binary(self, src, dst): os.chmod(str(dst), 0o755) def build_agfs(self): - """Build AGFS server from source.""" + """Build AGFS server and binding library from source.""" # Paths binary_name = "agfs-server.exe" if sys.platform == "win32" else "agfs-server" + if sys.platform == "win32": + lib_name = "libagfsbinding.dll" + elif sys.platform == "darwin": + lib_name = "libagfsbinding.dylib" + else: + lib_name = "libagfsbinding.so" + agfs_server_dir = Path("third_party/agfs/agfs-server").resolve() # Target in source tree (for development/install) agfs_bin_dir = Path("openviking/bin").resolve() agfs_target_binary = agfs_bin_dir / binary_name + agfs_target_lib = agfs_bin_dir / lib_name # 1. 
Try to build from source if agfs_server_dir.exists() and shutil.which("go"): - print("Building AGFS server from source...") + print("Building AGFS from source...") import subprocess + # Build server try: + print(f"Building AGFS server: {binary_name}") build_args = ( ["go", "build", "-o", f"build/{binary_name}", "cmd/server/main.go"] if sys.platform == "win32" @@ -71,7 +81,7 @@ def build_agfs(self): f"Build succeeded but binary not found at {agfs_built_binary}" ) except (subprocess.CalledProcessError, Exception) as e: - error_msg = f"Failed to build AGFS from source: {e}" + error_msg = f"Failed to build AGFS server from source: {e}" if isinstance(e, subprocess.CalledProcessError): if e.stdout: error_msg += ( @@ -81,19 +91,61 @@ def build_agfs(self): error_msg += ( f"\nBuild stderr:\n{e.stderr.decode('utf-8', errors='replace')}" ) - raise RuntimeError(error_msg) + print(f"[Warning] {error_msg}") + + # Build binding library + try: + print(f"Building AGFS binding library: {lib_name}") + # Use CGO_ENABLED=1 for shared library + env = os.environ.copy() + env["CGO_ENABLED"] = "1" + + pybinding_dir = agfs_server_dir / "cmd/pybinding" + lib_build_args = [ + "go", + "build", + "-buildmode=c-shared", + "-o", + f"build/{lib_name}", + ".", + ] + + subprocess.run( + lib_build_args, + cwd=str(pybinding_dir), + env=env, + check=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + + agfs_built_lib = pybinding_dir / "build" / lib_name + if agfs_built_lib.exists(): + self._copy_binary(agfs_built_lib, agfs_target_lib) + print("[OK] AGFS binding library built successfully") + else: + print(f"[Warning] Binding library not found at {agfs_built_lib}") + except Exception as e: + print(f"[Warning] Failed to build AGFS binding library: {e}") + if isinstance(e, subprocess.CalledProcessError): + if e.stdout: + print(f"Build stdout: {e.stdout.decode('utf-8', errors='replace')}") + if e.stderr: + print(f"Build stderr: {e.stderr.decode('utf-8', errors='replace')}") + else: if not 
agfs_server_dir.exists(): raise FileNotFoundError(f"AGFS source directory not found at {agfs_server_dir}") else: - raise RuntimeError("Go compiler not found. Please install Go to build AGFS server.") + raise RuntimeError("Go compiler not found. Please install Go to build AGFS.") - # 2. Ensure AGFS binary is copied to the build directory (where wheel is packaged from) + # 2. Ensure binaries are copied to the build directory (where wheel is packaged from) if self.build_lib: agfs_bin_dir_build = Path(self.build_lib) / "openviking/bin" - dst = agfs_bin_dir_build / binary_name if agfs_target_binary.exists(): - self._copy_binary(agfs_target_binary, dst) + self._copy_binary(agfs_target_binary, agfs_bin_dir_build / binary_name) + if agfs_target_lib.exists(): + self._copy_binary(agfs_target_lib, agfs_bin_dir_build / lib_name) def build_extension(self, ext): """Build a single C++ extension module using CMake.""" @@ -143,6 +195,9 @@ def build_extension(self, ext): "openviking": [ "bin/agfs-server", "bin/agfs-server.exe", + "bin/libagfsbinding.so", + "bin/libagfsbinding.dylib", + "bin/libagfsbinding.dll", ], }, include_package_data=True, diff --git a/tests/agfs/test_fs_binding.py b/tests/agfs/test_fs_binding.py new file mode 100644 index 00000000..55986f82 --- /dev/null +++ b/tests/agfs/test_fs_binding.py @@ -0,0 +1,154 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: Apache-2.0 + +"""AGFS Python Binding Tests for VikingFS interface + +Tests the python binding mode of VikingFS which directly uses AGFS implementation +without HTTP server. 
+""" + +import os +import platform +import uuid +from pathlib import Path + +import pytest + +from openviking.storage.viking_fs import init_viking_fs +from openviking_cli.utils.config.agfs_config import AGFSConfig + +# Direct configuration for testing +AGFS_CONF = AGFSConfig(path="/tmp/ov-test", backend="local", mode="binding-client") + +# Ensure test directory exists +os.makedirs(AGFS_CONF.path, exist_ok=True) + + +def get_lib_path() -> str: + """Get the path to AGFS binding shared library.""" + system = platform.system() + if system == "Darwin": + lib_name = "libagfsbinding.dylib" + elif system == "Windows": + lib_name = "libagfsbinding.dll" + else: + lib_name = "libagfsbinding.so" + + project_root = Path(__file__).parent.parent.parent + lib_path = project_root / "third_party" / "agfs" / "bin" / lib_name + + if lib_path.exists(): + return str(lib_path) + + env_path = os.environ.get("AGFS_LIB_PATH") + if env_path and Path(env_path).exists(): + return env_path + + return None + + +LIB_PATH = get_lib_path() + + +pytestmark = pytest.mark.skipif( + LIB_PATH is None, + reason="AGFS binding library not found. 
Build it first: make -C third_party/agfs/agfs-server/cmd/pybinding", +) + + +@pytest.fixture(scope="module") +async def viking_fs_binding_instance(): + """Initialize VikingFS with binding mode.""" + from openviking.utils.agfs_utils import create_agfs_client + + # Set lib_path for the test + AGFS_CONF.lib_path = LIB_PATH + + # Create AGFS client + agfs_client = create_agfs_client(AGFS_CONF) + + # Initialize VikingFS with client + vfs = init_viking_fs(agfs=agfs_client) + + yield vfs + + +@pytest.mark.asyncio +class TestVikingFSBindingLocal: + """Test VikingFS operations with binding mode (local backend).""" + + async def test_file_operations(self, viking_fs_binding_instance): + """Test VikingFS file operations: read, write, ls, stat.""" + vfs = viking_fs_binding_instance + test_filename = f"binding_file_{uuid.uuid4().hex}.txt" + test_content = "Hello VikingFS Binding! " + uuid.uuid4().hex + test_uri = f"viking://temp/{test_filename}" + + await vfs.write(test_uri, test_content) + + stat_info = await vfs.stat(test_uri) + assert stat_info["name"] == test_filename + assert not stat_info["isDir"] + + entries = await vfs.ls("viking://temp/") + assert any(e["name"] == test_filename for e in entries) + + read_data = await vfs.read(test_uri) + assert read_data.decode("utf-8") == test_content + + await vfs.rm(test_uri) + + async def test_directory_operations(self, viking_fs_binding_instance): + """Test VikingFS directory operations: mkdir, rm, ls, stat.""" + vfs = viking_fs_binding_instance + test_dir = f"binding_dir_{uuid.uuid4().hex}" + test_dir_uri = f"viking://temp/{test_dir}/" + + await vfs.mkdir(test_dir_uri) + + stat_info = await vfs.stat(test_dir_uri) + assert stat_info["name"] == test_dir + assert stat_info["isDir"] + + root_entries = await vfs.ls("viking://temp/") + assert any(e["name"] == test_dir and e["isDir"] for e in root_entries) + + file_uri = f"{test_dir_uri}inner.txt" + await vfs.write(file_uri, "inner content") + + sub_entries = await vfs.ls(test_dir_uri) + 
assert any(e["name"] == "inner.txt" for e in sub_entries) + + await vfs.rm(test_dir_uri, recursive=True) + + root_entries = await vfs.ls("viking://temp/") + assert not any(e["name"] == test_dir for e in root_entries) + + async def test_tree_operations(self, viking_fs_binding_instance): + """Test VikingFS tree operations.""" + vfs = viking_fs_binding_instance + base_dir = f"binding_tree_test_{uuid.uuid4().hex}" + sub_dir = f"viking://temp/{base_dir}/a/b/" + file_uri = f"{sub_dir}leaf.txt" + + await vfs.mkdir(sub_dir) + await vfs.write(file_uri, "leaf content") + + entries = await vfs.tree(f"viking://temp/{base_dir}/") + assert any("leaf.txt" in e["uri"] for e in entries) + + await vfs.rm(f"viking://temp/{base_dir}/", recursive=True) + + async def test_binary_operations(self, viking_fs_binding_instance): + """Test VikingFS binary file operations.""" + vfs = viking_fs_binding_instance + test_filename = f"binding_binary_{uuid.uuid4().hex}.bin" + test_content = bytes([i % 256 for i in range(256)]) + test_uri = f"viking://temp/{test_filename}" + + await vfs.write(test_uri, test_content) + + read_data = await vfs.read(test_uri) + assert read_data == test_content + + await vfs.rm(test_uri) diff --git a/tests/agfs/test_fs_binding_s3.py b/tests/agfs/test_fs_binding_s3.py new file mode 100644 index 00000000..713d8c92 --- /dev/null +++ b/tests/agfs/test_fs_binding_s3.py @@ -0,0 +1,175 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: Apache-2.0 + +"""AGFS Python Binding Tests for VikingFS interface with S3 backend + +Tests the python binding mode of VikingFS with S3 backend (MinIO/TOS). 
+""" + +import json +import os +import platform +import uuid +from pathlib import Path + +import pytest + +from openviking.storage.viking_fs import init_viking_fs +from openviking_cli.utils.config.agfs_config import AGFSConfig + +CONFIG_FILE = os.getenv("OPENVIKING_CONFIG_FILE") +if not CONFIG_FILE: + default_conf = Path(__file__).parent / "ov.conf" + if default_conf.exists(): + CONFIG_FILE = str(default_conf) + + +def load_agfs_config() -> AGFSConfig: + """Load only AGFS configuration from the config file.""" + if not CONFIG_FILE or not Path(CONFIG_FILE).exists(): + return None + + try: + with open(CONFIG_FILE, "r") as f: + full_config = json.load(f) + + agfs_data = full_config.get("storage", {}).get("agfs") or full_config.get("agfs") + if not agfs_data: + return None + + return AGFSConfig(**agfs_data) + except Exception: + return None + + +AGFS_CONF = load_agfs_config() + + +def get_lib_path() -> str: + """Get the path to AGFS binding shared library.""" + system = platform.system() + if system == "Darwin": + lib_name = "libagfsbinding.dylib" + elif system == "Windows": + lib_name = "libagfsbinding.dll" + else: + lib_name = "libagfsbinding.so" + + project_root = Path(__file__).parent.parent.parent + lib_path = project_root / "third_party" / "agfs" / "bin" / lib_name + + if lib_path.exists(): + return str(lib_path) + + env_path = os.environ.get("AGFS_LIB_PATH") + if env_path and Path(env_path).exists(): + return env_path + + return None + + +LIB_PATH = get_lib_path() + + +pytestmark = pytest.mark.skipif( + LIB_PATH is None or AGFS_CONF is None or AGFS_CONF.backend != "s3", + reason="AGFS binding library not found or S3 configuration not available", +) + + +@pytest.fixture(scope="module") +async def viking_fs_binding_s3_instance(): + """Initialize VikingFS with binding mode for S3 backend.""" + from openviking.utils.agfs_utils import create_agfs_client + + # Set lib_path for the test + AGFS_CONF.lib_path = LIB_PATH + + # Create AGFS client + agfs_client = 
create_agfs_client(AGFS_CONF) + + # Initialize VikingFS with client + vfs = init_viking_fs(agfs=agfs_client) + + yield vfs + + +@pytest.mark.asyncio +class TestVikingFSBindingS3: + """Test VikingFS operations with binding mode (S3 backend).""" + + async def test_s3_file_operations(self, viking_fs_binding_s3_instance): + """Test VikingFS file operations on S3: read, write, ls, stat.""" + vfs = viking_fs_binding_s3_instance + test_filename = f"s3_binding_file_{uuid.uuid4().hex}.txt" + test_content = "Hello VikingFS S3 Binding! " + uuid.uuid4().hex + test_uri = f"viking://temp/{test_filename}" + + await vfs.write(test_uri, test_content) + + stat_info = await vfs.stat(test_uri) + assert stat_info["name"] == test_filename + assert not stat_info["isDir"] + + entries = await vfs.ls("viking://temp/") + assert any(e["name"] == test_filename for e in entries) + + read_data = await vfs.read(test_uri) + assert read_data.decode("utf-8") == test_content + + await vfs.rm(test_uri) + + async def test_s3_directory_operations(self, viking_fs_binding_s3_instance): + """Test VikingFS directory operations on S3: mkdir, rm, ls, stat.""" + vfs = viking_fs_binding_s3_instance + test_dir = f"s3_binding_dir_{uuid.uuid4().hex}" + test_dir_uri = f"viking://temp/{test_dir}/" + + await vfs.mkdir(test_dir_uri) + + stat_info = await vfs.stat(test_dir_uri) + assert stat_info["name"] == test_dir + assert stat_info["isDir"] + + root_entries = await vfs.ls("viking://temp/") + assert any(e["name"] == test_dir and e["isDir"] for e in root_entries) + + file_uri = f"{test_dir_uri}inner.txt" + await vfs.write(file_uri, "inner content for S3") + + sub_entries = await vfs.ls(test_dir_uri) + assert any(e["name"] == "inner.txt" for e in sub_entries) + + await vfs.rm(test_dir_uri, recursive=True) + + root_entries = await vfs.ls("viking://temp/") + assert not any(e["name"] == test_dir for e in root_entries) + + async def test_s3_tree_operations(self, viking_fs_binding_s3_instance): + """Test VikingFS tree 
operations on S3.""" + vfs = viking_fs_binding_s3_instance + base_dir = f"s3_binding_tree_{uuid.uuid4().hex}" + sub_dir = f"viking://temp/{base_dir}/a/b/" + file_uri = f"{sub_dir}leaf.txt" + + await vfs.mkdir(sub_dir) + await vfs.write(file_uri, "leaf content in S3") + + entries = await vfs.tree(f"viking://temp/{base_dir}/") + assert any("leaf.txt" in e["uri"] for e in entries) + + await vfs.rm(f"viking://temp/{base_dir}/", recursive=True) + + async def test_s3_binary_operations(self, viking_fs_binding_s3_instance): + """Test VikingFS binary file operations on S3.""" + vfs = viking_fs_binding_s3_instance + test_filename = f"s3_binding_binary_{uuid.uuid4().hex}.bin" + test_content = bytes([i % 256 for i in range(256)]) + test_uri = f"viking://temp/{test_filename}" + + await vfs.write(test_uri, test_content) + + read_data = await vfs.read(test_uri) + assert read_data == test_content + + await vfs.rm(test_uri) diff --git a/tests/agfs/test_fs_local.py b/tests/agfs/test_fs_local.py index d9e2e1cb..4dd2fbe3 100644 --- a/tests/agfs/test_fs_local.py +++ b/tests/agfs/test_fs_local.py @@ -3,10 +3,8 @@ """AGFS Local Backend Tests for VikingFS interface""" -import json import os import uuid -from pathlib import Path import pytest @@ -14,52 +12,28 @@ from openviking.storage.viking_fs import init_viking_fs from openviking_cli.utils.config.agfs_config import AGFSConfig -# 1. 
Config loading logic -# Try to load from environment variable or default ov.conf -CONFIG_FILE = os.getenv("OPENVIKING_CONFIG_FILE") -if not CONFIG_FILE: - # Try default ov.conf in tests/agfs - default_conf = Path(__file__).parent / "ov.conf" - if default_conf.exists(): - CONFIG_FILE = str(default_conf) - - -def load_agfs_config() -> AGFSConfig: - """Load only AGFS configuration from the config file.""" - if not CONFIG_FILE or not Path(CONFIG_FILE).exists(): - return None - - try: - with open(CONFIG_FILE, "r") as f: - full_config = json.load(f) - - # Support both 'storage.agfs' and top-level 'agfs' structures - agfs_data = full_config.get("storage", {}).get("agfs") or full_config.get("agfs") - if not agfs_data: - return None - - return AGFSConfig(**agfs_data) - except Exception: - return None - - -AGFS_CONF = load_agfs_config() - -# 2. Skip tests if no local config found or backend is not local -pytestmark = pytest.mark.skipif( - AGFS_CONF is None or AGFS_CONF.backend != "local", - reason="AGFS local configuration not found in ov.conf or backend is not local", +# 1. Direct configuration for testing +AGFS_CONF = AGFSConfig( + path="/tmp/ov-test", backend="local", port=1833, url="http://localhost:1833", timeout=10 ) +# 2. 
Ensure test directory exists +os.makedirs(AGFS_CONF.path, exist_ok=True) + @pytest.fixture(scope="module") async def viking_fs_instance(): """Initialize AGFS Manager and VikingFS singleton.""" + from openviking.utils.agfs_utils import create_agfs_client + manager = AGFSManager(config=AGFS_CONF) manager.start() - # Initialize VikingFS with agfs_url - vfs = init_viking_fs(agfs_url=AGFS_CONF.url, timeout=AGFS_CONF.timeout) + # Create AGFS client + agfs_client = create_agfs_client(AGFS_CONF) + + # Initialize VikingFS with client + vfs = init_viking_fs(agfs=agfs_client) yield vfs @@ -76,7 +50,7 @@ async def test_file_operations(self, viking_fs_instance): vfs = viking_fs_instance test_filename = f"local_file_{uuid.uuid4().hex}.txt" test_content = "Hello VikingFS Local! " + uuid.uuid4().hex - test_uri = f"viking://{test_filename}" + test_uri = f"viking://temp/{test_filename}" # 1. Write file await vfs.write(test_uri, test_content) @@ -87,7 +61,7 @@ async def test_file_operations(self, viking_fs_instance): assert not stat_info["isDir"] # 3. List directory - entries = await vfs.ls("viking://") + entries = await vfs.ls("viking://temp/") assert any(e["name"] == test_filename for e in entries) # 4. Read file @@ -101,7 +75,7 @@ async def test_directory_operations(self, viking_fs_instance): """Test VikingFS directory operations: mkdir, rm, ls, stat.""" vfs = viking_fs_instance test_dir = f"local_dir_{uuid.uuid4().hex}" - test_dir_uri = f"viking://{test_dir}/" + test_dir_uri = f"viking://temp/{test_dir}/" # 1. Create directory await vfs.mkdir(test_dir_uri) @@ -112,7 +86,7 @@ async def test_directory_operations(self, viking_fs_instance): assert stat_info["isDir"] # 3. List root to see directory - root_entries = await vfs.ls("viking://") + root_entries = await vfs.ls("viking://temp/") assert any(e["name"] == test_dir and e["isDir"] for e in root_entries) # 4. 
Write a file inside @@ -127,22 +101,22 @@ async def test_directory_operations(self, viking_fs_instance): await vfs.rm(test_dir_uri, recursive=True) # 7. Verify deletion - root_entries = await vfs.ls("viking://") + root_entries = await vfs.ls("viking://temp/") assert not any(e["name"] == test_dir for e in root_entries) async def test_ensure_dirs(self, viking_fs_instance): """Test VikingFS ensure_dirs.""" vfs = viking_fs_instance base_dir = f"local_tree_test_{uuid.uuid4().hex}" - sub_dir = f"viking://{base_dir}/a/b/" + sub_dir = f"viking://temp/{base_dir}/a/b/" file_uri = f"{sub_dir}leaf.txt" await vfs.mkdir(sub_dir) await vfs.write(file_uri, "leaf content") # VikingFS.tree provides recursive listing - entries = await vfs.tree(f"viking://{base_dir}/") + entries = await vfs.tree(f"viking://temp/{base_dir}/") assert any("leaf.txt" in e["uri"] for e in entries) # Cleanup - await vfs.rm(f"viking://{base_dir}/", recursive=True) + await vfs.rm(f"viking://temp/{base_dir}/", recursive=True) diff --git a/tests/agfs/test_fs_s3.py b/tests/agfs/test_fs_s3.py index 62a6f8f0..ce8d0f86 100644 --- a/tests/agfs/test_fs_s3.py +++ b/tests/agfs/test_fs_s3.py @@ -72,11 +72,16 @@ def s3_client(): @pytest.fixture(scope="module") async def viking_fs_instance(): """Initialize AGFS Manager and VikingFS singleton.""" + from openviking.utils.agfs_utils import create_agfs_client + manager = AGFSManager(config=AGFS_CONF) manager.start() - # Initialize VikingFS with agfs_url (only basic IO needed) - vfs = init_viking_fs(agfs_url=AGFS_CONF.url, timeout=AGFS_CONF.timeout) + # Create AGFS client + agfs_client = create_agfs_client(AGFS_CONF) + + # Initialize VikingFS with client + vfs = init_viking_fs(agfs=agfs_client) yield vfs @@ -97,13 +102,14 @@ async def test_file_operations(self, viking_fs_instance: "VikingFS", s3_client): test_filename = f"verify_{uuid.uuid4().hex}.txt" test_content = "Hello VikingFS S3! 
" + uuid.uuid4().hex - test_uri = f"viking://{test_filename}" + test_uri = f"viking://temp/{test_filename}" # 1. Write via VikingFS await vfs.write(test_uri, test_content) # 2. Verify existence and content via S3 client - s3_key = f"{prefix}{test_filename}" + # VikingFS maps viking://temp/{test_filename} to /local/default/temp/{test_filename} + s3_key = f"{prefix}default/temp/{test_filename}" response = s3_client.get_object(Bucket=bucket, Key=s3_key) s3_content = response["Body"].read().decode("utf-8") assert s3_content == test_content @@ -114,7 +120,7 @@ async def test_file_operations(self, viking_fs_instance: "VikingFS", s3_client): assert not stat_info["isDir"] # 4. List via VikingFS - entries = await vfs.ls("viking://") + entries = await vfs.ls("viking://temp/") assert any(e["name"] == test_filename for e in entries) # 5. Read back via VikingFS @@ -137,7 +143,7 @@ async def test_directory_operations(self, viking_fs_instance, s3_client): prefix = s3_conf.prefix or "" test_dir = f"test_dir_{uuid.uuid4().hex}" - test_dir_uri = f"viking://{test_dir}/" + test_dir_uri = f"viking://temp/{test_dir}/" # 1. Create directory via VikingFS await vfs.mkdir(test_dir_uri) @@ -147,12 +153,13 @@ async def test_directory_operations(self, viking_fs_instance, s3_client): file_content = "inner content" await vfs.write(file_uri, file_content) - s3_key = f"{prefix}{test_dir}/inner.txt" + # VikingFS maps viking://temp/{test_dir}/inner.txt to /local/default/temp/{test_dir}/inner.txt + s3_key = f"{prefix}default/temp/{test_dir}/inner.txt" response = s3_client.get_object(Bucket=bucket, Key=s3_key) assert response["Body"].read().decode("utf-8") == file_content # 3. List via VikingFS - root_entries = await vfs.ls("viking://") + root_entries = await vfs.ls("viking://temp/") assert any(e["name"] == test_dir and e["isDir"] for e in root_entries) # 4. 
Delete directory recursively via VikingFS @@ -166,15 +173,15 @@ async def test_ensure_dirs(self, viking_fs_instance: "VikingFS"): """Test VikingFS ensure_dirs.""" vfs = viking_fs_instance base_dir = f"tree_test_{uuid.uuid4().hex}" - sub_dir = f"viking://{base_dir}/a/b/" + sub_dir = f"viking://temp/{base_dir}/a/b/" file_uri = f"{sub_dir}leaf.txt" await vfs.mkdir(sub_dir) await vfs.write(file_uri, "leaf content") # VikingFS.tree provides recursive listing - entries = await vfs.tree(f"viking://{base_dir}/") + entries = await vfs.tree(f"viking://temp/{base_dir}/") assert any("leaf.txt" in e["uri"] for e in entries) # Cleanup - await vfs.rm(f"viking://{base_dir}/", recursive=True) + await vfs.rm(f"viking://temp/{base_dir}/", recursive=True) diff --git a/tests/eval/test_ragas_basic.py b/tests/eval/test_ragas_basic.py index 486dbb85..edd3b382 100644 --- a/tests/eval/test_ragas_basic.py +++ b/tests/eval/test_ragas_basic.py @@ -1,14 +1,13 @@ # Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. 
# SPDX-License-Identifier: Apache-2.0 -import pytest -import tempfile import json +import tempfile from pathlib import Path -from openviking.eval.ragas.types import EvalSample, EvalDataset from openviking.eval.ragas.generator import DatasetGenerator from openviking.eval.ragas.pipeline import RAGQueryPipeline +from openviking.eval.ragas.types import EvalDataset, EvalSample def test_eval_types(): @@ -16,11 +15,11 @@ def test_eval_types(): query="test query", context=["context1", "context2"], response="test response", - ground_truth="test ground truth" + ground_truth="test ground truth", ) assert sample.query == "test query" assert len(sample.context) == 2 - + dataset = EvalDataset(samples=[sample]) assert len(dataset) == 1 @@ -38,16 +37,16 @@ def test_pipeline_initialization(): def test_question_loader(): - with tempfile.NamedTemporaryFile(mode='w', suffix='.jsonl', delete=False) as f: + with tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False) as f: f.write('{"question": "What is OpenViking?"}\n') f.write('{"question": "How does memory work?", "ground_truth": "Hierarchical"}\n') - f.write('\n') + f.write("\n") f.write('{"invalid": "no question field"}\n') temp_path = f.name - + try: questions = [] - with open(temp_path, 'r') as f: + with open(temp_path, "r") as f: for line in f: line = line.strip() if not line: @@ -58,7 +57,7 @@ def test_question_loader(): questions.append(item) except json.JSONDecodeError: pass - + assert len(questions) == 2 assert questions[0]["question"] == "What is OpenViking?" 
assert questions[1]["ground_truth"] == "Hierarchical" @@ -71,10 +70,10 @@ def test_eval_dataset_operations(): EvalSample(query="q1", context=["c1"], response="r1"), EvalSample(query="q2", context=["c2"], response="r2"), ] - + dataset = EvalDataset(name="test_dataset", samples=samples) assert len(dataset) == 2 assert dataset.name == "test_dataset" - + dataset.samples.append(EvalSample(query="q3", context=["c3"])) assert len(dataset) == 3 diff --git a/tests/eval/test_ragas_eval.py b/tests/eval/test_ragas_eval.py index 02cef7c0..8a92131b 100644 --- a/tests/eval/test_ragas_eval.py +++ b/tests/eval/test_ragas_eval.py @@ -2,18 +2,18 @@ # SPDX-License-Identifier: Apache-2.0 import json -import pytest from pathlib import Path +import pytest + from openviking.eval.ragas import ( - RagasEvaluator, - RagasConfig, - EvalSample, EvalDataset, + EvalSample, + RagasConfig, + RagasEvaluator, _create_ragas_llm_from_config, ) - EVAL_RESULTS_FILE = Path(__file__).parent.parent.parent / "eval_results.json" @@ -233,5 +233,6 @@ def test_ragas_evaluator_no_llm_error(monkeypatch): pytest.skip("LLM is configured, skipping no-LLM error test") import asyncio + with pytest.raises(ValueError, match="RAGAS evaluation requires an LLM"): asyncio.run(evaluator.evaluate_dataset(dataset)) diff --git a/tests/eval/test_ragas_validation.py b/tests/eval/test_ragas_validation.py index d699d6af..d4a321b0 100644 --- a/tests/eval/test_ragas_validation.py +++ b/tests/eval/test_ragas_validation.py @@ -8,13 +8,13 @@ import json import sys from pathlib import Path -from typing import List, Dict, Any +from typing import Any, Dict, List def load_jsonl(file_path: str) -> List[Dict[str, Any]]: """Load JSONL file and return list of dicts.""" data = [] - with open(file_path, 'r', encoding='utf-8') as f: + with open(file_path, "r", encoding="utf-8") as f: for line_num, line in enumerate(f, 1): line = line.strip() if not line: @@ -30,24 +30,24 @@ def load_jsonl(file_path: str) -> List[Dict[str, Any]]: def 
validate_item(item: Dict[str, Any], index: int) -> List[str]: """Validate a single item from JSONL.""" errors = [] - - if 'question' not in item: + + if "question" not in item: errors.append(f"Item {index}: Missing 'question' field") - - if 'files' not in item: + + if "files" not in item: errors.append(f"Item {index}: Missing 'files' field") - elif not isinstance(item['files'], list): + elif not isinstance(item["files"], list): errors.append(f"Item {index}: 'files' should be a list") else: - for i, file_ref in enumerate(item['files']): + for i, file_ref in enumerate(item["files"]): if not isinstance(file_ref, str): errors.append(f"Item {index}: files[{i}] should be a string") - elif ':' not in file_ref: + elif ":" not in file_ref: errors.append(f"Item {index}: files[{i}] should contain ':' for line range") - - if 'answer' not in item: + + if "answer" not in item: errors.append(f"Item {index}: Missing 'answer' field") - + return errors @@ -56,37 +56,38 @@ def test_eval_types(): print("\n📦 Testing EvalSample and EvalDataset types...") jsonl_path = Path.cwd() / "openviking" / "eval" / "datasets" / "local_doc_example_glm5.jsonl" data = load_jsonl(jsonl_path) - - from openviking.eval.ragas.types import EvalSample, EvalDataset - + + from openviking.eval.ragas.types import EvalDataset, EvalSample + samples = [] - for i, item in enumerate(data[:3]): + for _i, item in enumerate(data[:3]): sample = EvalSample( - query=item.get('question', ''), - context=[item.get('answer', '')[:200]], - response=item.get('answer', '')[:100], - ground_truth=item.get('answer', '')[:100], - meta={'source': 'validation', 'files': item.get('files', [])} + query=item.get("question", ""), + context=[item.get("answer", "")[:200]], + response=item.get("answer", "")[:100], + ground_truth=item.get("answer", "")[:100], + meta={"source": "validation", "files": item.get("files", [])}, ) samples.append(sample) - - dataset = EvalDataset(name='validation_test', samples=samples) - + + dataset = 
EvalDataset(name="validation_test", samples=samples) + print(f" ✅ Created {len(samples)} EvalSample instances") print(f" ✅ Created EvalDataset with {len(dataset)} samples") - + assert len(dataset) == len(samples), "Dataset length mismatch" - assert dataset.name == 'validation_test', "Dataset name mismatch" - + assert dataset.name == "validation_test", "Dataset name mismatch" + print(" ✅ All type tests passed") def test_evaluator_initialization(): """Test RagasEvaluator initialization.""" print("\n🔧 Testing RagasEvaluator initialization...") - + try: from openviking.eval import RagasEvaluator + evaluator = RagasEvaluator() print(" ✅ RagasEvaluator initialized successfully") print(f" ✅ Metrics: {[m.name for m in evaluator.metrics]}") @@ -94,22 +95,22 @@ def test_evaluator_initialization(): print(f" ⚠️ RAGAS not installed: {e}") print(" ℹ️ Install with: pip install ragas datasets") return False - + return True def test_pipeline_initialization(): """Test RAGQueryPipeline initialization.""" print("\n🔧 Testing RAGQueryPipeline initialization...") - + from openviking.eval.ragas.pipeline import RAGQueryPipeline - + pipeline = RAGQueryPipeline(config_path="./test.conf", data_path="./test_data") - + assert pipeline.config_path == "./test.conf" assert pipeline.data_path == "./test_data" assert pipeline._client is None - + print(" ✅ RAGQueryPipeline initialized successfully") print(f" ✅ Config path: {pipeline.config_path}") print(f" ✅ Data path: {pipeline.data_path}") @@ -118,16 +119,16 @@ def test_pipeline_initialization(): def test_question_loader(): """Test question loading from JSONL.""" print("\n📄 Testing question loader...") - + jsonl_path = Path.cwd() / "openviking" / "eval" / "datasets" / "local_doc_example_glm5.jsonl" - + data = load_jsonl(jsonl_path) print(f" ✅ Loaded {len(data)} questions from JSONL") - + errors = [] for i, item in enumerate(data): errors.extend(validate_item(item, i)) - + if errors: print(f" ❌ Found {len(errors)} validation errors:") for error in 
errors[:5]: @@ -140,22 +141,24 @@ def main(): print("=" * 60) print("🧪 OpenViking Eval Module Validation") print("=" * 60) - - jsonl_path = "/Users/bytedance/workspace/github/OpenViking/openviking/eval/local_doc_example_glm5.jsonl" - + + jsonl_path = ( + "/Users/bytedance/workspace/github/OpenViking/openviking/eval/local_doc_example_glm5.jsonl" + ) + if not Path(jsonl_path).exists(): print(f"❌ File not found: {jsonl_path}") sys.exit(1) - + print(f"\n📂 Loading: {jsonl_path}") - + data = load_jsonl(jsonl_path) print(f"✅ Loaded {len(data)} items") - + all_errors = [] for i, item in enumerate(data): all_errors.extend(validate_item(item, i)) - + if all_errors: print(f"\n❌ Found {len(all_errors)} validation errors") for error in all_errors[:10]: @@ -164,31 +167,31 @@ def main(): print(f" ... and {len(all_errors) - 10} more errors") else: print(f"\n✅ All {len(data)} items validated successfully") - + try: test_eval_types(data) except Exception as e: print(f" ❌ Eval types test failed: {e}") all_errors.append(f"Eval types test: {e}") - + try: test_pipeline_initialization() except Exception as e: print(f" ❌ Pipeline test failed: {e}") all_errors.append(f"Pipeline test: {e}") - + try: test_question_loader() except Exception as e: print(f" ❌ Question loader test failed: {e}") all_errors.append(f"Question loader test: {e}") - + try: - ragas_available = test_evaluator_initialization() + test_evaluator_initialization() except Exception as e: print(f" ❌ Evaluator test failed: {e}") all_errors.append(f"Evaluator test: {e}") - + print("\n" + "=" * 60) if all_errors: print(f"❌ Validation completed with {len(all_errors)} errors") diff --git a/tests/test_edge_cases_simple.py b/tests/test_edge_cases_simple.py index d73acc97..92f75e24 100644 --- a/tests/test_edge_cases_simple.py +++ b/tests/test_edge_cases_simple.py @@ -8,72 +8,68 @@ with minimal dependencies, highlighting potential issues in the codebase. 
""" -import asyncio import json -import os -import tempfile import unicodedata -from pathlib import Path class TestBasicEdgeCases: """Basic edge case tests without heavy dependencies.""" - + def test_filename_length_boundaries(self): """Test various filename lengths.""" # Test exactly 255 bytes filename_255 = "a" * 251 + ".txt" - assert len(filename_255.encode('utf-8')) == 255 - print(f"255-byte filename: PASS") - + assert len(filename_255.encode("utf-8")) == 255 + print("255-byte filename: PASS") + # Test 256 bytes (just over limit) filename_256 = "b" * 252 + ".txt" - assert len(filename_256.encode('utf-8')) == 256 - print(f"256-byte filename: PASS") - + assert len(filename_256.encode("utf-8")) == 256 + print("256-byte filename: PASS") + # Test very long with CJK cjk_filename = "测试文件名" * 30 + ".py" - assert len(cjk_filename.encode('utf-8')) > 400 + assert len(cjk_filename.encode("utf-8")) > 400 print(f"Long CJK filename ({len(cjk_filename.encode('utf-8'))} bytes): PASS") - + def test_special_character_filenames(self): """Test filenames with special characters.""" special_chars = [ "file!@#$.txt", - "file with spaces.txt", + "file with spaces.txt", "file\ttab.txt", "file\nnewline.txt", "файл.txt", # Cyrillic - "档案.txt", # Chinese - "ملف.txt", # Arabic + "档案.txt", # Chinese + "ملف.txt", # Arabic ] - + for filename in special_chars: # Basic validation - should not crash assert len(filename) > 0 assert isinstance(filename, str) - + print("Special character filenames: PASS") - + def test_unicode_edge_cases(self): """Test Unicode edge cases.""" # Zero-width characters zwsp_filename = "test\u200bfile.txt" assert "\u200b" in zwsp_filename print("Zero-width character test: PASS") - + # Combining characters combined = "e\u0301\u0302\u0303.txt" # e with multiple accents assert len(combined) > 5 # Base char + combining chars + extension print("Combining characters test: PASS") - + # Unicode normalization nfc = "café.txt" nfd = "cafe\u0301.txt" assert nfc != nfd - assert 
unicodedata.normalize('NFC', nfd) == nfc + assert unicodedata.normalize("NFC", nfd) == nfc print("Unicode normalization test: PASS") - + def test_json_edge_cases(self): """Test JSON handling edge cases.""" # Empty JSON @@ -81,7 +77,7 @@ def test_json_edge_cases(self): parsed = json.loads(empty_json) assert parsed == {} print("Empty JSON test: PASS") - + # Deeply nested (but not too deep to crash) nested = {} current = nested @@ -89,20 +85,20 @@ def test_json_edge_cases(self): current[f"level_{i}"] = {} current = current[f"level_{i}"] current["value"] = "deep" - + # Should serialize/deserialize without issues json_str = json.dumps(nested) parsed_nested = json.loads(json_str) assert parsed_nested is not None print("Nested JSON test: PASS") - + # JSON with special characters special_json = {"unicode": "测试", "emoji": "😀", "null_byte": "test\x00null"} json_str = json.dumps(special_json) parsed_special = json.loads(json_str) assert parsed_special["unicode"] == "测试" print("Special character JSON test: PASS") - + def test_path_traversal_patterns(self): """Test path traversal patterns.""" dangerous_paths = [ @@ -112,42 +108,42 @@ def test_path_traversal_patterns(self): "/etc/passwd", "C:\\windows\\system32\\config", ] - + for path in dangerous_paths: # Basic validation - paths should be detectable as dangerous assert ".." 
in path or "/" in path or "\\" in path - + print("Path traversal pattern detection: PASS") - + def test_empty_and_null_inputs(self): """Test empty and null inputs.""" # Empty strings assert "" == "" assert len("") == 0 - + # Null bytes null_string = "hello\x00world" assert "\x00" in null_string assert len(null_string) == 11 - + # Whitespace only whitespace = " \t\n " assert whitespace.strip() == "" - + print("Empty/null input tests: PASS") - + def test_encoding_edge_cases(self): """Test various encoding scenarios.""" # UTF-8 BOM bom_text = "\ufeffHello World" assert bom_text.startswith("\ufeff") - + # Mixed encoding content (as much as we can test without complex imports) mixed_content = "ASCII text with 中文 and émojis 😀" - utf8_bytes = mixed_content.encode('utf-8') - decoded = utf8_bytes.decode('utf-8') + utf8_bytes = mixed_content.encode("utf-8") + decoded = utf8_bytes.decode("utf-8") assert decoded == mixed_content - + print("Encoding edge case tests: PASS") @@ -155,9 +151,9 @@ def run_all_tests(): """Run all edge case tests.""" print("Running OpenViking Edge Case Tests...") print("=" * 50) - + test_instance = TestBasicEdgeCases() - + tests = [ test_instance.test_filename_length_boundaries, test_instance.test_special_character_filenames, @@ -167,10 +163,10 @@ def run_all_tests(): test_instance.test_empty_and_null_inputs, test_instance.test_encoding_edge_cases, ] - + passed = 0 failed = 0 - + for test in tests: try: test() @@ -178,10 +174,10 @@ def run_all_tests(): except Exception as e: print(f"{test.__name__}: FAILED - {e}") failed += 1 - + print("=" * 50) print(f"Results: {passed} passed, {failed} failed") - + if failed > 0: print("\nFailed tests indicate potential edge cases that need attention!") return 1 @@ -192,4 +188,4 @@ def run_all_tests(): if __name__ == "__main__": exit_code = run_all_tests() - exit(exit_code) \ No newline at end of file + exit(exit_code) diff --git a/third_party/agfs/agfs-sdk/python/pyagfs/__init__.py 
b/third_party/agfs/agfs-sdk/python/pyagfs/__init__.py index 35d23738..ac0c665d 100644 --- a/third_party/agfs/agfs-sdk/python/pyagfs/__init__.py +++ b/third_party/agfs/agfs-sdk/python/pyagfs/__init__.py @@ -1,18 +1,28 @@ """AGFS Python SDK - Client library for AGFS Server API""" -__version__ = "0.1.6" +__version__ = "0.1.7" from .client import AGFSClient, FileHandle -from .exceptions import AGFSClientError, AGFSConnectionError, AGFSTimeoutError, AGFSHTTPError +from .binding_client import AGFSBindingClient, FileHandle as BindingFileHandle +from .exceptions import ( + AGFSClientError, + AGFSConnectionError, + AGFSTimeoutError, + AGFSHTTPError, + AGFSNotSupportedError, +) from .helpers import cp, upload, download __all__ = [ "AGFSClient", + "AGFSBindingClient", "FileHandle", + "BindingFileHandle", "AGFSClientError", "AGFSConnectionError", "AGFSTimeoutError", "AGFSHTTPError", + "AGFSNotSupportedError", "cp", "upload", "download", diff --git a/third_party/agfs/agfs-sdk/python/pyagfs/binding_client.py b/third_party/agfs/agfs-sdk/python/pyagfs/binding_client.py new file mode 100644 index 00000000..cd7c84ce --- /dev/null +++ b/third_party/agfs/agfs-sdk/python/pyagfs/binding_client.py @@ -0,0 +1,603 @@ +"""AGFS Python Binding Client - Direct binding to AGFS Server implementation""" + +import ctypes +import json +import os +import platform +from pathlib import Path +from typing import List, Dict, Any, Optional, Union, Iterator, BinaryIO + +from .exceptions import AGFSClientError, AGFSNotSupportedError + + +def _find_library() -> str: + """Find the AGFS binding shared library.""" + system = platform.system() + + if system == "Darwin": + lib_name = "libagfsbinding.dylib" + elif system == "Linux": + lib_name = "libagfsbinding.so" + elif system == "Windows": + lib_name = "libagfsbinding.dll" + else: + raise AGFSClientError(f"Unsupported platform: {system}") + + search_paths = [ + Path(__file__).parent / "lib" / lib_name, + Path(__file__).parent.parent / "lib" / lib_name, + 
Path("/usr/local/lib") / lib_name, + Path("/usr/lib") / lib_name, + Path(os.environ.get("AGFS_LIB_PATH", "")) / lib_name + if os.environ.get("AGFS_LIB_PATH") + else None, + Path("/tmp") / lib_name, + ] + + for path in search_paths: + if path and path.exists(): + return str(path) + + raise AGFSClientError( + f"Could not find {lib_name}. Please set AGFS_LIB_PATH environment variable " + f"or install the library to /usr/local/lib" + ) + + +class BindingLib: + """Wrapper for the AGFS binding shared library.""" + + _instance = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._load_library() + return cls._instance + + def _load_library(self): + lib_path = _find_library() + self.lib = ctypes.CDLL(lib_path) + self._setup_functions() + + def _setup_functions(self): + self.lib.AGFS_NewClient.argtypes = [] + self.lib.AGFS_NewClient.restype = ctypes.c_int64 + + self.lib.AGFS_FreeClient.argtypes = [ctypes.c_int64] + self.lib.AGFS_FreeClient.restype = None + + self.lib.AGFS_GetLastError.argtypes = [ctypes.c_int64] + self.lib.AGFS_GetLastError.restype = ctypes.c_char_p + + self.lib.AGFS_FreeString.argtypes = [ctypes.c_char_p] + self.lib.AGFS_FreeString.restype = None + + self.lib.AGFS_Health.argtypes = [ctypes.c_int64] + self.lib.AGFS_Health.restype = ctypes.c_int + + self.lib.AGFS_GetCapabilities.argtypes = [ctypes.c_int64] + self.lib.AGFS_GetCapabilities.restype = ctypes.c_char_p + + self.lib.AGFS_Ls.argtypes = [ctypes.c_int64, ctypes.c_char_p] + self.lib.AGFS_Ls.restype = ctypes.c_char_p + + self.lib.AGFS_Read.argtypes = [ + ctypes.c_int64, + ctypes.c_char_p, + ctypes.c_int64, + ctypes.c_int64, + ctypes.POINTER(ctypes.c_char_p), + ctypes.POINTER(ctypes.c_int64), + ] + self.lib.AGFS_Read.restype = ctypes.c_int64 + + self.lib.AGFS_Write.argtypes = [ + ctypes.c_int64, + ctypes.c_char_p, + ctypes.c_void_p, + ctypes.c_int64, + ] + self.lib.AGFS_Write.restype = ctypes.c_char_p + + self.lib.AGFS_Create.argtypes = 
[ctypes.c_int64, ctypes.c_char_p] + self.lib.AGFS_Create.restype = ctypes.c_char_p + + self.lib.AGFS_Mkdir.argtypes = [ctypes.c_int64, ctypes.c_char_p, ctypes.c_uint] + self.lib.AGFS_Mkdir.restype = ctypes.c_char_p + + self.lib.AGFS_Rm.argtypes = [ctypes.c_int64, ctypes.c_char_p, ctypes.c_int] + self.lib.AGFS_Rm.restype = ctypes.c_char_p + + self.lib.AGFS_Stat.argtypes = [ctypes.c_int64, ctypes.c_char_p] + self.lib.AGFS_Stat.restype = ctypes.c_char_p + + self.lib.AGFS_Mv.argtypes = [ctypes.c_int64, ctypes.c_char_p, ctypes.c_char_p] + self.lib.AGFS_Mv.restype = ctypes.c_char_p + + self.lib.AGFS_Chmod.argtypes = [ctypes.c_int64, ctypes.c_char_p, ctypes.c_uint] + self.lib.AGFS_Chmod.restype = ctypes.c_char_p + + self.lib.AGFS_Touch.argtypes = [ctypes.c_int64, ctypes.c_char_p] + self.lib.AGFS_Touch.restype = ctypes.c_char_p + + self.lib.AGFS_Mounts.argtypes = [ctypes.c_int64] + self.lib.AGFS_Mounts.restype = ctypes.c_char_p + + self.lib.AGFS_Mount.argtypes = [ + ctypes.c_int64, + ctypes.c_char_p, + ctypes.c_char_p, + ctypes.c_char_p, + ] + self.lib.AGFS_Mount.restype = ctypes.c_char_p + + self.lib.AGFS_Unmount.argtypes = [ctypes.c_int64, ctypes.c_char_p] + self.lib.AGFS_Unmount.restype = ctypes.c_char_p + + self.lib.AGFS_LoadPlugin.argtypes = [ctypes.c_int64, ctypes.c_char_p] + self.lib.AGFS_LoadPlugin.restype = ctypes.c_char_p + + self.lib.AGFS_UnloadPlugin.argtypes = [ctypes.c_int64, ctypes.c_char_p] + self.lib.AGFS_UnloadPlugin.restype = ctypes.c_char_p + + self.lib.AGFS_ListPlugins.argtypes = [ctypes.c_int64] + self.lib.AGFS_ListPlugins.restype = ctypes.c_char_p + + self.lib.AGFS_OpenHandle.argtypes = [ + ctypes.c_int64, + ctypes.c_char_p, + ctypes.c_int, + ctypes.c_uint, + ctypes.c_int, + ] + self.lib.AGFS_OpenHandle.restype = ctypes.c_int64 + + self.lib.AGFS_CloseHandle.argtypes = [ctypes.c_int64] + self.lib.AGFS_CloseHandle.restype = ctypes.c_char_p + + self.lib.AGFS_HandleRead.argtypes = [ + ctypes.c_int64, + ctypes.c_int64, + ctypes.c_int64, + ctypes.c_int, + 
] + self.lib.AGFS_HandleRead.restype = ctypes.c_char_p + + self.lib.AGFS_HandleWrite.argtypes = [ + ctypes.c_int64, + ctypes.c_void_p, + ctypes.c_int64, + ctypes.c_int64, + ctypes.c_int, + ] + self.lib.AGFS_HandleWrite.restype = ctypes.c_char_p + + self.lib.AGFS_HandleSeek.argtypes = [ctypes.c_int64, ctypes.c_int64, ctypes.c_int] + self.lib.AGFS_HandleSeek.restype = ctypes.c_char_p + + self.lib.AGFS_HandleSync.argtypes = [ctypes.c_int64] + self.lib.AGFS_HandleSync.restype = ctypes.c_char_p + + self.lib.AGFS_HandleStat.argtypes = [ctypes.c_int64] + self.lib.AGFS_HandleStat.restype = ctypes.c_char_p + + self.lib.AGFS_ListHandles.argtypes = [ctypes.c_int64] + self.lib.AGFS_ListHandles.restype = ctypes.c_char_p + + self.lib.AGFS_GetHandleInfo.argtypes = [ctypes.c_int64] + self.lib.AGFS_GetHandleInfo.restype = ctypes.c_char_p + + +class AGFSBindingClient: + """Client for interacting with AGFS using Python binding (no HTTP server required). + + This client directly uses the AGFS server implementation through a shared library, + providing better performance than the HTTP client by avoiding network overhead. + + The interface is compatible with the HTTP client (AGFSClient), allowing easy + switching between implementations. + """ + + def __init__(self, config_path: Optional[str] = None): + """ + Initialize AGFS binding client. + + Args: + config_path: Optional path to configuration file (not used in binding mode). 
+ """ + self._lib = BindingLib() + self._client_id = self._lib.lib.AGFS_NewClient() + if self._client_id <= 0: + raise AGFSClientError("Failed to create AGFS client") + + def __del__(self): + if hasattr(self, "_client_id") and self._client_id > 0: + try: + self._lib.lib.AGFS_FreeClient(self._client_id) + except Exception: + pass + + def _parse_response(self, result: bytes) -> Dict[str, Any]: + """Parse JSON response from the library.""" + if isinstance(result, bytes): + result = result.decode("utf-8") + data = json.loads(result) + + if "error_id" in data and data["error_id"] != 0: + error_msg = self._lib.lib.AGFS_GetLastError(data["error_id"]) + if isinstance(error_msg, bytes): + error_msg = error_msg.decode("utf-8") + raise AGFSClientError(error_msg if error_msg else "Unknown error") + + return data + + def health(self) -> Dict[str, Any]: + """Check client health.""" + result = self._lib.lib.AGFS_Health(self._client_id) + return {"status": "healthy" if result == 1 else "unhealthy"} + + def get_capabilities(self) -> Dict[str, Any]: + """Get client capabilities.""" + result = self._lib.lib.AGFS_GetCapabilities(self._client_id) + return self._parse_response(result) + + def ls(self, path: str = "/") -> List[Dict[str, Any]]: + """List directory contents.""" + result = self._lib.lib.AGFS_Ls(self._client_id, path.encode("utf-8")) + data = self._parse_response(result) + return data.get("files", []) + + def read(self, path: str, offset: int = 0, size: int = -1, stream: bool = False): + return self.cat(path, offset, size, stream) + + def cat(self, path: str, offset: int = 0, size: int = -1, stream: bool = False): + """Read file content with optional offset and size.""" + if stream: + raise AGFSNotSupportedError("Streaming not supported in binding mode") + + result_ptr = ctypes.c_char_p() + size_ptr = ctypes.c_int64() + + error_id = self._lib.lib.AGFS_Read( + self._client_id, + path.encode("utf-8"), + ctypes.c_int64(offset), + ctypes.c_int64(size), + 
ctypes.byref(result_ptr), + ctypes.byref(size_ptr), + ) + + if error_id < 0: + error_msg = self._lib.lib.AGFS_GetLastError(error_id) + if isinstance(error_msg, bytes): + error_msg = error_msg.decode("utf-8") + raise AGFSClientError(error_msg if error_msg else "Unknown error") + + if result_ptr: + data = ctypes.string_at(result_ptr, size_ptr.value) + return data + + return b"" + + def write( + self, path: str, data: Union[bytes, Iterator[bytes], BinaryIO], max_retries: int = 3 + ) -> str: + """Write data to file.""" + if not isinstance(data, bytes): + if hasattr(data, "read"): + data = data.read() + else: + data = b"".join(data) + + result = self._lib.lib.AGFS_Write( + self._client_id, path.encode("utf-8"), data, ctypes.c_int64(len(data)) + ) + resp = self._parse_response(result) + return resp.get("message", "OK") + + def create(self, path: str) -> Dict[str, Any]: + """Create a new file.""" + result = self._lib.lib.AGFS_Create(self._client_id, path.encode("utf-8")) + return self._parse_response(result) + + def mkdir(self, path: str, mode: str = "755") -> Dict[str, Any]: + """Create a directory.""" + mode_int = int(mode, 8) + result = self._lib.lib.AGFS_Mkdir( + self._client_id, path.encode("utf-8"), ctypes.c_uint(mode_int) + ) + return self._parse_response(result) + + def rm(self, path: str, recursive: bool = False) -> Dict[str, Any]: + """Remove a file or directory.""" + result = self._lib.lib.AGFS_Rm(self._client_id, path.encode("utf-8"), 1 if recursive else 0) + return self._parse_response(result) + + def stat(self, path: str) -> Dict[str, Any]: + """Get file/directory information.""" + result = self._lib.lib.AGFS_Stat(self._client_id, path.encode("utf-8")) + return self._parse_response(result) + + def mv(self, old_path: str, new_path: str) -> Dict[str, Any]: + """Rename/move a file or directory.""" + result = self._lib.lib.AGFS_Mv( + self._client_id, old_path.encode("utf-8"), new_path.encode("utf-8") + ) + return self._parse_response(result) + + def chmod(self, 
path: str, mode: int) -> Dict[str, Any]: + """Change file permissions.""" + result = self._lib.lib.AGFS_Chmod( + self._client_id, path.encode("utf-8"), ctypes.c_uint(mode) + ) + return self._parse_response(result) + + def touch(self, path: str) -> Dict[str, Any]: + """Touch a file.""" + result = self._lib.lib.AGFS_Touch(self._client_id, path.encode("utf-8")) + return self._parse_response(result) + + def mounts(self) -> List[Dict[str, Any]]: + """List all mounted plugins.""" + result = self._lib.lib.AGFS_Mounts(self._client_id) + data = self._parse_response(result) + return data.get("mounts", []) + + def mount(self, fstype: str, path: str, config: Dict[str, Any]) -> Dict[str, Any]: + """Mount a plugin dynamically.""" + config_json = json.dumps(config) + result = self._lib.lib.AGFS_Mount( + self._client_id, + fstype.encode("utf-8"), + path.encode("utf-8"), + config_json.encode("utf-8"), + ) + return self._parse_response(result) + + def unmount(self, path: str) -> Dict[str, Any]: + """Unmount a plugin.""" + result = self._lib.lib.AGFS_Unmount(self._client_id, path.encode("utf-8")) + return self._parse_response(result) + + def load_plugin(self, library_path: str) -> Dict[str, Any]: + """Load an external plugin.""" + result = self._lib.lib.AGFS_LoadPlugin(self._client_id, library_path.encode("utf-8")) + return self._parse_response(result) + + def unload_plugin(self, library_path: str) -> Dict[str, Any]: + """Unload an external plugin.""" + result = self._lib.lib.AGFS_UnloadPlugin(self._client_id, library_path.encode("utf-8")) + return self._parse_response(result) + + def list_plugins(self) -> List[str]: + """List all loaded external plugins.""" + result = self._lib.lib.AGFS_ListPlugins(self._client_id) + data = self._parse_response(result) + return data.get("loaded_plugins", []) + + def get_plugins_info(self) -> List[dict]: + """Get detailed information about all loaded plugins.""" + return self.list_plugins() + + def grep( + self, + path: str, + pattern: str, + 
recursive: bool = False, + case_insensitive: bool = False, + stream: bool = False, + ): + """Search for a pattern in files.""" + raise AGFSNotSupportedError("Grep not supported in binding mode") + + def digest(self, path: str, algorithm: str = "xxh3") -> Dict[str, Any]: + """Calculate the digest of a file.""" + raise AGFSNotSupportedError("Digest not supported in binding mode") + + def open_handle( + self, path: str, flags: int = 0, mode: int = 0o644, lease: int = 60 + ) -> "FileHandle": + """Open a file handle for stateful operations.""" + handle_id = self._lib.lib.AGFS_OpenHandle( + self._client_id, path.encode("utf-8"), flags, ctypes.c_uint(mode), lease + ) + + if handle_id < 0: + raise AGFSClientError("Failed to open handle") + + return FileHandle(self, handle_id, path, flags) + + def list_handles(self) -> List[Dict[str, Any]]: + """List all active file handles.""" + result = self._lib.lib.AGFS_ListHandles(self._client_id) + data = self._parse_response(result) + return data.get("handles", []) + + def get_handle_info(self, handle_id: int) -> Dict[str, Any]: + """Get information about a specific handle.""" + result = self._lib.lib.AGFS_GetHandleInfo(ctypes.c_int64(handle_id)) + return self._parse_response(result) + + def close_handle(self, handle_id: int) -> Dict[str, Any]: + """Close a file handle.""" + result = self._lib.lib.AGFS_CloseHandle(ctypes.c_int64(handle_id)) + return self._parse_response(result) + + def handle_read(self, handle_id: int, size: int = -1, offset: Optional[int] = None) -> bytes: + """Read from a file handle.""" + has_offset = 1 if offset is not None else 0 + offset_val = offset if offset is not None else 0 + + result = self._lib.lib.AGFS_HandleRead( + ctypes.c_int64(handle_id), ctypes.c_int64(size), ctypes.c_int64(offset_val), has_offset + ) + + if isinstance(result, bytes): + return result + + data = json.loads(result.decode("utf-8") if isinstance(result, bytes) else result) + if "error_id" in data and data["error_id"] != 0: + error_msg 
= self._lib.lib.AGFS_GetLastError(data["error_id"]) + if isinstance(error_msg, bytes): + error_msg = error_msg.decode("utf-8") + raise AGFSClientError(error_msg if error_msg else "Unknown error") + + return result if isinstance(result, bytes) else result.encode("utf-8") + + def handle_write(self, handle_id: int, data: bytes, offset: Optional[int] = None) -> int: + """Write to a file handle.""" + has_offset = 1 if offset is not None else 0 + offset_val = offset if offset is not None else 0 + + result = self._lib.lib.AGFS_HandleWrite( + ctypes.c_int64(handle_id), + data, + ctypes.c_int64(len(data)), + ctypes.c_int64(offset_val), + has_offset, + ) + resp = self._parse_response(result) + return resp.get("bytes_written", 0) + + def handle_seek(self, handle_id: int, offset: int, whence: int = 0) -> int: + """Seek within a file handle.""" + result = self._lib.lib.AGFS_HandleSeek( + ctypes.c_int64(handle_id), ctypes.c_int64(offset), whence + ) + data = self._parse_response(result) + return data.get("position", 0) + + def handle_sync(self, handle_id: int) -> Dict[str, Any]: + """Sync a file handle.""" + result = self._lib.lib.AGFS_HandleSync(ctypes.c_int64(handle_id)) + return self._parse_response(result) + + def handle_stat(self, handle_id: int) -> Dict[str, Any]: + """Get file info via handle.""" + result = self._lib.lib.AGFS_HandleStat(ctypes.c_int64(handle_id)) + return self._parse_response(result) + + def renew_handle(self, handle_id: int, lease: int = 60) -> Dict[str, Any]: + """Renew the lease on a file handle.""" + return {"message": "lease renewed", "lease": lease} + + +class FileHandle: + """A file handle for stateful file operations. + + Supports context manager protocol for automatic cleanup. 
+ """ + + O_RDONLY = 0 + O_WRONLY = 1 + O_RDWR = 2 + O_APPEND = 8 + O_CREATE = 16 + O_EXCL = 32 + O_TRUNC = 64 + + SEEK_SET = 0 + SEEK_CUR = 1 + SEEK_END = 2 + + def __init__(self, client: AGFSBindingClient, handle_id: int, path: str, flags: int): + self._client = client + self._handle_id = handle_id + self._path = path + self._flags = flags + self._closed = False + + @property + def handle_id(self) -> int: + """The handle ID.""" + return self._handle_id + + @property + def path(self) -> str: + """The file path.""" + return self._path + + @property + def flags(self) -> int: + """The open flags (numeric).""" + return self._flags + + @property + def closed(self) -> bool: + """Whether the handle is closed.""" + return self._closed + + def read(self, size: int = -1) -> bytes: + """Read from current position.""" + if self._closed: + raise AGFSClientError("Handle is closed") + return self._client.handle_read(self._handle_id, size) + + def read_at(self, size: int, offset: int) -> bytes: + """Read at specific offset (pread).""" + if self._closed: + raise AGFSClientError("Handle is closed") + return self._client.handle_read(self._handle_id, size, offset) + + def write(self, data: bytes) -> int: + """Write at current position.""" + if self._closed: + raise AGFSClientError("Handle is closed") + return self._client.handle_write(self._handle_id, data) + + def write_at(self, data: bytes, offset: int) -> int: + """Write at specific offset (pwrite).""" + if self._closed: + raise AGFSClientError("Handle is closed") + return self._client.handle_write(self._handle_id, data, offset) + + def seek(self, offset: int, whence: int = 0) -> int: + """Seek to position.""" + if self._closed: + raise AGFSClientError("Handle is closed") + return self._client.handle_seek(self._handle_id, offset, whence) + + def tell(self) -> int: + """Get current position.""" + return self.seek(0, self.SEEK_CUR) + + def sync(self) -> None: + """Flush data to storage.""" + if self._closed: + raise 
AGFSClientError("Handle is closed") + self._client.handle_sync(self._handle_id) + + def stat(self) -> Dict[str, Any]: + """Get file info.""" + if self._closed: + raise AGFSClientError("Handle is closed") + return self._client.handle_stat(self._handle_id) + + def info(self) -> Dict[str, Any]: + """Get handle info.""" + if self._closed: + raise AGFSClientError("Handle is closed") + return self._client.get_handle_info(self._handle_id) + + def renew(self, lease: int = 60) -> Dict[str, Any]: + """Renew the handle lease.""" + if self._closed: + raise AGFSClientError("Handle is closed") + return self._client.renew_handle(self._handle_id, lease) + + def close(self) -> None: + """Close the handle.""" + if not self._closed: + self._client.close_handle(self._handle_id) + self._closed = True + + def __enter__(self) -> "FileHandle": + return self + + def __exit__(self, exc_type, exc_val, exc_tb) -> None: + self.close() + + def __repr__(self) -> str: + status = "closed" if self._closed else "open" + return f"FileHandle(id={self._handle_id}, path={self._path}, flags={self._flags}, {status})" diff --git a/third_party/agfs/agfs-server/cmd/pybinding/Makefile b/third_party/agfs/agfs-server/cmd/pybinding/Makefile new file mode 100644 index 00000000..99f40592 --- /dev/null +++ b/third_party/agfs/agfs-server/cmd/pybinding/Makefile @@ -0,0 +1,47 @@ +.PHONY: all build-macos build-linux build-windows clean install test + +GO=go +BUILD_DIR=build +LIB_NAME=libagfsbinding +BIN_DIR=../../../bin + +all: build-macos + +build-macos: + @echo "Building shared library for macOS..." + @mkdir -p $(BUILD_DIR) + @mkdir -p $(BIN_DIR) + CGO_ENABLED=1 $(GO) build -buildmode=c-shared -o $(BUILD_DIR)/$(LIB_NAME).dylib . + @cp $(BUILD_DIR)/$(LIB_NAME).dylib $(BIN_DIR)/ + @echo "Build complete: $(BIN_DIR)/$(LIB_NAME).dylib" + +build-linux: + @echo "Building shared library for Linux..." 
+ @mkdir -p $(BUILD_DIR) + @mkdir -p $(BIN_DIR) + CGO_ENABLED=1 $(GO) build -buildmode=c-shared -o $(BUILD_DIR)/$(LIB_NAME).so . + @cp $(BUILD_DIR)/$(LIB_NAME).so $(BIN_DIR)/ + @echo "Build complete: $(BIN_DIR)/$(LIB_NAME).so" + +build-windows: + @echo "Building shared library for Windows..." + @mkdir -p $(BUILD_DIR) + @mkdir -p $(BIN_DIR) + CC=x86_64-w64-mingw32-gcc CXX=x86_64-w64-mingw32-g++ AR=x86_64-w64-mingw32-ar GOOS=windows GOARCH=amd64 CGO_ENABLED=1 $(GO) build -buildmode=c-shared -o $(BUILD_DIR)/$(LIB_NAME).dll . + @cp $(BUILD_DIR)/$(LIB_NAME).dll $(BIN_DIR)/ + @echo "Build complete: $(BIN_DIR)/$(LIB_NAME).dll" + +clean: + @echo "Cleaning..." + @rm -rf $(BUILD_DIR) + @rm -f *.so *.dylib *.dll *.h + @rm -f $(BIN_DIR)/$(LIB_NAME).so $(BIN_DIR)/$(LIB_NAME).dylib $(BIN_DIR)/$(LIB_NAME).dll + @echo "Clean complete" + +install: build-macos + @echo "Installing..." + @cp $(BUILD_DIR)/$(LIB_NAME).dylib /usr/local/lib/ 2>/dev/null || true + @echo "Install complete" + +test: + $(GO) test -v ./... 
diff --git a/third_party/agfs/agfs-server/cmd/pybinding/main.go b/third_party/agfs/agfs-server/cmd/pybinding/main.go
new file mode 100644
index 00000000..0f95cfa6
--- /dev/null
+++ b/third_party/agfs/agfs-server/cmd/pybinding/main.go
@@ -0,0 +1,673 @@
+package main
+
+/*
+#include <stdint.h>
+#include <stdlib.h>
+#include <string.h>
+*/
+import "C"
+
+import (
+	"encoding/json"
+	"fmt"
+	"sync"
+	"time"
+	"unsafe"
+
+	"github.com/c4pt0r/agfs/agfs-server/pkg/filesystem"
+	"github.com/c4pt0r/agfs/agfs-server/pkg/mountablefs"
+	"github.com/c4pt0r/agfs/agfs-server/pkg/plugin"
+	"github.com/c4pt0r/agfs/agfs-server/pkg/plugin/api"
+	"github.com/c4pt0r/agfs/agfs-server/pkg/plugin/loader"
+	"github.com/c4pt0r/agfs/agfs-server/pkg/plugins/gptfs"
+	"github.com/c4pt0r/agfs/agfs-server/pkg/plugins/heartbeatfs"
+	"github.com/c4pt0r/agfs/agfs-server/pkg/plugins/hellofs"
+	"github.com/c4pt0r/agfs/agfs-server/pkg/plugins/httpfs"
+	"github.com/c4pt0r/agfs/agfs-server/pkg/plugins/kvfs"
+	"github.com/c4pt0r/agfs/agfs-server/pkg/plugins/localfs"
+	"github.com/c4pt0r/agfs/agfs-server/pkg/plugins/memfs"
+	"github.com/c4pt0r/agfs/agfs-server/pkg/plugins/queuefs"
+	"github.com/c4pt0r/agfs/agfs-server/pkg/plugins/s3fs"
+	"github.com/c4pt0r/agfs/agfs-server/pkg/plugins/serverinfofs"
+	"github.com/c4pt0r/agfs/agfs-server/pkg/plugins/sqlfs"
+	"github.com/c4pt0r/agfs/agfs-server/pkg/plugins/streamfs"
+	"github.com/c4pt0r/agfs/agfs-server/pkg/plugins/streamrotatefs"
+)
+
+var (
+	globalFS      *mountablefs.MountableFS
+	globalFSMu    sync.RWMutex
+	handleMap     = make(map[int64]filesystem.FileHandle)
+	handleMapMu   sync.RWMutex
+	handleIDGen   int64
+	errorBuffer   = make(map[int64]string)
+	errorBufferMu sync.RWMutex
+	errorIDGen    int64
+)
+
+func init() {
+	poolConfig := api.PoolConfig{
+		MaxInstances: 10,
+	}
+	globalFS = mountablefs.NewMountableFS(poolConfig)
+	registerBuiltinPlugins()
+}
+
+func registerBuiltinPlugins() {
+	registerFunc := func(name string, factory func() plugin.ServicePlugin) {
+		globalFS.RegisterPluginFactory(name,
factory) + } + + registerFunc("serverinfofs", func() plugin.ServicePlugin { return serverinfofs.NewServerInfoFSPlugin() }) + registerFunc("memfs", func() plugin.ServicePlugin { return memfs.NewMemFSPlugin() }) + registerFunc("queuefs", func() plugin.ServicePlugin { return queuefs.NewQueueFSPlugin() }) + registerFunc("kvfs", func() plugin.ServicePlugin { return kvfs.NewKVFSPlugin() }) + registerFunc("hellofs", func() plugin.ServicePlugin { return hellofs.NewHelloFSPlugin() }) + registerFunc("heartbeatfs", func() plugin.ServicePlugin { return heartbeatfs.NewHeartbeatFSPlugin() }) + registerFunc("httpfs", func() plugin.ServicePlugin { return httpfs.NewHTTPFSPlugin() }) + registerFunc("s3fs", func() plugin.ServicePlugin { return s3fs.NewS3FSPlugin() }) + registerFunc("streamfs", func() plugin.ServicePlugin { return streamfs.NewStreamFSPlugin() }) + registerFunc("streamrotatefs", func() plugin.ServicePlugin { return streamrotatefs.NewStreamRotateFSPlugin() }) + registerFunc("sqlfs", func() plugin.ServicePlugin { return sqlfs.NewSQLFSPlugin() }) + registerFunc("localfs", func() plugin.ServicePlugin { return localfs.NewLocalFSPlugin() }) + registerFunc("gptfs", func() plugin.ServicePlugin { return gptfs.NewGptfs() }) +} + +func storeError(err error) int64 { + if err == nil { + return 0 + } + errorBufferMu.Lock() + errorIDGen++ + id := errorIDGen + errorBuffer[id] = err.Error() + errorBufferMu.Unlock() + return id +} + +func getAndClearError(id int64) string { + if id == 0 { + return "" + } + errorBufferMu.Lock() + msg := errorBuffer[id] + delete(errorBuffer, id) + errorBufferMu.Unlock() + return msg +} + +func storeHandle(handle filesystem.FileHandle) int64 { + handleMapMu.Lock() + handleIDGen++ + id := handleIDGen + handleMap[id] = handle + handleMapMu.Unlock() + return id +} + +func getHandle(id int64) filesystem.FileHandle { + handleMapMu.RLock() + handle := handleMap[id] + handleMapMu.RUnlock() + return handle +} + +func removeHandle(id int64) { + handleMapMu.Lock() + 
delete(handleMap, id) + handleMapMu.Unlock() +} + +//export AGFS_NewClient +func AGFS_NewClient() int64 { + return 1 +} + +//export AGFS_FreeClient +func AGFS_FreeClient(clientID int64) { +} + +//export AGFS_GetLastError +func AGFS_GetLastError(errorID int64) *C.char { + msg := getAndClearError(errorID) + return C.CString(msg) +} + +//export AGFS_FreeString +func AGFS_FreeString(s *C.char) { + C.free(unsafe.Pointer(s)) +} + +//export AGFS_Health +func AGFS_Health(clientID int64) C.int { + return C.int(1) +} + +//export AGFS_GetCapabilities +func AGFS_GetCapabilities(clientID int64) *C.char { + caps := map[string]interface{}{ + "version": "binding", + "features": []string{"handlefs", "grep", "digest", "stream", "touch"}, + } + data, _ := json.Marshal(caps) + return C.CString(string(data)) +} + +//export AGFS_Ls +func AGFS_Ls(clientID int64, path *C.char) *C.char { + p := C.GoString(path) + globalFSMu.RLock() + fs := globalFS + globalFSMu.RUnlock() + + files, err := fs.ReadDir(p) + if err != nil { + errorID := storeError(err) + return C.CString(fmt.Sprintf(`{"error_id": %d}`, errorID)) + } + + result := make([]map[string]interface{}, len(files)) + for i, f := range files { + result[i] = map[string]interface{}{ + "name": f.Name, + "size": f.Size, + "mode": f.Mode, + "modTime": f.ModTime.Format(time.RFC3339Nano), + "isDir": f.IsDir, + } + } + + data, _ := json.Marshal(map[string]interface{}{"files": result}) + return C.CString(string(data)) +} + +//export AGFS_Read +func AGFS_Read(clientID int64, path *C.char, offset C.int64_t, size C.int64_t, outData **C.char, outSize *C.int64_t) C.int64_t { + p := C.GoString(path) + globalFSMu.RLock() + fs := globalFS + globalFSMu.RUnlock() + + data, err := fs.Read(p, int64(offset), int64(size)) + if err != nil && err.Error() != "EOF" { + errorID := storeError(err) + return C.int64_t(errorID) + } + + if len(data) > 0 { + buf := C.malloc(C.size_t(len(data))) + C.memcpy(buf, unsafe.Pointer(&data[0]), C.size_t(len(data))) + *outData = 
(*C.char)(buf) + *outSize = C.int64_t(len(data)) + } else { + *outData = nil + *outSize = 0 + } + return 0 +} + +//export AGFS_Write +func AGFS_Write(clientID int64, path *C.char, data unsafe.Pointer, dataSize C.int64_t) *C.char { + p := C.GoString(path) + bytesData := C.GoBytes(data, C.int(dataSize)) + + globalFSMu.RLock() + fs := globalFS + globalFSMu.RUnlock() + + n, err := fs.Write(p, bytesData, -1, filesystem.WriteFlagCreate|filesystem.WriteFlagTruncate) + if err != nil { + errorID := storeError(err) + return C.CString(fmt.Sprintf(`{"error_id": %d}`, errorID)) + } + + return C.CString(fmt.Sprintf(`{"message": "Written %d bytes"}`, n)) +} + +//export AGFS_Create +func AGFS_Create(clientID int64, path *C.char) *C.char { + p := C.GoString(path) + + globalFSMu.RLock() + fs := globalFS + globalFSMu.RUnlock() + + err := fs.Create(p) + if err != nil { + errorID := storeError(err) + return C.CString(fmt.Sprintf(`{"error_id": %d}`, errorID)) + } + + return C.CString(`{"message": "file created"}`) +} + +//export AGFS_Mkdir +func AGFS_Mkdir(clientID int64, path *C.char, mode C.uint) *C.char { + p := C.GoString(path) + + globalFSMu.RLock() + fs := globalFS + globalFSMu.RUnlock() + + err := fs.Mkdir(p, uint32(mode)) + if err != nil { + errorID := storeError(err) + return C.CString(fmt.Sprintf(`{"error_id": %d}`, errorID)) + } + + return C.CString(`{"message": "directory created"}`) +} + +//export AGFS_Rm +func AGFS_Rm(clientID int64, path *C.char, recursive C.int) *C.char { + p := C.GoString(path) + + globalFSMu.RLock() + fs := globalFS + globalFSMu.RUnlock() + + var err error + if recursive != 0 { + err = fs.RemoveAll(p) + } else { + err = fs.Remove(p) + } + + if err != nil { + errorID := storeError(err) + return C.CString(fmt.Sprintf(`{"error_id": %d}`, errorID)) + } + + return C.CString(`{"message": "deleted"}`) +} + +//export AGFS_Stat +func AGFS_Stat(clientID int64, path *C.char) *C.char { + p := C.GoString(path) + + globalFSMu.RLock() + fs := globalFS + 
globalFSMu.RUnlock() + + info, err := fs.Stat(p) + if err != nil { + errorID := storeError(err) + return C.CString(fmt.Sprintf(`{"error_id": %d}`, errorID)) + } + + result := map[string]interface{}{ + "name": info.Name, + "size": info.Size, + "mode": info.Mode, + "modTime": info.ModTime.Format(time.RFC3339Nano), + "isDir": info.IsDir, + } + + data, _ := json.Marshal(result) + return C.CString(string(data)) +} + +//export AGFS_Mv +func AGFS_Mv(clientID int64, oldPath *C.char, newPath *C.char) *C.char { + oldP := C.GoString(oldPath) + newP := C.GoString(newPath) + + globalFSMu.RLock() + fs := globalFS + globalFSMu.RUnlock() + + err := fs.Rename(oldP, newP) + if err != nil { + errorID := storeError(err) + return C.CString(fmt.Sprintf(`{"error_id": %d}`, errorID)) + } + + return C.CString(`{"message": "renamed"}`) +} + +//export AGFS_Chmod +func AGFS_Chmod(clientID int64, path *C.char, mode C.uint) *C.char { + p := C.GoString(path) + + globalFSMu.RLock() + fs := globalFS + globalFSMu.RUnlock() + + err := fs.Chmod(p, uint32(mode)) + if err != nil { + errorID := storeError(err) + return C.CString(fmt.Sprintf(`{"error_id": %d}`, errorID)) + } + + return C.CString(`{"message": "permissions changed"}`) +} + +//export AGFS_Touch +func AGFS_Touch(clientID int64, path *C.char) *C.char { + p := C.GoString(path) + + globalFSMu.RLock() + fs := globalFS + globalFSMu.RUnlock() + + err := fs.Touch(p) + if err != nil { + errorID := storeError(err) + return C.CString(fmt.Sprintf(`{"error_id": %d}`, errorID)) + } + + return C.CString(`{"message": "touched"}`) +} + +//export AGFS_Mounts +func AGFS_Mounts(clientID int64) *C.char { + globalFSMu.RLock() + fs := globalFS + globalFSMu.RUnlock() + + mounts := fs.GetMounts() + result := make([]map[string]interface{}, len(mounts)) + for i, m := range mounts { + result[i] = map[string]interface{}{ + "path": m.Path, + "fstype": m.Plugin.Name(), + } + } + + data, _ := json.Marshal(map[string]interface{}{"mounts": result}) + return 
C.CString(string(data)) +} + +//export AGFS_Mount +func AGFS_Mount(clientID int64, fstype *C.char, path *C.char, configJSON *C.char) *C.char { + fsType := C.GoString(fstype) + p := C.GoString(path) + cfgJSON := C.GoString(configJSON) + + var config map[string]interface{} + if err := json.Unmarshal([]byte(cfgJSON), &config); err != nil { + config = make(map[string]interface{}) + } + + globalFSMu.Lock() + fs := globalFS + globalFSMu.Unlock() + + err := fs.MountPlugin(fsType, p, config) + if err != nil { + errorID := storeError(err) + return C.CString(fmt.Sprintf(`{"error_id": %d}`, errorID)) + } + + return C.CString(fmt.Sprintf(`{"message": "mounted %s at %s"}`, fsType, p)) +} + +//export AGFS_Unmount +func AGFS_Unmount(clientID int64, path *C.char) *C.char { + p := C.GoString(path) + + globalFSMu.Lock() + fs := globalFS + globalFSMu.Unlock() + + err := fs.Unmount(p) + if err != nil { + errorID := storeError(err) + return C.CString(fmt.Sprintf(`{"error_id": %d}`, errorID)) + } + + return C.CString(`{"message": "unmounted"}`) +} + +//export AGFS_LoadPlugin +func AGFS_LoadPlugin(clientID int64, libraryPath *C.char) *C.char { + libPath := C.GoString(libraryPath) + + globalFSMu.Lock() + fs := globalFS + globalFSMu.Unlock() + + p, err := fs.LoadExternalPlugin(libPath) + if err != nil { + errorID := storeError(err) + return C.CString(fmt.Sprintf(`{"error_id": %d}`, errorID)) + } + + return C.CString(fmt.Sprintf(`{"message": "loaded plugin %s", "name": "%s"}`, libPath, p.Name())) +} + +//export AGFS_UnloadPlugin +func AGFS_UnloadPlugin(clientID int64, libraryPath *C.char) *C.char { + libPath := C.GoString(libraryPath) + + globalFSMu.Lock() + fs := globalFS + globalFSMu.Unlock() + + err := fs.UnloadExternalPlugin(libPath) + if err != nil { + errorID := storeError(err) + return C.CString(fmt.Sprintf(`{"error_id": %d}`, errorID)) + } + + return C.CString(`{"message": "unloaded plugin"}`) +} + +//export AGFS_ListPlugins +func AGFS_ListPlugins(clientID int64) *C.char { + 
globalFSMu.RLock() + fs := globalFS + globalFSMu.RUnlock() + + plugins := fs.GetLoadedExternalPlugins() + data, _ := json.Marshal(map[string]interface{}{"loaded_plugins": plugins}) + return C.CString(string(data)) +} + +//export AGFS_OpenHandle +func AGFS_OpenHandle(clientID int64, path *C.char, flags C.int, mode C.uint, lease C.int) C.int64_t { + p := C.GoString(path) + + globalFSMu.RLock() + fs := globalFS + globalFSMu.RUnlock() + + handle, err := fs.OpenHandle(p, filesystem.OpenFlag(flags), uint32(mode)) + if err != nil { + storeError(err) + return -1 + } + + id := storeHandle(handle) + return C.int64_t(id) +} + +//export AGFS_CloseHandle +func AGFS_CloseHandle(handleID C.int64_t) *C.char { + id := int64(handleID) + handle := getHandle(id) + if handle == nil { + return C.CString(`{"error_id": 0}`) + } + + err := handle.Close() + removeHandle(id) + + if err != nil { + errorID := storeError(err) + return C.CString(fmt.Sprintf(`{"error_id": %d}`, errorID)) + } + + return C.CString(`{"message": "handle closed"}`) +} + +//export AGFS_HandleRead +func AGFS_HandleRead(handleID C.int64_t, size C.int64_t, offset C.int64_t, hasOffset C.int) (*C.char, C.int64_t, C.int64_t) { + id := int64(handleID) + handle := getHandle(id) + if handle == nil { + errJSON := fmt.Sprintf(`{"error_id": %d}`, storeError(fmt.Errorf("handle not found"))) + return C.CString(errJSON), 0, -1 + } + + buf := make([]byte, int(size)) + var n int + var err error + + if hasOffset != 0 { + n, err = handle.ReadAt(buf, int64(offset)) + } else { + n, err = handle.Read(buf) + } + + if err != nil && err.Error() != "EOF" { + errJSON := fmt.Sprintf(`{"error_id": %d}`, storeError(err)) + return C.CString(errJSON), 0, -1 + } + + return C.CString(string(buf[:n])), C.int64_t(n), 0 +} + +//export AGFS_HandleWrite +func AGFS_HandleWrite(handleID C.int64_t, data unsafe.Pointer, dataSize C.int64_t, offset C.int64_t, hasOffset C.int) *C.char { + id := int64(handleID) + handle := getHandle(id) + if handle == nil { + 
return C.CString(fmt.Sprintf(`{"error_id": %d}`, storeError(fmt.Errorf("handle not found")))) + } + + bytesData := C.GoBytes(data, C.int(dataSize)) + var n int + var err error + + if hasOffset != 0 { + n, err = handle.WriteAt(bytesData, int64(offset)) + } else { + n, err = handle.Write(bytesData) + } + + if err != nil { + return C.CString(fmt.Sprintf(`{"error_id": %d}`, storeError(err))) + } + + return C.CString(fmt.Sprintf(`{"bytes_written": %d}`, n)) +} + +//export AGFS_HandleSeek +func AGFS_HandleSeek(handleID C.int64_t, offset C.int64_t, whence C.int) *C.char { + id := int64(handleID) + handle := getHandle(id) + if handle == nil { + return C.CString(fmt.Sprintf(`{"error_id": %d}`, storeError(fmt.Errorf("handle not found")))) + } + + newPos, err := handle.Seek(int64(offset), int(whence)) + if err != nil { + return C.CString(fmt.Sprintf(`{"error_id": %d}`, storeError(err))) + } + + return C.CString(fmt.Sprintf(`{"position": %d}`, newPos)) +} + +//export AGFS_HandleSync +func AGFS_HandleSync(handleID C.int64_t) *C.char { + id := int64(handleID) + handle := getHandle(id) + if handle == nil { + return C.CString(fmt.Sprintf(`{"error_id": %d}`, storeError(fmt.Errorf("handle not found")))) + } + + err := handle.Sync() + if err != nil { + return C.CString(fmt.Sprintf(`{"error_id": %d}`, storeError(err))) + } + + return C.CString(`{"message": "synced"}`) +} + +//export AGFS_HandleStat +func AGFS_HandleStat(handleID C.int64_t) *C.char { + id := int64(handleID) + handle := getHandle(id) + if handle == nil { + return C.CString(fmt.Sprintf(`{"error_id": %d}`, storeError(fmt.Errorf("handle not found")))) + } + + info, err := handle.Stat() + if err != nil { + return C.CString(fmt.Sprintf(`{"error_id": %d}`, storeError(err))) + } + + result := map[string]interface{}{ + "name": info.Name, + "size": info.Size, + "mode": info.Mode, + "modTime": info.ModTime.Format(time.RFC3339Nano), + "isDir": info.IsDir, + } + + data, _ := json.Marshal(result) + return C.CString(string(data)) +} 
+ +//export AGFS_ListHandles +func AGFS_ListHandles(clientID int64) *C.char { + handleMapMu.RLock() + handles := make([]map[string]interface{}, 0, len(handleMap)) + for id, h := range handleMap { + handles = append(handles, map[string]interface{}{ + "handle_id": id, + "path": h.Path(), + }) + } + handleMapMu.RUnlock() + + data, _ := json.Marshal(map[string]interface{}{"handles": handles}) + return C.CString(string(data)) +} + +//export AGFS_GetHandleInfo +func AGFS_GetHandleInfo(handleID C.int64_t) *C.char { + id := int64(handleID) + handle := getHandle(id) + if handle == nil { + return C.CString(fmt.Sprintf(`{"error_id": %d}`, storeError(fmt.Errorf("handle not found")))) + } + + result := map[string]interface{}{ + "handle_id": id, + "path": handle.Path(), + "flags": int(handle.Flags()), + } + + data, _ := json.Marshal(result) + return C.CString(string(data)) +} + +//export AGFS_GetPluginLoader +func AGFS_GetPluginLoader() unsafe.Pointer { + globalFSMu.RLock() + fs := globalFS + globalFSMu.RUnlock() + + l := fs.GetPluginLoader() + return unsafe.Pointer(l) +} + +func GetMountableFS() *mountablefs.MountableFS { + globalFSMu.RLock() + defer globalFSMu.RUnlock() + return globalFS +} + +func SetMountableFS(fs *mountablefs.MountableFS) { + globalFSMu.Lock() + globalFS = fs + globalFSMu.Unlock() +} + +func GetPluginLoaderInternal() *loader.PluginLoader { + return globalFS.GetPluginLoader() +} + +func main() {} diff --git a/third_party/agfs/bin/libagfsbinding.dll b/third_party/agfs/bin/libagfsbinding.dll new file mode 100644 index 00000000..68b4f724 Binary files /dev/null and b/third_party/agfs/bin/libagfsbinding.dll differ diff --git a/third_party/agfs/bin/libagfsbinding.dylib b/third_party/agfs/bin/libagfsbinding.dylib new file mode 100644 index 00000000..df134494 Binary files /dev/null and b/third_party/agfs/bin/libagfsbinding.dylib differ diff --git a/third_party/agfs/bin/libagfsbinding.h b/third_party/agfs/bin/libagfsbinding.h new file mode 100644 index 
00000000..51f5e228 --- /dev/null +++ b/third_party/agfs/bin/libagfsbinding.h @@ -0,0 +1,134 @@ +/* Code generated by cmd/cgo; DO NOT EDIT. */ + +/* package github.com/c4pt0r/agfs/agfs-server/cmd/pybinding */ + + +#line 1 "cgo-builtin-export-prolog" + +#include + +#ifndef GO_CGO_EXPORT_PROLOGUE_H +#define GO_CGO_EXPORT_PROLOGUE_H + +#ifndef GO_CGO_GOSTRING_TYPEDEF +typedef struct { const char *p; ptrdiff_t n; } _GoString_; +extern size_t _GoStringLen(_GoString_ s); +extern const char *_GoStringPtr(_GoString_ s); +#endif + +#endif + +/* Start of preamble from import "C" comments. */ + + +#line 3 "main.go" + +#include +#include +#include + +#line 1 "cgo-generated-wrapper" + + +/* End of preamble from import "C" comments. */ + + +/* Start of boilerplate cgo prologue. */ +#line 1 "cgo-gcc-export-header-prolog" + +#ifndef GO_CGO_PROLOGUE_H +#define GO_CGO_PROLOGUE_H + +typedef signed char GoInt8; +typedef unsigned char GoUint8; +typedef short GoInt16; +typedef unsigned short GoUint16; +typedef int GoInt32; +typedef unsigned int GoUint32; +typedef long long GoInt64; +typedef unsigned long long GoUint64; +typedef GoInt64 GoInt; +typedef GoUint64 GoUint; +typedef size_t GoUintptr; +typedef float GoFloat32; +typedef double GoFloat64; +#ifdef _MSC_VER +#if !defined(__cplusplus) || _MSVC_LANG <= 201402L +#include +typedef _Fcomplex GoComplex64; +typedef _Dcomplex GoComplex128; +#else +#include +typedef std::complex GoComplex64; +typedef std::complex GoComplex128; +#endif +#else +typedef float _Complex GoComplex64; +typedef double _Complex GoComplex128; +#endif + +/* + static assertion to make sure the file is being used on architecture + at least with matching size of GoInt. +*/ +typedef char _check_for_64_bit_pointer_matching_GoInt[sizeof(void*)==64/8 ? 
1:-1]; + +#ifndef GO_CGO_GOSTRING_TYPEDEF +typedef _GoString_ GoString; +#endif +typedef void *GoMap; +typedef void *GoChan; +typedef struct { void *t; void *v; } GoInterface; +typedef struct { void *data; GoInt len; GoInt cap; } GoSlice; + +#endif + +/* End of boilerplate cgo prologue. */ + +#ifdef __cplusplus +extern "C" { +#endif + +extern GoInt64 AGFS_NewClient(void); +extern void AGFS_FreeClient(GoInt64 clientID); +extern char* AGFS_GetLastError(GoInt64 errorID); +extern void AGFS_FreeString(char* s); +extern int AGFS_Health(GoInt64 clientID); +extern char* AGFS_GetCapabilities(GoInt64 clientID); +extern char* AGFS_Ls(GoInt64 clientID, char* path); +extern int64_t AGFS_Read(GoInt64 clientID, char* path, int64_t offset, int64_t size, char** outData, int64_t* outSize); +extern char* AGFS_Write(GoInt64 clientID, char* path, void* data, int64_t dataSize); +extern char* AGFS_Create(GoInt64 clientID, char* path); +extern char* AGFS_Mkdir(GoInt64 clientID, char* path, unsigned int mode); +extern char* AGFS_Rm(GoInt64 clientID, char* path, int recursive); +extern char* AGFS_Stat(GoInt64 clientID, char* path); +extern char* AGFS_Mv(GoInt64 clientID, char* oldPath, char* newPath); +extern char* AGFS_Chmod(GoInt64 clientID, char* path, unsigned int mode); +extern char* AGFS_Touch(GoInt64 clientID, char* path); +extern char* AGFS_Mounts(GoInt64 clientID); +extern char* AGFS_Mount(GoInt64 clientID, char* fstype, char* path, char* configJSON); +extern char* AGFS_Unmount(GoInt64 clientID, char* path); +extern char* AGFS_LoadPlugin(GoInt64 clientID, char* libraryPath); +extern char* AGFS_UnloadPlugin(GoInt64 clientID, char* libraryPath); +extern char* AGFS_ListPlugins(GoInt64 clientID); +extern int64_t AGFS_OpenHandle(GoInt64 clientID, char* path, int flags, unsigned int mode, int lease); +extern char* AGFS_CloseHandle(int64_t handleID); + +/* Return type for AGFS_HandleRead */ +struct AGFS_HandleRead_return { + char* r0; + int64_t r1; + int64_t r2; +}; +extern struct 
AGFS_HandleRead_return AGFS_HandleRead(int64_t handleID, int64_t size, int64_t offset, int hasOffset); +extern char* AGFS_HandleWrite(int64_t handleID, void* data, int64_t dataSize, int64_t offset, int hasOffset); +extern char* AGFS_HandleSeek(int64_t handleID, int64_t offset, int whence); +extern char* AGFS_HandleSync(int64_t handleID); +extern char* AGFS_HandleStat(int64_t handleID); +extern char* AGFS_ListHandles(GoInt64 clientID); +extern char* AGFS_GetHandleInfo(int64_t handleID); +extern void* AGFS_GetPluginLoader(void); + +#ifdef __cplusplus +} +#endif diff --git a/third_party/agfs/bin/libagfsbinding.so b/third_party/agfs/bin/libagfsbinding.so new file mode 100644 index 00000000..85a64bc7 Binary files /dev/null and b/third_party/agfs/bin/libagfsbinding.so differ