enh: optimize getting table hash performance
This commit is contained in:
parent
18f924cb5e
commit
c1539bc064
|
@ -883,6 +883,20 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName
|
||||||
CTG_RET(code);
|
CTG_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t ctgHashValueComp(void const *lp, void const *rp) {
|
||||||
|
uint32_t *key = (uint32_t *)lp;
|
||||||
|
SVgroupInfo *pVg = *(SVgroupInfo **)rp;
|
||||||
|
|
||||||
|
if (*key < pVg->hashBegin) {
|
||||||
|
return -1;
|
||||||
|
} else if (*key > pVg->hashEnd) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t ctgGetVgInfosFromHashValue(SCatalog *pCtg, SCtgTaskReq* tReq, SDBVgInfo *dbInfo, SCtgTbHashsCtx *pCtx, char* dbFName, SArray* pNames, bool update) {
|
int32_t ctgGetVgInfosFromHashValue(SCatalog *pCtg, SCtgTaskReq* tReq, SDBVgInfo *dbInfo, SCtgTbHashsCtx *pCtx, char* dbFName, SArray* pNames, bool update) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SCtgTask* pTask = tReq->pTask;
|
SCtgTask* pTask = tReq->pTask;
|
||||||
|
@ -923,9 +937,17 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog *pCtg, SCtgTaskReq* tReq, SDBVgInfo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosHashCancelIterate(dbInfo->vgHash, pIter);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SArray* pVgList = taosArrayInit(vgNum, POINTER_BYTES);
|
||||||
|
void *pIter = taosHashIterate(dbInfo->vgHash, NULL);
|
||||||
|
while (pIter) {
|
||||||
|
taosArrayPush(pVgList, &pIter);
|
||||||
|
pIter = taosHashIterate(dbInfo->vgHash, pIter);
|
||||||
|
}
|
||||||
|
|
||||||
char tbFullName[TSDB_TABLE_FNAME_LEN];
|
char tbFullName[TSDB_TABLE_FNAME_LEN];
|
||||||
sprintf(tbFullName, "%s.", dbFName);
|
sprintf(tbFullName, "%s.", dbFName);
|
||||||
int32_t offset = strlen(tbFullName);
|
int32_t offset = strlen(tbFullName);
|
||||||
|
@ -940,25 +962,19 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog *pCtg, SCtgTaskReq* tReq, SDBVgInfo
|
||||||
|
|
||||||
uint32_t hashValue = (*fp)(tbFullName, (uint32_t)tbNameLen);
|
uint32_t hashValue = (*fp)(tbFullName, (uint32_t)tbNameLen);
|
||||||
|
|
||||||
void *pIter = taosHashIterate(dbInfo->vgHash, NULL);
|
SVgroupInfo **p = taosArraySearch(pVgList, &hashValue, ctgHashValueComp, TD_EQ);
|
||||||
while (pIter) {
|
|
||||||
vgInfo = pIter;
|
|
||||||
if (hashValue >= vgInfo->hashBegin && hashValue <= vgInfo->hashEnd) {
|
|
||||||
taosHashCancelIterate(dbInfo->vgHash, pIter);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
pIter = taosHashIterate(dbInfo->vgHash, pIter);
|
if (NULL == p) {
|
||||||
vgInfo = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (NULL == vgInfo) {
|
|
||||||
ctgError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName, taosHashGetSize(dbInfo->vgHash));
|
ctgError("no hash range found for hash value [%u], db:%s, numOfVgId:%d", hashValue, dbFName, taosHashGetSize(dbInfo->vgHash));
|
||||||
|
taosArrayDestroy(pVgList);
|
||||||
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
vgInfo = *p;
|
||||||
|
|
||||||
SVgroupInfo* pNewVg = taosMemoryMalloc(sizeof(SVgroupInfo));
|
SVgroupInfo* pNewVg = taosMemoryMalloc(sizeof(SVgroupInfo));
|
||||||
if (NULL == pNewVg) {
|
if (NULL == pNewVg) {
|
||||||
|
taosArrayDestroy(pVgList);
|
||||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -977,6 +993,8 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog *pCtg, SCtgTaskReq* tReq, SDBVgInfo
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(pVgList);
|
||||||
|
|
||||||
CTG_RET(code);
|
CTG_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1962,6 +1962,7 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId);
|
SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pWrapper->exchangeId);
|
||||||
if (pExchangeInfo == NULL) {
|
if (pExchangeInfo == NULL) {
|
||||||
qWarn("failed to acquire exchange operator, since it may have been released");
|
qWarn("failed to acquire exchange operator, since it may have been released");
|
||||||
|
taosMemoryFree(pMsg->pData);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1980,6 +1981,7 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
|
||||||
ASSERT(pRsp != NULL);
|
ASSERT(pRsp != NULL);
|
||||||
qDebug("%s fetch rsp received, index:%d, rows:%d", pSourceDataInfo->taskId, index, pRsp->numOfRows);
|
qDebug("%s fetch rsp received, index:%d, rows:%d", pSourceDataInfo->taskId, index, pRsp->numOfRows);
|
||||||
} else {
|
} else {
|
||||||
|
taosMemoryFree(pMsg->pData);
|
||||||
pSourceDataInfo->code = code;
|
pSourceDataInfo->code = code;
|
||||||
qDebug("%s fetch rsp received, index:%d, error:%d", pSourceDataInfo->taskId, index, tstrerror(code));
|
qDebug("%s fetch rsp received, index:%d, error:%d", pSourceDataInfo->taskId, index, tstrerror(code));
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue