
[VL] Should convert kSpillReadBufferSize and kShuffleSpillDiskWriteBufferSize to number #8684

Merged · 3 commits merged into apache:main on Feb 19, 2025

Conversation

@boneanxs (Contributor) commented Feb 7, 2025

What changes were proposed in this pull request?

Fixes an issue introduced by #8045: manually setting spark.unsafe.sorter.spill.reader.buffer.size to a value like 2m triggers the following error:

org.apache.gluten.exception.GlutenException: Non-whitespace character found after end of conversion: "m"
	at org.apache.gluten.vectorized.PlanEvaluatorJniWrapper.nativeCreateKernelWithIterator(Native Method)
	at org.apache.gluten.vectorized.NativePlanEvaluator.createKernelWithBatchIterator(NativePlanEvaluator.java:68)
	at org.apache.gluten.backendsapi.velox.VeloxIteratorApi.genFirstStageIterator(VeloxIteratorApi.scala:204)
	at org.apache.gluten.execution.GlutenWholeStageColumnarRDD.$anonfun$compute$1(GlutenWholeStageColumnarRDD.scala:88)
	at org.apache.gluten.utils.Arm$.withResource(Arm.scala:25)
	at org.apache.gluten.metrics.GlutenTimeMetric$.millis(GlutenTimeMetric.scala:37)
	at org.apache.gluten.execution.GlutenWholeStageColumnarRDD.compute(GlutenWholeStageColumnarRDD.scala:77)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)
	at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:106)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:380)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:344)

We should parse the value to a number of bytes before putting it into nativeConf.
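For illustration, a minimal sketch of the conversion in question, using Spark's JavaUtils.byteStringAs (the helper this fix relies on); the demo object name is made up, and the commented results assume Spark's usual byte-string grammar:

    import org.apache.spark.network.util.{ByteUnit, JavaUtils}

    object ByteConfDemo {
      def main(args: Array[String]): Unit = {
        // An explicit suffix wins: "2m" parses to 2 MiB, returned here in bytes.
        println(JavaUtils.byteStringAs("2m", ByteUnit.BYTE))   // 2097152
        // A bare number is interpreted in the default unit passed in.
        println(JavaUtils.byteStringAs("2048", ByteUnit.BYTE)) // 2048
      }
    }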

(Fixes: #ISSUE-ID)

How was this patch tested?

(Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests)

(If this patch involves UI changes, please attach a screenshot; otherwise, remove this)

@github-actions github-actions bot added the CORE works for Gluten Core label Feb 7, 2025

github-actions bot commented Feb 7, 2025

Thanks for opening a pull request!

Could you open an issue for this pull request on GitHub Issues?

https://github.com/apache/incubator-gluten/issues

Could you also rename the commit message and pull request title in the following format?

[GLUTEN-${ISSUES_ID}][COMPONENT]feat/fix: ${detailed message}


github-actions bot commented Feb 7, 2025

Run Gluten Clickhouse CI on x86

@boneanxs (Contributor, Author) commented Feb 7, 2025

@jinchengchenghh @FelixYBW Hey, could you please help review this? Thanks!

@Yohahaha (Contributor) commented Feb 7, 2025

object GlutenConfigUtil {
  private def getConfString(configProvider: ConfigProvider, key: String, value: String): String = {
    Option(ConfigEntry.findEntry(key))
      .map {
        _.readFrom(configProvider) match {
          case o: Option[_] => o.map(_.toString).getOrElse(value)
          case null => value
          case v => v.toString
        }
      }
      .getOrElse(value)
  }

  def parseConfig(conf: Map[String, String]): Map[String, String] = {
    val provider = new MapProvider(conf.filter(_._1.startsWith("spark.gluten.")))
    conf.map {
      case (k, v) =>
        if (k.startsWith("spark.gluten.")) {
          (k, getConfString(provider, k, v))
        } else {
          (k, v)
        }
    }.toMap
  }
}

spark.unsafe.sorter.spill.reader.buffer.size=2m should be converted by the code above; could you investigate why it does not work?

@Yohahaha (Contributor) commented Feb 8, 2025

> (quoting the GlutenConfigUtil snippet and question above)

Oh, GlutenConfigUtil only processes configs whose keys have the 'spark.gluten.' prefix.

@boneanxs (Contributor, Author) commented Feb 8, 2025

> (quoting the GlutenConfigUtil snippet above)
>
> Oh, GlutenConfigUtil only processes configs whose keys have the 'spark.gluten.' prefix.

Do we need to extend this function to handle all configurations?

@Yohahaha (Contributor) commented Feb 8, 2025

> Do we need to extend this function to handle all configurations?

Yeah, we may need to add a new method, GlutenConfigUtil#get(ConfigEntry), to process non-SQL configs.

@boneanxs (Contributor, Author) commented

> Do we need to extend this function to handle all configurations?
>
> Yeah, we may need to add a new method, GlutenConfigUtil#get(ConfigEntry), to process non-SQL configs.

Hey @Yohahaha, we cannot use ConfigEntry, since it is private to the Spark package:

https://github.com/apache/spark/blob/cea79dc1918b7f03870fe1cb189da9a152e3bbaf/core/src/main/scala/org/apache/spark/internal/config/package.scala#L1892-L1899

How about extracting a dedicated method that handles byte values only, to reduce duplication?
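For context, a hypothetical sketch of such a helper; the name GlutenConfigUtil.mapByteConfValue matches the calls quoted later in this review, but the signature and body here are inferred rather than copied from the patch:

    import org.apache.spark.network.util.{ByteUnit, JavaUtils}

    object GlutenConfigUtil {
      // Parse conf(key) as a byte string in `unit` and hand the parsed value
      // to `f`; do nothing when the key is absent.
      def mapByteConfValue(conf: Map[String, String], key: String, unit: ByteUnit)(
          f: Long => Unit): Unit = {
        conf.get(key).foreach(v => f(JavaUtils.byteStringAs(v, unit)))
      }
    }

A caller could then write, for example, mapByteConfValue(conf, key, ByteUnit.BYTE)(v => nativeConfMap.put(key, v.toString)), keeping the byte parsing in one place.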

@Yohahaha (Contributor) commented

> How about extracting a dedicated method that handles byte values only, to reduce duplication?

sounds good to me.

@boneanxs force-pushed the fix_unexpected_character branch from 3cf701b to 590a0a7 on February 12, 2025 08:33
github-actions bot commented

Run Gluten Clickhouse CI on x86

@boneanxs (Contributor, Author) commented

@Yohahaha Hey, comments addressed, could you review this again?

@Yohahaha (Contributor) left a comment

LGTM! Only one comment.


import org.scalatest.funsuite.AnyFunSuite

class GlutenConfigUtilSuite extends AnyFunSuite {

(Contributor) Maybe moving this suite to gluten-core would be better.

(Contributor Author) Done.

@Yohahaha (Contributor) commented

- csv scan with option string as null
/opt/gluten/ep/_ep/arrow_ep/java/dataset/src/main/cpp/jni_util.cc:79: Failed to update reservation while freeing bytes: Java Exception: java.lang.IncompatibleClassChangeError

/tmp/jnilib-1738162292075553983.tmp(+0x11b6338)[0x7f4bbada5338]
/tmp/jnilib-1738162292075553983.tmp(_ZN5arrow4util8ArrowLogD1Ev+0xed)[0x7f4bbada576d]
/tmp/jnilib-1738162292075553983.tmp(_ZN5arrow7dataset3jni31ReservationListenableMemoryPool4FreeEPhll+0x47d)[0x7f4bba41a58d]
/tmp/jnilib-1738162292075553983.tmp(_ZN5arrow10PoolBufferD0Ev+0x47)[0x7f4bbb04a6e7]
/tmp/jnilib-1738162292075553983.tmp(_ZNSt23_Sp_counted_ptr_inplaceIN5arrow9ArrayDataESaIS1_ELN9__gnu_cxx12_Lock_policyE2EE10_M_disposeEv+0x10e)[0x7f4bba61b58e]
/tmp/jnilib-1738162292075553983.tmp(_ZN5arrow17SimpleRecordBatchD1Ev+0x10e)[0x7f4bbaed394e]
/tmp/jnilib-1738162292075553983.tmp(_ZNSt16_Sp_counted_baseILN9__gnu_cxx12_Lock_policyE2EE10_M_releaseEv+0x3a)[0x7f4bba41146a]
/tmp/jnilib-1738162292075553983.tmp(_ZN5arrow8internal6FnOnceIFvRKNS_10FutureImplEEE6FnImplINS_6FutureISt10shared_ptrINS_11RecordBatchEEE20WrapResultOnComplete8CallbackINSC_14ThenOnCompleteIZNS_23DefaultIfEmptyGeneratorISB_EclEvEUt_NSC_17PassthruOnFailureISI_EEEEEEED0Ev+0x2e)[0x7f4bba465ffe]
/tmp/jnilib-1738162292075553983.tmp(_ZN5arrow18ConcreteFutureImpl22DoMarkFinishedOrFailedENS_11FutureStateE+0x1a4)[0x7f4bbadbf494]
/tmp/jnilib-1738162292075553983.tmp(_ZN5arrow6FutureISt10shared_ptrINS_11RecordBatchEEE12MarkFinishedENS_6ResultIS3_EE+0x96)[0x7f4bba486526]
/tmp/jnilib-1738162292075553983.tmp(_ZN5arrow8internal6FnOnceIFvRKNS_10FutureImplEEE6FnImplINS_6FutureISt10shared_ptrINS_11RecordBatchEEE20WrapResultOnComplete8CallbackINS_6detail16MarkNextFinishedISC_SC_Lb0ELb0EEEEEE6invokeES4_+0x55)[0x7f4bba486895]
/tmp/jnilib-1738162292075553983.tmp(_ZN5arrow18ConcreteFutureImpl21RunOrScheduleCallbackERKSt10shared_ptrINS_10FutureImplEEONS2_14CallbackRecordEb+0x40)[0x7f4bbadbf1c0]
/tmp/jnilib-1738162292075553983.tmp(_ZN5arrow10FutureImpl11AddCallbackENS_8internal6FnOnceIFvRKS0_EEENS_15CallbackOptionsE+0xe5)[0x7f4bbadb5b35]

Just ignore this failure; it's a known issue.

@boneanxs force-pushed the fix_unexpected_character branch from 590a0a7 to 0258e96 on February 18, 2025 08:35
github-actions bot commented

Run Gluten Clickhouse CI on x86

@jinchengchenghh (Contributor) commented

> (quoting the GlutenConfigUtil snippet above)
>
> Oh, GlutenConfigUtil only processes configs whose keys have the 'spark.gluten.' prefix.

Do we need to update the prefix from spark.gluten. to spark.? We may want to map other Spark configs to Velox configs.
We could also keep the current implementation, since it may take effect for Spark downstream projects such as Iceberg configs in the future. What do you think? @Yohahaha

@jackylee-ch (Contributor) commented

> Do we need to update the prefix from spark.gluten. to spark.? We may want to map other Spark configs to Velox configs. We could also keep the current implementation, since it may take effect for Spark downstream projects such as Iceberg configs in the future. What do you think? @Yohahaha

The current code only reads Gluten's configuration, as ConfigEntry has been refactored; we need another method to handle the ConfigEntry of vanilla Spark.
BTW, fully scanning all Spark confs would be time-consuming. Maybe we can add a Spark config registration list so that only registered confs are passed through.
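For illustration only, a hypothetical sketch of such a registration list; the keys are real Spark configs discussed in this PR, but the structure and the defaults are assumptions, not the merged implementation:

    import org.apache.spark.network.util.{ByteUnit, JavaUtils}

    object ByteConfRegistry {
      // Hypothetical allowlist: config key, the unit a bare number is read in,
      // and an assumed default (not taken from the merged patch).
      final case class ByteConf(key: String, unit: ByteUnit, default: String)

      val registered: Seq[ByteConf] = Seq(
        ByteConf("spark.unsafe.sorter.spill.reader.buffer.size", ByteUnit.BYTE, "1m"),
        ByteConf("spark.shuffle.spill.diskWriteBufferSize", ByteUnit.BYTE, "1m"),
        ByteConf("spark.shuffle.file.buffer", ByteUnit.KiB, "32k")
      )

      // Normalize every registered conf to a plain byte count for the native side.
      def toNativeConf(conf: Map[String, String]): Map[String, String] =
        registered.map { c =>
          val raw = conf.getOrElse(c.key, c.default)
          c.key -> c.unit.toBytes(JavaUtils.byteStringAs(raw, c.unit)).toString
        }.toMap
    }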

.put(
SPARK_SHUFFLE_FILE_BUFFER,
(JavaUtils.byteStringAs(v, ByteUnit.KiB) * 1024).toString))
GlutenConfigUtil.mapByteConfValue(

(Contributor) Maybe we should add a new list like keyWithDefault and group the byte configs together; that would make it easier for users to add a byte config.

(Contributor) This is byteKeys.

@jinchengchenghh (Contributor) commented

> (quoting the exchange above about extending the prefix and adding a registration list)

Yes, maybe adding a list to parseConfig is more reasonable.

@Yohahaha (Contributor) commented

Thanks for the comments. We could treat them as subtasks of #8326, or would you keep iterating in the current PR? @boneanxs

I think we can merge the current PR if the above comments are not blocking.

@jinchengchenghh (Contributor) commented

ok CC @Yohahaha @boneanxs

conf,
SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE,
ByteUnit.BYTE)(v => nativeConfMap.put(SPARK_SHUFFLE_SPILL_DISK_WRITE_BUFFER_SIZE, v.toString))
GlutenConfigUtil.mapByteConfValue(conf, SPARK_SHUFFLE_FILE_BUFFER, ByteUnit.KiB)(

(Contributor) Is JavaUtils.byteStringAs(v, ByteUnit.KiB) * 1024 equal to JavaUtils.byteStringAs(v, ByteUnit.BYTE)?

(Contributor Author) Nope. If v doesn't carry a unit, the given default unit is applied. E.g., when v is 1024, JavaUtils.byteStringAs(v, ByteUnit.KiB) * 1024 yields 1024 * 1024, while JavaUtils.byteStringAs(v, ByteUnit.BYTE) yields 1024.
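A minimal sketch of this difference, assuming Spark's org.apache.spark.network.util.JavaUtils is on the classpath; the commented results follow the semantics described above:

    import org.apache.spark.network.util.{ByteUnit, JavaUtils}

    object ByteStringSemantics {
      def main(args: Array[String]): Unit = {
        // A bare number falls back to the default unit, so the two forms differ:
        println(JavaUtils.byteStringAs("1024", ByteUnit.KiB) * 1024) // 1048576 (1024 KiB in bytes)
        println(JavaUtils.byteStringAs("1024", ByteUnit.BYTE))       // 1024
        // With an explicit suffix the two agree:
        println(JavaUtils.byteStringAs("2m", ByteUnit.KiB) * 1024)   // 2097152
        println(JavaUtils.byteStringAs("2m", ByteUnit.BYTE))         // 2097152
      }
    }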

@jinchengchenghh merged commit 9b25ced into apache:main on Feb 19, 2025
51 checks passed
kevinw66 pushed a commit to kevinw66/incubator-gluten that referenced this pull request Mar 3, 2025