From 9800c8d552dca332af04bb51bb84fc72f41328fb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 14 Feb 2022 10:58:58 +0800 Subject: [PATCH] [td-13043]refactor operator of multitable aggregate. --- source/libs/executor/inc/executil.h | 7 ++ source/libs/executor/inc/executorimpl.h | 11 ++- source/libs/executor/src/executorimpl.c | 107 +++++++++++++++--------- 3 files changed, 78 insertions(+), 47 deletions(-) diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index cfb225fc51..10d884cb3f 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -126,6 +126,13 @@ static FORCE_INLINE char* getPosInResultPage(struct STaskAttr* pQueryAttr, SFile // return ((char *)page->data) + rowOffset + offset * numOfRows; } +static FORCE_INLINE char* getPosInResultPage_rv(SFilePage* page, int32_t rowOffset, int32_t offset) { + assert(rowOffset >= 0); + + int32_t numOfRows = 1;//(int32_t)getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery); + return ((char *)page->data) + rowOffset + offset * numOfRows; +} + //bool isNullOperator(SColumnFilterElem *pFilter, const char* minval, const char* maxval, int16_t type); //bool notNullOperator(SColumnFilterElem *pFilter, const char* minval, const char* maxval, int16_t type); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index c845d3d5ab..f46e00cd25 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -99,10 +99,7 @@ typedef struct SSingleColumnFilterInfo { typedef struct STableQueryInfo { TSKEY lastKey; int32_t groupIndex; // group id in table list - SVariant tag; -// STimeWindow win; // todo remove it later -// STSCursor cur; -// void* pTable; // for retrieve the page id list +// SVariant tag; SResultRowInfo resInfo; } STableQueryInfo; @@ -442,6 +439,7 @@ typedef struct SOptrBasicInfo { int32_t* rowCellInfoOffset; // offset value for each row result cell info SqlFunctionCtx* pCtx; SSDataBlock* pRes; + uint32_t resRowSize; } SOptrBasicInfo; typedef struct SOptrBasicInfo STableIntervalOperatorInfo; @@ -449,13 +447,14 @@ typedef struct SOptrBasicInfo STableIntervalOperatorInfo; typedef struct SAggOperatorInfo { SOptrBasicInfo binfo; uint32_t seed; - SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file + SDiskbasedBuf *pResultBuf; // query result buffer based on blocked-wised disk file SHashObj* pResultRowHashTable; // quick locate the window object for each result SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not SArray* pResultRowArrayList; // The array list that contains the Result rows char* keyBuf; // window key buffer SResultRowPool *pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object. - STableQueryInfo* current; + STableQueryInfo *current; + uint32_t groupId; } SAggOperatorInfo; typedef struct SProjectOperatorInfo { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index d3a819c6db..cba912bc60 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -226,10 +226,9 @@ static void setResultBufSize(STaskAttr* pQueryAttr, SRspResultInfo* pResultInfo) static void setCtxTagForJoin(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, void* pTable); static void setParamForStableStddev(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr); static void setParamForStableStddevByColData(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, int32_t numOfOutput, SExprInfo* pExpr, char* val, int16_t bytes); -static void doSetTableGroupOutputBuf(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, - SqlFunctionCtx* pCtx, int32_t* rowCellInfoOffset, int32_t numOfOutput, int32_t tableGroupId); +static void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, int32_t tableGroupId, SExecTaskInfo* pTaskInfo); -SArray* getOrderCheckColumns(STaskAttr* pQuery); + SArray* getOrderCheckColumns(STaskAttr* pQuery); typedef struct SRowCompSupporter { @@ -3642,7 +3641,7 @@ void destroyTableQueryInfoImpl(STableQueryInfo *pTableQueryInfo) { return; } - taosVariantDestroy(&pTableQueryInfo->tag); +// taosVariantDestroy(&pTableQueryInfo->tag); cleanupResultRowInfo(&pTableQueryInfo->resInfo); } @@ -3679,14 +3678,49 @@ void setResultRowOutputBufInitCtx(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pRes } } -void doSetTableGroupOutputBuf(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, SqlFunctionCtx* pCtx, - int32_t* rowCellInfoOffset, int32_t numOfOutput, int32_t tableGroupId) { +void setResultRowOutputBufInitCtx_rv(SDiskbasedBuf * pBuf, SResultRow *pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset) { + // Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group + SFilePage* bufPage = getBufPage(pBuf, pResult->pageId); + + int32_t offset = 0; + for (int32_t i = 0; i < numOfOutput; ++i) { + pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset); + + struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo; + if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) { + offset += pCtx[i].resDataInfo.bytes; + continue; + } + + pCtx[i].pOutput = getPosInResultPage_rv(bufPage, pResult->offset, offset); + offset += pCtx[i].resDataInfo.bytes; + + int32_t functionId = pCtx[i].functionId; + if (functionId < 0) { + continue; + } + + if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF) { + if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i - 1].pOutput; + } + + // if (!pResInfo->initialized) { + // aAggs[functionId].init(&pCtx[i], pResInfo); + // } + } +} + +void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, int32_t tableGroupId, SExecTaskInfo* pTaskInfo) { // for simple group by query without interval, all the tables belong to one group result. int64_t uid = 0; int64_t tid = 0; + SResultRowInfo* pResultRowInfo = &pAggInfo->binfo.resultRowInfo; + SqlFunctionCtx* pCtx = pAggInfo->binfo.pCtx; + int32_t* rowCellInfoOffset = pAggInfo->binfo.rowCellInfoOffset; + SResultRow* pResultRow = - doSetResultOutBufByKey(pRuntimeEnv, pResultRowInfo, tid, (char*)&tableGroupId, sizeof(tableGroupId), true, uid); + doSetResultOutBufByKey_rv(pResultRowInfo, tid, (char*)&tableGroupId, sizeof(tableGroupId), true, uid, pTaskInfo, false, pAggInfo); assert (pResultRow != NULL); /* @@ -3694,29 +3728,26 @@ void doSetTableGroupOutputBuf(STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pRes * all group belong to one result set, and each group result has different group id so set the id to be one */ if (pResultRow->pageId == -1) { - int32_t ret = addNewWindowResultBuf(pResultRow, pRuntimeEnv->pResultBuf, tableGroupId, pRuntimeEnv->pQueryAttr->resultRowSize); + int32_t ret = addNewWindowResultBuf(pResultRow, pAggInfo->pResultBuf, tableGroupId, pAggInfo->binfo.resRowSize); if (ret != TSDB_CODE_SUCCESS) { return; } } - setResultRowOutputBufInitCtx(pRuntimeEnv, pResultRow, pCtx, numOfOutput, rowCellInfoOffset); + setResultRowOutputBufInitCtx_rv(pAggInfo->pResultBuf, pResultRow, pCtx, numOfOutput, rowCellInfoOffset); } -void setExecutionContext(STaskRuntimeEnv* pRuntimeEnv, SOptrBasicInfo* pInfo, int32_t numOfOutput, int32_t tableGroupId, - TSKEY nextKey) { - STableQueryInfo *pTableQueryInfo = pRuntimeEnv->current; - +void setExecutionContext(int32_t numOfOutput, int32_t tableGroupId, TSKEY nextKey, SExecTaskInfo* pTaskInfo, STableQueryInfo *pTableQueryInfo, SAggOperatorInfo* pAggInfo) { // lastKey needs to be updated pTableQueryInfo->lastKey = nextKey; - if (pRuntimeEnv->prevGroupId != INT32_MIN && pRuntimeEnv->prevGroupId == tableGroupId) { + if (pAggInfo->groupId != INT32_MIN && pAggInfo->groupId == tableGroupId) { return; } - doSetTableGroupOutputBuf(pRuntimeEnv, &pInfo->resultRowInfo, pInfo->pCtx, pInfo->rowCellInfoOffset, numOfOutput, tableGroupId); + doSetTableGroupOutputBuf(pAggInfo, numOfOutput, tableGroupId, pTaskInfo); // record the current active group id - pRuntimeEnv->prevGroupId = tableGroupId; + pAggInfo->groupId = tableGroupId; } void setResultOutputBuf(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResult, SqlFunctionCtx* pCtx, @@ -4013,14 +4044,12 @@ static void toSDatablock(SGroupResInfo *pGroupResInfo, STaskRuntimeEnv* pRuntime } } -static void updateNumOfRowsInResultRows(STaskRuntimeEnv* pRuntimeEnv, SqlFunctionCtx* pCtx, int32_t numOfOutput, +static void updateNumOfRowsInResultRows(SqlFunctionCtx* pCtx, int32_t numOfOutput, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) { - STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; - // update the number of result for each, only update the number of rows for the corresponding window result. - if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) { - return; - } +// if (QUERY_IS_INTERVAL_QUERY(pQueryAttr)) { +// return; +// } for (int32_t i = 0; i < pResultRowInfo->size; ++i) { SResultRow *pResult = pResultRowInfo->pResult[i]; @@ -4031,8 +4060,8 @@ static void updateNumOfRowsInResultRows(STaskRuntimeEnv* pRuntimeEnv, SqlFunctio continue; } -// SResultRowEntryInfo* pCell = getResultCell(pResult, j, rowCellInfoOffset); -// pResult->numOfRows = (uint16_t)(TMAX(pResult->numOfRows, pCell->numOfRes)); + SResultRowEntryInfo* pCell = getResultCell(pResult, j, rowCellInfoOffset); + pResult->numOfRows = (uint16_t)(TMAX(pResult->numOfRows, pCell->numOfRes)); } } } @@ -6163,21 +6192,20 @@ static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) { SAggOperatorInfo* pAggInfo = pOperator->info; SOptrBasicInfo* pInfo = &pAggInfo->binfo; - - STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; if (pOperator->status == OP_RES_TO_RETURN) { - toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->pRes); +// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->pRes); - if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { + if (pInfo->pRes->info.rows == 0 /*|| !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)*/) { pOperator->status = OP_EXEC_DONE; } return pInfo->pRes; } - STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; - int32_t order = pQueryAttr->order.order; + // table scan order + int32_t order = TSDB_ORDER_ASC;//pQueryAttr->order.order; SOperatorInfo* downstream = pOperator->pDownstream[0]; @@ -6191,7 +6219,6 @@ static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) { } // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput); - // if (downstream->operatorType == OP_DataBlocksOptScan) { // STableScanInfo* pScanInfo = downstream->info; // order = getTableScanOrder(pScanInfo); @@ -6201,7 +6228,7 @@ static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) { setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); TSKEY key = 0; - if (QUERY_IS_ASC_QUERY(pQueryAttr)) { + if (order == TSDB_ORDER_ASC) { key = pBlock->info.window.ekey; TSKEY_MAX_ADD(key, 1); } else { @@ -6209,20 +6236,18 @@ static SSDataBlock* doMultiTableAggregate(void* param, bool* newgroup) { TSKEY_MIN_SUB(key, -1); } - setExecutionContext(pRuntimeEnv, pInfo, pOperator->numOfOutput, pRuntimeEnv->current->groupIndex, key); - doAggregateImpl(pOperator, pQueryAttr->window.skey, pInfo->pCtx, pBlock); + setExecutionContext(pOperator->numOfOutput, pAggInfo->current->groupIndex, key, pTaskInfo, pAggInfo->current, pAggInfo); + doAggregateImpl(pOperator, 0, pInfo->pCtx, pBlock); } pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pInfo->resultRowInfo); + updateNumOfRowsInResultRows(pInfo->pCtx, pOperator->numOfOutput, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset); - updateNumOfRowsInResultRows(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput, &pInfo->resultRowInfo, - pInfo->rowCellInfoOffset); +// initGroupResInfo(&pAggInfo->groupResInfo, &pInfo->resultRowInfo); - initGroupResInfo(&pRuntimeEnv->groupResInfo, &pInfo->resultRowInfo); - - toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->pRes); - if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { +// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->pRes); + if (pInfo->pRes->info.rows == 0/* || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)*/) { doSetOperatorCompleted(pOperator); } @@ -6878,7 +6903,7 @@ static SSDataBlock* hashGroupbyAggregate(void* param, bool* newgroup) { if (!pRuntimeEnv->pQueryAttr->stableQuery) { // finalize include the update of result rows finalizeQueryResult(pOperator, pInfo->binfo.pCtx, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); } else { - updateNumOfRowsInResultRows(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); + updateNumOfRowsInResultRows(pInfo->binfo.pCtx, pOperator->numOfOutput, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); } initGroupResInfo(&pRuntimeEnv->groupResInfo, &pInfo->binfo.resultRowInfo);