Clickhouse 和其他端的连接(kafka,mysql,hdfs)


mysql

  1. 在MySQL创建表和插入数据
DROP TABLE test.test ;
create table test.test (
  id INT  NOT NULL AUTO_INCREMENT,
  cnt INT,
  PRIMARY KEY (id)
);
insert into test.test (id, cnt) VALUES (1,2);
  1. 在ClickHouse中创建MySQL引擎的表
DROP TABLE mysql_table_dup;
CREATE TABLE mysql_table_dup
(
    id Int32,
    cnt Int32
)
ENGINE = MySQL('127.0.0.1:3306', 'test', 'test', 'root', '123456', 0, 'UPDATE cnt=cnt+1');
插入数据并查询:
insert into mysql_table_dup values(2, 1);
select * from mysql_table_dup;
┌─id─┬─cnt─┐
│  12 │
│  21 │
└────┴─────┘
    
插入主键冲突的数据:
insert into mysql_table_dup values(2, 1);
select * from mysql_table_dup;
┌─id─┬─cnt─┐
│  12 │
│  22 │
└────┴─────┘

kafka


用 通过两张表分别保存Kafka的清单数据和分组聚合数据

1. 准备原料

./bin/kafka-topics.sh --create --topic topic_ch --replication-factor 1 --partitions 3 --zookeeper localhost:22181/mykafka

