For many problems in programming, the most straightforward solution is to transform a list (E.g. map/filter/reduce over it). Once in a while there is a problem that could be solved with a list and transformations, except that the list is big and consumes a lot of memory. Let’s say you want to export the records in a database as a csv report; it might not be possible to load the entire set of records into memory before the transformations can be applied. The Stream abstraction in Elixir is designed to address these kinds of problems.

Streams allow elements to be processed without any buffering. A transformation is a function that takes an element and returns zero or more elements. The stream abstraction lets us apply a sequence of such transformations to a stream with a constant memory overhead.

Let’s look at an example to understand the API and the semantics. I will skip details like escaping csv entries and focus on the streams.

Stream.cycle([[name: "John", age: "42", id: "4774", plan: "premium"]])
|> Stream.take(1_000_000)
|> Stream.map(fn record ->
  [Enum.join(Keyword.values(record), ","), "\n"]
end)
|> Stream.into(File.stream!("records.csv"))
|> Stream.run

The function cycle creates an infinite stream of elements; in more realistic scenarios the source would be a database, etc. The function take creates a new stream that will return the first 1 million elements from the source stream. The function map creates a new stream that applies the given function to each element of the source stream. The function into emits each element of the source stream into the destination stream. The function run forces the stream to run.

Though most of the functions that deal with streams look exactly like their list counterparts, there are some subtle semantic differences.

  1. Streams are lazy in nature. When you apply a transformation to a stream, instead of the result of the transformation you get back a new stream. The stream has to be explicitly forced, either with Stream.run or with any of the functions in the Enum module (see the small example after this list).

  2. The stream abstraction is a lot more restrictive in nature: it only allows linear transformations. When you force a stream, all the streams associated with it are also forced.
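
As a quick illustration of the laziness, here is a minimal sketch in iex:

doubled = Stream.map(1..3, fn x -> x * 2 end)
#=> #Stream<[enum: 1..3, funs: [...]]>

# Nothing has been computed yet; forcing the stream produces the result.
Enum.to_list(doubled)
#=> [2, 4, 6]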

When you force a stream (E.g. with run), the bottommost stream pulls elements from the stream above it, which in turn pulls from the stream above it and applies its transformation. Effectively each element travels from top to bottom without being buffered anywhere.
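
You can observe this element-by-element traversal by adding a side effect at two different stages (a small sketch):

1..3
|> Stream.map(fn x -> IO.puts("map    #{x}"); x end)
|> Stream.each(fn x -> IO.puts("bottom #{x}") end)
|> Stream.run

# Prints map 1, bottom 1, map 2, bottom 2, map 3, bottom 3: each element
# travels all the way down before the next one is pulled from the top.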

For example, the following transformation is easy to perform using the stream abstraction:

fetch records from db
→ filter records
→ map records to csv format
→ Stream.into(File.stream!)
→ Stream.run

This works well when there is a single end operation (the one that writes to the file in the case above). But if you want to have multiple end operations, things get tricky. Though the version below looks similar to the one above, it is much more difficult to model using the stream abstraction.

fetch records from db
→ filter records
  → map records to csv format → Stream.into(File.stream!) → run
  → map records to ndjson format → Stream.into(File.stream!) → run

Why is the second version difficult? What happens if you pull 2 elements from one end operation (the ndjson side) and only 1 element from the other (the csv side)? Because there are two streams at the bottom, each might pull elements at a different speed.

There seem to be two options for allowing both end operations to pull.

  1. Introduce a buffer in the second stream (filter records), which would break our constant memory overhead promise. E.g. if the ndjson side pulled 2 elements but the csv side pulled only 1, filter records would have to keep the extra element buffered until the csv side asks for more.
  2. Split the stream into two pipelines, which means the first two streams (fetch and filter) would be applied twice. E.g. the db records would be fetched separately for each side (sketched below).
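
Option 2 can be written with the functions we already have, at the cost of running the shared part of the pipeline twice. A sketch, reusing the cycle/take source from the earlier example:

records = fn ->
  Stream.cycle([[name: "John", age: "42", id: "4774", plan: "premium"]])
  |> Stream.take(1_000_000)
end

# The source is built and consumed once per end operation.
records.()
|> Stream.map(fn record -> [Enum.join(Keyword.values(record), ","), "\n"] end)
|> Stream.into(File.stream!("records.csv"))
|> Stream.run

records.()
|> Stream.map(fn record -> [Poison.encode!(Enum.into(record, %{})), "\n"] end)
|> Stream.into(File.stream!("records.ndjson"))
|> Stream.run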

Is there any other way to implement this without introducing a buffer? Fortunately for us, this is a well-studied problem [1, 2]. Libraries like conduit and repa-flow provide abstractions to handle these scenarios elegantly. But the models are not without shortcomings.

Source and Sink

A stream can be implemented in two ways: push (→) and pull (←). Let’s say two streams A and B are connected, with A feeding elements to B.

A ← B

An implementation is called pull based if B controls the main loop. Whenever B wants more elements, it pulls them from A.

A → B

An implementation is called push based if A controls the main loop. A pushes elements to B, and B has no control over when it will get the next element.

A pull based stream is called a source and a push based stream is called a sink. Is one type of implementation strictly better than the other? No, each has its own pros and cons.

A source is good for cases where two streams need to be merged into a single stream. But forking a source into two sources is not possible (without buffers, which would defeat the purpose).
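
Elixir already covers the merge case for sources: for instance Stream.zip/2 combines two sources into a single source, pulling one element from each per output element.

Stream.zip(Stream.cycle(["a", "b"]), 1..4)
|> Enum.to_list()
#=> [{"a", 1}, {"b", 2}, {"a", 3}, {"b", 4}]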

A sink is good for a stream that needs to be split into multiple streams. It is not possible to merge multiple sinks into a single sink.

The difference between the two types becomes apparent when you think about the three fundamental transformations: merge, fork and map.

The table below shows all the possible combinations of source, sink and transformation, and which of them can be implemented without introducing buffers.

[Table: every combination of source/sink arguments and results for merge(_, _) :: _, fork(_) :: (_, _) and map(_) :: _, marking which combinations can be implemented without buffers.]

There are two main points that can be inferred from the above table:

  1. Once you map from a source to a sink, there is no way to map back to a source.
  2. If you need to fork the stream at any point, at least one of the resulting streams has to be a sink.

There can be only one main loop in any series of connected streams. If all the streams are sinks, the main loop is controlled by the first sink. In the case of mixed streams, only one of the end streams can be a source, and it controls the main loop. The rest of the end streams (if there are any) have to be sinks. Each sink receives elements at whatever rate the source produces them: the source pulls elements, and the sinks get elements pushed to them.

Elixir Source and Sink Streams

Does Elixir support the source and sink abstractions? The documentation doesn’t explicitly talk about push/pull or source/sink, so I can only make an educated guess.

A stream should implement at least one of the Enumerable and Collectable protocols. The Enumerable protocol enables a pull type implementation via its suspend mechanism. The Collectable protocol only allows a push type implementation.
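
The suspend mechanism is what makes pulling possible: the consumer can ask an enumerable for some elements and get back a continuation to resume from later. A minimal sketch:

# Ask for a single element, then suspend the reduction.
{:suspended, [1], cont} =
  Enumerable.reduce(1..3, {:cont, []}, fn x, acc -> {:suspend, [x | acc]} end)

# Resume the continuation whenever the next element is wanted.
{:suspended, [2, 1], _cont} = cont.({:cont, [1]})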

Effectively

  1. if a stream implements only Collectable then it’s a sink.
  2. if a stream implements Enumerable then it’s a source.
  3. if a stream implements both Enumerable and Collectable then it’s both a source and sink (E.g. File.stream!).
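
One way to check which side a given stream supports is to ask the protocols directly (a small sketch in iex):

Enumerable.impl_for(File.stream!("records.csv"))    #=> Enumerable.File.Stream
Collectable.impl_for(File.stream!("records.csv"))   #=> Collectable.File.Stream

Enumerable.impl_for(1..10)     #=> Enumerable.Range
Collectable.impl_for(1..10)    #=> nil (a range is only a source)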

Stream.into can be considered a function that performs the fork transformation. It takes a source and a sink as arguments, connects the source to the sink, and returns a new source. When the returned source is forced, each element also gets pushed to the sink.

Stream.cycle([[name: "John", age: "42", id: "4774", plan: "premium"]])
|> Stream.take(1_000_000)
|> Stream.map(fn record ->
  [Enum.join(Keyword.values(record), ","), "\n"]
end)
|> Stream.into(File.stream!("records_1.csv"))
|> Stream.into(File.stream!("records_2.csv"))
|> Stream.run

Here we use Stream.into to fork twice, thereby writing the same content to two different files.

Let’s go back to our main problem. We wanted to create both a csv and an ndjson file, so we need to be able to transform the sink after it’s forked. This is where Elixir’s support is lacking: the Stream module only provides functions that transform source type streams. As Collectable is a protocol, however, we can implement the transformation functions ourselves.

defmodule Sink do
  defstruct [:builder]

  # Wraps a Collectable `dest` in a new sink that applies `fun` to each
  # element before pushing it into `dest`.
  def map(dest, fun) do
    builder = fn ->
      {state, dest_fun} = Collectable.into(dest)

      collector_fun = fn
        state, {:cont, elem} -> dest_fun.(state, {:cont, fun.(elem)})
        state, :done -> dest_fun.(state, :done)
        state, :halt -> dest_fun.(state, :halt)
      end

      {state, collector_fun}
    end

    %Sink{builder: builder}
  end
end


defimpl Collectable, for: Sink do
  def into(%Sink{builder: builder}) do
    builder.()
  end
end
Stream.cycle([[name: "Johhn", age: "42", id: "4774", plan: "premium"]])
|> Stream.take(1000_000)
|> Stream.into(Sink.map(File.stream!("records.csv"), fn record ->
  [Enum.join(Keyword.values(record), ","), "\n"]
end))
|> Stream.into(Sink.map(File.stream!("records.ndjson"), fn record ->
  [Poison.encode!(Enum.into(record, %{})), "\n"]
end))
|> Stream.run

One of the quirks of transforming sinks is that all the transformations have to be applied in reverse order, starting with the last sink: since each Sink.map wraps its destination, the sink closest to the final destination has to be built first.
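
For example, to make each record flow through a formatting step first and a newline step second, the sink is built from the file outward, wrapping the newline step before the formatting step. A sketch using the Sink module above:

sink =
  File.stream!("records.csv")
  |> Sink.map(fn row -> [row, "\n"] end)                               # applied second
  |> Sink.map(fn record -> Enum.join(Keyword.values(record), ",") end) # applied first

Stream.cycle([[name: "John", age: "42", id: "4774", plan: "premium"]])
|> Stream.take(3)
|> Stream.into(sink)
|> Stream.run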

  1. Lippmeier, Ben, Fil Mackay, and Amos Robinson. “Polarized Data Parallel Data Flow.”

  2. Kay, Michael. “You pull, I’ll push: on the polarity of pipelines.” Balisage: The Markup Conference. 2009.