22  arrow 包

22.1 引言

CSV文件的设计初衷是便于人类阅读,它结构简单且几乎能被所有工具解析。然而CSV的效率较低——将数据读入R需要大量处理工作。本章介绍一种更高效的替代方案:Parquet格式,这是一种基于开放标准、被大数据系统广泛使用的列式存储格式。

Arrow是一个跨语言工具包,专为高效分析和传输大规模数据集而设计,同时也可以用于处理Parquet文件。

本章须加载以下包:

library(tidyverse)
library(arrow)
library(dbplyr, warn.conflicts = FALSE)
library(duckdb)

22.2 获取数据

本章教学数据集为:西雅图公共图书馆借阅记录。该数据集包含41,389,465行记录,记录了2005年4月至2022年10月期间每月每本图书的借阅情况。

以下代码将下载该数据的缓存副本(9GB CSV文件)。由于文件较大,建议使用curl::multi_download(),它能显示进度条并支持断点续传:

# 创建数据目录
dir.create("data", showWarnings = FALSE)

# 使用多线程下载(支持断点续传)
curl::multi_download(
  "https://r4ds.s3.us-west-2.amazonaws.com/seattle-library-checkouts.csv",
  "data/seattle-library-checkouts.csv", 
  resume = TRUE
)

输出示例:

# A tibble: 1 × 10
  success status_code resumefrom url                    destfile        error
  <lgl>         <int>      <dbl> <chr>                  <chr>           <chr>
1 TRUE            200          0 https://r4ds.s3.us-we… data/seattle-l… <NA>
# ℹ 4 more variables: type <chr>, modified <dttm>, time <dbl>, headers <list>

22.3 打开数据集

此文件大小为 9GB,不算小了,故不建议一次性将其完全加载到内存中。即应该避免使用 read_csv(),而用 arrow::open_dataset()

seattle_csv <- open_dataset(
  sources = "data/seattle-library-checkouts.csv", 
  col_types = schema(ISBN = string()),
  format = "csv"
)

open_dataset() 会扫描几千行数据来推断数据集的结构。但由于前 80,000 行中的 ISBN 列都是空白的,所以此处手动指定该列的类型,以帮助 arrow 理解数据结构。

一旦 open_dataset() 扫描完数据,它就会记录下结构信息并停止读取数据内容,后续只有在明确请求时才会继续读取。

打印 seattle_csv 时,输出的是它的元数据(metadata):

seattle_csv
#> FileSystemDataset with 1 csv file
#> UsageClass: string
#> CheckoutType: string
#> MaterialType: string
#> CheckoutYear: int64
#> CheckoutMonth: int64
#> Checkouts: int64
#> Title: string
#> ISBN: string
#> Creator: string
#> Subjects: string
#> Publisher: string
#> PublicationYear: string

第一行告诉我们:seattle_csv 是一个由本地的 CSV 文件组成的数据集,只有在需要时才会加载到内存中。其余各行显示的是 arrow 推断出的各列数据类型。

想要查看数据的实际内容,使用 glimpse() 函数,会显示行数、列数和部分值:

seattle_csv |> glimpse()
#> FileSystemDataset with 1 csv file
#> 41,389,465 rows x 12 columns
#> $ UsageClass      <string> "Physical", "Physical", "Digital", "Physical", "Ph…
#> $ CheckoutType    <string> "Horizon", "Horizon", "OverDrive", "Horizon", "Hor…
#> $ MaterialType    <string> "BOOK", "BOOK", "EBOOK", "BOOK", "SOUNDDISC", "BOO…
#> $ CheckoutYear     <int64> 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 20…
#> $ CheckoutMonth    <int64> 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6,…
#> $ Checkouts        <int64> 1, 1, 1, 1, 1, 1, 1, 1, 4, 1, 1, 2, 3, 2, 1, 3, 2,…
#> $ Title           <string> "Super rich : a guide to having it all / Russell S…
#> $ ISBN            <string> "", "", "", "", "", "", "", "", "", "", "", "", ""…
#> $ Creator         <string> "Simmons, Russell", "Barclay, James, 1965-", "Tim …
#> $ Subjects        <string> "Self realization, Conduct of life, Attitude Psych…
#> $ Publisher       <string> "Gotham Books,", "Pyr,", "Random House, Inc.", "Di…
#> $ PublicationYear <string> "c2011.", "2010.", "2015", "2005.", "c2004.", "c20…

可以使用 dplyr 的语法对数据进行操作,然后用 collect() 函数触发计算并返回结果。例如,下面这段代码统计了每年的总借阅次数:

seattle_csv |> 
  group_by(CheckoutYear) |> 
  summarise(Checkouts = sum(Checkouts)) |> 
  arrange(CheckoutYear) |> 
  collect()
#> # A tibble: 18 × 2
#>   CheckoutYear Checkouts
#>          <int>     <int>
#> 1         2005   3798685
#> 2         2006   6599318
#> 3         2007   7126627
#> 4         2008   8438486
#> 5         2009   9135167
#> 6         2010   8608966
#> # ℹ 12 more rows

