阳江市住房和城乡规划建设局网站,泰安网站建设作用,做珠宝网站,货运网站源码Apache SeaTunnel新版本已经发布#xff0c;感兴趣的小伙伴可以看之前版本发布的文章 本文主要给大家介绍为使用2.3.4版本的新特性#xff0c;需要对Apache SeaTunnel-Web依赖的版本进行升级#xff0c;而SeaTunnel2.3.4版本部分API跟之前版本不兼容#xff0c;所以需要对 …Apache SeaTunnel新版本已经发布感兴趣的小伙伴可以看之前版本发布的文章 本文主要给大家介绍为使用2.3.4版本的新特性需要对Apache SeaTunnel-Web依赖的版本进行升级而SeaTunnel2.3.4版本部分API跟之前版本不兼容所以需要对 SeaTunnel-Web的源码进行修改适配。
源码修改编译
克隆SeaYunnel-Web源码到本地 git clone https://github.com/apache/seatunnel-web.git
在idea中打开项目
升级Pom中的SeaTunnel版本到2.3.4并重新导入依赖 seatunnel-framework.version2.3.3/seatunnel-framework.version改为seatunnel-framework.version2.3.4/seatunnel-framework.version
因为大部分用户使用SeaTunnel Web都是基于SeaTunnel-2.3.3 版本做的适配,而最新发布的SeaTunnel2.3.4 部分API发生了改动导致直接升级的过程中会出现API不兼容的问题所以本篇文章重点来了我们需要对调用SeaTunnel API的SeaTunnel Web源码部分进行修改修改完之后我们就能完全适配2.3.4最新版本。
社区推出了2.3.X及Web系列专属的社群感兴趣的小伙伴可以加社区小助手进群。
org.apache.dolphinscheduler.api.dto.seatunnel.bean.engine.EngineDataType
public static class SeaTunnelDataTypeConvertorimplements DataTypeConvertorSeaTunnelDataType? {Overridepublic SeaTunnelDataType? toSeaTunnelType(String engineDataType) {return DATA_TYPE_MAP.get(engineDataType.toLowerCase(Locale.ROOT)).getRawType();}Overridepublic SeaTunnelDataType? toSeaTunnelType(SeaTunnelDataType? seaTunnelDataType, MapString, Object map)throws DataTypeConvertException {return seaTunnelDataType;}Overridepublic SeaTunnelDataType? toConnectorType(SeaTunnelDataType? seaTunnelDataType, MapString, Object map)throws DataTypeConvertException {return seaTunnelDataType;}Overridepublic String getIdentity() {return EngineDataTypeConvertor;}
}
// 改为
public static class SeaTunnelDataTypeConvertorimplements DataTypeConvertorSeaTunnelDataType? {Overridepublic SeaTunnelDataType? toSeaTunnelType(String s, String s1) {return DATA_TYPE_MAP.get(s.toLowerCase(Locale.ROOT)).getRawType();}Overridepublic SeaTunnelDataType? toSeaTunnelType(String s, SeaTunnelDataType? seaTunnelDataType, MapString, Object map) {return seaTunnelDataType;}Overridepublic SeaTunnelDataType? toConnectorType(String s, SeaTunnelDataType? seaTunnelDataType, MapString, Object map) {return seaTunnelDataType;}Overridepublic String getIdentity() {return EngineDataTypeConvertor;}}
org.apache.seatunnel.app.service.impl.TableSchemaServiceImpl
public TableSchemaServiceImpl() throws IOException {Common.setStarter(true);SetPluginIdentifier pluginIdentifiers SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SINK).keySet();ArrayListPluginIdentifier pluginIdentifiersList new ArrayList();pluginIdentifiersList.addAll(pluginIdentifiers);ListURL pluginJarPaths new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiersList);// Path path new SeaTunnelSinkPluginDiscovery().getPluginDir();if (!pluginJarPaths.isEmpty()) {// ListURL files FileUtils.searchJarFiles(path);pluginJarPaths.addAll(FileUtils.searchJarFiles(Common.pluginRootDir()));factory new DataTypeConvertorFactory(new URLClassLoader(pluginJarPaths.toArray(new URL[0])));} else {factory new DataTypeConvertorFactory();}
}
// 改为public TableSchemaServiceImpl() throws IOException {Common.setStarter(true);SetPluginIdentifier pluginIdentifiers SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SINK).keySet();ArrayListPluginIdentifier pluginIdentifiersList new ArrayList();pluginIdentifiersList.addAll(pluginIdentifiers);ListURL pluginJarPaths new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiersList);// Path path new SeaTunnelSinkPluginDiscovery().getPluginDir();if (!pluginJarPaths.isEmpty()) {// ListURL files FileUtils.searchJarFiles(path);pluginJarPaths.addAll(FileUtils.searchJarFiles(Common.pluginRootDir()));factory new DataTypeConvertorFactory(new URLClassLoader(pluginJarPaths.toArray(new URL[0])));} else {factory new DataTypeConvertorFactory();}}SeaTunnelDataType? dataType convertor.toSeaTunnelType(field.getType());
// 改为
SeaTunnelDataType? dataType convertor.toSeaTunnelType(field.getName(), field.getType());
org.apache.seatunnel.app.service.impl.JobExecutorServiceImpl.executeJobBySeaTunnel() public Long executeJobBySeaTunnel(Integer userId, String filePath, Long jobInstanceId) {Common.setDeployMode(DeployMode.CLIENT);JobConfig jobConfig new JobConfig();jobConfig.setName(jobInstanceId _job);try {SeaTunnelConfig seaTunnelConfig new YamlSeaTunnelConfigBuilder().build();SeaTunnelClient seaTunnelClient createSeaTunnelClient();ClientJobExecutionEnvironment jobExecutionEnv seaTunnelClient.createExecutionContext(filePath, jobConfig, seaTunnelConfig);final ClientJobProxy clientJobProxy jobExecutionEnv.execute();JobInstance jobInstance jobInstanceDao.getJobInstance(jobInstanceId);jobInstance.setJobEngineId(Long.toString(clientJobProxy.getJobId()));jobInstanceDao.update(jobInstance);CompletableFuture.runAsync(() - {waitJobFinish(clientJobProxy,userId,jobInstanceId,Long.toString(clientJobProxy.getJobId()),seaTunnelClient);});} catch (ExecutionException | InterruptedException e) {ExceptionUtils.getMessage(e);throw new RuntimeException(e);}return jobInstanceId;}
org.apache.seatunnel.app.service.impl.JobInstanceServiceImpl
else if (statusList.contains(CANCELLING)) {jobStatus JobStatus.CANCELLING.name();
// 改为
else if (statusList.contains(CANCELING)) {jobStatus JobStatus.CANCELING.name();
org.apache.seatunnel.app.service.impl.SchemaDerivationServiceImpl
TableFactoryContext context new TableFactoryContext(Collections.singletonList(table),ReadonlyConfig.fromMap(config),Thread.currentThread().getContextClassLoader());
// 改为
TableTransformFactoryContext context new TableTransformFactoryContext(Collections.singletonList(table),ReadonlyConfig.fromMap(config),Thread.currentThread().getContextClassLoader());
org.apache.seatunnel.app.thirdparty.engine.SeaTunnelEngineProxy
public void restoreJob(NonNull String filePath, NonNull Long jobInstanceId, NonNull Long jobEngineId) {SeaTunnelClient seaTunnelClient new SeaTunnelClient(clientConfig);JobConfig jobConfig new JobConfig();jobConfig.setName(jobInstanceId _job);try {seaTunnelClient.restoreExecutionContext(filePath, jobConfig, jobEngineId).execute();} catch (ExecutionException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);}
}
// 改为
public void restoreJob(NonNull String filePath, NonNull Long jobInstanceId, NonNull Long jobEngineId) {SeaTunnelClient seaTunnelClient new SeaTunnelClient(clientConfig);JobConfig jobConfig new JobConfig();jobConfig.setName(jobInstanceId _job);SeaTunnelConfig seaTunnelConfig new YamlSeaTunnelConfigBuilder().build();try {seaTunnelClient.restoreExecutionContext(filePath, jobConfig, seaTunnelConfig, jobEngineId).execute();} catch (ExecutionException e) {throw new RuntimeException(e);} catch (InterruptedException e) {throw new RuntimeException(e);}}
org.apache.seatunnel.app.thirdparty.framework.PluginDiscoveryUtil
public static MapPluginIdentifier, ConnectorFeature getConnectorFeatures(PluginType pluginType) throws IOException {Common.setStarter(true);if (!pluginType.equals(PluginType.SOURCE)) {throw new UnsupportedOperationException(ONLY support plugin type source);}Path path new SeaTunnelSinkPluginDiscovery().getPluginDir();ListFactory factories;if (path.toFile().exists()) {ListURL files FileUtils.searchJarFiles(path);factories FactoryUtil.discoverFactories(new URLClassLoader(files.toArray(new URL[0])));} else {factories FactoryUtil.discoverFactories(Thread.currentThread().getContextClassLoader());}MapPluginIdentifier, ConnectorFeature featureMap new ConcurrentHashMap();factories.forEach(plugin - {if (TableSourceFactory.class.isAssignableFrom(plugin.getClass())) {TableSourceFactory tableSourceFactory (TableSourceFactory) plugin;PluginIdentifier info PluginIdentifier.of(seatunnel,PluginType.SOURCE.getType(),plugin.factoryIdentifier());featureMap.put(info,new ConnectorFeature(SupportColumnProjection.class.isAssignableFrom(tableSourceFactory.getSourceClass())));}});return featureMap;
}
// 改为public static MapPluginIdentifier, ConnectorFeature getConnectorFeatures(PluginType pluginType) {Common.setStarter(true);if (!pluginType.equals(PluginType.SOURCE)) {throw new UnsupportedOperationException(ONLY support plugin type source);}ArrayListPluginIdentifier pluginIdentifiers new ArrayList();pluginIdentifiers.addAll(SeaTunnelSinkPluginDiscovery.getAllSupportedPlugins(PluginType.SOURCE).keySet());ListURL pluginJarPaths new SeaTunnelSinkPluginDiscovery().getPluginJarPaths(pluginIdentifiers);ListFactory factories;if (!pluginJarPaths.isEmpty()) {factories FactoryUtil.discoverFactories(new URLClassLoader(pluginJarPaths.toArray(new URL[0])));} else {factories FactoryUtil.discoverFactories(Thread.currentThread().getContextClassLoader());}MapPluginIdentifier, ConnectorFeature featureMap new ConcurrentHashMap();factories.forEach(plugin - {if (TableSourceFactory.class.isAssignableFrom(plugin.getClass())) {TableSourceFactory tableSourceFactory (TableSourceFactory) plugin;PluginIdentifier info PluginIdentifier.of(seatunnel,PluginType.SOURCE.getType(),plugin.factoryIdentifier());featureMap.put(info,new ConnectorFeature(SupportColumnProjection.class.isAssignableFrom(tableSourceFactory.getSourceClass())));}});return featureMap;
代码格式化
mvn spotless:apply
编译打包
mvn clean package -DskipTests
至此seatunnel web 适配 seatunnel2.3.4版本完成对应的安装包会在 seatunnel-web-dist/target目录下生成
Linux部署测试
这里具体请参考之前社区其他老师发布的文章Apache SeaTunnel Web部署指南
重要的配置项
1、seatunnel-web数据库相关配置application.yml
用来web服务中的数据持久化2、SEATUNNEL_HOME环境变量
seatunnel-web调用seaunnel的插件获取的API,扫描connector相关的连接器3、ST_WEB_HOME环境变量
seatunnel-web会加载seatunnel-web/datasource下的插件包这里决定了seatunnel-web支持哪些数据源的任务定义4、重要的配置文件
connector-datasource-mapper.yaml
该配置文件配置了支持的数据源类型以及该数据源支持的数据同步方式等信息比如是否支持多表同步、是否支持cdc等
hazelcast-client.yaml
seatunnel-web服务通过seatunnel-api的方式与seatunnel集群进行交互该配置文件配置了集群节点等相关信息
感谢大家的阅读希望对各位兄弟有所帮助如果有任何疑问欢迎来社区找我交流 本文由 白鲸开源科技 提供发布支持