As an emerging open data management architecture, the lakehouse combines the flexibility and rich ecosystem of a data lake with the enterprise-grade data analysis capabilities of a data warehouse, and has become a popular choice for enterprises building modern data platforms.
In the previous live broadcast, we shared the architecture design of the HashData lakehouse solution and the Hive data synchronization scheme. In this live broadcast, we introduced the features of Iceberg and Hudi and the solutions HashData provides to support them, and explained and demonstrated in detail the principles and implementation of the HashData connector components. The following content is organized from the transcript of the live broadcast.

Application scenarios of Hudi and Iceberg technologies
In the process of building an enterprise data platform, as data volumes grow and scenarios become richer, each enterprise develops a different architecture design based on its own technology roadmap and needs.
A data lake is an evolving, scalable infrastructure for big data storage, processing, and analytics that allows enterprises to store structured and unstructured data at any scale. As cloud storage (especially object storage) technology has matured, data lake solutions have gradually moved toward cloud native, and data processing has evolved from batch processing to stream processing. Against this backdrop, a modern data lake requires powerful stream-batch processing capabilities, an efficient data update mechanism, rigorous transaction support, and flexible storage and compute engines. Measured against these requirements, the traditional Hive + HDFS data warehouse architecture suffers from "pain points" such as high data modification costs, no transaction (ACID) support, no unified stream-batch processing, and time-consuming data analysis, so it cannot be used directly to build a data lake. In recent years, table management technologies such as Hudi and Iceberg have become the mainstream choices for enterprise data lake construction thanks to their open file storage formats, rich transaction support, and efficient read and write capabilities.

Hudi basic terminology and write operation process
Hudi was born to solve the problem of data updates and incremental queries in the Hadoop ecosystem, and has distinctive characteristics in how data is stored and queried.
Hudi's file layout is the basis of features such as incremental queries and data updates. Each Hudi table has a fixed directory that stores metadata (.hoodie) and data files. The data files are organized into partitions, and each partition contains multiple data files (base files and log files), which are logically organized into file groups and file slices.
base file: a data file stored in columnar format, Parquet by default.
log file: a row-based data file in Avro format that stores the redo log of the data and is periodically merged with the base file.
file group: the collection of all base files and log files with the same fileId in the same partition; a partition can contain multiple file groups.
file slice: the collection of base files and log files with the same fileId and the same instant in the same partition.
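To see how individual records map onto this layout, the following is a minimal PySpark sketch (assuming a Hudi table already exists at the hypothetical path /data/hudi/orders and the Hudi Spark bundle is on the classpath). It reads the table and inspects Hudi's built-in metadata columns, which expose the commit time, partition path, and file name for each record:

```python
from pyspark.sql import SparkSession

# Assumes a Spark session with the Hudi bundle on the classpath
# and an existing Hudi table at this hypothetical path.
spark = SparkSession.builder.appName("hudi-layout-demo").getOrCreate()

df = spark.read.format("hudi").load("/data/hudi/orders")

# Hudi adds metadata columns to every record; they show which commit
# produced the record and which partition/file group holds it.
df.select(
    "_hoodie_commit_time",     # instant on the timeline
    "_hoodie_record_key",      # primary key used for upserts
    "_hoodie_partition_path",  # partition directory of the file group
    "_hoodie_file_name",       # physical base file the record belongs to
).show(5, truncate=False)
```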
Timeline: it can be understood as the timeline of the Hudi table; it records the operations performed on the table at different times and ensures the atomicity of those operations. Each entry on the timeline consists of three fields: action, time, and state. Hudi provides two table types: copy-on-write (COW) tables and merge-on-read (MOR) tables.
COW table: stores data only in columnar file formats such as Parquet. Updates simply version and rewrite the files through a synchronous merge during the write. It is suitable for scenarios where the volume of updated data is large and timeliness requirements are not high.
MOR table: stores data in a combination of columnar (e.g., Parquet) and row-based (e.g., Avro) file formats. Updates are recorded to row-based incremental log files and then compacted, synchronously or asynchronously, to generate a new version of the columnar files. It is suitable for scenarios where the volume of updated data is small and timeliness requirements are high.
Hudi supports three query types: snapshot query, read-optimized query, and incremental query:
snapshot query: queries the data of the most recent snapshot, that is, the latest data.
read optimized query: a query type specific to MOR tables that reads only the base files without merging the logs; because the base files use a columnar format, this is more efficient.
incremental query: requires a begin commit time; Hudi scans the timeline and the file records to filter out the timeline entries and base files whose commit time is later than the begin time, which effectively improves the efficiency of incremental data processing.
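As an illustration, the three query types map directly onto Hudi's Spark datasource options. The following is a minimal PySpark sketch, assuming an existing MOR table at the same hypothetical path as above and the Hudi bundle on the classpath:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("hudi-query-types").getOrCreate()
base_path = "/data/hudi/orders"  # hypothetical table path

# 1. Snapshot query (default): the latest merged view of the table.
snapshot_df = (spark.read.format("hudi")
               .option("hoodie.datasource.query.type", "snapshot")
               .load(base_path))

# 2. Read-optimized query (MOR only): reads base files, skips log merging.
ro_df = (spark.read.format("hudi")
         .option("hoodie.datasource.query.type", "read_optimized")
         .load(base_path))

# 3. Incremental query: only records committed after the given instant.
incr_df = (spark.read.format("hudi")
           .option("hoodie.datasource.query.type", "incremental")
           .option("hoodie.datasource.read.begin.instanttime", "20240101000000")
           .load(base_path))

print(snapshot_df.count(), ro_df.count(), incr_df.count())
```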
In the Hudi data lake framework, data can be written in three ways: upsert, insert, and bulk-insert. Among them, upsert is the default behavior and is also Hudi's core capability.
Figure 1: The process of writing to Hudi with Spark
Start commit: determine whether the last task failed and trigger a rollback if it did; then generate the "requested" identification metadata for the start of the transaction based on the current time.
Construct a HoodieRecord RDD: Hudi constructs a HoodieRecord RDD from the metadata to facilitate subsequent data deduplication and merging.
Data deduplication: a batch of incremental data may contain duplicates, so Hudi deduplicates by primary key to prevent duplicate data from being written into the table.
Get the fileId location of the data: for updated records, the index is used to obtain the fileId of the file each record belongs to, because the merge step needs to know which fileId's file the new snapshot should be written to.
Data merging: in COW mode, the snapshot file of the fileId hit by the index is rewritten; in MOR mode, the update is appended to a log file of that fileId in the partition.
Complete the commit: generate an xxxx.commit file in the metadata; only after the commit metadata file has been generated can the query engine see the upserted data.
Data cleanup: deletes old file slices and limits the growth of table storage. Cleanup runs automatically after each write and uses the timeline metadata cached on the Timeline Server to avoid scanning the entire table.
Compaction: mainly used in MOR mode; it merges the xxx.log data into an xxx.parquet snapshot file.
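As a concrete illustration of what triggers this flow, here is a minimal PySpark sketch of an upsert into a MOR table, assuming hypothetical table and path names and the Hudi Spark bundle on the classpath; the option names are the standard Hudi datasource write options:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("hudi-upsert").getOrCreate()

# Hypothetical incremental batch; duplicate keys are deduplicated by Hudi
# using the record key + precombine field before merging.
updates = spark.createDataFrame(
    [("o-1", "2024-06-01 10:00:00", "paid", "2024-06-01"),
     ("o-2", "2024-06-01 10:05:00", "shipped", "2024-06-01")],
    ["order_id", "ts", "status", "dt"])

hudi_options = {
    "hoodie.table.name": "orders",
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",
    "hoodie.datasource.write.operation": "upsert",        # default operation
    "hoodie.datasource.write.recordkey.field": "order_id",
    "hoodie.datasource.write.precombine.field": "ts",     # used for dedup/merge
    "hoodie.datasource.write.partitionpath.field": "dt",
}

# Append mode + upsert operation: existing keys are updated, new keys inserted.
(updates.write.format("hudi")
    .options(**hudi_options)
    .mode("append")
    .save("/data/hudi/orders"))
```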
Iceberg basic terminology and write operation process

Iceberg is positioned on its official website as "an efficient storage format for massive data analysis scenarios". Unlike Hudi, it does not imitate the design pattern of a transactional database (primary key + index) to implement data updates; instead, it designs a more powerful file organization to implement update operations.
Data file: the file in which an Apache Iceberg table actually stores data, generally located in the data directory under the table's data storage directory. If the file format is Parquet, the data files end with ".parquet"; Iceberg generates multiple data files for each update.
A snapshot represents the state of a table at a certain point in time, and each snapshot lists all of the table's data files at that time. Data files are listed in different manifest files, the manifest files are listed in a manifest list file, and one manifest list file represents one snapshot.
A manifest file is a metadata file that lists the data files that make up a snapshot. Each row contains a detailed description of one data file, including its status, file path, partition information, column-level statistics (such as the maximum and minimum values of each column and the number of null values), file size, and the number of rows in the file. The column-level statistics can be used to filter out unnecessary files when scanning the table. Manifest files are stored in Avro format and end with the ".avro" suffix.
The manifest list is also a metadata file; it lists the manifest files that make up a table snapshot, one manifest file per row. Each row stores the path of the manifest file, the partition range of the data files it tracks, and how many data files were added and deleted, which can be used for filtering and to speed up queries.
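These metadata structures can be inspected directly. The following is a minimal PySpark sketch, assuming a Spark session configured with an Iceberg catalog named demo and an existing table demo.db.orders (hypothetical names); it queries Iceberg's built-in metadata tables:

```python
from pyspark.sql import SparkSession

# Assumes the Iceberg Spark runtime jar is on the classpath and a catalog
# named "demo" is configured via spark.sql.catalog.demo.* settings.
spark = SparkSession.builder.appName("iceberg-metadata").getOrCreate()

# One row per snapshot: id, commit time, operation, etc.
spark.sql("SELECT snapshot_id, committed_at, operation "
          "FROM demo.db.orders.snapshots").show(truncate=False)

# One row per manifest file referenced by the current snapshot.
spark.sql("SELECT path, added_data_files_count, deleted_data_files_count "
          "FROM demo.db.orders.manifests").show(truncate=False)

# One row per data file, including record count and size.
spark.sql("SELECT file_path, record_count, file_size_in_bytes "
          "FROM demo.db.orders.files").show(truncate=False)
```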
Figure 2: Schematic diagram of the Iceberg write process

When writing data to Iceberg, the internal workflow can be summarized as follows:
Generate a FileAppender: depending on the configured file format, Iceberg generates the corresponding FileAppender, the component that actually performs the file write.
Write the data file: the FileAppender is responsible for writing the data to the target file.
Collect statistics: after all the data has been written, Iceberg collects statistics on the written data, such as record count, lower bounds, upper bounds, and value counts, which serve as important input for the subsequent generation of the manifest file.
Generate the manifest file: based on the statistics, Iceberg generates the corresponding manifest file, which acts as an index of the data files by storing the path of each data file; files are organized and managed through these manifest files.
Report back: the executor sends the generated manifest file and other related information back to the driver to complete the write.
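From the user's side, this whole flow is triggered by an ordinary write. Here is a minimal PySpark sketch, assuming the same hypothetical demo catalog and table names as above:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("iceberg-write").getOrCreate()

# Create an Iceberg table (Parquet data files by default) if it does not exist.
spark.sql("""
    CREATE TABLE IF NOT EXISTS demo.db.orders (
        order_id STRING, status STRING, dt STRING
    ) USING iceberg PARTITIONED BY (dt)
""")

batch = spark.createDataFrame(
    [("o-1", "paid", "2024-06-01"), ("o-2", "shipped", "2024-06-01")],
    ["order_id", "status", "dt"])

# Each append produces new data files, new manifests, and a new snapshot.
batch.writeTo("demo.db.orders").append()
```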
How the HashData connectors work and their workflow
Data in a data lake is often unorganized and unprocessed, which limits the efficiency of direct analysis. Through its self-developed Hudi and Iceberg connectors, HashData has achieved smooth integration with both architectures. HashData currently supports read-only access to Hudi and Iceberg tables; writes are not yet supported.
Figure 3: Reading Hudi and Iceberg data through HashData external tables

As shown in the figure above, the HashData connectors read Hudi and Iceberg data by creating external tables, so that data in the lake can be further analyzed and used.
Create an external table
First, you need a Hudi or Iceberg table to read. We use components such as Spark and Flink to create tables in Hudi or Iceberg format and write data into them.
Next, a readable external table is created on the HashData database. Its definition includes information such as the path and the catalog type, i.e., the location-related information mentioned earlier.
The Hudi or Iceberg client is then invoked: the client creates a connection, calls Get Table with the external table's information, and obtains the metadata of the Hudi or Iceberg table, including the number of fields, the field names, and the data types.
Based on the obtained metadata, the corresponding HashData table is generated by mapping on the database.
At this point, the process of creating an external table corresponding to the Hudi or Iceberg table is complete.
The steps above are carried out by the connector component: information such as the table's path and catalog type is packaged and passed to the connector, the connector retrieves the relevant table information and passes it back, and HashData maps that information into a readable external table.
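As a rough illustration only (the DDL syntax, connector option names, and connection parameters below are assumptions for this sketch, not HashData's documented syntax), the flow above could be driven from Python: the script connects to the HashData cluster over its PostgreSQL-compatible interface with psycopg2 and submits a hypothetical CREATE EXTERNAL TABLE statement carrying the path and catalog type:

```python
import psycopg2

# Hypothetical connection parameters for a HashData (Greenplum-compatible) cluster.
conn = psycopg2.connect(host="hashdata-master", port=5432,
                        dbname="warehouse", user="gpadmin", password="secret")

# Illustrative DDL only: the real HashData connector syntax and option names
# (protocol, path, catalog_type, formatter, ...) come from the product docs.
ddl = """
CREATE EXTERNAL TABLE ext_hudi_orders (
    order_id text, status text, dt text
)
LOCATION ('hudi:///data/hudi/orders?catalog_type=hadoop')
FORMAT 'CUSTOM' (FORMATTER = 'hudi')
"""

with conn, conn.cursor() as cur:
    # The connector fetches the Hudi/Iceberg metadata and maps the columns.
    cur.execute(ddl)
conn.close()
```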
When a SELECT statement is issued, HashData initiates the query internally and packages the relevant query parameters, including the external scan filters (e.g., WHERE conditions in SQL), and passes them to the connector.
The connector then calls the scan API of Hudi or Iceberg. The scan method takes the passed-in parameters, uses them to filter and query the list of all files related to the table, and returns the relevant file list.
Once the file list has been obtained, the query plan is generated and the query can be completed; the interaction with the Hudi and Iceberg metadata ends here.
HashData then packages the file list and distributes it to the segment nodes. Each segment obtains its shard of the file list and reads the data accordingly. Once the data has been returned, the whole read process is complete.
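Continuing the same hypothetical setup as the creation sketch above, querying the external table is then ordinary SQL; the WHERE condition is the filter that gets packaged and handed to the connector's scan call:

```python
import psycopg2

# Hypothetical connection parameters, as in the previous sketch.
conn = psycopg2.connect(host="hashdata-master", port=5432,
                        dbname="warehouse", user="gpadmin", password="secret")

with conn, conn.cursor() as cur:
    # The dt predicate is passed to the connector as a scan filter, so only
    # the files that can match are listed and distributed to the segments.
    cur.execute("SELECT order_id, status FROM ext_hudi_orders WHERE dt = %s",
                ("2024-06-01",))
    for order_id, status in cur.fetchall():
        print(order_id, status)
conn.close()
```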
Conclusion
Hudi and Iceberg are currently the mainstream data lake solutions and are widely adopted. HashData's lakehouse solution bridges the data warehouse and the data lake: the underlying layer supports the coexistence of multiple data formats so that data can truly be shared, while the upper layer provides access through a unified, encapsulated interface that supports both real-time query and analysis, bringing greater convenience to enterprise data governance and data use under the data lake architecture.