diff --git a/src/main/java/org/springnewfiber/dataadapter/sswj/core/SswjCoreJob.java b/src/main/java/org/springnewfiber/dataadapter/sswj/core/SswjCoreJob.java index a5e06a4..4b88007 100644 --- a/src/main/java/org/springnewfiber/dataadapter/sswj/core/SswjCoreJob.java +++ b/src/main/java/org/springnewfiber/dataadapter/sswj/core/SswjCoreJob.java @@ -63,73 +63,45 @@ try { Class clazz = baseSwjEntity.getClass(); String name = clazz.getSimpleName(); - if (CollUtil.isNotEmpty(redis.keys(name.concat(StringPool.COLON).concat(StringPool.ASTERISK)))) { - //已经有 - QueryWrapper qw = new QueryWrapper<>(); - qw.groupBy("DATA_UP_UUID"); - qw.select("DATA_UP_UUID"); - //第一次初始化执行 - List list = service.list(qw); - list.forEach(i -> { - String key = name.concat(StringPool.COLON).concat(i.getDataUpUuid()); - BaseSwjEntity redisObject; - if (redis.exists(key) && (redisObject = redis.get(key)) != null && redisObject.getTTDate() != null) { - QueryWrapper selectQwId = new QueryWrapper<>(); - selectQwId.orderByAsc(baseSwjEntity.getTTFileName()); - selectQwId.eq("DATA_UP_UUID", redisObject.getDataUpUuid()); - selectQwId.gt(baseSwjEntity.getTTFileName(), redisObject.getTTDate()); - List nowList = service.list(selectQwId); - if (CollUtil.isNotEmpty(nowList)) { - nowList.forEach(now -> { - PtReceiveBaseModel model = RealTimeSerializer.objectToRealMap(now); - redis.set(key, now); - //发送mq - log.info("发送数据:{},点位:{},具体数据:{}", name, now.getDataUpUuid(), JSONObject.toJSONString(model)); - rabbitTemplate.convertAndSend(JSONObject.toJSONString(model)); - }); - } - }else { - QueryWrapper selectQwId = new QueryWrapper<>(); - selectQwId.orderByAsc(baseSwjEntity.getTTFileName()); - selectQwId.eq("DATA_UP_UUID", i); - List nowList = service.list(selectQwId); - if (CollUtil.isNotEmpty(nowList)) { - nowList.forEach(now -> { - PtReceiveBaseModel model = RealTimeSerializer.objectToRealMap(now); - redis.set(key, now); - //发送mq - log.info("发送数据:{},点位:{},具体数据:{}", name, now.getDataUpUuid(), JSONObject.toJSONString(model)); - rabbitTemplate.convertAndSend(JSONObject.toJSONString(model)); - }); - } - } - }); - } else { - QueryWrapper qw = new QueryWrapper<>(); - qw.groupBy("DATA_UP_UUID"); - qw.select("DATA_UP_UUID"); - //第一次初始化执行 - List list = service.list(qw); - list.forEach(i -> { + QueryWrapper qw = new QueryWrapper<>(); + qw.groupBy("DATA_UP_UUID"); + qw.select("DATA_UP_UUID"); + //第一次初始化执行 + List list = service.list(qw); + list.forEach(i -> { + String key = name.concat(StringPool.COLON).concat(i.getDataUpUuid()); + BaseSwjEntity redisObject; + if (redis.exists(key) && (redisObject = redis.get(key)) != null && redisObject.getTTDate() != null) { QueryWrapper selectQwId = new QueryWrapper<>(); - selectQwId.orderByDesc(baseSwjEntity.getTTFileName()); - selectQwId.last("limit 0,1"); - selectQwId.eq("DATA_UP_UUID", i.getDataUpUuid()); - List stRealDataList = service.list(selectQwId); - if (CollUtil.isNotEmpty(stRealDataList)) { - BaseSwjEntity entity = stRealDataList.get(0); - PtReceiveBaseModel model = RealTimeSerializer.objectToRealMap(entity); - if (null == model) { - return; - } else { - redis.set(name.concat(StringPool.COLON).concat(model.getSt()), entity); + selectQwId.orderByAsc(baseSwjEntity.getTTFileName()); + selectQwId.eq("DATA_UP_UUID", redisObject.getDataUpUuid()); + selectQwId.gt(baseSwjEntity.getTTFileName(), redisObject.getTTDate()); + List nowList = service.list(selectQwId); + if (CollUtil.isNotEmpty(nowList)) { + nowList.forEach(now -> { + PtReceiveBaseModel model = RealTimeSerializer.objectToRealMap(now); + redis.set(key, now); //发送mq - log.info("发送数据:{},点位:{},具体数据:{}", name, model.getSt(), JSONObject.toJSONString(model)); + log.info("发送数据:{},点位:{},具体数据:{}", name, now.getDataUpUuid(), JSONObject.toJSONString(model)); rabbitTemplate.convertAndSend(JSONObject.toJSONString(model)); - } + }); } - }); - } + } else { + QueryWrapper selectQwId = new QueryWrapper<>(); + selectQwId.orderByAsc(baseSwjEntity.getTTFileName()); + selectQwId.eq("DATA_UP_UUID", i); + List nowList = service.list(selectQwId); + if (CollUtil.isNotEmpty(nowList)) { + nowList.forEach(now -> { + PtReceiveBaseModel model = RealTimeSerializer.objectToRealMap(now); + redis.set(key, now); + //发送mq + log.info("发送数据:{},点位:{},具体数据:{}", name, now.getDataUpUuid(), JSONObject.toJSONString(model)); + rabbitTemplate.convertAndSend(JSONObject.toJSONString(model)); + }); + } + } + }); } catch (Exception e) { log.error("发生错误:{},e:{}", baseSwjEntity.getClass().getSimpleName(), e.getStackTrace()); } @@ -138,77 +110,49 @@ private void zgRainData() { try { String name = ZgRainData.class.getSimpleName(); - if (CollUtil.isNotEmpty(redis.keys(name.concat(StringPool.COLON).concat(StringPool.ASTERISK)))) { - //已经有 - QueryWrapper qw = new QueryWrapper<>(); - qw.groupBy("STCD"); - qw.select("STCD"); - //第一次初始化执行 - List list = zgRainDataMapper.selectList(qw); - list.forEach(i -> { - String key = name.concat(StringPool.COLON).concat(i.getSTCD()); - ZgRainData redisObject; - if ((redis.exists(key) && (redisObject = redis.get(key)) != null && redisObject.getTM() != null)) { - QueryWrapper selectQwId = new QueryWrapper<>(); - selectQwId.orderByAsc("TM"); - selectQwId.eq("STCD", i.getSTCD()); - selectQwId.gt("TM", redisObject.getTM()); - List nowList = zgRainDataMapper.selectList(selectQwId); - if (CollUtil.isNotEmpty(nowList)) { - for (ZgRainData zgRainData : nowList) { - redisObject = redis.get(key); - zgRainData.setPt(redisObject.getPt().add(zgRainData.getDRP())); - PtReceiveBaseModel model = RealTimeSerializer.objectToRealMap(zgRainData); - redis.set(key, zgRainData); - //发送mq - log.info("发送数据:{},点位:{},具体数据:{}", name, zgRainData.getSTCD(), JSONObject.toJSONString(model)); - rabbitTemplate.convertAndSend(JSONObject.toJSONString(model)); - } - } - } else { - QueryWrapper selectQwId = new QueryWrapper<>(); - selectQwId.orderByAsc("TM"); - selectQwId.eq("STCD", i.getSTCD()); - List nowList = zgRainDataMapper.selectList(selectQwId); - if (CollUtil.isNotEmpty(nowList)) { - for (ZgRainData zgRainData : nowList) { - redisObject = redis.get(key); - zgRainData.setPt(redisObject==null? BigDecimal.ZERO.add(zgRainData.getDRP()):redisObject.getPt().add(zgRainData.getDRP())); - PtReceiveBaseModel model = RealTimeSerializer.objectToRealMap(zgRainData); - redis.set(key, zgRainData); - //发送mq - log.info("发送数据:{},点位:{},具体数据:{}", name, zgRainData.getSTCD(), JSONObject.toJSONString(model)); - rabbitTemplate.convertAndSend(JSONObject.toJSONString(model)); - } - } - } - }); - } else { - QueryWrapper qw = new QueryWrapper<>(); - qw.groupBy("STCD"); - qw.select("STCD"); - //第一次初始化执行 - List list = zgRainDataMapper.selectList(qw); - list.forEach(i -> { + QueryWrapper qw = new QueryWrapper<>(); + qw.groupBy("STCD"); + qw.select("STCD"); + //第一次初始化执行 + List list = zgRainDataMapper.selectList(qw); + list.forEach(i -> { + String key = name.concat(StringPool.COLON).concat(i.getSTCD()); + ZgRainData redisObject; + if ((redis.exists(key) && (redisObject = redis.get(key)) != null && redisObject.getTM() != null)) { QueryWrapper selectQwId = new QueryWrapper<>(); selectQwId.orderByAsc("TM"); - selectQwId.last("limit 0,1"); selectQwId.eq("STCD", i.getSTCD()); - List stRealDataList = zgRainDataMapper.selectList(selectQwId); - if (CollUtil.isNotEmpty(stRealDataList)) { - ZgRainData entity = stRealDataList.get(0); - PtReceiveBaseModel model = RealTimeSerializer.objectToRealMap(entity); - if (null == model) { - return; - } else { - redis.set(name.concat(StringPool.COLON).concat(model.getSt()), entity); + selectQwId.gt("TM", redisObject.getTM()); + List nowList = zgRainDataMapper.selectList(selectQwId); + if (CollUtil.isNotEmpty(nowList)) { + for (ZgRainData zgRainData : nowList) { + redisObject = redis.get(key); + zgRainData.setPt(redisObject.getPt().add(zgRainData.getDRP())); + PtReceiveBaseModel model = RealTimeSerializer.objectToRealMap(zgRainData); + redis.set(key, zgRainData); //发送mq - log.info("发送数据:{},点位:{},具体数据:{}", name, model.getSt(), JSONObject.toJSONString(model)); + log.info("发送数据:{},点位:{},具体数据:{}", name, zgRainData.getSTCD(), JSONObject.toJSONString(model)); rabbitTemplate.convertAndSend(JSONObject.toJSONString(model)); } } - }); - } + } else { + QueryWrapper selectQwId = new QueryWrapper<>(); + selectQwId.orderByAsc("TM"); + selectQwId.eq("STCD", i.getSTCD()); + List nowList = zgRainDataMapper.selectList(selectQwId); + if (CollUtil.isNotEmpty(nowList)) { + for (ZgRainData zgRainData : nowList) { + redisObject = redis.get(key); + zgRainData.setPt(redisObject == null ? BigDecimal.ZERO.add(zgRainData.getDRP()) : redisObject.getPt().add(zgRainData.getDRP())); + PtReceiveBaseModel model = RealTimeSerializer.objectToRealMap(zgRainData); + redis.set(key, zgRainData); + //发送mq + log.info("发送数据:{},点位:{},具体数据:{}", name, zgRainData.getSTCD(), JSONObject.toJSONString(model)); + rabbitTemplate.convertAndSend(JSONObject.toJSONString(model)); + } + } + } + }); } catch (Exception e) { log.error("发生错误:{},e:{}", "紫光雨量数据", e.getStackTrace()); }