package cn.cslg.pas.service.upLoadPatent; import cn.cslg.pas.common.model.vo.ProjectImportPatentVO; import cn.cslg.pas.common.model.vo.UploadParamsVO; import cn.cslg.pas.common.model.vo.UploadSettingVO; import cn.cslg.pas.common.utils.DateUtils; import cn.cslg.pas.common.utils.FileUtils; import cn.cslg.pas.common.utils.JsonUtils; import cn.cslg.pas.common.utils.ReadExcelUtils; import cn.cslg.pas.domain.PatentData; import cn.cslg.pas.domain.QueueData; import cn.cslg.pas.domain.Task; import cn.cslg.pas.service.TaskService; import cn.cslg.pas.service.UploadPatentBatchService; import lombok.RequiredArgsConstructor; import org.apache.poi.ss.usermodel.Sheet; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import java.io.IOException; import java.util.*; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * 将专利信息存入队列或从队列取出 */ @Service @RequiredArgsConstructor(onConstructor_ = {@Lazy}) public class PantentQueueService { private final ExcuteDataToVOService excuteDataToVOService; private final ExcuteUploadSettingService excuteUploadSettingService; private final UploadPatentToDBService uploadPatentToDBService; private final UploadPatentBatchService uploadPatentBatchService; private final FileUtils fileUtils; private final MessageService messageService; private final TaskService taskService; private Queue patentImageQueue = new LinkedList<>(); private Queue patentZhuluQueue = new LinkedList<>(); private Queue patentRightQueue = new LinkedList<>(); private Queue patentInstructionTextQueue = new LinkedList<>(); private Queue patentAssoQueue = new LinkedList<>(); private List taskQueueList = new ArrayList<>(); private Boolean flag = false; private Lock taskLock = new ReentrantLock(); private Lock patentImageLock = new ReentrantLock(); private Lock patentZhuluLock = new ReentrantLock(); private Lock patentRightLock = new ReentrantLock(); private Lock patentInstructionTextLock = new ReentrantLock(); private Lock patentAssoLock = new ReentrantLock(); Condition taskCondition = taskLock.newCondition(); private Condition patentImageCondition = patentImageLock.newCondition(); private Condition patentZhuluCondition = patentZhuluLock.newCondition(); private Condition patentRightCondition = patentRightLock.newCondition(); private Condition patentInstructionTextCondition = patentInstructionTextLock.newCondition(); private Condition patentAssoCondition = patentAssoLock.newCondition(); private HashMap patentIdMap = new HashMap<>(); private Task task = null; private Integer patentFinishNum = 0; /** * 生产者:从任务队列取出任务解析成专利实体类,分配给三个消费者队列 */ public void addPatnetToQueue() { try { while (true) { //检查任务队列 if (taskQueueList.size() == 0) { taskLock.lock(); taskCondition.await(); } //1.从taskQueueList中取出第一个task,将其状态改为进行中,并将其从任务队列taskQueueList中删除 task = taskService.getById(taskQueueList.get(0)); task.setStatus(1); taskService.updateById(task); taskQueueList.remove(0); //从任务中取出文件路径、总条数、成功条数、前台参数json String filePath = task.getUrl(); //相对路径 filePath = fileUtils.getPath(filePath); //绝对路径 Integer total = task.getTotal(); patentIdMap.put(0, total); int lastIndex = task.getSuccessNum(); String json = task.getPramJson(); ProjectImportPatentVO projectImportPatentVO = JsonUtils.jsonToPojo(json, ProjectImportPatentVO.class); //解析数据源类,通过数据来源id(如1:智慧芽)解析数据源配置文件,获得数据源配置文件对象jsonData List jsonData = excuteUploadSettingService.ExcuteUploadSetting(projectImportPatentVO.getSourceId()); //解析Excel文件获得工作簿 Sheet sheet = ReadExcelUtils.readExcel(filePath); //遍历专利总数量,在循环中保存专利 for (int i = lastIndex; i < total; i++) { PatentData patentData = ReadExcelUtils.readExcelOneRow(filePath, sheet, i + 1); //调用装载数据类,专利数据转换为VO对象 UploadParamsVO uploadParamsVO = excuteDataToVOService.fileToPatentVO(patentData, jsonData); //保存专利基础数据(专利表"os_patent") uploadPatentBatchService.getOneOrInsertOne(uploadParamsVO); //专利分别加入5个消费者队列 QueueData queueData = new QueueData() .setUploadParamsVO(uploadParamsVO) .setProjectImportPatentVO(projectImportPatentVO); patentImageQueue.add(queueData); patentZhuluQueue.add(queueData); patentRightQueue.add(queueData); patentInstructionTextQueue.add(queueData); patentAssoQueue.add(queueData); //通知消费者线程(5个消费者:摘要附图、著录项目、权利要求文本、说明书文本、与专利关联数据) //消费者1摘要附图 patentImageLock.lock(); patentImageCondition.signalAll(); patentImageLock.unlock(); //消费者2著录项目 patentZhuluLock.lock(); patentZhuluCondition.signalAll(); patentZhuluLock.unlock(); //消费者3权利要求文本 patentRightLock.lock(); patentRightCondition.signalAll(); patentRightLock.unlock(); //消费者4说明书文本 patentInstructionTextLock.lock(); patentInstructionTextCondition.signalAll(); patentInstructionTextLock.unlock(); //消费者5与专利关联数据 patentAssoLock.lock(); patentAssoCondition.signalAll(); patentAssoLock.unlock(); } } } catch (Exception e) { e.printStackTrace(); //任务表更新状态为失败 task.setStatus(3); task.setEndTime(DateUtils.getDateTime()); taskService.updateById(task); } flag = true; } /** * 消费者1:将专利从队列取出,摘要附图入库 */ public void pushPatentImageToDB() throws InterruptedException, IOException { while (true) { try { if (patentImageQueue.isEmpty()) { if (flag) { System.out.println("摘要附图全部完成,退出循环"); return; } else { patentImageLock.lock(); patentImageCondition.await(); patentImageLock.unlock(); } } else { QueueData queueData = patentImageQueue.remove(); //摘要附图入库 uploadPatentToDBService.uploadPatentImage(queueData.getUploadParamsVO()); //Websocket发送message:通过WebSocket 在每一次循环结束后 向前端发送完成进度 sendMessage(queueData, patentIdMap.get(0), task); } } catch (Exception e) { e.printStackTrace(); } } } /** * 消费者2:将专利从队列取出,著录项目入库 */ public void pushPatentZhuLuToDB() throws InterruptedException, IOException { while (true) { try { if (patentZhuluQueue.isEmpty()) { if (flag) { System.out.println("著录项目全部完成,退出循环"); return; } else { patentZhuluLock.lock(); patentZhuluCondition.await(); patentZhuluLock.unlock(); } } else { QueueData queueData = patentZhuluQueue.remove(); //著录项目入库 uploadPatentToDBService.uploadPatentZhulu(queueData.getUploadParamsVO()); //Websocket发送message:通过WebSocket 在每一次循环结束后 向前端发送完成进度 sendMessage(queueData, patentIdMap.get(0), task); } } catch (Exception e) { e.printStackTrace(); } } } /** * 消费者3:将专利从队列取出,权利要求文本入库 */ public void pushPatentRightToDB() throws InterruptedException, IOException { while (true) { try { if (patentRightQueue.isEmpty()) { if (flag) { System.out.println("权利要求全部完成,退出循环"); return; } else { patentRightLock.lock(); patentRightCondition.await(); patentRightLock.unlock(); } } else { QueueData queueData = patentRightQueue.remove(); //权要文本入库 uploadPatentToDBService.uploadPatentRight(queueData.getUploadParamsVO()); //Websocket发送message:通过WebSocket 在每一次循环结束后 向前端发送完成进度 sendMessage(queueData, patentIdMap.get(0), task); } } catch (Exception e) { e.printStackTrace(); } } } /** * 消费者4:将专利从队列取出,说明书文本入库 */ public void pushPatentInstructionTextToDB() throws InterruptedException, IOException { while (true) { try { if (patentInstructionTextQueue.isEmpty()) { if (flag) { System.out.println("说明书文本全部完成,退出循环"); return; } else { patentInstructionTextLock.lock(); patentInstructionTextCondition.await(); patentInstructionTextLock.unlock(); } } else { QueueData queueData = patentInstructionTextQueue.remove(); //说明书文本入库 uploadPatentToDBService.uploadPatentInstructionText(queueData.getUploadParamsVO()); //Websocket发送message:通过WebSocket 在每一次循环结束后 向前端发送完成进度 sendMessage(queueData, patentIdMap.get(0), task); } } catch (Exception e) { e.printStackTrace(); } } } /** * 消费者5:将专利从队列取出,与专利关联数据入库 */ public void pushPatentAssoToDB() throws InterruptedException, IOException { while (true) { try { if (patentAssoQueue.isEmpty()) { if (flag) { System.out.println("与专利关联的数据全部完成,退出循环"); return; } else { patentAssoLock.lock(); patentAssoCondition.await(); patentAssoLock.unlock(); } } else { QueueData queueData = patentAssoQueue.remove(); //专题库与专利关联入库 uploadPatentToDBService.uploadAssoThemaPat(queueData.getUploadParamsVO(), queueData.getProjectImportPatentVO()); //自定义字段标引与专利关联入库 uploadPatentToDBService.uploadAssoFieldPat(queueData.getUploadParamsVO(), queueData.getProjectImportPatentVO()); //文件夹与专利关联入库 uploadPatentToDBService.uploadAssoPorPat(queueData.getUploadParamsVO(), queueData.getProjectImportPatentVO()); //Websocket发送message:通过WebSocket 在每一次循环结束后 向前端发送完成进度 sendMessage(queueData, patentIdMap.get(0), task); } } catch (Exception e) { e.printStackTrace(); } } } public void sendMessage(QueueData queueData, Integer total, Task task) { //每完成一个专利,通过计算,发送进度 synchronized ("此为同步锁") { Integer currentPatentId = queueData.getUploadParamsVO().getPatent().getId(); Integer num = patentIdMap.get(currentPatentId); if (num == null) { patentIdMap.put(currentPatentId, 1); } else if (num < 4) { patentIdMap.put(currentPatentId, ++num); } else { //num达到3了就从patentIdMap中删除 patentIdMap.remove(currentPatentId); patentFinishNum++; long percentage = Math.round((patentFinishNum + 1D) / total * 100D); //当全部完成时 if (patentFinishNum.equals(total)) { percentage = 100L; //任务表更新状态为成功 task.setStatus(2); task.setEndTime(DateUtils.getDateTime()); taskService.updateById(task); System.out.println("结束时间:" + new Date()); } messageService.sendWebsocketMessage(task, total, patentFinishNum, percentage); //任务表更新数据 task.setSuccessNum(patentFinishNum); task.setDefaultNum(total - task.getSuccessNum()); taskService.updateById(task); } } } public void queueAddTask(List taskQueueList) { this.taskQueueList.addAll(taskQueueList); } public void awakeTasktch() { taskLock.lock(); taskCondition.signalAll(); taskLock.unlock(); } }