enh:[TD-32166]refactor code in sml
This commit is contained in:
parent
a062a0cb84
commit
2d310221b3
|
@ -138,7 +138,7 @@ void smlBuildInvalidDataMsg(SSmlMsgBuf *pBuf, const char *msg1, const char *msg2
|
|||
if (pBuf->buf == NULL) {
|
||||
return;
|
||||
}
|
||||
(void)memset(pBuf->buf, 0, pBuf->len);
|
||||
pBuf->buf[0] = 0;
|
||||
if (msg1) {
|
||||
(void)strncat(pBuf->buf, msg1, pBuf->len - 1);
|
||||
}
|
||||
|
@ -475,7 +475,7 @@ static int32_t smlParseTableName(SArray *tags, char *childTableName, char *tbnam
|
|||
}
|
||||
}
|
||||
if (autoChildName) {
|
||||
(void)memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
|
||||
childTableName[0] = '\0';
|
||||
for (int i = 0; i < taosArrayGetSize(tags); i++) {
|
||||
SSmlKv *tag = (SSmlKv *)taosArrayGet(tags, i);
|
||||
SML_CHECK_NULL(tag);
|
||||
|
@ -499,7 +499,6 @@ static int32_t smlParseTableName(SArray *tags, char *childTableName, char *tbnam
|
|||
SML_CHECK_NULL(tag);
|
||||
// handle child table name
|
||||
if (childTableNameLen == tag->keyLen && strncmp(tag->key, tbnameKey, tag->keyLen) == 0) {
|
||||
(void)memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
|
||||
tstrncpy(childTableName, tag->value, TMIN(TSDB_TABLE_NAME_LEN, tag->length + 1));
|
||||
if (tsSmlDot2Underline) {
|
||||
smlStrReplace(childTableName, strlen(childTableName));
|
||||
|
@ -524,8 +523,7 @@ int32_t smlSetCTableName(SSmlTableInfo *oneTable, char *tbnameKey) {
|
|||
dst = taosArrayDup(oneTable->tags, NULL);
|
||||
SML_CHECK_NULL(dst);
|
||||
if (oneTable->sTableNameLen >= TSDB_TABLE_NAME_LEN) {
|
||||
code = TSDB_CODE_SML_INTERNAL_ERROR;
|
||||
goto END;
|
||||
SML_CHECK_CODE(TSDB_CODE_SML_INTERNAL_ERROR);
|
||||
}
|
||||
char superName[TSDB_TABLE_NAME_LEN] = {0};
|
||||
RandTableName rName = {dst, NULL, (uint8_t)oneTable->sTableNameLen, oneTable->childTableName};
|
||||
|
@ -676,8 +674,9 @@ int32_t smlGetMeta(SSmlHandle *info, const void *measure, int32_t measureLen, ST
|
|||
conn.requestId = info->pRequest->requestId;
|
||||
conn.requestObjRefId = info->pRequest->self;
|
||||
conn.mgmtEps = getEpSet_s(&info->taos->pAppInfo->mgmtEp);
|
||||
(void)memset(pName.tname, 0, TSDB_TABLE_NAME_LEN);
|
||||
int32_t len = TMIN(measureLen, TSDB_TABLE_NAME_LEN - 1);
|
||||
(void)memcpy(pName.tname, measure, measureLen);
|
||||
pName.tname[len] = 0;
|
||||
|
||||
return catalogGetSTableMeta(info->pCatalog, &conn, &pName, pTableMeta);
|
||||
}
|
||||
|
@ -766,8 +765,7 @@ static int32_t smlProcessSchemaAction(SSmlHandle *info, SSchema *schemaField, SH
|
|||
SSmlKv *kv = (SSmlKv *)taosArrayGet(checkDumplicateCols, j);
|
||||
SML_CHECK_NULL(kv);
|
||||
if (taosHashGet(schemaHash, kv->key, kv->keyLen) != NULL) {
|
||||
code = TSDB_CODE_PAR_DUPLICATED_COLUMN;
|
||||
goto END;
|
||||
SML_CHECK_CODE(TSDB_CODE_PAR_DUPLICATED_COLUMN);
|
||||
}
|
||||
}
|
||||
END:
|
||||
|
@ -788,8 +786,7 @@ static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols, bool
|
|||
SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i);
|
||||
SML_CHECK_NULL(kv);
|
||||
if (taosHashGet(hashTmp, kv->key, kv->keyLen) == NULL) {
|
||||
code = TSDB_CODE_SML_INVALID_DATA;
|
||||
goto END;
|
||||
SML_CHECK_CODE(TSDB_CODE_SML_INVALID_DATA);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -810,7 +807,7 @@ static int32_t getBytes(uint8_t type, int32_t length) {
|
|||
static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashObj *schemaHash, SArray *cols,
|
||||
SArray *results, int32_t numOfCols, bool isTag) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = TSDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
for (int j = 0; j < taosArrayGetSize(cols); ++j) {
|
||||
SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, j);
|
||||
SML_CHECK_NULL(kv);
|
||||
|
@ -825,9 +822,7 @@ static int32_t smlBuildFieldsList(SSmlHandle *info, SSchema *schemaField, SHashO
|
|||
} else if (action == SCHEMA_ACTION_CHANGE_COLUMN_SIZE || action == SCHEMA_ACTION_CHANGE_TAG_SIZE) {
|
||||
uint16_t *index = (uint16_t *)taosHashGet(schemaHash, kv->key, kv->keyLen);
|
||||
if (index == NULL) {
|
||||
uError("smlBuildFieldsList get error, key:%s", kv->key);
|
||||
code = TSDB_CODE_SML_INVALID_DATA;
|
||||
goto END;
|
||||
SML_CHECK_CODE(TSDB_CODE_SML_INVALID_DATA);
|
||||
}
|
||||
uint16_t newIndex = *index;
|
||||
if (isTag) newIndex -= numOfCols;
|
||||
|
@ -894,17 +889,14 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns,
|
|||
pSql = (action == SCHEMA_ACTION_ADD_COLUMN) ? "sml_add_column" : "sml_modify_column_size";
|
||||
smlBuildCreateStbReq(&pReq, pTableMeta->sversion + 1, pTableMeta->tversion, pTableMeta->uid, TD_REQ_FROM_TAOX);
|
||||
} else {
|
||||
uError("SML:0x%" PRIx64 " invalid action:%d", info->id, action);
|
||||
code = TSDB_CODE_SML_INVALID_DATA;
|
||||
goto END;
|
||||
SML_CHECK_CODE(TSDB_CODE_SML_INVALID_DATA);
|
||||
}
|
||||
|
||||
SML_CHECK_CODE(buildRequest(info->taos->id, pSql, strlen(pSql), NULL, false, &pRequest, 0));
|
||||
|
||||
pRequest->syncQuery = true;
|
||||
if (!pRequest->pDb) {
|
||||
code = TSDB_CODE_PAR_DB_NOT_SPECIFIED;
|
||||
goto END;
|
||||
SML_CHECK_CODE(TSDB_CODE_PAR_DB_NOT_SPECIFIED);
|
||||
}
|
||||
|
||||
if (pReq.numOfTags == 0) {
|
||||
|
@ -924,15 +916,16 @@ static int32_t smlSendMetaMsg(SSmlHandle *info, SName *pName, SArray *pColumns,
|
|||
pCmdMsg.msgType = TDMT_MND_CREATE_STB;
|
||||
pCmdMsg.msgLen = tSerializeSMCreateStbReq(NULL, 0, &pReq);
|
||||
if (pCmdMsg.msgLen < 0) {
|
||||
code = pCmdMsg.msgLen;
|
||||
goto END;
|
||||
uError("failed to serialize create stable request1, code:%d, terrno:%d", pCmdMsg.msgLen, terrno);
|
||||
SML_CHECK_CODE(TSDB_CODE_SML_INVALID_DATA);
|
||||
}
|
||||
pCmdMsg.pMsg = taosMemoryMalloc(pCmdMsg.msgLen);
|
||||
SML_CHECK_NULL(pCmdMsg.pMsg);
|
||||
code = tSerializeSMCreateStbReq(pCmdMsg.pMsg, pCmdMsg.msgLen, &pReq);
|
||||
if (code < 0) {
|
||||
taosMemoryFree(pCmdMsg.pMsg);
|
||||
goto END;
|
||||
uError("failed to serialize create stable request2, code:%d, terrno:%d", code, terrno);
|
||||
SML_CHECK_CODE(TSDB_CODE_SML_INVALID_DATA);
|
||||
}
|
||||
|
||||
SQuery pQuery = {0};
|
||||
|
@ -1108,8 +1101,9 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
|
|||
PROCESS_SLASH_IN_MEASUREMENT(measure, superTableLen);
|
||||
}
|
||||
smlStrReplace(measure, superTableLen);
|
||||
(void)memset(pName.tname, 0, TSDB_TABLE_NAME_LEN);
|
||||
(void)memcpy(pName.tname, measure, TMIN(superTableLen, TSDB_TABLE_NAME_LEN - 1));
|
||||
size_t nameLen = TMIN(superTableLen, TSDB_TABLE_NAME_LEN - 1);
|
||||
(void)memcpy(pName.tname, measure, nameLen);
|
||||
pName.tname[nameLen] = '\0';
|
||||
taosMemoryFree(measure);
|
||||
|
||||
code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
|
||||
|
@ -1118,8 +1112,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
|
|||
SML_CHECK_CODE(smlCreateTable(info, &conn, sTableData, &pName, &pTableMeta));
|
||||
} else if (code == TSDB_CODE_SUCCESS) {
|
||||
if (smlIsPKTable(pTableMeta)) {
|
||||
code = TSDB_CODE_SML_NOT_SUPPORT_PK;
|
||||
goto END;
|
||||
SML_CHECK_CODE(TSDB_CODE_SML_NOT_SUPPORT_PK);
|
||||
}
|
||||
|
||||
hashTmp = taosHashInit(pTableMeta->tableInfo.numOfTags, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||
|
@ -1176,8 +1169,7 @@ static int32_t smlInsertMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols
|
|||
if (ret == 0) {
|
||||
SML_CHECK_NULL(taosArrayPush(metaArray, kv));
|
||||
if (taosHashGet(checkDuplicate, kv->key, kv->keyLen) != NULL) {
|
||||
code = TSDB_CODE_PAR_DUPLICATED_COLUMN;
|
||||
goto END;
|
||||
SML_CHECK_CODE(TSDB_CODE_PAR_DUPLICATED_COLUMN);
|
||||
}
|
||||
} else if (terrno == TSDB_CODE_DUP_KEY) {
|
||||
return TSDB_CODE_PAR_DUPLICATED_COLUMN;
|
||||
|
@ -1208,8 +1200,7 @@ static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols
|
|||
}
|
||||
if (kv->type != value->type) {
|
||||
smlBuildInvalidDataMsg(msg, "the type is not the same like before", kv->key);
|
||||
code = TSDB_CODE_SML_NOT_SAME_TYPE;
|
||||
goto END;
|
||||
SML_CHECK_CODE(TSDB_CODE_SML_NOT_SAME_TYPE);
|
||||
}
|
||||
|
||||
if (IS_VAR_DATA_TYPE(kv->type) && (kv->length > value->length)) { // update string len, if bigger
|
||||
|
@ -1219,16 +1210,13 @@ static int32_t smlUpdateMeta(SHashObj *metaHash, SArray *metaArray, SArray *cols
|
|||
size_t tmp = taosArrayGetSize(metaArray);
|
||||
if (tmp > INT16_MAX) {
|
||||
smlBuildInvalidDataMsg(msg, "too many cols or tags", kv->key);
|
||||
uError("too many cols or tags");
|
||||
code = TSDB_CODE_SML_INVALID_DATA;
|
||||
goto END;
|
||||
SML_CHECK_CODE(TSDB_CODE_SML_INVALID_DATA);
|
||||
}
|
||||
int16_t size = tmp;
|
||||
SML_CHECK_CODE(taosHashPut(metaHash, kv->key, kv->keyLen, &size, SHORT_BYTES));
|
||||
SML_CHECK_NULL(taosArrayPush(metaArray, kv));
|
||||
if (taosHashGet(checkDuplicate, kv->key, kv->keyLen) != NULL) {
|
||||
code = TSDB_CODE_PAR_DUPLICATED_COLUMN;
|
||||
goto END;
|
||||
SML_CHECK_CODE(TSDB_CODE_PAR_DUPLICATED_COLUMN);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1349,8 +1337,7 @@ static int32_t smlPushCols(SArray *colsArray, SArray *cols) {
|
|||
terrno = 0;
|
||||
code = taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES);
|
||||
if (terrno == TSDB_CODE_DUP_KEY) {
|
||||
code = TSDB_CODE_PAR_DUPLICATED_COLUMN;
|
||||
goto END;
|
||||
SML_CHECK_CODE(TSDB_CODE_PAR_DUPLICATED_COLUMN);
|
||||
}
|
||||
SML_CHECK_CODE(code);
|
||||
}
|
||||
|
@ -1417,8 +1404,7 @@ static int32_t smlParseEnd(SSmlHandle *info) {
|
|||
code = taosHashPut(info->superTables, elements->measure, elements->measureLen, &meta, POINTER_BYTES);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
smlDestroySTableMeta(&meta);
|
||||
uError("SML:0x%" PRIx64 " put measure to hash failed", info->id);
|
||||
goto END;
|
||||
SML_CHECK_CODE(code);
|
||||
}
|
||||
SML_CHECK_CODE(smlInsertMeta(meta->tagHash, meta->tags, tinfo->tags, NULL));
|
||||
SML_CHECK_CODE(smlInsertMeta(meta->colHash, meta->cols, elements->colArray, meta->tagHash));
|
||||
|
@ -1459,8 +1445,8 @@ static int32_t smlInsertData(SSmlHandle *info) {
|
|||
PROCESS_SLASH_IN_MEASUREMENT(measure, measureLen);
|
||||
}
|
||||
smlStrReplace(measure, measureLen);
|
||||
(void)memset(pName.tname, 0, TSDB_TABLE_NAME_LEN);
|
||||
(void)memcpy(pName.tname, measure, measureLen);
|
||||
pName.tname[measureLen] = '\0';
|
||||
|
||||
if (info->pRequest->tableList == NULL) {
|
||||
info->pRequest->tableList = taosArrayInit(1, sizeof(SName));
|
||||
|
@ -1485,8 +1471,7 @@ static int32_t smlInsertData(SSmlHandle *info) {
|
|||
(SSmlSTableMeta **)taosHashGet(info->superTables, tableData->sTableName, tableData->sTableNameLen);
|
||||
if (unlikely(NULL == pMeta || NULL == *pMeta || NULL == (*pMeta)->tableMeta)) {
|
||||
uError("SML:0x%" PRIx64 " %s NULL == pMeta. table name: %s", info->id, __FUNCTION__, tableData->childTableName);
|
||||
code = TSDB_CODE_SML_INTERNAL_ERROR;
|
||||
goto END;
|
||||
SML_CHECK_CODE(TSDB_CODE_SML_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
// use tablemeta of stable to save vgid and uid of child table
|
||||
|
@ -1565,12 +1550,13 @@ END:
|
|||
}
|
||||
|
||||
static void printRaw(int64_t id, int lineNum, int numLines, ELogLevel level, char* data, int32_t len){
|
||||
char *print = taosMemoryCalloc(len + 1, 1);
|
||||
char *print = taosMemoryMalloc(len + 1);
|
||||
if (print == NULL) {
|
||||
uError("SML:0x%" PRIx64 " smlParseLine failed. code : %d", id, terrno);
|
||||
return;
|
||||
}
|
||||
(void)memcpy(print, data, len);
|
||||
print[len] = '\0';
|
||||
if (level == DEBUG_DEBUG){
|
||||
uDebug("SML:0x%" PRIx64 " smlParseLine is raw, line %d/%d : %s", id, lineNum, numLines, print);
|
||||
}else if (level == DEBUG_ERROR){
|
||||
|
|
|
@ -470,8 +470,7 @@ static int32_t smlParseJSONStringExt(SSmlHandle *info, cJSON *root, SSmlLineInfo
|
|||
uError("SML:0x%" PRIx64 " Unable to parse timestamp from JSON payload %s %s %" PRId64, info->id, info->msgBuf.buf,tmp, ts);
|
||||
taosMemoryFree(tmp);
|
||||
}
|
||||
code = TSDB_CODE_INVALID_TIMESTAMP;
|
||||
goto END;
|
||||
SML_CHECK_CODE(TSDB_CODE_INVALID_TIMESTAMP);
|
||||
}
|
||||
SSmlKv kvTs = {0};
|
||||
smlBuildTsKv(&kvTs, ts);
|
||||
|
|
Loading…
Reference in New Issue