enh: optimize db vgroup cache and child table meta retrieving
This commit is contained in:
parent
051ac7d874
commit
76fd76cd1a
|
@ -129,6 +129,7 @@ typedef struct SDBVgInfo {
|
||||||
int32_t numOfTable; // DB's table num, unit is TSDB_TABLE_NUM_UNIT
|
int32_t numOfTable; // DB's table num, unit is TSDB_TABLE_NUM_UNIT
|
||||||
int64_t stateTs;
|
int64_t stateTs;
|
||||||
SHashObj* vgHash; // key:vgId, value:SVgroupInfo
|
SHashObj* vgHash; // key:vgId, value:SVgroupInfo
|
||||||
|
SArray* vgArray;
|
||||||
} SDBVgInfo;
|
} SDBVgInfo;
|
||||||
|
|
||||||
typedef struct SUseDbOutput {
|
typedef struct SUseDbOutput {
|
||||||
|
|
|
@ -764,8 +764,6 @@ void ctgFreeJob(void* job);
|
||||||
void ctgFreeHandleImpl(SCatalog* pCtg);
|
void ctgFreeHandleImpl(SCatalog* pCtg);
|
||||||
void ctgFreeVgInfo(SDBVgInfo* vgInfo);
|
void ctgFreeVgInfo(SDBVgInfo* vgInfo);
|
||||||
int32_t ctgGetVgInfoFromHashValue(SCatalog* pCtg, SDBVgInfo* dbInfo, const SName* pTableName, SVgroupInfo* pVgroup);
|
int32_t ctgGetVgInfoFromHashValue(SCatalog* pCtg, SDBVgInfo* dbInfo, const SName* pTableName, SVgroupInfo* pVgroup);
|
||||||
int32_t ctgGetVgInfoFromHashValue2(SCatalog* pCtg, SDBVgInfo* dbInfo, const SName* pTableName, SVgroupInfo* pVgroup);
|
|
||||||
|
|
||||||
int32_t ctgGetVgInfosFromHashValue(SCatalog* pCtg, SCtgTaskReq* tReq, SDBVgInfo* dbInfo, SCtgTbHashsCtx* pCtx,
|
int32_t ctgGetVgInfosFromHashValue(SCatalog* pCtg, SCtgTaskReq* tReq, SDBVgInfo* dbInfo, SCtgTbHashsCtx* pCtx,
|
||||||
char* dbFName, SArray* pNames, bool update);
|
char* dbFName, SArray* pNames, bool update);
|
||||||
void ctgResetTbMetaTask(SCtgTask* pTask);
|
void ctgResetTbMetaTask(SCtgTask* pTask);
|
||||||
|
@ -791,6 +789,7 @@ int32_t ctgRemoveTbMeta(SCatalog* pCtg, SName* pTableName);
|
||||||
int32_t ctgGetTbHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* pVgroup, bool* exists);
|
int32_t ctgGetTbHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* pVgroup, bool* exists);
|
||||||
SName* ctgGetFetchName(SArray* pNames, SCtgFetch* pFetch);
|
SName* ctgGetFetchName(SArray* pNames, SCtgFetch* pFetch);
|
||||||
int32_t ctgdGetOneHandle(SCatalog **pHandle);
|
int32_t ctgdGetOneHandle(SCatalog **pHandle);
|
||||||
|
int ctgVgInfoComp(const void* lp, const void* rp);
|
||||||
|
|
||||||
extern SCatalogMgmt gCtgMgmt;
|
extern SCatalogMgmt gCtgMgmt;
|
||||||
extern SCtgDebug gCTGDebug;
|
extern SCtgDebug gCTGDebug;
|
||||||
|
|
|
@ -536,7 +536,7 @@ int32_t ctgGetTbHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SName*
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
CTG_ERR_JRET(ctgGetVgInfoFromHashValue2(pCtg, vgInfo ? vgInfo : dbCache->vgCache.vgInfo, pTableName, pVgroup));
|
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, vgInfo ? vgInfo : dbCache->vgCache.vgInfo, pTableName, pVgroup));
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
|
|
|
@ -226,6 +226,7 @@ _return:
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
int32_t ctgAcquireStbMetaFromCache(SCatalog *pCtg, char *dbFName, uint64_t suid, SCtgDBCache **pDb, SCtgTbCache **pTb) {
|
int32_t ctgAcquireStbMetaFromCache(SCatalog *pCtg, char *dbFName, uint64_t suid, SCtgDBCache **pDb, SCtgTbCache **pTb) {
|
||||||
SCtgDBCache *dbCache = NULL;
|
SCtgDBCache *dbCache = NULL;
|
||||||
SCtgTbCache *pCache = NULL;
|
SCtgTbCache *pCache = NULL;
|
||||||
|
@ -276,9 +277,9 @@ _return:
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
int32_t ctgAcquireStbMetaFromCache(SCtgDBCache *dbCache, SCatalog *pCtg, char *dbFName, uint64_t suid, SCtgTbCache **pTb) {
|
||||||
int32_t ctgAcquireStbMetaFromCache2(SCtgDBCache *dbCache, SCatalog *pCtg, char *dbFName, uint64_t suid, SCtgTbCache **pTb) {
|
|
||||||
SCtgTbCache *pCache = NULL;
|
SCtgTbCache *pCache = NULL;
|
||||||
char *stName = taosHashAcquire(dbCache->stbCache, &suid, sizeof(suid));
|
char *stName = taosHashAcquire(dbCache->stbCache, &suid, sizeof(suid));
|
||||||
if (NULL == stName) {
|
if (NULL == stName) {
|
||||||
|
@ -437,7 +438,7 @@ int32_t ctgReadTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta **
|
||||||
ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", ctx->pName->tname,
|
ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", ctx->pName->tname,
|
||||||
ctx->tbInfo.tbType, dbFName);
|
ctx->tbInfo.tbType, dbFName);
|
||||||
|
|
||||||
ctgAcquireStbMetaFromCache2(dbCache, pCtg, dbFName, ctx->tbInfo.suid, &tbCache);
|
ctgAcquireStbMetaFromCache(dbCache, pCtg, dbFName, ctx->tbInfo.suid, &tbCache);
|
||||||
|
|
||||||
STableMeta *stbMeta = tbCache->pMeta;
|
STableMeta *stbMeta = tbCache->pMeta;
|
||||||
if (stbMeta->suid != ctx->tbInfo.suid) {
|
if (stbMeta->suid != ctx->tbInfo.suid) {
|
||||||
|
@ -503,10 +504,15 @@ int32_t ctgReadTbVerFromCache(SCatalog *pCtg, SName *pTableName, int32_t *sver,
|
||||||
|
|
||||||
// PROCESS FOR CHILD TABLE
|
// PROCESS FOR CHILD TABLE
|
||||||
|
|
||||||
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
|
//ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
|
||||||
|
if (tbCache) {
|
||||||
|
CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
|
||||||
|
taosHashRelease(dbCache->tbCache, tbCache);
|
||||||
|
}
|
||||||
|
|
||||||
ctgDebug("Got ctb %s ver from cache, will continue to get its stb ver, dbFName:%s", pTableName->tname, dbFName);
|
ctgDebug("Got ctb %s ver from cache, will continue to get its stb ver, dbFName:%s", pTableName->tname, dbFName);
|
||||||
|
|
||||||
ctgAcquireStbMetaFromCache(pCtg, dbFName, *suid, &dbCache, &tbCache);
|
ctgAcquireStbMetaFromCache(dbCache, pCtg, dbFName, *suid, &tbCache);
|
||||||
if (NULL == tbCache) {
|
if (NULL == tbCache) {
|
||||||
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
|
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
|
||||||
ctgDebug("stb 0x%" PRIx64 " meta not in cache", *suid);
|
ctgDebug("stb 0x%" PRIx64 " meta not in cache", *suid);
|
||||||
|
@ -846,6 +852,18 @@ int32_t ctgUpdateVgroupEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId
|
||||||
dbFName = p + 1;
|
dbFName = p + 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (dbInfo->vgHash && NULL == dbInfo->vgArray) {
|
||||||
|
dbInfo->vgArray = taosArrayInit(100, sizeof(SVgroupInfo));
|
||||||
|
|
||||||
|
void* pIter = taosHashIterate(dbInfo->vgHash, NULL);
|
||||||
|
while (pIter) {
|
||||||
|
taosArrayPush(dbInfo->vgArray, pIter);
|
||||||
|
pIter = taosHashIterate(dbInfo->vgHash, pIter);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArraySort(dbInfo->vgArray, ctgVgInfoComp);
|
||||||
|
}
|
||||||
|
|
||||||
tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
|
tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
|
||||||
msg->pCtg = pCtg;
|
msg->pCtg = pCtg;
|
||||||
msg->dbId = dbId;
|
msg->dbId = dbId;
|
||||||
|
@ -1592,6 +1610,20 @@ void ctgFreeAllInstance(void) {
|
||||||
taosHashClear(gCtgMgmt.pCluster);
|
taosHashClear(gCtgMgmt.pCluster);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t ctgVgInfoIdComp(void const* lp, void const* rp) {
|
||||||
|
int32_t* key = (int32_t*)lp;
|
||||||
|
SVgroupInfo* pVg = (SVgroupInfo*)rp;
|
||||||
|
|
||||||
|
if (*key < pVg->vgId) {
|
||||||
|
return -1;
|
||||||
|
} else if (*key > pVg->vgId) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
|
int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SCtgUpdateVgMsg *msg = operation->data;
|
SCtgUpdateVgMsg *msg = operation->data;
|
||||||
|
@ -1973,7 +2005,13 @@ int32_t ctgOpUpdateEpset(SCtgCacheOperation *operation) {
|
||||||
|
|
||||||
SVgroupInfo *pInfo = taosHashGet(vgInfo->vgHash, &msg->vgId, sizeof(msg->vgId));
|
SVgroupInfo *pInfo = taosHashGet(vgInfo->vgHash, &msg->vgId, sizeof(msg->vgId));
|
||||||
if (NULL == pInfo) {
|
if (NULL == pInfo) {
|
||||||
ctgDebug("no vgroup %d in db %s, ignore epset update", msg->vgId, msg->dbFName);
|
ctgDebug("no vgroup %d in db %s vgHash, ignore epset update", msg->vgId, msg->dbFName);
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
|
||||||
|
SVgroupInfo *pInfo2 = taosArraySearch(vgInfo->vgArray, &msg->vgId, ctgVgInfoIdComp, TD_EQ);
|
||||||
|
if (NULL == pInfo2) {
|
||||||
|
ctgDebug("no vgroup %d in db %s vgArray, ignore epset update", msg->vgId, msg->dbFName);
|
||||||
goto _return;
|
goto _return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1984,6 +2022,7 @@ int32_t ctgOpUpdateEpset(SCtgCacheOperation *operation) {
|
||||||
msg->epSet.numOfEps, pNewEp->fqdn, pNewEp->port, msg->dbFName);
|
msg->epSet.numOfEps, pNewEp->fqdn, pNewEp->port, msg->dbFName);
|
||||||
|
|
||||||
pInfo->epSet = msg->epSet;
|
pInfo->epSet = msg->epSet;
|
||||||
|
pInfo2->epSet = msg->epSet;
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
|
|
|
@ -236,10 +236,8 @@ void ctgFreeVgInfo(SDBVgInfo* vgInfo) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (vgInfo->vgHash) {
|
taosHashCleanup(vgInfo->vgHash);
|
||||||
taosHashCleanup(vgInfo->vgHash);
|
taosArrayDestroy(vgInfo->vgArray);
|
||||||
vgInfo->vgHash = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosMemoryFreeClear(vgInfo);
|
taosMemoryFreeClear(vgInfo);
|
||||||
}
|
}
|
||||||
|
@ -837,7 +835,7 @@ _return:
|
||||||
CTG_RET(code);
|
CTG_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
int ctgVgInfoComp2(const void* lp, const void* rp) {
|
int ctgVgInfoComp(const void* lp, const void* rp) {
|
||||||
SVgroupInfo* pLeft = (SVgroupInfo*)lp;
|
SVgroupInfo* pLeft = (SVgroupInfo*)lp;
|
||||||
SVgroupInfo* pRight = (SVgroupInfo*)rp;
|
SVgroupInfo* pRight = (SVgroupInfo*)rp;
|
||||||
if (pLeft->hashBegin < pRight->hashBegin) {
|
if (pLeft->hashBegin < pRight->hashBegin) {
|
||||||
|
@ -850,32 +848,6 @@ int ctgVgInfoComp2(const void* lp, const void* rp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ctgHashValueComp(void const* lp, void const* rp) {
|
int32_t ctgHashValueComp(void const* lp, void const* rp) {
|
||||||
uint32_t* key = (uint32_t*)lp;
|
|
||||||
SVgroupInfo* pVg = *(SVgroupInfo**)rp;
|
|
||||||
|
|
||||||
if (*key < pVg->hashBegin) {
|
|
||||||
return -1;
|
|
||||||
} else if (*key > pVg->hashEnd) {
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int ctgVgInfoComp(const void* lp, const void* rp) {
|
|
||||||
SVgroupInfo* pLeft = *(SVgroupInfo**)lp;
|
|
||||||
SVgroupInfo* pRight = *(SVgroupInfo**)rp;
|
|
||||||
if (pLeft->hashBegin < pRight->hashBegin) {
|
|
||||||
return -1;
|
|
||||||
} else if (pLeft->hashBegin > pRight->hashBegin) {
|
|
||||||
return 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
int32_t ctgHashValueComp2(void const* lp, void const* rp) {
|
|
||||||
uint32_t* key = (uint32_t*)lp;
|
uint32_t* key = (uint32_t*)lp;
|
||||||
SVgroupInfo* pVg = (SVgroupInfo*)rp;
|
SVgroupInfo* pVg = (SVgroupInfo*)rp;
|
||||||
|
|
||||||
|
@ -891,7 +863,7 @@ int32_t ctgHashValueComp2(void const* lp, void const* rp) {
|
||||||
int32_t ctgGetVgInfoFromHashValue(SCatalog* pCtg, SDBVgInfo* dbInfo, const SName* pTableName, SVgroupInfo* pVgroup) {
|
int32_t ctgGetVgInfoFromHashValue(SCatalog* pCtg, SDBVgInfo* dbInfo, const SName* pTableName, SVgroupInfo* pVgroup) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
int32_t vgNum = taosHashGetSize(dbInfo->vgHash);
|
int32_t vgNum = taosArrayGetSize(dbInfo->vgArray);
|
||||||
char db[TSDB_DB_FNAME_LEN] = {0};
|
char db[TSDB_DB_FNAME_LEN] = {0};
|
||||||
tNameGetFullDbName(pTableName, db);
|
tNameGetFullDbName(pTableName, db);
|
||||||
|
|
||||||
|
@ -907,6 +879,9 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog* pCtg, SDBVgInfo* dbInfo, const SName
|
||||||
uint32_t hashValue = taosGetTbHashVal(tbFullName, (uint32_t)strlen(tbFullName), dbInfo->hashMethod,
|
uint32_t hashValue = taosGetTbHashVal(tbFullName, (uint32_t)strlen(tbFullName), dbInfo->hashMethod,
|
||||||
dbInfo->hashPrefix, dbInfo->hashSuffix);
|
dbInfo->hashPrefix, dbInfo->hashSuffix);
|
||||||
|
|
||||||
|
vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, ctgHashValueComp, TD_EQ);
|
||||||
|
|
||||||
|
/*
|
||||||
void* pIter = taosHashIterate(dbInfo->vgHash, NULL);
|
void* pIter = taosHashIterate(dbInfo->vgHash, NULL);
|
||||||
while (pIter) {
|
while (pIter) {
|
||||||
vgInfo = pIter;
|
vgInfo = pIter;
|
||||||
|
@ -918,84 +893,11 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog* pCtg, SDBVgInfo* dbInfo, const SName
|
||||||
pIter = taosHashIterate(dbInfo->vgHash, pIter);
|
pIter = taosHashIterate(dbInfo->vgHash, pIter);
|
||||||
vgInfo = NULL;
|
vgInfo = NULL;
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
if (NULL == vgInfo) {
|
if (NULL == vgInfo) {
|
||||||
ctgError("0no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, db,
|
ctgError("0no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, db,
|
||||||
taosHashGetSize(dbInfo->vgHash));
|
taosArrayGetSize(dbInfo->vgArray));
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
|
||||||
}
|
|
||||||
|
|
||||||
*pVgroup = *vgInfo;
|
|
||||||
|
|
||||||
ctgDebug("Got tb %s hash vgroup, vgId:%d, epNum %d, current %s port %d", tbFullName, vgInfo->vgId,
|
|
||||||
vgInfo->epSet.numOfEps, vgInfo->epSet.eps[vgInfo->epSet.inUse].fqdn,
|
|
||||||
vgInfo->epSet.eps[vgInfo->epSet.inUse].port);
|
|
||||||
|
|
||||||
CTG_RET(code);
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
int32_t ctgGetVgInfoFromHashValue2(SCatalog* pCtg, SDBVgInfo* dbInfo, const SName* pTableName, SVgroupInfo* pVgroup) {
|
|
||||||
int32_t code = 0;
|
|
||||||
|
|
||||||
int32_t vgNum = taosHashGetSize(dbInfo->vgHash);
|
|
||||||
char db[TSDB_DB_FNAME_LEN] = {0};
|
|
||||||
tNameGetFullDbName(pTableName, db);
|
|
||||||
|
|
||||||
if (vgNum <= 0) {
|
|
||||||
ctgError("db vgroup cache invalid, db:%s, vgroup number:%d", db, vgNum);
|
|
||||||
CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED);
|
|
||||||
}
|
|
||||||
|
|
||||||
SVgroupInfo* vgInfo = NULL;
|
|
||||||
char tbFullName[TSDB_TABLE_FNAME_LEN];
|
|
||||||
tNameExtractFullName(pTableName, tbFullName);
|
|
||||||
|
|
||||||
uint32_t hashValue = taosGetTbHashVal(tbFullName, (uint32_t)strlen(tbFullName), dbInfo->hashMethod,
|
|
||||||
dbInfo->hashPrefix, dbInfo->hashSuffix);
|
|
||||||
|
|
||||||
static SArray* pVgList = NULL;
|
|
||||||
static bool created = false;
|
|
||||||
|
|
||||||
if (!created) {
|
|
||||||
ctgDebug("create vg array, %d", taosHashGetSize(dbInfo->vgHash));
|
|
||||||
pVgList = taosArrayInit(100, sizeof(SVgroupInfo));
|
|
||||||
|
|
||||||
void* pIter = taosHashIterate(dbInfo->vgHash, NULL);
|
|
||||||
while (pIter) {
|
|
||||||
taosArrayPush(pVgList, pIter);
|
|
||||||
pIter = taosHashIterate(dbInfo->vgHash, pIter);
|
|
||||||
}
|
|
||||||
|
|
||||||
taosArraySort(pVgList, ctgVgInfoComp2);
|
|
||||||
created = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
vgInfo = taosArraySearch(pVgList, &hashValue, ctgHashValueComp2, TD_EQ);
|
|
||||||
|
|
||||||
if (NULL == vgInfo) {
|
|
||||||
void* pIter = taosHashIterate(dbInfo->vgHash, NULL);
|
|
||||||
while (pIter) {
|
|
||||||
vgInfo = pIter;
|
|
||||||
if (hashValue >= vgInfo->hashBegin && hashValue <= vgInfo->hashEnd) {
|
|
||||||
taosHashCancelIterate(dbInfo->vgHash, pIter);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
pIter = taosHashIterate(dbInfo->vgHash, pIter);
|
|
||||||
vgInfo = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (vgInfo) {
|
|
||||||
taosArrayDestroy(pVgList);
|
|
||||||
created = false;
|
|
||||||
ctgDebug("need to re-create vg array, %d", taosHashGetSize(dbInfo->vgHash));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (NULL == vgInfo) {
|
|
||||||
ctgError("1no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, db,
|
|
||||||
taosHashGetSize(dbInfo->vgHash));
|
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1013,7 +915,7 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog* pCtg, SCtgTaskReq* tReq, SDBVgInfo*
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SCtgTask* pTask = tReq->pTask;
|
SCtgTask* pTask = tReq->pTask;
|
||||||
SMetaRes res = {0};
|
SMetaRes res = {0};
|
||||||
int32_t vgNum = taosHashGetSize(dbInfo->vgHash);
|
int32_t vgNum = taosArrayGetSize(dbInfo->vgArray);
|
||||||
if (vgNum <= 0) {
|
if (vgNum <= 0) {
|
||||||
ctgError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum);
|
ctgError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum);
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||||
|
@ -1023,20 +925,13 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog* pCtg, SCtgTaskReq* tReq, SDBVgInfo*
|
||||||
int32_t tbNum = taosArrayGetSize(pNames);
|
int32_t tbNum = taosArrayGetSize(pNames);
|
||||||
|
|
||||||
if (1 == vgNum) {
|
if (1 == vgNum) {
|
||||||
void* pIter = taosHashIterate(dbInfo->vgHash, NULL);
|
|
||||||
if (NULL == pIter) {
|
|
||||||
ctgError("empty vgHash, db:%s, vgroup number:%d", dbFName, vgNum);
|
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < tbNum; ++i) {
|
for (int32_t i = 0; i < tbNum; ++i) {
|
||||||
vgInfo = taosMemoryMalloc(sizeof(SVgroupInfo));
|
vgInfo = taosMemoryMalloc(sizeof(SVgroupInfo));
|
||||||
if (NULL == vgInfo) {
|
if (NULL == vgInfo) {
|
||||||
taosHashCancelIterate(dbInfo->vgHash, pIter);
|
|
||||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
*vgInfo = *(SVgroupInfo*)pIter;
|
*vgInfo = *(SVgroupInfo*)taosArrayGet(dbInfo->vgArray, 0);
|
||||||
|
|
||||||
ctgDebug("Got tb hash vgroup, vgId:%d, epNum %d, current %s port %d", vgInfo->vgId, vgInfo->epSet.numOfEps,
|
ctgDebug("Got tb hash vgroup, vgId:%d, epNum %d, current %s port %d", vgInfo->vgId, vgInfo->epSet.numOfEps,
|
||||||
vgInfo->epSet.eps[vgInfo->epSet.inUse].fqdn, vgInfo->epSet.eps[vgInfo->epSet.inUse].port);
|
vgInfo->epSet.eps[vgInfo->epSet.inUse].fqdn, vgInfo->epSet.eps[vgInfo->epSet.inUse].port);
|
||||||
|
@ -1051,19 +946,9 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog* pCtg, SCtgTaskReq* tReq, SDBVgInfo*
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashCancelIterate(dbInfo->vgHash, pIter);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
SArray* pVgList = taosArrayInit(vgNum, POINTER_BYTES);
|
|
||||||
void* pIter = taosHashIterate(dbInfo->vgHash, NULL);
|
|
||||||
while (pIter) {
|
|
||||||
taosArrayPush(pVgList, &pIter);
|
|
||||||
pIter = taosHashIterate(dbInfo->vgHash, pIter);
|
|
||||||
}
|
|
||||||
|
|
||||||
taosArraySort(pVgList, ctgVgInfoComp);
|
|
||||||
|
|
||||||
char tbFullName[TSDB_TABLE_FNAME_LEN];
|
char tbFullName[TSDB_TABLE_FNAME_LEN];
|
||||||
sprintf(tbFullName, "%s.", dbFName);
|
sprintf(tbFullName, "%s.", dbFName);
|
||||||
int32_t offset = strlen(tbFullName);
|
int32_t offset = strlen(tbFullName);
|
||||||
|
@ -1079,20 +964,15 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog* pCtg, SCtgTaskReq* tReq, SDBVgInfo*
|
||||||
uint32_t hashValue = taosGetTbHashVal(tbFullName, (uint32_t)strlen(tbFullName), dbInfo->hashMethod,
|
uint32_t hashValue = taosGetTbHashVal(tbFullName, (uint32_t)strlen(tbFullName), dbInfo->hashMethod,
|
||||||
dbInfo->hashPrefix, dbInfo->hashSuffix);
|
dbInfo->hashPrefix, dbInfo->hashSuffix);
|
||||||
|
|
||||||
SVgroupInfo** p = taosArraySearch(pVgList, &hashValue, ctgHashValueComp, TD_EQ);
|
vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, ctgHashValueComp, TD_EQ);
|
||||||
|
if (NULL == vgInfo) {
|
||||||
if (NULL == p) {
|
|
||||||
ctgError("2no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName,
|
ctgError("2no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName,
|
||||||
taosHashGetSize(dbInfo->vgHash));
|
taosArrayGetSize(dbInfo->vgArray));
|
||||||
taosArrayDestroy(pVgList);
|
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
vgInfo = *p;
|
|
||||||
|
|
||||||
SVgroupInfo* pNewVg = taosMemoryMalloc(sizeof(SVgroupInfo));
|
SVgroupInfo* pNewVg = taosMemoryMalloc(sizeof(SVgroupInfo));
|
||||||
if (NULL == pNewVg) {
|
if (NULL == pNewVg) {
|
||||||
taosArrayDestroy(pVgList);
|
|
||||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1112,8 +992,6 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog* pCtg, SCtgTaskReq* tReq, SDBVgInfo*
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(pVgList);
|
|
||||||
|
|
||||||
CTG_RET(code);
|
CTG_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1190,6 +1068,10 @@ int32_t ctgCloneVgInfo(SDBVgInfo* src, SDBVgInfo** dst) {
|
||||||
pIter = taosHashIterate(src->vgHash, pIter);
|
pIter = taosHashIterate(src->vgHash, pIter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (src->vgArray) {
|
||||||
|
(*dst)->vgArray = taosArrayDup(src->vgArray, NULL);
|
||||||
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue