fix: performace improvement

This commit is contained in:
shenglian zhou 2023-10-31 10:48:10 +08:00
parent e9a7f6e322
commit 45ae3727a7
1 changed files with 75 additions and 33 deletions

View File

@ -230,13 +230,12 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, const char** pSql, E
token.z = tmpTokenBuf; token.z = tmpTokenBuf;
token.n = strdequote(token.z); token.n = strdequote(token.z);
if (boundColsType == BOUND_ALL_AND_TBNAME) { if (boundColsType == BOUND_ALL_AND_TBNAME &&
if (token.n == strlen("tbname") && (strcasecmp(token.z, "tbname") == 0)) { token.n == strlen("tbname") && (strcasecmp(token.z, "tbname") == 0)) {
pBoundInfo->pColIndex[pBoundInfo->numOfBound] = tbnameSchemaIndex; pBoundInfo->pColIndex[pBoundInfo->numOfBound] = tbnameSchemaIndex;
pUseCols[tbnameSchemaIndex] = true; pUseCols[tbnameSchemaIndex] = true;
++pBoundInfo->numOfBound; ++pBoundInfo->numOfBound;
continue; continue;
}
} }
int16_t t = lastColIdx + 1; int16_t t = lastColIdx + 1;
int16_t index = insFindCol(&token, t, pBoundInfo->numOfCols, pSchema); int16_t index = insFindCol(&token, t, pBoundInfo->numOfCols, pSchema);
@ -1567,6 +1566,7 @@ typedef struct SStbRowsDataContext {
STag* pTag; STag* pTag;
STableMeta* pCtbMeta; STableMeta* pCtbMeta;
SVCreateTbReq* pCreateCtbReq; SVCreateTbReq* pCreateCtbReq;
bool hasTimestampTag;
} SStbRowsDataContext; } SStbRowsDataContext;
typedef union SRowsDataContext{ typedef union SRowsDataContext{
@ -1575,18 +1575,25 @@ typedef union SRowsDataContext{
} SRowsDataContext; } SRowsDataContext;
static int32_t getStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql, static int32_t getStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql,
SStbRowsDataContext* pStbRowsCxt, bool* pGotRow, SToken* pToken) { SStbRowsDataContext* pStbRowsCxt, bool* pGotRow,
SToken* pToken, bool *pCtbFirst) {
SBoundColInfo* pCols = &pStbRowsCxt->boundColsInfo; SBoundColInfo* pCols = &pStbRowsCxt->boundColsInfo;
SSchema* pSchemas = getTableColumnSchema(pStbRowsCxt->pStbMeta); SSchema* pSchemas = getTableColumnSchema(pStbRowsCxt->pStbMeta);
bool isJsonTag = false; bool isJsonTag = false;
SArray* pTagName = pStbRowsCxt->aTagNames; SArray* pTagNames = pStbRowsCxt->aTagNames;
SArray* pTagVals = pStbRowsCxt->aTagVals; SArray* pTagVals = pStbRowsCxt->aTagVals;
bool bFoundTbName = false; bool bFoundTbName = false;
const char* pOrigSql = *ppSql; const char* pOrigSql = *ppSql;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
bool canParseTagsAfter = !pStbRowsCxt->pTagCond && !pStbRowsCxt->hasTimestampTag;
SToken tagTokens[TSDB_MAX_TAGS] = {0};
SSchema* tagSchemas[TSDB_MAX_TAGS] = {0};
int numOfTagTokens = 0;
for (int i = 0; i < pCols->numOfBound && code == TSDB_CODE_SUCCESS; ++i) { for (int i = 0; i < pCols->numOfBound && code == TSDB_CODE_SUCCESS; ++i) {
const char* pTmpSql = *ppSql; const char* pTmpSql = *ppSql;
bool ignoreComma = false; bool ignoreComma = false;
@ -1608,9 +1615,15 @@ static int32_t getStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pS
} else if (pCols->pColIndex[i] < getTbnameSchemaIndex(pStbRowsCxt->pStbMeta)) { } else if (pCols->pColIndex[i] < getTbnameSchemaIndex(pStbRowsCxt->pStbMeta)) {
SSchema* pTagSchema = &pSchemas[pCols->pColIndex[i]]; SSchema* pTagSchema = &pSchemas[pCols->pColIndex[i]];
isJsonTag = pTagSchema->type == TSDB_DATA_TYPE_JSON; isJsonTag = pTagSchema->type == TSDB_DATA_TYPE_JSON;
code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg); if (canParseTagsAfter) {
if (code == TSDB_CODE_SUCCESS) { tagTokens[numOfTagTokens] = *pToken;
code = parseTagValue(pCxt, pStmt, ppSql, pTagSchema, pToken, pTagName, pTagVals, &pStbRowsCxt->pTag); tagSchemas[numOfTagTokens] = pTagSchema;
++numOfTagTokens;
} else {
code = checkAndTrimValue(pToken, pCxt->tmpTokenBuf, &pCxt->msg);
if (code == TSDB_CODE_SUCCESS) {
code = parseTagValue(pCxt, pStmt, ppSql, pTagSchema, pToken, pTagNames, pTagVals, &pStbRowsCxt->pTag);
}
} }
} }
else if (pCols->pColIndex[i] == getTbnameSchemaIndex(pStbRowsCxt->pStbMeta)) { else if (pCols->pColIndex[i] == getTbnameSchemaIndex(pStbRowsCxt->pStbMeta)) {
@ -1639,9 +1652,36 @@ static int32_t getStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pS
if (!bFoundTbName) { if (!bFoundTbName) {
code = buildSyntaxErrMsg(&pCxt->msg, "tbname value expected", pOrigSql); code = buildSyntaxErrMsg(&pCxt->msg, "tbname value expected", pOrigSql);
} }
if (code == TSDB_CODE_SUCCESS && !isJsonTag) {
code = tTagNew(pTagVals, 1, false, &pStbRowsCxt->pTag); bool ctbFirst = true;
if (code == TSDB_CODE_SUCCESS) {
char ctbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(&pStbRowsCxt->ctbName, ctbFName);
STableMeta** pCtbMeta = taosHashGet(pStmt->pSubTableHashObj, ctbFName, strlen(ctbFName));
ctbFirst = *pCtbFirst = (pCtbMeta == NULL);
if (!ctbFirst) {
pStbRowsCxt->pCtbMeta = *pCtbMeta;
}
} }
if (code == TSDB_CODE_SUCCESS && ctbFirst) {
for (int32_t i = 0; code == TSDB_CODE_SUCCESS && i < numOfTagTokens; ++i) {
SToken* pTagToken = tagTokens + i;
SSchema* pTagSchema = tagSchemas[i];
code = checkAndTrimValue(pTagToken, pCxt->tmpTokenBuf, &pCxt->msg);
if (code == TSDB_CODE_SUCCESS) {
code = parseTagValue(pCxt, pStmt, NULL, pTagSchema, pTagToken, pStbRowsCxt->aTagNames, pStbRowsCxt->aTagVals,
&pStbRowsCxt->pTag);
}
}
if (code == TSDB_CODE_SUCCESS && !isJsonTag) {
code = tTagNew(pStbRowsCxt->aTagVals, 1, false, &pStbRowsCxt->pTag);
}
}
if (code == TSDB_CODE_SUCCESS && pStbRowsCxt->pTagCond) {
code = checkSubtablePrivilege(pStbRowsCxt->aTagVals, pStbRowsCxt->aTagNames, &pStbRowsCxt->pTagCond);
}
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
*pGotRow = true; *pGotRow = true;
} }
@ -1650,9 +1690,6 @@ static int32_t getStbRowValues(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pS
static int32_t processCtbAutoCreationAndCtbMeta(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SStbRowsDataContext* pStbRowsCxt) { static int32_t processCtbAutoCreationAndCtbMeta(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SStbRowsDataContext* pStbRowsCxt) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (pStbRowsCxt->pTagCond) {
code = checkSubtablePrivilege(pStbRowsCxt->aTagVals, pStbRowsCxt->aTagNames, &pStbRowsCxt->pTagCond);
}
pStbRowsCxt->pCreateCtbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq)); pStbRowsCxt->pCreateCtbReq = taosMemoryCalloc(1, sizeof(SVCreateTbReq));
if (pStbRowsCxt->pCreateCtbReq == NULL) { if (pStbRowsCxt->pCreateCtbReq == NULL) {
@ -1666,28 +1703,24 @@ static int32_t processCtbAutoCreationAndCtbMeta(SInsertParseContext* pCxt, SVnod
} }
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
collectUseTable(&pStbRowsCxt->ctbName, pStmt->pTableNameHashObj);
char ctbFName[TSDB_TABLE_FNAME_LEN]; char ctbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(&pStbRowsCxt->ctbName, ctbFName); tNameExtractFullName(&pStbRowsCxt->ctbName, ctbFName);
STableMeta** pCtbMeta = taosHashGet(pStmt->pSubTableHashObj, ctbFName, strlen(ctbFName)); SVgroupInfo vg;
if (NULL != pCtbMeta) { SRequestConnInfo conn = {.pTrans = pCxt->pComCxt->pTransporter,
pStbRowsCxt->pCtbMeta->uid = (*pCtbMeta)->uid; .requestId = pCxt->pComCxt->requestId,
pStbRowsCxt->pCtbMeta->vgId = (*pCtbMeta)->vgId; .requestObjRefId = pCxt->pComCxt->requestRid,
} else { .mgmtEps = pCxt->pComCxt->mgmtEpSet};
SVgroupInfo vg; code = catalogGetTableHashVgroup(pCxt->pComCxt->pCatalog, &conn, &pStmt->targetTableName, &vg);
SRequestConnInfo conn = {.pTrans = pCxt->pComCxt->pTransporter, if (code == TSDB_CODE_SUCCESS) {
.requestId = pCxt->pComCxt->requestId,
.requestObjRefId = pCxt->pComCxt->requestRid,
.mgmtEps = pCxt->pComCxt->mgmtEpSet};
code = catalogGetTableHashVgroup(pCxt->pComCxt->pCatalog, &conn, &pStmt->targetTableName, &vg);
taosHashPut(pStmt->pVgroupsHashObj, (const char*)(&vg.vgId), sizeof(vg.vgId), &vg, sizeof(vg)); taosHashPut(pStmt->pVgroupsHashObj, (const char*)(&vg.vgId), sizeof(vg.vgId), &vg, sizeof(vg));
pStbRowsCxt->pCtbMeta->uid = taosHashGetSize(pStmt->pSubTableHashObj) + 1; pStbRowsCxt->pCtbMeta->uid = taosHashGetSize(pStmt->pSubTableHashObj) + 1;
pStbRowsCxt->pCtbMeta->vgId = vg.vgId; pStbRowsCxt->pCtbMeta->vgId = vg.vgId;
STableMeta* pBackup = NULL; STableMeta* pBackup = NULL;
cloneTableMeta(pStbRowsCxt->pCtbMeta, &pBackup); cloneTableMeta(pStbRowsCxt->pCtbMeta, &pBackup);
taosHashPut(pStmt->pSubTableHashObj, ctbFName, strlen(ctbFName), &pBackup, POINTER_BYTES); taosHashPut(pStmt->pSubTableHashObj, ctbFName, strlen(ctbFName), &pBackup, POINTER_BYTES);
} }
collectUseTable(&pStbRowsCxt->ctbName, pStmt->pTableNameHashObj);
} }
return code; return code;
} }
@ -1724,13 +1757,14 @@ static void clearStbRowsDataContext(SStbRowsDataContext* pStbRowsCxt) {
static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql, static int32_t parseOneStbRow(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, const char** ppSql,
SStbRowsDataContext* pStbRowsCxt, bool* pGotRow, SToken* pToken) { SStbRowsDataContext* pStbRowsCxt, bool* pGotRow, SToken* pToken) {
resetStbRowsDataContextPreStbRow(pStbRowsCxt); resetStbRowsDataContextPreStbRow(pStbRowsCxt);
int32_t code = getStbRowValues(pCxt, pStmt, ppSql, pStbRowsCxt, pGotRow, pToken); bool bFirstTable = false;
int32_t code = getStbRowValues(pCxt, pStmt, ppSql, pStbRowsCxt, pGotRow, pToken, &bFirstTable);
if (code != TSDB_CODE_SUCCESS || !*pGotRow) { if (code != TSDB_CODE_SUCCESS || !*pGotRow) {
return code; return code;
} }
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS && bFirstTable) {
code = processCtbAutoCreationAndCtbMeta(pCxt, pStmt, pStbRowsCxt); code = processCtbAutoCreationAndCtbMeta(pCxt, pStmt, pStbRowsCxt);
} }
@ -2053,6 +2087,14 @@ static int32_t parseInsertStbClauseBottom(SInsertParseContext* pCxt, SVnodeModif
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
code = parseBoundColumns(pCxt, &pStmt->pBoundCols, BOUND_ALL_AND_TBNAME, pStmt->pTableMeta, code = parseBoundColumns(pCxt, &pStmt->pBoundCols, BOUND_ALL_AND_TBNAME, pStmt->pTableMeta,
&pStbRowsCxt->boundColsInfo); &pStbRowsCxt->boundColsInfo);
pStbRowsCxt->hasTimestampTag = false;
for (int32_t i = 0; i < pStbRowsCxt->boundColsInfo.numOfBound; ++i) {
int16_t schemaIndex = pStbRowsCxt->boundColsInfo.pColIndex[i];
if (schemaIndex != getTbnameSchemaIndex(pStmt->pTableMeta) &&
schemaIndex >= getNumOfColumns(pStmt->pTableMeta) && pStmt->pTableMeta->schema[schemaIndex].type == TSDB_DATA_TYPE_TIMESTAMP) {
pStbRowsCxt->hasTimestampTag = true;
}
}
pStmt->pStbRowsCxt = pStbRowsCxt; pStmt->pStbRowsCxt = pStbRowsCxt;
} }