Distributed Adapter
The distributed adapter pattern is the mechanism for scaling Flowcraft beyond a single process. This section details the core components for building your own adapter.
BaseDistributedAdapter Abstract Class
The base class for all distributed adapters. It handles the technology-agnostic orchestration logic, leaving queue-specific implementation details to subclasses.
constructor(options)
optionsAdapterOptions:runtimeOptions: TheRuntimeOptionsto configure the internalFlowRuntimeinstance.coordinationStore: An instance ofICoordinationStore.
Abstract Methods to Implement
protected abstract createContext(runId): Must return an instance of a distributedIAsyncContext.protected abstract processJobs(handler): Must set up a listener on the message queue and call the providedhandlerfor each job.protected abstract enqueueJob(job): Must enqueue a new job onto the message queue.protected abstract publishFinalResult(runId, result): Must publish the final result of a workflow run.
Overridable Hooks
protected shouldRetryInProcess(nodeDef): Returnstrueby default. Override to returnfalsewhen your queue system should handle retries natively (e.g., BullMQattempts/backoff). Whenfalse, the adapter forcesmaxRetries = 1so the executor runs exactly once and lets the queue schedule retries.protected getQueueRetryOptions(nodeDef): Returnsundefinedby default. Override to return queue-specific retry options (e.g.,{ attempts, backoff }) that are applied when enqueuing successor jobs. Only used whenshouldRetryInProcess()returnsfalse.
ICoordinationStore Interface
Defines the contract for an atomic, distributed key-value store required for coordination tasks like fan-in joins and distributed locks.
interface ICoordinationStore {
// Atomically increments a key and returns the new value.
increment: (key: string, ttlSeconds: number) => Promise<number>
// Sets a key only if it does not already exist.
setIfNotExist: (key: string, value: string, ttlSeconds: number) => Promise<boolean>
// Extends the TTL of an existing key. Used for heartbeat mechanism in long-running jobs.
extendTTL: (key: string, ttlSeconds: number) => Promise<boolean>
// Deletes a key.
delete: (key: string) => Promise<void>
// Gets the value of a key.
get: (key: string) => Promise<string | undefined>
}JobPayload Interface
The data payload expected for a job in the message queue.
interface JobPayload {
runId: string
blueprintId: string
nodeId: string
attempt?: number
isLastAttempt?: boolean
}Delta-Based Persistence
All distributed adapters now support delta-based persistence for optimal performance with large state objects. Instead of serializing and transmitting the entire workflow context after each node execution, adapters use the patch() method to apply only the changes (deltas) atomically.
Each adapter implements patch() using its database's most efficient partial update mechanism:
- DynamoDB (SQS):
UpdateExpressionwithSETandREMOVE - Redis (BullMQ):
HSETandHDELfor hash operations - PostgreSQL (RabbitMQ):
jsonb_set()and#-operators - Azure Cosmos DB: Native patch operations (
set/remove) - Google Firestore:
update()withFieldValue.delete() - Apache Cassandra (Kafka): Read-modify-write pattern
Built-in Adapters
@flowcraft/bullmq-adapter: BullMQ and Redis.@flowcraft/sqs-adapter: AWS SQS and DynamoDB.@flowcraft/gcp-adapter: Google Pub/Sub, Firestore, and Redis.@flowcraft/azure-adapter: Azure Queues, Cosmos DB, and Redis.@flowcraft/kafka-adapter: Apache Kafka, Cassandra, and Redis.@flowcraft/rabbitmq-adapter: RabbitMQ, PostgreSQL, and Redis.@flowcraft/cloudflare-adapter: Cloudflare Queues, Durable Objects, and KV.
For examples, see the Adapters Guide.