Spark and Flink's JDBC is unreliable

Updated on 2024-01-31

Don't get me wrong: by "unreliable" I mean that neither Spark nor Flink can do true streaming reads through JDBC, not that JDBC itself is unusable.

At least, that is what the current official documentation of both projects says, and what my own hands-on experience confirms.

So let's talk about where JDBC, universal database connection method though it is, falls short when it comes to streaming reads (and streaming computation).

How data source reads are classified

As enterprises place ever higher demands on data processing, our data processing systems in turn need more varied ways of reading data sources.

From the perspective of business requirements, the way (and frequency) data sources are read falls roughly into two categories:

Type 1: One-off, that is, reading all the data in the target system (such as a database) at once. We call this batch processing.

Type 2: Continuous, which, on top of Type 1, keeps monitoring the data source for changes and continues to read newly added and modified data. We call this streaming.

The first is our traditional requirement for reading data sources, which can be met by the mainstream computing engines Spark and Flink.

However, for the second type, although both Spark and Flink support stream computing, that support in fact rests on an important precondition: the data source, and the connector used to talk to it, have to cooperate.

JDBC for Spark

I've bragged before that for just about any storage system or database you can name, Spark has an interface to read data from it, or to write computation results back into it.

Admittedly, Spark does deliver on that, but when we want to stream data from a particular database, it rather falls short.

For example, say I want it to stream data from MySQL.

All I can think of is to use its Structured Streaming framework to try to read MySQL, but when I open the official documentation and look at the data sources it supports (latest version), I can't help feeling a little disappointed:

In other words, MySQL is not among the data sources officially supported for streaming reads (the built-in streaming sources are File, Kafka, Socket, and Rate), and JDBC isn't mentioned at all.

However, past experience has taught me that the official docs can't always be taken at face value, so it's better to try it myself, just in case it works, right?

Based on past experience, I wrote the following core code (remember to add the corresponding mysql-connector dependency to the pom file ahead of time):
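The attempt looks roughly like this (a sketch only: the session setup, connection URL and table name below are illustrative placeholders, not the exact original code):

import org.apache.spark.sql.SparkSession

object StreamReadMysqlAttempt {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("StreamReadMysqlAttempt")
      .master("local[*]")
      .getOrCreate()

    // Try to treat the JDBC source as a streaming source
    val df = spark.readStream
      .format("jdbc")
      .option("url", "jdbc:mysql://.../test")   // placeholder connection URL
      .option("dbtable", "test02")
      .option("user", "***")
      .option("password", "***")
      .load()                                    // Spark rejects the jdbc source here:
                                                 // it does not support streamed reading

    df.writeStream
      .format("console")
      .start()
      .awaitTermination()
  }
}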

It looks like it should work, but once you run it, you'll find out:

Sure enough, it doesn't work; the official documentation wasn't deceiving me.

But I know that Spark can definitely read the MySQL data source through JDBC.

So, change the core code to this, and it runs fine:
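Again a sketch with placeholder values; the only real change is read instead of readStream:

import org.apache.spark.sql.SparkSession

object BatchReadMysql {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("BatchReadMysql")
      .master("local[*]")
      .getOrCreate()

    val df = spark.read                         // plain batch DataFrame read
      .format("jdbc")
      .option("url", "jdbc:mysql://.../test")   // placeholder connection URL
      .option("dbtable", "test02")
      .option("user", "***")
      .option("password", "***")
      .load()

    df.show()                                   // one-shot read of whatever is in the table, then the job ends

    spark.stop()
  }
}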

It's just that this defeats my original purpose: the logic has turned from the stream computing I wanted into batch processing. After this change it is no longer Spark Structured Streaming but plain Spark.

So, out of the box, Spark cannot (at least so far) stream-read data sources directly through JDBC.

I did see an open source project on GitHub that modifies the officially supported JDBC approach and claims to let Spark Structured Streaming read MySQL data sources incrementally. I haven't verified it yet; interested readers can take a look (github.com/sutugin/spark-streaming-jdbc-source).

JDBC for Flink

Open Flink's official documentation, and JDBC does appear in the Flink connector list (there is still no MySQL):

In that case, let's try it.

First, configure the development environment. Unlike Spark, if Flink wants to read a MySQL data source, it needs Flink's own JDBC connector (not the traditional mysql-connector).

Note the version you pick: it may differ from what the official site describes. In the latest official documentation the version number consists of two parts, connector version + Flink version, while my version is slightly older.
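The dependencies look roughly like this (a sketch only: the exact artifacts and version numbers below are illustrative, so check the compatibility matrix in the Flink documentation for your own Flink version):

<!-- older naming scheme: the connector version tracks the Flink version -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.12</artifactId>
    <version>1.14.4</version>
    <!-- newer releases drop the Scala suffix and use connector version + Flink version, e.g. 3.1.1-1.17 -->
</dependency>
<!-- the MySQL JDBC driver is still needed at runtime -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.28</version>
</dependency>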

Then comes the code itself, as follows (like the Spark example above, it only demonstrates reading the MySQL data and printing it out):

package com.anryg.mysql.jdbc

import java.time.Duration

import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
  * @DESC: Read the MySQL data source in JDBC mode
  * @Auther: Anryg
  * @Date: 2023/11/8 10:49
  */
object FromMysql2Print {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.enableCheckpointing(10000L)
    env.setStateBackend(new EmbeddedRocksDBStateBackend(true)) // the new way to set the state backend
    env.getCheckpointConfig.setCheckpointStorage("hdfs:") // checkpoint storage path
    env.getCheckpointConfig.setExternalizedCheckpointCleanup(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) // retention policy for checkpoint records
    env.getCheckpointConfig.setAlignedCheckpointTimeout(Duration.ofMinutes(1L))
    env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)

    val tableEnv = StreamTableEnvironment.create(env)

    /** Step 1: read the MySQL data source */
    tableEnv.executeSql(
      """
        |CREATE TABLE data_from_mysql(
        |`client_ip` STRING,
        |`domain` STRING,
        |`time` STRING,
        |`target_ip` STRING,
        |`rcode` INT,
        |`query_type` INT,
        |`authority_record` STRING,
        |`add_msg` STRING,
        |`dns_ip` STRING,
        |PRIMARY KEY(`client_ip`,`domain`,`time`,`target_ip`,`rcode`,`query_type`) NOT ENFORCED
        |)
        |WITH(
        |'connector' = 'jdbc',
        |'url' = 'jdbc:mysql:',
        |'username' = '***',
        |'password' = '***',
        |'table-name' = 'test02'
        |)
      """.stripMargin)

    /** Step 2: print the results directly */
    tableEnv.executeSql(
      """
        |SELECT * FROM data_from_mysql LIMIT 100
      """.stripMargin).print()
  }
}

The whole code is very similar to the CDC way of reading MySQL from my earlier article (if you're interested, go back and compare the two).

However, after it runs, once the program has read the data currently in the table, it simply stops:

In other words, although we use Flink's stream computing context (StreamExecutionEnvironment), because the program reads the data source through JDBC, it still only runs as a batch job.

In this respect, Flink behaves exactly the same as Spark.

Finally

The above experiments make it clear that, for both Spark and Flink, streaming reads of a MySQL data source (or any other database) over JDBC are not feasible, at least not with the official, out-of-the-box approach.

So, if you want to read incremental data from certain databases (such as MySQL) directly with the computing engine, the best solution for now seems to be Flink CDC.
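For reference, here is a rough sketch of the same table defined with the mysql-cdc connector instead of jdbc. Assumptions: tableEnv is the StreamTableEnvironment from the code above, the connection values are placeholders, and the flink-connector-mysql-cdc dependency has been added:

// Same columns as data_from_mysql above, only the connector options change
tableEnv.executeSql(
  """
    |CREATE TABLE data_from_mysql_cdc(
    |`client_ip` STRING,
    |`domain` STRING,
    |`time` STRING,
    |`target_ip` STRING,
    |`rcode` INT,
    |`query_type` INT,
    |`authority_record` STRING,
    |`add_msg` STRING,
    |`dns_ip` STRING,
    |PRIMARY KEY(`client_ip`,`domain`,`time`,`target_ip`,`rcode`,`query_type`) NOT ENFORCED
    |)
    |WITH(
    |'connector' = 'mysql-cdc',
    |'hostname' = '***',
    |'port' = '3306',
    |'username' = '***',
    |'password' = '***',
    |'database-name' = '***',
    |'table-name' = 'test02'
    |)
  """.stripMargin)

// Unlike the jdbc connector, this source keeps running after the initial snapshot and
// continues to emit binlog changes (inserts/updates/deletes), i.e. a genuine streaming read
tableEnv.executeSql("SELECT * FROM data_from_mysql_cdc").print()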

Of course, JDBC is not useless: for older database versions that CDC does not yet support, such as importing historical data from MySQL 5.5 and below, it can still come in handy.

Author丨anryg

Source丨WeChat official account "Anruige is a coder" (ID: gh_c12dc29ae2e7)

The dbaplus community welcomes contributions from technical authors; submit to editor@dbaplus.cn
