Commit 0f48aaf
[SPARK-29339][R] Support Arrow 0.14 in vectorized dapply and gapply (test it in AppVeyor build)

### What changes were proposed in this pull request?

This PR proposes to:

1. Use `is.data.frame` to check whether an object is a data.frame.
2. Install Arrow and test the Arrow optimization in the AppVeyor build, since this is currently not tested in CI.

### Why are the changes needed?

1. To support SparkR with Arrow 0.14.
2. To check for regressions and verify that it works correctly.

### Does this PR introduce any user-facing change?

```r
df <- createDataFrame(mtcars)
collect(dapply(df,
               function(rdf) {
                 data.frame(rdf$gear + 1)
               },
               structType("gear double")))
```

**Before:**

```
Error in readBin(con, raw(), as.integer(dataLen), endian = "big") :
  invalid 'n' argument
```

**After:**

```
  gear
1    5
2    5
3    5
4    4
5    4
6    4
7    4
8    5
9    5
...
```

### How was this patch tested?

AppVeyor.

Closes apache#25993 from HyukjinKwon/arrow-r-appveyor.

Authored-by: HyukjinKwon <[email protected]>
Signed-off-by: HyukjinKwon <[email protected]>
1 parent 8fabbab commit 0f48aaf
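The core fix swaps `class(x) == "data.frame"` for `is.data.frame(x)`. A minimal base-R sketch of why the class comparison is fragile for data.frame subclasses (the tibble-like object below is constructed by hand purely for illustration):

```r
df <- data.frame(x = 1:3)

# A plain data.frame has exactly one class, so the comparison happens to work:
class(df) == "data.frame"            # TRUE

# Subclasses such as tibbles carry extra classes; the comparison then returns
# a logical vector, and stopifnot() on it fails even though the object is a
# perfectly valid data.frame:
tb <- structure(df, class = c("tbl_df", "tbl", "data.frame"))
class(tb) == "data.frame"            # FALSE FALSE TRUE

# is.data.frame() checks inheritance and stays a single TRUE:
is.data.frame(tb)                    # TRUE
```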

File tree

9 files changed (+30, −38 lines)

R/check-cran.sh

Lines changed: 4 additions & 0 deletions
```diff
@@ -65,6 +65,10 @@ fi
 
 echo "Running CRAN check with $CRAN_CHECK_OPTIONS options"
 
+# Remove this environment variable to allow to check suggested packages once
+# Jenkins installs arrow. See SPARK-29339.
+export _R_CHECK_FORCE_SUGGESTS_=FALSE
+
 if [ -n "$NO_TESTS" ] && [ -n "$NO_MANUAL" ]
 then
   "$R_SCRIPT_PATH/R" CMD check $CRAN_CHECK_OPTIONS "SparkR_$VERSION.tar.gz"
```

R/pkg/DESCRIPTION

Lines changed: 2 additions & 1 deletion
```diff
@@ -22,7 +22,8 @@ Suggests:
     rmarkdown,
     testthat,
     e1071,
-    survival
+    survival,
+    arrow
 Collate:
     'schema.R'
     'generics.R'
```

R/pkg/R/SQLContext.R

Lines changed: 4 additions & 16 deletions
```diff
@@ -148,19 +148,7 @@ getDefaultSqlSource <- function() {
 }
 
 writeToFileInArrow <- function(fileName, rdf, numPartitions) {
-  requireNamespace1 <- requireNamespace
-
-  # R API in Arrow is not yet released in CRAN. CRAN requires to add the
-  # package in requireNamespace at DESCRIPTION. Later, CRAN checks if the package is available
-  # or not. Therefore, it works around by avoiding direct requireNamespace.
-  # Currently, as of Arrow 0.12.0, it can be installed by install_github. See ARROW-3204.
-  if (requireNamespace1("arrow", quietly = TRUE)) {
-    record_batch <- get("record_batch", envir = asNamespace("arrow"), inherits = FALSE)
-    RecordBatchStreamWriter <- get(
-      "RecordBatchStreamWriter", envir = asNamespace("arrow"), inherits = FALSE)
-    FileOutputStream <- get(
-      "FileOutputStream", envir = asNamespace("arrow"), inherits = FALSE)
-
+  if (requireNamespace("arrow", quietly = TRUE)) {
     numPartitions <- if (!is.null(numPartitions)) {
       numToInt(numPartitions)
     } else {
@@ -176,11 +164,11 @@ writeToFileInArrow <- function(fileName, rdf, numPartitions) {
     stream_writer <- NULL
     tryCatch({
       for (rdf_slice in rdf_slices) {
-        batch <- record_batch(rdf_slice)
+        batch <- arrow::record_batch(rdf_slice)
         if (is.null(stream_writer)) {
-          stream <- FileOutputStream(fileName)
+          stream <- arrow::FileOutputStream(fileName)
           schema <- batch$schema
-          stream_writer <- RecordBatchStreamWriter(stream, schema)
+          stream_writer <- arrow::RecordBatchStreamWriter(stream, schema)
         }
 
         stream_writer$write_batch(batch)
```
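`writeToFileInArrow` cuts `rdf` into `rdf_slices` so that each slice becomes one Arrow record batch. The diff elides how the slicing is done; a hypothetical base-R sketch of one way to produce roughly equal slices (`slice_rdf` is an illustrative name, not SparkR's helper):

```r
slice_rdf <- function(rdf, numPartitions) {
  # Assign each row a slice index 1..numPartitions, then split on it.
  idx <- ceiling(seq_len(nrow(rdf)) / (nrow(rdf) / numPartitions))
  split(rdf, idx)
}

slices <- slice_rdf(mtcars, 4)
length(slices)                 # 4 slices
sum(vapply(slices, nrow, 1L))  # 32 rows in total, none lost
```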

R/pkg/R/deserialize.R

Lines changed: 2 additions & 6 deletions
```diff
@@ -232,11 +232,7 @@ readMultipleObjectsWithKeys <- function(inputCon) {
 }
 
 readDeserializeInArrow <- function(inputCon) {
-  # This is a hack to avoid CRAN check. Arrow is not uploaded into CRAN now. See ARROW-3204.
-  requireNamespace1 <- requireNamespace
-  if (requireNamespace1("arrow", quietly = TRUE)) {
-    RecordBatchStreamReader <- get(
-      "RecordBatchStreamReader", envir = asNamespace("arrow"), inherits = FALSE)
+  if (requireNamespace("arrow", quietly = TRUE)) {
     # Arrow drops `as_tibble` since 0.14.0, see ARROW-5190.
     useAsTibble <- exists("as_tibble", envir = asNamespace("arrow"))
 
@@ -246,7 +242,7 @@ readDeserializeInArrow <- function(inputCon) {
     # for now.
     dataLen <- readInt(inputCon)
     arrowData <- readBin(inputCon, raw(), as.integer(dataLen), endian = "big")
-    batches <- RecordBatchStreamReader(arrowData)$batches()
+    batches <- arrow::RecordBatchStreamReader(arrowData)$batches()
 
     if (useAsTibble) {
       as_tibble <- get("as_tibble", envir = asNamespace("arrow"))
```
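The surviving `exists(...)`/`get(...)` lines are a version-tolerance probe: Arrow dropped `as_tibble` in 0.14.0 (ARROW-5190), so the reader checks the namespace before choosing a conversion path. A generic sketch of that probe, demonstrated against the always-available `stats` namespace so it runs anywhere:

```r
# Return TRUE only when the package is installed AND exports/defines
# the symbol in its namespace; FALSE triggers the fallback code path.
has_fn <- function(pkg, fn) {
  requireNamespace(pkg, quietly = TRUE) &&
    exists(fn, envir = asNamespace(pkg), inherits = FALSE)
}

has_fn("stats", "median")       # TRUE on any R installation
has_fn("stats", "no_such_fn")   # FALSE: take the other branch
```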

R/pkg/R/serialize.R

Lines changed: 2 additions & 6 deletions
```diff
@@ -222,15 +222,11 @@ writeArgs <- function(con, args) {
 }
 
 writeSerializeInArrow <- function(conn, df) {
-  # This is a hack to avoid CRAN check. Arrow is not uploaded into CRAN now. See ARROW-3204.
-  requireNamespace1 <- requireNamespace
-  if (requireNamespace1("arrow", quietly = TRUE)) {
-    write_arrow <- get("write_arrow", envir = asNamespace("arrow"), inherits = FALSE)
-
+  if (requireNamespace("arrow", quietly = TRUE)) {
     # There looks no way to send each batch in streaming format via socket
     # connection. See ARROW-4512.
     # So, it writes the whole Arrow streaming-formatted binary at once for now.
-    writeRaw(conn, write_arrow(df, raw()))
+    writeRaw(conn, arrow::write_arrow(df, raw()))
   } else {
     stop("'arrow' package should be installed.")
   }
```
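All three R files now follow the same shape: guard with `requireNamespace()` and call through `::`, which keeps `arrow` a purely optional (Suggests) dependency. A generic sketch of the pattern, using base R's `getExportedValue` and the always-present `stats` package so it runs without arrow installed (`call_if_available` is an illustrative name):

```r
call_if_available <- function(pkg, fn, ...) {
  if (requireNamespace(pkg, quietly = TRUE)) {
    # Resolve the function in the package namespace only after the guard,
    # so sourcing this file never requires the package itself.
    do.call(getExportedValue(pkg, fn), list(...))
  } else {
    stop(sprintf("'%s' package should be installed.", pkg))
  }
}

call_if_available("stats", "median", c(1, 2, 10))  # 2
```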

R/pkg/inst/worker/worker.R

Lines changed: 1 addition & 1 deletion
```diff
@@ -50,7 +50,7 @@ compute <- function(mode, partition, serializer, deserializer, key,
   } else {
     # Check to see if inputData is a valid data.frame
     stopifnot(deserializer == "byte" || deserializer == "arrow")
-    stopifnot(class(inputData) == "data.frame")
+    stopifnot(is.data.frame(inputData))
   }
 
   if (mode == 2) {
```

R/pkg/tests/fulltests/test_sparkSQL_arrow.R

Lines changed: 4 additions & 4 deletions
```diff
@@ -101,7 +101,7 @@ test_that("dapply() Arrow optimization", {
   tryCatch({
     ret <- dapply(df,
                   function(rdf) {
-                    stopifnot(class(rdf) == "data.frame")
+                    stopifnot(is.data.frame(rdf))
                     rdf
                   },
                   schema(df))
@@ -115,7 +115,7 @@ test_that("dapply() Arrow optimization", {
   tryCatch({
     ret <- dapply(df,
                   function(rdf) {
-                    stopifnot(class(rdf) == "data.frame")
+                    stopifnot(is.data.frame(rdf))
                     # mtcars' hp is more then 50.
                     stopifnot(all(rdf$hp > 50))
                     rdf
@@ -199,7 +199,7 @@ test_that("gapply() Arrow optimization", {
                     if (length(key) > 0) {
                       stopifnot(is.numeric(key[[1]]))
                     }
-                    stopifnot(class(grouped) == "data.frame")
+                    stopifnot(is.data.frame(grouped))
                     grouped
                   },
                   schema(df))
@@ -217,7 +217,7 @@ test_that("gapply() Arrow optimization", {
                     if (length(key) > 0) {
                       stopifnot(is.numeric(key[[1]]))
                     }
-                    stopifnot(class(grouped) == "data.frame")
+                    stopifnot(is.data.frame(grouped))
                     stopifnot(length(colnames(grouped)) == 11)
                     # mtcars' hp is more then 50.
                     stopifnot(all(grouped$hp > 50))
```
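The tests above wrap each `dapply()`/`gapply()` call in `tryCatch()` so the Arrow conf can be restored even when an assertion fails. A generic base-R sketch of that set-then-restore pattern (`with_restore` and the flag are illustrative, not SparkR names):

```r
with_restore <- function(set, restore, body) {
  set()
  # finally = runs whether body() returns or throws, mirroring how the
  # tests flip spark.sql.execution.arrow.enabled back afterwards.
  tryCatch(body(), finally = restore())
}

flag <- "old"
out <- with_restore(function() flag <<- "new",
                    function() flag <<- "old",
                    function() nchar(flag))
out   # 3: body saw the temporarily-set value "new"
flag  # "old" again, restored even if body() had thrown
```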

appveyor.yml

Lines changed: 2 additions & 2 deletions
```diff
@@ -42,13 +42,13 @@ install:
   # Install maven and dependencies
   - ps: .\dev\appveyor-install-dependencies.ps1
   # Required package for R unit tests
-  - cmd: R -e "install.packages(c('knitr', 'rmarkdown', 'e1071', 'survival'), repos='https://cloud.r-project.org/')"
+  - cmd: R -e "install.packages(c('knitr', 'rmarkdown', 'e1071', 'survival', 'arrow'), repos='https://cloud.r-project.org/')"
   # Here, we use the fixed version of testthat. For more details, please see SPARK-22817.
   # As of devtools 2.1.0, it requires testthat higher then 2.1.1 as a dependency. SparkR test requires testthat 1.0.2.
   # Therefore, we don't use devtools but installs it directly from the archive including its dependencies.
   - cmd: R -e "install.packages(c('crayon', 'praise', 'R6'), repos='https://cloud.r-project.org/')"
   - cmd: R -e "install.packages('https://cloud.r-project.org/src/contrib/Archive/testthat/testthat_1.0.2.tar.gz', repos=NULL, type='source')"
-  - cmd: R -e "packageVersion('knitr'); packageVersion('rmarkdown'); packageVersion('testthat'); packageVersion('e1071'); packageVersion('survival')"
+  - cmd: R -e "packageVersion('knitr'); packageVersion('rmarkdown'); packageVersion('testthat'); packageVersion('e1071'); packageVersion('survival'); packageVersion('arrow')"
 
 build_script:
   # '-Djna.nosys=true' is required to avoid kernel32.dll load failure.
```

docs/sparkr.md

Lines changed: 9 additions & 2 deletions
````diff
@@ -663,13 +663,20 @@ Apache Arrow is an in-memory columnar data format that is used in Spark to effic
 
 ## Ensure Arrow Installed
 
-Currently, Arrow R library is not on CRAN yet [ARROW-3204](https://issues.apache.org/jira/browse/ARROW-3204). Therefore, it should be installed directly from Github. You can use `remotes::install_github` as below.
+Arrow R library is available on CRAN as of [ARROW-3204](https://issues.apache.org/jira/browse/ARROW-3204). It can be installed as below.
+
+```bash
+Rscript -e 'install.packages("arrow", repos="https://cloud.r-project.org/")'
+```
+
+If you need to install old versions, it should be installed directly from Github. You can use `remotes::install_github` as below.
 
 ```bash
 Rscript -e 'remotes::install_github("apache/arrow@apache-arrow-0.12.1", subdir = "r")'
 ```
 
-`apache-arrow-0.12.1` is a version tag that can be checked in [Arrow at Github](https://github.com/apache/arrow/releases). You must ensure that Arrow R package is installed and available on all cluster nodes. The current supported version is 0.12.1.
+`apache-arrow-0.12.1` is a version tag that can be checked in [Arrow at Github](https://github.com/apache/arrow/releases). You must ensure that Arrow R package is installed and available on all cluster nodes.
+The current supported minimum version is 0.12.1; however, this might change between the minor releases since Arrow optimization in SparkR is experimental.
 
 ## Enabling for Conversion to/from R DataFrame, `dapply` and `gapply`
````
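A quick way to confirm on a given node that the package is actually usable (base R only; nothing here assumes arrow is present, since `requireNamespace()` simply returns `FALSE` when it is not):

```r
# Prints the installed arrow version, or a message when it is missing.
if (requireNamespace("arrow", quietly = TRUE)) {
  print(utils::packageVersion("arrow"))
} else {
  message("arrow is not installed on this node")
}
```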
