How to Optimize Pagination for Distributed Data While Maintaining Ordering

Premkumar Thangamani

Range sharded indexes in a database maintain data in a sorted order. This ordering makes it easy to serve paginated query results where each subsequent query returns the next page of results. In the recent blog, “How to Avoid Hotspots on Range-based Indexes”, we walked through the main pitfall of range-sharded data in a distributed database like YugabyteDB—hotspots. We suggested an application-level sharding layout as a possible solution to avoid hotspots. With that layout in mind, where order is maintained within each shard, let’s discuss how to design a query to return data with pagination while maintaining the global ordering.

Scenario

Let’s look at an events table to illustrate our scenario. This table has the details of various events associated with each user. We have to display the events associated with a user in a UI showing the most recent ones first (for example, 10 of the most recent events on the first page). We will use pagination links to view the next set of 10 and so on.We design the table (based on what we learned in the Avoid Hotspot blog) To avoid hotspots we create multiple application-level shards (there are four in the example below but feel free to play with higher values) represented by shard_id which is auto-populated using the DEFAULT clause as shown below.

CREATE TABLE events (
   user_id integer,
   shard_id smallint DEFAULT (random()*10000)::int % 4,
   event_ts timestamp DEFAULT CURRENT_TIMESTAMP,
   details text,
   PRIMARY KEY((user_id, shard_id) HASH, event_ts DESC)
);

Generate the data

Let’s insert a million rows into the table for user_id=1

INSERT INTO events(user_id, event_ts, details)
 SELECT 1, now() - make_interval(mins=>1000000-n, secs=>EXTRACT (SECONDS FROM now())), 'details-' || n FROM generate_series(1, 1000000) n;

Sample data:
 user_id | shard_id |      event_ts       |  details
---------+----------+---------------------+------------
       1 |        3 | 2020-12-29 11:52:00 | details-1
       1 |        2 | 2020-12-29 11:53:00 | details-2
       1 |        3 | 2020-12-29 11:54:00 | details-3
       1 |        2 | 2020-12-29 11:55:00 | details-4
       1 |        0 | 2020-12-29 11:56:00 | details-5
       1 |        0 | 2020-12-29 11:57:00 | details-6
       1 |        0 | 2020-12-29 11:58:00 | details-7
       1 |        1 | 2020-12-29 11:59:00 | details-8
       1 |        3 | 2020-12-29 12:00:00 | details-9
       1 |        0 | 2020-12-29 12:01:00 | details-10

From internal metrics,* we can see that the data is distributed across multiple tablets.

inserts | table   tabletid 
---------+------------------------
 312359  | events  aea29aa7a1e7...
 312889  | events  db15af0488c8...
 374752  | events  e2c6fdf80277...

Retrieve Recent Events

To retrieve the most recent events for a user, we have to start with the highest event_ts for that user. Then, for the next set of events, we need to re-issue the query with the minimum event_ts from the previous result set and so on. There are a few ways to do this. Let’s explore each of them and identify which is best suited for us.

NOTE: To get the first page of results, we’ll try to find events where event_ts is less than some very high timestamp (luckily, in PostgreSQL the constant ‘infinity’ works for our purposes) First, we can retrieve the 10 results less than the ‘infinity’,  then for the next query, we can pick the lowest event_ts from the previous result and get the next set of 10, and so on. Let us try to design the first query.

Method #1 to Design a Pagination Query

Let’s do a naive retrieval of the events for a specific user.

explain analyze SELECT event_ts, details FROM events WHERE user_id = 1 and event_ts < 'infinity' ORDER BY event_ts DESC limit 10;
                          QUERY PLAN
----------------------------------------------------------------------------------------
 Limit (actual time=1689.147..1689.152 rows=10 loops=1)
   Output: event_ts, details
   ->  Sort (actual time=1689.146..1689.147 rows=10 loops=1)
         Output: event_ts, details
         Sort Key: events.event_ts DESC
         Sort Method: top-N heapsort  Memory: 25kB
         ->  Seq Scan on public.events (actual time=2.149..1547.710 rows=1000000 loops=1)
               Output: event_ts, details
               Filter: ((events.event_ts < 'infinity'::timestamp) AND (events.user_id = 1))
 Planning Time: 0.072 ms
 Execution Time: 1689.200 ms
 Peak Memory Usage: 40 kB
