← Back to Blog

Pipeline Sketches: Design Patterns

September 10, 2025 languagepatternspipelines

After writing thousands of lines of pipeline-heavy Lateralus code, certain patterns emerge repeatedly. This is a collection of idioms and design patterns that have proven useful across domains.

◉ Transform-Filter-Reduce

The most common pattern. Transform data, filter out what you don't need, reduce to a result.

// Count words over 5 characters
let long_word_count = text
    |> split(" ")
    |> map(fn(w) { w |> trim() |> lowercase() })
    |> filter(fn(w) { len(w) > 5 })
    |> len()

// Sum of squared even numbers
let sum_sq_evens = numbers
    |> filter(fn(n) { n % 2 == 0 })
    |> map(fn(n) { n * n })
    |> sum()

◉ Fanout-Fanin

Process data through multiple paths, then merge results. Useful for parallel processing or when you need multiple derived values.

// Compute multiple statistics in parallel
fn analyze(data: [float]) -> Stats {
    let results = data |> tee(
        fn(d) { ("min", min(d)) },
        fn(d) { ("max", max(d)) },
        fn(d) { ("mean", mean(d)) },
        fn(d) { ("stddev", stddev(d)) }
    )

    Stats {
        min: results["min"],
        max: results["max"],
        mean: results["mean"],
        stddev: results["stddev"]
    }
}

// Multiple enrichment paths
fn enrich_user(user: User) -> EnrichedUser {
    user |> tee(
        fetch_profile,
        fetch_permissions,
        fetch_preferences
    ) |> merge_into(EnrichedUser)

◉ Accumulating Pipeline

Build up context as you move through stages. Each stage adds to the result rather than replacing it.

// Build a report with progressive enrichment
fn build_report(order_id: int) -> Report {
    { id: order_id }
        |> add_order_details()
        |> add_customer_info()
        |> add_shipping_status()
        |> add_payment_history()
        |> format_report()
}

fn add_order_details(ctx: map) -> map {
    let order = db.get_order(ctx.id)
    { ...ctx, order: order, items: order.items }
}

fn add_customer_info(ctx: map) -> map {
    let customer = db.get_customer(ctx.order.customer_id)
    { ...ctx, customer: customer }
}

◉ Conditional Pipeline

Branch pipeline flow based on conditions. Use then_if or pattern matching.

// Apply discount only if eligible
fn process_order(order: Order) -> Order {
    order
        |> validate()
        |> then_if(_.total > 100, apply_bulk_discount)
        |> then_if(_.customer.is_member, apply_member_discount)
        |> calculate_shipping()
        |> finalize()
}

// Pattern-based branching
fn route_message(msg: Message) -> Result {
    msg |> match {
        Message::Email(_) => send_email,
        Message::SMS(_) => send_sms,
        Message::Push(_) => send_push,
    } |> log_result()
}

◉ Error-Collecting Pipeline

Process all items, collecting errors rather than failing fast. Useful for batch operations.

fn process_batch(items: [Item]) -> BatchResult {
    let results = items
        |> map(fn(item) {
            match process_item(item) {
                Ok(r) => { item: item, result: Some(r), error: None },
                Err(e) => { item: item, result: None, error: Some(e) }
            }
        })

    BatchResult {
        succeeded: results |> filter(_.result.is_some()) |> map(_.result.unwrap()),
        failed: results |> filter(_.error.is_some()) |> map(fn(r) { (r.item, r.error.unwrap()) })
    }
}

◉ Window Pipeline

Process data in sliding or tumbling windows. Essential for time-series and streaming.

// Moving average
fn moving_average(data: [float], window_size: int) -> [float] {
    data
        |> windows(window_size)
        |> map(mean)
}

// Detect anomalies using rolling statistics
fn detect_anomalies(readings: [float]) -> [Anomaly] {
    readings
        |> windows(60)  // 1-minute windows
        |> enumerate()
        |> filter(fn((i, window)) {
            let avg = mean(window)
            let std = stddev(window)
            let latest = window[len(window) - 1]
            abs(latest - avg) > 3.0 * std
        })
        |> map(fn((i, _)) { Anomaly { index: i } })
}

◉ State Machine Pipeline

Transform input through a state machine. Each input can trigger state transitions.

enum TokenizerState { Normal, InString, InComment }

fn tokenize(chars: [char]) -> [Token] {
    let initial = { state: Normal, tokens: [], buffer: "" }

    chars
        |> fold(initial, fn(ctx, ch) {
            match (ctx.state, ch) {
                (Normal, '"') => { ...ctx, state: InString },
                (InString, '"') => {
                    ...ctx,
                    state: Normal,
                    tokens: ctx.tokens + [Token::String(ctx.buffer)],
                    buffer: ""
                },
                (InString, c) => { ...ctx, buffer: ctx.buffer + c },
                (Normal, '/') => { ...ctx, state: InComment },
                // ... more transitions
            }
        })
        |> _.tokens
}

◉ Retry Pipeline

Wrap fallible operations with retry logic. Combines well with exponential backoff.

fn with_retry[T](op: fn() -> Result[T], max_attempts: int) -> Result[T] {
    range(0, max_attempts)
        |> find_map(fn(attempt) {
            match op() {
                Ok(v) => Some(Ok(v)),
                Err(e) if attempt < max_attempts - 1 => {
                    sleep_ms(100 * pow(2, attempt))  // Exponential backoff
                    None
                },
                Err(e) => Some(Err(e))
            }
        })
        |> unwrap_or(Err("Max retries exceeded"))
}

// Usage
let result = with_retry(fn() { http.get(url) }, 3)

◉ Validation Pipeline

Run multiple validations, collecting all errors rather than stopping at first.

fn validate_user(user: User) -> Result[User, [ValidationError]] {
    let checks = [
        (fn(u) { len(u.name) > 0 }, "Name required"),
        (fn(u) { len(u.email) > 0 }, "Email required"),
        (fn(u) { u.email.contains("@") }, "Invalid email format"),
        (fn(u) { u.age >= 13 }, "Must be 13 or older"),
    ]

    let errors = checks
        |> filter(fn((check, _)) { !check(user) })
        |> map(fn((_, msg)) { ValidationError { message: msg } })

    match len(errors) {
        0 => Ok(user),
        _ => Err(errors)
    }
}

◉ Pipeline Composition

Build complex pipelines from smaller, reusable pieces.

// Define pipeline stages as named functions
let normalize = fn(data) {
    data |> map(lowercase) |> map(trim)
}

let remove_stopwords = fn(words) {
    words |> filter(fn(w) { !STOPWORDS.contains(w) })
}

let stem = fn(words) {
    words |> map(porter_stem)
}

// Compose into a text processing pipeline
let preprocess = compose(normalize, remove_stopwords, stem)

// Use it
let tokens = raw_text |> split(" ") |> preprocess()

◉ When Pipelines Don't Fit

Not everything should be a pipeline. Avoid them when:

Pipelines are a tool, not a religion. Use them where they clarify intent.

More examples in the playground and the examples directory.