package cn.cslg.pas.service.upLoadPatent; import cn.cslg.pas.common.model.vo.ProjectImportPatentVO; import cn.cslg.pas.common.model.vo.UploadParamsVO; import cn.cslg.pas.common.utils.DateUtils; import cn.cslg.pas.domain.QueueData; import cn.cslg.pas.domain.Task; import cn.cslg.pas.service.TaskService; import cn.cslg.pas.service.UploadPatentBatchService; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import lombok.Data; import lombok.RequiredArgsConstructor; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; import java.io.IOException; import java.util.*; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; /** * 1.生产专利(生产者从任务队列取出任务并解析任务生成专利数据,分配给消费者,唤醒消费者) * 2.消费专利(消费者从专利队列中取出专利并保存专利入库) */ @Service @RequiredArgsConstructor(onConstructor_ = {@Lazy}) public class PantentQueueService { private final UploadPatentToDBService uploadPatentToDBService; private final UploadPatentBatchService uploadPatentBatchService; private final MessageService messageService; private final TaskService taskService; private final ExcutePatentDataExcel excutePatentDataExcel; private final ExcutePatentDataEpo excutePatentDataEpo; private final List taskQueueList = new ArrayList<>(); private Boolean flag = false; private final List patentImageQueueList = new ArrayList<>(); private final List patentZhuluQueueList = new ArrayList<>(); private final List patentRightQueueList = new ArrayList<>(); private final List patentInstructionTextQueueList = new ArrayList<>(); private final Lock taskLock = new ReentrantLock(); private final Lock patentImageLock = new ReentrantLock(); private final Lock patentZhuluLock = new ReentrantLock(); private final Lock patentRightLock = new ReentrantLock(); private final Lock patentInstructionTextLock = new ReentrantLock(); private final Condition taskCondition = taskLock.newCondition(); private final Condition patentImageCondition = patentImageLock.newCondition(); private final Condition patentZhuluCondition = patentZhuluLock.newCondition(); private final Condition patentRightCondition = patentRightLock.newCondition(); private final Condition patentInstructionTextCondition = patentInstructionTextLock.newCondition(); private final HashMap patentIdMap = new HashMap<>(); /** * 生产者:从任务队列取出任务,再调用工厂方法根据任务类型返回对应的生产专利方法的对象将专利分配给消费者 */ public void addPatnetToQueue() { Task task = null; try { while (true) { //判断任务队列是否有任务,若没有则该生产者线程睡眠 if (taskQueueList.size() == 0) { taskLock.lock(); taskCondition.await(); } //线程被唤醒后 ↓ if (taskQueueList.size() > 0) { //1.从任务队列中取出一个task任务 //2.同时将其从任务队列中剔除 //3.查询任务,判断任务不存在或状态为已暂停,则直接跳过进行下一个任务 task = taskService.getById(taskQueueList.get(0)); taskQueueList.remove(0); if (task == null || task.getStatus() == 4) { continue; } //TODO 调用工厂方法并将任务扔进去,工厂方法会根据任务的类型创建并返回对应的生产专利方法的对象 //1.用工厂方法根据任务类型创建对应的获取专利数据的对象 IExcutePatentData excutePatentDataObject = createObject(task); //2.对象调用执行生产专利方法(解析任务生产专利并丢入消费者专利队列,唤醒消费者线程) if (excutePatentDataObject != null) { excutePatentDataObject.startExcute(task); } } } } catch (Exception e) { e.printStackTrace(); //任务表更新状态为失败 task.setStatus(3); task.setEndTime(DateUtils.getDateTime()); taskService.updateById(task); } flag = true; } /** * 消费者1:将专利从队列取出,摘要附图入库 */ public void pushPatentImageToDB() throws InterruptedException, IOException { while (true) { try { if (patentImageQueueList.isEmpty()) { if (flag) { System.out.println("摘要附图全部完成,退出循环"); return; } else { patentImageLock.lock(); patentImageCondition.await(); patentImageLock.unlock(); } } else { QueueData queueData = patentImageQueueList.remove(0); //摘要附图入库 uploadPatentToDBService.uploadPatentImage(queueData.getUploadParamsVO()); //Websocket发送message:通过WebSocket 在每一次循环结束后 向前端发送完成进度 sendMessage(queueData); } } catch (Exception e) { e.printStackTrace(); } } } /** * 消费者2:将专利从队列取出,著录项目入库 */ public void pushPatentZhuLuToDB() throws InterruptedException, IOException { while (true) { try { if (patentZhuluQueueList.isEmpty()) { if (flag) { System.out.println("著录项目全部完成,退出循环"); return; } else { patentZhuluLock.lock(); patentZhuluCondition.await(); patentZhuluLock.unlock(); } } else { QueueData queueData = patentZhuluQueueList.remove(0); //著录项目入库 uploadPatentToDBService.uploadPatentZhulu(queueData.getUploadParamsVO()); //专题库与专利关联入库 uploadPatentToDBService.uploadAssoThemaPat(queueData.getUploadParamsVO(), queueData.getProjectImportPatentVO()); //自定义字段标引与专利关联入库 uploadPatentToDBService.uploadAssoFieldPat(queueData.getUploadParamsVO(), queueData.getProjectImportPatentVO()); //文件夹与专利关联入库 uploadPatentToDBService.uploadAssoPorPat(queueData.getUploadParamsVO(), queueData.getProjectImportPatentVO()); //Websocket发送message:通过WebSocket 在每一次循环结束后 向前端发送完成进度 sendMessage(queueData); } } catch (Exception e) { e.printStackTrace(); } } } /** * 消费者3:将专利从队列取出,权利要求文本入库 */ public void pushPatentRightToDB() throws InterruptedException, IOException { while (true) { try { if (patentRightQueueList.isEmpty()) { if (flag) { System.out.println("权利要求全部完成,退出循环"); return; } else { patentRightLock.lock(); patentRightCondition.await(); patentRightLock.unlock(); } } else { QueueData queueData = patentRightQueueList.remove(0); //权要文本入库 uploadPatentToDBService.uploadPatentRight(queueData.getUploadParamsVO()); //Websocket发送message:通过WebSocket 在每一次循环结束后 向前端发送完成进度 sendMessage(queueData); } } catch (Exception e) { e.printStackTrace(); } } } /** * 消费者4:将专利从队列取出,说明书文本入库 */ public void pushPatentInstructionTextToDB() throws InterruptedException, IOException { while (true) { try { if (patentInstructionTextQueueList.isEmpty()) { if (flag) { System.out.println("说明书文本全部完成,退出循环"); return; } else { patentInstructionTextLock.lock(); patentInstructionTextCondition.await(); patentInstructionTextLock.unlock(); } } else { QueueData queueData = patentInstructionTextQueueList.remove(0); //说明书文本入库 uploadPatentToDBService.uploadPatentInstructionText(queueData.getUploadParamsVO()); //Websocket发送message:通过WebSocket 在每一次循环结束后 向前端发送完成进度 sendMessage(queueData); } } catch (Exception e) { e.printStackTrace(); } } } public void sendMessage(QueueData queueData) { //每完成一个专利,通过计算,发送进度 synchronized ("此为同步锁") { //当前进行的任务 Task task = queueData.getTask(); //当前进行的任务的专利总数量 Integer total = task.getTotal(); //将任务id + "|" + 专利id,拼接成专利的标识(作为当前某一个消费者消费完的这个专利的数量标识) String currentPatent = task.getId() + "|" + queueData.getUploadParamsVO().getPatent().getId(); Integer num = patentIdMap.get(currentPatent); if (num == null) { patentIdMap.put(currentPatent, 1); } else if (num < 3) { patentIdMap.put(currentPatent, ++num); } else { /* 1)若是看当前任务,表示当前这个任务这个专利的4个消费者全都消费结束,即当前这个任务的这个专利已完成 2)若是看下一个任务,表示下一个任务的著录项目消费者终于第一次进来了(即表示上一个任务最慢的著录项目消费者也结束了即上一个任务完成了),就将这下一个任务的状态改为 1进行中 */ if (task.getStatus() == 0) { task.setStatus(1); task.setStartTime(DateUtils.getDateTime()); taskService.updateById(task); } //num达到4了就在patentIdMap中将其删除 patentIdMap.remove(currentPatent); long percentage = Math.round((task.getSuccessNum() + 1D) / total * 100D); //任务表更新数据(这里只更新成功条数和失败条数,注意不能更新状态等其他信息) task.setSuccessNum(task.getSuccessNum() + 1); task.setDefaultNum(total - task.getSuccessNum()); Task updateTask = new Task(); updateTask.setId(task.getId()); updateTask.setSuccessNum(task.getSuccessNum()); updateTask.setDefaultNum(total - task.getSuccessNum()); taskService.updateById(updateTask); //当全部完成时 if (task.getSuccessNum().equals(total)) { percentage = 100L; //设置任务状态为成功 task.setStatus(2); //设置任务结束时间为当前时间 task.setEndTime(DateUtils.getDateTime()); taskService.updateById(task); } messageService.sendWebsocketMessage(task, total, task.getSuccessNum(), percentage); } } } public void patentToQueue(Task task, UploadParamsVO uploadParamsVO, ProjectImportPatentVO projectImportPatentVO) { //保存专利基础数据(专利表"os_patent") uploadPatentBatchService.getOneOrInsertOne(uploadParamsVO); QueueData queueData = new QueueData() .setTask(task) .setUploadParamsVO(uploadParamsVO) .setProjectImportPatentVO(projectImportPatentVO); //专利分别加入5个消费者队列 patentImageQueueList.add(queueData); patentZhuluQueueList.add(queueData); patentRightQueueList.add(queueData); patentInstructionTextQueueList.add(queueData); //通知消费者线程(5个消费者:摘要附图、著录项目、权利要求文本、说明书文本、与专利关联数据) //1.摘要附图 patentImageLock.lock(); patentImageCondition.signalAll(); patentImageLock.unlock(); //2.著录项目 patentZhuluLock.lock(); patentZhuluCondition.signalAll(); patentZhuluLock.unlock(); //3.权利要求文本 patentRightLock.lock(); patentRightCondition.signalAll(); patentRightLock.unlock(); //4.说明书文本 patentInstructionTextLock.lock(); patentInstructionTextCondition.signalAll(); patentInstructionTextLock.unlock(); } /** * 生产者任务队列新增任务ids * * @param taskQueueList 生产者任务队列 */ public void taskQueueAddTask(List taskQueueList) { this.taskQueueList.addAll(taskQueueList); } /** * 消费者专利队列剔除指定任务的所有专利 * * @param taskId 任务id */ public void consumerQueueDeleteTasks(Integer taskId) { //任务暂停时清除4个消费者专利队列中该任务的专利 this.patentImageQueueList.removeIf(queueData -> queueData.getTask().getId().equals(taskId)); this.patentZhuluQueueList.removeIf(queueData -> queueData.getTask().getId().equals(taskId)); this.patentRightQueueList.removeIf(queueData -> queueData.getTask().getId().equals(taskId)); this.patentInstructionTextQueueList.removeIf(queueData -> queueData.getTask().getId().equals(taskId)); //任务暂停时清除 patentIdMap(任务id + "|" + 专利id,拼接而成的专利的数量标识(作为当前某一个消费者消费完的这个专利的数量标识))中该任务的专利的数量标识 Iterator iterator = this.patentIdMap.keySet().iterator(); while (iterator.hasNext()) { String key = iterator.next(); if (key.contains(taskId + "")) { iterator.remove(); this.patentIdMap.remove(key); } } } /** * 唤醒生产者线程执 */ public void awakeTasktch() { taskLock.lock(); taskCondition.signalAll(); taskLock.unlock(); } /** * 工厂方法,根据任务类型创建对应的获取专利数据的对象 * * @param task 任务 * @return 返回获取专利数据的对象 */ private IExcutePatentData createObject(Task task) { //根据任务的类型创建并返回对应的解析获取专利数据的对象 switch (task.getType()) { case 1: //Excel导入专利任务 return excutePatentDataExcel; case 3: //Epo欧专局网站导入 return excutePatentDataEpo; default: return null; } } }