22 arrow 包
22.1 引言
CSV文件的设计初衷是便于人类阅读,它结构简单且几乎能被所有工具解析。然而CSV的效率较低——将数据读入R需要大量处理工作。本章介绍一种更高效的替代方案:Parquet格式,这是一种基于开放标准、被大数据系统广泛使用的列式存储格式。
Arrow是一个跨语言工具包,专为高效分析和传输大规模数据集而设计,同时也可以用于处理Parquet文件。
本章须加载以下包:
library(tidyverse)
library(arrow)
library(dbplyr, warn.conflicts = FALSE)
library(duckdb)22.2 获取数据
本章教学数据集为:西雅图公共图书馆借阅记录。该数据集包含41,389,465行记录,记录了2005年4月至2022年10月期间每月每本图书的借阅情况。
以下代码将下载该数据的缓存副本(9GB CSV文件)。由于文件较大,建议使用curl::multi_download(),它能显示进度条并支持断点续传:
# 创建数据目录
dir.create("data", showWarnings = FALSE)
# 使用多线程下载(支持断点续传)
curl::multi_download(
"https://r4ds.s3.us-west-2.amazonaws.com/seattle-library-checkouts.csv",
"data/seattle-library-checkouts.csv",
resume = TRUE
)输出示例:
# A tibble: 1 × 10
success status_code resumefrom url destfile error
<lgl> <int> <dbl> <chr> <chr> <chr>
1 TRUE 200 0 https://r4ds.s3.us-we… data/seattle-l… <NA>
# ℹ 4 more variables: type <chr>, modified <dttm>, time <dbl>, headers <list>22.3 打开数据集
此文件大小为 9GB,不算小了,故不建议一次性将其完全加载到内存中。即应该避免使用 read_csv(),而用 arrow::open_dataset():
seattle_csv <- open_dataset(
sources = "data/seattle-library-checkouts.csv",
col_types = schema(ISBN = string()),
format = "csv"
)open_dataset() 会扫描几千行数据来推断数据集的结构。但由于前 80,000 行中的 ISBN 列都是空白的,所以此处手动指定该列的类型,以帮助 arrow 理解数据结构。
一旦 open_dataset() 扫描完数据,它就会记录下结构信息并停止读取数据内容,后续只有在明确请求时才会继续读取。
打印 seattle_csv 时,输出的是它的元数据(metadata):
seattle_csv
#> FileSystemDataset with 1 csv file
#> UsageClass: string
#> CheckoutType: string
#> MaterialType: string
#> CheckoutYear: int64
#> CheckoutMonth: int64
#> Checkouts: int64
#> Title: string
#> ISBN: string
#> Creator: string
#> Subjects: string
#> Publisher: string
#> PublicationYear: string第一行告诉我们:seattle_csv 是一个由本地的 CSV 文件组成的数据集,只有在需要时才会加载到内存中。其余各行显示的是 arrow 推断出的各列数据类型。
想要查看数据的实际内容,使用 glimpse() 函数,会显示行数、列数和部分值:
seattle_csv |> glimpse()
#> FileSystemDataset with 1 csv file
#> 41,389,465 rows x 12 columns
#> $ UsageClass <string> "Physical", "Physical", "Digital", "Physical", "Ph…
#> $ CheckoutType <string> "Horizon", "Horizon", "OverDrive", "Horizon", "Hor…
#> $ MaterialType <string> "BOOK", "BOOK", "EBOOK", "BOOK", "SOUNDDISC", "BOO…
#> $ CheckoutYear <int64> 2016, 2016, 2016, 2016, 2016, 2016, 2016, 2016, 20…
#> $ CheckoutMonth <int64> 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6, 6,…
#> $ Checkouts <int64> 1, 1, 1, 1, 1, 1, 1, 1, 4, 1, 1, 2, 3, 2, 1, 3, 2,…
#> $ Title <string> "Super rich : a guide to having it all / Russell S…
#> $ ISBN <string> "", "", "", "", "", "", "", "", "", "", "", "", ""…
#> $ Creator <string> "Simmons, Russell", "Barclay, James, 1965-", "Tim …
#> $ Subjects <string> "Self realization, Conduct of life, Attitude Psych…
#> $ Publisher <string> "Gotham Books,", "Pyr,", "Random House, Inc.", "Di…
#> $ PublicationYear <string> "c2011.", "2010.", "2015", "2005.", "c2004.", "c20…可以使用 dplyr 的语法对数据进行操作,然后用 collect() 函数触发计算并返回结果。例如,下面这段代码统计了每年的总借阅次数:
seattle_csv |>
group_by(CheckoutYear) |>
summarise(Checkouts = sum(Checkouts)) |>
arrange(CheckoutYear) |>
collect()
#> # A tibble: 18 × 2
#> CheckoutYear Checkouts
#> <int> <int>
#> 1 2005 3798685
#> 2 2006 6599318
#> 3 2007 7126627
#> 4 2008 8438486
#> 5 2009 9135167
#> 6 2010 8608966
#> # ℹ 12 more rows多亏了 arrow,无论数据多大上述代码都能运行。但目前它执行起来仍然较慢(本书作者 Hadley 的电脑上大概花了 10 秒)。考虑到数据量已经很大,这个速度并不算差——不过我们仍然可以通过切换到更高效的数据格式来进一步加速。
22.4 Parquet 格式
为了让上述数据更易于处理,我们需要将其转换为 Parquet 文件格式并进行分块存储。
首先详细介绍一下 Parquet。与 CSV 类似,Parquet 也适用于矩形数据,但它不可以用文本编辑器查看,是专为大数据需求设计的二进制格式。这意味着:
- 文件更小:Parquet 采用高效编码和压缩技术,减少了磁盘到内存的数据传输量,从而提升速度。
- 类型系统完善:CSV 需要猜测列类型(如”08-10-2022”应解析为字符串还是日期),而Parquet直接存储数据类型信息。
- 列式存储:数据按列组织(类似 R 数据框),相比按行存储的 CSV 能显著提升分析性能。
- 分块存储:支持并行处理不同数据块,某些情况下可以跳过无关数据块。
唯一缺点是不可人工阅读,用 readr::read_file() 查看 Parquet 文件只会显示乱码。
接下来看看分区的概念。随着数据集增大,单文件存储会变得难以维护。对数据进行分区可以让我们分析时只访问部分文件,从而大幅提升性能。
基础理论阐述完毕,下面让我们来重构图书馆数据。我们将按借阅年份(CheckoutYear)分区,这样既能支持按年份筛选,又能生成大小适中的 18 个数据块。
通过 group_by() 函数定义分区,再用 write_dataset() 保存为 Parquet 格式:
pq_path <- "data/seattle-library-checkouts"
seattle_csv |>
group_by(CheckoutYear) |>
write_dataset(path = pq_path, format = "parquet")转换耗时约 1 分钟,但后续操作将因此获得极大加速。
查看生成的文件结构:
tibble(
files = list.files(pq_path, recursive = TRUE),
size_MB = file.size(file.path(pq_path, files)) / 1024^2
)
#> # A tibble: 18 × 2
#> files size_MB
#> <chr> <dbl>
#> 1 CheckoutYear=2005/part-0.parquet 109.
#> 2 CheckoutYear=2006/part-0.parquet 164.
#> 3 CheckoutYear=2007/part-0.parquet 178.
#> 4 CheckoutYear=2008/part-0.parquet 195.
#> 5 CheckoutYear=2009/part-0.parquet 214.
#> 6 CheckoutYear=2010/part-0.parquet 222.
#> # ℹ 12 more rows原始 9GB CSV 文件被转换为 18 个 Parquet 文件,且命名格式统一。每个文件 100-300MB,总大小约 4GB(仅为原 CSV 的一半多),体现了 Parquet 的高效存储特性。
22.5 使用 dplyr 操作 Arrow 数据
我们已经创建了 Parquet 文件,现在需要重新读取它们。再次使用 open_dataset(),传入目录路径:
seattle_pq <- open_dataset(pq_path)接下来我们就可以构建 dplyr 分析流程。例如,统计近五年每月图书借阅量:
query <- seattle_pq |>
filter(CheckoutYear >= 2018, MaterialType == "BOOK") |>
group_by(CheckoutYear, CheckoutMonth) |>
summarize(TotalCheckouts = sum(Checkouts)) |>
arrange(CheckoutYear, CheckoutMonth)这与上一章 dbplyr 的工作方式类似:编写的 dplyr 代码会被自动转换为 Apache Arrow C++ 库能理解的查询,调用 collect() 时才会执行。打印查询对象可预览执行计划:
query
#> FileSystemDataset (query)
#> CheckoutYear: int32
#> CheckoutMonth: int64
#> TotalCheckouts: int64
#>
#> * Grouped by CheckoutYear
#> * Sorted by CheckoutYear [asc], CheckoutMonth [asc]
#> See $.data for the source Arrow object随后调用 collect() 即可获取结果:
query |> collect()
#> # A tibble: 58 × 3
#> # Groups: CheckoutYear [5]
#> CheckoutYear CheckoutMonth TotalCheckouts
#> <int> <int> <int>
#> 1 2018 1 355101
#> 2 2018 2 309813
#> 3 2018 3 344487
#> 4 2018 4 330988
#> 5 2018 5 318049
#> 6 2018 6 341825
#> # ℹ 52 more rows另外,通过 arrow::to_duckdb() 函数可转为 DuckDB 数据库:
seattle_pq |>
to_duckdb() |>
filter(CheckoutYear >= 2018, MaterialType == "BOOK") |>
group_by(CheckoutYear) |>
summarize(TotalCheckouts = sum(Checkouts)) |>
arrange(desc(CheckoutYear)) |>
collect()此过程无需复制内存,实现了计算环境间的无缝切换。