diff --git a/.changeset/input-stream-wait.md b/.changeset/input-stream-wait.md new file mode 100644 index 00000000000..c4f4b923352 --- /dev/null +++ b/.changeset/input-stream-wait.md @@ -0,0 +1,8 @@ +--- +"@trigger.dev/sdk": patch +"@trigger.dev/react-hooks": patch +--- + +Add input streams for bidirectional communication with running tasks. Define typed input streams with `streams.input({ id })`, then consume inside tasks via `.wait()` (suspends the process), `.once()` (waits for next message), or `.on()` (subscribes to a continuous stream). Send data from backends with `.send(runId, data)` or from frontends with the new `useInputStreamSend` React hook. + +Upgrade S2 SDK from 0.17 to 0.22 with support for custom endpoints (s2-lite) via the new `endpoints` configuration, `AppendRecord.string()` API, and `maxInflightBytes` session option. diff --git a/.gitignore b/.gitignore index 071b9b59035..5f6adddba0a 100644 --- a/.gitignore +++ b/.gitignore @@ -67,4 +67,5 @@ apps/**/public/build **/.claude/settings.local.json .mcp.log .mcp.json -.cursor/debug.log \ No newline at end of file +.cursor/debug.log +ailogger-output.log \ No newline at end of file diff --git a/.server-changes/input-stream-wait.md b/.server-changes/input-stream-wait.md new file mode 100644 index 00000000000..e846fc3517c --- /dev/null +++ b/.server-changes/input-stream-wait.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: feature +--- + +Add input streams with API routes for sending data to running tasks, SSE reading, and waitpoint creation. Includes Redis cache for fast `.send()` to `.wait()` bridging, dashboard span support for input stream operations, and s2-lite support with configurable S2 endpoint, access token skipping, and S2-Basin headers for self-hosted deployments. Adds s2-lite to Docker Compose for local development. 
diff --git a/apps/webapp/app/env.server.ts b/apps/webapp/app/env.server.ts index fa019f2f75e..7868a38ac56 100644 --- a/apps/webapp/app/env.server.ts +++ b/apps/webapp/app/env.server.ts @@ -39,6 +39,7 @@ const S2EnvSchema = z.preprocess( S2_ENABLED: z.literal("1"), S2_ACCESS_TOKEN: z.string(), S2_DEPLOYMENT_LOGS_BASIN_NAME: z.string(), + S2_DEPLOYMENT_STREAMS_LOCAL: z.string().default("0"), }), z.object({ S2_ENABLED: z.literal("0"), @@ -1344,6 +1345,8 @@ const EnvironmentSchema = z REALTIME_STREAMS_S2_BASIN: z.string().optional(), REALTIME_STREAMS_S2_ACCESS_TOKEN: z.string().optional(), + REALTIME_STREAMS_S2_ENDPOINT: z.string().optional(), + REALTIME_STREAMS_S2_SKIP_ACCESS_TOKENS: z.enum(["true", "false"]).default("false"), REALTIME_STREAMS_S2_ACCESS_TOKEN_EXPIRATION_IN_MS: z.coerce .number() .int() diff --git a/apps/webapp/app/presenters/v3/DeploymentPresenter.server.ts b/apps/webapp/app/presenters/v3/DeploymentPresenter.server.ts index 8d80112de9e..9a15f8b4ac0 100644 --- a/apps/webapp/app/presenters/v3/DeploymentPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/DeploymentPresenter.server.ts @@ -217,7 +217,7 @@ export class DeploymentPresenter { let eventStream = undefined; if ( env.S2_ENABLED === "1" && - (buildServerMetadata || gitMetadata?.source === "trigger_github_app") + (buildServerMetadata || gitMetadata?.source === "trigger_github_app" || env.S2_DEPLOYMENT_STREAMS_LOCAL === "1") ) { const [error, accessToken] = await tryCatch(this.getS2AccessToken(project.externalRef)); @@ -290,9 +290,9 @@ export class DeploymentPresenter { return cachedToken; } - const { access_token: accessToken } = await s2.accessTokens.issue({ + const { accessToken } = await s2.accessTokens.issue({ id: `${projectRef}-${new Date().getTime()}`, - expires_at: new Date(Date.now() + 60 * 60 * 1000).toISOString(), // 1 hour + expiresAt: new Date(Date.now() + 60 * 60 * 1000), // 1 hour scope: { ops: ["read"], basins: { diff --git a/apps/webapp/app/presenters/v3/SpanPresenter.server.ts 
b/apps/webapp/app/presenters/v3/SpanPresenter.server.ts index a85d8b20dd2..ce83c2e242b 100644 --- a/apps/webapp/app/presenters/v3/SpanPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/SpanPresenter.server.ts @@ -629,6 +629,41 @@ export class SpanPresenter extends BasePresenter { }, }; } + case "input-stream": { + if (!span.entity.id) { + logger.error(`SpanPresenter: No input stream id`, { + spanId, + inputStreamId: span.entity.id, + }); + return { ...data, entity: null }; + } + + const [runId, streamId] = span.entity.id.split(":"); + + if (!runId || !streamId) { + logger.error(`SpanPresenter: Invalid input stream id`, { + spanId, + inputStreamId: span.entity.id, + }); + return { ...data, entity: null }; + } + + // Translate user-facing stream ID to internal S2 stream name + const s2StreamKey = `$trigger.input:${streamId}`; + + return { + ...data, + entity: { + type: "realtime-stream" as const, + object: { + runId, + streamKey: s2StreamKey, + displayName: streamId, + metadata: undefined, + }, + }, + }; + } default: return { ...data, entity: null }; } diff --git a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.deployments.$deploymentParam/route.tsx b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.deployments.$deploymentParam/route.tsx index 4a7d1df5ce8..36ca0f335e8 100644 --- a/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.deployments.$deploymentParam/route.tsx +++ b/apps/webapp/app/routes/_app.orgs.$organizationSlug.projects.$projectParam.env.$envParam.deployments.$deploymentParam/route.tsx @@ -198,17 +198,14 @@ export default function Page() { const readSession = await stream.readSession( { - seq_num: 0, - wait: 60, - as: "bytes", + start: { from: { seqNum: 0 }, clamp: true }, + stop: { waitSecs: 60 }, }, { signal: abortController.signal } ); - const decoder = new TextDecoder(); - for await (const record of readSession) { - const decoded 
= decoder.decode(record.body); + const decoded = record.body; const result = DeploymentEventFromString.safeParse(decoded); if (!result.success) { @@ -217,8 +214,8 @@ export default function Page() { const headers: Record = {}; if (record.headers) { - for (const [nameBytes, valueBytes] of record.headers) { - headers[decoder.decode(nameBytes)] = decoder.decode(valueBytes); + for (const [name, value] of record.headers) { + headers[name] = value; } } const level = (headers["level"]?.toLowerCase() as LogEntry["level"]) ?? "info"; diff --git a/apps/webapp/app/routes/api.v1.runs.$runFriendlyId.input-streams.wait.ts b/apps/webapp/app/routes/api.v1.runs.$runFriendlyId.input-streams.wait.ts new file mode 100644 index 00000000000..8e41e9fe4c8 --- /dev/null +++ b/apps/webapp/app/routes/api.v1.runs.$runFriendlyId.input-streams.wait.ts @@ -0,0 +1,148 @@ +import { json } from "@remix-run/server-runtime"; +import { z } from "zod"; +import { + CreateInputStreamWaitpointRequestBody, + type CreateInputStreamWaitpointResponseBody, +} from "@trigger.dev/core/v3"; +import { WaitpointId } from "@trigger.dev/core/v3/isomorphic"; +import { $replica } from "~/db.server"; +import { createWaitpointTag, MAX_TAGS_PER_WAITPOINT } from "~/models/waitpointTag.server"; +import { + deleteInputStreamWaitpoint, + setInputStreamWaitpoint, +} from "~/services/inputStreamWaitpointCache.server"; +import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server"; +import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server"; +import { parseDelay } from "~/utils/delays"; +import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server"; +import { engine } from "~/v3/runEngine.server"; +import { ServiceValidationError } from "~/v3/services/baseService.server"; + +const ParamsSchema = z.object({ + runFriendlyId: z.string(), +}); + +const { action, loader } = createActionApiRoute( + { + params: ParamsSchema, + body: CreateInputStreamWaitpointRequestBody, + 
maxContentLength: 1024 * 10, // 10KB + method: "POST", + }, + async ({ authentication, body, params }) => { + try { + const run = await $replica.taskRun.findFirst({ + where: { + friendlyId: params.runFriendlyId, + runtimeEnvironmentId: authentication.environment.id, + }, + select: { + id: true, + friendlyId: true, + realtimeStreamsVersion: true, + }, + }); + + if (!run) { + return json({ error: "Run not found" }, { status: 404 }); + } + + const idempotencyKeyExpiresAt = body.idempotencyKeyTTL + ? resolveIdempotencyKeyTTL(body.idempotencyKeyTTL) + : undefined; + + const timeout = await parseDelay(body.timeout); + + // Process tags (same pattern as api.v1.waitpoints.tokens.ts) + const bodyTags = typeof body.tags === "string" ? [body.tags] : body.tags; + + if (bodyTags && bodyTags.length > MAX_TAGS_PER_WAITPOINT) { + throw new ServiceValidationError( + `Waitpoints can only have ${MAX_TAGS_PER_WAITPOINT} tags, you're trying to set ${bodyTags.length}.` + ); + } + + if (bodyTags && bodyTags.length > 0) { + for (const tag of bodyTags) { + await createWaitpointTag({ + tag, + environmentId: authentication.environment.id, + projectId: authentication.environment.projectId, + }); + } + } + + // Step 1: Create the waitpoint + const result = await engine.createManualWaitpoint({ + environmentId: authentication.environment.id, + projectId: authentication.environment.projectId, + idempotencyKey: body.idempotencyKey, + idempotencyKeyExpiresAt, + timeout, + tags: bodyTags, + }); + + // Step 2: Cache the mapping in Redis for fast lookup from .send() + const ttlMs = timeout ? timeout.getTime() - Date.now() : undefined; + await setInputStreamWaitpoint( + run.friendlyId, + body.streamId, + result.waitpoint.id, + ttlMs && ttlMs > 0 ? ttlMs : undefined + ); + + // Step 3: Check if data was already sent to this input stream (race condition handling). + // If .send() landed before .wait(), the data is in the S2 stream but no waitpoint + // existed to complete. 
We check from the client's last known position. + if (!result.isCached) { + try { + const realtimeStream = getRealtimeStreamInstance( + authentication.environment, + run.realtimeStreamsVersion + ); + + const records = await realtimeStream.readRecords( + run.friendlyId, + `$trigger.input:${body.streamId}`, + body.lastSeqNum + ); + + if (records.length > 0) { + const record = records[0]!; + + // Record data is the raw user payload — no wrapper to unwrap + await engine.completeWaitpoint({ + id: result.waitpoint.id, + output: { + value: record.data, + type: "application/json", + isError: false, + }, + }); + + // Clean up the Redis cache since we completed it ourselves + await deleteInputStreamWaitpoint(run.friendlyId, body.streamId); + } + } catch { + // Non-fatal: if the S2 check fails, the waitpoint is still PENDING. + // The next .send() will complete it via the Redis cache path. + } + } + + return json({ + waitpointId: WaitpointId.toFriendlyId(result.waitpoint.id), + isCached: result.isCached, + }); + } catch (error) { + if (error instanceof ServiceValidationError) { + return json({ error: error.message }, { status: 422 }); + } else if (error instanceof Error) { + return json({ error: error.message }, { status: 500 }); + } + + return json({ error: "Something went wrong" }, { status: 500 }); + } + } +); + +export { action, loader }; diff --git a/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts b/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts index 0c188c17768..d55c3659eea 100644 --- a/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts +++ b/apps/webapp/app/routes/api.v1.tasks.$taskId.trigger.ts @@ -166,7 +166,7 @@ async function responseHeaders( const claims = { sub: environment.id, pub: true, - scopes: [`read:runs:${run.friendlyId}`], + scopes: [`read:runs:${run.friendlyId}`, `write:inputStreams:${run.friendlyId}`], realtime, }; diff --git a/apps/webapp/app/routes/realtime.v1.streams.$runId.input.$streamId.ts 
b/apps/webapp/app/routes/realtime.v1.streams.$runId.input.$streamId.ts new file mode 100644 index 00000000000..98c348a023a --- /dev/null +++ b/apps/webapp/app/routes/realtime.v1.streams.$runId.input.$streamId.ts @@ -0,0 +1,174 @@ +import { json } from "@remix-run/server-runtime"; +import { z } from "zod"; +import { $replica } from "~/db.server"; +import { + getInputStreamWaitpoint, + deleteInputStreamWaitpoint, +} from "~/services/inputStreamWaitpointCache.server"; +import { + createActionApiRoute, + createLoaderApiRoute, +} from "~/services/routeBuilders/apiBuilder.server"; +import { getRealtimeStreamInstance } from "~/services/realtime/v1StreamsGlobal.server"; +import { engine } from "~/v3/runEngine.server"; + +const ParamsSchema = z.object({ + runId: z.string(), + streamId: z.string(), +}); + +const BodySchema = z.object({ + data: z.unknown(), +}); + +// POST: Send data to an input stream +const { action } = createActionApiRoute( + { + params: ParamsSchema, + maxContentLength: 1024 * 1024, // 1MB max + allowJWT: true, + corsStrategy: "all", + authorization: { + action: "write", + resource: (params) => ({ inputStreams: params.runId }), + superScopes: ["write:inputStreams", "write:all", "admin"], + }, + }, + async ({ request, params, authentication }) => { + const run = await $replica.taskRun.findFirst({ + where: { + friendlyId: params.runId, + runtimeEnvironmentId: authentication.environment.id, + }, + select: { + id: true, + friendlyId: true, + completedAt: true, + realtimeStreamsVersion: true, + }, + }); + + if (!run) { + return json({ ok: false, error: "Run not found" }, { status: 404 }); + } + + if (run.completedAt) { + return json( + { ok: false, error: "Cannot send to input stream on a completed run" }, + { status: 400 } + ); + } + + const body = BodySchema.safeParse(await request.json()); + + if (!body.success) { + return json({ ok: false, error: "Invalid request body" }, { status: 400 }); + } + + const realtimeStream = getRealtimeStreamInstance( + 
authentication.environment, + run.realtimeStreamsVersion + ); + + // Build the input stream record (raw user data, no wrapper) + const recordId = `inp_${crypto.randomUUID().replace(/-/g, "").slice(0, 12)}`; + const record = JSON.stringify(body.data.data); + + // Append the record to the per-stream S2 stream (auto-creates on first write) + await realtimeStream.appendPart( + record, + recordId, + run.friendlyId, + `$trigger.input:${params.streamId}` + ); + + // Check Redis cache for a linked .wait() waitpoint (fast, no DB hit if none) + // Get first, complete, then delete — so the mapping survives if completeWaitpoint throws + const waitpointId = await getInputStreamWaitpoint(params.runId, params.streamId); + if (waitpointId) { + await engine.completeWaitpoint({ + id: waitpointId, + output: { + value: JSON.stringify(body.data.data), + type: "application/json", + isError: false, + }, + }); + await deleteInputStreamWaitpoint(params.runId, params.streamId); + } + + return json({ ok: true }); + } +); + +// GET: SSE stream for reading input stream data (used by the in-task SSE tail) +const loader = createLoaderApiRoute( + { + params: ParamsSchema, + allowJWT: true, + corsStrategy: "all", + findResource: async (params, auth) => { + return $replica.taskRun.findFirst({ + where: { + friendlyId: params.runId, + runtimeEnvironmentId: auth.environment.id, + }, + include: { + batch: { + select: { + friendlyId: true, + }, + }, + }, + }); + }, + authorization: { + action: "read", + resource: (run) => ({ + runs: run.friendlyId, + tags: run.runTags, + batch: run.batch?.friendlyId, + tasks: run.taskIdentifier, + }), + superScopes: ["read:runs", "read:all", "admin"], + }, + }, + async ({ params, request, resource: run, authentication }) => { + const lastEventId = request.headers.get("Last-Event-ID") || undefined; + + const timeoutInSecondsRaw = request.headers.get("Timeout-Seconds") ?? undefined; + const timeoutInSeconds = + timeoutInSecondsRaw !== undefined ? 
parseInt(timeoutInSecondsRaw, 10) : undefined; + + if (timeoutInSeconds !== undefined && isNaN(timeoutInSeconds)) { + return new Response("Invalid timeout seconds", { status: 400 }); + } + + if (timeoutInSeconds !== undefined && timeoutInSeconds < 1) { + return new Response("Timeout seconds must be greater than 0", { status: 400 }); + } + + if (timeoutInSeconds !== undefined && timeoutInSeconds > 600) { + return new Response("Timeout seconds must be less than 600", { status: 400 }); + } + + const realtimeStream = getRealtimeStreamInstance( + authentication.environment, + run.realtimeStreamsVersion + ); + + // Read from the internal S2 stream name (prefixed to avoid user stream collisions) + return realtimeStream.streamResponse( + request, + run.friendlyId, + `$trigger.input:${params.streamId}`, + request.signal, + { + lastEventId, + timeoutInSeconds, + } + ); + } +); + +export { action, loader }; diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx index a78c95d6036..e46eaa5148f 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.spans.$spanParam/route.tsx @@ -1348,6 +1348,7 @@ function SpanEntity({ span }: { span: Span }) { runId={span.entity.object.runId} streamKey={span.entity.object.streamKey} metadata={span.entity.object.metadata} + displayName={span.entity.object.displayName} /> ); } diff --git a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx 
b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx index a1d94159700..b5763bb4e9c 100644 --- a/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx +++ b/apps/webapp/app/routes/resources.orgs.$organizationSlug.projects.$projectParam.env.$envParam.runs.$runParam.streams.$streamKey/route.tsx @@ -98,10 +98,12 @@ export function RealtimeStreamViewer({ runId, streamKey, metadata, + displayName, }: { runId: string; streamKey: string; metadata: Record | undefined; + displayName?: string; }) { const organization = useOrganization(); const project = useProject(); @@ -244,8 +246,8 @@ export function RealtimeStreamViewer({ variant="small/bright" className="mb-0 flex min-w-0 items-center gap-1 truncate whitespace-nowrap" > - Stream: - {streamKey} + {displayName ? "Input stream:" : "Stream:"} + {displayName ?? streamKey}
@@ -487,6 +489,9 @@ function useRealtimeStream(resourcePath: string, startIndex?: number) { const [isConnected, setIsConnected] = useState(false); useEffect(() => { + setChunks([]); + setError(null); + const abortController = new AbortController(); let reader: ReadableStreamDefaultReader> | null = null; diff --git a/apps/webapp/app/services/authorization.server.ts b/apps/webapp/app/services/authorization.server.ts index 15f85cc3278..2ea410e2c1d 100644 --- a/apps/webapp/app/services/authorization.server.ts +++ b/apps/webapp/app/services/authorization.server.ts @@ -1,6 +1,6 @@ export type AuthorizationAction = "read" | "write" | string; // Add more actions as needed -const ResourceTypes = ["tasks", "tags", "runs", "batch", "waitpoints", "deployments"] as const; +const ResourceTypes = ["tasks", "tags", "runs", "batch", "waitpoints", "deployments", "inputStreams"] as const; export type AuthorizationResources = { [key in (typeof ResourceTypes)[number]]?: string | string[]; diff --git a/apps/webapp/app/services/inputStreamWaitpointCache.server.ts b/apps/webapp/app/services/inputStreamWaitpointCache.server.ts new file mode 100644 index 00000000000..ab360c837a8 --- /dev/null +++ b/apps/webapp/app/services/inputStreamWaitpointCache.server.ts @@ -0,0 +1,100 @@ +import { Redis } from "ioredis"; +import { env } from "~/env.server"; +import { singleton } from "~/utils/singleton"; +import { logger } from "./logger.server"; + +const KEY_PREFIX = "isw:"; +const DEFAULT_TTL_MS = 7 * 24 * 60 * 60 * 1000; // 7 days + +function buildKey(runFriendlyId: string, streamId: string): string { + return `${KEY_PREFIX}${runFriendlyId}:${streamId}`; +} + +function initializeRedis(): Redis | undefined { + const host = env.CACHE_REDIS_HOST; + if (!host) { + return undefined; + } + + return new Redis({ + connectionName: "inputStreamWaitpointCache", + host, + port: env.CACHE_REDIS_PORT, + username: env.CACHE_REDIS_USERNAME, + password: env.CACHE_REDIS_PASSWORD, + keyPrefix: "tr:", + 
enableAutoPipelining: true, + ...(env.CACHE_REDIS_TLS_DISABLED === "true" ? {} : { tls: {} }), + }); +} + +const redis = singleton("inputStreamWaitpointCache", initializeRedis); + +/** + * Store a mapping from input stream to waitpoint ID in Redis. + * Called when `.wait()` creates a new waitpoint. + */ +export async function setInputStreamWaitpoint( + runFriendlyId: string, + streamId: string, + waitpointId: string, + ttlMs?: number +): Promise { + if (!redis) return; + + try { + const key = buildKey(runFriendlyId, streamId); + await redis.set(key, waitpointId, "PX", ttlMs ?? DEFAULT_TTL_MS); + } catch (error) { + logger.error("Failed to set input stream waitpoint cache", { + runFriendlyId, + streamId, + error, + }); + } +} + +/** + * Get the waitpoint ID for an input stream without deleting it. + * Called from the `.send()` route before completing the waitpoint. + */ +export async function getInputStreamWaitpoint( + runFriendlyId: string, + streamId: string +): Promise { + if (!redis) return null; + + try { + const key = buildKey(runFriendlyId, streamId); + return await redis.get(key); + } catch (error) { + logger.error("Failed to get input stream waitpoint cache", { + runFriendlyId, + streamId, + error, + }); + return null; + } +} + +/** + * Delete the cache entry for an input stream waitpoint. + * Called when a waitpoint is completed or timed out. 
+ */ +export async function deleteInputStreamWaitpoint( + runFriendlyId: string, + streamId: string +): Promise { + if (!redis) return; + + try { + const key = buildKey(runFriendlyId, streamId); + await redis.del(key); + } catch (error) { + logger.error("Failed to delete input stream waitpoint cache", { + runFriendlyId, + streamId, + error, + }); + } +} diff --git a/apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts b/apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts index a0a6de09e98..e742f770a99 100644 --- a/apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts +++ b/apps/webapp/app/services/realtime/redisRealtimeStreams.server.ts @@ -466,4 +466,8 @@ export class RedisRealtimeStreams implements StreamIngestor, StreamResponder { }); } } + + async readRecords(): Promise { + throw new Error("readRecords is not implemented for Redis realtime streams"); + } } diff --git a/apps/webapp/app/services/realtime/s2realtimeStreams.server.ts b/apps/webapp/app/services/realtime/s2realtimeStreams.server.ts index 1d03116ff57..4a7acb60606 100644 --- a/apps/webapp/app/services/realtime/s2realtimeStreams.server.ts +++ b/apps/webapp/app/services/realtime/s2realtimeStreams.server.ts @@ -1,6 +1,6 @@ // app/realtime/S2RealtimeStreams.ts import type { UnkeyCache } from "@internal/cache"; -import { StreamIngestor, StreamResponder, StreamResponseOptions } from "./types"; +import { StreamIngestor, StreamRecord, StreamResponder, StreamResponseOptions } from "./types"; import { Logger, LogLevel } from "@trigger.dev/core/logger"; import { randomUUID } from "node:crypto"; @@ -10,6 +10,12 @@ export type S2RealtimeStreamsOptions = { accessToken: string; // "Bearer" token issued in S2 console streamPrefix?: string; // defaults to "" + // Custom endpoint for s2-lite (self-hosted) + endpoint?: string; // e.g., "http://localhost:4566/v1" + + // Skip access token issuance (s2-lite doesn't support /access-tokens) + skipAccessTokens?: boolean; + // Read behavior 
s2WaitSeconds?: number; @@ -37,8 +43,11 @@ type S2AppendAck = { export class S2RealtimeStreams implements StreamResponder, StreamIngestor { private readonly basin: string; private readonly baseUrl: string; + private readonly accountUrl: string; + private readonly endpoint?: string; private readonly token: string; private readonly streamPrefix: string; + private readonly skipAccessTokens: boolean; private readonly s2WaitSeconds: number; @@ -56,9 +65,12 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor { constructor(opts: S2RealtimeStreamsOptions) { this.basin = opts.basin; - this.baseUrl = `https://${this.basin}.b.aws.s2.dev/v1`; + this.baseUrl = opts.endpoint ?? `https://${this.basin}.b.aws.s2.dev/v1`; + this.accountUrl = opts.endpoint ?? `https://aws.s2.dev/v1`; + this.endpoint = opts.endpoint; this.token = opts.accessToken; this.streamPrefix = opts.streamPrefix ?? ""; + this.skipAccessTokens = opts.skipAccessTokens ?? false; this.s2WaitSeconds = opts.s2WaitSeconds ?? 60; @@ -80,17 +92,20 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor { runId: string, streamId: string ): Promise<{ responseHeaders?: Record }> { - const id = randomUUID(); - - const accessToken = await this.getS2AccessToken(id); + const accessToken = this.skipAccessTokens + ? this.token + : await this.getS2AccessToken(randomUUID()); return { responseHeaders: { "X-S2-Access-Token": accessToken, - "X-S2-Stream-Name": `/runs/${runId}/${streamId}`, + "X-S2-Stream-Name": this.skipAccessTokens + ? this.toStreamName(runId, streamId) + : `/runs/${runId}/${streamId}`, "X-S2-Basin": this.basin, "X-S2-Flush-Interval-Ms": this.flushIntervalMs.toString(), "X-S2-Max-Retries": this.maxRetries.toString(), + ...(this.endpoint ? 
{ "X-S2-Endpoint": this.endpoint } : {}), }, }; } @@ -121,6 +136,88 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor { throw new Error("S2 streams are written to S2 via the client, not from the server"); } + async readRecords( + runId: string, + streamId: string, + afterSeqNum?: number + ): Promise { + const s2Stream = this.toStreamName(runId, streamId); + const startSeq = afterSeqNum != null ? afterSeqNum + 1 : 0; + + const qs = new URLSearchParams(); + qs.set("seq_num", String(startSeq)); + qs.set("clamp", "true"); + qs.set("wait", "0"); // Non-blocking: return immediately with existing records + + const res = await fetch( + `${this.baseUrl}/streams/${encodeURIComponent(s2Stream)}/records?${qs}`, + { + method: "GET", + headers: { + Authorization: `Bearer ${this.token}`, + Accept: "text/event-stream", + "S2-Format": "raw", + "S2-Basin": this.basin, + }, + } + ); + + if (!res.ok) { + // Stream may not exist yet (no data sent) + if (res.status === 404) { + return []; + } + const text = await res.text().catch(() => ""); + throw new Error(`S2 readRecords failed: ${res.status} ${res.statusText} ${text}`); + } + + // Parse the SSE response body to extract records + const body = await res.text(); + return this.parseSSEBatchRecords(body); + } + + private parseSSEBatchRecords(sseText: string): StreamRecord[] { + const records: StreamRecord[] = []; + + // SSE events are separated by double newlines + const events = sseText.split("\n\n").filter((e) => e.trim()); + + for (const event of events) { + const lines = event.split("\n"); + let eventType: string | undefined; + let data: string | undefined; + + for (const line of lines) { + if (line.startsWith("event:")) { + eventType = line.slice(6).trim(); + } else if (line.startsWith("data:")) { + data = line.slice(5).trim(); + } + } + + if (eventType === "batch" && data) { + try { + const parsed = JSON.parse(data) as { + records: Array<{ body: string; seq_num: number; timestamp: number }>; + }; + + for 
(const record of parsed.records) { + const parsedBody = JSON.parse(record.body) as { data: string; id: string }; + records.push({ + data: parsedBody.data, + id: parsedBody.id, + seqNum: record.seq_num, + }); + } + } catch { + // Skip malformed events + } + } + } + + return records; + } + // ---------- Serve SSE from S2 ---------- async streamResponse( @@ -155,7 +252,8 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor { headers: { Authorization: `Bearer ${this.token}`, "Content-Type": "application/json", - "S2-Format": "raw", // UTF-8 JSON encoding (no base64 overhead) when your data is text. :contentReference[oaicite:8]{index=8} + "S2-Format": "raw", + "S2-Basin": this.basin, }, body: JSON.stringify(body), }); @@ -184,7 +282,7 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor { private async s2IssueAccessToken(id: string): Promise { // POST /v1/access-tokens - const res = await fetch(`https://aws.s2.dev/v1/access-tokens`, { + const res = await fetch(`${this.accountUrl}/access-tokens`, { method: "POST", headers: { Authorization: `Bearer ${this.token}`, @@ -235,6 +333,7 @@ export class S2RealtimeStreams implements StreamResponder, StreamIngestor { Authorization: `Bearer ${this.token}`, Accept: "text/event-stream", "S2-Format": "raw", + "S2-Basin": this.basin, }, signal: opts.signal, }); diff --git a/apps/webapp/app/services/realtime/types.ts b/apps/webapp/app/services/realtime/types.ts index 912711019ab..64433a716f4 100644 --- a/apps/webapp/app/services/realtime/types.ts +++ b/apps/webapp/app/services/realtime/types.ts @@ -1,3 +1,9 @@ +export type StreamRecord = { + data: string; + id: string; + seqNum: number; +}; + // Interface for stream ingestion export interface StreamIngestor { initializeStream( @@ -16,6 +22,12 @@ export interface StreamIngestor { appendPart(part: string, partId: string, runId: string, streamId: string): Promise; getLastChunkIndex(runId: string, streamId: string, clientId: string): Promise; 
+ + readRecords( + runId: string, + streamId: string, + afterSeqNum?: number + ): Promise; } export type StreamResponseOptions = { diff --git a/apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts b/apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts index 7cc21101bf2..b1bf15b9fed 100644 --- a/apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts +++ b/apps/webapp/app/services/realtime/v1StreamsGlobal.server.ts @@ -36,10 +36,16 @@ export function getRealtimeStreamInstance( if (streamVersion === "v1") { return v1RealtimeStreams; } else { - if (env.REALTIME_STREAMS_S2_BASIN && env.REALTIME_STREAMS_S2_ACCESS_TOKEN) { + if ( + env.REALTIME_STREAMS_S2_BASIN && + (env.REALTIME_STREAMS_S2_ACCESS_TOKEN || + env.REALTIME_STREAMS_S2_SKIP_ACCESS_TOKENS === "true") + ) { return new S2RealtimeStreams({ basin: env.REALTIME_STREAMS_S2_BASIN, - accessToken: env.REALTIME_STREAMS_S2_ACCESS_TOKEN, + accessToken: env.REALTIME_STREAMS_S2_ACCESS_TOKEN ?? "", + endpoint: env.REALTIME_STREAMS_S2_ENDPOINT, + skipAccessTokens: env.REALTIME_STREAMS_S2_SKIP_ACCESS_TOKENS === "true", streamPrefix: [ "org", environment.organization.id, @@ -68,7 +74,7 @@ export function determineRealtimeStreamsVersion(streamVersion?: string): "v1" | if ( streamVersion === "v2" && env.REALTIME_STREAMS_S2_BASIN && - env.REALTIME_STREAMS_S2_ACCESS_TOKEN + (env.REALTIME_STREAMS_S2_ACCESS_TOKEN || env.REALTIME_STREAMS_S2_SKIP_ACCESS_TOKENS === "true") ) { return "v2"; } diff --git a/apps/webapp/app/v3/services/deployment.server.ts b/apps/webapp/app/v3/services/deployment.server.ts index 985e0e47543..71292f4093b 100644 --- a/apps/webapp/app/v3/services/deployment.server.ts +++ b/apps/webapp/app/v3/services/deployment.server.ts @@ -13,7 +13,7 @@ import { env } from "~/env.server"; import { createRemoteImageBuild } from "../remoteImageBuilder.server"; import { FINAL_DEPLOYMENT_STATUSES } from "./failDeployment.server"; import { enqueueBuild, generateRegistryCredentials } from 
"~/services/platform.v3.server"; -import { AppendRecord, S2 } from "@s2-dev/streamstore"; +import { AppendInput, AppendRecord, S2 } from "@s2-dev/streamstore"; import { createRedisClient } from "~/redis.server"; const S2_TOKEN_KEY_PREFIX = "s2-token:read:deployment-event-stream:project:"; @@ -368,7 +368,11 @@ export class DeploymentService extends BaseService { ); return fromPromise( - stream.append(events.map((event) => AppendRecord.make(JSON.stringify(event)))), + stream.append( + AppendInput.create( + events.map((event) => AppendRecord.string({ body: JSON.stringify(event) })) + ) + ), (error) => ({ type: "failed_to_append_to_event_log" as const, cause: error, @@ -396,9 +400,9 @@ export class DeploymentService extends BaseService { type: "failed_to_create_event_stream" as const, cause: error, }) - ).map(({ name }) => ({ + ).map(() => ({ basin: basin.name, - stream: name, + stream: `projects/${project.externalRef}/deployments/${deployment.shortCode}`, })); } @@ -426,7 +430,7 @@ export class DeploymentService extends BaseService { fromPromise( s2.accessTokens.issue({ id: `${project.externalRef}-${new Date().getTime()}`, - expires_at: new Date(Date.now() + 60 * 60 * 1000).toISOString(), // 1 hour + expiresAt: new Date(Date.now() + 60 * 60 * 1000), // 1 hour scope: { ops: ["read"], basins: { @@ -441,7 +445,7 @@ export class DeploymentService extends BaseService { type: "other" as const, cause: error, }) - ).map(({ access_token }) => access_token); + ).map(({ accessToken }) => accessToken); const cacheToken = (token: string) => fromPromise( diff --git a/apps/webapp/package.json b/apps/webapp/package.json index 42a1741d2eb..80c7552d766 100644 --- a/apps/webapp/package.json +++ b/apps/webapp/package.json @@ -107,7 +107,7 @@ "@remix-run/serve": "2.1.0", "@remix-run/server-runtime": "2.1.0", "@remix-run/v1-meta": "^0.1.3", - "@s2-dev/streamstore": "^0.17.2", + "@s2-dev/streamstore": "^0.22.5", "@sentry/remix": "9.46.0", "@slack/web-api": "7.9.1", 
"@socket.io/redis-adapter": "^8.3.0", diff --git a/docker/config/s2-spec.json b/docker/config/s2-spec.json new file mode 100644 index 00000000000..6c3e610bf05 --- /dev/null +++ b/docker/config/s2-spec.json @@ -0,0 +1,12 @@ +{ + "$schema": "https://raw.githubusercontent.com/s2-streamstore/s2/main/cli/schema.json", + "basins": [ + { + "name": "trigger-local", + "config": { + "create_stream_on_append": true, + "create_stream_on_read": true + } + } + ] +} diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index c80648d710c..d9c02973948 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -143,6 +143,22 @@ services: networks: - app_network + s2: + image: ghcr.io/s2-streamstore/s2 + command: ["lite", "--init-file", "/s2-spec.json"] + volumes: + - ./config/s2-spec.json:/s2-spec.json:ro + ports: + - "4566:80" + networks: + - app_network + healthcheck: + test: ["CMD-SHELL", "wget -qO- http://localhost:80/v1/basins?limit=1 || exit 1"] + interval: 2s + timeout: 3s + retries: 5 + start_period: 3s + toxiproxy: container_name: toxiproxy image: ghcr.io/shopify/toxiproxy:latest diff --git a/internal-packages/database/prisma/schema.prisma b/internal-packages/database/prisma/schema.prisma index e28b951f05d..89325996c80 100644 --- a/internal-packages/database/prisma/schema.prisma +++ b/internal-packages/database/prisma/schema.prisma @@ -778,6 +778,7 @@ model TaskRun { /// Store the stream keys that are being used by the run realtimeStreams String[] @default([]) + @@unique([oneTimeUseToken]) @@unique([runtimeEnvironmentId, taskIdentifier, idempotencyKey]) // Finding child runs diff --git a/packages/cli-v3/package.json b/packages/cli-v3/package.json index 2ed11d197f0..423d1b8031c 100644 --- a/packages/cli-v3/package.json +++ b/packages/cli-v3/package.json @@ -92,7 +92,7 @@ "@opentelemetry/resources": "2.0.1", "@opentelemetry/sdk-trace-node": "2.0.1", "@opentelemetry/semantic-conventions": "1.36.0", - "@s2-dev/streamstore": "^0.17.6", + 
"@s2-dev/streamstore": "^0.22.5", "@trigger.dev/build": "workspace:4.4.1", "@trigger.dev/core": "workspace:4.4.1", "@trigger.dev/schema-to-json": "workspace:4.4.1", diff --git a/packages/cli-v3/src/commands/deploy.ts b/packages/cli-v3/src/commands/deploy.ts index 7c71b45667b..1ac161d3e4a 100644 --- a/packages/cli-v3/src/commands/deploy.ts +++ b/packages/cli-v3/src/commands/deploy.ts @@ -1155,9 +1155,8 @@ async function handleNativeBuildServerDeploy({ const [readSessionError, readSession] = await tryCatch( stream.readSession( { - seq_num: 0, - wait: 60 * 20, // 20 minutes - as: "bytes", + start: { from: { seqNum: 0 }, clamp: true }, + stop: { waitSecs: 60 * 20 }, // 20 minutes }, { signal: abortController.signal } ) @@ -1176,12 +1175,11 @@ async function handleNativeBuildServerDeploy({ return process.exit(0); } - const decoder = new TextDecoder(); let finalDeploymentEvent: DeploymentFinalizedEvent["data"] | undefined; let queuedSpinnerStopped = false; for await (const record of readSession) { - const decoded = decoder.decode(record.body); + const decoded = record.body; const result = DeploymentEventFromString.safeParse(decoded); if (!result.success) { logger.debug("Failed to parse deployment event, skipping", { @@ -1195,7 +1193,7 @@ async function handleNativeBuildServerDeploy({ switch (event.type) { case "log": { - if (record.seq_num === 0) { + if (record.seqNum === 0) { $queuedSpinner.stop("Build started"); console.log("│"); queuedSpinnerStopped = true; diff --git a/packages/cli-v3/src/entryPoints/dev-run-worker.ts b/packages/cli-v3/src/entryPoints/dev-run-worker.ts index 0da61bfba67..ae9a2667f51 100644 --- a/packages/cli-v3/src/entryPoints/dev-run-worker.ts +++ b/packages/cli-v3/src/entryPoints/dev-run-worker.ts @@ -33,6 +33,7 @@ import { traceContext, heartbeats, realtimeStreams, + inputStreams, } from "@trigger.dev/core/v3"; import { TriggerTracer } from "@trigger.dev/core/v3/tracer"; import { @@ -59,6 +60,7 @@ import { StandardTraceContextManager, 
StandardHeartbeatsManager, StandardRealtimeStreamsManager, + StandardInputStreamManager, } from "@trigger.dev/core/v3/workers"; import { ZodIpcConnection } from "@trigger.dev/core/v3/zodIpc"; import { readFile } from "node:fs/promises"; @@ -160,6 +162,14 @@ const standardRealtimeStreamsManager = new StandardRealtimeStreamsManager( ); realtimeStreams.setGlobalManager(standardRealtimeStreamsManager); +const standardInputStreamManager = new StandardInputStreamManager( + apiClientManager.clientOrThrow(), + getEnvVar("TRIGGER_STREAM_URL", getEnvVar("TRIGGER_API_URL")) ?? "https://api.trigger.dev", + (getEnvVar("TRIGGER_STREAMS_DEBUG") === "1" || getEnvVar("TRIGGER_STREAMS_DEBUG") === "true") ?? + false +); +inputStreams.setGlobalManager(standardInputStreamManager); + const waitUntilTimeoutInMs = getNumberEnvVar("TRIGGER_WAIT_UNTIL_TIMEOUT_MS", 60_000); const waitUntilManager = new StandardWaitUntilManager(waitUntilTimeoutInMs); waitUntil.setGlobalManager(waitUntilManager); @@ -333,6 +343,7 @@ function resetExecutionEnvironment() { usageTimeoutManager.reset(); runMetadataManager.reset(); standardRealtimeStreamsManager.reset(); + standardInputStreamManager.reset(); waitUntilManager.reset(); _sharedWorkerRuntime?.reset(); durableClock.reset(); @@ -380,6 +391,7 @@ const zodIpc = new ZodIpcConnection({ } resetExecutionEnvironment(); + standardInputStreamManager.setRunId(execution.run.id, execution.run.realtimeStreamsVersion); standardTraceContextManager.traceContext = traceContext; standardRunTimelineMetricsManager.registerMetricsFromExecution(metrics, isWarmStart); diff --git a/packages/cli-v3/src/entryPoints/managed-run-worker.ts b/packages/cli-v3/src/entryPoints/managed-run-worker.ts index 922accfa4a5..f8234aa68c4 100644 --- a/packages/cli-v3/src/entryPoints/managed-run-worker.ts +++ b/packages/cli-v3/src/entryPoints/managed-run-worker.ts @@ -32,6 +32,7 @@ import { traceContext, heartbeats, realtimeStreams, + inputStreams, } from "@trigger.dev/core/v3"; import { 
TriggerTracer } from "@trigger.dev/core/v3/tracer"; import { @@ -59,6 +60,7 @@ import { StandardTraceContextManager, StandardHeartbeatsManager, StandardRealtimeStreamsManager, + StandardInputStreamManager, } from "@trigger.dev/core/v3/workers"; import { ZodIpcConnection } from "@trigger.dev/core/v3/zodIpc"; import { readFile } from "node:fs/promises"; @@ -140,6 +142,14 @@ const standardRealtimeStreamsManager = new StandardRealtimeStreamsManager( ); realtimeStreams.setGlobalManager(standardRealtimeStreamsManager); +const standardInputStreamManager = new StandardInputStreamManager( + apiClientManager.clientOrThrow(), + getEnvVar("TRIGGER_STREAM_URL", getEnvVar("TRIGGER_API_URL")) ?? "https://api.trigger.dev", + (getEnvVar("TRIGGER_STREAMS_DEBUG") === "1" || getEnvVar("TRIGGER_STREAMS_DEBUG") === "true") ?? + false +); +inputStreams.setGlobalManager(standardInputStreamManager); + const waitUntilTimeoutInMs = getNumberEnvVar("TRIGGER_WAIT_UNTIL_TIMEOUT_MS", 60_000); const waitUntilManager = new StandardWaitUntilManager(waitUntilTimeoutInMs); waitUntil.setGlobalManager(waitUntilManager); @@ -313,6 +323,7 @@ function resetExecutionEnvironment() { runMetadataManager.reset(); waitUntilManager.reset(); standardRealtimeStreamsManager.reset(); + standardInputStreamManager.reset(); _sharedWorkerRuntime?.reset(); durableClock.reset(); taskContext.disable(); @@ -364,6 +375,7 @@ const zodIpc = new ZodIpcConnection({ } resetExecutionEnvironment(); + standardInputStreamManager.setRunId(execution.run.id, execution.run.realtimeStreamsVersion); standardTraceContextManager.traceContext = traceContext; diff --git a/packages/core/package.json b/packages/core/package.json index 369b3266c7c..52d3b877483 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -177,8 +177,8 @@ "@opentelemetry/core": "2.0.1", "@opentelemetry/exporter-logs-otlp-http": "0.203.0", "@opentelemetry/exporter-metrics-otlp-http": "0.203.0", - "@opentelemetry/host-metrics": "^0.37.0", 
"@opentelemetry/exporter-trace-otlp-http": "0.203.0", + "@opentelemetry/host-metrics": "^0.37.0", "@opentelemetry/instrumentation": "0.203.0", "@opentelemetry/resources": "2.0.1", "@opentelemetry/sdk-logs": "0.203.0", @@ -186,7 +186,7 @@ "@opentelemetry/sdk-trace-base": "2.0.1", "@opentelemetry/sdk-trace-node": "2.0.1", "@opentelemetry/semantic-conventions": "1.36.0", - "@s2-dev/streamstore": "0.17.3", + "@s2-dev/streamstore": "0.22.5", "dequal": "^2.0.3", "eventsource": "^3.0.5", "eventsource-parser": "^3.0.0", diff --git a/packages/core/src/v3/apiClient/index.ts b/packages/core/src/v3/apiClient/index.ts index 428493b71e2..7de6e275fc4 100644 --- a/packages/core/src/v3/apiClient/index.ts +++ b/packages/core/src/v3/apiClient/index.ts @@ -17,6 +17,8 @@ import { CreateBatchRequestBody, CreateBatchResponse, CreateEnvironmentVariableRequestBody, + CreateInputStreamWaitpointRequestBody, + CreateInputStreamWaitpointResponseBody, CreateScheduleOptions, CreateStreamResponseBody, CreateUploadPayloadUrlResponseBody, @@ -41,6 +43,7 @@ import { RetrieveRunResponse, RetrieveRunTraceResponseBody, ScheduleObject, + SendInputStreamResponseBody, StreamBatchItemsResponse, TaskRunExecutionResult, TriggerTaskRequestBody, @@ -1317,6 +1320,8 @@ export class ApiClient { onComplete?: () => void; onError?: (error: Error) => void; lastEventId?: string; + /** Called for each SSE event with the full event metadata (id, timestamp). */ + onPart?: (part: SSEStreamPart) => void; } ): Promise> { const streamFactory = new SSEStreamSubscriptionFactory(options?.baseUrl ?? 
this.baseUrl, { @@ -1333,10 +1338,14 @@ export class ApiClient { const stream = await subscription.subscribe(); + const onPart = options?.onPart; + return stream.pipeThrough( new TransformStream({ transform(chunk, controller) { - controller.enqueue(chunk.chunk as T); + const data = chunk.chunk as T; + onPart?.(chunk as SSEStreamPart); + controller.enqueue(data); }, }) ); @@ -1385,6 +1394,41 @@ export class ApiClient { ); } + async sendInputStream( + runId: string, + streamId: string, + data: unknown, + requestOptions?: ZodFetchOptions + ) { + return zodfetch( + SendInputStreamResponseBody, + `${this.baseUrl}/realtime/v1/streams/${runId}/input/${streamId}`, + { + method: "POST", + headers: this.#getHeaders(false), + body: JSON.stringify({ data }), + }, + mergeRequestOptions(this.defaultRequestOptions, requestOptions) + ); + } + + async createInputStreamWaitpoint( + runFriendlyId: string, + body: CreateInputStreamWaitpointRequestBody, + requestOptions?: ZodFetchOptions + ) { + return zodfetch( + CreateInputStreamWaitpointResponseBody, + `${this.baseUrl}/api/v1/runs/${runFriendlyId}/input-streams/wait`, + { + method: "POST", + headers: this.#getHeaders(false), + body: JSON.stringify(body), + }, + mergeRequestOptions(this.defaultRequestOptions, requestOptions) + ); + } + async generateJWTClaims(requestOptions?: ZodFetchOptions): Promise> { return zodfetch( z.record(z.any()), diff --git a/packages/core/src/v3/index.ts b/packages/core/src/v3/index.ts index b714d8cb933..2757363f4be 100644 --- a/packages/core/src/v3/index.ts +++ b/packages/core/src/v3/index.ts @@ -20,6 +20,8 @@ export * from "./lifecycle-hooks-api.js"; export * from "./locals-api.js"; export * from "./heartbeats-api.js"; export * from "./realtime-streams-api.js"; +export * from "./input-streams-api.js"; +export * from "./waitpoints/index.js"; export * from "./schemas/index.js"; export { SemanticInternalAttributes } from "./semanticInternalAttributes.js"; export * from "./resource-catalog-api.js"; diff 
--git a/packages/core/src/v3/input-streams-api.ts b/packages/core/src/v3/input-streams-api.ts new file mode 100644 index 00000000000..17875db5053 --- /dev/null +++ b/packages/core/src/v3/input-streams-api.ts @@ -0,0 +1,7 @@ +// Split module-level variable definition into separate files to allow +// tree-shaking on each api instance. +import { InputStreamsAPI } from "./inputStreams/index.js"; + +export const inputStreams = InputStreamsAPI.getInstance(); + +export * from "./inputStreams/types.js"; diff --git a/packages/core/src/v3/inputStreams/index.ts b/packages/core/src/v3/inputStreams/index.ts new file mode 100644 index 00000000000..4a871d6bfcc --- /dev/null +++ b/packages/core/src/v3/inputStreams/index.ts @@ -0,0 +1,69 @@ +import { getGlobal, registerGlobal } from "../utils/globals.js"; +import { NoopInputStreamManager } from "./noopManager.js"; +import { InputStreamManager, InputStreamOncePromise } from "./types.js"; +import { InputStreamOnceOptions } from "../realtimeStreams/types.js"; + +const API_NAME = "input-streams"; + +const NOOP_MANAGER = new NoopInputStreamManager(); + +export class InputStreamsAPI implements InputStreamManager { + private static _instance?: InputStreamsAPI; + + private constructor() {} + + public static getInstance(): InputStreamsAPI { + if (!this._instance) { + this._instance = new InputStreamsAPI(); + } + + return this._instance; + } + + setGlobalManager(manager: InputStreamManager): boolean { + return registerGlobal(API_NAME, manager); + } + + #getManager(): InputStreamManager { + return getGlobal(API_NAME) ?? 
NOOP_MANAGER; + } + + public setRunId(runId: string, streamsVersion?: string): void { + this.#getManager().setRunId(runId, streamsVersion); + } + + public on( + streamId: string, + handler: (data: unknown) => void | Promise + ): { off: () => void } { + return this.#getManager().on(streamId, handler); + } + + public once(streamId: string, options?: InputStreamOnceOptions): InputStreamOncePromise { + return this.#getManager().once(streamId, options); + } + + public peek(streamId: string): unknown | undefined { + return this.#getManager().peek(streamId); + } + + public lastSeqNum(streamId: string): number | undefined { + return this.#getManager().lastSeqNum(streamId); + } + + public clearHandlers(): void { + this.#getManager().clearHandlers(); + } + + public reset(): void { + this.#getManager().reset(); + } + + public disconnect(): void { + this.#getManager().disconnect(); + } + + public connectTail(runId: string, fromSeq?: number): void { + this.#getManager().connectTail(runId, fromSeq); + } +} diff --git a/packages/core/src/v3/inputStreams/manager.ts b/packages/core/src/v3/inputStreams/manager.ts new file mode 100644 index 00000000000..f393f4a169a --- /dev/null +++ b/packages/core/src/v3/inputStreams/manager.ts @@ -0,0 +1,353 @@ +import { ApiClient } from "../apiClient/index.js"; +import { + InputStreamManager, + InputStreamOncePromise, + InputStreamOnceResult, + InputStreamTimeoutError, +} from "./types.js"; +import { InputStreamOnceOptions } from "../realtimeStreams/types.js"; + +type InputStreamHandler = (data: unknown) => void | Promise; + +type OnceWaiter = { + resolve: (result: InputStreamOnceResult) => void; + reject: (error: Error) => void; + timeoutHandle?: ReturnType; +}; + + +type TailState = { + abortController: AbortController; + promise: Promise; +}; + +export class StandardInputStreamManager implements InputStreamManager { + private handlers = new Map>(); + private onceWaiters = new Map(); + private buffer = new Map(); + private tails = new Map(); + 
private seqNums = new Map(); + private currentRunId: string | null = null; + private streamsVersion: string | undefined; + + constructor( + private apiClient: ApiClient, + private baseUrl: string, + private debug: boolean = false + ) {} + + lastSeqNum(streamId: string): number | undefined { + return this.seqNums.get(streamId); + } + + setRunId(runId: string, streamsVersion?: string): void { + this.currentRunId = runId; + this.streamsVersion = streamsVersion; + } + + on(streamId: string, handler: InputStreamHandler): { off: () => void } { + this.#requireV2Streams(); + + let handlerSet = this.handlers.get(streamId); + if (!handlerSet) { + handlerSet = new Set(); + this.handlers.set(streamId, handlerSet); + } + handlerSet.add(handler); + + // Lazily connect a tail for this stream + this.#ensureStreamTailConnected(streamId); + + // Flush any buffered data for this stream + const buffered = this.buffer.get(streamId); + if (buffered && buffered.length > 0) { + for (const data of buffered) { + this.#invokeHandler(handler, data); + } + this.buffer.delete(streamId); + } + + return { + off: () => { + handlerSet?.delete(handler); + if (handlerSet?.size === 0) { + this.handlers.delete(streamId); + } + }, + }; + } + + once(streamId: string, options?: InputStreamOnceOptions): InputStreamOncePromise { + this.#requireV2Streams(); + + // Lazily connect a tail for this stream + this.#ensureStreamTailConnected(streamId); + + // Check buffer first + const buffered = this.buffer.get(streamId); + if (buffered && buffered.length > 0) { + const data = buffered.shift()!; + if (buffered.length === 0) { + this.buffer.delete(streamId); + } + return new InputStreamOncePromise((resolve) => { + resolve({ ok: true, output: data }); + }); + } + + return new InputStreamOncePromise((resolve, reject) => { + const waiter: OnceWaiter = { resolve, reject }; + + // Handle abort signal + if (options?.signal) { + if (options.signal.aborted) { + reject(new Error("Aborted")); + return; + } + 
options.signal.addEventListener( + "abort", + () => { + if (waiter.timeoutHandle) { + clearTimeout(waiter.timeoutHandle); + } + this.#removeOnceWaiter(streamId, waiter); + reject(new Error("Aborted")); + }, + { once: true } + ); + } + + // Handle timeout — resolve with error result instead of rejecting + if (options?.timeoutMs) { + waiter.timeoutHandle = setTimeout(() => { + this.#removeOnceWaiter(streamId, waiter); + resolve({ + ok: false, + error: new InputStreamTimeoutError(streamId, options.timeoutMs!), + }); + }, options.timeoutMs); + } + + let waiters = this.onceWaiters.get(streamId); + if (!waiters) { + waiters = []; + this.onceWaiters.set(streamId, waiters); + } + waiters.push(waiter); + }); + } + + peek(streamId: string): unknown | undefined { + const buffered = this.buffer.get(streamId); + if (buffered && buffered.length > 0) { + return buffered[0]; + } + return undefined; + } + + clearHandlers(): void { + this.handlers.clear(); + + // Abort tails that no longer have any once waiters either + for (const [streamId, tail] of this.tails) { + const hasWaiters = this.onceWaiters.has(streamId) && this.onceWaiters.get(streamId)!.length > 0; + if (!hasWaiters) { + tail.abortController.abort(); + this.tails.delete(streamId); + } + } + } + + connectTail(runId: string, _fromSeq?: number): void { + // No-op: tails are now created per-stream lazily + } + + disconnect(): void { + for (const [, tail] of this.tails) { + tail.abortController.abort(); + } + this.tails.clear(); + } + + reset(): void { + this.disconnect(); + this.currentRunId = null; + this.streamsVersion = undefined; + this.seqNums.clear(); + this.handlers.clear(); + + // Reject all pending once waiters + for (const [, waiters] of this.onceWaiters) { + for (const waiter of waiters) { + if (waiter.timeoutHandle) { + clearTimeout(waiter.timeoutHandle); + } + waiter.reject(new Error("Input stream manager reset")); + } + } + this.onceWaiters.clear(); + this.buffer.clear(); + } + + #requireV2Streams(): void { + 
if (this.currentRunId && this.streamsVersion !== "v2") { + throw new Error( + "Input streams require v2 realtime streams. Enable them with: { future: { v2RealtimeStreams: true } }" + ); + } + } + + #ensureStreamTailConnected(streamId: string): void { + if (!this.tails.has(streamId) && this.currentRunId) { + const abortController = new AbortController(); + const promise = this.#runTail(this.currentRunId, streamId, abortController.signal) + .catch((error) => { + if (this.debug) { + console.error(`[InputStreamManager] Tail error for "${streamId}":`, error); + } + }) + .finally(() => { + this.tails.delete(streamId); + + // Auto-reconnect if there are still active handlers or waiters + const hasHandlers = + this.handlers.has(streamId) && this.handlers.get(streamId)!.size > 0; + const hasWaiters = + this.onceWaiters.has(streamId) && this.onceWaiters.get(streamId)!.length > 0; + if (hasHandlers || hasWaiters) { + this.#ensureStreamTailConnected(streamId); + } + }); + this.tails.set(streamId, { abortController, promise }); + } + } + + async #runTail(runId: string, streamId: string, signal: AbortSignal): Promise { + try { + const lastSeq = this.seqNums.get(streamId); + const stream = await this.apiClient.fetchStream( + runId, + `input/${streamId}`, + { + signal, + baseUrl: this.baseUrl, + // Max allowed by the SSE endpoint is 600s; the tail will reconnect on close + timeoutInSeconds: 600, + // Resume from last seen sequence number to avoid replaying history on reconnect + lastEventId: lastSeq !== undefined ? 
String(lastSeq) : undefined, + onPart: (part) => { + const seqNum = parseInt(part.id, 10); + if (Number.isFinite(seqNum)) { + this.seqNums.set(streamId, seqNum); + } + }, + onComplete: () => { + if (this.debug) { + console.log(`[InputStreamManager] Tail stream completed for "${streamId}"`); + } + }, + onError: (error) => { + if (this.debug) { + console.error(`[InputStreamManager] Tail stream error for "${streamId}":`, error); + } + }, + } + ); + + for await (const record of stream) { + if (signal.aborted) break; + + // S2 SSE returns record bodies as JSON strings; parse if needed + let data: unknown; + if (typeof record === "string") { + try { + data = JSON.parse(record); + } catch { + data = record; + } + } else { + data = record; + } + + this.#dispatch(streamId, data); + } + } catch (error) { + // AbortError is expected when disconnecting + if (error instanceof Error && error.name === "AbortError") { + return; + } + throw error; + } + } + + #dispatch(streamId: string, data: unknown): void { + // First try to resolve a once waiter + const waiters = this.onceWaiters.get(streamId); + if (waiters && waiters.length > 0) { + const waiter = waiters.shift()!; + if (waiters.length === 0) { + this.onceWaiters.delete(streamId); + } + if (waiter.timeoutHandle) { + clearTimeout(waiter.timeoutHandle); + } + waiter.resolve({ ok: true, output: data }); + // Also invoke persistent handlers + this.#invokeHandlers(streamId, data); + return; + } + + // Invoke persistent handlers + const handlers = this.handlers.get(streamId); + if (handlers && handlers.size > 0) { + this.#invokeHandlers(streamId, data); + return; + } + + // No handlers, buffer the data + let buffered = this.buffer.get(streamId); + if (!buffered) { + buffered = []; + this.buffer.set(streamId, buffered); + } + buffered.push(data); + } + + #invokeHandlers(streamId: string, data: unknown): void { + const handlers = this.handlers.get(streamId); + if (!handlers) return; + for (const handler of handlers) { + 
this.#invokeHandler(handler, data); + } + } + + #invokeHandler(handler: InputStreamHandler, data: unknown): void { + try { + const result = handler(data); + // If the handler returns a promise, catch errors silently + if (result && typeof result === "object" && "catch" in result) { + (result as Promise).catch((error) => { + if (this.debug) { + console.error("[InputStreamManager] Handler error:", error); + } + }); + } + } catch (error) { + if (this.debug) { + console.error("[InputStreamManager] Handler error:", error); + } + } + } + + #removeOnceWaiter(streamId: string, waiter: OnceWaiter): void { + const waiters = this.onceWaiters.get(streamId); + if (!waiters) return; + const index = waiters.indexOf(waiter); + if (index !== -1) { + waiters.splice(index, 1); + } + if (waiters.length === 0) { + this.onceWaiters.delete(streamId); + } + } +} diff --git a/packages/core/src/v3/inputStreams/noopManager.ts b/packages/core/src/v3/inputStreams/noopManager.ts new file mode 100644 index 00000000000..6d72d9e2f76 --- /dev/null +++ b/packages/core/src/v3/inputStreams/noopManager.ts @@ -0,0 +1,29 @@ +import { InputStreamManager, InputStreamOncePromise } from "./types.js"; +import { InputStreamOnceOptions } from "../realtimeStreams/types.js"; + +export class NoopInputStreamManager implements InputStreamManager { + setRunId(_runId: string, _streamsVersion?: string): void {} + + on(_streamId: string, _handler: (data: unknown) => void | Promise): { off: () => void } { + return { off: () => {} }; + } + + once(_streamId: string, _options?: InputStreamOnceOptions): InputStreamOncePromise { + return new InputStreamOncePromise(() => { + // Never resolves in noop mode + }); + } + + peek(_streamId: string): unknown | undefined { + return undefined; + } + + lastSeqNum(_streamId: string): number | undefined { + return undefined; + } + + clearHandlers(): void {} + reset(): void {} + disconnect(): void {} + connectTail(_runId: string, _fromSeq?: number): void {} +} diff --git 
a/packages/core/src/v3/inputStreams/types.ts b/packages/core/src/v3/inputStreams/types.ts new file mode 100644 index 00000000000..0816c06493f --- /dev/null +++ b/packages/core/src/v3/inputStreams/types.ts @@ -0,0 +1,93 @@ +import { InputStreamOnceOptions } from "../realtimeStreams/types.js"; + +export class InputStreamTimeoutError extends Error { + constructor( + public readonly streamId: string, + public readonly timeoutMs: number + ) { + super(`Timeout waiting for input stream "${streamId}" after ${timeoutMs}ms`); + this.name = "InputStreamTimeoutError"; + } +} + +export type InputStreamOnceResult = + | { ok: true; output: TData } + | { ok: false; error: InputStreamTimeoutError }; + +export class InputStreamOncePromise extends Promise> { + constructor( + executor: ( + resolve: ( + value: InputStreamOnceResult | PromiseLike> + ) => void, + reject: (reason?: any) => void + ) => void + ) { + super(executor); + } + + unwrap(): Promise { + return this.then((result) => { + if (result.ok) { + return result.output; + } else { + throw result.error; + } + }); + } +} + +export interface InputStreamManager { + /** + * Set the current run ID and streams version. The tail connection will be + * established lazily when `on()` or `once()` is first called, but only + * for v2 (S2-backed) realtime streams. + */ + setRunId(runId: string, streamsVersion?: string): void; + + /** + * Register a handler that fires every time data arrives on the given input stream. + * Handlers are automatically cleaned up when the task run completes. + * Returns `{ off }` for early unsubscription if needed. + */ + on(streamId: string, handler: (data: unknown) => void | Promise): { off: () => void }; + + /** + * Wait for the next piece of data on the given input stream. + * Returns a result object `{ ok, output }` or `{ ok, error }`. + * Chain `.unwrap()` to get the data directly or throw on timeout. 
+ */ + once(streamId: string, options?: InputStreamOnceOptions): InputStreamOncePromise; + + /** + * Non-blocking peek at the most recent data on the given input stream. + */ + peek(streamId: string): unknown | undefined; + + /** + * The last S2 sequence number seen for the given input stream. + * Used by `.wait()` to tell the server where to check for existing data. + */ + lastSeqNum(streamId: string): number | undefined; + + /** + * Clear all persistent `.on()` handlers and abort tails that have no remaining once waiters. + * Called automatically when a task run completes. + */ + clearHandlers(): void; + + /** + * Reset state between task executions. + */ + reset(): void; + + /** + * Disconnect any active tails / connections. + */ + disconnect(): void; + + /** + * Connect a tail to receive input stream records for the given run. + */ + connectTail(runId: string, fromSeq?: number): void; +} diff --git a/packages/core/src/v3/realtimeStreams/streamInstance.ts b/packages/core/src/v3/realtimeStreams/streamInstance.ts index 5efbcb225a3..6d8106ffe6c 100644 --- a/packages/core/src/v3/realtimeStreams/streamInstance.ts +++ b/packages/core/src/v3/realtimeStreams/streamInstance.ts @@ -52,6 +52,7 @@ export class StreamInstance implements StreamsWriter { basin: parsedResponse.basin, stream: parsedResponse.streamName ?? 
this.options.key, accessToken: parsedResponse.accessToken, + endpoint: parsedResponse.endpoint, source: this.options.source, signal: this.options.signal, debug: this.options.debug, @@ -103,6 +104,7 @@ type ParsedStreamResponse = version: "v2"; accessToken: string; basin: string; + endpoint?: string; flushIntervalMs?: number; maxRetries?: number; streamName?: string; @@ -123,6 +125,7 @@ function parseCreateStreamResponse( return { version: "v1" }; } + const endpoint = headers?.["x-s2-endpoint"]; const flushIntervalMs = headers?.["x-s2-flush-interval-ms"]; const maxRetries = headers?.["x-s2-max-retries"]; const streamName = headers?.["x-s2-stream-name"]; @@ -131,6 +134,7 @@ function parseCreateStreamResponse( version: "v2", accessToken, basin, + endpoint, flushIntervalMs: flushIntervalMs ? parseInt(flushIntervalMs) : undefined, maxRetries: maxRetries ? parseInt(maxRetries) : undefined, streamName, diff --git a/packages/core/src/v3/realtimeStreams/streamsWriterV2.ts b/packages/core/src/v3/realtimeStreams/streamsWriterV2.ts index 568ff5574e6..91713630dbe 100644 --- a/packages/core/src/v3/realtimeStreams/streamsWriterV2.ts +++ b/packages/core/src/v3/realtimeStreams/streamsWriterV2.ts @@ -6,12 +6,13 @@ export type StreamsWriterV2Options = { basin: string; stream: string; accessToken: string; + endpoint?: string; // Custom S2 endpoint (for s2-lite) source: ReadableStream; signal?: AbortSignal; flushIntervalMs?: number; // Used as lingerDuration for BatchTransform (default 200ms) maxRetries?: number; // Not used with appendSession, kept for compatibility debug?: boolean; // Enable debug logging (default false) - maxQueuedBytes?: number; // Max queued bytes for appendSession (default 10MB) + maxInflightBytes?: number; // Max queued bytes for appendSession (default 10MB) }; /** @@ -50,18 +51,28 @@ export class StreamsWriterV2 implements StreamsWriter { private streamPromise: Promise; private readonly flushIntervalMs: number; private readonly debug: boolean; - private 
readonly maxQueuedBytes: number; + private readonly maxInflightBytes: number; private aborted = false; private sessionWritable: WritableStream | null = null; constructor(private options: StreamsWriterV2Options) { this.debug = options.debug ?? false; - this.s2Client = new S2({ accessToken: options.accessToken }); + this.s2Client = new S2({ + accessToken: options.accessToken, + ...(options.endpoint + ? { + endpoints: { + account: options.endpoint, + basin: options.endpoint, + }, + } + : {}), + }); this.flushIntervalMs = options.flushIntervalMs ?? 200; - this.maxQueuedBytes = options.maxQueuedBytes ?? 1024 * 1024 * 10; // 10MB default + this.maxInflightBytes = options.maxInflightBytes ?? 1024 * 1024 * 10; // 10MB default this.log( - `[S2MetadataStream] Initializing: basin=${options.basin}, stream=${options.stream}, flushIntervalMs=${this.flushIntervalMs}, maxQueuedBytes=${this.maxQueuedBytes}` + `[S2MetadataStream] Initializing: basin=${options.basin}, stream=${options.stream}, flushIntervalMs=${this.flushIntervalMs}, maxInflightBytes=${this.maxInflightBytes}` ); // Check if already aborted @@ -124,7 +135,7 @@ export class StreamsWriterV2 implements StreamsWriter { const stream = basin.stream(this.options.stream); const session = await stream.appendSession({ - maxQueuedBytes: this.maxQueuedBytes, + maxInflightBytes: this.maxInflightBytes, }); this.sessionWritable = session.writable; @@ -141,7 +152,7 @@ export class StreamsWriterV2 implements StreamsWriter { return; } // Convert each chunk to JSON string and wrap in AppendRecord - controller.enqueue(AppendRecord.make(JSON.stringify({ data: chunk, id: nanoid(7) }))); + controller.enqueue(AppendRecord.string({ body: JSON.stringify({ data: chunk, id: nanoid(7) }) })); }, }) ) @@ -158,9 +169,9 @@ export class StreamsWriterV2 implements StreamsWriter { const lastAcked = session.lastAckedPosition(); if (lastAcked?.end) { - const recordsWritten = lastAcked.end.seq_num; + const recordsWritten = lastAcked.end.seqNum; this.log( 
- `[S2MetadataStream] Written ${recordsWritten} records, ending at seq_num=${lastAcked.end.seq_num}` + `[S2MetadataStream] Written ${recordsWritten} records, ending at seqNum=${lastAcked.end.seqNum}` ); } } catch (error) { diff --git a/packages/core/src/v3/realtimeStreams/types.ts b/packages/core/src/v3/realtimeStreams/types.ts index 25f116d5d96..174970c2830 100644 --- a/packages/core/src/v3/realtimeStreams/types.ts +++ b/packages/core/src/v3/realtimeStreams/types.ts @@ -1,6 +1,10 @@ import { AnyZodFetchOptions, ApiRequestOptions } from "../apiClient/core.js"; +import type { InputStreamOncePromise } from "../inputStreams/types.js"; +export { InputStreamOncePromise, InputStreamTimeoutError } from "../inputStreams/types.js"; +export type { InputStreamOnceResult } from "../inputStreams/types.js"; import { AsyncIterableStream } from "../streams/asyncIterableStream.js"; import { Prettify } from "../types/utils.js"; +import type { ManualWaitpointPromise } from "../waitpoints/index.js"; export type RealtimeStreamOperationOptions = { signal?: AbortSignal; @@ -143,3 +147,106 @@ export type WriterStreamOptions = Prettify< }) => Promise | void; } >; + +// --- Input streams (inbound data to running tasks) --- + +/** + * A defined input stream that can receive typed data from external callers. + * + * Inside a task, use `.on()`, `.once()`, or `.peek()` to receive data. + * Outside a task, use `.send()` to send data to a running task. + */ +export type RealtimeDefinedInputStream = { + id: string; + /** + * Register a handler that fires every time data arrives on this input stream. + * Handlers are automatically cleaned up when the task run completes, so calling + * `.off()` is optional. Returns a subscription with `.off()` for early unsubscription. + */ + on: (handler: (data: TData) => void | Promise) => InputStreamSubscription; + /** + * Wait for the next piece of data on this input stream. + * Returns a result object `{ ok, output }` or `{ ok, error }`. 
+ * Chain `.unwrap()` to get the data directly or throw on timeout. + */ + once: (options?: InputStreamOnceOptions) => InputStreamOncePromise; + /** + * Non-blocking peek at the most recent data received on this input stream. + * Returns `undefined` if no data has been received yet. + */ + peek: () => TData | undefined; + /** + * Suspend the task until data arrives on this input stream. + * + * Unlike `.once()` which keeps the task process alive while waiting, + * `.wait()` suspends the task entirely — freeing compute resources. + * The task resumes when data is sent via `.send()`. + * + * Uses a waitpoint token internally. Can only be called inside a task.run(). + */ + wait: (options?: InputStreamWaitOptions) => ManualWaitpointPromise; + /** + * Send data to this input stream on a specific run. + * This is used from outside the task (e.g., from your backend or another task). + */ + send: (runId: string, data: TData, options?: SendInputStreamOptions) => Promise; +}; + +export type InputStreamSubscription = { + off: () => void; +}; + +export type InputStreamOnceOptions = { + signal?: AbortSignal; + timeoutMs?: number; +}; + +export type SendInputStreamOptions = { + requestOptions?: ApiRequestOptions; +}; + +export type InputStreamWaitOptions = { + /** + * Maximum time to wait before the waitpoint times out. + * Uses the same period format as `wait.createToken()`. + * If the timeout is reached, the result will be `{ ok: false, error }`. + * + * @example "30s", "5m", "1h", "24h", "7d" + */ + timeout?: string; + + /** + * Idempotency key for the underlying waitpoint token. + * If the same key is used again (and hasn't expired), the existing + * waitpoint is reused. This means if the task retries, it will + * resume waiting on the same waitpoint rather than creating a new one. + */ + idempotencyKey?: string; + + /** + * TTL for the idempotency key. After this period, the same key + * will create a new waitpoint. 
+ */ + idempotencyKeyTTL?: string; + + /** + * Tags for the underlying waitpoint token, useful for querying + * and filtering waitpoints via `wait.listTokens()`. + */ + tags?: string[]; +}; + +export type InferInputStreamType = T extends RealtimeDefinedInputStream + ? TData + : unknown; + +/** + * Internal record format for multiplexed input stream data on S2. + * All input streams for a run share a single S2 stream, demuxed by `stream` field. + */ +export type InputStreamRecord = { + stream: string; + data: unknown; + ts: number; + id: string; +}; diff --git a/packages/core/src/v3/schemas/api.ts b/packages/core/src/v3/schemas/api.ts index 2a7bcb96502..d42e6158096 100644 --- a/packages/core/src/v3/schemas/api.ts +++ b/packages/core/src/v3/schemas/api.ts @@ -1370,6 +1370,31 @@ export const CreateWaitpointTokenResponseBody = z.object({ }); export type CreateWaitpointTokenResponseBody = z.infer; +export const CreateInputStreamWaitpointRequestBody = z.object({ + streamId: z.string(), + timeout: z.string().optional(), + idempotencyKey: z.string().optional(), + idempotencyKeyTTL: z.string().optional(), + tags: z.union([z.string(), z.array(z.string())]).optional(), + /** + * The last S2 sequence number the client has seen on this input stream. + * Used to check for data that arrived before .wait() was called. + * If undefined, the server checks from the beginning of the stream. 
+ */ + lastSeqNum: z.number().optional(), +}); +export type CreateInputStreamWaitpointRequestBody = z.infer< + typeof CreateInputStreamWaitpointRequestBody +>; + +export const CreateInputStreamWaitpointResponseBody = z.object({ + waitpointId: z.string(), + isCached: z.boolean(), +}); +export type CreateInputStreamWaitpointResponseBody = z.infer< + typeof CreateInputStreamWaitpointResponseBody +>; + export const waitpointTokenStatuses = ["WAITING", "COMPLETED", "TIMED_OUT"] as const; export const WaitpointTokenStatus = z.enum(waitpointTokenStatuses); export type WaitpointTokenStatus = z.infer; @@ -1598,3 +1623,8 @@ export const AppendToStreamResponseBody = z.object({ message: z.string().optional(), }); export type AppendToStreamResponseBody = z.infer; + +export const SendInputStreamResponseBody = z.object({ + ok: z.boolean(), +}); +export type SendInputStreamResponseBody = z.infer; diff --git a/packages/core/src/v3/utils/globals.ts b/packages/core/src/v3/utils/globals.ts index 47cabd60808..08b62d379b2 100644 --- a/packages/core/src/v3/utils/globals.ts +++ b/packages/core/src/v3/utils/globals.ts @@ -2,6 +2,7 @@ import { ApiClientConfiguration } from "../apiClientManager/types.js"; import { Clock } from "../clock/clock.js"; import { HeartbeatsManager } from "../heartbeats/types.js"; import type { IdempotencyKeyCatalog } from "../idempotency-key-catalog/catalog.js"; +import { InputStreamManager } from "../inputStreams/types.js"; import { LifecycleHooksManager } from "../lifecycleHooks/types.js"; import { LocalsManager } from "../locals/types.js"; import { RealtimeStreamsManager } from "../realtimeStreams/types.js"; @@ -74,4 +75,5 @@ type TriggerDotDevGlobalAPI = { ["trace-context"]?: TraceContextManager; ["heartbeats"]?: HeartbeatsManager; ["realtime-streams"]?: RealtimeStreamsManager; + ["input-streams"]?: InputStreamManager; }; diff --git a/packages/core/src/v3/waitpoints/index.ts b/packages/core/src/v3/waitpoints/index.ts new file mode 100644 index 
00000000000..fc70ff57f21 --- /dev/null +++ b/packages/core/src/v3/waitpoints/index.ts @@ -0,0 +1,35 @@ +import type { WaitpointTokenTypedResult } from "../schemas/common.js"; + +export class WaitpointTimeoutError extends Error { + constructor(message: string) { + super(message); + this.name = "WaitpointTimeoutError"; + } +} + +export class ManualWaitpointPromise extends Promise< + WaitpointTokenTypedResult +> { + constructor( + executor: ( + resolve: ( + value: + | WaitpointTokenTypedResult + | PromiseLike> + ) => void, + reject: (reason?: any) => void + ) => void + ) { + super(executor); + } + + unwrap(): Promise { + return this.then((result) => { + if (result.ok) { + return result.output; + } else { + throw new WaitpointTimeoutError(result.error.message); + } + }); + } +} diff --git a/packages/core/src/v3/workers/index.ts b/packages/core/src/v3/workers/index.ts index 4ca301fcdc7..e5f8eecff98 100644 --- a/packages/core/src/v3/workers/index.ts +++ b/packages/core/src/v3/workers/index.ts @@ -32,3 +32,4 @@ export { populateEnv } from "./populateEnv.js"; export { StandardTraceContextManager } from "../traceContext/manager.js"; export { StandardHeartbeatsManager } from "../heartbeats/manager.js"; export { StandardRealtimeStreamsManager } from "../realtimeStreams/manager.js"; +export { StandardInputStreamManager } from "../inputStreams/manager.js"; diff --git a/packages/core/src/v3/workers/taskExecutor.ts b/packages/core/src/v3/workers/taskExecutor.ts index c232864884b..2b9ffecf151 100644 --- a/packages/core/src/v3/workers/taskExecutor.ts +++ b/packages/core/src/v3/workers/taskExecutor.ts @@ -13,6 +13,7 @@ import { accessoryAttributes, attemptKey, flattenAttributes, + inputStreams, lifecycleHooks, OTEL_SPAN_ATTRIBUTE_COUNT_LIMIT, runMetadata, @@ -1046,6 +1047,7 @@ export class TaskExecutor { signal: AbortSignal ) { await this.#callCleanupFunctions(payload, ctx, initOutput, signal); + inputStreams.clearHandlers(); await this.#blockForWaitUntil(); } diff --git 
a/packages/react-hooks/src/hooks/useInputStreamSend.ts b/packages/react-hooks/src/hooks/useInputStreamSend.ts new file mode 100644 index 00000000000..d3f816f30e8 --- /dev/null +++ b/packages/react-hooks/src/hooks/useInputStreamSend.ts @@ -0,0 +1,60 @@ +"use client"; + +import useSWRMutation from "swr/mutation"; +import { useApiClient, UseApiClientOptions } from "./useApiClient.js"; + +export interface InputStreamSendInstance { + /** Send data to the input stream */ + send: (data: TData) => void; + /** Whether a send is currently in progress */ + isLoading: boolean; + /** Any error that occurred during the last send */ + error?: Error; + /** Whether the hook is ready to send (has runId and access token) */ + isReady: boolean; +} + +/** + * Hook to send data to an input stream on a running task. + * + * @template TData - The type of data to send + * @param streamId - The input stream identifier + * @param runId - The run to send input stream data to + * @param options - API client options (e.g. accessToken) + * + * @example + * ```tsx + * const { send, isLoading } = useInputStreamSend("my-stream", runId, { accessToken }); + * send({ message: "hello" }); + * ``` + */ +export function useInputStreamSend( + streamId: string, + runId?: string, + options?: UseApiClientOptions +): InputStreamSendInstance { + const apiClient = useApiClient(options); + + async function sendToStream(key: string, { arg }: { arg: { data: TData } }) { + if (!apiClient) { + throw new Error("Could not send to input stream: Missing access token"); + } + + if (!runId) { + throw new Error("Could not send to input stream: Missing run ID"); + } + + return await apiClient.sendInputStream(runId, streamId, arg.data); + } + + const mutation = useSWRMutation(runId ? 
`input-stream:${runId}:${streamId}` : null, sendToStream); + + return { + send: (data) => { + mutation.trigger({ data }); + }, + isLoading: mutation.isMutating, + isReady: !!runId && !!apiClient, + error: mutation.error, + }; +} diff --git a/packages/react-hooks/src/index.ts b/packages/react-hooks/src/index.ts index bc20cf837d4..23c8ca947d5 100644 --- a/packages/react-hooks/src/index.ts +++ b/packages/react-hooks/src/index.ts @@ -4,3 +4,4 @@ export * from "./hooks/useRun.js"; export * from "./hooks/useRealtime.js"; export * from "./hooks/useTaskTrigger.js"; export * from "./hooks/useWaitToken.js"; +export * from "./hooks/useInputStreamSend.js"; diff --git a/packages/trigger-sdk/src/v3/auth.ts b/packages/trigger-sdk/src/v3/auth.ts index ddcf92569af..1f2df463b6f 100644 --- a/packages/trigger-sdk/src/v3/auth.ts +++ b/packages/trigger-sdk/src/v3/auth.ts @@ -62,6 +62,11 @@ type PublicTokenPermissionProperties = { * Grant access to specific waitpoints */ waitpoints?: string | string[]; + + /** + * Grant access to send data to input streams on specific runs + */ + inputStreams?: string | string[]; }; export type PublicTokenPermissions = { diff --git a/packages/trigger-sdk/src/v3/streams.ts b/packages/trigger-sdk/src/v3/streams.ts index 3bdde70bea1..68edc2a64ab 100644 --- a/packages/trigger-sdk/src/v3/streams.ts +++ b/packages/trigger-sdk/src/v3/streams.ts @@ -1,6 +1,7 @@ import { type ApiRequestOptions, realtimeStreams, + inputStreams, taskContext, type RealtimeStreamOperationOptions, mergeRequestOptions, @@ -15,7 +16,19 @@ import { AppendStreamOptions, RealtimeDefinedStream, InferStreamType, + ManualWaitpointPromise, + WaitpointTimeoutError, + runtime, + type RealtimeDefinedInputStream, + type InputStreamSubscription, + type InputStreamOnceOptions, + InputStreamOncePromise, + type InputStreamOnceResult, + type InputStreamWaitOptions, + type SendInputStreamOptions, + type InferInputStreamType, } from "@trigger.dev/core/v3"; +import { conditionallyImportAndParsePacket } 
from "@trigger.dev/core/v3/utils/ioSerialization"; import { tracer } from "./tracer.js"; import { SpanStatusCode } from "@opentelemetry/api"; @@ -652,7 +665,191 @@ function define(opts: RealtimeDefineStreamOptions): RealtimeDefinedStream }; } -export type { InferStreamType }; +export type { InferStreamType, InferInputStreamType }; + +/** + * Define an input stream that can receive typed data from external callers. + * + * Inside a task, use `.on()`, `.once()`, or `.peek()` to receive data. + * Outside a task (e.g., from your backend), use `.send(runId, data)` to send data. + * + * @template TData - The type of data this input stream receives + * @param opts - Options including a unique `id` for this input stream + * + * @example + * ```ts + * import { streams, task } from "@trigger.dev/sdk"; + * + * const approval = streams.input<{ approved: boolean; reviewer: string }>({ id: "approval" }); + * + * export const myTask = task({ + * id: "my-task", + * run: async (payload) => { + * // Wait for the next approval + * const data = await approval.once().unwrap(); + * console.log(data.approved, data.reviewer); + * }, + * }); + * + * // From your backend: + * // await approval.send(runId, { approved: true, reviewer: "alice" }); + * ``` + */ +function input(opts: { id: string }): RealtimeDefinedInputStream { + return { + id: opts.id, + on(handler) { + return inputStreams.on( + opts.id, + handler as (data: unknown) => void | Promise + ); + }, + once(options) { + const ctx = taskContext.ctx; + const runId = ctx?.run.id; + + const innerPromise = inputStreams.once(opts.id, options); + + return new InputStreamOncePromise((resolve, reject) => { + tracer + .startActiveSpan( + `inputStream.once()`, + async () => { + const result = await innerPromise; + resolve(result as InputStreamOnceResult); + }, + { + attributes: { + [SemanticInternalAttributes.STYLE_ICON]: "streams", + [SemanticInternalAttributes.ENTITY_TYPE]: "input-stream", + ...(runId + ? 
{ [SemanticInternalAttributes.ENTITY_ID]: `${runId}:${opts.id}` } + : {}), + streamId: opts.id, + ...accessoryAttributes({ + items: [{ text: opts.id, variant: "normal" }], + style: "codepath", + }), + }, + } + ) + .catch(reject); + }); + }, + peek() { + return inputStreams.peek(opts.id) as TData | undefined; + }, + wait(options) { + return new ManualWaitpointPromise(async (resolve, reject) => { + try { + const ctx = taskContext.ctx; + + if (!ctx) { + throw new Error("inputStream.wait() can only be used from inside a task.run()"); + } + + const apiClient = apiClientManager.clientOrThrow(); + + const result = await tracer.startActiveSpan( + `inputStream.wait()`, + async (span) => { + // 1. Create a waitpoint linked to this input stream + const response = await apiClient.createInputStreamWaitpoint(ctx.run.id, { + streamId: opts.id, + timeout: options?.timeout, + idempotencyKey: options?.idempotencyKey, + idempotencyKeyTTL: options?.idempotencyKeyTTL, + tags: options?.tags, + lastSeqNum: inputStreams.lastSeqNum(opts.id), + }); + + // Set the entity ID now that we have the waitpoint ID + span.setAttribute(SemanticInternalAttributes.ENTITY_ID, response.waitpointId); + + // 2. Block the run on the waitpoint + const waitResponse = await apiClient.waitForWaitpointToken({ + runFriendlyId: ctx.run.id, + waitpointFriendlyId: response.waitpointId, + }); + + if (!waitResponse.success) { + throw new Error("Failed to block on input stream waitpoint"); + } + + // 3. Suspend the task + const waitResult = await runtime.waitUntil(response.waitpointId); + + // 4. Parse the output + const data = + waitResult.output !== undefined + ? await conditionallyImportAndParsePacket( + { + data: waitResult.output, + dataType: waitResult.outputType ?? "application/json", + }, + apiClient + ) + : undefined; + + if (waitResult.ok) { + return { ok: true as const, output: data as TData }; + } else { + const error = new WaitpointTimeoutError(data?.message ?? 
"Timed out"); + + span.recordException(error); + span.setStatus({ code: SpanStatusCode.ERROR }); + + return { ok: false as const, error }; + } + }, + { + attributes: { + [SemanticInternalAttributes.STYLE_ICON]: "wait", + [SemanticInternalAttributes.ENTITY_TYPE]: "waitpoint", + streamId: opts.id, + ...accessoryAttributes({ + items: [ + { + text: opts.id, + variant: "normal", + }, + ], + style: "codepath", + }), + }, + } + ); + + resolve(result); + } catch (error) { + reject(error); + } + }); + }, + async send(runId, data, options) { + return tracer.startActiveSpan( + `inputStream.send()`, + async () => { + const apiClient = apiClientManager.clientOrThrow(); + await apiClient.sendInputStream(runId, opts.id, data, options?.requestOptions); + }, + { + attributes: { + [SemanticInternalAttributes.STYLE_ICON]: "streams", + [SemanticInternalAttributes.ENTITY_TYPE]: "input-stream", + [SemanticInternalAttributes.ENTITY_ID]: `${runId}:${opts.id}`, + streamId: opts.id, + runId, + ...accessoryAttributes({ + items: [{ text: opts.id, variant: "normal" }], + style: "codepath", + }), + }, + } + ); + }, + }; +} export const streams = { pipe, @@ -660,6 +857,7 @@ export const streams = { append, writer, define, + input, }; function getRunIdForOptions(options?: RealtimeStreamOperationOptions): string | undefined { diff --git a/packages/trigger-sdk/src/v3/wait.ts b/packages/trigger-sdk/src/v3/wait.ts index 7ac85b03429..aab0797f4f3 100644 --- a/packages/trigger-sdk/src/v3/wait.ts +++ b/packages/trigger-sdk/src/v3/wait.ts @@ -11,6 +11,7 @@ import { CursorPagePromise, flattenAttributes, ListWaitpointTokensQueryParams, + ManualWaitpointPromise, mergeRequestOptions, runtime, SemanticInternalAttributes, @@ -19,6 +20,7 @@ import { WaitpointRetrieveTokenResponse, WaitpointTokenStatus, WaitpointTokenTypedResult, + WaitpointTimeoutError, } from "@trigger.dev/core/v3"; import { conditionallyImportAndParsePacket } from "@trigger.dev/core/v3/utils/ioSerialization"; import { tracer } from 
"./tracer.js"; @@ -378,12 +380,7 @@ type WaitPeriod = years: number; }; -export class WaitpointTimeoutError extends Error { - constructor(message: string) { - super(message); - this.name = "WaitpointTimeoutError"; - } -} +export { WaitpointTimeoutError, ManualWaitpointPromise } from "@trigger.dev/core/v3"; const DURATION_WAIT_CHARGE_THRESHOLD_MS = 5000; @@ -393,29 +390,6 @@ function printWaitBelowThreshold() { ); } -class ManualWaitpointPromise extends Promise> { - constructor( - executor: ( - resolve: ( - value: WaitpointTokenTypedResult | PromiseLike> - ) => void, - reject: (reason?: any) => void - ) => void - ) { - super(executor); - } - - unwrap(): Promise { - return this.then((result) => { - if (result.ok) { - return result.output; - } else { - throw new WaitpointTimeoutError(result.error.message); - } - }); - } -} - export const wait = { for: async (options: WaitForOptions) => { const ctx = taskContext.ctx; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 48e6dd6ec01..dbf25048b6e 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -456,8 +456,8 @@ importers: specifier: ^0.1.3 version: 0.1.3(@remix-run/react@2.1.0(react-dom@18.2.0(react@18.2.0))(react@18.2.0)(typescript@5.5.4))(@remix-run/server-runtime@2.1.0(typescript@5.5.4)) '@s2-dev/streamstore': - specifier: ^0.17.2 - version: 0.17.3(typescript@5.5.4) + specifier: ^0.22.5 + version: 0.22.5(supports-color@10.0.0) '@sentry/remix': specifier: 9.46.0 version: 9.46.0(patch_hash=146126b032581925294aaed63ab53ce3f5e0356a755f1763d7a9a76b9846943b)(@remix-run/node@2.1.0(typescript@5.5.4))(@remix-run/react@2.1.0(react-dom@18.2.0(react@18.2.0))(react@18.2.0)(typescript@5.5.4))(@remix-run/server-runtime@2.1.0(typescript@5.5.4))(encoding@0.1.13)(react@18.2.0) @@ -1452,8 +1452,8 @@ importers: specifier: 1.36.0 version: 1.36.0 '@s2-dev/streamstore': - specifier: ^0.17.6 - version: 0.17.6 + specifier: ^0.22.5 + version: 0.22.5(supports-color@10.0.0) '@trigger.dev/build': specifier: workspace:4.4.1 version: link:../build 
@@ -1729,8 +1729,8 @@ importers: specifier: 1.36.0 version: 1.36.0 '@s2-dev/streamstore': - specifier: 0.17.3 - version: 0.17.3(typescript@5.5.4) + specifier: 0.22.5 + version: 0.22.5(supports-color@10.0.0) dequal: specifier: ^2.0.3 version: 2.0.3 @@ -9412,13 +9412,8 @@ packages: '@rushstack/eslint-patch@1.2.0': resolution: {integrity: sha512-sXo/qW2/pAcmT43VoRKOJbDOfV3cYpq3szSVfIThQXNt+E4DfKj361vaAt3c88U5tPUxzEswam7GW48PJqtKAg==} - '@s2-dev/streamstore@0.17.3': - resolution: {integrity: sha512-UeXL5+MgZQfNkbhCgEDVm7PrV5B3bxh6Zp4C5pUzQQwaoA+iGh2QiiIptRZynWgayzRv4vh0PYfnKpTzJEXegQ==} - peerDependencies: - typescript: 5.5.4 - - '@s2-dev/streamstore@0.17.6': - resolution: {integrity: sha512-ocjZfKaPKmo2yhudM58zVNHv3rBLSbTKkabVoLFn9nAxU6iLrR2CO3QmSo7/waohI3EZHAWxF/Pw8kA8d6QH2g==} + '@s2-dev/streamstore@0.22.5': + resolution: {integrity: sha512-GqdOKIbIoIxT+40fnKzHbrsHB6gBqKdECmFe7D3Ojk4FoN1Hu0LhFzZv6ZmVMjoHHU+55debS1xSWjZwQmbIyQ==} '@sec-ant/readable-stream@0.4.1': resolution: {integrity: sha512-831qok9r2t8AlxLko40y2ebgSDhenenCatLVeW/uBtnHPyhHOvG0C7TvfgecV+wHzIm5KUICgzmVpWS+IMEAeg==} @@ -18967,21 +18962,22 @@ packages: tar@6.1.13: resolution: {integrity: sha512-jdIBIN6LTIe2jqzay/2vtYLlBHa3JF42ot3h1dW8Q0PaAG4v8rm0cvpVePtau5C6OKXGGcgO9q2AMNSWxiLqKw==} engines: {node: '>=10'} - deprecated: Old versions of tar are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. Support for old versions may be purchased (at exhorbitant rates) by contacting i@izs.me + deprecated: Old versions of tar are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. 
Support for old versions may be purchased (at exorbitant rates) by contacting i@izs.me tar@6.2.1: resolution: {integrity: sha512-DZ4yORTwrbTj/7MZYq2w+/ZFdI6OZ/f9SFHR+71gIVUZhOQPHzVCLpvRnPgyaMpfWxxk/4ONva3GQSyNIKRv6A==} engines: {node: '>=10'} - deprecated: Old versions of tar are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. Support for old versions may be purchased (at exhorbitant rates) by contacting i@izs.me + deprecated: Old versions of tar are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. Support for old versions may be purchased (at exorbitant rates) by contacting i@izs.me tar@7.4.3: resolution: {integrity: sha512-5S7Va8hKfV7W5U6g3aYxXmlPoZVAwUMy9AOKyF2fVuZa2UD3qZjg578OrLRt8PcNN1PleVaL/5/yYATNL0ICUw==} engines: {node: '>=18'} - deprecated: Old versions of tar are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. Support for old versions may be purchased (at exhorbitant rates) by contacting i@izs.me + deprecated: Old versions of tar are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. Support for old versions may be purchased (at exorbitant rates) by contacting i@izs.me tar@7.5.6: resolution: {integrity: sha512-xqUeu2JAIJpXyvskvU3uvQW8PAmHrtXp2KDuMJwQqW8Sqq0CaZBAQ+dKS3RBXVhU4wC5NjAdKrmh84241gO9cA==} engines: {node: '>=18'} + deprecated: Old versions of tar are not supported, and contain widely publicized security vulnerabilities, which have been fixed in the current version. Please update. 
Support for old versions may be purchased (at exorbitant rates) by contacting i@izs.me tdigest@0.1.2: resolution: {integrity: sha512-+G0LLgjjo9BZX2MfdvPfH+MKLCrxlXSYec5DaPYP1fe6Iyhf0/fSmJ0bFiZ1F8BT6cGXl2LpltQptzjXKWEkKA==} @@ -22393,7 +22389,7 @@ snapshots: '@babel/traverse': 7.24.7 '@babel/types': 7.24.0 convert-source-map: 1.9.0 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) gensync: 1.0.0-beta.2 json5: 2.2.3 semver: 6.3.1 @@ -22681,7 +22677,7 @@ snapshots: '@babel/helper-split-export-declaration': 7.24.7 '@babel/parser': 7.27.5 '@babel/types': 7.27.3 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) globals: 11.12.0 transitivePeerDependencies: - supports-color @@ -23129,8 +23125,8 @@ snapshots: '@epic-web/test-server@0.1.0(bufferutil@4.0.9)': dependencies: - '@hono/node-server': 1.12.2(hono@4.5.11) - '@hono/node-ws': 1.0.4(@hono/node-server@1.12.2(hono@4.11.8))(bufferutil@4.0.9) + '@hono/node-server': 1.12.2(hono@4.11.8) + '@hono/node-ws': 1.0.4(@hono/node-server@1.12.2(hono@4.5.11))(bufferutil@4.0.9) '@open-draft/deferred-promise': 2.2.0 '@types/ws': 8.5.12 hono: 4.5.11 @@ -23662,7 +23658,7 @@ snapshots: '@eslint/eslintrc@1.4.1': dependencies: ajv: 6.12.6 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) espree: 9.6.0 globals: 13.19.0 ignore: 5.2.4 @@ -23877,17 +23873,17 @@ snapshots: dependencies: react: 18.2.0 - '@hono/node-server@1.12.2(hono@4.5.11)': + '@hono/node-server@1.12.2(hono@4.11.8)': dependencies: - hono: 4.5.11 + hono: 4.11.8 '@hono/node-server@1.19.9(hono@4.11.8)': dependencies: hono: 4.11.8 - '@hono/node-ws@1.0.4(@hono/node-server@1.12.2(hono@4.11.8))(bufferutil@4.0.9)': + '@hono/node-ws@1.0.4(@hono/node-server@1.12.2(hono@4.5.11))(bufferutil@4.0.9)': dependencies: - '@hono/node-server': 1.12.2(hono@4.5.11) + '@hono/node-server': 1.12.2(hono@4.11.8) ws: 8.18.3(bufferutil@4.0.9) transitivePeerDependencies: - bufferutil @@ -23896,7 +23892,7 @@ snapshots: 
'@humanwhocodes/config-array@0.11.8': dependencies: '@humanwhocodes/object-schema': 1.2.1 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) minimatch: 3.1.2 transitivePeerDependencies: - supports-color @@ -23912,7 +23908,7 @@ snapshots: '@antfu/install-pkg': 1.1.0 '@antfu/utils': 9.3.0 '@iconify/types': 2.0.0 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) globals: 15.15.0 kolorist: 1.8.0 local-pkg: 1.1.2 @@ -25645,7 +25641,7 @@ snapshots: '@puppeteer/browsers@2.10.6': dependencies: - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) extract-zip: 2.0.1 progress: 2.0.3 proxy-agent: 6.5.0 @@ -29208,14 +29204,12 @@ snapshots: '@rushstack/eslint-patch@1.2.0': {} - '@s2-dev/streamstore@0.17.3(typescript@5.5.4)': - dependencies: - '@protobuf-ts/runtime': 2.11.1 - typescript: 5.5.4 - - '@s2-dev/streamstore@0.17.6': + '@s2-dev/streamstore@0.22.5(supports-color@10.0.0)': dependencies: '@protobuf-ts/runtime': 2.11.1 + debug: 4.4.3(supports-color@10.0.0) + transitivePeerDependencies: + - supports-color '@sec-ant/readable-stream@0.4.1': {} @@ -31296,7 +31290,7 @@ snapshots: dependencies: '@typescript-eslint/typescript-estree': 5.59.6(typescript@5.5.4) '@typescript-eslint/utils': 5.59.6(eslint@8.31.0)(typescript@5.5.4) - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) eslint: 8.31.0 tsutils: 3.21.0(typescript@5.5.4) optionalDependencies: @@ -31310,7 +31304,7 @@ snapshots: dependencies: '@typescript-eslint/types': 5.59.6 '@typescript-eslint/visitor-keys': 5.59.6 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) globby: 11.1.0 is-glob: 4.0.3 semver: 7.7.3 @@ -31819,7 +31813,7 @@ snapshots: agent-base@6.0.2: dependencies: - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) transitivePeerDependencies: - supports-color @@ -32315,7 +32309,7 @@ snapshots: dependencies: bytes: 3.1.2 content-type: 1.0.5 - debug: 
4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) http-errors: 2.0.0 iconv-lite: 0.6.3 on-finished: 2.4.1 @@ -32329,7 +32323,7 @@ snapshots: dependencies: bytes: 3.1.2 content-type: 1.0.5 - debug: 4.4.3 + debug: 4.4.3(supports-color@10.0.0) http-errors: 2.0.0 iconv-lite: 0.7.2 on-finished: 2.4.1 @@ -33321,15 +33315,15 @@ snapshots: dependencies: ms: 2.1.3 - debug@4.4.1(supports-color@10.0.0): + debug@4.4.1: dependencies: ms: 2.1.3 - optionalDependencies: - supports-color: 10.0.0 - debug@4.4.3: + debug@4.4.3(supports-color@10.0.0): dependencies: ms: 2.1.3 + optionalDependencies: + supports-color: 10.0.0 decamelize-keys@1.1.1: dependencies: @@ -33473,7 +33467,7 @@ snapshots: docker-modem@5.0.6: dependencies: - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) readable-stream: 3.6.0 split-ca: 1.0.1 ssh2: 1.16.0 @@ -34170,7 +34164,7 @@ snapshots: eslint-import-resolver-typescript@3.5.5(@typescript-eslint/parser@5.59.6(eslint@8.31.0)(typescript@5.5.4))(eslint-import-resolver-node@0.3.7)(eslint-plugin-import@2.29.1)(eslint@8.31.0): dependencies: - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) enhanced-resolve: 5.15.0 eslint: 8.31.0 eslint-module-utils: 2.7.4(@typescript-eslint/parser@5.59.6(eslint@8.31.0)(typescript@5.5.4))(eslint-import-resolver-node@0.3.7)(eslint-import-resolver-typescript@3.5.5)(eslint@8.31.0) @@ -34652,7 +34646,7 @@ snapshots: content-type: 1.0.5 cookie: 0.7.1 cookie-signature: 1.2.2 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) depd: 2.0.0 encodeurl: 2.0.0 escape-html: 1.0.3 @@ -34691,7 +34685,7 @@ snapshots: extract-zip@2.0.1: dependencies: - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) get-stream: 5.2.0 yauzl: 2.10.0 optionalDependencies: @@ -34865,7 +34859,7 @@ snapshots: finalhandler@2.1.0(supports-color@10.0.0): dependencies: - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) encodeurl: 2.0.0 
escape-html: 1.0.3 on-finished: 2.4.1 @@ -35125,7 +35119,7 @@ snapshots: dependencies: basic-ftp: 5.0.3 data-uri-to-buffer: 5.0.1 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) fs-extra: 8.1.0 transitivePeerDependencies: - supports-color @@ -35284,7 +35278,7 @@ snapshots: '@types/node': 20.14.14 '@types/semver': 7.5.1 chalk: 4.1.2 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) interpret: 3.1.1 semver: 7.7.3 tslib: 2.8.1 @@ -35568,7 +35562,7 @@ snapshots: http-proxy-agent@7.0.2: dependencies: agent-base: 7.1.4 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) transitivePeerDependencies: - supports-color @@ -35581,14 +35575,14 @@ snapshots: https-proxy-agent@5.0.1: dependencies: agent-base: 6.0.2 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) transitivePeerDependencies: - supports-color https-proxy-agent@7.0.6: dependencies: agent-base: 7.1.4 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) transitivePeerDependencies: - supports-color @@ -35953,7 +35947,7 @@ snapshots: istanbul-lib-source-maps@5.0.6: dependencies: '@jridgewell/trace-mapping': 0.3.25 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) istanbul-lib-coverage: 3.2.2 transitivePeerDependencies: - supports-color @@ -37249,7 +37243,7 @@ snapshots: micromark@3.1.0: dependencies: '@types/debug': 4.1.12 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) decode-named-character-reference: 1.0.2 micromark-core-commonmark: 1.0.6 micromark-factory-space: 1.0.0 @@ -37271,7 +37265,7 @@ snapshots: micromark@4.0.2: dependencies: '@types/debug': 4.1.12 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) decode-named-character-reference: 1.0.2 devlop: 1.1.0 micromark-core-commonmark: 2.0.3 @@ -38159,7 +38153,7 @@ snapshots: dependencies: '@tootallnate/quickjs-emscripten': 0.23.0 agent-base: 7.1.4 - debug: 
4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) get-uri: 6.0.1 http-proxy-agent: 7.0.2 https-proxy-agent: 7.0.6 @@ -38918,7 +38912,7 @@ snapshots: proxy-agent@6.5.0: dependencies: agent-base: 7.1.4 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) http-proxy-agent: 7.0.2 https-proxy-agent: 7.0.6 lru-cache: 7.18.3 @@ -38958,7 +38952,7 @@ snapshots: dependencies: '@puppeteer/browsers': 2.10.6 chromium-bidi: 7.2.0(devtools-protocol@0.0.1464554) - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.1 devtools-protocol: 0.0.1464554 typed-query-selector: 2.12.0 ws: 8.18.3(bufferutil@4.0.9) @@ -39779,7 +39773,7 @@ snapshots: remix-auth-oauth2@1.11.0(@remix-run/server-runtime@2.1.0(typescript@5.5.4))(remix-auth@3.6.0(@remix-run/react@2.1.0(react-dom@18.2.0(react@18.2.0))(react@18.2.0)(typescript@5.5.4))(@remix-run/server-runtime@2.1.0(typescript@5.5.4))): dependencies: '@remix-run/server-runtime': 2.1.0(typescript@5.5.4) - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) remix-auth: 3.6.0(@remix-run/react@2.1.0(react-dom@18.2.0(react@18.2.0))(react@18.2.0)(typescript@5.5.4))(@remix-run/server-runtime@2.1.0(typescript@5.5.4)) transitivePeerDependencies: - supports-color @@ -39843,7 +39837,7 @@ snapshots: require-in-the-middle@7.1.1(supports-color@10.0.0): dependencies: - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) module-details-from-path: 1.0.3 resolve: 1.22.8 transitivePeerDependencies: @@ -39974,7 +39968,7 @@ snapshots: router@2.2.0: dependencies: - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) depd: 2.0.0 is-promise: 4.0.0 parseurl: 1.3.3 @@ -40128,7 +40122,7 @@ snapshots: send@1.1.0(supports-color@10.0.0): dependencies: - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) destroy: 1.2.0 encodeurl: 2.0.0 escape-html: 1.0.3 @@ -40145,7 +40139,7 @@ snapshots: send@1.2.1: dependencies: - debug: 4.4.3 + debug: 
4.4.3(supports-color@10.0.0) encodeurl: 2.0.0 escape-html: 1.0.3 etag: 1.8.1 @@ -40471,7 +40465,7 @@ snapshots: socks-proxy-agent@8.0.5: dependencies: agent-base: 7.1.4 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) socks: 2.8.3 transitivePeerDependencies: - supports-color @@ -40843,7 +40837,7 @@ snapshots: dependencies: component-emitter: 1.3.1 cookiejar: 2.1.4 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) fast-safe-stringify: 2.1.1 form-data: 4.0.4 formidable: 3.5.1 @@ -41963,7 +41957,7 @@ snapshots: vite-node@0.28.5(@types/node@20.14.14)(lightningcss@1.29.2)(terser@5.44.1): dependencies: cac: 6.7.14 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) mlly: 1.7.4 pathe: 1.1.2 picocolors: 1.1.1 @@ -41983,7 +41977,7 @@ snapshots: vite-node@3.1.4(@types/node@20.14.14)(lightningcss@1.29.2)(terser@5.44.1): dependencies: cac: 6.7.14 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.3(supports-color@10.0.0) es-module-lexer: 1.7.0 pathe: 2.0.3 vite: 5.4.21(@types/node@20.14.14)(lightningcss@1.29.2)(terser@5.44.1) @@ -42039,7 +42033,7 @@ snapshots: '@vitest/spy': 3.1.4 '@vitest/utils': 3.1.4 chai: 5.2.0 - debug: 4.4.1(supports-color@10.0.0) + debug: 4.4.1 expect-type: 1.2.1 magic-string: 0.30.21 pathe: 2.0.3 diff --git a/references/hello-world/src/trigger/agentRelay.ts b/references/hello-world/src/trigger/agentRelay.ts new file mode 100644 index 00000000000..cead2408e66 --- /dev/null +++ b/references/hello-world/src/trigger/agentRelay.ts @@ -0,0 +1,252 @@ +import { logger, runs, streams, task, wait } from "@trigger.dev/sdk/v3"; + +/** + * Multi-agent coordination via input streams. + * + * Demonstrates the pattern from the Agent Relay blog post: instead of a human + * copy-pasting context between agents, the agents talk to each other directly. 
+ * + * Three "agents" (tasks) collaborate to write and review a document: + * - Planner: breaks the work into steps and sends them to the worker + * - Worker: executes each step, sends results to the reviewer + * - Reviewer: checks each result, sends feedback back to the worker + * + * The coordinator task wires them together — no human in the loop. + */ + +// -- Input streams for inter-agent communication -- + +/** Coordinator → Agent: tells an agent who its peers are */ +const connect = streams.input<{ workerRunId: string }>({ + id: "connect", +}); + +/** Planner → Worker: steps to execute */ +const planSteps = streams.input<{ step: number; instruction: string }>({ + id: "plan-steps", +}); + +/** Planner → Worker: signal that all steps have been sent */ +const planComplete = streams.input<{ totalSteps: number }>({ + id: "plan-complete", +}); + +/** Worker → Reviewer: completed work for review */ +const workResult = streams.input<{ step: number; output: string }>({ + id: "work-result", +}); + +/** Reviewer → Worker: feedback on completed work */ +const reviewFeedback = streams.input<{ + step: number; + approved: boolean; + comment: string; +}>({ + id: "review-feedback", +}); + +// -- Mock "AI" functions (replace with real LLM calls) -- + +function mockPlan(topic: string) { + return [ + { step: 1, instruction: `Research the topic: ${topic}` }, + { step: 2, instruction: `Write an outline for: ${topic}` }, + { step: 3, instruction: `Draft the introduction for: ${topic}` }, + ]; +} + +function mockWork(instruction: string): string { + return `[Completed] ${instruction} — Lorem ipsum dolor sit amet.`; +} + +function mockReview(output: string): { approved: boolean; comment: string } { + const approved = !output.includes("outline"); + return { + approved, + comment: approved ? "Looks good." : "Needs more detail in the structure.", + }; +} + +// -- Agent tasks -- + +/** + * Planner agent: receives a topic, breaks it into steps, sends them to the worker. 
+ */ +export const plannerAgent = task({ + id: "agent-planner", + run: async (payload: { topic: string; workerRunId: string }) => { + const steps = mockPlan(payload.topic); + logger.info("Planner: created plan", { stepCount: steps.length }); + + for (const step of steps) { + await planSteps.send(payload.workerRunId, step); + logger.info("Planner: sent step", { step: step.step }); + await wait.for({ seconds: 1 }); + } + + await planComplete.send(payload.workerRunId, { totalSteps: steps.length }); + logger.info("Planner: all steps sent"); + + return { steps: steps.length }; + }, +}); + +/** + * Worker agent: receives steps from the planner, executes them, sends results + * to the reviewer, and incorporates feedback. + */ +export const workerAgent = task({ + id: "agent-worker", + run: async (payload: { reviewerRunId: string }) => { + const completedSteps: Array<{ + step: number; + output: string; + feedback: string; + }> = []; + + let totalSteps: number | null = null; + let stepsReceived = 0; + + // Listen for plan completion signal + planComplete.on((data) => { + totalSteps = data.totalSteps; + logger.info("Worker: plan complete signal received", { totalSteps }); + }); + + // Listen for review feedback + reviewFeedback.on((data) => { + logger.info("Worker: received feedback", { step: data.step, approved: data.approved }); + const entry = completedSteps.find((s) => s.step === data.step); + if (entry) { + entry.feedback = data.comment; + } + }); + + // Process steps as they arrive + planSteps.on(async (data) => { + stepsReceived++; + logger.info("Worker: received step", { step: data.step, instruction: data.instruction }); + + const output = mockWork(data.instruction); + completedSteps.push({ step: data.step, output, feedback: "" }); + + // Send result to reviewer + await workResult.send(payload.reviewerRunId, { step: data.step, output }); + logger.info("Worker: sent result to reviewer", { step: data.step }); + }); + + // Wait until all steps are received and processed 
+ while (totalSteps === null || stepsReceived < totalSteps) { + await wait.for({ seconds: 1 }); + } + + // Give reviewer time to send feedback + await wait.for({ seconds: 5 }); + + logger.info("Worker: all done", { completedSteps }); + return { completedSteps }; + }, +}); + +/** + * Reviewer agent: receives work results, reviews them, sends feedback to the + * worker, and reports final results to the coordinator. + * + * The reviewer doesn't know the worker's run ID at spawn time — it receives + * it via the `connect` input stream once the coordinator has spawned both agents. + */ +export const reviewerAgent = task({ + id: "agent-reviewer", + run: async (payload: { expectedSteps: number }) => { + // Wait for the coordinator to tell us who the worker is + const { workerRunId } = await connect.once({ timeoutMs: 30_000 }).unwrap(); + logger.info("Reviewer: connected to worker", { workerRunId }); + + const reviews: Array<{ step: number; approved: boolean; comment: string }> = []; + + // Review each piece of work as it arrives + workResult.on(async (data) => { + logger.info("Reviewer: checking step", { step: data.step }); + + const review = mockReview(data.output); + reviews.push({ step: data.step, ...review }); + + // Send feedback back to worker + await reviewFeedback.send(workerRunId, { + step: data.step, + ...review, + }); + logger.info("Reviewer: sent feedback", { step: data.step, approved: review.approved }); + }); + + // Wait until all steps are reviewed + while (reviews.length < payload.expectedSteps) { + await wait.for({ seconds: 1 }); + } + + const approved = reviews.filter((r) => r.approved).length; + const rejected = reviews.filter((r) => !r.approved).length; + const summary = `Reviewed ${reviews.length} steps. ${approved} approved, ${rejected} need revision.`; + + logger.info("Reviewer: done", { summary }); + return { reviews, summary }; + }, +}); + +/** + * Coordinator: wires the agents together and collects results. 
+ * + * This is the orchestrator — it spawns the agents, connects them via input + * streams, and waits for everything to complete. No human in the loop. + */ +export const agentRelayCoordinator = task({ + id: "agent-relay-coordinator", + run: async (payload: { topic?: string }) => { + const topic = payload.topic ?? "The future of multi-agent systems"; + logger.info("Coordinator: starting multi-agent workflow", { topic }); + + // Spawn worker and reviewer (order doesn't matter — they wait for connections) + const reviewerHandle = await reviewerAgent.trigger({ expectedSteps: 3 }); + const workerHandle = await workerAgent.trigger({ + reviewerRunId: reviewerHandle.id, + }); + + logger.info("Coordinator: agents spawned", { + workerId: workerHandle.id, + reviewerId: reviewerHandle.id, + }); + + // Tell the reviewer who the worker is so it can send feedback + await connect.send(reviewerHandle.id, { workerRunId: workerHandle.id }); + + // Spawn the planner — it sends steps directly to the worker + const plannerHandle = await plannerAgent.trigger({ + topic, + workerRunId: workerHandle.id, + }); + + logger.info("Coordinator: planner spawned, waiting for completion", { + plannerId: plannerHandle.id, + }); + + // Wait for all agents to complete + const [plannerRun, workerRun, reviewerRun] = await Promise.all([ + runs.poll(plannerHandle, { pollIntervalMs: 2000 }), + runs.poll(workerHandle, { pollIntervalMs: 2000 }), + runs.poll(reviewerHandle, { pollIntervalMs: 2000 }), + ]); + + logger.info("Coordinator: all agents complete", { + planner: plannerRun.output, + worker: workerRun.output, + reviewer: reviewerRun.output, + }); + + return { + topic, + planner: plannerRun.output, + worker: workerRun.output, + reviewer: reviewerRun.output, + }; + }, +}); diff --git a/references/hello-world/src/trigger/inputStreams.ts b/references/hello-world/src/trigger/inputStreams.ts new file mode 100644 index 00000000000..567c1738d7a --- /dev/null +++ 
b/references/hello-world/src/trigger/inputStreams.ts @@ -0,0 +1,149 @@ +import { logger, runs, streams, task, wait } from "@trigger.dev/sdk/v3"; + +// Define typed input streams +const approvalStream = streams.input<{ approved: boolean; reviewer: string }>({ + id: "approval", +}); + +const messageStream = streams.input<{ text: string }>({ id: "messages" }); + +/** + * Coordinator task that exercises all input stream patterns end-to-end. + * + * 1a. .once().unwrap() — trigger a child, send data, verify unwrap returns TData + * 1b. .once() result — trigger a child, send data, verify result object { ok, output } + * 2. .on() — trigger a child, send it multiple messages, poll until complete + * 3. .wait() — trigger a child, send it data (completes its waitpoint), poll until complete + * 4. .wait() race — send data before child calls .wait(), verify race handling + */ +export const inputStreamCoordinator = task({ + id: "input-stream-coordinator", + run: async () => { + const results: Record = {}; + + // --- Test 1a: .once() with .unwrap() --- + logger.info("Test 1a: .once().unwrap()"); + const onceUnwrapHandle = await inputStreamOnceUnwrap.trigger({}); + await wait.for({ seconds: 5 }); + await approvalStream.send(onceUnwrapHandle.id, { approved: true, reviewer: "coordinator-unwrap" }); + const onceUnwrapRun = await runs.poll(onceUnwrapHandle, { pollIntervalMs: 1000 }); + results.onceUnwrap = onceUnwrapRun.output; + logger.info("Test 1a passed", { output: onceUnwrapRun.output }); + + // --- Test 1b: .once() with result object --- + logger.info("Test 1b: .once() result object"); + const onceResultHandle = await inputStreamOnceResult.trigger({}); + await wait.for({ seconds: 5 }); + await approvalStream.send(onceResultHandle.id, { approved: true, reviewer: "coordinator-result" }); + const onceResultRun = await runs.poll(onceResultHandle, { pollIntervalMs: 1000 }); + results.onceResult = onceResultRun.output; + logger.info("Test 1b passed", { output: onceResultRun.output }); 
+ + // --- Test 2: .on() with multiple messages --- + logger.info("Test 2: .on()"); + const onHandle = await inputStreamOn.trigger({ messageCount: 3 }); + await wait.for({ seconds: 5 }); + for (let i = 0; i < 3; i++) { + await messageStream.send(onHandle.id, { text: `message-${i}` }); + await wait.for({ seconds: 1 }); + } + const onRun = await runs.poll(onHandle, { pollIntervalMs: 1000 }); + results.on = onRun.output; + logger.info("Test 2 passed", { output: onRun.output }); + + // --- Test 3: .wait() (waitpoint-based) --- + logger.info("Test 3: .wait()"); + const waitHandle = await inputStreamWait.trigger({ timeout: "1m" }); + await wait.for({ seconds: 5 }); + await approvalStream.send(waitHandle.id, { approved: true, reviewer: "coordinator-wait" }); + const waitRun = await runs.poll(waitHandle, { pollIntervalMs: 1000 }); + results.wait = waitRun.output; + logger.info("Test 3 passed", { output: waitRun.output }); + + // --- Test 4: .wait() race condition (send before child calls .wait()) --- + logger.info("Test 4: .wait() race"); + const raceHandle = await inputStreamWait.trigger({ timeout: "1m" }); + await approvalStream.send(raceHandle.id, { approved: false, reviewer: "race-test" }); + const raceRun = await runs.poll(raceHandle, { pollIntervalMs: 1000 }); + results.race = raceRun.output; + logger.info("Test 4 passed", { output: raceRun.output }); + + logger.info("All input stream tests passed", { results }); + return results; + }, +}); + +/** + * Uses .once().unwrap() — returns TData directly, throws InputStreamTimeoutError on timeout. 
+ */ +export const inputStreamOnceUnwrap = task({ + id: "input-stream-once-unwrap", + run: async (_payload: Record<string, unknown>) => { + logger.info("Waiting for approval via .once().unwrap()"); + + const approval = await approvalStream.once({ timeoutMs: 30_000 }).unwrap(); + + logger.info("Received approval", { approval }); + return { approval }; + }, +}); + +/** + * Uses .once() with result object — check result.ok to handle timeout without try/catch. + */ +export const inputStreamOnceResult = task({ + id: "input-stream-once-result", + run: async (_payload: Record<string, unknown>) => { + logger.info("Waiting for approval via .once() result object"); + + const result = await approvalStream.once({ timeoutMs: 30_000 }); + + if (!result.ok) { + logger.error("Timed out waiting for approval", { error: result.error.message }); + return { approval: null, timedOut: true }; + } + + logger.info("Received approval", { approval: result.output }); + return { approval: result.output }; + }, +}); + +/** + * Uses .on() to subscribe and collect multiple messages. + */ +export const inputStreamOn = task({ + id: "input-stream-on", + run: async (payload: { messageCount?: number }) => { + const expected = payload.messageCount ?? 3; + const received: { text: string }[] = []; + + logger.info("Subscribing to messages via .on()", { expected }); + + messageStream.on((data) => { + logger.info("Received message", { data }); + received.push(data); + }); + + while (received.length < expected) { + await wait.for({ seconds: 1 }); + } + + logger.info("Done receiving messages", { count: received.length }); + return { messages: received }; + }, +}); + +/** + * Uses .wait() to suspend the task via a waitpoint until data arrives. + */ +export const inputStreamWait = task({ + id: "input-stream-wait", + run: async (payload: { timeout?: string }) => { + logger.info("Waiting for approval via .wait()"); + const approval = await approvalStream.wait({ + timeout: payload.timeout ??
"5m", + }); + logger.info("Received approval via .wait()", { approval }); + return { approval }; + }, +}); diff --git a/references/hello-world/src/trigger/realtime.ts b/references/hello-world/src/trigger/realtime.ts index c53bb2f16ad..3bd051f6a95 100644 --- a/references/hello-world/src/trigger/realtime.ts +++ b/references/hello-world/src/trigger/realtime.ts @@ -80,13 +80,13 @@ export const realtimeStreamsTask = task({ export const realtimeStreamsV2Task = task({ id: "realtime-streams-v2", run: async () => { - const mockStream1 = createStreamFromGenerator(generateMockData(5 * 60 * 1000)); + const mockStream1 = createStreamFromGenerator(generateMockData(10_000)); await metadata.stream("mock-data", mockStream1); - await setTimeout(10000); // Offset by 10 seconds + await setTimeout(5000); // Offset by 5 seconds - const mockStream2 = createStreamFromGenerator(generateMockData(5 * 60 * 1000)); + const mockStream2 = createStreamFromGenerator(generateMockData(10_000)); const stream2 = await metadata.stream("mock-data", mockStream2); for await (const chunk of stream2) { diff --git a/references/hello-world/src/trigger/streams.ts b/references/hello-world/src/trigger/streams.ts new file mode 100644 index 00000000000..9a10888b532 --- /dev/null +++ b/references/hello-world/src/trigger/streams.ts @@ -0,0 +1,17 @@ +import { streams, InferStreamType } from "@trigger.dev/sdk"; + +export const textStream = streams.define({ + id: "text", +}); + +export const progressStream = streams.define<{ step: string; percent: number }>({ + id: "progress", +}); + +export const logStream = streams.define({ + id: "logs", +}); + +export type TextStreamPart = InferStreamType; +export type ProgressStreamPart = InferStreamType; +export type LogStreamPart = InferStreamType; diff --git a/references/hello-world/src/trigger/streamsV2.ts b/references/hello-world/src/trigger/streamsV2.ts new file mode 100644 index 00000000000..cf51e40e5d3 --- /dev/null +++ b/references/hello-world/src/trigger/streamsV2.ts @@ 
-0,0 +1,229 @@ +import { logger, streams, task } from "@trigger.dev/sdk"; +import { setTimeout } from "timers/promises"; +import { textStream, progressStream, logStream } from "./streams.js"; + +// Test 1: .pipe() then read back from S2 via a coordinator +export const streamsPipeTask = task({ + id: "streams-pipe", + run: async () => { + const source = ReadableStream.from(generateChunks(5)); + const { waitUntilComplete } = textStream.pipe(source); + await waitUntilComplete(); + + return { written: 5 }; + }, +}); + +export const streamsPipeReadTask = task({ + id: "streams-pipe-read", + run: async () => { + const handle = await streamsPipeTask.trigger({}); + + const stream = await textStream.read(handle.id); + const chunks: string[] = []; + for await (const chunk of stream) { + logger.info("read chunk from pipe", { chunk }); + chunks.push(chunk); + } + + return { chunks }; + }, +}); + +// Test 2: .append() then read back from S2 +export const streamsAppendTask = task({ + id: "streams-append", + run: async () => { + await logStream.append("Starting processing"); + await progressStream.append({ step: "init", percent: 0 }); + + await setTimeout(500); + await logStream.append("Step 1 complete"); + await progressStream.append({ step: "step-1", percent: 33 }); + + await setTimeout(500); + await logStream.append("Step 2 complete"); + await progressStream.append({ step: "step-2", percent: 66 }); + + await setTimeout(500); + await logStream.append("All done"); + await progressStream.append({ step: "done", percent: 100 }); + + return { success: true }; + }, +}); + +export const streamsAppendReadTask = task({ + id: "streams-append-read", + run: async () => { + const handle = await streamsAppendTask.trigger({}); + + // Read both log and progress streams from the child + const logStreamReader = await logStream.read(handle.id); + const logs: string[] = []; + for await (const chunk of logStreamReader) { + logger.info("read log", { chunk }); + logs.push(chunk); + } + + const 
progressStreamReader = await progressStream.read(handle.id); + const steps: Array<{ step: string; percent: number }> = []; + for await (const chunk of progressStreamReader) { + logger.info("read progress", { chunk }); + steps.push(chunk); + } + + return { logs, steps }; + }, +}); + +// Test 3: .writer() then read back +export const streamsWriterTask = task({ + id: "streams-writer", + run: async () => { + const { waitUntilComplete } = logStream.writer({ + execute: ({ write, merge }) => { + write("Line 1 from write()"); + write("Line 2 from write()"); + + const moreLines = ReadableStream.from(["Line 3 from merge()", "Line 4 from merge()"]); + merge(moreLines); + }, + }); + + await waitUntilComplete(); + + return { written: 4 }; + }, +}); + +export const streamsWriterReadTask = task({ + id: "streams-writer-read", + run: async () => { + const handle = await streamsWriterTask.trigger({}); + + const stream = await logStream.read(handle.id); + const lines: string[] = []; + for await (const chunk of stream) { + logger.info("read writer line", { chunk }); + lines.push(chunk); + } + + return { lines }; + }, +}); + +// Test 4: Direct streams.pipe() then read back with streams.read() +export const streamsDirectPipeTask = task({ + id: "streams-direct-pipe", + run: async () => { + const source = ReadableStream.from(generateChunks(3)); + const { waitUntilComplete } = streams.pipe("direct-output", source); + await waitUntilComplete(); + + return { written: 3 }; + }, +}); + +export const streamsDirectPipeReadTask = task({ + id: "streams-direct-pipe-read", + run: async () => { + const handle = await streamsDirectPipeTask.trigger({}); + + const stream = await streams.read(handle.id, "direct-output"); + const chunks: string[] = []; + for await (const chunk of stream) { + logger.info("read direct pipe chunk", { chunk }); + chunks.push(chunk as string); + } + + return { chunks }; + }, +}); + +// Test 5: Direct streams.append() then read back +export const streamsDirectAppendTask = 
task({ + id: "streams-direct-append", + run: async () => { + await streams.append("direct-logs", "Log entry 1"); + await setTimeout(300); + await streams.append("direct-logs", "Log entry 2"); + await setTimeout(300); + await streams.append("direct-logs", "Log entry 3"); + + return { written: 3 }; + }, +}); + +export const streamsDirectAppendReadTask = task({ + id: "streams-direct-append-read", + run: async () => { + const handle = await streamsDirectAppendTask.trigger({}); + + const stream = await streams.read(handle.id, "direct-logs"); + const entries: string[] = []; + for await (const chunk of stream) { + logger.info("read direct append entry", { chunk }); + entries.push(chunk as string); + } + + return { entries }; + }, +}); + +// Test 6: Multiple streams in one task, read all back +export const streamsMultiTask = task({ + id: "streams-multi", + run: async () => { + await logStream.append("Starting multi-stream test"); + await progressStream.append({ step: "start", percent: 0 }); + + const source = ReadableStream.from(generateChunks(3)); + const { waitUntilComplete } = textStream.pipe(source); + + await setTimeout(500); + await logStream.append("Text stream piped"); + await progressStream.append({ step: "piped", percent: 50 }); + + await waitUntilComplete(); + + await logStream.append("Complete"); + await progressStream.append({ step: "done", percent: 100 }); + + return { success: true }; + }, +}); + +export const streamsMultiReadTask = task({ + id: "streams-multi-read", + run: async () => { + const handle = await streamsMultiTask.trigger({}); + + const logReader = await logStream.read(handle.id); + const logs: string[] = []; + for await (const chunk of logReader) { + logs.push(chunk); + } + + const progressReader = await progressStream.read(handle.id); + const steps: Array<{ step: string; percent: number }> = []; + for await (const chunk of progressReader) { + steps.push(chunk); + } + + const textReader = await textStream.read(handle.id); + const texts: 
string[] = []; + for await (const chunk of textReader) { + texts.push(chunk); + } + + return { logs, steps, texts }; + }, +}); + +async function* generateChunks(count: number) { + for (let i = 0; i < count; i++) { + await setTimeout(200); + yield `chunk-${i}`; + } +} diff --git a/references/realtime-streams/src/app/actions.ts b/references/realtime-streams/src/app/actions.ts index 2c18d11e6c6..b411dadc2c2 100644 --- a/references/realtime-streams/src/app/actions.ts +++ b/references/realtime-streams/src/app/actions.ts @@ -3,6 +3,8 @@ import { tasks, auth } from "@trigger.dev/sdk"; import type { streamsTask } from "@/trigger/streams"; import type { aiChatTask } from "@/trigger/ai-chat"; +import type { approvalTask } from "@/trigger/approval"; +import type { messagesTask } from "@/trigger/messages"; import { redirect } from "next/navigation"; import type { UIMessage } from "ai"; @@ -43,6 +45,16 @@ export async function triggerStreamTask( redirect(path); } +export async function triggerApprovalTask() { + const handle = await tasks.trigger<typeof approvalTask>("approval-flow", {}); + redirect(`/approval/${handle.id}?accessToken=${handle.publicAccessToken}`); +} + +export async function triggerMessagesTask() { + const handle = await tasks.trigger<typeof messagesTask>("messages-flow", { messageCount: 5 }); + redirect(`/messages/${handle.id}?accessToken=${handle.publicAccessToken}`); +} + export async function triggerAIChatTask(messages: UIMessage[]) { // Trigger the AI chat task const handle = await tasks.trigger("ai-chat", { diff --git a/references/realtime-streams/src/app/approval/[runId]/page.tsx b/references/realtime-streams/src/app/approval/[runId]/page.tsx new file mode 100644 index 00000000000..b35d2cbef0b --- /dev/null +++ b/references/realtime-streams/src/app/approval/[runId]/page.tsx @@ -0,0 +1,54 @@ +import { ApprovalFlow } from "@/components/approval-flow"; +import Link from "next/link"; + +export default function ApprovalPage({ + params, + searchParams, +}: { + params: { runId: string }; +
searchParams: { accessToken?: string }; +}) { + const { runId } = params; + const accessToken = searchParams.accessToken; + + if (!accessToken) { + return ( +
+
+

Missing Access Token

+

This page requires an access token.

+ + Go back home + +
+
+ ); + } + + return ( +
+
+
+

Approval Flow: {runId}

+ + ← Back to Home + +
+ +
+

+ This task is waiting for your approval. Click Approve or Reject to send input to the + running task via useInputStreamSend. +

+
+ +
+ +
+
+
+ ); +} diff --git a/references/realtime-streams/src/app/messages/[runId]/page.tsx b/references/realtime-streams/src/app/messages/[runId]/page.tsx new file mode 100644 index 00000000000..040337fe411 --- /dev/null +++ b/references/realtime-streams/src/app/messages/[runId]/page.tsx @@ -0,0 +1,56 @@ +import { MessagesFlow } from "@/components/messages-flow"; +import Link from "next/link"; + +export default function MessagesPage({ + params, + searchParams, +}: { + params: { runId: string }; + searchParams: { accessToken?: string }; +}) { + const { runId } = params; + const accessToken = searchParams.accessToken; + + if (!accessToken) { + return ( +
+
+

Missing Access Token

+

This page requires an access token.

+ + Go back home + +
+
+ ); + } + + return ( +
+
+
+

Messages Flow: {runId}

+ + ← Back to Home + +
+ +
+

+ Send messages to the running task using{" "} + useInputStreamSend. The task + subscribes via .on() and logs each + message as it arrives. Send 5 messages to complete the task. +

+
+ +
+ +
+
+
+ ); +} diff --git a/references/realtime-streams/src/app/page.tsx b/references/realtime-streams/src/app/page.tsx index 76beed7a291..63b22909e97 100644 --- a/references/realtime-streams/src/app/page.tsx +++ b/references/realtime-streams/src/app/page.tsx @@ -1,5 +1,6 @@ import { TriggerButton } from "@/components/trigger-button"; import { AIChatButton } from "@/components/ai-chat-button"; +import { triggerApprovalTask, triggerMessagesTask } from "./actions"; export default function Home() { return ( @@ -45,6 +46,31 @@ export default function Home() {
+
+

Input Streams

+

+ Test sending data from the UI to a running task via useInputStreamSend +

+
+
+ +
+
+ +
+
+
+

Performance Testing

diff --git a/references/realtime-streams/src/app/streams.ts b/references/realtime-streams/src/app/streams.ts index da977ab8714..decbea455a5 100644 --- a/references/realtime-streams/src/app/streams.ts +++ b/references/realtime-streams/src/app/streams.ts @@ -10,3 +10,11 @@ export type DemoStreamPart = InferStreamType; export const aiStream = streams.define({ id: "ai", }); + +export const approvalInputStream = streams.input<{ approved: boolean; reviewer: string }>({ + id: "approval", +}); + +export const messageInputStream = streams.input<{ text: string }>({ + id: "messages", +}); diff --git a/references/realtime-streams/src/components/approval-flow.tsx b/references/realtime-streams/src/components/approval-flow.tsx new file mode 100644 index 00000000000..a108735fd48 --- /dev/null +++ b/references/realtime-streams/src/components/approval-flow.tsx @@ -0,0 +1,92 @@ +"use client"; + +import { useRealtimeRun, useInputStreamSend } from "@trigger.dev/react-hooks"; +import type { approvalTask } from "@/trigger/approval"; + +export function ApprovalFlow({ + runId, + accessToken, +}: { + runId: string; + accessToken: string; +}) { + const { run, error: runError } = useRealtimeRun(runId, { + accessToken, + baseURL: process.env.NEXT_PUBLIC_TRIGGER_API_URL, + }); + const { + send, + isLoading: isSending, + error: sendError, + } = useInputStreamSend<{ approved: boolean; reviewer: string }>("approval", runId, { + accessToken, + baseURL: process.env.NEXT_PUBLIC_TRIGGER_API_URL, + }); + + const status = run?.metadata?.status as string | undefined; + const reviewer = run?.metadata?.reviewer as string | undefined; + const isWaiting = status === "waiting-for-approval"; + const isCompleted = run?.status === "COMPLETED"; + const isFailed = run?.status === "FAILED" || run?.status === "CANCELED"; + + return ( +
+
+
+ + {!run + ? "Loading..." + : isWaiting + ? "Waiting for approval" + : status === "approved" + ? `Approved by ${reviewer}` + : status === "rejected" + ? `Rejected by ${reviewer}` + : status === "timed-out" + ? "Timed out" + : `Status: ${run?.status ?? "unknown"}`} + +
+ + {isWaiting && ( +
+ + +
+ )} + + {runError &&

Run error: {runError.message}

} + {sendError &&

Send error: {sendError.message}

} + + {(isCompleted || isFailed) && run?.output && ( +
+          {JSON.stringify(run.output, null, 2)}
+        
+ )} +
+ ); +} diff --git a/references/realtime-streams/src/components/messages-flow.tsx b/references/realtime-streams/src/components/messages-flow.tsx new file mode 100644 index 00000000000..6065f2bc99c --- /dev/null +++ b/references/realtime-streams/src/components/messages-flow.tsx @@ -0,0 +1,97 @@ +"use client"; + +import { useState } from "react"; +import { useRealtimeRun, useInputStreamSend } from "@trigger.dev/react-hooks"; +import type { messagesTask } from "@/trigger/messages"; + +export function MessagesFlow({ + runId, + accessToken, +}: { + runId: string; + accessToken: string; +}) { + const [inputText, setInputText] = useState(""); + const { run, error: runError } = useRealtimeRun(runId, { + accessToken, + baseURL: process.env.NEXT_PUBLIC_TRIGGER_API_URL, + }); + const { + send, + isLoading: isSending, + error: sendError, + } = useInputStreamSend<{ text: string }>("messages", runId, { + accessToken, + baseURL: process.env.NEXT_PUBLIC_TRIGGER_API_URL, + }); + + const status = run?.metadata?.status as string | undefined; + const received = (run?.metadata?.received as number) ?? 0; + const expected = (run?.metadata?.expected as number) ?? 0; + const isListening = status === "listening"; + const isDone = status === "done"; + const isCompleted = run?.status === "COMPLETED"; + + function handleSend() { + if (!inputText.trim()) return; + send({ text: inputText.trim() }); + setInputText(""); + } + + return ( +
+
+
+ + {!run + ? "Loading..." + : isListening + ? `Listening for messages (${received}/${expected})` + : isDone + ? `Received all ${received} messages` + : `Status: ${run?.status ?? "unknown"}`} + +
+ + {isListening && ( +
+ setInputText(e.target.value)} + onKeyDown={(e) => e.key === "Enter" && handleSend()} + placeholder="Type a message..." + className="flex-1 px-4 py-2 border border-gray-300 rounded-lg focus:outline-none focus:ring-2 focus:ring-blue-500" + /> + +
+ )} + + {runError &&

Run error: {runError.message}

} + {sendError &&

Send error: {sendError.message}

} + + {isCompleted && run?.output && ( +
+

Task output:

+
+            {JSON.stringify(run.output, null, 2)}
+          
+
+ )} +
+ ); +} diff --git a/references/realtime-streams/src/trigger/approval.ts b/references/realtime-streams/src/trigger/approval.ts new file mode 100644 index 00000000000..11692820da1 --- /dev/null +++ b/references/realtime-streams/src/trigger/approval.ts @@ -0,0 +1,20 @@ +import { task, metadata } from "@trigger.dev/sdk"; +import { approvalInputStream } from "../app/streams"; + +export const approvalTask = task({ + id: "approval-flow", + run: async () => { + metadata.set("status", "waiting-for-approval"); + + const result = await approvalInputStream.wait({ timeout: "5m" }); + + if (result.ok) { + metadata.set("status", result.output.approved ? "approved" : "rejected"); + metadata.set("reviewer", result.output.reviewer); + } else { + metadata.set("status", "timed-out"); + } + + return { approval: result }; + }, +}); diff --git a/references/realtime-streams/src/trigger/messages.ts b/references/realtime-streams/src/trigger/messages.ts new file mode 100644 index 00000000000..2dfea5e660a --- /dev/null +++ b/references/realtime-streams/src/trigger/messages.ts @@ -0,0 +1,33 @@ +import { task, logger, metadata, wait } from "@trigger.dev/sdk"; +import { messageInputStream } from "../app/streams"; + +export const messagesTask = task({ + id: "messages-flow", + run: async (payload: { messageCount?: number }) => { + const expected = payload.messageCount ?? 5; + const received: { text: string }[] = []; + + metadata.set("status", "listening"); + metadata.set("received", 0); + metadata.set("expected", expected); + + logger.info("Subscribing to messages via .on()", { expected }); + + const { off } = messageInputStream.on((data) => { + logger.info("Received message", { data }); + received.push(data); + metadata.set("received", received.length); + }); + + while (received.length < expected) { + await wait.for({ seconds: 1 }); + } + + off(); + + metadata.set("status", "done"); + logger.info("Done receiving messages", { count: received.length }); + + return { messages: received }; + }, +});