Spring Boot 整合 Scala：写一个 Flink 示例
基于 Scala 2.12（示例使用 2.12.7）
和 Flink 1.14（示例使用 1.14.5）
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Maven build for a Spring Boot 2.7.2 web application that mixes Java and
  Scala 2.12 sources and embeds Apache Flink 1.14.5 (Table API, JDBC connector,
  local client).
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.2</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
        <!-- Single source of truth for versions that must stay in lock-step. -->
        <scala.version>2.12.7</scala.version>
        <!-- Binary-compatibility suffix used in the Flink artifact ids. -->
        <scala.binary.version>2.12</scala.binary.version>
        <flink.version>1.14.5</flink.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- JDBC driver loaded by the Flink JDBC connector at runtime. -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.28</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- flink-table-common is Scala-free, hence no binary-version suffix. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
            <!--
              NOTE(review): org.scala-tools:maven-scala-plugin is no longer maintained;
              its successor is net.alchim31.maven:scala-maven-plugin. Kept here so the
              build does not pick up a new plugin without review.
            -->
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <!--
                          Bind before the compile phase so the Scala objects are already
                          compiled when javac processes the Java controller that calls
                          them. Without an explicit phase the goal runs in the compile
                          phase, after maven-compiler-plugin.
                        -->
                        <phase>process-resources</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                        <configuration>
                            <includes>
                                <include>**/*.scala</include>
                            </includes>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
在 IDE 的项目库（Libraries）中添加 Scala SDK
目录结构
/**
 * REST controller exposing two GET endpoints that trigger the Flink demo jobs
 * implemented in the Scala objects {@code Test} and {@code Test2}.
 */
@RestController
public class flinkController {

    /**
     * GET /test: runs the Table API demo by delegating to {@code Test.tableApi()}.
     * Nothing is returned to the caller.
     */
    @GetMapping("/test")
    public void test() {
        Test.tableApi();
    }

    /**
     * GET /test2: runs the DataStream demo by delegating to
     * {@code Test2.dataStreamApi()}. Nothing is returned to the caller.
     */
    @GetMapping("/test2")
    public void test2() {
        // The batch variant, Test2.dataSetApi(), is intentionally not invoked here;
        // swap the call below to try it.
        // NOTE(review): dataStreamApi() ends with env.execute() on a socket source,
        // so this request presumably does not return while the job runs; confirm.
        Test2.dataStreamApi();
    }
}
flink代码
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
/**
 * Flink Table API / SQL demo: reads rows from one MySQL table, counts rows per
 * salary value, and writes the counts to a second MySQL table.
 *
 * Connection settings are hard-coded for the demo; do not reuse the literal
 * credentials outside a local test setup.
 */
object Test {
  private val hostUrl = "127.0.0.1"
  private val port = 3328
  private val dbName = "test"
  private val tableInput = "salary_table"
  private val tableOutput = "salary_count"
  private val user = "root"
  private val password = "root"

  // Driver class of MySQL Connector/J 8.x (the version declared in the pom).
  // The former name `com.mysql.jdbc.Driver` is only a deprecated alias there.
  private val jdbcDriver = "com.mysql.cj.jdbc.Driver"

  /**
   * Builds a `create table` statement for a JDBC-backed Flink table.
   *
   * NOTE(review): the `connector.*` keys below are the legacy property format;
   * the Flink 1.14 JDBC connector documentation describes 'connector' = 'jdbc'
   * with 'url' / 'table-name' / 'driver' / 'username' / 'password'. Confirm the
   * legacy format is still accepted by the deployed connector before relying on it.
   *
   * @param flinkTable    name under which the table is registered in Flink
   * @param columns       column definitions, e.g. `"name string"`
   * @param physicalTable name of the table inside the MySQL database
   * @return the DDL text to pass to `executeSql`
   */
  private def jdbcTableDdl(flinkTable: String, columns: Seq[String], physicalTable: String): String =
    s"""
       |create table $flinkTable (
       |  ${columns.mkString(", ")}
       | ) with (
       |  'connector.type' = 'jdbc',
       |  'connector.url' = 'jdbc:mysql://$hostUrl:$port/$dbName',
       |  'connector.table' = '$physicalTable',
       |  'connector.driver' = '$jdbcDriver',
       |  'connector.username' = '$user',
       |  'connector.password' = '$password'
       | )
      """.stripMargin

  /**
   * Registers the source and sink tables, then submits an insert job that
   * groups the source rows by `salary` and stores `(salary, cnt)` in the sink.
   */
  def tableApi(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tableEnv = StreamTableEnvironment.create(env)

    tableEnv.executeSql(
      jdbcTableDdl("mysqlInput", Seq("name string", "salary double"), tableInput))
    tableEnv.executeSql(
      jdbcTableDdl("mysqlOutput", Seq("salary double", "cnt bigint not null"), tableOutput))

    val countsBySalary = tableEnv.sqlQuery(
      """
        |select salary,count(1) as cnt
        |from mysqlInput
        |group by salary
      """.stripMargin)

    // The TableResult handle returned by executeInsert is discarded.
    countsBySalary.executeInsert("mysqlOutput")
  }
}
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
/**
 * Word-count demos for the two classic Flink APIs: the batch DataSet API and
 * the DataStream API.
 */
object Test2 {

  /**
   * Batch word count: reads `C:\demo.txt`, splits each line on single spaces,
   * counts the occurrences of every non-empty token and prints the result.
   */
  def dataSetApi(): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val wordCounts = env
      .readTextFile("C:\\demo.txt")
      .flatMap(_.split(" "))
      // Consecutive spaces yield empty tokens; drop them so "" is not counted
      // as a word (same rule as dataStreamApi).
      .filter(_.nonEmpty)
      .map((_, 1))
      .groupBy(0) // key on the word (tuple field 0)
      .sum(1) // add up the per-word ones (tuple field 1)
    wordCounts.print()
  }

  /**
   * Streaming word count: reads text lines from a socket, counts non-empty
   * tokens per word in 10-second tumbling processing-time windows, prints each
   * window's counts and starts the job.
   */
  def dataStreamApi(): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // "url" is a placeholder host name from the demo; replace it with the real
    // host of the text socket server.
    val lines = env.socketTextStream("url", 28062)
    val windowedCounts = lines
      .flatMap(_.split(" "))
      .filter(_.nonEmpty)
      .map((_, 1))
      .keyBy(_._1)
      .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
      .sum(1)
    windowedCounts.print()
    // The pipeline is only defined above; execute() actually starts it.
    env.execute()
  }
}
更多推荐
所有评论(0)