enh:[TD-32166]refactor code in sml

This commit is contained in:
wangmm0220 2024-10-24 15:48:57 +08:00
parent 574998f028
commit 75c8727c52
6 changed files with 230 additions and 148 deletions

View File

@ -95,15 +95,23 @@ extern "C" {
#define SML_CHECK_CODE(CMD) \
code = (CMD); \
if (TSDB_CODE_SUCCESS != code) { \
lino = __LINE__; \
goto END; \
}
#define SML_CHECK_NULL(CMD) \
if (NULL == (CMD)) { \
code = terrno; \
lino = __LINE__; \
goto END; \
}
#define RETURN \
if (code != 0){ \
uError("%s failed code:%d line:%d", __FUNCTION__ , code, lino); \
} \
return code;
typedef enum {
SCHEMA_ACTION_NULL,
SCHEMA_ACTION_CREATE_STABLE,

View File

@ -214,21 +214,19 @@ static void smlDestroySTableMeta(void *para) {
int32_t smlBuildSuperTableInfo(SSmlHandle *info, SSmlLineInfo *currElement, SSmlSTableMeta **sMeta) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
STableMeta *pTableMeta = NULL;
char *measure = currElement->measure;
int measureLen = currElement->measureLen;
if (currElement->measureEscaped) {
measure = (char *)taosMemoryMalloc(measureLen);
char *measure = (char *)taosMemoryMalloc(measureLen);
SML_CHECK_NULL(measure);
(void)memcpy(measure, currElement->measure, measureLen);
PROCESS_SLASH_IN_MEASUREMENT(measure, measureLen);
smlStrReplace(measure, measureLen);
}
code = smlGetMeta(info, measure, measureLen, &pTableMeta);
if (currElement->measureEscaped) {
taosMemoryFree(measure);
PROCESS_SLASH_IN_MEASUREMENT(measure, measureLen);
}
smlStrReplace(measure, measureLen);
code = smlGetMeta(info, measure, measureLen, &pTableMeta);
taosMemoryFree(measure);
if (code != TSDB_CODE_SUCCESS) {
info->dataFormat = false;
info->reRun = true;
@ -260,7 +258,7 @@ int32_t smlBuildSuperTableInfo(SSmlHandle *info, SSmlLineInfo *currElement, SSml
END:
smlDestroySTableMeta(sMeta);
taosMemoryFreeClear(pTableMeta);
return code;
RETURN
}
bool isSmlColAligned(SSmlHandle *info, int cnt, SSmlKv *kv) {
@ -376,8 +374,8 @@ int32_t smlProcessSuperTable(SSmlHandle *info, SSmlLineInfo *elements) {
int32_t smlProcessChildTable(SSmlHandle *info, SSmlLineInfo *elements) {
int32_t code = TSDB_CODE_SUCCESS;
SSmlTableInfo **oneTable =
(SSmlTableInfo **)taosHashGet(info->childTables, elements->measureTag, elements->measureTagsLen);
int32_t lino = 0;
SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashGet(info->childTables, elements->measureTag, elements->measureTagsLen);
SSmlTableInfo *tinfo = NULL;
if (unlikely(oneTable == NULL)) {
SML_CHECK_CODE(smlBuildTableInfo(1, elements->measure, elements->measureLen, &tinfo));
@ -401,47 +399,40 @@ int32_t smlProcessChildTable(SSmlHandle *info, SSmlLineInfo *elements) {
} else {
tinfo = *oneTable;
}
if (tinfo == NULL) {
uError("smlProcessChildTable failed to get child table info");
return TSDB_CODE_SML_INTERNAL_ERROR;
}
if (info->dataFormat) info->currTableDataCtx = tinfo->tableDataCtx;
return TSDB_CODE_SUCCESS;
END:
smlDestroyTableInfo(&tinfo);
return code;
RETURN
}
int32_t smlParseEndTelnetJsonFormat(SSmlHandle *info, SSmlLineInfo *elements, SSmlKv *kvTs, SSmlKv *kv) {
int32_t code = 0;
uDebug("SML:0x%" PRIx64 " smlParseEndTelnetJson format true, ts:%" PRId64, info->id, kvTs->i);
int32_t lino = 0;
uDebug("SML:0x%" PRIx64 " %s format true, ts:%" PRId64, info->id, __FUNCTION__ , kvTs->i);
SML_CHECK_CODE(smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, kvTs, 0));
SML_CHECK_CODE(smlBuildCol(info->currTableDataCtx, info->currSTableMeta->schema, kv, 1));
SML_CHECK_CODE(smlBuildRow(info->currTableDataCtx));
info->preLine = *elements;
END:
clearColValArraySml(info->currTableDataCtx->pValues);
if (unlikely(code != TSDB_CODE_SUCCESS)) {
smlBuildInvalidDataMsg(&info->msgBuf, "smlBuildCol error", NULL);
}
return code;
RETURN
}
int32_t smlParseEndTelnetJsonUnFormat(SSmlHandle *info, SSmlLineInfo *elements, SSmlKv *kvTs, SSmlKv *kv) {
int32_t code = 0;
uDebug("SML:0x%" PRIx64 " smlParseEndTelnetJson format false, ts:%" PRId64, info->id, kvTs->i);
int32_t lino = 0;
uDebug("SML:0x%" PRIx64 " %s format false, ts:%" PRId64, info->id, __FUNCTION__, kvTs->i);
if (elements->colArray == NULL) {
elements->colArray = taosArrayInit(16, sizeof(SSmlKv));
SML_CHECK_NULL(elements->colArray);
}
SML_CHECK_NULL(taosArrayPush(elements->colArray, kvTs));
SML_CHECK_NULL (taosArrayPush(elements->colArray, kv));
info->preLine = *elements;
END:
return code;
RETURN
}
int32_t smlParseEndLine(SSmlHandle *info, SSmlLineInfo *elements, SSmlKv *kvTs) {
@ -469,6 +460,7 @@ int32_t smlParseEndLine(SSmlHandle *info, SSmlLineInfo *elements, SSmlKv *kvTs)
static int32_t smlParseTableName(SArray *tags, char *childTableName, char *tbnameKey) {
int32_t code = 0;
int32_t lino = 0;
bool autoChildName = false;
size_t delimiter = strlen(tsSmlAutoChildTableNameDelimiter);
if (delimiter > 0 && tbnameKey == NULL) {
@ -519,11 +511,12 @@ static int32_t smlParseTableName(SArray *tags, char *childTableName, char *tbnam
}
END:
return code;
RETURN
}
int32_t smlSetCTableName(SSmlTableInfo *oneTable, char *tbnameKey) {
int32_t code = 0;
int32_t lino = 0;
SArray *dst = NULL;
SML_CHECK_CODE(smlParseTableName(oneTable->tags, oneTable->childTableName, tbnameKey));
@ -531,7 +524,6 @@ int32_t smlSetCTableName(SSmlTableInfo *oneTable, char *tbnameKey) {
dst = taosArrayDup(oneTable->tags, NULL);
SML_CHECK_NULL(dst);
if (oneTable->sTableNameLen >= TSDB_TABLE_NAME_LEN) {
uError("SML:smlSetCTableName super table name is too long");
code = TSDB_CODE_SML_INTERNAL_ERROR;
goto END;
}
@ -550,7 +542,7 @@ int32_t smlSetCTableName(SSmlTableInfo *oneTable, char *tbnameKey) {
END:
taosArrayDestroy(dst);
return code;
RETURN
}
int32_t getTableUid(SSmlHandle *info, SSmlLineInfo *currElement, SSmlTableInfo *tinfo) {
@ -575,6 +567,7 @@ int32_t getTableUid(SSmlHandle *info, SSmlLineInfo *currElement, SSmlTableInfo *
int32_t smlBuildSTableMeta(bool isDataFormat, SSmlSTableMeta **sMeta) {
int32_t code = 0;
int32_t lino = 0;
SSmlSTableMeta *meta = (SSmlSTableMeta *)taosMemoryCalloc(sizeof(SSmlSTableMeta), 1);
SML_CHECK_NULL(meta);
if (unlikely(!isDataFormat)) {
@ -594,7 +587,8 @@ int32_t smlBuildSTableMeta(bool isDataFormat, SSmlSTableMeta **sMeta) {
END:
smlDestroySTableMeta(&meta);
return TSDB_CODE_OUT_OF_MEMORY;
uError("%s failed code:%d line:%d", __FUNCTION__ , code, lino);
return code;
}
int32_t smlParseNumber(SSmlKv *kvVal, SSmlMsgBuf *msg) {
@ -704,8 +698,9 @@ static int32_t smlGenerateSchemaAction(SSchema *colField, SHashObj *colHash, SSm
uint16_t *index = colHash ? (uint16_t *)taosHashGet(colHash, kv->key, kv->keyLen) : NULL;
if (index) {
if (colField[*index].type != kv->type) {
uError("SML:0x%" PRIx64 " point type and db type mismatch. db type: %d, point type: %d, key: %s", info->id,
colField[*index].type, kv->type, kv->key);
snprintf(info->msgBuf.buf, info->msgBuf.len, "SML:0x%" PRIx64 " %s point type and db type mismatch. db type: %d, point type: %d, key: %s",
info->id, __FUNCTION__, colField[*index].type, kv->type, kv->key);
uError("%s", info->msgBuf.buf);
return TSDB_CODE_SML_INVALID_DATA;
}
@ -759,6 +754,7 @@ static int32_t smlFindNearestPowerOf2(int32_t length, uint8_t type) {
static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols,
SArray *checkDumplicateCols, ESchemaAction *action, bool isTag) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
for (int j = 0; j < taosArrayGetSize(cols); ++j) {
if (j == 0 && !isTag) continue;
SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, j);
@ -775,11 +771,12 @@ static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SH
}
}
END:
return code;
RETURN
}
static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols, bool isTag) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SHashObj *hashTmp = taosHashInit(length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
SML_CHECK_NULL(hashTmp);
int32_t i = 0;
@ -798,7 +795,7 @@ static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols, bool
END:
taosHashCleanup(hashTmp);
return code;
RETURN
}
static int32_t getBytes(uint8_t type, int32_t length) {
@ -813,6 +810,7 @@ static int32_t getBytes(uint8_t type, int32_t length) {
static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols,
SArray *results, int32_t numOfCols, bool isTag) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = TSDB_CODE_SUCCESS;
for (int j = 0; j < taosArrayGetSize(cols); ++j) {
SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, j);
SML_CHECK_NULL(kv);
@ -822,7 +820,7 @@ static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashO
SField field = {0};
field.type = kv->type;
field.bytes = getBytes(kv->type, kv->length);
(void)memcpy(field.name, kv->key, MIN(kv->keyLen, sizeof(field.name) - 1));
(void)memcpy(field.name, kv->key, TMIN(kv->keyLen, sizeof(field.name) - 1));
SML_CHECK_NULL(taosArrayPush(results, &field));
} else if (action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE || action == SCHEMA_ACTION_CHANGE_TAG_SIZE) {
uint16_t *index = (uint16_t *)taosHashGet(schemaHash, kv->key, kv->keyLen);
@ -851,7 +849,7 @@ static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashO
}
END:
return code;
RETURN
}
static FORCE_INLINE void smlBuildCreateStbReq(SMCreateStbReq *pReq, int32_t colVer, int32_t tagVer, tb_uid_t suid, int8_t source){
@ -865,6 +863,7 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns,
SRequestObj *pRequest = NULL;
SMCreateStbReq pReq = {0};
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SCmdMsgInfo pCmdMsg = {0};
char *pSql = NULL;
@ -952,16 +951,17 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns,
END:
destroyRequest(pRequest);
tFreeSMCreateStbReq(&pReq);
return code;
RETURN
}
static int32_t smlCreateTable(SSmlHandle *info, SRequestConnInfo *conn, SSmlSTableMeta *sTableData,
SName *pName, STableMeta **pTableMeta){
int32_t code = 0;
int32_t lino = 0;
SArray *pColumns = NULL;
SArray *pTags = NULL;
SML_CHECK_CODE(smlCheckAuth(info, conn, NULL, AUTH_TYPE_WRITE));
uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas create table:%s", info->id, pName->tname);
uDebug("SML:0x%" PRIx64 " %s create table:%s", info->id, __FUNCTION__, pName->tname);
pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols), sizeof(SField));
SML_CHECK_NULL(pColumns);
pTags = taosArrayInit(taosArrayGetSize(sTableData->tags), sizeof(SField));
@ -977,11 +977,12 @@ static int32_t smlCreateTable(SSmlHandle *info, SRequestConnInfo *conn, SSmlSTab
END:
taosArrayDestroy(pColumns);
taosArrayDestroy(pTags);
return code;
RETURN
}
static int32_t smlBuildFields(SArray **pColumns, SArray **pTags, STableMeta *pTableMeta, SSmlSTableMeta *sTableData){
int32_t code = 0;
int32_t lino = 0;
*pColumns = taosArrayInit(taosArrayGetSize(sTableData->cols) + (pTableMeta)->tableInfo.numOfColumns, sizeof(SField));
SML_CHECK_NULL(pColumns);
*pTags = taosArrayInit(taosArrayGetSize(sTableData->tags) + (pTableMeta)->tableInfo.numOfTags, sizeof(SField));
@ -998,7 +999,7 @@ static int32_t smlBuildFields(SArray **pColumns, SArray **pTags, STableMeta *pTa
}
}
END:
return code;
RETURN
}
static int32_t smlModifyTag(SSmlHandle *info, SHashObj* hashTmp, SRequestConnInfo *conn,
SSmlSTableMeta *sTableData, SName *pName, STableMeta **pTableMeta){
@ -1006,11 +1007,12 @@ static int32_t smlModifyTag(SSmlHandle *info, SHashObj* hashTmp, SRequestConnInf
SArray *pColumns = NULL;
SArray *pTags = NULL;
int32_t code = 0;
int32_t lino = 0;
SML_CHECK_CODE(smlProcessSchemaAction(info, (*pTableMeta)->schema, hashTmp, sTableData->tags, sTableData->cols, &action, true));
if (action != SCHEMA_ACTION_NULL) {
SML_CHECK_CODE(smlCheckAuth(info, conn, pName->tname, AUTH_TYPE_WRITE));
uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas change table tag, table:%s, action:%d", info->id, pName->tname,
uDebug("SML:0x%" PRIx64 " %s change table tag, table:%s, action:%d", info->id, __FUNCTION__, pName->tname,
action);
SML_CHECK_CODE(smlBuildFields(&pColumns, &pTags, *pTableMeta, sTableData));
SML_CHECK_CODE(smlBuildFieldsList(info, (*pTableMeta)->schema, hashTmp, sTableData->tags, pTags,
@ -1027,7 +1029,7 @@ static int32_t smlModifyTag(SSmlHandle *info, SHashObj* hashTmp, SRequestConnInf
END:
taosArrayDestroy(pColumns);
taosArrayDestroy(pTags);
return code;
RETURN
}
static int32_t smlModifyCols(SSmlHandle *info, SHashObj* hashTmp, SRequestConnInfo *conn,
@ -1036,11 +1038,12 @@ static int32_t smlModifyCols(SSmlHandle *info, SHashObj* hashTmp, SRequestConnIn
SArray *pColumns = NULL;
SArray *pTags = NULL;
int32_t code = 0;
int32_t lino = 0;
SML_CHECK_CODE(smlProcessSchemaAction(info, (*pTableMeta)->schema, hashTmp, sTableData->cols, sTableData->tags, &action, false));
if (action != SCHEMA_ACTION_NULL) {
SML_CHECK_CODE(smlCheckAuth(info, conn, pName->tname, AUTH_TYPE_WRITE));
uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas change table col, table:%s, action:%d", info->id, pName->tname,
uDebug("SML:0x%" PRIx64 " %s change table col, table:%s, action:%d", info->id, __FUNCTION__, pName->tname,
action);
SML_CHECK_CODE(smlBuildFields(&pColumns, &pTags, *pTableMeta, sTableData));
SML_CHECK_CODE(smlBuildFieldsList(info, (*pTableMeta)->schema, hashTmp, sTableData->cols, pColumns,
@ -1057,11 +1060,12 @@ static int32_t smlModifyCols(SSmlHandle *info, SHashObj* hashTmp, SRequestConnIn
END:
taosArrayDestroy(pColumns);
taosArrayDestroy(pTags);
return code;
RETURN
}
static int32_t smlBuildTempHash(SHashObj *hashTmp, STableMeta *pTableMeta, uint16_t start, uint16_t end){
int32_t code = 0;
int32_t lino = 0;
for (uint16_t i = start; i < end; i++) {
SML_CHECK_CODE(taosHashPut(hashTmp, pTableMeta->schema[i].name, strlen(pTableMeta->schema[i].name), &i, SHORT_BYTES));
}
@ -1071,12 +1075,13 @@ END:
}
static int32_t smlModifyDBSchemas(SSmlHandle *info) {
uDebug("SML:0x%" PRIx64 " smlModifyDBSchemas start, format:%d, needModifySchema:%d", info->id, info->dataFormat,
uDebug("SML:0x%" PRIx64 " %s start, format:%d, needModifySchema:%d", info->id, __FUNCTION__, info->dataFormat,
info->needModifySchema);
if (info->dataFormat && !info->needModifySchema) {
return TSDB_CODE_SUCCESS;
}
int32_t code = 0;
int32_t lino = 0;
SHashObj *hashTmp = NULL;
STableMeta *pTableMeta = NULL;
@ -1099,10 +1104,12 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
char *measure = taosMemoryMalloc(superTableLen);
SML_CHECK_NULL(measure);
(void)memcpy(measure, superTable, superTableLen);
if (info->protocol == TSDB_SML_LINE_PROTOCOL){
PROCESS_SLASH_IN_MEASUREMENT(measure, superTableLen);
}
smlStrReplace(measure, superTableLen);
(void)memset(pName.tname, 0, TSDB_TABLE_NAME_LEN);
(void)memcpy(pName.tname, measure, MIN(superTableLen, TSDB_TABLE_NAME_LEN - 1));
(void)memcpy(pName.tname, measure, TMIN(superTableLen, TSDB_TABLE_NAME_LEN - 1));
taosMemoryFree(measure);
code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
@ -1160,6 +1167,7 @@ END:
static int32_t smlInsertMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols, SHashObj *checkDuplicate) {
int32_t code = 0;
int32_t lino = 0;
terrno = 0;
for (int16_t i = 0; i < taosArrayGetSize(cols); ++i) {
SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);
@ -1177,12 +1185,13 @@ static int32_t smlInsertMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols
}
END:
return code;
RETURN
}
static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols, bool isTag, SSmlMsgBuf *msg,
SHashObj *checkDuplicate) {
int32_t code = 0;
int32_t lino = 0;
for (int i = 0; i < taosArrayGetSize(cols); ++i) {
SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);
SML_CHECK_NULL(kv);
@ -1225,7 +1234,7 @@ static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols
}
END:
return code;
RETURN
}
void smlDestroyTableInfo(void *para) {
@ -1249,8 +1258,7 @@ void freeSSmlKv(void *data) {
}
void smlDestroyInfo(SSmlHandle *info) {
if (!info) return;
// qDestroyQuery(info->pQuery);
if (info == NULL) return;
taosHashCleanup(info->pVgHash);
taosHashCleanup(info->childTables);
@ -1275,22 +1283,20 @@ void smlDestroyInfo(SSmlHandle *info) {
if (!info->dataFormat) {
for (int i = 0; i < info->lineNum; i++) {
taosArrayDestroyEx(info->lines[i].colArray, freeSSmlKv);
if (info->parseJsonByLib) {
taosMemoryFree(info->lines[i].tags);
}
if (info->lines[i].measureTagsLen != 0 && info->protocol != TSDB_SML_LINE_PROTOCOL) {
taosMemoryFree(info->lines[i].measureTag);
}
}
taosMemoryFree(info->lines);
}
taosMemoryFreeClear(info->preLine.tags);
cJSON_Delete(info->root);
taosMemoryFreeClear(info);
}
int32_t smlBuildSmlInfo(TAOS *taos, SSmlHandle **handle) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SSmlHandle *info = (SSmlHandle *)taosMemoryCalloc(1, sizeof(SSmlHandle));
SML_CHECK_NULL(info);
if (taos != NULL){
@ -1327,11 +1333,12 @@ int32_t smlBuildSmlInfo(TAOS *taos, SSmlHandle **handle) {
END:
smlDestroyInfo(info);
return code;
RETURN
}
static int32_t smlPushCols(SArray *colsArray, SArray *cols) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SHashObj *kvHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
SML_CHECK_NULL(kvHash);
for (size_t i = 0; i < taosArrayGetSize(cols); i++) {
@ -1350,13 +1357,14 @@ static int32_t smlPushCols(SArray *colsArray, SArray *cols) {
return code;
END:
taosHashCleanup(kvHash);
return code;
RETURN
}
static int32_t smlParseLineBottom(SSmlHandle *info) {
uDebug("SML:0x%" PRIx64 " smlParseLineBottom start, format:%d, linenum:%d", info->id, info->dataFormat,
info->lineNum);
int32_t code = 0;
int32_t lino = 0;
if (info->dataFormat) return TSDB_CODE_SUCCESS;
for (int32_t i = 0; i < info->lineNum; i++) {
@ -1417,11 +1425,12 @@ static int32_t smlParseLineBottom(SSmlHandle *info) {
uDebug("SML:0x%" PRIx64 " smlParseLineBottom end, format:%d, linenum:%d", info->id, info->dataFormat, info->lineNum);
END:
return code;
RETURN
}
static int32_t smlInsertData(SSmlHandle *info) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
char *measure = NULL;
SSmlTableInfo **oneTable = NULL;
uDebug("SML:0x%" PRIx64 " smlInsertData start, format:%d", info->id, info->dataFormat);
@ -1444,7 +1453,9 @@ static int32_t smlInsertData(SSmlHandle *info) {
measure = (char *)taosMemoryMalloc(tableData->sTableNameLen);
SML_CHECK_NULL(measure);
(void)memcpy(measure, tableData->sTableName, tableData->sTableNameLen);
if (info->protocol == TSDB_SML_LINE_PROTOCOL){
PROCESS_SLASH_IN_MEASUREMENT(measure, measureLen);
}
smlStrReplace(measure, measureLen);
(void)memset(pName.tname, 0, TSDB_TABLE_NAME_LEN);
(void)memcpy(pName.tname, measure, measureLen);
@ -1505,7 +1516,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
END:
taosMemoryFree(measure);
taosHashCancelIterate(info->childTables, oneTable);
return code;
RETURN
}
static void smlPrintStatisticInfo(SSmlHandle *info) {
@ -1521,6 +1532,8 @@ static void smlPrintStatisticInfo(SSmlHandle *info) {
}
int32_t smlClearForRerun(SSmlHandle *info) {
int32_t code = 0;
int32_t lino = 0;
info->reRun = false;
taosHashClear(info->childTables);
@ -1528,33 +1541,23 @@ int32_t smlClearForRerun(SSmlHandle *info) {
taosHashClear(info->tableUids);
if (!info->dataFormat) {
if (unlikely(info->lines != NULL)) {
uError("SML:0x%" PRIx64 " info->lines != NULL", info->id);
return TSDB_CODE_SML_INVALID_DATA;
}
info->lines = (SSmlLineInfo *)taosMemoryCalloc(info->lineNum, sizeof(SSmlLineInfo));
if (unlikely(info->lines == NULL)) {
uError("SML:0x%" PRIx64 " info->lines == NULL", info->id);
return terrno;
}
SML_CHECK_NULL(info->lines);
}
taosArrayClearP(info->escapedStringList, taosMemoryFree);
taosMemoryFreeClear(info->preLine.tags);
(void)memset(&info->preLine, 0, sizeof(SSmlLineInfo));
info->currSTableMeta = NULL;
info->currTableDataCtx = NULL;
SVnodeModifyOpStmt *stmt = (SVnodeModifyOpStmt *)(info->pQuery->pRoot);
if (stmt == NULL){
return TSDB_CODE_SML_INVALID_DATA;
}
stmt->freeHashFunc(stmt->pTableBlockHashObj);
stmt->pTableBlockHashObj = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
if (stmt->pTableBlockHashObj == NULL) {
uError("SML:0x%" PRIx64 " stmt->pTableBlockHashObj == NULL", info->id);
return terrno;
}
return TSDB_CODE_SUCCESS;
SML_CHECK_NULL(stmt->pTableBlockHashObj);
END:
RETURN
}
static void printRaw(int64_t id, int lineNum, int numLines, ELogLevel level, char* data, int32_t len){
@ -1667,6 +1670,7 @@ static int32_t smlParseLine(SSmlHandle *info, char *lines[], char *rawLine, char
static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawLineEnd, int numLines) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
int32_t retryNum = 0;
info->cost.parseTime = taosGetTimestampUs();
@ -1694,7 +1698,7 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL
SML_CHECK_CODE(smlInsertData(info));
END:
return code;
RETURN
}
void smlSetReqSQL(SRequestObj *request, char *lines[], char *rawLine, char *rawLineEnd) {
@ -1738,6 +1742,7 @@ void smlSetReqSQL(SRequestObj *request, char *lines[], char *rawLine, char *rawL
TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine, char *rawLineEnd, int numLines,
int protocol, int precision, int32_t ttl, int64_t reqid, char *tbnameKey) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
SRequestObj *request = NULL;
SSmlHandle *info = NULL;
int cnt = 0;

View File

@ -290,7 +290,12 @@ static int32_t smlProcessTagJson(SSmlHandle *info, cJSON *tags){
}
static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *tags, SSmlLineInfo *elements) {
if (is_same_child_table_telnet(elements, &info->preLine) == 0) {
elements->measureTag = info->preLine.measureTag;
return TSDB_CODE_SUCCESS;
}
int32_t code = 0;
int32_t lino = 0;
if(info->dataFormat){
SML_CHECK_CODE(smlProcessSuperTable(info, elements));
}
@ -302,7 +307,7 @@ END:
if(info->reRun){
return TSDB_CODE_SUCCESS;
}
return code;
RETURN
}
static int64_t smlParseTSFromJSONObj(SSmlHandle *info, cJSON *root, int32_t toPrecision) {
@ -420,7 +425,8 @@ static int64_t smlParseTSFromJSON(SSmlHandle *info, cJSON *timestamp) {
}
static int32_t smlParseJSONStringExt(SSmlHandle *info, cJSON *root, SSmlLineInfo *elements) {
int32_t ret = TSDB_CODE_SUCCESS;
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
cJSON *metricJson = NULL;
cJSON *tsJson = NULL;
@ -435,52 +441,22 @@ static int32_t smlParseJSONStringExt(SSmlHandle *info, cJSON *root, SSmlLineInfo
}
cJSON **marks[OTD_JSON_FIELDS_NUM] = {&metricJson, &tsJson, &valueJson, &tagsJson};
ret = smlGetJsonElements(root, marks);
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
return ret;
}
SML_CHECK_CODE(smlGetJsonElements(root, marks));
// Parse metric
ret = smlParseMetricFromJSON(info, metricJson, elements);
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
uError("OTD:0x%" PRIx64 " Unable to parse metric from JSON payload", info->id);
return ret;
}
SML_CHECK_CODE(smlParseMetricFromJSON(info, metricJson, elements));
// Parse metric value
SSmlKv kv = {.key = VALUE, .keyLen = VALUE_LEN};
ret = smlParseValueFromJSON(valueJson, &kv);
if (unlikely(ret)) {
uError("OTD:0x%" PRIx64 " Unable to parse metric value from JSON payload", info->id);
return ret;
}
SML_CHECK_CODE(smlParseValueFromJSON(valueJson, &kv));
// Parse tags
bool needFree = info->dataFormat;
elements->tags = cJSON_PrintUnformatted(tagsJson);
if (elements->tags == NULL){
return TSDB_CODE_OUT_OF_MEMORY;
}
elements->tagsLen = strlen(elements->tags);
if (is_same_child_table_telnet(elements, &info->preLine) != 0) {
ret = smlParseTagsFromJSON(info, tagsJson, elements);
if (unlikely(ret)) {
uError("OTD:0x%" PRIx64 " Unable to parse tags from JSON payload", info->id);
taosMemoryFree(elements->tags);
elements->tags = NULL;
return ret;
}
} else {
elements->measureTag = info->preLine.measureTag;
}
SML_CHECK_NULL(elements->tags);
if (needFree) {
taosMemoryFree(elements->tags);
elements->tags = NULL;
}
elements->tagsLen = strlen(elements->tags);
SML_CHECK_CODE(smlParseTagsFromJSON(info, tagsJson, elements));
if (unlikely(info->reRun)) {
return TSDB_CODE_SUCCESS;
goto END;
}
// Parse timestamp
@ -489,22 +465,29 @@ static int32_t smlParseJSONStringExt(SSmlHandle *info, cJSON *root, SSmlLineInfo
if (unlikely(ts < 0)) {
char* tmp = cJSON_PrintUnformatted(tsJson);
if (tmp == NULL) {
uError("cJSON_PrintUnformatted failed since %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
uError("OTD:0x%" PRIx64 " Unable to parse timestamp from JSON payload %s %" PRId64, info->id, info->msgBuf.buf, ts);
} else {
uError("OTD:0x%" PRIx64 " Unable to parse timestamp from JSON payload %s %s %" PRId64, info->id, info->msgBuf.buf,tmp, ts);
taosMemoryFree(tmp);
}
return TSDB_CODE_INVALID_TIMESTAMP;
code = TSDB_CODE_INVALID_TIMESTAMP;
goto END;
}
SSmlKv kvTs = {0};
smlBuildTsKv(&kvTs, ts);
if (info->dataFormat){
ret = smlParseEndTelnetJsonFormat(info, elements, &kvTs, &kv);
code = smlParseEndTelnetJsonFormat(info, elements, &kvTs, &kv);
} else {
ret = smlParseEndTelnetJsonUnFormat(info, elements, &kvTs, &kv);
code = smlParseEndTelnetJsonUnFormat(info, elements, &kvTs, &kv);
}
return ret;
SML_CHECK_CODE(code);
taosMemoryFreeClear(info->preLine.tags);
info->preLine = *elements;
elements->tags = NULL;
END:
taosMemoryFree(elements->tags);
RETURN
}
int32_t smlParseJSONExt(SSmlHandle *info, char *payload) {
@ -527,23 +510,7 @@ int32_t smlParseJSONExt(SSmlHandle *info, char *payload) {
return TSDB_CODE_TSC_INVALID_JSON;
}
if (unlikely(info->lines != NULL)) {
for (int i = 0; i < info->lineNum; i++) {
taosArrayDestroyEx(info->lines[i].colArray, freeSSmlKv);
if (info->lines[i].measureTagsLen != 0) taosMemoryFree(info->lines[i].measureTag);
}
taosMemoryFree(info->lines);
info->lines = NULL;
}
info->lineNum = payloadNum;
info->dataFormat = true;
ret = smlClearForRerun(info);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
info->parseJsonByLib = true;
cJSON *head = (payloadNum == 1 && cJSON_IsObject(info->root)) ? info->root : info->root->child;
int cnt = 0;
@ -552,6 +519,8 @@ int32_t smlParseJSONExt(SSmlHandle *info, char *payload) {
if (info->dataFormat) {
SSmlLineInfo element = {0};
ret = smlParseJSONStringExt(info, dataPoint, &element);
if (element.measureTagsLen != 0) taosMemoryFree(element.measureTag);
} else {
ret = smlParseJSONStringExt(info, dataPoint, info->lines + cnt);
}

View File

@ -312,6 +312,7 @@ static int32_t smlProcessTagLine(SSmlHandle *info, char **sql, char *sqlEnd){
static int32_t smlParseTagLine(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLineInfo *elements) {
int32_t code = 0;
int32_t lino = 0;
bool isSameCTable = IS_SAME_CHILD_TABLE;
if(isSameCTable){
return TSDB_CODE_SUCCESS;
@ -327,7 +328,7 @@ END:
if(info->reRun){
return TSDB_CODE_SUCCESS;
}
return code;
RETURN
}
static int32_t smlParseColLine(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLineInfo *currElement) {

View File

@ -149,19 +149,20 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS
}
int32_t code = 0;
int32_t lino = 0;
if(info->dataFormat){
SML_CHECK_CODE(smlProcessSuperTable(info, elements));
}
SML_CHECK_CODE(smlProcessTagTelnet(info, data, sqlEnd));
SML_CHECK_CODE(smlJoinMeasureTag(elements));
return smlProcessChildTable(info, elements);
code = smlProcessChildTable(info, elements);
END:
if(info->reRun){
return TSDB_CODE_SUCCESS;
}
return code;
RETURN
}
// format: <metric> <timestamp> <value> <tagk_1>=<tagv_1>[ <tagk_n>=<tagv_n>]
@ -233,5 +234,7 @@ int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
} else {
ret = smlParseEndTelnetJsonUnFormat(info, elements, &kvTs, &kv);
}
info->preLine = *elements;
return ret;
}

View File

@ -105,6 +105,102 @@ int smlProcess_telnet_Test() {
return code;
}
int smlProcess_json0_Test() {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
TAOS_RES *pRes = taos_query(taos, "create database if not exists sml_db");
taos_free_result(pRes);
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
const char *sql[] = {
"[{\"metric\":\"sys.cpu.nice\",\"timestamp\":1662344045,\"value\":9,\"tags\":{\"host\":\"web02\",\"dc\":4}}]"};
char *sql1[1] = {0};
for (int i = 0; i < 1; i++) {
sql1[i] = taosMemoryCalloc(1, 1024);
ASSERT(sql1[i] != NULL);
(void)strncpy(sql1[i], sql[i], 1023);
}
pRes = taos_schemaless_insert(taos, (char **)sql1, sizeof(sql1) / sizeof(sql1[0]), TSDB_SML_JSON_PROTOCOL,
TSDB_SML_TIMESTAMP_NANO_SECONDS);
int code = taos_errno(pRes);
if (code != 0) {
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
} else {
printf("%s result:success\n", __FUNCTION__);
}
taos_free_result(pRes);
for (int i = 0; i < 1; i++) {
taosMemoryFree(sql1[i]);
}
ASSERT(code == 0);
const char *sql2[] = {
"[{\"metric\":\"sys.cpu.nice\",\"timestamp\":1662344041,\"value\":13,\"tags\":{\"host\":\"web01\",\"dc\":1}"
"},{\"metric\":\"sys.cpu.nice\",\"timestamp\":1662344042,\"value\":9,\"tags\":{\"host\":\"web02\",\"dc\":4}"
"}]",
};
char *sql3[1] = {0};
for (int i = 0; i < 1; i++) {
sql3[i] = taosMemoryCalloc(1, 1024);
ASSERT(sql3[i] != NULL);
(void)strncpy(sql3[i], sql2[i], 1023);
}
pRes = taos_schemaless_insert(taos, (char **)sql3, sizeof(sql3) / sizeof(sql3[0]), TSDB_SML_JSON_PROTOCOL,
TSDB_SML_TIMESTAMP_NANO_SECONDS);
code = taos_errno(pRes);
if (code != 0) {
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
} else {
printf("%s result:success\n", __FUNCTION__);
}
taos_free_result(pRes);
for (int i = 0; i < 1; i++) {
taosMemoryFree(sql3[i]);
}
ASSERT(code == 0);
// TD-22903
const char *sql4[] = {
"[{\"metric\": \"test_us\", \"timestamp\": {\"value\": 1626006833639, \"type\": \"ms\"}, \"value\": true, \"tags\": {\"t0\": true}}, {\"metric\": \"test_us\", \"timestamp\": {\"value\": 1626006833638, \"type\": \"ms\"}, \"value\": false, \"tags\": {\"t0\": true}}]"
};
char *sql5[1] = {0};
for (int i = 0; i < 1; i++) {
sql5[i] = taosMemoryCalloc(1, 1024);
ASSERT(sql5[i] != NULL);
(void)strncpy(sql5[i], sql4[i], 1023);
}
pRes = taos_schemaless_insert(taos, (char **)sql5, sizeof(sql5) / sizeof(sql5[0]), TSDB_SML_JSON_PROTOCOL,
TSDB_SML_TIMESTAMP_NANO_SECONDS);
code = taos_errno(pRes);
if (code != 0) {
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
} else {
printf("%s result:success\n", __FUNCTION__);
}
taos_free_result(pRes);
for (int i = 0; i < 1; i++) {
taosMemoryFree(sql5[i]);
}
ASSERT(code == 0);
taos_close(taos);
return code;
}
int smlProcess_json1_Test() {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
@ -2135,8 +2231,8 @@ int main(int argc, char *argv[]) {
taos_options(TSDB_OPTION_CONFIGDIR, argv[1]);
}
int ret = 0;
ret = sml_ts5528_test();
// int ret = smlProcess_json0_Test();
int ret = sml_ts5528_test();
ASSERT(!ret);
ret = sml_td29691_Test();
ASSERT(ret);
@ -2146,8 +2242,8 @@ int main(int argc, char *argv[]) {
ASSERT(!ret);
ret = sml_td18789_Test();
ASSERT(!ret);
ret = sml_td24070_Test();
ASSERT(!ret);
// ret = sml_td24070_Test();
// ASSERT(!ret);
ret = sml_td23881_Test();
ASSERT(ret);
ret = sml_escape_Test();