
Execution Adapter

Bring your own runtime (Temporal, queues, Lambdas) and stream run state into Linkage

Linkage is execution-agnostic: it renders workflow UIs and run visualization, but it does not run your workflows. Instead, you integrate your existing runtime (Temporal, queue workers, Lambdas, cron + job runner, agents, etc.) via an execution adapter that streams run events into the UI.

What you provide

At minimum, your runtime needs to expose a way for Linkage to subscribe to run events.

Conceptually, the adapter looks like:

type Unsubscribe = () => void;

type RunSnapshot = {
  runId: string;
  workflowId: string;
  status: RunStatus;
  nodeStatuses?: Record<string, RunStatus>;
  startedAt?: string;
  finishedAt?: string;
  metadata?: Record<string, unknown>;
};

type ExecutionAdapter = {
  subscribe: (
    scope: { workflowId: string } | { runId: string },
    onEvent: (event: RunEvent) => void,
  ) => Unsubscribe;

  // Optional (Linkage can support “Run” buttons, cancel, and lazy output fetch)
  startRun?: (workflowId: string, inputs?: unknown) => Promise<{ runId: string }>;
  cancelRun?: (runId: string) => Promise<void>;
  fetchRun?: (runId: string) => Promise<RunSnapshot>;
  fetchNodeOutput?: (runId: string, nodeId: string) => Promise<unknown>;
};

Event schema

Events are small deltas that update the run UI over time.

type RunStatus =
  | "queued"
  | "running"
  | "success"
  | "error"
  | "skipped"
  | "canceled";

type RunEvent =
  | {
      type: "run.created" | "run.updated";
      runId: string;
      workflowId: string;
      status: RunStatus;
      timestamp: string;
    }
  | {
      type: "node.status";
      runId: string;
      nodeId: string;
      status: RunStatus;
      timestamp: string;
      message?: string;
    }
  | {
      type: "log.append";
      runId: string;
      nodeId?: string;
      timestamp: string;
      level?: "debug" | "info" | "warn" | "error";
      message: string;
    }
  | {
      type: "output.updated";
      runId: string;
      nodeId: string;
      timestamp: string;
      output: unknown;
    };

Notes:

  • timestamp should be an ISO string (new Date().toISOString()).
  • Events should be idempotent (replaying them should converge on the same UI state).
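  • If you can’t guarantee ordering, add a monotonically increasing per-run sequence number to each event (the schema above does not define one, so this is your own extension) and have the client drop any event older than the latest one it has already applied.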
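The idempotency and ordering notes above can be sketched as a small reducer. This is a minimal sketch, not how Linkage handles events internally: the `seq` field, the `SequencedNodeStatus` type, and the `applyNodeStatus` function are assumptions introduced for illustration.

```typescript
type RunStatus = "queued" | "running" | "success" | "error" | "skipped" | "canceled";

// Hypothetical: a node.status event extended with a per-run sequence number.
type SequencedNodeStatus = {
  type: "node.status";
  runId: string;
  nodeId: string;
  status: RunStatus;
  timestamp: string;
  seq: number; // assumption: monotonically increasing within a run
};

type NodeState = { status: RunStatus; seq: number };
type RunState = Record<string, NodeState>;

// Applies an event only if it is newer than what is already recorded for the node.
// Replayed or out-of-order events return the same state object unchanged.
export function applyNodeStatus(state: RunState, event: SequencedNodeStatus): RunState {
  const current = state[event.nodeId];
  if (current && current.seq >= event.seq) return state;
  return { ...state, [event.nodeId]: { status: event.status, seq: event.seq } };
}
```

Because a stale or repeated event leaves the state untouched, replaying the full event log any number of times converges on the same result.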

Transport options

The MVP transport plan is:

  • SSE (preferred): best UX for live logs/status updates.
  • Polling fallback: simple to ship; works in locked-down environments.
  • Mock adapter: in-memory events for demos/tests.
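One way to combine the first two transports is to start with SSE and switch to polling if the stream fails. The sketch below is an assumption about how you might wire this yourself; `StartTransport` and `subscribeWithFallback` are hypothetical names, not part of @linkage-open/lib. Transports are injected so the logic can be tested without a browser.

```typescript
type Unsubscribe = () => void;

// Hypothetical transport signature: starts delivering events and reports a
// fatal failure through onFail. The returned unsubscribe must be safe to call twice.
type StartTransport<E> = (onEvent: (e: E) => void, onFail: () => void) => Unsubscribe;

export function subscribeWithFallback<E>(
  startSse: StartTransport<E>,
  startPoll: StartTransport<E>,
  onEvent: (e: E) => void,
): Unsubscribe {
  let closed = false;
  let sseStop: Unsubscribe | undefined;
  let pollStop: Unsubscribe | undefined;

  const fallBack = () => {
    if (closed || pollStop) return; // already closed, or already polling
    sseStop?.();
    pollStop = startPoll(onEvent, () => {});
  };

  sseStop = startSse(onEvent, fallBack);
  // If SSE failed synchronously, sseStop was not assigned yet when fallBack ran.
  if (pollStop) sseStop();

  return () => {
    closed = true;
    sseStop?.();
    pollStop?.();
  };
}
```

In a browser, `startSse` would typically wrap an EventSource and call `onFail` from its `onerror` handler, and `startPoll` would wrap a cursor-based poller.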

Built-in mock adapter

@linkage-open/lib ships createMockExecutionAdapter for demos, storybooks, and tests.

import { createMockExecutionAdapter } from "@linkage-open/lib";

const adapter = createMockExecutionAdapter();

const { runId } = await adapter.startRun!("workflow_123");

adapter.emit({
  type: "node.status",
  runId,
  nodeId: "fetch_customer",
  status: "running",
  timestamp: new Date().toISOString(),
});

Run visualization UI

Use RunPanel to render the workflow run list, per-node statuses, streaming logs, and node outputs.

import { RunPanel } from "@linkage-open/lib/react";
import { createHttpExecutionAdapter } from "@linkage-open/lib";

const adapter = createHttpExecutionAdapter({
  baseUrl: "https://api.example.com",
  endpoints: {
    eventsPoll: ({ workflowId }) => `/runs/workflow/${workflowId}/events`,
    eventsSse: ({ workflowId }) => `/runs/workflow/${workflowId}/events/stream`,
    startRun: ({ workflowId }) => `/runs/workflow/${workflowId}/start`,
    cancelRun: ({ runId }) => `/runs/${runId}/cancel`,
    fetchRun: ({ runId }) => `/runs/${runId}`,
  },
});

<RunPanel adapter={adapter} workflowId="workflow_123" />;

Concrete integrations

Below are two concrete patterns that satisfy the “Works with Temporal, queues, Lambdas…” promise. In both cases, the key idea is the same: your runtime emits RunEvents into a store/stream, and the Linkage UI subscribes to those events.

Example: Temporal

Goal: Execute the workflow in Temporal, while Linkage renders live status/log/output using the adapter.

Recommended mapping:

  • Linkage workflowId = your saved Linkage graph id.
  • Linkage runId = a stable identifier you can resolve back to Temporal (often temporalWorkflowId + temporalRunId).
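If you choose the combined form, the runId mapping above could be sketched as a pair of helpers. The `::` separator and both function names are assumptions. Splitting on the last separator relies on Temporal run ids being UUIDs, which never contain colons.

```typescript
const SEP = "::"; // hypothetical separator; pick one your ids cannot end with

// Builds a Linkage runId that can be resolved back to a Temporal execution.
export function toLinkageRunId(temporalWorkflowId: string, temporalRunId: string): string {
  return `${temporalWorkflowId}${SEP}${temporalRunId}`;
}

// Splits on the LAST separator, so workflow ids that contain colons still round-trip.
export function fromLinkageRunId(runId: string): {
  temporalWorkflowId: string;
  temporalRunId: string;
} {
  const i = runId.lastIndexOf(SEP);
  if (i < 0) throw new Error(`Not a Temporal-backed run id: ${runId}`);
  return {
    temporalWorkflowId: runId.slice(0, i),
    temporalRunId: runId.slice(i + SEP.length),
  };
}
```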

Pattern: emit RunEvents from the Temporal run via an Activity (workflows can’t do network I/O directly), write them to a database, and stream/poll them from your app.

Workflow (high-level):

// Temporal workflow (sketch; RunEvent is the type from the event schema above)
import { proxyActivities } from "@temporalio/workflow";

const { appendRunEvent } = proxyActivities<{
  appendRunEvent: (event: RunEvent) => Promise<void>;
}>({ startToCloseTimeout: "10 seconds" });

export async function linkageTemporalRun(args: {
  linkageWorkflowId: string;
  linkageRunId: string;
}) {
  // Inside the Temporal workflow sandbox, Date is replaced with a deterministic,
  // replay-safe clock, so new Date() can be used directly.
  await appendRunEvent({
    type: "run.created",
    runId: args.linkageRunId,
    workflowId: args.linkageWorkflowId,
    status: "running",
    timestamp: new Date().toISOString(),
  });

  // ...execute your graph...

  await appendRunEvent({
    type: "run.updated",
    runId: args.linkageRunId,
    workflowId: args.linkageWorkflowId,
    status: "success",
    timestamp: new Date().toISOString(),
  });
}

Activity (write to an append-only table; also optionally publish to a pub/sub stream):

// Temporal activity (server-side)
export async function appendRunEvent(event: RunEvent) {
  // Store: Postgres, Redis Streams, DynamoDB, etc.
  await runEventStore.append(event.runId, event);
}

Subscribe: your app exposes GET /api/runs/:runId/events (SSE) and/or GET /api/runs/:runId/events?cursor=... (poll). Those endpoints read from runEventStore.
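For the SSE variant of that endpoint, each stored event has to be written to the response as a Server-Sent Events frame. A minimal sketch follows; `toSseFrame` and `sseHeaders` are hypothetical names, and using the stored event's sequence or sort key as the frame id is an assumption.

```typescript
// Response headers commonly sent for an SSE endpoint.
export const sseHeaders = {
  "Content-Type": "text/event-stream",
  "Cache-Control": "no-cache",
} as const;

// Serializes one event as an SSE frame.
// JSON.stringify escapes newlines, so the payload always fits on one "data:" line.
// The id must not contain a newline; a reconnecting EventSource sends the last id
// it saw in the Last-Event-ID request header, which the server can use to resume.
export function toSseFrame(id: string, event: unknown): string {
  return `id: ${id}\ndata: ${JSON.stringify(event)}\n\n`;
}
```

On the client, `EventSource.onmessage` then receives the JSON string in `msg.data`.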

If you want a “Run” button, implement startRun by calling Temporal from your backend (e.g. client.workflow.start(...)) and returning the runId you’ll use for subscription:

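import { Client } from "@temporalio/client";
// Hypothetical path: import the workflow function from wherever you define it.
import { linkageTemporalRun } from "./workflows";

export async function startRunTemporal(input: {
  linkageWorkflowId: string;
  linkageRunId: string;
}) {
  // With no options, the client uses the default local connection;
  // pass a connection explicitly for a remote Temporal cluster.
  const client = new Client();
  await client.workflow.start(linkageTemporalRun, {
    taskQueue: "linkage",
    workflowId: `linkage:${input.linkageWorkflowId}:${input.linkageRunId}`,
    args: [input],
  });
  return { runId: input.linkageRunId };
}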

Example: AWS Lambdas (+ queue)

Goal: Execute nodes in Lambdas (often via SQS), while Linkage renders run state using polling.

Recommended mapping:

  • Linkage runId = an application-generated id (e.g. ULID/UUID) you attach to every queued job and every emitted event.

Pattern: each Lambda writes RunEvents to an append-only store (DynamoDB is a common fit), and your UI polls via cursor.

Lambda writes events:

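// Lambda handler (pseudo-code: runEventStore and executeNode are your own helpers)
export async function handler(job: { runId: string; nodeId: string }) {
  const base = { runId: job.runId, nodeId: job.nodeId };
  const now = () => new Date().toISOString();

  await runEventStore.append(job.runId, {
    type: "node.status",
    ...base,
    status: "running",
    timestamp: now(),
  });

  try {
    const output = await executeNode(job);

    await runEventStore.append(job.runId, {
      type: "output.updated",
      ...base,
      output,
      timestamp: now(),
    });

    await runEventStore.append(job.runId, {
      type: "node.status",
      ...base,
      status: "success",
      timestamp: now(),
    });
  } catch (err) {
    // Report the failure so the node does not stay "running" in the UI.
    await runEventStore.append(job.runId, {
      type: "node.status",
      ...base,
      status: "error",
      timestamp: now(),
      message: err instanceof Error ? err.message : String(err),
    });
    throw err; // rethrow so the queue's retry / dead-letter handling still applies
  }
}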

Concrete DynamoDB shape (one partition per run):

  • pk = "RUN#<runId>"
  • sk = "EVT#<ulid>" (or EVT#<sequence>)
  • event = RunEvent (JSON)
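If you use the EVT#<sequence> form of the sort key, note that DynamoDB orders string sort keys by their bytes, so an unpadded "EVT#10" sorts before "EVT#9". A minimal sketch of a padded key builder follows; `eventSortKey` and the width of 12 digits are assumptions.

```typescript
const SEQ_WIDTH = 12; // assumption: fewer than 10^12 events per run

// Builds a sort key whose lexicographic order matches numeric sequence order.
export function eventSortKey(sequence: number): string {
  if (!Number.isInteger(sequence) || sequence < 0) {
    throw new RangeError(`sequence must be a non-negative integer, got ${sequence}`);
  }
  return `EVT#${String(sequence).padStart(SEQ_WIDTH, "0")}`;
}
```

ULIDs avoid this problem because they are fixed-width and sort by creation time, though two ULIDs generated in the same millisecond by different workers are not guaranteed to sort in emission order.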

Append example:

import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import { DynamoDBDocumentClient, PutCommand } from "@aws-sdk/lib-dynamodb";
import { ulid } from "ulid";

const ddb = DynamoDBDocumentClient.from(new DynamoDBClient({}));
const tableName = process.env.RUN_EVENTS_TABLE!;

export async function appendRunEventDdb(runId: string, event: RunEvent) {
  await ddb.send(
    new PutCommand({
      TableName: tableName,
      Item: {
        pk: `RUN#${runId}`,
        sk: `EVT#${ulid()}`,
        event,
      },
    }),
  );
}

Polling endpoint reads events in order (cursor can be the last seen sequence or DynamoDB ExclusiveStartKey):

// GET /api/runs/:runId/events?cursor=...
// return { events: RunEvent[], nextCursor?: string }

Concrete query example (cursor = base64url-encoded DynamoDB key of the last event returned):

import { QueryCommand } from "@aws-sdk/lib-dynamodb";

export async function fetchRunEventsDdb(runId: string, cursor?: string) {
  const exclusiveStartKey = cursor
    ? JSON.parse(Buffer.from(cursor, "base64url").toString("utf8"))
    : undefined;

  const res = await ddb.send(
    new QueryCommand({
      TableName: tableName,
      KeyConditionExpression: "pk = :pk",
      ExpressionAttributeValues: { ":pk": `RUN#${runId}` },
      ExclusiveStartKey: exclusiveStartKey,
      ScanIndexForward: true,
      Limit: 200,
    }),
  );

  const items = res.Items ?? [];
  const last = items[items.length - 1];

  // LastEvaluatedKey is only set when more pages remain. Fall back to the key of
  // the last item returned so a poller can resume from there when new events
  // arrive, and keep the incoming cursor when the page is empty.
  const resumeKey = res.LastEvaluatedKey ?? (last ? { pk: last.pk, sk: last.sk } : undefined);
  const nextCursor = resumeKey
    ? Buffer.from(JSON.stringify(resumeKey)).toString("base64url")
    : cursor;

  return { events: items.map((item) => item.event as RunEvent), nextCursor };
}

Notes:

  • SSE is possible on AWS, but it’s usually easier to ship polling unless you’re already using long-lived compute (ECS/Fargate) or a managed realtime option.
  • You can attach CloudWatch logs to the run UI by forwarding logs into log.append events (or by linking out to CloudWatch from the run metadata).
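The log-forwarding idea in the last note can be sketched as a small mapper from one CloudWatch Logs entry to a log.append event. The `toLogAppend` function is hypothetical, and how you recover `runId` and `nodeId` for a given log line (for example from the log stream name or a structured log field) is left to your setup.

```typescript
// log.append variant of RunEvent, repeated here so the sketch is self-contained.
type LogAppend = {
  type: "log.append";
  runId: string;
  nodeId?: string;
  timestamp: string;
  level?: "debug" | "info" | "warn" | "error";
  message: string;
};

// One entry of the logEvents array in a CloudWatch Logs subscription payload.
type CloudWatchLogEvent = { id: string; timestamp: number; message: string };

// Converts the epoch-millisecond timestamp to ISO and trims the trailing newline
// that Lambda log lines usually carry.
export function toLogAppend(
  runId: string,
  nodeId: string | undefined,
  entry: CloudWatchLogEvent,
): LogAppend {
  return {
    type: "log.append",
    runId,
    nodeId,
    timestamp: new Date(entry.timestamp).toISOString(),
    message: entry.message.trimEnd(),
  };
}
```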

SSE example

If your backend can serve text/event-stream, the client can subscribe like this:

export function subscribeSse(runId: string, onEvent: (e: RunEvent) => void) {
  const es = new EventSource(`/api/runs/${runId}/events`);
  es.onmessage = (msg) => onEvent(JSON.parse(msg.data));
  return () => es.close();
}

Polling example (with cursor)

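export function subscribePoll(runId: string, onEvent: (e: RunEvent) => void) {
  let cursor: string | undefined;
  let inFlight = false;
  const tick = async () => {
    if (inFlight) return; // skip this tick if the previous request is still running
    inFlight = true;
    try {
      const query = cursor ? `?cursor=${encodeURIComponent(cursor)}` : "";
      const res = await fetch(`/api/runs/${runId}/events${query}`);
      if (!res.ok) return; // retry on the next tick
      const data = (await res.json()) as { events: RunEvent[]; nextCursor?: string };
      data.events.forEach(onEvent);
      // Keep the old cursor if the server omits nextCursor; resetting it would replay from the start.
      cursor = data.nextCursor ?? cursor;
    } catch {
      // network or parse error: retry on the next tick
    } finally {
      inFlight = false;
    }
  };
  const intervalId = window.setInterval(tick, 1000);
  void tick();
  return () => window.clearInterval(intervalId);
}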

REST endpoint shape (reference)

If you’re exposing an HTTP API for run visualization, a practical baseline is:

  • GET /api/runs?workflowId=... → list runs (most recent first)
  • GET /api/runs/:runId → run snapshot (status, node statuses, metadata)
  • GET /api/runs/:runId/events → SSE stream (optional)
  • GET /api/runs/:runId/events?cursor=... → poll for incremental events (optional)
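The contract of the cursor-based events endpoint can be illustrated with an in-memory stand-in, which is also handy in tests. This is a sketch under assumptions: `readPage` is a hypothetical helper, and here the cursor is simply the index of the next unread event, whereas a real store would use its own key.

```typescript
type Page<E> = { events: E[]; nextCursor: string };

// Returns up to `limit` events after the cursor, plus a cursor to resume from.
// nextCursor is always returned, even for an empty page, so a poller never
// falls back to reading from the start.
export function readPage<E>(
  log: readonly E[],
  cursor: string | undefined,
  limit = 200,
): Page<E> {
  const parsed = cursor ? Number.parseInt(cursor, 10) : 0;
  // Treat malformed or negative cursors as "start from the beginning".
  const start = Number.isInteger(parsed) && parsed >= 0 ? Math.min(parsed, log.length) : 0;
  const events = log.slice(start, start + limit);
  return { events, nextCursor: String(start + events.length) };
}
```

A poller calls this repeatedly with the previous `nextCursor` and only ever sees events it has not received yet.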

Auth and security

The adapter endpoints are owned by you. Recommended patterns:

  • Keep worker credentials server-side; expose only what the UI needs.
  • If Linkage runs in a browser, prefer same-origin proxying (/api/...) or short-lived, scoped tokens.
  • Treat run output as potentially sensitive and scope access by user/workflow permissions.
