diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index cd1177794e..677fddc85c 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1709,22 +1709,6 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t static void doDestroyTableList(STableListInfo* pTableqinfoList); -static void doTableQueryInfoTimeWindowCheck(SExecTaskInfo* pTaskInfo, STableQueryInfo* pTableQueryInfo, int32_t order) { -#if 0 - if (order == TSDB_ORDER_ASC) { - assert( - (pTableQueryInfo->win.skey <= pTableQueryInfo->win.ekey) && - (pTableQueryInfo->lastKey >= pTaskInfo->window.skey) && - (pTableQueryInfo->win.skey >= pTaskInfo->window.skey && pTableQueryInfo->win.ekey <= pTaskInfo->window.ekey)); - } else { - assert( - (pTableQueryInfo->win.skey >= pTableQueryInfo->win.ekey) && - (pTableQueryInfo->lastKey <= pTaskInfo->window.skey) && - (pTableQueryInfo->win.skey <= pTaskInfo->window.skey && pTableQueryInfo->win.ekey >= pTaskInfo->window.ekey)); - } -#endif -} - typedef struct SFetchRspHandleWrapper { uint32_t exchangeId; int32_t sourceIndex; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index c24e04eab1..c6dca79e8d 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -4959,6 +4959,12 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR pBlock->info.rows, pSup->numOfExprs); } +static void cleanupAfterGroupResultGen(SMergeAlignedIntervalAggOperatorInfo* pMiaInfo, SSDataBlock* pRes) { + pRes->info.groupId = pMiaInfo->groupId; + pMiaInfo->curTs = INT64_MIN; + pMiaInfo->groupId = 0; +} + static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -4987,8 +4993,8 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { // close last unclosed time window if (pMiaInfo->curTs != INT64_MIN) { finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo); - pMiaInfo->curTs = INT64_MIN; - pRes->info.groupId = pMiaInfo->groupId; + resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow)); + cleanupAfterGroupResultGen(pMiaInfo, pRes); } doSetOperatorCompleted(pOperator); @@ -5004,11 +5010,10 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { // if there are unclosed time window, close it firstly. ASSERT(pMiaInfo->curTs != INT64_MIN); finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo); - pMiaInfo->prefetchedBlock = pBlock; + resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow)); - pRes->info.groupId = pMiaInfo->groupId; - pMiaInfo->curTs = INT64_MIN; - pMiaInfo->groupId = 0; + pMiaInfo->prefetchedBlock = pBlock; + cleanupAfterGroupResultGen(pMiaInfo, pRes); break; } else { // continue