# Spark Log Analysis in Practice
## I. User Behavior Logs

### 1. Concept

Every action a user takes while visiting the site (visiting, browsing, searching, clicking) is recorded as a behavior log, also called the user's behavior trail or traffic log.

### 2. Why collect it

Analyzing the logs yields page view counts, a measure of site stickiness, and input for recommendations.

### 3. Where it is produced

(1) Nginx
(2) Ajax

### 4. Log content

A log record contains:
1. System attributes of the visit: operating system, browser, etc.
2. Visit characteristics: the clicked URL, the referring page (referer), time spent on the page
3. Visit information: session_id, IP-derived information (city / carrier)

Note: exactly which fields are captured can be controlled through the Nginx configuration, as sketched below.
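A minimal sketch of such an Nginx configuration (standard `log_format` / `access_log` directives; the exact field list is illustrative):

~~~nginx
# nginx.conf -- log_format decides exactly which fields end up in the access log
log_format main '$remote_addr - $remote_user [$time_local] '
                '"$request" $status $body_bytes_sent '
                '"$http_referer" "$http_user_agent"';

access_log /var/log/nginx/access.log main;
~~~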
### 5. Significance

(1) The eyes of the site: ad placement and revenue
(2) The nerves of the site: page layout (user experience)
(3) The brain of the site

## II. Offline Data Processing
### 1. Processing pipeline

1) Data collection: Flume writes the web logs produced by the servers into HDFS
2) Data cleaning: Spark / Hive / MapReduce --> HDFS (Hive / Spark SQL tables)
3) Data processing: statistics and analysis according to the business logic, again Spark / Hive / MapReduce --> HDFS (Hive / Spark SQL tables)
4) Loading the results: RDBMS (MySQL) / NoSQL (HBase, Redis)
5) Visualization: pie charts, bar charts, maps, line charts via ECharts, HUE, Zeppelin

## III. Project Requirements
Requirement 1: the most popular Top N courses / articles on the imooc main site, by number of visits.
Requirement 2: the most popular Top N courses per city.
  a. Resolve the city from the IP address
  b. Window functions in Spark SQL (see the sketch below)
Requirement 3: the most popular Top N courses by traffic.
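Requirement 2b deserves a standalone sketch before the full implementation in section VI: ranking rows inside each city with `row_number()` over a window (column names match this project's cleaned schema):

~~~scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

// Keep the top-n courses per city; assumes a DataFrame with "city" and "times" columns.
def topNPerCity(df: DataFrame, n: Int = 3): DataFrame = {
  val w = Window.partitionBy(col("city")).orderBy(col("times").desc)
  df.withColumn("times_rank", row_number().over(w))
    .filter(col("times_rank") <= n)
}
~~~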
## IV. Log Content

Fields needed: access time, URL, traffic consumed by the request, client IP.

Logs are usually processed partitioned by the access time found in each record, e.g. by day (d), hour (h), or 5-minute buckets (m5).

Input: access time, URL, traffic consumed, client IP
Output: URL, cmsType (video/article), cmsId, traffic, ip, city, access time, day

### Maven packaging of the IP database
~~~shell
mvn install:install-file -Dfile=D:\ipdatabase-master\ipdatabase-master\target\ipdatabase-1.0-SNAPSHOT.jar -DgroupId=com.ggstar -DartifactId=ipdatabase -Dversion=1.0 -Dpackaging=jar
~~~
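Once installed into the local repository, the jar can then be referenced from the project's pom.xml (a sketch; coordinates mirror the install command above):

~~~xml
<dependency>
    <groupId>com.ggstar</groupId>
    <artifactId>ipdatabase</artifactId>
    <version>1.0</version>
</dependency>
~~~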
## V. Data Cleaning

### 1. Parsing the raw log
~~~scala
package com.saddam.spark.MuKe.ImoocProject

import org.apache.spark.sql.SparkSession

/**
 * Cleaning step one: extract only the columns we need.
 * (Set a breakpoint to inspect the individual fields.)
 */
object SparkStatFormatJob {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SparkStatFormatJob")
      .master("local[2]")
      .getOrCreate()

    val logRDD = spark.sparkContext.textFile("D:\\Spark\\DataSets\\access.20161111.log")
    // logRDD.take(10).foreach(println)

    val result = logRDD.map(line => {
      val split = line.split(" ")
      val ip = split(0)
      // Raw fields 3 and 4 together form the complete timestamp:
      // [10/Nov/2016:00:01:02 +0800]  ->  yyyy-MM-dd HH:mm:ss
      // TODO parse it with the DateUtils helper below
      val time = split(3) + " " + split(4)
      // e.g. "http://www.imooc.com/code/1852" -- strip the surrounding quotes
      val url = split(11).replaceAll("\"", "")
      val traffic = split(9)
      // a tuple works too: (ip, DateUtils.parse(time), url, traffic)
      DateUtils.parse(time) + "\t" + url + "\t" + traffic + "\t" + ip
    })
    result.take(20).foreach(println)
    // result.saveAsTextFile("D:\\Spark\\OutPut\\log_local_2")

    /*
    (10.100.0.1,[10/Nov/2016:00:01:02 +0800])
    (117.35.88.11,[10/Nov/2016:00:01:02 +0800])
    (182.106.215.93,[10/Nov/2016:00:01:02 +0800])
    (10.100.0.1,[10/Nov/2016:00:01:02 +0800])
    */

    spark.stop()
  }
}
~~~

### 2. Date utility class
~~~scala
package com.saddam.spark.MuKe.ImoocProject

import java.util.{Date, Locale}

import org.apache.commons.lang3.time.FastDateFormat

/**
 * Date/time parsing utility.
 * FastDateFormat is used because, unlike SimpleDateFormat, it is thread-safe.
 */
object DateUtils {

  // format used in the input file: [10/Nov/2016:00:01:02 +0800]
  val YYYYMMDDHHMM_TIME_FORMAT =
    FastDateFormat.getInstance("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH)

  // target format: 2016-11-10 00:01:02
  val TARGET_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")

  /** Parse a raw log timestamp into the target format. */
  def parse(time: String): String = {
    TARGET_FORMAT.format(new Date(getTime(time)))
  }

  /**
   * Epoch millis of a raw log timestamp.
   * time: [10/Nov/2016:00:01:02 +0800]
   */
  def getTime(time: String): Long = {
    try {
      YYYYMMDDHHMM_TIME_FORMAT
        .parse(time.substring(time.indexOf("[") + 1, time.lastIndexOf("]")))
        .getTime
    } catch {
      case e: Exception => 0L
    }
  }

  def main(args: Array[String]): Unit = {
    println(parse("[10/Nov/2016:00:01:02 +0800]"))
  }
}
~~~

## VI. Implementing the Requirements
### Requirement 1: most popular Top N courses / articles by visit count

Complete the statistics and write the results to the database, once with the DataFrame API and once with the SQL API.

~~~scala
package com.saddam.spark.MuKe

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

import scala.collection.mutable.ListBuffer

object PopularVideoVisits {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("TopNStatJob")
      .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
      .master("local[2]")
      .getOrCreate()

    val accessDF = spark.read.format("parquet").load("D:\\Spark\\DataSets\\clean_city")
    accessDF.printSchema()
    accessDF.show(false)
    /*
    +----------------------------------+-------+-----+-------+--------------+----+-------------------+--------+
    |url                               |cmsType|cmsId|traffic|ip            |city|time               |day     |
    +----------------------------------+-------+-----+-------+--------------+----+-------------------+--------+
    |http://www.imooc.com/video/4500   |video  |4500 |304    |218.75.35.226 |未知 |2017-05-11 14:09:14|20170511|
    |http://www.imooc.com/video/14623  |video  |14623|69     |202.96.134.133|未知 |2017-05-11 15:25:05|20170511|
    |http://www.imooc.com/article/17894|article|17894|115    |202.96.134.133|未知 |2017-05-11 07:50:01|20170511|
    +----------------------------------+-------+-----+-------+--------------+----+-------------------+--------+
    */

    val day = "20170511"
    videoAccessTopNStat(spark, accessDF, day)

    // quick sanity check of the MySQL utility
    println(MySQLUtils.getConnection())

    spark.stop()
  }

  /** Most popular Top N courses by visit count. */
  def videoAccessTopNStat(spark: SparkSession, accessDF: DataFrame, day: String): Unit = {
    import spark.implicits._

    // TODO approach 1: DataFrame API (videos)
    val videoAccessTopNDF = accessDF
      .filter($"day" === day && $"cmsType" === "video")
      .groupBy("day", "cmsId")
      .agg(count("cmsId").as("times"))

    videoAccessTopNDF.printSchema()
    videoAccessTopNDF.show(false)

    // TODO approach 2: SQL API (articles)
    accessDF.createOrReplaceTempView("temp")
    val videoAccessTopNSQL = spark.sql(
      "select day, cmsId, count(1) as times from temp " +
      "where day = '20170511' and cmsType = 'article' " +
      "group by day, cmsId order by times desc")
    videoAccessTopNSQL.show(false)

    // TODO write the Top N statistics into MySQL
    try {
      videoAccessTopNSQL.foreachPartition(partitionOfRecords => {
        val list = new ListBuffer[DayVideoAccessStat]
        partitionOfRecords.foreach(info => {
          val day = info.getAs[String]("day")
          val cmsId = info.getAs[Long]("cmsId")
          val times = info.getAs[Long]("times")
          list.append(DayVideoAccessStat(day, cmsId, times))
        })
        StatDAO.insertDayVideoAccessTopN(list)
      })
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }
}

/** Entity: per-day visit count of a course. */
case class DayVideoAccessStat(day: String, cmsId: Long, times: Long)

/** TODO MySQL utility. */
object MySQLUtils {

  def getConnection(): Connection =
    DriverManager.getConnection(
      "jdbc:mysql://121.37.2x.xx:3306/imooc_project?user=root&password=xxxxxx&useSSL=false")

  /** Release the connection and the prepared statement. */
  def release(connection: Connection, pstmt: PreparedStatement): Unit = {
    try {
      if (pstmt != null) pstmt.close()
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      if (connection != null) connection.close()
    }
  }
}

/** TODO DAO layer. */
object StatDAO {

  /** Batch-insert the per-day Top N rows. */
  def insertDayVideoAccessTopN(list: ListBuffer[DayVideoAccessStat]): Unit = {
    var connection: Connection = null
    var pstmt: PreparedStatement = null
    try {
      connection = MySQLUtils.getConnection()
      connection.setAutoCommit(false) // manual commit
      val sql = "insert into day_video_access_topn_stat(day, cms_id, times) values (?, ?, ?)"
      pstmt = connection.prepareStatement(sql)
      for (ele <- list) {
        pstmt.setString(1, ele.day)
        pstmt.setLong(2, ele.cmsId)
        pstmt.setLong(3, ele.times)
        pstmt.addBatch()
      }
      pstmt.executeBatch() // run the batch
      connection.commit()  // commit manually
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySQLUtils.release(connection, pstmt)
    }
  }
}
~~~

### Requirement 2: most popular Top N courses per city

a. Resolve the city from the IP address; b. window functions in Spark SQL. MySQLUtils is identical to Requirement 1 and is omitted here.

~~~scala
package com.saddam.spark.MuKe

import java.sql.{Connection, PreparedStatement}

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

import scala.collection.mutable.ListBuffer

object PopularCityVideoVisits {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("TopNStatJob")
      .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
      .master("local[2]")
      .getOrCreate()

    val accessDF = spark.read.format("parquet").load("D:\\Spark\\DataSets\\clean_city")
    accessDF.printSchema()
    accessDF.show(false)

    val day = "20170511"
    // TODO Top N courses per city
    cityAccessTopNStat(spark, accessDF, day)

    spark.stop()
  }

  /** Top N courses per city. */
  def cityAccessTopNStat(spark: SparkSession, accessDF: DataFrame, day: String): Unit = {
    import spark.implicits._

    val cityAccessTopNDF = accessDF
      .filter($"day" === day && $"cmsType" === "video")
      .groupBy("day", "city", "cmsId")
      .agg(count("cmsId").as("times"))
      .orderBy($"times".desc)

    cityAccessTopNDF.printSchema()
    cityAccessTopNDF.show(false)

    // window function in Spark SQL: rank courses inside each city
    val top3DF = cityAccessTopNDF.select(
      cityAccessTopNDF("day"),
      cityAccessTopNDF("city"),
      cityAccessTopNDF("cmsId"),
      cityAccessTopNDF("times"),
      row_number().over(
        Window.partitionBy(cityAccessTopNDF("city"))
          .orderBy(cityAccessTopNDF("times").desc)
      ).as("times_rank")
    ).filter($"times_rank" <= 3) // Top 3 per city
    // top3DF.show(false)

    // TODO write the per-city Top N statistics into MySQL
    try {
      top3DF.foreachPartition(partitionOfRecords => {
        val list = new ListBuffer[DayCityVideoAccessStat]
        partitionOfRecords.foreach(info => {
          val day = info.getAs[String]("day")
          val cmsId = info.getAs[Long]("cmsId")
          val city = info.getAs[String]("city")
          val times = info.getAs[Long]("times")
          val timesRank = info.getAs[Int]("times_rank")
          list.append(DayCityVideoAccessStat(day, cmsId, city, times, timesRank))
        })
        StatDAO.insertDayCityVideoAccessTopN(list)
      })
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }
}

/** Entity. */
case class DayCityVideoAccessStat(day: String, cmsId: Long, city: String, times: Long, timesRank: Int)

/** TODO DAO: one more batch-insert, same pattern as Requirement 1. */
object StatDAO {

  def insertDayCityVideoAccessTopN(list: ListBuffer[DayCityVideoAccessStat]): Unit = {
    var connection: Connection = null
    var pstmt: PreparedStatement = null
    try {
      connection = MySQLUtils.getConnection()
      connection.setAutoCommit(false)
      val sql = "insert into day_video_city_access_topn_stat(day, cms_id, city, times, times_rank) values (?, ?, ?, ?, ?)"
      pstmt = connection.prepareStatement(sql)
      for (ele <- list) {
        pstmt.setString(1, ele.day)
        pstmt.setLong(2, ele.cmsId)
        pstmt.setString(3, ele.city)
        pstmt.setLong(4, ele.times)
        pstmt.setInt(5, ele.timesRank)
        pstmt.addBatch()
      }
      pstmt.executeBatch()
      connection.commit()
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySQLUtils.release(connection, pstmt)
    }
  }
}
~~~

### Requirement 3: most popular Top N courses by traffic

~~~scala
package com.saddam.spark.MuKe

import java.sql.{Connection, PreparedStatement}

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

import scala.collection.mutable.ListBuffer

object VideoTrafficVisits {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("TopNStatJob")
      .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
      .master("local[2]")
      .getOrCreate()

    val accessDF = spark.read.format("parquet").load("D:\\Spark\\DataSets\\clean_city")
    accessDF.printSchema()
    accessDF.show(false)

    val day = "20170511"
    // TODO Top N courses by traffic
    videoTrafficsTopNStat(spark, accessDF, day)

    spark.stop()
  }

  /** Top N courses by traffic. */
  def videoTrafficsTopNStat(spark: SparkSession, accessDF: DataFrame, day: String): Unit = {
    import spark.implicits._

    val trafficsTopNDF = accessDF
      .filter($"day" === day && $"cmsType" === "video")
      .groupBy("day", "cmsId")
      .agg(sum("traffic").as("traffics"))
      .orderBy($"traffics".desc)
    // trafficsTopNDF.show(false)

    // TODO write the traffic Top N statistics into MySQL
    try {
      trafficsTopNDF.foreachPartition(partitionOfRecords => {
        val list = new ListBuffer[DayVideoTrafficsStat]
        partitionOfRecords.foreach(info => {
          val day = info.getAs[String]("day")
          val cmsId = info.getAs[Long]("cmsId")
          val traffics = info.getAs[Long]("traffics")
          list.append(DayVideoTrafficsStat(day, cmsId, traffics))
        })
        StatDAO.insertDayVideoTrafficsAccessTopN(list)
      })
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }
}

/** Entity. */
case class DayVideoTrafficsStat(day: String, cmsId: Long, traffics: Long)

/** TODO DAO: batch-insert for the traffic statistics. */
object StatDAO {

  def insertDayVideoTrafficsAccessTopN(list: ListBuffer[DayVideoTrafficsStat]): Unit = {
    var connection: Connection = null
    var pstmt: PreparedStatement = null
    try {
      connection = MySQLUtils.getConnection()
      connection.setAutoCommit(false)
      val sql = "insert into day_video_traffics_topn_stat(day, cms_id, traffics) values (?, ?, ?)"
      pstmt = connection.prepareStatement(sql)
      for (ele <- list) {
        pstmt.setString(1, ele.day)
        pstmt.setLong(2, ele.cmsId)
        pstmt.setLong(3, ele.traffics)
        pstmt.addBatch()
      }
      pstmt.executeBatch()
      connection.commit()
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      MySQLUtils.release(connection, pstmt)
    }
  }
}
~~~

### Deleting existing data

Re-running a job for the same day would otherwise insert duplicate rows, so StatDAO also deletes that day's data first:

~~~scala
/** Delete the data of the given day from all three result tables. */
def deleteData(day: String): Unit = {
  val tables = Array(
    "day_video_access_topn_stat",
    "day_video_city_access_topn_stat",
    "day_video_traffics_topn_stat")

  var connection: Connection = null
  var pstmt: PreparedStatement = null
  try {
    connection = MySQLUtils.getConnection()
    for (table <- tables) {
      val deleteSQL = s"delete from $table where day = ?"
      pstmt = connection.prepareStatement(deleteSQL)
      pstmt.setString(1, day)
      pstmt.executeUpdate()
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    MySQLUtils.release(connection, pstmt)
  }
}
~~~
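The MySQL DDL is not included in these notes; a minimal sketch consistent with the insert and delete statements above (column types are assumptions):

~~~sql
create table day_video_access_topn_stat (
  id int unsigned auto_increment primary key,
  day varchar(8) not null,
  cms_id bigint not null,
  times bigint not null
);

create table day_video_city_access_topn_stat (
  id int unsigned auto_increment primary key,
  day varchar(8) not null,
  cms_id bigint not null,
  city varchar(20) not null,
  times bigint not null,
  times_rank int not null
);

create table day_video_traffics_topn_stat (
  id int unsigned auto_increment primary key,
  day varchar(8) not null,
  cms_id bigint not null,
  traffics bigint not null
);
~~~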
## VII. Zeppelin

Official site: https://zeppelin.apache.org/

### 1. Unpack

~~~shell
[root@hadoop src]# tar zxvf zeppelin-0.7.1-bin-all.tgz
~~~

### 2. Rename

~~~shell
[root@hadoop src]# mv zeppelin-0.7.1-bin-all zeppelin
~~~

### 3. Start

~~~shell
[root@hadoop bin]# ./zeppelin-daemon.sh start
~~~

### 4. Web UI

http://121.37.2xx.xx:8080

### 5. Configure the JDBC interpreter

default.driver: com.mysql.jdbc.Driver
default.url: jdbc:mysql://121.37.2x.xx:3306/imooc_project
default.user: root
default.password: xxxxxx

MySQL driver jar:

/usr/local/src/mysql-connector-java-5.1.27-bin.jar

### 6. Create a note

### 7. List the tables

~~~sql
%jdbc
show tables;
~~~

### 8. Charting

~~~sql
%jdbc
select cms_id, times from day_video_access_topn_stat;
~~~
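To chart only the Top N, the same query can be sorted and limited (a sketch; uses the day column that deleteData filters on above):

~~~sql
%jdbc
select cms_id, times
from day_video_access_topn_stat
where day = '20170511'
order by times desc
limit 10;
~~~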
## VIII. Spark on YARN

Spark run modes:

1) Local: used during development
2) Standalone: Spark's own cluster manager; running a standalone cluster means deploying Spark on every machine
3) YARN: recommended for production; YARN then does the resource scheduling for every job on the cluster (MapReduce, Spark)
4) Mesos

Whichever mode is used, the application code is exactly the same; only the --master value passed at submit time changes, as the examples below show.
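A sketch of the same jar submitted against different masters (host, port and jar name are placeholders):

~~~shell
spark-submit --master local[2]              --class com.bigdata.WordCountYARN app.jar
spark-submit --master spark://hadoop01:7077 --class com.bigdata.WordCountYARN app.jar
spark-submit --master yarn                  --class com.bigdata.WordCountYARN app.jar
~~~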
### 1. Overview

Spark supports pluggable cluster managers. From YARN's point of view, a Spark application is just another client.

### 2. client mode
The Driver runs on the client (the machine that submits the Spark job).
The client communicates with the containers it requested in order to schedule and run the job, so the client must not exit.
Logs are printed on the client console, which is convenient for testing.
### 3. cluster mode

The Driver runs inside the ApplicationMaster.
The client can exit as soon as the job is submitted, because the job is already running on YARN.
Logs do not appear on the client console; they live on the Driver side and are fetched with yarn logs -applicationId <appId>.

### 4. Comparing the two modes

The modes differ in where the Driver runs, what the ApplicationMaster is responsible for, and where the output logs end up:

| | client mode | cluster mode |
| --- | --- | --- |
| Driver location | client machine | ApplicationMaster |
| ApplicationMaster's role | requesting resources only | requesting resources and driving the job |
| Output logs | client console | Driver side (yarn logs / Web UI) |

### 5. Examples
Before submitting to YARN, point HADOOP_CONF_DIR (or YARN_CONF_DIR) at the Hadoop configuration directory, for example:
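~~~shell
# Path is illustrative; adjust to your Hadoop installation
export HADOOP_CONF_DIR=/usr/local/src/hadoop/etc/hadoop
~~~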
Client mode:

~~~shell
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master yarn \
  --executor-memory 1G \
  --num-executors 1 \
  /usr/local/src/spark/examples/jars/spark-examples_2.11-2.1.1.jar \
  4
~~~

The result is printed on the client console:

~~~text
Pi is roughly 3.1411378528446323
22/02/28 18:52:26 INFO server.ServerConnector: Stopped Spark@1b0a7baf{HTTP/1.1}{0.0.0.0:4040}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@8a589a2{/stages/stage/kill,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@192f2f27{/jobs/job/kill,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@1bdf8190{/api,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@4f8969b0{/,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@6fefce9e{/static,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@74cec793{/executors/threadDump/json,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@f9b7332{/executors/threadDump,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@18e7143f{/executors/json,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@209775a9{/executors,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@5db4c359{/environment/json,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@2c177f9e{/environment,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@33617539{/storage/rdd/json,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@47874b25{/storage/rdd,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@290b1b2e{/storage/json,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@1fc0053e{/storage,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@77307458{/stages/pool/json,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@389adf1d{/stages/pool,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@7bf9b098{/stages/stage/json,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@72e34f77{/stages/stage,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@6e9319f{/stages/json,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@6fa590ba{/stages,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@2416a51{/jobs/job/json,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@293bb8a5{/jobs/job,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@37ebc9d8{/jobs/json,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO handler.ContextHandler: Stopped o.s.j.s.ServletContextHandler@5217f3d0{/jobs,null,UNAVAILABLE,@Spark}
22/02/28 18:52:26 INFO ui.SparkUI: Stopped Spark web UI at http://192.168.184.135:4040
22/02/28 18:52:26 INFO cluster.YarnClientSchedulerBackend: Interrupting monitor thread
22/02/28 18:52:26 INFO cluster.YarnClientSchedulerBackend: Shutting down all executors
22/02/28 18:52:26 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
22/02/28 18:52:26 INFO cluster.SchedulerExtensionServices: Stopping SchedulerExtensionServices
(serviceOption=None, services=List(), started=false)
22/02/28 18:52:26 INFO cluster.YarnClientSchedulerBackend: Stopped
22/02/28 18:52:26 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
22/02/28 18:52:26 INFO memory.MemoryStore: MemoryStore cleared
22/02/28 18:52:26 INFO storage.BlockManager: BlockManager stopped
22/02/28 18:52:26 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
22/02/28 18:52:26 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
22/02/28 18:52:26 INFO spark.SparkContext: Successfully stopped SparkContext
22/02/28 18:52:26 INFO util.ShutdownHookManager: Shutdown hook called
22/02/28 18:52:26 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-a95834c0-d38b-457b-89b2-fed00d5bef56
~~~
Cluster mode:

~~~shell
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master yarn-cluster \
  --executor-memory 1G \
  --num-executors 1 \
  /usr/local/src/spark/examples/jars/spark-examples_2.11-2.1.1.jar \
  4
~~~

Here "Pi is roughly ..." is no longer printed on the client console; it ends up in the Driver's log on YARN. A full run (note the warning below: --master yarn-cluster is deprecated since Spark 2.0 in favor of --master yarn with a deploy mode):

~~~text
[root@hadoop01 spark]# ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster --executor-memory 1G --num-executors 1 /usr/local/src/spark/examples/jars/spark-examples_2.11-2.1.1.jar 4
Warning: Master yarn-cluster is deprecated since 2.0. Please use master yarn with specified deploy mode instead.
22/02/28 18:54:32 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/02/28 18:54:32 WARN util.Utils: Your hostname, hadoop01.localdomain resolves to a loopback address: 127.0.0.1; using 192.168.184.135 instead (on interface ens33)
22/02/28 18:54:32 WARN util.Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/02/28 18:54:32 INFO client.RMProxy: Connecting to ResourceManager at hadoop01/192.168.184.135:8032
22/02/28 18:54:32 INFO yarn.Client: Requesting a new application from cluster with 1 NodeManagers
22/02/28 18:54:33 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (8192 MB per container)
22/02/28 18:54:33 INFO yarn.Client: Will allocate AM container, with 1408 MB memory including 384 MB overhead
22/02/28 18:54:33 INFO yarn.Client: Setting up container launch context for our AM
22/02/28 18:54:33 INFO yarn.Client: Setting up the launch environment for our AM container
22/02/28 18:54:33 INFO yarn.Client: Preparing resources for our AM container
22/02/28 18:54:33 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
22/02/28 18:54:35 INFO yarn.Client: Uploading resource file:/tmp/spark-c7e3fb91-c7d0-4f59-86ba-705a9f256144/__spark_libs__3085975169933820625.zip -> hdfs://hadoop01:9000/user/root/.sparkStaging/application_1646041633964_0004/__spark_libs__3085975169933820625.zip
22/02/28 18:54:35 INFO yarn.Client: Uploading resource file:/usr/local/src/spark/examples/jars/spark-examples_2.11-2.1.1.jar -> hdfs://hadoop01:9000/user/root/.sparkStaging/application_1646041633964_0004/spark-examples_2.11-2.1.1.jar
22/02/28 18:54:35 INFO yarn.Client: Uploading resource file:/tmp/spark-c7e3fb91-c7d0-4f59-86ba-705a9f256144/__spark_conf__2818552262823480245.zip -> hdfs://hadoop01:9000/user/root/.sparkStaging/application_1646041633964_0004/__spark_conf__.zip
22/02/28 18:54:35 INFO spark.SecurityManager: Changing view acls to: root
22/02/28 18:54:35 INFO spark.SecurityManager: Changing modify acls to: root
22/02/28 18:54:35 INFO spark.SecurityManager: Changing view acls groups to:
22/02/28 18:54:35 INFO spark.SecurityManager: Changing modify acls groups to:
22/02/28 18:54:35 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
22/02/28 18:54:35 INFO yarn.Client: Submitting application application_1646041633964_0004 to ResourceManager
22/02/28 18:54:35 INFO impl.YarnClientImpl: Submitted application application_1646041633964_0004
22/02/28 18:54:36 INFO yarn.Client: Application report for application_1646041633964_0004 (state: ACCEPTED)
22/02/28 18:54:36 INFO yarn.Client:
	 client token: N/A
	 diagnostics: N/A
	 ApplicationMaster host: N/A
	 ApplicationMaster RPC port: -1
	 queue: default
	 start time: 1646045675928
	 final status: UNDEFINED
	 tracking URL: http://hadoop01:8088/proxy/application_1646041633964_0004/
	 user: root
22/02/28 18:54:37 INFO yarn.Client: Application report for application_1646041633964_0004 (state: ACCEPTED)
22/02/28 18:54:38 INFO yarn.Client: Application report for application_1646041633964_0004 (state: ACCEPTED)
22/02/28 18:54:39 INFO yarn.Client: Application report for application_1646041633964_0004 (state: RUNNING)
22/02/28 18:54:39 INFO yarn.Client:
	 client token: N/A
	 diagnostics: N/A
	 ApplicationMaster host: 192.168.184.135
	 ApplicationMaster RPC port: 0
	 queue: default
	 start time: 1646045675928
	 final status: UNDEFINED
	 tracking URL: http://hadoop01:8088/proxy/application_1646041633964_0004/
	 user: root
22/02/28 18:54:40 INFO yarn.Client: Application report for application_1646041633964_0004 (state: RUNNING)
22/02/28 18:54:41 INFO yarn.Client: Application report for application_1646041633964_0004 (state: RUNNING)
22/02/28 18:54:42 INFO yarn.Client: Application report for application_1646041633964_0004 (state: RUNNING)
22/02/28 18:54:43 INFO yarn.Client: Application report for application_1646041633964_0004 (state: FINISHED)
22/02/28 18:54:43 INFO yarn.Client:
	 client token: N/A
	 diagnostics: N/A
	 ApplicationMaster host: 192.168.184.135
	 ApplicationMaster RPC port: 0
	 queue: default
	 start time: 1646045675928
	 final status: SUCCEEDED
	 tracking URL: http://hadoop01:8088/proxy/application_1646041633964_0004/
	 user: root
22/02/28 18:54:43 INFO util.ShutdownHookManager: Shutdown hook called
22/02/28 18:54:43 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-c7e3fb91-c7d0-4f59-86ba-705a9f256144
~~~

### 6. Fetching the logs by application id
~~~shell
[root@hadoop01 spark]# yarn logs -applicationId application_1646041633964_0003
22/02/28 18:59:05 INFO client.RMProxy: Connecting to ResourceManager at hadoop01/192.168.184.135:8032
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/src/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/src/hive/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
/tmp/logs/root/logs/application_1646041633964_0003 does not exist.
Log aggregation has not completed or is not enabled.
~~~

Nothing comes back because log aggregation is not configured; without aggregation the container logs are only reachable through the NodeManager web UI. Enabling aggregation in yarn-site.xml looks roughly like this (standard YARN property names, illustrative values):
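~~~xml
<!-- yarn-site.xml: turn on log aggregation so `yarn logs -applicationId ...` works -->
<property>
    <name>yarn.log-aggregation-enable</name>
    <value>true</value>
</property>
<!-- where the aggregated logs are stored on HDFS (this is also the default) -->
<property>
    <name>yarn.nodemanager.remote-app-log-dir</name>
    <value>/tmp/logs</value>
</property>
~~~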
### 7. Viewing the logs in the Web UI

http://hadoop01:8042/node/containerlogs/container_1646041633964_0004_01_000001/root

## IX. Running the Project on YARN
Package the project with Maven (including its dependencies).

### 1. IDEA project code: word count
~~~scala
package com.bigdata

import org.apache.spark.sql.SparkSession

object WordCountYARN {

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      println("Usage: WordCountYARN <inputPath> <outputPath>")
      System.exit(1)
    }
    val Array(inputPath, outputPath) = args

    val spark = SparkSession.builder().getOrCreate()

    val rdd = spark.sparkContext.textFile(inputPath)
    val counts = rdd
      .flatMap(x => x.split("\t"))
      .map(word => (word, 1))
      .reduceByKey((a, b) => a + b)
    counts.saveAsTextFile(outputPath)

    spark.stop()
  }
}
~~~

### 2. spark-submit
~~~shell
spark-submit \
  --class com.bigdata.WordCountYARN \
  --name WordCount \
  --master yarn \
  --executor-memory 1G \
  --num-executors 1 \
  /usr/local/src/spark/spark_jar/BYGJ.jar \
  hdfs://hadoop01:9000/wordcount.txt hdfs://hadoop01:9000/wc_output
~~~

### 3. Check the result
~~~shell
[root@hadoop01 spark_jar]# hadoop fs -cat /wc_output/part-*
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/src/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/src/hive/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
(hive,5)
(spark,5)
(hadoop,2)
(hbase,3)
~~~

---
### 1. IDEA project code: log cleaning
~~~scala
package com.saddam.spark.MuKe.ImoocProject.LogClean

import org.apache.spark.sql.{SaveMode, SparkSession}

object SparkStatCleanJobYarn {

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      println("Usage: SparkStatCleanJobYarn <inputPath> <outputPath>")
      System.exit(1)
    }
    val Array(inputPath, outputPath) = args

    val spark = SparkSession.builder().getOrCreate()

    val accessRDD = spark.sparkContext.textFile(inputPath)

    // TODO RDD -> DF
    val accessDF = spark.createDataFrame(
      accessRDD.map(x => AccessConvertUtil.parseLog(x)),
      AccessConvertUtil.struct)

    accessDF.coalesce(1)
      .write.format("parquet")
      .mode(SaveMode.Overwrite)
      .partitionBy("day")
      .save(outputPath)

    spark.stop()
  }
}
~~~
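AccessConvertUtil, the row-conversion helper used above, is not reproduced in these notes. A hypothetical reconstruction consistent with the cleaned schema (url, cmsType, cmsId, traffic, ip, city, time, day); the IP-to-city lookup via the ipdatabase project is stubbed out:

~~~scala
package com.saddam.spark.MuKe.ImoocProject.LogClean

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// Hypothetical sketch -- field order matches the parquet output shown later.
object AccessConvertUtil {

  val struct = StructType(Array(
    StructField("url", StringType),
    StructField("cmsType", StringType),
    StructField("cmsId", LongType),
    StructField("traffic", LongType),
    StructField("ip", StringType),
    StructField("city", StringType),
    StructField("time", StringType),
    StructField("day", StringType)
  ))

  /** Input line format (from SparkStatFormatJob): time \t url \t traffic \t ip */
  def parseLog(log: String): Row = {
    try {
      val splits = log.split("\t")
      val time = splits(0)
      val url = splits(1)
      val traffic = splits(2).toLong
      val ip = splits(3)

      // e.g. http://www.imooc.com/video/4500 -> cmsType = video, cmsId = 4500
      val domain = "http://www.imooc.com/"
      val cms = url.substring(url.indexOf(domain) + domain.length)
      val cmsTypeId = cms.split("/")
      var cmsType = ""
      var cmsId = 0L
      if (cmsTypeId.length > 1) {
        cmsType = cmsTypeId(0)
        cmsId = cmsTypeId(1).toLong
      }

      val city = "" // would be resolved from ip with the ipdatabase helper
      val day = time.substring(0, 10).replaceAll("-", "")
      Row(url, cmsType, cmsId, traffic, ip, city, time, day)
    } catch {
      case e: Exception => Row(0) // placeholder row for malformed lines
    }
  }
}
~~~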
### 2. spark-submit

~~~shell
spark-submit \
  --class com.saddam.spark.MuKe.ImoocProject.LogClean.SparkStatCleanJobYarn \
  --name SparkStatCleanJobYarn \
  --master yarn \
  --executor-memory 1G \
  --num-executors 1 \
  --files /usr/local/src/spark/spark_jar/ipDatabase.csv,/usr/local/src/spark/spark_jar/ipRegion.xlsx \
  /usr/local/src/spark/spark_jar/Spark.jar \
  hdfs://hadoop01:9000/access.log hdfs://hadoop01:9000/log_output
~~~

### 3. Check the result
Start a spark-shell with the MySQL driver on the classpath:

~~~shell
[root@hadoop01 datas]# spark-shell --master local[2] --jars /usr/local/src/mysql-connector-java-5.1.27-bin.jar
~~~

Locate the output file on HDFS:

/log_output/day=20170511/part-00000-36e30abb-3e42-4237-ad9f-a9f93258d4b2.snappy.parquet

Read the file:

~~~text
scala> spark.read.format("parquet").parquet("/log_output/day=20170511/part-00000-36e30abb-3e42-4237-ad9f-a9f93258d4b2.snappy.parquet").show(false)
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
+----------------------------------+-------+-----+-------+---------------+----+-------------------+
|url                               |cmsType|cmsId|traffic|ip             |city|time               |
+----------------------------------+-------+-----+-------+---------------+----+-------------------+
|http://www.imooc.com/video/4500 |video |4500 |304 |218.75.35.226 | |2017-05-11 14:09:14|
|http://www.imooc.com/video/14623 |video |14623|69 |202.96.134.133 | |2017-05-11 15:25:05|
|http://www.imooc.com/article/17894|article|17894|115 |202.96.134.133 | |2017-05-11 07:50:01|
|http://www.imooc.com/article/17896|article|17896|804 |218.75.35.226 | |2017-05-11 02:46:43|
|http://www.imooc.com/article/17893|article|17893|893 |222.129.235.182| |2017-05-11 09:30:25|
|http://www.imooc.com/article/17891|article|17891|407 |218.75.35.226 | |2017-05-11 08:07:35|
|http://www.imooc.com/article/17897|article|17897|78 |202.96.134.133 | |2017-05-11 19:08:13|
|http://www.imooc.com/article/17894|article|17894|658 |222.129.235.182| |2017-05-11 04:18:47|
|http://www.imooc.com/article/17893|article|17893|161 |58.32.19.255 | |2017-05-11 01:25:21|
|http://www.imooc.com/article/17895|article|17895|701 |218.22.9.56 | |2017-05-11 13:37:22|
|http://www.imooc.com/article/17892|article|17892|986 |218.75.35.226 | |2017-05-11 05:53:47|
|http://www.imooc.com/video/14540 |video |14540|987 |58.32.19.255 | |2017-05-11 18:44:56|
|http://www.imooc.com/article/17892|article|17892|610 |218.75.35.226 | |2017-05-11 17:48:51|
|http://www.imooc.com/article/17893|article|17893|0 |218.22.9.56 | |2017-05-11 16:20:03|
|http://www.imooc.com/article/17891|article|17891|262 |58.32.19.255 | |2017-05-11 00:38:01|
|http://www.imooc.com/video/4600 |video |4600 |465 |218.75.35.226 | |2017-05-11 17:38:16|
|http://www.imooc.com/video/4600 |video |4600 |833 |222.129.235.182| |2017-05-11 07:11:36|
|http://www.imooc.com/article/17895|article|17895|320 |222.129.235.182| |2017-05-11 19:25:04|
|http://www.imooc.com/article/17898|article|17898|460 |202.96.134.133 | |2017-05-11 15:14:28|
|http://www.imooc.com/article/17899|article|17899|389 |222.129.235.182| |2017-05-11 02:43:15|
+----------------------------------+-------+-----+-------+---------------+----+-------------------+
only showing top 20 rows
~~~
## X. Performance Tuning

### 1. Cluster level

Storage format: see https://www.infoq.cn/article/bigdata-store-choose/
Compression format: parquet output defaults to snappy; it can be changed, e.g.
.config("spark.sql.parquet.compression.codec", "gzip")

### 2. Code level

Prefer high-performance operators and reuse data that has already been computed.

### 3. Parameters

Parallelism: spark.sql.shuffle.partitions (default 200) configures the number of partitions used when shuffling data for joins or aggregations.
  spark-submit: --conf spark.sql.shuffle.partitions=500
  IDEA: .config("spark.sql.shuffle.partitions", "500")

Partition column type inference: spark.sql.sources.partitionColumnTypeInference.enabled
  spark-submit: --conf spark.sql.sources.partitionColumnTypeInference.enabled=false
  IDEA: .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
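Putting the parameters together in code, the SparkSession from the jobs above would be built like this (values illustrative):

~~~scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("TopNStatJob")
  .config("spark.sql.shuffle.partitions", "500")
  .config("spark.sql.sources.partitionColumnTypeInference.enabled", "false")
  .config("spark.sql.parquet.compression.codec", "gzip")
  .getOrCreate()
~~~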