当前位置: 首页 > news >正文

cms三合一网站源码浙江建设工程信息网官网入口网址

cms三合一网站源码,浙江建设工程信息网官网入口网址,sem营销是什么意思,江苏模板网站建设文章目录 一、Flume 入门概述1、概述2、Flume 基础架构2.1 Agent2.2 Source2.3 Sink2.4 Channel2.5 Event 3、Flume 安装部署3.1 安装地址3.2 安装部署 二、Flume 入门案例1、监控端口数据官方案例1.1 概述1.2 实现步骤 2、实时监控单个追加文件2.1 概述2.2 实现步骤 3、实时监… 文章目录 一、Flume 入门概述1、概述2、Flume 基础架构2.1 Agent2.2 Source2.3 Sink2.4 Channel2.5 Event 3、Flume 安装部署3.1 安装地址3.2 安装部署 二、Flume 入门案例1、监控端口数据官方案例1.1 概述1.2 实现步骤 2、实时监控单个追加文件2.1 概述2.2 实现步骤 3、实时监控目录下多个新文件3.1 概述3.2 实现步骤 4、实时监控目录下的多个追加文件4.1 概述4.2 实现步骤4.3 Taildir 问题说明 5、Kafka相关6、Kafka群起脚本 三、Flume 进阶1、Flume 事务2、Flume Agent 内部原理3、Flume 拓扑结构3.1 简单串联3.2 复制和多路复用3.3 负载均衡和故障转移3.4 聚合 4、Flume 企业开发案例4.1 复制和多路复用4.2 负载均衡和故障转移4.3 聚合 5、自定义 Interceptor5.1 概述5.2 官网实现5.3 代码实现 6、自定义 Source6.1 概述6.2 需求与分析6.3 编码实现 7、自定义 Sink7.1 概述7.2 需求7.3 编码实现 8、Flume 数据流监控8.1 Ganglia 的安装与部署8.2 操作 Flume 测试监控 四、企业真实面试题重点1、Flume 的 SourceSinkChannel 的作用你们 Source 是什么类型2、Flume 的 Channel Selectors3、Flume 参数调优4、Flume 的事务机制5、Flume 采集数据会丢失吗? 一、Flume 入门概述 1、概述 Flume 是Cloudera 提供的一个高可用的高可靠的分布式的海量日志采集、聚合和传输的系统。Flume 基于流式架构灵活简单。Flume最主要的作用就是实时读取服务器本地磁盘的数据(或者网络端口数据)将数据写入到HDFS 2、Flume 基础架构 2.1 Agent Agent 是一个 JVM 进程它以事件的形式将数据从源头送至目的。Agent 主要有 3 个部分组成Source、Channel、Sink 2.2 Source Source 是负责接收数据到 Flume Agent 的组件。Source 组件可以处理各种类型、各种格式的日志数据包括 avro、thrift、exec、jms、spooling directory、netcat、taildir、 sequence generator、syslog、http、legacy 2.3 Sink Sink 不断地轮询 Channel 中的事件且批量地移除它们并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、HBase、solr、自定义 2.4 Channel Channel 是位于 Source 和 Sink 之间的缓冲区。因此Channel 允许 Source 和 Sink 运作在不同的速率上。Channel 是线程安全的可以同时处理几个 Source 的写入操作和几个 Sink 的读取操作。Flume 自带两种 ChannelMemory Channel 和 File Channel。 Memory Channel 是内存中的队列。Memory Channel 在不需要关心数据丢失的情景下适用。如果需要关心数据丢失那么 Memory Channel 就不应该使用因为程序死亡、机器宕机或者重启都会导致数据丢失。File Channel 将所有事件写到磁盘。因此在程序关闭或机器宕机的情况下不会丢失数据 2.5 Event 传输单元Flume 数据传输的基本单元以 Event 的形式将数据从源头送至目的地。 Event 由 Header 和 Body 两部分组成Header 用来存放该 event 的一些属性为K-V 结构 Body 用来存放该条数据形式为字节数组 3、Flume 安装部署 3.1 安装地址 Flume 官网地址http://flume.apache.org/文档查看地址https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html下载地址http://archive.apache.org/dist/flume/ 3.2 安装部署 # 首先已经搭建好hadoop和jdk了可以参考之前的hadoop笔记 wget http://archive.apache.org/dist/flume/1.9.0/apache-flume-1.9.0-bin.tar.gz # 解压 apache-flume-1.9.0-bin.tar.gz 到/opt/module/目录下 tar -zxf apache-flume-1.9.0-bin.tar.gz -C /opt/module/ # 修改 apache-flume-1.9.0-bin 的名称为 flume mv /opt/module/apache-flume-1.9.0-bin /opt/module/flume # 将 lib 文件夹下的guava-11.0.2.jar 删除以兼容 Hadoop 3.1.3 rm /opt/module/flume/lib/guava-11.0.2.jar 二、Flume 入门案例 1、监控端口数据官方案例 1.1 概述 使用 Flume 监听一个端口收集该端口数据并打印到控制台。 首先通过netcat工具向本机的44444端口发送数据Flume监控本机的44444端口通过Flume的source端读取数据最后Flume将获取的数据通过Sink端写出到控制台(测试命令使用nc localhost 44444) 1.2 实现步骤 # 安装 netcat 工具 sudo yum install -y nc # nc -lk 9999 # nc local 9999 这样就可以聊天 # 判断 44444 端口是否被占用 sudo netstat -nlp | grep 44444 # 创建 Flume Agent 配置文件 flume-netcat-logger.conf # 在 flume 目录下创建 job 文件夹并进入 job 文件夹 mkdir job cd job/cd .. # 在 job 文件夹下创建 Flume Agent 配置文件 flume-netcat-logger.conf vim flume-netcat-logger.conf # 添加内容如下 # Name the components on this agent a1.sources r1 a1.sinks k1 a1.channels c1 # Describe/configure the source a1.sources.r1.type netcat a1.sources.r1.bind localhost a1.sources.r1.port 44444 # Describe the sink a1.sinks.k1.type logger # Use a channel which buffers events in memory a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100 # Bind the source and sink to the channel # 一个sink只能一个channel,一个source可以多个channel a1.sources.r1.channels c1 a1.sinks.k1.channel c1# 先开启 flume 监听端口 # 第一种写法 bin/flume-ng agent --conf conf/ --name a1 --conf-file job/flume-netcat-logger.conf -Dflume.root.loggerINFO,console # 第二种写法 bin/flume-ng agent -c conf/ -n a1 -f job/flume-netcat-logger.conf -Dflume.root.loggerINFO,console # --conf/-c表示配置文件存储在 conf/目录 # --name/-n表示给 agent 起名为 a1 # --conf-file/-fflume 本次启动读取的配置文件是在 job 文件夹下的 flume-telnet.conf文件。 # -Dflume.root.loggerINFO,console -D 表示 flume 运行时动态修改 flume.root.logger参数属性值并将控制台日志打印级别设置为 INFO 级别。日志级别包括:log、info、warn、 error# 使用 netcat 工具向本机的 44444 端口发送内容 nc localhost 44444# 12/06/19 15:32:19 INFO source.NetcatSource: Source starting # 12/06/19 15:32:19 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444] # 12/06/19 15:32:34 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D Hello world!. } 2、实时监控单个追加文件 2.1 概述 案例需求实时监控 Hive 日志并上传到 HDFS 中 2.2 实现步骤 # Flume 要想将数据输出到 HDFS依赖 Hadoop 相关 jar 包 # 检查/etc/profile.d/my_env.sh 文件确认 Hadoop 和 Java 环境变量配置正确 JAVA_HOME/opt/module/jdk1.8.0_212 HADOOP_HOME/opt/module/ha/hadoop-3.1.3 PATH$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin export PATH JAVA_HOME HADOOP_HOME# 去job目录创建 flume-file-hdfs.conf 文件 vim flume-file-hdfs.conf # 要想读取 Linux 系统中的文件就得按照 Linux 命令的规则执行命令。由于 Hive日志在 Linux 系统中所以读取文件的类型选择exec 即 execute 执行的意思。表示执行Linux 命令来读取文件 # Name the components on this agent a2.sources r2 a2.sinks k2 a2.channels c2 # Describe/configure the source a2.sources.r2.type exec a2.sources.r2.command tail -F /opt/module/hive/logs/hive.log # Describe the sink a2.sinks.k2.type hdfs # 去core-site.xml查看ha集群的话改成hdfs://mycluster a2.sinks.k2.hdfs.path hdfs://hadoop102:9870/flume/%Y%m%d/%H #上传文件的前缀 a2.sinks.k2.hdfs.filePrefix logs- #是否按照时间滚动文件夹 a2.sinks.k2.hdfs.round true #多少时间单位创建一个新的文件夹 a2.sinks.k2.hdfs.roundValue 1 #重新定义时间单位 a2.sinks.k2.hdfs.roundUnit hour #是否使用本地时间戳 a2.sinks.k2.hdfs.useLocalTimeStamp true #积攒多少个 Event 才 flush 到 HDFS 一次 a2.sinks.k2.hdfs.batchSize 100 #设置文件类型可支持压缩 a2.sinks.k2.hdfs.fileType DataStream #多久生成一个新的文件,单位s,一般配置3600 a2.sinks.k2.hdfs.rollInterval 60 #设置每个文件的滚动大小 a2.sinks.k2.hdfs.rollSize 134217700 #文件的滚动与 Event 数量无关 a2.sinks.k2.hdfs.rollCount 0 # Use a channel which buffers events in memory a2.channels.c2.type memory a2.channels.c2.capacity 1000 a2.channels.c2.transactionCapacity 100 # Bind the source and sink to the channel a2.sources.r2.channels c2 a2.sinks.k2.channel c2 #注意对于所有与时间相关的转义序列Event Header 中必须存在以 “timestamp”的key除非 hdfs.useLocalTimeStamp 设置为 true此方法会使用 TimestampInterceptor 自动添加 timestamp# 运行 Flume bin/flume-ng agent --conf conf/ --name a2 --conf-file job/flume-file-hdfs.conf # 开启 Hadoop 和 Hive 并操作 Hive 产生日志去hive目录有数据才会生成新的文件 bin/hive 3、实时监控目录下多个新文件 3.1 概述 案例需求使用 Flume 监听整个目录的文件并上传至 HDFS 3.2 实现步骤 vim flume-dir-hdfs.conf a3.sources r3 a3.sinks k3 a3.channels c3 # Describe/configure the source a3.sources.r3.type spooldir a3.sources.r3.spoolDir /opt/module/flume/upload a3.sources.r3.fileSuffix .COMPLETED a3.sources.r3.fileHeader true #忽略所有以.tmp 结尾的文件不上传 a3.sources.r3.ignorePattern ([^ ]*\.tmp) # Describe the sink a3.sinks.k3.type hdfs a3.sinks.k3.hdfs.path hdfs://mycluster/flume/upload/%Y%m%d/%H #上传文件的前缀 a3.sinks.k3.hdfs.filePrefix upload- #是否按照时间滚动文件夹 a3.sinks.k3.hdfs.round true #多少时间单位创建一个新的文件夹 a3.sinks.k3.hdfs.roundValue 1 #重新定义时间单位 a3.sinks.k3.hdfs.roundUnit hour #是否使用本地时间戳 a3.sinks.k3.hdfs.useLocalTimeStamp true #积攒多少个 Event 才 flush 到 HDFS 一次 a3.sinks.k3.hdfs.batchSize 100 #设置文件类型可支持压缩 a3.sinks.k3.hdfs.fileType DataStream #多久生成一个新的文件 a3.sinks.k3.hdfs.rollInterval 60 #设置每个文件的滚动大小大概是 128M a3.sinks.k3.hdfs.rollSize 134217700 #文件的滚动与 Event 数量无关 a3.sinks.k3.hdfs.rollCount 0 # Use a channel which buffers events in memory a3.channels.c3.type memory a3.channels.c3.capacity 1000 a3.channels.c3.transactionCapacity 100 # Bind the source and sink to the channel a3.sources.r3.channels c3 a3.sinks.k3.channel c3# 启动监控文件夹命令 bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-dir-hdfs.conf # 说明在使用 Spooling Directory Source 时不要在监控目录中创建并持续修改文件上传完成的文件会以.COMPLETED 结尾被监控文件夹每 500 毫秒扫描一次文件变动# 向 upload 文件夹中添加文件,在/opt/module/flume 目录下创建 upload 文件夹 mkdir upload # 不能上传同名的否则会挂掉 touch atguigu.txt 4、实时监控目录下的多个追加文件 4.1 概述 Exec source 适用于监控一个实时追加的文件不能实现断点续传Spooldir Source 适合用于同步新文件但不适合对实时追加日志的文件进行监听并同步而 Taildir Source适合用于监听多个实时追加的文件并且能够实现断点续传 案例需求:使用 Flume 监听整个目录的实时追加文件并上传至 HDFS 4.2 实现步骤 vim flume-taildir-hdfs.conf a3.sources r3 a3.sinks k3 a3.channels c3 # Describe/configure the source a3.sources.r3.type TAILDIR # 记录JSON格式的文件记录每个尾文件的inode、绝对路径和最后位置 a3.sources.r3.positionFile /opt/module/flume/tail_dir.json a3.sources.r3.filegroups f1 f2 a3.sources.r3.filegroups.f1 /opt/module/flume/files/.*file.* a3.sources.r3.filegroups.f2 /opt/module/flume/files2/.*log.* # Describe the sink a3.sinks.k3.type hdfs a3.sinks.k3.hdfs.path hdfs://mycluster/flume/upload2/%Y%m%d/%H #上传文件的前缀 a3.sinks.k3.hdfs.filePrefix upload- #是否按照时间滚动文件夹 a3.sinks.k3.hdfs.round true #多少时间单位创建一个新的文件夹 a3.sinks.k3.hdfs.roundValue 1 #重新定义时间单位 a3.sinks.k3.hdfs.roundUnit hour #是否使用本地时间戳 a3.sinks.k3.hdfs.useLocalTimeStamp true #积攒多少个 Event 才 flush 到 HDFS 一次 a3.sinks.k3.hdfs.batchSize 100 #设置文件类型可支持压缩还有gzip等 a3.sinks.k3.hdfs.fileType DataStream #多久生成一个新的文件 a3.sinks.k3.hdfs.rollInterval 60 #设置每个文件的滚动大小大概是 128M a3.sinks.k3.hdfs.rollSize 134217700 #文件的滚动与 Event 数量无关 a3.sinks.k3.hdfs.rollCount 0 # Use a channel which buffers events in memory a3.channels.c3.type memory a3.channels.c3.capacity 1000 a3.channels.c3.transactionCapacity 100 # Bind the source and sink to the channel a3.sources.r3.channels c3 a3.sinks.k3.channel c3# 启动监控文件夹命令 bin/flume-ng agent --conf conf/ --name a3 --conf-file job/flume-taildir-hdfs.conf # 在/opt/module/flume 目录下创建 files 文件夹 mkdir files{1,2} echo hello file1.txt 4.3 Taildir 问题说明 Taildir Source 维护了一个 json 格式的 position File其会定期的往 position File中更新每个文件读取到的最新的位置因此能够实现断点续传。Linux 中储存文件元数据的区域就叫做 inode每个 inode 都有一个号码操作系统用 inode 号码来识别不同的文件。 但是例如log4j的日志是每过凌晨自动更名为新的文件这会导致数据的重复上传若后端不配合可以修改源码flume-ng-sources→flume-taildir-source源码包ReliableTailEventReader读数据TailFile的更新数据将更新和读取仅按照inode来。修改完成后打包去lib文件夹替换掉原来的的jar包 5、Kafka相关 kafka相关文档https://flume.apache.org/releases/content/1.11.0/FlumeUserGuide.html#kafka-channel flume一般都和kafka配合使用用于离线和实时数仓的数据获取kafka source相当于kafka的消费者channel数据会存储到kafka topic中而kafka sink相当于生产者 进入flume软件目录编写配置文件vim job/file_to_kafka.conf #定义组件 a1.sources r1 a1.channels c1#配置source a1.sources.r1.type TAILDIR a1.sources.r1.filegroups f1 a1.sources.r1.filegroups.f1 /opt/module/applog/log/app.* a1.sources.r1.positionFile /opt/module/flume/taildir_position.json # 自定义拦截器这里是将不合格的json数据过滤 # a1.sources.r1.interceptors i1 # a1.sources.r1.interceptors.i1.type com.atguigu.gmall.flume.interceptor.ETLInterceptor$Builder#配置channel a1.channels.c1.type org.apache.flume.channel.kafka.KafkaChannel a1.channels.c1.kafka.bootstrap.servers hadoop102:9092,hadoop103:9092 a1.channels.c1.kafka.topic topic_log # 不以flume event存储 a1.channels.c1.parseAsFlumeEvent false#组装 a1.sources.r1.channels c1测试 # 启动flume bin/flume-ng agent -n a1 -c conf/ -f job/file_to_kafka.conf -Dflume.root.loggerinfo,console# 启动一个Kafka的Console-Consumer bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_log 同理还有kafka到hdfs的flume配置文件 #定义组件 a1.sourcesr1 a1.channelsc1 a1.sinksk1#配置source1 a1.sources.r1.type org.apache.flume.source.kafka.KafkaSource a1.sources.r1.batchSize 5000 a1.sources.r1.batchDurationMillis 2000 a1.sources.r1.kafka.bootstrap.servers hadoop102:9092,hadoop103:9092,hadoop104:9092 a1.sources.r1.kafka.topicstopic_log # a1.sources.r1.interceptors i1 # a1.sources.r1.interceptors.i1.type com.atguigu.interceptor.TimestampInterceptor$Builder#配置channel a1.channels.c1.type file a1.channels.c1.checkpointDir /opt/module/flume/checkpoint/behavior1 a1.channels.c1.dataDirs /opt/module/flume/data/behavior1 a1.channels.c1.maxFileSize 2146435071 a1.channels.c1.capacity 1000000 a1.channels.c1.keep-alive 6#配置sink a1.sinks.k1.type hdfs a1.sinks.k1.hdfs.path /origin_data/gmall/log/topic_log/%Y-%m-%d a1.sinks.k1.hdfs.filePrefix log a1.sinks.k1.hdfs.round falsea1.sinks.k1.hdfs.rollInterval 10 a1.sinks.k1.hdfs.rollSize 134217728 a1.sinks.k1.hdfs.rollCount 0#控制输出文件类型 a1.sinks.k1.hdfs.fileType CompressedStream a1.sinks.k1.hdfs.codeC gzip#组装 a1.sources.r1.channels c1 a1.sinks.k1.channel c1 6、Kafka群起脚本 #!/bin/bashcase $1 in start){for i in hadoop102 hadoop103doecho --------启动 $i 采集flume-------ssh $i nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf/ -f /opt/module/flume/job/file_to_kafka.conf /dev/null 21 done };; stop){for i in hadoop102 hadoop103doecho --------停止 $i 采集flume-------# $2默认获取脚本第二个参数加个反斜杠是获取数据中的第二个参数ssh $i ps -ef | grep file_to_kafka | grep -v grep |awk {print \$2} | xargs -n1 kill -9 done};; esac三、Flume 进阶 1、Flume 事务 下面时commit和rollback的核心源码回滚的时候putList会直接清空而takeList会将数据重新塞回到channel中(sink的hdfs写成功但通讯失败可能重复消费source的nc可能会消息丢失)doCommit会提前判断channel够不够takeList回滚以防回滚失败 Override protected void doCommit() throws InterruptedException {int remainingChange takeList.size() - putList.size();if (remainingChange 0) {if (!bytesRemaining.tryAcquire(putByteCounter, keepAlive, TimeUnit.SECONDS)) {throw new ChannelException(Cannot commit transaction. Byte capacity allocated to store event body byteCapacity * byteCapacitySlotSize reached. Please increase heap space/byte capacity allocated to the channel as the sinks may not be keeping up with the sources);}if (!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {bytesRemaining.release(putByteCounter);throw new ChannelFullException(Space for commit to queue couldnt be acquired. Sinks are likely not keeping up with sources, or the buffer size is too tight);}}int puts putList.size();int takes takeList.size();synchronized (queueLock) {if (puts 0) {while (!putList.isEmpty()) {if (!queue.offer(putList.removeFirst())) {throw new RuntimeException(Queue add failed, this shouldnt be able to happen);}}}putList.clear();takeList.clear();}bytesRemaining.release(takeByteCounter);takeByteCounter 0;putByteCounter 0;queueStored.release(puts);if (remainingChange 0) {queueRemaining.release(remainingChange);}if (puts 0) {channelCounter.addToEventPutSuccessCount(puts);}if (takes 0) {channelCounter.addToEventTakeSuccessCount(takes);}channelCounter.setChannelSize(queue.size()); }Override protected void doRollback() {int takes takeList.size();synchronized (queueLock) {Preconditions.checkState(queue.remainingCapacity() takeList.size(),Not enough space in memory channel queue to rollback takes. This should never happen, please report);while (!takeList.isEmpty()) {queue.addFirst(takeList.removeLast());}putList.clear();}putByteCounter 0;takeByteCounter 0;queueStored.release(takes);channelCounter.setChannelSize(queue.size()); }2、Flume Agent 内部原理 ChannelSelector ChannelSelector 的作用就是选出 Event 将要被发往哪个 Channel。其共有两种类型分别是 **Replicating复制**和 Multiplexing多路复用 ReplicatingSelector 会将同一个 Event 发往所有的 ChannelMultiplexing 会根据相应的原则将不同的 Event 发往不同的 Channel SinkProcessor SinkProcessor 共 有 三 种 类 型 分 别 是 DefaultSinkProcessor 、LoadBalancingSinkProcessor 和 FailoverSinkProcessor DefaultSinkProcessor 对应的是单个的 Sink LoadBalancingSinkProcessor 和 FailoverSinkProcessor 对应的是 Sink GroupLoadBalancingSinkProcessor 可以实现负载均衡的功能FailoverSinkProcessor 可以错误恢复的功能 3、Flume 拓扑结构 3.1 简单串联 这种模式是将多个 flume 顺序连接起来了从最初的 source 开始到最终 sink 传送的目的存储系统。此模式不建议桥接过多的 flume 数量 flume 数量过多不仅会影响传输速率而且一旦传输过程中某个节点 flume 宕机会影响整个传输系统。 3.2 复制和多路复用 Flume 支持将事件流向一个或者多个目的地。这种模式可以将相同数据复制到多个channel 中或者将不同数据分发到不同的 channel 中sink 可以选择传送到不同的目的地 3.3 负载均衡和故障转移 Flume支持使用将多个sink逻辑上分到一个sink组sink组配合不同的SinkProcessor可以实现负载均衡和错误恢复的功能 3.4 聚合 这种模式是我们最常见的也非常实用日常 web 应用通常分布在上百个服务器大者甚至上千个、上万个服务器。产生的日志处理起来也非常麻烦。用 flume 的这种组合方式能很好的解决这一问题每台服务器部署一个 flume 采集日志传送到一个集中收集日志的flume再由此 flume 上传到 hdfs、hive、hbase 等进行日志分析 4、Flume 企业开发案例 4.1 复制和多路复用 使用 Flume-1 监控文件变动Flume-1 将变动内容传递给 Flume-2Flume-2 负责存储到 HDFS。同时 Flume-1 将变动内容传递给 Flume-3Flume-3 负责输出到 LocalFileSystem # https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#avro-sink # 在/opt/module/flume/job 目录下创建 group1 文件夹并进入 # 在/opt/module/datas/目录下创建 flume3 文件夹并进入# group1下创建 flume-file-flume.conf # 配置 1 个接收日志文件的 source 和两个 channel、两个 sink分别输送给 flume-flumehdfs 和 flume-flume-dir vim flume-file-flume.conf # Name the components on this agent a1.sources r1 a1.sinks k1 k2 a1.channels c1 c2 # 将数据流复制给所有 channel默认不写就是复制 a1.sources.r1.selector.type replicating # Describe/configure the source a1.sources.r1.type exec a1.sources.r1.command tail -F /opt/module/hive/logs/hive.log a1.sources.r1.shell /bin/bash -c # Describe the sink # sink 端的 avro 是一个数据发送者 a1.sinks.k1.type avro a1.sinks.k1.hostname hadoop102 a1.sinks.k1.port 4141 a1.sinks.k2.type avro a1.sinks.k2.hostname hadoop102 a1.sinks.k2.port 4142 # Describe the channel a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100 a1.channels.c2.type memory a1.channels.c2.capacity 1000 a1.channels.c2.transactionCapacity 100 # Bind the source and sink to the channel a1.sources.r1.channels c1 c2 a1.sinks.k1.channel c1 a1.sinks.k2.channel c2# 创建 flume-flume-hdfs.conf配置上级 Flume 输出的 Source输出是到 HDFS 的 Sink vim flume-flume-hdfs.conf # Name the components on this agent a2.sources r1 a2.sinks k1 a2.channels c1 # Describe/configure the source # source 端的 avro 是一个数据接收服务 a2.sources.r1.type avro a2.sources.r1.bind hadoop102 a2.sources.r1.port 4141 # Describe the sink a2.sinks.k1.type hdfs a2.sinks.k1.hdfs.path hdfs://mycluster/flume2/%Y%m%d/%H #上传文件的前缀 a2.sinks.k1.hdfs.filePrefix flume2- #是否按照时间滚动文件夹 a2.sinks.k1.hdfs.round true #多少时间单位创建一个新的文件夹 a2.sinks.k1.hdfs.roundValue 1 #重新定义时间单位 a2.sinks.k1.hdfs.roundUnit hour #是否使用本地时间戳 a2.sinks.k1.hdfs.useLocalTimeStamp true #积攒多少个 Event 才 flush 到 HDFS 一次 a2.sinks.k1.hdfs.batchSize 100 #设置文件类型可支持压缩 a2.sinks.k1.hdfs.fileType DataStream #多久生成一个新的文件 a2.sinks.k1.hdfs.rollInterval 30 #设置每个文件的滚动大小大概是 128M a2.sinks.k1.hdfs.rollSize 134217700 #文件的滚动与 Event 数量无关 a2.sinks.k1.hdfs.rollCount 0 # Describe the channel a2.channels.c1.type memory a2.channels.c1.capacity 1000 a2.channels.c1.transactionCapacity 100 # Bind the source and sink to the channel a2.sources.r1.channels c1 a2.sinks.k1.channel c1# 创建 flume-flume-dir.conf,配置上级 Flume 输出的 Source输出是到本地目录的 Sink vim flume-flume-dir.conf # Name the components on this agent a3.sources r1 a3.sinks k1 a3.channels c2 # Describe/configure the source a3.sources.r1.type avro a3.sources.r1.bind hadoop102 a3.sources.r1.port 4142 # Describe the sink a3.sinks.k1.type file_roll a3.sinks.k1.sink.directory /opt/module/data/flume3 # Describe the channel a3.channels.c2.type memory a3.channels.c2.capacity 1000 a3.channels.c2.transactionCapacity 100 # Bind the source and sink to the channel a3.sources.r1.channels c2 a3.sinks.k1.channel c2# 注意输出的本地目录必须是已经存在的目录如果该目录不存在并不会创建新的目录执行配置文件并检查 # 分别启动对应的 flume 进程flume-flume-dirflume-flume-hdfsflume-file-flume # 有先后关系bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group1/flume-flume-dir.conf bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group1/flume-flume-hdfs.conf bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group1/flume-file-flume.conf 4.2 负载均衡和故障转移 使用 Flume1 监控一个端口其 sink 组中的 sink 分别对接 Flume2 和 Flume3采用FailoverSinkProcessor实现故障转移的功能 # https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#flume-sink-processors # 在/opt/module/flume/job 目录下创建 group2 文件夹 # 配置 1 个 netcat source 和 1 个 channel、1 个 sink group2 个 sink分别输送给flume-flume-console1 和 flume-flume-console2 # 创建 flume-netcat-flume.conf vim flume-netcat-flume.conf# Name the components on this agent a1.sources r1 a1.channels c1 a1.sinkgroups g1 a1.sinks k1 k2 # Describe/configure the source a1.sources.r1.type netcat a1.sources.r1.bind localhost a1.sources.r1.port 44444a1.sinkgroups.g1.processor.type failover a1.sinkgroups.g1.processor.priority.k1 5 a1.sinkgroups.g1.processor.priority.k2 10 a1.sinkgroups.g1.processor.maxpenalty 10000 # Describe the sink a1.sinks.k1.type avro a1.sinks.k1.hostname hadoop102 a1.sinks.k1.port 4141 a1.sinks.k2.type avro a1.sinks.k2.hostname hadoop102 a1.sinks.k2.port 4142 # Describe the channel a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100 # Bind the source and sink to the channel a1.sources.r1.channels c1 a1.sinkgroups.g1.sinks k1 k2 a1.sinks.k1.channel c1 a1.sinks.k2.channel c1# 创建 flume-flume-console1.conf,配置上级 Flume 输出的 Source输出是到本地控制台 vim flume-flume-console1.conf # Name the components on this agent a2.sources r1 a2.sinks k1 a2.channels c1 # Describe/configure the source a2.sources.r1.type avro a2.sources.r1.bind hadoop102 a2.sources.r1.port 4141 # Describe the sink a2.sinks.k1.type logger # Describe the channel a2.channels.c1.type memory a2.channels.c1.capacity 1000 a2.channels.c1.transactionCapacity 100 # Bind the source and sink to the channel a2.sources.r1.channels c1 a2.sinks.k1.channel c1# 创建 flume-flume-console2.conf,配置上级 Flume 输出的 Source输出是到本地控制台 vim flume-flume-console2.conf # Name the components on this agent a3.sources r1 a3.sinks k1 a3.channels c2 # Describe/configure the source a3.sources.r1.type avro a3.sources.r1.bind hadoop102 a3.sources.r1.port 4142 # Describe the sink a3.sinks.k1.type logger # Describe the channel a3.channels.c2.type memory a3.channels.c2.capacity 1000 a3.channels.c2.transactionCapacity 100 # Bind the source and sink to the channel a3.sources.r1.channels c2 a3.sinks.k1.channel c2# 分别开启对应配置文件flume-flume-console2flume-flume-console1flumenetcat-flume bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group2/flume-flume-console2.conf -Dflume.root.loggerINFO,console bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group2/flume-flume-console1.conf -Dflume.root.loggerINFO,console bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group2/flume-netcat-flume.conf# 使用 netcat 工具向本机的 44444 端口发送内容 nc localhost 44444 # 使用 jps -ml 查看 Flume 进程如果要换成负载均衡只需要修改第一个文件 # 如果要改成负载均衡就变成load_balance a1.sinkgroups g1 a1.sinkgroups.g1.sinks k1 k2 a1.sinkgroups.g1.processor.type load_balance # 开启退避 a1.sinkgroups.g1.processor.backoff true # 默认 round_robin随机轮询是sink拉取轮询不是推轮询 a1.sinkgroups.g1.processor.selector random # 退避最大的时间防止一直退避下去 a1.sinkgroups.g1.processor.selector.maxTimeOut 300004.3 聚合 hadoop102 上的 Flume-1 监控文件/opt/module/group.loghadoop103 上的 Flume-2 监控某一个端口的数据流Flume-1 与 Flume-2 将数据发送给 hadoop104 上的 Flume-3Flume-3 将最终数据打印到控制台 # 分发 Flume进入module模块分发 xsync flume # 在 hadoop102、hadoop103 以及 hadoop104 的/opt/module/flume/job 目录下创建一个group3 文件夹 mkdir group3 # 创建 flume1-logger-flume.conf配置 Source 用于监控 hive.log 文件配置 Sink 输出数据到下一级 Flume # 在 hadoop102 上编辑配置文件 vim flume1-logger-flume.conf # Name the components on this agent a1.sources r1 a1.sinks k1 a1.channels c1 # Describe/configure the source a1.sources.r1.type exec a1.sources.r1.command tail -F /opt/module/group.log a1.sources.r1.shell /bin/bash -c # Describe the sink a1.sinks.k1.type avro a1.sinks.k1.hostname hadoop104 a1.sinks.k1.port 4141 # Describe the channel a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100 # Bind the source and sink to the channel a1.sources.r1.channels c1 a1.sinks.k1.channel c1# 创建 flume2-netcat-flume.conf,配置 Source 监控端口 44444 数据流配置 Sink 数据到下一级 Flume # 在 hadoop103 上编辑配置文件 vim flume2-netcat-flume.conf # Name the components on this agent a2.sources r1 a2.sinks k1 a2.channels c1 # Describe/configure the source a2.sources.r1.type netcat a2.sources.r1.bind hadoop103 a2.sources.r1.port 44444 # Describe the sink a2.sinks.k1.type avro a2.sinks.k1.hostname hadoop104 a2.sinks.k1.port 4141 # Use a channel which buffers events in memory a2.channels.c1.type memory a2.channels.c1.capacity 1000 a2.channels.c1.transactionCapacity 100 # Bind the source and sink to the channel a2.sources.r1.channels c1 a2.sinks.k1.channel c1# 创建 flume3-flume-logger.conf,配置 source 用于接收 flume1 与 flume2 发送过来的数据流最终合并后 sink 到控制台 vim flume3-flume-logger.conf # Name the components on this agent a3.sources r1 a3.sinks k1 a3.channels c1 # Describe/configure the source a3.sources.r1.type avro a3.sources.r1.bind hadoop104 a3.sources.r1.port 4141 # Describe the sink # Describe the sink a3.sinks.k1.type logger # Describe the channel a3.channels.c1.type memory a3.channels.c1.capacity 1000 a3.channels.c1.transactionCapacity 100 # Bind the source and sink to the channel a3.sources.r1.channels c1 a3.sinks.k1.channel c1# 分别开启对应配置文件flume3-flume-logger.confflume2-netcat-flume.confflume1-logger-flume.conf bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group3/flume3-flume-logger.conf -Dflume.root.loggerINFO,console bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group3/flume1-logger-flume.conf bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group3/flume2-netcat-flume.conf# 在 hadoop103 上向/opt/module 目录下的 group.log 追加内容 echo hello group.log # 在 hadoop102 上向 44444 端口发送数据 telnet hadoop102 44444 5、自定义 Interceptor 5.1 概述 使用 Flume 采集服务器本地日志需要按照日志类型的不同将不同种类的日志发往不同的分析系统 在实际的开发中一台服务器产生的日志类型可能有很多种不同类型的日志可能需要发送到不同的分析系统。此时会用到 Flume 拓扑结构中的 Multiplexing 结构Multiplexing的原理是根据 event 中 Header 的某个 key 的值将不同的 event 发送到不同的 Channel中所以我们需要自定义一个 Interceptor为不同类型的 event 的 Header 中的 key 赋予不同的值。 在该案例中我们以端口数据模拟日志以是否包含atguigu模拟不同类型的日志我们需要自定义 interceptor 区分数据中是否包含atguigu将其分别发往不同的分析系统Channel 5.2 官网实现 https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#multiplexing-channel-selector # 匹配header为state的值,一般需要我们实现拦截器实现多路复用只有在source可以使用 a1.sources r1 a1.channels c1 c2 c3 c4 a1.sources.r1.selector.type multiplexing a1.sources.r1.selector.header state a1.sources.r1.selector.mapping.CZ c1 a1.sources.r1.selector.mapping.US c2 c3 a1.sources.r1.selector.default c45.3 代码实现 创建一个 maven 项目并引入以下依赖 dependencygroupIdorg.apache.flume/groupIdartifactIdflume-ng-core/artifactIdversion1.9.0/version /dependency定义 com.atguigu.interceptor.TypeInterceptor.CustomInterceptor 类并实现 Interceptor 接口 public class TypeInterceptor implements Interceptor {//声明一个存放事件的集合private ListEvent addHeaderEvents;Overridepublic void initialize() {//初始化存放事件的集合addHeaderEvents new ArrayList();}//单个事件拦截Overridepublic Event intercept(Event event) {//1.获取事件中的头信息MapString, String headers event.getHeaders();//2.获取事件中的 body 信息String body new String(event.getBody());//3.根据 body 中是否有atguigu来决定添加怎样的头信息if (body.contains(atguigu)) {//4.添加头信息headers.put(type, first);} else {//4.添加头信息headers.put(type, second);}return event;}//批量事件拦截Overridepublic ListEvent intercept(ListEvent events) {//1.清空集合addHeaderEvents.clear();//2.遍历 eventsfor (Event event : events) {//3.给每一个事件添加头信息addHeaderEvents.add(intercept(event));}//4.返回结果return addHeaderEvents;}Overridepublic void close() {}public static class Builder implements Interceptor.Builder {Overridepublic Interceptor build() {return new TypeInterceptor();}Overridepublic void configure(Context context) {}} }打包放入flume/lib目录下启动时会自动通过反射扫描包 然后新建job/group4编辑 flume 配置文件为 hadoop102 上的 Flume1 配置 1 个 netcat source1 个 sink group2 个 avro sink并配置相应的 ChannelSelector 和 interceptor # https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#flume-interceptors # Name the components on this agent.conf a1.sources r1 a1.sinks k1 k2 a1.channels c1 c2 # Describe/configure the source a1.sources.r1.type netcat a1.sources.r1.bind localhost a1.sources.r1.port 44444 a1.sources.r1.interceptors i1 a1.sources.r1.interceptors.i1.type com.atguigu.interceptor.TypeInterceptor$Builder a1.sources.r1.selector.type multiplexing a1.sources.r1.selector.header type a1.sources.r1.selector.mapping.first c1 a1.sources.r1.selector.mapping.second c2 # Describe the sink a1.sinks.k1.type avro a1.sinks.k1.hostname hadoop103 a1.sinks.k1.port 4141 a1.sinks.k2.typeavro a1.sinks.k2.hostname hadoop104 a1.sinks.k2.port 4242 # Use a channel which buffers events in memory a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100 # Use a channel which buffers events in memory a1.channels.c2.type memory a1.channels.c2.capacity 1000 a1.channels.c2.transactionCapacity 100 # Bind the source and sink to the channel a1.sources.r1.channels c1 c2 a1.sinks.k1.channel c1 a1.sinks.k2.channel c2为 hadoop103 上的 Flume4 配置一个 avro source 和一个 logger sink a1.sources r1 a1.sinks k1 a1.channels c1 a1.sources.r1.type avro a1.sources.r1.bind hadoop103 a1.sources.r1.port 4141 a1.sinks.k1.type logger a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100 a1.sinks.k1.channel c1 a1.sources.r1.channels c1为 hadoop104 上的 Flume3 配置一个 avro source 和一个 logger sink a1.sources r1 a1.sinks k1 a1.channels c1 a1.sources.r1.type avro a1.sources.r1.bind hadoop104 a1.sources.r1.port 4242 a1.sinks.k1.type logger a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100 a1.sinks.k1.channel c1 a1.sources.r1.channels c1启动先启动103104最后启动102 # hadoop103 bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume2.conf -Dflume.root.loggerINFO,console # hadoop104 bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume3.conf -Dflume.root.loggerINFO,console bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume1.conf 6、自定义 Source 6.1 概述 官网给出的source:https://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#flume-sources Source 是负责接收数据到 Flume Agent 的组件。Source 组件可以处理各种类型、各种格式的日志数据包括 avro、thrift、exec、jms、spooling directory、netcat、sequencegenerator、syslog、http、legacy。官方提供的 source 类型已经很多但是有时候并不能满足实际开发当中的需求此时我们就需要根据实际需求自定义某些 source。官方也提供了自定义 source 的接口根据官方说明自定义MySource 需要继承 AbstractSource 类并实现 Configurable 和 PollableSource 接口。实现相应方法 getBackOffSleepIncrement() backoff 步长getMaxBackOffSleepInterval()backoff 最长时间configure(Context context)初始化 context读取配置文件内容process()获取数据封装成 event 并写入 channel这个方法将被循环调用 使用场景读取 MySQL 数据或者其他文件系统 6.2 需求与分析 使用 flume 接收数据并给每条数据添加前缀输出到控制台。前缀可从 flume 配置文件中配置 6.3 编码实现 导入依赖 dependencygroupIdorg.apache.flume/groupIdartifactIdflume-ng-core/artifactIdversion1.9.0/version /dependency创建com.atguigu.source.MySource public class MySource extends AbstractSource implementsConfigurable, PollableSource {//定义配置文件将来要读取的字段private Long delay;private String field;//初始化配置信息Overridepublic void configure(Context context) {delay context.getLong(delay);field context.getString(field, Hello!);}Overridepublic Status process() throws EventDeliveryException {try {//创建事件头信息HashMapString, String hearderMap new HashMap();//创建事件SimpleEvent event new SimpleEvent();//循环封装事件for (int i 0; i 5; i) {//给事件设置头信息event.setHeaders(hearderMap);//给事件设置内容event.setBody((field i).getBytes());//将事件写入 channel这里面包括了拦截器和channel选择器(包括事务)可以结合之前的流程读一下getChannelProcessor().processEvent(event);Thread.sleep(delay);}} catch (Exception e) {e.printStackTrace();return Status.BACKOFF;}return Status.READY;}Overridepublic long getBackOffSleepIncrement() {return 0;}Overridepublic long getMaxBackOffSleepInterval() {return 0;} }将写好的代码打包并放到 flume 的 lib 目录/opt/module/flume下然后创建配置文件启动可以查看效果 # Name the components on this agent a1.sources r1 a1.sinks k1 a1.channels c1 # Describe/configure the source a1.sources.r1.type com.atguigu.source.MySource a1.sources.r1.delay 1000 # a1.sources.r1.field atguigu # Describe the sink a1.sinks.k1.type logger # Use a channel which buffers events in memory a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100 # Bind the source and sink to the channel a1.sources.r1.channels c1 a1.sinks.k1.channel c17、自定义 Sink 7.1 概述 Sink 不断地轮询 Channel 中的事件且批量地移除它们并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。Sink 是完全事务性的。在从Channel 批量删除数据之前每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume AgentSink 就利用 Channel 提交事务。事务一旦被提交该 Channel 从自己的内部缓冲区删除事件。 Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定义。官方提供的 Sink 类型已经很多但是有时候并不能满足实际开发当中的需求此时我们就需要根据实际需求自定义某些 Sink。官方也提供了自定义 sink 的接口MySink 需要继承 AbstractSink 类并实现 Configurable 接口。实现相应方法 configure(Context context)初始化 context读取配置文件内容process()从 Channel 读取获取数据event这个方法将被循环调用。使用场景读取 Channel 数据写入 MySQL 或者其他文件系统。 7.2 需求 使用 flume 接收数据并在 Sink 端给每条数据添加前缀和后缀输出到控制台。前后缀可在 flume 任务配置文件中配置 7.3 编码实现 public class MySink extends AbstractSink implements Configurable {//创建 Logger 对象private static final Logger LOG LoggerFactory.getLogger(AbstractSink.class);private String prefix;private String suffix;Overridepublic Status process() throws EventDeliveryException {//声明返回值状态信息Status status;//获取当前 Sink 绑定的 ChannelChannel ch getChannel();//获取事务Transaction txn ch.getTransaction();//声明事件Event event;//开启事务txn.begin();//读取 Channel 中的事件直到读取到事件结束循环while (true) {event ch.take();if (event ! null) {break;}}try {//处理事件打印LOG.info(prefix new String(event.getBody()) suffix);//事务提交txn.commit();status Status.READY;} catch (Exception e) {//遇到异常事务回滚txn.rollback();status Status.BACKOFF;} finally {//关闭事务txn.close();}return status;}Overridepublic void configure(Context context) {//读取配置文件内容有默认值prefix context.getString(prefix, hello:);//读取配置文件内容无默认值suffix context.getString(suffix);} }将写好的代码打包并放到 flume 的 lib 目录/opt/module/flume下然后编写配置文件 # Name the components on this agent a1.sources r1 a1.sinks k1 a1.channels c1 # Describe/configure the source a1.sources.r1.type netcat a1.sources.r1.bind localhost a1.sources.r1.port 44444 # Describe the sink a1.sinks.k1.type com.atguigu.MySink #a1.sinks.k1.prefix atguigu: a1.sinks.k1.suffix :atguigu # Use a channel which buffers events in memory a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100 # Bind the source and sink to the channel a1.sources.r1.channels c1 a1.sinks.k1.channel c1 开启任务 bin/flume-ng agent -c conf/ -f job/mysink.conf -n a1 -Dflume.root.loggerINFO,console # 开启 nc localhost 44444 8、Flume 数据流监控 8.1 Ganglia 的安装与部署 Ganglia 由 gmond、gmetad 和 gweb 三部分组成。 gmondGanglia Monitoring Daemon是一种轻量级服务安装在每台需要收集指标数据的节点主机上。使用 gmond你可以很容易收集很多系统指标数据如 CPU、内存、磁盘、网络和活跃进程的数据等。gmetadGanglia Meta Daemon整合所有信息并将其以 RRD 格式存储至磁盘的服务。gwebGanglia WebGanglia 可视化工具gweb 是一种利用浏览器显示 gmetad 所存储数据的 PHP 前端。在 Web 界面中以图表方式展现集群的运行状态下收集的多种不同指标数据 # 在 102 103 104 分别安装 epel-release sudo yum -y install epel-release # 在 102 安装 sudo yum -y install ganglia-gmetad sudo yum -y install ganglia-web sudo yum -y install ganglia-gmond # 在 103 和 104 安装 sudo yum -y install ganglia-gmond# 在 102 修改配置文件/etc/httpd/conf.d/ganglia.conf sudo vim /etc/ganglia/gmetad.conf # 修改为data_source my cluster hadoop102# 在 102 103 104 修改配置文件/etc/ganglia/gmond.conf sudo vim /etc/ganglia/gmond.conf # 修改三个地方name my clusterhost hadoop102bind 0.0.0.0 cluster {name my clusterowner unspecifiedlatlong unspecifiedurl unspecified }#bind_hostname yes # Highly recommended, soon to be default.# This option tells gmond to use a source address# that resolves to the machines hostname. Without# this, the metrics may appear to come from any# interface and the DNS names associated with# those IPs will be used to create the RRDs.# mcast_join 239.2.11.71# 数据发送给 hadoop102host hadoop102port 8649ttl 1 } udp_recv_channel {# mcast_join 239.2.11.71port 8649 # 接收来自任意连接的数据bind 0.0.0.0retry_bind true# Size of the UDP buffer. If you are handling lots of metrics you really# should bump it up to e.g. 10MB or even higher.# buffer 10485760 }# 在 102 修改配置文件/etc/selinux/config sudo vim /etc/selinux/config # 设置SELINUXdisabled # 上面生效要重启可以暂时生效 sudo setenforce 0# 设置权限 sudo chmod -R 777 /var/lib/ganglia # 去hadoop102设置所有ip访问 sudo vim /etc/httpd/conf.d/ganglia.conf # 添加Require all granted# 打开网页启动感觉界面很low http://hadoop102/ganglia 8.2 操作 Flume 测试监控 bin/flume-ng agent \ -c conf/ \ -n a1 \ -f job/flume-netcat-logger.conf \ -Dflume.root.loggerINFO,console \ -Dflume.monitoring.typeganglia \ -Dflume.monitoring.hostshadoop102:8649# 发送 nc localhost 44444 四、企业真实面试题重点 1、Flume 的 SourceSinkChannel 的作用你们 Source 是什么类型 Source 组件是专门用来收集数据的可以处理各种类型、各种格式的日志数据包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacyChannel 组件对采集到的数据进行缓存可以存放在 Memory 或 File 中Sink 组件是用于把数据发送到目的地的组件目的地包括 Hdfs、Logger、avro、thrift、ipc、file、Hbase、solr、自定义 2、Flume 的 Channel Selectors 3、Flume 参数调优 Source 增加 Source 个使用 Tair Dir Source 时可增加 FileGroups 个数可以增大 Source的读取数据的能力。例如当某一个目录产生的文件过多时需要将这个文件目录拆分成多个文件目录同时配置好多个 Source 以保证 Source 有足够的能力获取到新产生的数据。batchSize 参数决定 Source 一次批量运输到 Channel 的 event 条数适当调大这个参数可以提高 Source 搬运Event 到 Channel 时的性能。 Channel type 选择 memory 时 Channel 的性能最好但是如果 Flume 进程意外挂掉可能会丢失数据。type 选择 file 时Channel 的容错性更好但是性能上会比 memory channel 差。使用 file Channel 时 dataDirs 配置多个不同盘下的目录可以提高性能。Capacity 参数决定 Channel 可容纳最大的 event 条数。transactionCapacity 参数决定每次 Source 往 channel 里面写的最大 event 条数和每次 Sink 从 channel 里面读的最大 event 条数。transactionCapacity 需要大于 Source 和 Sink 的 batchSize 参数 Sink 增加 Sink 的个数可以增加 Sink 消费 event 的能力。Sink 也不是越多越好够用就行过多的 Sink 会占用系统资源造成系统资源不必要的浪费。batchSize 参数决定 Sink 一次批量从 Channel 读取的 event 条数适当调大这个参数可以提高 Sink 从 Channel 搬出 event 的性能 4、Flume 的事务机制 Flume 的事务机制类似数据库的事务机制Flume 使用两个独立的事务分别负责从Soucrce 到 Channel以及从 Channel 到 Sink 的事件传递。 比如 spooling directory source 为文件的每一行创建一个事件一旦事务中所有的事件全部传递到 Channel 且提交成功那么Soucrce 就将该文件标记为完成。同理事务以类似的方式处理从 Channel 到 Sink 的传递过程如果因为某种原因使得事件无法记录那么事务将会回滚。且所有的事件都会保持到 Channel 中等待重新传递。 5、Flume 采集数据会丢失吗? 根据 Flume 的架构原理Flume 是不可能丢失数据的其内部有完善的事务机制 Source 到 Channel 是事务性的Channel 到 Sink 是事务性的因此这两个环节不会出现数据的丢失唯一可能丢失数据的情况是 Channel 采用 memoryChannelagent 宕机导致数据丢失或者 Channel 存储数据已满导致 Source 不再写入未写入的数据丢失。 Flume 不会丢失数据但是有可能造成数据的重复例如数据已经成功由 Sink 发出但是没有接收到响应Sink 会再次发送数据此时可能会导致数据的重复
http://www.w-s-a.com/news/698430/

