当前位置: 首页 > news >正文

免费做网站送域名的上海品质网站建设

免费做网站送域名的,上海品质网站建设,网站左侧 导航,大厂做网站Gravitino SparkConnector 实现原理 本文参考了官网介绍#xff0c;想看官方解析请参考 官网地址 本文仅仅介绍原理 文章目录 Gravitino SparkConnector 实现原理背景知识-Spark Plugin 介绍(1) **插件加载**(2) **DriverPlugin 初始化**(3) **ExecutorPlugin 初始化**(4) *…Gravitino SparkConnector 实现原理 本文参考了官网介绍想看官方解析请参考 官网地址 本文仅仅介绍原理 文章目录 Gravitino SparkConnector 实现原理背景知识-Spark Plugin 介绍(1) **插件加载**(2) **DriverPlugin 初始化**(3) **ExecutorPlugin 初始化**(4) **插件执行**(5) **插件销毁** 背景知识-Driver Plugin 介绍(1) **init 方法**(2) **registerMetrics 方法**(3) **onTaskStart 方法**(4) **onTaskSucceeded 方法**(5) **onTaskFailed 方法**(6) **close 方法** SparkConnector使用方式加载spark.sql.catalog.xxx 具体执行的配置 背景知识-Spark Plugin 介绍 spark在[spark-29399]pr提交更新了SparkPlugin插件 SparkPlugin插件执行生命周期 SparkPlugin 的生命周期与 Spark 应用程序的生命周期一致具体如下 (1) 插件加载 当 Spark 应用程序启动时Spark 会扫描类路径下的 SparkPlugin 实现类。如果插件被正确配置例如通过 spark.plugins 配置项Spark 会实例化该类。 (2) DriverPlugin 初始化 Spark 调用 driverPlugin() 方法获取 DriverPlugin 实例。DriverPlugin 的生命周期开始其方法如 init、registerMetrics 等会被调用。 (3) ExecutorPlugin 初始化 Spark 调用 executorPlugin() 方法获取 ExecutorPlugin 实例。ExecutorPlugin 的生命周期开始其方法如 init、shutdown 等会被调用。 (4) 插件执行 DriverPlugin 在 Driver 端执行自定义逻辑例如注册指标、拦截 SQL 解析、修改 Catalog 等。ExecutorPlugin 在 Executor 端执行自定义逻辑例如监控 Task 执行、收集指标等。 (5) 插件销毁 当 Spark 应用程序结束时DriverPlugin 和 ExecutorPlugin 的生命周期结束其 close() 方法会被调用以释放资源。 背景知识-Driver Plugin 介绍 DriverPlugin 是用于在 Driver 端执行自定义逻辑的插件其生命周期方法包括 (1) init 方法 在 Driver 插件初始化时调用。可以在此方法中执行初始化逻辑例如注册自定义 Catalog、拦截 SQL 解析器等。 (2) registerMetrics 方法 在 Driver 插件初始化时调用。可以在此方法中注册自定义指标Metrics。 (3) onTaskStart 方法 在 Task 启动时调用。可以在此方法中执行与 Task 相关的逻辑。 (4) onTaskSucceeded 方法 在 Task 成功完成时调用。可以在此方法中执行与 Task 成功相关的逻辑。 (5) onTaskFailed 方法 在 Task 失败时调用。可以在此方法中执行与 Task 失败相关的逻辑。 (6) close 方法 在 Driver 插件销毁时调用。可以在此方法中释放资源例如关闭连接、清理缓存等。 SparkConnector使用方式 ./bin/spark-sql -v \ --conf spark.pluginsorg.apache.gravitino.spark.connector.plugin.GravitinoSparkPlugin \ --conf spark.sql.gravitino.urihttp://127.0.0.1:8090 \ --conf spark.sql.gravitino.metalaketest \ --conf spark.sql.gravitino.enableIcebergSupporttrue \ --conf spark.sql.warehouse.dirhdfs://127.0.0.1:9000/user/hive/warehouse-hive可以看出SparkConnector指定了加载的插件是GravitinoSparkPlugin public class GravitinoSparkPlugin implements SparkPlugin {Overridepublic DriverPlugin driverPlugin() {return new GravitinoDriverPlugin();}Overridepublic ExecutorPlugin executorPlugin() {return null;} }可以看出实现方式很简单仅仅使用了一个GravitinoDriverPlugin也就是在Spark应用程序启动的时候扫描SparkPlugin扫描到了这个GravitinoSparkPlugin然后立马就去执行GravitinoDriverPlugin初始化程序。在DriverPlugin初始化过程中 插件仅仅覆写了两个函数init() 和shutdown()。 说明这个插件仅仅做了一些初始化和资源销毁操作。 在Driver端进行初始化 配置检查检查gravitino_uri和gravitino_metalake是否配置 如果开启了iceberg则将gravitinoDriverExtensions放入到数组中方便配置 初始化Gravtino客户端和GravitinoCatalogManager并且将relational类型的表加载到缓存中 将缓存中的catalog进行如果是非iceberg类型当前仅仅只有Hive进行注册这里定义的注册的实际操作配置Spark的配置项spark.sql.catalog.catalogName这里的catalogName对应的是缓存中的catalogName配置的值为根据Gravitino自己的Catalog使用的Provider进行适配比如可以是(org.apache.gravitino.spark.connector.hive.GravitinoHiveCatalogSpark33或者org.apache.gravitino.spark.connector.iceberg.GravitinoIcebergCatalogSpark33)具体情况由适配器进行处理。 然后注册SqlExtensions其实就是将第2步骤的数组配置到SPARK_SESSION_EXTENSIONS这个SparkConf配置里面 稍微贴一下注册Catalog代码比较重要 //初始化的时候调用注册逻辑将Gravitino中的Catalog加载到缓存//然后将缓存中的数据作为第二个参数gravitinoCatalogs传递进来private void registerGravitinoCatalogs(SparkConf sparkConf, MapString, Catalog gravitinoCatalogs) {gravitinoCatalogs.entrySet().forEach(entry - {String catalogName entry.getKey();Catalog gravitinoCatalog entry.getValue();String provider gravitinoCatalog.provider();if (lakehouse-iceberg.equals(provider.toLowerCase(Locale.ROOT)) enableIcebergSupport false) {return;}try {registerCatalog(sparkConf, catalogName, provider);} catch (Exception e) {LOG.warn(Register catalog {} failed., catalogName, e);}});}//这里根据适配器去配置spark.sql.catalog.xxx 的具体执行CatalogClassprivate void registerCatalog(SparkConf sparkConf, String catalogName, String provider) {if (StringUtils.isBlank(provider)) {LOG.warn(Skip registering {} because catalog provider is empty., catalogName);return;}String catalogClassName CatalogNameAdaptor.getCatalogName(provider);if (StringUtils.isBlank(catalogClassName)) {LOG.warn(Skip registering {} because {} is not supported yet., catalogName, provider);return;}String sparkCatalogConfigName spark.sql.catalog. catalogName;Preconditions.checkArgument(!sparkConf.contains(sparkCatalogConfigName),catalogName is already registered to SparkCatalogManager);sparkConf.set(sparkCatalogConfigName, catalogClassName);LOG.info(Register {} catalog to Spark catalog manager., catalogName);}到这里GravitinoConnector的代码机制已经说完了下面聊聊Spark机制 加载spark.sql.catalog.xxx 具体执行的配置 经过上面GravitinoDriverPlugin的初始化之后已经将具体的catalog名称和对应的处理类映射起来这里以GravitinoHiveCatalogSpark33为例。 GravitinoHiveCatalogSpark33这个类继承关系是继承了BaseCatalog 而BaseCatalog是Spark中定义的CatalogPlugin的一个实现类。 Spark在解析SQL的时候会查找catalog对应的Catalog可以看到调用了CatalogManager.catalog()方法 private object CatalogAndMultipartIdentifier {def unapply(parts: Seq[String]): Some[(Option[CatalogPlugin], Seq[String])] parts match {case Seq(_) Some((None, parts))case Seq(catalogName, tail _*) try {Some((Some(catalogManager.catalog(catalogName)), tail))} catch {case _: CatalogNotFoundException Some((None, parts))}}}这个catalog方法调用了Catalogs.load()方法 def catalog(name: String): CatalogPlugin synchronized {if (name.equalsIgnoreCase(SESSION_CATALOG_NAME)) {v2SessionCatalog} else {catalogs.getOrElseUpdate(name, Catalogs.load(name, conf))}}这个方法才是真正的加载方法他真正根据conf配置将GravitinoHiveCatalogSpark33名称根据定义的反射构造函数实例化到内存中 def load(name: String, conf: SQLConf): CatalogPlugin {val pluginClassName try {val _pluginClassName conf.getConfString(sspark.sql.catalog.$name)// SPARK-39079 do configuration check first, otherwise some path-based table like// org.apache.spark.sql.json./path/json_file may fail on analyze phaseif (name.contains(.)) {throw QueryExecutionErrors.invalidCatalogNameError(name)}_pluginClassName} catch {case _: NoSuchElementException throw QueryExecutionErrors.catalogPluginClassNotFoundError(name)}val loader Utils.getContextOrSparkClassLoadertry {val pluginClass loader.loadClass(pluginClassName)if (!classOf[CatalogPlugin].isAssignableFrom(pluginClass)) {throw QueryExecutionErrors.catalogPluginClassNotImplementedError(name, pluginClassName)}val plugin pluginClass.getDeclaredConstructor().newInstance().asInstanceOf[CatalogPlugin]plugin.initialize(name, catalogOptions(name, conf))plugin} catch {// 省略}}到这里流程就分析结束了
http://www.w-s-a.com/news/667578/

