
Commit a6e8905

BHJ optimization to ensure the hash table is built once per executor

1 parent 5170be1 commit a6e8905

30 files changed, +1168 -44 lines changed

backends-velox/pom.xml

+4
@@ -284,6 +284,10 @@
       <version>${project.version}</version>
       <scope>compile</scope>
     </dependency>
+    <dependency>
+      <groupId>com.github.ben-manes.caffeine</groupId>
+      <artifactId>caffeine</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.scalacheck</groupId>
       <artifactId>scalacheck_${scala.binary.version}</artifactId>
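
The Caffeine dependency backs the executor-side broadcast cache introduced below. As a minimal sketch of the pattern this commit relies on (illustrative types and names, not code from the commit): a loading get runs the builder at most once per key, and a removal listener frees resources when idle entries expire.

import com.github.benmanes.caffeine.cache.{Cache, Caffeine, RemovalCause, RemovalListener}
import java.util.concurrent.TimeUnit

object CaffeineSketch {
  private val cache: Cache[String, java.lang.Long] =
    Caffeine.newBuilder
      .expireAfterAccess(86400, TimeUnit.SECONDS) // matches the default expiry added below
      .removalListener(new RemovalListener[String, java.lang.Long] {
        // The commit releases the native hash table at this point.
        override def onRemoval(k: String, v: java.lang.Long, c: RemovalCause): Unit =
          println(s"evicted $k")
      })
      .build[String, java.lang.Long]()

  // The loader runs at most once per key, even under concurrent access.
  def getOrBuild(id: String, build: () => Long): Long =
    cache.get(id, (_: String) => Long.box(build()))
}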
backends-velox/src/main/java/org/apache/gluten/vectorized/HashJoinBuilder.java

+53

@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.vectorized;
+
+import org.apache.gluten.runtime.Runtime;
+import org.apache.gluten.runtime.RuntimeAware;
+
+public class HashJoinBuilder implements RuntimeAware {
+  private final Runtime runtime;
+
+  private HashJoinBuilder(Runtime runtime) {
+    this.runtime = runtime;
+  }
+
+  public static HashJoinBuilder create(Runtime runtime) {
+    return new HashJoinBuilder(runtime);
+  }
+
+  @Override
+  public long rtHandle() {
+    return runtime.getHandle();
+  }
+
+  public static native void clearHashTable(long hashTableData);
+
+  public static native long cloneHashTable(long hashTableData);
+
+  public static native long nativeBuild(
+      String buildHashTableId,
+      long[] batchHandlers,
+      long rowCount,
+      String joinKeys,
+      int joinType,
+      boolean hasMixedFiltCondition,
+      boolean isExistenceJoin,
+      byte[] namedStruct,
+      boolean isNullAwareAntiJoin,
+      boolean hasNullKeyValues);
+}
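
A hypothetical handle-lifecycle sketch for the natives declared above; `handle` stands in for a pointer returned by `nativeBuild`, and the idempotence claim comes from a comment in the cache code below.

import org.apache.gluten.vectorized.HashJoinBuilder

def shareAndRelease(handle: Long): Unit = {
  // Obtain a second reference to the same native table.
  val shared = HashJoinBuilder.cloneHashTable(handle)
  // Cleanup operations on the backend are idempotent, per the cache code.
  HashJoinBuilder.clearHashTable(shared)
  HashJoinBuilder.clearHashTable(handle)
}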

backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxBackend.scala

+5
@@ -95,6 +95,11 @@ object VeloxBackendSettings extends BackendSettingsApi {
   val GLUTEN_VELOX_INTERNAL_UDF_LIB_PATHS = VeloxBackend.CONF_PREFIX + ".internal.udfLibraryPaths"
   val GLUTEN_VELOX_UDF_ALLOW_TYPE_CONVERSION = VeloxBackend.CONF_PREFIX + ".udfAllowTypeConversion"

+  val GLUTEN_VELOX_BROADCAST_CACHE_EXPIRED_TIME: String =
+    VeloxBackend.CONF_PREFIX + ".broadcast.cache.expired.time"
+  // unit: SECONDS, default 1 day
+  val GLUTEN_VELOX_BROADCAST_CACHE_EXPIRED_TIME_DEFAULT: Int = 86400
+
   /** The columnar-batch type this backend is by default using. */
   override def primaryBatchType: Convention.BatchType = VeloxBatch

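A hedged usage note: the full key depends on `VeloxBackend.CONF_PREFIX`, which this diff does not show; assuming the usual `spark.gluten.sql.columnar.backend.velox` prefix, tuning the expiry would look like this.

import org.apache.spark.SparkConf

object TuneBroadcastCacheExpiry {
  // Assumed full key; verify against VeloxBackend.CONF_PREFIX in your build.
  val conf: SparkConf = new SparkConf()
    .set("spark.gluten.sql.columnar.backend.velox.broadcast.cache.expired.time", "3600") // seconds
}
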
backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala

+9
@@ -21,6 +21,7 @@ import org.apache.gluten.columnarbatch.ArrowBatches.{ArrowJavaBatch, ArrowNative
 import org.apache.gluten.columnarbatch.VeloxBatch
 import org.apache.gluten.config.GlutenConfig
 import org.apache.gluten.config.VeloxConfig._
+import org.apache.gluten.execution.VeloxBroadcastBuildSideCache
 import org.apache.gluten.execution.datasource.GlutenFormatFactory
 import org.apache.gluten.expression.UDFMappings
 import org.apache.gluten.extension.columnar.transition.Convention
@@ -32,7 +33,9 @@ import org.apache.gluten.utils._
 import org.apache.spark.{HdfsConfGenerator, ShuffleDependency, SparkConf, SparkContext}
 import org.apache.spark.api.plugin.PluginContext
 import org.apache.spark.internal.Logging
+import org.apache.spark.listener.VeloxGlutenSQLAppStatusListener
 import org.apache.spark.network.util.ByteUnit
+import org.apache.spark.rpc.{GlutenDriverEndpoint, GlutenExecutorEndpoint}
 import org.apache.spark.shuffle.{ColumnarShuffleDependency, LookupKey, ShuffleManagerRegistry}
 import org.apache.spark.shuffle.sort.ColumnarShuffleManager
 import org.apache.spark.sql.execution.ColumnarCachedBatchSerializer
@@ -50,6 +53,9 @@ class VeloxListenerApi extends ListenerApi with Logging {
   import VeloxListenerApi._

   override def onDriverStart(sc: SparkContext, pc: PluginContext): Unit = {
+    GlutenDriverEndpoint.glutenDriverEndpointRef = (new GlutenDriverEndpoint).self
+    VeloxGlutenSQLAppStatusListener.registerListener(sc)
+
     val conf = pc.conf()

     // When the Velox cache is enabled, the Velox file handle cache should also be enabled.
@@ -123,6 +129,8 @@ class VeloxListenerApi extends ListenerApi with Logging {
   override def onDriverShutdown(): Unit = shutdown()

   override def onExecutorStart(pc: PluginContext): Unit = {
+    GlutenExecutorEndpoint.executorEndpoint = new GlutenExecutorEndpoint(pc.executorID, pc.conf)
+
     val conf = pc.conf()

     // Static initializers for executor.
@@ -215,6 +223,7 @@ class VeloxListenerApi extends ListenerApi with Logging {

   private def shutdown(): Unit = {
     // TODO shutdown implementation in velox to release resources
+    VeloxBroadcastBuildSideCache.cleanAll()
   }
 }


backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala

+8
@@ -639,6 +639,14 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi {
     }
     numOutputRows += serialized.map(_.getNumRows).sum
     dataSize += rawSize
+
+    val (buildKeys, isNullAware) = mode match {
+      case mode1: HashedRelationBroadcastMode =>
+        (mode1.key, mode1.isNullAware)
+      case _ =>
+        // IdentityBroadcastMode
+        (Seq.empty, false)
+    }
     if (useOffheapBroadcastBuildRelation) {
       TaskResources.runUnsafe {
         new UnsafeColumnarBuildSideRelation(child.output, serialized.map(_.getSerialized), mode)

backends-velox/src/main/scala/org/apache/gluten/execution/HashJoinExecTransformer.scala

+85 -3

@@ -17,15 +17,65 @@
 package org.apache.gluten.execution

 import org.apache.spark.rdd.RDD
+import org.apache.spark.rpc.GlutenDriverEndpoint
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.optimizer.BuildSide
+import org.apache.spark.sql.catalyst.optimizer.{BuildRight, BuildSide}
 import org.apache.spark.sql.catalyst.plans._
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.joins.BuildSideRelation
 import org.apache.spark.sql.vectorized.ColumnarBatch

 import io.substrait.proto.JoinRel

+object JoinTypeTransform {
+
+  // ExistenceJoin was introduced in SPARK-14781. It returns all rows from the left table, with
+  // a new column to indicate whether the row is matched in the right table.
+  // The ExistenceJoin is transformed into a left any join in CH.
+  // Since Substrait has no left any join, a left semi join is used instead,
+  // and isExistenceJoin is set to true to indicate that it is an existence join.
+  def toSubstraitJoinType(sparkJoin: JoinType, buildRight: Boolean): JoinRel.JoinType =
+    sparkJoin match {
+      case _: InnerLike =>
+        JoinRel.JoinType.JOIN_TYPE_INNER
+      case FullOuter =>
+        JoinRel.JoinType.JOIN_TYPE_OUTER
+      case LeftOuter =>
+        if (!buildRight) {
+          JoinRel.JoinType.JOIN_TYPE_RIGHT
+        } else {
+          JoinRel.JoinType.JOIN_TYPE_LEFT
+        }
+      case RightOuter =>
+        if (!buildRight) {
+          JoinRel.JoinType.JOIN_TYPE_LEFT
+        } else {
+          JoinRel.JoinType.JOIN_TYPE_RIGHT
+        }
+      case LeftSemi =>
+        if (!buildRight) {
+          JoinRel.JoinType.JOIN_TYPE_RIGHT_SEMI
+        } else {
+          JoinRel.JoinType.JOIN_TYPE_LEFT_SEMI
+        }
+      case LeftAnti =>
+        if (!buildRight) {
+          JoinRel.JoinType.JOIN_TYPE_RIGHT_ANTI
+        } else {
+          JoinRel.JoinType.JOIN_TYPE_LEFT_ANTI
+        }
+      case ExistenceJoin(_) =>
+        if (!buildRight) {
+          throw new IllegalArgumentException("Existence join should not switch children")
+        }
+        JoinRel.JoinType.JOIN_TYPE_LEFT_SEMI
+      case _ =>
+        // TODO: Support cross join with Cross Rel
+        JoinRel.JoinType.UNRECOGNIZED
+    }
+
+}
+
 case class ShuffledHashJoinExecTransformer(
     leftKeys: Seq[Expression],
     rightKeys: Seq[Expression],
@@ -99,6 +149,9 @@ case class BroadcastHashJoinExecTransformer(
     right,
     isNullAwareAntiJoin) {

+  // Unique ID for the built hash table.
+  lazy val buildBroadcastTableId: String = buildPlan.id.toString
+
   override protected lazy val substraitJoinType: JoinRel.JoinType = joinType match {
     case _: InnerLike =>
       JoinRel.JoinType.JOIN_TYPE_INNER
@@ -125,9 +178,38 @@ case class BroadcastHashJoinExecTransformer(

   override def columnarInputRDDs: Seq[RDD[ColumnarBatch]] = {
     val streamedRDD = getColumnarInputRDDs(streamedPlan)
+    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    if (executionId != null) {
+      GlutenDriverEndpoint.collectResources(executionId, buildBroadcastTableId)
+    } else {
+      logWarning(
+        s"Cannot trace broadcast table data $buildBroadcastTableId" +
+          s" because execution id is null." +
+          s" Will clean up when the cache entry expires.")
+    }
+
     val broadcast = buildPlan.executeBroadcast[BuildSideRelation]()
-    val broadcastRDD = VeloxBroadcastBuildSideRDD(sparkContext, broadcast)
+    val context =
+      BroadCastHashJoinContext(
+        buildKeyExprs,
+        joinType,
+        buildSide == BuildRight,
+        condition.isDefined,
+        joinType.isInstanceOf[ExistenceJoin],
+        buildPlan.output,
+        buildBroadcastTableId)
+    val broadcastRDD = VeloxBroadcastBuildSideRDD(sparkContext, broadcast, context)
     // FIXME: Do we have to make build side a RDD?
     streamedRDD :+ broadcastRDD
   }
 }
+
+case class BroadCastHashJoinContext(
+    buildSideJoinKeys: Seq[Expression],
+    joinType: JoinType,
+    buildRight: Boolean,
+    hasMixedFiltCondition: Boolean,
+    isExistenceJoin: Boolean,
+    buildSideStructure: Seq[Attribute],
+    buildHashTableId: String,
+    isNullAwareAntiJoin: Boolean = false)
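
For orientation, a hypothetical sketch of how `BroadCastHashJoinContext` fields could map onto `HashJoinBuilder.nativeBuild`. The real wiring lives in `ColumnarBuildSideRelation.buildHashTable`, which this diff does not show, so the key encoding, the `namedStruct` payload, and `hasNullKeyValues` below are assumptions.

import org.apache.gluten.vectorized.HashJoinBuilder

def buildNative(ctx: BroadCastHashJoinContext, batches: Array[Long], rows: Long): Long =
  HashJoinBuilder.nativeBuild(
    ctx.buildHashTableId,
    batches, // native handles of the deserialized build-side batches
    rows,
    ctx.buildSideJoinKeys.map(_.sql).mkString(","), // illustrative key encoding
    JoinTypeTransform.toSubstraitJoinType(ctx.joinType, ctx.buildRight).getNumber,
    ctx.hasMixedFiltCondition,
    ctx.isExistenceJoin,
    Array.emptyByteArray, // namedStruct: serialized build-side schema, elided here
    ctx.isNullAwareAntiJoin,
    false // hasNullKeyValues: not derivable from the context alone
  )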
backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideCache.scala

+101

@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gluten.execution
+
+import org.apache.gluten.backendsapi.velox.VeloxBackendSettings
+import org.apache.gluten.vectorized.HashJoinBuilder
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.execution.ColumnarBuildSideRelation
+import org.apache.spark.sql.execution.joins.BuildSideRelation
+
+import com.github.benmanes.caffeine.cache.{Cache, Caffeine, RemovalCause, RemovalListener}
+
+import java.util.concurrent.TimeUnit
+
+case class BroadcastHashTable(pointer: Long, relation: ColumnarBuildSideRelation)
+
+/**
+ * `VeloxBroadcastBuildSideCache` ensures the BHJ hash table is built only once.
+ *
+ * The complicated part is due to reuse exchange, where multiple BHJ IDs correspond to one
+ * `ColumnarBuildSideRelation`.
+ */
+object VeloxBroadcastBuildSideCache
+  extends Logging
+  with RemovalListener[String, BroadcastHashTable] {
+
+  private lazy val expiredTime = SparkEnv.get.conf.getLong(
+    VeloxBackendSettings.GLUTEN_VELOX_BROADCAST_CACHE_EXPIRED_TIME,
+    VeloxBackendSettings.GLUTEN_VELOX_BROADCAST_CACHE_EXPIRED_TIME_DEFAULT
+  )
+
+  // Used to ensure the BHJ hash table is built only once.
+  // Key: hash table ID; value: the native hash table pointer plus its relation.
+  private val buildSideRelationCache: Cache[String, BroadcastHashTable] =
+    Caffeine.newBuilder
+      .expireAfterAccess(expiredTime, TimeUnit.SECONDS)
+      .removalListener(this)
+      .build[String, BroadcastHashTable]()
+
+  def getOrBuildBroadcastHashTable(
+      broadcast: Broadcast[BuildSideRelation],
+      broadCastContext: BroadCastHashJoinContext): BroadcastHashTable = {
+
+    buildSideRelationCache
+      .get(
+        broadCastContext.buildHashTableId,
+        (broadcast_id: String) => {
+          val (pointer, relation) =
+            broadcast.value
+              .asInstanceOf[ColumnarBuildSideRelation]
+              .buildHashTable(broadCastContext)
+          logDebug(s"Create bhj $broadcast_id = 0x${pointer.toHexString}")
+          BroadcastHashTable(pointer, relation)
+        }
+      )
+  }
+
+  /** This is a callback from the C++ backend. */
+  def get(broadcastHashtableId: String): Long =
+    Option(buildSideRelationCache.getIfPresent(broadcastHashtableId))
+      .map(_.pointer)
+      .getOrElse(0)
+
+  def invalidateBroadcastHashtable(broadcastHashtableId: String): Unit = {
+    // Cleanup operations on the backend are idempotent.
+    buildSideRelationCache.invalidate(broadcastHashtableId)
+  }
+
+  /** Only used in UT. */
+  def size(): Long = buildSideRelationCache.estimatedSize()
+
+  def cleanAll(): Unit = buildSideRelationCache.invalidateAll()
+
+  override def onRemoval(key: String, value: BroadcastHashTable, cause: RemovalCause): Unit = {
+    synchronized {
+      logDebug(s"Remove bhj $key = 0x${value.pointer.toHexString}")
+      if (value.relation != null) {
+        value.relation.reset()
+      }
+      HashJoinBuilder.clearHashTable(value.pointer)
+    }
+  }
+}
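
A hedged sketch of exercising the cache end to end; `broadcast` and `ctx` are assumed to come from `BroadcastHashJoinExecTransformer.columnarInputRDDs`, and the assertions lean on the UT-only helpers above.

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.execution.joins.BuildSideRelation

def exercise(broadcast: Broadcast[BuildSideRelation], ctx: BroadCastHashJoinContext): Unit = {
  val first = VeloxBroadcastBuildSideCache.getOrBuildBroadcastHashTable(broadcast, ctx)
  val second = VeloxBroadcastBuildSideCache.getOrBuildBroadcastHashTable(broadcast, ctx)
  assert(first.pointer == second.pointer) // built once, then served from the cache
  assert(VeloxBroadcastBuildSideCache.size() >= 1L)

  VeloxBroadcastBuildSideCache.invalidateBroadcastHashtable(ctx.buildHashTableId)
  assert(VeloxBroadcastBuildSideCache.get(ctx.buildHashTableId) == 0L) // 0 means absent
}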

backends-velox/src/main/scala/org/apache/gluten/execution/VeloxBroadcastBuildSideRDD.scala

+4 -8

@@ -16,22 +16,18 @@
  */
 package org.apache.gluten.execution

-import org.apache.gluten.iterator.Iterators
-
 import org.apache.spark.{broadcast, SparkContext}
 import org.apache.spark.sql.execution.joins.BuildSideRelation
 import org.apache.spark.sql.vectorized.ColumnarBatch

 case class VeloxBroadcastBuildSideRDD(
     @transient private val sc: SparkContext,
-    broadcasted: broadcast.Broadcast[BuildSideRelation])
+    broadcasted: broadcast.Broadcast[BuildSideRelation],
+    broadcastContext: BroadCastHashJoinContext)
   extends BroadcastBuildSideRDD(sc, broadcasted) {

   override def genBroadcastBuildSideIterator(): Iterator[ColumnarBatch] = {
-    val relation = broadcasted.value.asReadOnlyCopy()
-    Iterators
-      .wrap(relation.deserialized)
-      .recyclePayload(batch => batch.close())
-      .create()
+    VeloxBroadcastBuildSideCache.getOrBuildBroadcastHashTable(broadcasted, broadcastContext)
+    Iterator.empty
   }
 }
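
The iterator is now intentionally empty: the build side is materialized into the executor-local native cache instead of being streamed as batches, and the C++ backend later resolves the table by ID (a hedged sketch of that lookup, with `ctx` as constructed in columnarInputRDDs above):

def lookup(ctx: BroadCastHashJoinContext): Long = {
  // Hypothetical consumer-side check; get() returns 0 for "absent or evicted".
  val pointer = VeloxBroadcastBuildSideCache.get(ctx.buildHashTableId)
  require(pointer != 0L, s"hash table ${ctx.buildHashTableId} absent or evicted")
  pointer
}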
