opti:performance for json in schemaless

This commit is contained in:
wangmm0220 2022-12-13 22:30:06 +08:00
parent 70281e5897
commit ac16bfc46d
4 changed files with 144 additions and 58 deletions

View File

@ -2,7 +2,7 @@
# taos-tools # taos-tools
ExternalProject_Add(taos-tools ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG 4a4027c GIT_TAG sml/json
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR "" BINARY_DIR ""
#BUILD_IN_SOURCE TRUE #BUILD_IN_SOURCE TRUE

View File

@ -1376,7 +1376,7 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL
return code; return code;
} }
info->cost.lineNum = numLines; info->cost.lineNum = info->lineNum;
info->cost.numOfSTables = nodeListSize(info->superTables); info->cost.numOfSTables = nodeListSize(info->superTables);
info->cost.numOfCTables = nodeListSize(info->childTables); info->cost.numOfCTables = nodeListSize(info->childTables);
@ -1463,10 +1463,9 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine,
request->code = code; request->code = code;
info->cost.endTime = taosGetTimestampUs(); info->cost.endTime = taosGetTimestampUs();
info->cost.code = code; info->cost.code = code;
smlPrintStatisticInfo(info); // smlPrintStatisticInfo(info);
end: end:
uDebug("resultend:%s", request->msgBuf);
smlDestroyInfo(info); smlDestroyInfo(info);
return (TAOS_RES *)request; return (TAOS_RES *)request;
} }

View File

