- Introduction & Core Concepts
- Basic Usage Patterns
- Real-World Examples
- Advanced Features Deep Dive
- Practical Patterns & Best Practices
- Integration Examples
Low-level streaming operations often require precise control over data flow, memory usage, and resource management. Traditional approaches using Node.js streams, async iterators, or manual state management lead to complex, error-prone code that's difficult to compose and reason about.
// Traditional Node.js Transform Stream - complex and error-prone
import { Transform, Readable, Writable } from 'stream'
import { pipeline } from 'stream/promises'
class CustomTransform extends Transform {
private buffer: Buffer[] = []
private isProcessing = false
_transform(chunk: any, encoding: BufferEncoding, callback: (error?: Error | null) => void) {
try {
// Manual buffering and flow control
this.buffer.push(chunk)
if (!this.isProcessing) {
this.isProcessing = true
this.processBuffer()
}
callback()
} catch (error) {
callback(error)
}
}
private async processBuffer() {
while (this.buffer.length > 0) {
const chunk = this.buffer.shift()!
try {
// Complex async processing
const processed = await this.processChunk(chunk)
this.push(processed)
} catch (error) {
this.emit('error', error)
return
}
}
this.isProcessing = false
}
private async processChunk(chunk: Buffer): Promise<Buffer> {
// Simulate complex processing
return Buffer.from(chunk.toString().toUpperCase())
}
}
// Usage requires complex error handling and resource management
async function processStream() {
try {
const source = new Readable({
read() {
this.push('hello')
this.push('world')
this.push(null)
}
})
const transform = new CustomTransform()
const destination = new Writable({
write(chunk, encoding, callback) {
console.log(chunk.toString())
callback()
}
})
await pipeline(source, transform, destination)
} catch (error) {
// Error handling is separate from stream logic
console.error('Stream failed:', error)
throw error
}
}
// Async iterator approach - limited composability
async function* asyncGenerator() {
yield 'hello'
yield 'world'
}
async function processAsyncIterator() {
try {
for await (const item of asyncGenerator()) {
// Manual error handling for each operation
const processed = item.toUpperCase()
console.log(processed)
}
} catch (error) {
console.error('Processing failed:', error)
}
}

This approach leads to:
- Complex State Management - Manual handling of buffering, flow control, and processing state
- Error Handling Scattered - Error handling logic mixed throughout the streaming code
- Resource Leaks - Difficult to ensure proper cleanup of resources and handlers
- Limited Composability - Hard to combine different streaming operations cleanly
- Poor Performance Control - No built-in backpressure or rate limiting mechanisms
Channel provides the foundational building blocks for Effect's streaming ecosystem, giving you precise control over data flow with built-in resource management, error handling, and composition.
import { Channel, Chunk, Effect, Stream } from "effect"
// The Channel solution - precise control with safety
const processData = Channel.succeed("hello").pipe(
Channel.flatMap(data =>
Channel.write(data.toUpperCase())
),
Channel.pipeTo(
Channel.readWith({
onInput: (input: string) => Channel.write(`Processed: ${input}`),
onFailure: (error) => Channel.fail(error),
onDone: () => Channel.succeed("Complete")
})
)
)
// Convert to Stream for easier consumption
const processedStream = Channel.toStream(processData)
// Run the processing pipeline
const result = Effect.gen(function* () {
const output = yield* Stream.runCollect(processedStream)
return Chunk.toReadonlyArray(output)
})

Channel: A nexus of I/O operations that supports both reading and writing values, with precise control over input/output types and error handling.
Pull-based Processing: Channels operate on a pull-based model where downstream consumers request data, enabling natural backpressure and memory efficiency.
Type-Safe I/O: Channels maintain separate types for input elements, output elements, errors, and completion values, providing complete type safety through the pipeline.
Composition Primitives: Channels can be piped, sequenced, and concatenated to build complex data processing pipelines from simple building blocks.
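To make these concepts concrete, here is a minimal sketch of a single read/write step. It assumes effect v3's parameter order, `Channel<OutElem, InElem, OutErr, InErr, OutDone, InDone, Env>`, where every side of the I/O contract is visible in the type.

```typescript
import { Channel } from "effect"

// One step of the I/O contract: read a single string from upstream,
// emit its length downstream, and handle failure/done cases explicitly
const lengthStep = Channel.readWith({
  // A value arrived from upstream: write one output element
  onInput: (input: string) => Channel.write(input.length),
  // Upstream failed: propagate through the error channel
  onFailure: (error: unknown) => Channel.fail(error),
  // Upstream finished: complete with a (void) done value
  onDone: () => Channel.void
})
```

Because the consumer drives `readWith`, nothing runs until downstream pulls the next element, which is where the backpressure and memory efficiency come from.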
import { Channel, Effect, Chunk, Option } from "effect"
// Create a channel that succeeds with a value
const successChannel = Channel.succeed(42)
// Create a channel that writes a single value
const writeChannel = Channel.write("Hello, Channel!")
// Create a channel that writes multiple values
const writeAllChannel = Channel.writeAll("first", "second", "third")
// Create a channel that reads input
const readChannel = Channel.read<string>()
// Create a channel from an Effect
const effectChannel = Channel.fromEffect(
Effect.succeed("From Effect")
)
// Create a channel that fails
const failChannel = Channel.fail(new Error("Channel failed"))
// Create a never-completing channel
const neverChannel = Channel.never
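Running any of these is one step away. A sketch, assuming `Channel.runCollect` returns the written elements together with the channel's done value as an `[outputs, done]` pair:

```typescript
const runConstructors = Effect.gen(function* () {
  // Collect everything writeAllChannel writes, plus its done value
  const [outputs, done] = yield* Channel.runCollect(writeAllChannel)
  console.log(Chunk.toReadonlyArray(outputs)) // ["first", "second", "third"]
  return done
})
```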
// Basic read-write pattern
const echoChannel = Channel.read<string>().pipe(
Channel.flatMap(input => Channel.write(input))
)
// Read with explicit handling
const processChannel = Channel.readWith({
onInput: (input: string) =>
Channel.write(input.toUpperCase()),
onFailure: (error) =>
Channel.fail(new Error(`Processing failed: ${error}`)),
onDone: (value) =>
Channel.succeed(`Completed with: ${value}`)
})
// Read or fail if no input available
const readOrFailChannel = Channel.readOrFail(
new Error("No input available")
)
// Conditional reading and writing
const conditionalChannel = Channel.read<number>().pipe(
Channel.flatMap(n =>
n > 0
? Channel.write(`Positive: ${n}`)
: Channel.fail(new Error("Non-positive number"))
)
)

// Pipe channels together
const pipeline = Channel.write("input").pipe(
Channel.pipeTo(
Channel.readWith({
onInput: (input: string) =>
Channel.write(input.split('').reverse().join('')),
onFailure: Channel.fail,
onDone: () => Channel.succeed("Done")
})
)
)
// Sequential composition with flatMap
const sequentialChannel = Channel.succeed("Hello").pipe(
Channel.flatMap(greeting =>
Channel.write(greeting).pipe(
Channel.zipRight(Channel.write("World"))
)
)
)
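As a quick check of the sequential composition above, a sketch that runs `sequentialChannel` and collects what it wrote:

```typescript
// Both writes are emitted in order before the channel completes
const runSequential = Effect.gen(function* () {
  const [outputs] = yield* Channel.runCollect(sequentialChannel)
  console.log(Chunk.toReadonlyArray(outputs)) // ["Hello", "World"]
})
```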
// Combine two channels: outputs are emitted in sequence and the
// done values are zipped into a tuple, then combined
const mergedChannel = Channel.zip(
  Channel.succeed("Channel 1"),
  Channel.succeed("Channel 2")
).pipe(
  Channel.map(([a, b]) => `${a} + ${b}`)
)

Building a specialized log processing system that requires precise control over parsing, filtering, and output formatting.
import { Channel, Chunk, Duration, Effect, Option, Ref, Stream, pipe } from "effect"
import { FileSystem } from "@effect/platform"
// Define log entry structure
interface LogEntry {
timestamp: Date
level: "DEBUG" | "INFO" | "WARN" | "ERROR"
message: string
metadata?: Record<string, unknown>
}
interface ParseError {
_tag: "ParseError"
line: string
reason: string
}
// Channel for parsing log lines
const parseLogLine = Channel.read<string>().pipe(
Channel.flatMap(line => {
try {
// Parse log line format: [timestamp] [level] message metadata?
const match = line.match(/^\[([^\]]+)\] \[([^\]]+)\] (.+)$/)
if (!match) {
return Channel.fail({
_tag: "ParseError" as const,
line,
reason: "Invalid log format"
})
}
const [, timestamp, level, rest] = match
// Try to extract JSON metadata
const metadataMatch = rest.match(/^(.+?) (\{.+\})$/)
let message = rest
let metadata: Record<string, unknown> | undefined
if (metadataMatch) {
message = metadataMatch[1]
try {
metadata = JSON.parse(metadataMatch[2])
} catch {
// Invalid JSON metadata, treat as part of message
message = rest
}
}
const entry: LogEntry = {
timestamp: new Date(timestamp),
level: level as LogEntry["level"],
message,
metadata
}
return Channel.write(entry)
} catch (error) {
return Channel.fail({
_tag: "ParseError" as const,
line,
reason: String(error)
})
}
})
)
// Channel for filtering log entries by level
const filterByLevel = (minLevel: LogEntry["level"]) => {
const levelPriority = {
"DEBUG": 0,
"INFO": 1,
"WARN": 2,
"ERROR": 3
}
const minPriority = levelPriority[minLevel]
return Channel.read<LogEntry>().pipe(
Channel.flatMap(entry => {
const entryPriority = levelPriority[entry.level]
if (entryPriority >= minPriority) {
return Channel.write(entry)
} else {
// Skip this entry by reading the next one
return filterByLevel(minLevel)
}
})
)
}
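The recursive call above is the idiomatic way for a channel to skip an element: instead of writing, it loops back into another read. The same idea generalizes to any predicate; a sketch (the helper name is illustrative):

```typescript
// A generic read-loop filter: write matching elements, loop on the rest
const filterForever = <A>(predicate: (value: A) => boolean) => {
  const loop: Channel.Channel<A, A, unknown> = Channel.readWith({
    onInput: (value: A) =>
      predicate(value)
        ? Channel.write(value).pipe(Channel.zipRight(loop))
        : loop,
    onFailure: (error: unknown) => Channel.fail(error),
    onDone: () => Channel.void
  })
  return loop
}
```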
// Channel for formatting output
const formatEntry = Channel.read<LogEntry>().pipe(
Channel.flatMap(entry => {
const timestamp = entry.timestamp.toISOString()
const metadata = entry.metadata
? ` ${JSON.stringify(entry.metadata)}`
: ""
const formatted = `${timestamp} [${entry.level}] ${entry.message}${metadata}`
return Channel.write(formatted)
})
)
// Channel for batching output
const batchOutput = (batchSize: number) => {
const collectBatch = (
collected: string[],
remaining: number
): Channel.Channel<Chunk.Chunk<string>, string, ParseError> => {
if (remaining === 0) {
return Channel.succeed(Chunk.fromIterable(collected))
}
return Channel.readWith({
onInput: (input: string) =>
collectBatch([...collected, input], remaining - 1),
onFailure: (error) => Channel.fail(error),
onDone: () =>
collected.length > 0
? Channel.succeed(Chunk.fromIterable(collected))
: Channel.succeed(Chunk.empty())
})
}
return collectBatch([], batchSize)
}
// Complete log processing pipeline
const createLogProcessor = (
minLevel: LogEntry["level"] = "INFO",
batchSize: number = 100
) => {
return Channel.identity<string>().pipe(
Channel.pipeTo(Channel.repeated(parseLogLine)),
Channel.pipeTo(Channel.repeated(filterByLevel(minLevel))),
Channel.pipeTo(Channel.repeated(formatEntry)),
Channel.pipeTo(Channel.repeated(batchOutput(batchSize)))
)
}
// Usage with file processing
const processLogFile = (filePath: string) =>
  Effect.gen(function* () {
    const fileSystem = yield* FileSystem.FileSystem
    // Create a file reader channel: decode bytes and split into lines at
    // the Stream level, then drop back down to a Channel
    const fileReader = fileSystem.stream(filePath).pipe(
      Stream.decodeText(),
      Stream.splitLines,
      Stream.mapError((error) => ({
        _tag: "ParseError" as const,
        line: "",
        reason: String(error)
      })),
      Stream.toChannel
    )
// Create the complete pipeline
const processor = fileReader.pipe(
Channel.pipeTo(createLogProcessor("WARN", 50))
)
// Run and collect results
const [results] = yield* Channel.runCollect(processor)
return results
})
// Advanced: Real-time log monitoring with alerting
const createLogMonitor = (
logSource: Channel.Channel<string, never, never>,
alertThreshold: number = 10
) =>
Effect.gen(function* () {
const errorCount = yield* Ref.make(0)
const alertSent = yield* Ref.make(false)
const monitoringChannel = logSource.pipe(
Channel.pipeTo(Channel.repeated(parseLogLine)),
Channel.pipeTo(
Channel.readWith({
onInput: (entry: LogEntry) =>
Effect.gen(function* () {
if (entry.level === "ERROR") {
const current = yield* Ref.updateAndGet(errorCount, n => n + 1)
if (current >= alertThreshold) {
const alreadySent = yield* Ref.get(alertSent)
if (!alreadySent) {
yield* Effect.log(`ALERT: ${current} errors detected in log stream`)
yield* Ref.set(alertSent, true)
// Reset counter and alert flag after some time
yield* Effect.sleep(Duration.minutes(5)).pipe(
Effect.zipRight(Ref.set(errorCount, 0)),
Effect.zipRight(Ref.set(alertSent, false)),
Effect.fork
)
}
}
}
return Channel.write(entry)
}).pipe(Channel.fromEffect, Channel.flatten),
onFailure: (error) =>
Effect.gen(function* () {
yield* Effect.log(`Parse error: ${error.reason} for line: ${error.line}`)
return Channel.succeed(Option.none<LogEntry>())
}).pipe(Channel.fromEffect, Channel.flatten),
onDone: () => Channel.void
})
)
)
return monitoringChannel
}).pipe(Channel.unwrap)

Building a high-performance data format converter that transforms between different serialization formats with precise control over memory usage.
import { Channel, Chunk, Effect, Ref } from "effect"

// Define conversion interfaces
interface ConversionError {
_tag: "ConversionError"
input: string
format: string
reason: string
}
interface ConversionResult<T> {
data: T
format: "json" | "csv" | "xml" | "yaml"
size: number
}
// Channel for format detection
const detectFormat = Channel.read<string>().pipe(
Channel.flatMap(input => {
const trimmed = input.trim()
if (trimmed.startsWith('{') || trimmed.startsWith('[')) {
return Channel.write({ input, format: "json" as const })
} else if (trimmed.startsWith('<')) {
return Channel.write({ input, format: "xml" as const })
} else if (trimmed.includes(',') && !trimmed.includes(':')) {
return Channel.write({ input, format: "csv" as const })
} else if (trimmed.includes(':') && !trimmed.startsWith('<')) {
return Channel.write({ input, format: "yaml" as const })
} else {
return Channel.fail({
_tag: "ConversionError" as const,
input: input.slice(0, 100),
format: "unknown",
reason: "Unable to detect format"
})
}
})
)
// Channel for parsing JSON
const parseJSON = Channel.read<{ input: string; format: "json" }>().pipe(
Channel.flatMap(({ input }) => {
try {
const data = JSON.parse(input)
return Channel.write({
data,
format: "json" as const,
size: input.length
})
} catch (error) {
return Channel.fail({
_tag: "ConversionError" as const,
input: input.slice(0, 100),
format: "json",
reason: String(error)
})
}
})
)
// Channel for parsing CSV
const parseCSV = Channel.read<{ input: string; format: "csv" }>().pipe(
Channel.flatMap(({ input }) => {
try {
const lines = input.trim().split('\n')
const headers = lines[0].split(',').map(h => h.trim())
const rows = lines.slice(1).map(line => {
const values = line.split(',').map(v => v.trim())
return headers.reduce((obj, header, index) => {
obj[header] = values[index] || ""
return obj
}, {} as Record<string, string>)
})
return Channel.write({
data: rows,
format: "csv" as const,
size: input.length
})
} catch (error) {
return Channel.fail({
_tag: "ConversionError" as const,
input: input.slice(0, 100),
format: "csv",
reason: String(error)
})
}
})
)
// Channel for format conversion
const convertToFormat = (targetFormat: "json" | "csv" | "xml" | "yaml") =>
Channel.read<ConversionResult<unknown>>().pipe(
Channel.flatMap(result => {
try {
let converted: string
let size: number
switch (targetFormat) {
case "json":
converted = JSON.stringify(result.data, null, 2)
size = converted.length
break
case "csv":
if (Array.isArray(result.data) && result.data.length > 0) {
const headers = Object.keys(result.data[0])
const csvRows = [
headers.join(','),
...result.data.map(row =>
headers.map(h => String(row[h] || "")).join(',')
)
]
converted = csvRows.join('\n')
size = converted.length
} else {
throw new Error("Data must be array of objects for CSV conversion")
}
break
case "xml":
// Simple XML conversion
const xmlData = typeof result.data === 'object' && result.data !== null
? Object.entries(result.data as Record<string, unknown>)
.map(([key, value]) => `<${key}>${String(value)}</${key}>`)
.join('\n')
: String(result.data)
converted = `<root>\n${xmlData}\n</root>`
size = converted.length
break
case "yaml":
// Simple YAML conversion
const yamlData = typeof result.data === 'object' && result.data !== null
? Object.entries(result.data as Record<string, unknown>)
.map(([key, value]) => `${key}: ${JSON.stringify(value)}`)
.join('\n')
: `data: ${JSON.stringify(result.data)}`
converted = yamlData
size = converted.length
break
}
return Channel.write({
data: converted,
format: targetFormat,
size,
originalFormat: result.format,
originalSize: result.size
})
} catch (error) {
return Channel.fail({
_tag: "ConversionError" as const,
input: JSON.stringify(result.data).slice(0, 100),
format: targetFormat,
reason: String(error)
})
}
})
)
// Create format converter pipeline
const createConverter = (targetFormat: "json" | "csv" | "xml" | "yaml") => {
const parseByFormat = Channel.readWith({
onInput: (detected: { input: string; format: string }) => {
switch (detected.format) {
case "json":
return Channel.write(detected as { input: string; format: "json" }).pipe(
Channel.pipeTo(parseJSON)
)
case "csv":
return Channel.write(detected as { input: string; format: "csv" }).pipe(
Channel.pipeTo(parseCSV)
)
default:
return Channel.fail({
_tag: "ConversionError" as const,
input: detected.input.slice(0, 100),
format: detected.format,
reason: `Unsupported format: ${detected.format}`
})
}
},
onFailure: Channel.fail,
onDone: () => Channel.void
})
return Channel.identity<string>().pipe(
Channel.pipeTo(Channel.repeated(detectFormat)),
Channel.pipeTo(Channel.repeated(parseByFormat)),
Channel.pipeTo(Channel.repeated(convertToFormat(targetFormat)))
)
}
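A sketch of driving the converter with a single document; `runCollect` gathers whatever the pipeline writes:

```typescript
// Feed one raw document through the converter and collect the results
const convertOne = (raw: string) =>
  Channel.write(raw).pipe(
    Channel.pipeTo(createConverter("yaml")),
    Channel.runCollect
  )
```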
// Batch converter with progress tracking
const createBatchConverter = (
inputs: string[],
targetFormat: "json" | "csv" | "xml" | "yaml"
) =>
Effect.gen(function* () {
const progress = yield* Ref.make({ processed: 0, errors: 0, total: inputs.length })
const converter = createConverter(targetFormat)
const processItem = (input: string) =>
Channel.write(input).pipe(
Channel.pipeTo(converter),
Channel.mapEffect(result =>
Ref.update(progress, p => ({ ...p, processed: p.processed + 1 })).pipe(
Effect.zipRight(Effect.succeed(result))
)
),
Channel.catchAll(error =>
Ref.update(progress, p => ({ ...p, errors: p.errors + 1 })).pipe(
Effect.zipRight(Effect.fail(error)),
Channel.fromEffect,
Channel.flatten
)
)
)
// Process all inputs through the converter
const [results] = yield* Channel.writeAll(...inputs).pipe(
  Channel.concatMap(processItem),
  Channel.runCollect
)
const finalProgress = yield* Ref.get(progress)
return {
results: Chunk.toReadonlyArray(results),
progress: finalProgress
}
})

Building a real-time event processing system that handles high-throughput data with custom aggregation and alerting.
import { Channel, Chunk, Duration, Effect, Fiber, Queue, Ref, Schedule } from "effect"
// Event types
interface Event {
id: string
type: "user_action" | "system_event" | "error" | "metric"
timestamp: number
userId?: string
data: Record<string, unknown>
}
interface AggregatedMetrics {
windowStart: number
windowEnd: number
eventCounts: Record<string, number>
uniqueUsers: number
errorRate: number
avgProcessingTime: number
}
interface Alert {
type: "high_error_rate" | "unusual_activity" | "system_overload"
severity: "low" | "medium" | "high" | "critical"
message: string
timestamp: number
metrics: AggregatedMetrics
}
// Channel for event validation
const validateEvent = Channel.read<unknown>().pipe(
Channel.flatMap(input => {
try {
if (
typeof input === 'object' &&
input !== null &&
'id' in input &&
'type' in input &&
'timestamp' in input &&
'data' in input
) {
const event = input as Event
// Validate required fields
if (!event.id || !event.type || !event.timestamp) {
return Channel.fail(new Error("Missing required event fields"))
}
// Validate timestamp
if (event.timestamp > Date.now() + 60000) { // 1 minute future tolerance
return Channel.fail(new Error("Event timestamp is too far in the future"))
}
return Channel.write(event)
} else {
return Channel.fail(new Error("Invalid event format"))
}
} catch (error) {
return Channel.fail(new Error(`Event validation failed: ${error}`))
}
})
)
// Channel for event enrichment
const enrichEvent = Channel.read<Event>().pipe(
Channel.mapEffect(event =>
Effect.gen(function* () {
// Simulate enrichment with user data, geolocation, etc.
const enriched = {
...event,
enrichedAt: Date.now(),
processingTime: Date.now() - event.timestamp
}
// Simulate async enrichment
yield* Effect.sleep(Duration.millis(Math.random() * 10))
return enriched
})
)
)
// Channel for time-based windowing
const createTimeWindows = (windowSizeMs: number) => {
const collectWindow = (
events: Event[],
windowStart: number
): Channel.Channel<AggregatedMetrics, Event, never> => {
const windowEnd = windowStart + windowSizeMs
return Channel.readWith({
onInput: (event: Event) => {
if (event.timestamp < windowEnd) {
// Event belongs to current window
return collectWindow([...events, event], windowStart)
} else {
// Window is complete, emit aggregated metrics and start new window
const metrics = aggregateEvents(events, windowStart, windowEnd)
return Channel.write(metrics).pipe(
Channel.zipRight(
collectWindow([event], Math.floor(event.timestamp / windowSizeMs) * windowSizeMs)
)
)
}
},
onFailure: Channel.fail,
onDone: () =>
events.length > 0
? Channel.write(aggregateEvents(events, windowStart, Date.now()))
: Channel.void
})
}
return collectWindow([], Math.floor(Date.now() / windowSizeMs) * windowSizeMs)
}
// Helper function to aggregate events
const aggregateEvents = (
events: Event[],
windowStart: number,
windowEnd: number
): AggregatedMetrics => {
const eventCounts = events.reduce((acc, event) => {
acc[event.type] = (acc[event.type] || 0) + 1
return acc
}, {} as Record<string, number>)
const uniqueUsers = new Set(
events.filter(e => e.userId).map(e => e.userId!)
).size
const errorCount = eventCounts.error || 0
const totalEvents = events.length
const errorRate = totalEvents > 0 ? errorCount / totalEvents : 0
const processingTimes = events
.filter(e => 'processingTime' in e)
.map(e => (e as any).processingTime)
const avgProcessingTime = processingTimes.length > 0
? processingTimes.reduce((sum, time) => sum + time, 0) / processingTimes.length
: 0
return {
windowStart,
windowEnd,
eventCounts,
uniqueUsers,
errorRate,
avgProcessingTime
}
}
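Since `aggregateEvents` is a pure function, the window math is easy to sanity-check in isolation; for example, one error out of three events yields an error rate of 1/3:

```typescript
// Hypothetical sample: three events in a 60-second window, one an error
const sampleMetrics = aggregateEvents(
  [
    { id: "1", type: "user_action", timestamp: 1_000, data: {} },
    { id: "2", type: "error", timestamp: 2_000, data: {} },
    { id: "3", type: "user_action", timestamp: 3_000, data: {} }
  ],
  0,
  60_000
)
// sampleMetrics.eventCounts => { user_action: 2, error: 1 }
// sampleMetrics.errorRate   => 0.333...
```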
// Channel for alert generation
const generateAlerts = Channel.read<AggregatedMetrics>().pipe(
Channel.flatMap(metrics => {
const alerts: Alert[] = []
// High error rate alert
if (metrics.errorRate > 0.1) { // 10% error rate threshold
alerts.push({
type: "high_error_rate",
severity: metrics.errorRate > 0.5 ? "critical" : "high",
message: `High error rate detected: ${(metrics.errorRate * 100).toFixed(2)}%`,
timestamp: Date.now(),
metrics
})
}
// Unusual activity alert
const totalEvents = Object.values(metrics.eventCounts).reduce((sum, count) => sum + count, 0)
if (totalEvents > 1000) { // Threshold for unusual activity
alerts.push({
type: "unusual_activity",
severity: totalEvents > 5000 ? "high" : "medium",
message: `Unusual activity detected: ${totalEvents} events in window`,
timestamp: Date.now(),
metrics
})
}
// System overload alert
if (metrics.avgProcessingTime > 1000) { // 1 second processing time threshold
alerts.push({
type: "system_overload",
severity: metrics.avgProcessingTime > 5000 ? "critical" : "high",
message: `System overload detected: ${metrics.avgProcessingTime.toFixed(0)}ms avg processing time`,
timestamp: Date.now(),
metrics
})
}
if (alerts.length > 0) {
return Channel.writeAll(...alerts)
} else {
return Channel.write(metrics) // Pass through metrics without alerts
}
})
)
// Complete event processing pipeline
const createEventProcessor = (windowSizeMs: number = 60000) => {
return Channel.identity<unknown>().pipe(
Channel.pipeTo(Channel.repeated(validateEvent)),
Channel.pipeTo(Channel.repeated(enrichEvent)),
Channel.pipeTo(createTimeWindows(windowSizeMs)),
Channel.pipeTo(Channel.repeated(generateAlerts))
)
}
// Real-time event processor with queues
const createRealTimeProcessor = (
eventQueue: Queue.Queue<unknown>,
alertQueue: Queue.Queue<Alert>,
windowSizeMs: number = 60000
) =>
Effect.gen(function* () {
const processor = createEventProcessor(windowSizeMs)
// Event ingestion fiber
const ingestionFiber = yield* Channel.fromQueue(eventQueue).pipe(
  Channel.pipeTo(processor),
  Channel.mapEffect(result => {
    if ('type' in result && 'severity' in result) {
      // This is an alert
      return Queue.offer(alertQueue, result as Alert)
    }
    // This is aggregated metrics, which could be stored or processed further
    return Effect.log(`Metrics: ${JSON.stringify(result)}`)
  }),
  Channel.runDrain,
  Effect.fork
)
// Alert processing fiber
const alertFiber = yield* Channel.fromQueue(alertQueue).pipe(
  Channel.mapEffect(alert =>
    Effect.gen(function* () {
      yield* Effect.log(`🚨 ALERT [${alert.severity.toUpperCase()}]: ${alert.message}`)
      // Send to external alerting system
      if (alert.severity === "critical" || alert.severity === "high") {
        yield* Effect.tryPromise({
          try: () => fetch('/api/alerts', {
            method: 'POST',
            headers: { 'Content-Type': 'application/json' },
            body: JSON.stringify(alert)
          }),
          catch: () => new Error("Failed to send alert")
        }).pipe(
          Effect.retry(Schedule.exponential(Duration.seconds(1), 2).pipe(
            Schedule.compose(Schedule.recurs(3))
          )),
          Effect.catchAll(error =>
            Effect.log(`Failed to send alert after retries: ${error}`)
          )
        )
      }
    })
  ),
  Channel.runDrain,
  Effect.fork
)
return { ingestionFiber, alertFiber }
})
// Usage example with simulated event stream
const simulateEventStream = Effect.gen(function* () {
const eventQueue = yield* Queue.unbounded<unknown>()
const alertQueue = yield* Queue.unbounded<Alert>()
// Start the processor
const { ingestionFiber, alertFiber } = yield* createRealTimeProcessor(
  eventQueue,
  alertQueue,
  10000 // 10-second windows
)
// Simulate events
const eventGenerator = yield* Effect.gen(function* () {
  for (let i = 0; i < 100; i++) {
    const event: Event = {
      id: `event-${i}`,
      type: Math.random() > 0.8 ? "error" : "user_action", // 20% error rate
      timestamp: Date.now(),
      userId: `user-${Math.floor(Math.random() * 10)}`,
      data: { value: Math.random() * 100 }
    }
    yield* Queue.offer(eventQueue, event)
    yield* Effect.sleep(Duration.millis(100)) // 10 events per second
  }
}).pipe(Effect.fork)
// Run for 30 seconds
yield* Effect.sleep(Duration.seconds(30))
// Shutdown
yield* Queue.shutdown(eventQueue)
yield* Queue.shutdown(alertQueue)
yield* Fiber.interrupt(eventGenerator)
yield* Fiber.interrupt(ingestionFiber)
yield* Fiber.interrupt(alertFiber)
yield* Effect.log("Event processing simulation complete")
})

Channels provide sophisticated composition mechanisms that allow you to build complex data processing pipelines from simple building blocks.
import { Channel, Effect, Chunk } from "effect"
// Basic pipe operations
const simpleTransform = Channel.read<string>().pipe(
Channel.map(input => input.toUpperCase()),
Channel.flatMap(upper => Channel.write(upper))
)
// Pipe one channel to another
const pipeline = Channel.write("hello world").pipe(
Channel.pipeTo(simpleTransform)
)
// Multiple stage pipeline
const complexPipeline = Channel.write("input").pipe(
Channel.pipeTo(
Channel.readWith({
onInput: (input: string) => Channel.write(input.split('')),
onFailure: Channel.fail,
onDone: () => Channel.void
})
),
Channel.pipeTo(
Channel.readWith({
onInput: (chars: string[]) =>
Channel.writeAll(chars.map(c => c.toUpperCase())),
onFailure: Channel.fail,
onDone: () => Channel.void
})
)
)

interface ValidationError {
field: string
message: string
value: unknown
}
interface ValidationResult<T> {
data: T
errors: ValidationError[]
warnings: string[]
}
// Channel for field validation
const validateField = (
fieldName: string,
validator: (value: unknown) => { isValid: boolean; error?: string; warning?: string }
) =>
Channel.read<{ field: string; value: unknown }>().pipe(
Channel.flatMap(({ field, value }) => {
if (field !== fieldName) {
// Pass through fields we don't validate
return Channel.write({ field, value, valid: true })
}
const result = validator(value)
return Channel.write({
field,
value,
valid: result.isValid,
error: result.error,
warning: result.warning
})
})
)
// Compose multiple validators
const createValidationPipeline = <T extends Record<string, unknown>>(
schema: Record<keyof T, (value: unknown) => { isValid: boolean; error?: string; warning?: string }>
) => {
const validators = Object.entries(schema).map(([field, validator]) =>
validateField(field, validator)
)
// Create a channel that applies all validators
const applyValidators = Channel.read<T>().pipe(
Channel.flatMap(data => {
// Convert object to field-value pairs
const fieldPairs = Object.entries(data).map(([field, value]) => ({ field, value }))
return Channel.writeAll(fieldPairs)
}),
Channel.pipeTo(
// Apply all validators in sequence
validators.reduce(
(pipeline, validator) => pipeline.pipe(Channel.pipeTo(Channel.repeated(validator))),
Channel.identity<{ field: string; value: unknown }>()
)
),
// Collect validation results
Channel.pipeTo(
Channel.repeated(
Channel.read<{ field: string; value: unknown; valid: boolean; error?: string; warning?: string }>().pipe(
Channel.map(result => result)
)
)
)
)
return applyValidators
}
// Usage example
interface UserData {
email: string
age: number
name: string
}
const userValidationSchema = {
email: (value: unknown) => ({
isValid: typeof value === 'string' && /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(value),
error: typeof value !== 'string' ? "Email must be a string" :
!/^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(value) ? "Invalid email format" : undefined
}),
age: (value: unknown) => ({
isValid: typeof value === 'number' && value >= 0 && value <= 150,
error: typeof value !== 'number' ? "Age must be a number" :
value < 0 || value > 150 ? "Age must be between 0 and 150" : undefined,
warning: typeof value === 'number' && value > 100 ? "Unusually high age" : undefined
}),
name: (value: unknown) => ({
isValid: typeof value === 'string' && value.length > 0,
error: typeof value !== 'string' ? "Name must be a string" :
value.length === 0 ? "Name cannot be empty" : undefined,
warning: typeof value === 'string' && value.length < 2 ? "Very short name" : undefined
})
}
const validateUser = createValidationPipeline(userValidationSchema)

Build reusable channel operators that encapsulate common patterns and can be composed with other channels.
// Custom operator for rate limiting
const rateLimit = <A>(
requestsPerSecond: number
) => (input: Channel.Channel<A, A, never>) => {
const intervalMs = 1000 / requestsPerSecond
return input.pipe(
Channel.mapEffect(value =>
Effect.sleep(Duration.millis(intervalMs)).pipe(
Effect.zipRight(Effect.succeed(value))
)
)
)
}
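The interval math above spaces items at `1000 / requestsPerSecond` milliseconds, so a limit of 4 per second delays each item by 250ms. A usage sketch:

```typescript
// Throttle a pass-through channel to 4 items per second (250ms apart)
const throttled = rateLimit<number>(4)(Channel.identity<number>())
```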
// Custom operator for batching
const batch = <A>(
batchSize: number,
timeout: Duration.Duration
) => (input: Channel.Channel<A, never, never>) => {
const collectBatch = (
collected: A[],
remaining: number,
startTime: number
): Channel.Channel<Chunk.Chunk<A>, A, never> => {
const elapsed = Date.now() - startTime
if (remaining === 0 || Duration.toMillis(timeout) <= elapsed) {
return Channel.succeed(Chunk.fromIterable(collected))
}
return Channel.readWith({
onInput: (item: A) =>
collectBatch([...collected, item], remaining - 1, startTime),
onFailure: Channel.fail,
onDone: () =>
collected.length > 0
? Channel.succeed(Chunk.fromIterable(collected))
: Channel.succeed(Chunk.empty())
})
}
return input.pipe(
Channel.pipeTo(
Channel.repeated(
Channel.suspend(() => collectBatch([], batchSize, Date.now()))
)
)
)
}
// Custom operator for deduplication
const deduplicate = <A>(
keyExtractor: (item: A) => string,
windowSize: number = 1000
) => (input: Channel.Channel<A, never, never>) =>
Effect.gen(function* () {
const seenKeys = yield* Ref.make(new Set<string>())
return input.pipe(
Channel.mapEffect(item =>
Effect.gen(function* () {
const key = keyExtractor(item)
const seen = yield* Ref.get(seenKeys)
if (seen.has(key)) {
return Option.none<A>()
} else {
// Add key and cleanup old keys if needed
const newSeen = new Set(seen).add(key)
if (newSeen.size > windowSize) {
// Simple cleanup: remove half the keys
const keysArray = Array.from(newSeen)
const keepCount = Math.floor(windowSize / 2)
const keptKeys = new Set(keysArray.slice(-keepCount))
yield* Ref.set(seenKeys, keptKeys)
} else {
yield* Ref.set(seenKeys, newSeen)
}
return Option.some(item)
}
})
),
// Re-emit only the Some values; a None (duplicate) writes nothing
Channel.concatMap(optItem =>
  Option.isSome(optItem)
    ? Channel.write(optItem.value)
    : Channel.void
)
)
}).pipe(Channel.unwrap)
// Usage example with custom operators
const createProcessingPipeline = <T>(
keyExtractor: (item: T) => string
) => {
return rateLimit(10)(Channel.identity<T>()).pipe(
deduplicate(keyExtractor, 500), // Dedupe within 500 items
batch(50, Duration.seconds(5)) // Batch up to 50 items or 5 seconds
)
}
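A sketch of applying the composed pipeline to a concrete record type (the `Reading` shape is hypothetical):

```typescript
interface Reading {
  id: string
  value: number
}

// Rate-limited, deduplicated by id, batched into chunks of up to 50
const readingPipeline = createProcessingPipeline<Reading>(reading => reading.id)
```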
// Custom operator for metrics collection
const withMetrics = <A>(
metricName: string
) => (input: Channel.Channel<A, never, never>) =>
Effect.gen(function* () {
const processedCount = yield* Ref.make(0)
const errorCount = yield* Ref.make(0)
const startTime = yield* Ref.make(Date.now())
// Start metrics reporting
const metricsReporter = yield* Effect.gen(function* () {
  const processed = yield* Ref.get(processedCount)
  const errors = yield* Ref.get(errorCount)
  const start = yield* Ref.get(startTime)
  const elapsed = Date.now() - start
  const rate = elapsed > 0 ? (processed / elapsed) * 1000 : 0
  yield* Effect.log(
    `[${metricName}] Processed: ${processed}, Errors: ${errors}, Rate: ${rate.toFixed(2)}/sec`
  )
}).pipe(
  Effect.repeat(Schedule.spaced(Duration.seconds(10))),
  Effect.fork
)
return input.pipe(
Channel.mapEffect(item =>
Ref.update(processedCount, n => n + 1).pipe(
Effect.zipRight(Effect.succeed(item))
)
),
Channel.catchAll(error =>
Ref.update(errorCount, n => n + 1).pipe(
Effect.zipRight(Effect.fail(error)),
Channel.fromEffect,
Channel.flatten
)
),
Channel.ensuring(
Fiber.interrupt(metricsReporter).pipe(
Effect.zipRight(
Effect.gen(function* () {
const final = yield* Ref.get(processedCount)
const finalErrors = yield* Ref.get(errorCount)
yield* Effect.log(`[${metricName}] Final: ${final} processed, ${finalErrors} errors`)
})
)
)
)
)
}).pipe(Channel.unwrap)
// Custom operator for circuit breaker
const withCircuitBreaker = <A>(
failureThreshold: number,
resetTimeoutMs: number
) => (input: Channel.Channel<A, Error, never>) =>
Effect.gen(function* () {
const state = yield* Ref.make<
| { type: "closed"; failures: number }
| { type: "open"; openedAt: number }
| { type: "halfOpen" }
>({ type: "closed", failures: 0 })
const checkState = Effect.gen(function* () {
const current = yield* Ref.get(state)
switch (current.type) {
case "open":
if (Date.now() - current.openedAt > resetTimeoutMs) {
yield* Ref.set(state, { type: "halfOpen" })
return true
}
return false
case "halfOpen":
case "closed":
return true
}
})
const recordSuccess = Ref.update(state, current =>
current.type === "halfOpen" || current.type === "closed"
? { type: "closed", failures: 0 }
: current
)
const recordFailure = Ref.update(state, current => {
switch (current.type) {
case "closed":
return current.failures + 1 >= failureThreshold
? { type: "open", openedAt: Date.now() }
: { type: "closed", failures: current.failures + 1 }
case "halfOpen":
return { type: "open", openedAt: Date.now() }
default:
return current
}
})
return input.pipe(
  // Gate each item on the breaker state and record successes
  Channel.mapEffect(item =>
    checkState.pipe(
      Effect.flatMap(allowed =>
        allowed
          ? recordSuccess.pipe(Effect.as(item))
          : Effect.fail(new Error("Circuit breaker is open"))
      )
    )
  ),
Channel.catchAll(error =>
recordFailure.pipe(
Effect.zipRight(Effect.fail(error)),
Channel.fromEffect,
Channel.flatten
)
)
)
}).pipe(Channel.unwrap)

Optimize channel performance for high-throughput scenarios with advanced buffering and concurrency techniques.
// Channel for chunk-based processing
const processInChunks = <A, B>(
chunkSize: number,
processor: (chunk: Chunk.Chunk<A>) => Effect.Effect<Chunk.Chunk<B>, Error>
) => {
const collectChunk = (
collected: A[],
remaining: number
): Channel.Channel<Chunk.Chunk<B>, A, Error> => {
if (remaining === 0) {
  // Emit the processed chunk as a single output element
  return Channel.fromEffect(
    processor(Chunk.fromIterable(collected))
  ).pipe(Channel.flatMap(Channel.write))
}
return Channel.readWith({
onInput: (item: A) =>
collectChunk([...collected, item], remaining - 1),
onFailure: Channel.fail,
onDone: () =>
  collected.length > 0
    ? Channel.fromEffect(
        processor(Chunk.fromIterable(collected))
      ).pipe(Channel.flatMap(Channel.write))
    : Channel.void
})
}
return Channel.repeated(
Channel.suspend(() => collectChunk([], chunkSize))
)
}
// Memory-efficient file processing: FileSystem.stream reads the file
// lazily in fixed-size chunks, so only one chunk is resident at a time
const processLargeFile = (
  filePath: string,
  chunkSize: number = 1024 * 64 // 64KB chunks
) =>
  Channel.unwrap(
    Effect.gen(function* () {
      const fs = yield* FileSystem.FileSystem
      return fs.stream(filePath, { chunkSize }).pipe(Stream.toChannel)
    })
  )
// High-performance parallel processing
const parallelProcess = <A, B>(
concurrency: number,
processor: (item: A) => Effect.Effect<B, Error>
) => (input: Channel.Channel<A, never, never>) =>
Effect.gen(function* () {
const semaphore = yield* Effect.makeSemaphore(concurrency)
return input.pipe(
Channel.mapEffect(item =>
semaphore.withPermits(1)(processor(item))
)
)
}).pipe(Channel.unwrap)
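A usage sketch, assuming an effectful `lookup` function (declared here only for illustration):

```typescript
// Hypothetical effectful worker used by the parallel operator
declare const lookup: (n: number) => Effect.Effect<string, Error>

// Run at most 8 lookups at a time over the incoming elements
const parallelLookups = parallelProcess(8, lookup)(Channel.identity<number>())
```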
// Usage example: High-performance data transformation
const createHighThroughputProcessor = <T, R>(
transformer: (batch: Chunk.Chunk<T>) => Effect.Effect<Chunk.Chunk<R>, Error>
) => {
  return Channel.identity<T>().pipe(
    Channel.pipeTo(processInChunks(1000, transformer)),
    // Flatten each processed chunk back into individual output elements
    Channel.concatMap(chunk => Channel.writeAll(...Chunk.toReadonlyArray(chunk))),
    withMetrics("high-throughput-processor"),
    rateLimit(1000) // Limit to 1000 items/sec if needed
  )
}

Ensure proper resource management and cleanup in channel-based operations.
import { Channel, Chunk, Duration, Effect, Fiber, Queue } from "effect"
import * as NodeFS from "node:fs/promises"
// Helper for resource-safe file operations. The handle-based
// readFile/writeFile calls below are Node's fs/promises API, so we wrap
// node:fs/promises with acquireRelease rather than inventing an Effect API
const withFileHandle = <A>(
  filePath: string,
  flags: string,
  operation: (handle: NodeFS.FileHandle) => Channel.Channel<A, never, Error>
): Channel.Channel<A, never, Error> =>
  Channel.unwrapScoped(
    Effect.gen(function* () {
      const handle = yield* Effect.acquireRelease(
        Effect.tryPromise({
          try: () => NodeFS.open(filePath, flags),
          catch: () => new Error(`Failed to open ${filePath}`)
        }),
        (h) =>
          Effect.log(`Closing file: ${filePath}`).pipe(
            Effect.zipRight(Effect.promise(() => h.close()))
          )
      )
      return operation(handle)
    })
  )
// Resource-safe database channel
const withDatabaseConnection = <A>(
connectionString: string,
query: string,
operation: (rows: unknown[]) => Channel.Channel<A, never, Error>
): Channel.Channel<A, never, Error> =>
Channel.unwrapScoped(
Effect.gen(function* () {
// Simulate database connection
const connection = yield* Effect.acquireRelease(
  Effect.tryPromise({
    try: () => createConnection(connectionString),
    catch: () => new Error("Failed to connect to database")
  }),
  (conn) => Effect.tryPromise({
    try: () => conn.close(),
    catch: () => new Error("Failed to close connection")
  }).pipe(Effect.orDie)
)
const rows = yield* Effect.tryPromise({
  try: () => connection.query(query),
  catch: () => new Error("Query failed")
})
return operation(rows)
})
)
// Pattern for graceful shutdown
const createShutdownHandler = <A>(
channel: Channel.Channel<A, never, Error>,
shutdownSignal: Promise<void>
) =>
Channel.raceWith(
channel,
Channel.fromEffect(
Effect.tryPromise({
try: () => shutdownSignal,
catch: () => new Error("Shutdown signal failed")
})
),
(channelExit, signalFiber) =>
Effect.zipRight(
Fiber.interrupt(signalFiber),
Effect.succeed(channelExit)
),
(signalExit, channelFiber) =>
Effect.gen(function* () {
yield* Effect.log("Shutdown signal received, terminating channel")
yield* Fiber.interrupt(channelFiber)
return signalExit
})
)
// Complete resource-safe pattern
const createResourceSafePipeline = (
inputFile: string,
outputFile: string,
dbConnectionString: string
) =>
withFileHandle(inputFile, "r", (inputHandle) =>
  withFileHandle(outputFile, "w", (outputHandle) =>
withDatabaseConnection(dbConnectionString, "SELECT * FROM config", (config) =>
      Channel.fromEffect(
        Effect.gen(function* () {
          // Process data with all resources properly managed
          const data = yield* Effect.tryPromise({
            try: () => inputHandle.readFile(),
            catch: () => new Error(`Failed to read ${inputFile}`)
          }).pipe(Effect.map((buffer) => buffer.toString()))
          const processed = data.toUpperCase()
          yield* Effect.tryPromise({
            try: () => outputHandle.writeFile(processed),
            catch: () => new Error(`Failed to write ${outputFile}`)
          })
          return processed.length
        })
      )
    )
  )
)

Build robust channels that handle errors gracefully and recover from failures.
// Exponential backoff retry pattern
const withExponentialBackoff = <A>(
maxAttempts: number,
initialDelay: Duration.Duration = Duration.seconds(1),
maxDelay: Duration.Duration = Duration.minutes(1)
) => (channel: Channel.Channel<A, Error, never>) => {
const attemptWithBackoff = (attempt: number): Channel.Channel<A, Error, never> => {
if (attempt >= maxAttempts) {
return channel
}
return channel.pipe(
Channel.catchAll(error => {
const delay = Duration.min(
Duration.times(initialDelay, Math.pow(2, attempt)),
maxDelay
)
return Channel.fromEffect(
Effect.gen(function* () {
yield* Effect.log(`Attempt ${attempt + 1} failed: ${error.message}`)
yield* Effect.log(`Retrying in ${Duration.toMillis(delay)}ms`)
yield* Effect.sleep(delay)
})
).pipe(
Channel.zipRight(attemptWithBackoff(attempt + 1))
)
})
)
}
return attemptWithBackoff(0)
}
// Fallback chain pattern
const withFallbacks = <A>(
primary: Channel.Channel<A, Error, never>,
fallbacks: Channel.Channel<A, Error, never>[]
) => {
const tryFallbacks = (index: number): Channel.Channel<A, Error, never> => {
if (index >= fallbacks.length) {
return Channel.fail(new Error("All fallbacks exhausted"))
}
return fallbacks[index].pipe(
  Channel.catchAll(error =>
    Channel.fromEffect(
      Effect.log(`Fallback ${index} failed: ${error.message}`)
    ).pipe(
      Channel.flatMap(() => tryFallbacks(index + 1))
    )
  )
)
}
return primary.pipe(
  Channel.catchAll(error =>
    Channel.fromEffect(
      Effect.log(`Primary channel failed: ${error.message}`)
    ).pipe(
      Channel.flatMap(() => tryFallbacks(0))
    )
  )
)
}
// Dead letter queue pattern
const withDeadLetterQueue = <A>(
deadLetterQueue: Queue.Queue<{ item: A; error: Error; timestamp: number }>
) => (channel: Channel.Channel<A, A, Error>) =>
channel.pipe(
Channel.catchAll(error =>
Channel.readWith({
onInput: (item: A) =>
Channel.fromEffect(
Queue.offer(deadLetterQueue, {
item,
error,
timestamp: Date.now()
})
).pipe(
Channel.zipRight(Channel.fail(error))
),
onFailure: Channel.fail,
onDone: () => Channel.void
})
)
)
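Entries parked in the dead letter queue can be drained later for inspection or replay; a sketch:

```typescript
// Drain the dead letter queue and log how many entries were parked
const drainDeadLetters = <A>(
  dlq: Queue.Queue<{ item: A; error: Error; timestamp: number }>
) =>
  Queue.takeAll(dlq).pipe(
    Effect.tap(entries => Effect.log(`Dead letters parked: ${Chunk.size(entries)}`))
  )
```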
// Complete resilience pattern
const createResilientChannel = <A>(
unreliableChannel: Channel.Channel<A, Error, never>,
config: {
maxRetries: number
fallbacks: Channel.Channel<A, Error, never>[]
deadLetterQueue?: Queue.Queue<{ item: A; error: Error; timestamp: number }>
}
) => {
let resilientChannel = withFallbacks(
  unreliableChannel.pipe(withExponentialBackoff(config.maxRetries)),
  config.fallbacks
)
if (config.deadLetterQueue) {
resilientChannel = withDeadLetterQueue(config.deadLetterQueue)(resilientChannel)
}
return resilientChannel
}

Create comprehensive testing utilities for channel-based operations.
import { Channel, Chunk, Duration, Effect, Fiber, TestClock, TestContext } from "effect"
// Helper for testing channel outputs
const collectChannelOutput = <A>(
channel: Channel.Channel<A, never, never>
) =>
Channel.runCollect(channel).pipe(
  Effect.map(([outputs]) => Chunk.toReadonlyArray(outputs))
)
)
// Helper for testing time-based channels
const testChannelWithTime = <A>(
channel: Channel.Channel<A, never, never>,
timeAdvances: Duration.Duration[]
) =>
Effect.gen(function* () {
const fiber = yield* collectChannelOutput(channel).pipe(Effect.fork)
for (const advance of timeAdvances) {
yield* TestClock.adjust(advance)
}
const result = yield* Fiber.join(fiber)
return result
})
// Mock channel for testing
const createMockChannel = <A>(
values: A[],
delays?: Duration.Duration[],
shouldFail?: boolean
) => {
const emitValues = (index: number): Channel.Channel<A, never, Error> => {
if (index >= values.length) {
return shouldFail
? Channel.fail(new Error("Mock channel failed"))
: Channel.void
}
const value = values[index]
const delay = delays?.[index] || Duration.zero
return Channel.fromEffect(
Effect.sleep(delay).pipe(
Effect.zipRight(Effect.succeed(value))
)
).pipe(
Channel.flatMap(v =>
Channel.write(v).pipe(
Channel.zipRight(emitValues(index + 1))
)
)
)
}
return emitValues(0)
}
// Property-based testing helpers
const channelProperty = <A, B>(
generator: () => Channel.Channel<A, never, never>,
property: (output: ReadonlyArray<A>) => boolean
) =>
Effect.gen(function* () {
const channel = generator()
const output = yield* collectChannelOutput(channel)
if (!property(output)) {
return yield* Effect.fail(
new Error(`Property failed for output: ${JSON.stringify(output)}`)
)
}
return true
})
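For example, a property asserting that `writeAll` emits elements in insertion order might look like:

```typescript
// Property: elements come out in the order they were written
const orderedProperty = channelProperty(
  () => Channel.writeAll(1, 2, 3),
  output => output.every((n, i) => i === 0 || output[i - 1] <= n)
)
```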
// Example test suite
const testChannelOperations = Effect.gen(function* () {
// Test basic channel operations
const basicTest = yield* collectChannelOutput(
  Channel.writeAll(1, 2, 3).pipe(
    Channel.pipeTo(
      Channel.repeated(
        Channel.read<number>().pipe(
          Channel.map(n => n * 2)
        )
      )
    )
  )
)
console.assert(
JSON.stringify(basicTest) === JSON.stringify([2, 4, 6]),
"Basic transformation failed"
)
// Test error handling
const errorTest = yield* Channel.writeAll(1, 2, 3).pipe(
  Channel.pipeTo(
    Channel.repeated(
      Channel.read<number>().pipe(
        Channel.flatMap(n =>
          n === 2
            ? Channel.fail(new Error("Test error"))
            : Channel.write(n * 2)
        )
      )
    )
  ),
  Channel.catchAll(() => Channel.write(-1)),
  collectChannelOutput
)
console.assert(
errorTest.includes(-1),
"Error handling failed"
)
// Test time-based operations
const timeTest = yield* testChannelWithTime(
  createMockChannel(
    [1, 2, 3],
    [Duration.seconds(1), Duration.seconds(1), Duration.seconds(1)]
  ),
  [Duration.seconds(3)]
)
console.assert(
timeTest.length === 3,
"Time-based channel failed"
)
yield* Effect.log("All channel tests passed")
}).pipe(Effect.provide(TestContext.TestContext))

Channel serves as the foundation for Effect's Stream and Sink, enabling seamless conversion and interoperability.
import { Channel, Chunk, Duration, Effect, Sink, Stream } from "effect"
import { FileSystem } from "@effect/platform"
// Convert Channel to Stream
const channelToStreamExample = Effect.gen(function* () {
const dataChannel = Channel.writeAll(1, 2, 3, 4, 5).pipe(
Channel.pipeTo(
Channel.repeated(
Channel.read<number>().pipe(
Channel.map(n => n * 2)
)
)
)
)
const stream = Channel.toStream(dataChannel)
const result = yield* stream.pipe(Stream.runCollect)
return Chunk.toReadonlyArray(result)
})
// Create custom Sink using Channel
const createBatchingSink = <A>(
batchSize: number,
process: (batch: Chunk.Chunk<A>) => Effect.Effect<void, Error>
) =>
Sink.fromChannel(
Channel.repeated(
Channel.suspend(() => {
const collectBatch = (
collected: A[],
remaining: number
): Channel.Channel<void, A, Error> => {
if (remaining === 0) {
return Channel.fromEffect(
process(Chunk.fromIterable(collected))
)
}
return Channel.readWith({
onInput: (item: A) =>
collectBatch([...collected, item], remaining - 1),
onFailure: Channel.fail,
onDone: () =>
collected.length > 0
? Channel.fromEffect(process(Chunk.fromIterable(collected)))
: Channel.void
})
}
return collectBatch([], batchSize)
})
)
)
// Advanced: Custom streaming operators using Channel
const createStreamingETL = <A, B, C>(
extractor: Channel.Channel<A, never, Error>,
transformer: (item: A) => Effect.Effect<B, Error>,
loader: Sink.Sink<void, B, Error>
) =>
Effect.gen(function* () {
const transformChannel = Channel.repeated(
Channel.read<A>().pipe(
Channel.mapEffect(transformer)
)
)
const pipeline = extractor.pipe(
Channel.pipeTo(transformChannel)
)
const stream = Channel.toStream(pipeline)
yield* stream.pipe(Stream.run(loader))
})
// Real-world example: Log processing with Stream integration
const processLogsWithStreams = (logFiles: string[]) =>
Effect.gen(function* () {
// Create channels for each log file
const logChannels = logFiles.map(file =>
Channel.unwrap(
  Effect.gen(function* () {
    const fs = yield* FileSystem.FileSystem
    // Decode bytes and split into lines at the Stream level,
    // then drop back down to a Channel
    return fs.stream(file).pipe(
      Stream.decodeText(),
      Stream.splitLines,
      Stream.mapError(() => new Error(`Failed to read ${file}`)),
      Stream.toChannel
    )
  })
)
)
// Merge all log channels
const mergedChannel = Channel.mergeAll(logChannels)
// Convert to stream and process
const logStream = Channel.toStream(mergedChannel)
const processedLogs = yield* logStream.pipe(
  Stream.filter(line => line.includes("ERROR")),
  Stream.map(line => ({
    timestamp: new Date(),
    level: "ERROR",
    message: line,
    source: "merged-logs"
  })),
  Stream.groupedWithin(100, Duration.seconds(5)),
  Stream.tap(batch =>
    Effect.log(`Processing batch of ${batch.length} error logs`)
  ),
  Stream.runCollect
)
return processedLogs
})

import { Channel, Chunk, Effect, Ref, TestContext } from "effect"
// assumes a vitest/jest-style `expect` in scope
// Channel testing utilities
const createChannelTestSuite = () => {
const testBasicOperations = Effect.gen(function* () {
// Test channel composition
const pipeline = Channel.write("hello").pipe(
Channel.pipeTo(
Channel.readWith({
onInput: (input: string) => Channel.write(input.toUpperCase()),
onFailure: Channel.fail,
onDone: () => Channel.void
})
)
)
const [result] = yield* Channel.runCollect(pipeline)
const output = Chunk.toReadonlyArray(result)
expect(output).toEqual(["HELLO"])
})
const testErrorHandling = Effect.gen(function* () {
const failingChannel = Channel.read<string>().pipe(
Channel.flatMap(input =>
input === "fail"
? Channel.fail(new Error("Test failure"))
: Channel.write(input.toUpperCase())
)
)
const resilientChannel = Channel.write("fail").pipe(
Channel.pipeTo(failingChannel),
Channel.catchAll(() => Channel.write("RECOVERED"))
)
const [result] = yield* Channel.runCollect(resilientChannel)
const output = Chunk.toReadonlyArray(result)
expect(output).toEqual(["RECOVERED"])
})
const testConcurrency = Effect.gen(function* () {
const concurrentChannel = Channel.mergeAll([
Channel.write("A"),
Channel.write("B"),
Channel.write("C")
])
const [result] = yield* Channel.runCollect(concurrentChannel)
const output = Chunk.toReadonlyArray(result)
expect(output.length).toBe(3)
expect(new Set(output)).toEqual(new Set(["A", "B", "C"]))
})
const testResourceManagement = Effect.gen(function* () {
const resourceAcquired = yield* Ref.make(false)
const resourceReleased = yield* Ref.make(false)
const resourceChannel = Channel.unwrapScoped(
Effect.gen(function* () {
yield* Effect.acquireRelease(
  Ref.set(resourceAcquired, true),
  () => Ref.set(resourceReleased, true)
)
return Channel.write("resource-data")
})
)
const [result] = yield* Channel.runCollect(resourceChannel)
const acquired = yield* Ref.get(resourceAcquired)
const released = yield* Ref.get(resourceReleased)
expect(acquired).toBe(true)
expect(released).toBe(true)
expect(Chunk.toReadonlyArray(result)).toEqual(["resource-data"])
})
return {
testBasicOperations,
testErrorHandling,
testConcurrency,
testResourceManagement
}
}
// Property-based testing for channels
const channelProperties = Effect.gen(function* () {
// Property: Channel composition is associative
const testAssociativity = Effect.gen(function* () {
const a = Channel.write(1)
const b = Channel.read<number>().pipe(Channel.map(n => n + 1))
const c = Channel.read<number>().pipe(Channel.map(n => n * 2))
// (a |> b) |> c
const left = a.pipe(
Channel.pipeTo(b),
Channel.pipeTo(c)
)
// a |> (b |> c)
const right = a.pipe(
Channel.pipeTo(
b.pipe(Channel.pipeTo(c))
)
)
const leftResult = yield* Channel.runCollect(left)
const rightResult = yield* Channel.runCollect(right)
expect(leftResult).toEqual(rightResult)
})
// Property: Channel identity
const testIdentity = Effect.gen(function* () {
const data = [1, 2, 3, 4, 5]
const channel = Channel.writeAll(...data)
const withIdentity = channel.pipe(
Channel.pipeTo(Channel.identity<number>())
)
const original = yield* Channel.runCollect(channel)
const withId = yield* Channel.runCollect(withIdentity)
expect(original).toEqual(withId)
})
return { testAssociativity, testIdentity }
})
// Integration testing with mock services
const integrationTests = Effect.gen(function* () {
const mockFileService = {
readFile: (path: string) =>
Effect.succeed(`Mock content for ${path}`),
writeFile: (path: string, content: string) =>
Effect.log(`Writing to ${path}: ${content}`)
}
const fileProcessingChannel = Channel.unwrapScoped(
Effect.gen(function* () {
const content = yield* mockFileService.readFile("input.txt")
return Channel.write(content).pipe(
Channel.pipeTo(
Channel.readWith({
onInput: (input: string) =>
Channel.fromEffect(
mockFileService.writeFile("output.txt", input.toUpperCase())
).pipe(
Channel.zipRight(Channel.write(input.length))
),
onFailure: Channel.fail,
onDone: () => Channel.void
})
)
)
})
)
const [result] = yield* Channel.runCollect(fileProcessingChannel)
const output = Chunk.toReadonlyArray(result)
expect(output[0]).toBeGreaterThan(0) // Content length
})
// Run all tests
const runChannelTests = Effect.gen(function* () {
const suite = createChannelTestSuite()
const properties = channelProperties
yield* suite.testBasicOperations
yield* suite.testErrorHandling
yield* suite.testConcurrency
yield* suite.testResourceManagement
yield* properties.testAssociativity
yield* properties.testIdentity
yield* integrationTests
yield* Effect.log("All channel tests completed successfully")
}).pipe(
  Effect.provide(TestContext.TestContext)
)

Channel provides the foundational low-level streaming primitives that power Effect's entire streaming ecosystem, offering precise control over data flow, resource management, and error handling.
Key benefits:
- Precise Control - Fine-grained control over input/output types, error handling, and resource management
- Memory Efficiency - Pull-based processing model prevents memory overflow with large datasets
- Type Safety - Complete type safety through the entire processing pipeline with separate error channels
- Composability - Rich composition primitives for building complex processing pipelines from simple building blocks
- Performance - Low-level optimizations for high-throughput scenarios with built-in backpressure
- Foundation - Serves as the building block for Stream and Sink, enabling seamless integration
Channel excels in scenarios requiring custom streaming operators, performance-critical data processing, specialized I/O patterns, and when you need precise control over the streaming behavior that higher-level abstractions like Stream don't provide.