Optimize the parsing method of the MQTT protocol and solve the MQTT service disconnection issue.

This commit is contained in:
wty 2023-10-12 15:34:22 +08:00
parent 8121e7bbea
commit 77d5a1ab49
7 changed files with 255 additions and 76 deletions

View File

@ -1,4 +1,112 @@
package com.aiit.xiuos.mqtt;
import com.aiit.xiuos.Utils.MyUtils;
import com.aiit.xiuos.Utils.OTAFileUtil;
import com.aiit.xiuos.Utils.SpringUtil;
import com.aiit.xiuos.model.DeviceInfo;
import com.aiit.xiuos.model.FirmwareInfo;
import com.aiit.xiuos.model.OtaInfo;
import com.aiit.xiuos.service.DeviceInfoService;
import com.aiit.xiuos.service.FirmwareInfoService;
import com.aiit.xiuos.service.OtaInfoService;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;
@Slf4j
public class MsgReceiveHandle {
private OtaInfoService otaInfoService=SpringUtil.getBeanByClass(OtaInfoService.class);
private FirmwareInfoService firmwareInfoService=SpringUtil.getBeanByClass(FirmwareInfoService.class);
private DeviceInfoService deviceInfoService=SpringUtil.getBeanByClass(DeviceInfoService.class);
private MyMqttClient myMqttClient= SpringUtil.getBeanByClass(MyMqttClient.class);
public void handleConnected(JSONObject msgJson) {
String deviceId =msgJson.getString("clientid");
System.out.println("收到设备上线信息,设备ID:" + msgJson.getString("clientid"));
DeviceInfo deviceInfo = deviceInfoService.selectbyNo(deviceId);
if(deviceInfo==null){
log.info("平台不存在该设备id请检查");
return;
}
deviceInfo.setUpdatetime(MyUtils.getTime());
deviceInfo.setActivestatus(1);
deviceInfoService.updateDeviceInfo(deviceInfo);
}
public void handleDisConnected(JSONObject msgJson) {
String deviceId =msgJson.getString("clientid");
System.out.println("收到设备下线信息,设备ID:" + msgJson.getString("clientid"));
DeviceInfo deviceInfo = deviceInfoService.selectbyNo(deviceId);
if(deviceInfo==null){
log.info("平台不存在该设备id请检查");
return;
}
deviceInfo.setUpdatetime(MyUtils.getTime());
deviceInfo.setActivestatus(0);
deviceInfoService.updateDeviceInfo(deviceInfo);
}
public void handleOtaFile(JSONObject msgJson) {
int fileId = msgJson.getIntValue("fileId");
int fileOffset = msgJson.getIntValue("fileOffset");
int size = msgJson.getIntValue("size");
String clientId = msgJson.getString("clientId");
OtaInfo otaInfo = otaInfoService.getJobById(clientId);
if(otaInfo!=null){
otaInfo.setCurrentProcess(fileOffset+size);
otaInfoService.updateOtaInfo(otaInfo);
}
FirmwareInfo firmwareInfo = firmwareInfoService.getById(fileId);
String version =firmwareInfo.getFileVersion();
String name = firmwareInfo.getFileName();
String realName = version+"_"+name;
byte[] chunk = OTAFileUtil.getFile(realName, fileOffset,size);
JSONObject jsonObject = new JSONObject();
jsonObject.put("file",chunk);
myMqttClient.publish(1,false,"ota/" + clientId + "/files", chunk);
}
public void handleOtaVersion(JSONObject msgJson) {
String deviceId = msgJson.getString("clientId");
String version = msgJson.getString("version");
DeviceInfo deviceInfo = deviceInfoService.selectbyNo(deviceId);
if(deviceInfo==null) {
log.info("不存在设备号为" + deviceId + "的设备");
return;
}
deviceInfo.setUpdatetime(MyUtils.getTime());
deviceInfo.setWebversion(version);
//查看是否存在进行中的ota信息
OtaInfo otaInfo = otaInfoService.getJobById(deviceId);
if(otaInfo!=null){
String targetVersion = otaInfo.getFileVersion();
if(version.equals(targetVersion)){
otaInfo.setStatus(1);
}else {
otaInfo.setStatus(0);
}
otaInfoService.updateOtaInfo(otaInfo);
}
deviceInfoService.updateDeviceInfo(deviceInfo);
System.out.println("更新设备" + deviceId + "版本为" + version);
//执行更新内核版本操作并更新ota状态为失败
}
public void handleClientData(JSONObject msgJson) {
String clientId = msgJson.getString("clientId");
JSONObject data =msgJson.getJSONObject("data");
String time =msgJson.getString("time");
System.out.println("收到设备" + clientId + "的数据" + data.toJSONString());
//执行更新内核版本操作并更新ota状态为失败
}
}

