.do
Service Types › Automation

Data Pipeline Services

Build ETL, data transformation, real-time and batch processing services for moving and transforming data at scale

Build data pipeline services that extract, transform, and load data between systems, enabling real-time streaming, batch processing, data enrichment, and quality assurance at scale.

Overview

Data pipeline services automate the movement and transformation of data across systems. They're essential for:

  • ETL Operations: Extract, Transform, Load workflows
  • Data Synchronization: Keep systems in sync in real-time
  • Data Warehousing: Load data into analytics platforms
  • Data Enrichment: Enhance data with additional information
  • Real-time Processing: Stream processing for immediate insights
  • Batch Processing: Process large datasets efficiently

Core Concepts

Pipeline Architecture

import $, { db, on, send, every } from 'sdk.do'

// Define ETL pipeline service
// Scheduled (every 6 hours) incremental ETL: CRM database → normalize/enrich/
// validate → analytics warehouse, billed per GB processed.
const customerDataPipeline = await $.Service.create({
  name: 'Customer Data ETL Pipeline',
  type: $.ServiceType.Automation,
  subtype: 'data-pipeline',

  // Pipeline configuration
  pipeline: {
    source: {
      type: 'database',
      connection: 'crm-database',
      // '?' is bound to the previous successful run time (incremental load)
      query: 'SELECT * FROM customers WHERE updated_at > ?',
      incremental: true,
    },
    transformations: [
      {
        id: 'normalize',
        type: 'function',
        description: 'Normalize customer data',
      },
      {
        id: 'enrich',
        type: 'function',
        description: 'Enrich with third-party data',
      },
      {
        id: 'validate',
        type: 'function',
        description: 'Validate data quality',
      },
    ],
    destination: {
      type: 'data-warehouse',
      connection: 'analytics-warehouse',
      table: 'customers',
      // upsert: update matching rows, insert new ones
      mode: 'upsert',
    },
  },

  // Schedule
  schedule: {
    type: 'cron',
    expression: '0 */6 * * *', // Every 6 hours
    timezone: 'UTC',
  },

  // Pricing
  pricing: {
    model: 'per-gb',
    rate: 1.0,
    minimumCharge: 0.1,
    additionalCosts: {
      enrichment: 0.5, // per 1000 records
    },
  },
})

Building ETL Pipelines

Extract Phase

// Route extraction to the handler matching the configured source type.
// Throws for source types with no registered handler.
async function extractData(source: any, lastRunTime: Date) {
  const handlers: Record<string, () => Promise<any>> = {
    database: () => extractFromDatabase(source, lastRunTime),
    api: () => extractFromAPI(source, lastRunTime),
    file: () => extractFromFile(source),
    stream: () => extractFromStream(source),
  }

  const handler = handlers[source.type]
  if (!handler) {
    throw new Error(`Unsupported source type: ${source.type}`)
  }
  return await handler()
}

// Database extraction: pull rows from a relational source, optionally limited
// to rows changed since the last successful run (incremental mode).
async function extractFromDatabase(source: any, lastRunTime: Date) {
  const connection = await getConnection(source.connection)

  try {
    // Incremental queries bind the watermark as the single query parameter.
    const args = source.incremental ? [source.query, lastRunTime] : [source.query]
    const results = await connection.query(...args)

    return {
      records: results,
      count: results.length,
      extractedAt: new Date(),
      source: 'database',
      incremental: source.incremental,
    }
  } finally {
    // Release the connection even when the query fails.
    await connection.close()
  }
}

// API extraction with pagination: walk pages until the server reports
// `has_more: false`, throttling between requests.
//
// Bug fix: the Fetch API has no `params` option — query parameters passed that
// way were silently dropped, so pagination and the incremental `updated_since`
// filter never reached the server. Parameters are now encoded into the URL.
// Also added an HTTP status check so server errors fail loudly instead of
// producing a crash on `data.results`.
async function extractFromAPI(source: any, lastRunTime: Date) {
  const records: any[] = []
  let page = 1
  let hasMore = true

  while (hasMore) {
    const url = new URL(source.endpoint)
    url.searchParams.set('page', String(page))
    url.searchParams.set('per_page', '100')
    url.searchParams.set('updated_since', lastRunTime.toISOString())

    const response = await fetch(url.toString(), {
      method: 'GET',
      headers: {
        Authorization: `Bearer ${source.apiKey}`,
        'Content-Type': 'application/json',
      },
    })

    if (!response.ok) {
      throw new Error(`API extraction failed with status ${response.status}`)
    }

    const data = await response.json()
    records.push(...data.results)

    hasMore = data.has_more
    page++

    // Rate limiting
    await sleep(source.rateLimitDelay || 100)
  }

  return {
    records,
    count: records.length,
    extractedAt: new Date(),
    source: 'api',
    pages: page - 1,
  }
}

// File extraction with streaming: parse the file (csv, json, parquet, ...)
// through a format-specific parser and collect every record.
async function extractFromFile(source: any) {
  const stream = await openFileStream(source.path)

  return new Promise((resolve, reject) => {
    const collected: any[] = []
    const parser = createParser(source.format)

    stream
      .pipe(parser)
      .on('data', (record) => collected.push(record))
      .on('error', reject)
      .on('end', () =>
        resolve({
          records: collected,
          count: collected.length,
          extractedAt: new Date(),
          source: 'file',
          format: source.format,
        }),
      )
  })
}

Transform Phase

// Transform extracted data: run the configured transformation steps in order,
// recording per-step duration and input/output record counts. A failing step
// aborts the pipeline unless it is explicitly marked `required: false`.
//
// Bug fix: `recordsIn` was read from `records` *after* the step had replaced
// it, so the reported input and output counts were always identical. The
// input count is now captured before each step runs.
async function transformData(data: any, transformations: any[]) {
  let records = data.records
  const transformResults = []

  for (const transform of transformations) {
    const startTime = Date.now()
    const recordsIn = records.length // capture before the step replaces `records`

    try {
      switch (transform.type) {
        case 'function':
          records = await executeTransformFunction(records, transform)
          break
        case 'map':
          records = await mapTransform(records, transform)
          break
        case 'filter':
          records = await filterTransform(records, transform)
          break
        case 'aggregate':
          records = await aggregateTransform(records, transform)
          break
        case 'join':
          records = await joinTransform(records, transform)
          break
        default:
          throw new Error(`Unknown transform type: ${transform.type}`)
      }

      transformResults.push({
        id: transform.id,
        success: true,
        duration: Date.now() - startTime,
        recordsIn,
        recordsOut: records.length,
      })
    } catch (error) {
      transformResults.push({
        id: transform.id,
        success: false,
        error: error.message,
        duration: Date.now() - startTime,
      })

      // Optional steps (`required: false`) record the failure and continue.
      if (transform.required !== false) {
        throw error
      }
    }
  }

  return {
    records,
    count: records.length,
    transformations: transformResults,
  }
}

// Map transform - field mapping and conversion: build one output record per
// input by copying each mapped field, running its optional value converter.
async function mapTransform(records: any[], transform: any) {
  const fieldMappings = Object.entries(transform.mappings)

  return records.map((record) => {
    const output: any = {}

    for (const [targetField, mapping] of fieldMappings) {
      const { source, converter } = mapping as any
      const raw = getNestedValue(record, source)
      const value = converter ? applyConverter(raw, converter) : raw
      setNestedValue(output, targetField, value)
    }

    return output
  })
}

// Filter transform - keep only records satisfying the configured condition.
async function filterTransform(records: any[], transform: any) {
  const keep = (record: any) => evaluateFilter(record, transform.condition)
  return records.filter(keep)
}

// Aggregate transform - group records by the configured fields, then compute
// each configured aggregation per group.
//
// Bug fix: group keys were built by joining values with '|', which collides
// whenever a value itself contains '|', and splitting the key back
// stringified every group-by value (numbers came out as strings). Keys are
// now JSON-encoded and the original key values are stored alongside each
// group, so output types are preserved.
async function aggregateTransform(records: any[], transform: any) {
  const groups = new Map<string, { keyValues: any[]; records: any[] }>()

  // Group records by a collision-safe composite key.
  for (const record of records) {
    const keyValues = transform.groupBy.map((field: string) => getNestedValue(record, field))
    const groupKey = JSON.stringify(keyValues)

    let group = groups.get(groupKey)
    if (!group) {
      group = { keyValues, records: [] }
      groups.set(groupKey, group)
    }
    group.records.push(record)
  }

  // Aggregate each group
  const aggregated = []

  for (const group of groups.values()) {
    const result: any = {}

    // Group-by fields keep their original (un-stringified) values.
    transform.groupBy.forEach((field: string, index: number) => {
      result[field] = group.keyValues[index]
    })

    // Add aggregations, named `<func>_<field>`.
    for (const [field, aggFunc] of Object.entries(transform.aggregations)) {
      const values = group.records.map((r: any) => getNestedValue(r, field))
      result[`${aggFunc}_${field}`] = applyAggregation(values, aggFunc as string)
    }

    aggregated.push(result)
  }

  return aggregated
}

