当前位置: 首页 > news >正文

邢台网站建设信息重庆做网站推广的

邢台网站建设信息,重庆做网站推广的,重庆门户网站,小学生网上学做辅导哪个网站好Flume 功能 Flume主要作用#xff0c;就是实时读取服务器本地磁盘数据#xff0c;将数据写入到 HDFS。 Flume是 Cloudera提供的高可用#xff0c;高可靠性#xff0c;分布式的海量日志采集、聚合和传输的系统工具。 Flume 架构 Flume组成架构如下图所示#xff1a; A…Flume 功能 Flume主要作用就是实时读取服务器本地磁盘数据将数据写入到 HDFS。 Flume是 Cloudera提供的高可用高可靠性分布式的海量日志采集、聚合和传输的系统工具。 Flume 架构 Flume组成架构如下图所示 Agent 每个 Agent 代表着一个 JVM 进程它以事件的方式将数据从源头送至目的地。 Agent 由 3 个部分组成Source、Channel、Sink。 SourceSource是负责接收数据到Flume Agent 的组件Source 组件可以处理各种类型、各种格式的日志数据。SinkSink不断轮询Channel中的事件并且批量移除他们并将这些数据批量写入到存储或者索引系统或被发送到另一个Flume Agent。ChannelChannel是位于Source 和Sink 之间的缓冲区。因此Channel允许 Source和 Sink 有不同的运行速率。Channel 是线程安全的 可以同时处理几个 Source 的写操作和多个 Sink 的读操作。 Flume自带两种 ChannelMemory Channel 和File Channel。 Memory Channel 是一个内存队列Source将事件写入其尾部Sink从其头部读取事件。 Memory Channel 将源写入的事件存储在堆上。由于它将所有数据存储在内存中因此提供了高吞吐量。 它最适合那些不担心数据丢失的流。 它不适合涉及数据丢失的数据流。 File Channel将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据。EventFlume 传输数据的基本单元数据以 Event的形式将数据从源头送至目的地。Event由Header 和 Body 组成Header用来存放event 的属性为 K-V结构Body 用于存放数据为字节数组结构。 安装 flume 话不多说直接开始安装 flume。 前往 http://archive.apache.org/dist/flume/ 选择1.9.0将apache-flume-1.9.0-bin.tar.gz使用 sftp上传到linux的/opt/software目录下 解压apache-flume-1.9.0-bin.tar.gz到/opt/module/目录下 [loganhadoop101 hadoop]$ tar -zxf /opt/software/apache-flume-1.9.0-bin.tar.gz -C /opt/module/创建软链接 cd /opt/module ln -snf flume-1.9.0/ flume删除jar 包否则会报错 cd /opt/module/flume rm lib/guava-11.0.2.jar注意删除guava一定要配置 Hadoop 环境变量否则会报错 Caused by: java.lang.ClassNotFoundException: com.google.common.collect.Listsat java.net.URLClassLoader.findClass(URLClassLoader.java:382)at java.lang.ClassLoader.loadClass(ClassLoader.java:424)at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:349)at java.lang.ClassLoader.loadClass(ClassLoader.java:357)... 1 more修改log4j.properties配置文件 [loganhadoop101 flume]$ vim conf/log4j.properties # 注意是修改内容 flume.log.dir/opt/module/flume/logs验证 flume [loganhadoop101 flume]$ /opt/module/flume/bin/flume-ng version Flume 1.9.0 Source code repository: https://git-wip-us.apache.org/repos/asf/flume.git Revision: d4fcab4f501d41597bc616921329a4339f73585e Compiled by fszabo on Mon Dec 17 20:45:25 CET 2018 From source with checksum 35db629a3bda49d23e9b3690c80737f9Flume日志采集 配置解析 需要采集的日志文件分布在hadoop101hadoop102两台日志服务器故需要在hadoop101hadoop102上配置日志采集 Flume。Flume需要采集日志文件内容并对日志格式JSON进行校验然后将校验通过的日志发送到 Kafka。 此处选择TailDirSource和 KafkaChannel。 TailDirSource优势支持断点续传、多目录。KafkaChannel优势省去了 Sink提高了效率。日志采集 Flume关键配置 Flume日志采集配置 创建Flume 配置文件 [loganhadoop101 flume]$ pwd /opt/module/flume [loganhadoop101 flume]$ mkdir job [loganhadoop101 flume]$ vim job/file_to_kafka.conf #定义组件 a1.sources r1 a1.channels c1#配置source a1.sources.r1.type TAILDIR a1.sources.r1.filegroups f1 a1.sources.r1.filegroups.f1 /opt/module/applog/log/app.* a1.sources.r1.positionFile /opt/module/flume/taildir_position.json a1.sources.r1.interceptors i1 a1.sources.r1.interceptors.i1.type com.logan.gmall.flume.interceptor.ETLInterceptor$Builder#配置channel a1.channels.c1.type org.apache.flume.channel.kafka.KafkaChannel a1.channels.c1.kafka.bootstrap.servers hadoop101:9092 a1.channels.c1.kafka.topic topic_log a1.channels.c1.parseAsFlumeEvent false#组装 a1.sources.r1.channels c1编写拦截器 创建Maven工程flume-interceptor创建包com.logan.gmall.flume.interceptor在pom.xml文件中添加如下配置 dependenciesdependencygroupIdorg.apache.flume/groupIdartifactIdflume-ng-core/artifactIdversion1.9.0/versionscopeprovided/scope/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion1.2.62/version/dependency /dependenciesbuildpluginsplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-compiler-plugin/artifactIdversion2.4/versionconfigurationsource1.8/sourcetarget1.8/target/configuration/pluginpluginartifactIdmaven-assembly-plugin/artifactIdconfigurationdescriptorRefsdescriptorRefjar-with-dependencies/descriptorRef/descriptorRefs/configurationexecutionsexecutionidmake-assembly/idphasepackage/phasegoalsgoalsingle/goal/goals/execution/executions/plugin/plugins /build在com.logan.gmall.flume.utils包下创建JSONUtil类 package com.logan.gmall.flume.utils;import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONException;public class JSONUtil {/*** 通过异常判定是否是JSON字符串*/public static boolean isJSONValidate(String input){try {JSON.parseObject(input);return true;} catch (JSONException e) {return false;}} }在com.logan.gmall.flume.interceptor包下创建ETLInterceptor类 package com.logan.gmall.flume.interceptor;import com.logan.gmall.flume.utils.JSONUtil; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor;import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.List;public class ETLInterceptor implements Interceptor {Overridepublic void initialize() {}Overridepublic Event intercept(Event event) {//1、获取body当中的数据并转成字符串byte[] body event.getBody();String log new String(body, StandardCharsets.UTF_8);//2、判断字符串是否是一个合法的json是返回当前event不是返回nullif (JSONUtil.isJSONValidate(log)) {return event;} else {return null;}}Overridepublic ListEvent intercept(ListEvent list) {IteratorEvent iterator list.iterator();while (iterator.hasNext()){Event next iterator.next();if(intercept(next)null){iterator.remove();}}return list;}public static class Builder implements Interceptor.Builder{Overridepublic Interceptor build() {return new ETLInterceptor();}Overridepublic void configure(Context context) {}}Overridepublic void close() {} }打包 将打包文件放到hadoop101 的/opt/moduie/flume/lib文件夹下 采集测试 启动Zookeeper、Kafka集群启动 hadoop101上的日志采集Flume [loganhadoop101 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf -Dflume.root.loggerinfo,console启动一个Kafka的Console-Consumer [loganhadoop101 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop11:9092 --topic topic_log生成模拟数据 [loganhadoop101 module]$ mkdir -p /opt/module/applog/log/ [loganhadoop101 module]$ vim /opt/module/applog/log/app.2023-12-02.log {common:{ar:500000,ba:iPhone,ch:Appstore,is_new:1,md:iPhone Xs,mid:mid_552171,os:iOS 13.3.1,uid:919,vc:v2.1.134},displays:[{display_type:activity,item:1,item_type:activity_id,order:1,pos_id:5},{display_type:activity,item:2,item_type:activity_id,order:2,pos_id:5},{display_type:query,item:19,item_type:sku_id,order:3,pos_id:4},{display_type:query,item:3,item_type:sku_id,order:4,pos_id:2},{display_type:query,item:5,item_type:sku_id,order:5,pos_id:2},{display_type:promotion,item:19,item_type:sku_id,order:6,pos_id:4},{display_type:query,item:14,item_type:sku_id,order:7,pos_id:2},{display_type:query,item:9,item_type:sku_id,order:8,pos_id:2},{display_type:promotion,item:35,item_type:sku_id,order:9,pos_id:1}],page:{during_time:9853,page_id:home},ts:1672512476000} {actions:[{action_id:favor_add,item:9,item_type:sku_id,ts:1672512480386},{action_id:get_coupon,item:2,item_type:coupon_id,ts:1672512483772}],common:{ar:500000,ba:iPhone,ch:Appstore,is_new:1,md:iPhone Xs,mid:mid_552171,os:iOS 13.3.1,uid:919,vc:v2.1.134},displays:[{display_type:promotion,item:19,item_type:sku_id,order:1,pos_id:4},{display_type:promotion,item:14,item_type:sku_id,order:2,pos_id:5},{display_type:query,item:21,item_type:sku_id,order:3,pos_id:1},{display_type:query,item:11,item_type:sku_id,order:4,pos_id:2},{display_type:promotion,item:28,item_type:sku_id,order:5,pos_id:1}],page:{during_time:10158,item:9,item_type:sku_id,last_page_id:home,page_id:good_detail,source_type:promotion},ts:1672512477000}观察kafka是否有消费到数据
http://www.w-s-a.com/news/671135/

