Thursday, November 7, 2019

Revisiting Kudu

I read the Kudu paper many years ago. It is worth reading but I never published my review so I re-read the paper this week. My review is limited to the paper and I ignore improvements since then. My focus is on storage so I also ignore most of the distributed system features.

I no longer have the review I wrote when I first read the paper years ago. I might have considered Kudu to be an LSM variant. Now I consider it to be an example of an index+log index structure.

Kudu is a scale-out index structure for analytics engines like Impala. The goals for Kudu are fast columnar scans, low latency updates and low performance variance. I assume that Kudu satisfied those goals. The features include:
  • Data is stored in tables and a table has a fixed schema. I am curious about the demand for flexible schemas.
  • It is primary key only. It uses Raft to make sure replicas stay in sync. If global secondary indexes were supported then something like XA across shards would also be needed. That is much harder, but feasible -- see Spanner, CockroachDB and Yugabyte.
  • Supports add/drop column but PK columns can't be dropped. I am not sure whether the PK can be changed.
  • Supports insert, update, delete and range query. It might consider a point query to be a simple range scan. Update and delete must fully specify the PK values for the rows to be changed.
  • Supports range and hash partitioning, but that section of the paper confused me.
  • Supports single-row transactions. I assume a batch can be submitted but I am not sure how per-row outcomes are reported to a client.
  • Compaction uses cost-based decisions to merge data where the objective function is to improve read efficiency. I am a big fan of that.

Index+log

I think this is an example of an index+log index structure where the DiskRowSets are log segments. GC is clever and there are two parts to it. First, there is compaction between DeltaMemStore and DiskRowSet. This removes deleted rows and merges long update chains. Second, there is compaction between DiskRowSets. This reduces the number of places that must be checked for a given key range.

In the standard index+log approach only the first type of compaction is done for a log segment. That usually requires all of the index to be in memory because each row from a log segment needs an index probe to determine whether the row is live. For Kudu a search of the DeltaFile determines liveness. It is not clear to me whether DeltaMemFiles are clustered per DiskRowSet to reduce the amount of data that should be in memory when such compaction is done.
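The liveness check in the standard index+log approach can be sketched as follows. This is a generic illustration, not Kudu code: the index layout (key mapped to a segment id and offset) and the function name gc_segment are assumptions made for the example. It shows why the whole index wants to be in memory, since every row in the segment costs one index probe.

```python
def gc_segment(index, segment_id, segment):
    """Return the live rows of one log segment.

    index:   dict mapping key -> (segment_id, offset) of the newest version
    segment: list of (key, value) pairs in the order they were appended
    A row is live only if the index still points at this exact location.
    """
    live = []
    for offset, (key, value) in enumerate(segment):
        # One index probe per row: this is the expensive part of GC.
        if index.get(key) == (segment_id, offset):
            live.append((key, value))
    return live


# Key "b" was overwritten later in segment 1, so only "a" is live in segment 0.
index = {"a": (0, 0), "b": (1, 0)}
segment_0 = [("a", 1), ("b", 2)]
live_rows = gc_segment(index, 0, segment_0)
```

With Kudu the equivalent question is answered by searching deltas rather than probing a global index.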

I briefly cover the second type of compaction at the end of my blog post. The benefit from merging log segments with index+log is larger sorted runs. Write-optimized index structures impose a CPU and IO read efficiency penalty. Merging log segments like this reduces that penalty.

Storage

Tables are horizontally partitioned into tablets and each tablet consists of a MemRowSet, many DiskRowSets, a DeltaMemStore and many DeltaFiles. Inserts are written into a MemRowSet. When full, a MemRowSet is flushed to disk, creating a DiskRowSet. A DiskRowSet is limited to ~32MB. Only inserts are directly written into DiskRowSets. Because there is a PK check on insert a row will exist in at most one DiskRowSet.

Deletes and updates are first written into a DeltaMemStore and when full that is flushed to disk to create a DeltaFile. Only inserts are directly written to a DiskRowSet, but eventually a DeltaMemStore will be merged with a DiskRowSet, so the effects of updates and deletes eventually reach a DiskRowSet.
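A toy model of this write path, as I understand it from the paper, is sketched below. Everything here is a simplification and an assumption: the class name ToyTablet, the row-count flush threshold (Kudu uses sizes, not row counts), and keeping one tablet-wide list of DeltaFiles. Deletes and compaction are omitted to keep it short.

```python
class ToyTablet:
    """Toy write path: inserts go to the MemRowSet, updates go to the DeltaMemStore."""

    def __init__(self, flush_threshold=2):
        self.flush_threshold = flush_threshold  # hypothetical, counted in rows
        self.mem_row_set = {}       # pk -> row for recent inserts
        self.disk_row_sets = []     # flushed base data, never updated in place
        self.delta_mem_store = []   # recent (pk, changes) pairs
        self.delta_files = []       # flushed delta lists

    def _base_row(self, pk):
        if pk in self.mem_row_set:
            return self.mem_row_set[pk]
        for disk_row_set in self.disk_row_sets:
            if pk in disk_row_set:
                return disk_row_set[pk]
        return None

    def insert(self, pk, row):
        # The PK check on insert is why a row lives in at most one DiskRowSet.
        if self._base_row(pk) is not None:
            raise KeyError(f"duplicate primary key: {pk!r}")
        self.mem_row_set[pk] = dict(row)
        if len(self.mem_row_set) >= self.flush_threshold:
            self.disk_row_sets.append(dict(sorted(self.mem_row_set.items())))
            self.mem_row_set = {}

    def update(self, pk, changes):
        if self._base_row(pk) is None:
            raise KeyError(f"no such primary key: {pk!r}")
        self.delta_mem_store.append((pk, dict(changes)))
        if len(self.delta_mem_store) >= self.flush_threshold:
            self.delta_files.append(self.delta_mem_store)
            self.delta_mem_store = []

    def get(self, pk):
        base = self._base_row(pk)
        if base is None:
            return None
        row = dict(base)
        # Apply deltas oldest first: flushed DeltaFiles, then the DeltaMemStore.
        for deltas in self.delta_files + [self.delta_mem_store]:
            for delta_pk, changes in deltas:
                if delta_pk == pk:
                    row.update(changes)
        return row


tablet = ToyTablet(flush_threshold=2)
tablet.insert(1, {"v": "a"})
tablet.insert(2, {"v": "b"})      # second insert fills and flushes the MemRowSet
tablet.update(1, {"v": "z"})      # the update lands in the DeltaMemStore
```

In this sketch a read has to merge base data with every delta for the row, which is the cost that merging deltas into a DiskRowSet later removes.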

Storage is columnar for the rows in a DiskRowSet. Encoding takes advantage of data types to use less space and block compression is optional. This output is written to a sequence of pages for each column. Then there is a B-Tree index that maps ID to page where ID is the ordinal offset of the row within the DiskRowSet (when a DiskRowSet has 1M rows then the ID is from 1 to 1M, assuming it starts at 1). I am not sure whether that index has an entry per page or per row.
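A minimal sketch of the ID-to-page lookup, assuming one index entry per page that records the ordinal of the first row on that page. The sorted list plus binary search stands in for the B-Tree, the ordinals start at 0 rather than 1, and the function names are hypothetical.

```python
from bisect import bisect_right


def build_pages(values, rows_per_page):
    """Split one column into pages and record the first row ordinal of each page."""
    starts = list(range(0, len(values), rows_per_page))
    pages = [values[start:start + rows_per_page] for start in starts]
    return pages, starts


def read_by_ordinal(pages, first_ordinals, row_id):
    """Find the page holding row_id, then seek to the row within that page."""
    if row_id < 0 or not pages or row_id >= first_ordinals[-1] + len(pages[-1]):
        raise IndexError(row_id)
    page_no = bisect_right(first_ordinals, row_id) - 1
    return pages[page_no][row_id - first_ordinals[page_no]]


pages, first_ordinals = build_pages(list("abcdefg"), rows_per_page=3)
value = read_by_ordinal(pages, first_ordinals, 4)
```

With real encodings the second step is not a simple array offset, because the position of a row within a page depends on how the page is encoded.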

There is also a PK index column that has the encoded PK values (encoded because the key is a string even when the PK has more than one column). It wasn't clear to me whether there was an index built on that or if binary search was used. I assume the value in this case is the ID (ordinal offset) for the row. A paged bloom filter is created for the PK index column.
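The purpose of the bloom filter can be sketched as below: a filter per DiskRowSet lets a PK lookup skip rowsets that definitely do not contain the key. This is a plain (not paged) bloom filter, and the class name, sizes and hash choice are all assumptions for illustration rather than what Kudu does.

```python
import hashlib


class ToyBloomFilter:
    """Simple bloom filter over encoded PK bytes; no false negatives."""

    def __init__(self, num_bits=1024, num_hashes=3):
        self.num_bits = num_bits
        self.num_hashes = num_hashes
        self.bits = bytearray((num_bits + 7) // 8)

    def _positions(self, key):
        # Derive several hash functions by salting one hash differently.
        for i in range(self.num_hashes):
            digest = hashlib.blake2b(key, digest_size=8, salt=bytes([i])).digest()
            yield int.from_bytes(digest, "big") % self.num_bits

    def add(self, key):
        for pos in self._positions(key):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, key):
        return all(self.bits[pos // 8] & (1 << (pos % 8))
                   for pos in self._positions(key))


def row_sets_to_probe(filters, encoded_pk):
    """Return positions of the DiskRowSets whose filter cannot rule out the key."""
    return [i for i, f in enumerate(filters) if f.might_contain(encoded_pk)]


empty_filter = ToyBloomFilter()
filter_with_key = ToyBloomFilter()
filter_with_key.add(b"user42")
candidates = row_sets_to_probe([empty_filter, filter_with_key], b"user42")
```

A filter can return a false positive, so a hit still requires a search of the PK index column.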

Range scans will start by searching the PK index column to determine the row IDs that must be retrieved and then for each column that must be returned search the per-column indexes using those IDs.
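That two-step scan can be sketched for a single DiskRowSet as follows. The assumptions are that the PK index column is a sorted list of encoded keys, that row IDs are 0-based positions in that list, and that each column can be read by row ID. Deltas are ignored and the function name is hypothetical.

```python
from bisect import bisect_left


def scan_disk_row_set(pk_column, columns, lo, hi, wanted):
    """Return the wanted columns for rows with lo <= pk < hi.

    pk_column: sorted list of encoded PK values, position == row ID
    columns:   dict mapping column name -> list of values indexed by row ID
    """
    # Step 1: search the PK index column to find the row ID range.
    start = bisect_left(pk_column, lo)
    stop = bisect_left(pk_column, hi)
    # Step 2: for each requested column, fetch values by row ID.
    return [tuple(columns[name][row_id] for name in wanted)
            for row_id in range(start, stop)]


pk_column = ["a", "c", "e", "g"]
columns = {"x": [1, 2, 3, 4], "y": [10, 20, 30, 40]}
rows = scan_disk_row_set(pk_column, columns, "b", "f", ["y"])
```

Only the requested columns are touched, which is where the columnar layout pays off for analytics scans.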

3 comments:

  1. It was nice to meet you at HPTS!

    To answer some of your questions:
    - In Kudu, primary keys are immutable. This simplifies a number of things, not least of which is major delta compaction (compacting DeltaFiles and a DeltaMemStore into a DiskRowSet's base data): it can be restricted to only those columns whose values have actually changed. If PKs were mutable, MDC would need to rewrite all columns if the PKs had in fact been changed.
    - Per-row errors are collected server-side and reported back to the client via the error collector API.
    - Not exactly sure what you mean when you ask "It is not clear to me whether DeltaMemFiles are clustered per DiskRowSet to reduce the amount of data that should be in memory when such compaction is done." A major delta compaction takes as input a DeltaMemStore, N REDO DeltaFiles, the base data, and M historical UNDO DeltaFiles, and will produce an (empty) DeltaMemStore, 1 REDO DeltaFile (if there are any deleted rows), new base data, and 1 UNDO DeltaFile (containing all other mutations). By that measure, memory usage has been reduced as much as it could possibly be.
    - The ordinal index found in each column file in a DiskRowSet contains one entry per page, not one per row. When seeking we first use the index to seek to the appropriate page, then consult the page decoder to seek to the appropriate row (as the specific encoding affects the placement of the row within the page).
    - There's a value index in the encoded PK "column" file. This enables accelerated seeking based on encoded PK values (rather than just row ordinals).

    1. Thank you. By clustering of the DeltaFiles, I was asking whether a given DeltaFile has data for one DiskRowSet or for more than one.

  2. Gotcha. No, each DeltaFile stores deltas for rows belonging to just one DiskRowSet, and that optimizes memory usage during compaction in the manner you described.