From 5473c22585723ee9a9e9d1417c04de13fc07ee43 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 7 Feb 2024 11:30:13 +0800 Subject: [PATCH] mid pull over --- source/libs/executor/inc/executorInt.h | 2 ++ .../executor/src/streamtimewindowoperator.c | 21 ++++++++++++++++--- 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 46d7588604..a9ee304656 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -559,6 +559,8 @@ typedef struct SStreamIntervalOperatorInfo { struct SUpdateInfo* pUpdateInfo; bool recvRetrive; SSDataBlock* pMidRetriveRes; + bool recvPullover; + SSDataBlock* pMidPulloverRes; } SStreamIntervalOperatorInfo; typedef struct SDataGroupInfo { diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 7c9b998c7d..660b181b0e 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -411,6 +411,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) { taosArrayDestroy(pInfo->pDelWins); blockDataDestroy(pInfo->pDelRes); blockDataDestroy(pInfo->pMidRetriveRes); + blockDataDestroy(pInfo->pMidPulloverRes); pInfo->stateStore.streamFileStateDestroy(pInfo->pState->pFileState); if (pInfo->pState->dump == 1) { @@ -603,7 +604,7 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB blockDataUpdateTsWindow(pBlock, 0); } -void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFinalMap, SInterval* pInterval, SArray* pPullWins, +static bool processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFinalMap, SInterval* pInterval, SArray* pPullWins, int32_t numOfCh, SOperatorInfo* pOperator) { SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); TSKEY* tsData = (TSKEY*)pStartCol->pData; @@ -612,6 +613,7 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFinalMap, S SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); uint64_t* groupIdData = (uint64_t*)pGroupCol->pData; int32_t chId = getChildIndex(pBlock); + bool res = false; for (int32_t i = 0; i < pBlock->info.rows; i++) { TSKEY winTs = tsData[i]; while (winTs <= tsEndData[i]) { @@ -627,6 +629,7 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFinalMap, S // pull data is over taosArrayDestroy(chArray); taosHashRemove(pMap, &winRes, sizeof(SWinKey)); + res =true; qDebug("===stream===retrive pull data over.window %" PRId64, winRes.ts); void* pFinalCh = taosHashGet(pFinalMap, &winRes, sizeof(SWinKey)); @@ -650,6 +653,7 @@ void processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFinalMap, S winTs = taosTimeAdd(winTs, pInterval->sliding, pInterval->slidingUnit, pInterval->precision); } } + return res; } static void addRetriveWindow(SArray* wins, SStreamIntervalOperatorInfo* pInfo, int32_t childId) { @@ -1186,6 +1190,12 @@ static SSDataBlock* buildIntervalResult(SOperatorInfo* pOperator) { printDataBlock(pInfo->binfo.pRes, getStreamOpName(opType), GET_TASKID(pTaskInfo)); return pInfo->binfo.pRes; } + + if (pInfo->recvPullover) { + pInfo->recvPullover = false; + printDataBlock(pInfo->pMidPulloverRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + return pInfo->pMidPulloverRes; + } return NULL; } @@ -1555,6 +1565,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); pInfo->recvRetrive = false; pInfo->pMidRetriveRes = createSpecialDataBlock(STREAM_MID_RETRIEVE); + pInfo->pMidPulloverRes = createSpecialDataBlock(STREAM_MID_RETRIEVE); pOperator->operatorType = pPhyNode->type; if (!IS_FINAL_INTERVAL_OP(pOperator) || numOfChild == 0) { @@ -4336,8 +4347,12 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { return pBlock; } else if (pBlock->info.type == STREAM_PULL_OVER) { - processPullOver(pBlock, pInfo->pPullDataMap, pInfo->pFinalPullDataMap, &pInfo->interval, pInfo->pPullWins, - pInfo->numOfChild, pOperator); + pInfo->recvPullover = processPullOver(pBlock, pInfo->pPullDataMap, pInfo->pFinalPullDataMap, &pInfo->interval, + pInfo->pPullWins, pInfo->numOfChild, pOperator); + if (pInfo->recvPullover) { + copyDataBlock(pInfo->pMidPulloverRes, pBlock); + break; + } continue; } else if (pBlock->info.type == STREAM_CHECKPOINT) { pAPI->stateStore.streamStateCommit(pInfo->pState);