PantentQueueService.java 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. package cn.cslg.pas.service.upLoadPatent;
import cn.cslg.pas.common.model.vo.UploadParamsVO;
import cn.cslg.pas.common.model.vo.UploadSettingVO;
import cn.cslg.pas.common.utils.ReadExcelUtils;
import cn.cslg.pas.domain.PatentData;
import cn.cslg.pas.domain.Task;
import cn.cslg.pas.service.TaskService;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import lombok.RequiredArgsConstructor;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
  19. /**
  20. * 将专利信息存入队列或从队列取出
  21. *
  22. * @author 李仁杰
  23. */
  24. @Service
  25. @RequiredArgsConstructor(onConstructor_ = {@Lazy})
  26. public class PantentQueueService {
  27. private final ExcuteDataToVOService excuteDataToVOService;
  28. private final ExcuteUploadSettingService excuteUploadSettingService;
  29. private final MessageService messageService;
  30. private final TaskService taskService;
  31. private Queue<UploadParamsVO> queue = new LinkedList<>();
  32. private List<Integer> taskQueueList = new ArrayList<>();
  33. private Boolean flag = false;
  34. private CountDownLatch patentLatch = new CountDownLatch(1);
  35. private CountDownLatch taskLatch = new CountDownLatch(1);
  36. //将专利信息存入队列
  37. public void addPatnetToQueue() {
  38. try {
  39. while (true){
  40. //检查任务队列
  41. if (taskQueueList.size() == 0) {
  42. taskLatch.await();
  43. }
  44. //查找 taskQueueList 中有没有进行中的任务
  45. long count = taskService.count(new LambdaQueryWrapper<Task>().in(Task::getId, taskQueueList).eq(Task::getStatus, 1));
  46. //若没有,则取出第一个队列中的任务开始执行
  47. Task task = null;
  48. if (count == 0) {
  49. task = taskService.getById(taskQueueList.get(0));
  50. //任务队列去除该任务
  51. taskQueueList.remove(0);
  52. //修改该任务状态,改为进行中
  53. Task currentTask = new Task();
  54. currentTask.setId(task.getId());
  55. currentTask.setStatus(1);
  56. taskService.updateById(currentTask);
  57. }
  58. //获得文件路径
  59. String filePath = task.getUrl();
  60. //检查文件合法性
  61. Integer totalRow = ReadExcelUtils.textExcel(filePath);
  62. //调用解析数据类,根据数据来源id(如1:智慧芽)解析数据源配置文件信息
  63. List<UploadSettingVO.Column> jsonData = excuteUploadSettingService.ExcuteUploadSetting(1 + "");
  64. //遍历专利总数量,在循环中保存专利
  65. for (int i = 1; i <= totalRow; i++) {
  66. //解析读取一行专利
  67. PatentData patentData = ReadExcelUtils.readExcelOneRow(filePath, i);
  68. //调用装载数据类,专利数据转换为VO对象
  69. UploadParamsVO uploadParamsVO = excuteDataToVOService.fileToPatentVO(patentData, jsonData);
  70. //一个专利加入队列
  71. queue.add(uploadParamsVO);
  72. //通知消费者线程
  73. patentLatch.countDown();
  74. //Websocket发送message:通过WebSocket 在每一次循环结束后 向前端发送完成进度
  75. Long percentage = totalRow == 0 ? 0 : Math.round((totalRow.equals(i) ? (i * 1D) : (i + 1D)) / totalRow * 100D);
  76. messageService.sendWebsocketMessage(task, totalRow, i, percentage);
  77. }
  78. //全部循环结束后,发送进度
  79. Long percentage = 100L;
  80. messageService.sendWebsocketMessage(task, totalRow, totalRow, percentage);
  81. }
  82. } catch (Exception e) {
  83. e.printStackTrace();
  84. } finally {
  85. }
  86. flag = true;
  87. }
  88. //将专利信息从队列取出
  89. public void pushPantentToDb() throws InterruptedException {
  90. try {
  91. while (true) {
  92. if (queue.isEmpty()) {
  93. if (flag) {
  94. System.out.println("退出循环");
  95. return;
  96. } else {
  97. patentLatch.await();
  98. }
  99. } else {
  100. UploadParamsVO uploadParamsVO = queue.remove();
  101. System.out.println("出队列" + uploadParamsVO);
  102. }
  103. }
  104. } finally {
  105. }
  106. }
  107. public void queueAddTask(List<Integer> taskQueueList) {
  108. this.taskQueueList.addAll(taskQueueList);
  109. }
  110. public void awakeTasktch(){
  111. this.taskLatch.countDown();
  112. }
  113. }