enh: merge getmetafromcache and getvgroupfromcache interface
This commit is contained in:
parent
1138d8abb2
commit
d5e3d750d1
|
@ -563,7 +563,8 @@ static int32_t parseTagValue(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt
|
|||
|
||||
static void buildCreateTbReq(SVnodeModifOpStmt* pStmt, STag* pTag, SArray* pTagName) {
|
||||
insBuildCreateTbReq(&pStmt->createTblReq, pStmt->targetTableName.tname, pTag, pStmt->pTableMeta->suid,
|
||||
pStmt->usingTableName.tname, pTagName, pStmt->pTableMeta->tableInfo.numOfTags, TSDB_DEFAULT_TABLE_TTL);
|
||||
pStmt->usingTableName.tname, pTagName, pStmt->pTableMeta->tableInfo.numOfTags,
|
||||
TSDB_DEFAULT_TABLE_TTL);
|
||||
}
|
||||
|
||||
static int32_t checkAndTrimValue(SToken* pToken, char* tmpTokenBuf, SMsgBuf* pMsgBuf) {
|
||||
|
@ -829,6 +830,33 @@ static int32_t getTableVgroup(SParseContext* pCxt, SVnodeModifOpStmt* pStmt, boo
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t getTableMetaAndVgroupImpl(SParseContext* pCxt, SVnodeModifOpStmt* pStmt, bool* pMissCache) {
|
||||
SVgroupInfo vg;
|
||||
bool exists = true;
|
||||
int32_t code = catalogGetCachedTableHashVgroup(pCxt->pCatalog, &pStmt->targetTableName, &vg, &exists);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
if (exists) {
|
||||
code = taosHashPut(pStmt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg));
|
||||
}
|
||||
*pMissCache = !exists;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t getTableMetaAndVgroup(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt, bool* pMissCache) {
|
||||
SParseContext* pComCxt = pCxt->pComCxt;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (pComCxt->async) {
|
||||
code = getTableMetaAndVgroupImpl(pComCxt->pCatalog, pStmt, pMissCache);
|
||||
} else {
|
||||
code = getTableMeta(pCxt, &pStmt->targetTableName, false, &pStmt->pTableMeta, pMissCache);
|
||||
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
||||
code = getTableVgroup(pCxt->pComCxt, pStmt, false, &pCxt->missCache);
|
||||
}
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t getTargetTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) {
|
||||
if (pCxt->forceUpdate) {
|
||||
pCxt->missCache = true;
|
||||
|
@ -836,11 +864,14 @@ static int32_t getTargetTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt
|
|||
}
|
||||
|
||||
int32_t code = checkAuth(pCxt->pComCxt, &pStmt->targetTableName, &pCxt->missCache);
|
||||
// if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
||||
// code = getTableMeta(pCxt, &pStmt->targetTableName, false, &pStmt->pTableMeta, &pCxt->missCache);
|
||||
// }
|
||||
// if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
||||
// code = getTableVgroup(pCxt->pComCxt, pStmt, false, &pCxt->missCache);
|
||||
// }
|
||||
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
||||
code = getTableMeta(pCxt, &pStmt->targetTableName, false, &pStmt->pTableMeta, &pCxt->missCache);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
||||
code = getTableVgroup(pCxt->pComCxt, pStmt, false, &pCxt->missCache);
|
||||
code = getTableMetaAndVgroup(pCxt, pStmt, &pCxt->missCache);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -933,11 +964,10 @@ static int32_t getTableDataBlocks(SInsertParseContext* pCxt, SVnodeModifOpStmt*
|
|||
if (pStmt->usingTableProcessing) {
|
||||
pStmt->pTableMeta->uid = 0;
|
||||
}
|
||||
|
||||
return insGetDataBlockFromList(pStmt->pTableBlockHashObj, &uid, sizeof(pStmt->pTableMeta->uid),
|
||||
TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk),
|
||||
getTableInfo(pStmt->pTableMeta).rowSize, pStmt->pTableMeta, pDataBuf, NULL,
|
||||
&pStmt->createTblReq);
|
||||
|
||||
return insGetDataBlockFromList(
|
||||
pStmt->pTableBlockHashObj, &uid, sizeof(pStmt->pTableMeta->uid), TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk),
|
||||
getTableInfo(pStmt->pTableMeta).rowSize, pStmt->pTableMeta, pDataBuf, NULL, &pStmt->createTblReq);
|
||||
}
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||
tNameExtractFullName(&pStmt->targetTableName, tbFName);
|
||||
|
@ -1540,8 +1570,9 @@ static int32_t setStmtInfo(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt)
|
|||
memcpy(tags, &pCxt->tags, sizeof(pCxt->tags));
|
||||
|
||||
SStmtCallback* pStmtCb = pCxt->pComCxt->pStmtCb;
|
||||
int32_t code = (*pStmtCb->setInfoFn)(pStmtCb->pStmt, pStmt->pTableMeta, tags, &pStmt->targetTableName, pStmt->usingTableProcessing,
|
||||
pStmt->pVgroupsHashObj, pStmt->pTableBlockHashObj, pStmt->usingTableName.tname);
|
||||
int32_t code = (*pStmtCb->setInfoFn)(pStmtCb->pStmt, pStmt->pTableMeta, tags, &pStmt->targetTableName,
|
||||
pStmt->usingTableProcessing, pStmt->pVgroupsHashObj, pStmt->pTableBlockHashObj,
|
||||
pStmt->usingTableName.tname);
|
||||
|
||||
memset(&pCxt->tags, 0, sizeof(pCxt->tags));
|
||||
pStmt->pVgroupsHashObj = NULL;
|
||||
|
@ -1915,13 +1946,11 @@ static int32_t setNextStageInfo(SInsertParseContext* pCxt, SQuery* pQuery, SCata
|
|||
}
|
||||
|
||||
int32_t parseInsertSql(SParseContext* pCxt, SQuery** pQuery, SCatalogReq* pCatalogReq, const SMetaData* pMetaData) {
|
||||
SInsertParseContext context = {
|
||||
.pComCxt = pCxt,
|
||||
.msg = {.buf = pCxt->pMsg, .len = pCxt->msgLen},
|
||||
.missCache = false,
|
||||
.usingDuplicateTable = false,
|
||||
.forceUpdate = (NULL != pCatalogReq ? pCatalogReq->forceUpdate : false)
|
||||
};
|
||||
SInsertParseContext context = {.pComCxt = pCxt,
|
||||
.msg = {.buf = pCxt->pMsg, .len = pCxt->msgLen},
|
||||
.missCache = false,
|
||||
.usingDuplicateTable = false,
|
||||
.forceUpdate = (NULL != pCatalogReq ? pCatalogReq->forceUpdate : false)};
|
||||
|
||||
int32_t code = initInsertQuery(&context, pCatalogReq, pMetaData, pQuery);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
|
Loading…
Reference in New Issue