当前位置: 首页 > news >正文

广州怎么建设一个网站重养网站建设

广州怎么建设一个网站,重养网站建设,企业网站搭建哪家好,久久建筑网施工方案好用吗入门需要掌握#xff1a;从入门demo理解、flink 系统架构#xff08;看几个关键组件#xff09;、安装、使用flink的命令跑jar包flink的webUI 界面的监控、常见错误、调优 一、入门demo#xff1a;统计单词个数 0、单词txt 文本内容(words.txt)#xff1a; hello world … 入门需要掌握从入门demo理解、flink 系统架构看几个关键组件、安装、使用flink的命令跑jar包flink的webUI 界面的监控、常见错误、调优 一、入门demo统计单词个数 0、单词txt 文本内容(words.txt) hello world hello flink hello java1、DataSet api(已弃用) public class BatchWordCount {public static void main(String[] args) throws Exception {// 1. 创建执行环境ExecutionEnvironment env ExecutionEnvironment.getExecutionEnvironment();// 2. 从文件读取数据 按行读取(存储的元素就是每行的文本)DataSourceString lineDS env.readTextFile(input/words.txt);// 3. 转换数据格式FlatMapOperatorString, Tuple2String, Long wordAndOne lineDS.flatMap((String line, CollectorTuple2String, Long out) - {String[] words line.split( );for (String word : words) {//Tuple2.of 构建二元组out.collect(Tuple2.of(word, 1L));}}) //Lambda是java的新特性(不是flink的特性)在Lambda表达式中使用泛型存在泛型擦除所以flink 需要用returns 显示的声明类型信息.returns(Types.TUPLE(Types.STRING, Types.LONG)); // 4. 按照 word 进行分组因为flink 不像Spark 有groupByKey可以直接得到字段flink是通过索引位置得到字段比如0就是第一个字段UnsortedGroupingTuple2String, Long wordAndOneUG wordAndOne.groupBy(0);// 5. 分组内聚合统计同理flink是通过索引位置得到字段比如1就是第二个字段AggregateOperatorTuple2String, Long sum wordAndOneUG.sum(1);// 6. 打印结果sum.print();} }批处理结果 为什么说是DataSet api 数据源DataSource 继承Operator 继承DataSet 转换 ​ FlatMapOperator 继承SingleInputUdfOperator 继承SingleInputOperator 继承Operator 继承DataSet ​ AggregateOperator 继承SingleInputOperator 继承Operator 继承DataSet 所以把这一整套称为DataSet api。 2、DataStream api public class FlinkDemo {public static void main(String[] args) throws Exception {// 1. 创建流式执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取文件DataStreamSourceString lineDSS env.readTextFile(./src/main/java/com/shan/words.txt);// 3. 转换数据格式SingleOutputStreamOperatorTuple2String, Long wordAndOne lineDSS.flatMap((String line, CollectorString words) - {Arrays.stream(line.split( )).forEach(words::collect);}).returns(Types.STRING).map(word - Tuple2.of(word, 1L)).returns(Types.TUPLE(Types.STRING, Types.LONG));// 4. 分组KeyedStreamTuple2String, Long, String wordAndOneKS wordAndOne.keyBy(t - t.f0);// 5. 求和SingleOutputStreamOperatorTuple2String, Long result wordAndOneKS.sum(1);// 6. 打印result.print();// 7. 执行env.execute();} }流处理结果 3、特点 idea 执行代码通过多线程并行模拟flink这个分布式框架的并行的特点。idea通过线程模拟任务槽flink的并行单位-并行子任务编号它的数量是由并行度决定的简单理解并行度就是当前任务分成多少份去做多线程的并行处理的程度、个数idea 没有设置并行度默认就是电脑cpu核心数量(逻辑处理器数量)小细节hello前面的子任务编号是相同的说明分组的时候将统计hello的个数分给同一个子任务去执行只有在同一个任务上才能进行叠加 并行不能保证顺序 4、流数据特点应该是源源不断的到来—需要监听捕获 代码中将数据源来源进行修改成监听端口即可 public class FlinkDemo {public static void main(String[] args) throws Exception {// 1. 创建流式执行环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 2. 读取数据流 [代码中将数据源来源进行修改成监听端口即可]DataStreamSourceString lineDSS env.socketTextStream(hadoop102, 7777);// 3. 转换数据格式SingleOutputStreamOperatorTuple2String, Long wordAndOne lineDSS.flatMap((String line, CollectorString words) - {Arrays.stream(line.split( )).forEach(words::collect);}).returns(Types.STRING).map(word - Tuple2.of(word, 1L)).returns(Types.TUPLE(Types.STRING, Types.LONG));// 4. 分组KeyedStreamTuple2String, Long, String wordAndOneKS wordAndOne.keyBy(t - t.f0);// 5. 求和SingleOutputStreamOperatorTuple2String, Long result wordAndOneKS.sum(1);// 6. 打印result.print();// 7. 执行env.execute();} }socket的数据流数据的发送可以通过Linux系统自带的netcat工具进行模拟。 # 在 Linux 环境的主机 hadoop102 上执行下列命令发送数据进行测试 [atshanhadoop102 ~]$ nc -lk 7777启动 java 程序 我们会发现程序启动之后没有任何输出、也不会退出。这是正常的——因为 Flink 的流处理是事件驱动的当前程序会一直处于监听状态只有接收到数据才会执行任务、输出统计结果。 在 Linux 环境的主机 hadoop102 上发送数据 hello flink hello world hello java可以看到程序的控制台有数据的输出 二、flink 系统架构/Flink 作业提交运行的原理 我们编写的代码对应着在Flink集群上执行的一个作业所以我们在本地执行代码其实是idea开发环境中根据引入的依赖先模拟启动一个Flink集群 然后把我们代码中定义好的操作作为作业job要打包好 然后将作业提交到集群上创建好要执行的任务等待数据输入。 1、Flink 中的几个关键组件 客户端Client、作业管理器JobManager和任务管理器TaskManager。 (1) 客户端和flink 集群关系没那么大主要就是向flink 提交一下作业。 我们的代码实际上是由客户端获取并做转换之后提交给JobManger 的。 (2) JobManager对作业进行调度管理 JobManager获取到要执行的作业后会进一步处理转换然后分发任务给众多的 TaskManager (3) TaskManagers: 也叫worker等待数据输入进行实际计算。 2、flink 系统架构详细图 JobManager也称为 master用于协调分布式执行。负责调度任务检查点失败恢复等。TaskManager也称为 worker用于执行数据流图的任务更准确地说是计算子任务并对数据流进行缓冲、交换。Flink 运行环境中至少包含一个任务管理器。 3、Flink之运行时环境 (1) JobManager JobManager是Flink系统的协调者它负责接收Flink Job调度组成Job的多个Task的执行。同时JobManager还负责收集Job的状态信息并管理Flink集群中从节点TaskManager。 ■ JobManager所负责的各项管理功能它接收到并处理的事件主要包括RegisterTaskManager、SubmitJob、CancelJob、UpdateTaskExecutionState、JobStatusChanged、RequestNextInputSplit RegisterTaskManager 在Flink集群启动的时候TaskManager会向JobManager注册如果注册成功则JobManager会向TaskManager回复消息AcknowledgeRegistration。 SubmitJob Flink程序内部通过Client向JobManager提交Flink Job其中在消息SubmitJob中以JobGraph形式描述了Job的基本信息。 CancelJob 请求取消一个Flink Job的执行CancelJob消息中包含了Job的ID如果成功则返回消息CancellationSuccess失败则返回消息CancellationFailure。 UpdateTaskExecutionState TaskManager会向JobManager请求更新ExecutionGraph中的ExecutionVertex的状态信息更新成功则返回true。 JobStatusChanged ExecutionGraph向JobManager发送该消息用来表示Flink Job的状态发生的变化例如RUNNING、CANCELING、FINISHED等。 RequestNextInputSplit 运行在TaskManager上面的Task请求获取下一个要处理的输入Split成功则返回NextInputSplit。 (2) TaskManager TaskManager也是一个Actor它是实际负责执行计算的Worker在其上执行Flink Job的一组Task。每个TaskManager负责管理其所在节点上的资源信息如内存、磁盘、网络在启动的时候将资源的状态向JobManager汇报。TaskManager端可以分成两个阶段: 注册、可操作阶段 注册阶段 TaskManager会向JobManager注册发送RegisterTaskManager消息等待JobManager返回AcknowledgeRegistration然后TaskManager就可以进行初始化过程。 可操作阶段 该阶段TaskManager可以接收并处理与Task有关的消息如SubmitTask、CancelTask、FailTask。如果TaskManager无法连接到JobManager这是TaskManager就失去了与JobManager的联系会自动进入“注册阶段”只有完成注册才能继续处理Task相关的消息。 (3) Client 当用户提交一个Flink程序时会首先创建一个Client该Client首先会对用户提交的Flink程序进行预处理并提交到Flink集群中处理所以Client需要从用户提交的Flink程序配置中获取JobManager的地址并建立到JobManager的连接将Flink Job提交给JobManager。 Client会将用户提交的Flink程序组装一个JobGraph 并且是以JobGraph的形式提交的。一个JobGraph是一个Flink Dataflow它由多个JobVertex组成的DAG。其中一个JobGraph包含了一个Flink程序的如下信息JobID、Job名称、配置信息、一组JobVertex等。 三、flink 集群部署-一主二从 flink 集群默认是本地启动即Flink 本地启动直接执行 start-cluster.sh 0、flink 安装 (1) 下载 细节1flink有两个版本一个版本是和hadoop捆绑的另一个是单独的后缀包含scala细节2注意最新版的flink 要求的java版本是1113版本的flinkjava版本是8下载链接https://flink.apache.org/zh/downloads.html wget https://archive.apache.org/dist/flink/flink-1.13.6/flink-1.13.6-bin-scala_2.12.tgz(2) 解压 习惯解压到/opt下自己建立的模块modules目录下 tar -zxvf flink-1.13.6-bin-scala_2.12.tgz -C /opt/modules/(3) 配置环境: vim /etc/profile# 在配置文件中添加如下内容 export FLINK_HOME/opt/flink-1.13.6 export PATH$FLINK_HOME/bin:$PATH# 保存一下配置 source /etc/profile1、服务器的准备 本书中三台服务器的具体设置如下 ⚫ 节点服务器 1IP 地址为 192.168.10.102主机名为 hadoop101。 ⚫ 节点服务器 2IP 地址为 192.168.10.103主机名为 hadoop102。 ⚫ 节点服务器 3IP 地址为 192.168.10.104主机名为 hadoop103。 配置工作先在其中一台服务器上对flink根目录下配置文件进行修改然后把修改好后的flink 根目录分发给另外两个节点服务器(通过scp命令)。 # 主机配置修改完毕后将 Flink 安装目录发给另外两个节点服务器 $ scp -r ./flink-1.13.0 atguiguhadoop102:/opt/module $ scp -r ./flink-1.13.0 atguiguhadoop103:/opt/module配置好后 在主机那台机器通过命令start-cluster.sh 启动集群时看到日志提示它是主机通过jps命令看到StandaloneSessionClusterEntrypoint 在从机上通过jps命令看到打印信息提示它是从机TaskManagerRunner。 ■ 跨服务器的远程拷贝命令scp scp是secure copy的简写用于在Linux下进行远程拷贝文件的命令和它类似的命令有cp不过cp只是在本机进行拷贝不能跨服务器而且scp传输是加密的。 命令格式scp [参数] [需要传输的文件原路径] [接收传输的文件的服务器目录路径] scp命令可以将当前文件资源拷贝给远程服务器也可以将远程服务器的文件资源拷贝到本机上 # 在本地服务器上将/root/shan目录下所有文件传输到服务器10.11.34.73 的/home/目录下命令为 scp -r /root/shan root10.11.34.73:/home/# 将服务器10.11.34.73 的/home/shanshan 目录下所有文件传递到本机服务器的/root/ 目录下命令为 scp -r 10.11.34.73:/home/shanshan /root2、主机的配置 hadoop101 修改flink 配置文件conf目录下的flink-conf.yaml # 默认是localhost jobmanager.rpc.address: hadoop101【这一步可以省略】修改flink 配置文件conf目录下的masters # 默认是localhost hadoop101:80813、从机的配置 hadoop102、hadoop103 修改flink 配置文件conf目录下的workers # 默认是localhost hadoop102 hadoop1034、flink 的 webUI 界面 概况可以看到可用的资源、运行的jobs数量、运行的job列表、完成的job列表也可以在Jobs 下看到更详细的信息 当前集群的 TaskManager 数量为 2由于默认每个TaskManager的Slot数量为 1所以总Slot数 和 可用Slot数都为2。 看到从机taskManagers 的情况 taskManagers 内存 每一个taskManager(worker) 其实就是一个独立的jvm实例[jvm 进程]站在jvm 进程的角度理解内存 Total Process Memory 其实就是jvm包括了JVM Heap 堆内存、Off-Heap Memory 堆外内存。 其中flink 管理的内存包括代码定义的对象放到的JVM Heap 堆内存以及flink定义好的状态会放在 Off-Heap Memory 的Managed Memory还有直接内存。 四、flink 命令 1、flink 提交运行作业(命令行的方式可以设置更多的参数) 也可以通过ui界面提交作业 # 在master主机上先使用命令rz 进行文件上传# 进入xxx.jar包所在目录 flink run -c com.xsj.whale.flink.task.CrawlerToKafka -p 4 --detached -D taskmanager.numberOfTaskSlots4 -D jobmanager.memory.process.size2048mb -D taskmanager.memory.process.size5120mb xxx.jar命令行提交作业的细节如果是直接在主机的命令行窗口上提交则不用通过参数指定主机。 在从机上通过命令行提交作业需要通过参数 -m 主机名端口 指定master主机和端口号。 # 先把xxx.jar 上传到从机上然后在其目录下提交作业 flink run -m hadoop102:8081 -c 主程序的类全限定名 -p 2 xxx.jar启动flink start-cluster.sh结束flink stop-cluster.sh取消作业 flink cancel 作业id查看flink 输出结果 # 进入flink的日志目录 tail log/flink-*-taskexecutor-*.out五、flink 常见错误 1、NoResourceAvailableException: Could not acquire the minimum required resources. 原因没有资源的情况下提交作业。 2、异常被抛出不被catch会导致flink任务挂掉 业务的代码写错了但是对异常没有try…catch异常被底层的fink捕获后任务挂掉 六、flink 的优化配置与调优 在 flink-conf.yaml 文件中还可以对集群中的 JobManager 和 TaskManager 组件进行优化配置 1、通用配置-内存调优 jobmanager.memory.process.size对 JobManager 进程可使用到的全部内存进行配置包括 JVM 元空间和其他开销默认为 1600M可以根据集群规模进行适当调整。 taskmanager.memory.process.size对 TaskManager 进程可使用到的全部内存进行配置包括 JVM 元空间和其他开销默认为 1600M可以根据集群规模进行适当调整。 taskmanager.numberOfTaskSlots对每个 TaskManager 能够分配的 Slot 数量进行配置默认为 1可根据 TaskManager 所在的机器能够提供给Flink 的 CPU 数量决定。所谓Slot 就是 TaskManager 中具体运行一个任务所分配的计算资源。 parallelism.defaultFlink 任务执行的默认并行度优先级低于代码中进行的并行度配置和任务提交时使用参数指定的并行度数量。 七、flink 的部署 1、flink 资源被占用的问题 flink 的资源被占用的时候还需要提交或者追加作业就得等前面的资源被释放了或者需要集群新增加机器。 一种手动管理一切的方式不够方便 真正的企业级应用里我们更加希望是有一套平台资源不够了你再去申请再去申请更多资源而我只需要做的就是只是将集群跑起来作业跑起来。 “你不要总跟我提示资源不够用了”。— flink 要和不同的资源管理平台要结合在一起 2、flink 的部署模式 flink 的不同场景不同的部署模式。 会话模式(Session Mode) 单作业模式(Per-JobMode) 应用模式(Application Mode) 它们的区别主要在于:集群的生命周期以及资源的分配方式。 会话模式 特点资源有限优点方便集群启动起来后资源也确定了固定放在那等待使用缺点资源共享一旦资源不够提交作业就失败应用适合**单个规模小执行时间短**的大量作业。上一个作业释放完资源就可以给下一个作业了充分利用资源集群不跟着作业的状态而发生改变当我们kill或者cancle作业之后集群依然存在生命周期远大于作业。作业如果竞争资源竞争不到怎么办–等待那个占用了资源的作业运行结束或者被cancle然后释放出资源 会话模式其实最符合常规思维就是我们前面的集群启动默认就是会话模式。我们需要先启动一个集群保持一个会话在这个会话中通过客户端提交作业集群启动时所有资源就都已经确定所以所有提交的作业会竞争集群中的资源。 八、flink的print 底层 1、旁路输出流 //旁路输出流print taptapGroupFeedStreamSideOutput.print(); taptapGroupFeedStreamSideOutput.printToErr();// --PrintSinkFunctionPublicEvolvingpublic DataStreamSinkT print() {//创建一个打印汇总算子对象PrintSinkFunctionT printFunction new PrintSinkFunction();//添加到汇总里return addSink(printFunction).name(Print to Std. Out);}public DataStreamSinkT addSink(SinkFunctionT sinkFunction) {// 1、在转换对象中获取到输出类型transformation.getOutputType();//如果汇总算子是输入类型的配置则设置一下相关配置 if (sinkFunction instanceof InputTypeConfigurable) {((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());}//2、清理一下汇总算子并构建汇总操作 StreamSinkT sinkOperator new StreamSink(clean(sinkFunction));//3、构建汇总数据流并把汇总操作放进去DataStreamSinkT sink new DataStreamSink(this, sinkOperator);//4、给环境添加操作(汇总数据流中的转换)getExecutionEnvironment().addOperator(sink.getTransformation());//5、返回汇总数据流return sink;}如果本文对你有帮助的话记得给一乐点个赞哦感谢
http://www.w-s-a.com/news/9473/

