Incrementally collecting MySQL data into Kafka with Flume
Flume installation and configuration (single node, not clustered)
1. flume-env.sh
Set JAVA_HOME:
export JAVA_HOME=/usr/java/jdk1.8.0_221
2. Preparing the MySQL driver package for Flume
Download the source from:
https://github.com/keedio/flume-ng-sql-source
The latest release at the time of writing is 1.5.3 (the example below builds 1.4.3).
Unzip it, change into the extracted directory, and build with Maven:
C:\Users\asus>D:
D:\>cd \实用工具\新建文件夹\flume-ng-sql-source-1.4.3\flume-ng-sql-source-1.4.3
D:\实用工具\新建文件夹\flume-ng-sql-source-1.4.3\flume-ng-sql-source-1.4.3>mvn package
The build completes successfully.
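On a Linux host the whole preparation can be scripted instead. A minimal sketch, assuming FLUME_HOME points at your Flume installation and that the release tag and jar names match the version you download (both are assumptions, adjust to what you actually built):
#!/bin/bash
# Install the two jars Flume needs: the compiled SQL source and the MySQL JDBC driver.
export FLUME_HOME=/opt/apache-flume-1.8.0-bin   # assumed install path

# Build flume-ng-sql-source from the keedio sources (tag name is an assumption)
wget https://github.com/keedio/flume-ng-sql-source/archive/v1.5.3.tar.gz
tar -xzf v1.5.3.tar.gz
cd flume-ng-sql-source-1.5.3 && mvn package
cp target/flume-ng-sql-source-1.5.3.jar "$FLUME_HOME/lib/"
cd ..

# MySQL Connector/J 5.1.48 (same URL as in the config comments below)
wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.48.tar.gz
tar -xzf mysql-connector-java-5.1.48.tar.gz
cp mysql-connector-java-5.1.48/mysql-connector-java-5.1.48-bin.jar "$FLUME_HOME/lib/"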
3. Integrating Flume with Kafka and MySQL
agent.sources = sql-source
agent.sinks = k1
agent.channels = ch
# This is the Flume source that reads from MySQL (https://github.com/keedio/flume-ng-sql-source).
# Build it yourself, then put flume-ng-sql-source-1.x.x.jar into FLUME_HOME/lib;
# for a CM-managed CDH Flume, put it into /opt/cloudera/parcels/CDH-xxxx/lib/flume-ng/lib instead.
agent.sources.sql-source.type = org.keedio.flume.source.SQLSource
# URL to connect to the database (currently only MySQL is supported).
# The ?useUnicode=true&characterEncoding=utf-8&useSSL=false parameters are required.
agent.sources.sql-source.hibernate.connection.url = jdbc:mysql://hostname:3306/yinqing?useUnicode=true&characterEncoding=utf-8&useSSL=false
# Database connection properties
agent.sources.sql-source.hibernate.connection.user = root
agent.sources.sql-source.hibernate.connection.password = password
agent.sources.sql-source.hibernate.dialect = org.hibernate.dialect.MySQLDialect
# Put mysql-connector-java-X-bin.jar into FLUME_HOME/lib as well (same CDH path as above).
# Version 5.1.48 (in theory any MySQL 5.x driver works) can be fetched with:
# wget https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.48.tar.gz
agent.sources.sql-source.hibernate.driver_class = com.mysql.jdbc.Driver
agent.sources.sql-source.hibernate.connection.autocommit = true
# Name of the table to collect from
agent.sources.sql-source.table = table_name
agent.sources.sql-source.columns.to.select = *
# Query delay: the query is re-sent every this many milliseconds
agent.sources.sql-source.run.query.delay = 10000
# The status file stores the last row read; this is what makes the collection incremental
agent.sources.sql-source.status.file.path = /var/lib/flume-ng
agent.sources.sql-source.status.file.name = sql-source.status
# Kafka sink configuration. This targets a cluster, so you need the addresses and ports
# of the ZooKeeper and Kafka clusters; see the Kafka configuration section below.
agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.k1.topic = yinqing
agent.sinks.k1.brokerList = kafka-node1:9092,kafka-node2:9092,kafka-node3:9092
agent.sinks.k1.batchSize = 200
agent.sinks.k1.requiredAcks = 1
agent.sinks.k1.serializer.class = kafka.serializer.StringEncoder
# Use the ZooKeeper port you actually configured; mine is 2180, the stock default is 2181
agent.sinks.k1.zookeeperConnect = zookeeper-node1:2180,zookeeper-node2:2180,zookeeper-node3:2180
agent.channels.ch.type = memory
agent.channels.ch.capacity = 10000
agent.channels.ch.transactionCapacity = 10000
agent.channels.ch.keep-alive = 20
agent.sources.sql-source.channels = ch
agent.sinks.k1.channel = ch
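If SELECT * over the whole table is not what you want, the keedio source also accepts an explicit query. In the versions I have seen, $@$ inside custom.query is replaced with the last value saved in the status file, which expresses the incremental read directly (the id column below is a hypothetical incremental key; check the flume-ng-sql-source README for your version):
# Optional alternative to table/columns.to.select:
# $@$ is substituted with the last value recorded in the status file
agent.sources.sql-source.custom.query = SELECT id, data FROM table_name WHERE id > $@$ ORDER BY id ASC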
Kafka cluster installation and configuration
1. Configure the server.properties file
# Unique ID of this broker in the cluster
broker.id=2
# Internal hostname/IP
host.name=host_name/IP
# Internal listener
listeners=PLAINTEXT://host_name/IP:9092
# If the server also has a public IP, advertise the public IP here
advertised.listeners=PLAINTEXT://123.xxx.xxx.xxx:9092
log.dirs=/opt/moudle/kafka_2.11-2.1.0/logs/tmp/kafka-logs
zookeeper.connect=zookeeper_node1_hostname:2180,zookeeper_node2_hostname:2180,zookeeper_node3_hostname:2180
Basically only the settings near the beginning and the end shown here need changing; the ones in between can usually keep their defaults, adjusted as needed.
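Concretely, each broker keeps the same server.properties except for its identity lines; for a three-node cluster only something like the following differs per node (hostnames here are placeholders):
# On kafka-node1:
broker.id=0
listeners=PLAINTEXT://kafka-node1:9092
# On kafka-node2:
broker.id=1
listeners=PLAINTEXT://kafka-node2:9092
# On kafka-node3:
broker.id=2
listeners=PLAINTEXT://kafka-node3:9092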
2. Configure zookeeper.properties
dataDir=/tmp/zookeeper
dataLogDir=/tmp/log/zookeeper
tickTime=2000
initLimit=10
syncLimit=5
# the port at which the clients will connect
clientPort=2180
# disable the per-ip limit on the number of connections since this is a non-production config
maxClientCnxns=100
server.1=192.168.100.81:2888:3888
server.2=192.168.100.95:2888:3888
server.3=192.168.100.224:2888:3888
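The server.N lines only take effect if each ZooKeeper node also has a myid file in dataDir whose content is its own N. A minimal sketch, run once per node with its own number:
# On server.1 (192.168.100.81); write 2 and 3 on the other two nodes
mkdir -p /tmp/zookeeper
echo 1 > /tmp/zookeeper/myid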
Notes
1. No suitable driver found for jdbc:mysql://xxxxx
First: the connection URL is malformed (e.g. ("jdbc:mysql://localhost:3306/XX", "root", "XXXX"));
Second: the driver class string is wrong (it should be com.mysql.jdbc.Driver);
Third: no suitable MySQL JDBC driver jar is on the classpath;
Fourth (as a workaround): put mysql-connector-java-5.1.22-bin.jar into $JAVA_HOME/jre/lib/ext.
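A quick way to rule out the classpath causes is to confirm that both jars actually landed in Flume's lib directory (FLUME_HOME here is an assumed variable for your install path):
# Both jars should be listed; if one is missing, copy it in and restart the agent
ls $FLUME_HOME/lib | grep -E 'mysql-connector|flume-ng-sql-source'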
2. Common Kafka commands
Delete a topic
bin/kafka-topics.sh --zookeeper localhost:2180 --delete --topic test_topic
Note: topic deletion only works if delete.topic.enable=true is set in the broker's server.properties.
Describe a topic
bin/kafka-topics.sh --describe --zookeeper localhost:2180 --topic test_topic
Add partitions
bin/kafka-topics.sh --alter --zookeeper localhost:2180 --topic test_topic --partitions 3
Create a topic
bin/kafka-topics.sh --create --zookeeper localhost:2180 --replication-factor 1 --partitions 3 --topic test_topic
List topics
bin/kafka-topics.sh --list --zookeeper localhost:2180
(The ZooKeeper port in these commands follows the 2180 configured above; the stock default is 2181.)
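To check a topic end to end, you can also push a few test messages by hand with the console producer that ships with Kafka and watch them arrive in a consumer:
# Type one message per line; Ctrl+C to exit
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic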
3. Kafka shell scripts
Kafka startup script
#!/bin/bash
echo "start kafka......"
bin/kafka-server-start.sh -daemon /opt/hiteam_moudle/kafka_2.11-2.1.0/config/server.properties
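Kafka ships a matching stop script, so a shutdown script can mirror the start script above:
#!/bin/bash
echo "stop kafka......"
bin/kafka-server-stop.sh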
Create a topic
#!/bin/bash
echo "create topic of kafka ...."
bin/kafka-topics.sh --create --zookeeper k8s-master:2181,k8s-node1:2181,k8s-node2:2181 --replication-factor 3 --partitions 4 --topic topic2
List topics
#!/bin/bash
echo "list topic........"
./bin/kafka-topics.sh --list --zookeeper 192.168.100.81:2180
Start a consumer
#!/bin/bash
echo "kafka-flume starting........"
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic yinqing --from-beginning
Flume scripts
Start Flume with a specified configuration file
#!/bin/bash
echo "flume:mysql-kafka start....."
bin/flume-ng agent --conf conf --conf-file /opt/hiteam_moudle/apache-flume-1.8.0-bin/conf/flume-conf.properties --name agent -Dflume.root.logger=INFO,console
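For long-running collection you usually don't want the agent tied to a terminal. A common variant is to run it with nohup and send the log to a file instead of the console (the LOGFILE appender comes from Flume's stock log4j.properties; the flume.log path is an assumption):
#!/bin/bash
echo "flume:mysql-kafka start (background)....."
nohup bin/flume-ng agent --conf conf --conf-file /opt/hiteam_moudle/apache-flume-1.8.0-bin/conf/flume-conf.properties --name agent -Dflume.root.logger=INFO,LOGFILE > flume.log 2>&1 &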