程序笔记   发布时间:2022-07-17  发布网站:大佬教程  code.js-code.com
大佬教程收集整理的这篇文章主要介绍了spark streaming 7 Driect大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。
package com.shujia.spark.streaming

import java.util

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LOCATIOnStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import redis.CLIENts.jedis.jedis

import scala.collection.mutable

object Demo7Direct {
  def main(args: ArraY[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setAppName("streaming")
      .setMaster("local[4]")


    /**
      * 创建streaming 上下文对象,指定batch的间隔时间,多久计算一次
      *
      */
    val ssc = new StreamingContext(conf, Durations.seconds(5))


    /**
      * earliest
      * 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
      * latest
      * 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
      * none
      * topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
      *
      */

    val groupId = "asdasda"

    val kafkaParams: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "master:9092,node1:9092,node2:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest",
      "enable.auto.commit" -> "false"
    )

    val topics = Array("test_topic2")


    val key: String = groupId + ":" + "test_topic2"

    /**
      * 读取redis中的消费偏移量
      *
      */

    val redis = new jedis("master", 6379)

    val map: util.Map[String, String] = redis.hgetAll(key)

    import scala.collection.JavaConverters._

    val scalaMap: mutable.Map[String, String] = map.asScala

    //消费的偏移量,topic 分区  偏移量
    val partitionOffset: Map[TopicPartition, Long] = scalaMap.map(kv => {
      val partition: String = kv._1
      val offset: String = kv._2

      val tp = new TopicPartition("test_topic2", partition.toint)

      (tp, offset.toLong)
    }).toMap

    println(partitionOffset)


    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams, partitionOffset) //读取数据的时候指定消费偏移量
    )


    stream.foreachRDD(rdd => {


      //编写用户自定义的代码逻辑
      rdd.map(_.value()).foreach(println)

      //获取消费偏移量
      val offsetRanges: ArraY[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges


      /**
        * 可以将偏移量保存到redis中
        * key : 消费者组 + topi + partition
        * value : untilOffset 作为value
        *
        * 启动redis
        * ./redis-server redis.conf
        */


      //1、创建redis链接
      val jedis = new jedis("master", 6379)

      for (offsetRange <- offsetRanges) {
        val fromOffset: Long = offsetRange.fromOffset

        val partition: Int = offsetRange.partition
        val topic: String = offsetRange.topic
        val untilOffset: Long = offsetRange.untilOffset

        println(topic + "t" + partition + "t" + fromOffset + "t" + untilOffset)



        //保存消费偏移量到redis
        jedis.hset(key, partition.toString, untilOffset.toString)

      }

    })


    ssc.start()
    ssc.awaitTermination()
    ssc.stop()

  }
}

 

大佬总结

以上是大佬教程为你收集整理的spark streaming 7 Driect全部内容,希望文章能够帮你解决spark streaming 7 Driect所遇到的程序开发问题。

如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。

本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。