refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-09-06 11:32:54 +08:00
parent fc8ac48c68
commit f67e7d2b63
2 changed files with 43 additions and 43 deletions

View File

@ -363,17 +363,18 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) {
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator);
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
if (ps == NULL) {
SSortSource* pSource = taosMemoryCalloc(1, sizeof(SSortSource));
if (pSource == NULL) {
qInfo("alloc:%p", pSource);
return terrno;
}
ps->param = pOperator->pDownstream[0];
ps->onlyRef = true;
pSource->param = pOperator->pDownstream[0];
pSource->onlyRef = true;
code = tsortAddSource(pInfo->pSortHandle, ps);
code = tsortAddSource(pInfo->pSortHandle, pSource);
if (code) {
taosMemoryFree(ps);
taosMemoryFree(pSource);
return code;
}
@ -400,7 +401,7 @@ int32_t doSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
int32_t code = pOperator->fpSet._openFn(pOperator);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
return code;
}
// multi-group case not handle here
@ -408,7 +409,7 @@ int32_t doSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
while (1) {
if (tsortIsClosed(pInfo->pSortHandle)) {
code = TSDB_CODE_TSC_QUERY_CANCELLED;
T_LONG_JMP(pOperator->pTaskInfo->env, code);
T_LONG_JMP(pTaskInfo->env, code);
}
code = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,
@ -648,23 +649,22 @@ int32_t beginSortGroup(SOperatorInfo* pOperator) {
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
SGroupSortSourceParam* param = taosMemoryCalloc(1, sizeof(SGroupSortSourceParam));
if (ps == NULL || param == NULL) {
T_LONG_JMP(pTaskInfo->env, terrno);
taosMemoryFree(ps);
taosMemoryFree(param);
return terrno;
}
param->childOpInfo = pOperator->pDownstream[0];
param->grpSortOpInfo = pInfo;
ps->param = param;
ps->onlyRef = false;
code = tsortAddSource(pInfo->pCurrSortHandle, ps);
if (code) {
T_LONG_JMP(pTaskInfo->env, code);
if (code != 0) {
return code;
}
code = tsortOpen(pInfo->pCurrSortHandle);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
return code;
}
@ -696,7 +696,7 @@ int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
int32_t code = pOperator->fpSet._openFn(pOperator);
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
return code;
}
if (!pInfo->hasGroupId) {
@ -720,15 +720,14 @@ int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) {
while (pInfo->pCurrSortHandle != NULL) {
if (tsortIsClosed(pInfo->pCurrSortHandle)) {
code = TSDB_CODE_TSC_QUERY_CANCELLED;
T_LONG_JMP(pOperator->pTaskInfo->env, code);
T_LONG_JMP(pTaskInfo->env, code);
}
// beginSortGroup would fetch all child blocks of pInfo->currGroupId;
if (pInfo->childOpStatus == CHILD_OP_SAME_GROUP) {
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
pOperator->pTaskInfo->code = code;
pTaskInfo->code = code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
T_LONG_JMP(pOperator->pTaskInfo->env, code);
return code;
}
code = getGroupSortedBlockData(pInfo->pCurrSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity,

View File

@ -367,7 +367,7 @@ static int32_t sortComparCleanup(SMsortComparParam* cmpParam) {
return TSDB_CODE_SUCCESS;
}
void tsortClearOrderdSource(SArray* pOrderedSource, int64_t *fetchUs, int64_t *fetchNum) {
void tsortClearOrderedSource(SArray* pOrderedSource, int64_t *fetchUs, int64_t *fetchNum) {
for (size_t i = 0; i < taosArrayGetSize(pOrderedSource); i++) {
SSortSource** pSource = taosArrayGet(pOrderedSource, i);
if (NULL == *pSource) {
@ -394,6 +394,7 @@ void tsortClearOrderdSource(SArray* pOrderedSource, int64_t *fetchUs, int64_t *f
(*pSource)->src.pBlock = NULL;
}
qInfo("---free:%p", *pSource);
taosMemoryFreeClear(*pSource);
}
@ -413,10 +414,11 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) {
destroyDiskbasedBuf(pSortHandle->pBuf);
taosMemoryFreeClear(pSortHandle->idStr);
blockDataDestroy(pSortHandle->pDataBlock);
if (pSortHandle->pBoundedQueue) destroyBoundedQueue(pSortHandle->pBoundedQueue);
int64_t fetchUs = 0, fetchNum = 0;
tsortClearOrderdSource(pSortHandle->pOrderedSource, &fetchUs, &fetchNum);
tsortClearOrderedSource(pSortHandle->pOrderedSource, &fetchUs, &fetchNum);
qDebug("all source fetch time: %" PRId64 "us num:%" PRId64 " %s", fetchUs, fetchNum, pSortHandle->idStr);
taosArrayDestroy(pSortHandle->pOrderedSource);
@ -1083,12 +1085,12 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId, pPageIdList);
if (code != TSDB_CODE_SUCCESS) {
taosArrayDestroy(pResList);
taosArrayDestroy(pResList); // this may cause memory leak.
return code;
}
}
tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
tsortClearOrderedSource(pHandle->pOrderedSource, NULL, NULL);
void* px = taosArrayAddAll(pHandle->pOrderedSource, pResList);
if (px == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
@ -2346,7 +2348,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) {
}
}
tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
tsortClearOrderedSource(pHandle->pOrderedSource, NULL, NULL);
if (!tsortIsClosed(pHandle)) {
void* px = taosArrayAddAll(pHandle->pOrderedSource, aExtSrc);
QUERY_CHECK_NULL(px, code, lino, _err, terrno);
@ -2396,19 +2398,19 @@ static void freeSSortSource(SSortSource* source) {
static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
int32_t code = 0;
size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize;
SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0);
if (pSource == NULL) {
SSortSource** p = taosArrayGet(pHandle->pOrderedSource, 0);
if (p == NULL) {
return terrno;
}
SSortSource* source = *pSource;
*pSource = NULL;
SSortSource* pSource = *p;
tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL);
taosArrayRemove(pHandle->pOrderedSource, 0);
tsortClearOrderedSource(pHandle->pOrderedSource, NULL, NULL);
while (1) {
SSDataBlock* pBlock = NULL;
TAOS_CHECK_RETURN(pHandle->fetchfp(source->param, &pBlock));
TAOS_CHECK_RETURN(pHandle->fetchfp(pSource->param, &pBlock));
if (pBlock == NULL) {
break;
}
@ -2422,7 +2424,7 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
sortBufSize = pHandle->numOfPages * pHandle->pageSize;
code = createOneDataBlock(pBlock, false, &pHandle->pDataBlock);
if (code) {
freeSSortSource(source);
freeSSortSource(pSource);
return code;
}
}
@ -2433,47 +2435,45 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
code = blockDataMerge(pHandle->pDataBlock, pBlock);
if (code != TSDB_CODE_SUCCESS) {
freeSSortSource(source);
freeSSortSource(pSource);
return code;
}
size_t size = blockDataGetSize(pHandle->pDataBlock);
if (size > sortBufSize) {
// Perform the in-memory sort and then flush data in the buffer into disk.
int64_t p = taosGetTimestampUs();
int64_t st = taosGetTimestampUs();
code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
if (code != 0) {
freeSSortSource(source);
freeSSortSource(pSource);
return code;
}
int64_t el = taosGetTimestampUs() - p;
pHandle->sortElapsed += el;
pHandle->sortElapsed += (taosGetTimestampUs() - st);
if (pHandle->pqMaxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->pqMaxRows);
code = doAddToBuf(pHandle->pDataBlock, pHandle);
if (code != TSDB_CODE_SUCCESS) {
freeSSortSource(source);
freeSSortSource(pSource);
return code;
}
}
}
freeSSortSource(source);
freeSSortSource(pSource);
if (pHandle->pDataBlock != NULL && pHandle->pDataBlock->info.rows > 0) {
size_t size = blockDataGetSize(pHandle->pDataBlock);
// Perform the in-memory sort and then flush data in the buffer into disk.
int64_t p = taosGetTimestampUs();
int64_t st = taosGetTimestampUs();
code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo);
if (code != 0) {
return code;
}
if (pHandle->pqMaxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->pqMaxRows);
int64_t el = taosGetTimestampUs() - p;
pHandle->sortElapsed += el;
pHandle->sortElapsed += (taosGetTimestampUs() - st);
// All sorted data can fit in memory, external memory sort is not needed. Return to directly
if (size <= sortBufSize && pHandle->pBuf == NULL) {
@ -2488,6 +2488,7 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) {
code = doAddToBuf(pHandle->pDataBlock, pHandle);
}
}
return code;
}