大佬教程收集整理的这篇文章主要介绍了 spark streaming 7 Direct,大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。
package com.shujia.spark.streaming

import java.util

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Durations, StreamingContext}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import redis.clients.jedis.Jedis

import scala.collection.mutable

/**
 * Direct-mode Kafka consumer for Spark Streaming with manual offset management.
 *
 * Offsets are stored in Redis as a hash:
 *   key   = consumerGroup + ":" + topic
 *   field = partition id
 *   value = untilOffset of the last processed batch
 *
 * On startup the saved offsets are read back and passed to `Subscribe`, so the
 * job resumes exactly where it left off instead of relying on Kafka's
 * auto-commit (which is disabled below).
 */
object Demo7Direct {
  def main(args: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf()
      .setAppName("streaming")
      .setMaster("local[4]")

    // Streaming context: one micro-batch every 5 seconds.
    val ssc = new StreamingContext(conf, Durations.seconds(5))

    /**
     * auto.offset.reset semantics:
     *  - earliest: resume from committed offsets; if none exist, consume from the beginning
     *  - latest:   resume from committed offsets; if none exist, consume only newly produced records
     *  - none:     resume from committed offsets; throw if any partition has no committed offset
     */
    val groupId = "asdasda"
    val kafkaParams: Map[String, Object] = Map[String, Object](
      "bootstrap.servers" -> "master:9092,node1:9092,node2:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      "auto.offset.reset" -> "earliest",
      // Auto-commit is off: offsets are persisted to Redis manually after each batch.
      "enable.auto.commit" -> "false"
    )

    val topics = Array("test_topic2")
    val key: String = groupId + ":" + "test_topic2"

    /**
     * Read the previously saved consumer offsets from Redis.
     */
    val redis = new Jedis("master", 6379)
    val map: util.Map[String, String] = redis.hgetAll(key)

    import scala.collection.JavaConverters._
    val scalaMap: mutable.Map[String, String] = map.asScala

    // Saved offsets: (topic, partition) -> offset to resume from.
    val partitionOffset: Map[TopicPartition, Long] = scalaMap.map(kv => {
      val partition: String = kv._1
      val offset: String = kv._2
      val tp = new TopicPartition("test_topic2", partition.toInt)
      (tp, offset.toLong)
    }).toMap

    // Done reading start offsets; release the connection (was leaked originally).
    redis.close()

    println(partitionOffset)

    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      // Start consuming from the offsets recovered from Redis.
      Subscribe[String, String](topics, kafkaParams, partitionOffset)
    )

    stream.foreachRDD(rdd => {
      // User-defined processing logic for this batch.
      rdd.map(_.value()).foreach(println)

      // Offset ranges actually consumed by this batch.
      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      /**
       * Persist the batch's offsets to Redis:
       *   key   : consumerGroup + topic
       *   field : partition
       *   value : untilOffset
       *
       * Start Redis with: ./redis-server redis.conf
       */
      // 1. Open a Redis connection (foreachRDD bodies run on the driver).
      val jedis = new Jedis("master", 6379)
      try {
        for (offsetRange <- offsetRanges) {
          val fromOffset: Long = offsetRange.fromOffset
          val partition: Int = offsetRange.partition
          val topic: String = offsetRange.topic
          val untilOffset: Long = offsetRange.untilOffset
          println(topic + "\t" + partition + "\t" + fromOffset + "\t" + untilOffset)

          // Save this partition's end offset; the next run resumes from here.
          jedis.hset(key, partition.toString, untilOffset.toString)
        }
      } finally {
        // Close the per-batch connection (was leaked once per batch originally).
        jedis.close()
      }
    })

    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
  }
}
以上是大佬教程为你收集整理的 spark streaming 7 Direct 全部内容,希望文章能够帮你解决 spark streaming 7 Direct 所遇到的程序开发问题。
如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。