enh:[TD-32166]remove useless code in sml

This commit is contained in:
wangmm0220 2024-10-22 17:43:30 +08:00
parent 2e8a5348c9
commit 21a7b6dc16
3 changed files with 4 additions and 426 deletions

View File

@ -225,8 +225,6 @@ extern int64_t smlFactorS[];
int32_t smlBuildSmlInfo(TAOS *taos, SSmlHandle **handle);
void smlDestroyInfo(SSmlHandle *info);
int smlJsonParseObjFirst(char **start, SSmlLineInfo *element, int8_t *offset);
int smlJsonParseObj(char **start, char *end, SSmlLineInfo *element, int8_t *offset);
void smlBuildInvalidDataMsg(SSmlMsgBuf *pBuf, const char *msg1, const char *msg2);
int32_t smlParseNumber(SSmlKv *kvVal, SSmlMsgBuf *msg);
int64_t smlGetTimeValue(const char *value, int32_t len, uint8_t fromPrecision, uint8_t toPrecision);
@ -246,7 +244,7 @@ void smlDestroyTableInfo(void *para);
void freeSSmlKv(void* data);
int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLineInfo *elements);
int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLineInfo *elements);
int32_t smlParseJSON(SSmlHandle *info, char *payload);
int32_t smlParseJSONExt(SSmlHandle *info, char *payload);
int32_t smlBuildSuperTableInfo(SSmlHandle *info, SSmlLineInfo *currElement, SSmlSTableMeta** sMeta);
bool isSmlTagAligned(SSmlHandle *info, int cnt, SSmlKv *kv);

View File

@ -1597,9 +1597,9 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char
int32_t code = TSDB_CODE_SUCCESS;
if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
if (lines) {
code = smlParseJSON(info, *lines);
code = smlParseJSONExt(info, *lines);
} else if (rawLine) {
code = smlParseJSON(info, rawLine);
code = smlParseJSONExt(info, rawLine);
}
if (code != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " smlParseJSON failed:%s", info->id, lines ? *lines : rawLine);

View File

