程序笔记   发布时间:2022-07-19  发布网站:大佬教程  code.js-code.com
大佬教程收集整理的这篇文章主要介绍了flink-9-算子(Operators)大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。

flink-9-算子(Operators)

 

 

 1.map:

flink-9-算子(Operators)

 

 

 

flink-9-算子(Operators)

 

调用:

    val stream = environment.readTextFile("data/access.log")
    println(stream.parallelism)
    val accessStream = stream.map( x => {
      val splits = x.split(",")
      val time = splits(0).trim.toLong
      val domain = splits(1).trim
      val traffices = splits(2).trim.toDouble
      Access(time, domain, traffices)
    })
    accessStream.print()

flink-9-算子(Operators)

 

 

 分组求和:

    val stream = environment.readTextFile("data/access.log")
    println(stream.parallelism)
    val accessStream = stream.map( x => {
      val splits = x.split(",")
      val time = splits(0).trim.toLong
      val domain = splits(1).trim
      val traffices = splits(2).trim.toDouble
      Access(time, domain, traffices)
    })
    accessStream.keyBy(1).sum(2).print()  // 过时的
    accessStream.keyBy(_.domain).sum(2).print()  // 不过时的

flink-9-算子(Operators)

 

 

 看看map底层源码:

flink-9-算子(Operators)

 再看看map(mapper)的map是干啥的:

flink-9-算子(Operators)

 

 看这个map方法,大部分算子都用了transform,所以我们可以直接用它:

 

 

flink-9-算子(Operators)

 

下面我们来个自定义的map:

 

 

 

 刚才那个跑完就关了,看不到web,下面改改代码

    val stream = environment.socketTextStream("182.92.99.53", 8081)
    stream.map(x => {
      x.toInt * 10
    }).print()

flink-9-算子(Operators)

 

 

 我们先观察一下print的写法:(有个addsink方法)

flink-9-算子(Operators)

 

 

 下面我们来改map的源码:

注意这个方法,java的多了个类型:

flink-9-算子(Operators)

 

 

 我们去找scala的写法:

flink-9-算子(Operators)

 

 

 下面我们根据java的map方法写我们自己实现的方法:

flink-9-算子(Operators)

 

 

 第一步:

flink-9-算子(Operators)

 

 

 根据这个我们也能发现:

flink-9-算子(Operators)

 

 

 

flink-9-算子(Operators)

具体实现:

flink-9-算子(Operators)

 

结果:

 

 

flink-9-算子(Operators)

 

 

 

filter的写法:

flink-9-算子(Operators)

flatMap的:

 

flink-9-算子(Operators)

 

 

 

再看看更底层的:(这个是最底层的)

flink-9-算子(Operators)

flink-9-算子(Operators)

 

flink-9-算子(Operators)

flink-9-算子(Operators)

flink-9-算子(Operators)

 来实现一下:(接口的都是没实现的,所以我们这里继承一个实现类,再继承这个单操作符的接口)

flink-9-算子(Operators)

 

 

 参其他算子的源码,我们可以知道,map是一进一出的,所以我们这里是要指定的,这里方便演示,类型我们都弄成Int来做:

class ZxStreAMMap extends AbstractStreamOperator[Int] with OneInputStreamOperator[Int, Int] {
  override def procesSELER_286_11845@ent(element: StreamRecord[Int]): Unit = {
    val result = element.getValue.toInt * 10
    element.replace(result)
    output.collect(element)
  }
}

调用报错:

stream.transform("ZxMap2", new ZxStreAMMap)

flink-9-算子(Operators)

 

 

 原因是这里进来是一个String类型:

flink-9-算子(Operators)

 

 

 所以我们要改一下:

flink-9-算子(Operators)

 

 

 结果ok:

stream.transform("ZxMap2", new ZxStreAMMap).print()

flink-9-算子(Operators)

 

 

 

FlatMap:

flink-9-算子(Operators)

 

 

 

flink-9-算子(Operators)

看结果:(pk被干掉了不输出)

flink-9-算子(Operators)

 

 

 自定义实现:

class ZxStreamFlatMap extends AbstractStreamOperator[String] with OneInputStreamOperator[String, String] {
  override def procesSELER_286_11845@ent(element: StreamRecord[String]): Unit = {
    val splits = element.getValue.split(",")
    splits.filter(_ != "pk").map(x => {
      output.collect(element.replace(X))
    })
  }
}

 

KeyBy:

flink-9-算子(Operators)

来份数据:(省,市,访问量)

flink-9-算子(Operators)

 

 

flink-9-算子(Operators)

因为有中间状态的,看最后几行就行了:

flink-9-算子(Operators)

 

 Tuple这种方式就不过时了:(建议用case class来做,现在的下标不清晰)

flink-9-算子(Operators)

 

 

flink-9-算子(Operators)

用KeySELEctor方式:

flink-9-算子(Operators)

进来是一个tuple,key是两个字段所以也是tuple

flink-9-算子(Operators)

 

 

flink-9-算子(Operators)

实现一下:

flink-9-算子(Operators)

用case class的方式:(这种方式更直观,推荐用)

flink-9-算子(Operators)

 

 

flink-9-算子(Operators)

最后是转成KeyedStream类型的:

flink-9-算子(Operators)

 

大佬总结

以上是大佬教程为你收集整理的flink-9-算子(Operators)全部内容,希望文章能够帮你解决flink-9-算子(Operators)所遇到的程序开发问题。

如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。