From f3ebfe025f731e0e4867c24ccceee52e7f08f998 Mon Sep 17 00:00:00 2001 From: wty <419034340@qq.com> Date: Mon, 14 Nov 2022 11:17:46 +0800 Subject: [PATCH 1/3] add data forwarding function include 1. data select 2. data forwadrding by email --- xiuosiot-backend/pom.xml | 13 ++ .../java/com/aiit/xiuos/Utils/EmailUtil.java | 96 +++++++++++ .../aiit/xiuos/Utils/TDengineJDBCUtil.java | 150 ------------------ .../xiuos/controller/AlarmInfoController.java | 10 ++ .../xiuos/controller/AlarmRuleController.java | 9 +- .../controller/DataForwardController.java | 95 +++++++++++ .../controller/DeviceDataController.java | 20 ++- .../dao/mappers/DataForwardingMapper.java | 9 ++ .../com/aiit/xiuos/model/DataForwarding.java | 4 +- .../aiit/xiuos/scheduled/TaskScheduled.java | 16 +- .../xiuos/service/DataForwardService.java | 9 +- .../service/impl/DataForwardServiceImpl.java | 21 ++- .../src/main/resources/application-local.yml | 3 + .../src/main/resources/application-prod.yml | 3 +- .../test/java/com/aiit/xiuos/MqttTest.java | 36 +++++ 15 files changed, 315 insertions(+), 179 deletions(-) delete mode 100644 xiuosiot-backend/src/main/java/com/aiit/xiuos/Utils/TDengineJDBCUtil.java create mode 100644 xiuosiot-backend/src/test/java/com/aiit/xiuos/MqttTest.java diff --git a/xiuosiot-backend/pom.xml b/xiuosiot-backend/pom.xml index dd2e03c..8993e6e 100644 --- a/xiuosiot-backend/pom.xml +++ b/xiuosiot-backend/pom.xml @@ -94,6 +94,12 @@ spring-boot-configuration-processor true + + + org.springframework.boot + spring-boot-starter-data-redis + 1.5.7.RELEASE + org.springframework.boot @@ -102,6 +108,13 @@ + + org.springframework.boot + spring-boot-starter-mail + + + + diff --git a/xiuosiot-backend/src/main/java/com/aiit/xiuos/Utils/EmailUtil.java b/xiuosiot-backend/src/main/java/com/aiit/xiuos/Utils/EmailUtil.java index c24f6bb..348cebf 100644 --- a/xiuosiot-backend/src/main/java/com/aiit/xiuos/Utils/EmailUtil.java +++ b/xiuosiot-backend/src/main/java/com/aiit/xiuos/Utils/EmailUtil.java @@ -1,4 +1,100 @@ package com.aiit.xiuos.Utils; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.mail.SimpleMailMessage; +import org.springframework.mail.javamail.JavaMailSender; +import org.springframework.mail.javamail.MimeMessageHelper; +import org.springframework.stereotype.Component; + +import javax.mail.MessagingException; +import javax.mail.internet.MimeMessage; +import java.io.File; +@Component public class EmailUtil { + + + + @Autowired + private JavaMailSender mailSender; + + private String from ="419034340@qq.com"; + + /** + * 发送纯文本邮件信息 + * + * @param to 接收方 + * @param subject 邮件主题 + * @param content 邮件内容(发送内容) + */ + public void sendMessage(String to, String subject, String content) { + // 创建一个邮件对象 + SimpleMailMessage msg = new SimpleMailMessage(); + msg.setFrom(from); // 设置发送发 + msg.setTo(to); // 设置接收方 + msg.setSubject(subject); // 设置邮件主题 + msg.setText(content); // 设置邮件内容 + // 发送邮件 + mailSender.send(msg); + } + + /** + * 发送带附件的邮件信息 + * + * @param to 接收方 + * @param subject 邮件主题 + * @param content 邮件内容(发送内容) + * @param files 文件数组 // 可发送多个附件 + */ + public void sendMessageCarryFiles(String to, String subject, String content, File[] files) { + MimeMessage mimeMessage = mailSender.createMimeMessage(); + try { + MimeMessageHelper helper = new MimeMessageHelper(mimeMessage,true); + helper.setFrom(from); // 设置发送发 + helper.setTo(to); // 设置接收方 + helper.setSubject(subject); // 设置邮件主题 + helper.setText(content); // 设置邮件内容 + if (files != null && files.length > 0) { // 添加附件(多个) + for (File file : files) { + helper.addAttachment(file.getName(), file); + } + } + } catch (MessagingException e) { + e.printStackTrace(); + } + // 发送邮件 + mailSender.send(mimeMessage); + } + /** + * 发送带附件的邮件信息 + * + * @param to 接收方 + * @param subject 邮件主题 + * @param content 邮件内容(发送内容) + * @param file 单个文件 + */ + public void sendMessageCarryFile(String to, String subject, String content, File file) { + MimeMessage mimeMessage = mailSender.createMimeMessage(); + try { + MimeMessageHelper helper = new MimeMessageHelper(mimeMessage,true); + helper.setFrom(from); // 设置发送发 + helper.setTo(to); // 设置接收方 + helper.setSubject(subject); // 设置邮件主题 + helper.setText(content); // 设置邮件内容 + helper.addAttachment(file.getName(), file); // 单个附件 + } catch (MessagingException e) { + e.printStackTrace(); + } + // 发送邮件 + mailSender.send(mimeMessage); + } + + public String getFrom() { + return from; + } + + public void setFrom(String from) { + this.from = from; + } } + diff --git a/xiuosiot-backend/src/main/java/com/aiit/xiuos/Utils/TDengineJDBCUtil.java b/xiuosiot-backend/src/main/java/com/aiit/xiuos/Utils/TDengineJDBCUtil.java deleted file mode 100644 index 83adecf..0000000 --- a/xiuosiot-backend/src/main/java/com/aiit/xiuos/Utils/TDengineJDBCUtil.java +++ /dev/null @@ -1,150 +0,0 @@ -package com.aiit.xiuos.Utils; - -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.JSONArray; -import com.alibaba.fastjson.JSONObject; -import com.taosdata.jdbc.TSDBDriver; -import lombok.extern.slf4j.Slf4j; - -import java.sql.*; -import java.util.ArrayList; -import java.util.Properties; -@Slf4j -public class TDengineJDBCUtil { - public static Connection getConn() throws Exception{ - Class.forName("com.taosdata.jdbc.TSDBDriver"); - String jdbcUrl = "jdbc:TAOS://xiuosiot.taosnode1:6030/xiuosiot?user=root&password=taosdata"; - Properties connProps = new Properties(); - connProps.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8"); - connProps.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8"); - connProps.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8"); - connProps.setProperty("debugFlag", "135"); - connProps.setProperty("maxSQLLength", "1048576"); - Connection conn = DriverManager.getConnection(jdbcUrl, connProps); - return conn; - } - - public static Connection getRestConn() throws Exception{ - Class.forName("com.taosdata.jdbc.rs.RestfulDriver"); - String jdbcUrl = "jdbc:TAOS-RS://10.0.30.23:6041/test?user=root&password=taosdata"; - Properties connProps = new Properties(); - connProps.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true"); - Connection conn = DriverManager.getConnection(jdbcUrl, connProps); - return conn; - } - - - public static boolean createTable(String deviceno, String type, String org, String productname){ - Connection connection = null; - try { - connection = TDengineJDBCUtil.getConn(); - Statement stmt = connection.createStatement(); - String devicetype=TDengineJDBCUtil.changeType(type); - if(devicetype==null){ - return false; - } - // create table - String sql ="create table if not exists " + deviceno + " using "+devicetype+" tags("+"\""+org+"\","+"\"" + productname+"\")"; - stmt.executeUpdate(sql); - return true; - } catch (Exception e) { - e.printStackTrace(); - }finally { - try { - if(connection!=null){ - connection.close(); - } - - } catch (SQLException throwables) { - throwables.printStackTrace(); - } - - } - return false; - - } - - public static String changeType(String type){ - if(type.equals("M168-LoRa-FM100")) return "M168type"; - if(type.equals("RV400-NPU16T-5G-AR100")) return "RV400ARtype"; - if(type.equals("RV400-NPU4T-5G-SR100")) return "RV400SRtype"; - if(type.equals("M528-A800-5G-HM100")) return "M528type"; - if(type.equals("RV400-4G-FR100")) return "RV400FRtype"; - return null; - } - - public static ArrayList executeSql(String sql) throws Exception { - Connection connection = null; - ArrayList arrayList =new ArrayList<>(); - try { - connection = TDengineJDBCUtil.getConn(); - Statement stmt = connection.createStatement(); - ResultSet resultSet =stmt.executeQuery(sql); - log.info("tdengine executeQuery:"+sql); - - ResultSetMetaData metaData = resultSet.getMetaData(); - while (resultSet.next()) { - StringBuilder sb=new StringBuilder(); - for (int i = 1; i <= metaData.getColumnCount(); i++) { - String columnLabel = metaData.getColumnLabel(i); - String value = resultSet.getString(i); - sb.append(columnLabel+":"+value+" "); - } - arrayList.add(sb.toString()); - System.out.println(sb.toString()); - } - return arrayList; - } catch (SQLException e) { - System.out.println("ERROR Message: " + e.getMessage()); - System.out.println("ERROR Code: " + e.getErrorCode()); - e.printStackTrace(); - }finally { - try { - if(connection!=null){ - connection.close(); - } - - } catch (SQLException throwables) { - throwables.printStackTrace(); - } - - } - return null; - - } - - public static int getCount(String sql) throws Exception { - Connection connection = null; - int count =-1; - try { - connection = TDengineJDBCUtil.getConn(); - Statement stmt = connection.createStatement(); - ResultSet resultSet =stmt.executeQuery(sql); - log.info("tdengine executeQuery:"+sql); - ResultSetMetaData metaData = resultSet.getMetaData(); - if (resultSet.next()) { - count = resultSet.getInt(1); - } - return count; - } catch (SQLException e) { - System.out.println("ERROR Message: " + e.getMessage()); - System.out.println("ERROR Code: " + e.getErrorCode()); - e.printStackTrace(); - }finally { - try { - if(connection!=null){ - connection.close(); - } - - } catch (SQLException throwables) { - throwables.printStackTrace(); - } - - } - return count; - - } - - - -} diff --git a/xiuosiot-backend/src/main/java/com/aiit/xiuos/controller/AlarmInfoController.java b/xiuosiot-backend/src/main/java/com/aiit/xiuos/controller/AlarmInfoController.java index 3210fb3..d612a21 100644 --- a/xiuosiot-backend/src/main/java/com/aiit/xiuos/controller/AlarmInfoController.java +++ b/xiuosiot-backend/src/main/java/com/aiit/xiuos/controller/AlarmInfoController.java @@ -45,6 +45,16 @@ public class AlarmInfoController { return new ResultRespons(Constant.ERROR_CODE,"更新告警信息失败!"); } + + @PostMapping("/add") + public ResultRespons addAlarmInfo(@RequestBody AlarmInfo alarmInfo){ + int res =alarmInfoService.addAlarmInfo(alarmInfo); + if(res==1){ + return new ResultRespons(Constant.SUCCESS_CODE,"新增告警信息成功!"); + } + return new ResultRespons(Constant.ERROR_CODE,"新增告警信息失败!"); + } + @GetMapping("/getAlarmLevelCount") public ResultRespons getAlarmLevelCount(HttpServletRequest request){ UserInfo userInfo =(UserInfo) request.getSession().getAttribute("user"); diff --git a/xiuosiot-backend/src/main/java/com/aiit/xiuos/controller/AlarmRuleController.java b/xiuosiot-backend/src/main/java/com/aiit/xiuos/controller/AlarmRuleController.java index 28e8d81..7a8890c 100644 --- a/xiuosiot-backend/src/main/java/com/aiit/xiuos/controller/AlarmRuleController.java +++ b/xiuosiot-backend/src/main/java/com/aiit/xiuos/controller/AlarmRuleController.java @@ -63,12 +63,5 @@ public class AlarmRuleController { return new ResultRespons(Constant.ERROR_CODE,"删除告警规则失败"); } - @GetMapping("/executeAlarmTask") - public void executeTask(){ - try { - taskScheduled.ExecuteRule(); - } catch (Exception e) { - e.printStackTrace(); - } - } + } diff --git a/xiuosiot-backend/src/main/java/com/aiit/xiuos/controller/DataForwardController.java b/xiuosiot-backend/src/main/java/com/aiit/xiuos/controller/DataForwardController.java index d603593..a561692 100644 --- a/xiuosiot-backend/src/main/java/com/aiit/xiuos/controller/DataForwardController.java +++ b/xiuosiot-backend/src/main/java/com/aiit/xiuos/controller/DataForwardController.java @@ -1,4 +1,99 @@ package com.aiit.xiuos.controller; +import com.aiit.xiuos.Utils.*; +import com.aiit.xiuos.model.DataForwarding; +import com.aiit.xiuos.model.UserInfo; +import com.aiit.xiuos.service.DataForwardService; +import com.aiit.xiuos.tdengine.TDengineJDBCUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.*; + +import javax.servlet.http.HttpServletRequest; +import java.io.*; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +@RestController +@RequestMapping("/dataForward") +@Slf4j public class DataForwardController { + @Autowired + DataForwardService dataForwardService; + @Autowired + EmailUtil emailUtil; + + @PostMapping("/addRecord") + public ResultRespons addRecord(@RequestBody Map jsonMap) { + boolean flag =false; + List dataList =(List) jsonMap.get("datalist"); + DataForwarding dataForwarding =new DataForwarding(); + dataForwarding.setAddress((String)jsonMap.get("address")); + dataForwarding.setContent((String)jsonMap.get("content")); + dataForwarding.setOrg((String)jsonMap.get("org")); + dataForwarding.setSql((String)jsonMap.get("sql")); + dataForwarding.setTopic((String)jsonMap.get("topic")); + dataForwarding.setType((String)jsonMap.get("type")); + dataForwarding.setTime(MyUtils.getDateTime()); + try { + BufferedWriter bufferedWriter = new BufferedWriter( + new OutputStreamWriter(new FileOutputStream("/home/aiitadmin/email/data.txt"), "UTF-8")); + for (String s : dataList) { + bufferedWriter.write(s); + bufferedWriter.newLine(); + bufferedWriter.flush(); + } + emailUtil.sendMessageCarryFile(dataForwarding.getAddress(),dataForwarding.getTopic(),dataForwarding.getContent(),new File("/home/aiitadmin/email/data.txt")); + flag =true; + } catch (FileNotFoundException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } catch (IOException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + }catch (Exception e){ + e.printStackTrace(); + log.info("邮件发送失败"); + } + + if(flag){ + int res = dataForwardService.addRecord(dataForwarding); + if(res==1){ + return new ResultRespons(Constant.SUCCESS_CODE,"邮件发送成功,记录已保存"); + }else { + return new ResultRespons(Constant.ERROR_CODE, "邮件发送成功,记录保存失败"); + } + }else { + return new ResultRespons(Constant.ERROR_CODE,"邮件发送失败"); + } + + + } + + @GetMapping("/executeSql") + public ResultRespons executeSql(@RequestParam String sql){ + ArrayList resultData = null; + try { + resultData = TDengineJDBCUtil.executeSql(sql); + } catch (Exception e) { + e.printStackTrace(); + } + if(resultData!=null){ + return new ResultRespons(Constant.SUCCESS_CODE,"数据查询成功",resultData); + } + return new ResultRespons(Constant.ERROR_CODE,"数据查询失败,请检查查询语句"); + + } + + @GetMapping("/getRecord") + public ResultRespons getRecord(HttpServletRequest request){ + UserInfo userInfo =(UserInfo) request.getSession().getAttribute("user"); + List lists = dataForwardService.selectRecord(userInfo.getOrg()); + if(lists != null && lists.size()>=0){ + return new ResultRespons(Constant.SUCCESS_CODE,"数据查询成功",lists); + } + return new ResultRespons(Constant.ERROR_CODE,"数据查询失败,请检查查询语句"); + + } } diff --git a/xiuosiot-backend/src/main/java/com/aiit/xiuos/controller/DeviceDataController.java b/xiuosiot-backend/src/main/java/com/aiit/xiuos/controller/DeviceDataController.java index 0f664b9..f935b58 100644 --- a/xiuosiot-backend/src/main/java/com/aiit/xiuos/controller/DeviceDataController.java +++ b/xiuosiot-backend/src/main/java/com/aiit/xiuos/controller/DeviceDataController.java @@ -3,23 +3,25 @@ package com.aiit.xiuos.controller; import com.aiit.xiuos.Utils.Constant; import com.aiit.xiuos.Utils.HttpClientUtil; import com.aiit.xiuos.Utils.ResultRespons; -import com.aiit.xiuos.dao.mappers.AvgDayDataMapper; import com.aiit.xiuos.model.AvgDayData; +import com.aiit.xiuos.mqtt.MqttConfiguration; +import com.aiit.xiuos.mqtt.MyMqttClient; import com.aiit.xiuos.service.AvgDayDataService; import com.alibaba.fastjson.JSONObject; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.web.bind.annotation.GetMapping; -import org.springframework.web.bind.annotation.RequestMapping; -import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.bind.annotation.*; import javax.servlet.http.HttpServletRequest; import java.util.List; +import java.util.Map; @RestController @RequestMapping("/data") public class DeviceDataController { @Autowired AvgDayDataService avgDayDataService; + @Autowired + private MyMqttClient myMqttClient; @GetMapping("/getAll") public ResultRespons getAllDataFromDSD(HttpServletRequest request){ @@ -38,4 +40,14 @@ public class DeviceDataController { return new ResultRespons(Constant.ERROR_CODE,"查询数据失败"); } + @PostMapping("/publishData") + public ResultRespons publishData(@RequestBody Map jsonMap){ + if(myMqttClient==null){ + myMqttClient=new MqttConfiguration().getMqttPushClient(); + } + myMqttClient.publish("xiuosiot/"+jsonMap.get("deviceno"), JSONObject.toJSONString(jsonMap)); + return new ResultRespons(Constant.ERROR_CODE,"数据发送成功"); + } + + } diff --git a/xiuosiot-backend/src/main/java/com/aiit/xiuos/dao/mappers/DataForwardingMapper.java b/xiuosiot-backend/src/main/java/com/aiit/xiuos/dao/mappers/DataForwardingMapper.java index ff84833..27f627c 100644 --- a/xiuosiot-backend/src/main/java/com/aiit/xiuos/dao/mappers/DataForwardingMapper.java +++ b/xiuosiot-backend/src/main/java/com/aiit/xiuos/dao/mappers/DataForwardingMapper.java @@ -1,7 +1,13 @@ package com.aiit.xiuos.dao.mappers; import com.aiit.xiuos.model.DataForwarding; +import org.apache.ibatis.annotations.Param; +import org.apache.ibatis.annotations.Select; +import org.springframework.stereotype.Repository; +import java.util.List; + +@Repository public interface DataForwardingMapper { int deleteByPrimaryKey(Integer id); @@ -14,4 +20,7 @@ public interface DataForwardingMapper { int updateByPrimaryKeySelective(DataForwarding record); int updateByPrimaryKey(DataForwarding record); + + @Select("select * from data_forwarding where org = #{org}") + List selectAll(@Param("org") String org); } \ No newline at end of file diff --git a/xiuosiot-backend/src/main/java/com/aiit/xiuos/model/DataForwarding.java b/xiuosiot-backend/src/main/java/com/aiit/xiuos/model/DataForwarding.java index e08ee25..eb441f6 100644 --- a/xiuosiot-backend/src/main/java/com/aiit/xiuos/model/DataForwarding.java +++ b/xiuosiot-backend/src/main/java/com/aiit/xiuos/model/DataForwarding.java @@ -1,6 +1,8 @@ package com.aiit.xiuos.model; import java.util.Date; + +import com.fasterxml.jackson.annotation.JsonFormat; import lombok.Builder; import lombok.Data; @@ -14,7 +16,7 @@ public class DataForwarding { private String content; private String type; - + @JsonFormat(pattern ="yyyy-MM-dd HH:mm:ss",timezone = "GMT+8") private Date time; private String address; diff --git a/xiuosiot-backend/src/main/java/com/aiit/xiuos/scheduled/TaskScheduled.java b/xiuosiot-backend/src/main/java/com/aiit/xiuos/scheduled/TaskScheduled.java index 33d71e3..f56ae5e 100644 --- a/xiuosiot-backend/src/main/java/com/aiit/xiuos/scheduled/TaskScheduled.java +++ b/xiuosiot-backend/src/main/java/com/aiit/xiuos/scheduled/TaskScheduled.java @@ -2,17 +2,13 @@ package com.aiit.xiuos.scheduled; import com.aiit.xiuos.Utils.EmailUtil; import com.aiit.xiuos.Utils.MyUtils; -import com.aiit.xiuos.Utils.TDengineJDBCUtil; +import com.aiit.xiuos.tdengine.TDengineJDBCUtil; import com.aiit.xiuos.model.*; import com.aiit.xiuos.service.*; -import com.aiit.xiuos.service.impl.AvgDayDataServiceImpl; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; -import org.checkerframework.checker.units.qual.A; import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.context.annotation.Lazy; -import org.springframework.scheduling.annotation.Async; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @@ -134,16 +130,10 @@ public class TaskScheduled { //保存告警信息。告警级别高的插入,低的因主键冲突可自动排除。 try{ alarmInfoService.addAlarmInfo(alarmInfo); - }catch (Exception e){ - e.printStackTrace(); - } - //发送邮件 - try{ emailUtil.sendMessage(alarmRule.getNoticeAddress(),"设备告警",alarmRule.getNoticeContent()); log.info("邮件发送成功"); - }catch (Exception e) { - log.error("邮件发送失败"); - e.printStackTrace(); + }catch (Exception e){ + log.info(e.getMessage()); } } diff --git a/xiuosiot-backend/src/main/java/com/aiit/xiuos/service/DataForwardService.java b/xiuosiot-backend/src/main/java/com/aiit/xiuos/service/DataForwardService.java index b28c7c6..4050837 100644 --- a/xiuosiot-backend/src/main/java/com/aiit/xiuos/service/DataForwardService.java +++ b/xiuosiot-backend/src/main/java/com/aiit/xiuos/service/DataForwardService.java @@ -1,4 +1,11 @@ package com.aiit.xiuos.service; -public interface DataForward { +import com.aiit.xiuos.model.DataForwarding; + +import java.util.List; + +public interface DataForwardService { + int addRecord(DataForwarding dataForwarding); + List selectRecord(String org); + } diff --git a/xiuosiot-backend/src/main/java/com/aiit/xiuos/service/impl/DataForwardServiceImpl.java b/xiuosiot-backend/src/main/java/com/aiit/xiuos/service/impl/DataForwardServiceImpl.java index 14789f4..fb0c71e 100644 --- a/xiuosiot-backend/src/main/java/com/aiit/xiuos/service/impl/DataForwardServiceImpl.java +++ b/xiuosiot-backend/src/main/java/com/aiit/xiuos/service/impl/DataForwardServiceImpl.java @@ -1,4 +1,23 @@ package com.aiit.xiuos.service.impl; -public class DataForwardServiceImpl { +import com.aiit.xiuos.dao.mappers.DataForwardingMapper; +import com.aiit.xiuos.model.DataForwarding; +import com.aiit.xiuos.service.DataForwardService; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Service; + +import java.util.List; +@Service +public class DataForwardServiceImpl implements DataForwardService { + @Autowired + DataForwardingMapper dataForwardingMapper; + @Override + public int addRecord(DataForwarding dataForwarding) { + return dataForwardingMapper.insertSelective(dataForwarding); + } + + @Override + public List selectRecord(String org) { + return dataForwardingMapper.selectAll(org); + } } diff --git a/xiuosiot-backend/src/main/resources/application-local.yml b/xiuosiot-backend/src/main/resources/application-local.yml index d8002f8..e34b40f 100644 --- a/xiuosiot-backend/src/main/resources/application-local.yml +++ b/xiuosiot-backend/src/main/resources/application-local.yml @@ -76,6 +76,9 @@ mqtt: #默认主题 default-topic: xiuosiot/# +tdengine: + url: jdbc:TAOS://taosnode1:6030/xiuosiot?user=root&password=taosdata + logging: file: name: xiuosiot.log diff --git a/xiuosiot-backend/src/main/resources/application-prod.yml b/xiuosiot-backend/src/main/resources/application-prod.yml index 83fddab..1c5e4c7 100644 --- a/xiuosiot-backend/src/main/resources/application-prod.yml +++ b/xiuosiot-backend/src/main/resources/application-prod.yml @@ -69,7 +69,8 @@ mqtt: keepalive: 100 #默认主题 default-topic: xiuosiot/# - +tdengine: + url: jdbc:TAOS://xiuosiot.taosnode1:6030/xiuosiot?user=root&password=taosdata logging: file: name: xiuosiot.log diff --git a/xiuosiot-backend/src/test/java/com/aiit/xiuos/MqttTest.java b/xiuosiot-backend/src/test/java/com/aiit/xiuos/MqttTest.java new file mode 100644 index 0000000..c224718 --- /dev/null +++ b/xiuosiot-backend/src/test/java/com/aiit/xiuos/MqttTest.java @@ -0,0 +1,36 @@ +package com.aiit.xiuos; + + +import com.aiit.xiuos.mqtt.MyMqttClient; +import com.alibaba.fastjson.JSONObject; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; + +import java.util.Random; + +@SpringBootTest(classes = XiuosApplication.class,webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +public class MqttTest { + @Autowired + private MyMqttClient myMqttClient; + + @Test + void pushlish() { + Random random =new Random(); + for (int i = 0; i < 1; i++) { + JSONObject jsonObject =new JSONObject(); + jsonObject.put("co2",19); + jsonObject.put("so2",19); + jsonObject.put("temperature",30); + jsonObject.put("humidity",28); + jsonObject.put("deviceno","a000001"); + myMqttClient.publish("xiuosiot/A000001", jsonObject.toJSONString()); + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + +} \ No newline at end of file From aec8fc7bcfdf780c649a3938498d95e9a4d238ac Mon Sep 17 00:00:00 2001 From: wty <419034340@qq.com> Date: Mon, 14 Nov 2022 14:51:23 +0800 Subject: [PATCH 2/3] get device data from jiangxishida --- .../xiuos/controller/ExcludeController.java | 53 +++++- .../aiit/xiuos/service/DeviceInfoService.java | 1 + .../service/impl/DeviceInfoServiceImpl.java | 10 + .../aiit/xiuos/socket/WebSocketServer.java | 45 ++--- .../aiit/xiuos/tdengine/TDengineJDBCUtil.java | 174 ++++++++++++++++++ .../aiit/xiuos/tdengine/tdengineConfig.java | 14 ++ .../mappers/DataForwardingMapper.xml | 130 +++++++++++++ 7 files changed, 400 insertions(+), 27 deletions(-) create mode 100644 xiuosiot-backend/src/main/java/com/aiit/xiuos/tdengine/TDengineJDBCUtil.java create mode 100644 xiuosiot-backend/src/main/java/com/aiit/xiuos/tdengine/tdengineConfig.java create mode 100644 xiuosiot-backend/src/main/resources/mappers/DataForwardingMapper.xml diff --git a/xiuosiot-backend/src/main/java/com/aiit/xiuos/controller/ExcludeController.java b/xiuosiot-backend/src/main/java/com/aiit/xiuos/controller/ExcludeController.java index fadc9ee..3149316 100644 --- a/xiuosiot-backend/src/main/java/com/aiit/xiuos/controller/ExcludeController.java +++ b/xiuosiot-backend/src/main/java/com/aiit/xiuos/controller/ExcludeController.java @@ -1,4 +1,55 @@ package com.aiit.xiuos.controller; +import com.aiit.xiuos.Utils.MyUtils; +import com.aiit.xiuos.scheduled.TaskScheduled; +import com.aiit.xiuos.service.impl.ProtocolServiceImpl; +import com.aiit.xiuos.socket.WebSocketServer; +import com.aiit.xiuos.tdengine.TDengineJDBCUtil; +import com.alibaba.fastjson.JSONArray; +import com.alibaba.fastjson.JSONObject; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.*; -public class EecludeController { +import javax.servlet.http.HttpServletRequest; +import java.text.ParseException; +@RestController +@RequestMapping("/exclude") +@Slf4j +public class ExcludeController { + @Autowired + ProtocolServiceImpl protocolService; + @Autowired + TaskScheduled taskScheduled; + + @Autowired + WebSocketServer webSocketServer; + @PostMapping("/addjxsd") + public void addJXSD(@RequestBody String jsonString, HttpServletRequest request) throws ParseException { + System.out.println("收到的jsonString:"+jsonString); + JSONObject jsonObject = MyUtils.StringToJson(jsonString); + int valueNum = jsonObject.getInteger("valueNum"); + String client =jsonObject.getString("org"); + webSocketServer.sendToMessageById(client,jsonString); + String deviceId= jsonObject.getString("deviceId"); + if(valueNum==72){ + JSONArray jsonArray =jsonObject.getJSONArray("readItemList"); + StringBuilder sql =new StringBuilder("insert into jxsd_1500 values(now"); + for(int i=0;i selectbyNo(String no); List> getDeviceTypeCount(String org); List> getDeviceRunStautsCount(String org); + String getTypeByName(String deviceNo); } diff --git a/xiuosiot-backend/src/main/java/com/aiit/xiuos/service/impl/DeviceInfoServiceImpl.java b/xiuosiot-backend/src/main/java/com/aiit/xiuos/service/impl/DeviceInfoServiceImpl.java index b1c8a61..0777c06 100644 --- a/xiuosiot-backend/src/main/java/com/aiit/xiuos/service/impl/DeviceInfoServiceImpl.java +++ b/xiuosiot-backend/src/main/java/com/aiit/xiuos/service/impl/DeviceInfoServiceImpl.java @@ -14,6 +14,7 @@ import java.util.Map; public class DeviceInfoServiceImpl implements DeviceInfoService { @Autowired private DeviceInfoMapper deviceInfoMapper; + @Override public int addDevice(DeviceInfo deviceInfo) { return deviceInfoMapper.insert(deviceInfo); @@ -54,4 +55,13 @@ public class DeviceInfoServiceImpl implements DeviceInfoService { public List> getDeviceRunStautsCount(String org) { return deviceInfoMapper.getDeviceRunStatusCount(org); } + + @Override + public String getTypeByName(String deviceNo) { + List types = deviceInfoMapper.selectTypeByNo(deviceNo); + if(types!=null && types.size()>0){ + return types.get(0); + } + return null; + } } diff --git a/xiuosiot-backend/src/main/java/com/aiit/xiuos/socket/WebSocketServer.java b/xiuosiot-backend/src/main/java/com/aiit/xiuos/socket/WebSocketServer.java index 8869a88..ec80785 100644 --- a/xiuosiot-backend/src/main/java/com/aiit/xiuos/socket/WebSocketServer.java +++ b/xiuosiot-backend/src/main/java/com/aiit/xiuos/socket/WebSocketServer.java @@ -1,8 +1,6 @@ package com.aiit.xiuos.socket; - import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; - import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; @@ -11,7 +9,7 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -@ServerEndpoint(value = "/{paramName}") +@ServerEndpoint(value = "/websocket/{clientId}") @Component @Slf4j public class WebSocketServer { @@ -42,29 +40,22 @@ public class WebSocketServer { /** * 连接建立成功调用的方法*/ @OnOpen - public void onOpen(@PathParam(value = "paramName") String param, Session session) { - if(numofAccount.containsKey(param)){ - Integer num=numofAccount.get(param); - String paramtmp=param; - param=param+num; - System.out.println(param); - numofAccount.put(paramtmp,num+1); - }else{ - numofAccount.put(param,1); - } + public void onOpen(@PathParam(value = "clientId") String clientId, Session session) { + //接收到发送消息的人员编号 - name = param; + name = clientId; this.session = session; /**加入set中*/ - webSocketSet.put(param,this); + webSocketSet.put(clientId,this); /**在线数加1*/ addOnlineCount(); - System.out.println("有新连接"+param+"加入!当前在线人数为" + getOnlineCount()); - //try { - //sendMessage("-连接已建立-"); - //} catch (IOException e) { - // System.out.println("IO异常"); - // } + System.out.println("有新连接"+clientId+"加入!当前在线人数为" + getOnlineCount()); + log.info("有新连接"+clientId+"加入!当前在线人数为" + getOnlineCount()); + try { + sendMessage(clientId+"-连接已建立-"); + } catch (IOException e) { + System.out.println("IO异常"); + } } /** @@ -78,6 +69,7 @@ public class WebSocketServer { /** 在线数减1 */ subOnlineCount(); System.out.println("有一连接关闭!当前在线人数为" + getOnlineCount()); + log.info("有一连接"+name+"关闭!当前在线人数为" + getOnlineCount()); } } @@ -88,11 +80,11 @@ public class WebSocketServer { @OnMessage public void onMessage(String message, Session session) { System.out.println("来自客户端的消息:" + message); - + log.info("来自客户端的消息:" + message); try { - this.sendMessage(message); + this.sendMessage("收到!"); } catch (IOException e) { - e.printStackTrace(); + log.error(e.getMessage()); } } @@ -122,9 +114,10 @@ public class WebSocketServer { webSocketSet.get(id).sendMessage(message); } else { System.out.println("webSocketSet中没有此key,不推送消息"); + log.info("webSocketSet中没有此key,不推送消息"); } } catch (IOException e) { - e.printStackTrace(); + log.error(e.getMessage()); } } @@ -140,7 +133,7 @@ public class WebSocketServer { value.sendMessage(message); } }catch(IOException e){ - e.printStackTrace(); + log.error(e.getMessage()); } } diff --git a/xiuosiot-backend/src/main/java/com/aiit/xiuos/tdengine/TDengineJDBCUtil.java b/xiuosiot-backend/src/main/java/com/aiit/xiuos/tdengine/TDengineJDBCUtil.java new file mode 100644 index 0000000..d84dabd --- /dev/null +++ b/xiuosiot-backend/src/main/java/com/aiit/xiuos/tdengine/TDengineJDBCUtil.java @@ -0,0 +1,174 @@ +package com.aiit.xiuos.tdengine; + +import com.taosdata.jdbc.TSDBDriver; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; + +import java.sql.*; +import java.util.ArrayList; +import java.util.Properties; +@Slf4j +public class TDengineJDBCUtil { + public static Connection getConn() throws Exception{ + Class.forName("com.taosdata.jdbc.TSDBDriver"); + Properties connProps = new Properties(); + connProps.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8"); + connProps.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8"); + connProps.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8"); + connProps.setProperty("debugFlag", "135"); + connProps.setProperty("maxSQLLength", "1048576"); + log.info(tdengineConfig.jdbcurl); + Connection conn = DriverManager.getConnection(tdengineConfig.jdbcurl, connProps); + return conn; + } + + public static Connection getRestConn() throws Exception{ + Class.forName("com.taosdata.jdbc.rs.RestfulDriver"); + String jdbcUrl = "jdbc:TAOS-RS://taosnode1:6041/test?user=root&password=taosdata"; + Properties connProps = new Properties(); + connProps.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true"); + Connection conn = DriverManager.getConnection(jdbcUrl, connProps); + return conn; + } + + + public static boolean createTable(String deviceno, String type, String org, String productname){ + Connection connection = null; + try { + connection = TDengineJDBCUtil.getConn(); + Statement stmt = connection.createStatement(); + String devicetype=TDengineJDBCUtil.changeType(type); + if(devicetype==null){ + return false; + } + // create table + String sql ="create table if not exists " + deviceno + " using "+devicetype+" tags("+"\""+org+"\","+"\"" + productname+"\")"; + stmt.executeUpdate(sql); + return true; + } catch (Exception e) { + log.error(e.getMessage()); + }finally { + try { + if(connection!=null){ + connection.close(); + } + + } catch (SQLException throwables) { + throwables.printStackTrace(); + } + + } + return false; + + } + + public static String changeType(String type){ + if(type.equals("M168-LoRa-FM100")) return "M168_LoRa_FM100"; + if(type.equals("RV400-NPU16T-5G-AR100")) return "RV400_NPU16T_5G_AR100"; + if(type.equals("RV400-NPU4T-5G-SR100")) return "RV400_NPU4T_5G_SR100"; + if(type.equals("M528-A800-5G-HM100")) return "M528_A800_5G_HM100"; + if(type.equals("RV400-4G-FR100")) return "RV400_4G_FR100"; + return null; + } + + public static ArrayList executeSql(String sql) throws Exception { + Connection connection = null; + ArrayList arrayList =new ArrayList<>(); + try { + connection = TDengineJDBCUtil.getConn(); + Statement stmt = connection.createStatement(); + ResultSet resultSet =stmt.executeQuery(sql); + log.info("tdengine executeQuery:"+sql); + + ResultSetMetaData metaData = resultSet.getMetaData(); + while (resultSet.next()) { + StringBuilder sb=new StringBuilder(); + for (int i = 1; i <= metaData.getColumnCount(); i++) { + String columnLabel = metaData.getColumnLabel(i); + String value = resultSet.getString(i); + sb.append(columnLabel+":"+value+" "); + } + arrayList.add(sb.toString()); + System.out.println(sb.toString()); + } + return arrayList; + } catch (SQLException e) { + System.out.println("ERROR Message: " + e.getMessage()); + System.out.println("ERROR Code: " + e.getErrorCode()); + log.error(e.getMessage()); + }finally { + try { + if(connection!=null){ + connection.close(); + } + + } catch (SQLException throwables) { + throwables.printStackTrace(); + } + + } + return null; + + } + + public static int getCount(String sql) throws Exception { + Connection connection = null; + int count =0; + try { + connection = TDengineJDBCUtil.getConn(); + Statement stmt = connection.createStatement(); + log.info("tdengine executeQuery:"+sql); + ResultSet resultSet =stmt.executeQuery(sql); + + ResultSetMetaData metaData = resultSet.getMetaData(); + if (resultSet.next()) { + count = resultSet.getInt(1); + } + return count; + } catch (SQLException e) { + System.out.println("ERROR Message: " + e.getMessage()); + System.out.println("ERROR Code: " + e.getErrorCode()); + log.error(e.getMessage()); + }finally { + try { + if(connection!=null){ + connection.close(); + } + + } catch (SQLException throwables) { + throwables.printStackTrace(); + } + + } + return count; + + } + + public static void executeInsertSql(String sql) { + Connection connection = null; + try { + connection = TDengineJDBCUtil.getConn(); + Statement stmt = connection.createStatement(); + log.info("tdengine executeQuery:"+sql); + stmt.executeUpdate(sql); + } catch (SQLException e) { + System.out.println("ERROR Message: " + e.getMessage()); + System.out.println("ERROR Code: " + e.getErrorCode()); + log.error(e.getMessage()); + } catch (Exception e) { + log.error(e.getMessage()); + } finally { + try { + if(connection!=null){ + connection.close(); + } + } catch (SQLException throwables) { + throwables.printStackTrace(); + } + + } + } + + + +} diff --git a/xiuosiot-backend/src/main/java/com/aiit/xiuos/tdengine/tdengineConfig.java b/xiuosiot-backend/src/main/java/com/aiit/xiuos/tdengine/tdengineConfig.java new file mode 100644 index 0000000..e3b6dcc --- /dev/null +++ b/xiuosiot-backend/src/main/java/com/aiit/xiuos/tdengine/tdengineConfig.java @@ -0,0 +1,14 @@ +package com.aiit.xiuos.tdengine; + +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +@Component +public class tdengineConfig { + public static String jdbcurl; + @Value("${tdengine.url}") + public void setJdbcurl(String jdbcurl){ + tdengineConfig.jdbcurl =jdbcurl; + } + +} diff --git a/xiuosiot-backend/src/main/resources/mappers/DataForwardingMapper.xml b/xiuosiot-backend/src/main/resources/mappers/DataForwardingMapper.xml new file mode 100644 index 0000000..8eb1b6a --- /dev/null +++ b/xiuosiot-backend/src/main/resources/mappers/DataForwardingMapper.xml @@ -0,0 +1,130 @@ + + + + + + + + + + + + + + + + + id, topic, content, type, time, address, org, sql + + + + delete from data_forwarding + where id = #{id,jdbcType=INTEGER} + + + insert into data_forwarding (id, topic, content, + type, time, address, + org, sql) + values (#{id,jdbcType=INTEGER}, #{topic,jdbcType=VARCHAR}, #{content,jdbcType=VARCHAR}, + #{type,jdbcType=VARCHAR}, #{time,jdbcType=TIMESTAMP}, #{address,jdbcType=VARCHAR}, + #{org,jdbcType=VARCHAR}, #{sql,jdbcType=VARCHAR}) + + + insert into data_forwarding + + + id, + + + topic, + + + content, + + + type, + + + time, + + + address, + + + org, + + + sql, + + + + + #{id,jdbcType=INTEGER}, + + + #{topic,jdbcType=VARCHAR}, + + + #{content,jdbcType=VARCHAR}, + + + #{type,jdbcType=VARCHAR}, + + + #{time,jdbcType=TIMESTAMP}, + + + #{address,jdbcType=VARCHAR}, + + + #{org,jdbcType=VARCHAR}, + + + #{sql,jdbcType=VARCHAR}, + + + + + update data_forwarding + + + topic = #{topic,jdbcType=VARCHAR}, + + + content = #{content,jdbcType=VARCHAR}, + + + type = #{type,jdbcType=VARCHAR}, + + + time = #{time,jdbcType=TIMESTAMP}, + + + address = #{address,jdbcType=VARCHAR}, + + + org = #{org,jdbcType=VARCHAR}, + + + sql = #{sql,jdbcType=VARCHAR}, + + + where id = #{id,jdbcType=INTEGER} + + + update data_forwarding + set topic = #{topic,jdbcType=VARCHAR}, + content = #{content,jdbcType=VARCHAR}, + type = #{type,jdbcType=VARCHAR}, + time = #{time,jdbcType=TIMESTAMP}, + address = #{address,jdbcType=VARCHAR}, + org = #{org,jdbcType=VARCHAR}, + sql = #{sql,jdbcType=VARCHAR} + where id = #{id,jdbcType=INTEGER} + + \ No newline at end of file From 46452af4686275a5c9eb2ef40fa34db18c6e106b Mon Sep 17 00:00:00 2001 From: wty <419034340@qq.com> Date: Mon, 14 Nov 2022 17:27:19 +0800 Subject: [PATCH 3/3] add connect to emqx function --- .../com/aiit/xiuos/Utils/HttpClientUtil.java | 18 +-- .../java/com/aiit/xiuos/Utils/MyUtils.java | 12 +- .../aiit/xiuos/mqtt/MqttConfiguration.java | 59 +++++++++ .../com/aiit/xiuos/mqtt/MyMqttClient.java | 121 ++++++++++++++++++ .../com/aiit/xiuos/mqtt/PushCallback.java | 55 ++++++++ .../src/main/resources/application-prod.yml | 2 +- .../test/java/com/aiit/xiuos/MqttTest.java | 5 +- 7 files changed, 258 insertions(+), 14 deletions(-) create mode 100644 xiuosiot-backend/src/main/java/com/aiit/xiuos/mqtt/MqttConfiguration.java create mode 100644 xiuosiot-backend/src/main/java/com/aiit/xiuos/mqtt/MyMqttClient.java create mode 100644 xiuosiot-backend/src/main/java/com/aiit/xiuos/mqtt/PushCallback.java diff --git a/xiuosiot-backend/src/main/java/com/aiit/xiuos/Utils/HttpClientUtil.java b/xiuosiot-backend/src/main/java/com/aiit/xiuos/Utils/HttpClientUtil.java index c12dfcf..f6095a0 100644 --- a/xiuosiot-backend/src/main/java/com/aiit/xiuos/Utils/HttpClientUtil.java +++ b/xiuosiot-backend/src/main/java/com/aiit/xiuos/Utils/HttpClientUtil.java @@ -1,6 +1,7 @@ package com.aiit.xiuos.Utils; import com.alibaba.fastjson.JSONObject; +import lombok.extern.slf4j.Slf4j; import org.apache.http.HttpResponse; import org.apache.http.HttpStatus; import org.apache.http.client.methods.CloseableHttpResponse; @@ -14,7 +15,7 @@ import org.apache.http.util.EntityUtils; import java.io.IOException; - +@Slf4j public class HttpClientUtil { private static String tokenString = ""; @@ -24,7 +25,6 @@ public class HttpClientUtil { /** * 以get方式调用第三方接口 * @param url - * @param token * @return */ public static JSONObject doGet(String url) { @@ -45,7 +45,7 @@ public class HttpClientUtil { return (JSONObject) JSONObject.parse(res); } } catch (IOException e) { - e.printStackTrace(); + log.error(e.getMessage()); } return null; } @@ -81,13 +81,13 @@ public class HttpClientUtil { return res; } } catch (IOException e) { - e.printStackTrace(); + log.error(e.getMessage()); } finally { if (httpClient != null){ try { httpClient.close(); } catch (IOException e) { - e.printStackTrace(); + log.error(e.getMessage()); } } } @@ -123,7 +123,7 @@ public class HttpClientUtil { token = result.getString("token"); } } catch (IOException e) { - e.printStackTrace(); + log.error(e.getMessage()); } return token; } @@ -144,7 +144,9 @@ public class HttpClientUtil { System.out.println(response); } - public static void main(String[] args) { - test("12345678910"); + + +public static void main(String[] arg0){ + } } diff --git a/xiuosiot-backend/src/main/java/com/aiit/xiuos/Utils/MyUtils.java b/xiuosiot-backend/src/main/java/com/aiit/xiuos/Utils/MyUtils.java index 8d1d05b..e6b03c0 100644 --- a/xiuosiot-backend/src/main/java/com/aiit/xiuos/Utils/MyUtils.java +++ b/xiuosiot-backend/src/main/java/com/aiit/xiuos/Utils/MyUtils.java @@ -2,6 +2,7 @@ package com.aiit.xiuos.Utils; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; +import lombok.extern.slf4j.Slf4j; import javax.servlet.http.HttpServletRequest; import java.io.*; @@ -12,7 +13,7 @@ import java.util.Date; import java.util.List; - +@Slf4j public class MyUtils { public static void main(String[] args) { @@ -62,9 +63,14 @@ public class MyUtils { return time; } - public static Date getDateTime() throws ParseException { + public static Date getDateTime() { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - Date date = sdf.parse(MyUtils.getTime()); + Date date = null; + try { + date = sdf.parse(MyUtils.getTime()); + } catch (ParseException e) { + log.error(e.getMessage()); + } return date; } diff --git a/xiuosiot-backend/src/main/java/com/aiit/xiuos/mqtt/MqttConfiguration.java b/xiuosiot-backend/src/main/java/com/aiit/xiuos/mqtt/MqttConfiguration.java new file mode 100644 index 0000000..3d7bd53 --- /dev/null +++ b/xiuosiot-backend/src/main/java/com/aiit/xiuos/mqtt/MqttConfiguration.java @@ -0,0 +1,59 @@ +package com.aiit.xiuos.mqtt; + +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.stereotype.Component; + + +@Component +@ConfigurationProperties("mqtt") +@Setter +@Getter +@Slf4j +public class MqttConfiguration { + @Autowired + private MyMqttClient myMqttClient; + + /** + * 用户名 + */ + private String username; /** + + * 密码 + */ + private String password; + /** + * 连接地址 + */ + private String hostUrl; + /** + * 客户Id + */ + private String clientId; + /** + * 默认连接话题 + */ + private String defaultTopic; + /** + * 超时时间 + */ + private int timeout; + /** + * 保持连接数 + */ + private int keepalive; + + @Bean + public MyMqttClient getMqttPushClient() { + myMqttClient.connect(hostUrl, clientId, username, password, timeout, keepalive); + // 以/#结尾表示订阅所有以test开头的主题 + myMqttClient.subscribe(defaultTopic, 0); + log.info("订阅成功"+defaultTopic); + System.out.println("订阅成功"+defaultTopic); + return myMqttClient; + } +} diff --git a/xiuosiot-backend/src/main/java/com/aiit/xiuos/mqtt/MyMqttClient.java b/xiuosiot-backend/src/main/java/com/aiit/xiuos/mqtt/MyMqttClient.java new file mode 100644 index 0000000..fd05317 --- /dev/null +++ b/xiuosiot-backend/src/main/java/com/aiit/xiuos/mqtt/MyMqttClient.java @@ -0,0 +1,121 @@ +package com.aiit.xiuos.mqtt; + + +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.*; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; +import org.springframework.stereotype.Component; + +/** + * @author wangtianyi + * @description mqtt客户端 + */ +@Slf4j +@Component +public class MyMqttClient { + + + + private static MqttClient client; + + public static MqttClient getClient(){ + return client; + } + + public static void setClient(MqttClient client){ + MyMqttClient.client=client; + } + + /** + * 客户端连接 + * + * @param host ip+端口 + * @param clientID 客户端Id + * @param username 用户名 + * @param password 密码 + * @param timeout 超时时间 + * @param keepalive 保留数 + */ + public void connect(String host,String clientID,String username,String password,int timeout,int keepalive){ + MqttClient client=null; + + try { + client=new MqttClient(host,clientID,new MemoryPersistence()); + MqttConnectOptions options=new MqttConnectOptions(); + options.setCleanSession(true); + options.setUserName(username); + options.setPassword(password.toCharArray()); + options.setConnectionTimeout(timeout); + options.setKeepAliveInterval(keepalive); + options.setAutomaticReconnect(true); + MyMqttClient.setClient(client); + try { + client.setCallback(new PushCallback()); + client.connect(options); + boolean complete = client.isConnected(); + log.info("MQTT连接"+(complete?"成功":"失败")); + }catch (Exception e){ + log.error(e.getMessage()); + } + }catch (Exception e){ + log.error(e.getMessage()); + } + + } + + + + /** + * 发布,默认qos为0,非持久化 + * @param topic + * @param pushMessage + */ + public void publish(String topic,String pushMessage){ + publish(0,false,topic,pushMessage); + } + + /** + * 发布 + * + * @param qos 连接方式 + * @param retained 是否保留 + * @param topic 主题 + * @param pushMessage 消息体 + */ + public void publish(int qos,boolean retained,String topic,String pushMessage){ + MqttMessage message=new MqttMessage(); + message.setQos(qos); + message.setRetained(retained); + message.setPayload(pushMessage.getBytes()); + MqttTopic mqttTopic= MyMqttClient.getClient().getTopic(topic); + if(null== mqttTopic){ + log.error("topic not exist"); + } + MqttDeliveryToken token; + try { + token=mqttTopic.publish(message); + token.waitForCompletion(); + }catch (MqttPersistenceException e){ + log.error(e.getMessage()); + }catch (MqttException e){ + log.error(e.getMessage()); + } + } + + /** + * 订阅某个主题,qos默认为0 + * @param topic + */ + public void subscribe(String topic){ + log.error("开始订阅主题" + topic); + subscribe(topic,0); + } + + public void subscribe(String topic,int qos){ + try { + MyMqttClient.getClient().subscribe(topic,qos); + }catch (MqttException e){ + log.error(e.getMessage()); + } + } +} \ No newline at end of file diff --git a/xiuosiot-backend/src/main/java/com/aiit/xiuos/mqtt/PushCallback.java b/xiuosiot-backend/src/main/java/com/aiit/xiuos/mqtt/PushCallback.java new file mode 100644 index 0000000..55edfe0 --- /dev/null +++ b/xiuosiot-backend/src/main/java/com/aiit/xiuos/mqtt/PushCallback.java @@ -0,0 +1,55 @@ +package com.aiit.xiuos.mqtt; + +import lombok.extern.slf4j.Slf4j; +import org.eclipse.paho.client.mqttv3.*; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + + +@Component +@Slf4j +public class PushCallback implements MqttCallbackExtended { + + + @Autowired + private MyMqttClient myMqttClient; + + @Override + public void connectionLost(Throwable throwable) { + long reconnectTimes = 1; + while (true) { + try { + if (myMqttClient.getClient().isConnected()) { + log.info("mqtt reconnect success end"); + return; + } + log.info("mqtt reconnect times = {} try again...", reconnectTimes++); + myMqttClient.getClient().reconnect(); + } catch (MqttException e) { + log.error("", e); + } + try { + Thread.sleep(3000); + } catch (InterruptedException e1) { + e1.printStackTrace(); + } + } + } + + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + System.out.println("接收消息主题 : " + topic); + System.out.println("接收消息Qos : " + message.getQos()); + System.out.println("接收消息内容 : " + new String(message.getPayload())); + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + System.out.println("deliveryComplete---------" + token.isComplete()); + } + + @Override + public void connectComplete(boolean b, String s) { + + } +} \ No newline at end of file diff --git a/xiuosiot-backend/src/main/resources/application-prod.yml b/xiuosiot-backend/src/main/resources/application-prod.yml index 1c5e4c7..8327905 100644 --- a/xiuosiot-backend/src/main/resources/application-prod.yml +++ b/xiuosiot-backend/src/main/resources/application-prod.yml @@ -32,7 +32,7 @@ spring: debug: true redis: - database: 0 + database: 1 host: 115.238.53.59 port: 6379 password: abc123 diff --git a/xiuosiot-backend/src/test/java/com/aiit/xiuos/MqttTest.java b/xiuosiot-backend/src/test/java/com/aiit/xiuos/MqttTest.java index c224718..ae0e51e 100644 --- a/xiuosiot-backend/src/test/java/com/aiit/xiuos/MqttTest.java +++ b/xiuosiot-backend/src/test/java/com/aiit/xiuos/MqttTest.java @@ -3,12 +3,13 @@ package com.aiit.xiuos; import com.aiit.xiuos.mqtt.MyMqttClient; import com.alibaba.fastjson.JSONObject; +import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import java.util.Random; - +@Slf4j @SpringBootTest(classes = XiuosApplication.class,webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) public class MqttTest { @Autowired @@ -28,7 +29,7 @@ public class MqttTest { try { Thread.sleep(3000); } catch (InterruptedException e) { - e.printStackTrace(); + log.error(e.getMessage()); } } }