fix query information_schema report sync leader unreachable

This commit is contained in:
wangjiaming0909 2025-02-08 16:19:02 +08:00
parent db807020a0
commit 490bce2605
7 changed files with 66 additions and 20 deletions

View File

@ -1119,7 +1119,7 @@ void ctgFreeBatch(SCtgBatch* pBatch);
void ctgFreeBatchs(SHashObj* pBatchs);
int32_t ctgCloneVgInfo(SDBVgInfo* src, SDBVgInfo** dst);
int32_t ctgCloneMetaOutput(STableMetaOutput* output, STableMetaOutput** pOutput);
int32_t ctgGenerateVgList(SCatalog* pCtg, SHashObj* vgHash, SArray** pList);
int32_t ctgGenerateVgList(SCatalog* pCtg, SHashObj* vgHash, SArray** pList, const char* dbFName);
void ctgFreeJob(void* job);
void ctgFreeHandleImpl(SCatalog* pCtg);
int32_t ctgGetVgInfoFromHashValue(SCatalog* pCtg, SEpSet* pMgmtEps, SDBVgInfo* dbInfo, const SName* pTableName,
@ -1159,7 +1159,8 @@ int32_t ctgGetTbHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SName*
int32_t ctgGetFetchName(SArray* pNames, SCtgFetch* pFetch, SName** ppName);
int32_t ctgdGetOneHandle(SCatalog** pHandle);
int ctgVgInfoComp(const void* lp, const void* rp);
int32_t ctgMakeVgArray(SDBVgInfo* dbInfo);
int32_t ctgMakeVgArray(SDBVgInfo* dbInfo, const char* dbName, bool isFullName);
int32_t ctgMakeVgArraySortBy(SDBVgInfo* dbInfo, __compar_fn_t sort_func);
int32_t ctgChkSetAuthRes(SCatalog* pCtg, SCtgAuthReq* req, SCtgAuthRsp* res);
int32_t ctgReadDBCfgFromCache(SCatalog* pCtg, const char* dbFName, SDbCfgInfo* pDbCfg);

View File

@ -505,7 +505,7 @@ int32_t ctgGetTbDistVgInfo(SCatalog* pCtg, SRequestConnInfo* pConn, SName* pTabl
}
if (tbMeta->tableType == TSDB_SUPER_TABLE) {
CTG_ERR_JRET(ctgGenerateVgList(pCtg, vgHash, pVgList));
CTG_ERR_JRET(ctgGenerateVgList(pCtg, vgHash, pVgList, db));
} else {
// USE HASH METHOD INSTEAD OF VGID IN TBMETA
ctgError("invalid method to get none stb vgInfo, tbType:%d", tbMeta->tableType);
@ -1029,7 +1029,7 @@ int32_t catalogGetDBVgList(SCatalog* pCtg, SRequestConnInfo* pConn, const char*
vgHash = vgInfo->vgHash;
}
CTG_ERR_JRET(ctgGenerateVgList(pCtg, vgHash, &vgList));
CTG_ERR_JRET(ctgGenerateVgList(pCtg, vgHash, &vgList, dbFName));
*vgroupList = vgList;
vgList = NULL;

View File

@ -1970,7 +1970,7 @@ static int32_t ctgHandleGetTbNamesRsp(SCtgTaskReq* tReq, int32_t reqType, const
switch (reqType) {
case TDMT_MND_USE_DB: {
SUseDbOutput* pOut = (SUseDbOutput*)pMsgCtx->out;
CTG_ERR_RET(ctgMakeVgArray(pOut->dbVgroup));
CTG_ERR_RET(ctgMakeVgArray(pOut->dbVgroup, pName->dbname, false));
SArray* pVgArray = NULL;
TSWAP(pVgArray, pOut->dbVgroup->vgArray);
int32_t vgSize = taosArrayGetSize(pVgArray);
@ -2142,7 +2142,7 @@ int32_t ctgHandleGetDbVgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf*
SUseDbOutput* pOut = (SUseDbOutput*)pTask->msgCtx.out;
SDBVgInfo* pDb = NULL;
CTG_ERR_JRET(ctgGenerateVgList(pCtg, pOut->dbVgroup->vgHash, (SArray**)&pTask->res));
CTG_ERR_JRET(ctgGenerateVgList(pCtg, pOut->dbVgroup->vgHash, (SArray**)&pTask->res, ctx->dbFName));
CTG_ERR_JRET(cloneDbVgInfo(pOut->dbVgroup, &pDb));
CTG_ERR_JRET(ctgUpdateVgroupEnqueue(pCtg, ctx->dbFName, pOut->dbId, pDb, false));
@ -3175,7 +3175,7 @@ int32_t ctgLaunchGetDbVgTask(SCtgTask* pTask) {
CTG_ERR_JRET(ctgBuildUseDbOutput((SUseDbOutput**)&pMsgCtx->out, dbCache->vgCache.vgInfo));
}
CTG_ERR_JRET(ctgGenerateVgList(pCtg, dbCache->vgCache.vgInfo->vgHash, (SArray**)&pTask->res));
CTG_ERR_JRET(ctgGenerateVgList(pCtg, dbCache->vgCache.vgInfo->vgHash, (SArray**)&pTask->res, pCtx->dbFName));
ctgReleaseVgInfoToCache(pCtg, dbCache);
dbCache = NULL;

View File

@ -1116,7 +1116,7 @@ int32_t ctgUpdateVgroupEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId
dbFName = p + 1;
}
code = ctgMakeVgArray(dbInfo);
code = ctgMakeVgArray(dbInfo, dbFName, false);
if (code) {
taosMemoryFree(op);
taosMemoryFree(msg);
@ -2082,7 +2082,7 @@ void ctgFreeAllHandles(void) {
taosHashClear(gCtgMgmt.pCluster);
}
int32_t ctgVgInfoIdComp(void const *lp, void const *rp) {
int32_t ctgVgInfoIdCompId(void const *lp, void const *rp) {
int32_t *key = (int32_t *)lp;
SVgroupInfo *pVg = (SVgroupInfo *)rp;
@ -2695,7 +2695,7 @@ int32_t ctgOpUpdateEpset(SCtgCacheOperation *operation) {
goto _return;
}
SVgroupInfo *pInfo2 = taosArraySearch(vgInfo->vgArray, &msg->vgId, ctgVgInfoIdComp, TD_EQ);
SVgroupInfo *pInfo2 = taosArraySearch(vgInfo->vgArray, &msg->vgId, ctgVgInfoIdCompId, TD_EQ);
if (NULL == pInfo2) {
ctgDebug("no vgroup %d in db %s vgArray, ignore epset update", msg->vgId, msg->dbFName);
goto _return;

View File

@ -1147,12 +1147,28 @@ int32_t ctgGetHashFunction(int8_t hashMethod, tableNameHashFp* fp) {
return TSDB_CODE_SUCCESS;
}
int32_t ctgGenerateVgList(SCatalog* pCtg, SHashObj* vgHash, SArray** pList) {
int32_t ctgVgInfoIdComp(void const *lp, void const *rp) {
SVgroupInfo* pVg1 = (SVgroupInfo*)lp;
SVgroupInfo* pVg2 = (SVgroupInfo*)rp;
if (pVg1->vgId < pVg2->vgId) {
return -1;
} else if (pVg1->vgId > pVg2->vgId) {
return 1;
}
return 0;
}
int32_t ctgGenerateVgList(SCatalog* pCtg, SHashObj* vgHash, SArray** pList, const char* dbFName) {
SHashObj* vgroupHash = NULL;
SVgroupInfo* vgInfo = NULL;
SArray* vgList = NULL;
int32_t code = 0;
int32_t vgNum = taosHashGetSize(vgHash);
SName name = {0};
code = tNameFromString(&name, dbFName, T_NAME_ACCT | T_NAME_DB);
CTG_ERR_RET(code);
vgList = taosArrayInit(vgNum, sizeof(SVgroupInfo));
if (NULL == vgList) {
@ -1173,7 +1189,10 @@ int32_t ctgGenerateVgList(SCatalog* pCtg, SHashObj* vgHash, SArray** pList) {
pIter = taosHashIterate(vgHash, pIter);
}
taosArraySort(vgList, ctgVgInfoComp);
if (IS_SYS_DBNAME(name.dbname))
taosArraySort(vgList, ctgVgInfoIdComp);
else
taosArraySort(vgList, ctgVgInfoComp);
*pList = vgList;
@ -1218,7 +1237,7 @@ int32_t ctgHashValueComp(void const* lp, void const* rp) {
int32_t ctgGetVgInfoFromHashValue(SCatalog* pCtg, SEpSet* pMgmtEps, SDBVgInfo* dbInfo, const SName* pTableName,
SVgroupInfo* pVgroup) {
int32_t code = 0;
CTG_ERR_RET(ctgMakeVgArray(dbInfo));
CTG_ERR_RET(ctgMakeVgArray(dbInfo, pTableName->dbname, false));
int32_t vgNum = taosArrayGetSize(dbInfo->vgArray);
char db[TSDB_DB_FNAME_LEN] = {0};
@ -1288,7 +1307,7 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog* pCtg, SEpSet* pMgmgEpSet, SCtgTaskR
SMetaRes res = {0};
SVgroupInfo* vgInfo = NULL;
CTG_ERR_RET(ctgMakeVgArray(dbInfo));
CTG_ERR_RET(ctgMakeVgArray(dbInfo, dbFName, true));
int32_t tbNum = taosArrayGetSize(pNames);
@ -1450,7 +1469,7 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog* pCtg, SEpSet* pMgmgEpSet, SCtgTaskR
int32_t ctgGetVgIdsFromHashValue(SCatalog* pCtg, SDBVgInfo* dbInfo, char* dbFName, const char* pTbs[], int32_t tbNum,
int32_t* vgId) {
int32_t code = 0;
CTG_ERR_RET(ctgMakeVgArray(dbInfo));
CTG_ERR_RET(ctgMakeVgArray(dbInfo, dbFName, true));
int32_t vgNum = taosArrayGetSize(dbInfo->vgArray);
if (vgNum <= 0) {
@ -1563,7 +1582,22 @@ int32_t ctgTSMAVersionSortCompare(const void* key1, const void* key2) {
}
}
int32_t ctgMakeVgArray(SDBVgInfo* dbInfo) {
int32_t ctgMakeVgArray(SDBVgInfo* dbInfo, const char* dbName, bool isFullName) {
__compar_fn_t sortFunc = ctgVgInfoComp;
if (dbName) {
const char* realDbName = dbName;
SName name = {0};
if (isFullName) {
int32_t code = tNameFromString(&name, dbName, T_NAME_ACCT | T_NAME_DB);
CTG_ERR_RET(code);
realDbName = name.dbname;
}
if (IS_SYS_DBNAME(realDbName)) sortFunc = ctgVgInfoIdComp;
}
return ctgMakeVgArraySortBy(dbInfo, sortFunc);
}
int32_t ctgMakeVgArraySortBy(SDBVgInfo* dbInfo, __compar_fn_t sort_func) {
if (NULL == dbInfo) {
return TSDB_CODE_SUCCESS;
}
@ -1585,14 +1619,14 @@ int32_t ctgMakeVgArray(SDBVgInfo* dbInfo) {
pIter = taosHashIterate(dbInfo->vgHash, pIter);
}
taosArraySort(dbInfo->vgArray, ctgVgInfoComp);
taosArraySort(dbInfo->vgArray, sort_func);
}
return TSDB_CODE_SUCCESS;
}
int32_t ctgCloneVgInfo(SDBVgInfo* src, SDBVgInfo** dst) {
CTG_ERR_RET(ctgMakeVgArray(src));
CTG_ERR_RET(ctgMakeVgArray(src, NULL, false));
*dst = taosMemoryMalloc(sizeof(SDBVgInfo));
if (NULL == *dst) {

View File

@ -138,7 +138,9 @@ static uintptr_t getNextTimerId() {
return id;
}
static void timerAddRef(tmr_obj_t* timer) { (void)atomic_add_fetch_8(&timer->refCount, 1); }
static void timerAddRef(tmr_obj_t* timer) {
(void)atomic_add_fetch_8(&timer->refCount, 1);
}
static void timerDecRef(tmr_obj_t* timer) {
if (atomic_sub_fetch_8(&timer->refCount, 1) == 0) {

View File

@ -191,12 +191,21 @@ class TDTestCase:
tdSql.execute(create_db_replica_3_vgroups_100)
self.vote_leader_time_costs(db3)
def test_TS_5968(self):
conn = TDCom().newTdSql()
sql = "select db_name,sum(columns-1) from information_schema.ins_tables group by db_name"
conn.query(sql, queryTimes=10)
balance_sql = "balance vgroup leader database db_2"
tdSql.execute(balance_sql, queryTimes=1)
conn.query(sql, queryTimes=10)
tdLog.info("waiting for catalog update finished")
conn.close()
def run(self):
self.check_setup_cluster_status()
self.test_init_vgroups_time_costs()
self.test_TS_5968()
def stop(self):