Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/bulk-actions-sdk-api.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@trigger.dev/core": patch
"@trigger.dev/sdk": patch
---

Add SDK and API client helpers for run bulk actions.
6 changes: 6 additions & 0 deletions .server-changes/bulk-actions-api-sdk.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: feature
---

Add API and SDK support for creating, listing, retrieving, polling, and aborting run bulk actions.
155 changes: 155 additions & 0 deletions apps/webapp/app/presenters/v3/ApiBulkActionPresenter.server.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
import {
type BulkActionGroup,
type BulkActionStatus,
type BulkActionType,
} from "@trigger.dev/database";
import { z } from "zod";
import { BasePresenter } from "./basePresenter.server";

const DEFAULT_PAGE_SIZE = 25;
const MAX_PAGE_SIZE = 100;

export const ApiBulkActionListSearchParams = z.object({
"page[size]": z.coerce.number().int().positive().min(1).max(MAX_PAGE_SIZE).optional(),
"page[after]": z.string().optional(),
"page[before]": z.string().optional(),
});

export type ApiBulkActionListSearchParams = z.infer<typeof ApiBulkActionListSearchParams>;

type BulkActionListCursor = {
createdAt: Date;
id: string;
};

type BulkActionRow = Pick<
BulkActionGroup,
| "id"
| "friendlyId"
| "name"
| "status"
| "type"
| "createdAt"
| "completedAt"
| "totalCount"
| "successCount"
| "failureCount"
>;

export class ApiBulkActionPresenter extends BasePresenter {
public async list(environmentId: string, searchParams: ApiBulkActionListSearchParams) {
const pageSize = searchParams["page[size]"] ?? DEFAULT_PAGE_SIZE;
const after = searchParams["page[after]"];
const before = searchParams["page[before]"];

if (after && before) {
throw new Error("Only one of page[after] or page[before] can be provided");
}

const cursor = decodeCursor(after ?? before);
const direction = before ? "backward" : "forward";

const where = {
environmentId,
...(cursor
? direction === "forward"
? {
OR: [
{ createdAt: { lt: cursor.createdAt } },
{ createdAt: cursor.createdAt, id: { lt: cursor.id } },
],
}
: {
OR: [
{ createdAt: { gt: cursor.createdAt } },
{ createdAt: cursor.createdAt, id: { gt: cursor.id } },
],
}
: {}),
};

const rows = await this._replica.bulkActionGroup.findMany({
select: bulkActionSelect,
where,
orderBy:
direction === "forward"
? [{ createdAt: "desc" }, { id: "desc" }]
: [{ createdAt: "asc" }, { id: "asc" }],
take: pageSize + 1,
});

const hasMore = rows.length > pageSize;
const pageRows = rows.slice(0, pageSize);
const dataRows = direction === "forward" ? pageRows : [...pageRows].reverse();

const first = dataRows.at(0);
const last = dataRows.at(-1);

return {
data: dataRows.map(apiBulkActionObject),
pagination: {
next: last && (hasMore || direction === "backward") ? encodeCursor(last) : undefined,
previous:
first &&
((direction === "forward" && Boolean(after)) || (direction === "backward" && hasMore))
? encodeCursor(first)
: undefined,
},
};
}
}

export const bulkActionSelect = {
id: true,
friendlyId: true,
name: true,
status: true,
type: true,
createdAt: true,
completedAt: true,
totalCount: true,
successCount: true,
failureCount: true,
} as const;

export function apiBulkActionObject(row: BulkActionRow) {
return {
id: row.friendlyId,
name: row.name ?? undefined,
type: row.type as BulkActionType,
status: row.status as BulkActionStatus,
counts: {
total: row.totalCount,
success: row.successCount,
failure: row.failureCount,
},
createdAt: row.createdAt,
completedAt: row.completedAt ?? undefined,
};
}

function encodeCursor(row: Pick<BulkActionRow, "createdAt" | "id">) {
return Buffer.from(JSON.stringify({ createdAt: row.createdAt.getTime(), id: row.id })).toString(
"base64url"
);
}

function decodeCursor(cursor: string | undefined): BulkActionListCursor | undefined {
if (!cursor) {
return undefined;
}

try {
const parsed = JSON.parse(Buffer.from(cursor, "base64url").toString("utf8")) as {
createdAt?: unknown;
id?: unknown;
};
if (typeof parsed.createdAt !== "number" || typeof parsed.id !== "string") {
throw new Error("Invalid cursor");
}

return { createdAt: new Date(parsed.createdAt), id: parsed.id };
} catch {
throw new Error("Invalid cursor");
}
}
50 changes: 50 additions & 0 deletions apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.abort.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import { json } from "@remix-run/server-runtime";
import { z } from "zod";
import { prisma } from "~/db.server";
import { logger } from "~/services/logger.server";
import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
import { BulkActionService } from "~/v3/services/bulk/BulkActionV2.server";
import { ServiceValidationError } from "~/v3/services/common.server";

const ParamsSchema = z.object({
bulkActionId: z.string(),
});

const { action } = createActionApiRoute(
{
params: ParamsSchema,
corsStrategy: "none",
authorization: {
action: "write",
resource: () => ({ type: "runs" }),
},
// Existence/auth gate. Reads from primary so create -> abort doesn't 404 on
// replica lag; the abort write path re-reads and mutates on primary.
findResource: async (params, auth) => {
return prisma.bulkActionGroup.findFirst({
select: { id: true },
where: {
friendlyId: params.bulkActionId,
environmentId: auth.environment.id,
},
});
},
},
async ({ params, authentication }) => {
const service = new BulkActionService();

try {
const result = await service.abort(params.bulkActionId, authentication.environment.id);
return json({ id: result.bulkActionId });
} catch (error) {
if (error instanceof ServiceValidationError) {
return json({ error: error.message }, { status: error.status ?? 400 });
}

logger.error("Failed to abort API bulk action", { error });
return json({ error: "Failed to abort bulk action" }, { status: 500 });
}
}
);

