commit
0f20457105
|
@ -122,12 +122,11 @@ void mqttCleanUpSystem() {
|
||||||
}
|
}
|
||||||
|
|
||||||
void mqtt_PublishCallback(void** unused, struct mqtt_response_publish* published) {
|
void mqtt_PublishCallback(void** unused, struct mqtt_response_publish* published) {
|
||||||
mqttPrint("mqtt_PublishCallback");
|
|
||||||
/* note that published->topic_name is NOT null-terminated (here we'll change it to a c-string) */
|
/* note that published->topic_name is NOT null-terminated (here we'll change it to a c-string) */
|
||||||
char* topic_name = (char*)malloc(published->topic_name_size + 1);
|
char* topic_name = (char*)malloc(published->topic_name_size + 1);
|
||||||
memcpy(topic_name, published->topic_name, published->topic_name_size);
|
memcpy(topic_name, published->topic_name, published->topic_name_size);
|
||||||
topic_name[published->topic_name_size] = '\0';
|
topic_name[published->topic_name_size] = '\0';
|
||||||
mqttPrint("Received publish('%s'): %s", topic_name, (const char*)published->application_message);
|
mqttPrint("received publish('%s'): %s", topic_name, (const char*)published->application_message);
|
||||||
char _token[128] = {0};
|
char _token[128] = {0};
|
||||||
char _dbname[128] = {0};
|
char _dbname[128] = {0};
|
||||||
char _tablename[128] = {0};
|
char _tablename[128] = {0};
|
||||||
|
@ -166,12 +165,12 @@ void* mqttClientRefresher(void* client) {
|
||||||
mqtt_sync((struct mqtt_client*)client);
|
mqtt_sync((struct mqtt_client*)client);
|
||||||
taosMsleep(100);
|
taosMsleep(100);
|
||||||
}
|
}
|
||||||
mqttPrint("Exit mqttClientRefresher");
|
mqttTrace("quit refresher");
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
void mqttCleanup(int status, int sockfd, pthread_t* client_daemon) {
|
void mqttCleanup(int status, int sockfd, pthread_t* client_daemon) {
|
||||||
mqttPrint("mqttCleanup");
|
mqttPrint("clean up mqtt module");
|
||||||
if (sockfd != -1) close(sockfd);
|
if (sockfd != -1) close(sockfd);
|
||||||
if (client_daemon != NULL) pthread_cancel(*client_daemon);
|
if (client_daemon != NULL) pthread_cancel(*client_daemon);
|
||||||
}
|
}
|
||||||
|
@ -197,7 +196,7 @@ void mqttQueryInsertCallback(void* param, TAOS_RES* result, int32_t code) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void mqttReconnectClient(struct mqtt_client* client, void** reconnect_state_vptr) {
|
void mqttReconnectClient(struct mqtt_client* client, void** reconnect_state_vptr) {
|
||||||
mqttPrint("mqttReconnectClient");
|
mqttPrint("reconnect client");
|
||||||
struct reconnect_state_t* reconnect_state = *((struct reconnect_state_t**)reconnect_state_vptr);
|
struct reconnect_state_t* reconnect_state = *((struct reconnect_state_t**)reconnect_state_vptr);
|
||||||
|
|
||||||
/* Close the clients socket if this isn't the initial reconnect call */
|
/* Close the clients socket if this isn't the initial reconnect call */
|
||||||
|
@ -213,7 +212,7 @@ void mqttReconnectClient(struct mqtt_client* client, void** reconnect_state_vptr
|
||||||
/* Open a new socket. */
|
/* Open a new socket. */
|
||||||
int sockfd = open_nb_socket(reconnect_state->hostname, reconnect_state->port);
|
int sockfd = open_nb_socket(reconnect_state->hostname, reconnect_state->port);
|
||||||
if (sockfd == -1) {
|
if (sockfd == -1) {
|
||||||
mqttError("Failed to open socket: ");
|
mqttError("failed to open socket: ");
|
||||||
mqttCleanup(EXIT_FAILURE, sockfd, NULL);
|
mqttCleanup(EXIT_FAILURE, sockfd, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue