fix(executor): group agg operator copy from hash table directly
This commit is contained in:
parent
0c4040b48e
commit
1b9be71d55
|
@ -40,7 +40,9 @@
|
||||||
#define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t))
|
#define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t))
|
||||||
|
|
||||||
typedef struct SGroupResInfo {
|
typedef struct SGroupResInfo {
|
||||||
int32_t index;
|
int32_t index; // rows consumed in func:doCopyToSDataBlockXX
|
||||||
|
int32_t iter; // relate to index-1, last consumed data's slot id in hash table
|
||||||
|
void* dataPos; // relate to index-1, last consumed data's position, in the nodelist of cur slot
|
||||||
SArray* pRows; // SArray<SResKeyPos>
|
SArray* pRows; // SArray<SResKeyPos>
|
||||||
char* pBuf;
|
char* pBuf;
|
||||||
bool freeItem;
|
bool freeItem;
|
||||||
|
|
|
@ -677,6 +677,12 @@ void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows);
|
||||||
void doBuildResultDatablock(struct SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
|
void doBuildResultDatablock(struct SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
|
||||||
SDiskbasedBuf* pBuf);
|
SDiskbasedBuf* pBuf);
|
||||||
|
|
||||||
|
/**
 * @brief Copy result rows into pBlock by iterating the result-row hash table directly, instead of
 *        copying from SGroupResInfo's pRows array.
 *
 * Iteration resumes after the entry recorded in pGroupResInfo (dataPos/iter); index, iter and dataPos
 * are advanced for every entry that is consumed.
 *
 * @param pTaskInfo      task context, used for logging and as the long-jump target when a page cannot be loaded
 * @param pBlock         output data block that receives the rows
 * @param pSup           supplies the expression info, function contexts and row entry offsets
 * @param pBuf           disk-based buffer holding the result rows referenced by the hash table
 * @param pGroupResInfo  resume cursor over the hash table
 * @param pHashmap       hash table to iterate
 * @param threshold      copying stops once pBlock holds at least this many rows
 * @param ignoreGroup    when false, copying stops before an entry whose group id differs from pBlock's
 * @return 0
 */
|
||||||
|
int32_t doCopyToSDataBlockByHash(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
|
||||||
|
SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t threshold, bool ignoreGroup);
|
||||||
|
|
||||||
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo);
|
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo);
|
||||||
bool hasSlimitOffsetInfo(SLimitInfo* pLimitInfo);
|
bool hasSlimitOffsetInfo(SLimitInfo* pLimitInfo);
|
||||||
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo);
|
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo);
|
||||||
|
|
|
@ -655,6 +655,85 @@ int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPos
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t doCopyToSDataBlockByHash(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
|
||||||
|
SGroupResInfo* pGroupResInfo, SSHashObj* pHashmap, int32_t threshold,
|
||||||
|
bool ignoreGroup) {
|
||||||
|
SExprInfo* pExprInfo = pSup->pExprInfo;
|
||||||
|
int32_t numOfExprs = pSup->numOfExprs;
|
||||||
|
int32_t* rowEntryOffset = pSup->rowEntryInfoOffset;
|
||||||
|
SqlFunctionCtx* pCtx = pSup->pCtx;
|
||||||
|
|
||||||
|
size_t keyLen = 0;
|
||||||
|
int32_t numOfRows = tSimpleHashGetSize(pHashmap);
|
||||||
|
|
||||||
|
// begin from last iter
|
||||||
|
void* pData = pGroupResInfo->dataPos;
|
||||||
|
int32_t iter = pGroupResInfo->iter;
|
||||||
|
while ((pData = tSimpleHashIterate(pHashmap, pData, &iter)) != NULL) {
|
||||||
|
void* key = tSimpleHashGetKey(pData, &keyLen);
|
||||||
|
SResultRowPosition* pos = pData;
|
||||||
|
uint64_t groupId = *(uint64_t*)key;
|
||||||
|
|
||||||
|
SFilePage* page = getBufPage(pBuf, pos->pageId);
|
||||||
|
if (page == NULL) {
|
||||||
|
qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
|
||||||
|
T_LONG_JMP(pTaskInfo->env, terrno);
|
||||||
|
}
|
||||||
|
|
||||||
|
SResultRow* pRow = (SResultRow*)((char*)page + pos->offset);
|
||||||
|
|
||||||
|
doUpdateNumOfRows(pCtx, pRow, numOfExprs, rowEntryOffset);
|
||||||
|
|
||||||
|
// no results, continue to check the next one
|
||||||
|
if (pRow->numOfRows == 0) {
|
||||||
|
pGroupResInfo->index += 1;
|
||||||
|
pGroupResInfo->iter = iter;
|
||||||
|
pGroupResInfo->dataPos = pData;
|
||||||
|
|
||||||
|
releaseBufPage(pBuf, page);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!ignoreGroup) {
|
||||||
|
if (pBlock->info.id.groupId == 0) {
|
||||||
|
pBlock->info.id.groupId = groupId;
|
||||||
|
} else {
|
||||||
|
// current value belongs to different group, it can't be packed into one datablock
|
||||||
|
if (pBlock->info.id.groupId != groupId) {
|
||||||
|
releaseBufPage(pBuf, page);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
|
||||||
|
uint32_t newSize = pBlock->info.rows + pRow->numOfRows + ((numOfRows - iter) > 1 ? 1 : 0);
|
||||||
|
blockDataEnsureCapacity(pBlock, newSize);
|
||||||
|
qDebug("datablock capacity not sufficient, expand to required:%d, current capacity:%d, %s", newSize,
|
||||||
|
pBlock->info.capacity, GET_TASKID(pTaskInfo));
|
||||||
|
// todo set the pOperator->resultInfo size
|
||||||
|
}
|
||||||
|
|
||||||
|
pGroupResInfo->index += 1;
|
||||||
|
pGroupResInfo->iter = iter;
|
||||||
|
pGroupResInfo->dataPos = pData;
|
||||||
|
|
||||||
|
copyResultrowToDataBlock(pExprInfo, numOfExprs, pRow, pCtx, pBlock, rowEntryOffset, pTaskInfo);
|
||||||
|
|
||||||
|
releaseBufPage(pBuf, page);
|
||||||
|
pBlock->info.rows += pRow->numOfRows;
|
||||||
|
if (pBlock->info.rows >= threshold) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
qDebug("%s result generated, rows:%" PRId64 ", groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
|
||||||
|
pBlock->info.id.groupId);
|
||||||
|
pBlock->info.dataLoad = 1;
|
||||||
|
blockDataUpdateTsWindow(pBlock, 0);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
|
int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
|
||||||
SGroupResInfo* pGroupResInfo, int32_t threshold, bool ignoreGroup) {
|
SGroupResInfo* pGroupResInfo, int32_t threshold, bool ignoreGroup) {
|
||||||
SExprInfo* pExprInfo = pSup->pExprInfo;
|
SExprInfo* pExprInfo = pSup->pExprInfo;
|
||||||
|
|
|
@ -370,6 +370,69 @@ static SSDataBlock* buildGroupResultDataBlock(SOperatorInfo* pOperator) {
|
||||||
return (pRes->info.rows == 0) ? NULL : pRes;
|
return (pRes->info.rows == 0) ? NULL : pRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * @brief Report whether the group-by operator still has hash-table entries that have not been consumed.
 *
 * @param pOperator operator whose info is a SGroupbyOperatorInfo
 * @return true while groupResInfo.index is smaller than the size of the result-row hash table
 */
bool hasRemainResultByHash(SOperatorInfo* pOperator) {
  SGroupbyOperatorInfo* pGroupby = pOperator->info;
  SSHashObj*            pTable = pGroupby->aggSup.pResultRowHashTable;

  if (pGroupby->groupResInfo.index < tSimpleHashGetSize(pTable)) {
    return true;
  }
  return false;
}
|
||||||
|
|
||||||
|
void doBuildResultDatablockByHash(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
|
||||||
|
SDiskbasedBuf* pBuf) {
|
||||||
|
SGroupbyOperatorInfo* pInfo = pOperator->info;
|
||||||
|
SSHashObj* pHashmap = pInfo->aggSup.pResultRowHashTable;
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
|
SSDataBlock* pBlock = pInfo->binfo.pRes;
|
||||||
|
|
||||||
|
// set output datablock version
|
||||||
|
pBlock->info.version = pTaskInfo->version;
|
||||||
|
|
||||||
|
blockDataCleanup(pBlock);
|
||||||
|
if (!hasRemainResultByHash(pOperator)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
pBlock->info.id.groupId = 0;
|
||||||
|
if (!pInfo->binfo.mergeResultBlock) {
|
||||||
|
doCopyToSDataBlockByHash(pTaskInfo, pBlock, &pOperator->exprSupp, pInfo->aggSup.pResultBuf, &pInfo->groupResInfo,
|
||||||
|
pHashmap, pOperator->resultInfo.threshold, false);
|
||||||
|
} else {
|
||||||
|
while (hasRemainResultByHash(pOperator)) {
|
||||||
|
doCopyToSDataBlockByHash(pTaskInfo, pBlock, &pOperator->exprSupp, pInfo->aggSup.pResultBuf, &pInfo->groupResInfo,
|
||||||
|
pHashmap, pOperator->resultInfo.threshold, true);
|
||||||
|
if (pBlock->info.rows >= pOperator->resultInfo.threshold) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
pBlock->info.id.groupId = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// clear the group id info in SSDataBlock, since the client does not need it
|
||||||
|
pBlock->info.id.groupId = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
 * @brief Produce the next non-empty, filtered result block of the group-by operator.
 *
 * Blocks are built from the hash table and filtered repeatedly until one keeps at least one row or the
 * hash table has been fully consumed; in the latter case the operator is marked completed.
 *
 * @param pOperator group-by operator
 * @return the operator's result block, or NULL when it holds no rows
 */
static SSDataBlock* buildGroupResultDataBlockByHash(SOperatorInfo* pOperator) {
  SGroupbyOperatorInfo* pGroupby = pOperator->info;
  SSDataBlock*          pOut = pGroupby->binfo.pRes;

  // the filter may empty a block; in that case fetch the next one from the remaining entries
  for (;;) {
    doBuildResultDatablockByHash(pOperator, &pGroupby->binfo, &pGroupby->groupResInfo, pGroupby->aggSup.pResultBuf);
    doFilter(pOut, pOperator->exprSupp.pFilterInfo, NULL);

    bool exhausted = !hasRemainResultByHash(pOperator);
    if (exhausted) {
      setOperatorCompleted(pOperator);
    }

    if (exhausted || pOut->info.rows > 0) {
      break;
    }
  }

  pOperator->resultInfo.totalRows += pOut->info.rows;
  if (pOut->info.rows == 0) {
    return NULL;
  }
  return pOut;
}
|
||||||
|
|
||||||
static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
|
static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -379,8 +442,9 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
SGroupbyOperatorInfo* pInfo = pOperator->info;
|
SGroupbyOperatorInfo* pInfo = pOperator->info;
|
||||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
return buildGroupResultDataBlock(pOperator);
|
return buildGroupResultDataBlockByHash(pOperator);
|
||||||
}
|
}
|
||||||
|
SGroupResInfo* pGroupResInfo = &pInfo->groupResInfo;
|
||||||
|
|
||||||
int32_t order = pInfo->binfo.inputTsOrder;
|
int32_t order = pInfo->binfo.inputTsOrder;
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
|
@ -425,10 +489,20 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, 0);
|
// initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, 0);
|
||||||
|
if (pGroupResInfo->pRows != NULL) {
|
||||||
|
taosArrayDestroy(pGroupResInfo->pRows);
|
||||||
|
}
|
||||||
|
if (pGroupResInfo->pBuf) {
|
||||||
|
taosMemoryFree(pGroupResInfo->pBuf);
|
||||||
|
pGroupResInfo->pBuf = NULL;
|
||||||
|
}
|
||||||
|
pGroupResInfo->index = 0;
|
||||||
|
pGroupResInfo->iter = 0;
|
||||||
|
pGroupResInfo->dataPos = NULL;
|
||||||
|
|
||||||
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
|
pOperator->cost.openCost = (taosGetTimestampUs() - st) / 1000.0;
|
||||||
return buildGroupResultDataBlock(pOperator);
|
return buildGroupResultDataBlockByHash(pOperator);
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo) {
|
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo) {
|
||||||
|
|
Loading…
Reference in New Issue