多亏了 arrow,无论数据多大上述代码都能运行。但目前它执行起来仍然较慢(本书作者 Hadley 的电脑上大概花了 10 秒)。考虑到数据量已经很大,这个速度并不算差——不过我们仍然可以通过切换到更高效的数据格式来进一步加速。

22.4 Parquet 格式

为了让上述数据更易于处理,我们需要将其转换为 Parquet 文件格式并进行分块存储。


首先详细介绍一下 Parquet。与 CSV 类似,Parquet 也适用于矩形数据,但它不可以用文本编辑器查看,是专为大数据需求设计的二进制格式。这意味着:

  • 文件更小:Parquet 采用高效编码和压缩技术,减少了磁盘到内存的数据传输量,从而提升速度。
  • 类型系统完善:CSV 需要猜测列类型(如”08-10-2022”应解析为字符串还是日期),而Parquet直接存储数据类型信息。
  • 列式存储:数据按列组织(类似 R 数据框),相比按行存储的 CSV 能显著提升分析性能。
  • 分块存储:支持并行处理不同数据块,某些情况下可以跳过无关数据块。

唯一缺点是不可人工阅读,用 readr::read_file() 查看 Parquet 文件只会显示乱码。


接下来看看分区的概念。随着数据集增大,单文件存储会变得难以维护。对数据进行分区可以让我们分析时只访问部分文件,从而大幅提升性能。


基础理论阐述完毕,下面让我们来重构图书馆数据。我们将按借阅年份(CheckoutYear)分区,这样既能支持按年份筛选,又能生成大小适中的 18 个数据块。

通过 group_by() 函数定义分区,再用 write_dataset() 保存为 Parquet 格式:

pq_path <- "data/seattle-library-checkouts"
seattle_csv |> 
  group_by(CheckoutYear) |> 
  write_dataset(path = pq_path, format = "parquet")

转换耗时约 1 分钟,但后续操作将因此获得极大加速。

查看生成的文件结构:

tibble(
  files = list.files(pq_path, recursive = TRUE),
  size_MB = file.size(file.path(pq_path, files)) / 1024^2
)
#> # A tibble: 18 × 2
#>   files                            size_MB
#>   <chr>                              <dbl>
#> 1 CheckoutYear=2005/part-0.parquet    109.
#> 2 CheckoutYear=2006/part-0.parquet    164.
#> 3 CheckoutYear=2007/part-0.parquet    178.
#> 4 CheckoutYear=2008/part-0.parquet    195.
#> 5 CheckoutYear=2009/part-0.parquet    214.
#> 6 CheckoutYear=2010/part-0.parquet    222.
#> # ℹ 12 more rows

原始 9GB CSV 文件被转换为 18 个 Parquet 文件,且命名格式统一。每个文件 100-300MB,总大小约 4GB(仅为原 CSV 的一半多),体现了 Parquet 的高效存储特性。

22.5 使用 dplyr 操作 Arrow 数据

我们已经创建了 Parquet 文件,现在需要重新读取它们。再次使用 open_dataset(),传入目录路径:

seattle_pq <- open_dataset(pq_path)

接下来我们就可以构建 dplyr 分析流程。例如,统计近五年每月图书借阅量:

query <- seattle_pq |> 
  filter(CheckoutYear >= 2018, MaterialType == "BOOK") |>
  group_by(CheckoutYear, CheckoutMonth) |>
  summarize(TotalCheckouts = sum(Checkouts)) |>
  arrange(CheckoutYear, CheckoutMonth)

这与上一章 dbplyr 的工作方式类似:编写的 dplyr 代码会被自动转换为 Apache Arrow C++ 库能理解的查询,调用 collect() 时才会执行。打印查询对象可预览执行计划:

query
#> FileSystemDataset (query)
#> CheckoutYear: int32
#> CheckoutMonth: int64
#> TotalCheckouts: int64
#> 
#> * Grouped by CheckoutYear
#> * Sorted by CheckoutYear [asc], CheckoutMonth [asc]
#> See $.data for the source Arrow object

随后调用 collect() 即可获取结果:

query |> collect()
#> # A tibble: 58 × 3
#> # Groups:   CheckoutYear [5]
#>   CheckoutYear CheckoutMonth TotalCheckouts
#>          <int>         <int>          <int>
#> 1         2018             1         355101
#> 2         2018             2         309813
#> 3         2018             3         344487
#> 4         2018             4         330988
#> 5         2018             5         318049
#> 6         2018             6         341825
#> # ℹ 52 more rows

另外,通过 arrow::to_duckdb() 函数可转为 DuckDB 数据库:

seattle_pq |> 
  to_duckdb() |>
  filter(CheckoutYear >= 2018, MaterialType == "BOOK") |>
  group_by(CheckoutYear) |>
  summarize(TotalCheckouts = sum(Checkouts)) |>
  arrange(desc(CheckoutYear)) |>
  collect()

此过程无需复制内存,实现了计算环境间的无缝切换。