// Apply aggregation function: reduce a list of values with the named function.
// Non-numeric values are coerced to 0 for sum/avg/min/max.
//
// Bug fix: `avg` over an empty list returned NaN, and `min`/`max` over an
// empty list returned Infinity/-Infinity (Math.min()/Math.max() with no
// arguments); all three now return 0 for empty input.
function applyAggregation(values: any[], func: string): any {
  const numeric = () => values.map((v) => Number(v) || 0)

  switch (func) {
    case 'sum':
      return numeric().reduce((sum, v) => sum + v, 0)
    case 'avg':
      return values.length === 0 ? 0 : numeric().reduce((sum, v) => sum + v, 0) / values.length
    case 'min':
      return values.length === 0 ? 0 : Math.min(...numeric())
    case 'max':
      return values.length === 0 ? 0 : Math.max(...numeric())
    case 'count':
      return values.length
    case 'count_distinct':
      return new Set(values).size
    default:
      throw new Error(`Unknown aggregation function: ${func}`)
  }
}

Load Phase

// Route loading to the handler matching the configured destination type.
// Throws for destination types with no registered handler.
async function loadData(data: any, destination: any) {
  const loaders: Record<string, () => Promise<any>> = {
    database: () => loadToDatabase(data, destination),
    'data-warehouse': () => loadToWarehouse(data, destination),
    api: () => loadToAPI(data, destination),
    file: () => loadToFile(data, destination),
  }

  const loader = loaders[destination.type]
  if (!loader) {
    throw new Error(`Unsupported destination type: ${destination.type}`)
  }
  return await loader()
}

// Load to database with upsert: write records to a relational destination in
// batches, honoring the configured write mode ('insert' | 'upsert' |
// 'replace') and error-handling strategy.
async function loadToDatabase(data: any, destination: any) {
  const connection = await getConnection(destination.connection)
  const batchSize = destination.batchSize || 1000
  let loaded = 0
  let errors = 0

  // Write one batch using the configured mode.
  const writeBatch = async (batch: any[]) => {
    switch (destination.mode) {
      case 'insert':
        await connection.insertMany(destination.table, batch)
        break
      case 'upsert':
        await connection.upsertMany(destination.table, batch, destination.keyFields)
        break
      case 'replace':
        await connection.replaceMany(destination.table, batch, destination.keyFields)
        break
      default:
        throw new Error(`Unknown load mode: ${destination.mode}`)
    }
  }

  try {
    for (let offset = 0; offset < data.records.length; offset += batchSize) {
      const batch = data.records.slice(offset, offset + batchSize)

      try {
        await writeBatch(batch)
        loaded += batch.length
      } catch (error) {
        errors += batch.length

        // 'continue' strategy: record the failure and move to the next batch;
        // otherwise abort the whole load.
        if (destination.errorHandling === 'continue') {
          await logLoadError(batch, error)
        } else {
          throw error
        }
      }
    }

    return {
      loaded,
      errors,
      total: data.records.length,
      loadedAt: new Date(),
    }
  } finally {
    await connection.close()
  }
}

// Load to data warehouse (optimized for analytics): bulk-load into a
// timestamped staging table, then MERGE staging into the target on the
// configured key fields.
//
// Bug fix: the staging table was dropped only on the success path, so a
// failed bulk load or merge leaked a `<table>_staging_<ts>` table in the
// warehouse. Cleanup now runs in a `finally` block.
//
// NOTE(review): table and key-field names are interpolated directly into the
// SQL text; they must come from trusted configuration, never from user input.
async function loadToWarehouse(data: any, destination: any) {
  const connection = await getConnection(destination.connection)

  try {
    // Create staging table
    const stagingTable = `${destination.table}_staging_${Date.now()}`
    await connection.createTable(stagingTable, data.schema)

    try {
      // Bulk load to staging
      await connection.bulkLoad(stagingTable, data.records)

      // Merge from staging to target
      await connection.execute(`
      MERGE INTO ${destination.table} AS target
      USING ${stagingTable} AS source
      ON ${destination.keyFields.map((f) => `target.${f} = source.${f}`).join(' AND ')}
      WHEN MATCHED THEN UPDATE SET *
      WHEN NOT MATCHED THEN INSERT *
    `)
    } finally {
      // Drop staging table even when load/merge failed.
      await connection.dropTable(stagingTable)
    }

    return {
      loaded: data.records.length,
      errors: 0,
      total: data.records.length,
      loadedAt: new Date(),
      method: 'bulk-merge',
    }
  } finally {
    await connection.close()
  }
}

Complete ETL Pipeline Implementation

