package org.springnewfiber.dataadapter.xf.controller; import cn.hutool.core.bean.BeanUtil; import cn.hutool.core.collection.CollUtil; import cn.hutool.core.date.DateUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.http.HttpUtil; import com.alibaba.fastjson.JSONObject; import com.google.common.collect.Maps; import io.swagger.annotations.Api; import io.swagger.annotations.ApiParam; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import org.springnewfiber.dataadapter.config.BladeRedis; import org.springnewfiber.dataadapter.config.R; import org.springnewfiber.dataadapter.entity.MqNodeData; import org.springnewfiber.dataadapter.entity.PtReceiveBaseModel; import org.springnewfiber.dataadapter.sswj.util.RealTimeSerializer; import org.springnewfiber.dataadapter.xf.XfDataEnum; import org.springnewfiber.dataadapter.xf.entity.*; import org.springnewfiber.dataadapter.xf.enums.EDataAccessType; import org.springnewfiber.dataadapter.xf.service.DataAccessRecordService; import java.math.BigDecimal; import java.util.Date; import java.util.List; import java.util.Map; import java.util.concurrent.ThreadPoolExecutor; import java.util.stream.Collectors; /** * @program: newfiber-data-adapter * @description: * @author: djt * @create: 2022-06-14 20:44 **/ @RestController @RequiredArgsConstructor //@PreAuth(AuthConstant.PERMISSION_ALL) @RequestMapping("/monitorData") @Api(value = "讯飞数据接受接口", tags = "讯飞数据接受接口") @Slf4j public class MonitorDataController { private final RabbitTemplate rabbitTemplate; private final BladeRedis redis; @Value("${business.hankouSaveUrl}") private String hankouSaveUrl; private final DataAccessRecordService dataAccessRecordService; @PostMapping("/xfInterface") public R xfInterface(@ApiParam(value = "数据json") @RequestParam("data") String data, @ApiParam(value = "数据类型") @RequestParam("xfBaseData") XfDataEnum xfDataEnum) { log.error("data:{},flag:{}", JSONObject.toJSONString(data), xfDataEnum.getRemark()); dataAccessRecordService.save(EDataAccessType.IflytekRealtime, xfDataEnum.getCode(), data); List<? extends BaseXfInterfaceEntity> DTO = null; try { if (xfDataEnum == XfDataEnum.ChnlData) { DTO = JSONObject.parseArray(data, MonitorChnlDataDto.class); } else { if (xfDataEnum == XfDataEnum.ForcastData) { List<MonitorForcastData> dataList = JSONObject.parseArray(data, MonitorForcastData.class); dataList.forEach((i) -> { // this.build(i); log.info("cover:{}", JSONObject.toJSONString(i)); String request = HttpUtil.post(hankouSaveUrl, JSONObject.toJSONString(i), 30000); log.info("发送汉口站数据:{}", request); }); return R.status(true); } if (xfDataEnum == XfDataEnum.LakeData) { DTO = JSONObject.parseArray(data, MonitorLakeData.class); } else if (xfDataEnum == XfDataEnum.Meteorological) { DTO = JSONObject.parseArray(data, MonitorMeteorologicalData.class); // 整点数据 特殊处理 这个整点减去上个整点。。算出p1 //2022-12-01 讯飞侧给入p1 // DTO.forEach(i -> { // checkSetQxzP1((MonitorMeteorologicalData) i); // }); } else if (xfDataEnum == XfDataEnum.PptnData) { DTO = JSONObject.parseArray(data, MonitorPptnData.class); DTO.forEach(i -> { checkSet((MonitorPptnData) i); }); } else if (xfDataEnum == XfDataEnum.PumpData) { DTO = JSONObject.parseArray(data, MonitorPumpData.class); } else if (xfDataEnum == XfDataEnum.RiverData) { DTO = JSONObject.parseArray(data, MonitorRiverData.class); } else if (xfDataEnum == XfDataEnum.SoilData) { List<MonitorSoilData> monitorSoilDataList = JSONObject.parseArray(data, MonitorSoilData.class); DTO = MonitorSoilData.coverDto(monitorSoilDataList); } else if (xfDataEnum == XfDataEnum.WasData) { DTO = JSONObject.parseArray(data, MonitorWasData.class); } else if (xfDataEnum == XfDataEnum.WetlogData) { DTO = JSONObject.parseArray(data, MonitorWetlogData.class); } else if (xfDataEnum == XfDataEnum.SewageData) { DTO = JSONObject.parseArray(data, MonitorSewageData.class); } else if (xfDataEnum == XfDataEnum.WaterQualityData) { DTO = JSONObject.parseArray(data, WaterQualityData.class); } else if (xfDataEnum == XfDataEnum.StSlwtRData) { DTO = JSONObject.parseArray(data, MonitorOtherData.class); }else if (xfDataEnum == XfDataEnum.SurgeTankData) { List<SurgeTankData> surgeTankDataList = JSONObject.parseArray(data, SurgeTankData.class); DTO = surgeTankDataList.stream().map(SurgeTankDataDto::dataToDto).collect(Collectors.toList()); } else if (xfDataEnum == XfDataEnum.DiverterWellData) { List<DiverterWellData> diverterWellDataList = JSONObject.parseArray(data, DiverterWellData.class); DTO = diverterWellDataList.stream().map(DiverterWellDataDto::dataToDto).collect(Collectors.toList()); } else if(xfDataEnum == XfDataEnum.WhjkSzqData){ List<WhjkSzqDto> szqDtos = JSONObject.parseArray(data, WhjkSzqDto.class); // List<? extends BaseXfInterfaceEntity> DTO1= szqDtos.stream().map(WhjkSzqDto::dataToDto).collect(Collectors.toList()); // DTO1.forEach(i->{ // log.info("source:{}", JSONObject.toJSONString(i)); // PtReceiveBaseModel model = RealTimeSerializer.xfObjectToRealMap(i); // log.info("cover:{}", JSONObject.toJSONString(model)); // }); DTO= szqDtos.stream().map(WhjkSzqDto::dataToDto).collect(Collectors.toList()); }else if(xfDataEnum == XfDataEnum.InwamntData){ List<WrInwamntRDataDto> surgeTankDataList = JSONObject.parseArray(data, WrInwamntRDataDto.class); DTO = surgeTankDataList.stream().map(WrInwamntRData::dtoToDto).collect(Collectors.toList()); }else if(xfDataEnum == XfDataEnum.OutwamntData){ List<WrOutwamntRDataDto> surgeTankDataList = JSONObject.parseArray(data, WrOutwamntRDataDto.class); DTO = surgeTankDataList.stream().map(WrOutwamntRData::dtoToDto).collect(Collectors.toList()); } else if(xfDataEnum == XfDataEnum.GwWsLData ||xfDataEnum == XfDataEnum.GwWsQData ||xfDataEnum == XfDataEnum.GwYsLData ||xfDataEnum == XfDataEnum.GwYsQData ||xfDataEnum==XfDataEnum.GwYsSZData){ DTO = JSONObject.parseArray(data, WsLData.class); // DTO = surgeTankDataList.stream().map(WrOutwamntRData::dtoToDto).collect(Collectors.toList()); } else if(xfDataEnum == XfDataEnum.GqYlData) { DTO = JSONObject.parseArray(data, GqYlDataDto.class); } else { throw new RuntimeException("暂未开发"); } } if (CollUtil.isNotEmpty(DTO)) { DTO.forEach((i) -> { //十八家高闸外 市局编号 0461610871 转换成0461610050 -3米 if (StringUtils.equalsIgnoreCase(i.getStcd(), "0461610871")) { MonitorChnlDataDto newI = (MonitorChnlDataDto) i; MonitorChnlDataDto newDto = new MonitorChnlDataDto(); BeanUtil.copyProperties(newI, newDto); // newDto.setStcd("0461610050"); BigDecimal newZ = newDto.getZ().subtract(new BigDecimal(3)).setScale(2, BigDecimal.ROUND_DOWN); newDto.setZ(newZ); PtReceiveBaseModel newModel = RealTimeSerializer.xfObjectToRealMap(newDto); log.error("0461610050:cover:{}", JSONObject.toJSONString(newModel)); rabbitTemplate.convertAndSend(JSONObject.toJSONString(newModel)); } PtReceiveBaseModel model = RealTimeSerializer.xfObjectToRealMap(i); log.info("cover:{}", JSONObject.toJSONString(model)); rabbitTemplate.convertAndSend(JSONObject.toJSONString(model)); }); return R.status(true); } else { log.error("数据转换错误:{}", JSONObject.toJSONString(DTO)); return R.status(true); } } catch (Exception var5) { log.error("解析错误,:{}", var5.getStackTrace()); throw new RuntimeException("类型转换错误"); } } private boolean checkDateTT(MonitorMeteorologicalData data) { Date tt = data.getTm(); String mmssDateStr = DateUtil.format(tt, "mmss"); if (StrUtil.equalsIgnoreCase("0000", mmssDateStr)) { return true; } return false; } /** * 气象站内存计算一小时降雨量 * * @param data */ private void checkSetQxzP1(MonitorMeteorologicalData data) { if (!checkDateTT(data)) { return; } String st = data.getStcd(); MonitorMeteorologicalData old; Date tm = data.getTm(); data.setDrp(BigDecimal.ZERO); if (redis.exists(st) && (old = redis.get(st)) != null && tm.compareTo(old.getTm()) > 0) { if (DateUtil.offsetHour(old.getTm(), 1).compareTo(tm) == 0) { BigDecimal p1 = data.getPresum().subtract(old.getPresum()); if (p1.compareTo(BigDecimal.ZERO) > -1) { data.setDrp(p1); } } redis.set(data.getStcd(), data); } else if (!redis.exists(st)) { redis.set(data.getStcd(), data); } } public static void main(String[] args) { String dateStr = "2022-08-30 09:00:01"; Date tt = DateUtil.parse(dateStr); String dateStr1 = "2022-08-30 10:00:00"; Date ttup1 = DateUtil.parse(dateStr1); // String mmssDateStr = DateUtil.format(tt, "mmss"); // if (StrUtil.equalsIgnoreCase("0000", mmssDateStr)) { // System.err.println(dateStr); // } else { // System.out.println(dateStr); // } Date tt1 = DateUtil.offsetHour(tt, 1); System.out.println(ttup1.compareTo(tt1)); } private void checkSet(MonitorPptnData pptnData) { String st = pptnData.getStcd(); Date tm = pptnData.getTm(); MonitorPptnData old; if (redis.exists(st) && (old = redis.get(st)) != null && tm.compareTo(old.getTm()) > 0) { pptnData.setPt(old.getPt().add(pptnData.getDrp())); redis.set(pptnData.getStcd(), pptnData); } else if (!redis.exists(st)) { redis.set(pptnData.getStcd(), pptnData); } } private PtReceiveBaseModel build(MonitorForcastData DTO) { PtReceiveBaseModel model = new PtReceiveBaseModel(); Map<String, MqNodeData> dataMap = Maps.newHashMap(); MqNodeData dataQ = new MqNodeData(); dataQ.setSn("q"); dataQ.setKey("q"); dataQ.setValue(DTO.getQ()); MqNodeData dataZ = new MqNodeData(); dataZ.setSn("z"); dataZ.setKey("z"); dataZ.setValue(DTO.getZ()); MqNodeData dataYbt = new MqNodeData(); dataYbt.setSn("ybt"); dataYbt.setKey("ybt"); dataYbt.setValue(DTO.getFocTime()); dataMap.put("q", dataQ); dataMap.put("z", dataZ); dataMap.put("ybt", dataYbt); model.setDataMap(dataMap); model.setSt(DTO.getStcd()); model.setTt(DTO.getPubTime()); model.setUt(new Date()); return model; } @PostMapping("/xfInterface/single") public R xfInterfaceSingle(@ApiParam(value = "数据json") @RequestParam("data") String data, @ApiParam(value = "数据类型") @RequestParam("xfBaseData") XfDataEnum xfDataEnum) { log.error("data:{},flag:{}", JSONObject.toJSONString(data), xfDataEnum.getRemark()); dataAccessRecordService.save(EDataAccessType.IflytekRealtime, xfDataEnum.getCode(), data); PtReceiveBaseModel model = new PtReceiveBaseModel(); try { if (xfDataEnum == XfDataEnum.ChnlData) { MonitorChnlDataDto DTO = JSONObject.parseObject(data, MonitorChnlDataDto.class); model = RealTimeSerializer.xfObjectToRealMap(DTO); } else if (xfDataEnum == XfDataEnum.ForcastData) { MonitorForcastData DTO = JSONObject.parseObject(data, MonitorForcastData.class); log.info("cover:{}", JSONObject.toJSONString(DTO)); String request = HttpUtil.post(hankouSaveUrl, JSONObject.toJSONString(DTO), 30000); log.info("发送汉口站数据:{}", request); return R.status(true); } else if (xfDataEnum == XfDataEnum.LakeData) { MonitorLakeData DTO = JSONObject.parseObject(data, MonitorLakeData.class); model = RealTimeSerializer.xfObjectToRealMap(DTO); } else if (xfDataEnum == XfDataEnum.Meteorological) { MonitorMeteorologicalData DTO = JSONObject.parseObject(data, MonitorMeteorologicalData.class); // checkSetQxzP1(DTO); model = RealTimeSerializer.xfObjectToRealMap(DTO); } else if (xfDataEnum == XfDataEnum.PptnData) { MonitorPptnData DTO = JSONObject.parseObject(data, MonitorPptnData.class); checkSet((MonitorPptnData) DTO); model = RealTimeSerializer.xfObjectToRealMap(DTO); } else if (xfDataEnum == XfDataEnum.PumpData) { MonitorPumpData DTO = JSONObject.parseObject(data, MonitorPumpData.class); model = RealTimeSerializer.xfObjectToRealMap(DTO); } else if (xfDataEnum == XfDataEnum.RiverData) { MonitorRiverData DTO = JSONObject.parseObject(data, MonitorRiverData.class); model = RealTimeSerializer.xfObjectToRealMap(DTO); } else if (xfDataEnum == XfDataEnum.SoilData) { MonitorSoilData DTO = JSONObject.parseObject(data, MonitorSoilData.class);//todo model = RealTimeSerializer.xfObjectToRealMap(DTO); } else if (xfDataEnum == XfDataEnum.WasData) { MonitorWasData DTO = JSONObject.parseObject(data, MonitorWasData.class); model = RealTimeSerializer.xfObjectToRealMap(DTO); } else if (xfDataEnum == XfDataEnum.WetlogData) { MonitorWetlogData DTO = JSONObject.parseObject(data, MonitorWetlogData.class); model = RealTimeSerializer.xfObjectToRealMap(DTO); } else if (xfDataEnum == XfDataEnum.SewageData) { MonitorSewageData DTO = JSONObject.parseObject(data, MonitorSewageData.class); model = RealTimeSerializer.xfObjectToRealMap(DTO); } else if (xfDataEnum == XfDataEnum.WaterQualityData) { WaterQualityData DTO = JSONObject.parseObject(data, WaterQualityData.class); model = RealTimeSerializer.xfObjectToRealMap(DTO); } else if (xfDataEnum == XfDataEnum.StSlwtRData) { MonitorOtherData DTO = JSONObject.parseObject(data, MonitorOtherData.class); model = RealTimeSerializer.xfObjectToRealMap(DTO); } else if (xfDataEnum == XfDataEnum.SurgeTankData) { SurgeTankData surgeTankData = JSONObject.parseObject(data, SurgeTankData.class); model = RealTimeSerializer.xfObjectToRealMap(SurgeTankDataDto.dataToDto(surgeTankData)); } else if (xfDataEnum == XfDataEnum.DiverterWellData) { DiverterWellData diverterWellData = JSONObject.parseObject(data, DiverterWellData.class); model = RealTimeSerializer.xfObjectToRealMap(DiverterWellDataDto.dataToDto(diverterWellData)); } else if(xfDataEnum == XfDataEnum.WhjkSzqData){ WhjkSzqDto szqDto = JSONObject.parseObject(data, WhjkSzqDto.class); model = RealTimeSerializer.xfObjectToRealMap(WhjkSzqDto.dataToDto(szqDto)); }else if(xfDataEnum == XfDataEnum.InwamntData){ WrInwamntRDataDto surgeTankData = JSONObject.parseObject(data, WrInwamntRDataDto.class); model = RealTimeSerializer.xfObjectToRealMap(WrInwamntRData.dtoToDto(surgeTankData)); }else if(xfDataEnum == XfDataEnum.OutwamntData){ WrOutwamntRDataDto surgeTankData = JSONObject.parseObject(data, WrOutwamntRDataDto.class); model = RealTimeSerializer.xfObjectToRealMap(WrOutwamntRData.dtoToDto(surgeTankData)); } else if(xfDataEnum == XfDataEnum.GwWsLData ||xfDataEnum == XfDataEnum.GwWsQData ||xfDataEnum == XfDataEnum.GwYsLData ||xfDataEnum == XfDataEnum.GwYsQData ||xfDataEnum==XfDataEnum.GwYsSZData){ WsLData DTO = JSONObject.parseObject(data, WsLData.class); model = RealTimeSerializer.xfObjectToRealMap(DTO); // DTO = surgeTankDataList.stream().map(WrOutwamntRData::dtoToDto).collect(Collectors.toList()); } else if(xfDataEnum == XfDataEnum.GqYlData) { GqYlDataDto dto = JSONObject.parseObject(data, GqYlDataDto.class); model = RealTimeSerializer.xfObjectToRealMap(dto); } else { throw new RuntimeException("暂未开发"); } log.info("cover:{}", JSONObject.toJSONString(model)); if (CollUtil.isEmpty(model.getDataMap())) { log.error("数据转换错误:{}", JSONObject.toJSONString(model)); return R.status(true); } else { rabbitTemplate.convertAndSend(JSONObject.toJSONString(model)); return R.status(true); } } catch (Exception var5) { log.error("解析错误,:{}", var5.getStackTrace()); throw new RuntimeException("类型转换错误"); } } }