From 51809834b432e14beac8a31f9e0076fb7af35c6b Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Tue, 11 Apr 2023 18:34:47 +0800 Subject: [PATCH 1/2] adapt Spark 3.x --- rocketmq-spark/pom.xml | 6 +++--- .../org/apache/spark/sql/rocketmq/RocketMQSource.scala | 1 - .../apache/spark/sql/rocketmq/RocketMQSourceOffset.scala | 5 ++--- .../org/apache/spark/sql/rocketmq/RocketMQSourceRDD.scala | 2 +- .../main/scala/org/apache/spark/streaming/RocketMqRDD.scala | 2 +- 5 files changed, 7 insertions(+), 9 deletions(-) diff --git a/rocketmq-spark/pom.xml b/rocketmq-spark/pom.xml index 87810a536..debba0eec 100644 --- a/rocketmq-spark/pom.xml +++ b/rocketmq-spark/pom.xml @@ -34,9 +34,9 @@ 1.8 1.8 4.9.4 - 2.3.0 - 2.11.8 - 2.11 + 3.3.1 + 2.12.10 + 2.12 2.5 diff --git a/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSource.scala b/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSource.scala index efc671d7e..b526bf2cb 100644 --- a/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSource.scala +++ b/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSource.scala @@ -115,7 +115,6 @@ private class RocketMQSource( if (content(0) == 'v') { val indexOfNewLine = content.indexOf("\n") if (indexOfNewLine > 0) { - val version = parseVersion(content.substring(0, indexOfNewLine), VERSION) RocketMQSourceOffset(SerializedOffset(content.substring(indexOfNewLine + 1))) } else { throw new IllegalStateException( diff --git a/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSourceOffset.scala b/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSourceOffset.scala index 7635de38b..c8f702cfe 100644 --- a/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSourceOffset.scala +++ b/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSourceOffset.scala @@ -25,15 +25,14 @@ package org.apache.spark.sql.rocketmq import org.apache.rocketmq.common.message.MessageQueue +import org.apache.spark.sql.connector.read.streaming.PartitionOffset import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset} -import org.apache.spark.sql.sources.v2.reader.streaming.{PartitionOffset, Offset => OffsetV2} - /** * An [[Offset]] for the [[RocketMQSource]]. This one tracks all partitions of subscribed topics and * their offsets. */ private[rocketmq] -case class RocketMQSourceOffset(queueToOffsets: Map[MessageQueue, Long]) extends OffsetV2 { +case class RocketMQSourceOffset(queueToOffsets: Map[MessageQueue, Long]) extends Offset { override val json = JsonUtils.partitionOffsets(queueToOffsets) } diff --git a/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSourceRDD.scala b/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSourceRDD.scala index d30eef1ba..0fcd7bafa 100644 --- a/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSourceRDD.scala +++ b/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSourceRDD.scala @@ -174,7 +174,7 @@ private[rocketmq] class RocketMQSourceRDD( } } // Release consumer, either by removing it or indicating we're no longer using it - context.addTaskCompletionListener { _ => + context.addTaskCompletionListener[Unit] { _ => underlying.closeIfNeeded() } underlying diff --git a/rocketmq-spark/src/main/scala/org/apache/spark/streaming/RocketMqRDD.scala b/rocketmq-spark/src/main/scala/org/apache/spark/streaming/RocketMqRDD.scala index 08a23520a..fc4e46bb5 100644 --- a/rocketmq-spark/src/main/scala/org/apache/spark/streaming/RocketMqRDD.scala +++ b/rocketmq-spark/src/main/scala/org/apache/spark/streaming/RocketMqRDD.scala @@ -178,7 +178,7 @@ class RocketMqRDD ( logDebug(s"Computing topic ${part.topic}, queueId ${part.queueId} " + s"offsets ${part.partitionOffsetRanges.mkString(",")}") - context.addTaskCompletionListener{ context => closeIfNeeded() } + context.addTaskCompletionListener[Unit]{ context => closeIfNeeded() } val consumer = if (useConsumerCache) { From 8df4046118c6da677961c6d1fb2fccfdb3ede20f Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Thu, 13 Apr 2023 11:07:31 +0800 Subject: [PATCH 2/2] Supports user-defined consumer groups --- .../sql/rocketmq/RocketMQSourceProvider.scala | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSourceProvider.scala b/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSourceProvider.scala index b1c5e6c38..89fbe91f5 100644 --- a/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSourceProvider.scala +++ b/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSourceProvider.scala @@ -77,13 +77,16 @@ class RocketMQSourceProvider extends DataSourceRegister // Each running query should use its own group id. Otherwise, the query may be only assigned // partial data since RocketMQ will assign partitions to multiple consumers having the same group // id. Hence, we should generate a unique id for each query. - val uniqueGroupId = s"spark-rocketmq-source-${UUID.randomUUID}-${metadataPath.hashCode.toHexString}" + var uniqueGroupId = s"spark-rocketmq-source-${UUID.randomUUID}-${metadataPath.hashCode.toHexString}" val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) } val startingStreamOffsets = RocketMQSourceProvider.getRocketMQOffsetRangeLimit(caseInsensitiveParams, RocketMQConf.CONSUMER_OFFSET, LatestOffsetRangeLimit) + if (caseInsensitiveParams.contains(RocketMQConf.CONSUMER_GROUP)) { + uniqueGroupId = caseInsensitiveParams.get(RocketMQConf.CONSUMER_GROUP).get + } val offsetReader = new RocketMQOffsetReader( paramsForDriver(caseInsensitiveParams), parameters, @@ -252,10 +255,10 @@ object RocketMQSourceProvider extends Logging { } def paramsForDriver(specifiedRocketMQParams: Map[String, String]): ju.Map[String, String] = { - if (specifiedRocketMQParams.contains(RocketMQConf.CONSUMER_GROUP)) { - throw new IllegalArgumentException( - s"Option '${RocketMQConf.CONSUMER_GROUP}' can not be specified") - } +// if (specifiedRocketMQParams.contains(RocketMQConf.CONSUMER_GROUP)) { +// throw new IllegalArgumentException( +// s"Option '${RocketMQConf.CONSUMER_GROUP}' can not be specified") +// } ConfigUpdater("source", specifiedRocketMQParams) // Set to "earliest" to avoid exceptions. However, RocketMQSource will fetch the initial // offsets by itself instead of counting on RocketMQConsumer. @@ -268,10 +271,10 @@ object RocketMQSourceProvider extends Logging { def paramsForExecutors( specifiedRocketMQParams: Map[String, String], uniqueGroupId: String): ju.Map[String, String] = { - if (specifiedRocketMQParams.contains(RocketMQConf.CONSUMER_GROUP)) { - throw new IllegalArgumentException( - s"Option '${RocketMQConf.CONSUMER_GROUP}' can not be specified") - } +// if (specifiedRocketMQParams.contains(RocketMQConf.CONSUMER_GROUP)) { +// throw new IllegalArgumentException( +// s"Option '${RocketMQConf.CONSUMER_GROUP}' can not be specified") +// } ConfigUpdater("executor", specifiedRocketMQParams) // So that consumers in executors do not mess with any existing group id .set(RocketMQConf.CONSUMER_GROUP, s"$uniqueGroupId-executor")