diff --git a/src/plugins/mqtt/inc/mqttInit.h b/src/plugins/mqtt/inc/mqttInit.h index 61f7ef9a45..5dbd62789b 100644 --- a/src/plugins/mqtt/inc/mqttInit.h +++ b/src/plugins/mqtt/inc/mqttInit.h @@ -77,9 +77,6 @@ void* mqttClientRefresher(void* client); void mqttCleanup(int status, int sockfd, pthread_t* client_daemon); void mqttInitConnCb(void* param, TAOS_RES* result, int32_t code); void mqttQueryInsertCallback(void* param, TAOS_RES* result, int32_t code); -#define CLIENTID "taos" -#define TOPIC "+/+/+/" // path//// -#define PAYLOAD "Hello World!" #define QOS 1 #define TIMEOUT 10000L diff --git a/src/plugins/mqtt/src/mqttSystem.c b/src/plugins/mqtt/src/mqttSystem.c index 45a8972d80..030d950a79 100644 --- a/src/plugins/mqtt/src/mqttSystem.c +++ b/src/plugins/mqtt/src/mqttSystem.c @@ -35,6 +35,7 @@ pthread_t client_daemon; void* mqtt_conn; struct reconnect_state_t recnt_status; char* topicPath; +int isStop = 1; int32_t mqttInitSystem() { int rc = 0; uint8_t sendbuf[2048]; @@ -65,9 +66,10 @@ int32_t mqttInitSystem() { topicPath = strbetween(strstr(url, strstr(_begin_hostname, ":") != NULL ? recnt_status.port : recnt_status.hostname), "/", "/"); - int _tpsize = strlen(topicPath) + strlen(TOPIC) + 1; + char* _topic = "+/+/+/"; + int _tpsize = strlen(topicPath) + strlen(_topic) + 1; recnt_status.topic = calloc(1, _tpsize); - snprintf(recnt_status.topic, _tpsize-1, "/%s/" TOPIC, topicPath); + sprintf(recnt_status.topic, "/%s/%s", topicPath, _topic); recnt_status.client_id = strlen(tsMqttBrokerClientId)<3? tsMqttBrokerClientId:"taos_mqtt"; @@ -86,17 +88,19 @@ int32_t mqttStartSystem() { mqttCleanup(EXIT_FAILURE, -1, NULL); rc = -1; } - mqttPrint("listening for '%s' messages.", TOPIC); + mqttPrint("listening for '%s' messages.", recnt_status.topic); return rc; } void mqttStopSystem() { mqttError("Injecting error: \"MQTT_ERROR_SOCKET_ERROR\""); client.error = MQTT_ERROR_SOCKET_ERROR; + isStop = 0; + usleep(300000U); } void mqttCleanUpSystem() { - mqttPrint("mqttCleanUpSystem"); + mqttPrint("starting to clean up mqtt"); mqttCleanup(EXIT_SUCCESS, client.socketfd, &client_daemon); taos_cleanup(mqtt_conn); free(recnt_status.user_name); @@ -105,6 +109,8 @@ void mqttCleanUpSystem() { free(recnt_status.port); free(recnt_status.topic); free(topicPath); + + mqttPrint("mqtt is cleaned up"); } void mqtt_PublishCallback(void** unused, struct mqtt_response_publish* published) { @@ -119,15 +125,15 @@ void mqtt_PublishCallback(void** unused, struct mqtt_response_publish* published char _tablename[128] = {0}; if (mqtt_conn == NULL) { mqttPrint("connect database"); - taos_connect_a(NULL, "monitor", tsInternalPass, "", 0, mqttInitConnCb, &client, &mqtt_conn); + taos_connect_a(NULL, "_root", tsInternalPass, "", 0, mqttInitConnCb, &client, &mqtt_conn); } - if (strncmp(topic_name, "/taos/", 6) == 0) { + if (topic_name[1]=='/' && strncmp((char*)&topic_name[1], topicPath, strlen(topicPath)) == 0) { char* p_p_cmd_part[5] = {0}; char copystr[1024] = {0}; strncpy(copystr, topic_name, MIN(1024, published->topic_name_size)); char part_index = split(copystr, "/", p_p_cmd_part, 10); if (part_index < 4) { - mqttError("The topic %s is't format '%s'.", topic_name, TOPIC); + mqttError("The topic %s is't format '/path/token/dbname/table name/'. for expmle: '/taos/token/db/t'", topic_name); } else { strncpy(_token, p_p_cmd_part[1], 127); strncpy(_dbname, p_p_cmd_part[2], 127); @@ -148,10 +154,11 @@ void mqtt_PublishCallback(void** unused, struct mqtt_response_publish* published } void* mqttClientRefresher(void* client) { - while (1) { + while (isStop) { mqtt_sync((struct mqtt_client*)client); usleep(100000U); } + mqttPrint("Exit mqttClientRefresher"); return NULL; }