Compare commits

...

1 Commits

Author SHA1 Message Date
Tak Hoffman
e4d90613bc fix(cron): cancel timed-out runs before side effects 2026-02-20 22:35:18 -06:00
7 changed files with 197 additions and 9 deletions

View File

@@ -359,7 +359,11 @@ async function maybeQueueSubagentAnnounce(params: {
triggerMessage: string;
summaryLine?: string;
requesterOrigin?: DeliveryContext;
signal?: AbortSignal;
}): Promise<"steered" | "queued" | "none"> {
if (params.signal?.aborted) {
return "none";
}
const { cfg, entry } = loadRequesterSessionEntry(params.requesterSessionKey);
const canonicalKey = resolveRequesterStoreKey(cfg, params.requesterSessionKey);
const sessionId = entry?.sessionId;
@@ -438,7 +442,14 @@ async function sendSubagentAnnounceDirectly(params: {
completionDirectOrigin?: DeliveryContext;
directOrigin?: DeliveryContext;
requesterIsSubagent: boolean;
signal?: AbortSignal;
}): Promise<SubagentAnnounceDeliveryResult> {
if (params.signal?.aborted) {
return {
delivered: false,
path: "none",
};
}
const cfg = loadConfig();
const canonicalRequesterSessionKey = resolveRequesterStoreKey(
cfg,
@@ -468,6 +479,12 @@ async function sendSubagentAnnounceDirectly(params: {
completionDirectOrigin?.threadId != null && completionDirectOrigin.threadId !== ""
? String(completionDirectOrigin.threadId)
: undefined;
if (params.signal?.aborted) {
return {
delivered: false,
path: "none",
};
}
await callGateway({
method: "send",
params: {
@@ -493,6 +510,12 @@ async function sendSubagentAnnounceDirectly(params: {
directOrigin?.threadId != null && directOrigin.threadId !== ""
? String(directOrigin.threadId)
: undefined;
if (params.signal?.aborted) {
return {
delivered: false,
path: "none",
};
}
await callGateway({
method: "agent",
params: {
@@ -535,7 +558,14 @@ async function deliverSubagentAnnouncement(params: {
requesterIsSubagent: boolean;
expectsCompletionMessage: boolean;
directIdempotencyKey: string;
signal?: AbortSignal;
}): Promise<SubagentAnnounceDeliveryResult> {
if (params.signal?.aborted) {
return {
delivered: false,
path: "none",
};
}
// Non-completion mode mirrors historical behavior: try queued/steered delivery first,
// then (only if not queued) attempt direct delivery.
if (!params.expectsCompletionMessage) {
@@ -545,6 +575,7 @@ async function deliverSubagentAnnouncement(params: {
triggerMessage: params.triggerMessage,
summaryLine: params.summaryLine,
requesterOrigin: params.requesterOrigin,
signal: params.signal,
});
const queued = queueOutcomeToDeliveryResult(queueOutcome);
if (queued.delivered) {
@@ -563,6 +594,7 @@ async function deliverSubagentAnnouncement(params: {
directOrigin: params.directOrigin,
requesterIsSubagent: params.requesterIsSubagent,
expectsCompletionMessage: params.expectsCompletionMessage,
signal: params.signal,
});
if (direct.delivered || !params.expectsCompletionMessage) {
return direct;
@@ -576,6 +608,7 @@ async function deliverSubagentAnnouncement(params: {
triggerMessage: params.triggerMessage,
summaryLine: params.summaryLine,
requesterOrigin: params.requesterOrigin,
signal: params.signal,
});
if (queueOutcome === "steered" || queueOutcome === "queued") {
return queueOutcomeToDeliveryResult(queueOutcome);
@@ -724,6 +757,7 @@ export async function runSubagentAnnounceFlow(params: {
outcome?: SubagentRunOutcome;
announceType?: SubagentAnnounceType;
expectsCompletionMessage?: boolean;
signal?: AbortSignal;
}): Promise<boolean> {
let didAnnounce = false;
const expectsCompletionMessage = params.expectsCompletionMessage === true;
@@ -952,6 +986,7 @@ export async function runSubagentAnnounceFlow(params: {
requesterIsSubagent,
expectsCompletionMessage: expectsCompletionMessage,
directIdempotencyKey,
signal: params.signal,
});
didAnnounce = delivery.delivered;
if (!delivery.delivered && delivery.path === "direct" && delivery.error) {

View File

@@ -133,4 +133,56 @@ describe("runCronIsolatedAgentTurn", () => {
expect(deps.sendMessageTelegram).not.toHaveBeenCalled();
});
});
it("skips structured outbound delivery when timeout abort is already set", async () => {
await withTempCronHome(async (home) => {
const storePath = await writeSessionStore(home, {
lastProvider: "telegram",
lastChannel: "telegram",
lastTo: "123",
});
const deps: CliDeps = {
sendMessageSlack: vi.fn(),
sendMessageWhatsApp: vi.fn(),
sendMessageTelegram: vi.fn().mockResolvedValue({
messageId: "t1",
chatId: "123",
}),
sendMessageDiscord: vi.fn(),
sendMessageSignal: vi.fn(),
sendMessageIMessage: vi.fn(),
};
const controller = new AbortController();
controller.abort("cron: job execution timed out");
vi.mocked(runEmbeddedPiAgent).mockResolvedValue({
payloads: [{ text: "HEARTBEAT_OK", mediaUrl: "https://example.com/img.png" }],
meta: {
durationMs: 5,
agentMeta: { sessionId: "s", provider: "p", model: "m" },
},
});
const res = await runCronIsolatedAgentTurn({
cfg: makeCfg(home, storePath),
deps,
job: {
...makeJob({
kind: "agentTurn",
message: "do it",
}),
delivery: { mode: "announce", channel: "telegram", to: "123" },
},
message: "do it",
sessionKey: "cron:job-1",
signal: controller.signal,
lane: "cron",
});
expect(res.status).toBe("error");
expect(res.error).toContain("timed out");
expect(deps.sendMessageTelegram).not.toHaveBeenCalled();
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
});
});
});

View File

@@ -155,9 +155,17 @@ export async function runCronIsolatedAgentTurn(params: {
job: CronJob;
message: string;
sessionKey: string;
signal?: AbortSignal;
agentId?: string;
lane?: string;
}): Promise<RunCronAgentTurnResult> {
const isAborted = () => params.signal?.aborted === true;
const abortReason = () => {
const reason = params.signal?.reason;
return typeof reason === "string" && reason.trim()
? reason.trim()
: "cron: job execution timed out";
};
const isFastTestEnv = process.env.OPENCLAW_TEST_FAST === "1";
const defaultAgentId = resolveDefaultAgentId(params.cfg);
const requestedAgentId =
@@ -503,6 +511,10 @@ export async function runCronIsolatedAgentTurn(params: {
return withRunSession({ status: "error", error: String(err) });
}
if (isAborted()) {
return withRunSession({ status: "error", error: abortReason() });
}
const payloads = runResult.payloads ?? [];
// Update token+model fields in the session store.
@@ -558,6 +570,10 @@ export async function runCronIsolatedAgentTurn(params: {
}
await persistSessionEntry();
}
if (isAborted()) {
return withRunSession({ status: "error", error: abortReason(), ...telemetry });
}
const firstText = payloads[0]?.text ?? "";
let summary = pickSummaryFromPayloads(payloads) ?? pickSummaryFromOutput(firstText);
let outputText = pickLastNonEmptyTextFromPayloads(payloads);
@@ -635,6 +651,9 @@ export async function runCronIsolatedAgentTurn(params: {
? [{ text: synthesizedText }]
: [];
if (payloadsForDelivery.length > 0) {
if (isAborted()) {
return withRunSession({ status: "error", error: abortReason(), ...telemetry });
}
const deliveryResults = await deliverOutboundPayloads({
cfg: cfgWithAgentDefaults,
channel: resolvedDelivery.channel,
@@ -646,6 +665,7 @@ export async function runCronIsolatedAgentTurn(params: {
identity,
bestEffort: deliveryBestEffort,
deps: createOutboundSendDeps(params.deps),
abortSignal: params.signal,
});
delivered = deliveryResults.length > 0;
}
@@ -728,6 +748,9 @@ export async function runCronIsolatedAgentTurn(params: {
return withRunSession({ status: "ok", summary, outputText, ...telemetry });
}
try {
if (isAborted()) {
return withRunSession({ status: "error", error: abortReason(), ...telemetry });
}
const didAnnounce = await runSubagentAnnounceFlow({
childSessionKey: agentSessionKey,
childRunId: `${params.job.id}:${runSessionId}`,
@@ -748,6 +771,7 @@ export async function runCronIsolatedAgentTurn(params: {
endedAt: runEndedAt,
outcome: { status: "ok" },
announceType: "cron job",
signal: params.signal,
});
if (didAnnounce) {
delivered = true;

View File

@@ -683,6 +683,58 @@ describe("Cron issue regressions", () => {
expect(job?.state.lastStatus).toBe("ok");
});
it("suppresses isolated follow-up side effects after timeout", async () => {
vi.useRealTimers();
const store = await makeStorePath();
const scheduledAt = Date.parse("2026-02-15T13:00:00.000Z");
const enqueueSystemEvent = vi.fn();
const cronJob = createIsolatedRegressionJob({
id: "timeout-side-effects",
name: "timeout side effects",
scheduledAt,
schedule: { kind: "every", everyMs: 60_000, anchorMs: scheduledAt },
payload: { kind: "agentTurn", message: "work", timeoutSeconds: 0.01 },
state: { nextRunAtMs: scheduledAt },
});
await writeCronJobs(store.storePath, [cronJob]);
let now = scheduledAt;
const state = createCronServiceState({
cronEnabled: true,
storePath: store.storePath,
log: noopLogger,
nowMs: () => now,
enqueueSystemEvent,
requestHeartbeatNow: vi.fn(),
runIsolatedAgentJob: vi.fn(async (params: { signal?: AbortSignal }) => {
const signal = params.signal;
await new Promise<void>((resolve, reject) => {
const onAbort = () => {
signal?.removeEventListener("abort", onAbort);
now += 100;
reject(new Error("aborted"));
};
signal?.addEventListener("abort", onAbort, { once: true });
});
return {
status: "ok" as const,
summary: "late-summary",
delivered: false,
error: signal?.aborted && typeof signal.reason === "string" ? signal.reason : undefined,
};
}),
});
const timerPromise = onTimer(state);
await timerPromise;
const jobAfterTimeout = state.store?.jobs.find((j) => j.id === "timeout-side-effects");
expect(jobAfterTimeout?.state.lastStatus).toBe("error");
expect(jobAfterTimeout?.state.lastError).toContain("timed out");
expect(enqueueSystemEvent).not.toHaveBeenCalled();
});
it("retries cron schedule computation from the next second when the first attempt returns undefined (#17821)", () => {
const scheduledAt = Date.parse("2026-02-15T13:00:00.000Z");
const cronJob = createIsolatedRegressionJob({

View File

@@ -61,7 +61,7 @@ export type CronServiceDeps = {
wakeNowHeartbeatBusyMaxWaitMs?: number;
/** WakeMode=now: delay between runHeartbeatOnce retries while busy. */
wakeNowHeartbeatBusyRetryDelayMs?: number;
runIsolatedAgentJob: (params: { job: CronJob; message: string }) => Promise<
runIsolatedAgentJob: (params: { job: CronJob; message: string; signal?: AbortSignal }) => Promise<
{
summary?: string;
/** Last non-empty agent text output (not truncated). */

View File

@@ -45,6 +45,16 @@ function resolveRunConcurrency(state: CronServiceState): number {
}
return Math.max(1, Math.floor(raw));
}
function timeoutErrorMessage(): string {
return "cron: job execution timed out";
}
function isAbortError(err: unknown): boolean {
if (!(err instanceof Error)) {
return false;
}
return err.name === "AbortError" || err.message === timeoutErrorMessage();
}
/**
* Exponential backoff delays (in ms) indexed by consecutive error count.
* After the last entry the delay stays constant.
@@ -267,15 +277,16 @@ export async function onTimer(state: CronServiceState) {
const result =
typeof jobTimeoutMs === "number"
? await (async () => {
const timeoutController = new AbortController();
let timeoutId: NodeJS.Timeout | undefined;
try {
return await Promise.race([
executeJobCore(state, job),
executeJobCore(state, job, timeoutController.signal),
new Promise<never>((_, reject) => {
timeoutId = setTimeout(
() => reject(new Error("cron: job execution timed out")),
jobTimeoutMs,
);
timeoutId = setTimeout(() => {
timeoutController.abort(timeoutErrorMessage());
reject(new Error(timeoutErrorMessage()));
}, jobTimeoutMs);
}),
]);
} finally {
@@ -287,14 +298,15 @@ export async function onTimer(state: CronServiceState) {
: await executeJobCore(state, job);
return { jobId: id, ...result, startedAt, endedAt: state.deps.nowMs() };
} catch (err) {
const errorText = isAbortError(err) ? timeoutErrorMessage() : String(err);
state.deps.log.warn(
{ jobId: id, jobName: job.name, timeoutMs: jobTimeoutMs ?? null },
`cron: job failed: ${String(err)}`,
`cron: job failed: ${errorText}`,
);
return {
jobId: id,
status: "error",
error: String(err),
error: errorText,
startedAt,
endedAt: state.deps.nowMs(),
};
@@ -486,7 +498,11 @@ export async function runDueJobs(state: CronServiceState) {
async function executeJobCore(
state: CronServiceState,
job: CronJob,
signal?: AbortSignal,
): Promise<CronRunOutcome & CronRunTelemetry> {
if (signal?.aborted) {
return { status: "error", error: timeoutErrorMessage() };
}
if (job.sessionTarget === "main") {
const text = resolveJobPayloadTextForMain(job);
if (!text) {
@@ -513,6 +529,9 @@ async function executeJobCore(
let heartbeatResult: HeartbeatRunResult;
for (;;) {
if (signal?.aborted) {
return { status: "error", error: timeoutErrorMessage() };
}
heartbeatResult = await state.deps.runHeartbeatOnce({
reason,
agentId: job.agentId,
@@ -559,8 +578,13 @@ async function executeJobCore(
const res = await state.deps.runIsolatedAgentJob({
job,
message: job.payload.message,
signal,
});
if (signal?.aborted) {
return { status: "error", error: timeoutErrorMessage() };
}
// Post a short summary back to the main session — but only when the
// isolated run did NOT already deliver its output to the target channel.
// When `res.delivered` is true the announce flow (or direct outbound

View File

@@ -185,13 +185,14 @@ export function buildGatewayCronService(params: {
deps: { ...params.deps, runtime: defaultRuntime },
});
},
runIsolatedAgentJob: async ({ job, message }) => {
runIsolatedAgentJob: async ({ job, message, signal }) => {
const { agentId, cfg: runtimeConfig } = resolveCronAgent(job.agentId);
return await runCronIsolatedAgentTurn({
cfg: runtimeConfig,
deps: params.deps,
job,
message,
signal,
agentId,
sessionKey: `cron:${job.id}`,
lane: "cron",