Reconnect Mqtt Broker when connection fails

This commit is contained in:
MysticBoy 2020-05-24 12:54:48 +08:00
parent df503b11ab
commit cb9ce34d06
3 changed files with 18 additions and 10 deletions

View File

@ -30,7 +30,7 @@ extern "C" {
char split(char str[], char delims[], char** p_p_cmd_part, int max); char split(char str[], char delims[], char** p_p_cmd_part, int max);
void mqttConnnectLost(void* context, char* cause); void mqttConnnectLost(void* context, char* cause);
int mqttMessageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message); int mqttMessageArrived(void* context, char* topicName, int topicLen, MQTTAsync_message* message);
void mqtt_query_insert_callback(void* param, TAOS_RES* result, int32_t code); void mqttQueryInsertCallback(void* param, TAOS_RES* result, int32_t code);
void onDisconnectFailure(void* context, MQTTAsync_failureData* response); void onDisconnectFailure(void* context, MQTTAsync_failureData* response);
void onDisconnect(void* context, MQTTAsync_successData* response); void onDisconnect(void* context, MQTTAsync_successData* response);
void onSubscribe(void* context, MQTTAsync_successData* response); void onSubscribe(void* context, MQTTAsync_successData* response);

View File

@ -14,7 +14,7 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mqttUitl.h" #include "mqttPayload.h"
#include "cJSON.h" #include "cJSON.h"
#include "string.h" #include "string.h"
#include "taos.h" #include "taos.h"
@ -58,7 +58,7 @@ char* converJsonToSql(char* json, char* _dbname, char* _tablename) {
} }
} }
cJSON_free(jPlayload); cJSON_free(jPlayload);
char _sql[102400] = {0}; char* _sql = calloc(0, strlen(_names) + strlen(_values) + strlen(_dbname) + strlen(_tablename) + 1024);
sprintf(_sql, "INSERT INTO %s.%s (%s) VALUES(%s);", _dbname, _tablename, _names, _values); sprintf(_sql, "INSERT INTO %s.%s (%s) VALUES(%s);", _dbname, _tablename, _names, _values);
return _sql; return _sql;
} }

View File

@ -27,7 +27,7 @@
#include "tsocket.h" #include "tsocket.h"
#include "ttimer.h" #include "ttimer.h"
#include "mqttInit.h" #include "mqttInit.h"
#include "mqttPlyload.h" #include "mqttPayload.h"
MQTTAsync client; MQTTAsync client;
MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer; MQTTAsync_connectOptions conn_opts = MQTTAsync_connectOptions_initializer;
@ -78,10 +78,12 @@ int mqttMessageArrived(void* context, char* topicName, int topicLen, MQTTAsync_m
strncpy(_tablename, p_p_cmd_part[3], 127); strncpy(_tablename, p_p_cmd_part[3], 127);
mqttPrint("part count=%d,access token:%s,database name:%s, table name:%s", part_index, _token, _dbname, mqttPrint("part count=%d,access token:%s,database name:%s, table name:%s", part_index, _token, _dbname,
_tablename); _tablename);
char* sql = converJsonToSql((char*)message->payload, _dbname, _tablename);
if (mqtt_conn != NULL) { if (mqtt_conn != NULL) {
char* _sql = converJsonToSql((char*)message->payload, _dbname, _tablename);
mqttPrint("query:%s", _sql); mqttPrint("query:%s", _sql);
taos_query_a(mqtt_conn, _sql, mqttQueryInsertCallback, &client); taos_query_a(mqtt_conn, _sql, mqttQueryInsertCallback, &client);
free(_sql);
} }
} }
} }
@ -100,12 +102,12 @@ void mqttQueryInsertCallback(void* param, TAOS_RES* result, int32_t code) {
} }
void onDisconnectFailure(void* context, MQTTAsync_failureData* response) { void onDisconnectFailure(void* context, MQTTAsync_failureData* response) {
mqttError("Disconnect failed, rc %d\n", response->code); mqttError("Disconnect failed, rc %d", response->code);
disc_finished = 1; disc_finished = 1;
} }
void onDisconnect(void* context, MQTTAsync_successData* response) { void onDisconnect(void* context, MQTTAsync_successData* response) {
mqttError("Successful disconnection\n"); mqttError("Successful disconnection");
if (mqtt_conn != NULL) { if (mqtt_conn != NULL) {
taos_close(mqtt_conn); taos_close(mqtt_conn);
mqtt_conn = NULL; mqtt_conn = NULL;
@ -114,18 +116,24 @@ void onDisconnect(void* context, MQTTAsync_successData* response) {
} }
void onSubscribe(void* context, MQTTAsync_successData* response) { void onSubscribe(void* context, MQTTAsync_successData* response) {
mqttPrint("Subscribe succeeded\n"); mqttPrint("Subscribe succeeded");
subscribed = 1; subscribed = 1;
} }
void onSubscribeFailure(void* context, MQTTAsync_failureData* response) { void onSubscribeFailure(void* context, MQTTAsync_failureData* response) {
mqttError("Subscribe failed, rc %d\n", response->code); mqttError("Subscribe failed, rc %d", response->code);
finished = 1; finished = 1;
} }
void onConnectFailure(void* context, MQTTAsync_failureData* response) { void onConnectFailure(void* context, MQTTAsync_failureData* response) {
mqttError("Connect failed, rc %d\n", response->code); mqttError("Connect failed, rc %d,,Retry later", response->code);
finished = 1; finished = 1;
taosMsleep(1000);
int rc = 0;
if ((rc = MQTTAsync_connect(client, &conn_opts)) != MQTTASYNC_SUCCESS) {
mqttError("Failed to start connect, return code %d", rc);
finished = 1;
}
} }
void onConnect(void* context, MQTTAsync_successData* response) { void onConnect(void* context, MQTTAsync_successData* response) {