(12 rows)

Time: 1858.273 ms (00:01.858)

If you can see above, this did not even use the primary key. Hence it was a full table scan, and all million rows were read. Could we do better?

Method #2 to Design a Pagination Query

Lets try to help the optimizer by explicitly specifying the shard_ids.

explain analyze SELECT event_ts, details FROM events WHERE user_id = 1 and shard_id IN (0,1,2,3) and event_ts < 'infinity' ORDER BY event_ts DESC limit 10;
                          QUERY PLAN
--------------------------------------------------------------------------------------
 Limit (actual time=851.442..851.446 rows=10 loops=1)
   Output: event_ts, details
   ->  Sort (actual time=851.440..851.442 rows=10 loops=1)
         Output: event_ts, details
         Sort Key: events.event_ts DESC
         Sort Method: top-N heapsort  Memory: 25kB
         ->  Index Scan using events_pkey on public.events (actual time=14.858..722.986 rows=1000000 loops=1)
               Output: event_ts, details
               Index Cond: ((events.user_id = 1) AND (events.shard_id = ANY ('{0,1,2,3}'::integer[])) AND (events.event_ts < 'infinity'::timestamp))
 Planning Time: 0.087 ms
 Execution Time: 851.490 ms
 Peak Memory Usage: 56 kB
(12 rows)

Here, the index was used, but a million rows were read. We just need the 10 rows read. There should be a better way.

Method #3 to Design a Pagination Query

Let’s break down the query in Method #2 to help the optimizer get to the right shard and take advantage of the fact that the data within each shard is already ordered by event_ts. This way we can avoid scanning all the rows associated with a specific user just to retrieve 10 rows.

Let’s try to get the results for a single shard first.

explain analyze SELECT event_ts, details FROM events WHERE user_id = 1 and shard_id = 0 and event_ts < 'infinity' ORDER BY event_ts DESC limit 10;
                                          QUERY PLAN
--------------------------------------------------------------------------------------- Limit (actual time=0.506..0.513 rows=10 loops=1)
   Output: event_ts, details
   ->  Index Scan using events_pkey on public.events (actual time=0.505..0.510 rows=10 loops=1)
         Output: event_ts, details
         Index Cond: ((events.user_id = 1) AND (events.shard_id = 0) AND (events.event_ts < 'infinity'::timestamp))
 Planning Time: 0.076 ms
 Execution Time: 0.538 ms
 Peak Memory Usage: 8 kB
(8 rows)

Time: 62.810 ms

We are getting closer to the result we want. Ten rows are returned in DESC order for a specific shard_id. Now let’s try to execute this on all the shards, combine them via a UNION ALL, and limit the number of results to 10.

We chose UNION ALL here instead of  UNION because UNION ensures there are no duplicates in the result set. Since we already know that there are no duplicates, we can avoid the deduplication overhead.

SELECT * FROM
 (
      (SELECT event_ts, details FROM events WHERE user_id = 1 and shard_id = 0 and event_ts < 'infinity' ORDER BY event_ts DESC)
     UNION ALL
      (SELECT event_ts, details FROM events WHERE user_id = 1 and shard_id = 1 and event_ts < 'infinity' ORDER BY event_ts DESC)
     UNION ALL
      (SELECT event_ts, details FROM events WHERE user_id = 1 and shard_id = 2 and event_ts < 'infinity' ORDER BY event_ts DESC)
     UNION ALL
      (SELECT event_ts, details FROM events WHERE user_id = 1 and shard_id = 3 and event_ts < 'infinity' ORDER BY event_ts DESC)
 ) AS result ORDER BY result.event_ts DESC LIMIT 10;

The query executor would now be able to look up all the shards independent of each other.

                                  QUERY PLAN
