From 1caef3f45715813fcad1b6312681c22865a0dd44 Mon Sep 17 00:00:00 2001
From: Abhishek Dixit
Date: Wed, 25 Sep 2019 21:31:38 +0530
Subject: [PATCH 1/3] adding option to configure trigger interval

---
 src/main/scala/com/qubole/spark/benchmark/BenchmarkConf.scala | 1 +
 .../qubole/spark/benchmark/streaming/states/BaseQuery.scala   | 3 ++-
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/src/main/scala/com/qubole/spark/benchmark/BenchmarkConf.scala b/src/main/scala/com/qubole/spark/benchmark/BenchmarkConf.scala
index cb4242e..9599f8b 100644
--- a/src/main/scala/com/qubole/spark/benchmark/BenchmarkConf.scala
+++ b/src/main/scala/com/qubole/spark/benchmark/BenchmarkConf.scala
@@ -11,6 +11,7 @@ abstract class BenchmarkConf(args: Array[String]) extends ScallopConf(args) {
   val runTimeInSec = opt[Long](name = "run-time-in-sec", required = true, noshort = true)
   val useRocksDB = opt[Boolean](name = "use-rocks-db", default = Option(false), required = true, noshort = true)
   val numShufflePartition = opt[Int](name = "shuffle-partition", default = Option(8), required = true, noshort = true)
+  val triggerIntervalMillis = opt[Long](name = "trigger-interval", default = Option(30000L), required = true, noshort = true)
 
   val outputMode = opt[String](name = "output-mode", required = true, noshort = true,
     validate = (s: String) => validOutputModes.map(_.toLowerCase()).contains(s.toLowerCase()))
diff --git a/src/main/scala/com/qubole/spark/benchmark/streaming/states/BaseQuery.scala b/src/main/scala/com/qubole/spark/benchmark/streaming/states/BaseQuery.scala
index 96d8ea0..9e572c8 100644
--- a/src/main/scala/com/qubole/spark/benchmark/streaming/states/BaseQuery.scala
+++ b/src/main/scala/com/qubole/spark/benchmark/streaming/states/BaseQuery.scala
@@ -14,6 +14,7 @@ abstract class BaseQuery(conf: StateStoreBenchmarkConf, appName: String, queryNa
   val queryStatusFile = conf.queryStatusFile()
   val rateRowPerSecond = conf.rateRowPerSecond()
   val runTimeInSec = conf.runTimeInSec()
+  val triggerIntervalMillis = conf.triggerIntervalMillis()
 
   val spark = SparkSession
     .builder()
@@ -51,7 +52,7 @@ abstract class BaseQuery(conf: StateStoreBenchmarkConf, appName: String, queryNa
           }
         }
       )
-      .trigger(Trigger.ProcessingTime("30 seconds"))
+      .trigger(Trigger.ProcessingTime(triggerIntervalMillis))
       .outputMode(conf.getSparkOutputMode)
       .start()
 

From fce83ca7cb6e2d16782bbe8ec361e8f74209bbb1 Mon Sep 17 00:00:00 2001
From: Abhishek Dixit
Date: Wed, 25 Sep 2019 21:35:47 +0530
Subject: [PATCH 2/3] updating readme

---
 README.md | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/README.md b/README.md
index b219e14..d7f24e7 100644
--- a/README.md
+++ b/README.md
@@ -22,6 +22,7 @@ I have created this project in similar lines for streaming performance scenarios
   --rate-row-per-second "20000" \
   --output-mode "append" \
   --run-time-in-sec 1800 \
+  --trigger-interval 60000 \
   --shuffle-partition 8 \
   --use-rocks-db
 
@@ -39,7 +40,8 @@ I have created this project in similar lines for streaming performance scenarios
   --rate-row-per-second "20000" \
   --output-mode "append" \
   --run-time-in-sec 1800 \
-  --shuffle-partition 8 \
+  --trigger-interval 60000 \
+  --shuffle-partition 8
 
 ###### Analyze the progress
 /usr/lib/spark/bin/spark-submit \

From 0359e9ab93f2073c2de277df56849ad377cf39bc Mon Sep 17 00:00:00 2001
From: Abhishek
Date: Thu, 19 Nov 2020 19:27:59 +0530
Subject: [PATCH 3/3] dfd

---
 .../streaming/states/ConcurrentBitSet.scala   |  84 ++++++
 .../streaming/states/HeaderFileManager.scala  |  63 ++++
 .../benchmark/streaming/states/Offsets.scala  |  25 ++
 .../streaming/states/SparseFile.scala         |  77 +++++
 .../states/StateStoreBenchmarkRunner.scala    |  12 -
 .../benchmark/streaming/states/Task.scala     | 278 ++++++++++++++++++
 .../benchmark/utils/SparseFileUtils.scala     |  32 ++
 7 files changed, 559 insertions(+), 12 deletions(-)
 create mode 100644 src/main/scala/com/qubole/spark/benchmark/streaming/states/ConcurrentBitSet.scala
 create mode 100644 src/main/scala/com/qubole/spark/benchmark/streaming/states/HeaderFileManager.scala
 create mode 100644 src/main/scala/com/qubole/spark/benchmark/streaming/states/Offsets.scala
 create mode 100644 src/main/scala/com/qubole/spark/benchmark/streaming/states/SparseFile.scala
 create mode 100644 src/main/scala/com/qubole/spark/benchmark/streaming/states/Task.scala
 create mode 100644 src/main/scala/com/qubole/spark/benchmark/utils/SparseFileUtils.scala

diff --git a/src/main/scala/com/qubole/spark/benchmark/streaming/states/ConcurrentBitSet.scala b/src/main/scala/com/qubole/spark/benchmark/streaming/states/ConcurrentBitSet.scala
new file mode 100644
index 0000000..35c87ff
--- /dev/null
+++ b/src/main/scala/com/qubole/spark/benchmark/streaming/states/ConcurrentBitSet.scala
@@ -0,0 +1,84 @@
+package com.qubole.spark.benchmark.streaming.states
+
+import java.util.concurrent.atomic.AtomicLongArray
+
+
+object ConcurrentBitSet {
+  /**
+   * STATE
+   */
+  val BASE = 64
+  val MAX_UNSIGNED_LONG = -1L
+
+  def mask(id: Int) = 1L << id
+}
+
+class ConcurrentBitSet(val bitsCount: Long) {
+
+  val bucketsCount: Int = bitsCount.toInt / ConcurrentBitSet.BASE
+
+  val buckets = new AtomicLongArray(bucketsCount)
+
+  var i = 0
+  while (i < buckets.length()) {
+    buckets.set(i, 0)
+    i += 1
+  }
+
+  /**
+   * API to set the bit for a key
+   */
+  def set(idx: Long): Unit = {
+    val bucketIdx = idx.toInt / ConcurrentBitSet.BASE
+    atomicSet(bucketIdx, idx.toInt - (ConcurrentBitSet.BASE * bucketIdx))
+  }
+
+  /**
+   * API to get the bit for a key
+   */
+  def get(idx: Long): Boolean = {
+    val bucketIdx = idx.toInt / ConcurrentBitSet.BASE
+    atomicGet(bucketIdx, idx.toInt - (ConcurrentBitSet.BASE * bucketIdx))
+  }
+
+  def clear(): Unit = {
+    throw new RuntimeException("not implemented")
+  }
+
+  def capacity: Long = this.buckets.length * 64
+
+  /**
+   * IMPLEMENTATION
+   */
+  private def atomicGet(bucketIdx: Int, toGet: Int): Boolean = {
+    val l = buckets.get(bucketIdx)
+    val idxMask = ConcurrentBitSet.mask(toGet)
+    (l & idxMask) == idxMask
+  }
+
+  private def atomicSet(bucketIdx: Int, toSet: Int): Unit = {
+    // Retry the CAS until the bit is set without clobbering concurrent updates.
+    while (true) {
+      val l = buckets.get(bucketIdx)
+      if (buckets.compareAndSet(bucketIdx, l, l | ConcurrentBitSet.mask(toSet))) return
+    }
+  }
+
+  def longToBinaryStr(num: Long): String = {
+    val stringBuilder = new StringBuilder
+    var i = 0
+    while (i < ConcurrentBitSet.BASE) {
+      val idxMask = ConcurrentBitSet.mask(i)
+      stringBuilder.append(if ((num & idxMask) == idxMask) "1" else "0")
+      i += 1
+    }
+    stringBuilder.toString
+  }
+}
\ No newline at end of file
diff --git a/src/main/scala/com/qubole/spark/benchmark/streaming/states/HeaderFileManager.scala b/src/main/scala/com/qubole/spark/benchmark/streaming/states/HeaderFileManager.scala
new file mode 100644
index 0000000..d1c1541
--- /dev/null
+++ b/src/main/scala/com/qubole/spark/benchmark/streaming/states/HeaderFileManager.scala
@@ -0,0 +1,63 @@
+package com.qubole.spark.benchmark.streaming.states
+
+import scala.collection.mutable.HashMap
+
+import java.io.{File, FileInputStream, FileOutputStream, ObjectInputStream, ObjectOutputStream}
+
+
+class HeaderFileManager(rootPath: String) {
+
+  // Mapping from the byte range covered by a segment file to that file.
+  var headerMap: HashMap[Offsets, FileStatus] = new HashMap[Offsets, FileStatus]()
+
+  val headerFilePath = rootPath + "/" + "_header"
+
+  def openHeaderFile(): Unit = {
+    // Load previously committed offset -> file mappings, if a header file exists.
+    val file = new File(headerFilePath)
+    if (file.exists()) {
+      val fis = new FileInputStream(file)
+      val ois = new ObjectInputStream(fis)
+      headerMap = ois.readObject().asInstanceOf[HashMap[Offsets, FileStatus]]
+      ois.close()
+    }
+  }
+
+  def commitHeaderFile(): Unit = {
+    // Persist the offset -> file mappings with Java serialization, mirroring
+    // the format read back in openHeaderFile.
+    val fos = new FileOutputStream(headerFilePath)
+    val oos = new ObjectOutputStream(fos)
+    oos.writeObject(headerMap)
+    oos.close()
+  }
+
+  // Get partially and completely overlapping offsets
+  def getOverLappingOffsets(offsets: Offsets): OverlappingOffsets = {
+    val partialOverlaps = headerMap.keys.filter { otherOffsets =>
+      offsets.isPartialOverlap(otherOffsets)
+    }.toList
+    val completeOverlaps = headerMap.keys.filter { otherOffsets =>
+      offsets.isCompleteOverlap(otherOffsets)
+    }.toList
+    OverlappingOffsets(partialOverlaps, completeOverlaps)
+  }
+
+  // Read the data covering the requested offsets. Simplified implementation:
+  // concatenates the contents of every segment file that overlaps the
+  // requested range, in increasing start-offset order; trimming the result
+  // to the exact byte range is not handled here.
+  def readFiles(other: Offsets): String = {
+    val reader = new StringBuilder()
+    val effectiveOffsets = headerMap.keys.filterNot(offset => other.isNonOverlap(offset))
+    val sortedOffsets = effectiveOffsets.toList.sortWith(_.startOffset < _.startOffset)
+    sortedOffsets.foreach { offset =>
+      val source = scala.io.Source.fromFile(rootPath + "/" + headerMap(offset).path)
+      try {
+        reader.append(source.mkString)
+      } finally {
+        source.close()
+      }
+    }
+    reader.toString
+  }
+
+  def addMapEntry(offsets: Offsets, fileStatus: FileStatus): Unit = {
+    headerMap += (offsets -> fileStatus)
+  }
+
+  def removeCompleteOverlaps(offsets: List[Offsets]): Unit = {
+    headerMap.retain((other, _) => !offsets.contains(other))
+  }
+
+}
diff --git a/src/main/scala/com/qubole/spark/benchmark/streaming/states/Offsets.scala b/src/main/scala/com/qubole/spark/benchmark/streaming/states/Offsets.scala
new file mode 100644
index 0000000..4c91543
--- /dev/null
+++ b/src/main/scala/com/qubole/spark/benchmark/streaming/states/Offsets.scala
@@ -0,0 +1,25 @@
+package com.qubole.spark.benchmark.streaming.states
+
+case class Offsets(startOffset: Long,
+                   endOffset: Long,
+                   isZeroOffset: Boolean = false) {
+
+  def isPartialOverlap(other: Offsets): Boolean = {
+    !isCompleteOverlap(other) && !isNonOverlap(other)
+  }
+
+  def isCompleteOverlap(other: Offsets): Boolean = {
+    startOffset <= other.startOffset && endOffset >= other.endOffset
+  }
+
+  def isNonOverlap(other: Offsets): Boolean = {
+    endOffset < other.startOffset || startOffset > other.endOffset
+  }
+
+}
+
+case class FileStatus(path: String)
+
+case class OverlappingOffsets(partialOverlaps: List[Offsets],
+                              completeOverlaps: List[Offsets])
\ No newline at end of file
diff --git a/src/main/scala/com/qubole/spark/benchmark/streaming/states/SparseFile.scala b/src/main/scala/com/qubole/spark/benchmark/streaming/states/SparseFile.scala
new file mode 100644
index 0000000..80efc03
--- /dev/null
+++ b/src/main/scala/com/qubole/spark/benchmark/streaming/states/SparseFile.scala
@@ -0,0 +1,77 @@
+package com.qubole.spark.benchmark.streaming.states
+
+class SparseFile(rootPath: String) {
+
+  import com.qubole.spark.benchmark.utils.SparseFileUtils._
+
+  val headerFileManager = new HeaderFileManager(rootPath)
+  var fileSize: Long = 0L
+
+  // Write the given bytes starting at `offset` bytes from the
+  // start of the file.
+  // TODO: confirm whether partial overwrites need to be supported.
+  def writeBytes(offset: Long, data: String): Unit = {
+    val offsets = computeOffsets(offset, data)
+    val overlappingOffsets = headerFileManager.getOverLappingOffsets(offsets)
+    if (overlappingOffsets.partialOverlaps.nonEmpty) {
+      throwUnsupportedOperationException("Partial Overwrite")
+    }
+    performWrite(offsets, data, overlappingOffsets.completeOverlaps)
+  }
+
+  private def performWrite(offsets: Offsets, data: String, completeOverlaps: List[Offsets]): Unit =
+    synchronized {
+      val fileName = writeToFile(data, rootPath)
+      if (fileName.isDefined) {
+        headerFileManager.addMapEntry(offsets, FileStatus(fileName.get))
+        headerFileManager.removeCompleteOverlaps(completeOverlaps)
+        headerFileManager.commitHeaderFile()
+        updateSize(data)
+      }
+    }
+
+  private def updateSize(str: String): Unit = {
+    fileSize += str.getBytes.length
+  }
+
+  private def computeOffsets(offset: Long, data: String): Offsets = {
+    // Use the byte length so that offsets stay consistent with updateSize.
+    Offsets(offset, offset + data.getBytes.length)
+  }
+
+  // Read bytes starting from the offset up to the given length.
+  def readBytes(offset: Long, size: Int): String = {
+    val offsets = Offsets(offset, offset + size)
+    headerFileManager.readFiles(offsets)
+  }
+
+  // Set the size of the file. This can be called at any point
+  // in time. Unwritten bytes must be assumed to be zero. If
+  // more bytes have already been written, they should be
+  // considered gone.
+  // size in bytes
+  def setSize(size: Long): Unit = synchronized {
+    // Record the new logical size. Discarding already-written segments that
+    // lie beyond the new size is not handled here.
+    fileSize = size
+  }
+
+  def getSize(): Long = {
+    fileSize
+  }
+
+}
+
+object SparseFile {
+
+  def main(args: Array[String]): Unit = {
+    val sparseFile = createSparseFile()
+    println(s"Created sparse file of size ${sparseFile.getSize()} bytes")
+  }
+
+  def createSparseFile(): SparseFile = {
+    println("Enter Root Path")
+    val rootPath = scala.io.StdIn.readLine()
+    new SparseFile(rootPath)
+  }
+
+}
\ No newline at end of file
diff --git a/src/main/scala/com/qubole/spark/benchmark/streaming/states/StateStoreBenchmarkRunner.scala b/src/main/scala/com/qubole/spark/benchmark/streaming/states/StateStoreBenchmarkRunner.scala
index a488c83..ed6c9af 100644
--- a/src/main/scala/com/qubole/spark/benchmark/streaming/states/StateStoreBenchmarkRunner.scala
+++ b/src/main/scala/com/qubole/spark/benchmark/streaming/states/StateStoreBenchmarkRunner.scala
@@ -1,14 +1,2 @@
 package com.qubole.spark.benchmark.streaming.states
 
-object StateStoreBenchmarkRunner {
-  def main(args: Array[String]): Unit = {
-    val conf = new StateStoreBenchmarkConf(args)
-    val benchmarkInstance = conf.initializeBenchmarkClass
-
-    println(s"Benchmark class: ${benchmarkInstance.getClass.getCanonicalName}")
-    println(s"Output Mode: ${conf.getSparkOutputMode}")
-
-    benchmarkInstance.runBenchmark()
-  }
-
-}
diff --git a/src/main/scala/com/qubole/spark/benchmark/streaming/states/Task.scala b/src/main/scala/com/qubole/spark/benchmark/streaming/states/Task.scala
new file mode 100644
index 0000000..18b8a49
--- /dev/null
+++ b/src/main/scala/com/qubole/spark/benchmark/streaming/states/Task.scala
@@ -0,0 +1,278 @@
+package com.qubole.spark.benchmark.streaming.states
+
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+
+/**
+ * A class which implements Scala Futures lazily.
+ *
+ * @param run function which takes an ExecutionContext as argument and
+ *            returns a typed object A after execution
+ * @tparam A result type produced by the task
+ */
+class Task[A](val run: ExecutionContext ⇒ A) {
+
+  /**
+   * API to chain a Task using map
+   *
+   * @param f function
+   * @return new Task
+   */
+  def map[B](f: A ⇒ B): Task[B] = {
+    new Task[B](executionContext => {
+      f(run(executionContext))
+    })
+  }
+
+  /**
+   * API to chain a Task using flatMap
+   *
+   * @param f function
+   * @return new Task
+   */
+  def flatMap[B](f: A ⇒ Task[B]): Task[B] = {
+    new Task[B](executionContext => {
+      f(run(executionContext)).run(executionContext)
+    })
+  }
+
+  /**
+   * API to convert a Task to a Future
+   *
+   * @param ec Execution Context to be used by the future
+   * @return Future
+   */
+  def toFuture(implicit ec: ExecutionContext): Future[A] = {
+    Future {
+      run(ec)
+    }
+  }
+
+  /**
+   * Helper method to return the result of a Task
+   *
+   * @param ec Execution Context to be used by the task
+   * @return result of the Task
+   */
+  def runBlocking(implicit ec: ExecutionContext): A = Await.result(toFuture, Duration.Inf)
+}
+
+
+object Task {
+
+  def apply[A](run: ExecutionContext ⇒ A): Task[A] = new Task(run)
+
+  def apply[A](run: ⇒ A): Task[A] = new Task(_ ⇒ run)
+
+  /**
+   * API to create a Task from a Future
+   *
+   * @param f method which results in a future when invoked
+   * @return new Task which executes the future lazily
+   */
+  def fromFuture[A](f: ⇒ Future[A]): Task[A] = {
+    Task(_ => {
+      Await.result(f, Duration.Inf)
+    })
+  }
+
+  /**
+   * API to create a Task which runs two Tasks in parallel
+   *
+   * @param taskA first Task
+   * @param taskB second Task
+   * @return new Task which gives a tuple containing results of taskA
+   *         and taskB
+   */
+  def zipParallel[A, B](taskA: Task[A], taskB: Task[B]): Task[(A, B)] = {
+    new Task[(A, B)](implicit ec => {
+
+      val futureA = taskA.toFuture
+      val futureB = taskB.toFuture
+
+      val res = for {
+        x <- futureA
+        y <- futureB
+      } yield (x, y)
+
+      Await.result(res, Duration.Inf)
+
+    })
+  }
+
+  /**
+   * API to create a Task which runs a set of Tasks sequentially
+   *
+   * @param tasks sequence of Tasks
+   * @return new Task which returns a sequence containing results of
+   *         all tasks passed in the input sequence
+   */
+  def sequence[A](tasks: Seq[Task[A]]): Task[Seq[A]] = {
+    new Task[Seq[A]](implicit ec => {
+      tasks.map(_.run(ec))
+    })
+  }
+
+  /**
+   * API to create a Task which runs a set of Tasks in parallel
+   *
+   * @param tasks sequence of Tasks
+   * @return new Task which returns a sequence containing results of
+   *         all tasks passed in the input sequence
+   */
+  def parallel[A](tasks: Seq[Task[A]]): Task[Seq[A]] = {
+    new Task[Seq[A]](implicit ec => {
+
+      val futureResults = tasks.map(task =>
+        task.toFuture
+      )
+      Await.result(Future.sequence(futureResults), Duration.Inf)
+
+    })
+  }
+
+  /**
+   * Method to test creating simple IO Tasks
+   */
+  def simpleIOTask: Task[Unit] = for {
+    _ ← Task(println("First Name?"))
+    fn ← Task(scala.io.StdIn.readLine())
+    _ ← Task(println("Last Name?"))
+    ln ← Task(scala.io.StdIn.readLine())
+    _ ← Task(println(s"Hello $fn $ln"))
+  } yield ()
+
+  /**
+   * Method to test the API to chain Tasks using map
+   */
+  def chainMapCalls(num: Int = 100): Task[Int] = {
+    var t: Task[Int] = Task(1)
+    var counter = 0
+    while (counter < num) {
+      t = t.map(_ + 1)
+      counter += 1
+    }
+    t
+  }
+
+  /**
+   * Method to test the API to chain Tasks using flatMap
+   */
+  def chainFlatMapCalls(num: Int = 100): Task[Int] = {
+    var t: Task[Int] = Task(1)
+    var counter = 0
+    while (counter < num) {
+      t = t.flatMap(x ⇒ Task(x + 1))
+      counter += 1
+    }
+    t
+  }
+
+  /**
+   * Method to test the API to create a Task from a Future
+   */
+  def testFromFuture: Task[Unit] = {
+    import scala.concurrent.ExecutionContext.Implicits.global
+    Task.fromFuture({
+      Future {
+        Thread.sleep(1000)
+        print("Task created from Future")
+      }
+    })
+  }
+
+  /**
+   * Method to test zipped parallel runs of two Tasks
+   */
+  def testZipParallelRuns: Task[(Unit, Int)] = {
+    println("Testing Zip Parallel Runs")
+    val taskA: Task[Unit] = Task(_ => {
+      runTask("A", 1000, ())
+    })
+    val taskB: Task[Int] = Task(_ => {
+      runTask("B", 500, 2)
+    })
+    Task.zipParallel(taskA, taskB)
+  }
+
+  /**
+   * Helper method to run a Task
+   *
+   * @param taskName identifier of the Task
+   * @param sleepTime time for which the task waits or sleeps
+   * @param returnValue value returned by the task
+   * @return a generic typed object returned by the task
+   */
+  def runTask[A](taskName: String, sleepTime: Long, returnValue: A): A = {
+    val startTime = System.currentTimeMillis()
+    Thread.sleep(sleepTime)
+    val endTime = System.currentTimeMillis()
+    println(s"Task $taskName started at: $startTime and took ${endTime - startTime} ms")
+    returnValue
+  }
+
+  /**
+   * Method to test sequential runs of Tasks
+   */
+  def testSeqRuns: Task[Seq[Unit]] = {
+    println("Testing Seq Runs")
+    val seq: Array[Task[Unit]] = Array(
+      Task(_ => {
+        runTask("A", 1000, ())
+      }),
+      Task(_ => {
+        runTask("B", 200, ())
+      }),
+      Task(_ => {
+        runTask("C", 500, ())
+      })
+    )
+    Task.sequence(seq)
+  }
+
+  /**
+   * Method to test parallel runs of Tasks
+   */
+  def testParallelRuns: Task[Seq[Unit]] = {
+    println("Testing Parallel Runs")
+    val seq: Array[Task[Unit]] = Array(
+      Task(_ => {
+        runTask("A", 1000, ())
+      }),
+      Task(_ => {
+        runTask("B", 200, ())
+      }),
+      Task(_ => {
+        runTask("C", 500, ())
+      })
+    )
+    Task.parallel(seq)
+  }
+
+  def main(args: Array[String]): Unit = {
+    import scala.concurrent.ExecutionContext.Implicits.global
+    // test 1
+    //simpleIOTask.runBlocking
+
+    // test 2
+    testFromFuture.runBlocking
+
+    // test 3
+    chainMapCalls().runBlocking
+
+    // test 4
+    chainFlatMapCalls().runBlocking
+
+    // test 5
+    testZipParallelRuns.runBlocking
+
+    // test 6
+    testSeqRuns.runBlocking
+
+    // test 7
+    testParallelRuns.runBlocking
+  }
+
+}
\ No newline at end of file
diff --git a/src/main/scala/com/qubole/spark/benchmark/utils/SparseFileUtils.scala b/src/main/scala/com/qubole/spark/benchmark/utils/SparseFileUtils.scala
new file mode 100644
index 0000000..4f8224a
--- /dev/null
+++ b/src/main/scala/com/qubole/spark/benchmark/utils/SparseFileUtils.scala
@@ -0,0 +1,32 @@
+package com.qubole.spark.benchmark.utils
+
+import java.io.{BufferedWriter, File, FileWriter}
+
+import scala.util.Random
+
+object SparseFileUtils {
+
+  def writeToFile(data: String, rootPath: String): Option[String] = {
+    try {
+      val randomFileName = generateRandomFileName()
+      val file = new File(rootPath + "/" + randomFileName)
+      val bw = new BufferedWriter(new FileWriter(file))
+      bw.write(data)
+      bw.close()
+      Some(randomFileName)
+    } catch {
+      case e: Exception =>
+        println("Error occurred while writing file. Cause:\n" + e.getMessage)
+        None
+    }
+  }
+
+  private def generateRandomFileName(): String = {
+    Random.alphanumeric.take(6).mkString("")
+  }
+
+  def throwUnsupportedOperationException(operation: String): Nothing = {
+    throw new UnsupportedOperationException(operation + " is not supported")
+  }
+
+}
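
Note: below is a minimal usage sketch of the lazy Task API introduced in PATCH 3/3. It is illustrative only and not part of the patch series; the object name TaskDemo and the literal values are assumptions made for the example, and only Task.apply, map, zipParallel, and runBlocking from Task.scala are exercised.

import scala.concurrent.ExecutionContext.Implicits.global
import com.qubole.spark.benchmark.streaming.states.Task

object TaskDemo {
  def main(args: Array[String]): Unit = {
    // Construction is lazy: a Task only wraps a function from an
    // ExecutionContext to a value, so nothing executes yet.
    val base: Task[Int] = Task(41)
    val incremented: Task[Int] = base.map(_ + 1)

    // Run both tasks on the implicit ExecutionContext and pair the results.
    val both: Task[(Int, Int)] = Task.zipParallel(base, incremented)

    // Work happens only when the task is actually run.
    println(both.runBlocking) // prints (41,42)
  }
}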