adj operator result

This commit is contained in:
54liuyao 2024-07-26 20:48:06 +08:00
parent 31dc0f62e8
commit 607f210821
6 changed files with 35 additions and 12 deletions

View File

@ -153,8 +153,8 @@ static FORCE_INLINE SResultRow* getResultRowByPos(SDiskbasedBuf* pBuf, SResultRo
return pRow; return pRow;
} }
void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order); int32_t initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order);
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo); void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo);
void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList); void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList);
bool hasRemainResults(SGroupResInfo* pGroupResInfo); bool hasRemainResults(SGroupResInfo* pGroupResInfo);

View File

@ -241,7 +241,8 @@ static bool nextGroupedResult(SOperatorInfo* pOperator) {
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
} }
initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0); code = initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
QUERY_CHECK_CODE(code, lino, _end);
_end: _end:
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -302,7 +303,7 @@ _end:
static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL; SSDataBlock* pRes = NULL;
int32_t code = getAggregateResultNext(pOperator, &pRes); int32_t code = getAggregateResultNext(pOperator, &pRes);
return pRes; return pRes;
} }

View File

@ -128,7 +128,9 @@ int32_t resultrowComparAsc(const void* p1, const void* p2) {
static int32_t resultrowComparDesc(const void* p1, const void* p2) { return resultrowComparAsc(p2, p1); } static int32_t resultrowComparDesc(const void* p1, const void* p2) { return resultrowComparAsc(p2, p1); }
void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order) { int32_t initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t order) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t lino = 0;
if (pGroupResInfo->pRows != NULL) { if (pGroupResInfo->pRows != NULL) {
taosArrayDestroy(pGroupResInfo->pRows); taosArrayDestroy(pGroupResInfo->pRows);
} }
@ -154,6 +156,7 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, in
} }
pGroupResInfo->pBuf = taosMemoryMalloc(bufLen); pGroupResInfo->pBuf = taosMemoryMalloc(bufLen);
QUERY_CHECK_NULL(pGroupResInfo->pBuf, code, lino, _end, terrno);
iter = 0; iter = 0;
while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) { while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) {
@ -164,7 +167,8 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, in
p->groupId = *(uint64_t*)key; p->groupId = *(uint64_t*)key;
p->pos = *(SResultRowPosition*)pData; p->pos = *(SResultRowPosition*)pData;
memcpy(p->key, (char*)key + sizeof(uint64_t), keyLen - sizeof(uint64_t)); memcpy(p->key, (char*)key + sizeof(uint64_t), keyLen - sizeof(uint64_t));
taosArrayPush(pGroupResInfo->pRows, &p); void* tmp = taosArrayPush(pGroupResInfo->pRows, &p);
QUERY_CHECK_NULL(pGroupResInfo->pBuf, code, lino, _end, terrno);
offset += keyLen + sizeof(struct SResultRowPosition); offset += keyLen + sizeof(struct SResultRowPosition);
} }
@ -176,6 +180,12 @@ void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, in
} }
pGroupResInfo->index = 0; pGroupResInfo->index = 0;
_end:
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
}
return code;
} }
void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList) { void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList) {

View File

@ -5399,7 +5399,12 @@ SSDataBlock* getSortedTableMergeScanBlockData(SSortHandle* pHandle, SSDataBlock*
break; break;
} }
tsortAppendTupleToBlock(pInfo->pSortHandle, pResBlock, pTupleHandle); code = tsortAppendTupleToBlock(pInfo->pSortHandle, pResBlock, pTupleHandle);
if (code != TSDB_CODE_SUCCESS) {
qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
T_LONG_JMP(pOperator->pTaskInfo->env, terrno);
}
if (pResBlock->info.rows >= capacity) { if (pResBlock->info.rows >= capacity) {
break; break;
} }

View File

@ -334,7 +334,8 @@ static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp
int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, srcSlot); SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, srcSlot);
if (pkey->isNull == false) { if (pkey->isNull == false) {
colDataSetVal(pDst, rows, pkey->pData, false); code = colDataSetVal(pDst, rows, pkey->pData, false);
QUERY_CHECK_CODE(code, lino, _end);
} else { } else {
colDataSetNULL(pDst, rows); colDataSetNULL(pDst, rows);
} }

View File

@ -903,7 +903,9 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
if (hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag)) break; if (hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, scanFlag)) break;
} }
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->binfo.outputTsOrder); code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->binfo.outputTsOrder);
QUERY_CHECK_CODE(code, lino, _end);
OPTR_SET_OPENED(pOperator); OPTR_SET_OPENED(pOperator);
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
@ -1042,7 +1044,9 @@ static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) {
} }
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC); code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
QUERY_CHECK_CODE(code, lino, _end);
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
_end: _end:
@ -1512,7 +1516,9 @@ static int32_t doSessionWindowAggNext(SOperatorInfo* pOperator, SSDataBlock** pp
// restore the value // restore the value
pOperator->status = OP_RES_TO_RETURN; pOperator->status = OP_RES_TO_RETURN;
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC); code = initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
QUERY_CHECK_CODE(code, lino, _end);
code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); code = blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
QUERY_CHECK_CODE(code, lino, _end); QUERY_CHECK_CODE(code, lino, _end);
while (1) { while (1) {
@ -2272,7 +2278,7 @@ _end:
static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
SSDataBlock* pRes = NULL; SSDataBlock* pRes = NULL;
int32_t code = doMergeIntervalAggNext(pOperator, &pRes); int32_t code = doMergeIntervalAggNext(pOperator, &pRes);
return pRes; return pRes;
} }