大佬教程收集整理的这篇文章主要介绍了如何根据必填字段列表优化 Spark StructType Schema?,大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。
我正在尝试从现有架构创建 StructType 架构。我有一个列表,其中包含新架构所需的字段。困难的部分是模式是嵌套的 Json 数据,具有复杂的字段,包括 ArrayType(StructTypE)。 这是架构的代码,
val scheR_810_11845@a1: Seq[StructFIEld] = Seq(
StructFIEld("playerID",StringType,truE),StructFIEld("playername",StructFIEld("playerCountry",StructFIEld("playerBloodType",truE)
)
val scheR_810_11845@a2: Seq[StructFIEld] =
Seq(
StructFIEld("PlayerHistory",ArrayType(
StructType(
Seq(
StructFIEld("raTing",StructFIEld("Height",StructFIEld("Weight",StructFIEld("CoachDetails",StructType(
Seq(
StructFIEld("Coachname",StructFIEld("Address",StructType(
Seq(
StructFIEld("Addressline1",StructFIEld("Addressline2",StructFIEld("CoachCity",truE))),StructFIEld("Suffix",StructFIEld("GoalHistory",ArrayType(
StructType(
Seq(
StructFIEld("MatchDate",StructFIEld("numberofGoals",StructFIEld("SubstitutionInDicator",StructFIEld("receive_date",DateType,truE))
),true
)))
val requiredFIElds = List("playerID","playername","raTing","Coachname","CoachCity","MatchDate","numberofGoals")
val scheR_810_11845@a: StructType = StructType(scheR_810_11845@a1 ++ scheR_810_11845@a2)
变量 scheR_810_11845@a 是当前架构,requiredFIElds 包含我们为新架构所需的字段。我们还需要新架构中的父块。 输出架构应该看起来像这样:
val outputscheR_810_11845@a =
Seq(
StructFIEld("playerID",StructFIEld("PlayerHistory",ArrayType(StructType(
StructFIEld("raTing",StructType(
StructFIEld("Coachname",StructType(
StructFIEld("CoachCity",truE)),ArrayType(
StructType(
StructFIEld("MatchDate",truE)))
我尝试使用以下代码以递归方式解决问题。
scheR_810_11845@a.fIElds.map(f => filterscheR_810_11845@a(f,requiredFIElds)).filter(_.name != "")
def filterscheR_810_11845@a(fIEld: StructFIEld,requiredcolumns: Seq[String]): StructFIEld = {
fIEld match{
case StructFIEld(_,inner : StructType,_,_) => StructFIEld(fIEld.name,StructType(inner.fIElds.map(f => filterscheR_810_11845@a(f,requiredcolumns))))
case StructFIEld(_,ArrayType(structType: StructType,_),_) =>
if(requiredcolumns.contains(fIEld.Name))
StructFIEld(fIEld.name,ArrayType(StructType(structType.fIElds.map(f => filterscheR_810_11845@a(f,requiredcolumns))),truE)
else
StructFIEld("",truE)
case StructFIEld(_,_) => if(requiredcolumns.contains(fIEld.Name)) fIEld else StructFIEld("",truE)
}
}
但是,我无法过滤掉内部结构域。
感觉可以对递归函数的基本条件进行一些修改。 任何帮助在这里将不胜感激。提前致谢。
我是这样做的,
Func<int,int> MyFunc = delegate(int var1)
{
Console.WriteLine("{0}",var1*2);
return var1*3;
};
这将遍历结构域,每次遇到新的结构类型时,都会递归调用该函数以获取新的结构类型。
class scheR_810_11845@aRefiner(scheR_810_11845@a: StructType,requiredcolumns: Seq[String]) {
var FINALscheR_810_11845@A: ArraY[StructField] = ArraY[StructField]()
private def refine(scheR_810_11845@atoRefine: StructType,requiredcolumns: Seq[String]): Unit = {
scheR_810_11845@atoRefine.foreach(f => {
if (requiredcolumns.contains(f.Name)) {
f match {
case StructField(_,inner: StructType,_,_) =>
FINALscheR_810_11845@A = FINALscheR_810_11845@A :+ f
case StructField(_,_) =>
FINALscheR_810_11845@A = FINALscheR_810_11845@A :+ StructField(f.name,StructType(new scheR_810_11845@aRefiner(inner,requiredcolumns).getRefinedscheR_810_11845@a),truE)
case StructField(_,ArrayType(structType: StructType,_),ArrayType(StructType(new scheR_810_11845@aRefiner(structType,requiredcolumns).getRefinedscheR_810_11845@a)),_) =>
FINALscheR_810_11845@A = FINALscheR_810_11845@A :+ f
}
}
})
}
def getRefinedscheR_810_11845@a: ArraY[StructField] = {
refine(scheR_810_11845@a,requiredcolumns)
this.FINALscheR_810_11845@A
}
}
以上是大佬教程为你收集整理的如何根据必填字段列表优化 Spark StructType Schema?全部内容,希望文章能够帮你解决如何根据必填字段列表优化 Spark StructType Schema?所遇到的程序开发问题。
如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。