
Commit 4accdfe

feat: support read and write from hive datasource (#100)
* feat: support read and write from hive datasource
* feat: connect hive by meta store
* refactor: remove show dataFrame
1 parent 20573b3 commit 4accdfe

8 files changed: +220 −25 lines changed

nebula-algorithm/src/main/resources/application.conf

+36 −3

@@ -11,14 +11,47 @@
   }

   data: {
-    # data source. optional of nebula,nebula-ngql,csv,json
+    # data source. optional of nebula,nebula-ngql,csv,json,hive
     source: csv
-    # data sink, means the algorithm result will be write into this sink. optional of nebula,csv,text
+    # data sink, means the algorithm result will be write into this sink. optional of nebula,csv,text,hive
     sink: csv
     # if your algorithm needs weight
     hasWeight: false
   }

+  # Hive related config
+  hive: {
+    #[Optional] spark and hive require configuration on different clusters. Read and write connect hive with this metastore
+    metaStoreUris: "thrift://hive-metastore-server:9083"
+    # algo's data source from hive
+    read: {
+      #spark sql
+      sql: "select column_1,column_2,column_3 from database_01.table_01 "
+      #[Optional] graph source vid mapping with column of sql result.
+      srcId: "column_1"
+      #[Optional] graph dest vid mapping with column of sql result
+      dstId: "column_2"
+      #[Optional] graph weight mapping with column of sql result
+      weight: "column_3"
+    }
+
+    # algo result sink into hive
+    write: {
+      #save result to hive table
+      dbTableName: "database_02.table_02"
+      #[Optional] spark dataframe save mode,optional of Append,Overwrite,ErrorIfExists,Ignore. Default is Overwrite
+      saveMode: "Overwrite"
+      #[Optional] if auto create hive table. Default is true
+      autoCreateTable: true
+      #[Optional] algorithm result mapping with hive table column name. Default same with column name of algo result dataframe
+      resultTableColumnMapping: {
+        # Note: Different algorithms have different output fields, so let's take the pagerank algorithm for example:
+        _id: "column_1"
+        pagerank: "pagerank_value"
+      }
+    }
+  }
+
   # NebulaGraph related config
   nebula: {
     # algo's data source from Nebula. If data.source is nebula, then this nebula.read config can be valid.
@@ -78,7 +111,7 @@
   # the algorithm that you are going to execute,pick one from [pagerank, louvain, connectedcomponent,
   # labelpropagation, shortestpaths, degreestatic, kcore, stronglyconnectedcomponent, trianglecount,
   # betweenness, graphtriangleCount, clusteringcoefficient, bfs, hanp, closeness, jaccard, node2vec]
-  executeAlgo: graphtrianglecount
+  executeAlgo: pagerank

   # PageRank parameter
   pagerank: {
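
Read top to bottom, the flow this block configures is: hive.read.sql produces the edge list (column_1 as source vid, column_2 as destination vid, column_3 as weight), the chosen algorithm runs on it, and hive.write lands the result in database_02.table_02 with the algorithm's output columns renamed per resultTableColumnMapping. Given a Hive-enabled SparkSession, inspecting the sink after a pagerank run might look like this (a sketch only; the table and column names are the placeholders from the config above):

    // Top 10 vertices by the renamed pagerank column written by the hive sink
    spark.sql(
      "select column_1, pagerank_value from database_02.table_02 " +
      "order by pagerank_value desc limit 10"
    ).show()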

nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala

+3 −3

@@ -55,7 +55,7 @@ object Main {
     val algoTime = System.currentTimeMillis()

     // writer
-    saveAlgoResult(algoResult, configs)
+    saveAlgoResult(sparkConfig.spark, algoResult, configs)
     val endTime = System.currentTimeMillis()

     sparkConfig.spark.stop()
@@ -149,8 +149,8 @@ object Main {
     }
   }

-  private[this] def saveAlgoResult(algoResult: DataFrame, configs: Configs): Unit = {
+  private[this] def saveAlgoResult(spark: SparkSession, algoResult: DataFrame, configs: Configs): Unit = {
     val writer = AlgoWriter.make(configs)
-    writer.write(algoResult, configs)
+    writer.write(spark, algoResult, configs)
   }
 }
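
Threading sparkConfig.spark through saveAlgoResult gives the writer access to the live session, which a Hive sink needs in order to call back into Spark SQL. The writer-side changes are not among the file diffs expanded on this page; purely as a hedged sketch of what a sink driven by HiveWriteConfigEntry could look like (the class name and body below are illustrative, not the commit's code, and assume they compile alongside the config classes in Configs.scala shown further down):

    import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

    // Illustrative only: dbTableName, saveMode and resultColumnMapping come from
    // the HiveWriteConfigEntry added in Configs.scala.
    class HiveWriterSketch {
      def write(spark: SparkSession, data: DataFrame, configs: Configs): Unit = {
        val writeConfig = configs.hiveConfigEntry.hiveWriteConfigEntry
        // Rename algorithm output columns (e.g. _id -> column_1) before saving
        val renamed = writeConfig.resultColumnMapping.foldLeft(data) {
          case (df, (algoCol, hiveCol)) => df.withColumnRenamed(algoCol, hiveCol)
        }
        // spark is available here for sinks that need the session, for example to
        // create the target table up front when autoCreateTable is set
        renamed.write
          .mode(SaveMode.valueOf(writeConfig.saveMode)) // assumes Append/Overwrite/ErrorIfExists/Ignore
          .saveAsTable(writeConfig.dbTableName)
      }
    }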

nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/Configs.scala

+88 −12

@@ -12,6 +12,7 @@ import org.apache.log4j.Logger
 import scala.collection.JavaConverters._
 import com.typesafe.config.{Config, ConfigFactory}
 import com.vesoft.nebula.algorithm.config.Configs.readConfig
+import com.vesoft.nebula.algorithm.config.Configs.getOrElse

 import scala.collection.mutable

@@ -129,6 +130,51 @@ object LocalConfigEntry {
   }
 }

+
+object HiveConfigEntry {
+  def apply(config: Config): HiveConfigEntry = {
+    //uri of hive metastore. eg: thrift://127.0.0.1:9083
+    val hiveMetaStoreUris: String = getOrElse(config, "hive.metaStoreUris", "")
+    val readConfigEntry = buildReadConfig(config)
+    val writeConfigEntry = buildWriteConfig(config)
+    HiveConfigEntry(hiveMetaStoreUris,readConfigEntry, writeConfigEntry)
+  }
+
+  def buildReadConfig(config: Config): HiveReadConfigEntry = {
+    //source data of spark sql
+    val sql: String = getOrElse(config, "hive.read.sql", "")
+    //the source vertex ID is mapped with the SQL result column name
+    val srcIdCol: String = getOrElse(config, "hive.read.srcId", "")
+    //the dest vertex ID is mapped with the SQL result column name
+    val dstIdCol: String = getOrElse(config, "hive.read.dstId", "")
+    //the weight is mapped with the SQL result column name
+    val weightCol: String = getOrElse(config, "hive.read.weight", "")
+    HiveReadConfigEntry(sql, srcIdCol, dstIdCol, weightCol)
+  }
+
+  def buildWriteConfig(config: Config): HiveWriteConfigEntry = {
+    //algo result save to hive table
+    val dbTableName: String = getOrElse(config, "hive.write.dbTableName", "")
+    //save mode of spark
+    val saveMode: String = getOrElse(config, "hive.write.saveMode", "")
+    //Whether the table is automatically created
+    val autoCreateTable: Boolean = getOrElse(config, "hive.write.autoCreateTable", true)
+    //algo results dataframe column and hive table column mapping relationships
+    val resultColumnMapping = mutable.Map[String, String]()
+    val mappingKey = "hive.write.resultTableColumnMapping"
+    if (config.hasPath(mappingKey)) {
+      val mappingConfig = config.getObject(mappingKey)
+      for (subkey <- mappingConfig.unwrapped().keySet().asScala) {
+        val key = s"${mappingKey}.${subkey}"
+        val value = config.getString(key)
+        resultColumnMapping += subkey -> value
+      }
+    }
+    HiveWriteConfigEntry(dbTableName, saveMode, autoCreateTable, resultColumnMapping)
+  }
+
+}
+
 /**
  * SparkConfigEntry support key-value pairs for spark session.
  *
@@ -173,6 +219,34 @@ case class LocalConfigEntry(filePath: String,
   }
 }

+case class HiveConfigEntry(hiveMetaStoreUris: String,
+                           hiveReadConfigEntry: HiveReadConfigEntry,
+                           hiveWriteConfigEntry: HiveWriteConfigEntry) {
+  override def toString: String = {
+    s"HiveConfigEntry: {hiveMetaStoreUris:$hiveMetaStoreUris, read: $hiveReadConfigEntry, write: $hiveWriteConfigEntry}"
+  }
+}
+
+case class HiveReadConfigEntry(sql: String,
+                               srcIdCol: String = "srcId",
+                               dstIdCol: String = "dstId",
+                               weightCol: String) {
+  override def toString: String = {
+    s"HiveReadConfigEntry: {sql: $sql, srcIdCol: $srcIdCol, dstIdCol: $dstIdCol, " +
+      s"weightCol:$weightCol}"
+  }
+}
+
+case class HiveWriteConfigEntry(dbTableName: String,
+                                saveMode: String,
+                                autoCreateTable: Boolean,
+                                resultColumnMapping: mutable.Map[String, String]) {
+  override def toString: String = {
+    s"HiveWriteConfigEntry: {dbTableName: $dbTableName, saveMode=$saveMode, " +
+      s"autoCreateTable=$autoCreateTable, resultColumnMapping=$resultColumnMapping}"
+  }
+}
+
 /**
  * NebulaConfigEntry
  * @param readConfigEntry config for nebula-spark-connector reader
@@ -218,6 +292,7 @@ case class Configs(sparkConfig: SparkConfigEntry,
                    dataSourceSinkEntry: DataSourceSinkEntry,
                    nebulaConfig: NebulaConfigEntry,
                    localConfigEntry: LocalConfigEntry,
+                   hiveConfigEntry: HiveConfigEntry,
                    algorithmConfig: AlgorithmConfigEntry)

 object Configs {
@@ -237,10 +312,11 @@ object Configs {
     val dataSourceEntry = DataSourceSinkEntry(config)
     val localConfigEntry = LocalConfigEntry(config)
     val nebulaConfigEntry = NebulaConfigEntry(config)
-    val sparkEntry = SparkConfigEntry(config)
-    val algorithmEntry = AlgorithmConfigEntry(config)
+    val hiveConfigEntry = HiveConfigEntry(config)
+    val sparkEntry = SparkConfigEntry(config)
+    val algorithmEntry = AlgorithmConfigEntry(config)

-    Configs(sparkEntry, dataSourceEntry, nebulaConfigEntry, localConfigEntry, algorithmEntry)
+    Configs(sparkEntry, dataSourceEntry, nebulaConfigEntry, localConfigEntry, hiveConfigEntry, algorithmEntry)
   }

   /**
@@ -277,15 +353,15 @@
   }

   /**
-   * Get the value from config by the path. If the path not exist, return the default value.
-   *
-   * @param config The config.
-   * @param path The path of the config.
-   * @param defaultValue The default value for the path.
-   *
-   * @return
-   */
-  private[this] def getOrElse[T](config: Config, path: String, defaultValue: T): T = {
+   * Get the value from config by the path. If the path not exist, return the default value.
+   *
+   * @param config The config.
+   * @param path The path of the config.
+   * @param defaultValue The default value for the path.
+   *
+   * @return
+   */
+  def getOrElse[T](config: Config, path: String, defaultValue: T): T = {
     if (config.hasPath(path)) {
       config.getAnyRef(path).asInstanceOf[T]
     } else {
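
Since every field goes through the now-public getOrElse, a partial hive block still parses and missing keys fall back to the defaults shown above (empty strings, autoCreateTable = true, an empty column mapping). A small sketch of building the new entry directly, assuming it runs alongside these classes so HiveConfigEntry is in scope:

    import com.typesafe.config.ConfigFactory

    val config = ConfigFactory.parseString(
      """
        |hive: {
        |  metaStoreUris: "thrift://hive-metastore-server:9083"
        |  read: {
        |    sql: "select column_1,column_2 from database_01.table_01"
        |    srcId: "column_1"
        |    dstId: "column_2"
        |  }
        |  write: {
        |    dbTableName: "database_02.table_02"
        |    saveMode: "Overwrite"
        |  }
        |}
        |""".stripMargin)

    // weight, autoCreateTable and resultTableColumnMapping are absent, so getOrElse
    // supplies "", true and an empty mapping respectively
    val hiveEntry = HiveConfigEntry(config)
    println(hiveEntry)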

nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/SparkConfig.scala

+19

@@ -5,6 +5,8 @@

 package com.vesoft.nebula.algorithm.config

+import com.vesoft.nebula.algorithm.reader.ReaderType
+import com.vesoft.nebula.algorithm.writer.WriterType
 import org.apache.spark.sql.SparkSession

 case class SparkConfig(spark: SparkSession, partitionNum: Int)
@@ -20,12 +22,29 @@ object SparkConfig {
     sparkConfigs.foreach { case (key, value) =>
       session.config(key, value)
     }
+
+    // set hive config
+    setHiveConfig(session, configs)
+
     val partitionNum = sparkConfigs.getOrElse("spark.app.partitionNum", "0")
     val spark = session.getOrCreate()
     validate(spark.version, "2.4.*")
     SparkConfig(spark, partitionNum.toInt)
   }

+  private def setHiveConfig(session: org.apache.spark.sql.SparkSession.Builder, configs: Configs): Unit = {
+    val dataSource = configs.dataSourceSinkEntry
+    if (dataSource.source.equals(ReaderType.hive.stringify)
+        || dataSource.sink.equals(WriterType.hive.stringify)) {
+      session.enableHiveSupport()
+      val uris = configs.hiveConfigEntry.hiveMetaStoreUris
+      if (uris != null && uris.trim.nonEmpty) {
+        session.config("hive.metastore.schema.verification", false)
+        session.config("hive.metastore.uris", uris)
+      }
+    }
+  }
+
   private def validate(sparkVersion: String, supportedVersions: String*): Unit = {
     if (sparkVersion != "UNKNOWN" && !supportedVersions.exists(sparkVersion.matches)) {
       throw new RuntimeException(
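
setHiveConfig only touches the builder when data.source or data.sink is "hive", so Nebula- or CSV-only jobs keep their current behavior, and it runs before getOrCreate(), which is what lets enableHiveSupport() take effect on the session the algorithms use. Condensed into a standalone sketch, the hive branch amounts to the following builder setup (master and app name are added only to make the snippet runnable on its own; the URI is the placeholder from application.conf):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[*]")                 // for the sketch only
      .appName("hive-config-sketch")      // for the sketch only
      .enableHiveSupport()
      .config("hive.metastore.schema.verification", false)
      .config("hive.metastore.uris", "thrift://hive-metastore-server:9083")
      .getOrCreate()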

nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/reader/DataReader.scala

+28

@@ -25,6 +25,7 @@ object DataReader {
         case ReaderType.nebulaNgql => new NebulaNgqlReader
         case ReaderType.nebula => new NebulaReader
         case ReaderType.csv => new CsvReader
+        case ReaderType.hive => new HiveReader
       }
       .getOrElse(throw new UnsupportedOperationException("unsupported reader"))
   }
@@ -179,3 +180,30 @@ final class JsonReader extends DataReader {
     data
   }
 }
+final class HiveReader extends DataReader {
+
+  override val tpe: ReaderType = ReaderType.hive
+  override def read(spark: SparkSession, configs: Configs, partitionNum: Int): DataFrame = {
+    val readConfig = configs.hiveConfigEntry.hiveReadConfigEntry
+    val sql = readConfig.sql
+    val srcIdCol = readConfig.srcIdCol
+    val dstIdCol = readConfig.dstIdCol
+    val weightCol = readConfig.weightCol
+
+    var data = spark.sql(sql)
+
+    if (srcIdCol != null && dstIdCol != null && srcIdCol.trim.nonEmpty && dstIdCol.trim.nonEmpty) {
+      if (configs.dataSourceSinkEntry.hasWeight && weightCol != null && weightCol.trim.nonEmpty) {
+        data = data.select(srcIdCol, dstIdCol, weightCol)
+      } else {
+        data = data.select(srcIdCol, dstIdCol)
+      }
+    }
+
+    if (partitionNum != 0) {
+      data.repartition(partitionNum)
+    }
+
+    data
+  }
+}
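
HiveReader follows the same contract as the other readers: run the configured SQL, optionally narrow the result to the source/destination/weight columns, and return the DataFrame to the caller. A usage sketch, assuming configs was parsed with data.source = hive, the hive.read block shown earlier, and a Hive-enabled spark session in scope:

    // The normal path goes through DataReader's factory above, keyed on data.source;
    // calling the reader directly just makes the data flow explicit.
    val reader = new HiveReader()
    val edges = reader.read(spark, configs, partitionNum = 10)
    edges.printSchema()   // column_1, column_2, and column_3 when hasWeight is true

One thing to keep in mind when adapting this: DataFrame.repartition returns a new DataFrame rather than repartitioning in place, so the result needs to be reassigned for the requested partitionNum to take effect.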

nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/reader/ReaderType.scala

+4 −1

@@ -17,17 +17,20 @@ sealed trait ReaderType {
     case ReaderType.nebulaNgql => "nebula-ngql"
     case ReaderType.nebula => "nebula"
     case ReaderType.csv => "csv"
+    case ReaderType.hive => "hive"
   }
 }
 object ReaderType {
   lazy val mapping: Map[String, ReaderType] = Map(
     json.stringify -> json,
     nebulaNgql.stringify -> nebulaNgql,
     nebula.stringify -> nebula,
-    csv.stringify -> csv
+    csv.stringify -> csv,
+    hive.stringify -> hive
   )
   object json extends ReaderType
   object nebulaNgql extends ReaderType
   object nebula extends ReaderType
   object csv extends ReaderType
+  object hive extends ReaderType
 }
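
ReaderType.mapping is the lookup that DataReader's factory keys on, so a data.source value of "hive" in application.conf now resolves to the new reader type. A two-line round-trip check:

    val tpe = ReaderType.mapping("hive")   // ReaderType.hive
    assert(tpe.stringify == "hive")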
