Skip to content

Commit 32f3a29

Browse files
siying authored and anishshri-db committed
[SPARK-54264][SS] DeDup Operator can use RocksDB's keyExists()
### What changes were proposed in this pull request?
Add a keyExists() function to StateStore. In the RocksDB state store, it uses RocksDB's native keyExists(); by default, it checks whether get() returns a non-null value. Change deduplication to use keyExists(), rather than get(), to check whether the key exists in the state store.

### Why are the changes needed?
The dedup operation currently calls RocksDB's Get() only to check whether there is a result. Instead, it can use keyExists(), which serves exactly that purpose but is faster.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
The existing RocksDBStateStoreStreamingDeduplicationSuite should already cover this case.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #52960 from siying/dedup_keyexists.

Authored-by: Siying Dong <[email protected]>
Signed-off-by: Anish Shrigondekar <[email protected]>
1 parent 811482a commit 32f3a29

File tree

6 files changed

+133
-2
lines changed

6 files changed

+133
-2
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/operators/stateful/statefulOperators.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1317,8 +1317,8 @@ abstract class BaseStreamingDeduplicateExec
13171317
val result = baseIterator.filter { r =>
13181318
val row = r.asInstanceOf[UnsafeRow]
13191319
val key = getKey(row)
1320-
val value = store.get(key)
1321-
if (value == null) {
1320+
val keyExists = store.keyExists(key)
1321+
if (!keyExists) {
13221322
putDupInfoIntoState(store, row, key, reusedDupInfoRow)
13231323
numUpdatedStateRows += 1
13241324
numOutputRows += 1

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -983,6 +983,26 @@ class RocksDB(
983983
}
984984
}
985985

986+
/**
987+
* This method gives a 100% guarantee of a correct result, whether the key exists or
988+
* not.
989+
*
990+
* @param key The key to check
991+
* @param cfName The column family name
992+
* @return true if the key exists, false otherwise
993+
*/
994+
def keyExists(
995+
key: Array[Byte],
996+
cfName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Boolean = {
997+
updateMemoryUsageIfNeeded()
998+
val keyWithPrefix = if (useColumnFamilies) {
999+
encodeStateRowWithPrefix(key, cfName)
1000+
} else {
1001+
key
1002+
}
1003+
db.keyExists(keyWithPrefix)
1004+
}
1005+
9861006
/**
9871007
* Get the values for a given key if present, that were merged (via merge).
9881008
* This returns the values as an iterator of index range, to allow inline access

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,15 @@ private[sql] class RocksDBStateStoreProvider
251251
value
252252
}
253253

254+
override def keyExists(key: UnsafeRow, colFamilyName: String): Boolean = {
255+
validateAndTransitionState(UPDATE)
256+
verify(key != null, "Key cannot be null")
257+
verifyColFamilyOperations("keyExists", colFamilyName)
258+
259+
val kvEncoder = keyValueEncoderMap.get(colFamilyName)
260+
rocksDB.keyExists(kvEncoder._1.encodeKey(key), colFamilyName)
261+
}
262+
254263
/**
255264
* Provides an iterator containing all values of a non-null key.
256265
*

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,23 @@ trait ReadStateStore {
115115
key: UnsafeRow,
116116
colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): UnsafeRow
117117

118+
/**
119+
* Check if a key exists in the store, with a 100% guarantee of a correct result.
120+
*
121+
* Default implementation calls get() and checks if the result is null.
122+
* Implementations backed by RocksDB should override this to use the native
123+
* keyExists() method for better performance.
124+
*
125+
* @param key The key to check
126+
* @param colFamilyName The column family name
127+
* @return true if the key exists, false if it doesn't exist
128+
*/
129+
def keyExists(
130+
key: UnsafeRow,
131+
colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Boolean = {
132+
get(key, colFamilyName) != null
133+
}
134+
118135
/**
119136
* Provides an iterator containing all values of a non-null key. If key does not exist,
120137
* an empty iterator is returned. Implementations should make sure to return an empty
@@ -305,6 +322,12 @@ class WrappedReadStateStore(store: StateStore) extends ReadStateStore {
305322
colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): UnsafeRow = store.get(key,
306323
colFamilyName)
307324

325+
override def keyExists(
326+
key: UnsafeRow,
327+
colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Boolean = {
328+
store.keyExists(key, colFamilyName)
329+
}
330+
308331
override def iterator(colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME)
309332
: StateStoreIterator[UnsafeRowPair] = store.iterator(colFamilyName)
310333

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreCheckpointFormatV2Suite.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,12 @@ case class CkptIdCollectingStateStoreWrapper(innerStore: StateStore) extends Sta
7070
innerStore.get(key, colFamilyName)
7171
}
7272

73+
override def keyExists(
74+
key: UnsafeRow,
75+
colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Boolean = {
76+
innerStore.keyExists(key, colFamilyName)
77+
}
78+
7379
override def valuesIterator(
7480
key: UnsafeRow,
7581
colFamilyName: String = StateStore.DEFAULT_COL_FAMILY_NAME): Iterator[UnsafeRow] = {

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1393,6 +1393,79 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
13931393
encodeMethod.invoke(db, key.getBytes, cfName).asInstanceOf[Array[Byte]]
13941394
}
13951395

1396+
testWithStateStoreCheckpointIdsAndColumnFamilies(
1397+
"RocksDB: keyExists over 1000 random keys across CFs",
1398+
TestWithBothChangelogCheckpointingEnabledAndDisabled) {
1399+
case (enableStateStoreCheckpointIds, colFamiliesEnabled) =>
1400+
val remoteDir = Utils.createTempDir().toString
1401+
new File(remoteDir).delete()
1402+
1403+
val conf = dbConf.copy(compactOnCommit = false)
1404+
withDB(
1405+
remoteDir,
1406+
conf = conf,
1407+
useColumnFamilies = colFamiliesEnabled,
1408+
enableStateStoreCheckpointIds = enableStateStoreCheckpointIds) { db =>
1409+
val totalPresent = 500
1410+
val totalAbsent = 500
1411+
1412+
// Generate present and absent keys using simple disjoint prefixes
1413+
val presentKeysAll = (0 until totalPresent).map(i => s"present_$i")
1414+
1415+
// Insert present keys
1416+
db.load(0)
1417+
// If column families are enabled, create a CF and use it uniformly (after load)
1418+
val cfNameOpt =
1419+
if (colFamiliesEnabled) {
1420+
val cf = "test_cf_random"
1421+
db.createColFamilyIfAbsent(cf, isInternal = false)
1422+
Some(cf)
1423+
} else {
1424+
None
1425+
}
1426+
cfNameOpt match {
1427+
case Some(cf) =>
1428+
presentKeysAll.foreach { k => db.put(k, s"v_$k", cf) }
1429+
case None =>
1430+
presentKeysAll.foreach { k => db.put(k, s"v_$k") }
1431+
}
1432+
1433+
// Generate absent keys using a different prefix to avoid overlap
1434+
val absentKeysAll = (0 until totalAbsent).map(i => s"absent_$i")
1435+
1436+
// Validation helper to avoid duplication
1437+
def validate(label: String): Unit = {
1438+
cfNameOpt match {
1439+
case Some(cf) =>
1440+
presentKeysAll.foreach { k =>
1441+
assert(db.keyExists(k, cf),
1442+
s"$label Expected keyExists(true) for present CF key $k")
1443+
}
1444+
absentKeysAll.foreach { k =>
1445+
assert(!db.keyExists(k, cf),
1446+
s"$label Expected keyExists(false) for absent CF key $k")
1447+
}
1448+
case None =>
1449+
presentKeysAll.foreach { k =>
1450+
assert(db.keyExists(k),
1451+
s"$label Expected keyExists(true) for present default key $k")
1452+
}
1453+
absentKeysAll.foreach { k =>
1454+
assert(!db.keyExists(k),
1455+
s"$label Expected keyExists(false) for absent default key $k")
1456+
}
1457+
}
1458+
}
1459+
1460+
// First check before commit
1461+
validate("(pre-commit)")
1462+
1463+
// Commit and re-check
1464+
db.commit()
1465+
validate("(post-commit)")
1466+
}
1467+
}
1468+
13961469
testWithStateStoreCheckpointIdsAndColumnFamilies(s"RocksDB: get, put, iterator, commit, load",
13971470
TestWithBothChangelogCheckpointingEnabledAndDisabled) {
13981471
case (enableStateStoreCheckpointIds, colFamiliesEnabled) =>

0 commit comments

Comments
 (0)