EsCountService.java 23 KB


  1. package cn.cslg.pas.service.business.es;
  2. import cn.cslg.pas.common.dto.business.EsCountDTO;
  3. import cn.cslg.pas.common.dto.business.EsCountDetailDTO;
  4. import cn.cslg.pas.common.utils.parseQueryToTree.expressManager;
  5. import cn.cslg.pas.common.utils.parseQueryToTree.operateNode;
  6. import cn.cslg.pas.common.utils.parseQueryToTree.treeNode;
  7. import cn.cslg.pas.common.vo.EsConfigVO;
  8. import cn.cslg.pas.common.vo.business.EsCountVO;
  9. import cn.cslg.pas.domain.es.Patent;
  10. import cn.cslg.pas.factorys.EsAnalysisBuilderFactory.EsAnalysisBuilderFactory;
  11. import cn.cslg.pas.factorys.EsAnalysisBuilderFactory.IEsAnalysisBuilder;
  12. import cn.cslg.pas.factorys.EsCountBuilderFactory.EsCountBuilderFactory;
  13. import cn.cslg.pas.factorys.EsCountBuilderFactory.IEsCountBuilder;
  14. import cn.cslg.pas.service.business.CommonService;
  15. import cn.cslg.pas.service.query.FormatQueryService;
  16. import cn.hutool.json.JSONUtil;
  17. import co.elastic.clients.elasticsearch.ElasticsearchClient;
  18. import co.elastic.clients.elasticsearch._types.aggregations.*;
  19. import co.elastic.clients.elasticsearch._types.query_dsl.Query;
  20. import co.elastic.clients.elasticsearch._types.query_dsl.QueryBuilders;
  21. import co.elastic.clients.elasticsearch.core.SearchRequest;
  22. import co.elastic.clients.elasticsearch.core.SearchResponse;
  23. import com.alibaba.fastjson.JSON;
  24. import lombok.RequiredArgsConstructor;
  25. import org.apache.commons.lang3.StringUtils;
  26. import org.springframework.beans.factory.annotation.Autowired;
  27. import org.springframework.context.annotation.Lazy;
  28. import org.springframework.stereotype.Service;
  29. import org.springframework.util.CollectionUtils;
  30. import java.util.*;
  31. import java.util.stream.Collectors;
  32. @Service
  33. @RequiredArgsConstructor(onConstructor_ = {@Lazy})
  34. public class EsCountService {
  35. private final List<String> childList = Arrays.asList("field");
  36. private final List<String> nestedList = Arrays.asList("PA", "IN", "PE", "SAT", "MAT", "SRH", "MRH");
  37. private final List<String> dateList = Arrays.asList("PD", "AD", "GD");
  38. private final List<String> numberList = Arrays.asList("QPN", "QDPN", "SFN", "IFN", "PFN");
  39. private final ElasticsearchClient client;
  40. @Autowired
  41. private EsCountBuilderFactory esCountBuilderFactory;
  42. @Autowired
  43. private EsAnalysisBuilderFactory esAnalysisBuilderFactory;
  44. @Autowired
  45. private FormatQueryService formatQueryService;
  46. /**
  47. * 查询专利库中的专利分组聚合统计
  48. *
  49. * @param countVOS
  50. * @return
  51. * @throws Exception
  52. */
  53. public EsCountDTO esCountSearch(List<EsCountVO> countVOS) throws Exception {
  54. EsCountDTO esCountDTO = new EsCountDTO();
  55. List<EsCountDetailDTO> detailDTOS = new ArrayList<>();
  56. for (EsCountVO vo : countVOS) {
  57. String field = vo.getField();
  58. Integer topN = vo.getTopN();
  59. String condition = vo.getCondition();
  60. //查询es返回数据
  61. SearchRequest.Builder builder = new SearchRequest.Builder();
  62. Aggregation aggregation = this.selectAggregation(builder, vo);
  63. if (StringUtils.isNotEmpty(condition)) {
  64. //解析检索条件
  65. treeNode tree = expressManager.getInstance().Parse(condition, false);
  66. //从es中检索数据
  67. Query query = formatQueryService.EsQueryToQuery((operateNode) tree, "patent");
  68. Aggregation filtersAgg = null;
  69. if (aggregation != null) {
  70. filtersAgg = new Aggregation.Builder().filters(new FiltersAggregation.Builder()
  71. .filters(i -> i.array(Arrays.asList(query))).build())
  72. .aggregations(new HashMap() {{
  73. put("filters_agg", aggregation);
  74. }}).build();
  75. } else {
  76. filtersAgg = AggregationBuilders.filter(i -> i.bool(j -> j.must(query)));
  77. }
  78. builder.aggregations("Agg", filtersAgg);
  79. } else {
  80. builder.aggregations("Agg", aggregation);
  81. }
  82. SearchResponse<Patent> response = client.search(builder.build(), Patent.class);
  83. Aggregate agg = response.aggregations().get("Agg");
  84. if (StringUtils.isNotEmpty(condition)) {
  85. if (StringUtils.isNotEmpty(field)) {
  86. List<FiltersBucket> filtersBuckets = agg.filters().buckets().array();
  87. if (dateList.contains(field)) {
  88. filtersBuckets.forEach(filtersBucket -> {
  89. this.getFiltersCountDTO(filtersBucket, condition, detailDTOS);
  90. Aggregate filtersAgg = filtersBucket.aggregations().get("filters_agg");
  91. this.getDateCountDTOS(filtersAgg, field, topN, detailDTOS);
  92. });
  93. } else if (nestedList.contains(field)) {
  94. filtersBuckets.forEach(filtersBucket -> {
  95. this.getFiltersCountDTO(filtersBucket, condition, detailDTOS);
  96. Aggregate filtersAgg = filtersBucket.aggregations().get("filters_agg");
  97. this.getNestedCountDTOS(filtersAgg, field, detailDTOS);
  98. });
  99. } else if (childList.contains(field)) {
  100. filtersBuckets.forEach(filtersBucket -> {
  101. this.getFiltersCountDTO(filtersBucket, condition, detailDTOS);
  102. Aggregate filtersAgg = filtersBucket.aggregations().get("filters_agg");
  103. this.getChildCountDTOS(filtersAgg, field, detailDTOS);
  104. });
  105. } else {
  106. filtersBuckets.forEach(filtersBucket -> {
  107. this.getFiltersCountDTO(filtersBucket, condition, detailDTOS);
  108. Aggregate filtersAgg = filtersBucket.aggregations().get("filters_agg");
  109. this.getTermCountDTOS(filtersAgg, field, detailDTOS);
  110. });
  111. }
  112. } else {
  113. this.getFilterCountDTO(agg, condition, detailDTOS);
  114. }
  115. } else {
  116. if (dateList.contains(field)) {
  117. this.getDateCountDTOS(agg, field, topN, detailDTOS);
  118. } else if (nestedList.contains(field)) {
  119. this.getNestedCountDTOS(agg, field, detailDTOS);
  120. } else if (childList.contains(field)) {
  121. this.getChildCountDTOS(agg, field, detailDTOS);
  122. } else {
  123. this.getTermCountDTOS(agg, field, detailDTOS);
  124. }
  125. }
  126. }
  127. esCountDTO.setDetailDTOS(detailDTOS);
  128. return esCountDTO;
  129. }
  130. /**
  131. * 专利的聚合分析
  132. *
  133. * @param countVOS
  134. * @return
  135. * @throws Exception
  136. */
  137. public EsCountDTO esAnalysisSearch(List<EsCountVO> countVOS) throws Exception {
  138. EsCountDTO esCountDTO = new EsCountDTO();
  139. List<EsCountDetailDTO> detailDTOS = new ArrayList<>();
  140. for (EsCountVO vo : countVOS) {
  141. String field = vo.getField();
  142. Integer topN = vo.getTopN();
  143. Boolean ifHaveChild = vo.getIfHaveChild();
  144. String fieldId = vo.getFieldId();
  145. List<String> values = vo.getValues();
  146. //查询es返回数据
  147. SearchRequest.Builder builder = new SearchRequest.Builder();
  148. builder.index("patent");
  149. IEsAnalysisBuilder iEsAnalysisBuilder = null;
  150. String json = CommonService.readJsonFile("esAnalysis.json");
  151. List<EsConfigVO> esConfigVOS = JSON.parseArray(json, EsConfigVO.class);
  152. EsConfigVO esConfigVO = esConfigVOS.stream().filter(item -> item.getField().equals(field))
  153. .findFirst().orElse(null);
  154. if (esConfigVO != null) {
  155. iEsAnalysisBuilder = esAnalysisBuilderFactory.getClass(esConfigVO.getEsClass());
  156. iEsAnalysisBuilder.setField(esConfigVO.getEsField());
  157. iEsAnalysisBuilder.setTopN(topN);
  158. iEsAnalysisBuilder.setIfHaveChild(ifHaveChild);
  159. iEsAnalysisBuilder.setFieldValue(fieldId);
  160. if (iEsAnalysisBuilder.getField().contains(".")) {
  161. String path = iEsAnalysisBuilder.getField().substring(0, iEsAnalysisBuilder.getField().indexOf("."));
  162. iEsAnalysisBuilder.setPath(path);
  163. }
  164. iEsAnalysisBuilder.setValues(values);
  165. Aggregation aggregation = iEsAnalysisBuilder.createAnalyseAgg();
  166. builder.aggregations("Agg", aggregation);
  167. SearchResponse<Patent> response = client.search(builder.build(), Patent.class);
  168. Aggregate agg = response.aggregations().get("Agg");
  169. if (dateList.contains(field)) {
  170. this.getDateAnalysisDTOS(agg, field, topN, detailDTOS);
  171. } else if (nestedList.contains(field)) {
  172. this.getNestedCountDTOS(agg, field, detailDTOS);
  173. } else if (childList.contains(field)) {
  174. this.getChildAnalysisDTOS(agg, field, detailDTOS);
  175. } else if (numberList.contains(field)) {
  176. this.getNumberAnalysisDTOS(agg, field,detailDTOS);
  177. } else {
  178. this.getTermCountDTOS(agg, field, detailDTOS);
  179. }
  180. }
  181. }
  182. esCountDTO.setDetailDTOS(detailDTOS);
  183. return esCountDTO;
  184. }
  185. //测试用的
  186. public EsCountDTO esAnalysis(EsCountVO vo) throws Exception {
  187. EsCountDTO esCountDTO = new EsCountDTO();
  188. List<EsCountDetailDTO> detailDTOS = new ArrayList<>();
  189. String field = vo.getField();
  190. Integer topN = vo.getTopN();
  191. Boolean ifHaveChild = vo.getIfHaveChild();
  192. String fieldId = vo.getFieldId();
  193. List<String> values = vo.getValues();
  194. //查询es返回数据
  195. SearchRequest.Builder builder = new SearchRequest.Builder();
  196. builder.index("patent");
  197. IEsAnalysisBuilder iEsAnalysisBuilder = null;
  198. String json = CommonService.readJsonFile("esAnalysis.json");
  199. List<EsConfigVO> esConfigVOS = JSON.parseArray(json, EsConfigVO.class);
  200. EsConfigVO esConfigVO = esConfigVOS.stream().filter(item -> item.getField().equals(field))
  201. .findFirst().orElse(null);
  202. if (esConfigVO != null) {
  203. iEsAnalysisBuilder = esAnalysisBuilderFactory.getClass(esConfigVO.getEsClass());
  204. iEsAnalysisBuilder.setField(esConfigVO.getEsField());
  205. iEsAnalysisBuilder.setTopN(topN);
  206. iEsAnalysisBuilder.setIfHaveChild(ifHaveChild);
  207. iEsAnalysisBuilder.setFieldValue(fieldId);
  208. if (iEsAnalysisBuilder.getField().contains(".")) {
  209. String path = iEsAnalysisBuilder.getField().substring(0, iEsAnalysisBuilder.getField().indexOf("."));
  210. iEsAnalysisBuilder.setPath(path);
  211. }
  212. iEsAnalysisBuilder.setValues(values);
  213. Aggregation aggregation = iEsAnalysisBuilder.createAnalyseAgg();
  214. builder.aggregations("Agg", aggregation);
  215. SearchResponse<Patent> response = client.search(builder.build(), Patent.class);
  216. Aggregate agg = response.aggregations().get("Agg");
  217. if (dateList.contains(field)) {
  218. this.getDateAnalysisDTOS(agg, field, topN, detailDTOS);
  219. } else if (nestedList.contains(field)) {
  220. this.getNestedCountDTOS(agg, field, detailDTOS);
  221. } else if (childList.contains(field)) {
  222. this.getChildAnalysisDTOS(agg, field, detailDTOS);
  223. } else if (numberList.contains(field)) {
  224. this.getNumberAnalysisDTOS(agg, field,detailDTOS);
  225. } else {
  226. this.getTermCountDTOS(agg, field, detailDTOS);
  227. }
  228. }
  229. esCountDTO.setDetailDTOS(detailDTOS);
  230. return esCountDTO;
  231. }
  232. /**
  233. * 获取聚合后的aggregation
  234. *
  235. * @param builder
  236. * @param vo
  237. * @return
  238. * @throws Exception
  239. */
  240. public Aggregation selectAggregation(SearchRequest.Builder builder, EsCountVO vo) throws Exception {
  241. String valueOne = vo.getValueOne();
  242. String valueTwo = vo.getValueTwo();
  243. Integer topN = vo.getTopN();
  244. Boolean ifHaveChild = vo.getIfHaveChild();
  245. String field = vo.getField();
  246. Integer fieldType = vo.getFieldType();
  247. String fieldId = vo.getFieldId();
  248. Aggregation aggregation = null;
  249. builder.index("patent");
  250. IEsCountBuilder iEsCountBuilder = null;
  251. String json = CommonService.readJsonFile("esCount.json");
  252. List<EsConfigVO> esConfigVOS = JSON.parseArray(json, EsConfigVO.class);
  253. EsConfigVO esConfigVO = esConfigVOS.stream().filter(item -> item.getField().equals(field)).findFirst().orElse(null);
  254. if (esConfigVO != null) {
  255. iEsCountBuilder = esCountBuilderFactory.getClass(esConfigVO.getEsClass());
  256. iEsCountBuilder.setField(esConfigVO.getEsField());
  257. iEsCountBuilder.setFieldId(fieldId);
  258. iEsCountBuilder.setValueOne(valueOne);
  259. iEsCountBuilder.setValueTwo(valueTwo);
  260. iEsCountBuilder.setTopN(topN);
  261. iEsCountBuilder.setIfHaveChild(ifHaveChild);
  262. iEsCountBuilder.setFieldType(String.valueOf(fieldType));
  263. if (iEsCountBuilder.getField().contains(".")) {
  264. String path = iEsCountBuilder.getField().substring(0, iEsCountBuilder.getField().indexOf("."));
  265. iEsCountBuilder.setPath(path);
  266. }
  267. aggregation = iEsCountBuilder.createAggregation();
  268. }
  269. return aggregation;
  270. }
  271. /**
  272. * 获取Filter聚合返回数据
  273. *
  274. * @param agg
  275. * @param condition
  276. * @return
  277. */
  278. public void getFilterCountDTO(Aggregate agg, String condition, List<EsCountDetailDTO> detailDTOS) {
  279. EsCountDetailDTO filterDTO = new EsCountDetailDTO();
  280. filterDTO.setField("condition");
  281. filterDTO.setName(condition);
  282. filterDTO.setNumber(agg.filter().docCount());
  283. if (filterDTO.getNumber() > 0) {
  284. detailDTOS.add(filterDTO);
  285. }
  286. }
  287. /**
  288. * 获取Filters聚合返回数据
  289. *
  290. * @param filtersBucket
  291. * @param condition
  292. * @return
  293. */
  294. public void getFiltersCountDTO(FiltersBucket filtersBucket, String condition, List<EsCountDetailDTO> detailDTOS) {
  295. EsCountDetailDTO filtersDTO = new EsCountDetailDTO();
  296. filtersDTO.setField("condition");
  297. filtersDTO.setName(condition);
  298. filtersDTO.setNumber(filtersBucket.docCount());
  299. if (filtersDTO.getNumber() > 0) {
  300. detailDTOS.add(filtersDTO);
  301. }
  302. }
  303. /**
  304. * 获取Terms聚合后数据
  305. *
  306. * @param agg
  307. * @param field
  308. * @param detailDTOS
  309. */
  310. public void getTermCountDTOS(Aggregate agg, String field, List<EsCountDetailDTO> detailDTOS) {
  311. List<StringTermsBucket> list = agg.sterms().buckets().array();
  312. list.forEach(bucket -> {
  313. EsCountDetailDTO dto = new EsCountDetailDTO();
  314. dto.setField(field);
  315. Aggregate aggregate = bucket.aggregations().get("filter_agg");
  316. dto.setName(bucket.key().stringValue());
  317. dto.setNumber(bucket.docCount());
  318. if (aggregate != null) {
  319. dto.setNumber(aggregate.filter().docCount());
  320. }
  321. if (dto.getNumber() > 0) {
  322. detailDTOS.add(dto);
  323. }
  324. });
  325. }
  326. /**
  327. * 获取children聚合后数据
  328. *
  329. * @param agg
  330. * @param field
  331. * @param detailDTOS
  332. */
  333. public void getChildCountDTOS(Aggregate agg, String field, List<EsCountDetailDTO> detailDTOS) {
  334. Aggregate childAgg = agg.children().aggregations().get("child_agg");
  335. List<StringTermsBucket> list = childAgg.sterms().buckets().array();
  336. for (StringTermsBucket bucket : list) {
  337. Aggregate aggregate = bucket.aggregations().get("filter_agg");
  338. List<FiltersBucket> list1 = aggregate.filters().buckets().array();
  339. for (FiltersBucket filtersBucket : list1) {
  340. Aggregate filtersAgg = filtersBucket.aggregations().get("filters_agg");
  341. List<StringTermsBucket> list2 = filtersAgg.sterms().buckets().array();
  342. if (!CollectionUtils.isEmpty(list2)) {
  343. for (StringTermsBucket termsBucket : list2) {
  344. EsCountDetailDTO dto = new EsCountDetailDTO();
  345. dto.setField(field);
  346. dto.setName(termsBucket.key().stringValue());
  347. dto.setNumber(termsBucket.docCount());
  348. if (dto.getNumber() > 0) {
  349. detailDTOS.add(dto);
  350. }
  351. }
  352. }
  353. }
  354. }
  355. // list.forEach(bucket -> {
  356. // EsCountDetailDTO dto = new EsCountDetailDTO();
  357. // dto.setField(field);
  358. // Aggregate aggregate = bucket.aggregations().get("filter_agg");
  359. // List<StringTermsBucket> buckets = aggregate.sterms().buckets().array();
  360. // for (StringTermsBucket termsBucket : buckets) {
  361. // dto.setName(termsBucket.key().stringValue());
  362. // dto.setNumber(termsBucket.docCount());
  363. // if (dto.getNumber() > 0) {
  364. // detailDTOS.add(dto);
  365. // }
  366. // }
  367. // });
  368. }
  369. /**
  370. * 获取children分析后数据
  371. *
  372. * @param agg
  373. * @param field
  374. * @param detailDTOS
  375. */
  376. public void getChildAnalysisDTOS(Aggregate agg, String field, List<EsCountDetailDTO> detailDTOS) {
  377. Aggregate childAgg = agg.children().aggregations().get("child_agg");
  378. List<StringTermsBucket> list = childAgg.sterms().buckets().array();
  379. for (StringTermsBucket bucket : list) {
  380. Aggregate termAgg = bucket.aggregations().get("term_agg");
  381. List<StringTermsBucket> bucketList = termAgg.sterms().buckets().array();
  382. if (!CollectionUtils.isEmpty(bucketList)) {
  383. for (StringTermsBucket termsBucket : bucketList) {
  384. EsCountDetailDTO dto = new EsCountDetailDTO();
  385. dto.setField(field);
  386. dto.setName(termsBucket.key().stringValue());
  387. Aggregate aggregate = termsBucket.aggregations().get("filterAgg");
  388. List<FiltersBucket> filtersBuckets = aggregate.filters().buckets().array();
  389. for (int i = 0; i < filtersBuckets.size() - 1; i++) {
  390. FiltersBucket filtersBucket = filtersBuckets.get(i);
  391. if (filtersBucket.docCount() > 0) {
  392. dto.setNumber(filtersBucket.docCount());
  393. if (dto.getNumber() > 0) {
  394. detailDTOS.add(dto);
  395. }
  396. break;
  397. }
  398. }
  399. }
  400. }
  401. }
  402. }
  403. /**
  404. * 获取range分析后数据
  405. *
  406. * @param agg
  407. * @param field
  408. * @param detailDTOS
  409. */
  410. public void getNumberAnalysisDTOS(Aggregate agg, String field,List<EsCountDetailDTO> detailDTOS) {
  411. List<RangeBucket> list = agg.range().buckets().array();
  412. for (RangeBucket bucket : list) {
  413. EsCountDetailDTO dto = new EsCountDetailDTO();
  414. dto.setField(field);
  415. dto.setName(bucket.key());
  416. dto.setNumber(bucket.docCount());
  417. if (dto.getNumber() > 0) {
  418. detailDTOS.add(dto);
  419. }
  420. }
  421. }
  422. /**
  423. * 获取dateHistogram聚合后数据
  424. *
  425. * @param agg
  426. * @param field
  427. * @param detailDTOS
  428. */
  429. public void getDateCountDTOS(Aggregate agg, String field, Integer topN, List<EsCountDetailDTO> detailDTOS) {
  430. List<DateHistogramBucket> list = agg.dateHistogram().buckets().array();
  431. List<EsCountDetailDTO> esCountDetailDTOS = new ArrayList<>();
  432. list.forEach(bucket -> {
  433. EsCountDetailDTO dto = new EsCountDetailDTO();
  434. dto.setField(field);
  435. Aggregate aggregate = bucket.aggregations().get("filter_agg");
  436. dto.setName(bucket.keyAsString());
  437. dto.setNumber(bucket.docCount());
  438. if (aggregate != null) {
  439. dto.setNumber(aggregate.filter().docCount());
  440. }
  441. if (dto.getNumber() > 0) {
  442. esCountDetailDTOS.add(dto);
  443. }
  444. });
  445. if (!CollectionUtils.isEmpty(esCountDetailDTOS)) {
  446. List<EsCountDetailDTO> collect = esCountDetailDTOS.stream()
  447. .sorted(Comparator.comparing(EsCountDetailDTO::getName).reversed()).limit(topN).collect(Collectors.toList());
  448. detailDTOS.addAll(collect);
  449. }
  450. }
  451. /**
  452. * 获取dateHistogram分析后数据
  453. *
  454. * @param agg
  455. * @param field
  456. * @param topN
  457. * @param detailDTOS
  458. */
  459. public void getDateAnalysisDTOS(Aggregate agg, String field, Integer topN, List<EsCountDetailDTO> detailDTOS) {
  460. List<RangeBucket> list1 = agg.dateRange().buckets().array();
  461. List<EsCountDetailDTO> esCountDetailDTOS = new ArrayList<>();
  462. for (RangeBucket bucket : list1) {
  463. EsCountDetailDTO dto = new EsCountDetailDTO();
  464. dto.setField(field);
  465. dto.setName(bucket.key());
  466. dto.setNumber(bucket.docCount());
  467. if (dto.getNumber() > 0) {
  468. esCountDetailDTOS.add(dto);
  469. }
  470. }
  471. if (!CollectionUtils.isEmpty(esCountDetailDTOS)) {
  472. List<EsCountDetailDTO> collect = esCountDetailDTOS.stream()
  473. .sorted(Comparator.comparing(EsCountDetailDTO::getName).reversed()).limit(topN).collect(Collectors.toList());
  474. detailDTOS.addAll(collect);
  475. }
  476. }
  477. /**
  478. * 获取nested聚合后数据
  479. *
  480. * @param agg
  481. * @param field
  482. * @param detailDTOS
  483. */
  484. public void getNestedCountDTOS(Aggregate agg, String field, List<EsCountDetailDTO> detailDTOS) {
  485. Aggregate termsAgg = agg.nested().aggregations().get("terms_agg");
  486. List<StringTermsBucket> list = termsAgg.sterms().buckets().array();
  487. list.forEach(bucket -> {
  488. EsCountDetailDTO dto = new EsCountDetailDTO();
  489. dto.setField(field);
  490. Aggregate aggregate = bucket.aggregations().get("filter_agg");
  491. dto.setName(bucket.key().stringValue());
  492. dto.setNumber(bucket.docCount());
  493. if (aggregate != null) {
  494. dto.setNumber(aggregate.filter().docCount());
  495. }
  496. if (dto.getNumber() > 0) {
  497. detailDTOS.add(dto);
  498. }
  499. });
  500. }
  501. }