Runtime Adapter: Google Cloud (Pub/Sub & Firestore)
The Google Cloud adapter provides a serverless-friendly way to run distributed workflows on GCP. It uses Pub/Sub for event-driven job queuing, Firestore for the context store, and Redis for coordination.
Installation
You will need the adapter package, the GCP clients, and ioredis.
npm install flowcraft @flowcraft/gcp-adapter @google-cloud/pubsub @google-cloud/firestore ioredis
Architecture
This adapter leverages native GCP services for the queue and context, with Redis handling coordination.
Infrastructure Setup
Before running, you need to provision the following GCP resources:
- A Pub/Sub Topic and a corresponding Subscription.
- A Firestore Database. The required collections will be created by the adapter.
- A Redis instance (e.g., Memorystore for Redis).
Using gcloud CLI
# 1. Create Pub/Sub topic and subscription
gcloud pubsub topics create flowcraft-jobs
gcloud pubsub subscriptions create flowcraft-workers --topic flowcraft-jobs
# 2. Enable the Firestore API and create the database
gcloud services enable firestore.googleapis.com
gcloud firestore databases create --location=nam5
# 3. Create a Redis instance in Memorystore
gcloud redis instances create flowcraft-redis --size=1 --region=us-central1
Using Terraform
resource "google_pubsub_topic" "flowcraft_jobs" {
  name = "flowcraft-jobs"
}
resource "google_pubsub_subscription" "flowcraft_workers" {
  name  = "flowcraft-workers"
  topic = google_pubsub_topic.flowcraft_jobs.name
}
resource "google_firestore_database" "database" {
  project     = var.project_id
  name        = "(default)"
  location_id = "nam5" // Example location, choose one near your app
  type        = "FIRESTORE_NATIVE"
}
resource "google_redis_instance" "flowcraft_redis" {
  name           = "flowcraft-redis"
  memory_size_gb = 1
  region         = "us-central1"
}
Worker Usage
The following example demonstrates how to configure and start a worker.
import { PubSubAdapter, RedisCoordinationStore } from '@flowcraft/gcp-adapter'
import { Firestore } from '@google-cloud/firestore'
import { PubSub } from '@google-cloud/pubsub'
import { FlowRuntime } from 'flowcraft'
import Redis from 'ioredis'
// 1. Define your blueprints and registry
const blueprints = { /* your workflow blueprints */ }
const registry = { /* your node implementations */ }
// 2. Initialize service clients
const pubsubClient = new PubSub({ projectId: process.env.GCP_PROJECT_ID })
const firestoreClient = new Firestore({ projectId: process.env.GCP_PROJECT_ID })
const redisClient = new Redis(process.env.REDIS_URL)
// 3. Create a runtime instance
const runtime = new FlowRuntime({ blueprints, registry })
// 4. Set up the coordination store
const coordinationStore = new RedisCoordinationStore(redisClient)
// 5. Initialize the adapter
const adapter = new PubSubAdapter({
  runtimeOptions: runtime.options,
  coordinationStore,
  pubsubClient,
  firestoreClient,
  topicName: 'flowcraft-jobs',
  subscriptionName: 'flowcraft-workers',
  contextCollectionName: 'workflow-contexts',
  statusCollectionName: 'workflow-statuses',
})
// 6. Start the worker
adapter.start()
console.log('Flowcraft worker with GCP adapter is running...')
Starting a Workflow (Client-Side)
A client starts a workflow by creating the initial state in Firestore and publishing the first job(s) to Pub/Sub.
import { randomUUID } from 'node:crypto'
import { analyzeBlueprint } from 'flowcraft'
import { Firestore } from '@google-cloud/firestore'
import { PubSub } from '@google-cloud/pubsub'
async function startWorkflow(blueprint, initialContext) {
  // Explicit import avoids relying on a global `crypto` in older Node.js versions
  const runId = randomUUID()
  const projectId = process.env.GCP_PROJECT_ID
  const firestore = new Firestore({ projectId })
  const pubsub = new PubSub({ projectId })
  // 1. Set initial context and status in Firestore
  await firestore.collection('workflow-contexts').doc(runId).set(initialContext)
  await firestore.collection('workflow-statuses').doc(runId).set({ status: 'running', lastUpdated: new Date() })
  // 2. Analyze blueprint for start nodes
  const analysis = analyzeBlueprint(blueprint)
  const topic = pubsub.topic('flowcraft-jobs')
  const publishPromises = analysis.startNodeIds.map(nodeId => {
    const data = Buffer.from(JSON.stringify({ runId, blueprintId: blueprint.id, nodeId }))
    return topic.publishMessage({ data })
  })
  // 3. Publish start jobs
  await Promise.all(publishPromises)
  console.log(`Workflow ${runId} started.`)
  return runId
}
Reconciliation
The adapter includes a utility to find and resume stalled workflows by querying Firestore.
How It Works
The reconciler queries the Firestore status collection for workflows whose status is 'running' but that have been idle for longer than the configured stalledThresholdSeconds, then re-publishes their next valid jobs to Pub/Sub.
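The stalled-run check described above can be sketched as a pure function. This is a minimal illustration, not the adapter's actual implementation: it assumes staleness is judged by the lastUpdated field that the client example writes to the status document, and the names RunStatus and findStalledRuns are hypothetical.

```typescript
// Hypothetical shape of a status document. Only `status` and `lastUpdated`
// appear in this guide; `runId` stands in for the Firestore document ID.
interface RunStatus {
  runId: string
  status: string
  lastUpdated: Date
}

// Return the IDs of runs that are still 'running' but were last updated
// more than `thresholdSeconds` seconds before `now`.
function findStalledRuns(runs: RunStatus[], now: Date, thresholdSeconds: number): string[] {
  const cutoff = now.getTime() - thresholdSeconds * 1000
  return runs
    .filter(run => run.status === 'running' && run.lastUpdated.getTime() < cutoff)
    .map(run => run.runId)
}
```

With a 300-second threshold, a running workflow last updated ten minutes ago is reported as stalled, while one updated a minute ago, or one that is no longer 'running', is not.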
Usage
import { createGcpReconciler } from '@flowcraft/gcp-adapter'
// 'adapter' and 'firestoreClient' should be initialized as in the worker setup
const reconciler = createGcpReconciler({
  adapter,
  firestoreClient,
  statusCollectionName: 'workflow-statuses',
  stalledThresholdSeconds: 300, // 5 minutes
})
// Run this function periodically
async function reconcile() {
  const stats = await reconciler.run()
  console.log(`Reconciled ${stats.reconciledRuns} of ${stats.stalledRuns} stalled runs.`)
}
Key Components
- PubSubAdapter: Orchestrates job processing from a Pub/Sub subscription.
- FirestoreContext: An IAsyncContext implementation for Firestore.
- RedisCoordinationStore: An ICoordinationStore for distributed locks using Redis.
- createGcpReconciler: A factory function to create the reconciliation utility.
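To illustrate what a distributed lock provides here: Pub/Sub delivers messages at least once, so two workers can receive the same job, and a lock with an expiry lets only one of them proceed. The sketch below shows those semantics in memory. The class InMemoryLockStore and its methods are hypothetical; the actual ICoordinationStore interface is not shown in this guide and may differ.

```typescript
// Hypothetical in-memory stand-in for a lock store, for illustration only.
class InMemoryLockStore {
  private locks = new Map<string, { owner: string; expiresAt: number }>()

  // Acquire `key` for `owner` unless a different owner holds an unexpired lock.
  // Re-acquiring as the current owner refreshes the expiry.
  acquire(key: string, owner: string, ttlMs: number, now: number = Date.now()): boolean {
    const current = this.locks.get(key)
    if (current && current.expiresAt > now && current.owner !== owner) {
      return false
    }
    this.locks.set(key, { owner, expiresAt: now + ttlMs })
    return true
  }

  // Release `key` only if `owner` is the recorded holder.
  release(key: string, owner: string): boolean {
    const current = this.locks.get(key)
    if (!current || current.owner !== owner) return false
    this.locks.delete(key)
    return true
  }
}
```

A Redis-backed store would typically get the same effect from a single SET command with the NX and PX options, though whether this adapter does so is not stated here. The expiry matters because a worker that crashes while holding a lock would otherwise block the job indefinitely.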