Skip to content

Commit

Permalink
#1213 added example of rowbuilder interface (#1214)
Browse files Browse the repository at this point in the history
* #1213 added example of rowbuilder interface

* #1213 Added extra functionality to row builder

* #1213 resolved issue with fake table

* #1213 resolved issue with fake table

* #1213 resolved issue with InstrumentsProviderTest failing due to interface change.

* #1213 Updated EasyCLA

---------

Co-authored-by: chris <[email protected]>
  • Loading branch information
chrisjstevo and chris authored Dec 13, 2024
1 parent 4236412 commit 508676f
Show file tree
Hide file tree
Showing 16 changed files with 298 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import org.finos.toolbox.lifecycle.LifecycleContainer
import org.finos.toolbox.thread.RunOnceLifeCycleRunner
import org.finos.toolbox.time.Clock
import org.finos.vuu.core.module.basket.BasketConstants
import org.finos.vuu.core.module.basket.BasketModule.BasketConstituentColumnNames.{BasketId, Change, Description, LastTrade, Ric, RicBasketId, Side, Volume, Weighting}
import org.finos.vuu.core.module.basket.csv.BasketLoader
import org.finos.vuu.core.table.{DataTable, RowWithData}
import org.finos.vuu.provider.DefaultProvider
Expand All @@ -13,6 +14,16 @@ class BasketConstituentProvider(val table: DataTable)(implicit lifecycle: Lifecy
private val runner = new RunOnceLifeCycleRunner("BasketConstituentProvider", runOnce)
private val basketLoader = new BasketLoader()

private val ricCol = table.getTableDef.columnForName(Ric)
private val basketIdCol = table.getTableDef.columnForName(BasketId)
private val ricBasketIdCol = table.getTableDef.columnForName(RicBasketId)
private val lastTradeCol = table.getTableDef.columnForName(LastTrade)
private val changeCol = table.getTableDef.columnForName(Change)
private val weightingCol = table.getTableDef.columnForName(Weighting)
private val volumeCol = table.getTableDef.columnForName(Volume)
private val sideCol = table.getTableDef.columnForName(Side)
private val descCol = table.getTableDef.columnForName(Description)

lifecycle(this).dependsOn(runner)

import org.finos.vuu.core.module.basket.BasketModule.BasketConstituentColumnNames._
Expand All @@ -23,10 +34,13 @@ class BasketConstituentProvider(val table: DataTable)(implicit lifecycle: Lifecy
}

def updateBasketConstituents(basketId: String): Unit = {

val list = basketLoader.loadConstituents(basketId)

list.foreach(row => {

if (row.nonEmpty) {

val symbol = row("Symbol").asInstanceOf[String]
val name = row("Name")
val lastTrade = row("Last Trade")
Expand All @@ -35,17 +49,20 @@ class BasketConstituentProvider(val table: DataTable)(implicit lifecycle: Lifecy
val weighting = row("Weighting")
val side = BasketConstants.Side.Buy
val ricBasketId = symbol + "." + basketId
table.processUpdate(ricBasketId, RowWithData(ricBasketId, Map(
Ric -> symbol,
BasketId -> basketId,
RicBasketId -> ricBasketId,
LastTrade -> lastTrade,
Change -> change,
Weighting -> weighting,
Volume -> volume,
Description -> name,
Side -> side
)), clock.now())

val rowData = table.newRow(ricBasketId)
.setString(ricCol, symbol)
.setString(basketIdCol, basketId)
.setString(ricBasketIdCol, ricBasketId)
.setString(lastTradeCol, Option(lastTrade).getOrElse("").toString)
.setString(changeCol, Option(change).getOrElse("").toString)
.setDouble(weightingCol, weighting.asInstanceOf[Double])
.setString(volumeCol, Option(volume).getOrElse("").toString)
.setString(descCol, name.toString)
.setString(sideCol, side)
.asRow

table.processUpdate(rowData, clock.now())
}
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,24 @@ class BasketProvider(val table: DataTable)(implicit lifecycle: LifecycleContaine
private val runner = new RunOnceLifeCycleRunner("BasketProvider", runOnce)
private val basketLoader = new BasketLoader()

private val idColumn = table.columnForName(Id)
private val nameColumn = table.columnForName(Name)

lifecycle(this).dependsOn(runner)
def runOnce(): Unit = {
val data = basketLoader.loadBasketIds()

//reuse of the builder...
val builder = table.rowBuilder

data.foreach(id => {
table.processUpdate(id, RowWithData(id, Map(
Id -> id,
Name -> id
)), clock.now())
table.processUpdate(id,
builder.setKey(id)
.setString(idColumn, id)
.setString(nameColumn, id)
//as row clears out the data from the builder
.asRow,
clock.now())
})
}
override val lifecycleId: String = "org.finos.vuu.core.module.basket.provider.BasketProvider"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package org.finos.vuu.example.rest.provider

import org.finos.toolbox.json.JsonUtil
import org.finos.vuu.core.table.{Columns, DataTable, RowWithData}
import org.finos.vuu.core.table.{Columns, DataTable, RowData, RowWithData}
import org.finos.toolbox.time.{Clock, TestFriendlyClock}
import org.finos.vuu.api.TableDef
import org.finos.vuu.core.module.ModuleFactory.stringToString
Expand Down Expand Up @@ -29,7 +29,7 @@ class InstrumentsProviderTest extends AnyFeatureSpec with Matchers with MockFact

getInstrumentsProvider(mockBackend).doStart()

(mockTable.processUpdate _).verify(expectedRow.get(KEY_FIELD).toString, expectedRow, *).once
(mockTable.processUpdate(_: String, _: RowData, _ : Long) ).verify(expectedRow.get(KEY_FIELD).toString, expectedRow, *).once
}

Scenario("can correctly make an external call, parse response and update the table WHEN server responds with multiple instruments") {
Expand All @@ -41,24 +41,25 @@ class InstrumentsProviderTest extends AnyFeatureSpec with Matchers with MockFact

getInstrumentsProvider(mockBackend).doStart()

expectedRows.foreach(row => (mockTable.processUpdate _).verify(row.get(KEY_FIELD).toString, row, *).once)
expectedRows.foreach(row => (mockTable.processUpdate(_: String, _: RowData, _ : Long) ).verify(row.get(KEY_FIELD).toString, row, *).once)
}


Scenario("skips updating table when response is not parsable") {
val mockClientResponse = "Some body"
val mockBackend = SyncBackendStub.whenAnyRequest.thenRespond(mockClientResponse)

getInstrumentsProvider(mockBackend).doStart()

(mockTable.processUpdate _).verify(*, *, *).never
(mockTable.processUpdate(_: String, _: RowData, _ : Long) ).verify(*, *, *).never
}

Scenario("skips updating table when response errors") {
val mockBackend = SyncBackendStub.whenAnyRequest.thenRespond(throw new Exception("Some error"))

getInstrumentsProvider(mockBackend).doStart()

(mockTable.processUpdate _).verify(*, *, *).never
(mockTable.processUpdate(_: String, _: RowData, _ : Long) ).verify(*, *, *).never
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import com.typesafe.scalalogging.StrictLogging
import org.finos.toolbox.jmx.MetricsProvider
import org.finos.toolbox.time.Clock
import org.finos.vuu.api.SessionTableDef
import org.finos.vuu.core.table.{ColumnValueProvider, InMemSessionDataTable, RowWithData, TableData, TablePrimaryKeys}
import org.finos.vuu.core.table.{ColumnValueProvider, InMemSessionDataTable, RowData, RowWithData, TableData, TablePrimaryKeys}
import org.finos.vuu.net.ClientSessionId
import org.finos.vuu.provider.{JoinTableProvider, VirtualizedProvider}

Expand All @@ -17,7 +17,7 @@ class VirtualizedSessionTable(clientSessionId: ClientSessionId,
@volatile private var dataSetSize: Int = 0
@volatile private var range = VirtualizedRange(0, 0)

override def toString: String = s"VirtualizedSessionTable(tableDef=${sessionTableDef.name}, name=${name})"
override def toString: String = s"VirtualizedSessionTable(tableDef=${sessionTableDef.name}, name=$name)"

override def primaryKeys: TablePrimaryKeys = super.primaryKeys

Expand Down Expand Up @@ -55,7 +55,7 @@ class VirtualizedSessionTable(clientSessionId: ClientSessionId,
logger.error("Trying to set range on non-virtualized data, something has gone bad.")
}
}
override def processUpdate(rowKey: String, rowData: RowWithData, timeStamp: Long): Unit = super.processUpdate(rowKey, rowData, timeStamp)
override def processUpdate(rowKey: String, rowData: RowData, timeStamp: Long): Unit = super.processUpdate(rowKey, rowData, timeStamp)

override def processDelete(rowKey: String): Unit = super.processDelete(rowKey)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class VirtualizedSessionTableData(cacheSize: Int)(implicit clock: Clock) extends
}
}

override def update(key: String, update: RowWithData): TableData = {
override def update(key: String, update: RowData): TableData = {
rowCache.put(key, update)
this
}
Expand Down
48 changes: 48 additions & 0 deletions vuu/src/main/scala/org/finos/vuu/core/row/InMemMapRowBuilder.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package org.finos.vuu.core.row
import org.finos.vuu.core.table.{Column, RowData, RowWithData}

import scala.collection.mutable

class InMemMapRowBuilder extends RowBuilder {

private val mutableMap = new mutable.HashMap[String, Any]()
private var key: String = null
override def setLong(column: Column, v: Long): RowBuilder = {
mutableMap.put(column.name, v)
this
}

override def setDouble(column: Column, v: Double): RowBuilder = {
mutableMap.put(column.name, v)
this
}

override def setInt(column: Column, v: Int): RowBuilder = {
mutableMap.put(column.name, v)
this
}

override def setString(column: Column, v: String): RowBuilder = {
mutableMap.put(column.name, v)
this
}

override def setBoolean(column: Column, v: Boolean): RowBuilder = {
mutableMap.put(column.name, v)
this
}
override def setKey(key: String): RowBuilder = {
this.key = key
this
}
override def asRow: RowData = {
if(key == null){
throw new RuntimeException("Key has not been set, this is likely a coding error.")
}
val immMap = mutableMap.toMap
val rowData = RowWithData(key, immMap)
mutableMap.clear()
key = null
rowData
}
}
32 changes: 32 additions & 0 deletions vuu/src/main/scala/org/finos/vuu/core/row/RowBuilder.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package org.finos.vuu.core.row

import org.finos.vuu.core.table.{Column, RowData}

trait RowBuilder {
def setKey(key: String): RowBuilder
def setLong(column: Column, v: Long): RowBuilder
def setDouble(column: Column, v: Double): RowBuilder
def setInt(column: Column, v: Int): RowBuilder
def setString(column: Column, v: String): RowBuilder
def setBoolean(column: Column, v: Boolean): RowBuilder
/**
* this metyhod effectively resets the builder, emptying its existing contents to begin again.
* @return row with data set
*/
def asRow: RowData
}

object NoRowBuilder extends RowBuilder{
override def setKey(key: String): RowBuilder = ???
override def setLong(column: Column, v: Long): RowBuilder = ???
override def setDouble(column: Column, v: Double): RowBuilder = ???
override def setInt(column: Column, v: Int): RowBuilder = ???
override def setString(column: Column, v: String): RowBuilder = ???
override def setBoolean(column: Column, v: Boolean): RowBuilder = ???
/**
* this metyhod effectively resets the builder, emptying its existing contents to begin again.
*
* @return row with data set
*/
override def asRow: RowData = ???
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import org.finos.vuu.api.TableDef
import org.finos.vuu.provider.JoinTableProvider
import io.vertx.core.impl.ConcurrentHashSet
import org.finos.toolbox.jmx.MetricsProvider
import org.finos.vuu.core.row.RowBuilder

class AutoSubscribeTable(tableDef: TableDef, joinProvider: JoinTableProvider)(implicit override val metrics: MetricsProvider) extends InMemDataTable(tableDef, joinProvider) with StrictLogging {

Expand Down
24 changes: 18 additions & 6 deletions vuu/src/main/scala/org/finos/vuu/core/table/InMemDataTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import org.finos.vuu.viewport.{RowProcessor, RowSource, ViewPortColumns}
import org.finos.toolbox.collection.array.ImmutableArray
import org.finos.toolbox.jmx.MetricsProvider
import org.finos.toolbox.text.AsciiUtil
import org.finos.vuu.core.row.{InMemMapRowBuilder, RowBuilder}
import org.finos.vuu.feature.inmem.InMemTablePrimaryKeys

import java.util
Expand All @@ -22,6 +23,8 @@ trait DataTable extends KeyedObservable[RowKeyUpdate] with RowSource {

def updateCounter: Long

def newRow(key: String): RowBuilder
def rowBuilder: RowBuilder
def incrementUpdateCounter(): Unit

def indexForColumn(column: Column): Option[IndexedField[_]]
Expand All @@ -42,7 +45,11 @@ trait DataTable extends KeyedObservable[RowKeyUpdate] with RowSource {

def getTableDef: TableDef

def processUpdate(rowKey: String, rowUpdate: RowWithData, timeStamp: Long): Unit
def processUpdate(rowUpdate: RowData, timeStamp: Long): Unit = {
processUpdate(rowUpdate.key(), rowUpdate, timeStamp)
}

def processUpdate(rowKey: String, rowUpdate: RowData, timeStamp: Long): Unit

def hasRowChanged(row: RowWithData): Boolean = {
val existingRow = this.pullRow(row.key)
Expand Down Expand Up @@ -170,10 +177,10 @@ case class InMemDataTableData(data: ConcurrentHashMap[String, RowData], private

//protected def merge(update: RowUpdate, data: RowData): RowData = MergeFunctions.mergeLeftToRight(update, data)

protected def merge(update: RowWithData, data: RowWithData): RowWithData =
protected def merge(update: RowData, data: RowData): RowData =
MergeFunctions.mergeLeftToRight(update, data)

def update(key: String, update: RowWithData): TableData = {
def update(key: String, update: RowData): TableData = {

val table = data.synchronized {

Expand Down Expand Up @@ -220,6 +227,11 @@ class InMemDataTable(val tableDef: TableDef, val joinProvider: JoinTableProvider

private final val columnValueProvider = InMemColumnValueProvider(this)

override def newRow(key: String): RowBuilder = {
new InMemMapRowBuilder().setKey(key)
}
override def rowBuilder: RowBuilder = new InMemMapRowBuilder

private def buildIndexForColumn(c: Column): IndexedField[_] = {
c.dataType match {
case DataType.StringDataType =>
Expand Down Expand Up @@ -333,7 +345,7 @@ class InMemDataTable(val tableDef: TableDef, val joinProvider: JoinTableProvider
def columns(): Array[Column] = tableDef.columns
lazy val viewPortColumns: ViewPortColumns = ViewPortColumnCreator.create(this, tableDef.columns.map(_.name).toList)

private def updateIndices(rowkey: String, rowUpdate: RowWithData): Unit = {
private def updateIndices(rowkey: String, rowUpdate: RowData): Unit = {
this.indices.foreach(colTup => {
val column = colTup._1
val index = colTup._2
Expand Down Expand Up @@ -375,7 +387,7 @@ class InMemDataTable(val tableDef: TableDef, val joinProvider: JoinTableProvider
})
}

def update(rowkey: String, rowUpdate: RowWithData): Unit = {
def update(rowkey: String, rowUpdate: RowData): Unit = {
data = data.update(rowkey, rowUpdate)
updateIndices(rowkey, rowUpdate)
}
Expand Down Expand Up @@ -446,7 +458,7 @@ class InMemDataTable(val tableDef: TableDef, val joinProvider: JoinTableProvider
}
}

def processUpdate(rowKey: String, rowData: RowWithData, timeStamp: Long): Unit = {
def processUpdate(rowKey: String, rowData: RowData, timeStamp: Long): Unit = {

onUpdateMeter.mark()

Expand Down
10 changes: 7 additions & 3 deletions vuu/src/main/scala/org/finos/vuu/core/table/JoinTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import org.finos.vuu.provider.JoinTableProvider
import org.finos.vuu.viewport.{RowProcessor, ViewPortColumns}
import org.finos.toolbox.collection.array.{ImmutableArray, ImmutableArrays}
import org.finos.toolbox.jmx.MetricsProvider
import org.finos.vuu.core.row.{NoRowBuilder, RowBuilder}
import org.finos.vuu.feature.inmem.InMemTablePrimaryKeys

import java.util
Expand Down Expand Up @@ -104,7 +105,7 @@ case class JoinDataTableData(tableDef: JoinTableDef, var keysByJoinIndex: Array[
map.toMap
}

def rowUpdateToArray(update: RowWithData): Array[Any] = {
def rowUpdateToArray(update: RowData): Array[Any] = {
//val data = columns.map(update.get(_))

var index = 0
Expand Down Expand Up @@ -189,7 +190,7 @@ case class JoinDataTableData(tableDef: JoinTableDef, var keysByJoinIndex: Array[
}
}

def processUpdate(rowKey: String, rowUpdate: RowWithData, joinTable: JoinTable, sourceTables: Map[String, DataTable]): JoinDataTableData = {
def processUpdate(rowKey: String, rowUpdate: RowData, joinTable: JoinTable, sourceTables: Map[String, DataTable]): JoinDataTableData = {

val updateByKeyIndex = rowUpdateToArray(rowUpdate)

Expand Down Expand Up @@ -347,7 +348,7 @@ class JoinTable(val tableDef: JoinTableDef, val sourceTables: Map[String, DataTa

override def incrementUpdateCounter(): Unit = updateCounterInternal +=1

override def processUpdate(rowKey: String, rowUpdate: RowWithData, timeStamp: Long): Unit = {
override def processUpdate(rowKey: String, rowUpdate: RowData, timeStamp: Long): Unit = {

onUpdateMeter.mark()

Expand Down Expand Up @@ -658,4 +659,7 @@ class JoinTable(val tableDef: JoinTableDef, val sourceTables: Map[String, DataTa
}

override def getColumnValueProvider: ColumnValueProvider = InMemColumnValueProvider(this)
override def newRow(key: String): RowBuilder = ???

override def rowBuilder: RowBuilder = NoRowBuilder
}
Loading

0 comments on commit 508676f

Please sign in to comment.