// Execute full ETL pipeline
// Event-driven entry point: for each service request targeting this pipeline,
// run extract → transform → load with progress events, persist an execution
// record, deliver the result, and charge per GB processed.
// NOTE(review): failed runs are reported via ServiceRequest.fail but no failed
// PipelineExecution row is written, so the incremental watermark always comes
// from the last *successful* run — confirm that is intended.
on($.ServiceRequest.created, async (request) => {
  // Ignore requests addressed to other services.
  if (request.serviceId !== customerDataPipeline.id) return

  const execution = {
    requestId: request.id,
    startTime: Date.now(),
    steps: [],
    metrics: {
      recordsExtracted: 0,
      recordsTransformed: 0,
      recordsLoaded: 0,
      bytesProcessed: 0,
      errors: 0,
    },
  }

  try {
    // Get last successful run time (incremental watermark)
    const lastRun = await db.findOne($.PipelineExecution, {
      serviceId: customerDataPipeline.id,
      status: 'success',
      orderBy: { completedAt: 'desc' },
    })

    // Epoch fallback: the very first run processes everything.
    const lastRunTime = lastRun?.completedAt || new Date(0)

    // EXTRACT
    await send($.PipelineProgress.updated, {
      requestId: request.id,
      phase: 'extract',
      status: 'running',
    })

    const extracted = await extractData(customerDataPipeline.pipeline.source, lastRunTime)

    execution.metrics.recordsExtracted = extracted.count
    execution.steps.push({
      phase: 'extract',
      success: true,
      recordCount: extracted.count,
    })

    // TRANSFORM
    await send($.PipelineProgress.updated, {
      requestId: request.id,
      phase: 'transform',
      status: 'running',
      recordCount: extracted.count,
    })

    const transformed = await transformData(extracted, customerDataPipeline.pipeline.transformations)

    execution.metrics.recordsTransformed = transformed.count
    execution.steps.push({
      phase: 'transform',
      success: true,
      recordCount: transformed.count,
      transformations: transformed.transformations,
    })

    // LOAD
    await send($.PipelineProgress.updated, {
      requestId: request.id,
      phase: 'load',
      status: 'running',
      recordCount: transformed.count,
    })

    const loaded = await loadData(transformed, customerDataPipeline.pipeline.destination)

    execution.metrics.recordsLoaded = loaded.loaded
    execution.metrics.errors = loaded.errors
    execution.steps.push({
      phase: 'load',
      success: true,
      loaded: loaded.loaded,
      errors: loaded.errors,
    })

    // Calculate bytes processed
    // NOTE(review): billing is based on the *extracted* record volume, not the
    // (possibly filtered/aggregated) loaded volume — confirm intended.
    execution.metrics.bytesProcessed = calculateBytesProcessed(extracted.records)

    // Store execution record
    await db.create($.PipelineExecution, {
      serviceId: customerDataPipeline.id,
      requestId: request.id,
      status: 'success',
      startedAt: new Date(execution.startTime),
      completedAt: new Date(),
      duration: Date.now() - execution.startTime,
      metrics: execution.metrics,
      steps: execution.steps,
    })

    // Deliver results
    await send($.ServiceResult.deliver, {
      requestId: request.id,
      outputs: {
        success: true,
        recordsProcessed: execution.metrics.recordsLoaded,
        bytesProcessed: execution.metrics.bytesProcessed,
        duration: Date.now() - execution.startTime,
        steps: execution.steps,
      },
    })

    // Calculate cost (per-GB rate with a floor at minimumCharge)
    const gbProcessed = execution.metrics.bytesProcessed / (1024 * 1024 * 1024)
    const cost = Math.max(gbProcessed * customerDataPipeline.pricing.rate, customerDataPipeline.pricing.minimumCharge)

    await send($.Payment.charge, {
      customerId: request.customerId,
      amount: cost,
      description: `ETL Pipeline - ${gbProcessed.toFixed(2)} GB processed`,
    })
  } catch (error) {
    // Any phase failure lands here; report with whatever partial step and
    // metric context was accumulated before the failure.
    await send($.ServiceRequest.fail, {
      requestId: request.id,
      error: error.message,
      execution: execution.steps,
      metrics: execution.metrics,
    })
  }
})

Real-Time Stream Processing

// Real-time event stream processor
// Kafka-sourced streaming service: parse → enrich → 1-minute tumbling-window
// aggregation, fanned out to a database table and a downstream Kafka topic.
const eventStreamPipeline = await $.Service.create({
  name: 'Real-Time Event Processor',
  type: $.ServiceType.Automation,
  subtype: 'stream-pipeline',

  stream: {
    source: {
      type: 'kafka',
      topic: 'user-events',
      consumerGroup: 'analytics-processor',
    },
    processors: [
      {
        id: 'parse',
        type: 'function',
        description: 'Parse event data',
      },
      {
        id: 'enrich',
        type: 'function',
        description: 'Enrich with user context',
      },
      {
        id: 'aggregate',
        type: 'window',
        window: { type: 'tumbling', size: 60000 }, // 1 minute
        // Spec format is 'func' or 'func:field' (e.g. count distinct userIds)
        aggregations: {
          event_count: 'count',
          unique_users: 'count_distinct:userId',
          total_value: 'sum:value',
        },
      },
    ],
    // Every processed event/window result is written to ALL sinks.
    sinks: [
      {
        type: 'database',
        connection: 'analytics-db',
        table: 'event_aggregates',
      },
      {
        type: 'kafka',
        topic: 'processed-events',
      },
    ],
  },

  pricing: {
    model: 'per-hour',
    rate: 2.0,
    includes: 1000000, // events per hour
    overage: 0.001, // per event over limit
  },
})

