大佬教程收集整理的这篇文章主要介绍了Kaggle电信客户流失预测——基于GBDT融合LR,大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。
package com.fiveonevv.app.Model

import java.io.{FileInputStream, IOException, ObjectInputStream}

import org.apache.spark.mllib.linalg.{DenseVector, Vector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.{BoostingStrategy, FeatureType}
import org.apache.spark.mllib.tree.model.{GradientBoostedTreesModel, Node}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
 * GBDT-based feature preprocessor for the GBDT+LR scheme: every sample is
 * re-encoded as a one-hot vector over the leaf it reaches in each boosted tree.
 */
class GBDTPreprocessor extends Serializable {

  /**
   * Recursively collect the ids of all leaf nodes under `node`.
   *
   * @param node root of a (sub)tree
   * @return ids of the leaf nodes, collected left-to-right
   */
  def getLeafNodes(node: Node): Array[Int] = {
    var treeLeafNodes = new Array[Int](0)
    if (node.isLeaf) {
      treeLeafNodes = treeLeafNodes :+ node.id
    } else {
      // Internal nodes of an mllib decision tree always carry both children,
      // so .get on leftNode/rightNode is safe here.
      treeLeafNodes = treeLeafNodes ++ getLeafNodes(node.leftNode.get)
      treeLeafNodes = treeLeafNodes ++ getLeafNodes(node.rightNode.get)
    }
    treeLeafNodes
  }

  /**
   * Walk a single tree and return the id of the leaf the sample lands in.
   *
   * @param node     current tree node
   * @param features feature vector of the sample
   * @return id of the leaf node reached by the sample
   */
  def preDictModify(node: Node, features: Vector): Int = {
    val split = node.split
    if (node.isLeaf) {
      node.id
    } else {
      // Continuous splits compare against a threshold; categorical splits
      // test membership of the feature value in the split's category set.
      if (split.get.featureType == FeatureType.Continuous) {
        if (features(split.get.feature) <= split.get.threshold) {
          preDictModify(node.leftNode.get, features)
        } else {
          preDictModify(node.rightNode.get, features)
        }
      } else {
        if (split.get.categories.contains(features(split.get.feature))) {
          preDictModify(node.leftNode.get, features)
        } else {
          preDictModify(node.rightNode.get, features)
        }
      }
    }
  }

  /**
   * Train a GBDT classification model and collect each tree's leaf-node ids.
   *
   * @param gbtTrainData training samples
   * @param numTrees     number of boosting iterations (trees)
   * @return the trained model and, per tree, the array of its leaf node ids
   */
  def gbtTrain(gbtTrainData: RDD[LabeledPoint], numTrees: Int): (GradientBoostedTreesModel, Array[Array[Int]]) = {
    val boostingStrategy = BoostingStrategy.defaultParams("Classification")
    boostingStrategy.setNumIterations(numTrees)
    val gbdtModel = GradientBoostedTrees.train(gbtTrainData, boostingStrategy)
    val treeLeafArray = new Array[Array[Int]](numTrees)
    for (i <- 0 until numTrees) {
      // Leaf ids of every tree; used later to one-hot encode leaf positions.
      treeLeafArray(i) = getLeafNodes(gbdtModel.trees(i).topNode)
    }
    (gbdtModel, treeLeafArray)
  }

  /**
   * Encode samples through the trained GBDT: for each tree, a one-hot block
   * marking the leaf the sample falls into; blocks are concatenated.
   *
   * @param gbtTestData   samples to encode, keyed by id, with (label, features)
   * @param gbtModel      trained GBT model
   * @param treeLeafArray per-tree leaf-node ids (from gbtTrain)
   * @param numTrees      number of trees in the model
   * @return (id, LabeledPoint(label, gbdt-encoded features))
   */
  def gbtFeaturePreDict(gbtTestData: RDD[(String, (Double, DenseVector))], gbtModel: GradientBoostedTreesModel, treeLeafArray: Array[Array[Int]], numTrees: Int): RDD[(String, LabeledPoint)] = {
    val newFeature = gbtTestData.map(line => {
      var gbtFeatures = new Array[Double](0)
      for (i <- 0 until numTrees) {
        val treePreDict = preDictModify(gbtModel.trees(i).topNode, line._2._2)
        // Leaf count of a full binary tree with numNodes nodes.
        val leafArray = new Array[Double]((gbtModel.trees(i).numNodes + 1) / 2)
        // Set a 1 at the position of the leaf this sample fell into.
        leafArray(treeLeafArray(i).indexOf(treePreDict)) = 1
        gbtFeatures = gbtFeatures ++ leafArray
      }
      (line._1, line._2._1, gbtFeatures) // id, label, gbtFeatures
    })
    newFeature.map(x => (x._1, LabeledPoint(x._2, Vectors.dense(x._3))))
  }

  /**
   * Build GBDT-encoded features for labeled data, optionally appending the
   * original features to the new one-hot leaf encoding.
   *
   * @param data     labeled samples
   * @param model    trained GBT model
   * @param spark    Spark session (kept for interface compatibility; unused)
   * @param isAppend when true, concatenate the original features after the encoding
   * @return encoded samples wrapped in an Option
   */
  def getNodeListWithGBDT(data: RDD[LabeledPoint], model: GradientBoostedTreesModel, spark: SparkSession, isAppend: Boolean): Option[RDD[LabeledPoint]] = {
    val numTrees = model.numTrees
    // Leaf-node ids of every tree.
    val treeLeafArray = new Array[Array[Int]](numTrees)
    for (i <- 0 until numTrees) {
      treeLeafArray(i) = getLeafNodes(model.trees(i).topNode)
    }
    // Build the new feature vectors.
    val newData: RDD[LabeledPoint] = data.map(line => {
      var newFeatures = new Array[Double](0)
      for (i <- 0 until numTrees) {
        // Leaf id the sample reaches in tree i.
        val treePreDict = preDictModify(model.trees(i).topNode, line.features)
        val treeArray = new Array[Double]((model.trees(i).numNodes + 1) / 2)
        treeArray(treeLeafArray(i).indexOf(treePreDict)) = 1
        newFeatures = newFeatures ++ treeArray
      }
      if (isAppend) {
        new LabeledPoint(line.label, Vectors.dense(newFeatures ++ line.features.toArray))
      } else {
        new LabeledPoint(line.label, Vectors.dense(newFeatures))
      }
    })
    Option(newData)
  }

  /**
   * Deserialize a GradientBoostedTreesModel from a local file.
   *
   * @param path path of the serialized model
   * @return Some(model) on success, None on read/class-resolution failure
   */
  def loadModel(path: String): Option[GradientBoostedTreesModel] = {
    try {
      val in = new ObjectInputStream(new FileInputStream(path))
      try {
        Option(in.readObject().asInstanceOf[GradientBoostedTreesModel])
      } finally {
        in.close() // close even when readObject throws
      }
    } catch {
      case ex: ClassNotFoundException =>
        ex.printStackTrace()
        None
      case ex: IOException =>
        ex.printStackTrace()
        None
    }
  }
}
package com.fiveonevv.app.Model

import org.apache.spark.ml.classification.{LogisticRegression, LogisticRegressionModel}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature._
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.mllib.evaluation.MulticlassMetrics
import org.apache.spark.mllib.linalg.DenseVector
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}

