This commit is contained in:
龚祖望 2022-11-14 17:53:43 +08:00
commit 61a42596c4
26 changed files with 838 additions and 85 deletions

View File

@ -94,6 +94,12 @@
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<!-- 引入 redis 依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
<version>1.5.7.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
@ -102,6 +108,13 @@
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-mail</artifactId>
</dependency>
</dependencies>

View File

@ -1,4 +1,100 @@
package com.aiit.xiuos.Utils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.mail.SimpleMailMessage;
import org.springframework.mail.javamail.JavaMailSender;
import org.springframework.mail.javamail.MimeMessageHelper;
import org.springframework.stereotype.Component;
import javax.mail.MessagingException;
import javax.mail.internet.MimeMessage;
import java.io.File;
@Component
public class EmailUtil {
@Autowired
private JavaMailSender mailSender;
private String from ="419034340@qq.com";
/**
* 发送纯文本邮件信息
*
* @param to 接收方
* @param subject 邮件主题
* @param content 邮件内容发送内容
*/
public void sendMessage(String to, String subject, String content) {
// 创建一个邮件对象
SimpleMailMessage msg = new SimpleMailMessage();
msg.setFrom(from); // 设置发送发
msg.setTo(to); // 设置接收方
msg.setSubject(subject); // 设置邮件主题
msg.setText(content); // 设置邮件内容
// 发送邮件
mailSender.send(msg);
}
/**
* 发送带附件的邮件信息
*
* @param to 接收方
* @param subject 邮件主题
* @param content 邮件内容发送内容
* @param files 文件数组 // 可发送多个附件
*/
public void sendMessageCarryFiles(String to, String subject, String content, File[] files) {
MimeMessage mimeMessage = mailSender.createMimeMessage();
try {
MimeMessageHelper helper = new MimeMessageHelper(mimeMessage,true);
helper.setFrom(from); // 设置发送发
helper.setTo(to); // 设置接收方
helper.setSubject(subject); // 设置邮件主题
helper.setText(content); // 设置邮件内容
if (files != null && files.length > 0) { // 添加附件多个
for (File file : files) {
helper.addAttachment(file.getName(), file);
}
}
} catch (MessagingException e) {
e.printStackTrace();
}
// 发送邮件
mailSender.send(mimeMessage);
}
/**
* 发送带附件的邮件信息
*
* @param to 接收方
* @param subject 邮件主题
* @param content 邮件内容发送内容
* @param file 单个文件
*/
public void sendMessageCarryFile(String to, String subject, String content, File file) {
MimeMessage mimeMessage = mailSender.createMimeMessage();
try {
MimeMessageHelper helper = new MimeMessageHelper(mimeMessage,true);
helper.setFrom(from); // 设置发送发
helper.setTo(to); // 设置接收方
helper.setSubject(subject); // 设置邮件主题
helper.setText(content); // 设置邮件内容
helper.addAttachment(file.getName(), file); // 单个附件
} catch (MessagingException e) {
e.printStackTrace();
}
// 发送邮件
mailSender.send(mimeMessage);
}
public String getFrom() {
return from;
}
public void setFrom(String from) {
this.from = from;
}
}

View File

@ -1,6 +1,7 @@
package com.aiit.xiuos.Utils;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.CloseableHttpResponse;
@ -14,7 +15,7 @@ import org.apache.http.util.EntityUtils;
import java.io.IOException;
@Slf4j
public class HttpClientUtil {
private static String tokenString = "";
@ -24,7 +25,6 @@ public class HttpClientUtil {
/**
* 以get方式调用第三方接口
* @param url
* @param token
* @return
*/
public static JSONObject doGet(String url) {
@ -45,7 +45,7 @@ public class HttpClientUtil {
return (JSONObject) JSONObject.parse(res);
}
} catch (IOException e) {
e.printStackTrace();
log.error(e.getMessage());
}
return null;
}
@ -81,13 +81,13 @@ public class HttpClientUtil {
return res;
}
} catch (IOException e) {
e.printStackTrace();
log.error(e.getMessage());
} finally {
if (httpClient != null){
try {
httpClient.close();
} catch (IOException e) {
e.printStackTrace();
log.error(e.getMessage());
}
}
}
@ -123,7 +123,7 @@ public class HttpClientUtil {
token = result.getString("token");
}
} catch (IOException e) {
e.printStackTrace();
log.error(e.getMessage());
}
return token;
}
@ -144,7 +144,9 @@ public class HttpClientUtil {
System.out.println(response);
}
public static void main(String[] args) {
test("12345678910");
public static void main(String[] arg0){
}
}

View File

@ -2,6 +2,7 @@ package com.aiit.xiuos.Utils;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import javax.servlet.http.HttpServletRequest;
import java.io.*;
@ -12,7 +13,7 @@ import java.util.Date;
import java.util.List;
@Slf4j
public class MyUtils {
public static void main(String[] args) {
@ -62,9 +63,14 @@ public class MyUtils {
return time;
}
public static Date getDateTime() throws ParseException {
public static Date getDateTime() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Date date = sdf.parse(MyUtils.getTime());
Date date = null;
try {
date = sdf.parse(MyUtils.getTime());
} catch (ParseException e) {
log.error(e.getMessage());
}
return date;
}

View File

@ -45,6 +45,16 @@ public class AlarmInfoController {
return new ResultRespons(Constant.ERROR_CODE,"更新告警信息失败!");
}
@PostMapping("/add")
public ResultRespons addAlarmInfo(@RequestBody AlarmInfo alarmInfo){
int res =alarmInfoService.addAlarmInfo(alarmInfo);
if(res==1){
return new ResultRespons(Constant.SUCCESS_CODE,"新增告警信息成功!");
}
return new ResultRespons(Constant.ERROR_CODE,"新增告警信息失败!");
}
@GetMapping("/getAlarmLevelCount")
public ResultRespons getAlarmLevelCount(HttpServletRequest request){
UserInfo userInfo =(UserInfo) request.getSession().getAttribute("user");

View File

@ -63,12 +63,5 @@ public class AlarmRuleController {
return new ResultRespons(Constant.ERROR_CODE,"删除告警规则失败");
}
@GetMapping("/executeAlarmTask")
public void executeTask(){
try {
taskScheduled.ExecuteRule();
} catch (Exception e) {
e.printStackTrace();
}
}
}

View File

@ -1,4 +1,99 @@
package com.aiit.xiuos.controller;
import com.aiit.xiuos.Utils.*;
import com.aiit.xiuos.model.DataForwarding;
import com.aiit.xiuos.model.UserInfo;
import com.aiit.xiuos.service.DataForwardService;
import com.aiit.xiuos.tdengine.TDengineJDBCUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@RestController
@RequestMapping("/dataForward")
@Slf4j
public class DataForwardController {
@Autowired
DataForwardService dataForwardService;
@Autowired
EmailUtil emailUtil;
@PostMapping("/addRecord")
public ResultRespons addRecord(@RequestBody Map<String,Object> jsonMap) {
boolean flag =false;
List<String> dataList =(List<String>) jsonMap.get("datalist");
DataForwarding dataForwarding =new DataForwarding();
dataForwarding.setAddress((String)jsonMap.get("address"));
dataForwarding.setContent((String)jsonMap.get("content"));
dataForwarding.setOrg((String)jsonMap.get("org"));
dataForwarding.setSql((String)jsonMap.get("sql"));
dataForwarding.setTopic((String)jsonMap.get("topic"));
dataForwarding.setType((String)jsonMap.get("type"));
dataForwarding.setTime(MyUtils.getDateTime());
try {
BufferedWriter bufferedWriter = new BufferedWriter(
new OutputStreamWriter(new FileOutputStream("/home/aiitadmin/email/data.txt"), "UTF-8"));
for (String s : dataList) {
bufferedWriter.write(s);
bufferedWriter.newLine();
bufferedWriter.flush();
}
emailUtil.sendMessageCarryFile(dataForwarding.getAddress(),dataForwarding.getTopic(),dataForwarding.getContent(),new File("/home/aiitadmin/email/data.txt"));
flag =true;
} catch (FileNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}catch (Exception e){
e.printStackTrace();
log.info("邮件发送失败");
}
if(flag){
int res = dataForwardService.addRecord(dataForwarding);
if(res==1){
return new ResultRespons(Constant.SUCCESS_CODE,"邮件发送成功,记录已保存");
}else {
return new ResultRespons(Constant.ERROR_CODE, "邮件发送成功,记录保存失败");
}
}else {
return new ResultRespons(Constant.ERROR_CODE,"邮件发送失败");
}
}
@GetMapping("/executeSql")
public ResultRespons executeSql(@RequestParam String sql){
ArrayList resultData = null;
try {
resultData = TDengineJDBCUtil.executeSql(sql);
} catch (Exception e) {
e.printStackTrace();
}
if(resultData!=null){
return new ResultRespons(Constant.SUCCESS_CODE,"数据查询成功",resultData);
}
return new ResultRespons(Constant.ERROR_CODE,"数据查询失败,请检查查询语句");
}
@GetMapping("/getRecord")
public ResultRespons getRecord(HttpServletRequest request){
UserInfo userInfo =(UserInfo) request.getSession().getAttribute("user");
List<DataForwarding> lists = dataForwardService.selectRecord(userInfo.getOrg());
if(lists != null && lists.size()>=0){
return new ResultRespons(Constant.SUCCESS_CODE,"数据查询成功",lists);
}
return new ResultRespons(Constant.ERROR_CODE,"数据查询失败,请检查查询语句");
}
}

View File

@ -3,23 +3,25 @@ package com.aiit.xiuos.controller;
import com.aiit.xiuos.Utils.Constant;
import com.aiit.xiuos.Utils.HttpClientUtil;
import com.aiit.xiuos.Utils.ResultRespons;
import com.aiit.xiuos.dao.mappers.AvgDayDataMapper;
import com.aiit.xiuos.model.AvgDayData;
import com.aiit.xiuos.mqtt.MqttConfiguration;
import com.aiit.xiuos.mqtt.MyMqttClient;
import com.aiit.xiuos.service.AvgDayDataService;
import com.alibaba.fastjson.JSONObject;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.bind.annotation.*;
import javax.servlet.http.HttpServletRequest;
import java.util.List;
import java.util.Map;
@RestController
@RequestMapping("/data")
public class DeviceDataController {
@Autowired
AvgDayDataService avgDayDataService;
@Autowired
private MyMqttClient myMqttClient;
@GetMapping("/getAll")
public ResultRespons getAllDataFromDSD(HttpServletRequest request){
@ -38,4 +40,14 @@ public class DeviceDataController {
return new ResultRespons(Constant.ERROR_CODE,"查询数据失败");
}
@PostMapping("/publishData")
public ResultRespons publishData(@RequestBody Map<String, Object> jsonMap){
if(myMqttClient==null){
myMqttClient=new MqttConfiguration().getMqttPushClient();
}
myMqttClient.publish("xiuosiot/"+jsonMap.get("deviceno"), JSONObject.toJSONString(jsonMap));
return new ResultRespons(Constant.ERROR_CODE,"数据发送成功");
}
}

View File

@ -1,4 +1,55 @@
package com.aiit.xiuos.controller;
import com.aiit.xiuos.Utils.MyUtils;
import com.aiit.xiuos.scheduled.TaskScheduled;
import com.aiit.xiuos.service.impl.ProtocolServiceImpl;
import com.aiit.xiuos.socket.WebSocketServer;
import com.aiit.xiuos.tdengine.TDengineJDBCUtil;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
public class EecludeController {
import javax.servlet.http.HttpServletRequest;
import java.text.ParseException;
@RestController
@RequestMapping("/exclude")
@Slf4j
public class ExcludeController {
@Autowired
ProtocolServiceImpl protocolService;
@Autowired
TaskScheduled taskScheduled;
@Autowired
WebSocketServer webSocketServer;
@PostMapping("/addjxsd")
public void addJXSD(@RequestBody String jsonString, HttpServletRequest request) throws ParseException {
System.out.println("收到的jsonString"+jsonString);
JSONObject jsonObject = MyUtils.StringToJson(jsonString);
int valueNum = jsonObject.getInteger("valueNum");
String client =jsonObject.getString("org");
webSocketServer.sendToMessageById(client,jsonString);
String deviceId= jsonObject.getString("deviceId");
if(valueNum==72){
JSONArray jsonArray =jsonObject.getJSONArray("readItemList");
StringBuilder sql =new StringBuilder("insert into jxsd_1500 values(now");
for(int i=0;i<jsonArray.size();i++){
sql.append(",");
sql.append(jsonArray.getJSONObject(i).getInteger("value"));
}
sql.append(")");
TDengineJDBCUtil.executeInsertSql(sql.toString());
}
}
@GetMapping("/executeAlarmTask")
public void executeTask(){
try {
taskScheduled.ExecuteRule();
} catch (Exception e) {
log.error(e.getMessage());
}
}
}

View File

@ -1,7 +1,13 @@
package com.aiit.xiuos.dao.mappers;
import com.aiit.xiuos.model.DataForwarding;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import org.springframework.stereotype.Repository;
import java.util.List;
@Repository
public interface DataForwardingMapper {
int deleteByPrimaryKey(Integer id);
@ -14,4 +20,7 @@ public interface DataForwardingMapper {
int updateByPrimaryKeySelective(DataForwarding record);
int updateByPrimaryKey(DataForwarding record);
@Select("select * from data_forwarding where org = #{org}")
List<DataForwarding> selectAll(@Param("org") String org);
}

View File

@ -1,6 +1,8 @@
package com.aiit.xiuos.model;
import java.util.Date;
import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.Builder;
import lombok.Data;
@ -14,7 +16,7 @@ public class DataForwarding {
private String content;
private String type;
@JsonFormat(pattern ="yyyy-MM-dd HH:mm:ss",timezone = "GMT+8")
private Date time;
private String address;

View File

@ -0,0 +1,59 @@
package com.aiit.xiuos.mqtt;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
@Component
@ConfigurationProperties("mqtt")
@Setter
@Getter
@Slf4j
public class MqttConfiguration {
@Autowired
private MyMqttClient myMqttClient;
/**
* 用户名
*/
private String username; /**
* 密码
*/
private String password;
/**
* 连接地址
*/
private String hostUrl;
/**
* 客户Id
*/
private String clientId;
/**
* 默认连接话题
*/
private String defaultTopic;
/**
* 超时时间
*/
private int timeout;
/**
* 保持连接数
*/
private int keepalive;
@Bean
public MyMqttClient getMqttPushClient() {
myMqttClient.connect(hostUrl, clientId, username, password, timeout, keepalive);
// /#结尾表示订阅所有以test开头的主题
myMqttClient.subscribe(defaultTopic, 0);
log.info("订阅成功"+defaultTopic);
System.out.println("订阅成功"+defaultTopic);
return myMqttClient;
}
}

View File

@ -0,0 +1,121 @@
package com.aiit.xiuos.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.stereotype.Component;
/**
* @author wangtianyi
* @description mqtt客户端
*/
@Slf4j
@Component
public class MyMqttClient {
private static MqttClient client;
public static MqttClient getClient(){
return client;
}
public static void setClient(MqttClient client){
MyMqttClient.client=client;
}
/**
* 客户端连接
*
* @param host ip+端口
* @param clientID 客户端Id
* @param username 用户名
* @param password 密码
* @param timeout 超时时间
* @param keepalive 保留数
*/
public void connect(String host,String clientID,String username,String password,int timeout,int keepalive){
MqttClient client=null;
try {
client=new MqttClient(host,clientID,new MemoryPersistence());
MqttConnectOptions options=new MqttConnectOptions();
options.setCleanSession(true);
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setConnectionTimeout(timeout);
options.setKeepAliveInterval(keepalive);
options.setAutomaticReconnect(true);
MyMqttClient.setClient(client);
try {
client.setCallback(new PushCallback());
client.connect(options);
boolean complete = client.isConnected();
log.info("MQTT连接"+(complete?"成功":"失败"));
}catch (Exception e){
log.error(e.getMessage());
}
}catch (Exception e){
log.error(e.getMessage());
}
}
/**
* 发布默认qos为0非持久化
* @param topic
* @param pushMessage
*/
public void publish(String topic,String pushMessage){
publish(0,false,topic,pushMessage);
}
/**
* 发布
*
* @param qos 连接方式
* @param retained 是否保留
* @param topic 主题
* @param pushMessage 消息体
*/
public void publish(int qos,boolean retained,String topic,String pushMessage){
MqttMessage message=new MqttMessage();
message.setQos(qos);
message.setRetained(retained);
message.setPayload(pushMessage.getBytes());
MqttTopic mqttTopic= MyMqttClient.getClient().getTopic(topic);
if(null== mqttTopic){
log.error("topic not exist");
}
MqttDeliveryToken token;
try {
token=mqttTopic.publish(message);
token.waitForCompletion();
}catch (MqttPersistenceException e){
log.error(e.getMessage());
}catch (MqttException e){
log.error(e.getMessage());
}
}
/**
* 订阅某个主题qos默认为0
* @param topic
*/
public void subscribe(String topic){
log.error("开始订阅主题" + topic);
subscribe(topic,0);
}
public void subscribe(String topic,int qos){
try {
MyMqttClient.getClient().subscribe(topic,qos);
}catch (MqttException e){
log.error(e.getMessage());
}
}
}

View File

@ -0,0 +1,55 @@
package com.aiit.xiuos.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class PushCallback implements MqttCallbackExtended {
@Autowired
private MyMqttClient myMqttClient;
@Override
public void connectionLost(Throwable throwable) {
long reconnectTimes = 1;
while (true) {
try {
if (myMqttClient.getClient().isConnected()) {
log.info("mqtt reconnect success end");
return;
}
log.info("mqtt reconnect times = {} try again...", reconnectTimes++);
myMqttClient.getClient().reconnect();
} catch (MqttException e) {
log.error("", e);
}
try {
Thread.sleep(3000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
}
}
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("接收消息主题 : " + topic);
System.out.println("接收消息Qos : " + message.getQos());
System.out.println("接收消息内容 : " + new String(message.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
@Override
public void connectComplete(boolean b, String s) {
}
}

View File

@ -2,17 +2,13 @@ package com.aiit.xiuos.scheduled;
import com.aiit.xiuos.Utils.EmailUtil;
import com.aiit.xiuos.Utils.MyUtils;
import com.aiit.xiuos.Utils.TDengineJDBCUtil;
import com.aiit.xiuos.tdengine.TDengineJDBCUtil;
import com.aiit.xiuos.model.*;
import com.aiit.xiuos.service.*;
import com.aiit.xiuos.service.impl.AvgDayDataServiceImpl;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.checkerframework.checker.units.qual.A;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
@ -134,16 +130,10 @@ public class TaskScheduled {
//保存告警信息告警级别高的插入,低的因主键冲突可自动排除
try{
alarmInfoService.addAlarmInfo(alarmInfo);
}catch (Exception e){
e.printStackTrace();
}
//发送邮件
try{
emailUtil.sendMessage(alarmRule.getNoticeAddress(),"设备告警",alarmRule.getNoticeContent());
log.info("邮件发送成功");
}catch (Exception e) {
log.error("邮件发送失败");
e.printStackTrace();
}catch (Exception e){
log.info(e.getMessage());
}
}

View File

@ -1,4 +1,11 @@
package com.aiit.xiuos.service;
public interface DataForward {
import com.aiit.xiuos.model.DataForwarding;
import java.util.List;
public interface DataForwardService {
int addRecord(DataForwarding dataForwarding);
List<DataForwarding> selectRecord(String org);
}

View File

@ -14,5 +14,6 @@ public interface DeviceInfoService {
List<DeviceInfo> selectbyNo(String no);
List<Map<String,String>> getDeviceTypeCount(String org);
List<Map<String,String>> getDeviceRunStautsCount(String org);
String getTypeByName(String deviceNo);
}

View File

@ -1,4 +1,23 @@
package com.aiit.xiuos.service.impl;
public class DataForwardServiceImpl {
import com.aiit.xiuos.dao.mappers.DataForwardingMapper;
import com.aiit.xiuos.model.DataForwarding;
import com.aiit.xiuos.service.DataForwardService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
@Service
public class DataForwardServiceImpl implements DataForwardService {
@Autowired
DataForwardingMapper dataForwardingMapper;
@Override
public int addRecord(DataForwarding dataForwarding) {
return dataForwardingMapper.insertSelective(dataForwarding);
}
@Override
public List<DataForwarding> selectRecord(String org) {
return dataForwardingMapper.selectAll(org);
}
}

View File

@ -14,6 +14,7 @@ import java.util.Map;
public class DeviceInfoServiceImpl implements DeviceInfoService {
@Autowired
private DeviceInfoMapper deviceInfoMapper;
@Override
public int addDevice(DeviceInfo deviceInfo) {
return deviceInfoMapper.insert(deviceInfo);
@ -54,4 +55,13 @@ public class DeviceInfoServiceImpl implements DeviceInfoService {
public List<Map<String, String>> getDeviceRunStautsCount(String org) {
return deviceInfoMapper.getDeviceRunStatusCount(org);
}
@Override
public String getTypeByName(String deviceNo) {
List<String> types = deviceInfoMapper.selectTypeByNo(deviceNo);
if(types!=null && types.size()>0){
return types.get(0);
}
return null;
}
}

View File

@ -1,8 +1,6 @@
package com.aiit.xiuos.socket;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
@ -11,7 +9,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ServerEndpoint(value = "/{paramName}")
@ServerEndpoint(value = "/websocket/{clientId}")
@Component
@Slf4j
public class WebSocketServer {
@ -42,29 +40,22 @@ public class WebSocketServer {
/**
* 连接建立成功调用的方法*/
@OnOpen
public void onOpen(@PathParam(value = "paramName") String param, Session session) {
if(numofAccount.containsKey(param)){
Integer num=numofAccount.get(param);
String paramtmp=param;
param=param+num;
System.out.println(param);
numofAccount.put(paramtmp,num+1);
}else{
numofAccount.put(param,1);
}
public void onOpen(@PathParam(value = "clientId") String clientId, Session session) {
//接收到发送消息的人员编号
name = param;
name = clientId;
this.session = session;
/**加入set中*/
webSocketSet.put(param,this);
webSocketSet.put(clientId,this);
/**在线数加1*/
addOnlineCount();
System.out.println("有新连接"+param+"加入!当前在线人数为" + getOnlineCount());
//try {
//sendMessage("-连接已建立-");
//} catch (IOException e) {
// System.out.println("IO异常");
// }
System.out.println("有新连接"+clientId+"加入!当前在线人数为" + getOnlineCount());
log.info("有新连接"+clientId+"加入!当前在线人数为" + getOnlineCount());
try {
sendMessage(clientId+"-连接已建立-");
} catch (IOException e) {
System.out.println("IO异常");
}
}
/**
@ -78,6 +69,7 @@ public class WebSocketServer {
/** 在线数减1 */
subOnlineCount();
System.out.println("有一连接关闭!当前在线人数为" + getOnlineCount());
log.info("有一连接"+name+"关闭!当前在线人数为" + getOnlineCount());
}
}
@ -88,11 +80,11 @@ public class WebSocketServer {
@OnMessage
public void onMessage(String message, Session session) {
System.out.println("来自客户端的消息:" + message);
log.info("来自客户端的消息:" + message);
try {
this.sendMessage(message);
this.sendMessage("收到!");
} catch (IOException e) {
e.printStackTrace();
log.error(e.getMessage());
}
}
@ -122,9 +114,10 @@ public class WebSocketServer {
webSocketSet.get(id).sendMessage(message);
} else {
System.out.println("webSocketSet中没有此key不推送消息");
log.info("webSocketSet中没有此key不推送消息");
}
} catch (IOException e) {
e.printStackTrace();
log.error(e.getMessage());
}
}
@ -140,7 +133,7 @@ public class WebSocketServer {
value.sendMessage(message);
}
}catch(IOException e){
e.printStackTrace();
log.error(e.getMessage());
}
}

View File

@ -1,10 +1,8 @@
package com.aiit.xiuos.Utils;
package com.aiit.xiuos.tdengine;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.taosdata.jdbc.TSDBDriver;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import java.sql.*;
import java.util.ArrayList;
@ -13,20 +11,20 @@ import java.util.Properties;
public class TDengineJDBCUtil {
public static Connection getConn() throws Exception{
Class.forName("com.taosdata.jdbc.TSDBDriver");
String jdbcUrl = "jdbc:TAOS://xiuosiot.taosnode1:6030/xiuosiot?user=root&password=taosdata";
Properties connProps = new Properties();
connProps.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
connProps.setProperty(TSDBDriver.PROPERTY_KEY_LOCALE, "en_US.UTF-8");
connProps.setProperty(TSDBDriver.PROPERTY_KEY_TIME_ZONE, "UTC-8");
connProps.setProperty("debugFlag", "135");
connProps.setProperty("maxSQLLength", "1048576");
Connection conn = DriverManager.getConnection(jdbcUrl, connProps);
log.info(tdengineConfig.jdbcurl);
Connection conn = DriverManager.getConnection(tdengineConfig.jdbcurl, connProps);
return conn;
}
public static Connection getRestConn() throws Exception{
Class.forName("com.taosdata.jdbc.rs.RestfulDriver");
String jdbcUrl = "jdbc:TAOS-RS://10.0.30.23:6041/test?user=root&password=taosdata";
String jdbcUrl = "jdbc:TAOS-RS://taosnode1:6041/test?user=root&password=taosdata";
Properties connProps = new Properties();
connProps.setProperty(TSDBDriver.PROPERTY_KEY_BATCH_LOAD, "true");
Connection conn = DriverManager.getConnection(jdbcUrl, connProps);
@ -48,7 +46,7 @@ public class TDengineJDBCUtil {
stmt.executeUpdate(sql);
return true;
} catch (Exception e) {
e.printStackTrace();
log.error(e.getMessage());
}finally {
try {
if(connection!=null){
@ -65,11 +63,11 @@ public class TDengineJDBCUtil {
}
public static String changeType(String type){
if(type.equals("M168-LoRa-FM100")) return "M168type";
if(type.equals("RV400-NPU16T-5G-AR100")) return "RV400ARtype";
if(type.equals("RV400-NPU4T-5G-SR100")) return "RV400SRtype";
if(type.equals("M528-A800-5G-HM100")) return "M528type";
if(type.equals("RV400-4G-FR100")) return "RV400FRtype";
if(type.equals("M168-LoRa-FM100")) return "M168_LoRa_FM100";
if(type.equals("RV400-NPU16T-5G-AR100")) return "RV400_NPU16T_5G_AR100";
if(type.equals("RV400-NPU4T-5G-SR100")) return "RV400_NPU4T_5G_SR100";
if(type.equals("M528-A800-5G-HM100")) return "M528_A800_5G_HM100";
if(type.equals("RV400-4G-FR100")) return "RV400_4G_FR100";
return null;
}
@ -97,7 +95,7 @@ public class TDengineJDBCUtil {
} catch (SQLException e) {
System.out.println("ERROR Message: " + e.getMessage());
System.out.println("ERROR Code: " + e.getErrorCode());
e.printStackTrace();
log.error(e.getMessage());
}finally {
try {
if(connection!=null){
@ -115,12 +113,13 @@ public class TDengineJDBCUtil {
public static int getCount(String sql) throws Exception {
Connection connection = null;
int count =-1;
int count =0;
try {
connection = TDengineJDBCUtil.getConn();
Statement stmt = connection.createStatement();
ResultSet resultSet =stmt.executeQuery(sql);
log.info("tdengine executeQuery:"+sql);
ResultSet resultSet =stmt.executeQuery(sql);
ResultSetMetaData metaData = resultSet.getMetaData();
if (resultSet.next()) {
count = resultSet.getInt(1);
@ -129,7 +128,7 @@ public class TDengineJDBCUtil {
} catch (SQLException e) {
System.out.println("ERROR Message: " + e.getMessage());
System.out.println("ERROR Code: " + e.getErrorCode());
e.printStackTrace();
log.error(e.getMessage());
}finally {
try {
if(connection!=null){
@ -145,6 +144,31 @@ public class TDengineJDBCUtil {
}
public static void executeInsertSql(String sql) {
Connection connection = null;
try {
connection = TDengineJDBCUtil.getConn();
Statement stmt = connection.createStatement();
log.info("tdengine executeQuery:"+sql);
stmt.executeUpdate(sql);
} catch (SQLException e) {
System.out.println("ERROR Message: " + e.getMessage());
System.out.println("ERROR Code: " + e.getErrorCode());
log.error(e.getMessage());
} catch (Exception e) {
log.error(e.getMessage());
} finally {
try {
if(connection!=null){
connection.close();
}
} catch (SQLException throwables) {
throwables.printStackTrace();
}
}
}
}

View File

@ -0,0 +1,14 @@
package com.aiit.xiuos.tdengine;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Component
public class tdengineConfig {
public static String jdbcurl;
@Value("${tdengine.url}")
public void setJdbcurl(String jdbcurl){
tdengineConfig.jdbcurl =jdbcurl;
}
}

View File

@ -76,6 +76,9 @@ mqtt:
#默认主题
default-topic: xiuosiot/#
tdengine:
url: jdbc:TAOS://taosnode1:6030/xiuosiot?user=root&password=taosdata
logging:
file:
name: xiuosiot.log

View File

@ -32,7 +32,7 @@ spring:
debug: true
redis:
database: 0
database: 1
host: 115.238.53.59
port: 6379
password: abc123
@ -69,7 +69,8 @@ mqtt:
keepalive: 100
#默认主题
default-topic: xiuosiot/#
tdengine:
url: jdbc:TAOS://xiuosiot.taosnode1:6030/xiuosiot?user=root&password=taosdata
logging:
file:
name: xiuosiot.log

View File

@ -0,0 +1,130 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.aiit.xiuos.dao.mappers.DataForwardingMapper">
<resultMap id="BaseResultMap" type="com.aiit.xiuos.model.DataForwarding">
<constructor>
<idArg column="id" javaType="java.lang.Integer" jdbcType="INTEGER" />
<arg column="topic" javaType="java.lang.String" jdbcType="VARCHAR" />
<arg column="content" javaType="java.lang.String" jdbcType="VARCHAR" />
<arg column="type" javaType="java.lang.String" jdbcType="VARCHAR" />
<arg column="time" javaType="java.util.Date" jdbcType="TIMESTAMP" />
<arg column="address" javaType="java.lang.String" jdbcType="VARCHAR" />
<arg column="org" javaType="java.lang.String" jdbcType="VARCHAR" />
<arg column="sql" javaType="java.lang.String" jdbcType="VARCHAR" />
</constructor>
</resultMap>
<sql id="Base_Column_List">
id, topic, content, type, time, address, org, sql
</sql>
<select id="selectByPrimaryKey" parameterType="java.lang.Integer" resultMap="BaseResultMap">
select
<include refid="Base_Column_List" />
from data_forwarding
where id = #{id,jdbcType=INTEGER}
</select>
<delete id="deleteByPrimaryKey" parameterType="java.lang.Integer">
delete from data_forwarding
where id = #{id,jdbcType=INTEGER}
</delete>
<insert id="insert" parameterType="com.aiit.xiuos.model.DataForwarding">
insert into data_forwarding (id, topic, content,
type, time, address,
org, sql)
values (#{id,jdbcType=INTEGER}, #{topic,jdbcType=VARCHAR}, #{content,jdbcType=VARCHAR},
#{type,jdbcType=VARCHAR}, #{time,jdbcType=TIMESTAMP}, #{address,jdbcType=VARCHAR},
#{org,jdbcType=VARCHAR}, #{sql,jdbcType=VARCHAR})
</insert>
<insert id="insertSelective" parameterType="com.aiit.xiuos.model.DataForwarding">
insert into data_forwarding
<trim prefix="(" suffix=")" suffixOverrides=",">
<if test="id != null">
id,
</if>
<if test="topic != null">
topic,
</if>
<if test="content != null">
content,
</if>
<if test="type != null">
type,
</if>
<if test="time != null">
time,
</if>
<if test="address != null">
address,
</if>
<if test="org != null">
org,
</if>
<if test="sql != null">
sql,
</if>
</trim>
<trim prefix="values (" suffix=")" suffixOverrides=",">
<if test="id != null">
#{id,jdbcType=INTEGER},
</if>
<if test="topic != null">
#{topic,jdbcType=VARCHAR},
</if>
<if test="content != null">
#{content,jdbcType=VARCHAR},
</if>
<if test="type != null">
#{type,jdbcType=VARCHAR},
</if>
<if test="time != null">
#{time,jdbcType=TIMESTAMP},
</if>
<if test="address != null">
#{address,jdbcType=VARCHAR},
</if>
<if test="org != null">
#{org,jdbcType=VARCHAR},
</if>
<if test="sql != null">
#{sql,jdbcType=VARCHAR},
</if>
</trim>
</insert>
<update id="updateByPrimaryKeySelective" parameterType="com.aiit.xiuos.model.DataForwarding">
update data_forwarding
<set>
<if test="topic != null">
topic = #{topic,jdbcType=VARCHAR},
</if>
<if test="content != null">
content = #{content,jdbcType=VARCHAR},
</if>
<if test="type != null">
type = #{type,jdbcType=VARCHAR},
</if>
<if test="time != null">
time = #{time,jdbcType=TIMESTAMP},
</if>
<if test="address != null">
address = #{address,jdbcType=VARCHAR},
</if>
<if test="org != null">
org = #{org,jdbcType=VARCHAR},
</if>
<if test="sql != null">
sql = #{sql,jdbcType=VARCHAR},
</if>
</set>
where id = #{id,jdbcType=INTEGER}
</update>
<update id="updateByPrimaryKey" parameterType="com.aiit.xiuos.model.DataForwarding">
update data_forwarding
set topic = #{topic,jdbcType=VARCHAR},
content = #{content,jdbcType=VARCHAR},
type = #{type,jdbcType=VARCHAR},
time = #{time,jdbcType=TIMESTAMP},
address = #{address,jdbcType=VARCHAR},
org = #{org,jdbcType=VARCHAR},
sql = #{sql,jdbcType=VARCHAR}
where id = #{id,jdbcType=INTEGER}
</update>
</mapper>

View File

@ -0,0 +1,37 @@
package com.aiit.xiuos;
import com.aiit.xiuos.mqtt.MyMqttClient;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.Random;
@Slf4j
@SpringBootTest(classes = XiuosApplication.class,webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class MqttTest {
@Autowired
private MyMqttClient myMqttClient;
@Test
void pushlish() {
Random random =new Random();
for (int i = 0; i < 1; i++) {
JSONObject jsonObject =new JSONObject();
jsonObject.put("co2",19);
jsonObject.put("so2",19);
jsonObject.put("temperature",30);
jsonObject.put("humidity",28);
jsonObject.put("deviceno","a000001");
myMqttClient.publish("xiuosiot/A000001", jsonObject.toJSONString());
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
log.error(e.getMessage());
}
}
}
}