diff --git a/src/plugins/mqtt/inc/mqttInit.h b/src/plugins/mqtt/inc/mqttInit.h index 9534a786d9..a183cd2f5f 100644 --- a/src/plugins/mqtt/inc/mqttInit.h +++ b/src/plugins/mqtt/inc/mqttInit.h @@ -30,7 +30,7 @@ extern "C" { char split(char str[], char delims[], char** p_p_cmd_part, int max); void mqttConnnectLost(void* context, char* cause); int mqttMessageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message); -void mqtt_query_insert_callback(void* param, TAOS_RES* result, int32_t code); +void mqttQueryInsertCallback(void* param, TAOS_RES* result, int32_t code); void onDisconnectFailure(void* context, MQTTAsync_failureData* response); void onDisconnect(void* context, MQTTAsync_successData* response); void onSubscribe(void* context, MQTTAsync_successData* response); diff --git a/src/plugins/mqtt/src/mqttPayload.c b/src/plugins/mqtt/src/mqttPayload.c index 3f4cb5b3d9..b8f5ec3135 100644 --- a/src/plugins/mqtt/src/mqttPayload.c +++ b/src/plugins/mqtt/src/mqttPayload.c @@ -14,7 +14,7 @@ */ #define _DEFAULT_SOURCE -#include "mqttUitl.h" +#include "mqttPayload.h" #include "cJSON.h" #include "string.h" #include "taos.h" @@ -58,7 +58,7 @@ char* converJsonToSql(char* json, char* _dbname, char* _tablename) { } } cJSON_free(jPlayload); - char _sql[102400] = {0}; + char* _sql = calloc(0, strlen(_names) + strlen(_values) + strlen(_dbname) + strlen(_tablename) + 1024); sprintf(_sql, "INSERT INTO %s.%s (%s) VALUES(%s);", _dbname, _tablename, _names, _values); return _sql; } \ No newline at end of file diff --git a/src/plugins/mqtt/src/mqttSystem.c b/src/plugins/mqtt/src/mqttSystem.c index 6f3bd0aee8..97fbcfba80 100644 --- a/src/plugins/mqtt/src/mqttSystem.c +++ b/src/plugins/mqtt/src/mqttSystem.c @@ -27,7 +27,7 @@ #include "tsocket.h" #include "ttimer.h" #include "mqttInit.h" -#include "mqttPlyload.h" +#include "mqttPayload.h" MQTTAsync client; MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; @@ -78,10 +78,12 @@ int mqttMessageArrived(void* context, char* topicName, int topicLen, MQTTAsync_m strncpy(_tablename, p_p_cmd_part[3], 127); mqttPrint("part count=%d,access token:%s,database name:%s, table name:%s", part_index, _token, _dbname, _tablename); - char* sql = converJsonToSql((char*)message->payload, _dbname, _tablename); + if (mqtt_conn != NULL) { + char* _sql = converJsonToSql((char*)message->payload, _dbname, _tablename); mqttPrint("query:%s", _sql); taos_query_a(mqtt_conn, _sql, mqttQueryInsertCallback, &client); + free(_sql); } } } @@ -100,12 +102,12 @@ void mqttQueryInsertCallback(void* param, TAOS_RES* result, int32_t code) { } void onDisconnectFailure(void* context, MQTTAsync_failureData* response) { - mqttError("Disconnect failed, rc %d\n", response->code); + mqttError("Disconnect failed, rc %d", response->code); disc_finished = 1; } void onDisconnect(void* context, MQTTAsync_successData* response) { - mqttError("Successful disconnection\n"); + mqttError("Successful disconnection"); if (mqtt_conn != NULL) { taos_close(mqtt_conn); mqtt_conn = NULL; @@ -114,18 +116,24 @@ void onDisconnect(void* context, MQTTAsync_successData* response) { } void onSubscribe(void* context, MQTTAsync_successData* response) { - mqttPrint("Subscribe succeeded\n"); + mqttPrint("Subscribe succeeded"); subscribed = 1; } void onSubscribeFailure(void* context, MQTTAsync_failureData* response) { - mqttError("Subscribe failed, rc %d\n", response->code); + mqttError("Subscribe failed, rc %d", response->code); finished = 1; } void onConnectFailure(void* context, MQTTAsync_failureData* response) { - mqttError("Connect failed, rc %d\n", response->code); + mqttError("Connect failed, rc %d,,Retry later", response->code); finished = 1; + taosMsleep(1000); + int rc = 0; + if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) { + mqttError("Failed to start connect, return code %d", rc); + finished = 1; + } } void onConnect(void* context, MQTTAsync_successData* response) {