diff --git a/src/main/java/org/springnewfiber/dataadapter/ziguang/bz/action/BzAction.java b/src/main/java/org/springnewfiber/dataadapter/ziguang/bz/action/BzAction.java index 69e1b4c..86db886 100644 --- a/src/main/java/org/springnewfiber/dataadapter/ziguang/bz/action/BzAction.java +++ b/src/main/java/org/springnewfiber/dataadapter/ziguang/bz/action/BzAction.java @@ -55,25 +55,29 @@ * @param stcd */ public void realTimeData(String stcd) { - Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); - HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getBz().getRealUrl()); - request.body(JSONObject.toJSONString(listByPramMap)); - HttpResponse response = request.execute(); - log.info("泵站单个实时数据,stcd:{},数据:{}", stcd, response.body()); - List list = JSONObject.parseArray(response.body(), BzDto.class); - if (CollectionUtil.isNotEmpty(list)) { - Boolean flag = bladeRedis.exists("BZ:" + stcd); - List newList = CollectionUtil.reverseNew(list); - newList.forEach(i -> { - BzDto dto = new BzDto(); - if ((flag && (dto = bladeRedis.get("BZ:" + stcd)) != null && DateUtil.parse(dto.getTM()).before(DateUtil.parse(i.getTM()))) || !flag) { - log.info("泵站redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); - rabbitTemplate.convertAndSend(JSONObject.toJSONString(BzDto.convert(i))); - bladeRedis.set("BZ:" + stcd, i); - log.info("发送泵站的数据:code:{},data:{}", stcd, JSONObject.toJSONString(BzDto.convert(i))); - return; - } - }); + try { + Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); + HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getBz().getRealUrl()); + request.body(JSONObject.toJSONString(listByPramMap)); + HttpResponse response = request.execute(); + log.info("泵站单个实时数据,stcd:{},数据:{}", stcd, response.body()); + List list = JSONObject.parseArray(response.body(), BzDto.class); + if (CollectionUtil.isNotEmpty(list)) { + Boolean flag = bladeRedis.exists("BZ:" + stcd); + List newList = CollectionUtil.reverseNew(list); + newList.forEach(i -> { + BzDto dto = new BzDto(); + if ((flag && (dto = bladeRedis.get("BZ:" + stcd)) != null && DateUtil.parse(dto.getTM()).before(DateUtil.parse(i.getTM()))) || !flag) { + log.info("泵站redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); + rabbitTemplate.convertAndSend(JSONObject.toJSONString(BzDto.convert(i))); + bladeRedis.set("BZ:" + stcd, i); + log.info("发送泵站的数据:code:{},data:{}", stcd, JSONObject.toJSONString(BzDto.convert(i))); + return; + } + }); + } + } catch (Exception e) { + log.error("泵站数据出错:stcd:{},Cause:{}", stcd, e.getCause()); } } } diff --git a/src/main/java/org/springnewfiber/dataadapter/ziguang/bz/action/BzAction.java b/src/main/java/org/springnewfiber/dataadapter/ziguang/bz/action/BzAction.java index 69e1b4c..86db886 100644 --- a/src/main/java/org/springnewfiber/dataadapter/ziguang/bz/action/BzAction.java +++ b/src/main/java/org/springnewfiber/dataadapter/ziguang/bz/action/BzAction.java @@ -55,25 +55,29 @@ * @param stcd */ public void realTimeData(String stcd) { - Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); - HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getBz().getRealUrl()); - request.body(JSONObject.toJSONString(listByPramMap)); - HttpResponse response = request.execute(); - log.info("泵站单个实时数据,stcd:{},数据:{}", stcd, response.body()); - List list = JSONObject.parseArray(response.body(), BzDto.class); - if (CollectionUtil.isNotEmpty(list)) { - Boolean flag = bladeRedis.exists("BZ:" + stcd); - List newList = CollectionUtil.reverseNew(list); - newList.forEach(i -> { - BzDto dto = new BzDto(); - if ((flag && (dto = bladeRedis.get("BZ:" + stcd)) != null && DateUtil.parse(dto.getTM()).before(DateUtil.parse(i.getTM()))) || !flag) { - log.info("泵站redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); - rabbitTemplate.convertAndSend(JSONObject.toJSONString(BzDto.convert(i))); - bladeRedis.set("BZ:" + stcd, i); - log.info("发送泵站的数据:code:{},data:{}", stcd, JSONObject.toJSONString(BzDto.convert(i))); - return; - } - }); + try { + Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); + HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getBz().getRealUrl()); + request.body(JSONObject.toJSONString(listByPramMap)); + HttpResponse response = request.execute(); + log.info("泵站单个实时数据,stcd:{},数据:{}", stcd, response.body()); + List list = JSONObject.parseArray(response.body(), BzDto.class); + if (CollectionUtil.isNotEmpty(list)) { + Boolean flag = bladeRedis.exists("BZ:" + stcd); + List newList = CollectionUtil.reverseNew(list); + newList.forEach(i -> { + BzDto dto = new BzDto(); + if ((flag && (dto = bladeRedis.get("BZ:" + stcd)) != null && DateUtil.parse(dto.getTM()).before(DateUtil.parse(i.getTM()))) || !flag) { + log.info("泵站redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); + rabbitTemplate.convertAndSend(JSONObject.toJSONString(BzDto.convert(i))); + bladeRedis.set("BZ:" + stcd, i); + log.info("发送泵站的数据:code:{},data:{}", stcd, JSONObject.toJSONString(BzDto.convert(i))); + return; + } + }); + } + } catch (Exception e) { + log.error("泵站数据出错:stcd:{},Cause:{}", stcd, e.getCause()); } } } diff --git a/src/main/java/org/springnewfiber/dataadapter/ziguang/gq/action/GqAction.java b/src/main/java/org/springnewfiber/dataadapter/ziguang/gq/action/GqAction.java index 768cbc4..4cdc82d 100644 --- a/src/main/java/org/springnewfiber/dataadapter/ziguang/gq/action/GqAction.java +++ b/src/main/java/org/springnewfiber/dataadapter/ziguang/gq/action/GqAction.java @@ -57,26 +57,30 @@ * @param stcd */ public void realTimeData(String stcd) { - Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); - HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getGq().getRealUrl()); - request.body(JSONObject.toJSONString(listByPramMap)); - HttpResponse response = request.execute(); - log.info("港渠单个实时数据,stcd:{},数据:{}", stcd, response.body()); - List list = JSONObject.parseArray(response.body(), GqDto.class); - if (CollectionUtil.isNotEmpty(list)) { - Boolean flag = bladeRedis.exists("gq:" + stcd); - List newList = CollectionUtil.reverseNew(list); - newList.forEach(i -> { - GqDto dto = new GqDto(); - if ((flag && (dto = bladeRedis.get("gq:" + stcd)) != null && dto.getTM().before(i.getTM())) - || !flag) { - log.info("港渠redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); - rabbitTemplate.convertAndSend(JSONObject.toJSONString(GqDto.convert(i))); - bladeRedis.set("gq:" + stcd, i); - log.info("发送港渠的数据:{},数据:{}", stcd, JSONObject.toJSONString(GqDto.convert(i))); - return; + try{ + Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); + HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getGq().getRealUrl()); + request.body(JSONObject.toJSONString(listByPramMap)); + HttpResponse response = request.execute(); + log.info("港渠单个实时数据,stcd:{},数据:{}", stcd, response.body()); + List list = JSONObject.parseArray(response.body(), GqDto.class); + if (CollectionUtil.isNotEmpty(list)) { + Boolean flag = bladeRedis.exists("gq:" + stcd); + List newList = CollectionUtil.reverseNew(list); + newList.forEach(i -> { + GqDto dto = new GqDto(); + if ((flag && (dto = bladeRedis.get("gq:" + stcd)) != null && dto.getTM().before(i.getTM())) + || !flag) { + log.info("港渠redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); + rabbitTemplate.convertAndSend(JSONObject.toJSONString(GqDto.convert(i))); + bladeRedis.set("gq:" + stcd, i); + log.info("发送港渠的数据:{},数据:{}", stcd, JSONObject.toJSONString(GqDto.convert(i))); + return; + } + }); } - }); + }catch (Exception e){ + log.error("请求港渠的数据错误:stcd:{},exception:{}",stcd,e.getCause()); } } } diff --git a/src/main/java/org/springnewfiber/dataadapter/ziguang/bz/action/BzAction.java b/src/main/java/org/springnewfiber/dataadapter/ziguang/bz/action/BzAction.java index 69e1b4c..86db886 100644 --- a/src/main/java/org/springnewfiber/dataadapter/ziguang/bz/action/BzAction.java +++ b/src/main/java/org/springnewfiber/dataadapter/ziguang/bz/action/BzAction.java @@ -55,25 +55,29 @@ * @param stcd */ public void realTimeData(String stcd) { - Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); - HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getBz().getRealUrl()); - request.body(JSONObject.toJSONString(listByPramMap)); - HttpResponse response = request.execute(); - log.info("泵站单个实时数据,stcd:{},数据:{}", stcd, response.body()); - List list = JSONObject.parseArray(response.body(), BzDto.class); - if (CollectionUtil.isNotEmpty(list)) { - Boolean flag = bladeRedis.exists("BZ:" + stcd); - List newList = CollectionUtil.reverseNew(list); - newList.forEach(i -> { - BzDto dto = new BzDto(); - if ((flag && (dto = bladeRedis.get("BZ:" + stcd)) != null && DateUtil.parse(dto.getTM()).before(DateUtil.parse(i.getTM()))) || !flag) { - log.info("泵站redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); - rabbitTemplate.convertAndSend(JSONObject.toJSONString(BzDto.convert(i))); - bladeRedis.set("BZ:" + stcd, i); - log.info("发送泵站的数据:code:{},data:{}", stcd, JSONObject.toJSONString(BzDto.convert(i))); - return; - } - }); + try { + Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); + HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getBz().getRealUrl()); + request.body(JSONObject.toJSONString(listByPramMap)); + HttpResponse response = request.execute(); + log.info("泵站单个实时数据,stcd:{},数据:{}", stcd, response.body()); + List list = JSONObject.parseArray(response.body(), BzDto.class); + if (CollectionUtil.isNotEmpty(list)) { + Boolean flag = bladeRedis.exists("BZ:" + stcd); + List newList = CollectionUtil.reverseNew(list); + newList.forEach(i -> { + BzDto dto = new BzDto(); + if ((flag && (dto = bladeRedis.get("BZ:" + stcd)) != null && DateUtil.parse(dto.getTM()).before(DateUtil.parse(i.getTM()))) || !flag) { + log.info("泵站redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); + rabbitTemplate.convertAndSend(JSONObject.toJSONString(BzDto.convert(i))); + bladeRedis.set("BZ:" + stcd, i); + log.info("发送泵站的数据:code:{},data:{}", stcd, JSONObject.toJSONString(BzDto.convert(i))); + return; + } + }); + } + } catch (Exception e) { + log.error("泵站数据出错:stcd:{},Cause:{}", stcd, e.getCause()); } } } diff --git a/src/main/java/org/springnewfiber/dataadapter/ziguang/gq/action/GqAction.java b/src/main/java/org/springnewfiber/dataadapter/ziguang/gq/action/GqAction.java index 768cbc4..4cdc82d 100644 --- a/src/main/java/org/springnewfiber/dataadapter/ziguang/gq/action/GqAction.java +++ b/src/main/java/org/springnewfiber/dataadapter/ziguang/gq/action/GqAction.java @@ -57,26 +57,30 @@ * @param stcd */ public void realTimeData(String stcd) { - Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); - HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getGq().getRealUrl()); - request.body(JSONObject.toJSONString(listByPramMap)); - HttpResponse response = request.execute(); - log.info("港渠单个实时数据,stcd:{},数据:{}", stcd, response.body()); - List list = JSONObject.parseArray(response.body(), GqDto.class); - if (CollectionUtil.isNotEmpty(list)) { - Boolean flag = bladeRedis.exists("gq:" + stcd); - List newList = CollectionUtil.reverseNew(list); - newList.forEach(i -> { - GqDto dto = new GqDto(); - if ((flag && (dto = bladeRedis.get("gq:" + stcd)) != null && dto.getTM().before(i.getTM())) - || !flag) { - log.info("港渠redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); - rabbitTemplate.convertAndSend(JSONObject.toJSONString(GqDto.convert(i))); - bladeRedis.set("gq:" + stcd, i); - log.info("发送港渠的数据:{},数据:{}", stcd, JSONObject.toJSONString(GqDto.convert(i))); - return; + try{ + Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); + HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getGq().getRealUrl()); + request.body(JSONObject.toJSONString(listByPramMap)); + HttpResponse response = request.execute(); + log.info("港渠单个实时数据,stcd:{},数据:{}", stcd, response.body()); + List list = JSONObject.parseArray(response.body(), GqDto.class); + if (CollectionUtil.isNotEmpty(list)) { + Boolean flag = bladeRedis.exists("gq:" + stcd); + List newList = CollectionUtil.reverseNew(list); + newList.forEach(i -> { + GqDto dto = new GqDto(); + if ((flag && (dto = bladeRedis.get("gq:" + stcd)) != null && dto.getTM().before(i.getTM())) + || !flag) { + log.info("港渠redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); + rabbitTemplate.convertAndSend(JSONObject.toJSONString(GqDto.convert(i))); + bladeRedis.set("gq:" + stcd, i); + log.info("发送港渠的数据:{},数据:{}", stcd, JSONObject.toJSONString(GqDto.convert(i))); + return; + } + }); } - }); + }catch (Exception e){ + log.error("请求港渠的数据错误:stcd:{},exception:{}",stcd,e.getCause()); } } } diff --git a/src/main/java/org/springnewfiber/dataadapter/ziguang/hd/action/HdAction.java b/src/main/java/org/springnewfiber/dataadapter/ziguang/hd/action/HdAction.java index 5f9cbd1..e191c81 100644 --- a/src/main/java/org/springnewfiber/dataadapter/ziguang/hd/action/HdAction.java +++ b/src/main/java/org/springnewfiber/dataadapter/ziguang/hd/action/HdAction.java @@ -56,26 +56,31 @@ * @param stcd */ public void realTimeData(String stcd) { - Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd)); - HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getHd().getRealUrl()); - request.body(JSONObject.toJSONString(listByPramMap)); - HttpResponse response = request.execute(); - log.info("单个河道实时数据:stcd:{},数据:{}", stcd, response.body()); - List list = JSONObject.parseArray(response.body(), HdDto.class); - if (CollectionUtil.isNotEmpty(list)) { - Boolean flag = bladeRedis.exists("hd:" + stcd); - List newList = CollectionUtil.reverseNew(list); - newList.forEach(i -> { - HdDto dto = new HdDto(); - if ((flag && (dto = bladeRedis.get("hd:" + stcd)) != null && dto.getTM().before(i.getTM())) - || !flag) { - log.info("河道redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); - rabbitTemplate.convertAndSend(JSONObject.toJSONString(HdDto.convert(i))); - bladeRedis.set("hd:" + stcd,i); - log.info("发送河道的数据:{},数据:{}", stcd, JSONObject.toJSONString(HdDto.convert(i))); - return; - } - }); + try { + Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd)); + HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getHd().getRealUrl()); + request.body(JSONObject.toJSONString(listByPramMap)); + HttpResponse response = request.execute(); + log.info("单个河道实时数据:stcd:{},数据:{}", stcd, response.body()); + List list = JSONObject.parseArray(response.body(), HdDto.class); + if (CollectionUtil.isNotEmpty(list)) { + Boolean flag = bladeRedis.exists("hd:" + stcd); + List newList = CollectionUtil.reverseNew(list); + newList.forEach(i -> { + HdDto dto = new HdDto(); + if ((flag && (dto = bladeRedis.get("hd:" + stcd)) != null && dto.getTM().before(i.getTM())) + || !flag) { + log.info("河道redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); + rabbitTemplate.convertAndSend(JSONObject.toJSONString(HdDto.convert(i))); + bladeRedis.set("hd:" + stcd,i); + log.info("发送河道的数据:{},数据:{}", stcd, JSONObject.toJSONString(HdDto.convert(i))); + return; + } + }); + } + }catch (Exception e){ + log.error("请求河道的数据错误:stcd:{},exception:{}",stcd,e.getCause()); } + } } diff --git a/src/main/java/org/springnewfiber/dataadapter/ziguang/bz/action/BzAction.java b/src/main/java/org/springnewfiber/dataadapter/ziguang/bz/action/BzAction.java index 69e1b4c..86db886 100644 --- a/src/main/java/org/springnewfiber/dataadapter/ziguang/bz/action/BzAction.java +++ b/src/main/java/org/springnewfiber/dataadapter/ziguang/bz/action/BzAction.java @@ -55,25 +55,29 @@ * @param stcd */ public void realTimeData(String stcd) { - Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); - HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getBz().getRealUrl()); - request.body(JSONObject.toJSONString(listByPramMap)); - HttpResponse response = request.execute(); - log.info("泵站单个实时数据,stcd:{},数据:{}", stcd, response.body()); - List list = JSONObject.parseArray(response.body(), BzDto.class); - if (CollectionUtil.isNotEmpty(list)) { - Boolean flag = bladeRedis.exists("BZ:" + stcd); - List newList = CollectionUtil.reverseNew(list); - newList.forEach(i -> { - BzDto dto = new BzDto(); - if ((flag && (dto = bladeRedis.get("BZ:" + stcd)) != null && DateUtil.parse(dto.getTM()).before(DateUtil.parse(i.getTM()))) || !flag) { - log.info("泵站redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); - rabbitTemplate.convertAndSend(JSONObject.toJSONString(BzDto.convert(i))); - bladeRedis.set("BZ:" + stcd, i); - log.info("发送泵站的数据:code:{},data:{}", stcd, JSONObject.toJSONString(BzDto.convert(i))); - return; - } - }); + try { + Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); + HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getBz().getRealUrl()); + request.body(JSONObject.toJSONString(listByPramMap)); + HttpResponse response = request.execute(); + log.info("泵站单个实时数据,stcd:{},数据:{}", stcd, response.body()); + List list = JSONObject.parseArray(response.body(), BzDto.class); + if (CollectionUtil.isNotEmpty(list)) { + Boolean flag = bladeRedis.exists("BZ:" + stcd); + List newList = CollectionUtil.reverseNew(list); + newList.forEach(i -> { + BzDto dto = new BzDto(); + if ((flag && (dto = bladeRedis.get("BZ:" + stcd)) != null && DateUtil.parse(dto.getTM()).before(DateUtil.parse(i.getTM()))) || !flag) { + log.info("泵站redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); + rabbitTemplate.convertAndSend(JSONObject.toJSONString(BzDto.convert(i))); + bladeRedis.set("BZ:" + stcd, i); + log.info("发送泵站的数据:code:{},data:{}", stcd, JSONObject.toJSONString(BzDto.convert(i))); + return; + } + }); + } + } catch (Exception e) { + log.error("泵站数据出错:stcd:{},Cause:{}", stcd, e.getCause()); } } } diff --git a/src/main/java/org/springnewfiber/dataadapter/ziguang/gq/action/GqAction.java b/src/main/java/org/springnewfiber/dataadapter/ziguang/gq/action/GqAction.java index 768cbc4..4cdc82d 100644 --- a/src/main/java/org/springnewfiber/dataadapter/ziguang/gq/action/GqAction.java +++ b/src/main/java/org/springnewfiber/dataadapter/ziguang/gq/action/GqAction.java @@ -57,26 +57,30 @@ * @param stcd */ public void realTimeData(String stcd) { - Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); - HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getGq().getRealUrl()); - request.body(JSONObject.toJSONString(listByPramMap)); - HttpResponse response = request.execute(); - log.info("港渠单个实时数据,stcd:{},数据:{}", stcd, response.body()); - List list = JSONObject.parseArray(response.body(), GqDto.class); - if (CollectionUtil.isNotEmpty(list)) { - Boolean flag = bladeRedis.exists("gq:" + stcd); - List newList = CollectionUtil.reverseNew(list); - newList.forEach(i -> { - GqDto dto = new GqDto(); - if ((flag && (dto = bladeRedis.get("gq:" + stcd)) != null && dto.getTM().before(i.getTM())) - || !flag) { - log.info("港渠redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); - rabbitTemplate.convertAndSend(JSONObject.toJSONString(GqDto.convert(i))); - bladeRedis.set("gq:" + stcd, i); - log.info("发送港渠的数据:{},数据:{}", stcd, JSONObject.toJSONString(GqDto.convert(i))); - return; + try{ + Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); + HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getGq().getRealUrl()); + request.body(JSONObject.toJSONString(listByPramMap)); + HttpResponse response = request.execute(); + log.info("港渠单个实时数据,stcd:{},数据:{}", stcd, response.body()); + List list = JSONObject.parseArray(response.body(), GqDto.class); + if (CollectionUtil.isNotEmpty(list)) { + Boolean flag = bladeRedis.exists("gq:" + stcd); + List newList = CollectionUtil.reverseNew(list); + newList.forEach(i -> { + GqDto dto = new GqDto(); + if ((flag && (dto = bladeRedis.get("gq:" + stcd)) != null && dto.getTM().before(i.getTM())) + || !flag) { + log.info("港渠redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); + rabbitTemplate.convertAndSend(JSONObject.toJSONString(GqDto.convert(i))); + bladeRedis.set("gq:" + stcd, i); + log.info("发送港渠的数据:{},数据:{}", stcd, JSONObject.toJSONString(GqDto.convert(i))); + return; + } + }); } - }); + }catch (Exception e){ + log.error("请求港渠的数据错误:stcd:{},exception:{}",stcd,e.getCause()); } } } diff --git a/src/main/java/org/springnewfiber/dataadapter/ziguang/hd/action/HdAction.java b/src/main/java/org/springnewfiber/dataadapter/ziguang/hd/action/HdAction.java index 5f9cbd1..e191c81 100644 --- a/src/main/java/org/springnewfiber/dataadapter/ziguang/hd/action/HdAction.java +++ b/src/main/java/org/springnewfiber/dataadapter/ziguang/hd/action/HdAction.java @@ -56,26 +56,31 @@ * @param stcd */ public void realTimeData(String stcd) { - Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd)); - HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getHd().getRealUrl()); - request.body(JSONObject.toJSONString(listByPramMap)); - HttpResponse response = request.execute(); - log.info("单个河道实时数据:stcd:{},数据:{}", stcd, response.body()); - List list = JSONObject.parseArray(response.body(), HdDto.class); - if (CollectionUtil.isNotEmpty(list)) { - Boolean flag = bladeRedis.exists("hd:" + stcd); - List newList = CollectionUtil.reverseNew(list); - newList.forEach(i -> { - HdDto dto = new HdDto(); - if ((flag && (dto = bladeRedis.get("hd:" + stcd)) != null && dto.getTM().before(i.getTM())) - || !flag) { - log.info("河道redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); - rabbitTemplate.convertAndSend(JSONObject.toJSONString(HdDto.convert(i))); - bladeRedis.set("hd:" + stcd,i); - log.info("发送河道的数据:{},数据:{}", stcd, JSONObject.toJSONString(HdDto.convert(i))); - return; - } - }); + try { + Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd)); + HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getHd().getRealUrl()); + request.body(JSONObject.toJSONString(listByPramMap)); + HttpResponse response = request.execute(); + log.info("单个河道实时数据:stcd:{},数据:{}", stcd, response.body()); + List list = JSONObject.parseArray(response.body(), HdDto.class); + if (CollectionUtil.isNotEmpty(list)) { + Boolean flag = bladeRedis.exists("hd:" + stcd); + List newList = CollectionUtil.reverseNew(list); + newList.forEach(i -> { + HdDto dto = new HdDto(); + if ((flag && (dto = bladeRedis.get("hd:" + stcd)) != null && dto.getTM().before(i.getTM())) + || !flag) { + log.info("河道redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); + rabbitTemplate.convertAndSend(JSONObject.toJSONString(HdDto.convert(i))); + bladeRedis.set("hd:" + stcd,i); + log.info("发送河道的数据:{},数据:{}", stcd, JSONObject.toJSONString(HdDto.convert(i))); + return; + } + }); + } + }catch (Exception e){ + log.error("请求河道的数据错误:stcd:{},exception:{}",stcd,e.getCause()); } + } } diff --git a/src/main/java/org/springnewfiber/dataadapter/ziguang/hp/action/HpAction.java b/src/main/java/org/springnewfiber/dataadapter/ziguang/hp/action/HpAction.java index 558f684..45cf4f8 100644 --- a/src/main/java/org/springnewfiber/dataadapter/ziguang/hp/action/HpAction.java +++ b/src/main/java/org/springnewfiber/dataadapter/ziguang/hp/action/HpAction.java @@ -58,26 +58,31 @@ * @param stcd */ public void realTimeData(String stcd) { - Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); - HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getHp().getRealUrl()); - request.body(JSONObject.toJSONString(listByPramMap)); - HttpResponse response = request.execute(); - log.info("单个湖泊实时数据:stcd:{},数据:{}", stcd, response.body()); - List list = JSONObject.parseArray(response.body(), HpDto.class); - if (CollectionUtil.isNotEmpty(list)) { - Boolean flag = bladeRedis.exists("hp:" + stcd); - List newList = CollectionUtil.reverseNew(list); - newList.forEach(i -> { - HpDto dto = new HpDto(); - if ((flag && (dto = bladeRedis.get("hp:" + stcd)) != null && dto.getTM().before(i.getTM())) - || !flag) { - log.info("湖泊redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); - rabbitTemplate.convertAndSend(JSONObject.toJSONString(HpDto.convert(i))); - bladeRedis.set("hp:" + stcd, i); - log.info("发送湖泊的数据:{},数据:{}", stcd, JSONObject.toJSONString(HpDto.convert(i))); - return; - } - }); + try{ + Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); + HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getHp().getRealUrl()); + request.body(JSONObject.toJSONString(listByPramMap)); + HttpResponse response = request.execute(); + log.info("单个湖泊实时数据:stcd:{},数据:{}", stcd, response.body()); + List list = JSONObject.parseArray(response.body(), HpDto.class); + if (CollectionUtil.isNotEmpty(list)) { + Boolean flag = bladeRedis.exists("hp:" + stcd); + List newList = CollectionUtil.reverseNew(list); + newList.forEach(i -> { + HpDto dto = new HpDto(); + if ((flag && (dto = bladeRedis.get("hp:" + stcd)) != null && dto.getTM().before(i.getTM())) + || !flag) { + log.info("湖泊redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); + rabbitTemplate.convertAndSend(JSONObject.toJSONString(HpDto.convert(i))); + bladeRedis.set("hp:" + stcd, i); + log.info("发送湖泊的数据:{},数据:{}", stcd, JSONObject.toJSONString(HpDto.convert(i))); + return; + } + }); + } + }catch (Exception e){ + log.error("请求湖泊错误:stcd:{},exception:{}",stcd,e.getCause()); } + } } diff --git a/src/main/java/org/springnewfiber/dataadapter/ziguang/bz/action/BzAction.java b/src/main/java/org/springnewfiber/dataadapter/ziguang/bz/action/BzAction.java index 69e1b4c..86db886 100644 --- a/src/main/java/org/springnewfiber/dataadapter/ziguang/bz/action/BzAction.java +++ b/src/main/java/org/springnewfiber/dataadapter/ziguang/bz/action/BzAction.java @@ -55,25 +55,29 @@ * @param stcd */ public void realTimeData(String stcd) { - Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); - HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getBz().getRealUrl()); - request.body(JSONObject.toJSONString(listByPramMap)); - HttpResponse response = request.execute(); - log.info("泵站单个实时数据,stcd:{},数据:{}", stcd, response.body()); - List list = JSONObject.parseArray(response.body(), BzDto.class); - if (CollectionUtil.isNotEmpty(list)) { - Boolean flag = bladeRedis.exists("BZ:" + stcd); - List newList = CollectionUtil.reverseNew(list); - newList.forEach(i -> { - BzDto dto = new BzDto(); - if ((flag && (dto = bladeRedis.get("BZ:" + stcd)) != null && DateUtil.parse(dto.getTM()).before(DateUtil.parse(i.getTM()))) || !flag) { - log.info("泵站redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); - rabbitTemplate.convertAndSend(JSONObject.toJSONString(BzDto.convert(i))); - bladeRedis.set("BZ:" + stcd, i); - log.info("发送泵站的数据:code:{},data:{}", stcd, JSONObject.toJSONString(BzDto.convert(i))); - return; - } - }); + try { + Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); + HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getBz().getRealUrl()); + request.body(JSONObject.toJSONString(listByPramMap)); + HttpResponse response = request.execute(); + log.info("泵站单个实时数据,stcd:{},数据:{}", stcd, response.body()); + List list = JSONObject.parseArray(response.body(), BzDto.class); + if (CollectionUtil.isNotEmpty(list)) { + Boolean flag = bladeRedis.exists("BZ:" + stcd); + List newList = CollectionUtil.reverseNew(list); + newList.forEach(i -> { + BzDto dto = new BzDto(); + if ((flag && (dto = bladeRedis.get("BZ:" + stcd)) != null && DateUtil.parse(dto.getTM()).before(DateUtil.parse(i.getTM()))) || !flag) { + log.info("泵站redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); + rabbitTemplate.convertAndSend(JSONObject.toJSONString(BzDto.convert(i))); + bladeRedis.set("BZ:" + stcd, i); + log.info("发送泵站的数据:code:{},data:{}", stcd, JSONObject.toJSONString(BzDto.convert(i))); + return; + } + }); + } + } catch (Exception e) { + log.error("泵站数据出错:stcd:{},Cause:{}", stcd, e.getCause()); } } } diff --git a/src/main/java/org/springnewfiber/dataadapter/ziguang/gq/action/GqAction.java b/src/main/java/org/springnewfiber/dataadapter/ziguang/gq/action/GqAction.java index 768cbc4..4cdc82d 100644 --- a/src/main/java/org/springnewfiber/dataadapter/ziguang/gq/action/GqAction.java +++ b/src/main/java/org/springnewfiber/dataadapter/ziguang/gq/action/GqAction.java @@ -57,26 +57,30 @@ * @param stcd */ public void realTimeData(String stcd) { - Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); - HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getGq().getRealUrl()); - request.body(JSONObject.toJSONString(listByPramMap)); - HttpResponse response = request.execute(); - log.info("港渠单个实时数据,stcd:{},数据:{}", stcd, response.body()); - List list = JSONObject.parseArray(response.body(), GqDto.class); - if (CollectionUtil.isNotEmpty(list)) { - Boolean flag = bladeRedis.exists("gq:" + stcd); - List newList = CollectionUtil.reverseNew(list); - newList.forEach(i -> { - GqDto dto = new GqDto(); - if ((flag && (dto = bladeRedis.get("gq:" + stcd)) != null && dto.getTM().before(i.getTM())) - || !flag) { - log.info("港渠redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); - rabbitTemplate.convertAndSend(JSONObject.toJSONString(GqDto.convert(i))); - bladeRedis.set("gq:" + stcd, i); - log.info("发送港渠的数据:{},数据:{}", stcd, JSONObject.toJSONString(GqDto.convert(i))); - return; + try{ + Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); + HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getGq().getRealUrl()); + request.body(JSONObject.toJSONString(listByPramMap)); + HttpResponse response = request.execute(); + log.info("港渠单个实时数据,stcd:{},数据:{}", stcd, response.body()); + List list = JSONObject.parseArray(response.body(), GqDto.class); + if (CollectionUtil.isNotEmpty(list)) { + Boolean flag = bladeRedis.exists("gq:" + stcd); + List newList = CollectionUtil.reverseNew(list); + newList.forEach(i -> { + GqDto dto = new GqDto(); + if ((flag && (dto = bladeRedis.get("gq:" + stcd)) != null && dto.getTM().before(i.getTM())) + || !flag) { + log.info("港渠redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); + rabbitTemplate.convertAndSend(JSONObject.toJSONString(GqDto.convert(i))); + bladeRedis.set("gq:" + stcd, i); + log.info("发送港渠的数据:{},数据:{}", stcd, JSONObject.toJSONString(GqDto.convert(i))); + return; + } + }); } - }); + }catch (Exception e){ + log.error("请求港渠的数据错误:stcd:{},exception:{}",stcd,e.getCause()); } } } diff --git a/src/main/java/org/springnewfiber/dataadapter/ziguang/hd/action/HdAction.java b/src/main/java/org/springnewfiber/dataadapter/ziguang/hd/action/HdAction.java index 5f9cbd1..e191c81 100644 --- a/src/main/java/org/springnewfiber/dataadapter/ziguang/hd/action/HdAction.java +++ b/src/main/java/org/springnewfiber/dataadapter/ziguang/hd/action/HdAction.java @@ -56,26 +56,31 @@ * @param stcd */ public void realTimeData(String stcd) { - Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd)); - HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getHd().getRealUrl()); - request.body(JSONObject.toJSONString(listByPramMap)); - HttpResponse response = request.execute(); - log.info("单个河道实时数据:stcd:{},数据:{}", stcd, response.body()); - List list = JSONObject.parseArray(response.body(), HdDto.class); - if (CollectionUtil.isNotEmpty(list)) { - Boolean flag = bladeRedis.exists("hd:" + stcd); - List newList = CollectionUtil.reverseNew(list); - newList.forEach(i -> { - HdDto dto = new HdDto(); - if ((flag && (dto = bladeRedis.get("hd:" + stcd)) != null && dto.getTM().before(i.getTM())) - || !flag) { - log.info("河道redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); - rabbitTemplate.convertAndSend(JSONObject.toJSONString(HdDto.convert(i))); - bladeRedis.set("hd:" + stcd,i); - log.info("发送河道的数据:{},数据:{}", stcd, JSONObject.toJSONString(HdDto.convert(i))); - return; - } - }); + try { + Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd)); + HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getHd().getRealUrl()); + request.body(JSONObject.toJSONString(listByPramMap)); + HttpResponse response = request.execute(); + log.info("单个河道实时数据:stcd:{},数据:{}", stcd, response.body()); + List list = JSONObject.parseArray(response.body(), HdDto.class); + if (CollectionUtil.isNotEmpty(list)) { + Boolean flag = bladeRedis.exists("hd:" + stcd); + List newList = CollectionUtil.reverseNew(list); + newList.forEach(i -> { + HdDto dto = new HdDto(); + if ((flag && (dto = bladeRedis.get("hd:" + stcd)) != null && dto.getTM().before(i.getTM())) + || !flag) { + log.info("河道redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); + rabbitTemplate.convertAndSend(JSONObject.toJSONString(HdDto.convert(i))); + bladeRedis.set("hd:" + stcd,i); + log.info("发送河道的数据:{},数据:{}", stcd, JSONObject.toJSONString(HdDto.convert(i))); + return; + } + }); + } + }catch (Exception e){ + log.error("请求河道的数据错误:stcd:{},exception:{}",stcd,e.getCause()); } + } } diff --git a/src/main/java/org/springnewfiber/dataadapter/ziguang/hp/action/HpAction.java b/src/main/java/org/springnewfiber/dataadapter/ziguang/hp/action/HpAction.java index 558f684..45cf4f8 100644 --- a/src/main/java/org/springnewfiber/dataadapter/ziguang/hp/action/HpAction.java +++ b/src/main/java/org/springnewfiber/dataadapter/ziguang/hp/action/HpAction.java @@ -58,26 +58,31 @@ * @param stcd */ public void realTimeData(String stcd) { - Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); - HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getHp().getRealUrl()); - request.body(JSONObject.toJSONString(listByPramMap)); - HttpResponse response = request.execute(); - log.info("单个湖泊实时数据:stcd:{},数据:{}", stcd, response.body()); - List list = JSONObject.parseArray(response.body(), HpDto.class); - if (CollectionUtil.isNotEmpty(list)) { - Boolean flag = bladeRedis.exists("hp:" + stcd); - List newList = CollectionUtil.reverseNew(list); - newList.forEach(i -> { - HpDto dto = new HpDto(); - if ((flag && (dto = bladeRedis.get("hp:" + stcd)) != null && dto.getTM().before(i.getTM())) - || !flag) { - log.info("湖泊redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); - rabbitTemplate.convertAndSend(JSONObject.toJSONString(HpDto.convert(i))); - bladeRedis.set("hp:" + stcd, i); - log.info("发送湖泊的数据:{},数据:{}", stcd, JSONObject.toJSONString(HpDto.convert(i))); - return; - } - }); + try{ + Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); + HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getHp().getRealUrl()); + request.body(JSONObject.toJSONString(listByPramMap)); + HttpResponse response = request.execute(); + log.info("单个湖泊实时数据:stcd:{},数据:{}", stcd, response.body()); + List list = JSONObject.parseArray(response.body(), HpDto.class); + if (CollectionUtil.isNotEmpty(list)) { + Boolean flag = bladeRedis.exists("hp:" + stcd); + List newList = CollectionUtil.reverseNew(list); + newList.forEach(i -> { + HpDto dto = new HpDto(); + if ((flag && (dto = bladeRedis.get("hp:" + stcd)) != null && dto.getTM().before(i.getTM())) + || !flag) { + log.info("湖泊redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); + rabbitTemplate.convertAndSend(JSONObject.toJSONString(HpDto.convert(i))); + bladeRedis.set("hp:" + stcd, i); + log.info("发送湖泊的数据:{},数据:{}", stcd, JSONObject.toJSONString(HpDto.convert(i))); + return; + } + }); + } + }catch (Exception e){ + log.error("请求湖泊错误:stcd:{},exception:{}",stcd,e.getCause()); } + } } diff --git a/src/main/java/org/springnewfiber/dataadapter/ziguang/yl/action/YlAction.java b/src/main/java/org/springnewfiber/dataadapter/ziguang/yl/action/YlAction.java index 4722f1e..5bb8d6c 100644 --- a/src/main/java/org/springnewfiber/dataadapter/ziguang/yl/action/YlAction.java +++ b/src/main/java/org/springnewfiber/dataadapter/ziguang/yl/action/YlAction.java @@ -15,10 +15,16 @@ import org.springnewfiber.dataadapter.ziguang.common.LoginAction; import org.springnewfiber.dataadapter.ziguang.common.ZiGuangConfig; import org.springnewfiber.dataadapter.ziguang.yl.dto.YlDto; +import org.springnewfiber.dataadapter.ziguang.yl.util.RainfallAccumulationUtil; +import java.math.BigDecimal; import java.util.Date; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * @program: newfiber-data-adapter @@ -36,6 +42,8 @@ private static Map baseListPramMap; private final RabbitTemplate rabbitTemplate; + private final RainfallAccumulationUtil rainfallAccumulationUtil = new RainfallAccumulationUtil(); + { baseListPramMap = Maps.newHashMap(); Map pramChildMap = Maps.newHashMap(); @@ -69,29 +77,42 @@ * @param stcd */ public void realTimeData(String stcd) { - Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); - HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getYl().getRealUrl()); - request.body(JSONObject.toJSONString(listByPramMap)); - HttpResponse response = request.execute(); - log.info("单个雨量实时数据:stcd:{},数据:{}", stcd, response.body()); - List list = JSONObject.parseArray(response.body(), YlDto.class); - if (CollectionUtil.isNotEmpty(list)) { - Boolean flag = bladeRedis.exists("yl:" + stcd); - List newList = CollectionUtil.reverseNew(list); - newList.forEach(i -> { - YlDto dto = new YlDto(); - if ((flag && (dto = bladeRedis.get("yl:" + stcd)) != null && dto.getTM().before(i.getTM())) || !flag) { - log.info("雨量redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); - getData(stcd, i.getTM(), i); - rabbitTemplate.convertAndSend(JSONObject.toJSONString(YlDto.convert(i))); - bladeRedis.set("yl:" + stcd, i); - log.info("发送雨量的数据:{},数据:{}", stcd, JSONObject.toJSONString(YlDto.convert(i))); - return; - } - }); + try { + Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); + HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getYl().getRealUrl()); + request.body(JSONObject.toJSONString(listByPramMap)); + HttpResponse response = request.execute(); + log.info("单个雨量实时数据:stcd:{},数据:{}", stcd, response.body()); + List list = JSONObject.parseArray(response.body(), YlDto.class); + if (CollectionUtil.isNotEmpty(list)) { + Boolean flag = bladeRedis.exists("yl:" + stcd); + List newList = CollectionUtil.reverseNew(list); + newList.forEach(i -> { + YlDto dto = new YlDto(); + if ((flag && (dto = bladeRedis.get("yl:" + stcd)) != null && dto.getTM().before(i.getTM())) || !flag) { + log.info("雨量redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); +// getData(stcd, i.getTM(), i); + Future future = rainfallAccumulationUtil.submit(i, i.getSTCD()); + //自己累计雨量 nb-plus + i.setCountPt(dto.getCountPt().add(new BigDecimal(i.getDRP()))); + try { + future.get(60, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + log.error("累计错误:stcd:{},Cause:{}", stcd, e.getCause()); + } + rabbitTemplate.convertAndSend(JSONObject.toJSONString(YlDto.convert(i))); + bladeRedis.set("yl:" + stcd, i); + log.info("发送雨量的数据:{},数据:{}", stcd, JSONObject.toJSONString(YlDto.convert(i))); + return; + } + }); + } + } catch (Exception e) { + log.error("雨量数据出错:stcd:{},Cause:{}", stcd, e.getCause()); } } + @Deprecated private void getData(String stcd, Date TM, YlDto dto) { Float one = gethourData(stcd, TM, ziGuangConfig.getYl().getOneHourUrl()); Float Three = gethourData(stcd, TM, ziGuangConfig.getYl().getThreeHourUrl()); @@ -104,6 +125,7 @@ //todo 历史每个月查询 } + @Deprecated private Float gethourData(String stcd, Date TM, String url) { Map childPramMap = Maps.newHashMap(); childPramMap.put("stcd", stcd); @@ -117,8 +139,8 @@ HttpResponse response = request.execute(); log.info("单个雨量小时数据:stcd:{},数据:{}", stcd, response.body()); List list = JSONObject.parseArray(response.body(), YlDto.class); - log.info("分钟级别时间:{},小时级别时间:{}", list.get(0).getTM(), TM); if (CollectionUtil.isNotEmpty(list) && list.get(0).getTM().compareTo(TM) == 0) { + log.info("分钟级别时间:{},小时级别时间:{}", list.get(0).getTM(), TM); return list.get(0).getDRP(); } return null; diff --git a/src/main/java/org/springnewfiber/dataadapter/ziguang/bz/action/BzAction.java b/src/main/java/org/springnewfiber/dataadapter/ziguang/bz/action/BzAction.java index 69e1b4c..86db886 100644 --- a/src/main/java/org/springnewfiber/dataadapter/ziguang/bz/action/BzAction.java +++ b/src/main/java/org/springnewfiber/dataadapter/ziguang/bz/action/BzAction.java @@ -55,25 +55,29 @@ * @param stcd */ public void realTimeData(String stcd) { - Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); - HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getBz().getRealUrl()); - request.body(JSONObject.toJSONString(listByPramMap)); - HttpResponse response = request.execute(); - log.info("泵站单个实时数据,stcd:{},数据:{}", stcd, response.body()); - List list = JSONObject.parseArray(response.body(), BzDto.class); - if (CollectionUtil.isNotEmpty(list)) { - Boolean flag = bladeRedis.exists("BZ:" + stcd); - List newList = CollectionUtil.reverseNew(list); - newList.forEach(i -> { - BzDto dto = new BzDto(); - if ((flag && (dto = bladeRedis.get("BZ:" + stcd)) != null && DateUtil.parse(dto.getTM()).before(DateUtil.parse(i.getTM()))) || !flag) { - log.info("泵站redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); - rabbitTemplate.convertAndSend(JSONObject.toJSONString(BzDto.convert(i))); - bladeRedis.set("BZ:" + stcd, i); - log.info("发送泵站的数据:code:{},data:{}", stcd, JSONObject.toJSONString(BzDto.convert(i))); - return; - } - }); + try { + Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); + HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getBz().getRealUrl()); + request.body(JSONObject.toJSONString(listByPramMap)); + HttpResponse response = request.execute(); + log.info("泵站单个实时数据,stcd:{},数据:{}", stcd, response.body()); + List list = JSONObject.parseArray(response.body(), BzDto.class); + if (CollectionUtil.isNotEmpty(list)) { + Boolean flag = bladeRedis.exists("BZ:" + stcd); + List newList = CollectionUtil.reverseNew(list); + newList.forEach(i -> { + BzDto dto = new BzDto(); + if ((flag && (dto = bladeRedis.get("BZ:" + stcd)) != null && DateUtil.parse(dto.getTM()).before(DateUtil.parse(i.getTM()))) || !flag) { + log.info("泵站redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); + rabbitTemplate.convertAndSend(JSONObject.toJSONString(BzDto.convert(i))); + bladeRedis.set("BZ:" + stcd, i); + log.info("发送泵站的数据:code:{},data:{}", stcd, JSONObject.toJSONString(BzDto.convert(i))); + return; + } + }); + } + } catch (Exception e) { + log.error("泵站数据出错:stcd:{},Cause:{}", stcd, e.getCause()); } } } diff --git a/src/main/java/org/springnewfiber/dataadapter/ziguang/gq/action/GqAction.java b/src/main/java/org/springnewfiber/dataadapter/ziguang/gq/action/GqAction.java index 768cbc4..4cdc82d 100644 --- a/src/main/java/org/springnewfiber/dataadapter/ziguang/gq/action/GqAction.java +++ b/src/main/java/org/springnewfiber/dataadapter/ziguang/gq/action/GqAction.java @@ -57,26 +57,30 @@ * @param stcd */ public void realTimeData(String stcd) { - Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); - HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getGq().getRealUrl()); - request.body(JSONObject.toJSONString(listByPramMap)); - HttpResponse response = request.execute(); - log.info("港渠单个实时数据,stcd:{},数据:{}", stcd, response.body()); - List list = JSONObject.parseArray(response.body(), GqDto.class); - if (CollectionUtil.isNotEmpty(list)) { - Boolean flag = bladeRedis.exists("gq:" + stcd); - List newList = CollectionUtil.reverseNew(list); - newList.forEach(i -> { - GqDto dto = new GqDto(); - if ((flag && (dto = bladeRedis.get("gq:" + stcd)) != null && dto.getTM().before(i.getTM())) - || !flag) { - log.info("港渠redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); - rabbitTemplate.convertAndSend(JSONObject.toJSONString(GqDto.convert(i))); - bladeRedis.set("gq:" + stcd, i); - log.info("发送港渠的数据:{},数据:{}", stcd, JSONObject.toJSONString(GqDto.convert(i))); - return; + try{ + Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); + HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getGq().getRealUrl()); + request.body(JSONObject.toJSONString(listByPramMap)); + HttpResponse response = request.execute(); + log.info("港渠单个实时数据,stcd:{},数据:{}", stcd, response.body()); + List list = JSONObject.parseArray(response.body(), GqDto.class); + if (CollectionUtil.isNotEmpty(list)) { + Boolean flag = bladeRedis.exists("gq:" + stcd); + List newList = CollectionUtil.reverseNew(list); + newList.forEach(i -> { + GqDto dto = new GqDto(); + if ((flag && (dto = bladeRedis.get("gq:" + stcd)) != null && dto.getTM().before(i.getTM())) + || !flag) { + log.info("港渠redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); + rabbitTemplate.convertAndSend(JSONObject.toJSONString(GqDto.convert(i))); + bladeRedis.set("gq:" + stcd, i); + log.info("发送港渠的数据:{},数据:{}", stcd, JSONObject.toJSONString(GqDto.convert(i))); + return; + } + }); } - }); + }catch (Exception e){ + log.error("请求港渠的数据错误:stcd:{},exception:{}",stcd,e.getCause()); } } } diff --git a/src/main/java/org/springnewfiber/dataadapter/ziguang/hd/action/HdAction.java b/src/main/java/org/springnewfiber/dataadapter/ziguang/hd/action/HdAction.java index 5f9cbd1..e191c81 100644 --- a/src/main/java/org/springnewfiber/dataadapter/ziguang/hd/action/HdAction.java +++ b/src/main/java/org/springnewfiber/dataadapter/ziguang/hd/action/HdAction.java @@ -56,26 +56,31 @@ * @param stcd */ public void realTimeData(String stcd) { - Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd)); - HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getHd().getRealUrl()); - request.body(JSONObject.toJSONString(listByPramMap)); - HttpResponse response = request.execute(); - log.info("单个河道实时数据:stcd:{},数据:{}", stcd, response.body()); - List list = JSONObject.parseArray(response.body(), HdDto.class); - if (CollectionUtil.isNotEmpty(list)) { - Boolean flag = bladeRedis.exists("hd:" + stcd); - List newList = CollectionUtil.reverseNew(list); - newList.forEach(i -> { - HdDto dto = new HdDto(); - if ((flag && (dto = bladeRedis.get("hd:" + stcd)) != null && dto.getTM().before(i.getTM())) - || !flag) { - log.info("河道redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); - rabbitTemplate.convertAndSend(JSONObject.toJSONString(HdDto.convert(i))); - bladeRedis.set("hd:" + stcd,i); - log.info("发送河道的数据:{},数据:{}", stcd, JSONObject.toJSONString(HdDto.convert(i))); - return; - } - }); + try { + Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd)); + HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getHd().getRealUrl()); + request.body(JSONObject.toJSONString(listByPramMap)); + HttpResponse response = request.execute(); + log.info("单个河道实时数据:stcd:{},数据:{}", stcd, response.body()); + List list = JSONObject.parseArray(response.body(), HdDto.class); + if (CollectionUtil.isNotEmpty(list)) { + Boolean flag = bladeRedis.exists("hd:" + stcd); + List newList = CollectionUtil.reverseNew(list); + newList.forEach(i -> { + HdDto dto = new HdDto(); + if ((flag && (dto = bladeRedis.get("hd:" + stcd)) != null && dto.getTM().before(i.getTM())) + || !flag) { + log.info("河道redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); + rabbitTemplate.convertAndSend(JSONObject.toJSONString(HdDto.convert(i))); + bladeRedis.set("hd:" + stcd,i); + log.info("发送河道的数据:{},数据:{}", stcd, JSONObject.toJSONString(HdDto.convert(i))); + return; + } + }); + } + }catch (Exception e){ + log.error("请求河道的数据错误:stcd:{},exception:{}",stcd,e.getCause()); } + } } diff --git a/src/main/java/org/springnewfiber/dataadapter/ziguang/hp/action/HpAction.java b/src/main/java/org/springnewfiber/dataadapter/ziguang/hp/action/HpAction.java index 558f684..45cf4f8 100644 --- a/src/main/java/org/springnewfiber/dataadapter/ziguang/hp/action/HpAction.java +++ b/src/main/java/org/springnewfiber/dataadapter/ziguang/hp/action/HpAction.java @@ -58,26 +58,31 @@ * @param stcd */ public void realTimeData(String stcd) { - Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); - HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getHp().getRealUrl()); - request.body(JSONObject.toJSONString(listByPramMap)); - HttpResponse response = request.execute(); - log.info("单个湖泊实时数据:stcd:{},数据:{}", stcd, response.body()); - List list = JSONObject.parseArray(response.body(), HpDto.class); - if (CollectionUtil.isNotEmpty(list)) { - Boolean flag = bladeRedis.exists("hp:" + stcd); - List newList = CollectionUtil.reverseNew(list); - newList.forEach(i -> { - HpDto dto = new HpDto(); - if ((flag && (dto = bladeRedis.get("hp:" + stcd)) != null && dto.getTM().before(i.getTM())) - || !flag) { - log.info("湖泊redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); - rabbitTemplate.convertAndSend(JSONObject.toJSONString(HpDto.convert(i))); - bladeRedis.set("hp:" + stcd, i); - log.info("发送湖泊的数据:{},数据:{}", stcd, JSONObject.toJSONString(HpDto.convert(i))); - return; - } - }); + try{ + Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); + HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getHp().getRealUrl()); + request.body(JSONObject.toJSONString(listByPramMap)); + HttpResponse response = request.execute(); + log.info("单个湖泊实时数据:stcd:{},数据:{}", stcd, response.body()); + List list = JSONObject.parseArray(response.body(), HpDto.class); + if (CollectionUtil.isNotEmpty(list)) { + Boolean flag = bladeRedis.exists("hp:" + stcd); + List newList = CollectionUtil.reverseNew(list); + newList.forEach(i -> { + HpDto dto = new HpDto(); + if ((flag && (dto = bladeRedis.get("hp:" + stcd)) != null && dto.getTM().before(i.getTM())) + || !flag) { + log.info("湖泊redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); + rabbitTemplate.convertAndSend(JSONObject.toJSONString(HpDto.convert(i))); + bladeRedis.set("hp:" + stcd, i); + log.info("发送湖泊的数据:{},数据:{}", stcd, JSONObject.toJSONString(HpDto.convert(i))); + return; + } + }); + } + }catch (Exception e){ + log.error("请求湖泊错误:stcd:{},exception:{}",stcd,e.getCause()); } + } } diff --git a/src/main/java/org/springnewfiber/dataadapter/ziguang/yl/action/YlAction.java b/src/main/java/org/springnewfiber/dataadapter/ziguang/yl/action/YlAction.java index 4722f1e..5bb8d6c 100644 --- a/src/main/java/org/springnewfiber/dataadapter/ziguang/yl/action/YlAction.java +++ b/src/main/java/org/springnewfiber/dataadapter/ziguang/yl/action/YlAction.java @@ -15,10 +15,16 @@ import org.springnewfiber.dataadapter.ziguang.common.LoginAction; import org.springnewfiber.dataadapter.ziguang.common.ZiGuangConfig; import org.springnewfiber.dataadapter.ziguang.yl.dto.YlDto; +import org.springnewfiber.dataadapter.ziguang.yl.util.RainfallAccumulationUtil; +import java.math.BigDecimal; import java.util.Date; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * @program: newfiber-data-adapter @@ -36,6 +42,8 @@ private static Map baseListPramMap; private final RabbitTemplate rabbitTemplate; + private final RainfallAccumulationUtil rainfallAccumulationUtil = new RainfallAccumulationUtil(); + { baseListPramMap = Maps.newHashMap(); Map pramChildMap = Maps.newHashMap(); @@ -69,29 +77,42 @@ * @param stcd */ public void realTimeData(String stcd) { - Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); - HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getYl().getRealUrl()); - request.body(JSONObject.toJSONString(listByPramMap)); - HttpResponse response = request.execute(); - log.info("单个雨量实时数据:stcd:{},数据:{}", stcd, response.body()); - List list = JSONObject.parseArray(response.body(), YlDto.class); - if (CollectionUtil.isNotEmpty(list)) { - Boolean flag = bladeRedis.exists("yl:" + stcd); - List newList = CollectionUtil.reverseNew(list); - newList.forEach(i -> { - YlDto dto = new YlDto(); - if ((flag && (dto = bladeRedis.get("yl:" + stcd)) != null && dto.getTM().before(i.getTM())) || !flag) { - log.info("雨量redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); - getData(stcd, i.getTM(), i); - rabbitTemplate.convertAndSend(JSONObject.toJSONString(YlDto.convert(i))); - bladeRedis.set("yl:" + stcd, i); - log.info("发送雨量的数据:{},数据:{}", stcd, JSONObject.toJSONString(YlDto.convert(i))); - return; - } - }); + try { + Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); + HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getYl().getRealUrl()); + request.body(JSONObject.toJSONString(listByPramMap)); + HttpResponse response = request.execute(); + log.info("单个雨量实时数据:stcd:{},数据:{}", stcd, response.body()); + List list = JSONObject.parseArray(response.body(), YlDto.class); + if (CollectionUtil.isNotEmpty(list)) { + Boolean flag = bladeRedis.exists("yl:" + stcd); + List newList = CollectionUtil.reverseNew(list); + newList.forEach(i -> { + YlDto dto = new YlDto(); + if ((flag && (dto = bladeRedis.get("yl:" + stcd)) != null && dto.getTM().before(i.getTM())) || !flag) { + log.info("雨量redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); +// getData(stcd, i.getTM(), i); + Future future = rainfallAccumulationUtil.submit(i, i.getSTCD()); + //自己累计雨量 nb-plus + i.setCountPt(dto.getCountPt().add(new BigDecimal(i.getDRP()))); + try { + future.get(60, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + log.error("累计错误:stcd:{},Cause:{}", stcd, e.getCause()); + } + rabbitTemplate.convertAndSend(JSONObject.toJSONString(YlDto.convert(i))); + bladeRedis.set("yl:" + stcd, i); + log.info("发送雨量的数据:{},数据:{}", stcd, JSONObject.toJSONString(YlDto.convert(i))); + return; + } + }); + } + } catch (Exception e) { + log.error("雨量数据出错:stcd:{},Cause:{}", stcd, e.getCause()); } } + @Deprecated private void getData(String stcd, Date TM, YlDto dto) { Float one = gethourData(stcd, TM, ziGuangConfig.getYl().getOneHourUrl()); Float Three = gethourData(stcd, TM, ziGuangConfig.getYl().getThreeHourUrl()); @@ -104,6 +125,7 @@ //todo 历史每个月查询 } + @Deprecated private Float gethourData(String stcd, Date TM, String url) { Map childPramMap = Maps.newHashMap(); childPramMap.put("stcd", stcd); @@ -117,8 +139,8 @@ HttpResponse response = request.execute(); log.info("单个雨量小时数据:stcd:{},数据:{}", stcd, response.body()); List list = JSONObject.parseArray(response.body(), YlDto.class); - log.info("分钟级别时间:{},小时级别时间:{}", list.get(0).getTM(), TM); if (CollectionUtil.isNotEmpty(list) && list.get(0).getTM().compareTo(TM) == 0) { + log.info("分钟级别时间:{},小时级别时间:{}", list.get(0).getTM(), TM); return list.get(0).getDRP(); } return null; diff --git a/src/main/java/org/springnewfiber/dataadapter/ziguang/yl/dto/YlDto.java b/src/main/java/org/springnewfiber/dataadapter/ziguang/yl/dto/YlDto.java index 44e9dde..e85ca15 100644 --- a/src/main/java/org/springnewfiber/dataadapter/ziguang/yl/dto/YlDto.java +++ b/src/main/java/org/springnewfiber/dataadapter/ziguang/yl/dto/YlDto.java @@ -4,8 +4,8 @@ import lombok.Data; import org.springnewfiber.dataadapter.entity.MqNodeData; import org.springnewfiber.dataadapter.entity.PtReceiveBaseModel; -import org.springnewfiber.dataadapter.ziguang.hp.dto.HpDto; +import java.math.BigDecimal; import java.util.Date; import java.util.Map; @@ -40,53 +40,92 @@ private Float p1; private Float p3; private Float p6; + private Float p12; private Float pd; - public static String one="p1"; - public static String Three="p3"; - public static String six="p6"; - public static String day="pd"; - public static PtReceiveBaseModel convert(YlDto dto){ - PtReceiveBaseModel model=new PtReceiveBaseModel(); + private BigDecimal countPt=BigDecimal.ZERO; + public static String one = "p1"; + public static String Three = "p3"; + public static String six = "p6"; + public static String twelve = "p12"; + public static String day = "pd"; + public static String pt = "pt"; + + public static PtReceiveBaseModel convert(YlDto dto) { + PtReceiveBaseModel model = new PtReceiveBaseModel(); model.setSt(dto.getSTCD()); model.setTt(dto.getTM()); model.setUt(new Date()); model.setSn(""); - Map dataMap= Maps.newHashMap(); - MqNodeData data= new MqNodeData(); + Map dataMap = Maps.newHashMap(); + MqNodeData data = new MqNodeData(); data.setKey("drp"); data.setValue(dto.getDRP()); data.setSn("drp"); - if(dto.getP1()!=null){ - MqNodeData p1= new MqNodeData(); + if (dto.getP1() != null) { + MqNodeData p1 = new MqNodeData(); p1.setKey(YlDto.one); p1.setValue(dto.getP1()); p1.setSn(YlDto.one); - dataMap.put(YlDto.one,p1); + dataMap.put(YlDto.one, p1); } - if(dto.getP3()!=null){ - MqNodeData p1= new MqNodeData(); + if (dto.getP3() != null) { + MqNodeData p1 = new MqNodeData(); p1.setKey(YlDto.Three); p1.setValue(dto.getP3()); p1.setSn(YlDto.Three); - dataMap.put(YlDto.Three,p1); + dataMap.put(YlDto.Three, p1); } - if(dto.getP6()!=null){ - MqNodeData p1= new MqNodeData(); + if (dto.getP6() != null) { + MqNodeData p1 = new MqNodeData(); p1.setKey(YlDto.six); p1.setValue(dto.getP6()); p1.setSn(YlDto.six); - dataMap.put(YlDto.six,p1); + dataMap.put(YlDto.six, p1); } - if(dto.getPd()!=null){ - MqNodeData p1= new MqNodeData(); + if (dto.getP12() != null) { + MqNodeData p1 = new MqNodeData(); + p1.setKey(YlDto.twelve); + p1.setValue(dto.getP12()); + p1.setSn(YlDto.twelve); + dataMap.put(YlDto.twelve, p1); + } + if (dto.getPd() != null) { + MqNodeData p1 = new MqNodeData(); p1.setKey(YlDto.day); p1.setValue(dto.getPd()); p1.setSn(YlDto.day); - dataMap.put(YlDto.day,p1); + dataMap.put(YlDto.day, p1); } - dataMap.put("drp",data); + if (dto.getCountPt() != null) { + MqNodeData p1 = new MqNodeData(); + p1.setKey(YlDto.pt); + p1.setValue(dto.getCountPt()); + p1.setSn(YlDto.pt); + dataMap.put(YlDto.pt, p1); + } + dataMap.put("drp", data); model.setDataMap(dataMap); return model; } + /** + * 累计count + * + * @param size + * @param dto + * @param countBig + */ + public void setCountBig(int size, YlDto dto, BigDecimal countBig) { + if (size == 60 / 5) { + dto.setP1(countBig.floatValue()); + } else if (size == 60 * 3 / 5) { + dto.setP3(countBig.floatValue()); + } else if (size == 60 * 6 / 5) { + dto.setP6(countBig.floatValue()); + } else if (size == 60 * 12 / 5) { + dto.setP12(countBig.floatValue()); + } else if (size == 60 * 24 / 5) { + dto.setPd(countBig.floatValue()); + } + } } diff --git a/src/main/java/org/springnewfiber/dataadapter/ziguang/bz/action/BzAction.java b/src/main/java/org/springnewfiber/dataadapter/ziguang/bz/action/BzAction.java index 69e1b4c..86db886 100644 --- a/src/main/java/org/springnewfiber/dataadapter/ziguang/bz/action/BzAction.java +++ b/src/main/java/org/springnewfiber/dataadapter/ziguang/bz/action/BzAction.java @@ -55,25 +55,29 @@ * @param stcd */ public void realTimeData(String stcd) { - Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); - HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getBz().getRealUrl()); - request.body(JSONObject.toJSONString(listByPramMap)); - HttpResponse response = request.execute(); - log.info("泵站单个实时数据,stcd:{},数据:{}", stcd, response.body()); - List list = JSONObject.parseArray(response.body(), BzDto.class); - if (CollectionUtil.isNotEmpty(list)) { - Boolean flag = bladeRedis.exists("BZ:" + stcd); - List newList = CollectionUtil.reverseNew(list); - newList.forEach(i -> { - BzDto dto = new BzDto(); - if ((flag && (dto = bladeRedis.get("BZ:" + stcd)) != null && DateUtil.parse(dto.getTM()).before(DateUtil.parse(i.getTM()))) || !flag) { - log.info("泵站redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); - rabbitTemplate.convertAndSend(JSONObject.toJSONString(BzDto.convert(i))); - bladeRedis.set("BZ:" + stcd, i); - log.info("发送泵站的数据:code:{},data:{}", stcd, JSONObject.toJSONString(BzDto.convert(i))); - return; - } - }); + try { + Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); + HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getBz().getRealUrl()); + request.body(JSONObject.toJSONString(listByPramMap)); + HttpResponse response = request.execute(); + log.info("泵站单个实时数据,stcd:{},数据:{}", stcd, response.body()); + List list = JSONObject.parseArray(response.body(), BzDto.class); + if (CollectionUtil.isNotEmpty(list)) { + Boolean flag = bladeRedis.exists("BZ:" + stcd); + List newList = CollectionUtil.reverseNew(list); + newList.forEach(i -> { + BzDto dto = new BzDto(); + if ((flag && (dto = bladeRedis.get("BZ:" + stcd)) != null && DateUtil.parse(dto.getTM()).before(DateUtil.parse(i.getTM()))) || !flag) { + log.info("泵站redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); + rabbitTemplate.convertAndSend(JSONObject.toJSONString(BzDto.convert(i))); + bladeRedis.set("BZ:" + stcd, i); + log.info("发送泵站的数据:code:{},data:{}", stcd, JSONObject.toJSONString(BzDto.convert(i))); + return; + } + }); + } + } catch (Exception e) { + log.error("泵站数据出错:stcd:{},Cause:{}", stcd, e.getCause()); } } } diff --git a/src/main/java/org/springnewfiber/dataadapter/ziguang/gq/action/GqAction.java b/src/main/java/org/springnewfiber/dataadapter/ziguang/gq/action/GqAction.java index 768cbc4..4cdc82d 100644 --- a/src/main/java/org/springnewfiber/dataadapter/ziguang/gq/action/GqAction.java +++ b/src/main/java/org/springnewfiber/dataadapter/ziguang/gq/action/GqAction.java @@ -57,26 +57,30 @@ * @param stcd */ public void realTimeData(String stcd) { - Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); - HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getGq().getRealUrl()); - request.body(JSONObject.toJSONString(listByPramMap)); - HttpResponse response = request.execute(); - log.info("港渠单个实时数据,stcd:{},数据:{}", stcd, response.body()); - List list = JSONObject.parseArray(response.body(), GqDto.class); - if (CollectionUtil.isNotEmpty(list)) { - Boolean flag = bladeRedis.exists("gq:" + stcd); - List newList = CollectionUtil.reverseNew(list); - newList.forEach(i -> { - GqDto dto = new GqDto(); - if ((flag && (dto = bladeRedis.get("gq:" + stcd)) != null && dto.getTM().before(i.getTM())) - || !flag) { - log.info("港渠redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); - rabbitTemplate.convertAndSend(JSONObject.toJSONString(GqDto.convert(i))); - bladeRedis.set("gq:" + stcd, i); - log.info("发送港渠的数据:{},数据:{}", stcd, JSONObject.toJSONString(GqDto.convert(i))); - return; + try{ + Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); + HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getGq().getRealUrl()); + request.body(JSONObject.toJSONString(listByPramMap)); + HttpResponse response = request.execute(); + log.info("港渠单个实时数据,stcd:{},数据:{}", stcd, response.body()); + List list = JSONObject.parseArray(response.body(), GqDto.class); + if (CollectionUtil.isNotEmpty(list)) { + Boolean flag = bladeRedis.exists("gq:" + stcd); + List newList = CollectionUtil.reverseNew(list); + newList.forEach(i -> { + GqDto dto = new GqDto(); + if ((flag && (dto = bladeRedis.get("gq:" + stcd)) != null && dto.getTM().before(i.getTM())) + || !flag) { + log.info("港渠redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); + rabbitTemplate.convertAndSend(JSONObject.toJSONString(GqDto.convert(i))); + bladeRedis.set("gq:" + stcd, i); + log.info("发送港渠的数据:{},数据:{}", stcd, JSONObject.toJSONString(GqDto.convert(i))); + return; + } + }); } - }); + }catch (Exception e){ + log.error("请求港渠的数据错误:stcd:{},exception:{}",stcd,e.getCause()); } } } diff --git a/src/main/java/org/springnewfiber/dataadapter/ziguang/hd/action/HdAction.java b/src/main/java/org/springnewfiber/dataadapter/ziguang/hd/action/HdAction.java index 5f9cbd1..e191c81 100644 --- a/src/main/java/org/springnewfiber/dataadapter/ziguang/hd/action/HdAction.java +++ b/src/main/java/org/springnewfiber/dataadapter/ziguang/hd/action/HdAction.java @@ -56,26 +56,31 @@ * @param stcd */ public void realTimeData(String stcd) { - Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd)); - HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getHd().getRealUrl()); - request.body(JSONObject.toJSONString(listByPramMap)); - HttpResponse response = request.execute(); - log.info("单个河道实时数据:stcd:{},数据:{}", stcd, response.body()); - List list = JSONObject.parseArray(response.body(), HdDto.class); - if (CollectionUtil.isNotEmpty(list)) { - Boolean flag = bladeRedis.exists("hd:" + stcd); - List newList = CollectionUtil.reverseNew(list); - newList.forEach(i -> { - HdDto dto = new HdDto(); - if ((flag && (dto = bladeRedis.get("hd:" + stcd)) != null && dto.getTM().before(i.getTM())) - || !flag) { - log.info("河道redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); - rabbitTemplate.convertAndSend(JSONObject.toJSONString(HdDto.convert(i))); - bladeRedis.set("hd:" + stcd,i); - log.info("发送河道的数据:{},数据:{}", stcd, JSONObject.toJSONString(HdDto.convert(i))); - return; - } - }); + try { + Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd)); + HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getHd().getRealUrl()); + request.body(JSONObject.toJSONString(listByPramMap)); + HttpResponse response = request.execute(); + log.info("单个河道实时数据:stcd:{},数据:{}", stcd, response.body()); + List list = JSONObject.parseArray(response.body(), HdDto.class); + if (CollectionUtil.isNotEmpty(list)) { + Boolean flag = bladeRedis.exists("hd:" + stcd); + List newList = CollectionUtil.reverseNew(list); + newList.forEach(i -> { + HdDto dto = new HdDto(); + if ((flag && (dto = bladeRedis.get("hd:" + stcd)) != null && dto.getTM().before(i.getTM())) + || !flag) { + log.info("河道redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); + rabbitTemplate.convertAndSend(JSONObject.toJSONString(HdDto.convert(i))); + bladeRedis.set("hd:" + stcd,i); + log.info("发送河道的数据:{},数据:{}", stcd, JSONObject.toJSONString(HdDto.convert(i))); + return; + } + }); + } + }catch (Exception e){ + log.error("请求河道的数据错误:stcd:{},exception:{}",stcd,e.getCause()); } + } } diff --git a/src/main/java/org/springnewfiber/dataadapter/ziguang/hp/action/HpAction.java b/src/main/java/org/springnewfiber/dataadapter/ziguang/hp/action/HpAction.java index 558f684..45cf4f8 100644 --- a/src/main/java/org/springnewfiber/dataadapter/ziguang/hp/action/HpAction.java +++ b/src/main/java/org/springnewfiber/dataadapter/ziguang/hp/action/HpAction.java @@ -58,26 +58,31 @@ * @param stcd */ public void realTimeData(String stcd) { - Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); - HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getHp().getRealUrl()); - request.body(JSONObject.toJSONString(listByPramMap)); - HttpResponse response = request.execute(); - log.info("单个湖泊实时数据:stcd:{},数据:{}", stcd, response.body()); - List list = JSONObject.parseArray(response.body(), HpDto.class); - if (CollectionUtil.isNotEmpty(list)) { - Boolean flag = bladeRedis.exists("hp:" + stcd); - List newList = CollectionUtil.reverseNew(list); - newList.forEach(i -> { - HpDto dto = new HpDto(); - if ((flag && (dto = bladeRedis.get("hp:" + stcd)) != null && dto.getTM().before(i.getTM())) - || !flag) { - log.info("湖泊redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); - rabbitTemplate.convertAndSend(JSONObject.toJSONString(HpDto.convert(i))); - bladeRedis.set("hp:" + stcd, i); - log.info("发送湖泊的数据:{},数据:{}", stcd, JSONObject.toJSONString(HpDto.convert(i))); - return; - } - }); + try{ + Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); + HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getHp().getRealUrl()); + request.body(JSONObject.toJSONString(listByPramMap)); + HttpResponse response = request.execute(); + log.info("单个湖泊实时数据:stcd:{},数据:{}", stcd, response.body()); + List list = JSONObject.parseArray(response.body(), HpDto.class); + if (CollectionUtil.isNotEmpty(list)) { + Boolean flag = bladeRedis.exists("hp:" + stcd); + List newList = CollectionUtil.reverseNew(list); + newList.forEach(i -> { + HpDto dto = new HpDto(); + if ((flag && (dto = bladeRedis.get("hp:" + stcd)) != null && dto.getTM().before(i.getTM())) + || !flag) { + log.info("湖泊redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); + rabbitTemplate.convertAndSend(JSONObject.toJSONString(HpDto.convert(i))); + bladeRedis.set("hp:" + stcd, i); + log.info("发送湖泊的数据:{},数据:{}", stcd, JSONObject.toJSONString(HpDto.convert(i))); + return; + } + }); + } + }catch (Exception e){ + log.error("请求湖泊错误:stcd:{},exception:{}",stcd,e.getCause()); } + } } diff --git a/src/main/java/org/springnewfiber/dataadapter/ziguang/yl/action/YlAction.java b/src/main/java/org/springnewfiber/dataadapter/ziguang/yl/action/YlAction.java index 4722f1e..5bb8d6c 100644 --- a/src/main/java/org/springnewfiber/dataadapter/ziguang/yl/action/YlAction.java +++ b/src/main/java/org/springnewfiber/dataadapter/ziguang/yl/action/YlAction.java @@ -15,10 +15,16 @@ import org.springnewfiber.dataadapter.ziguang.common.LoginAction; import org.springnewfiber.dataadapter.ziguang.common.ZiGuangConfig; import org.springnewfiber.dataadapter.ziguang.yl.dto.YlDto; +import org.springnewfiber.dataadapter.ziguang.yl.util.RainfallAccumulationUtil; +import java.math.BigDecimal; import java.util.Date; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * @program: newfiber-data-adapter @@ -36,6 +42,8 @@ private static Map baseListPramMap; private final RabbitTemplate rabbitTemplate; + private final RainfallAccumulationUtil rainfallAccumulationUtil = new RainfallAccumulationUtil(); + { baseListPramMap = Maps.newHashMap(); Map pramChildMap = Maps.newHashMap(); @@ -69,29 +77,42 @@ * @param stcd */ public void realTimeData(String stcd) { - Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); - HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getYl().getRealUrl()); - request.body(JSONObject.toJSONString(listByPramMap)); - HttpResponse response = request.execute(); - log.info("单个雨量实时数据:stcd:{},数据:{}", stcd, response.body()); - List list = JSONObject.parseArray(response.body(), YlDto.class); - if (CollectionUtil.isNotEmpty(list)) { - Boolean flag = bladeRedis.exists("yl:" + stcd); - List newList = CollectionUtil.reverseNew(list); - newList.forEach(i -> { - YlDto dto = new YlDto(); - if ((flag && (dto = bladeRedis.get("yl:" + stcd)) != null && dto.getTM().before(i.getTM())) || !flag) { - log.info("雨量redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); - getData(stcd, i.getTM(), i); - rabbitTemplate.convertAndSend(JSONObject.toJSONString(YlDto.convert(i))); - bladeRedis.set("yl:" + stcd, i); - log.info("发送雨量的数据:{},数据:{}", stcd, JSONObject.toJSONString(YlDto.convert(i))); - return; - } - }); + try { + Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); + HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getYl().getRealUrl()); + request.body(JSONObject.toJSONString(listByPramMap)); + HttpResponse response = request.execute(); + log.info("单个雨量实时数据:stcd:{},数据:{}", stcd, response.body()); + List list = JSONObject.parseArray(response.body(), YlDto.class); + if (CollectionUtil.isNotEmpty(list)) { + Boolean flag = bladeRedis.exists("yl:" + stcd); + List newList = CollectionUtil.reverseNew(list); + newList.forEach(i -> { + YlDto dto = new YlDto(); + if ((flag && (dto = bladeRedis.get("yl:" + stcd)) != null && dto.getTM().before(i.getTM())) || !flag) { + log.info("雨量redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); +// getData(stcd, i.getTM(), i); + Future future = rainfallAccumulationUtil.submit(i, i.getSTCD()); + //自己累计雨量 nb-plus + i.setCountPt(dto.getCountPt().add(new BigDecimal(i.getDRP()))); + try { + future.get(60, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + log.error("累计错误:stcd:{},Cause:{}", stcd, e.getCause()); + } + rabbitTemplate.convertAndSend(JSONObject.toJSONString(YlDto.convert(i))); + bladeRedis.set("yl:" + stcd, i); + log.info("发送雨量的数据:{},数据:{}", stcd, JSONObject.toJSONString(YlDto.convert(i))); + return; + } + }); + } + } catch (Exception e) { + log.error("雨量数据出错:stcd:{},Cause:{}", stcd, e.getCause()); } } + @Deprecated private void getData(String stcd, Date TM, YlDto dto) { Float one = gethourData(stcd, TM, ziGuangConfig.getYl().getOneHourUrl()); Float Three = gethourData(stcd, TM, ziGuangConfig.getYl().getThreeHourUrl()); @@ -104,6 +125,7 @@ //todo 历史每个月查询 } + @Deprecated private Float gethourData(String stcd, Date TM, String url) { Map childPramMap = Maps.newHashMap(); childPramMap.put("stcd", stcd); @@ -117,8 +139,8 @@ HttpResponse response = request.execute(); log.info("单个雨量小时数据:stcd:{},数据:{}", stcd, response.body()); List list = JSONObject.parseArray(response.body(), YlDto.class); - log.info("分钟级别时间:{},小时级别时间:{}", list.get(0).getTM(), TM); if (CollectionUtil.isNotEmpty(list) && list.get(0).getTM().compareTo(TM) == 0) { + log.info("分钟级别时间:{},小时级别时间:{}", list.get(0).getTM(), TM); return list.get(0).getDRP(); } return null; diff --git a/src/main/java/org/springnewfiber/dataadapter/ziguang/yl/dto/YlDto.java b/src/main/java/org/springnewfiber/dataadapter/ziguang/yl/dto/YlDto.java index 44e9dde..e85ca15 100644 --- a/src/main/java/org/springnewfiber/dataadapter/ziguang/yl/dto/YlDto.java +++ b/src/main/java/org/springnewfiber/dataadapter/ziguang/yl/dto/YlDto.java @@ -4,8 +4,8 @@ import lombok.Data; import org.springnewfiber.dataadapter.entity.MqNodeData; import org.springnewfiber.dataadapter.entity.PtReceiveBaseModel; -import org.springnewfiber.dataadapter.ziguang.hp.dto.HpDto; +import java.math.BigDecimal; import java.util.Date; import java.util.Map; @@ -40,53 +40,92 @@ private Float p1; private Float p3; private Float p6; + private Float p12; private Float pd; - public static String one="p1"; - public static String Three="p3"; - public static String six="p6"; - public static String day="pd"; - public static PtReceiveBaseModel convert(YlDto dto){ - PtReceiveBaseModel model=new PtReceiveBaseModel(); + private BigDecimal countPt=BigDecimal.ZERO; + public static String one = "p1"; + public static String Three = "p3"; + public static String six = "p6"; + public static String twelve = "p12"; + public static String day = "pd"; + public static String pt = "pt"; + + public static PtReceiveBaseModel convert(YlDto dto) { + PtReceiveBaseModel model = new PtReceiveBaseModel(); model.setSt(dto.getSTCD()); model.setTt(dto.getTM()); model.setUt(new Date()); model.setSn(""); - Map dataMap= Maps.newHashMap(); - MqNodeData data= new MqNodeData(); + Map dataMap = Maps.newHashMap(); + MqNodeData data = new MqNodeData(); data.setKey("drp"); data.setValue(dto.getDRP()); data.setSn("drp"); - if(dto.getP1()!=null){ - MqNodeData p1= new MqNodeData(); + if (dto.getP1() != null) { + MqNodeData p1 = new MqNodeData(); p1.setKey(YlDto.one); p1.setValue(dto.getP1()); p1.setSn(YlDto.one); - dataMap.put(YlDto.one,p1); + dataMap.put(YlDto.one, p1); } - if(dto.getP3()!=null){ - MqNodeData p1= new MqNodeData(); + if (dto.getP3() != null) { + MqNodeData p1 = new MqNodeData(); p1.setKey(YlDto.Three); p1.setValue(dto.getP3()); p1.setSn(YlDto.Three); - dataMap.put(YlDto.Three,p1); + dataMap.put(YlDto.Three, p1); } - if(dto.getP6()!=null){ - MqNodeData p1= new MqNodeData(); + if (dto.getP6() != null) { + MqNodeData p1 = new MqNodeData(); p1.setKey(YlDto.six); p1.setValue(dto.getP6()); p1.setSn(YlDto.six); - dataMap.put(YlDto.six,p1); + dataMap.put(YlDto.six, p1); } - if(dto.getPd()!=null){ - MqNodeData p1= new MqNodeData(); + if (dto.getP12() != null) { + MqNodeData p1 = new MqNodeData(); + p1.setKey(YlDto.twelve); + p1.setValue(dto.getP12()); + p1.setSn(YlDto.twelve); + dataMap.put(YlDto.twelve, p1); + } + if (dto.getPd() != null) { + MqNodeData p1 = new MqNodeData(); p1.setKey(YlDto.day); p1.setValue(dto.getPd()); p1.setSn(YlDto.day); - dataMap.put(YlDto.day,p1); + dataMap.put(YlDto.day, p1); } - dataMap.put("drp",data); + if (dto.getCountPt() != null) { + MqNodeData p1 = new MqNodeData(); + p1.setKey(YlDto.pt); + p1.setValue(dto.getCountPt()); + p1.setSn(YlDto.pt); + dataMap.put(YlDto.pt, p1); + } + dataMap.put("drp", data); model.setDataMap(dataMap); return model; } + /** + * 累计count + * + * @param size + * @param dto + * @param countBig + */ + public void setCountBig(int size, YlDto dto, BigDecimal countBig) { + if (size == 60 / 5) { + dto.setP1(countBig.floatValue()); + } else if (size == 60 * 3 / 5) { + dto.setP3(countBig.floatValue()); + } else if (size == 60 * 6 / 5) { + dto.setP6(countBig.floatValue()); + } else if (size == 60 * 12 / 5) { + dto.setP12(countBig.floatValue()); + } else if (size == 60 * 24 / 5) { + dto.setPd(countBig.floatValue()); + } + } } diff --git a/src/main/java/org/springnewfiber/dataadapter/ziguang/yl/util/RainfallAccumulationUtil.java b/src/main/java/org/springnewfiber/dataadapter/ziguang/yl/util/RainfallAccumulationUtil.java new file mode 100644 index 0000000..6839315 --- /dev/null +++ b/src/main/java/org/springnewfiber/dataadapter/ziguang/yl/util/RainfallAccumulationUtil.java @@ -0,0 +1,218 @@ +package org.springnewfiber.dataadapter.ziguang.yl.util; + +import cn.hutool.core.thread.NamedThreadFactory; +import com.alibaba.fastjson.JSONObject; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.springnewfiber.dataadapter.ziguang.yl.dto.YlDto; + +import javax.annotation.concurrent.ThreadSafe; +import java.math.BigDecimal; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; + +/** + * @program: newwater-data-engine + * @description: 累计雨量 + * @author: djt + * @create: 2022-04-06 09:09 + **/ +@Getter +@ThreadSafe +@Slf4j +public class RainfallAccumulationUtil { + private final static Map map = Maps.newHashMap(); + private final ExecutorService executor; + + public Future submit(YlDto ylDto, String stKeySnKey) { +// handle(ylDto, stKeySnKey); + return executor.submit(() -> handle(ylDto, stKeySnKey)); + } + + private void handle(YlDto ylDto, String stKeySnKey) { + if (map.containsKey(stKeySnKey) && map.get(stKeySnKey) != null) { + map.get(stKeySnKey).doHandle(ylDto); + } else { + NodeQueuedGroup var = new NodeQueuedGroup(stKeySnKey); + var.doHandle(ylDto); + map.put(stKeySnKey, var); + } + } + + public RainfallAccumulationUtil() { + executor = Executors.newSingleThreadExecutor(new NamedThreadFactory("RainfallAccumulation", Boolean.FALSE)); + } + + static final class NodeQueuedGroup { + private final NodeQueued oneNodeQueued = new NodeQueued(60 * 1 / 5); + private final NodeQueued threeNodeQueued = new NodeQueued(60 * 3 / 5); + private final NodeQueued sixNodeQueued = new NodeQueued(60 * 6 / 5); + private final NodeQueued twelveNodeQueued = new NodeQueued(60 * 12 / 5); + private final NodeQueued twentyFourNodeQueued = new NodeQueued(60 * 24 / 5); + // private final NodeQueued oneNodeQueued = new NodeQueued(10); +// private final NodeQueued threeNodeQueued = new NodeQueued(20); +// private final NodeQueued sixNodeQueued = new NodeQueued(30); +// private final NodeQueued twelveNodeQueued = new NodeQueued(40); +// private final NodeQueued twentyFourNodeQueued = new NodeQueued(50); + List handleList = Lists.newArrayList(oneNodeQueued, threeNodeQueued, sixNodeQueued, twelveNodeQueued, twentyFourNodeQueued); + // List handleList = Lists.newArrayList(oneNodeQueued); + private String stKeySnKey; + + NodeQueuedGroup(String stKeySnKey) { + this.stKeySnKey = stKeySnKey; + } + + private void doHandle(YlDto ylDto) { + handleList.forEach(i -> { + log.info("当前处理队列为:{}", i.size); + count(ylDto, i); + ylDto.setCountBig(i.size, ylDto, i.countBig); + log.info("处理队列完成队列为:{}", i.size); + }); + } + } + + /** + * 基础队列 + */ + static final class Node { + volatile Node prev; + volatile Node next; + volatile YlDto ylDto; + + final Node predecessor() throws NullPointerException { + Node p = prev; + if (p == null) { + throw new NullPointerException(); + } else { + return p; + } + } + + private Node(YlDto ylDto) { + this.ylDto = ylDto; + } + + Node() { + } + } + + /** + * @author djt + * @creed: Talk is cheap,show me the code + * @date 2022/4/6 9:58 + * @description: 主队列类,用于计算 + */ + static final class NodeQueued { + private volatile Node head; + private volatile Node tail; + /** + * 队列限定个数 + */ + private volatile int size; + /** + * 队列累计值 + */ + private volatile BigDecimal countBig = BigDecimal.ZERO; + + NodeQueued(int size) { + this.size = size; + } + + public final int getQueueLength() { + int n = 0; + for (Node p = tail; p != null; p = p.prev) { + ++n; + } + return n; + } + + //入栈 + private Node enq(Node node) { + for (; ; ) { + Node t = tail; + if (head == null) { // Must initialize + head = node; + return node; + } else if (t != null) { + node.prev = t; + t.next = node; + tail = node; + return t; + } else if (t == null) { + tail = node; + node.prev = head; + head.next = node; + return node; + } + } + } + + /** + * @param node + * @return 返回移除的头部节点 + * @author djt + * @creed: Talk is cheap,show me the code + * @date 2022/4/6 11:17 + * @description: 出栈,入栈 + */ + private Node outq(Node node) { + for (; ; ) { + Node t = head; + if (t == null) { + throw new RuntimeException("头节点为null"); + } else { + head = t.next; + head.prev = null; + enq(node); + return t; + } + } + } + + //计算累计值 + private void countBig(Node node) { + if (node != null && node.ylDto != null) { + log.info("处理数据:{},key:{},处理前累计:{}", node.ylDto.getDRP(), node.ylDto.getSTCD(), countBig); + if (getQueueLength() == size) { + //弹出头部,进入尾部,(个数相等,并计算累计,累计减去头部,加上尾部值) + Node oldHeadNode = outq(node); + countBig = countBig + .subtract(new BigDecimal(oldHeadNode.ylDto.getDRP())) + .add(new BigDecimal(node.ylDto.getDRP())); + + log.info("满:{},去掉头部",size); + } else { + //队列未满时,入栈时,累计 + enq(node); + countBig = countBig.add(new BigDecimal(node.ylDto.getDRP())); + } + log.info("处理数据:{},key:{},处理后累计:{}", node.ylDto.getDRP(), node.ylDto.getSTCD(), countBig); + return; + } + log.error("node为null:{}", JSONObject.toJSONString(node)); + } + } + + private static boolean checkNode(YlDto ylDto) { + if (ylDto == null || ylDto.getDRP() == null) { + log.error("数据为null"); + return false; + } else { + return true; + } + } + + private static void count(YlDto ylDto, NodeQueued nodeQueued) { + if (checkNode(ylDto)) { + nodeQueued.countBig(new Node(ylDto)); + } + } + +} diff --git a/src/main/java/org/springnewfiber/dataadapter/ziguang/bz/action/BzAction.java b/src/main/java/org/springnewfiber/dataadapter/ziguang/bz/action/BzAction.java index 69e1b4c..86db886 100644 --- a/src/main/java/org/springnewfiber/dataadapter/ziguang/bz/action/BzAction.java +++ b/src/main/java/org/springnewfiber/dataadapter/ziguang/bz/action/BzAction.java @@ -55,25 +55,29 @@ * @param stcd */ public void realTimeData(String stcd) { - Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); - HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getBz().getRealUrl()); - request.body(JSONObject.toJSONString(listByPramMap)); - HttpResponse response = request.execute(); - log.info("泵站单个实时数据,stcd:{},数据:{}", stcd, response.body()); - List list = JSONObject.parseArray(response.body(), BzDto.class); - if (CollectionUtil.isNotEmpty(list)) { - Boolean flag = bladeRedis.exists("BZ:" + stcd); - List newList = CollectionUtil.reverseNew(list); - newList.forEach(i -> { - BzDto dto = new BzDto(); - if ((flag && (dto = bladeRedis.get("BZ:" + stcd)) != null && DateUtil.parse(dto.getTM()).before(DateUtil.parse(i.getTM()))) || !flag) { - log.info("泵站redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); - rabbitTemplate.convertAndSend(JSONObject.toJSONString(BzDto.convert(i))); - bladeRedis.set("BZ:" + stcd, i); - log.info("发送泵站的数据:code:{},data:{}", stcd, JSONObject.toJSONString(BzDto.convert(i))); - return; - } - }); + try { + Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); + HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getBz().getRealUrl()); + request.body(JSONObject.toJSONString(listByPramMap)); + HttpResponse response = request.execute(); + log.info("泵站单个实时数据,stcd:{},数据:{}", stcd, response.body()); + List list = JSONObject.parseArray(response.body(), BzDto.class); + if (CollectionUtil.isNotEmpty(list)) { + Boolean flag = bladeRedis.exists("BZ:" + stcd); + List newList = CollectionUtil.reverseNew(list); + newList.forEach(i -> { + BzDto dto = new BzDto(); + if ((flag && (dto = bladeRedis.get("BZ:" + stcd)) != null && DateUtil.parse(dto.getTM()).before(DateUtil.parse(i.getTM()))) || !flag) { + log.info("泵站redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); + rabbitTemplate.convertAndSend(JSONObject.toJSONString(BzDto.convert(i))); + bladeRedis.set("BZ:" + stcd, i); + log.info("发送泵站的数据:code:{},data:{}", stcd, JSONObject.toJSONString(BzDto.convert(i))); + return; + } + }); + } + } catch (Exception e) { + log.error("泵站数据出错:stcd:{},Cause:{}", stcd, e.getCause()); } } } diff --git a/src/main/java/org/springnewfiber/dataadapter/ziguang/gq/action/GqAction.java b/src/main/java/org/springnewfiber/dataadapter/ziguang/gq/action/GqAction.java index 768cbc4..4cdc82d 100644 --- a/src/main/java/org/springnewfiber/dataadapter/ziguang/gq/action/GqAction.java +++ b/src/main/java/org/springnewfiber/dataadapter/ziguang/gq/action/GqAction.java @@ -57,26 +57,30 @@ * @param stcd */ public void realTimeData(String stcd) { - Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); - HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getGq().getRealUrl()); - request.body(JSONObject.toJSONString(listByPramMap)); - HttpResponse response = request.execute(); - log.info("港渠单个实时数据,stcd:{},数据:{}", stcd, response.body()); - List list = JSONObject.parseArray(response.body(), GqDto.class); - if (CollectionUtil.isNotEmpty(list)) { - Boolean flag = bladeRedis.exists("gq:" + stcd); - List newList = CollectionUtil.reverseNew(list); - newList.forEach(i -> { - GqDto dto = new GqDto(); - if ((flag && (dto = bladeRedis.get("gq:" + stcd)) != null && dto.getTM().before(i.getTM())) - || !flag) { - log.info("港渠redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); - rabbitTemplate.convertAndSend(JSONObject.toJSONString(GqDto.convert(i))); - bladeRedis.set("gq:" + stcd, i); - log.info("发送港渠的数据:{},数据:{}", stcd, JSONObject.toJSONString(GqDto.convert(i))); - return; + try{ + Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); + HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getGq().getRealUrl()); + request.body(JSONObject.toJSONString(listByPramMap)); + HttpResponse response = request.execute(); + log.info("港渠单个实时数据,stcd:{},数据:{}", stcd, response.body()); + List list = JSONObject.parseArray(response.body(), GqDto.class); + if (CollectionUtil.isNotEmpty(list)) { + Boolean flag = bladeRedis.exists("gq:" + stcd); + List newList = CollectionUtil.reverseNew(list); + newList.forEach(i -> { + GqDto dto = new GqDto(); + if ((flag && (dto = bladeRedis.get("gq:" + stcd)) != null && dto.getTM().before(i.getTM())) + || !flag) { + log.info("港渠redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); + rabbitTemplate.convertAndSend(JSONObject.toJSONString(GqDto.convert(i))); + bladeRedis.set("gq:" + stcd, i); + log.info("发送港渠的数据:{},数据:{}", stcd, JSONObject.toJSONString(GqDto.convert(i))); + return; + } + }); } - }); + }catch (Exception e){ + log.error("请求港渠的数据错误:stcd:{},exception:{}",stcd,e.getCause()); } } } diff --git a/src/main/java/org/springnewfiber/dataadapter/ziguang/hd/action/HdAction.java b/src/main/java/org/springnewfiber/dataadapter/ziguang/hd/action/HdAction.java index 5f9cbd1..e191c81 100644 --- a/src/main/java/org/springnewfiber/dataadapter/ziguang/hd/action/HdAction.java +++ b/src/main/java/org/springnewfiber/dataadapter/ziguang/hd/action/HdAction.java @@ -56,26 +56,31 @@ * @param stcd */ public void realTimeData(String stcd) { - Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd)); - HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getHd().getRealUrl()); - request.body(JSONObject.toJSONString(listByPramMap)); - HttpResponse response = request.execute(); - log.info("单个河道实时数据:stcd:{},数据:{}", stcd, response.body()); - List list = JSONObject.parseArray(response.body(), HdDto.class); - if (CollectionUtil.isNotEmpty(list)) { - Boolean flag = bladeRedis.exists("hd:" + stcd); - List newList = CollectionUtil.reverseNew(list); - newList.forEach(i -> { - HdDto dto = new HdDto(); - if ((flag && (dto = bladeRedis.get("hd:" + stcd)) != null && dto.getTM().before(i.getTM())) - || !flag) { - log.info("河道redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); - rabbitTemplate.convertAndSend(JSONObject.toJSONString(HdDto.convert(i))); - bladeRedis.set("hd:" + stcd,i); - log.info("发送河道的数据:{},数据:{}", stcd, JSONObject.toJSONString(HdDto.convert(i))); - return; - } - }); + try { + Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd)); + HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getHd().getRealUrl()); + request.body(JSONObject.toJSONString(listByPramMap)); + HttpResponse response = request.execute(); + log.info("单个河道实时数据:stcd:{},数据:{}", stcd, response.body()); + List list = JSONObject.parseArray(response.body(), HdDto.class); + if (CollectionUtil.isNotEmpty(list)) { + Boolean flag = bladeRedis.exists("hd:" + stcd); + List newList = CollectionUtil.reverseNew(list); + newList.forEach(i -> { + HdDto dto = new HdDto(); + if ((flag && (dto = bladeRedis.get("hd:" + stcd)) != null && dto.getTM().before(i.getTM())) + || !flag) { + log.info("河道redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); + rabbitTemplate.convertAndSend(JSONObject.toJSONString(HdDto.convert(i))); + bladeRedis.set("hd:" + stcd,i); + log.info("发送河道的数据:{},数据:{}", stcd, JSONObject.toJSONString(HdDto.convert(i))); + return; + } + }); + } + }catch (Exception e){ + log.error("请求河道的数据错误:stcd:{},exception:{}",stcd,e.getCause()); } + } } diff --git a/src/main/java/org/springnewfiber/dataadapter/ziguang/hp/action/HpAction.java b/src/main/java/org/springnewfiber/dataadapter/ziguang/hp/action/HpAction.java index 558f684..45cf4f8 100644 --- a/src/main/java/org/springnewfiber/dataadapter/ziguang/hp/action/HpAction.java +++ b/src/main/java/org/springnewfiber/dataadapter/ziguang/hp/action/HpAction.java @@ -58,26 +58,31 @@ * @param stcd */ public void realTimeData(String stcd) { - Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); - HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getHp().getRealUrl()); - request.body(JSONObject.toJSONString(listByPramMap)); - HttpResponse response = request.execute(); - log.info("单个湖泊实时数据:stcd:{},数据:{}", stcd, response.body()); - List list = JSONObject.parseArray(response.body(), HpDto.class); - if (CollectionUtil.isNotEmpty(list)) { - Boolean flag = bladeRedis.exists("hp:" + stcd); - List newList = CollectionUtil.reverseNew(list); - newList.forEach(i -> { - HpDto dto = new HpDto(); - if ((flag && (dto = bladeRedis.get("hp:" + stcd)) != null && dto.getTM().before(i.getTM())) - || !flag) { - log.info("湖泊redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); - rabbitTemplate.convertAndSend(JSONObject.toJSONString(HpDto.convert(i))); - bladeRedis.set("hp:" + stcd, i); - log.info("发送湖泊的数据:{},数据:{}", stcd, JSONObject.toJSONString(HpDto.convert(i))); - return; - } - }); + try{ + Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); + HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getHp().getRealUrl()); + request.body(JSONObject.toJSONString(listByPramMap)); + HttpResponse response = request.execute(); + log.info("单个湖泊实时数据:stcd:{},数据:{}", stcd, response.body()); + List list = JSONObject.parseArray(response.body(), HpDto.class); + if (CollectionUtil.isNotEmpty(list)) { + Boolean flag = bladeRedis.exists("hp:" + stcd); + List newList = CollectionUtil.reverseNew(list); + newList.forEach(i -> { + HpDto dto = new HpDto(); + if ((flag && (dto = bladeRedis.get("hp:" + stcd)) != null && dto.getTM().before(i.getTM())) + || !flag) { + log.info("湖泊redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); + rabbitTemplate.convertAndSend(JSONObject.toJSONString(HpDto.convert(i))); + bladeRedis.set("hp:" + stcd, i); + log.info("发送湖泊的数据:{},数据:{}", stcd, JSONObject.toJSONString(HpDto.convert(i))); + return; + } + }); + } + }catch (Exception e){ + log.error("请求湖泊错误:stcd:{},exception:{}",stcd,e.getCause()); } + } } diff --git a/src/main/java/org/springnewfiber/dataadapter/ziguang/yl/action/YlAction.java b/src/main/java/org/springnewfiber/dataadapter/ziguang/yl/action/YlAction.java index 4722f1e..5bb8d6c 100644 --- a/src/main/java/org/springnewfiber/dataadapter/ziguang/yl/action/YlAction.java +++ b/src/main/java/org/springnewfiber/dataadapter/ziguang/yl/action/YlAction.java @@ -15,10 +15,16 @@ import org.springnewfiber.dataadapter.ziguang.common.LoginAction; import org.springnewfiber.dataadapter.ziguang.common.ZiGuangConfig; import org.springnewfiber.dataadapter.ziguang.yl.dto.YlDto; +import org.springnewfiber.dataadapter.ziguang.yl.util.RainfallAccumulationUtil; +import java.math.BigDecimal; import java.util.Date; import java.util.List; import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; /** * @program: newfiber-data-adapter @@ -36,6 +42,8 @@ private static Map baseListPramMap; private final RabbitTemplate rabbitTemplate; + private final RainfallAccumulationUtil rainfallAccumulationUtil = new RainfallAccumulationUtil(); + { baseListPramMap = Maps.newHashMap(); Map pramChildMap = Maps.newHashMap(); @@ -69,29 +77,42 @@ * @param stcd */ public void realTimeData(String stcd) { - Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); - HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getYl().getRealUrl()); - request.body(JSONObject.toJSONString(listByPramMap)); - HttpResponse response = request.execute(); - log.info("单个雨量实时数据:stcd:{},数据:{}", stcd, response.body()); - List list = JSONObject.parseArray(response.body(), YlDto.class); - if (CollectionUtil.isNotEmpty(list)) { - Boolean flag = bladeRedis.exists("yl:" + stcd); - List newList = CollectionUtil.reverseNew(list); - newList.forEach(i -> { - YlDto dto = new YlDto(); - if ((flag && (dto = bladeRedis.get("yl:" + stcd)) != null && dto.getTM().before(i.getTM())) || !flag) { - log.info("雨量redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); - getData(stcd, i.getTM(), i); - rabbitTemplate.convertAndSend(JSONObject.toJSONString(YlDto.convert(i))); - bladeRedis.set("yl:" + stcd, i); - log.info("发送雨量的数据:{},数据:{}", stcd, JSONObject.toJSONString(YlDto.convert(i))); - return; - } - }); + try { + Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); + HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getYl().getRealUrl()); + request.body(JSONObject.toJSONString(listByPramMap)); + HttpResponse response = request.execute(); + log.info("单个雨量实时数据:stcd:{},数据:{}", stcd, response.body()); + List list = JSONObject.parseArray(response.body(), YlDto.class); + if (CollectionUtil.isNotEmpty(list)) { + Boolean flag = bladeRedis.exists("yl:" + stcd); + List newList = CollectionUtil.reverseNew(list); + newList.forEach(i -> { + YlDto dto = new YlDto(); + if ((flag && (dto = bladeRedis.get("yl:" + stcd)) != null && dto.getTM().before(i.getTM())) || !flag) { + log.info("雨量redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); +// getData(stcd, i.getTM(), i); + Future future = rainfallAccumulationUtil.submit(i, i.getSTCD()); + //自己累计雨量 nb-plus + i.setCountPt(dto.getCountPt().add(new BigDecimal(i.getDRP()))); + try { + future.get(60, TimeUnit.SECONDS); + } catch (InterruptedException | ExecutionException | TimeoutException e) { + log.error("累计错误:stcd:{},Cause:{}", stcd, e.getCause()); + } + rabbitTemplate.convertAndSend(JSONObject.toJSONString(YlDto.convert(i))); + bladeRedis.set("yl:" + stcd, i); + log.info("发送雨量的数据:{},数据:{}", stcd, JSONObject.toJSONString(YlDto.convert(i))); + return; + } + }); + } + } catch (Exception e) { + log.error("雨量数据出错:stcd:{},Cause:{}", stcd, e.getCause()); } } + @Deprecated private void getData(String stcd, Date TM, YlDto dto) { Float one = gethourData(stcd, TM, ziGuangConfig.getYl().getOneHourUrl()); Float Three = gethourData(stcd, TM, ziGuangConfig.getYl().getThreeHourUrl()); @@ -104,6 +125,7 @@ //todo 历史每个月查询 } + @Deprecated private Float gethourData(String stcd, Date TM, String url) { Map childPramMap = Maps.newHashMap(); childPramMap.put("stcd", stcd); @@ -117,8 +139,8 @@ HttpResponse response = request.execute(); log.info("单个雨量小时数据:stcd:{},数据:{}", stcd, response.body()); List list = JSONObject.parseArray(response.body(), YlDto.class); - log.info("分钟级别时间:{},小时级别时间:{}", list.get(0).getTM(), TM); if (CollectionUtil.isNotEmpty(list) && list.get(0).getTM().compareTo(TM) == 0) { + log.info("分钟级别时间:{},小时级别时间:{}", list.get(0).getTM(), TM); return list.get(0).getDRP(); } return null; diff --git a/src/main/java/org/springnewfiber/dataadapter/ziguang/yl/dto/YlDto.java b/src/main/java/org/springnewfiber/dataadapter/ziguang/yl/dto/YlDto.java index 44e9dde..e85ca15 100644 --- a/src/main/java/org/springnewfiber/dataadapter/ziguang/yl/dto/YlDto.java +++ b/src/main/java/org/springnewfiber/dataadapter/ziguang/yl/dto/YlDto.java @@ -4,8 +4,8 @@ import lombok.Data; import org.springnewfiber.dataadapter.entity.MqNodeData; import org.springnewfiber.dataadapter.entity.PtReceiveBaseModel; -import org.springnewfiber.dataadapter.ziguang.hp.dto.HpDto; +import java.math.BigDecimal; import java.util.Date; import java.util.Map; @@ -40,53 +40,92 @@ private Float p1; private Float p3; private Float p6; + private Float p12; private Float pd; - public static String one="p1"; - public static String Three="p3"; - public static String six="p6"; - public static String day="pd"; - public static PtReceiveBaseModel convert(YlDto dto){ - PtReceiveBaseModel model=new PtReceiveBaseModel(); + private BigDecimal countPt=BigDecimal.ZERO; + public static String one = "p1"; + public static String Three = "p3"; + public static String six = "p6"; + public static String twelve = "p12"; + public static String day = "pd"; + public static String pt = "pt"; + + public static PtReceiveBaseModel convert(YlDto dto) { + PtReceiveBaseModel model = new PtReceiveBaseModel(); model.setSt(dto.getSTCD()); model.setTt(dto.getTM()); model.setUt(new Date()); model.setSn(""); - Map dataMap= Maps.newHashMap(); - MqNodeData data= new MqNodeData(); + Map dataMap = Maps.newHashMap(); + MqNodeData data = new MqNodeData(); data.setKey("drp"); data.setValue(dto.getDRP()); data.setSn("drp"); - if(dto.getP1()!=null){ - MqNodeData p1= new MqNodeData(); + if (dto.getP1() != null) { + MqNodeData p1 = new MqNodeData(); p1.setKey(YlDto.one); p1.setValue(dto.getP1()); p1.setSn(YlDto.one); - dataMap.put(YlDto.one,p1); + dataMap.put(YlDto.one, p1); } - if(dto.getP3()!=null){ - MqNodeData p1= new MqNodeData(); + if (dto.getP3() != null) { + MqNodeData p1 = new MqNodeData(); p1.setKey(YlDto.Three); p1.setValue(dto.getP3()); p1.setSn(YlDto.Three); - dataMap.put(YlDto.Three,p1); + dataMap.put(YlDto.Three, p1); } - if(dto.getP6()!=null){ - MqNodeData p1= new MqNodeData(); + if (dto.getP6() != null) { + MqNodeData p1 = new MqNodeData(); p1.setKey(YlDto.six); p1.setValue(dto.getP6()); p1.setSn(YlDto.six); - dataMap.put(YlDto.six,p1); + dataMap.put(YlDto.six, p1); } - if(dto.getPd()!=null){ - MqNodeData p1= new MqNodeData(); + if (dto.getP12() != null) { + MqNodeData p1 = new MqNodeData(); + p1.setKey(YlDto.twelve); + p1.setValue(dto.getP12()); + p1.setSn(YlDto.twelve); + dataMap.put(YlDto.twelve, p1); + } + if (dto.getPd() != null) { + MqNodeData p1 = new MqNodeData(); p1.setKey(YlDto.day); p1.setValue(dto.getPd()); p1.setSn(YlDto.day); - dataMap.put(YlDto.day,p1); + dataMap.put(YlDto.day, p1); } - dataMap.put("drp",data); + if (dto.getCountPt() != null) { + MqNodeData p1 = new MqNodeData(); + p1.setKey(YlDto.pt); + p1.setValue(dto.getCountPt()); + p1.setSn(YlDto.pt); + dataMap.put(YlDto.pt, p1); + } + dataMap.put("drp", data); model.setDataMap(dataMap); return model; } + /** + * 累计count + * + * @param size + * @param dto + * @param countBig + */ + public void setCountBig(int size, YlDto dto, BigDecimal countBig) { + if (size == 60 / 5) { + dto.setP1(countBig.floatValue()); + } else if (size == 60 * 3 / 5) { + dto.setP3(countBig.floatValue()); + } else if (size == 60 * 6 / 5) { + dto.setP6(countBig.floatValue()); + } else if (size == 60 * 12 / 5) { + dto.setP12(countBig.floatValue()); + } else if (size == 60 * 24 / 5) { + dto.setPd(countBig.floatValue()); + } + } } diff --git a/src/main/java/org/springnewfiber/dataadapter/ziguang/yl/util/RainfallAccumulationUtil.java b/src/main/java/org/springnewfiber/dataadapter/ziguang/yl/util/RainfallAccumulationUtil.java new file mode 100644 index 0000000..6839315 --- /dev/null +++ b/src/main/java/org/springnewfiber/dataadapter/ziguang/yl/util/RainfallAccumulationUtil.java @@ -0,0 +1,218 @@ +package org.springnewfiber.dataadapter.ziguang.yl.util; + +import cn.hutool.core.thread.NamedThreadFactory; +import com.alibaba.fastjson.JSONObject; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.springnewfiber.dataadapter.ziguang.yl.dto.YlDto; + +import javax.annotation.concurrent.ThreadSafe; +import java.math.BigDecimal; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; + +/** + * @program: newwater-data-engine + * @description: 累计雨量 + * @author: djt + * @create: 2022-04-06 09:09 + **/ +@Getter +@ThreadSafe +@Slf4j +public class RainfallAccumulationUtil { + private final static Map map = Maps.newHashMap(); + private final ExecutorService executor; + + public Future submit(YlDto ylDto, String stKeySnKey) { +// handle(ylDto, stKeySnKey); + return executor.submit(() -> handle(ylDto, stKeySnKey)); + } + + private void handle(YlDto ylDto, String stKeySnKey) { + if (map.containsKey(stKeySnKey) && map.get(stKeySnKey) != null) { + map.get(stKeySnKey).doHandle(ylDto); + } else { + NodeQueuedGroup var = new NodeQueuedGroup(stKeySnKey); + var.doHandle(ylDto); + map.put(stKeySnKey, var); + } + } + + public RainfallAccumulationUtil() { + executor = Executors.newSingleThreadExecutor(new NamedThreadFactory("RainfallAccumulation", Boolean.FALSE)); + } + + static final class NodeQueuedGroup { + private final NodeQueued oneNodeQueued = new NodeQueued(60 * 1 / 5); + private final NodeQueued threeNodeQueued = new NodeQueued(60 * 3 / 5); + private final NodeQueued sixNodeQueued = new NodeQueued(60 * 6 / 5); + private final NodeQueued twelveNodeQueued = new NodeQueued(60 * 12 / 5); + private final NodeQueued twentyFourNodeQueued = new NodeQueued(60 * 24 / 5); + // private final NodeQueued oneNodeQueued = new NodeQueued(10); +// private final NodeQueued threeNodeQueued = new NodeQueued(20); +// private final NodeQueued sixNodeQueued = new NodeQueued(30); +// private final NodeQueued twelveNodeQueued = new NodeQueued(40); +// private final NodeQueued twentyFourNodeQueued = new NodeQueued(50); + List handleList = Lists.newArrayList(oneNodeQueued, threeNodeQueued, sixNodeQueued, twelveNodeQueued, twentyFourNodeQueued); + // List handleList = Lists.newArrayList(oneNodeQueued); + private String stKeySnKey; + + NodeQueuedGroup(String stKeySnKey) { + this.stKeySnKey = stKeySnKey; + } + + private void doHandle(YlDto ylDto) { + handleList.forEach(i -> { + log.info("当前处理队列为:{}", i.size); + count(ylDto, i); + ylDto.setCountBig(i.size, ylDto, i.countBig); + log.info("处理队列完成队列为:{}", i.size); + }); + } + } + + /** + * 基础队列 + */ + static final class Node { + volatile Node prev; + volatile Node next; + volatile YlDto ylDto; + + final Node predecessor() throws NullPointerException { + Node p = prev; + if (p == null) { + throw new NullPointerException(); + } else { + return p; + } + } + + private Node(YlDto ylDto) { + this.ylDto = ylDto; + } + + Node() { + } + } + + /** + * @author djt + * @creed: Talk is cheap,show me the code + * @date 2022/4/6 9:58 + * @description: 主队列类,用于计算 + */ + static final class NodeQueued { + private volatile Node head; + private volatile Node tail; + /** + * 队列限定个数 + */ + private volatile int size; + /** + * 队列累计值 + */ + private volatile BigDecimal countBig = BigDecimal.ZERO; + + NodeQueued(int size) { + this.size = size; + } + + public final int getQueueLength() { + int n = 0; + for (Node p = tail; p != null; p = p.prev) { + ++n; + } + return n; + } + + //入栈 + private Node enq(Node node) { + for (; ; ) { + Node t = tail; + if (head == null) { // Must initialize + head = node; + return node; + } else if (t != null) { + node.prev = t; + t.next = node; + tail = node; + return t; + } else if (t == null) { + tail = node; + node.prev = head; + head.next = node; + return node; + } + } + } + + /** + * @param node + * @return 返回移除的头部节点 + * @author djt + * @creed: Talk is cheap,show me the code + * @date 2022/4/6 11:17 + * @description: 出栈,入栈 + */ + private Node outq(Node node) { + for (; ; ) { + Node t = head; + if (t == null) { + throw new RuntimeException("头节点为null"); + } else { + head = t.next; + head.prev = null; + enq(node); + return t; + } + } + } + + //计算累计值 + private void countBig(Node node) { + if (node != null && node.ylDto != null) { + log.info("处理数据:{},key:{},处理前累计:{}", node.ylDto.getDRP(), node.ylDto.getSTCD(), countBig); + if (getQueueLength() == size) { + //弹出头部,进入尾部,(个数相等,并计算累计,累计减去头部,加上尾部值) + Node oldHeadNode = outq(node); + countBig = countBig + .subtract(new BigDecimal(oldHeadNode.ylDto.getDRP())) + .add(new BigDecimal(node.ylDto.getDRP())); + + log.info("满:{},去掉头部",size); + } else { + //队列未满时,入栈时,累计 + enq(node); + countBig = countBig.add(new BigDecimal(node.ylDto.getDRP())); + } + log.info("处理数据:{},key:{},处理后累计:{}", node.ylDto.getDRP(), node.ylDto.getSTCD(), countBig); + return; + } + log.error("node为null:{}", JSONObject.toJSONString(node)); + } + } + + private static boolean checkNode(YlDto ylDto) { + if (ylDto == null || ylDto.getDRP() == null) { + log.error("数据为null"); + return false; + } else { + return true; + } + } + + private static void count(YlDto ylDto, NodeQueued nodeQueued) { + if (checkNode(ylDto)) { + nodeQueued.countBig(new Node(ylDto)); + } + } + +} diff --git a/src/main/java/org/springnewfiber/dataadapter/ziguang/zs/action/ZsAction.java b/src/main/java/org/springnewfiber/dataadapter/ziguang/zs/action/ZsAction.java index e5168a8..5824d6b 100644 --- a/src/main/java/org/springnewfiber/dataadapter/ziguang/zs/action/ZsAction.java +++ b/src/main/java/org/springnewfiber/dataadapter/ziguang/zs/action/ZsAction.java @@ -58,36 +58,31 @@ * @param stcd */ public void realTimeData(String stcd) { - Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); - HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getZs().getRealUrl()); - request.body(JSONObject.toJSONString(listByPramMap)); - HttpResponse response = request.execute(); - log.info("单个渍水实时数据:stcd:{},数据:{}", stcd, response.body()); - List list = JSONObject.parseArray(response.body(), ZsDto.class); - if (CollectionUtil.isNotEmpty(list)) { - Boolean flag = bladeRedis.exists("zs:" + stcd); - List newList = CollectionUtil.reverseNew(list); - newList.forEach(i->{ - ZsDto dto = new ZsDto(); - if ((flag && (dto = bladeRedis.get("zs:" + stcd)) != null && dto.getTM().before(i.getTM())) - || !flag) { - log.info("渍水redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); - rabbitTemplate.convertAndSend(JSONObject.toJSONString(ZsDto.convert(i))); - bladeRedis.set("zs:" + stcd, i); - log.info("发送渍水的数据:{},数据:{}", stcd, JSONObject.toJSONString(ZsDto.convert(i))); - return; - } - }); + try{ + Map listByPramMap = ImmutableMap.of("searchFilters", ImmutableMap.of("stcd", stcd, "datatime", DateUtil.today())); + HttpRequest request = loginAction.getPostAuthRequest(ziGuangConfig.getZs().getRealUrl()); + request.body(JSONObject.toJSONString(listByPramMap)); + HttpResponse response = request.execute(); + log.info("单个渍水实时数据:stcd:{},数据:{}", stcd, response.body()); + List list = JSONObject.parseArray(response.body(), ZsDto.class); + if (CollectionUtil.isNotEmpty(list)) { + Boolean flag = bladeRedis.exists("zs:" + stcd); + List newList = CollectionUtil.reverseNew(list); + newList.forEach(i->{ + ZsDto dto = new ZsDto(); + if ((flag && (dto = bladeRedis.get("zs:" + stcd)) != null && dto.getTM().before(i.getTM())) + || !flag) { + log.info("渍水redis的数据:{},请求到的最新数据:{}", JSONObject.toJSONString(dto), JSONObject.toJSONString(i.getTM())); + rabbitTemplate.convertAndSend(JSONObject.toJSONString(ZsDto.convert(i))); + bladeRedis.set("zs:" + stcd, i); + log.info("发送渍水的数据:{},数据:{}", stcd, JSONObject.toJSONString(ZsDto.convert(i))); + return; + } + }); + } + }catch (Exception e){ + log.error("渍水数据出错:stcd:{},Cause:{}", stcd, e.getCause()); } - } -// public static void main(String[] args) { -// List stringList= Lists.newArrayList("1","2","3"); -// stringList.forEach(i->{ -// if(Integer.valueOf(i)==2){ -// return; -// } -// System.out.println(i); -// }); -// } + } }