From b06978f23cc2516b65283d6c944558d854efc20f Mon Sep 17 00:00:00 2001 From: "Doink (OpenClaw)" Date: Tue, 24 Feb 2026 12:27:36 -0800 Subject: [PATCH] feat: stateless daemon core (Phases 1-3) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements ADR 004: Stateless Daemon Core. ## Phase 1: Fix session accumulation bug - reconcileAndEnrich() detects sessions that disappear from adapter discover() results and marks them stopped + autoUnlock - 30-second grace period for recently-launched sessions to avoid false positives from adapter discovery latency ## Phase 2: Make session.list adapter-first - session.list handler now fans out discover() to all adapters in parallel with 5s per-adapter timeouts - Merges results and enriches with daemon launch metadata (prompt, group, spec, cwd) - session.status also fans out to adapters for fresh data - Graceful degradation: failed adapters are skipped, partial results returned. Sessions from failed adapters fall back to launch metadata. ## Phase 3: Clean up - Removed SessionTracker.poll(), reapStaleEntries(), validateAllSessions(), pruneDeadSessions(), pruneOldSessions(), listSessions(), activeCount(), startPolling(), stopPolling() - Removed 5-second polling interval and all background state reconciliation - Simplified StateManager usage to only persist launch metadata, locks, and fuses - Added lightweight 30s PID liveness check for lock cleanup (startLaunchCleanup) — much cheaper than full adapter fan-out - MetricsRegistry decoupled from SessionTracker; active session count updated on session.list calls - session.prune kept for backward compat but now just runs PID liveness cleanup ## Key design decisions - Adapters own session truth. Daemon owns what it launched. - session.list = fan out adapter.discover() → merge → return - No daemon-side session registry for listing - Handle adapter failures gracefully (partial results, not errors) Fixes #51 Co-Authored-By: Charlie Hulcher --- docs/decisions/004-stateless-daemon-core.md | 171 +++++++ src/daemon/metrics.test.ts | 11 +- src/daemon/metrics.ts | 16 +- src/daemon/server.ts | 227 +++++++-- src/daemon/session-tracker.test.ts | 510 ++++++++++---------- src/daemon/session-tracker.ts | 478 ++++++------------ 6 files changed, 805 insertions(+), 608 deletions(-) create mode 100644 docs/decisions/004-stateless-daemon-core.md diff --git a/docs/decisions/004-stateless-daemon-core.md b/docs/decisions/004-stateless-daemon-core.md new file mode 100644 index 0000000..63f3075 --- /dev/null +++ b/docs/decisions/004-stateless-daemon-core.md @@ -0,0 +1,171 @@ +# ADR 004: Stateless Daemon Core + +**Status:** Proposed +**Date:** 2026-02-24 +**Author:** Doink (OpenClaw) +**Related:** [#51](https://github.com/orgloop/agentctl/issues/51) + +## Problem + +The daemon maintains a `state.json` session registry that mirrors every session discovered by every adapter. Over 26 hours, this accumulated 394 "active" sessions because: + +1. **OpenClaw sessions have no PID** — the reaper relies on PID liveness, so remote sessions are never cleaned up +2. **The daemon is a session database** — `session.list` reads from `StateManager`, not from adapters +3. **Pruning is a band-aid** — `pruneDeadSessions()`, `pruneOldSessions()`, `validateAllSessions()`, and `reapStaleEntries()` are all compensating for the fundamental error of duplicating adapter state in the daemon + +The daemon acts as a **session database** when it should be a **stateless multiplexer**. + +## Current Architecture + +``` +CLI → daemon → StateManager (state.json) + ↑ + SessionTracker.poll() merges adapter.discover() into state.json +``` + +- `session.list` → reads `StateManager.getSessions()` (daemon cache) +- `session.status` → reads `SessionTracker.getSession()` (daemon cache) +- `session.peek` → delegates to adapter (correct!) +- `session.stop` → delegates to adapter (correct!) +- `session.launch` → delegates to adapter, then `track()` into state (partially correct) + +The `SessionTracker` polls every 5s, calling `discover()` on all adapters, merging results into `state.json`. Sessions enter state but exit only via PID death detection or periodic pruning. Remote sessions (OpenClaw) have no exit path. + +## Adapter Audit + +### Interface Contract (`AgentAdapter`) + +| Method | Purpose | Ground truth? | +|--------|---------|--------------| +| `discover()` | Find all sessions from adapter runtime | ✅ Yes — this IS the source of truth | +| `isAlive(id)` | Check if a specific session is alive | ✅ Yes | +| `list(opts)` | List sessions with filtering | Delegates to discover internally | +| `status(id)` | Get session details | Queries backend directly | +| `peek(id)` | Read session output | Queries backend directly | +| `launch(opts)` | Start a session | Creates in backend | +| `stop(id)` | Stop a session | Stops in backend | +| `resume(id)` | Resume a session | Resumes in backend | +| `events()` | Lifecycle event stream | Polls backend | + +The adapter interface is **already correct** — `discover()` returns ground truth from each backend. The problem is that the daemon doesn't use it that way. It copies discover results into its own state and serves from the copy. + +### Per-Adapter Maturity + +| Adapter | discover() source | Has PID? | Maturity | Notes | +|---------|------------------|----------|----------|-------| +| **claude-code** | `~/.claude` JSONL files + `ps` | ✅ Yes | High | Reads session index, cross-refs with running PIDs. Full lifecycle. | +| **openclaw** | Gateway RPC `sessions.list` | ❌ No | High | Gateway is source of truth. No local PID. **Primary victim of state accumulation.** | +| **pi** | `~/.pi` JSONL files + `ps` | ✅ Yes | High | Same pattern as claude-code, adapted for Pi sessions. | +| **pi-rust** | `~/.pi/agent/sessions` + `ps` | ✅ Yes | High | Same pattern, different session dir. | +| **opencode** | `~/.local/share/opencode/storage` + `ps` | ✅ Yes | High | Reads OpenCode JSON session files + message files. | +| **codex** | `~/.codex` JSONL files + `ps` | ✅ Yes | High | Same discover pattern as claude-code. | + +All adapters implement the full interface. All are mature. The gap isn't adapter quality — it's the daemon's misuse of adapter output. + +## Proposed Architecture: Stateless Core + +``` +CLI → daemon → fan-out to adapters → merge → return + ↓ + StateManager (minimal: launched PIDs, locks, fuses only) +``` + +### Principle: Adapters own session truth. Daemon owns what it launched. + +### `session.list` + +**Current:** Read from `StateManager` (stale cache). +**Proposed:** Fan out `adapter.discover()` to all adapters in parallel, merge, return. No daemon-side session registry needed for listing. + +```ts +case "session.list": { + const results = await Promise.allSettled( + Object.entries(ctx.adapters).map(([name, adapter]) => + adapter.discover().then(sessions => sessions.map(s => ({ ...s, adapter: name }))) + ) + ); + // Merge fulfilled results, skip failed adapters + let sessions = results + .filter(r => r.status === 'fulfilled') + .flatMap(r => r.value); + // Apply filters, enrich with daemon-only metadata (launch prompts, groups) + return sessions; +} +``` + +### `session.status` + +**Current:** Read from `SessionTracker.getSession()` (daemon cache). +**Proposed:** Route to the correct adapter's `status(id)`. Use daemon metadata to enrich (launch prompt, group tag). + +If the session ID prefix matches a known adapter (from launch metadata), route directly. Otherwise, fan out `isAlive(id)` to find the right adapter. + +### What the Daemon Still Needs to Track + +| State | Why | Scope | +|-------|-----|-------| +| **Launch metadata** | PIDs the daemon spawned, prompts, group tags, specs | Only sessions launched via `agentctl launch` | +| **Locks** | Local worktree locks | Adapter-scoped (only local adapters need locks) | +| **Fuses** | Timeout timers for sessions | Tied to directories, not sessions | + +This is ~10-20 records at any time, not 394. + +### What the Daemon Stops Tracking + +- **All discovered sessions** — adapters own this +- **Session status** — adapters own this +- **Historical sessions** — query adapters with `list({ all: true })` + +### SessionTracker Changes + +The `SessionTracker` class gets dramatically simplified: + +- **Remove:** `poll()`, `reapStaleEntries()`, `validateAllSessions()`, `pruneDeadSessions()`, `pruneOldSessions()`, `listSessions()` +- **Keep:** `track()` (for launch metadata only), `getSession()` (for daemon-launched enrichment), `onSessionExit()` (for lock/fuse cleanup) +- **Add:** `enrichWithLaunchMeta(discoveredSessions)` — merges daemon launch metadata (prompt, group, spec) into adapter-discovered sessions + +The 5-second polling loop goes away entirely. No more background reconciliation. + +## Locks + +### Current Model +Locks are directory-scoped in `StateManager`. `autoLock(cwd, sessionId)` on launch, `autoUnlock(sessionId)` on stop. + +### Proposed Model +Locks stay as-is but are **only meaningful for local adapters** (claude-code, pi, pi-rust, opencode, codex). OpenClaw sessions don't use local worktrees, so they never acquire locks. + +The lock model is actually fine. The bug (#51) where locks accumulated was because `reapStaleEntries()` didn't call `autoUnlock()` — that's a simple fix independent of this architecture change. + +**One improvement:** Lock cleanup should be tied to launch metadata, not session discovery. When a daemon-launched session's PID dies, release its lock. This is a simple PID liveness check on the small set of daemon-launched sessions, not a reconciliation of all discovered sessions. + +## Migration Path + +### Phase 1: Fix the immediate bug (quick) +- In `poll()`, track which adapter IDs were returned by `discover()`. Sessions in state whose adapter succeeded but whose ID wasn't returned → mark stopped + autoUnlock. +- This stops the accumulation without architectural change. + +### Phase 2: Make `session.list` adapter-first (medium) +- Change `session.list` handler to fan out `discover()` to adapters +- Enrich with daemon launch metadata (prompt, group, spec) +- Keep `StateManager.sessions` as a **write-through cache** for launch metadata only +- Remove the 5s polling loop + +### Phase 3: Clean up (small) +- Remove `SessionTracker.poll()`, `reapStaleEntries()`, all pruning code +- Simplify `StateManager` to only persist launch metadata, locks, fuses +- Remove `state.json` session entries (or reduce to launch-metadata-only) + +### Phase 4: Separate launch metadata from session state +- New `launches.json` (small, ~10 entries) replaces sessions in `state.json` +- `state.json` becomes locks + fuses only +- Clean separation of concerns + +## Risks + +1. **Fan-out latency**: 6 adapters × discover() could be slower than reading cache. Mitigation: parallel execution, timeouts per adapter, skip failed adapters gracefully. +2. **Adapter failures**: If OpenClaw gateway is down, `session.list` loses those sessions. Mitigation: return partial results with a warning, not an error. +3. **Launch metadata loss**: If we stop persisting all sessions, we lose the prompt/spec/group for sessions not launched via agentctl. This is acceptable — those sessions have their own metadata in their backends. + +## Decision + +Implement phases 1-3. Phase 4 is optional cleanup. diff --git a/src/daemon/metrics.test.ts b/src/daemon/metrics.test.ts index 95488d6..04be3d2 100644 --- a/src/daemon/metrics.test.ts +++ b/src/daemon/metrics.test.ts @@ -5,7 +5,6 @@ import { afterEach, beforeEach, describe, expect, it } from "vitest"; import { FuseEngine } from "./fuse-engine.js"; import { LockManager } from "./lock-manager.js"; import { MetricsRegistry } from "./metrics.js"; -import { SessionTracker } from "./session-tracker.js"; import { StateManager } from "./state.js"; let tmpDir: string; @@ -13,15 +12,13 @@ let state: StateManager; let metrics: MetricsRegistry; let lockManager: LockManager; let fuseEngine: FuseEngine; -let sessionTracker: SessionTracker; beforeEach(async () => { tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "agentctl-metrics-test-")); state = await StateManager.load(tmpDir); lockManager = new LockManager(state); fuseEngine = new FuseEngine(state, { defaultDurationMs: 600000 }); - sessionTracker = new SessionTracker(state, { adapters: {} }); - metrics = new MetricsRegistry(sessionTracker, lockManager, fuseEngine); + metrics = new MetricsRegistry(lockManager, fuseEngine); }); afterEach(async () => { @@ -115,5 +112,11 @@ describe("MetricsRegistry", () => { expect(output).toContain('agentctl_locks_active{type="auto"} 2'); expect(output).toContain('agentctl_locks_active{type="manual"} 1'); }); + + it("reflects active session count set by session.list", () => { + metrics.setActiveSessionCount(5); + const output = metrics.generateMetrics(); + expect(output).toContain("agentctl_sessions_active 5"); + }); }); }); diff --git a/src/daemon/metrics.ts b/src/daemon/metrics.ts index 7e6b4a4..6f8d687 100644 --- a/src/daemon/metrics.ts +++ b/src/daemon/metrics.ts @@ -1,6 +1,5 @@ import type { FuseEngine } from "./fuse-engine.js"; import type { LockManager } from "./lock-manager.js"; -import type { SessionTracker } from "./session-tracker.js"; export class MetricsRegistry { sessionsTotalCompleted = 0; @@ -9,12 +8,23 @@ export class MetricsRegistry { fusesExpiredTotal = 0; sessionDurations: number[] = []; // seconds + /** Last-known active session count, updated by session.list fan-out */ + private _activeSessionCount = 0; + constructor( - private sessionTracker: SessionTracker, private lockManager: LockManager, private fuseEngine: FuseEngine, ) {} + /** Update the active session gauge (called after session.list fan-out) */ + setActiveSessionCount(count: number): void { + this._activeSessionCount = count; + } + + get activeSessionCount(): number { + return this._activeSessionCount; + } + recordSessionCompleted(durationSeconds?: number): void { this.sessionsTotalCompleted++; if (durationSeconds != null) this.sessionDurations.push(durationSeconds); @@ -53,7 +63,7 @@ export class MetricsRegistry { g( "agentctl_sessions_active", "Number of active sessions", - this.sessionTracker.activeCount(), + this._activeSessionCount, ); const locks = this.lockManager.listAll(); diff --git a/src/daemon/server.ts b/src/daemon/server.ts index 4634026..04abf3c 100644 --- a/src/daemon/server.ts +++ b/src/daemon/server.ts @@ -108,21 +108,30 @@ export async function startDaemon(opts: DaemonStartOpts = {}): Promise<{ emitter, }); const sessionTracker = new SessionTracker(state, { adapters }); - const metrics = new MetricsRegistry(sessionTracker, lockManager, fuseEngine); + const metrics = new MetricsRegistry(lockManager, fuseEngine); // Wire up events emitter.on("fuse.expired", () => { metrics.recordFuseExpired(); }); - // 9. Validate all sessions on startup — mark dead ones as stopped (#40) - sessionTracker.validateAllSessions(); + // 9. Initial PID liveness cleanup for daemon-launched sessions + // (replaces the old validateAllSessions — much simpler, only checks launches) + const initialDead = sessionTracker.cleanupDeadLaunches(); + if (initialDead.length > 0) { + for (const id of initialDead) lockManager.autoUnlock(id); + console.error( + `Startup cleanup: marked ${initialDead.length} dead launches as stopped`, + ); + } // 10. Resume fuse timers fuseEngine.resumeTimers(); - // 11. Start session polling - sessionTracker.startPolling(); + // 11. Start periodic PID liveness check for lock cleanup (30s interval) + sessionTracker.startLaunchCleanup((deadId) => { + lockManager.autoUnlock(deadId); + }); // 12. Create request handler const handleRequest = createRequestHandler({ @@ -199,7 +208,7 @@ export async function startDaemon(opts: DaemonStartOpts = {}): Promise<{ // Shutdown function const shutdown = async () => { - sessionTracker.stopPolling(); + sessionTracker.stopLaunchCleanup(); fuseEngine.shutdown(); state.flush(); await state.persist(); @@ -324,20 +333,146 @@ function createRequestHandler(ctx: HandlerContext) { switch (req.method) { case "session.list": { - let sessions = ctx.sessionTracker.listSessions({ - status: params.status as string | undefined, - all: params.all as boolean | undefined, - }); - if (params.group) { - sessions = sessions.filter((s) => s.group === params.group); + const adapterFilter = params.adapter as string | undefined; + const statusFilter = params.status as string | undefined; + const showAll = params.all as boolean | undefined; + const groupFilter = params.group as string | undefined; + + // Fan out discover() to adapters (or just one if filtered) + const adapterEntries = adapterFilter + ? Object.entries(ctx.adapters).filter( + ([name]) => name === adapterFilter, + ) + : Object.entries(ctx.adapters); + + const ADAPTER_TIMEOUT_MS = 5000; + const succeededAdapters = new Set(); + + const results = await Promise.allSettled( + adapterEntries.map(([name, adapter]) => + Promise.race([ + adapter.discover().then((sessions) => { + succeededAdapters.add(name); + return sessions.map((s) => ({ ...s, adapter: name })); + }), + new Promise((_, reject) => + setTimeout( + () => reject(new Error(`Adapter ${name} timed out`)), + ADAPTER_TIMEOUT_MS, + ), + ), + ]), + ), + ); + + // Merge fulfilled results, skip failed adapters + const discovered: import("../core/types.js").DiscoveredSession[] = + results + .filter( + ( + r, + ): r is PromiseFulfilledResult< + import("../core/types.js").DiscoveredSession[] + > => r.status === "fulfilled", + ) + .flatMap((r) => r.value); + + // Reconcile with launch metadata and enrich + const { sessions: allSessions, stoppedLaunchIds } = + ctx.sessionTracker.reconcileAndEnrich(discovered, succeededAdapters); + + // Release locks for sessions that disappeared from adapter results + for (const id of stoppedLaunchIds) { + ctx.lockManager.autoUnlock(id); + } + + // Apply filters + let sessions = allSessions; + if (statusFilter) { + sessions = sessions.filter((s) => s.status === statusFilter); + } else if (!showAll) { + sessions = sessions.filter( + (s) => s.status === "running" || s.status === "idle", + ); + } + + if (groupFilter) { + sessions = sessions.filter((s) => s.group === groupFilter); } + + // Sort: running first, then by most recent + sessions.sort((a, b) => { + if (a.status === "running" && b.status !== "running") return -1; + if (b.status === "running" && a.status !== "running") return 1; + return ( + new Date(b.startedAt).getTime() - new Date(a.startedAt).getTime() + ); + }); + + // Update metrics gauge + ctx.metrics.setActiveSessionCount( + allSessions.filter( + (s) => s.status === "running" || s.status === "idle", + ).length, + ); + return sessions; } case "session.status": { - const session = ctx.sessionTracker.getSession(params.id as string); - if (!session) throw new Error(`Session not found: ${params.id}`); - return session; + const id = params.id as string; + + // Check launch metadata to determine adapter + const launchRecord = ctx.sessionTracker.getSession(id); + const adapterName = (params.adapter as string) || launchRecord?.adapter; + + // Determine which adapters to search + const adaptersToSearch = adapterName + ? Object.entries(ctx.adapters).filter( + ([name]) => name === adapterName, + ) + : Object.entries(ctx.adapters); + + // Search adapters for the session + for (const [name, adapter] of adaptersToSearch) { + try { + const discovered = await adapter.discover(); + let match = discovered.find((d) => d.id === id); + // Prefix match + if (!match) { + const prefixMatches = discovered.filter((d) => + d.id.startsWith(id), + ); + if (prefixMatches.length === 1) match = prefixMatches[0]; + } + if (match) { + const meta = ctx.sessionTracker.getSession(match.id); + return { + id: match.id, + adapter: name, + status: match.status, + startedAt: + match.startedAt?.toISOString() ?? new Date().toISOString(), + stoppedAt: match.stoppedAt?.toISOString(), + cwd: match.cwd ?? meta?.cwd, + model: match.model ?? meta?.model, + prompt: match.prompt ?? meta?.prompt, + tokens: match.tokens, + cost: match.cost, + pid: match.pid, + spec: meta?.spec, + group: meta?.group, + meta: match.nativeMetadata ?? meta?.meta ?? {}, + }; + } + } catch { + // Adapter failed — try next + } + } + + // Fall back to launch metadata if adapters didn't find it + if (launchRecord) return launchRecord; + throw new Error(`Session not found: ${id}`); } case "session.peek": { @@ -408,32 +543,40 @@ function createRequestHandler(ctx: HandlerContext) { } case "session.stop": { - const session = ctx.sessionTracker.getSession(params.id as string); - if (!session) throw new Error(`Session not found: ${params.id}`); + const id = params.id as string; + const launchRecord = ctx.sessionTracker.getSession(id); // Ghost pending entry with dead PID: remove from state with --force if ( - session.id.startsWith("pending-") && + launchRecord?.id.startsWith("pending-") && params.force && - session.pid && - !isProcessAlive(session.pid) + launchRecord.pid && + !isProcessAlive(launchRecord.pid) ) { - ctx.lockManager.autoUnlock(session.id); - ctx.sessionTracker.removeSession(session.id); + ctx.lockManager.autoUnlock(launchRecord.id); + ctx.sessionTracker.removeSession(launchRecord.id); return null; } - const adapter = ctx.adapters[session.adapter]; - if (!adapter) throw new Error(`Unknown adapter: ${session.adapter}`); - await adapter.stop(session.id, { + const adapterName = (params.adapter as string) || launchRecord?.adapter; + if (!adapterName) + throw new Error( + `Session not found: ${id}. Specify --adapter to stop a non-daemon session.`, + ); + + const adapter = ctx.adapters[adapterName]; + if (!adapter) throw new Error(`Unknown adapter: ${adapterName}`); + + const sessionId = launchRecord?.id || id; + await adapter.stop(sessionId, { force: params.force as boolean | undefined, }); // Remove auto-lock - ctx.lockManager.autoUnlock(session.id); + ctx.lockManager.autoUnlock(sessionId); - // Mark stopped - const stopped = ctx.sessionTracker.onSessionExit(session.id); + // Mark stopped in launch metadata + const stopped = ctx.sessionTracker.onSessionExit(sessionId); if (stopped) { ctx.metrics.recordSessionStopped(); } @@ -442,18 +585,28 @@ function createRequestHandler(ctx: HandlerContext) { } case "session.resume": { - const session = ctx.sessionTracker.getSession(params.id as string); - if (!session) throw new Error(`Session not found: ${params.id}`); - const adapter = ctx.adapters[session.adapter]; - if (!adapter) throw new Error(`Unknown adapter: ${session.adapter}`); - await adapter.resume(session.id, params.message as string); + const id = params.id as string; + const launchRecord = ctx.sessionTracker.getSession(id); + const adapterName = (params.adapter as string) || launchRecord?.adapter; + if (!adapterName) + throw new Error( + `Session not found: ${id}. Specify --adapter to resume a non-daemon session.`, + ); + const adapter = ctx.adapters[adapterName]; + if (!adapter) throw new Error(`Unknown adapter: ${adapterName}`); + await adapter.resume(launchRecord?.id || id, params.message as string); return null; } - // --- Prune command (#40) --- + // --- Prune command (#40) --- kept for CLI backward compat case "session.prune": { - const pruned = ctx.sessionTracker.pruneDeadSessions(); - return { pruned }; + // In the stateless model, there's no session registry to prune. + // Clean up dead launches (PID liveness check) as a best-effort action. + const deadIds = ctx.sessionTracker.cleanupDeadLaunches(); + for (const id of deadIds) { + ctx.lockManager.autoUnlock(id); + } + return { pruned: deadIds.length }; } case "lock.list": @@ -503,7 +656,7 @@ function createRequestHandler(ctx: HandlerContext) { return { pid: process.pid, uptime: Date.now() - startTime, - sessions: ctx.sessionTracker.activeCount(), + sessions: ctx.metrics.activeSessionCount, locks: ctx.lockManager.listAll().length, fuses: ctx.fuseEngine.listActive().length, } satisfies DaemonStatus; diff --git a/src/daemon/session-tracker.test.ts b/src/daemon/session-tracker.test.ts index e768be5..d70ad59 100644 --- a/src/daemon/session-tracker.test.ts +++ b/src/daemon/session-tracker.test.ts @@ -1,12 +1,8 @@ import fs from "node:fs/promises"; import os from "node:os"; import path from "node:path"; -import { afterEach, beforeEach, describe, expect, it } from "vitest"; -import type { - AgentAdapter, - AgentSession, - DiscoveredSession, -} from "../core/types.js"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import type { AgentSession, DiscoveredSession } from "../core/types.js"; import { SessionTracker } from "./session-tracker.js"; import { StateManager } from "./state.js"; @@ -21,7 +17,7 @@ beforeEach(async () => { }); afterEach(async () => { - tracker.stopPolling(); + tracker.stopLaunchCleanup(); state.flush(); await fs.rm(tmpDir, { recursive: true, force: true }); }); @@ -38,9 +34,22 @@ function makeSession(overrides: Partial = {}): AgentSession { }; } +function makeDiscovered( + overrides: Partial = {}, +): DiscoveredSession { + return { + id: "disc-1", + status: "running", + adapter: "claude-code", + startedAt: new Date(), + cwd: "/tmp/test", + ...overrides, + }; +} + describe("SessionTracker", () => { describe("track", () => { - it("tracks a new session", () => { + it("tracks a new session as launch metadata", () => { const session = makeSession(); const record = tracker.track(session, "claude-code"); @@ -55,6 +64,26 @@ describe("SessionTracker", () => { expect(state.getSession("test-session-1")).toBeDefined(); }); + + it("removes pending-PID entry when real session registers with same PID", () => { + tracker.track( + makeSession({ id: "pending-11111", status: "running", pid: 11111 }), + "claude-code", + ); + expect(state.getSession("pending-11111")).toBeDefined(); + + tracker.track( + makeSession({ + id: "real-uuid-session", + status: "running", + pid: 11111, + }), + "claude-code", + ); + + expect(state.getSession("pending-11111")).toBeUndefined(); + expect(state.getSession("real-uuid-session")).toBeDefined(); + }); }); describe("getSession", () => { @@ -77,343 +106,330 @@ describe("SessionTracker", () => { }); }); - describe("listSessions", () => { - it("lists all active sessions by default", () => { + describe("onSessionExit", () => { + it("marks session as stopped", () => { tracker.track( makeSession({ id: "s1", status: "running" }), "claude-code", ); - tracker.track(makeSession({ id: "s2", status: "idle" }), "claude-code"); - tracker.track( - makeSession({ id: "s3", status: "stopped" }), - "claude-code", - ); - const list = tracker.listSessions(); - expect(list).toHaveLength(2); - expect(list.map((s) => s.id)).toContain("s1"); - expect(list.map((s) => s.id)).toContain("s2"); + const stopped = tracker.onSessionExit("s1"); + expect(stopped).toBeDefined(); + expect(stopped?.status).toBe("stopped"); + expect(stopped?.stoppedAt).toBeDefined(); }); - it("includes all sessions with all flag", () => { - tracker.track( - makeSession({ id: "s1", status: "running" }), - "claude-code", - ); - tracker.track( - makeSession({ id: "s2", status: "stopped" }), - "claude-code", - ); - - const list = tracker.listSessions({ all: true }); - expect(list).toHaveLength(2); + it("returns undefined for unknown session", () => { + expect(tracker.onSessionExit("unknown")).toBeUndefined(); }); + }); - it("filters by status", () => { + describe("removeSession", () => { + it("removes a session from state", () => { tracker.track( - makeSession({ id: "s1", status: "running" }), - "claude-code", - ); - tracker.track( - makeSession({ id: "s2", status: "stopped" }), + makeSession({ id: "pending-55555", status: "running", pid: 55555 }), "claude-code", ); + expect(state.getSession("pending-55555")).toBeDefined(); - const list = tracker.listSessions({ status: "stopped" }); - expect(list).toHaveLength(1); - expect(list[0].id).toBe("s2"); + tracker.removeSession("pending-55555"); + expect(state.getSession("pending-55555")).toBeUndefined(); }); + }); - it("sorts running sessions first", () => { + describe("reconcileAndEnrich", () => { + it("enriches discovered sessions with launch metadata", () => { + // Pre-seed launch metadata with extra info tracker.track( makeSession({ id: "s1", - status: "idle", - startedAt: new Date("2025-01-02"), - }), - "claude-code", - ); - tracker.track( - makeSession({ - id: "s2", status: "running", - startedAt: new Date("2025-01-01"), + prompt: "Fix the bug", + group: "g-abc", + spec: "/tmp/spec.md", }), "claude-code", ); - const list = tracker.listSessions(); - expect(list[0].id).toBe("s2"); // running comes first - }); - }); + const discovered = [ + makeDiscovered({ + id: "s1", + status: "running", + adapter: "claude-code", + pid: 1234, + }), + ]; - describe("activeCount", () => { - it("counts running and idle sessions", () => { - tracker.track( - makeSession({ id: "s1", status: "running" }), - "claude-code", - ); - tracker.track(makeSession({ id: "s2", status: "idle" }), "claude-code"); - tracker.track( - makeSession({ id: "s3", status: "stopped" }), - "claude-code", + const { sessions } = tracker.reconcileAndEnrich( + discovered, + new Set(["claude-code"]), ); - expect(tracker.activeCount()).toBe(2); + expect(sessions).toHaveLength(1); + expect(sessions[0].id).toBe("s1"); + expect(sessions[0].prompt).toBe("Fix the bug"); + expect(sessions[0].group).toBe("g-abc"); + expect(sessions[0].spec).toBe("/tmp/spec.md"); + expect(sessions[0].pid).toBe(1234); }); - }); - describe("onSessionExit", () => { - it("marks session as stopped", () => { - tracker.track( - makeSession({ id: "s1", status: "running" }), - "claude-code", + it("returns discovered sessions without launch metadata", () => { + // No launch metadata for this session + const discovered = [ + makeDiscovered({ + id: "unknown-session", + status: "running", + adapter: "claude-code", + model: "claude-4", + }), + ]; + + const { sessions } = tracker.reconcileAndEnrich( + discovered, + new Set(["claude-code"]), ); - const stopped = tracker.onSessionExit("s1"); - expect(stopped).toBeDefined(); - expect(stopped?.status).toBe("stopped"); - expect(stopped?.stoppedAt).toBeDefined(); + expect(sessions).toHaveLength(1); + expect(sessions[0].id).toBe("unknown-session"); + expect(sessions[0].model).toBe("claude-4"); + expect(sessions[0].prompt).toBeUndefined(); }); - it("returns undefined for unknown session", () => { - expect(tracker.onSessionExit("unknown")).toBeUndefined(); - }); - }); - - describe("ghost session reaping (issue #22)", () => { - /** Create a mock adapter that returns the given sessions from discover() and list() */ - function mockAdapter(sessions: AgentSession[]): AgentAdapter { - const discovered: DiscoveredSession[] = sessions.map((s) => ({ - id: s.id, - status: s.status === "running" ? "running" : "stopped", - adapter: s.adapter, - cwd: s.cwd, - model: s.model, - startedAt: s.startedAt, - stoppedAt: s.stoppedAt, - pid: s.pid, - prompt: s.prompt, - tokens: s.tokens, - cost: s.cost, - })); - return { - id: "mock", - discover: async () => discovered, - isAlive: async (id) => - discovered.some((d) => d.id === id && d.status === "running"), - list: async () => sessions, - peek: async () => "", - status: async () => sessions[0], - launch: async () => sessions[0], - stop: async () => {}, - resume: async () => {}, - async *events() {}, - }; - } - - it("marks dead PID sessions as stopped during poll", async () => { - // Pre-seed state with a "running" session whose PID is dead + it("marks disappeared sessions as stopped when adapter succeeded", () => { + // Session was launched 60+ seconds ago tracker.track( - makeSession({ id: "pending-12345", status: "running", pid: 12345 }), + makeSession({ + id: "old-session", + status: "running", + startedAt: new Date(Date.now() - 120_000), // 2 minutes ago + }), "claude-code", ); - // Create tracker with dead-PID checker and an adapter that returns nothing - const reapTracker = new SessionTracker(state, { - adapters: { "claude-code": mockAdapter([]) }, - isProcessAlive: () => false, - }); - - // Trigger a poll cycle - reapTracker.startPolling(); - // Wait for poll to complete - await new Promise((r) => setTimeout(r, 100)); - reapTracker.stopPolling(); + // Adapter returns empty — session is gone + const { sessions, stoppedLaunchIds } = tracker.reconcileAndEnrich( + [], + new Set(["claude-code"]), + ); - const session = state.getSession("pending-12345"); - expect(session?.status).toBe("stopped"); - expect(session?.stoppedAt).toBeDefined(); + expect(stoppedLaunchIds).toContain("old-session"); + expect(state.getSession("old-session")?.status).toBe("stopped"); + // Stopped session should NOT appear in results + expect(sessions.map((s) => s.id)).not.toContain("old-session"); }); - it("removes pending-* entry when resolved session exists with same PID", async () => { - // Pre-seed state with a pending entry + it("preserves recently-launched sessions within grace period", () => { + // Session was launched just now tracker.track( - makeSession({ id: "pending-99999", status: "running", pid: 99999 }), + makeSession({ + id: "new-session", + status: "running", + startedAt: new Date(), // just now + }), "claude-code", ); - // Adapter returns a resolved session with the same PID - const resolvedSession = makeSession({ - id: "abc123-real-session-id", - status: "running", - pid: 99999, - }); - - const reapTracker = new SessionTracker(state, { - adapters: { "claude-code": mockAdapter([resolvedSession]) }, - isProcessAlive: (pid) => pid === 99999, - }); - - reapTracker.startPolling(); - await new Promise((r) => setTimeout(r, 100)); - reapTracker.stopPolling(); - - // pending-* entry should be removed - expect(state.getSession("pending-99999")).toBeUndefined(); - // Real session should exist - expect(state.getSession("abc123-real-session-id")).toBeDefined(); - expect(state.getSession("abc123-real-session-id")?.status).toBe( - "running", + // Adapter hasn't discovered it yet + const { sessions, stoppedLaunchIds } = tracker.reconcileAndEnrich( + [], + new Set(["claude-code"]), ); + + expect(stoppedLaunchIds).not.toContain("new-session"); + expect(sessions.map((s) => s.id)).toContain("new-session"); + expect(state.getSession("new-session")?.status).toBe("running"); }); - it("live PID sessions still show as running after poll", async () => { + it("does not reconcile sessions whose adapter failed", () => { tracker.track( - makeSession({ id: "live-session", status: "running", pid: 55555 }), - "claude-code", + makeSession({ + id: "oc-session", + status: "running", + startedAt: new Date(Date.now() - 120_000), + }), + "openclaw", ); - // Adapter returns this session as running - const liveSession = makeSession({ - id: "live-session", - status: "running", - pid: 55555, - }); - - const reapTracker = new SessionTracker(state, { - adapters: { "claude-code": mockAdapter([liveSession]) }, - isProcessAlive: (pid) => pid === 55555, - }); - - reapTracker.startPolling(); - await new Promise((r) => setTimeout(r, 100)); - reapTracker.stopPolling(); + // openclaw adapter failed (not in succeededAdapters) + const { sessions, stoppedLaunchIds } = tracker.reconcileAndEnrich( + [], + new Set(["claude-code"]), // only claude-code succeeded + ); - const session = state.getSession("live-session"); - expect(session?.status).toBe("running"); - expect(session?.pid).toBe(55555); + expect(stoppedLaunchIds).not.toContain("oc-session"); + // Session should still be included (from launch metadata) + expect(sessions.map((s) => s.id)).toContain("oc-session"); + expect(state.getSession("oc-session")?.status).toBe("running"); }); - it("listSessions deduplicates pending-* vs resolved entries by PID", () => { - // Both entries exist in state - tracker.track( - makeSession({ id: "pending-77777", status: "running", pid: 77777 }), - "claude-code", - ); + it("handles pending→UUID resolution via PID match", () => { + // pending entry was launched 2 min ago tracker.track( makeSession({ - id: "real-session-uuid", + id: "pending-99999", status: "running", - pid: 77777, + pid: 99999, + startedAt: new Date(Date.now() - 120_000), }), "claude-code", ); - const list = tracker.listSessions({ all: true }); - // Only the resolved session should appear - const ids = list.map((s) => s.id); - expect(ids).toContain("real-session-uuid"); - expect(ids).not.toContain("pending-77777"); - }); - - it("keeps pending-* entry if no resolved session shares its PID", () => { - // Only a pending entry, no resolved session with same PID - tracker.track( - makeSession({ id: "pending-44444", status: "running", pid: 44444 }), - "claude-code", - ); - tracker.track( - makeSession({ - id: "different-session", + // Adapter returns a resolved session with the same PID but different ID + const discovered = [ + makeDiscovered({ + id: "real-uuid", status: "running", - pid: 88888, + adapter: "claude-code", + pid: 99999, }), - "claude-code", + ]; + + const { sessions, stoppedLaunchIds } = tracker.reconcileAndEnrich( + discovered, + new Set(["claude-code"]), ); - const list = tracker.listSessions({ all: true }); - const ids = list.map((s) => s.id); - expect(ids).toContain("pending-44444"); - expect(ids).toContain("different-session"); + // pending entry should be cleaned up + expect(stoppedLaunchIds).toContain("pending-99999"); + expect(state.getSession("pending-99999")).toBeUndefined(); + // Real session should be in results + expect(sessions.map((s) => s.id)).toContain("real-uuid"); }); - }); - describe("ghost pending sessions (issue #27)", () => { - it("track() removes pending-PID entry when real UUID session registers with same PID", () => { - // Step 1: pending entry is created at launch time - tracker.track( - makeSession({ id: "pending-11111", status: "running", pid: 11111 }), - "claude-code", + it("merges results from multiple adapters", () => { + const discovered = [ + makeDiscovered({ + id: "cc-1", + status: "running", + adapter: "claude-code", + }), + makeDiscovered({ + id: "oc-1", + status: "running", + adapter: "openclaw", + }), + makeDiscovered({ + id: "pi-1", + status: "stopped", + adapter: "pi", + }), + ]; + + const { sessions } = tracker.reconcileAndEnrich( + discovered, + new Set(["claude-code", "openclaw", "pi"]), ); - expect(state.getSession("pending-11111")).toBeDefined(); - // Step 2: real session registers with the same PID + expect(sessions).toHaveLength(3); + const ids = sessions.map((s) => s.id); + expect(ids).toContain("cc-1"); + expect(ids).toContain("oc-1"); + expect(ids).toContain("pi-1"); + }); + + it("does not mark stopped sessions as stopped again", () => { + // Already stopped in launch metadata tracker.track( makeSession({ - id: "real-uuid-session", - status: "running", - pid: 11111, + id: "already-stopped", + status: "stopped", + startedAt: new Date(Date.now() - 120_000), }), "claude-code", ); - // pending entry should be consumed - expect(state.getSession("pending-11111")).toBeUndefined(); - // real session should exist - expect(state.getSession("real-uuid-session")).toBeDefined(); - expect(state.getSession("real-uuid-session")?.status).toBe("running"); + const { stoppedLaunchIds } = tracker.reconcileAndEnrich( + [], + new Set(["claude-code"]), + ); + + // Should not try to stop it again + expect(stoppedLaunchIds).not.toContain("already-stopped"); }); + }); - it("listSessions marks running sessions with dead PIDs as stopped", () => { - // Create tracker with dead-PID checker - const deadPidTracker = new SessionTracker(state, { + describe("cleanupDeadLaunches", () => { + it("marks sessions with dead PIDs as stopped", () => { + const deadTracker = new SessionTracker(state, { adapters: {}, isProcessAlive: () => false, }); - // Track a running session with a PID that will be "dead" - deadPidTracker.track( - makeSession({ id: "ghost-session", status: "running", pid: 22222 }), + deadTracker.track( + makeSession({ id: "s1", status: "running", pid: 12345 }), "claude-code", ); - // Listing should detect the dead PID and mark it stopped - const list = deadPidTracker.listSessions({ all: true }); - const ghost = list.find((s) => s.id === "ghost-session"); - expect(ghost?.status).toBe("stopped"); - expect(ghost?.stoppedAt).toBeDefined(); + const dead = deadTracker.cleanupDeadLaunches(); - // State should also be updated - const record = state.getSession("ghost-session"); - expect(record?.status).toBe("stopped"); + expect(dead).toContain("s1"); + expect(state.getSession("s1")?.status).toBe("stopped"); + expect(state.getSession("s1")?.stoppedAt).toBeDefined(); }); - it("listSessions does not mark sessions with live PIDs as stopped", () => { - const livePidTracker = new SessionTracker(state, { + it("does not mark sessions with live PIDs as stopped", () => { + const liveTracker = new SessionTracker(state, { adapters: {}, isProcessAlive: () => true, }); - livePidTracker.track( - makeSession({ id: "live-session", status: "running", pid: 33333 }), + liveTracker.track( + makeSession({ id: "s1", status: "running", pid: 12345 }), "claude-code", ); - const list = livePidTracker.listSessions(); - const live = list.find((s) => s.id === "live-session"); - expect(live?.status).toBe("running"); + const dead = liveTracker.cleanupDeadLaunches(); + + expect(dead).toHaveLength(0); + expect(state.getSession("s1")?.status).toBe("running"); }); - it("removeSession removes a session from state", () => { + it("skips already-stopped sessions", () => { tracker.track( - makeSession({ id: "pending-55555", status: "running", pid: 55555 }), + makeSession({ id: "s1", status: "stopped", pid: 12345 }), "claude-code", ); - expect(state.getSession("pending-55555")).toBeDefined(); - tracker.removeSession("pending-55555"); - expect(state.getSession("pending-55555")).toBeUndefined(); + const dead = tracker.cleanupDeadLaunches(); + expect(dead).toHaveLength(0); + }); + + it("handles sessions without PIDs (no change)", () => { + tracker.track( + makeSession({ id: "s1", status: "running" }), + "claude-code", + ); + + const dead = tracker.cleanupDeadLaunches(); + expect(dead).toHaveLength(0); + expect(state.getSession("s1")?.status).toBe("running"); + }); + }); + + describe("startLaunchCleanup / stopLaunchCleanup", () => { + it("periodically checks PID liveness", async () => { + vi.useFakeTimers(); + + const onDead = vi.fn(); + const deadTracker = new SessionTracker(state, { + adapters: {}, + isProcessAlive: () => false, + }); + + deadTracker.track( + makeSession({ id: "s1", status: "running", pid: 12345 }), + "claude-code", + ); + + deadTracker.startLaunchCleanup(onDead); + + // Advance past the 30s interval + vi.advanceTimersByTime(30_000); + + expect(onDead).toHaveBeenCalledWith("s1"); + + deadTracker.stopLaunchCleanup(); + vi.useRealTimers(); }); }); }); diff --git a/src/daemon/session-tracker.ts b/src/daemon/session-tracker.ts index 70fe491..5642511 100644 --- a/src/daemon/session-tracker.ts +++ b/src/daemon/session-tracker.ts @@ -7,256 +7,63 @@ import type { SessionRecord, StateManager } from "./state.js"; export interface SessionTrackerOpts { adapters: Record; - pollIntervalMs?: number; /** Override PID liveness check for testing (default: process.kill(pid, 0)) */ isProcessAlive?: (pid: number) => boolean; } -/** Max age for stopped sessions in state before pruning (7 days) */ -const STOPPED_SESSION_PRUNE_AGE_MS = 7 * 24 * 60 * 60 * 1000; +/** + * Grace period for recently-launched sessions. + * If a session was launched less than this many ms ago and the adapter + * doesn't return it yet, don't mark it stopped — the adapter may not + * have discovered it yet. + */ +const LAUNCH_GRACE_PERIOD_MS = 30_000; +/** + * Simplified session tracker for the stateless daemon core (ADR 004). + * + * Adapters own session truth. The daemon only tracks: + * - Launch metadata (prompt, group, spec, cwd) for sessions launched via agentctl + * - Locks and fuses (handled by LockManager / FuseEngine) + * + * The old polling loop, pruning, and state-based session registry are removed. + * session.list now fans out adapter.discover() at call time. + */ export class SessionTracker { private state: StateManager; private adapters: Record; - private pollIntervalMs: number; - private pollHandle: ReturnType | null = null; - private polling = false; private readonly isProcessAlive: (pid: number) => boolean; + private cleanupHandle: ReturnType | null = null; constructor(state: StateManager, opts: SessionTrackerOpts) { this.state = state; this.adapters = opts.adapters; - this.pollIntervalMs = opts.pollIntervalMs ?? 5000; this.isProcessAlive = opts.isProcessAlive ?? defaultIsProcessAlive; } - startPolling(): void { - if (this.pollHandle) return; - // Prune old stopped sessions on startup - this.pruneOldSessions(); - // Initial poll - this.guardedPoll(); - this.pollHandle = setInterval(() => { - this.guardedPoll(); - }, this.pollIntervalMs); - } - - /** Run poll() with a guard to skip if the previous cycle is still running */ - private guardedPoll(): void { - if (this.polling) return; - this.polling = true; - this.poll() - .catch((err) => console.error("Poll error:", err)) - .finally(() => { - this.polling = false; - }); - } - - stopPolling(): void { - if (this.pollHandle) { - clearInterval(this.pollHandle); - this.pollHandle = null; - } - } - - private async poll(): Promise { - // Collect PIDs from all adapter-discovered sessions (the source of truth) - const adapterPidToId = new Map(); - - for (const [adapterName, adapter] of Object.entries(this.adapters)) { - try { - // Discover-first: adapter.discover() is the ground truth - const discovered = await adapter.discover(); - for (const disc of discovered) { - if (disc.pid) { - adapterPidToId.set(disc.pid, disc.id); - } - - const existing = this.state.getSession(disc.id); - const record = discoveredToRecord(disc, adapterName); - - if (!existing) { - this.state.setSession(disc.id, record); - } else if ( - existing.status !== record.status || - (!existing.model && record.model) - ) { - // Status changed or model resolved — update, preserving metadata - this.state.setSession(disc.id, { - ...existing, - status: record.status, - stoppedAt: record.stoppedAt, - model: record.model || existing.model, - tokens: record.tokens, - cost: record.cost, - prompt: record.prompt || existing.prompt, - pid: record.pid, - }); - } - } - } catch { - // Adapter unavailable — skip - } - } - - // Reap stale entries from daemon state - this.reapStaleEntries(adapterPidToId); - } - /** - * Clean up ghost sessions in the daemon state: - * - pending-* entries whose PID matches a resolved session → remove pending - * - Any "running"/"idle" session in state whose PID is dead → mark stopped + * Start periodic PID liveness check for daemon-launched sessions. + * This is a lightweight check (no adapter fan-out) that runs every 30s + * to detect dead sessions and return their IDs for lock cleanup. */ - private reapStaleEntries(adapterPidToId: Map): void { - const sessions = this.state.getSessions(); - - for (const [id, record] of Object.entries(sessions)) { - // Bug 2: If this is a pending-* entry and a real session has the same PID, - // the pending entry is stale — remove it - if (id.startsWith("pending-") && record.pid) { - const resolvedId = adapterPidToId.get(record.pid); - if (resolvedId && resolvedId !== id) { - this.state.removeSession(id); - continue; - } - } - - // Bug 1: If session is "running"/"idle" but PID is dead, mark stopped - if ( - (record.status === "running" || record.status === "idle") && - record.pid - ) { - // Only reap if the adapter didn't return this session as running - // (adapter is the source of truth for sessions it knows about) - const adapterId = adapterPidToId.get(record.pid); - if (adapterId === id) continue; // Adapter confirmed this PID is active - - if (!this.isProcessAlive(record.pid)) { - this.state.setSession(id, { - ...record, - status: "stopped", - stoppedAt: new Date().toISOString(), - }); - } + startLaunchCleanup(onDead?: (sessionId: string) => void): void { + if (this.cleanupHandle) return; + this.cleanupHandle = setInterval(() => { + const dead = this.cleanupDeadLaunches(); + if (onDead) { + for (const id of dead) onDead(id); } - } + }, 30_000); } - /** - * Validate all sessions on daemon startup (#40). - * Any session marked as "running" or "idle" whose PID is dead gets - * immediately marked as "stopped". This prevents unbounded growth of - * ghost sessions across daemon restarts. - */ - validateAllSessions(): void { - const sessions = this.state.getSessions(); - let cleaned = 0; - - for (const [id, record] of Object.entries(sessions)) { - if (record.status !== "running" && record.status !== "idle") continue; - - if (record.pid) { - if (!this.isProcessAlive(record.pid)) { - this.state.setSession(id, { - ...record, - status: "stopped", - stoppedAt: new Date().toISOString(), - }); - cleaned++; - } - } else { - // No PID recorded — can't verify, mark as stopped - this.state.setSession(id, { - ...record, - status: "stopped", - stoppedAt: new Date().toISOString(), - }); - cleaned++; - } - } - - if (cleaned > 0) { - console.error( - `Validated sessions on startup: marked ${cleaned} dead sessions as stopped`, - ); - } - } - - /** - * Aggressively prune all clearly-dead sessions (#40). - * Returns the number of sessions pruned. - * Called via `agentctl prune` command. - */ - pruneDeadSessions(): number { - const sessions = this.state.getSessions(); - let pruned = 0; - - for (const [id, record] of Object.entries(sessions)) { - // Remove stopped/completed/failed sessions older than 24h - if ( - record.status === "stopped" || - record.status === "completed" || - record.status === "failed" - ) { - const stoppedAt = record.stoppedAt - ? new Date(record.stoppedAt).getTime() - : new Date(record.startedAt).getTime(); - const age = Date.now() - stoppedAt; - if (age > 24 * 60 * 60 * 1000) { - this.state.removeSession(id); - pruned++; - } - continue; - } - - // Remove running/idle sessions whose PID is dead - if (record.status === "running" || record.status === "idle") { - if (record.pid && !this.isProcessAlive(record.pid)) { - this.state.removeSession(id); - pruned++; - } else if (!record.pid) { - this.state.removeSession(id); - pruned++; - } - } - } - - return pruned; - } - - /** - * Remove stopped sessions from state that have been stopped for more than 7 days. - * This reduces overhead from accumulating hundreds of historical sessions. - */ - private pruneOldSessions(): void { - const sessions = this.state.getSessions(); - const now = Date.now(); - let pruned = 0; - - for (const [id, record] of Object.entries(sessions)) { - if ( - record.status !== "stopped" && - record.status !== "completed" && - record.status !== "failed" - ) { - continue; - } - const stoppedAt = record.stoppedAt - ? new Date(record.stoppedAt).getTime() - : new Date(record.startedAt).getTime(); - if (now - stoppedAt > STOPPED_SESSION_PRUNE_AGE_MS) { - this.state.removeSession(id); - pruned++; - } - } - - if (pruned > 0) { - console.error(`Pruned ${pruned} sessions stopped >7 days ago from state`); + stopLaunchCleanup(): void { + if (this.cleanupHandle) { + clearInterval(this.cleanupHandle); + this.cleanupHandle = null; } } - /** Track a newly launched session */ + /** Track a newly launched session (stores launch metadata in state) */ track(session: AgentSession, adapterName: string): SessionRecord { const record = sessionToRecord(session, adapterName); @@ -274,7 +81,7 @@ export class SessionTracker { return record; } - /** Get session record by id (exact or prefix) */ + /** Get session launch metadata by id (exact or prefix match) */ getSession(id: string): SessionRecord | undefined { // Exact match const exact = this.state.getSession(id); @@ -289,62 +96,12 @@ export class SessionTracker { return undefined; } - /** List all tracked sessions */ - listSessions(opts?: { - status?: string; - all?: boolean; - adapter?: string; - }): SessionRecord[] { - const sessions = Object.values(this.state.getSessions()); - - // Liveness check: mark sessions with dead PIDs as stopped - for (const s of sessions) { - if ((s.status === "running" || s.status === "idle") && s.pid) { - if (!this.isProcessAlive(s.pid)) { - s.status = "stopped"; - s.stoppedAt = new Date().toISOString(); - this.state.setSession(s.id, s); - } - } - } - - let filtered = sessions; - - if (opts?.adapter) { - filtered = filtered.filter((s) => s.adapter === opts.adapter); - } - - if (opts?.status) { - filtered = filtered.filter((s) => s.status === opts.status); - } else if (!opts?.all) { - filtered = filtered.filter( - (s) => s.status === "running" || s.status === "idle", - ); - } - - // Dedup: if a pending-* entry shares a PID with a resolved entry, show only the resolved one - filtered = deduplicatePendingSessions(filtered); - - return filtered.sort((a, b) => { - // Running first, then by recency - if (a.status === "running" && b.status !== "running") return -1; - if (b.status === "running" && a.status !== "running") return 1; - return new Date(b.startedAt).getTime() - new Date(a.startedAt).getTime(); - }); - } - - activeCount(): number { - return Object.values(this.state.getSessions()).filter( - (s) => s.status === "running" || s.status === "idle", - ).length; - } - - /** Remove a session from state entirely (used for ghost cleanup) */ + /** Remove a session from launch metadata */ removeSession(sessionId: string): void { this.state.removeSession(sessionId); } - /** Called when a session stops — returns the cwd for fuse/lock processing */ + /** Called when a session stops — marks it in launch metadata, returns the record */ onSessionExit(sessionId: string): SessionRecord | undefined { const session = this.state.getSession(sessionId); if (session) { @@ -354,6 +111,110 @@ export class SessionTracker { } return session; } + + /** + * Merge adapter-discovered sessions with daemon launch metadata. + * + * 1. Enrich discovered sessions with launch metadata (prompt, group, spec, etc.) + * 2. Reconcile: mark daemon-launched sessions as stopped if their adapter + * succeeded but didn't return them (and they're past the grace period). + * 3. Include recently-launched sessions that adapters haven't discovered yet. + * + * Returns the merged session list and IDs of sessions that were marked stopped + * (for lock cleanup by the caller). + */ + reconcileAndEnrich( + discovered: DiscoveredSession[], + succeededAdapters: Set, + ): { sessions: SessionRecord[]; stoppedLaunchIds: string[] } { + // Build lookups for discovered sessions + const discoveredIds = new Set(discovered.map((d) => d.id)); + const discoveredPids = new Map(); + for (const d of discovered) { + if (d.pid) discoveredPids.set(d.pid, d.id); + } + + // 1. Convert discovered sessions to records, enriching with launch metadata + const sessions: SessionRecord[] = discovered.map((disc) => + enrichDiscovered(disc, this.state.getSession(disc.id)), + ); + + // 2. Reconcile daemon-launched sessions that disappeared from adapter results + const stoppedLaunchIds: string[] = []; + const now = Date.now(); + + for (const [id, record] of Object.entries(this.state.getSessions())) { + if ( + record.status !== "running" && + record.status !== "idle" && + record.status !== "pending" + ) + continue; + + // If adapter for this session didn't succeed, include as-is from launch metadata + // (we can't verify status, so trust the last-known state) + if (!succeededAdapters.has(record.adapter)) { + sessions.push(record); + continue; + } + + // Skip if adapter returned this session (it's still active) + if (discoveredIds.has(id)) continue; + + // Check if this session's PID was resolved to a different ID (pending→UUID) + if (record.pid && discoveredPids.has(record.pid)) { + // PID was resolved to a real session — remove stale launch entry + this.state.removeSession(id); + stoppedLaunchIds.push(id); + continue; + } + + // Grace period: don't mark recently-launched sessions as stopped + const launchAge = now - new Date(record.startedAt).getTime(); + if (launchAge < LAUNCH_GRACE_PERIOD_MS) { + // Still within grace period — include as-is in results + sessions.push(record); + continue; + } + + // Session disappeared from adapter results — mark stopped + this.state.setSession(id, { + ...record, + status: "stopped", + stoppedAt: new Date().toISOString(), + }); + stoppedLaunchIds.push(id); + } + + return { sessions, stoppedLaunchIds }; + } + + /** + * Check PID liveness for daemon-launched sessions. + * Returns IDs of sessions whose PIDs have died. + * This is a lightweight check (no adapter fan-out) for lock cleanup. + */ + cleanupDeadLaunches(): string[] { + const dead: string[] = []; + for (const [id, record] of Object.entries(this.state.getSessions())) { + if ( + record.status !== "running" && + record.status !== "idle" && + record.status !== "pending" + ) + continue; + + if (record.pid && !this.isProcessAlive(record.pid)) { + this.state.setSession(id, { + ...record, + status: "stopped", + stoppedAt: new Date().toISOString(), + }); + dead.push(id); + } + } + return dead; + } } /** Check if a process is alive via kill(pid, 0) signal check */ @@ -367,24 +228,28 @@ function defaultIsProcessAlive(pid: number): boolean { } /** - * Remove pending-* entries that share a PID with a resolved (non-pending) session. - * This is a safety net for list output — the poll() reaper handles cleanup in state. + * Convert a discovered session to a SessionRecord, enriching with launch metadata. */ -function deduplicatePendingSessions( - sessions: SessionRecord[], -): SessionRecord[] { - const realPids = new Set(); - for (const s of sessions) { - if (!s.id.startsWith("pending-") && s.pid) { - realPids.add(s.pid); - } - } - return sessions.filter((s) => { - if (s.id.startsWith("pending-") && s.pid && realPids.has(s.pid)) { - return false; - } - return true; - }); +function enrichDiscovered( + disc: DiscoveredSession, + launchMeta: SessionRecord | undefined, +): SessionRecord { + return { + id: disc.id, + adapter: disc.adapter, + status: disc.status as SessionRecord["status"], + startedAt: disc.startedAt?.toISOString() ?? new Date().toISOString(), + stoppedAt: disc.stoppedAt?.toISOString(), + cwd: disc.cwd ?? launchMeta?.cwd, + model: disc.model ?? launchMeta?.model, + prompt: disc.prompt ?? launchMeta?.prompt, + tokens: disc.tokens, + cost: disc.cost, + pid: disc.pid, + spec: launchMeta?.spec, + group: launchMeta?.group, + meta: disc.nativeMetadata ?? launchMeta?.meta ?? {}, + }; } function sessionToRecord( @@ -408,24 +273,3 @@ function sessionToRecord( meta: session.meta, }; } - -/** Convert a DiscoveredSession (adapter ground truth) to a SessionRecord for state */ -function discoveredToRecord( - disc: DiscoveredSession, - adapterName: string, -): SessionRecord { - return { - id: disc.id, - adapter: adapterName, - status: disc.status, - startedAt: disc.startedAt?.toISOString() ?? new Date().toISOString(), - stoppedAt: disc.stoppedAt?.toISOString(), - cwd: disc.cwd, - model: disc.model, - prompt: disc.prompt, - tokens: disc.tokens, - cost: disc.cost, - pid: disc.pid, - meta: disc.nativeMetadata ?? {}, - }; -}