相关文章:

  • 蔡甸网站建设烟台网站建设yt
  • 最流行的网站开发新开的网页游戏平台
  • 暴富建站wordpress 标签分类
  • 搞笑网站源码百度快照替代
  • 重庆网站建设哪家公司哪家好关键词是怎么排名的
  • 青县网站建设今天国际大事新闻
  • 深圳正规网站制作哪里好怎样优化网络
  • 米拓网站建设教程dw成品网站成品视频教学
  • 用jsp做的网站源代码天门网站网站建设
  • 百度如何把网站做链接地址有没有资源可以在线观看
  • 淮安做网站找哪家好电子商务网站建设规划书的内容
  • 开发网站建设用什么框架php黄页系统
  • 聊城制作网站全球十大电商平台排名
  • 用什么来网站开发好mega menu wordpress
  • 深圳制作网站有用吗wordpress的主题
  • 网站的规划与创建天津市南开区网站开发有限公司
  • 免备案网站主机建站哪个平台好
  • python做网站 不适合单页营销分享网站
  • 珠海市研发网站建设建设网站挣钱
  • 阿里巴巴国际站特点做wps的网站赚钱
  • wordpress更换域名后网站打不开宜昌建设银行网站
  • 写出网站开发的基本流程百度网页电脑版入口
  • 网站设计有限公司怎么样网站建设西班牙语
  • 网站安全解决方案宁波seo网络推广优化价格
  • 做网站带来好处wordpress可以做oa系统吗
  • 建筑设计人才招聘网站h5营销型网站suteng
  • 做app和网站怎样如何做html网站
  • php开发手机端网站开发更换网站标题
  • 提供网站建设报价延津县建设局网站
  • 江苏网站建设流程土巴兔全包装修怎么样