Clickhouse 和其他端的连接(kafka,mysql,hdfs)
本文演示 ClickHouse 与外部系统(Kafka、MySQL、HDFS、JDBC)的集成方式,每一小节给出建表语句与数据读写示例。
Clickhouse 和其他端的连接(kafka,mysql,hdfs)
mysql
- 在MySQL创建表和插入数据
-- MySQL side: (re)create the demo table and seed one row.
-- IF EXISTS makes the script re-runnable: a bare DROP fails on first run.
DROP TABLE IF EXISTS test.test;
CREATE TABLE test.test (
    id INT NOT NULL AUTO_INCREMENT,
    cnt INT,
    PRIMARY KEY (id)
);
INSERT INTO test.test (id, cnt) VALUES (1, 2);
- 在ClickHouse中创建MySQL引擎的表
-- ClickHouse side: table backed by the MySQL table engine.
-- Engine args: host:port, database, table, user, password,
--   replace_query = 0 (use INSERT, not REPLACE),
--   on_duplicate_clause appended as "ON DUPLICATE KEY UPDATE cnt=cnt+1",
--   so inserting a conflicting primary key increments cnt instead of failing.
DROP TABLE IF EXISTS mysql_table_dup;
CREATE TABLE mysql_table_dup
(
    id Int32,
    cnt Int32
)
ENGINE = MySQL('127.0.0.1:3306', 'test', 'test', 'root', '123456', 0, 'UPDATE cnt=cnt+1');
插入数据并查询:
-- Write one row through the MySQL engine, then read the remote table back.
INSERT INTO mysql_table_dup VALUES (2, 1);
SELECT * FROM mysql_table_dup;
┌─id─┬─cnt─┐
│ 1 │ 2 │
│ 2 │ 1 │
└────┴─────┘
插入主键冲突的数据:
-- Insert the same primary key again: the configured ON DUPLICATE KEY
-- clause fires on the MySQL side, so cnt is incremented rather than erroring.
INSERT INTO mysql_table_dup VALUES (2, 1);
SELECT * FROM mysql_table_dup;
┌─id─┬─cnt─┐
│ 1 │ 2 │
│ 2 │ 2 │
└────┴─────┘
kafka
通过两张表分别保存 Kafka 的清单数据和分组聚合数据。
1. 准备原料
./bin/kafka-topics.sh --create --topic topic_ch --replication-factor 1 --partitions 3 --zookeeper localhost:22181/mykafka
(1)、创建topic的数据流表:
-- Kafka engine stream table consuming topic_ch in consumer group group_ch,
-- decoding each message as one JSON object (JSONEachRow).
-- NOTE: the original used typographic quotes (‘ ’), which are a syntax
-- error in ClickHouse — string literals must use ASCII single quotes.
CREATE TABLE topic_ch_kafka (
    timestamp UInt64,
    level String,
    message String
) ENGINE = Kafka('localhost:9092', 'topic_ch', 'group_ch', 'JSONEachRow');
(2)、创建保存清单的表以及相应的物化视图:
-- Detail (list) table: persists every Kafka message into MergeTree storage.
DROP TABLE IF EXISTS topic_ch_list;
CREATE TABLE topic_ch_list (
    timestamp UInt64,
    level String,
    message String
) ENGINE = MergeTree()
ORDER BY (timestamp);

-- Materialized view: continuously drains the Kafka stream table into the
-- detail table (the view is the "consumer" that triggers reads).
DROP TABLE IF EXISTS topic_ch_list_view;
CREATE MATERIALIZED VIEW topic_ch_list_view TO topic_ch_list
AS SELECT timestamp, level, message
FROM topic_ch_kafka;
(3)、创建统计聚合的表以及相应的物化视图:
-- Aggregate table: one row per (day, level); SummingMergeTree folds rows
-- with the same sorting key on merge, summing the listed numeric column.
-- NOTE: the original wrote SummingMergeTree(day) — but `day` is part of the
-- sorting key and key columns are never summed; the column to sum here is
-- the non-key counter `total`.
DROP TABLE IF EXISTS topic_ch_daily;
CREATE TABLE topic_ch_daily (
    day Date,
    level String,
    total UInt64
) ENGINE = SummingMergeTree(total)
ORDER BY (day, level);

