From 4197e8cc6c1cfd1045adb6a2c2dd2d9da4869d65 Mon Sep 17 00:00:00 2001 From: Pierrick HYMBERT Date: Tue, 28 Sep 2021 13:05:03 +0200 Subject: [PATCH] Fixes #91 Delete temporary files between RDDs --- .../scala/org/apache/spark/search/rdd/SearchRDDIndexer.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/core/src/main/scala/org/apache/spark/search/rdd/SearchRDDIndexer.scala b/core/src/main/scala/org/apache/spark/search/rdd/SearchRDDIndexer.scala index 088cf7e1..36a0aed0 100644 --- a/core/src/main/scala/org/apache/spark/search/rdd/SearchRDDIndexer.scala +++ b/core/src/main/scala/org/apache/spark/search/rdd/SearchRDDIndexer.scala @@ -57,6 +57,7 @@ private[search] class SearchRDDIndexer[S: ClassTag](sc: SparkContext, protected def streamPartitionIndexZip(context: TaskContext, searchRDDPartition: SearchPartitionIndex[S]): Iterator[Array[Byte]] = { val localIndexDirPath = new File(searchRDDPartition.indexDir) val targetPath = new File(localIndexDirPath.getParent, s"${localIndexDirPath.getName}.zip") + context.addTaskCompletionListener[Unit](_ => FileUtils.delete(targetPath)) zipPartition(localIndexDirPath.toPath, new FileOutputStream(targetPath)) new InterruptibleIterator[Array[Byte]](context, new FileInputStreamIterator(targetPath))