Skip to content

Commit 9b25ced

Browse files
authored
[VL] Should convert kSpillReadBufferSize and kShuffleSpillDiskWriteBufferSize to number (#8684)
1 parent 07b8679 commit 9b25ced

File tree

3 files changed

+56
-20
lines changed

3 files changed

+56
-20
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.sql.internal
18+
19+
import org.apache.spark.network.util.ByteUnit
20+
21+
import org.scalatest.funsuite.AnyFunSuite
22+
23+
/**
 * Tests for [[GlutenConfigUtil.mapByteConfValue]], which parses a byte-string
 * config value (e.g. "2m") into a number in the requested [[ByteUnit]].
 */
class GlutenConfigUtilSuite extends AnyFunSuite {

  test("mapByteConfValue should return correct value") {
    val key = "spark.unsafe.sorter.spill.reader.buffer.size"
    val conf = Map(key -> "2m")

    // Capture the parsed value instead of asserting inside the callback:
    // asserting in the callback would pass vacuously if the callback were
    // never invoked (e.g. due to a key-lookup bug).
    var parsed: Option[Long] = None
    GlutenConfigUtil.mapByteConfValue(conf, key, ByteUnit.BYTE)(v => parsed = Some(v))

    // "2m" = 2 MiB = 2 * 1024 * 1024 bytes.
    assert(parsed === Some(2097152L))
  }

  test("mapByteConfValue should not invoke the callback for an absent key") {
    var invoked = false
    GlutenConfigUtil.mapByteConfValue(Map.empty[String, String], "no.such.key", ByteUnit.BYTE) {
      _ => invoked = true
    }
    assert(!invoked)
  }
}

shims/common/src/main/scala/org/apache/gluten/config/GlutenConfig.scala

+14-20
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@
1717
package org.apache.gluten.config
1818

1919
import org.apache.spark.internal.Logging
20-
import org.apache.spark.network.util.{ByteUnit, JavaUtils}
21-
import org.apache.spark.sql.internal.{SQLConf, SQLConfProvider}
20+
import org.apache.spark.network.util.ByteUnit
21+
import org.apache.spark.sql.internal.{GlutenConfigUtil, SQLConf, SQLConfProvider}
2222

2323
import com.google.common.collect.ImmutableList
2424
import org.apache.hadoop.security.UserGroupInformation
@@ -428,9 +428,7 @@ object GlutenConfig {
428428
val SPARK_REDACTION_REGEX = "spark.redaction.regex"
429429
val SPARK_SHUFFLE_FILE_BUFFER = "spark.shuffle.file.buffer"
430430
val SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE = "spark.unsafe.sorter.spill.reader.buffer.size"
431-
val SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE_DEFAULT: Int = 1024 * 1024
432431
val SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE = "spark.shuffle.spill.diskWriteBufferSize"
433-
val SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE_DEFAULT: Int = 1024 * 1024
434432
val SPARK_SHUFFLE_SPILL_COMPRESS = "spark.shuffle.spill.compress"
435433
val SPARK_SHUFFLE_SPILL_COMPRESS_DEFAULT: Boolean = true
436434

@@ -449,7 +447,7 @@ object GlutenConfig {
449447
*/
450448
def getNativeSessionConf(
451449
backendName: String,
452-
conf: scala.collection.Map[String, String]): util.Map[String, String] = {
450+
conf: Map[String, String]): util.Map[String, String] = {
453451
val nativeConfMap = new util.HashMap[String, String]()
454452
val keys = Set(
455453
DEBUG_ENABLED.key,
@@ -504,24 +502,20 @@ object GlutenConfig {
504502
(
505503
GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD.key,
506504
GLUTEN_COLUMNAR_TO_ROW_MEM_THRESHOLD.defaultValue.get.toString),
507-
(
508-
SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE,
509-
SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE_DEFAULT.toString),
510-
(
511-
SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE,
512-
SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE_DEFAULT.toString),
513505
(SPARK_SHUFFLE_SPILL_COMPRESS, SPARK_SHUFFLE_SPILL_COMPRESS_DEFAULT.toString)
514506
)
515507
keyWithDefault.forEach(e => nativeConfMap.put(e._1, conf.getOrElse(e._1, e._2)))
516-
517-
conf
518-
.get(SPARK_SHUFFLE_FILE_BUFFER)
519-
.foreach(
520-
v =>
521-
nativeConfMap
522-
.put(
523-
SPARK_SHUFFLE_FILE_BUFFER,
524-
(JavaUtils.byteStringAs(v, ByteUnit.KiB) * 1024).toString))
508+
GlutenConfigUtil.mapByteConfValue(
509+
conf,
510+
SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE,
511+
ByteUnit.BYTE)(
512+
v => nativeConfMap.put(SPARK_UNSAFE_SORTER_SPILL_READER_BUFFER_SIZE, v.toString))
513+
GlutenConfigUtil.mapByteConfValue(
514+
conf,
515+
SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE,
516+
ByteUnit.BYTE)(v => nativeConfMap.put(SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE, v.toString))
517+
GlutenConfigUtil.mapByteConfValue(conf, SPARK_SHUFFLE_FILE_BUFFER, ByteUnit.KiB)(
518+
v => nativeConfMap.put(SPARK_SHUFFLE_FILE_BUFFER, (v * 1024).toString))
525519

526520
conf
527521
.get(LEGACY_TIME_PARSER_POLICY.key)

shims/common/src/main/scala/org/apache/spark/sql/internal/GlutenConfigUtil.scala

+7
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ package org.apache.spark.sql.internal
1818

1919
import org.apache.gluten.config._
2020

21+
import org.apache.spark.network.util.{ByteUnit, JavaUtils}
22+
2123
object GlutenConfigUtil {
2224
private def getConfString(configProvider: ConfigProvider, key: String, value: String): String = {
2325
Option(ConfigEntry.findEntry(key))
@@ -42,4 +44,9 @@ object GlutenConfigUtil {
4244
}
4345
}.toMap
4446
}
47+
48+
/**
 * Looks up `key` in `conf` and, when present, parses its value as a byte
 * string (e.g. "2m", "64k") converted into the requested `unit`, then hands
 * the resulting number to `f`. A missing key is a no-op.
 *
 * @param conf configuration map to read from
 * @param key configuration key holding a byte-string value
 * @param unit the unit the parsed value should be expressed in
 * @param f callback receiving the parsed numeric value
 */
def mapByteConfValue(conf: Map[String, String], key: String, unit: ByteUnit)(
    f: Long => Unit): Unit = {
  conf
    .get(key)
    .map(raw => JavaUtils.byteStringAs(raw, unit))
    .foreach(f)
}
4552
}

0 commit comments

Comments
 (0)