From 53a4294e4c2d0d60ba59cb6f0557d813cfa31e31 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 21 Feb 2024 14:48:16 +0800 Subject: [PATCH 1/5] mid agg operator --- source/libs/executor/inc/executorInt.h | 2 + .../executor/src/streamtimewindowoperator.c | 75 +++++++++++++------ tests/script/tsim/stream/pauseAndResume.sim | 6 +- 3 files changed, 58 insertions(+), 25 deletions(-) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index a280fd6e9b..43ca8a3c38 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -600,6 +600,8 @@ typedef struct SStreamIntervalOperatorInfo { bool recvPullover; SSDataBlock* pMidPulloverRes; bool clearState; + SArray* pMidPullDatas; + int32_t midDelIndex; } SStreamIntervalOperatorInfo; typedef struct SDataGroupInfo { diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 2838d005ab..69c4e04897 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -221,7 +221,7 @@ static bool doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId) static int32_t getChildIndex(SSDataBlock* pBlock) { return pBlock->info.childId; } static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDataBlock* pBlock, SArray* pUpWins, - SSHashObj* pUpdatedMap) { + SSHashObj* pUpdatedMap, SHashObj* pInvalidWins) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); TSKEY* startTsCols = (TSKEY*)pStartTsCol->pData; @@ -255,10 +255,15 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDa void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey)); if (chIds) { int32_t childId = getChildIndex(pBlock); + if (pInvalidWins) { + qDebug("===stream===save mid delete window:%" PRId64 ",groupId:%" PRId64 ",chId:%d", winRes.ts, winRes.groupId, childId); + taosHashPut(pInvalidWins, &winRes, sizeof(SWinKey), NULL, 0); + } + SArray* chArray = *(void**)chIds; int32_t index = taosArraySearchIdx(chArray, &childId, compareInt32Val, TD_EQ); if (index != -1) { - qDebug("===stream===try push delete window%" PRId64 "chId:%d ,continue", win.skey, childId); + qDebug("===stream===try push delete window:%" PRId64 ",groupId:%" PRId64 ",chId:%d ,continue", win.skey, winGpId, childId); getNextTimeWindow(pInterval, &win, TSDB_ORDER_ASC); continue; } @@ -413,6 +418,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) { blockDataDestroy(pInfo->pMidRetriveRes); blockDataDestroy(pInfo->pMidPulloverRes); pInfo->stateStore.streamFileStateDestroy(pInfo->pState->pFileState); + taosArrayDestroy(pInfo->pMidPullDatas); if (pInfo->pState->dump == 1) { taosMemoryFreeClear(pInfo->pState->pTdbState->pOwner); @@ -642,9 +648,12 @@ static bool processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFina .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey}; // add pull data request - if (savePullWindow(&pull, pPullWins) == TSDB_CODE_SUCCESS) { + qDebug("===stream===prepare final retrive for delete window:%" PRId64 ",groupId%" PRId64 ", size:%d", winRes.ts, winRes.groupId, numOfCh); + if (IS_MID_INTERVAL_OP(pOperator)) { + SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperator->info; + taosArrayPush(pInfo->pMidPullDatas, &winRes); + } else if (savePullWindow(&pull, pPullWins) == TSDB_CODE_SUCCESS) { addPullWindow(pMap, &winRes, numOfCh); - qDebug("===stream===prepare final retrive for delete %" PRId64 ", size:%d", winRes.ts, numOfCh); } } } @@ -1191,11 +1200,6 @@ static SSDataBlock* buildIntervalResult(SOperatorInfo* pOperator) { return pInfo->binfo.pRes; } - if (pInfo->recvPullover) { - pInfo->recvPullover = false; - printDataBlock(pInfo->pMidPulloverRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); - return pInfo->pMidPulloverRes; - } return NULL; } @@ -1301,7 +1305,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { SArray* delWins = taosArrayInit(8, sizeof(SWinKey)); - doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap); + doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap, NULL); if (IS_FINAL_INTERVAL_OP(pOperator)) { int32_t chId = getChildIndex(pBlock); addRetriveWindow(delWins, pInfo, chId); @@ -1337,7 +1341,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { pInfo->recvRetrive = true; copyDataBlock(pInfo->pMidRetriveRes, pBlock); pInfo->pMidRetriveRes->info.type = STREAM_MID_RETRIEVE; - doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pInfo->pUpdatedMap); + doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pInfo->pUpdatedMap, NULL); break; } continue; @@ -1567,6 +1571,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->pMidRetriveRes = createSpecialDataBlock(STREAM_MID_RETRIEVE); pInfo->pMidPulloverRes = createSpecialDataBlock(STREAM_MID_RETRIEVE); pInfo->clearState = false; + pInfo->pMidPullDatas = taosArrayInit(4, sizeof(SWinKey)); pOperator->operatorType = pPhyNode->type; if (!IS_FINAL_INTERVAL_OP(pOperator) || numOfChild == 0) { @@ -3973,7 +3978,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { - doDeleteWindows(pOperator, &pInfo->interval, pBlock, pInfo->pDelWins, pInfo->pUpdatedMap); + doDeleteWindows(pOperator, &pInfo->interval, pBlock, pInfo->pDelWins, pInfo->pUpdatedMap, NULL); continue; } else if (pBlock->info.type == STREAM_GET_ALL) { pInfo->recvGetAll = true; @@ -4266,6 +4271,34 @@ static void addMidRetriveWindow(SArray* wins, SHashObj* pMidPullMap, int32_t num } } +static SSDataBlock* buildMidIntervalResult(SOperatorInfo* pOperator) { + SStreamIntervalOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + uint16_t opType = pOperator->operatorType; + + if (pInfo->recvPullover) { + pInfo->recvPullover = false; + printDataBlock(pInfo->pMidPulloverRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + return pInfo->pMidPulloverRes; + } + + qDebug("===stream=== build mid interval result"); + doBuildDeleteResult(pInfo, pInfo->pMidPullDatas, &pInfo->midDelIndex, pInfo->pDelRes); + if (pInfo->pDelRes->info.rows != 0) { + // process the rest of the data + printDataBlock(pInfo->pDelRes, getStreamOpName(opType), GET_TASKID(pTaskInfo)); + return pInfo->pDelRes; + } + + if (pInfo->recvRetrive) { + pInfo->recvRetrive = false; + printDataBlock(pInfo->pMidRetriveRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + return pInfo->pMidRetriveRes; + } + + return NULL; +} + static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -4294,10 +4327,9 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { return resBlock; } - if (pInfo->recvRetrive) { - pInfo->recvRetrive = false; - printDataBlock(pInfo->pMidRetriveRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); - return pInfo->pMidRetriveRes; + resBlock = buildMidIntervalResult(pOperator); + if (resBlock != NULL) { + return resBlock; } if (pInfo->clearState) { @@ -4345,7 +4377,7 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { SArray* delWins = taosArrayInit(8, sizeof(SWinKey)); - doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap); + doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap, pInfo->pFinalPullDataMap); removeResults(delWins, pInfo->pUpdatedMap); taosArrayAddAll(pInfo->pDelWins, delWins); taosArrayDestroy(delWins); @@ -4381,7 +4413,7 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { continue; } else if (pBlock->info.type == STREAM_MID_RETRIEVE) { SArray* delWins = taosArrayInit(8, sizeof(SWinKey)); - doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap); + doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap, NULL); addMidRetriveWindow(delWins, pInfo->pPullDataMap, pInfo->numOfChild); taosArrayDestroy(delWins); pInfo->recvRetrive = true; @@ -4426,10 +4458,9 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { return resBlock; } - if (pInfo->recvRetrive) { - pInfo->recvRetrive = false; - printDataBlock(pInfo->pMidRetriveRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); - return pInfo->pMidRetriveRes; + resBlock = buildMidIntervalResult(pOperator); + if (resBlock != NULL) { + return resBlock; } if (pInfo->clearState) { diff --git a/tests/script/tsim/stream/pauseAndResume.sim b/tests/script/tsim/stream/pauseAndResume.sim index 5eb9eef010..7c62eaed81 100644 --- a/tests/script/tsim/stream/pauseAndResume.sim +++ b/tests/script/tsim/stream/pauseAndResume.sim @@ -259,7 +259,7 @@ sql insert into ts4 values(1648791213001,1,12,3,1.0); $loop_count = 0 -loop3: +loop4: $loop_count = $loop_count + 1 if $loop_count == 20 then @@ -276,7 +276,7 @@ if $rows != 1 then print $data00 $data01 $data02 print $data10 $data11 $data12 print $data20 $data21 $data22 - goto loop3 + goto loop4 endi print 2 select * from streamt5; @@ -287,7 +287,7 @@ if $rows != 1 then print $data00 $data01 $data02 print $data10 $data11 $data12 print $data20 $data21 $data22 - goto loop3 + goto loop4 endi print 3 select * from streamt3; From aabab3b4f762d53b2617fecd168f511383c857f2 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 21 Feb 2024 19:12:34 +0800 Subject: [PATCH 2/5] init mid op --- source/libs/executor/src/streamtimewindowoperator.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 69c4e04897..bac1f9de05 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1569,6 +1569,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); pInfo->recvRetrive = false; pInfo->pMidRetriveRes = createSpecialDataBlock(STREAM_MID_RETRIEVE); + pInfo->recvPullover = false; pInfo->pMidPulloverRes = createSpecialDataBlock(STREAM_MID_RETRIEVE); pInfo->clearState = false; pInfo->pMidPullDatas = taosArrayInit(4, sizeof(SWinKey)); From 3d3f8ced235038349442fca74032756659aafe89 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 28 Feb 2024 10:24:50 +0800 Subject: [PATCH 3/5] fix:open mult agg logic for test --- source/common/src/tglobal.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index ee85a909e7..8b19c01010 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -265,7 +265,7 @@ bool tsDisableStream = false; int64_t tsStreamBufferSize = 128 * 1024 * 1024; bool tsFilterScalarMode = false; int tsResolveFQDNRetryTime = 100; // seconds -int tsStreamAggCnt = 1000; +int tsStreamAggCnt = 2; char tsS3Endpoint[TSDB_FQDN_LEN] = ""; char tsS3AccessKey[TSDB_FQDN_LEN] = ""; From e009e5ab8c613e486cf226c9b9c6fcedc38fadcc Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 1 Mar 2024 15:58:24 +0800 Subject: [PATCH 4/5] fix:modify agg count to default 10 --- source/common/src/tglobal.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 18331e6fe4..bd6ea8d7fb 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -268,7 +268,7 @@ bool tsDisableStream = false; int64_t tsStreamBufferSize = 128 * 1024 * 1024; bool tsFilterScalarMode = false; int tsResolveFQDNRetryTime = 100; // seconds -int tsStreamAggCnt = 2; +int tsStreamAggCnt = 10; bool tsDisableCount = true; char tsS3Endpoint[TSDB_FQDN_LEN] = ""; From 01a7052a6f156632d10e7c30629cff52c67cb6ed Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 13 Mar 2024 22:06:35 +0800 Subject: [PATCH 5/5] fix:set tsStreamAggCnt big enough to disable multi agg operator --- source/common/src/tglobal.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 271fa20c54..33ff3e1248 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -268,7 +268,7 @@ bool tsDisableStream = false; int64_t tsStreamBufferSize = 128 * 1024 * 1024; bool tsFilterScalarMode = false; int tsResolveFQDNRetryTime = 100; // seconds -int tsStreamAggCnt = 10; +int tsStreamAggCnt = 100000; char tsS3Endpoint[TSDB_FQDN_LEN] = ""; char tsS3AccessKey[TSDB_FQDN_LEN] = "";