// Stream processor implementation
// Consumes a Kafka stream, runs each event through the configured processors
// (functions, tumbling/sliding/session windows, joins), and writes results to
// the configured sinks.
//
// Bug fix: emitWindow() previously read `processor.pipeline.stream.sinks`, but
// the processor config objects (see the service definition above) carry no
// `pipeline` back-reference, so every window flush crashed at runtime. The
// class now remembers the pipeline handed to processStream() and uses its
// sinks. Window timers are also cleared when a window is emitted.
class StreamProcessor {
  // Events buffered per window key, awaiting the window's flush timer.
  private windowBuffer: Map<string, any[]> = new Map()
  private windowTimers: Map<string, NodeJS.Timeout> = new Map()
  // Pipeline currently being processed; set by processStream().
  private pipeline: any = null

  async processStream(pipeline: any) {
    this.pipeline = pipeline
    const consumer = await createKafkaConsumer(pipeline.stream.source)

    consumer.on('message', async (message) => {
      try {
        let data = JSON.parse(message.value.toString())

        // Process through pipeline; a window processor returns null while it
        // buffers, and writeToSink() skips null payloads.
        for (const processor of pipeline.stream.processors) {
          data = await this.processEvent(data, processor)
        }

        // Write to sinks
        for (const sink of pipeline.stream.sinks) {
          await this.writeToSink(data, sink)
        }

        // Commit offset only after successful processing (at-least-once).
        await consumer.commitOffsets([
          {
            topic: message.topic,
            partition: message.partition,
            offset: message.offset + 1,
          },
        ])
      } catch (error) {
        await this.handleStreamError(message, error)
      }
    })

    await consumer.subscribe([pipeline.stream.source.topic])
    await consumer.run()
  }

  // Dispatch one event to the matching processor type; unknown types pass
  // the event through unchanged.
  async processEvent(data: any, processor: any) {
    switch (processor.type) {
      case 'function':
        return await processor.handler(data)

      case 'window':
        return await this.processWindow(data, processor)

      case 'join':
        return await this.processJoin(data, processor)

      default:
        return data
    }
  }

  async processWindow(event: any, processor: any) {
    const windowKey = this.getWindowKey(event, processor.window)

    // First event of a window starts its flush timer.
    if (!this.windowBuffer.has(windowKey)) {
      this.windowBuffer.set(windowKey, [])

      const timer = setTimeout(() => this.emitWindow(windowKey, processor), processor.window.size)
      this.windowTimers.set(windowKey, timer)
    }

    this.windowBuffer.get(windowKey)!.push(event)

    // Return null to prevent immediate output (event is buffered).
    return null
  }

  async emitWindow(windowKey: string, processor: any) {
    const events = this.windowBuffer.get(windowKey) || []
    this.windowBuffer.delete(windowKey)

    const timer = this.windowTimers.get(windowKey)
    if (timer) clearTimeout(timer)
    this.windowTimers.delete(windowKey)

    // Calculate aggregations
    const result: any = {
      windowStart: new Date(parseInt(windowKey)),
      windowEnd: new Date(parseInt(windowKey) + processor.window.size),
      eventCount: events.length,
    }

    // Aggregation spec is 'func' or 'func:field' (e.g. 'count_distinct:userId').
    for (const [name, aggFunc] of Object.entries(processor.aggregations)) {
      const [func, field] = (aggFunc as string).split(':')
      const values = field ? events.map((e) => e[field]) : events

      result[name] = applyAggregation(values, func)
    }

    // Write aggregated result to the pipeline's sinks (was: processor.pipeline,
    // which does not exist on processor config objects).
    const sinks = this.pipeline?.stream?.sinks ?? []
    for (const sink of sinks) {
      await this.writeToSink(result, sink)
    }
  }

  getWindowKey(event: any, window: any): string {
    const timestamp = event.timestamp || Date.now()

    switch (window.type) {
      case 'tumbling':
        // Round down to window boundary
        return String(Math.floor(timestamp / window.size) * window.size)

      case 'sliding': {
        // Multiple overlapping windows; return the most recent for now.
        const windows: string[] = []
        let windowStart = Math.floor(timestamp / window.slide) * window.slide
        while (windowStart <= timestamp && windowStart > timestamp - window.size) {
          windows.push(String(windowStart))
          windowStart -= window.slide
        }
        return windows[0]
      }

      case 'session':
        // Session windows keyed per user.
        // NOTE(review): no session-timeout handling yet — confirm intended.
        return `${event.userId}_${timestamp}`

      default:
        return String(timestamp)
    }
  }

