diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index bb494cacfa..bd67c4821d 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -363,17 +363,18 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) { tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator); - SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); - if (ps == NULL) { + SSortSource* pSource = taosMemoryCalloc(1, sizeof(SSortSource)); + if (pSource == NULL) { + qInfo("alloc:%p", pSource); return terrno; } - ps->param = pOperator->pDownstream[0]; - ps->onlyRef = true; + pSource->param = pOperator->pDownstream[0]; + pSource->onlyRef = true; - code = tsortAddSource(pInfo->pSortHandle, ps); + code = tsortAddSource(pInfo->pSortHandle, pSource); if (code) { - taosMemoryFree(ps); + taosMemoryFree(pSource); return code; } @@ -400,7 +401,7 @@ int32_t doSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { int32_t code = pOperator->fpSet._openFn(pOperator); if (code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, code); + return code; } // multi-group case not handle here @@ -408,7 +409,7 @@ int32_t doSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { while (1) { if (tsortIsClosed(pInfo->pSortHandle)) { code = TSDB_CODE_TSC_QUERY_CANCELLED; - T_LONG_JMP(pOperator->pTaskInfo->env, code); + T_LONG_JMP(pTaskInfo->env, code); } code = getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, @@ -648,23 +649,22 @@ int32_t beginSortGroup(SOperatorInfo* pOperator) { SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource)); SGroupSortSourceParam* param = taosMemoryCalloc(1, sizeof(SGroupSortSourceParam)); if (ps == NULL || param == NULL) { - T_LONG_JMP(pTaskInfo->env, terrno); + taosMemoryFree(ps); + taosMemoryFree(param); + return terrno; } param->childOpInfo = pOperator->pDownstream[0]; param->grpSortOpInfo = pInfo; + ps->param = param; ps->onlyRef = false; code = tsortAddSource(pInfo->pCurrSortHandle, ps); - if (code) { - T_LONG_JMP(pTaskInfo->env, code); + if (code != 0) { + return code; } code = tsortOpen(pInfo->pCurrSortHandle); - if (code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, code); - } - return code; } @@ -696,7 +696,7 @@ int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { int32_t code = pOperator->fpSet._openFn(pOperator); if (code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, code); + return code; } if (!pInfo->hasGroupId) { @@ -720,15 +720,14 @@ int32_t doGroupSort(SOperatorInfo* pOperator, SSDataBlock** pResBlock) { while (pInfo->pCurrSortHandle != NULL) { if (tsortIsClosed(pInfo->pCurrSortHandle)) { code = TSDB_CODE_TSC_QUERY_CANCELLED; - T_LONG_JMP(pOperator->pTaskInfo->env, code); + T_LONG_JMP(pTaskInfo->env, code); } // beginSortGroup would fetch all child blocks of pInfo->currGroupId; if (pInfo->childOpStatus == CHILD_OP_SAME_GROUP) { - code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; - pOperator->pTaskInfo->code = code; + pTaskInfo->code = code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); - T_LONG_JMP(pOperator->pTaskInfo->env, code); + return code; } code = getGroupSortedBlockData(pInfo->pCurrSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 2792e6a197..6d9f0f1497 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -367,7 +367,7 @@ static int32_t sortComparCleanup(SMsortComparParam* cmpParam) { return TSDB_CODE_SUCCESS; } -void tsortClearOrderdSource(SArray* pOrderedSource, int64_t *fetchUs, int64_t *fetchNum) { +void tsortClearOrderedSource(SArray* pOrderedSource, int64_t *fetchUs, int64_t *fetchNum) { for (size_t i = 0; i < taosArrayGetSize(pOrderedSource); i++) { SSortSource** pSource = taosArrayGet(pOrderedSource, i); if (NULL == *pSource) { @@ -394,6 +394,7 @@ void tsortClearOrderdSource(SArray* pOrderedSource, int64_t *fetchUs, int64_t *f (*pSource)->src.pBlock = NULL; } + qInfo("---free:%p", *pSource); taosMemoryFreeClear(*pSource); } @@ -413,10 +414,11 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) { destroyDiskbasedBuf(pSortHandle->pBuf); taosMemoryFreeClear(pSortHandle->idStr); blockDataDestroy(pSortHandle->pDataBlock); + if (pSortHandle->pBoundedQueue) destroyBoundedQueue(pSortHandle->pBoundedQueue); int64_t fetchUs = 0, fetchNum = 0; - tsortClearOrderdSource(pSortHandle->pOrderedSource, &fetchUs, &fetchNum); + tsortClearOrderedSource(pSortHandle->pOrderedSource, &fetchUs, &fetchNum); qDebug("all source fetch time: %" PRId64 "us num:%" PRId64 " %s", fetchUs, fetchNum, pSortHandle->idStr); taosArrayDestroy(pSortHandle->pOrderedSource); @@ -1083,12 +1085,12 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { code = doAddNewExternalMemSource(pHandle->pBuf, pResList, pBlock, &pHandle->sourceId, pPageIdList); if (code != TSDB_CODE_SUCCESS) { - taosArrayDestroy(pResList); + taosArrayDestroy(pResList); // this may cause memory leak. return code; } } - tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL); + tsortClearOrderedSource(pHandle->pOrderedSource, NULL, NULL); void* px = taosArrayAddAll(pHandle->pOrderedSource, pResList); if (px == NULL) { return TSDB_CODE_OUT_OF_MEMORY; @@ -2346,7 +2348,7 @@ static int32_t createBlocksMergeSortInitialSources(SSortHandle* pHandle) { } } - tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL); + tsortClearOrderedSource(pHandle->pOrderedSource, NULL, NULL); if (!tsortIsClosed(pHandle)) { void* px = taosArrayAddAll(pHandle->pOrderedSource, aExtSrc); QUERY_CHECK_NULL(px, code, lino, _err, terrno); @@ -2396,19 +2398,19 @@ static void freeSSortSource(SSortSource* source) { static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) { int32_t code = 0; size_t sortBufSize = pHandle->numOfPages * pHandle->pageSize; - SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0); - if (pSource == NULL) { + SSortSource** p = taosArrayGet(pHandle->pOrderedSource, 0); + if (p == NULL) { return terrno; } - SSortSource* source = *pSource; - *pSource = NULL; + SSortSource* pSource = *p; - tsortClearOrderdSource(pHandle->pOrderedSource, NULL, NULL); + taosArrayRemove(pHandle->pOrderedSource, 0); + tsortClearOrderedSource(pHandle->pOrderedSource, NULL, NULL); while (1) { SSDataBlock* pBlock = NULL; - TAOS_CHECK_RETURN(pHandle->fetchfp(source->param, &pBlock)); + TAOS_CHECK_RETURN(pHandle->fetchfp(pSource->param, &pBlock)); if (pBlock == NULL) { break; } @@ -2422,7 +2424,7 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) { sortBufSize = pHandle->numOfPages * pHandle->pageSize; code = createOneDataBlock(pBlock, false, &pHandle->pDataBlock); if (code) { - freeSSortSource(source); + freeSSortSource(pSource); return code; } } @@ -2433,47 +2435,45 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) { code = blockDataMerge(pHandle->pDataBlock, pBlock); if (code != TSDB_CODE_SUCCESS) { - freeSSortSource(source); + freeSSortSource(pSource); return code; } size_t size = blockDataGetSize(pHandle->pDataBlock); if (size > sortBufSize) { // Perform the in-memory sort and then flush data in the buffer into disk. - int64_t p = taosGetTimestampUs(); + int64_t st = taosGetTimestampUs(); code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo); if (code != 0) { - freeSSortSource(source); + freeSSortSource(pSource); return code; } - int64_t el = taosGetTimestampUs() - p; - pHandle->sortElapsed += el; + pHandle->sortElapsed += (taosGetTimestampUs() - st); + if (pHandle->pqMaxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->pqMaxRows); code = doAddToBuf(pHandle->pDataBlock, pHandle); if (code != TSDB_CODE_SUCCESS) { - freeSSortSource(source); + freeSSortSource(pSource); return code; } } } - freeSSortSource(source); + freeSSortSource(pSource); if (pHandle->pDataBlock != NULL && pHandle->pDataBlock->info.rows > 0) { size_t size = blockDataGetSize(pHandle->pDataBlock); // Perform the in-memory sort and then flush data in the buffer into disk. - int64_t p = taosGetTimestampUs(); - + int64_t st = taosGetTimestampUs(); code = blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo); if (code != 0) { return code; } if (pHandle->pqMaxRows > 0) blockDataKeepFirstNRows(pHandle->pDataBlock, pHandle->pqMaxRows); - int64_t el = taosGetTimestampUs() - p; - pHandle->sortElapsed += el; + pHandle->sortElapsed += (taosGetTimestampUs() - st); // All sorted data can fit in memory, external memory sort is not needed. Return to directly if (size <= sortBufSize && pHandle->pBuf == NULL) { @@ -2488,6 +2488,7 @@ static int32_t createBlocksQuickSortInitialSources(SSortHandle* pHandle) { code = doAddToBuf(pHandle->pDataBlock, pHandle); } } + return code; }