Anantha Kumaran random thoughts

Hunger

Writing is a powerful medium, which, when controlled by a skillful writer, can evoke a captivating world in the mind of the reader – something the reader couldn’t have come up with by himself. Knut Hamsun takes this to the next level in Hunger. Instead of creating a world the reader observes as an omnipresent God, he places the reader inside the world as a character, making the reader go through the same sensations as the protagonist, making him feel what it is like to be hungry and delirious.

A primer on Elixir Stream

For many problems in programming, the most straightforward solution is to transform a list (e.g. map/filter/reduce over it). Once in a while, there will be a problem that could be solved with a list and transformations, except that the list is big and consumes a lot of memory. Let’s say you want to export the records in a database as a csv report; it might not be possible to load the entire set of records into memory before the transformations can be applied. The Stream abstraction in Elixir is designed to address these kinds of problems.

Streams allow for processing of elements without any buffering. A transformation is a function that takes an element and returns zero or more elements. The stream abstraction helps us apply a sequence of such transformations on a stream with a constant memory overhead.
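For instance, an eager Enum pipeline materializes a full intermediate list at every step, while the equivalent Stream pipeline only describes the computation and processes one element at a time when it is finally consumed:

# eager: Enum.map builds a one million element intermediate list before Enum.sum runs
1..1_000_000 |> Enum.map(&(&1 * 3)) |> Enum.sum()

# lazy: Stream.map only wraps the range with a description of the transformation;
# Enum.sum then pulls and consumes one element at a time
1..1_000_000 |> Stream.map(&(&1 * 3)) |> Enum.sum()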

Let’s look at an example to understand the api and semantics. I will skip details like escaping csv entries and focus on the stream.

Stream.cycle([[name: "John", age: "42", id: "4774", plan: "premium"]])
|> Stream.take(1_000_000)
|> Stream.map(fn record ->
  [Enum.join(Keyword.values(record), ","), "\n"]
end)
|> Stream.into(File.stream!("records.csv"))
|> Stream.run

The function cycle creates an infinite stream of elements. In more realistic scenarios the source would be a database, a file, etc. The function take creates a new stream that will return the first 1 million elements from the source stream. The function map creates a new stream that applies the given function to each element in the source stream. The function into emits each element of the source stream into the destination stream. The function run forces the stream to run.

Though most of the functions that deal with streams look exactly like their list counterparts, there are some subtle semantic differences.

  1. Streams are lazy in nature. When you apply a transformation to a stream, instead of the result of the transformation, you get a new stream. The stream has to be forced explicitly, either by using Stream.run or by using any function in the Enum module.

  2. The stream abstraction is a lot more restrictive in nature. Streams only allow linear transformations. When you force a stream, all the streams associated with it are also forced.

When you force a stream (e.g. with run), the bottommost stream will pull elements from the stream above it, which in turn will pull from the stream above it and apply its transformation. Effectively, each element travels from top to bottom without being buffered anywhere.
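To see the laziness and the pull behaviour concretely, here is a small illustration; the IO.inspect is only there to show when an element actually gets processed:

stream = 1..10 |> Stream.map(fn x -> IO.inspect(x, label: "processing") end)
# => #Stream<...>  building the stream does no work, nothing is printed yet

Enum.take(stream, 3)
# => prints "processing: 1", "processing: 2", "processing: 3" and returns [1, 2, 3];
# => only the three elements that were pulled ever get processed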

For example, the following transformation is easy to perform using the stream abstraction:

fetch records from db → filter records → map records to csv format → Stream.into(File.stream!) → Stream.run

This works well when there is a single operation at the end (the one that writes to the file in the case above). But if you want multiple end operations, things get tricky. Though the version below looks similar to the one above, it is much more difficult to model using the stream abstraction.

fetch records from db → filter records, forking into two branches:
  map records to csv format → Stream.into(File.stream!) → Stream.run
  map records to ndjson format → Stream.into(File.stream!) → Stream.run

Why is the second version difficult? What happens if you pull 2 elements from one end operation (the ndjson side) and 1 element from the other (the csv side)? Because there are two streams at the bottom, each might be pulling elements at a different speed.

There seem to be two options that would allow both end operations to pull.

  1. Introduce a buffer in stream two, which would break our constant memory overhead promise. E.g. if the ndjson side pulled 2 elements but the csv side pulled only 1, filter records would have to keep the 1 extra element buffered until the csv side asks for more elements.
  2. Split the stream into two pipelines, which means the first two streams (fetch and filter) would be applied twice. E.g. the db records would be fetched separately for both sides (sketched below).
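A minimal sketch of the second option, using hypothetical fetch_records/0, active?/1, to_csv/1 and to_ndjson/1 helpers; because each Stream.run forces the whole pipeline, the records are fetched and filtered twice:

# the shared, lazy part of the pipeline
source =
  fetch_records()
  |> Stream.filter(&active?/1)

# forcing it once per output format repeats the fetch and filter work
source |> Stream.map(&to_csv/1) |> Stream.into(File.stream!("records.csv")) |> Stream.run
source |> Stream.map(&to_ndjson/1) |> Stream.into(File.stream!("records.ndjson")) |> Stream.run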

Is there any other way to implement this without introducing a buffer? Fortunately for us, this is a well-studied problem [1] [2]. Libraries like conduit and repa-flow provide abstractions to handle these scenarios elegantly. But the models are not without shortcomings.

Source and Sink

A stream can be implemented in two ways: push (→) and pull (←). Let’s say two streams A and B are connected in the following way:

A ← B

An implementation is called pull based if B controls the main loop. Whenever B wants more elements it will pull from A.

A → B

An implementation is called push based if A controls the main loop. A will push the elements to B and B will not have any control over when it will get the next element.

A pull based stream is called a source and a push based stream is called a sink. Is one type of implementation strictly better than the other? No, each has its own pros and cons.

A source is good for cases where two streams need to be merged into a single stream. But forking a source into two sources is not possible (without buffers, which would defeat the purpose).

A sink is good for a stream that needs to be split into multiple streams. It’s not possible to merge multiple sinks into a single sink.

The difference between the two types becomes apparent when you think about the three fundamental transformations: merge, fork and map.


The table below shows combinations of source, sink and transformation, and which of them can be implemented without introducing buffers.

merge(source, source) :: source     possible
merge(sink, sink) :: sink           not possible
fork(source) :: (source, source)    not possible
fork(source) :: (source, sink)      possible
fork(sink) :: (sink, sink)          possible
map(source) :: source               possible
map(source) :: sink                 possible
map(sink) :: sink                   possible
map(sink) :: source                 not possible

There are two main points that can be inferred from the above table:

  1. Once you map from a source to a sink, there is no way to map back to a source.
  2. If you need to fork the stream at any point, you need to make at least one of the resultant streams a sink.

There can be only one main loop in any series of connected streams. If all the streams are sinks, the main loop is controlled by the first sink. In the case of mixed streams, only one of the end streams can be a source, and it controls the main loop. The rest of the end streams (if there are any) would be sinks. Each sink receives elements based on how the source stream pulls them: the source pulls elements, and the sinks get elements pushed to them.

Elixir Source and Sink Streams

Does Elixir support the source and sink abstractions? The documentation doesn’t explicitly talk about push/pull or source/sink, so I could only make an educated guess.

A stream should implement at least one of the Enumerable and Collectable protocols. The Enumerable protocol enables a pull type implementation via the suspend operation. The Collectable protocol only allows a push type implementation.

Effectively

  1. if a stream implements only Collectable, then it’s a sink.
  2. if a stream implements Enumerable, then it’s a source.
  3. if a stream implements both Enumerable and Collectable, then it’s both a source and a sink (e.g. File.stream!), as the checks below show.
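For instance, each protocol exposes impl_for/1, which returns the implementing module or nil:

Enumerable.impl_for(File.stream!("records.csv"))   # => Enumerable.File.Stream
Collectable.impl_for(File.stream!("records.csv"))  # => Collectable.File.Stream

Enumerable.impl_for(1..10)                         # => Enumerable.Range
Collectable.impl_for(1..10)                        # => nil, a range is only a source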

Stream.into can be considered a function that performs the fork transformation. It takes a source and a sink as arguments, connects the source to the sink, and returns a new source. When the returned source is forced, each element also gets pushed to the sink.

Stream.cycle([[name: "John", age: "42", id: "4774", plan: "premium"]])
|> Stream.take(1_000_000)
|> Stream.map(fn record ->
  [Enum.join(Keyword.values(record), ","), "\n"]
end)
|> Stream.into(File.stream!("records_1.csv"))
|> Stream.into(File.stream!("records_2.csv"))
|> Stream.run

Here we use Stream.into to fork twice, thereby writing the same content to two different files.

Let’s go back to our main problem. We wanted to create both a csv and an ndjson file. We should be able to transform the sink after it’s forked, but this is where Elixir’s support is lacking. The Stream module only provides functions to transform source type streams. As Collectable is a protocol, we can implement the transformation functions ourselves.

defmodule Sink do
  defstruct [:builder]

  # Wraps the destination sink `dest` in a new sink that applies `fun`
  # to every element before pushing it to `dest`.
  def map(dest, fun) do
    builder = fn ->
      {state, dest_fun} = Collectable.into(dest)
      collector_fun = fn
        state, {:cont, elem} -> dest_fun.(state, {:cont, fun.(elem)})
        state, :done -> dest_fun.(state, :done)
        state, :halt -> dest_fun.(state, :halt)
      end
      {state, collector_fun}
    end
    %Sink{builder: builder}
  end
end


defimpl Collectable, for: Sink do
  def into(%Sink{builder: builder}) do
    builder.()
  end
end

With Sink.map in place, the pipeline can fork into two sinks, each applying its own transformation before writing to its file:

Stream.cycle([[name: "John", age: "42", id: "4774", plan: "premium"]])
|> Stream.take(1_000_000)
|> Stream.into(Sink.map(File.stream!("records.csv"), fn record ->
  [Enum.join(Keyword.values(record), ","), "\n"]
end))
|> Stream.into(Sink.map(File.stream!("records.ndjson"), fn record ->
  [Poison.encode!(Enum.into(record, %{})), "\n"]
end))
|> Stream.run

One quirk of transforming sinks is that all the transformations have to be applied in reverse order, starting with the last sink.
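For instance, to run every element through a hypothetical function f and then g before it is written to a file, the sink has to be built inside out, starting from the file (f, g and source are placeholders):

# desired order per element: f, then g, then the file
sink =
  File.stream!("out.txt")
  |> Sink.map(g)   # applied second
  |> Sink.map(f)   # applied first

source |> Stream.into(sink) |> Stream.run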

  1. Lippmeier, Ben, Fil Mackay, and Amos Robinson. “Polarized Data Parallel Data Flow.” 

  2. Kay, Michael. “You pull, I’ll push: on the polarity of pipelines.” Balisage: The Markup Conference. 2009. 

Visualization of backoff functions

Failures are inevitable in any system. How they should be handled varies from one system to another. In job processing systems, a common approach is to retry failed jobs a fixed number of times before considering them permanent failures. A backoff function is used to determine the wait time between successive retries.

Let’s look at a simple backoff function, which retries after a fixed wait time – 5 minutes in this case.

function constant(retryCount) {
   return 5 * 60;
}

Let’s assume there are 100 job failures; the chart below shows when each of the jobs would be retried. Each dot represents a time at which a job is retried. The color varies based on the retry count.

An exponential backoff function increases the wait time exponentially for each retry.

function exponential(retryCount) {
    var min = 3 * 60;
    var base = 2;
    return min + (Math.pow(base, retryCount) * 60);
}

The above two are pure functions; given an input, they will always return the same output. If n jobs fail at the same time, then all n jobs will be retried at the same time, which could cause a thundering herd problem. A random component called jitter is normally added to fix this. The last term in the function below is a random jitter that is scaled based on the retry count.

function sidekiq(retryCount) {
    return Math.pow(retryCount, 4) + 15 +
        (Math.random() * 30 * (retryCount + 1));
}

The above function is good enough for most use cases. There are still some gaps where your job processing system would be idle. The function below tries to distribute the load more evenly by increasing the randomness.

function between(a, b) {
    return a + (Math.random() * (b - a));
}
function buckets(retryCount) {
    var exp = 3;
    return between(Math.pow(retryCount, exp),
                   Math.pow(retryCount + 2, exp));
}

Although the load distribution is better than the previous version, the wait time between two retries starts to deviate a lot. I wonder whether there are any stateless functions that could provide a better distribution without much deviation in wait time.

Debugging cryptic errors

On a project I am working on, our backend system makes http requests to hundreds of different servers. It is sort of like webhooks: http requests are made to the customer’s server with a payload whenever specific events occur in our system. We were in the midst of migrating our systems from ruby to elixir.

During the test run, we started to get a cryptic error for a small number of domains.

HTTPoison.get("https://piay.iflix.com", [], [proxy: {'10.128.10.16', 3128}])
# => timestamp=2017-01-02T04:11:00.816Z level=error message= SSL: :hello:ssl_alert.erl:88:Fatal error: handshake failure

# => {:error, %HTTPoison.Error{id: nil, reason: {:tls_alert, 'handshake failure'}}}

The error points out that the handshake phase of the ssl connection is failing, but it doesn’t say why.

To get more information, I tried to use the ssl module directly.

:ssl.connect('piay.iflix.com', 443, [])
# => {:ok,{:sslsocket, {:gen_tcp, #Port<0.10658>, :tls_connection, :undefined}, #PID<0.325.0>}}

To my amazement, it worked. I was able to connect to this domain directly, but the https request failed. Due to compliance reasons, we have to use a proxy server for all our outgoing http/https requests. We use the HTTPoison library, which is a wrapper around the hackney library. We had already run into some issues due to the proxy. It seems like most users of the hackney library don’t use the proxy option, so some of the code paths related to the proxy are not well tested. To make sure the proxy was the problem, I made the request without the proxy option, and it worked.

Hackney uses the CONNECT tunneling method, which is quite simple. The http client sends CONNECT piay.iflix.com:443 to the proxy server, which in turn opens a tcp connection to piay.iflix.com:443. The proxy server will then relay whatever data the http client sends to the destination server. In the case of an https request, once the connection is established, hackney initiates the ssl protocol using the ssl:connect method.
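Roughly, the tunneling looks like this at the socket level (a sketch based on the description above; error handling and response parsing are omitted):

{:ok, _} = Application.ensure_all_started(:ssl)

# open a tcp connection to the proxy and ask it to tunnel to the destination
{:ok, socket} = :gen_tcp.connect('10.128.10.16', 3128, [:binary, active: false])
:ok = :gen_tcp.send(socket, "CONNECT piay.iflix.com:443 HTTP/1.1\r\nHost: piay.iflix.com:443\r\n\r\n")
{:ok, _response} = :gen_tcp.recv(socket, 0)  # expect "HTTP/1.1 200 Connection established"

# upgrade the tunneled tcp connection to tls, the way hackney does via ssl:connect
{:ok, tls_socket} = :ssl.connect(socket, [])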

This looks quite simple, but something is still going wrong. The same ssl:connect succeeds when the connection is established directly, but not through the proxy server.

The dbg app provides text based tracing functionality. It can be used to trace functions at various granularities, from all the functions in a module down to a single function with specific arguments. I started by tracing all the function calls in the ssl module, but it resulted in too much data for me to analyze properly. Then I started to read the source code of the ssl app, tracing a single function at a time and comparing the arguments and results of a successful and a failed connection.
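A minimal dbg session for this kind of single-function tracing looks something like the following (a sketch, shown here for the function that eventually turned out to be the interesting one):

# start the text tracer and enable call tracing for all processes
:dbg.tracer()
:dbg.p(:all, :c)

# trace one function and include its return value in the trace output
:dbg.tpl(:ssl_handshake, :client_hello_extensions, [{:_, [], [{:return_trace}]}])

# ... make the https request ...

:dbg.stop_clear()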

(<0.572.0>) returned from ssl_handshake:client_hello_extensions/6 ->
{hello_extensions,
 {renegotiation_info, ...},
 {elliptic_curves, [{1,3,132,0,30}, ...]},
 ...,
 undefined}


(<0.572.0>) returned from ssl_handshake:client_hello_extensions/6 ->
{hello_extensions,
 {renegotiation_info, ...},
 {elliptic_curves, [{1,3,132,0,30}, ...]},
 ...,
 {sni,
  "piay.iflix.com"}}

After multiple trials and errors, I finally came across something interesting: the sni field was present in the successful connection, but not in the failed one. The rest of the deduction was easy. The sni extension allows a single server to serve different certificates for different domains. During the initial handshake, the client has to provide the domain name along with the other details.

If a host name is passed as the first param of ssl:connect, the sni extension is automatically enabled. For a proxy request, the connection is established to the proxy server, which relays the data to the destination server. As the ip address of the proxy server is passed as the first param, the sni extension was not enabled.

HTTPoison.get("https://piay.iflix.com", [], [proxy: {'10.128.10.16', 3128}, ssl: [server_name_indication: 'piay.iflix.com']])

The fix was easy: the sni extension has to be enabled explicitly. As always, more layers introduce more points of failure.

80TTA

Section 80TTA of the Income Tax Act allows a deduction of up to ₹10,000 on income earned as interest from a savings account. At a savings interest rate of r, a principal of ₹10,000/r would generate ₹10,000 as interest in a year. The effective interest you would make on the same principal in a fixed deposit scheme differs based on the tax slab you belong to (slab 1 - 10%, slab 2 - 20%, slab 3 - 30%), since fixed deposit interest is fully taxable. You should use a fixed deposit scheme only as long as its interest rate is higher than r/0.9, r/0.8 and r/0.7 for slab 1, slab 2 and slab 3 respectively.
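For instance, assuming a hypothetical savings interest rate of 4%: a principal of ₹10,000 / 0.04 = ₹2,50,000 would earn ₹10,000 of tax-free interest in a year, and a fixed deposit would be worthwhile only if its rate is above 4% / 0.9 ≈ 4.44% for slab 1, 4% / 0.8 = 5% for slab 2, or 4% / 0.7 ≈ 5.71% for slab 3.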