The most hardcore operation on the whole network: how to insert 1 billion rows into MySQL as quickly as possible


This is an interview question I was once asked. The point is not the exact number of hours you answer with, but how you would design a system that can insert 1 billion rows as quickly as possible.

At the time I blurted out "about three hours" without explaining why. The interviewer could see I wasn't thinking straight and told me to go home and wait for news. Fortunately he handed my resume back, so at least I saved a copy. Today I want to rethink the problem properly and redeem myself.

To import 1 billion rows into the database as quickly as possible, you first need to clarify the problem with the interviewer: in what form do the 1 billion rows exist, how large is each row, must they be imported in order, must duplicates be avoided, and is the database MySQL?

After clarifying these assumptions with the interviewer, the constraints are:

1 billion pieces of data, 1 KB each

The data content is an unstructured user access log that needs to be parsed and written to the database.

Data is stored in HDFS or S3 distributed file storage.

The 1 billion rows are not one huge file; they have already been split into roughly 100 files, with an ordered numeric suffix in each file name.

The data must be imported in order, and duplicates should be avoided as much as possible.

The database is MySQL

1. Can a single MySQL table hold 1 billion rows?

First, is it feasible to write 1 billion pieces of data to a single table in MySQL?

The answer is no. The recommended size for a single table is under 20 million (2000w) rows. How is this value derived?

MySQL's index data structure is a B+ tree, and all row data is stored on the leaf nodes of the primary key index, i.e. the clustered index. The insert and query performance of a B+ tree is directly related to its height: below roughly 20 million rows the index stays at 3 levels, and above that it grows to 4 levels.

Each leaf page of a MySQL B+ tree index is 16 KB. Since each row here is about 1 KB, we can simply assume each leaf page holds 16 rows. Each non-leaf page is also 16 KB, but it only needs to store the primary key and a pointer to the child page. Assuming a bigint primary key (8 bytes) and InnoDB's 6-byte page pointer, each entry takes 14 bytes, so a non-leaf page holds 16 * 1024 / 14 ≈ 1170 entries.

That is, each non-leaf page can point to 1170 child pages, and each leaf page stores 16 rows, so a 3-level B+ tree holds about 1170 * 1170 * 16 ≈ 21.9 million rows. Beyond roughly 2000w rows the index grows to 4 levels and performance drops further.
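A quick sanity check of the 2000w figure, using the assumptions above (16 KB pages, 1 KB rows, 8-byte bigint key, 6-byte pointer):

public class BTreeCapacity {
    public static void main(String[] args) {
        int rowsPerLeaf = 16 * 1024 / 1024;          // 16 rows of 1 KB per 16 KB leaf page
        int fanout = 16 * 1024 / (8 + 6);            // ≈ 1170 child pointers per non-leaf page
        long threeLevelCapacity = (long) fanout * fanout * rowsPerLeaf;
        System.out.println(threeLevelCapacity);      // ≈ 21,902,400 rows, i.e. roughly 2000w
    }
}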

For more details, see B+ tree height calculation (baijiahao.baidu.com/s?id=1709205029044038742&wfr=spider&for=pc).

To make the numbers easy, we design each table to hold 10 million (1kw) rows, giving 100 tables for the 1 billion rows.

2. How to write to the database efficiently

Writing rows one at a time performs poorly, so write to the database in batches, with a batch size that can be adjusted dynamically. Since each row is about 1 KB, a default batch of 100 rows is a reasonable starting point.

How do we ensure that a batch is written atomically? Wrap each batch in a transaction: MySQL's InnoDB storage engine guarantees that all rows in the transaction succeed or fail together.

If a batch still fails after N retries, you can fall back to inserting the 100 rows one at a time, log the rows that still fail, and then discard them.

In addition, inserting in primary-key order is fastest, whereas inserts into secondary indexes are not necessarily sequential, and the resulting frequent index page adjustments degrade insert performance. It is best to create no secondary indexes at all, or to create them only after the data has been loaded, to keep insert speed at its maximum.
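As a rough illustration, here is a minimal JDBC sketch of the batch write described above. The table and column names (user_log_0, id, content) and the sequential id assignment are assumptions for illustration; each batch is wrapped in one transaction so the 100 rows succeed or fail together.

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

public class BatchInsertSketch {
    // rows are assumed to arrive already ordered, so ids are assigned sequentially
    public static void insertBatch(Connection conn, long startId, List<String> contents) throws SQLException {
        String sql = "INSERT INTO user_log_0 (id, content) VALUES (?, ?)";
        conn.setAutoCommit(false);                      // one transaction per batch
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            for (int i = 0; i < contents.size(); i++) {
                ps.setLong(1, startId + i);             // explicit primary key, no auto increment
                ps.setString(2, contents.get(i));
                ps.addBatch();
            }
            ps.executeBatch();
            conn.commit();                              // the whole batch succeeds together
        } catch (SQLException e) {
            conn.rollback();                            // or fails together, then retry / fall back
            throw e;
        }
    }
}

With MySQL Connector/J, adding rewriteBatchedStatements=true to the JDBC URL lets the driver rewrite the batch into a single multi-row INSERT, which usually matters a lot for batch throughput.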

Do we need to write to the same table concurrently?

No. Concurrent writes to the same table cannot guarantee that data is written in order.

Raising the batch-insert size already increases insert throughput enough, so there is no need to write to a single table concurrently.

3. Selection of MySQL storage engine

MyISAM has better insert performance than InnoDB, but it gives up transactions, so there is no guarantee that a batch insert succeeds or fails atomically; if a batch insert times out or fails and is retried, some duplicate rows are inevitable. Still, for the sake of import speed, the MyISAM storage engine can be kept as one of the candidate plans.

At this point I will borrow someone else's benchmark results: MyISAM vs. InnoDB (t.csdn.cn/efm9z).

The numbers show that batch writes are significantly better than single-row writes, and that once InnoDB's flush-on-every-commit policy is disabled, InnoDB's insert performance is not much worse than MyISAM's.

innodb_flush_log_at_trx_commit: controls MySQL's policy for flushing the redo log to disk.

The default is 1, meaning the log is flushed to disk on every transaction commit; this is the safest setting, with no data loss.

If it is configured to flush roughly once per second instead, up to one second of data may be lost when the machine or MySQL crashes.

Considering that InnoDB's batch performance is good once the flush-on-every-commit policy is relaxed, the tentative choice is InnoDB (if the company's MySQL cluster does not allow changing this variable, MyISAM may be needed instead). During testing in the real environment, focus on comparing the insert performance of the two.
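If the flush policy may be relaxed for the duration of the import, it can be changed at runtime. A minimal sketch, assuming an account with the privilege to set global variables and a hypothetical connection string:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class FlushPolicySketch {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://127.0.0.1:3306/db_0", "user", "password");   // hypothetical connection details
             Statement st = conn.createStatement()) {
            // flush the redo log roughly once per second instead of on every commit;
            // up to ~1 second of data may be lost on a crash, acceptable for a re-runnable import
            st.execute("SET GLOBAL innodb_flush_log_at_trx_commit = 2");
        }
    }
}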

4. Whether to split across multiple databases

A single MySQL instance has a bottleneck for concurrent writes; around 5,000 write TPS is already on the high side.

The data here is assumed to sit on SSDs, which should perform well. With HDDs, however, although purely sequential reads and writes are very fast, an HDD cannot cope with concurrent writes: with, say, 10 tables per database written concurrently, each table is sequential on its own, but because the tables live in different locations on disk and an HDD has only one head, the head must constantly re-seek between them, the time cost rises sharply, and the advantage of sequential I/O is lost.

So for HDDs, writing multiple tables of one database concurrently is not a good plan. Back to the SSD scenario: different SSD vendors have different write capabilities and different levels of concurrency support; some sustain 500 MB/s, some 1 GB/s, some handle 8 concurrent writers, some only 4. We cannot know the actual performance until we run the experiment.

Therefore, to be more flexible in design, it needs to support the following capabilities:

The number of databases is configurable.

The number of tables written concurrently per database is configurable (if MySQL runs on an HDD, only one table is written sequentially at a time and the other tasks wait).

With these settings, the number of databases and the table-write concurrency can be adjusted flexibly online, so the system works whether the storage is HDD or SSD. Whatever the performance of a given vendor's SSD, the configuration can be tuned until throughput stops improving. This is also the guiding idea of the rest of the design: never hard-code a threshold, always keep it dynamically adjustable.
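A minimal sketch of what these two knobs might look like, with hypothetical property names; one semaphore per database, sized from the configured table-write concurrency, limits how many table-writing tasks hit that database at once:

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Semaphore;

public class ImportConfig {
    private final int databaseCount;           // how many databases the data is spread over
    private final int tableWriteConcurrency;   // how many tables per database may be written at once

    public ImportConfig(Properties props) {
        this.databaseCount = Integer.parseInt(props.getProperty("import.databaseCount", "10"));
        // 1 for an HDD (purely sequential writes per database), higher for SSDs that cope with concurrency
        this.tableWriteConcurrency = Integer.parseInt(props.getProperty("import.tableWriteConcurrency", "1"));
    }

    // build one write-concurrency limiter per database
    public List<Semaphore> buildPerDatabaseLimits() {
        List<Semaphore> limits = new ArrayList<>(databaseCount);
        for (int i = 0; i < databaseCount; i++) {
            limits.add(new Semaphore(tableWriteConcurrency));
        }
        return limits;
    }

    public int getDatabaseCount() {
        return databaseCount;
    }
}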

Next, file reading. 1 billion rows at 1 KB each is about 931 GB. A single file of nearly 1 TB is rarely produced, so by default the data has already been split into roughly 100 files of similar size.

Why 100 pieces? Wouldn't splitting into 1,000 files and raising the read concurrency make the import faster? As mentioned earlier, database read and write performance is limited by the disk, but on any disk reads are faster than writes. In particular, reading only requires pulling bytes from a file, while writing requires MySQL to maintain indexes, parse SQL, run transactions, and so on. So the maximum write concurrency is 100, and the file-read concurrency does not need to exceed 100.

More importantly, making the file-read concurrency equal to the number of table shards simplifies the model: 100 read tasks and 100 write tasks, corresponding to 100 tables.

5. How to ensure writes to the database are ordered

Since the data is split into 100 files of about 10 GB each, each record can use the file suffix plus the line number within the file as its unique key, and the contents of one file are always written to the same table. For example:

index_90.txt is written to database 9, table 0; index_67.txt is written to database 6, table 7.

In this way each table is ordered internally, and global order is given by database suffix + table-name suffix.
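A minimal sketch of this routing rule, assuming file names follow the index_NN.txt pattern from the example; everything else is an assumption for illustration:

public class FileRoutingSketch {
    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(route("index_90.txt"))); // [9, 0]
        System.out.println(java.util.Arrays.toString(route("index_67.txt"))); // [6, 7]
    }

    static int[] route(String fileName) {
        int n = Integer.parseInt(fileName.replaceAll("\\D+", "")); // keep only the digits, e.g. "90"
        return new int[]{ n / 10, n % 10 };                        // first digit -> database, second -> table
    }
}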

6. How to read the files faster

A 10 GB file obviously cannot be read into memory in one go. The candidate ways to read it include:

Files.readAllBytes, which loads the whole file into memory at once.

FileReader + BufferedReader, reading line by line.

File + BufferedReader.

Scanner, reading line by line.

Java NIO FileChannel, reading through a buffer.

Here is how these approaches compare when reading a 3.4 GB file on a Mac:

For more information, please refer to: Reading File Performance Comparison (zhuanlan.zhihu.com/p/142029812)

The results show that Java NIO FileChannel is clearly the best performer, but FileChannel reads into a fixed-size buffer and does not support reading by line. There is also no guarantee that a buffer ends exactly on a line boundary: if the last bytes of the buffer fall in the middle of a line, the rest of that line has to be picked up from the next read. Turning buffers back into lines is awkward.

File file = new File("/xxx.zip");
FileInputStream fileInputStream = null;
long now = System.currentTimeMillis();
try {
    fileInputStream = new FileInputStream(file);
    FileChannel fileChannel = fileInputStream.getChannel();
    ByteBuffer byteBuffer = ByteBuffer.allocate(4 * 1024 * 1024);
    long size = 0;
    while (fileChannel.read(byteBuffer) != -1) {
        size += byteBuffer.position();
        byteBuffer.clear(); // reset the buffer so the next read starts from position 0
    }
    System.out.println("file size:" + size);
} catch (FileNotFoundException e) {
    e.printStackTrace();
} catch (IOException e) {
    e.printStackTrace();
} finally {
    if (fileInputStream != null) {
        try { fileInputStream.close(); } catch (IOException ignored) { }
    }
}
System.out.println("time:" + (System.currentTimeMillis() - now));
Java NIO is buffer-oriented: the ByteBuffer has to be turned into a byte array and then into a String, and splitting it back into lines has to be handled by hand.

The java.io BufferedReader approach, on the other hand, naturally supports reading by line, and its performance is not bad: a 10 GB file takes only about 30 s to read. Since the overall bottleneck of the import is on the write side, a 30 s read does not hurt overall throughput. So the files are read line by line with BufferedReader, i.e. scheme 3.
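A minimal sketch of scheme 3, reading line by line with BufferedReader and handing rows to the writer in batches; parseAndInsertBatch is a hypothetical stand-in for the parse-and-batch-insert logic from section 2:

import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;

public class FileReadSketch {
    public static void readFile(String path, int batchSize) throws IOException {
        List<String> batch = new ArrayList<>(batchSize);
        try (BufferedReader reader = Files.newBufferedReader(Paths.get(path), StandardCharsets.UTF_8)) {
            String line;
            long lineNumber = 0;
            while ((line = reader.readLine()) != null) {
                lineNumber++;
                batch.add(line);
                if (batch.size() >= batchSize) {
                    parseAndInsertBatch(batch, lineNumber); // hypothetical: parse + batch insert + progress update
                    batch.clear();
                }
            }
            if (!batch.isEmpty()) {
                parseAndInsertBatch(batch, lineNumber);     // flush the tail batch
            }
        }
    }

    private static void parseAndInsertBatch(List<String> batch, long lastLineNumber) {
        // placeholder for the parse-and-write logic described in section 2
    }
}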

7. How to coordinate the task of reading files and writing the database

This part is somewhat convoluted, so please read it patiently.

Can we simply start 100 read tasks, each reading a batch of data and writing it to the database immediately? As mentioned earlier, concurrent writes to one database are the bottleneck: a database cannot absorb large concurrent batches into its 10 tables at once. If all 100 tasks write at the same time, every database has 10 tables being written simultaneously, which makes the concurrent write pressure on the disk even worse.

To maximize speed while limiting the performance loss from concurrent disk writes, some write tasks have to be paused. Do the read tasks also need their concurrency limited? No, they don't.

If read and write were combined into one task, limiting writes would also limit read concurrency, so the initial plan was to handle read tasks and write tasks separately so that neither delays the other. In practice, however, this design turned out to be harder than expected.

The original idea was to introduce Kafka: 100 read tasks publish data to Kafka, and write tasks consume from Kafka and write to the database. But when 100 read tasks publish to Kafka, ordering is lost, so how do we still guarantee that messages are written to the database in order? The thought was to use Kafka partition routing: route all messages of the same read task (by task ID) to the same partition, so that each partition is consumed in order.

How many partitions would be needed? 100 is clearly too many. With fewer than 100 partitions, say 10, messages from multiple tasks are inevitably mixed in one partition. If several tables of the same database end up in one Kafka partition, and that database only supports batch writes to a single table at a time rather than concurrent writes to several tables, then because of the concurrency limit the messages for the tables that cannot currently be written would have to be held back or dropped. The scheme becomes both complex and hard to implement.

So the Kafka approach, and with it the idea of separating read tasks from write tasks, was abandoned.

The final scheme is much simpler: one task reads a batch of data and then writes that batch. In other words, each task is responsible for both reading its file and inserting into the database.

8. How to ensure the reliability of the task

What if the service goes down or is redeployed while a read task is halfway through? Or the database fails, writes fail, and the task stops early? How do we make sure that when the task is restarted, it resumes from the breakpoint without writing duplicates?

As mentioned earlier, each record can be given a primary key ID built from the file suffix and the line number within the file. The idempotency of writes is then guaranteed by the primary key.

The line number within a file is at most roughly 10 GB / 1 KB = 10 million, i.e. 10,000,000. The largest file suffix is 99, so the largest concatenated ID is about 9,900,000,000.

Therefore, there is no need for the database to automatically increment the primary key ID, and the primary key ID can be specified when inserting in batches.
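A minimal sketch of this ID scheme; the 100,000,000 multiplier is an assumption that comfortably covers the ~10 million lines per 10 GB file estimated above:

public class RecordIdSketch {
    // file suffix (0..99) in the high digits, line number within the file in the low digits
    static long buildId(int fileSuffix, long lineNumber) {
        return fileSuffix * 100_000_000L + lineNumber;
    }

    public static void main(String[] args) {
        System.out.println(buildId(99, 10_000_000)); // 9910000000, well within bigint range
    }
}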

What if another job also needs to import data later? To keep the primary key ranges isolated, the task ID would also have to be folded into the primary key, for example by concatenating it into the long value. But if the task ID is large, the concatenated value may overflow when converted to a long.

Worse, some tasks may write 10 million rows and others 1 million, so with a plain long you cannot know how many digits each part should occupy, and collisions become possible. Putting a unique index on a concatenated string instead would hurt insert performance and slow the import down. So another approach is needed.

Consider using Redis to record the progress of each task: after each batch is successfully written to the database, advance the task's offset in Redis.

incrby key_name incr_amount
For example, advance the current progress by 100 with incrby task_offset 100. If a batch insert fails, retry it; if it keeps failing, fall back to inserting row by row and updating Redis row by row. To make sure the Redis update itself succeeds, add a retry around the Redis call as well.

If you are worried about the Redis progress and the database drifting apart, you can instead consume the database binlog and increment the Redis counter for every new row.

When a task is interrupted, first query its offset, then read the file up to that offset and continue from there.
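A minimal sketch of this progress bookkeeping, assuming the Jedis client and a hypothetical key naming scheme:

import redis.clients.jedis.Jedis;

public class TaskProgressSketch {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("127.0.0.1", 6379)) {   // hypothetical Redis address
            String key = "import:offset:task_90";            // one offset key per task

            // after a batch of 100 rows is committed, advance the offset
            jedis.incrBy(key, 100);

            // when the task is restarted, read the offset and skip that many lines of the file
            String value = jedis.get(key);
            long offset = value == null ? 0L : Long.parseLong(value);
            System.out.println("resume from line " + offset);
        }
    }
}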

9. How to coordinate the concurrency of reading tasks

As mentioned earlier, writing too many tables of one database concurrently hurts database performance, so the concurrency has to be limited. How?

Since read and write are merged into one task, limiting writes means limiting the tasks themselves: only a certain number of read-write tasks may run at any one time.

Before that, you need to design the storage model of the task table.

bizId: a reserved field for supporting other product lines in the future; the default value 1 represents the current business line.

databaseIndex: the assigned database suffix.

tableIndex: the assigned table-name suffix.

parentTaskId: the ID of the overall import job.

offset: records the progress of the current task.

When the 1 billion rows are split into 100 tasks, 100 task IDs are created, each responsible for one part of the data, i.e. one 10 GB file.

status: marks whether the task is pending, in progress, or finished.
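Based on the fields above, the task table might look roughly like this; the column names, types, and status codes are assumptions for illustration:

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

public class TaskTableSketch {
    public static void createTaskTable(Connection conn) throws SQLException {
        String ddl = "CREATE TABLE import_task ("
                + " id BIGINT NOT NULL AUTO_INCREMENT,"
                + " biz_id INT NOT NULL DEFAULT 1,"        // reserved for other business lines
                + " parent_task_id BIGINT NOT NULL,"       // the overall import job
                + " database_index INT NOT NULL,"          // assigned database suffix
                + " table_index INT NOT NULL,"             // assigned table suffix
                + " `offset` BIGINT NOT NULL DEFAULT 0,"   // progress of this sub-task
                + " status TINYINT NOT NULL DEFAULT 0,"    // e.g. 0 pending, 1 in progress, 2 done, 3 failed
                + " PRIMARY KEY (id)"
                + ")";
        try (Statement st = conn.createStatement()) {
            st.execute(ddl);
        }
    }
}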

How are tasks assigned to nodes? A preemption model works: each node tries to claim a task, and each node may hold only one task at a time. How? Each node runs a scheduled job that periodically scans the task table, finds a pending sub-task, and tries to execute it.

How is concurrency controlled? Redisson's semaphore can be used, keyed by the database ID.

RedissonClient redissonClient = Redisson.create(config);
RSemaphore rSemaphore = redissonClient.getSemaphore("semaphore");
// set the number of permits to 1
rSemaphore.trySetPermits(1);
// try to acquire a permit, non-blocking
rSemaphore.tryAcquire();
Each node polls periodically; once it grabs a permit, the task starts, its status is set to in progress, and the permit is released when the task completes or fails.

There is a catch with semaphore throttling, though: what if a task forgets to release the permit, or the process crashes before releasing it? One option is to give the semaphore a timeout. But then, if a task runs too long, the permit expires early, another client grabs it, and two clients end up writing the same task at once. How do we handle that?

Wait, wasn't this supposed to be about importing 1 billion rows into a database? How did it turn into a distributed-lock timeout problem?

There is actually no clean way to solve the semaphore-timeout problem. The natural idea is lease renewal: while the task is still running, keep extending the permit's lifetime just before it expires so that it never lapses. But Redisson does not provide renewal for semaphores, so what now?

Let's change the angle. So far we have limited concurrency by having multiple nodes compete for a semaphore. Instead, we can elect a master node and have it poll the task table. There are three scenarios:

Scenario 1: the number of running tasks is less than the concurrency limit

Pick the pending task with the smallest ID, set its status to in progress, and publish a message for it.

The process that consumes the message acquires a distributed lock and starts processing the task, releasing the lock when it finishes. Thanks to Redisson's distributed-lock renewal (the watchdog), the lock does not expire until the task is done.

Scenario 2: the number of running tasks equals the concurrency limit

The master node checks whether the in-progress tasks still hold their locks.

If a task holds no lock, its execution has failed and it should be republished; if the lock is held, the task is genuinely still running.

Scenario 3: the number of running tasks is greater than the concurrency limit

Report the anomaly, raise an alert, and intervene manually.
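For scenario 1, the task lock can be taken with Redisson's tryLock() without a lease time, so the watchdog keeps renewing it until the task finishes and it cannot expire mid-task the way the semaphore could. A minimal sketch, with a hypothetical key naming scheme:

import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;

public class TaskLockSketch {
    public static void runTask(RedissonClient redissonClient, long taskId, Runnable task) {
        RLock lock = redissonClient.getLock("import:task:lock:" + taskId);
        if (!lock.tryLock()) {      // non-blocking; another worker already owns this task
            return;
        }
        try {
            task.run();             // read the file and write the database
        } finally {
            lock.unlock();          // watchdog renewal stops here
        }
    }
}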

Having the master node poll and dispatch tasks reduces task contention; it publishes messages through Kafka, and the processes that receive them execute the tasks. To get more nodes involved in consumption, the number of Kafka partitions can be increased. Even if a node ends up handling several tasks at once, it does not hurt performance, because the bottleneck is the database.

How should the master node be elected? ZooKeeper + Curator is a reliable choice.

Many factors affect how long it takes to insert 1 billion rows, including the disk type and performance of the database servers. If the data can be spread over 1,000 databases, the import is of course faster, but the number of databases and tables has to be decided by the actual production environment, and it largely determines the write rate. Finally, the batch-insert threshold is not set in stone either; it has to be tuned by repeated testing, trying batch sizes such as 100, 1,000, and 10,000 to find the best throughput.

Summary

Finally, to summarize a few important points:

Before designing a solution, confirm the constraints first and figure out what the interviewer mainly wants to probe. For example, how to split a 1 TB file into smaller files is genuinely hard, but it may not be the point the interviewer wants to examine.

From the data scale, determine that table sharding is needed and roughly how many tables are required.

From the write bottleneck of a single database, determine that database sharding is needed as well.

Since different disks support concurrent writes differently, the number of tables written concurrently per database must be limited, and it must be dynamically adjustable so the optimal value can be found in the target environment.

MySQL's InnoDB and MyISAM storage engines have different write performance, which should also be compared and verified.

The optimal threshold for bulk database insertion needs to be determined by repeated testing.

Because of the write-concurrency limit, separating read and write tasks via Kafka is hard to get right, so the read task and the write task are merged.

Redis records the progress of each task, so that when a task fails and the import is rerun, it resumes from the recorded progress and avoids duplicate data.

Coordinating the distributed tasks is the hard part: the Redisson semaphore cannot solve the timeout-renewal problem, so a master node assigns tasks and distributed locks guarantee that each task is written by exactly one worker. The master node is elected with ZooKeeper + Curator.

Author丨Wuyang Shengong  Source丨Juejin: juejin.cn/post/7280436213902819369
