PantentQueueService.java 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344
  1. package cn.cslg.pas.service.upLoadPatent;
  2. import cn.cslg.pas.common.model.vo.ProjectImportPatentVO;
  3. import cn.cslg.pas.common.model.vo.UploadParamsVO;
  4. import cn.cslg.pas.common.model.vo.UploadSettingVO;
  5. import cn.cslg.pas.common.utils.DateUtils;
  6. import cn.cslg.pas.common.utils.FileUtils;
  7. import cn.cslg.pas.common.utils.JsonUtils;
  8. import cn.cslg.pas.common.utils.ReadExcelUtils;
  9. import cn.cslg.pas.domain.PatentData;
  10. import cn.cslg.pas.domain.QueueData;
  11. import cn.cslg.pas.domain.Task;
  12. import cn.cslg.pas.service.TaskService;
  13. import cn.cslg.pas.service.UploadPatentBatchService;
  14. import lombok.RequiredArgsConstructor;
  15. import org.apache.poi.ss.usermodel.Sheet;
  16. import org.springframework.context.annotation.Lazy;
  17. import org.springframework.stereotype.Service;
  18. import java.io.IOException;
  19. import java.util.*;
  20. import java.util.concurrent.locks.Condition;
  21. import java.util.concurrent.locks.Lock;
  22. import java.util.concurrent.locks.ReentrantLock;
  23. /**
  24. * 将专利信息存入队列或从队列取出
  25. */
  26. @Service
  27. @RequiredArgsConstructor(onConstructor_ = {@Lazy})
  28. public class PantentQueueService {
  29. private final ExcuteDataToVOService excuteDataToVOService;
  30. private final ExcuteUploadSettingService excuteUploadSettingService;
  31. private final UploadPatentToDBService uploadPatentToDBService;
  32. private final UploadPatentBatchService uploadPatentBatchService;
  33. private final FileUtils fileUtils;
  34. private final MessageService messageService;
  35. private final TaskService taskService;
  36. private Queue<QueueData> patentImageQueue = new LinkedList<>();
  37. private Queue<QueueData> patentZhuluQueue = new LinkedList<>();
  38. private Queue<QueueData> patentRightQueue = new LinkedList<>();
  39. private Queue<QueueData> patentInstructionTextQueue = new LinkedList<>();
  40. private Queue<QueueData> patentAssoQueue = new LinkedList<>();
  41. private List<Integer> taskQueueList = new ArrayList<>();
  42. private Boolean flag = false;
  43. private Lock taskLock = new ReentrantLock();
  44. private Lock patentImageLock = new ReentrantLock();
  45. private Lock patentZhuluLock = new ReentrantLock();
  46. private Lock patentRightLock = new ReentrantLock();
  47. private Lock patentInstructionTextLock = new ReentrantLock();
  48. private Lock patentAssoLock = new ReentrantLock();
  49. Condition taskCondition = taskLock.newCondition();
  50. private Condition patentImageCondition = patentImageLock.newCondition();
  51. private Condition patentZhuluCondition = patentZhuluLock.newCondition();
  52. private Condition patentRightCondition = patentRightLock.newCondition();
  53. private Condition patentInstructionTextCondition = patentInstructionTextLock.newCondition();
  54. private Condition patentAssoCondition = patentAssoLock.newCondition();
  55. private HashMap<Integer, Integer> patentIdMap = new HashMap<>();
  56. private Task task = null;
  57. private Integer patentFinishNum = 0;
  58. /**
  59. * 生产者:从任务队列取出任务解析成专利实体类,分配给三个消费者队列
  60. */
  61. public void addPatnetToQueue() {
  62. try {
  63. while (true) {
  64. //检查任务队列
  65. if (taskQueueList.size() == 0) {
  66. taskLock.lock();
  67. taskCondition.await();
  68. }
  69. //1.从taskQueueList中取出第一个task,将其状态改为进行中,并将其从任务队列taskQueueList中删除
  70. task = taskService.getById(taskQueueList.get(0));
  71. task.setStatus(1);
  72. taskService.updateById(task);
  73. taskQueueList.remove(0);
  74. //从任务中取出文件路径、总条数、成功条数、前台参数json
  75. String filePath = task.getUrl(); //相对路径
  76. filePath = fileUtils.getPath(filePath); //绝对路径
  77. Integer total = task.getTotal();
  78. patentIdMap.put(0, total);
  79. int lastIndex = task.getSuccessNum();
  80. String json = task.getPramJson();
  81. ProjectImportPatentVO projectImportPatentVO = JsonUtils.jsonToPojo(json, ProjectImportPatentVO.class);
  82. //解析数据源类,通过数据来源id(如1:智慧芽)解析数据源配置文件,获得数据源配置文件对象jsonData
  83. List<UploadSettingVO.Column> jsonData = excuteUploadSettingService.ExcuteUploadSetting(projectImportPatentVO.getSourceId());
  84. //解析Excel文件获得工作簿
  85. Sheet sheet = ReadExcelUtils.readExcel(filePath);
  86. //遍历专利总数量,在循环中保存专利
  87. for (int i = lastIndex; i < total; i++) {
  88. PatentData patentData = ReadExcelUtils.readExcelOneRow(filePath, sheet, i + 1);
  89. //调用装载数据类,专利数据转换为VO对象
  90. UploadParamsVO uploadParamsVO = excuteDataToVOService.fileToPatentVO(patentData, jsonData);
  91. //保存专利基础数据(专利表"os_patent")
  92. uploadPatentBatchService.getOneOrInsertOne(uploadParamsVO);
  93. //专利分别加入5个消费者队列
  94. QueueData queueData = new QueueData()
  95. .setUploadParamsVO(uploadParamsVO)
  96. .setProjectImportPatentVO(projectImportPatentVO);
  97. patentImageQueue.add(queueData);
  98. patentZhuluQueue.add(queueData);
  99. patentRightQueue.add(queueData);
  100. patentInstructionTextQueue.add(queueData);
  101. patentAssoQueue.add(queueData);
  102. //通知消费者线程(5个消费者:摘要附图、著录项目、权利要求文本、说明书文本、与专利关联数据)
  103. //消费者1摘要附图
  104. patentImageLock.lock();
  105. patentImageCondition.signalAll();
  106. patentImageLock.unlock();
  107. //消费者2著录项目
  108. patentZhuluLock.lock();
  109. patentZhuluCondition.signalAll();
  110. patentZhuluLock.unlock();
  111. //消费者3权利要求文本
  112. patentRightLock.lock();
  113. patentRightCondition.signalAll();
  114. patentRightLock.unlock();
  115. //消费者4说明书文本
  116. patentInstructionTextLock.lock();
  117. patentInstructionTextCondition.signalAll();
  118. patentInstructionTextLock.unlock();
  119. //消费者5与专利关联数据
  120. patentAssoLock.lock();
  121. patentAssoCondition.signalAll();
  122. patentAssoLock.unlock();
  123. }
  124. }
  125. } catch (Exception e) {
  126. e.printStackTrace();
  127. //任务表更新状态为失败
  128. task.setStatus(3);
  129. task.setEndTime(DateUtils.getDateTime());
  130. taskService.updateById(task);
  131. }
  132. flag = true;
  133. }
  134. /**
  135. * 消费者1:将专利从队列取出,摘要附图入库
  136. */
  137. public void pushPatentImageToDB() throws InterruptedException, IOException {
  138. while (true) {
  139. try {
  140. if (patentImageQueue.isEmpty()) {
  141. if (flag) {
  142. System.out.println("摘要附图全部完成,退出循环");
  143. return;
  144. } else {
  145. patentImageLock.lock();
  146. patentImageCondition.await();
  147. patentImageLock.unlock();
  148. }
  149. } else {
  150. QueueData queueData = patentImageQueue.remove();
  151. //摘要附图入库
  152. uploadPatentToDBService.uploadPatentImage(queueData.getUploadParamsVO());
  153. //Websocket发送message:通过WebSocket 在每一次循环结束后 向前端发送完成进度
  154. sendMessage(queueData, patentIdMap.get(0), task);
  155. }
  156. } catch (Exception e) {
  157. e.printStackTrace();
  158. }
  159. }
  160. }
  161. /**
  162. * 消费者2:将专利从队列取出,著录项目入库
  163. */
  164. public void pushPatentZhuLuToDB() throws InterruptedException, IOException {
  165. while (true) {
  166. try {
  167. if (patentZhuluQueue.isEmpty()) {
  168. if (flag) {
  169. System.out.println("著录项目全部完成,退出循环");
  170. return;
  171. } else {
  172. patentZhuluLock.lock();
  173. patentZhuluCondition.await();
  174. patentZhuluLock.unlock();
  175. }
  176. } else {
  177. QueueData queueData = patentZhuluQueue.remove();
  178. //著录项目入库
  179. uploadPatentToDBService.uploadPatentZhulu(queueData.getUploadParamsVO());
  180. //Websocket发送message:通过WebSocket 在每一次循环结束后 向前端发送完成进度
  181. sendMessage(queueData, patentIdMap.get(0), task);
  182. }
  183. } catch (Exception e) {
  184. e.printStackTrace();
  185. }
  186. }
  187. }
  188. /**
  189. * 消费者3:将专利从队列取出,权利要求文本入库
  190. */
  191. public void pushPatentRightToDB() throws InterruptedException, IOException {
  192. while (true) {
  193. try {
  194. if (patentRightQueue.isEmpty()) {
  195. if (flag) {
  196. System.out.println("权利要求全部完成,退出循环");
  197. return;
  198. } else {
  199. patentRightLock.lock();
  200. patentRightCondition.await();
  201. patentRightLock.unlock();
  202. }
  203. } else {
  204. QueueData queueData = patentRightQueue.remove();
  205. //权要文本入库
  206. uploadPatentToDBService.uploadPatentRight(queueData.getUploadParamsVO());
  207. //Websocket发送message:通过WebSocket 在每一次循环结束后 向前端发送完成进度
  208. sendMessage(queueData, patentIdMap.get(0), task);
  209. }
  210. } catch (Exception e) {
  211. e.printStackTrace();
  212. }
  213. }
  214. }
  215. /**
  216. * 消费者4:将专利从队列取出,说明书文本入库
  217. */
  218. public void pushPatentInstructionTextToDB() throws InterruptedException, IOException {
  219. while (true) {
  220. try {
  221. if (patentInstructionTextQueue.isEmpty()) {
  222. if (flag) {
  223. System.out.println("说明书文本全部完成,退出循环");
  224. return;
  225. } else {
  226. patentInstructionTextLock.lock();
  227. patentInstructionTextCondition.await();
  228. patentInstructionTextLock.unlock();
  229. }
  230. } else {
  231. QueueData queueData = patentInstructionTextQueue.remove();
  232. //说明书文本入库
  233. uploadPatentToDBService.uploadPatentInstructionText(queueData.getUploadParamsVO());
  234. //Websocket发送message:通过WebSocket 在每一次循环结束后 向前端发送完成进度
  235. sendMessage(queueData, patentIdMap.get(0), task);
  236. }
  237. } catch (Exception e) {
  238. e.printStackTrace();
  239. }
  240. }
  241. }
  242. /**
  243. * 消费者5:将专利从队列取出,与专利关联数据入库
  244. */
  245. public void pushPatentAssoToDB() throws InterruptedException, IOException {
  246. while (true) {
  247. try {
  248. if (patentAssoQueue.isEmpty()) {
  249. if (flag) {
  250. System.out.println("与专利关联的数据全部完成,退出循环");
  251. return;
  252. } else {
  253. patentAssoLock.lock();
  254. patentAssoCondition.await();
  255. patentAssoLock.unlock();
  256. }
  257. } else {
  258. QueueData queueData = patentAssoQueue.remove();
  259. //专题库与专利关联入库
  260. uploadPatentToDBService.uploadAssoThemaPat(queueData.getUploadParamsVO(), queueData.getProjectImportPatentVO());
  261. //自定义字段标引与专利关联入库
  262. uploadPatentToDBService.uploadAssoFieldPat(queueData.getUploadParamsVO(), queueData.getProjectImportPatentVO());
  263. //文件夹与专利关联入库
  264. uploadPatentToDBService.uploadAssoPorPat(queueData.getUploadParamsVO(), queueData.getProjectImportPatentVO());
  265. //Websocket发送message:通过WebSocket 在每一次循环结束后 向前端发送完成进度
  266. sendMessage(queueData, patentIdMap.get(0), task);
  267. }
  268. } catch (Exception e) {
  269. e.printStackTrace();
  270. }
  271. }
  272. }
  273. public void sendMessage(QueueData queueData, Integer total, Task task) {
  274. //每完成一个专利,通过计算,发送进度
  275. synchronized ("此为同步锁") {
  276. Integer currentPatentId = queueData.getUploadParamsVO().getPatent().getId();
  277. Integer num = patentIdMap.get(currentPatentId);
  278. if (num == null) {
  279. patentIdMap.put(currentPatentId, 1);
  280. } else if (num < 4) {
  281. patentIdMap.put(currentPatentId, ++num);
  282. } else {
  283. //num达到3了就从patentIdMap中删除
  284. patentIdMap.remove(currentPatentId);
  285. patentFinishNum++;
  286. long percentage = Math.round((patentFinishNum + 1D) / total * 100D);
  287. //当全部完成时
  288. if (patentFinishNum.equals(total)) {
  289. percentage = 100L;
  290. //任务表更新状态为成功
  291. task.setStatus(2);
  292. task.setEndTime(DateUtils.getDateTime());
  293. taskService.updateById(task);
  294. System.out.println("结束时间:" + new Date());
  295. }
  296. messageService.sendWebsocketMessage(task, total, patentFinishNum, percentage);
  297. //任务表更新数据
  298. task.setSuccessNum(patentFinishNum);
  299. task.setDefaultNum(total - task.getSuccessNum());
  300. taskService.updateById(task);
  301. }
  302. }
  303. }
  304. public void queueAddTask(List<Integer> taskQueueList) {
  305. this.taskQueueList.addAll(taskQueueList);
  306. }
  307. public void awakeTasktch() {
  308. taskLock.lock();
  309. taskCondition.signalAll();
  310. taskLock.unlock();
  311. }
  312. }