From 42f83c3cb0f5000b22647528064c95f6c8d30147 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 3 Sep 2020 15:35:54 +0000 Subject: [PATCH] TD-1310 minor changes --- src/plugins/mqtt/src/mqttPayload.c | 12 ++++++++---- src/plugins/mqtt/src/mqttSystem.c | 10 ++++++---- tests/script/general/connection/mqtt.sim | 6 +++--- 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/src/plugins/mqtt/src/mqttPayload.c b/src/plugins/mqtt/src/mqttPayload.c index ab3cd4c633..1af8b02fad 100644 --- a/src/plugins/mqtt/src/mqttPayload.c +++ b/src/plugins/mqtt/src/mqttPayload.c @@ -55,6 +55,9 @@ } */ +// send msg cmd +// mosquitto_pub -h test.mosquitto.org -t "/test" -m '{"timestamp": 1599121290,"gateway": {"name": "AcuLink 810 Gateway","model": "AcuLink810-868","serial": "S8P20200207"},"device": {"name": "Acuvim L V3 .221","model": "Acuvim-L-V3","serial": "221","online": true,"readings": [{"param": "Freq_Hz","value": "59.977539","unit": "Hz"},{"param": "Va_V","value": "122.002907","unit": "V"},{"param": "DI4","value": "5.000000","unit": ""}]}}' + /* * This is an example, this function needs to be implemented in order to parse the json file into a sql statement * Note that you need to create a super table and database before writing data @@ -65,6 +68,7 @@ char* mqttConverJsonToSql(char* json, int maxSize) { // const int32_t maxSize = 10240; + maxSize *= 5; char* sql = malloc(maxSize); cJSON* root = cJSON_Parse(json); @@ -139,10 +143,10 @@ char* mqttConverJsonToSql(char* json, int maxSize) { goto MQTT_PARSE_OVER; } - len += snprintf(sql, maxSize - len, - " mqttdb.%s using mqttdb.devices tags('%s', '%s', '%s', '%s', '%s') values(%" PRId64 ", %s)", - serial->valuestring, name->valuestring, model->valuestring, serial->valuestring, param->valuestring, - unit->valuestring, timestamp->valueint * 1000, value->valuestring); + len += snprintf(sql + len, maxSize - len, + " mqttdb.serial_%s_%s using mqttdb.devices tags('%s', '%s', '%s', '%s', '%s') values(%" PRId64 ", %s)", + serial->valuestring, param->valuestring, name->valuestring, model->valuestring, serial->valuestring, + param->valuestring, unit->valuestring, timestamp->valueint * 1000, value->valuestring); } cJSON_free(root); diff --git a/src/plugins/mqtt/src/mqttSystem.c b/src/plugins/mqtt/src/mqttSystem.c index 0779fd6d72..8079cedb27 100644 --- a/src/plugins/mqtt/src/mqttSystem.c +++ b/src/plugins/mqtt/src/mqttSystem.c @@ -74,10 +74,10 @@ void mqttPublishCallback(void** unused, struct mqtt_response_publish* published) if (tsMqttConnect == NULL) { tsMqttConnect = taos_connect(NULL, "_root", tsInternalPass, "", 0); if (tsMqttConnect == NULL) { - mqttError("failed to connect tdengine"); + mqttError("failed to connect to tdengine"); return; } else { - mqttInfo("successed to connect tdengine"); + mqttInfo("successfully connected to the tdengine"); } } @@ -88,7 +88,9 @@ void mqttPublishCallback(void** unused, struct mqtt_response_publish* published) void* res = taos_query(tsMqttConnect, sql); int code = taos_errno(res); if (code != 0) { - mqttError("failed to exec sql%s", sql); + mqttError("failed to exec sql:%s", sql); + } else { + mqttDebug("successfully to exec sql:%s", sql); } taos_free_result(res); } else { @@ -136,6 +138,6 @@ void mqttReconnectClient(struct mqtt_client* client, void** unused) { } mqtt_reinit(client, sockfd, tsMqttStatus.sendbuf, tsMqttStatus.sendbufsz, tsMqttStatus.recvbuf, tsMqttStatus.recvbufsz); - mqtt_connect(client, "tsMqttClientId", NULL, NULL, 0, tsMqttUser, tsMqttPass, MQTT_CONNECT_CLEAN_SESSION, 400); + mqtt_connect(client, tsMqttClientId, NULL, NULL, 0, tsMqttUser, tsMqttPass, MQTT_CONNECT_CLEAN_SESSION, 400); mqtt_subscribe(client, tsMqttTopic, 0); } \ No newline at end of file diff --git a/tests/script/general/connection/mqtt.sim b/tests/script/general/connection/mqtt.sim index f003252c5a..4b291f91ea 100644 --- a/tests/script/general/connection/mqtt.sim +++ b/tests/script/general/connection/mqtt.sim @@ -10,10 +10,10 @@ system sh/cfg.sh -n dnode1 -c mqtt -v 1 system sh/exec.sh -n dnode1 -s start -sql sleep 3000 +sleep 3000 sql connect sql create database mqttdb; -sql create table mqttdb.devices(ts timestamp, value bigint) tags(name binary(32), model binary(32), serial binary(16), param binary(16), unit binary(16)); +sql create table mqttdb.devices(ts timestamp, value double) tags(name binary(32), model binary(32), serial binary(16), param binary(16), unit binary(16)); -sql sleep 1000 +sleep 1000 system sh/exec.sh -n dnode1 -s stop -x SIGINT