Merge pull request #18643 from taosdata/enh/insert_multi_table_optimize
enh: optimize multiple table insert
This commit is contained in:
commit
9cb07360de
|
@ -129,6 +129,7 @@ typedef struct SDBVgInfo {
|
|||
int32_t numOfTable; // DB's table num, unit is TSDB_TABLE_NUM_UNIT
|
||||
int64_t stateTs;
|
||||
SHashObj* vgHash; // key:vgId, value:SVgroupInfo
|
||||
SArray* vgArray;
|
||||
} SDBVgInfo;
|
||||
|
||||
typedef struct SUseDbOutput {
|
||||
|
@ -238,6 +239,7 @@ int32_t dataConverToStr(char* str, int type, void* buf, int32_t bufSize, int32_t
|
|||
char* parseTagDatatoJson(void* p);
|
||||
int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst);
|
||||
int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst);
|
||||
void freeVgInfo(SDBVgInfo* vgInfo);
|
||||
|
||||
extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char** msg, int32_t msgSize, int32_t* msgLen,
|
||||
void* (*mallocFp)(int64_t));
|
||||
|
|
|
@ -762,7 +762,6 @@ int32_t ctgCloneMetaOutput(STableMetaOutput* output, STableMetaOutput** pOutput)
|
|||
int32_t ctgGenerateVgList(SCatalog* pCtg, SHashObj* vgHash, SArray** pList);
|
||||
void ctgFreeJob(void* job);
|
||||
void ctgFreeHandleImpl(SCatalog* pCtg);
|
||||
void ctgFreeVgInfo(SDBVgInfo* vgInfo);
|
||||
int32_t ctgGetVgInfoFromHashValue(SCatalog* pCtg, SDBVgInfo* dbInfo, const SName* pTableName, SVgroupInfo* pVgroup);
|
||||
int32_t ctgGetVgInfosFromHashValue(SCatalog* pCtg, SCtgTaskReq* tReq, SDBVgInfo* dbInfo, SCtgTbHashsCtx* pCtx,
|
||||
char* dbFName, SArray* pNames, bool update);
|
||||
|
@ -789,6 +788,8 @@ int32_t ctgRemoveTbMeta(SCatalog* pCtg, SName* pTableName);
|
|||
int32_t ctgGetTbHashVgroup(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SVgroupInfo* pVgroup, bool* exists);
|
||||
SName* ctgGetFetchName(SArray* pNames, SCtgFetch* pFetch);
|
||||
int32_t ctgdGetOneHandle(SCatalog **pHandle);
|
||||
int ctgVgInfoComp(const void* lp, const void* rp);
|
||||
int32_t ctgMakeVgArray(SDBVgInfo* dbInfo);
|
||||
|
||||
extern SCatalogMgmt gCtgMgmt;
|
||||
extern SCtgDebug gCTGDebug;
|
||||
|
|
|
@ -505,8 +505,7 @@ _return:
|
|||
taosMemoryFreeClear(tbMeta);
|
||||
|
||||
if (vgInfo) {
|
||||
taosHashCleanup(vgInfo->vgHash);
|
||||
taosMemoryFreeClear(vgInfo);
|
||||
freeVgInfo(vgInfo);
|
||||
}
|
||||
|
||||
if (vgList) {
|
||||
|
@ -546,8 +545,7 @@ _return:
|
|||
}
|
||||
|
||||
if (vgInfo) {
|
||||
taosHashCleanup(vgInfo->vgHash);
|
||||
taosMemoryFreeClear(vgInfo);
|
||||
freeVgInfo(vgInfo);
|
||||
}
|
||||
|
||||
CTG_RET(code);
|
||||
|
@ -778,8 +776,7 @@ _return:
|
|||
}
|
||||
|
||||
if (vgInfo) {
|
||||
taosHashCleanup(vgInfo->vgHash);
|
||||
taosMemoryFreeClear(vgInfo);
|
||||
freeVgInfo(vgInfo);
|
||||
}
|
||||
|
||||
CTG_API_LEAVE(code);
|
||||
|
@ -836,8 +833,7 @@ _return:
|
|||
ctgRUnlockVgInfo(dbCache);
|
||||
ctgReleaseDBCache(pCtg, dbCache);
|
||||
} else if (dbInfo) {
|
||||
taosHashCleanup(dbInfo->vgHash);
|
||||
taosMemoryFreeClear(dbInfo);
|
||||
freeVgInfo(dbInfo);
|
||||
}
|
||||
|
||||
CTG_API_LEAVE(code);
|
||||
|
@ -849,7 +845,7 @@ int32_t catalogUpdateDBVgInfo(SCatalog* pCtg, const char* dbFName, uint64_t dbId
|
|||
int32_t code = 0;
|
||||
|
||||
if (NULL == pCtg || NULL == dbFName || NULL == dbInfo) {
|
||||
ctgFreeVgInfo(dbInfo);
|
||||
freeVgInfo(dbInfo);
|
||||
CTG_ERR_JRET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
}
|
||||
|
||||
|
|
|
@ -226,6 +226,7 @@ _return:
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
/*
|
||||
int32_t ctgAcquireStbMetaFromCache(SCatalog *pCtg, char *dbFName, uint64_t suid, SCtgDBCache **pDb, SCtgTbCache **pTb) {
|
||||
SCtgDBCache *dbCache = NULL;
|
||||
SCtgTbCache *pCache = NULL;
|
||||
|
@ -276,6 +277,49 @@ _return:
|
|||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
*/
|
||||
|
||||
int32_t ctgAcquireStbMetaFromCache(SCtgDBCache *dbCache, SCatalog *pCtg, char *dbFName, uint64_t suid, SCtgTbCache **pTb) {
|
||||
SCtgTbCache *pCache = NULL;
|
||||
char *stName = taosHashAcquire(dbCache->stbCache, &suid, sizeof(suid));
|
||||
if (NULL == stName) {
|
||||
ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", suid, dbFName);
|
||||
goto _return;
|
||||
}
|
||||
|
||||
pCache = taosHashAcquire(dbCache->tbCache, stName, strlen(stName));
|
||||
if (NULL == pCache) {
|
||||
ctgDebug("stb 0x%" PRIx64 " name %s not in cache, dbFName:%s", suid, stName, dbFName);
|
||||
taosHashRelease(dbCache->stbCache, stName);
|
||||
goto _return;
|
||||
}
|
||||
|
||||
taosHashRelease(dbCache->stbCache, stName);
|
||||
|
||||
CTG_LOCK(CTG_READ, &pCache->metaLock);
|
||||
if (NULL == pCache->pMeta) {
|
||||
ctgDebug("stb 0x%" PRIx64 " meta not in cache, dbFName:%s", suid, dbFName);
|
||||
goto _return;
|
||||
}
|
||||
|
||||
*pTb = pCache;
|
||||
|
||||
ctgDebug("stb 0x%" PRIx64 " meta got in cache, dbFName:%s", suid, dbFName);
|
||||
|
||||
CTG_CACHE_STAT_INC(numOfMetaHit, 1);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_return:
|
||||
|
||||
ctgReleaseTbMetaToCache(pCtg, dbCache, pCache);
|
||||
|
||||
CTG_CACHE_STAT_INC(numOfMetaMiss, 1);
|
||||
|
||||
*pTb = NULL;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgAcquireTbIndexFromCache(SCatalog *pCtg, char *dbFName, char *tbName, SCtgDBCache **pDb, SCtgTbCache **pTb) {
|
||||
SCtgDBCache *dbCache = NULL;
|
||||
|
@ -384,13 +428,19 @@ int32_t ctgReadTbMetaFromCache(SCatalog *pCtg, SCtgTbMetaCtx *ctx, STableMeta **
|
|||
|
||||
memcpy(*pTableMeta, tbMeta, metaSize);
|
||||
|
||||
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
|
||||
//ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
|
||||
|
||||
if (tbCache) {
|
||||
CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
|
||||
taosHashRelease(dbCache->tbCache, tbCache);
|
||||
}
|
||||
|
||||
ctgDebug("Got ctb %s meta from cache, will continue to get its stb meta, type:%d, dbFName:%s", ctx->pName->tname,
|
||||
ctx->tbInfo.tbType, dbFName);
|
||||
|
||||
ctgAcquireStbMetaFromCache(pCtg, dbFName, ctx->tbInfo.suid, &dbCache, &tbCache);
|
||||
ctgAcquireStbMetaFromCache(dbCache, pCtg, dbFName, ctx->tbInfo.suid, &tbCache);
|
||||
if (NULL == tbCache) {
|
||||
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
|
||||
//ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
|
||||
taosMemoryFreeClear(*pTableMeta);
|
||||
ctgDebug("stb 0x%" PRIx64 " meta not in cache", ctx->tbInfo.suid);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -460,12 +510,17 @@ int32_t ctgReadTbVerFromCache(SCatalog *pCtg, SName *pTableName, int32_t *sver,
|
|||
|
||||
// PROCESS FOR CHILD TABLE
|
||||
|
||||
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
|
||||
//ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
|
||||
if (tbCache) {
|
||||
CTG_UNLOCK(CTG_READ, &tbCache->metaLock);
|
||||
taosHashRelease(dbCache->tbCache, tbCache);
|
||||
}
|
||||
|
||||
ctgDebug("Got ctb %s ver from cache, will continue to get its stb ver, dbFName:%s", pTableName->tname, dbFName);
|
||||
|
||||
ctgAcquireStbMetaFromCache(pCtg, dbFName, *suid, &dbCache, &tbCache);
|
||||
ctgAcquireStbMetaFromCache(dbCache, pCtg, dbFName, *suid, &tbCache);
|
||||
if (NULL == tbCache) {
|
||||
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
|
||||
//ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
|
||||
ctgDebug("stb 0x%" PRIx64 " meta not in cache", *suid);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -794,7 +849,7 @@ int32_t ctgUpdateVgroupEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId
|
|||
if (NULL == msg) {
|
||||
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateVgMsg));
|
||||
taosMemoryFree(op);
|
||||
ctgFreeVgInfo(dbInfo);
|
||||
freeVgInfo(dbInfo);
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
|
@ -803,6 +858,14 @@ int32_t ctgUpdateVgroupEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId
|
|||
dbFName = p + 1;
|
||||
}
|
||||
|
||||
code = ctgMakeVgArray(dbInfo);
|
||||
if (code) {
|
||||
taosMemoryFree(op);
|
||||
taosMemoryFree(msg);
|
||||
freeVgInfo(dbInfo);
|
||||
CTG_ERR_RET(code);
|
||||
}
|
||||
|
||||
tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
|
||||
msg->pCtg = pCtg;
|
||||
msg->dbId = dbId;
|
||||
|
@ -816,7 +879,7 @@ int32_t ctgUpdateVgroupEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId
|
|||
|
||||
_return:
|
||||
|
||||
ctgFreeVgInfo(dbInfo);
|
||||
freeVgInfo(dbInfo);
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
@ -1549,6 +1612,20 @@ void ctgFreeAllInstance(void) {
|
|||
taosHashClear(gCtgMgmt.pCluster);
|
||||
}
|
||||
|
||||
int32_t ctgVgInfoIdComp(void const* lp, void const* rp) {
|
||||
int32_t* key = (int32_t*)lp;
|
||||
SVgroupInfo* pVg = (SVgroupInfo*)rp;
|
||||
|
||||
if (*key < pVg->vgId) {
|
||||
return -1;
|
||||
} else if (*key > pVg->vgId) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
|
||||
int32_t code = 0;
|
||||
SCtgUpdateVgMsg *msg = operation->data;
|
||||
|
@ -1600,7 +1677,7 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
|
|||
goto _return;
|
||||
}
|
||||
|
||||
ctgFreeVgInfo(vgInfo);
|
||||
freeVgInfo(vgInfo);
|
||||
}
|
||||
|
||||
vgCache->vgInfo = dbInfo;
|
||||
|
@ -1621,7 +1698,7 @@ int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
|
|||
|
||||
_return:
|
||||
|
||||
ctgFreeVgInfo(msg->dbInfo);
|
||||
freeVgInfo(msg->dbInfo);
|
||||
taosMemoryFreeClear(msg);
|
||||
|
||||
CTG_RET(code);
|
||||
|
@ -1674,7 +1751,7 @@ int32_t ctgOpDropDbVgroup(SCtgCacheOperation *operation) {
|
|||
|
||||
CTG_ERR_JRET(ctgWLockVgInfo(pCtg, dbCache));
|
||||
|
||||
ctgFreeVgInfo(dbCache->vgCache.vgInfo);
|
||||
freeVgInfo(dbCache->vgCache.vgInfo);
|
||||
dbCache->vgCache.vgInfo = NULL;
|
||||
|
||||
ctgDebug("db vgInfo removed, dbFName:%s", msg->dbFName);
|
||||
|
@ -1930,7 +2007,13 @@ int32_t ctgOpUpdateEpset(SCtgCacheOperation *operation) {
|
|||
|
||||
SVgroupInfo *pInfo = taosHashGet(vgInfo->vgHash, &msg->vgId, sizeof(msg->vgId));
|
||||
if (NULL == pInfo) {
|
||||
ctgDebug("no vgroup %d in db %s, ignore epset update", msg->vgId, msg->dbFName);
|
||||
ctgDebug("no vgroup %d in db %s vgHash, ignore epset update", msg->vgId, msg->dbFName);
|
||||
goto _return;
|
||||
}
|
||||
|
||||
SVgroupInfo *pInfo2 = taosArraySearch(vgInfo->vgArray, &msg->vgId, ctgVgInfoIdComp, TD_EQ);
|
||||
if (NULL == pInfo2) {
|
||||
ctgDebug("no vgroup %d in db %s vgArray, ignore epset update", msg->vgId, msg->dbFName);
|
||||
goto _return;
|
||||
}
|
||||
|
||||
|
@ -1941,6 +2024,7 @@ int32_t ctgOpUpdateEpset(SCtgCacheOperation *operation) {
|
|||
msg->epSet.numOfEps, pNewEp->fqdn, pNewEp->port, msg->dbFName);
|
||||
|
||||
pInfo->epSet = msg->epSet;
|
||||
pInfo2->epSet = msg->epSet;
|
||||
|
||||
_return:
|
||||
|
||||
|
@ -2057,7 +2141,7 @@ void ctgFreeCacheOperationData(SCtgCacheOperation *op) {
|
|||
switch (op->opId) {
|
||||
case CTG_OP_UPDATE_VGROUP: {
|
||||
SCtgUpdateVgMsg *msg = op->data;
|
||||
ctgFreeVgInfo(msg->dbInfo);
|
||||
freeVgInfo(msg->dbInfo);
|
||||
taosMemoryFreeClear(op->data);
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -231,20 +231,7 @@ void ctgFreeTbCache(SCtgDBCache* dbCache) {
|
|||
CTG_CACHE_STAT_DEC(numOfTbl, tblNum);
|
||||
}
|
||||
|
||||
void ctgFreeVgInfo(SDBVgInfo* vgInfo) {
|
||||
if (NULL == vgInfo) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (vgInfo->vgHash) {
|
||||
taosHashCleanup(vgInfo->vgHash);
|
||||
vgInfo->vgHash = NULL;
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(vgInfo);
|
||||
}
|
||||
|
||||
void ctgFreeVgInfoCache(SCtgDBCache* dbCache) { ctgFreeVgInfo(dbCache->vgCache.vgInfo); }
|
||||
void ctgFreeVgInfoCache(SCtgDBCache* dbCache) { freeVgInfo(dbCache->vgCache.vgInfo); }
|
||||
|
||||
void ctgFreeDbCache(SCtgDBCache* dbCache) {
|
||||
if (NULL == dbCache) {
|
||||
|
@ -366,8 +353,7 @@ void ctgFreeSUseDbOutput(SUseDbOutput* pOutput) {
|
|||
}
|
||||
|
||||
if (pOutput->dbVgroup) {
|
||||
taosHashCleanup(pOutput->dbVgroup->vgHash);
|
||||
taosMemoryFreeClear(pOutput->dbVgroup);
|
||||
freeVgInfo(pOutput->dbVgroup);
|
||||
}
|
||||
|
||||
taosMemoryFree(pOutput);
|
||||
|
@ -573,8 +559,7 @@ void ctgFreeSubTaskRes(CTG_TASK_TYPE type, void** pRes) {
|
|||
case CTG_TASK_GET_DB_VGROUP: {
|
||||
if (*pRes) {
|
||||
SDBVgInfo* pInfo = (SDBVgInfo*)*pRes;
|
||||
taosHashCleanup(pInfo->vgHash);
|
||||
taosMemoryFreeClear(*pRes);
|
||||
freeVgInfo(pInfo);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
@ -837,10 +822,36 @@ _return:
|
|||
CTG_RET(code);
|
||||
}
|
||||
|
||||
int ctgVgInfoComp(const void* lp, const void* rp) {
|
||||
SVgroupInfo* pLeft = (SVgroupInfo*)lp;
|
||||
SVgroupInfo* pRight = (SVgroupInfo*)rp;
|
||||
if (pLeft->hashBegin < pRight->hashBegin) {
|
||||
return -1;
|
||||
} else if (pLeft->hashBegin > pRight->hashBegin) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t ctgHashValueComp(void const* lp, void const* rp) {
|
||||
uint32_t* key = (uint32_t*)lp;
|
||||
SVgroupInfo* pVg = (SVgroupInfo*)rp;
|
||||
|
||||
if (*key < pVg->hashBegin) {
|
||||
return -1;
|
||||
} else if (*key > pVg->hashEnd) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t ctgGetVgInfoFromHashValue(SCatalog* pCtg, SDBVgInfo* dbInfo, const SName* pTableName, SVgroupInfo* pVgroup) {
|
||||
int32_t code = 0;
|
||||
CTG_ERR_RET(ctgMakeVgArray(dbInfo));
|
||||
|
||||
int32_t vgNum = taosHashGetSize(dbInfo->vgHash);
|
||||
int32_t vgNum = taosArrayGetSize(dbInfo->vgArray);
|
||||
char db[TSDB_DB_FNAME_LEN] = {0};
|
||||
tNameGetFullDbName(pTableName, db);
|
||||
|
||||
|
@ -856,6 +867,9 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog* pCtg, SDBVgInfo* dbInfo, const SName
|
|||
uint32_t hashValue = taosGetTbHashVal(tbFullName, (uint32_t)strlen(tbFullName), dbInfo->hashMethod,
|
||||
dbInfo->hashPrefix, dbInfo->hashSuffix);
|
||||
|
||||
vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, ctgHashValueComp, TD_EQ);
|
||||
|
||||
/*
|
||||
void* pIter = taosHashIterate(dbInfo->vgHash, NULL);
|
||||
while (pIter) {
|
||||
vgInfo = pIter;
|
||||
|
@ -867,10 +881,11 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog* pCtg, SDBVgInfo* dbInfo, const SName
|
|||
pIter = taosHashIterate(dbInfo->vgHash, pIter);
|
||||
vgInfo = NULL;
|
||||
}
|
||||
*/
|
||||
|
||||
if (NULL == vgInfo) {
|
||||
ctgError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, db,
|
||||
taosHashGetSize(dbInfo->vgHash));
|
||||
(int32_t)taosArrayGetSize(dbInfo->vgArray));
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
|
@ -883,37 +898,15 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog* pCtg, SDBVgInfo* dbInfo, const SName
|
|||
CTG_RET(code);
|
||||
}
|
||||
|
||||
int32_t ctgHashValueComp(void const* lp, void const* rp) {
|
||||
uint32_t* key = (uint32_t*)lp;
|
||||
SVgroupInfo* pVg = *(SVgroupInfo**)rp;
|
||||
|
||||
if (*key < pVg->hashBegin) {
|
||||
return -1;
|
||||
} else if (*key > pVg->hashEnd) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int ctgVgInfoComp(const void* lp, const void* rp) {
|
||||
SVgroupInfo* pLeft = *(SVgroupInfo**)lp;
|
||||
SVgroupInfo* pRight = *(SVgroupInfo**)rp;
|
||||
if (pLeft->hashBegin < pRight->hashBegin) {
|
||||
return -1;
|
||||
} else if (pLeft->hashBegin > pRight->hashBegin) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t ctgGetVgInfosFromHashValue(SCatalog* pCtg, SCtgTaskReq* tReq, SDBVgInfo* dbInfo, SCtgTbHashsCtx* pCtx,
|
||||
char* dbFName, SArray* pNames, bool update) {
|
||||
int32_t code = 0;
|
||||
SCtgTask* pTask = tReq->pTask;
|
||||
SMetaRes res = {0};
|
||||
int32_t vgNum = taosHashGetSize(dbInfo->vgHash);
|
||||
|
||||
CTG_ERR_RET(ctgMakeVgArray(dbInfo));
|
||||
|
||||
int32_t vgNum = taosArrayGetSize(dbInfo->vgArray);
|
||||
if (vgNum <= 0) {
|
||||
ctgError("db vgroup cache invalid, db:%s, vgroup number:%d", dbFName, vgNum);
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||
|
@ -923,20 +916,13 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog* pCtg, SCtgTaskReq* tReq, SDBVgInfo*
|
|||
int32_t tbNum = taosArrayGetSize(pNames);
|
||||
|
||||
if (1 == vgNum) {
|
||||
void* pIter = taosHashIterate(dbInfo->vgHash, NULL);
|
||||
if (NULL == pIter) {
|
||||
ctgError("empty vgHash, db:%s, vgroup number:%d", dbFName, vgNum);
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < tbNum; ++i) {
|
||||
vgInfo = taosMemoryMalloc(sizeof(SVgroupInfo));
|
||||
if (NULL == vgInfo) {
|
||||
taosHashCancelIterate(dbInfo->vgHash, pIter);
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
*vgInfo = *(SVgroupInfo*)pIter;
|
||||
*vgInfo = *(SVgroupInfo*)taosArrayGet(dbInfo->vgArray, 0);
|
||||
|
||||
ctgDebug("Got tb hash vgroup, vgId:%d, epNum %d, current %s port %d", vgInfo->vgId, vgInfo->epSet.numOfEps,
|
||||
vgInfo->epSet.eps[vgInfo->epSet.inUse].fqdn, vgInfo->epSet.eps[vgInfo->epSet.inUse].port);
|
||||
|
@ -951,19 +937,9 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog* pCtg, SCtgTaskReq* tReq, SDBVgInfo*
|
|||
}
|
||||
}
|
||||
|
||||
taosHashCancelIterate(dbInfo->vgHash, pIter);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SArray* pVgList = taosArrayInit(vgNum, POINTER_BYTES);
|
||||
void* pIter = taosHashIterate(dbInfo->vgHash, NULL);
|
||||
while (pIter) {
|
||||
taosArrayPush(pVgList, &pIter);
|
||||
pIter = taosHashIterate(dbInfo->vgHash, pIter);
|
||||
}
|
||||
|
||||
taosArraySort(pVgList, ctgVgInfoComp);
|
||||
|
||||
char tbFullName[TSDB_TABLE_FNAME_LEN];
|
||||
sprintf(tbFullName, "%s.", dbFName);
|
||||
int32_t offset = strlen(tbFullName);
|
||||
|
@ -979,20 +955,15 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog* pCtg, SCtgTaskReq* tReq, SDBVgInfo*
|
|||
uint32_t hashValue = taosGetTbHashVal(tbFullName, (uint32_t)strlen(tbFullName), dbInfo->hashMethod,
|
||||
dbInfo->hashPrefix, dbInfo->hashSuffix);
|
||||
|
||||
SVgroupInfo** p = taosArraySearch(pVgList, &hashValue, ctgHashValueComp, TD_EQ);
|
||||
|
||||
if (NULL == p) {
|
||||
vgInfo = taosArraySearch(dbInfo->vgArray, &hashValue, ctgHashValueComp, TD_EQ);
|
||||
if (NULL == vgInfo) {
|
||||
ctgError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName,
|
||||
taosHashGetSize(dbInfo->vgHash));
|
||||
taosArrayDestroy(pVgList);
|
||||
(int32_t)taosArrayGetSize(dbInfo->vgArray));
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
vgInfo = *p;
|
||||
|
||||
SVgroupInfo* pNewVg = taosMemoryMalloc(sizeof(SVgroupInfo));
|
||||
if (NULL == pNewVg) {
|
||||
taosArrayDestroy(pVgList);
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
|
@ -1012,8 +983,6 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog* pCtg, SCtgTaskReq* tReq, SDBVgInfo*
|
|||
}
|
||||
}
|
||||
|
||||
taosArrayDestroy(pVgList);
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
@ -1057,7 +1026,33 @@ int32_t ctgDbVgVersionSortCompare(const void* key1, const void* key2) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t ctgMakeVgArray(SDBVgInfo* dbInfo) {
|
||||
if (NULL == dbInfo) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (dbInfo->vgHash && NULL == dbInfo->vgArray) {
|
||||
dbInfo->vgArray = taosArrayInit(100, sizeof(SVgroupInfo));
|
||||
if (NULL == dbInfo->vgArray) {
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
void* pIter = taosHashIterate(dbInfo->vgHash, NULL);
|
||||
while (pIter) {
|
||||
taosArrayPush(dbInfo->vgArray, pIter);
|
||||
pIter = taosHashIterate(dbInfo->vgHash, pIter);
|
||||
}
|
||||
|
||||
taosArraySort(dbInfo->vgArray, ctgVgInfoComp);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgCloneVgInfo(SDBVgInfo* src, SDBVgInfo** dst) {
|
||||
CTG_ERR_RET(ctgMakeVgArray(src));
|
||||
|
||||
*dst = taosMemoryMalloc(sizeof(SDBVgInfo));
|
||||
if (NULL == *dst) {
|
||||
qError("malloc %d failed", (int32_t)sizeof(SDBVgInfo));
|
||||
|
@ -1090,6 +1085,10 @@ int32_t ctgCloneVgInfo(SDBVgInfo* src, SDBVgInfo** dst) {
|
|||
pIter = taosHashIterate(src->vgHash, pIter);
|
||||
}
|
||||
|
||||
if (src->vgArray) {
|
||||
(*dst)->vgArray = taosArrayDup(src->vgArray, NULL);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -447,6 +447,19 @@ int32_t cloneTableMeta(STableMeta* pSrc, STableMeta** pDst) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
void freeVgInfo(SDBVgInfo* vgInfo) {
|
||||
if (NULL == vgInfo) {
|
||||
return;
|
||||
}
|
||||
|
||||
taosHashCleanup(vgInfo->vgHash);
|
||||
taosArrayDestroy(vgInfo->vgArray);
|
||||
|
||||
taosMemoryFreeClear(vgInfo);
|
||||
}
|
||||
|
||||
|
||||
int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) {
|
||||
if (NULL == pSrc) {
|
||||
*pDst = NULL;
|
||||
|
@ -475,8 +488,7 @@ int32_t cloneDbVgInfo(SDBVgInfo* pSrc, SDBVgInfo** pDst) {
|
|||
if (0 != taosHashPut((*pDst)->vgHash, vgId, sizeof(*vgId), vgInfo, sizeof(*vgInfo))) {
|
||||
qError("taosHashPut failed, vgId:%d", vgInfo->vgId);
|
||||
taosHashCancelIterate(pSrc->vgHash, pIter);
|
||||
taosHashCleanup((*pDst)->vgHash);
|
||||
taosMemoryFreeClear(*pDst);
|
||||
freeVgInfo(*pDst);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue