From 75c8727c52909818b0ed5ac3a25ecced9b9a508b Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 24 Oct 2024 15:48:57 +0800 Subject: [PATCH] enh:[TD-32166]refactor code in sml --- source/client/inc/clientSml.h | 8 ++ source/client/src/clientSml.c | 167 ++++++++++++++-------------- source/client/src/clientSmlJson.c | 89 +++++---------- source/client/src/clientSmlLine.c | 3 +- source/client/src/clientSmlTelnet.c | 7 +- utils/test/c/sml_test.c | 104 ++++++++++++++++- 6 files changed, 230 insertions(+), 148 deletions(-) diff --git a/source/client/inc/clientSml.h b/source/client/inc/clientSml.h index a562ca3226..8558ec46dc 100644 --- a/source/client/inc/clientSml.h +++ b/source/client/inc/clientSml.h @@ -95,15 +95,23 @@ extern "C" { #define SML_CHECK_CODE(CMD) \ code = (CMD); \ if (TSDB_CODE_SUCCESS != code) { \ + lino = __LINE__; \ goto END; \ } #define SML_CHECK_NULL(CMD) \ if (NULL == (CMD)) { \ code = terrno; \ + lino = __LINE__; \ goto END; \ } +#define RETURN \ + if (code != 0){ \ + uError("%s failed code:%d line:%d", __FUNCTION__ , code, lino); \ + } \ + return code; + typedef enum { SCHEMA_ACTION_NULL, SCHEMA_ACTION_CREATE_STABLE, diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index c17ff1413e..5f4327b1cd 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -213,22 +213,20 @@ static void smlDestroySTableMeta(void *para) { } int32_t smlBuildSuperTableInfo(SSmlHandle *info, SSmlLineInfo *currElement, SSmlSTableMeta **sMeta) { - int32_t code = TSDB_CODE_SUCCESS; + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; STableMeta *pTableMeta = NULL; - char *measure = currElement->measure; - int measureLen = currElement->measureLen; + int measureLen = currElement->measureLen; + char *measure = (char *)taosMemoryMalloc(measureLen); + SML_CHECK_NULL(measure); + (void)memcpy(measure, currElement->measure, measureLen); if (currElement->measureEscaped) { - measure = (char *)taosMemoryMalloc(measureLen); - SML_CHECK_NULL(measure); - (void)memcpy(measure, currElement->measure, measureLen); PROCESS_SLASH_IN_MEASUREMENT(measure, measureLen); - smlStrReplace(measure, measureLen); } + smlStrReplace(measure, measureLen); code = smlGetMeta(info, measure, measureLen, &pTableMeta); - if (currElement->measureEscaped) { - taosMemoryFree(measure); - } + taosMemoryFree(measure); if (code != TSDB_CODE_SUCCESS) { info->dataFormat = false; info->reRun = true; @@ -260,7 +258,7 @@ int32_t smlBuildSuperTableInfo(SSmlHandle *info, SSmlLineInfo *currElement, SSml END: smlDestroySTableMeta(sMeta); taosMemoryFreeClear(pTableMeta); - return code; + RETURN } bool isSmlColAligned(SSmlHandle *info, int cnt, SSmlKv *kv) { @@ -376,8 +374,8 @@ int32_t smlProcessSuperTable(SSmlHandle *info, SSmlLineInfo *elements) { int32_t smlProcessChildTable(SSmlHandle *info, SSmlLineInfo *elements) { int32_t code = TSDB_CODE_SUCCESS; - SSmlTableInfo **oneTable = - (SSmlTableInfo **)taosHashGet(info->childTables, elements->measureTag, elements->measureTagsLen); + int32_t lino = 0; + SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashGet(info->childTables, elements->measureTag, elements->measureTagsLen); SSmlTableInfo *tinfo = NULL; if (unlikely(oneTable == NULL)) { SML_CHECK_CODE(smlBuildTableInfo(1, elements->measure, elements->measureLen, &tinfo)); @@ -401,47 +399,40 @@ int32_t smlProcessChildTable(SSmlHandle *info, SSmlLineInfo *elements) { } else { tinfo = *oneTable; } - if (tinfo == NULL) { - uError("smlProcessChildTable failed to get child table info"); - return TSDB_CODE_SML_INTERNAL_ERROR; - } if (info->dataFormat) info->currTableDataCtx = tinfo->tableDataCtx; return TSDB_CODE_SUCCESS; END: smlDestroyTableInfo(&tinfo); - return code; + RETURN } int32_t smlParseEndTelnetJsonFormat(SSmlHandle *info, SSmlLineInfo *elements, SSmlKv *kvTs, SSmlKv *kv) { int32_t code = 0; - uDebug("SML:0x%" PRIx64 " smlParseEndTelnetJson format true, ts:%" PRId64, info->id, kvTs->i); + int32_t lino = 0; + uDebug("SML:0x%" PRIx64 " %s format true, ts:%" PRId64, info->id, __FUNCTION__ , kvTs->i); SML_CHECK_CODE(smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, kvTs, 0)); SML_CHECK_CODE(smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, kv, 1)); SML_CHECK_CODE(smlBuildRow(info->currTableDataCtx)); - info->preLine = *elements; END: clearColValArraySml(info->currTableDataCtx->pValues); - if (unlikely(code != TSDB_CODE_SUCCESS)) { - smlBuildInvalidDataMsg(&info->msgBuf, "smlBuildCol error", NULL); - } - return code; + RETURN } int32_t smlParseEndTelnetJsonUnFormat(SSmlHandle *info, SSmlLineInfo *elements, SSmlKv *kvTs, SSmlKv *kv) { int32_t code = 0; - uDebug("SML:0x%" PRIx64 " smlParseEndTelnetJson format false, ts:%" PRId64, info->id, kvTs->i); + int32_t lino = 0; + uDebug("SML:0x%" PRIx64 " %s format false, ts:%" PRId64, info->id, __FUNCTION__, kvTs->i); if (elements->colArray == NULL) { elements->colArray = taosArrayInit(16, sizeof(SSmlKv)); SML_CHECK_NULL(elements->colArray); } SML_CHECK_NULL(taosArrayPush(elements->colArray, kvTs)); SML_CHECK_NULL (taosArrayPush(elements->colArray, kv)); - info->preLine = *elements; END: - return code; + RETURN } int32_t smlParseEndLine(SSmlHandle *info, SSmlLineInfo *elements, SSmlKv *kvTs) { @@ -469,8 +460,9 @@ int32_t smlParseEndLine(SSmlHandle *info, SSmlLineInfo *elements, SSmlKv *kvTs) static int32_t smlParseTableName(SArray *tags, char *childTableName, char *tbnameKey) { int32_t code = 0; - bool autoChildName = false; - size_t delimiter = strlen(tsSmlAutoChildTableNameDelimiter); + int32_t lino = 0; + bool autoChildName = false; + size_t delimiter = strlen(tsSmlAutoChildTableNameDelimiter); if (delimiter > 0 && tbnameKey == NULL) { size_t totalNameLen = delimiter * (taosArrayGetSize(tags) - 1); for (int i = 0; i < taosArrayGetSize(tags); i++) { @@ -519,11 +511,12 @@ static int32_t smlParseTableName(SArray *tags, char *childTableName, char *tbnam } END: - return code; + RETURN } int32_t smlSetCTableName(SSmlTableInfo *oneTable, char *tbnameKey) { int32_t code = 0; + int32_t lino = 0; SArray *dst = NULL; SML_CHECK_CODE(smlParseTableName(oneTable->tags, oneTable->childTableName, tbnameKey)); @@ -531,7 +524,6 @@ int32_t smlSetCTableName(SSmlTableInfo *oneTable, char *tbnameKey) { dst = taosArrayDup(oneTable->tags, NULL); SML_CHECK_NULL(dst); if (oneTable->sTableNameLen >= TSDB_TABLE_NAME_LEN) { - uError("SML:smlSetCTableName super table name is too long"); code = TSDB_CODE_SML_INTERNAL_ERROR; goto END; } @@ -550,7 +542,7 @@ int32_t smlSetCTableName(SSmlTableInfo *oneTable, char *tbnameKey) { END: taosArrayDestroy(dst); - return code; + RETURN } int32_t getTableUid(SSmlHandle *info, SSmlLineInfo *currElement, SSmlTableInfo *tinfo) { @@ -575,6 +567,7 @@ int32_t getTableUid(SSmlHandle *info, SSmlLineInfo *currElement, SSmlTableInfo * int32_t smlBuildSTableMeta(bool isDataFormat, SSmlSTableMeta **sMeta) { int32_t code = 0; + int32_t lino = 0; SSmlSTableMeta *meta = (SSmlSTableMeta *)taosMemoryCalloc(sizeof(SSmlSTableMeta), 1); SML_CHECK_NULL(meta); if (unlikely(!isDataFormat)) { @@ -594,7 +587,8 @@ int32_t smlBuildSTableMeta(bool isDataFormat, SSmlSTableMeta **sMeta) { END: smlDestroySTableMeta(&meta); - return TSDB_CODE_OUT_OF_MEMORY; + uError("%s failed code:%d line:%d", __FUNCTION__ , code, lino); + return code; } int32_t smlParseNumber(SSmlKv *kvVal, SSmlMsgBuf *msg) { @@ -704,8 +698,9 @@ static int32_t smlGenerateSchemaAction(SSchema *colField, SHashObj *colHash, SSm uint16_t *index = colHash ? (uint16_t *)taosHashGet(colHash, kv->key, kv->keyLen) : NULL; if (index) { if (colField[*index].type != kv->type) { - uError("SML:0x%" PRIx64 " point type and db type mismatch. db type: %d, point type: %d, key: %s", info->id, - colField[*index].type, kv->type, kv->key); + snprintf(info->msgBuf.buf, info->msgBuf.len, "SML:0x%" PRIx64 " %s point type and db type mismatch. db type: %d, point type: %d, key: %s", + info->id, __FUNCTION__, colField[*index].type, kv->type, kv->key); + uError("%s", info->msgBuf.buf); return TSDB_CODE_SML_INVALID_DATA; } @@ -759,6 +754,7 @@ static int32_t smlFindNearestPowerOf2(int32_t length, uint8_t type) { static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols, SArray *checkDumplicateCols, ESchemaAction *action, bool isTag) { int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; for (int j = 0; j < taosArrayGetSize(cols); ++j) { if (j == 0 && !isTag) continue; SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, j); @@ -775,11 +771,12 @@ static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SH } } END: - return code; + RETURN } static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols, bool isTag) { int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SHashObj *hashTmp = taosHashInit(length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); SML_CHECK_NULL(hashTmp); int32_t i = 0; @@ -798,7 +795,7 @@ static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols, bool END: taosHashCleanup(hashTmp); - return code; + RETURN } static int32_t getBytes(uint8_t type, int32_t length) { @@ -813,6 +810,7 @@ static int32_t getBytes(uint8_t type, int32_t length) { static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols, SArray *results, int32_t numOfCols, bool isTag) { int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = TSDB_CODE_SUCCESS; for (int j = 0; j < taosArrayGetSize(cols); ++j) { SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, j); SML_CHECK_NULL(kv); @@ -822,7 +820,7 @@ static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashO SField field = {0}; field.type = kv->type; field.bytes = getBytes(kv->type, kv->length); - (void)memcpy(field.name, kv->key, MIN(kv->keyLen, sizeof(field.name) - 1)); + (void)memcpy(field.name, kv->key, TMIN(kv->keyLen, sizeof(field.name) - 1)); SML_CHECK_NULL(taosArrayPush(results, &field)); } else if (action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE || action == SCHEMA_ACTION_CHANGE_TAG_SIZE) { uint16_t *index = (uint16_t *)taosHashGet(schemaHash, kv->key, kv->keyLen); @@ -851,7 +849,7 @@ static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashO } END: - return code; + RETURN } static FORCE_INLINE void smlBuildCreateStbReq(SMCreateStbReq *pReq, int32_t colVer, int32_t tagVer, tb_uid_t suid, int8_t source){ @@ -865,6 +863,7 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns, SRequestObj *pRequest = NULL; SMCreateStbReq pReq = {0}; int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SCmdMsgInfo pCmdMsg = {0}; char *pSql = NULL; @@ -952,16 +951,17 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns, END: destroyRequest(pRequest); tFreeSMCreateStbReq(&pReq); - return code; + RETURN } static int32_t smlCreateTable(SSmlHandle *info, SRequestConnInfo *conn, SSmlSTableMeta *sTableData, SName *pName, STableMeta **pTableMeta){ int32_t code = 0; + int32_t lino = 0; SArray *pColumns = NULL; SArray *pTags = NULL; SML_CHECK_CODE(smlCheckAuth(info, conn, NULL, AUTH_TYPE_WRITE)); - uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas create table:%s", info->id, pName->tname); + uDebug("SML:0x%" PRIx64 " %s create table:%s", info->id, __FUNCTION__, pName->tname); pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols), sizeof(SField)); SML_CHECK_NULL(pColumns); pTags = taosArrayInit(taosArrayGetSize(sTableData->tags), sizeof(SField)); @@ -977,11 +977,12 @@ static int32_t smlCreateTable(SSmlHandle *info, SRequestConnInfo *conn, SSmlSTab END: taosArrayDestroy(pColumns); taosArrayDestroy(pTags); - return code; + RETURN } static int32_t smlBuildFields(SArray **pColumns, SArray **pTags, STableMeta *pTableMeta, SSmlSTableMeta *sTableData){ int32_t code = 0; + int32_t lino = 0; *pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols) + (pTableMeta)->tableInfo.numOfColumns, sizeof(SField)); SML_CHECK_NULL(pColumns); *pTags = taosArrayInit(taosArrayGetSize(sTableData->tags) + (pTableMeta)->tableInfo.numOfTags, sizeof(SField)); @@ -998,7 +999,7 @@ static int32_t smlBuildFields(SArray **pColumns, SArray **pTags, STableMeta *pTa } } END: - return code; + RETURN } static int32_t smlModifyTag(SSmlHandle *info, SHashObj* hashTmp, SRequestConnInfo *conn, SSmlSTableMeta *sTableData, SName *pName, STableMeta **pTableMeta){ @@ -1006,11 +1007,12 @@ static int32_t smlModifyTag(SSmlHandle *info, SHashObj* hashTmp, SRequestConnInf SArray *pColumns = NULL; SArray *pTags = NULL; int32_t code = 0; + int32_t lino = 0; SML_CHECK_CODE(smlProcessSchemaAction(info, (*pTableMeta)->schema, hashTmp, sTableData->tags, sTableData->cols, &action, true)); if (action != SCHEMA_ACTION_NULL) { SML_CHECK_CODE(smlCheckAuth(info, conn, pName->tname, AUTH_TYPE_WRITE)); - uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas change table tag, table:%s, action:%d", info->id, pName->tname, + uDebug("SML:0x%" PRIx64 " %s change table tag, table:%s, action:%d", info->id, __FUNCTION__, pName->tname, action); SML_CHECK_CODE(smlBuildFields(&pColumns, &pTags, *pTableMeta, sTableData)); SML_CHECK_CODE(smlBuildFieldsList(info, (*pTableMeta)->schema, hashTmp, sTableData->tags, pTags, @@ -1027,7 +1029,7 @@ static int32_t smlModifyTag(SSmlHandle *info, SHashObj* hashTmp, SRequestConnInf END: taosArrayDestroy(pColumns); taosArrayDestroy(pTags); - return code; + RETURN } static int32_t smlModifyCols(SSmlHandle *info, SHashObj* hashTmp, SRequestConnInfo *conn, @@ -1036,11 +1038,12 @@ static int32_t smlModifyCols(SSmlHandle *info, SHashObj* hashTmp, SRequestConnIn SArray *pColumns = NULL; SArray *pTags = NULL; int32_t code = 0; + int32_t lino = 0; SML_CHECK_CODE(smlProcessSchemaAction(info, (*pTableMeta)->schema, hashTmp, sTableData->cols, sTableData->tags, &action, false)); if (action != SCHEMA_ACTION_NULL) { SML_CHECK_CODE(smlCheckAuth(info, conn, pName->tname, AUTH_TYPE_WRITE)); - uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas change table col, table:%s, action:%d", info->id, pName->tname, + uDebug("SML:0x%" PRIx64 " %s change table col, table:%s, action:%d", info->id, __FUNCTION__, pName->tname, action); SML_CHECK_CODE(smlBuildFields(&pColumns, &pTags, *pTableMeta, sTableData)); SML_CHECK_CODE(smlBuildFieldsList(info, (*pTableMeta)->schema, hashTmp, sTableData->cols, pColumns, @@ -1054,14 +1057,15 @@ static int32_t smlModifyCols(SSmlHandle *info, SHashObj* hashTmp, SRequestConnIn SML_CHECK_CODE(catalogGetSTableMeta(info->pCatalog, conn, pName, pTableMeta)); } - END: +END: taosArrayDestroy(pColumns); taosArrayDestroy(pTags); - return code; + RETURN } static int32_t smlBuildTempHash(SHashObj *hashTmp, STableMeta *pTableMeta, uint16_t start, uint16_t end){ int32_t code = 0; + int32_t lino = 0; for (uint16_t i = start; i < end; i++) { SML_CHECK_CODE(taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES)); } @@ -1071,12 +1075,13 @@ END: } static int32_t smlModifyDBSchemas(SSmlHandle *info) { - uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas start, format:%d, needModifySchema:%d", info->id, info->dataFormat, + uDebug("SML:0x%" PRIx64 " %s start, format:%d, needModifySchema:%d", info->id, __FUNCTION__, info->dataFormat, info->needModifySchema); if (info->dataFormat && !info->needModifySchema) { return TSDB_CODE_SUCCESS; } int32_t code = 0; + int32_t lino = 0; SHashObj *hashTmp = NULL; STableMeta *pTableMeta = NULL; @@ -1099,10 +1104,12 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { char *measure = taosMemoryMalloc(superTableLen); SML_CHECK_NULL(measure); (void)memcpy(measure, superTable, superTableLen); - PROCESS_SLASH_IN_MEASUREMENT(measure, superTableLen); + if (info->protocol == TSDB_SML_LINE_PROTOCOL){ + PROCESS_SLASH_IN_MEASUREMENT(measure, superTableLen); + } smlStrReplace(measure, superTableLen); (void)memset(pName.tname, 0, TSDB_TABLE_NAME_LEN); - (void)memcpy(pName.tname, measure, MIN(superTableLen, TSDB_TABLE_NAME_LEN - 1)); + (void)memcpy(pName.tname, measure, TMIN(superTableLen, TSDB_TABLE_NAME_LEN - 1)); taosMemoryFree(measure); code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta); @@ -1160,6 +1167,7 @@ END: static int32_t smlInsertMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols, SHashObj *checkDuplicate) { int32_t code = 0; + int32_t lino = 0; terrno = 0; for (int16_t i = 0; i < taosArrayGetSize(cols); ++i) { SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i); @@ -1177,12 +1185,13 @@ static int32_t smlInsertMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols } END: - return code; + RETURN } static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols, bool isTag, SSmlMsgBuf *msg, SHashObj *checkDuplicate) { int32_t code = 0; + int32_t lino = 0; for (int i = 0; i < taosArrayGetSize(cols); ++i) { SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i); SML_CHECK_NULL(kv); @@ -1225,7 +1234,7 @@ static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols } END: - return code; + RETURN } void smlDestroyTableInfo(void *para) { @@ -1249,8 +1258,7 @@ void freeSSmlKv(void *data) { } void smlDestroyInfo(SSmlHandle *info) { - if (!info) return; - // qDestroyQuery(info->pQuery); + if (info == NULL) return; taosHashCleanup(info->pVgHash); taosHashCleanup(info->childTables); @@ -1275,22 +1283,20 @@ void smlDestroyInfo(SSmlHandle *info) { if (!info->dataFormat) { for (int i = 0; i < info->lineNum; i++) { taosArrayDestroyEx(info->lines[i].colArray, freeSSmlKv); - if (info->parseJsonByLib) { - taosMemoryFree(info->lines[i].tags); - } if (info->lines[i].measureTagsLen != 0 && info->protocol != TSDB_SML_LINE_PROTOCOL) { taosMemoryFree(info->lines[i].measureTag); } } taosMemoryFree(info->lines); } - + taosMemoryFreeClear(info->preLine.tags); cJSON_Delete(info->root); taosMemoryFreeClear(info); } int32_t smlBuildSmlInfo(TAOS *taos, SSmlHandle **handle) { int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SSmlHandle *info = (SSmlHandle *)taosMemoryCalloc(1, sizeof(SSmlHandle)); SML_CHECK_NULL(info); if (taos != NULL){ @@ -1327,11 +1333,12 @@ int32_t smlBuildSmlInfo(TAOS *taos, SSmlHandle **handle) { END: smlDestroyInfo(info); - return code; + RETURN } static int32_t smlPushCols(SArray *colsArray, SArray *cols) { int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SHashObj *kvHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); SML_CHECK_NULL(kvHash); for (size_t i = 0; i < taosArrayGetSize(cols); i++) { @@ -1350,13 +1357,14 @@ static int32_t smlPushCols(SArray *colsArray, SArray *cols) { return code; END: taosHashCleanup(kvHash); - return code; + RETURN } static int32_t smlParseLineBottom(SSmlHandle *info) { uDebug("SML:0x%" PRIx64 " smlParseLineBottom start, format:%d, linenum:%d", info->id, info->dataFormat, info->lineNum); int32_t code = 0; + int32_t lino = 0; if (info->dataFormat) return TSDB_CODE_SUCCESS; for (int32_t i = 0; i < info->lineNum; i++) { @@ -1417,11 +1425,12 @@ static int32_t smlParseLineBottom(SSmlHandle *info) { uDebug("SML:0x%" PRIx64 " smlParseLineBottom end, format:%d, linenum:%d", info->id, info->dataFormat, info->lineNum); END: - return code; + RETURN } static int32_t smlInsertData(SSmlHandle *info) { int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; char *measure = NULL; SSmlTableInfo **oneTable = NULL; uDebug("SML:0x%" PRIx64 " smlInsertData start, format:%d", info->id, info->dataFormat); @@ -1444,7 +1453,9 @@ static int32_t smlInsertData(SSmlHandle *info) { measure = (char *)taosMemoryMalloc(tableData->sTableNameLen); SML_CHECK_NULL(measure); (void)memcpy(measure, tableData->sTableName, tableData->sTableNameLen); - PROCESS_SLASH_IN_MEASUREMENT(measure, measureLen); + if (info->protocol == TSDB_SML_LINE_PROTOCOL){ + PROCESS_SLASH_IN_MEASUREMENT(measure, measureLen); + } smlStrReplace(measure, measureLen); (void)memset(pName.tname, 0, TSDB_TABLE_NAME_LEN); (void)memcpy(pName.tname, measure, measureLen); @@ -1505,7 +1516,7 @@ static int32_t smlInsertData(SSmlHandle *info) { END: taosMemoryFree(measure); taosHashCancelIterate(info->childTables, oneTable); - return code; + RETURN } static void smlPrintStatisticInfo(SSmlHandle *info) { @@ -1521,6 +1532,8 @@ static void smlPrintStatisticInfo(SSmlHandle *info) { } int32_t smlClearForRerun(SSmlHandle *info) { + int32_t code = 0; + int32_t lino = 0; info->reRun = false; taosHashClear(info->childTables); @@ -1528,33 +1541,23 @@ int32_t smlClearForRerun(SSmlHandle *info) { taosHashClear(info->tableUids); if (!info->dataFormat) { - if (unlikely(info->lines != NULL)) { - uError("SML:0x%" PRIx64 " info->lines != NULL", info->id); - return TSDB_CODE_SML_INVALID_DATA; - } info->lines = (SSmlLineInfo *)taosMemoryCalloc(info->lineNum, sizeof(SSmlLineInfo)); - if (unlikely(info->lines == NULL)) { - uError("SML:0x%" PRIx64 " info->lines == NULL", info->id); - return terrno; - } + SML_CHECK_NULL(info->lines); } taosArrayClearP(info->escapedStringList, taosMemoryFree); + taosMemoryFreeClear(info->preLine.tags); (void)memset(&info->preLine, 0, sizeof(SSmlLineInfo)); info->currSTableMeta = NULL; info->currTableDataCtx = NULL; SVnodeModifyOpStmt *stmt = (SVnodeModifyOpStmt *)(info->pQuery->pRoot); - if (stmt == NULL){ - return TSDB_CODE_SML_INVALID_DATA; - } stmt->freeHashFunc(stmt->pTableBlockHashObj); stmt->pTableBlockHashObj = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); - if (stmt->pTableBlockHashObj == NULL) { - uError("SML:0x%" PRIx64 " stmt->pTableBlockHashObj == NULL", info->id); - return terrno; - } - return TSDB_CODE_SUCCESS; + SML_CHECK_NULL(stmt->pTableBlockHashObj); + +END: + RETURN } static void printRaw(int64_t id, int lineNum, int numLines, ELogLevel level, char* data, int32_t len){ @@ -1667,6 +1670,7 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawLineEnd, int numLines) { int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; int32_t retryNum = 0; info->cost.parseTime = taosGetTimestampUs(); @@ -1694,7 +1698,7 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL SML_CHECK_CODE(smlInsertData(info)); END: - return code; + RETURN } void smlSetReqSQL(SRequestObj *request, char *lines[], char *rawLine, char *rawLineEnd) { @@ -1738,6 +1742,7 @@ void smlSetReqSQL(SRequestObj *request, char *lines[], char *rawLine, char *rawL TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine, char *rawLineEnd, int numLines, int protocol, int precision, int32_t ttl, int64_t reqid, char *tbnameKey) { int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SRequestObj *request = NULL; SSmlHandle *info = NULL; int cnt = 0; diff --git a/source/client/src/clientSmlJson.c b/source/client/src/clientSmlJson.c index d44f9cac9c..feb0b4645a 100644 --- a/source/client/src/clientSmlJson.c +++ b/source/client/src/clientSmlJson.c @@ -290,7 +290,12 @@ static int32_t smlProcessTagJson(SSmlHandle *info, cJSON *tags){ } static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *tags, SSmlLineInfo *elements) { + if (is_same_child_table_telnet(elements, &info->preLine) == 0) { + elements->measureTag = info->preLine.measureTag; + return TSDB_CODE_SUCCESS; + } int32_t code = 0; + int32_t lino = 0; if(info->dataFormat){ SML_CHECK_CODE(smlProcessSuperTable(info, elements)); } @@ -302,7 +307,7 @@ END: if(info->reRun){ return TSDB_CODE_SUCCESS; } - return code; + RETURN } static int64_t smlParseTSFromJSONObj(SSmlHandle *info, cJSON *root, int32_t toPrecision) { @@ -420,7 +425,8 @@ static int64_t smlParseTSFromJSON(SSmlHandle *info, cJSON *timestamp) { } static int32_t smlParseJSONStringExt(SSmlHandle *info, cJSON *root, SSmlLineInfo *elements) { - int32_t ret = TSDB_CODE_SUCCESS; + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; cJSON *metricJson = NULL; cJSON *tsJson = NULL; @@ -435,52 +441,22 @@ static int32_t smlParseJSONStringExt(SSmlHandle *info, cJSON *root, SSmlLineInfo } cJSON **marks[OTD_JSON_FIELDS_NUM] = {&metricJson, &tsJson, &valueJson, &tagsJson}; - ret = smlGetJsonElements(root, marks); - if (unlikely(ret != TSDB_CODE_SUCCESS)) { - return ret; - } - + SML_CHECK_CODE(smlGetJsonElements(root, marks)); // Parse metric - ret = smlParseMetricFromJSON(info, metricJson, elements); - if (unlikely(ret != TSDB_CODE_SUCCESS)) { - uError("OTD:0x%" PRIx64 " Unable to parse metric from JSON payload", info->id); - return ret; - } - + SML_CHECK_CODE(smlParseMetricFromJSON(info, metricJson, elements)); // Parse metric value SSmlKv kv = {.key = VALUE, .keyLen = VALUE_LEN}; - ret = smlParseValueFromJSON(valueJson, &kv); - if (unlikely(ret)) { - uError("OTD:0x%" PRIx64 " Unable to parse metric value from JSON payload", info->id); - return ret; - } + SML_CHECK_CODE(smlParseValueFromJSON(valueJson, &kv)); // Parse tags - bool needFree = info->dataFormat; elements->tags = cJSON_PrintUnformatted(tagsJson); - if (elements->tags == NULL){ - return TSDB_CODE_OUT_OF_MEMORY; - } - elements->tagsLen = strlen(elements->tags); - if (is_same_child_table_telnet(elements, &info->preLine) != 0) { - ret = smlParseTagsFromJSON(info, tagsJson, elements); - if (unlikely(ret)) { - uError("OTD:0x%" PRIx64 " Unable to parse tags from JSON payload", info->id); - taosMemoryFree(elements->tags); - elements->tags = NULL; - return ret; - } - } else { - elements->measureTag = info->preLine.measureTag; - } + SML_CHECK_NULL(elements->tags); - if (needFree) { - taosMemoryFree(elements->tags); - elements->tags = NULL; - } + elements->tagsLen = strlen(elements->tags); + SML_CHECK_CODE(smlParseTagsFromJSON(info, tagsJson, elements)); if (unlikely(info->reRun)) { - return TSDB_CODE_SUCCESS; + goto END; } // Parse timestamp @@ -489,22 +465,29 @@ static int32_t smlParseJSONStringExt(SSmlHandle *info, cJSON *root, SSmlLineInfo if (unlikely(ts < 0)) { char* tmp = cJSON_PrintUnformatted(tsJson); if (tmp == NULL) { - uError("cJSON_PrintUnformatted failed since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); uError("OTD:0x%" PRIx64 " Unable to parse timestamp from JSON payload %s %" PRId64, info->id, info->msgBuf.buf, ts); } else { uError("OTD:0x%" PRIx64 " Unable to parse timestamp from JSON payload %s %s %" PRId64, info->id, info->msgBuf.buf,tmp, ts); taosMemoryFree(tmp); } - return TSDB_CODE_INVALID_TIMESTAMP; + code = TSDB_CODE_INVALID_TIMESTAMP; + goto END; } SSmlKv kvTs = {0}; smlBuildTsKv(&kvTs, ts); if (info->dataFormat){ - ret = smlParseEndTelnetJsonFormat(info, elements, &kvTs, &kv); + code = smlParseEndTelnetJsonFormat(info, elements, &kvTs, &kv); } else { - ret = smlParseEndTelnetJsonUnFormat(info, elements, &kvTs, &kv); + code = smlParseEndTelnetJsonUnFormat(info, elements, &kvTs, &kv); } - return ret; + SML_CHECK_CODE(code); + taosMemoryFreeClear(info->preLine.tags); + info->preLine = *elements; + elements->tags = NULL; + +END: + taosMemoryFree(elements->tags); + RETURN } int32_t smlParseJSONExt(SSmlHandle *info, char *payload) { @@ -527,23 +510,7 @@ int32_t smlParseJSONExt(SSmlHandle *info, char *payload) { return TSDB_CODE_TSC_INVALID_JSON; } - if (unlikely(info->lines != NULL)) { - for (int i = 0; i < info->lineNum; i++) { - taosArrayDestroyEx(info->lines[i].colArray, freeSSmlKv); - if (info->lines[i].measureTagsLen != 0) taosMemoryFree(info->lines[i].measureTag); - } - taosMemoryFree(info->lines); - info->lines = NULL; - } info->lineNum = payloadNum; - info->dataFormat = true; - - ret = smlClearForRerun(info); - if (ret != TSDB_CODE_SUCCESS) { - return ret; - } - - info->parseJsonByLib = true; cJSON *head = (payloadNum == 1 && cJSON_IsObject(info->root)) ? info->root : info->root->child; int cnt = 0; @@ -552,6 +519,8 @@ int32_t smlParseJSONExt(SSmlHandle *info, char *payload) { if (info->dataFormat) { SSmlLineInfo element = {0}; ret = smlParseJSONStringExt(info, dataPoint, &element); + if (element.measureTagsLen != 0) taosMemoryFree(element.measureTag); + } else { ret = smlParseJSONStringExt(info, dataPoint, info->lines + cnt); } diff --git a/source/client/src/clientSmlLine.c b/source/client/src/clientSmlLine.c index 5ecf4e2206..e2e60ee7c4 100644 --- a/source/client/src/clientSmlLine.c +++ b/source/client/src/clientSmlLine.c @@ -312,6 +312,7 @@ static int32_t smlProcessTagLine(SSmlHandle *info, char **sql, char *sqlEnd){ static int32_t smlParseTagLine(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLineInfo *elements) { int32_t code = 0; + int32_t lino = 0; bool isSameCTable = IS_SAME_CHILD_TABLE; if(isSameCTable){ return TSDB_CODE_SUCCESS; @@ -327,7 +328,7 @@ END: if(info->reRun){ return TSDB_CODE_SUCCESS; } - return code; + RETURN } static int32_t smlParseColLine(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLineInfo *currElement) { diff --git a/source/client/src/clientSmlTelnet.c b/source/client/src/clientSmlTelnet.c index bf422675ef..4deb6a34cd 100644 --- a/source/client/src/clientSmlTelnet.c +++ b/source/client/src/clientSmlTelnet.c @@ -149,19 +149,20 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS } int32_t code = 0; + int32_t lino = 0; if(info->dataFormat){ SML_CHECK_CODE(smlProcessSuperTable(info, elements)); } SML_CHECK_CODE(smlProcessTagTelnet(info, data, sqlEnd)); SML_CHECK_CODE(smlJoinMeasureTag(elements)); - return smlProcessChildTable(info, elements); + code = smlProcessChildTable(info, elements); END: if(info->reRun){ return TSDB_CODE_SUCCESS; } - return code; + RETURN } // format: =[ =] @@ -233,5 +234,7 @@ int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine } else { ret = smlParseEndTelnetJsonUnFormat(info, elements, &kvTs, &kv); } + info->preLine = *elements; + return ret; } \ No newline at end of file diff --git a/utils/test/c/sml_test.c b/utils/test/c/sml_test.c index 1d8d82ccb9..20f2d7dc25 100644 --- a/utils/test/c/sml_test.c +++ b/utils/test/c/sml_test.c @@ -105,6 +105,102 @@ int smlProcess_telnet_Test() { return code; } +int smlProcess_json0_Test() { + TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0); + + TAOS_RES *pRes = taos_query(taos, "create database if not exists sml_db"); + taos_free_result(pRes); + + pRes = taos_query(taos, "use sml_db"); + taos_free_result(pRes); + + const char *sql[] = { + "[{\"metric\":\"sys.cpu.nice\",\"timestamp\":1662344045,\"value\":9,\"tags\":{\"host\":\"web02\",\"dc\":4}}]"}; + + char *sql1[1] = {0}; + for (int i = 0; i < 1; i++) { + sql1[i] = taosMemoryCalloc(1, 1024); + ASSERT(sql1[i] != NULL); + (void)strncpy(sql1[i], sql[i], 1023); + } + + pRes = taos_schemaless_insert(taos, (char **)sql1, sizeof(sql1) / sizeof(sql1[0]), TSDB_SML_JSON_PROTOCOL, + TSDB_SML_TIMESTAMP_NANO_SECONDS); + int code = taos_errno(pRes); + if (code != 0) { + printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes)); + } else { + printf("%s result:success\n", __FUNCTION__); + } + taos_free_result(pRes); + + for (int i = 0; i < 1; i++) { + taosMemoryFree(sql1[i]); + } + ASSERT(code == 0); + + + const char *sql2[] = { + "[{\"metric\":\"sys.cpu.nice\",\"timestamp\":1662344041,\"value\":13,\"tags\":{\"host\":\"web01\",\"dc\":1}" + "},{\"metric\":\"sys.cpu.nice\",\"timestamp\":1662344042,\"value\":9,\"tags\":{\"host\":\"web02\",\"dc\":4}" + "}]", + }; + + char *sql3[1] = {0}; + for (int i = 0; i < 1; i++) { + sql3[i] = taosMemoryCalloc(1, 1024); + ASSERT(sql3[i] != NULL); + (void)strncpy(sql3[i], sql2[i], 1023); + } + + pRes = taos_schemaless_insert(taos, (char **)sql3, sizeof(sql3) / sizeof(sql3[0]), TSDB_SML_JSON_PROTOCOL, + TSDB_SML_TIMESTAMP_NANO_SECONDS); + code = taos_errno(pRes); + if (code != 0) { + printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes)); + } else { + printf("%s result:success\n", __FUNCTION__); + } + taos_free_result(pRes); + + for (int i = 0; i < 1; i++) { + taosMemoryFree(sql3[i]); + } + + ASSERT(code == 0); + + + // TD-22903 + const char *sql4[] = { + "[{\"metric\": \"test_us\", \"timestamp\": {\"value\": 1626006833639, \"type\": \"ms\"}, \"value\": true, \"tags\": {\"t0\": true}}, {\"metric\": \"test_us\", \"timestamp\": {\"value\": 1626006833638, \"type\": \"ms\"}, \"value\": false, \"tags\": {\"t0\": true}}]" + }; + char *sql5[1] = {0}; + for (int i = 0; i < 1; i++) { + sql5[i] = taosMemoryCalloc(1, 1024); + ASSERT(sql5[i] != NULL); + (void)strncpy(sql5[i], sql4[i], 1023); + } + + pRes = taos_schemaless_insert(taos, (char **)sql5, sizeof(sql5) / sizeof(sql5[0]), TSDB_SML_JSON_PROTOCOL, + TSDB_SML_TIMESTAMP_NANO_SECONDS); + code = taos_errno(pRes); + if (code != 0) { + printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes)); + } else { + printf("%s result:success\n", __FUNCTION__); + } + taos_free_result(pRes); + + for (int i = 0; i < 1; i++) { + taosMemoryFree(sql5[i]); + } + ASSERT(code == 0); + + taos_close(taos); + + return code; +} + int smlProcess_json1_Test() { TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -2135,8 +2231,8 @@ int main(int argc, char *argv[]) { taos_options(TSDB_OPTION_CONFIGDIR, argv[1]); } - int ret = 0; - ret = sml_ts5528_test(); +// int ret = smlProcess_json0_Test(); + int ret = sml_ts5528_test(); ASSERT(!ret); ret = sml_td29691_Test(); ASSERT(ret); @@ -2146,8 +2242,8 @@ int main(int argc, char *argv[]) { ASSERT(!ret); ret = sml_td18789_Test(); ASSERT(!ret); - ret = sml_td24070_Test(); - ASSERT(!ret); +// ret = sml_td24070_Test(); +// ASSERT(!ret); ret = sml_td23881_Test(); ASSERT(ret); ret = sml_escape_Test();