[TD-6443]<feature>: Support OpenTSDB HTTP JSON data import format

This commit is contained in:
Ganlin Zhao 2021-09-06 12:32:49 +08:00
parent 7d9a9f919e
commit 480c621f7b
1 changed files with 149 additions and 3 deletions

View File

@ -543,8 +543,147 @@ int32_t parseTimestampFromJSON(cJSON *root, TAOS_SML_KV **pTS, int *num_kvs, SSm
} }
int32_t convertJSONBoolValue(TAOS_SML_KV *pVal, char* typeStr, int64_t valueInt, SSmlLinesInfo* info) {
if (strcasecmp(typeStr, "bool") != 0) {
tscError("OTD:0x%"PRIx64" invalid type(%s) in JSON payload", info->id, typeStr);
return TSDB_CODE_TSC_INVALID_JSON_TYPE;
}
pVal->type = TSDB_DATA_TYPE_BOOL;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
pVal->value = tcalloc(pVal->length, 1);
*(bool *)(pVal->value) = valueInt ? true : false;
int32_t parseValueFromJSON(cJSON *root, TAOS_SML_KV *pVal) { return TSDB_CODE_SUCCESS;
}
int32_t parseValueFromJSONObj(cJSON *root, TAOS_SML_KV *pVal, SSmlLinesInfo* info) {
int32_t ret = TSDB_CODE_SUCCESS;
int32_t size = cJSON_GetArraySize(root);
if (size != JSON_SUB_FIELDS_NUM) {
return TSDB_CODE_TSC_INVALID_JSON;
}
cJSON *value = cJSON_GetObjectItem(root, "value");
if (value == NULL) {
return TSDB_CODE_TSC_INVALID_JSON;
}
cJSON *type = cJSON_GetObjectItem(root, "type");
if (!cJSON_IsString(type)) {
return TSDB_CODE_TSC_INVALID_JSON;
}
switch (value->type) {
case cJSON_True:
case cJSON_False: {
ret = convertJSONBoolValue(pVal, type->valuestring, value->valueint, info);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
break;
}
case cJSON_Number: {
//tinyint
if (strcasecmp(type->valuestring, "i8") == 0) {
if (!IS_VALID_TINYINT(value->valueint)) {
tscError("OTD:0x%"PRIx64" JSON value(%ld) cannot fit in type(tinyint)", info->id, value->valueint);
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
}
pVal->type = TSDB_DATA_TYPE_TINYINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
pVal->value = tcalloc(pVal->length, 1);
*(int8_t *)(pVal->value) = (int8_t)(value->valueint);
return TSDB_CODE_SUCCESS;
}
//smallint
if (strcasecmp(type->valuestring, "i16") == 0) {
if (!IS_VALID_SMALLINT(value->valueint)) {
tscError("OTD:0x%"PRIx64" JSON value(%ld) cannot fit in type(smallint)", info->id, value->valueint);
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
}
pVal->type = TSDB_DATA_TYPE_SMALLINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
pVal->value = tcalloc(pVal->length, 1);
*(int16_t *)(pVal->value) = (int16_t)(value->valueint);
return TSDB_CODE_SUCCESS;
}
//int
if (strcasecmp(type->valuestring, "i32") == 0) {
if (!IS_VALID_INT(value->valueint)) {
tscError("OTD:0x%"PRIx64" JSON value(%ld) cannot fit in type(int)", info->id, value->valueint);
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
}
pVal->type = TSDB_DATA_TYPE_INT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
pVal->value = tcalloc(pVal->length, 1);
*(int32_t *)(pVal->value) = (int32_t)(value->valueint);
return TSDB_CODE_SUCCESS;
}
//bigint
if (strcasecmp(type->valuestring, "i64") == 0) {
if (!IS_VALID_BIGINT(value->valueint)) {
tscError("OTD:0x%"PRIx64" JSON value(%ld) cannot fit in type(bigint)", info->id, value->valueint);
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
}
pVal->type = TSDB_DATA_TYPE_BIGINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
pVal->value = tcalloc(pVal->length, 1);
*(int64_t *)(pVal->value) = (int64_t)(value->valueint);
return TSDB_CODE_SUCCESS;
}
//float
if (strcasecmp(type->valuestring, "f32") == 0) {
if (!IS_VALID_FLOAT(value->valuedouble)) {
tscError("OTD:0x%"PRIx64" JSON value(%f) cannot fit in type(float)", info->id, value->valuedouble);
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
}
pVal->type = TSDB_DATA_TYPE_FLOAT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
pVal->value = tcalloc(pVal->length, 1);
*(float *)(pVal->value) = (float)(value->valuedouble);
return TSDB_CODE_SUCCESS;
}
//double
if (strcasecmp(type->valuestring, "f64") == 0) {
if (!IS_VALID_DOUBLE(value->valuedouble)) {
tscError("OTD:0x%"PRIx64" JSON value(%f) cannot fit in type(double)", info->id, value->valuedouble);
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
}
pVal->type = TSDB_DATA_TYPE_DOUBLE;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
pVal->value = tcalloc(pVal->length, 1);
*(double *)(pVal->value) = (double)(value->valuedouble);
return TSDB_CODE_SUCCESS;
}
//if reach here means type string is not supported
tscError("OTD:0x%"PRIx64" invalid type(%s) in JSON payload", info->id, type->valuestring);
return TSDB_CODE_TSC_INVALID_JSON_TYPE;
}
case cJSON_String: {
if (strcasecmp(type->valuestring, "binary") == 0) {
pVal->type = TSDB_DATA_TYPE_BINARY;
} else if (strcasecmp(type->valuestring, "nchar") == 0) {
pVal->type = TSDB_DATA_TYPE_NCHAR;
} else {
tscError("OTD:0x%"PRIx64" invalid type(%s) in JSON payload", info->id, type->valuestring);
return TSDB_CODE_TSC_INVALID_JSON_TYPE;
}
pVal->length = strlen(root->valuestring);
pVal->value = tcalloc(pVal->length + 1, 1);
memcpy(pVal->value, root->valuestring, pVal->length);
return TSDB_CODE_SUCCESS;
}
default:
return TSDB_CODE_TSC_INVALID_JSON;
}
return TSDB_CODE_SUCCESS;
}
int32_t parseValueFromJSON(cJSON *root, TAOS_SML_KV *pVal, SSmlLinesInfo* info) {
int type = root->type; int type = root->type;
switch (type) { switch (type) {
@ -573,6 +712,10 @@ int32_t parseValueFromJSON(cJSON *root, TAOS_SML_KV *pVal) {
memcpy(pVal->value, root->valuestring, pVal->length); memcpy(pVal->value, root->valuestring, pVal->length);
break; break;
} }
case cJSON_Object: {
parseValueFromJSONObj(root, pVal, info);
break;
}
default: default:
return TSDB_CODE_TSC_INVALID_JSON; return TSDB_CODE_TSC_INVALID_JSON;
} }
@ -590,7 +733,7 @@ int32_t parseMetricValueFromJSON(cJSON *root, TAOS_SML_KV **pKVs, int *num_kvs,
return TSDB_CODE_TSC_INVALID_JSON; return TSDB_CODE_TSC_INVALID_JSON;
} }
int32_t ret = parseValueFromJSON(metricVal, pVal); int32_t ret = parseValueFromJSON(metricVal, pVal, info);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
return ret; return ret;
} }
@ -640,12 +783,15 @@ int32_t parseTagsFromJSON(cJSON *root, TAOS_SML_KV **pKVs, int *num_kvs, char **
for (int32_t i = 0; i < tagNum; ++i) { for (int32_t i = 0; i < tagNum; ++i) {
cJSON *tag = cJSON_GetArrayItem(tags, i); cJSON *tag = cJSON_GetArrayItem(tags, i);
if (tag == NULL) {
return TSDB_CODE_TSC_INVALID_JSON;
}
//key //key
int32_t keyLen = strlen(tag->string); int32_t keyLen = strlen(tag->string);
pkv->key = tcalloc(keyLen + 1, sizeof(char)); pkv->key = tcalloc(keyLen + 1, sizeof(char));
strncpy(pkv->key, tag->string, keyLen); strncpy(pkv->key, tag->string, keyLen);
//value //value
ret = parseValueFromJSON(tag, pkv); ret = parseValueFromJSON(tag, pkv, info);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
return ret; return ret;
} }