Monday, March 16, 2020

Insert Benchmark v3

I expect to replace the insert benchmark later this year. The insert benchmark I have been using might be ibench v2 as ibench v1 came from Tokutek. So the replacement I write would be ibench v3.

The reasons for a replacement include:
  • Switch from Python to something more performant (Java or Go) as it is difficult to sustain high QPS rates (100k / second or more) with Python and a Java/Go client will consume less CPU. I assume the DBMS I care about all have good golang clients (MongoDB, MySQL, Postgres).
  • Make the benchmark less synthetic. Early ibench was insert-only for a table with a PK and 3 secondary indexes. Eventually I added support for short range queries. It has always used uniform distribution. The plan for ibench v3 is to model a monitoring workload -- many inserts, some updates, many queries. It will use a PK and 1 or 2 secondary indexes.
  • Continue to support multiple DBMS -- MongoDB, MySQL and Postgres.
  • As a bonus, don't ignore coordinated omission
I still need to confirm whether this is worth implementing or whether I have a case of NIH and should be using TSBS. I browsed TSBS and the indexes I describe here might match the indexes that TSBS would use. However, TSBS has more query diversity. My focus is on the storage engine and a smaller set of queries is sufficient. Regardless, I need to spend more time reading about TSBS.

Logical Schema

The logical schema describes the data in one insert operation. The physical schema describes how that is stored in the DBMS. I focus on the logical schema here.

The logical schema is: timestamp, deviceID, [metricID, metricValue]+
  • This is the data from one insert for a given device. It has data for 1+ metrics.
  • timestamp is a 64-bit integer that will be mostly increasing on insert. Clocks are not synchronized across clocks, some devices will have bad/stuck clocks and some devices will have clock drift.
  • deviceID is a 64-bit integer. The number of devices will be used to scale the workload. There can be millions of devices for large scale factors.
  • Each (metricID, metricValue) pair represents the value for a metricID from a given device at a given point in time. metricID is a 32-bit integer. The number of metrics will also be used to scale the workload. There will be fewer metrics than devices. metricValue is a 64-bit integer

Workload

Fields are abbreviated when describing the workload: t=timestamp, d=deviceID, m=metricID, v=metricValue. Operations are described in SQL, except for insert.

The workload is a combination of:
  • GetOne: select v, t from C where m=M and d=D and t between T1 and T2 order by t
    • Query variants are: t >= T1 order by t limit N, t <= T1 order by t desc limit N
  • GetAll: select v, t from C where d=D and t between T1 and T2 order by t
    • Query variants are: t >= T1 order by t limit N, t <= T1 order by t desc limit N
    • Like GetOne but missing the equality predicate on m
    • If m isn't known then an extra secondary index on (d,t) might be needed unless the DBMS supports index skip scan via an index on (d,m,t,...).
  • Max, Min: select v, t, d from C where m=M and t between T1 and T2 order by v asc/desc limit N
  • Compare: call GetOne for m in (...) and d=D and t between T1 and T2
  • Insert: insert into C values (t, d, [(m1, v1),(m2, v2),...])
    • SQL above is not standard and assumes a schema that might not be used.
    • (t,d,m) is unique. I am still deciding whether the benchmark client can generate intermittent duplicate inserts and how that should be handled.
  • Rollup: Uses Compare to get all data for d in (...) in time range, then replaces that data with values aggregated over coarser granularity. Assume that base data is rolled up into 5 minute, then 1 hour and then 1 day granularity.
  • Backfill: inserts missing data that might be hours or days old
The value of N in GetOne, GetAll, Max and Min above will vary. For Max and Min it is likely to be <= 100 and frequently <= 10. For GetOne/GetAll in some cases the user wants all data in a time range, but in other cases they will set N to be <= 10,000 and not get all data in the time range.

Open Issues
  • Spammy data - is it worth simulating devices that misbehave? This includes sending data too frequently, sending incorrect values and timestamps and not sending data for some time periods.
  • Is the aggregated data stored inline with the base data or in a separate table? If stored inline do entries need a tag to indicate granularity like base, hourly, daily, etc?
  • Should the benchmark include queries and writes to correct bad data?
  • Can there be a way for the benchmark to run faster than wall clock time so that per-hour and per-day rollup is done more frequently than per real hour and per real day? This avoids the need to run the benchmark for too long.
  • How does a scale factor affect the size of the database, the write load and the query load?

Physical Schema - Write Optimized
The goal here is to make writes fast. That is done by using a unique index for (t,d). Inserts are to the right end of the index assuming they usually arrive in t order. While this makes inserts fast it makes queries too slow. The (t,d) index over-fetches because only the range predicate on t can be used for the index access path:
  • For the GetOne and Compare queries it over-fetches by a factor of |m| * |d|. If there are 1M devices and 10 metrics per insert it fetches 10M times more index entries than needed.
  • For the Max/Min queries it over-fetches by a factor of |m|.
The large over-fetching for GetOne and Compare makes it infeasible to run them with the write optimized schema. The Max and Min queries could still be run -- it would be painful but not infeasible given that |m| is likely to be <= 100.

Implementation notes:
  • MongoDB - There are 3 choices for the unique index. The first choice is to use the subdocument {t, d} as the value for _id and make sure that all clients encode it in the same way. The second choice is to concatenate t and d into a string for the value of _id. This avoids the need to worry about clients encoding as in the first choice at the cost of storing t and d twice per doc. The third choice is to use ObjectId as the value of _id and then have a unique compound index on {t:1, d:1}. The cost for this is an extra index but inserts are in index order for both.
  • SQL with document datatypes - Use one row per insert and document datatype support in MySQL or Postgres to store all of the (m,v) pairs in that row.
  • SQL without document datatypes - Use one row per metric per insert. The unique index is defined on (t,d,m) rather than (t,d). The columns in the row are t, d, m, v.

Physical Schema - Read Optimized

These indexes are important to make queries efficient.
  • GetOne and Compare need one of (d,m,t) or (m,d,t).
  • Max/Min need one of (m,t,v,d) or (m,v,t,d)
From the perspective of one GetOne query there isn’t a difference between the index choices because there are equality predicates on d and m. But from a cache perspective one might be better than the other as they cluster on the first indexed field -- by d or m. If some metrics are less likely to be queried then clustering by m can be a better choice.

The unique constraint is on (d,m,t) or (m,d,t). If v is also in that index then uniqueness cannot be enforced. If v is not in the index then it might not be covering. With Postgres I can use the INCLUDE clause to get v in the index. With InnoDB and MyRocks the table is clustered on the PK, so it is sufficient to make (d,m,t) or (m,d,t) the PK. With MongoDB if I want a covering index then I have to create two indexes -- unique on (m,d,t) and then non-unique but covering with (m,d,t,v) or I can skip the covering index to reduce the write cost but make queries slower. Finally, by unique for MongoDB I mean that this is the value for _id, and then we are back to the question of using a subdocument for _id.

For Max and Min:
  • The (m,t,v,d) index uses index predicates for m and t and then does a sort. So this has an extra cost from the sort and can’t benefit from stopping the index scan once the limit N has been satisfied. Some DBMS have optimizations to make top-N sort fast, but there is no way to avoid the cost of not stopping the index scan early.
  • The (m,v,t,d) index uses index predicates only for m but avoids the sort and can stop the index scan once the limit N has been satisfied. The range predicate on t would be a filter predicate - evaluated for each entry read from the index.
There are cases where (m,t,v,d) is a better choice, but I think that (m,v,t,d) will be better more of the time. The index on (m,t,v,d) would be better when the selectivity from using t as an index predicate offsets the cost of the sort and the data is skewed so that limit N isn’t satisfied until most of the index has been scanned.

Write efficiency in the read optimized schema

Below I use streams of writes to mean the number of leaf pages that will be subject to read-modify-write from index maintenance during inserts at any point in time. If all inserts are to the right end of the index then there is 1 stream. If the index is on (client-ID, time), there are 10 clients active concurrently and each client inserts in time order then there are 10 streams of writes. When there is one leaf page in cache for each stream then IO efficiency is better. Otherwise each insert is more likely to force a page write back followed by a page read.

For the indexes used by GetOne and Compare -- (d,m,t,v) or (m,d,t,v) -- there will be |d|*|m| streams of writes. The cache demand from this is |d|*|m| pages. A small workload might use |d|=1M and |m|=10 and 10M 8kb pages needs 80G of RAM. Both indexes have a similar amount of cache amplification.

For the indexes used by Max and Min, the (m,t,v,d) index has |m| streams of writes vs |m|*|v| for the (m,v,t,d) index. Thus the (m,v,t,d) provides better query performance at the cost of more random IO and more cache amplification. One index has much less cache amplification.

For the non-unique secondary index used for Max and Min, MyRocks can use read-free index maintenance and the number of write streams isn't an issue for it. But it will be an issue for other DBMS.

Partitioning

Partitioning the tables and collections by time can help with write and read performance as well as making other tasks easier -- removing and rolling up old data. For example there could be a partition per hour for the last N hours. However if a DBMS doesn’t support partitions then there is a burden on the application developer to support cross-partition queries. As a user I expect to run queries across hourly boundaries.

No comments:

Post a Comment