Implemented app to forward data from eth to 4g

This commit is contained in:
KirisameMashiro 2025-02-26 16:39:28 +08:00
parent 4d1ecbd6ec
commit abb007cd5b
9 changed files with 1473 additions and 17 deletions

View File

@ -0,0 +1 @@
/client_id.h

View File

@ -0,0 +1,19 @@
config USE_ADL_APP
bool "Use app for adl400 electrometer"
default n
if BSP_USING_ETH
config USE_ETH_4G_APP
bool "Use app for bridging ETH and 4G"
default n
endif
config USE_SENSOR_APP
bool "Use app for collecting sensor data from rs485"
default n
if USE_ETH_4G_APP || USE_SENSOR_APP
config CLIENT_ID
int "Client ID used in 4G dataframe"
default 0
endif

View File

@ -1,3 +1,15 @@
SRC_FILES := 4g_app.c ch32v208_adl400.c
SRC_FILES :=
ifeq ($(CONFIG_USE_ADL_APP),y)
SRC_FILES += ch32v208_adl400.c
endif
ifeq ($(CONFIG_USE_ETH_4G_APP),y)
SRC_FILES += ch32v208_eth_4g.c
endif
ifeq ($(CONFIG_USE_SENSOR_APP),y)
SRC_FILES += ch32v208_sensor.c
endif
include $(KERNEL_ROOT)/compiler.mk

View File

@ -0,0 +1,462 @@
/**
* @file ch32v208_eth_4g.c
* @brief Application that runs on ch32v208 board, receiving data from ETH and
* send it over 4G.
* @author Zhou Zhi
* @version 1.0
* @date 2025-02-24
*/
#include <ModuleConfig.h>
#include <adapter.h>
#include <stdbool.h>
#include <transform.h>
#define MAX_FRAME_SIZE 256 // 最大帧大小
#define MAX_BUFFER_SIZE 1024 * 2 // 最大缓冲区大小
#define RESEND_COUNT 3 // 最大帧重发次数
#define MAX_RECV_RETRY 3 // 最大帧重试接受次数
#define RECONNECT_COUNT 5 // 最大连接次数
#define WATING_RESPONSE_MS 5000 // 等待响应时间,单位为毫秒
// manually configured SENSOR_CLIENT_ID, as modifying `xsconfig.h` would cause
// massive recompilation
#if __has_include("client_id.h")
#include "client_id.h"
#ifndef CLIENT_ID_4G
#error You should manually define `CLIENT_ID_4G` in client_id.h, as this file overrides config `CLIENT_ID`
#endif
#else
#define CLIENT_ID_4G CLIENT_ID
#endif
#define STR_H(x) #x
#define STR(x) STR_H(x)
#pragma message "Current SENSOR_CLIENT_ID: " STR(CLIENT_ID_4G)
__attribute__((always_inline)) inline bool any_of(const char* s, size_t len, char c) {
for (size_t i = 0; i < len; ++i) {
if (s[i] == c) {
return true;
}
}
return false;
}
/**
* @brief
*/
typedef struct DataFrame {
uint16_t id;
uint16_t length;
char data[MAX_FRAME_SIZE]; // 上传服务器的数据帧字符串
} DataFrame;
/**
* @brief 使
*/
struct QueueBuffer {
struct DataFrame* buffer[MAX_BUFFER_SIZE / sizeof(struct DataFrame)]; // 循环队列存储空间,使用数组存储
int front; // 循环队列队头
int rear; // 循环队列队尾
pthread_mutex_t mutex; // 互斥访问循环队列信号量
sem_t full; // 循环队列中有效成员个数的信号量
};
#define BUFFER_ELEM_COUNT (MAX_BUFFER_SIZE / sizeof(DataFrame)) // 循环队列中可以容纳的最大成员个数
/**
* @brief
* @param pQueueBuffer
* @return * int 0
*/
static int initBuffer(struct QueueBuffer* pQueueBuffer) {
pQueueBuffer->front = 0;
pQueueBuffer->rear = 0;
if (PrivMutexCreate(&pQueueBuffer->mutex, 0) < 0) {
printf("buffer mutex create failed.\n");
return -1;
}
if (PrivSemaphoreCreate(&pQueueBuffer->full, 0, 0) < 0) {
printf("buffer full semaphore create failed.\n");
return -1;
}
return 0;
}
/**
* @brief
* @param pQueueBuffer
* @param pDataFrame
* @return int 0
*/
static int offerBuffer(struct QueueBuffer* pQueueBuffer, struct DataFrame* pDataFrame) {
/* 循环队列已满,将最旧的成员出队 */
if ((pQueueBuffer->rear + 1) % BUFFER_ELEM_COUNT == pQueueBuffer->front) {
struct DataFrame* frontDataFrame = pQueueBuffer->buffer[pQueueBuffer->front];
PrivFree(frontDataFrame);
pQueueBuffer->front = (pQueueBuffer->front + 1) % BUFFER_ELEM_COUNT;
}
/* 新成员入队 */
pQueueBuffer->buffer[pQueueBuffer->rear] = pDataFrame;
pQueueBuffer->rear = (pQueueBuffer->rear + 1) % BUFFER_ELEM_COUNT;
printf("front: %d\n", pQueueBuffer->front);
printf("rear: %d\n", pQueueBuffer->rear);
return 0;
}
/**
* @brief NULL
* @param pQueueBuffer
* @return struct DataFrame* NULL
*/
static struct DataFrame* pollBuffer(struct QueueBuffer* pQueueBuffer) {
/* 队列为空返回NULL */
if (pQueueBuffer->front == pQueueBuffer->rear) {
return NULL;
}
/* 最旧的成员出队 */
struct DataFrame* pFrontDataFrame = pQueueBuffer->buffer[pQueueBuffer->front];
pQueueBuffer->buffer[pQueueBuffer->front] = NULL;
pQueueBuffer->front = (pQueueBuffer->front + 1) % BUFFER_ELEM_COUNT;
printf("front: %d\n", pQueueBuffer->front);
printf("rear: %d\n", pQueueBuffer->rear);
return pFrontDataFrame;
}
/**
* @brief NULL
* @param pQueueBuffer
* @return struct DataFrame* NULL
*/
static struct DataFrame* peekBuffer(struct QueueBuffer* pQueueBuffer) {
/* 如果队列为空返回NULL */
if (pQueueBuffer->front == pQueueBuffer->rear) {
return NULL;
}
/* 返回队头元素,但不出队 */
return pQueueBuffer->buffer[pQueueBuffer->front];
}
/**
* @brief
* PrivRead函数
* @param fd
* @param buf
* @param len
* @return int
* 0WATING_RESPONSE_MS仍未读取到指定字节数-1
*/
static int privReadEnoughData(int fd, void* buf, size_t len) {
char* buffer = (char*)buf; // 将接收的存储空间指针强制转型
int gottenBytes = 0; // 已经读取到的字节数
int remainTime = WATING_RESPONSE_MS; // 剩余的时间
/* 只有接收的字节数不够,并且还有剩余时间,才可以继续读取 */
while (gottenBytes < len && remainTime > 0) {
int bytes = PrivRead(fd, buffer + gottenBytes, len - gottenBytes); // 从设备读取
if (bytes > 0) {
gottenBytes += bytes; // 读取到字节
} else if (bytes < 0) {
printf("Error reading from serial port\n");
return -1; // 读取错误
}
PrivTaskDelay(100); // 每100ms读取一次
remainTime -= 100; // 剩余时间减去100ms
}
/* 若没有剩余时间,表示还没有读取到指定的字节数,返回-1若有剩余时间表示已经读取了指定的字节数返回0
*/
return remainTime < 0 ? -1 : 0;
}
static int32_t recvFrameFromEth(struct Adapter* eth_adapter, char* buffer, size_t buffer_len) {
uint32_t payloadLen;
int32_t recvLen;
// assume that received length is of same endianness
recvLen = AdapterDeviceRecv(eth_adapter, &payloadLen, sizeof(payloadLen));
if (recvLen != sizeof(payloadLen)) {
printf("Wrong length received: %d\n", recvLen);
return -1;
}
if (payloadLen > buffer_len) {
printf("Invalid length: %d\n", payloadLen);
return -1;
}
int ret = privReadEnoughData(eth_adapter->fd, buffer, payloadLen);
if (ret != 0) {
printf("Receive %d bytes from eth failed\n", payloadLen);
return -1;
}
return payloadLen;
}
static int waitAndRecv(struct QueueBuffer* queue, struct Adapter* eth_adapter) {
char receiveBuffer[sizeof(DataFrame) + 1]; // 从服务器接收每帧响应的存储空间
int32_t payloadLen = recvFrameFromEth(eth_adapter, receiveBuffer, sizeof(receiveBuffer));
if (payloadLen < 0) {
return payloadLen;
}
struct DataFrame* pDataFrame = (struct DataFrame*)PrivMalloc(sizeof(struct DataFrame));
memset(pDataFrame, 0, sizeof(struct DataFrame));
pDataFrame->id = CLIENT_ID_4G;
pDataFrame->length = payloadLen;
memcpy(pDataFrame->data, receiveBuffer, payloadLen);
/* 将解析后的数据帧放入循环队列 */
PrivMutexObtain(&queue->mutex); // 获取互斥锁
offerBuffer(queue, pDataFrame); // 将数据帧放入队列
printf("Received data from eth, length: %d\n", payloadLen); // 打印接收到的数据帧ID
PrivMutexAbandon(&queue->mutex); // 释放互斥锁
PrivSemaphoreAbandon(&queue->full); // 释放信号量,即告知发送数据线程,队列中有新的数据帧
return 0;
}
/**
* @brief ETH 线
* @param arg
* @return void*
*/
static void* receiveDataFromEthTask(void* arg) {
char serverIpAddress[16] = {}; // 目的IP地址
char serverPort[6] = {}; // 目的端口号
struct QueueBuffer* pQueueBuffer = (struct QueueBuffer*)arg; // 循环队列指针
struct Adapter* adapter = AdapterDeviceFindByName(ADAPTER_ETHERNET_NAME); // 查找以太网模块适配器
AdapterDeviceSetUp(adapter); // 启动以太网主任务线程
AdapterDeviceSetDhcp(adapter, CFG->dhcpSwitch_Ethernet); // 启用或禁用DHCP
struct DataFrame* pDataFrame = NULL; // 数据帧定义
/* 尝试连接到服务器 */
sprintf(
serverIpAddress,
"%u.%u.%u.%u",
CFG->destinationIpAddress_Ethernet[0],
CFG->destinationIpAddress_Ethernet[1],
CFG->destinationIpAddress_Ethernet[2],
CFG->destinationIpAddress_Ethernet[3]
);
sprintf(serverPort, "%u", (unsigned short)CFG->destinationPort_Ethernet[0] | CFG->destinationPort_Ethernet[1] << 8);
printf("-*-*-*-*receiveDataFromEthTask*-*-*-*\n");
printf("serverIpAddress:\t%s\n", serverIpAddress);
printf("serverPort:\t\t%s\n", serverPort);
printf("-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*\n");
while (1) {
int reconnectCount = RECONNECT_COUNT; // 尝试重新连接服务器最多RECONNECT_COUNT次
while (reconnectCount > 0) {
int res = AdapterDeviceConnect(adapter, CLIENT, serverIpAddress, serverPort, IPV4); // 尝试连接服务器
if (res == 0 || res == 0x1D) {
break;
}
reconnectCount--;
}
if (reconnectCount <= 0) { // 若RECONNECT_COUNT次都连接失败则等待5s再次尝试连接
printf("Ethernet connect to server failed\n"); // 连接失败,打印错误信息
PrivTaskDelay(1000 * 5); // 延迟5秒避免网络拥塞
continue;
}
int recvFailureCnt = 0;
while (recvFailureCnt < MAX_RECV_RETRY) {
int ret = waitAndRecv(pQueueBuffer, adapter);
if (ret < 0) {
recvFailureCnt += 1;
}
}
printf("Max failure count of receiving from ethernet reached, reconnecting\n");
AdapterDeviceDisconnect(adapter, NULL);
PrivTaskDelay(1000 * 5);
}
return NULL;
}
static size_t waitAndSend(struct Adapter* adapter, struct QueueBuffer* pQueueBuffer) {
DataFrame* pDataFrame;
size_t res = 0;
char ack[4] = {0};
PrivSemaphoreObtainWait(&pQueueBuffer->full, NULL); // 尝试获取循环队列队头元素,如果获取信号量失败,则等待信号量
PrivMutexObtain(&pQueueBuffer->mutex); // 获取互斥锁
pDataFrame = pollBuffer(pQueueBuffer); // 从队列中获取数据帧
PrivMutexAbandon(&pQueueBuffer->mutex); // 释放互斥锁
// 定义数据帧重发次数
int resendCount = RESEND_COUNT;
// 只有数据帧非空并且还有剩余重发次数,才进行发送
while (pDataFrame != NULL && resendCount > 0) {
/* 向服务器发送数据 */
printf("Message id: %d, resend cnt %d\n", pDataFrame->id, resendCount);
// 发送不会报告错误
AdapterDeviceSend(adapter, pDataFrame,
sizeof(DataFrame)); // 发送数据注意当前最多发送256字节
memset(ack, 0, sizeof(ack));
// Recv 的实现使得读取会等待至 `len` 数量的字节可用, 这明显不对劲 <-
// won't fix
res = AdapterDeviceRecv(adapter, ack, 2);
printf("ack: %s, res: %zu\n", ack, res);
bool seems_like_ok_reply = any_of(ack, 2, 'O') || any_of(ack, 2, 'K');
// 服务器收到来自 client (通过 magic header 判断) 的东西之后必须回复
// ack
if (res != -1 && seems_like_ok_reply) {
// 发送成功, 别发了
printf("Successfully sent.\n");
break;
}
res = -1;
printf("Error sending frame, retry count left: %d\n", resendCount);
resendCount--;
}
if (pDataFrame != NULL) {
PrivFree(pDataFrame); // 释放数据帧内存
pDataFrame = NULL; // 避免野指针
}
// AdapterDeviceDisconnect(adapter, NULL); // 关闭适配器对应的设备
// 如果数据帧重发次数超过上限,表示发送失败,丢弃该帧
if (resendCount <= 0) {
PrivTaskDelay(1000 * 10); // 延迟10秒避免网络拥塞
}
return res;
}
/**
* @brief 4G向服务器发送数据的线程
* @param arg
* @return void*
*/
static void* sendDataToServerTask_4G(void* arg) {
char serverIpAddress[16] = {}; // 目的IP地址
char serverPort[6] = {}; // 目的端口号
struct QueueBuffer* pQueueBuffer = (struct QueueBuffer*)arg; // 循环队列指针
unsigned char receiveBuffer[256]; // 从服务器接收每帧响应的存储空间
struct Adapter* adapter = AdapterDeviceFindByName(ADAPTER_4G_NAME); // 查找4G模块适配器
printf("!!!!!!!! before open, adapter: %p\n", (void*)adapter);
AdapterDeviceOpen(adapter); // 打开适配器对应的设备(实际打开串口中断)
int baud_rate = BAUD_RATE_115200; // 波特率用于设置4G模块串口
printf("!!!!!!!! before devctl\n");
AdapterDeviceControl(adapter, OPE_INT, &baud_rate); // 对适配器对应设备进行配置(实际配置波特率)
printf("!!!!!!!! after devctl\n");
sprintf(
serverIpAddress,
"%u.%u.%u.%u",
CFG->destinationIpAddress_4G[0],
CFG->destinationIpAddress_4G[1],
CFG->destinationIpAddress_4G[2],
CFG->destinationIpAddress_4G[3]
);
sprintf(serverPort, "%u", (unsigned short)CFG->destinationPort_4G[0] | CFG->destinationPort_4G[1] << 8);
printf("-*-*-*-*sendDataToServerTask_4G*-*-*-*\n");
printf("serverIpAddress:\t%s\n", serverIpAddress);
printf("serverPort:\t\t%s\n", serverPort);
printf("-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*\n");
// 工作循环
while (1) {
// 连接服务器直到连接成功
int reconnectCount = RECONNECT_COUNT; // 尝试重新连接服务器最多RECONNECT_COUNT次
while (reconnectCount > 0) {
int res;
res = AdapterDeviceConnect(adapter, CLIENT, serverIpAddress, serverPort, IPV4);
printf("Successfully connected to server: %s:%s.\n", serverIpAddress, serverPort);
if (res == 0) {
break;
}
reconnectCount--;
}
if (reconnectCount <= 0) { // 若RECONNECT_COUNT次都连接失败则等待10s再次尝试连接
printf("4G connect to server failed\n"); // 连接失败,打印错误信息
PrivSemaphoreAbandon(&pQueueBuffer->full); // 释放信号量
PrivTaskDelay(1000 * 10); // 延迟10秒避免网络拥塞
continue;
}
// 等待并发送数据帧
while (1) {
size_t res;
res = waitAndSend(adapter, pQueueBuffer);
// 连续发送失败某一帧, 重新连接服务器
if (res == -1) {
AdapterDeviceDisconnect(adapter, NULL);
PrivTaskDelay(1000 * 5);
break;
}
}
}
return NULL;
}
/**
* @brief
* ADL400接收数据的线程以及上传数据到服务器的线程main方法中被调用
*/
void startUpTransformDataTask(void) {
/* 分配循环队列空间 */
struct QueueBuffer* pQueueBuffer = (struct QueueBuffer*)PrivCalloc(1, sizeof(struct QueueBuffer));
if (initBuffer(pQueueBuffer) < 0) {
PrivFree(pQueueBuffer);
return;
}
/* 启动从ADL400接收数据的线程 */
pthread_attr_t receiveDataFromEthTaskAttr;
pthread_args_t receiveDataFromEthTaskArgs;
receiveDataFromEthTaskAttr.schedparam.sched_priority = 16; // 线程优先级
receiveDataFromEthTaskAttr.stacksize = 2048; // 线程栈大小
receiveDataFromEthTaskArgs.pthread_name = "receiveDataFromEthTask"; // 线程名字
receiveDataFromEthTaskArgs.arg = pQueueBuffer; // 线程参数
pthread_t receiveDataThread; // 线程ID
PrivTaskCreate(
&receiveDataThread, &receiveDataFromEthTaskAttr, receiveDataFromEthTask, &receiveDataFromEthTaskArgs
);
PrivTaskStartup(&receiveDataThread);
/* 启动上传数据到服务器的线程 */
pthread_attr_t sendDataToServerTaskAttr;
pthread_args_t sendDataToServerTaskArgs;
sendDataToServerTaskAttr.schedparam.sched_priority = 16; // 线程优先级
sendDataToServerTaskAttr.stacksize = 2200; // 线程栈大小
sendDataToServerTaskArgs.pthread_name = "sendDataToServerTask"; // 线程名字
sendDataToServerTaskArgs.arg = pQueueBuffer; // 线程参数
pthread_t sendDataThread; // 线程ID
void* (*start_routine)(void*) = sendDataToServerTask_4G; // 通过4G模块上传到服务器
PrivTaskCreate(
&sendDataThread,
&sendDataToServerTaskAttr,
start_routine,
&sendDataToServerTaskArgs
); // 通过4G模块上传到服务器
PrivTaskStartup(&sendDataThread);
}

