From b6b4563ac7b2e49bd36b0bc794223221ce175460 Mon Sep 17 00:00:00 2001 From: dapan Date: Sat, 5 Mar 2022 10:54:36 +0800 Subject: [PATCH] feature/scheduler --- include/common/tmsg.h | 3 ++- include/libs/qcom/query.h | 3 ++- source/client/src/clientHb.c | 1 + source/libs/catalog/src/catalog.c | 14 +++++++++++--- source/libs/qcom/src/querymsg.c | 2 ++ 5 files changed, 18 insertions(+), 5 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 883ebdd0cd..2e4d3ff44e 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -171,6 +171,7 @@ typedef struct { char db[TSDB_DB_FNAME_LEN]; int64_t dbId; int32_t vgVersion; + int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT } SBuildUseDBInput; typedef struct SField { @@ -568,7 +569,7 @@ typedef struct { char db[TSDB_DB_FNAME_LEN]; int64_t dbId; int32_t vgVersion; - int32_t numOfTable; + int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT } SUseDbReq; int32_t tSerializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq); diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index ad1f988f4c..84c246dbf5 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -82,6 +82,7 @@ typedef struct STableMeta { typedef struct SDBVgInfo { int32_t vgVersion; int8_t hashMethod; + int32_t numOfTable; // DB's table num, unit is TSDB_TABLE_NUM_UNIT SHashObj *vgHash; //key:vgId, value:SVgroupInfo } SDBVgInfo; @@ -133,7 +134,7 @@ typedef struct SQueryNodeAddr { } SQueryNodeAddr; typedef struct SQueryNodeStat { - double tableNum; //table number in million + double tableNum; // vg table number, unit is TSDB_TABLE_NUM_UNIT } SQueryNodeStat; int32_t initTaskQueue(); diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index e9e64a78b8..d32b2a7bd5 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -224,6 +224,7 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl SDbVgVersion *db = &dbs[i]; db->dbId = htobe64(db->dbId); db->vgVersion = htonl(db->vgVersion); + db->numOfTable = htonl(db->numOfTable); } SKv kv = {.key = HEARTBEAT_KEY_DBINFO, .valueLen = sizeof(SDbVgVersion) * dbNum, .value = dbs}; diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 5779906761..f857ea95c3 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -1331,7 +1331,7 @@ int32_t ctgUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId, SD } bool newAdded = false; - SDbVgVersion vgVersion = {.dbId = dbId, .vgVersion = dbInfo->vgVersion}; + SDbVgVersion vgVersion = {.dbId = dbId, .vgVersion = dbInfo->vgVersion, .numOfTable = dbInfo->numOfTable}; SCtgDBCache *dbCache = NULL; CTG_ERR_RET(ctgGetAddDBCache(pCtg, dbFName, dbId, &dbCache)); @@ -1344,8 +1344,15 @@ int32_t ctgUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId, SD CTG_ERR_RET(ctgWAcquireVgInfo(pCtg, dbCache)); if (dbCache->vgInfo) { - if (dbInfo->vgVersion <= dbCache->vgInfo->vgVersion) { - ctgInfo("db vgVersion is old, dbFName:%s, vgVersion:%d, currentVersion:%d", dbFName, dbInfo->vgVersion, dbCache->vgInfo->vgVersion); + if (dbInfo->vgVersion < dbCache->vgInfo->vgVersion) { + ctgDebug("db vgVersion is old, dbFName:%s, vgVersion:%d, currentVersion:%d", dbFName, dbInfo->vgVersion, dbCache->vgInfo->vgVersion); + ctgWReleaseVgInfo(dbCache); + + return TSDB_CODE_SUCCESS; + } + + if (dbInfo->vgVersion == dbCache->vgInfo->vgVersion && dbInfo->numOfTable == dbCache->vgInfo->numOfTable) { + ctgDebug("no new db vgVersion or numOfTable, dbFName:%s, vgVersion:%d, numOfTable:%d", dbFName, dbInfo->vgVersion, dbInfo->numOfTable); ctgWReleaseVgInfo(dbCache); return TSDB_CODE_SUCCESS; @@ -1511,6 +1518,7 @@ int32_t ctgGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const if (inCache) { input.dbId = (*dbCache)->dbId; input.vgVersion = (*dbCache)->vgInfo->vgVersion; + input.numOfTable = (*dbCache)->vgInfo->numOfTable; } else { input.vgVersion = CTG_DEFAULT_INVALID_VERSION; } diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index 41c20ad264..ab0bbd319a 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -42,6 +42,7 @@ int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp) { for (int32_t i = 0; i < usedbRsp->vgNum; ++i) { SVgroupInfo *pVgInfo = taosArrayGet(usedbRsp->pVgroupInfos, i); + pOut->dbVgroup->numOfTable += pVgInfo->numOfTable; if (0 != taosHashPut(pOut->dbVgroup->vgHash, &pVgInfo->vgId, sizeof(int32_t), pVgInfo, sizeof(SVgroupInfo))) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } @@ -84,6 +85,7 @@ int32_t queryBuildUseDbMsg(void *input, char **msg, int32_t msgSize, int32_t *ms usedbReq.db[sizeof(usedbReq.db) - 1] = 0; usedbReq.vgVersion = pInput->vgVersion; usedbReq.dbId = pInput->dbId; + usedbReq.numOfTable = pInput->numOfTable; int32_t bufLen = tSerializeSUseDbReq(NULL, 0, &usedbReq); void *pBuf = rpcMallocCont(bufLen);