The project needs to process a pile of tables. They are not large, and every one of them carries longitude/latitude columns, but none of them carries watershed information, so the watershed each record belongs to has to be computed from its coordinates. It is a fairly simple project; development was completed by following DeepSeek's hints, and AI really is handy.

Related posts: installing the Alibaba AI personal edition in IDEA; a step-by-step guide to using the full-strength DeepSeek model in IDEA.

Code implementation
1. Controller
// Imports restored for completeness; @Tag is assumed to come from springdoc-openapi and @ApiSupport from knife4j
import com.github.xiaoymin.knife4j.annotations.ApiSupport;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.launch.JobOperator;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * Batch watershed-processing job controller
 */
@Tag(name = "批量流域处理任务")
@ApiSupport(order = 2)
@RequestMapping("/job")
@RestController
public class SysBatchJobController {

    @Autowired
    JobLauncher jobLauncher;

    @Autowired
    JobOperator jobOperator;

    @Autowired
    @Qualifier("updateWaterCodeJob")
    private Job updateWaterCodeJob;

    // Launch the multi-threaded, paged update job
    @GetMapping("/asyncJob")
    public void asyncJob() throws Exception {
        JobParameters jobParameters = new JobParametersBuilder()
                .addLong("time", System.currentTimeMillis())
                .toJobParameters();
        JobExecution run = jobLauncher.run(updateWaterCodeJob, jobParameters);
        run.getId();
    }
}
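The endpoint above fires the job and discards the returned JobExecution. If a caller needs to poll for completion, Spring Batch's JobExplorer can look an execution up by id. The following is only a minimal sketch, not part of the original controller: it assumes asyncJob is changed to return run.getId(), and that a JobExplorer bean plus the PathVariable import are available.

    @Autowired
    private JobExplorer jobExplorer; // org.springframework.batch.core.explore.JobExplorer

    // Hypothetical status endpoint for polling a previously launched execution
    @GetMapping("/asyncJob/{executionId}")
    public String jobStatus(@PathVariable Long executionId) {
        JobExecution execution = jobExplorer.getJobExecution(executionId);
        return execution == null ? "UNKNOWN" : execution.getStatus().toString();
    }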
2. The table that lists the tables to process
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;

import java.io.Serializable;

/**
 * Metadata describing a business table that needs batch processing
 */
@Builder
@AllArgsConstructor
@Data
@TableName(value = "ads_t_sys_batch_update_table")
public class SysBatchUpdateTable extends BaseEntity implements Serializable { // BaseEntity is a project-specific base class

    private static final long serialVersionUID = -7367871287146067724L;

    @TableId(type = IdType.ASSIGN_UUID)
    private String pkId;

    /** Name of the table to update */
    @TableField(value = "table_name")
    private String tableName;

    /** ID of the data source (database) the table lives in */
    @TableField(value = "data_source_id")
    private String dataSourceId;

    /** Primary-key column of the table */
    @TableField(value = "key_id")
    private String keyId;

    /** Watershed-code column of the table */
    @TableField(value = "water_code_column")
    private String waterCodeColumn;

    /** Longitude column of the table */
    @TableField(value = "lon_column")
    private String lonColumn;

    /** Latitude column of the table */
    @TableField(value = "lat_column")
    private String latColumn;

    public SysBatchUpdateTable() {
    }
}
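Each row of ads_t_sys_batch_update_table describes one business table the job should touch. As a rough illustration only (the table name "ads_t_station" and its column names below are invented), registering a table could look like this, using the MyBatis-Plus mapper shown in the next step:

    // Hypothetical example row; names are placeholders, pkId is filled by ASSIGN_UUID
    SysBatchUpdateTable row = SysBatchUpdateTable.builder()
            .tableName("ads_t_station")
            .dataSourceId("ds_main")
            .keyId("id")
            .waterCodeColumn("water_code")
            .lonColumn("lon")
            .latColumn("lat")
            .build();
    updateTableMapper.insert(row); // BaseMapper#insert from MyBatis-Plus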
3. Mapper. Passing the per-table parameters around is a bit awkward, so they are folded into the dynamically built SQL instead.
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.bigdatacd.panorama.system.domain.po.SysBatchUpdateTable;
import org.apache.ibatis.annotations.Mapper;

import java.util.List;
import java.util.Map;

@Mapper
public interface UpdateTableMapper extends BaseMapper<SysBatchUpdateTable> {

    /**
     * Page through a table's rows that still lack a watershed code.
     * When invoked through MyBatisPagingItemReader the statement receives a parameter map
     * (keyId, waterCodeColumn, lonColumn, latColumn, tableName plus _pagesize/_skiprows),
     * so this single-argument signature is only used for direct calls.
     *
     * @param tableName table name
     * @return rows as column/value maps
     */
    List<Map<String, Object>> selectUpdateTableByPage(String tableName);

    /**
     * Update the watershed code of a single row.
     *
     * @param tableName table name
     * @param waterCode watershed code
     * @param pkId      primary-key value
     */
    void updateWaterCode(String tableName, String waterCode, String pkId);
}
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.bigdatacd.panorama.system.mapper.UpdateTableMapper">

    <!-- Dynamic paged query: ${} injects table/column names, #{} binds values.
         The #{} bindings echo the metadata back as constant columns so the writer
         can rebuild the UPDATE statement later. -->
    <select id="selectUpdateTableByPage" resultType="java.util.HashMap">
        <!-- If the table defines its own paging SQL, that can be used directly instead -->
        SELECT
            ${keyId}           as pkId,
            #{keyId}           as keyId,
            ${waterCodeColumn} as waterCode,
            #{waterCodeColumn} as waterCodeColumn,
            ${lonColumn}       as lon,
            ${latColumn}       as lat,
            #{tableName}       as tableName
        FROM ${tableName} a
        WHERE ${waterCodeColumn} is null
        ORDER BY ${keyId} <!-- keep a stable order for paging -->
        LIMIT #{_pagesize} OFFSET #{_skiprows} <!-- filled in automatically by MyBatisPagingItemReader -->
    </select>

    <!-- Dynamic update -->
    <update id="updateWaterCode">
        UPDATE ${tableName}
        SET ${waterCodeColumn} = #{waterCode}
        WHERE ${keyId} = #{pkId} <!-- match on the configured primary-key column -->
    </update>
</mapper>

4. Configuration file
spring:
  batch:
    job:
      enabled: false          # do not run the job automatically at startup
    jdbc:
      initialize-schema: always
  sql:
    init:
      schema-locations: classpath:/org/springframework/batch/core/schema-mysql.sql

Add the batch-rewrite parameter rewriteBatchedStatements=true to the datasource URL:

url: jdbc:mysql://localhost:3306/xxxx?autoReconnect=true&useUnicode=true&createDatabaseIfNotExist=true&characterEncoding=utf8&serverTimezone=GMT%2b8&useSSL=false&rewriteBatchedStatements=true
5. Main configuration class, adjusted to partition by table
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.batch.MyBatisPagingItemReader;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.step.skip.AlwaysSkipItemSkipPolicy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.HashMap;
import java.util.Map;

// 1. Main configuration class: partition by table
@Configuration
@EnableBatchProcessing
public class BatchConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private SqlSessionFactory sqlSessionFactory;

    /**
     * Main job.
     */
    @Bean("updateWaterCodeJob")
    public Job updateWaterCodeJob(@Qualifier("masterStep") Step masterStep) {
        return jobBuilderFactory.get("updateWaterCodeJob")
                // register the JobExecutionListener on the job itself; attached to the step builder it would never fire
                .listener(new BatchJobListener())
                .start(masterStep)
                .build();
    }

    @Bean("masterStep")
    public Step masterStep(@Qualifier("updateBatchTableData") Step updateBatchTableData,
                           @Qualifier("multiTablePartitioner") MultiTablePartitioner multiTablePartitioner) {
        return stepBuilderFactory.get("masterStep")
                // Partitioner keyed by table name: one partition per table
                .partitioner(updateBatchTableData.getName(), multiTablePartitioner)
                .step(updateBatchTableData)
                .gridSize(10) // partitioned by table; concurrency is usually close to the core count
                .taskExecutor(taskExecutor())
                .build();
    }

    // Thread pool: core size roughly the number of tables
    @Bean("batchTaskExecutor")
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);
        executor.setMaxPoolSize(10);
        executor.setThreadNamePrefix("table-processor-");
        return executor;
    }

    /**
     * Worker step: read a page of rows, compute the watershed, write the updates.
     */
    @Bean("updateBatchTableData")
    public Step updateBatchTableData(@Qualifier("dynamicTableReader") MyBatisPagingItemReader<Map<String, Object>> myBatisPagingItemReader,
                                     @Qualifier("batchUpdateWriter") BatchUpdateWriter batchUpdateWriter,
                                     @Qualifier("tableProcessor") TableProcessor tableProcessor) {
        return stepBuilderFactory.get("updateBatchTableData")
                .<Map<String, Object>, Map<String, Object>>chunk(100)
                .reader(myBatisPagingItemReader)
                .processor(tableProcessor)
                .writer(batchUpdateWriter)
                .faultTolerant()
                .skipPolicy(new AlwaysSkipItemSkipPolicy())
                .build();
    }

    /**
     * Step-scoped paging reader; the per-table metadata is late-bound from the partition's step execution context.
     */
    @Bean
    @StepScope
    public MyBatisPagingItemReader<Map<String, Object>> dynamicTableReader(
            @Value("#{stepExecutionContext['keyId']}") String keyId,                     // primary-key column of the table
            @Value("#{stepExecutionContext['waterCodeColumn']}") String waterCodeColumn, // watershed column to update
            @Value("#{stepExecutionContext['lonColumn']}") String lonColumn,             // longitude column
            @Value("#{stepExecutionContext['latColumn']}") String latColumn,             // latitude column
            @Value("#{stepExecutionContext['tableName']}") String tableName) {           // table to update
        MyBatisPagingItemReader<Map<String, Object>> reader = new MyBatisPagingItemReader<>();
        reader.setSqlSessionFactory(sqlSessionFactory);
        reader.setQueryId("com.bigdatacd.panorama.system.mapper.UpdateTableMapper.selectUpdateTableByPage");
        Map<String, Object> param = new HashMap<>();
        param.put("keyId", keyId);
        param.put("waterCodeColumn", waterCodeColumn);
        param.put("lonColumn", lonColumn);
        param.put("latColumn", latColumn);
        param.put("tableName", tableName);
        reader.setParameterValues(param);
        reader.setPageSize(2000);
        return reader;
    }
}
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.jdbc.core.namedparam.SqlParameterSource;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;

// Batch update writer
@Component("batchUpdateWriter")
@StepScope
public class BatchUpdateWriter implements ItemWriter<Map<String, Object>> {

    @Autowired
    private NamedParameterJdbcTemplate jdbcTemplate;

    @Override
    public void write(List<? extends Map<String, Object>> items) {
        // Build the dynamic UPDATE from the first item's metadata; every item in a chunk
        // comes from the same table because each partition handles exactly one table
        StringBuilder sb = new StringBuilder();
        sb.append("UPDATE ");
        sb.append((String) items.get(0).get("tableName"));
        sb.append(" SET ");
        sb.append((String) items.get(0).get("waterCodeColumn"));
        sb.append(" = :waterCode");
        sb.append(" WHERE ");
        sb.append((String) items.get(0).get("keyId"));
        sb.append(" = :pkId");
        jdbcTemplate.batchUpdate(sb.toString(), items.stream()
                .map(item -> new MapSqlParameterSource()
                        .addValue("waterCode", item.get("waterCode"))
                        .addValue("tableName", item.get("tableName"))
                        .addValue("waterCodeColumn", item.get("waterCodeColumn"))
                        .addValue("keyId", item.get("keyId"))
                        .addValue("pkId", item.get("pkId")))
                .toArray(SqlParameterSource[]::new));
    }
}
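The writer derives one UPDATE template from the first item and assumes the whole chunk belongs to a single table, which holds here because the partitioner assigns exactly one table per partition. If chunks could ever mix tables, a defensive variant might group items by table first. The snippet below is only a sketch of that idea, reusing the item keys from the writer above; it would also need java.util.ArrayList and java.util.stream.Collectors imported.

    // Sketch: group a possibly mixed chunk by table before batching
    List<Map<String, Object>> rows = new ArrayList<>(items);
    Map<String, List<Map<String, Object>>> byTable = rows.stream()
            .collect(Collectors.groupingBy(r -> (String) r.get("tableName")));
    byTable.forEach((table, group) -> {
        String sql = "UPDATE " + table
                + " SET " + group.get(0).get("waterCodeColumn") + " = :waterCode"
                + " WHERE " + group.get(0).get("keyId") + " = :pkId";
        jdbcTemplate.batchUpdate(sql, group.stream()
                .map(r -> new MapSqlParameterSource()
                        .addValue("waterCode", r.get("waterCode"))
                        .addValue("pkId", r.get("pkId")))
                .toArray(SqlParameterSource[]::new));
    });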
// Imports for Partitioner, ExecutionContext, JdbcTemplate, @Component and @Slf4j were missing in the original listing and are restored here
import lombok.extern.slf4j.Slf4j;
import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Component
@Slf4j
public class MultiTablePartitioner implements Partitioner {

    private final DataSource dataSource;

    public MultiTablePartitioner(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
        String sql = "SELECT key_id as keyId, water_code_column as waterCodeColumn, lon_column as lonColumn, "
                + "lat_column as latColumn, page_sql as pageSql, table_name as tableName "
                + "FROM ads_t_sys_batch_update_table where deleted = 0 and data_status = 0";
        List<Map<String, Object>> tables = jdbcTemplate.queryForList(sql);
        log.info("partition query: {}", sql);
        Map<String, ExecutionContext> partitions = new HashMap<>();
        for (int i = 0; i < tables.size(); i++) {
            ExecutionContext ctx = new ExecutionContext();
            // Pass the per-table metadata to the worker step through the execution context,
            // where it drives the dynamically built query and update
            ctx.putString("keyId", String.valueOf(tables.get(i).get("keyId")));
            ctx.putString("waterCodeColumn", String.valueOf(tables.get(i).get("waterCodeColumn")));
            ctx.putString("lonColumn", String.valueOf(tables.get(i).get("lonColumn")));
            ctx.putString("latColumn", String.valueOf(tables.get(i).get("latColumn")));
            //ctx.putString("pageSql", String.valueOf(tables.get(i).get("pageSql")));
            ctx.putString("tableName", String.valueOf(tables.get(i).get("tableName")));
            partitions.put("partition" + i, ctx);
        }
        return partitions;
    }
}
import com.bigdatacd.panorama.common.utils.GeoJsonUtil;
import lombok.Builder;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.stereotype.Component;

import java.util.Map;

// Determine which watershed each row's coordinates fall into
@Component("tableProcessor")
@Builder
public class TableProcessor implements ItemProcessor<Map<String, Object>, Map<String, Object>> {

    @Override
    public Map<String, Object> process(Map<String, Object> item) {
        // Look up the watershed containing this row's longitude/latitude
        item.put("waterCode", GeoJsonUtil.getWaterCode(
                Double.parseDouble(item.get("lon").toString()),
                Double.parseDouble(item.get("lat").toString())));
        return item;
    }
}
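GeoJsonUtil.getWaterCode returns null when the point falls outside every polygon, and Double.parseDouble throws on null or malformed coordinates; such items are currently skipped by the AlwaysSkipItemSkipPolicy. A slightly more defensive process method could filter those rows instead, since returning null from an ItemProcessor tells Spring Batch to drop the item. A minimal sketch under those assumptions, not part of the original processor:

    @Override
    public Map<String, Object> process(Map<String, Object> item) {
        Object lon = item.get("lon");
        Object lat = item.get("lat");
        if (lon == null || lat == null) {
            return null; // filter rows without coordinates instead of failing them
        }
        String waterCode = GeoJsonUtil.getWaterCode(
                Double.parseDouble(lon.toString()),
                Double.parseDouble(lat.toString()));
        if (waterCode == null) {
            return null; // point lies outside every watershed polygon: nothing to update
        }
        item.put("waterCode", waterCode);
        return item;
    }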
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobExecutionListener;

/**
 * Job listener: logs start and end times and the total duration
 */
public class BatchJobListener implements JobExecutionListener {

    private long beginTime;
    private long endTime;

    @Override
    public void beforeJob(JobExecution jobExecution) {
        beginTime = System.currentTimeMillis();
        System.out.println(jobExecution.getJobInstance().getJobName() + " beforeJob...... " + beginTime);
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        endTime = System.currentTimeMillis();
        System.out.println(jobExecution.getJobInstance().getJobName() + " afterJob...... " + endTime);
        System.out.println(jobExecution.getJobInstance().getJobName() + " took " + (endTime - beginTime) + " ms in total");
    }
}

6. Utility class that computes the watershed from longitude/latitude
import lombok.extern.slf4j.Slf4j;
import org.geotools.feature.FeatureCollection;
import org.geotools.feature.FeatureIterator;
import org.geotools.geojson.feature.FeatureJSON;
import org.geotools.geometry.jts.JTSFactoryFinder;
import org.locationtech.jts.geom.*;
import org.opengis.feature.Feature;
import org.opengis.feature.Property;

import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;

/**
 * Description: GeoJSON utility class
 * author: Mr.xulong
 * date: 2023-01-09 14:39
 */
@Slf4j
public class GeoJsonUtil {

    /*public static void main(String[] args) {
        try {
            FeatureCollection featureCollection = getFeatureCollection("sichuanliuyu.json");
            double x = 106.955085;
            double y = 32.09546061139062;
            System.out.println(JSON.toJSONString(properties(x, y, featureCollection)));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }*/

    private static String geoJsonFilePath = "sichuanliuyu.json";

    private GeoJsonUtil() {
    }

    /**
     * Load the watershed polygon feature collection from the bundled GeoJSON file.
     */
    public static FeatureCollection getFeatureCollection() {
        // Read the GeoJSON file from the classpath
        InputStream resourceAsStream = GeoJsonUtil.class.getResourceAsStream("/json/" + geoJsonFilePath);
        FeatureJSON featureJSON = new FeatureJSON();
        try {
            return featureJSON.readFeatureCollection(resourceAsStream);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Check whether any feature in the collection contains the given point.
     */
    public static boolean contains(double longitude, double latitude, FeatureCollection featureCollection) {
        FeatureIterator features = featureCollection.features();
        try {
            while (features.hasNext()) {
                Feature next = features.next();
                if (isContains(longitude, latitude, next)) {
                    return true;
                }
            }
        } finally {
            features.close();
        }
        return false;
    }

    /**
     * If a feature in the collection contains the point, return the attributes we need.
     */
    public static Map<String, Object> properties(double longitude, double latitude, FeatureCollection featureCollection) {
        FeatureIterator features = featureCollection.features();
        try {
            while (features.hasNext()) {
                Feature next = features.next();
                boolean contains = isContains(longitude, latitude, next);
                // The point falls inside this polygon: return its attributes
                if (contains) {
                    HashMap<String, Object> properties = new HashMap<>();
                    properties.put("waterCode", next.getProperty("FID").getValue());
                    properties.put("waterName", next.getProperty("name").getValue());
                    return properties;
                }
            }
        } finally {
            features.close();
        }
        return null;
    }

    /**
     * Same as above, but loads the default feature collection first.
     */
    public static Map<String, Object> properties(double longitude, double latitude) {
        FeatureCollection featureCollection = getFeatureCollection();
        FeatureIterator features = featureCollection.features();
        try {
            while (features.hasNext()) {
                Feature next = features.next();
                boolean contains = isContains(longitude, latitude, next);
                // The point falls inside this polygon: return its attributes
                if (contains) {
                    HashMap<String, Object> properties = new HashMap<>();
                    properties.put("waterCode", next.getProperty("FID").getValue());
                    properties.put("waterName", next.getProperty("name").getValue());
                    return properties;
                }
            }
        } finally {
            features.close();
        }
        return null;
    }

    /**
     * Return only the watershed code (the FID attribute) of the polygon containing the point.
     */
    public static String getWaterCode(double longitude, double latitude) {
        FeatureCollection featureCollection = getFeatureCollection();
        FeatureIterator features = featureCollection.features();
        try {
            while (features.hasNext()) {
                Feature next = features.next();
                boolean contains = isContains(longitude, latitude, next);
                if (contains) {
                    return String.valueOf(next.getProperty("FID").getValue());
                }
            }
        } finally {
            features.close();
        }
        return null;
    }

    private static boolean isContains(double longitude, double latitude, Feature feature) {
        // Get the boundary geometry of the feature
        Property geometry = feature.getProperty("geometry");
        Object value = geometry.getValue();
        // Build a JTS point from the coordinate
        GeometryFactory geometryFactory = JTSFactoryFinder.getGeometryFactory();
        Point point = geometryFactory.createPoint(new Coordinate(longitude, latitude));
        boolean contains = false;
        // Handle both MultiPolygon and Polygon geometries
        if (value instanceof MultiPolygon) {
            MultiPolygon multiPolygon = (MultiPolygon) value;
            contains = multiPolygon.contains(point);
        } else if (value instanceof Polygon) {
            Polygon polygon = (Polygon) value;
            contains = polygon.contains(point);
        }
        return contains;
    }
}
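A quick way to sanity-check the utility is to call it directly with a known coordinate; the values below are taken from the commented-out main method above, and the returned FID depends entirely on the bundled GeoJSON file. Note also that getWaterCode re-reads the GeoJSON resource on every call, so caching the FeatureCollection would avoid repeated parsing when processing many rows.

    // Standalone check of the GeoJSON lookup
    public static void main(String[] args) {
        String waterCode = GeoJsonUtil.getWaterCode(106.955085, 32.09546061139062);
        System.out.println("waterCode = " + waterCode); // null if the point is in no polygon
    }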
7. Map dependencies
<geotools.version>27-SNAPSHOT</geotools.version>

<!-- map / GIS dependencies -->
<dependency>
    <groupId>org.geotools</groupId>
    <artifactId>gt-shapefile</artifactId>
    <version>${geotools.version}</version>
    <exclusions>
        <exclusion>
            <groupId>org.geotools</groupId>
            <artifactId>gt-main</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.geotools</groupId>
    <artifactId>gt-main</artifactId>
    <version>${geotools.version}</version>
</dependency>
<dependency>
    <groupId>org.geotools</groupId>
    <artifactId>gt-geojson</artifactId>
    <version>${geotools.version}</version>
</dependency>
<dependency>
    <groupId>org.geotools</groupId>
    <artifactId>gt-swing</artifactId>
    <version>${geotools.version}</version>
</dependency>

<repositories>
    <repository>
        <id>osgeo</id>
        <name>OSGeo Release Repository</name>
        <url>https://repo.osgeo.org/repository/release/</url>
        <snapshots>
            <enabled>false</enabled>
        </snapshots>
        <releases>
            <enabled>true</enabled>
        </releases>
    </repository>
    <repository>
        <id>osgeo-snapshot</id>
        <name>OSGeo Snapshot Repository</name>
        <url>https://repo.osgeo.org/repository/snapshot/</url>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
        <releases>
            <enabled>false</enabled>
        </releases>
    </repository>
</repositories>
Reference Git project: springbatch, a Spring Batch example built with Spring Boot. It imports 300,000 records from a file into the t_cust_temp table using multiple threads, then moves the t_cust_temp data into t_cust_info using partitioning. It works out of the box after download.