diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index 1ca60c89cd..7b81343358 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -883,6 +883,32 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName CTG_RET(code); } +int32_t ctgHashValueComp(void const *lp, void const *rp) { + uint32_t *key = (uint32_t *)lp; + SVgroupInfo *pVg = *(SVgroupInfo **)rp; + + if (*key < pVg->hashBegin) { + return -1; + } else if (*key > pVg->hashEnd) { + return 1; + } + + return 0; +} + +int ctgVgInfoComp(const void* lp, const void* rp) { + SVgroupInfo *pLeft = *(SVgroupInfo **)lp; + SVgroupInfo *pRight = *(SVgroupInfo **)rp; + if (pLeft->hashBegin < pRight->hashBegin) { + return -1; + } else if (pLeft->hashBegin > pRight->hashBegin) { + return 1; + } + + return 0; +} + + int32_t ctgGetVgInfosFromHashValue(SCatalog *pCtg, SCtgTaskReq* tReq, SDBVgInfo *dbInfo, SCtgTbHashsCtx *pCtx, char* dbFName, SArray* pNames, bool update) { int32_t code = 0; SCtgTask* pTask = tReq->pTask; @@ -923,9 +949,19 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog *pCtg, SCtgTaskReq* tReq, SDBVgInfo } } + taosHashCancelIterate(dbInfo->vgHash, pIter); return TSDB_CODE_SUCCESS; } + SArray* pVgList = taosArrayInit(vgNum, POINTER_BYTES); + void *pIter = taosHashIterate(dbInfo->vgHash, NULL); + while (pIter) { + taosArrayPush(pVgList, &pIter); + pIter = taosHashIterate(dbInfo->vgHash, pIter); + } + + taosArraySort(pVgList, ctgVgInfoComp); + char tbFullName[TSDB_TABLE_FNAME_LEN]; sprintf(tbFullName, "%s.", dbFName); int32_t offset = strlen(tbFullName); @@ -940,25 +976,20 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog *pCtg, SCtgTaskReq* tReq, SDBVgInfo uint32_t hashValue = (*fp)(tbFullName, (uint32_t)tbNameLen); - void *pIter = taosHashIterate(dbInfo->vgHash, NULL); - while (pIter) { - vgInfo = pIter; - if (hashValue >= vgInfo->hashBegin && hashValue <= vgInfo->hashEnd) { - taosHashCancelIterate(dbInfo->vgHash, pIter); - break; - } - - pIter = taosHashIterate(dbInfo->vgHash, pIter); - vgInfo = NULL; - } + SVgroupInfo **p = taosArraySearch(pVgList, &hashValue, ctgHashValueComp, TD_EQ); - if (NULL == vgInfo) { + if (NULL == p) { ctgError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName, taosHashGetSize(dbInfo->vgHash)); + ASSERT(0); + taosArrayDestroy(pVgList); CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } + vgInfo = *p; + SVgroupInfo* pNewVg = taosMemoryMalloc(sizeof(SVgroupInfo)); if (NULL == pNewVg) { + taosArrayDestroy(pVgList); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } @@ -977,6 +1008,8 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog *pCtg, SCtgTaskReq* tReq, SDBVgInfo } } + taosArrayDestroy(pVgList); + CTG_RET(code); } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 3ac08bc20e..9e485e7684 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1949,6 +1949,7 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId); if (pExchangeInfo == NULL) { qWarn("failed to acquire exchange operator, since it may have been released"); + taosMemoryFree(pMsg->pData); return TSDB_CODE_SUCCESS; } @@ -1969,6 +1970,7 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%d", pSourceDataInfo->taskId, index, pRsp->numOfBlocks, pRsp->numOfRows); } else { + taosMemoryFree(pMsg->pData); pSourceDataInfo->code = code; qDebug("%s fetch rsp received, index:%d, error:%d", pSourceDataInfo->taskId, index, tstrerror(code)); }