
Why Riak matters

Riak is one of the first and most prominent key-value databases implementing the famous Amazon Dynamo paper. The paper was unveiled in 2007, sixteen years ago. It is already old enough to get a passport!

The most widespread database following the paper is Apache Cassandra. Funnily enough, AWS DynamoDB has walked away from the original ideas (such as leaderless replication), and all of these systems have different storage engines.

Riak differs by following the original ideas from Dynamo. So if you’d like to explore those principles and design a database system, or maybe even enhance an existing database, Riak is a very good candidate, with one tiny peculiarity: it is written in Erlang and OTP, which raises the barrier to entry.

In this post, I’ll describe the architecture of Riak, MapReduce pipeline development in Erlang, and the Bitcask storage engine.

Dynamo foundations

Attempts to speak with the Riak Ring in Erlang look like a scene from the Arrival movie.

Maybe Riak’s glory days are past. After Basho’s departure and the handover of its projects to the community, it lacks support. For example, you cannot download binary releases for macOS since the bucket with artifacts has permission errors, so you have to build Riak from source. It is also impossible to build Riak with the latest Erlang 25; I only managed to do it with Erlang 22. The documentation is littered with dead links. You have been warned.

Riak has a nice CLI to manage a cluster. Each node is accessible via HTTP and Protobuf protocols. You can even fire up an Erlang shell and work with a node directly because it is an OTP application. Riak really shines in distributed deployments since it is fully decentralised and uses leaderless replication. Membership and state are replicated across the cluster with gossip protocols. Because Riak is leaderless, a client can connect to any node and perform operations. Depending on the request type, a node may interact with other nodes to execute the request. Regarding CAP, Riak is usually called an AP system, but I’ll talk more about consistency guarantees later.

Data is partitioned between nodes with consistent hashing on the ring. The ring of ordered hashes represents the partitioning of the keyspace into multiple contiguous partitions identified by start and end keys. Each partition on the ring is handled by a vnode, and each Riak node is mapped to multiple vnodes. To fetch or store an object in the cluster, Riak first hashes the key, finds a small sequence of hashes on the ring (its length depends on availability requirements), finds the corresponding vnodes, maps them onto physical nodes, performs the reads/writes and waits for a read/write quorum before returning an answer to the client. The point of consistent hashing is faster rehashing when a node joins or leaves the cluster, because only a small part of the ring is affected instead of the whole ring. If a node leaves the cluster temporarily (e.g., network problems), rehashing is still unnecessary because this situation can be handled by hinted handoff. If a failure is permanent, differences in keys between nodes can be found using anti-entropy procedures.
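
To make this concrete, here is a minimal sketch of the lookup in Python. It is illustrative only: Riak’s actual ring uses fixed equal-size partitions claimed by nodes, but the clockwise-walk idea is the same.

import hashlib
from bisect import bisect_right

class Ring:
    """Toy consistent-hashing ring; illustrative, not Riak's implementation."""

    def __init__(self, nodes, vnodes_per_node=8):
        # Each physical node claims several points (vnodes) on the ring
        self.points = sorted(
            (self._hash(f"{node}-{i}"), node)
            for node in nodes
            for i in range(vnodes_per_node)
        )
        self.hashes = [h for h, _ in self.points]

    @staticmethod
    def _hash(value):
        return int.from_bytes(hashlib.sha1(value.encode()).digest()[:8], "big")

    def preference_list(self, key, n=3):
        # Walk clockwise from the key's hash, picking n distinct physical nodes
        start = bisect_right(self.hashes, self._hash(key))
        chosen = []
        for i in range(len(self.points)):
            node = self.points[(start + i) % len(self.points)][1]
            if node not in chosen:
                chosen.append(node)
            if len(chosen) == n:
                break
        return chosen

ring = Ring(["dev1", "dev2", "dev3"])
print(ring.preference_list("favourite"))  # e.g. ['dev2', 'dev3', 'dev1']

Adding or removing a node only remaps the keys adjacent to its points on the ring, which is exactly why rehashing stays cheap.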

Many concepts from Dynamo are directly implemented in Riak without major changes. There is a good page in the documentation about how closely these concepts map to Riak’s.

Client interfaces

Riak is widely known for its good interface, and it thoroughly follows REST principles. The Richardson Maturity Model describes the adoption of REST concepts at Level 0 (HTTP transport), Level 1 (resources) and Level 2 (HTTP verbs); Riak sits at the highest Level 3 (hypermedia controls). It supports walking across linked keys using standard Link headers. Multiple values are served with standard HTTP multipart/mixed responses. Different types of media are handled with MIME types.

I’ve seen such a powerful HTTP API in HashiCorp products too, for example, in Consul. The ability to have a full-featured HTTP API to perform KV operations, resolve conflicts and even perform leader election without needing to write heavyweight Java clients (I’m looking at you, ZooKeeper) is fascinating.

Another available protocol is Protobuf. Curiously, it is not used over gRPC but over a very simple TCP transport.
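
How simple? As I understand the protocol docs, the framing is just a big-endian 4-byte length, a one-byte message code and an optional protobuf payload; a ping needs no payload at all. A hypothetical raw-socket ping in Python (message codes and framing as described above are my reading of the docs; the dev node’s Protobuf port 10017 is assumed):

import socket
import struct

# Assumed Riak PB framing: 4-byte big-endian length (covering the code byte),
# then a 1-byte message code. RpbPingReq (code 1) carries no payload.
RPB_PING_REQ, RPB_PING_RESP = 1, 2

with socket.create_connection(("127.0.0.1", 10017)) as sock:
    sock.sendall(struct.pack(">IB", 1, RPB_PING_REQ))
    # A real client would loop until all 5 bytes are read
    length, code = struct.unpack(">IB", sock.recv(5))
    assert code == RPB_PING_RESP
    print("pong")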

Of course, many client libraries and drivers are written for Riak to interact with the store. For example, here is some Ruby code to create and put a value:

require 'riak'

# Connect to a local node via its Protobuf port
client = Riak::Client.new(:pb_port => 10017)
bucket = client.bucket("test")

# Store a JSON value under the key 'mykey'
obj = bucket.new('mykey')
obj.data = { 'answer' => 42 }
obj.store

# Read it back and check the round-trip
val = bucket.get('mykey')
fail unless val.data['answer'] == 42

And an excerpt of querying the key over HTTP:

http http://localhost:10018/buckets/test/keys/mykey
HTTP/1.1 200 OK
Content-Type: application/json; charset=UTF-8
Link: </buckets/test>; rel="up"
X-Riak-Vclock: a85hYGBgzGDKBVI8R4M2cgds+X+VgUFdMIMpkTGPlSFaZ8tdviwA
{
    "answer": 42
}

The client libraries lack maintenance too and show their age. For example, the Python client cannot be used with Python newer than 3.7.

Tunable consistency

It’s well known that Dynamo-style databases are eventually consistent. Usually, they are put into the “AP” class, but Kleppmann disagrees and insists on just “P”. Let’s check why.

To handle conflicts, Riak provides a variety of options. I will describe four of them.

The simplest conflict-resolution strategy is “last write wins” (Cassandra uses it too), based on the latest timestamp. The client does not need to address conflicts.

The recommended option is to use multi-version values. For this option, Riak uses dotted version vectors. This structure is an improvement over vector clocks that can handle many clients with conflicting writes. Two conflicting writes create siblings: different values identified by their vector clock values. So you need to handle siblings in the client application.

Let’s create and activate a bucket type that allows multiple versions.

riak-admin bucket-type create vv '{"props":{"allow_mult":true}}'
riak-admin bucket-type activate vv

Create a conflict with two writes that carry no causal context:

http PUT http://localhost:10018/types/vv/buckets/cats/keys/favourite name=Puffy
http PUT http://localhost:10018/types/vv/buckets/cats/keys/favourite name=Pirate

When we try to get the value, Riak returns both siblings:

http http://localhost:10018/types/vv/buckets/cats/keys/favourite
HTTP/1.1 300 Multiple Choices
...
Siblings:
4IgpOUc3rR7wPOKbgJTGfJ
5UX8mBVbx3Y9Vno6ncicuO

To resolve the conflict, the next update must include a context (the X-Riak-Vclock value from a previous response). If no context is given, Riak cannot reconstruct the lineage.

http PUT http://localhost:10018/types/vv/buckets/cats/keys/favourite \
    name=Pirate X-Riak-Vclock:"vclock-value-from-response"

Libraries encapsulate sibling handling with a fetch-modify-write loop. For example, the equivalent code in Ruby:

bucket = client.bucket('cats')
# read from Riak
obj = bucket.get_or_new('favourite', type: 'vv')
# deal with siblings
fail if obj.conflict?
# update object locally
obj.content_type = 'application/json'
obj.data = {"name"=>"Pirate"}
# write to Riak
obj.store

So to write a key, you always need to read it first, thereby obtaining a causal context which is included in the write operation.

The third option is a strong consistency mode, specified at the per-bucket level. In this mode, Riak guarantees that no siblings are returned during read operations. Since the causal context is used for write operations, no siblings will be produced either. Client code is free from handling conflicts (the fail if obj.conflict? line can be dropped).

The fourth option appeared in 2014 in version 2.0 with the introduction of CRDTs (conflict-free replicated data types) like flags, registers, counters, sets, and maps. It is a rare and complex feature for databases. CRDTs allow the developer to ignore all the eventual-consistency quirks, vector clocks and sibling handling because conflict resolution is fused into the data types themselves, not the operations. Of course, data modelling with CRDTs is more challenging than just dropping blobs into the KV store. Full-fledged sequence CRDTs were left out of scope, too.
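
To see why this works, here is an illustrative grow-only counter in Python (a toy sketch, not Riak’s implementation): each replica increments only its own slot, and a merge takes an element-wise maximum, so any two divergent states converge to the same value without siblings.

class GCounter:
    """Toy grow-only counter CRDT; illustrative, not Riak's data type."""

    def __init__(self):
        self.slots = {}  # replica id -> its local count

    def increment(self, replica, n=1):
        self.slots[replica] = self.slots.get(replica, 0) + n

    def value(self):
        return sum(self.slots.values())

    def merge(self, other):
        # Element-wise max is commutative, associative and idempotent,
        # so any order of merges yields the same state
        for replica, count in other.slots.items():
            self.slots[replica] = max(self.slots.get(replica, 0), count)

# Two replicas accept writes concurrently...
a, b = GCounter(), GCounter()
a.increment("dev1"); a.increment("dev1")
b.increment("dev2")
# ...and converge after merging, with no conflicts to resolve
a.merge(b)
assert a.value() == 3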

With this broad choice of consistency modes and options, it’s hard to call Riak a “simply AP” system.

MapReduce

As with every good SQL and NoSQL database, Riak provides server-side computational primitives.

In past years, it was possible to run MapReduce queries with JavaScript functions registered with Riak via the HTTP API:

echo 'function(v) {
var parsed_data = JSON.parse(v.values[0].data); var data = {};
data[parsed_data.style] = parsed_data.capacity; return [data];
}' | http PUT http://localhost:10018/riak/my_functions/map_capacity

And then call the MapReduce with some inputs:

echo '{
  "inputs":[["rooms","101"],["rooms","102"],["rooms","103"]],
  "query":[
    {"map":{
      "language":"javascript",
      "bucket":"my_functions",
      "key":"map_capacity"
    }}
  ]
}' | http POST http://localhost:10018/mapred

The function can be inlined in this mapred call. Unfortunately, the JavaScript interface is deprecated and doesn’t work reliably anymore.

Instead, you should write your functions in Erlang, which can be a little tricky. The official documentation on MapReduce is very good but still lacks a complete example. The general approach is similar to Hadoop and HBase ecosystem techniques: you build artifacts (JARs there, BEAM files here), push them to the cluster, register them with the runtime and then execute. Let me show how to do it.

Colouring all the cats

Let’s analyse cats’ colours by their names:

White Stripes
Black Ash
Grey Hound
Small White Dot
Orange Pirate
Snow White

As we know from the internet, the number of cats is enormous. We should do this in a massively parallel fashion!

First, fire up the cluster; the official documentation describes the process well enough. Let the nodes live locally at $CLUSTERDIR/{dev1,dev2,dev3}, and let the Riak libraries needed to build our extension live at $SDKDIR.

To check the cluster, run riak-admin member-status for the cluster membership and riak-admin ring-status for the state of the ring.

riak-admin member-status
================================= Membership ========
Status     Ring    Pending    Node
-----------------------------------------------------
valid      34.4%      --      dev1@127.0.0.1
valid      32.8%      --      dev2@127.0.0.1
valid      32.8%      --      dev3@127.0.0.1
-----------------------------------------------------
Valid:3 / Leaving:0 / Exiting:0 / Joining:0 / Down:0
ok

The interesting thing is that each vnode is implemented as an Erlang process, specifically a finite state machine with the gen_fsm behaviour. For example, my three-node cluster has 64 partitions and 64 vnodes, so each physical node gets 21 or 22 vnodes (approximately 33% of all partitions). More than that, many per-node operations (storage maintenance, handoff) are also implemented as processes. Since Erlang processes are really lightweight, running such an amount of them is not a problem.

% riak-admin ring-status
================================== Claimant ========
Claimant:  'dev1@127.0.0.1'
Status:     up
Ring Ready: true

============================== Ownership Handoff ===
No pending changes.

============================== Unreachable Nodes ===
All nodes are up and reachable

ok

Write some code in colorer.erl for loading cats into the database (function load/1), for the map phase (colormap/3) and for the reduce phase (coloreduce/2).

-module(colorer).

-export([load/1]).
-export([colormap/3]).
-export([coloreduce/2]).
-export([get_color/1]).

%% Determine a cat color from its name
get_color(Name) ->
    NameTokens = string:tokens(Name, " "),
    case lists:member("White", NameTokens) of
        true ->
            "White";
        false ->
            case lists:member("Black", NameTokens) of
                true ->
                    "Black";
                false ->
                    "Mixed"
            end
    end.

%% map phase
colormap(Value, _Keydata, _Arg) ->
    ObjKey = riak_object:key(Value),
    KeyStr = binary_to_list(ObjKey),
    Color = get_color(KeyStr),
    error_logger:info_msg("colorer mapped key ~p to ~p", [KeyStr, Color]),
    [{Color, 1}].

%% reduce phase
coloreduce(List, _Arg) ->
    error_logger:info_msg("colorer reduce list ~p", [List]),
    ListOfDicts = [dict:from_list([I]) || I <- List],
    Merged =
        lists:foldl(fun(Item, Acc) -> dict:merge(fun(_, X, Y) -> X + Y end, Item, Acc) end,
                    dict:new(),
                    ListOfDicts),
    % convert dict to list
    dict:to_list(Merged).

%% End of list
insert(_Client, []) ->
    ok;
%% Put a cat from head of list to the Riak
insert(Client, [Head | Tail]) ->
    % All cats are adorable, so it goes to the Riak value as a JSON
    Body = list_to_binary(mochijson2:encode(#{<<"adorable">> => true})),
    % Riak key is a cat name
    Key = Head,
    RawObj = riakc_obj:new(<<"cats">>, Key, Body),
    Obj = riakc_obj:update_content_type(RawObj, "application/json"),
    case riakc_pb_socket:put(Client, Obj) of
        ok ->
            io:format("Inserted key ~p~n", [Head]);
        {error, Error} ->
            io:format("Error inserting key ~p", [Error])
    end,

    insert(Client, Tail).

%% Load cats from text file
load(Filename) ->
    {ok, Client} = riakc_pb_socket:start_link("127.0.0.1", 10017),
    {ok, Data} = file:read_file(Filename),
    Lines = re:split(Data, "\r?\n", [{return, binary}, trim]),
    insert(Client, Lines).

Put cats in a bucket


First things first, load all our cats into Riak.

Compile the file with erlc colorer.erl. It produces a BEAM file colorer.beam in the current directory. Then run the loader:

erl -pa $SDKDIR/lib/riakc/ebin \
  $SDKDIR/lib/riak_pb/ebin $SDKDIR/lib/mochiweb/ebin \
  -noshell -eval 'colorer:load("cats.txt"), halt().'

Since we connect to the Riak cluster from the load/1 function via the Protobuf protocol, we can start a standalone virtual machine rather than the one Riak uses.

Now check the bucket cats:

http http://localhost:10018/buckets/cats/keys keys==true
HTTP/1.1 200 OK
Content-Type: application/json
Server: MochiWeb/3.0.0 WebMachine/1.11.1 (greased slide to failure)
Vary: Accept-Encoding

{
    "keys": [
        "Snow White",
        "White Stripes",
        "Grey Hound",
        "Small White Dot",
        "Black Ash",
        "Orange Pirate"
    ]
}

Check a random cat:

http http://localhost:10018/buckets/cats/keys/Snow%20White
HTTP/1.1 200 OK
Link: </buckets/cats>; rel="up"
Server: MochiWeb/3.0.0 WebMachine/1.11.1 (greased slide to failure)
Vary: Accept-Encoding
X-Riak-Vclock: a85hYGBgzGDKBVI8R4M2ckekvudgYIyfmMGUyJnHyjDl29a7fFkA

{
    "adorable": true
}

Great, all our cats are in a bucket.

Running MapReduce job

All Riak nodes must be able to load the compiled colorer.beam. Add its directory to riak/etc/advanced.config for each node and restart the cluster:

[
 {riak_kv, [
   {add_paths, ["YOURDIR"]}
 ]},

 {riak_core,
  ...
 }
].

Each time you recompile the file, the cluster must reload the BEAM file. Since Erlang supports hot code reloading, this can be achieved without restarting the cluster!

erlc colorer.erl && for N in 1 2 3 ; do ${CLUSTERDIR}/dev$N/riak/bin/riak-admin erl-reload ; done

Now we can ask Riak to launch a job just as in the JavaScript example.

echo '{"inputs":"cats", "query":[
  {"map":{"language":"erlang","module":"colorer","function":"colormap"}},
  {"reduce":{"language":"erlang","module":"colorer","function":"coloreduce"}}]}' | http POST localhost:10018/mapred

And we get the answer:

{
    "Black": 1,
    "Mixed": 2,
    "White": 3
}

So we have just run a distributed colorer job on our cluster!

Let’s explore what is going on inside. You can actually test the map and reduce functions from the REPL with manually specified inputs, but for actual remote invocations let’s log the inputs:

error_logger:info_msg("colorer mapped key ~p to ~p", [KeyStr, Color]).

The output can be seen in $CLUSTER_DIR/dev*/riak/log/console.log: colorer mapped key "Black Ash" to "Black". So the mapper produces a list of tuples: for each cat we emit a colour name and an initial count of one. The value is not analysed for simplicity, but it can be easily extracted with riak_object:get_value(Value) and converted to a dict using mochijson2.

Then all the outputs of the map phase are passed to the reduce phase (remember, we specified the function and module name in the HTTP invocation). The coloreduce/2 function gets the list of all tuples: colorer reduce list [{"Black",1},{"Mixed",1},{"White",1},{"White",1},{"White",1},{"Mixed",1}]. Then it converts them to dicts and folds them into a resulting dict:

[{"Black",1},{"Mixed",2},{"White",3}]

So the function returns a list of key-value pairs, which Riak renders as the JSON MapReduce output. Outputs and inputs between stages are still binary-serialized Erlang terms.
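
For readers less fluent in Erlang, the reduce phase is just a word-count-style fold. A rough Python equivalent of coloreduce/2:

from collections import Counter

def coloreduce(pairs):
    """Sum per-colour counts, like the dict:merge/foldl in colorer.erl."""
    merged = Counter()
    for colour, count in pairs:
        merged[colour] += count
    return list(merged.items())

pairs = [("Black", 1), ("Mixed", 1), ("White", 1),
         ("White", 1), ("White", 1), ("Mixed", 1)]
print(coloreduce(pairs))  # [('Black', 1), ('Mixed', 2), ('White', 3)]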

It isn’t difficult at all.

Bitcask

Riak has a pluggable backend architecture. The default storage engine is Bitcask. Another widely used backend is a customised version of LevelDB, an LSM-based (log-structured merge-tree) key-value store. The main reason to prefer LevelDB over Bitcask is having so many keys that they cannot fit into the node’s memory, because Bitcask keeps the entire keyspace in memory.

A lot of literature is dedicated to LSM engines, so I would like to describe the Bitcask architecture a bit (pun intended).

This famous storage engine is described in a really short paper that is worth reading. Bitcask is a key-value store. The original design covers the on-disk format, data flow and access protocols, and the Erlang application. Some parts may remind you of an LSM tree, but it is a much simpler engine.

A database node handles a directory of data files exclusively. The directory contains multiple data files, and the most recent one is called the active file. A node can only append to the active file; all other files are read-only. So all writes are sequential and therefore blazing fast. When the active file grows over a size limit, it is closed and a new active file is created.

The structure of a key/value entry is simple. It is a variable-sized record with the sizes of the key and value specified in a header. The whole record is checksummed.

0      4           8           12           16
+------+-----------+-----------+------------+--------+--------+
| CRC  | TIMESTAMP | KEY_SIZE  | VALUE_SIZE |  KEY   | VALUE  |
+------+-----------+-----------+------------+--------+--------+
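
Such a record is trivial to encode and decode, for example with Python’s struct module. A sketch assuming big-endian 32-bit fields (the real on-disk layout may differ in details):

import struct
import time
import zlib

HEADER = ">IIII"  # crc, timestamp, key_size, value_size (big-endian 32-bit)

def encode_entry(key: bytes, value: bytes) -> bytes:
    # CRC covers everything after itself: header tail, key and value
    body = struct.pack(">III", int(time.time()), len(key), len(value)) + key + value
    return struct.pack(">I", zlib.crc32(body)) + body

def decode_entry(blob: bytes):
    crc, tstamp, ksz, vsz = struct.unpack_from(HEADER, blob)
    assert crc == zlib.crc32(blob[4:16 + ksz + vsz]), "corrupted entry"
    key = blob[16:16 + ksz]
    value = blob[16 + ksz:16 + ksz + vsz]
    return tstamp, key, value

entry = encode_entry(b"Snow White", b'{"adorable": true}')
print(decode_entry(entry))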

Bitcask requires maintaining an in-memory structure called the “keydir”. It stores a mapping from each key to its location information: file id, offset of the corresponding entry in that file, value size, and the most recent timestamp.

Reads are simple. The node checks the keydir in constant time, then opens the file (if not already open) where the record is stored and seeks to the specified offset. Since the size information is known, the node reads exactly value_size bytes containing the value. The total cost of a read operation is a single random disk seek.

To perform a write, the node just appends a new entry to the active file and updates the keydir. If the key is already present, its previous appearances in the data files become orphaned (nothing points to them anymore). The cost of a write operation is a sequential write to disk without any seeks.

To prune orphaned records, Bitcask uses merging, which is much simpler than LSM compactions. Bitcask knows all the keys because they are stored in the keydir. It iterates over all files in the directory and takes only the latest entry for each key. Then a new file is written containing only those latest entries, and the old files can be removed. Merge completed.
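
Putting it all together, here is a toy Bitcask in Python: an append-only file plus an in-memory keydir, with merge as described above. It is a sketch of the data flow that deliberately ignores CRCs, timestamps, file rotation and concurrency.

import os
import struct

class ToyBitcask:
    """Append-only store with an in-memory keydir; a sketch, not production code."""

    HEADER = ">II"  # key_size, value_size (crc and timestamp omitted for brevity)

    def __init__(self, path):
        self.active = open(path, "ab+")
        self.keydir = {}  # key -> (value offset, value size)

    def put(self, key: bytes, value: bytes):
        # Sequential append: the only kind of write the engine ever does
        offset = self.active.seek(0, os.SEEK_END)
        self.active.write(struct.pack(self.HEADER, len(key), len(value)) + key + value)
        self.active.flush()
        # The keydir always points at the newest entry; older ones become orphaned
        self.keydir[key] = (offset + 8 + len(key), len(value))

    def get(self, key: bytes) -> bytes:
        offset, size = self.keydir[key]  # constant-time lookup
        self.active.seek(offset)         # a single random seek
        return self.active.read(size)

    def merge(self, merged_path):
        # Rewrite only the latest entry of every key into a fresh file
        merged = ToyBitcask(merged_path)
        for key in list(self.keydir):
            merged.put(key, self.get(key))
        return merged

db = ToyBitcask("/tmp/cats.cask")
db.put(b"favourite", b"Puffy")
db.put(b"favourite", b"Pirate")   # orphans the first entry
print(db.get(b"favourite"))       # b'Pirate'
compacted = db.merge("/tmp/cats-merged.cask")
print(compacted.get(b"favourite"))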

The keydir is both the major advantage and the major disadvantage of Bitcask: it must store all the keys. If the keyspace is larger than the available memory, there is no way to store keys in a hybrid manner (partly in memory, partly on disk), and an LSM-based engine should be preferred. Also, when a node starts, it must populate the keydir with all keys from all files (not only the active file); this can be optimised by using hint files.

Another interesting thing about Bitcask is the lack of a commit log or WAL: the data files are the log itself. Concurrency is not described in the paper and is addressed only in the implementation.

My implementation

Speaking of implementations, I stumbled upon a really fun project, CaskDB. It is a skeleton for designing your own Bitcask store in Python, with a lot of tests already written.

I have written an extremely simple storage engine; concurrency and a network protocol are out of scope. Surprisingly, coding Bitcask in modern Python over a few evenings was a really fun project. If by any chance you are interested, it is on my GitHub. I made a few additions compared to the original task, primarily in tests. I used property-based testing with Hypothesis, which helped to pinpoint hard-to-find bugs; I think it is an essential technique for data engines. Other useful techniques are fuzzing and secure coding with invariants. Of course, a whole class of problems can be avoided by using safer languages with static typing like Rust, but that will be another project. Stay tuned!

Outcome

Implementing the Bitcask storage engine helps you understand why the engine works, where it shines and what the downsides are. Definitely worth your Netflix weekend :)

Riak is a little outdated, and fifteen years seems like an eternity in the database world. But it is a pure elixir (pun intended) of design approaches: leaderless replication, tunable consistency, causal contexts, CRDTs, efficient storage engines and other important concepts. It is also a beautiful example of a well-architected OTP application. Long live Riak!