Runtime Adapter: Azure (Queues, Cosmos DB & Redis)
This adapter provides a distributed solution for Flowcraft leveraging Microsoft Azure services. It uses Azure Queue Storage for job queuing, Azure Cosmos DB for state persistence, and Redis for high-performance coordination.
Installation
You will need the adapter and its peer dependencies for Azure and Redis.
npm install flowcraft @flowcraft/azure-adapter @azure/storage-queue @azure/cosmos ioredisArchitecture
This adapter leverages native Azure services for the queue and context, with Redis handling coordination.
Infrastructure Setup
You must have the following resources provisioned:
- An Azure Storage Account with a Queue.
- An Azure Cosmos DB account (Core SQL API) with a database and two containers (one for context, one for status).
- A Redis instance (e.g., Azure Cache for Redis) accessible by your workers.
Using Azure CLI
# 1. Create Storage Account and Queue
az storage account create --name flowcraftstorage --resource-group my-rg
az storage queue create --name flowcraft-jobs --account-name flowcraftstorage
# 2. Create Cosmos DB Account, Database, and Containers
az cosmosdb create --name flowcraft-cosmos --resource-group my-rg
az cosmosdb sql database create --account-name flowcraft-cosmos --name flowcraft-db --resource-group my-rg
az cosmosdb sql container create --account-name flowcraft-cosmos --database-name flowcraft-db --name contexts --partition-key-path "/runId" --resource-group my-rg
az cosmosdb sql container create --account-name flowcraft-cosmos --database-name flowcraft-db --name statuses --partition-key-path "/runId" --resource-group my-rg
# 3. Create Redis Cache
az redis create --name flowcraft-redis --resource-group my-rg --location eastus --sku Basic --vm-size c0Using Terraform
resource "azurerm_storage_account" "main" {
name = "flowcraftstorage"
resource_group_name = var.resource_group_name
location = var.location
account_tier = "Standard"
account_replication_type = "LRS"
}
resource "azurerm_storage_queue" "jobs" {
name = "flowcraft-jobs"
storage_account_name = azurerm_storage_account.main.name
}
resource "azurerm_cosmosdb_account" "main" {
name = "flowcraft-cosmos"
resource_group_name = var.resource_group_name
location = var.location
offer_type = "Standard"
kind = "GlobalDocumentDB"
// ... other required config
}
resource "azurerm_cosmosdb_sql_database" "main" {
name = "flowcraft-db"
resource_group_name = azurerm_cosmosdb_account.main.resource_group_name
account_name = azurerm_cosmosdb_account.main.name
}
resource "azurerm_cosmosdb_sql_container" "contexts" {
name = "contexts"
resource_group_name = azurerm_cosmosdb_account.main.resource_group_name
account_name = azurerm_cosmosdb_account.main.name
database_name = azurerm_cosmosdb_sql_database.main.name
partition_key_path = "/runId"
}
// ... also for statuses container and redisWorker Usage
The following example demonstrates how to set up and start a worker.
import { CosmosClient } from '@azure/cosmos'
import { QueueClient } from '@azure/storage-queue'
import { AzureQueueAdapter, RedisCoordinationStore } from '@flowcraft/azure-adapter'
import { FlowRuntime } from 'flowcraft'
import Redis from 'ioredis'
// 1. Define your blueprints and registry
const blueprints = { /* your workflow blueprints */ }
const registry = { /* your node implementations */ }
// 2. Initialize service clients
const queueClient = new QueueClient(process.env.AZURE_STORAGE_CONNECTION_STRING, 'your-queue-name')
const cosmosClient = new CosmosClient(process.env.COSMOS_DB_CONNECTION_STRING)
const redisClient = new Redis(process.env.REDIS_CONNECTION_STRING)
// 3. Create a runtime instance
const runtime = new FlowRuntime({ blueprints, registry })
// 4. Set up the coordination store
const coordinationStore = new RedisCoordinationStore(redisClient)
// 5. Initialize the adapter
const adapter = new AzureQueueAdapter({
runtimeOptions: runtime.options,
coordinationStore,
queueClient,
cosmosClient,
cosmosDatabaseName: 'your-cosmos-db-name',
contextContainerName: 'workflow-contexts',
statusContainerName: 'workflow-statuses',
})
// 6. Start the worker
adapter.start()
console.log('Flowcraft worker with Azure adapter is running...')Starting a Workflow (Client-Side)
A client starts a workflow by creating the initial state in Cosmos DB and enqueuing the first job(s).
import { analyzeBlueprint } from 'flowcraft'
import { CosmosClient } from '@azure/cosmos'
import { QueueClient } from '@azure/storage-queue'
async function startWorkflow(blueprint, initialContext) {
const runId = crypto.randomUUID()
const cosmosClient = new CosmosClient(process.env.COSMOS_DB_CONNECTION_STRING)
const queueClient = new QueueClient(process.env.AZURE_STORAGE_CONNECTION_STRING, 'your-queue-name')
// 1. Set initial context and status in Cosmos DB
const db = cosmosClient.database('your-cosmos-db-name')
await db.container('workflow-contexts').items.create({ id: runId, ...initialContext })
await db.container('workflow-statuses').items.create({ id: runId, status: 'running', lastUpdated: new Date().toISOString() })
// 2. Analyze blueprint for start nodes
const analysis = analyzeBlueprint(blueprint)
const startJobs = analysis.startNodeIds.map(nodeId =>
queueClient.sendMessage(JSON.stringify({ runId, blueprintId: blueprint.id, nodeId }))
)
// 3. Enqueue start jobs
await Promise.all(startJobs)
console.log(`Workflow ${runId} started.`)
return runId
}Reconciliation
The adapter includes a utility to find and resume stalled workflows, ensuring workflows complete even if workers fail.
How It Works
The reconciler queries the Cosmos DB status container for 'running' workflows that have not been updated recently and re-enqueues their next valid jobs.
Usage
import { createAzureReconciler } from '@flowcraft/azure-adapter'
// 'adapter' and 'cosmosClient' should be initialized as in the worker setup
const reconciler = createAzureReconciler({
adapter,
cosmosClient,
cosmosDatabaseName: 'your-cosmos-db-name',
statusContainerName: 'workflow-statuses',
stalledThresholdSeconds: 300, // 5 minutes
})
// Run this function periodically
async function reconcile() {
const stats = await reconciler.run()
console.log(`Reconciled ${stats.reconciledRuns} of ${stats.stalledRuns} stalled runs.`)
}Webhook Endpoints
The Azure adapter supports webhook endpoints for workflows using Flow.createWebhook(). Webhook endpoints can be registered using Azure Functions or API Management.
registerWebhookEndpoint(runId, nodeId)
Registers a webhook endpoint for the specified workflow run and node.
runIdstring: The unique identifier for the workflow execution.nodeIdstring: The ID of the webhook node.- Returns:
Promise<{ url: string; event: string }>- The webhook URL and event name.
Example Implementation:
// In AzureQueueAdapter
public async registerWebhookEndpoint(runId: string, nodeId: string): Promise<{ url: string; event: string }> {
const eventName = `webhook:${runId}:${nodeId}`
// Use Azure Functions URL or custom domain
const url = `https://your-function-app.azurewebsites.net/api/webhook/${runId}/${nodeId}`
// Store webhook mapping in Cosmos DB for later retrieval
await this.cosmosDb.items.create({
id: `${runId}:${nodeId}`,
eventName,
url,
createdAt: new Date().toISOString()
})
return { url, event: eventName }
}Handling Webhook Requests with Azure Functions
Create an Azure Function to handle webhook requests and send messages to Azure Queue:
// Azure Function (TypeScript)
import { QueueClient } from '@azure/storage-queue'
import { CosmosClient } from '@azure/cosmos'
const queueClient = QueueClient.fromConnectionString(process.env.AZURE_STORAGE_CONNECTION_STRING!, 'flowcraft-events')
const cosmosClient = new CosmosClient(process.env.COSMOS_CONNECTION_STRING!)
const database = cosmosClient.database('flowcraft')
const container = database.container('webhooks')
export async function webhookHandler(context: any, req: any): Promise<void> {
const { runId, nodeId } = context.bindingData
const payload = req.body
// Get webhook mapping from Cosmos DB
const { resources: webhooks } = await container.items
.query(`SELECT * FROM c WHERE c.id = '${runId}:${nodeId}'`)
.fetchAll()
if (webhooks.length > 0) {
const webhookData = webhooks[0]
// Send event to Azure Queue
await queueClient.sendMessage(JSON.stringify({
event: webhookData.eventName,
payload
}))
context.res = { status: 200, body: 'OK' }
} else {
context.res = { status: 404, body: 'Webhook not found' }
}
}Key Components
AzureQueueAdapter: Orchestrates job processing from Azure Queues.CosmosDbContext: AnIAsyncContextimplementation for Cosmos DB.RedisCoordinationStore: AnICoordinationStorefor distributed locks using Redis.createAzureReconciler: A factory function to create the reconciliation utility.