We are a leading group in life services, mainly covering on-demand delivery, bike-sharing, and online supermarkets. After more than a year of building the data system for our online supermarket, we gradually migrated the multi-dimensional data analysis workloads that require real-time, interactive queries from ClickHouse to StarRocks. StarRocks has given us a good experience in terms of stability and real-time performance. In this article, we introduce our practice of StarRocks in funnel analysis.
Introduction
At present, the funnel analysis dashboards on our data portal are scattered. Each dashboard usually supports only one funnel, making it hard to analyze data from a global perspective. The dashboards also cannot support flexible analysis such as customized funnel steps and drill-down analysis. Therefore, we need a funnel analysis tool that supports:
- Ingestion and analysis of comprehensive traffic data
- Flexible filtering dimensions
- Flexible funnel step selection
- Locating lost and retained users
Comparison of Various OLAP Tools
The traffic and behavior logs in e-commerce scenarios are generally much larger than the data volumes in traditional scenarios. Funnel analysis in this context therefore brings two major challenges:
- Daily data increments can reach tens of millions of records, requiring the OLAP tool to support flexible dimension selection and rapid multi-dimensional analysis over hundreds of millions of records.
- High timeliness requirements demand that the OLAP tool accurately select target users from billions of records in real time.
Both StarRocks and ClickHouse are widely used in our company, but StarRocks is easier to use and maintain. A detailed comparison follows.
| Product | ClickHouse | StarRocks |
| --- | --- | --- |
| Scenario | Flat Table Analysis | Flat Table and Multi-table Join |
| Restrictions | | |
After these comparisons and stress testing, we finally decided to use StarRocks for funnel scenarios. StarRocks has visible advantages over ClickHouse in terms of SQL monitoring and maintenance. We create a variety of materialized views to meet different query scenarios and improve the speed of multi-dimensional data analysis.
System Architecture
The responsibilities of each layer of the system are described as follows:
1. Data Source: mainly web-side and client-side event-tracking logs, which are continuously uploaded to the data access layer.
2. Data Access Layer:
Data Hub: The data hub provides access to multiple data sources. It receives, verifies, and pushes log data to Apache Kafka after a simple ETL process.
Apache Kafka Cluster: The intermediate stage between the data hub and the data computing layer. The Kafka cluster decouples the two through flow control, relieving the pressure that excessive log volume during peak periods would otherwise put on the computing layer and storage systems.
3. Data Computing and Storage Layer:
Data Computing Layer: Once the data is available in the Apache Kafka cluster, we use Apache Flink or Apache Spark™ to perform real-time or offline ETL depending on the scenario, and then ingest the data into StarRocks in batches (a sketch of such an ingestion job follows this list).
StarRocks: Data from Apache Spark and Apache Flink is continuously ingested into StarRocks. We create tables with different data models, as well as materialized views, in StarRocks according to different business scenarios.
4. Data Service Layer: It contains the internal indicator definition model and calculation logic, providing a unified offline and real-time query service for users.
5. Funnel Analysis System: It supports flexible funnel creation and editing, as well as funnel data viewing and deduplicated data export.
6. Data Center: Centering on big data production and usage scenarios, it provides basic services such as metadata management, data maps, and job scheduling to improve data production and usage efficiency.
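As referenced above, here is a minimal sketch of such an ingestion job, assuming StarRocks Routine Load is used to consume Kafka directly; the job name, database, topic, and broker addresses are hypothetical:

-- Continuously consume the Kafka topic and load it into the StarRocks table
-- defined later in this article (CSV-formatted messages are assumed here).
CREATE ROUTINE LOAD funnel_db.ingest_funnel_log ON fact_log_user_starrocks_table
COLUMNS TERMINATED BY ",",
COLUMNS (new_user_id, user_id, event_source, is_new, identity, city_id, city_name, dt)
PROPERTIES (
    "desired_concurrent_number" = "3"
)
FROM KAFKA (
    "kafka_broker_list" = "broker1:9092,broker2:9092",
    "kafka_topic" = "funnel_log_topic",
    "property.kafka_default_offsets" = "OFFSET_END"
);

When the ETL happens in Apache Flink instead, the flink-connector-starrocks sink (which wraps Stream Load) can play the same role.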
Specific Design
Currently, StarRocks' bitmap type only accepts integer values as input. Because the user_id in our original table is alphanumeric and cannot be directly converted to an integer, each user_id needs to be mapped to a globally unique numeric ID to support bitmap calculations. Based on Apache Spark and Apache Hive™, we built a global dictionary that maps original user IDs to encoded integer user IDs one-to-one. The global dictionary itself is an Apache Hive table with two columns: the original value and the encoded integer value. The following is the construction process of the global dictionary:
1. Deduplicate the dictionary column of the original table to generate a temporary table.
Temporary table definition:
create table `temp_table` (
    `user_id` string COMMENT 'User ID after deduplication from the original table'
);
Deduplicate the dictionary column and write the result into the temporary table:
insert overwrite table temp_table
select user_id from fact_log_user_hive_table group by user_id;
2. Perform a left join between the temporary table and the global dictionary. Find the new values, encode them, and insert them into the global dictionary:
Global dictionary table definition:
create table `global_dict_by_userid_hive_table` (
    `user_id` string COMMENT 'Original user ID',
    `new_user_id` int COMMENT 'Integer user ID after encoding'
);
Join the temporary table with the dictionary table. Users that do not match are new users; they need to be assigned new global IDs and appended to the global dictionary table. A new global ID is generated by adding the current largest user ID in the dictionary table to the row number of the newly added user:
insert overwrite table global_dict_by_userid_hive_table
select user_id, new_user_id from global_dict_by_userid_hive_table
union all
select
    t1.user_id,
    (row_number() over (order by t1.user_id) + t3.max_id) as new_user_id
from
(
    select user_id from temp_table
    where user_id is not null
) t1
left join global_dict_by_userid_hive_table t2
    on t1.user_id = t2.user_id
cross join
(
    -- compute the current max ID in a separate subquery so that new
    -- (unmatched) users still get a valid base value; coalesce covers
    -- the very first run, when the dictionary is still empty
    select coalesce(max(new_user_id), 0) as max_id
    from global_dict_by_userid_hive_table
) t3
where t2.new_user_id is null;
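As a quick sanity check (an illustrative query, not part of the original pipeline), we can verify after each update that the dictionary still maps original IDs to encoded IDs one-to-one:

select
    count(*) as total_rows,
    count(distinct user_id) as distinct_user_ids,   -- should equal total_rows
    count(distinct new_user_id) as distinct_new_ids -- should equal total_rows
from global_dict_by_userid_hive_table;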
3. Perform a left join between the original table and the updated global dictionary, attaching the encoded integer user ID to each record of the original table:
insert overwrite table fact_log_user_hive_table
select
    a.user_id,
    b.new_user_id
    -- the other columns of the original table are omitted here for brevity
from
    fact_log_user_hive_table a
left join global_dict_by_userid_hive_table b
    on a.user_id = b.user_id;
4. Create an Apache Spark job to synchronize the original Apache Hive table to StarRocks. The target StarRocks table is defined as follows:
CREATE TABLE `fact_log_user_starrocks_table` (
`new_user_id` bigint(20) NULL COMMENT "Int user id",
`user_id` varchar(65533) NULL COMMENT "user id",
`event_source` varchar(65533) NULL COMMENT "source",
`is_new` varchar(65533) NULL COMMENT "new user",
`identity` varchar(65533) NULL COMMENT "user identity",
`city_id` varchar(65533) NULL COMMENT "city ID",
`city_name` varchar(65533) NULL COMMENT "city",
`dt` date NULL COMMENT "d",
`period_type` varchar(65533) NULL DEFAULT "daily" COMMENT ""
) ENGINE=OLAP
DUPLICATE KEY(`new_user_id`, `user_id`)
PARTITION BY RANGE(`dt`)(
PARTITION p20210731 VALUES [('2021-07-31'), ('2021-08-01')),
PARTITION p20210801 VALUES [('2021-08-01'), ('2021-08-02')),
PARTITION p20210802 VALUES [('2021-08-02'), ('2021-08-03')),
PARTITION p20210803 VALUES [('2021-08-03'), ('2021-08-04')),
PARTITION p20210804 VALUES [('2021-08-04'), ('2021-08-05')),
PARTITION p20210805 VALUES [('2021-08-05'), ('2021-08-06')),
PARTITION p20210806 VALUES [('2021-08-06'), ('2021-08-07')),
PARTITION p20210807 VALUES [('2021-08-07'), ('2021-08-08')),
PARTITION p20210808 VALUES [('2021-08-08'), ('2021-08-09')))
DISTRIBUTED BY HASH(`new_user_id`, `user_id`) BUCKETS 10
PROPERTIES (
"replication_num" = "3",
"dynamic_partition.enable" = "true",
"dynamic_partition.time_unit" = "DAY",
"dynamic_partition.time_zone" = "Asia/Shanghai",
"dynamic_partition.start" = "-2147483648",
"dynamic_partition.end" = "1",
"dynamic_partition.prefix" = "p",
"dynamic_partition.replication_num" = "-1",
"dynamic_partition.buckets" = "3",
"in_memory" = "false",
"storage_format" = "DEFAULT"
);
We use the duplicate key model so that detailed data can be queried. We also create corresponding materialized views for different multi-dimensional funnel analysis scenarios, so that users can get deduplicated results on different dimensions at each step of the funnel model.
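For instance, locating retained and lost users between two funnel steps can be done by intersecting or subtracting the user bitmaps of each step. A minimal sketch, assuming hypothetical step values 'view' and 'order' in the event_source column:

select
    -- users present in both steps
    bitmap_count(bitmap_and(step1.users, step2.users)) as retained_users,
    -- users present in step 1 but missing from step 2
    bitmap_count(bitmap_andnot(step1.users, step2.users)) as lost_users
from
(
    select bitmap_union(to_bitmap(new_user_id)) as users
    from fact_log_user_starrocks_table
    where `dt` >= '2021-08-01' and `dt` <= '2021-08-07' and event_source = 'view'
) step1
cross join
(
    select bitmap_union(to_bitmap(new_user_id)) as users
    from fact_log_user_starrocks_table
    where `dt` >= '2021-08-01' and `dt` <= '2021-08-07' and event_source = 'order'
) step2;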
5. Create a "bitmap_union materialized view" to speed up query and counting process of "count(distinct)":
For example, we want to check the conversion of users in certain cities on the funnel model. The query is as follows:
select city_id, count(distinct new_user_id) as countDistinctByID
from fact_log_user_starrocks_table
where `dt` >= '2021-08-01' and `dt` <= '2021-08-07' and `city_id` in (11, 12, 13)
group by city_id;
For scenarios that require the exact number of distinct users per city, we can create a materialized view with bitmap_union on fact_log_user_starrocks_table to precompute the distinct counts. StarRocks automatically routes the original query to the materialized view at query time, improving query performance.
The materialized view created for this case is as follows.
create materialized view city_user_count as
select city_id, bitmap_union(to_bitmap(new_user_id))
from fact_log_user_starrocks_table
group by city_id;
In StarRocks, the result of count(distinct) aggregation is exactly the same as the result of bitmap_union_count aggregation, and bitmap_union_count equals the count of the result of bitmap_union. So if count(distinct) is involved in a query, the query can be sped up by creating a materialized view with bitmap_union aggregation. Because new_user_id is an integer column, it needs to be converted into a bitmap with to_bitmap before bitmap_union can be applied.
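To confirm the rewrite actually takes effect, the query plan can be inspected (an illustrative check; the exact plan output varies by StarRocks version):

explain
select city_id, count(distinct new_user_id)
from fact_log_user_starrocks_table
where `city_id` in (11, 12, 13)
group by city_id;
-- if the materialized view is hit, the OlapScanNode in the plan reports
-- the rollup city_user_count instead of the base table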
In this way, we update the global dictionary and replace the encoded ID column in the original table by running the offline Apache Spark synchronization job every morning. At the same time, we configure baseline and data quality alarms for the Apache Spark job to ensure data accuracy. Marketers can thus see every day how users converted the previous day and adjust their strategies in time.
Final Results and Benefits
Through the joint efforts of product and R&D staff, we optimized distinct counting along three dimensions: the number of cities queried, the time span, and the data volume. Distinct counting for 150 city IDs over a data volume on the order of 100 million records completes within 3 seconds.
Future Plan
1. Enrich StarRocks tools; integrate StarRocks with our big data scheduling platform and data development platform.
2. Strengthen StarRocks to unify batch and streaming analytics. Since StarRocks provides a wealth of data models, we plan to build an analytical system that unifies batch and streaming analytics based on the primary key model, the duplicate key model, and materialized views. This plan is currently in progress.
3. Based on StarRocks' ability to query Elasticsearch, we expect to build a unified OLAP layer over heterogeneous data sources.
In the future, we will continue to follow StarRocks' development and keep upgrading and iterating internally. We expect StarRocks to provide richer functions and a more open ecosystem. StarRocks will also serve as an important component of our unified OLAP platform.
Apache®, Apache Spark™, Apache Flink®, Apache Kafka®, Apache Hadoop®, Apache Hive™ and their logos are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries.