|
|
@@ -31,13 +31,16 @@ import okhttp3.RequestBody;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.beans.factory.annotation.Value;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
+import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;
|
|
|
import reactor.core.publisher.Flux;
|
|
|
import reactor.core.publisher.FluxSink;
|
|
|
|
|
|
import java.io.File;
|
|
|
import java.io.IOException;
|
|
|
+import java.time.Duration;
|
|
|
import java.util.*;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
+import java.util.concurrent.atomic.AtomicBoolean;
|
|
|
import java.util.regex.Matcher;
|
|
|
import java.util.regex.Pattern;
|
|
|
import java.util.stream.Collectors;
|
|
|
@@ -163,7 +166,23 @@ public class GenerateInstructionService {
|
|
|
}
|
|
|
String mainClaim = mainClaimStr;
|
|
|
String claimStrs = StringUtils.join(reMainPatentClaims, "\n");
|
|
|
- return Flux.create(emitter -> {
|
|
|
+
|
|
|
+ AtomicBoolean businessCompleted = new AtomicBoolean(false);
|
|
|
+ Flux<String> heartbeatFlux = Flux.interval(Duration.ofSeconds(5))
|
|
|
+ .map(tick -> {
|
|
|
+ // 每次发送心跳前检查业务状态
|
|
|
+ if (businessCompleted.get()) {
|
|
|
+ // 如果业务已完成,此消息理论上不会发出,因为流会被终止
|
|
|
+ return ":heartbeat-final\n\n";
|
|
|
+ }
|
|
|
+ return "event: ping";
|
|
|
+ })
|
|
|
+ .takeUntil(item -> businessCompleted.get()) // 关键修改:当业务完成时,自动停止心跳流
|
|
|
+ .doFinally(signal -> {
|
|
|
+ System.out.println("心跳流已停止,原因: " + signal);
|
|
|
+ });
|
|
|
+
|
|
|
+ Flux<String> businessFlux = Flux.create(emitter -> {
|
|
|
new Thread(() -> {
|
|
|
try {
|
|
|
|
|
|
@@ -215,10 +234,13 @@ public class GenerateInstructionService {
|
|
|
} catch (Exception e) {
|
|
|
e.printStackTrace();
|
|
|
} finally {
|
|
|
+ businessCompleted.set(true);
|
|
|
emitter.complete();
|
|
|
}
|
|
|
}).start();
|
|
|
});
|
|
|
+
|
|
|
+ return Flux.merge(heartbeatFlux, businessFlux);
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
@@ -287,11 +309,15 @@ public class GenerateInstructionService {
|
|
|
} else {
|
|
|
message = getFluxMessage(FIELD_TITLE, MESSAGE, generateTechnicalVO.getTitle());
|
|
|
}
|
|
|
+ System.out.println("message:"+message);
|
|
|
fluxSink.next(message);
|
|
|
+ System.out.println("message: 发送成功");
|
|
|
String endMessage = getFluxMessage(FIELD_TITLE, END, "");
|
|
|
fluxSink.next(endMessage);
|
|
|
+ System.out.println("message:"+endMessage);
|
|
|
String startMessage2 = getFluxMessage(FIELD_TECHNICAL, START, "");
|
|
|
fluxSink.next(startMessage2);
|
|
|
+ System.out.println("message:"+startMessage2);
|
|
|
String message2 = "";
|
|
|
if (ifError) {
|
|
|
message2 = getFluxMessage(FIELD_TECHNICAL, ERROR, "生成技术领域异常");
|
|
|
@@ -299,6 +325,7 @@ public class GenerateInstructionService {
|
|
|
message2 = getFluxMessage(FIELD_TECHNICAL, MESSAGE, generateTechnicalVO.getTechnical());
|
|
|
}
|
|
|
fluxSink.next(message2);
|
|
|
+ System.out.println("message:"+message2);
|
|
|
String endMessage2 = getFluxMessage(FIELD_TECHNICAL, END, "");
|
|
|
fluxSink.next(endMessage2);
|
|
|
|