当前位置: 首页 > news >正文

网站qq显示未启用推进门户网站建设工作会议

网站qq显示未启用,推进门户网站建设工作会议,菏泽+网站建设公司,建站到网站收录到优化Gravitino SparkConnector 实现原理 本文参考了官网介绍#xff0c;想看官方解析请参考 官网地址 本文仅仅介绍原理 文章目录 Gravitino SparkConnector 实现原理背景知识-Spark Plugin 介绍(1) **插件加载**(2) **DriverPlugin 初始化**(3) **ExecutorPlugin 初始化**(4) *…Gravitino SparkConnector 实现原理 本文参考了官网介绍想看官方解析请参考 官网地址 本文仅仅介绍原理 文章目录 Gravitino SparkConnector 实现原理背景知识-Spark Plugin 介绍(1) **插件加载**(2) **DriverPlugin 初始化**(3) **ExecutorPlugin 初始化**(4) **插件执行**(5) **插件销毁** 背景知识-Driver Plugin 介绍(1) **init 方法**(2) **registerMetrics 方法**(3) **onTaskStart 方法**(4) **onTaskSucceeded 方法**(5) **onTaskFailed 方法**(6) **close 方法** SparkConnector使用方式加载spark.sql.catalog.xxx 具体执行的配置 背景知识-Spark Plugin 介绍 spark在[spark-29399]pr提交更新了SparkPlugin插件 SparkPlugin插件执行生命周期 SparkPlugin 的生命周期与 Spark 应用程序的生命周期一致具体如下 (1) 插件加载 当 Spark 应用程序启动时Spark 会扫描类路径下的 SparkPlugin 实现类。如果插件被正确配置例如通过 spark.plugins 配置项Spark 会实例化该类。 (2) DriverPlugin 初始化 Spark 调用 driverPlugin() 方法获取 DriverPlugin 实例。DriverPlugin 的生命周期开始其方法如 init、registerMetrics 等会被调用。 (3) ExecutorPlugin 初始化 Spark 调用 executorPlugin() 方法获取 ExecutorPlugin 实例。ExecutorPlugin 的生命周期开始其方法如 init、shutdown 等会被调用。 (4) 插件执行 DriverPlugin 在 Driver 端执行自定义逻辑例如注册指标、拦截 SQL 解析、修改 Catalog 等。ExecutorPlugin 在 Executor 端执行自定义逻辑例如监控 Task 执行、收集指标等。 (5) 插件销毁 当 Spark 应用程序结束时DriverPlugin 和 ExecutorPlugin 的生命周期结束其 close() 方法会被调用以释放资源。 背景知识-Driver Plugin 介绍 DriverPlugin 是用于在 Driver 端执行自定义逻辑的插件其生命周期方法包括 (1) init 方法 在 Driver 插件初始化时调用。可以在此方法中执行初始化逻辑例如注册自定义 Catalog、拦截 SQL 解析器等。 (2) registerMetrics 方法 在 Driver 插件初始化时调用。可以在此方法中注册自定义指标Metrics。 (3) onTaskStart 方法 在 Task 启动时调用。可以在此方法中执行与 Task 相关的逻辑。 (4) onTaskSucceeded 方法 在 Task 成功完成时调用。可以在此方法中执行与 Task 成功相关的逻辑。 (5) onTaskFailed 方法 在 Task 失败时调用。可以在此方法中执行与 Task 失败相关的逻辑。 (6) close 方法 在 Driver 插件销毁时调用。可以在此方法中释放资源例如关闭连接、清理缓存等。 SparkConnector使用方式 ./bin/spark-sql -v \ --conf spark.pluginsorg.apache.gravitino.spark.connector.plugin.GravitinoSparkPlugin \ --conf spark.sql.gravitino.urihttp://127.0.0.1:8090 \ --conf spark.sql.gravitino.metalaketest \ --conf spark.sql.gravitino.enableIcebergSupporttrue \ --conf spark.sql.warehouse.dirhdfs://127.0.0.1:9000/user/hive/warehouse-hive可以看出SparkConnector指定了加载的插件是GravitinoSparkPlugin public class GravitinoSparkPlugin implements SparkPlugin {Overridepublic DriverPlugin driverPlugin() {return new GravitinoDriverPlugin();}Overridepublic ExecutorPlugin executorPlugin() {return null;} }可以看出实现方式很简单仅仅使用了一个GravitinoDriverPlugin也就是在Spark应用程序启动的时候扫描SparkPlugin扫描到了这个GravitinoSparkPlugin然后立马就去执行GravitinoDriverPlugin初始化程序。在DriverPlugin初始化过程中 插件仅仅覆写了两个函数init() 和shutdown()。 说明这个插件仅仅做了一些初始化和资源销毁操作。 在Driver端进行初始化 配置检查检查gravitino_uri和gravitino_metalake是否配置 如果开启了iceberg则将gravitinoDriverExtensions放入到数组中方便配置 初始化Gravtino客户端和GravitinoCatalogManager并且将relational类型的表加载到缓存中 将缓存中的catalog进行如果是非iceberg类型当前仅仅只有Hive进行注册这里定义的注册的实际操作配置Spark的配置项spark.sql.catalog.catalogName这里的catalogName对应的是缓存中的catalogName配置的值为根据Gravitino自己的Catalog使用的Provider进行适配比如可以是(org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark33或者org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark33)具体情况由适配器进行处理。 然后注册SqlExtensions其实就是将第2步骤的数组配置到SPARK_SESSION_EXTENSIONS这个SparkConf配置里面 稍微贴一下注册Catalog代码比较重要 //初始化的时候调用注册逻辑将Gravitino中的Catalog加载到缓存//然后将缓存中的数据作为第二个参数gravitinoCatalogs传递进来private void registerGravitinoCatalogs(SparkConf sparkConf, MapString, Catalog gravitinoCatalogs) {gravitinoCatalogs.entrySet().forEach(entry - {String catalogName entry.getKey();Catalog gravitinoCatalog entry.getValue();String provider gravitinoCatalog.provider();if (lakehouse-iceberg.equals(provider.toLowerCase(Locale.ROOT)) enableIcebergSupport false) {return;}try {registerCatalog(sparkConf, catalogName, provider);} catch (Exception e) {LOG.warn(Register catalog {} failed., catalogName, e);}});}//这里根据适配器去配置spark.sql.catalog.xxx 的具体执行CatalogClassprivate void registerCatalog(SparkConf sparkConf, String catalogName, String provider) {if (StringUtils.isBlank(provider)) {LOG.warn(Skip registering {} because catalog provider is empty., catalogName);return;}String catalogClassName CatalogNameAdaptor.getCatalogName(provider);if (StringUtils.isBlank(catalogClassName)) {LOG.warn(Skip registering {} because {} is not supported yet., catalogName, provider);return;}String sparkCatalogConfigName spark.sql.catalog. catalogName;Preconditions.checkArgument(!sparkConf.contains(sparkCatalogConfigName),catalogName is already registered to SparkCatalogManager);sparkConf.set(sparkCatalogConfigName, catalogClassName);LOG.info(Register {} catalog to Spark catalog manager., catalogName);}到这里GravitinoConnector的代码机制已经说完了下面聊聊Spark机制 加载spark.sql.catalog.xxx 具体执行的配置 经过上面GravitinoDriverPlugin的初始化之后已经将具体的catalog名称和对应的处理类映射起来这里以GravitinoHiveCatalogSpark33为例。 GravitinoHiveCatalogSpark33这个类继承关系是继承了BaseCatalog 而BaseCatalog是Spark中定义的CatalogPlugin的一个实现类。 Spark在解析SQL的时候会查找catalog对应的Catalog可以看到调用了CatalogManager.catalog()方法 private object CatalogAndMultipartIdentifier {def unapply(parts: Seq[String]): Some[(Option[CatalogPlugin], Seq[String])] parts match {case Seq(_) Some((None, parts))case Seq(catalogName, tail _*) try {Some((Some(catalogManager.catalog(catalogName)), tail))} catch {case _: CatalogNotFoundException Some((None, parts))}}}这个catalog方法调用了Catalogs.load()方法 def catalog(name: String): CatalogPlugin synchronized {if (name.equalsIgnoreCase(SESSION_CATALOG_NAME)) {v2SessionCatalog} else {catalogs.getOrElseUpdate(name, Catalogs.load(name, conf))}}这个方法才是真正的加载方法他真正根据conf配置将GravitinoHiveCatalogSpark33名称根据定义的反射构造函数实例化到内存中 def load(name: String, conf: SQLConf): CatalogPlugin {val pluginClassName try {val _pluginClassName conf.getConfString(sspark.sql.catalog.$name)// SPARK-39079 do configuration check first, otherwise some path-based table like// org.apache.spark.sql.json./path/json_file may fail on analyze phaseif (name.contains(.)) {throw QueryExecutionErrors.invalidCatalogNameError(name)}_pluginClassName} catch {case _: NoSuchElementException throw QueryExecutionErrors.catalogPluginClassNotFoundError(name)}val loader Utils.getContextOrSparkClassLoadertry {val pluginClass loader.loadClass(pluginClassName)if (!classOf[CatalogPlugin].isAssignableFrom(pluginClass)) {throw QueryExecutionErrors.catalogPluginClassNotImplementedError(name, pluginClassName)}val plugin pluginClass.getDeclaredConstructor().newInstance().asInstanceOf[CatalogPlugin]plugin.initialize(name, catalogOptions(name, conf))plugin} catch {// 省略}}到这里流程就分析结束了
http://www.w-s-a.com/news/857379/

