Data Pipeline Services
Build ETL, data transformation, real-time and batch processing services for moving and transforming data at scale
Build data pipeline services that extract, transform, and load data between systems, enabling real-time streaming, batch processing, data enrichment, and quality assurance at scale.
Overview
Data pipeline services automate the movement and transformation of data across systems. They're essential for:
- ETL Operations: Extract, Transform, Load workflows
- Data Synchronization: Keep systems in sync in real-time
- Data Warehousing: Load data into analytics platforms
- Data Enrichment: Enhance data with additional information
- Real-time Processing: Stream processing for immediate insights
- Batch Processing: Process large datasets efficiently
Core Concepts
Pipeline Architecture
import $, { db, on, send, every } from 'sdk.do'
// Define ETL pipeline service
// Example service definition: a scheduled, incrementally-loaded ETL pipeline
// that moves CRM customers into the analytics warehouse.
const customerDataPipeline = await $.Service.create({
name: 'Customer Data ETL Pipeline',
type: $.ServiceType.Automation,
subtype: 'data-pipeline',
// Pipeline configuration
pipeline: {
// Incremental source: the '?' placeholder is bound at run time to the
// timestamp of the last successful run, so only changed rows are pulled.
source: {
type: 'database',
connection: 'crm-database',
query: 'SELECT * FROM customers WHERE updated_at > ?',
incremental: true,
},
// Transformations run in order; each id refers to a registered function.
transformations: [
{
id: 'normalize',
type: 'function',
description: 'Normalize customer data',
},
{
id: 'enrich',
type: 'function',
description: 'Enrich with third-party data',
},
{
id: 'validate',
type: 'function',
description: 'Validate data quality',
},
],
// 'upsert' keeps the warehouse table in sync without duplicating rows.
destination: {
type: 'data-warehouse',
connection: 'analytics-warehouse',
table: 'customers',
mode: 'upsert',
},
},
// Schedule
schedule: {
type: 'cron',
expression: '0 */6 * * *', // Every 6 hours
timezone: 'UTC',
},
// Pricing
// Billed per GB processed, with a minimum charge per run.
pricing: {
model: 'per-gb',
rate: 1.0,
minimumCharge: 0.1,
additionalCosts: {
enrichment: 0.5, // per 1000 records
},
},
})
Building ETL Pipelines
Extract Phase
// Route an extraction request to the handler for the configured source type.
// Throws for source types that have no registered handler.
async function extractData(source: any, lastRunTime: Date) {
  const kind = source.type
  if (kind === 'database') return await extractFromDatabase(source, lastRunTime)
  if (kind === 'api') return await extractFromAPI(source, lastRunTime)
  if (kind === 'file') return await extractFromFile(source)
  if (kind === 'stream') return await extractFromStream(source)
  throw new Error(`Unsupported source type: ${kind}`)
}
// Pull rows from a relational source, binding the incremental watermark when
// the source is configured for change-data capture. The connection is always
// released, even if the query fails.
async function extractFromDatabase(source: any, lastRunTime: Date) {
  const connection = await getConnection(source.connection)
  try {
    const results = source.incremental
      ? await connection.query(source.query, lastRunTime)
      : await connection.query(source.query)
    return {
      records: results,
      count: results.length,
      extractedAt: new Date(),
      source: 'database',
      incremental: source.incremental,
    }
  } finally {
    await connection.close()
  }
}
// API extraction with pagination
// Extract records from a paginated REST API, 100 per page, until the API
// reports no more pages. A configurable inter-request delay keeps us under
// the provider's rate limit.
//
// BUG FIX: the Fetch API has no `params` option -- the original passed
// page/per_page/updated_since in an ignored field, so every iteration
// re-requested the same unfiltered endpoint. Query parameters are now
// encoded into the URL. A non-2xx response also now fails fast instead of
// risking an endless loop on a persistent error payload.
async function extractFromAPI(source: any, lastRunTime: Date) {
  const records: any[] = []
  let page = 1
  let hasMore = true
  while (hasMore) {
    const url = new URL(source.endpoint)
    url.searchParams.set('page', String(page))
    url.searchParams.set('per_page', '100')
    url.searchParams.set('updated_since', lastRunTime.toISOString())
    const response = await fetch(url.toString(), {
      method: 'GET',
      headers: {
        Authorization: `Bearer ${source.apiKey}`,
        'Content-Type': 'application/json',
      },
    })
    if (!response.ok) {
      throw new Error(`API extraction failed with status ${response.status}`)
    }
    const data = await response.json()
    records.push(...data.results)
    hasMore = data.has_more
    page++
    // Rate limiting between page requests
    await sleep(source.rateLimitDelay || 100)
  }
  return {
    records,
    count: records.length,
    extractedAt: new Date(),
    source: 'api',
    pages: page - 1,
  }
}
// File extraction with streaming
// Streams a file through a format-specific parser and resolves with all
// parsed records once the stream ends; rejects on any stream/parse error.
// NOTE(review): records are accumulated fully in memory, so very large files
// may exhaust the heap -- confirm expected file sizes.
async function extractFromFile(source: any) {
const stream = await openFileStream(source.path)
const records = []
return new Promise((resolve, reject) => {
stream
.pipe(createParser(source.format)) // csv, json, parquet, etc.
.on('data', (record) => {
records.push(record)
})
.on('end', () => {
resolve({
records,
count: records.length,
extractedAt: new Date(),
source: 'file',
format: source.format,
})
})
.on('error', reject)
})
}
Transform Phase
// Transform extracted data
// Runs the configured transformation steps in order over the extracted
// records. Each step's outcome (success, duration, record counts) is
// recorded; a failing step aborts the whole chain unless it is explicitly
// marked optional with `required: false`.
//
// BUG FIX: `recordsIn` is now captured before the step executes. The
// original read records.length after reassignment, so recordsIn always
// equaled recordsOut and filter/aggregate steps reported misleading counts.
async function transformData(data: any, transformations: any[]) {
  let records = data.records
  const transformResults: any[] = []
  for (const transform of transformations) {
    const startTime = Date.now()
    const recordsIn = records.length // input size before this step runs
    try {
      switch (transform.type) {
        case 'function':
          records = await executeTransformFunction(records, transform)
          break
        case 'map':
          records = await mapTransform(records, transform)
          break
        case 'filter':
          records = await filterTransform(records, transform)
          break
        case 'aggregate':
          records = await aggregateTransform(records, transform)
          break
        case 'join':
          records = await joinTransform(records, transform)
          break
        default:
          throw new Error(`Unknown transform type: ${transform.type}`)
      }
      transformResults.push({
        id: transform.id,
        success: true,
        duration: Date.now() - startTime,
        recordsIn,
        recordsOut: records.length,
      })
    } catch (error) {
      transformResults.push({
        id: transform.id,
        success: false,
        error: (error as Error).message,
        duration: Date.now() - startTime,
      })
      // Only steps explicitly marked optional may fail without aborting.
      if (transform.required !== false) {
        throw error
      }
    }
  }
  return {
    records,
    count: records.length,
    transformations: transformResults,
  }
}
// Map transform - field mapping and conversion
// Builds a fresh record per input row: each entry in transform.mappings names
// a target path, the source path to read, and an optional value converter.
async function mapTransform(records: any[], transform: any) {
  const mappingEntries = Object.entries(transform.mappings)
  const project = (record: any) => {
    const shaped: any = {}
    for (const [targetField, mapping] of mappingEntries) {
      const { source, converter } = mapping as any
      const raw = getNestedValue(record, source)
      setNestedValue(shaped, targetField, converter ? applyConverter(raw, converter) : raw)
    }
    return shaped
  }
  return records.map(project)
}
// Filter transform - filter records
// Keeps only the records whose evaluated condition is truthy.
async function filterTransform(records: any[], transform: any) {
  const matches = (record: any) => evaluateFilter(record, transform.condition)
  return records.filter(matches)
}
// Aggregate transform - group and aggregate
// Groups records by transform.groupBy and computes transform.aggregations
// (field -> aggregation function name) per group.
//
// BUG FIX: the original rebuilt group-by values by splitting the joined
// string key, which coerced every value to a string and mis-grouped values
// containing the '|' separator. Each bucket now keeps the original key
// values, and a JSON key avoids separator collisions.
async function aggregateTransform(records: any[], transform: any) {
  const groups = new Map<string, { keyValues: any[]; records: any[] }>()
  // Group records, remembering the original key values for each bucket.
  for (const record of records) {
    const keyValues = transform.groupBy.map((field: string) => getNestedValue(record, field))
    const groupKey = JSON.stringify(keyValues)
    let bucket = groups.get(groupKey)
    if (!bucket) {
      bucket = { keyValues, records: [] }
      groups.set(groupKey, bucket)
    }
    bucket.records.push(record)
  }
  // Aggregate each group.
  const aggregated = []
  for (const { keyValues, records: groupRecords } of groups.values()) {
    const result: any = {}
    // Add group by fields (original, un-coerced values).
    transform.groupBy.forEach((field: string, index: number) => {
      result[field] = keyValues[index]
    })
    // Add aggregations, named `<func>_<field>`.
    for (const [field, aggFunc] of Object.entries(transform.aggregations)) {
      const values = groupRecords.map((r: any) => getNestedValue(r, field))
      result[`${aggFunc}_${field}`] = applyAggregation(values, aggFunc as string)
    }
    aggregated.push(result)
  }
  return aggregated
}
// Apply aggregation function
// Folds a list of values using the named aggregation. Non-numeric values are
// coerced to 0 for sum/avg/min/max; count/count_distinct use the raw values.
// NOTE(review): 'avg' on an empty list yields NaN and 'min'/'max' yield
// +/-Infinity -- confirm callers never aggregate an empty group.
function applyAggregation(values: any[], func: string): any {
switch (func) {
case 'sum':
return values.reduce((sum, v) => sum + (Number(v) || 0), 0)
case 'avg':
return values.reduce((sum, v) => sum + (Number(v) || 0), 0) / values.length
case 'min':
return Math.min(...values.map((v) => Number(v) || 0))
case 'max':
return Math.max(...values.map((v) => Number(v) || 0))
case 'count':
return values.length
case 'count_distinct':
return new Set(values).size
default:
throw new Error(`Unknown aggregation function: ${func}`)
}
}
Load Phase
// Load transformed data to destination
// Routes the load to the writer matching the destination type; throws for
// destination types without a registered writer.
async function loadData(data: any, destination: any) {
  const kind = destination.type
  if (kind === 'database') return await loadToDatabase(data, destination)
  if (kind === 'data-warehouse') return await loadToWarehouse(data, destination)
  if (kind === 'api') return await loadToAPI(data, destination)
  if (kind === 'file') return await loadToFile(data, destination)
  throw new Error(`Unsupported destination type: ${kind}`)
}
// Load to database with upsert
// Writes records to a relational destination in batches (default 1000).
// destination.mode selects insert/upsert/replace; when a batch fails, the
// pipeline either logs and continues (errorHandling === 'continue') or
// aborts. The connection is always released.
async function loadToDatabase(data: any, destination: any) {
  const connection = await getConnection(destination.connection)
  const batchSize = destination.batchSize || 1000
  const records = data.records
  let loaded = 0
  let errors = 0

  // Apply a single batch using the configured write mode.
  const writeBatch = async (batch: any[]) => {
    if (destination.mode === 'insert') {
      await connection.insertMany(destination.table, batch)
    } else if (destination.mode === 'upsert') {
      await connection.upsertMany(destination.table, batch, destination.keyFields)
    } else if (destination.mode === 'replace') {
      await connection.replaceMany(destination.table, batch, destination.keyFields)
    } else {
      throw new Error(`Unknown load mode: ${destination.mode}`)
    }
  }

  try {
    for (let offset = 0; offset < records.length; offset += batchSize) {
      const batch = records.slice(offset, offset + batchSize)
      try {
        await writeBatch(batch)
        loaded += batch.length
      } catch (error) {
        errors += batch.length
        if (destination.errorHandling !== 'continue') {
          throw error
        }
        // Best-effort mode: record the failure and keep going.
        await logLoadError(batch, error)
      }
    }
    return {
      loaded,
      errors,
      total: records.length,
      loadedAt: new Date(),
    }
  } finally {
    await connection.close()
  }
}
// Load to data warehouse (optimized for analytics)
// Bulk-loads into a uniquely-named staging table, then merges staging into
// the target table on the configured key fields so reruns are idempotent.
// NOTE(review): if createTable, bulkLoad, or the MERGE throws, the staging
// table is never dropped -- consider moving cleanup into a try/finally.
// NOTE(review): table and key-field names are interpolated directly into the
// SQL -- confirm they come from trusted configuration, never user input.
async function loadToWarehouse(data: any, destination: any) {
const connection = await getConnection(destination.connection)
try {
// Create staging table
const stagingTable = `${destination.table}_staging_${Date.now()}`
await connection.createTable(stagingTable, data.schema)
// Bulk load to staging
await connection.bulkLoad(stagingTable, data.records)
// Merge from staging to target
await connection.execute(`
MERGE INTO ${destination.table} AS target
USING ${stagingTable} AS source
ON ${destination.keyFields.map((f) => `target.${f} = source.${f}`).join(' AND ')}
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
`)
// Drop staging table
await connection.dropTable(stagingTable)
return {
loaded: data.records.length,
errors: 0,
total: data.records.length,
loadedAt: new Date(),
method: 'bulk-merge',
}
} finally {
await connection.close()
}
}
Complete ETL Pipeline Implementation
// Execute full ETL pipeline
// Entry point for the customer ETL service: on each service request it runs
// extract -> transform -> load, emitting a progress event before each phase,
// persists the execution record, delivers results, and charges per GB.
on($.ServiceRequest.created, async (request) => {
if (request.serviceId !== customerDataPipeline.id) return
// Accumulates per-phase step results and run-level metrics.
// NOTE(review): under strict TypeScript `steps: []` infers never[]; the push
// calls below would need an explicit any[] annotation.
const execution = {
requestId: request.id,
startTime: Date.now(),
steps: [],
metrics: {
recordsExtracted: 0,
recordsTransformed: 0,
recordsLoaded: 0,
bytesProcessed: 0,
errors: 0,
},
}
try {
// Get last successful run time
// Only successful runs advance the incremental watermark; a first run
// falls back to the epoch and extracts everything.
const lastRun = await db.findOne($.PipelineExecution, {
serviceId: customerDataPipeline.id,
status: 'success',
orderBy: { completedAt: 'desc' },
})
const lastRunTime = lastRun?.completedAt || new Date(0)
// EXTRACT
await send($.PipelineProgress.updated, {
requestId: request.id,
phase: 'extract',
status: 'running',
})
const extracted = await extractData(customerDataPipeline.pipeline.source, lastRunTime)
execution.metrics.recordsExtracted = extracted.count
execution.steps.push({
phase: 'extract',
success: true,
recordCount: extracted.count,
})
// TRANSFORM
await send($.PipelineProgress.updated, {
requestId: request.id,
phase: 'transform',
status: 'running',
recordCount: extracted.count,
})
const transformed = await transformData(extracted, customerDataPipeline.pipeline.transformations)
execution.metrics.recordsTransformed = transformed.count
execution.steps.push({
phase: 'transform',
success: true,
recordCount: transformed.count,
transformations: transformed.transformations,
})
// LOAD
await send($.PipelineProgress.updated, {
requestId: request.id,
phase: 'load',
status: 'running',
recordCount: transformed.count,
})
const loaded = await loadData(transformed, customerDataPipeline.pipeline.destination)
execution.metrics.recordsLoaded = loaded.loaded
execution.metrics.errors = loaded.errors
execution.steps.push({
phase: 'load',
success: true,
loaded: loaded.loaded,
errors: loaded.errors,
})
// Calculate bytes processed
// Billing is based on the raw extracted payload, not the transformed output.
execution.metrics.bytesProcessed = calculateBytesProcessed(extracted.records)
// Store execution record
await db.create($.PipelineExecution, {
serviceId: customerDataPipeline.id,
requestId: request.id,
status: 'success',
startedAt: new Date(execution.startTime),
completedAt: new Date(),
duration: Date.now() - execution.startTime,
metrics: execution.metrics,
steps: execution.steps,
})
// Deliver results
await send($.ServiceResult.deliver, {
requestId: request.id,
outputs: {
success: true,
recordsProcessed: execution.metrics.recordsLoaded,
bytesProcessed: execution.metrics.bytesProcessed,
duration: Date.now() - execution.startTime,
steps: execution.steps,
},
})
// Calculate cost
// Per-GB rate with a minimum-charge floor; customers are charged only on
// successful runs (the catch path below performs no billing).
const gbProcessed = execution.metrics.bytesProcessed / (1024 * 1024 * 1024)
const cost = Math.max(gbProcessed * customerDataPipeline.pricing.rate, customerDataPipeline.pricing.minimumCharge)
await send($.Payment.charge, {
customerId: request.customerId,
amount: cost,
description: `ETL Pipeline - ${gbProcessed.toFixed(2)} GB processed`,
})
} catch (error) {
// Any phase failure marks the request failed with the partial step log.
// NOTE(review): failed runs are not persisted as PipelineExecution rows --
// confirm that is intended for auditing.
await send($.ServiceRequest.fail, {
requestId: request.id,
error: error.message,
execution: execution.steps,
metrics: execution.metrics,
})
}
})
Real-Time Stream Processing
// Real-time event stream processor
// Kafka-backed streaming service: parse -> enrich -> 1-minute tumbling-window
// aggregation, fanned out to a database table and a downstream topic.
const eventStreamPipeline = await $.Service.create({
name: 'Real-Time Event Processor',
type: $.ServiceType.Automation,
subtype: 'stream-pipeline',
stream: {
source: {
type: 'kafka',
topic: 'user-events',
consumerGroup: 'analytics-processor',
},
// Processors run in order per event.
processors: [
{
id: 'parse',
type: 'function',
description: 'Parse event data',
},
{
id: 'enrich',
type: 'function',
description: 'Enrich with user context',
},
{
// Window aggregations use 'func' or 'func:field' notation.
id: 'aggregate',
type: 'window',
window: { type: 'tumbling', size: 60000 }, // 1 minute
aggregations: {
event_count: 'count',
unique_users: 'count_distinct:userId',
total_value: 'sum:value',
},
},
],
// Every processed/aggregated event is written to each sink.
sinks: [
{
type: 'database',
connection: 'analytics-db',
table: 'event_aggregates',
},
{
type: 'kafka',
topic: 'processed-events',
},
],
},
pricing: {
model: 'per-hour',
rate: 2.0,
includes: 1000000, // events per hour
overage: 0.001, // per event over limit
},
})
// Stream processor implementation
// Consumes a Kafka topic, runs each message through the configured processor
// chain, writes results to the sinks, and commits offsets per message.
// Windowed processors buffer events in memory and emit on a timer.
class StreamProcessor {
// windowKey -> buffered events awaiting window emission
private windowBuffer: Map<string, any[]> = new Map()
// windowKey -> pending emission timer
private windowTimers: Map<string, NodeJS.Timeout> = new Map()
async processStream(pipeline: any) {
const consumer = await createKafkaConsumer(pipeline.stream.source)
consumer.on('message', async (message) => {
try {
let data = JSON.parse(message.value.toString())
// Process through pipeline
for (const processor of pipeline.stream.processors) {
data = await this.processEvent(data, processor)
}
// Write to sinks
for (const sink of pipeline.stream.sinks) {
await this.writeToSink(data, sink)
}
// Commit offset
// NOTE(review): assumes message.offset is numeric -- confirm the Kafka
// client does not deliver string offsets (offset + 1 would concatenate).
await consumer.commitOffsets([
{
topic: message.topic,
partition: message.partition,
offset: message.offset + 1,
},
])
} catch (error) {
await this.handleStreamError(message, error)
}
})
await consumer.subscribe([pipeline.stream.source.topic])
await consumer.run()
}
// Dispatch one event to the handler for the processor type; unknown types
// pass the event through unchanged.
async processEvent(data: any, processor: any) {
switch (processor.type) {
case 'function':
return await processor.handler(data)
case 'window':
return await this.processWindow(data, processor)
case 'join':
return await this.processJoin(data, processor)
default:
return data
}
}
// Buffer the event into its window; a timer emits the window window.size ms
// after the first event arrives. Returns null so buffered events produce no
// direct output downstream.
async processWindow(event: any, processor: any) {
const windowKey = this.getWindowKey(event, processor.window)
// Add to window buffer
if (!this.windowBuffer.has(windowKey)) {
this.windowBuffer.set(windowKey, [])
// Set timer to process window
const timer = setTimeout(() => this.emitWindow(windowKey, processor), processor.window.size)
this.windowTimers.set(windowKey, timer)
}
this.windowBuffer.get(windowKey)!.push(event)
// Return null to prevent immediate output
return null
}
// Drain a window's buffer, compute its aggregations, and write the summary
// to the sinks.
// NOTE(review): this reads processor.pipeline.stream.sinks, but nothing in
// this class ever sets processor.pipeline -- likely a crash when a window
// fires. Confirm where the pipeline reference should come from.
async emitWindow(windowKey: string, processor: any) {
const events = this.windowBuffer.get(windowKey) || []
this.windowBuffer.delete(windowKey)
this.windowTimers.delete(windowKey)
// Calculate aggregations
const result: any = {
windowStart: new Date(parseInt(windowKey)),
windowEnd: new Date(parseInt(windowKey) + processor.window.size),
eventCount: events.length,
}
// Aggregation spec format: 'func' or 'func:field'.
for (const [name, aggFunc] of Object.entries(processor.aggregations)) {
const [func, field] = (aggFunc as string).split(':')
const values = field ? events.map((e) => e[field]) : events
result[name] = applyAggregation(values, func)
}
// Write aggregated result to sinks
for (const sink of processor.pipeline.stream.sinks) {
await this.writeToSink(result, sink)
}
}
// Compute the buffer key that assigns this event to a window.
getWindowKey(event: any, window: any): string {
const timestamp = event.timestamp || Date.now()
switch (window.type) {
case 'tumbling':
// Round down to window boundary
return String(Math.floor(timestamp / window.size) * window.size)
case 'sliding':
// Multiple overlapping windows
// NOTE(review): only the most recent overlapping window is returned, so
// sliding-window semantics are incomplete here.
const windows = []
let windowStart = Math.floor(timestamp / window.slide) * window.slide
while (windowStart <= timestamp && windowStart > timestamp - window.size) {
windows.push(String(windowStart))
windowStart -= window.slide
}
return windows[0] // Return most recent for now
case 'session':
// Session windows with timeout
// NOTE(review): including the raw timestamp gives every event its own
// key -- confirm the intended sessionization.
return `${event.userId}_${timestamp}`
default:
return String(timestamp)
}
}
// Write one result to a single sink; null results (buffered window events)
// are skipped.
async writeToSink(data: any, sink: any) {
if (!data) return // Skip null data (e.g., from windowing)
switch (sink.type) {
case 'database':
// NOTE(review): opens and closes a connection per record -- consider
// connection pooling for throughput.
const connection = await getConnection(sink.connection)
await connection.insert(sink.table, data)
await connection.close()
break
case 'kafka':
const producer = await createKafkaProducer()
await producer.send({
topic: sink.topic,
messages: [{ value: JSON.stringify(data) }],
})
break
case 'api':
await fetch(sink.endpoint, {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify(data),
})
break
}
}
}
Data Quality and Validation
// Data quality pipeline
// Rule-driven validation service: each rule carries a severity, and the
// `actions` map decides what happens per severity (reject / flag / log).
const dataQualityPipeline = await $.Service.create({
name: 'Data Quality Checker',
type: $.ServiceType.Automation,
subtype: 'data-pipeline',
validation: {
rules: [
{
id: 'completeness',
type: 'required-fields',
fields: ['id', 'email', 'created_at'],
severity: 'error',
},
{
id: 'format',
type: 'regex',
field: 'email',
pattern: '^[^@]+@[^@]+\\.[^@]+$',
severity: 'error',
},
{
id: 'range',
type: 'numeric-range',
field: 'age',
min: 0,
max: 150,
severity: 'warning',
},
{
id: 'uniqueness',
type: 'unique',
field: 'email',
severity: 'error',
},
{
// Referential integrity: customer_id must exist in customers.id.
id: 'referential',
type: 'foreign-key',
field: 'customer_id',
reference: { table: 'customers', field: 'id' },
severity: 'error',
},
],
actions: {
error: 'reject',
warning: 'flag',
info: 'log',
},
},
pricing: {
model: 'per-record',
rate: 0.001, // $0.001 per record
minimumCharge: 0.1,
},
})
// Validate data quality
// Validates each record in a batch against every rule and returns per-record
// pass/fail tallies plus the individual violations (each with the record
// index, the record itself, the rule id, and the violation message).
//
// BUG FIX: the original result literal declared `warnings` twice (a numeric
// count and an array), which is a compile error in TypeScript; at runtime the
// count was silently dropped. Only the array form is kept, matching how the
// field is used below, so the runtime shape is unchanged.
async function validateDataQuality(records: any[], rules: any[]) {
  const results = {
    total: records.length,
    passed: 0,
    failed: 0,
    errors: [] as any[],
    warnings: [] as any[],
  }
  // Shared across the batch so 'unique' rules can detect duplicates.
  const seenValues = new Map()
  for (let i = 0; i < records.length; i++) {
    const record = records[i]
    let recordValid = true
    for (const rule of rules) {
      const violation = await validateRule(record, rule, seenValues, i)
      if (!violation) continue
      const entry = {
        recordIndex: i,
        record,
        rule: rule.id,
        message: violation,
      }
      if (rule.severity === 'error') {
        results.errors.push(entry)
        // Only error-severity violations fail the record.
        recordValid = false
      } else if (rule.severity === 'warning') {
        results.warnings.push(entry)
      }
    }
    if (recordValid) {
      results.passed++
    } else {
      results.failed++
    }
  }
  return results
}
// Validates one record against one rule; returns a human-readable violation
// message, or null when the rule passes.
// NOTE(review): 'required-fields' uses a truthiness check, so 0, '' and
// false count as missing -- confirm that is intended.
// NOTE(review): 'numeric-range' flags records where the field is absent
// (Number(undefined) is NaN), while 'regex' skips absent values -- the two
// rules treat optional fields inconsistently.
async function validateRule(record: any, rule: any, seenValues: Map<string, Set<any>>, index: number): Promise<string | null> {
switch (rule.type) {
case 'required-fields':
for (const field of rule.fields) {
if (!record[field]) {
return `Missing required field: ${field}`
}
}
return null
case 'regex':
const value = record[rule.field]
if (value && !new RegExp(rule.pattern).test(value)) {
return `Field ${rule.field} does not match pattern: ${rule.pattern}`
}
return null
case 'numeric-range':
const numValue = Number(record[rule.field])
if (isNaN(numValue) || numValue < rule.min || numValue > rule.max) {
return `Field ${rule.field} out of range: ${rule.min} - ${rule.max}`
}
return null
case 'unique':
const fieldValue = record[rule.field]
// seenValues is shared across the whole batch, so duplicates are caught
// per field across records.
if (!seenValues.has(rule.field)) {
seenValues.set(rule.field, new Set())
}
const seen = seenValues.get(rule.field)!
if (seen.has(fieldValue)) {
return `Duplicate value for ${rule.field}: ${fieldValue}`
}
seen.add(fieldValue)
return null
case 'foreign-key':
const fkValue = record[rule.field]
const exists = await checkForeignKeyExists(rule.reference.table, rule.reference.field, fkValue)
if (!exists) {
return `Invalid foreign key ${rule.field}: ${fkValue} not found in ${rule.reference.table}`
}
return null
default:
return null
}
}
Batch Processing Pipelines
// Large-scale batch processing
// S3-to-S3 batch job: matching CSV exports are read in 10k-record chunks and
// written back as snappy-compressed parquet.
const batchProcessingPipeline = await $.Service.create({
name: 'Batch Data Processor',
type: $.ServiceType.Automation,
subtype: 'batch-pipeline',
batch: {
source: {
type: 's3',
bucket: 'raw-data',
prefix: 'daily-exports/',
pattern: '*.csv',
},
processing: {
chunkSize: 10000, // Process 10k records at a time
parallelism: 4, // 4 parallel workers
transformations: [
{ id: 'parse', type: 'function' },
{ id: 'cleanse', type: 'function' },
{ id: 'transform', type: 'function' },
],
},
destination: {
type: 's3',
bucket: 'processed-data',
prefix: 'processed/',
format: 'parquet',
compression: 'snappy',
},
},
pricing: {
model: 'per-gb',
rate: 0.5,
minimumCharge: 0.1,
},
})
// Batch processor with parallel workers
// Lists matching source files, streams each one in configurable chunks
// through the transformation steps, writes the results back to the
// destination bucket, and reports byte-level progress.
// NOTE(review): the `workers` created below are never awaited or used, and
// Promise.all launches every file at once -- the configured parallelism is
// not actually enforced. Confirm intended concurrency model.
// NOTE(review): parseStream reads pipeline.batch.source.format, but the
// source config above defines `pattern`, not `format` -- verify.
async function processBatchPipeline(pipeline: any, request: any) {
// List all files to process
const files = await listS3Files(pipeline.batch.source.bucket, pipeline.batch.source.prefix, pipeline.batch.source.pattern)
const totalBytes = files.reduce((sum, f) => sum + f.size, 0)
let processedBytes = 0
// Process files in parallel
const workers = []
for (let i = 0; i < pipeline.batch.processing.parallelism; i++) {
workers.push(createBatchWorker(i, pipeline))
}
// Distribute files to workers
const fileQueue = [...files]
const processFile = async (file: any) => {
// Download and parse file
const stream = await downloadS3File(file.bucket, file.key)
const chunks = []
// Process in chunks
let chunk = []
let chunkSize = 0
for await (const record of parseStream(stream, pipeline.batch.source.format)) {
chunk.push(record)
chunkSize++
if (chunkSize >= pipeline.batch.processing.chunkSize) {
chunks.push([...chunk])
chunk = []
chunkSize = 0
}
}
// Flush the final partial chunk.
if (chunk.length > 0) {
chunks.push(chunk)
}
// Process chunks
// NOTE(review): the loop variable `chunk` shadows the outer accumulator.
const results = []
for (const chunk of chunks) {
const transformed = await transformChunk(chunk, pipeline.batch.processing.transformations)
results.push(...transformed)
}
// Write results
const outputKey = `${pipeline.batch.destination.prefix}${file.key.split('/').pop()}`
await writeToS3(pipeline.batch.destination.bucket, outputKey, results, pipeline.batch.destination.format, pipeline.batch.destination.compression)
processedBytes += file.size
// Update progress
await send($.PipelineProgress.updated, {
requestId: request.id,
processedBytes,
totalBytes,
progress: processedBytes / totalBytes,
})
return {
file: file.key,
recordsProcessed: results.length,
bytesProcessed: file.size,
}
}
// Process all files
const results = await Promise.all(fileQueue.map((file) => processFile(file)))
return {
filesProcessed: results.length,
totalRecords: results.reduce((sum, r) => sum + r.recordsProcessed, 0),
totalBytes: processedBytes,
}
}
Pricing Models for Pipelines
Per-GB Pricing
// Tiered per-GB pricing: the tier matching the processed volume sets the rate.
const pipeline = await $.Service.create({
name: 'Data Transfer Pipeline',
pricing: {
model: 'per-gb',
rate: 1.0,
minimumCharge: 0.1,
tiers: [
{ min: 0, max: 10, rate: 1.0 },
{ min: 10, max: 100, rate: 0.75 },
{ min: 100, max: 1000, rate: 0.5 },
{ min: 1000, max: Infinity, rate: 0.25 },
],
},
})
Per-Hour Streaming
// Hourly streaming price with an included event quota and per-event overage.
const pipeline = await $.Service.create({
name: 'Stream Processor',
pricing: {
model: 'per-hour',
rate: 2.0,
includes: 1000000, // events per hour
overage: 0.001, // per additional event
},
})
Per-Record Pricing
// Per-record pricing with volume discounts at higher record counts.
const pipeline = await $.Service.create({
name: 'Record Validator',
pricing: {
model: 'per-record',
rate: 0.001,
minimumCharge: 0.1,
volume: [
{ min: 0, max: 10000, rate: 0.001 },
{ min: 10001, max: 100000, rate: 0.0005 },
{ min: 100001, max: Infinity, rate: 0.0001 },
],
},
})
Best Practices
1. Incremental Processing
Only process new or changed data:
// Track last successful run
// The most recent successful execution marks the incremental watermark.
const lastRun = await db.findOne($.PipelineExecution, {
serviceId: pipeline.id,
status: 'success',
orderBy: { completedAt: 'desc' },
})
// First run (no prior success) falls back to the epoch and processes everything.
const watermark = lastRun?.completedAt || new Date(0)
// Query only changed data
const query = `
SELECT * FROM source_table
WHERE updated_at > ?
ORDER BY updated_at
`
2. Error Handling and Dead Letter Queues
// Failed record handling
// Dead-letters a record that could not be processed and raises an alert when
// failures spike (more than 100 in the trailing hour).
// NOTE(review): `pipeline` is a free variable here, not a parameter --
// confirm the enclosing scope provides it.
async function handleFailedRecord(record: any, error: Error) {
// Write to dead letter queue
await db.create($.FailedRecord, {
pipelineId: pipeline.id,
record,
error: error.message,
timestamp: new Date(),
})
// Alert if too many failures
const recentFailures = await db.count($.FailedRecord, {
where: {
pipelineId: pipeline.id,
timestamp: { gte: new Date(Date.now() - 3600000) }, // Last hour
},
})
if (recentFailures > 100) {
await send($.Alert.create, {
severity: 'high',
message: `Pipeline ${pipeline.name} has ${recentFailures} failures in last hour`,
})
}
}
3. Monitoring and Observability
// Pipeline metrics
// Persists one metrics row per execution for dashboards and alerting.
// NOTE(review): throughput and errorRate divide by duration and
// recordsExtracted; a zero-duration or zero-record run yields Infinity/NaN
// -- confirm upstream guarantees.
async function trackPipelineMetrics(execution: any) {
await db.create($.PipelineMetric, {
pipelineId: execution.pipelineId,
timestamp: new Date(),
metrics: {
recordsProcessed: execution.metrics.recordsLoaded,
bytesProcessed: execution.metrics.bytesProcessed,
duration: execution.duration,
throughput: execution.metrics.recordsLoaded / (execution.duration / 1000),
errorRate: execution.metrics.errors / execution.metrics.recordsExtracted,
},
})
}
Real-World Examples
Customer Data Sync
// Sync customers every 4 hours, billed per GB moved.
const customerSyncPipeline = await $.Service.create({
name: 'Customer Data Sync',
schedule: { cron: '0 */4 * * *' },
pricing: { model: 'per-gb', rate: 1.0 },
})
Log Aggregation
// Continuously aggregate logs from a Kafka topic, billed hourly.
const logAggregationPipeline = await $.Service.create({
name: 'Log Aggregation',
stream: { source: { type: 'kafka', topic: 'logs' } },
pricing: { model: 'per-hour', rate: 3.0 },
})
Analytics Data Warehouse
// Nightly (02:00) warehouse load, billed per GB.
const dataWarehousePipeline = await $.Service.create({
name: 'Analytics ETL',
schedule: { cron: '0 2 * * *' },
pricing: { model: 'per-gb', rate: 0.5 },
})
Next Steps
- Workflow Automation → - Multi-step workflows
- Business Process Automation → - Complete processes
- Integration Services → - System connectivity
- Service Composition → - Combine services
- Monetization → - Pricing strategies
Business Process Automation
Build services that automate complete business processes including invoice processing, expense management, scheduling, approvals, and document workflows
Integration Automation Services
Build services that orchestrate multi-system workflows, API choreography, event routing, service coordination, and cross-platform automation