diff --git a/.changeset/bulk-actions-sdk-api.md b/.changeset/bulk-actions-sdk-api.md new file mode 100644 index 00000000000..e62f8dcc4c2 --- /dev/null +++ b/.changeset/bulk-actions-sdk-api.md @@ -0,0 +1,6 @@ +--- +"@trigger.dev/core": patch +"@trigger.dev/sdk": patch +--- + +Add SDK and API client helpers for run bulk actions. diff --git a/.server-changes/bulk-actions-api-sdk.md b/.server-changes/bulk-actions-api-sdk.md new file mode 100644 index 00000000000..01da3859cbd --- /dev/null +++ b/.server-changes/bulk-actions-api-sdk.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: feature +--- + +Add API and SDK support for creating, listing, retrieving, polling, and aborting run bulk actions. diff --git a/apps/webapp/app/presenters/v3/ApiBulkActionPresenter.server.ts b/apps/webapp/app/presenters/v3/ApiBulkActionPresenter.server.ts new file mode 100644 index 00000000000..5e329bee404 --- /dev/null +++ b/apps/webapp/app/presenters/v3/ApiBulkActionPresenter.server.ts @@ -0,0 +1,155 @@ +import { + type BulkActionGroup, + type BulkActionStatus, + type BulkActionType, +} from "@trigger.dev/database"; +import { z } from "zod"; +import { BasePresenter } from "./basePresenter.server"; + +const DEFAULT_PAGE_SIZE = 25; +const MAX_PAGE_SIZE = 100; + +export const ApiBulkActionListSearchParams = z.object({ + "page[size]": z.coerce.number().int().positive().min(1).max(MAX_PAGE_SIZE).optional(), + "page[after]": z.string().optional(), + "page[before]": z.string().optional(), +}); + +export type ApiBulkActionListSearchParams = z.infer; + +type BulkActionListCursor = { + createdAt: Date; + id: string; +}; + +type BulkActionRow = Pick< + BulkActionGroup, + | "id" + | "friendlyId" + | "name" + | "status" + | "type" + | "createdAt" + | "completedAt" + | "totalCount" + | "successCount" + | "failureCount" +>; + +export class ApiBulkActionPresenter extends BasePresenter { + public async list(environmentId: string, searchParams: ApiBulkActionListSearchParams) { + const pageSize = searchParams["page[size]"] ?? DEFAULT_PAGE_SIZE; + const after = searchParams["page[after]"]; + const before = searchParams["page[before]"]; + + if (after && before) { + throw new Error("Only one of page[after] or page[before] can be provided"); + } + + const cursor = decodeCursor(after ?? before); + const direction = before ? "backward" : "forward"; + + const where = { + environmentId, + ...(cursor + ? direction === "forward" + ? { + OR: [ + { createdAt: { lt: cursor.createdAt } }, + { createdAt: cursor.createdAt, id: { lt: cursor.id } }, + ], + } + : { + OR: [ + { createdAt: { gt: cursor.createdAt } }, + { createdAt: cursor.createdAt, id: { gt: cursor.id } }, + ], + } + : {}), + }; + + const rows = await this._replica.bulkActionGroup.findMany({ + select: bulkActionSelect, + where, + orderBy: + direction === "forward" + ? [{ createdAt: "desc" }, { id: "desc" }] + : [{ createdAt: "asc" }, { id: "asc" }], + take: pageSize + 1, + }); + + const hasMore = rows.length > pageSize; + const pageRows = rows.slice(0, pageSize); + const dataRows = direction === "forward" ? pageRows : [...pageRows].reverse(); + + const first = dataRows.at(0); + const last = dataRows.at(-1); + + return { + data: dataRows.map(apiBulkActionObject), + pagination: { + next: last && (hasMore || direction === "backward") ? encodeCursor(last) : undefined, + previous: + first && + ((direction === "forward" && Boolean(after)) || (direction === "backward" && hasMore)) + ? encodeCursor(first) + : undefined, + }, + }; + } +} + +export const bulkActionSelect = { + id: true, + friendlyId: true, + name: true, + status: true, + type: true, + createdAt: true, + completedAt: true, + totalCount: true, + successCount: true, + failureCount: true, +} as const; + +export function apiBulkActionObject(row: BulkActionRow) { + return { + id: row.friendlyId, + name: row.name ?? undefined, + type: row.type as BulkActionType, + status: row.status as BulkActionStatus, + counts: { + total: row.totalCount, + success: row.successCount, + failure: row.failureCount, + }, + createdAt: row.createdAt, + completedAt: row.completedAt ?? undefined, + }; +} + +function encodeCursor(row: Pick) { + return Buffer.from(JSON.stringify({ createdAt: row.createdAt.getTime(), id: row.id })).toString( + "base64url" + ); +} + +function decodeCursor(cursor: string | undefined): BulkActionListCursor | undefined { + if (!cursor) { + return undefined; + } + + try { + const parsed = JSON.parse(Buffer.from(cursor, "base64url").toString("utf8")) as { + createdAt?: unknown; + id?: unknown; + }; + if (typeof parsed.createdAt !== "number" || typeof parsed.id !== "string") { + throw new Error("Invalid cursor"); + } + + return { createdAt: new Date(parsed.createdAt), id: parsed.id }; + } catch { + throw new Error("Invalid cursor"); + } +} diff --git a/apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.abort.ts b/apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.abort.ts new file mode 100644 index 00000000000..a672faf9e41 --- /dev/null +++ b/apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.abort.ts @@ -0,0 +1,50 @@ +import { json } from "@remix-run/server-runtime"; +import { z } from "zod"; +import { prisma } from "~/db.server"; +import { logger } from "~/services/logger.server"; +import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; +import { BulkActionService } from "~/v3/services/bulk/BulkActionV2.server"; +import { ServiceValidationError } from "~/v3/services/common.server"; + +const ParamsSchema = z.object({ + bulkActionId: z.string(), +}); + +const { action } = createActionApiRoute( + { + params: ParamsSchema, + corsStrategy: "none", + authorization: { + action: "write", + resource: () => ({ type: "runs" }), + }, + // Existence/auth gate. Reads from primary so create -> abort doesn't 404 on + // replica lag; the abort write path re-reads and mutates on primary. + findResource: async (params, auth) => { + return prisma.bulkActionGroup.findFirst({ + select: { id: true }, + where: { + friendlyId: params.bulkActionId, + environmentId: auth.environment.id, + }, + }); + }, + }, + async ({ params, authentication }) => { + const service = new BulkActionService(); + + try { + const result = await service.abort(params.bulkActionId, authentication.environment.id); + return json({ id: result.bulkActionId }); + } catch (error) { + if (error instanceof ServiceValidationError) { + return json({ error: error.message }, { status: error.status ?? 400 }); + } + + logger.error("Failed to abort API bulk action", { error }); + return json({ error: "Failed to abort bulk action" }, { status: 500 }); + } + } +); + +export { action }; diff --git a/apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.ts b/apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.ts new file mode 100644 index 00000000000..cfbf50e052c --- /dev/null +++ b/apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.ts @@ -0,0 +1,36 @@ +import { json } from "@remix-run/server-runtime"; +import { z } from "zod"; +import { prisma } from "~/db.server"; +import { + apiBulkActionObject, + bulkActionSelect, +} from "~/presenters/v3/ApiBulkActionPresenter.server"; +import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server"; + +const ParamsSchema = z.object({ + bulkActionId: z.string(), +}); + +export const loader = createLoaderApiRoute( + { + params: ParamsSchema, + corsStrategy: "none", + authorization: { + action: "read", + resource: () => ({ type: "runs" }), + }, + // Read from primary so create -> retrieve/poll doesn't 404 on replica lag. + findResource: async (params, auth) => { + return prisma.bulkActionGroup.findFirst({ + select: bulkActionSelect, + where: { + friendlyId: params.bulkActionId, + environmentId: auth.environment.id, + }, + }); + }, + }, + async ({ resource }) => { + return json(apiBulkActionObject(resource)); + } +); diff --git a/apps/webapp/app/routes/api.v1.bulk-actions.ts b/apps/webapp/app/routes/api.v1.bulk-actions.ts new file mode 100644 index 00000000000..16b8b46acba --- /dev/null +++ b/apps/webapp/app/routes/api.v1.bulk-actions.ts @@ -0,0 +1,129 @@ +import { json } from "@remix-run/server-runtime"; +import { CreateBulkActionRequestBody, type QueueTypeName } from "@trigger.dev/core/v3"; +import type { z } from "zod"; +import { + ApiBulkActionListSearchParams, + ApiBulkActionPresenter, +} from "~/presenters/v3/ApiBulkActionPresenter.server"; +import { ApiRunListPresenter } from "~/presenters/v3/ApiRunListPresenter.server"; +import { logger } from "~/services/logger.server"; +import type { RunListInputFilters } from "~/services/runsRepository/runsRepository.server"; +import { + createActionApiRoute, + createLoaderApiRoute, +} from "~/services/routeBuilders/apiBuilder.server"; +import { BulkActionService } from "~/v3/services/bulk/BulkActionV2.server"; +import { ServiceValidationError } from "~/v3/services/common.server"; + +const MAX_CREATE_BODY_SIZE = 1024 * 1024; + +const { action } = createActionApiRoute( + { + body: CreateBulkActionRequestBody, + maxContentLength: MAX_CREATE_BODY_SIZE, + corsStrategy: "none", + authorization: { + action: "write", + resource: () => ({ type: "runs" }), + }, + }, + async ({ body, authentication }) => { + if (!body) { + return json({ error: "Invalid request body" }, { status: 400 }); + } + + const service = new BulkActionService(); + + try { + const result = await service.create({ + organizationId: authentication.environment.organizationId, + projectId: authentication.environment.projectId, + environmentId: authentication.environment.id, + userId: authentication.actor?.sub ?? null, + action: body.action, + title: body.name, + region: body.targetRegion, + filters: body.runIds + ? { runId: body.runIds } + : bulkActionFilterToRunListFilters(body.filter), + triggerSource: "api", + }); + + return json({ id: result.bulkActionId }, { status: 202 }); + } catch (error) { + if (error instanceof ServiceValidationError) { + return json({ error: error.message }, { status: error.status ?? 400 }); + } + + logger.error("Failed to create API bulk action", { error }); + return json({ error: "Failed to create bulk action" }, { status: 500 }); + } + } +); + +const loader = createLoaderApiRoute( + { + searchParams: ApiBulkActionListSearchParams, + corsStrategy: "none", + authorization: { + action: "read", + resource: () => ({ type: "runs" }), + }, + findResource: async () => 1, + }, + async ({ searchParams, authentication }) => { + const presenter = new ApiBulkActionPresenter(); + const result = await presenter.list(authentication.environment.id, searchParams); + return json(result); + } +); + +export { action, loader }; + +function bulkActionFilterToRunListFilters( + filter: z.infer["filter"] +): RunListInputFilters { + if (!filter) { + return {}; + } + + const filters: RunListInputFilters = {}; + + if (filter.status) { + filters.statuses = asArray(filter.status).flatMap((status) => + ApiRunListPresenter.apiStatusToRunStatuses(status) + ); + } + + if (filter.taskIdentifier) filters.tasks = asArray(filter.taskIdentifier); + if (filter.version) filters.versions = asArray(filter.version); + if (filter.tag) filters.tags = asArray(filter.tag); + if (filter.bulkAction) filters.bulkId = filter.bulkAction; + if (filter.schedule) filters.scheduleId = filter.schedule; + if (filter.isTest !== undefined) filters.isTest = filter.isTest; + if (filter.from !== undefined) filters.from = dateOrNumberToMs(filter.from); + if (filter.to !== undefined) filters.to = dateOrNumberToMs(filter.to); + if (filter.period) filters.period = filter.period; + if (filter.batch) filters.batchId = filter.batch; + if (filter.queue) filters.queues = asArray(filter.queue).map(queueNameFromQueueTypeName); + if (filter.machine) filters.machines = asArray(filter.machine); + if (filter.region) filters.regions = asArray(filter.region); + + return filters; +} + +function asArray(value: T | T[]): T[] { + return Array.isArray(value) ? value : [value]; +} + +function dateOrNumberToMs(value: Date | number): number { + return value instanceof Date ? value.getTime() : value; +} + +function queueNameFromQueueTypeName(queue: QueueTypeName): string { + if (queue.type === "task") { + return `task/${queue.name}`; + } + + return queue.name; +} diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.bulkaction.tsx b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.bulkaction.tsx index 59c6da64126..5964947d816 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.bulkaction.tsx +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.bulkaction.tsx @@ -51,6 +51,7 @@ import { redirectWithErrorMessage, redirectWithSuccessMessage } from "~/models/m import { resolveOrgIdFromSlug } from "~/models/organization.server"; import { findProjectBySlug } from "~/models/project.server"; import { findEnvironmentBySlug } from "~/models/runtimeEnvironment.server"; +import { getRunFiltersFromRequest } from "~/presenters/RunFilters.server"; import { CreateBulkActionPresenter } from "~/presenters/v3/CreateBulkActionPresenter.server"; import { RegionsPresenter } from "~/presenters/v3/RegionsPresenter.server"; import { RUNS_BULK_INSPECTOR_UI_SEARCH_PARAMS } from "~/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs._index/shouldRevalidateRunsList"; @@ -188,14 +189,25 @@ export const action = dashboardAction( const service = new BulkActionService(); const [error, result] = await tryCatch( - service.create( - project.organizationId, - project.id, - environment.id, - user.id, - submission.value, - request - ) + (async () => { + const filters = + submission.value.mode === "selected" + ? { runId: submission.value.selectedRunIds } + : await getRunFiltersFromRequest(request); + + return service.create({ + organizationId: project.organizationId, + projectId: project.id, + environmentId: environment.id, + userId: user.id, + action: submission.value.action, + title: submission.value.title, + region: submission.value.region, + emailNotification: submission.value.emailNotification, + filters, + triggerSource: "dashboard", + }); + })() ); if (error) { diff --git a/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts b/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts index d03ab71796f..51859eeab4b 100644 --- a/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts +++ b/apps/webapp/app/v3/services/bulk/BulkActionV2.server.ts @@ -5,8 +5,6 @@ import { BulkActionType, type PrismaClient, } from "@trigger.dev/database"; -import { getRunFiltersFromRequest } from "~/presenters/RunFilters.server"; -import { type CreateBulkActionPayload } from "~/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.bulkaction"; import { clickhouseFactory } from "~/services/clickhouse/clickhouseFactoryInstance.server"; import { parseRunListInputOptions, @@ -14,18 +12,33 @@ import { RunsRepository, } from "~/services/runsRepository/runsRepository.server"; import { BaseService } from "../baseService.server"; +import { ServiceValidationError } from "../common.server"; import { commonWorker } from "~/v3/commonWorker.server"; import { env } from "~/env.server"; import { logger } from "@trigger.dev/sdk"; import { CancelTaskRunService } from "../cancelTaskRun.server"; import { tryCatch } from "@trigger.dev/core"; import { ReplayTaskRunService } from "../replayTaskRun.server"; +import { WorkerGroupService } from "../worker/workerGroupService.server"; import { timeFilters } from "~/components/runs/v3/SharedFilters"; import parseDuration from "parse-duration"; import { v3BulkActionPath } from "~/utils/pathBuilder"; import { formatDateTime } from "~/components/primitives/DateTime"; import pMap from "p-map"; +export type CreateBulkActionInput = { + organizationId: string; + projectId: string; + environmentId: string; + userId?: string | null; + action: "cancel" | "replay"; + filters: RunListInputFilters; + title?: string; + region?: string; + emailNotification?: boolean; + triggerSource?: string; +}; + export type ProcessToCompletionOptions = { /** Absolute timestamp (ms) after which processing stops and returns incomplete. */ deadline?: number; @@ -36,21 +49,33 @@ export type ProcessToCompletionResult = { }; export class BulkActionService extends BaseService { - public async create( - organizationId: string, - projectId: string, - environmentId: string, - userId: string, - payload: CreateBulkActionPayload, - request: Request - ) { - const filters = await getFilters(payload, request); + public async create(input: CreateBulkActionInput) { + const { organizationId, projectId, environmentId, userId } = input; + const filters = freezeRunListFilters(input.filters); // Region is a replay-only override that re-routes the replayed runs. It's // stored alongside the run-list filters under a dedicated key so it isn't // mistaken for a `regions` selection filter when the params are parsed. - const replayRegion = payload.action === "replay" ? payload.region : undefined; - const params = replayRegion ? { ...filters, replayRegion } : filters; + const replayRegion = input.action === "replay" ? input.region : undefined; + if (replayRegion) { + // Validating the region override up-front so an invalid/unauthorized + // region surfaces as a user-input (400) error rather than a 500. + const [regionError] = await tryCatch( + new WorkerGroupService({ prisma: this._prisma }).getDefaultWorkerGroupForProject({ + projectId, + regionOverride: replayRegion, + }) + ); + if (regionError) { + throw new ServiceValidationError(regionError.message, 400); + } + } + + const params = { + ...filters, + ...(replayRegion ? { replayRegion } : {}), + ...(input.triggerSource ? { triggerSource: input.triggerSource } : {}), + }; // Count the runs that will be affected by the bulk action const clickhouse = await clickhouseFactory.getClickhouseForOrganization( @@ -77,13 +102,13 @@ export class BulkActionService extends BaseService { projectId, environmentId, userId, - name: payload.title, - type: payload.action === "cancel" ? BulkActionType.CANCEL : BulkActionType.REPLAY, + name: input.title, + type: input.action === "cancel" ? BulkActionType.CANCEL : BulkActionType.REPLAY, params, queryName: "bulk_action_v1", totalCount: count, completionNotification: - payload.emailNotification === true + input.emailNotification === true ? BulkActionNotificationType.EMAIL : BulkActionNotificationType.NONE, }, @@ -202,6 +227,10 @@ export class BulkActionService extends BaseService { "replayRegion" in rawParams && typeof (rawParams as any).replayRegion === "string" ? (rawParams as any).replayRegion : undefined; + const triggerSource = + "triggerSource" in rawParams && typeof (rawParams as any).triggerSource === "string" + ? (rawParams as any).triggerSource + : "dashboard"; const filters = parseRunListInputOptions({ organizationId: group.project.organizationId, projectId: group.projectId, @@ -317,7 +346,7 @@ export class BulkActionService extends BaseService { const [error, result] = await tryCatch( replayService.call(run, { bulkActionId: bulkActionId, - triggerSource: "dashboard", + triggerSource, region: replayRegion, }) ); @@ -454,15 +483,15 @@ export class BulkActionService extends BaseService { }); if (!group) { - throw new Error(`Bulk action not found: ${friendlyId}`); + throw new ServiceValidationError(`Bulk action not found: ${friendlyId}`, 404); } if (group.status === BulkActionStatus.COMPLETED) { - throw new Error(`Bulk action group already completed: ${friendlyId}`); + throw new ServiceValidationError(`Bulk action group already completed: ${friendlyId}`, 409); } if (group.status === BulkActionStatus.ABORTED) { - throw new Error(`Bulk action group already aborted: ${friendlyId}`); + throw new ServiceValidationError(`Bulk action group already aborted: ${friendlyId}`, 409); } //ack the job (this doesn't guarantee it won't run again) @@ -479,28 +508,26 @@ export class BulkActionService extends BaseService { } } -async function getFilters( - payload: CreateBulkActionPayload, - request: Request -): Promise { - if (payload.mode === "selected") { - return { - runId: payload.selectedRunIds, - }; +export function freezeRunListFilters(filters: RunListInputFilters): RunListInputFilters { + const { + cursor: _cursor, + direction: _direction, + ...frozenFilters + } = filters as RunListInputFilters & { + cursor?: string; + direction?: "forward" | "backward"; + }; + + // Explicit run-id selections target specific, already-existing runs, so we + // don't apply a time bound (which could otherwise exclude a selected run). + if (frozenFilters.runId?.length) { + return frozenFilters; } - const filters = await getRunFiltersFromRequest(request); - filters.cursor = undefined; - filters.direction = undefined; - - const { - period, - from: _from, - to: _to, - } = timeFilters({ - period: filters.period, - from: filters.from, - to: filters.to, + const { period } = timeFilters({ + period: frozenFilters.period, + from: frozenFilters.from, + to: frozenFilters.to, }); // We fix the time period to a from/to date @@ -512,18 +539,18 @@ async function getFilters( const to = new Date(); const from = new Date(to.getTime() - periodMs); - filters.from = from.getTime(); - filters.to = to.getTime(); - filters.period = undefined; - return filters; + frozenFilters.from = from.getTime(); + frozenFilters.to = to.getTime(); + frozenFilters.period = undefined; + return frozenFilters; } // If no to date is set, we lock it to now - if (!filters.to) { - filters.to = Date.now(); + if (!frozenFilters.to) { + frozenFilters.to = Date.now(); } - filters.period = undefined; + frozenFilters.period = undefined; - return filters; + return frozenFilters; } diff --git a/apps/webapp/test/bulk-actions-api.e2e.full.test.ts b/apps/webapp/test/bulk-actions-api.e2e.full.test.ts new file mode 100644 index 00000000000..43be105962a --- /dev/null +++ b/apps/webapp/test/bulk-actions-api.e2e.full.test.ts @@ -0,0 +1,249 @@ +import { randomBytes } from "node:crypto"; +import { + BulkActionStatus, + BulkActionType, + type PrismaClient, + type Project, + type RuntimeEnvironment, +} from "@trigger.dev/database"; +import { describe, expect, it } from "vitest"; +import { getTestServer } from "./helpers/sharedTestServer"; +import { seedTestEnvironment } from "./helpers/seedTestEnvironment"; + +describe("Bulk actions API", () => { + it("lists bulk actions with cursor pagination", async () => { + const server = getTestServer(); + const { apiKey, project, environment } = await seedTestEnvironment(server.prisma); + + const oldest = await seedBulkAction(server.prisma, project, environment, { + name: "Oldest", + createdAt: new Date("2026-07-01T10:00:00.000Z"), + }); + const middle = await seedBulkAction(server.prisma, project, environment, { + name: "Middle", + createdAt: new Date("2026-07-01T10:01:00.000Z"), + }); + const latest = await seedBulkAction(server.prisma, project, environment, { + name: "Latest", + createdAt: new Date("2026-07-01T10:02:00.000Z"), + }); + + const firstResponse = await server.webapp.fetch("/api/v1/bulk-actions?page[size]=2", { + headers: authHeaders(apiKey), + }); + expect(firstResponse.status).toBe(200); + const firstPage = await firstResponse.json(); + expect(firstPage.data.map((item: { id: string }) => item.id)).toEqual([ + latest.friendlyId, + middle.friendlyId, + ]); + expect(firstPage.pagination.next).toEqual(expect.any(String)); + expect(firstPage.pagination.previous).toBeUndefined(); + + const secondResponse = await server.webapp.fetch( + `/api/v1/bulk-actions?page[size]=2&page[after]=${encodeURIComponent( + firstPage.pagination.next + )}`, + { headers: authHeaders(apiKey) } + ); + expect(secondResponse.status).toBe(200); + const secondPage = await secondResponse.json(); + expect(secondPage.data.map((item: { id: string }) => item.id)).toEqual([oldest.friendlyId]); + expect(secondPage.pagination.next).toBeUndefined(); + expect(secondPage.pagination.previous).toEqual(expect.any(String)); + + const previousResponse = await server.webapp.fetch( + `/api/v1/bulk-actions?page[size]=2&page[before]=${encodeURIComponent( + secondPage.pagination.previous + )}`, + { headers: authHeaders(apiKey) } + ); + expect(previousResponse.status).toBe(200); + const previousPage = await previousResponse.json(); + expect(previousPage.data.map((item: { id: string }) => item.id)).toEqual([ + latest.friendlyId, + middle.friendlyId, + ]); + }); + + it("retrieves a bulk action in the authenticated environment", async () => { + const server = getTestServer(); + const { apiKey, project, environment } = await seedTestEnvironment(server.prisma); + const bulkAction = await seedBulkAction(server.prisma, project, environment, { + name: "Retrieve me", + type: BulkActionType.REPLAY, + status: BulkActionStatus.COMPLETED, + totalCount: 4, + successCount: 3, + failureCount: 1, + completedAt: new Date("2026-07-01T10:05:00.000Z"), + }); + + const response = await server.webapp.fetch(`/api/v1/bulk-actions/${bulkAction.friendlyId}`, { + headers: authHeaders(apiKey), + }); + + expect(response.status).toBe(200); + const body = await response.json(); + expect(body).toMatchObject({ + id: bulkAction.friendlyId, + name: "Retrieve me", + type: "REPLAY", + status: "COMPLETED", + counts: { total: 4, success: 3, failure: 1 }, + }); + expect(body.createdAt).toEqual(expect.any(String)); + expect(body.completedAt).toEqual("2026-07-01T10:05:00.000Z"); + }); + + it("does not retrieve bulk actions from another environment", async () => { + const server = getTestServer(); + const a = await seedTestEnvironment(server.prisma); + const b = await seedTestEnvironment(server.prisma); + const bulkAction = await seedBulkAction(server.prisma, a.project, a.environment, { + name: "Other environment", + }); + + const response = await server.webapp.fetch(`/api/v1/bulk-actions/${bulkAction.friendlyId}`, { + headers: authHeaders(b.apiKey), + }); + + expect(response.status).toBe(404); + }); + + it("aborts a pending bulk action", async () => { + const server = getTestServer(); + const { apiKey, project, environment } = await seedTestEnvironment(server.prisma); + const bulkAction = await seedBulkAction(server.prisma, project, environment, { + status: BulkActionStatus.PENDING, + }); + + const response = await server.webapp.fetch( + `/api/v1/bulk-actions/${bulkAction.friendlyId}/abort`, + { method: "POST", headers: authHeaders(apiKey) } + ); + + expect(response.status).toBe(200); + await expect(response.json()).resolves.toEqual({ id: bulkAction.friendlyId }); + + const updated = await server.prisma.bulkActionGroup.findUniqueOrThrow({ + where: { id: bulkAction.id }, + select: { status: true }, + }); + expect(updated.status).toBe(BulkActionStatus.ABORTED); + }); + + it("returns a safe validation error when aborting a completed bulk action", async () => { + const server = getTestServer(); + const { apiKey, project, environment } = await seedTestEnvironment(server.prisma); + const bulkAction = await seedBulkAction(server.prisma, project, environment, { + status: BulkActionStatus.COMPLETED, + completedAt: new Date("2026-07-01T10:05:00.000Z"), + }); + + const response = await server.webapp.fetch( + `/api/v1/bulk-actions/${bulkAction.friendlyId}/abort`, + { method: "POST", headers: authHeaders(apiKey) } + ); + + expect(response.status).toBe(409); + const body = await response.json(); + expect(body.error).toEqual(expect.any(String)); + expect(body.error).toContain(bulkAction.friendlyId); + }); + + it("rejects create requests with both filter and runIds", async () => { + const server = getTestServer(); + const { apiKey } = await seedTestEnvironment(server.prisma); + + const response = await server.webapp.fetch("/api/v1/bulk-actions", { + method: "POST", + headers: authHeaders(apiKey), + body: JSON.stringify({ action: "cancel", filter: { status: "FAILED" }, runIds: ["run_123"] }), + }); + + expect(response.status).toBe(400); + const body = await response.json(); + expect(body.error).toContain("Exactly one of filter or runIds must be provided"); + }); + + it("rejects create requests with an empty filter", async () => { + const server = getTestServer(); + const { apiKey } = await seedTestEnvironment(server.prisma); + + const response = await server.webapp.fetch("/api/v1/bulk-actions", { + method: "POST", + headers: authHeaders(apiKey), + body: JSON.stringify({ action: "cancel", filter: {} }), + }); + + expect(response.status).toBe(400); + const body = await response.json(); + expect(body.error).toContain("At least one filter must be provided"); + }); + + it("returns a generic error for unexpected create failures", async () => { + const server = getTestServer(); + const { apiKey } = await seedTestEnvironment(server.prisma); + + const response = await server.webapp.fetch("/api/v1/bulk-actions", { + method: "POST", + headers: authHeaders(apiKey), + body: JSON.stringify({ + action: "cancel", + filter: { status: "FAILED" }, + name: "No ClickHouse in this suite", + }), + }); + + expect(response.status).toBe(500); + await expect(response.json()).resolves.toEqual({ error: "Failed to create bulk action" }); + }); +}); + +function authHeaders(apiKey: string) { + return { + Authorization: `Bearer ${apiKey}`, + "Content-Type": "application/json", + }; +} + +async function seedBulkAction( + prisma: PrismaClient, + project: Pick, + environment: Pick, + overrides: { + name?: string; + type?: BulkActionType; + status?: BulkActionStatus; + createdAt?: Date; + completedAt?: Date; + totalCount?: number; + successCount?: number; + failureCount?: number; + } = {} +) { + return prisma.bulkActionGroup.create({ + data: { + friendlyId: `bulk_${randomHex(16)}`, + projectId: project.id, + environmentId: environment.id, + name: overrides.name ?? "Test bulk action", + type: overrides.type ?? BulkActionType.CANCEL, + status: overrides.status ?? BulkActionStatus.PENDING, + queryName: "bulk_action_v1", + params: {}, + totalCount: overrides.totalCount ?? 1, + successCount: overrides.successCount ?? 0, + failureCount: overrides.failureCount ?? 0, + createdAt: overrides.createdAt, + completedAt: overrides.completedAt, + }, + }); +} + +function randomHex(length: number) { + return randomBytes(Math.ceil(length / 2)) + .toString("hex") + .slice(0, length); +} diff --git a/docs/docs.json b/docs/docs.json index f373503049c..0cb85c81380 100644 --- a/docs/docs.json +++ b/docs/docs.json @@ -333,6 +333,7 @@ "management/runs/retrieve", "management/runs/replay", "management/runs/cancel", + "management/runs/bulk-actions", "management/runs/reschedule", "management/runs/update-metadata", "management/runs/add-tags", diff --git a/docs/management/runs/bulk-actions.mdx b/docs/management/runs/bulk-actions.mdx new file mode 100644 index 00000000000..af9118ce8b6 --- /dev/null +++ b/docs/management/runs/bulk-actions.mdx @@ -0,0 +1,165 @@ +--- +title: "Bulk actions" +description: "Cancel or replay many runs from the SDK using run IDs or the same filters as runs.list()." +--- + +**Bulk actions let you cancel or replay many runs asynchronously from the SDK by selecting runs with run IDs or `runs.list()` filters.** + +A bulk action returns a handle immediately. Use the handle to retrieve progress, poll until completion, list previous actions, or abort pending work. + +## Create a bulk replay + +Use `runs.bulk.replay()` to replay every run that matches a filter. + +```ts Your backend code +import { runs } from "@trigger.dev/sdk"; + +const action = await runs.bulk.replay({ + filter: { + status: "FAILED", + taskIdentifier: "sync-customer", + period: "24h", + }, + name: "Replay failed customer syncs", + targetRegion: "eu-central-1", +}); + +const completed = await runs.bulk.poll(action.id); +console.log(completed.status, completed.counts); +``` + +`filter` accepts the same filters as [`runs.list()`](/management/runs/list), excluding pagination fields. Provide at least one filter field; use `runIds` when you want to target specific runs. Relative time filters such as `period` are resolved when the bulk action is created, so later batches process the same fixed time range. + + + Selects runs using the same filter shape as `runs.list()`, excluding `limit`, `after`, and `before`. + + + + Selects specific run IDs. Provide either `filter` or `runIds`, not both. + + + + A name for the bulk action. + + + + Replays matching runs in a specific region. When omitted, each replay keeps the original run's region. This option is only available for `runs.bulk.replay()`. + + +## Create a bulk cancel + +Use `runs.bulk.cancel()` to cancel every run that matches a filter, or specific run IDs. + +```ts Your backend code +import { runs } from "@trigger.dev/sdk"; + +const action = await runs.bulk.cancel({ + runIds: ["run_1234", "run_5678"], + name: "Cancel selected runs", +}); + +console.log(action.id); +``` + +Only runs that are still cancelable when the action reaches them are canceled. Runs that have already reached a final state count as failures in the bulk action summary. + +## Retrieve progress + +Use `runs.bulk.retrieve()` to read the current status and aggregate counts. + +```ts Your backend code +import { runs } from "@trigger.dev/sdk"; + +const action = await runs.bulk.retrieve("bulk_1234"); + +console.log(action.status); +console.log(action.counts.total, action.counts.success, action.counts.failure); +``` + +The returned bulk action object has these fields: + + + The bulk action ID, starting with `bulk_`. + + + + The action being performed. + + + + The current bulk action status. + + + + Aggregate processing counts. + + + + The number of runs selected when the bulk action was created. + + + The number of runs processed successfully. + + + The number of runs that could not be processed. + + + + + + The date and time the bulk action was created. + + + + The date and time the bulk action completed. + + +## Poll for completion + +Use `runs.bulk.poll()` to wait until the bulk action leaves the `PENDING` state. + +```ts Your backend code +import { runs } from "@trigger.dev/sdk"; + +const completed = await runs.bulk.poll("bulk_1234", { + pollIntervalMs: 2_000, +}); + +console.log(completed.status); +``` + +## Abort a bulk action + +Use `runs.bulk.abort()` to stop future batches from being processed. + +```ts Your backend code +import { runs } from "@trigger.dev/sdk"; + +await runs.bulk.abort("bulk_1234"); +``` + +Abort is best effort. Runs already being processed in the current batch may still finish. + +## List bulk actions + +Use `runs.bulk.list()` to page through previous bulk actions in the current environment. + +```ts Your backend code +import { runs } from "@trigger.dev/sdk"; + +const page = await runs.bulk.list({ limit: 25 }); + +for (const action of page.data) { + console.log(action.id, action.status); +} +``` + +List results support the same auto-pagination helpers as other management API list methods: + +```ts Your backend code +import { runs } from "@trigger.dev/sdk"; + +for await (const action of runs.bulk.list({ limit: 25 })) { + console.log(action.id, action.status); +} +``` diff --git a/packages/core/src/v3/apiClient/bulkActions.test.ts b/packages/core/src/v3/apiClient/bulkActions.test.ts new file mode 100644 index 00000000000..dc2207aca10 --- /dev/null +++ b/packages/core/src/v3/apiClient/bulkActions.test.ts @@ -0,0 +1,182 @@ +import { afterEach, beforeEach, describe, expect, it } from "vitest"; +import { createServer, type IncomingMessage, type Server, type ServerResponse } from "node:http"; +import type { AddressInfo } from "node:net"; +import { ApiClient } from "./index.js"; + +type ReceivedRequest = { + method: string; + url: string; + headers: IncomingMessage["headers"]; + body: string; +}; + +type RequestHandler = (request: ReceivedRequest, response: ServerResponse) => void | Promise; + +describe("ApiClient bulk actions", () => { + let server: Server; + let baseUrl: string; + let receivedRequests: ReceivedRequest[] = []; + let requestHandler: RequestHandler | undefined; + + beforeEach(async () => { + receivedRequests = []; + requestHandler = undefined; + + server = createServer((req, res) => { + const chunks: Buffer[] = []; + req.on("data", (chunk) => chunks.push(chunk)); + req.on("end", async () => { + const received = { + method: req.method ?? "", + url: req.url ?? "", + headers: req.headers, + body: Buffer.concat(chunks).toString(), + } satisfies ReceivedRequest; + receivedRequests.push(received); + + try { + if (requestHandler) { + await requestHandler(received, res); + } else { + json(res, { error: "No handler" }, 500); + } + } catch (error) { + json(res, { error: error instanceof Error ? error.message : String(error) }, 500); + } + }); + }); + + await new Promise((resolve) => { + server.listen(0, "127.0.0.1", () => { + const address = server.address() as AddressInfo; + baseUrl = `http://127.0.0.1:${address.port}`; + resolve(); + }); + }); + }); + + afterEach(async () => { + await new Promise((resolve) => server.close(() => resolve())); + }); + + it("posts the exact create bulk action request body", async () => { + requestHandler = (_request, response) => json(response, { id: "bulk_created" }); + + const client = new ApiClient(baseUrl, "tr_test_key"); + const result = await client.createBulkAction({ + action: "replay", + filter: { status: ["FAILED"], taskIdentifier: "my-task" }, + name: "Replay failures", + targetRegion: "eu_1", + }); + + expect(result).toEqual({ id: "bulk_created" }); + expect(receivedRequests).toHaveLength(1); + expect(receivedRequests[0]?.method).toBe("POST"); + expect(receivedRequests[0]?.url).toBe("/api/v1/bulk-actions"); + expect(receivedRequests[0]?.headers.authorization).toBe("Bearer tr_test_key"); + expect(JSON.parse(receivedRequests[0]?.body ?? "{}")).toEqual({ + action: "replay", + filter: { status: ["FAILED"], taskIdentifier: "my-task" }, + name: "Replay failures", + targetRegion: "eu_1", + }); + }); + + it("lists bulk actions with cursor pagination params and parses dates", async () => { + const createdAt = "2026-07-01T10:00:00.000Z"; + const completedAt = "2026-07-01T10:05:00.000Z"; + requestHandler = (_request, response) => + json(response, { + data: [ + { + id: "bulk_listed", + name: "Cancel queued runs", + type: "CANCEL", + status: "COMPLETED", + counts: { total: 3, success: 2, failure: 1 }, + createdAt, + completedAt, + }, + ], + pagination: { next: "cursor_next", previous: "cursor_previous" }, + }); + + const client = new ApiClient(baseUrl, "tr_test_key"); + const page = await client.listBulkActions({ limit: 2, after: "cursor_after" }); + + expect(receivedRequests[0]?.method).toBe("GET"); + const url = new URL(receivedRequests[0]?.url ?? "", baseUrl); + expect(url.pathname).toBe("/api/v1/bulk-actions"); + expect(url.searchParams.get("page[size]")).toBe("2"); + expect(url.searchParams.get("page[after]")).toBe("cursor_after"); + expect(page.pagination).toEqual({ next: "cursor_next", previous: "cursor_previous" }); + expect(page.data[0]?.createdAt).toEqual(new Date(createdAt)); + expect(page.data[0]?.completedAt).toEqual(new Date(completedAt)); + }); + + it("auto-paginates bulk action lists", async () => { + requestHandler = (request, response) => { + const url = new URL(request.url, baseUrl); + if (!url.searchParams.has("page[after]")) { + return json(response, { + data: [bulkActionObject("bulk_first")], + pagination: { next: "cursor_next" }, + }); + } + + expect(url.searchParams.get("page[after]")).toBe("cursor_next"); + return json(response, { + data: [bulkActionObject("bulk_second")], + pagination: {}, + }); + }; + + const client = new ApiClient(baseUrl, "tr_test_key"); + const ids: string[] = []; + + for await (const bulkAction of client.listBulkActions({ limit: 1 })) { + ids.push(bulkAction.id); + } + + expect(ids).toEqual(["bulk_first", "bulk_second"]); + expect(receivedRequests).toHaveLength(2); + }); + + it("retrieves a bulk action by id", async () => { + requestHandler = (_request, response) => json(response, bulkActionObject("bulk_retrieve")); + + const client = new ApiClient(baseUrl, "tr_test_key"); + const bulkAction = await client.retrieveBulkAction("bulk_retrieve"); + + expect(receivedRequests[0]?.method).toBe("GET"); + expect(receivedRequests[0]?.url).toBe("/api/v1/bulk-actions/bulk_retrieve"); + expect(bulkAction.id).toBe("bulk_retrieve"); + }); + + it("aborts a bulk action by id", async () => { + requestHandler = (_request, response) => json(response, { id: "bulk_abort" }); + + const client = new ApiClient(baseUrl, "tr_test_key"); + const result = await client.abortBulkAction("bulk_abort"); + + expect(receivedRequests[0]?.method).toBe("POST"); + expect(receivedRequests[0]?.url).toBe("/api/v1/bulk-actions/bulk_abort/abort"); + expect(result).toEqual({ id: "bulk_abort" }); + }); +}); + +function json(response: ServerResponse, body: unknown, status = 200) { + response.writeHead(status, { "content-type": "application/json" }); + response.end(JSON.stringify(body)); +} + +function bulkActionObject(id: string) { + return { + id, + type: "REPLAY", + status: "PENDING", + counts: { total: 1, success: 0, failure: 0 }, + createdAt: "2026-07-01T10:00:00.000Z", + }; +} diff --git a/packages/core/src/v3/apiClient/index.ts b/packages/core/src/v3/apiClient/index.ts index 3cdc78e1b31..deda5d01f9a 100644 --- a/packages/core/src/v3/apiClient/index.ts +++ b/packages/core/src/v3/apiClient/index.ts @@ -36,7 +36,10 @@ import { type UpdateScheduleOptions, type UpdateSessionRequestBody, type WaitForDurationRequestBody, + AbortBulkActionResponseBody, ApiDeploymentListResponseItem, + BulkActionObject, + CreateBulkActionResponseBody, AppendToStreamResponseBody, BatchTaskRunExecutionResult, BatchTriggerTaskV3Response, @@ -118,8 +121,10 @@ import { type SSEStreamPart, } from "./runStream.js"; import type { + CreateBulkActionOptions, CreateEnvironmentVariableParams, ImportEnvironmentVariablesParams, + ListBulkActionsQueryParams, ListProjectRunsQueryParams, ListRunsQueryParams, ListWaitpointTokensQueryParams, @@ -141,9 +146,11 @@ export type CreateBatchApiResponse = Prettify< >; export type { + CreateBulkActionOptions, CreateEnvironmentVariableParams, ImportEnvironmentVariablesParams, RealtimeRunSkipColumns, + ListBulkActionsQueryParams, SubscribeToRunsQueryParams, UpdateEnvironmentVariableParams, }; @@ -738,6 +745,64 @@ export class ApiClient { ); } + createBulkAction(options: CreateBulkActionOptions, requestOptions?: ZodFetchOptions) { + return zodfetch( + CreateBulkActionResponseBody, + `${this.baseUrl}/api/v1/bulk-actions`, + { + method: "POST", + headers: this.#getHeaders(false), + body: JSON.stringify(options), + }, + mergeRequestOptions(this.defaultRequestOptions, requestOptions) + ); + } + + listBulkActions( + query?: ListBulkActionsQueryParams, + requestOptions?: ZodFetchOptions + ): CursorPagePromise { + return zodfetchCursorPage( + BulkActionObject, + `${this.baseUrl}/api/v1/bulk-actions`, + { + query: new URLSearchParams(), + limit: query?.limit, + after: query?.after, + before: query?.before, + }, + { + method: "GET", + headers: this.#getHeaders(false), + }, + mergeRequestOptions(this.defaultRequestOptions, requestOptions) + ); + } + + retrieveBulkAction(bulkActionId: string, requestOptions?: ZodFetchOptions) { + return zodfetch( + BulkActionObject, + `${this.baseUrl}/api/v1/bulk-actions/${bulkActionId}`, + { + method: "GET", + headers: this.#getHeaders(false), + }, + mergeRequestOptions(this.defaultRequestOptions, requestOptions) + ); + } + + abortBulkAction(bulkActionId: string, requestOptions?: ZodFetchOptions) { + return zodfetch( + AbortBulkActionResponseBody, + `${this.baseUrl}/api/v1/bulk-actions/${bulkActionId}/abort`, + { + method: "POST", + headers: this.#getHeaders(false), + }, + mergeRequestOptions(this.defaultRequestOptions, requestOptions) + ); + } + resetIdempotencyKey( taskIdentifier: string, idempotencyKey: string, diff --git a/packages/core/src/v3/apiClient/types.ts b/packages/core/src/v3/apiClient/types.ts index d3ee6427032..0684118ff28 100644 --- a/packages/core/src/v3/apiClient/types.ts +++ b/packages/core/src/v3/apiClient/types.ts @@ -70,6 +70,42 @@ export interface ListProjectRunsQueryParams extends CursorPageParams, ListRunsQu env?: Array<"dev" | "staging" | "prod"> | "dev" | "staging" | "prod"; } +type RequireAtLeastOne = T & + { + [K in Keys]-?: Required>; + }[Keys]; + +/** Same filters as runs.list(), excluding pagination. */ +export type BulkActionFilter = RequireAtLeastOne>; + +export type BulkActionSelection = + | { filter: BulkActionFilter; runIds?: never } + | { runIds: string[]; filter?: never }; + +type BaseBulkActionOptions = BulkActionSelection & { + name?: string; +}; + +type TargetRegionOption = { + /** Region identifier to replay runs in. When omitted, each replay keeps the original run's region. */ + targetRegion?: string; +}; + +export type CreateBulkActionOptions = + | (BaseBulkActionOptions & { + action: "cancel"; + targetRegion?: never; + }) + | (BaseBulkActionOptions & { action: "replay" } & TargetRegionOption); + +export type CreateBulkCancelActionOptions = BaseBulkActionOptions & { + targetRegion?: never; +}; + +export type CreateBulkReplayActionOptions = BaseBulkActionOptions & TargetRegionOption; + +export type ListBulkActionsQueryParams = CursorPageParams; + export interface SubscribeToRunsQueryParams { tasks?: Array | string; tags?: Array | string; diff --git a/packages/core/src/v3/schemas/api.ts b/packages/core/src/v3/schemas/api.ts index e9b5b03cc64..528d899ea58 100644 --- a/packages/core/src/v3/schemas/api.ts +++ b/packages/core/src/v3/schemas/api.ts @@ -9,6 +9,7 @@ import { } from "./common.js"; import { BackgroundWorkerMetadata } from "./resources.js"; import { DequeuedMessage, MachineResources } from "./runEngine.js"; +import { QueueTypeName } from "./queues.js"; export const RunEngineVersion = z.union([z.literal("V1"), z.literal("V2")]); @@ -1223,6 +1224,118 @@ export const ListRunResponse = z.object({ export type ListRunResponse = z.infer; +const StringOrStringArray = z.union([z.string(), z.array(z.string())]); +const MachineOrMachineArray = z.union([MachinePresetName, z.array(MachinePresetName)]); +const QueueOrQueueArray = z.union([QueueTypeName, z.array(QueueTypeName)]); +const DateOrNumber = z.union([z.coerce.date(), z.number()]); + +const BulkActionFilterRequestBody = z + .object({ + status: z.union([RunStatus, z.array(RunStatus)]).optional(), + taskIdentifier: StringOrStringArray.optional(), + version: StringOrStringArray.optional(), + from: DateOrNumber.optional(), + to: DateOrNumber.optional(), + period: z.string().optional(), + bulkAction: z.string().optional(), + tag: StringOrStringArray.optional(), + schedule: z.string().optional(), + isTest: z.boolean().optional(), + batch: z.string().optional(), + queue: QueueOrQueueArray.optional(), + machine: MachineOrMachineArray.optional(), + region: StringOrStringArray.optional(), + }) + .refine((filter) => Object.values(filter).some(isNonEmptyBulkActionFilterValue), { + message: "At least one filter must be provided", + }); + +/** Recursively checks for at least one non-undefined, non-empty value. */ +function isNonEmptyBulkActionFilterValue(value: unknown): boolean { + if (value === undefined) { + return false; + } + + if (Array.isArray(value)) { + return value.some(isNonEmptyBulkActionFilterValue); + } + + if (typeof value === "string") { + return value.trim().length > 0; + } + + return true; +} + +const BulkActionSelectionRequestBody = { + filter: BulkActionFilterRequestBody.optional(), + runIds: z.array(z.string()).min(1).optional(), + name: z.string().optional(), +}; + +export const CreateBulkActionRequestBody = z + .discriminatedUnion("action", [ + z.object({ + action: z.literal("cancel"), + targetRegion: z.never().optional(), + ...BulkActionSelectionRequestBody, + }), + z.object({ + action: z.literal("replay"), + targetRegion: z.string().optional(), + ...BulkActionSelectionRequestBody, + }), + ]) + .refine((body) => (body.filter ? 1 : 0) + (body.runIds ? 1 : 0) === 1, { + message: "Exactly one of filter or runIds must be provided", + }); + +export type CreateBulkActionRequestBody = z.infer; + +export const BulkActionStatus = z.enum(["PENDING", "COMPLETED", "ABORTED"]); +export type BulkActionStatus = z.infer; + +export const BulkActionType = z.enum(["CANCEL", "REPLAY"]); +export type BulkActionType = z.infer; + +export const BulkActionObject = z.object({ + id: z.string(), + name: z.string().optional(), + type: BulkActionType, + status: BulkActionStatus, + counts: z.object({ + total: z.number(), + success: z.number(), + failure: z.number(), + }), + createdAt: z.coerce.date(), + completedAt: z.coerce.date().optional(), +}); + +export type BulkActionObject = z.infer; + +export const CreateBulkActionResponseBody = z.object({ + id: z.string(), +}); + +export type CreateBulkActionResponseBody = z.infer; + +export const AbortBulkActionResponseBody = z.object({ + id: z.string(), +}); + +export type AbortBulkActionResponseBody = z.infer; + +export const ListBulkActionsResponseBody = z.object({ + data: z.array(BulkActionObject), + pagination: z.object({ + next: z.string().optional(), + previous: z.string().optional(), + }), +}); + +export type ListBulkActionsResponseBody = z.infer; + export const CreateEnvironmentVariableRequestBody = z.object({ name: z.string(), value: z.string(), diff --git a/packages/trigger-sdk/src/v3/index.ts b/packages/trigger-sdk/src/v3/index.ts index f993105f0bd..4bdb582d7ae 100644 --- a/packages/trigger-sdk/src/v3/index.ts +++ b/packages/trigger-sdk/src/v3/index.ts @@ -55,6 +55,7 @@ export { type AnyRealtimeRun, type RetrieveRunResult, type AnyRetrieveRunResult, + type BulkAction, } from "./runs.js"; export * as schedules from "./schedules/index.js"; export { diff --git a/packages/trigger-sdk/src/v3/runs-bulk.test.ts b/packages/trigger-sdk/src/v3/runs-bulk.test.ts new file mode 100644 index 00000000000..c31743eddc3 --- /dev/null +++ b/packages/trigger-sdk/src/v3/runs-bulk.test.ts @@ -0,0 +1,181 @@ +import { afterEach, beforeEach, describe, expect, it } from "vitest"; +import { createServer, type IncomingMessage, type Server, type ServerResponse } from "node:http"; +import type { AddressInfo } from "node:net"; +import { apiClientManager } from "@trigger.dev/core/v3"; +import { runs } from "./runs.js"; + +type ReceivedRequest = { + method: string; + url: string; + headers: IncomingMessage["headers"]; + body: string; +}; + +type RequestHandler = (request: ReceivedRequest, response: ServerResponse) => void | Promise; + +describe("runs.bulk", () => { + let server: Server; + let baseUrl: string; + let receivedRequests: ReceivedRequest[] = []; + let requestHandler: RequestHandler | undefined; + + beforeEach(async () => { + receivedRequests = []; + requestHandler = undefined; + + server = createServer((req, res) => { + const chunks: Buffer[] = []; + req.on("data", (chunk) => chunks.push(chunk)); + req.on("end", async () => { + const received = { + method: req.method ?? "", + url: req.url ?? "", + headers: req.headers, + body: Buffer.concat(chunks).toString(), + } satisfies ReceivedRequest; + receivedRequests.push(received); + + try { + if (requestHandler) { + await requestHandler(received, res); + } else { + json(res, { error: "No handler" }, 500); + } + } catch (error) { + json(res, { error: error instanceof Error ? error.message : String(error) }, 500); + } + }); + }); + + await new Promise((resolve) => { + server.listen(0, "127.0.0.1", () => { + const address = server.address() as AddressInfo; + baseUrl = `http://127.0.0.1:${address.port}`; + resolve(); + }); + }); + }); + + afterEach(async () => { + apiClientManager.disable(); + await new Promise((resolve) => server.close(() => resolve())); + }); + + it("creates a cancel bulk action", async () => { + requestHandler = (_request, response) => json(response, { id: "bulk_cancel" }); + + const result = await withApiClient(() => + runs.bulk.cancel({ runIds: ["run_1", "run_2"], name: "Cancel selected" }) + ); + + expect(result).toEqual({ id: "bulk_cancel" }); + expect(receivedRequests[0]?.method).toBe("POST"); + expect(receivedRequests[0]?.url).toBe("/api/v1/bulk-actions"); + expect(JSON.parse(receivedRequests[0]?.body ?? "{}")).toEqual({ + action: "cancel", + runIds: ["run_1", "run_2"], + name: "Cancel selected", + }); + }); + + it("creates a replay bulk action", async () => { + requestHandler = (_request, response) => json(response, { id: "bulk_replay" }); + + const result = await withApiClient(() => + runs.bulk.replay({ + filter: { status: "FAILED", taskIdentifier: ["task-a", "task-b"] }, + name: "Replay failed tasks", + targetRegion: "eu_1", + }) + ); + + expect(result).toEqual({ id: "bulk_replay" }); + expect(receivedRequests[0]?.method).toBe("POST"); + expect(receivedRequests[0]?.url).toBe("/api/v1/bulk-actions"); + expect(JSON.parse(receivedRequests[0]?.body ?? "{}")).toEqual({ + action: "replay", + filter: { status: "FAILED", taskIdentifier: ["task-a", "task-b"] }, + name: "Replay failed tasks", + targetRegion: "eu_1", + }); + }); + + it("retrieves and aborts bulk actions", async () => { + requestHandler = (request, response) => { + if (request.method === "GET") { + return json(response, bulkActionObject("bulk_read", "PENDING")); + } + + return json(response, { id: "bulk_read" }); + }; + + const retrieved = await withApiClient(() => runs.bulk.retrieve("bulk_read")); + const aborted = await withApiClient(() => runs.bulk.abort("bulk_read")); + + expect(retrieved.id).toBe("bulk_read"); + expect(retrieved.createdAt).toEqual(new Date("2026-07-01T10:00:00.000Z")); + expect(aborted).toEqual({ id: "bulk_read" }); + expect(receivedRequests.map((request) => `${request.method} ${request.url}`)).toEqual([ + "GET /api/v1/bulk-actions/bulk_read", + "POST /api/v1/bulk-actions/bulk_read/abort", + ]); + }); + + it("lists bulk actions", async () => { + requestHandler = (_request, response) => + json(response, { + data: [bulkActionObject("bulk_listed", "COMPLETED")], + pagination: { next: "cursor_next" }, + }); + + const page = await withApiClient(() => runs.bulk.list({ limit: 1, before: "cursor_before" })); + + const url = new URL(receivedRequests[0]?.url ?? "", baseUrl); + expect(receivedRequests[0]?.method).toBe("GET"); + expect(url.pathname).toBe("/api/v1/bulk-actions"); + expect(url.searchParams.get("page[size]")).toBe("1"); + expect(url.searchParams.get("page[before]")).toBe("cursor_before"); + expect(page.data[0]?.id).toBe("bulk_listed"); + expect(page.pagination.next).toBe("cursor_next"); + }); + + it("polls until the bulk action finishes", async () => { + requestHandler = (_request, response) => { + const status = receivedRequests.length === 1 ? "PENDING" : "COMPLETED"; + return json(response, bulkActionObject("bulk_poll", status)); + }; + + const bulkAction = await withApiClient(() => + runs.bulk.poll("bulk_poll", { pollIntervalMs: 1 }) + ); + + expect(bulkAction.status).toBe("COMPLETED"); + expect(receivedRequests.map((request) => request.url)).toEqual([ + "/api/v1/bulk-actions/bulk_poll", + "/api/v1/bulk-actions/bulk_poll", + ]); + }); + + function withApiClient(fn: () => Promise) { + return apiClientManager.runWithConfig( + { baseURL: baseUrl, accessToken: "tr_test_key" }, + async () => fn() + ); + } +}); + +function json(response: ServerResponse, body: unknown, status = 200) { + response.writeHead(status, { "content-type": "application/json" }); + response.end(JSON.stringify(body)); +} + +function bulkActionObject(id: string, status: "PENDING" | "COMPLETED" | "ABORTED") { + return { + id, + type: "REPLAY", + status, + counts: { total: 2, success: status === "COMPLETED" ? 2 : 0, failure: 0 }, + createdAt: "2026-07-01T10:00:00.000Z", + completedAt: status === "COMPLETED" ? "2026-07-01T10:05:00.000Z" : undefined, + }; +} diff --git a/packages/trigger-sdk/src/v3/runs.ts b/packages/trigger-sdk/src/v3/runs.ts index 1ac9582df20..3bd2a9ea7f8 100644 --- a/packages/trigger-sdk/src/v3/runs.ts +++ b/packages/trigger-sdk/src/v3/runs.ts @@ -2,7 +2,10 @@ import type { AnyRetrieveRunResult, AnyRunShape, ApiRequestOptions, + CreateBulkCancelActionOptions, + CreateBulkReplayActionOptions, InferRunTypes, + ListBulkActionsQueryParams, ListProjectRunsQueryParams, ListRunsQueryParams, RescheduleRunRequestBody, @@ -16,7 +19,10 @@ import type { AsyncIterableStream, ApiPromise, RealtimeRunSkipColumns, + AbortBulkActionResponseBody, + BulkActionObject, CanceledRunResponse, + CreateBulkActionResponseBody, CursorPagePromise, ListRunResponseItem, ReplayRunResponse, @@ -49,6 +55,14 @@ export const runs = { retrieve: retrieveRun, list: listRuns, reschedule: rescheduleRun, + bulk: { + cancel: bulkCancelRuns, + replay: bulkReplayRuns, + retrieve: retrieveBulkAction, + abort: abortBulkAction, + list: listBulkActions, + poll: pollBulkAction, + }, poll, subscribeToRun, subscribeToRunsWithTag, @@ -57,6 +71,7 @@ export const runs = { }; export type ListRunsItem = ListRunResponseItem; +export type BulkAction = BulkActionObject; function listRuns( projectRef: string, @@ -278,6 +293,139 @@ function cancelRun( return apiClient.cancelRun(runId, $requestOptions); } +function bulkCancelRuns( + options: CreateBulkCancelActionOptions, + requestOptions?: ApiRequestOptions +): ApiPromise { + const apiClient = apiClientManager.clientOrThrow(); + + const $requestOptions = mergeRequestOptions( + { + tracer, + name: "runs.bulk.cancel()", + icon: "runs", + attributes: { + ...flattenAttributes(options as Record, "bulkAction"), + }, + }, + requestOptions + ); + + return apiClient.createBulkAction({ ...options, action: "cancel" }, $requestOptions); +} + +function bulkReplayRuns( + options: CreateBulkReplayActionOptions, + requestOptions?: ApiRequestOptions +): ApiPromise { + const apiClient = apiClientManager.clientOrThrow(); + + const $requestOptions = mergeRequestOptions( + { + tracer, + name: "runs.bulk.replay()", + icon: "runs", + attributes: { + ...flattenAttributes(options as Record, "bulkAction"), + }, + }, + requestOptions + ); + + return apiClient.createBulkAction({ ...options, action: "replay" }, $requestOptions); +} + +function retrieveBulkAction( + bulkActionId: string, + requestOptions?: ApiRequestOptions +): ApiPromise { + const apiClient = apiClientManager.clientOrThrow(); + + const $requestOptions = mergeRequestOptions( + { + tracer, + name: "runs.bulk.retrieve()", + icon: "runs", + attributes: { + bulkActionId, + ...accessoryAttributes({ + items: [{ text: bulkActionId, variant: "normal" }], + style: "codepath", + }), + }, + }, + requestOptions + ); + + return apiClient.retrieveBulkAction(bulkActionId, $requestOptions); +} + +function abortBulkAction( + bulkActionId: string, + requestOptions?: ApiRequestOptions +): ApiPromise { + const apiClient = apiClientManager.clientOrThrow(); + + const $requestOptions = mergeRequestOptions( + { + tracer, + name: "runs.bulk.abort()", + icon: "runs", + attributes: { + bulkActionId, + ...accessoryAttributes({ + items: [{ text: bulkActionId, variant: "normal" }], + style: "codepath", + }), + }, + }, + requestOptions + ); + + return apiClient.abortBulkAction(bulkActionId, $requestOptions); +} + +function listBulkActions( + params?: ListBulkActionsQueryParams, + requestOptions?: ApiRequestOptions +): CursorPagePromise { + const apiClient = apiClientManager.clientOrThrow(); + + const $requestOptions = mergeRequestOptions( + { + tracer, + name: "runs.bulk.list()", + icon: "runs", + attributes: { + ...flattenAttributes(params as Record, "queryParams"), + }, + }, + requestOptions + ); + + return apiClient.listBulkActions(params, $requestOptions); +} + +async function pollBulkAction( + bulkActionId: string, + options?: { pollIntervalMs?: number }, + requestOptions?: ApiRequestOptions +): Promise { + let attempts = 0; + + while (attempts++ < MAX_POLL_ATTEMPTS) { + const bulkAction = await retrieveBulkAction(bulkActionId, requestOptions); + + if (bulkAction.status !== "PENDING") { + return bulkAction; + } + + await new Promise((resolve) => setTimeout(resolve, options?.pollIntervalMs ?? 1000)); + } + + throw new Error(`Bulk action ${bulkActionId} did not finish after ${MAX_POLL_ATTEMPTS} attempts`); +} + function rescheduleRun( runId: string, body: RescheduleRunRequestBody,