当前位置: 首页 > news >正文

农村基本制度建设网站网站建设公司位置

农村基本制度建设网站,网站建设公司位置,百度搜索引擎投放,深圳工业设计协会1. UDF函数#xff08;用户自定义函数#xff09; 一般指的是用户自己定义的单行函数。一进一出#xff0c;函数接受的是一行中的一个或者多个字段值#xff0c;返回一个值。比如MySQL中的#xff0c;日期相关的dateDiff函数#xff0c;字符串相关的substring函数。 先…1. UDF函数用户自定义函数 一般指的是用户自己定义的单行函数。一进一出函数接受的是一行中的一个或者多个字段值返回一个值。比如MySQL中的日期相关的dateDiff函数字符串相关的substring函数。 先准备数据 1.1 导入必要的包 首先确保导入必要的Spark包 import org.apache.spark.sql.SparkSession 1.2 创建SparkSession 创建一个SparkSession对象这是与Spark交互的入口。 1.3 定义UDF并注册到SparkSQL 定义一个Scala函数并将其注册为UDF。示例 1.4 使用UDF在SQL查询中 调用udf的register方法第一个参数是udf函数的函数名第二个参数是要注册为UDF的函数。 session.udf.register(all_income,(sal:Int,bonus:Int){sal*12 bonus}) 1.5 代码 尽量使用SparkSQL的sql形式的写法api写法太麻烦了。 object TestUDF{def main(args: Array[String]): Unit {val session SparkSession.builder().master(local[*]).appName(testUDF).getOrCreate()import session.implicits._val df session.sparkContext.textFile(D:\\software\\Spark\\SparkProgram1\\atguigu-classes\\data\\a.txt).map(t {val strs t.split( )(strs(0), strs(1), strs(2).toInt, strs(3).toInt)}).toDF(id, name, salary, bonus)session.udf.register(all_income,(sal:Int,bonus:Int){sal*12 bonus})import org.apache.spark.sql.functions // df.withColumn(all,functions.callUDF(all_income,$salary,$bonus)) // .select(id,name,all) // .show()df.createTempView(salary)session.sql(|select id,name,all_income(salary,bonus) all from salary|.stripMargin).show()} }输出 2. UDAF用户自定义的聚合函数 指的是用户自定义的聚合函数多进一出比如MySQL中的count函数avg函数。 以学生信息为主进行统计所有人员的年龄的总和 或者每个性别的年龄的平均值 计算所有人的年龄之和 package com.atguigu.bigdata.testimport org.apache.spark.sql.{Encoder, Encoders, SparkSession, functions} import org.apache.spark.sql.expressions.Aggregator/*** ClassName : TestUDAF* Package : com.atguigu.bigdata.test* Description** Author HeXua* Create 2024/11/29 19:09* Version 1.0*/ object TestUDAF {def main(args: Array[String]): Unit {val session SparkSession.builder().appName(test udaf).master(local[*]).getOrCreate()import session.implicits._val df session.sparkContext.textFile(D:\\software\\Spark\\SparkProgram1\\atguigu-classes\\data\\a.txt).map(t {val strs t.split( )(strs(0), strs(1), strs(2).toInt, strs(3))}).toDF(id, name, age, gender)import org.apache.spark.sql.functions._// 注册udaf函数session.udf.register(mysum,udaf(new MySum))df.createTempView(student)session.sql(|select mysum(age) from student|.stripMargin).show()} } // udaf的类继承Aggregator抽象类 class MySum extends Aggregator[Int,Int,Int]{//初始化def zero: Int 0//聚合逻辑def reduce(b: Int, a: Int): Int ab//整体聚合def merge(b1: Int, b2: Int): Int b1b2//最终返回值def finish(reduction: Int): Int reduction//累加值的类型def bufferEncoder: Encoder[Int] Encoders.scalaInt//输出结果的类型def outputEncoder: Encoder[Int] Encoders.scalaInt }定义用户自定义聚合函数时继承Aggregator类需要指定三个泛型参数。这三个泛型参数分别代表不同的概念。 泛型参数解释 1. 输入类型IN 这是聚合函数的输入类型即每次调用reduce方法时传入的单个元素的类型。例如你要计算一组整数的平均值输入类型就是int。 2. 缓冲区类型BUFFER 这是聚合函数的中间状态类型也称为缓冲区类型。 例如你要计算一组整数的平均值缓冲区可能包含两个字段总和和计数因为iBUF可能是一个元组。 3. 输出类型(OUT) 这是聚合函数的最终输出类型即finish方法返回的类型。例如你要计算平均值最终输出类型是Double。 方法解释 zero初始化缓冲区的值对于平均值计算初始化和计数都是0。 reduce更新缓冲区每次传入一个新的输入值时更新总和和计数。 finish计算最终结果根据缓冲区中的总和和计数计算平均值。 bufferEncoder定义缓冲区类型的编码器用于序列化和反序列化缓冲区。 outputEncoder定义最终输出类型的编码器用于序列化和反序列化输出结果。 计算每个性别的年龄的平均值 case class AggragateVo(var cnt:Int,var sum:Int) object MyAvg extends Aggregator[Int,AggragateVo,Double]{override def zero: AggragateVo AggragateVo(0,0)override def reduce(b: AggragateVo, a: Int): AggragateVo {b.cnt 1b.sum ab}override def merge(b1: AggragateVo, b2: AggragateVo): AggragateVo {b1.cnt b2.cntb1.sum b2.sumb1}override def finish(reduction: AggragateVo): Double {reduction.sum.toDouble /reduction.cnt}override def bufferEncoder: Encoder[AggragateVo] Encoders.productoverride def outputEncoder: Encoder[Double] Encoders.scalaDouble } 3. UDTF用户自定义炸裂函数 拆分函数进入的是一行内容出现的结果是多行内容。 spark中并不直接支持UDTF函数。但可以使用hive中的炸裂函数达到效果。 import org.apache.spark.sql.SparkSessionobject TestUDTF {def main(args: Array[String]): Unit {val session SparkSession.builder().appName(test udtf).master(local[*]).getOrCreate()import session.implicits._val df session.sparkContext.textFile(file:///headless/workspace/spark/data/m.txt).map(t {val strs t.split(,)(strs(0), strs(1), strs(2))}).toDF(id, name, actors)//explode map arraydf.createTempView(movies)session.sql(|select id,name,actor from movies lateral view explode(split(actors,\\|)) t as actor|.stripMargin).createTempView(movies1)session.sql(|select count(1),actor from movies1 group by actor|.stripMargin).show()} }
http://www.w-s-a.com/news/355731/

