[SNAP-1743] Compress column batches when storing to disk or sending over network (#905)

Changes to ColumnFormatValue serialization/deserialization to handle compression
transparently when storing to disk or sending over the network.

## Changes proposed in this pull request

- added methods to get compressed/uncompressed buffers from a column value
- compress the buffer, if possible, when sending to a remote node or storing to the Oplog
- switch between the compressed/uncompressed buffer in the stored value as below
  (see the sketch after this list):
  - keep the initial form as far as possible
  - if a call for the uncompressed buffer is seen, then decompress (if compressed) and store
  - if calls for the compressed buffer are seen consecutively more than 2 times, then compress
    and store; this is tracked using the "compressionState" byte in ColumnFormatValue
- the "compression" property is now only for the table and cannot be overridden by a session
  property
- removed Eclipse plugin related blocks from build.gradle that do not work as required
- add a FreeMemory implementation for execution (ExecutionFreeMemory) to allow freeing
  from the execution pool; use it for allocating on-the-fly compression/decompression
  buffers on the execution pool instead of the storage pool (and move to the storage pool
  if the result also replaces the underlying buffer)
- add explicit accounting for heap buffers when a compressed/decompressed
  ColumnFormatValue is transferred to storage
- decompress the buffer on the fly in the smart connector iterator, if required
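
The switching policy above can be pictured with a small sketch. This is not the actual
ColumnFormatValue code: the names below (SwitchingColumnValue, consecutiveCompressedCalls,
getUncompressed/getCompressed) are hypothetical, and the real implementation packs the state
into a single "compressionState" byte and works against the off-heap buffer allocators.

```scala
// Illustrative sketch of the compressed/uncompressed switching policy.
// The real ColumnFormatValue tracks this in a single "compressionState" byte
// and integrates with the buffer allocators; all names here are hypothetical.
final class SwitchingColumnValue(private var buffer: java.nio.ByteBuffer,
    private var isCompressed: Boolean) {

  // more than 2 consecutive compressed accesses switch the stored form
  private val compressSwitchThreshold = 2
  private var consecutiveCompressedCalls = 0

  /** A caller needs the uncompressed form (e.g. a local scan). */
  def getUncompressed(
      decompress: java.nio.ByteBuffer => java.nio.ByteBuffer): java.nio.ByteBuffer = {
    consecutiveCompressedCalls = 0
    if (isCompressed) {
      buffer = decompress(buffer) // decompress and replace the stored buffer
      isCompressed = false
    }
    buffer
  }

  /** A caller needs the compressed form (disk write or network send). */
  def getCompressed(
      compress: java.nio.ByteBuffer => java.nio.ByteBuffer): java.nio.ByteBuffer = {
    if (isCompressed) return buffer
    consecutiveCompressedCalls += 1
    val compressed = compress(buffer)
    if (consecutiveCompressedCalls > compressSwitchThreshold) {
      buffer = compressed // enough compressed accesses in a row: keep compressed form
      isCompressed = true
      consecutiveCompressedCalls = 0
    }
    compressed
  }
}
```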

* Fixes and enhancements seen in manual testing

- added a more optimal null-value scan (see the sketch after this list)
  - use direct BitSet reads instead of tracking nextPosition because the former is faster
    despite more CPU instructions (likely because avoiding updates to nextPosition avoids writes)
  - use long reads (instead of byte reads) for BitSet checks for better cache-line behaviour
  - added classes for boolean RLE, but they are not used for null values since they measured
    consistently 2X worse performance than the BitSet scan on the datacentre and were much
    more erratic; the cost is some increase in memory for a very small or very large number
    of nulls
- also removed "numNulls" calls from the code and instead use only isNullAt, which is enough
  (the generated code tracks the total nulls seen so far in a variable -- somehow having
   the variable as a member works better than having it as a stack-local variable)
- add some buffer rewinds that were missing
- other fixes seen in manual testing
- add ResultSetDecoder.isNullAt
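
As a rough illustration of the word-at-a-time null check described in the first bullet above,
the sketch below reads the null bitmap as 64-bit words from a plain long array. The actual
decoders read the bitmap off-heap via Platform/Unsafe, so this helper (NullBitmapSketch,
isNullAt) is only an assumed, simplified form.

```scala
// Simplified word-at-a-time null check over a long[] null bitmap.
// The real decoders perform the equivalent reads against off-heap memory.
object NullBitmapSketch {

  /** Returns true if the bit for `ordinal` is set in the null bitmap. */
  def isNullAt(words: Array[Long], ordinal: Int): Boolean = {
    val word = words(ordinal >> 6)           // 64-bit word containing the bit
    (word & (1L << (ordinal & 0x3f))) != 0L  // test the bit within that word
  }
}
```

Generated code can then keep a single member counter that is bumped whenever isNullAt returns
true, which is what makes a separate numNulls call unnecessary.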

* Address review comments

- move compression/decompression related utility methods from Utils into a separate
  CompressionUtils class (see the sketch below)
- split ColumnFormatValue.transformValue into compressValue and decompressValue methods
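
As a rough illustration of what such codec helpers do, the sketch below builds
codecCompress/codecDecompress analogues on Spark's CompressionCodec streams. The method names
mirror the calls that appear in the CachedDataFrame diff further down, but the array-based
bodies here are an assumption for illustration, not the actual CompressionUtils implementation
(which operates on ByteBuffers via the buffer allocators).

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}

import org.apache.spark.io.CompressionCodec

// Hypothetical, array-based stand-ins for the new CompressionUtils helpers.
object CompressionUtilsSketch {

  /** Compress the first `len` bytes of `input` using the given Spark codec. */
  def codecCompress(codec: CompressionCodec, input: Array[Byte], len: Int): Array[Byte] = {
    val bos = new ByteArrayOutputStream(len)
    val out = codec.compressedOutputStream(bos)
    try out.write(input, 0, len) finally out.close()
    bos.toByteArray
  }

  /** Decompress `len` bytes of `data` starting at `offset` into `decompressedLen` bytes. */
  def codecDecompress(codec: CompressionCodec, data: Array[Byte], offset: Int,
      len: Int, decompressedLen: Int): Array[Byte] = {
    val in = codec.compressedInputStream(new ByteArrayInputStream(data, offset, len))
    val result = new Array[Byte](decompressedLen)
    var read = 0
    try {
      while (read < decompressedLen) {
        val n = in.read(result, read, decompressedLen - read)
        if (n < 0) return java.util.Arrays.copyOf(result, read) // short read: return what we got
        read += n
      }
    } finally in.close()
    result
  }
}
```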
Sumedh Wale authored Dec 15, 2017
1 parent 74ebb06 commit 8549df1
Showing 44 changed files with 1,324 additions and 608 deletions.
8 changes: 0 additions & 8 deletions build.gradle
@@ -67,7 +67,6 @@ allprojects {
apply plugin: 'maven'
apply plugin: 'scalaStyle'
apply plugin: 'idea'
apply plugin: 'eclipse'
apply plugin: "build-time-tracker"
apply plugin: "nebula.ospackage"
apply plugin: 'nebula.ospackage-base'
@@ -616,13 +615,6 @@ subprojects {
testOutput packageTests
}

eclipse {
classpath {
defaultOutputDir = file('build-artifacts/eclipse')
downloadSources = true
}
}

dependencies {
compile 'log4j:log4j:' + log4jVersion
compile 'org.slf4j:slf4j-api:' + slf4jVersion
@@ -18,7 +18,7 @@ package org.apache.spark.memory

import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.Consumer
import java.util.function.BiConsumer

import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -307,8 +307,8 @@ class SnappyUnifiedMemoryManager private[memory](
val mode = MemoryMode.OFF_HEAP
val totalSize = capacity + DirectBufferAllocator.DIRECT_OBJECT_OVERHEAD
val toOwner = DirectBufferAllocator.DIRECT_STORE_OBJECT_OWNER
val changeOwner = new Consumer[String] {
override def accept(fromOwner: String): Unit = {
val changeOwner = new BiConsumer[String, AnyRef] {
override def accept(fromOwner: String, runnable: AnyRef): Unit = {
if (fromOwner ne null) {
val memoryForObject = self.memoryForObject
// "from" was changed to "to"
@@ -328,6 +328,11 @@
throw DirectBufferAllocator.instance().lowMemoryException(
"changeToStorage", totalSize)
}
// release from execution pool if using execution allocator
runnable match {
case r: ExecutionFreeMemory => r.releaseExecutionMemory()
case _ =>
}
} else throw new IllegalStateException(
s"ByteBuffer Cleaner does not match expected source $fromOwner")
}
@@ -337,10 +342,10 @@
capacity, changeOwner)
}

def tryExplicitGC(): Unit = {
def tryExplicitGC(numBytes: Long): Unit = {
// check if explicit GC should be invoked
if (canUseExplicitGC) {
logStats("Invoking explicit GC before failing storage allocation request: ")
logStats(s"Explicit GC before failing storage allocation request of $numBytes bytes: ")
System.gc()
System.runFinalization()
logStats("Stats after explicit GC: ")
@@ -632,19 +637,19 @@ class SnappyUnifiedMemoryManager private[memory](
// for off-heap try harder before giving up since pending references
// may be on heap (due to unexpected exceptions) that will go away on GC
if (!couldEvictSomeData && offHeap) {
tryExplicitGC()
tryExplicitGC(numBytes)
couldEvictSomeData = storagePool.acquireMemory(blockId, numBytes)
}
if (!couldEvictSomeData) {
if (doEvict) {
wrapperStats.incNumFailedEvictionRequest(offHeap)
}
logWarning(s"Could not allocate memory for $blockId of " +
s"$objectName size=$numBytes. Memory pool size " + storagePool.memoryUsed)
s"$objectName size=$numBytes. Memory pool size ${storagePool.memoryUsed}")
} else {
memoryForObject.addTo(objectName -> memoryMode, numBytes)
logDebug(s"Allocated memory for $blockId of " +
s"$objectName size=$numBytes. Memory pool size " + storagePool.memoryUsed)
s"$objectName size=$numBytes. Memory pool size ${storagePool.memoryUsed}")
}
couldEvictSomeData
} else {
13 changes: 4 additions & 9 deletions cluster/src/test/scala/org/apache/spark/sql/IndexTest.scala
@@ -16,8 +16,7 @@
*/
package org.apache.spark.sql

import java.sql.{Connection, DriverManager}
import java.util.{Properties, TimeZone}
import java.util.TimeZone

import com.pivotal.gemfirexd.internal.engine.db.FabricDatabase
import io.snappydata.benchmark.TPCH_Queries
@@ -26,15 +25,11 @@ import io.snappydata.benchmark.snappy.{SnappyAdapter, TPCH}
import io.snappydata.{PlanTest, SnappyFunSuite}
import org.scalatest.BeforeAndAfterEach

import org.apache.spark.SparkConf
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Sort}
import org.apache.spark.sql.execution.streaming.{Offset, Sink, Source}
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider, StreamSourceProvider}
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.{BooleanType, IntegerType, StructField, StructType}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.util.Benchmark

class IndexTest extends SnappyFunSuite with PlanTest with BeforeAndAfterEach {
@@ -273,8 +268,8 @@ class IndexTest extends SnappyFunSuite with PlanTest with BeforeAndAfterEach {
// genPlan = genPlan)._1.foreach(_ => ())

var queryToBeExecuted = TPCH_Queries.getQuery(qNum, false, true)
def evalSnappyMods(genPlan: Boolean) = QueryExecutor.queryExecution(qNum, queryToBeExecuted, snc, false)
._1.foreach(_ => ())
def evalSnappyMods(genPlan: Boolean) = QueryExecutor.queryExecution(
qNum, queryToBeExecuted, snc, false)._1.foreach(_ => ())

def evalBaseTPCH = qryProvider.execute(query, executor)._1.foreach(_ => ())

@@ -60,9 +60,9 @@ class BitSetTest extends SnappyFunSuite {
private def clear(bitset: Array[Long], index: Int): Unit =
BitSet.clear(bitset, baseAddress, index)

private def anySet(bitset: Array[Long], index: Int): Boolean = {
BitSet.anySet(bitset, baseAddress + ((index + 7) >> 3), ((bitsetSize << 3) - index) >> 3)
}
private def anySet(bitset: Array[Long], index: Int): Boolean =
BitSet.anySet(bitset, baseAddress + ((index + 7) >> 3),
(((bitsetSize << 6) - index) + 63) >> 6)

private def nextSetBit(bitset: Array[Long], index: Int): Int =
BitSet.nextSetBit(bitset, baseAddress, index, bitsetSize)
@@ -74,7 +74,7 @@
val maxSetBit = 96
val setBits = Seq(0, 9, 1, 10, 90, maxSetBit)
val bitset = new Array[Long](4)
bitsetSize = 13
bitsetSize = 2

for (i <- 0 until 100) {
assert(!get(bitset, i))
@@ -86,7 +86,7 @@
assert(get(bitset, i) === setBits.contains(i))
}
for (i <- 0 until 100) {
assert(anySet(bitset, i) === (i <= maxSetBit))
assert(anySet(bitset, i) === (i <= maxSetBit), "failed for " + i)
}

// clear the bits and check after each clear
@@ -115,7 +115,7 @@

test("100% full bit set then clear all") {
val bitset = new Array[Long](200)
bitsetSize = 1250
bitsetSize = 157

for (i <- 0 until 10000) {
assert(!get(bitset, i))
@@ -140,7 +140,7 @@
test("nextSetBit") {
val setBits = Seq(0, 9, 1, 10, 90, 96)
val bitset = new Array[Long](4)
bitsetSize = 13
bitsetSize = 2

setBits.foreach(i => set(bitset, i))

@@ -159,7 +159,7 @@
test("cardinality") {
val setBits = Seq(0, 9, 1, 10, 100, 90, 34, 108, 130, 127, 128, 96, 123, 180, 191)
val bitset = new Array[Long](3)
bitsetSize = 16
bitsetSize = 2

setBits.foreach(set(bitset, _))

2 changes: 2 additions & 0 deletions codeStyleSettings.xml
@@ -56,7 +56,9 @@
</codeStyleSettings>
<codeStyleSettings language="JAVA">
<option name="ALIGN_MULTILINE_PARAMETERS" value="false" />
<option name="SPACE_WITHIN_ARRAY_INITIALIZER_BRACES" value="true" />
<option name="SPACE_AFTER_TYPE_CAST" value="false" />
<option name="SPACE_BEFORE_ARRAY_INITIALIZER_LBRACE" value="true" />
<indentOptions>
<option name="INDENT_SIZE" value="2" />
<option name="CONTINUATION_INDENT_SIZE" value="4" />
3 changes: 2 additions & 1 deletion core/src/main/java/io/snappydata/impl/SnappyHiveCatalog.java
@@ -36,6 +36,7 @@
import com.pivotal.gemfirexd.internal.impl.jdbc.Util;
import com.pivotal.gemfirexd.internal.impl.sql.catalog.GfxdDataDictionary;
import com.pivotal.gemfirexd.internal.shared.common.reference.SQLState;
import io.snappydata.Constant;
import org.apache.commons.collections.map.CaseInsensitiveMap;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -396,7 +397,7 @@ public Object call() throws Exception {
int columnMaxDeltaRows = Integer.parseInt(parameters.get(
ExternalStoreUtils.COLUMN_MAX_DELTA_ROWS()));
value = parameters.get(ExternalStoreUtils.COMPRESSION_CODEC());
String compressionCodec = value == null ? null : value.toString();
String compressionCodec = value == null ? Constant.DEFAULT_CODEC() : value.toString();
String tableType = ExternalTableType.getTableType(table);
return new ExternalTableMetaData(
fullyQualifiedName,
10 changes: 4 additions & 6 deletions core/src/main/scala/io/snappydata/Literals.scala
@@ -22,6 +22,7 @@ import com.gemstone.gemfire.internal.shared.SystemProperties

import org.apache.spark.sql.execution.columnar.ExternalStoreUtils
import org.apache.spark.sql.internal.{AltName, SQLAltName, SQLConfigEntry}
import org.apache.spark.sql.store.CompressionCodecId

/**
* Constant names suggested per naming convention
@@ -120,6 +121,9 @@ object Constant {
// But the JNI version means no warmup time which helps for short jobs.
val DEFAULT_CODEC = "lz4"

/** the [[CompressionCodecId]] of default compression scheme ([[DEFAULT_CODEC]]) */
val DEFAULT_CODECID: CompressionCodecId.Type = CompressionCodecId.fromName(DEFAULT_CODEC)

// System property to tell the system whether the String type columns
// should be considered as clob or not
val STRING_AS_CLOB_PROP = "spark-string-as-clob"
@@ -239,12 +243,6 @@ object Property extends Enumeration {
s"each table using the ${ExternalStoreUtils.COLUMN_MAX_DELTA_ROWS} option in " +
s"create table DDL else this setting is used for the create table.", Some(10000))

val CompressionCodec = SQLVal[String](s"${Constant.PROPERTY_PREFIX}compression.codec",
"The compression codec to use when creating column batches for binary and " +
"complex type columns. Possible values: none, snappy, gzip, lzo. It can " +
s"also be set as ${ExternalStoreUtils.COMPRESSION_CODEC} option in " +
s"create table DDL. Default is no compression.", Some("none"))

val HashJoinSize = SQLVal[Long](s"${Constant.PROPERTY_PREFIX}hashJoinSize",
"The join would be converted into a hash join if the table is of size less " +
"than hashJoinSize. Default value is 100 MB.", Some(100L * 1024 * 1024))
@@ -16,15 +16,18 @@
*/
package org.apache.spark.memory

import java.nio.ByteBuffer
import java.nio.{ByteBuffer, ByteOrder}

import com.gemstone.gemfire.SystemFailure
import com.gemstone.gemfire.internal.shared.BufferAllocator
import com.gemstone.gemfire.internal.shared.unsafe.{DirectBufferAllocator, UnsafeHolder}
import com.gemstone.gemfire.internal.snappy.UMMMemoryTracker
import com.gemstone.gemfire.internal.snappy.memory.MemoryManagerStats
import org.slf4j.LoggerFactory

import org.apache.spark.storage.{BlockId, TestBlockId}
import org.apache.spark.util.Utils
import org.apache.spark.{Logging, SparkConf, SparkEnv}
import org.apache.spark.{Logging, SparkConf, SparkEnv, TaskContext}

/**
* Base trait for different memory manager used by SnappyData in different modes
@@ -224,4 +227,65 @@ object MemoryManagerCallback extends Logging {
bootMemoryManager
}
}

def allocateExecutionMemory(size: Int, owner: String,
allocator: BufferAllocator): ByteBuffer = {
if (allocator.isManagedDirect) {
val context = TaskContext.get()
if (context ne null) {
val memoryManager = context.taskMemoryManager()
val totalSize = UnsafeHolder.getAllocationSize(size) +
DirectBufferAllocator.DIRECT_OBJECT_OVERHEAD
val consumer = new DefaultMemoryConsumer(memoryManager, MemoryMode.OFF_HEAP)
if (consumer.acquireMemory(totalSize) < totalSize) {
consumer.freeMemory(consumer.getUsed)
throw DirectBufferAllocator.instance().lowMemoryException(owner, totalSize)
}
return allocator.allocateCustom(totalSize, new UnsafeHolder.FreeMemoryFactory {
override def newFreeMemory(address: Long, size: Int): ExecutionFreeMemory =
new ExecutionFreeMemory(consumer, address)
}).order(ByteOrder.LITTLE_ENDIAN)
}
}
allocator.allocate(size, owner).order(ByteOrder.LITTLE_ENDIAN)
}
}

final class DefaultMemoryConsumer(taskMemoryManager: TaskMemoryManager,
mode: MemoryMode = MemoryMode.ON_HEAP)
extends MemoryConsumer(taskMemoryManager, taskMemoryManager.pageSizeBytes(), mode) {

override def spill(size: Long, trigger: MemoryConsumer): Long = 0L

override def getUsed: Long = this.used
}

final class ExecutionFreeMemory(consumer: DefaultMemoryConsumer,
address: Long) extends UnsafeHolder.FreeMemory(address) {

override protected def objectName(): String = BufferAllocator.EXECUTION

override def run() {
val address = tryFree()
if (address != 0) {
UnsafeHolder.getUnsafe.freeMemory(address)
releaseExecutionMemory()
}
}

def releaseExecutionMemory(): Unit = {
try {
// release from execution pool
consumer.freeMemory(consumer.getUsed)
} catch {
case t: Throwable => // ignore exceptions
SystemFailure.checkFailure()
try {
val logger = LoggerFactory.getLogger(getClass)
logger.error("ExecutionFreeMemory unexpected exception", t)
} catch {
case _: Throwable => // ignore if even logging failed
}
}
}
}
17 changes: 7 additions & 10 deletions core/src/main/scala/org/apache/spark/sql/CachedDataFrame.scala
@@ -37,7 +37,7 @@ import com.pivotal.gemfirexd.internal.shared.common.reference.SQLState

import org.apache.spark._
import org.apache.spark.io.CompressionCodec
import org.apache.spark.memory.MemoryConsumer
import org.apache.spark.memory.DefaultMemoryConsumer
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.CodeAndComment
@@ -47,6 +47,7 @@ import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.aggregate.CollectAggregateExec
import org.apache.spark.sql.execution.command.ExecutedCommandExec
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionEnd, SparkListenerSQLExecutionStart}
import org.apache.spark.sql.store.CompressionUtils
import org.apache.spark.sql.types.StructType
import org.apache.spark.storage.{BlockManager, RDDBlockId, StorageLevel}
import org.apache.spark.unsafe.Platform
@@ -427,7 +428,7 @@ object CachedDataFrame
private def flushBufferOutput(bufferOutput: Output, position: Int,
output: ByteBufferDataOutput, codec: CompressionCodec): Unit = {
if (position > 0) {
val compressedBytes = Utils.codecCompress(codec,
val compressedBytes = CompressionUtils.codecCompress(codec,
bufferOutput.getBuffer, position)
val len = compressedBytes.length
// write the uncompressed length too
@@ -483,13 +484,9 @@
// We will ensure that sufficient memory is available by reserving
// four times as Kryo serialization will expand its buffer accordingly
// and transport layer can create another copy.
if (context != null) {
if (context ne null) {
// TODO why driver is calling this code with context null ?
val memoryConsumer = new MemoryConsumer(context.taskMemoryManager()) {
override def spill(size: Long, trigger: MemoryConsumer): Long = {
0L
}
}
val memoryConsumer = new DefaultMemoryConsumer(context.taskMemoryManager())
// TODO Remove the 4 times check once SNAP-1759 is fixed
val required = 4L * memSize
val granted = memoryConsumer.acquireMemory(4L * memSize)
@@ -636,7 +633,7 @@ object CachedDataFrame
var decompressedLen = input.readInt()
var inputLen = input.readInt()
val inputPosition = input.position()
val bufferInput = new Input(Utils.codecDecompress(codec, data,
val bufferInput = new Input(CompressionUtils.codecDecompress(codec, data,
inputPosition, inputLen, decompressedLen))
input.setPosition(inputPosition + inputLen)

Expand All @@ -659,7 +656,7 @@ object CachedDataFrame
decompressedLen = input.readInt()
inputLen = input.readInt()
val inputPosition = input.position()
bufferInput.setBuffer(Utils.codecDecompress(codec, data,
bufferInput.setBuffer(CompressionUtils.codecDecompress(codec, data,
inputPosition, inputLen, decompressedLen))
input.setPosition(inputPosition + inputLen)
bufferInput.readInt(true)