123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344 |
- package cn.cslg.pas.service.upLoadPatent;
- import cn.cslg.pas.common.model.vo.ProjectImportPatentVO;
- import cn.cslg.pas.common.model.vo.UploadParamsVO;
- import cn.cslg.pas.common.model.vo.UploadSettingVO;
- import cn.cslg.pas.common.utils.DateUtils;
- import cn.cslg.pas.common.utils.FileUtils;
- import cn.cslg.pas.common.utils.JsonUtils;
- import cn.cslg.pas.common.utils.ReadExcelUtils;
- import cn.cslg.pas.domain.PatentData;
- import cn.cslg.pas.domain.QueueData;
- import cn.cslg.pas.domain.Task;
- import cn.cslg.pas.service.TaskService;
- import cn.cslg.pas.service.UploadPatentBatchService;
- import lombok.RequiredArgsConstructor;
- import org.apache.poi.ss.usermodel.Sheet;
- import org.springframework.context.annotation.Lazy;
- import org.springframework.stereotype.Service;
- import java.io.IOException;
- import java.util.*;
- import java.util.concurrent.locks.Condition;
- import java.util.concurrent.locks.Lock;
- import java.util.concurrent.locks.ReentrantLock;
- /**
- * 将专利信息存入队列或从队列取出
- */
- @Service
- @RequiredArgsConstructor(onConstructor_ = {@Lazy})
- public class PantentQueueService {
- private final ExcuteDataToVOService excuteDataToVOService;
- private final ExcuteUploadSettingService excuteUploadSettingService;
- private final UploadPatentToDBService uploadPatentToDBService;
- private final UploadPatentBatchService uploadPatentBatchService;
- private final FileUtils fileUtils;
- private final MessageService messageService;
- private final TaskService taskService;
- private Queue<QueueData> patentImageQueue = new LinkedList<>();
- private Queue<QueueData> patentZhuluQueue = new LinkedList<>();
- private Queue<QueueData> patentRightQueue = new LinkedList<>();
- private Queue<QueueData> patentInstructionTextQueue = new LinkedList<>();
- private Queue<QueueData> patentAssoQueue = new LinkedList<>();
- private List<Integer> taskQueueList = new ArrayList<>();
- private Boolean flag = false;
- private Lock taskLock = new ReentrantLock();
- private Lock patentImageLock = new ReentrantLock();
- private Lock patentZhuluLock = new ReentrantLock();
- private Lock patentRightLock = new ReentrantLock();
- private Lock patentInstructionTextLock = new ReentrantLock();
- private Lock patentAssoLock = new ReentrantLock();
- Condition taskCondition = taskLock.newCondition();
- private Condition patentImageCondition = patentImageLock.newCondition();
- private Condition patentZhuluCondition = patentZhuluLock.newCondition();
- private Condition patentRightCondition = patentRightLock.newCondition();
- private Condition patentInstructionTextCondition = patentInstructionTextLock.newCondition();
- private Condition patentAssoCondition = patentAssoLock.newCondition();
- private HashMap<Integer, Integer> patentIdMap = new HashMap<>();
- private Task task = null;
- private Integer patentFinishNum = 0;
- /**
- * 生产者:从任务队列取出任务解析成专利实体类,分配给三个消费者队列
- */
- public void addPatnetToQueue() {
- try {
- while (true) {
- //检查任务队列
- if (taskQueueList.size() == 0) {
- taskLock.lock();
- taskCondition.await();
- }
- //1.从taskQueueList中取出第一个task,将其状态改为进行中,并将其从任务队列taskQueueList中删除
- task = taskService.getById(taskQueueList.get(0));
- task.setStatus(1);
- taskService.updateById(task);
- taskQueueList.remove(0);
- //从任务中取出文件路径、总条数、成功条数、前台参数json
- String filePath = task.getUrl(); //相对路径
- filePath = fileUtils.getPath(filePath); //绝对路径
- Integer total = task.getTotal();
- patentIdMap.put(0, total);
- int lastIndex = task.getSuccessNum();
- String json = task.getPramJson();
- ProjectImportPatentVO projectImportPatentVO = JsonUtils.jsonToPojo(json, ProjectImportPatentVO.class);
- //解析数据源类,通过数据来源id(如1:智慧芽)解析数据源配置文件,获得数据源配置文件对象jsonData
- List<UploadSettingVO.Column> jsonData = excuteUploadSettingService.ExcuteUploadSetting(projectImportPatentVO.getSourceId());
- //解析Excel文件获得工作簿
- Sheet sheet = ReadExcelUtils.readExcel(filePath);
- //遍历专利总数量,在循环中保存专利
- for (int i = lastIndex; i < total; i++) {
- PatentData patentData = ReadExcelUtils.readExcelOneRow(filePath, sheet, i + 1);
- //调用装载数据类,专利数据转换为VO对象
- UploadParamsVO uploadParamsVO = excuteDataToVOService.fileToPatentVO(patentData, jsonData);
- //保存专利基础数据(专利表"os_patent")
- uploadPatentBatchService.getOneOrInsertOne(uploadParamsVO);
- //专利分别加入5个消费者队列
- QueueData queueData = new QueueData()
- .setUploadParamsVO(uploadParamsVO)
- .setProjectImportPatentVO(projectImportPatentVO);
- patentImageQueue.add(queueData);
- patentZhuluQueue.add(queueData);
- patentRightQueue.add(queueData);
- patentInstructionTextQueue.add(queueData);
- patentAssoQueue.add(queueData);
- //通知消费者线程(5个消费者:摘要附图、著录项目、权利要求文本、说明书文本、与专利关联数据)
- //消费者1摘要附图
- patentImageLock.lock();
- patentImageCondition.signalAll();
- patentImageLock.unlock();
- //消费者2著录项目
- patentZhuluLock.lock();
- patentZhuluCondition.signalAll();
- patentZhuluLock.unlock();
- //消费者3权利要求文本
- patentRightLock.lock();
- patentRightCondition.signalAll();
- patentRightLock.unlock();
- //消费者4说明书文本
- patentInstructionTextLock.lock();
- patentInstructionTextCondition.signalAll();
- patentInstructionTextLock.unlock();
- //消费者5与专利关联数据
- patentAssoLock.lock();
- patentAssoCondition.signalAll();
- patentAssoLock.unlock();
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- //任务表更新状态为失败
- task.setStatus(3);
- task.setEndTime(DateUtils.getDateTime());
- taskService.updateById(task);
- }
- flag = true;
- }
- /**
- * 消费者1:将专利从队列取出,摘要附图入库
- */
- public void pushPatentImageToDB() throws InterruptedException, IOException {
- while (true) {
- try {
- if (patentImageQueue.isEmpty()) {
- if (flag) {
- System.out.println("摘要附图全部完成,退出循环");
- return;
- } else {
- patentImageLock.lock();
- patentImageCondition.await();
- patentImageLock.unlock();
- }
- } else {
- QueueData queueData = patentImageQueue.remove();
- //摘要附图入库
- uploadPatentToDBService.uploadPatentImage(queueData.getUploadParamsVO());
- //Websocket发送message:通过WebSocket 在每一次循环结束后 向前端发送完成进度
- sendMessage(queueData, patentIdMap.get(0), task);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- /**
- * 消费者2:将专利从队列取出,著录项目入库
- */
- public void pushPatentZhuLuToDB() throws InterruptedException, IOException {
- while (true) {
- try {
- if (patentZhuluQueue.isEmpty()) {
- if (flag) {
- System.out.println("著录项目全部完成,退出循环");
- return;
- } else {
- patentZhuluLock.lock();
- patentZhuluCondition.await();
- patentZhuluLock.unlock();
- }
- } else {
- QueueData queueData = patentZhuluQueue.remove();
- //著录项目入库
- uploadPatentToDBService.uploadPatentZhulu(queueData.getUploadParamsVO());
- //Websocket发送message:通过WebSocket 在每一次循环结束后 向前端发送完成进度
- sendMessage(queueData, patentIdMap.get(0), task);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- /**
- * 消费者3:将专利从队列取出,权利要求文本入库
- */
- public void pushPatentRightToDB() throws InterruptedException, IOException {
- while (true) {
- try {
- if (patentRightQueue.isEmpty()) {
- if (flag) {
- System.out.println("权利要求全部完成,退出循环");
- return;
- } else {
- patentRightLock.lock();
- patentRightCondition.await();
- patentRightLock.unlock();
- }
- } else {
- QueueData queueData = patentRightQueue.remove();
- //权要文本入库
- uploadPatentToDBService.uploadPatentRight(queueData.getUploadParamsVO());
- //Websocket发送message:通过WebSocket 在每一次循环结束后 向前端发送完成进度
- sendMessage(queueData, patentIdMap.get(0), task);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- /**
- * 消费者4:将专利从队列取出,说明书文本入库
- */
- public void pushPatentInstructionTextToDB() throws InterruptedException, IOException {
- while (true) {
- try {
- if (patentInstructionTextQueue.isEmpty()) {
- if (flag) {
- System.out.println("说明书文本全部完成,退出循环");
- return;
- } else {
- patentInstructionTextLock.lock();
- patentInstructionTextCondition.await();
- patentInstructionTextLock.unlock();
- }
- } else {
- QueueData queueData = patentInstructionTextQueue.remove();
- //说明书文本入库
- uploadPatentToDBService.uploadPatentInstructionText(queueData.getUploadParamsVO());
- //Websocket发送message:通过WebSocket 在每一次循环结束后 向前端发送完成进度
- sendMessage(queueData, patentIdMap.get(0), task);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- /**
- * 消费者5:将专利从队列取出,与专利关联数据入库
- */
- public void pushPatentAssoToDB() throws InterruptedException, IOException {
- while (true) {
- try {
- if (patentAssoQueue.isEmpty()) {
- if (flag) {
- System.out.println("与专利关联的数据全部完成,退出循环");
- return;
- } else {
- patentAssoLock.lock();
- patentAssoCondition.await();
- patentAssoLock.unlock();
- }
- } else {
- QueueData queueData = patentAssoQueue.remove();
- //专题库与专利关联入库
- uploadPatentToDBService.uploadAssoThemaPat(queueData.getUploadParamsVO(), queueData.getProjectImportPatentVO());
- //自定义字段标引与专利关联入库
- uploadPatentToDBService.uploadAssoFieldPat(queueData.getUploadParamsVO(), queueData.getProjectImportPatentVO());
- //文件夹与专利关联入库
- uploadPatentToDBService.uploadAssoPorPat(queueData.getUploadParamsVO(), queueData.getProjectImportPatentVO());
- //Websocket发送message:通过WebSocket 在每一次循环结束后 向前端发送完成进度
- sendMessage(queueData, patentIdMap.get(0), task);
- }
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- public void sendMessage(QueueData queueData, Integer total, Task task) {
- //每完成一个专利,通过计算,发送进度
- synchronized ("此为同步锁") {
- Integer currentPatentId = queueData.getUploadParamsVO().getPatent().getId();
- Integer num = patentIdMap.get(currentPatentId);
- if (num == null) {
- patentIdMap.put(currentPatentId, 1);
- } else if (num < 4) {
- patentIdMap.put(currentPatentId, ++num);
- } else {
- //num达到3了就从patentIdMap中删除
- patentIdMap.remove(currentPatentId);
- patentFinishNum++;
- long percentage = Math.round((patentFinishNum + 1D) / total * 100D);
- //当全部完成时
- if (patentFinishNum.equals(total)) {
- percentage = 100L;
- //任务表更新状态为成功
- task.setStatus(2);
- task.setEndTime(DateUtils.getDateTime());
- taskService.updateById(task);
- System.out.println("结束时间:" + new Date());
- }
- messageService.sendWebsocketMessage(task, total, patentFinishNum, percentage);
- //任务表更新数据
- task.setSuccessNum(patentFinishNum);
- task.setDefaultNum(total - task.getSuccessNum());
- taskService.updateById(task);
- }
- }
- }
- public void queueAddTask(List<Integer> taskQueueList) {
- this.taskQueueList.addAll(taskQueueList);
- }
- public void awakeTasktch() {
- taskLock.lock();
- taskCondition.signalAll();
- taskLock.unlock();
- }
- }
|