|
@@ -8,6 +8,7 @@ import cn.cslg.pas.domain.Task;
|
|
|
import cn.cslg.pas.service.TaskService;
|
|
|
import cn.cslg.pas.service.UploadPatentBatchService;
|
|
|
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
|
|
+import lombok.Data;
|
|
|
import lombok.RequiredArgsConstructor;
|
|
|
import org.springframework.context.annotation.Lazy;
|
|
|
import org.springframework.stereotype.Service;
|
|
@@ -33,23 +34,20 @@ public class PantentQueueService {
|
|
|
private final ExcutePatentDataEpo excutePatentDataEpo;
|
|
|
private final List<Integer> taskQueueList = new ArrayList<>();
|
|
|
private Boolean flag = false;
|
|
|
- private final Queue<QueueData> patentImageQueue = new LinkedList<>();
|
|
|
- private final Queue<QueueData> patentZhuluQueue = new LinkedList<>();
|
|
|
- private final Queue<QueueData> patentRightQueue = new LinkedList<>();
|
|
|
- private final Queue<QueueData> patentInstructionTextQueue = new LinkedList<>();
|
|
|
- private final Queue<QueueData> patentAssoQueue = new LinkedList<>();
|
|
|
+ private final List<QueueData> patentImageQueueList = new ArrayList<>();
|
|
|
+ private final List<QueueData> patentZhuluQueueList = new ArrayList<>();
|
|
|
+ private final List<QueueData> patentRightQueueList = new ArrayList<>();
|
|
|
+ private final List<QueueData> patentInstructionTextQueueList = new ArrayList<>();
|
|
|
private final Lock taskLock = new ReentrantLock();
|
|
|
private final Lock patentImageLock = new ReentrantLock();
|
|
|
private final Lock patentZhuluLock = new ReentrantLock();
|
|
|
private final Lock patentRightLock = new ReentrantLock();
|
|
|
private final Lock patentInstructionTextLock = new ReentrantLock();
|
|
|
- private final Lock patentAssoLock = new ReentrantLock();
|
|
|
private final Condition taskCondition = taskLock.newCondition();
|
|
|
private final Condition patentImageCondition = patentImageLock.newCondition();
|
|
|
private final Condition patentZhuluCondition = patentZhuluLock.newCondition();
|
|
|
private final Condition patentRightCondition = patentRightLock.newCondition();
|
|
|
private final Condition patentInstructionTextCondition = patentInstructionTextLock.newCondition();
|
|
|
- private final Condition patentAssoCondition = patentAssoLock.newCondition();
|
|
|
private final HashMap<String, Integer> patentIdMap = new HashMap<>();
|
|
|
|
|
|
/**
|
|
@@ -68,14 +66,18 @@ public class PantentQueueService {
|
|
|
//线程被唤醒后 ↓
|
|
|
if (taskQueueList.size() > 0) {
|
|
|
//1.从任务队列中取出一个task任务
|
|
|
+ //2.同时将其从任务队列中剔除
|
|
|
+ //3.查询任务,判断任务不存在或状态为已暂停,则直接跳过进行下一个任务
|
|
|
task = taskService.getById(taskQueueList.get(0));
|
|
|
- //同时将其从任务队列中剔除
|
|
|
taskQueueList.remove(0);
|
|
|
+ if (task == null || task.getStatus() == 4) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
|
|
|
- //TODO 调用工厂方法,将任务扔进去,工厂方法根据任务类型返回对应的生产专利方法对象
|
|
|
+ //TODO 调用工厂方法并将任务扔进去,工厂方法会根据任务的类型创建并返回对应的生产专利方法的对象
|
|
|
//1.用工厂方法根据任务类型创建对应的获取专利数据的对象
|
|
|
IExcutePatentData excutePatentDataObject = createObject(task);
|
|
|
- //2.开始执行生产专利方法(解析任务生产专利并丢入消费者专利队列,唤醒消费者线程)
|
|
|
+ //2.对象调用执行生产专利方法(解析任务生产专利并丢入消费者专利队列,唤醒消费者线程)
|
|
|
if (excutePatentDataObject != null) {
|
|
|
excutePatentDataObject.startExcute(task);
|
|
|
}
|
|
@@ -100,7 +102,7 @@ public class PantentQueueService {
|
|
|
public void pushPatentImageToDB() throws InterruptedException, IOException {
|
|
|
while (true) {
|
|
|
try {
|
|
|
- if (patentImageQueue.isEmpty()) {
|
|
|
+ if (patentImageQueueList.isEmpty()) {
|
|
|
if (flag) {
|
|
|
System.out.println("摘要附图全部完成,退出循环");
|
|
|
return;
|
|
@@ -110,11 +112,11 @@ public class PantentQueueService {
|
|
|
patentImageLock.unlock();
|
|
|
}
|
|
|
} else {
|
|
|
- QueueData queueData = patentImageQueue.remove();
|
|
|
+ QueueData queueData = patentImageQueueList.remove(0);
|
|
|
//摘要附图入库
|
|
|
uploadPatentToDBService.uploadPatentImage(queueData.getUploadParamsVO());
|
|
|
//Websocket发送message:通过WebSocket 在每一次循环结束后 向前端发送完成进度
|
|
|
- sendMessage(queueData, queueData.getTask().getTotal(), queueData.getTask());
|
|
|
+ sendMessage(queueData);
|
|
|
}
|
|
|
|
|
|
} catch (Exception e) {
|
|
@@ -131,7 +133,7 @@ public class PantentQueueService {
|
|
|
public void pushPatentZhuLuToDB() throws InterruptedException, IOException {
|
|
|
while (true) {
|
|
|
try {
|
|
|
- if (patentZhuluQueue.isEmpty()) {
|
|
|
+ if (patentZhuluQueueList.isEmpty()) {
|
|
|
if (flag) {
|
|
|
System.out.println("著录项目全部完成,退出循环");
|
|
|
return;
|
|
@@ -141,11 +143,17 @@ public class PantentQueueService {
|
|
|
patentZhuluLock.unlock();
|
|
|
}
|
|
|
} else {
|
|
|
- QueueData queueData = patentZhuluQueue.remove();
|
|
|
+ QueueData queueData = patentZhuluQueueList.remove(0);
|
|
|
//著录项目入库
|
|
|
uploadPatentToDBService.uploadPatentZhulu(queueData.getUploadParamsVO());
|
|
|
+ //专题库与专利关联入库
|
|
|
+ uploadPatentToDBService.uploadAssoThemaPat(queueData.getUploadParamsVO(), queueData.getProjectImportPatentVO());
|
|
|
+ //自定义字段标引与专利关联入库
|
|
|
+ uploadPatentToDBService.uploadAssoFieldPat(queueData.getUploadParamsVO(), queueData.getProjectImportPatentVO());
|
|
|
+ //文件夹与专利关联入库
|
|
|
+ uploadPatentToDBService.uploadAssoPorPat(queueData.getUploadParamsVO(), queueData.getProjectImportPatentVO());
|
|
|
//Websocket发送message:通过WebSocket 在每一次循环结束后 向前端发送完成进度
|
|
|
- sendMessage(queueData, queueData.getTask().getTotal(), queueData.getTask());
|
|
|
+ sendMessage(queueData);
|
|
|
}
|
|
|
|
|
|
} catch (Exception e) {
|
|
@@ -161,7 +169,7 @@ public class PantentQueueService {
|
|
|
public void pushPatentRightToDB() throws InterruptedException, IOException {
|
|
|
while (true) {
|
|
|
try {
|
|
|
- if (patentRightQueue.isEmpty()) {
|
|
|
+ if (patentRightQueueList.isEmpty()) {
|
|
|
if (flag) {
|
|
|
System.out.println("权利要求全部完成,退出循环");
|
|
|
return;
|
|
@@ -171,11 +179,11 @@ public class PantentQueueService {
|
|
|
patentRightLock.unlock();
|
|
|
}
|
|
|
} else {
|
|
|
- QueueData queueData = patentRightQueue.remove();
|
|
|
+ QueueData queueData = patentRightQueueList.remove(0);
|
|
|
//权要文本入库
|
|
|
uploadPatentToDBService.uploadPatentRight(queueData.getUploadParamsVO());
|
|
|
//Websocket发送message:通过WebSocket 在每一次循环结束后 向前端发送完成进度
|
|
|
- sendMessage(queueData, queueData.getTask().getTotal(), queueData.getTask());
|
|
|
+ sendMessage(queueData);
|
|
|
}
|
|
|
|
|
|
} catch (Exception e) {
|
|
@@ -191,7 +199,7 @@ public class PantentQueueService {
|
|
|
public void pushPatentInstructionTextToDB() throws InterruptedException, IOException {
|
|
|
while (true) {
|
|
|
try {
|
|
|
- if (patentInstructionTextQueue.isEmpty()) {
|
|
|
+ if (patentInstructionTextQueueList.isEmpty()) {
|
|
|
if (flag) {
|
|
|
System.out.println("说明书文本全部完成,退出循环");
|
|
|
return;
|
|
@@ -201,45 +209,11 @@ public class PantentQueueService {
|
|
|
patentInstructionTextLock.unlock();
|
|
|
}
|
|
|
} else {
|
|
|
- QueueData queueData = patentInstructionTextQueue.remove();
|
|
|
+ QueueData queueData = patentInstructionTextQueueList.remove(0);
|
|
|
//说明书文本入库
|
|
|
uploadPatentToDBService.uploadPatentInstructionText(queueData.getUploadParamsVO());
|
|
|
//Websocket发送message:通过WebSocket 在每一次循环结束后 向前端发送完成进度
|
|
|
- sendMessage(queueData, queueData.getTask().getTotal(), queueData.getTask());
|
|
|
- }
|
|
|
-
|
|
|
- } catch (Exception e) {
|
|
|
- e.printStackTrace();
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- /**
|
|
|
- * 消费者5:将专利从队列取出,与专利关联数据入库
|
|
|
- */
|
|
|
- public void pushPatentAssoToDB() throws InterruptedException, IOException {
|
|
|
- while (true) {
|
|
|
- try {
|
|
|
- if (patentAssoQueue.isEmpty()) {
|
|
|
- if (flag) {
|
|
|
- System.out.println("与专利关联的数据全部完成,退出循环");
|
|
|
- return;
|
|
|
- } else {
|
|
|
- patentAssoLock.lock();
|
|
|
- patentAssoCondition.await();
|
|
|
- patentAssoLock.unlock();
|
|
|
- }
|
|
|
- } else {
|
|
|
- QueueData queueData = patentAssoQueue.remove();
|
|
|
- //专题库与专利关联入库
|
|
|
- uploadPatentToDBService.uploadAssoThemaPat(queueData.getUploadParamsVO(), queueData.getProjectImportPatentVO());
|
|
|
- //自定义字段标引与专利关联入库
|
|
|
- uploadPatentToDBService.uploadAssoFieldPat(queueData.getUploadParamsVO(), queueData.getProjectImportPatentVO());
|
|
|
- //文件夹与专利关联入库
|
|
|
- uploadPatentToDBService.uploadAssoPorPat(queueData.getUploadParamsVO(), queueData.getProjectImportPatentVO());
|
|
|
- //Websocket发送message:通过WebSocket 在每一次循环结束后 向前端发送完成进度
|
|
|
- sendMessage(queueData, queueData.getTask().getTotal(), queueData.getTask());
|
|
|
+ sendMessage(queueData);
|
|
|
}
|
|
|
|
|
|
} catch (Exception e) {
|
|
@@ -249,54 +223,55 @@ public class PantentQueueService {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- public void sendMessage(QueueData queueData, Integer total, Task task) {
|
|
|
+ public void sendMessage(QueueData queueData) {
|
|
|
//每完成一个专利,通过计算,发送进度
|
|
|
synchronized ("此为同步锁") {
|
|
|
- //上一个任务的一个消费者结束,开始消费下一个任务的专利,即下一个任务的第一个消费者进来,将当前任务的状态改为4待进行
|
|
|
-// if (task.getStatus() == 0) {
|
|
|
-// task.setStatus(4);
|
|
|
-// taskService.updateById(task);
|
|
|
-// }
|
|
|
- Integer currentPatentId = queueData.getUploadParamsVO().getPatent().getId();
|
|
|
- Integer taskId = task.getId();
|
|
|
- String currentPatent = taskId + "|" + currentPatentId;
|
|
|
+ //当前进行的任务
|
|
|
+ Task task = queueData.getTask();
|
|
|
+ //当前进行的任务的专利总数量
|
|
|
+ Integer total = task.getTotal();
|
|
|
+
|
|
|
+ //将任务id + "|" + 专利id,拼接成专利的标识(作为当前某一个消费者消费完的这个专利的数量标识)
|
|
|
+ String currentPatent = task.getId() + "|" + queueData.getUploadParamsVO().getPatent().getId();
|
|
|
Integer num = patentIdMap.get(currentPatent);
|
|
|
if (num == null) {
|
|
|
patentIdMap.put(currentPatent, 1);
|
|
|
- } else if (num < 4) {
|
|
|
+ } else if (num < 3) {
|
|
|
patentIdMap.put(currentPatent, ++num);
|
|
|
} else {
|
|
|
- //下一个任务的第5个消费者进来(即表示上一个任务的5个消费者都结束了),就将这个任务的状态正式改为1进行中
|
|
|
+ /*
|
|
|
+ 1)若是看当前任务,表示当前这个任务这个专利的4个消费者全都消费结束,即当前这个任务的这个专利已完成
|
|
|
+ 2)若是看下一个任务,表示下一个任务的著录项目消费者终于第一次进来了(即表示上一个任务最慢的著录项目消费者也结束了即上一个任务完成了),就将这下一个任务的状态改为 1进行中
|
|
|
+ */
|
|
|
if (task.getStatus() == 0) {
|
|
|
task.setStatus(1);
|
|
|
task.setStartTime(DateUtils.getDateTime());
|
|
|
taskService.updateById(task);
|
|
|
}
|
|
|
- //num达到4了就从patentIdMap中删除
|
|
|
+ //num达到4了就在patentIdMap中将其删除
|
|
|
patentIdMap.remove(currentPatent);
|
|
|
long percentage = Math.round((task.getSuccessNum() + 1D) / total * 100D);
|
|
|
- //当全部完成时
|
|
|
- if (task.getSuccessNum().equals(total)) {
|
|
|
- percentage = 100L;
|
|
|
- }
|
|
|
- messageService.sendWebsocketMessage(task, total, task.getSuccessNum(), percentage);
|
|
|
- //任务表更新数据
|
|
|
+ //任务表更新数据(这里只更新成功条数和失败条数,注意不能更新状态等其他信息)
|
|
|
task.setSuccessNum(task.getSuccessNum() + 1);
|
|
|
task.setDefaultNum(total - task.getSuccessNum());
|
|
|
- taskService.updateById(task);
|
|
|
+ Task updateTask = new Task();
|
|
|
+ updateTask.setId(task.getId());
|
|
|
+ updateTask.setSuccessNum(task.getSuccessNum());
|
|
|
+ updateTask.setDefaultNum(total - task.getSuccessNum());
|
|
|
+ taskService.updateById(updateTask);
|
|
|
+
|
|
|
//当全部完成时
|
|
|
if (task.getSuccessNum().equals(total)) {
|
|
|
+ percentage = 100L;
|
|
|
//设置任务状态为成功
|
|
|
task.setStatus(2);
|
|
|
//设置任务结束时间为当前时间
|
|
|
task.setEndTime(DateUtils.getDateTime());
|
|
|
taskService.updateById(task);
|
|
|
-// //判断若没有进行中的任务则唤醒生产者线程
|
|
|
-// List<Task> tasks = taskService.list(new LambdaQueryWrapper<Task>().eq(Task::getStatus, 1));
|
|
|
-// if (tasks == null || tasks.size() == 0) {
|
|
|
-// awakeTasktch();
|
|
|
-// }
|
|
|
}
|
|
|
+
|
|
|
+ messageService.sendWebsocketMessage(task, total, task.getSuccessNum(), percentage);
|
|
|
+
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -305,44 +280,62 @@ public class PantentQueueService {
|
|
|
//保存专利基础数据(专利表"os_patent")
|
|
|
uploadPatentBatchService.getOneOrInsertOne(uploadParamsVO);
|
|
|
|
|
|
- //专利分别加入5个消费者队列
|
|
|
+
|
|
|
QueueData queueData = new QueueData()
|
|
|
.setTask(task)
|
|
|
.setUploadParamsVO(uploadParamsVO)
|
|
|
.setProjectImportPatentVO(projectImportPatentVO);
|
|
|
- patentImageQueue.add(queueData);
|
|
|
- patentZhuluQueue.add(queueData);
|
|
|
- patentRightQueue.add(queueData);
|
|
|
- patentInstructionTextQueue.add(queueData);
|
|
|
- patentAssoQueue.add(queueData);
|
|
|
+
|
|
|
+ //专利分别加入5个消费者队列
|
|
|
+ patentImageQueueList.add(queueData);
|
|
|
+ patentZhuluQueueList.add(queueData);
|
|
|
+ patentRightQueueList.add(queueData);
|
|
|
+ patentInstructionTextQueueList.add(queueData);
|
|
|
|
|
|
//通知消费者线程(5个消费者:摘要附图、著录项目、权利要求文本、说明书文本、与专利关联数据)
|
|
|
- //消费者1摘要附图
|
|
|
+ //1.摘要附图
|
|
|
patentImageLock.lock();
|
|
|
patentImageCondition.signalAll();
|
|
|
patentImageLock.unlock();
|
|
|
- //消费者2著录项目
|
|
|
+ //2.著录项目
|
|
|
patentZhuluLock.lock();
|
|
|
patentZhuluCondition.signalAll();
|
|
|
patentZhuluLock.unlock();
|
|
|
- //消费者3权利要求文本
|
|
|
+ //3.权利要求文本
|
|
|
patentRightLock.lock();
|
|
|
patentRightCondition.signalAll();
|
|
|
patentRightLock.unlock();
|
|
|
- //消费者4说明书文本
|
|
|
+ //4.说明书文本
|
|
|
patentInstructionTextLock.lock();
|
|
|
patentInstructionTextCondition.signalAll();
|
|
|
patentInstructionTextLock.unlock();
|
|
|
- //消费者5与专利关联数据
|
|
|
- patentAssoLock.lock();
|
|
|
- patentAssoCondition.signalAll();
|
|
|
- patentAssoLock.unlock();
|
|
|
}
|
|
|
|
|
|
- public void queueAddTask(List<Integer> taskQueueList) {
|
|
|
+ /**
|
|
|
+ * 生产者任务队列新增任务ids
|
|
|
+ *
|
|
|
+ * @param taskQueueList 生产者任务队列
|
|
|
+ */
|
|
|
+ public void taskQueueAddTask(List<Integer> taskQueueList) {
|
|
|
this.taskQueueList.addAll(taskQueueList);
|
|
|
}
|
|
|
|
|
|
+ /**
|
|
|
+ * 消费者专利队列剔除指定任务的所有专利
|
|
|
+ *
|
|
|
+ * @param taskId 任务id
|
|
|
+ */
|
|
|
+ public void consumerQueueDeleteTasks(Integer taskId) {
|
|
|
+ this.patentImageQueueList.removeIf(queueData -> queueData.getTask().getId().equals(taskId));
|
|
|
+ this.patentZhuluQueueList.removeIf(queueData -> queueData.getTask().getId().equals(taskId));
|
|
|
+ this.patentRightQueueList.removeIf(queueData -> queueData.getTask().getId().equals(taskId));
|
|
|
+ this.patentInstructionTextQueueList.removeIf(queueData -> queueData.getTask().getId().equals(taskId));
|
|
|
+ this.patentIdMap.clear();
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 唤醒生产者线程执
|
|
|
+ */
|
|
|
public void awakeTasktch() {
|
|
|
taskLock.lock();
|
|
|
taskCondition.signalAll();
|
|
@@ -369,4 +362,5 @@ public class PantentQueueService {
|
|
|
|
|
|
}
|
|
|
|
|
|
+
|
|
|
}
|