From 607f21082156d220f7f2d684b9e3224e56c6f98b Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Fri, 26 Jul 2024 20:48:06 +0800 Subject: [PATCH] adj operator result --- source/libs/executor/inc/executil.h | 4 ++-- source/libs/executor/src/aggregateoperator.c | 5 +++-- source/libs/executor/src/executil.c | 14 ++++++++++++-- source/libs/executor/src/scanoperator.c | 7 ++++++- source/libs/executor/src/timesliceoperator.c | 3 ++- source/libs/executor/src/timewindowoperator.c | 14 ++++++++++---- 6 files changed, 35 insertions(+), 12 deletions(-) diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index fbb61c7c90..36d81382f5 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -153,8 +153,8 @@ static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRo return pRow; } -void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order); -void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo); +int32_t initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order); +void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo); void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList); bool hasRemainResults(SGroupResInfo* pGroupResInfo); diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index f23f01c80f..24da11eb06 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -241,7 +241,8 @@ static bool nextGroupedResult(SOperatorInfo* pOperator) { T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); } - initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0); + code = initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0); + QUERY_CHECK_CODE(code, lino, _end); _end: if (code != TSDB_CODE_SUCCESS) { @@ -302,7 +303,7 @@ _end: static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { SSDataBlock* pRes = NULL; - int32_t code = getAggregateResultNext(pOperator, &pRes); + int32_t code = getAggregateResultNext(pOperator, &pRes); return pRes; } diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 7dd1fc1f4e..a8e7d0e03f 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -128,7 +128,9 @@ int32_t resultrowComparAsc(const void* p1, const void* p2) { static int32_t resultrowComparDesc(const void* p1, const void* p2) { return resultrowComparAsc(p2, p1); } -void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order) { +int32_t initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; if (pGroupResInfo->pRows != NULL) { taosArrayDestroy(pGroupResInfo->pRows); } @@ -154,6 +156,7 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, in } pGroupResInfo->pBuf = taosMemoryMalloc(bufLen); + QUERY_CHECK_NULL(pGroupResInfo->pBuf, code, lino, _end, terrno); iter = 0; while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) { @@ -164,7 +167,8 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, in p->groupId = *(uint64_t*)key; p->pos = *(SResultRowPosition*)pData; memcpy(p->key, (char*)key + sizeof(uint64_t), keyLen - sizeof(uint64_t)); - taosArrayPush(pGroupResInfo->pRows, &p); + void* tmp = taosArrayPush(pGroupResInfo->pRows, &p); + QUERY_CHECK_NULL(pGroupResInfo->pBuf, code, lino, _end, terrno); offset += keyLen + sizeof(struct SResultRowPosition); } @@ -176,6 +180,12 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, in } pGroupResInfo->index = 0; + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index e1db1efb59..0c6bdbe5c1 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -5399,7 +5399,12 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock* break; } - tsortAppendTupleToBlock(pInfo->pSortHandle, pResBlock, pTupleHandle); + code = tsortAppendTupleToBlock(pInfo->pSortHandle, pResBlock, pTupleHandle); + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + T_LONG_JMP(pOperator->pTaskInfo->env, terrno); + } + if (pResBlock->info.rows >= capacity) { break; } diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 5fbc8007e3..35ae59a555 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -334,7 +334,8 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, srcSlot); if (pkey->isNull == false) { - colDataSetVal(pDst, rows, pkey->pData, false); + code = colDataSetVal(pDst, rows, pkey->pData, false); + QUERY_CHECK_CODE(code, lino, _end); } else { colDataSetNULL(pDst, rows); } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 8265d270b2..14d1908e55 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -903,7 +903,9 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { if (hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag)) break; } - initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->binfo.outputTsOrder); + code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->binfo.outputTsOrder); + QUERY_CHECK_CODE(code, lino, _end); + OPTR_SET_OPENED(pOperator); pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; @@ -1042,7 +1044,9 @@ static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) { } pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; - initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC); + code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC); + QUERY_CHECK_CODE(code, lino, _end); + pOperator->status = OP_RES_TO_RETURN; _end: @@ -1512,7 +1516,9 @@ static int32_t doSessionWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** pp // restore the value pOperator->status = OP_RES_TO_RETURN; - initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC); + code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC); + QUERY_CHECK_CODE(code, lino, _end); + code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); QUERY_CHECK_CODE(code, lino, _end); while (1) { @@ -2272,7 +2278,7 @@ _end: static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { SSDataBlock* pRes = NULL; - int32_t code = doMergeIntervalAggNext(pOperator, &pRes); + int32_t code = doMergeIntervalAggNext(pOperator, &pRes); return pRes; }