合肥网站建设百姓网,住建局官网报名入口,江苏建设服务信息网站,如何做内容收费的网站C#百万数据处理
在我们经验的不断增长中不可避免的会遇到一些数据量很大操作也复杂的业务 这种情况我们如何取优化如何去处理呢#xff1f;一般都要根据业务逻辑和背景去进行合理的改进。 文章目录 C#百万数据处理前言一、项目业务需求和开发背景项目开发背景数据量计算业务需…C#百万数据处理
在我们经验的不断增长中不可避免的会遇到一些数据量很大操作也复杂的业务 这种情况我们如何取优化如何去处理呢一般都要根据业务逻辑和背景去进行合理的改进。 文章目录 C#百万数据处理前言一、项目业务需求和开发背景项目开发背景数据量计算业务需求业务分析 二、老代码分析运行逻辑代码展示伪代码展示分析性能低原因 三、优化后的代码分析运行逻辑代码展示伪代码展示优化了那些问题 总结 前言
这里我给大家带来一个我自己所经历的百万数据处理案例该案例中会拿优化前和优化后的代码进行对比让大家更直观的感受优化给项目带来的效率提升。该项目优化用到了线程同步多线程sqlSugar,异步委托等知识。 一、项目业务需求和开发背景 项目开发背景
随着连锁门店数量的不断增加门店商品的库存数据和商品信息的维护业务压力不断增加。数据量越来越大。
数据量计算
假如数据如下
门店数量160家门店商品数量6000还需要更新商品对码信息6000数据接口最多每页查询1000条那么按以上数据要处理的数据最少就为966000条
业务需求
新的商品要做插入操作旧的商品要做更新操作更新库存信息和商品编码厂家等信息门店库存数据变更要在半小时内完成
业务分析
按以上条件我们可以分析出需要有Insert和Update操作。其中经验丰富的程序员就会发现其实像这种业务一般 Insert操作最多只占所有业务的20%Update操作占80%。 注意 很多小白程序员都是不进行业务分析就直接撸代码这种方式是不可取的等你撸完代码之后你就会发现效率根本就不行。后面又花大量时间去优化还如不体现分析了再进行开发。
二、老代码分析
运行逻辑
老代码的操作流程如下
查询出所有门店循环门店将门店数据传入操作函数中操作函数中再根据传入的门店信息循环去调用接口当传入的这个门店的数据拉取完成之后再去操作数据库
代码展示
代码如下
//查询所有门店商品信息ListStoreCheckCodeDO storeInfos b_Stores.GetStoreCheckCodes();//这里是用于更新对码的信息ListProductInfoDO infoDOs new ListProductInfoDO();foreach (var storeInfo in storeInfos ){//GetProductInfo的作用是调用总部接口获取并更新商品数据InventoryHelper.GetProductInfo(storeInfo , ref infoDOs );}if(infoDOs.count0){//更新对码信息InventoryHelper.ChangeProductCheckCode(infoDOs);}GetProductInfo 的具体实现
public static bool GetProductInfo(string orgid, ref ListProductInfoDO infoDOs, string goodsNo , string goodsName ){bool rel false;int pageSize 1000;int pageNum 1;ListDslStoreStockDO list 根据门店ID查询它的商品库存信息Console.WriteLine(开始更新门店{0}, orgid);//定义个集合来存储拉取到的数据ListStoreStockDO Stores new ListStoreStockDO();while (true){try{DslERPResQueryStockRes res3 调用总部数据接口每页查询1000条返回的数据if (res3 ! null res3.code 0){if (res3.data ! null res3.data.list ! null){if (res3.data.total 0){#region 模型转换//循环总部返回的数据集合并转成我们需要的DO是实体foreach (var product in res3.data.list){DslStoreStockDO dsl new DslStoreStockDO();把总部数据转为我们需要的实体存储我们的集合中dslStores.Add(dsl);ProductInfoDO infoDO 根据总部商品ID查询本地对码表返回商品对码信息实体if (infoDO null){//如果没有则new一个对象infoDO new ProductInfoDO();infoDO.LocalSku product.goodsNo.ToString();}//如果有这个商品就对其数据更新没有就用新的对象存储后续做新增操作infoDO.Name product.goodsName;infoDO.DslSku product.goodsNo.ToString();infoDO.goodsType product.goodsType;infoDO.proarea product.prodArea;infoDO.barcode product.barcode;infoDO.CreateTime DateTime.Now;infoDO.UpdateTime DateTime.Now.ToString(yyyy-MM-dd HH:mm:ss);infoDO.Operator 系统自动更新;infoDO.IsDelete false;//查询集合中已存在的该商品ProductInfoDO b_there infoDOs.Where(t t.DslSku product.goodsNo.ToString()).FirstOrDefault();if (b_there ! null){//如果有则更新为现在从总部拉取到的商品信息if (infoDO.goodsType ! b_there.goodsType){b_there.goodsType infoDO.goodsType;}if (infoDO.proarea ! b_there.proarea){b_there.proarea infoDO.proarea;}if (infoDO.barcode ! b_there.barcode){b_there.barcode infoDO.barcode;}}else{//插入实体集合中infoDOs.Add(infoDO);}}#endregion//判断该门店商品信息是否拉完if (res3.data.pages pageNum){//利用list的 Except 方法进行数据筛选var different list.Except(dslStores, new StoreStockDOComparer()).ToList();//找到没拉到库存的different.ForEach(goodsInfo {//把没有从总部拉到商品的库存数据变更为0goodsInfo.goodsQty 0;goodsInfo.warehouseGoodsQty 0;});//清除容器内数据从新插入信息变更数据//list 这个list就是我们开头根据门店id查询该门店所有商品库存信息的数据//所以要清空重新写入我们调整好的数据list new ListDslStoreStockDO();list.AddRange(dslStores);list.AddRange(different);//获取完毕,正常结束Console.WriteLine(请稍等正在提交事务...);//_BulkCopyModel方法是做insertOrUpadate操作的。//根据后面传入的主键【goodsNo,placePointNo 】去操作数据库的数据int count _sqlsugar_DSL._BulkCopyModel(list, 门店库存表, t new { t.goodsNo, t.placePointNo });rel count 0;Console.WriteLine(门店{0}数据更新成功,操作数据条数:{1}, orgid, count);break;}pageNum;}else{Console.WriteLine(未查询到更新数据);break;}}else{///获取完毕,正常结束Console.WriteLine(门店{0}第{1}页数据拉取失败, orgid, pageNum.ToString());break;}}else{//获取异常,强制结束Console.WriteLine(门店:{0}第{1}页数据发起请求失败{2}, orgid, pageNum.ToString(), res3.msg);break;}}catch (Exception ex){Console.WriteLine(门店:{0}第{1}页数据处理失败{2}, orgid, pageNum.ToString(), ex.Message);break;}}return rel;}
伪代码展示
上述代码还是比较复杂很长不太能直观的一眼看到问题所在下面我就写一段伪造代码来给大家展示一下逻辑
代码如下示例
门店集合查询到所有门店商品对码信息收集集合 new ListT();foreach (var 单个门店实体 in 门店集合 ){//GetProductInfo的作用是调用总部接口获取并更新商品数据InventoryHelper.GetProductInfo(门店ID, ref 商品对码信息收集集合 );}GetProductInfo(门店ID,ref 门店对码信息收集集合){List库存数据实体 旧库存数据根据门店ID获取库存信息门店ID;List库存数据实体 新库存数据new List库存数据实体();while(true){页码1;每页查询数量1000;总部数据返回数据集合请求总部数据接口(页码,每页查询数量)foreach (var 数据返回数据实体 in 总部数据返回数据集合 ){数据返回数据实体 转为 库存数据实体 新库存数据.add(库存数据实体)数据返回数据实体数据 取出所需数据组成 商品对码实体门店对码信息收集集合.add(商品对码实体)}if(页码总部返回数据集合.总页数){新库存数据 做Insert OR Update 操作break;}页码;}}分析性能低原因
单线程需要一个门店一个门店的处理每处理一个门店都需要去操作一次数据库做Insert OR Update 操作单线程请求总部接口线程要等待接口返回才继续往下处理没有达到效率最大化每次都要去查询门店的所有商品库存导致不必要的查询开销。
三、优化后的代码分析
运行逻辑
查询出所有门店根据传入线程数做分组多线程异步获取总部接口库存数据当分组的总部数据获取完成后以组为单位更新数据库数据
代码展示
代码如下 public static void GetProductInfo(ListDslStoreCheckCodeDO orgids, int ThreadNumber 1){if (ThreadNumber orgids.Count){ThreadNumber 1;}string startTime DateTime.Now.ToString(yyyy-MM-dd HH:mm:ss);ListListDslStoreCheckCodeDO groupedLists orgids.Select((value, index) new { Index index, Value value }).GroupBy(x x.Index / (orgids.Count() / ThreadNumber)).Select(x x.Select(v v.Value).ToList()).ToList();ListProductInfoDO outval new ListProductInfoDO();int _ThreadCount groupedLists.Count();int finishcount 0;object locker new object();foreach (var dslStores in groupedLists){new Thread(async () {string TID Thread.CurrentThread.ManagedThreadId.ToString(00);int count 0;ListProductInfoDO productInfos new ListProductInfoDO();ListDslStoreStockDO dslStock new ListDslStoreStockDO();foreach (var StoreInfo in dslStores){count;Console.WriteLine(线程ID[{0}]剩余【{1}/{2}】, TID, dslStores.Count, count);string orgid StoreInfo.DslStoreNo;ListProductS products await GetProductInfo(orgid);#region 模型转换foreach (var product in products){DslStoreStockDO dsl new DslStoreStockDO();dsl.placePointNo orgid;dsl.goodsNo product.goodsNo;dsl.goodsId product.goodsId;dsl.goodsName product.goodsName;dsl.goodsType string.IsNullOrWhiteSpace(product.goodsType) ? 无 : product.goodsType;dsl.prodArea product.prodArea ?? 无;dsl.barcode string.IsNullOrWhiteSpace(product.barcode) ? 无 : product.barcode;dsl.goodsQty product.goodsQty;dsl.warehouseGoodsQty product.warehouseGoodsQty ?? 0;dsl.purPrice product.purPrice ?? 0;dsl.retailPrice product.retailPrice;dsl.salesTaxRate product.salesTaxRate;dsl.cretime DateTime.Now.ToString(yyyy-MM-dd HH:mm:ss);dsl.isChange true;dslStock.Add(dsl);ProductInfoDO infoDO dSL_ERP.GetCodeCahe(product.goodsNo.ToString());if (infoDO null){infoDO new ProductInfoDO();infoDO.LocalSku product.goodsNo.ToString();}infoDO.Name product.goodsName;infoDO.DslSku product.goodsNo.ToString();infoDO.goodsType product.goodsType;infoDO.proarea product.prodArea;infoDO.barcode product.barcode;infoDO.CreateTime DateTime.Now;infoDO.UpdateTime DateTime.Now.ToString(yyyy-MM-dd HH:mm:ss);infoDO.Operator 系统自动更新;infoDO.IsDelete false;productInfos.Add(infoDO);}#endregion}if (count dslStores.Count){ //分配的门店处理完成后再同一操作数据库Liststring SotoreIds dslStores.Select(t t.DslStoreNo).ToList();ListDslStoreStockDO Oldstocks _sqlsugar_DSL.GetModelListDslStoreStockDO(StoreStock, t SotoreIds.Contains(t.placePointNo)); ListDslStoreStockDO newStocks dslStock.Except(Oldstocks, new StoreStockDOComparer()).ToList();ListDslStoreStockDO commonStocks dslStock.Except(newStocks, new StoreStockDOComparer()).ToList();try{if (Oldstocks.Count 0){var diffent Oldstocks.Except(dslStock, new StoreStockDOComparer()).ToList();diffent.ForEach(goodsInfo {goodsInfo.goodsQty 0;goodsInfo.warehouseGoodsQty 0;});commonStocks.AddRange(diffent);}Console.WriteLine(请稍等正在提交...);int InsNumber 0;if (newStocks.Count 0){InsNumber await _sqlsugar_DSL.AsynBulkCopyModel(newStocks, StoreStock, t new { t.goodsNo, t.placePointNo });}int UpdNumber await _sqlsugar_DSL.AsynBulkUpdateModel(commonStocks, StoreStock, new string[] { goodsNo, placePointNo }, new string[] { placePointNo, goodsNo, goodsId, goodsName, goodsType, prodArea, barcode, goodsQty, warehouseGoodsQty, purPrice, retailPrice, salesTaxRate, cretime, isChange });Console.WriteLine(提交成功更新{0}条数据插入{1}条数据, UpdNumber, InsNumber);}catch (Exception ex){Console.WriteLine(线程{0}内部报错{1}, TID, ex.Message);}}lock (locker){outval.AddRange(productInfos);finishcount;Monitor.Pulse(locker); //完成通知等待队列,告知已完执行下一个。}}).Start();}lock (locker){while (finishcount ! _ThreadCount){Monitor.Wait(locker);//等待}}IEnumerableProductInfoDO infoDOs outval.OrderByDescending(x x.UpdateTime).GroupBy(x new { x.DslSku, x.barcode }).Select(y y.First());Console.WriteLine(开始更新商品对码表,待更新数:{0}, infoDOs.Count());if (infoDOs.Count() 0){Console.WriteLine(正在提交...);ChangeProductCheckCode(infoDOs.ToList());Dictionarystring, string pairs new Dictionarystring, string();pairs.Add(JsonStr, CommonFun.Base64Encode(JsonConvert.SerializeObject(infoDOs)));string rel HttpHelper.HttpPost(..., pairs);Console.WriteLine(商品对码表跟新成功,商品信息表更新:{0}, rel);}Console.WriteLine(【全量更新所有门店】执行完毕线程挂起24小时...[Start:{0}|End:{1}], startTime, DateTime.Now.ToString(yyyy-MM-dd HH:mm:ss));#region 异步获取商品信息async TaskListProductS GetProductInfo(string orgid){ListProductS products new ListProductS();string res await Task.Run(() {int pageNum 1;int pageSize 1000;while (true){QueryStockReq req3 new QueryStockReq();req3.goodsno ;req3.goodsName ;req3.pageNum pageNum;req3.pageSize pageSize;req3.placePointNo orgid;Console.WriteLine(门店【{1}】开始获取第{0}页数据-线程ID【{2}】, req3.pageNum.ToString(), orgid, Thread.CurrentThread.ManagedThreadId.ToString(00));DslERPReq dslERP23 new DslERPReq(req3);DslERPResQueryStockRes res3 dSL_ERP.queryStock(dslERP23);if (res3 ! null res3.code 0){if (res3.data ! null res3.data.list ! null){if (res3.data.total 0){products.AddRange(res3.data.list);if (res3.data.pages pageNum){break;}pageNum;}else{Console.WriteLine(未查询到更新数据返回数据{0},数据JSON:【{1}】, res3.msg, JsonConvert.SerializeObject(res3));break;}}else{Console.WriteLine(门店{0}第{1}页数据拉取失败返回信息{2}, orgid, req3.pageNum.ToString(), res3.msg);break;}}else{Console.WriteLine(门店:{0}第{1}页数据发起请求失败{2}, orgid, req3.pageNum.ToString(), res3.msg);}}return ;});return products;}#endregion}伪代码展示 public static void GetProductInfo(List门店信息实体 门店信息实体集合, int 处理线程数量 1){if (处理线程数量 门店信息实体集合.Count){处理线程数量 1;}string startTime DateTime.Now.ToString(yyyy-MM-dd HH:mm:ss);ListList门店信息实体 门店分组集合 门店信息实体集合.Select((value, index) new { Index index, Value value }).GroupBy(x x.Index / (门店信息实体集合.Count() / 处理线程数量)).Select(x x.Select(v v.Value).ToList()).ToList();List商品对码实体 outval new List商品对码实体();int _ThreadCount 门店分组集合.Count();int finishcount 0;object locker new object();//线程锁foreach (var 门店实体集合 in 门店分组集合){new Thread(async () {string TID Thread.CurrentThread.ManagedThreadId.ToString(00);int count 0;ListProductInfoDO productInfos new ListProductInfoDO();List库存数据实体 总部库存数据集合 new List库存数据实体();foreach (var StoreInfo in 门店实体集合){count;Console.WriteLine(线程ID[{0}]剩余【{1}/{2}】, TID, 门店实体集合.Count, count);string orgid StoreInfo.门店ID;//这里是异步从总部库存信息接口获取商品库存信息ListProductS products await GetProductInfo(orgid);#region 模型转换foreach (var product in products){库存数据实体dsl new 库存数据实体();dsl.placePointNo orgid;dsl.goodsNo product.goodsNo;dsl.goodsId product.goodsId;dsl.goodsName product.goodsName;dsl.goodsType string.IsNullOrWhiteSpace(product.goodsType) ? 无 : product.goodsType;dsl.prodArea product.prodArea ?? 无;dsl.barcode string.IsNullOrWhiteSpace(product.barcode) ? 无 : product.barcode;dsl.goodsQty product.goodsQty;dsl.warehouseGoodsQty product.warehouseGoodsQty ?? 0;dsl.purPrice product.purPrice ?? 0;dsl.retailPrice product.retailPrice;dsl.salesTaxRate product.salesTaxRate;dsl.cretime DateTime.Now.ToString(yyyy-MM-dd HH:mm:ss);dsl.isChange true;总部库存数据集合.Add(dsl);ProductInfoDO infoDO dSL_ERP.GetCodeCahe(product.goodsNo.ToString());if (infoDO null){infoDO new ProductInfoDO();infoDO.LocalSku product.goodsNo.ToString();}infoDO.Name product.goodsName;infoDO.DslSku product.goodsNo.ToString();infoDO.goodsType product.goodsType;infoDO.proarea product.prodArea;infoDO.barcode product.barcode;infoDO.CreateTime DateTime.Now;infoDO.UpdateTime DateTime.Now.ToString(yyyy-MM-dd HH:mm:ss);infoDO.Operator 系统自动更新;infoDO.IsDelete false;productInfos.Add(infoDO);}#endregion}//注意这里我为什么要用count来判断这组是否执行完成因为我们操作数据库的语句优化为了异步的//而Look语句内是不支持await 语句的所有利用组合里的门店数和已处理数来判断if (count 门店实体集合.Count){ //分配的门店处理完成后再同一操作数据库Liststring SotoreIds 门店实体集合.Select(t t.门店ID).ToList();// 查询出这个批次里所有门店的老的库存数据List库存数据实体 Oldstocks _sqlsugar_DSL.GetModelList库存数据实体(StoreStock, t SotoreIds.Contains(t.门店ID)); //List库存数据实体 newStocks 总部库存数据集合.Except(Oldstocks, new StoreStockDOComparer()).ToList();List库存数据实体 commonStocks 总部库存数据集合.Except(newStocks, new StoreStockDOComparer()).ToList();try{if (Oldstocks.Count 0){var diffent Oldstocks.Except(总部库存数据集合, new StoreStockDOComparer()).ToList();diffent.ForEach(goodsInfo {goodsInfo.goodsQty 0;goodsInfo.warehouseGoodsQty 0;});commonStocks.AddRange(diffent);}Console.WriteLine(请稍等正在提交...);int InsNumber 0;if (newStocks.Count 0){InsNumber await _sqlsugar_DSL.AsynBulkCopyModel(newStocks, StoreStock, t new { t.goodsNo, t.placePointNo });}int UpdNumber await _sqlsugar_DSL.AsynBulkUpdateModel(commonStocks, StoreStock, new string[] { goodsNo, placePointNo }, new string[] { placePointNo, goodsNo, goodsId, goodsName, goodsType, prodArea, barcode, goodsQty, warehouseGoodsQty, purPrice, retailPrice, salesTaxRate, cretime, isChange });Console.WriteLine(提交成功更新{0}条数据插入{1}条数据, UpdNumber, InsNumber);}catch (Exception ex){Console.WriteLine(线程{0}内部报错{1}, TID, ex.Message);}}lock (locker){outval.AddRange(productInfos);finishcount;Monitor.Pulse(locker); //完成通知等待队列,告知已完执行下一个。}}).Start();}lock (locker){while (finishcount ! _ThreadCount){Monitor.Wait(locker);//等待}}IEnumerableProductInfoDO infoDOs outval.OrderByDescending(x x.UpdateTime).GroupBy(x new { x.DslSku, x.barcode }).Select(y y.First());Console.WriteLine(开始更新商品对码表,待更新数:{0}, infoDOs.Count());if (infoDOs.Count() 0){Console.WriteLine(正在提交...);ChangeProductCheckCode(infoDOs.ToList());Dictionarystring, string pairs new Dictionarystring, string();pairs.Add(JsonStr, CommonFun.Base64Encode(JsonConvert.SerializeObject(infoDOs)));string rel HttpHelper.HttpPost(..., pairs);Console.WriteLine(商品对码表跟新成功,商品信息表更新:{0}, rel);}Console.WriteLine(【全量更新所有门店】执行完毕线程挂起24小时...[Start:{0}|End:{1}], startTime, DateTime.Now.ToString(yyyy-MM-dd HH:mm:ss));#region 异步获取商品信息async TaskListProductS GetProductInfo(string orgid){ListProductS products new ListProductS();string res await Task.Run(() {int pageNum 1;int pageSize 1000;while (true){QueryStockReq req3 new QueryStockReq();req3.goodsno ;req3.goodsName ;req3.pageNum pageNum;req3.pageSize pageSize;req3.placePointNo orgid;Console.WriteLine(门店【{1}】开始获取第{0}页数据-线程ID【{2}】, req3.pageNum.ToString(), orgid, Thread.CurrentThread.ManagedThreadId.ToString(00));DslERPReq dslERP23 new DslERPReq(req3);DslERPResQueryStockRes res3 dSL_ERP.queryStock(dslERP23);if (res3 ! null res3.code 0){if (res3.data ! null res3.data.list ! null){if (res3.data.total 0){products.AddRange(res3.data.list);if (res3.data.pages pageNum){break;}pageNum;}else{Console.WriteLine(未查询到更新数据返回数据{0},数据JSON:【{1}】, res3.msg, JsonConvert.SerializeObject(res3));break;}}else{Console.WriteLine(门店{0}第{1}页数据拉取失败返回信息{2}, orgid, req3.pageNum.ToString(), res3.msg);break;}}else{Console.WriteLine(门店:{0}第{1}页数据发起请求失败{2}, orgid, req3.pageNum.ToString(), res3.msg);}}return ;});return products;}#endregion}优化了那些问题
从总部取数据变为了异步多线程避免了线程阻塞门店按组合处理减小了数据库操作次数操作数据库改为异步避免阻塞
总结
以上就是我个人所经历的大数据处理虽然优化的并不算完美但是还是总结出了不少经验也从中学习到了很多比如单线程和多线程的运用以及委托和线程同步等知识。