// newfiber-data-adapter/src/main/java/org/springnewfiber/dataadapter/sswj/core/SswjCoreJob.java
package org.springnewfiber.dataadapter.sswj.core;

import cn.hutool.core.collection.CollUtil;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.StringPool;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springnewfiber.dataadapter.config.BladeRedis;
import org.springnewfiber.dataadapter.entity.PtReceiveBaseModel;
import org.springnewfiber.dataadapter.sswj.entity.*;
import org.springnewfiber.dataadapter.sswj.service.impl.*;
import org.springnewfiber.dataadapter.sswj.util.RealTimeSerializer;

import java.util.List;

/**
 * @program: newfiber-data-adapter
 * @description:
 * @author: djt
 * @create: 2022-04-21 19:06
 **/
@Component
@Slf4j
@AllArgsConstructor
public class SswjCoreJob {
    private final BladeRedis redis;
    private final RabbitTemplate rabbitTemplate;
    private final BzLiveServiceImpl bzLiveService;
    private final SqGqRServiceImpl sqGqRService;
    private final SqHdSrServiceImpl sqHdSrService;
    private final SqHpRServiceImpl sqHpRService;
    private final SqHzRServiceImpl sqHzRService;
    private final SqSkRServiceImpl sqSkRService;
    private final SqZsRServiceImpl sqZsRService;
    private final SzGssydServiceImpl szGssydService;
    private final SzyRServiceImpl szyRService;
    private final TrsqTjhsRServiceImpl trsqTjhsRService;

//    @Scheduled(cron = "0 0/1 * * * ?")
    public void selectSwj() {
        checkHasRealDate(BzLive.class, bzLiveService);
        checkHasRealDate(SqGqR.class, sqGqRService);
        checkHasRealDate(SqHdSr.class, sqHdSrService);
        checkHasRealDate(SqHpR.class, sqHpRService);
        checkHasRealDate(SqHzR.class, sqHzRService);
        checkHasRealDate(SqSkR.class, sqSkRService);
        checkHasRealDate(SqZsR.class, sqZsRService);
        checkHasRealDate(SzGssyd.class, szGssydService);
        checkHasRealDate(SzyR.class, szyRService);
        checkHasRealDate(TrsqTjhsR.class, trsqTjhsRService);
    }

    private void checkHasRealDate(Class clazz, ServiceImpl service) {
        try {
            String name = clazz.getSimpleName();
            if (CollUtil.isNotEmpty(redis.keys(name.concat(StringPool.COLON).concat(StringPool.ASTERISK)))) {
                //已经有
                QueryWrapper<BaseSwjEntity> qw = new QueryWrapper<>();
                qw.groupBy("DATA_UP_UUID");
                qw.select("DATA_UP_UUID");
                //第一次初始化执行
                List<BaseSwjEntity> list = service.list(qw);
                list.forEach(i -> {
                    String key = name.concat(StringPool.COLON).concat(i.getDataUpUuid());
                    BaseSwjEntity redisObject;
                    if (redis.exists(key) && (redisObject = redis.get(key)) != null && redisObject.getDataUpTime() != null) {
                        QueryWrapper<BaseSwjEntity> selectQwId = new QueryWrapper<>();
                        selectQwId.orderByAsc("DATA_UP_TIME");
                        selectQwId.eq("DATA_UP_UUID",redisObject.getDataUpUuid());
                        selectQwId.gt("DATA_UP_TIME",redisObject.getDataUpTime());
//                        List<BaseSwjEntity> nowList = service.list(Wrappers.<BaseSwjEntity>lambdaQuery().eq(BaseSwjEntity::getDataUpUuid, redisObject.getDataUpUuid()).gt(BaseSwjEntity::getDataUpTime, redisObject.getDataUpTime()).orderByAsc(BaseSwjEntity::getDataUpTime));
                        List<BaseSwjEntity> nowList = service.list(selectQwId);
                        if (CollUtil.isNotEmpty(nowList)) {
                            nowList.forEach(now -> {
                                PtReceiveBaseModel model = RealTimeSerializer.objectToRealMap(now);
                                redis.set(key, now);
                                //发送mq
                                log.info("发送数据:{},点位:{},具体数据:{}", name, now.getDataUpUuid(), JSONObject.toJSONString(model));
//                                rabbitTemplate.convertAndSend(JSONObject.toJSONString(model));
                            });
                        }
                    }
                });
            } else {
                QueryWrapper<BaseSwjEntity> qw = new QueryWrapper<>();
                qw.groupBy("DATA_UP_UUID");
                qw.select("DATA_UP_UUID");
                //第一次初始化执行
                List<BaseSwjEntity> list = service.list(qw);
                list.forEach(i -> {
                    QueryWrapper<BaseSwjEntity> selectQwId = new QueryWrapper<>();
                    selectQwId.orderByDesc("DATA_UP_TIME");
                    selectQwId.last("limit 0,1");
                    selectQwId.eq("DATA_UP_UUID",i.getDataUpUuid());
                    List<BaseSwjEntity> stRealDataList = service.list(selectQwId);
                    if (CollUtil.isNotEmpty(stRealDataList)) {
                        BaseSwjEntity entity = stRealDataList.get(0);
                        PtReceiveBaseModel model = RealTimeSerializer.objectToRealMap(entity);
                        if (null == model) {
                            return;
                        } else {
                            redis.set(name.concat(StringPool.COLON).concat(model.getSt()), entity);
                            //发送mq
                            log.info("发送数据:{},点位:{},具体数据:{}", name, model.getSt(), JSONObject.toJSONString(model));
//                            rabbitTemplate.convertAndSend(JSONObject.toJSONString(model));
                        }
                    }
                });
            }
        } catch (Exception e) {
            log.error("发生错误:{},e:{}", clazz.getSimpleName(), e.getStackTrace());
        }

    }
}