feat(tasks): add async task tracking API for background operations #476

Merged
MaojiaSheng merged 2 commits into volcengine:main from dr3243636-ops:feat/async-task-tracking on Mar 10, 2026

@dr3243636-ops commented Mar 8, 2026

Summary

  • Add lightweight in-memory TaskTracker for tracking async background operations (session commit with wait=false)
  • New API endpoints: GET /api/v1/tasks/{task_id} (single task) and GET /api/v1/tasks (list with filters)
  • POST /sessions/{id}/commit?wait=false now returns a task_id that callers can poll to check completion/failure
  • Duplicate background commits for the same session are atomically rejected (409 CONFLICT)
  • Error messages are sanitized (keys/tokens redacted) before exposure via API
  • TTL-based cleanup: completed tasks expire after 24h, failed after 7d, hard cap at 10k tasks
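The TTL and cap rules in the last bullet could be sketched roughly as follows. This is a minimal illustration, not the PR's code: `Record`, `sweep`, and the field names are hypothetical, and only the 24h / 7d / 10k values come from the summary. It also assumes that the oldest records are evicted first when the cap is exceeded, which the description does not specify.

```python
import time
from dataclasses import dataclass, field
from typing import Dict, Optional

# Values taken from the PR summary; everything else here is hypothetical.
COMPLETED_TTL = 24 * 3600      # completed tasks expire after 24h
FAILED_TTL = 7 * 24 * 3600     # failed tasks expire after 7d
MAX_TASKS = 10_000             # hard cap on tracked tasks


@dataclass
class Record:
    task_id: str
    status: str                 # "running", "completed" or "failed" (assumed states)
    updated_at: float = field(default_factory=time.time)


def sweep(tasks: Dict[str, Record], now: Optional[float] = None,
          max_tasks: int = MAX_TASKS) -> int:
    """Drop expired records, then evict the oldest until under the cap.

    Returns the number of records removed. Running tasks never expire by TTL.
    """
    now = time.time() if now is None else now
    ttl = {"completed": COMPLETED_TTL, "failed": FAILED_TTL}
    expired = [
        tid for tid, rec in tasks.items()
        if rec.status in ttl and now - rec.updated_at > ttl[rec.status]
    ]
    for tid in expired:
        del tasks[tid]
    removed = len(expired)
    # Assumed eviction policy: oldest-first, without special-casing running tasks.
    overflow = len(tasks) - max_tasks
    if overflow > 0:
        oldest = sorted(tasks.values(), key=lambda r: r.updated_at)[:overflow]
        for rec in oldest:
            del tasks[rec.task_id]
        removed += overflow
    return removed
```

A real tracker would presumably run such a sweep periodically while holding its lock, so readers never observe a half-cleaned table.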

Motivation

Follow-up to #472. When wait=false, background commit failures were silently lost — callers had no way to know if memory extraction succeeded. This PR closes that gap by providing a polling API.

As discussed with @r266-tech on #472 — this addresses the status tracking concern raised there. Would appreciate your review!

Design

  • Pull model (caller polls) rather than push (webhook) — appropriate for OV's local deployment model. Webhook/callback support could be added as an optional v2 enhancement
  • In-memory only (v1) — no persistence across restarts, keeps implementation minimal. GET /tasks/{id} returns 404 after restart (expected behavior for ephemeral async tasks)
  • Generic TaskTracker — designed to support future task types (resource ingest, skill ingest) in Phase 2
  • Thread-safe: uses threading.Lock for compatibility with QueueManager's thread-based workers
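The lock-based thread safety and the atomic duplicate rejection mentioned in the summary might look roughly like this. It is a sketch only: `MiniTracker` and `create_if_no_running` are hypothetical names, and the real TaskTracker may be structured differently. The point is that the check and the insert happen under a single lock acquisition, so two concurrent commits for the same session cannot both pass the check.

```python
import copy
import threading
import uuid
from typing import Dict, Optional


class MiniTracker:
    """Hypothetical, stripped-down tracker: check-and-create under one lock."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._tasks: Dict[str, dict] = {}

    def create_if_no_running(self, task_type: str, resource_id: str) -> Optional[str]:
        """Return a new task_id, or None if a matching task is already running."""
        with self._lock:
            for rec in self._tasks.values():
                if (rec["task_type"], rec["resource_id"], rec["status"]) == (
                    task_type, resource_id, "running"
                ):
                    return None  # the caller would map this to a CONFLICT response
            task_id = str(uuid.uuid4())
            self._tasks[task_id] = {
                "task_id": task_id, "task_type": task_type,
                "resource_id": resource_id, "status": "running",
                "result": None, "error": None,
            }
            return task_id

    def finish(self, task_id: str, result: Optional[dict] = None,
               error: Optional[str] = None) -> None:
        """Mark a task as completed, or as failed when an error is given."""
        with self._lock:
            rec = self._tasks[task_id]
            rec["status"] = "failed" if error else "completed"
            rec["result"], rec["error"] = result, error

    def get(self, task_id: str) -> Optional[dict]:
        """Return a defensive copy so callers cannot mutate tracker state."""
        with self._lock:
            rec = self._tasks.get(task_id)
            return copy.deepcopy(rec) if rec is not None else None
```

A plain `threading.Lock` (rather than an asyncio lock) fits the stated goal of working from thread-based workers as well as from request handlers.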

API

# Commit returns task_id
POST /api/v1/sessions/{id}/commit?wait=false
→ {"status":"ok","result":{"session_id":"...","status":"accepted","task_id":"uuid"}}

# Poll task status
GET /api/v1/tasks/{task_id}
→ {"status":"ok","result":{"task_id":"...","status":"completed|failed","result":{...},"error":"..."}}

# List tasks with filters
GET /api/v1/tasks?status=failed&task_type=session_commit&limit=50
→ {"status":"ok","result":[...]}
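A caller-side polling loop for the endpoints above could be sketched as follows. To keep it runnable without a server, the HTTP call is injected as a `fetch` callable; `poll_task`, `fetch`, and the interval/timeout parameters are illustrative and not part of the PR. It assumes that any status other than `completed` or `failed` means the task is still in progress.

```python
import time
from typing import Callable


def poll_task(fetch: Callable[[str], dict], task_id: str,
              interval: float = 1.0, timeout: float = 60.0) -> dict:
    """Poll GET /api/v1/tasks/{task_id} until the task reaches a terminal state.

    `fetch` is a caller-supplied function that performs the GET and returns the
    decoded JSON envelope, e.g. {"status": "ok", "result": {...}}.
    """
    deadline = time.monotonic() + timeout
    while True:
        envelope = fetch(f"/api/v1/tasks/{task_id}")
        task = envelope["result"]
        if task["status"] in ("completed", "failed"):
            return task
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"task {task_id} still {task['status']!r} after {timeout}s"
            )
        time.sleep(interval)
```

With a real HTTP client, `fetch` could be something like `lambda path: httpx.get(base_url + path).json()`, where `base_url` is whatever address the server listens on. Since the tracker is in-memory, a 404 after a server restart would need separate handling in the caller.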

Files changed (4 modified + 4 new)

| File | Change |
| --- | --- |
| openviking/service/task_tracker.py | New: core TaskTracker singleton + TaskRecord + TTL cleanup |
| openviking/server/routers/tasks.py | New: API router for task queries |
| tests/test_task_tracker.py | New: 26 unit tests |
| tests/test_session_task_tracking.py | New: 9 integration tests |
| openviking/server/routers/sessions.py | Modified: integrate task tracking into commit endpoint |
| openviking/server/routers/__init__.py | Modified: register tasks_router |
| openviking/server/app.py | Modified: start/stop cleanup loop in lifespan |
| tests/conftest.py | Modified: AGFS_Grep compatibility workaround |

Test plan

  • 26 unit tests for TaskTracker (lifecycle, filters, TTL, eviction, sanitization, singleton)
  • 9 integration tests for session commit + task tracking (httpx ASGITransport)
  • 4 existing async commit tests pass (backward compatibility)
  • Manual end-to-end verification on local deployment (7 scenarios including failure handling)

Future work

  • Phase 2: Extend TaskTracker to resource ingest and skill ingest operations
  • Optional webhook: Support callback_url parameter for push-based notification

🤖 Generated with Claude Code

Follow-up to volcengine#472. When `wait=false`, background commit failures were
silently lost — callers had no way to know if memory extraction succeeded.

This adds a lightweight in-memory TaskTracker that returns a `task_id`
on async commit, which callers can poll via new `/tasks` endpoints to
check completion status, results, or errors.

Key changes:
- New TaskTracker singleton with TTL-based cleanup (24h completed, 7d failed)
- New API: GET /api/v1/tasks/{task_id} and GET /api/v1/tasks (with filters)
- Atomic duplicate commit detection (eliminates race condition)
- Error message sanitization (keys/tokens redacted)
- Defensive copies on all public reads (thread safety)
- 35 tests (26 unit + 9 integration), all existing tests pass

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
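The error message sanitization listed under the key changes could be approximated with a few regular expressions. This is a sketch under assumptions: the patterns and the `sanitize_error` name are illustrative, and the PR's actual redaction rules are not shown in this conversation.

```python
import re

# Hypothetical patterns; the real implementation may match different shapes.
_PATTERNS = [
    # key=value / key: value pairs for common secret-like field names
    re.compile(r"(?i)\b(api[_-]?key|token|secret|password)\b(\s*[=:]\s*)(\S+)"),
    # HTTP bearer credentials
    re.compile(r"(?i)\b(bearer)(\s+)(\S+)"),
]


def sanitize_error(message: str) -> str:
    """Replace secret-looking values with a placeholder, keeping the field name."""
    for pattern in _PATTERNS:
        message = pattern.sub(r"\1\2[REDACTED]", message)
    return message
```

Keeping the field name while dropping the value leaves the error readable for the caller polling the task without echoing the credential back through the API.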
@dr3243636-ops commented

CI Status: lint ✅ passed, test-lite ❌ failed.

The test-lite failure is not caused by this PR — it's a pre-existing environment issue:

The package `vikingbot` depends on the package `openviking` but the name is shadowed by your project.

This same error occurs on other PR branches (add_console, fix/multi-worker-server) and appears to be a uv dependency resolution issue in the CI workflow. The main branch CI passes because it runs a different workflow (02. Main Branch Checks) that doesn't hit this path.

All tests pass locally (39 tests including 26 unit + 9 integration + 4 regression).

@MaojiaSheng commented

/review

github-actions bot commented Mar 9, 2026

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

🎫 Ticket compliance analysis ✅

#472 - Fully compliant

Compliant requirements:

  • Add async commit support with wait parameter
  • When wait=false, commit runs in background and returns immediately
  • When wait=true (default), blocks until commit completes (backward compatible)
⏱️ Estimated effort to review: 4 🔵🔵🔵🔵⚪
🧪 PR contains tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

Wait=True Conflict Behavior

When wait=true, the endpoint now rejects requests if a background commit is already running. This changes previous behavior where wait=true would block and process the commit. Verify if this conflict rejection for wait=true is intended.

if wait:
    # Reject if same session already has a background commit running
    if tracker.has_running("session_commit", session_id):
        return Response(
            status="error",
            error=ErrorInfo(
                code="CONFLICT",
                message=f"Session {session_id} already has a commit in progress",
            ),
        )

Cleanup Loop Event Loop Binding

The cleanup loop is started with asyncio.create_task(), which binds to the current event loop. Verify that this is called in the correct event loop context (the FastAPI server's loop) and that it doesn't cause issues in multi-worker setups.

def start_cleanup_loop(self) -> None:
    """Start the background TTL cleanup coroutine.

    Safe to call multiple times; subsequent calls are no-ops.
    Must be called from within a running event loop.
    """
    if self._cleanup_task is not None and not self._cleanup_task.done():
        return
    self._cleanup_task = asyncio.create_task(self._cleanup_loop())
    logger.debug("[TaskTracker] Cleanup loop started")
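
One way to address the event-loop concern is to start and stop the loop inside the application's lifespan, so the task is always created on the server's own loop. The sketch below is self-contained and uses only asyncio; `Cleaner`, `stop_cleanup_loop`, and the interval are hypothetical stand-ins rather than the PR's code. In a multi-worker deployment each process would still hold its own in-memory state and its own loop.

```python
import asyncio
import contextlib
from typing import Optional


class Cleaner:
    """Hypothetical stand-in for the tracker's cleanup-loop management."""

    def __init__(self, interval: float = 60.0) -> None:
        self._interval = interval
        self._cleanup_task: Optional[asyncio.Task] = None
        self.sweeps = 0

    async def _cleanup_loop(self) -> None:
        while True:
            await asyncio.sleep(self._interval)
            self.sweeps += 1  # a real loop would expire old task records here

    def start_cleanup_loop(self) -> None:
        # No-op if a loop task is already running; requires a running event loop.
        if self._cleanup_task is not None and not self._cleanup_task.done():
            return
        self._cleanup_task = asyncio.create_task(self._cleanup_loop())

    async def stop_cleanup_loop(self) -> None:
        # Cancel and await the task so shutdown does not leave it pending.
        task, self._cleanup_task = self._cleanup_task, None
        if task is not None:
            task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await task


@contextlib.asynccontextmanager
async def lifespan(cleaner: Cleaner):
    """Same shape as an ASGI lifespan: start on entry, stop on exit."""
    cleaner.start_cleanup_loop()
    try:
        yield cleaner
    finally:
        await cleaner.stop_cleanup_loop()
```

Calling `start_cleanup_loop()` outside a running loop would raise `RuntimeError` from `asyncio.create_task`, which is why the lifespan hook is a natural place for it.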

AGFS Grep Patch Scope

The conftest.py adds a global patch for missing AGFS_Grep symbol. Verify that this patch is only applied during tests and doesn't affect production code, and that it's necessary for the test suite.

# ── Workaround: local .so may lack AGFS_Grep symbol (new in latest source) ──
def _patch_agfs_grep_if_missing():
    """Wrap _setup_functions to catch missing AGFS_Grep and skip its binding."""
    try:
        from openviking.pyagfs.binding_client import BindingLib

        _orig_setup = BindingLib._setup_functions

        def _safe_setup(self):
            try:
                _orig_setup(self)
            except AttributeError as e:
                if "AGFS_Grep" not in str(e):
                    raise
                # Re-implement _setup_functions but skip AGFS_Grep lines.
                # We do this by temporarily removing the Grep lines from the
                # source, but since we can't edit .so, we monkey-patch the lib
                # object's __getattr__ to not fail on AGFS_Grep.
                import ctypes

                class _GrepStub:
                    """Fake ctypes function descriptor for AGFS_Grep."""

                    argtypes = [
                        ctypes.c_int64,
                        ctypes.c_char_p,
                        ctypes.c_char_p,
                        ctypes.c_int,
                        ctypes.c_int,
                        ctypes.c_int,
                        ctypes.c_int,
                    ]
                    restype = ctypes.c_char_p

                    def __call__(self, *args):
                        return b'{"error":"AGFS_Grep not available in this .so version"}'

                # Patch at the CDLL instance level by overriding __getattr__
                orig_class = type(self.lib)
                orig_getattr = orig_class.__getattr__

                def patched_getattr(cdll_self, name):
                    if name == "AGFS_Grep":
                        return _GrepStub()
                    return orig_getattr(cdll_self, name)

                orig_class.__getattr__ = patched_getattr
                try:
                    _orig_setup(self)
                finally:
                    orig_class.__getattr__ = orig_getattr

        BindingLib._setup_functions = _safe_setup
    except Exception:
        pass


_patch_agfs_grep_if_missing()

@MaojiaSheng MaojiaSheng merged commit cd4bf44 into volcengine:main Mar 10, 2026
5 of 6 checks passed
@github-project-automation github-project-automation bot moved this from Backlog to Done in OpenViking project Mar 10, 2026