View File

@ -4,17 +4,20 @@ package com.aiit.xiuos.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.FileOutputStream;
import java.util.ArrayList;
/**
* @author wangtianyi
* @description mqtt客户端
*/
@Slf4j
@Component
public class MyMqttClient {
@Autowired
PushCallback pushCallback;
private static MqttClient client;
@ -88,13 +91,37 @@ public class MyMqttClient {
message.setRetained(retained);
message.setPayload(pushMessage.getBytes());
MqttTopic mqttTopic= MyMqttClient.getClient().getTopic(topic);
if(null== mqttTopic){
log.error("topic not exist");
}
MqttDeliveryToken token;
try {
//client.publish(topic,message);
token=mqttTopic.publish(message);
token.waitForCompletion();
token.waitForCompletion(100);
}catch (MqttPersistenceException e){
log.error(e.getMessage());
}catch (MqttException e){
log.error(e.getMessage());
}
}
public void publish(int qos,boolean retained,String topic,byte[] payload){
MqttMessage message=new MqttMessage();
message.setQos(qos);
message.setRetained(retained);
message.setPayload(payload);
MqttTopic mqttTopic= MyMqttClient.getClient().getTopic(topic);
if(null== mqttTopic){
log.error("topic not exist");
}
MqttDeliveryToken token;
try {
//client.publish(topic,message);
token=mqttTopic.publish(message);
token.waitForCompletion(100);
}catch (MqttPersistenceException e){
log.error(e.getMessage());
}catch (MqttException e){
@ -107,13 +134,14 @@ public class MyMqttClient {
* @param topic
*/
public void subscribe(String topic){
log.error("开始订阅主题" + topic);
log.info("开始订阅主题" + topic);
subscribe(topic,0);
}
public void subscribe(String topic,int qos){
try {
MyMqttClient.getClient().subscribe(topic,qos);
log.info("开始订阅主题" + topic);
}catch (MqttException e){
log.error(e.getMessage());
}

View File

@ -1,22 +1,30 @@
package com.aiit.xiuos.mqtt;
import cn.hutool.core.io.FileUtil;
import com.aiit.xiuos.Utils.SpringUtil;
import com.aiit.xiuos.service.OtaInfoService;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.BufferedOutputStream;
import java.io.FileOutputStream;
@Component
@Slf4j
public class PushCallback implements MqttCallbackExtended {
@Autowired
private MyMqttClient myMqttClient;
@Override
public void connectionLost(Throwable throwable) {
log.error("mqtt lose connect:"+throwable) ;
log.error("mqtt lose connect") ;
throwable.printStackTrace();
long reconnectTimes = 1;
while (true) {
try {
@ -28,6 +36,7 @@ public class PushCallback implements MqttCallbackExtended {
myMqttClient.getClient().reconnect();
} catch (MqttException e) {
log.error("", e);
e.printStackTrace();
}
try {
Thread.sleep(3000);
@ -38,12 +47,34 @@ public class PushCallback implements MqttCallbackExtended {
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
public void messageArrived(String topic, MqttMessage message) {
MsgReceiveHandle msgReceiveHandle = new MsgReceiveHandle();
System.out.println("接收消息主题 : " + topic);
System.out.println("接收消息Qos : " + message.getQos());
System.out.println("接收消息内容 : " + new String(message.getPayload()));
String msg = new String(message.getPayload());
System.out.println("接收消息内容 : " + msg);
try{
JSONObject jsonObject = JSON.parseObject(msg);
if (topic.endsWith("disconnected")) {
msgReceiveHandle.handleDisConnected(jsonObject);
} else if(topic.endsWith("connected")){
msgReceiveHandle.handleConnected(jsonObject);
} else if(topic.endsWith("ota/files")){
msgReceiveHandle.handleOtaFile(jsonObject);
} else if(topic.endsWith("ota/version")){
msgReceiveHandle.handleOtaVersion(jsonObject);
} else if(topic.endsWith("/data")){
msgReceiveHandle.handleClientData(jsonObject);
}
}catch (Exception e){
log.error("处理信息发生错误,请检查");
e.printStackTrace();
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());

View File

@ -34,7 +34,7 @@ public class GZJCTaskScheduled {
}
@Scheduled(cron = "0 */1 * * * ?")//every hour
@Scheduled(cron = "0 */1 * * * ?")//every minute
public void sendWebsocket() throws ParseException, IOException {
GZJCData GZJCData = mockData();
JSONObject jsonObject = JSONUtil.parseObj(GZJCData,false,true);

View File

@ -11,9 +11,11 @@ public interface DeviceInfoService {
int activeDeviceByNo(String no);
List<DeviceInfo> selectActiveDevice(String org);
List<DeviceInfo> selectUnActiveDevice(String org);
List<DeviceInfo> selectbyNo(String no);
DeviceInfo selectbyNo(String no);
List<Map<String,String>> getDeviceTypeCount(String org);
List<Map<String,String>> getDeviceRunStautsCount(String org);
String getTypeByName(String deviceNo);
List<String> getDeviceList(String org);
int updateDeviceInfo(DeviceInfo deviceInfo);
}

View File

@ -42,8 +42,8 @@ public class DeviceInfoServiceImpl implements DeviceInfoService {
}
@Override
public List<DeviceInfo> selectbyNo(String no) {
return deviceInfoMapper.selectByNo(no);
public DeviceInfo selectbyNo(String no) {
return deviceInfoMapper.selectByPrimaryKey(no);
}
@Override
@ -64,4 +64,14 @@ public class DeviceInfoServiceImpl implements DeviceInfoService {
}
return null;
}
@Override
public List<String> getDeviceList(String org) {
return null;
}
@Override
public int updateDeviceInfo(DeviceInfo deviceInfo) {
return deviceInfoMapper.updateByPrimaryKeySelective(deviceInfo);
}
}

View File

@ -3,8 +3,8 @@
<mapper namespace="com.aiit.xiuos.dao.mappers.DeviceInfoMapper">
<resultMap id="BaseResultMap" type="com.aiit.xiuos.model.DeviceInfo">
<constructor>
<idArg column="id" javaType="java.lang.String" jdbcType="VARCHAR" />
<idArg column="no" javaType="java.lang.String" jdbcType="VARCHAR" />
<arg column="id" javaType="java.lang.String" jdbcType="VARCHAR" />
<arg column="productname" javaType="java.lang.String" jdbcType="VARCHAR" />
<arg column="type" javaType="java.lang.String" jdbcType="VARCHAR" />
<arg column="activestatus" javaType="java.lang.Integer" jdbcType="INTEGER" />
@ -31,52 +31,50 @@
</constructor>
</resultMap>
<sql id="Base_Column_List">
id, no, productname, type, activestatus, updatetime, devicedesc, runstatus, statusdesc,
kernel, webversion, ipaddr, netmask, gateway, dnsserver0, dnsserver1, topic, serveraddr,
serverport, username, clientid, privateserveraddr, privateserverport, privateserverusername,
no, id, productname, type, activestatus, updatetime, devicedesc, runstatus, statusdesc,
kernel, webversion, ipaddr, netmask, gateway, dnsserver0, dnsserver1, topic, serveraddr,
serverport, username, clientid, privateserveraddr, privateserverport, privateserverusername,
org
</sql>
<select id="selectByPrimaryKey" parameterType="map" resultMap="BaseResultMap">
select
<select id="selectByPrimaryKey" parameterType="java.lang.String" resultMap="BaseResultMap">
select
<include refid="Base_Column_List" />
from device_info
where id = #{id,jdbcType=VARCHAR}
and no = #{no,jdbcType=VARCHAR}
where no = #{no,jdbcType=VARCHAR}
</select>
<delete id="deleteByPrimaryKey" parameterType="map">
<delete id="deleteByPrimaryKey" parameterType="java.lang.String">
delete from device_info
where id = #{id,jdbcType=VARCHAR}
and no = #{no,jdbcType=VARCHAR}
where no = #{no,jdbcType=VARCHAR}
</delete>
<insert id="insert" parameterType="com.aiit.xiuos.model.DeviceInfo">
insert into device_info (id, no, productname,
type, activestatus, updatetime,
devicedesc, runstatus, statusdesc,
kernel, webversion, ipaddr,
netmask, gateway, dnsserver0,
dnsserver1, topic, serveraddr,
serverport, username, clientid,
privateserveraddr, privateserverport, privateserverusername,
org)
values (#{id,jdbcType=VARCHAR}, #{no,jdbcType=VARCHAR}, #{productname,jdbcType=VARCHAR},
#{type,jdbcType=VARCHAR}, #{activestatus,jdbcType=INTEGER}, #{updatetime,jdbcType=VARCHAR},
#{devicedesc,jdbcType=VARCHAR}, #{runstatus,jdbcType=INTEGER}, #{statusdesc,jdbcType=VARCHAR},
#{kernel,jdbcType=VARCHAR}, #{webversion,jdbcType=VARCHAR}, #{ipaddr,jdbcType=VARCHAR},
#{netmask,jdbcType=VARCHAR}, #{gateway,jdbcType=VARCHAR}, #{dnsserver0,jdbcType=VARCHAR},
#{dnsserver1,jdbcType=VARCHAR}, #{topic,jdbcType=VARCHAR}, #{serveraddr,jdbcType=VARCHAR},
#{serverport,jdbcType=VARCHAR}, #{username,jdbcType=VARCHAR}, #{clientid,jdbcType=VARCHAR},
#{privateserveraddr,jdbcType=VARCHAR}, #{privateserverport,jdbcType=VARCHAR}, #{privateserverusername,jdbcType=VARCHAR},
#{org,jdbcType=VARCHAR})
insert into device_info (no, id, productname,
type, activestatus, updatetime,
devicedesc, runstatus, statusdesc,
kernel, webversion, ipaddr,
netmask, gateway, dnsserver0,
dnsserver1, topic, serveraddr,
serverport, username, clientid,
privateserveraddr, privateserverport, privateserverusername,
org)
values (#{no,jdbcType=VARCHAR}, #{id,jdbcType=VARCHAR}, #{productname,jdbcType=VARCHAR},
#{type,jdbcType=VARCHAR}, #{activestatus,jdbcType=INTEGER}, #{updatetime,jdbcType=VARCHAR},
#{devicedesc,jdbcType=VARCHAR}, #{runstatus,jdbcType=INTEGER}, #{statusdesc,jdbcType=VARCHAR},
#{kernel,jdbcType=VARCHAR}, #{webversion,jdbcType=VARCHAR}, #{ipaddr,jdbcType=VARCHAR},
#{netmask,jdbcType=VARCHAR}, #{gateway,jdbcType=VARCHAR}, #{dnsserver0,jdbcType=VARCHAR},
#{dnsserver1,jdbcType=VARCHAR}, #{topic,jdbcType=VARCHAR}, #{serveraddr,jdbcType=VARCHAR},
#{serverport,jdbcType=VARCHAR}, #{username,jdbcType=VARCHAR}, #{clientid,jdbcType=VARCHAR},
#{privateserveraddr,jdbcType=VARCHAR}, #{privateserverport,jdbcType=VARCHAR}, #{privateserverusername,jdbcType=VARCHAR},
#{org,jdbcType=VARCHAR})
</insert>
<insert id="insertSelective" parameterType="com.aiit.xiuos.model.DeviceInfo">
insert into device_info
<trim prefix="(" suffix=")" suffixOverrides=",">
<if test="id != null">
id,
</if>
<if test="no != null">
no,
</if>
<if test="id != null">
id,
</if>
<if test="productname != null">
productname,
</if>
@ -148,12 +146,12 @@
</if>
</trim>
<trim prefix="values (" suffix=")" suffixOverrides=",">
<if test="id != null">
#{id,jdbcType=VARCHAR},
</if>
<if test="no != null">
#{no,jdbcType=VARCHAR},
</if>
<if test="id != null">
#{id,jdbcType=VARCHAR},
</if>
<if test="productname != null">
#{productname,jdbcType=VARCHAR},
</if>
@ -228,6 +226,9 @@
<update id="updateByPrimaryKeySelective" parameterType="com.aiit.xiuos.model.DeviceInfo">
update device_info
<set>
<if test="id != null">
id = #{id,jdbcType=VARCHAR},
</if>
<if test="productname != null">
productname = #{productname,jdbcType=VARCHAR},
</if>
@ -298,36 +299,35 @@
org = #{org,jdbcType=VARCHAR},
</if>
</set>
where id = #{id,jdbcType=VARCHAR}
and no = #{no,jdbcType=VARCHAR}
where no = #{no,jdbcType=VARCHAR}
</update>
<update id="updateByPrimaryKey" parameterType="com.aiit.xiuos.model.DeviceInfo">
update device_info
set productname = #{productname,jdbcType=VARCHAR},
type = #{type,jdbcType=VARCHAR},
activestatus = #{activestatus,jdbcType=INTEGER},
updatetime = #{updatetime,jdbcType=VARCHAR},
devicedesc = #{devicedesc,jdbcType=VARCHAR},
runstatus = #{runstatus,jdbcType=INTEGER},
statusdesc = #{statusdesc,jdbcType=VARCHAR},
kernel = #{kernel,jdbcType=VARCHAR},
webversion = #{webversion,jdbcType=VARCHAR},
ipaddr = #{ipaddr,jdbcType=VARCHAR},
netmask = #{netmask,jdbcType=VARCHAR},
gateway = #{gateway,jdbcType=VARCHAR},
dnsserver0 = #{dnsserver0,jdbcType=VARCHAR},
dnsserver1 = #{dnsserver1,jdbcType=VARCHAR},
topic = #{topic,jdbcType=VARCHAR},
serveraddr = #{serveraddr,jdbcType=VARCHAR},
serverport = #{serverport,jdbcType=VARCHAR},
username = #{username,jdbcType=VARCHAR},
clientid = #{clientid,jdbcType=VARCHAR},
privateserveraddr = #{privateserveraddr,jdbcType=VARCHAR},
privateserverport = #{privateserverport,jdbcType=VARCHAR},
privateserverusername = #{privateserverusername,jdbcType=VARCHAR},
org = #{org,jdbcType=VARCHAR}
where id = #{id,jdbcType=VARCHAR}
and no = #{no,jdbcType=VARCHAR}
set id = #{id,jdbcType=VARCHAR},
productname = #{productname,jdbcType=VARCHAR},
type = #{type,jdbcType=VARCHAR},
activestatus = #{activestatus,jdbcType=INTEGER},
updatetime = #{updatetime,jdbcType=VARCHAR},
devicedesc = #{devicedesc,jdbcType=VARCHAR},
runstatus = #{runstatus,jdbcType=INTEGER},
statusdesc = #{statusdesc,jdbcType=VARCHAR},
kernel = #{kernel,jdbcType=VARCHAR},
webversion = #{webversion,jdbcType=VARCHAR},
ipaddr = #{ipaddr,jdbcType=VARCHAR},
netmask = #{netmask,jdbcType=VARCHAR},
gateway = #{gateway,jdbcType=VARCHAR},
dnsserver0 = #{dnsserver0,jdbcType=VARCHAR},
dnsserver1 = #{dnsserver1,jdbcType=VARCHAR},
topic = #{topic,jdbcType=VARCHAR},
serveraddr = #{serveraddr,jdbcType=VARCHAR},
serverport = #{serverport,jdbcType=VARCHAR},
username = #{username,jdbcType=VARCHAR},
clientid = #{clientid,jdbcType=VARCHAR},
privateserveraddr = #{privateserveraddr,jdbcType=VARCHAR},
privateserverport = #{privateserverport,jdbcType=VARCHAR},
privateserverusername = #{privateserverusername,jdbcType=VARCHAR},
org = #{org,jdbcType=VARCHAR}
where no = #{no,jdbcType=VARCHAR}
</update>
<select id="getDeviceTypeCount" parameterType="map" resultType="java.util.Map">