相关文章:

  • 网站全站建设开题报告范文南京本地网站
  • 网站漏洞扫描工具wampserver集成环境搭建了一个织梦cms网站
  • 如何在局域网上做网站宁波设计公司排行榜
  • 自己的电脑做网站服务器吗百度搜索风云榜总榜
  • 做化妆品的一些网站企业网站建设与营运计划书
  • 重庆速代网络科技seo整站优化服务教程
  • 成都比较好的装修设计公司seo3的空间构型
  • 开发商建设审批网站成都创意设计公司
  • 百度快照比网站上线时间早wordpress新建阅读量字段
  • 国家工程建设标准化协会网站网站开发工具有
  • 上海网站建设集中公关公司组织架构图
  • wordpress副标题的作用百度网站标题优化
  • 大连哪家公司做网站比较好wordpress 判断用户组
  • 网站空间1g多少钱东莞公司高端网站建设
  • 网站服务器出错是什么意思做餐饮酒店网站
  • 房地产网站建设策划方案网站建设教程简笔画
  • 3d室内设计软件wordpress本地优化加速版
  • 南京高新区规划建设局网站石家庄哪里做网站比较好
  • 免费培训课程网站优化的方式
  • 做网站要固定电话在家自己做网站
  • 招聘网站开发视频新手如何做网站维护
  • flash 网站欣赏国外做的比较好的网站有哪些
  • 推广一个网站需要什么官网首页设计
  • 淘宝建设网站的理由企业官网建设哪家好
  • 青岛网站推wordpress主题切换
  • 天元建设集团有限公司资质郑州网站seo推广
  • 免费网站后台管理系统模板下载百度网盘app下载安装
  • 开封网站建设培训郑州高端网站建设哪家好
  • 东莞哪家做网站很有名的公司即墨专业医院网站制作公司
  • 做面食网站china cd wordpress