一 、以批处理的方式读数据

        //读取数据库表的数据类型
        TypeInformation[] typeInformations = {
            BasicTypeInfo.STRING_TYPE_INFO,         
            BasicTypeInfo.STRING_TYPE_INFO,
            BasicTypeInfo.STRING_TYPE_INFO, 
            BasicTypeInfo.INT_TYPE_INFO
        };

        RowTypeInfo rowTypeInfo = new RowTypeInfo(typeInformations);

        //连接数据库
        JDBCInputFormat jdbcInputFormat=JDBCInputFormat.buildJDBCInputFormat()
                .setDrivername("com.mysql.cj.jdbc.Driver")
                .setDBUrl("jdbc:mysql://localhost:3306/库名称?serverTimezone=GMT%2B8&useSSL=false&characterEncoding=utf-8")
                .setUsername("用户名")
                .setPassword("密码")
                .setQuery("select * from 表名称")
                .setRowTypeInfo(rowTypeInfo).finish();



        //从数据库中获取限速表数据
        DataStreamSource<Row> input = env.createInput(jdbcInputFormat);
        input.print();//打印查询数据

这里方式是以批处理的方式执行的,执行完之后程序就结束了.这里只是关键代码,你得自己创建流式环境env

二、以自定义数据源的方式及线程休眠的方式,定时获取数据库数据

//从数据库读取数据源
class MyJDBCSource extends RichParallelSourceFunction<Monitor> {
    private volatile boolean isRunning = true;
    private Connection conn=null;
    private PreparedStatement ps=null;
    @Override
    public void open(Configuration parameters) throws Exception {
        conn= MySQLConnection.getConn();//这里是我自定义一个工具类获取一个数据库连接,所以你得自己创建一个连接
        ps = conn.prepareStatement("select * from t_monitor_speed_limit_info");
    }

    @Override
    public void run(SourceContext<Monitor> ctx) throws Exception {
        while (isRunning) {
            ResultSet resultSet = ps.executeQuery();
            while (resultSet.next()){
                ctx.collect(new Monitor(resultSet.getString(1),resultSet.getString(2)
                ,resultSet.getString(3),resultSet.getInt(4)));
            }
            Thread.sleep(1000*60);//每6分中读取一次数据库
        }
    }

    @Override
    public void cancel() { //这个方法用于释放资源

        try {
            ps.close();
            MySQLConnection.bounce(conn);
            isRunning = false;
        } catch (SQLException throwables) {
            throwables.printStackTrace();
        }

    }
}



//自己创建一个类
public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env= Env.getEnv();
        env.setParallelism(1);
        env.addSource(new MyJDBCSource())
        .print();
        env.execute();
    }

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