-- Materialized view: per-batch pre-aggregation from the Kafka stream into
-- the summing table (final totals come from merges / sum() at query time).
DROP TABLE IF EXISTS topic_ch_daily_view;
CREATE MATERIALIZED VIEW topic_ch_daily_view TO topic_ch_daily
AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() AS total
FROM topic_ch_kafka
GROUP BY day, level;
2. 生产数据
./bin/kafka-console-producer.sh --topic topic_ch --broker-list localhost:9092
准备数据:
{"timestamp":1542424134, "level":"high", "message":"hehe"}
{"timestamp":1542424132, "level":"high", "message":"hehe"}
{"timestamp":1542424133, "level":"mid", "message":"hehe"}
{"timestamp":1542424134, "level":"low", "message":"hehe"}
{"timestamp":1542434134, "level":"high", "message":"hehe"}
{"timestamp":1542424134, "level":"low", "message":"hehe"}
{"timestamp":1542424434, "level":"low", "message":"hehe"}
{"timestamp":1542424134, "level":"low", "message":"hehe"}
{"timestamp":1542424136, "level":"high", "message":"hehe"}
{"timestamp":1542424134, "level":"high", "message":"hehe"}
{"timestamp":1542424134, "level":"high", "message":"hehe"}
- 查看数据
清单表:
select * from topic_ch_list;
😃 select * from topic_ch_list;
SELECT *
FROM topic_ch_list
┌──timestamp─┬─level─┬─message─┐
│ 1542424132 │ high │ hehe │
│ 1542424133 │ mid │ hehe │
│ 1542424134 │ low │ hehe │
…
│ 1542424136 │ high │ hehe │
│ 1542424434 │ low │ hehe │
│ 1542434134 │ high │ hehe │
└────────────┴───────┴─────────┘
16 rows in set. Elapsed: 0.004 sec.
聚合统计表:
SELECT level, sum(total) FROM topic_ch_daily GROUP BY level;
😃 SELECT level, sum(total) FROM topic_ch_daily GROUP BY level;
SELECT
level,
sum(total)
FROM topic_ch_daily
GROUP BY level
┌─level─┬─sum(total)─┐
│ mid │ 3 │
│ low │ 5 │
│ high │ 8 │
└───────┴────────────┘
3 rows in set. Elapsed: 0.004 sec.
detach视图:
DETACH TABLE topic_ch_list_view;
Kafka的配置
在目录/etc/clickhouse-server/config.d/新建配置文件,配置文件名称任意指定,这里命名为kafka.xml。
配置内容如下:
<yandex>
    <kafka>
        <!-- librdkafka debug contexts; "cgrp" traces consumer-group activity -->
        <debug>cgrp</debug>
        <auto_offset_reset>smallest</auto_offset_reset>
    </kafka>
    <!-- per-topic overrides for topic_ch -->
    <kafka_topic_ch>
        <auto_offset_reset>latest</auto_offset_reset>
        <retry_backoff_ms>250</retry_backoff_ms>
        <fetch_min_bytes>100000</fetch_min_bytes>
    </kafka_topic_ch>
