From 288b91a4c3a28fc7c358eb7d09039c630efa70f5 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Mon, 22 Jan 2024 10:08:45 +0800 Subject: [PATCH] mid interval retrive --- source/libs/executor/inc/executorInt.h | 1 + .../executor/src/streamtimewindowoperator.c | 39 ++++++++++++++----- 2 files changed, 31 insertions(+), 9 deletions(-) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 72da249f50..e51869e418 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -556,6 +556,7 @@ typedef struct SStreamIntervalOperatorInfo { bool reCkBlock; SSDataBlock* pCheckpointRes; struct SUpdateInfo* pUpdateInfo; + bool recvRetrive; } SStreamIntervalOperatorInfo; typedef struct SDataGroupInfo { diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 7f914755a0..93a233212b 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1239,11 +1239,15 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { return NULL; } else { if (!IS_FINAL_INTERVAL_OP(pOperator)) { - doBuildDeleteResult(pInfo, pInfo->pDelWins, &pInfo->delIndex, pInfo->pDelRes); - if (pInfo->pDelRes->info.rows != 0) { - // process the rest of the data - printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); - return pInfo->pDelRes; + SSDataBlock* resBlock = buildIntervalResult(pOperator); + if (resBlock != NULL) { + return resBlock; + } + + if (pInfo->recvRetrive) { + pInfo->recvRetrive = false; + printDataBlock(pInfo->pPullDataRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + return pInfo->pPullDataRes; } } } @@ -1317,9 +1321,11 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { pInfo->recvGetAll = true; getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap); continue; - } else if (pBlock->info.type == STREAM_RETRIEVE && !IS_FINAL_INTERVAL_OP(pOperator)) { - doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pInfo->pUpdatedMap); - if (taosArrayGetSize(pInfo->pUpdated) > 0) { + } else if (pBlock->info.type == STREAM_RETRIEVE) { + if(!IS_FINAL_INTERVAL_OP(pOperator)) { + pInfo->recvRetrive = true; + copyDataBlock(pInfo->pPullDataRes, pBlock); + doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pInfo->pUpdatedMap); break; } continue; @@ -1362,7 +1368,18 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { pInfo->pUpdated = NULL; blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); - return buildIntervalResult(pOperator); + SSDataBlock* resBlock = buildIntervalResult(pOperator); + if (resBlock != NULL) { + return resBlock; + } + + if (pInfo->recvRetrive) { + pInfo->recvRetrive = false; + printDataBlock(pInfo->pPullDataRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + return pInfo->pPullDataRes; + } + + return NULL; } int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval) { @@ -1532,6 +1549,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->stateStore = pTaskInfo->storageAPI.stateStore; pInfo->recvGetAll = false; pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); + pInfo->recvRetrive = false; pOperator->operatorType = pPhyNode->type; if (!IS_FINAL_INTERVAL_OP(pOperator) || numOfChild == 0) { @@ -4289,6 +4307,9 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { doStreamIntervalSaveCheckpoint(pOperator); copyDataBlock(pInfo->pCheckpointRes, pBlock); continue; + } else if (pBlock->info.type == STREAM_RETRIEVE) { + doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pInfo->pUpdatedMap); + continue; } else { ASSERTS(pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); }