[TD-6443]<feature>: Support OpenTSDB HTTP JSON data import format

This commit is contained in:
Ganlin Zhao 2021-09-06 12:32:49 +08:00
parent c0a554e019
commit 4b9b44d0eb
1 changed files with 50 additions and 60 deletions

View File

@ -495,6 +495,42 @@ int32_t parseTimestampFromJSON(cJSON *root, TAOS_SML_KV **pTS, int *num_kvs, SSm
} }
int32_t parseValueFromJSON(cJSON *root, TAOS_SML_KV *pVal) {
int type = root->type;
switch (type) {
case cJSON_True:
case cJSON_False: {
pVal->type = TSDB_DATA_TYPE_BOOL;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
pVal->value = tcalloc(pVal->length, 1);
*(bool *)(pVal->value) = type ? true : false;
break;
}
case cJSON_Number: {
//convert default JSON Number type to float
pVal->type = TSDB_DATA_TYPE_FLOAT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
pVal->value = tcalloc(pVal->length, 1);
*(float *)(pVal->value) = (float)(root->valuedouble);
break;
}
case cJSON_String: {
//convert default JSON String type to nchar
pVal->type = TSDB_DATA_TYPE_NCHAR;
pVal->length = wcslen((wchar_t *)root->valuestring) * TSDB_NCHAR_SIZE;
pVal->value = tcalloc(pVal->length + 1, 1);
memcpy(pVal->value, root->valuestring, pVal->length);
break;
}
default:
return TSDB_CODE_TSC_INVALID_JSON;
}
return TSDB_CODE_SUCCESS;
}
int32_t parseMetricValueFromJSON(cJSON *root, TAOS_SML_KV **pKVs, int *num_kvs, SSmlLinesInfo* info) { int32_t parseMetricValueFromJSON(cJSON *root, TAOS_SML_KV **pKVs, int *num_kvs, SSmlLinesInfo* info) {
//skip timestamp //skip timestamp
TAOS_SML_KV *pVal = *pKVs + 1; TAOS_SML_KV *pVal = *pKVs + 1;
@ -503,36 +539,12 @@ int32_t parseMetricValueFromJSON(cJSON *root, TAOS_SML_KV **pKVs, int *num_kvs,
cJSON *metricVal = cJSON_GetObjectItem(root, "value"); cJSON *metricVal = cJSON_GetObjectItem(root, "value");
if (metricVal == NULL) { if (metricVal == NULL) {
tscError("OTD:0x%"PRIx64" failed to parse metric value from JSON Payload", info->id); tscError("OTD:0x%"PRIx64" failed to parse metric value from JSON Payload", info->id);
return TSDB_CODE_TSC_INVALID_JSON; return TSDB_CODE_TSC_INVALID_JSON;
} }
switch (metricVal->type) { int32_t ret = parseValueFromJSON(metricVal, pVal);
case cJSON_True: if (ret != TSDB_CODE_SUCCESS) {
case cJSON_False: { return ret;
pVal->type = TSDB_DATA_TYPE_BOOL;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
pVal->value = tcalloc(pVal->length, 1);
*(bool *)(pVal->value) = metricVal->type ? true : false;
break;
}
case cJSON_Number: {
//convert default JSON Number type to float
pVal->type = TSDB_DATA_TYPE_FLOAT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
pVal->value = tcalloc(pVal->length, 1);
*(float *)(pVal->value) = (float)(metricVal->valuedouble);
break;
}
case cJSON_String: {
//convert default JSON String type to nchar
pVal->type = TSDB_DATA_TYPE_NCHAR;
pVal->length = wcslen((wchar_t *)metricVal->valuestring) * TSDB_NCHAR_SIZE;
pVal->value = tcalloc(pVal->length + 1, 1);
memcpy(pVal->value, metricVal->valuestring, pVal->length);
break;
}
default:
return TSDB_CODE_TSC_INVALID_JSON;
} }
pVal->key = tcalloc(sizeof(key), 1); pVal->key = tcalloc(sizeof(key), 1);
@ -544,6 +556,7 @@ int32_t parseMetricValueFromJSON(cJSON *root, TAOS_SML_KV **pKVs, int *num_kvs,
} }
int32_t parseTagsFromJSON(cJSON *root, TAOS_SML_KV **pKVs, int *num_kvs, char **childTableName, SSmlLinesInfo* info) { int32_t parseTagsFromJSON(cJSON *root, TAOS_SML_KV **pKVs, int *num_kvs, char **childTableName, SSmlLinesInfo* info) {
int32_t ret;
cJSON *tags = cJSON_GetObjectItem(root, "tags"); cJSON *tags = cJSON_GetObjectItem(root, "tags");
if (tags == NULL || tags->type != cJSON_Object) { if (tags == NULL || tags->type != cJSON_Object) {
@ -554,8 +567,8 @@ int32_t parseTagsFromJSON(cJSON *root, TAOS_SML_KV **pKVs, int *num_kvs, char **
cJSON *id = cJSON_GetObjectItem(root, "ID"); cJSON *id = cJSON_GetObjectItem(root, "ID");
if (id != NULL) { if (id != NULL) {
int32_t idLen = strlen(id->string); int32_t idLen = strlen(id->string);
int32_t ret = isValidChildTableName(id->string, idLen); ret = isValidChildTableName(id->string, idLen);
if (ret) { if (ret != TSDB_CODE_SUCCESS) {
return ret; return ret;
} }
*childTableName = tcalloc(idLen + 1, sizeof(char)); *childTableName = tcalloc(idLen + 1, sizeof(char));
@ -577,39 +590,16 @@ int32_t parseTagsFromJSON(cJSON *root, TAOS_SML_KV **pKVs, int *num_kvs, char **
for (int32_t i = 0; i < tagNum; ++i) { for (int32_t i = 0; i < tagNum; ++i) {
cJSON *tag = cJSON_GetArrayItem(tags, i); cJSON *tag = cJSON_GetArrayItem(tags, i);
//key //key
pkv->key = tcalloc(strlen(tag->string) + 1, sizeof(char)); int32_t keyLen = strlen(tag->string);
strncpy(pkv->key, tag->string, strlen(tag->string)); pkv->key = tcalloc(keyLen + 1, sizeof(char));
strncpy(pkv->key, tag->string, keyLen);
//value //value
switch (tag->type) { ret = parseValueFromJSON(tag, pkv);
case cJSON_True: if (ret != TSDB_CODE_SUCCESS) {
case cJSON_False: { return ret;
pkv->type = TSDB_DATA_TYPE_BOOL;
pkv->length = (int16_t)tDataTypes[pkv->type].bytes;
pkv->value = tcalloc(pkv->length, 1);
*(bool *)(pkv->value) = tag->type ? true : false;
break;
}
case cJSON_Number: {
//convert default JSON Number type to float
pkv->type = TSDB_DATA_TYPE_FLOAT;
pkv->length = (int16_t)tDataTypes[pkv->type].bytes;
pkv->value = tcalloc(pkv->length, 1);
*(float *)(pkv->value) = (float)(tag->valuedouble);
break;
}
case cJSON_String: {
//convert default JSON String type to nchar
pkv->type = TSDB_DATA_TYPE_NCHAR;
pkv->length = wcslen((wchar_t *)tag->valuestring) * TSDB_NCHAR_SIZE;
pkv->value = tcalloc(pkv->length + 1, 1);
memcpy(pkv->value, tag->valuestring, pkv->length);
break;
}
default:
tfree(pkv->key);
return TSDB_CODE_TSC_INVALID_JSON;
} }
*num_kvs += 1; *num_kvs += 1;
pkv++;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;