View File

@ -0,0 +1,956 @@
/**
* @file ch32v208_adl400.c
* @brief ch32v208 board gets data from Acrel-SENSOR electricity meter with
* rs485 bus, and then sends it to the server with 4G.
* @author Huo Yujia (huoyujia081@126.com)
* @version 1.0
* @date 2024-07-10
*/
#include <ModuleConfig.h>
#include <adapter.h>
#include <stdbool.h>
#include <transform.h>
#define MAX_FRAME_SIZE 128 // 最大帧大小
#define MAX_BUFFER_SIZE 1024 * 2 // 最大缓冲区大小
#define RECEIVE_DATA_INTERVAL_MS 1000 * 10 // SENSOR数据采集间隔时间单位为毫秒
#define RESEND_COUNT 3 // 最大帧重发次数
#define RECONNECT_COUNT 5 // 最大连接次数
#define WATING_RESPONSE_MS 5000 // 等待响应时间,单位为毫秒
#define SINGLE_DATA_RETRY 3 // 重读某一条 sensor 数据的最大次数
#define MAGIC_HEADER "XIJIUSS" // 魔法头, 用来在消息协议中快速标识自身
// manually configured SENSOR_CLIENT_ID, as modifying `xsconfig.h` would cause
// massive recompilation
#if __has_include("client_id.h")
#include "client_id.h"
#ifndef SENSOR_CLIENT_ID
#error You should manually define `SENSOR_CLIENT_ID` in client_id.h, as this file overrides config `CLIENT_ID`
#endif
#else
#define SENSOR_CLIENT_ID CLIENT_ID
#endif
#define STR_H(x) #x
#define STR(x) STR_H(x)
#pragma message "Current SENSOR_CLIENT_ID: " STR(SENSOR_CLIENT_ID)
#ifdef BSP_USING_WDT
// see main.c
static const uint8_t RECV_TASK = 0x1;
static const uint8_t SEND_TASK = 0x2;
#endif
void SensorTaskNotifyAlive(uint8_t task_idx) {
#ifdef BSP_USING_WDT
extern void SetTaskStatus(uint8_t task_idx);
SetTaskStatus(task_idx);
#endif
}
void RecvAlive() { SensorTaskNotifyAlive(RECV_TASK); }
void SendAlive() { SensorTaskNotifyAlive(SEND_TASK); }
static uint8_t channel_O3_data_instruction[] = {
0x01, 0x03, 0x01, 0x55, 0x00, 0x01, 0x95, 0xE6
};
static uint8_t channel_CO2_data_instruction[] = {
0x01, 0x03, 0x01, 0xAB, 0x00, 0x01, 0xF4, 0x16
};
static uint8_t channel_H2S_data_instruction[] = {
0x01, 0x03, 0x02, 0x01, 0x00, 0x01, 0xD4, 0x72
};
static uint8_t channel_CL2_data_instruction[] = {
0x01, 0x03, 0x02, 0x57, 0x00, 0x01, 0x34, 0x62
};
static uint8_t channel_NH3_data_instruction[] = {
0x01, 0x03, 0x02, 0xAD, 0x00, 0x01, 0x14, 0x53
};
static uint8_t channel_TVOC_data_instruction[] = {
0x01, 0x03, 0x03, 0x03, 0x00, 0x01, 0x74, 0x4E
};
static uint8_t channel_CH2O_data_instruction[] = {
0x01, 0x03, 0x03, 0x59, 0x00, 0x01, 0x54, 0x5D
};
static uint8_t channel_C2H5OH_data_instruction[] = {
0x01, 0x03, 0x03, 0xAF, 0x00, 0x01, 0xB4, 0x6F
};
static uint8_t channel_CH4_data_instruction[] = {
0x01, 0x03, 0x04, 0x05, 0x00, 0x01, 0x95, 0x3B
};
static uint8_t channel_O2_data_instruction[] = {
0x01, 0x03, 0x04, 0x5B, 0x00, 0x01, 0xF4, 0xE9
};
static uint8_t channel_AQS_data_instruction[] = {
0x01, 0x03, 0x04, 0xB1, 0x00, 0x01, 0xD5, 0x1D
};
static uint8_t channel_humidity_data_instruction[] = {
0x01, 0x03, 0x0C, 0x6B, 0x00, 0x01, 0xF6, 0x86
};
static uint8_t channel_temperature_data_instruction[] = {
0x01, 0x03, 0x0C, 0xC1, 0x00, 0x01, 0xD6, 0xA6
};
static uint8_t channel_PM1_data_instruction[] = {
0x01, 0x03, 0x0E, 0x6F, 0x00, 0x01, 0xB6, 0xFF
};
static uint8_t channel_PM2_5_data_instruction[] = {
0x01, 0x03, 0x0E, 0x19, 0x00, 0x01, 0x57, 0x25
};
static uint8_t channel_PM10_data_instruction[] = {
0x01, 0x03, 0x0D, 0xC3, 0x00, 0x01, 0x76, 0x9A
};
static uint8_t channel_windspeed_data_instruction[] = {
0x01, 0x03, 0x0D, 0x6D, 0x00, 0x01, 0x17, 0x7B
};
static uint8_t channel_winddirection_data_instruction[] = {
0x01, 0x03, 0x0D, 0x17, 0x00, 0x01, 0x36, 0xA2
};
static uint8_t channel_pressure_data_instruction[] = {
0x01, 0x03, 0x0C, 0x15, 0x00, 0x01, 0x96, 0x9E
};
static uint8_t channel_voice_data_instruction[] = {
0x01, 0x03, 0x0B, 0xBF, 0x00, 0x01, 0xB7, 0xCA
};
typedef struct SensorFields {
uint32_t o3;
uint32_t co2;
uint32_t h2s;
uint32_t cl2;
uint32_t nh3;
uint32_t tvoc;
uint32_t ch2o;
uint32_t c2h5oh;
uint32_t ch4;
uint32_t o2;
uint32_t aqs;
uint32_t humidity;
uint32_t temperature;
uint32_t pm1d0;
uint32_t pm2d5;
uint32_t pm10;
uint32_t windspeed;
uint32_t winddirection;
uint32_t airpressure;
uint32_t voice;
} SensorFields;
/**
* @brief
*/
typedef struct Message {
// magic header for fast validation
char magic_header[8];
// id of sensor cluster
uint16_t sensor_id;
uint8_t _padding[2];
union {
SensorFields fields;
uint32_t data[sizeof(SensorFields) / sizeof(uint32_t)];
};
} Message;
/**
* @brief Modbus RTU响应数据帧的缓存使
*/
struct QueueBuffer {
Message* buffer
[MAX_BUFFER_SIZE / sizeof(Message)]; // 循环队列存储空间,使用数组存储
int front; // 循环队列队头
int rear; // 循环队列队尾
pthread_mutex_t mutex; // 互斥访问循环队列信号量
sem_t full; // 循环队列中有效成员个数的信号量
};
#define BUFFER_ELEM_COUNT \
(MAX_BUFFER_SIZE / sizeof(Message)) // 循环队列中可以容纳的最大成员个数
__attribute__((always_inline)) inline bool
any_of(const char* s, size_t len, char c) {
for (size_t i = 0; i < len; ++i) {
if (s[i] == c) {
return true;
}
}
return false;
}
/**
* @brief
* @param pQueueBuffer
* @return * int 0
*/
static int initBuffer(struct QueueBuffer* pQueueBuffer) {
pQueueBuffer->front = 0;
pQueueBuffer->rear = 0;
if (PrivMutexCreate(&pQueueBuffer->mutex, 0) < 0) {
printf("buffer mutex create failed.\n");
return -1;
}
if (PrivSemaphoreCreate(&pQueueBuffer->full, 0, 0) < 0) {
printf("buffer full semaphore create failed.\n");
return -1;
}
return 0;
}
/**
* @brief
* @param pQueueBuffer
* @param pMsg SENSOR响应数据帧
* @return int 0
*/
static int offerBuffer(struct QueueBuffer* pQueueBuffer, Message* pMsg) {
/* 循环队列已满,将最旧的成员出队 */
if ((pQueueBuffer->rear + 1) % BUFFER_ELEM_COUNT == pQueueBuffer->front) {
Message* frontDataFrame = pQueueBuffer->buffer[pQueueBuffer->front];
PrivFree(frontDataFrame);
pQueueBuffer->front = (pQueueBuffer->front + 1) % BUFFER_ELEM_COUNT;
}
/* 新成员入队 */
pQueueBuffer->buffer[pQueueBuffer->rear] = pMsg;
pQueueBuffer->rear = (pQueueBuffer->rear + 1) % BUFFER_ELEM_COUNT;
printf("front: %d\n", pQueueBuffer->front);
printf("rear: %d\n", pQueueBuffer->rear);
return 0;
}
/**
* @brief NULL
* @param pQueueBuffer
* @return struct DataFrame* NULL
*/
static Message* pollBuffer(struct QueueBuffer* pQueueBuffer) {
/* 队列为空返回NULL */
if (pQueueBuffer->front == pQueueBuffer->rear) {
return NULL;
}
/* 最旧的成员出队 */
Message* pFrontDataFrame = pQueueBuffer->buffer[pQueueBuffer->front];
pQueueBuffer->buffer[pQueueBuffer->front] = NULL;
pQueueBuffer->front = (pQueueBuffer->front + 1) % BUFFER_ELEM_COUNT;
printf("front: %d\n", pQueueBuffer->front);
printf("rear: %d\n", pQueueBuffer->rear);
return pFrontDataFrame;
}
/**
* @brief NULL
* @param pQueueBuffer
* @return struct DataFrame* NULL
*/
static Message* peekBuffer(struct QueueBuffer* pQueueBuffer) {
/* 如果队列为空返回NULL */
if (pQueueBuffer->front == pQueueBuffer->rear) {
return NULL;
}
/* 返回队头元素,但不出队 */
return pQueueBuffer->buffer[pQueueBuffer->front];
}
/**
* @brief
* PrivRead函数
* @param fd
* @param buf
* @param len
* @return int
* 0WATING_RESPONSE_MS仍未读取到指定字节数-1
*/
static int privReadEnoughData(int fd, int32_t* sensor_data, size_t len) {
uint8_t buffer[8]; // 将接收的存储空间指针强制转型
int gottenBytes = 0; // 已经读取到的字节数
int remainTime = WATING_RESPONSE_MS; // 剩余的时间
/* 只有接收的字节数不够,并且还有剩余时间,才可以继续读取 */
while (gottenBytes < len && remainTime > 0) {
int bytes =
PrivRead(fd, buffer + gottenBytes, len - gottenBytes); // 从设备读取
if ((0x01 != buffer[0]) || (0x03 != buffer[1])) {
printf("wrong head 0x%x 0x%x\n", buffer[0], buffer[1]);
gottenBytes = 0;
return -1;
}
gottenBytes += bytes;
PrivTaskDelay(100); // 每100ms读取一次
remainTime -= 100; // 剩余时间减去100ms
}
printf("0x%X ", buffer[3]);
printf("0x%X\n", buffer[4]);
*sensor_data = ((buffer[3] << 8) & 0xFF00) | (buffer[4] & 0xFF);
/* 若没有剩余时间,表示还没有读取到指定的字节数,返回-1若有剩余时间表示已经读取了指定的字节数返回0
*/
printf("0x%08X\n", *sensor_data);
return remainTime < 0 ? -1 : 0;
}
__attribute__((always_inline)) inline static int sensorReadSingleData(
uint8_t* buffer,
int fd,
const uint8_t* instr,
size_t instr_len,
const char* name
) {
int32_t sensor_data = 0;
printf("########SENSOR_QUANTITY_%s########\n", name);
PrivWrite(fd, instr, instr_len);
PrivTaskDelay(20);
if (privReadEnoughData(fd, &sensor_data, 7)) {
// 读取超时,打印错误信息
printf("read data from sensor %s error\n", name);
return -1; // 读取失败,退出循环
}
/* drain the rs485 device buffer before requesting new data */
char dummy[8];
while (PrivRead(fd, dummy, sizeof(dummy)))
;
PrivTaskDelay(30);
memcpy(buffer, &sensor_data, sizeof(sensor_data));
return 0;
}
int sensorIntegrationReadData(int fd, uint8_t* read_data, size_t* data_len) {
size_t current_len = 0;
#define name2instr(name) channel_##name##_data_instruction
#define try_fetch_single(rest_retry, name) \
do { /* 'read_single */ \
uint8_t* cpos = read_data + current_len; \
int ret = sensorReadSingleData( \
cpos, fd, name2instr(name), sizeof(name2instr(name)), #name \
); \
if (ret != 0) { \
if (rest_retry > 0) { \
/* retry this data on fail */ \
rest_retry--; \
PrivTaskDelay(100); \
continue /* 'read_single */; \
} else { \
printf("Failed to retrieve %s, max attemp reached\n", #name); \
return -1; \
} \
} /* fall through when succeed */ \
current_len += sizeof(int32_t); \
break; \
} while (1)
#define fetch_single(name) \
do { \
int rest_retry = SINGLE_DATA_RETRY; \
try_fetch_single(rest_retry, name); \
} while (0)
#define x_sensors \
X(O3); \
X(CO2); \
X(H2S); \
X(CL2); \
X(NH3); \
X(TVOC); \
X(CH2O); \
X(C2H5OH); \
X(CH4); \
X(O2); \
X(AQS); \
X(humidity); \
X(temperature); \
X(PM1); \
X(PM2_5); \
X(PM10); \
X(windspeed); \
X(winddirection); \
X(pressure); \
X(voice);
#define X fetch_single
x_sensors;
#undef X
*data_len = current_len;
#undef x_sensors
#undef fetch_single
#undef try_fetch_single
#undef name2instr
return 0;
}
/**
* @brief sensor接收数据的线程
* @param arg
* @return void*
*/
static void* receiveDataFromSENSORTask(void* arg) {
RecvAlive();
struct QueueBuffer* pQueueBuffer = (struct QueueBuffer*)arg; // 循环队列指针
int fd = PrivOpen("/dev/rs485_dev1", O_RDWR); // 打开设备文件
if (fd < 0) { // 打开设备文件失败,打印错误信息
printf("open rs485 fd error: %d\n", fd);
return NULL;
}
struct SerialDataCfg rs485Configuration;
memset(&rs485Configuration, 0, sizeof(struct SerialDataCfg));
/* 读取RS485配置信息 */
PrivMutexObtain(&romConfigurationMutex
); // 若其他线程正在读取或者写入CFG则阻塞等待
int baudRatesOption = CFG->baudRate_Rs485;
int dataBitsOption = CFG->dataBits_Rs485;
int stopBitsOption = CFG->stopBits_Rs485;
int parityOption = CFG->parity_Rs485;
PrivMutexAbandon(&romConfigurationMutex); // 释放互斥锁
switch (baudRatesOption) {
case 1:
rs485Configuration.serial_baud_rate = BAUD_RATE_2400;
break;
case 2:
rs485Configuration.serial_baud_rate = BAUD_RATE_4800;
break;
case 3:
rs485Configuration.serial_baud_rate = BAUD_RATE_9600;
break;
case 4:
rs485Configuration.serial_baud_rate = BAUD_RATE_19200;
break;
case 5:
rs485Configuration.serial_baud_rate = BAUD_RATE_38400;
break;
case 6:
rs485Configuration.serial_baud_rate = BAUD_RATE_57600;
break;
case 7:
rs485Configuration.serial_baud_rate = BAUD_RATE_115200;
break;
case 8:
rs485Configuration.serial_baud_rate = BAUD_RATE_230400;
break;
default:
rs485Configuration.serial_baud_rate = BAUD_RATE_9600;
break;
}
switch (dataBitsOption) {
case 1:
rs485Configuration.serial_data_bits = DATA_BITS_8;
break;
case 2:
rs485Configuration.serial_data_bits = DATA_BITS_9;
break;
default:
rs485Configuration.serial_data_bits = DATA_BITS_8;
break;
}
switch (stopBitsOption) {
case 1:
rs485Configuration.serial_stop_bits = STOP_BITS_1;
break;
case 2:
rs485Configuration.serial_stop_bits = STOP_BITS_2;
break;
default:
rs485Configuration.serial_stop_bits = STOP_BITS_1;
break;
}
switch (parityOption) {
case 1:
rs485Configuration.serial_parity_mode = PARITY_NONE;
break;
case 2:
rs485Configuration.serial_parity_mode = PARITY_ODD;
break;
case 3:
rs485Configuration.serial_parity_mode = PARITY_EVEN;
break;
}
struct PrivIoctlCfg ioctl_cfg;
ioctl_cfg.ioctl_driver_type = SERIAL_TYPE;
ioctl_cfg.args = (void*)&rs485Configuration;
if (0 != PrivIoctl(fd, OPE_INT, &ioctl_cfg)) {
printf("ioctl uart fd error %d\n", fd);
PrivClose(fd);
return NULL;
}
Message* msg = NULL;
while (1) {
if (msg == NULL) {
msg = (Message*)PrivMalloc(sizeof(Message));
memset(msg, 0, sizeof(*msg));
memcpy(&msg->magic_header, MAGIC_HEADER, sizeof(MAGIC_HEADER));
msg->sensor_id = SENSOR_CLIENT_ID;
}
size_t data_len = 0;
if (sensorIntegrationReadData(fd, (uint8_t*)msg->data, &data_len)) {
// at least it tried, and was not stuck on anything
RecvAlive();
PrivTaskDelay(100);
continue;
}
// 测试打印数据
for (int i = 0; i < 20; ++i) {
printf("0x%08X ", msg->data[i]);
}
printf("\n");
// keep alive if actual data is obtained
RecvAlive();
/* 将解析后的数据帧放入循环队列 */
PrivMutexObtain(&pQueueBuffer->mutex); // 获取互斥锁
offerBuffer(pQueueBuffer, msg); // 将数据帧放入队列
msg = NULL; // 在下个循环时创建新的数据帧
// printf("receive data from SENSOR, id: %s\n", pDataFrame->id); //
// 打印接收到的数据帧ID
PrivMutexAbandon(&pQueueBuffer->mutex); // 释放互斥锁
PrivSemaphoreAbandon(&pQueueBuffer->full
); // 释放信号量,即告知发送数据线程,队列中有新的数据帧
PrivTaskDelay(RECEIVE_DATA_INTERVAL_MS); // 延迟一段时间再读取下一帧数据
// keep alive for this round of reading
RecvAlive();
}
PrivClose(fd); // 关闭设备文件
return NULL;
}
static size_t
waitAndSend(struct Adapter* adapter, struct QueueBuffer* pQueueBuffer) {
Message* pDataFrame;
size_t res = 0;
char ack[4] = {0};
PrivSemaphoreObtainWait(
&pQueueBuffer->full, NULL
); // 尝试获取循环队列队头元素,如果获取信号量失败,则等待信号量
PrivMutexObtain(&pQueueBuffer->mutex); // 获取互斥锁
pDataFrame = pollBuffer(pQueueBuffer); // 从队列中获取数据帧
PrivMutexAbandon(&pQueueBuffer->mutex); // 释放互斥锁
// 定义数据帧重发次数
int resendCount = RESEND_COUNT;
// 只有数据帧非空并且还有剩余重发次数,才进行发送
while (pDataFrame != NULL && resendCount > 0) {
/* 向服务器发送数据 */
printf(
"Message id: %d, resend cnt %d\n",
pDataFrame->sensor_id,
resendCount
);
if (CFG->mqttSwitch_4G == 1) { // MQTT模式下无需服务器响应数据
AdapterDeviceMqttSend(
adapter,
(char*)CFG->mqttTopic_4G,
pDataFrame,
sizeof(Message)
); // 发送数据注意当前最多发送256字节
break;
} else {
// 发送不会报告错误
AdapterDeviceSend(
adapter,
pDataFrame,
sizeof(Message)
); // 发送数据注意当前最多发送256字节
memset(ack, 0, sizeof(ack));
// Recv 的实现使得读取会等待至 `len` 数量的字节可用, 这明显不对劲 <-
// won't fix
res = AdapterDeviceRecv(adapter, ack, 2);
printf("ack: %s, res: %zu\n", ack, res);
bool seems_like_ok_reply =
any_of(ack, 2, 'O') || any_of(ack, 2, 'K');
// // 只要服务器回了东西就算发送成功
// // 端侧不感知是否发送成功, 看最后数据入库结果在前端做通知
// 服务器收到来自 client (通过 magic header 判断) 的东西之后必须回复
// ack
if (res != -1 && seems_like_ok_reply) {
// 发送成功, 别发了
printf("Successfully sent.\n");
break;
}
res = -1;
printf("Error sending frame, retry count left: %d\n", resendCount);
/* 从服务器接收响应约定服务器接收完数据帧后返回数据帧中的前12个字节即数据帧id
*/
/* 多读取2字节是为了防止前面还有命令模式返回的剩余的\r\n影响判断
*/
// memset(receiveBuffer, 0, sizeof(receiveBuffer));
// int receiveLength = AdapterDeviceRecv(adapter, receiveBuffer,
// strlen(pDataFrame->id) + 2); if (receiveLength ==
// strlen(pDataFrame->id) + 2 || receiveLength ==
// strlen(pDataFrame->id)) {
// // 打印服务器响应
// printf("receiveLength: %d\n", receiveLength);
// printf("receiveBuffer: ");
// for (int i = 0; i < receiveLength; i++) {
// printf("%c", receiveBuffer[i]);
// }
// printf("\n");
// //比较服务器响应的内容与发送的数据帧id是否一致
// if (strstr(receiveBuffer, pDataFrame->id) != NULL) {
// break; // 接收成功,退出循环
// }
// } else {
// printf("receiveLength: %d\n", receiveLength);
// printf("receiveBuffer: ");
// for (int i = 0; i < receiveLength; i++) {
// printf("%d ", receiveBuffer[i]);
// }
// printf("\n");
// }
}
resendCount--;
}
if (pDataFrame != NULL) {
PrivFree(pDataFrame); // 释放数据帧内存
pDataFrame = NULL; // 避免野指针
}
// AdapterDeviceDisconnect(adapter, NULL); // 关闭适配器对应的设备
// 如果数据帧重发次数超过上限,表示发送失败,丢弃该帧
if (resendCount <= 0) {
PrivTaskDelay(1000 * 10); // 延迟10秒避免网络拥塞
}
return res;
}
/**
* @brief 4G向服务器发送数据的线程
* @param arg
* @return void*
*/
static void* sendDataToServerTask_4G(void* arg) {
SendAlive();
char serverIpAddress[16] = {}; // 目的IP地址
char serverPort[6] = {}; // 目的端口号
struct QueueBuffer* pQueueBuffer = (struct QueueBuffer*)arg; // 循环队列指针
struct Adapter* adapter =
AdapterDeviceFindByName(ADAPTER_4G_NAME); // 查找4G模块适配器
printf("!!!!!!!! before open, adapter: %p\n", (void*)adapter);
AdapterDeviceOpen(adapter); // 打开适配器对应的设备(实际打开串口中断)
int baud_rate = BAUD_RATE_115200; // 波特率用于设置4G模块串口
printf("!!!!!!!! before devctl\n");
AdapterDeviceControl(
adapter, OPE_INT, &baud_rate
); // 对适配器对应设备进行配置(实际配置波特率)
printf("!!!!!!!! after devctl\n");
sprintf(
serverIpAddress,
"%u.%u.%u.%u",
CFG->destinationIpAddress_4G[0],
CFG->destinationIpAddress_4G[1],
CFG->destinationIpAddress_4G[2],
CFG->destinationIpAddress_4G[3]
);
sprintf(
serverPort,
"%u",
(unsigned short)CFG->destinationPort_4G[0] | CFG->destinationPort_4G[1]
<< 8
);
printf("-*-*-*-*sendDataToServerTask_4G*-*-*-*\n");
printf("serverIpAddress:\t%s\n", serverIpAddress);
printf("serverPort:\t\t%s\n", serverPort);
printf("-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*\n");
// 工作循环
while (1) {
// 连接服务器直到连接成功
int reconnectCount =
RECONNECT_COUNT; // 尝试重新连接服务器最多RECONNECT_COUNT次
while (reconnectCount > 0) {
int res;
res = AdapterDeviceConnect(
adapter, CLIENT, serverIpAddress, serverPort, IPV4
);
printf(
"Successfully connected to server: %s:%s.\n",
serverIpAddress,
serverPort
);
if (res == 0) {
break;
}
reconnectCount--;
}
if (reconnectCount <=
0) { // 若RECONNECT_COUNT次都连接失败则等待10s再次尝试连接
printf("4G connect to server failed\n"); // 连接失败,打印错误信息
PrivSemaphoreAbandon(&pQueueBuffer->full); // 释放信号量
PrivTaskDelay(1000 * 10); // 延迟10秒避免网络拥塞
continue;
}
// successfully connected
SendAlive();
// 等待并发送数据帧
while (1) {
size_t res;
res = waitAndSend(adapter, pQueueBuffer);
// 连续发送失败某一帧, 重新连接服务器
if (res == -1) {
AdapterDeviceDisconnect(adapter, NULL);
PrivTaskDelay(1000 * 5);
break;
} else {
SendAlive();
}
}
}
return NULL;
}
/**
* @brief 线
* @param arg
* @return void*
*/
static void* sendDataToServerTask_Ethernet(void* arg) {
char serverIpAddress[16] = {}; // 目的IP地址
char serverPort[6] = {}; // 目的端口号
struct QueueBuffer* pQueueBuffer = (struct QueueBuffer*)arg; // 循环队列指针
unsigned char receiveBuffer[256]; // 从服务器接收每帧响应的存储空间
struct Adapter* adapter =
AdapterDeviceFindByName(ADAPTER_ETHERNET_NAME); // 查找以太网模块适配器
#ifndef BSP_BLE_CONFIG // 如果没有使能蓝牙配置功能
AdapterDeviceSetUp(adapter); // 启动以太网主任务线程
AdapterDeviceSetDhcp(adapter, CFG->dhcpSwitch_Ethernet); // 启用或禁用DHCP
#endif
Message* pDataFrame = NULL; // 数据帧定义
while (1) {
PrivSemaphoreObtainWait(
&pQueueBuffer->full, NULL
); // 尝试获取循环队列队头元素,如果获取信号量失败,则等待信号量
#ifdef BSP_BLE_CONFIG // 使能蓝牙配置功能
/* 获取互斥锁 */
PrivMutexObtain(&adapter->lock
); // 若其他线程正在使用adapter则阻塞等待
PrivMutexObtain(&romConfigurationMutex
); // 若其他线程正在读取或者写入CFG则阻塞等待;
/* 尝试连接服务器 */
sprintf(
serverIpAddress,
"%u.%u.%u.%u",
CFG->destinationIpAddress_Ethernet[0],
CFG->destinationIpAddress_Ethernet[1],
CFG->destinationIpAddress_Ethernet[2],
CFG->destinationIpAddress_Ethernet[3]
);
sprintf(
serverPort,
"%u",
(unsigned short)CFG->destinationPort_Ethernet[0] |
CFG->destinationPort_Ethernet[1] << 8
);
printf("-*-*-*-*sendDataToServerTask_Ethernet*-*-*-*\n");
printf("serverIpAddress:\t%s\n", serverIpAddress);
printf("serverPort:\t\t%s\n", serverPort);
printf("-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*\n");
int res = AdapterDeviceConnect(
adapter, CLIENT, serverIpAddress, serverPort, IPV4
);
/* 连接失败则等待10s再次尝试连接 */
if (res != 0 && res != 0x1D) {
PrivSemaphoreAbandon(&pQueueBuffer->full); // 释放信号量
/* 释放互斥锁 */
PrivMutexAbandon(&romConfigurationMutex);
PrivMutexAbandon(&adapter->lock);
printf("Ethernet connect to server failed\n"
); // 连接失败,打印错误信息
PrivTaskDelay(1000 * 10); // 延迟10秒避免网络拥塞
continue;
}
#else
/* 尝试连接到服务器 */
sprintf(
serverIpAddress,
"%u.%u.%u.%u",
CFG->destinationIpAddress_Ethernet[0],
CFG->destinationIpAddress_Ethernet[1],
CFG->destinationIpAddress_Ethernet[2],
CFG->destinationIpAddress_Ethernet[3]
);
sprintf(
serverPort,
"%u",
(unsigned short)CFG->destinationPort_Ethernet[0] |
CFG->destinationPort_Ethernet[1] << 8
);
printf("-*-*-*-*sendDataToServerTask_Ethernet*-*-*-*\n");
printf("serverIpAddress:\t%s\n", serverIpAddress);
printf("serverPort:\t\t%s\n", serverPort);
printf("-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*\n");
int reconnectCount =
RECONNECT_COUNT; // 尝试重新连接服务器最多RECONNECT_COUNT次
while (reconnectCount > 0) {
int res = AdapterDeviceConnect(
adapter, CLIENT, serverIpAddress, serverPort, IPV4
); // 尝试连接服务器
if (res == 0 || res == 0x1D) {
break;
}
reconnectCount--;
}
if (reconnectCount <=
0) { // 若RECONNECT_COUNT次都连接失败则等待10s再次尝试连接
PrivSemaphoreAbandon(&pQueueBuffer->full); // 释放信号量
printf("Ethernet connect to server failed\n"
); // 连接失败,打印错误信息
PrivTaskDelay(1000 * 10); // 延迟10秒避免网络拥塞
continue;
}
#endif
PrivMutexObtain(&pQueueBuffer->mutex); // 获取互斥锁
pDataFrame = pollBuffer(pQueueBuffer); // 从队列中获取数据帧
PrivMutexAbandon(&pQueueBuffer->mutex); // 释放互斥锁
int resendCount = RESEND_COUNT; // 定义数据帧重发次数
/* 只有数据帧非空并且还有剩余重发次数,才进行发送 */
while (pDataFrame != NULL && resendCount > 0) {
/* 向服务器发送数据 */
// printf("send data to server, id: %s\n", pDataFrame->id);
printf("pDataFrame id: %d\n", pDataFrame->sensor_id);
AdapterDeviceSend(
adapter,
pDataFrame,
sizeof(*pDataFrame)
); // 发送数据注意当前最多发送256字节
/* 从服务器接收响应约定服务器接收完数据帧后返回数据帧中的前12个字节即数据帧id
*/
memset(receiveBuffer, 0, sizeof(receiveBuffer));
PrivTaskDelay(6000);
/* if (AdapterDeviceRecv(adapter, receiveBuffer,
strlen(pDataFrame->id)) == strlen(pDataFrame->id)) {
printf("receiveBuffer: ");
for (int i = 0; i < strlen(receiveBuffer); i++) {
printf("%c", receiveBuffer[i]);
}
printf("\n");
if (strstr(pDataFrame->id, receiveBuffer) != NULL) {
break; // 接收成功,退出循环
}
} */
resendCount--;
}
if (pDataFrame != NULL) {
PrivFree(pDataFrame); // 释放数据帧内存
pDataFrame = NULL; // 避免野指针
}
AdapterDeviceDisconnect(adapter, NULL);
#ifdef BSP_BLE_CONFIG
/* 释放互斥锁 */
PrivMutexAbandon(&romConfigurationMutex);
PrivMutexAbandon(&adapter->lock);
#endif
if (resendCount <=
0) { // 如果数据帧重发次数超过上限,表示发送失败,丢弃该帧
PrivTaskDelay(1000 * 10); // 延迟10秒避免网络拥塞
}
}
return NULL;
}
/**
* @brief
* SENSOR接收数据的线程以及上传数据到服务器的线程main方法中被调用
*/
void startUpTransformDataTask(void) {
/* 分配循环队列空间 */
struct QueueBuffer* pQueueBuffer =
(struct QueueBuffer*)PrivCalloc(1, sizeof(struct QueueBuffer));
if (initBuffer(pQueueBuffer) < 0) {
PrivFree(pQueueBuffer);
return;
}
/* 启动从SENSOR接收数据的线程 */
pthread_attr_t receiveDataFromSENSORTaskAttr;
pthread_args_t receiveDataFromSENSORTaskArgs;
receiveDataFromSENSORTaskAttr.schedparam.sched_priority = 16; // 线程优先级
receiveDataFromSENSORTaskAttr.stacksize = 2048; // 线程栈大小
receiveDataFromSENSORTaskArgs.pthread_name =
"receiveDataFromSENSORTask"; // 线程名字
receiveDataFromSENSORTaskArgs.arg = pQueueBuffer; // 线程参数
pthread_t receiveDataThread; // 线程ID
PrivTaskCreate(
&receiveDataThread,
&receiveDataFromSENSORTaskAttr,
receiveDataFromSENSORTask,
&receiveDataFromSENSORTaskArgs
);
PrivTaskStartup(&receiveDataThread);
/* 启动上传数据到服务器的线程 */
pthread_attr_t sendDataToServerTaskAttr;
pthread_args_t sendDataToServerTaskArgs;
sendDataToServerTaskAttr.schedparam.sched_priority = 16; // 线程优先级
sendDataToServerTaskAttr.stacksize = 2200; // 线程栈大小
sendDataToServerTaskArgs.pthread_name = "sendDataToServerTask"; // 线程名字
sendDataToServerTaskArgs.arg = pQueueBuffer; // 线程参数
pthread_t sendDataThread; // 线程ID
void* (*start_routine)(void*) =
sendDataToServerTask_4G; // 通过4G模块上传到服务器
// void *(*start_routine)(void *) = sendDataToServerTask_Ethernet; //
// 通过以太网模块上传到服务器
PrivTaskCreate(
&sendDataThread,
&sendDataToServerTaskAttr,
start_routine,
&sendDataToServerTaskArgs
); // 通过4G模块上传到服务器
PrivTaskStartup(&sendDataThread);
}

View File

@ -6,8 +6,17 @@ menu "connection app"
if APPLICATION_CONNECTION
menuconfig SOCKET_DEMO
bool "Config test socket demo"
default n
bool "Config test socket demo"
default n
menuconfig USE_4G_APP
bool "Enable 4G apps"
default n
if USE_4G_APP
source "$APP_DIR/Applications/connection_app/4g_app/Kconfig"
endif
endif
endmenu

View File

@ -7,11 +7,11 @@ endif
ifeq ($(CONFIG_ADD_XIZI_FEATURES),y)
ifeq ($(CONFIG_CONNECTION_ADAPTER_4G),y)
ifeq ($(CONFIG_USE_4G_APP),y)
SRC_DIR += 4g_app
endif
ifeq ($(CONFIG_RESOURCES_LWIP),y)
ifeq ($(CONFIG_SOCKET_DEMO),y)
SRC_DIR += socket_demo
endif

View File

@ -11,10 +11,11 @@
*/
#include <stdint.h>
#include <stdatomic.h>
#include <stdio.h>
#include <string.h>
#include <stdatomic.h>
// #include <user_api.h>
#include <transform.h>
@ -25,18 +26,14 @@ static int watchdog_fd;
static pthread_t watch_dog_task;
static pthread_mutex_t task_status_lock;
// receiveDataFromSENSORTask(0x01)、sendDataToServerTask(0x02)
#define TASK_NUM 0x03
// receiveDataFromEthTask(0x01)、sendDataToServerTask(0x02)
#define TASK_NUM 0x03
void SetTaskStatus(uint8_t task_idx) {
atomic_fetch_or(&task_status, task_idx);
}
void SetTaskStatus(uint8_t task_idx) { atomic_fetch_or(&task_status, task_idx); }
static void ClearTaskStatus() {
atomic_store(&task_status, 0);
}
static void ClearTaskStatus() { atomic_store(&task_status, 0); }
static void* FeedWatchdogTask(void* parameter) {
static void *FeedWatchdogTask(void *parameter) {
uint8_t status = 0;
while (1) {
@ -62,7 +59,7 @@ int WatchdogInit(uint16_t timeout) {
int ret = 0;
PrivMutexCreate(&task_status_lock, 0);
watchdog_fd = PrivOpen("/dev/wdt0_dev", O_RDWR);
watchdog_fd = PrivOpen("/dev/" WDT_0_DEVICE_NAME_0, O_RDWR);
/* set watchdog timeout time */
struct PrivIoctlCfg ioctl_cfg;

View File

@ -10,4 +10,4 @@ ifeq ($(CONFIG_ADD_XIZI_FEATURES),y)
SRC_FILES := e220.c
include $(KERNEL_ROOT)/compiler.mk
endif
endif