Compare commits
3 Commits
fix/node-i
...
fix/gatewa
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
110079d99d | ||
|
|
34a126a6d7 | ||
|
|
31462f64d8 |
@@ -32,9 +32,9 @@ Docs: https://docs.clawd.bot
|
||||
- Diagnostics: emit message-flow diagnostics across channels via shared dispatch; gate heartbeat/webhook logging. (#1244) — thanks @oscargavin.
|
||||
- CLI: preserve cron delivery settings when editing message payloads. (#1322) — thanks @KrauseFx.
|
||||
- CLI: keep `clawdbot logs` output resilient to broken pipes while preserving progress output.
|
||||
- Nodes: enforce node.invoke timeouts for node handlers. (#1357) — thanks @vignesh07.
|
||||
- Model catalog: avoid caching import failures, log transient discovery errors, and keep partial results. (#1332) — thanks @dougvk.
|
||||
- Doctor: clarify plugin auto-enable hint text in the startup banner.
|
||||
- Gateway: allow mobile node client ids for iOS + Android handshake validation. (#1354) — thanks @vignesh07.
|
||||
- Gateway: clarify unauthorized handshake responses with token/password mismatch guidance.
|
||||
- Gateway: clarify connect/validation errors for gateway params. (#1347) — thanks @vignesh07.
|
||||
- Gateway: preserve restart wake routing + thread replies across restarts. (#1337) — thanks @John-Rood.
|
||||
|
||||
@@ -529,7 +529,7 @@ class NodeRuntime(context: Context) {
|
||||
caps = buildCapabilities(),
|
||||
commands = buildInvokeCommands(),
|
||||
permissions = emptyMap(),
|
||||
client = buildClientInfo(clientId = "node-host", clientMode = "node"),
|
||||
client = buildClientInfo(clientId = "clawdbot-android", clientMode = "node"),
|
||||
userAgent = buildUserAgent(),
|
||||
)
|
||||
}
|
||||
|
||||
@@ -11,35 +11,6 @@ private struct NodeInvokeRequestPayload: Codable, Sendable {
|
||||
var idempotencyKey: String?
|
||||
}
|
||||
|
||||
// Ensures the timeout can win even if the invoke task never completes.
|
||||
private actor InvokeTimeoutRace {
|
||||
private var finished = false
|
||||
private let continuation: CheckedContinuation<BridgeInvokeResponse, Never>
|
||||
private var invokeTask: Task<Void, Never>?
|
||||
private var timeoutTask: Task<Void, Never>?
|
||||
|
||||
init(continuation: CheckedContinuation<BridgeInvokeResponse, Never>) {
|
||||
self.continuation = continuation
|
||||
}
|
||||
|
||||
func registerTasks(invoke: Task<Void, Never>, timeout: Task<Void, Never>) {
|
||||
self.invokeTask = invoke
|
||||
self.timeoutTask = timeout
|
||||
if finished {
|
||||
invoke.cancel()
|
||||
timeout.cancel()
|
||||
}
|
||||
}
|
||||
|
||||
func finish(_ response: BridgeInvokeResponse) {
|
||||
guard !finished else { return }
|
||||
finished = true
|
||||
continuation.resume(returning: response)
|
||||
invokeTask?.cancel()
|
||||
timeoutTask?.cancel()
|
||||
}
|
||||
}
|
||||
|
||||
public actor GatewayNodeSession {
|
||||
private let logger = Logger(subsystem: "com.clawdbot", category: "node.gateway")
|
||||
private let decoder = JSONDecoder()
|
||||
@@ -52,45 +23,6 @@ public actor GatewayNodeSession {
|
||||
private var onConnected: (@Sendable () async -> Void)?
|
||||
private var onDisconnected: (@Sendable (String) async -> Void)?
|
||||
private var onInvoke: (@Sendable (BridgeInvokeRequest) async -> BridgeInvokeResponse)?
|
||||
|
||||
static func invokeWithTimeout(
|
||||
request: BridgeInvokeRequest,
|
||||
timeoutMs: Int?,
|
||||
onInvoke: @escaping @Sendable (BridgeInvokeRequest) async -> BridgeInvokeResponse
|
||||
) async -> BridgeInvokeResponse {
|
||||
let timeout = max(0, timeoutMs ?? 0)
|
||||
guard timeout > 0 else {
|
||||
return await onInvoke(request)
|
||||
}
|
||||
|
||||
let cappedTimeout = min(timeout, Int(UInt64.max / 1_000_000))
|
||||
let timeoutResponse = BridgeInvokeResponse(
|
||||
id: request.id,
|
||||
ok: false,
|
||||
error: ClawdbotNodeError(
|
||||
code: .unavailable,
|
||||
message: "node invoke timed out")
|
||||
)
|
||||
|
||||
return await withCheckedContinuation { continuation in
|
||||
let race = InvokeTimeoutRace(continuation: continuation)
|
||||
let invokeTask = Task {
|
||||
let response = await onInvoke(request)
|
||||
await race.finish(response)
|
||||
}
|
||||
let timeoutTask = Task {
|
||||
do {
|
||||
try await Task.sleep(nanoseconds: UInt64(cappedTimeout) * 1_000_000)
|
||||
} catch {
|
||||
return
|
||||
}
|
||||
await race.finish(timeoutResponse)
|
||||
}
|
||||
Task {
|
||||
await race.registerTasks(invoke: invokeTask, timeout: timeoutTask)
|
||||
}
|
||||
}
|
||||
}
|
||||
private var serverEventSubscribers: [UUID: AsyncStream<EventFrame>.Continuation] = [:]
|
||||
private var canvasHostUrl: String?
|
||||
|
||||
@@ -235,11 +167,7 @@ public actor GatewayNodeSession {
|
||||
let request = try self.decoder.decode(NodeInvokeRequestPayload.self, from: data)
|
||||
guard let onInvoke else { return }
|
||||
let req = BridgeInvokeRequest(id: request.id, command: request.command, paramsJSON: request.paramsJSON)
|
||||
let response = await Self.invokeWithTimeout(
|
||||
request: req,
|
||||
timeoutMs: request.timeoutMs,
|
||||
onInvoke: onInvoke
|
||||
)
|
||||
let response = await onInvoke(req)
|
||||
await self.sendInvokeResult(request: request, response: response)
|
||||
} catch {
|
||||
self.logger.error("node invoke decode failed: \(error.localizedDescription, privacy: .public)")
|
||||
|
||||
@@ -1,78 +0,0 @@
|
||||
import Foundation
|
||||
import Testing
|
||||
@testable import ClawdbotKit
|
||||
import ClawdbotProtocol
|
||||
|
||||
struct GatewayNodeSessionTests {
|
||||
@Test
|
||||
func invokeWithTimeoutReturnsUnderlyingResponseBeforeTimeout() async {
|
||||
let request = BridgeInvokeRequest(id: "1", command: "x", paramsJSON: nil)
|
||||
let response = await GatewayNodeSession.invokeWithTimeout(
|
||||
request: request,
|
||||
timeoutMs: 50,
|
||||
onInvoke: { req in
|
||||
#expect(req.id == "1")
|
||||
return BridgeInvokeResponse(id: req.id, ok: true, payloadJSON: "{}", error: nil)
|
||||
}
|
||||
)
|
||||
|
||||
#expect(response.ok == true)
|
||||
#expect(response.error == nil)
|
||||
#expect(response.payloadJSON == "{}")
|
||||
}
|
||||
|
||||
@Test
|
||||
func invokeWithTimeoutReturnsTimeoutError() async {
|
||||
let request = BridgeInvokeRequest(id: "abc", command: "x", paramsJSON: nil)
|
||||
let response = await GatewayNodeSession.invokeWithTimeout(
|
||||
request: request,
|
||||
timeoutMs: 10,
|
||||
onInvoke: { _ in
|
||||
try? await Task.sleep(nanoseconds: 200_000_000) // 200ms
|
||||
return BridgeInvokeResponse(id: "abc", ok: true, payloadJSON: "{}", error: nil)
|
||||
}
|
||||
)
|
||||
|
||||
#expect(response.ok == false)
|
||||
#expect(response.error?.code == .unavailable)
|
||||
#expect(response.error?.message.contains("timed out") == true)
|
||||
}
|
||||
|
||||
@Test
|
||||
func invokeWithTimeoutReturnsWhenHandlerNeverCompletes() async {
|
||||
let request = BridgeInvokeRequest(id: "stall", command: "x", paramsJSON: nil)
|
||||
let response = try? await AsyncTimeout.withTimeoutMs(
|
||||
timeoutMs: 200,
|
||||
onTimeout: { NSError(domain: "GatewayNodeSessionTests", code: 1) },
|
||||
operation: {
|
||||
await GatewayNodeSession.invokeWithTimeout(
|
||||
request: request,
|
||||
timeoutMs: 10,
|
||||
onInvoke: { _ in
|
||||
await withCheckedContinuation { _ in }
|
||||
}
|
||||
)
|
||||
}
|
||||
)
|
||||
|
||||
#expect(response != nil)
|
||||
#expect(response?.ok == false)
|
||||
#expect(response?.error?.code == .unavailable)
|
||||
}
|
||||
|
||||
@Test
|
||||
func invokeWithTimeoutZeroDisablesTimeout() async {
|
||||
let request = BridgeInvokeRequest(id: "1", command: "x", paramsJSON: nil)
|
||||
let response = await GatewayNodeSession.invokeWithTimeout(
|
||||
request: request,
|
||||
timeoutMs: 0,
|
||||
onInvoke: { req in
|
||||
try? await Task.sleep(nanoseconds: 5_000_000)
|
||||
return BridgeInvokeResponse(id: req.id, ok: true, payloadJSON: nil, error: nil)
|
||||
}
|
||||
)
|
||||
|
||||
#expect(response.ok == true)
|
||||
#expect(response.error == nil)
|
||||
}
|
||||
}
|
||||
@@ -9,7 +9,6 @@ import { defaultRuntime } from "../runtime.js";
|
||||
import { formatDocsLink } from "../terminal/links.js";
|
||||
import { theme } from "../terminal/theme.js";
|
||||
import { renderTable } from "../terminal/table.js";
|
||||
import type { ChannelDirectoryEntry } from "../channels/plugins/types.core.js";
|
||||
|
||||
function parseLimit(value: unknown): number | null {
|
||||
if (typeof value === "number" && Number.isFinite(value)) {
|
||||
@@ -31,15 +30,6 @@ function buildRows(entries: Array<{ id: string; name?: string | undefined }>) {
|
||||
}));
|
||||
}
|
||||
|
||||
function formatEntry(entry: ChannelDirectoryEntry): string {
|
||||
const name = entry.name?.trim();
|
||||
const handle = entry.handle?.trim();
|
||||
const handleLabel = handle ? (handle.startsWith("@") ? handle : `@${handle}`) : null;
|
||||
const label = [name, handleLabel].filter(Boolean).join(" ");
|
||||
if (!label) return entry.id;
|
||||
return `${label} ${theme.muted(`(${entry.id})`)}`;
|
||||
}
|
||||
|
||||
export function registerDirectoryCli(program: Command) {
|
||||
const directory = program
|
||||
.command("directory")
|
||||
|
||||
@@ -7,6 +7,6 @@ describe("dns cli", () => {
|
||||
const log = vi.spyOn(console, "log").mockImplementation(() => {});
|
||||
const program = buildProgram();
|
||||
await program.parseAsync(["dns", "setup"], { from: "user" });
|
||||
expect(log).toHaveBeenCalledWith(expect.stringContaining("clawdbot.internal"));
|
||||
expect(log).toHaveBeenCalledWith(expect.stringContaining("Domain:"));
|
||||
});
|
||||
});
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { theme } from "../terminal/theme.js";
|
||||
|
||||
export type HelpExample = readonly [command: string, description: string];
|
||||
export type HelpExample = [command: string, description: string];
|
||||
|
||||
export function formatHelpExample(command: string, description: string): string {
|
||||
return ` ${theme.command(command)}\n ${theme.muted(description)}`;
|
||||
@@ -11,15 +11,11 @@ export function formatHelpExampleLine(command: string, description: string): str
|
||||
return ` ${theme.command(command)} ${theme.muted(`# ${description}`)}`;
|
||||
}
|
||||
|
||||
export function formatHelpExamples(examples: readonly HelpExample[], inline = false): string {
|
||||
export function formatHelpExamples(examples: HelpExample[], inline = false): string {
|
||||
const formatter = inline ? formatHelpExampleLine : formatHelpExample;
|
||||
return examples.map(([command, description]) => formatter(command, description)).join("\n");
|
||||
}
|
||||
|
||||
export function formatHelpExampleGroup(
|
||||
label: string,
|
||||
examples: readonly HelpExample[],
|
||||
inline = false,
|
||||
) {
|
||||
export function formatHelpExampleGroup(label: string, examples: HelpExample[], inline = false) {
|
||||
return `${theme.muted(label)}\n${formatHelpExamples(examples, inline)}`;
|
||||
}
|
||||
|
||||
@@ -46,12 +46,13 @@ function formatNodeVersions(node: {
|
||||
|
||||
function parseSinceMs(raw: unknown, label: string): number | undefined {
|
||||
if (raw === undefined || raw === null) return undefined;
|
||||
if (typeof raw !== "string" && typeof raw !== "number" && typeof raw !== "bigint") {
|
||||
defaultRuntime.error(`${label}: invalid duration`);
|
||||
const value =
|
||||
typeof raw === "string" ? raw.trim() : typeof raw === "number" ? String(raw).trim() : null;
|
||||
if (value === null) {
|
||||
defaultRuntime.error(`${label}: invalid duration value`);
|
||||
defaultRuntime.exit(1);
|
||||
return undefined;
|
||||
}
|
||||
const value = String(raw).trim();
|
||||
if (!value) return undefined;
|
||||
try {
|
||||
return parseDurationMs(value);
|
||||
|
||||
@@ -71,9 +71,7 @@ describe("pairing cli", () => {
|
||||
await program.parseAsync(["pairing", "list", "--channel", "telegram"], {
|
||||
from: "user",
|
||||
});
|
||||
const output = log.mock.calls.map(([value]) => String(value)).join("\n");
|
||||
expect(output).toContain("telegramUserId");
|
||||
expect(output).toContain("123");
|
||||
expect(log).toHaveBeenCalledWith(expect.stringContaining("telegramUserId=123"));
|
||||
});
|
||||
|
||||
it("accepts channel as positional for list", async () => {
|
||||
@@ -133,9 +131,7 @@ describe("pairing cli", () => {
|
||||
await program.parseAsync(["pairing", "list", "--channel", "discord"], {
|
||||
from: "user",
|
||||
});
|
||||
const output = log.mock.calls.map(([value]) => String(value)).join("\n");
|
||||
expect(output).toContain("discordUserId");
|
||||
expect(output).toContain("999");
|
||||
expect(log).toHaveBeenCalledWith(expect.stringContaining("discordUserId=999"));
|
||||
});
|
||||
|
||||
it("accepts channel as positional for approve (npm-run compatible)", async () => {
|
||||
|
||||
@@ -5,6 +5,8 @@ export const GATEWAY_CLIENT_IDS = {
|
||||
CLI: "cli",
|
||||
GATEWAY_CLIENT: "gateway-client",
|
||||
MACOS_APP: "clawdbot-macos",
|
||||
IOS_APP: "clawdbot-ios",
|
||||
ANDROID_APP: "clawdbot-android",
|
||||
NODE_HOST: "node-host",
|
||||
TEST: "test",
|
||||
FINGERPRINT: "fingerprint",
|
||||
|
||||
87
src/gateway/server.ios-client-id.test.ts
Normal file
87
src/gateway/server.ios-client-id.test.ts
Normal file
@@ -0,0 +1,87 @@
|
||||
import { test } from "vitest";
|
||||
import WebSocket from "ws";
|
||||
|
||||
import { PROTOCOL_VERSION } from "./protocol/index.js";
|
||||
import { getFreePort, onceMessage, startGatewayServer } from "./test-helpers.server.js";
|
||||
|
||||
function connectReq(
|
||||
ws: WebSocket,
|
||||
params: { clientId: string; platform: string; token?: string; password?: string },
|
||||
): Promise<{ ok: boolean; error?: { message?: string } }> {
|
||||
const id = `c-${Math.random().toString(16).slice(2)}`;
|
||||
ws.send(
|
||||
JSON.stringify({
|
||||
type: "req",
|
||||
id,
|
||||
method: "connect",
|
||||
params: {
|
||||
minProtocol: PROTOCOL_VERSION,
|
||||
maxProtocol: PROTOCOL_VERSION,
|
||||
client: {
|
||||
id: params.clientId,
|
||||
version: "dev",
|
||||
platform: params.platform,
|
||||
mode: "node",
|
||||
},
|
||||
auth: {
|
||||
token: params.token,
|
||||
password: params.password,
|
||||
},
|
||||
role: "node",
|
||||
scopes: [],
|
||||
caps: ["canvas"],
|
||||
commands: ["system.notify"],
|
||||
permissions: {},
|
||||
},
|
||||
}),
|
||||
);
|
||||
|
||||
return onceMessage(
|
||||
ws,
|
||||
(o) => (o as { type?: string }).type === "res" && (o as { id?: string }).id === id,
|
||||
);
|
||||
}
|
||||
|
||||
test("accepts clawdbot-ios as a valid gateway client id", async () => {
|
||||
const port = await getFreePort();
|
||||
const server = await startGatewayServer(port);
|
||||
const ws = new WebSocket(`ws://127.0.0.1:${port}`);
|
||||
await new Promise<void>((resolve) => ws.once("open", resolve));
|
||||
|
||||
const res = await connectReq(ws, { clientId: "clawdbot-ios", platform: "ios" });
|
||||
// We don't care if auth fails here; we only care that schema validation accepts the client id.
|
||||
// A schema rejection would close the socket before sending a response.
|
||||
if (!res.ok) {
|
||||
// allow unauthorized error when gateway requires auth
|
||||
// but reject schema validation errors
|
||||
const message = String(res.error?.message ?? "");
|
||||
if (message.includes("invalid connect params")) {
|
||||
throw new Error(message);
|
||||
}
|
||||
}
|
||||
|
||||
ws.close();
|
||||
await server.close();
|
||||
});
|
||||
|
||||
test("accepts clawdbot-android as a valid gateway client id", async () => {
|
||||
const port = await getFreePort();
|
||||
const server = await startGatewayServer(port);
|
||||
const ws = new WebSocket(`ws://127.0.0.1:${port}`);
|
||||
await new Promise<void>((resolve) => ws.once("open", resolve));
|
||||
|
||||
const res = await connectReq(ws, { clientId: "clawdbot-android", platform: "android" });
|
||||
// We don't care if auth fails here; we only care that schema validation accepts the client id.
|
||||
// A schema rejection would close the socket before sending a response.
|
||||
if (!res.ok) {
|
||||
// allow unauthorized error when gateway requires auth
|
||||
// but reject schema validation errors
|
||||
const message = String(res.error?.message ?? "");
|
||||
if (message.includes("invalid connect params")) {
|
||||
throw new Error(message);
|
||||
}
|
||||
}
|
||||
|
||||
ws.close();
|
||||
await server.close();
|
||||
});
|
||||
Reference in New Issue
Block a user