enh: insert optimize

This commit is contained in:
Xiaoyu Wang 2022-11-05 12:09:11 +08:00
parent 80dc658dbb
commit 7484ff430c
2 changed files with 41 additions and 42 deletions

View File

@ -186,8 +186,6 @@ int32_t catalogRemoveStbMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId,
*/ */
int32_t catalogGetTableMeta(SCatalog* pCatalog, SRequestConnInfo* pConn, const SName* pTableName, int32_t catalogGetTableMeta(SCatalog* pCatalog, SRequestConnInfo* pConn, const SName* pTableName,
STableMeta** pTableMeta); STableMeta** pTableMeta);
int32_t catalogGetCachedTableMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName,
STableMeta** pTableMeta);
/** /**
* Get a super table's meta data. * Get a super table's meta data.
@ -200,8 +198,6 @@ int32_t catalogGetCachedTableMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const
*/ */
int32_t catalogGetSTableMeta(SCatalog* pCatalog, SRequestConnInfo* pConn, const SName* pTableName, int32_t catalogGetSTableMeta(SCatalog* pCatalog, SRequestConnInfo* pConn, const SName* pTableName,
STableMeta** pTableMeta); STableMeta** pTableMeta);
int32_t catalogGetCachedSTableMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName,
STableMeta** pTableMeta);
int32_t catalogUpdateTableMeta(SCatalog* pCatalog, STableMetaRsp* rspMsg); int32_t catalogUpdateTableMeta(SCatalog* pCatalog, STableMetaRsp* rspMsg);
@ -211,7 +207,7 @@ int32_t catalogGetCachedTableMeta(SCatalog* pCtg, const SName* pTableName, STabl
int32_t catalogGetCachedSTableMeta(SCatalog* pCtg, const SName* pTableName, STableMeta** pTableMeta); int32_t catalogGetCachedSTableMeta(SCatalog* pCtg, const SName* pTableName, STableMeta** pTableMeta);
int32_t catalogGetCachedTableHashVgroup(SCatalog* pCtg, const SName* pTableName, SVgroupInfo* pVgroup, bool* exists); int32_t catalogGetCachedTableHashVgroup(SCatalog* pCtg, const SName* pTableName, SVgroupInfo* pVgroup, bool* exists);
/** /**
* Force refresh DB's local cached vgroup info. * Force refresh DB's local cached vgroup info.
@ -271,8 +267,7 @@ int32_t catalogGetTableDistVgInfo(SCatalog* pCatalog, SRequestConnInfo* pConn, c
* @return error code * @return error code
*/ */
int32_t catalogGetTableHashVgroup(SCatalog* pCatalog, SRequestConnInfo* pConn, const SName* pName, SVgroupInfo* vgInfo); int32_t catalogGetTableHashVgroup(SCatalog* pCatalog, SRequestConnInfo* pConn, const SName* pName, SVgroupInfo* vgInfo);
int32_t catalogGetCachedTableHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName,
SVgroupInfo* pVgroup, bool* exists);
/** /**
* Get all meta data required in pReq. * Get all meta data required in pReq.
* @param pCatalog (input, got with catalogGetHandle) * @param pCatalog (input, got with catalogGetHandle)
@ -312,8 +307,8 @@ int32_t catalogGetUdfInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const char* f
int32_t catalogChkAuth(SCatalog* pCtg, SRequestConnInfo* pConn, const char* user, const char* dbFName, AUTH_TYPE type, int32_t catalogChkAuth(SCatalog* pCtg, SRequestConnInfo* pConn, const char* user, const char* dbFName, AUTH_TYPE type,
bool* pass); bool* pass);
int32_t catalogChkAuthFromCache(SCatalog* pCtg, const char* user, const char* dbFName, AUTH_TYPE type, int32_t catalogChkAuthFromCache(SCatalog* pCtg, const char* user, const char* dbFName, AUTH_TYPE type, bool* pass,
bool* pass, bool* exists); bool* exists);
int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth); int32_t catalogUpdateUserAuthInfo(SCatalog* pCtg, SGetUserAuthRsp* pAuth);
@ -329,9 +324,9 @@ SMetaData* catalogCloneMetaData(SMetaData* pData);
void catalogFreeMetaData(SMetaData* pData); void catalogFreeMetaData(SMetaData* pData);
int32_t ctgdEnableDebug(char *option, bool enable); int32_t ctgdEnableDebug(char* option, bool enable);
int32_t ctgdHandleDbgCommand(char *command); int32_t ctgdHandleDbgCommand(char* command);
/** /**
* Destroy catalog and relase all resources * Destroy catalog and relase all resources

View File

@ -740,42 +740,46 @@ static int32_t parseUsingClauseBottom(SInsertParseContext* pCxt, SVnodeModifOpSt
return code; return code;
} }
static int32_t checkAuth(SParseContext* pCxt, SName* pTbName) { static int32_t checkAuth(SParseContext* pCxt, SName* pTbName, bool* pMissCache) {
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pTbName, dbFName); tNameGetFullDbName(pTbName, dbFName);
SRequestConnInfo conn = {.pTrans = pCxt->pTransporter, int32_t code = TSDB_CODE_SUCCESS;
.requestId = pCxt->requestId, bool pass = true;
.requestObjRefId = pCxt->requestRid, bool exists = true;
.mgmtEps = pCxt->mgmtEpSet};
int32_t code = TSDB_CODE_SUCCESS;
bool pass = true;
if (pCxt->async) { if (pCxt->async) {
// todo replace with cached api code = catalogChkAuthFromCache(pCxt->pCatalog, pCxt->pUser, dbFName, AUTH_TYPE_WRITE, &pass, &exists);
code = catalogChkAuth(pCxt->pCatalog, &conn, pCxt->pUser, dbFName, AUTH_TYPE_WRITE, &pass);
} else { } else {
SRequestConnInfo conn = {.pTrans = pCxt->pTransporter,
.requestId = pCxt->requestId,
.requestObjRefId = pCxt->requestRid,
.mgmtEps = pCxt->mgmtEpSet};
code = catalogChkAuth(pCxt->pCatalog, &conn, pCxt->pUser, dbFName, AUTH_TYPE_WRITE, &pass); code = catalogChkAuth(pCxt->pCatalog, &conn, pCxt->pUser, dbFName, AUTH_TYPE_WRITE, &pass);
} }
if (TSDB_CODE_SUCCESS == code && !pass) { if (TSDB_CODE_SUCCESS == code) {
code = TSDB_CODE_PAR_PERMISSION_DENIED; if (!exists) {
*pMissCache = true;
} else if (!pass) {
code = TSDB_CODE_PAR_PERMISSION_DENIED;
}
} }
return code; return code;
} }
static int32_t getTableMeta(SInsertParseContext* pCxt, SName* pTbName, bool isStb, STableMeta** pTableMeta, static int32_t getTableMeta(SInsertParseContext* pCxt, SName* pTbName, bool isStb, STableMeta** pTableMeta,
bool* pMissCache) { bool* pMissCache) {
SParseContext* pComCxt = pCxt->pComCxt; SParseContext* pComCxt = pCxt->pComCxt;
SRequestConnInfo conn = {.pTrans = pComCxt->pTransporter, int32_t code = TSDB_CODE_SUCCESS;
.requestId = pComCxt->requestId,
.requestObjRefId = pComCxt->requestRid,
.mgmtEps = pComCxt->mgmtEpSet};
int32_t code = TSDB_CODE_SUCCESS;
if (pComCxt->async) { if (pComCxt->async) {
if (isStb) { if (isStb) {
code = catalogGetCachedSTableMeta(pComCxt->pCatalog, &conn, pTbName, pTableMeta); code = catalogGetCachedSTableMeta(pComCxt->pCatalog, pTbName, pTableMeta);
} else { } else {
code = catalogGetCachedTableMeta(pComCxt->pCatalog, &conn, pTbName, pTableMeta); code = catalogGetCachedTableMeta(pComCxt->pCatalog, pTbName, pTableMeta);
} }
} else { } else {
SRequestConnInfo conn = {.pTrans = pComCxt->pTransporter,
.requestId = pComCxt->requestId,
.requestObjRefId = pComCxt->requestRid,
.mgmtEps = pComCxt->mgmtEpSet};
if (isStb) { if (isStb) {
code = catalogGetSTableMeta(pComCxt->pCatalog, &conn, pTbName, pTableMeta); code = catalogGetSTableMeta(pComCxt->pCatalog, &conn, pTbName, pTableMeta);
} else { } else {
@ -793,16 +797,16 @@ static int32_t getTableMeta(SInsertParseContext* pCxt, SName* pTbName, bool isSt
} }
static int32_t getTableVgroup(SParseContext* pCxt, SVnodeModifOpStmt* pStmt, bool isStb, bool* pMissCache) { static int32_t getTableVgroup(SParseContext* pCxt, SVnodeModifOpStmt* pStmt, bool isStb, bool* pMissCache) {
SRequestConnInfo conn = {.pTrans = pCxt->pTransporter, int32_t code = TSDB_CODE_SUCCESS;
.requestId = pCxt->requestId, SVgroupInfo vg;
.requestObjRefId = pCxt->requestRid, bool exists = true;
.mgmtEps = pCxt->mgmtEpSet};
int32_t code = TSDB_CODE_SUCCESS;
SVgroupInfo vg;
bool exists = true;
if (pCxt->async) { if (pCxt->async) {
code = catalogGetCachedTableHashVgroup(pCxt->pCatalog, &conn, &pStmt->targetTableName, &vg, &exists); code = catalogGetCachedTableHashVgroup(pCxt->pCatalog, &pStmt->targetTableName, &vg, &exists);
} else { } else {
SRequestConnInfo conn = {.pTrans = pCxt->pTransporter,
.requestId = pCxt->requestId,
.requestObjRefId = pCxt->requestRid,
.mgmtEps = pCxt->mgmtEpSet};
code = catalogGetTableHashVgroup(pCxt->pCatalog, &conn, &pStmt->targetTableName, &vg); code = catalogGetTableHashVgroup(pCxt->pCatalog, &conn, &pStmt->targetTableName, &vg);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
@ -818,8 +822,8 @@ static int32_t getTableVgroup(SParseContext* pCxt, SVnodeModifOpStmt* pStmt, boo
} }
static int32_t getTargetTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { static int32_t getTargetTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) {
int32_t code = checkAuth(pCxt->pComCxt, &pStmt->targetTableName); int32_t code = checkAuth(pCxt->pComCxt, &pStmt->targetTableName, &pCxt->missCache);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
code = getTableMeta(pCxt, &pStmt->targetTableName, false, &pStmt->pTableMeta, &pCxt->missCache); code = getTableMeta(pCxt, &pStmt->targetTableName, false, &pStmt->pTableMeta, &pCxt->missCache);
} }
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
@ -833,8 +837,8 @@ static int32_t preParseUsingTableName(SInsertParseContext* pCxt, SVnodeModifOpSt
} }
static int32_t getUsingTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) { static int32_t getUsingTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) {
int32_t code = checkAuth(pCxt->pComCxt, &pStmt->targetTableName); int32_t code = checkAuth(pCxt->pComCxt, &pStmt->targetTableName, &pCxt->missCache);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
code = getTableMeta(pCxt, &pStmt->usingTableName, true, &pStmt->pTableMeta, &pCxt->missCache); code = getTableMeta(pCxt, &pStmt->usingTableName, true, &pStmt->pTableMeta, &pCxt->missCache);
} }
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) { if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {