diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index c0a2060907..7a5a7d1afc 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -268,7 +268,7 @@ bool tsDisableStream = false; int64_t tsStreamBufferSize = 128 * 1024 * 1024; bool tsFilterScalarMode = false; int tsResolveFQDNRetryTime = 100; // seconds -int tsStreamAggCnt = 1000; +int tsStreamAggCnt = 100000; char tsS3Endpoint[TSDB_FQDN_LEN] = ""; char tsS3AccessKey[TSDB_FQDN_LEN] = ""; diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 98e96bb250..5263546765 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -602,6 +602,8 @@ typedef struct SStreamIntervalOperatorInfo { bool recvPullover; SSDataBlock* pMidPulloverRes; bool clearState; + SArray* pMidPullDatas; + int32_t midDelIndex; } SStreamIntervalOperatorInfo; typedef struct SDataGroupInfo { diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 4b3b7a2e31..51071e2b4a 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -221,7 +221,7 @@ static bool doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId) static int32_t getChildIndex(SSDataBlock* pBlock) { return pBlock->info.childId; } static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDataBlock* pBlock, SArray* pUpWins, - SSHashObj* pUpdatedMap) { + SSHashObj* pUpdatedMap, SHashObj* pInvalidWins) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); TSKEY* startTsCols = (TSKEY*)pStartTsCol->pData; @@ -255,10 +255,15 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDa void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey)); if (chIds) { int32_t childId = getChildIndex(pBlock); + if (pInvalidWins) { + qDebug("===stream===save mid delete window:%" PRId64 ",groupId:%" PRId64 ",chId:%d", winRes.ts, winRes.groupId, childId); + taosHashPut(pInvalidWins, &winRes, sizeof(SWinKey), NULL, 0); + } + SArray* chArray = *(void**)chIds; int32_t index = taosArraySearchIdx(chArray, &childId, compareInt32Val, TD_EQ); if (index != -1) { - qDebug("===stream===try push delete window%" PRId64 "chId:%d ,continue", win.skey, childId); + qDebug("===stream===try push delete window:%" PRId64 ",groupId:%" PRId64 ",chId:%d ,continue", win.skey, winGpId, childId); getNextTimeWindow(pInterval, &win, TSDB_ORDER_ASC); continue; } @@ -413,6 +418,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) { blockDataDestroy(pInfo->pMidRetriveRes); blockDataDestroy(pInfo->pMidPulloverRes); pInfo->stateStore.streamFileStateDestroy(pInfo->pState->pFileState); + taosArrayDestroy(pInfo->pMidPullDatas); if (pInfo->pState->dump == 1) { taosMemoryFreeClear(pInfo->pState->pTdbState->pOwner); @@ -642,9 +648,12 @@ static bool processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFina .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey}; // add pull data request - if (savePullWindow(&pull, pPullWins) == TSDB_CODE_SUCCESS) { + qDebug("===stream===prepare final retrive for delete window:%" PRId64 ",groupId%" PRId64 ", size:%d", winRes.ts, winRes.groupId, numOfCh); + if (IS_MID_INTERVAL_OP(pOperator)) { + SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperator->info; + taosArrayPush(pInfo->pMidPullDatas, &winRes); + } else if (savePullWindow(&pull, pPullWins) == TSDB_CODE_SUCCESS) { addPullWindow(pMap, &winRes, numOfCh); - qDebug("===stream===prepare final retrive for delete %" PRId64 ", size:%d", winRes.ts, numOfCh); } } } @@ -1191,11 +1200,6 @@ static SSDataBlock* buildIntervalResult(SOperatorInfo* pOperator) { return pInfo->binfo.pRes; } - if (pInfo->recvPullover) { - pInfo->recvPullover = false; - printDataBlock(pInfo->pMidPulloverRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); - return pInfo->pMidPulloverRes; - } return NULL; } @@ -1293,7 +1297,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { SArray* delWins = taosArrayInit(8, sizeof(SWinKey)); - doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap); + doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap, NULL); if (IS_FINAL_INTERVAL_OP(pOperator)) { int32_t chId = getChildIndex(pBlock); addRetriveWindow(delWins, pInfo, chId); @@ -1329,7 +1333,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { pInfo->recvRetrive = true; copyDataBlock(pInfo->pMidRetriveRes, pBlock); pInfo->pMidRetriveRes->info.type = STREAM_MID_RETRIEVE; - doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pInfo->pUpdatedMap); + doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pInfo->pUpdatedMap, NULL); break; } continue; @@ -1557,8 +1561,10 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); pInfo->recvRetrive = false; pInfo->pMidRetriveRes = createSpecialDataBlock(STREAM_MID_RETRIEVE); + pInfo->recvPullover = false; pInfo->pMidPulloverRes = createSpecialDataBlock(STREAM_MID_RETRIEVE); pInfo->clearState = false; + pInfo->pMidPullDatas = taosArrayInit(4, sizeof(SWinKey)); pOperator->operatorType = pPhyNode->type; if (!IS_FINAL_INTERVAL_OP(pOperator) || numOfChild == 0) { @@ -3966,7 +3972,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { - doDeleteWindows(pOperator, &pInfo->interval, pBlock, pInfo->pDelWins, pInfo->pUpdatedMap); + doDeleteWindows(pOperator, &pInfo->interval, pBlock, pInfo->pDelWins, pInfo->pUpdatedMap, NULL); continue; } else if (pBlock->info.type == STREAM_GET_ALL) { pInfo->recvGetAll = true; @@ -4259,6 +4265,34 @@ static void addMidRetriveWindow(SArray* wins, SHashObj* pMidPullMap, int32_t num } } +static SSDataBlock* buildMidIntervalResult(SOperatorInfo* pOperator) { + SStreamIntervalOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + uint16_t opType = pOperator->operatorType; + + if (pInfo->recvPullover) { + pInfo->recvPullover = false; + printDataBlock(pInfo->pMidPulloverRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + return pInfo->pMidPulloverRes; + } + + qDebug("===stream=== build mid interval result"); + doBuildDeleteResult(pInfo, pInfo->pMidPullDatas, &pInfo->midDelIndex, pInfo->pDelRes); + if (pInfo->pDelRes->info.rows != 0) { + // process the rest of the data + printDataBlock(pInfo->pDelRes, getStreamOpName(opType), GET_TASKID(pTaskInfo)); + return pInfo->pDelRes; + } + + if (pInfo->recvRetrive) { + pInfo->recvRetrive = false; + printDataBlock(pInfo->pMidRetriveRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + return pInfo->pMidRetriveRes; + } + + return NULL; +} + static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -4287,10 +4321,9 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { return resBlock; } - if (pInfo->recvRetrive) { - pInfo->recvRetrive = false; - printDataBlock(pInfo->pMidRetriveRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); - return pInfo->pMidRetriveRes; + resBlock = buildMidIntervalResult(pOperator); + if (resBlock != NULL) { + return resBlock; } if (pInfo->clearState) { @@ -4330,7 +4363,7 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { SArray* delWins = taosArrayInit(8, sizeof(SWinKey)); - doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap); + doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap, pInfo->pFinalPullDataMap); removeResults(delWins, pInfo->pUpdatedMap); taosArrayAddAll(pInfo->pDelWins, delWins); taosArrayDestroy(delWins); @@ -4366,7 +4399,7 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { continue; } else if (pBlock->info.type == STREAM_MID_RETRIEVE) { SArray* delWins = taosArrayInit(8, sizeof(SWinKey)); - doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap); + doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap, NULL); addMidRetriveWindow(delWins, pInfo->pPullDataMap, pInfo->numOfChild); taosArrayDestroy(delWins); pInfo->recvRetrive = true; @@ -4411,10 +4444,9 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { return resBlock; } - if (pInfo->recvRetrive) { - pInfo->recvRetrive = false; - printDataBlock(pInfo->pMidRetriveRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); - return pInfo->pMidRetriveRes; + resBlock = buildMidIntervalResult(pOperator); + if (resBlock != NULL) { + return resBlock; } if (pInfo->clearState) { diff --git a/tests/script/tsim/stream/pauseAndResume.sim b/tests/script/tsim/stream/pauseAndResume.sim index be89d8e235..9a05e645c2 100644 --- a/tests/script/tsim/stream/pauseAndResume.sim +++ b/tests/script/tsim/stream/pauseAndResume.sim @@ -274,7 +274,7 @@ sql insert into ts4 values(1648791213001,1,12,3,1.0); $loop_count = 0 -loop3: +loop4: $loop_count = $loop_count + 1 if $loop_count == 20 then @@ -291,7 +291,7 @@ if $rows != 1 then print $data00 $data01 $data02 print $data10 $data11 $data12 print $data20 $data21 $data22 - goto loop3 + goto loop4 endi print 2 select * from streamt5; @@ -302,7 +302,7 @@ if $rows != 1 then print $data00 $data01 $data02 print $data10 $data11 $data12 print $data20 $data21 $data22 - goto loop3 + goto loop4 endi print 3 select * from streamt3;