// newfiber-data-adapter/src/main/java/org/springnewfiber/dataadapter/xf/controller/MonitorDataController.java
package org.springnewfiber.dataadapter.xf.controller;

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.HttpUtil;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.Maps;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiParam;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springnewfiber.dataadapter.config.BladeRedis;
import org.springnewfiber.dataadapter.config.R;
import org.springnewfiber.dataadapter.entity.MqNodeData;
import org.springnewfiber.dataadapter.entity.PtReceiveBaseModel;
import org.springnewfiber.dataadapter.sswj.util.RealTimeSerializer;
import org.springnewfiber.dataadapter.xf.XfDataEnum;
import org.springnewfiber.dataadapter.xf.entity.*;
import org.springnewfiber.dataadapter.xf.enums.EDataAccessType;
import org.springnewfiber.dataadapter.xf.service.DataAccessRecordService;

import java.math.BigDecimal;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;

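/**
 * @program: newfiber-data-adapter
 * @description: REST controller that receives iFlytek (讯飞) monitoring data, records the raw
 *               payload, converts it to {@code PtReceiveBaseModel} and publishes it through
 *               {@code RabbitTemplate} (forecast data is POSTed to the configured Hankou URL instead).
 * @author: djt
 * @create: 2022-06-14 20:44
 **/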
@RestController
@RequiredArgsConstructor
//@PreAuth(AuthConstant.PERMISSION_ALL)
@RequestMapping("/monitorData")
@Api(value = "讯飞数据接受接口", tags = "讯飞数据接受接口")
@Slf4j
public class MonitorDataController {
    private final RabbitTemplate rabbitTemplate;
    private final BladeRedis redis;
    @Value("${business.hankouSaveUrl}")
    private String hankouSaveUrl;
    private final DataAccessRecordService dataAccessRecordService;

    /**
     * Receives a batch of iFlytek monitoring records (a JSON array) and forwards them downstream.
     *
     * <p>The raw payload is first stored through {@code dataAccessRecordService}. It is then parsed
     * into the entity list that matches {@code xfDataEnum}; each entity is converted with
     * {@code RealTimeSerializer.xfObjectToRealMap} and published as a JSON string through
     * {@code rabbitTemplate}. {@code ForcastData} is handled differently: each record is POSTed to
     * {@code hankouSaveUrl} and nothing is published.
     *
     * @param data       JSON array holding the records
     * @param xfDataEnum kind of records contained in {@code data}
     * @return {@code R.status(true)}, also when parsing produced no records
     * @throws RuntimeException if the data type is not supported or parsing/forwarding fails; the
     *                          underlying exception is attached as the cause
     */
    @PostMapping("/xfInterface")
    public R xfInterface(@ApiParam(value = "数据json") @RequestParam("data") String data, @ApiParam(value = "数据类型") @RequestParam("xfBaseData") XfDataEnum xfDataEnum) {
        // NOTE(review): the regular request trace is written at ERROR level — confirm this is intended.
        log.error("data:{},flag:{}", JSONObject.toJSONString(data), xfDataEnum.getRemark());
        dataAccessRecordService.save(EDataAccessType.IflytekRealtime, xfDataEnum.getCode(), data);
        try {
            final List<? extends BaseXfInterfaceEntity> dtoList;
            if (xfDataEnum == XfDataEnum.ChnlData) {
                dtoList = JSONObject.parseArray(data, MonitorChnlDataDto.class);
            } else if (xfDataEnum == XfDataEnum.ForcastData) {
                // Forecast records bypass the message queue and go straight to the Hankou endpoint.
                List<MonitorForcastData> forecastList = JSONObject.parseArray(data, MonitorForcastData.class);
                forecastList.forEach(forecast -> {
                    log.info("cover:{}", JSONObject.toJSONString(forecast));
                    String response = HttpUtil.post(hankouSaveUrl, JSONObject.toJSONString(forecast), 30000);
                    log.info("发送汉口站数据:{}", response);
                });
                return R.status(true);
            } else if (xfDataEnum == XfDataEnum.LakeData) {
                dtoList = JSONObject.parseArray(data, MonitorLakeData.class);
            } else if (xfDataEnum == XfDataEnum.Meteorological) {
                // Since 2022-12-01 the sender supplies the hourly rainfall (p1) itself,
                // so it is no longer derived here from consecutive on-the-hour records.
                dtoList = JSONObject.parseArray(data, MonitorMeteorologicalData.class);
            } else if (xfDataEnum == XfDataEnum.PptnData) {
                List<MonitorPptnData> pptnList = JSONObject.parseArray(data, MonitorPptnData.class);
                // Update the running total of every record before it is converted.
                pptnList.forEach(this::checkSet);
                dtoList = pptnList;
            } else if (xfDataEnum == XfDataEnum.PumpData) {
                dtoList = JSONObject.parseArray(data, MonitorPumpData.class);
            } else if (xfDataEnum == XfDataEnum.RiverData) {
                dtoList = JSONObject.parseArray(data, MonitorRiverData.class);
            } else if (xfDataEnum == XfDataEnum.SoilData) {
                List<MonitorSoilData> monitorSoilDataList = JSONObject.parseArray(data, MonitorSoilData.class);
                dtoList = MonitorSoilData.coverDto(monitorSoilDataList);
            } else if (xfDataEnum == XfDataEnum.WasData) {
                dtoList = JSONObject.parseArray(data, MonitorWasData.class);
            } else if (xfDataEnum == XfDataEnum.WetlogData) {
                dtoList = JSONObject.parseArray(data, MonitorWetlogData.class);
            } else if (xfDataEnum == XfDataEnum.SewageData) {
                dtoList = JSONObject.parseArray(data, MonitorSewageData.class);
            } else if (xfDataEnum == XfDataEnum.WaterQualityData) {
                dtoList = JSONObject.parseArray(data, WaterQualityData.class);
            } else if (xfDataEnum == XfDataEnum.StSlwtRData) {
                dtoList = JSONObject.parseArray(data, MonitorOtherData.class);
            } else if (xfDataEnum == XfDataEnum.SurgeTankData) {
                List<SurgeTankData> surgeTankDataList = JSONObject.parseArray(data, SurgeTankData.class);
                dtoList = surgeTankDataList.stream().map(SurgeTankDataDto::dataToDto).collect(Collectors.toList());
            } else if (xfDataEnum == XfDataEnum.DiverterWellData) {
                List<DiverterWellData> diverterWellDataList = JSONObject.parseArray(data, DiverterWellData.class);
                dtoList = diverterWellDataList.stream().map(DiverterWellDataDto::dataToDto).collect(Collectors.toList());
            } else if (xfDataEnum == XfDataEnum.WhjkSzqData) {
                List<WhjkSzqDto> szqDtos = JSONObject.parseArray(data, WhjkSzqDto.class);
                dtoList = szqDtos.stream().map(WhjkSzqDto::dataToDto).collect(Collectors.toList());
            } else if (xfDataEnum == XfDataEnum.InwamntData) {
                List<WrInwamntRDataDto> inwamntList = JSONObject.parseArray(data, WrInwamntRDataDto.class);
                dtoList = inwamntList.stream().map(WrInwamntRData::dtoToDto).collect(Collectors.toList());
            } else if (xfDataEnum == XfDataEnum.OutwamntData) {
                List<WrOutwamntRDataDto> outwamntList = JSONObject.parseArray(data, WrOutwamntRDataDto.class);
                dtoList = outwamntList.stream().map(WrOutwamntRData::dtoToDto).collect(Collectors.toList());
            } else if (xfDataEnum == XfDataEnum.GwWsLData
                    || xfDataEnum == XfDataEnum.GwWsQData
                    || xfDataEnum == XfDataEnum.GwYsLData
                    || xfDataEnum == XfDataEnum.GwYsQData
                    || xfDataEnum == XfDataEnum.GwYsSZData) {
                // These five types are all parsed into the same entity class.
                dtoList = JSONObject.parseArray(data, WsLData.class);
            } else if (xfDataEnum == XfDataEnum.GqYlData) {
                dtoList = JSONObject.parseArray(data, GqYlDataDto.class);
            } else {
                throw new RuntimeException("暂未开发");
            }

            if (CollUtil.isNotEmpty(dtoList)) {
                dtoList.forEach(entity -> {
                    // Shibajia high sluice (outer side): municipal-bureau station code 0461610871
                    // is additionally published as 0461610050 with the level lowered by 3 m.
                    // The type check keeps a record of another type carrying the same station
                    // code from aborting the batch with a ClassCastException.
                    if (entity instanceof MonitorChnlDataDto
                            && StringUtils.equalsIgnoreCase(entity.getStcd(), "0461610871")) {
                        publishMirroredStation((MonitorChnlDataDto) entity);
                    }
                    PtReceiveBaseModel model = RealTimeSerializer.xfObjectToRealMap(entity);
                    log.info("cover:{}", JSONObject.toJSONString(model));
                    rabbitTemplate.convertAndSend(JSONObject.toJSONString(model));
                });
            } else {
                log.error("数据转换错误:{}", JSONObject.toJSONString(dtoList));
            }
            return R.status(true);
        } catch (Exception e) {
            // Passing the throwable as the last argument makes SLF4J print the complete stack
            // trace; passing getStackTrace() only printed the first frame.
            log.error("解析错误,:{}", e.getMessage(), e);
            throw new RuntimeException("类型转换错误", e);
        }
    }

    /**
     * Publishes a copy of a station-0461610871 record under station code 0461610050 with its
     * {@code z} value reduced by 3 and truncated to two decimals.
     *
     * @param source the original record; it is not modified
     */
    private void publishMirroredStation(MonitorChnlDataDto source) {
        MonitorChnlDataDto mirrored = new MonitorChnlDataDto();
        BeanUtil.copyProperties(source, mirrored);
        mirrored.setStcd("0461610050");
        if (mirrored.getZ() == null) {
            // Without a level there is nothing to shift; skip the copy instead of failing the batch.
            log.error("0461610050: z is null, mirrored record skipped");
            return;
        }
        BigDecimal shiftedZ = mirrored.getZ().subtract(new BigDecimal(3)).setScale(2, BigDecimal.ROUND_DOWN);
        mirrored.setZ(shiftedZ);
        PtReceiveBaseModel mirroredModel = RealTimeSerializer.xfObjectToRealMap(mirrored);
        log.error("0461610050:cover:{}", JSONObject.toJSONString(mirroredModel));
        rabbitTemplate.convertAndSend(JSONObject.toJSONString(mirroredModel));
    }

    /**
     * Tells whether the record's timestamp falls exactly on the hour, i.e. its minutes and
     * seconds, formatted as {@code mmss}, read {@code 0000}.
     *
     * @param data record whose {@code tm} is inspected
     * @return {@code true} when minutes and seconds are both zero
     */
    private boolean checkDateTT(MonitorMeteorologicalData data) {
        String minuteSecond = DateUtil.format(data.getTm(), "mmss");
        return StrUtil.equalsIgnoreCase("0000", minuteSecond);
    }

    /**
     * Computes the one-hour rainfall ({@code drp}) of a weather-station record from the record
     * cached in {@code redis} under the same station code.
     *
     * <p>Records that are not exactly on the hour are left untouched. For on-the-hour records
     * {@code drp} is reset to zero; when the cached record is exactly one hour older and the
     * difference of the two {@code presum} values is not negative, that difference becomes the
     * new {@code drp}. A record newer than the cached one (or the first one for its station)
     * replaces the cache entry.
     *
     * @param data record to update in place
     */
    private void checkSetQxzP1(MonitorMeteorologicalData data) {
        if (checkDateTT(data)) {
            final String stationCode = data.getStcd();
            final Date observedAt = data.getTm();
            data.setDrp(BigDecimal.ZERO);

            MonitorMeteorologicalData cached = null;
            boolean newerThanCached = redis.exists(stationCode)
                    && (cached = redis.get(stationCode)) != null
                    && observedAt.compareTo(cached.getTm()) > 0;

            if (newerThanCached) {
                boolean exactlyOneHourLater = DateUtil.offsetHour(cached.getTm(), 1).compareTo(observedAt) == 0;
                if (exactlyOneHourLater) {
                    BigDecimal hourly = data.getPresum().subtract(cached.getPresum());
                    // A negative difference is ignored and drp stays at zero.
                    if (hourly.signum() >= 0) {
                        data.setDrp(hourly);
                    }
                }
                redis.set(stationCode, data);
                return;
            }

            // First record seen for this station: seed the cache.
            if (!redis.exists(stationCode)) {
                redis.set(stationCode, data);
            }
        }
    }

    /**
     * Ad-hoc check: prints how 10:00:00 compares to 09:00:01 shifted forward by one hour.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        Date observed = DateUtil.parse("2022-08-30 09:00:01");
        Date nextFullHour = DateUtil.parse("2022-08-30 10:00:00");
        Date observedPlusHour = DateUtil.offsetHour(observed, 1);
        System.out.println(nextFullHour.compareTo(observedPlusHour));
    }

    /**
     * Maintains the running total {@code pt} of a rainfall record using the record cached in
     * {@code redis} under the same station code.
     *
     * <p>When a cached record exists and the incoming one is newer, the incoming {@code pt} is
     * set to the cached {@code pt} plus the incoming {@code drp}, and the incoming record
     * replaces the cache entry. A missing value on either side is treated as "no contribution"
     * instead of raising a {@code NullPointerException} that would fail the whole request.
     * When nothing is cached for the station, the record is stored as received.
     *
     * @param pptnData record to update in place
     */
    private void checkSet(MonitorPptnData pptnData) {
        String st = pptnData.getStcd();
        Date tm = pptnData.getTm();
        MonitorPptnData old;
        if (redis.exists(st) && (old = redis.get(st)) != null && tm.compareTo(old.getTm()) > 0) {
            if (old.getPt() == null) {
                // Nothing accumulated so far: the total starts with this record's increment.
                pptnData.setPt(pptnData.getDrp());
            } else if (pptnData.getDrp() == null) {
                // No increment reported: carry the previous total forward.
                pptnData.setPt(old.getPt());
            } else {
                pptnData.setPt(old.getPt().add(pptnData.getDrp()));
            }
            redis.set(st, pptnData);
        } else if (!redis.exists(st)) {
            // First record seen for this station: seed the cache.
            redis.set(st, pptnData);
        }
    }

    /**
     * Wraps a forecast record into a {@code PtReceiveBaseModel} with three data nodes keyed
     * {@code q}, {@code z} and {@code ybt} (the latter carrying the record's {@code focTime}).
     *
     * @param DTO forecast record to convert
     * @return model whose {@code st} is the record's station code, {@code tt} its publish time
     *         and {@code ut} the current time
     */
    private PtReceiveBaseModel build(MonitorForcastData DTO) {
        MqNodeData qNode = new MqNodeData();
        qNode.setKey("q");
        qNode.setSn("q");
        qNode.setValue(DTO.getQ());

        MqNodeData zNode = new MqNodeData();
        zNode.setKey("z");
        zNode.setSn("z");
        zNode.setValue(DTO.getZ());

        MqNodeData ybtNode = new MqNodeData();
        ybtNode.setKey("ybt");
        ybtNode.setSn("ybt");
        ybtNode.setValue(DTO.getFocTime());

        Map<String, MqNodeData> nodes = Maps.newHashMap();
        nodes.put("q", qNode);
        nodes.put("z", zNode);
        nodes.put("ybt", ybtNode);

        PtReceiveBaseModel result = new PtReceiveBaseModel();
        result.setDataMap(nodes);
        result.setSt(DTO.getStcd());
        result.setTt(DTO.getPubTime());
        result.setUt(new Date());
        return result;
    }

    /**
     * Receives a single iFlytek monitoring record (a JSON object) and forwards it downstream.
     *
     * <p>The raw payload is first stored through {@code dataAccessRecordService}. The record is
     * parsed into the entity class that matches {@code xfDataEnum}, converted with
     * {@code RealTimeSerializer.xfObjectToRealMap} and, when the resulting data map is not empty,
     * published as a JSON string through {@code rabbitTemplate}. {@code ForcastData} is POSTed to
     * {@code hankouSaveUrl} instead and nothing is published.
     *
     * @param data       JSON object holding the record
     * @param xfDataEnum kind of record contained in {@code data}
     * @return {@code R.status(true)}, also when the converted model carries no data
     * @throws RuntimeException if the data type is not supported or parsing/forwarding fails; the
     *                          underlying exception is attached as the cause
     */
    @PostMapping("/xfInterface/single")
    public R xfInterfaceSingle(@ApiParam(value = "数据json") @RequestParam("data") String data, @ApiParam(value = "数据类型") @RequestParam("xfBaseData") XfDataEnum xfDataEnum) {
        // NOTE(review): the regular request trace is written at ERROR level — confirm this is intended.
        log.error("data:{},flag:{}", JSONObject.toJSONString(data), xfDataEnum.getRemark());
        dataAccessRecordService.save(EDataAccessType.IflytekRealtime, xfDataEnum.getCode(), data);
        try {
            // Every branch below assigns the model, returns, or throws.
            final PtReceiveBaseModel model;
            if (xfDataEnum == XfDataEnum.ChnlData) {
                MonitorChnlDataDto chnl = JSONObject.parseObject(data, MonitorChnlDataDto.class);
                model = RealTimeSerializer.xfObjectToRealMap(chnl);
            } else if (xfDataEnum == XfDataEnum.ForcastData) {
                // Forecast records bypass the message queue and go straight to the Hankou endpoint.
                MonitorForcastData forecast = JSONObject.parseObject(data, MonitorForcastData.class);
                log.info("cover:{}", JSONObject.toJSONString(forecast));
                String response = HttpUtil.post(hankouSaveUrl, JSONObject.toJSONString(forecast), 30000);
                log.info("发送汉口站数据:{}", response);
                return R.status(true);
            } else if (xfDataEnum == XfDataEnum.LakeData) {
                MonitorLakeData lake = JSONObject.parseObject(data, MonitorLakeData.class);
                model = RealTimeSerializer.xfObjectToRealMap(lake);
            } else if (xfDataEnum == XfDataEnum.Meteorological) {
                MonitorMeteorologicalData meteorological = JSONObject.parseObject(data, MonitorMeteorologicalData.class);
                model = RealTimeSerializer.xfObjectToRealMap(meteorological);
            } else if (xfDataEnum == XfDataEnum.PptnData) {
                MonitorPptnData pptn = JSONObject.parseObject(data, MonitorPptnData.class);
                // Update the running total before the record is converted.
                checkSet(pptn);
                model = RealTimeSerializer.xfObjectToRealMap(pptn);
            } else if (xfDataEnum == XfDataEnum.PumpData) {
                MonitorPumpData pump = JSONObject.parseObject(data, MonitorPumpData.class);
                model = RealTimeSerializer.xfObjectToRealMap(pump);
            } else if (xfDataEnum == XfDataEnum.RiverData) {
                MonitorRiverData river = JSONObject.parseObject(data, MonitorRiverData.class);
                model = RealTimeSerializer.xfObjectToRealMap(river);
            } else if (xfDataEnum == XfDataEnum.SoilData) {
                // TODO (carried over from the original code, which left this branch marked "todo").
                MonitorSoilData soil = JSONObject.parseObject(data, MonitorSoilData.class);
                model = RealTimeSerializer.xfObjectToRealMap(soil);
            } else if (xfDataEnum == XfDataEnum.WasData) {
                MonitorWasData was = JSONObject.parseObject(data, MonitorWasData.class);
                model = RealTimeSerializer.xfObjectToRealMap(was);
            } else if (xfDataEnum == XfDataEnum.WetlogData) {
                MonitorWetlogData wetlog = JSONObject.parseObject(data, MonitorWetlogData.class);
                model = RealTimeSerializer.xfObjectToRealMap(wetlog);
            } else if (xfDataEnum == XfDataEnum.SewageData) {
                MonitorSewageData sewage = JSONObject.parseObject(data, MonitorSewageData.class);
                model = RealTimeSerializer.xfObjectToRealMap(sewage);
            } else if (xfDataEnum == XfDataEnum.WaterQualityData) {
                WaterQualityData waterQuality = JSONObject.parseObject(data, WaterQualityData.class);
                model = RealTimeSerializer.xfObjectToRealMap(waterQuality);
            } else if (xfDataEnum == XfDataEnum.StSlwtRData) {
                MonitorOtherData other = JSONObject.parseObject(data, MonitorOtherData.class);
                model = RealTimeSerializer.xfObjectToRealMap(other);
            } else if (xfDataEnum == XfDataEnum.SurgeTankData) {
                SurgeTankData surgeTankData = JSONObject.parseObject(data, SurgeTankData.class);
                model = RealTimeSerializer.xfObjectToRealMap(SurgeTankDataDto.dataToDto(surgeTankData));
            } else if (xfDataEnum == XfDataEnum.DiverterWellData) {
                DiverterWellData diverterWellData = JSONObject.parseObject(data, DiverterWellData.class);
                model = RealTimeSerializer.xfObjectToRealMap(DiverterWellDataDto.dataToDto(diverterWellData));
            } else if (xfDataEnum == XfDataEnum.WhjkSzqData) {
                WhjkSzqDto szqDto = JSONObject.parseObject(data, WhjkSzqDto.class);
                model = RealTimeSerializer.xfObjectToRealMap(WhjkSzqDto.dataToDto(szqDto));
            } else if (xfDataEnum == XfDataEnum.InwamntData) {
                WrInwamntRDataDto inwamnt = JSONObject.parseObject(data, WrInwamntRDataDto.class);
                model = RealTimeSerializer.xfObjectToRealMap(WrInwamntRData.dtoToDto(inwamnt));
            } else if (xfDataEnum == XfDataEnum.OutwamntData) {
                WrOutwamntRDataDto outwamnt = JSONObject.parseObject(data, WrOutwamntRDataDto.class);
                model = RealTimeSerializer.xfObjectToRealMap(WrOutwamntRData.dtoToDto(outwamnt));
            } else if (xfDataEnum == XfDataEnum.GwWsLData
                    || xfDataEnum == XfDataEnum.GwWsQData
                    || xfDataEnum == XfDataEnum.GwYsLData
                    || xfDataEnum == XfDataEnum.GwYsQData
                    || xfDataEnum == XfDataEnum.GwYsSZData) {
                // These five types are all parsed into the same entity class.
                WsLData wsl = JSONObject.parseObject(data, WsLData.class);
                model = RealTimeSerializer.xfObjectToRealMap(wsl);
            } else if (xfDataEnum == XfDataEnum.GqYlData) {
                GqYlDataDto gqYl = JSONObject.parseObject(data, GqYlDataDto.class);
                model = RealTimeSerializer.xfObjectToRealMap(gqYl);
            } else {
                throw new RuntimeException("暂未开发");
            }

            log.info("cover:{}", JSONObject.toJSONString(model));
            if (CollUtil.isEmpty(model.getDataMap())) {
                // Nothing to publish; the request is still acknowledged.
                log.error("数据转换错误:{}", JSONObject.toJSONString(model));
            } else {
                rabbitTemplate.convertAndSend(JSONObject.toJSONString(model));
            }
            return R.status(true);
        } catch (Exception e) {
            // Passing the throwable as the last argument makes SLF4J print the complete stack
            // trace; passing getStackTrace() only printed the first frame.
            log.error("解析错误,:{}", e.getMessage(), e);
            throw new RuntimeException("类型转换错误", e);
        }
    }
}