相关文章:

  • 长沙企业网站建设哪家好做app一般多少钱
  • 南宁一站网网络技术有限公司网站开发技术应用领域
  • 公司网站建设方案ppt专业构建网站的公司
  • 深圳网站建设方维网络网站框架设计好后怎么做
  • 合肥网站建设过程网站栏目建设调研
  • 手机访问网站页面丢失北京电商平台网站建设
  • 郑州网站怎么推广中山 网站关键词优化
  • 国外试用网站空间网站建设与管理题目
  • 淄博网赢网站建设网站设计的技术选择
  • 建外贸网站 东莞厦门做网站最好的公司
  • 为您服务网站新网站做百度推广
  • 电子商务免费网站建设网站制作哪个好薇
  • 全面启动门户网站建设中小型企业建设一个网站大概需要多少钱
  • 建网站一般多少钱网站建设上传服务器步骤
  • 手机销售网站怎么做的网站推广优化建设方案
  • 做任务分享赚钱的网站德阳网站建设公司哪家好
  • 云南建设工程质量监督网站wordpress网站导航主题
  • 徐州网站建设哪家好薇手机开源网站代码
  • 更新网站要怎么做呢泰安市58同城招聘网
  • 溧阳网站建设价格企业网站设计费用
  • 我建设的网站打开很慢河北住房和城乡建设厅网站卡
  • 门户网站广告的特点有网站的建设初步定位
  • 建设网站第一步网页建设方案
  • 网站开发需要那些人才wordpress 小工具原理
  • 广州建设局官方网站佛山高端网站制作公司
  • 东莞哪里能学建设网站网站备案值得吗
  • 中山 网站建设 骏域小程序开发课程
  • 北京网站建设成都微商城app官方下载
  • 网站开发用户登陆的安全wordpress 开发网站
  • 网站建设容易出现的问题四川seo关键词工具