diff --git a/xiuosiot-backend/src/main/java/com/aiit/xiuos/mqtt/MsgReceiveHandle.java b/xiuosiot-backend/src/main/java/com/aiit/xiuos/mqtt/MsgReceiveHandle.java index e246aa2..7cca8c0 100644 --- a/xiuosiot-backend/src/main/java/com/aiit/xiuos/mqtt/MsgReceiveHandle.java +++ b/xiuosiot-backend/src/main/java/com/aiit/xiuos/mqtt/MsgReceiveHandle.java @@ -1,4 +1,112 @@ package com.aiit.xiuos.mqtt; +import com.aiit.xiuos.Utils.MyUtils; +import com.aiit.xiuos.Utils.OTAFileUtil; +import com.aiit.xiuos.Utils.SpringUtil; +import com.aiit.xiuos.model.DeviceInfo; +import com.aiit.xiuos.model.FirmwareInfo; +import com.aiit.xiuos.model.OtaInfo; +import com.aiit.xiuos.service.DeviceInfoService; +import com.aiit.xiuos.service.FirmwareInfoService; +import com.aiit.xiuos.service.OtaInfoService; +import com.alibaba.fastjson.JSONObject; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.stereotype.Component; + +@Slf4j public class MsgReceiveHandle { + + private OtaInfoService otaInfoService=SpringUtil.getBeanByClass(OtaInfoService.class); + + private FirmwareInfoService firmwareInfoService=SpringUtil.getBeanByClass(FirmwareInfoService.class); + + private DeviceInfoService deviceInfoService=SpringUtil.getBeanByClass(DeviceInfoService.class); + + + private MyMqttClient myMqttClient= SpringUtil.getBeanByClass(MyMqttClient.class); + + public void handleConnected(JSONObject msgJson) { + String deviceId =msgJson.getString("clientid"); + System.out.println("收到设备上线信息,设备ID:" + msgJson.getString("clientid")); + DeviceInfo deviceInfo = deviceInfoService.selectbyNo(deviceId); + if(deviceInfo==null){ + log.info("平台不存在该设备id,请检查"); + return; + } + deviceInfo.setUpdatetime(MyUtils.getTime()); + deviceInfo.setActivestatus(1); + deviceInfoService.updateDeviceInfo(deviceInfo); + } + + public void handleDisConnected(JSONObject msgJson) { + String deviceId =msgJson.getString("clientid"); + System.out.println("收到设备下线信息,设备ID:" + msgJson.getString("clientid")); + DeviceInfo deviceInfo = deviceInfoService.selectbyNo(deviceId); + if(deviceInfo==null){ + log.info("平台不存在该设备id,请检查"); + return; + } + deviceInfo.setUpdatetime(MyUtils.getTime()); + deviceInfo.setActivestatus(0); + deviceInfoService.updateDeviceInfo(deviceInfo); + } + + public void handleOtaFile(JSONObject msgJson) { + int fileId = msgJson.getIntValue("fileId"); + int fileOffset = msgJson.getIntValue("fileOffset"); + int size = msgJson.getIntValue("size"); + String clientId = msgJson.getString("clientId"); + OtaInfo otaInfo = otaInfoService.getJobById(clientId); + if(otaInfo!=null){ + otaInfo.setCurrentProcess(fileOffset+size); + otaInfoService.updateOtaInfo(otaInfo); + } + FirmwareInfo firmwareInfo = firmwareInfoService.getById(fileId); + String version =firmwareInfo.getFileVersion(); + String name = firmwareInfo.getFileName(); + String realName = version+"_"+name; + byte[] chunk = OTAFileUtil.getFile(realName, fileOffset,size); + JSONObject jsonObject = new JSONObject(); + jsonObject.put("file",chunk); + myMqttClient.publish(1,false,"ota/" + clientId + "/files", chunk); + } + + public void handleOtaVersion(JSONObject msgJson) { + String deviceId = msgJson.getString("clientId"); + String version = msgJson.getString("version"); + DeviceInfo deviceInfo = deviceInfoService.selectbyNo(deviceId); + if(deviceInfo==null) { + log.info("不存在设备号为" + deviceId + "的设备"); + return; + } + deviceInfo.setUpdatetime(MyUtils.getTime()); + deviceInfo.setWebversion(version); + //查看是否存在进行中的ota信息 + OtaInfo otaInfo = otaInfoService.getJobById(deviceId); + if(otaInfo!=null){ + String targetVersion = otaInfo.getFileVersion(); + if(version.equals(targetVersion)){ + otaInfo.setStatus(1); + + }else { + otaInfo.setStatus(0); + } + otaInfoService.updateOtaInfo(otaInfo); + } + deviceInfoService.updateDeviceInfo(deviceInfo); + + + System.out.println("更新设备" + deviceId + "版本为" + version); + //执行更新内核版本操作,并更新ota状态为失败。 + } + + public void handleClientData(JSONObject msgJson) { + String clientId = msgJson.getString("clientId"); + JSONObject data =msgJson.getJSONObject("data"); + String time =msgJson.getString("time"); + System.out.println("收到设备" + clientId + "的数据" + data.toJSONString()); + //执行更新内核版本操作,并更新ota状态为失败。 + } } diff --git a/xiuosiot-backend/src/main/java/com/aiit/xiuos/mqtt/MyMqttClient.java b/xiuosiot-backend/src/main/java/com/aiit/xiuos/mqtt/MyMqttClient.java index fd05317..cbc4725 100644 --- a/xiuosiot-backend/src/main/java/com/aiit/xiuos/mqtt/MyMqttClient.java +++ b/xiuosiot-backend/src/main/java/com/aiit/xiuos/mqtt/MyMqttClient.java @@ -4,17 +4,20 @@ package com.aiit.xiuos.mqtt; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import java.io.FileOutputStream; +import java.util.ArrayList; + /** * @author wangtianyi * @description mqtt客户端 */ @Slf4j -@Component public class MyMqttClient { - - + @Autowired + PushCallback pushCallback; private static MqttClient client; @@ -88,13 +91,37 @@ public class MyMqttClient { message.setRetained(retained); message.setPayload(pushMessage.getBytes()); MqttTopic mqttTopic= MyMqttClient.getClient().getTopic(topic); + if(null== mqttTopic){ log.error("topic not exist"); } MqttDeliveryToken token; try { + //client.publish(topic,message); token=mqttTopic.publish(message); - token.waitForCompletion(); + token.waitForCompletion(100); + }catch (MqttPersistenceException e){ + log.error(e.getMessage()); + }catch (MqttException e){ + log.error(e.getMessage()); + } + } + + public void publish(int qos,boolean retained,String topic,byte[] payload){ + MqttMessage message=new MqttMessage(); + message.setQos(qos); + message.setRetained(retained); + message.setPayload(payload); + MqttTopic mqttTopic= MyMqttClient.getClient().getTopic(topic); + + if(null== mqttTopic){ + log.error("topic not exist"); + } + MqttDeliveryToken token; + try { + //client.publish(topic,message); + token=mqttTopic.publish(message); + token.waitForCompletion(100); }catch (MqttPersistenceException e){ log.error(e.getMessage()); }catch (MqttException e){ @@ -107,13 +134,14 @@ public class MyMqttClient { * @param topic */ public void subscribe(String topic){ - log.error("开始订阅主题" + topic); + log.info("开始订阅主题" + topic); subscribe(topic,0); } public void subscribe(String topic,int qos){ try { MyMqttClient.getClient().subscribe(topic,qos); + log.info("开始订阅主题" + topic); }catch (MqttException e){ log.error(e.getMessage()); } diff --git a/xiuosiot-backend/src/main/java/com/aiit/xiuos/mqtt/PushCallback.java b/xiuosiot-backend/src/main/java/com/aiit/xiuos/mqtt/PushCallback.java index 43e8afa..5c12240 100644 --- a/xiuosiot-backend/src/main/java/com/aiit/xiuos/mqtt/PushCallback.java +++ b/xiuosiot-backend/src/main/java/com/aiit/xiuos/mqtt/PushCallback.java @@ -1,22 +1,30 @@ package com.aiit.xiuos.mqtt; +import cn.hutool.core.io.FileUtil; +import com.aiit.xiuos.Utils.SpringUtil; +import com.aiit.xiuos.service.OtaInfoService; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.eclipse.paho.client.mqttv3.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import java.io.BufferedOutputStream; +import java.io.FileOutputStream; + @Component @Slf4j public class PushCallback implements MqttCallbackExtended { - - @Autowired private MyMqttClient myMqttClient; + @Override public void connectionLost(Throwable throwable) { - log.error("mqtt lose connect:"+throwable) ; + log.error("mqtt lose connect") ; + throwable.printStackTrace(); long reconnectTimes = 1; while (true) { try { @@ -28,6 +36,7 @@ public class PushCallback implements MqttCallbackExtended { myMqttClient.getClient().reconnect(); } catch (MqttException e) { log.error("", e); + e.printStackTrace(); } try { Thread.sleep(3000); @@ -38,12 +47,34 @@ public class PushCallback implements MqttCallbackExtended { } @Override - public void messageArrived(String topic, MqttMessage message) throws Exception { + public void messageArrived(String topic, MqttMessage message) { + MsgReceiveHandle msgReceiveHandle = new MsgReceiveHandle(); System.out.println("接收消息主题 : " + topic); System.out.println("接收消息Qos : " + message.getQos()); - System.out.println("接收消息内容 : " + new String(message.getPayload())); + String msg = new String(message.getPayload()); + System.out.println("接收消息内容 : " + msg); + + try{ + JSONObject jsonObject = JSON.parseObject(msg); + if (topic.endsWith("disconnected")) { + msgReceiveHandle.handleDisConnected(jsonObject); + } else if(topic.endsWith("connected")){ + msgReceiveHandle.handleConnected(jsonObject); + } else if(topic.endsWith("ota/files")){ + msgReceiveHandle.handleOtaFile(jsonObject); + } else if(topic.endsWith("ota/version")){ + msgReceiveHandle.handleOtaVersion(jsonObject); + } else if(topic.endsWith("/data")){ + msgReceiveHandle.handleClientData(jsonObject); + } + }catch (Exception e){ + log.error("处理信息发生错误,请检查"); + e.printStackTrace(); + } + } + @Override public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("deliveryComplete---------" + token.isComplete()); diff --git a/xiuosiot-backend/src/main/java/com/aiit/xiuos/scheduled/GZJCTaskScheduled.java b/xiuosiot-backend/src/main/java/com/aiit/xiuos/scheduled/GZJCTaskScheduled.java index dbb22b8..387bc6b 100644 --- a/xiuosiot-backend/src/main/java/com/aiit/xiuos/scheduled/GZJCTaskScheduled.java +++ b/xiuosiot-backend/src/main/java/com/aiit/xiuos/scheduled/GZJCTaskScheduled.java @@ -34,7 +34,7 @@ public class GZJCTaskScheduled { } - @Scheduled(cron = "0 */1 * * * ?")//every hour + @Scheduled(cron = "0 */1 * * * ?")//every minute public void sendWebsocket() throws ParseException, IOException { GZJCData GZJCData = mockData(); JSONObject jsonObject = JSONUtil.parseObj(GZJCData,false,true); diff --git a/xiuosiot-backend/src/main/java/com/aiit/xiuos/service/DeviceInfoService.java b/xiuosiot-backend/src/main/java/com/aiit/xiuos/service/DeviceInfoService.java index 535a6ff..4566e30 100644 --- a/xiuosiot-backend/src/main/java/com/aiit/xiuos/service/DeviceInfoService.java +++ b/xiuosiot-backend/src/main/java/com/aiit/xiuos/service/DeviceInfoService.java @@ -11,9 +11,11 @@ public interface DeviceInfoService { int activeDeviceByNo(String no); List selectActiveDevice(String org); List selectUnActiveDevice(String org); - List selectbyNo(String no); + DeviceInfo selectbyNo(String no); List> getDeviceTypeCount(String org); List> getDeviceRunStautsCount(String org); String getTypeByName(String deviceNo); + List getDeviceList(String org); + int updateDeviceInfo(DeviceInfo deviceInfo); } diff --git a/xiuosiot-backend/src/main/java/com/aiit/xiuos/service/impl/DeviceInfoServiceImpl.java b/xiuosiot-backend/src/main/java/com/aiit/xiuos/service/impl/DeviceInfoServiceImpl.java index 0777c06..b309455 100644 --- a/xiuosiot-backend/src/main/java/com/aiit/xiuos/service/impl/DeviceInfoServiceImpl.java +++ b/xiuosiot-backend/src/main/java/com/aiit/xiuos/service/impl/DeviceInfoServiceImpl.java @@ -42,8 +42,8 @@ public class DeviceInfoServiceImpl implements DeviceInfoService { } @Override - public List selectbyNo(String no) { - return deviceInfoMapper.selectByNo(no); + public DeviceInfo selectbyNo(String no) { + return deviceInfoMapper.selectByPrimaryKey(no); } @Override @@ -64,4 +64,14 @@ public class DeviceInfoServiceImpl implements DeviceInfoService { } return null; } + + @Override + public List getDeviceList(String org) { + return null; + } + + @Override + public int updateDeviceInfo(DeviceInfo deviceInfo) { + return deviceInfoMapper.updateByPrimaryKeySelective(deviceInfo); + } } diff --git a/xiuosiot-backend/src/main/resources/mappers/DeviceInfoMapper.xml b/xiuosiot-backend/src/main/resources/mappers/DeviceInfoMapper.xml index 0fc5dfb..08e7363 100644 --- a/xiuosiot-backend/src/main/resources/mappers/DeviceInfoMapper.xml +++ b/xiuosiot-backend/src/main/resources/mappers/DeviceInfoMapper.xml @@ -3,8 +3,8 @@ - + @@ -31,52 +31,50 @@ - id, no, productname, type, activestatus, updatetime, devicedesc, runstatus, statusdesc, - kernel, webversion, ipaddr, netmask, gateway, dnsserver0, dnsserver1, topic, serveraddr, - serverport, username, clientid, privateserveraddr, privateserverport, privateserverusername, + no, id, productname, type, activestatus, updatetime, devicedesc, runstatus, statusdesc, + kernel, webversion, ipaddr, netmask, gateway, dnsserver0, dnsserver1, topic, serveraddr, + serverport, username, clientid, privateserveraddr, privateserverport, privateserverusername, org - + select from device_info - where id = #{id,jdbcType=VARCHAR} - and no = #{no,jdbcType=VARCHAR} + where no = #{no,jdbcType=VARCHAR} - + delete from device_info - where id = #{id,jdbcType=VARCHAR} - and no = #{no,jdbcType=VARCHAR} + where no = #{no,jdbcType=VARCHAR} - insert into device_info (id, no, productname, - type, activestatus, updatetime, - devicedesc, runstatus, statusdesc, - kernel, webversion, ipaddr, - netmask, gateway, dnsserver0, - dnsserver1, topic, serveraddr, - serverport, username, clientid, - privateserveraddr, privateserverport, privateserverusername, - org) - values (#{id,jdbcType=VARCHAR}, #{no,jdbcType=VARCHAR}, #{productname,jdbcType=VARCHAR}, - #{type,jdbcType=VARCHAR}, #{activestatus,jdbcType=INTEGER}, #{updatetime,jdbcType=VARCHAR}, - #{devicedesc,jdbcType=VARCHAR}, #{runstatus,jdbcType=INTEGER}, #{statusdesc,jdbcType=VARCHAR}, - #{kernel,jdbcType=VARCHAR}, #{webversion,jdbcType=VARCHAR}, #{ipaddr,jdbcType=VARCHAR}, - #{netmask,jdbcType=VARCHAR}, #{gateway,jdbcType=VARCHAR}, #{dnsserver0,jdbcType=VARCHAR}, - #{dnsserver1,jdbcType=VARCHAR}, #{topic,jdbcType=VARCHAR}, #{serveraddr,jdbcType=VARCHAR}, - #{serverport,jdbcType=VARCHAR}, #{username,jdbcType=VARCHAR}, #{clientid,jdbcType=VARCHAR}, - #{privateserveraddr,jdbcType=VARCHAR}, #{privateserverport,jdbcType=VARCHAR}, #{privateserverusername,jdbcType=VARCHAR}, - #{org,jdbcType=VARCHAR}) + insert into device_info (no, id, productname, + type, activestatus, updatetime, + devicedesc, runstatus, statusdesc, + kernel, webversion, ipaddr, + netmask, gateway, dnsserver0, + dnsserver1, topic, serveraddr, + serverport, username, clientid, + privateserveraddr, privateserverport, privateserverusername, + org) + values (#{no,jdbcType=VARCHAR}, #{id,jdbcType=VARCHAR}, #{productname,jdbcType=VARCHAR}, + #{type,jdbcType=VARCHAR}, #{activestatus,jdbcType=INTEGER}, #{updatetime,jdbcType=VARCHAR}, + #{devicedesc,jdbcType=VARCHAR}, #{runstatus,jdbcType=INTEGER}, #{statusdesc,jdbcType=VARCHAR}, + #{kernel,jdbcType=VARCHAR}, #{webversion,jdbcType=VARCHAR}, #{ipaddr,jdbcType=VARCHAR}, + #{netmask,jdbcType=VARCHAR}, #{gateway,jdbcType=VARCHAR}, #{dnsserver0,jdbcType=VARCHAR}, + #{dnsserver1,jdbcType=VARCHAR}, #{topic,jdbcType=VARCHAR}, #{serveraddr,jdbcType=VARCHAR}, + #{serverport,jdbcType=VARCHAR}, #{username,jdbcType=VARCHAR}, #{clientid,jdbcType=VARCHAR}, + #{privateserveraddr,jdbcType=VARCHAR}, #{privateserverport,jdbcType=VARCHAR}, #{privateserverusername,jdbcType=VARCHAR}, + #{org,jdbcType=VARCHAR}) insert into device_info - - id, - no, + + id, + productname, @@ -148,12 +146,12 @@ - - #{id,jdbcType=VARCHAR}, - #{no,jdbcType=VARCHAR}, + + #{id,jdbcType=VARCHAR}, + #{productname,jdbcType=VARCHAR}, @@ -228,6 +226,9 @@ update device_info + + id = #{id,jdbcType=VARCHAR}, + productname = #{productname,jdbcType=VARCHAR}, @@ -298,36 +299,35 @@ org = #{org,jdbcType=VARCHAR}, - where id = #{id,jdbcType=VARCHAR} - and no = #{no,jdbcType=VARCHAR} + where no = #{no,jdbcType=VARCHAR} update device_info - set productname = #{productname,jdbcType=VARCHAR}, - type = #{type,jdbcType=VARCHAR}, - activestatus = #{activestatus,jdbcType=INTEGER}, - updatetime = #{updatetime,jdbcType=VARCHAR}, - devicedesc = #{devicedesc,jdbcType=VARCHAR}, - runstatus = #{runstatus,jdbcType=INTEGER}, - statusdesc = #{statusdesc,jdbcType=VARCHAR}, - kernel = #{kernel,jdbcType=VARCHAR}, - webversion = #{webversion,jdbcType=VARCHAR}, - ipaddr = #{ipaddr,jdbcType=VARCHAR}, - netmask = #{netmask,jdbcType=VARCHAR}, - gateway = #{gateway,jdbcType=VARCHAR}, - dnsserver0 = #{dnsserver0,jdbcType=VARCHAR}, - dnsserver1 = #{dnsserver1,jdbcType=VARCHAR}, - topic = #{topic,jdbcType=VARCHAR}, - serveraddr = #{serveraddr,jdbcType=VARCHAR}, - serverport = #{serverport,jdbcType=VARCHAR}, - username = #{username,jdbcType=VARCHAR}, - clientid = #{clientid,jdbcType=VARCHAR}, - privateserveraddr = #{privateserveraddr,jdbcType=VARCHAR}, - privateserverport = #{privateserverport,jdbcType=VARCHAR}, - privateserverusername = #{privateserverusername,jdbcType=VARCHAR}, - org = #{org,jdbcType=VARCHAR} - where id = #{id,jdbcType=VARCHAR} - and no = #{no,jdbcType=VARCHAR} + set id = #{id,jdbcType=VARCHAR}, + productname = #{productname,jdbcType=VARCHAR}, + type = #{type,jdbcType=VARCHAR}, + activestatus = #{activestatus,jdbcType=INTEGER}, + updatetime = #{updatetime,jdbcType=VARCHAR}, + devicedesc = #{devicedesc,jdbcType=VARCHAR}, + runstatus = #{runstatus,jdbcType=INTEGER}, + statusdesc = #{statusdesc,jdbcType=VARCHAR}, + kernel = #{kernel,jdbcType=VARCHAR}, + webversion = #{webversion,jdbcType=VARCHAR}, + ipaddr = #{ipaddr,jdbcType=VARCHAR}, + netmask = #{netmask,jdbcType=VARCHAR}, + gateway = #{gateway,jdbcType=VARCHAR}, + dnsserver0 = #{dnsserver0,jdbcType=VARCHAR}, + dnsserver1 = #{dnsserver1,jdbcType=VARCHAR}, + topic = #{topic,jdbcType=VARCHAR}, + serveraddr = #{serveraddr,jdbcType=VARCHAR}, + serverport = #{serverport,jdbcType=VARCHAR}, + username = #{username,jdbcType=VARCHAR}, + clientid = #{clientid,jdbcType=VARCHAR}, + privateserveraddr = #{privateserveraddr,jdbcType=VARCHAR}, + privateserverport = #{privateserverport,jdbcType=VARCHAR}, + privateserverusername = #{privateserverusername,jdbcType=VARCHAR}, + org = #{org,jdbcType=VARCHAR} + where no = #{no,jdbcType=VARCHAR}