相关文章:

  • 如何做网站哪个站推广描述对于营销型网站建设很重要飘红效果更佳
  • 济阳做网站公司99企业邮箱888
  • 国贸做网站的公司能接做网站的活的网站
  • 淮南建设厅网站上杭县建设局网站
  • 东莞做网站公司首选!西安注册公司费用
  • 做网站包括什么卖水果网站模板
  • 扬州网站建设外包wordpress 文章评分
  • 网站建设网站多少钱公司名字大全列表
  • 设计企业网站内容wordpress 投稿者 权限
  • seo网站推广免费价格低的成语
  • 做网站建设销售辛苦吗专题页是什么
  • 做网站的软件名字全拼wordpress可以上传文件吗
  • 建品牌网站公司关于asp_sql网站开发的书籍
  • 建网站公司营销型网站建设wordpress自定义登录页
  • 泉州市住房和城乡建设局网站淘宝店网站怎么做
  • 企业网站建设费未付款怎样挂账长春网站制作专业
  • 深圳找网站建设邹城市建设局网站
  • 长春火车站停运了吗网站开发概要设计
  • 网站开发表格整体页面居中网站域名详解
  • 漕泾网站建设赢展网站建设
  • 医院网站建设的要求毕业了智慧团建密码忘了
  • 网站怎么建设在哪里接单坪山商城网站建设哪家便宜
  • 中山企业网站优化易语言wordpress发布
  • 宜昌网站推广自己怎么做彩票网站吗
  • 英文网站建设 招标网站建设中服务器搭建方式
  • 直播网站建设需要什么软件有哪些室内设计效果图怎么做
  • 宁波网站建设电话网络推广外包一年多少钱
  • 检索标准的网站怎么制作企业网站
  • 下列关于网站开发中网页发布wordpress 粘帖图片
  • 网站建设遇到的问题及对策宁波网站建设营销推广