From 1b9be71d55a7f0fed8d372cad3e5b4244f6abcba Mon Sep 17 00:00:00 2001 From: Bob Liu Date: Mon, 6 Nov 2023 19:35:48 +0800 Subject: [PATCH] fix(excutor): group agg operator copy from hash table directly --- source/libs/executor/inc/executil.h | 6 +- source/libs/executor/inc/executorInt.h | 6 ++ source/libs/executor/src/executorInt.c | 79 +++++++++++++++++++++++ source/libs/executor/src/groupoperator.c | 82 ++++++++++++++++++++++-- 4 files changed, 167 insertions(+), 6 deletions(-) diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 740ff7b0dc..e6b190f82f 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -40,8 +40,10 @@ #define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t)) typedef struct SGroupResInfo { - int32_t index; - SArray* pRows; // SArray + int32_t index; // rows consumed in func:doCopyToSDataBlockXX + int32_t iter; // relate to index-1, last consumed data's slot id in hash table + void* dataPos; // relate to index-1, last consumed data's position, in the nodelist of cur slot + SArray* pRows; // SArray char* pBuf; bool freeItem; } SGroupResInfo; diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 288919d709..8666efd437 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -677,6 +677,12 @@ void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows); void doBuildResultDatablock(struct SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf); +/** + * @brief copydata from hash table, instead of copying from SGroupResInfo's pRow + */ +int32_t doCopyToSDataBlockByHash(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf, + SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t threshold, bool ignoreGroup); + bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo); bool hasSlimitOffsetInfo(SLimitInfo* pLimitInfo); void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo); diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index 8ad174f366..138da16324 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -655,6 +655,85 @@ int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPos return 0; } +int32_t doCopyToSDataBlockByHash(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf, + SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t threshold, + bool ignoreGroup) { + SExprInfo* pExprInfo = pSup->pExprInfo; + int32_t numOfExprs = pSup->numOfExprs; + int32_t* rowEntryOffset = pSup->rowEntryInfoOffset; + SqlFunctionCtx* pCtx = pSup->pCtx; + + size_t keyLen = 0; + int32_t numOfRows = tSimpleHashGetSize(pHashmap); + + // begin from last iter + void* pData = pGroupResInfo->dataPos; + int32_t iter = pGroupResInfo->iter; + while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) { + void* key = tSimpleHashGetKey(pData, &keyLen); + SResultRowPosition* pos = pData; + uint64_t groupId = *(uint64_t*)key; + + SFilePage* page = getBufPage(pBuf, pos->pageId); + if (page == NULL) { + qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo)); + T_LONG_JMP(pTaskInfo->env, terrno); + } + + SResultRow* pRow = (SResultRow*)((char*)page + pos->offset); + + doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset); + + // no results, continue to check the next one + if (pRow->numOfRows == 0) { + pGroupResInfo->index += 1; + pGroupResInfo->iter = iter; + pGroupResInfo->dataPos = pData; + + releaseBufPage(pBuf, page); + continue; + } + + if (!ignoreGroup) { + if (pBlock->info.id.groupId == 0) { + pBlock->info.id.groupId = groupId; + } else { + // current value belongs to different group, it can't be packed into one datablock + if (pBlock->info.id.groupId != groupId) { + releaseBufPage(pBuf, page); + break; + } + } + } + + if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) { + uint32_t newSize = pBlock->info.rows + pRow->numOfRows + ((numOfRows - iter) > 1 ? 1 : 0); + blockDataEnsureCapacity(pBlock, newSize); + qDebug("datablock capacity not sufficient, expand to required:%d, current capacity:%d, %s", newSize, + pBlock->info.capacity, GET_TASKID(pTaskInfo)); + // todo set the pOperator->resultInfo size + } + + pGroupResInfo->index += 1; + pGroupResInfo->iter = iter; + pGroupResInfo->dataPos = pData; + + copyResultrowToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo); + + releaseBufPage(pBuf, page); + pBlock->info.rows += pRow->numOfRows; + if (pBlock->info.rows >= threshold) { + break; + } + } + + qDebug("%s result generated, rows:%" PRId64 ", groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows, + pBlock->info.id.groupId); + pBlock->info.dataLoad = 1; + blockDataUpdateTsWindow(pBlock, 0); + return 0; +} + int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, int32_t threshold, bool ignoreGroup) { SExprInfo* pExprInfo = pSup->pExprInfo; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 467a49b37a..df910ae4f4 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -370,6 +370,69 @@ static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) { return (pRes->info.rows == 0) ? NULL : pRes; } +bool hasRemainResultByHash(SOperatorInfo* pOperator) { + SGroupbyOperatorInfo* pInfo = pOperator->info; + SSHashObj* pHashmap = pInfo->aggSup.pResultRowHashTable; + return pInfo->groupResInfo.index < tSimpleHashGetSize(pHashmap); +} + +void doBuildResultDatablockByHash(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, + SDiskbasedBuf* pBuf) { + SGroupbyOperatorInfo* pInfo = pOperator->info; + SSHashObj* pHashmap = pInfo->aggSup.pResultRowHashTable; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + + SSDataBlock* pBlock = pInfo->binfo.pRes; + + // set output datablock version + pBlock->info.version = pTaskInfo->version; + + blockDataCleanup(pBlock); + if (!hasRemainResultByHash(pOperator)) { + return; + } + + pBlock->info.id.groupId = 0; + if (!pInfo->binfo.mergeResultBlock) { + doCopyToSDataBlockByHash(pTaskInfo, pBlock, &pOperator->exprSupp, pInfo->aggSup.pResultBuf, &pInfo->groupResInfo, + pHashmap, pOperator->resultInfo.threshold, false); + } else { + while (hasRemainResultByHash(pOperator)) { + doCopyToSDataBlockByHash(pTaskInfo, pBlock, &pOperator->exprSupp, pInfo->aggSup.pResultBuf, &pInfo->groupResInfo, + pHashmap, pOperator->resultInfo.threshold, true); + if (pBlock->info.rows >= pOperator->resultInfo.threshold) { + break; + } + pBlock->info.id.groupId = 0; + } + + // clear the group id info in SSDataBlock, since the client does not need it + pBlock->info.id.groupId = 0; + } +} + +static SSDataBlock* buildGroupResultDataBlockByHash(SOperatorInfo* pOperator) { + SGroupbyOperatorInfo* pInfo = pOperator->info; + SSDataBlock* pRes = pInfo->binfo.pRes; + + // after filter, if result block turn to null, get next from whole set + while (1) { + doBuildResultDatablockByHash(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); + + doFilter(pRes, pOperator->exprSupp.pFilterInfo, NULL); + if (!hasRemainResultByHash(pOperator)) { + setOperatorCompleted(pOperator); + break; + } + if (pRes->info.rows > 0) { + break; + } + } + + pOperator->resultInfo.totalRows += pRes->info.rows; + return (pRes->info.rows == 0) ? NULL : pRes; +} + static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -379,9 +442,10 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { SGroupbyOperatorInfo* pInfo = pOperator->info; if (pOperator->status == OP_RES_TO_RETURN) { - return buildGroupResultDataBlock(pOperator); + return buildGroupResultDataBlockByHash(pOperator); } - + SGroupResInfo* pGroupResInfo = &pInfo->groupResInfo; + int32_t order = pInfo->binfo.inputTsOrder; int64_t st = taosGetTimestampUs(); SOperatorInfo* downstream = pOperator->pDownstream[0]; @@ -425,10 +489,20 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { } } #endif - initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, 0); + // initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, 0); + if (pGroupResInfo->pRows != NULL) { + taosArrayDestroy(pGroupResInfo->pRows); + } + if (pGroupResInfo->pBuf) { + taosMemoryFree(pGroupResInfo->pBuf); + pGroupResInfo->pBuf = NULL; + } + pGroupResInfo->index = 0; + pGroupResInfo->iter = 0; + pGroupResInfo->dataPos = NULL; pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0; - return buildGroupResultDataBlock(pOperator); + return buildGroupResultDataBlockByHash(pOperator); } SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo) {