package com.dji.sdk.mqtt;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

/**
 * MQTT client configuration: declares the inbound channel adapter, the outbound
 * message handler, and a fallback handler for the default channel.
 *
 * @author sean.zhou
 * @date 2021/11/10
 * @version 0.1
 */
@Configuration
@IntegrationComponentScan
public class MqttConfiguration {

    private static final Logger log = LoggerFactory.getLogger(MqttConfiguration.class);

    /**
     * Comma-separated list of topics the inbound adapter subscribes to at creation.
     * NOTE(review): the placeholder default after ':' appears to be a single space,
     * which would yield one topic consisting of a space when the property is unset
     * - confirm this is intentional before changing it.
     */
    @Value("${cloud-sdk.mqtt.inbound-topic: }")
    private String inboundTopic;

    @Resource
    private MqttPahoClientFactory mqttClientFactory;

    @Resource(name = ChannelName.INBOUND)
    private MessageChannel inboundChannel;

    /**
     * Creates the client that receives inbound MQTT messages.
     *
     * <p>The adapter uses a random UUID as its client id, subscribes to every entry
     * obtained by splitting {@code inboundTopic} on commas, requests QoS 1, keeps
     * payloads as raw bytes, and publishes received messages to the
     * {@link ChannelName#INBOUND} channel.
     *
     * @return the configured inbound channel adapter
     */
    @Bean
    public MqttPahoMessageDrivenChannelAdapter mqttInbound() {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter(
                UUID.randomUUID().toString(), mqttClientFactory, inboundTopic.split(","));

        // Use byte payloads uniformly; decoding is left to downstream handlers.
        DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
        converter.setPayloadAsBytes(true);

        adapter.setConverter(converter);
        adapter.setQos(1);
        adapter.setOutputChannel(inboundChannel);
        return adapter;
    }

    /**
     * Creates the client that publishes messages arriving on the
     * {@link ChannelName#OUTBOUND} channel.
     *
     * <p>The handler uses a random UUID as its client id, has async mode enabled,
     * defaults to QoS 0, and uses a converter configured for byte payloads.
     *
     * @return the configured outbound message handler
     */
    @Bean
    @ServiceActivator(inputChannel = ChannelName.OUTBOUND)
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler(
                UUID.randomUUID().toString(), mqttClientFactory);

        // Use byte payloads uniformly, matching the inbound adapter.
        DefaultPahoMessageConverter converter = new DefaultPahoMessageConverter();
        converter.setPayloadAsBytes(true);

        messageHandler.setAsync(true);
        messageHandler.setDefaultQos(0);
        messageHandler.setConverter(converter);
        return messageHandler;
    }

    /**
     * Defines the handler for the {@link ChannelName#DEFAULT} channel, which only
     * logs the topic and payload of each message it receives.
     *
     * @return a handler that logs the message at INFO level and does nothing else
     */
    @Bean
    @ServiceActivator(inputChannel = ChannelName.DEFAULT)
    public MessageHandler defaultInboundHandler() {
        return message -> log.info(
                "The default channel does not handle messages.\nTopic: {}\nPayload: {}\n",
                message.getHeaders().get(MqttHeaders.RECEIVED_TOPIC),
                describePayload(message.getPayload()));
    }

    /**
     * Renders a message payload for logging. Byte arrays are decoded as UTF-8 text,
     * because their default string form is only the array identity (e.g.
     * {@code [B@1a2b3c}); any other payload is returned unchanged.
     *
     * @param payload the message payload, possibly {@code null}
     * @return the decoded text for {@code byte[]} payloads, otherwise the payload itself
     */
    private static Object describePayload(Object payload) {
        if (payload instanceof byte[]) {
            return new String((byte[]) payload, StandardCharsets.UTF_8);
        }
        return payload;
    }
}