Merge pull request #28591 from taosdata/3.0

3.0
This commit is contained in:
dongming chen 2024-10-31 16:36:42 +08:00 committed by GitHub
commit 9d04ddf727
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 1005 additions and 1560 deletions

View File

@ -92,6 +92,26 @@ extern "C" {
} \
}
#define SML_CHECK_CODE(CMD) \
code = (CMD); \
if (TSDB_CODE_SUCCESS != code) { \
lino = __LINE__; \
goto END; \
}
#define SML_CHECK_NULL(CMD) \
if (NULL == (CMD)) { \
code = terrno; \
lino = __LINE__; \
goto END; \
}
#define RETURN \
if (code != 0){ \
uError("%s failed code:%d line:%d", __FUNCTION__ , code, lino); \
} \
return code;
typedef enum {
SCHEMA_ACTION_NULL,
SCHEMA_ACTION_CREATE_STABLE,
@ -191,7 +211,6 @@ typedef struct {
cJSON *root; // for parse json
int8_t offset[OTD_JSON_FIELDS_NUM];
SSmlLineInfo *lines; // element is SSmlLineInfo
bool parseJsonByLib;
SArray *tagJsonArray;
SArray *valueJsonArray;
@ -211,13 +230,8 @@ typedef struct {
extern int64_t smlFactorNS[];
extern int64_t smlFactorS[];
typedef int32_t (*_equal_fn_sml)(const void *, const void *);
int32_t smlBuildSmlInfo(TAOS *taos, SSmlHandle **handle);
void smlDestroyInfo(SSmlHandle *info);
int smlJsonParseObjFirst(char **start, SSmlLineInfo *element, int8_t *offset);
int smlJsonParseObj(char **start, SSmlLineInfo *element, int8_t *offset);
bool smlParseNumberOld(SSmlKv *kvVal, SSmlMsgBuf *msg);
void smlBuildInvalidDataMsg(SSmlMsgBuf *pBuf, const char *msg1, const char *msg2);
int32_t smlParseNumber(SSmlKv *kvVal, SSmlMsgBuf *msg);
int64_t smlGetTimeValue(const char *value, int32_t len, uint8_t fromPrecision, uint8_t toPrecision);
@ -237,7 +251,7 @@ void smlDestroyTableInfo(void *para);
void freeSSmlKv(void* data);
int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLineInfo *elements);
int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLineInfo *elements);
int32_t smlParseJSON(SSmlHandle *info, char *payload);
int32_t smlParseJSONExt(SSmlHandle *info, char *payload);
int32_t smlBuildSuperTableInfo(SSmlHandle *info, SSmlLineInfo *currElement, SSmlSTableMeta** sMeta);
bool isSmlTagAligned(SSmlHandle *info, int cnt, SSmlKv *kv);
@ -246,7 +260,8 @@ int32_t smlProcessChildTable(SSmlHandle *info, SSmlLineInfo *elements);
int32_t smlProcessSuperTable(SSmlHandle *info, SSmlLineInfo *elements);
int32_t smlJoinMeasureTag(SSmlLineInfo *elements);
void smlBuildTsKv(SSmlKv *kv, int64_t ts);
int32_t smlParseEndTelnetJson(SSmlHandle *info, SSmlLineInfo *elements, SSmlKv *kvTs, SSmlKv *kv);
int32_t smlParseEndTelnetJsonFormat(SSmlHandle *info, SSmlLineInfo *elements, SSmlKv *kvTs, SSmlKv *kv);
int32_t smlParseEndTelnetJsonUnFormat(SSmlHandle *info, SSmlLineInfo *elements, SSmlKv *kvTs, SSmlKv *kv);
int32_t smlParseEndLine(SSmlHandle *info, SSmlLineInfo *elements, SSmlKv *kvTs);
static inline bool smlDoubleToInt64OverFlow(double num) {

File diff suppressed because it is too large Load Diff

View File

@ -21,259 +21,10 @@
#define OTD_JSON_SUB_FIELDS_NUM 2
#define JUMP_JSON_SPACE(start) \
while (*(start)) { \
if (unlikely(*(start) > 32)) \
break; \
else \
(start)++; \
}
static int32_t smlJsonGetObj(char **payload) {
int leftBracketCnt = 0;
bool isInQuote = false;
while (**payload) {
if (**payload == '"' && *((*payload) - 1) != '\\') {
isInQuote = !isInQuote;
} else if (!isInQuote && unlikely(**payload == '{')) {
leftBracketCnt++;
(*payload)++;
continue;
} else if (!isInQuote && unlikely(**payload == '}')) {
leftBracketCnt--;
(*payload)++;
if (leftBracketCnt == 0) {
return 0;
} else if (leftBracketCnt < 0) {
return -1;
}
continue;
}
(*payload)++;
}
return -1;
}
int smlJsonParseObjFirst(char **start, SSmlLineInfo *element, int8_t *offset) {
int index = 0;
while (*(*start)) {
if ((*start)[0] != '"') {
(*start)++;
continue;
}
if (unlikely(index >= OTD_JSON_FIELDS_NUM)) {
uError("index >= %d, %s", OTD_JSON_FIELDS_NUM, *start);
return TSDB_CODE_TSC_INVALID_JSON;
}
char *sTmp = *start;
if ((*start)[1] == 'm' && (*start)[2] == 'e' && (*start)[3] == 't' && (*start)[4] == 'r' && (*start)[5] == 'i' &&
(*start)[6] == 'c' && (*start)[7] == '"') {
(*start) += 8;
bool isInQuote = false;
while (*(*start)) {
if (unlikely(!isInQuote && *(*start) == '"')) {
(*start)++;
offset[index++] = *start - sTmp;
element->measure = (*start);
isInQuote = true;
continue;
}
if (unlikely(isInQuote && *(*start) == '"')) {
element->measureLen = (*start) - element->measure;
(*start)++;
break;
}
(*start)++;
}
} else if ((*start)[1] == 't' && (*start)[2] == 'i' && (*start)[3] == 'm' && (*start)[4] == 'e' &&
(*start)[5] == 's' && (*start)[6] == 't' && (*start)[7] == 'a' && (*start)[8] == 'm' &&
(*start)[9] == 'p' && (*start)[10] == '"') {
(*start) += 11;
bool hasColon = false;
while (*(*start)) {
if (unlikely(!hasColon && *(*start) == ':')) {
(*start)++;
JUMP_JSON_SPACE((*start))
offset[index++] = *start - sTmp;
element->timestamp = (*start);
if (*(*start) == '{') {
char *tmp = *start;
int32_t code = smlJsonGetObj(&tmp);
if (code == 0) {
element->timestampLen = tmp - (*start);
*start = tmp;
}
break;
}
hasColon = true;
continue;
}
if (unlikely(hasColon && (*(*start) == ',' || *(*start) == '}' || (*(*start)) <= 32))) {
element->timestampLen = (*start) - element->timestamp;
break;
}
(*start)++;
}
} else if ((*start)[1] == 'v' && (*start)[2] == 'a' && (*start)[3] == 'l' && (*start)[4] == 'u' &&
(*start)[5] == 'e' && (*start)[6] == '"') {
(*start) += 7;
bool hasColon = false;
while (*(*start)) {
if (unlikely(!hasColon && *(*start) == ':')) {
(*start)++;
JUMP_JSON_SPACE((*start))
offset[index++] = *start - sTmp;
element->cols = (*start);
if (*(*start) == '{') {
char *tmp = *start;
int32_t code = smlJsonGetObj(&tmp);
if (code == 0) {
element->colsLen = tmp - (*start);
*start = tmp;
}
break;
}
hasColon = true;
continue;
}
if (unlikely(hasColon && (*(*start) == ',' || *(*start) == '}' || (*(*start)) <= 32))) {
element->colsLen = (*start) - element->cols;
break;
}
(*start)++;
}
} else if ((*start)[1] == 't' && (*start)[2] == 'a' && (*start)[3] == 'g' && (*start)[4] == 's' &&
(*start)[5] == '"') {
(*start) += 6;
while (*(*start)) {
if (unlikely(*(*start) == ':')) {
(*start)++;
JUMP_JSON_SPACE((*start))
offset[index++] = *start - sTmp;
element->tags = (*start);
char *tmp = *start;
int32_t code = smlJsonGetObj(&tmp);
if (code == 0) {
element->tagsLen = tmp - (*start);
*start = tmp;
}
break;
}
(*start)++;
}
}
if (*(*start) == '\0') {
break;
}
if (*(*start) == '}') {
(*start)++;
break;
}
(*start)++;
}
if (unlikely(index != OTD_JSON_FIELDS_NUM) || element->tags == NULL || element->cols == NULL ||
element->measure == NULL || element->timestamp == NULL) {
uError("elements != %d or element parse null", OTD_JSON_FIELDS_NUM);
return TSDB_CODE_TSC_INVALID_JSON;
}
return 0;
}
int smlJsonParseObj(char **start, SSmlLineInfo *element, int8_t *offset) {
int index = 0;
while (*(*start)) {
if ((*start)[0] != '"') {
(*start)++;
continue;
}
if (unlikely(index >= OTD_JSON_FIELDS_NUM)) {
uError("index >= %d, %s", OTD_JSON_FIELDS_NUM, *start);
return TSDB_CODE_TSC_INVALID_JSON;
}
if ((*start)[1] == 'm') {
(*start) += offset[index++];
element->measure = *start;
while (*(*start)) {
if (unlikely(*(*start) == '"')) {
element->measureLen = (*start) - element->measure;
(*start)++;
break;
}
(*start)++;
}
} else if ((*start)[1] == 't' && (*start)[2] == 'i') {
(*start) += offset[index++];
element->timestamp = *start;
if (*(*start) == '{') {
char *tmp = *start;
int32_t code = smlJsonGetObj(&tmp);
if (code == 0) {
element->timestampLen = tmp - (*start);
*start = tmp;
}
} else {
while (*(*start)) {
if (unlikely(*(*start) == ',' || *(*start) == '}' || (*(*start)) <= 32)) {
element->timestampLen = (*start) - element->timestamp;
break;
}
(*start)++;
}
}
} else if ((*start)[1] == 'v') {
(*start) += offset[index++];
element->cols = *start;
if (*(*start) == '{') {
char *tmp = *start;
int32_t code = smlJsonGetObj(&tmp);
if (code == 0) {
element->colsLen = tmp - (*start);
*start = tmp;
}
} else {
while (*(*start)) {
if (unlikely(*(*start) == ',' || *(*start) == '}' || (*(*start)) <= 32)) {
element->colsLen = (*start) - element->cols;
break;
}
(*start)++;
}
}
} else if ((*start)[1] == 't' && (*start)[2] == 'a') {
(*start) += offset[index++];
element->tags = (*start);
char *tmp = *start;
int32_t code = smlJsonGetObj(&tmp);
if (code == 0) {
element->tagsLen = tmp - (*start);
*start = tmp;
}
}
if (*(*start) == '}') {
(*start)++;
break;
}
(*start)++;
}
if (unlikely(index != 0 && index != OTD_JSON_FIELDS_NUM)) {
uError("elements != %d", OTD_JSON_FIELDS_NUM);
return TSDB_CODE_TSC_INVALID_JSON;
}
return TSDB_CODE_SUCCESS;
}
static inline int32_t smlParseMetricFromJSON(SSmlHandle *info, cJSON *metric, SSmlLineInfo *elements) {
elements->measureLen = strlen(metric->valuestring);
if (IS_INVALID_TABLE_LEN(elements->measureLen)) {
uError("OTD:0x%" PRIx64 " Metric length is 0 or large than 192", info->id);
uError("SML:0x%" PRIx64 " Metric length is 0 or large than 192", info->id);
return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
}
@ -293,7 +44,7 @@ static int32_t smlGetJsonElements(cJSON *root, cJSON ***marks) {
child = child->next;
}
if (*marks[i] == NULL) {
uError("smlGetJsonElements error, not find mark:%d:%s", i, jsonName[i]);
uError("SML %s error, not find mark:%d:%s", __FUNCTION__, i, jsonName[i]);
return TSDB_CODE_TSC_INVALID_JSON;
}
}
@ -302,7 +53,7 @@ static int32_t smlGetJsonElements(cJSON *root, cJSON ***marks) {
static int32_t smlConvertJSONBool(SSmlKv *pVal, char *typeStr, cJSON *value) {
if (strcasecmp(typeStr, "bool") != 0) {
uError("OTD:invalid type(%s) for JSON Bool", typeStr);
uError("SML:invalid type(%s) for JSON Bool", typeStr);
return TSDB_CODE_TSC_INVALID_JSON_TYPE;
}
pVal->type = TSDB_DATA_TYPE_BOOL;
@ -316,7 +67,7 @@ static int32_t smlConvertJSONNumber(SSmlKv *pVal, char *typeStr, cJSON *value) {
// tinyint
if (strcasecmp(typeStr, "i8") == 0 || strcasecmp(typeStr, "tinyint") == 0) {
if (!IS_VALID_TINYINT(value->valuedouble)) {
uError("OTD:JSON value(%f) cannot fit in type(tinyint)", value->valuedouble);
uError("SML:JSON value(%f) cannot fit in type(tinyint)", value->valuedouble);
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
}
pVal->type = TSDB_DATA_TYPE_TINYINT;
@ -327,7 +78,7 @@ static int32_t smlConvertJSONNumber(SSmlKv *pVal, char *typeStr, cJSON *value) {
// smallint
if (strcasecmp(typeStr, "i16") == 0 || strcasecmp(typeStr, "smallint") == 0) {
if (!IS_VALID_SMALLINT(value->valuedouble)) {
uError("OTD:JSON value(%f) cannot fit in type(smallint)", value->valuedouble);
uError("SML:JSON value(%f) cannot fit in type(smallint)", value->valuedouble);
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
}
pVal->type = TSDB_DATA_TYPE_SMALLINT;
@ -338,7 +89,7 @@ static int32_t smlConvertJSONNumber(SSmlKv *pVal, char *typeStr, cJSON *value) {
// int
if (strcasecmp(typeStr, "i32") == 0 || strcasecmp(typeStr, "int") == 0) {
if (!IS_VALID_INT(value->valuedouble)) {
uError("OTD:JSON value(%f) cannot fit in type(int)", value->valuedouble);
uError("SML:JSON value(%f) cannot fit in type(int)", value->valuedouble);
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
}
pVal->type = TSDB_DATA_TYPE_INT;
@ -362,7 +113,7 @@ static int32_t smlConvertJSONNumber(SSmlKv *pVal, char *typeStr, cJSON *value) {
// float
if (strcasecmp(typeStr, "f32") == 0 || strcasecmp(typeStr, "float") == 0) {
if (!IS_VALID_FLOAT(value->valuedouble)) {
uError("OTD:JSON value(%f) cannot fit in type(float)", value->valuedouble);
uError("SML:JSON value(%f) cannot fit in type(float)", value->valuedouble);
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
}
pVal->type = TSDB_DATA_TYPE_FLOAT;
@ -379,7 +130,7 @@ static int32_t smlConvertJSONNumber(SSmlKv *pVal, char *typeStr, cJSON *value) {
}
// if reach here means type is unsupported
uError("OTD:invalid type(%s) for JSON Number", typeStr);
uError("SML:invalid type(%s) for JSON Number", typeStr);
return TSDB_CODE_TSC_INVALID_JSON_TYPE;
}
@ -391,7 +142,7 @@ static int32_t smlConvertJSONString(SSmlKv *pVal, char *typeStr, cJSON *value) {
} else if (strcasecmp(typeStr, "nchar") == 0) {
pVal->type = TSDB_DATA_TYPE_NCHAR;
} else {
uError("OTD:invalid type(%s) for JSON String", typeStr);
uError("SML:invalid type(%s) for JSON String", typeStr);
return TSDB_CODE_TSC_INVALID_JSON_TYPE;
}
pVal->length = strlen(value->valuestring);
@ -474,7 +225,7 @@ static int32_t smlParseValueFromJSON(cJSON *root, SSmlKv *kv) {
case cJSON_String: {
int32_t ret = smlConvertJSONString(kv, "binary", root);
if (ret != TSDB_CODE_SUCCESS) {
uError("OTD:Failed to parse binary value from JSON Obj");
uError("SML:Failed to parse binary value from JSON Obj");
return ret;
}
break;
@ -482,7 +233,7 @@ static int32_t smlParseValueFromJSON(cJSON *root, SSmlKv *kv) {
case cJSON_Object: {
int32_t ret = smlParseValueFromJSONObj(root, kv);
if (ret != TSDB_CODE_SUCCESS) {
uError("OTD:Failed to parse value from JSON Obj");
uError("SML:Failed to parse value from JSON Obj");
return ret;
}
break;
@ -511,7 +262,7 @@ static int32_t smlProcessTagJson(SSmlHandle *info, cJSON *tags){
}
size_t keyLen = strlen(tag->string);
if (unlikely(IS_INVALID_COL_LEN(keyLen))) {
uError("OTD:Tag key length is 0 or too large than 64");
uError("SML:Tag key length is 0 or too large than 64");
return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
}
@ -539,28 +290,24 @@ static int32_t smlProcessTagJson(SSmlHandle *info, cJSON *tags){
}
static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *tags, SSmlLineInfo *elements) {
int32_t ret = 0;
if (is_same_child_table_telnet(elements, &info->preLine) == 0) {
elements->measureTag = info->preLine.measureTag;
return TSDB_CODE_SUCCESS;
}
int32_t code = 0;
int32_t lino = 0;
if(info->dataFormat){
ret = smlProcessSuperTable(info, elements);
if(ret != 0){
if(info->reRun){
return TSDB_CODE_SUCCESS;
}
return ret;
}
}
ret = smlProcessTagJson(info, tags);
if(ret != 0){
if(info->reRun){
return TSDB_CODE_SUCCESS;
}
return ret;
}
ret = smlJoinMeasureTag(elements);
if(ret != 0){
return ret;
SML_CHECK_CODE(smlProcessSuperTable(info, elements));
}
SML_CHECK_CODE(smlProcessTagJson(info, tags));
SML_CHECK_CODE(smlJoinMeasureTag(elements));
return smlProcessChildTable(info, elements);
END:
if(info->reRun){
return TSDB_CODE_SUCCESS;
}
RETURN
}
static int64_t smlParseTSFromJSONObj(SSmlHandle *info, cJSON *root, int32_t toPrecision) {
@ -678,7 +425,8 @@ static int64_t smlParseTSFromJSON(SSmlHandle *info, cJSON *timestamp) {
}
static int32_t smlParseJSONStringExt(SSmlHandle *info, cJSON *root, SSmlLineInfo *elements) {
int32_t ret = TSDB_CODE_SUCCESS;
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
cJSON *metricJson = NULL;
cJSON *tsJson = NULL;
@ -688,57 +436,27 @@ static int32_t smlParseJSONStringExt(SSmlHandle *info, cJSON *root, SSmlLineInfo
int32_t size = cJSON_GetArraySize(root);
// outmost json fields has to be exactly 4
if (size != OTD_JSON_FIELDS_NUM) {
uError("OTD:0x%" PRIx64 " Invalid number of JSON fields in data point %d", info->id, size);
uError("SML:0x%" PRIx64 " Invalid number of JSON fields in data point %d", info->id, size);
return TSDB_CODE_TSC_INVALID_JSON;
}
cJSON **marks[OTD_JSON_FIELDS_NUM] = {&metricJson, &tsJson, &valueJson, &tagsJson};
ret = smlGetJsonElements(root, marks);
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
return ret;
}
SML_CHECK_CODE(smlGetJsonElements(root, marks));
// Parse metric
ret = smlParseMetricFromJSON(info, metricJson, elements);
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
uError("OTD:0x%" PRIx64 " Unable to parse metric from JSON payload", info->id);
return ret;
}
SML_CHECK_CODE(smlParseMetricFromJSON(info, metricJson, elements));
// Parse metric value
SSmlKv kv = {.key = VALUE, .keyLen = VALUE_LEN};
ret = smlParseValueFromJSON(valueJson, &kv);
if (unlikely(ret)) {
uError("OTD:0x%" PRIx64 " Unable to parse metric value from JSON payload", info->id);
return ret;
}
SML_CHECK_CODE(smlParseValueFromJSON(valueJson, &kv));
// Parse tags
bool needFree = info->dataFormat;
elements->tags = cJSON_PrintUnformatted(tagsJson);
if (elements->tags == NULL){
return TSDB_CODE_OUT_OF_MEMORY;
}
elements->tagsLen = strlen(elements->tags);
if (is_same_child_table_telnet(elements, &info->preLine) != 0) {
ret = smlParseTagsFromJSON(info, tagsJson, elements);
if (unlikely(ret)) {
uError("OTD:0x%" PRIx64 " Unable to parse tags from JSON payload", info->id);
taosMemoryFree(elements->tags);
elements->tags = NULL;
return ret;
}
} else {
elements->measureTag = info->preLine.measureTag;
}
SML_CHECK_NULL(elements->tags);
if (needFree) {
taosMemoryFree(elements->tags);
elements->tags = NULL;
}
elements->tagsLen = strlen(elements->tags);
SML_CHECK_CODE(smlParseTagsFromJSON(info, tagsJson, elements));
if (unlikely(info->reRun)) {
return TSDB_CODE_SUCCESS;
goto END;
}
// Parse timestamp
@ -747,29 +465,34 @@ static int32_t smlParseJSONStringExt(SSmlHandle *info, cJSON *root, SSmlLineInfo
if (unlikely(ts < 0)) {
char* tmp = cJSON_PrintUnformatted(tsJson);
if (tmp == NULL) {
uError("cJSON_PrintUnformatted failed since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
uError("OTD:0x%" PRIx64 " Unable to parse timestamp from JSON payload %s %" PRId64, info->id, info->msgBuf.buf, ts);
uError("SML:0x%" PRIx64 " Unable to parse timestamp from JSON payload %s %" PRId64, info->id, info->msgBuf.buf, ts);
} else {
uError("OTD:0x%" PRIx64 " Unable to parse timestamp from JSON payload %s %s %" PRId64, info->id, info->msgBuf.buf,tmp, ts);
uError("SML:0x%" PRIx64 " Unable to parse timestamp from JSON payload %s %s %" PRId64, info->id, info->msgBuf.buf,tmp, ts);
taosMemoryFree(tmp);
}
return TSDB_CODE_INVALID_TIMESTAMP;
SML_CHECK_CODE(TSDB_CODE_INVALID_TIMESTAMP);
}
SSmlKv kvTs = {0};
smlBuildTsKv(&kvTs, ts);
if (info->dataFormat){
code = smlParseEndTelnetJsonFormat(info, elements, &kvTs, &kv);
} else {
code = smlParseEndTelnetJsonUnFormat(info, elements, &kvTs, &kv);
}
SML_CHECK_CODE(code);
taosMemoryFreeClear(info->preLine.tags);
info->preLine = *elements;
elements->tags = NULL;
return smlParseEndTelnetJson(info, elements, &kvTs, &kv);
END:
taosMemoryFree(elements->tags);
RETURN
}
static int32_t smlParseJSONExt(SSmlHandle *info, char *payload) {
int32_t smlParseJSONExt(SSmlHandle *info, char *payload) {
int32_t payloadNum = 0;
int32_t ret = TSDB_CODE_SUCCESS;
if (unlikely(payload == NULL)) {
uError("SML:0x%" PRIx64 " empty JSON Payload", info->id);
return TSDB_CODE_TSC_INVALID_JSON;
}
info->root = cJSON_Parse(payload);
if (unlikely(info->root == NULL)) {
uError("SML:0x%" PRIx64 " parse json failed:%s", info->id, payload);
@ -782,27 +505,11 @@ static int32_t smlParseJSONExt(SSmlHandle *info, char *payload) {
} else if (cJSON_IsObject(info->root)) {
payloadNum = 1;
} else {
uError("SML:0x%" PRIx64 " Invalid JSON Payload 3:%s", info->id, payload);
uError("SML:0x%" PRIx64 " Invalid JSON type:%s", info->id, payload);
return TSDB_CODE_TSC_INVALID_JSON;
}
if (unlikely(info->lines != NULL)) {
for (int i = 0; i < info->lineNum; i++) {
taosArrayDestroyEx(info->lines[i].colArray, freeSSmlKv);
if (info->lines[i].measureTagsLen != 0) taosMemoryFree(info->lines[i].measureTag);
}
taosMemoryFree(info->lines);
info->lines = NULL;
}
info->lineNum = payloadNum;
info->dataFormat = true;
ret = smlClearForRerun(info);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
info->parseJsonByLib = true;
cJSON *head = (payloadNum == 1 && cJSON_IsObject(info->root)) ? info->root : info->root->child;
int cnt = 0;
@ -811,6 +518,7 @@ static int32_t smlParseJSONExt(SSmlHandle *info, char *payload) {
if (info->dataFormat) {
SSmlLineInfo element = {0};
ret = smlParseJSONStringExt(info, dataPoint, &element);
if (element.measureTagsLen != 0) taosMemoryFree(element.measureTag);
} else {
ret = smlParseJSONStringExt(info, dataPoint, info->lines + cnt);
}
@ -836,164 +544,3 @@ static int32_t smlParseJSONExt(SSmlHandle *info, char *payload) {
return TSDB_CODE_SUCCESS;
}
static int32_t smlParseJSONString(SSmlHandle *info, char **start, SSmlLineInfo *elements) {
int32_t ret = TSDB_CODE_SUCCESS;
if (info->offset[0] == 0) {
ret = smlJsonParseObjFirst(start, elements, info->offset);
} else {
ret = smlJsonParseObj(start, elements, info->offset);
}
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
if (unlikely(**start == '\0' && elements->measure == NULL)) return TSDB_CODE_SUCCESS;
if (unlikely(IS_INVALID_TABLE_LEN(elements->measureLen))) {
smlBuildInvalidDataMsg(&info->msgBuf, "measure is empty or too large than 192", NULL);
return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
}
SSmlKv kv = {.key = VALUE, .keyLen = VALUE_LEN, .value = elements->cols, .length = (size_t)elements->colsLen};
if (unlikely(elements->colsLen == 0)) {
uError("SML:colsLen == 0");
return TSDB_CODE_TSC_INVALID_VALUE;
} else if (unlikely(elements->cols[0] == '{')) {
char tmp = elements->cols[elements->colsLen];
elements->cols[elements->colsLen] = '\0';
cJSON *valueJson = cJSON_Parse(elements->cols);
if (unlikely(valueJson == NULL)) {
uError("SML:0x%" PRIx64 " parse json cols failed:%s", info->id, elements->cols);
elements->cols[elements->colsLen] = tmp;
return TSDB_CODE_TSC_INVALID_JSON;
}
if (taosArrayPush(info->tagJsonArray, &valueJson) == NULL){
cJSON_Delete(valueJson);
elements->cols[elements->colsLen] = tmp;
return terrno;
}
ret = smlParseValueFromJSONObj(valueJson, &kv);
if (ret != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " Failed to parse value from JSON Obj:%s", info->id, elements->cols);
elements->cols[elements->colsLen] = tmp;
return TSDB_CODE_TSC_INVALID_VALUE;
}
elements->cols[elements->colsLen] = tmp;
} else if (smlParseValue(&kv, &info->msgBuf) != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " cols invalidate:%s", info->id, elements->cols);
return TSDB_CODE_TSC_INVALID_VALUE;
}
// Parse tags
if (is_same_child_table_telnet(elements, &info->preLine) != 0) {
char tmp = *(elements->tags + elements->tagsLen);
*(elements->tags + elements->tagsLen) = 0;
cJSON *tagsJson = cJSON_Parse(elements->tags);
*(elements->tags + elements->tagsLen) = tmp;
if (unlikely(tagsJson == NULL)) {
uError("SML:0x%" PRIx64 " parse json tag failed:%s", info->id, elements->tags);
return TSDB_CODE_TSC_INVALID_JSON;
}
if (taosArrayPush(info->tagJsonArray, &tagsJson) == NULL){
cJSON_Delete(tagsJson);
uError("SML:0x%" PRIx64 " taosArrayPush failed", info->id);
return terrno;
}
ret = smlParseTagsFromJSON(info, tagsJson, elements);
if (unlikely(ret)) {
uError("OTD:0x%" PRIx64 " Unable to parse tags from JSON payload", info->id);
return ret;
}
} else {
elements->measureTag = info->preLine.measureTag;
}
if (unlikely(info->reRun)) {
return TSDB_CODE_SUCCESS;
}
// Parse timestamp
// notice!!! put ts back to tag to ensure get meta->precision
int64_t ts = 0;
if (unlikely(elements->timestampLen == 0)) {
uError("OTD:0x%" PRIx64 " elements->timestampLen == 0", info->id);
return TSDB_CODE_INVALID_TIMESTAMP;
} else if (elements->timestamp[0] == '{') {
char tmp = elements->timestamp[elements->timestampLen];
elements->timestamp[elements->timestampLen] = '\0';
cJSON *tsJson = cJSON_Parse(elements->timestamp);
ts = smlParseTSFromJSON(info, tsJson);
if (unlikely(ts < 0)) {
uError("SML:0x%" PRIx64 " Unable to parse timestamp from JSON payload:%s", info->id, elements->timestamp);
elements->timestamp[elements->timestampLen] = tmp;
cJSON_Delete(tsJson);
return TSDB_CODE_INVALID_TIMESTAMP;
}
elements->timestamp[elements->timestampLen] = tmp;
cJSON_Delete(tsJson);
} else {
ts = smlParseOpenTsdbTime(info, elements->timestamp, elements->timestampLen);
if (unlikely(ts < 0)) {
uError("OTD:0x%" PRIx64 " Unable to parse timestamp from JSON payload", info->id);
return TSDB_CODE_INVALID_TIMESTAMP;
}
}
SSmlKv kvTs = {0};
smlBuildTsKv(&kvTs, ts);
return smlParseEndTelnetJson(info, elements, &kvTs, &kv);
}
int32_t smlParseJSON(SSmlHandle *info, char *payload) {
int32_t payloadNum = 1 << 15;
int32_t ret = TSDB_CODE_SUCCESS;
uDebug("SML:0x%" PRIx64 "json:%s", info->id, payload);
int cnt = 0;
char *dataPointStart = payload;
while (1) {
if (info->dataFormat) {
SSmlLineInfo element = {0};
ret = smlParseJSONString(info, &dataPointStart, &element);
if (element.measureTagsLen != 0) taosMemoryFree(element.measureTag);
} else {
if (cnt >= payloadNum) {
payloadNum = payloadNum << 1;
void *tmp = taosMemoryRealloc(info->lines, payloadNum * sizeof(SSmlLineInfo));
if (tmp == NULL) {
ret = terrno;
return ret;
}
info->lines = (SSmlLineInfo *)tmp;
(void)memset(info->lines + cnt, 0, (payloadNum - cnt) * sizeof(SSmlLineInfo));
}
ret = smlParseJSONString(info, &dataPointStart, info->lines + cnt);
if ((info->lines + cnt)->measure == NULL) break;
}
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
uError("SML:0x%" PRIx64 " Invalid JSON Payload 1:%s", info->id, payload);
return smlParseJSONExt(info, payload);
}
if (unlikely(info->reRun)) {
cnt = 0;
dataPointStart = payload;
info->lineNum = payloadNum;
ret = smlClearForRerun(info);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
continue;
}
cnt++;
if (*dataPointStart == '\0') break;
}
info->lineNum = cnt;
return TSDB_CODE_SUCCESS;
}

View File

@ -63,7 +63,7 @@ static int64_t smlParseInfluxTime(SSmlHandle *info, const char *data, int32_t le
int64_t ts = smlGetTimeValue(data, len, fromPrecision, toPrecision);
if (unlikely(ts == -1)) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data);
smlBuildInvalidDataMsg(&info->msgBuf, "SML line invalid timestamp", data);
return TSDB_CODE_SML_INVALID_DATA;
}
return ts;
@ -84,7 +84,7 @@ int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) {
}
if (pVal->value[0] == 'l' || pVal->value[0] == 'L') { // nchar
if (pVal->value[1] == '"' && pVal->value[pVal->length - 1] == '"' && pVal->length >= 3) {
if (pVal->length >= NCHAR_ADD_LEN && pVal->value[1] == '"' && pVal->value[pVal->length - 1] == '"') {
pVal->type = TSDB_DATA_TYPE_NCHAR;
pVal->length -= NCHAR_ADD_LEN;
if (pVal->length > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
@ -97,7 +97,7 @@ int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) {
}
if (pVal->value[0] == 'g' || pVal->value[0] == 'G') { // geometry
if (pVal->value[1] == '"' && pVal->value[pVal->length - 1] == '"' && pVal->length >= sizeof("POINT")+3) {
if (pVal->length >= NCHAR_ADD_LEN && pVal->value[1] == '"' && pVal->value[pVal->length - 1] == '"') {
int32_t code = initCtxGeomFromText();
if (code != TSDB_CODE_SUCCESS) {
return code;
@ -124,7 +124,7 @@ int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) {
}
if (pVal->value[0] == 'b' || pVal->value[0] == 'B') { // varbinary
if (pVal->value[1] == '"' && pVal->value[pVal->length - 1] == '"' && pVal->length >= 3) {
if (pVal->length >= NCHAR_ADD_LEN && pVal->value[1] == '"' && pVal->value[pVal->length - 1] == '"') {
pVal->type = TSDB_DATA_TYPE_VARBINARY;
if(isHex(pVal->value + NCHAR_ADD_LEN - 1, pVal->length - NCHAR_ADD_LEN)){
if(!isValidateHex(pVal->value + NCHAR_ADD_LEN - 1, pVal->length - NCHAR_ADD_LEN)){
@ -298,7 +298,7 @@ static int32_t smlProcessTagLine(SSmlHandle *info, char **sql, char *sqlEnd){
}
if (info->dataFormat && !isSmlTagAligned(info, cnt, &kv)) {
return TSDB_CODE_TSC_INVALID_JSON;
return TSDB_CODE_SML_INVALID_DATA;
}
cnt++;
@ -311,31 +311,24 @@ static int32_t smlProcessTagLine(SSmlHandle *info, char **sql, char *sqlEnd){
}
static int32_t smlParseTagLine(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLineInfo *elements) {
int32_t code = 0;
int32_t lino = 0;
bool isSameCTable = IS_SAME_CHILD_TABLE;
if(isSameCTable){
return TSDB_CODE_SUCCESS;
}
int32_t ret = 0;
if(info->dataFormat){
ret = smlProcessSuperTable(info, elements);
if(ret != 0){
SML_CHECK_CODE(smlProcessSuperTable(info, elements));
}
SML_CHECK_CODE(smlProcessTagLine(info, sql, sqlEnd));
return smlProcessChildTable(info, elements);
END:
if(info->reRun){
return TSDB_CODE_SUCCESS;
}
return ret;
}
}
ret = smlProcessTagLine(info, sql, sqlEnd);
if(ret != 0){
if (info->reRun){
return TSDB_CODE_SUCCESS;
}
return ret;
}
return smlProcessChildTable(info, elements);
RETURN
}
static int32_t smlParseColLine(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLineInfo *currElement) {
@ -353,7 +346,7 @@ static int32_t smlParseColLine(SSmlHandle *info, char **sql, char *sqlEnd, SSmlL
const char *escapeChar = NULL;
while (*sql < sqlEnd) {
if (unlikely(IS_SPACE(*sql,escapeChar) || IS_COMMA(*sql,escapeChar))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
smlBuildInvalidDataMsg(&info->msgBuf, "SML line invalid data", *sql);
return TSDB_CODE_SML_INVALID_DATA;
}
if (unlikely(IS_EQUAL(*sql,escapeChar))) {
@ -370,7 +363,7 @@ static int32_t smlParseColLine(SSmlHandle *info, char **sql, char *sqlEnd, SSmlL
}
if (unlikely(IS_INVALID_COL_LEN(keyLen - keyLenEscaped))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid key or key is too long than 64", key);
smlBuildInvalidDataMsg(&info->msgBuf, "SML line invalid key or key is too long than 64", key);
return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
}
@ -404,18 +397,18 @@ static int32_t smlParseColLine(SSmlHandle *info, char **sql, char *sqlEnd, SSmlL
valueLen = *sql - value;
if (unlikely(quoteNum != 0 && quoteNum != 2)) {
smlBuildInvalidDataMsg(&info->msgBuf, "unbalanced quotes", value);
smlBuildInvalidDataMsg(&info->msgBuf, "SML line unbalanced quotes", value);
return TSDB_CODE_SML_INVALID_DATA;
}
if (unlikely(valueLen == 0)) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", value);
smlBuildInvalidDataMsg(&info->msgBuf, "SML line invalid value", value);
return TSDB_CODE_SML_INVALID_DATA;
}
SSmlKv kv = {.key = key, .keyLen = keyLen, .value = value, .length = valueLen};
int32_t ret = smlParseValue(&kv, &info->msgBuf);
if (ret != TSDB_CODE_SUCCESS) {
smlBuildInvalidDataMsg(&info->msgBuf, "smlParseValue error", value);
uError("SML:0x%" PRIx64 " %s parse value error:%d.", info->id, __FUNCTION__, ret);
return ret;
}
@ -437,11 +430,6 @@ static int32_t smlParseColLine(SSmlHandle *info, char **sql, char *sqlEnd, SSmlL
}
(void)memcpy(tmp, kv.value, kv.length);
PROCESS_SLASH_IN_FIELD_VALUE(tmp, kv.length);
if(kv.type == TSDB_DATA_TYPE_GEOMETRY) {
uError("SML:0x%" PRIx64 " smlParseColLine error, invalid GEOMETRY type.", info->id);
taosMemoryFree((void*)kv.value);
return TSDB_CODE_TSC_INVALID_VALUE;
}
if(kv.type == TSDB_DATA_TYPE_VARBINARY){
taosMemoryFree((void*)kv.value);
}
@ -510,7 +498,7 @@ int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
}
elements->measureLen = sql - elements->measure;
if (unlikely(IS_INVALID_TABLE_LEN(elements->measureLen - measureLenEscaped))) {
smlBuildInvalidDataMsg(&info->msgBuf, "measure is empty or too large than 192", NULL);
smlBuildInvalidDataMsg(&info->msgBuf, "SML line measure is empty or too large than 192", NULL);
return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
}
@ -557,7 +545,7 @@ int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
elements->colsLen = sql - elements->cols;
if (unlikely(elements->colsLen == 0)) {
smlBuildInvalidDataMsg(&info->msgBuf, "cols is empty", NULL);
smlBuildInvalidDataMsg(&info->msgBuf, "SML line cols is empty", NULL);
return TSDB_CODE_SML_INVALID_DATA;
}
@ -574,7 +562,7 @@ int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
int64_t ts = smlParseInfluxTime(info, elements->timestamp, elements->timestampLen);
if (unlikely(ts <= 0)) {
uError("SML:0x%" PRIx64 " smlParseTS error:%" PRId64, info->id, ts);
uError("SML:0x%" PRIx64 " %s error:%" PRId64, info->id, __FUNCTION__, ts);
return TSDB_CODE_INVALID_TIMESTAMP;
}

View File

@ -148,31 +148,21 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS
return TSDB_CODE_SUCCESS;
}
int32_t ret = 0;
int32_t code = 0;
int32_t lino = 0;
if(info->dataFormat){
ret = smlProcessSuperTable(info, elements);
if(ret != 0){
SML_CHECK_CODE(smlProcessSuperTable(info, elements));
}
SML_CHECK_CODE(smlProcessTagTelnet(info, data, sqlEnd));
SML_CHECK_CODE(smlJoinMeasureTag(elements));
code = smlProcessChildTable(info, elements);
END:
if(info->reRun){
return TSDB_CODE_SUCCESS;
}
return ret;
}
}
ret = smlProcessTagTelnet(info, data, sqlEnd);
if(ret != 0){
if (info->reRun){
return TSDB_CODE_SUCCESS;
}
return ret;
}
ret = smlJoinMeasureTag(elements);
if(ret != 0){
return ret;
}
return smlProcessChildTable(info, elements);
RETURN
}
// format: <metric> <timestamp> <value> <tagk_1>=<tagv_1>[ <tagk_n>=<tagv_n>]
@ -182,14 +172,14 @@ int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
// parse metric
smlParseTelnetElement(&sql, sqlEnd, &elements->measure, &elements->measureLen);
if (unlikely((!(elements->measure) || IS_INVALID_TABLE_LEN(elements->measureLen)))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql);
smlBuildInvalidDataMsg(&info->msgBuf, "SML telnet invalid measure", sql);
return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
}
// parse timestamp
smlParseTelnetElement(&sql, sqlEnd, &elements->timestamp, &elements->timestampLen);
if (unlikely(!elements->timestamp || elements->timestampLen == 0)) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql);
smlBuildInvalidDataMsg(&info->msgBuf, "SML telnet invalid timestamp", sql);
return TSDB_CODE_SML_INVALID_DATA;
}
@ -199,19 +189,21 @@ int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
}
int64_t ts = smlParseOpenTsdbTime(info, elements->timestamp, elements->timestampLen);
if (unlikely(ts < 0)) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql);
smlBuildInvalidDataMsg(&info->msgBuf, "SML telnet parse timestamp failed", sql);
return TSDB_CODE_INVALID_TIMESTAMP;
}
// parse value
smlParseTelnetElement(&sql, sqlEnd, &elements->cols, &elements->colsLen);
if (unlikely(!elements->cols || elements->colsLen == 0)) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", sql);
smlBuildInvalidDataMsg(&info->msgBuf, "SML telnet invalid value", sql);
return TSDB_CODE_TSC_INVALID_VALUE;
}
SSmlKv kv = {.key = VALUE, .keyLen = VALUE_LEN, .value = elements->cols, .length = (size_t)elements->colsLen};
if (smlParseValue(&kv, &info->msgBuf) != TSDB_CODE_SUCCESS) {
int ret = smlParseValue(&kv, &info->msgBuf);
if (ret != TSDB_CODE_SUCCESS) {
uError("SML:0x%" PRIx64 " %s parse value error:%d.", info->id, __FUNCTION__, ret);
return TSDB_CODE_TSC_INVALID_VALUE;
}
@ -220,11 +212,11 @@ int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
elements->tags = sql;
elements->tagsLen = sqlEnd - sql;
if (unlikely(!elements->tags || elements->tagsLen == 0)) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", sql);
smlBuildInvalidDataMsg(&info->msgBuf, "SML telnet invalid tag value", sql);
return TSDB_CODE_TSC_INVALID_VALUE;
}
int ret = smlParseTelnetTags(info, sql, sqlEnd, elements);
ret = smlParseTelnetTags(info, sql, sqlEnd, elements);
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
return ret;
}
@ -239,5 +231,12 @@ int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
kvTs.i = convertTimePrecision(kvTs.i, TSDB_TIME_PRECISION_NANO, info->currSTableMeta->tableInfo.precision);
}
return smlParseEndTelnetJson(info, elements, &kvTs, &kv);
if (info->dataFormat){
ret = smlParseEndTelnetJsonFormat(info, elements, &kvTs, &kv);
} else {
ret = smlParseEndTelnetJsonUnFormat(info, elements, &kvTs, &kv);
}
info->preLine = *elements;
return ret;
}

View File

@ -68,6 +68,15 @@ TEST(testCase, smlParseInfluxString_Test) {
taosArrayDestroy(elements.colArray);
elements.colArray = nullptr;
// case 0 false
tmp = "st,t1=3 c3=\"";
(void)memcpy(sql, tmp, strlen(tmp) + 1);
(void)memset(&elements, 0, sizeof(SSmlLineInfo));
ret = smlParseInfluxString(info, sql, sql + strlen(sql), &elements);
ASSERT_NE(ret, 0);
taosArrayDestroy(elements.colArray);
elements.colArray = nullptr;
// case 2 false
tmp = "st,t1=3,t2=4,t3=t3 c1=3i64,c3=\"passit hello,c1=2,c2=false,c4=4f64 1626006833639000000";
(void)memcpy(sql, tmp, strlen(tmp) + 1);
@ -591,6 +600,104 @@ TEST(testCase, smlParseTelnetLine_Test) {
// smlDestroyInfo(info);
//}
bool smlParseNumberOld(SSmlKv *kvVal, SSmlMsgBuf *msg) {
const char *pVal = kvVal->value;
int32_t len = kvVal->length;
char *endptr = NULL;
double result = taosStr2Double(pVal, &endptr);
if (pVal == endptr) {
smlBuildInvalidDataMsg(msg, "invalid data", pVal);
return false;
}
int32_t left = len - (endptr - pVal);
if (left == 0 || (left == 3 && strncasecmp(endptr, "f64", left) == 0)) {
kvVal->type = TSDB_DATA_TYPE_DOUBLE;
kvVal->d = result;
} else if ((left == 3 && strncasecmp(endptr, "f32", left) == 0)) {
if (!IS_VALID_FLOAT(result)) {
smlBuildInvalidDataMsg(msg, "float out of range[-3.402823466e+38,3.402823466e+38]", pVal);
return false;
}
kvVal->type = TSDB_DATA_TYPE_FLOAT;
kvVal->f = (float)result;
} else if ((left == 1 && *endptr == 'i') || (left == 3 && strncasecmp(endptr, "i64", left) == 0)) {
if (smlDoubleToInt64OverFlow(result)) {
errno = 0;
int64_t tmp = taosStr2Int64(pVal, &endptr, 10);
if (errno == ERANGE) {
smlBuildInvalidDataMsg(msg, "big int out of range[-9223372036854775808,9223372036854775807]", pVal);
return false;
}
kvVal->type = TSDB_DATA_TYPE_BIGINT;
kvVal->i = tmp;
return true;
}
kvVal->type = TSDB_DATA_TYPE_BIGINT;
kvVal->i = (int64_t)result;
} else if ((left == 1 && *endptr == 'u') || (left == 3 && strncasecmp(endptr, "u64", left) == 0)) {
if (result >= (double)UINT64_MAX || result < 0) {
errno = 0;
uint64_t tmp = taosStr2UInt64(pVal, &endptr, 10);
if (errno == ERANGE || result < 0) {
smlBuildInvalidDataMsg(msg, "unsigned big int out of range[0,18446744073709551615]", pVal);
return false;
}
kvVal->type = TSDB_DATA_TYPE_UBIGINT;
kvVal->u = tmp;
return true;
}
kvVal->type = TSDB_DATA_TYPE_UBIGINT;
kvVal->u = result;
} else if (left == 3 && strncasecmp(endptr, "i32", left) == 0) {
if (!IS_VALID_INT(result)) {
smlBuildInvalidDataMsg(msg, "int out of range[-2147483648,2147483647]", pVal);
return false;
}
kvVal->type = TSDB_DATA_TYPE_INT;
kvVal->i = result;
} else if (left == 3 && strncasecmp(endptr, "u32", left) == 0) {
if (!IS_VALID_UINT(result)) {
smlBuildInvalidDataMsg(msg, "unsigned int out of range[0,4294967295]", pVal);
return false;
}
kvVal->type = TSDB_DATA_TYPE_UINT;
kvVal->u = result;
} else if (left == 3 && strncasecmp(endptr, "i16", left) == 0) {
if (!IS_VALID_SMALLINT(result)) {
smlBuildInvalidDataMsg(msg, "small int our of range[-32768,32767]", pVal);
return false;
}
kvVal->type = TSDB_DATA_TYPE_SMALLINT;
kvVal->i = result;
} else if (left == 3 && strncasecmp(endptr, "u16", left) == 0) {
if (!IS_VALID_USMALLINT(result)) {
smlBuildInvalidDataMsg(msg, "unsigned small int out of rang[0,65535]", pVal);
return false;
}
kvVal->type = TSDB_DATA_TYPE_USMALLINT;
kvVal->u = result;
} else if (left == 2 && strncasecmp(endptr, "i8", left) == 0) {
if (!IS_VALID_TINYINT(result)) {
smlBuildInvalidDataMsg(msg, "tiny int out of range[-128,127]", pVal);
return false;
}
kvVal->type = TSDB_DATA_TYPE_TINYINT;
kvVal->i = result;
} else if (left == 2 && strncasecmp(endptr, "u8", left) == 0) {
if (!IS_VALID_UTINYINT(result)) {
smlBuildInvalidDataMsg(msg, "unsigned tiny int out of range[0,255]", pVal);
return false;
}
kvVal->type = TSDB_DATA_TYPE_UTINYINT;
kvVal->u = result;
} else {
smlBuildInvalidDataMsg(msg, "invalid data", pVal);
return false;
}
return true;
}
TEST(testCase, smlParseNumber_performance_Test) {
char msg[256] = {0};
SSmlMsgBuf msgBuf;

View File

@ -38,7 +38,7 @@ int32_t vmGetAllVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnod
SVnodeObj *pVnode = *ppVnode;
if (pVnode && num < size) {
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
// dTrace("vgId:%d, acquire vnode list, ref:%d", pVnode->vgId, refCount);
dTrace("vgId:%d,acquire vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount);
pVnodes[num++] = (*ppVnode);
pIter = taosHashIterate(pMgmt->hash, pIter);
} else {
@ -52,7 +52,7 @@ int32_t vmGetAllVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnod
SVnodeObj *pVnode = *ppVnode;
if (pVnode && num < size) {
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
// dTrace("vgId:%d, acquire vnode list, ref:%d", pVnode->vgId, refCount);
dTrace("vgId:%d, acquire vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount);
pVnodes[num++] = (*ppVnode);
pIter = taosHashIterate(pMgmt->closedHash, pIter);
} else {
@ -84,7 +84,7 @@ int32_t vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeOb
SVnodeObj *pVnode = *ppVnode;
if (pVnode && num < size) {
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
// dTrace("vgId:%d, acquire vnode list, ref:%d", pVnode->vgId, refCount);
dTrace("vgId:%d, acquire vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount);
pVnodes[num++] = (*ppVnode);
pIter = taosHashIterate(pMgmt->hash, pIter);
} else {

View File

@ -103,7 +103,7 @@ SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict) {
pVnode = NULL;
} else {
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
// dTrace("vgId:%d, acquire vnode, ref:%d", pVnode->vgId, refCount);
dTrace("vgId:%d, acquire vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount);
}
(void)taosThreadRwlockUnlock(&pMgmt->lock);
@ -115,16 +115,24 @@ SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) { return vmAcquireVno
void vmReleaseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
if (pVnode == NULL) return;
(void)taosThreadRwlockRdlock(&pMgmt->lock);
//(void)taosThreadRwlockRdlock(&pMgmt->lock);
int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
// dTrace("vgId:%d, release vnode, ref:%d", pVnode->vgId, refCount);
(void)taosThreadRwlockUnlock(&pMgmt->lock);
dTrace("vgId:%d, release vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount);
//(void)taosThreadRwlockUnlock(&pMgmt->lock);
}
static void vmFreeVnodeObj(SVnodeObj **ppVnode) {
if (!ppVnode || !(*ppVnode)) return;
SVnodeObj *pVnode = *ppVnode;
int32_t refCount = atomic_load_32(&pVnode->refCount);
while (refCount > 0) {
dWarn("vgId:%d, vnode is refenced, retry to free in 200ms, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount);
taosMsleep(200);
refCount = atomic_load_32(&pVnode->refCount);
}
taosMemoryFree(pVnode->path);
taosMemoryFree(pVnode);
ppVnode[0] = NULL;

View File

@ -30,10 +30,19 @@ static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
dmGetMnodeEpSetForRedirect(&pDnode->data, pMsg, &epSet);
if (epSet.numOfEps <= 1) {
if (epSet.numOfEps == 0) {
pMsg->pCont = NULL;
pMsg->code = TSDB_CODE_MNODE_NOT_FOUND;
return;
}
// dnode is not the mnode or mnode leader and This ensures that the function correctly handles cases where the
// dnode cannot obtain a valid epSet and avoids returning an incorrect or misleading epSet.
if (strcmp(epSet.eps[0].fqdn, tsLocalFqdn) == 0 && epSet.eps[0].port == tsServerPort) {
pMsg->pCont = NULL;
pMsg->code = TSDB_CODE_MNODE_NOT_FOUND;
return;
}
}
int32_t contLen = tSerializeSEpSet(NULL, 0, &epSet);
pMsg->pCont = rpcMallocCont(contLen);

View File

@ -2018,7 +2018,12 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
cleanupAfterGroupResultGen(pMiaInfo, pRes);
code = doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL);
QUERY_CHECK_CODE(code, lino, _end);
if (pRes->info.rows == 0) {
// After filtering for last group, the result is empty, so we need to continue to process next group
continue;
} else {
break;
}
} else {
// continue
pRes->info.id.groupId = pMiaInfo->groupId;

View File

@ -468,35 +468,38 @@ end:
int32_t smlInitHandle(SQuery** query) {
*query = NULL;
SQuery* pQuery = NULL;
SVnodeModifyOpStmt* stmt = NULL;
int32_t code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
if (NULL == pQuery) {
uError("create pQuery error");
return code;
if (code != 0) {
uError("SML create pQuery error");
goto END;
}
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
pQuery->haveResultSet = false;
pQuery->msgType = TDMT_VND_SUBMIT;
SVnodeModifyOpStmt* stmt = NULL;
code = nodesMakeNode(QUERY_NODE_VNODE_MODIFY_STMT, (SNode**)&stmt);
if (NULL == stmt) {
uError("create SVnodeModifyOpStmt error");
qDestroyQuery(pQuery);
return code;
if (code != 0) {
uError("SML create SVnodeModifyOpStmt error");
goto END;
}
stmt->pTableBlockHashObj = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
if (stmt->pTableBlockHashObj == NULL){
uError("create pTableBlockHashObj error");
qDestroyQuery(pQuery);
nodesDestroyNode((SNode*)stmt);
return terrno;
uError("SML create pTableBlockHashObj error");
code = terrno;
goto END;
}
stmt->freeHashFunc = insDestroyTableDataCxtHashMap;
stmt->freeArrayFunc = insDestroyVgroupDataCxtList;
pQuery->pRoot = (SNode*)stmt;
*query = pQuery;
return code;
return TSDB_CODE_SUCCESS;
END:
nodesDestroyNode((SNode*)stmt);
qDestroyQuery(pQuery);
return code;
}
int32_t smlBuildOutput(SQuery* handle, SHashObj* pVgHash) {

View File

@ -3757,7 +3757,7 @@ static EDealRes doCheckExprForGroupBy(SNode** pNode, void* pContext) {
bool partionByTbname = hasTbnameFunction(pSelect->pPartitionByList);
FOREACH(pPartKey, pSelect->pPartitionByList) {
if (nodesEqualNode(pPartKey, *pNode)) {
return pCxt->currClause == SQL_CLAUSE_HAVING ? DEAL_RES_IGNORE_CHILD : rewriteExprToGroupKeyFunc(pCxt, pNode);
return pSelect->hasAggFuncs ? rewriteExprToGroupKeyFunc(pCxt, pNode) : DEAL_RES_IGNORE_CHILD;
}
if ((partionByTbname) && QUERY_NODE_COLUMN == nodeType(*pNode) &&
((SColumnNode*)*pNode)->colType == COLUMN_TYPE_TAG) {

View File

@ -664,7 +664,7 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy
// set status
if (pWal->vers.firstVer == -1) {
pWal->vers.firstVer = 0;
pWal->vers.firstVer = index;
}
pWal->vers.lastVer = index;
pWal->totSize += sizeof(SWalCkHead) + cyptedBodyLen;

View File

@ -344,9 +344,72 @@ class TDTestCase:
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(55)
# TODO Fix Me!
sql = "explain SELECT count(*), timediff(_wend, last(ts)), timediff('2018-09-20 01:00:00', _wstart) FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY concat(tbname, 'asd') INTERVAL(5m) having(concat(tbname, 'asd') like '%asd');"
tdSql.error(sql, -2147473664) # Error: Planner internal error
sql = "SELECT count(*), timediff(_wend, last(ts)), timediff('2018-09-20 01:00:00', _wstart) FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY concat(tbname, 'asd') INTERVAL(5m) having(concat(tbname, 'asd') like '%asd');"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(60)
sql = "SELECT count(*), timediff(_wend, last(ts)), timediff('2018-09-20 01:00:00', _wstart) FROM meters WHERE ts >= '2018-09-20 00:00:00.000' AND ts < '2018-09-20 01:00:00.000' PARTITION BY concat(tbname, 'asd') INTERVAL(5m) having(concat(tbname, 'asd') like 'asd%');"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(0)
sql = "SELECT c1 FROM meters PARTITION BY c1 HAVING c1 > 0 slimit 2 limit 10"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(20)
sql = "SELECT t1 FROM meters PARTITION BY t1 HAVING(t1 = 1)"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(20000)
sql = "SELECT concat(t2, 'asd') FROM meters PARTITION BY t2 HAVING(t2 like '%5')"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(10000)
tdSql.checkData(0, 0, 'tb5asd')
sql = "SELECT concat(t2, 'asd') FROM meters PARTITION BY concat(t2, 'asd') HAVING(concat(t2, 'asd')like '%5%')"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(10000)
tdSql.checkData(0, 0, 'tb5asd')
sql = "SELECT avg(c1) FROM meters PARTITION BY tbname, t1 HAVING(t1 = 1)"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(2)
sql = "SELECT count(*) FROM meters PARTITION BY concat(tbname, 'asd') HAVING(concat(tbname, 'asd') like '%asd')"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(10)
sql = "SELECT count(*), concat(tbname, 'asd') FROM meters PARTITION BY concat(tbname, 'asd') HAVING(concat(tbname, 'asd') like '%asd')"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(10)
sql = "SELECT count(*) FROM meters PARTITION BY t1 HAVING(t1 < 4) order by t1 +1"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(4)
sql = "SELECT count(*), t1 + 100 FROM meters PARTITION BY t1 HAVING(t1 < 4) order by t1 +1"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(4)
sql = "SELECT count(*), t1 + 100 FROM meters PARTITION BY t1 INTERVAL(1d) HAVING(t1 < 4) order by t1 +1 desc"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(280)
sql = "SELECT count(*), concat(t3, 'asd') FROM meters PARTITION BY concat(t3, 'asd') INTERVAL(1d) HAVING(concat(t3, 'asd') like '%5asd' and count(*) = 118)"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(1)
sql = "SELECT count(*), concat(t3, 'asd') FROM meters PARTITION BY concat(t3, 'asd') INTERVAL(1d) HAVING(concat(t3, 'asd') like '%5asd' and count(*) != 118)"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(69)
sql = "SELECT count(*), concat(t3, 'asd') FROM meters PARTITION BY concat(t3, 'asd') INTERVAL(1d) HAVING(concat(t3, 'asd') like '%5asd') order by count(*) asc limit 10"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(10)
sql = "SELECT count(*), concat(t3, 'asd') FROM meters PARTITION BY concat(t3, 'asd') INTERVAL(1d) HAVING(concat(t3, 'asd') like '%5asd' or concat(t3, 'asd') like '%3asd') order by count(*) asc limit 10000"
tdSql.query(sql, queryTimes=1)
tdSql.checkRows(140)
def run(self):
self.prepareTestEnv()

View File

@ -17,8 +17,6 @@ sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
updatecfgDict = {'sDebugFlag':143, 'wDebugFlag':143}
def __init__(self):
self.vgroups = 1
self.ctbNum = 10

View File

@ -105,6 +105,113 @@ int smlProcess_telnet_Test() {
return code;
}
int smlProcess_telnet0_Test() {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
TAOS_RES *pRes = taos_query(taos, "create database if not exists sml_db schemaless 1");
taos_free_result(pRes);
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
const char *sql1[] = {"sysif.bytes.out 1479496100 1.3E0 host=web01 interface=eth0"};
pRes = taos_schemaless_insert(taos, (char **)sql1, sizeof(sql1) / sizeof(sql1[0]), TSDB_SML_TELNET_PROTOCOL,
TSDB_SML_TIMESTAMP_NANO_SECONDS);
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
int code = taos_errno(pRes);
ASSERT(code == 0);
taos_free_result(pRes);
const char *sql2[] = {"sysif.bytes.out 1479496700 1.6E0 host=web01 interface=eth0"};
pRes = taos_schemaless_insert(taos, (char **)sql2, sizeof(sql2) / sizeof(sql2[0]), TSDB_SML_TELNET_PROTOCOL,
TSDB_SML_TIMESTAMP_NANO_SECONDS);
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
code = taos_errno(pRes);
ASSERT(code == 0);
taos_free_result(pRes);
const char *sql3[] = {"sysif.bytes.out 1479496300 1.1E0 interface=eth0 host=web01"};
pRes = taos_schemaless_insert(taos, (char **)sql3, sizeof(sql3) / sizeof(sql3[0]), TSDB_SML_TELNET_PROTOCOL,
TSDB_SML_TIMESTAMP_NANO_SECONDS);
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
code = taos_errno(pRes);
ASSERT(code == 0);
taos_free_result(pRes);
taos_close(taos);
return code;
}
int smlProcess_json0_Test() {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
TAOS_RES *pRes = taos_query(taos, "create database if not exists sml_db");
taos_free_result(pRes);
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
const char *sql[] = {
"[{\"metric\":\"syscpu.nice\",\"timestamp\":1662344045,\"value\":9,\"tags\":{\"host\":\"web02\",\"dc\":4}}]"};
char *sql1[1] = {0};
for (int i = 0; i < 1; i++) {
sql1[i] = taosMemoryCalloc(1, 1024);
ASSERT(sql1[i] != NULL);
(void)strncpy(sql1[i], sql[i], 1023);
}
pRes = taos_schemaless_insert(taos, (char **)sql1, sizeof(sql1) / sizeof(sql1[0]), TSDB_SML_JSON_PROTOCOL,
TSDB_SML_TIMESTAMP_NANO_SECONDS);
int code = taos_errno(pRes);
if (code != 0) {
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
} else {
printf("%s result:success\n", __FUNCTION__);
}
taos_free_result(pRes);
for (int i = 0; i < 1; i++) {
taosMemoryFree(sql1[i]);
}
ASSERT(code == 0);
const char *sql2[] = {
"[{\"metric\":\"syscpu.nice\",\"timestamp\":1662344041,\"value\":13,\"tags\":{\"host\":\"web01\",\"dc\":1}"
"},{\"metric\":\"syscpu.nice\",\"timestamp\":1662344042,\"value\":9,\"tags\":{\"host\":\"web02\",\"dc\":4}"
"}]",
};
char *sql3[1] = {0};
for (int i = 0; i < 1; i++) {
sql3[i] = taosMemoryCalloc(1, 1024);
ASSERT(sql3[i] != NULL);
(void)strncpy(sql3[i], sql2[i], 1023);
}
pRes = taos_schemaless_insert(taos, (char **)sql3, sizeof(sql3) / sizeof(sql3[0]), TSDB_SML_JSON_PROTOCOL,
TSDB_SML_TIMESTAMP_NANO_SECONDS);
code = taos_errno(pRes);
if (code != 0) {
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
} else {
printf("%s result:success\n", __FUNCTION__);
}
taos_free_result(pRes);
for (int i = 0; i < 1; i++) {
taosMemoryFree(sql3[i]);
}
ASSERT(code == 0);
taos_close(taos);
return code;
}
int smlProcess_json1_Test() {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
@ -1775,6 +1882,21 @@ int sml_td24559_Test() {
pRes = taos_query(taos, "create database if not exists td24559");
taos_free_result(pRes);
const char *sql1[] = {
"sttb,t1=1 f1=283i32,f2=g\"\" 1632299372000",
"sttb,t1=1 f2=G\"Point(4.343 89.342)\",f1=106i32 1632299373000",
};
pRes = taos_query(taos, "use td24559");
taos_free_result(pRes);
pRes = taos_schemaless_insert(taos, (char **)sql1, sizeof(sql1) / sizeof(sql1[0]), TSDB_SML_LINE_PROTOCOL,
TSDB_SML_TIMESTAMP_MILLI_SECONDS);
int code = taos_errno(pRes);
printf("%s result0:%s\n", __FUNCTION__, taos_errstr(pRes));
ASSERT(code);
taos_free_result(pRes);
const char *sql[] = {
"stb,t1=1 f1=283i32,f2=g\"Point(4.343 89.342)\" 1632299372000",
"stb,t1=1 f2=G\"Point(4.343 89.342)\",f1=106i32 1632299373000",
@ -1788,7 +1910,7 @@ int sml_td24559_Test() {
pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql) / sizeof(sql[0]), TSDB_SML_LINE_PROTOCOL,
TSDB_SML_TIMESTAMP_MILLI_SECONDS);
int code = taos_errno(pRes);
code = taos_errno(pRes);
printf("%s result0:%s\n", __FUNCTION__, taos_errstr(pRes));
taos_free_result(pRes);
@ -2136,7 +2258,8 @@ int main(int argc, char *argv[]) {
taos_options(TSDB_OPTION_CONFIGDIR, argv[1]);
}
int ret = 0;
int ret = smlProcess_json0_Test();
ASSERT(!ret);
ret = sml_ts5528_test();
ASSERT(!ret);
ret = sml_td29691_Test();
@ -2173,6 +2296,8 @@ int main(int argc, char *argv[]) {
ASSERT(!ret);
ret = smlProcess_telnet_Test();
ASSERT(!ret);
ret = smlProcess_telnet0_Test();
ASSERT(!ret);
ret = smlProcess_json1_Test();
ASSERT(!ret);
ret = smlProcess_json2_Test();