Discussion 2021/06/09
- Does the GPU library try its best, or does it just abort completely?
- (****) Parquet + HDFS = high value if we can push the read down to the VE, so that we completely avoid going through the JVM in the data-load stages; this would also include push-down during reading, which could be highly performant.
- Most valuable now would be to do HashAggregate with partial_sum; next would be just a HashAggregate.
- If we find `VE + something + VE`, we should consider squashing that `something`, for example an `Exchange`, if possible; in particular when it's running on the same VH. "The VE Sandwich".
- (***) A real-world case is reading from a row source, e.g. a JDBCRelation; we would be scanning from "something" that is row-based --> we would have to read all of that data and then process it as we do now. See the stage `*(2) Scan JDBCRelation` in the plan below. Once we support this, many more possibilities open up.
- A second case would be to do a HashAggregate on non-partial sums as well; this would need to be implemented somewhat differently from the partial hash aggregate.
- Translating the code-generated Java to C is not yet feasible because the former is represented in an Iterator-based format, which is pull-based. This would not work with our C code for vectorization purposes: we really need to pre-load at least some batches of data (see the batching sketch after this list). Therefore, we should continue with our current approach.
- Match only against Spark's Expressions that map to a VE function
- (**) We should definitely find out the performance characteristics of compilation vs. interpretation against the VE, i.e. should `SUM(a+b), AVG(x - y)` run as `a+b`, then `x - y`, then `SUM`, then `AVG`, and so on? My expectation is that the compiled approach would be much faster, but it's important to test this out (see the sketch of the two strategies after this list).
- Be able to run VE code when whole-stage codegen IS enabled.
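A minimal sketch of the batch pre-loading idea mentioned above, in Scala; `evaluateOnVe` is a hypothetical VE entry point and the two-column layout is only for illustration. The point is that rows are pulled eagerly out of Spark's pull-based iterator into columnar arrays before any VE call is made:

```scala
import org.apache.spark.sql.catalyst.InternalRow

object BatchingSketch {
  // Hypothetical VE entry point: consumes whole columns, returns one result per batch.
  def evaluateOnVe(col1: Array[Double], col2: Array[Double]): Double = ???

  // Drain the pull-based iterator in fixed-size batches and convert each batch to
  // columnar arrays before handing it to the VE.
  def processInBatches(rows: Iterator[InternalRow], batchSize: Int = 16384): Iterator[Double] =
    rows.grouped(batchSize).map { batch =>
      val col1 = batch.map(_.getDouble(0)).toArray
      val col2 = batch.map(_.getDouble(1)).toArray
      evaluateOnVe(col1, col2)
    }
}
```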
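Separately, a rough sketch of the interpretation-vs-compilation question against a hypothetical `Ve` interface (all names here are assumptions): the interpreted path issues one VE call per sub-expression and per aggregate, while the compiled path would call a single fused kernel generated for `SUM(a+b), AVG(x - y)`. The microbenchmark in the outcomes would compare exactly these two shapes:

```scala
// Hypothetical VE primitives, used only to illustrate the two call patterns.
trait Ve {
  def add(a: Array[Double], b: Array[Double]): Array[Double]
  def subtract(a: Array[Double], b: Array[Double]): Array[Double]
  def sum(xs: Array[Double]): Double
  def avg(xs: Array[Double]): Double
  // Compiled path: a single kernel generated from the whole expression list.
  def callCompiledKernel(name: String, cols: Array[Double]*): Array[Double]
}

object EvalStrategies {
  // Interpreted: four separate VE invocations, with intermediate results moving around.
  def interpreted(ve: Ve, a: Array[Double], b: Array[Double], x: Array[Double], y: Array[Double]): (Double, Double) =
    (ve.sum(ve.add(a, b)), ve.avg(ve.subtract(x, y)))

  // Compiled: one invocation of a kernel generated for "SUM(a+b), AVG(x - y)".
  def compiled(ve: Ve, a: Array[Double], b: Array[Double], x: Array[Double], y: Array[Double]): (Double, Double) = {
    val out = ve.callCompiledKernel("sum_a_plus_b__avg_x_minus_y", a, b, x, y)
    (out(0), out(1))
  }
}
```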
The discussion was based on this plan:
*(4) Sort [joined_timestamp#20L ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(joined_timestamp#20L ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [id=#106]
+- *(3) HashAggregate(keys=[userId#42, joined_timestamp#20L], functions=[sum(cast(totalPrice#44 as double))], output=[sum(CAST(totalPrice AS DOUBLE))#52, userId#42, joined_timestamp#20L])
+- Exchange hashpartitioning(userId#42, joined_timestamp#20L, 200), ENSURE_REQUIREMENTS, [id=#102]
+- *(2) HashAggregate(keys=[userId#42, joined_timestamp#20L], functions=[partial_sum(cast(totalPrice#44 as double))], output=[userId#42, joined_timestamp#20L, sum#58])
+- *(2) Project [JOINED_TIMESTAMP#20L, userId#42, totalPrice#44]
+- *(2) BroadcastHashJoin [id#17], [cast(userId#42 as int)], Inner, BuildRight, false
:- *(2) Scan JDBCRelation(Users) [numPartitions=1] [ID#17,JOINED_TIMESTAMP#20L] PushedFilters: [*IsNotNull(ID)], ReadSchema: struct<ID:int,JOINED_TIMESTAMP:bigint>
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(cast(input[0, string, false] as int) as bigint)),false), [id=#96]
+- *(1) Filter isnotnull(userId#42)
+- FileScan csv [userId#42,totalPrice#44] Batched: false, DataFilters: [isnotnull(userId#42)], Format: CSV, Location: InMemoryFileIndex[file:/C:/Users/William/IdeaProjects/aurora4spark/aurora4spark-parent/aurora4spa..., PartitionFilters: [], PushedFilters: [IsNotNull(userId)], ReadSchema: struct<userId:string,totalPrice:string>
Outcomes:
- (**) Microbenchmark interpretation vs compilation
- (***) Implement the `*(2)` stage, specifically the JOIN, and microbenchmark that specific case against plain Spark (see the matching sketch after this list).
- (****) Can we read from HDFS on the VE, and is there enough of a benefit here? Follow-up ==> how easy is it to push down some filters here as well? Follow-up ==> how do we build object files with NCC so we can link against them and don't have to compile everything from scratch, e.g. HDFS and Parquet libraries? What was interesting from frovedis was how many libraries can be compiled with ncc -- is it as easy to do so here? The big benefit would be fewer data transfers, though we don't yet know whether it is really worth it.
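A minimal sketch of detecting the `*(2)` join stage for offload, in the same spirit as the extractor further below. `VeJoinDescription` and the extracted fields are assumptions for illustration; only `BroadcastHashJoinExec` comes from the plan above, and matching on the class rather than the full constructor avoids depending on its exact arity across Spark versions:

```scala
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec

// Hypothetical description of a join that we would hand over to the VE.
final case class VeJoinDescription(
  leftKeys: Seq[Expression],
  rightKeys: Seq[Expression],
  left: SparkPlan,
  right: SparkPlan
)

object VeJoinExtractor {
  // Match the broadcast hash join from the *(2) stage and pull out what a VE
  // implementation would need.
  def matchJoin(plan: SparkPlan): Option[VeJoinDescription] =
    PartialFunction.condOpt(plan) { case j: BroadcastHashJoinExec =>
      VeJoinDescription(j.leftKeys, j.rightKeys, j.left, j.right)
    }
}
```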
I've spent quite some time trying to link the external libraries with ncc. I was able to build against Arrow-GLib with ncc (so it should work with nc++ too), but only when specifying the paths to the libraries manually. However, it seems that at runtime Aurora4j and PyVeo depend on nld to dynamically link the libraries.
The problem is that nld doesn't seem to be compatible with the libraries, as it logs the following:
nld: skipping incompatible /usr/lib64/libarrow-glib.so when searching for -larrow-glib
nld: skipping incompatible /usr/lib64/libarrow-glib.a when searching for -larrow-glib
Not really sure yet why this occurs.
@Wosin libarrow-glib.so would need to be compiled for the Aurora architecture as well
you probably won't be able to link x86 libs into an Aurora project
Yeah, I was wondering if there is a way to link them statically, though.
It seems that WholeStageCodegen is represented as a parent plan that keeps the original plan as its child. So basically, if we have something like a HashAggregate and that single operation gets transformed by whole-stage codegen, then a new Spark plan, `WholeStageCodegenExec`, is created, which has the HashAggregate as its `child` (see the sketch below).
What is more, it seems from my experiments that whole-stage codegen is actually applied after our plugin, not before, so I don't think we need to disable it at all.
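A minimal sketch, under the assumption that peeling off the wrapper is all we need: unwrap `WholeStageCodegenExec` before matching, so the same extraction logic sees the underlying `HashAggregateExec` whether or not whole-stage codegen is enabled (the object and helper names are made up):

```scala
import org.apache.spark.sql.execution.{SparkPlan, WholeStageCodegenExec}
import org.apache.spark.sql.execution.aggregate.HashAggregateExec

object CodegenUnwrap {
  // WholeStageCodegenExec is just a wrapper node; its `child` is the original plan.
  def unwrap(plan: SparkPlan): SparkPlan = plan match {
    case WholeStageCodegenExec(child) => child
    case other                        => other
  }

  // Example: detect a HashAggregate regardless of whether codegen has wrapped it.
  def isHashAggregate(plan: SparkPlan): Boolean =
    unwrap(plan).isInstanceOf[HashAggregateExec]
}
```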
More notes:
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences
import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
// GenericSparkPlanDescription, ColumnAggregationExpression, Column and
// extractAttributes are project-local types/helpers; their imports are omitted here.

object VeoGenericPlanExtractor {
  // Match the "VE sandwich" pattern: a final HashAggregate over a ShuffleExchange
  // over a partial HashAggregate, and dump everything we can extract from it.
  def matchPlan(sparkPlan: SparkPlan): Option[GenericSparkPlanDescription] = {
    PartialFunction.condOpt(sparkPlan) {
case first @ HashAggregateExec(
requiredChildDistributionExpressions,
groupingExpressions,
exprs,
aggregateAttributes,
initialInputBufferOffset,
resultExpressions,
org.apache.spark.sql.execution.exchange
.ShuffleExchangeExec(
outputPartitioning,
f @ org.apache.spark.sql.execution.aggregate
.HashAggregateExec(
_requiredChildDistributionExpressions,
_groupingExpressions,
_aggregateExpressions,
_aggregateAttributes,
_initialInputBufferOffset,
_resultExpressions,
fourth @ sparkPlan
),
shuffleOrigin
)
) => {
val columnIndices = fourth.output.map(_.name).zipWithIndex.toMap
println(s"CID => ${columnIndices}")
println(s"First =$first")
val functions =
_aggregateExpressions.map(_.aggregateFunction.asInstanceOf[DeclarativeAggregate])
val resIds =
_aggregateExpressions.map(_.resultId)
val initExpr = functions.map(f => f.initialValues)
val aggregateBufferAttributes: Seq[AttributeReference] =
_aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)
val aggResults =
bindReferences(functions.map(_.evaluateExpression), aggregateBufferAttributes)
val resultVars = bindReferences(resultExpressions, aggregateAttributes)
List(
"modes" -> _aggregateExpressions.map(_.mode).distinct,
"rcde" -> _requiredChildDistributionExpressions.toSeq.flatten,
"ge" -> _groupingExpressions,
"ae" -> _aggregateExpressions,
"aa" -> _aggregateAttributes,
"allAtts" -> f.allAttributes.attrs,
"re" -> _resultExpressions,
"fo" -> fourth.output,
"bufVars" -> initExpr,
"resIds" -> resIds,
"preAggResults" -> functions.map(_.evaluateExpression),
"aggResults" -> aggResults,
"resultVars" -> resultVars,
"aggregateBufferAttributes" -> aggregateBufferAttributes,
"out" -> f.output
).foreach { case (k, vs) =>
println(s"$k: ==>")
vs.foreach(println)
println(" ")
}
val columnMappings = exprs
.map(expression => (expression, extractAttributes(expression.references)))
.zipWithIndex
.map { case ((operation, attributes), id) =>
ColumnAggregationExpression(
attributes.map(attr => Column(columnIndices(attr.value), attr.value)),
operation,
id
)
          }
        // Assembling the GenericSparkPlanDescription from columnMappings and the other
        // extracted pieces is omitted in these notes.
        ???
      }
    }
  }
}

Running this against a simple `SUM(_1 + _2), AVG(_2 - _1), SUM(_3)` aggregation produces the following debug output:
First =HashAggregate(keys=[], functions=[sum((_1#37 + _2#38)), avg((_2#38 - _1#37)), sum(_3#39)], output=[sum((_1 + _2))#47, avg((_2 - _1))#48, sum(_3)#49])
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#33]
+- HashAggregate(keys=[], functions=[partial_sum((_1#37 + _2#38)), partial_avg((_2#38 - _1#37)), partial_sum(_3#39)], output=[sum#64, sum#65, count#66L, sum#67])
+- LocalTableScan [_1#37, _2#38, _3#39]
modes: ==>
Partial
rcde: ==>
ge: ==>
ae: ==>
partial_sum((_1#37 + _2#38))
partial_avg((_2#38 - _1#37))
partial_sum(_3#39)
aa: ==>
sum#60
sum#61
count#62L
sum#63
allAtts: ==>
_1#37
_2#38
_3#39
sum#60
sum#61
count#62L
sum#63
sum#60
sum#61
count#62L
sum#63
sum#64
sum#65
count#66L
sum#67
re: ==>
sum#64
sum#65
count#66L
sum#67
fo: ==>
_1#37
_2#38
_3#39
bufVars: ==>
List(null)
List(0.0, 0)
List(null)
resIds: ==>
ExprId(44,d84a5437-c39a-4a61-80ea-245df8effc7c)
ExprId(45,d84a5437-c39a-4a61-80ea-245df8effc7c)
ExprId(46,d84a5437-c39a-4a61-80ea-245df8effc7c)
preAggResults: ==>
sum#60
(sum#61 / cast(count#62L as double))
sum#63
aggResults: ==>
input[0, double, true]
(input[1, double, true] / cast(input[2, bigint, true] as double))
input[3, double, true]
resultVars: ==>
input[0, double, true] AS sum((_1 + _2))#47
input[1, double, true] AS avg((_2 - _1))#48
input[2, double, true] AS sum(_3)#49
aggregateBufferAttributes: ==>
sum#60
sum#61
count#62L
sum#63
out: ==>
sum#64
sum#65
count#66L
sum#67
CID => Map(_1 -> 0, _2 -> 1, _3 -> 2)
First =HashAggregate(keys=[], functions=[sum((_1#37 + _2#38)), avg((_2#38 - _1#37)), sum(_3#39)], output=[sum((_1 + _2))#47, avg((_2 - _1))#48, sum(_3)#49])
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#38]
+- HashAggregate(keys=[], functions=[partial_sum((_1#37 + _2#38)), partial_avg((_2#38 - _1#37)), partial_sum(_3#39)], output=[sum#64, sum#65, count#66L, sum#67])
+- LocalTableScan [_1#37, _2#38, _3#39]
modes: ==>
Partial
rcde: ==>
ge: ==>
ae: ==>
partial_sum((_1#37 + _2#38))
partial_avg((_2#38 - _1#37))
partial_sum(_3#39)
aa: ==>
sum#60
sum#61
count#62L
sum#63
allAtts: ==>
_1#37
_2#38
_3#39
sum#60
sum#61
count#62L
sum#63
sum#60
sum#61
count#62L
sum#63
sum#64
sum#65
count#66L
sum#67
re: ==>
sum#64
sum#65
count#66L
sum#67
fo: ==>
_1#37
_2#38
_3#39
bufVars: ==>
List(null)
List(0.0, 0)
List(null)
resIds: ==>
ExprId(44,d84a5437-c39a-4a61-80ea-245df8effc7c)
ExprId(45,d84a5437-c39a-4a61-80ea-245df8effc7c)
ExprId(46,d84a5437-c39a-4a61-80ea-245df8effc7c)
preAggResults: ==>
sum#60
(sum#61 / cast(count#62L as double))
sum#63
aggResults: ==>
input[0, double, true]
(input[1, double, true] / cast(input[2, bigint, true] as double))
input[3, double, true]
resultVars: ==>
input[0, double, true] AS sum((_1 + _2))#47
input[1, double, true] AS avg((_2 - _1))#48
input[2, double, true] AS sum(_3)#49
aggregateBufferAttributes: ==>
sum#60
sum#61
count#62L
sum#63
out: ==>
sum#64
sum#65
count#66L
sum#67