Skip to content

Commit 59c743b

Browse files
committed
Spark: Remove closing of IO in SerializableTable*
This is to fix: apache#12046. To summarize, the issue is that Spark can remove broadcast variables from memory and persist them to disk when memory needs to be freed. When this happens, the IO object is closed even if it is still being used by tasks. This fixes the issue by no longer closing the IO object when the serializable table is closed. The IO objects should instead be closed by thread finalizers.
1 parent e89798e commit 59c743b

File tree

6 files changed

+16
-91
lines changed

6 files changed

+16
-91
lines changed

spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java

Lines changed: 4 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -22,30 +22,22 @@
2222
import org.apache.iceberg.SerializableTable;
2323
import org.apache.iceberg.Table;
2424
import org.apache.spark.util.KnownSizeEstimation;
25-
import org.slf4j.Logger;
26-
import org.slf4j.LoggerFactory;
2725

2826
/**
2927
* This class provides a serializable table with a known size estimate. Spark calls its
3028
* SizeEstimator class when broadcasting variables and this can be an expensive operation, so
3129
* providing a known size estimate allows that operation to be skipped.
3230
*
33-
* <p>This class also implements AutoCloseable to avoid leaking resources upon broadcasting.
34-
* Broadcast variables are destroyed and cleaned up on the driver and executors once they are
35-
* garbage collected on the driver. The implementation ensures only resources used by copies of the
36-
* main table are released.
31+
* <p>This class explicitly does *not* close IO objects, because broadcast variables can be removed from memory and
32+
* persisted to disk by Spark in the case that memory is needed, even if associated IO objects are still in use.
3733
*/
3834
public class SerializableTableWithSize extends SerializableTable
39-
implements KnownSizeEstimation, AutoCloseable {
35+
implements KnownSizeEstimation {
4036

41-
private static final Logger LOG = LoggerFactory.getLogger(SerializableTableWithSize.class);
4237
private static final long SIZE_ESTIMATE = 32_768L;
4338

44-
private final transient Object serializationMarker;
45-
4639
protected SerializableTableWithSize(Table table) {
4740
super(table);
48-
this.serializationMarker = new Object();
4941
}
5042

5143
@Override
@@ -61,38 +53,16 @@ public static Table copyOf(Table table) {
6153
}
6254
}
6355

64-
@Override
65-
public void close() throws Exception {
66-
if (serializationMarker == null) {
67-
LOG.info("Releasing resources");
68-
io().close();
69-
}
70-
}
71-
7256
public static class SerializableMetadataTableWithSize extends SerializableMetadataTable
73-
implements KnownSizeEstimation, AutoCloseable {
74-
75-
private static final Logger LOG =
76-
LoggerFactory.getLogger(SerializableMetadataTableWithSize.class);
77-
78-
private final transient Object serializationMarker;
57+
implements KnownSizeEstimation {
7958

8059
protected SerializableMetadataTableWithSize(BaseMetadataTable metadataTable) {
8160
super(metadataTable);
82-
this.serializationMarker = new Object();
8361
}
8462

8563
@Override
8664
public long estimatedSize() {
8765
return SIZE_ESTIMATE;
8866
}
89-
90-
@Override
91-
public void close() throws Exception {
92-
if (serializationMarker == null) {
93-
LOG.info("Releasing resources");
94-
io().close();
95-
}
96-
}
9767
}
9868
}

spark/v3.3/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import static org.apache.iceberg.types.Types.NestedField.required;
2323
import static org.mockito.Mockito.never;
2424
import static org.mockito.Mockito.spy;
25-
import static org.mockito.Mockito.times;
2625
import static org.mockito.Mockito.verify;
2726
import static org.mockito.Mockito.when;
2827

@@ -103,7 +102,7 @@ public void testCloseSerializableTableKryoSerialization() throws Exception {
103102
((AutoCloseable) serializableTableCopy).close(); // mimics close on executors
104103

105104
verify(spyIO, never()).close();
106-
verify(spyFileIOCopy, times(1)).close();
105+
verify(spyFileIOCopy, never()).close();
107106
}
108107
}
109108

