How to Avoid Hotspots on Range-based Indexes in Distributed Databases

Premkumar Thangamani

Distributed databases rest on a basic assumption: data should be spread across all nodes. Some data model choices, however, cause data to accumulate on one node before it moves on to another, which makes that node a hotspot for reads and writes. This article explains how to avoid that.

Problem

Let’s walk through a simple scenario to illustrate the problem we might face. Say that you want to store a user id and the time the user was created.

CREATE TABLE user_table (
    user_id int,
    create_ts timestamp,
    PRIMARY KEY (user_id)
);

Three normal query scenarios might be:

  1. For a user_id, determine when the user was created
    select create_ts from user_table where user_id = 10
  2. Find the user_id for all users created in a given time range.
    select user_id from user_table where create_ts > now() - make_interval(hours=>50)
  3. Add a new user into the table.
    insert into user_table values(1, now())

For the first query, making user_id the primary key is sufficient. Doing so ensures the data is distributed across multiple tablets based on the HASH of the user_id.

For the second query, we would need an ASC (or DESC) index so that a range query would be efficient.

For the third query, it would be better to distribute the writes across multiple tablets rather than just one.

Possible Solution #1 to Avoid Hotspots

  1. Create an ASC index on the create timestamp.
    CREATE INDEX index1 ON user_table (create_ts ASC);
  2. Add 100 rows into the table:
    insert into user_table (select id, now() - make_interval(hours=>id) as ts from generate_series(1,100) as id);

    A look at the internal metrics* shows:

     inserts | db   relname    tabletid      leader
    ---------+-------------------------------------
         100 | test index1     4b56b0e2ed1e... L
    
          33 | test user_table 90bd8624f9d5... L
          37 | test user_table 99700912e6b9... L
          30 | test user_table ff14b24352c2... L
    

    As you can see, the 100 rows are distributed across 3 tablets of the user_table. But for the index, the 100 rows went into just one tablet (4b56b0e2ed1e). Therefore, for all further writes this tablet is a hotspot. It grows until it’s manually split or an auto-split condition is reached. Then the newer tablet will, in turn, be a hotspot since the timestamp is constantly increasing.

  3. Execute the select query:
    test=# explain analyze select user_id from user_table where create_ts > now() - make_interval(hours=>50);
                                     QUERY PLAN
    ------------------------------------------------------------------------------------------
     Index Scan using index1 on public.user_table  (cost=0.00..5.17 rows=10 width=4) (actual time=0.590..0.609 rows=50 loops=1)
       Output: user_id
       Index Cond: (user_table.create_ts > (now() - '50:00:00'::interval))
       Heap Fetches: 0
     Planning Time: 0.068 ms
     Execution Time: 0.638 ms
     Peak Memory Usage: 8 kB

Because the index is sorted on create_ts, exactly the 50 matching rows were fetched from the index, with no filtering. The writes, however, are restricted to just one tablet.
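
To see why a steadily increasing key concentrates writes, here is a minimal Python sketch of a range-sharded index. It is a simplified model, not YugabyteDB's implementation: the split points, the integer timestamps, and the tablet_for helper are all hypothetical.

```python
from bisect import bisect_right
from collections import Counter

def tablet_for(key, split_points):
    """Return the position of the range tablet that owns `key`.

    `split_points` is a sorted list of boundaries; tablet i holds the keys in
    [split_points[i-1], split_points[i]), and the last tablet is open-ended.
    """
    return bisect_right(split_points, key)

# Hypothetical layout: an index already split into 3 range tablets.
split_points = [100, 200]

# New rows arrive with ever-increasing timestamps, all past the last boundary.
new_timestamps = range(201, 301)
writes_per_tablet = Counter(tablet_for(ts, split_points) for ts in new_timestamps)

print(dict(writes_per_tablet))  # {2: 100}
```

In this model all 100 new entries are routed to the last tablet, which mirrors the single-tablet insert count in the metrics above.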

Is there a better way to do this?

Possible Solution #2 to Avoid Hotspots

To avoid the hotspot, we might try to create a HASH index like this:

CREATE INDEX index2 ON user_table (create_ts HASH);

Doing so distributes the data across different tablets. However, the ordering needed for our range query is lost, because YugabyteDB orders the index entries by the hash of create_ts rather than by create_ts itself.

explain analyze select user_id from user_table where create_ts > now() - make_interval(hours=>50);
                               QUERY PLAN
-----------------------------------------------------------------------------------
 Seq Scan on public.user_table (actual time=0.488..0.983 rows=50 loops=1)
   Output: user_id
   Filter: (user_table.create_ts > (now() - '50:00:00'::interval))
   Rows Removed by Filter: 50
   Heap Fetches: 0
 Planning Time: 0.108 ms
 Execution Time: 1.014 ms

Here, 50 rows were returned and 50 were filtered out (Rows Removed by Filter: 50). This is because the index is ordered not on the timestamp but on the hash of the timestamp, so it cannot serve the range predicate. The query executor had to go through all 100 rows and discard the 50 that did not match. This is inefficient since it’s a full table scan. Can we avoid this?
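
The difference between the two index types can be sketched in a few lines of Python. This is an illustrative model only: hash16 is a stand-in built on MD5, not yb_hash_code, and the timestamps are plain integers from 1 to 100.

```python
import hashlib
from bisect import bisect_right

def hash16(value):
    """Stand-in 16-bit hash for illustration (NOT yb_hash_code)."""
    digest = hashlib.md5(str(value).encode()).digest()
    return int.from_bytes(digest[:2], "big")

timestamps = list(range(1, 101))  # 100 rows with ts = 1..100
cutoff = 50                       # models the predicate: ts > cutoff

# ASC index: entries sorted by ts, so one binary search finds the start
# of the matching range and only matching entries are read.
asc_index = sorted(timestamps)
asc_hits = asc_index[bisect_right(asc_index, cutoff):]
asc_examined = len(asc_hits)

# HASH index: entries sorted by hash(ts), so matching rows are scattered
# and every entry has to be read and filtered.
hash_index = sorted(timestamps, key=hash16)
hash_hits = [ts for ts in hash_index if ts > cutoff]
hash_examined = len(hash_index)

print(asc_examined, hash_examined, hash_examined - len(hash_hits))  # 50 100 50
```

Both approaches return the same 50 rows, but the hash-ordered model has to read all 100 entries and discard half of them, just as the plan above reports.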

The Right Solution to Avoid Distributed Database Hotspots

We want to distribute the index across multiple tablets (or nodes) and at the same time retain the ordering by create_ts. Therefore, it’s better to implement lightweight application-level sharding.

To do this, create an index of the form (shard_id HASH, create_ts ASC), so that the data is distributed based on the hash of shard_id and, within each shard, create_ts is stored in the expected sort order. For example, to distribute the write load across 16 shards, generate the shard_id for each row as a random number from 0 to 15, so that the hashes are reasonably distributed across many tablets. To help the optimizer on the select query, specify the shard_ids explicitly, either in an IN clause or as a UNION of multiple selects, each covering a small set of shard_ids.

We can compute the hash of each shard_id and compare it against the tablet information to check the distribution.

test=# select id, upper(to_hex(yb_hash_code(id))) as hash from generate_series(0,15) as id;
 id | hash
----+-------
  0 | BA22
  1 | 1210
  2 | C0C4
. . .
 14 | CD71
 15 | 4895
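
As a rough illustration of that check, the Python sketch below maps the hash codes shown above onto tablets. It assumes three tablets that divide the 16-bit hash space into equal ranges (boundaries at 0x5555 and 0xAAAA); the actual boundaries on a cluster should be read from its tablet information, and tablet_for_hash is a hypothetical helper.

```python
from bisect import bisect_right

# Hash codes copied from the query output above (only the rows that are shown).
shard_hashes = {0: 0xBA22, 1: 0x1210, 2: 0xC0C4, 14: 0xCD71, 15: 0x4895}

# Assumption: three tablets split the 16-bit hash space into equal ranges.
boundaries = [0x5555, 0xAAAA]

def tablet_for_hash(hash_code):
    """Return 0, 1 or 2: the position of the hash range containing hash_code."""
    return bisect_right(boundaries, hash_code)

placement = {shard: tablet_for_hash(h) for shard, h in shard_hashes.items()}
print(placement)  # {0: 2, 1: 0, 2: 2, 14: 2, 15: 0}
```

Under these assumed boundaries the five listed shard_ids already land on two different tablets, so index entries for different shards no longer share a single tablet.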

So now let’s introduce a shard_id column into the table to force the distribution of the index. We set the DEFAULT expression of shard_id to (random()*10000)::int % 16 so that we don’t need to set it explicitly during inserts; the database fills it in for us. (Note: If you prefer to colocate rows from the same session, you could base the expression on pg_backend_pid() instead of random().)

CREATE TABLE user_table (
    user_id int,
    create_ts timestamp,
    shard_id smallint DEFAULT (random()*10000)::int % 16,
    PRIMARY KEY (user_id)
);

CREATE INDEX index3 ON user_table (shard_id HASH, create_ts ASC);

Now let’s add 100 rows into the table. (Note: we do not specify shard_id explicitly, since it’s set automatically via the DEFAULT expression.)

insert into user_table(user_id,create_ts) (select id, now() - make_interval(hours=>id) from generate_series(1,100) as id);

If you look at the internal metrics,* you will see that both the table and the index are now distributed across multiple tablets:

 inserts | db   relname    tabletid      leader
---------+-------------------------------------
      21 | test index3     578ec6e8892d... L
      26 | test index3     5a6ccd53ede6... L
      53 | test index3     f0bde9a0e480... L

      33 | test user_table 0f9c658cb7ab... L
      30 | test user_table 51799b1a0805... L
      37 | test user_table d77869849ca8... L
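
The spread of writes produced by the DEFAULT expression can be approximated outside the database. The sketch below is a Python model under stated assumptions: it uses Python's random module rather than the database's random(), a fixed seed for repeatability, and a hypothetical default_shard_id helper.

```python
import random
from collections import Counter

NUM_SHARDS = 16
rng = random.Random(42)  # fixed seed so the sketch is repeatable

def default_shard_id():
    """Mimic DEFAULT (random()*10000)::int % 16.

    The SQL cast from a float to int rounds to the nearest integer,
    so round() is used here rather than truncation.
    """
    return round(rng.random() * 10000) % NUM_SHARDS

# Model 100 inserts that each pick up a shard_id from the default.
writes_per_shard = Counter(default_shard_id() for _ in range(100))
busiest = max(writes_per_shard.values())

print(sorted(writes_per_shard.items()))
print("largest number of writes on a single shard:", busiest)
```

No single shard receives all of the writes in this model. How the 16 shards then map onto tablets depends on the hash of each shard_id, which is why the index counts above are spread across three tablets rather than concentrated in one.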

We have now removed the hotspot on writes. One might think that this is similar to Solution #2 above, and in terms of how the writes are distributed, it is. However, this index also keeps the ASC ordering on the create_ts column within each shard, which helps the query executor on range queries. To take advantage of this layout, the query needs a small modification: it has to specify the range of shard_id using either of the two options below. (Option 1 will typically be faster; the explanation is out of the scope of this article.)

  1. shard_id IN (0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15)
  2. shard_id >= 0 and shard_id < 16

Execute the select query:

explain analyze select user_id from user_table where shard_id IN (0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15) and create_ts > now() - make_interval(hours=>50);

                                QUERY PLAN
---------------------------------------------------------------------
 Index Scan using index3 on public.user_table (actual time=1.135..1.175 rows=50 loops=1)
   Output: user_id
   Index Cond: ((user_table.shard_id = ANY ('{0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15}'::integer[])) 
                AND (user_table.create_ts > (now() - '50:00:00'::interval)))
   Heap Fetches: 0
 Planning Time: 0.551 ms
 Execution Time: 1.217 ms
 Peak Memory Usage: 24 kB

The executor returned the exact rows without filtering because it knew where to stop since create_ts was ordered. This is what we have been looking for.
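
One way to picture this layout is as 16 small sorted lists, one per shard_id, each of which can be searched on its own. The Python sketch below models that idea. It is not how YugabyteDB executes the query: the in-memory index dictionary, the integer timestamps, and the range_scan helper are hypothetical, and the final merge step is only needed when the caller wants the rows in timestamp order.

```python
import heapq
import random
from bisect import bisect_right, insort

NUM_SHARDS = 16
rng = random.Random(7)  # fixed seed so the sketch is repeatable

# index[shard_id] is a list of (create_ts, user_id) kept in ascending order,
# modelling an index of the form (shard_id HASH, create_ts ASC).
index = {shard: [] for shard in range(NUM_SHARDS)}
for ts in range(1, 101):                    # 100 rows with ts = 1..100
    shard_id = rng.randrange(NUM_SHARDS)    # random shard, as with the DEFAULT
    insort(index[shard_id], (ts, ts))       # user_id equals ts for simplicity

def range_scan(index, cutoff_ts):
    """Return an iterator over entries with create_ts > cutoff_ts, ascending."""
    per_shard = []
    for entries in index.values():
        # Seek straight to the first entry past the cutoff in this shard;
        # everything after it matches, so nothing is read and then discarded.
        start = bisect_right(entries, (cutoff_ts, float("inf")))
        per_shard.append(entries[start:])
    # Merge the per-shard sorted slices into one sorted stream.
    return heapq.merge(*per_shard)

rows = list(range_scan(index, cutoff_ts=50))
print(len(rows))  # 50
```

Each shard contributes only its matching tail, so the scan reads exactly the 50 qualifying entries even though they are spread across 16 lists.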

Conclusion

To avoid the hotspot problem on range indexes over steadily increasing values such as timestamps, we introduce application-level sharding. Doing so replaces the single sorted list, and the hotspot that comes with maintaining it, with N sorted lists, and shifts the responsibility for merging those lists to the query side. This is the correct way to design scalable applications in a distributed system that uses a distributed SQL database like YugabyteDB.

As a follow-up, in another article, we will explore how we can use this layout to support paginated results, e.g., returning 500 rows at a time in descending order of timestamp.

Note:

[ * ] Reported metrics were fetched from a YugabyteDB cluster and processed using Franck Pachot’s ybwr.sql script.
