区块链技术博客
www.b2bchain.cn

Spark DStream

这篇文章主要介绍了Spark DStream的讲解,通过具体代码实例进行22731 讲解,并且分析了Spark DStream的详细步骤与相关技巧,需要的朋友可以参考下https://www.b2bchain.cn/?p=22731

本文实例讲述了2、树莓派设置连接WiFi,开启VNC等等的讲解。分享给大家供大家参考文章查询地址https://www.b2bchain.cn/7039.html。具体如下:

目录

    • 最后一次更新于 2020年10月26日 星期一 11:15:41
      • DStream概述
      • DStream特点
      • DStream创建
        • 通过监听端口创建DStream
        • 通过队列创建DStream
        • 通过自定义数据源创建DStream
        • 通过Kafka数据源创建DStream
          • spark-streaming-kafka-0-8_2.11 模式
            • 通过ReceiverAPI连接kafka数据源
            • 通过DirectAPI连接kafka数据源
      • DStream转换
        • 特殊算子Transform
        • 无状态转化操作

最后一次更新于 2020年10月26日 星期一 11:15:41

DStream概述

Discretized Stream是Spark Streaming的基础抽象。在内部实现上,DStream是一系列连续的RDD来表示,每个RDD含有一段时间间隔内的数据,对这些RDD的转换是由Spark引擎来计算的, DStream的操作隐藏了的大多数的细节, 只提供给开发者了方便实用的高级 API

DStream特点

1 一旦StreamingContext已经启动, 则不能再添加新的 streaming computations

2 一旦一个StreamingContext已经停止(StreamingContext.stop()), 不能再重启

3 在一个 JVM 内, 同一时间只能启动一个StreamingContext

4 stop() 的方式停止StreamingContext, 也会把SparkContext停掉. 如果仅仅想停止StreamingContext, 则应该这样: stop(false)

5 一个SparkContext可以重用去创建多个StreamingContext, 前提是以前的StreamingContext已经停掉,并且SparkContext没有被停掉

DStream创建

通过监听端口创建DStream

使用netcat工具向9999端口不断的发送数据,通过SparkStreaming读取端口数据并统计不同单词出现的次数

//windows中命令 nc -lp 8888 //linux中命令 nc -lk 8888 
<dependency>     <groupId>org.apache.spark</groupId>     <artifactId>spark-streaming_2.11</artifactId>     <version>2.1.1</version>     <scope>compile</scope> </dependency> 
package com.xcu.bigdata.spark.streaming  import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext}  /**  * @Package : com.xcu.bigdata.spark.streaming  * @Author :   * @Date : 2020 10月 星期四  * @Desc : 流式WordCount  */ object SparkStreaming01_WordCount {   def main(args: Array[String]): Unit = {     // 创建配置文件对象     // 注意:SparkStreaming程序执行至少需要2个线程,所以不能设置为local     val conf = new SparkConf().setAppName("SparkStreaming01_WordCount").setMaster("local[*]")     // 创建SparkStreaming程序执行入口     // 设置采集周期为3s     val ssc = new StreamingContext(conf, Seconds(3))     // 从指定的端口获取数据     val socketDS = ssc.socketTextStream("localhost", 8888)     // 业务操作     val faltMapDS = socketDS.flatMap(_.split(" "))     val mapDS = faltMapDS.map((_, 1))     val reduceDS = mapDS.reduceByKey(_ + _)     reduceDS.print     // 启动采集器     ssc.start()     // 等待采集结束之后,终止程序     ssc.awaitTermination()   } } 

注意:如果程序运行时,log日志太多,可以将spark conf目录下的log4j文件里面的日志拷贝到当前项目的resources目录下,并将级别改成WARN。

log4j.properties

log4j.rootCategory=ERROR, console log4j.appender.console=org.apache.log4j.ConsoleAppender log4j.appender.console.target=System.err log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n  # Set the default spark-shell log level to ERROR. When running the spark-shell, the # log level for this class is used to overwrite the root logger's log level, so that # the user can have different defaults for the shell and regular Spark apps. log4j.logger.org.apache.spark.repl.Main=ERROR  # Settings to quiet third party logs that are too verbose log4j.logger.org.spark_project.jetty=ERROR log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=ERROR log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=ERROR log4j.logger.org.apache.parquet=ERROR log4j.logger.parquet=ERROR  # SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR 

通过队列创建DStream

package com.xcu.bigdata.spark.streaming  import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.streaming.dstream.{DStream, InputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} import scala.collection.mutable  /**  * @Package : com.xcu.bigdata.spark.streaming  * @Author :   * @Date : 2020 10月 星期四  * @Desc :  */ object SparkStreaming02_RDDQueue {   def main(args: Array[String]): Unit = {     //创建配置文件对象     //注意Streaming程序执行至少需要2个线程,所以不能设置为local     val conf: SparkConf = new SparkConf().setAppName("SparkStreaming02_RDDQueue").setMaster("local[*]")     //创建SparkStreaming上下文环境对象     //指定采集周期为3s     val ssc: StreamingContext = new StreamingContext(conf, Seconds(3))     //创建可变队列,里面放的是RDD     val queue: mutable.Queue[RDD[Int]] = new mutable.Queue[RDD[Int]]()     //从队列中采集数据,获取DS     val queueDS: InputDStream[Int] = ssc.queueStream(queue, false) //false在一个采集周期可以采集多个RDD     //处理采集到的数据     val resDS: DStream[(Int, Int)] = queueDS.map((_, 1)).reduceByKey(_ + _)     resDS.print()     //启动采集器     ssc.start()     //循环创建RDD,并将创建的RDD放到队列里     for (i <- 1 to 5) {       queue.enqueue(ssc.sparkContext.makeRDD(6 to 10))       Thread.sleep(2000)     }     //等待采集结束之后,终止程序     ssc.awaitTermination()    } } 

通过自定义数据源创建DStream

package com.xcu.bigdata.spark.streaming  import java.io.{BufferedReader, InputStreamReader} import java.net.{ConnectException, Socket} import java.nio.charset.StandardCharsets  import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.receiver.Receiver import org.apache.spark.streaming.{Seconds, StreamingContext}   /**  * @Package : com.xcu.bigdata.spark.streaming  * @Author :   * @Date : 2020 10月 星期四  * @Desc : 通过自定义数据源方式创建DStream  */ object SparkStreaming03_CustomerReceiver {   def main(args: Array[String]): Unit = {     //创建配置文件对象     //注意Streaming程序执行至少需要2个线程,所以不能设置为local     val conf: SparkConf = new SparkConf().setAppName("SparkStreaming03_CustomerReceiver").setMaster("local[*]")     //创建SparkStreaming上下文环境对象     //指定采集周期为3s     val ssc: StreamingContext = new StreamingContext(conf, Seconds(3))     //从端口中读取数据     val myDS: ReceiverInputDStream[String] = ssc.receiverStream(new MyReceiver("localhost", 7777))     //扁平化     val flatMapDS: DStream[String] = myDS.flatMap(_.split(" "))     //结构转换     val mapDS: DStream[(String, Int)] = flatMapDS.map((_, 1))     //聚合     val reduceDS: DStream[(String, Int)] = mapDS.reduceByKey(_ + _)     //结果打印     reduceDS.print     //启动采集器     ssc.start()     //等待采集器采集结束之后,终止程序     ssc.awaitTermination()   } }  //Receiver[T] T泛型即读取数据类型 class MyReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_ONLY) {   private var socket: Socket = _    //真正处理数据的逻辑   def receive() {     try {       //创建连接       socket = new Socket(host, port)       //根据连接对象获取输入流       //socket.getInputStream字节流-->InputStreamReader转换流-->BufferedReader缓存字符流       val reader: BufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))       //定义一个变量,用于接收读取到的一行数据       var line: String = null       while ((line = reader.readLine()) != null) {         store(line)       }     } catch {       case e: ConnectException =>         restart(s"Error connecting to $host:$port", e)         return     } finally {       onStop()     }   }    override def onStart(): Unit = {     new Thread("Socket Receiver") {       setDaemon(true) //设置后台守护线程        override def run() {         receive()       }     }.start()   }    override def onStop(): Unit = {     synchronized {       if (socket != null) {         socket.close()         socket = null       }     }   } } 

通过Kafka数据源创建DStream

spark-streaming-kafka-0-8_2.11 模式
通过ReceiverAPI连接kafka数据源

1 需要一个专门的Executor去接收数据,然后发送给其他的Executor做计算。存在的问题,接收数据的Executor和计算的Executor速度会有所不同,特别在接收数据的Executor速度大于计算的Executor速度,会导致计算数据的节点内存溢出。

2 offset维护在zookeeper中,程序停止后,继续生产数据,再次启动程序,仍然可以继续消费。

<dependency>     <groupId>org.apache.spark</groupId>     <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>     <version>2.1.1</version> </dependency> 
package com.xcu.bigdata.spark.streaming  import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.{Seconds, StreamingContext}  /**  * @Package : com.xcu.bigdata.spark.streaming  * @Author :   * @Date : 2020 10月 星期六  * @Desc : 通过ReceiverAPI连接kafka数据源,获取数据  *       需要导入依赖  */ object SparkStreaming04_ReceiverAPI {   def main(args: Array[String]): Unit = {     //创建配置文件对象     //注意:SparkStreaming程序执行至少需要2个线程,所以不能设置为local     val conf: SparkConf = new SparkConf().setAppName("SparkStreaming04_ReceiverAPI").setMaster("local[*]")     //创建SparkStreaming上下文环境对象,并指定采集周期为3s     var ssc: StreamingContext = new StreamingContext(conf, Seconds(3))     //连接kafka,创建DStream     val kafkaDStream: ReceiverInputDStream[(String, String)] = KafkaUtils.createStream(       ssc,       "hadoop201:2181,hadoop202:2181,hadoop203:2181",       "test0105",       Map("bigdata-0105" -> 2)     )     //获取kafka中的消息,我们只需要v的部分     val lineDS: DStream[String] = kafkaDStream.map(_._2)     //扁平化     val flatMapDS: DStream[String] = lineDS.flatMap(_.split(" "))     //结构转换     val mapDS: DStream[(String, Int)] = flatMapDS.map((_, 1))     //聚合     val reduceDS: DStream[(String, Int)] = mapDS.reduceByKey(_ + _)     //打印输出     reduceDS.print     //开启任务     ssc.start()     ssc.awaitTermination()   } } 
通过DirectAPI连接kafka数据源

这种API是由计算的Executor来主动消费Kafka的数据,速度由自身控制。

  • 自动维护offset

offset维护在checkpoint中,获取StreamingContext的方式也改变了,但是目前这种方式会丢失消息

package com.xcu.bigdata.spark.streaming  import kafka.serializer.StringDecoder import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, InputDStream} import org.apache.spark.streaming.kafka.KafkaUtils import org.apache.spark.streaming.{Seconds, StreamingContext}  /**  * @Package : com.xcu.bigdata.spark.streaming  * @Author :   * @Date : 2020 10月 星期六  * @Desc : 通过DirectAPI连接kafka数据源  *       缺点:  *       1 小文件过多  *       2 在checkpoint中,只记录最后offset的时间戳,再次启动程序时,会从这个时间到当前时间,把所有的周期都执行一次  */  object SparkStreaming05_DirectAPI_Auto {   def main(args: Array[String]): Unit = {     //修改StreamingContext对象的获取方式,先从检查点cp中获取,如果检查点cp没有,通过函数创建。     val ssc: StreamingContext = StreamingContext.getActiveOrCreate("/cp", () => getStreamingContext)     //启动采集器     ssc.start()     //等到采集器结束,终止程序     ssc.awaitTermination()   }    def getStreamingContext(): StreamingContext = {     //创建配置文件对象     //注意:SparkStreaming程序执行至少需要2个线程,所以不能设置为local     val conf: SparkConf = new SparkConf().setAppName("SparkStreaming05_DirectAPI_Auto01").setMaster("local[*]")     //创建SparkStreaming上下文环境对象,指定采集周期为3s     val ssc = new StreamingContext(conf, Seconds(3))     //设置检测点路径     ssc.checkpoint("/cp")     //准备kafka参数     val kafkaParams: Map[String, String] = Map[String, String](       ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop201:9092,hadoop202:9092,hadoop203:9092",       ConsumerConfig.GROUP_ID_CONFIG -> "bigdata"     )     val kafkaDstream: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](       ssc,       kafkaParams,       Set("bigdata-0105")     )     //获取kafka中的消息,我们只需要v的部分     val lineDS: DStream[String] = kafkaDstream.map(_._2)     //扁平化     val flatMapRDD: DStream[String] = lineDS.flatMap(_.split(" "))     //结构转换     val mapDS: DStream[(String, Int)] = flatMapRDD.map((_, 1))     //聚合     val reduceDS: DStream[(String, Int)] = mapDS.reduceByKey(_ + _)     //打印输出     reduceDS.print     ssc   } } 
  • 手动维护offset
package com.xcu.bigdata.spark.streaming  import kafka.common.TopicAndPartition import kafka.message.MessageAndMetadata import kafka.serializer.StringDecoder import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.spark.streaming.dstream.InputDStream import org.apache.spark.{SparkConf} import org.apache.spark.streaming.kafka.{HasOffsetRanges, KafkaUtils, OffsetRange} import org.apache.spark.streaming.{Seconds, StreamingContext}  /**  * @Package : com.xcu.bigdata.spark.streaming  * @Author :   * @Date : 2020 10月 星期六  * @Desc : DirectAPI连接kafka数据源(手动维护offset)  */ object SparkStreaming07_DirectAPI_Hander {   def main(args: Array[String]): Unit = {     //创建配置文件对象     //注意:SparkStreaming执行至少需要两个线程     val conf: SparkConf = new SparkConf().setAppName("SparkStreaming07_DirectAPI_Hander").setMaster("local[*]")     //创建SparkStreaming上下文环境对象     //执行周期为3s     val ssc = new StreamingContext(conf, Seconds(3))     //准备kafka参数     val kafkaParams: Map[String, String] = Map[String, String](       ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop201:9092,hadoop202:9092,hadoop203:9092",       ConsumerConfig.GROUP_ID_CONFIG -> "bigdata"     )     //获取上一次消费的位置(偏移量)     //实际项目中,为了保证数据精准的一致性,我们对数据进行消费处理之后,通常将偏移量保存至有事务的存储中,如MySQL     val fromOffsets: Map[TopicAndPartition, Long] = Map[TopicAndPartition, Long](       TopicAndPartition("bigdata-0105", 0) -> 10L,       TopicAndPartition("bigdata-0105", 1) -> 10L     )     //从指定的offset读取数据进行消费     val kafkaDstream: InputDStream[String] = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](       ssc,       kafkaParams,       fromOffsets,       (m: MessageAndMetadata[String, String]) => m.message()     )     //消费完毕之后,对偏移量offset进行更新     val offsetRanges: Array[OffsetRange] = Array.empty[OffsetRange]     kafkaDstream.transform {       rdd => {         val offsetRnages: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges         rdd       }     }.foreachRDD {       rdd => {         for (o <- offsetRanges) {           println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")         }       }     }     //启动采集器     ssc.start()     //等待采集器结束之后,结束程序     ssc.awaitTermination()   } } 

DStream转换

DStream上的操作与RDD的类似,分为Transformations(转换)和Output Operations(输出)两种,此外转换操作中还有一些比较特殊的算子,如:updateStateByKey()、transform()以及各种Window相关的算子。

特殊算子Transform

Transform允许DStream上执行任意的RDD-to-RDD函数。即使这些函数并没有在DStream的API中暴露出来,通过该函数可以方便的扩展Spark API。该函数每一批次调度一次。其实也就是对DStream中的RDD应用转换。

package com.xcu.bigdata.spark.streaming  import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext}  /**  * @Package : com.xcu.bigdata.spark.streaming  * @Author :   * @Date : 2020 10月 星期一  * @Desc : 使用transform算子将DS转换为RDD  */ object SparkStreaming08_Transform {   def main(args: Array[String]): Unit = {     //创建配置文件对象     //注意Streaming程序执行至少需要2个线程,所以不能设置为local     val conf: SparkConf = new SparkConf().setAppName("SparkStreaming08_Transform").setMaster("local[*]")     //创建SparkStreaming程序执行入口     //指定采集周期为3s     val ssc = new StreamingContext(conf, Seconds(3))     //从指定端口读取数据     val socketDS: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 8888)     //将DS转换为RDD进行操作     val resRDD: DStream[(String, Int)] = socketDS.transform(       rdd => {         val flatMapRDD: RDD[String] = rdd.flatMap(_.split(" "))         val mapRDD: RDD[(String, Int)] = flatMapRDD.map((_, 1))         val reduceRDD: RDD[(String, Int)] = mapRDD.reduceByKey(_ + _)         reduceRDD.sortByKey()       }     )     resRDD.print()     //启动采集器     ssc.start()     //等待采集结束之后,终止程序     ssc.awaitTermination()   } } 

无状态转化操作

无状态转化操作就是把简单的RDD转化操作应用到每个批次上,也就是转化DStream中的每一个RDD,部分无状态转化操作列在了下表中
Spark DStream

需要记住的是,尽管这些函数看起来像作用在整个流上一样,但事实上每个DStream在内部是由许多RDD(批次)组成,且无状态转化操作是分别应用到每个RDD上的。例如:reduceByKey()会归约每个时间区间中的数据,但不会归约不同区间之间的数据。

本文转自互联网,侵权联系删除Spark DStream

赞(0) 打赏
部分文章转自网络,侵权联系删除b2bchain区块链学习技术社区 » Spark DStream
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

b2b链

联系我们联系我们