From 8f6ae6401e0deaaa531f9a727a9a24a03121a5b1 Mon Sep 17 00:00:00 2001 From: "Jarvis (OpenClaw)" Date: Mon, 16 Feb 2026 12:00:35 -0800 Subject: [PATCH] feat(openclaw): add OpenClaw gateway adapter Implements AgentAdapter for the OpenClaw gateway WebSocket RPC protocol. Maps gateway sessions to the standard AgentSession interface with resilient error handling when the gateway is unreachable. - Adapter reads sessions via sessions.list and sessions.preview RPC - Supports list, peek, status with prefix matching - launch/stop/resume throw (not supported for OpenClaw sessions) - Poll-based event streaming via diffing (same pattern as claude-code) - 18 unit tests with injected RPC mock (no real gateway needed) - Registered in CLI alongside claude-code adapter Co-Authored-By: Charlie Hulcher --- src/adapters/openclaw.test.ts | 408 ++++++++++++++++++++++++++++++++ src/adapters/openclaw.ts | 430 ++++++++++++++++++++++++++++++++++ src/cli.ts | 2 + 3 files changed, 840 insertions(+) create mode 100644 src/adapters/openclaw.test.ts create mode 100644 src/adapters/openclaw.ts diff --git a/src/adapters/openclaw.test.ts b/src/adapters/openclaw.test.ts new file mode 100644 index 0000000..40d7b6c --- /dev/null +++ b/src/adapters/openclaw.test.ts @@ -0,0 +1,408 @@ +import { describe, expect, it } from "vitest"; +import { + OpenClawAdapter, + type RpcCallFn, + type SessionsListResult, + type SessionsPreviewResult, +} from "./openclaw.js"; + +const now = Date.now(); +const fiveMinAgo = now - 5 * 60 * 1000 + 30_000; // just under 5 min ago — "running" +const hourAgo = now - 60 * 60 * 1000; // 1 hour ago — "idle" + +function makeListResult( + sessions: SessionsListResult["sessions"] = [], +): SessionsListResult { + return { + ts: now, + path: "/mock/store.json", + count: sessions.length, + defaults: { + modelProvider: "anthropic", + model: "claude-sonnet-4-5-20250929", + contextTokens: 200_000, + }, + sessions, + }; +} + +function makePreviewResult( + previews: SessionsPreviewResult["previews"] = [], +): SessionsPreviewResult { + return { ts: now, previews }; +} + +function makeMockRpc( + handlers: Record) => unknown>, +): RpcCallFn { + return async (method, params) => { + const handler = handlers[method]; + if (!handler) throw new Error(`Unexpected RPC method: ${method}`); + return handler(params); + }; +} + +// --- Tests --- + +describe("OpenClawAdapter", () => { + it("has correct id", () => { + const adapter = new OpenClawAdapter({ rpcCall: makeMockRpc({}) }); + expect(adapter.id).toBe("openclaw"); + }); + + describe("list()", () => { + it("returns empty array when gateway is unreachable", async () => { + const adapter = new OpenClawAdapter({ + rpcCall: async () => { + throw new Error("connection refused"); + }, + }); + const sessions = await adapter.list(); + expect(sessions).toEqual([]); + }); + + it("returns mapped sessions from gateway", async () => { + const adapter = new OpenClawAdapter({ + rpcCall: makeMockRpc({ + "sessions.list": () => + makeListResult([ + { + key: "agent:jarvis:main", + kind: "direct", + label: "main", + displayName: "Main Session", + derivedTitle: "Help me with code", + updatedAt: fiveMinAgo, + sessionId: "aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee", + inputTokens: 1000, + outputTokens: 500, + model: "claude-opus-4-6", + modelProvider: "anthropic", + }, + ]), + }), + }); + + const sessions = await adapter.list({ all: true }); + expect(sessions).toHaveLength(1); + expect(sessions[0].id).toBe("aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee"); + expect(sessions[0].adapter).toBe("openclaw"); + expect(sessions[0].status).toBe("running"); + expect(sessions[0].model).toBe("claude-opus-4-6"); + expect(sessions[0].tokens).toEqual({ in: 1000, out: 500 }); + expect(sessions[0].prompt).toBe("Help me with code"); + expect(sessions[0].meta.key).toBe("agent:jarvis:main"); + }); + + it("classifies old sessions as idle", async () => { + const adapter = new OpenClawAdapter({ + rpcCall: makeMockRpc({ + "sessions.list": () => + makeListResult([ + { + key: "agent:jarvis:old-session", + kind: "direct", + updatedAt: hourAgo, + sessionId: "old-session-id", + }, + ]), + }), + }); + + const sessions = await adapter.list({ all: true }); + expect(sessions).toHaveLength(1); + expect(sessions[0].status).toBe("idle"); + }); + + it("default list shows running and idle sessions", async () => { + const adapter = new OpenClawAdapter({ + rpcCall: makeMockRpc({ + "sessions.list": () => + makeListResult([ + { + key: "agent:jarvis:active", + kind: "direct", + updatedAt: fiveMinAgo, + sessionId: "active-id", + }, + { + key: "agent:jarvis:old", + kind: "direct", + updatedAt: hourAgo, + sessionId: "old-id", + }, + ]), + }), + }); + + // OpenClaw sessions are either "running" (recently active) or "idle" + // (quiescent). Default list includes both. + const sessions = await adapter.list(); + expect(sessions).toHaveLength(2); + expect(sessions[0].status).toBe("running"); + expect(sessions[1].status).toBe("idle"); + }); + + it("filters by status", async () => { + const adapter = new OpenClawAdapter({ + rpcCall: makeMockRpc({ + "sessions.list": () => + makeListResult([ + { + key: "agent:jarvis:active", + kind: "direct", + updatedAt: fiveMinAgo, + sessionId: "active-id", + }, + { + key: "agent:jarvis:idle", + kind: "direct", + updatedAt: hourAgo, + sessionId: "idle-id", + }, + ]), + }), + }); + + const idle = await adapter.list({ status: "idle" }); + expect(idle).toHaveLength(1); + expect(idle[0].id).toBe("idle-id"); + + const running = await adapter.list({ status: "running" }); + expect(running).toHaveLength(1); + expect(running[0].id).toBe("active-id"); + }); + + it("uses session key as id when sessionId is missing", async () => { + const adapter = new OpenClawAdapter({ + rpcCall: makeMockRpc({ + "sessions.list": () => + makeListResult([ + { + key: "agent:jarvis:no-session-id", + kind: "direct", + updatedAt: fiveMinAgo, + }, + ]), + }), + }); + + const sessions = await adapter.list({ all: true }); + expect(sessions[0].id).toBe("agent:jarvis:no-session-id"); + }); + + it("uses default model when row has no model", async () => { + const adapter = new OpenClawAdapter({ + rpcCall: makeMockRpc({ + "sessions.list": () => + makeListResult([ + { + key: "agent:jarvis:test", + kind: "direct", + updatedAt: fiveMinAgo, + sessionId: "test-id", + }, + ]), + }), + }); + + const sessions = await adapter.list({ all: true }); + expect(sessions[0].model).toBe("claude-sonnet-4-5-20250929"); + }); + }); + + describe("peek()", () => { + it("returns assistant messages from preview", async () => { + const adapter = new OpenClawAdapter({ + rpcCall: makeMockRpc({ + "sessions.list": () => + makeListResult([ + { + key: "agent:jarvis:peek-test", + kind: "direct", + updatedAt: fiveMinAgo, + sessionId: "peek-session-id", + }, + ]), + "sessions.preview": () => + makePreviewResult([ + { + key: "agent:jarvis:peek-test", + status: "ok" as const, + items: [ + { role: "user", text: "Hello" }, + { role: "assistant", text: "Hi there!" }, + { role: "user", text: "How are you?" }, + { role: "assistant", text: "I'm doing well." }, + ], + }, + ]), + }), + }); + + const output = await adapter.peek("peek-session-id"); + expect(output).toContain("Hi there!"); + expect(output).toContain("I'm doing well."); + expect(output).not.toContain("Hello"); + }); + + it("respects line limit", async () => { + const items: Array<{ role: string; text: string }> = []; + for (let i = 0; i < 10; i++) { + items.push({ role: "assistant", text: `Message ${i}` }); + } + + const adapter = new OpenClawAdapter({ + rpcCall: makeMockRpc({ + "sessions.list": () => + makeListResult([ + { + key: "agent:jarvis:limit-test", + kind: "direct", + updatedAt: fiveMinAgo, + sessionId: "limit-session-id", + }, + ]), + "sessions.preview": () => + makePreviewResult([ + { + key: "agent:jarvis:limit-test", + status: "ok" as const, + items, + }, + ]), + }), + }); + + const output = await adapter.peek("limit-session-id", { lines: 3 }); + expect(output).toContain("Message 7"); + expect(output).toContain("Message 8"); + expect(output).toContain("Message 9"); + expect(output).not.toContain("Message 6"); + }); + + it("throws for unknown session", async () => { + const adapter = new OpenClawAdapter({ + rpcCall: makeMockRpc({ + "sessions.list": () => makeListResult([]), + }), + }); + + await expect(adapter.peek("nonexistent")).rejects.toThrow( + "Session not found", + ); + }); + + it("supports prefix matching", async () => { + const adapter = new OpenClawAdapter({ + rpcCall: makeMockRpc({ + "sessions.list": () => + makeListResult([ + { + key: "agent:jarvis:prefix-test", + kind: "direct", + updatedAt: fiveMinAgo, + sessionId: "abcdef12-3456-7890-abcd-ef1234567890", + }, + ]), + "sessions.preview": () => + makePreviewResult([ + { + key: "agent:jarvis:prefix-test", + status: "ok" as const, + items: [{ role: "assistant", text: "Found by prefix!" }], + }, + ]), + }), + }); + + const output = await adapter.peek("abcdef12"); + expect(output).toContain("Found by prefix!"); + }); + }); + + describe("status()", () => { + it("returns session details", async () => { + const adapter = new OpenClawAdapter({ + rpcCall: makeMockRpc({ + "sessions.list": () => + makeListResult([ + { + key: "agent:jarvis:status-test", + kind: "direct", + displayName: "Status Test", + updatedAt: fiveMinAgo, + sessionId: "status-session-id", + inputTokens: 500, + outputTokens: 200, + model: "claude-opus-4-6", + modelProvider: "anthropic", + }, + ]), + }), + }); + + const session = await adapter.status("status-session-id"); + expect(session.id).toBe("status-session-id"); + expect(session.adapter).toBe("openclaw"); + expect(session.status).toBe("running"); + expect(session.model).toBe("claude-opus-4-6"); + expect(session.tokens).toEqual({ in: 500, out: 200 }); + }); + + it("throws for unknown session", async () => { + const adapter = new OpenClawAdapter({ + rpcCall: makeMockRpc({ + "sessions.list": () => makeListResult([]), + }), + }); + + await expect(adapter.status("nonexistent")).rejects.toThrow( + "Session not found", + ); + }); + + it("supports prefix matching", async () => { + const adapter = new OpenClawAdapter({ + rpcCall: makeMockRpc({ + "sessions.list": () => + makeListResult([ + { + key: "agent:jarvis:prefix-status", + kind: "direct", + updatedAt: fiveMinAgo, + sessionId: "abcdef12-9999-9999-9999-999999999999", + model: "claude-opus-4-6", + }, + ]), + }), + }); + + const session = await adapter.status("abcdef12"); + expect(session.id).toBe("abcdef12-9999-9999-9999-999999999999"); + }); + }); + + describe("unsupported operations", () => { + it("launch throws", async () => { + const adapter = new OpenClawAdapter({ rpcCall: makeMockRpc({}) }); + await expect( + adapter.launch({ adapter: "openclaw", prompt: "test" }), + ).rejects.toThrow("cannot be launched"); + }); + + it("stop throws", async () => { + const adapter = new OpenClawAdapter({ rpcCall: makeMockRpc({}) }); + await expect(adapter.stop("some-id")).rejects.toThrow( + "cannot be stopped", + ); + }); + + it("resume throws", async () => { + const adapter = new OpenClawAdapter({ rpcCall: makeMockRpc({}) }); + await expect(adapter.resume("some-id", "msg")).rejects.toThrow( + "Cannot resume", + ); + }); + }); +}); diff --git a/src/adapters/openclaw.ts b/src/adapters/openclaw.ts new file mode 100644 index 0000000..fe52b42 --- /dev/null +++ b/src/adapters/openclaw.ts @@ -0,0 +1,430 @@ +import { randomUUID } from "node:crypto"; +import type { + AgentAdapter, + AgentSession, + LaunchOpts, + LifecycleEvent, + ListOpts, + PeekOpts, + StopOpts, +} from "../core/types.js"; + +const DEFAULT_BASE_URL = "http://127.0.0.1:18789"; + +export interface OpenClawAdapterOpts { + baseUrl?: string; // Default: http://127.0.0.1:18789 + authToken?: string; // Default: process.env.OPENCLAW_WEBHOOK_TOKEN + /** Override for testing — replaces the real WebSocket RPC call */ + rpcCall?: RpcCallFn; +} + +/** + * Shape of a single RPC exchange: send method+params, get back the payload. + * Injected in tests to avoid a real WebSocket connection. + */ +export type RpcCallFn = ( + method: string, + params: Record, +) => Promise; + +/** Row returned by the gateway's `sessions.list` method */ +export interface GatewaySessionRow { + key: string; + kind: "direct" | "group" | "global" | "unknown"; + label?: string; + displayName?: string; + derivedTitle?: string; + lastMessagePreview?: string; + channel?: string; + updatedAt: number | null; + sessionId?: string; + inputTokens?: number; + outputTokens?: number; + totalTokens?: number; + model?: string; + modelProvider?: string; +} + +/** Result envelope from `sessions.list` */ +export interface SessionsListResult { + ts: number; + path: string; + count: number; + defaults: { + modelProvider: string | null; + model: string | null; + contextTokens: number | null; + }; + sessions: GatewaySessionRow[]; +} + +/** Single preview entry from `sessions.preview` */ +export interface SessionsPreviewEntry { + key: string; + status: "ok" | "empty" | "missing" | "error"; + items: Array<{ role: string; text: string }>; +} + +/** Result envelope from `sessions.preview` */ +export interface SessionsPreviewResult { + ts: number; + previews: SessionsPreviewEntry[]; +} + +/** + * OpenClaw adapter — reads session data from the OpenClaw gateway via + * its WebSocket RPC protocol. Falls back gracefully when the gateway + * is unreachable. + */ +export class OpenClawAdapter implements AgentAdapter { + readonly id = "openclaw"; + private readonly baseUrl: string; + private readonly authToken: string; + private readonly rpcCall: RpcCallFn; + + constructor(opts?: OpenClawAdapterOpts) { + this.baseUrl = opts?.baseUrl || DEFAULT_BASE_URL; + this.authToken = opts?.authToken || process.env.OPENCLAW_WEBHOOK_TOKEN || ""; + this.rpcCall = opts?.rpcCall || this.defaultRpcCall.bind(this); + } + + async list(opts?: ListOpts): Promise { + let result: SessionsListResult; + try { + result = (await this.rpcCall("sessions.list", { + includeDerivedTitles: true, + includeLastMessage: true, + })) as SessionsListResult; + } catch { + // Gateway unreachable — return empty + return []; + } + + let sessions = result.sessions.map((row) => + this.mapRowToSession(row, result.defaults), + ); + + if (opts?.status) { + sessions = sessions.filter((s) => s.status === opts.status); + } + + if (!opts?.all && !opts?.status) { + sessions = sessions.filter( + (s) => s.status === "running" || s.status === "idle", + ); + } + + return sessions; + } + + async peek(sessionId: string, opts?: PeekOpts): Promise { + const key = await this.resolveKey(sessionId); + if (!key) throw new Error(`Session not found: ${sessionId}`); + + const limit = opts?.lines ?? 20; + let result: SessionsPreviewResult; + try { + result = (await this.rpcCall("sessions.preview", { + keys: [key], + limit, + maxChars: 4000, + })) as SessionsPreviewResult; + } catch (err) { + throw new Error(`Failed to peek session ${sessionId}: ${(err as Error).message}`); + } + + const preview = result.previews?.[0]; + if (!preview || preview.status === "missing") { + throw new Error(`Session not found: ${sessionId}`); + } + + if (preview.items.length === 0) return "(no messages)"; + + const assistantMessages = preview.items + .filter((item) => item.role === "assistant") + .map((item) => item.text); + + if (assistantMessages.length === 0) return "(no assistant messages)"; + + return assistantMessages.slice(-limit).join("\n---\n"); + } + + async status(sessionId: string): Promise { + let result: SessionsListResult; + try { + result = (await this.rpcCall("sessions.list", { + includeDerivedTitles: true, + search: sessionId, + })) as SessionsListResult; + } catch (err) { + throw new Error(`Failed to get status for ${sessionId}: ${(err as Error).message}`); + } + + const row = result.sessions.find( + (s) => s.sessionId === sessionId || s.key === sessionId || + s.sessionId?.startsWith(sessionId) || s.key.startsWith(sessionId), + ); + + if (!row) throw new Error(`Session not found: ${sessionId}`); + + return this.mapRowToSession(row, result.defaults); + } + + async launch(_opts: LaunchOpts): Promise { + throw new Error("OpenClaw sessions cannot be launched via agent-ctl"); + } + + async stop(_sessionId: string, _opts?: StopOpts): Promise { + throw new Error("OpenClaw sessions cannot be stopped via agent-ctl"); + } + + async resume(sessionId: string, _message: string): Promise { + // OpenClaw sessions receive messages through their configured channels, + // not through a direct CLI interface. + throw new Error( + `Cannot resume OpenClaw session ${sessionId} — use the gateway UI or configured channel`, + ); + } + + async *events(): AsyncIterable { + // Poll-based diffing (same pattern as claude-code) + let knownSessions = new Map(); + + // Initial snapshot + const initial = await this.list({ all: true }); + for (const s of initial) { + knownSessions.set(s.id, s); + } + + while (true) { + await sleep(5000); + + let current: AgentSession[]; + try { + current = await this.list({ all: true }); + } catch { + continue; + } + + const currentMap = new Map(current.map((s) => [s.id, s])); + + for (const [id, session] of currentMap) { + const prev = knownSessions.get(id); + if (!prev) { + yield { + type: "session.started", + adapter: this.id, + sessionId: id, + session, + timestamp: new Date(), + }; + } else if (prev.status === "running" && session.status === "stopped") { + yield { + type: "session.stopped", + adapter: this.id, + sessionId: id, + session, + timestamp: new Date(), + }; + } else if (prev.status === "running" && session.status === "idle") { + yield { + type: "session.idle", + adapter: this.id, + sessionId: id, + session, + timestamp: new Date(), + }; + } + } + + knownSessions = currentMap; + } + } + + // --- Private helpers --- + + /** + * Map a gateway session row to the standard AgentSession interface. + * OpenClaw sessions with a recent updatedAt are considered "running". + */ + private mapRowToSession( + row: GatewaySessionRow, + defaults: SessionsListResult["defaults"], + ): AgentSession { + const now = Date.now(); + const updatedAt = row.updatedAt ?? 0; + const ageMs = now - updatedAt; + + // Consider "running" if updated in the last 5 minutes + const isActive = updatedAt > 0 && ageMs < 5 * 60 * 1000; + + const model = row.model || defaults.model || undefined; + const input = row.inputTokens ?? 0; + const output = row.outputTokens ?? 0; + + return { + id: row.sessionId || row.key, + adapter: this.id, + status: isActive ? "running" : "idle", + startedAt: updatedAt > 0 ? new Date(updatedAt) : new Date(), + cwd: undefined, + model, + prompt: row.derivedTitle || row.displayName || row.label, + tokens: input || output ? { in: input, out: output } : undefined, + meta: { + key: row.key, + kind: row.kind, + channel: row.channel, + displayName: row.displayName, + modelProvider: row.modelProvider || defaults.modelProvider, + lastMessagePreview: row.lastMessagePreview, + }, + }; + } + + /** + * Resolve a sessionId (or prefix) to a gateway session key. + */ + private async resolveKey(sessionId: string): Promise { + let result: SessionsListResult; + try { + result = (await this.rpcCall("sessions.list", { + search: sessionId, + })) as SessionsListResult; + } catch { + return null; + } + + const row = result.sessions.find( + (s) => s.sessionId === sessionId || s.key === sessionId || + s.sessionId?.startsWith(sessionId) || s.key.startsWith(sessionId), + ); + + return row?.key ?? null; + } + + /** + * Real WebSocket RPC call — connects, performs handshake, sends one + * request, reads the response, then disconnects. + */ + private async defaultRpcCall( + method: string, + params: Record, + ): Promise { + // Dynamic import so tests can inject a mock without loading ws + const { WebSocket } = await import("ws" as string).catch(() => { + // Fall back to globalThis.WebSocket (available in Node 22+) + return { WebSocket: globalThis.WebSocket }; + }); + + const wsUrl = this.baseUrl.replace(/^http/, "ws"); + const ws = new WebSocket(wsUrl); + + return new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + ws.close(); + reject(new Error("OpenClaw gateway connection timed out")); + }, 10_000); + + const reqId = randomUUID(); + let connected = false; + + ws.onopen = () => { + // Wait for challenge event, then send connect + }; + + ws.onmessage = (event: { data: unknown }) => { + try { + const raw = typeof event.data === "string" ? event.data : String(event.data); + const frame = JSON.parse(raw); + + // Step 1: Receive challenge, send connect + if (frame.type === "event" && frame.event === "connect.challenge") { + ws.send( + JSON.stringify({ + type: "req", + id: randomUUID(), + method: "connect", + params: { + minProtocol: 1, + maxProtocol: 1, + client: { + id: "agent-ctl", + version: "0.1.0", + platform: process.platform, + mode: "cli", + }, + role: "operator", + scopes: ["operator.read"], + auth: { token: this.authToken || null }, + }, + }), + ); + return; + } + + // Step 2: Receive hello-ok, send actual RPC + if (frame.type === "res" && frame.ok && !connected) { + connected = true; + ws.send( + JSON.stringify({ + type: "req", + id: reqId, + method, + params, + }), + ); + return; + } + + // Step 3: Receive RPC response + if (frame.type === "res" && frame.id === reqId) { + clearTimeout(timeout); + ws.close(); + if (frame.ok) { + resolve(frame.payload); + } else { + reject( + new Error( + frame.error?.message || `RPC error: ${method}`, + ), + ); + } + return; + } + + // Auth failure + if (frame.type === "res" && !frame.ok && !connected) { + clearTimeout(timeout); + ws.close(); + reject( + new Error( + frame.error?.message || "OpenClaw gateway auth failed", + ), + ); + } + } catch { + // Ignore malformed frames + } + }; + + ws.onerror = (err: unknown) => { + clearTimeout(timeout); + reject( + new Error( + `OpenClaw gateway error: ${(err as Error)?.message || "connection failed"}`, + ), + ); + }; + + ws.onclose = () => { + clearTimeout(timeout); + // Only reject if we haven't resolved yet + }; + }); + } +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} diff --git a/src/cli.ts b/src/cli.ts index cd63aad..5049816 100644 --- a/src/cli.ts +++ b/src/cli.ts @@ -2,10 +2,12 @@ import { Command } from "commander"; import { ClaudeCodeAdapter } from "./adapters/claude-code.js"; +import { OpenClawAdapter } from "./adapters/openclaw.js"; import type { AgentAdapter, AgentSession, ListOpts } from "./core/types.js"; const adapters: Record = { "claude-code": new ClaudeCodeAdapter(), + openclaw: new OpenClawAdapter(), }; function getAdapter(name?: string): AgentAdapter {