相关文章:

  • 二手市场网站建设的目的长沙ui设计公司
  • 微信公众号做留言网站wordpress详情页选择模板
  • php网站开发面向对象教程如何做分享赚钱的网站
  • 山东网站建设最便宜常州网站建站公司
  • 网站地图 seo中国建设招标网是私人网站吗
  • 高中作文网站全网营销有哪些平台
  • 网站构建建设制作平台上海搬家公司收费价目表
  • 成功案例展示网站做网站赚多少钱
  • 建设银行网站用什么字体网站建站后维护需要做哪些
  • 有哪些做平面设计好素材网站有哪些开网站建设
  • 国际交流网站平台有哪些筑建网
  • 网站程序是如何开发的江门市住房建设管理局网站
  • 网站建设一般需要几个步骤昵图网免费素材
  • 个人网站建设需求说明书微信域名防封在线生成
  • 专业网站建设的公司wordpress后台没有模板
  • 哈尔滨网站运营服务商制作外贸网站公司
  • 个人网站需要备案宁波网站推广工具
  • 苏州建设银行网站首页wordpress修改密码
  • 网站建设员工技能要求网站制作简单协议
  • 没有ipc备案的网站wordpress isux主题
  • 清远做网站电子商务网站建设需要的语言及特点6
  • 万州那家做网站c语言基础知识入门
  • 齐河网站建设公司价格网站建设包括什么
  • 论坛网站开发费用怎么把文件放到网站的根目录
  • 海南省零售户电商网站官渡区住房和城乡建设局网站
  • 怎么找淘宝客网站最新军事战况
  • 缺乏门户网站建设网页设计与制作项目教程第二版
  • 手机网站横竖屏一般做建设的是什么公司
  • 免费网站建设无广告网站开发 华景新城
  • 湖州网站制作报价西安网站开发有哪些公司