Skip to content

Commit 6d43061

Browse files
StefanRRichteraljoscha
authored andcommitted
[FLINK-4380] Remove KeyGroupAssigner in favor of static method/Have default max. parallelism at 128
1 parent 2b7a8d6 commit 6d43061

File tree

42 files changed

+297
-586
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+297
-586
lines changed

flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.flink.core.memory.DataOutputView;
2626
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
2727
import org.apache.flink.runtime.query.netty.message.KvStateRequestSerializer;
28+
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
2829
import org.apache.flink.runtime.state.KvState;
2930
import org.apache.flink.util.Preconditions;
3031
import org.rocksdb.ColumnFamilyHandle;
@@ -130,7 +131,7 @@ public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Except
130131
backend.getKeySerializer(),
131132
namespaceSerializer);
132133

133-
int keyGroup = backend.getKeyGroupAssigner().getKeyGroupIndex(des.f0);
134+
int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, backend.getNumberOfKeyGroups());
134135
writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1);
135136
return backend.db.get(columnFamily, keySerializationStream.toByteArray());
136137

flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import org.apache.flink.api.common.JobID;
2222
import org.apache.flink.api.common.state.FoldingState;
2323
import org.apache.flink.api.common.state.FoldingStateDescriptor;
24-
import org.apache.flink.api.common.state.KeyGroupAssigner;
2524
import org.apache.flink.api.common.state.ListState;
2625
import org.apache.flink.api.common.state.ListStateDescriptor;
2726
import org.apache.flink.api.common.state.ReducingState;
@@ -131,11 +130,11 @@ public RocksDBKeyedStateBackend(
131130
ColumnFamilyOptions columnFamilyOptions,
132131
TaskKvStateRegistry kvStateRegistry,
133132
TypeSerializer<K> keySerializer,
134-
KeyGroupAssigner<K> keyGroupAssigner,
133+
int numberOfKeyGroups,
135134
KeyGroupRange keyGroupRange
136135
) throws Exception {
137136

138-
super(kvStateRegistry, keySerializer, keyGroupAssigner, keyGroupRange);
137+
super(kvStateRegistry, keySerializer, numberOfKeyGroups, keyGroupRange);
139138

140139
this.operatorIdentifier = operatorIdentifier;
141140
this.jobId = jobId;
@@ -183,7 +182,7 @@ public RocksDBKeyedStateBackend(
183182
ColumnFamilyOptions columnFamilyOptions,
184183
TaskKvStateRegistry kvStateRegistry,
185184
TypeSerializer<K> keySerializer,
186-
KeyGroupAssigner<K> keyGroupAssigner,
185+
int numberOfKeyGroups,
187186
KeyGroupRange keyGroupRange,
188187
List<KeyGroupsStateHandle> restoreState
189188
) throws Exception {
@@ -195,7 +194,7 @@ public RocksDBKeyedStateBackend(
195194
columnFamilyOptions,
196195
kvStateRegistry,
197196
keySerializer,
198-
keyGroupAssigner,
197+
numberOfKeyGroups,
199198
keyGroupRange);
200199

201200
LOG.info("Initializing RocksDB keyed state backend from snapshot.");

flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.flink.contrib.streaming.state;
1919

2020
import org.apache.flink.api.common.JobID;
21-
import org.apache.flink.api.common.state.KeyGroupAssigner;
2221
import org.apache.flink.api.common.state.StateBackend;
2322
import org.apache.flink.api.common.typeutils.TypeSerializer;
2423
import org.apache.flink.core.fs.Path;
@@ -230,7 +229,7 @@ public <K> KeyedStateBackend<K> createKeyedStateBackend(
230229
JobID jobID,
231230
String operatorIdentifier,
232231
TypeSerializer<K> keySerializer,
233-
KeyGroupAssigner<K> keyGroupAssigner,
232+
int numberOfKeyGroups,
234233
KeyGroupRange keyGroupRange,
235234
TaskKvStateRegistry kvStateRegistry) throws Exception {
236235

@@ -246,15 +245,15 @@ public <K> KeyedStateBackend<K> createKeyedStateBackend(
246245
getColumnOptions(),
247246
kvStateRegistry,
248247
keySerializer,
249-
keyGroupAssigner,
248+
numberOfKeyGroups,
250249
keyGroupRange);
251250
}
252251

253252
@Override
254253
public <K> KeyedStateBackend<K> restoreKeyedStateBackend(Environment env, JobID jobID,
255254
String operatorIdentifier,
256255
TypeSerializer<K> keySerializer,
257-
KeyGroupAssigner<K> keyGroupAssigner,
256+
int numberOfKeyGroups,
258257
KeyGroupRange keyGroupRange,
259258
List<KeyGroupsStateHandle> restoredState,
260259
TaskKvStateRegistry kvStateRegistry) throws Exception {
@@ -270,7 +269,7 @@ public <K> KeyedStateBackend<K> restoreKeyedStateBackend(Environment env, JobID
270269
getColumnOptions(),
271270
kvStateRegistry,
272271
keySerializer,
273-
keyGroupAssigner,
272+
numberOfKeyGroups,
274273
keyGroupRange,
275274
restoredState);
276275
}

flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import org.apache.flink.runtime.query.KvStateRegistry;
2929

3030
import org.apache.flink.runtime.state.AbstractStateBackend;
31-
import org.apache.flink.runtime.state.HashKeyGroupAssigner;
3231
import org.apache.flink.runtime.state.KeyGroupRange;
3332
import org.apache.flink.util.OperatingSystem;
3433
import org.junit.Assume;
@@ -93,7 +92,7 @@ public void testSetDbPath() throws Exception {
9392
env.getJobID(),
9493
"test_op",
9594
IntSerializer.INSTANCE,
96-
new HashKeyGroupAssigner<Integer>(1),
95+
1,
9796
new KeyGroupRange(0, 0),
9897
env.getTaskKvStateRegistry());
9998

@@ -147,7 +146,7 @@ public void testUseTempDirectories() throws Exception {
147146
env.getJobID(),
148147
"test_op",
149148
IntSerializer.INSTANCE,
150-
new HashKeyGroupAssigner<Integer>(1),
149+
1,
151150
new KeyGroupRange(0, 0),
152151
env.getTaskKvStateRegistry());
153152

@@ -182,7 +181,7 @@ public void testFailWhenNoLocalStorageDir() throws Exception {
182181
env.getJobID(),
183182
"foobar",
184183
IntSerializer.INSTANCE,
185-
new HashKeyGroupAssigner<Integer>(1),
184+
1,
186185
new KeyGroupRange(0, 0),
187186
new KvStateRegistry().createTaskRegistry(env.getJobID(), new JobVertexID()));
188187
}
@@ -224,7 +223,7 @@ public void testContinueOnSomeDbDirectoriesMissing() throws Exception {
224223
env.getJobID(),
225224
"foobar",
226225
IntSerializer.INSTANCE,
227-
new HashKeyGroupAssigner<Integer>(1),
226+
1,
228227
new KeyGroupRange(0, 0),
229228
new KvStateRegistry().createTaskRegistry(env.getJobID(), new JobVertexID()));
230229
}

