Add support to configure trigger interval #1

Open · wants to merge 3 commits into master
README.md: 3 additions & 1 deletion
@@ -22,6 +22,7 @@ I have created this project in similar lines for streaming performance scenarios
--rate-row-per-second "20000" \
--output-mode "append" \
--run-time-in-sec 1800 \
--trigger-interval 60000 \
--shuffle-partition 8 \
--use-rocks-db

@@ -39,7 +40,8 @@ I have created this project in similar lines for streaming performance scenarios
--rate-row-per-second "20000" \
--output-mode "append" \
--run-time-in-sec 1800 \
--shuffle-partition 8 \
--trigger-interval 60000 \
--shuffle-partition 8

###### Analyze the progress
/usr/lib/spark/bin/spark-submit \
@@ -11,6 +11,7 @@ abstract class BenchmarkConf(args: Array[String]) extends ScallopConf(args) {
val runTimeInSec = opt[Long](name = "run-time-in-sec", required = true, noshort = true)
val useRocksDB = opt[Boolean](name = "use-rocks-db", default = Option(false), required = true, noshort = true)
val numShufflePartition = opt[Int](name = "shuffle-partition", default = Option(8), required = true, noshort = true)
val triggerIntervalMillis = opt[Long](name = "trigger-interval", default = Option(30000L), required = true, noshort = true)

val outputMode = opt[String](name = "output-mode", required = true, noshort = true,
validate = (s: String) => validOutputModes.map(_.toLowerCase()).contains(s.toLowerCase()))
@@ -14,6 +14,7 @@ abstract class BaseQuery(conf: StateStoreBenchmarkConf, appName: String, queryNa
val queryStatusFile = conf.queryStatusFile()
val rateRowPerSecond = conf.rateRowPerSecond()
val runTimeInSec = conf.runTimeInSec()
val triggerIntervalMillis = conf.triggerIntervalMillis()

val spark = SparkSession
.builder()
@@ -51,7 +52,7 @@ abstract class BaseQuery(conf: StateStoreBenchmarkConf, appName: String, queryNa
}
}
)
.trigger(Trigger.ProcessingTime("30 seconds"))
.trigger(Trigger.ProcessingTime(triggerIntervalMillis))
.outputMode(conf.getSparkOutputMode)
.start()
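
For context (not part of this diff): Spark's Trigger.ProcessingTime accepts either a millisecond value or an interval string, so the configurable value is a drop-in replacement for the previously hard-coded "30 seconds". A minimal illustration:

import org.apache.spark.sql.streaming.Trigger

// Equivalent triggers: 30000 ms matches the old hard-coded "30 seconds" default.
val byMillis = Trigger.ProcessingTime(30000L)
val byString = Trigger.ProcessingTime("30 seconds")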

@@ -0,0 +1,84 @@
package com.qubole.spark.benchmark.streaming.states

import java.util.concurrent.atomic.AtomicLongArray


object ConcurrentBitSet {
/**
* STATE
*/
val BASE = 64
val MAX_UNSIGNED_LONG = -1L

def mask(id: Int) = 1L << id
}

class ConcurrentBitSet(val bitsCount: Long) {

// Round up so that bit counts that are not a multiple of BASE still fit.
val bucketsCount: Int = ((bitsCount + ConcurrentBitSet.BASE - 1) / ConcurrentBitSet.BASE).toInt

val buckets = new AtomicLongArray(bucketsCount)

var i = 0
while (i < buckets.length()) {
buckets.set(i, 0)
i += 1
}

/**
* API to set Bits for a key
*/
def set(idx: Long): Unit = {
val bucketIdx = idx.toInt / ConcurrentBitSet.BASE
atomicSet(bucketIdx, idx.toInt - (ConcurrentBitSet.BASE * bucketIdx))
}

/**
* API to check whether the bit for a key is set
*/
def get(idx: Long): Boolean = {
val bucketIdx = idx.toInt / ConcurrentBitSet.BASE
atomicGet(bucketIdx, idx.toInt - (ConcurrentBitSet.BASE * bucketIdx))
}

def clear(): Unit = {
throw new RuntimeException("not implemented")
}

def capacity: Long = this.buckets.length * 64

/**
* IMPLEMENTATION
*/
private def atomicGet(bucketIdx: Int, toGet: Int): Boolean = {
val l = buckets.get(bucketIdx)
val idxMask = ConcurrentBitSet.mask(toGet)
(l & idxMask) == idxMask
}

private def atomicSet(bucketIdx: Int, toSet: Int): Unit = {
// CAS loop: retry until the bit is set without losing concurrent updates.
var done = false
while (!done) {
val l = buckets.get(bucketIdx)
done = buckets.compareAndSet(bucketIdx, l, l | ConcurrentBitSet.mask(toSet))
}
}

def longToBinaryStr(num: Long): String = {
val stringBuilder = new StringBuilder
var i = 0
while (i < ConcurrentBitSet.BASE) {
val idxMask = ConcurrentBitSet.mask(i)
stringBuilder.append(if ((num & idxMask) == idxMask) "1" else "0")
i += 1
}
stringBuilder.toString
}
}
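
A hypothetical usage sketch (not part of the diff) showing how the bit set behaves; the object name and values below are illustrative only:

object ConcurrentBitSetExample {
  def main(args: Array[String]): Unit = {
    val bits = new ConcurrentBitSet(1024) // 16 buckets of 64 bits each
    bits.set(42)
    println(bits.get(42))                   // true
    println(bits.get(43))                   // false
    println(bits.longToBinaryStr(1L << 42)) // 64-character string with bit 42 set
  }
}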
@@ -0,0 +1,63 @@
package com.qubole.spark.benchmark.streaming.states

import scala.collection.mutable.{HashMap, ListBuffer}

import java.io.{File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}



class HeaderFileManager(rootPath: String) {

// Start with an empty map; openHeaderFile() replaces it when a header file already exists.
var headerMap: HashMap[Offsets, FileStatus] = HashMap.empty

val headerFilePath = rootPath + "/" + "_header"

def openHeaderFile(): Unit = {
val file = new File(headerFilePath)
val fis = new FileInputStream(file)
val ois = new ObjectInputStream(fis)
try {
headerMap = ois.readObject().asInstanceOf[HashMap[Offsets, FileStatus]]
} finally {
ois.close()
}
}




def commitHeaderFile(): Unit = {
// Persist the header map so openHeaderFile() can restore it later.
val oos = new ObjectOutputStream(new FileOutputStream(headerFilePath))
try oos.writeObject(headerMap) finally oos.close()
}

// Get partial and Completely overlapping offsets
def getOverLappingOffsets(offsets: Offsets): OverlappingOffsets = {
val partialOverlaps = headerMap.keys.filter { otherOffsets =>
offsets.isPartialOverlap(otherOffsets)
}.toList
val completeOverlaps = headerMap.keys.filter { otherOffsets =>
offsets.isCompleteOverlap(otherOffsets)
}.toList
OverlappingOffsets(partialOverlaps, completeOverlaps)
}

def read(other: Offsets): String = {
val reader = new StringBuilder()

// Collect every stored range that overlaps the requested one, in offset order.
val effectiveOffsets = headerMap.keys.filterNot(offset => other.isNonOverlap(offset))
val sortedOffsets = effectiveOffsets.toList.sortWith(_.startOffset < _.startOffset)

val offsetList = ListBuffer[Offsets]()

// Stitching the overlapping chunks back together is still unimplemented.
reader.toString
}


def addMapEntry(offsets: Offsets, fileStatus: FileStatus): Unit = {
headerMap += (offsets -> fileStatus)
}

def removeCompleteOverlaps(offsets: List[Offsets]): Unit = {
headerMap.retain((other, _) => !offsets.contains(other))
}

}
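
A hedged round-trip sketch for the header file (not part of the diff); it assumes the root directory is writable and that the header map starts out empty as initialized above:

object HeaderFileManagerExample {
  def main(args: Array[String]): Unit = {
    val root = "/tmp/sparse-file-demo" // illustrative path
    new java.io.File(root).mkdirs()

    val manager = new HeaderFileManager(root)
    manager.addMapEntry(Offsets(0L, 10L), FileStatus(root + "/part-0"))
    manager.commitHeaderFile()

    val reloaded = new HeaderFileManager(root)
    reloaded.openHeaderFile()
    println(reloaded.getOverLappingOffsets(Offsets(5L, 20L))) // partial overlap with [0, 10]
  }
}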
@@ -0,0 +1,25 @@
package com.qubole.spark.benchmark.streaming.states

case class Offsets(startOffset: Long,
endOffset: Long,
isZeroOffset: Boolean = false) {

def isPartialOverlap(other: Offsets): Boolean = {
!isCompleteOverlap(other) && !isNonOverlap(other)
}

def isCompleteOverlap(other: Offsets): Boolean = {
startOffset <= other.startOffset && endOffset >= other.endOffset
}

def isNonOverlap(other: Offsets): Boolean = {
endOffset < other.startOffset || startOffset > other.endOffset
}


}

case class FileStatus(path: String)

case class OverlappingOffsets(partialOverlaps: List[Offsets],
completeOverlaps: List[Offsets])
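
A quick illustration of the three predicates (not part of the diff), e.g. in a Scala REPL:

val base = Offsets(0L, 10L)
base.isCompleteOverlap(Offsets(2L, 8L)) // true: [2, 8] lies entirely inside [0, 10]
base.isPartialOverlap(Offsets(5L, 20L)) // true: the ranges intersect but neither contains the other
base.isNonOverlap(Offsets(20L, 30L))    // true: [20, 30] starts after [0, 10] ends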
@@ -0,0 +1,77 @@
package com.qubole.spark.benchmark.streaming.states

class SparseFile(rootPath: String) {

import com.qubole.spark.benchmark.utils.SparseFileUtils._

val headerFileManager = new HeaderFileManager(rootPath)
var fileSize: Long = _

// Write the bytes in <data> at <offset> bytes from the start
// of the file.
// ask if overwrite needs to be done
def writeBytes(offset: Long, data: String): Unit = {
val offsets = computeOffsets(offset, data)
val overlappingOffsets = headerFileManager.getOverLappingOffsets(offsets)
if (overlappingOffsets.partialOverlaps.nonEmpty) {
throwUnsupportedOperationException("Partial Overwrite")
}
performWrite(offsets, data, overlappingOffsets.completeOverlaps)
}

private def performWrite(offsets: Offsets, data: String, completeOverlaps: List[Offsets]): Unit =
synchronized {
val fileName = writeToFile(data, rootPath)
if (fileName.isDefined) {
headerFileManager.addMapEntry(offsets, FileStatus(fileName.get))
headerFileManager.removeCompleteOverlaps(completeOverlaps)
headerFileManager.commitHeaderFile()
updateSize(data)
}
}

private def updateSize(str: String): Unit = {
fileSize += str.getBytes.length
}

private def computeOffsets(offset: Long, data: String): Offsets = {
Offsets(offset, offset + data.size)
}

// Read bytes starting from the offset up to the given length.
def readBytes(offset: Long, size: Int): String = {
val offsets = Offsets(offset, offset + size)
headerFileManager.read(offsets)
}

// Set the size of the file. This can be called at any point
// in time. Unwritten bytes must be assumed to be zero. If
// more bytes have already been written, it should be
// considered gone.
// size in bytes
def setSize(size: Long): Unit = {
// Minimal behaviour: record the new logical size. Truncating chunks that
// were already written beyond the new size is not handled yet.
fileSize = size
}

def getSize(): Long = {
fileSize
}

}

object SparseFile {

def main(args: Array[String]): Unit = {

val sparseFile = createSparseFile()

}

def createSparseFile(): SparseFile = {

println("Enter Root Path")
val rootPath = scala.io.StdIn.readLine()
new SparseFile(rootPath)
}

}
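
A hypothetical call pattern for the sparse file (not part of the diff). It assumes SparseFileUtils.writeToFile(data, rootPath) persists a chunk and returns its file name, and that the root directory exists:

object SparseFileExample {
  def main(args: Array[String]): Unit = {
    val root = "/tmp/sparse-file-demo" // illustrative path
    new java.io.File(root).mkdirs()

    val sparseFile = new SparseFile(root)
    sparseFile.writeBytes(0L, "hello")
    sparseFile.writeBytes(100L, "world")
    println(sparseFile.getSize())        // 10 bytes written so far
    println(sparseFile.readBytes(0L, 5)) // empty until HeaderFileManager.read is completed
  }
}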
@@ -1,14 +1,2 @@
package com.qubole.spark.benchmark.streaming.states

object StateStoreBenchmarkRunner {
def main(args: Array[String]): Unit = {
val conf = new StateStoreBenchmarkConf(args)
val benchmarkInstance = conf.initializeBenchmarkClass

println(s"Benchmark class: ${benchmarkInstance.getClass.getCanonicalName}")
println(s"Output Mode: ${conf.getSparkOutputMode}")

benchmarkInstance.runBenchmark()
}

}