  async writeToSink(data: any, sink: any) {
    if (!data) return // Skip null data (e.g., from windowing)

    switch (sink.type) {
      case 'database': {
        const connection = await getConnection(sink.connection)
        await connection.insert(sink.table, data)
        await connection.close()
        break
      }

      case 'kafka': {
        const producer = await createKafkaProducer()
        await producer.send({
          topic: sink.topic,
          messages: [{ value: JSON.stringify(data) }],
        })
        break
      }

      case 'api':
        await fetch(sink.endpoint, {
          method: 'POST',
          headers: { 'Content-Type': 'application/json' },
          body: JSON.stringify(data),
        })
        break
    }
  }
}

Data Quality and Validation

// Data quality pipeline
// Validation service: declarative rule set (completeness, format, range,
// uniqueness, referential integrity) with per-severity actions, billed per record.
const dataQualityPipeline = await $.Service.create({
  name: 'Data Quality Checker',
  type: $.ServiceType.Automation,
  subtype: 'data-pipeline',

  validation: {
    rules: [
      {
        id: 'completeness',
        type: 'required-fields',
        fields: ['id', 'email', 'created_at'],
        severity: 'error',
      },
      {
        id: 'format',
        type: 'regex',
        field: 'email',
        pattern: '^[^@]+@[^@]+\\.[^@]+$',
        severity: 'error',
      },
      {
        id: 'range',
        type: 'numeric-range',
        field: 'age',
        min: 0,
        max: 150,
        severity: 'warning',
      },
      {
        id: 'uniqueness',
        type: 'unique',
        field: 'email',
        severity: 'error',
      },
      {
        id: 'referential',
        type: 'foreign-key',
        field: 'customer_id',
        reference: { table: 'customers', field: 'id' },
        severity: 'error',
      },
    ],
    // Action taken per severity level: reject the record, flag it, or log it.
    actions: {
      error: 'reject',
      warning: 'flag',
      info: 'log',
    },
  },

  pricing: {
    model: 'per-record',
    rate: 0.001, // $0.001 per record
    minimumCharge: 0.1,
  },
})

// Validate data quality: run every rule against every record and summarize.
// A record fails if any 'error'-severity rule is violated; 'warning'
// violations are collected but do not fail the record.
//
// Bug fix: the results literal declared `warnings` twice (a `0` counter and
// an array) — a TypeScript error, and the counter was dead and never
// incremented. The array keeps the `warnings` name for compatibility, and a
// `warningCount` counter now actually tracks the total.
async function validateDataQuality(records: any[], rules: any[]) {
  const results = {
    total: records.length,
    passed: 0,
    failed: 0,
    warningCount: 0,
    errors: [] as any[],
    warnings: [] as any[],
  }

  // Shared across records so 'unique' rules can detect duplicates.
  const seenValues = new Map()

  for (let i = 0; i < records.length; i++) {
    const record = records[i]
    let recordValid = true

    for (const rule of rules) {
      const violation = await validateRule(record, rule, seenValues, i)

      if (violation) {
        if (rule.severity === 'error') {
          results.errors.push({
            recordIndex: i,
            record,
            rule: rule.id,
            message: violation,
          })
          recordValid = false
        } else if (rule.severity === 'warning') {
          results.warningCount++
          results.warnings.push({
            recordIndex: i,
            record,
            rule: rule.id,
            message: violation,
          })
        }
      }
    }

    if (recordValid) {
      results.passed++
    } else {
      results.failed++
    }
  }

  return results
}

// Evaluate one quality rule against one record. Returns a human-readable
// violation message, or null when the rule passes.
//
// Bug fixes:
//  - 'required-fields' used a truthiness check, so legitimate values such as
//    0 and false were reported as missing; now only null/undefined/'' count.
//  - 'numeric-range' flagged records where the field was entirely absent;
//    missing values are now left to the 'required-fields' rule.
async function validateRule(record: any, rule: any, seenValues: Map<string, Set<any>>, index: number): Promise<string | null> {
  switch (rule.type) {
    case 'required-fields':
      for (const field of rule.fields) {
        const v = record[field]
        // Only null/undefined/empty-string count as missing (0 and false are valid).
        if (v == null || v === '') {
          return `Missing required field: ${field}`
        }
      }
      return null

    case 'regex': {
      const value = record[rule.field]
      if (value && !new RegExp(rule.pattern).test(value)) {
        return `Field ${rule.field} does not match pattern: ${rule.pattern}`
      }
      return null
    }

    case 'numeric-range': {
      const raw = record[rule.field]
      if (raw == null) return null // absence is the 'required-fields' rule's job
      const numValue = Number(raw)
      if (isNaN(numValue) || numValue < rule.min || numValue > rule.max) {
        return `Field ${rule.field} out of range: ${rule.min} - ${rule.max}`
      }
      return null
    }

    case 'unique': {
      const fieldValue = record[rule.field]
      if (!seenValues.has(rule.field)) {
        seenValues.set(rule.field, new Set())
      }
      const seen = seenValues.get(rule.field)!
      if (seen.has(fieldValue)) {
        return `Duplicate value for ${rule.field}: ${fieldValue}`
      }
      seen.add(fieldValue)
      return null
    }

    case 'foreign-key': {
      const fkValue = record[rule.field]
      const exists = await checkForeignKeyExists(rule.reference.table, rule.reference.field, fkValue)
      if (!exists) {
        return `Invalid foreign key ${rule.field}: ${fkValue} not found in ${rule.reference.table}`
      }
      return null
    }

    default:
      return null
  }
}

