opti:escape logic in schemaless
This commit is contained in:
parent
739dc5a7a6
commit
5b3087e48b
|
@ -341,6 +341,8 @@ typedef struct {
|
||||||
float f;
|
float f;
|
||||||
};
|
};
|
||||||
size_t length;
|
size_t length;
|
||||||
|
bool keyEscaped;
|
||||||
|
bool valueEscaped;
|
||||||
} SSmlKv;
|
} SSmlKv;
|
||||||
|
|
||||||
#define QUERY_ASC_FORWARD_STEP 1
|
#define QUERY_ASC_FORWARD_STEP 1
|
||||||
|
|
|
@ -107,6 +107,7 @@ typedef struct {
|
||||||
int32_t colsLen;
|
int32_t colsLen;
|
||||||
int32_t timestampLen;
|
int32_t timestampLen;
|
||||||
|
|
||||||
|
bool measureEscaped;
|
||||||
SArray *colArray;
|
SArray *colArray;
|
||||||
} SSmlLineInfo;
|
} SSmlLineInfo;
|
||||||
|
|
||||||
|
@ -206,6 +207,19 @@ typedef struct {
|
||||||
|
|
||||||
#define IS_SAME_KEY (maxKV->keyLen == kv.keyLen && memcmp(maxKV->key, kv.key, kv.keyLen) == 0)
|
#define IS_SAME_KEY (maxKV->keyLen == kv.keyLen && memcmp(maxKV->key, kv.key, kv.keyLen) == 0)
|
||||||
|
|
||||||
|
#define IS_SLASH_LETTER_IN_MEASUREMENT(sql) \
|
||||||
|
(*((sql)-1) == SLASH && (*(sql) == COMMA || *(sql) == SPACE))
|
||||||
|
|
||||||
|
#define MOVE_FORWARD_ONE(sql, len) (memmove((void *)((sql)-1), (sql), len))
|
||||||
|
|
||||||
|
#define PROCESS_SLASH_IN_MEASUREMENT(key, keyLen) \
|
||||||
|
for (int i = 1; i < keyLen; ++i) { \
|
||||||
|
if (IS_SLASH_LETTER_IN_MEASUREMENT(key + i)) { \
|
||||||
|
MOVE_FORWARD_ONE(key + i, keyLen - i); \
|
||||||
|
keyLen--; \
|
||||||
|
} \
|
||||||
|
}
|
||||||
|
|
||||||
extern int64_t smlFactorNS[3];
|
extern int64_t smlFactorNS[3];
|
||||||
extern int64_t smlFactorS[3];
|
extern int64_t smlFactorS[3];
|
||||||
|
|
||||||
|
@ -237,6 +251,7 @@ uint8_t smlGetTimestampLen(int64_t num);
|
||||||
void clearColValArray(SArray* pCols);
|
void clearColValArray(SArray* pCols);
|
||||||
void smlDestroyTableInfo(SSmlHandle *info, SSmlTableInfo *tag);
|
void smlDestroyTableInfo(SSmlHandle *info, SSmlTableInfo *tag);
|
||||||
|
|
||||||
|
void freeSSmlKv(void* data);
|
||||||
int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLineInfo *elements);
|
int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLineInfo *elements);
|
||||||
int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLineInfo *elements);
|
int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLineInfo *elements);
|
||||||
int32_t smlParseJSON(SSmlHandle *info, char *payload);
|
int32_t smlParseJSON(SSmlHandle *info, char *payload);
|
||||||
|
|
|
@ -765,8 +765,12 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
|
||||||
|
|
||||||
size_t superTableLen = 0;
|
size_t superTableLen = 0;
|
||||||
void *superTable = taosHashGetKey(tmp, &superTableLen);
|
void *superTable = taosHashGetKey(tmp, &superTableLen);
|
||||||
|
char* measure = taosMemoryMalloc(superTableLen);
|
||||||
|
memcpy(measure, superTable, superTableLen);
|
||||||
|
PROCESS_SLASH_IN_MEASUREMENT(measure, superTableLen);
|
||||||
memset(pName.tname, 0, TSDB_TABLE_NAME_LEN);
|
memset(pName.tname, 0, TSDB_TABLE_NAME_LEN);
|
||||||
memcpy(pName.tname, superTable, superTableLen);
|
memcpy(pName.tname, measure, superTableLen);
|
||||||
|
taosMemoryFree(measure);
|
||||||
|
|
||||||
code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
|
code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
|
||||||
|
|
||||||
|
@ -1049,7 +1053,7 @@ void smlDestroyTableInfo(SSmlHandle *info, SSmlTableInfo *tag) {
|
||||||
// }
|
// }
|
||||||
// taosMemoryFree(tag->key);
|
// taosMemoryFree(tag->key);
|
||||||
taosArrayDestroy(tag->cols);
|
taosArrayDestroy(tag->cols);
|
||||||
taosArrayDestroy(tag->tags);
|
taosArrayDestroyEx(tag->tags, freeSSmlKv);
|
||||||
taosMemoryFree(tag);
|
taosMemoryFree(tag);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1063,6 +1067,12 @@ void clearColValArray(SArray *pCols) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void freeSSmlKv(void* data){
|
||||||
|
SSmlKv *kv = (SSmlKv*)data;
|
||||||
|
if(kv->keyEscaped) taosMemoryFree((void*)(kv->key));
|
||||||
|
if(kv->valueEscaped) taosMemoryFree((void*)(kv->value));
|
||||||
|
}
|
||||||
|
|
||||||
void smlDestroyInfo(SSmlHandle *info) {
|
void smlDestroyInfo(SSmlHandle *info) {
|
||||||
if (!info) return;
|
if (!info) return;
|
||||||
qDestroyQuery(info->pQuery);
|
qDestroyQuery(info->pQuery);
|
||||||
|
@ -1098,11 +1108,11 @@ void smlDestroyInfo(SSmlHandle *info) {
|
||||||
}
|
}
|
||||||
taosArrayDestroy(info->valueJsonArray);
|
taosArrayDestroy(info->valueJsonArray);
|
||||||
|
|
||||||
taosArrayDestroy(info->preLineTagKV);
|
taosArrayDestroyEx(info->preLineTagKV, freeSSmlKv);
|
||||||
|
|
||||||
if (!info->dataFormat) {
|
if (!info->dataFormat) {
|
||||||
for (int i = 0; i < info->lineNum; i++) {
|
for (int i = 0; i < info->lineNum; i++) {
|
||||||
taosArrayDestroy(info->lines[i].colArray);
|
taosArrayDestroyEx(info->lines[i].colArray, freeSSmlKv);
|
||||||
if (info->parseJsonByLib) {
|
if (info->parseJsonByLib) {
|
||||||
taosMemoryFree(info->lines[i].tags);
|
taosMemoryFree(info->lines[i].tags);
|
||||||
}
|
}
|
||||||
|
@ -1420,14 +1430,14 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char
|
||||||
|
|
||||||
char cTmp = 0; // for print tmp if is raw
|
char cTmp = 0; // for print tmp if is raw
|
||||||
if (info->isRawLine) {
|
if (info->isRawLine) {
|
||||||
cTmp = tmp[len - 1];
|
cTmp = tmp[len];
|
||||||
tmp[len - 1] = '\0';
|
tmp[len] = '\0';
|
||||||
}
|
}
|
||||||
|
|
||||||
uDebug("SML:0x%" PRIx64 " smlParseLine israw:%d, numLines:%d, protocol:%d, len:%d, sql:%s", info->id,
|
uDebug("SML:0x%" PRIx64 " smlParseLine israw:%d, numLines:%d, protocol:%d, len:%d, sql:%s", info->id,
|
||||||
info->isRawLine, numLines, info->protocol, len, tmp);
|
info->isRawLine, numLines, info->protocol, len, tmp);
|
||||||
if (info->isRawLine) {
|
if (info->isRawLine) {
|
||||||
tmp[len - 1] = cTmp;
|
tmp[len] = cTmp;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
|
if (info->protocol == TSDB_SML_LINE_PROTOCOL) {
|
||||||
|
@ -1449,6 +1459,7 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char
|
||||||
code = TSDB_CODE_SML_INVALID_PROTOCOL_TYPE;
|
code = TSDB_CODE_SML_INVALID_PROTOCOL_TYPE;
|
||||||
}
|
}
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
tmp[len] = '\0';
|
||||||
uError("SML:0x%" PRIx64 " smlParseLine failed. line %d : %s", info->id, i, tmp);
|
uError("SML:0x%" PRIx64 " smlParseLine failed. line %d : %s", info->id, i, tmp);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,32 +21,33 @@
|
||||||
#include "clientSml.h"
|
#include "clientSml.h"
|
||||||
|
|
||||||
// comma ,
|
// comma ,
|
||||||
// #define IS_SLASH_COMMA(sql) (*(sql) == COMMA && *((sql)-1) == SLASH)
|
|
||||||
#define IS_COMMA(sql) (*(sql) == COMMA && *((sql)-1) != SLASH)
|
#define IS_COMMA(sql) (*(sql) == COMMA && *((sql)-1) != SLASH)
|
||||||
// space
|
// space
|
||||||
// #define IS_SLASH_SPACE(sql) (*(sql) == SPACE && *((sql)-1) == SLASH)
|
|
||||||
#define IS_SPACE(sql) (*(sql) == SPACE && *((sql)-1) != SLASH)
|
#define IS_SPACE(sql) (*(sql) == SPACE && *((sql)-1) != SLASH)
|
||||||
// equal =
|
// equal =
|
||||||
// #define IS_SLASH_EQUAL(sql) (*(sql) == EQUAL && *((sql)-1) == SLASH)
|
|
||||||
#define IS_EQUAL(sql) (*(sql) == EQUAL && *((sql)-1) != SLASH)
|
#define IS_EQUAL(sql) (*(sql) == EQUAL && *((sql)-1) != SLASH)
|
||||||
// quote "
|
// quote "
|
||||||
// #define IS_SLASH_QUOTE(sql) (*(sql) == QUOTE && *((sql)-1) == SLASH)
|
//#define IS_QUOTE(sql) (*(sql) == QUOTE && *((sql)-1) != SLASH)
|
||||||
#define IS_QUOTE(sql) (*(sql) == QUOTE && *((sql)-1) != SLASH)
|
|
||||||
// SLASH
|
// SLASH
|
||||||
// #define IS_SLASH_SLASH(sql) (*(sql) == SLASH && *((sql)-1) == SLASH)
|
|
||||||
|
|
||||||
#define IS_SLASH_LETTER(sql) \
|
#define IS_SLASH_LETTER_IN_FIELD_VALUE(sql) \
|
||||||
(*((sql)-1) == SLASH && (*(sql) == COMMA || *(sql) == SPACE || *(sql) == EQUAL || *(sql) == QUOTE || \
|
(*((sql)-1) == SLASH && (*(sql) == QUOTE || *(sql) == SLASH))
|
||||||
*(sql) == SLASH)) // (IS_SLASH_COMMA(sql) || IS_SLASH_SPACE(sql) || IS_SLASH_EQUAL(sql) ||
|
|
||||||
// IS_SLASH_QUOTE(sql) || IS_SLASH_SLASH(sql))
|
|
||||||
|
|
||||||
#define MOVE_FORWARD_ONE(sql, len) (memmove((void *)((sql)-1), (sql), len))
|
#define IS_SLASH_LETTER_IN_TAG_FIELD_KEY(sql) \
|
||||||
|
(*((sql)-1) == SLASH && (*(sql) == COMMA || *(sql) == SPACE || *(sql) == EQUAL))
|
||||||
|
|
||||||
#define PROCESS_SLASH(key, keyLen) \
|
#define PROCESS_SLASH_IN_FIELD_VALUE(key, keyLen) \
|
||||||
for (int i = 1; i < keyLen; ++i) { \
|
for (int i = 1; i < keyLen; ++i) { \
|
||||||
if (IS_SLASH_LETTER(key + i)) { \
|
if (IS_SLASH_LETTER_IN_FIELD_VALUE(key + i)) { \
|
||||||
|
MOVE_FORWARD_ONE(key + i, keyLen - i); \
|
||||||
|
keyLen--; \
|
||||||
|
} \
|
||||||
|
}
|
||||||
|
|
||||||
|
#define PROCESS_SLASH_IN_TAG_FIELD_KEY(key, keyLen) \
|
||||||
|
for (int i = 1; i < keyLen; ++i) { \
|
||||||
|
if (IS_SLASH_LETTER_IN_TAG_FIELD_KEY(key + i)) { \
|
||||||
MOVE_FORWARD_ONE(key + i, keyLen - i); \
|
MOVE_FORWARD_ONE(key + i, keyLen - i); \
|
||||||
i--; \
|
|
||||||
keyLen--; \
|
keyLen--; \
|
||||||
} \
|
} \
|
||||||
}
|
}
|
||||||
|
@ -151,7 +152,17 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
|
||||||
|
|
||||||
SSmlSTableMeta *sMeta = NULL;
|
SSmlSTableMeta *sMeta = NULL;
|
||||||
if (unlikely(tmp == NULL)) {
|
if (unlikely(tmp == NULL)) {
|
||||||
STableMeta *pTableMeta = smlGetMeta(info, currElement->measure, currElement->measureLen);
|
char* measure = currElement->measure;
|
||||||
|
int measureLen = currElement->measureLen;
|
||||||
|
if(currElement->measureEscaped){
|
||||||
|
measure = taosMemoryMalloc(currElement->measureLen);
|
||||||
|
memcpy(measure, currElement->measure, currElement->measureLen);
|
||||||
|
PROCESS_SLASH_IN_MEASUREMENT(measure, measureLen);
|
||||||
|
}
|
||||||
|
STableMeta *pTableMeta = smlGetMeta(info, measure, measureLen);
|
||||||
|
if(currElement->measureEscaped){
|
||||||
|
taosMemoryFree(measure);
|
||||||
|
}
|
||||||
if (pTableMeta == NULL) {
|
if (pTableMeta == NULL) {
|
||||||
info->dataFormat = false;
|
info->dataFormat = false;
|
||||||
info->reRun = true;
|
info->reRun = true;
|
||||||
|
@ -171,17 +182,18 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
|
||||||
info->maxTagKVs = (*tmp)->tags;
|
info->maxTagKVs = (*tmp)->tags;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
taosArrayClear(preLineKV);
|
taosArrayClearEx(preLineKV, freeSSmlKv);
|
||||||
|
|
||||||
while (*sql < sqlEnd) {
|
while (*sql < sqlEnd) {
|
||||||
if (unlikely(IS_SPACE(*sql))) {
|
if (unlikely(IS_SPACE(*sql))) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool hasSlash = false;
|
|
||||||
// parse key
|
// parse key
|
||||||
const char *key = *sql;
|
const char *key = *sql;
|
||||||
size_t keyLen = 0;
|
size_t keyLen = 0;
|
||||||
|
bool keyEscaped = false;
|
||||||
|
size_t keyLenEscaped = 0;
|
||||||
while (*sql < sqlEnd) {
|
while (*sql < sqlEnd) {
|
||||||
if (unlikely(IS_COMMA(*sql))) {
|
if (unlikely(IS_COMMA(*sql))) {
|
||||||
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
|
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
|
||||||
|
@ -192,16 +204,14 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
|
||||||
(*sql)++;
|
(*sql)++;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (!hasSlash) {
|
if (IS_SLASH_LETTER_IN_TAG_FIELD_KEY(*sql)) {
|
||||||
hasSlash = (*(*sql) == SLASH);
|
keyLenEscaped++;
|
||||||
|
keyEscaped = true;
|
||||||
}
|
}
|
||||||
(*sql)++;
|
(*sql)++;
|
||||||
}
|
}
|
||||||
if (unlikely(hasSlash)) {
|
|
||||||
PROCESS_SLASH(key, keyLen)
|
|
||||||
}
|
|
||||||
|
|
||||||
if (unlikely(IS_INVALID_COL_LEN(keyLen))) {
|
if (unlikely(IS_INVALID_COL_LEN(keyLen - keyLenEscaped))) {
|
||||||
smlBuildInvalidDataMsg(&info->msgBuf, "invalid key or key is too long than 64", key);
|
smlBuildInvalidDataMsg(&info->msgBuf, "invalid key or key is too long than 64", key);
|
||||||
return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
|
return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
|
||||||
}
|
}
|
||||||
|
@ -209,7 +219,8 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
|
||||||
// parse value
|
// parse value
|
||||||
const char *value = *sql;
|
const char *value = *sql;
|
||||||
size_t valueLen = 0;
|
size_t valueLen = 0;
|
||||||
hasSlash = false;
|
bool valueEscaped = false;
|
||||||
|
size_t valueLenEscaped = 0;
|
||||||
while (*sql < sqlEnd) {
|
while (*sql < sqlEnd) {
|
||||||
// parse value
|
// parse value
|
||||||
if (unlikely(IS_SPACE(*sql) || IS_COMMA(*sql))) {
|
if (unlikely(IS_SPACE(*sql) || IS_COMMA(*sql))) {
|
||||||
|
@ -219,8 +230,9 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
|
||||||
return TSDB_CODE_SML_INVALID_DATA;
|
return TSDB_CODE_SML_INVALID_DATA;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!hasSlash) {
|
if (IS_SLASH_LETTER_IN_TAG_FIELD_KEY(value)) {
|
||||||
hasSlash = (*(*sql) == SLASH);
|
valueLenEscaped++;
|
||||||
|
valueEscaped = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
(*sql)++;
|
(*sql)++;
|
||||||
|
@ -232,15 +244,24 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
|
||||||
return TSDB_CODE_SML_INVALID_DATA;
|
return TSDB_CODE_SML_INVALID_DATA;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (unlikely(hasSlash)) {
|
if (unlikely(valueLen - valueLenEscaped > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)) {
|
||||||
PROCESS_SLASH(value, valueLen)
|
|
||||||
}
|
|
||||||
|
|
||||||
if (unlikely(valueLen > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE)) {
|
|
||||||
return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
|
return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSmlKv kv = {.key = key, .keyLen = keyLen, .type = TSDB_DATA_TYPE_NCHAR, .value = value, .length = valueLen};
|
if (keyEscaped){
|
||||||
|
char *tmp = (char*)taosMemoryMalloc(keyLen);
|
||||||
|
memcpy(tmp, key, keyLen);
|
||||||
|
PROCESS_SLASH_IN_TAG_FIELD_KEY(tmp, keyLen);
|
||||||
|
key = tmp;
|
||||||
|
}
|
||||||
|
if (valueEscaped){
|
||||||
|
char *tmp = (char*)taosMemoryMalloc(valueLen);
|
||||||
|
memcpy(tmp, value, valueLen);
|
||||||
|
PROCESS_SLASH_IN_TAG_FIELD_KEY(tmp, valueLen);
|
||||||
|
value = tmp;
|
||||||
|
}
|
||||||
|
SSmlKv kv = {.key = key, .keyLen = keyLen, .type = TSDB_DATA_TYPE_NCHAR, .value = value, .length = valueLen, .keyEscaped = keyEscaped, .valueEscaped = valueEscaped};
|
||||||
|
taosArrayPush(preLineKV, &kv);
|
||||||
if (info->dataFormat) {
|
if (info->dataFormat) {
|
||||||
if (unlikely(cnt + 1 > info->currSTableMeta->tableInfo.numOfTags)) {
|
if (unlikely(cnt + 1 > info->currSTableMeta->tableInfo.numOfTags)) {
|
||||||
info->dataFormat = false;
|
info->dataFormat = false;
|
||||||
|
@ -266,7 +287,6 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
|
||||||
info->needModifySchema = true;
|
info->needModifySchema = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
taosArrayPush(preLineKV, &kv);
|
|
||||||
|
|
||||||
cnt++;
|
cnt++;
|
||||||
if (IS_SPACE(*sql)) {
|
if (IS_SPACE(*sql)) {
|
||||||
|
@ -285,6 +305,11 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
tinfo->tags = taosArrayDup(preLineKV, NULL);
|
tinfo->tags = taosArrayDup(preLineKV, NULL);
|
||||||
|
for(size_t i = 0; i < taosArrayGetSize(preLineKV); i++){
|
||||||
|
SSmlKv *kv = (SSmlKv *)taosArrayGet(preLineKV, i);
|
||||||
|
if(kv->keyEscaped)kv->key = NULL;
|
||||||
|
if(kv->valueEscaped)kv->value = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
smlSetCTableName(tinfo);
|
smlSetCTableName(tinfo);
|
||||||
tinfo->uid = info->uid++;
|
tinfo->uid = info->uid++;
|
||||||
|
@ -321,7 +346,17 @@ static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
|
||||||
SSmlSTableMeta **tmp =
|
SSmlSTableMeta **tmp =
|
||||||
(SSmlSTableMeta **)taosHashGet(info->superTables, currElement->measure, currElement->measureLen);
|
(SSmlSTableMeta **)taosHashGet(info->superTables, currElement->measure, currElement->measureLen);
|
||||||
if (unlikely(tmp == NULL)) {
|
if (unlikely(tmp == NULL)) {
|
||||||
STableMeta *pTableMeta = smlGetMeta(info, currElement->measure, currElement->measureLen);
|
char* measure = currElement->measure;
|
||||||
|
int measureLen = currElement->measureLen;
|
||||||
|
if(currElement->measureEscaped){
|
||||||
|
measure = taosMemoryMalloc(currElement->measureLen);
|
||||||
|
memcpy(measure, currElement->measure, currElement->measureLen);
|
||||||
|
PROCESS_SLASH_IN_MEASUREMENT(measure, measureLen);
|
||||||
|
}
|
||||||
|
STableMeta *pTableMeta = smlGetMeta(info, measure, measureLen);
|
||||||
|
if(currElement->measureEscaped){
|
||||||
|
taosMemoryFree(measure);
|
||||||
|
}
|
||||||
if (pTableMeta == NULL) {
|
if (pTableMeta == NULL) {
|
||||||
info->dataFormat = false;
|
info->dataFormat = false;
|
||||||
info->reRun = true;
|
info->reRun = true;
|
||||||
|
@ -352,10 +387,11 @@ static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool hasSlash = false;
|
|
||||||
// parse key
|
// parse key
|
||||||
const char *key = *sql;
|
const char *key = *sql;
|
||||||
size_t keyLen = 0;
|
size_t keyLen = 0;
|
||||||
|
bool keyEscaped = false;
|
||||||
|
size_t keyLenEscaped = 0;
|
||||||
while (*sql < sqlEnd) {
|
while (*sql < sqlEnd) {
|
||||||
if (unlikely(IS_COMMA(*sql))) {
|
if (unlikely(IS_COMMA(*sql))) {
|
||||||
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
|
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
|
||||||
|
@ -366,16 +402,14 @@ static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
|
||||||
(*sql)++;
|
(*sql)++;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (!hasSlash) {
|
if (IS_SLASH_LETTER_IN_TAG_FIELD_KEY(*sql)) {
|
||||||
hasSlash = (*(*sql) == SLASH);
|
keyLenEscaped++;
|
||||||
|
keyEscaped = true;
|
||||||
}
|
}
|
||||||
(*sql)++;
|
(*sql)++;
|
||||||
}
|
}
|
||||||
if (unlikely(hasSlash)) {
|
|
||||||
PROCESS_SLASH(key, keyLen)
|
|
||||||
}
|
|
||||||
|
|
||||||
if (unlikely(IS_INVALID_COL_LEN(keyLen))) {
|
if (unlikely(IS_INVALID_COL_LEN(keyLen - keyLenEscaped))) {
|
||||||
smlBuildInvalidDataMsg(&info->msgBuf, "invalid key or key is too long than 64", key);
|
smlBuildInvalidDataMsg(&info->msgBuf, "invalid key or key is too long than 64", key);
|
||||||
return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
|
return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
|
||||||
}
|
}
|
||||||
|
@ -383,11 +417,13 @@ static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
|
||||||
// parse value
|
// parse value
|
||||||
const char *value = *sql;
|
const char *value = *sql;
|
||||||
size_t valueLen = 0;
|
size_t valueLen = 0;
|
||||||
hasSlash = false;
|
bool valueEscaped = false;
|
||||||
|
size_t valueLenEscaped = 0;
|
||||||
bool isInQuote = false;
|
bool isInQuote = false;
|
||||||
|
const char *escapeChar = NULL;
|
||||||
while (*sql < sqlEnd) {
|
while (*sql < sqlEnd) {
|
||||||
// parse value
|
// parse value
|
||||||
if (unlikely(IS_QUOTE(*sql))) {
|
if (unlikely(*(*sql) == QUOTE && (*(*sql - 1) != SLASH || (*sql - 1) == escapeChar))) {
|
||||||
isInQuote = !isInQuote;
|
isInQuote = !isInQuote;
|
||||||
(*sql)++;
|
(*sql)++;
|
||||||
continue;
|
continue;
|
||||||
|
@ -395,13 +431,12 @@ static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
|
||||||
if (!isInQuote) {
|
if (!isInQuote) {
|
||||||
if (unlikely(IS_SPACE(*sql) || IS_COMMA(*sql))) {
|
if (unlikely(IS_SPACE(*sql) || IS_COMMA(*sql))) {
|
||||||
break;
|
break;
|
||||||
} else if (unlikely(IS_EQUAL(*sql))) {
|
|
||||||
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
|
|
||||||
return TSDB_CODE_SML_INVALID_DATA;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!hasSlash) {
|
if (IS_SLASH_LETTER_IN_FIELD_VALUE(*sql) && (*sql - 1) != escapeChar) {
|
||||||
hasSlash = (*(*sql) == SLASH);
|
escapeChar = *sql;
|
||||||
|
valueEscaped = true;
|
||||||
|
valueLenEscaped++;
|
||||||
}
|
}
|
||||||
|
|
||||||
(*sql)++;
|
(*sql)++;
|
||||||
|
@ -416,14 +451,25 @@ static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
|
||||||
smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", value);
|
smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", value);
|
||||||
return TSDB_CODE_SML_INVALID_DATA;
|
return TSDB_CODE_SML_INVALID_DATA;
|
||||||
}
|
}
|
||||||
if (unlikely(hasSlash)) {
|
|
||||||
PROCESS_SLASH(value, valueLen)
|
if (keyEscaped){
|
||||||
|
char *tmp = (char*)taosMemoryMalloc(keyLen);
|
||||||
|
memcpy(tmp, key, keyLen);
|
||||||
|
PROCESS_SLASH_IN_TAG_FIELD_KEY(tmp, keyLen);
|
||||||
|
key = tmp;
|
||||||
|
}
|
||||||
|
if (valueEscaped){
|
||||||
|
char *tmp = (char*)taosMemoryMalloc(valueLen);
|
||||||
|
memcpy(tmp, value, valueLen);
|
||||||
|
PROCESS_SLASH_IN_FIELD_VALUE(tmp, valueLen);
|
||||||
|
value = tmp;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSmlKv kv = {.key = key, .keyLen = keyLen, .value = value, .length = valueLen};
|
SSmlKv kv = {.key = key, .keyLen = keyLen, .value = value, .length = valueLen};
|
||||||
int32_t ret = smlParseValue(&kv, &info->msgBuf);
|
int32_t ret = smlParseValue(&kv, &info->msgBuf);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
smlBuildInvalidDataMsg(&info->msgBuf, "smlParseValue error", value);
|
smlBuildInvalidDataMsg(&info->msgBuf, "smlParseValue error", value);
|
||||||
|
freeSSmlKv(&kv);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -432,6 +478,7 @@ static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
|
||||||
if (unlikely(cnt + 2 > info->currSTableMeta->tableInfo.numOfColumns)) {
|
if (unlikely(cnt + 2 > info->currSTableMeta->tableInfo.numOfColumns)) {
|
||||||
info->dataFormat = false;
|
info->dataFormat = false;
|
||||||
info->reRun = true;
|
info->reRun = true;
|
||||||
|
freeSSmlKv(&kv);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
// bind data
|
// bind data
|
||||||
|
@ -440,22 +487,26 @@ static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
|
||||||
uDebug("smlBuildCol error, retry");
|
uDebug("smlBuildCol error, retry");
|
||||||
info->dataFormat = false;
|
info->dataFormat = false;
|
||||||
info->reRun = true;
|
info->reRun = true;
|
||||||
|
freeSSmlKv(&kv);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
if (cnt >= taosArrayGetSize(info->masColKVs)) {
|
if (cnt >= taosArrayGetSize(info->masColKVs)) {
|
||||||
info->dataFormat = false;
|
info->dataFormat = false;
|
||||||
info->reRun = true;
|
info->reRun = true;
|
||||||
|
freeSSmlKv(&kv);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
SSmlKv *maxKV = (SSmlKv *)taosArrayGet(info->masColKVs, cnt);
|
SSmlKv *maxKV = (SSmlKv *)taosArrayGet(info->masColKVs, cnt);
|
||||||
if (kv.type != maxKV->type) {
|
if (kv.type != maxKV->type) {
|
||||||
info->dataFormat = false;
|
info->dataFormat = false;
|
||||||
info->reRun = true;
|
info->reRun = true;
|
||||||
|
freeSSmlKv(&kv);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
if (unlikely(!IS_SAME_KEY)) {
|
if (unlikely(!IS_SAME_KEY)) {
|
||||||
info->dataFormat = false;
|
info->dataFormat = false;
|
||||||
info->reRun = true;
|
info->reRun = true;
|
||||||
|
freeSSmlKv(&kv);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -463,6 +514,7 @@ static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin
|
||||||
maxKV->length = kv.length;
|
maxKV->length = kv.length;
|
||||||
info->needModifySchema = true;
|
info->needModifySchema = true;
|
||||||
}
|
}
|
||||||
|
freeSSmlKv(&kv);
|
||||||
} else {
|
} else {
|
||||||
if (currElement->colArray == NULL) {
|
if (currElement->colArray == NULL) {
|
||||||
currElement->colArray = taosArrayInit_s(sizeof(SSmlKv), 1);
|
currElement->colArray = taosArrayInit_s(sizeof(SSmlKv), 1);
|
||||||
|
@ -487,10 +539,12 @@ int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
|
||||||
elements->measure = sql;
|
elements->measure = sql;
|
||||||
|
|
||||||
// parse measure
|
// parse measure
|
||||||
|
size_t measureLenEscaped = 0;
|
||||||
while (sql < sqlEnd) {
|
while (sql < sqlEnd) {
|
||||||
if (unlikely((sql != elements->measure) && IS_SLASH_LETTER(sql))) {
|
if (unlikely((sql != elements->measure) && IS_SLASH_LETTER_IN_MEASUREMENT(sql))) {
|
||||||
MOVE_FORWARD_ONE(sql, sqlEnd - sql);
|
elements->measureEscaped = true;
|
||||||
sqlEnd--;
|
measureLenEscaped++;
|
||||||
|
sql++;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if (unlikely(IS_COMMA(sql))) {
|
if (unlikely(IS_COMMA(sql))) {
|
||||||
|
@ -503,7 +557,7 @@ int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
|
||||||
sql++;
|
sql++;
|
||||||
}
|
}
|
||||||
elements->measureLen = sql - elements->measure;
|
elements->measureLen = sql - elements->measure;
|
||||||
if (unlikely(IS_INVALID_TABLE_LEN(elements->measureLen))) {
|
if (unlikely(IS_INVALID_TABLE_LEN(elements->measureLen - measureLenEscaped))) {
|
||||||
smlBuildInvalidDataMsg(&info->msgBuf, "measure is empty or too large than 192", NULL);
|
smlBuildInvalidDataMsg(&info->msgBuf, "measure is empty or too large than 192", NULL);
|
||||||
return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
|
return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
|
||||||
}
|
}
|
||||||
|
@ -581,7 +635,9 @@ int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
|
||||||
.keyLen = TS_LEN,
|
.keyLen = TS_LEN,
|
||||||
.type = TSDB_DATA_TYPE_TIMESTAMP,
|
.type = TSDB_DATA_TYPE_TIMESTAMP,
|
||||||
.i = ts,
|
.i = ts,
|
||||||
.length = (size_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};
|
.length = (size_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes,
|
||||||
|
.keyEscaped = false,
|
||||||
|
.valueEscaped = false};
|
||||||
if (info->dataFormat) {
|
if (info->dataFormat) {
|
||||||
uDebug("SML:0x%" PRIx64 " smlParseInfluxString format true, ts:%" PRId64, info->id, ts);
|
uDebug("SML:0x%" PRIx64 " smlParseInfluxString format true, ts:%" PRId64, info->id, ts);
|
||||||
ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 0);
|
ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, &kv, 0);
|
||||||
|
|
|
@ -1243,9 +1243,6 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
taosMemoryFree(pParam);
|
taosMemoryFree(pParam);
|
||||||
|
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
tscWarn("consumer:0x%" PRIx64 " msg from vgId:%d discarded, epoch %d, since %s, reqId:0x%" PRIx64, tmq->consumerId,
|
|
||||||
vgId, epoch, tstrerror(code), requestId);
|
|
||||||
|
|
||||||
if (pMsg->pData) taosMemoryFree(pMsg->pData);
|
if (pMsg->pData) taosMemoryFree(pMsg->pData);
|
||||||
if (pMsg->pEpSet) taosMemoryFree(pMsg->pEpSet);
|
if (pMsg->pEpSet) taosMemoryFree(pMsg->pEpSet);
|
||||||
|
|
||||||
|
@ -1267,6 +1264,9 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
taosWriteQitem(tmq->mqueue, pRspWrapper);
|
taosWriteQitem(tmq->mqueue, pRspWrapper);
|
||||||
} else if (code == TSDB_CODE_WAL_LOG_NOT_EXIST) { // poll data while insert
|
} else if (code == TSDB_CODE_WAL_LOG_NOT_EXIST) { // poll data while insert
|
||||||
taosMsleep(500);
|
taosMsleep(500);
|
||||||
|
} else{
|
||||||
|
tscError("consumer:0x%" PRIx64 " msg from vgId:%d discarded, epoch %d, since %s, reqId:0x%" PRIx64, tmq->consumerId,
|
||||||
|
vgId, epoch, tstrerror(code), requestId);
|
||||||
}
|
}
|
||||||
|
|
||||||
goto CREATE_MSG_FAIL;
|
goto CREATE_MSG_FAIL;
|
||||||
|
|
Loading…
Reference in New Issue