flink-core/src/main/java/org/apache/flink/api/common/state/KeyGroupAssigner.java

Lines changed: 0 additions & 53 deletions
This file was deleted.

flink-core/src/main/java/org/apache/flink/util/MathUtils.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,21 @@ else if (code != Integer.MIN_VALUE) {
155155
}
156156
}
157157

158+
/**
159+
* Round the given number to the next power of two
160+
* @param x number to round
161+
* @return x rounded up to the next power of two
162+
*/
163+
public static int roundUpToPowerOfTwo(int x) {
164+
x = x - 1;
165+
x |= x >> 1;
166+
x |= x >> 2;
167+
x |= x >> 4;
168+
x |= x >> 8;
169+
x |= x >> 16;
170+
return x + 1;
171+
}
172+
158173
// ============================================================================================
159174

160175
/**

flink-core/src/test/java/org/apache/flink/util/MathUtilTest.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,37 @@ public void testRoundDownToPowerOf2() {
8080
assertEquals(1073741824, MathUtils.roundDownToPowerOf2(Integer.MAX_VALUE));
8181
}
8282

83+
@Test
84+
public void testRoundUpToPowerOf2() {
85+
assertEquals(0, MathUtils.roundUpToPowerOfTwo(0));
86+
assertEquals(1, MathUtils.roundUpToPowerOfTwo(1));
87+
assertEquals(2, MathUtils.roundUpToPowerOfTwo(2));
88+
assertEquals(4, MathUtils.roundUpToPowerOfTwo(3));
89+
assertEquals(4, MathUtils.roundUpToPowerOfTwo(4));
90+
assertEquals(8, MathUtils.roundUpToPowerOfTwo(5));
91+
assertEquals(8, MathUtils.roundUpToPowerOfTwo(6));
92+
assertEquals(8, MathUtils.roundUpToPowerOfTwo(7));
93+
assertEquals(8, MathUtils.roundUpToPowerOfTwo(8));
94+
assertEquals(16, MathUtils.roundUpToPowerOfTwo(9));
95+
assertEquals(16, MathUtils.roundUpToPowerOfTwo(15));
96+
assertEquals(16, MathUtils.roundUpToPowerOfTwo(16));
97+
assertEquals(32, MathUtils.roundUpToPowerOfTwo(17));
98+
assertEquals(32, MathUtils.roundUpToPowerOfTwo(31));
99+
assertEquals(32, MathUtils.roundUpToPowerOfTwo(32));
100+
assertEquals(64, MathUtils.roundUpToPowerOfTwo(33));
101+
assertEquals(64, MathUtils.roundUpToPowerOfTwo(42));
102+
assertEquals(64, MathUtils.roundUpToPowerOfTwo(63));
103+
assertEquals(64, MathUtils.roundUpToPowerOfTwo(64));
104+
assertEquals(128, MathUtils.roundUpToPowerOfTwo(125));
105+
assertEquals(32768, MathUtils.roundUpToPowerOfTwo(25654));
106+
assertEquals(67108864, MathUtils.roundUpToPowerOfTwo(34366363));
107+
assertEquals(67108864, MathUtils.roundUpToPowerOfTwo(67108863));
108+
assertEquals(67108864, MathUtils.roundUpToPowerOfTwo(67108864));
109+
assertEquals(0x40000000, MathUtils.roundUpToPowerOfTwo(0x3FFFFFFE));
110+
assertEquals(0x40000000, MathUtils.roundUpToPowerOfTwo(0x3FFFFFFF));
111+
assertEquals(0x40000000, MathUtils.roundUpToPowerOfTwo(0x40000000));
112+
}
113+
83114
@Test
84115
public void testPowerOfTwo() {
85116
assertTrue(MathUtils.isPowerOf2(1));

flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint;
3737
import org.apache.flink.runtime.state.ChainedStateHandle;
3838
import org.apache.flink.runtime.state.KeyGroupRange;
39+
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
3940
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
4041
import org.apache.flink.runtime.state.StreamStateHandle;
4142
import org.apache.flink.util.Preconditions;
@@ -886,7 +887,7 @@ public static List<KeyGroupRange> createKeyGroupPartitions(int numberKeyGroups,
886887
List<KeyGroupRange> result = new ArrayList<>(parallelism);
887888
int start = 0;
888889
for (int i = 0; i < parallelism; ++i) {
889-
result.add(KeyGroupRange.computeKeyGroupRangeForOperatorIndex(numberKeyGroups, parallelism, i));
890+
result.add(KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex(numberKeyGroups, parallelism, i));
890891
}
891892
return result;
892893
}

flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobVertex.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,8 @@ public int getMaxParallelism() {
253253
*/
254254
public void setMaxParallelism(int maxParallelism) {
255255
org.apache.flink.util.Preconditions.checkArgument(
256-
maxParallelism > 0 && maxParallelism <= Short.MAX_VALUE, "The max parallelism must be at least 1.");
256+
maxParallelism > 0 && maxParallelism <= (1 << 15),
257+
"The max parallelism must be at least 1 and smaller than 2^15.");
257258

258259
this.maxParallelism = maxParallelism;
259260
}

flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
package org.apache.flink.runtime.state;
2020

2121
import org.apache.flink.api.common.JobID;
22-
import org.apache.flink.api.common.state.KeyGroupAssigner;
2322
import org.apache.flink.api.common.typeutils.TypeSerializer;
2423
import org.apache.flink.runtime.execution.Environment;
2524
import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -53,7 +52,7 @@ public abstract <K> KeyedStateBackend<K> createKeyedStateBackend(
5352
JobID jobID,
5453
String operatorIdentifier,
5554
TypeSerializer<K> keySerializer,
56-
KeyGroupAssigner<K> keyGroupAssigner,
55+
int numberOfKeyGroups,
5756
KeyGroupRange keyGroupRange,
5857
TaskKvStateRegistry kvStateRegistry) throws Exception;
5958

@@ -66,7 +65,7 @@ public abstract <K> KeyedStateBackend<K> restoreKeyedStateBackend(
6665
JobID jobID,
6766
String operatorIdentifier,
6867
TypeSerializer<K> keySerializer,
69-
KeyGroupAssigner<K> keyGroupAssigner,
68+
int numberOfKeyGroups,
7069
KeyGroupRange keyGroupRange,
7170
List<KeyGroupsStateHandle> restoredState,
7271
TaskKvStateRegistry kvStateRegistry) throws Exception;

0 commit comments

Comments
 (0)