enhance: refactor getStbRowValues

This commit is contained in:
shenglian zhou 2023-11-01 08:32:31 +08:00
parent 670a95fabc
commit 7636f09bfb
1 changed files with 71 additions and 45 deletions

View File

@ -1567,6 +1567,7 @@ typedef struct SStbRowsDataContext {
STableMeta* pCtbMeta;
SVCreateTbReq* pCreateCtbReq;
bool hasTimestampTag;
bool isJsonTag;
} SStbRowsDataContext;
typedef union SRowsDataContext{
@ -1600,27 +1601,40 @@ static int32_t parseTbnameToken(SInsertParseContext* pCxt, SStbRowsDataContext*
return code;
}
static int32_t getStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql,
SStbRowsDataContext* pStbRowsCxt, bool* pGotRow,
SToken* pToken, bool *pCtbFirst) {
SBoundColInfo* pCols = &pStbRowsCxt->boundColsInfo;
SSchema* pSchemas = getTableColumnSchema(pStbRowsCxt->pStbMeta);
bool isJsonTag = false;
SArray* pTagNames = pStbRowsCxt->aTagNames;
SArray* pTagVals = pStbRowsCxt->aTagVals;
bool bFoundTbName = false;
const char* pOrigSql = *ppSql;
static int32_t processCtbTagsAfterCtbName(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt,
SStbRowsDataContext* pStbRowsCxt, bool ctbFirst,
const SToken* tagTokens, SSchema* const* tagSchemas,
int numOfTagTokens) {
int32_t code = TSDB_CODE_SUCCESS;
bool canParseTagsAfter = !pStbRowsCxt->pTagCond && !pStbRowsCxt->hasTimestampTag;
SToken tagTokens[TSDB_MAX_TAGS] = {0};
SSchema* tagSchemas[TSDB_MAX_TAGS] = {0};
int numOfTagTokens = 0;
if (code == TSDB_CODE_SUCCESS && ctbFirst) {
for (int32_t i = 0; code == TSDB_CODE_SUCCESS && i < numOfTagTokens; ++i) {
SToken* pTagToken = (SToken*)(tagTokens + i);
SSchema* pTagSchema = tagSchemas[i];
code = checkAndTrimValue(pTagToken, pCxt->tmpTokenBuf, &pCxt->msg);
if (code == TSDB_CODE_SUCCESS) {
code = parseTagValue(pCxt, pStmt, NULL, pTagSchema, pTagToken, pStbRowsCxt->aTagNames, pStbRowsCxt->aTagVals,
&pStbRowsCxt->pTag);
}
}
if (code == TSDB_CODE_SUCCESS && !pStbRowsCxt->isJsonTag) {
code = tTagNew(pStbRowsCxt->aTagVals, 1, false, &pStbRowsCxt->pTag);
}
}
for (int i = 0; i < pCols->numOfBound && code == TSDB_CODE_SUCCESS; ++i) {
if (code == TSDB_CODE_SUCCESS && pStbRowsCxt->pTagCond) {
code = checkSubtablePrivilege(pStbRowsCxt->aTagVals, pStbRowsCxt->aTagNames, &pStbRowsCxt->pTagCond);
}
return code;
}
static int32_t doGetStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char* const* ppSql,
SStbRowsDataContext* pStbRowsCxt, SToken* pToken, const SBoundColInfo* pCols,
const SSchema* pSchemas, SArray* pTagNames, SArray* pTagVals, SToken* tagTokens,
SSchema** tagSchemas, int* pNumOfTagTokens, bool* bFoundTbName) {
int32_t code = TSDB_CODE_SUCCESS;
bool canParseTagsAfter = !pStbRowsCxt->pTagCond && !pStbRowsCxt->hasTimestampTag;
for (int i = 0; i < pCols->numOfBound && (code) == TSDB_CODE_SUCCESS; ++i) {
const char* pTmpSql = *ppSql;
bool ignoreComma = false;
NEXT_TOKEN_WITH_PREV_EXT(*ppSql, *pToken, &ignoreComma);
@ -1640,11 +1654,10 @@ static int32_t getStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pS
code = parseValueToken(pCxt, ppSql, pToken, pSchema, getTableInfo(pStbRowsCxt->pStbMeta).precision, pVal);
} else if (pCols->pColIndex[i] < getTbnameSchemaIndex(pStbRowsCxt->pStbMeta)) {
SSchema* pTagSchema = &pSchemas[pCols->pColIndex[i]];
isJsonTag = pTagSchema->type == TSDB_DATA_TYPE_JSON;
if (canParseTagsAfter) {
tagTokens[numOfTagTokens] = *pToken;
tagSchemas[numOfTagTokens] = pTagSchema;
++numOfTagTokens;
tagTokens[(*pNumOfTagTokens)] = *pToken;
tagSchemas[(*pNumOfTagTokens)] = pTagSchema;
++(*pNumOfTagTokens);
} else {
code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg);
if (code == TSDB_CODE_SUCCESS) {
@ -1654,7 +1667,7 @@ static int32_t getStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pS
}
else if (pCols->pColIndex[i] == getTbnameSchemaIndex(pStbRowsCxt->pStbMeta)) {
char ctbName[TSDB_TABLE_NAME_LEN];
code = parseTbnameToken(pCxt, pStbRowsCxt, pToken, ctbName, &bFoundTbName);
code = parseTbnameToken(pCxt, pStbRowsCxt, pToken, ctbName, bFoundTbName);
}
if (code == TSDB_CODE_SUCCESS && i < pCols->numOfBound - 1) {
@ -1664,38 +1677,47 @@ static int32_t getStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pS
}
}
}
if (!bFoundTbName) {
return code;
}
static int32_t getStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql,
SStbRowsDataContext* pStbRowsCxt, bool* pGotRow,
SToken* pToken, bool *pCtbFirst) {
SBoundColInfo* pCols = &pStbRowsCxt->boundColsInfo;
SSchema* pSchemas = getTableColumnSchema(pStbRowsCxt->pStbMeta);
SArray* pTagNames = pStbRowsCxt->aTagNames;
SArray* pTagVals = pStbRowsCxt->aTagVals;
bool bFoundTbName = false;
const char* pOrigSql = *ppSql;
int32_t code = TSDB_CODE_SUCCESS;
SToken tagTokens[TSDB_MAX_TAGS] = {0};
SSchema* tagSchemas[TSDB_MAX_TAGS] = {0};
int numOfTagTokens = 0;
code = doGetStbRowValues(pCxt, pStmt, ppSql, pStbRowsCxt, pToken, pCols, pSchemas, pTagNames, pTagVals, tagTokens,
tagSchemas, &numOfTagTokens, &bFoundTbName);
if (code == TSDB_CODE_SUCCESS && !bFoundTbName) {
code = buildSyntaxErrMsg(&pCxt->msg, "tbname value expected", pOrigSql);
}
bool ctbFirst = true;
if (code == TSDB_CODE_SUCCESS) {
char ctbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(&pStbRowsCxt->ctbName, ctbFName);
STableMeta** pCtbMeta = taosHashGet(pStmt->pSubTableHashObj, ctbFName, strlen(ctbFName));
ctbFirst = *pCtbFirst = (pCtbMeta == NULL);
ctbFirst = (pCtbMeta == NULL);
if (!ctbFirst) {
pStbRowsCxt->pCtbMeta->uid = (*pCtbMeta)->uid;
pStbRowsCxt->pCtbMeta->vgId = (*pCtbMeta)->vgId;
}
}
if (code == TSDB_CODE_SUCCESS && ctbFirst) {
for (int32_t i = 0; code == TSDB_CODE_SUCCESS && i < numOfTagTokens; ++i) {
SToken* pTagToken = tagTokens + i;
SSchema* pTagSchema = tagSchemas[i];
code = checkAndTrimValue(pTagToken, pCxt->tmpTokenBuf, &pCxt->msg);
if (code == TSDB_CODE_SUCCESS) {
code = parseTagValue(pCxt, pStmt, NULL, pTagSchema, pTagToken, pStbRowsCxt->aTagNames, pStbRowsCxt->aTagVals,
&pStbRowsCxt->pTag);
}
}
if (code == TSDB_CODE_SUCCESS && !isJsonTag) {
code = tTagNew(pStbRowsCxt->aTagVals, 1, false, &pStbRowsCxt->pTag);
}
*pCtbFirst = ctbFirst;
}
if (code == TSDB_CODE_SUCCESS && pStbRowsCxt->pTagCond) {
code = checkSubtablePrivilege(pStbRowsCxt->aTagVals, pStbRowsCxt->aTagNames, &pStbRowsCxt->pTagCond);
if (code == TSDB_CODE_SUCCESS) {
code = processCtbTagsAfterCtbName(pCxt, pStmt, pStbRowsCxt, ctbFirst, tagTokens, tagSchemas, numOfTagTokens);
}
if (code == TSDB_CODE_SUCCESS) {
@ -2106,9 +2128,13 @@ static int32_t parseInsertStbClauseBottom(SInsertParseContext* pCxt, SVnodeModif
pStbRowsCxt->hasTimestampTag = false;
for (int32_t i = 0; i < pStbRowsCxt->boundColsInfo.numOfBound; ++i) {
int16_t schemaIndex = pStbRowsCxt->boundColsInfo.pColIndex[i];
if (schemaIndex != getTbnameSchemaIndex(pStmt->pTableMeta) &&
schemaIndex >= getNumOfColumns(pStmt->pTableMeta) && pStmt->pTableMeta->schema[schemaIndex].type == TSDB_DATA_TYPE_TIMESTAMP) {
pStbRowsCxt->hasTimestampTag = true;
if (schemaIndex != getTbnameSchemaIndex(pStmt->pTableMeta) && schemaIndex >= getNumOfColumns(pStmt->pTableMeta) ) {
if (pStmt->pTableMeta->schema[schemaIndex].type == TSDB_DATA_TYPE_TIMESTAMP) {
pStbRowsCxt->hasTimestampTag = true;
}
if (pStmt->pTableMeta->schema[schemaIndex].type == TSDB_DATA_TYPE_JSON) {
pStbRowsCxt->isJsonTag = true;
}
}
}
pStmt->pStbRowsCxt = pStbRowsCxt;