From 0afed5263e846ec7ae8087330ae640fb4fea7d81 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 31 Jan 2024 18:09:13 +0800 Subject: [PATCH] add mid operator retrive --- include/common/tcommon.h | 1 + source/dnode/mnode/impl/src/mndScheduler.c | 1 + source/libs/executor/inc/executorInt.h | 1 + source/libs/executor/src/operator.c | 2 +- .../executor/src/streamtimewindowoperator.c | 84 ++++++++++++++----- 5 files changed, 66 insertions(+), 23 deletions(-) diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 24e5d186b9..addb02cd7d 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -171,6 +171,7 @@ typedef enum EStreamType { STREAM_CHECKPOINT, STREAM_CREATE_CHILD_TABLE, STREAM_TRANS_STATE, + STREAM_MID_RETRIEVE, } EStreamType; #pragma pack(push, 1) diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index a4721b8a11..9ec879ec59 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -559,6 +559,7 @@ static void bindTwoLevel(SArray* tasks, int32_t begin, int32_t end) { end = end > taosArrayGetSize(pUpTaskList) ? taosArrayGetSize(pUpTaskList): end; for(int i = begin; i < end; i++){ SStreamTask* pUpTask = taosArrayGetP(pUpTaskList, i); + pUpTask->info.selfChildId = i - begin; streamTaskSetFixedDownstreamInfo(pUpTask, *pDownTask); streamTaskSetUpstreamInfo(*pDownTask, pUpTask); } diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index e51869e418..097b8d0136 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -557,6 +557,7 @@ typedef struct SStreamIntervalOperatorInfo { SSDataBlock* pCheckpointRes; struct SUpdateInfo* pUpdateInfo; bool recvRetrive; + SSDataBlock* pMidRetriveRes; } SStreamIntervalOperatorInfo; typedef struct SDataGroupInfo { diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 0c6671c742..fd4b3cd7db 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -490,7 +490,7 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR int32_t children = 0; pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children, pHandle); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL == type) { - int32_t children = 0; + int32_t children = pHandle->numOfVgroups; pOptr = createStreamFinalIntervalOperatorInfo(ops[0], pPhyNode, pTaskInfo, children, pHandle); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL == type) { int32_t children = pHandle->numOfVgroups; diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 93a233212b..99b1f5bd6f 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -410,6 +410,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) { blockDataDestroy(pInfo->pPullDataRes); taosArrayDestroy(pInfo->pDelWins); blockDataDestroy(pInfo->pDelRes); + blockDataDestroy(pInfo->pMidRetriveRes); pInfo->stateStore.streamFileStateDestroy(pInfo->pState->pFileState); if (pInfo->pState->dump == 1) { @@ -1246,8 +1247,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { if (pInfo->recvRetrive) { pInfo->recvRetrive = false; - printDataBlock(pInfo->pPullDataRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); - return pInfo->pPullDataRes; + printDataBlock(pInfo->pMidRetriveRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + return pInfo->pMidRetriveRes; } } } @@ -1324,7 +1325,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } else if (pBlock->info.type == STREAM_RETRIEVE) { if(!IS_FINAL_INTERVAL_OP(pOperator)) { pInfo->recvRetrive = true; - copyDataBlock(pInfo->pPullDataRes, pBlock); + copyDataBlock(pInfo->pMidRetriveRes, pBlock); + pInfo->pMidRetriveRes->info.type = STREAM_MID_RETRIEVE; doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pInfo->pUpdatedMap); break; } @@ -1340,6 +1342,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { doStreamIntervalSaveCheckpoint(pOperator); copyDataBlock(pInfo->pCheckpointRes, pBlock); continue; + } else if (IS_FINAL_INTERVAL_OP(pOperator) && pBlock->info.type == STREAM_MID_RETRIEVE) { + continue; } else { ASSERTS(pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); } @@ -1375,8 +1379,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { if (pInfo->recvRetrive) { pInfo->recvRetrive = false; - printDataBlock(pInfo->pPullDataRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); - return pInfo->pPullDataRes; + printDataBlock(pInfo->pMidRetriveRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + return pInfo->pMidRetriveRes; } return NULL; @@ -1550,6 +1554,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->recvGetAll = false; pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); pInfo->recvRetrive = false; + pInfo->pMidRetriveRes = createSpecialDataBlock(STREAM_MID_RETRIEVE); pOperator->operatorType = pPhyNode->type; if (!IS_FINAL_INTERVAL_OP(pOperator) || numOfChild == 0) { @@ -4142,16 +4147,15 @@ static void doStreamMidIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pS SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperator->info; pInfo->dataVersion = TMAX(pInfo->dataVersion, pSDataBlock->info.version); - SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo); - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SExprSupp* pSup = &pOperator->exprSupp; - int32_t numOfOutput = pSup->numOfExprs; - int32_t step = 1; - SRowBuffPos* pResPos = NULL; - SResultRow* pResult = NULL; - int32_t forwardRows = 1; - uint64_t groupId = pSDataBlock->info.id.groupId; - + SResultRowInfo* pResultRowInfo = &(pInfo->binfo.resultRowInfo); + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SExprSupp* pSup = &pOperator->exprSupp; + int32_t numOfOutput = pSup->numOfExprs; + int32_t step = 1; + SRowBuffPos* pResPos = NULL; + SResultRow* pResult = NULL; + int32_t forwardRows = 1; + uint64_t groupId = pSDataBlock->info.id.groupId; SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); TSKEY* tsCol = (int64_t*)pColDataInfo->pData; @@ -4160,6 +4164,27 @@ static void doStreamMidIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pS STimeWindow nextWin = getFinalTimeWindow(ts, &pInfo->interval); while (1) { + SWinKey key = { + .ts = nextWin.skey, + .groupId = groupId, + }; + void* chIds = taosHashGet(pInfo->pPullDataMap, &key, sizeof(SWinKey)); + int32_t index = -1; + SArray* chArray = NULL; + int32_t chId = 0; + if (chIds) { + chArray = *(void**)chIds; + chId = getChildIndex(pSDataBlock); + index = taosArraySearchIdx(chArray, &chId, compareInt32Val, TD_EQ); + } + if (!(index == -1 || pSDataBlock->info.type == STREAM_PULL_DATA)) { + startPos = getNextQualifiedFinalWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCol, startPos); + if (startPos < 0) { + break; + } + continue; + } + if (!inSlidingWindow(&pInfo->interval, &nextWin, &pSDataBlock->info)) { startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCol, startPos, nextWin.ekey, &nextWin); if (startPos < 0) { @@ -4175,10 +4200,6 @@ static void doStreamMidIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pS T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); } - SWinKey key = { - .ts = pResult->win.skey, - .groupId = groupId, - }; if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { saveWinResult(&key, pResPos, pUpdatedMap); } @@ -4215,6 +4236,18 @@ static void doStreamMidIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pS } } +static void addMidRetriveWindow(SArray* wins, SHashObj* pMidPullMap, int32_t numOfChild) { + int32_t size = taosArrayGetSize(wins); + for (int32_t i = 0; i < size; i++) { + SWinKey* winKey = taosArrayGet(wins, i); + void* chIds = taosHashGet(pMidPullMap, winKey, sizeof(SWinKey)); + if (!chIds) { + addPullWindow(pMidPullMap, winKey, numOfChild); + qDebug("===stream===prepare mid operator retrive for delete %" PRId64 ", size:%d", winKey->ts, numOfChild); + } + } +} + static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -4300,15 +4333,22 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { return pInfo->pDelRes; } break; - } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE || pBlock->info.type == STREAM_PULL_OVER) { + } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { + return pBlock; + } else if (pBlock->info.type == STREAM_PULL_OVER) { + processPullOver(pBlock, pInfo->pPullDataMap, pInfo->pFinalPullDataMap, &pInfo->interval, pInfo->pPullWins, + pInfo->numOfChild, pOperator); return pBlock; } else if (pBlock->info.type == STREAM_CHECKPOINT) { pAPI->stateStore.streamStateCommit(pInfo->pState); doStreamIntervalSaveCheckpoint(pOperator); copyDataBlock(pInfo->pCheckpointRes, pBlock); continue; - } else if (pBlock->info.type == STREAM_RETRIEVE) { - doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pInfo->pUpdatedMap); + } else if (pBlock->info.type == STREAM_MID_RETRIEVE) { + SArray* delWins = taosArrayInit(8, sizeof(SWinKey)); + doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap); + addMidRetriveWindow(delWins, pInfo->pPullDataMap, pInfo->numOfChild); + taosArrayDestroy(delWins); continue; } else { ASSERTS(pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type");