</yandex>
-- Second Kafka consumer table on the same topic but in its own consumer
-- group (group_ch3), so it receives all messages independently.
-- NOTE: the original used typographic quotes (‘ ’) — ClickHouse requires
-- ASCII single quotes for string literals.
CREATE TABLE topic_ch_kafka3 (
    timestamp UInt64,
    level String,
    message String
) ENGINE = Kafka('localhost:9092', 'topic_ch', 'group_ch3', 'JSONEachRow');
HDFS
1. 创建HDFS引擎表
-- HDFS engine table: reads/writes TSV data at the given HDFS URI.
-- Fixed: IF EXISTS for re-runnability, and ASCII quotes instead of the
-- typographic quotes in the original (a syntax error).
DROP TABLE IF EXISTS hdfs_engine_table;
CREATE TABLE hdfs_engine_table (name String, value UInt32)
ENGINE = HDFS('hdfs://xiaochen:8021/tmp/hdfs', 'TSV');
-
插入数据
-- Write three rows through the HDFS engine (ASCII quotes: the original's
-- typographic quotes would not parse).
INSERT INTO hdfs_engine_table VALUES ('one', 1), ('two', 2), ('three', 3);
查看数据
-- Read everything back (the original's LIMIT 2 contradicted the 3-row
-- result transcript shown below).
SELECT * FROM hdfs_engine_table;
┌─name──┬─value─┐
│ one │ 1 │
│ two │ 2 │
│ three │ 3 │
└───────┴───────┘ -
查看HDFS路径
$ hdfs dfs -ls hdfs://xiaochen:8021/tmp/hdfs
-rwxrwxrwx 3 clickhouse supergroup 20 2020-03-14 13:57 hdfs://xiaochen:8021/tmp/hdfs
JDBC
1. clickhouse-jdbc-bridge的安装
下载或编译源码,生成jar包:clickhouse-jdbc-bridge-1.0.1.jar 。
将MySQL的驱动包存放在主机的目录,启动jdbc-bridge服务时需指定此目录:/root/jdbc/lib。
启动服务:
java -jar ./clickhouse-jdbc-bridge-1.0.1.jar --driver-path /root/jdbc/lib/
日志:
2020-03-14 14:18:11,955 [ main ] {JdbcBridge} - Starting jdbc-bridge
2020-03-14 14:18:11,981 [ main ] {ClickHouseDriver} - Driver registered
2020-03-14 14:18:11,985 [ main ] {JdbcDriverLoader} - Looking for driver files in /root/jdbc/lib
2020-03-14 14:18:12,039 [ main ] {JdbcDriverLoader} - Found 1 JAR file
2020-03-14 14:18:12,040 [ main ] {JdbcDriverLoader} - Looking for driver in file /root/jdbc/lib/mysql-connector-java-5.1.44-bin.jar
2020-03-14 14:18:12,047 [ main ] {JdbcDriverLoader} - Registered driver com.mysql.jdbc.Driver
2020-03-14 14:18:12,048 [ main ] {JdbcDriverLoader} - Registered driver com.mysql.fabric.jdbc.FabricMySQLDriver
2020-03-14 14:18:12,115 [ main ] {JdbcBridge} - Will bind to localhost/127.0.0.1:9019
2020-03-14 14:18:12,173 [ main ] {JdbcBridge} - Starting server
2020-03-14 14:18:12,229 [ main ] {JdbcBridge} - Server is ready to accept connections
2. 准备MySQL表的数据
-- MySQL side: source table for the JDBC bridge test.
-- test.test was already created with a different schema in the MySQL
-- section above, so drop it first or this CREATE fails.
USE test;
DROP TABLE IF EXISTS `test`.`test`;
CREATE TABLE `test`.`test` (
    `int_id` INT NOT NULL AUTO_INCREMENT,
    `int_nullable` INT NULL DEFAULT NULL,
    `float` FLOAT NOT NULL,
    `float_nullable` FLOAT NULL DEFAULT NULL,
    PRIMARY KEY (`int_id`)
);
INSERT INTO test (`int_id`, `float`) VALUES (1, 2);
SELECT * FROM test;
+--------+--------------+-------+----------------+
| int_id | int_nullable | float | float_nullable |
+--------+--------------+-------+----------------+
| 1 | NULL | 2 | NULL |
+--------+--------------+-------+----------------+
1 row in set (0.00 sec)
3. 在ClickHouse创建JDBC引擎表
-- ClickHouse side: JDBC engine table routed through clickhouse-jdbc-bridge
-- (args: JDBC connection string, remote database, remote table).
-- Added DROP IF EXISTS for consistency with the other sections.
DROP TABLE IF EXISTS jdbc_table;
CREATE TABLE jdbc_table
(
    `int_id` Int32,
    `int_nullable` Nullable(Int32),
    `float` Float32,
    `float_nullable` Nullable(Float32)
)
ENGINE = JDBC('jdbc:mysql://127.0.0.1:3306/?user=root&password=123456', 'test', 'test');
查看数据:
:) select int_id, float from jdbc_table;
SELECT
int_id,
float
FROM jdbc_table
┌─int_id─┬─float─┐
│ 1 │ 2 │
└────────┴───────┘
更多推荐
所有评论(0)