@@ -124,7 +123,7 @@ public void testCloseSerializableTableJavaSerialization() throws Exception {
124123
((AutoCloseable) serializableTableCopy).close(); // mimics close on executors
125124

126125
verify(spyIO, never()).close();
127-
verify(spyFileIOCopy, times(1)).close();
126+
verify(spyFileIOCopy, never()).close();
128127
}
129128
}
130129

spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java

Lines changed: 3 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -23,30 +23,23 @@
2323
import org.apache.iceberg.Table;
2424
import org.apache.iceberg.spark.SparkExecutorCache;
2525
import org.apache.spark.util.KnownSizeEstimation;
26-
import org.slf4j.Logger;
27-
import org.slf4j.LoggerFactory;
2826

2927
/**
3028
* This class provides a serializable table with a known size estimate. Spark calls its
3129
* SizeEstimator class when broadcasting variables and this can be an expensive operation, so
3230
* providing a known size estimate allows that operation to be skipped.
3331
*
34-
* <p>This class also implements AutoCloseable to avoid leaking resources upon broadcasting.
35-
* Broadcast variables are destroyed and cleaned up on the driver and executors once they are
36-
* garbage collected on the driver. The implementation ensures only resources used by copies of the
37-
* main table are released.
32+
* <p>This class also implements AutoCloseable, but does *not* close IO objects, because broadcast
33+
* variables can be removed from memory and persisted to disk by Spark in the case that memory is
34+
* needed, even if associated IO objects are still in use.
3835
*/
3936
public class SerializableTableWithSize extends SerializableTable
4037
implements KnownSizeEstimation, AutoCloseable {
4138

42-
private static final Logger LOG = LoggerFactory.getLogger(SerializableTableWithSize.class);
4339
private static final long SIZE_ESTIMATE = 32_768L;
4440

45-
private final transient Object serializationMarker;
46-
4741
protected SerializableTableWithSize(Table table) {
4842
super(table);
49-
this.serializationMarker = new Object();
5043
}
5144

5245
@Override
@@ -64,24 +57,14 @@ public static Table copyOf(Table table) {
6457

6558
@Override
6659
public void close() throws Exception {
67-
if (serializationMarker == null) {
68-
LOG.info("Releasing resources");
69-
io().close();
70-
}
7160
invalidateCache(name());
7261
}
7362

7463
public static class SerializableMetadataTableWithSize extends SerializableMetadataTable
7564
implements KnownSizeEstimation, AutoCloseable {
7665

77-
private static final Logger LOG =
78-
LoggerFactory.getLogger(SerializableMetadataTableWithSize.class);
79-
80-
private final transient Object serializationMarker;
81-
8266
protected SerializableMetadataTableWithSize(BaseMetadataTable metadataTable) {
8367
super(metadataTable);
84-
this.serializationMarker = new Object();
8568
}
8669

8770
@Override
@@ -91,10 +74,6 @@ public long estimatedSize() {
9174

9275
@Override
9376
public void close() throws Exception {
94-
if (serializationMarker == null) {
95-
LOG.info("Releasing resources");
96-
io().close();
97-
}
9877
invalidateCache(name());
9978
}
10079
}

spark/v3.4/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import static org.apache.iceberg.types.Types.NestedField.required;
2323
import static org.mockito.Mockito.never;
2424
import static org.mockito.Mockito.spy;
25-
import static org.mockito.Mockito.times;
2625
import static org.mockito.Mockito.verify;
2726
import static org.mockito.Mockito.when;
2827

@@ -103,7 +102,7 @@ public void testCloseSerializableTableKryoSerialization() throws Exception {
103102
((AutoCloseable) serializableTableCopy).close(); // mimics close on executors
104103

105104
verify(spyIO, never()).close();
106-
verify(spyFileIOCopy, times(1)).close();
105+
verify(spyFileIOCopy, never()).close();
107106
}
108107
}
109108

@@ -124,7 +123,7 @@ public void testCloseSerializableTableJavaSerialization() throws Exception {
124123
((AutoCloseable) serializableTableCopy).close(); // mimics close on executors
125124

126125
verify(spyIO, never()).close();
127-
verify(spyFileIOCopy, times(1)).close();
126+
verify(spyFileIOCopy, never()).close();
128127
}
129128
}
130129

spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SerializableTableWithSize.java

Lines changed: 3 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -23,30 +23,23 @@
2323
import org.apache.iceberg.Table;
2424
import org.apache.iceberg.spark.SparkExecutorCache;
2525
import org.apache.spark.util.KnownSizeEstimation;
26-
import org.slf4j.Logger;
27-
import org.slf4j.LoggerFactory;
2826

2927
/**
3028
* This class provides a serializable table with a known size estimate. Spark calls its
3129
* SizeEstimator class when broadcasting variables and this can be an expensive operation, so
3230
* providing a known size estimate allows that operation to be skipped.
3331
*
34-
* <p>This class also implements AutoCloseable to avoid leaking resources upon broadcasting.
35-
* Broadcast variables are destroyed and cleaned up on the driver and executors once they are
36-
* garbage collected on the driver. The implementation ensures only resources used by copies of the
37-
* main table are released.
32+
* <p>This class also implements AutoCloseable, but does *not* close IO objects, because broadcast
33+
* variables can be removed from memory and persisted to disk by Spark in the case that memory is
34+
* needed, even if associated IO objects are still in use.
3835
*/
3936
public class SerializableTableWithSize extends SerializableTable
4037
implements KnownSizeEstimation, AutoCloseable {
4138

42-
private static final Logger LOG = LoggerFactory.getLogger(SerializableTableWithSize.class);
4339
private static final long SIZE_ESTIMATE = 32_768L;
4440

45-
private final transient Object serializationMarker;
46-
4741
protected SerializableTableWithSize(Table table) {
4842
super(table);
49-
this.serializationMarker = new Object();
5043
}
5144

5245
@Override
@@ -64,24 +57,14 @@ public static Table copyOf(Table table) {
6457

6558
@Override
6659
public void close() throws Exception {
67-
if (serializationMarker == null) {
68-
LOG.info("Releasing resources");
69-
io().close();
70-
}
7160
invalidateCache(name());
7261
}
7362

7463
public static class SerializableMetadataTableWithSize extends SerializableMetadataTable
7564
implements KnownSizeEstimation, AutoCloseable {
7665

77-
private static final Logger LOG =
78-
LoggerFactory.getLogger(SerializableMetadataTableWithSize.class);
79-
80-
private final transient Object serializationMarker;
81-
8266
protected SerializableMetadataTableWithSize(BaseMetadataTable metadataTable) {
8367
super(metadataTable);
84-
this.serializationMarker = new Object();
8568
}
8669

8770
@Override
@@ -91,10 +74,6 @@ public long estimatedSize() {
9174

9275
@Override
9376
public void close() throws Exception {
94-
if (serializationMarker == null) {
95-
LOG.info("Releasing resources");
96-
io().close();
97-
}
9877
invalidateCache(name());
9978
}
10079
}

spark/v3.5/spark/src/test/java/org/apache/iceberg/TestTableSerialization.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import static org.assertj.core.api.Assertions.assertThat;
2424
import static org.mockito.Mockito.never;
2525
import static org.mockito.Mockito.spy;
26-
import static org.mockito.Mockito.times;
2726
import static org.mockito.Mockito.verify;
2827
import static org.mockito.Mockito.when;
2928

@@ -100,7 +99,7 @@ public void testCloseSerializableTableKryoSerialization() throws Exception {
10099
((AutoCloseable) serializableTableCopy).close(); // mimics close on executors
101100

102101
verify(spyIO, never()).close();
103-
verify(spyFileIOCopy, times(1)).close();
102+
verify(spyFileIOCopy, never()).close();
104103
}
105104
}
106105

@@ -121,7 +120,7 @@ public void testCloseSerializableTableJavaSerialization() throws Exception {
121120
((AutoCloseable) serializableTableCopy).close(); // mimics close on executors
122121

123122
verify(spyIO, never()).close();
124-
verify(spyFileIOCopy, times(1)).close();
123+
verify(spyFileIOCopy, never()).close();
125124
}
126125
}
127126

0 commit comments

Comments
 (0)