package org.springnewfiber.dataadapter.sswj.core; import cn.hutool.core.collection.CollUtil; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.toolkit.StringPool; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springnewfiber.dataadapter.config.BladeRedis; import org.springnewfiber.dataadapter.entity.PtReceiveBaseModel; import org.springnewfiber.dataadapter.sswj.entity.*; import org.springnewfiber.dataadapter.sswj.mapper.ZgRainDataMapper; import org.springnewfiber.dataadapter.sswj.service.impl.*; import org.springnewfiber.dataadapter.sswj.util.RealTimeSerializer; import java.math.BigDecimal; import java.util.List; /** * @program: newfiber-data-adapter * @description: * @author: djt * @create: 2022-04-21 19:06 **/ @Component @Slf4j @AllArgsConstructor public class SswjCoreJob { private final BladeRedis redis; private final RabbitTemplate rabbitTemplate; private final BzLiveServiceImpl bzLiveService; private final SqGqRServiceImpl sqGqRService; private final SqHdSrServiceImpl sqHdSrService; private final SqHpRServiceImpl sqHpRService; private final SqHzRServiceImpl sqHzRService; private final SqSkRServiceImpl sqSkRService; private final SqZsRServiceImpl sqZsRService; private final SzGssydServiceImpl szGssydService; private final SzyRServiceImpl szyRService; private final TrsqTjhsRServiceImpl trsqTjhsRService; private final ZgRainDataMapper zgRainDataMapper; @Scheduled(cron = "0 0/1 * * * ?") public void selectSwj() { checkHasRealDate(new BzLive(), bzLiveService); checkHasRealDate(new SqGqR(), sqGqRService); checkHasRealDate(new SqHdSr(), sqHdSrService); checkHasRealDate(new SqHpR(), sqHpRService); checkHasRealDate(new SqHzR(), sqHzRService); checkHasRealDate(new SqSkR(), sqSkRService); checkHasRealDate(new SqZsR(), sqZsRService); checkHasRealDate(new SzGssyd(), szGssydService); checkHasRealDate(new SzyR(), szyRService); checkHasRealDate(new TrsqTjhsR(), trsqTjhsRService); zgRainData(); } private void checkHasRealDate(BaseSwjEntity baseSwjEntity, ServiceImpl service) { try { Class clazz = baseSwjEntity.getClass(); String name = clazz.getSimpleName(); if (CollUtil.isNotEmpty(redis.keys(name.concat(StringPool.COLON).concat(StringPool.ASTERISK)))) { //已经有 QueryWrapper<BaseSwjEntity> qw = new QueryWrapper<>(); qw.groupBy("DATA_UP_UUID"); qw.select("DATA_UP_UUID"); //第一次初始化执行 List<BaseSwjEntity> list = service.list(qw); list.forEach(i -> { String key = name.concat(StringPool.COLON).concat(i.getDataUpUuid()); BaseSwjEntity redisObject; if (redis.exists(key) && (redisObject = redis.get(key)) != null && redisObject.getTTDate() != null) { QueryWrapper<BaseSwjEntity> selectQwId = new QueryWrapper<>(); selectQwId.orderByAsc(baseSwjEntity.getTTFileName()); selectQwId.eq("DATA_UP_UUID", redisObject.getDataUpUuid()); selectQwId.gt(baseSwjEntity.getTTFileName(), redisObject.getTTDate()); List<BaseSwjEntity> nowList = service.list(selectQwId); if (CollUtil.isNotEmpty(nowList)) { nowList.forEach(now -> { PtReceiveBaseModel model = RealTimeSerializer.objectToRealMap(now); redis.set(key, now); //发送mq log.info("发送数据:{},点位:{},具体数据:{}", name, now.getDataUpUuid(), JSONObject.toJSONString(model)); rabbitTemplate.convertAndSend(JSONObject.toJSONString(model)); }); } }else { QueryWrapper<BaseSwjEntity> selectQwId = new QueryWrapper<>(); selectQwId.orderByAsc(baseSwjEntity.getTTFileName()); selectQwId.eq("DATA_UP_UUID", i); List<BaseSwjEntity> nowList = service.list(selectQwId); if (CollUtil.isNotEmpty(nowList)) { nowList.forEach(now -> { PtReceiveBaseModel model = RealTimeSerializer.objectToRealMap(now); redis.set(key, now); //发送mq log.info("发送数据:{},点位:{},具体数据:{}", name, now.getDataUpUuid(), JSONObject.toJSONString(model)); rabbitTemplate.convertAndSend(JSONObject.toJSONString(model)); }); } } }); } else { QueryWrapper<BaseSwjEntity> qw = new QueryWrapper<>(); qw.groupBy("DATA_UP_UUID"); qw.select("DATA_UP_UUID"); //第一次初始化执行 List<BaseSwjEntity> list = service.list(qw); list.forEach(i -> { QueryWrapper<BaseSwjEntity> selectQwId = new QueryWrapper<>(); selectQwId.orderByDesc(baseSwjEntity.getTTFileName()); selectQwId.last("limit 0,1"); selectQwId.eq("DATA_UP_UUID", i.getDataUpUuid()); List<BaseSwjEntity> stRealDataList = service.list(selectQwId); if (CollUtil.isNotEmpty(stRealDataList)) { BaseSwjEntity entity = stRealDataList.get(0); PtReceiveBaseModel model = RealTimeSerializer.objectToRealMap(entity); if (null == model) { return; } else { redis.set(name.concat(StringPool.COLON).concat(model.getSt()), entity); //发送mq log.info("发送数据:{},点位:{},具体数据:{}", name, model.getSt(), JSONObject.toJSONString(model)); rabbitTemplate.convertAndSend(JSONObject.toJSONString(model)); } } }); } } catch (Exception e) { log.error("发生错误:{},e:{}", baseSwjEntity.getClass().getSimpleName(), e.getStackTrace()); } } private void zgRainData() { try { String name = ZgRainData.class.getSimpleName(); if (CollUtil.isNotEmpty(redis.keys(name.concat(StringPool.COLON).concat(StringPool.ASTERISK)))) { //已经有 QueryWrapper<ZgRainData> qw = new QueryWrapper<>(); qw.groupBy("STCD"); qw.select("STCD"); //第一次初始化执行 List<ZgRainData> list = zgRainDataMapper.selectList(qw); list.forEach(i -> { String key = name.concat(StringPool.COLON).concat(i.getSTCD()); ZgRainData redisObject; if ((redis.exists(key) && (redisObject = redis.get(key)) != null && redisObject.getTM() != null)) { QueryWrapper<ZgRainData> selectQwId = new QueryWrapper<>(); selectQwId.orderByAsc("TM"); selectQwId.eq("STCD", i.getSTCD()); selectQwId.gt("TM", redisObject.getTM()); List<ZgRainData> nowList = zgRainDataMapper.selectList(selectQwId); if (CollUtil.isNotEmpty(nowList)) { for (ZgRainData zgRainData : nowList) { redisObject = redis.get(key); zgRainData.setPt(redisObject.getPt().add(zgRainData.getDRP())); PtReceiveBaseModel model = RealTimeSerializer.objectToRealMap(zgRainData); redis.set(key, zgRainData); //发送mq log.info("发送数据:{},点位:{},具体数据:{}", name, zgRainData.getSTCD(), JSONObject.toJSONString(model)); rabbitTemplate.convertAndSend(JSONObject.toJSONString(model)); } } } else { QueryWrapper<ZgRainData> selectQwId = new QueryWrapper<>(); selectQwId.orderByAsc("TM"); selectQwId.eq("STCD", i.getSTCD()); List<ZgRainData> nowList = zgRainDataMapper.selectList(selectQwId); if (CollUtil.isNotEmpty(nowList)) { for (ZgRainData zgRainData : nowList) { redisObject = redis.get(key); zgRainData.setPt(redisObject==null? BigDecimal.ZERO.add(zgRainData.getDRP()):redisObject.getPt().add(zgRainData.getDRP())); PtReceiveBaseModel model = RealTimeSerializer.objectToRealMap(zgRainData); redis.set(key, zgRainData); //发送mq log.info("发送数据:{},点位:{},具体数据:{}", name, zgRainData.getSTCD(), JSONObject.toJSONString(model)); rabbitTemplate.convertAndSend(JSONObject.toJSONString(model)); } } } }); } else { QueryWrapper<ZgRainData> qw = new QueryWrapper<>(); qw.groupBy("STCD"); qw.select("STCD"); //第一次初始化执行 List<ZgRainData> list = zgRainDataMapper.selectList(qw); list.forEach(i -> { QueryWrapper<ZgRainData> selectQwId = new QueryWrapper<>(); selectQwId.orderByAsc("TM"); selectQwId.last("limit 0,1"); selectQwId.eq("STCD", i.getSTCD()); List<ZgRainData> stRealDataList = zgRainDataMapper.selectList(selectQwId); if (CollUtil.isNotEmpty(stRealDataList)) { ZgRainData entity = stRealDataList.get(0); PtReceiveBaseModel model = RealTimeSerializer.objectToRealMap(entity); if (null == model) { return; } else { redis.set(name.concat(StringPool.COLON).concat(model.getSt()), entity); //发送mq log.info("发送数据:{},点位:{},具体数据:{}", name, model.getSt(), JSONObject.toJSONString(model)); rabbitTemplate.convertAndSend(JSONObject.toJSONString(model)); } } }); } } catch (Exception e) { log.error("发生错误:{},e:{}", "紫光雨量数据", e.getStackTrace()); } } }