package org.springnewfiber.dataadapter.sswj.core; import cn.hutool.core.collection.CollUtil; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.toolkit.StringPool; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import org.springnewfiber.dataadapter.config.BladeRedis; import org.springnewfiber.dataadapter.entity.PtReceiveBaseModel; import org.springnewfiber.dataadapter.sswj.entity.*; import org.springnewfiber.dataadapter.sswj.service.impl.*; import org.springnewfiber.dataadapter.sswj.util.RealTimeSerializer; import java.util.List; /** * @program: newfiber-data-adapter * @description: * @author: djt * @create: 2022-04-21 19:06 **/ @Component @Slf4j @AllArgsConstructor public class SswjCoreJob { private final BladeRedis redis; private final RabbitTemplate rabbitTemplate; private final BzLiveServiceImpl bzLiveService; private final SqGqRServiceImpl sqGqRService; private final SqHdSrServiceImpl sqHdSrService; private final SqHpRServiceImpl sqHpRService; private final SqHzRServiceImpl sqHzRService; private final SqSkRServiceImpl sqSkRService; private final SqZsRServiceImpl sqZsRService; private final SzGssydServiceImpl szGssydService; private final SzyRServiceImpl szyRService; private final TrsqTjhsRServiceImpl trsqTjhsRService; // @Scheduled(cron = "0 0/1 * * * ?") public void selectSwj() { checkHasRealDate(BzLive.class, bzLiveService); checkHasRealDate(SqGqR.class, sqGqRService); checkHasRealDate(SqHdSr.class, sqHdSrService); checkHasRealDate(SqHpR.class, sqHpRService); checkHasRealDate(SqHzR.class, sqHzRService); checkHasRealDate(SqSkR.class, sqSkRService); checkHasRealDate(SqZsR.class, sqZsRService); checkHasRealDate(SzGssyd.class, szGssydService); checkHasRealDate(SzyR.class, szyRService); checkHasRealDate(TrsqTjhsR.class, trsqTjhsRService); } private void checkHasRealDate(Class clazz, ServiceImpl service) { try { String name = clazz.getSimpleName(); if (CollUtil.isNotEmpty(redis.keys(name.concat(StringPool.COLON).concat(StringPool.ASTERISK)))) { //已经有 QueryWrapper<BaseSwjEntity> qw = new QueryWrapper<>(); qw.groupBy("DATA_UP_UUID"); qw.select("DATA_UP_UUID"); //第一次初始化执行 List<BaseSwjEntity> list = service.list(qw); list.forEach(i -> { String key = name.concat(StringPool.COLON).concat(i.getDataUpUuid()); BaseSwjEntity redisObject; if (redis.exists(key) && (redisObject = redis.get(key)) != null && redisObject.getDataUpTime() != null) { QueryWrapper<BaseSwjEntity> selectQwId = new QueryWrapper<>(); selectQwId.orderByAsc("DATA_UP_TIME"); selectQwId.eq("DATA_UP_UUID",redisObject.getDataUpUuid()); selectQwId.gt("DATA_UP_TIME",redisObject.getDataUpTime()); // List<BaseSwjEntity> nowList = service.list(Wrappers.<BaseSwjEntity>lambdaQuery().eq(BaseSwjEntity::getDataUpUuid, redisObject.getDataUpUuid()).gt(BaseSwjEntity::getDataUpTime, redisObject.getDataUpTime()).orderByAsc(BaseSwjEntity::getDataUpTime)); List<BaseSwjEntity> nowList = service.list(selectQwId); if (CollUtil.isNotEmpty(nowList)) { nowList.forEach(now -> { PtReceiveBaseModel model = RealTimeSerializer.objectToRealMap(now); redis.set(key, now); //发送mq log.info("发送数据:{},点位:{},具体数据:{}", name, now.getDataUpUuid(), JSONObject.toJSONString(model)); // rabbitTemplate.convertAndSend(JSONObject.toJSONString(model)); }); } } }); } else { QueryWrapper<BaseSwjEntity> qw = new QueryWrapper<>(); qw.groupBy("DATA_UP_UUID"); qw.select("DATA_UP_UUID"); //第一次初始化执行 List<BaseSwjEntity> list = service.list(qw); list.forEach(i -> { QueryWrapper<BaseSwjEntity> selectQwId = new QueryWrapper<>(); selectQwId.orderByDesc("DATA_UP_TIME"); selectQwId.last("limit 0,1"); selectQwId.eq("DATA_UP_UUID",i.getDataUpUuid()); List<BaseSwjEntity> stRealDataList = service.list(selectQwId); if (CollUtil.isNotEmpty(stRealDataList)) { BaseSwjEntity entity = stRealDataList.get(0); PtReceiveBaseModel model = RealTimeSerializer.objectToRealMap(entity); if (null == model) { return; } else { redis.set(name.concat(StringPool.COLON).concat(model.getSt()), entity); //发送mq log.info("发送数据:{},点位:{},具体数据:{}", name, model.getSt(), JSONObject.toJSONString(model)); // rabbitTemplate.convertAndSend(JSONObject.toJSONString(model)); } } }); } } catch (Exception e) { log.error("发生错误:{},e:{}", clazz.getSimpleName(), e.getStackTrace()); } } }