-
-
Notifications
You must be signed in to change notification settings - Fork 1.3k
feat(sdk): add bulk replay to api and sdk #4105
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
carderne
wants to merge
12
commits into
main
Choose a base branch
from
feature/tri-6326-bulk-actions-via-the-sdkapi
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from all commits
Commits
Show all changes
12 commits
Select commit
Hold shift + click to select a range
d57949b
feat: add bulk replay to API and SDK
carderne f5cbe18
some reads from primary to avoid read-your-write lag
carderne b010042
improve errors
carderne 13071a4
add tests
carderne a687ab4
improve bulk action service
carderne cd116a5
improve api schema and types
carderne 909be39
add docs
carderne c9c208c
format
carderne 81f4c2d
format, lint
carderne 6dcdfc8
fix e2e test
carderne d789912
disallow empty filter
carderne c7f451b
simplify route reads
carderne File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| --- | ||
| "@trigger.dev/core": patch | ||
| "@trigger.dev/sdk": patch | ||
| --- | ||
|
|
||
| Add SDK and API client helpers for run bulk actions. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| --- | ||
| area: webapp | ||
| type: feature | ||
| --- | ||
|
|
||
| Add API and SDK support for creating, listing, retrieving, polling, and aborting run bulk actions. |
155 changes: 155 additions & 0 deletions
155
apps/webapp/app/presenters/v3/ApiBulkActionPresenter.server.ts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,155 @@ | ||
| import { | ||
| type BulkActionGroup, | ||
| type BulkActionStatus, | ||
| type BulkActionType, | ||
| } from "@trigger.dev/database"; | ||
| import { z } from "zod"; | ||
| import { BasePresenter } from "./basePresenter.server"; | ||
|
|
||
| const DEFAULT_PAGE_SIZE = 25; | ||
| const MAX_PAGE_SIZE = 100; | ||
|
|
||
| export const ApiBulkActionListSearchParams = z.object({ | ||
| "page[size]": z.coerce.number().int().positive().min(1).max(MAX_PAGE_SIZE).optional(), | ||
| "page[after]": z.string().optional(), | ||
| "page[before]": z.string().optional(), | ||
| }); | ||
|
|
||
| export type ApiBulkActionListSearchParams = z.infer<typeof ApiBulkActionListSearchParams>; | ||
|
|
||
| type BulkActionListCursor = { | ||
| createdAt: Date; | ||
| id: string; | ||
| }; | ||
|
|
||
| type BulkActionRow = Pick< | ||
| BulkActionGroup, | ||
| | "id" | ||
| | "friendlyId" | ||
| | "name" | ||
| | "status" | ||
| | "type" | ||
| | "createdAt" | ||
| | "completedAt" | ||
| | "totalCount" | ||
| | "successCount" | ||
| | "failureCount" | ||
| >; | ||
|
|
||
| export class ApiBulkActionPresenter extends BasePresenter { | ||
| public async list(environmentId: string, searchParams: ApiBulkActionListSearchParams) { | ||
| const pageSize = searchParams["page[size]"] ?? DEFAULT_PAGE_SIZE; | ||
| const after = searchParams["page[after]"]; | ||
| const before = searchParams["page[before]"]; | ||
|
|
||
| if (after && before) { | ||
| throw new Error("Only one of page[after] or page[before] can be provided"); | ||
| } | ||
|
|
||
| const cursor = decodeCursor(after ?? before); | ||
| const direction = before ? "backward" : "forward"; | ||
|
|
||
| const where = { | ||
| environmentId, | ||
| ...(cursor | ||
| ? direction === "forward" | ||
| ? { | ||
| OR: [ | ||
| { createdAt: { lt: cursor.createdAt } }, | ||
| { createdAt: cursor.createdAt, id: { lt: cursor.id } }, | ||
| ], | ||
| } | ||
| : { | ||
| OR: [ | ||
| { createdAt: { gt: cursor.createdAt } }, | ||
| { createdAt: cursor.createdAt, id: { gt: cursor.id } }, | ||
| ], | ||
| } | ||
| : {}), | ||
| }; | ||
|
|
||
| const rows = await this._replica.bulkActionGroup.findMany({ | ||
| select: bulkActionSelect, | ||
| where, | ||
| orderBy: | ||
| direction === "forward" | ||
| ? [{ createdAt: "desc" }, { id: "desc" }] | ||
| : [{ createdAt: "asc" }, { id: "asc" }], | ||
| take: pageSize + 1, | ||
| }); | ||
|
|
||
| const hasMore = rows.length > pageSize; | ||
| const pageRows = rows.slice(0, pageSize); | ||
| const dataRows = direction === "forward" ? pageRows : [...pageRows].reverse(); | ||
|
|
||
| const first = dataRows.at(0); | ||
| const last = dataRows.at(-1); | ||
|
|
||
| return { | ||
| data: dataRows.map(apiBulkActionObject), | ||
| pagination: { | ||
| next: last && (hasMore || direction === "backward") ? encodeCursor(last) : undefined, | ||
| previous: | ||
| first && | ||
| ((direction === "forward" && Boolean(after)) || (direction === "backward" && hasMore)) | ||
| ? encodeCursor(first) | ||
| : undefined, | ||
| }, | ||
| }; | ||
| } | ||
| } | ||
|
|
||
| export const bulkActionSelect = { | ||
| id: true, | ||
| friendlyId: true, | ||
| name: true, | ||
| status: true, | ||
| type: true, | ||
| createdAt: true, | ||
| completedAt: true, | ||
| totalCount: true, | ||
| successCount: true, | ||
| failureCount: true, | ||
| } as const; | ||
|
|
||
| export function apiBulkActionObject(row: BulkActionRow) { | ||
| return { | ||
| id: row.friendlyId, | ||
| name: row.name ?? undefined, | ||
| type: row.type as BulkActionType, | ||
| status: row.status as BulkActionStatus, | ||
| counts: { | ||
| total: row.totalCount, | ||
| success: row.successCount, | ||
| failure: row.failureCount, | ||
| }, | ||
| createdAt: row.createdAt, | ||
| completedAt: row.completedAt ?? undefined, | ||
| }; | ||
| } | ||
|
|
||
| function encodeCursor(row: Pick<BulkActionRow, "createdAt" | "id">) { | ||
| return Buffer.from(JSON.stringify({ createdAt: row.createdAt.getTime(), id: row.id })).toString( | ||
| "base64url" | ||
| ); | ||
| } | ||
|
|
||
| function decodeCursor(cursor: string | undefined): BulkActionListCursor | undefined { | ||
| if (!cursor) { | ||
| return undefined; | ||
| } | ||
|
|
||
| try { | ||
| const parsed = JSON.parse(Buffer.from(cursor, "base64url").toString("utf8")) as { | ||
| createdAt?: unknown; | ||
| id?: unknown; | ||
| }; | ||
| if (typeof parsed.createdAt !== "number" || typeof parsed.id !== "string") { | ||
| throw new Error("Invalid cursor"); | ||
| } | ||
|
|
||
| return { createdAt: new Date(parsed.createdAt), id: parsed.id }; | ||
| } catch { | ||
| throw new Error("Invalid cursor"); | ||
| } | ||
| } |
50 changes: 50 additions & 0 deletions
50
apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.abort.ts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,50 @@ | ||
| import { json } from "@remix-run/server-runtime"; | ||
| import { z } from "zod"; | ||
| import { prisma } from "~/db.server"; | ||
| import { logger } from "~/services/logger.server"; | ||
| import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; | ||
| import { BulkActionService } from "~/v3/services/bulk/BulkActionV2.server"; | ||
| import { ServiceValidationError } from "~/v3/services/common.server"; | ||
|
|
||
| const ParamsSchema = z.object({ | ||
| bulkActionId: z.string(), | ||
| }); | ||
|
|
||
| const { action } = createActionApiRoute( | ||
| { | ||
| params: ParamsSchema, | ||
| corsStrategy: "none", | ||
| authorization: { | ||
| action: "write", | ||
| resource: () => ({ type: "runs" }), | ||
| }, | ||
| // Existence/auth gate. Reads from primary so create -> abort doesn't 404 on | ||
| // replica lag; the abort write path re-reads and mutates on primary. | ||
| findResource: async (params, auth) => { | ||
| return prisma.bulkActionGroup.findFirst({ | ||
| select: { id: true }, | ||
| where: { | ||
| friendlyId: params.bulkActionId, | ||
| environmentId: auth.environment.id, | ||
| }, | ||
| }); | ||
| }, | ||
| }, | ||
| async ({ params, authentication }) => { | ||
| const service = new BulkActionService(); | ||
|
|
||
| try { | ||
| const result = await service.abort(params.bulkActionId, authentication.environment.id); | ||
| return json({ id: result.bulkActionId }); | ||
| } catch (error) { | ||
| if (error instanceof ServiceValidationError) { | ||
| return json({ error: error.message }, { status: error.status ?? 400 }); | ||
| } | ||
|
|
||
| logger.error("Failed to abort API bulk action", { error }); | ||
| return json({ error: "Failed to abort bulk action" }, { status: 500 }); | ||
| } | ||
| } | ||
| ); | ||
|
|
||
| export { action }; |
36 changes: 36 additions & 0 deletions
36
apps/webapp/app/routes/api.v1.bulk-actions.$bulkActionId.ts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,36 @@ | ||
| import { json } from "@remix-run/server-runtime"; | ||
| import { z } from "zod"; | ||
| import { prisma } from "~/db.server"; | ||
| import { | ||
| apiBulkActionObject, | ||
| bulkActionSelect, | ||
| } from "~/presenters/v3/ApiBulkActionPresenter.server"; | ||
| import { createLoaderApiRoute } from "~/services/routeBuilders/apiBuilder.server"; | ||
|
|
||
| const ParamsSchema = z.object({ | ||
| bulkActionId: z.string(), | ||
| }); | ||
|
|
||
| export const loader = createLoaderApiRoute( | ||
| { | ||
| params: ParamsSchema, | ||
| corsStrategy: "none", | ||
| authorization: { | ||
| action: "read", | ||
| resource: () => ({ type: "runs" }), | ||
| }, | ||
| // Read from primary so create -> retrieve/poll doesn't 404 on replica lag. | ||
| findResource: async (params, auth) => { | ||
| return prisma.bulkActionGroup.findFirst({ | ||
| select: bulkActionSelect, | ||
| where: { | ||
| friendlyId: params.bulkActionId, | ||
| environmentId: auth.environment.id, | ||
| }, | ||
| }); | ||
| }, | ||
| }, | ||
| async ({ resource }) => { | ||
| return json(apiBulkActionObject(resource)); | ||
| } | ||
| ); | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,129 @@ | ||
| import { json } from "@remix-run/server-runtime"; | ||
| import { CreateBulkActionRequestBody, type QueueTypeName } from "@trigger.dev/core/v3"; | ||
| import type { z } from "zod"; | ||
| import { | ||
| ApiBulkActionListSearchParams, | ||
| ApiBulkActionPresenter, | ||
| } from "~/presenters/v3/ApiBulkActionPresenter.server"; | ||
| import { ApiRunListPresenter } from "~/presenters/v3/ApiRunListPresenter.server"; | ||
| import { logger } from "~/services/logger.server"; | ||
| import type { RunListInputFilters } from "~/services/runsRepository/runsRepository.server"; | ||
| import { | ||
| createActionApiRoute, | ||
| createLoaderApiRoute, | ||
| } from "~/services/routeBuilders/apiBuilder.server"; | ||
| import { BulkActionService } from "~/v3/services/bulk/BulkActionV2.server"; | ||
| import { ServiceValidationError } from "~/v3/services/common.server"; | ||
|
|
||
| const MAX_CREATE_BODY_SIZE = 1024 * 1024; | ||
|
|
||
| const { action } = createActionApiRoute( | ||
| { | ||
| body: CreateBulkActionRequestBody, | ||
| maxContentLength: MAX_CREATE_BODY_SIZE, | ||
| corsStrategy: "none", | ||
| authorization: { | ||
| action: "write", | ||
| resource: () => ({ type: "runs" }), | ||
| }, | ||
| }, | ||
| async ({ body, authentication }) => { | ||
| if (!body) { | ||
| return json({ error: "Invalid request body" }, { status: 400 }); | ||
| } | ||
|
|
||
| const service = new BulkActionService(); | ||
|
|
||
| try { | ||
| const result = await service.create({ | ||
| organizationId: authentication.environment.organizationId, | ||
| projectId: authentication.environment.projectId, | ||
| environmentId: authentication.environment.id, | ||
| userId: authentication.actor?.sub ?? null, | ||
| action: body.action, | ||
| title: body.name, | ||
| region: body.targetRegion, | ||
| filters: body.runIds | ||
| ? { runId: body.runIds } | ||
| : bulkActionFilterToRunListFilters(body.filter), | ||
|
carderne marked this conversation as resolved.
|
||
| triggerSource: "api", | ||
| }); | ||
|
|
||
| return json({ id: result.bulkActionId }, { status: 202 }); | ||
| } catch (error) { | ||
| if (error instanceof ServiceValidationError) { | ||
| return json({ error: error.message }, { status: error.status ?? 400 }); | ||
| } | ||
|
|
||
| logger.error("Failed to create API bulk action", { error }); | ||
| return json({ error: "Failed to create bulk action" }, { status: 500 }); | ||
| } | ||
| } | ||
| ); | ||
|
|
||
| const loader = createLoaderApiRoute( | ||
| { | ||
| searchParams: ApiBulkActionListSearchParams, | ||
| corsStrategy: "none", | ||
| authorization: { | ||
| action: "read", | ||
| resource: () => ({ type: "runs" }), | ||
| }, | ||
| findResource: async () => 1, | ||
| }, | ||
| async ({ searchParams, authentication }) => { | ||
| const presenter = new ApiBulkActionPresenter(); | ||
| const result = await presenter.list(authentication.environment.id, searchParams); | ||
| return json(result); | ||
| } | ||
| ); | ||
|
|
||
| export { action, loader }; | ||
|
|
||
| function bulkActionFilterToRunListFilters( | ||
| filter: z.infer<typeof CreateBulkActionRequestBody>["filter"] | ||
| ): RunListInputFilters { | ||
| if (!filter) { | ||
| return {}; | ||
| } | ||
|
|
||
| const filters: RunListInputFilters = {}; | ||
|
|
||
| if (filter.status) { | ||
| filters.statuses = asArray(filter.status).flatMap((status) => | ||
| ApiRunListPresenter.apiStatusToRunStatuses(status) | ||
| ); | ||
| } | ||
|
|
||
| if (filter.taskIdentifier) filters.tasks = asArray(filter.taskIdentifier); | ||
| if (filter.version) filters.versions = asArray(filter.version); | ||
| if (filter.tag) filters.tags = asArray(filter.tag); | ||
| if (filter.bulkAction) filters.bulkId = filter.bulkAction; | ||
| if (filter.schedule) filters.scheduleId = filter.schedule; | ||
| if (filter.isTest !== undefined) filters.isTest = filter.isTest; | ||
| if (filter.from !== undefined) filters.from = dateOrNumberToMs(filter.from); | ||
| if (filter.to !== undefined) filters.to = dateOrNumberToMs(filter.to); | ||
| if (filter.period) filters.period = filter.period; | ||
| if (filter.batch) filters.batchId = filter.batch; | ||
| if (filter.queue) filters.queues = asArray(filter.queue).map(queueNameFromQueueTypeName); | ||
| if (filter.machine) filters.machines = asArray(filter.machine); | ||
| if (filter.region) filters.regions = asArray(filter.region); | ||
|
|
||
| return filters; | ||
| } | ||
|
|
||
| function asArray<T>(value: T | T[]): T[] { | ||
| return Array.isArray(value) ? value : [value]; | ||
| } | ||
|
|
||
| function dateOrNumberToMs(value: Date | number): number { | ||
| return value instanceof Date ? value.getTime() : value; | ||
| } | ||
|
|
||
| function queueNameFromQueueTypeName(queue: QueueTypeName): string { | ||
| if (queue.type === "task") { | ||
| return `task/${queue.name}`; | ||
| } | ||
|
|
||
| return queue.name; | ||
| } | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.