Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions src/adapters/claude-code.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import {
writePromptFile,
} from "../utils/prompt-file.js";
import { resolveBinaryPath } from "../utils/resolve-binary.js";
import { spawnWithRetry } from "../utils/spawn-with-retry.js";

const execFileAsync = promisify(execFile);

Expand Down Expand Up @@ -480,19 +481,13 @@ export class ClaudeCodeAdapter implements AgentAdapter {
const logFd = await fs.open(logPath, "w");

// Capture stderr to the same log file for debugging launch failures
const claudePath = await resolveBinaryPath("claude");
const child = spawn(claudePath, args, {
const child = await spawnWithRetry("claude", args, {
cwd,
env,
stdio: [promptFd ? promptFd.fd : "ignore", logFd.fd, logFd.fd],
detached: true,
});

// Handle spawn errors (e.g. ENOENT) gracefully instead of crashing the daemon
child.on("error", (err) => {
console.error(`[claude-code] spawn error: ${err.message}`);
});

// Fully detach: child runs in its own process group.
// When the wrapper gets SIGTERM, the child keeps running.
child.unref();
Expand Down
8 changes: 2 additions & 6 deletions src/adapters/codex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
writePromptFile,
} from "../utils/prompt-file.js";
import { resolveBinaryPath } from "../utils/resolve-binary.js";
import { spawnWithRetry } from "../utils/spawn-with-retry.js";

const execFileAsync = promisify(execFile);

Expand Down Expand Up @@ -280,18 +281,13 @@ export class CodexAdapter implements AgentAdapter {
const logPath = path.join(this.sessionsMetaDir, `launch-${Date.now()}.log`);
const logFd = await fs.open(logPath, "w");

const codexPath = await resolveBinaryPath("codex");
const child = spawn(codexPath, args, {
const child = await spawnWithRetry("codex", args, {
cwd,
env,
stdio: [promptFd ? promptFd.fd : "ignore", logFd.fd, "ignore"],
detached: true,
});

child.on("error", (err) => {
console.error(`[codex] spawn error: ${err.message}`);
});

child.unref();

const pid = child.pid;
Expand Down
8 changes: 2 additions & 6 deletions src/adapters/opencode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
writePromptFile,
} from "../utils/prompt-file.js";
import { resolveBinaryPath } from "../utils/resolve-binary.js";
import { spawnWithRetry } from "../utils/spawn-with-retry.js";

const execFileAsync = promisify(execFile);

Expand Down Expand Up @@ -372,18 +373,13 @@ export class OpenCodeAdapter implements AgentAdapter {
const logPath = path.join(this.sessionsMetaDir, `launch-${Date.now()}.log`);
const logFd = await fs.open(logPath, "w");

const opencodePath = await resolveBinaryPath("opencode");
const child = spawn(opencodePath, args, {
const child = await spawnWithRetry("opencode", args, {
cwd,
env,
stdio: [promptFd ? promptFd.fd : "ignore", logFd.fd, logFd.fd],
detached: true,
});

child.on("error", (err) => {
console.error(`[opencode] spawn error: ${err.message}`);
});

child.unref();

const pid = child.pid;
Expand Down
8 changes: 2 additions & 6 deletions src/adapters/pi-rust.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
writePromptFile,
} from "../utils/prompt-file.js";
import { resolveBinaryPath } from "../utils/resolve-binary.js";
import { spawnWithRetry } from "../utils/spawn-with-retry.js";

const execFileAsync = promisify(execFile);

Expand Down Expand Up @@ -378,18 +379,13 @@ export class PiRustAdapter implements AgentAdapter {
const logPath = path.join(this.sessionsMetaDir, `launch-${Date.now()}.log`);
const logFd = await fs.open(logPath, "w");

const piRustPath = await resolveBinaryPath("pi-rust");
const child = spawn(piRustPath, args, {
const child = await spawnWithRetry("pi-rust", args, {
cwd,
env,
stdio: [promptFd ? promptFd.fd : "ignore", logFd.fd, "ignore"],
detached: true,
});

child.on("error", (err) => {
console.error(`[pi-rust] spawn error: ${err.message}`);
});

child.unref();

const pid = child.pid;
Expand Down
9 changes: 2 additions & 7 deletions src/adapters/pi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
writePromptFile,
} from "../utils/prompt-file.js";
import { resolveBinaryPath } from "../utils/resolve-binary.js";
import { spawnWithRetry } from "../utils/spawn-with-retry.js";

const execFileAsync = promisify(execFile);

Expand Down Expand Up @@ -356,19 +357,13 @@ export class PiAdapter implements AgentAdapter {
const logPath = path.join(this.sessionsMetaDir, `launch-${Date.now()}.log`);
const logFd = await fs.open(logPath, "w");

const piPath = await resolveBinaryPath("pi");
const child = spawn(piPath, args, {
const child = await spawnWithRetry("pi", args, {
cwd,
env,
stdio: [promptFd ? promptFd.fd : "ignore", logFd.fd, logFd.fd],
detached: true,
});

child.on("error", (err) => {
console.error(`[pi] spawn error: ${err.message}`);
});

// Fully detach: child runs in its own process group.
child.unref();

const pid = child.pid;
Expand Down
20 changes: 20 additions & 0 deletions src/cli.ts
Original file line number Diff line number Diff line change
Expand Up @@ -567,6 +567,8 @@ program
.option("--matrix <file>", "YAML matrix file for advanced sweep launch")
.option("--on-create <script>", "Hook: run after session is created")
.option("--on-complete <script>", "Hook: run after session completes")
.option("--callback-session <key>", "Callback session key for orchestration")
.option("--callback-agent <id>", "Callback agent ID for orchestration")
.allowUnknownOption() // Allow interleaved --adapter/--model for parseAdapterSlots
.action(async (adapterName: string | undefined, opts) => {
// Load persistent config defaults; CLI flags override config values
Expand All @@ -586,6 +588,10 @@ program
}
: undefined;

// Collect callback metadata
const callbackSessionKey = opts.callbackSession as string | undefined;
const callbackAgentId = opts.callbackAgent as string | undefined;

// --- Multi-adapter / matrix detection ---
let slots: AdapterSlot[] = [];

Expand Down Expand Up @@ -657,6 +663,8 @@ program
cwd,
hooks,
adapters,
callbackSessionKey,
callbackAgentId,
onSessionLaunched: (slotResult) => {
// Track in daemon if available
if (daemonRunning && !slotResult.error) {
Expand Down Expand Up @@ -740,6 +748,8 @@ program
? { repo: worktreeInfo.repo, branch: worktreeInfo.branch }
: undefined,
hooks,
callbackSessionKey,
callbackAgentId,
});
console.log(
`Launched session ${shortId(session.id)} (PID: ${session.pid})`,
Expand Down Expand Up @@ -777,7 +787,17 @@ program
cwd,
model,
hooks,
callbackSessionKey,
callbackAgentId,
});

// Inject callback metadata into session meta
if (callbackSessionKey) {
session.meta.openclaw_callback_session_key = callbackSessionKey;
}
if (callbackAgentId) {
session.meta.openclaw_callback_agent_id = callbackAgentId;
}
console.log(
`Launched session ${shortId(session.id)} (PID: ${session.pid})`,
);
Expand Down
4 changes: 4 additions & 0 deletions src/core/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,10 @@ export interface LaunchOpts {
worktree?: { repo: string; branch: string };
/** Lifecycle hooks — shell commands to run at various points */
hooks?: LifecycleHooks;
/** Callback session key for orchestration (stored in meta) */
callbackSessionKey?: string;
/** Callback agent ID for orchestration (stored in meta) */
callbackAgentId?: string;
}

export interface LifecycleHooks {
Expand Down
48 changes: 46 additions & 2 deletions src/daemon/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,20 @@ import { PiRustAdapter } from "../adapters/pi-rust.js";
import type { AgentAdapter } from "../core/types.js";
import { migrateLocks } from "../migration/migrate-locks.js";

import { loadConfig } from "../utils/config.js";
import { clearBinaryCache } from "../utils/resolve-binary.js";
import { FuseEngine } from "./fuse-engine.js";
import { LockManager } from "./lock-manager.js";
import { MetricsRegistry } from "./metrics.js";
import { SessionTracker } from "./session-tracker.js";
import type { SessionRecord } from "./state.js";
import { StateManager } from "./state.js";
import {
buildWebhookPayload,
emitWebhook,
resolveWebhookConfig,
type WebhookConfig,
} from "./webhook.js";

const execFileAsync = promisify(execFile);

Expand Down Expand Up @@ -98,6 +106,10 @@ export async function startDaemon(opts: DaemonStartOpts = {}): Promise<{
"pi-rust": new PiRustAdapter(),
};

// 7a. Resolve webhook config from config file + env
const config = await loadConfig(path.join(configDir, "config.json"));
const webhookConfig = resolveWebhookConfig(config);

const lockManager = new LockManager(state);
const emitter = new EventEmitter();
const fuseEngine = new FuseEngine(state, {
Expand All @@ -112,11 +124,22 @@ export async function startDaemon(opts: DaemonStartOpts = {}): Promise<{
metrics.recordFuseExpired();
});

// Helper: emit webhook for a stopped session (fire-and-forget)
const emitSessionStoppedWebhook = (record: SessionRecord) => {
if (!webhookConfig) return;
const payload = buildWebhookPayload(record);
emitWebhook(webhookConfig, payload);
};

// 8. Initial PID liveness cleanup for daemon-launched sessions
// (replaces the old validateAllSessions — much simpler, only checks launches)
const initialDead = sessionTracker.cleanupDeadLaunches();
if (initialDead.length > 0) {
for (const id of initialDead) lockManager.autoUnlock(id);
for (const id of initialDead) {
lockManager.autoUnlock(id);
const rec = state.getSession(id);
if (rec) emitSessionStoppedWebhook(rec);
}
console.error(
`Startup cleanup: marked ${initialDead.length} dead launches as stopped`,
);
Expand All @@ -128,6 +151,8 @@ export async function startDaemon(opts: DaemonStartOpts = {}): Promise<{
// 10. Start periodic PID liveness check for lock cleanup (30s interval)
sessionTracker.startLaunchCleanup((deadId) => {
lockManager.autoUnlock(deadId);
const rec = state.getSession(deadId);
if (rec) emitSessionStoppedWebhook(rec);
});

// 11. Create request handler
Expand All @@ -140,6 +165,8 @@ export async function startDaemon(opts: DaemonStartOpts = {}): Promise<{
state,
configDir,
sockPath,
webhookConfig,
emitSessionStoppedWebhook,
});

// 12. Start Unix socket server
Expand Down Expand Up @@ -322,6 +349,8 @@ interface HandlerContext {
state: StateManager;
configDir: string;
sockPath: string;
webhookConfig: WebhookConfig | null;
emitSessionStoppedWebhook: (record: SessionRecord) => void;
}

function createRequestHandler(ctx: HandlerContext) {
Expand Down Expand Up @@ -378,9 +407,11 @@ function createRequestHandler(ctx: HandlerContext) {
const { sessions: allSessions, stoppedLaunchIds } =
ctx.sessionTracker.reconcileAndEnrich(discovered, succeededAdapters);

// Release locks for sessions that disappeared from adapter results
// Release locks and emit webhooks for sessions that disappeared
for (const id of stoppedLaunchIds) {
ctx.lockManager.autoUnlock(id);
const rec = ctx.state.getSession(id);
if (rec) ctx.emitSessionStoppedWebhook(rec);
}

// Apply filters
Expand Down Expand Up @@ -522,13 +553,25 @@ function createRequestHandler(ctx: HandlerContext) {
adapterOpts: params.adapterOpts as
| Record<string, unknown>
| undefined,
callbackSessionKey: params.callbackSessionKey as string | undefined,
callbackAgentId: params.callbackAgentId as string | undefined,
});

// Propagate group tag if provided
if (params.group) {
session.group = params.group as string;
}

// Propagate callback metadata
if (params.callbackSessionKey) {
session.meta.openclaw_callback_session_key =
params.callbackSessionKey as string;
}
if (params.callbackAgentId) {
session.meta.openclaw_callback_agent_id =
params.callbackAgentId as string;
}

const record = ctx.sessionTracker.track(session, adapterName);

// Auto-lock
Expand Down Expand Up @@ -576,6 +619,7 @@ function createRequestHandler(ctx: HandlerContext) {
const stopped = ctx.sessionTracker.onSessionExit(sessionId);
if (stopped) {
ctx.metrics.recordSessionStopped();
ctx.emitSessionStoppedWebhook(stopped);
}

return null;
Expand Down
Loading