The six basic operators of the volcano-style tuple-at-a-time engine have been implemented as follows.

Scan uses a counter `currentRow` to produce one tuple at a time until the total number of rows `numberRows` (obtained via `scannable.getRowCount`) is reached. We use a `match` expression to verify that `scannable` is defined and to cast it to `RowStore`, so that we can use the `.getRow` method to obtain the rows.
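A minimal sketch of this logic, assuming simplified stand-ins for the project's `Tuple`, `Store`, and `RowStore` types (only the members mentioned above are modelled):

```scala
// Hypothetical stand-ins for the project's types.
type Tuple = IndexedSeq[Any]
trait Store
trait RowStore extends Store {
  def getRowCount: Long
  def getRow(i: Int): Tuple
}

class Scan(scannable: Store) {
  private var currentRow = 0

  def next(): Option[Tuple] = scannable match {
    // The match both checks that scannable is usable and casts it to RowStore.
    case rs: RowStore if currentRow < rs.getRowCount =>
      val row = rs.getRow(currentRow) // fetch exactly one tuple per call
      currentRow += 1
      Some(row)
    case _ => None // all rows returned, or scannable is not a RowStore
  }
}
```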
This operator is implemented in the `Filter` class. We first try to fetch an `Option[Tuple]` using `input.next()`. Afterwards we check whether the tuple is defined using a `match` expression. If the `Option` contains a value, we filter it with the `predicate` function: if it returns true, the `Tuple` is passed to the next operator; otherwise we call `next()` recursively until we either find a `Tuple` that satisfies the predicate or exhaust the input.
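A sketch of this recursion, reusing the `Tuple` alias above and assuming a minimal volcano-style `Operator` interface:

```scala
trait Operator { def next(): Option[Tuple] }

class Filter(input: Operator, predicate: Tuple => Boolean) extends Operator {
  @annotation.tailrec
  final def next(): Option[Tuple] = input.next() match {
    case Some(t) if predicate(t) => Some(t) // tuple satisfies the predicate
    case Some(_)                 => next()  // rejected: recurse to the next tuple
    case None                    => None    // input exhausted
  }
}
```

The recursion is in tail position, so it compiles to a loop and cannot overflow the stack on long runs of rejected tuples.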
We retrieve and manage the `Option[Tuple]` as before. In this case we apply the projection using `map`, as in `nextTuple.map(evaluator)`.
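Under the same assumptions, the whole operator reduces to a one-liner, since `Option.map` applies `evaluator` only when a tuple is present:

```scala
class Project(input: Operator, evaluator: Tuple => Tuple) extends Operator {
  def next(): Option[Tuple] = input.next().map(evaluator)
}
```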
The join is implemented as a hash join. We always choose the left input as the build table and the right input as the probe table. The hash table is a `HashMap[Tuple, Seq[Tuple]]` and is populated and probed in `open()`:

- We use `left.iterator` to iterate over all the tuples from the left input.
- We use `leftKeys.map(el => tuple(el))` to extract the join fields.
- We add the (key, value) pair to the `HashMap`. If the key is already in the map, we append the tuple to its sequence of tuples.
- We iterate over all the tuples from the right input using `rightIterator.next()`.
- We extract the key as before and look it up in the map. If the key exists in the `HashMap`, we have a match: we join the tuples and store the results in a `Seq[Tuple]`.
Each time `next()` is called, a tuple is taken from the head of the sequence and returned.
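A sketch of the build and probe phases under the same `Operator` assumptions; `drain` is a hypothetical helper that exhausts an input, standing in for the iterators mentioned above:

```scala
import scala.collection.mutable

class HashJoin(left: Operator, right: Operator,
               leftKeys: Seq[Int], rightKeys: Seq[Int]) extends Operator {
  private val hashTable = mutable.HashMap.empty[Tuple, Seq[Tuple]]
  private var matched: Seq[Tuple] = Seq.empty

  private def drain(op: Operator): Iterator[Tuple] =
    Iterator.continually(op.next()).takeWhile(_.isDefined).flatten

  def open(): Unit = {
    // Build phase: hash every left (build-side) tuple on its join key.
    for (t <- drain(left)) {
      val key: Tuple = leftKeys.map(i => t(i)).toIndexedSeq
      hashTable.update(key, hashTable.getOrElse(key, Seq.empty) :+ t)
    }
    // Probe phase: concatenate each right tuple with all matching left tuples.
    for (t <- drain(right);
         l <- hashTable.getOrElse(rightKeys.map(i => t(i)).toIndexedSeq, Seq.empty))
      matched = matched :+ (l ++ t)
  }

  def next(): Option[Tuple] = matched match {
    case head +: tail => matched = tail; Some(head) // pop from the head
    case _            => None
  }
}
```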
The operator is implemented using `aggregationMap: Map[Tuple, Seq[Tuple]]`, where the key consists of the attributes on which the aggregation is performed and the value is a `Seq[Tuple]` collecting all the tuples with that key. The map is populated similarly to the join operator, except that here the key is extracted using `groupSet.toArray.map(e => tuple(e))`. The map is then transformed into `aggregationSeq: IndexedSeq[(Tuple, Seq[Tuple])]`, which is processed each time `next()` is called:

- We take a key-value pair from the head of `aggregationSeq`.
- We combine the key with the result of mapping and reducing the `aggCalls` over the tuples associated with that key: `toAggregate._1.++(aggCalls.map(agg => toAggregate._2.map(t => agg.getArgument(t)).reduce(agg.reduce)))`.
- We remove the key-value pair from the sequence.
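A sketch of the aggregation, with a hypothetical `AggCall` shape matching the `getArgument`/`reduce` calls quoted above; for brevity it uses `groupBy` instead of populating the map manually:

```scala
trait AggCall {
  def getArgument(t: Tuple): Any
  def reduce: (Any, Any) => Any
}

class Aggregate(input: Operator, groupSet: Seq[Int],
                aggCalls: Seq[AggCall]) extends Operator {
  private var aggregationSeq: IndexedSeq[(Tuple, Seq[Tuple])] = IndexedSeq.empty

  def open(): Unit = {
    val all = Iterator.continually(input.next()).takeWhile(_.isDefined).flatten.toSeq
    // Key = grouping attributes; value = all tuples sharing that key.
    aggregationSeq = all.groupBy(t => groupSet.map(i => t(i)).toIndexedSeq).toIndexedSeq
  }

  def next(): Option[Tuple] = aggregationSeq.headOption.map { case (key, group) =>
    aggregationSeq = aggregationSeq.tail // remove the consumed key-value pair
    // Append one reduced aggregate per aggCall to the grouping key.
    key ++ aggCalls.map(agg => group.map(agg.getArgument).reduce(agg.reduce))
  }
}
```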
To sort the tuples we first need to materialize all of them. Indeed, in the function `open()` we:

- Store all the tuples from the input into `toSort: Seq[Tuple]`.
- Sort them using the function `sortByCollation`. This function relies on the variable `fieldCollations`, which we obtain from the field `collation`. For each field to sort, `fieldCollations` gives us:
  - the field index, via the method `.getFieldIndex`;
  - the direction of sorting, via the method `.getDirection`.
- Drop the first tuples according to the `offset` value.

Afterwards, in the function `next()` we return one tuple at a time, taking care to return at most `fetch` tuples. This is done using the variable `counter`.
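A sketch of this operator, assuming a simplified collation description in place of Calcite's `getFieldIndex`/`getDirection` accessors and comparable field values:

```scala
final case class FieldCollation(fieldIndex: Int, descending: Boolean)

class Sort(input: Operator, collations: Seq[FieldCollation],
           offset: Int, fetch: Int) extends Operator {
  private var toSort: Seq[Tuple] = Seq.empty
  private var counter = 0 // tuples returned so far

  def open(): Unit = {
    val all = Iterator.continually(input.next()).takeWhile(_.isDefined).flatten.toSeq
    // A stable sort applied from the least to the most significant collation
    // yields the combined ordering; then the first `offset` tuples are dropped.
    toSort = collations.reverse.foldLeft(all) { (acc, c) =>
      val ord = Ordering.fromLessThan[Tuple] { (a, b) =>
        a(c.fieldIndex).asInstanceOf[Comparable[Any]].compareTo(b(c.fieldIndex)) < 0
      }
      acc.sorted(if (c.descending) ord.reverse else ord)
    }.drop(offset)
  }

  def next(): Option[Tuple] =
    if (counter < fetch && toSort.nonEmpty) {
      counter += 1 // never exceed `fetch` returned tuples
      val t = toSort.head; toSort = toSort.tail
      Some(t)
    } else None
}
```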
- Drop ([ch.epfl.dias.cs422.rel.early.volcano.late.Drop]) is implemented by simply accessing the field `value` of the `LateTuple` and returning it.
- Stitch ([ch.epfl.dias.cs422.rel.early.volcano.late.Stitch]) uses a `hashMap: HashMap[Long, Seq[LateTuple]]`, where the keys are the `vid`s and the values are the associated tuples. We build the `hashMap` from the left input. Then, for each tuple of the right input, we look for a match in the `hashMap`; if one exists, we stitch the tuples and return the final `LateTuple` as an `Option[LateTuple]` (see the sketch after this list).
- Filter ([ch.epfl.dias.cs422.rel.early.volcano.late.LateFilter]) adapts the previous version of the Filter by accessing the field `value` of the `LateTuple`, as in `predicate(lateTuple.value)`. It preserves the previous `vid`.
- Join ([ch.epfl.dias.cs422.rel.early.volcano.late.LateJoin]) is also an adaptation of the previous version, accessing the field `value`. In addition, a new vid is generated for the resulting `LateTuple` using the counter `i`: `Option(LateTuple(i, output))`.
- Project ([ch.epfl.dias.cs422.rel.early.volcano.late.LateProject]): we first apply the projection on the `.value` field and then re-attach the `.vid` value, as in `Option(LateTuple(lateTuple.vid, Option(lateTuple.value).map(evaluator).get))`.
The `Fetch` operator is implemented in `next()`:

- We obtain an `Option[LateTuple]` from `input.next()` and store it in `nextTuple`.
- If we obtain a `LateTuple`, we access `column` at position `nextTuple.vid`.
- If we find a corresponding value in `column`, we apply the projection if `projects` is not empty.
- Finally, we return a `LateTuple` with the same `vid` and with the projected tuple, which now also contains the value from `column`.
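A sketch of this logic, reusing the `LateTuple`/`LateOperator` definitions above; `column` and `projects` are simplified stand-ins for the project's column and projection parameters:

```scala
class Fetch(input: LateOperator, column: IndexedSeq[Any],
            projects: Option[IndexedSeq[Any] => IndexedSeq[Any]]) extends LateOperator {
  def next(): Option[LateTuple] =
    input.next().flatMap { lt =>
      // Look up the column value at position vid (None if out of range).
      column.lift(lt.vid.toInt).map { fetched =>
        val widened = lt.value :+ fetched // tuple now also carries the column value
        // Apply the projection only if `projects` is given.
        LateTuple(lt.vid, projects.fold(widened)(p => p(widened)))
      }
    }
}
```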
- [ch.epfl.dias.cs422.rel.early.volcano.late.qo.LazyFetchRule]: the implementation accesses the field `rels` of `call: RelOptRuleCall` (see the fragment after this list).
  - We store the input of the stitch operator in `inputStitch = call.rels(1)`, i.e. the relational operator at position 1.
  - We store `columnScan = call.rels(2).asInstanceOf[LateColumnScan]`. From this variable we obtain the column input needed by the Fetch operator via `columnScan.getColumn`.
  - We create the new Fetch logical operator using the corresponding `create` method: `LogicalFetch.create(inputStitch, columnScan.getRowType, columnScan.getColumn, None, classOf[LogicalFetch])`.
- [ch.epfl.dias.cs422.rel.early.volcano.late.qo.LazyFetchFilterRule]: in this case we return a `LogicalFilter` operator that takes a `LogicalFetch` operator as input. As before, we access the logical operators through the `call.rels` field. Here we have to pay particular attention to two things:
  - The filter condition has to be shifted by the number of fields the Fetch adds to the `LateTuple`. This is done with `RexUtil.shift(filter.getCondition, 0, inputStitch.getRowType.getFieldCount)`.
  - The input of the Filter operator is stored in `newFetch = LogicalFetch.create(inputStitch, columnScan.getRowType, columnScan.getColumn, None, classOf[LogicalFetch])`.
- [ch.epfl.dias.cs422.rel.early.volcano.late.qo.LazyFetchProjectRule]: the procedure is again similar to the previous cases. However, here we leverage the `projects` field of the `Fetch` operator in order to apply the projection on the `LateTuple`. It was therefore important to implement the `Fetch` operator so that it correctly applies the projection when the `projects` field is given.
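Since the rule classes and operand patterns come from the course skeleton, only the `onMatch` body of the first rule can be sketched here; `call.rels` and `call.transformTo` are standard Calcite, while `LogicalFetch` and `LateColumnScan` are project classes, so this fragment is not compilable on its own:

```scala
import org.apache.calcite.plan.RelOptRuleCall

def onMatch(call: RelOptRuleCall): Unit = {
  val inputStitch = call.rels(1)                              // input of the Stitch
  val columnScan  = call.rels(2).asInstanceOf[LateColumnScan] // provides the column
  // Replace the matched subtree with a lazy Fetch above the Stitch input.
  call.transformTo(
    LogicalFetch.create(inputStitch, columnScan.getRowType,
      columnScan.getColumn, None, classOf[LogicalFetch]))
}
```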
In all these operators a buffer stores the vector of tuples taken from the input; this buffer is then processed according to the operator. It was particularly important to use the `.transpose` method in order to process the values as `Tuple`s. In addition, in all the operators we separate the flag column, which contains the boolean values, from the field columns using:

- `.dropRight(1)` to drop the flag column and keep only the field columns;
- `.last` to access the last column, which is the flag column.
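A minimal illustration of this convention:

```scala
type Column = IndexedSeq[Any]

// Split a buffered vector of columns into (field columns, flag column).
def splitFlag(columns: IndexedSeq[Column]): (IndexedSeq[Column], Column) =
  (columns.dropRight(1), columns.last)

val buffered: IndexedSeq[Column] = IndexedSeq(
  IndexedSeq(1, 2, 3),          // field 0
  IndexedSeq("a", "b", "c"),    // field 1
  IndexedSeq(true, false, true) // flag column: which tuples are still active
)
val (fields, flags) = splitFlag(buffered)
// fields.transpose recovers the tuples: Vector(Vector(1, "a"), Vector(2, "b"), ...)
```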
We can observe that queries are processed faster than in the volcano case. This can be explained by the fact that there are fewer function calls, since we process vectors of tuples at a time.
- Filter ([ch.epfl.dias.cs422.rel.early.operatoratatime.Filter]): the processing consists only in updating the extra flag column containing the `Boolean` values, based on the filter condition. It is important to keep at false the values that were already false, even if the predicate returns true; that is why we apply the `&&` operator between the previous value and the one returned by the `predicate`: `bufferValues :+ bufferValues.transpose.zipWithIndex.map{case (t,i) => bufferFlag(i).asInstanceOf[Boolean] && predicate(t)}` (see the sketch after this list). After the flag column has been processed, it is recombined with the others.
- Project ([ch.epfl.dias.cs422.rel.early.operatoratatime.Project]): we do not change the flag column; we only map over the transposed field columns in order to apply the projection.
- Join ([ch.epfl.dias.cs422.rel.early.operatoratatime.Join]): we keep two buffers, one for the left and one for the right input. Differently from the volcano case, both inputs are available from the beginning, so we can build the hash table on the smaller one. Since we have a vector of tuples, we can group them with the `.groupBy` method instead of creating the groups manually as in the volcano case. The matching is done with the `.flatMap` method. Only the active tuples are processed in this operator: to filter out the inactive ones we use the `.filter` method, as in `.filter(row => row.last.asInstanceOf[Boolean])`.
- Sort ([ch.epfl.dias.cs422.rel.early.operatoratatime.Sort]): only active tuples are processed and returned. The sorting is done as in the volcano operator. Here we use the `.slice` method to remove the unwanted tuples according to the `offset` and `fetch` values.
- Aggregate ([ch.epfl.dias.cs422.rel.early.operatoratatime.Aggregate]): again, only active tuples are processed and returned. Since we have a vector of tuples, we can group them with the `.groupBy` method instead of creating the groups manually as in the volcano case. We then apply `map` and `reduce` on each grouped `Tuple` based on the `aggCall`s.
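A slightly restructured sketch of the Filter flag update quoted above; the names `bufferValues` and `bufferFlag` follow the report:

```scala
def updateFlags(bufferValues: IndexedSeq[IndexedSeq[Any]],
                predicate: IndexedSeq[Any] => Boolean): IndexedSeq[IndexedSeq[Any]] = {
  val fieldColumns = bufferValues.dropRight(1) // one vector per field
  val bufferFlag   = bufferValues.last         // current Boolean flags
  // New flag = previous flag && predicate(tuple): a tuple filtered out by an
  // earlier operator must stay inactive even if this predicate accepts it.
  val newFlags = fieldColumns.transpose.zipWithIndex.map { case (t, i) =>
    bufferFlag(i).asInstanceOf[Boolean] && predicate(t)
  }
  fieldColumns :+ newFlags // recombine the field columns with the new flag column
}
```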
In this case we can see that queries are in general processed even faster than before. This is again explained by the smaller number of function calls, since we process a whole column at a time.
- Filter ([ch.epfl.dias.cs422.rel.early.columnatatime.Filter]): differently from the operator-at-a-time case, we process the columns directly, without using the `.transpose` method (see the sketch after this list). Indeed we:
  - `unwrap[Boolean](executed.last)` the `HomogeneousColumn` to turn it into an `Array[Boolean]`;
  - apply the predicate on the other columns and obtain a `predicateArray: Array[Boolean]`;
  - zip the two together and apply `&&` between them, for the same reason as in the operator-at-a-time case, then transform the result back into a `HomogeneousColumn`;
  - finally, add the new flag column to the previous ones.

  As before, in the Filter operator we only change the values of the flag column.
- Project ([ch.epfl.dias.cs422.rel.early.columnatatime.Project]): here too we process the columns directly, without using `.transpose`.
- Join ([ch.epfl.dias.cs422.rel.early.columnatatime.Join]): in this case we have to change the column representation to the tuple one using `.transpose`. After processing the tuples as in the operator-at-a-time case, we transpose them back to columns. Finally, we transform all columns to `HomogeneousColumn` using the method `toHomogeneousColumn()`. Again, only active tuples are processed.
- Sort ([ch.epfl.dias.cs422.rel.early.columnatatime.Sort]): the implementation is similar to the previous case. Again we transform all columns to `HomogeneousColumn` using `toHomogeneousColumn()`, and only active tuples are processed.
- Aggregate ([ch.epfl.dias.cs422.rel.early.columnatatime.Aggregate]): the implementation is similar to the previous case. Again we transform all columns to `HomogeneousColumn` using `toHomogeneousColumn()`, and only active tuples are processed.