Merge pull request #28035 from taosdata/fix/TD-32259-3.0
fix(query)[TD-32259]. Fix error handling in merge sort
This commit is contained in:
commit
0fa6c31b5b
|
@ -437,41 +437,36 @@ int32_t tsortAddSource(SSortHandle* pSortHandle, void* pSource) {
|
|||
|
||||
static int32_t doAddNewExternalMemSource(SDiskbasedBuf* pBuf, SArray* pAllSources, SSDataBlock* pBlock,
|
||||
int32_t* sourceId, SArray* pPageIdList) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
SSortSource* pSource = taosMemoryCalloc(1, sizeof(SSortSource));
|
||||
if (pSource == NULL) {
|
||||
taosArrayDestroy(pPageIdList);
|
||||
return terrno;
|
||||
}
|
||||
QUERY_CHECK_NULL(pSource, code, lino, _err, terrno);
|
||||
|
||||
pSource->src.pBlock = pBlock;
|
||||
pSource->pageIdList = pPageIdList;
|
||||
|
||||
void* p = taosArrayPush(pAllSources, &pSource);
|
||||
if (p == NULL) {
|
||||
taosArrayDestroy(pPageIdList);
|
||||
return terrno;
|
||||
}
|
||||
SSortSource** p = taosArrayPush(pAllSources, &pSource);
|
||||
QUERY_CHECK_NULL(p, code, lino, _err, terrno);
|
||||
pSource = NULL;
|
||||
|
||||
(*sourceId) += 1;
|
||||
|
||||
int32_t rowSize = blockDataGetSerialRowSize(pSource->src.pBlock);
|
||||
int32_t rowSize = blockDataGetSerialRowSize((*p)->src.pBlock);
|
||||
|
||||
// The value of numOfRows must be greater than 0, which is guaranteed by the previous memory allocation
|
||||
int32_t numOfRows =
|
||||
(getBufPageSize(pBuf) - blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock))) / rowSize;
|
||||
if (numOfRows <= 0) {
|
||||
qError("sort failed at: %s:%d", __func__, __LINE__);
|
||||
taosArrayDestroy(pPageIdList);
|
||||
return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||
}
|
||||
QUERY_CHECK_CONDITION((numOfRows > 0), code, lino, _err, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
|
||||
|
||||
int32_t code = blockDataEnsureCapacity(pSource->src.pBlock, numOfRows);
|
||||
if (code != 0) {
|
||||
qError("sort failed at: %s:%d", __func__, __LINE__);
|
||||
taosArrayDestroy(pPageIdList);
|
||||
}
|
||||
code = blockDataEnsureCapacity((*p)->src.pBlock, numOfRows);
|
||||
QUERY_CHECK_CODE(code, lino, _err);
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
if (pSource) taosMemoryFree(pSource);
|
||||
qError("sort failed at %s:%d since %s", __func__, lino, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
|
||||
|
@ -554,7 +549,12 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
|
|||
return code;
|
||||
}
|
||||
|
||||
return doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId, pPageIdList);
|
||||
code = doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId, pPageIdList);
|
||||
if (code) {
|
||||
blockDataDestroy(pBlock);
|
||||
taosArrayDestroy(pPageIdList);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static void setCurrentSourceDone(SSortSource* pSource, SSortHandle* pHandle) {
|
||||
|
@ -1023,6 +1023,9 @@ static int32_t doSortForEachGroup(SSortHandle* pHandle, int32_t sortTimes, int32
|
|||
QUERY_CHECK_CODE(code, lino, _err);
|
||||
|
||||
code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId, pPageIdList);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
blockDataDestroy(pBlock);
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _err);
|
||||
}
|
||||
|
||||
|
@ -2144,6 +2147,10 @@ static int32_t sortBlocksToExtSource(SSortHandle* pHandle, SArray* aBlk, SArray*
|
|||
if (code) goto _error;
|
||||
|
||||
code = doAddNewExternalMemSource(pHandle->pBuf, aExtSrc, pMemSrcBlk, &pHandle->sourceId, aPgId);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
blockDataDestroy(pMemSrcBlk);
|
||||
goto _error;
|
||||
}
|
||||
|
||||
cleanupMergeSup(&sup);
|
||||
tMergeTreeDestroy(&pTree);
|
||||
|
@ -2306,9 +2313,15 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
|
|||
}
|
||||
|
||||
code = tSimpleHashPut(mUidBlk, &pBlk->info.id.uid, sizeof(pBlk->info.id.uid), &tBlk, POINTER_BYTES);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
blockDataDestroy(tBlk);
|
||||
}
|
||||
QUERY_CHECK_CODE(code, lino, _err);
|
||||
|
||||
void* px = taosArrayPush(aBlkSort, &tBlk);
|
||||
if (px == NULL) {
|
||||
blockDataDestroy(tBlk);
|
||||
}
|
||||
QUERY_CHECK_NULL(px, code, lino, _err, terrno);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue