当前位置: 首页 > news >正文

网络培训的网站建设销售型网站怎么做的

网络培训的网站建设,销售型网站怎么做的,中国高清vpswindows在线,淄博物联网app开发公司2.3、Spark Streaming2.3.0、OverviewSpark Streaming 是核心 Spark API 的扩展#xff0c;它支持实时数据流的可扩展、高吞吐量、容错流处理。数据可以从许多来源#xff08;如 Kafka、Kinesis 或 TCP 套接字#xff09;获取#xff0c;并且可以使用复杂的算法进行处理它支持实时数据流的可扩展、高吞吐量、容错流处理。数据可以从许多来源如 Kafka、Kinesis 或 TCP 套接字获取并且可以使用复杂的算法进行处理这些算法由 map、reduce、join 和 window 等高级函数表示。最后可以将处理后的数据推送到文件系统、数据库和实时仪表板。当然也可以在数据流上应用机器学习和图处理。工作原理如下Spark Streaming 接收实时输入的数据流并将数据分成批处理然后由 Spark 引擎处理以批处理生成最终的结果流。其中SparkStreaming提供了一种离散流或DStream的高级抽象来代表一个连续的数据流底层就是由一系列RDD来表示。DStream 中的每个 RDD 都包含来自某个区间的数据如下图2.3.0.1、Exampleimport org.apache.spark._ import org.apache .spark.streaming._ import org.apache.spark.streaming.StreamingContext_ // not necessary since Spark 1.3 // Create a local StreamingContext with two working thread and batch interval of 1 second. // The master requires 2 cores to prevent a starvation scenario. val conf new SparkConf().setMaster(local[2]).setAppName(NetworkWordCount) val ssc new streamingContext(conf, Seconds(1)) // Create a Dstream that will connect to hostname:port, like localhost:9999 val lines ssc.socketTextstream(localhost, 9999) // Split each line into words val words lines.flatMap(_.split(”)) // Count each word in each batch val pairs words.map(word (word, 1)) val wordCounts pairs.reduceByKey(_ _) // Print the first ten elements of each RDD generated in this Dstream to the console wordCounts.print() ssc.start() // start the computation ssc.awaitTermination() // Wait for the computation to terminate如上面的demo所示每个输入流都会和一个Receiver对象相关联该对象用来接收数据并将其存储在Spark内存中进行下一步的处理。因此如果你想要在流应用程序中并行接收多个数据流的话那么就得需要创建多个Receiver对象用来接收数据。同时也需要记住的是SparkStreaming应用程序是属于常驻的而且也是Spark程序那么Worker/Executor也会占用一部分资源所以为了能够保障运行Receiver以及正常处理数据那么就需要申请到足够的资源所以其分配的核数一定要大于receivers的个数。2.3.0.2、Points To Remember1、一旦Context启动之后就不能增加或者设置新的流计算2、一旦Context停止后就无法重新启动。这里说的是容错方面。3、同一时间一个JVM内只能有一个StreamingContext。4、在StreamingContext上调用stop()方法同时也会把SparkContext给停止如果只是想停止StreamingContext那么可以在调用stop()方法的时候指定stopSparkContextfalse。5、一个SparkContext可以被复用创建多个StreamingContext(即在下一个StreamingContext被创建之前停止上一个StreamingContext且不停止SparkContext)2.3.1、ReceiverSparkStreaming可以从任意的数据源来接收数据并处理目前内置的数据源包括Kafka、File、Socket等等。当然目前Spark内置支持的数据源可以满足日常大部分的场景但有些时候仍然需要自定义Receiver来定制接收数据源。这小节将来讲述如何实现一个自定义的Receiver。首先要继承Receiver然后重写onStart和onStop方法。onStart()方法会在启动的时候负责接收数据onStop()方法将停止这些接收数据的线程当然还可以使用isStopped()方法来检查它们是否停止接收数据。在 Spark Streaming 中当一个 Receiver 启动时每隔 spark.streaming.blockInterval 毫秒就会产生一个新的块每个块都会变成 RDD 的一个分区最终由 DStream 创建。例如由 KafkaInputDStream 创建的 RDD 中的分区数由 batchInterval / spark.streaming.blockInterval 确定其中 batchInterval 是将流数据分成批次的时间间隔通过 StreamingContext 的构造函数参数设置。例如如果批处理间隔为 2 秒默认块间隔为 200 毫秒默认则RDD 将包含 10 个分区还有一个流程路径涉及从迭代器接收数据由 ReceivedBlockHandler 表示。创建 RDD 后驱动程序的 JobScheduler 可以将其处理安排为作业。在 Spark Streaming 的当前实现和默认配置下任何时间点只有一个作业处于活动状态即正在执行。因此如果一个批次的处理时间比批次间隔长那么下一个批次的作业将保持排队,将其设置为 1 的原因是并发作业可能会导致奇怪的资源共享并且可能难以调试系统中是否有足够的资源来足够快地处理摄取的数据当然可以通过实验性 Spark 属性 spark.streaming.concurrentJobs 进行更改默认情况下设置为 1。一次只运行一个作业不难看出如果批处理时间小于批处理间隔那么系统将是稳定的。Receiver一旦接收到数据后那么就会调用store(data)方法进行存储这里有两种处理方式来保障Receiver是否可靠1、来一条存储一条这种可靠性较差2、存储整个对象/序列化集合。阻塞的方式存储其自定义实现store()方法会影响到整体的容错和可靠。当应用程序发生了异常时应该要有捕获机制并要有重试机制。如果应用程序发生重启的时候那么会调用Receiver类中的restart()方法其内部会异步调用onStop方法并隔一定延迟后调用onStart()方法完成重启动作。public class JavaCustomReceiver extends ReceiverString {String host null;int port -1;public JavaCustomReceiver(String host_ , int port_) {super(storageLevel.MEMORY_AND_DISK_2());host host_;port port_;}Overridepublic void onstart() {// Start the thread that receives data over a connectionnew Thread(this::receive).start();}overridepublic void onstop() {// There is nothing much to do as the thread calling receive()// is designed to stop by itself if isStopped() returns false}/** Create a socket connection and receive data until receiver is stopped */private void receive() {Socket socket nul1;String userInput null;try {// connect to the serversocket new Socket(host, port);BufferedReader reader new BufferedReader(new InputstreamReader(socket.getInputstream(), StandardCharsets.UTF 8))// Until stopped or connection broken continue readingwhile (!isStopped() (userInput reader.readLine()) ! null) {System.out.println(Received data userInput );store(userInput);}reader.close();socket.close();// Restart in an attempt to connect again when server is active againrestart(Trying to connect again);} catch(ConnectException ce) {// restart if could not connect to serverrestart(Could not connect, ce);} catch(Throwable t) f// restart if there is any other errorrestart(Error receiving data, t);}} }// 调用自定义Receiver: // Assuming ssc is the JavastreamingContext JavaDStreamString customReceiverstream ssc.receiverstream(new JavaCustomReceiver(host, port)); JavaDstreamString words customReceiverstream.flatMap(s - ...); ...
http://www.w-s-a.com/news/229207/

