Observability and Events
Event Bus
Flowcraft provides an event bus for observability, allowing you to monitor workflow execution in real-time. The runtime emits various events during execution, which can be used for logging, monitoring, or triggering external actions.
Available Events
The event bus emits structured events for observability. The complete FlowcraftEvent type definition is documented in the Runtime API; each available event is described below.
Event Descriptions
- workflow:start: Emitted when a workflow execution begins.
- workflow:finish: Emitted when a workflow completes, fails, or is cancelled.
- workflow:stall: Emitted when a workflow cannot proceed (e.g., due to unresolved dependencies).
- workflow:pause: Emitted when a workflow is paused (e.g., due to cancellation or stalling).
- workflow:resume: Emitted when a workflow resumes execution.
- node:start: Emitted when a node begins execution, including the resolved input.
- node:finish: Emitted when a node completes successfully.
- node:error: Emitted when a node fails.
- node:fallback: Emitted when a fallback node is executed.
- node:retry: Emitted when a node execution is retried.
- node:skipped: Emitted when a conditional edge is not taken.
- edge:evaluate: Emitted when an edge condition is evaluated, showing the condition and result.
- context:change: Emitted when data is written to the workflow context.
- batch:start: Emitted when a batch operation begins.
- batch:finish: Emitted when a batch operation completes.
- job:enqueued: Emitted when a job is enqueued for distributed processing.
- job:processed: Emitted when a distributed job completes successfully.
- job:failed: Emitted when a distributed job fails.
Using the Event Bus
You can provide a custom event bus when creating the runtime:
import { FlowRuntime } from 'flowcraft'
import type { IEventBus } from 'flowcraft'
const eventBus: IEventBus = {
async emit(event) {
console.log(`Event: ${event.type}`, event.payload)
// Send to monitoring service, etc.
}
}
const runtime = new FlowRuntime({
registry: myNodeRegistry,
eventBus,
})
For the complete FlowcraftEvent type definition, see the Runtime API documentation.
This allows you to integrate with tools like OpenTelemetry, Datadog, or custom logging systems for comprehensive observability.
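As a sketch of such an integration, the event bus below counts every event by type and forwards node failures to an alerting client. The metrics and alerting imports are placeholders for whatever monitoring SDK you use; only event.type and event.payload are taken from the example above.
import type { IEventBus } from 'flowcraft'
// Placeholders for your monitoring SDK; swap in OpenTelemetry, Datadog, etc.
import { metrics, alerting } from './observability'
const monitoringBus: IEventBus = {
  async emit(event) {
    // Count every event by type for dashboards.
    metrics.increment(`flowcraft.${event.type}`)
    // Page on node failures, attaching the raw payload for context.
    if (event.type === 'node:error') {
      await alerting.notify('Flowcraft node failed', event.payload)
    }
  },
}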
InMemoryEventLogger
The InMemoryEventLogger acts as a "flight recorder" for debugging complex workflow executions. It captures all events emitted during a workflow run, allowing you to inspect the sequence of operations, data flow, and errors in detail.
Usage
import { createFlow, FlowRuntime } from 'flowcraft'
import { InMemoryEventLogger } from 'flowcraft/testing'
const eventLogger = new InMemoryEventLogger()
const runtime = new FlowRuntime({
eventBus: eventLogger
})
const flow = createFlow('my-workflow')
.node('a', () => ({ output: 'done' }))
await runtime.run(flow.toBlueprint())
// You can now inspect the captured events
const startEvent = eventLogger.find('workflow:start')
Benefits
- Non-Intrusive: Captures events without modifying workflow logic.
- Detailed Trace: Records node executions, context changes, and errors.
- In-Memory: Fast and lightweight, ideal for unit tests or local debugging.
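Because the logger plugs in as a normal event bus, a unit test can assert directly on the recorded events. A minimal sketch using Vitest (any test runner works), assuming find() returns the first matching event or undefined:
import { describe, expect, it } from 'vitest'
import { createFlow, FlowRuntime } from 'flowcraft'
import { InMemoryEventLogger } from 'flowcraft/testing'
describe('my-workflow', () => {
  it('emits lifecycle events and no errors', async () => {
    const eventLogger = new InMemoryEventLogger()
    const runtime = new FlowRuntime({ eventBus: eventLogger })
    const flow = createFlow('my-workflow').node('a', () => ({ output: 'done' }))
    await runtime.run(flow.toBlueprint())
    // Assumes find() returns the first matching event, or undefined if none was recorded.
    expect(eventLogger.find('workflow:start')).toBeDefined()
    expect(eventLogger.find('workflow:finish')).toBeDefined()
    expect(eventLogger.find('node:error')).toBeUndefined()
  })
})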
Workflow Replay
Workflow replay enables time-travel debugging by reconstructing workflow state from recorded events without re-executing node logic. This is invaluable for debugging failed workflows, analyzing performance issues, or understanding complex state transitions.
How It Works
When workflows run with persistent event storage, all execution events are captured. The replay system processes these events in order to reconstruct the final workflow state (a conceptual sketch follows the list):
- node:finish: Applies completed node outputs to the context
- context:change: Applies context modifications (including user context.set() calls)
- node:error: Records errors in the workflow state
- workflow:finish: Marks workflow completion
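Conceptually, replay is a fold over the recorded event stream. The sketch below is not Flowcraft's implementation, and the payload field names (nodeId, key, value, status) are assumptions for illustration; it only shows how applying events in order rebuilds the final state without re-running any node logic.
// Conceptual sketch only; payload shapes are assumptions, not the real FlowcraftEvent types.
type RecordedEvent =
  | { type: 'node:finish'; payload: { nodeId: string; output: unknown } }
  | { type: 'context:change'; payload: { key: string; value: unknown } }
  | { type: 'node:error'; payload: { nodeId: string; error: string } }
  | { type: 'workflow:finish'; payload: { status: string } }
function rebuildState(events: RecordedEvent[]) {
  const context: Record<string, unknown> = {}
  const errors: Record<string, string> = {}
  let status = 'running'
  for (const event of events) {
    switch (event.type) {
      case 'node:finish': // apply completed node outputs
        context[event.payload.nodeId] = event.payload.output
        break
      case 'context:change': // apply context.set() writes
        context[event.payload.key] = event.payload.value
        break
      case 'node:error': // record failures in the reconstructed state
        errors[event.payload.nodeId] = event.payload.error
        break
      case 'workflow:finish': // mark completion
        status = event.payload.status
        break
    }
  }
  return { context, errors, status }
}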
Usage
import { createFlow, FlowRuntime } from 'flowcraft'
import { PersistentEventBusAdapter, InMemoryEventStore } from 'flowcraft'
// Set up persistent event storage
const eventStore = new InMemoryEventStore()
const eventBus = new PersistentEventBusAdapter(eventStore)
const runtime = new FlowRuntime({ eventBus })
// Create and run a workflow
const flow = createFlow('my-workflow')
.node('process-data', async ({ context }) => {
await context.set('result', 'processed')
return { output: 'done' }
})
const result = await runtime.run(flow.toBlueprint(), {}, { functionRegistry: flow.getFunctionRegistry() })
// Later, replay the execution for debugging
const executionId = result.context._executionId
const events = await eventStore.retrieve(executionId)
const replayResult = await runtime.replay(flow.toBlueprint(), events)
// replayResult.context contains the reconstructed final state
console.log(replayResult.context.result) // 'processed'
Benefits
- Time-Travel Debugging: Inspect the exact state of any workflow execution at any point
- Post-Mortem Analysis: Reconstruct failed workflow states without re-running expensive operations
- Performance Analysis: Analyze execution patterns without the overhead of re-execution
- Testing: Verify complex state transitions and edge cases
- Pluggable Storage: Easy to implement custom event stores (databases, message queues, etc.)
Event Storage Backends
The replay system is designed to work with any event storage backend. Flowcraft provides:
- InMemoryEventStore: Simple in-memory implementation for testing and development
- Custom Implementations: Implement the IEventStore interface for databases, log streams, or message queues (a minimal sketch follows the interface below)
interface IEventStore {
store(event: FlowcraftEvent, executionId: string): Promise<void>
retrieve(executionId: string): Promise<FlowcraftEvent[]>
retrieveMultiple(executionIds: string[]): Promise<Map<string, FlowcraftEvent[]>>
}
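A custom backend only needs these three methods. The Map-backed store below is a minimal sketch (it assumes IEventStore and FlowcraftEvent are exported from the main flowcraft package, like the other types used in this guide); a real implementation would persist to a database or log stream instead.
import type { FlowcraftEvent, IEventStore } from 'flowcraft'
// Minimal in-process store: events grouped by execution ID in a Map.
class MapEventStore implements IEventStore {
  private events = new Map<string, FlowcraftEvent[]>()
  async store(event: FlowcraftEvent, executionId: string): Promise<void> {
    const existing = this.events.get(executionId) ?? []
    existing.push(event)
    this.events.set(executionId, existing)
  }
  async retrieve(executionId: string): Promise<FlowcraftEvent[]> {
    return this.events.get(executionId) ?? []
  }
  async retrieveMultiple(executionIds: string[]): Promise<Map<string, FlowcraftEvent[]>> {
    const result = new Map<string, FlowcraftEvent[]>()
    for (const id of executionIds) {
      result.set(id, this.events.get(id) ?? [])
    }
    return result
  }
}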
History Adapters
For persistent event storage in production environments, Flowcraft provides dedicated history adapter packages that implement the IEventStore interface for different databases.
SQLite History Adapter
The @flowcraft/sqlite-history package provides SQLite-based event storage, ideal for development, testing, and small-scale production deployments.
Installation
npm install @flowcraft/sqlite-history
Usage
import { SqliteHistoryAdapter } from '@flowcraft/sqlite-history'
import { PersistentEventBusAdapter } from 'flowcraft'
// Create SQLite event store
const eventStore = new SqliteHistoryAdapter({
databasePath: './workflow-events.db',
walMode: true, // Enable WAL mode for better concurrent access
})
// Create persistent event bus
const eventBus = new PersistentEventBusAdapter(eventStore)
// Use with Flowcraft runtime
const runtime = new FlowRuntime({ eventBus })
// Events are automatically stored and can be retrieved later
const events = await eventStore.retrieve(executionId)
Configuration
- databasePath: Path to the SQLite database file. Use :memory: for in-memory storage.
- walMode: Enable WAL mode for better concurrent read/write access (default: true).
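For unit tests, you can point the adapter at an in-memory database using the :memory: path described above:
import { SqliteHistoryAdapter } from '@flowcraft/sqlite-history'
// Keeps all events in memory; nothing is written to disk between runs.
const testStore = new SqliteHistoryAdapter({ databasePath: ':memory:' })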
Features
- Efficient Storage: SQLite provides fast, reliable event storage
- Concurrent Access: WAL mode enables concurrent reads and writes
- Query Capabilities: Retrieve events by execution ID or multiple executions
- Statistics: Get insights into stored events and executions
PostgreSQL History Adapter
The @flowcraft/postgres-history package provides PostgreSQL-based event storage for enterprise-scale deployments requiring high availability and performance.
Installation
npm install @flowcraft/postgres-history
Usage
import { PostgresHistoryAdapter } from '@flowcraft/postgres-history'
import { PersistentEventBusAdapter } from 'flowcraft'
// Create PostgreSQL event store
const eventStore = new PostgresHistoryAdapter({
host: 'localhost',
port: 5432,
database: 'flowcraft',
user: 'flowcraft',
password: 'password',
tableName: 'workflow_events', // optional, defaults to 'flowcraft_events'
})
// Create persistent event bus
const eventBus = new PersistentEventBusAdapter(eventStore)
// Use with Flowcraft runtime
const runtime = new FlowRuntime({ eventBus })
// Events are automatically stored and can be retrieved later
const events = await eventStore.retrieve(executionId)
Configuration
All standard pg.PoolConfig options are supported:
- host, port, database, user, password: Database connection details
- tableName: Custom table name (default: 'flowcraft_events')
- autoCreateTables: Automatically create tables and indexes (default: true)
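Since the adapter accepts standard pg.PoolConfig options, you can, for example, connect via a connection string and tune the pool; the values below are illustrative.
import { PostgresHistoryAdapter } from '@flowcraft/postgres-history'
// Standard pg.PoolConfig options pass through to the underlying connection pool.
const eventStore = new PostgresHistoryAdapter({
  connectionString: process.env.DATABASE_URL, // e.g. postgres://user:pass@host:5432/flowcraft
  max: 10, // cap the pool size
  ssl: { rejectUnauthorized: false }, // common setting for managed Postgres
  tableName: 'workflow_events',
})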
Features
- Scalable Storage: PostgreSQL handles high-volume event storage
- Concurrent Access: Full support for concurrent reads and writes
- Advanced Querying: Leverage PostgreSQL's JSONB for complex event queries
- Connection Pooling: Efficient connection management with pg.Pool
- Statistics: Get insights into stored events and executions
Both adapters automatically create the necessary database schema on first use and support efficient querying by execution ID.
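For example, retrieveMultiple (from the IEventStore interface shown earlier) pulls the histories of several executions in one call; the IDs here are placeholders.
// Fetch event histories for several executions at once (IDs are placeholders).
const histories = await eventStore.retrieveMultiple(['exec-1', 'exec-2'])
for (const [executionId, events] of histories) {
  console.log(`${executionId}: ${events.length} events recorded`)
}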
For CLI tools to inspect and debug workflow executions, see the CLI Guide.
OpenTelemetry
For distributed tracing and observability, you can use the @flowcraft/opentelemetry-middleware package. This middleware integrates with OpenTelemetry to provide end-to-end visibility into workflow executions.
import { FlowRuntime } from 'flowcraft'
import { OpenTelemetryMiddleware } from '@flowcraft/opentelemetry-middleware'
import { NodeSDK } from '@opentelemetry/sdk-node'
import { OTLPTraceExporter } from '@opentelemetry/exporter-trace-otlp-http'
// Set up OpenTelemetry SDK (standard OTel setup)
const sdk = new NodeSDK({
traceExporter: new OTLPTraceExporter(), // Point to Jaeger, Datadog, etc.
})
sdk.start()
// Create the middleware
const otelMiddleware = new OpenTelemetryMiddleware('flowcraft-worker')
// Add to runtime
const runtime = new FlowRuntime({
middleware: [otelMiddleware],
})
This middleware automatically creates spans for each node execution, propagates context between nodes, and records errors, enabling full observability in distributed environments.