Below are script examples built on common data governance tools, covering data quality validation, data lineage analysis, and metadata management. The tools include Apache Atlas, Great Expectations, and Apache Ranger. You can pick and adapt them to your own data lake requirements.


1. Data Quality Validation Script (Great Expectations)

Great Expectations is an open-source data quality tool that supports defining validation rules, generating reports, and more.

Example: validating the data quality of an Iceberg table
from great_expectations.core.batch import BatchRequest
from great_expectations.data_context import DataContext

# Load the Great Expectations project configuration
context = DataContext("/path/to/great_expectations/")

# Define a datasource that reads the table's data files with Spark
datasource_config = {
    "name": "iceberg_datasource",
    "class_name": "Datasource",
    "execution_engine": {
        "class_name": "SparkDFExecutionEngine",
        "module_name": "great_expectations.execution_engine",
    },
    "data_connectors": {
        "iceberg_connector": {
            "class_name": "InferredAssetFilesystemDataConnector",
            "base_directory": "hdfs://path/to/warehouse/dwd_order_fact/",
            "default_regex": {
                "pattern": "(.*)",
                "group_names": ["data_asset_name"],
            },
        },
    },
}
context.add_datasource(**datasource_config)

# Create an expectation suite for the table
expectation_suite_name = "dwd_order_fact_quality"
context.create_expectation_suite(expectation_suite_name, overwrite_existing=True)

# Get a validator for the batch and attach validation rules to it
validator = context.get_validator(
    batch_request=BatchRequest(
        datasource_name="iceberg_datasource",
        data_connector_name="iceberg_connector",
        data_asset_name="dwd_order_fact",
    ),
    expectation_suite_name=expectation_suite_name,
)
validator.expect_column_values_to_not_be_null("order_id")
validator.expect_column_values_to_be_in_set("order_status", ["CREATED", "PAID", "CANCELLED"])
validator.expect_column_values_to_be_between("amount", min_value=0, max_value=100000)
validator.save_expectation_suite(discard_failed_expectations=False)

# Run the validation
results = validator.validate()

# Build and open the data quality report (Data Docs)
context.build_data_docs()
context.open_data_docs()
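
To re-run the suite on a schedule (for example from the automation script in section 5), it can be wrapped in a Checkpoint. The following is a minimal sketch, assuming the datasource and suite created above; the checkpoint name dwd_order_fact_checkpoint is an illustrative choice, not something defined elsewhere in this article:

# Minimal sketch: persist a Checkpoint so the suite can be re-run on a schedule.
# Assumes the "iceberg_datasource" and "dwd_order_fact_quality" suite created above;
# the checkpoint name "dwd_order_fact_checkpoint" is an illustrative choice.
from great_expectations.data_context import DataContext

context = DataContext("/path/to/great_expectations/")
context.add_checkpoint(
    name="dwd_order_fact_checkpoint",
    config_version=1.0,
    class_name="SimpleCheckpoint",
    validations=[
        {
            "batch_request": {
                "datasource_name": "iceberg_datasource",
                "data_connector_name": "iceberg_connector",
                "data_asset_name": "dwd_order_fact",
            },
            "expectation_suite_name": "dwd_order_fact_quality",
        }
    ],
)

# Running the checkpoint validates the latest batch and refreshes Data Docs
result = context.run_checkpoint(checkpoint_name="dwd_order_fact_checkpoint")
print("Validation passed:", result.success)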

2. Data Lineage Analysis (Apache Atlas)

Apache Atlas can be integrated with Iceberg to manage data lineage information.

Example: registering an Iceberg table in Atlas
# Register the table metadata via the Atlas REST API (bulk entity endpoint)
curl -X POST http://<ATLAS_SERVER>:21000/api/atlas/v2/entity/bulk \
  -u admin:admin \
  -H "Content-Type: application/json" \
  -d '{
    "entities": [
      {
        "typeName": "iceberg_table",
        "attributes": {
          "qualifiedName": "real_time_dw.dwd_order_fact@hdfs://path/to/warehouse",
          "name": "dwd_order_fact",
          "description": "Iceberg 表,用于存储订单数据",
          "owner": "data_team",
          "columns": [
            {"name": "order_id", "type": "STRING", "description": "订单 ID"},
            {"name": "amount", "type": "DOUBLE", "description": "订单金额"}
          ]
        }
      }
    ]
  }'
Example: fetching data lineage through the Atlas API
# Lineage is queried by entity GUID; look up the GUID first via the Atlas search API
curl -X GET "http://<ATLAS_SERVER>:21000/api/atlas/v2/lineage/<ENTITY_GUID>?direction=BOTH&depth=3" \
  -u admin:admin \
  -H "Content-Type: application/json"

3. Metadata Cleanup and Optimization (Iceberg Built-in)

Expiring old snapshots

Remove stale snapshots to reduce storage usage:

spark-sql \
  --master yarn \
  --deploy-mode client \
  --jars iceberg-spark3-runtime.jar \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.hadoop_catalog=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.hadoop_catalog.type=hadoop \
  --conf spark.sql.catalog.hadoop_catalog.warehouse=hdfs://path/to/warehouse \
  -e "CALL hadoop_catalog.system.expire_snapshots(table => 'real_time_dw.dwd_order_fact', older_than => TIMESTAMP '2025-01-01 00:00:00')"
Compacting small files

Merge small files to improve query performance:

spark-sql \
  --master yarn \
  --deploy-mode client \
  --jars iceberg-spark3-runtime.jar \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.hadoop_catalog=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.hadoop_catalog.type=hadoop \
  --conf spark.sql.catalog.hadoop_catalog.warehouse=hdfs://path/to/warehouse \
  -e "CALL hadoop_catalog.system.rewrite_data_files(table => 'real_time_dw.dwd_order_fact')"
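
The same maintenance can also be triggered from PySpark, which is convenient when these tasks are scheduled from an orchestrator. The following is a minimal sketch assuming the same hadoop_catalog configuration as above and that the Iceberg Spark runtime jar is on the classpath:

# Minimal sketch: run Iceberg table maintenance from PySpark instead of the CLI.
# Assumes the same hadoop_catalog / warehouse settings used in the spark-sql
# examples and that the Iceberg Spark runtime jar is available to the session.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("iceberg-maintenance")
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.hadoop_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.hadoop_catalog.type", "hadoop")
    .config("spark.sql.catalog.hadoop_catalog.warehouse", "hdfs://path/to/warehouse")
    .getOrCreate()
)

# Expire snapshots older than the given timestamp
spark.sql("""
    CALL hadoop_catalog.system.expire_snapshots(
        table => 'real_time_dw.dwd_order_fact',
        older_than => TIMESTAMP '2025-01-01 00:00:00')
""").show()

# Compact small data files to improve scan performance
spark.sql("""
    CALL hadoop_catalog.system.rewrite_data_files(
        table => 'real_time_dw.dwd_order_fact')
""").show()

spark.stop()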

4. Access Control (Apache Ranger)

Apache Ranger is used to enforce table- and column-level access control.

Example: configuring a Ranger policy
  1. Log in to the Apache Ranger Web UI.

  2. Add a policy under the Hive service:

    • Resource: real_time_dw.dwd_order_fact
    • Users/groups: data_team
    • Permissions: SELECT
  3. Configure policies in bulk via the REST API (a Python sketch for applying the same policy to several tables follows the curl example below):

curl -u admin:admin -X POST \
  -H "Content-Type: application/json" \
  -d '{
        "service": "hive",
        "name": "dwd_order_fact_policy",
        "policyItems": [
          {
            "users": ["data_team"],
            "accesses": [
              {"type": "select", "isAllowed": true}
            ]
          }
        ],
        "resources": {
          "database": {"values": ["real_time_dw"], "isExcludes": false, "isRecursive": false},
          "table": {"values": ["dwd_order_fact"], "isExcludes": false, "isRecursive": false}
        }
      }' \
  http://<RANGER_SERVER>:6080/service/public/v2/api/policy
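
To apply the same SELECT policy to several tables at once, the REST call can be looped from a short script. The following is a minimal Python sketch using the requests library; the table list and the Ranger service name "hive" are assumptions to adapt to your environment:

# Minimal sketch: create one SELECT policy per table via the Ranger public v2 API.
# The table list and the Ranger service name ("hive") are assumptions; adjust them
# to match your Ranger service definition.
import requests

RANGER_URL = "http://<RANGER_SERVER>:6080/service/public/v2/api/policy"
AUTH = ("admin", "admin")
TABLES = ["dwd_order_fact", "dwd_payment_fact"]  # hypothetical table list

for table in TABLES:
    policy = {
        "service": "hive",
        "name": f"{table}_policy",
        "policyItems": [
            {
                "users": ["data_team"],
                "accesses": [{"type": "select", "isAllowed": True}],
            }
        ],
        "resources": {
            "database": {"values": ["real_time_dw"], "isExcludes": False, "isRecursive": False},
            "table": {"values": [table], "isExcludes": False, "isRecursive": False},
            "column": {"values": ["*"], "isExcludes": False, "isRecursive": False},
        },
    }
    resp = requests.post(RANGER_URL, json=policy, auth=AUTH)
    resp.raise_for_status()
    print(f"Created policy for {table}")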

5. Automated Data Governance Script

Combined cleanup and governance script

The following script combines Iceberg, Atlas, and Great Expectations to automate routine data governance tasks:

#!/bin/bash

# Expire old snapshots
echo "Cleaning old snapshots..."
spark-sql \
  --master yarn \
  --deploy-mode client \
  --jars iceberg-spark3-runtime.jar \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.hadoop_catalog=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.hadoop_catalog.type=hadoop \
  --conf spark.sql.catalog.hadoop_catalog.warehouse=hdfs://path/to/warehouse \
  -e "CALL hadoop_catalog.system.expire_snapshots(table => 'real_time_dw.dwd_order_fact', older_than => TIMESTAMP '2025-01-01 00:00:00')"

# Compact small files
echo "Merging small files..."
spark-sql \
  --master yarn \
  --deploy-mode client \
  --jars iceberg-spark3-runtime.jar \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.hadoop_catalog=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.hadoop_catalog.type=hadoop \
  --conf spark.sql.catalog.hadoop_catalog.warehouse=hdfs://path/to/warehouse \
  -e "CALL hadoop_catalog.system.rewrite_data_files(table => 'real_time_dw.dwd_order_fact')"

# Data quality validation
echo "Running data validation..."
python /path/to/great_expectations_validation.py

# Update table metadata in Atlas
echo "Updating lineage in Atlas..."
curl -X POST http://<ATLAS_SERVER>:21000/api/atlas/v2/entity/bulk \
  -u admin:admin \
  -H "Content-Type: application/json" \
  -d '{
    "entities": [
      {
        "typeName": "iceberg_table",
        "attributes": {
          "qualifiedName": "real_time_dw.dwd_order_fact@hdfs://path/to/warehouse",
          "name": "dwd_order_fact",
          "description": "Iceberg 表,用于存储订单数据"
        }
      }
    ]
  }'

echo "Data governance tasks completed."
