From 46452af4686275a5c9eb2ef40fa34db18c6e106b Mon Sep 17 00:00:00 2001 From: wty <419034340@qq.com> Date: Mon, 14 Nov 2022 17:27:19 +0800 Subject: [PATCH] add connect to emqx function --- .../com/aiit/xiuos/Utils/HttpClientUtil.java | 18 +-- .../java/com/aiit/xiuos/Utils/MyUtils.java | 12 +- .../aiit/xiuos/mqtt/MqttConfiguration.java | 59 +++++++++ .../com/aiit/xiuos/mqtt/MyMqttClient.java | 121 ++++++++++++++++++ .../com/aiit/xiuos/mqtt/PushCallback.java | 55 ++++++++ .../src/main/resources/application-prod.yml | 2 +- .../test/java/com/aiit/xiuos/MqttTest.java | 5 +- 7 files changed, 258 insertions(+), 14 deletions(-) create mode 100644 xiuosiot-backend/src/main/java/com/aiit/xiuos/mqtt/MqttConfiguration.java create mode 100644 xiuosiot-backend/src/main/java/com/aiit/xiuos/mqtt/MyMqttClient.java create mode 100644 xiuosiot-backend/src/main/java/com/aiit/xiuos/mqtt/PushCallback.java diff --git a/xiuosiot-backend/src/main/java/com/aiit/xiuos/Utils/HttpClientUtil.java b/xiuosiot-backend/src/main/java/com/aiit/xiuos/Utils/HttpClientUtil.java index c12dfcf..f6095a0 100644 --- a/xiuosiot-backend/src/main/java/com/aiit/xiuos/Utils/HttpClientUtil.java +++ b/xiuosiot-backend/src/main/java/com/aiit/xiuos/Utils/HttpClientUtil.java @@ -1,6 +1,7 @@ package com.aiit.xiuos.Utils; import com.alibaba.fastjson.JSONObject; +import lombok.extern.slf4j.Slf4j; import org.apache.http.HttpResponse; import org.apache.http.HttpStatus; import org.apache.http.client.methods.CloseableHttpResponse; @@ -14,7 +15,7 @@ import org.apache.http.util.EntityUtils; import java.io.IOException; - +@Slf4j public class HttpClientUtil { private static String tokenString = ""; @@ -24,7 +25,6 @@ public class HttpClientUtil { /** * 以get方式调用第三方接口 * @param url - * @param token * @return */ public static JSONObject doGet(String url) { @@ -45,7 +45,7 @@ public class HttpClientUtil { return (JSONObject) JSONObject.parse(res); } } catch (IOException e) { - e.printStackTrace(); + log.error(e.getMessage()); } return null; } @@ -81,13 +81,13 @@ public class HttpClientUtil { return res; } } catch (IOException e) { - e.printStackTrace(); + log.error(e.getMessage()); } finally { if (httpClient != null){ try { httpClient.close(); } catch (IOException e) { - e.printStackTrace(); + log.error(e.getMessage()); } } } @@ -123,7 +123,7 @@ public class HttpClientUtil { token = result.getString("token"); } } catch (IOException e) { - e.printStackTrace(); + log.error(e.getMessage()); } return token; } @@ -144,7 +144,9 @@ public class HttpClientUtil { System.out.println(response); } - public static void main(String[] args) { - test("12345678910"); + + +public static void main(String[] arg0){ + } } diff --git a/xiuosiot-backend/src/main/java/com/aiit/xiuos/Utils/MyUtils.java b/xiuosiot-backend/src/main/java/com/aiit/xiuos/Utils/MyUtils.java index 8d1d05b..e6b03c0 100644 --- a/xiuosiot-backend/src/main/java/com/aiit/xiuos/Utils/MyUtils.java +++ b/xiuosiot-backend/src/main/java/com/aiit/xiuos/Utils/MyUtils.java @@ -2,6 +2,7 @@ package com.aiit.xiuos.Utils; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; +import lombok.extern.slf4j.Slf4j; import javax.servlet.http.HttpServletRequest; import java.io.*; @@ -12,7 +13,7 @@ import java.util.Date; import java.util.List; - +@Slf4j public class MyUtils { public static void main(String[] args) { @@ -62,9 +63,14 @@ public class MyUtils { return time; } - public static Date getDateTime() throws ParseException { + public static Date getDateTime() { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - Date date = sdf.parse(MyUtils.getTime()); + Date date = null; + try { + date = sdf.parse(MyUtils.getTime()); + } catch (ParseException e) { + log.error(e.getMessage()); + } return date; } diff --git a/xiuosiot-backend/src/main/java/com/aiit/xiuos/mqtt/MqttConfiguration.java b/xiuosiot-backend/src/main/java/com/aiit/xiuos/mqtt/MqttConfiguration.java new file mode 100644 index 0000000..3d7bd53 --- /dev/null +++ b/xiuosiot-backend/src/main/java/com/aiit/xiuos/mqtt/MqttConfiguration.java @@ -0,0 +1,59 @@ +package com.aiit.xiuos.mqtt; + +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.stereotype.Component; + + +@Component +@ConfigurationProperties("mqtt") +@Setter +@Getter +@Slf4j +public class MqttConfiguration { + @Autowired + private MyMqttClient myMqttClient; + + /** + * 用户名 + */ + private String username; /** + + * 密码 + */ + private String password; + /** + * 连接地址 + */ + private String hostUrl; + /** + * 客户Id + */ + private String clientId; + /** + * 默认连接话题 + */ + private String defaultTopic; + /** + * 超时时间 + */ + private int timeout; + /** + * 保持连接数 + */ + private int keepalive; + + @Bean + public MyMqttClient getMqttPushClient() { + myMqttClient.connect(hostUrl, clientId, username, password, timeout, keepalive); + // 以/#结尾表示订阅所有以test开头的主题 + myMqttClient.subscribe(defaultTopic, 0); + log.info("订阅成功"+defaultTopic); + System.out.println("订阅成功"+defaultTopic); + return myMqttClient; + } +} diff --git a/xiuosiot-backend/src/main/java/com/aiit/xiuos/mqtt/MyMqttClient.java b/xiuosiot-backend/src/main/java/com/aiit/xiuos/mqtt/MyMqttClient.java new file mode 100644 index 0000000..fd05317 --- /dev/null +++ b/xiuosiot-backend/src/main/java/com/aiit/xiuos/mqtt/MyMqttClient.java @@ -0,0 +1,121 @@ +package com.aiit.xiuos.mqtt; + + +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.*; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.stereotype.Component; + +/** + * @author wangtianyi + * @description mqtt客户端 + */ +@Slf4j +@Component +public class MyMqttClient { + + + + private static MqttClient client; + + public static MqttClient getClient(){ + return client; + } + + public static void setClient(MqttClient client){ + MyMqttClient.client=client; + } + + /** + * 客户端连接 + * + * @param host ip+端口 + * @param clientID 客户端Id + * @param username 用户名 + * @param password 密码 + * @param timeout 超时时间 + * @param keepalive 保留数 + */ + public void connect(String host,String clientID,String username,String password,int timeout,int keepalive){ + MqttClient client=null; + + try { + client=new MqttClient(host,clientID,new MemoryPersistence()); + MqttConnectOptions options=new MqttConnectOptions(); + options.setCleanSession(true); + options.setUserName(username); + options.setPassword(password.toCharArray()); + options.setConnectionTimeout(timeout); + options.setKeepAliveInterval(keepalive); + options.setAutomaticReconnect(true); + MyMqttClient.setClient(client); + try { + client.setCallback(new PushCallback()); + client.connect(options); + boolean complete = client.isConnected(); + log.info("MQTT连接"+(complete?"成功":"失败")); + }catch (Exception e){ + log.error(e.getMessage()); + } + }catch (Exception e){ + log.error(e.getMessage()); + } + + } + + + + /** + * 发布,默认qos为0,非持久化 + * @param topic + * @param pushMessage + */ + public void publish(String topic,String pushMessage){ + publish(0,false,topic,pushMessage); + } + + /** + * 发布 + * + * @param qos 连接方式 + * @param retained 是否保留 + * @param topic 主题 + * @param pushMessage 消息体 + */ + public void publish(int qos,boolean retained,String topic,String pushMessage){ + MqttMessage message=new MqttMessage(); + message.setQos(qos); + message.setRetained(retained); + message.setPayload(pushMessage.getBytes()); + MqttTopic mqttTopic= MyMqttClient.getClient().getTopic(topic); + if(null== mqttTopic){ + log.error("topic not exist"); + } + MqttDeliveryToken token; + try { + token=mqttTopic.publish(message); + token.waitForCompletion(); + }catch (MqttPersistenceException e){ + log.error(e.getMessage()); + }catch (MqttException e){ + log.error(e.getMessage()); + } + } + + /** + * 订阅某个主题,qos默认为0 + * @param topic + */ + public void subscribe(String topic){ + log.error("开始订阅主题" + topic); + subscribe(topic,0); + } + + public void subscribe(String topic,int qos){ + try { + MyMqttClient.getClient().subscribe(topic,qos); + }catch (MqttException e){ + log.error(e.getMessage()); + } + } +} \ No newline at end of file diff --git a/xiuosiot-backend/src/main/java/com/aiit/xiuos/mqtt/PushCallback.java b/xiuosiot-backend/src/main/java/com/aiit/xiuos/mqtt/PushCallback.java new file mode 100644 index 0000000..55edfe0 --- /dev/null +++ b/xiuosiot-backend/src/main/java/com/aiit/xiuos/mqtt/PushCallback.java @@ -0,0 +1,55 @@ +package com.aiit.xiuos.mqtt; + +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.*; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + + +@Component +@Slf4j +public class PushCallback implements MqttCallbackExtended { + + + @Autowired + private MyMqttClient myMqttClient; + + @Override + public void connectionLost(Throwable throwable) { + long reconnectTimes = 1; + while (true) { + try { + if (myMqttClient.getClient().isConnected()) { + log.info("mqtt reconnect success end"); + return; + } + log.info("mqtt reconnect times = {} try again...", reconnectTimes++); + myMqttClient.getClient().reconnect(); + } catch (MqttException e) { + log.error("", e); + } + try { + Thread.sleep(3000); + } catch (InterruptedException e1) { + e1.printStackTrace(); + } + } + } + + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + System.out.println("接收消息主题 : " + topic); + System.out.println("接收消息Qos : " + message.getQos()); + System.out.println("接收消息内容 : " + new String(message.getPayload())); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + System.out.println("deliveryComplete---------" + token.isComplete()); + } + + @Override + public void connectComplete(boolean b, String s) { + + } +} \ No newline at end of file diff --git a/xiuosiot-backend/src/main/resources/application-prod.yml b/xiuosiot-backend/src/main/resources/application-prod.yml index 1c5e4c7..8327905 100644 --- a/xiuosiot-backend/src/main/resources/application-prod.yml +++ b/xiuosiot-backend/src/main/resources/application-prod.yml @@ -32,7 +32,7 @@ spring: debug: true redis: - database: 0 + database: 1 host: 115.238.53.59 port: 6379 password: abc123 diff --git a/xiuosiot-backend/src/test/java/com/aiit/xiuos/MqttTest.java b/xiuosiot-backend/src/test/java/com/aiit/xiuos/MqttTest.java index c224718..ae0e51e 100644 --- a/xiuosiot-backend/src/test/java/com/aiit/xiuos/MqttTest.java +++ b/xiuosiot-backend/src/test/java/com/aiit/xiuos/MqttTest.java @@ -3,12 +3,13 @@ package com.aiit.xiuos; import com.aiit.xiuos.mqtt.MyMqttClient; import com.alibaba.fastjson.JSONObject; +import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import java.util.Random; - +@Slf4j @SpringBootTest(classes = XiuosApplication.class,webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) public class MqttTest { @Autowired @@ -28,7 +29,7 @@ public class MqttTest { try { Thread.sleep(3000); } catch (InterruptedException e) { - e.printStackTrace(); + log.error(e.getMessage()); } } }