From 51809834b432e14beac8a31f9e0076fb7af35c6b Mon Sep 17 00:00:00 2001
From: ChengJie1053 <18033291053@163.com>
Date: Tue, 11 Apr 2023 18:34:47 +0800
Subject: [PATCH 1/2] Adapt to Spark 3.x
---
rocketmq-spark/pom.xml | 6 +++---
.../org/apache/spark/sql/rocketmq/RocketMQSource.scala | 1 -
.../apache/spark/sql/rocketmq/RocketMQSourceOffset.scala | 5 ++---
.../org/apache/spark/sql/rocketmq/RocketMQSourceRDD.scala | 2 +-
.../main/scala/org/apache/spark/streaming/RocketMqRDD.scala | 2 +-
5 files changed, 7 insertions(+), 9 deletions(-)
diff --git a/rocketmq-spark/pom.xml b/rocketmq-spark/pom.xml
index 87810a536..debba0eec 100644
--- a/rocketmq-spark/pom.xml
+++ b/rocketmq-spark/pom.xml
@@ -34,9 +34,9 @@
1.8
1.8
4.9.4
- 2.3.0
- 2.11.8
- 2.11
+ 3.3.1
+ 2.12.10
+ 2.12
2.5
diff --git a/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSource.scala b/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSource.scala
index efc671d7e..b526bf2cb 100644
--- a/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSource.scala
+++ b/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSource.scala
@@ -115,7 +115,6 @@ private class RocketMQSource(
if (content(0) == 'v') {
val indexOfNewLine = content.indexOf("\n")
if (indexOfNewLine > 0) {
- val version = parseVersion(content.substring(0, indexOfNewLine), VERSION)
RocketMQSourceOffset(SerializedOffset(content.substring(indexOfNewLine + 1)))
} else {
throw new IllegalStateException(
diff --git a/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSourceOffset.scala b/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSourceOffset.scala
index 7635de38b..c8f702cfe 100644
--- a/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSourceOffset.scala
+++ b/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSourceOffset.scala
@@ -25,15 +25,14 @@
package org.apache.spark.sql.rocketmq
import org.apache.rocketmq.common.message.MessageQueue
+import org.apache.spark.sql.connector.read.streaming.PartitionOffset
import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
-import org.apache.spark.sql.sources.v2.reader.streaming.{PartitionOffset, Offset => OffsetV2}
-
/**
* An [[Offset]] for the [[RocketMQSource]]. This one tracks all partitions of subscribed topics and
* their offsets.
*/
private[rocketmq]
-case class RocketMQSourceOffset(queueToOffsets: Map[MessageQueue, Long]) extends OffsetV2 {
+case class RocketMQSourceOffset(queueToOffsets: Map[MessageQueue, Long]) extends Offset {
override val json = JsonUtils.partitionOffsets(queueToOffsets)
}
diff --git a/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSourceRDD.scala b/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSourceRDD.scala
index d30eef1ba..0fcd7bafa 100644
--- a/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSourceRDD.scala
+++ b/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSourceRDD.scala
@@ -174,7 +174,7 @@ private[rocketmq] class RocketMQSourceRDD(
}
}
// Release consumer, either by removing it or indicating we're no longer using it
- context.addTaskCompletionListener { _ =>
+ context.addTaskCompletionListener[Unit] { _ =>
underlying.closeIfNeeded()
}
underlying
diff --git a/rocketmq-spark/src/main/scala/org/apache/spark/streaming/RocketMqRDD.scala b/rocketmq-spark/src/main/scala/org/apache/spark/streaming/RocketMqRDD.scala
index 08a23520a..fc4e46bb5 100644
--- a/rocketmq-spark/src/main/scala/org/apache/spark/streaming/RocketMqRDD.scala
+++ b/rocketmq-spark/src/main/scala/org/apache/spark/streaming/RocketMqRDD.scala
@@ -178,7 +178,7 @@ class RocketMqRDD (
logDebug(s"Computing topic ${part.topic}, queueId ${part.queueId} " +
s"offsets ${part.partitionOffsetRanges.mkString(",")}")
- context.addTaskCompletionListener{ context => closeIfNeeded() }
+ context.addTaskCompletionListener[Unit]{ context => closeIfNeeded() }
val consumer = if (useConsumerCache) {
From 8df4046118c6da677961c6d1fb2fccfdb3ede20f Mon Sep 17 00:00:00 2001
From: ChengJie1053 <18033291053@163.com>
Date: Thu, 13 Apr 2023 11:07:31 +0800
Subject: [PATCH 2/2] Support user-defined consumer groups
---
.../sql/rocketmq/RocketMQSourceProvider.scala | 21 +++++++++++--------
1 file changed, 12 insertions(+), 9 deletions(-)
diff --git a/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSourceProvider.scala b/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSourceProvider.scala
index b1c5e6c38..89fbe91f5 100644
--- a/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSourceProvider.scala
+++ b/rocketmq-spark/src/main/scala/org/apache/spark/sql/rocketmq/RocketMQSourceProvider.scala
@@ -77,13 +77,16 @@ class RocketMQSourceProvider extends DataSourceRegister
// Each running query should use its own group id. Otherwise, the query may be only assigned
// partial data since RocketMQ will assign partitions to multiple consumers having the same group
// id. Hence, we should generate a unique id for each query.
- val uniqueGroupId = s"spark-rocketmq-source-${UUID.randomUUID}-${metadataPath.hashCode.toHexString}"
+ var uniqueGroupId = s"spark-rocketmq-source-${UUID.randomUUID}-${metadataPath.hashCode.toHexString}"
val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
val startingStreamOffsets = RocketMQSourceProvider.getRocketMQOffsetRangeLimit(caseInsensitiveParams,
RocketMQConf.CONSUMER_OFFSET, LatestOffsetRangeLimit)
+ if (caseInsensitiveParams.contains(RocketMQConf.CONSUMER_GROUP)) {
+ uniqueGroupId = caseInsensitiveParams.get(RocketMQConf.CONSUMER_GROUP).get
+ }
val offsetReader = new RocketMQOffsetReader(
paramsForDriver(caseInsensitiveParams),
parameters,
@@ -252,10 +255,10 @@ object RocketMQSourceProvider extends Logging {
}
def paramsForDriver(specifiedRocketMQParams: Map[String, String]): ju.Map[String, String] = {
- if (specifiedRocketMQParams.contains(RocketMQConf.CONSUMER_GROUP)) {
- throw new IllegalArgumentException(
- s"Option '${RocketMQConf.CONSUMER_GROUP}' can not be specified")
- }
+// if (specifiedRocketMQParams.contains(RocketMQConf.CONSUMER_GROUP)) {
+// throw new IllegalArgumentException(
+// s"Option '${RocketMQConf.CONSUMER_GROUP}' can not be specified")
+// }
ConfigUpdater("source", specifiedRocketMQParams)
// Set to "earliest" to avoid exceptions. However, RocketMQSource will fetch the initial
// offsets by itself instead of counting on RocketMQConsumer.
@@ -268,10 +271,10 @@ object RocketMQSourceProvider extends Logging {
def paramsForExecutors(
specifiedRocketMQParams: Map[String, String],
uniqueGroupId: String): ju.Map[String, String] = {
- if (specifiedRocketMQParams.contains(RocketMQConf.CONSUMER_GROUP)) {
- throw new IllegalArgumentException(
- s"Option '${RocketMQConf.CONSUMER_GROUP}' can not be specified")
- }
+// if (specifiedRocketMQParams.contains(RocketMQConf.CONSUMER_GROUP)) {
+// throw new IllegalArgumentException(
+// s"Option '${RocketMQConf.CONSUMER_GROUP}' can not be specified")
+// }
ConfigUpdater("executor", specifiedRocketMQParams)
// So that consumers in executors do not mess with any existing group id
.set(RocketMQConf.CONSUMER_GROUP, s"$uniqueGroupId-executor")