spring异步service中处理线程数限制详解
情况简介
spring项目,controller异步调用service的方法,产生大量并发。
具体业务:
前台同时传入大量待翻译的单词,后台业务接收单词,并调用百度翻译接口翻译接收单词并将翻译结果保存到数据库,前台不需要实时返回翻译结果。
处理方式:
controller接收文本调用service中的异步方法,将单词先保存到队列中,再启动2个新线程,从缓存队列中取单词,并调用百度翻译接口获取翻译结果并将翻译结果保存到数据库。
本文主要知识点:
多线程同时(异步)调用方法后,开启新线程,并限制线程数量。
代码如下:
@Service publicclassLgtsAsyncServiceImpl{ /**logger日志.*/ publicstaticfinalLoggerLOGGER=Logger.getLogger(LgtsAsyncServiceImpl2.class); privatefinalBlockingQueueque=newLinkedBlockingQueue<>();//待翻译的队列 privatefinalAtomicIntegerthreadCnt=newAtomicInteger(0);//当前翻译中的线程数 privatefinalVector existsKey=newVector<>();//保存已入队列的数据 privatefinalintmaxThreadCnt=2;//允许同时执行的翻译线程数 privatestaticfinalintNUM_OF_EVERY_TIME=50;//每次提交的翻译条数 privatestaticfinalStringtranslationFrom="zh"; @Async publicvoidsaveAsync(Lgtst){ if(Objects.isNull(t)||StringUtils.isAnyBlank(t.getGco(),t.getCode())){ return; } offer(t); save(); return; } privatebooleanoffer(Lgtst){ Stringkey=t.getGco()+"-"+t.getCode(); if(!existsKey.contains(key)){ existsKey.add(key); booleanresult=que.offer(t); //LOGGER.trace("待翻译文字["+t.getGco()+":"+t.getCode()+"]加入队列结果["+result //+"],队列中数据总个数:"+que.size()); returnresult; } returnfalse; } @Autowired privateLgtsServicelgtsService; privatevoidsave(){ intcnt=threadCnt.incrementAndGet();//当前线程数+1 if(cnt>maxThreadCnt){ //已启动的线程大于设置的最大线程数直接丢弃 threadCnt.decrementAndGet();//+1的线程数再-回去 return; } GwallUseruser=UserUtils.getUser(); Threadthr=newThread(){ publicvoidrun(){ longsleepTime=30000l; UserUtils.setUser(user); booleancontinueFlag=true; intmaxContinueCnt=5;//最大连续休眠次数,连续休眠次数超过最大休眠次数后,while循环退出,当前线程销毁 intcontinueCnt=0;//连续休眠次数 while(continueFlag){//队列不为空时执行 if(Objects.isNull(que.peek())){ try{ if(continueCnt>maxContinueCnt){ //连续休眠次数达到最大连续休眠次数,当前线程将销毁。 continueFlag=false; continue; } //队列为空,准备休眠 Thread.sleep(sleepTime); continueCnt++; continue; }catch(InterruptedExceptione){ //休眠失败,无需处理 e.printStackTrace(); } } continueCnt=0;//重置连续休眠次数为0 List params=newArrayList<>(); inttotalCnt=que.size(); que.drainTo(params,NUM_OF_EVERY_TIME); StringBuilderutf8q=newStringBuilder(); Stringcode=""; List needRemove=newArrayList<>(); for(Lgtslgts:params){ if(StringUtils.isAnyBlank(code)){ code=lgts.getCode(); } //移除existsKey中保存的key,以免下面翻译失败时再次加入队列时,加入不进去 Stringkey=lgts.getGco()+"-"+lgts.getCode(); existsKey.remove(key); if(!code.equalsIgnoreCase(lgts.getCode())){//要翻译的目标语言与当前列表中的第一个不一致 offer(lgts);//重新将待翻译的语言放回队列 needRemove.add(lgts); continue; } utf8q.append(lgts.getGco()).append("\n"); } params.removeAll(needRemove); LOGGER.debug("队列中共"+totalCnt+"个,获取"+params.size()+"个符合条件的待翻译内容,编码:"+code); Stringto="en"; if(StringUtils.isAnyBlank(utf8q,to)){ LOGGER.warn("调用翻译出错,未找到["+code+"]对应的百度编码。"); continue; } Map result=getBaiduTranslation(utf8q.toString(),translationFrom,to); if(Objects.isNull(result)||result.isEmpty()){//把没有获取到翻译结果的重新放回队列 for(Lgtslgts:params){ offer(lgts); } LOGGER.debug("本次翻译结果为空。"); continue; } intsucessCnt=0,ignoreCnt=0; for(Lgtslgts:params){ lgts.setBdcode(to); Stringgna=result.get(lgts.getGco()); if(StringUtils.isAnyBlank(gna)){ offer(lgts);//重新将待翻译的语言放回队列 continue; } lgts.setStat(1); lgts.setGna(gna); intsaveResult=lgtsService.saveIgnore(lgts); if(0==saveResult){ ignoreCnt++; }else{ sucessCnt++; } } LOGGER.debug("待翻译个数:"+params.size()+",翻译成功个数:"+sucessCnt+",已存在并忽略个数:"+ignoreCnt); } threadCnt.decrementAndGet();//运行中的线程数-1 distory();//清理数据,必须放在方法最后,否则distory中的判断需要修改 } /** *如果是最后一个线程,清空队列和existsKey中的数据 */ privatevoiddistory(){ if(0==threadCnt.get()){ //最后一个线程退出时,执行清理操作 existsKey.clear(); que.clear(); } } }; thr.setDaemon(true);//守护线程,如果主线程执行完毕,则此线程会自动销毁 thr.setName("baidufanyi-"+RandomUtils.nextInt(1000,9999)); thr.start();//启动插入线程 } /** *百度翻译 * *@paramutf8q *待翻译的字符串,需要utf8格式的 *@paramfrom *百度翻译语言列表中的代码 *参见:http://api.fanyi.baidu.com/api/trans/product/apidoc#languageList *@paramto *百度翻译语言列表中的代码 *参见:http://api.fanyi.baidu.com/api/trans/product/apidoc#languageList *@return翻译结果 */ privateMap getBaiduTranslation(Stringutf8q,Stringfrom,Stringto){ Map result=newHashMap<>(); StringbaiduurlStr="http://api.fanyi.baidu.com/api/trans/vip/translate"; if(StringUtils.isAnyBlank(baiduurlStr)){ LOGGER.warn("百度翻译API接口URL相关参数为空!"); returnresult; } Map params=buildParams(utf8q,from,to); if(params.isEmpty()){ returnresult; } StringsendUrl=getUrlWithQueryString(baiduurlStr,params); try{ HttpClienthttpClient=newHttpClient(); httpClient.setMethod("GET"); StringremoteResult=httpClient.pub(sendUrl,""); result=convertRemote(remoteResult); }catch(Exceptione){ LOGGER.info("百度翻译API返回结果异常!",e); } returnresult; } privateMap convertRemote(StringremoteResult){ Map result=newHashMap<>(); if(StringUtils.isBlank(remoteResult)){ returnresult; } JSONObjectjsonObject=JSONObject.parseObject(remoteResult); JSONArraytrans_result=jsonObject.getJSONArray("trans_result"); if(Objects.isNull(trans_result)||trans_result.isEmpty()){ returnresult; } for(Objectobject:trans_result){ JSONObjecttrans=(JSONObject)object; result.put(trans.getString("src"),trans.getString("dst")); } returnresult; } privateMap buildParams(Stringutf8q,Stringfrom,Stringto){ if(StringUtils.isBlank(from)){ from="auto"; } Map params=newHashMap (); StringskStr="sk"; StringappidStr="appid"; if(StringUtils.isAnyBlank(skStr,appidStr)){ LOGGER.warn("百度翻译API接口相关参数为空!"); returnparams; } params.put("q",utf8q); params.put("from",from); params.put("to",to); params.put("appid",appidStr); //随机数 Stringsalt=String.valueOf(System.currentTimeMillis()); params.put("salt",salt); //签名 Stringsrc=appidStr+utf8q+salt+skStr;//加密前的原文 params.put("sign",MD5Util.md5Encrypt(src).toLowerCase()); returnparams; } publicstaticStringgetUrlWithQueryString(Stringurl,Map params){ if(params==null){ returnurl; } StringBuilderbuilder=newStringBuilder(url); if(url.contains("?")){ builder.append("&"); }else{ builder.append("?"); } inti=0; for(Stringkey:params.keySet()){ Stringvalue=params.get(key); if(value==null){//过滤空的key continue; } if(i!=0){ builder.append('&'); } builder.append(key); builder.append('='); builder.append(encode(value)); i++; } returnbuilder.toString(); } /** *对输入的字符串进行URL编码,即转换为%20这种形式 * *@paraminput *原文 *@returnURL编码.如果编码失败,则返回原文 */ publicstaticStringencode(Stringinput){ if(input==null){ return""; } try{ returnURLEncoder.encode(input,"utf-8"); }catch(UnsupportedEncodingExceptione){ e.printStackTrace(); } returninput; } }
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对毛票票的支持。