TD-1310 minor changes

This commit is contained in:
Shengliang Guan 2020-09-03 15:35:54 +00:00
parent 932761a2e1
commit 42f83c3cb0
3 changed files with 17 additions and 11 deletions

View File

@ -55,6 +55,9 @@
} }
*/ */
// send msg cmd
// mosquitto_pub -h test.mosquitto.org -t "/test" -m '{"timestamp": 1599121290,"gateway": {"name": "AcuLink 810 Gateway","model": "AcuLink810-868","serial": "S8P20200207"},"device": {"name": "Acuvim L V3 .221","model": "Acuvim-L-V3","serial": "221","online": true,"readings": [{"param": "Freq_Hz","value": "59.977539","unit": "Hz"},{"param": "Va_V","value": "122.002907","unit": "V"},{"param": "DI4","value": "5.000000","unit": ""}]}}'
/* /*
* This is an example, this function needs to be implemented in order to parse the json file into a sql statement * This is an example, this function needs to be implemented in order to parse the json file into a sql statement
* Note that you need to create a super table and database before writing data * Note that you need to create a super table and database before writing data
@ -65,6 +68,7 @@
char* mqttConverJsonToSql(char* json, int maxSize) { char* mqttConverJsonToSql(char* json, int maxSize) {
// const int32_t maxSize = 10240; // const int32_t maxSize = 10240;
maxSize *= 5;
char* sql = malloc(maxSize); char* sql = malloc(maxSize);
cJSON* root = cJSON_Parse(json); cJSON* root = cJSON_Parse(json);
@ -139,10 +143,10 @@ char* mqttConverJsonToSql(char* json, int maxSize) {
goto MQTT_PARSE_OVER; goto MQTT_PARSE_OVER;
} }
len += snprintf(sql, maxSize - len, len += snprintf(sql + len, maxSize - len,
" mqttdb.%s using mqttdb.devices tags('%s', '%s', '%s', '%s', '%s') values(%" PRId64 ", %s)", " mqttdb.serial_%s_%s using mqttdb.devices tags('%s', '%s', '%s', '%s', '%s') values(%" PRId64 ", %s)",
serial->valuestring, name->valuestring, model->valuestring, serial->valuestring, param->valuestring, serial->valuestring, param->valuestring, name->valuestring, model->valuestring, serial->valuestring,
unit->valuestring, timestamp->valueint * 1000, value->valuestring); param->valuestring, unit->valuestring, timestamp->valueint * 1000, value->valuestring);
} }
cJSON_free(root); cJSON_free(root);

View File

@ -74,10 +74,10 @@ void mqttPublishCallback(void** unused, struct mqtt_response_publish* published)
if (tsMqttConnect == NULL) { if (tsMqttConnect == NULL) {
tsMqttConnect = taos_connect(NULL, "_root", tsInternalPass, "", 0); tsMqttConnect = taos_connect(NULL, "_root", tsInternalPass, "", 0);
if (tsMqttConnect == NULL) { if (tsMqttConnect == NULL) {
mqttError("failed to connect tdengine"); mqttError("failed to connect to tdengine");
return; return;
} else { } else {
mqttInfo("successed to connect tdengine"); mqttInfo("successfully connected to the tdengine");
} }
} }
@ -88,7 +88,9 @@ void mqttPublishCallback(void** unused, struct mqtt_response_publish* published)
void* res = taos_query(tsMqttConnect, sql); void* res = taos_query(tsMqttConnect, sql);
int code = taos_errno(res); int code = taos_errno(res);
if (code != 0) { if (code != 0) {
mqttError("failed to exec sql%s", sql); mqttError("failed to exec sql:%s", sql);
} else {
mqttDebug("successfully to exec sql:%s", sql);
} }
taos_free_result(res); taos_free_result(res);
} else { } else {
@ -136,6 +138,6 @@ void mqttReconnectClient(struct mqtt_client* client, void** unused) {
} }
mqtt_reinit(client, sockfd, tsMqttStatus.sendbuf, tsMqttStatus.sendbufsz, tsMqttStatus.recvbuf, tsMqttStatus.recvbufsz); mqtt_reinit(client, sockfd, tsMqttStatus.sendbuf, tsMqttStatus.sendbufsz, tsMqttStatus.recvbuf, tsMqttStatus.recvbufsz);
mqtt_connect(client, "tsMqttClientId", NULL, NULL, 0, tsMqttUser, tsMqttPass, MQTT_CONNECT_CLEAN_SESSION, 400); mqtt_connect(client, tsMqttClientId, NULL, NULL, 0, tsMqttUser, tsMqttPass, MQTT_CONNECT_CLEAN_SESSION, 400);
mqtt_subscribe(client, tsMqttTopic, 0); mqtt_subscribe(client, tsMqttTopic, 0);
} }

View File

@ -10,10 +10,10 @@ system sh/cfg.sh -n dnode1 -c mqtt -v 1
system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode1 -s start
sql sleep 3000 sleep 3000
sql connect sql connect
sql create database mqttdb; sql create database mqttdb;
sql create table mqttdb.devices(ts timestamp, value bigint) tags(name binary(32), model binary(32), serial binary(16), param binary(16), unit binary(16)); sql create table mqttdb.devices(ts timestamp, value double) tags(name binary(32), model binary(32), serial binary(16), param binary(16), unit binary(16));
sql sleep 1000 sleep 1000
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT