BigQuery 数据仓库使用指南

在当今的企业环境中,越来越多的企业依赖实时数据和分析来推动业务决策,数据仓库技术变得愈发关键。Google Cloud 的 BigQuery 作为一款无服务器、具备 PB 级规模的数据仓库,在数据存储、大规模分析甚至基于 SQL 的机器学习模型等方面表现出色。它无需创建集群,只需将数据上传即可开始查询,并且由于计算和存储分离且可独立扩展,成本效益极高。下面将详细介绍 BigQuery 的一些常见操作。

1. 使用 Cloud Console 运行 BigQuery 查询
  • 问题 :想要快速开始使用 BigQuery。
  • 解决方案
    1. 从 Google Cloud Console 打开 BigQuery。
    2. 在已打开的查询窗口中编写查询语句,完成后按 Ctrl + Shift + F 自动格式化查询,然后点击“运行”。
    3. 几秒内即可看到查询结果。再次运行查询会更快,因为 BigQuery 会缓存查询结果长达 24 小时,可在查询设置中关闭此功能,便于进行查询性能基准测试。
  • 操作步骤
graph LR
    A[打开 Google Cloud Console] --> B[打开 BigQuery]
    B --> C[编写查询语句]
    C --> D[按 Ctrl + Shift + F 格式化]
    D --> E[点击运行]
    E --> F[查看结果]
    F --> G{再次运行?}
    G -- 是 --> H[更快获取结果]
    G -- 否 --> I[结束]
  • 讨论 :BigQuery 控制台是使用该服务的理想起点,可探索数据、创建数据集、浏览项目查询历史,还能保存查询与队友分享。一些有用的热键如下:
    | 按键组合 | 操作 |
    | ---- | ---- |
    | Ctrl + Shift + F | 自动格式化查询 |
    | Ctrl + Enter | 执行整个查询窗口 |
    | Ctrl + E (选中时) | 仅执行所选 SQL |

需要注意的是,BigQuery 缓存虽能加速数据探索并避免重复查询成本,但在生产环境中不应依赖,因为缓存结果不一定能持续 24 小时。若需要快速缓存结果,可将数据存储在定期刷新的表中。此外,缓存机制较为简单,查询文本的细微变化(如添加空格)都会导致缓存失效,子查询、视图等部分也不会被缓存。

2. 从 CSV 文件加载数据到 BigQuery
  • 问题 :有本地 CSV 数据,想加载到 BigQuery 进行分析。
  • 解决方案
    1. 下载示例文件,运行 gsutil cp gs://cloud-samples-data/bigquery/us-states/us-states-by-date.csv 命令(需安装 gcloud)。
    2. 打开 BigQuery 控制台。
    3. 创建新数据集,点击浏览器左侧项目名称旁的三个点,选择“创建数据集”。
    4. 将数据集命名为 mydataset ,选择处理位置为 US(此设置不可更改)。
    5. 点击数据集上的三个点并打开,点击“创建表”。
    6. 在对话框中输入以下信息:
      | 选项 | 值 |
      | ---- | ---- |
      | 创建表来源 | 上传 |
      | 选择文件 | 之前下载的文件 |
      | 文件格式 | CSV |
      | 数据集名称 | mydataset |
      | 表名称 | us - states - by - date |
      | 模式自动检测 | 勾选“是” |
    7. 表创建完成后,可检查生成的模式,通过“预览”标签查看部分数据,最后点击“查询”按钮查询数据,运行以下 SQL 查找美国最新的五个州:
SELECT * 
FROM 
  `dhodun1.mydataset.us-states-by-date` 
ORDER BY 
  date DESC 
LIMIT 
  5
  • 操作步骤
graph LR
    A[下载示例 CSV 文件] --> B[打开 BigQuery 控制台]
    B --> C[创建新数据集]
    C --> D[命名数据集并选择位置]
    D --> E[创建表]
    E --> F[填写表信息]
    F --> G[创建表完成]
    G --> H[检查模式和数据]
    H --> I[查询数据]
  • 讨论 :这是最简单的上传用例之一,可上传多种数据类型,如 CSV、AVRO、JSON 和 Parquet。通常会从 GCS 或 S3 上传数据,也可设置 BigQuery 数据传输作业定期从外部源拉取数据。还能通过编程方式将许多数据处理框架(如 Spark、Dataflow、Data Fusion 等)的数据写入 BigQuery。
3. 在 BigQuery 中构建透视表
  • 问题 :想在 BigQuery 中使用 SQL 构建透视表。
  • 解决方案
    1. 打开 BigQuery UI,运行以下查询计算每个起始站每周各天的平均骑行时长:
SELECT 
  start_station_name, 
  EXTRACT(DAYOFWEEK 
  FROM 
    end_date) AS day_of_week, 
  AVG(duration) AS average_ride_duration 
FROM 
  `bigquery-public-data.london_bicycles.cycle_hire` 
GROUP BY 
  start_station_name, 
  day_of_week
2. 使用 `PIVOT` 子句对数据进行透视,使每周的每一天都有自己的列:
SELECT * 
FROM ( 
  SELECT 
    start_station_name, 
    EXTRACT(DAYOFWEEK FROM end_date) AS day_of_week, 
    duration 
  FROM 
    `bigquery-public-data.london_bicycles.cycle_hire` 
) 
PIVOT 
( 
  AVG(duration) AS average_ride_duration 
  FOR day_of_week IN (1,2,3,4,5,6,7) 
)
  • 操作步骤
graph LR
    A[打开 BigQuery UI] --> B[运行初始查询]
    B --> C[查看未透视结果]
    C --> D[使用 PIVOT 子句查询]
    D --> E[查看透视结果]
  • 讨论 :透视操作通常是商业智能工具支持的显示功能,但在 BigQuery 中进行快速数据探索时也很有用。以前实现此功能较为复杂,需要 CASE 语句或存储过程。现在还有 UNPIVOT 运算符,可将列转换回行。
4. 向现有表添加分区和聚类列
  • 问题 :需要向 BigQuery 表添加分区列和聚类列以提高查询性能并降低查询成本。
  • 解决方案
    1. 验证源表是否有分区或聚类列,在本地终端或 Cloud Shell 运行 bq show bigquery - public - data:london_bicycles.cycle_hire
    2. 进行基于时间的查询干运行,查看扫描的数据量:
QUERY=' 
SELECT 
  duration, 
  bike_id, 
  start_date, 
  start_station_name, 
  end_date, 
  end_station_name 
FROM 
  `bigquery-public-data.london_bicycles.cycle_hire` 
WHERE 
  EXTRACT(DATE FROM start_date ) = "2016-04-03" 
  AND EXTRACT(DATE FROM end_date ) = "2016-04-03" 
ORDER BY 
  duration DESC 
LIMIT 
  5 
' 

bq query \ 
--use_legacy_sql=false \ 
--location=EU \ 
--dry_run \ 
$QUERY
3. 若未在 EU 区域创建数据集,使用 `bq mk --location = eu mydataset_eu` 创建。
4. 在 BigQuery UI 中创建新表,添加分区和聚类列:
CREATE OR REPLACE TABLE 
  mydataset_eu.cycle_hire_partitioned_clustered 
PARTITION BY 
  DATE(start_date) 
CLUSTER BY 
  bike_id 
OPTIONS( expiration_timestamp=TIMESTAMP "2025-01-01 00:00:00 UTC", 
     description="My partitioned and clustered cycle_hire table", 
     labels=[("cookbook_query", "development")] ) 
AS 
SELECT 
  * 
FROM 
  `bigquery-public-data.london_bicycles.cycle_hire`
5. 验证目标表是否有分区或聚类列,运行 `bq show mydataset_eu.cycle_hire_partitioned_clustered`。
6. 对新表进行查询干运行,查看扫描的数据量:
QUERY=' 
SELECT 
  duration, 
  bike_id, 
  start_date, 
  start_station_name, 
  end_date, 
  end_station_name 
FROM 
  `mydataset_eu.cycle_hire_partitioned_clustered` 
WHERE 
  EXTRACT(DATE FROM start_date ) = "2016-04-03" 
  AND EXTRACT(DATE FROM end_date ) = "2016-04-03" 
ORDER BY 
  duration DESC 
LIMIT 
  5 
' 

bq query \ 
--use_legacy_sql=false \ 
--location=EU \ 
--dry_run \ 
$QUERY
7. 操作完成后,删除表和数据集以避免费用。
  • 操作步骤
graph LR
    A[验证源表] --> B[查询干运行]
    B --> C{是否创建 EU 数据集?}
    C -- 否 --> D[创建数据集]
    C -- 是 --> E[创建新表]
    E --> F[验证目标表]
    F --> G[新表查询干运行]
    G --> H[删除表和数据集]
  • 讨论 :分区和聚类是 BigQuery 中提高性能和降低成本的有效方法。分区通常基于日期或时间将表分割成较小的段,查询时可避免读取不匹配过滤条件的分区数据。聚类则根据指定列自动组织和排序表数据,相关数据会存储在一起,提高查询性能。此示例还展示了使用 bq 命令行工具的干运行功能在运行查询前确定成本,该功能在 BigQuery UI 中也可用。
5. 向无法或不应分区的表添加聚类
  • 问题 :想向 BigQuery 表添加聚类列,但表没有明显的分区候选列(如 DATETIME INTEGER 列),且表较小(每天添加数据少于 10 GB),添加分区列可能不合适。
  • 解决方案
    1. 运行以下 SQL 创建包含新的虚拟日期列的表:
CREATE OR REPLACE TABLE 
  mydataset_eu.cycle_hire_fake_partitioned_clustered ( 
    rental_id INTEGER, 
    duration INTEGER, 
    bike_id INTEGER, 
    end_date TIMESTAMP, 
    end_station_id INTEGER, 
    end_station_name STRING, 
    start_date TIMESTAMP, 
    start_station_id INTEGER, 
    start_station_name STRING, 
    end_station_logical_terminal INTEGER, 
    start_station_logical_terminal INTEGER, 
    end_station_priority_id INTEGER, 
    fake_date DATE ) 
PARTITION BY 
  fake_date 
CLUSTER BY 
  bike_id 
OPTIONS( expiration_timestamp=TIMESTAMP "2025-01-01 00:00:00 UTC", 
    description="My partitioned and clustered cycle_hire table", 
    labels=[("cookbook_query", "development")] ) AS 
SELECT 
  *, NULL 
FROM 
  `bigquery-public-data.london_bicycles.cycle_hire`
2. 运行查询查看自行车 153 的最大骑行时长:
SELECT MAX(duration) as max_duration FROM 
`mydataset_eu.cycle_hire_fake_partitioned_clustered` 
WHERE bike_id =  153
3. 在原数据集上运行相同查询:
SELECT MAX(duration) as max_duration FROM 
`bigquery-public-data.london_bicycles.cycle_hire` 
WHERE bike_id =  153
4. 若上一个示例中的表还存在,也对其运行相同查询。
  • 操作步骤
graph LR
    A[运行 SQL 创建表] --> B[查询新表最大时长]
    B --> C[查询原数据集最大时长]
    C --> D{上一示例表存在?}
    D -- 是 --> E[查询上一示例表最大时长]
    D -- 否 --> F[结束]
  • 讨论 :对于较小的表,使用虚拟分区列后进行聚类可提高性能。对于没有明显分区列或每天添加数据超过 10 GB 的大表,建议基于插入时间伪列(如 _PARTITIONDATE _PARTITIONTIME )进行分区。
6. 选择排名第一的结果
  • 问题 :想在 BigQuery 查询中从排序列表返回排名第一的项目,但数据集特别大,查询速度慢。
  • 解决方案
    1. 在 BigQuery UI 中使用 ROW_NUMBER() OVER() 窗口函数运行查询:
SELECT 
  rental_id, 
  duration, 
  bike_id, 
  end_date 
FROM ( 
  SELECT 
    rental_id, 
    duration, 
    bike_id, 
    end_date, 
    ROW_NUMBER() OVER (ORDER BY end_date ASC) rental_num 
  FROM 
    `bigquery-public-data`.london_bicycles.cycle_hire ) 
WHERE 
  rental_num = 1
2. 使用 `ARRAY_AGG(x LIMIT 1)[OFFSET(0)]` 运行查询:
SELECT 
  rental.* 
FROM ( 
  SELECT 
    ARRAY_AGG( rentals 
    ORDER BY rentals.end_date ASC LIMIT 1)[OFFSET(0)] rental 
  FROM ( 
    SELECT 
      rental_id, 
      duration, 
      bike_id, 
      end_date 
    FROM 
      `bigquery-public-data.london_bicycles.cycle_hire`) 
rentals )
  • 操作步骤
graph LR
    A[打开 BigQuery UI] --> B[使用 ROW_NUMBER 运行查询]
    B --> C[使用 ARRAY_AGG 运行查询]
  • 讨论 :虽然 ROW_NUMBER() 可进行分布式排序,但对于大数据集,使用 ARRAY_AGG 可让 BigQuery 丢弃不需要的数据,提高查询性能,这是 BigQuery 特有的技巧,适用于只需要前 n 个结果的情况。
7. 在 BigQuery 中合并表而不产生重复项
  • 问题 :将数据加载到现有 BigQuery 表时可能引入重复行,需要对数据进行去重。
  • 解决方案
    1. 创建生产表,复制公共伦敦自行车租赁数据集:
bq cp -f bigquery-public-data:london_bicycles.cycle_hire mydataset_eu.cycle_hire
2. 创建包含新数据和重复数据的加载表:
CREATE OR REPLACE TABLE 
  mydataset_eu.temp_loading_table AS ( 
  --Grab 5 duplicate rows 
  SELECT 
    * 
  FROM 
    mydataset_eu.cycle_hire 
  LIMIT 
    5) 
UNION ALL ( 
  --Add a new unique row 
  SELECT 
    111147469109, 
    3180, 
    7054, 
    '2015-09-03 12:45:00 UTC', 
    111, 
    'Park Lane, Hyde Park', 
    '2015-09-03 11:52:00 UTC', 
    300, 
    'Serpentine Car Park, Hyde Park', 
    NULL, 
    NULL, 
    NULL)
3. 验证并记录两个表的行数:
--Number of Rows in base table 
SELECT COUNT(*) FROM mydataset_eu.cycle_hire; 

--Number of Rows in loading table 
SELECT COUNT(*) FROM mydataset_eu.temp_loading_table;
4. 使用 `MERGE` 语句将数据合并到生产表:
MERGE 
  mydataset_eu.cycle_hire rentals 
USING 
  mydataset_eu.temp_loading_table temp 
ON 
  temp.rental_id = rentals.rental_id 
  WHEN NOT MATCHED 
  THEN 
    INSERT ROW
5. 验证生产表是否只插入了一行:
--Number of rows now in base table 
SELECT COUNT(*) FROM mydataset_eu.cycle_hire;
  • 操作步骤
graph LR
    A[创建生产表] --> B[创建加载表]
    B --> C[验证两表行数]
    C --> D[合并数据]
    D --> E[验证插入行数]
  • 讨论 :此方法可根据指定的行键插入数据而不产生重复项。也可使用 SELECT DISTINCT * FROM mydataset.mytable 进行去重。

BigQuery 数据仓库使用指南

5. 向无法或不应分区的表添加聚类(续)

在某些情况下,我们可能会遇到 BigQuery 表没有合适的分区候选列,或者表规模较小,添加分区列并非最佳选择,但又希望通过聚类来提升查询性能。

对于表中没有明显的 DATETIME INTEGER 列作为分区依据,且每天添加的数据少于 10GB 的情况,我们可以采用一些特殊的方法来实现聚类。

一种可行的方式是创建一个虚拟的日期列进行分区。以伦敦自行车租赁数据集为例,我们可以运行以下 SQL 来创建一个包含虚拟分区列和聚类列的新表:

CREATE OR REPLACE TABLE 
  mydataset_eu.cycle_hire_fake_partitioned_clustered ( 
    rental_id INTEGER, 
    duration INTEGER, 
    bike_id INTEGER, 
    end_date TIMESTAMP, 
    end_station_id INTEGER, 
    end_station_name STRING, 
    start_date TIMESTAMP, 
    start_station_id INTEGER, 
    start_station_name STRING, 
    end_station_logical_terminal INTEGER, 
    start_station_logical_terminal INTEGER, 
    end_station_priority_id INTEGER, 
    fake_date DATE ) 
PARTITION BY 
  fake_date 
CLUSTER BY 
  bike_id 
OPTIONS( expiration_timestamp=TIMESTAMP "2025-01-01 00:00:00 UTC", 
    description="My partitioned and clustered cycle_hire table", 
    labels=[("cookbook_query", "development")] ) AS 
SELECT 
  *, NULL 
FROM 
  `bigquery-public-data.london_bicycles.cycle_hire`

创建好表后,我们可以通过查询来验证聚类的效果。例如,查询自行车编号为 153 的最大骑行时长:

SELECT MAX(duration) as max_duration FROM 
`mydataset_eu.cycle_hire_fake_partitioned_clustered` 
WHERE bike_id =  153

将这个查询结果与在原数据集上进行相同查询的结果进行对比:

SELECT MAX(duration) as max_duration FROM 
`bigquery-public-data.london_bicycles.cycle_hire` 
WHERE bike_id =  153

如果之前的示例中创建的分区聚类表还存在,我们也可以对其进行相同的查询:

SELECT MAX(duration) as max_duration FROM 
`mydataset_eu.cycle_hire_partitioned_clustered` 
WHERE bike_id =  153

