TaskThread.java 8.3 KB


  1. package cn.cslg.pas.service.importPatent;
  2. import cn.cslg.pas.common.core.base.Constants;
  3. import cn.cslg.pas.common.dto.UploadPatentWebDTO;
  4. import cn.cslg.pas.common.model.cronModel.ImportTaskConfig;
  5. import cn.cslg.pas.common.model.cronModel.OrderConfig;
  6. import cn.cslg.pas.common.utils.FormatUtil;
  7. import cn.cslg.pas.common.utils.MathUtils;
  8. import cn.cslg.pas.common.vo.ImportTaskAMVO;
  9. import cn.cslg.pas.domain.business.ImportTask;
  10. import cn.cslg.pas.domain.es.Patent;
  11. import cn.cslg.pas.factorys.PatentImportFactory.PatentImportFactory;
  12. import cn.cslg.pas.factorys.PatentImportFactory.PatentImportImp;
  13. import cn.cslg.pas.service.business.CommonService;
  14. import cn.cslg.pas.service.business.ImportTaskService;
  15. import cn.cslg.pas.service.common.MessageService;
  16. import com.alibaba.fastjson.JSON;
  17. import io.swagger.v3.oas.models.security.SecurityScheme;
  18. import org.springframework.beans.BeanUtils;
  19. import org.springframework.beans.factory.annotation.Autowired;
  20. import org.springframework.beans.factory.annotation.Configurable;
  21. import org.springframework.context.ApplicationContext;
  22. import org.springframework.stereotype.Component;
  23. import java.util.ArrayList;
  24. import java.util.List;
  25. import java.util.Map;
  26. import java.util.concurrent.locks.Condition;
  27. import java.util.concurrent.locks.Lock;
  28. import java.util.concurrent.locks.ReentrantLock;
  29. import java.util.stream.Collectors;
  30. @Configurable
  31. public class TaskThread extends Thread {
  32. @Autowired
  33. SchedulingTaskService importTaskAdd;
  34. private ApplicationContext applicationContext;
  35. private final Lock taskThreadLock = new ReentrantLock();
  36. private final Condition taskThreadCondition = taskThreadLock.newCondition();
  37. private ImportTaskAMVO importTaskAMVO;
  38. private PatentProcess patentProcess = new PatentProcess();
  39. private List<UploadPatentWebDTO> uploadPatentWebDTOS = new ArrayList<>();
  40. /**
  41. * 从任务队列取出并执行任务
  42. */
  43. public TaskThread(ImportTaskAMVO importTaskAMVO, ApplicationContext applicationContext) {
  44. this.importTaskAMVO = importTaskAMVO;
  45. //十进制转二进制
  46. String imContents = "0000";
  47. //当任务为
  48. if (importTaskAMVO.getType().equals(1)) {
  49. imContents = "1100";
  50. } else if (!importTaskAMVO.getImportContent().equals(0)) {
  51. imContents = MathUtils.fun(2, importTaskAMVO.getImportContent());
  52. }
  53. //下载字段
  54. char[] importCells = imContents.toCharArray();
  55. char ifCataloguing = importCells[0];
  56. char ifAddPicture = importCells[1];
  57. char ifFullText = importCells[2];
  58. char ifPdf = importCells[3];
  59. importTaskAMVO.setIfAddCatalogue(ifCataloguing);
  60. importTaskAMVO.setIfAddPicture(ifAddPicture);
  61. importTaskAMVO.setIfAddFullText(ifFullText);
  62. importTaskAMVO.setIfAddPDF(ifPdf);
  63. importTaskAMVO.setThreadDoneCounter(0);
  64. importTaskAMVO.setMessageThreadDoneCounter(0);
  65. Integer threadCount = 0;
  66. Integer messageThreadCount = 0;
  67. if (ifAddPicture == '1') {
  68. threadCount += 1;
  69. this.patentProcess.setPictureDoneNum(importTaskAMVO.getDoneNum());
  70. this.patentProcess.setPictureDefaultNum(0);
  71. }
  72. if (ifCataloguing == '1' || ifFullText == '1') {
  73. threadCount += 1;
  74. this.patentProcess.setPatentMessageDefaultNum(0);
  75. this.patentProcess.setPatentMessageDoneNum(0);
  76. if (ifCataloguing == '1') {
  77. messageThreadCount += 1;
  78. }
  79. if (ifFullText == '1') {
  80. messageThreadCount += 1;
  81. }
  82. }
  83. if (ifPdf == '1') {
  84. this.patentProcess.setPdfDefaultNum(0);
  85. this.patentProcess.setPdfDoneNum(0);
  86. threadCount += 1;
  87. }
  88. importTaskAMVO.setMessageThreadCounter(messageThreadCount);
  89. importTaskAMVO.setThreadCounter(threadCount);
  90. this.applicationContext = applicationContext;
  91. this.patentProcess.setPatentDoneNum(importTaskAMVO.getDoneNum());
  92. this.patentProcess.setPatentDefaultNum(0);
  93. }
  94. @Override
  95. public void run() {
  96. //TODO 判断任务的类型
  97. String json = CommonService.readJsonFile(Constants.IMPORT_TASK_CONFIG + ".json");
  98. List<ImportTaskConfig> configs = JSON.parseArray(json, ImportTaskConfig.class);
  99. PatentImportFactory patentImportFactory = applicationContext.getBean(PatentImportFactory.class);
  100. //根据任务类型使用不同的导入类
  101. ImportTaskConfig config = configs.stream().filter(item -> item.getTaskType().equals(importTaskAMVO.getType()) && item.getImportTo().equals(Constants.IMPORT_PATENT_TO)).findFirst().orElse(null);
  102. PatentImportImp patentImportImp = patentImportFactory.getClass(config.getImportClass());
  103. patentImportImp.setTaskThread(this);
  104. patentImportImp.startPatentThread();
  105. taskThreadLock.lock();
  106. try {
  107. taskThreadCondition.await();
  108. } catch (InterruptedException e) {
  109. e.printStackTrace();
  110. }
  111. ImportTaskService importTaskService = applicationContext.getBean(ImportTaskService.class);
  112. ImportTask task = importTaskService.getById(importTaskAMVO.getId());
  113. if (importTaskAMVO.getState().equals(1)) {
  114. task.setState(2);
  115. importTaskAMVO.setState(2);
  116. } else {
  117. task.setState(importTaskAMVO.getState());
  118. }
  119. task.setDoneNum(this.patentProcess.getPatentDoneNum());
  120. importTaskAMVO.setDoneNum(this.patentProcess.getPatentDoneNum());
  121. task.updateById();
  122. MessageService messageService = applicationContext.getBean(MessageService.class);
  123. messageService.sendAllDoneMessage(importTaskAMVO);
  124. }
  125. /**
  126. * 任务是否完成代码块
  127. */
  128. public void awakeTaskThread() {
  129. synchronized ("导入任务是否完成") {
  130. importTaskAMVO.setThreadDoneCounter(importTaskAMVO.getThreadDoneCounter() + 1);
  131. if (importTaskAMVO.getThreadCounter().equals(importTaskAMVO.getThreadDoneCounter())) {
  132. if (taskThreadLock.tryLock()) {
  133. //taskLock.lock();
  134. taskThreadCondition.signalAll();
  135. taskThreadLock.unlock();
  136. }
  137. MessageService messageService = applicationContext.getBean(MessageService.class);
  138. messageService.sendWebsocketMessage(importTaskAMVO, -1, patentProcess);
  139. }
  140. }
  141. }
  142. public ImportTaskAMVO getImportTaskAMVO() {
  143. return this.importTaskAMVO;
  144. }
  145. public ApplicationContext getApplicationContext() {
  146. return this.applicationContext;
  147. }
  148. public void updateProcess(Boolean ifDefault, Integer type, String appNo) {
  149. synchronized ("更新进度") {
  150. switch (type) {
  151. case 1:
  152. patentProcess.setPatentMessageDoneNum(patentProcess.getPatentMessageDoneNum() + 1);
  153. if (ifDefault) {
  154. patentProcess.setPatentMessageDefaultNum(patentProcess.getPatentMessageDefaultNum() + 1);
  155. }
  156. break;
  157. case 2:
  158. patentProcess.setPictureDoneNum(patentProcess.getPictureDoneNum() + 1);
  159. if (ifDefault) {
  160. patentProcess.setPictureDefaultNum(patentProcess.getPictureDefaultNum() + 1);
  161. }
  162. break;
  163. case 4:
  164. patentProcess.setPdfDoneNum(patentProcess.getPdfDoneNum() + 1);
  165. if (ifDefault) {
  166. patentProcess.setPdfDefaultNum(patentProcess.getPdfDefaultNum() + 1);
  167. }
  168. break;
  169. }
  170. List<Integer> nums = new ArrayList<>();
  171. nums.add(patentProcess.getPictureDoneNum());
  172. nums.add(patentProcess.getPdfDoneNum());
  173. nums.add(patentProcess.getPatentMessageDoneNum());
  174. Integer min = MathUtils.getNotNullMinNum(nums);
  175. if (patentProcess.getPatentDoneNum() < min) {
  176. patentProcess.setPatentDoneNum(min);
  177. importTaskAMVO.setDoneNum(min);
  178. MessageService messageService = applicationContext.getBean(MessageService.class);
  179. messageService.sendWebsocketMessage(importTaskAMVO, -1, this.patentProcess);
  180. }
  181. }
  182. }
  183. }