Writing is a powerful medium which, when controlled by a skillful
writer, can evoke a captivating world in the mind of the reader –
something the reader couldn’t have come up with by himself. Knut Hamsun
takes this to the next level in his book Hunger. Instead of creating a
world which the reader observes like an omnipresent god, he
chooses to place the reader inside the world as a character, makes the
reader go through the same sensations as the protagonist of the story,
makes him feel what it would be like to be hungry and delirious.
For many problems in programming, the most straightforward solution is to transform a list (e.g. map/filter/reduce over it). Once in a while, there will be problems that could be solved with a list and transformations, except that the list is big and consumes a lot of memory. Say you want to export the records in a database as a csv report: it might not be possible to load the entire set of records into memory before the transformations can be applied. The Stream abstraction in Elixir is designed to address these kinds of problems.
Streams allow for processing of elements without any buffering. A transformation is a function that takes an element and returns zero or more elements. The stream abstraction helps us apply a sequence of such transformations on a stream with a constant memory overhead.
Let’s look at an example to understand the api and semantics. I will skip details like escaping csv entries and focus on the stream operations.
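A sketch of the kind of pipeline described below; the row data and the "report.csv" file name are made up for illustration.

```elixir
# Build a report of 1 million rows with constant memory usage.
Stream.cycle([["john", "doe"], ["mary", "jane"]])
|> Stream.take(1_000_000)
|> Stream.map(fn fields -> Enum.join(fields, ",") <> "\n" end)
|> Stream.into(File.stream!("report.csv"))
|> Stream.run()
```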
The function cycle creates an infinite stream of elements. In more realistic scenarios the source would be a database, etc. The function take creates a new stream that returns the first 1 million elements from the source stream. The function map creates a new stream that applies the given function to each element of the source stream. The function into emits each element of the source stream into the destination stream. The function run forces the stream to run.
Though most of the functions that deal with streams look exactly like their list counterparts, there are some subtle semantic differences.
Streams are lazy in nature. When you apply a transformation to a stream, you get back a new stream instead of the result of the transformation. The stream has to be forced explicitly, either by using Stream.run or by using any function in the Enum module.
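A minimal illustration of this laziness:

```elixir
# Applying a transformation returns a new stream; no work happens yet.
doubled = Stream.map([1, 2, 3], fn x -> x * 2 end)

# Only forcing the stream (here via Enum.to_list/1) runs the transformation.
Enum.to_list(doubled)  # => [2, 4, 6]
```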
The stream abstraction is a lot more restrictive in nature: streams only allow linear transformations. When you force a stream, all the streams associated with it are also forced.
When you force a stream (e.g. via run), the bottommost stream pulls elements from the stream above it, which in turn pulls from the stream above it, applying its transformation along the way. Effectively, each element travels from top to bottom without being buffered anywhere.
For example, the following pipeline is easy to perform using the stream abstraction:
fetch records from db
map records to csv format
write records to csv file
This works well when there is a single end operation (the one that writes to a file in the case above). But if you want to have multiple end operations, things get tricky. Though the version below looks similar to the one above, it is much more difficult to model using the stream abstraction.
fetch records from db
filter records
map records to csv format and write to a csv file
map records to ndjson format and write to an ndjson file
Why is the second version difficult? What happens if you pull 2 elements from one end operation (the ndjson side) but only 1 element from the other (the csv side)? Because there are two streams at the bottom, each might pull elements at a different speed.
There seem to be two options for allowing pulls from both end operations.
Introduce a buffer in stream two, which would break our constant memory overhead promise. E.g. if the ndjson side pulled 2 elements but the csv side pulled only 1, filter records would have to keep the 1 extra element buffered until the csv side asks for more elements.
Split the stream into two pipelines, which means the first two streams (fetch and filter) would be applied twice. E.g. the db records would be fetched separately for each side.
Is there any other way to implement this without introducing a buffer? Fortunately for us, this is a well-studied problem. Libraries like conduit and repa-flow provide abstractions to handle these scenarios elegantly. But the models are not without shortcomings.
Source and Sink
A stream can be implemented in two ways: push → and pull ←. Let’s say two streams A and B are connected so that elements flow from A to B.
An implementation is called pull based if B controls the main loop: whenever B wants more elements, it pulls from A.
An implementation is called push based if A controls the main loop: A pushes elements to B, and B has no control over when it will get the next element.
A pull based stream is called a source and a push based stream is called a sink. Is one type of implementation strictly better than the other? No, each has its own pros and cons.
A source is good for cases where two streams need to be merged into a single stream. But forking a source into two sources is not possible (without having buffers, which defeats the purpose).
A sink is good for a stream that needs to be split into multiple streams. It’s not possible to merge multiple sinks into a single sink.
The difference between the two types becomes apparent when you think about the three fundamental transformations: merge, fork and map.
The table below shows all the possible combinations of source, sink and transformation, and which of them can be implemented without introducing buffers.
There are two main points that can be inferred from the table:
Once you map from a source to a sink, there is no way to map back to a source.
If you need to fork the stream at any point, at least one of the resultant streams has to be a sink.
There can be only one main loop in any series of connected streams. If all the streams are sinks, the main loop is controlled by the first sink. In the case of mixed streams, only one of the end streams can be a source, and it controls the main loop; the rest of the end streams (if there are any) have to be sinks. Each sink receives elements at whatever rate the source pulls them: a source pulls elements, while sinks have elements pushed into them.
Elixir Source and Sink Streams
Does Elixir support the source and sink abstractions? The documentation doesn’t explicitly talk about push/pull or source/sink, so I can only make an educated guess.
A stream should implement at least one of the Enumerable and Collectable protocols. The Enumerable protocol enables a pull-type implementation via the suspend operation; the Collectable protocol only allows a push-type implementation.
If a stream implements only Collectable, then it’s a sink.
If a stream implements Enumerable, then it’s a source.
If a stream implements both Enumerable and Collectable, then it’s both a source and a sink (e.g. File.stream!).
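One way to check this is via the `impl_for/1` function that every protocol defines (the file name here is arbitrary; `File.stream!` doesn’t touch the file until the stream is used):

```elixir
stream = File.stream!("somefile.txt")

# File.stream! implements both protocols, so it can act as source and sink.
Enumerable.impl_for(stream)   # a non-nil Enumerable implementation
Collectable.impl_for(stream)  # a non-nil Collectable implementation

# A range, by contrast, is only a source.
Enumerable.impl_for(1..10)    # non-nil
Collectable.impl_for(1..10)   # nil – ranges cannot be collected into
```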
Stream.into could be considered a function that performs the fork transformation. It takes a source and a sink as arguments, connects the source to the sink, and returns a new source. When the returned source is forced, each element also gets pushed into the sink.
Here we use Stream.into to fork twice, thereby writing the same content to two different streams.
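A sketch of what that looks like (the file names are made up):

```elixir
# Each Stream.into/2 forks the pipeline: elements keep flowing downstream
# and are additionally pushed into the given sink.
Stream.cycle(["hello\n"])
|> Stream.take(5)
|> Stream.into(File.stream!("copy1.txt"))
|> Stream.into(File.stream!("copy2.txt"))
|> Stream.run()

# copy1.txt and copy2.txt now hold identical content.
```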
Let’s go back to our main problem: we wanted to create both a csv and an ndjson file. We should be able to manipulate the sink after it’s forked, but this is where Elixir’s support falls short. The Stream module only provides functions that transform source-type streams. As Collectable is a protocol, though, we could implement the transform functions ourselves.
Failures are inevitable in any system. How they should be handled varies
from one system to another. In job processing systems, a common
approach is to retry the failed jobs a fixed number of times
before they are considered permanent failures. A backoff
function is used to determine the wait time between successive retries.
Let’s look at a simple backoff function, which retries after a fixed
wait time – 5 minutes in this case.
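A fixed backoff could be sketched like this (times in seconds; the module and function names are made up):

```elixir
defmodule FixedBackoff do
  # Wait a constant 5 minutes, regardless of how many times the job failed.
  def wait(_retry_count), do: 5 * 60
end
```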
Let’s assume there are 100 job failures. The chart below shows when
each of the jobs would be retried. Each dot represents a time at
which a job is retried, and the color varies based on the retry count.
An exponential backoff function increases the wait time exponentially
for each retry.
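A sketch of an exponential backoff, doubling a one-minute base on each retry (the base interval and growth factor are assumptions):

```elixir
defmodule ExponentialBackoff do
  # 1 min, 2 min, 4 min, 8 min, ... for retry counts 0, 1, 2, 3, ...
  def wait(retry_count), do: trunc(:math.pow(2, retry_count)) * 60
end
```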
The above two are pure functions: given an input, they will always
return the same output. If n jobs fail at the same time, then all
n jobs will be retried at the same time, which could cause
problems. A random component called jitter is normally added to fix
this. The last component in the function below is a random
jitter that is scaled based on the retry count.
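A sketch of such a function; the jitter range (up to one extra minute per retry) is an assumption:

```elixir
defmodule JitteredBackoff do
  # Exponential base plus a random jitter that grows with the retry count.
  def wait(retry_count) do
    base = trunc(:math.pow(2, retry_count)) * 60
    jitter = :rand.uniform(60 * (retry_count + 1))
    base + jitter
  end
end
```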
The above function is good enough for most use cases, but there are
still some gaps where your job processing system would sit idle. The
function below tries to distribute the load more evenly by increasing the
range of the random jitter.
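One stateless scheme with this behaviour is "full jitter" (a guess at what was meant here, not necessarily the post's original function): the wait is drawn uniformly from the whole exponentially growing window, which spreads retries evenly but makes consecutive waits vary widely.

```elixir
defmodule FullJitterBackoff do
  # Pick the wait uniformly from [0, 2^retry_count * 60] seconds.
  def wait(retry_count) do
    cap = trunc(:math.pow(2, retry_count)) * 60
    :rand.uniform(cap + 1) - 1
  end
end
```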
Although the load distribution is better than in the previous version,
the wait time between two retries starts to deviate a lot. I wonder whether
there are any stateless functions that could provide a better distribution
without much deviation in wait time.
On a project I am working on, our backend system makes http requests
to hundreds of different servers. It is sort of like webhooks: http
requests are made to the customer's server with a payload whenever
specific events occur in our system. We were in the midst of
migrating our systems from Ruby to Elixir.
During the test run, we started to get a cryptic error for a small
number of domains.
To my amazement, it worked: I was able to connect to this domain, but
the https request failed. For compliance reasons, we have to route all
our outgoing http/https requests through a proxy server. We use the
HTTPoison library, which is a wrapper around the hackney library. We
had already run into some issues due to the proxy. It seems most users
of the hackney library don’t use the proxy option, so some of the code
paths related to the proxy are not well tested. To make sure the proxy was the
problem, I made the request without the proxy option, and it worked.
Hackney tunnels requests through the proxy using the CONNECT
method, which is quite simple. The http client sends CONNECT
piay.iflix.com:443 to the proxy server, which in turn opens a tcp
connection to piay.iflix.com:443. The proxy server then relays
whatever data is sent by the http client to the destination server. In the case
of an https request, once the connection is established, hackney
initiates the ssl protocol using the ssl:connect method.
This looks quite simple, but something was still going wrong: the same
ssl:connect succeeds when the connection is established directly, but not
through the proxy server.
The dbg app provides text based
tracing functionality. It can be used to trace functions at various
granularities, from all the functions in a module down to a single function with
specific arguments. I started by tracing all the function calls in the ssl
module, but that resulted in too much data for me to analyze
properly. So I started reading the source code of the ssl app,
tracing a single function at a time, and comparing the
arguments and results of a successful and a failed connection.
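For reference, tracing a single function with its return value looks roughly like this (the `{:return_trace}` action in the match spec is what makes dbg print return values):

```elixir
# Start the text-based tracer.
:dbg.tracer()

# Trace calls to ssl_handshake:client_hello_extensions/6, including
# their return values.
:dbg.tp(:ssl_handshake, :client_hello_extensions, 6, [{:_, [], [{:return_trace}]}])

# Enable call tracing in all processes.
:dbg.p(:all, :c)
```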
(<0.572.0>) returned from ssl_handshake:client_hello_extensions/6 ->
After multiple trials and errors, I finally came across something
interesting: the sni field was present in the successful connection, but
not in the failed one. The rest of the deduction was easy. The sni extension
allows a single server to serve different certificates for different
domains; during the initial handshake, the client has to provide the
domain name along with other details.
If a host name is passed as the first param of ssl:connect, the sni
extension is automatically enabled. For a proxy request, the connection
is established to the proxy server, which relays the data to the
destination server. As the ip address of the proxy server is passed as
the first param, the sni extension was not enabled.
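A sketch of the workaround (the proxy host and port are hypothetical; the key part is passing server_name_indication explicitly when upgrading the tunneled socket):

```elixir
# Open a tcp connection to the proxy and ask it to tunnel to the destination.
{:ok, tcp} = :gen_tcp.connect(~c"proxy.internal", 8080, [:binary, active: false])
:ok = :gen_tcp.send(tcp, "CONNECT piay.iflix.com:443 HTTP/1.1\r\n\r\n")
{:ok, _response} = :gen_tcp.recv(tcp, 0)

# ssl:connect only sees a socket here, not a host name, so the sni
# extension must be enabled explicitly via the server_name_indication option.
{:ok, _ssl} =
  :ssl.connect(tcp, [server_name_indication: ~c"piay.iflix.com"], 5_000)
```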
Section 80TTA of the Income Tax Act allows a deduction of up to ₹10,000 on the
income earned as interest from a savings account. At the interest rate for a savings account, a principal of
₹ would generate ₹10,000 as interest in a
year. The effective amount of interest you would make on the same
principal in a fixed deposit scheme differs based on the tax slab
(slab 1 – 10%, slab 2 – 20%, slab 3 – 30%) you belong to. You should
use a fixed deposit scheme as long as its interest rate is higher than
, , for slab 1, slab 2 and slab 3 respectively.
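The arithmetic behind those blanks can be sketched as follows, assuming a savings rate of 4% (a hypothetical value; the original post filled the numbers in interactively). Savings interest up to ₹10,000 is tax-free, while fixed deposit interest is taxed at the slab rate, so a fixed deposit wins once fd_rate × (1 − slab) > savings_rate.

```elixir
# Assumed savings interest rate of 4% per year (hypothetical).
savings_rate = 0.04

# Principal that yields exactly ₹10,000 of savings interest in a year.
principal = 10_000 / savings_rate  # 250,000

# Break-even FD rate per tax slab: fd_rate * (1 - slab) = savings_rate.
break_even = fn slab -> savings_rate / (1 - slab) end

Enum.map([0.10, 0.20, 0.30], break_even)
# => roughly [0.0444, 0.05, 0.0571], i.e. about 4.44%, 5% and 5.71%
```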