[Gold Standard]: Initial code for spark only setup with a single query #384
Conversation
trait TPCDSBase extends SparkFunSuite with SparkInvolvedSuite {
  val conf = SQLConf.get
Note: superclasses changed due to the lack of Spark 3.0 support.
// The TPCDS queries below are based on v1.4.
// TODO: Fix build pipeline for q49 and re-enable q49.
val tpcdsQueries = Seq("q1")
Note: this is the complete list of queries that will run as part of this test. Currently only q1 is selected; in subsequent PRs, all queries will be enabled.
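As a hedged illustration only (the test-generation loop is not shown in this excerpt, so the testQuery name and test title are assumptions based on Spark's PlanStabilitySuite), each entry in tpcdsQueries typically drives one generated test case, so Seq("q1") yields a single test until the list is expanded:

// Hypothetical sketch, not the PR's exact code.
tpcdsQueries.foreach { query =>
  test(s"check simplified (tpcds-v1.4/$query)") {
    testQuery("tpcds/queries", query)
  }
}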
private val tableColumns = Map(
Note: no change in tableColumns.
val tableNames: Iterable[String] = tableColumns.keys
def createTable(
Note: no change in createTable.
private val originalCBCEnabled = conf.cboEnabled
private val originalJoinReorderEnabled = conf.joinReorderEnabled
Note: removed val originalPlanStatsEnabled from the source. It is only required for stats-based tests, which are not yet supported.
override def beforeAll(): Unit = {
Note: simplified beforeAll and afterAll by removing some stats-related code which we don't support yet.
 *
 * To run the entire test suite:
 * {{{
 *   sbt "test:testOnly *PlanStabilitySuite"
Note: the run command is different from Spark's because of the difference between the Spark and Hyperspace project structures. In Spark, this test lives within the sql/ project, so the command looks slightly different there. The same applies to the other run commands in this doc comment.
 */
// scalastyle:on filelinelengthchecker
trait PlanStabilitySuite extends TPCDSBase with Logging {
Note: superclasses changed because of the lack of Spark 3.0 support.
override def afterAll(): Unit = {
  super.afterAll()
}
Note: in beforeAll and afterAll, some Spark conf values have been changed:
- originalMaxToStringFields = conf.maxToStringFields: removed, since conf.maxToStringFields is not present in Spark 2.4.
- spark.sql.crossJoin.enabled: set to true, because some queries fail during the query optimization phase when they involve cross joins.
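A minimal sketch of that shape, assuming the suite exposes the SparkSession as spark (via SparkInvolvedSuite) and that conf is the SQLConf.get captured above; this is an approximation, not the PR's exact code:

import org.apache.spark.sql.internal.SQLConf

override def beforeAll(): Unit = {
  super.beforeAll()
  // Some TPCDS queries fail during optimization on Spark 2.4 unless cross joins
  // are explicitly allowed, so enable them for the duration of the suite.
  spark.conf.set("spark.sql.crossJoin.enabled", "true")
}

override def afterAll(): Unit = {
  // Restore the values captured in originalCBCEnabled / originalJoinReorderEnabled
  // so that other suites are not affected.
  conf.setConf(SQLConf.CBO_ENABLED, originalCBCEnabled)
  conf.setConf(SQLConf.JOIN_REORDER_ENABLED, originalJoinReorderEnabled)
  super.afterAll()
}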
  case subquery: SubqueryExec =>
    subqueriesMap.getOrElseUpdate(subquery, subqueriesMap.size + 1)
  case _ => -1
}
Note: removed a couple of case match statements because of the lack of Spark 3.0 support: case SubqueryBroadcastExec and case ReusedSubqueryExec.
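For context, a hedged sketch of the adapted ID-assignment helper: the SubqueryExec case and the subqueriesMap name come from the quoted diff, while the Exchange case and the map declarations are assumptions based on the Spark original.

import scala.collection.mutable
import org.apache.spark.sql.execution.{SparkPlan, SubqueryExec}
import org.apache.spark.sql.execution.exchange.Exchange

private val exchangesMap = new mutable.HashMap[Exchange, Int]()
private val subqueriesMap = new mutable.HashMap[SparkPlan, Int]()

private def getId(plan: SparkPlan): Int = plan match {
  case exchange: Exchange =>
    exchangesMap.getOrElseUpdate(exchange, exchangesMap.size + 1)
  case subquery: SubqueryExec =>
    subqueriesMap.getOrElseUpdate(subquery, subqueriesMap.size + 1)
  // The SubqueryBroadcastExec and ReusedSubqueryExec cases from Spark's version
  // are dropped here because those operators only exist in Spark 3.x.
  case _ => -1
}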
 * "sum(sr_return_amt#14)", so we remove all of these using regex
 */
def cleanUpReferences(references: AttributeSet): String = {
  referenceRegex.replaceAllIn(references.toSeq.map(_.name).sorted.mkString(","), "")
Note: added sorting of references for consistent behavior between the local and Azure build pipeline setups.
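To illustrate why the sorting matters, here is a small self-contained Scala example with hypothetical attribute names; the regex is assumed to strip expression IDs such as #14, as described in the quoted doc comment:

// The same attribute set can serialize in a different order on different machines.
// Sorting the names before joining makes the cleaned-up string deterministic, so the
// golden files produced locally and on the Azure pipeline match.
val referenceRegex = "#\\d+".r
def clean(names: Seq[String]): String =
  referenceRegex.replaceAllIn(names.sorted.mkString(","), "")

val runA = Seq("sr_return_amt#14", "ctr_customer_sk#2")
val runB = Seq("ctr_customer_sk#2", "sr_return_amt#14")
assert(clean(runA) == clean(runB)) // both yield "ctr_customer_sk,sr_return_amt"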
Generally looking fine to me.
    s"Location.*spark-warehouse/",
    "Location [not included in comparison]/{warehouse_dir}/")
}
Note: normalization logic changed very slightly. We depend on the df.explain() command for generating output, whereas Spark depends on QueryExecution.explainString(), which generates a different output. (To compare: link)
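A hedged sketch that completes the location-normalization snippet quoted above; the method name and overall shape follow Spark's PlanStabilitySuite, and the PR's version may differ slightly:

// Rewrite warehouse paths so the golden files do not depend on the local
// filesystem layout of whoever generated them.
private def normalizeLocation(plan: String): String = {
  plan.replaceAll(
    "Location.*spark-warehouse/",
    "Location [not included in comparison]/{warehouse_dir}/")
}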
      classLoader = Thread.currentThread().getContextClassLoader)
    val qe = spark.sql(queryString).queryExecution
    val plan = qe.executedPlan
    val explain = normalizeLocation(normalizeIds(explainString(qe)))
Note: small change here: use of a private def explainString instead of the Spark 3.0-only qe.explainString.
  }
}
def explainString(queryExecution: QueryExecution): String = {
Note: new method, not present in the original.
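One possible shape for this helper, shown only as a hedged sketch: Spark 2.4 has no QueryExecution.explainString, so one way to obtain df.explain()-style output as a String is to run ExplainCommand and collect its rows, mirroring what Dataset.explain() does internally. The PR's actual implementation may differ.

import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.command.ExplainCommand

def explainString(queryExecution: QueryExecution): String = {
  // ExplainCommand produces the same textual plan that df.explain() prints.
  val explain = ExplainCommand(queryExecution.logical, extended = false)
  queryExecution.sparkSession.sessionState
    .executePlan(explain)
    .executedPlan
    .executeCollect()
    .map(_.getString(0))
    .mkString("\n")
}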
override def afterAll(): Unit = {
  super.afterAll()
}
Note: No change from this line onward, until my next comment.
Thanks @imback82, I added the fix to the TODO comment.
Bintray is down (https://status.bintray.com/), causing build failures.
Looks like the issue got fixed. The latest build succeeded. @imback82, please take a look.
LGTM, thanks @apoorvedave1!
What is the context for this pull request?
What changes were proposed in this pull request?
In this PR, we are introducing the code for the spark-only (non-hyperspace) version of the Gold Standard. This PR is also LIMITED TO ONLY QUERY 1 of the TPCDS queries.
In the subsequent PR #337, we will push the updated plans for all the remaining queries (q2-q99). The sole aim of this PR is code validation with the example query q1.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Unit test