Batch Processing Pipelines

// Large-scale batch processing
// S3-to-S3 batch job: read daily CSV exports, process in 10k-record chunks
// across 4 parallel workers, write snappy-compressed parquet output.
const batchProcessingPipeline = await $.Service.create({
  name: 'Batch Data Processor',
  type: $.ServiceType.Automation,
  subtype: 'batch-pipeline',

  batch: {
    source: {
      type: 's3',
      bucket: 'raw-data',
      prefix: 'daily-exports/',
      pattern: '*.csv',
    },
    processing: {
      chunkSize: 10000, // Process 10k records at a time
      parallelism: 4, // 4 parallel workers
      transformations: [
        { id: 'parse', type: 'function' },
        { id: 'cleanse', type: 'function' },
        { id: 'transform', type: 'function' },
      ],
    },
    destination: {
      type: 's3',
      bucket: 'processed-data',
      prefix: 'processed/',
      format: 'parquet',
      compression: 'snappy',
    },
  },

  pricing: {
    model: 'per-gb',
    rate: 0.5,
    minimumCharge: 0.1,
  },
})

// Batch processor with parallel workers: list matching S3 objects, transform
// each file's records in fixed-size chunks, and write the results to the
// destination bucket, reporting progress as bytes are processed.
//
// Bug fix: the configured `parallelism` was previously ignored — workers were
// created via createBatchWorker() but never used, and every file was processed
// concurrently at once via Promise.all. Files are now drained from a shared
// queue by exactly `parallelism` concurrent worker loops, and the dead
// createBatchWorker() calls were removed.
async function processBatchPipeline(pipeline: any, request: any) {
  // List all files to process
  const files = await listS3Files(pipeline.batch.source.bucket, pipeline.batch.source.prefix, pipeline.batch.source.pattern)

  const totalBytes = files.reduce((sum, f) => sum + f.size, 0)
  let processedBytes = 0

  // Download, chunk, transform, and write a single file.
  const processFile = async (file: any) => {
    const stream = await downloadS3File(file.bucket, file.key)
    const chunks: any[][] = []
    let chunk: any[] = []

    // Split the record stream into fixed-size chunks.
    for await (const record of parseStream(stream, pipeline.batch.source.format)) {
      chunk.push(record)
      if (chunk.length >= pipeline.batch.processing.chunkSize) {
        chunks.push(chunk)
        chunk = []
      }
    }
    if (chunk.length > 0) {
      chunks.push(chunk)
    }

    // Transform each chunk in order.
    const transformedRecords: any[] = []
    for (const c of chunks) {
      const transformed = await transformChunk(c, pipeline.batch.processing.transformations)
      transformedRecords.push(...transformed)
    }

    // Write results under the destination prefix, keeping the file name.
    const outputKey = `${pipeline.batch.destination.prefix}${file.key.split('/').pop()}`
    await writeToS3(pipeline.batch.destination.bucket, outputKey, transformedRecords, pipeline.batch.destination.format, pipeline.batch.destination.compression)

    processedBytes += file.size

    // Update progress
    await send($.PipelineProgress.updated, {
      requestId: request.id,
      processedBytes,
      totalBytes,
      progress: processedBytes / totalBytes,
    })

    return {
      file: file.key,
      recordsProcessed: transformedRecords.length,
      bytesProcessed: file.size,
    }
  }

  // Shared queue drained by `parallelism` concurrent worker loops.
  const fileQueue = [...files]
  const results: any[] = []
  const worker = async () => {
    for (let file = fileQueue.shift(); file !== undefined; file = fileQueue.shift()) {
      results.push(await processFile(file))
    }
  }

  const parallelism = Math.max(1, pipeline.batch.processing.parallelism || 1)
  await Promise.all(Array.from({ length: parallelism }, () => worker()))

  return {
    filesProcessed: results.length,
    totalRecords: results.reduce((sum, r) => sum + r.recordsProcessed, 0),
    totalBytes: processedBytes,
  }
}

Pricing Models for Pipelines

Per-GB Pricing

