
Commit 97c67f2

Merge pull request #36 from wyy566/Spark2.4.6
update ml algorithm lib
2 parents 38369bc + 0d77db2 commit 97c67f2

File tree

30 files changed: +938 -50 lines changed


README.md (+5 -5)
@@ -7,7 +7,7 @@ Introduction
 
 The machine learning algorithm library running on Kunpeng processors is an acceleration library that provides a rich set of high-level tools for machine learning algorithms. It is based on the original APIs of Apache [Spark 2.4.6](https://github.com/apache/spark/tree/v2.4.6), [breeze 0.13.1](https://github.com/scalanlp/breeze/tree/releases/v0.13.1) and [xgboost 1.1.0](https://github.com/dmlc/xgboost/tree/release_1.0.0). The acceleration library greatly improves the computing power in big data scenarios.
 
-The library provides eighteen machine learning algorithms: support vector machine (SVM), random forest classifier (RFC), gradient boosting decision tree (GBDT), decision tree (DT), K-means clustering, linear regression, logistic regression algorithm, principal component analysis (PCA), singular value decomposition (SVD), latent Dirichlet allocation (LDA), prefix-projected pattern growth (Prefix-Span), alternating least squares (ALS), K-nearest neighbors (KNN), Covariance, Density-based spatial clustering of applications with noise (DBSCAN), Pearson, Spearman, and XGboost. You can find the latest documentation on the project web page. This README file contains only basic setup instructions.
+The library provides 21 machine learning algorithms: support vector machine (SVM), random forest classifier (RFC), gradient boosting decision tree (GBDT), decision tree (DT), K-means clustering, linear regression, logistic regression algorithm, principal component analysis (PCA), principal component analysis for sparse matrices (SPCA), singular value decomposition (SVD), latent Dirichlet allocation (LDA), prefix-projected pattern growth (Prefix-Span), alternating least squares (ALS), K-nearest neighbors (KNN), Covariance, Density-based spatial clustering of applications with noise (DBSCAN), Pearson, Spearman, XGboost, inverse document frequency (IDF), and SimRank. You can find the latest documentation on the project web page. This README file contains only basic setup instructions.
 
 You can find the latest documentation, including a programming guide, on the project web page. This README file only contains basic setup instructions.

@@ -25,13 +25,13 @@ Building and Packaging
 
 mvn clean package
 
-(3) Obtain "boostkit-ml-core_2.11-1.3.0-spark2.4.6.jar" under the "Spark-ml-algo-lib/ml-core/target" directory.
+(3) Obtain "boostkit-ml-core_2.11-2.1.0-spark2.4.6.jar" under the "Spark-ml-algo-lib/ml-core/target" directory.
 
-Obtain "boostkit-ml-acc_2.11-1.3.0-spark2.4.6.jar" under the "Spark-ml-algo-lib/ml-accelerator/target" directory.
+Obtain "boostkit-ml-acc_2.11-2.1.0-spark2.4.6.jar" under the "Spark-ml-algo-lib/ml-accelerator/target" directory.
 
-Obtain "boostkit-xgboost4j_2.11-1.3.0.jar" under the "Spark-ml-algo-lib/ml-xgboost/jvm-packages/boostkit-xgboost4j/target" directory.
+Obtain "boostkit-xgboost4j_2.11-2.1.0.jar" under the "Spark-ml-algo-lib/ml-xgboost/jvm-packages/boostkit-xgboost4j/target" directory.
 
-Obtain "boostkit-xgboost4j-spark2.4.6_2.11-1.3.0.jar" under the "Spark-ml-algo-lib/ml-xgboost/jvm-packages/boostkit-xgboost4j-spark/target" directory.
+Obtain "boostkit-xgboost4j-spark2.4.6_2.11-2.1.0.jar" under the "Spark-ml-algo-lib/ml-xgboost/jvm-packages/boostkit-xgboost4j-spark/target" directory.
 
 Contribution Guidelines
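For orientation, here is a hypothetical sketch of how the four renamed 2.1.0 jars built by `mvn clean package` could be handed to a Spark 2.4.6 application; the local paths and application name below are illustrative assumptions, not part of this commit.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative paths: copy the jars produced by the build to a location
// visible to the driver, then list them in spark.jars so Spark ships them
// to the executors as well.
val boostKitJars = Seq(
  "/opt/boostkit/boostkit-ml-core_2.11-2.1.0-spark2.4.6.jar",
  "/opt/boostkit/boostkit-ml-acc_2.11-2.1.0-spark2.4.6.jar",
  "/opt/boostkit/boostkit-xgboost4j_2.11-2.1.0.jar",
  "/opt/boostkit/boostkit-xgboost4j-spark2.4.6_2.11-2.1.0.jar"
).mkString(",")

val spark = SparkSession.builder()
  .appName("BoostKitMLExample")
  .config("spark.jars", boostKitJars)
  .getOrCreate()
```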

ml-accelerator/pom.xml (+2 -2)
@@ -2,12 +2,12 @@
   <parent>
     <groupId>org.apache.spark</groupId>
     <artifactId>boostkit-ml</artifactId>
-    <version>1.3.0</version>
+    <version>2.1.0</version>
   </parent>
 
   <modelVersion>4.0.0</modelVersion>
   <artifactId>boostkit-ml-acc_2.11</artifactId>
-  <version>1.3.0</version>
+  <version>2.1.0</version>
   <name>${project.artifactId}</name>
   <description>Spark ml algo accelerator</description>

New file (+195 lines) defining org.apache.spark.ml.feature.IDF and IDFModel

@@ -0,0 +1,195 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.ml.feature

import org.apache.hadoop.fs.Path

import org.apache.spark.annotation.Since
import org.apache.spark.ml._
import org.apache.spark.ml.linalg.{Vector, VectorUDT}
import org.apache.spark.ml.param._
import org.apache.spark.ml.param.shared._
import org.apache.spark.ml.util._
import org.apache.spark.mllib.feature
import org.apache.spark.mllib.linalg.{Vector => OldVector, Vectors => OldVectors}
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.StructType

/**
 * Params for [[IDF]] and [[IDFModel]].
 */
private[feature] trait IDFBase extends Params with HasInputCol with HasOutputCol {

  /**
   * The minimum number of documents in which a term should appear.
   * Default: 0
   * @group param
   */
  final val minDocFreq = new IntParam(
    this, "minDocFreq", "minimum number of documents in which a term should appear for filtering" +
    " (>= 0)", ParamValidators.gtEq(0))

  setDefault(minDocFreq -> 0)

  /** @group getParam */
  def getMinDocFreq: Int = $(minDocFreq)

  /**
   * Validate and transform the input schema.
   */
  protected def validateAndTransformSchema(schema: StructType): StructType = {
    SchemaUtils.checkColumnType(schema, $(inputCol), new VectorUDT)
    SchemaUtils.appendColumn(schema, $(outputCol), new VectorUDT)
  }
}

/**
 * Compute the Inverse Document Frequency (IDF) given a collection of documents.
 */
@Since("1.4.0")
final class IDF @Since("1.4.0") (@Since("1.4.0") override val uid: String)
  extends Estimator[IDFModel] with IDFBase with DefaultParamsWritable {

  @Since("1.4.0")
  def this() = this(Identifiable.randomUID("idf"))

  /** @group setParam */
  @Since("1.4.0")
  def setInputCol(value: String): this.type = set(inputCol, value)

  /** @group setParam */
  @Since("1.4.0")
  def setOutputCol(value: String): this.type = set(outputCol, value)

  /** @group setParam */
  @Since("1.4.0")
  def setMinDocFreq(value: Int): this.type = set(minDocFreq, value)

  @Since("2.0.0")
  override def fit(dataset: Dataset[_]): IDFModel = {
    transformSchema(dataset.schema, logging = true)
    val input: RDD[OldVector] = dataset.select($(inputCol)).rdd.map {
      case Row(v: Vector) => OldVectors.fromML(v)
    }
    val idf = new feature.IDF($(minDocFreq)).fit(input)
    copyValues(new IDFModel(uid, idf).setParent(this))
  }

  @Since("1.4.0")
  override def transformSchema(schema: StructType): StructType = {
    validateAndTransformSchema(schema)
  }

  @Since("1.4.1")
  override def copy(extra: ParamMap): IDF = defaultCopy(extra)
}

@Since("1.6.0")
object IDF extends DefaultParamsReadable[IDF] {

  @Since("1.6.0")
  override def load(path: String): IDF = super.load(path)
}

/**
 * Model fitted by [[IDF]].
 */
@Since("1.4.0")
class IDFModel private[ml] (
    @Since("1.4.0") override val uid: String,
    idfModel: feature.IDFModel)
  extends Model[IDFModel] with IDFBase with MLWritable {

  import IDFModel._

  /** @group setParam */
  @Since("1.4.0")
  def setInputCol(value: String): this.type = set(inputCol, value)

  /** @group setParam */
  @Since("1.4.0")
  def setOutputCol(value: String): this.type = set(outputCol, value)

  @Since("2.0.0")
  override def transform(dataset: Dataset[_]): DataFrame = {
    transformSchema(dataset.schema, logging = true)
    // TODO: Make the idfModel.transform natively in ml framework to avoid extra conversion.
    val idf = udf { vec: Vector => idfModel.transform(OldVectors.fromML(vec)).asML }
    dataset.withColumn($(outputCol), idf(col($(inputCol))))
  }

  @Since("1.4.0")
  override def transformSchema(schema: StructType): StructType = {
    validateAndTransformSchema(schema)
  }

  @Since("1.4.1")
  override def copy(extra: ParamMap): IDFModel = {
    val copied = new IDFModel(uid, idfModel)
    copyValues(copied, extra).setParent(parent)
  }

  /** Returns the IDF vector. */
  @Since("2.0.0")
  def idf: Vector = idfModel.idf.asML

  @Since("1.6.0")
  override def write: MLWriter = new IDFModelWriter(this)
}

@Since("1.6.0")
object IDFModel extends MLReadable[IDFModel] {

  private[IDFModel] class IDFModelWriter(instance: IDFModel) extends MLWriter {

    private case class Data(idf: Vector)

    override protected def saveImpl(path: String): Unit = {
      DefaultParamsWriter.saveMetadata(instance, path, sc)
      val data = Data(instance.idf)
      val dataPath = new Path(path, "data").toString
      sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
    }
  }

  private class IDFModelReader extends MLReader[IDFModel] {

    private val className = classOf[IDFModel].getName

    override def load(path: String): IDFModel = {
      val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
      val dataPath = new Path(path, "data").toString
      val data = sparkSession.read.parquet(dataPath)
      val Row(idf: Vector) = MLUtils.convertVectorColumnsToML(data, "idf")
        .select("idf")
        .head()
      val model = new IDFModel(metadata.uid, new feature.IDFModel(OldVectors.fromML(idf)))
      DefaultParamsReader.getAndSetParams(model, metadata)
      model
    }
  }

  @Since("1.6.0")
  override def read: MLReader[IDFModel] = new IDFModelReader

  @Since("1.6.0")
  override def load(path: String): IDFModel = super.load(path)
}
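For orientation, a minimal usage sketch of the IDF estimator and model defined above, wired into the standard Spark ML feature pipeline; the sample data, column names, and save path are illustrative assumptions, not part of the commit.

```scala
import org.apache.spark.ml.feature.{HashingTF, IDF, IDFModel, Tokenizer}
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("IDFUsageSketch").getOrCreate()

// Toy corpus: three short "documents".
val sentences = spark.createDataFrame(Seq(
  (0L, "spark on kunpeng runs machine learning workloads"),
  (1L, "inverse document frequency boosts the weight of rare terms"),
  (2L, "spark ml pipelines chain feature transformers together")
)).toDF("id", "sentence")

// Tokenize and hash raw term frequencies, then fit IDF and rescale them.
val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
val hashingTF = new HashingTF()
  .setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(1 << 10)
val tf = hashingTF.transform(tokenizer.transform(sentences))

val idfModel = new IDF()
  .setInputCol("rawFeatures")
  .setOutputCol("features")
  .setMinDocFreq(1)   // keep only terms that appear in at least one document
  .fit(tf)

idfModel.transform(tf).select("id", "features").show(truncate = false)

// Round-trips through the MLWritable/MLReadable implementations shown above.
idfModel.write.overwrite().save("/tmp/idf-model")
val reloaded = IDFModel.load("/tmp/idf-model")
```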

ml-accelerator/src/main/scala/org/apache/spark/mllib/clustering/KMACCm.scala (+17 -3)
@@ -26,7 +26,7 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.mllib.linalg.BLAS.{axpy, scal}
 import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.rdd.RDD
-
+import org.apache.spark.sql.SparkSession
 
 object KMACCm {
   val DEFAULT_SAMPLE_RATE = 0.05
@@ -85,9 +85,23 @@ object KMACCm {
       throw new Exception("'spark.boostkit.Kmeans.sampleRate' value is invalid")
     }
 
+    val DEFAULT_PAR_LEVEL = 100
+    var customParLevel = DEFAULT_PAR_LEVEL
+    try {
+      customParLevel = SparkSession.builder().getOrCreate()
+        .sparkContext.getConf.getInt("spark.boostkit.Kmeans.parLevel",
+        DEFAULT_PAR_LEVEL)
+      if (customParLevel < 1) {
+        throw new Exception
+      }
+    }
+    catch {
+      case x: Exception =>
+        throw new Exception("'spark.boostkit.Kmeans.parLevel' value is invalid")
+    }
+
     while (iteration < maxIterations && !converged) {
-      val s = Array.fill(cl * cl)(0.0)
-      KmeansUtil.generateDisMatrix(centers, s)
+      val s = KmeansUtil.generateDisMatrix(centers, customParLevel)
       val bcCenters = sc.broadcast(centers)
       val bcs = sc.broadcast(s)
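The new block reads "spark.boostkit.Kmeans.parLevel" from the active Spark configuration and rejects values below 1, mirroring the existing "spark.boostkit.Kmeans.sampleRate" validation. Below is a minimal sketch of how a caller might set both properties when building the session; the values shown are only illustrative (100 and 0.05 are the defaults taken from this file).

```scala
import org.apache.spark.sql.SparkSession

// Property names come from KMACCm.scala above; values are examples only.
val spark = SparkSession.builder()
  .appName("BoostKitKMeans")
  .config("spark.boostkit.Kmeans.sampleRate", "0.05") // must parse as a valid sample rate
  .config("spark.boostkit.Kmeans.parLevel", "100")    // must be >= 1, otherwise an exception is thrown
  .getOrCreate()
```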

ml-accelerator/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala (+1 -3)
@@ -418,7 +418,7 @@ class KMeans private(
       bcCenters.destroy(blocking = false)
 
       val myWeights = distinctCenters.indices.map(countMap.getOrElse(_, 0L).toDouble).toArray
-      LocalKMeans.kMeansPlusPlus(0, distinctCenters.toArray, myWeights, k, 30)
+      LocalKMeansX.kMeansPlusPlus(0, distinctCenters.toArray, myWeights, k, 30)
     }
   }
 }
@@ -614,5 +614,3 @@ object KMeans {
     }
   }
 }
-
-