(1)、创建topic的数据流表:
CREATE TABLE topic_ch_kafka (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka(‘localhost:9092’, ‘topic_ch’, ‘group_ch’, ‘JSONEachRow’);

(2)、创建保存清单的表以及以及相应的物化视图:
DROP TABLE topic_ch_list;
CREATE TABLE topic_ch_list (
timestamp UInt64,
level String,
message String
) ENGINE = MergeTree()
order by (timestamp);

DROP TABLE topic_ch_list_view;
CREATE MATERIALIZED VIEW topic_ch_list_view TO topic_ch_list
AS SELECT timestamp, level, message
FROM topic_ch_kafka;
(3)、创建统计聚合的表以及以及相应的物化视图:
DROP TABLE topic_ch_daily;
CREATE TABLE topic_ch_daily (
day Date,
level String,
total UInt64
) ENGINE = SummingMergeTree(day)
ORDER BY (day, level);

DROP TABLE topic_ch_daily_view;
CREATE MATERIALIZED VIEW topic_ch_daily_view TO topic_ch_daily
AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total
FROM topic_ch_kafka GROUP BY day, level;

2. 生产数据

./bin/kafka-console-producer.sh --topic topic_ch --broker-list localhost:9092

准备数据:
{“timestamp”:1542424134, “level”:“high”, “message”:“hehe”}
{“timestamp”:1542424132, “level”:“high”, “message”:“hehe”}
{“timestamp”:1542424133, “level”:“mid”, “message”:“hehe”}
{“timestamp”:1542424134, “level”:“low”, “message”:“hehe”}
{“timestamp”:1542434134, “level”:“high”, “message”:“hehe”}
{“timestamp”:1542424134, “level”:“low”, “message”:“hehe”}
{“timestamp”:1542424434, “level”:“low”, “message”:“hehe”}
{“timestamp”:1542424134, “level”:“low”, “message”:“hehe”}
{“timestamp”:1542424136, “level”:“high”, “message”:“hehe”}
{“timestamp”:1542424134, “level”:“high”, “message”:“hehe”}
{“timestamp”:1542424134, “level”:“high”, “message”:“hehe”}

  1. 查看数据
    清单表:
    select * from topic_ch_list;
    😃 select * from topic_ch_list;

    SELECT *
    FROM topic_ch_list

    ┌──timestamp─┬─level─┬─message─┐
    │ 1542424132 │ high │ hehe │
    │ 1542424133 │ mid │ hehe │
    │ 1542424134 │ low │ hehe │

    │ 1542424136 │ high │ hehe │
    │ 1542424434 │ low │ hehe │
    │ 1542434134 │ high │ hehe │
    └────────────┴───────┴─────────┘

    16 rows in set. Elapsed: 0.004 sec.

聚合统计表:
SELECT level, sum(total) FROM topic_ch_daily GROUP BY level;
😃 SELECT level, sum(total) FROM topic_ch_daily GROUP BY level;

SELECT
level,
sum(total)
FROM topic_ch_daily
GROUP BY level

┌─level─┬─sum(total)─┐
│ mid │ 3 │
│ low │ 5 │
│ high │ 8 │
└───────┴────────────┘

3 rows in set. Elapsed: 0.004 sec.

detach视图:
DETACH TABLE topic_ch_list_view;

Kafka的配置

在目录/etc/clickhouse-server/config.d/新建配置文件,配置文件名称任意指定,这里命名为kafka.xml。

配置内容如下:


cgrp
<auto_offset_reset>smallest</auto_offset_reset>

<kafka_topic_ch>
<auto_offset_reset>latest</auto_offset_reset>
<retry_backoff_ms>250</retry_backoff_ms>
<fetch_min_bytes>100000</fetch_min_bytes>
</kafka_topic_ch>

CREATE TABLE topic_ch_kafka3 (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka(‘localhost:9092’, ‘topic_ch’, ‘group_ch3’, ‘JSONEachRow’);

HDFS

1. 创建HDFS引擎表

drop table hdfs_engine_table;
CREATE TABLE hdfs_engine_table (name String, value UInt32) ENGINE=HDFS(‘hdfs://xiaochen:8021/tmp/hdfs’, ‘TSV’);

  1. 插入数据
    INSERT INTO hdfs_engine_table VALUES (‘one’, 1), (‘two’, 2), (‘three’, 3);

  2. 查看数据
    SELECT * FROM hdfs_engine_table LIMIT 2;
    ┌─name──┬─value─┐
    │ one │ 1 │
    │ two │ 2 │
    │ three │ 3 │
    └───────┴───────┘

  3. 查看HDFS路径
    $ hdfs dfs -ls hdfs://xiaochen:8021/tmp/hdfs
    -rwxrwxrwx 3 clickhouse supergroup 20 2020-03-14 13:57 hdfs://xiaochen:8021/tmp/other_storage

JDBC

1. clickhouse-jdbc-bridge的安装

下载或编译源码,生成jar包:clickhouse-jdbc-bridge-1.0.1.jar 。
将MySQL的驱动包存放在主机的目录,启动jdbc-bridge服务时需指定此目录:/root/jdbc/lib。
启动服务:
java -jar ./clickhouse-jdbc-bridge-1.0.1.jar --driver-path /root/jdbc/lib/
日志:
2020-03-14 14:18:11,955 [ main ] {JdbcBridge} - Starting jdbc-bridge
2020-03-14 14:18:11,981 [ main ] {ClickHouseDriver} - Driver registered
2020-03-14 14:18:11,985 [ main ] {JdbcDriverLoader} - Looking for driver files in /root/jdbc/lib
2020-03-14 14:18:12,039 [ main ] {JdbcDriverLoader} - Found 1 JAR file
2020-03-14 14:18:12,040 [ main ] {JdbcDriverLoader} - Looking for driver in file /root/jdbc/lib/mysql-connector-java-5.1.44-bin.jar
2020-03-14 14:18:12,047 [ main ] {JdbcDriverLoader} - Registered driver com.mysql.jdbc.Driver
2020-03-14 14:18:12,048 [ main ] {JdbcDriverLoader} - Registered driver com.mysql.fabric.jdbc.FabricMySQLDriver
2020-03-14 14:18:12,115 [ main ] {JdbcBridge} - Will bind to localhost/127.0.0.1:9019
2020-03-14 14:18:12,173 [ main ] {JdbcBridge} - Starting server
2020-03-14 14:18:12,229 [ main ] {JdbcBridge} - Server is ready to accept connections

2. 准备MySQL表的数据

use test;
​
CREATE TABLE `test`.`test` (
  `int_id` INT NOT NULL AUTO_INCREMENT,
  `int_nullable` INT NULL DEFAULT NULL,
  `float` FLOAT NOT NULL,
  `float_nullable` FLOAT NULL DEFAULT NULL,
  PRIMARY KEY (`int_id`));
​
​
insert into test (`int_id`, `float`) VALUES (1,2);
​
select * from test;
+--------+--------------+-------+----------------+
| int_id | int_nullable | float | float_nullable |
+--------+--------------+-------+----------------+
|      1 |         NULL |     2 |           NULL |
+--------+--------------+-------+----------------+
1 row in set (0.00 sec)
3. 在ClickHouse创建JDBC引擎表
CREATE TABLE jdbc_table
(
    `int_id` Int32,
    `int_nullable` Nullable(Int32),
    `float` Float32,
    `float_nullable` Nullable(Float32)
)
ENGINE JDBC('jdbc:mysql://127.0.0.1:3306/?user=root&password=123456', 'test', 'test');
查看数据:
:) select int_id, float from jdbc_table;
​
SELECT 
    int_id, 
    float
FROM jdbc_table
​
┌─int_id─┬─float─┐
│      1 │     2 │
└────────┴───────┘


Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