enh: merge getmetafromcache and getvgroupfromcache interface
This commit is contained in:
parent
0a075cb57a
commit
44b2cd1b73
|
@ -832,13 +832,12 @@ static int32_t getTableVgroup(SParseContext* pCxt, SVnodeModifOpStmt* pStmt, boo
|
||||||
|
|
||||||
static int32_t getTableMetaAndVgroupImpl(SParseContext* pCxt, SVnodeModifOpStmt* pStmt, bool* pMissCache) {
|
static int32_t getTableMetaAndVgroupImpl(SParseContext* pCxt, SVnodeModifOpStmt* pStmt, bool* pMissCache) {
|
||||||
SVgroupInfo vg;
|
SVgroupInfo vg;
|
||||||
bool exists = true;
|
int32_t code = catalogGetCachedTableVgMeta(pCxt->pCatalog, &pStmt->targetTableName, &vg, &pStmt->pTableMeta);
|
||||||
int32_t code = catalogGetCachedTableHashVgroup(pCxt->pCatalog, &pStmt->targetTableName, &vg, &exists);
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
if (exists) {
|
if (NULL != pStmt->pTableMeta) {
|
||||||
code = taosHashPut(pStmt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg));
|
code = taosHashPut(pStmt->pVgroupsHashObj, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg));
|
||||||
}
|
}
|
||||||
*pMissCache = !exists;
|
*pMissCache = (NULL == pStmt->pTableMeta);
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -857,6 +856,18 @@ static int32_t getTableMetaAndVgroup(SInsertParseContext* pCxt, SVnodeModifOpStm
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t collectUseTable(const SName* pName, SHashObj* pTable) {
|
||||||
|
char fullName[TSDB_TABLE_FNAME_LEN];
|
||||||
|
tNameExtractFullName(pName, fullName);
|
||||||
|
return taosHashPut(pTable, fullName, strlen(fullName), pName, sizeof(SName));
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t collectUseDatabase(const SName* pName, SHashObj* pDbs) {
|
||||||
|
char dbFName[TSDB_DB_FNAME_LEN] = {0};
|
||||||
|
tNameGetFullDbName(pName, dbFName);
|
||||||
|
return taosHashPut(pDbs, dbFName, strlen(dbFName), dbFName, sizeof(dbFName));
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t getTargetTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) {
|
static int32_t getTargetTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt* pStmt) {
|
||||||
if (pCxt->forceUpdate) {
|
if (pCxt->forceUpdate) {
|
||||||
pCxt->missCache = true;
|
pCxt->missCache = true;
|
||||||
|
@ -864,15 +875,24 @@ static int32_t getTargetTableSchema(SInsertParseContext* pCxt, SVnodeModifOpStmt
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = checkAuth(pCxt->pComCxt, &pStmt->targetTableName, &pCxt->missCache);
|
int32_t code = checkAuth(pCxt->pComCxt, &pStmt->targetTableName, &pCxt->missCache);
|
||||||
// if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
#if 0
|
||||||
// code = getTableMeta(pCxt, &pStmt->targetTableName, false, &pStmt->pTableMeta, &pCxt->missCache);
|
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
||||||
// }
|
code = getTableMeta(pCxt, &pStmt->targetTableName, false, &pStmt->pTableMeta, &pCxt->missCache);
|
||||||
// if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
}
|
||||||
// code = getTableVgroup(pCxt->pComCxt, pStmt, false, &pCxt->missCache);
|
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
||||||
// }
|
code = getTableVgroup(pCxt->pComCxt, pStmt, false, &pCxt->missCache);
|
||||||
|
}
|
||||||
|
#else
|
||||||
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
||||||
code = getTableMetaAndVgroup(pCxt, pStmt, &pCxt->missCache);
|
code = getTableMetaAndVgroup(pCxt, pStmt, &pCxt->missCache);
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
||||||
|
code = collectUseDatabase(&pStmt->targetTableName, pStmt->pDbFNameHashObj);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
||||||
|
code = collectUseTable(&pStmt->targetTableName, pStmt->pTableNameHashObj);
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1807,16 +1827,25 @@ static int32_t initInsertQuery(SInsertParseContext* pCxt, SCatalogReq* pCatalogR
|
||||||
|
|
||||||
static int32_t setRefreshMate(SQuery* pQuery) {
|
static int32_t setRefreshMate(SQuery* pQuery) {
|
||||||
SVnodeModifOpStmt* pStmt = (SVnodeModifOpStmt*)pQuery->pRoot;
|
SVnodeModifOpStmt* pStmt = (SVnodeModifOpStmt*)pQuery->pRoot;
|
||||||
SName* pTable = taosHashIterate(pStmt->pTableNameHashObj, NULL);
|
|
||||||
while (NULL != pTable) {
|
if (taosHashGetSize(pStmt->pTableNameHashObj) > 0) {
|
||||||
taosArrayPush(pQuery->pTableList, pTable);
|
taosArrayDestroy(pQuery->pTableList);
|
||||||
pTable = taosHashIterate(pStmt->pTableNameHashObj, pTable);
|
pQuery->pTableList = taosArrayInit(taosHashGetSize(pStmt->pTableNameHashObj), sizeof(SName));
|
||||||
|
SName* pTable = taosHashIterate(pStmt->pTableNameHashObj, NULL);
|
||||||
|
while (NULL != pTable) {
|
||||||
|
taosArrayPush(pQuery->pTableList, pTable);
|
||||||
|
pTable = taosHashIterate(pStmt->pTableNameHashObj, pTable);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
char* pDb = taosHashIterate(pStmt->pDbFNameHashObj, NULL);
|
if (taosHashGetSize(pStmt->pDbFNameHashObj) > 0) {
|
||||||
while (NULL != pDb) {
|
taosArrayDestroy(pQuery->pDbList);
|
||||||
taosArrayPush(pQuery->pDbList, pDb);
|
pQuery->pDbList = taosArrayInit(taosHashGetSize(pStmt->pDbFNameHashObj), TSDB_DB_FNAME_LEN);
|
||||||
pDb = taosHashIterate(pStmt->pDbFNameHashObj, pDb);
|
char* pDb = taosHashIterate(pStmt->pDbFNameHashObj, NULL);
|
||||||
|
while (NULL != pDb) {
|
||||||
|
taosArrayPush(pQuery->pDbList, pDb);
|
||||||
|
pDb = taosHashIterate(pStmt->pDbFNameHashObj, pDb);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -1930,16 +1959,17 @@ static int32_t buildInsertCatalogReq(SInsertParseContext* pCxt, SVnodeModifOpStm
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t setNextStageInfo(SInsertParseContext* pCxt, SQuery* pQuery, SCatalogReq* pCatalogReq) {
|
static int32_t setNextStageInfo(SInsertParseContext* pCxt, SQuery* pQuery, SCatalogReq* pCatalogReq) {
|
||||||
|
SVnodeModifOpStmt* pStmt = (SVnodeModifOpStmt*)pQuery->pRoot;
|
||||||
if (pCxt->missCache) {
|
if (pCxt->missCache) {
|
||||||
parserDebug("0x%" PRIx64 " %d rows have been inserted before cache miss", pCxt->pComCxt->requestId,
|
parserDebug("0x%" PRIx64 " %d rows of %d tables have been inserted before cache miss", pCxt->pComCxt->requestId,
|
||||||
((SVnodeModifOpStmt*)pQuery->pRoot)->totalRowsNum);
|
pStmt->totalRowsNum, pStmt->totalTbNum);
|
||||||
|
|
||||||
pQuery->execStage = QUERY_EXEC_STAGE_PARSE;
|
pQuery->execStage = QUERY_EXEC_STAGE_PARSE;
|
||||||
return buildInsertCatalogReq(pCxt, (SVnodeModifOpStmt*)pQuery->pRoot, pCatalogReq);
|
return buildInsertCatalogReq(pCxt, pStmt, pCatalogReq);
|
||||||
}
|
}
|
||||||
|
|
||||||
parserDebug("0x%" PRIx64 " %d rows have been inserted", pCxt->pComCxt->requestId,
|
parserDebug("0x%" PRIx64 " %d rows of %d tables have been inserted", pCxt->pComCxt->requestId, pStmt->totalRowsNum,
|
||||||
((SVnodeModifOpStmt*)pQuery->pRoot)->totalRowsNum);
|
pStmt->totalTbNum);
|
||||||
|
|
||||||
pQuery->execStage = QUERY_EXEC_STAGE_SCHEDULE;
|
pQuery->execStage = QUERY_EXEC_STAGE_SCHEDULE;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
Loading…
Reference in New Issue