Flywheel Technology Case.
This project case was submitted by Flywheel Technology and participated in the "Data Ape Annual Golden Ape Planning Activity - 2023 Big Data Industry Annual Innovative Service Enterprise List Award".
UnionPay Commerce is a large-scale non-bank payment institution in China. It provides comprehensive payment services built on bank card acquiring and online payment, as well as diversified, professional merchant value-added and technology innovation services, and has long been committed to building an inclusive, convenient, efficient and safe payment environment for merchants, partners and consumers. As of December 2023, UnionPay Commerce had served more than 25 million merchants, including large, medium-sized and well-known enterprises, deployed more than 40 million terminals, covered all prefecture-level and higher cities in the Chinese mainland as well as Hong Kong and Macao, and ranked first among Asia-Pacific acquirers in Nielsen's rankings for ten consecutive years.
Today, data has become a new driving force for economic growth, and digital technology is becoming an important engine for social development. With the rapid development of the digital economy, financial companies have increased their investment in the field of financial technology to improve their digital operation capabilities and accelerate the process of digital transformation. Against this backdrop, UnionPay Commerce has accelerated the construction of digital infrastructure with the goal of digital transformation of "full access, accurate and real-time, on-demand self-service, and intelligent interaction".
Over its long history of serving merchants at scale, UnionPay Commerce has accumulated a huge volume of real, high-quality data assets, which are not only the basis for opening a new growth curve but also key support for serving merchants further. To manage, visualize and derive value from these data assets, UnionPay Commerce built a Hadoop-based big data platform as early as 2015 to provide data support for the company and its merchants. However, as the business expanded, data application needs from departments and subsidiaries kept emerging, and the early Hadoop-based platform could no longer efficiently meet requirements for scalability, timeliness and convenience, making an iterative upgrade of the data platform imperative.
Since 2020, UnionPay Commerce has been upgrading its data platform, building a new generation of real-time data warehouse architecture based on Apache Doris. The new architecture improves data import performance by 2-5x, ETL performance by 3-12x, and query and analysis response speed by 10-15x, meeting the business needs of large-scale data import and real-time, ultra-fast queries, solving the problems brought by rapid business and data growth, and improving the efficiency of building data applications. It helps improve business efficiency and the servitization of digital assets, and advances the company's digitalization process.
Through long-term service to a large number of merchants across industries, UnionPay Commerce has accumulated large amounts of transaction, user, terminal and business data. Mining and utilizing these data provides diversified data services for UnionPay Commerce headquarters, branches, subsidiaries and merchants, uncovering hidden value and consumption trends and becoming an effective tool for business control and decision-making. Typical data service scenarios include:
Business analysis scenario: Build an indicator system and cockpit to help business parties and managers understand the overall business operation status in real time.
Business operation scenario: Build a tagging system to better understand user portraits and needs, so as to provide accurate products and business services.
Data analysis and mining scenarios: Build self-service analysis and reporting services so that business teams can better understand their business situation and make scientific decisions.
External data service scenario: Build statements, reports, and data reporting services to help merchants better understand market demand.
In order to meet the needs of these scenarios, UnionPay Commerce introduced the Hadoop ecosystem as early as 2015 to build a big data platform and a Hive-based offline data warehouse. The architecture integrated data from business systems such as MySQL, Oracle, and MongoDB, collected it into the Hive offline warehouse at T+1 timeliness, and, after layer-by-layer processing in Hive, synchronized it to different components to provide data query services: CUBEs were built on Kylin and HBase for metric query and analysis, HBase was used to monitor the quality of incremental data, and processed data was synchronized to Oracle for business data queries.
Each component of the architecture had its own responsibilities, and the overall architecture was clear, carrying the business's early needs for data query services. But under the wave of rapid mobile payment growth, payment scenarios became more diversified, the scale of UnionPay Commerce's business grew steadily, and data application needs from headquarters and subsidiaries kept emerging, while the traditional big data platform could no longer efficiently support the continuous growth of business and data. Pain points gradually emerged:
Data timeliness: The overall data processing link is long, the efficiency of large-scale data import and processing is low, and the time interval between the latest business data from production to application is too long, which cannot give full play to the value of real-time data.
Low query efficiency: Kylin handled most metric query and analysis needs in the early days, but as business volume grew, the cost of data preprocessing kept rising and it could not support flexible, fast analysis; meanwhile, Hive's performance on complex queries was insufficient and could only be compensated for by adding machines, which greatly increased hardware costs.
O&M cost and scalability: The overall architecture involves many components, the O&M workload is heavy, and scalability is insufficient, so it cannot keep up with rapid data growth or the demand to bring data applications online quickly.
Facing tens of millions of merchants from all walks of life, with different industry characteristics and actual needs, after comprehensively combing the many platform systems within the enterprise and the products provided by customers, UnionPay Commerce has determined the core goal to be achieved in digital transformation - "full access, accurate and real-time, on-demand self-service, and intelligent interaction".
Specifically, "full access" means all platforms are fully interoperable and data is integrated and shared, making it easier to grasp complete information on a data subject and exploit data synergies. "Accurate and real-time" means giving full play to the real-time value of data, using technical means to ensure data is both "fast" and "accurate" and laying a solid foundation for subsequent analysis. "On-demand self-service" means providing self-service, flexibly combinable, on-demand access, even tailored to specific customers. "Intelligent interaction" means making full use of technology to move from passively accepting services to actively delivering capabilities, providing analysis, decision support and other intelligent services that respond to user needs in two-way interaction.
In the course of providing data services to internal and external customers, UnionPay Commerce noticed that users had higher expectations for the performance of data analysis, the flexibility of analysis modes, the stability of data services, and the timeliness of data applications. We therefore decided to upgrade the existing architecture to meet new business requirements and solve the problems of the earlier architecture, officially launching the upgrade of the UnionPay Commerce data architecture in 2020. The upgrade objectives mainly include the following aspects:
Concise: A single system completes both data processing and data serving, simplifying the data pipeline and improving work efficiency;
Stable and efficient: It supports efficient data processing and high-performance data query, and the stability of the system and platform is fully guaranteed.
Accurate and real-time: support real-time data update and access to ensure that the data is accurate, not lost or duplicated;
Secure and reliable: Ensure the security of data access and data storage, support cluster disaster recovery, and highly reliable data.
Based on the above goals, we conducted in-depth research, compared a variety of big data components, and chose to introduce Apache Doris to build a new generation of real-time data warehouse architecture.
In the iterative process of architecture, on the one hand, we need to ensure the seamless operation of the business and avoid the impact of the business experience caused by system switching, and on the other hand, we need to take into account the compatibility with the old architecture, gradually iterate and adjust according to the actual situation, and at the same time, we also want to give full play to the capabilities and advantages of the new architecture, so the construction path is gradually clear
Introduce real-time data processing and analysis links to improve data timeliness.
Promote the migration of data applications from offline to real-time, and continuously improve the efficiency of query and analysis to improve business efficiency.
Break through the barriers between offline and real-time data links, unify data calibers and data service exits, and provide a consistent data service experience.
Therefore, in the new architecture, a real-time link is added to the original offline data warehouse system: real-time data from business databases such as MySQL is collected and transmitted to Apache Doris through Kafka, the data is processed with Flink and Doris SQL, and a layered data warehouse from ODS to ADS is built inside Doris. At the same time, based on the multi-catalog capability provided by Doris, Hive data can be queried directly, avoiding heavy data migration costs. All upper-layer data applications connect to Doris, which provides a unified external query service, with no need to switch between offline and real-time data.
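As an illustration of the multi-catalog capability described above, the following sketch shows how a Hive catalog might be mounted in Doris so that Hive tables can be queried in place (the catalog name, metastore address, and table names are assumptions for illustration):

```sql
-- Mount the existing Hive offline warehouse as an external catalog in Doris
CREATE CATALOG hive_dw PROPERTIES (
    "type" = "hms",
    "hive.metastore.uris" = "thrift://hive-metastore-host:9083"
);

-- Query Hive data directly, or join it with real-time Doris tables,
-- without migrating any data
SELECT * FROM hive_dw.ods.t_merchant_order LIMIT 10;
```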
This upgrade has greatly improved the efficiency of data processing and the convenience of querying, and we are gradually migrating from the offline data warehouse to a real-time data warehouse with Apache Doris at its core, preparing for more efficient and larger-scale data management and analysis.
Next, we will introduce the planning and design of the real-time data warehouse architecture based on Apache Doris, and share practical experience in building it from the perspectives of data models, bucketing strategy, and data synchronization and processing methods.
1. Construction and practice of real-time data warehouse system
When building its data warehouse system, UnionPay Commerce adheres to the principle of governance, construction, and empowerment, with data governance as a key part and unified planning as the core content. Unified planning of the data warehouse ensures a reasonable warehouse structure design, which benefits subsequent management and maintenance of the architecture as well as data reliability and consistency. To this end, we have taken the following measures:
1. Reasonable planning of data warehouse stratification
Reasonable data warehouse layering plays a key role in data management and query performance, and based on the rich data model of Apache Doris, we have planned the layering of data warehouse in advance. Let's start by understanding what features the Doris data model has:
Duplicate Key model: Suitable for detailed-data query scenarios, supporting ad-hoc queries across any dimension.
Unique Key model: Suitable for scenarios with uniqueness constraints that require exact deduplication or data updates, supporting multi-stream upserts and partial-column updates of large wide tables.
Aggregate Key model: Suitable for report query scenarios, accelerating report analysis through data pre-aggregation.
Based on the actual application scenarios and these data models, let's introduce the layering strategy of the UnionPay Commerce data warehouse:
For example, in the transaction clearing scenario, UnionPay Commerce has tens of millions of clearing data to be processed every day, and the liquidation date spans up to one year, which requires all data to be stored completely. In order to meet this need, we choose the duplicate key model in the ODS layer, which is stored exactly according to the detailed data in the import file, without any aggregation operations. However, some merchant order data involves the update of order status, so the unique key model is adopted, and if the merchant ID and order ID are the same, it will be automatically updated to the latest status during the data import process.
The data model adopted by the DWD and DWS layers is basically the same, but the essential difference lies in the degree of abstraction of business data, mainly using the unique key model, while some scenarios with detailed data storage also retain the duplicate key model. For example, in the settlement and transfer scenario, the settlement date is used as a partition field and the table model is set to a unique key model, so that the status of settlement data can be automatically updated over a period of up to one year.
As a high-level abstraction of business data, the ADS layer adopts the Aggregate Key model; by pre-aggregating settlement data, it greatly improves the efficiency of data query and analysis and reduces real-time computation pressure.
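The three-model layering above can be sketched with the following DDLs; table names, columns and bucket counts are illustrative assumptions, not the production schemas:

```sql
-- ODS layer: Duplicate Key model keeps every clearing record as-is
CREATE TABLE ods_clearing_detail (
    clearing_date DATE,
    merchant_id   VARCHAR(32),
    txn_id        VARCHAR(64),
    amount        DECIMAL(18, 2)
)
DUPLICATE KEY (clearing_date, merchant_id)
DISTRIBUTED BY HASH (merchant_id) BUCKETS 16;

-- DWD layer: Unique Key model auto-updates order status on import
CREATE TABLE dwd_merchant_order (
    merchant_id  VARCHAR(32),
    order_id     VARCHAR(64),
    order_status TINYINT,
    update_time  DATETIME
)
UNIQUE KEY (merchant_id, order_id)
DISTRIBUTED BY HASH (merchant_id) BUCKETS 16;

-- ADS layer: Aggregate Key model pre-aggregates settlement data
CREATE TABLE ads_settle_summary (
    settle_date   DATE,
    branch_id     VARCHAR(16),
    settle_amount DECIMAL(18, 2) SUM,
    txn_count     BIGINT SUM
)
AGGREGATE KEY (settle_date, branch_id)
DISTRIBUTED BY HASH (branch_id) BUCKETS 8;
```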
2. Reasonable partitioning and bucketing strategy
Partitioning and bucketing are important means of optimizing data storage and improving query efficiency; properly setting the bucketing fields and the number of buckets can effectively improve query speed and the execution efficiency of data transformation scripts. In our data warehouse applications, we planned the bucketing fields and bucket counts for each table with reference to the actual data scale and the configuration suggestions on the official website. For example, a store-dimension wide table is often queried by location, so we use location as the bucketing field and set the number of buckets according to the table size, using different bucket counts for different data volumes.
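A sketch of such a setting, with a store wide table bucketed by location (table name, columns, partitions and bucket count are illustrative assumptions):

```sql
-- Wide table partitioned by date and bucketed by the frequently
-- queried location field
CREATE TABLE dws_store_wide (
    biz_date  DATE,
    location  VARCHAR(32),
    store_id  VARCHAR(32),
    txn_count BIGINT,
    amount    DECIMAL(18, 2)
)
UNIQUE KEY (biz_date, location, store_id)
PARTITION BY RANGE (biz_date) (
    PARTITION p202401 VALUES LESS THAN ("2024-02-01"),
    PARTITION p202402 VALUES LESS THAN ("2024-03-01")
)
DISTRIBUTED BY HASH (location) BUCKETS 32;
```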
3. Multi-source data migration solution
In the process of migrating branch data to Doris, we found that the branches' local systems used a wide variety of databases and complex file storage formats, which posed considerable challenges for data migration. To ensure smooth migration, we developed corresponding migration plans for the different data and file formats.
Doris supports a variety of data migration methods; whether for offline or real-time data synchronization, an efficient and fast migration approach can be found.
In the real-time scenario, you can use Flink CDC to obtain MySQL binary logs in real time, and some of the data is directly written to Doris through Flink CDC, and the other part of high-traffic data is synchronized to Kafka for peak shaving, and then written to Doris through the flink-doris-connector.
In offline scenarios, the data is more diverse and the file formats more complex, so migration is done in several ways: for historical and incremental data on S3 and HDFS, Broker Load is used for batch import; for data stored in Hive and JDBC external tables, INSERT INTO is used to synchronize data; for data in file form, the Flink FTP Connector and Flink Doris Connector are used for synchronization (FTP is a cross-system data file interaction mode within UnionPay Commerce with complex file formats, so the Flink FTP Connector was developed to support complex data formats, multiple line breaks, and other complex scenarios).
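The real-time link described above can be sketched in Flink SQL using the Kafka connector and the flink-doris-connector; topic, hostnames, credentials and table names are illustrative assumptions:

```sql
-- Kafka source: high-traffic transaction data buffered for peak shaving
CREATE TABLE kafka_txn_source (
    merchant_id STRING,
    order_id    STRING,
    amount      DECIMAL(18, 2),
    txn_time    TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'txn_topic',
    'properties.bootstrap.servers' = 'kafka-host:9092',
    'format' = 'json'
);

-- Doris sink via flink-doris-connector
CREATE TABLE doris_txn_sink (
    merchant_id STRING,
    order_id    STRING,
    amount      DECIMAL(18, 2),
    txn_time    TIMESTAMP(3)
) WITH (
    'connector' = 'doris',
    'fenodes' = 'doris-fe-host:8030',
    'table.identifier' = 'ods.txn_detail',
    'username' = 'etl_user',
    'password' = 'etl_password',
    'sink.label-prefix' = 'txn_sync'
);

-- Continuous synchronization job
INSERT INTO doris_txn_sink SELECT * FROM kafka_txn_source;
```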
Rich data migration methods make it easy to migrate data from various databases to Doris, while support for multiple file formats resolves the problems of inconsistent and non-standardized data across branches, greatly reducing the difficulty and cost of branch data migration and providing strong support for UnionPay Commerce's data integration and unified management.
4. Synchronization of full and incremental data
In the process of synchronizing a large amount of offline data, business continuity and data accuracy are very important, so we have adopted two methods to deal with full data synchronization and incremental data synchronization.
In the full synchronization scenario, we first create a temporary table with the same schema, import all the data into the temporary table, and then use an ALTER TABLE t1 REPLACE WITH TABLE t2 statement to atomically swap the temporary table with the formal table; the temporary table becomes the formal table, and front-end business queries are never blocked. In the incremental synchronization scenario, a new incremental partition is created and incremental data is synchronized directly into it.
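A minimal sketch of the full-synchronization swap, assuming illustrative table names:

```sql
-- 1. Create a temporary table with the same schema as the formal table
CREATE TABLE dwd_settle_tmp LIKE dwd_settle;

-- 2. Import the full snapshot into the temporary table
--    (e.g. via Broker Load or INSERT INTO ... SELECT)

-- 3. Atomically swap the tables; front-end queries against
--    dwd_settle are never interrupted during the swap
ALTER TABLE dwd_settle REPLACE WITH TABLE dwd_settle_tmp
PROPERTIES ('swap' = 'false');
```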
5. Migration of offline data processing tasks
At present, we have migrated the offline data warehouse's processing tasks to Doris, using Doris SQL for data processing and scheduling the tasks through the scheduling platform.
For example, in the past, 30 million records had to be processed every day by the Tez computing engine in the Hive offline data warehouse, taking up to 2.5 hours. After the data transformation tasks were migrated to Apache Doris, end-to-end processing time dropped to 0.5 hours, an execution-efficiency improvement of more than 5x, and the execution time of a single script fell from 8 minutes to 10 seconds.
2. Best practices for the stability of financial-grade data warehouses
At present, Apache Doris is widely used in multiple business scenarios at UnionPay Commerce, serving internal applications such as business analysis reports, user tags and the self-service data retrieval platform, and providing external merchants with data services such as statements, reports and data submissions. The stability and availability of the cluster are therefore crucial to the platform user experience and business continuity; any cluster failure or instability may obstruct business decisions and undermine user trust.
Therefore, UnionPay Commerce has taken a number of measures to ensure the stability and availability of the cluster, including multi-tenant resource isolation, fine-grained permission management, cluster disaster recovery, and multiple stability tuning strategies.
1. Multi-tenant resource isolation
In actual business operation, multiple businesses or departments often query the same data at the same time. With limited resources, resource contention among query tasks may degrade query performance or even destabilize the cluster, and different levels of the organization also have different data visibility requirements.
Single-query resource limit to ensure that resources between queries are controllable
After sorting out multiple internal applications, we subdivided scenarios and tenants according to business analysis workloads, mainly into four scenarios: data processing (ETL), data exploration (ad-hoc), data dashboards (reporting), and data services (serving). To ensure the independence of each scenario and tenant, we limit the resources of a single query in each scenario. Specifically, we set up four types of Doris accounts for each tenant and limit each account's CPU and memory usage, with an initial value of 5 CPUs, then fine-tune according to each tenant's actual usage to achieve an appropriate resource allocation per scenario.
The advantage of this strategy is that even if the resource usage of a single tenant increases, it will only affect the usage of the tenant in specific scenarios, and will not have any impact on other tenants or other scenarios, effectively improving the stability of the platform.
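Per-account limits of this kind can be sketched with Doris user properties; the account name and values here are illustrative assumptions:

```sql
-- Cap a tenant's ETL account at 5 CPUs per query
SET PROPERTY FOR 'tenant_a_etl' 'cpu_resource_limit' = '5';

-- Cap the same account's single-query memory at 16 GB (bytes)
SET PROPERTY FOR 'tenant_a_etl' 'exec_mem_limit' = '17179869184';
```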
Resource tag-based multi-tenant data and query isolation
In the case of data usage from the head office to the branch office, we use the resource tag-based resource group physical isolation method to ensure the security and independence of the data.
A rich variety of data is currently stored in Apache Doris. From the perspective of data security, the visible range of data is finely divided: the head office can access all company-level data, while a branch can only access data within its own business scope. In addition, some data is queried by branches under head-office authorization, or branch-specific data needs to be joined with head-office data. In these scenarios, we use resource tag-based resource isolation to partition the cluster's available resources along with the data.
Specifically, we configure an independent resource group for the branch office, store the branch personalized data in three copies in the independent resource group, and set the head office data to four copies, three of which are stored in the head office resource group, and the remaining single copy is stored in the independent resource group of the branch. When a branch office queries the data of the head office, only the single copy data in the branch resource group is queried, which ensures the security of the data and improves the stability and reliability of the system. The specific scheme is as follows:
Set BE node tags: Assign the head-office and branch resource groups, and set the corresponding tags on the servers.
Set data distribution: Set the replication allocation when creating a table, distributing head-office data at a 3:1 ratio between the head-office and branch resource groups, adjustable based on actual usage.
Set user resource groups: Set the corresponding default resource group for each user, so that the head office and branches use their own resource groups, achieving query isolation between them.
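The three steps above can be sketched as follows; hostnames, group names, table schema and user names are illustrative assumptions:

```sql
-- 1. Tag BE nodes into head-office and branch resource groups
ALTER SYSTEM MODIFY BACKEND "be-hq-01:9050"
SET ("tag.location" = "group_hq");
ALTER SYSTEM MODIFY BACKEND "be-branch-01:9050"
SET ("tag.location" = "group_branch");

-- 2. Head-office table: 3 replicas in the HQ group, 1 in the branch group
CREATE TABLE hq_txn_summary (
    biz_date  DATE,
    branch_id VARCHAR(16),
    amount    DECIMAL(18, 2)
)
UNIQUE KEY (biz_date, branch_id)
DISTRIBUTED BY HASH (branch_id) BUCKETS 8
PROPERTIES (
    "replication_allocation" =
        "tag.location.group_hq: 3, tag.location.group_branch: 1"
);

-- 3. Pin a branch user's queries to the branch resource group
SET PROPERTY FOR 'branch_user' 'resource_tags.location' = 'group_branch';
```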
More flexible resource isolation schemes
Although the resource tag-based isolation scheme achieves resource isolation at the physical level and performs well in terms of independence, there is still room for optimization in resource utilization, and it cannot guarantee finer-grained isolation during execution. For this reason, Apache Doris introduced Workload Group resource soft limits in version 2.0.
In terms of implementation, a workload group associates the queries executed by a user with a resource group, limiting the percentage of CPU and memory a single query can use on a BE node, and allowing a memory soft limit to be configured for the group. When cluster resources are tight, query tasks with large memory usage can be automatically terminated to relieve pressure on the cluster; when resources are idle, a workload group that exceeds its preset quota can borrow idle cluster resources from other groups, automatically exceeding the threshold to keep queries running stably. This achieves fine-grained control of memory and CPU resources.
We are also continuing to explore combining the new version's features with our services: single-query resource limits in scenarios such as data processing, data exploration, data dashboards and data services can subsequently be implemented through workload groups, and the task prioritization and queuing mechanisms can further ensure that key services run first.
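A workload-group soft limit of the kind described above might be configured like this (group name, user name and values are illustrative assumptions):

```sql
-- Soft-limited resource group: may borrow idle memory when the
-- cluster is not under pressure
CREATE WORKLOAD GROUP IF NOT EXISTS etl_group
PROPERTIES (
    "cpu_share" = "10",
    "memory_limit" = "30%",
    "enable_memory_overcommit" = "true"
);

-- Bind a user's queries to the group
SET PROPERTY FOR 'etl_user' 'default_workload_group' = 'etl_group';
```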
2. Fine user rights management
In order to meet business needs and regulatory compliance requirements, UnionPay Commerce has established a strict user rights management system. The system clarifies the roles and permissions of different user groups, ensuring that users only have access to the functions and data they need. The following is the solution for the management of UnionPay business user rights:
User permission settings: Set different data usage permissions for different users in different scenarios in each branch.
Database, table, and row-level permission management: To meet each branch's permission management requirements, a view previously had to be created for each branch, which was cumbersome, differed from how the Hive data warehouse was used, and could require modifying tables and statements. With the Row Policy mechanism, permissions can be controlled easily at the database, table, and row levels, and the original Hive data warehouse tasks can be migrated to Doris as-is.
Column-level permission management: Currently, views are used to manage column-level permissions.
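A row-level policy of the kind described above can be sketched as follows; the policy name, table, user and predicate are illustrative assumptions:

```sql
-- Restrict a branch account to rows belonging to its own branch
CREATE ROW POLICY branch_sh_policy ON dw.dwd_merchant_order
AS RESTRICTIVE TO branch_sh_user
USING (branch_id = 'SH');
```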
3. Cluster stability guarantee
SQL circuit breaking: After the platform was opened to internal users, non-standard user SQL that consumes excessive resources was often encountered, so circuit-breaking rules are used to intercept or terminate such queries and protect the cluster.
Import concurrency control: Considering that we often need to synchronize historical data to the platform, which involves a large number of data modification tasks and may put heavy pressure on the cluster, we used the merge-on-write update mode of the Unique Key model, enabled vertical compaction and segment compaction, and adjusted compaction parameters to control the data import frequency and reduce pressure on the cluster.
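The import-pressure settings mentioned above can be sketched as follows; the table name and schema are illustrative assumptions, and the BE options are configuration-file entries rather than SQL:

```sql
-- Unique Key table with merge-on-write enabled at creation time
CREATE TABLE ods_txn_mow (
    order_id    VARCHAR(64),
    status      TINYINT,
    update_time DATETIME
)
UNIQUE KEY (order_id)
DISTRIBUTED BY HASH (order_id) BUCKETS 16
PROPERTIES (
    "enable_unique_key_merge_on_write" = "true"
);

-- In be.conf (BE configuration file):
--   enable_vertical_compaction = true
--   enable_segcompaction = true
```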
Network traffic control: QoS policies are set for offline and real-time scenarios to achieve network isolation. Since UnionPay Commerce operates two clusters, in Shanghai and Wuhan, and traffic during cross-region network interaction is critical, we use QoS policies for precise network isolation to ensure network service quality and stability in different scenarios.
Monitoring and alerting: To meet the company's internal night-shift monitoring requirements, Doris is connected to the internal monitoring and alerting platform. Doris-related alerts are routed to audible-visual alarms, internal instant-messaging software and email, enabling real-time monitoring and handling of problems.
4. CCR-based cluster disaster recovery capability
For financial enterprises, service stability and data security are crucial, and disaster recovery solutions are important measures to ensure business continuity and data security.
For UnionPay Commerce's core business data, we want cross-cluster, cross-region disaster recovery, so we built a primary-standby dual-cluster solution based on cross-cluster replication. Normal business queries access the primary cluster, while key business data is synchronously written to the standby cluster and updated in real time, so that even if one cluster goes down, services can quickly switch to the standby cluster and core services and data can be rapidly restored.
In the face of the increasing demand for data processing and analysis, UnionPay Commerce chose to build a new generation of real-time data warehouse based on Apache Doris, which has served multiple internal business and external merchant data service scenarios such as business analysis reports, user tags, and self-service data retrieval.
Taking the statement query scenario as an example: in the semi-annual statement scenario, query time dropped from 8 minutes to 3 seconds, an over 100-fold speedup, and annual statement queries were shortened to under 2 minutes; most typical query scenarios improved 10-15x, greatly raising overall query and analysis efficiency. In addition, data import performance improved 2-5x on average, and data processing speed and efficiency rose 3-12x, greatly enhancing the timeliness of data applications.
Through more efficient, real-time and flexible data analysis support, UnionPay Commerce is able to better understand the market, grasp opportunities, and optimize operations, so as to achieve sustainable business growth and innovative development.
Flywheel Technology
Beijing Flywheel Data Technology Co., Ltd. is a commercial company built around the open-source analytical database Apache Doris. Adhering to the two-wheel-drive strategy of "open source technology innovation" and "real-time data warehouse services", it invests heavily in the R&D and promotion of the Apache Doris community while building, on the Apache Doris kernel, enterprise-grade products focused on enterprises' real-time big data analysis needs, delivering world-leading real-time analytics capabilities for next-generation demands. Within a year of its founding, it received nearly 1 billion yuan in financing from top VCs such as IDG Capital, Sequoia China and Xianghe Capital, a record in the open-source basic software field in recent years.