通过对比这几个查询的性能和扫描的数据量,我们可以发现,对于较小的表,使用虚拟分区列后进行聚类能够显著提高查询性能。而对于那些没有明显分区列或者每天添加数据超过 10GB 的大表,建议基于插入时间伪列(如 _PARTITIONDATE _PARTITIONTIME )进行分区。

6. 选择排名第一的结果(续)

在处理大数据集时,从排序列表中返回排名第一的项目可能会面临查询速度慢的问题。下面我们将详细介绍两种不同的解决方案及其特点。

首先,我们可以使用 ROW_NUMBER() OVER() 窗口函数来实现这个需求。在 BigQuery UI 中运行以下查询:

SELECT 
  rental_id, 
  duration, 
  bike_id, 
  end_date 
FROM ( 
  SELECT 
    rental_id, 
    duration, 
    bike_id, 
    end_date, 
    ROW_NUMBER() OVER (ORDER BY end_date ASC) rental_num 
  FROM 
    `bigquery-public-data`.london_bicycles.cycle_hire ) 
WHERE 
  rental_num = 1

这种方法虽然可以正确地返回排名第一的项目,但对于大数据集,它需要进行完整的排序操作,因此查询速度可能较慢。

为了提高查询性能,我们可以使用 BigQuery 特有的 ARRAY_AGG(x LIMIT 1)[OFFSET(0)] 技巧。运行以下查询:

SELECT 
  rental.* 
FROM ( 
  SELECT 
    ARRAY_AGG( rentals 
    ORDER BY rentals.end_date ASC LIMIT 1)[OFFSET(0)] rental 
  FROM ( 
    SELECT 
      rental_id, 
      duration, 
      bike_id, 
      end_date 
    FROM 
      `bigquery-public-data.london_bicycles.cycle_hire`) 
rentals )

ARRAY_AGG 方法允许 BigQuery 丢弃不需要的数据,只保留排名第一的行,从而大大提高了查询速度。特别是在我们只需要前 n 个结果的情况下,这种方法非常有效。

7. 在 BigQuery 中合并表而不产生重复项(续)

在将数据加载到现有 BigQuery 表时,重复行的问题可能会影响数据的准确性和查询性能。下面我们将详细介绍如何使用 MERGE 语句来解决这个问题。

首先,我们需要创建一个生产表,这里我们复制公共伦敦自行车租赁数据集:

bq cp -f bigquery-public-data:london_bicycles.cycle_hire mydataset_eu.cycle_hire

接着,创建一个包含新数据和重复数据的加载表:

CREATE OR REPLACE TABLE 
  mydataset_eu.temp_loading_table AS ( 
  --Grab 5 duplicate rows 
  SELECT 
    * 
  FROM 
    mydataset_eu.cycle_hire 
  LIMIT 
    5) 
UNION ALL ( 
  --Add a new unique row 
  SELECT 
    111147469109, 
    3180, 
    7054, 
    '2015-09-03 12:45:00 UTC', 
    111, 
    'Park Lane, Hyde Park', 
    '2015-09-03 11:52:00 UTC', 
    300, 
    'Serpentine Car Park, Hyde Park', 
    NULL, 
    NULL, 
    NULL)

在合并数据之前,我们需要验证并记录两个表的行数:

--Number of Rows in base table 
SELECT COUNT(*) FROM mydataset_eu.cycle_hire; 

--Number of Rows in loading table 
SELECT COUNT(*) FROM mydataset_eu.temp_loading_table;

然后,使用 MERGE 语句将加载表的数据合并到生产表中,同时确保不产生重复项:

MERGE 
  mydataset_eu.cycle_hire rentals 
USING 
  mydataset_eu.temp_loading_table temp 
ON 
  temp.rental_id = rentals.rental_id 
  WHEN NOT MATCHED 
  THEN 
    INSERT ROW

最后,验证生产表是否只插入了一行:

--Number of rows now in base table 
SELECT COUNT(*) FROM mydataset_eu.cycle_hire;

通过这种方式,我们可以根据指定的行键插入数据,避免重复项的产生。此外,我们也可以使用 SELECT DISTINCT * FROM mydataset.mytable 来进行去重,但这种方法可能在性能上不如 MERGE 语句。

综上所述,BigQuery 提供了丰富的功能和灵活的操作方式来满足不同的数据处理需求。无论是数据加载、查询优化还是表的管理,我们都可以通过合理运用这些技巧来提高工作效率和数据质量。希望本文介绍的方法能够帮助你更好地使用 BigQuery 进行数据仓库的管理和分析。

Logo

腾讯云面向开发者汇聚海量精品云计算使用和开发经验,营造开放的云计算技术生态圈。

更多推荐