相关文章:

  • 手机网站页面制作网站怎么做快照
  • asp网站怎么仿站推广软件下载平台
  • 电子商务网站建设期末试题08答案互联网怎么做
  • 规范门户网站的建设和管理办法微信网站开发公司电话
  • 免费行情网站凡客的官网
  • 做网站运营的女生多吗海淀企业网站建设
  • 网站运行环境配置网站建设个一般需要花费多少钱
  • 广西平台网站建设报价wordpress 免费 企业 主题
  • 四川省建设厅职称查询网站辽宁省住房和城乡建设部网站
  • 公司网站后台登陆网站放到云服务器上怎么做
  • 济南 网站定制做网站购买域名
  • 代理分佣后台网站开发怎么用源码做网站视频
  • 天津网站建设招标wordpress七牛图片插件
  • 建设合同施工合同示范文本汕头市网络优化推广平台
  • 网站关键词修改老王搜索引擎入口
  • 那个网站做搬家推广比较好建设部网站办事大厅栏目
  • 做企业销售分析的网站广州网站设计建设
  • 建站流程wordpress怎么开伪静态
  • 服务器不是自己的做违法网站videopro wordpress
  • 北京建网站的公司哪个比较好网站开通告知书
  • 网站负责人 主体负责人黑龙江 建设监理协会网站
  • 手机网站焦点图代码建设工程质量检测网站
  • 墙绘做网站推广有作用没html网页制作用什么软件
  • 企业做网站有用吗网站推广的常用方法有哪些?
  • 景安做网站教程互联网小程序开发
  • 桂林北站离阳朔多远贵州省建设厅住房和城乡建设官网二建考试
  • 浙江省建设厅 网站是多少wordpress淘宝客一键
  • 网站流量少怎么做5个不好的网站
  • 随州网站建设有限公司个人申请注册公司需要多少钱
  • 东莞做商城网站建设wordpress批量下载外链图片