/**
 * End-to-end processing for the GBDT+LR churn model: raw-data preparation,
 * GBDT-based discretization of continuous features, pipeline training
 * (scaling + chi-square selection + logistic regression) and evaluation.
 */
class GBDTLRModelProcess {

  /**
   * Parse lines read from a local text file into model-ready records.
   * Expected line layout (tab separated): userInfo, '#'-separated nonFeatures,
   * '#'-separated features, label.
   *
   * @param rdd raw text lines
   * @return (userInfo, LabeledPoint for GBT, LabeledPoint of raw numerics,
   *         (label, numeric vector) for discretization)
   */
  def localDataProcess(rdd: RDD[String]): RDD[(String, LabeledPoint, LabeledPoint, (Double, DenseVector))] = {
    rdd.map { line =>
      // NOTE(review): the scraped source split on the literal "t"; a tab
      // separator is assumed here -- confirm against the input file format.
      val arr = line.toString.split("\t")
      val userInfo = arr(0)
      val nonFeatures = arr(1).split("#").map(_.toDouble)
      val features = arr(2).split("#").map(_.toDouble)
      val label = arr(3).toDouble
      // Dense vectors: LabeledPoint form for the GBT model, the trailing pair
      // feeds feature discretization.
      (userInfo,
        LabeledPoint(label, new DenseVector(features)),
        LabeledPoint(label, new DenseVector(nonFeatures)),
        (label, new DenseVector(nonFeatures)))
    }
  }

