refactor: merge data belongs to different data group.

This commit is contained in:
Haojun Liao 2022-08-02 13:58:54 +08:00
parent a62a7cc41f
commit 781c494c2b
3 changed files with 37 additions and 44 deletions

View File

@ -519,6 +519,7 @@ typedef struct SBlockDistInfo {
typedef struct SOptrBasicInfo { typedef struct SOptrBasicInfo {
SResultRowInfo resultRowInfo; SResultRowInfo resultRowInfo;
SSDataBlock* pRes; SSDataBlock* pRes;
bool mergeResultBlock;
} SOptrBasicInfo; } SOptrBasicInfo;
typedef struct SIntervalAggOperatorInfo { typedef struct SIntervalAggOperatorInfo {
@ -593,7 +594,6 @@ typedef struct SAggOperatorInfo {
uint64_t groupId; uint64_t groupId;
SGroupResInfo groupResInfo; SGroupResInfo groupResInfo;
SExprSupp scalarExprSup; SExprSupp scalarExprSup;
SNode *pCondition; SNode *pCondition;
} SAggOperatorInfo; } SAggOperatorInfo;
@ -891,7 +891,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode *pScanPhyNode, const char* pUser, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode *pScanPhyNode, const char* pUser, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SNode* pCondition, SExprInfo* pScalarExprInfo, SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SNode* pCondition, SExprInfo* pScalarExprInfo,
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo); int32_t numOfScalarExpr, bool mergeResult, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode *pNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode *pNode, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo);

View File