相关文章:

  • 如何做x响应式网站asp网站出现乱码
  • 网站备案的幕布是什么来的游戏推广代理
  • 固始城乡建设局的网站怎么打不开了上海建设网站
  • 关于加强网站信息建设的通知3d网站开发成本
  • 网站建设实训过程报告成品网站1688入口的功能介绍
  • 网站定制开发需要什么资质国外设计灵感网站
  • 搜搜网站收录广告设计与制作模板图片
  • 江苏省建设监理协会网站汕头网站建设方案优化
  • 中国风网站配色方案正规少儿编程排名
  • 兼职做网站的软件wordpress赞的代码
  • 销售网站的技巧四博互联做的网站
  • 网站建设 图片问题小程序免费制作平台凡科网页版
  • 猪八戒网做网站怎么样网站建设 客户同程
  • 西安网站建设那家强网站建设方案 报价
  • 销售网站建设考核指标网站建设价格组成
  • 网站302跳转网站建设完成后 下一步做什么
  • 赣州制作网站企业硬件开发用什么语言
  • 新网站如何被网站收录百度排名优化软件
  • html网站简易模板国内买机票的网站建设
  • 百度关键词分析工具百度seo排名软
  • 自己怎样做免费网站ueditor 上传wordpress
  • 深圳高端网站开发网站建设公司销售技巧
  • 网站建设的优势是什么意思可拖动网站
  • 建设什么企业网站网站微信认证
  • 网站开发的平台成都有哪些好玩的
  • 上海金瑞建设集团网站怎么创建免费网页
  • 柳州做网站设计的公司制作网站软件下载
  • 湖南seo网站开发苏州网络营销及网站推广
  • 如何发布自己做的网站郑州网站建设定制开发
  • 重庆网站商城宁波网络公司联系方式