  /**
   * Convert records read from Hive (as RDD tuples) into model-ready records.
   *
   * @param rdd (userInfo, numeric features, categorical features, label string)
   * @return (userInfo, LabeledPoint(categorical), LabeledPoint(numeric),
   *         (label, numeric vector) for discretization)
   */
  def hiveDataProcess(rdd: RDD[(String, Array[Double], Array[Double], String)]): RDD[(String, LabeledPoint, LabeledPoint,
    (Double, DenseVector))] = {
    rdd.map { line =>
      val userInfo = line._1
      val numFeatures = line._2  // numeric features
      val cateFeatures = line._3 // categorical features
      val label = line._4.toDouble
      // Dense vectors: LabeledPoint form for the GBT model, the trailing pair
      // feeds feature discretization.
      (userInfo,
        LabeledPoint(label, new DenseVector(cateFeatures)),
        LabeledPoint(label, new DenseVector(numFeatures)),
        (label, new DenseVector(numFeatures)))
    }
  }

  /**
   * Discretize continuous features with a GBDT and join the resulting leaf
   * encoding with the original discrete features.
   *
   * @param Train training records
   * @param test  test records
   * @param spark active Spark session, used to build the result DataFrames
   * @return (train DataFrame, test DataFrame) with columns
   *         userInfo / label / feature1 (discrete) / feature2 (gbdt-encoded)
   */
  def gbtFeatureProcess(Train: RDD[(String, LabeledPoint, LabeledPoint, (Double, DenseVector))],
                        test: RDD[(String, LabeledPoint, LabeledPoint, (Double, DenseVector))],
                        spark: SparkSession): (DataFrame, DataFrame) = {
    // Already-discrete features keyed by (userInfo, label).
    val trainKeyed = Train.map(x => (x._1, x._2)).map(x => ((x._1, x._2.label), x._2.features.asML))
    val testKeyed = test.map(x => (x._1, x._2)).map(x => ((x._1, x._2.label), x._2.features.asML))
    // Continuous features: labeled points for fitting, keyed pairs for encoding.
    val gbtTrainLP = Train.map(x => x._3)
    val gbtTrainData = Train.map(x => (x._1, x._4))
    val gbtTestData = test.map(x => (x._1, x._4))
    val gbdtPreprocessor = new GBDTPreprocessor
    val numTrees = 10
    // treeLeafArray holds the leaf-node ids of every tree.
    val (gbtModel, treeLeafArray) = gbdtPreprocessor.gbtTrain(gbtTrainLP, numTrees)
    val gbtTrainRDD = gbdtPreprocessor.gbtFeaturePreDict(gbtTrainData, gbtModel, treeLeafArray, numTrees)
      .map(x => ((x._1, x._2.label), x._2.features.asML))
    val allTrainRDD = trainKeyed.join(gbtTrainRDD)
    val trainDF = spark.createDataFrame(allTrainRDD.map(x => (x._1._1, x._1._2, x._2._1, x._2._2)))
      .toDF("userInfo", "label", "feature1", "feature2")
    val gbtTestRDD = gbdtPreprocessor.gbtFeaturePreDict(gbtTestData, gbtModel, treeLeafArray, numTrees)
      .map(x => ((x._1, x._2.label), x._2.features.asML))
    val allTestRDD = testKeyed.join(gbtTestRDD)
    val testDF = spark.createDataFrame(allTestRDD.map(x => (x._1._1, x._1._2, x._2._1, x._2._2)))
      .toDF("userInfo", "label", "feature1", "feature2")
    (trainDF, testDF)
  }

  /**
   * Build and fit the training pipeline: min-max scaling, chi-square feature
   * selection and logistic regression, tuned by grid search + cross validation.
   *
   * @param data training DataFrame with "features" and "label" columns
   * @return the best PipelineModel found by cross validation
   */
  def pipelineTrain(data: DataFrame): PipelineModel = {
    data.persist()
    val featureScaler = new MinMaxScaler()
      .setInputCol("features")
      .setOutputCol("scaledFeatures")
    val featureSelector = new ChiSqSelector()
      .setFeaturesCol("scaledFeatures")
      .setLabelCol("label")
      .setNumTopFeatures(80) // overridden by the parameter grid below
      .setOutputCol("selectedFeatures")
    val lr = new LogisticRegression()
      .setMaxIter(200)
      .setElasticNetParam(1.0)
      .setRegParam(0.001)
      .setThreshold(0.5)
      .setLabelCol("label")
      .setFeaturesCol("selectedFeatures")
    // Assemble the pipeline.
    val pipeline = new Pipeline()
      .setStages(Array(featureScaler, featureSelector, lr))
    // Grid search: feature count, iterations, elastic-net mixing, regularization.
    val paramGrid = new ParamGridBuilder()
      .addGrid(featureSelector.numTopFeatures, Array(70))
      .addGrid(lr.maxIter, Array(100))
      .addGrid(lr.elasticNetParam, Array(1.0, 0.0))
      .addGrid(lr.regParam, Array(0.00075))
      .build()
    // 5-fold cross validation over the grid.
    val cv = new CrossValidator()
      .setEstimator(pipeline)
      .setEvaluator(new BinaryClassificationEvaluator())
      .setEstimatorParamMaps(paramGrid)
      .setNumFolds(5)
    val cvModel = cv.fit(data)
    val pipelineModel = cvModel.bestModel.asInstanceOf[PipelineModel]
    data.unpersist()
    pipelineModel
  }

  /**
   * Expose the pipeline's intermediate results on new data.
   *
   * @param data          input DataFrame
   * @param pipelineModel fitted pipeline (scaler, selector, LR -- in that order)
   * @return (scaled data, feature-selected data, LR predictions)
   */
  def pipelinePreDict(data: DataFrame, pipelineModel: PipelineModel): (DataFrame, DataFrame, DataFrame) = {
    data.persist()
    val featureScaleModel = pipelineModel.stages(0).asInstanceOf[MinMaxScalerModel]
    val chiSqSelectorModel = pipelineModel.stages(1).asInstanceOf[ChiSqSelectorModel]
    val lrModel = pipelineModel.stages(2).asInstanceOf[LogisticRegressionModel]
    // NOTE: println with two arguments relies on Scala auto-tupling and prints a tuple.
    println("特征选择个数:", chiSqSelectorModel.explainParam(chiSqSelectorModel.numTopFeatures))
    println("LR迭代次数:", lrModel.explainParam(lrModel.maxIter))
    println("LR正则化系数:", lrModel.explainParam(lrModel.regParam))
    println("LR分类阈值:", lrModel.explainParam(lrModel.threshold))
    println("L1L2正则比例:", lrModel.explainParam(lrModel.elasticNetParam))
    println("LR特征个数:", lrModel.numFeatures)
    val scaledData = featureScaleModel.transform(data)          // scaling
    val selectedData = chiSqSelectorModel.transform(scaledData) // feature selection
    val predictions = lrModel.transform(selectedData)           // LR prediction
    data.unpersist()
    (scaledData, selectedData, predictions)
  }

  /**
   * Merge the "feature1" and "feature2" vector columns into a single
   * "features" column.
   *
   * @param data DataFrame containing feature1 and feature2
   * @return DataFrame with the assembled "features" column appended
   */
  def featureAssembler(data: DataFrame): DataFrame = {
    val assembler = new VectorAssembler()
      .setInputCols(Array("feature1", "feature2"))
      .setOutputCol("features")
    assembler.transform(data)
  }

  /**
   * Compute multi-class evaluation metrics from (prediction, label) pairs.
   *
   * @param data RDD of (prediction, label)
   * @return (accuracy, weighted precision, weighted recall, weighted F1)
   */
  def multiClassEvaluate(data: RDD[(Double, Double)]): (Double, Double, Double, Double) = {
    val metrics = new MulticlassMetrics(data)
    val accuracy = metrics.accuracy
    val weightedPrecision = metrics.weightedPrecision
    val weightedRecall = metrics.weightedRecall
    val f1 = metrics.weightedFMeasure
    (accuracy, weightedPrecision, weightedRecall, f1)
  }
}
package com.fiveonevv.app.core

import com.fiveonevv.app.Model.GBDTLRModelProcess
import com.fiveonevv.app.util.SparkSqlUtil
import org.apache.log4j.{Level, Logger}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{StringIndexer, VectorAssembler}
import org.apache.spark.ml.{Pipeline, PipelineStage}
import org.apache.spark.sql.functions._

import scala.collection.mutable.ListBuffer

/**
 * Entry point: trains the GBDT+LR churn model on the telco dataset read from
 * Hive and prints evaluation metrics on a held-out test set.
 */
object GBDTLrTrain {
  Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
  Logger.getLogger("org.apache.eclipse.jetty.server").setLevel(Level.OFF)

  def main(args: Array[String]): Unit = {
    val spark = SparkSqlUtil.initSparkSession(SparkSqlUtil.initSparkBuilder(), "GBDTLRTrainDemo")
    // Read the raw data from Hive.
    // NOTE(review): the table and fill-column names were garbled in the scraped
    // source; "tmp.telco_churn" / "TotalCharges" are reconstructions -- confirm
    // against the actual Hive schema.
    val rawDF = spark
      .sql("""SELECT * FROM tmp.telco_churn""")
      .na.fill(0.0, Seq("TotalCharges"))
    // Categorical and numeric column names.
    val cateCols = Array("gender", "partner", "dependents", "phone_service", "multiple_lines", "internet_service",
      "online_security", "online_backup", "device_protection", "tech_support", "streaming_tv", "streaming_movies",
      "paperless_billing", "payment_method")
    val numCols = Array("senior_citizen", "tenure", "monthly_charges", "total_charges")
    // Build a string index for every categorical column.
    val indexer = cateCols.map(colName => new StringIndexer().setInputCol(colName).setOutputCol(s"${colName}Index"))
    //val encoder = new OneHotEncoderEstimator().setInputCols(indexCols).setOutputCols(cateCols map (name => s"${name}Vec"))
    // Assemble categorical features into one vector column.
    val cateAssembler = new VectorAssembler().setInputCols(cateCols.map(_ + "Index")).setOutputCol("cateFeatures")
    // Assemble numeric features into one vector column.
    val numAssembler = new VectorAssembler().setInputCols(numCols).setOutputCol("numFeatures").setHandleInvalid("skip")
    val stagesArray = new ListBuffer[PipelineStage]()
    stagesArray ++= indexer
    stagesArray.append(cateAssembler, numAssembler)
    val dataPrePipeline = new Pipeline().setStages(stagesArray.toArray)
    // The pipeline output mixes sparse and dense vectors; normalize everything to dense.
    val toDense = udf((v: org.apache.spark.ml.linalg.Vector) => v.toDense)
    val processedRDD = dataPrePipeline.fit(rawDF).transform(rawDF)
      .selectExpr("customerid", "numFeatures", "cateFeatures", "case when churn = 'Yes' then 1.0 else 0.0 end as label")
      .withColumn("cateDenseFeatures", toDense(col("cateFeatures")))
      .selectExpr("customerid", "numFeatures", "cateDenseFeatures cateFeatures", "label")
      .rdd.map(x => (
        x(0).toString,
        // ml vectors cannot be cast to mllib vectors directly; go through Array first.
        x(1).asInstanceOf[org.apache.spark.ml.linalg.Vector].toArray,
        x(2).asInstanceOf[org.apache.spark.ml.linalg.DenseVector].toArray,
        x(3).toString))
    // 70/30 train/test split with a fixed seed for reproducibility.
    val Array(trainRDD, testRDD) = processedRDD.randomSplit(Array(0.7, 0.3), 1234)
    val modelProcess = new GBDTLRModelProcess
    val denseVectorTrainRDD = modelProcess.hiveDataProcess(trainRDD)
    val denseVectorTestRDD = modelProcess.hiveDataProcess(testRDD)
    // GBT training: discretize continuous features and merge with the original discrete ones.
    val (gbtFeatureTrainDF, gbtFeatureTestDF) = modelProcess.gbtFeatureProcess(denseVectorTrainRDD, denseVectorTestRDD, spark)
    val unionTrainDF = modelProcess.featureAssembler(gbtFeatureTrainDF) // merge gbt-encoded and original features
    val unionTestDF = modelProcess.featureAssembler(gbtFeatureTestDF)
    // Positive-class up-sampling hook (currently disabled: factor 1).
    val positiveDF = unionTrainDF.filter("label=1")
    val negativeDF = unionTrainDF.filter("label=0")
    val upPositiveDF = positiveDF //.union(positiveDF).union(positiveDF)
    val upSampleDF = negativeDF.union(upPositiveDF)
    // Pipeline training and prediction.
    val pipelineModel = modelProcess.pipelineTrain(upSampleDF)
    val (scaledDF, selectedDF, predictions) = modelProcess.pipelinePreDict(unionTestDF, pipelineModel)
    // Inspect predictions. The id column produced upstream is "userInfo"
    // (see GBDTLRModelProcess.gbtFeatureProcess); "rawPrediction"/"prediction"
    // are the standard Spark LR output columns.
    predictions.select("userInfo", "label", "rawPrediction", "probability", "prediction").show(50)
    val evaluator = new BinaryClassificationEvaluator().setLabelCol("label")
    val areaUnderROC = evaluator.setMetricName("areaUnderROC").evaluate(predictions)
    val areaUnderPR = evaluator.setMetricName("areaUnderPR").evaluate(predictions)
    // Confusion-matrix style breakdown on the test set.
    val lp = predictions.select("label", "prediction")
    val countTotal = predictions.count()
    val correct = lp.filter(lp("label") === lp("prediction")).count() // correctly classified samples
    lp.show(200)
    val ratioCorrect = correct.toDouble / countTotal.toDouble
    // label 1 = churned, 0 = retained
    val truePositive = lp.filter(lp("prediction") === 1.0).filter(lp("label") === lp("prediction")).count()  // true churn
    val falsePositive = lp.filter(lp("prediction") === 1.0).filter(lp("label") =!= lp("prediction")).count() // false churn
    val trueNegative = lp.filter(lp("prediction") === 0.0).filter(lp("label") === lp("prediction")).count()  // true retained
    val falseNegative = lp.filter(lp("prediction") === 0.0).filter(lp("label") =!= lp("prediction")).count() // false retained
    // True/false positive rates.
    val tpr = truePositive.toDouble / (truePositive + falseNegative)
    val fpr = falsePositive.toDouble / (falsePositive + trueNegative)
    // Precision/recall for the churn class.
    val positivePrecision = truePositive.toDouble / (truePositive + falsePositive)
    val positiveRecall = truePositive.toDouble / (truePositive + falseNegative)
    // Precision/recall for the retained class.
    val negativePrecision = trueNegative.toDouble / (trueNegative + falseNegative)
    val negativeRecall = trueNegative.toDouble / (trueNegative + falsePositive)
    println(s"预测样本总数: $countTotal")
    println(s"正确预测样本数量: $correct")
    println(s"模型准确率: $ratioCorrect")
    println(s"模型ROC值:$areaUnderROC")
    println(s"模型PR值:$areaUnderPR")
    println(s"预测结果中真流失用户个数:$truePositive")
    println(s"预测结果中假流失用户个数:$falsePositive")
    println(s"预测结果中真流失用户比例: $tpr")
    println(s"预测结果中假流失用户比例: $fpr")
    println(s"流失用户查准率:$positivePrecision")
    println(s"流失用户召回率:$positiveRecall")
    println(s"留存用户查准率:$negativePrecision")
    println(s"留存用户召回率:$negativeRecall")
    spark.stop()
  }
}
scala> val evaluator = new BinaryClassificationEvaluator().setLabelCol("label")
evaluator: org.apache.spark.ml.evaluation.BinaryClassificationEvaluator = binEval_f0b527f4e73d
scala> val areaUnderROC = evaluator.setMetricName("areaUnderROC").evaluate(preDictions)
areaUnderROC: Double = 0.8306899086101781
scala> val areaUnderPR = evaluator.setMetricName("areaUnderPR").evaluate(preDictions)
areaUnderPR: Double = 0.6296575868466127
scala> val lp = preDictions.SELEct( "label", "preDiction")
lp: org.apache.spark.sql.DataFrame = [label: double, preDiction: double]
scala> val countTotal = preDictions.count()
countTotal: Long = 2095
scala> val truePositive = lp.filter(lp("preDiction") === 1.0).filter(lp("label") === lp("preDiction")).count() // 真流失用户
truePositive: Long = 270
scala> val falsePositive = lp.filter(lp("preDiction") === 1.0).filter(lp("label") =!= lp("preDiction")).count() // 假流失用户
falsePositive: Long = 146
scala> val trueNegative = lp.filter(lp("preDiction") === 0.0).filter(lp("label") === lp("preDiction")).count() // 真留存用户
trueNegative: Long = 1397
scala> val falseNegative = lp.filter(lp("preDiction") === 0.0).filter(lp("label") =!= lp("preDiction")).count() // 假留存用户
falseNegative: Long = 282
scala> val tpr = truePositive.toDouble / (truePositive + falseNegativE)
tpr: Double = 0.4891304347826087
scala> val fpr = falsePositive.toDouble / (falsePositive + trueNegativE)
fpr: Double = 0.09462086843810759
scala> val positivePrecision = truePositive.toDouble / (truePositive + falsePositivE)
positivePrecision: Double = 0.6490384615384616
scala> val positiveRecall = truePositive.toDouble / (truePositive + falseNegativE)
positiveRecall: Double = 0.4891304347826087
scala> val negativePrecision = trueNegative.toDouble / (trueNegative + falseNegativE)
negativePrecision: Double = 0.8320428826682549
scala> val negativeRecall = trueNegative.toDouble / (trueNegative + falsePositivE)
negativeRecall: Double = 0.9053791315618924
scala> println(s"预测样本总数: $countTotal")
预测样本总数: 2095
scala> println(s"正确预测样本数量: $correct")
正确预测样本数量: 1667
scala> println(s"模型准确率: $ratioCorrect")
模型准确率: 0.7957040572792363
scala> println(s"模型ROC值:$areaUnderROC")
模型ROC值:0.8306899086101781
scala> println(s"模型PR值:$areaUnderPR")
模型PR值:0.6296575868466127
scala> println(s"预测结果中真流失用户个数:$truePositive")
预测结果中真流失用户个数:270
scala> println(s"预测结果中假流失用户个数:$falsePositive")
预测结果中假流失用户个数:146
scala> println(s"预测结果中真流失用户比例: $tpr")
预测结果中真流失用户比例: 0.4891304347826087
scala> println(s"预测结果中假流失用户比例: $fpr")
预测结果中假流失用户比例: 0.09462086843810759
以上是大佬教程为你收集整理的Kaggle电信客户流失预测——基于GBDT融合LR全部内容,希望文章能够帮你解决Kaggle电信客户流失预测——基于GBDT融合LR所遇到的程序开发问题。
如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。