@ -28,12 +28,7 @@ int32_t is_same_child_table_json(const void *a, const void *b){
return (cJSON_Compare((const cJSON *)a, (const cJSON *)b, true)) ? 0 : 1; return (cJSON_Compare((const cJSON *)a, (const cJSON *)b, true)) ? 0 : 1;
} }
static int32_t smlParseMetricFromJSON(SSmlHandle *info, cJSON *root, SSmlLineInfo *elements) { static inline int32_t smlParseMetricFromJSON(SSmlHandle *info, cJSON *metric, SSmlLineInfo *elements) {
cJSON *metric = cJSON_GetObjectItem(root, "metric");
if (!cJSON_IsString(metric)) {
return TSDB_CODE_TSC_INVALID_JSON;
}
elements->measureLen = strlen(metric->valuestring); elements->measureLen = strlen(metric->valuestring);
if (IS_INVALID_TABLE_LEN(elements->measureLen)) { if (IS_INVALID_TABLE_LEN(elements->measureLen)) {
uError("OTD:0x%" PRIx64 " Metric lenght is 0 or large than 192", info->id); uError("OTD:0x%" PRIx64 " Metric lenght is 0 or large than 192", info->id);
@ -110,7 +105,7 @@ static int64_t smlParseTSFromJSONObj(SSmlHandle *info, cJSON *root, int32_t toPr
} }
} }
static uint8_t smlGetTimestampLen(int64_t num) { static inline uint8_t smlGetTimestampLen(int64_t num) {
uint8_t len = 0; uint8_t len = 0;
while ((num /= 10) != 0) { while ((num /= 10) != 0) {
len++; len++;
@ -119,10 +114,9 @@ static uint8_t smlGetTimestampLen(int64_t num) {
return len; return len;
} }
static int64_t smlParseTSFromJSON(SSmlHandle *info, cJSON *root) { static int64_t smlParseTSFromJSON(SSmlHandle *info, cJSON *timestamp) {
// Timestamp must be the first KV to parse // Timestamp must be the first KV to parse
int32_t toPrecision = info->currSTableMeta ? info->currSTableMeta->tableInfo.precision : TSDB_TIME_PRECISION_NANO; int32_t toPrecision = info->currSTableMeta ? info->currSTableMeta->tableInfo.precision : TSDB_TIME_PRECISION_NANO;
cJSON *timestamp = cJSON_GetObjectItem(root, "timestamp");
if (cJSON_IsNumber(timestamp)) { if (cJSON_IsNumber(timestamp)) {
// timestamp value 0 indicates current system time // timestamp value 0 indicates current system time
double timeDouble = timestamp->valuedouble; double timeDouble = timestamp->valuedouble;
@ -140,6 +134,7 @@ static int64_t smlParseTSFromJSON(SSmlHandle *info, cJSON *root) {
} }
uint8_t tsLen = smlGetTimestampLen((int64_t)timeDouble); uint8_t tsLen = smlGetTimestampLen((int64_t)timeDouble);
int8_t fromPrecision = smlGetTsTypeByLen(tsLen); int8_t fromPrecision = smlGetTsTypeByLen(tsLen);
if (unlikely(fromPrecision == -1)) { if (unlikely(fromPrecision == -1)) {
smlBuildInvalidDataMsg(&info->msgBuf, smlBuildInvalidDataMsg(&info->msgBuf,
@ -357,39 +352,20 @@ static int32_t smlParseValueFromJSON(cJSON *root, SSmlKv *kv) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t smlParseColsFromJSON(cJSON *root, SSmlKv *kv) { static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *tags, SSmlLineInfo *elements) {
cJSON *metricVal = cJSON_GetObjectItem(root, "value");
if (metricVal == NULL) {
return TSDB_CODE_TSC_INVALID_JSON;
}
int32_t ret = smlParseValueFromJSON(metricVal, kv);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
return TSDB_CODE_SUCCESS;
}
static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *root, SSmlLineInfo *elements) {
int32_t ret = TSDB_CODE_SUCCESS; int32_t ret = TSDB_CODE_SUCCESS;
cJSON *tags = cJSON_GetObjectItem(root, "tags");
if (unlikely(tags == NULL || tags->type != cJSON_Object)) {
return TSDB_CODE_TSC_INVALID_JSON;
}
// add measure to tags to identify one child table // add measure to tags to identify one child table
cJSON *cMeasure = cJSON_AddStringToObject(tags, JSON_METERS_NAME, elements->measure); // cJSON *cMeasure = cJSON_AddStringToObject(tags, JSON_METERS_NAME, elements->measure);
if(unlikely(cMeasure == NULL)){ // if(unlikely(cMeasure == NULL)){
return TSDB_CODE_TSC_INVALID_JSON; // return TSDB_CODE_TSC_INVALID_JSON;
} // }
elements->tags = (char*)tags; elements->tags = (char*)tags;
if(is_same_child_table_json(elements->tags, info->preLine.tags) == 0){ if(is_same_child_table_json(elements->tags, info->preLine.tags) == 0){
cJSON_DeleteItemFromObjectCaseSensitive(tags, JSON_METERS_NAME); // cJSON_DeleteItemFromObjectCaseSensitive(tags, JSON_METERS_NAME);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
cJSON_DeleteItemFromObjectCaseSensitive(tags, JSON_METERS_NAME); // cJSON_DeleteItemFromObjectCaseSensitive(tags, JSON_METERS_NAME);
bool isSameMeasure = IS_SAME_SUPER_TABLE; bool isSameMeasure = IS_SAME_SUPER_TABLE;
@ -529,40 +505,62 @@ static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *root, SSmlLineInfo
return ret; return ret;
} }
const char *jsonName[OTD_JSON_FIELDS_NUM] = {"metric", "timestamp", "value", "tags"};
static int32_t smlGetJsonElements(cJSON *root, cJSON ***marks){
cJSON *child = root->child;
for (int i = 0; i < OTD_JSON_FIELDS_NUM; ++i) {
while(child != NULL)
{
if(strcasecmp(child->string, jsonName[i]) == 0){
*marks[i] = child;
break;
}
child = child->next;
}
if(*marks[i] == NULL){
uError("smlGetJsonElements error, not find mark:%s", jsonName[i]);
return -1;
}
}
return TSDB_CODE_SUCCESS;
}
static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlLineInfo *elements) { static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlLineInfo *elements) {
int32_t ret = TSDB_CODE_SUCCESS; int32_t ret = TSDB_CODE_SUCCESS;
int32_t size = cJSON_GetArraySize(root); cJSON *metricJson = NULL;
// outmost json fields has to be exactly 4 cJSON *tsJson = NULL;
if (unlikely(size != OTD_JSON_FIELDS_NUM)) { cJSON *valueJson = NULL;
uError("OTD:0x%" PRIx64 " Invalid number of JSON fields in data point %d", info->id, size); cJSON *tagsJson = NULL;
return TSDB_CODE_TSC_INVALID_JSON;
cJSON **marks[OTD_JSON_FIELDS_NUM] = {&metricJson, &tsJson, &valueJson, &tagsJson};
ret = smlGetJsonElements(root, marks);
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
return ret;
} }
// Parse metric // Parse metric
ret = smlParseMetricFromJSON(info, root, elements); ret = smlParseMetricFromJSON(info, metricJson, elements);
if (unlikely(ret != TSDB_CODE_SUCCESS)) { if (unlikely(ret != TSDB_CODE_SUCCESS)) {
uError("OTD:0x%" PRIx64 " Unable to parse metric from JSON payload", info->id); uError("OTD:0x%" PRIx64 " Unable to parse metric from JSON payload", info->id);
return ret; return ret;
} }
uDebug("OTD:0x%" PRIx64 " Parse metric from JSON payload finished", info->id);
// Parse metric value // Parse metric value
SSmlKv kv = {.key = VALUE, .keyLen = VALUE_LEN}; SSmlKv kv = {.key = VALUE, .keyLen = VALUE_LEN};
ret = smlParseColsFromJSON(root, &kv); ret = smlParseValueFromJSON(valueJson, &kv);
if (unlikely(ret)) { if (unlikely(ret)) {
uError("OTD:0x%" PRIx64 " Unable to parse metric value from JSON payload", info->id); uError("OTD:0x%" PRIx64 " Unable to parse metric value from JSON payload", info->id);
return ret; return ret;
} }
uDebug("OTD:0x%" PRIx64 " Parse metric value from JSON payload finished", info->id);
// Parse tags // Parse tags
ret = smlParseTagsFromJSON(info, root, elements); ret = smlParseTagsFromJSON(info, tagsJson, elements);
if (unlikely(ret)) { if (unlikely(ret)) {
uError("OTD:0x%" PRIx64 " Unable to parse tags from JSON payload", info->id); uError("OTD:0x%" PRIx64 " Unable to parse tags from JSON payload", info->id);
return ret; return ret;
} }
uDebug("OTD:0x%" PRIx64 " Parse tags from JSON payload finished", info->id);
if(unlikely(info->reRun)){ if(unlikely(info->reRun)){
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -570,12 +568,11 @@ static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlLineInfo *e
// Parse timestamp // Parse timestamp
// notice!!! put ts back to tag to ensure get meta->precision // notice!!! put ts back to tag to ensure get meta->precision
int64_t ts = smlParseTSFromJSON(info, root); int64_t ts = smlParseTSFromJSON(info, tsJson);
if (unlikely(ts < 0)) { if (unlikely(ts < 0)) {
uError("OTD:0x%" PRIx64 " Unable to parse timestamp from JSON payload", info->id); uError("OTD:0x%" PRIx64 " Unable to parse timestamp from JSON payload", info->id);
return TSDB_CODE_INVALID_TIMESTAMP; return TSDB_CODE_INVALID_TIMESTAMP;
} }
uDebug("OTD:0x%" PRIx64 " Parse timestamp from JSON payload finished", info->id);
SSmlKv kvTs = { .key = TS, .keyLen = TS_LEN, .type = TSDB_DATA_TYPE_TIMESTAMP, .i = ts, .length = (size_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes}; SSmlKv kvTs = { .key = TS, .keyLen = TS_LEN, .type = TSDB_DATA_TYPE_TIMESTAMP, .i = ts, .length = (size_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};
if(info->dataFormat){ if(info->dataFormat){
@ -627,14 +624,16 @@ int32_t smlParseJSON(SSmlHandle *info, char *payload) {
return TSDB_CODE_TSC_INVALID_JSON; return TSDB_CODE_TSC_INVALID_JSON;
} }
int32_t i = 0; cJSON *head = (payloadNum == 1 && cJSON_IsObject(info->root)) ? info->root : info->root->child;
while (i < payloadNum) {
cJSON *dataPoint = (payloadNum == 1 && cJSON_IsObject(info->root)) ? info->root : cJSON_GetArrayItem(info->root, i); int cnt = 0;
cJSON *dataPoint = head;
while (dataPoint) {
if(info->dataFormat) { if(info->dataFormat) {
SSmlLineInfo element = {0}; SSmlLineInfo element = {0};
ret = smlParseJSONString(info, dataPoint, &element); ret = smlParseJSONString(info, dataPoint, &element);
}else{ }else{
ret = smlParseJSONString(info, dataPoint, info->lines + i); ret = smlParseJSONString(info, dataPoint, info->lines + cnt);
} }
if (unlikely(ret != TSDB_CODE_SUCCESS)) { if (unlikely(ret != TSDB_CODE_SUCCESS)) {
uError("SML:0x%" PRIx64 " Invalid JSON Payload", info->id); uError("SML:0x%" PRIx64 " Invalid JSON Payload", info->id);
@ -642,7 +641,8 @@ int32_t smlParseJSON(SSmlHandle *info, char *payload) {
} }
if(unlikely(info->reRun)){ if(unlikely(info->reRun)){
i = 0; cnt = 0;
dataPoint = head;
info->lineNum = payloadNum; info->lineNum = payloadNum;
ret = smlClearForRerun(info); ret = smlClearForRerun(info);
if(ret != TSDB_CODE_SUCCESS){ if(ret != TSDB_CODE_SUCCESS){
@ -650,7 +650,66 @@ int32_t smlParseJSON(SSmlHandle *info, char *payload) {
} }
continue; continue;
} }
i++; cnt++;
dataPoint = dataPoint->next;
}
return TSDB_CODE_SUCCESS;
}
int32_t smlParseJSONOld(SSmlHandle *info, char *payload) {
int32_t payloadNum = 0;
int32_t ret = TSDB_CODE_SUCCESS;
if (unlikely(payload == NULL)) {
uError("SML:0x%" PRIx64 " empty JSON Payload", info->id);
return TSDB_CODE_TSC_INVALID_JSON;
}
info->root = cJSON_Parse(payload);
if (unlikely(info->root == NULL)) {
uError("SML:0x%" PRIx64 " parse json failed:%s", info->id, payload);
return TSDB_CODE_TSC_INVALID_JSON;
}
// multiple data points must be sent in JSON array
if (cJSON_IsArray(info->root)) {
payloadNum = cJSON_GetArraySize(info->root);
} else if (cJSON_IsObject(info->root)) {
payloadNum = 1;
} else {
uError("SML:0x%" PRIx64 " Invalid JSON Payload", info->id);
return TSDB_CODE_TSC_INVALID_JSON;
}
cJSON *head = (payloadNum == 1 && cJSON_IsObject(info->root)) ? info->root : info->root->child;
int cnt = 0;
cJSON *dataPoint = head;
while (dataPoint) {
if(info->dataFormat) {
SSmlLineInfo element = {0};
ret = smlParseJSONString(info, dataPoint, &element);
}else{
ret = smlParseJSONString(info, dataPoint, info->lines + cnt);
}
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
uError("SML:0x%" PRIx64 " Invalid JSON Payload", info->id);
return ret;
}
if(unlikely(info->reRun)){
cnt = 0;
dataPoint = head;
info->lineNum = payloadNum;
ret = smlClearForRerun(info);
if(ret != TSDB_CODE_SUCCESS){
return ret;
}
continue;
}
cnt++;
dataPoint = dataPoint->next;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;

View File

@ -622,3 +622,31 @@ TEST(testCase, smlParseTelnetLine_diff_json_type2_Test) {
ASSERT_NE(ret, 0); ASSERT_NE(ret, 0);
smlDestroyInfo(info); smlDestroyInfo(info);
} }
TEST(testCase, smlParseNumber_performance_Test) {
char msg[256] = {0};
SSmlMsgBuf msgBuf;
SSmlKv kv;
char* str[3] = {"2893f64", "2323u32", "93u8"};
for (int i = 0; i < 3; ++i) {
int64_t t1 = taosGetTimestampUs();
for (int j = 0; j < 10000000; ++j) {
kv.value = str[i];
kv.length = strlen(str[i]);
smlParseNumber(&kv, &msgBuf);
}
printf("smlParseNumber:%s cost:%" PRId64, str[i], taosGetTimestampUs() - t1);
printf("\n");
int64_t t2 = taosGetTimestampUs();
for (int j = 0; j < 10000000; ++j) {
kv.value = str[i];
kv.length = strlen(str[i]);
smlParseNumberOld(&kv, &msgBuf);
}
printf("smlParseNumberOld:%s cost:%" PRId64, str[i], taosGetTimestampUs() - t2);
printf("\n\n");
}
}