flink学习33:flinkSQL连接mysql,查询插入数据
1.生成运行时env2.生成表环境3.接上数据流,数据流数据生成表4.把数据库中sink保存数据的表,在flink中生成一遍(相当于把flink生成的表,绑定到数据库中的表),配上数据库连接信息,并执行,及注册5.查询表,可以根据注册表名查询6.插入表,可以根据生成的flink表进行数据插入。
·
总览
1.生成运行时env
2.生成表环境
3.接上数据流,数据流数据生成表
4.把数据库中sink保存数据的表,在flink中生成一遍(相当于把flink生成的表,绑定到数据库中的表),配上数据库连接信息,并执行,及注册
5.查询表,可以根据注册表名查询
6.插入表,可以根据生成的flink表进行数据插入
完整案例:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.{DataTypes, Table}
import org.apache.flink.table.descriptors._
object SqlReadMysql {
def main(args: Array[String]): Unit = {
// creat env
val bsEnv = StreamExecutionEnvironment.getExecutionEnvironment
//parallelism
bsEnv.setParallelism(1)
//set env
val bsSetting = EnvironmentSettings
.newInstance()
.useBlinkPlanner()
.inStreamingMode()
.build()
//create table env
val bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSetting)
//create ds
val dataStream = bsEnv.fromElements(Tuple2("01","lisi" ))
val table1 = bsTableEnv.fromDataStream(dataStream)
//create table
val sinkDDL =
"""
|create table student2_flink (
|code varchar(20) null,
|name varchar(20) null
|)with(
|'connector.type'='jdbc',
|'connector.url'='jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=true&serverTimezone=UTC',
|'connector.table'='student2',
|'connector.driver'='com.mysql.jdbc.Driver',
|'connector.username'='root',
|'connector.password'='root'
|)
|""".stripMargin
println(sinkDDL)
// execute the create table sql
bsTableEnv.executeSql(sinkDDL)
//register table
val myStudent = bsTableEnv.from("student2_flink")
//execute query
val result = bsTableEnv.sqlQuery(s"select * from $myStudent")
result.toRetractStream[(String, String)].print()
//insert data
table1.executeInsert("student2_flink")
//execute
bsEnv.execute()
}
}
POM文件:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.sinopharm.gksk</groupId> <artifactId>gksk-bigdata</artifactId> <version>1.0-SNAPSHOT</version> <name>gksk-bigdata</name> <!-- FIXME change it to the project's website --> <url>http://www.example.com</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <log4j.version>1.2.17</log4j.version> <slf4j.version>1.7.25</slf4j.version> <slf4j.api.version>1.7.25</slf4j.api.version> </properties> <!--Flink项目核心依赖--> <dependencies> <!--Flink Java 项目核心依赖--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>1.14.4</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>1.14.4</version> </dependency> <!--Flink scala项目核心依赖--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_2.12</artifactId> <version>1.14.4</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-scala_2.12</artifactId> <version>1.14.4</version> </dependency> <!--Flink Table API 核心依赖--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-scala-bridge_2.12</artifactId> <version>1.14.4</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.12</artifactId> <version>1.14.4</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_2.12</artifactId> <version>1.12.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>1.14.4</version> </dependency> <!-- csv--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-csv</artifactId> <version>1.14.4</version> </dependency> <!--以下用到什么引用什么--> <!--Flink Kafka依赖--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.12</artifactId> <version>1.14.4</version> </dependency> <!--Flink rocksdb状态后依赖 --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-statebackend-rocksdb_2.12</artifactId> <version>1.14.4</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-jdbc_2.12</artifactId> <version>1.14.4</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.16</version> </dependency> <!--本地测试核心依赖--> <!--Flink 本地测试客户端依赖--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.12</artifactId> <version>1.14.4</version> </dependency> <!--Flink 本地测试wei ui依赖 http://127.0.0.1:8081/ --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-runtime-web_2.12</artifactId> <version>1.14.4</version> <scope>runtime</scope> </dependency> <!--junit测试--> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.11</version> <scope>test</scope> </dependency> <!--日志输出--> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> <version>${slf4j.version}</version> <scope>runtime</scope> </dependency> <dependency> <groupId>log4j</groupId> <artifactId>log4j</artifactId> <version>${log4j.version}</version> <scope>runtime</scope> </dependency> </dependencies> <build> <resources> <resource> <directory>src/main/java</directory> </resource> <resource> <directory>src/main/scala</directory> </resource> </resources> <plugins> <!--这里没引打包插件 需要的自己引用--> <!--Java compiler--> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> <!--Java Compiler--> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
Could not instantiate the executor. Make sure a planner module is on the classpath
原因:pom文件中缺少 planner
解决办法:添加
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner_2.12</artifactId> <version>1.14.4</version> </dependency>
ps:注意有时候 配置两个planner也会报错
flinksql 连接mysql报错 JDBC-Class not found. - com.mysql.jdbc.Driver
原因:缺少mysql的jar包
解决:pom文件添加:
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.16</version> </dependency>
open() failed.The server time zone value '�й���ʱ��' is unrecognized or represents more than one time zone. You must configure either the server or JDBC driver (via the serverTimezone configuration property) to use a more specifc time zone value if you want to utilize time zone support.
原因:URL没有指定时区,jdbc 6.0以上都有这个问题
解决:在URL后边加时区
'connector.url'='jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=true&serverTimezone=UTC',
- useUnicode=true 表示使用Unicode字符,因此可以使用中文
- characterEncoding=utf8 设置编码方式
- useSSL=true 设置安全连接
- serverTimezone=UTC 设置全球标准时间
open() failed.Cannot load connection class because of underlying exception: com.mysql.cj.exceptions.WrongArgumentException: Malformed database URL, failed to parse the main URL sections.
原因:连接的URL写错了
解决:好好看看,字符 、格式
更多推荐
已为社区贡献3条内容
所有评论(0)