@ -136,9 +136,8 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn,
void operatorDummyCloseFn(void* param, int32_t numOfCols) {} void operatorDummyCloseFn(void* param, int32_t numOfCols) {}
static int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, const int32_t* rowCellOffset, SGroupResInfo* pGroupResInfo);
SqlFunctionCtx* pCtx, int32_t numOfExprs);
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size); static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId); static void doSetTableGroupOutputBuf(SOperatorInfo* pOperator, int32_t numOfOutput, uint64_t groupId);
@ -1501,19 +1500,24 @@ int32_t finalizeResultRowIntoResultDataBlock(SDiskbasedBuf* pBuf, SResultRowPosi
return 0; return 0;
} }
int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf,
SGroupResInfo* pGroupResInfo, const int32_t* rowCellOffset, SqlFunctionCtx* pCtx, SGroupResInfo* pGroupResInfo) {
int32_t numOfExprs) { SExprInfo* pExprInfo = pSup->pExprInfo;
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); int32_t numOfExprs = pSup->numOfExprs;
int32_t start = pGroupResInfo->index; int32_t* rowEntryOffset = pSup->rowEntryInfoOffset;
SqlFunctionCtx* pCtx = pSup->pCtx;
for (int32_t i = start; i < numOfRows; i += 1) { int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
for (int32_t i = pGroupResInfo->index; i < numOfRows; i += 1) {
SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i); SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
SFilePage* page = getBufPage(pBuf, pPos->pos.pageId); SFilePage* page = getBufPage(pBuf, pPos->pos.pageId);
SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset); SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset);
doUpdateNumOfRows(pRow, numOfExprs, rowCellOffset); doUpdateNumOfRows(pRow, numOfExprs, rowEntryOffset);
// no results, continue to check the next one
if (pRow->numOfRows == 0) { if (pRow->numOfRows == 0) {
pGroupResInfo->index += 1; pGroupResInfo->index += 1;
releaseBufPage(pBuf, page); releaseBufPage(pBuf, page);
@ -1531,6 +1535,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI
} }
if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) { if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
ASSERT(pBlock->info.rows > 0);
releaseBufPage(pBuf, page); releaseBufPage(pBuf, page);
break; break;
} }
@ -1540,7 +1545,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI
for (int32_t j = 0; j < numOfExprs; ++j) { for (int32_t j = 0; j < numOfExprs; ++j) {
int32_t slotId = pExprInfo[j].base.resSchema.slotId; int32_t slotId = pExprInfo[j].base.resSchema.slotId;
pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowCellOffset); pCtx[j].resultInfo = getResultEntryInfo(pRow, j, rowEntryOffset);
if (pCtx[j].fpSet.finalize) { if (pCtx[j].fpSet.finalize) {
#ifdef BUF_PAGE_DEBUG #ifdef BUF_PAGE_DEBUG
qDebug("\npage_finalize %d", numOfExprs); qDebug("\npage_finalize %d", numOfExprs);
@ -1573,26 +1578,19 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI
releaseBufPage(pBuf, page); releaseBufPage(pBuf, page);
pBlock->info.rows += pRow->numOfRows; pBlock->info.rows += pRow->numOfRows;
// if (pBlock->info.rows >= pBlock->info.capacity) { // output buffer is full
// break;
// }
} }
qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows, qDebug("%s result generated, rows:%d, groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows,
pBlock->info.groupId); pBlock->info.groupId);
blockDataUpdateTsWindow(pBlock, 0); blockDataUpdateTsWindow(pBlock, 0);
return 0; return 0;
} }
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
SDiskbasedBuf* pBuf) { SDiskbasedBuf* pBuf) {
SExprInfo* pExprInfo = pOperator->exprSupp.pExprInfo;
int32_t numOfExprs = pOperator->exprSupp.numOfExprs;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SSDataBlock* pBlock = pbInfo->pRes;
int32_t* rowCellOffset = pOperator->exprSupp.rowEntryInfoOffset;
SSDataBlock* pBlock = pbInfo->pRes;
SqlFunctionCtx* pCtx = pOperator->exprSupp.pCtx;
// set output datablock version // set output datablock version
pBlock->info.version = pTaskInfo->version; pBlock->info.version = pTaskInfo->version;
@ -1604,30 +1602,22 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
// clear the existed group id // clear the existed group id
pBlock->info.groupId = 0; pBlock->info.groupId = 0;
doCopyToSDataBlock(pTaskInfo, pBlock, pExprInfo, pBuf, pGroupResInfo, rowCellOffset, pCtx, numOfExprs); if (!pbInfo->mergeResultBlock) {
} doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo);
} else {
static void updateNumOfRowsInResultRows(SqlFunctionCtx* pCtx, int32_t numOfOutput, SResultRowInfo* pResultRowInfo, while(hasRemainResults(pGroupResInfo)) {
int32_t* rowEntryInfoOffset) { doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo);
// update the number of result for each, only update the number of rows for the corresponding window result. if (pBlock->info.rows >= pOperator->resultInfo.threshold) {
// if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) { break;
// return;
// }
#if 0
for (int32_t i = 0; i < pResultRowInfo->size; ++i) {
SResultRow* pResult = pResultRowInfo->pResult[i];
for (int32_t j = 0; j < numOfOutput; ++j) {
int32_t functionId = pCtx[j].functionId;
if (functionId == FUNCTION_TS || functionId == FUNCTION_TAG || functionId == FUNCTION_TAGPRJ) {
continue;
} }
SResultRowEntryInfo* pCell = getResultEntryInfo(pResult, j, rowEntryInfoOffset); // clearing group id to continue to merge data that belong to different groups
pResult->numOfRows = (uint16_t)(TMAX(pResult->numOfRows, pCell->numOfRes)); pBlock->info.groupId = 0;
} }
// clear the group id info in SSDataBlock, since the client does not need it
pBlock->info.groupId = 0;
} }
#endif
} }
static int32_t compressQueryColData(SColumnInfoData* pColRes, int32_t numOfRows, char* data, int8_t compressed) { static int32_t compressQueryColData(SColumnInfoData* pColRes, int32_t numOfRows, char* data, int8_t compressed) {
@ -2979,6 +2969,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
break; break;
} }
} }
size_t rows = blockDataGetNumOfRows(pInfo->pRes); size_t rows = blockDataGetNumOfRows(pInfo->pRes);
pOperator->resultInfo.totalRows += rows; pOperator->resultInfo.totalRows += rows;
@ -3483,7 +3474,7 @@ void cleanupExprSupp(SExprSupp* pSupp) {
SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
SSDataBlock* pResultBlock, SNode* pCondition, SExprInfo* pScalarExprInfo, SSDataBlock* pResultBlock, SNode* pCondition, SExprInfo* pScalarExprInfo,
int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo) { int32_t numOfScalarExpr, bool mergeResult, SExecTaskInfo* pTaskInfo) {
SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo)); SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) { if (pInfo == NULL || pOperator == NULL) {
@ -3505,6 +3496,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo*
goto _error; goto _error;
} }
pInfo->binfo.mergeResultBlock = mergeResult;
pInfo->groupId = UINT64_MAX; pInfo->groupId = UINT64_MAX;
pInfo->pCondition = pCondition; pInfo->pCondition = pCondition;
pOperator->name = "TableAggregate"; pOperator->name = "TableAggregate";
@ -4098,7 +4090,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
pScalarExprInfo, numOfScalarExpr, pTaskInfo); pScalarExprInfo, numOfScalarExpr, pTaskInfo);
} else { } else {
pOptr = createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pAggNode->node.pConditions, pOptr = createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pAggNode->node.pConditions,
pScalarExprInfo, numOfScalarExpr, pTaskInfo); pScalarExprInfo, numOfScalarExpr, pAggNode->mergeDataBlock, pTaskInfo);
} }
} else if (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL == type || QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL == type || QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL == type) {
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;

View File

@ -1798,6 +1798,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
pInfo->twAggSup = *pTwAggSupp; pInfo->twAggSup = *pTwAggSupp;
pInfo->ignoreExpiredData = pPhyNode->window.igExpired; pInfo->ignoreExpiredData = pPhyNode->window.igExpired;
pInfo->pCondition = pPhyNode->window.node.pConditions; pInfo->pCondition = pPhyNode->window.node.pConditions;
pInfo->binfo.mergeResultBlock = pPhyNode->window.mergeDataBlock;
if (pPhyNode->window.pExprs != NULL) { if (pPhyNode->window.pExprs != NULL) {
int32_t numOfScalar = 0; int32_t numOfScalar = 0;