@ -21,259 +21,6 @@
#define OTD_JSON_SUB_FIELDS_NUM 2
#define JUMP_JSON_SPACE(start) \
while (*(start)) { \
if (unlikely(*(start) > 32)) \
break; \
else \
(start)++; \
}
static int32_t smlJsonGetObj(char **payload) {
int leftBracketCnt = 0;
bool isInQuote = false;
while (**payload) {
if (**payload == '"' && *((*payload) - 1) != '\\') {
isInQuote = !isInQuote;
} else if (!isInQuote && unlikely(**payload == '{')) {
leftBracketCnt++;
(*payload)++;
continue;
} else if (!isInQuote && unlikely(**payload == '}')) {
leftBracketCnt--;
(*payload)++;
if (leftBracketCnt == 0) {
return 0;
} else if (leftBracketCnt < 0) {
return -1;
}
continue;
}
(*payload)++;
}
return -1;
}
int smlJsonParseObjFirst(char **start, SSmlLineInfo *element, int8_t *offset) {
int index = 0;
while (*(*start)) {
if ((*start)[0] != '"') {
(*start)++;
continue;
}
if (unlikely(index >= OTD_JSON_FIELDS_NUM)) {
uError("index >= %d, %s", OTD_JSON_FIELDS_NUM, *start);
return TSDB_CODE_TSC_INVALID_JSON;
}
char *sTmp = *start;
if ((*start)[1] == 'm' && (*start)[2] == 'e' && (*start)[3] == 't' && (*start)[4] == 'r' && (*start)[5] == 'i' &&
(*start)[6] == 'c' && (*start)[7] == '"') {
(*start) += 8;
bool isInQuote = false;
while (*(*start)) {
if (unlikely(!isInQuote && *(*start) == '"')) {
(*start)++;
offset[index++] = *start - sTmp;
element->measure = (*start);
isInQuote = true;
continue;
}
if (unlikely(isInQuote && *(*start) == '"')) {
element->measureLen = (*start) - element->measure;
(*start)++;
break;
}
(*start)++;
}
} else if ((*start)[1] == 't' && (*start)[2] == 'i' && (*start)[3] == 'm' && (*start)[4] == 'e' &&
(*start)[5] == 's' && (*start)[6] == 't' && (*start)[7] == 'a' && (*start)[8] == 'm' &&
(*start)[9] == 'p' && (*start)[10] == '"') {
(*start) += 11;
bool hasColon = false;
while (*(*start)) {
if (unlikely(!hasColon && *(*start) == ':')) {
(*start)++;
JUMP_JSON_SPACE((*start))
offset[index++] = *start - sTmp;
element->timestamp = (*start);
if (*(*start) == '{') {
char *tmp = *start;
int32_t code = smlJsonGetObj(&tmp);
if (code == 0) {
element->timestampLen = tmp - (*start);
*start = tmp;
}
break;
}
hasColon = true;
continue;
}
if (unlikely(hasColon && (*(*start) == ',' || *(*start) == '}' || (*(*start)) <= 32))) {
element->timestampLen = (*start) - element->timestamp;
break;
}
(*start)++;
}
} else if ((*start)[1] == 'v' && (*start)[2] == 'a' && (*start)[3] == 'l' && (*start)[4] == 'u' &&
(*start)[5] == 'e' && (*start)[6] == '"') {
(*start) += 7;
bool hasColon = false;
while (*(*start)) {
if (unlikely(!hasColon && *(*start) == ':')) {
(*start)++;
JUMP_JSON_SPACE((*start))
offset[index++] = *start - sTmp;
element->cols = (*start);
if (*(*start) == '{') {
char *tmp = *start;
int32_t code = smlJsonGetObj(&tmp);
if (code == 0) {
element->colsLen = tmp - (*start);
*start = tmp;
}
break;
}
hasColon = true;
continue;
}
if (unlikely(hasColon && (*(*start) == ',' || *(*start) == '}' || (*(*start)) <= 32))) {
element->colsLen = (*start) - element->cols;
break;
}
(*start)++;
}
} else if ((*start)[1] == 't' && (*start)[2] == 'a' && (*start)[3] == 'g' && (*start)[4] == 's' &&
(*start)[5] == '"') {
(*start) += 6;
while (*(*start)) {
if (unlikely(*(*start) == ':')) {
(*start)++;
JUMP_JSON_SPACE((*start))
offset[index++] = *start - sTmp;
element->tags = (*start);
char *tmp = *start;
int32_t code = smlJsonGetObj(&tmp);
if (code == 0) {
element->tagsLen = tmp - (*start);
*start = tmp;
}
break;
}
(*start)++;
}
}
if (*(*start) == '\0') {
break;
}
if (*(*start) == '}') {
(*start)++;
break;
}
(*start)++;
}
if (unlikely(index != OTD_JSON_FIELDS_NUM) || element->tags == NULL || element->cols == NULL ||
element->measure == NULL || element->timestamp == NULL) {
uError("elements != %d or element parse null", OTD_JSON_FIELDS_NUM);
return TSDB_CODE_TSC_INVALID_JSON;
}
return 0;
}
int smlJsonParseObj(char **start, char *end, SSmlLineInfo *element, int8_t *offset) {
int index = 0;
while (*(*start)) {
if ((*start)[0] != '"') {
(*start)++;
continue;
}
if (unlikely(index >= OTD_JSON_FIELDS_NUM)) {
uError("index >= %d, %s", OTD_JSON_FIELDS_NUM, *start);
return TSDB_CODE_TSC_INVALID_JSON;
}
if ((*start) + offset[index] >= end){
return TSDB_CODE_TSC_INVALID_JSON;
}
if ((*start)[1] == 'm') {
(*start) += offset[index++];
element->measure = *start;
while (*(*start)) {
if (unlikely(*(*start) == '"')) {
element->measureLen = (*start) - element->measure;
(*start)++;
break;
}
(*start)++;
}
} else if ((*start)[1] == 't' && (*start)[2] == 'i') {
(*start) += offset[index++];
element->timestamp = *start;
if (*(*start) == '{') {
char *tmp = *start;
int32_t code = smlJsonGetObj(&tmp);
if (code == 0) {
element->timestampLen = tmp - (*start);
*start = tmp;
}
} else {
while (*(*start)) {
if (unlikely(*(*start) == ',' || *(*start) == '}' || (*(*start)) <= 32)) {
element->timestampLen = (*start) - element->timestamp;
break;
}
(*start)++;
}
}
} else if ((*start)[1] == 'v') {
(*start) += offset[index++];
element->cols = *start;
if (*(*start) == '{') {
char *tmp = *start;
int32_t code = smlJsonGetObj(&tmp);
if (code == 0) {
element->colsLen = tmp - (*start);
*start = tmp;
}
} else {
while (*(*start)) {
if (unlikely(*(*start) == ',' || *(*start) == '}' || (*(*start)) <= 32)) {
element->colsLen = (*start) - element->cols;
break;
}
(*start)++;
}
}
} else if ((*start)[1] == 't' && (*start)[2] == 'a') {
(*start) += offset[index++];
element->tags = (*start);
char *tmp = *start;
int32_t code = smlJsonGetObj(&tmp);
if (code == 0) {
element->tagsLen = tmp - (*start);
*start = tmp;
}
}
if (*(*start) == '}') {
(*start)++;
break;
}
(*start)++;
}
if (unlikely(index != 0 && index != OTD_JSON_FIELDS_NUM)) {
uError("elements != %d", OTD_JSON_FIELDS_NUM);
return TSDB_CODE_TSC_INVALID_JSON;
}
return TSDB_CODE_SUCCESS;
}
static inline int32_t smlParseMetricFromJSON(SSmlHandle *info, cJSON *metric, SSmlLineInfo *elements) {
elements->measureLen = strlen(metric->valuestring);
if (IS_INVALID_TABLE_LEN(elements->measureLen)) {
@ -760,7 +507,7 @@ static int32_t smlParseJSONStringExt(SSmlHandle *info, cJSON *root, SSmlLineInfo
return ret;
}
static int32_t smlParseJSONExt(SSmlHandle *info, char *payload) {
int32_t smlParseJSONExt(SSmlHandle *info, char *payload) {
int32_t payloadNum = 0;
int32_t ret = TSDB_CODE_SUCCESS;
@ -830,170 +577,3 @@ static int32_t smlParseJSONExt(SSmlHandle *info, char *payload) {
return TSDB_CODE_SUCCESS;
}
static int32_t smlParseJSONString(SSmlHandle *info, char **start, char *end, SSmlLineInfo *elements) {
int32_t ret = TSDB_CODE_SUCCESS;
if (info->offset[0] == 0) {
ret = smlJsonParseObjFirst(start, elements, info->offset);
} else {
ret = smlJsonParseObj(start, end, elements, info->offset);
}
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
if (unlikely(**start == '\0' && elements->measure == NULL)) return TSDB_CODE_SUCCESS;
if (unlikely(IS_INVALID_TABLE_LEN(elements->measureLen))) {
smlBuildInvalidDataMsg(&info->msgBuf, "measure is empty or too large than 192", NULL);
return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
}
SSmlKv kv = {.key = VALUE, .keyLen = VALUE_LEN, .value = elements->cols, .length = (size_t)elements->colsLen};
if (unlikely(elements->colsLen == 0)) {
uError("SML:colsLen == 0");
return TSDB_CODE_TSC_INVALID_VALUE;
} else if (unlikely(elements->cols[0] == '{')) {
char tmp = elements->cols[elements->colsLen];
elements->cols[elements->colsLen] = '\0';
cJSON *valueJson = cJSON_Parse(elements->cols);
if (unlikely(valueJson == NULL)) {
uError("SML:0x%" PRIx64 " parse json cols failed:%s", info->id, elements->cols);
elements->cols[elements->colsLen] = tmp;
return TSDB_CODE_TSC_INVALID_JSON;
}
if (taosArrayPush(info->valueJsonArray, &valueJson) == NULL){
cJSON_Delete(valueJson);
elements->cols[elements->colsLen] = tmp;
return terrno;
}
ret = smlParseValueFromJSONObj(valueJson, &kv);
if (ret != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " Failed to parse value from JSON Obj:%s", info->id, elements->cols);
elements->cols[elements->colsLen] = tmp;
return TSDB_CODE_TSC_INVALID_VALUE;
}
elements->cols[elements->colsLen] = tmp;
} else if (smlParseValue(&kv, &info->msgBuf) != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " cols invalidate:%s", info->id, elements->cols);
return TSDB_CODE_TSC_INVALID_VALUE;
}
// Parse tags
if (is_same_child_table_telnet(elements, &info->preLine) != 0) {
char tmp = *(elements->tags + elements->tagsLen);
*(elements->tags + elements->tagsLen) = 0;
cJSON *tagsJson = cJSON_Parse(elements->tags);
*(elements->tags + elements->tagsLen) = tmp;
if (unlikely(tagsJson == NULL)) {
uError("SML:0x%" PRIx64 " parse json tag failed:%s", info->id, elements->tags);
return TSDB_CODE_TSC_INVALID_JSON;
}
if (taosArrayPush(info->tagJsonArray, &tagsJson) == NULL){
cJSON_Delete(tagsJson);
uError("SML:0x%" PRIx64 " taosArrayPush failed", info->id);
return terrno;
}
ret = smlParseTagsFromJSON(info, tagsJson, elements);
if (unlikely(ret)) {
uError("OTD:0x%" PRIx64 " Unable to parse tags from JSON payload", info->id);
return ret;
}
} else {
elements->measureTag = info->preLine.measureTag;
}
if (unlikely(info->reRun)) {
return TSDB_CODE_SUCCESS;
}
// Parse timestamp
// notice!!! put ts back to tag to ensure get meta->precision
int64_t ts = 0;
if (unlikely(elements->timestampLen == 0)) {
uError("OTD:0x%" PRIx64 " elements->timestampLen == 0", info->id);
return TSDB_CODE_INVALID_TIMESTAMP;
} else if (elements->timestamp[0] == '{') {
char tmp = elements->timestamp[elements->timestampLen];
elements->timestamp[elements->timestampLen] = '\0';
cJSON *tsJson = cJSON_Parse(elements->timestamp);
ts = smlParseTSFromJSON(info, tsJson);
if (unlikely(ts < 0)) {
uError("SML:0x%" PRIx64 " Unable to parse timestamp from JSON payload:%s", info->id, elements->timestamp);
elements->timestamp[elements->timestampLen] = tmp;
cJSON_Delete(tsJson);
return TSDB_CODE_INVALID_TIMESTAMP;
}
elements->timestamp[elements->timestampLen] = tmp;
cJSON_Delete(tsJson);
} else {
ts = smlParseOpenTsdbTime(info, elements->timestamp, elements->timestampLen);
if (unlikely(ts < 0)) {
uError("OTD:0x%" PRIx64 " Unable to parse timestamp from JSON payload", info->id);
return TSDB_CODE_INVALID_TIMESTAMP;
}
}
SSmlKv kvTs = {0};
smlBuildTsKv(&kvTs, ts);
if (info->dataFormat){
ret = smlParseEndTelnetJsonFormat(info, elements, &kvTs, &kv);
} else {
ret = smlParseEndTelnetJsonUnFormat(info, elements, &kvTs, &kv);
}
return ret;
}
int32_t smlParseJSON(SSmlHandle *info, char *payload) {
int32_t payloadNum = 1 << 10;
int32_t ret = TSDB_CODE_SUCCESS;
uDebug("SML:0x%" PRIx64 "json:%s", info->id, payload);
int cnt = 0;
char *dataPointStart = payload;
char *dataPointEnd = payload + strlen(payload);
while (1) {
if (info->dataFormat) {
SSmlLineInfo element = {0};
ret = smlParseJSONString(info, &dataPointStart, dataPointEnd, &element);
if (element.measureTagsLen != 0) taosMemoryFree(element.measureTag);
} else {
if (cnt >= payloadNum) {
payloadNum = payloadNum << 1;
void *tmp = taosMemoryRealloc(info->lines, payloadNum * sizeof(SSmlLineInfo));
if (tmp == NULL) {
ret = terrno;
return ret;
}
info->lines = (SSmlLineInfo *)tmp;
(void)memset(info->lines + cnt, 0, (payloadNum - cnt) * sizeof(SSmlLineInfo));
}
ret = smlParseJSONString(info, &dataPointStart, dataPointEnd, info->lines + cnt);
if ((info->lines + cnt)->measure == NULL) break;
}
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
uError("SML:0x%" PRIx64 " Invalid JSON Payload 1:%s", info->id, payload);
return smlParseJSONExt(info, payload);
}
if (unlikely(info->reRun)) {
cnt = 0;
dataPointStart = payload;
info->lineNum = payloadNum;
ret = smlClearForRerun(info);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
continue;
}
cnt++;
if (*dataPointStart == '\0') break;
}
info->lineNum = cnt;
return TSDB_CODE_SUCCESS;
}