modify for mqtt start and cleanup
This commit is contained in:
parent
3671647cd5
commit
81530a9b04
|
@ -30,86 +30,90 @@
|
||||||
#include "tsocket.h"
|
#include "tsocket.h"
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
#include "mqttSystem.h"
|
#include "mqttSystem.h"
|
||||||
struct mqtt_client client;
|
struct mqtt_client mqttClient = {0};
|
||||||
pthread_t client_daemon;
|
pthread_t clientDaemonThread = {0};
|
||||||
void* mqtt_conn;
|
void* mqttConnect=NULL;
|
||||||
struct reconnect_state_t recnt_status;
|
struct reconnect_state_t recntStatus = {0};
|
||||||
char* topicPath;
|
char* topicPath=NULL;
|
||||||
int isStop = 1;
|
int mttIsRuning = 1;
|
||||||
|
|
||||||
int32_t mqttInitSystem() {
|
int32_t mqttInitSystem() {
|
||||||
int rc = 0;
|
int rc = 0;
|
||||||
|
uint8_t sendbuf[2048];
|
||||||
|
uint8_t recvbuf[1024];
|
||||||
|
recntStatus.sendbuf = sendbuf;
|
||||||
|
recntStatus.sendbufsz = sizeof(sendbuf);
|
||||||
|
recntStatus.recvbuf = recvbuf;
|
||||||
|
recntStatus.recvbufsz = sizeof(recvbuf);
|
||||||
|
char* url = tsMqttBrokerAddress;
|
||||||
|
recntStatus.user_name = strstr(url, "@") != NULL ? strbetween(url, "//", ":") : NULL;
|
||||||
|
recntStatus.password = strstr(url, "@") != NULL ? strbetween(strstr(url, recntStatus.user_name), ":", "@") : NULL;
|
||||||
|
|
||||||
|
if (strstr(url, "@") != NULL) {
|
||||||
|
recntStatus.hostname = strbetween(url, "@", ":");
|
||||||
|
} else if (strstr(strstr(url, "://") + 3, ":") != NULL) {
|
||||||
|
recntStatus.hostname = strbetween(url, "//", ":");
|
||||||
|
|
||||||
|
} else {
|
||||||
|
recntStatus.hostname = strbetween(url, "//", "/");
|
||||||
|
}
|
||||||
|
|
||||||
|
char* _begin_hostname = strstr(url, recntStatus.hostname);
|
||||||
|
if (strstr(_begin_hostname, ":") != NULL) {
|
||||||
|
recntStatus.port = strbetween(_begin_hostname, ":", "/");
|
||||||
|
} else {
|
||||||
|
recntStatus.port = strbetween("'1883'", "'", "'");
|
||||||
|
}
|
||||||
|
|
||||||
|
topicPath = strbetween(strstr(url, strstr(_begin_hostname, ":") != NULL ? recntStatus.port : recntStatus.hostname),
|
||||||
|
"/", "/");
|
||||||
|
char* _topic = "+/+/+/";
|
||||||
|
int _tpsize = strlen(topicPath) + strlen(_topic) + 1;
|
||||||
|
recntStatus.topic = calloc(1, _tpsize);
|
||||||
|
sprintf(recntStatus.topic, "/%s/%s", topicPath, _topic);
|
||||||
|
recntStatus.client_id = strlen(tsMqttBrokerClientId) < 3 ? tsMqttBrokerClientId : "taos_mqtt";
|
||||||
|
mqttConnect = NULL;
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mqttStartSystem() {
|
int32_t mqttStartSystem() {
|
||||||
int rc = 0;
|
int rc = 0;
|
||||||
uint8_t sendbuf[2048];
|
if (recntStatus.user_name != NULL && recntStatus.password != NULL) {
|
||||||
uint8_t recvbuf[1024];
|
mqttPrint("connecting to mqtt://%s:%s@%s:%s/%s/", recntStatus.user_name, recntStatus.password,
|
||||||
recnt_status.sendbuf = sendbuf;
|
recntStatus.hostname, recntStatus.port, topicPath);
|
||||||
recnt_status.sendbufsz = sizeof(sendbuf);
|
}
|
||||||
recnt_status.recvbuf = recvbuf;
|
else if (recntStatus.user_name != NULL && recntStatus.password == NULL)
|
||||||
recnt_status.recvbufsz = sizeof(recvbuf);
|
{
|
||||||
char* url = tsMqttBrokerAddress;
|
mqttPrint("connecting to mqtt://%s@%s:%s/%s/", recntStatus.user_name,recntStatus.hostname, recntStatus.port, topicPath);
|
||||||
recnt_status.user_name = strstr(url, "@") != NULL ? strbetween(url, "//", ":") : NULL;
|
|
||||||
recnt_status.password = strstr(url, "@") != NULL ? strbetween(strstr(url, recnt_status.user_name), ":", "@") : NULL;
|
|
||||||
|
|
||||||
if (strstr(url, "@") != NULL) {
|
|
||||||
recnt_status.hostname = strbetween(url, "@", ":");
|
|
||||||
} else if (strstr(strstr(url, "://") + 3, ":") != NULL) {
|
|
||||||
recnt_status.hostname = strbetween(url, "//", ":");
|
|
||||||
|
|
||||||
} else {
|
|
||||||
recnt_status.hostname = strbetween(url, "//", "/");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
char* _begin_hostname = strstr(url, recnt_status.hostname);
|
mqtt_init_reconnect(&mqttClient, mqttReconnectClient, &recntStatus, mqtt_PublishCallback);
|
||||||
if (strstr(_begin_hostname, ":") != NULL) {
|
if (pthread_create(&clientDaemonThread, NULL, mqttClientRefresher, &mqttClient)) {
|
||||||
recnt_status.port = strbetween(_begin_hostname, ":", "/");
|
|
||||||
} else {
|
|
||||||
recnt_status.port = strbetween("'1883'", "'", "'");
|
|
||||||
}
|
|
||||||
|
|
||||||
topicPath = strbetween(strstr(url, strstr(_begin_hostname, ":") != NULL ? recnt_status.port : recnt_status.hostname),
|
|
||||||
"/", "/");
|
|
||||||
char* _topic = "+/+/+/";
|
|
||||||
int _tpsize = strlen(topicPath) + strlen(_topic) + 1;
|
|
||||||
recnt_status.topic = calloc(1, _tpsize);
|
|
||||||
sprintf(recnt_status.topic, "/%s/%s", topicPath, _topic);
|
|
||||||
recnt_status.client_id = strlen(tsMqttBrokerClientId) < 3 ? tsMqttBrokerClientId : "taos_mqtt";
|
|
||||||
|
|
||||||
taos_init();
|
|
||||||
mqttPrint("mqttInitSystem mqtt://%s:%s@%s:%s/%s/", recnt_status.user_name, recnt_status.password,
|
|
||||||
recnt_status.hostname, recnt_status.port, topicPath);
|
|
||||||
mqtt_conn = NULL;
|
|
||||||
mqtt_init_reconnect(&client, mqttReconnectClient, &recnt_status, mqtt_PublishCallback);
|
|
||||||
if (pthread_create(&client_daemon, NULL, mqttClientRefresher, &client)) {
|
|
||||||
mqttError("Failed to start client daemon.");
|
mqttError("Failed to start client daemon.");
|
||||||
mqttCleanup(EXIT_FAILURE, -1, NULL);
|
mqttCleanup(EXIT_FAILURE, -1, NULL);
|
||||||
rc = -1;
|
rc = -1;
|
||||||
|
} else {
|
||||||
|
mqttPrint("listening for '%s' messages.", recntStatus.topic);
|
||||||
}
|
}
|
||||||
mqttPrint("listening for '%s' messages.", recnt_status.topic);
|
|
||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
void mqttStopSystem() {
|
void mqttStopSystem() {
|
||||||
mqttError("Injecting error: \"MQTT_ERROR_SOCKET_ERROR\"");
|
mqttClient.error = MQTT_ERROR_SOCKET_ERROR;
|
||||||
client.error = MQTT_ERROR_SOCKET_ERROR;
|
mttIsRuning = 0;
|
||||||
isStop = 0;
|
|
||||||
usleep(300000U);
|
usleep(300000U);
|
||||||
|
mqttCleanup(EXIT_SUCCESS, mqttClient.socketfd, &clientDaemonThread);
|
||||||
|
mqttPrint("mqtt is stoped");
|
||||||
}
|
}
|
||||||
|
|
||||||
void mqttCleanUpSystem() {
|
void mqttCleanUpSystem() {
|
||||||
mqttPrint("starting to clean up mqtt");
|
mqttPrint("starting to clean up mqtt");
|
||||||
mqttCleanup(EXIT_SUCCESS, client.socketfd, &client_daemon);
|
free(recntStatus.user_name);
|
||||||
taos_cleanup(mqtt_conn);
|
free(recntStatus.password);
|
||||||
free(recnt_status.user_name);
|
free(recntStatus.hostname);
|
||||||
free(recnt_status.password);
|
free(recntStatus.port);
|
||||||
free(recnt_status.hostname);
|
free(recntStatus.topic);
|
||||||
free(recnt_status.port);
|
|
||||||
free(recnt_status.topic);
|
|
||||||
free(topicPath);
|
free(topicPath);
|
||||||
|
|
||||||
mqttPrint("mqtt is cleaned up");
|
mqttPrint("mqtt is cleaned up");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -123,9 +127,9 @@ void mqtt_PublishCallback(void** unused, struct mqtt_response_publish* published
|
||||||
char _token[128] = {0};
|
char _token[128] = {0};
|
||||||
char _dbname[128] = {0};
|
char _dbname[128] = {0};
|
||||||
char _tablename[128] = {0};
|
char _tablename[128] = {0};
|
||||||
if (mqtt_conn == NULL) {
|
if (mqttConnect == NULL) {
|
||||||
mqttPrint("connect database");
|
mqttPrint("connect database");
|
||||||
taos_connect_a(NULL, "_root", tsInternalPass, "", 0, mqttInitConnCb, &client, &mqtt_conn);
|
taos_connect_a(NULL, "_root", tsInternalPass, "", 0, mqttInitConnCb, &mqttClient, &mqttConnect);
|
||||||
}
|
}
|
||||||
if (topic_name[1]=='/' && strncmp((char*)&topic_name[1], topicPath, strlen(topicPath)) == 0) {
|
if (topic_name[1]=='/' && strncmp((char*)&topic_name[1], topicPath, strlen(topicPath)) == 0) {
|
||||||
char* p_p_cmd_part[5] = {0};
|
char* p_p_cmd_part[5] = {0};
|
||||||
|
@ -141,10 +145,10 @@ void mqtt_PublishCallback(void** unused, struct mqtt_response_publish* published
|
||||||
mqttPrint("part count=%d,access token:%s,database name:%s, table name:%s", part_index, _token, _dbname,
|
mqttPrint("part count=%d,access token:%s,database name:%s, table name:%s", part_index, _token, _dbname,
|
||||||
_tablename);
|
_tablename);
|
||||||
|
|
||||||
if (mqtt_conn != NULL) {
|
if (mqttConnect != NULL) {
|
||||||
char* _sql = converJsonToSql((char*)published->application_message, _dbname, _tablename);
|
char* _sql = converJsonToSql((char*)published->application_message, _dbname, _tablename);
|
||||||
mqttPrint("query:%s", _sql);
|
mqttPrint("query:%s", _sql);
|
||||||
taos_query_a(mqtt_conn, _sql, mqttQueryInsertCallback, &client);
|
taos_query_a(mqttConnect, _sql, mqttQueryInsertCallback, &mqttClient);
|
||||||
mqttPrint("free sql:%s", _sql);
|
mqttPrint("free sql:%s", _sql);
|
||||||
free(_sql);
|
free(_sql);
|
||||||
}
|
}
|
||||||
|
@ -154,7 +158,7 @@ void mqtt_PublishCallback(void** unused, struct mqtt_response_publish* published
|
||||||
}
|
}
|
||||||
|
|
||||||
void* mqttClientRefresher(void* client) {
|
void* mqttClientRefresher(void* client) {
|
||||||
while (isStop) {
|
while (mttIsRuning) {
|
||||||
mqtt_sync((struct mqtt_client*)client);
|
mqtt_sync((struct mqtt_client*)client);
|
||||||
usleep(100000U);
|
usleep(100000U);
|
||||||
}
|
}
|
||||||
|
@ -171,8 +175,8 @@ void mqttCleanup(int status, int sockfd, pthread_t* client_daemon) {
|
||||||
void mqttInitConnCb(void* param, TAOS_RES* result, int32_t code) {
|
void mqttInitConnCb(void* param, TAOS_RES* result, int32_t code) {
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
mqttError("mqtt:%d, connect to database failed, reason:%s", code, tstrerror(code));
|
mqttError("mqtt:%d, connect to database failed, reason:%s", code, tstrerror(code));
|
||||||
taos_close(mqtt_conn);
|
taos_close(mqttConnect);
|
||||||
mqtt_conn = NULL;
|
mqttConnect = NULL;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
mqttTrace("mqtt:%d, connect to database success, reason:%s", code, tstrerror(code));
|
mqttTrace("mqtt:%d, connect to database success, reason:%s", code, tstrerror(code));
|
||||||
|
|
Loading…
Reference in New Issue