diff --git a/src/adapters/large-prompt-launch.test.ts b/src/adapters/large-prompt-launch.test.ts index f056c90..234b5c2 100644 --- a/src/adapters/large-prompt-launch.test.ts +++ b/src/adapters/large-prompt-launch.test.ts @@ -80,7 +80,7 @@ describe("Large prompt handling — OpenCode", () => { }); }); - it("passes small prompt as CLI arg", async () => { + it("passes small prompt in wrapper script CLI arg", async () => { await adapter.launch({ adapter: "opencode", prompt: smallPrompt, @@ -88,7 +88,11 @@ describe("Large prompt handling — OpenCode", () => { }); expect(spawnCalls).toHaveLength(1); - expect(spawnCalls[0].args).toContain(smallPrompt); + // Now launches via wrapper: /bin/sh + expect(spawnCalls[0].cmd).toBe("/bin/sh"); + const wrapperPath = spawnCalls[0].args[0]; + const wrapperContent = await fs.readFile(wrapperPath, "utf-8"); + expect(wrapperContent).toContain(smallPrompt); // stdin should be "ignore" for small prompts expect(spawnCalls[0].opts.stdio[0]).toBe("ignore"); }); @@ -101,9 +105,11 @@ describe("Large prompt handling — OpenCode", () => { }); expect(spawnCalls).toHaveLength(1); - // Large prompt must NOT appear in args - expect(spawnCalls[0].args).not.toContain(largePrompt); - expect(spawnCalls[0].args).toEqual(["run"]); + // Wrapper script should NOT contain the large prompt + const wrapperPath = spawnCalls[0].args[0]; + const wrapperContent = await fs.readFile(wrapperPath, "utf-8"); + expect(wrapperContent).not.toContain(largePrompt); + expect(wrapperContent).toContain("'run'"); // stdin should be a file descriptor (number), not "ignore" expect(typeof spawnCalls[0].opts.stdio[0]).toBe("number"); }); diff --git a/src/adapters/opencode-fuse.test.ts b/src/adapters/opencode-fuse.test.ts new file mode 100644 index 0000000..93aec89 --- /dev/null +++ b/src/adapters/opencode-fuse.test.ts @@ -0,0 +1,409 @@ +import * as fs from "node:fs/promises"; +import * as os from "node:os"; +import * as path from "node:path"; +import { afterEach, beforeEach, describe, expect, it } from "vitest"; +import type { LifecycleEvent } from "../core/types.js"; +import { + generateWrapperScript, + OpenCodeAdapter, + type OpenCodeAdapterOpts, +} from "./opencode.js"; + +let tmpDir: string; +let storageDir: string; +let sessionDir: string; +let sessionsMetaDir: string; + +/** Set of PIDs considered alive by the mock */ +let alivePids: Set; + +function makeAdapter( + overrides?: Partial, +): OpenCodeAdapter { + return new OpenCodeAdapter({ + storageDir, + sessionsMetaDir, + getPids: async () => new Map(), + isProcessAlive: (pid) => alivePids.has(pid), + pollIntervalMs: 10, // fast polling for tests + masterTimeoutMs: 100, // short timeout for tests + ...overrides, + }); +} + +beforeEach(async () => { + tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), "agentctl-opencode-fuse-")); + storageDir = path.join(tmpDir, "storage"); + sessionDir = path.join(storageDir, "session"); + sessionsMetaDir = path.join(tmpDir, "opencode-sessions"); + await fs.mkdir(sessionDir, { recursive: true }); + await fs.mkdir(path.join(storageDir, "message"), { recursive: true }); + await fs.mkdir(sessionsMetaDir, { recursive: true }); + alivePids = new Set(); +}); + +afterEach(async () => { + await fs.rm(tmpDir, { recursive: true, force: true }); +}); + +/** Write a fake session meta file (simulates what launch() writes) */ +async function writeMeta( + sessionId: string, + pid: number, + launchedAt?: Date, +): Promise { + const meta = { + sessionId, + pid, + launchedAt: (launchedAt ?? new Date()).toISOString(), + }; + await fs.writeFile( + path.join(sessionsMetaDir, `${sessionId}.json`), + JSON.stringify(meta), + ); +} + +/** Write a .exit file for a session */ +async function writeExitFile( + sessionId: string, + exitCode: number, +): Promise { + await fs.writeFile( + path.join(sessionsMetaDir, `${sessionId}.exit`), + String(exitCode), + ); +} + +/** Collect events from events() generator until predicate is met or timeout */ +async function collectEvents( + adapter: OpenCodeAdapter, + opts: { + until?: (e: LifecycleEvent) => boolean; + maxEvents?: number; + timeoutMs?: number; + }, +): Promise { + const events: LifecycleEvent[] = []; + const maxEvents = opts.maxEvents ?? 10; + const timeoutMs = opts.timeoutMs ?? 2000; + + const gen = adapter.events()[Symbol.asyncIterator](); + + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + const remaining = deadline - Date.now(); + if (remaining <= 0) break; + + const result = await Promise.race([ + gen.next(), + sleep(Math.min(remaining, 100)).then(() => "timeout" as const), + ]); + + if (result === "timeout") continue; + if (result.done) break; + + events.push(result.value); + if (opts.until?.(result.value)) break; + if (events.length >= maxEvents) break; + } + + // Clean up generator + gen.return?.(undefined); + return events; +} + +describe("generateWrapperScript", () => { + it("produces a valid shell script that writes exit code", () => { + const script = generateWrapperScript( + "/usr/local/bin/opencode", + ["run", "--", "hello world"], + "/tmp/test.exit", + ); + expect(script).toContain("#!/bin/sh"); + expect(script).toContain("/usr/local/bin/opencode"); + expect(script).toContain("'run'"); + expect(script).toContain("'hello world'"); + expect(script).toContain("EC=$?"); + expect(script).toContain("echo \"$EC\" > '/tmp/test.exit'"); + }); + + it("shell-escapes single quotes in args", () => { + const script = generateWrapperScript( + "/usr/bin/opencode", + ["run", "--", "it's a test"], + "/tmp/out.exit", + ); + expect(script).toContain("'it'\\''s a test'"); + }); +}); + +describe("three-prong fuse: exit file signal", () => { + it("emits session.stopped when exit file appears", async () => { + const sessionId = "fuse-exit-test-001"; + const pid = 54321; + alivePids.add(pid); + + // Write meta (simulates launch) + await writeMeta(sessionId, pid); + + const adapter = makeAdapter(); + + // Start events generator and write exit file after a short delay + const eventsPromise = collectEvents(adapter, { + until: (e) => e.type === "session.stopped", + timeoutMs: 1000, + }); + + // Give the generator time to bootstrap, then write exit file + await sleep(50); + await writeExitFile(sessionId, 0); + + const events = await eventsPromise; + const stopped = events.find((e) => e.type === "session.stopped"); + expect(stopped).toBeDefined(); + expect(stopped?.sessionId).toBe(sessionId); + expect(stopped?.meta?.exitCode).toBe(0); + expect(stopped?.meta?.signal).toBe("exit-file"); + }); + + it("captures non-zero exit codes", async () => { + const sessionId = "fuse-exit-nonzero"; + const pid = 54322; + alivePids.add(pid); + + await writeMeta(sessionId, pid); + const adapter = makeAdapter(); + + const eventsPromise = collectEvents(adapter, { + until: (e) => e.type === "session.stopped", + timeoutMs: 1000, + }); + + await sleep(50); + await writeExitFile(sessionId, 1); + + const events = await eventsPromise; + const stopped = events.find((e) => e.type === "session.stopped"); + expect(stopped).toBeDefined(); + expect(stopped?.meta?.exitCode).toBe(1); + }); +}); + +describe("three-prong fuse: PID death signal", () => { + it("emits session.stopped when PID dies without exit file", async () => { + const sessionId = "fuse-pid-death-001"; + const pid = 54323; + alivePids.add(pid); + + await writeMeta(sessionId, pid); + const adapter = makeAdapter(); + + const eventsPromise = collectEvents(adapter, { + until: (e) => e.type === "session.stopped", + timeoutMs: 1000, + }); + + // Kill the PID (remove from alive set) + await sleep(50); + alivePids.delete(pid); + + const events = await eventsPromise; + const stopped = events.find((e) => e.type === "session.stopped"); + expect(stopped).toBeDefined(); + expect(stopped?.sessionId).toBe(sessionId); + expect(stopped?.meta?.signal).toBe("pid-death"); + // No exit code available from PID death + expect(stopped?.meta?.exitCode).toBeUndefined(); + }); +}); + +describe("three-prong fuse: master timeout signal", () => { + it("emits session.timeout when timeout exceeded", async () => { + const sessionId = "fuse-timeout-001"; + const pid = 54324; + alivePids.add(pid); + + // Write meta with a launchedAt in the past (so timeout is already exceeded) + const pastDate = new Date(Date.now() - 200); // 200ms ago, timeout is 100ms + await writeMeta(sessionId, pid, pastDate); + + const adapter = makeAdapter({ masterTimeoutMs: 100 }); + + const events = await collectEvents(adapter, { + until: (e) => e.type === "session.timeout", + timeoutMs: 1000, + }); + + const timeout = events.find((e) => e.type === "session.timeout"); + expect(timeout).toBeDefined(); + expect(timeout?.sessionId).toBe(sessionId); + expect(timeout?.meta?.signal).toBe("master-timeout"); + expect(timeout?.meta?.timeoutMs).toBe(100); + }); + + it("does not kill the process on timeout", async () => { + const sessionId = "fuse-timeout-nokill"; + const pid = 54325; + alivePids.add(pid); + + const pastDate = new Date(Date.now() - 200); + await writeMeta(sessionId, pid, pastDate); + + const adapter = makeAdapter({ masterTimeoutMs: 100 }); + + await collectEvents(adapter, { + until: (e) => e.type === "session.timeout", + timeoutMs: 1000, + }); + + // PID should still be alive — timeout does NOT kill + expect(alivePids.has(pid)).toBe(true); + }); +}); + +describe("fuse cancellation pattern", () => { + it("exit file cancels PID poll and timeout (only one event emitted)", async () => { + const sessionId = "fuse-cancel-001"; + const pid = 54326; + alivePids.add(pid); + + // Use a past launchedAt so timeout would also fire + const pastDate = new Date(Date.now() - 200); + await writeMeta(sessionId, pid, pastDate); + + const adapter = makeAdapter({ masterTimeoutMs: 100 }); + + // Write exit file immediately — this should fire first + await writeExitFile(sessionId, 0); + + // Collect the first event for this session, then let the generator + // run a few more cycles to verify no duplicates + const events = await collectEvents(adapter, { + until: (e) => e.sessionId === sessionId, + timeoutMs: 500, + }); + + // Should get exactly one event for this session + const sessionEvents = events.filter((e) => e.sessionId === sessionId); + expect(sessionEvents).toHaveLength(1); + expect(sessionEvents[0].type).toBe("session.stopped"); + expect(sessionEvents[0].meta?.signal).toBe("exit-file"); + }); + + it("PID death cancels timeout (no timeout event after PID death)", async () => { + const sessionId = "fuse-cancel-pid"; + const pid = 54327; + alivePids.add(pid); + + await writeMeta(sessionId, pid); + + const adapter = makeAdapter({ masterTimeoutMs: 200 }); + + // Start collecting events, then kill PID after bootstrap + const eventsPromise = collectEvents(adapter, { + until: (e) => e.sessionId === sessionId, + timeoutMs: 1000, + }); + + await sleep(30); + alivePids.delete(pid); + + const events = await eventsPromise; + + const sessionEvents = events.filter((e) => e.sessionId === sessionId); + expect(sessionEvents).toHaveLength(1); + expect(sessionEvents[0].type).toBe("session.stopped"); + expect(sessionEvents[0].meta?.signal).toBe("pid-death"); + }); + + it("after fuse fires, subsequent polls do not re-emit for that session", async () => { + const sessionId = "fuse-no-reemit"; + const pid = 54399; + alivePids.add(pid); + + await writeMeta(sessionId, pid); + + const adapter = makeAdapter(); + + // Start collecting, then kill PID after bootstrap + const eventsPromise = collectEvents(adapter, { + maxEvents: 10, + timeoutMs: 400, + }); + + await sleep(30); + alivePids.delete(pid); + + const events = await eventsPromise; + + // Only one event for this session despite multiple poll cycles + const sessionEvents = events.filter((e) => e.sessionId === sessionId); + expect(sessionEvents).toHaveLength(1); + }); +}); + +describe("list() meta-dir primary source", () => { + it("shows sessions from meta dir that are not in native storage", async () => { + const sessionId = "meta-only-session"; + const pid = 54328; + alivePids.add(pid); + + await writeMeta(sessionId, pid); + + const adapter = makeAdapter(); + const sessions = await adapter.list({ all: true }); + + const found = sessions.find((s) => s.id === sessionId); + expect(found).toBeDefined(); + expect(found?.status).toBe("running"); + expect(found?.pid).toBe(pid); + }); + + it("shows stopped meta-dir sessions when --all", async () => { + const sessionId = "meta-stopped-session"; + const pid = 54329; + // PID is NOT alive + await writeMeta(sessionId, pid); + + const adapter = makeAdapter(); + const sessions = await adapter.list({ all: true }); + + const found = sessions.find((s) => s.id === sessionId); + expect(found).toBeDefined(); + expect(found?.status).toBe("stopped"); + }); + + it("filters meta-dir sessions by status", async () => { + const runningId = "meta-running"; + const stoppedId = "meta-stopped"; + alivePids.add(100); + await writeMeta(runningId, 100); + await writeMeta(stoppedId, 200); // pid 200 not alive + + const adapter = makeAdapter(); + + const running = await adapter.list({ status: "running" }); + expect(running.find((s) => s.id === runningId)).toBeDefined(); + expect(running.find((s) => s.id === stoppedId)).toBeUndefined(); + }); + + it("deduplicates sessions between meta dir and native storage", async () => { + // This verifies seenIds prevents duplicates + const sessionId = "dedup-session"; + const pid = 54330; + alivePids.add(pid); + + // Write to meta dir + await writeMeta(sessionId, pid); + + const adapter = makeAdapter(); + const sessions = await adapter.list({ all: true }); + + const matches = sessions.filter((s) => s.id === sessionId); + expect(matches).toHaveLength(1); + }); +}); + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} diff --git a/src/adapters/opencode-launch.test.ts b/src/adapters/opencode-launch.test.ts index af44da2..9412bfc 100644 --- a/src/adapters/opencode-launch.test.ts +++ b/src/adapters/opencode-launch.test.ts @@ -30,9 +30,12 @@ vi.mock("../utils/resolve-binary.js", () => ({ })); // Import after mocks are declared (vitest hoists vi.mock) -const { OpenCodeAdapter } = await import("./opencode.js"); +const { OpenCodeAdapter, generateWrapperScript } = await import( + "./opencode.js" +); let tmpDir: string; +let sessionsMetaDir: string; let adapter: InstanceType; beforeEach(async () => { @@ -41,7 +44,7 @@ beforeEach(async () => { path.join(os.tmpdir(), "agentctl-opencode-launch-"), ); const storageDir = path.join(tmpDir, "storage"); - const sessionsMetaDir = path.join(tmpDir, "opencode-sessions"); + sessionsMetaDir = path.join(tmpDir, "opencode-sessions"); await fs.mkdir(path.join(storageDir, "session"), { recursive: true }); await fs.mkdir(sessionsMetaDir, { recursive: true }); @@ -57,8 +60,49 @@ afterEach(async () => { await fs.rm(tmpDir, { recursive: true, force: true }); }); +describe("generateWrapperScript", () => { + it("generates a shell script that runs the binary and writes exit code", () => { + const script = generateWrapperScript( + "/usr/local/bin/opencode", + ["run", "--model", "gpt-4", "--", "fix bug"], + "/tmp/test.exit", + ); + expect(script).toContain("#!/bin/sh"); + expect(script).toContain("/usr/local/bin/opencode"); + expect(script).toContain("'--model'"); + expect(script).toContain("'gpt-4'"); + expect(script).toContain("'fix bug'"); + expect(script).toContain("EC=$?"); + expect(script).toContain("'/tmp/test.exit'"); + }); + + it("shell-escapes single quotes in arguments", () => { + const script = generateWrapperScript( + "/usr/local/bin/opencode", + ["run", "--", "it's a bug"], + "/tmp/test.exit", + ); + expect(script).toContain("'it'\\''s a bug'"); + }); +}); + describe("OpenCodeAdapter launch", () => { - it("passes --model flag when opts.model is set", async () => { + it("spawns /bin/sh with a wrapper script", async () => { + await adapter.launch({ + adapter: "opencode", + prompt: "fix the bug", + cwd: tmpDir, + }); + + expect(spawnCalls).toHaveLength(1); + expect(spawnCalls[0].cmd).toBe("/bin/sh"); + // The arg is the wrapper script path + const wrapperPath = spawnCalls[0].args[0]; + expect(wrapperPath).toContain("wrapper-"); + expect(wrapperPath).toContain(".sh"); + }); + + it("wrapper script includes --model flag when opts.model is set", async () => { await adapter.launch({ adapter: "opencode", prompt: "fix the bug", @@ -67,23 +111,15 @@ describe("OpenCodeAdapter launch", () => { }); expect(spawnCalls).toHaveLength(1); - const args = spawnCalls[0].args; - expect(args).toContain("--model"); - expect(args).toContain("deepseek-r1"); - // --model and its value should appear before the prompt - const modelIdx = args.indexOf("--model"); - const promptIdx = args.indexOf("fix the bug"); - expect(modelIdx).toBeLessThan(promptIdx); - expect(args).toEqual([ - "run", - "--model", - "deepseek-r1", - "--", - "fix the bug", - ]); + const wrapperPath = spawnCalls[0].args[0]; + const wrapperContent = await fs.readFile(wrapperPath, "utf-8"); + + expect(wrapperContent).toContain("'--model'"); + expect(wrapperContent).toContain("'deepseek-r1'"); + expect(wrapperContent).toContain("/usr/local/bin/opencode"); }); - it("omits --model flag when opts.model is not set", async () => { + it("wrapper script omits --model flag when opts.model is not set", async () => { await adapter.launch({ adapter: "opencode", prompt: "fix the bug", @@ -91,12 +127,15 @@ describe("OpenCodeAdapter launch", () => { }); expect(spawnCalls).toHaveLength(1); - const args = spawnCalls[0].args; - expect(args).not.toContain("--model"); - expect(args).toEqual(["run", "--", "fix the bug"]); + const wrapperPath = spawnCalls[0].args[0]; + const wrapperContent = await fs.readFile(wrapperPath, "utf-8"); + + expect(wrapperContent).not.toContain("'--model'"); + expect(wrapperContent).toContain("'run'"); + expect(wrapperContent).toContain("'fix the bug'"); }); - it("inserts -- before prompts that start with dashes (e.g. YAML frontmatter)", async () => { + it("wrapper script includes -- before prompts starting with dashes", async () => { const dashPrompt = "---\ntitle: My Spec\n---\nBuild this."; await adapter.launch({ adapter: "opencode", @@ -105,10 +144,53 @@ describe("OpenCodeAdapter launch", () => { }); expect(spawnCalls).toHaveLength(1); - const args = spawnCalls[0].args; - // -- must appear immediately before the prompt positional - const separatorIdx = args.indexOf("--"); - expect(separatorIdx).toBeGreaterThan(-1); - expect(args[separatorIdx + 1]).toBe(dashPrompt); + const wrapperPath = spawnCalls[0].args[0]; + const wrapperContent = await fs.readFile(wrapperPath, "utf-8"); + + expect(wrapperContent).toContain("'--'"); + }); + + it("wrapper writes .exit file alongside session meta", async () => { + await adapter.launch({ + adapter: "opencode", + prompt: "fix the bug", + cwd: tmpDir, + }); + + const wrapperPath = spawnCalls[0].args[0]; + const wrapperContent = await fs.readFile(wrapperPath, "utf-8"); + + // Wrapper should reference a .exit file in the sessions meta dir + expect(wrapperContent).toContain(".exit"); + expect(wrapperContent).toContain("EC=$?"); + expect(wrapperContent).toContain('echo "$EC"'); + }); + + it("creates a fuse entry for the launched session", async () => { + const session = await adapter.launch({ + adapter: "opencode", + prompt: "fix the bug", + cwd: tmpDir, + }); + + expect(session.status).toBe("running"); + expect(session.pid).toBe(99999); + expect(session.adapter).toBe("opencode"); + }); + + it("persists session metadata with cwd and model", async () => { + const session = await adapter.launch({ + adapter: "opencode", + prompt: "fix the bug", + model: "gpt-4o", + cwd: tmpDir, + }); + + const metaPath = path.join(sessionsMetaDir, `${session.id}.json`); + const meta = JSON.parse(await fs.readFile(metaPath, "utf-8")); + expect(meta.cwd).toBe(tmpDir); + expect(meta.model).toBe("gpt-4o"); + expect(meta.prompt).toBe("fix the bug"); + expect(meta.adapter).toBe("opencode"); }); }); diff --git a/src/adapters/opencode.test.ts b/src/adapters/opencode.test.ts index 1c32156..cfbb1c8 100644 --- a/src/adapters/opencode.test.ts +++ b/src/adapters/opencode.test.ts @@ -1464,3 +1464,98 @@ describe("OpenCodeAdapter", () => { }); }); }); + +// ================================================================ +// Lifecycle fuse tests → see opencode-fuse.test.ts +// Wrapper script tests → see opencode-launch.test.ts +// ================================================================ + +describe("Meta-dir sessions visible in list()", () => { + it("lists sessions from meta dir even without native storage files", async () => { + const alivePids = new Set([77777]); + const listAdapter = new OpenCodeAdapter({ + storageDir, + sessionsMetaDir, + getPids: async () => new Map(), + isProcessAlive: (pid) => alivePids.has(pid), + }); + + // Write only a meta-dir session (no native storage) + const sid = "meta-only-session-0000-000000000000"; + const meta: LaunchedSessionMeta = { + sessionId: sid, + pid: 77777, + launchedAt: new Date().toISOString(), + }; + await fs.writeFile( + path.join(sessionsMetaDir, `${sid}.json`), + JSON.stringify(meta, null, 2), + ); + + const sessions = await listAdapter.list(); + expect(sessions.length).toBe(1); + expect(sessions[0].id).toBe(sid); + expect(sessions[0].status).toBe("running"); + expect(sessions[0].meta?.source).toBe("meta-dir"); + }); + + it("shows stopped meta-dir session when exit file exists", async () => { + const listAdapter = new OpenCodeAdapter({ + storageDir, + sessionsMetaDir, + getPids: async () => new Map(), + isProcessAlive: () => false, + }); + + const sid = "stopped-meta-session-000000000000"; + const meta: LaunchedSessionMeta = { + sessionId: sid, + pid: 88888, + launchedAt: new Date().toISOString(), + }; + await fs.writeFile( + path.join(sessionsMetaDir, `${sid}.json`), + JSON.stringify(meta, null, 2), + ); + await fs.writeFile(path.join(sessionsMetaDir, `${sid}.exit`), "0"); + + // --all to include stopped + const sessions = await listAdapter.list({ all: true }); + const match = sessions.find((s) => s.id === sid); + expect(match).toBeDefined(); + expect(match?.status).toBe("stopped"); + }); + + it("does not duplicate sessions found in both meta-dir and native storage", async () => { + const alivePids = new Set([99999]); + const listAdapter = new OpenCodeAdapter({ + storageDir, + sessionsMetaDir, + getPids: async () => new Map(), + isProcessAlive: (pid) => alivePids.has(pid), + }); + + // Session exists in both meta-dir and native storage + const sid = "dual-source-session-000000000000"; + const meta: LaunchedSessionMeta = { + sessionId: sid, + pid: 99999, + launchedAt: new Date().toISOString(), + }; + await fs.writeFile( + path.join(sessionsMetaDir, `${sid}.json`), + JSON.stringify(meta, null, 2), + ); + + // Also create native storage entry + const session = makeSession({ + id: sid, + directory: "/Users/test/my-project", + }); + await createFakeSession("/Users/test/my-project", session); + + const sessions = await listAdapter.list({ all: true }); + const matches = sessions.filter((s) => s.id === sid); + expect(matches.length).toBe(1); + }); +}); diff --git a/src/adapters/opencode.ts b/src/adapters/opencode.ts index 8f12b3d..332f30c 100644 --- a/src/adapters/opencode.ts +++ b/src/adapters/opencode.ts @@ -26,10 +26,10 @@ import { resolveBinaryPath } from "../utils/resolve-binary.js"; import { cleanupExpiredMeta, deleteSessionMeta, + type LaunchedSessionMeta, readSessionMeta, writeSessionMeta, } from "../utils/session-meta.js"; -import { spawnWithRetry } from "../utils/spawn-with-retry.js"; const execFileAsync = promisify(execFile); @@ -44,6 +44,12 @@ const DEFAULT_STORAGE_DIR = path.join( // Default: only show stopped sessions from the last 7 days const STOPPED_SESSION_MAX_AGE_MS = 7 * 24 * 60 * 60 * 1000; +/** Default master timeout: 3 hours */ +const DEFAULT_MASTER_TIMEOUT_MS = 3 * 60 * 60 * 1000; + +/** PID poll interval: 15 seconds */ +const POLL_INTERVAL_MS = 15_000; + export interface PidInfo { pid: number; cwd: string; @@ -107,12 +113,28 @@ export interface OpenCodeMessageFile { providerID?: string; } +/** Per-session fuse state — tracks the three lifecycle signals */ +export interface SessionFuse { + sessionId: string; + pid: number; + exitFilePath: string; + launchedAt: Date; + timeoutMs: number; + abortController: AbortController; + /** Cached session object for event emission */ + session: AgentSession; +} + export interface OpenCodeAdapterOpts { storageDir?: string; // Override ~/.local/share/opencode/storage for testing sessionsMetaDir?: string; // Override metadata dir for testing getPids?: () => Promise>; // Override PID detection for testing /** Override PID liveness check for testing (default: process.kill(pid, 0)) */ isProcessAlive?: (pid: number) => boolean; + /** Override master timeout for testing (default: 3h) */ + masterTimeoutMs?: number; + /** Override poll interval for testing (default: 15s) */ + pollIntervalMs?: number; } /** @@ -122,9 +144,37 @@ export function computeProjectHash(directory: string): string { return crypto.createHash("sha1").update(directory).digest("hex"); } +/** + * Generate a wrapper shell script that runs opencode and writes exit code to a file. + * This gives us immediate exit code capture — the primary signal in the fuse. + */ +export function generateWrapperScript( + opencodeBin: string, + args: string[], + exitFilePath: string, +): string { + // Shell-escape each arg: wrap in single quotes, escape embedded single quotes + const escapedArgs = args + .map((a) => `'${a.replace(/'/g, "'\\''")}'`) + .join(" "); + return [ + "#!/bin/sh", + `${opencodeBin} ${escapedArgs}`, + `EC=$?`, + `echo "$EC" > '${exitFilePath.replace(/'/g, "'\\''")}'`, + `exit $EC`, + ].join("\n"); +} + /** * OpenCode adapter — reads session data from ~/.local/share/opencode/storage/ * and cross-references with running opencode processes. + * + * Implements three-prong session lifecycle fuse: + * 1. Wrapper exit hook (writes .exit file with exit code) + * 2. PID death poll (kill(pid,0) every 15s) + * 3. Master timeout (configurable, default 3h) + * First signal to fire cancels the others via AbortController. */ export class OpenCodeAdapter implements AgentAdapter { readonly id = "opencode"; @@ -134,6 +184,14 @@ export class OpenCodeAdapter implements AgentAdapter { private readonly sessionsMetaDir: string; private readonly getPids: () => Promise>; private readonly isProcessAlive: (pid: number) => boolean; + private readonly masterTimeoutMs: number; + private readonly pollIntervalMs: number; + + /** Active fuses for launched sessions — keyed by sessionId */ + private readonly fuses = new Map(); + + /** Session IDs that have already fired a fuse event — prevents re-emission by legacy poll */ + private readonly firedFuseIds = new Set(); constructor(opts?: OpenCodeAdapterOpts) { this.storageDir = opts?.storageDir || DEFAULT_STORAGE_DIR; @@ -144,6 +202,8 @@ export class OpenCodeAdapter implements AgentAdapter { path.join(os.homedir(), ".agentctl", "opencode-sessions"); this.getPids = opts?.getPids || getOpenCodePids; this.isProcessAlive = opts?.isProcessAlive || defaultIsProcessAlive; + this.masterTimeoutMs = opts?.masterTimeoutMs ?? DEFAULT_MASTER_TIMEOUT_MS; + this.pollIntervalMs = opts?.pollIntervalMs ?? POLL_INTERVAL_MS; } async discover(): Promise { @@ -215,46 +275,13 @@ export class OpenCodeAdapter implements AgentAdapter { async list(opts?: ListOpts): Promise { const runningPids = await this.getPids(); const sessions: AgentSession[] = []; + const seenIds = new Set(); - let projectDirs: string[]; - try { - projectDirs = await fs.readdir(this.sessionDir); - } catch { - return []; - } - - for (const projHash of projectDirs) { - const projPath = path.join(this.sessionDir, projHash); - const stat = await fs.stat(projPath).catch(() => null); - if (!stat?.isDirectory()) continue; - - const sessionFiles = await this.getSessionFilesForProject(projPath); + // Primary source: opencode's native storage (has rich metadata + PID recycling) + await this.listFromNativeStorage(sessions, seenIds, runningPids, opts); - for (const sessionData of sessionFiles) { - const session = await this.buildSession(sessionData, runningPids); - - // Filter by status - if (opts?.status && session.status !== opts.status) continue; - - // If not --all, skip old stopped sessions - if (!opts?.all && session.status === "stopped") { - const age = Date.now() - session.startedAt.getTime(); - if (age > STOPPED_SESSION_MAX_AGE_MS) continue; - } - - // Default: only show running sessions unless --all - if ( - !opts?.all && - !opts?.status && - session.status !== "running" && - session.status !== "idle" - ) { - continue; - } - - sessions.push(session); - } - } + // Supplementary: meta dir for agentctl-launched sessions not in native storage + await this.listFromMetaDir(sessions, seenIds, runningPids, opts); // Sort: running first, then by most recent sessions.sort((a, b) => { @@ -364,12 +391,28 @@ export class OpenCodeAdapter implements AgentAdapter { await fs.mkdir(this.sessionsMetaDir, { recursive: true }); + const sessionId = crypto.randomUUID(); + const exitFilePath = path.join(this.sessionsMetaDir, `${sessionId}.exit`); + // Write stdout/stderr to a log file so we don't keep pipes open // (which would prevent full detachment of the child process). const logPath = path.join(this.sessionsMetaDir, `launch-${Date.now()}.log`); const logFd = await fs.open(logPath, "w"); - const child = await spawnWithRetry("opencode", args, { + // Generate wrapper script that captures exit code + const opencodeBin = await resolveBinaryPath("opencode"); + const wrapperScript = generateWrapperScript( + opencodeBin, + args, + exitFilePath, + ); + const wrapperPath = path.join( + this.sessionsMetaDir, + `wrapper-${sessionId}.sh`, + ); + await fs.writeFile(wrapperPath, wrapperScript, { mode: 0o755 }); + + const child = spawn("/bin/sh", [wrapperPath], { cwd, env, stdio: [promptFd ? promptFd.fd : "ignore", logFd.fd, logFd.fd], @@ -386,11 +429,16 @@ export class OpenCodeAdapter implements AgentAdapter { if (promptFd) await promptFd.close(); if (promptFilePath) await cleanupPromptFile(promptFilePath); - const sessionId = crypto.randomUUID(); - // Persist session metadata so status checks work after wrapper exits if (pid) { - await writeSessionMeta(this.sessionsMetaDir, { sessionId, pid }); + await writeSessionMeta(this.sessionsMetaDir, { + sessionId, + pid, + cwd, + model: opts.model, + prompt: opts.prompt.slice(0, 200), + adapter: this.id, + }); } const session: AgentSession = { @@ -408,6 +456,21 @@ export class OpenCodeAdapter implements AgentAdapter { }, }; + // Register fuse for this session + if (pid) { + const timeoutMs = opts.timeout ?? this.masterTimeoutMs; + const fuse: SessionFuse = { + sessionId, + pid, + exitFilePath, + launchedAt: now, + timeoutMs, + abortController: new AbortController(), + session, + }; + this.fuses.set(sessionId, fuse); + } + return session; } @@ -457,7 +520,19 @@ export class OpenCodeAdapter implements AgentAdapter { await logFd.close(); } + /** + * Three-prong lifecycle fuse event generator. + * + * For each tracked session, checks three signals every poll cycle: + * 1. Exit file exists → session.stopped with exit code + * 2. PID dead (kill(pid,0) fails) → session.stopped with unknown exit code + * 3. Master timeout exceeded → session.timeout + * + * First signal to fire cancels the others via AbortController. + * Also falls back to the legacy poll for sessions not launched via agentctl. + */ async *events(): AsyncIterable { + // Legacy tracking for sessions discovered from native storage let knownSessions = new Map(); const initial = await this.list({ all: true }); @@ -465,7 +540,7 @@ export class OpenCodeAdapter implements AgentAdapter { knownSessions.set(s.id, s); } - // Poll + fs.watch hybrid + // Poll + fs.watch hybrid for native storage let watcher: ReturnType | undefined; try { watcher = watch(this.sessionDir, { recursive: true }); @@ -475,12 +550,23 @@ export class OpenCodeAdapter implements AgentAdapter { try { while (true) { - await sleep(5000); + // Re-scan meta dir for newly launched sessions not yet tracked + await this.bootstrapFusesFromMeta(); + + // Check fuses for tracked sessions (three-prong) + yield* this.checkFuses(); + + // Sleep before next cycle + await sleep(this.pollIntervalMs); + // Legacy poll for native-storage sessions const current = await this.list({ all: true }); const currentMap = new Map(current.map((s) => [s.id, s])); for (const [id, session] of currentMap) { + // Skip sessions tracked by fuse — they're handled above + if (this.fuses.has(id) || this.firedFuseIds.has(id)) continue; + const prev = knownSessions.get(id); if (!prev) { yield { @@ -516,6 +602,290 @@ export class OpenCodeAdapter implements AgentAdapter { } } finally { watcher?.close(); + // Clean up any remaining fuses + for (const fuse of this.fuses.values()) { + fuse.abortController.abort(); + } + this.fuses.clear(); + } + } + + // --- Fuse management --- + + /** + * Check all active fuses for signals. Yields events for any that fired. + * First signal cancels the others (AbortController pattern). + */ + private async *checkFuses(): AsyncIterable { + for (const [sessionId, fuse] of this.fuses) { + if (fuse.abortController.signal.aborted) { + this.fuses.delete(sessionId); + continue; + } + + // Signal 1: Exit file exists (wrapper completed) + const exitCode = await this.readExitFile(fuse.exitFilePath); + if (exitCode !== null) { + fuse.abortController.abort(); + this.fuses.delete(sessionId); + this.firedFuseIds.add(sessionId); + const session = { + ...fuse.session, + status: "stopped" as const, + stoppedAt: new Date(), + }; + yield { + type: "session.stopped", + adapter: this.id, + sessionId, + session, + timestamp: new Date(), + meta: { exitCode, signal: "exit-file" }, + }; + continue; + } + + // Signal 2: PID death poll + if (!this.isProcessAlive(fuse.pid)) { + fuse.abortController.abort(); + this.fuses.delete(sessionId); + this.firedFuseIds.add(sessionId); + const session = { + ...fuse.session, + status: "stopped" as const, + stoppedAt: new Date(), + }; + yield { + type: "session.stopped", + adapter: this.id, + sessionId, + session, + timestamp: new Date(), + meta: { signal: "pid-death" }, + }; + continue; + } + + // Signal 3: Master timeout + const elapsed = Date.now() - fuse.launchedAt.getTime(); + if (elapsed >= fuse.timeoutMs) { + fuse.abortController.abort(); + this.fuses.delete(sessionId); + this.firedFuseIds.add(sessionId); + yield { + type: "session.timeout", + adapter: this.id, + sessionId, + session: fuse.session, + timestamp: new Date(), + meta: { timeoutMs: fuse.timeoutMs, signal: "master-timeout" }, + }; + } + } + } + + /** + * Bootstrap fuses for meta-dir sessions not yet tracked. + * Called each poll cycle to pick up sessions launched between cycles. + * + * Creates fuses even for dead PIDs — checkFuses will immediately detect + * the death and emit session.stopped on the next cycle. + */ + private async bootstrapFusesFromMeta(): Promise { + let files: string[]; + try { + files = await fs.readdir(this.sessionsMetaDir); + } catch { + return; + } + + for (const file of files) { + if (!file.endsWith(".json")) continue; + const sessionId = file.replace(/\.json$/, ""); + + // Skip if already tracked or already fired + if (this.fuses.has(sessionId) || this.firedFuseIds.has(sessionId)) + continue; + + const meta = await readSessionMeta(this.sessionsMetaDir, sessionId); + if (!meta?.pid) continue; + + // Create fuse if PID is alive or exit file exists. + // If exit file exists, checkFuses will fire immediately on next cycle. + // If PID is dead with no exit file, checkFuses detects that too. + const exitFilePath = path.join(this.sessionsMetaDir, `${sessionId}.exit`); + const hasExitFile = (await this.readExitFile(exitFilePath)) !== null; + if (!hasExitFile && !this.isProcessAlive(meta.pid)) continue; + + const launchedAt = new Date(meta.launchedAt); + + const session: AgentSession = { + id: sessionId, + adapter: this.id, + status: "running", + startedAt: launchedAt, + pid: meta.pid, + cwd: meta.cwd, + model: meta.model, + prompt: meta.prompt, + meta: { source: "meta-dir" }, + }; + + this.fuses.set(sessionId, { + sessionId, + pid: meta.pid, + exitFilePath, + launchedAt, + timeoutMs: this.masterTimeoutMs, + abortController: new AbortController(), + session, + }); + } + } + + /** + * Read exit code from a .exit file. Returns null if file doesn't exist. + */ + private async readExitFile(exitFilePath: string): Promise { + try { + const content = await fs.readFile(exitFilePath, "utf-8"); + const code = parseInt(content.trim(), 10); + return Number.isNaN(code) ? null : code; + } catch { + return null; + } + } + + // --- list() helpers --- + + /** + * Enumerate sessions from the meta dir (agentctl-launched sessions). + * This is the primary source — these sessions may not appear in native storage. + */ + private async listFromMetaDir( + sessions: AgentSession[], + seenIds: Set, + _runningPids: Map, + opts?: ListOpts, + ): Promise { + let files: string[]; + try { + files = await fs.readdir(this.sessionsMetaDir); + } catch { + return; + } + + for (const file of files) { + if (!file.endsWith(".json")) continue; + const sessionId = file.replace(/\.json$/, ""); + if (seenIds.has(sessionId)) continue; + + let meta: LaunchedSessionMeta | null; + try { + const raw = await fs.readFile( + path.join(this.sessionsMetaDir, file), + "utf-8", + ); + meta = JSON.parse(raw) as LaunchedSessionMeta; + } catch { + continue; + } + if (!meta?.sessionId) continue; + + // Check if it has an exit file → stopped + const exitFilePath = path.join(this.sessionsMetaDir, `${sessionId}.exit`); + const exitCode = await this.readExitFile(exitFilePath); + const pidAlive = meta.pid ? this.isProcessAlive(meta.pid) : false; + const isRunning = exitCode === null && pidAlive; + + const launchedAt = meta.launchedAt + ? new Date(meta.launchedAt) + : new Date(); + + const session: AgentSession = { + id: meta.sessionId, + adapter: this.id, + status: isRunning ? "running" : "stopped", + startedAt: launchedAt, + stoppedAt: isRunning ? undefined : new Date(), + pid: isRunning ? meta.pid : undefined, + cwd: meta.cwd, + model: meta.model, + prompt: meta.prompt, + group: meta.group, + meta: { source: "meta-dir", exitCode: exitCode ?? undefined }, + }; + + // Apply filters + if (opts?.status && session.status !== opts.status) continue; + if (!opts?.all && session.status === "stopped") { + const age = Date.now() - session.startedAt.getTime(); + if (age > STOPPED_SESSION_MAX_AGE_MS) continue; + } + if ( + !opts?.all && + !opts?.status && + session.status !== "running" && + session.status !== "idle" + ) { + continue; + } + + seenIds.add(sessionId); + sessions.push(session); + } + } + + /** + * Enumerate sessions from opencode's native storage directory. + */ + private async listFromNativeStorage( + sessions: AgentSession[], + seenIds: Set, + runningPids: Map, + opts?: ListOpts, + ): Promise { + let projectDirs: string[]; + try { + projectDirs = await fs.readdir(this.sessionDir); + } catch { + return; + } + + for (const projHash of projectDirs) { + const projPath = path.join(this.sessionDir, projHash); + const stat = await fs.stat(projPath).catch(() => null); + if (!stat?.isDirectory()) continue; + + const sessionFiles = await this.getSessionFilesForProject(projPath); + + for (const sessionData of sessionFiles) { + if (seenIds.has(sessionData.id)) continue; + + const session = await this.buildSession(sessionData, runningPids); + + // Filter by status + if (opts?.status && session.status !== opts.status) continue; + + // If not --all, skip old stopped sessions + if (!opts?.all && session.status === "stopped") { + const age = Date.now() - session.startedAt.getTime(); + if (age > STOPPED_SESSION_MAX_AGE_MS) continue; + } + + // Default: only show running sessions unless --all + if ( + !opts?.all && + !opts?.status && + session.status !== "running" && + session.status !== "idle" + ) { + continue; + } + + seenIds.add(sessionData.id); + sessions.push(session); + } } } diff --git a/src/core/types.ts b/src/core/types.ts index 8819ee3..c0f1405 100644 --- a/src/core/types.ts +++ b/src/core/types.ts @@ -67,7 +67,8 @@ export interface LifecycleEvent { | "session.started" | "session.stopped" | "session.idle" - | "session.error"; + | "session.error" + | "session.timeout"; adapter: string; sessionId: string; session: AgentSession; @@ -102,6 +103,8 @@ export interface LaunchOpts { worktree?: { repo: string; branch: string }; /** Lifecycle hooks — shell commands to run at various points */ hooks?: LifecycleHooks; + /** Master timeout in ms — emits session.timeout after this duration (default: 3h) */ + timeout?: number; /** Callback session key for orchestration (stored in meta) */ callbackSessionKey?: string; /** Callback agent ID for orchestration (stored in meta) */ diff --git a/src/utils/session-meta.test.ts b/src/utils/session-meta.test.ts index 79493fa..80d86b0 100644 --- a/src/utils/session-meta.test.ts +++ b/src/utils/session-meta.test.ts @@ -5,7 +5,9 @@ import { afterEach, beforeEach, describe, expect, it } from "vitest"; import { cleanupExpiredMeta, deleteSessionMeta, + listSessionMeta, readSessionMeta, + updateSessionMeta, writeSessionMeta, } from "./session-meta.js"; @@ -131,4 +133,74 @@ describe("session-meta", () => { expect(cleaned).toBe(0); }); }); + + describe("listSessionMeta", () => { + it("lists all non-expired metadata files", async () => { + await writeSessionMeta(tmpDir, { sessionId: "s1", pid: 11111 }); + await writeSessionMeta(tmpDir, { sessionId: "s2", pid: 22222 }); + + const metas = await listSessionMeta(tmpDir); + expect(metas).toHaveLength(2); + const ids = metas.map((m) => m.sessionId).sort(); + expect(ids).toEqual(["s1", "s2"]); + }); + + it("skips expired metadata", async () => { + await writeSessionMeta(tmpDir, { sessionId: "fresh", pid: 11111 }); + // Write an expired meta manually + await fs.writeFile( + path.join(tmpDir, "old.json"), + JSON.stringify({ + sessionId: "old", + pid: 22222, + launchedAt: new Date(Date.now() - 25 * 60 * 60 * 1000).toISOString(), + }), + ); + + const metas = await listSessionMeta(tmpDir); + expect(metas).toHaveLength(1); + expect(metas[0].sessionId).toBe("fresh"); + }); + + it("returns empty array for non-existent directory", async () => { + const metas = await listSessionMeta("/tmp/does-not-exist-list"); + expect(metas).toEqual([]); + }); + }); + + describe("updateSessionMeta", () => { + it("updates specific fields on an existing meta file", async () => { + await writeSessionMeta(tmpDir, { sessionId: "s1", pid: 11111 }); + const updated = await updateSessionMeta(tmpDir, "s1", { + exitCode: 42, + }); + expect(updated).toBe(true); + + const meta = await readSessionMeta(tmpDir, "s1"); + expect(meta?.exitCode).toBe(42); + expect(meta?.pid).toBe(11111); // unchanged + }); + + it("returns false for non-existent session", async () => { + const updated = await updateSessionMeta(tmpDir, "nonexistent", { + exitCode: 1, + }); + expect(updated).toBe(false); + }); + + it("preserves extra fields written by writeSessionMeta", async () => { + await writeSessionMeta(tmpDir, { + sessionId: "s1", + pid: 11111, + cwd: "/some/path", + model: "gpt-4o", + }); + await updateSessionMeta(tmpDir, "s1", { exitCode: 0 }); + + const meta = await readSessionMeta(tmpDir, "s1"); + expect(meta?.cwd).toBe("/some/path"); + expect(meta?.model).toBe("gpt-4o"); + expect(meta?.exitCode).toBe(0); + }); + }); }); diff --git a/src/utils/session-meta.ts b/src/utils/session-meta.ts index d4d54e9..af58b9c 100644 --- a/src/utils/session-meta.ts +++ b/src/utils/session-meta.ts @@ -21,6 +21,18 @@ export interface LaunchedSessionMeta { launchedAt: string; // ISO 8601 — used for TTL expiry /** Path to adapter launch log — used as fallback for peek on short-lived sessions */ logPath?: string; + /** Exit code written by the wrapper script (undefined = still running or unknown) */ + exitCode?: number; + /** Working directory at launch */ + cwd?: string; + /** Model used for the session */ + model?: string; + /** First 200 chars of the prompt */ + prompt?: string; + /** Adapter ID (e.g. "opencode") */ + adapter?: string; + /** Launch group tag (e.g. "g-a1b2c3") */ + group?: string; } /** @@ -29,7 +41,15 @@ export interface LaunchedSessionMeta { */ export async function writeSessionMeta( metaDir: string, - meta: { sessionId: string; pid: number }, + meta: { + sessionId: string; + pid: number; + cwd?: string; + model?: string; + prompt?: string; + adapter?: string; + group?: string; + }, ): Promise { await fs.mkdir(metaDir, { recursive: true }); @@ -51,6 +71,11 @@ export async function writeSessionMeta( pid: meta.pid, startTime, launchedAt: new Date().toISOString(), + cwd: meta.cwd, + model: meta.model, + prompt: meta.prompt, + adapter: meta.adapter, + group: meta.group, }; const metaPath = path.join(metaDir, `${meta.sessionId}.json`); await fs.writeFile(metaPath, JSON.stringify(fullMeta, null, 2)); @@ -144,3 +169,53 @@ function isMetaExpired(meta: LaunchedSessionMeta): boolean { if (!meta.launchedAt) return false; return Date.now() - new Date(meta.launchedAt).getTime() > META_TTL_MS; } + +/** + * List all non-expired session metadata files. + */ +export async function listSessionMeta( + metaDir: string, +): Promise { + const results: LaunchedSessionMeta[] = []; + try { + const files = await fs.readdir(metaDir); + for (const file of files) { + if (!file.endsWith(".json")) continue; + try { + const filePath = path.join(metaDir, file); + const raw = await fs.readFile(filePath, "utf-8"); + const meta = JSON.parse(raw) as LaunchedSessionMeta; + if (isMetaExpired(meta)) { + await fs.unlink(filePath).catch(() => {}); + continue; + } + results.push(meta); + } catch { + // skip unreadable files + } + } + } catch { + // Dir doesn't exist + } + return results; +} + +/** + * Atomically update specific fields on an existing session meta file. + */ +export async function updateSessionMeta( + metaDir: string, + sessionId: string, + updates: Partial, +): Promise { + const metaPath = path.join(metaDir, `${sessionId}.json`); + try { + const raw = await fs.readFile(metaPath, "utf-8"); + const meta = JSON.parse(raw) as LaunchedSessionMeta; + Object.assign(meta, updates); + await fs.writeFile(metaPath, JSON.stringify(meta, null, 2)); + return true; + } catch { + return false; + } +}