export { action };
36 changes: 36 additions & 0 deletions apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { json } from "@remix-run/server-runtime";
import { z } from "zod";
import { prisma } from "~/db.server";
import {
apiBulkActionObject,
bulkActionSelect,
} from "~/presenters/v3/ApiBulkActionPresenter.server";
import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server";

const ParamsSchema = z.object({
bulkActionId: z.string(),
});

export const loader = createLoaderApiRoute(
{
params: ParamsSchema,
corsStrategy: "none",
authorization: {
action: "read",
resource: () => ({ type: "runs" }),
},
// Read from primary so create -> retrieve/poll doesn't 404 on replica lag.
Comment thread
carderne marked this conversation as resolved.
findResource: async (params, auth) => {
return prisma.bulkActionGroup.findFirst({
select: bulkActionSelect,
where: {
friendlyId: params.bulkActionId,
environmentId: auth.environment.id,
},
});
},
},
async ({ resource }) => {
return json(apiBulkActionObject(resource));
}
);
129 changes: 129 additions & 0 deletions apps/webapp/app/routes/api.v1.bulk-actions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import { json } from "@remix-run/server-runtime";
import { CreateBulkActionRequestBody, type QueueTypeName } from "@trigger.dev/core/v3";
import type { z } from "zod";
import {
ApiBulkActionListSearchParams,
ApiBulkActionPresenter,
} from "~/presenters/v3/ApiBulkActionPresenter.server";
import { ApiRunListPresenter } from "~/presenters/v3/ApiRunListPresenter.server";
import { logger } from "~/services/logger.server";
import type { RunListInputFilters } from "~/services/runsRepository/runsRepository.server";
import {
createActionApiRoute,
createLoaderApiRoute,
} from "~/services/routeBuilders/apiBuilder.server";
import { BulkActionService } from "~/v3/services/bulk/BulkActionV2.server";
import { ServiceValidationError } from "~/v3/services/common.server";

const MAX_CREATE_BODY_SIZE = 1024 * 1024;

const { action } = createActionApiRoute(
{
body: CreateBulkActionRequestBody,
maxContentLength: MAX_CREATE_BODY_SIZE,
corsStrategy: "none",
authorization: {
action: "write",
resource: () => ({ type: "runs" }),
},
},
async ({ body, authentication }) => {
if (!body) {
return json({ error: "Invalid request body" }, { status: 400 });
}

const service = new BulkActionService();

try {
const result = await service.create({
organizationId: authentication.environment.organizationId,
projectId: authentication.environment.projectId,
environmentId: authentication.environment.id,
userId: authentication.actor?.sub ?? null,
action: body.action,
title: body.name,
region: body.targetRegion,
filters: body.runIds
? { runId: body.runIds }
: bulkActionFilterToRunListFilters(body.filter),
Comment thread
carderne marked this conversation as resolved.
triggerSource: "api",
});

return json({ id: result.bulkActionId }, { status: 202 });
} catch (error) {
if (error instanceof ServiceValidationError) {
return json({ error: error.message }, { status: error.status ?? 400 });
}

logger.error("Failed to create API bulk action", { error });
return json({ error: "Failed to create bulk action" }, { status: 500 });
}
}
);

const loader = createLoaderApiRoute(
{
searchParams: ApiBulkActionListSearchParams,
corsStrategy: "none",
authorization: {
action: "read",
resource: () => ({ type: "runs" }),
},
findResource: async () => 1,
},
async ({ searchParams, authentication }) => {
const presenter = new ApiBulkActionPresenter();
const result = await presenter.list(authentication.environment.id, searchParams);
return json(result);
}
);

export { action, loader };

function bulkActionFilterToRunListFilters(
filter: z.infer<typeof CreateBulkActionRequestBody>["filter"]
): RunListInputFilters {
if (!filter) {
return {};
}

const filters: RunListInputFilters = {};

if (filter.status) {
filters.statuses = asArray(filter.status).flatMap((status) =>
ApiRunListPresenter.apiStatusToRunStatuses(status)
);
}

if (filter.taskIdentifier) filters.tasks = asArray(filter.taskIdentifier);
if (filter.version) filters.versions = asArray(filter.version);
if (filter.tag) filters.tags = asArray(filter.tag);
if (filter.bulkAction) filters.bulkId = filter.bulkAction;
if (filter.schedule) filters.scheduleId = filter.schedule;
if (filter.isTest !== undefined) filters.isTest = filter.isTest;
if (filter.from !== undefined) filters.from = dateOrNumberToMs(filter.from);
if (filter.to !== undefined) filters.to = dateOrNumberToMs(filter.to);
if (filter.period) filters.period = filter.period;
if (filter.batch) filters.batchId = filter.batch;
if (filter.queue) filters.queues = asArray(filter.queue).map(queueNameFromQueueTypeName);
if (filter.machine) filters.machines = asArray(filter.machine);
if (filter.region) filters.regions = asArray(filter.region);

return filters;
}

function asArray<T>(value: T | T[]): T[] {
return Array.isArray(value) ? value : [value];
}

function dateOrNumberToMs(value: Date | number): number {
return value instanceof Date ? value.getTime() : value;
}

function queueNameFromQueueTypeName(queue: QueueTypeName): string {
if (queue.type === "task") {
return `task/${queue.name}`;
}

return queue.name;
}
Loading