enh:[TD-32166]refactor code in sml
This commit is contained in:
parent
4090aad6fd
commit
35d3d968ff
|
@ -211,7 +211,6 @@ typedef struct {
|
||||||
cJSON *root; // for parse json
|
cJSON *root; // for parse json
|
||||||
int8_t offset[OTD_JSON_FIELDS_NUM];
|
int8_t offset[OTD_JSON_FIELDS_NUM];
|
||||||
SSmlLineInfo *lines; // element is SSmlLineInfo
|
SSmlLineInfo *lines; // element is SSmlLineInfo
|
||||||
bool parseJsonByLib;
|
|
||||||
SArray *tagJsonArray;
|
SArray *tagJsonArray;
|
||||||
SArray *valueJsonArray;
|
SArray *valueJsonArray;
|
||||||
|
|
||||||
|
|
|
@ -169,23 +169,23 @@ int64_t smlGetTimeValue(const char *value, int32_t len, uint8_t fromPrecision, u
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t smlBuildTableInfo(int numRows, const char *measure, int32_t measureLen, SSmlTableInfo **tInfo) {
|
int32_t smlBuildTableInfo(int numRows, const char *measure, int32_t measureLen, SSmlTableInfo **tInfo) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t lino = 0;
|
||||||
SSmlTableInfo *tag = (SSmlTableInfo *)taosMemoryCalloc(sizeof(SSmlTableInfo), 1);
|
SSmlTableInfo *tag = (SSmlTableInfo *)taosMemoryCalloc(sizeof(SSmlTableInfo), 1);
|
||||||
if (!tag) {
|
SML_CHECK_NULL(tag)
|
||||||
return terrno;
|
|
||||||
}
|
|
||||||
|
|
||||||
tag->sTableName = measure;
|
tag->sTableName = measure;
|
||||||
tag->sTableNameLen = measureLen;
|
tag->sTableNameLen = measureLen;
|
||||||
|
|
||||||
tag->cols = taosArrayInit(numRows, POINTER_BYTES);
|
tag->cols = taosArrayInit(numRows, POINTER_BYTES);
|
||||||
if (tag->cols == NULL) {
|
SML_CHECK_NULL(tag->cols)
|
||||||
uError("SML:smlBuildTableInfo failed to allocate memory");
|
|
||||||
taosMemoryFree(tag);
|
|
||||||
return terrno;
|
|
||||||
}
|
|
||||||
|
|
||||||
*tInfo = tag;
|
*tInfo = tag;
|
||||||
return TSDB_CODE_SUCCESS;
|
return code;
|
||||||
|
|
||||||
|
END:
|
||||||
|
taosMemoryFree(tag);
|
||||||
|
uError("%s failed code:%d line:%d", __FUNCTION__ , code, lino);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
void smlBuildTsKv(SSmlKv *kv, int64_t ts) {
|
void smlBuildTsKv(SSmlKv *kv, int64_t ts) {
|
||||||
|
@ -437,7 +437,7 @@ END:
|
||||||
|
|
||||||
int32_t smlParseEndLine(SSmlHandle *info, SSmlLineInfo *elements, SSmlKv *kvTs) {
|
int32_t smlParseEndLine(SSmlHandle *info, SSmlLineInfo *elements, SSmlKv *kvTs) {
|
||||||
if (info->dataFormat) {
|
if (info->dataFormat) {
|
||||||
uDebug("SML:0x%" PRIx64 " smlParseEndLine format true, ts:%" PRId64, info->id, kvTs->i);
|
uDebug("SML:0x%" PRIx64 " %s format true, ts:%" PRId64, info->id, __FUNCTION__, kvTs->i);
|
||||||
int32_t ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, kvTs, 0);
|
int32_t ret = smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, kvTs, 0);
|
||||||
if (ret == TSDB_CODE_SUCCESS) {
|
if (ret == TSDB_CODE_SUCCESS) {
|
||||||
ret = smlBuildRow(info->currTableDataCtx);
|
ret = smlBuildRow(info->currTableDataCtx);
|
||||||
|
@ -446,11 +446,11 @@ int32_t smlParseEndLine(SSmlHandle *info, SSmlLineInfo *elements, SSmlKv *kvTs)
|
||||||
clearColValArraySml(info->currTableDataCtx->pValues);
|
clearColValArraySml(info->currTableDataCtx->pValues);
|
||||||
taosArrayClearP(info->escapedStringList, taosMemoryFree);
|
taosArrayClearP(info->escapedStringList, taosMemoryFree);
|
||||||
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
|
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
|
||||||
smlBuildInvalidDataMsg(&info->msgBuf, "smlBuildCol error", NULL);
|
uError("SML:0x%" PRIx64 " %s smlBuildCol error:%d", info->id, __FUNCTION__, ret);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
uDebug("SML:0x%" PRIx64 " smlParseEndLine format false, ts:%" PRId64, info->id, kvTs->i);
|
uDebug("SML:0x%" PRIx64 " %s format false, ts:%" PRId64, info->id, __FUNCTION__, kvTs->i);
|
||||||
taosArraySet(elements->colArray, 0, kvTs);
|
taosArraySet(elements->colArray, 0, kvTs);
|
||||||
}
|
}
|
||||||
info->preLine = *elements;
|
info->preLine = *elements;
|
||||||
|
@ -1134,7 +1134,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
|
||||||
taosHashCleanup(hashTmp);
|
taosHashCleanup(hashTmp);
|
||||||
hashTmp = NULL;
|
hashTmp = NULL;
|
||||||
} else {
|
} else {
|
||||||
uError("SML:0x%" PRIx64 " load table meta error: %s", info->id, tstrerror(code));
|
uError("SML:0x%" PRIx64 " %s load table meta error: %s", info->id, __FUNCTION__, tstrerror(code));
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1145,11 +1145,11 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
|
||||||
|
|
||||||
taosMemoryFreeClear(sTableData->tableMeta);
|
taosMemoryFreeClear(sTableData->tableMeta);
|
||||||
sTableData->tableMeta = pTableMeta;
|
sTableData->tableMeta = pTableMeta;
|
||||||
uDebug("SML:0x%" PRIx64 "modify schema uid:%" PRIu64 ", sversion:%d, tversion:%d", info->id, pTableMeta->uid,
|
uDebug("SML:0x%" PRIx64 " %s modify schema uid:%" PRIu64 ", sversion:%d, tversion:%d", info->id, __FUNCTION__, pTableMeta->uid,
|
||||||
pTableMeta->sversion, pTableMeta->tversion);
|
pTableMeta->sversion, pTableMeta->tversion);
|
||||||
tmp = (SSmlSTableMeta **)taosHashIterate(info->superTables, tmp);
|
tmp = (SSmlSTableMeta **)taosHashIterate(info->superTables, tmp);
|
||||||
}
|
}
|
||||||
uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas end success, format:%d, needModifySchema:%d", info->id, info->dataFormat,
|
uDebug("SML:0x%" PRIx64 " %s end success, format:%d, needModifySchema:%d", info->id, __FUNCTION__, info->dataFormat,
|
||||||
info->needModifySchema);
|
info->needModifySchema);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -1159,7 +1159,7 @@ END:
|
||||||
taosHashCleanup(hashTmp);
|
taosHashCleanup(hashTmp);
|
||||||
taosMemoryFreeClear(pTableMeta);
|
taosMemoryFreeClear(pTableMeta);
|
||||||
(void)catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1); // ignore refresh meta code if there is an error
|
(void)catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1); // ignore refresh meta code if there is an error
|
||||||
uError("SML:0x%" PRIx64 " smlModifyDBSchemas end failed:%d:%s, format:%d, needModifySchema:%d", info->id, code,
|
uError("SML:0x%" PRIx64 " %s end failed:%d:%s, format:%d, needModifySchema:%d", info->id, __FUNCTION__, code,
|
||||||
tstrerror(code), info->dataFormat, info->needModifySchema);
|
tstrerror(code), info->dataFormat, info->needModifySchema);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
@ -1331,7 +1331,7 @@ int32_t smlBuildSmlInfo(TAOS *taos, SSmlHandle **handle) {
|
||||||
SML_CHECK_NULL(info->escapedStringList);
|
SML_CHECK_NULL(info->escapedStringList);
|
||||||
|
|
||||||
*handle = info;
|
*handle = info;
|
||||||
return code;
|
info = NULL;
|
||||||
|
|
||||||
END:
|
END:
|
||||||
smlDestroyInfo(info);
|
smlDestroyInfo(info);
|
||||||
|
@ -1362,8 +1362,8 @@ END:
|
||||||
RETURN
|
RETURN
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t smlParseLineBottom(SSmlHandle *info) {
|
static int32_t smlParseEnd(SSmlHandle *info) {
|
||||||
uDebug("SML:0x%" PRIx64 " smlParseLineBottom start, format:%d, linenum:%d", info->id, info->dataFormat,
|
uDebug("SML:0x%" PRIx64 " %s start, format:%d, linenum:%d", info->id, __FUNCTION__, info->dataFormat,
|
||||||
info->lineNum);
|
info->lineNum);
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
|
@ -1403,28 +1403,28 @@ static int32_t smlParseLineBottom(SSmlHandle *info) {
|
||||||
SSmlSTableMeta **tableMeta =
|
SSmlSTableMeta **tableMeta =
|
||||||
(SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen);
|
(SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen);
|
||||||
if (tableMeta) { // update meta
|
if (tableMeta) { // update meta
|
||||||
uDebug("SML:0x%" PRIx64 " smlParseLineBottom update meta, format:%d, linenum:%d", info->id, info->dataFormat,
|
uDebug("SML:0x%" PRIx64 " %s update meta, format:%d, linenum:%d", info->id, __FUNCTION__, info->dataFormat,
|
||||||
info->lineNum);
|
info->lineNum);
|
||||||
SML_CHECK_CODE(smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, elements->colArray, false, &info->msgBuf,
|
SML_CHECK_CODE(smlUpdateMeta((*tableMeta)->colHash, (*tableMeta)->cols, elements->colArray, false, &info->msgBuf,
|
||||||
(*tableMeta)->tagHash));
|
(*tableMeta)->tagHash));
|
||||||
SML_CHECK_CODE(smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, tinfo->tags, true, &info->msgBuf,
|
SML_CHECK_CODE(smlUpdateMeta((*tableMeta)->tagHash, (*tableMeta)->tags, tinfo->tags, true, &info->msgBuf,
|
||||||
(*tableMeta)->colHash));
|
(*tableMeta)->colHash));
|
||||||
} else {
|
} else {
|
||||||
uDebug("SML:0x%" PRIx64 " smlParseLineBottom add meta, format:%d, linenum:%d", info->id, info->dataFormat,
|
uDebug("SML:0x%" PRIx64 " %s add meta, format:%d, linenum:%d", info->id, __FUNCTION__, info->dataFormat,
|
||||||
info->lineNum);
|
info->lineNum);
|
||||||
SSmlSTableMeta *meta = NULL;
|
SSmlSTableMeta *meta = NULL;
|
||||||
SML_CHECK_CODE(smlBuildSTableMeta(info->dataFormat, &meta));
|
SML_CHECK_CODE(smlBuildSTableMeta(info->dataFormat, &meta));
|
||||||
code = taosHashPut(info->superTables, elements->measure, elements->measureLen, &meta, POINTER_BYTES);
|
code = taosHashPut(info->superTables, elements->measure, elements->measureLen, &meta, POINTER_BYTES);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
smlDestroySTableMeta(&meta);
|
smlDestroySTableMeta(&meta);
|
||||||
uError("SML:0x%" PRIx64 " put measuer to hash failed", info->id);
|
uError("SML:0x%" PRIx64 " put measure to hash failed", info->id);
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
SML_CHECK_CODE(smlInsertMeta(meta->tagHash, meta->tags, tinfo->tags, NULL));
|
SML_CHECK_CODE(smlInsertMeta(meta->tagHash, meta->tags, tinfo->tags, NULL));
|
||||||
SML_CHECK_CODE(smlInsertMeta(meta->colHash, meta->cols, elements->colArray, meta->tagHash));
|
SML_CHECK_CODE(smlInsertMeta(meta->colHash, meta->cols, elements->colArray, meta->tagHash));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
uDebug("SML:0x%" PRIx64 " smlParseLineBottom end, format:%d, linenum:%d", info->id, info->dataFormat, info->lineNum);
|
uDebug("SML:0x%" PRIx64 " %s end, format:%d, linenum:%d", info->id, __FUNCTION__, info->dataFormat, info->lineNum);
|
||||||
|
|
||||||
END:
|
END:
|
||||||
RETURN
|
RETURN
|
||||||
|
@ -1435,7 +1435,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
char *measure = NULL;
|
char *measure = NULL;
|
||||||
SSmlTableInfo **oneTable = NULL;
|
SSmlTableInfo **oneTable = NULL;
|
||||||
uDebug("SML:0x%" PRIx64 " smlInsertData start, format:%d", info->id, info->dataFormat);
|
uDebug("SML:0x%" PRIx64 " %s start, format:%d", info->id, __FUNCTION__, info->dataFormat);
|
||||||
|
|
||||||
if (info->pRequest->dbList == NULL) {
|
if (info->pRequest->dbList == NULL) {
|
||||||
info->pRequest->dbList = taosArrayInit(1, TSDB_DB_FNAME_LEN);
|
info->pRequest->dbList = taosArrayInit(1, TSDB_DB_FNAME_LEN);
|
||||||
|
@ -1484,7 +1484,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
|
||||||
SSmlSTableMeta **pMeta =
|
SSmlSTableMeta **pMeta =
|
||||||
(SSmlSTableMeta **)taosHashGet(info->superTables, tableData->sTableName, tableData->sTableNameLen);
|
(SSmlSTableMeta **)taosHashGet(info->superTables, tableData->sTableName, tableData->sTableNameLen);
|
||||||
if (unlikely(NULL == pMeta || NULL == *pMeta || NULL == (*pMeta)->tableMeta)) {
|
if (unlikely(NULL == pMeta || NULL == *pMeta || NULL == (*pMeta)->tableMeta)) {
|
||||||
uError("SML:0x%" PRIx64 " NULL == pMeta. table name: %s", info->id, tableData->childTableName);
|
uError("SML:0x%" PRIx64 " %s NULL == pMeta. table name: %s", info->id, __FUNCTION__, tableData->childTableName);
|
||||||
code = TSDB_CODE_SML_INTERNAL_ERROR;
|
code = TSDB_CODE_SML_INTERNAL_ERROR;
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
|
@ -1492,7 +1492,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
|
||||||
// use tablemeta of stable to save vgid and uid of child table
|
// use tablemeta of stable to save vgid and uid of child table
|
||||||
(*pMeta)->tableMeta->vgId = vg.vgId;
|
(*pMeta)->tableMeta->vgId = vg.vgId;
|
||||||
(*pMeta)->tableMeta->uid = tableData->uid; // one table merge data block together according uid
|
(*pMeta)->tableMeta->uid = tableData->uid; // one table merge data block together according uid
|
||||||
uDebug("SML:0x%" PRIx64 " smlInsertData table:%s, uid:%" PRIu64 ", format:%d", info->id, pName.tname,
|
uDebug("SML:0x%" PRIx64 " %s table:%s, uid:%" PRIu64 ", format:%d", info->id, __FUNCTION__, pName.tname,
|
||||||
tableData->uid, info->dataFormat);
|
tableData->uid, info->dataFormat);
|
||||||
|
|
||||||
SML_CHECK_CODE(smlBindData(info->pQuery, info->dataFormat, tableData->tags, (*pMeta)->cols, tableData->cols,
|
SML_CHECK_CODE(smlBindData(info->pQuery, info->dataFormat, tableData->tags, (*pMeta)->cols, tableData->cols,
|
||||||
|
@ -1510,7 +1510,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
|
||||||
|
|
||||||
launchQueryImpl(info->pRequest, info->pQuery, true, NULL); // no need to check return code
|
launchQueryImpl(info->pRequest, info->pQuery, true, NULL); // no need to check return code
|
||||||
|
|
||||||
uDebug("SML:0x%" PRIx64 " smlInsertData end, format:%d, code:%d,%s", info->id, info->dataFormat, info->pRequest->code,
|
uDebug("SML:0x%" PRIx64 " %s end, format:%d, code:%d,%s", info->id, __FUNCTION__, info->dataFormat, info->pRequest->code,
|
||||||
tstrerror(info->pRequest->code));
|
tstrerror(info->pRequest->code));
|
||||||
|
|
||||||
return info->pRequest->code;
|
return info->pRequest->code;
|
||||||
|
@ -1605,20 +1605,25 @@ static bool getLine(SSmlHandle *info, char *lines[], char **rawLine, char *rawLi
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char *rawLineEnd, int numLines) {
|
|
||||||
uDebug("SML:0x%" PRIx64 " smlParseLine start", info->id);
|
static int32_t smlParseJson(SSmlHandle *info, char *lines[], char *rawLine) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
if (lines) {
|
||||||
|
code = smlParseJSONExt(info, *lines);
|
||||||
|
} else if (rawLine) {
|
||||||
|
code = smlParseJSONExt(info, rawLine);
|
||||||
|
}
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
uError("%s failed code:%d", __FUNCTION__ , code);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t smlParseStart(SSmlHandle *info, char *lines[], char *rawLine, char *rawLineEnd, int numLines) {
|
||||||
|
uDebug("SML:0x%" PRIx64 " %s start", info->id, __FUNCTION__);
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
|
if (info->protocol == TSDB_SML_JSON_PROTOCOL) {
|
||||||
if (lines) {
|
return smlParseJson(info, lines, rawLine);
|
||||||
code = smlParseJSONExt(info, *lines);
|
|
||||||
} else if (rawLine) {
|
|
||||||
code = smlParseJSONExt(info, rawLine);
|
|
||||||
}
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
uError("SML:0x%" PRIx64 " smlParseJSON failed:%s", info->id, lines ? *lines : rawLine);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
char *oldRaw = rawLine;
|
char *oldRaw = rawLine;
|
||||||
|
@ -1644,19 +1649,17 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char
|
||||||
} else {
|
} else {
|
||||||
code = smlParseTelnetString(info, (char *)tmp, (char *)tmp + len, info->lines + i);
|
code = smlParseTelnetString(info, (char *)tmp, (char *)tmp + len, info->lines + i);
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
code = TSDB_CODE_SML_INVALID_PROTOCOL_TYPE;
|
|
||||||
}
|
}
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
if (rawLine != NULL) {
|
if (rawLine != NULL) {
|
||||||
printRaw(info->id, i, numLines, DEBUG_ERROR, tmp, len);
|
printRaw(info->id, i, numLines, DEBUG_ERROR, tmp, len);
|
||||||
} else {
|
} else {
|
||||||
uError("SML:0x%" PRIx64 " smlParseLine failed. line %d : %s", info->id, i, tmp);
|
uError("SML:0x%" PRIx64 " %s failed. line %d : %s", info->id, __FUNCTION__, i, tmp);
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
if (info->reRun) {
|
if (info->reRun) {
|
||||||
uDebug("SML:0x%" PRIx64 " smlParseLine re run", info->id);
|
uDebug("SML:0x%" PRIx64 " %s re run", info->id, __FUNCTION__);
|
||||||
i = 0;
|
i = 0;
|
||||||
rawLine = oldRaw;
|
rawLine = oldRaw;
|
||||||
code = smlClearForRerun(info);
|
code = smlClearForRerun(info);
|
||||||
|
@ -1667,7 +1670,7 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char
|
||||||
}
|
}
|
||||||
i++;
|
i++;
|
||||||
}
|
}
|
||||||
uDebug("SML:0x%" PRIx64 " smlParseLine end", info->id);
|
uDebug("SML:0x%" PRIx64 " %s end", info->id, __FUNCTION__);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -1679,8 +1682,8 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL
|
||||||
|
|
||||||
info->cost.parseTime = taosGetTimestampUs();
|
info->cost.parseTime = taosGetTimestampUs();
|
||||||
|
|
||||||
SML_CHECK_CODE(smlParseLine(info, lines, rawLine, rawLineEnd, numLines));
|
SML_CHECK_CODE(smlParseStart(info, lines, rawLine, rawLineEnd, numLines));
|
||||||
SML_CHECK_CODE(smlParseLineBottom(info));
|
SML_CHECK_CODE(smlParseEnd(info));
|
||||||
|
|
||||||
info->cost.lineNum = info->lineNum;
|
info->cost.lineNum = info->lineNum;
|
||||||
info->cost.numOfSTables = taosHashGetSize(info->superTables);
|
info->cost.numOfSTables = taosHashGetSize(info->superTables);
|
||||||
|
@ -1752,7 +1755,7 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine,
|
||||||
int cnt = 0;
|
int cnt = 0;
|
||||||
while (1) {
|
while (1) {
|
||||||
SML_CHECK_CODE(createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid, &request));
|
SML_CHECK_CODE(createRequest(*(int64_t *)taos, TSDB_SQL_INSERT, reqid, &request));
|
||||||
SSmlMsgBuf msg = {ERROR_MSG_BUF_DEFAULT_SIZE, request->msgBuf};
|
SSmlMsgBuf msg = {request->msgBufLen, request->msgBuf};
|
||||||
request->code = smlBuildSmlInfo(taos, &info);
|
request->code = smlBuildSmlInfo(taos, &info);
|
||||||
SML_CHECK_CODE(request->code);
|
SML_CHECK_CODE(request->code);
|
||||||
|
|
||||||
|
@ -1822,7 +1825,7 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine,
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
END:
|
END:
|
||||||
smlDestroyInfo(info);
|
smlDestroyInfo(info);
|
||||||
return (TAOS_RES *)request;
|
return (TAOS_RES *)request;
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,7 +24,7 @@
|
||||||
static inline int32_t smlParseMetricFromJSON(SSmlHandle *info, cJSON *metric, SSmlLineInfo *elements) {
|
static inline int32_t smlParseMetricFromJSON(SSmlHandle *info, cJSON *metric, SSmlLineInfo *elements) {
|
||||||
elements->measureLen = strlen(metric->valuestring);
|
elements->measureLen = strlen(metric->valuestring);
|
||||||
if (IS_INVALID_TABLE_LEN(elements->measureLen)) {
|
if (IS_INVALID_TABLE_LEN(elements->measureLen)) {
|
||||||
uError("OTD:0x%" PRIx64 " Metric length is 0 or large than 192", info->id);
|
uError("SML:0x%" PRIx64 " Metric length is 0 or large than 192", info->id);
|
||||||
return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
|
return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -44,7 +44,7 @@ static int32_t smlGetJsonElements(cJSON *root, cJSON ***marks) {
|
||||||
child = child->next;
|
child = child->next;
|
||||||
}
|
}
|
||||||
if (*marks[i] == NULL) {
|
if (*marks[i] == NULL) {
|
||||||
uError("smlGetJsonElements error, not find mark:%d:%s", i, jsonName[i]);
|
uError("SML %s error, not find mark:%d:%s", __FUNCTION__, i, jsonName[i]);
|
||||||
return TSDB_CODE_TSC_INVALID_JSON;
|
return TSDB_CODE_TSC_INVALID_JSON;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -53,7 +53,7 @@ static int32_t smlGetJsonElements(cJSON *root, cJSON ***marks) {
|
||||||
|
|
||||||
static int32_t smlConvertJSONBool(SSmlKv *pVal, char *typeStr, cJSON *value) {
|
static int32_t smlConvertJSONBool(SSmlKv *pVal, char *typeStr, cJSON *value) {
|
||||||
if (strcasecmp(typeStr, "bool") != 0) {
|
if (strcasecmp(typeStr, "bool") != 0) {
|
||||||
uError("OTD:invalid type(%s) for JSON Bool", typeStr);
|
uError("SML:invalid type(%s) for JSON Bool", typeStr);
|
||||||
return TSDB_CODE_TSC_INVALID_JSON_TYPE;
|
return TSDB_CODE_TSC_INVALID_JSON_TYPE;
|
||||||
}
|
}
|
||||||
pVal->type = TSDB_DATA_TYPE_BOOL;
|
pVal->type = TSDB_DATA_TYPE_BOOL;
|
||||||
|
@ -67,7 +67,7 @@ static int32_t smlConvertJSONNumber(SSmlKv *pVal, char *typeStr, cJSON *value) {
|
||||||
// tinyint
|
// tinyint
|
||||||
if (strcasecmp(typeStr, "i8") == 0 || strcasecmp(typeStr, "tinyint") == 0) {
|
if (strcasecmp(typeStr, "i8") == 0 || strcasecmp(typeStr, "tinyint") == 0) {
|
||||||
if (!IS_VALID_TINYINT(value->valuedouble)) {
|
if (!IS_VALID_TINYINT(value->valuedouble)) {
|
||||||
uError("OTD:JSON value(%f) cannot fit in type(tinyint)", value->valuedouble);
|
uError("SML:JSON value(%f) cannot fit in type(tinyint)", value->valuedouble);
|
||||||
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
|
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
|
||||||
}
|
}
|
||||||
pVal->type = TSDB_DATA_TYPE_TINYINT;
|
pVal->type = TSDB_DATA_TYPE_TINYINT;
|
||||||
|
@ -78,7 +78,7 @@ static int32_t smlConvertJSONNumber(SSmlKv *pVal, char *typeStr, cJSON *value) {
|
||||||
// smallint
|
// smallint
|
||||||
if (strcasecmp(typeStr, "i16") == 0 || strcasecmp(typeStr, "smallint") == 0) {
|
if (strcasecmp(typeStr, "i16") == 0 || strcasecmp(typeStr, "smallint") == 0) {
|
||||||
if (!IS_VALID_SMALLINT(value->valuedouble)) {
|
if (!IS_VALID_SMALLINT(value->valuedouble)) {
|
||||||
uError("OTD:JSON value(%f) cannot fit in type(smallint)", value->valuedouble);
|
uError("SML:JSON value(%f) cannot fit in type(smallint)", value->valuedouble);
|
||||||
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
|
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
|
||||||
}
|
}
|
||||||
pVal->type = TSDB_DATA_TYPE_SMALLINT;
|
pVal->type = TSDB_DATA_TYPE_SMALLINT;
|
||||||
|
@ -89,7 +89,7 @@ static int32_t smlConvertJSONNumber(SSmlKv *pVal, char *typeStr, cJSON *value) {
|
||||||
// int
|
// int
|
||||||
if (strcasecmp(typeStr, "i32") == 0 || strcasecmp(typeStr, "int") == 0) {
|
if (strcasecmp(typeStr, "i32") == 0 || strcasecmp(typeStr, "int") == 0) {
|
||||||
if (!IS_VALID_INT(value->valuedouble)) {
|
if (!IS_VALID_INT(value->valuedouble)) {
|
||||||
uError("OTD:JSON value(%f) cannot fit in type(int)", value->valuedouble);
|
uError("SML:JSON value(%f) cannot fit in type(int)", value->valuedouble);
|
||||||
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
|
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
|
||||||
}
|
}
|
||||||
pVal->type = TSDB_DATA_TYPE_INT;
|
pVal->type = TSDB_DATA_TYPE_INT;
|
||||||
|
@ -113,7 +113,7 @@ static int32_t smlConvertJSONNumber(SSmlKv *pVal, char *typeStr, cJSON *value) {
|
||||||
// float
|
// float
|
||||||
if (strcasecmp(typeStr, "f32") == 0 || strcasecmp(typeStr, "float") == 0) {
|
if (strcasecmp(typeStr, "f32") == 0 || strcasecmp(typeStr, "float") == 0) {
|
||||||
if (!IS_VALID_FLOAT(value->valuedouble)) {
|
if (!IS_VALID_FLOAT(value->valuedouble)) {
|
||||||
uError("OTD:JSON value(%f) cannot fit in type(float)", value->valuedouble);
|
uError("SML:JSON value(%f) cannot fit in type(float)", value->valuedouble);
|
||||||
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
|
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
|
||||||
}
|
}
|
||||||
pVal->type = TSDB_DATA_TYPE_FLOAT;
|
pVal->type = TSDB_DATA_TYPE_FLOAT;
|
||||||
|
@ -130,7 +130,7 @@ static int32_t smlConvertJSONNumber(SSmlKv *pVal, char *typeStr, cJSON *value) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// if reach here means type is unsupported
|
// if reach here means type is unsupported
|
||||||
uError("OTD:invalid type(%s) for JSON Number", typeStr);
|
uError("SML:invalid type(%s) for JSON Number", typeStr);
|
||||||
return TSDB_CODE_TSC_INVALID_JSON_TYPE;
|
return TSDB_CODE_TSC_INVALID_JSON_TYPE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -142,7 +142,7 @@ static int32_t smlConvertJSONString(SSmlKv *pVal, char *typeStr, cJSON *value) {
|
||||||
} else if (strcasecmp(typeStr, "nchar") == 0) {
|
} else if (strcasecmp(typeStr, "nchar") == 0) {
|
||||||
pVal->type = TSDB_DATA_TYPE_NCHAR;
|
pVal->type = TSDB_DATA_TYPE_NCHAR;
|
||||||
} else {
|
} else {
|
||||||
uError("OTD:invalid type(%s) for JSON String", typeStr);
|
uError("SML:invalid type(%s) for JSON String", typeStr);
|
||||||
return TSDB_CODE_TSC_INVALID_JSON_TYPE;
|
return TSDB_CODE_TSC_INVALID_JSON_TYPE;
|
||||||
}
|
}
|
||||||
pVal->length = strlen(value->valuestring);
|
pVal->length = strlen(value->valuestring);
|
||||||
|
@ -225,7 +225,7 @@ static int32_t smlParseValueFromJSON(cJSON *root, SSmlKv *kv) {
|
||||||
case cJSON_String: {
|
case cJSON_String: {
|
||||||
int32_t ret = smlConvertJSONString(kv, "binary", root);
|
int32_t ret = smlConvertJSONString(kv, "binary", root);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
uError("OTD:Failed to parse binary value from JSON Obj");
|
uError("SML:Failed to parse binary value from JSON Obj");
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
@ -233,7 +233,7 @@ static int32_t smlParseValueFromJSON(cJSON *root, SSmlKv *kv) {
|
||||||
case cJSON_Object: {
|
case cJSON_Object: {
|
||||||
int32_t ret = smlParseValueFromJSONObj(root, kv);
|
int32_t ret = smlParseValueFromJSONObj(root, kv);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
uError("OTD:Failed to parse value from JSON Obj");
|
uError("SML:Failed to parse value from JSON Obj");
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
@ -262,7 +262,7 @@ static int32_t smlProcessTagJson(SSmlHandle *info, cJSON *tags){
|
||||||
}
|
}
|
||||||
size_t keyLen = strlen(tag->string);
|
size_t keyLen = strlen(tag->string);
|
||||||
if (unlikely(IS_INVALID_COL_LEN(keyLen))) {
|
if (unlikely(IS_INVALID_COL_LEN(keyLen))) {
|
||||||
uError("OTD:Tag key length is 0 or too large than 64");
|
uError("SML:Tag key length is 0 or too large than 64");
|
||||||
return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
|
return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -436,7 +436,7 @@ static int32_t smlParseJSONStringExt(SSmlHandle *info, cJSON *root, SSmlLineInfo
|
||||||
int32_t size = cJSON_GetArraySize(root);
|
int32_t size = cJSON_GetArraySize(root);
|
||||||
// outmost json fields has to be exactly 4
|
// outmost json fields has to be exactly 4
|
||||||
if (size != OTD_JSON_FIELDS_NUM) {
|
if (size != OTD_JSON_FIELDS_NUM) {
|
||||||
uError("OTD:0x%" PRIx64 " Invalid number of JSON fields in data point %d", info->id, size);
|
uError("SML:0x%" PRIx64 " Invalid number of JSON fields in data point %d", info->id, size);
|
||||||
return TSDB_CODE_TSC_INVALID_JSON;
|
return TSDB_CODE_TSC_INVALID_JSON;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -465,9 +465,9 @@ static int32_t smlParseJSONStringExt(SSmlHandle *info, cJSON *root, SSmlLineInfo
|
||||||
if (unlikely(ts < 0)) {
|
if (unlikely(ts < 0)) {
|
||||||
char* tmp = cJSON_PrintUnformatted(tsJson);
|
char* tmp = cJSON_PrintUnformatted(tsJson);
|
||||||
if (tmp == NULL) {
|
if (tmp == NULL) {
|
||||||
uError("OTD:0x%" PRIx64 " Unable to parse timestamp from JSON payload %s %" PRId64, info->id, info->msgBuf.buf, ts);
|
uError("SML:0x%" PRIx64 " Unable to parse timestamp from JSON payload %s %" PRId64, info->id, info->msgBuf.buf, ts);
|
||||||
} else {
|
} else {
|
||||||
uError("OTD:0x%" PRIx64 " Unable to parse timestamp from JSON payload %s %s %" PRId64, info->id, info->msgBuf.buf,tmp, ts);
|
uError("SML:0x%" PRIx64 " Unable to parse timestamp from JSON payload %s %s %" PRId64, info->id, info->msgBuf.buf,tmp, ts);
|
||||||
taosMemoryFree(tmp);
|
taosMemoryFree(tmp);
|
||||||
}
|
}
|
||||||
code = TSDB_CODE_INVALID_TIMESTAMP;
|
code = TSDB_CODE_INVALID_TIMESTAMP;
|
||||||
|
@ -506,7 +506,7 @@ int32_t smlParseJSONExt(SSmlHandle *info, char *payload) {
|
||||||
} else if (cJSON_IsObject(info->root)) {
|
} else if (cJSON_IsObject(info->root)) {
|
||||||
payloadNum = 1;
|
payloadNum = 1;
|
||||||
} else {
|
} else {
|
||||||
uError("SML:0x%" PRIx64 " Invalid JSON Payload 3:%s", info->id, payload);
|
uError("SML:0x%" PRIx64 " Invalid JSON type:%s", info->id, payload);
|
||||||
return TSDB_CODE_TSC_INVALID_JSON;
|
return TSDB_CODE_TSC_INVALID_JSON;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -520,7 +520,6 @@ int32_t smlParseJSONExt(SSmlHandle *info, char *payload) {
|
||||||
SSmlLineInfo element = {0};
|
SSmlLineInfo element = {0};
|
||||||
ret = smlParseJSONStringExt(info, dataPoint, &element);
|
ret = smlParseJSONStringExt(info, dataPoint, &element);
|
||||||
if (element.measureTagsLen != 0) taosMemoryFree(element.measureTag);
|
if (element.measureTagsLen != 0) taosMemoryFree(element.measureTag);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
ret = smlParseJSONStringExt(info, dataPoint, info->lines + cnt);
|
ret = smlParseJSONStringExt(info, dataPoint, info->lines + cnt);
|
||||||
}
|
}
|
||||||
|
|
|
@ -63,7 +63,7 @@ static int64_t smlParseInfluxTime(SSmlHandle *info, const char *data, int32_t le
|
||||||
|
|
||||||
int64_t ts = smlGetTimeValue(data, len, fromPrecision, toPrecision);
|
int64_t ts = smlGetTimeValue(data, len, fromPrecision, toPrecision);
|
||||||
if (unlikely(ts == -1)) {
|
if (unlikely(ts == -1)) {
|
||||||
smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data);
|
smlBuildInvalidDataMsg(&info->msgBuf, "SML line invalid timestamp", data);
|
||||||
return TSDB_CODE_SML_INVALID_DATA;
|
return TSDB_CODE_SML_INVALID_DATA;
|
||||||
}
|
}
|
||||||
return ts;
|
return ts;
|
||||||
|
@ -84,7 +84,7 @@ int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pVal->value[0] == 'l' || pVal->value[0] == 'L') { // nchar
|
if (pVal->value[0] == 'l' || pVal->value[0] == 'L') { // nchar
|
||||||
if (pVal->value[1] == '"' && pVal->value[pVal->length - 1] == '"' && pVal->length >= 3) {
|
if (pVal->length >= NCHAR_ADD_LEN && pVal->value[1] == '"' && pVal->value[pVal->length - 1] == '"') {
|
||||||
pVal->type = TSDB_DATA_TYPE_NCHAR;
|
pVal->type = TSDB_DATA_TYPE_NCHAR;
|
||||||
pVal->length -= NCHAR_ADD_LEN;
|
pVal->length -= NCHAR_ADD_LEN;
|
||||||
if (pVal->length > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
|
if (pVal->length > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
|
||||||
|
@ -97,7 +97,7 @@ int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pVal->value[0] == 'g' || pVal->value[0] == 'G') { // geometry
|
if (pVal->value[0] == 'g' || pVal->value[0] == 'G') { // geometry
|
||||||
if (pVal->value[1] == '"' && pVal->value[pVal->length - 1] == '"' && pVal->length >= sizeof("POINT")+3) {
|
if (pVal->length >= NCHAR_ADD_LEN && pVal->value[1] == '"' && pVal->value[pVal->length - 1] == '"') {
|
||||||
int32_t code = initCtxGeomFromText();
|
int32_t code = initCtxGeomFromText();
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
|
@ -124,7 +124,7 @@ int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pVal->value[0] == 'b' || pVal->value[0] == 'B') { // varbinary
|
if (pVal->value[0] == 'b' || pVal->value[0] == 'B') { // varbinary
|
||||||
if (pVal->value[1] == '"' && pVal->value[pVal->length - 1] == '"' && pVal->length >= 3) {
|
if (pVal->length >= NCHAR_ADD_LEN && pVal->value[1] == '"' && pVal->value[pVal->length - 1] == '"') {
|
||||||
pVal->type = TSDB_DATA_TYPE_VARBINARY;
|
pVal->type = TSDB_DATA_TYPE_VARBINARY;
|
||||||
if(isHex(pVal->value + NCHAR_ADD_LEN - 1, pVal->length - NCHAR_ADD_LEN)){
|
if(isHex(pVal->value + NCHAR_ADD_LEN - 1, pVal->length - NCHAR_ADD_LEN)){
|
||||||
if(!isValidateHex(pVal->value + NCHAR_ADD_LEN - 1, pVal->length - NCHAR_ADD_LEN)){
|
if(!isValidateHex(pVal->value + NCHAR_ADD_LEN - 1, pVal->length - NCHAR_ADD_LEN)){
|
||||||
|
@ -346,7 +346,7 @@ static int32_t smlParseColLine(SSmlHandle *info, char **sql, char *sqlEnd, SSmlL
|
||||||
const char *escapeChar = NULL;
|
const char *escapeChar = NULL;
|
||||||
while (*sql < sqlEnd) {
|
while (*sql < sqlEnd) {
|
||||||
if (unlikely(IS_SPACE(*sql,escapeChar) || IS_COMMA(*sql,escapeChar))) {
|
if (unlikely(IS_SPACE(*sql,escapeChar) || IS_COMMA(*sql,escapeChar))) {
|
||||||
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", *sql);
|
smlBuildInvalidDataMsg(&info->msgBuf, "SML line invalid data", *sql);
|
||||||
return TSDB_CODE_SML_INVALID_DATA;
|
return TSDB_CODE_SML_INVALID_DATA;
|
||||||
}
|
}
|
||||||
if (unlikely(IS_EQUAL(*sql,escapeChar))) {
|
if (unlikely(IS_EQUAL(*sql,escapeChar))) {
|
||||||
|
@ -363,7 +363,7 @@ static int32_t smlParseColLine(SSmlHandle *info, char **sql, char *sqlEnd, SSmlL
|
||||||
}
|
}
|
||||||
|
|
||||||
if (unlikely(IS_INVALID_COL_LEN(keyLen - keyLenEscaped))) {
|
if (unlikely(IS_INVALID_COL_LEN(keyLen - keyLenEscaped))) {
|
||||||
smlBuildInvalidDataMsg(&info->msgBuf, "invalid key or key is too long than 64", key);
|
smlBuildInvalidDataMsg(&info->msgBuf, "SML line invalid key or key is too long than 64", key);
|
||||||
return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
|
return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -397,18 +397,18 @@ static int32_t smlParseColLine(SSmlHandle *info, char **sql, char *sqlEnd, SSmlL
|
||||||
valueLen = *sql - value;
|
valueLen = *sql - value;
|
||||||
|
|
||||||
if (unlikely(quoteNum != 0 && quoteNum != 2)) {
|
if (unlikely(quoteNum != 0 && quoteNum != 2)) {
|
||||||
smlBuildInvalidDataMsg(&info->msgBuf, "unbalanced quotes", value);
|
smlBuildInvalidDataMsg(&info->msgBuf, "SML line unbalanced quotes", value);
|
||||||
return TSDB_CODE_SML_INVALID_DATA;
|
return TSDB_CODE_SML_INVALID_DATA;
|
||||||
}
|
}
|
||||||
if (unlikely(valueLen == 0)) {
|
if (unlikely(valueLen == 0)) {
|
||||||
smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", value);
|
smlBuildInvalidDataMsg(&info->msgBuf, "SML line invalid value", value);
|
||||||
return TSDB_CODE_SML_INVALID_DATA;
|
return TSDB_CODE_SML_INVALID_DATA;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSmlKv kv = {.key = key, .keyLen = keyLen, .value = value, .length = valueLen};
|
SSmlKv kv = {.key = key, .keyLen = keyLen, .value = value, .length = valueLen};
|
||||||
int32_t ret = smlParseValue(&kv, &info->msgBuf);
|
int32_t ret = smlParseValue(&kv, &info->msgBuf);
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
smlBuildInvalidDataMsg(&info->msgBuf, "smlParseValue error", value);
|
uError("SML:0x%" PRIx64 " %s parse value error:%d.", info->id, __FUNCTION__, ret);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -430,11 +430,6 @@ static int32_t smlParseColLine(SSmlHandle *info, char **sql, char *sqlEnd, SSmlL
|
||||||
}
|
}
|
||||||
(void)memcpy(tmp, kv.value, kv.length);
|
(void)memcpy(tmp, kv.value, kv.length);
|
||||||
PROCESS_SLASH_IN_FIELD_VALUE(tmp, kv.length);
|
PROCESS_SLASH_IN_FIELD_VALUE(tmp, kv.length);
|
||||||
if(kv.type == TSDB_DATA_TYPE_GEOMETRY) {
|
|
||||||
uError("SML:0x%" PRIx64 " smlParseColLine error, invalid GEOMETRY type.", info->id);
|
|
||||||
taosMemoryFree((void*)kv.value);
|
|
||||||
return TSDB_CODE_TSC_INVALID_VALUE;
|
|
||||||
}
|
|
||||||
if(kv.type == TSDB_DATA_TYPE_VARBINARY){
|
if(kv.type == TSDB_DATA_TYPE_VARBINARY){
|
||||||
taosMemoryFree((void*)kv.value);
|
taosMemoryFree((void*)kv.value);
|
||||||
}
|
}
|
||||||
|
@ -503,7 +498,7 @@ int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
|
||||||
}
|
}
|
||||||
elements->measureLen = sql - elements->measure;
|
elements->measureLen = sql - elements->measure;
|
||||||
if (unlikely(IS_INVALID_TABLE_LEN(elements->measureLen - measureLenEscaped))) {
|
if (unlikely(IS_INVALID_TABLE_LEN(elements->measureLen - measureLenEscaped))) {
|
||||||
smlBuildInvalidDataMsg(&info->msgBuf, "measure is empty or too large than 192", NULL);
|
smlBuildInvalidDataMsg(&info->msgBuf, "SML line measure is empty or too large than 192", NULL);
|
||||||
return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
|
return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -550,7 +545,7 @@ int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
|
||||||
|
|
||||||
elements->colsLen = sql - elements->cols;
|
elements->colsLen = sql - elements->cols;
|
||||||
if (unlikely(elements->colsLen == 0)) {
|
if (unlikely(elements->colsLen == 0)) {
|
||||||
smlBuildInvalidDataMsg(&info->msgBuf, "cols is empty", NULL);
|
smlBuildInvalidDataMsg(&info->msgBuf, "SML line cols is empty", NULL);
|
||||||
return TSDB_CODE_SML_INVALID_DATA;
|
return TSDB_CODE_SML_INVALID_DATA;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -567,7 +562,7 @@ int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
|
||||||
|
|
||||||
int64_t ts = smlParseInfluxTime(info, elements->timestamp, elements->timestampLen);
|
int64_t ts = smlParseInfluxTime(info, elements->timestamp, elements->timestampLen);
|
||||||
if (unlikely(ts <= 0)) {
|
if (unlikely(ts <= 0)) {
|
||||||
uError("SML:0x%" PRIx64 " smlParseTS error:%" PRId64, info->id, ts);
|
uError("SML:0x%" PRIx64 " %s error:%" PRId64, info->id, __FUNCTION__, ts);
|
||||||
return TSDB_CODE_INVALID_TIMESTAMP;
|
return TSDB_CODE_INVALID_TIMESTAMP;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -172,14 +172,14 @@ int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
|
||||||
// parse metric
|
// parse metric
|
||||||
smlParseTelnetElement(&sql, sqlEnd, &elements->measure, &elements->measureLen);
|
smlParseTelnetElement(&sql, sqlEnd, &elements->measure, &elements->measureLen);
|
||||||
if (unlikely((!(elements->measure) || IS_INVALID_TABLE_LEN(elements->measureLen)))) {
|
if (unlikely((!(elements->measure) || IS_INVALID_TABLE_LEN(elements->measureLen)))) {
|
||||||
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql);
|
smlBuildInvalidDataMsg(&info->msgBuf, "SML telnet invalid measure", sql);
|
||||||
return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
|
return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
|
||||||
}
|
}
|
||||||
|
|
||||||
// parse timestamp
|
// parse timestamp
|
||||||
smlParseTelnetElement(&sql, sqlEnd, &elements->timestamp, &elements->timestampLen);
|
smlParseTelnetElement(&sql, sqlEnd, &elements->timestamp, &elements->timestampLen);
|
||||||
if (unlikely(!elements->timestamp || elements->timestampLen == 0)) {
|
if (unlikely(!elements->timestamp || elements->timestampLen == 0)) {
|
||||||
smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql);
|
smlBuildInvalidDataMsg(&info->msgBuf, "SML telnet invalid timestamp", sql);
|
||||||
return TSDB_CODE_SML_INVALID_DATA;
|
return TSDB_CODE_SML_INVALID_DATA;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -189,19 +189,21 @@ int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
|
||||||
}
|
}
|
||||||
int64_t ts = smlParseOpenTsdbTime(info, elements->timestamp, elements->timestampLen);
|
int64_t ts = smlParseOpenTsdbTime(info, elements->timestamp, elements->timestampLen);
|
||||||
if (unlikely(ts < 0)) {
|
if (unlikely(ts < 0)) {
|
||||||
smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", sql);
|
smlBuildInvalidDataMsg(&info->msgBuf, "SML telnet parse timestamp failed", sql);
|
||||||
return TSDB_CODE_INVALID_TIMESTAMP;
|
return TSDB_CODE_INVALID_TIMESTAMP;
|
||||||
}
|
}
|
||||||
|
|
||||||
// parse value
|
// parse value
|
||||||
smlParseTelnetElement(&sql, sqlEnd, &elements->cols, &elements->colsLen);
|
smlParseTelnetElement(&sql, sqlEnd, &elements->cols, &elements->colsLen);
|
||||||
if (unlikely(!elements->cols || elements->colsLen == 0)) {
|
if (unlikely(!elements->cols || elements->colsLen == 0)) {
|
||||||
smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", sql);
|
smlBuildInvalidDataMsg(&info->msgBuf, "SML telnet invalid value", sql);
|
||||||
return TSDB_CODE_TSC_INVALID_VALUE;
|
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSmlKv kv = {.key = VALUE, .keyLen = VALUE_LEN, .value = elements->cols, .length = (size_t)elements->colsLen};
|
SSmlKv kv = {.key = VALUE, .keyLen = VALUE_LEN, .value = elements->cols, .length = (size_t)elements->colsLen};
|
||||||
if (smlParseValue(&kv, &info->msgBuf) != TSDB_CODE_SUCCESS) {
|
int ret = smlParseValue(&kv, &info->msgBuf);
|
||||||
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
|
uError("SML:0x%" PRIx64 " %s parse value error:%d.", info->id, __FUNCTION__, ret);
|
||||||
return TSDB_CODE_TSC_INVALID_VALUE;
|
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -210,11 +212,11 @@ int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
|
||||||
elements->tags = sql;
|
elements->tags = sql;
|
||||||
elements->tagsLen = sqlEnd - sql;
|
elements->tagsLen = sqlEnd - sql;
|
||||||
if (unlikely(!elements->tags || elements->tagsLen == 0)) {
|
if (unlikely(!elements->tags || elements->tagsLen == 0)) {
|
||||||
smlBuildInvalidDataMsg(&info->msgBuf, "invalid value", sql);
|
smlBuildInvalidDataMsg(&info->msgBuf, "SML telnet invalid tag value", sql);
|
||||||
return TSDB_CODE_TSC_INVALID_VALUE;
|
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
int ret = smlParseTelnetTags(info, sql, sqlEnd, elements);
|
ret = smlParseTelnetTags(info, sql, sqlEnd, elements);
|
||||||
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
|
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
|
@ -68,6 +68,15 @@ TEST(testCase, smlParseInfluxString_Test) {
|
||||||
taosArrayDestroy(elements.colArray);
|
taosArrayDestroy(elements.colArray);
|
||||||
elements.colArray = nullptr;
|
elements.colArray = nullptr;
|
||||||
|
|
||||||
|
// case 0 false
|
||||||
|
tmp = "st,t1=3 c3=\"";
|
||||||
|
(void)memcpy(sql, tmp, strlen(tmp) + 1);
|
||||||
|
(void)memset(&elements, 0, sizeof(SSmlLineInfo));
|
||||||
|
ret = smlParseInfluxString(info, sql, sql + strlen(sql), &elements);
|
||||||
|
ASSERT_NE(ret, 0);
|
||||||
|
taosArrayDestroy(elements.colArray);
|
||||||
|
elements.colArray = nullptr;
|
||||||
|
|
||||||
// case 2 false
|
// case 2 false
|
||||||
tmp = "st,t1=3,t2=4,t3=t3 c1=3i64,c3=\"passit hello,c1=2,c2=false,c4=4f64 1626006833639000000";
|
tmp = "st,t1=3,t2=4,t3=t3 c1=3i64,c3=\"passit hello,c1=2,c2=false,c4=4f64 1626006833639000000";
|
||||||
(void)memcpy(sql, tmp, strlen(tmp) + 1);
|
(void)memcpy(sql, tmp, strlen(tmp) + 1);
|
||||||
|
|
|
@ -472,7 +472,7 @@ int32_t smlInitHandle(SQuery** query) {
|
||||||
|
|
||||||
int32_t code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
|
int32_t code = nodesMakeNode(QUERY_NODE_QUERY, (SNode**)&pQuery);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
uError("create pQuery error");
|
uError("SML create pQuery error");
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
|
pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE;
|
||||||
|
@ -480,12 +480,12 @@ int32_t smlInitHandle(SQuery** query) {
|
||||||
pQuery->msgType = TDMT_VND_SUBMIT;
|
pQuery->msgType = TDMT_VND_SUBMIT;
|
||||||
code = nodesMakeNode(QUERY_NODE_VNODE_MODIFY_STMT, (SNode**)&stmt);
|
code = nodesMakeNode(QUERY_NODE_VNODE_MODIFY_STMT, (SNode**)&stmt);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
uError("create SVnodeModifyOpStmt error");
|
uError("SML create SVnodeModifyOpStmt error");
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
stmt->pTableBlockHashObj = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
stmt->pTableBlockHashObj = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||||
if (stmt->pTableBlockHashObj == NULL){
|
if (stmt->pTableBlockHashObj == NULL){
|
||||||
uError("create pTableBlockHashObj error");
|
uError("SML create pTableBlockHashObj error");
|
||||||
code = terrno;
|
code = terrno;
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1882,6 +1882,20 @@ int sml_td24559_Test() {
|
||||||
pRes = taos_query(taos, "create database if not exists td24559");
|
pRes = taos_query(taos, "create database if not exists td24559");
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
const char *sql1[] = {
|
||||||
|
"sttb,t1=1 f1=283i32,f2=g\"\" 1632299372000",
|
||||||
|
"sttb,t1=1 f2=G\"Point(4.343 89.342)\",f1=106i32 1632299373000",
|
||||||
|
};
|
||||||
|
|
||||||
|
pRes = taos_query(taos, "use td24559");
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
pRes = taos_schemaless_insert(taos, (char **)sql1, sizeof(sql1) / sizeof(sql1[0]), TSDB_SML_LINE_PROTOCOL,
|
||||||
|
TSDB_SML_TIMESTAMP_MILLI_SECONDS);
|
||||||
|
int code = taos_errno(pRes);
|
||||||
|
printf("%s result0:%s\n", __FUNCTION__, taos_errstr(pRes));
|
||||||
|
ASSERT(code);
|
||||||
|
|
||||||
const char *sql[] = {
|
const char *sql[] = {
|
||||||
"stb,t1=1 f1=283i32,f2=g\"Point(4.343 89.342)\" 1632299372000",
|
"stb,t1=1 f1=283i32,f2=g\"Point(4.343 89.342)\" 1632299372000",
|
||||||
"stb,t1=1 f2=G\"Point(4.343 89.342)\",f1=106i32 1632299373000",
|
"stb,t1=1 f2=G\"Point(4.343 89.342)\",f1=106i32 1632299373000",
|
||||||
|
@ -1895,7 +1909,7 @@ int sml_td24559_Test() {
|
||||||
pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql) / sizeof(sql[0]), TSDB_SML_LINE_PROTOCOL,
|
pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql) / sizeof(sql[0]), TSDB_SML_LINE_PROTOCOL,
|
||||||
TSDB_SML_TIMESTAMP_MILLI_SECONDS);
|
TSDB_SML_TIMESTAMP_MILLI_SECONDS);
|
||||||
|
|
||||||
int code = taos_errno(pRes);
|
code = taos_errno(pRes);
|
||||||
printf("%s result0:%s\n", __FUNCTION__, taos_errstr(pRes));
|
printf("%s result0:%s\n", __FUNCTION__, taos_errstr(pRes));
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
@ -2254,8 +2268,8 @@ int main(int argc, char *argv[]) {
|
||||||
ASSERT(!ret);
|
ASSERT(!ret);
|
||||||
ret = sml_td18789_Test();
|
ret = sml_td18789_Test();
|
||||||
ASSERT(!ret);
|
ASSERT(!ret);
|
||||||
// ret = sml_td24070_Test();
|
ret = sml_td24070_Test();
|
||||||
// ASSERT(!ret);
|
ASSERT(!ret);
|
||||||
ret = sml_td23881_Test();
|
ret = sml_td23881_Test();
|
||||||
ASSERT(ret);
|
ASSERT(ret);
|
||||||
ret = sml_escape_Test();
|
ret = sml_escape_Test();
|
||||||
|
@ -2264,8 +2278,8 @@ int main(int argc, char *argv[]) {
|
||||||
ASSERT(!ret);
|
ASSERT(!ret);
|
||||||
ret = sml_ts3116_Test();
|
ret = sml_ts3116_Test();
|
||||||
ASSERT(!ret);
|
ASSERT(!ret);
|
||||||
// ret = sml_ts2385_Test(); // this test case need config sml table name using ./sml_test config_file
|
ret = sml_ts2385_Test(); // this test case need config sml table name using ./sml_test config_file
|
||||||
// ASSERT(!ret);
|
ASSERT(!ret);
|
||||||
ret = sml_ts3303_Test();
|
ret = sml_ts3303_Test();
|
||||||
ASSERT(!ret);
|
ASSERT(!ret);
|
||||||
ret = sml_ttl_Test();
|
ret = sml_ttl_Test();
|
||||||
|
|
Loading…
Reference in New Issue