当前位置: 首页 > news >正文

上海集团网站建设公司好wordpress 幻灯代码

上海集团网站建设公司好,wordpress 幻灯代码,seo管理是什么,快速优化官网前置知识#xff1a; Flume学习笔记#xff08;1#xff09;—— Flume入门-CSDN博客 Flume学习笔记#xff08;2#xff09;—— Flume进阶-CSDN博客 Flume 自定义组件 自定义 Interceptor 需求分析#xff1a;使用 Flume 采集服务器本地日志#xff0c;需要按照日志… 前置知识 Flume学习笔记1—— Flume入门-CSDN博客 Flume学习笔记2—— Flume进阶-CSDN博客 Flume 自定义组件 自定义 Interceptor 需求分析使用 Flume 采集服务器本地日志需要按照日志类型的不同将不同种类的日志发往不同的分析系统 需要使用Flume 拓扑结构中的 Multiplexing 结构Multiplexing的原理是根据 event 中 Header 的某个 key 的值将不同的 event 发送到不同的 Channel中所以我们需要自定义一个 Interceptor为不同类型的 event 的 Header 中的 key 赋予不同的值 实现流程 代码 导入依赖 dependenciesdependencygroupIdorg.apache.flume/groupIdartifactIdflume-ng-core/artifactIdversion1.9.0/version/dependency /dependencies 自定义拦截器的代码 package com.why.interceptor;import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor;import java.util.ArrayList; import java.util.List; import java.util.Map;public class TypeInterceptor implements Interceptor {//存放事件的集合private ListEvent addHeaderEvents;Overridepublic void initialize() {//初始化集合addHeaderEvents new ArrayList();}//单个事件拦截Overridepublic Event intercept(Event event) {//获取头信息MapString, String headers event.getHeaders();//获取body信息String body new String(event.getBody());//根据数据中是否包含”why“来分组if(body.contains(why)){headers.put(type,first);}else {headers.put(type,second);}return event;}//批量事件拦截Overridepublic ListEvent intercept(ListEvent events) {//清空集合addHeaderEvents.clear();//遍历eventsfor(Event event : events){//给每一个事件添加头信息addHeaderEvents.add(intercept(event));}return addHeaderEvents;}Overridepublic void close() {}//构建生成器public static class TypeBuilder implements Interceptor.Builder{Overridepublic Interceptor build() {return new TypeInterceptor();}Overridepublic void configure(Context context) {}}} 将代码打包放入flume安装路径下的lib文件夹中 配置文件 在job文件夹下创建group4目录添加配置文件 为 hadoop102 上的 Flume1 配置 1 个 netcat source1 个 sink group2 个 avro sink并配置相应的 ChannelSelector 和 interceptor # Name the components on this agent a1.sources r1 a1.sinks k1 k2 a1.channels c1 c2# Describe/configure the source a1.sources.r1.type netcat a1.sources.r1.bind localhost a1.sources.r1.port 44444 a1.sources.r1.interceptors i1 a1.sources.r1.interceptors.i1.type com.why.interceptor.TypeInterceptor$TypeBuilder a1.sources.r1.selector.type multiplexing a1.sources.r1.selector.header type a1.sources.r1.selector.mapping.first c1 a1.sources.r1.selector.mapping.second c2# Describe the sink a1.sinks.k1.type avro a1.sinks.k1.hostname hadoop103 a1.sinks.k1.port 4141 a1.sinks.k2.typeavro a1.sinks.k2.hostname hadoop104 a1.sinks.k2.port 4242# Use a channel which buffers events in memory a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100 # Use a channel which buffers events in memory a1.channels.c2.type memory a1.channels.c2.capacity 1000 a1.channels.c2.transactionCapacity 100# Bind the source and sink to the channel a1.sources.r1.channels c1 c2 a1.sinks.k1.channel c1 a1.sinks.k2.channel c2 hadoop103配置一个 avro source 和一个 logger sink a1.sources r1 a1.sinks k1 a1.channels c1 a1.sources.r1.type avro a1.sources.r1.bind hadoop103 a1.sources.r1.port 4141 a1.sinks.k1.type logger a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100 a1.sinks.k1.channel c1 a1.sources.r1.channels c1 hadoop104配置一个 avro source 和一个 logger sink a1.sources r1 a1.sinks k1 a1.channels c1 a1.sources.r1.type avro a1.sources.r1.bind hadoop104 a1.sources.r1.port 4242 a1.sinks.k1.type logger a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100 a1.sinks.k1.channel c1 a1.sources.r1.channels c1 执行指令 hadoop102bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume-interceptor-flume.conf hadoop103bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume1-flume-logger.conf -Dflume.root.loggerINFO,console hadoop104bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume2-flume-logger.conf -Dflume.root.loggerINFO,console 然后hadoop102通过nc连接44444端口发送数据 在hadoop103和104上分别接收到 自定义 Source 官方提供的文档Flume 1.11.0 Developer Guide — Apache Flume 给出的示例代码如下 public class MySource extends AbstractSource implements Configurable, PollableSource {private String myProp;Overridepublic void configure(Context context) {String myProp context.getString(myProp, defaultValue);// Process the myProp value (e.g. validation, convert to another type, ...)// Store myProp for later retrieval by process() methodthis.myProp myProp;}Overridepublic void start() {// Initialize the connection to the external client}Overridepublic void stop () {// Disconnect from external client and do any additional cleanup// (e.g. releasing resources or nulling-out field values) ..}Overridepublic Status process() throws EventDeliveryException {Status status null;try {// This try clause includes whatever Channel/Event operations you want to do// Receive new dataEvent e getSomeData();// Store the Event into this Sources associated Channel(s)getChannelProcessor().processEvent(e);status Status.READY;} catch (Throwable t) {// Log exception, handle individual exceptions as neededstatus Status.BACKOFF;// re-throw all Errorsif (t instanceof Error) {throw (Error)t;}} finally {txn.close();}return status;} } 需要继承AbstractSource实现Configurable, PollableSource 实战需求分析 使用 flume 接收数据并给每条数据添加前缀输出到控制台。前缀可从 flume 配置文件中配置 代码 package com.why.source;import org.apache.flume.Context; import org.apache.flume.EventDeliveryException; import org.apache.flume.PollableSource; import org.apache.flume.conf.Configurable; import org.apache.flume.event.SimpleEvent; import org.apache.flume.source.AbstractSource;import java.util.HashMap; import java.util.concurrent.ConcurrentMap;public class MySource extends AbstractSource implements PollableSource, Configurable {//定义配置文件将来要读取的字段private Long delay;private String field;//获取数据封装成 event 并写入 channel这个方法将被循环调用Overridepublic Status process() throws EventDeliveryException {try {//事件头信息HashMapString,String headerMap new HashMap();//创建事件SimpleEvent event new SimpleEvent();//循环封装事件for (int i 0; i 5; i) {//设置头信息event.setHeaders(headerMap);//设置事件内容event.setBody((field i).getBytes());//将事件写入ChannelgetChannelProcessor().processEvent(event);Thread.sleep(delay);}}catch (InterruptedException e) {throw new RuntimeException(e);}return Status.READY;}//backoff 步长Overridepublic long getBackOffSleepIncrement() {return 0;}//backoff 最长时间Overridepublic long getMaxBackOffSleepInterval() {return 0;}//初始化 context读取配置文件内容Overridepublic void configure(Context context) {delay context.getLong(delay);field context.getString(field,Hello);}}打包放到flume安装路径下的lib文件夹中 配置文件 # Name the components on this agent a1.sources r1 a1.sinks k1 a1.channels c1# Describe/configure the source a1.sources.r1.type com.why.source.MySource a1.sources.r1.delay 1000 a1.sources.r1.field why# Describe the sink a1.sinks.k1.type logger# Use a channel which buffers events in memory a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100# Bind the source and sink to the channel a1.sources.r1.channels c1 a1.sinks.k1.channel c1 执行指令 hadoop102上bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group5/mysource.conf -Dflume.root.loggerINFO,console 结果如下 自定义 Sink Sink 不断地轮询 Channel 中的事件且批量地移除它们并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。 Sink 是完全事务性的。在从 Channel 批量删除数据之前每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume AgentSink 就利用 Channel 提交事务。事务一旦被提交该 Channel 从自己的内部缓冲区删除事件 官方文档Flume 1.11.0 Developer Guide — Apache Flume 接口实例 public class MySink extends AbstractSink implements Configurable {private String myProp;Overridepublic void configure(Context context) {String myProp context.getString(myProp, defaultValue);// Process the myProp value (e.g. validation)// Store myProp for later retrieval by process() methodthis.myProp myProp;}Overridepublic void start() {// Initialize the connection to the external repository (e.g. HDFS) that// this Sink will forward Events to ..}Overridepublic void stop () {// Disconnect from the external respository and do any// additional cleanup (e.g. releasing resources or nulling-out// field values) ..}Overridepublic Status process() throws EventDeliveryException {Status status null;// Start transactionChannel ch getChannel();Transaction txn ch.getTransaction();txn.begin();try {// This try clause includes whatever Channel operations you want to doEvent event ch.take();// Send the Event to the external repository.// storeSomeData(e);txn.commit();status Status.READY;} catch (Throwable t) {txn.rollback();// Log exception, handle individual exceptions as neededstatus Status.BACKOFF;// re-throw all Errorsif (t instanceof Error) {throw (Error)t;}}return status;} } 自定义MySink 需要继承 AbstractSink 类并实现 Configurable 接口 实战需求分析 使用 flume 接收数据并在 Sink 端给每条数据添加前缀和后缀输出到控制台。前后缀可在 flume 任务配置文件中配置 代码 package com.why.sink;import org.apache.flume.*; import org.apache.flume.conf.Configurable; import org.apache.flume.sink.AbstractSink;import org.slf4j.Logger; import org.slf4j.LoggerFactory;public class MySink extends AbstractSink implements Configurable {//创建 Logger 对象private static final Logger LOG LoggerFactory.getLogger(AbstractSink.class);//前后缀private String prefix;private String suffix;Overridepublic Status process() throws EventDeliveryException {//声明返回值状态信息Status status;//获取当前 Sink 绑定的 ChannelChannel ch getChannel();//获取事务Transaction txn ch.getTransaction();//声明事件Event event;//开启事务txn.begin();//读取 Channel 中的事件直到读取到事件结束循环while (true) {event ch.take();if (event ! null) {break;}}try {//处理事件打印LOG.info(prefix new String(event.getBody()) suffix);//事务提交txn.commit();status Status.READY;} catch (Exception e) {//遇到异常事务回滚txn.rollback();status Status.BACKOFF;} finally {//关闭事务txn.close();}return status;}Overridepublic void configure(Context context) {prefix context.getString(prefix, hello);suffix context.getString(suffix);} }打包放到flume安装路径下的lib文件夹中 配置文件 # Name the components on this agent a1.sources r1 a1.sinks k1 a1.channels c1# Describe/configure the source a1.sources.r1.type netcat a1.sources.r1.bind localhost a1.sources.r1.port 44444# Describe the sink a1.sinks.k1.type com.why.sink.MySink a1.sinks.k1.prefix why: a1.sinks.k1.suffix :why# Use a channel which buffers events in memory a1.channels.c1.type memory a1.channels.c1.capacity 1000 a1.channels.c1.transactionCapacity 100# Bind the source and sink to the channel a1.sources.r1.channels c1 a1.sinks.k1.channel c1 执行指令 hadoop102上bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group6/mysink.conf -Dflume.root.loggerINFO,console 结果如下
http://www.w-s-a.com/news/429169/

相关文章:

  • 四川做网站设计公司价格vip解析网站怎么做的
  • 网站建设流程域名申请做化工的 有那些网站
  • 软件开发设计流程图seo搜索引擎官网
  • 外国小孩和大人做网站东富龙科技股份有限公司
  • 上线倒计时单页网站模板做网站的资金来源
  • 泸州市建设厅网站中小企业网络需求分析
  • asp网站版权做网页价格
  • 长春网站建设路关键词优化公司哪家好
  • 河南省建设银行网站年报天津设计师网站
  • 沙洋网站定制如果自己建立网站
  • 凡科网站怎么做建站关键字搜索网站怎么做
  • 小说网站建站程序企业邮箱地址
  • 福州市住房和城乡建设网站网站开发方案论文
  • 在线教育网站开发网站推广常用方法包括
  • 东莞高端品牌网站建设软件开发模型及特点
  • 个人网站的设计与实现的主要内容网站开发公司架构
  • 浏览器收录网站什么是新媒体营销
  • 上海营销网站建设公司下面哪个不是网页制作工具
  • 有哪些网站可以做设计比赛苏州设计公司排名前十
  • 公益网站建设需求车陂手机网站开发
  • 高端网站建设专业营销团队宁德网站建设51yunsou
  • 网站如何做cdn购物网站建设app开发
  • 简单的手机网站模板好看大方的企业网站源码.net
  • 沈阳住房和城乡建设厅网站网站个人备案做论坛
  • 企业建网站的目的开家网站建设培训班
  • 做怎么网站网站优化和推广
  • 建站工具 风铃网站每年空间域名费用及维护费
  • 网站开发工具 知乎工业软件开发技术就业前景
  • 永济微网站建设费用新手如何自学编程
  • 在本地怎么做网站深圳保障房申请条件2022