add connect to emqx function

This commit is contained in:
wty 2022-11-14 17:27:19 +08:00
parent aec8fc7bcf
commit 46452af468
7 changed files with 258 additions and 14 deletions

View File

@ -1,6 +1,7 @@
package com.aiit.xiuos.Utils;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.CloseableHttpResponse;
@ -14,7 +15,7 @@ import org.apache.http.util.EntityUtils;
import java.io.IOException;
@Slf4j
public class HttpClientUtil {
private static String tokenString = "";
@ -24,7 +25,6 @@ public class HttpClientUtil {
/**
* 以get方式调用第三方接口
* @param url
* @param token
* @return
*/
public static JSONObject doGet(String url) {
@ -45,7 +45,7 @@ public class HttpClientUtil {
return (JSONObject) JSONObject.parse(res);
}
} catch (IOException e) {
e.printStackTrace();
log.error(e.getMessage());
}
return null;
}
@ -81,13 +81,13 @@ public class HttpClientUtil {
return res;
}
} catch (IOException e) {
e.printStackTrace();
log.error(e.getMessage());
} finally {
if (httpClient != null){
try {
httpClient.close();
} catch (IOException e) {
e.printStackTrace();
log.error(e.getMessage());
}
}
}
@ -123,7 +123,7 @@ public class HttpClientUtil {
token = result.getString("token");
}
} catch (IOException e) {
e.printStackTrace();
log.error(e.getMessage());
}
return token;
}
@ -144,7 +144,9 @@ public class HttpClientUtil {
System.out.println(response);
}
public static void main(String[] args) {
test("12345678910");
public static void main(String[] arg0){
}
}

View File

@ -2,6 +2,7 @@ package com.aiit.xiuos.Utils;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import javax.servlet.http.HttpServletRequest;
import java.io.*;
@ -12,7 +13,7 @@ import java.util.Date;
import java.util.List;
@Slf4j
public class MyUtils {
public static void main(String[] args) {
@ -62,9 +63,14 @@ public class MyUtils {
return time;
}
public static Date getDateTime() throws ParseException {
public static Date getDateTime() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date date = sdf.parse(MyUtils.getTime());
Date date = null;
try {
date = sdf.parse(MyUtils.getTime());
} catch (ParseException e) {
log.error(e.getMessage());
}
return date;
}

View File

@ -0,0 +1,59 @@
package com.aiit.xiuos.mqtt;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
@ConfigurationProperties("mqtt")
@Setter
@Getter
@Slf4j
public class MqttConfiguration {
@Autowired
private MyMqttClient myMqttClient;
/**
* 用户名
*/
private String username; /**
* 密码
*/
private String password;
/**
* 连接地址
*/
private String hostUrl;
/**
* 客户Id
*/
private String clientId;
/**
* 默认连接话题
*/
private String defaultTopic;
/**
* 超时时间
*/
private int timeout;
/**
* 保持连接数
*/
private int keepalive;
@Bean
public MyMqttClient getMqttPushClient() {
myMqttClient.connect(hostUrl, clientId, username, password, timeout, keepalive);
// /#结尾表示订阅所有以test开头的主题
myMqttClient.subscribe(defaultTopic, 0);
log.info("订阅成功"+defaultTopic);
System.out.println("订阅成功"+defaultTopic);
return myMqttClient;
}
}

View File

@ -0,0 +1,121 @@
package com.aiit.xiuos.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Component;
/**
* @author wangtianyi
* @description mqtt客户端
*/
@Slf4j
@Component
public class MyMqttClient {
private static MqttClient client;
public static MqttClient getClient(){
return client;
}
public static void setClient(MqttClient client){
MyMqttClient.client=client;
}
/**
* 客户端连接
*
* @param host ip+端口
* @param clientID 客户端Id
* @param username 用户名
* @param password 密码
* @param timeout 超时时间
* @param keepalive 保留数
*/
public void connect(String host,String clientID,String username,String password,int timeout,int keepalive){
MqttClient client=null;
try {
client=new MqttClient(host,clientID,new MemoryPersistence());
MqttConnectOptions options=new MqttConnectOptions();
options.setCleanSession(true);
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setConnectionTimeout(timeout);
options.setKeepAliveInterval(keepalive);
options.setAutomaticReconnect(true);
MyMqttClient.setClient(client);
try {
client.setCallback(new PushCallback());
client.connect(options);
boolean complete = client.isConnected();
log.info("MQTT连接"+(complete?"成功":"失败"));
}catch (Exception e){
log.error(e.getMessage());
}
}catch (Exception e){
log.error(e.getMessage());
}
}
/**
* 发布默认qos为0非持久化
* @param topic
* @param pushMessage
*/
public void publish(String topic,String pushMessage){
publish(0,false,topic,pushMessage);
}
/**
* 发布
*
* @param qos 连接方式
* @param retained 是否保留
* @param topic 主题
* @param pushMessage 消息体
*/
public void publish(int qos,boolean retained,String topic,String pushMessage){
MqttMessage message=new MqttMessage();
message.setQos(qos);
message.setRetained(retained);
message.setPayload(pushMessage.getBytes());
MqttTopic mqttTopic= MyMqttClient.getClient().getTopic(topic);
if(null== mqttTopic){
log.error("topic not exist");
}
MqttDeliveryToken token;
try {
token=mqttTopic.publish(message);
token.waitForCompletion();
}catch (MqttPersistenceException e){
log.error(e.getMessage());
}catch (MqttException e){
log.error(e.getMessage());
}
}
/**
* 订阅某个主题qos默认为0
* @param topic
*/
public void subscribe(String topic){
log.error("开始订阅主题" + topic);
subscribe(topic,0);
}
public void subscribe(String topic,int qos){
try {
MyMqttClient.getClient().subscribe(topic,qos);
}catch (MqttException e){
log.error(e.getMessage());
}
}
}

View File

@ -0,0 +1,55 @@
package com.aiit.xiuos.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class PushCallback implements MqttCallbackExtended {
@Autowired
private MyMqttClient myMqttClient;
@Override
public void connectionLost(Throwable throwable) {
long reconnectTimes = 1;
while (true) {
try {
if (myMqttClient.getClient().isConnected()) {
log.info("mqtt reconnect success end");
return;
}
log.info("mqtt reconnect times = {} try again...", reconnectTimes++);
myMqttClient.getClient().reconnect();
} catch (MqttException e) {
log.error("", e);
}
try {
Thread.sleep(3000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("接收消息主题 : " + topic);
System.out.println("接收消息Qos : " + message.getQos());
System.out.println("接收消息内容 : " + new String(message.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
@Override
public void connectComplete(boolean b, String s) {
}
}

View File

@ -32,7 +32,7 @@ spring:
debug: true
redis:
database: 0
database: 1
host: 115.238.53.59
port: 6379
password: abc123

View File

@ -3,12 +3,13 @@ package com.aiit.xiuos;
import com.aiit.xiuos.mqtt.MyMqttClient;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.Random;
@Slf4j
@SpringBootTest(classes = XiuosApplication.class,webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class MqttTest {
@Autowired
@ -28,7 +29,7 @@ public class MqttTest {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
log.error(e.getMessage());
}
}
}