// Tiered per-GB pricing: the effective rate decreases with volume.
// NOTE(review): adjacent tiers share boundary values (max: 10 / min: 10) —
// confirm which tier applies at exactly 10, 100, and 1000 GB.
const pipeline = await $.Service.create({
  name: 'Data Transfer Pipeline',
  pricing: {
    model: 'per-gb',
    rate: 1.0,
    minimumCharge: 0.1,
    tiers: [
      { min: 0, max: 10, rate: 1.0 },
      { min: 10, max: 100, rate: 0.75 },
      { min: 100, max: 1000, rate: 0.5 },
      { min: 1000, max: Infinity, rate: 0.25 },
    ],
  },
})

Per-Hour Streaming

// Hourly streaming rate with an included event quota and per-event overage.
const pipeline = await $.Service.create({
  name: 'Stream Processor',
  pricing: {
    model: 'per-hour',
    rate: 2.0,
    includes: 1000000, // events per hour
    overage: 0.001, // per additional event
  },
})

Per-Record Pricing

// Per-record pricing with volume discounts and a minimum charge per run.
const pipeline = await $.Service.create({
  name: 'Record Validator',
  pricing: {
    model: 'per-record',
    rate: 0.001,
    minimumCharge: 0.1,
    volume: [
      { min: 0, max: 10000, rate: 0.001 },
      { min: 10001, max: 100000, rate: 0.0005 },
      { min: 100001, max: Infinity, rate: 0.0001 },
    ],
  },
})

Best Practices

1. Incremental Processing

Only process new or changed data:

// Track last successful run
// Watermark pattern: remember when the last successful execution finished
// and only query rows changed since then.
const lastRun = await db.findOne($.PipelineExecution, {
  serviceId: pipeline.id,
  status: 'success',
  orderBy: { completedAt: 'desc' },
})

// Epoch fallback: the very first run processes the full table.
const watermark = lastRun?.completedAt || new Date(0)

// Query only changed data
const query = `
  SELECT * FROM source_table
  WHERE updated_at > ?
  ORDER BY updated_at
`

2. Error Handling and Dead Letter Queues

// Failed record handling: route the record to the dead-letter store, then
// raise a high-severity alert if failures for this pipeline spike.
async function handleFailedRecord(record: any, error: Error) {
  // Write to dead letter queue
  await db.create($.FailedRecord, {
    pipelineId: pipeline.id,
    record,
    error: error.message,
    timestamp: new Date(),
  })

  // Count failures over the trailing hour.
  const oneHourAgo = new Date(Date.now() - 3600000)
  const recentFailures = await db.count($.FailedRecord, {
    where: {
      pipelineId: pipeline.id,
      timestamp: { gte: oneHourAgo }, // Last hour
    },
  })

  const alertThreshold = 100
  if (recentFailures > alertThreshold) {
    await send($.Alert.create, {
      severity: 'high',
      message: `Pipeline ${pipeline.name} has ${recentFailures} failures in last hour`,
    })
  }
}

3. Monitoring and Observability

// Pipeline metrics: persist per-run throughput and error-rate figures for
// dashboards and alerting.
//
// Bug fix: throughput and errorRate divided by values that can legitimately
// be zero (sub-millisecond runs, empty extractions), storing Infinity/NaN in
// the metrics record; both now fall back to 0.
async function trackPipelineMetrics(execution: any) {
  const durationSeconds = execution.duration / 1000
  const throughput = durationSeconds > 0 ? execution.metrics.recordsLoaded / durationSeconds : 0
  const errorRate = execution.metrics.recordsExtracted > 0 ? execution.metrics.errors / execution.metrics.recordsExtracted : 0

  await db.create($.PipelineMetric, {
    pipelineId: execution.pipelineId,
    timestamp: new Date(),
    metrics: {
      recordsProcessed: execution.metrics.recordsLoaded,
      bytesProcessed: execution.metrics.bytesProcessed,
      duration: execution.duration,
      throughput,
      errorRate,
    },
  })
}

Real-World Examples

Customer Data Sync

// Sync customer data between systems every 4 hours, billed per GB moved.
const customerSyncPipeline = await $.Service.create({
  name: 'Customer Data Sync',
  schedule: { cron: '0 */4 * * *' },
  pricing: { model: 'per-gb', rate: 1.0 },
})

Log Aggregation

// Continuously aggregate application logs from a Kafka topic, billed hourly.
const logAggregationPipeline = await $.Service.create({
  name: 'Log Aggregation',
  stream: { source: { type: 'kafka', topic: 'logs' } },
  pricing: { model: 'per-hour', rate: 3.0 },
})

Analytics Data Warehouse

// Nightly (02:00) ETL into the analytics warehouse, billed per GB processed.
const dataWarehousePipeline = await $.Service.create({
  name: 'Analytics ETL',
  schedule: { cron: '0 2 * * *' },
  pricing: { model: 'per-gb', rate: 0.5 },
})

Next Steps