fix: refactor checkAuth for target table

This commit is contained in:
slzhou 2023-10-20 14:51:44 +08:00
parent 332268fd4b
commit ce6be3d3e7
1 changed files with 13 additions and 30 deletions

View File

@ -974,21 +974,6 @@ static int32_t checkAuth(SParseContext* pCxt, SName* pTbName, bool* pMissCache,
return code; return code;
} }
static int32_t checkAuthForTable(SParseContext* pCxt, SName* pTbName, bool* pMissCache, bool* pNeedTableTagVal) {
SNode* pTagCond = NULL;
int32_t code = checkAuth(pCxt, pTbName, pMissCache, &pTagCond);
if (TSDB_CODE_SUCCESS == code) {
*pNeedTableTagVal = ((*pMissCache) || (NULL != pTagCond));
*pMissCache = (NULL != pTagCond);
}
nodesDestroyNode(pTagCond);
return code;
}
static int32_t checkAuthForStable(SParseContext* pCxt, SName* pTbName, bool* pMissCache, SNode** pTagCond) {
return checkAuth(pCxt, pTbName, pMissCache, pTagCond);
}
static int32_t getTableMeta(SInsertParseContext* pCxt, SName* pTbName, STableMeta** pTableMeta, static int32_t getTableMeta(SInsertParseContext* pCxt, SName* pTbName, STableMeta** pTableMeta,
bool* pMissCache, bool bUsingTable) { bool* pMissCache, bool bUsingTable) {
SParseContext* pComCxt = pCxt->pComCxt; SParseContext* pComCxt = pCxt->pComCxt;
@ -1045,18 +1030,6 @@ static int32_t getTableVgroup(SParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bo
return code; return code;
} }
static int32_t getTargetTableMetaAndVgroupImpl(SParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool* pMissCache) {
SVgroupInfo vg;
int32_t code = catalogGetCachedTableVgMeta(pCxt->pCatalog, &pStmt->targetTableName, &vg, &pStmt->pTableMeta);
if (TSDB_CODE_SUCCESS == code) {
if (NULL != pStmt->pTableMeta) {
code = taosHashPut(pStmt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg));
}
*pMissCache = (NULL == pStmt->pTableMeta);
}
return code;
}
static int32_t getTargetTableMetaAndVgroup(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool* pMissCache) { static int32_t getTargetTableMetaAndVgroup(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, bool* pMissCache) {
SParseContext* pComCxt = pCxt->pComCxt; SParseContext* pComCxt = pCxt->pComCxt;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
@ -1109,11 +1082,21 @@ static int32_t getTargetTableSchema(SInsertParseContext* pCxt, SVnodeModifyOpStm
pCxt->missCache = true; pCxt->missCache = true;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SNode* pTagCond = NULL;
int32_t code = checkAuthForTable(pCxt->pComCxt, &pStmt->targetTableName, &pCxt->missCache, &pCxt->needTableTagVal); int32_t code = checkAuth(pCxt->pComCxt, &pStmt->targetTableName, &pCxt->missCache, &pTagCond);
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
code = getTargetTableMetaAndVgroup(pCxt, pStmt, &pCxt->missCache); code = getTargetTableMetaAndVgroup(pCxt, pStmt, &pCxt->missCache);
} }
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
if (TSDB_SUPER_TABLE != pStmt->pTableMeta->tableType) {
pCxt->needTableTagVal = (NULL != pTagCond);
pCxt->missCache = (NULL != pTagCond);
} else {
pStmt->pTagCond = nodesCloneNode(pTagCond);
}
}
nodesDestroyNode(pTagCond);
if (TSDB_CODE_SUCCESS == code && !pCxt->pComCxt->async) { if (TSDB_CODE_SUCCESS == code && !pCxt->pComCxt->async) {
code = collectUseDatabase(&pStmt->targetTableName, pStmt->pDbFNameHashObj); code = collectUseDatabase(&pStmt->targetTableName, pStmt->pDbFNameHashObj);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
@ -1133,7 +1116,7 @@ static int32_t getUsingTableSchema(SInsertParseContext* pCxt, SVnodeModifyOpStmt
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t code = checkAuthForStable(pCxt->pComCxt, &pStmt->usingTableName, &pCxt->missCache, &pStmt->pTagCond); int32_t code = checkAuth(pCxt->pComCxt, &pStmt->usingTableName, &pCxt->missCache, &pStmt->pTagCond);
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
bool bUsingTable = true; bool bUsingTable = true;
code = getTableMeta(pCxt, &pStmt->usingTableName, &pStmt->pTableMeta, &pCxt->missCache, bUsingTable); code = getTableMeta(pCxt, &pStmt->usingTableName, &pStmt->pTableMeta, &pCxt->missCache, bUsingTable);