diff --git a/APP_Framework/Applications/connection_app/4g_app/.gitignore b/APP_Framework/Applications/connection_app/4g_app/.gitignore new file mode 100644 index 000000000..370c9830e --- /dev/null +++ b/APP_Framework/Applications/connection_app/4g_app/.gitignore @@ -0,0 +1 @@ +/client_id.h diff --git a/APP_Framework/Applications/connection_app/4g_app/Kconfig b/APP_Framework/Applications/connection_app/4g_app/Kconfig new file mode 100644 index 000000000..19bea147e --- /dev/null +++ b/APP_Framework/Applications/connection_app/4g_app/Kconfig @@ -0,0 +1,19 @@ +config USE_ADL_APP + bool "Use app for adl400 electrometer" + default n + +if BSP_USING_ETH + config USE_ETH_4G_APP + bool "Use app for bridging ETH and 4G" + default n +endif + +config USE_SENSOR_APP + bool "Use app for collecting sensor data from rs485" + default n + +if USE_ETH_4G_APP || USE_SENSOR_APP + config CLIENT_ID + int "Client ID used in 4G dataframe" + default 0 +endif diff --git a/APP_Framework/Applications/connection_app/4g_app/Makefile b/APP_Framework/Applications/connection_app/4g_app/Makefile index cf103926a..87dc1c486 100644 --- a/APP_Framework/Applications/connection_app/4g_app/Makefile +++ b/APP_Framework/Applications/connection_app/4g_app/Makefile @@ -1,3 +1,15 @@ -SRC_FILES := 4g_app.c ch32v208_adl400.c +SRC_FILES := + +ifeq ($(CONFIG_USE_ADL_APP),y) + SRC_FILES += ch32v208_adl400.c +endif + +ifeq ($(CONFIG_USE_ETH_4G_APP),y) + SRC_FILES += ch32v208_eth_4g.c +endif + +ifeq ($(CONFIG_USE_SENSOR_APP),y) + SRC_FILES += ch32v208_sensor.c +endif include $(KERNEL_ROOT)/compiler.mk diff --git a/APP_Framework/Applications/connection_app/4g_app/ch32v208_eth_4g.c b/APP_Framework/Applications/connection_app/4g_app/ch32v208_eth_4g.c new file mode 100644 index 000000000..c3c8cdab1 --- /dev/null +++ b/APP_Framework/Applications/connection_app/4g_app/ch32v208_eth_4g.c @@ -0,0 +1,462 @@ +/** + * @file ch32v208_eth_4g.c + * @brief Application that runs on ch32v208 board, receiving data from ETH and + * send it over 4G. + * @author Zhou Zhi + * @version 1.0 + * @date 2025-02-24 + */ + +#include +#include +#include +#include + +#define MAX_FRAME_SIZE 256 // 最大帧大小 +#define MAX_BUFFER_SIZE 1024 * 2 // 最大缓冲区大小 +#define RESEND_COUNT 3 // 最大帧重发次数 +#define MAX_RECV_RETRY 3 // 最大帧重试接受次数 +#define RECONNECT_COUNT 5 // 最大连接次数 +#define WATING_RESPONSE_MS 5000 // 等待响应时间,单位为毫秒 + +// manually configured SENSOR_CLIENT_ID, as modifying `xsconfig.h` would cause +// massive recompilation +#if __has_include("client_id.h") + #include "client_id.h" + + #ifndef CLIENT_ID_4G + #error You should manually define `CLIENT_ID_4G` in client_id.h, as this file overrides config `CLIENT_ID` + #endif + +#else + #define CLIENT_ID_4G CLIENT_ID +#endif + +#define STR_H(x) #x +#define STR(x) STR_H(x) +#pragma message "Current SENSOR_CLIENT_ID: " STR(CLIENT_ID_4G) + +__attribute__((always_inline)) inline bool any_of(const char* s, size_t len, char c) { + for (size_t i = 0; i < len; ++i) { + if (s[i] == c) { + return true; + } + } + return false; +} + +/** + * @brief 将要上传服务器的数据帧 + */ +typedef struct DataFrame { + uint16_t id; + uint16_t length; + char data[MAX_FRAME_SIZE]; // 上传服务器的数据帧字符串 +} DataFrame; + +/** + * @brief 数据帧的缓存,使用循环队列作为数据结构 + */ +struct QueueBuffer { + struct DataFrame* buffer[MAX_BUFFER_SIZE / sizeof(struct DataFrame)]; // 循环队列存储空间,使用数组存储 + int front; // 循环队列队头 + int rear; // 循环队列队尾 + pthread_mutex_t mutex; // 互斥访问循环队列信号量 + sem_t full; // 循环队列中有效成员个数的信号量 +}; + +#define BUFFER_ELEM_COUNT (MAX_BUFFER_SIZE / sizeof(DataFrame)) // 循环队列中可以容纳的最大成员个数 + +/** + * @brief 初始化循环队列 + * @param pQueueBuffer 循环队列指针 + * @return * int 0表示初始化成功,其他表示初始化失败 + */ +static int initBuffer(struct QueueBuffer* pQueueBuffer) { + pQueueBuffer->front = 0; + pQueueBuffer->rear = 0; + if (PrivMutexCreate(&pQueueBuffer->mutex, 0) < 0) { + printf("buffer mutex create failed.\n"); + return -1; + } + if (PrivSemaphoreCreate(&pQueueBuffer->full, 0, 0) < 0) { + printf("buffer full semaphore create failed.\n"); + return -1; + } + return 0; +} + +/** + * @brief 循环队列入队,如果循环队列已满,则将最旧的成员出队后,新成员再入队 + * @param pQueueBuffer 循环队列指针 + * @param pDataFrame 数据帧 + * @return int 0表示入队成功,其他表示入队失败 + */ +static int offerBuffer(struct QueueBuffer* pQueueBuffer, struct DataFrame* pDataFrame) { + /* 循环队列已满,将最旧的成员出队 */ + if ((pQueueBuffer->rear + 1) % BUFFER_ELEM_COUNT == pQueueBuffer->front) { + struct DataFrame* frontDataFrame = pQueueBuffer->buffer[pQueueBuffer->front]; + PrivFree(frontDataFrame); + pQueueBuffer->front = (pQueueBuffer->front + 1) % BUFFER_ELEM_COUNT; + } + /* 新成员入队 */ + pQueueBuffer->buffer[pQueueBuffer->rear] = pDataFrame; + pQueueBuffer->rear = (pQueueBuffer->rear + 1) % BUFFER_ELEM_COUNT; + printf("front: %d\n", pQueueBuffer->front); + printf("rear: %d\n", pQueueBuffer->rear); + return 0; +} + +/** + * @brief 循环队列出队,如果队列为空则返回NULL + * @param pQueueBuffer 循环队列指针 + * @return struct DataFrame* 出队成员,如果队列为空则返回NULL + */ +static struct DataFrame* pollBuffer(struct QueueBuffer* pQueueBuffer) { + /* 队列为空,返回NULL */ + if (pQueueBuffer->front == pQueueBuffer->rear) { + return NULL; + } + /* 最旧的成员出队 */ + struct DataFrame* pFrontDataFrame = pQueueBuffer->buffer[pQueueBuffer->front]; + pQueueBuffer->buffer[pQueueBuffer->front] = NULL; + pQueueBuffer->front = (pQueueBuffer->front + 1) % BUFFER_ELEM_COUNT; + printf("front: %d\n", pQueueBuffer->front); + printf("rear: %d\n", pQueueBuffer->rear); + return pFrontDataFrame; +} + +/** + * @brief 查看队头元素,如果队列为空则返回NULL + * @param pQueueBuffer 循环队列指针 + * @return struct DataFrame* 队头元素,如果队列为空则返回NULL + */ +static struct DataFrame* peekBuffer(struct QueueBuffer* pQueueBuffer) { + /* 如果队列为空,返回NULL */ + if (pQueueBuffer->front == pQueueBuffer->rear) { + return NULL; + } + /* 返回队头元素,但不出队 */ + return pQueueBuffer->buffer[pQueueBuffer->front]; +} + +/** + * @brief + * 改写PrivRead函数,原有函数只会读取接收缓冲区的当前已有字节,新函数会读取指定字节数再返回 + * @param fd 文件描述符 + * @param buf 数据读取到的位置 + * @param len 读取的指定字节数 + * @return int + * 如果读取到指定字节数返回0;如果到达WATING_RESPONSE_MS仍未读取到指定字节数,或者读数据错误,返回-1 + */ +static int privReadEnoughData(int fd, void* buf, size_t len) { + char* buffer = (char*)buf; // 将接收的存储空间指针强制转型 + int gottenBytes = 0; // 已经读取到的字节数 + int remainTime = WATING_RESPONSE_MS; // 剩余的时间 + + /* 只有接收的字节数不够,并且还有剩余时间,才可以继续读取 */ + while (gottenBytes < len && remainTime > 0) { + int bytes = PrivRead(fd, buffer + gottenBytes, len - gottenBytes); // 从设备读取 + if (bytes > 0) { + gottenBytes += bytes; // 读取到字节 + } else if (bytes < 0) { + printf("Error reading from serial port\n"); + return -1; // 读取错误 + } + PrivTaskDelay(100); // 每100ms读取一次 + remainTime -= 100; // 剩余时间减去100ms + } + /* 若没有剩余时间,表示还没有读取到指定的字节数,返回-1;若有剩余时间,表示已经读取了指定的字节数,返回0 + */ + return remainTime < 0 ? -1 : 0; +} + +static int32_t recvFrameFromEth(struct Adapter* eth_adapter, char* buffer, size_t buffer_len) { + uint32_t payloadLen; + int32_t recvLen; + // assume that received length is of same endianness + recvLen = AdapterDeviceRecv(eth_adapter, &payloadLen, sizeof(payloadLen)); + + if (recvLen != sizeof(payloadLen)) { + printf("Wrong length received: %d\n", recvLen); + return -1; + } + + if (payloadLen > buffer_len) { + printf("Invalid length: %d\n", payloadLen); + return -1; + } + + int ret = privReadEnoughData(eth_adapter->fd, buffer, payloadLen); + + if (ret != 0) { + printf("Receive %d bytes from eth failed\n", payloadLen); + return -1; + } + + return payloadLen; +} + +static int waitAndRecv(struct QueueBuffer* queue, struct Adapter* eth_adapter) { + char receiveBuffer[sizeof(DataFrame) + 1]; // 从服务器接收每帧响应的存储空间 + + int32_t payloadLen = recvFrameFromEth(eth_adapter, receiveBuffer, sizeof(receiveBuffer)); + + if (payloadLen < 0) { + return payloadLen; + } + + struct DataFrame* pDataFrame = (struct DataFrame*)PrivMalloc(sizeof(struct DataFrame)); + memset(pDataFrame, 0, sizeof(struct DataFrame)); + + pDataFrame->id = CLIENT_ID_4G; + pDataFrame->length = payloadLen; + memcpy(pDataFrame->data, receiveBuffer, payloadLen); + + /* 将解析后的数据帧放入循环队列 */ + PrivMutexObtain(&queue->mutex); // 获取互斥锁 + offerBuffer(queue, pDataFrame); // 将数据帧放入队列 + printf("Received data from eth, length: %d\n", payloadLen); // 打印接收到的数据帧ID + PrivMutexAbandon(&queue->mutex); // 释放互斥锁 + PrivSemaphoreAbandon(&queue->full); // 释放信号量,即告知发送数据线程,队列中有新的数据帧 + + return 0; +} + +/** + * @brief 从 ETH 接收数据的线程 + * @param arg 循环队列指针 + * @return void* 目前返回值无意义 + */ +static void* receiveDataFromEthTask(void* arg) { + char serverIpAddress[16] = {}; // 目的IP地址 + char serverPort[6] = {}; // 目的端口号 + struct QueueBuffer* pQueueBuffer = (struct QueueBuffer*)arg; // 循环队列指针 + + struct Adapter* adapter = AdapterDeviceFindByName(ADAPTER_ETHERNET_NAME); // 查找以太网模块适配器 + + AdapterDeviceSetUp(adapter); // 启动以太网主任务线程 + AdapterDeviceSetDhcp(adapter, CFG->dhcpSwitch_Ethernet); // 启用或禁用DHCP + + struct DataFrame* pDataFrame = NULL; // 数据帧定义 + + /* 尝试连接到服务器 */ + sprintf( + serverIpAddress, + "%u.%u.%u.%u", + CFG->destinationIpAddress_Ethernet[0], + CFG->destinationIpAddress_Ethernet[1], + CFG->destinationIpAddress_Ethernet[2], + CFG->destinationIpAddress_Ethernet[3] + ); + sprintf(serverPort, "%u", (unsigned short)CFG->destinationPort_Ethernet[0] | CFG->destinationPort_Ethernet[1] << 8); + printf("-*-*-*-*receiveDataFromEthTask*-*-*-*\n"); + printf("serverIpAddress:\t%s\n", serverIpAddress); + printf("serverPort:\t\t%s\n", serverPort); + printf("-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*\n"); + + while (1) { + int reconnectCount = RECONNECT_COUNT; // 尝试重新连接服务器最多RECONNECT_COUNT次 + while (reconnectCount > 0) { + int res = AdapterDeviceConnect(adapter, CLIENT, serverIpAddress, serverPort, IPV4); // 尝试连接服务器 + if (res == 0 || res == 0x1D) { + break; + } + reconnectCount--; + } + if (reconnectCount <= 0) { // 若RECONNECT_COUNT次都连接失败,则等待5s再次尝试连接 + printf("Ethernet connect to server failed\n"); // 连接失败,打印错误信息 + PrivTaskDelay(1000 * 5); // 延迟5秒,避免网络拥塞 + continue; + } + + int recvFailureCnt = 0; + + while (recvFailureCnt < MAX_RECV_RETRY) { + int ret = waitAndRecv(pQueueBuffer, adapter); + if (ret < 0) { + recvFailureCnt += 1; + } + } + + printf("Max failure count of receiving from ethernet reached, reconnecting\n"); + + AdapterDeviceDisconnect(adapter, NULL); + PrivTaskDelay(1000 * 5); + } + + return NULL; +} + +static size_t waitAndSend(struct Adapter* adapter, struct QueueBuffer* pQueueBuffer) { + DataFrame* pDataFrame; + size_t res = 0; + char ack[4] = {0}; + + PrivSemaphoreObtainWait(&pQueueBuffer->full, NULL); // 尝试获取循环队列队头元素,如果获取信号量失败,则等待信号量 + + PrivMutexObtain(&pQueueBuffer->mutex); // 获取互斥锁 + pDataFrame = pollBuffer(pQueueBuffer); // 从队列中获取数据帧 + PrivMutexAbandon(&pQueueBuffer->mutex); // 释放互斥锁 + + // 定义数据帧重发次数 + int resendCount = RESEND_COUNT; + // 只有数据帧非空并且还有剩余重发次数,才进行发送 + while (pDataFrame != NULL && resendCount > 0) { + /* 向服务器发送数据 */ + printf("Message id: %d, resend cnt %d\n", pDataFrame->id, resendCount); + + // 发送不会报告错误 + AdapterDeviceSend(adapter, pDataFrame, + sizeof(DataFrame)); // 发送数据,注意当前最多发送256字节 + + memset(ack, 0, sizeof(ack)); + // Recv 的实现使得读取会等待至 `len` 数量的字节可用, 这明显不对劲 <- + // won't fix + res = AdapterDeviceRecv(adapter, ack, 2); + printf("ack: %s, res: %zu\n", ack, res); + + bool seems_like_ok_reply = any_of(ack, 2, 'O') || any_of(ack, 2, 'K'); + + // 服务器收到来自 client (通过 magic header 判断) 的东西之后必须回复 + // ack + if (res != -1 && seems_like_ok_reply) { + // 发送成功, 别发了 + printf("Successfully sent.\n"); + break; + } + + res = -1; + printf("Error sending frame, retry count left: %d\n", resendCount); + + resendCount--; + } + if (pDataFrame != NULL) { + PrivFree(pDataFrame); // 释放数据帧内存 + pDataFrame = NULL; // 避免野指针 + } + // AdapterDeviceDisconnect(adapter, NULL); // 关闭适配器对应的设备 + + // 如果数据帧重发次数超过上限,表示发送失败,丢弃该帧 + if (resendCount <= 0) { + PrivTaskDelay(1000 * 10); // 延迟10秒,避免网络拥塞 + } + + return res; +} + +/** + * @brief 通过4G向服务器发送数据的线程 + * @param arg 循环队列指针 + * @return void* 目前返回值无意义 + */ +static void* sendDataToServerTask_4G(void* arg) { + char serverIpAddress[16] = {}; // 目的IP地址 + char serverPort[6] = {}; // 目的端口号 + struct QueueBuffer* pQueueBuffer = (struct QueueBuffer*)arg; // 循环队列指针 + unsigned char receiveBuffer[256]; // 从服务器接收每帧响应的存储空间 + + struct Adapter* adapter = AdapterDeviceFindByName(ADAPTER_4G_NAME); // 查找4G模块适配器 + + printf("!!!!!!!! before open, adapter: %p\n", (void*)adapter); + AdapterDeviceOpen(adapter); // 打开适配器对应的设备(实际打开串口中断) + int baud_rate = BAUD_RATE_115200; // 波特率,用于设置4G模块串口 + printf("!!!!!!!! before devctl\n"); + AdapterDeviceControl(adapter, OPE_INT, &baud_rate); // 对适配器对应设备进行配置(实际配置波特率) + printf("!!!!!!!! after devctl\n"); + + sprintf( + serverIpAddress, + "%u.%u.%u.%u", + CFG->destinationIpAddress_4G[0], + CFG->destinationIpAddress_4G[1], + CFG->destinationIpAddress_4G[2], + CFG->destinationIpAddress_4G[3] + ); + sprintf(serverPort, "%u", (unsigned short)CFG->destinationPort_4G[0] | CFG->destinationPort_4G[1] << 8); + printf("-*-*-*-*sendDataToServerTask_4G*-*-*-*\n"); + printf("serverIpAddress:\t%s\n", serverIpAddress); + printf("serverPort:\t\t%s\n", serverPort); + printf("-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*\n"); + + // 工作循环 + while (1) { + // 连接服务器直到连接成功 + int reconnectCount = RECONNECT_COUNT; // 尝试重新连接服务器最多RECONNECT_COUNT次 + while (reconnectCount > 0) { + int res; + + res = AdapterDeviceConnect(adapter, CLIENT, serverIpAddress, serverPort, IPV4); + printf("Successfully connected to server: %s:%s.\n", serverIpAddress, serverPort); + + if (res == 0) { + break; + } + reconnectCount--; + } + if (reconnectCount <= 0) { // 若RECONNECT_COUNT次都连接失败,则等待10s再次尝试连接 + printf("4G connect to server failed\n"); // 连接失败,打印错误信息 + PrivSemaphoreAbandon(&pQueueBuffer->full); // 释放信号量 + PrivTaskDelay(1000 * 10); // 延迟10秒,避免网络拥塞 + continue; + } + + // 等待并发送数据帧 + while (1) { + size_t res; + res = waitAndSend(adapter, pQueueBuffer); + + // 连续发送失败某一帧, 重新连接服务器 + if (res == -1) { + AdapterDeviceDisconnect(adapter, NULL); + PrivTaskDelay(1000 * 5); + break; + } + } + } + + return NULL; +} + +/** + * @brief + * 开启从ADL400接收数据的线程以及上传数据到服务器的线程,此方法在main方法中被调用,开机或复位启动 + */ +void startUpTransformDataTask(void) { + /* 分配循环队列空间 */ + struct QueueBuffer* pQueueBuffer = (struct QueueBuffer*)PrivCalloc(1, sizeof(struct QueueBuffer)); + if (initBuffer(pQueueBuffer) < 0) { + PrivFree(pQueueBuffer); + return; + } + + /* 启动从ADL400接收数据的线程 */ + pthread_attr_t receiveDataFromEthTaskAttr; + pthread_args_t receiveDataFromEthTaskArgs; + receiveDataFromEthTaskAttr.schedparam.sched_priority = 16; // 线程优先级 + receiveDataFromEthTaskAttr.stacksize = 2048; // 线程栈大小 + receiveDataFromEthTaskArgs.pthread_name = "receiveDataFromEthTask"; // 线程名字 + receiveDataFromEthTaskArgs.arg = pQueueBuffer; // 线程参数 + pthread_t receiveDataThread; // 线程ID + PrivTaskCreate( + &receiveDataThread, &receiveDataFromEthTaskAttr, receiveDataFromEthTask, &receiveDataFromEthTaskArgs + ); + PrivTaskStartup(&receiveDataThread); + + /* 启动上传数据到服务器的线程 */ + pthread_attr_t sendDataToServerTaskAttr; + pthread_args_t sendDataToServerTaskArgs; + sendDataToServerTaskAttr.schedparam.sched_priority = 16; // 线程优先级 + sendDataToServerTaskAttr.stacksize = 2200; // 线程栈大小 + sendDataToServerTaskArgs.pthread_name = "sendDataToServerTask"; // 线程名字 + sendDataToServerTaskArgs.arg = pQueueBuffer; // 线程参数 + pthread_t sendDataThread; // 线程ID + void* (*start_routine)(void*) = sendDataToServerTask_4G; // 通过4G模块上传到服务器 + PrivTaskCreate( + &sendDataThread, + &sendDataToServerTaskAttr, + start_routine, + &sendDataToServerTaskArgs + ); // 通过4G模块上传到服务器 + PrivTaskStartup(&sendDataThread); +} \ No newline at end of file diff --git a/APP_Framework/Applications/connection_app/4g_app/ch32v208_sensor.c b/APP_Framework/Applications/connection_app/4g_app/ch32v208_sensor.c new file mode 100644 index 000000000..a0b4893a8 --- /dev/null +++ b/APP_Framework/Applications/connection_app/4g_app/ch32v208_sensor.c @@ -0,0 +1,956 @@ +/** + * @file ch32v208_adl400.c + * @brief ch32v208 board gets data from Acrel-SENSOR electricity meter with + * rs485 bus, and then sends it to the server with 4G. + * @author Huo Yujia (huoyujia081@126.com) + * @version 1.0 + * @date 2024-07-10 + */ + +#include +#include +#include +#include + +#define MAX_FRAME_SIZE 128 // 最大帧大小 +#define MAX_BUFFER_SIZE 1024 * 2 // 最大缓冲区大小 +#define RECEIVE_DATA_INTERVAL_MS 1000 * 10 // SENSOR数据采集间隔时间,单位为毫秒 +#define RESEND_COUNT 3 // 最大帧重发次数 +#define RECONNECT_COUNT 5 // 最大连接次数 +#define WATING_RESPONSE_MS 5000 // 等待响应时间,单位为毫秒 +#define SINGLE_DATA_RETRY 3 // 重读某一条 sensor 数据的最大次数 +#define MAGIC_HEADER "XIJIUSS" // 魔法头, 用来在消息协议中快速标识自身 + +// manually configured SENSOR_CLIENT_ID, as modifying `xsconfig.h` would cause +// massive recompilation +#if __has_include("client_id.h") + #include "client_id.h" + + #ifndef SENSOR_CLIENT_ID + #error You should manually define `SENSOR_CLIENT_ID` in client_id.h, as this file overrides config `CLIENT_ID` + #endif + +#else + #define SENSOR_CLIENT_ID CLIENT_ID +#endif + +#define STR_H(x) #x +#define STR(x) STR_H(x) +#pragma message "Current SENSOR_CLIENT_ID: " STR(SENSOR_CLIENT_ID) + +#ifdef BSP_USING_WDT +// see main.c +static const uint8_t RECV_TASK = 0x1; +static const uint8_t SEND_TASK = 0x2; +#endif + +void SensorTaskNotifyAlive(uint8_t task_idx) { +#ifdef BSP_USING_WDT + extern void SetTaskStatus(uint8_t task_idx); + SetTaskStatus(task_idx); +#endif +} + +void RecvAlive() { SensorTaskNotifyAlive(RECV_TASK); } + +void SendAlive() { SensorTaskNotifyAlive(SEND_TASK); } + +static uint8_t channel_O3_data_instruction[] = { + 0x01, 0x03, 0x01, 0x55, 0x00, 0x01, 0x95, 0xE6 +}; + +static uint8_t channel_CO2_data_instruction[] = { + 0x01, 0x03, 0x01, 0xAB, 0x00, 0x01, 0xF4, 0x16 +}; + +static uint8_t channel_H2S_data_instruction[] = { + 0x01, 0x03, 0x02, 0x01, 0x00, 0x01, 0xD4, 0x72 +}; + +static uint8_t channel_CL2_data_instruction[] = { + 0x01, 0x03, 0x02, 0x57, 0x00, 0x01, 0x34, 0x62 +}; + +static uint8_t channel_NH3_data_instruction[] = { + 0x01, 0x03, 0x02, 0xAD, 0x00, 0x01, 0x14, 0x53 +}; + +static uint8_t channel_TVOC_data_instruction[] = { + 0x01, 0x03, 0x03, 0x03, 0x00, 0x01, 0x74, 0x4E +}; + +static uint8_t channel_CH2O_data_instruction[] = { + 0x01, 0x03, 0x03, 0x59, 0x00, 0x01, 0x54, 0x5D +}; + +static uint8_t channel_C2H5OH_data_instruction[] = { + 0x01, 0x03, 0x03, 0xAF, 0x00, 0x01, 0xB4, 0x6F +}; + +static uint8_t channel_CH4_data_instruction[] = { + 0x01, 0x03, 0x04, 0x05, 0x00, 0x01, 0x95, 0x3B +}; + +static uint8_t channel_O2_data_instruction[] = { + 0x01, 0x03, 0x04, 0x5B, 0x00, 0x01, 0xF4, 0xE9 +}; + +static uint8_t channel_AQS_data_instruction[] = { + 0x01, 0x03, 0x04, 0xB1, 0x00, 0x01, 0xD5, 0x1D +}; + +static uint8_t channel_humidity_data_instruction[] = { + 0x01, 0x03, 0x0C, 0x6B, 0x00, 0x01, 0xF6, 0x86 +}; + +static uint8_t channel_temperature_data_instruction[] = { + 0x01, 0x03, 0x0C, 0xC1, 0x00, 0x01, 0xD6, 0xA6 +}; + +static uint8_t channel_PM1_data_instruction[] = { + 0x01, 0x03, 0x0E, 0x6F, 0x00, 0x01, 0xB6, 0xFF +}; + +static uint8_t channel_PM2_5_data_instruction[] = { + 0x01, 0x03, 0x0E, 0x19, 0x00, 0x01, 0x57, 0x25 +}; + +static uint8_t channel_PM10_data_instruction[] = { + 0x01, 0x03, 0x0D, 0xC3, 0x00, 0x01, 0x76, 0x9A +}; + +static uint8_t channel_windspeed_data_instruction[] = { + 0x01, 0x03, 0x0D, 0x6D, 0x00, 0x01, 0x17, 0x7B +}; + +static uint8_t channel_winddirection_data_instruction[] = { + 0x01, 0x03, 0x0D, 0x17, 0x00, 0x01, 0x36, 0xA2 +}; + +static uint8_t channel_pressure_data_instruction[] = { + 0x01, 0x03, 0x0C, 0x15, 0x00, 0x01, 0x96, 0x9E +}; + +static uint8_t channel_voice_data_instruction[] = { + 0x01, 0x03, 0x0B, 0xBF, 0x00, 0x01, 0xB7, 0xCA +}; + +typedef struct SensorFields { + uint32_t o3; + uint32_t co2; + uint32_t h2s; + uint32_t cl2; + uint32_t nh3; + uint32_t tvoc; + uint32_t ch2o; + uint32_t c2h5oh; + uint32_t ch4; + uint32_t o2; + uint32_t aqs; + uint32_t humidity; + uint32_t temperature; + uint32_t pm1d0; + uint32_t pm2d5; + uint32_t pm10; + uint32_t windspeed; + uint32_t winddirection; + uint32_t airpressure; + uint32_t voice; +} SensorFields; + +/** + * @brief 将要上传服务器的数据帧 + */ +typedef struct Message { + // magic header for fast validation + char magic_header[8]; + // id of sensor cluster + uint16_t sensor_id; + uint8_t _padding[2]; + union { + SensorFields fields; + uint32_t data[sizeof(SensorFields) / sizeof(uint32_t)]; + }; +} Message; + +/** + * @brief Modbus RTU响应数据帧的缓存,使用循环队列作为数据结构 + */ +struct QueueBuffer { + Message* buffer + [MAX_BUFFER_SIZE / sizeof(Message)]; // 循环队列存储空间,使用数组存储 + int front; // 循环队列队头 + int rear; // 循环队列队尾 + pthread_mutex_t mutex; // 互斥访问循环队列信号量 + sem_t full; // 循环队列中有效成员个数的信号量 +}; + +#define BUFFER_ELEM_COUNT \ + (MAX_BUFFER_SIZE / sizeof(Message)) // 循环队列中可以容纳的最大成员个数 + +__attribute__((always_inline)) inline bool +any_of(const char* s, size_t len, char c) { + for (size_t i = 0; i < len; ++i) { + if (s[i] == c) { + return true; + } + } + return false; +} + +/** + * @brief 初始化循环队列 + * @param pQueueBuffer 循环队列指针 + * @return * int 0表示初始化成功,其他表示初始化失败 + */ +static int initBuffer(struct QueueBuffer* pQueueBuffer) { + pQueueBuffer->front = 0; + pQueueBuffer->rear = 0; + if (PrivMutexCreate(&pQueueBuffer->mutex, 0) < 0) { + printf("buffer mutex create failed.\n"); + return -1; + } + if (PrivSemaphoreCreate(&pQueueBuffer->full, 0, 0) < 0) { + printf("buffer full semaphore create failed.\n"); + return -1; + } + return 0; +} + +/** + * @brief 循环队列入队,如果循环队列已满,则将最旧的成员出队后,新成员再入队 + * @param pQueueBuffer 循环队列指针 + * @param pMsg SENSOR响应数据帧 + * @return int 0表示入队成功,其他表示入队失败 + */ +static int offerBuffer(struct QueueBuffer* pQueueBuffer, Message* pMsg) { + /* 循环队列已满,将最旧的成员出队 */ + if ((pQueueBuffer->rear + 1) % BUFFER_ELEM_COUNT == pQueueBuffer->front) { + Message* frontDataFrame = pQueueBuffer->buffer[pQueueBuffer->front]; + PrivFree(frontDataFrame); + pQueueBuffer->front = (pQueueBuffer->front + 1) % BUFFER_ELEM_COUNT; + } + /* 新成员入队 */ + pQueueBuffer->buffer[pQueueBuffer->rear] = pMsg; + pQueueBuffer->rear = (pQueueBuffer->rear + 1) % BUFFER_ELEM_COUNT; + printf("front: %d\n", pQueueBuffer->front); + printf("rear: %d\n", pQueueBuffer->rear); + return 0; +} + +/** + * @brief 循环队列出队,如果队列为空则返回NULL + * @param pQueueBuffer 循环队列指针 + * @return struct DataFrame* 出队成员,如果队列为空则返回NULL + */ +static Message* pollBuffer(struct QueueBuffer* pQueueBuffer) { + /* 队列为空,返回NULL */ + if (pQueueBuffer->front == pQueueBuffer->rear) { + return NULL; + } + /* 最旧的成员出队 */ + Message* pFrontDataFrame = pQueueBuffer->buffer[pQueueBuffer->front]; + pQueueBuffer->buffer[pQueueBuffer->front] = NULL; + pQueueBuffer->front = (pQueueBuffer->front + 1) % BUFFER_ELEM_COUNT; + printf("front: %d\n", pQueueBuffer->front); + printf("rear: %d\n", pQueueBuffer->rear); + return pFrontDataFrame; +} + +/** + * @brief 查看队头元素,如果队列为空则返回NULL + * @param pQueueBuffer 循环队列指针 + * @return struct DataFrame* 队头元素,如果队列为空则返回NULL + */ +static Message* peekBuffer(struct QueueBuffer* pQueueBuffer) { + /* 如果队列为空,返回NULL */ + if (pQueueBuffer->front == pQueueBuffer->rear) { + return NULL; + } + /* 返回队头元素,但不出队 */ + return pQueueBuffer->buffer[pQueueBuffer->front]; +} + +/** + * @brief + * 改写PrivRead函数,原有函数只会读取接收缓冲区的当前已有字节,新函数会读取指定字节数再返回 + * @param fd 文件描述符 + * @param buf 数据读取到的位置 + * @param len 读取的指定字节数 + * @return int + * 如果读取到指定字节数返回0;如果到达WATING_RESPONSE_MS仍未读取到指定字节数,或者读数据错误,返回-1 + */ +static int privReadEnoughData(int fd, int32_t* sensor_data, size_t len) { + uint8_t buffer[8]; // 将接收的存储空间指针强制转型 + int gottenBytes = 0; // 已经读取到的字节数 + int remainTime = WATING_RESPONSE_MS; // 剩余的时间 + + /* 只有接收的字节数不够,并且还有剩余时间,才可以继续读取 */ + while (gottenBytes < len && remainTime > 0) { + int bytes = + PrivRead(fd, buffer + gottenBytes, len - gottenBytes); // 从设备读取 + if ((0x01 != buffer[0]) || (0x03 != buffer[1])) { + printf("wrong head 0x%x 0x%x\n", buffer[0], buffer[1]); + gottenBytes = 0; + return -1; + } + + gottenBytes += bytes; + PrivTaskDelay(100); // 每100ms读取一次 + remainTime -= 100; // 剩余时间减去100ms + } + printf("0x%X ", buffer[3]); + printf("0x%X\n", buffer[4]); + *sensor_data = ((buffer[3] << 8) & 0xFF00) | (buffer[4] & 0xFF); + /* 若没有剩余时间,表示还没有读取到指定的字节数,返回-1;若有剩余时间,表示已经读取了指定的字节数,返回0 + */ + printf("0x%08X\n", *sensor_data); + return remainTime < 0 ? -1 : 0; +} + +__attribute__((always_inline)) inline static int sensorReadSingleData( + uint8_t* buffer, + int fd, + const uint8_t* instr, + size_t instr_len, + const char* name +) { + int32_t sensor_data = 0; + + printf("########SENSOR_QUANTITY_%s########\n", name); + PrivWrite(fd, instr, instr_len); + PrivTaskDelay(20); + if (privReadEnoughData(fd, &sensor_data, 7)) { + // 读取超时,打印错误信息 + printf("read data from sensor %s error\n", name); + return -1; // 读取失败,退出循环 + } + + /* drain the rs485 device buffer before requesting new data */ + char dummy[8]; + while (PrivRead(fd, dummy, sizeof(dummy))) + ; + PrivTaskDelay(30); + + memcpy(buffer, &sensor_data, sizeof(sensor_data)); + + return 0; +} + +int sensorIntegrationReadData(int fd, uint8_t* read_data, size_t* data_len) { + size_t current_len = 0; + +#define name2instr(name) channel_##name##_data_instruction +#define try_fetch_single(rest_retry, name) \ + do { /* 'read_single */ \ + uint8_t* cpos = read_data + current_len; \ + int ret = sensorReadSingleData( \ + cpos, fd, name2instr(name), sizeof(name2instr(name)), #name \ + ); \ + if (ret != 0) { \ + if (rest_retry > 0) { \ + /* retry this data on fail */ \ + rest_retry--; \ + PrivTaskDelay(100); \ + continue /* 'read_single */; \ + } else { \ + printf("Failed to retrieve %s, max attemp reached\n", #name); \ + return -1; \ + } \ + } /* fall through when succeed */ \ + current_len += sizeof(int32_t); \ + break; \ + } while (1) +#define fetch_single(name) \ + do { \ + int rest_retry = SINGLE_DATA_RETRY; \ + try_fetch_single(rest_retry, name); \ + } while (0) + +#define x_sensors \ + X(O3); \ + X(CO2); \ + X(H2S); \ + X(CL2); \ + X(NH3); \ + X(TVOC); \ + X(CH2O); \ + X(C2H5OH); \ + X(CH4); \ + X(O2); \ + X(AQS); \ + X(humidity); \ + X(temperature); \ + X(PM1); \ + X(PM2_5); \ + X(PM10); \ + X(windspeed); \ + X(winddirection); \ + X(pressure); \ + X(voice); + +#define X fetch_single + x_sensors; +#undef X + + *data_len = current_len; + +#undef x_sensors +#undef fetch_single +#undef try_fetch_single +#undef name2instr + + return 0; +} + +/** + * @brief 从sensor接收数据的线程 + * @param arg 循环队列指针 + * @return void* 目前返回值无意义 + */ +static void* receiveDataFromSENSORTask(void* arg) { + RecvAlive(); + + struct QueueBuffer* pQueueBuffer = (struct QueueBuffer*)arg; // 循环队列指针 + int fd = PrivOpen("/dev/rs485_dev1", O_RDWR); // 打开设备文件 + if (fd < 0) { // 打开设备文件失败,打印错误信息 + printf("open rs485 fd error: %d\n", fd); + return NULL; + } + + struct SerialDataCfg rs485Configuration; + memset(&rs485Configuration, 0, sizeof(struct SerialDataCfg)); + /* 读取RS485配置信息 */ + PrivMutexObtain(&romConfigurationMutex + ); // 若其他线程正在读取或者写入CFG,则阻塞等待 + int baudRatesOption = CFG->baudRate_Rs485; + int dataBitsOption = CFG->dataBits_Rs485; + int stopBitsOption = CFG->stopBits_Rs485; + int parityOption = CFG->parity_Rs485; + PrivMutexAbandon(&romConfigurationMutex); // 释放互斥锁 + switch (baudRatesOption) { + case 1: + rs485Configuration.serial_baud_rate = BAUD_RATE_2400; + break; + case 2: + rs485Configuration.serial_baud_rate = BAUD_RATE_4800; + break; + case 3: + rs485Configuration.serial_baud_rate = BAUD_RATE_9600; + break; + case 4: + rs485Configuration.serial_baud_rate = BAUD_RATE_19200; + break; + case 5: + rs485Configuration.serial_baud_rate = BAUD_RATE_38400; + break; + case 6: + rs485Configuration.serial_baud_rate = BAUD_RATE_57600; + break; + case 7: + rs485Configuration.serial_baud_rate = BAUD_RATE_115200; + break; + case 8: + rs485Configuration.serial_baud_rate = BAUD_RATE_230400; + break; + default: + rs485Configuration.serial_baud_rate = BAUD_RATE_9600; + break; + } + switch (dataBitsOption) { + case 1: + rs485Configuration.serial_data_bits = DATA_BITS_8; + break; + case 2: + rs485Configuration.serial_data_bits = DATA_BITS_9; + break; + default: + rs485Configuration.serial_data_bits = DATA_BITS_8; + break; + } + switch (stopBitsOption) { + case 1: + rs485Configuration.serial_stop_bits = STOP_BITS_1; + break; + case 2: + rs485Configuration.serial_stop_bits = STOP_BITS_2; + break; + default: + rs485Configuration.serial_stop_bits = STOP_BITS_1; + break; + } + switch (parityOption) { + case 1: + rs485Configuration.serial_parity_mode = PARITY_NONE; + break; + case 2: + rs485Configuration.serial_parity_mode = PARITY_ODD; + break; + case 3: + rs485Configuration.serial_parity_mode = PARITY_EVEN; + break; + } + struct PrivIoctlCfg ioctl_cfg; + ioctl_cfg.ioctl_driver_type = SERIAL_TYPE; + ioctl_cfg.args = (void*)&rs485Configuration; + if (0 != PrivIoctl(fd, OPE_INT, &ioctl_cfg)) { + printf("ioctl uart fd error %d\n", fd); + PrivClose(fd); + return NULL; + } + + Message* msg = NULL; + while (1) { + if (msg == NULL) { + msg = (Message*)PrivMalloc(sizeof(Message)); + memset(msg, 0, sizeof(*msg)); + memcpy(&msg->magic_header, MAGIC_HEADER, sizeof(MAGIC_HEADER)); + msg->sensor_id = SENSOR_CLIENT_ID; + } + size_t data_len = 0; + + if (sensorIntegrationReadData(fd, (uint8_t*)msg->data, &data_len)) { + // at least it tried, and was not stuck on anything + RecvAlive(); + PrivTaskDelay(100); + continue; + } + // 测试打印数据 + for (int i = 0; i < 20; ++i) { + printf("0x%08X ", msg->data[i]); + } + printf("\n"); + + // keep alive if actual data is obtained + RecvAlive(); + /* 将解析后的数据帧放入循环队列 */ + PrivMutexObtain(&pQueueBuffer->mutex); // 获取互斥锁 + offerBuffer(pQueueBuffer, msg); // 将数据帧放入队列 + msg = NULL; // 在下个循环时创建新的数据帧 + // printf("receive data from SENSOR, id: %s\n", pDataFrame->id); // + // 打印接收到的数据帧ID + PrivMutexAbandon(&pQueueBuffer->mutex); // 释放互斥锁 + PrivSemaphoreAbandon(&pQueueBuffer->full + ); // 释放信号量,即告知发送数据线程,队列中有新的数据帧 + + PrivTaskDelay(RECEIVE_DATA_INTERVAL_MS); // 延迟一段时间再读取下一帧数据 + + // keep alive for this round of reading + RecvAlive(); + } + + PrivClose(fd); // 关闭设备文件 + return NULL; +} + +static size_t +waitAndSend(struct Adapter* adapter, struct QueueBuffer* pQueueBuffer) { + Message* pDataFrame; + size_t res = 0; + char ack[4] = {0}; + + PrivSemaphoreObtainWait( + &pQueueBuffer->full, NULL + ); // 尝试获取循环队列队头元素,如果获取信号量失败,则等待信号量 + + PrivMutexObtain(&pQueueBuffer->mutex); // 获取互斥锁 + pDataFrame = pollBuffer(pQueueBuffer); // 从队列中获取数据帧 + PrivMutexAbandon(&pQueueBuffer->mutex); // 释放互斥锁 + + // 定义数据帧重发次数 + int resendCount = RESEND_COUNT; + // 只有数据帧非空并且还有剩余重发次数,才进行发送 + while (pDataFrame != NULL && resendCount > 0) { + /* 向服务器发送数据 */ + printf( + "Message id: %d, resend cnt %d\n", + pDataFrame->sensor_id, + resendCount + ); + if (CFG->mqttSwitch_4G == 1) { // MQTT模式下,无需服务器响应数据 + AdapterDeviceMqttSend( + adapter, + (char*)CFG->mqttTopic_4G, + pDataFrame, + sizeof(Message) + ); // 发送数据,注意当前最多发送256字节 + break; + } else { + // 发送不会报告错误 + AdapterDeviceSend( + adapter, + pDataFrame, + sizeof(Message) + ); // 发送数据,注意当前最多发送256字节 + + memset(ack, 0, sizeof(ack)); + // Recv 的实现使得读取会等待至 `len` 数量的字节可用, 这明显不对劲 <- + // won't fix + res = AdapterDeviceRecv(adapter, ack, 2); + printf("ack: %s, res: %zu\n", ack, res); + + bool seems_like_ok_reply = + any_of(ack, 2, 'O') || any_of(ack, 2, 'K'); + + // // 只要服务器回了东西就算发送成功 + // // 端侧不感知是否发送成功, 看最后数据入库结果在前端做通知 + // 服务器收到来自 client (通过 magic header 判断) 的东西之后必须回复 + // ack + if (res != -1 && seems_like_ok_reply) { + // 发送成功, 别发了 + printf("Successfully sent.\n"); + break; + } + + res = -1; + printf("Error sending frame, retry count left: %d\n", resendCount); + + /* 从服务器接收响应,约定服务器接收完数据帧后,返回数据帧中的前12个字节,即数据帧id + */ + /* 多读取2字节,是为了防止前面还有命令模式返回的剩余的\r\n影响判断 + */ + // memset(receiveBuffer, 0, sizeof(receiveBuffer)); + // int receiveLength = AdapterDeviceRecv(adapter, receiveBuffer, + // strlen(pDataFrame->id) + 2); if (receiveLength == + // strlen(pDataFrame->id) + 2 || receiveLength == + // strlen(pDataFrame->id)) { + // // 打印服务器响应 + // printf("receiveLength: %d\n", receiveLength); + // printf("receiveBuffer: "); + // for (int i = 0; i < receiveLength; i++) { + // printf("%c", receiveBuffer[i]); + // } + // printf("\n"); + // //比较服务器响应的内容与发送的数据帧id是否一致 + // if (strstr(receiveBuffer, pDataFrame->id) != NULL) { + // break; // 接收成功,退出循环 + // } + // } else { + // printf("receiveLength: %d\n", receiveLength); + // printf("receiveBuffer: "); + // for (int i = 0; i < receiveLength; i++) { + // printf("%d ", receiveBuffer[i]); + // } + // printf("\n"); + // } + } + resendCount--; + } + if (pDataFrame != NULL) { + PrivFree(pDataFrame); // 释放数据帧内存 + pDataFrame = NULL; // 避免野指针 + } + // AdapterDeviceDisconnect(adapter, NULL); // 关闭适配器对应的设备 + + // 如果数据帧重发次数超过上限,表示发送失败,丢弃该帧 + if (resendCount <= 0) { + PrivTaskDelay(1000 * 10); // 延迟10秒,避免网络拥塞 + } + + return res; +} + +/** + * @brief 通过4G向服务器发送数据的线程 + * @param arg 循环队列指针 + * @return void* 目前返回值无意义 + */ +static void* sendDataToServerTask_4G(void* arg) { + SendAlive(); + + char serverIpAddress[16] = {}; // 目的IP地址 + char serverPort[6] = {}; // 目的端口号 + struct QueueBuffer* pQueueBuffer = (struct QueueBuffer*)arg; // 循环队列指针 + + struct Adapter* adapter = + AdapterDeviceFindByName(ADAPTER_4G_NAME); // 查找4G模块适配器 + + printf("!!!!!!!! before open, adapter: %p\n", (void*)adapter); + AdapterDeviceOpen(adapter); // 打开适配器对应的设备(实际打开串口中断) + int baud_rate = BAUD_RATE_115200; // 波特率,用于设置4G模块串口 + printf("!!!!!!!! before devctl\n"); + AdapterDeviceControl( + adapter, OPE_INT, &baud_rate + ); // 对适配器对应设备进行配置(实际配置波特率) + printf("!!!!!!!! after devctl\n"); + + sprintf( + serverIpAddress, + "%u.%u.%u.%u", + CFG->destinationIpAddress_4G[0], + CFG->destinationIpAddress_4G[1], + CFG->destinationIpAddress_4G[2], + CFG->destinationIpAddress_4G[3] + ); + sprintf( + serverPort, + "%u", + (unsigned short)CFG->destinationPort_4G[0] | CFG->destinationPort_4G[1] + << 8 + ); + printf("-*-*-*-*sendDataToServerTask_4G*-*-*-*\n"); + printf("serverIpAddress:\t%s\n", serverIpAddress); + printf("serverPort:\t\t%s\n", serverPort); + printf("-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*\n"); + + // 工作循环 + while (1) { + // 连接服务器直到连接成功 + int reconnectCount = + RECONNECT_COUNT; // 尝试重新连接服务器最多RECONNECT_COUNT次 + while (reconnectCount > 0) { + int res; + + res = AdapterDeviceConnect( + adapter, CLIENT, serverIpAddress, serverPort, IPV4 + ); + printf( + "Successfully connected to server: %s:%s.\n", + serverIpAddress, + serverPort + ); + + if (res == 0) { + break; + } + reconnectCount--; + } + if (reconnectCount <= + 0) { // 若RECONNECT_COUNT次都连接失败,则等待10s再次尝试连接 + printf("4G connect to server failed\n"); // 连接失败,打印错误信息 + PrivSemaphoreAbandon(&pQueueBuffer->full); // 释放信号量 + PrivTaskDelay(1000 * 10); // 延迟10秒,避免网络拥塞 + continue; + } + + // successfully connected + SendAlive(); + + // 等待并发送数据帧 + while (1) { + size_t res; + res = waitAndSend(adapter, pQueueBuffer); + + // 连续发送失败某一帧, 重新连接服务器 + if (res == -1) { + AdapterDeviceDisconnect(adapter, NULL); + PrivTaskDelay(1000 * 5); + break; + } else { + SendAlive(); + } + } + } + + return NULL; +} + +/** + * @brief 通过以太网向服务器发送数据的线程 + * @param arg 循环队列指针 + * @return void* 目前返回值无意义 + */ +static void* sendDataToServerTask_Ethernet(void* arg) { + char serverIpAddress[16] = {}; // 目的IP地址 + char serverPort[6] = {}; // 目的端口号 + struct QueueBuffer* pQueueBuffer = (struct QueueBuffer*)arg; // 循环队列指针 + unsigned char receiveBuffer[256]; // 从服务器接收每帧响应的存储空间 + + struct Adapter* adapter = + AdapterDeviceFindByName(ADAPTER_ETHERNET_NAME); // 查找以太网模块适配器 + +#ifndef BSP_BLE_CONFIG // 如果没有使能蓝牙配置功能 + AdapterDeviceSetUp(adapter); // 启动以太网主任务线程 + AdapterDeviceSetDhcp(adapter, CFG->dhcpSwitch_Ethernet); // 启用或禁用DHCP +#endif + + Message* pDataFrame = NULL; // 数据帧定义 + while (1) { + PrivSemaphoreObtainWait( + &pQueueBuffer->full, NULL + ); // 尝试获取循环队列队头元素,如果获取信号量失败,则等待信号量 +#ifdef BSP_BLE_CONFIG // 使能蓝牙配置功能 + /* 获取互斥锁 */ + PrivMutexObtain(&adapter->lock + ); // 若其他线程正在使用adapter,则阻塞等待 + PrivMutexObtain(&romConfigurationMutex + ); // 若其他线程正在读取或者写入CFG,则阻塞等待; + + /* 尝试连接服务器 */ + sprintf( + serverIpAddress, + "%u.%u.%u.%u", + CFG->destinationIpAddress_Ethernet[0], + CFG->destinationIpAddress_Ethernet[1], + CFG->destinationIpAddress_Ethernet[2], + CFG->destinationIpAddress_Ethernet[3] + ); + sprintf( + serverPort, + "%u", + (unsigned short)CFG->destinationPort_Ethernet[0] | + CFG->destinationPort_Ethernet[1] << 8 + ); + printf("-*-*-*-*sendDataToServerTask_Ethernet*-*-*-*\n"); + printf("serverIpAddress:\t%s\n", serverIpAddress); + printf("serverPort:\t\t%s\n", serverPort); + printf("-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*\n"); + int res = AdapterDeviceConnect( + adapter, CLIENT, serverIpAddress, serverPort, IPV4 + ); + + /* 连接失败,则等待10s再次尝试连接 */ + if (res != 0 && res != 0x1D) { + PrivSemaphoreAbandon(&pQueueBuffer->full); // 释放信号量 + /* 释放互斥锁 */ + PrivMutexAbandon(&romConfigurationMutex); + PrivMutexAbandon(&adapter->lock); + printf("Ethernet connect to server failed\n" + ); // 连接失败,打印错误信息 + PrivTaskDelay(1000 * 10); // 延迟10秒,避免网络拥塞 + continue; + } +#else + /* 尝试连接到服务器 */ + sprintf( + serverIpAddress, + "%u.%u.%u.%u", + CFG->destinationIpAddress_Ethernet[0], + CFG->destinationIpAddress_Ethernet[1], + CFG->destinationIpAddress_Ethernet[2], + CFG->destinationIpAddress_Ethernet[3] + ); + sprintf( + serverPort, + "%u", + (unsigned short)CFG->destinationPort_Ethernet[0] | + CFG->destinationPort_Ethernet[1] << 8 + ); + printf("-*-*-*-*sendDataToServerTask_Ethernet*-*-*-*\n"); + printf("serverIpAddress:\t%s\n", serverIpAddress); + printf("serverPort:\t\t%s\n", serverPort); + printf("-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*-*\n"); + int reconnectCount = + RECONNECT_COUNT; // 尝试重新连接服务器最多RECONNECT_COUNT次 + while (reconnectCount > 0) { + int res = AdapterDeviceConnect( + adapter, CLIENT, serverIpAddress, serverPort, IPV4 + ); // 尝试连接服务器 + if (res == 0 || res == 0x1D) { + break; + } + reconnectCount--; + } + if (reconnectCount <= + 0) { // 若RECONNECT_COUNT次都连接失败,则等待10s再次尝试连接 + PrivSemaphoreAbandon(&pQueueBuffer->full); // 释放信号量 + printf("Ethernet connect to server failed\n" + ); // 连接失败,打印错误信息 + PrivTaskDelay(1000 * 10); // 延迟10秒,避免网络拥塞 + continue; + } +#endif + PrivMutexObtain(&pQueueBuffer->mutex); // 获取互斥锁 + pDataFrame = pollBuffer(pQueueBuffer); // 从队列中获取数据帧 + PrivMutexAbandon(&pQueueBuffer->mutex); // 释放互斥锁 + + int resendCount = RESEND_COUNT; // 定义数据帧重发次数 + + /* 只有数据帧非空并且还有剩余重发次数,才进行发送 */ + while (pDataFrame != NULL && resendCount > 0) { + /* 向服务器发送数据 */ + // printf("send data to server, id: %s\n", pDataFrame->id); + printf("pDataFrame id: %d\n", pDataFrame->sensor_id); + AdapterDeviceSend( + adapter, + pDataFrame, + sizeof(*pDataFrame) + ); // 发送数据,注意当前最多发送256字节 + + /* 从服务器接收响应,约定服务器接收完数据帧后,返回数据帧中的前12个字节,即数据帧id + */ + memset(receiveBuffer, 0, sizeof(receiveBuffer)); + PrivTaskDelay(6000); + /* if (AdapterDeviceRecv(adapter, receiveBuffer, + strlen(pDataFrame->id)) == strlen(pDataFrame->id)) { + + printf("receiveBuffer: "); + for (int i = 0; i < strlen(receiveBuffer); i++) { + printf("%c", receiveBuffer[i]); + } + printf("\n"); + + if (strstr(pDataFrame->id, receiveBuffer) != NULL) { + break; // 接收成功,退出循环 + } + } */ + resendCount--; + } + if (pDataFrame != NULL) { + PrivFree(pDataFrame); // 释放数据帧内存 + pDataFrame = NULL; // 避免野指针 + } + AdapterDeviceDisconnect(adapter, NULL); +#ifdef BSP_BLE_CONFIG + /* 释放互斥锁 */ + PrivMutexAbandon(&romConfigurationMutex); + PrivMutexAbandon(&adapter->lock); +#endif + if (resendCount <= + 0) { // 如果数据帧重发次数超过上限,表示发送失败,丢弃该帧 + PrivTaskDelay(1000 * 10); // 延迟10秒,避免网络拥塞 + } + } + return NULL; +} + +/** + * @brief + * 开启从SENSOR接收数据的线程以及上传数据到服务器的线程,此方法在main方法中被调用,开机或复位启动 + */ +void startUpTransformDataTask(void) { + /* 分配循环队列空间 */ + struct QueueBuffer* pQueueBuffer = + (struct QueueBuffer*)PrivCalloc(1, sizeof(struct QueueBuffer)); + if (initBuffer(pQueueBuffer) < 0) { + PrivFree(pQueueBuffer); + return; + } + + /* 启动从SENSOR接收数据的线程 */ + pthread_attr_t receiveDataFromSENSORTaskAttr; + pthread_args_t receiveDataFromSENSORTaskArgs; + receiveDataFromSENSORTaskAttr.schedparam.sched_priority = 16; // 线程优先级 + receiveDataFromSENSORTaskAttr.stacksize = 2048; // 线程栈大小 + receiveDataFromSENSORTaskArgs.pthread_name = + "receiveDataFromSENSORTask"; // 线程名字 + receiveDataFromSENSORTaskArgs.arg = pQueueBuffer; // 线程参数 + pthread_t receiveDataThread; // 线程ID + PrivTaskCreate( + &receiveDataThread, + &receiveDataFromSENSORTaskAttr, + receiveDataFromSENSORTask, + &receiveDataFromSENSORTaskArgs + ); + PrivTaskStartup(&receiveDataThread); + + /* 启动上传数据到服务器的线程 */ + pthread_attr_t sendDataToServerTaskAttr; + pthread_args_t sendDataToServerTaskArgs; + sendDataToServerTaskAttr.schedparam.sched_priority = 16; // 线程优先级 + sendDataToServerTaskAttr.stacksize = 2200; // 线程栈大小 + sendDataToServerTaskArgs.pthread_name = "sendDataToServerTask"; // 线程名字 + sendDataToServerTaskArgs.arg = pQueueBuffer; // 线程参数 + pthread_t sendDataThread; // 线程ID + void* (*start_routine)(void*) = + sendDataToServerTask_4G; // 通过4G模块上传到服务器 + // void *(*start_routine)(void *) = sendDataToServerTask_Ethernet; // + // 通过以太网模块上传到服务器 + PrivTaskCreate( + &sendDataThread, + &sendDataToServerTaskAttr, + start_routine, + &sendDataToServerTaskArgs + ); // 通过4G模块上传到服务器 + PrivTaskStartup(&sendDataThread); +} diff --git a/APP_Framework/Applications/connection_app/Kconfig b/APP_Framework/Applications/connection_app/Kconfig index 46018c7b5..588c66543 100755 --- a/APP_Framework/Applications/connection_app/Kconfig +++ b/APP_Framework/Applications/connection_app/Kconfig @@ -6,8 +6,17 @@ menu "connection app" if APPLICATION_CONNECTION menuconfig SOCKET_DEMO - bool "Config test socket demo" - default n + bool "Config test socket demo" + default n + + menuconfig USE_4G_APP + bool "Enable 4G apps" + default n + + if USE_4G_APP + source "$APP_DIR/Applications/connection_app/4g_app/Kconfig" + endif + endif endmenu diff --git a/APP_Framework/Applications/connection_app/Makefile b/APP_Framework/Applications/connection_app/Makefile index 11c22f202..19bc86e3c 100755 --- a/APP_Framework/Applications/connection_app/Makefile +++ b/APP_Framework/Applications/connection_app/Makefile @@ -7,11 +7,11 @@ endif ifeq ($(CONFIG_ADD_XIZI_FEATURES),y) - ifeq ($(CONFIG_CONNECTION_ADAPTER_4G),y) + ifeq ($(CONFIG_USE_4G_APP),y) SRC_DIR += 4g_app endif - ifeq ($(CONFIG_RESOURCES_LWIP),y) + ifeq ($(CONFIG_SOCKET_DEMO),y) SRC_DIR += socket_demo endif diff --git a/APP_Framework/Applications/main.c b/APP_Framework/Applications/main.c index 8ab957614..163668601 100644 --- a/APP_Framework/Applications/main.c +++ b/APP_Framework/Applications/main.c @@ -11,10 +11,11 @@ */ #include -#include #include #include +#include + // #include #include @@ -25,18 +26,14 @@ static int watchdog_fd; static pthread_t watch_dog_task; static pthread_mutex_t task_status_lock; -// receiveDataFromSENSORTask(0x01)、sendDataToServerTask(0x02) - #define TASK_NUM 0x03 +// receiveDataFromEthTask(0x01)、sendDataToServerTask(0x02) +#define TASK_NUM 0x03 -void SetTaskStatus(uint8_t task_idx) { - atomic_fetch_or(&task_status, task_idx); -} +void SetTaskStatus(uint8_t task_idx) { atomic_fetch_or(&task_status, task_idx); } -static void ClearTaskStatus() { - atomic_store(&task_status, 0); -} +static void ClearTaskStatus() { atomic_store(&task_status, 0); } -static void* FeedWatchdogTask(void* parameter) { +static void *FeedWatchdogTask(void *parameter) { uint8_t status = 0; while (1) { @@ -62,7 +59,7 @@ int WatchdogInit(uint16_t timeout) { int ret = 0; PrivMutexCreate(&task_status_lock, 0); - watchdog_fd = PrivOpen("/dev/wdt0_dev", O_RDWR); + watchdog_fd = PrivOpen("/dev/" WDT_0_DEVICE_NAME_0, O_RDWR); /* set watchdog timeout time */ struct PrivIoctlCfg ioctl_cfg; diff --git a/APP_Framework/Framework/connection/lora/e220/Makefile b/APP_Framework/Framework/connection/lora/e220/Makefile index 0b158cdf9..2a0ba3a75 100644 --- a/APP_Framework/Framework/connection/lora/e220/Makefile +++ b/APP_Framework/Framework/connection/lora/e220/Makefile @@ -10,4 +10,4 @@ ifeq ($(CONFIG_ADD_XIZI_FEATURES),y) SRC_FILES := e220.c include $(KERNEL_ROOT)/compiler.mk -endif \ No newline at end of file +endif