fix(query): reset buffer after group results generated.

This commit is contained in:
Haojun Liao 2022-09-14 11:44:39 +08:00
parent 0bb1dc9d1f
commit 00908a649c
2 changed files with 11 additions and 22 deletions

View File

@ -1709,22 +1709,6 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t
static void doDestroyTableList(STableListInfo* pTableqinfoList);
static void doTableQueryInfoTimeWindowCheck(SExecTaskInfo* pTaskInfo, STableQueryInfo* pTableQueryInfo, int32_t order) {
#if 0
if (order == TSDB_ORDER_ASC) {
assert(
(pTableQueryInfo->win.skey <= pTableQueryInfo->win.ekey) &&
(pTableQueryInfo->lastKey >= pTaskInfo->window.skey) &&
(pTableQueryInfo->win.skey >= pTaskInfo->window.skey && pTableQueryInfo->win.ekey <= pTaskInfo->window.ekey));
} else {
assert(
(pTableQueryInfo->win.skey >= pTableQueryInfo->win.ekey) &&
(pTableQueryInfo->lastKey <= pTaskInfo->window.skey) &&
(pTableQueryInfo->win.skey <= pTaskInfo->window.skey && pTableQueryInfo->win.ekey >= pTaskInfo->window.ekey));
}
#endif
}
typedef struct SFetchRspHandleWrapper {
uint32_t exchangeId;
int32_t sourceIndex;

View File

@ -4959,6 +4959,12 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
pBlock->info.rows, pSup->numOfExprs);
}
static void cleanupAfterGroupResultGen(SMergeAlignedIntervalAggOperatorInfo* pMiaInfo, SSDataBlock* pRes) {
pRes->info.groupId = pMiaInfo->groupId;
pMiaInfo->curTs = INT64_MIN;
pMiaInfo->groupId = 0;
}
static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -4987,8 +4993,8 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
// close last unclosed time window
if (pMiaInfo->curTs != INT64_MIN) {
finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
pMiaInfo->curTs = INT64_MIN;
pRes->info.groupId = pMiaInfo->groupId;
resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));
cleanupAfterGroupResultGen(pMiaInfo, pRes);
}
doSetOperatorCompleted(pOperator);
@ -5004,11 +5010,10 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
// if there are unclosed time window, close it firstly.
ASSERT(pMiaInfo->curTs != INT64_MIN);
finalizeResultRows(pIaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pRes, pTaskInfo);
pMiaInfo->prefetchedBlock = pBlock;
resetResultRow(pMiaInfo->pResultRow, pIaInfo->aggSup.resultRowSize - sizeof(SResultRow));
pRes->info.groupId = pMiaInfo->groupId;
pMiaInfo->curTs = INT64_MIN;
pMiaInfo->groupId = 0;
pMiaInfo->prefetchedBlock = pBlock;
cleanupAfterGroupResultGen(pMiaInfo, pRes);
break;
} else {
// continue