enh(query): support event window.

This commit is contained in:
Haojun Liao 2023-01-03 17:51:50 +08:00
parent 53c3cbd661
commit 169a6703d1
1 changed files with 16 additions and 5 deletions

View File

@ -172,7 +172,9 @@ static SSDataBlock* eventWindowAggregate(SOperatorInfo* pOperator) {
SExprSupp* pSup = &pOperator->exprSupp;
int32_t order = TSDB_ORDER_ASC;
blockDataCleanup(pInfo->binfo.pRes);
SSDataBlock* pRes = pInfo->binfo.pRes;
blockDataCleanup(pRes);
SOperatorInfo* downstream = pOperator->pDownstream[0];
while (1) {
@ -194,9 +196,12 @@ static SSDataBlock* eventWindowAggregate(SOperatorInfo* pOperator) {
}
eventWindowAggImpl(pOperator, pInfo, pBlock);
if (pRes->info.rows >= pOperator->resultInfo.threshold) {
return pRes;
}
}
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
return pRes->info.rows == 0 ? NULL : pRes;
}
static int32_t setSingleOutputTupleBufv1(SResultRowInfo* pResultRowInfo, STimeWindow* win, SResultRow** pResult,
@ -236,6 +241,7 @@ void eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInf
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SExprSupp* pSup = &pOperator->exprSupp;
SSDataBlock* pRes = pInfo->binfo.pRes;
int64_t gid = pBlock->info.id.groupId;
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pInfo->tsSlotId);
@ -272,13 +278,18 @@ void eventWindowAggImpl(SOperatorInfo* pOperator, SEventWindowOperatorInfo* pInf
if (rowIndex < pBlock->info.rows) {
doEventWindowAggImpl(pInfo, pSup, startIndex, rowIndex, pBlock, tsList, pTaskInfo);
// todo check if pblock buffer is enough
doUpdateNumOfRows(pSup->pCtx, pInfo->pRow, pSup->numOfExprs, pSup->rowEntryInfoOffset);
copyResultrowToDataBlock(pSup->pExprInfo, pSup->numOfExprs, pInfo->pRow, pSup->pCtx, pInfo->binfo.pRes,
// check buffer size
if (pRes->info.rows + pInfo->pRow->numOfRows >= pRes->info.capacity) {
int32_t newSize = pRes->info.rows + pInfo->pRow->numOfRows;
blockDataEnsureCapacity(pRes, newSize);
}
copyResultrowToDataBlock(pSup->pExprInfo, pSup->numOfExprs, pInfo->pRow, pSup->pCtx, pRes,
pSup->rowEntryInfoOffset, pTaskInfo);
pInfo->binfo.pRes->info.rows += pInfo->pRow->numOfRows;
pRes->info.rows += pInfo->pRow->numOfRows;
pInfo->inWindow = false;
rowIndex += 1;