Skip to content

Commit 67e084c

Browse files
advancedxyRussellSpitzeramogh-jahagirdar
authored
API: Support removeUnusedSpecs in ExpireSnapshots (apache#10755)
Implement an API in ExpireSnapshots to remove partition specs no longer in use by performing a reachability analysis, so that metadata sizes can be maintained. Co-authored-by: Russell_Spitzer <[email protected]> Co-authored-by: Amogh Jahagirdar <[email protected]>
1 parent 1cbc163 commit 67e084c

File tree

11 files changed

+394
-6
lines changed

11 files changed

+394
-6
lines changed

api/src/main/java/org/apache/iceberg/ExpireSnapshots.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,4 +118,15 @@ public interface ExpireSnapshots extends PendingUpdate<List<Snapshot>> {
118118
* @return this for method chaining
119119
*/
120120
ExpireSnapshots cleanExpiredFiles(boolean clean);
121+
122+
/**
123+
* Enable cleaning up unused metadata, such as partition specs, schemas, etc.
124+
*
125+
* @param clean remove unused partition specs, schemas, or other metadata when true
126+
* @return this for method chaining
127+
*/
128+
default ExpireSnapshots cleanExpiredMetadata(boolean clean) {
129+
throw new UnsupportedOperationException(
130+
this.getClass().getName() + " doesn't implement cleanExpiredMetadata");
131+
}
121132
}

core/src/main/java/org/apache/iceberg/IncrementalFileCleanup.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.io.IOException;
2222
import java.util.List;
23+
import java.util.Map;
2324
import java.util.Set;
2425
import java.util.concurrent.ConcurrentHashMap;
2526
import java.util.concurrent.ExecutorService;
@@ -256,7 +257,8 @@ public void cleanFiles(TableMetadata beforeExpiration, TableMetadata afterExpira
256257
});
257258

258259
Set<String> filesToDelete =
259-
findFilesToDelete(manifestsToScan, manifestsToRevert, validIds, afterExpiration);
260+
findFilesToDelete(
261+
manifestsToScan, manifestsToRevert, validIds, beforeExpiration.specsById());
260262

261263
deleteFiles(filesToDelete, "data");
262264
deleteFiles(manifestsToDelete, "manifest");
@@ -273,7 +275,7 @@ private Set<String> findFilesToDelete(
273275
Set<ManifestFile> manifestsToScan,
274276
Set<ManifestFile> manifestsToRevert,
275277
Set<Long> validIds,
276-
TableMetadata current) {
278+
Map<Integer, PartitionSpec> specsById) {
277279
Set<String> filesToDelete = ConcurrentHashMap.newKeySet();
278280
Tasks.foreach(manifestsToScan)
279281
.retry(3)
@@ -285,8 +287,7 @@ private Set<String> findFilesToDelete(
285287
.run(
286288
manifest -> {
287289
// the manifest has deletes, scan it to find files to delete
288-
try (ManifestReader<?> reader =
289-
ManifestFiles.open(manifest, fileIO, current.specsById())) {
290+
try (ManifestReader<?> reader = ManifestFiles.open(manifest, fileIO, specsById)) {
290291
for (ManifestEntry<?> entry : reader.entries()) {
291292
// if the snapshot ID of the DELETE entry is no longer valid, the data can be
292293
// deleted
@@ -311,8 +312,7 @@ private Set<String> findFilesToDelete(
311312
.run(
312313
manifest -> {
313314
// the manifest has deletes, scan it to find files to delete
314-
try (ManifestReader<?> reader =
315-
ManifestFiles.open(manifest, fileIO, current.specsById())) {
315+
try (ManifestReader<?> reader = ManifestFiles.open(manifest, fileIO, specsById)) {
316316
for (ManifestEntry<?> entry : reader.entries()) {
317317
// delete any ADDED file from manifests that were reverted
318318
if (entry.status() == ManifestEntry.Status.ADDED) {

core/src/main/java/org/apache/iceberg/MetadataUpdate.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,23 @@ public void applyTo(TableMetadata.Builder metadataBuilder) {
175175
}
176176
}
177177

178+
class RemovePartitionSpecs implements MetadataUpdate {
179+
private final Set<Integer> specIds;
180+
181+
public RemovePartitionSpecs(Set<Integer> specIds) {
182+
this.specIds = specIds;
183+
}
184+
185+
public Set<Integer> specIds() {
186+
return specIds;
187+
}
188+
189+
@Override
190+
public void applyTo(TableMetadata.Builder metadataBuilder) {
191+
metadataBuilder.removeSpecs(specIds);
192+
}
193+
}
194+
178195
class AddSortOrder implements MetadataUpdate {
179196
private final UnboundSortOrder sortOrder;
180197

core/src/main/java/org/apache/iceberg/MetadataUpdateParser.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ private MetadataUpdateParser() {}
5959
static final String SET_CURRENT_VIEW_VERSION = "set-current-view-version";
6060
static final String SET_PARTITION_STATISTICS = "set-partition-statistics";
6161
static final String REMOVE_PARTITION_STATISTICS = "remove-partition-statistics";
62+
static final String REMOVE_PARTITION_SPECS = "remove-partition-specs";
6263

6364
// AssignUUID
6465
private static final String UUID = "uuid";
@@ -126,6 +127,9 @@ private MetadataUpdateParser() {}
126127
// SetCurrentViewVersion
127128
private static final String VIEW_VERSION_ID = "view-version-id";
128129

130+
// RemovePartitionSpecs
131+
private static final String SPEC_IDS = "spec-ids";
132+
129133
private static final Map<Class<? extends MetadataUpdate>, String> ACTIONS =
130134
ImmutableMap.<Class<? extends MetadataUpdate>, String>builder()
131135
.put(MetadataUpdate.AssignUUID.class, ASSIGN_UUID)
@@ -149,6 +153,7 @@ private MetadataUpdateParser() {}
149153
.put(MetadataUpdate.SetLocation.class, SET_LOCATION)
150154
.put(MetadataUpdate.AddViewVersion.class, ADD_VIEW_VERSION)
151155
.put(MetadataUpdate.SetCurrentViewVersion.class, SET_CURRENT_VIEW_VERSION)
156+
.put(MetadataUpdate.RemovePartitionSpecs.class, REMOVE_PARTITION_SPECS)
152157
.buildOrThrow();
153158

154159
public static String toJson(MetadataUpdate metadataUpdate) {
@@ -241,6 +246,9 @@ public static void toJson(MetadataUpdate metadataUpdate, JsonGenerator generator
241246
writeSetCurrentViewVersionId(
242247
(MetadataUpdate.SetCurrentViewVersion) metadataUpdate, generator);
243248
break;
249+
case REMOVE_PARTITION_SPECS:
250+
writeRemovePartitionSpecs((MetadataUpdate.RemovePartitionSpecs) metadataUpdate, generator);
251+
break;
244252
default:
245253
throw new IllegalArgumentException(
246254
String.format(
@@ -312,6 +320,8 @@ public static MetadataUpdate fromJson(JsonNode jsonNode) {
312320
return readAddViewVersion(jsonNode);
313321
case SET_CURRENT_VIEW_VERSION:
314322
return readCurrentViewVersionId(jsonNode);
323+
case REMOVE_PARTITION_SPECS:
324+
return readRemovePartitionSpecs(jsonNode);
315325
default:
316326
throw new UnsupportedOperationException(
317327
String.format("Cannot convert metadata update action to json: %s", action));
@@ -447,6 +457,11 @@ private static void writeSetCurrentViewVersionId(
447457
gen.writeNumberField(VIEW_VERSION_ID, metadataUpdate.versionId());
448458
}
449459

460+
private static void writeRemovePartitionSpecs(
461+
MetadataUpdate.RemovePartitionSpecs metadataUpdate, JsonGenerator gen) throws IOException {
462+
JsonUtil.writeIntegerArray(SPEC_IDS, metadataUpdate.specIds(), gen);
463+
}
464+
450465
private static MetadataUpdate readAssignUUID(JsonNode node) {
451466
String uuid = JsonUtil.getString(UUID, node);
452467
return new MetadataUpdate.AssignUUID(uuid);
@@ -596,4 +611,8 @@ private static MetadataUpdate readAddViewVersion(JsonNode node) {
596611
private static MetadataUpdate readCurrentViewVersionId(JsonNode node) {
597612
return new MetadataUpdate.SetCurrentViewVersion(JsonUtil.getInt(VIEW_VERSION_ID, node));
598613
}
614+
615+
private static MetadataUpdate readRemovePartitionSpecs(JsonNode node) {
616+
return new MetadataUpdate.RemovePartitionSpecs(JsonUtil.getIntegerSet(SPEC_IDS, node));
617+
}
599618
}

core/src/main/java/org/apache/iceberg/RemoveSnapshots.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@
4141
import java.util.Set;
4242
import java.util.concurrent.ExecutorService;
4343
import java.util.function.Consumer;
44+
import java.util.stream.Collectors;
4445
import org.apache.iceberg.exceptions.CommitFailedException;
4546
import org.apache.iceberg.exceptions.ValidationException;
4647
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -85,6 +86,7 @@ public void accept(String file) {
8586
private ExecutorService planExecutorService = ThreadPools.getWorkerPool();
8687
private Boolean incrementalCleanup;
8788
private boolean specifiedSnapshotId = false;
89+
private boolean cleanExpiredMetadata = false;
8890

8991
RemoveSnapshots(TableOperations ops) {
9092
this.ops = ops;
@@ -159,6 +161,12 @@ public ExpireSnapshots planWith(ExecutorService executorService) {
159161
return this;
160162
}
161163

164+
@Override
165+
public ExpireSnapshots cleanExpiredMetadata(boolean clean) {
166+
this.cleanExpiredMetadata = clean;
167+
return this;
168+
}
169+
162170
@Override
163171
public List<Snapshot> apply() {
164172
TableMetadata updated = internalApply();
@@ -209,6 +217,26 @@ private TableMetadata internalApply() {
209217
.forEach(idsToRemove::add);
210218
updatedMetaBuilder.removeSnapshots(idsToRemove);
211219

220+
if (cleanExpiredMetadata) {
221+
// TODO: Support cleaning expired schema as well.
222+
Set<Integer> reachableSpecs = Sets.newConcurrentHashSet();
223+
reachableSpecs.add(base.defaultSpecId());
224+
Tasks.foreach(idsToRetain)
225+
.executeWith(planExecutorService)
226+
.run(
227+
snapshot ->
228+
base.snapshot(snapshot).allManifests(ops.io()).stream()
229+
.map(ManifestFile::partitionSpecId)
230+
.forEach(reachableSpecs::add));
231+
232+
Set<Integer> specsToRemove =
233+
base.specs().stream()
234+
.map(PartitionSpec::specId)
235+
.filter(specId -> !reachableSpecs.contains(specId))
236+
.collect(Collectors.toSet());
237+
updatedMetaBuilder.removeSpecs(specsToRemove);
238+
}
239+
212240
return updatedMetaBuilder.build();
213241
}
214242

core/src/main/java/org/apache/iceberg/TableMetadata.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1138,6 +1138,19 @@ public Builder setDefaultPartitionSpec(int specId) {
11381138
return this;
11391139
}
11401140

1141+
Builder removeSpecs(Iterable<Integer> specIds) {
1142+
Set<Integer> specIdsToRemove = Sets.newHashSet(specIds);
1143+
Preconditions.checkArgument(
1144+
!specIdsToRemove.contains(defaultSpecId), "Cannot remove the default partition spec");
1145+
1146+
this.specs =
1147+
specs.stream()
1148+
.filter(s -> !specIdsToRemove.contains(s.specId()))
1149+
.collect(Collectors.toList());
1150+
changes.add(new MetadataUpdate.RemovePartitionSpecs(specIdsToRemove));
1151+
return this;
1152+
}
1153+
11411154
public Builder addPartitionSpec(UnboundPartitionSpec spec) {
11421155
addPartitionSpecInternal(spec.bind(schemasById.get(currentSchemaId)));
11431156
return this;

core/src/main/java/org/apache/iceberg/UpdateRequirements.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ private Builder update(MetadataUpdate update) {
105105
update((MetadataUpdate.SetDefaultPartitionSpec) update);
106106
} else if (update instanceof MetadataUpdate.SetDefaultSortOrder) {
107107
update((MetadataUpdate.SetDefaultSortOrder) update);
108+
} else if (update instanceof MetadataUpdate.RemovePartitionSpecs) {
109+
update((MetadataUpdate.RemovePartitionSpecs) update);
108110
}
109111

110112
return this;
@@ -173,6 +175,27 @@ private void update(MetadataUpdate.SetDefaultSortOrder unused) {
173175
}
174176
}
175177

178+
private void update(MetadataUpdate.RemovePartitionSpecs unused) {
179+
// require that the default partition spec has not changed
180+
if (!setSpecId) {
181+
if (base != null && !isReplace) {
182+
require(new UpdateRequirement.AssertDefaultSpecID(base.defaultSpecId()));
183+
}
184+
this.setSpecId = true;
185+
}
186+
187+
// require that no branches have changed, so that old specs won't be written.
188+
if (base != null && !isReplace) {
189+
base.refs()
190+
.forEach(
191+
(name, ref) -> {
192+
if (ref.isBranch() && !name.equals(SnapshotRef.MAIN_BRANCH)) {
193+
require(new UpdateRequirement.AssertRefSnapshotID(name, ref.snapshotId()));
194+
}
195+
});
196+
}
197+
}
198+
176199
private List<UpdateRequirement> build() {
177200
return requirements.build();
178201
}

core/src/test/java/org/apache/iceberg/TestMetadataUpdateParser.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -912,6 +912,17 @@ public void testRemovePartitionStatistics() {
912912
.isEqualTo(json);
913913
}
914914

915+
@Test
916+
public void testRemovePartitionSpec() {
917+
String action = MetadataUpdateParser.REMOVE_PARTITION_SPECS;
918+
String json = "{\"action\":\"remove-partition-specs\",\"spec-ids\":[1,2,3]}";
919+
MetadataUpdate expected = new MetadataUpdate.RemovePartitionSpecs(ImmutableSet.of(1, 2, 3));
920+
assertEquals(action, expected, MetadataUpdateParser.fromJson(json));
921+
assertThat(MetadataUpdateParser.toJson(expected))
922+
.as("Remove partition specs should convert to the correct JSON value")
923+
.isEqualTo(json);
924+
}
925+
915926
public void assertEquals(
916927
String action, MetadataUpdate expectedUpdate, MetadataUpdate actualUpdate) {
917928
switch (action) {
@@ -1016,6 +1027,11 @@ public void assertEquals(
10161027
(MetadataUpdate.SetCurrentViewVersion) expectedUpdate,
10171028
(MetadataUpdate.SetCurrentViewVersion) actualUpdate);
10181029
break;
1030+
case MetadataUpdateParser.REMOVE_PARTITION_SPECS:
1031+
assertEqualsRemovePartitionSpecs(
1032+
(MetadataUpdate.RemovePartitionSpecs) expectedUpdate,
1033+
(MetadataUpdate.RemovePartitionSpecs) actualUpdate);
1034+
break;
10191035
default:
10201036
fail("Unrecognized metadata update action: " + action);
10211037
}
@@ -1237,6 +1253,11 @@ private static void assertEqualsSetCurrentViewVersion(
12371253
assertThat(actual.versionId()).isEqualTo(expected.versionId());
12381254
}
12391255

1256+
private static void assertEqualsRemovePartitionSpecs(
1257+
MetadataUpdate.RemovePartitionSpecs expected, MetadataUpdate.RemovePartitionSpecs actual) {
1258+
assertThat(actual.specIds()).containsExactlyInAnyOrderElementsOf(expected.specIds());
1259+
}
1260+
12401261
private String createManifestListWithManifestFiles(long snapshotId, Long parentSnapshotId)
12411262
throws IOException {
12421263
File manifestList = File.createTempFile("manifests", null, temp.toFile());

0 commit comments

Comments
 (0)