Pipeline Sketches: Design Patterns
After writing thousands of lines of pipeline-heavy Lateralus code, I've seen certain patterns emerge repeatedly. This is a collection of idioms and design patterns that have proven useful across domains.
◉ Transform-Filter-Reduce
The most common pattern. Transform data, filter out what you don't need, reduce to a result.
// Count the words longer than 5 characters in `text`.
// Words come from splitting on single spaces; each one is trimmed and
// lowercased before its length is checked.
// NOTE(review): assuming split(" ") does not break on other whitespace,
// newline-separated words stay joined into one token -- confirm input shape.
let long_word_count = text
    |> split(" ")
    |> map(fn(word) { lowercase(trim(word)) })
    |> filter(fn(word) { 5 < len(word) })
    |> len()
// Sum of the squares of the even values in `numbers`:
// keep the evens, square each, then add them up.
let sum_sq_evens = numbers
    |> filter(fn(x) { x % 2 == 0 })
    |> map(fn(x) { x * x })
    |> sum()
◉ Fanout-Fanin
Process data through multiple paths, then merge results. Useful for parallel processing or when you need multiple derived values.
// Compute several summary statistics of `data` with a single fan-out.
// NOTE(review): whether tee() actually evaluates its branches in parallel
// is not shown here -- confirm before relying on concurrency.
fn analyze(data: [float]) -> Stats {
// Each branch returns a (key, value) pair.
// NOTE(review): the lookups below assume tee() gathers these pairs into a
// structure indexable by key; if it returns a plain list of tuples,
// results["min"] will not resolve -- confirm tee's return shape.
let results = data |> tee(
fn(d) { ("min", min(d)) },
fn(d) { ("max", max(d)) },
fn(d) { ("mean", mean(d)) },
fn(d) { ("stddev", stddev(d)) }
)
// Copy each keyed result into the corresponding Stats field.
Stats {
min: results["min"],
max: results["max"],
mean: results["mean"],
stddev: results["stddev"]
}
}
// Multiple enrichment paths
// Fan `user` out to three independent fetchers, then combine their
// outputs into an EnrichedUser via merge_into.
fn enrich_user(user: User) -> EnrichedUser {
    user
        |> tee(
            fetch_profile,
            fetch_permissions,
            fetch_preferences
        )
        |> merge_into(EnrichedUser)
}
◉ Accumulating Pipeline
Build up context as you move through stages. Each stage adds to the result rather than replacing it.
// Build a report with progressive enrichment.
// Starts from a context holding only the order id and threads it through
// each add_* stage; add_order_details and add_customer_info return the
// context extended with new fields (the other stages presumably do the
// same). format_report turns the final context into a Report.
fn build_report(order_id: int) -> Report {
    let seed = { id: order_id }
    seed
        |> add_order_details()
        |> add_customer_info()
        |> add_shipping_status()
        |> add_payment_history()
        |> format_report()
}
// Fetch the order identified by ctx.id and return the context extended
// with that order and its items.
fn add_order_details(ctx: map) -> map {
    let fetched = db.get_order(ctx.id)
    let line_items = fetched.items
    { ...ctx, order: fetched, items: line_items }
}
// Fetch the customer referenced by ctx.order and return the context
// extended with it. Reads ctx.order, so add_order_details must run first.
fn add_customer_info(ctx: map) -> map {
    let customer_id = ctx.order.customer_id
    { ...ctx, customer: db.get_customer(customer_id) }
}
◉ Conditional Pipeline
Branch the pipeline based on conditions. Use then_if to run a stage only when its predicate holds, or pattern matching to pick one of several stages.
// Apply discounts only if eligible.
// Validate the order, apply each discount whose predicate holds, then
// compute shipping and finalize. The member check runs on the order as
// it stands after any bulk discount.
fn process_order(order: Order) -> Order {
    let over_bulk_threshold = fn(o) { o.total > 100 }
    let placed_by_member = fn(o) { o.customer.is_member }
    order
        |> validate()
        |> then_if(over_bulk_threshold, apply_bulk_discount)
        |> then_if(placed_by_member, apply_member_discount)
        |> calculate_shipping()
        |> finalize()
}
// Pattern-based branching
// Send `msg` through the channel matching its variant and log the outcome.
// Each arm calls its sender with msg, so log_result receives the send
// result rather than the sender function itself.
// NOTE(review): assumes send_* accept the whole Message; if they take the
// variant's payload instead, bind it in the pattern and pass that.
fn route_message(msg: Message) -> Result {
    let sent = match msg {
        Message::Email(_) => send_email(msg),
        Message::SMS(_) => send_sms(msg),
        Message::Push(_) => send_push(msg),
    }
    sent |> log_result()
}
◉ Error-Collecting Pipeline
Process all items, collecting errors rather than failing fast. Useful for batch operations.
// Run process_item on every item without stopping at the first error.
// Successes are returned as their unwrapped values; failures as
// (item, error) pairs.
fn process_batch(items: [Item]) -> BatchResult {
    let outcomes = items |> map(fn(item) {
        match process_item(item) {
            Ok(value) => { item: item, result: Some(value), error: None },
            Err(err) => { item: item, result: None, error: Some(err) }
        }
    })
    let succeeded = outcomes
        |> filter(fn(o) { o.result.is_some() })
        |> map(fn(o) { o.result.unwrap() })
    let failed = outcomes
        |> filter(fn(o) { o.error.is_some() })
        |> map(fn(o) { (o.item, o.error.unwrap()) })
    BatchResult { succeeded: succeeded, failed: failed }
}
◉ Window Pipeline
Process data in sliding or tumbling windows. Essential for time-series and streaming.
// Moving average: the mean of each window of `window_size` values.
// NOTE(review): assumes windows() yields overlapping (sliding) windows.
fn moving_average(data: [float], window_size: int) -> [float] {
    let chunks = data |> windows(window_size)
    chunks |> map(fn(chunk) { mean(chunk) })
}
// Detect anomalies using rolling statistics.
// A reading is flagged when it lies more than 3 standard deviations from
// the mean of the window that ends at it. Anomaly.index is the position
// of that flagged reading in `readings`.
// NOTE(review): index arithmetic assumes windows() slides by one element.
fn detect_anomalies(readings: [float]) -> [Anomaly] {
    // 60 samples per window; that is one minute only at 1 sample/second
    // -- confirm the sampling rate.
    let window_size = 60
    readings
        |> windows(window_size)
        |> enumerate()
        |> filter(fn((_, window)) {
            let avg = mean(window)
            let std = stddev(window)
            let latest = window[len(window) - 1]
            // avg/std include `latest` itself, which slightly dampens the
            // score of a lone outlier.
            abs(latest - avg) > 3.0 * std
        })
        // enumerate() numbers windows; window i ends at reading
        // i + window_size - 1, which is the reading that was tested.
        |> map(fn((i, _)) { Anomaly { index: i + window_size - 1 } })
}
◉ State Machine Pipeline
Transform input through a state machine. Each input can trigger state transitions.
// Tokenizer states: outside any literal, inside a "..." string, or inside a comment.
enum TokenizerState { Normal, InString, InComment }
// Tokenize `chars` by folding a context record over them.
// ctx.state is the current TokenizerState, ctx.tokens the tokens emitted so
// far, and ctx.buffer the characters of the string literal being read.
// Only ctx.tokens is returned; anything left in ctx.buffer at the end
// (e.g. an unterminated string) is discarded.
fn tokenize(chars: [char]) -> [Token] {
let initial = { state: Normal, tokens: [], buffer: "" }
chars
|> fold(initial, fn(ctx, ch) {
match (ctx.state, ch) {
// Opening quote: start collecting a string literal.
(Normal, '"') => { ...ctx, state: InString },
// Closing quote: emit the buffered string and reset the buffer.
// Keep this arm above (InString, c), which would otherwise also match
// the quote (presumably first-match semantics).
(InString, '"') => {
...ctx,
state: Normal,
tokens: ctx.tokens + [Token::String(ctx.buffer)],
buffer: ""
},
// Any other character inside a string is appended to the buffer.
(InString, c) => { ...ctx, buffer: ctx.buffer + c },
// NOTE(review): a single '/' enters InComment, so a lone division
// operator would be read as a comment start; a real tokenizer would
// presumably look for "//".
(Normal, '/') => { ...ctx, state: InComment },
// ... more transitions
// As shown, no arm handles other characters in Normal or any character
// in InComment; those cases are elided from this sketch.
}
})
|> _.tokens
}
◉ Retry Pipeline
Wrap fallible operations with retry logic. The example below uses exponential backoff, waiting 100 ms, 200 ms, 400 ms, ... between failed attempts.
// Call `op` up to `max_attempts` times and return its first Ok.
// After each failed attempt except the last, sleep 100 * 2^attempt ms
// (exponential backoff: 100, 200, 400, ...). The final attempt's Err is
// returned unchanged. If max_attempts <= 0, `op` is never called and
// Err("Max retries exceeded") is returned.
fn with_retry[T](op: fn() -> Result[T], max_attempts: int) -> Result[T] {
    range(0, max_attempts)
        |> find_map(fn(attempt) {
            let is_last = attempt == max_attempts - 1
            match op() {
                Ok(v) => Some(Ok(v)),
                Err(e) if is_last => Some(Err(e)),
                Err(_) => {
                    sleep_ms(100 * pow(2, attempt))
                    None
                }
            }
        })
        // Only reached when the range is empty (max_attempts <= 0).
        |> unwrap_or(Err("Max retries exceeded"))
}
// Usage: fetch `url`, making at most 3 attempts in total.
let result = (fn() { http.get(url) }) |> with_retry(3)
◉ Validation Pipeline
Run multiple validations, collecting all errors rather than stopping at the first failure.
// Run every check against `user` and collect a ValidationError for each
// one that fails; return Ok(user) only when none fail. Checks are
// independent, so an empty email reports both "Email required" and
// "Invalid email format".
fn validate_user(user: User) -> Result[User, [ValidationError]] {
    let rules = [
        (fn(u) { len(u.name) > 0 }, "Name required"),
        (fn(u) { len(u.email) > 0 }, "Email required"),
        (fn(u) { u.email.contains("@") }, "Invalid email format"),
        (fn(u) { u.age >= 13 }, "Must be 13 or older"),
    ]
    let failures = rules
        |> filter(fn((passes, _)) { !passes(user) })
        |> map(fn((_, message)) { ValidationError { message: message } })
    if len(failures) == 0 { Ok(user) } else { Err(failures) }
}
◉ Pipeline Composition
Build complex pipelines from smaller, reusable pieces.
// Define pipeline stages as named functions
// Trim and lowercase every word in one pass, then drop tokens that are
// empty after trimming. split(" ") on repeated spaces produces "" pieces,
// which would otherwise survive stopword removal and stemming.
let normalize = fn(data) {
    data
        |> map(fn(w) { w |> trim() |> lowercase() })
        |> filter(fn(w) { len(w) > 0 })
}
// Keep only the words that do not appear in STOPWORDS.
let remove_stopwords = fn(words) {
    let is_content_word = fn(word) { !STOPWORDS.contains(word) }
    words |> filter(is_content_word)
}
// Reduce each word to its stem with porter_stem.
let stem = fn(words) {
    words |> map(fn(word) { porter_stem(word) })
}
// Compose the stages into one text-processing function.
// NOTE(review): the intended order is normalize -> remove_stopwords -> stem;
// confirm compose() applies its arguments left-to-right (many libraries'
// compose is right-to-left, which would stem before normalizing).
let preprocess = compose(normalize, remove_stopwords, stem)
// Use it: split the raw text on spaces, then run the composed stages.
let tokens = raw_text |> split(" ") |> preprocess()
◉ When Pipelines Don't Fit
Not everything should be a pipeline. Avoid them when:
- Lots of early exits: Use traditional control flow
- Complex state dependencies: Consider a class or closure
- Performance-critical loops: Sometimes a simple loop is faster
- Readability suffers: If the pipeline is confusing, refactor
Pipelines are a tool, not a religion. Use them where they clarify intent.
More examples are available in the playground and the examples directory.