Real-Time Data Warehouse: Script Examples for Common Data Governance Tools, Covering Data Quality Validation, Data Lineage Analysis, and Metadata Management
The following are script examples based on common data governance tools, covering data quality validation, data lineage analysis, and metadata management. The tools include Apache Atlas, Great Expectations, and Apache Ranger. You can adapt them to the needs of your own data lake.
1. Data Quality Validation Script (Great Expectations)
Great Expectations is an open-source data quality tool that supports defining validation rules, generating reports, and more.
Example: validate the data quality of an Iceberg table
from great_expectations.data_context import DataContext
# Load the Great Expectations project configuration
context = DataContext("/path/to/great_expectations/")
# Define a datasource: read the Iceberg table's data files with Spark
datasource_config = {
    "name": "iceberg_datasource",
    "class_name": "Datasource",
    "execution_engine": {
        "class_name": "SparkDFExecutionEngine",
        "module_name": "great_expectations.execution_engine",
    },
    "data_connectors": {
        "iceberg_connector": {
            "class_name": "InferredAssetFilesystemDataConnector",
            "base_directory": "hdfs://path/to/warehouse/dwd_order_fact/",
            "default_regex": {
                "pattern": "(.*)",
                "group_names": ["data_asset_name"],
            },
        },
    },
}
context.add_datasource(**datasource_config)
# Create an expectation suite
from great_expectations.core.batch import BatchRequest
expectation_suite_name = "dwd_order_fact_quality"
suite = context.create_expectation_suite(expectation_suite_name, overwrite_existing=True)
# Get a validator for the table and attach expectations to the suite
validator = context.get_validator(
    batch_request=BatchRequest(
        datasource_name="iceberg_datasource",
        data_connector_name="iceberg_connector",
        data_asset_name="dwd_order_fact",
    ),
    expectation_suite_name=expectation_suite_name,
)
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_be_in_set("order_status", ["CREATED", "PAID", "CANCELLED"])
validator.expect_column_values_to_be_between("amount", min_value=0, max_value=100000)
validator.save_expectation_suite(discard_failed_expectations=False)
# Run the validation
results = validator.validate()
# Build and open the data quality report (Data Docs)
context.build_data_docs()
context.open_data_docs()
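In an automated pipeline, the validation result is usually used to gate downstream steps. A minimal sketch of that pattern, assuming only that the result dict carries an overall `success` flag and per-expectation results in the shape Great Expectations validation results use (the `sample` payload below is hand-written for illustration, not real output):

```python
import sys

def gate_on_validation(result: dict) -> int:
    """Return a process exit code based on a validation result dict."""
    if result.get("success", False):
        return 0
    # Collect the names of the expectations that failed, for the log
    failed = [
        r["expectation_config"]["expectation_type"]
        for r in result.get("results", [])
        if not r.get("success", True)
    ]
    print(f"validation failed: {failed}", file=sys.stderr)
    return 1

# Hand-built example result in the same shape
sample = {
    "success": False,
    "results": [
        {"success": True,
         "expectation_config": {"expectation_type": "expect_column_values_to_not_be_null"}},
        {"success": False,
         "expectation_config": {"expectation_type": "expect_column_values_to_be_between"}},
    ],
}
exit_code = gate_on_validation(sample)  # 1, since one expectation failed
```

Returning a non-zero exit code lets a scheduler (cron, Airflow, etc.) stop dependent jobs when quality checks fail.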
2. Data Lineage Analysis (Apache Atlas)
Apache Atlas can be integrated with Iceberg to manage data lineage information.
Example: register an Iceberg table in Atlas
# Register the table metadata via the Atlas REST API
curl -X POST http://<ATLAS_SERVER>:21000/api/atlas/v2/entity \
  -u admin:admin \
  -H "Content-Type: application/json" \
  -d '{
    "entities": [
      {
        "typeName": "iceberg_table",
        "attributes": {
          "qualifiedName": "real_time_dw.dwd_order_fact@hdfs://path/to/warehouse",
          "name": "dwd_order_fact",
          "description": "Iceberg table storing order data",
          "owner": "data_team",
          "columns": [
            {"name": "order_id", "type": "STRING", "description": "Order ID"},
            {"name": "amount", "type": "DOUBLE", "description": "Order amount"}
          ]
        }
      }
    ]
  }'
Example: fetch data lineage through the Atlas API (lineage is looked up by entity GUID, or by unique attribute as here)
curl -X GET "http://<ATLAS_SERVER>:21000/api/atlas/v2/lineage/uniqueAttribute/type/iceberg_table?attr:qualifiedName=real_time_dw.dwd_order_fact@hdfs://path/to/warehouse" \
  -u admin:admin \
  -H "Content-Type: application/json"
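The Atlas v2 lineage response is a graph: `guidEntityMap` maps entity GUIDs to their metadata, and `relations` lists directed `fromEntityId` → `toEntityId` edges. A small sketch of walking that structure to list the direct upstream entities of a table (the payload and GUIDs below are hand-written in the same shape, not real Atlas output):

```python
def direct_upstreams(lineage: dict, guid: str) -> list:
    """Return display names of entities with an edge pointing at `guid`."""
    entity_map = lineage.get("guidEntityMap", {})
    upstream_guids = [
        rel["fromEntityId"]
        for rel in lineage.get("relations", [])
        if rel.get("toEntityId") == guid
    ]
    return [entity_map.get(g, {}).get("displayText", g) for g in upstream_guids]

# Hand-written payload in the Atlas v2 lineage shape (illustrative GUIDs)
lineage = {
    "baseEntityGuid": "guid-order-fact",
    "guidEntityMap": {
        "guid-order-fact": {"typeName": "iceberg_table", "displayText": "dwd_order_fact"},
        "guid-order-raw": {"typeName": "iceberg_table", "displayText": "ods_order_raw"},
        "guid-etl": {"typeName": "spark_process", "displayText": "order_etl"},
    },
    "relations": [
        {"fromEntityId": "guid-order-raw", "toEntityId": "guid-etl"},
        {"fromEntityId": "guid-etl", "toEntityId": "guid-order-fact"},
    ],
}
print(direct_upstreams(lineage, "guid-order-fact"))  # ['order_etl']
```

Walking the edges recursively would recover the full chain (raw table → ETL process → fact table).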
3. Metadata Cleanup and Optimization (Built into Iceberg)
Expire old snapshots
Remove stale snapshots to reduce storage usage:
spark-sql \
  --master yarn \
  --deploy-mode client \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.hadoop_catalog=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.hadoop_catalog.type=hadoop \
  --conf spark.sql.catalog.hadoop_catalog.warehouse=hdfs://path/to/warehouse \
  -e "CALL hadoop_catalog.system.expire_snapshots(table => 'real_time_dw.dwd_order_fact', older_than => TIMESTAMP '2025-01-01 00:00:00')"
Compact small files
Compact small files to improve query performance:
spark-sql \
  --master yarn \
  --deploy-mode client \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.hadoop_catalog=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.hadoop_catalog.type=hadoop \
  --conf spark.sql.catalog.hadoop_catalog.warehouse=hdfs://path/to/warehouse \
  -e "CALL hadoop_catalog.system.rewrite_data_files(table => 'real_time_dw.dwd_order_fact')"
4. Access Control (Apache Ranger)
Apache Ranger provides table- and column-level access control.
Example: configure a Ranger policy
- Log in to the Apache Ranger Web UI.
- Add a policy under the Hive service:
  - Resource: real_time_dw.dwd_order_fact
  - Users/groups: data_team
  - Permissions: SELECT
- Configure policies in batch (via the REST API):
curl -u admin:admin -X POST \
  -H "Content-Type: application/json" \
  -d '{
    "service": "hive",
    "name": "dwd_order_fact_policy",
    "policyItems": [
      {
        "users": ["data_team"],
        "accesses": [
          {"type": "select", "isAllowed": true}
        ]
      }
    ],
    "resources": {
      "database": {"values": ["real_time_dw"], "isExcludes": false, "isRecursive": false},
      "table": {"values": ["dwd_order_fact"], "isExcludes": false, "isRecursive": false}
    }
  }' \
  http://<RANGER_SERVER>:6080/service/public/v2/api/policy
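When many tables need similar policies, building the JSON body programmatically keeps them consistent. A sketch of such a builder, which also adds the column-level resource that the curl example above does not show (the function name and the column list are illustrative, not from the original post):

```python
import json

def ranger_policy(service, database, table, users, access_types, columns=None):
    """Build a Ranger v2 policy body; optionally restrict it to specific columns."""
    resources = {
        "database": {"values": [database], "isExcludes": False, "isRecursive": False},
        "table": {"values": [table], "isExcludes": False, "isRecursive": False},
    }
    if columns:
        # Column-level resource: the policy then applies only to these columns
        resources["column"] = {"values": columns, "isExcludes": False, "isRecursive": False}
    return {
        "service": service,
        "name": f"{table}_policy",
        "policyItems": [
            {"users": users,
             "accesses": [{"type": t, "isAllowed": True} for t in access_types]}
        ],
        "resources": resources,
    }

policy = ranger_policy("hive", "real_time_dw", "dwd_order_fact",
                       users=["data_team"], access_types=["select"],
                       columns=["order_id", "amount"])
print(json.dumps(policy, indent=2))
```

The resulting body can then be POSTed to `/service/public/v2/api/policy` exactly as in the curl example.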
5. Automated Data Governance Script
Combined cleanup and governance script
The following script combines Iceberg, Atlas, and Great Expectations to automate the governance tasks above:
#!/bin/bash
# Expire old snapshots
echo "Cleaning old snapshots..."
spark-sql \
  --master yarn \
  --deploy-mode client \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.hadoop_catalog=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.hadoop_catalog.type=hadoop \
  --conf spark.sql.catalog.hadoop_catalog.warehouse=hdfs://path/to/warehouse \
  -e "CALL hadoop_catalog.system.expire_snapshots(table => 'real_time_dw.dwd_order_fact', older_than => TIMESTAMP '2025-01-01 00:00:00')"
# Compact small files
echo "Merging small files..."
spark-sql \
  --master yarn \
  --deploy-mode client \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.hadoop_catalog=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.hadoop_catalog.type=hadoop \
  --conf spark.sql.catalog.hadoop_catalog.warehouse=hdfs://path/to/warehouse \
  -e "CALL hadoop_catalog.system.rewrite_data_files(table => 'real_time_dw.dwd_order_fact')"
# Run data quality validation
echo "Running data validation..."
python /path/to/great_expectations_validation.py
# Update lineage information
echo "Updating lineage in Atlas..."
curl -X POST http://<ATLAS_SERVER>:21000/api/atlas/v2/entity \
  -u admin:admin \
  -H "Content-Type: application/json" \
  -d '{
    "entities": [
      {
        "typeName": "iceberg_table",
        "attributes": {
          "qualifiedName": "real_time_dw.dwd_order_fact@hdfs://path/to/warehouse",
          "name": "dwd_order_fact",
          "description": "Iceberg table storing order data"
        }
      }
    ]
  }'
echo "Data governance tasks completed."
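A script like this is usually run on a schedule. For example, a nightly crontab entry (the script path and log location are placeholders, not from the original post):

```shell
# Run the governance script at 02:00 every day and append output to a log
0 2 * * * /opt/scripts/data_governance.sh >> /var/log/data_governance.log 2>&1
```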