---------------------------------------------------------------------------------------
 Limit (actual time=1.450..1.460 rows=10 loops=1)
   Output: events.event_ts, events.details
   ->  Merge Append (actual time=1.449..1.456 rows=10 loops=1)
         Sort Key: events.event_ts DESC
         ->  Index Scan using events_pkey on public.events (actual time=0.509..0.509 rows=1 loops=1)
               Output: events.event_ts, events.details
               Index Cond: ((events.user_id = 1) AND (events.shard_id = 0) AND (events.event_ts < 'infinity'::timestamp))
         ->  Index Scan using events_pkey on public.events events_1 (actual time=0.200..0.202 rows=5 loops=1)
               Output: events_1.event_ts, events_1.details
               Index Cond: ((events_1.user_id = 1) AND (events_1.shard_id = 1) AND (events_1.event_ts < 'infinity'::timestamp))
         ->  Index Scan using events_pkey on public.events events_2 (actual time=0.281..0.283 rows=4 loops=1)
               Output: events_2.event_ts, events_2.details
               Index Cond: ((events_2.user_id = 1) AND (events_2.shard_id = 2) AND (events_2.event_ts < 'infinity'::timestamp))
         ->  Index Scan using events_pkey on public.events events_3 (actual time=0.245..0.245 rows=1 loops=1)
               Output: events_3.event_ts, events_3.details
               Index Cond: ((events_3.user_id = 1) AND (events_3.shard_id = 3) AND (events_3.event_ts < 'infinity'::timestamp))
 Planning Time: 0.372 ms
 Execution Time: 1.511 ms
 Peak Memory Usage: 56 kB 

The result would be:

      event_ts       |     details
---------------------+-----------------
 2022-11-22 18:56:00 | details-1000000
 2022-11-22 18:55:00 | details-999999
 2022-11-22 18:54:00 | details-999998
 2022-11-22 18:53:00 | details-999997
 2022-11-22 18:52:00 | details-999996
 2022-11-22 18:51:00 | details-999995
 2022-11-22 18:50:00 | details-999994
 2022-11-22 18:49:00 | details-999993
 2022-11-22 18:48:00 | details-999992
 2022-11-22 18:47:00 | details-999991

The Merge Append operator efficiently fetched different numbers of rows from the four shards adding up to 10. The final result is also sorted. Moreover we processed exactly 10 rows, which is exactly what we need. Within each shard the data is already sorted and all that remains is the K-way merge of these sorted streams. Because YugabyteDB’s query layer is based on PostgreSQL’s query layer code, the natively supported Merge Append operator does the k-way merge for us. If this was a NoSQL API like YCQL, then it would have been the application’s responsibility to fetch K-streams and merge them in the application code.

To get the next set of 10 results, replace ‘infinity‘ with 2022-11-22 18:47:00 in the above query, which is the minimum event_ts in that list, etc. To make things easier, define a prepared statement (or a stored procedure) like this.

/***
 *  get_paginated_result (user_id, prev_min_event_ts, num_rows)
 */

PREPARE get_paginated_result(int, timestamp, int) AS SELECT * FROM
  (
       (SELECT event_ts, details FROM events WHERE user_id = $1 and shard_id=0 and event_ts < $2 ORDER BY event_ts DESC)
      UNION ALL
       (SELECT event_ts, details FROM events WHERE user_id = $1 and shard_id=1 and event_ts < $2 ORDER BY event_ts DESC)
      UNION ALL
       (SELECT event_ts, details FROM events WHERE user_id = $1 and shard_id=2 and event_ts < $2 ORDER BY event_ts DESC)
      UNION ALL
       (SELECT event_ts, details FROM events WHERE user_id = $1 and shard_id=3 and event_ts < $2 ORDER BY event_ts DESC)
  ) AS result ORDER BY result.event_ts DESC LIMIT $3;
  
  # calls
  execute get_paginated_result(1,'infinity',            10);
  execute get_paginated_result(1,'2022-11-22 18:47:00', 10);
  execute get_paginated_result(1,'2022-11-22 18:37:00', 10);

Conclusion

We retrieved only the rows we needed from each shard, performed an efficient k-way merge (using the Merge Append operator) on the already sorted streams while preserving the DESC ordering of the event_ts. We were able to do this by using just the PRIMARY KEY of the table without any additional index. This is the suggested solution for getting paginated results from a table with range-sharded data while maintaining the ordering.

Note:

[ * ] Reported metrics were fetched from YugabyteDB cluster and processed using Franck Pachot’s ybwr.sql script.

Premkumar Thangamani

Related Posts

Explore Distributed SQL and YugabyteDB in Depth

Discover the future of data management.
Learn at Yugabyte University
Get Started
Browse Yugabyte Docs
Explore docs
PostgreSQL For Cloud Native World
Read for Free