diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index edc21626f5..63c398618f 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -42,11 +42,8 @@ #define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.str) typedef struct SGroupResInfo { - int32_t totalGroup; - int32_t currentGroup; int32_t index; - SArray* pRows; // SArray - bool ordered; + SArray* pRows; // SArray int32_t position; } SGroupResInfo; @@ -80,18 +77,12 @@ typedef struct SResultRowInfo { SResultRowPosition cur; } SResultRowInfo; -struct STaskAttr; -struct STaskRuntimeEnv; -struct SUdfInfo; struct SqlFunctionCtx; -int32_t getOutputInterResultBufSize(struct STaskAttr* pQueryAttr); - size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput); int32_t initResultRowInfo(SResultRowInfo* pResultRowInfo, int32_t size); void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo); -void resetResultRowInfo(struct STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo); int32_t numOfClosedResultRows(SResultRowInfo* pResultRowInfo); void closeAllResultRows(SResultRowInfo* pResultRowInfo); @@ -99,9 +90,7 @@ void initResultRow(SResultRow *pResultRow); void closeResultRow(SResultRow* pResultRow); bool isResultRowClosed(SResultRow* pResultRow); -struct SResultRowEntryInfo* getResultCell(const SResultRow* pRow, int32_t index, int32_t* offset); - -int32_t getRowNumForMultioutput(struct STaskAttr* pQueryAttr, bool topBottomQuery, bool stable); +struct SResultRowEntryInfo* getResultCell(const SResultRow* pRow, int32_t index, const int32_t* offset); static FORCE_INLINE SResultRow *getResultRow(SDiskbasedBuf* pBuf, SResultRowInfo *pResultRowInfo, int32_t slot) { ASSERT(pResultRowInfo != NULL && slot >= 0 && slot < pResultRowInfo->size); @@ -118,39 +107,13 @@ static FORCE_INLINE SResultRow *getResultRowByPos(SDiskbasedBuf* pBuf, SResultRo return pRow; } -static FORCE_INLINE char* getPosInResultPage(struct STaskAttr* pQueryAttr, SFilePage* page, int32_t rowOffset, - int32_t offset) { - assert(rowOffset >= 0 && pQueryAttr != NULL); - - // int32_t numOfRows = (int32_t)getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery); - return ((char *)page->data); - -} - -static FORCE_INLINE char* getPosInResultPage_rv(SFilePage* page, int32_t rowOffset, int32_t offset) { - assert(rowOffset >= 0); - - int32_t numOfRows = 1;//(int32_t)getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery); - return (char*) page + rowOffset + offset * numOfRows; -} - -typedef struct { - SArray* pResult; // SArray - int32_t colId; -} SStddevInterResult; - void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int32_t order); void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList); void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo); -bool hasRemainDataInCurrentGroup(SGroupResInfo* pGroupResInfo); -bool hasRemainData(SGroupResInfo* pGroupResInfo); +bool hashRemainDataInGroupInfo(SGroupResInfo* pGroupResInfo); bool incNextGroup(SGroupResInfo* pGroupResInfo); int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo); -int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, struct STaskRuntimeEnv *pRuntimeEnv, int32_t* offset); - -//int32_t initUdfInfo(struct SUdfInfo* pUdfInfo); - #endif // TDENGINE_QUERYUTIL_H diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 78f3266d74..0dacbba8e5 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -666,7 +666,6 @@ int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInf void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows); void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf); -void finalizeMultiTupleQueryResult(int32_t numOfOutput, SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset); void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order); int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int16_t bytes, diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 14cff488ec..5a02547f58 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -114,12 +114,6 @@ void closeAllResultRows(SResultRowInfo *pResultRowInfo) { assert(pResultRowInfo->size >= 0 && pResultRowInfo->capacity >= pResultRowInfo->size); for (int32_t i = 0; i < pResultRowInfo->size; ++i) { -// SResultRow* pRow = pResultRowInfo->pResult[i]; -// if (pRow->closed) { -// continue; -// } - -// pRow->closed = true; } } @@ -144,11 +138,11 @@ void clearResultRow(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow) { for (int32_t i = 0; i < pRuntimeEnv->pQueryAttr->numOfOutput; ++i) { struct SResultRowEntryInfo *pEntryInfo = NULL;//pResultRow->pEntryInfo[i]; - int16_t size = pRuntimeEnv->pQueryAttr->pExpr1[i].base.resSchema.bytes; - char * s = getPosInResultPage(pRuntimeEnv->pQueryAttr, page, pResultRow->offset, offset); - memset(s, 0, size); +// int16_t size = pRuntimeEnv->pQueryAttr->pExpr1[i].base.resSchema.bytes; +// char * s = getPosInResultPage(pRuntimeEnv->pQueryAttr, page, pResultRow->offset, offset); +// memset(s, 0, size); - offset += size; +// offset += size; cleanupResultRowEntry(pEntryInfo); } } @@ -161,7 +155,7 @@ void clearResultRow(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow) { } // TODO refactor: use macro -SResultRowEntryInfo* getResultCell(const SResultRow* pRow, int32_t index, int32_t* offset) { +SResultRowEntryInfo* getResultCell(const SResultRow* pRow, int32_t index, const int32_t* offset) { assert(index >= 0 && offset != NULL); return (SResultRowEntryInfo*)((char*) pRow->pEntryInfo + offset[index]); } @@ -247,7 +241,7 @@ void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayL ASSERT(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo)); } -bool hasRemainDataInCurrentGroup(SGroupResInfo* pGroupResInfo) { +bool hashRemainDataInGroupInfo(SGroupResInfo* pGroupResInfo) { if (pGroupResInfo->pRows == NULL) { return false; } @@ -255,18 +249,6 @@ bool hasRemainDataInCurrentGroup(SGroupResInfo* pGroupResInfo) { return pGroupResInfo->index < taosArrayGetSize(pGroupResInfo->pRows); } -bool hasRemainData(SGroupResInfo* pGroupResInfo) { - if (hasRemainDataInCurrentGroup(pGroupResInfo)) { - return true; - } - - return pGroupResInfo->currentGroup < pGroupResInfo->totalGroup; -} - -bool incNextGroup(SGroupResInfo* pGroupResInfo) { - return (++pGroupResInfo->currentGroup) < pGroupResInfo->totalGroup; -} - int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) { assert(pGroupResInfo != NULL); if (pGroupResInfo->pRows == 0) { @@ -387,11 +369,6 @@ void orderTheResultRows(STaskRuntimeEnv* pRuntimeEnv) { } static int32_t mergeIntoGroupResultImplRv(STaskRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, uint64_t groupId, int32_t* rowCellInfoOffset) { - if (!pGroupResInfo->ordered) { - orderTheResultRows(pRuntimeEnv); - pGroupResInfo->ordered = true; - } - if (pGroupResInfo->pRows == NULL) { pGroupResInfo->pRows = taosArrayInit(100, POINTER_BYTES); } @@ -403,6 +380,7 @@ static int32_t mergeIntoGroupResultImplRv(STaskRuntimeEnv *pRuntimeEnv, SGroupRe break; } + int64_t num = getNumOfResultWindowRes(pRuntimeEnv, &pResultRowCell->pos, rowCellInfoOffset); if (num <= 0) { continue; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index ccffe77514..599664ae6d 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -156,7 +156,7 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, void operatorDummyCloseFn(void* param, int32_t numOfCols) {} static int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, - int32_t* rowCellOffset, SqlFunctionCtx* pCtx, int32_t numOfExprs); + const int32_t* rowCellOffset, SqlFunctionCtx* pCtx, int32_t numOfExprs); static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size); static void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo); @@ -182,10 +182,11 @@ static int compareRowData(const void* a, const void* b, const void* userData) { SFilePage* page2 = getBufPage(pRuntimeEnv->pResultBuf, pRow2->pageId); int16_t offset = supporter->dataOffset; - char* in1 = getPosInResultPage(pRuntimeEnv->pQueryAttr, page1, pRow1->offset, offset); - char* in2 = getPosInResultPage(pRuntimeEnv->pQueryAttr, page2, pRow2->offset, offset); + return 0; +// char* in1 = getPosInResultPage(pRuntimeEnv->pQueryAttr, page1, pRow1->offset, offset); +// char* in2 = getPosInResultPage(pRuntimeEnv->pQueryAttr, page2, pRow2->offset, offset); - return (in1 != NULL && in2 != NULL) ? supporter->comFunc(in1, in2) : 0; +// return (in1 != NULL && in2 != NULL) ? supporter->comFunc(in1, in2) : 0; } // setup the output buffer for each operator @@ -333,6 +334,8 @@ SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, return NULL; } + setBufPageDirty(pData, true); + // set the number of rows in current disk page SResultRow* pResultRow = (SResultRow*)((char*)pData + pData->num); pResultRow->pageId = pageId; @@ -364,12 +367,14 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR if (isIntervalQuery) { if (masterscan && p1 != NULL) { // the *p1 may be NULL in case of sliding+offset exists. pResult = getResultRowByPos(pResultBuf, p1); + ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset); } } else { // In case of group by column query, the required SResultRow object must be existInCurrentResusltRowInfo in the // pResultRowInfo object. if (p1 != NULL) { pResult = getResultRowByPos(pResultBuf, p1); + ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset); } } @@ -1417,21 +1422,6 @@ static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) { // } //} -static void getIntermediateBufInfo(STaskRuntimeEnv* pRuntimeEnv, int32_t* ps, int32_t* rowsize) { - STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; - int32_t MIN_ROWS_PER_PAGE = 4; - - *rowsize = (int32_t)(pQueryAttr->resultRowSize * - getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery)); - int32_t overhead = sizeof(SFilePage); - - // one page contains at least two rows - *ps = DEFAULT_INTERN_BUF_PAGE_SIZE; - while (((*rowsize) * MIN_ROWS_PER_PAGE) > (*ps) - overhead) { - *ps = ((*ps) << 1u); - } -} - // static FORCE_INLINE bool doFilterByBlockStatistics(STaskRuntimeEnv* pRuntimeEnv, SDataStatis *pDataStatis, // SqlFunctionCtx *pCtx, int32_t numOfRows) { // STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; @@ -1708,36 +1698,6 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc return TSDB_CODE_SUCCESS; } -void copyToSDataBlock(SSDataBlock* pBlock, int32_t* offset, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pResBuf) { - pBlock->info.rows = 0; - - int32_t code = TSDB_CODE_SUCCESS; - while (pGroupResInfo->currentGroup < pGroupResInfo->totalGroup) { - // all results in current group have been returned to client, try next group - if ((pGroupResInfo->pRows == NULL) || taosArrayGetSize(pGroupResInfo->pRows) == 0) { - assert(pGroupResInfo->index == 0); - // if ((code = mergeIntoGroupResult(&pGroupResInfo, pRuntimeEnv, offset)) != TSDB_CODE_SUCCESS) { - return; - // } - } - - // doCopyToSDataBlock(pResBuf, pGroupResInfo, TSDB_ORDER_ASC, pBlock, ); - - // current data are all dumped to result buffer, clear it - if (!hasRemainDataInCurrentGroup(pGroupResInfo)) { - cleanupGroupResInfo(pGroupResInfo); - if (!incNextGroup(pGroupResInfo)) { - break; - } - } - - // enough results in data buffer, return - // if (pBlock->info.rows >= threshold) { - // break; - // } - } -} - static void updateTableQueryInfoForReverseScan(STableQueryInfo* pTableQueryInfo) { if (pTableQueryInfo == NULL) { return; @@ -1884,35 +1844,6 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) { } } -// todo merged with the build group result. -void finalizeMultiTupleQueryResult(int32_t numOfOutput, SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, - int32_t* rowCellInfoOffset) { - for (int32_t i = 0; i < pResultRowInfo->size; ++i) { - SResultRowPosition* pPos = &pResultRowInfo->pPosition[i]; - - SFilePage* bufPage = getBufPage(pBuf, pPos->pageId); - SResultRow* pRow = (SResultRow*)((char*)bufPage + pPos->offset); - - // TODO ignore the close status anyway. - // if (!isResultRowClosed(pRow)) { - // continue; - // } - - for (int32_t j = 0; j < numOfOutput; ++j) { - struct SResultRowEntryInfo* pResInfo = getResultCell(pRow, j, rowCellInfoOffset); - if (!isRowEntryInitialized(pResInfo)) { - continue; - } - - if (pRow->numOfRows < pResInfo->numOfRes) { - pRow->numOfRows = pResInfo->numOfRes; - } - } - - releaseBufPage(pBuf, bufPage); - } -} - STableQueryInfo* createTableQueryInfo(void* buf, STimeWindow win) { STableQueryInfo* pTableQueryInfo = buf; pTableQueryInfo->lastKey = win.skey; @@ -2062,20 +1993,34 @@ void setExecutionContext(int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* p pAggInfo->groupId = groupId; } +static void doUpdateNumOfRows(SResultRow* pRow, int32_t numOfExprs, const int32_t* rowCellOffset) { + for (int32_t j = 0; j < numOfExprs; ++j) { + struct SResultRowEntryInfo* pResInfo = getResultCell(pRow, j, rowCellOffset); + if (!isRowEntryInitialized(pResInfo)) { + continue; + } + + if (pRow->numOfRows < pResInfo->numOfRes) { + pRow->numOfRows = pResInfo->numOfRes; + } + } +} + int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo, - int32_t* rowCellOffset, SqlFunctionCtx* pCtx, int32_t numOfExprs) { + const int32_t* rowCellOffset, SqlFunctionCtx* pCtx, int32_t numOfExprs) { int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); int32_t start = pGroupResInfo->index; - // qDebug("QInfo:0x%"PRIx64" start to copy data from windowResInfo to output buf", GET_TASKID(pRuntimeEnv)); - for (int32_t i = start; i < numOfRows; i += 1) { SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i); SFilePage* page = getBufPage(pBuf, pPos->pos.pageId); SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset); + + doUpdateNumOfRows(pRow, numOfExprs, rowCellOffset); if (pRow->numOfRows == 0) { pGroupResInfo->index += 1; + releaseBufPage(pBuf, page); continue; } @@ -2084,11 +2029,13 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI } else { // current value belongs to different group, it can't be packed into one datablock if (pBlock->info.groupId != pPos->groupId) { + releaseBufPage(pBuf, page); break; } } if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) { + releaseBufPage(pBuf, page); break; } @@ -2130,8 +2077,6 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI } void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf) { - assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup); - SExprInfo* pExprInfo = pOperator->pExpr; int32_t numOfExprs = pOperator->numOfExprs; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -2141,7 +2086,7 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG SqlFunctionCtx* pCtx = pbInfo->pCtx; blockDataCleanup(pBlock); - if (!hasRemainDataInCurrentGroup(pGroupResInfo)) { + if (!hashRemainDataInGroupInfo(pGroupResInfo)) { return; } @@ -3627,9 +3572,6 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { } closeAllResultRows(&pAggInfo->binfo.resultRowInfo); - finalizeMultiTupleQueryResult(pOperator->numOfExprs, pAggInfo->aggSup.pResultBuf, &pAggInfo->binfo.resultRowInfo, - pAggInfo->binfo.rowCellInfoOffset); - initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0); OPTR_SET_OPENED(pOperator); return TSDB_CODE_SUCCESS; @@ -3651,7 +3593,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf); - if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pAggInfo->groupResInfo)) { + if (pInfo->pRes->info.rows == 0 || !hashRemainDataInGroupInfo(&pAggInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index b1e19213dd..16dcf30f39 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -269,7 +269,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { if (pOperator->status == OP_RES_TO_RETURN) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - if (pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { + if (pRes->info.rows == 0 || !hashRemainDataInGroupInfo(&pInfo->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } return (pRes->info.rows == 0)? NULL:pRes; @@ -303,9 +303,6 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pInfo->binfo.resultRowInfo); - - finalizeMultiTupleQueryResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pInfo->binfo.resultRowInfo, - pInfo->binfo.rowCellInfoOffset); // if (!stableQuery) { // finalize include the update of result rows // finalizeQueryResult(pInfo->binfo.pCtx, pOperator->numOfExprs); // } else { @@ -320,7 +317,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); doFilter(pInfo->pCondition, pRes, NULL); - bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo); + bool hasRemain = hashRemainDataInGroupInfo(&pInfo->groupResInfo); if (!hasRemain) { pOperator->status = OP_EXEC_DONE; break; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 3be131e550..c3a447c802 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -815,9 +815,6 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { } closeAllResultRows(&pInfo->binfo.resultRowInfo); - finalizeMultiTupleQueryResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pInfo->binfo.resultRowInfo, - pInfo->binfo.rowCellInfoOffset); - initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->order); OPTR_SET_OPENED(pOperator); return TSDB_CODE_SUCCESS; @@ -933,7 +930,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { if (pOperator->status == OP_RES_TO_RETURN) { doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { + if (pBInfo->pRes->info.rows == 0 || !hashRemainDataInGroupInfo(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); return NULL; } @@ -960,13 +957,11 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pBInfo->resultRowInfo); - finalizeMultiTupleQueryResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, - pBInfo->rowCellInfoOffset); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { + if (pBInfo->pRes->info.rows == 0 || !hashRemainDataInGroupInfo(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } @@ -994,7 +989,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) { blockDataEnsureCapacity(pBlock, pOperator->resultInfo.capacity); doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - if (pBlock->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { + if (pBlock->info.rows == 0 || !hashRemainDataInGroupInfo(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } @@ -1082,7 +1077,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { if (pOperator->status == OP_RES_TO_RETURN) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { + if (pInfo->binfo.pRes->info.rows == 0 || !hashRemainDataInGroupInfo(&pInfo->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; @@ -1413,7 +1408,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { if (pOperator->status == OP_RES_TO_RETURN) { doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { + if (pBInfo->pRes->info.rows == 0 || !hashRemainDataInGroupInfo(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); return NULL; } @@ -1440,13 +1435,11 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { // restore the value pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pBInfo->resultRowInfo); - finalizeMultiTupleQueryResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo, - pBInfo->rowCellInfoOffset); initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC); blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity); doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { + if (pBInfo->pRes->info.rows == 0 || !hashRemainDataInGroupInfo(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } @@ -1461,7 +1454,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) { STimeSliceOperatorInfo* pSliceInfo = pOperator->info; if (pOperator->status == OP_RES_TO_RETURN) { // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); - if (pSliceInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pSliceInfo->groupResInfo)) { + if (pSliceInfo->binfo.pRes->info.rows == 0 || !hashRemainDataInGroupInfo(&pSliceInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); } @@ -1493,7 +1486,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) { // initGroupedResultInfo(&pSliceInfo->groupResInfo, &pSliceInfo->binfo.resultRowInfo); // doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pSliceInfo->pRes); - if (pSliceInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pSliceInfo->groupResInfo)) { + if (pSliceInfo->binfo.pRes->info.rows == 0 || !hashRemainDataInGroupInfo(&pSliceInfo->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } @@ -1695,7 +1688,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { return NULL; } else if (pOperator->status == OP_RES_TO_RETURN) { doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf); - if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { + if (pInfo->binfo.pRes->info.rows == 0 || !hashRemainDataInGroupInfo(&pInfo->groupResInfo)) { pOperator->status = OP_EXEC_DONE; } return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes; diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 020df6cc3d..ad92d095d5 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -543,7 +543,7 @@ bool getSumFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { } bool getAvgFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { - pEnv->calcMemSize = sizeof(double); + pEnv->calcMemSize = sizeof(SAvgRes); return true; } diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index 1ad12ef43a..00f1233707 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -187,9 +187,6 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { } } else { // NOTE: the size may be -1, the this recycle page has not been flushed to disk yet. size = pg->length; - if (size == -1) { - printf("----\n"); - } } ASSERT(size > 0 || (pg->offset == -1 && pg->length == -1));