From 53a4294e4c2d0d60ba59cb6f0557d813cfa31e31 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 21 Feb 2024 14:48:16 +0800 Subject: [PATCH 01/24] mid agg operator --- source/libs/executor/inc/executorInt.h | 2 + .../executor/src/streamtimewindowoperator.c | 75 +++++++++++++------ tests/script/tsim/stream/pauseAndResume.sim | 6 +- 3 files changed, 58 insertions(+), 25 deletions(-) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index a280fd6e9b..43ca8a3c38 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -600,6 +600,8 @@ typedef struct SStreamIntervalOperatorInfo { bool recvPullover; SSDataBlock* pMidPulloverRes; bool clearState; + SArray* pMidPullDatas; + int32_t midDelIndex; } SStreamIntervalOperatorInfo; typedef struct SDataGroupInfo { diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 2838d005ab..69c4e04897 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -221,7 +221,7 @@ static bool doDeleteWindow(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId) static int32_t getChildIndex(SSDataBlock* pBlock) { return pBlock->info.childId; } static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDataBlock* pBlock, SArray* pUpWins, - SSHashObj* pUpdatedMap) { + SSHashObj* pUpdatedMap, SHashObj* pInvalidWins) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); TSKEY* startTsCols = (TSKEY*)pStartTsCol->pData; @@ -255,10 +255,15 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDa void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey)); if (chIds) { int32_t childId = getChildIndex(pBlock); + if (pInvalidWins) { + qDebug("===stream===save mid delete window:%" PRId64 ",groupId:%" PRId64 ",chId:%d", winRes.ts, winRes.groupId, childId); + taosHashPut(pInvalidWins, &winRes, sizeof(SWinKey), NULL, 0); + } + SArray* chArray = *(void**)chIds; int32_t index = taosArraySearchIdx(chArray, &childId, compareInt32Val, TD_EQ); if (index != -1) { - qDebug("===stream===try push delete window%" PRId64 "chId:%d ,continue", win.skey, childId); + qDebug("===stream===try push delete window:%" PRId64 ",groupId:%" PRId64 ",chId:%d ,continue", win.skey, winGpId, childId); getNextTimeWindow(pInterval, &win, TSDB_ORDER_ASC); continue; } @@ -413,6 +418,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) { blockDataDestroy(pInfo->pMidRetriveRes); blockDataDestroy(pInfo->pMidPulloverRes); pInfo->stateStore.streamFileStateDestroy(pInfo->pState->pFileState); + taosArrayDestroy(pInfo->pMidPullDatas); if (pInfo->pState->dump == 1) { taosMemoryFreeClear(pInfo->pState->pTdbState->pOwner); @@ -642,9 +648,12 @@ static bool processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFina .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey}; // add pull data request - if (savePullWindow(&pull, pPullWins) == TSDB_CODE_SUCCESS) { + qDebug("===stream===prepare final retrive for delete window:%" PRId64 ",groupId%" PRId64 ", size:%d", winRes.ts, winRes.groupId, numOfCh); + if (IS_MID_INTERVAL_OP(pOperator)) { + SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperator->info; + taosArrayPush(pInfo->pMidPullDatas, &winRes); + } else if (savePullWindow(&pull, pPullWins) == TSDB_CODE_SUCCESS) { addPullWindow(pMap, &winRes, numOfCh); - qDebug("===stream===prepare final retrive for delete %" PRId64 ", size:%d", winRes.ts, numOfCh); } } } @@ -1191,11 +1200,6 @@ static SSDataBlock* buildIntervalResult(SOperatorInfo* pOperator) { return pInfo->binfo.pRes; } - if (pInfo->recvPullover) { - pInfo->recvPullover = false; - printDataBlock(pInfo->pMidPulloverRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); - return pInfo->pMidPulloverRes; - } return NULL; } @@ -1301,7 +1305,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { SArray* delWins = taosArrayInit(8, sizeof(SWinKey)); - doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap); + doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap, NULL); if (IS_FINAL_INTERVAL_OP(pOperator)) { int32_t chId = getChildIndex(pBlock); addRetriveWindow(delWins, pInfo, chId); @@ -1337,7 +1341,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { pInfo->recvRetrive = true; copyDataBlock(pInfo->pMidRetriveRes, pBlock); pInfo->pMidRetriveRes->info.type = STREAM_MID_RETRIEVE; - doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pInfo->pUpdatedMap); + doDeleteWindows(pOperator, &pInfo->interval, pBlock, NULL, pInfo->pUpdatedMap, NULL); break; } continue; @@ -1567,6 +1571,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->pMidRetriveRes = createSpecialDataBlock(STREAM_MID_RETRIEVE); pInfo->pMidPulloverRes = createSpecialDataBlock(STREAM_MID_RETRIEVE); pInfo->clearState = false; + pInfo->pMidPullDatas = taosArrayInit(4, sizeof(SWinKey)); pOperator->operatorType = pPhyNode->type; if (!IS_FINAL_INTERVAL_OP(pOperator) || numOfChild == 0) { @@ -3973,7 +3978,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { - doDeleteWindows(pOperator, &pInfo->interval, pBlock, pInfo->pDelWins, pInfo->pUpdatedMap); + doDeleteWindows(pOperator, &pInfo->interval, pBlock, pInfo->pDelWins, pInfo->pUpdatedMap, NULL); continue; } else if (pBlock->info.type == STREAM_GET_ALL) { pInfo->recvGetAll = true; @@ -4266,6 +4271,34 @@ static void addMidRetriveWindow(SArray* wins, SHashObj* pMidPullMap, int32_t num } } +static SSDataBlock* buildMidIntervalResult(SOperatorInfo* pOperator) { + SStreamIntervalOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + uint16_t opType = pOperator->operatorType; + + if (pInfo->recvPullover) { + pInfo->recvPullover = false; + printDataBlock(pInfo->pMidPulloverRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + return pInfo->pMidPulloverRes; + } + + qDebug("===stream=== build mid interval result"); + doBuildDeleteResult(pInfo, pInfo->pMidPullDatas, &pInfo->midDelIndex, pInfo->pDelRes); + if (pInfo->pDelRes->info.rows != 0) { + // process the rest of the data + printDataBlock(pInfo->pDelRes, getStreamOpName(opType), GET_TASKID(pTaskInfo)); + return pInfo->pDelRes; + } + + if (pInfo->recvRetrive) { + pInfo->recvRetrive = false; + printDataBlock(pInfo->pMidRetriveRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + return pInfo->pMidRetriveRes; + } + + return NULL; +} + static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -4294,10 +4327,9 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { return resBlock; } - if (pInfo->recvRetrive) { - pInfo->recvRetrive = false; - printDataBlock(pInfo->pMidRetriveRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); - return pInfo->pMidRetriveRes; + resBlock = buildMidIntervalResult(pOperator); + if (resBlock != NULL) { + return resBlock; } if (pInfo->clearState) { @@ -4345,7 +4377,7 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { SArray* delWins = taosArrayInit(8, sizeof(SWinKey)); - doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap); + doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap, pInfo->pFinalPullDataMap); removeResults(delWins, pInfo->pUpdatedMap); taosArrayAddAll(pInfo->pDelWins, delWins); taosArrayDestroy(delWins); @@ -4381,7 +4413,7 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { continue; } else if (pBlock->info.type == STREAM_MID_RETRIEVE) { SArray* delWins = taosArrayInit(8, sizeof(SWinKey)); - doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap); + doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap, NULL); addMidRetriveWindow(delWins, pInfo->pPullDataMap, pInfo->numOfChild); taosArrayDestroy(delWins); pInfo->recvRetrive = true; @@ -4426,10 +4458,9 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { return resBlock; } - if (pInfo->recvRetrive) { - pInfo->recvRetrive = false; - printDataBlock(pInfo->pMidRetriveRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); - return pInfo->pMidRetriveRes; + resBlock = buildMidIntervalResult(pOperator); + if (resBlock != NULL) { + return resBlock; } if (pInfo->clearState) { diff --git a/tests/script/tsim/stream/pauseAndResume.sim b/tests/script/tsim/stream/pauseAndResume.sim index 5eb9eef010..7c62eaed81 100644 --- a/tests/script/tsim/stream/pauseAndResume.sim +++ b/tests/script/tsim/stream/pauseAndResume.sim @@ -259,7 +259,7 @@ sql insert into ts4 values(1648791213001,1,12,3,1.0); $loop_count = 0 -loop3: +loop4: $loop_count = $loop_count + 1 if $loop_count == 20 then @@ -276,7 +276,7 @@ if $rows != 1 then print $data00 $data01 $data02 print $data10 $data11 $data12 print $data20 $data21 $data22 - goto loop3 + goto loop4 endi print 2 select * from streamt5; @@ -287,7 +287,7 @@ if $rows != 1 then print $data00 $data01 $data02 print $data10 $data11 $data12 print $data20 $data21 $data22 - goto loop3 + goto loop4 endi print 3 select * from streamt3; From aabab3b4f762d53b2617fecd168f511383c857f2 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 21 Feb 2024 19:12:34 +0800 Subject: [PATCH 02/24] init mid op --- source/libs/executor/src/streamtimewindowoperator.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 69c4e04897..bac1f9de05 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1569,6 +1569,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); pInfo->recvRetrive = false; pInfo->pMidRetriveRes = createSpecialDataBlock(STREAM_MID_RETRIEVE); + pInfo->recvPullover = false; pInfo->pMidPulloverRes = createSpecialDataBlock(STREAM_MID_RETRIEVE); pInfo->clearState = false; pInfo->pMidPullDatas = taosArrayInit(4, sizeof(SWinKey)); From 3d3f8ced235038349442fca74032756659aafe89 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 28 Feb 2024 10:24:50 +0800 Subject: [PATCH 03/24] fix:open mult agg logic for test --- source/common/src/tglobal.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index ee85a909e7..8b19c01010 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -265,7 +265,7 @@ bool tsDisableStream = false; int64_t tsStreamBufferSize = 128 * 1024 * 1024; bool tsFilterScalarMode = false; int tsResolveFQDNRetryTime = 100; // seconds -int tsStreamAggCnt = 1000; +int tsStreamAggCnt = 2; char tsS3Endpoint[TSDB_FQDN_LEN] = ""; char tsS3AccessKey[TSDB_FQDN_LEN] = ""; From e009e5ab8c613e486cf226c9b9c6fcedc38fadcc Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 1 Mar 2024 15:58:24 +0800 Subject: [PATCH 04/24] fix:modify agg count to default 10 --- source/common/src/tglobal.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 18331e6fe4..bd6ea8d7fb 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -268,7 +268,7 @@ bool tsDisableStream = false; int64_t tsStreamBufferSize = 128 * 1024 * 1024; bool tsFilterScalarMode = false; int tsResolveFQDNRetryTime = 100; // seconds -int tsStreamAggCnt = 2; +int tsStreamAggCnt = 10; bool tsDisableCount = true; char tsS3Endpoint[TSDB_FQDN_LEN] = ""; From fd408b63daa0b04dfb8501dd279550c06086b8ec Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Tue, 12 Mar 2024 16:40:36 +0800 Subject: [PATCH 05/24] feat: add showAD feature for community --- tools/shell/src/shellEngine.c | 28 ++++++++++++++++++++++++---- 1 file changed, 24 insertions(+), 4 deletions(-) diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 995d3d04ec..cd9aa45c5e 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -56,7 +56,7 @@ static void shellWriteHistory(); static void shellPrintError(TAOS_RES *tres, int64_t st); static bool shellIsCommentLine(char *line); static void shellSourceFile(const char *file); -static void shellGetGrantInfo(); +static bool shellGetGrantInfo(); static void shellCleanup(void *arg); static void *shellCancelHandler(void *arg); @@ -1150,7 +1150,17 @@ void shellSourceFile(const char *file) { taosCloseFile(&pFile); } -void shellGetGrantInfo() { +// show enterprise AD +void showAD() { + fprintf(stdout, "You are using the TDengine Community Edition. \ + If you want to experience more advanced TDengine features and have professional service, \ + please try the TDengine Enterprise Edition.\r\n\ + https://www.taosdata.com/tdengine-enterprise + "); +} + +bool shellGetGrantInfo() { + bool community = true; char sinfo[1024] = {0}; tstrncpy(sinfo, taos_get_server_info(shell.conn), sizeof(sinfo)); strtok(sinfo, "\r\n"); @@ -1194,15 +1204,19 @@ void shellGetGrantInfo() { memcpy(expired, row[2], fields[2].bytes); if (strcmp(serverVersion, "community") == 0) { - fprintf(stdout, "Server is Community Edition.\r\n"); + community = true; + showAD() } else if (strcmp(expiretime, "unlimited") == 0) { + community = false; fprintf(stdout, "Server is Enterprise %s Edition, %s and will never expire.\r\n", serverVersion, sinfo); } else { + community = false; fprintf(stdout, "Server is Enterprise %s Edition, %s and will expire at %s.\r\n", serverVersion, sinfo, expiretime); } taos_free_result(tres); + return community; } fprintf(stdout, "\r\n"); @@ -1367,7 +1381,7 @@ int32_t shellExecute() { #ifndef WINDOWS printfIntroduction(); #endif - shellGetGrantInfo(); + bool community = shellGetGrantInfo(); #ifdef WEBSOCKET } #endif @@ -1380,6 +1394,12 @@ int32_t shellExecute() { break; } } + + // commnuity + if (community) { + showAD(); + } + taosThreadJoin(spid, NULL); shellCleanupHistory(); From 1e7cfefc5ec1abc7200e6d1f25dd70d3dc8a6e8a Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Tue, 12 Mar 2024 17:34:18 +0800 Subject: [PATCH 06/24] fix: build error --- tools/shell/src/shellEngine.c | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index cd9aa45c5e..4b1fc3ba2a 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -1152,11 +1152,11 @@ void shellSourceFile(const char *file) { // show enterprise AD void showAD() { - fprintf(stdout, "You are using the TDengine Community Edition. \ - If you want to experience more advanced TDengine features and have professional service, \ + fprintf(stdout, "\r\n\ + You are using the TDengine Community Edition. \r\n\ + If you want to experience more advanced TDengine features and have professional service,\r\n\ please try the TDengine Enterprise Edition.\r\n\ - https://www.taosdata.com/tdengine-enterprise - "); + https://www.taosdata.com/tdengine-enterprise\r\n\r\n"); } bool shellGetGrantInfo() { @@ -1175,7 +1175,7 @@ bool shellGetGrantInfo() { code != TSDB_CODE_PAR_PERMISSION_DENIED) { fprintf(stderr, "Failed to check Server Edition, Reason:0x%04x:%s\r\n\r\n", code, taos_errstr(tres)); } - return; + return community; } int32_t num_fields = taos_field_count(tres); @@ -1205,7 +1205,7 @@ bool shellGetGrantInfo() { if (strcmp(serverVersion, "community") == 0) { community = true; - showAD() + showAD(); } else if (strcmp(expiretime, "unlimited") == 0) { community = false; fprintf(stdout, "Server is Enterprise %s Edition, %s and will never expire.\r\n", serverVersion, sinfo); @@ -1216,10 +1216,10 @@ bool shellGetGrantInfo() { } taos_free_result(tres); - return community; } fprintf(stdout, "\r\n"); + return community; } #ifdef WINDOWS From c13338d96568490df978fe01939a75ec098a2f25 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 13 Mar 2024 16:16:44 +0800 Subject: [PATCH 07/24] stream event window mem leak --- source/libs/executor/src/streameventwindowoperator.c | 7 +++++++ source/libs/executor/src/streamtimewindowoperator.c | 9 ++------- source/libs/stream/src/streamSessionState.c | 3 +-- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 11d8d1487a..ef5c2572d9 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -99,6 +99,11 @@ int32_t getEndCondIndex(bool* pEnd, int32_t start, int32_t rows) { return -1; } +int32_t reuseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI) { + pAPI->streamStateReleaseBuf(pState, pPos, true); + return TSDB_CODE_SUCCESS; +} + void setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t groupId, bool* pStart, bool* pEnd, int32_t index, int32_t rows, SEventWindowInfo* pCurWin, SSessionKey* pNextWinKey) { int32_t code = TSDB_CODE_SUCCESS; int32_t size = pAggSup->resultRowSize; @@ -143,6 +148,7 @@ void setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t groupI pCurWin->winInfo.isOutput = false; _end: + reuseOutputBuf(pAggSup->pState, pCurWin->winInfo.pStatePos, &pAggSup->stateStore); pAggSup->stateStore.streamStateCurNext(pAggSup->pState, pCur); pNextWinKey->groupId = groupId; code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, pNextWinKey, NULL, 0); @@ -341,6 +347,7 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl } if (isWindowIncomplete(&curWin)) { + releaseOutputBuf(pAggSup->pState, curWin.winInfo.pStatePos, &pAggSup->stateStore); continue; } diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index 7a1bb2729c..9085694aed 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1845,11 +1845,6 @@ int32_t releaseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI) { return TSDB_CODE_SUCCESS; } -int32_t reuseOutputBuf(void* pState, SRowBuffPos* pPos, SStateStore* pAPI) { - pAPI->streamStateReleaseBuf(pState, pPos, true); - return TSDB_CODE_SUCCESS; -} - void removeSessionResult(SStreamAggSupporter* pAggSup, SSHashObj* pHashMap, SSHashObj* pResMap, SSessionKey* pKey) { SSessionKey key = {0}; getSessionHashKey(pKey, &key); @@ -2495,7 +2490,7 @@ void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins) { return; } SResultWindowInfo* pWinInfo = taosArrayGet(pAllWins, size - 1); - SSessionKey* pSeKey = pWinInfo->pStatePos->pKey; + SSessionKey* pSeKey = &pWinInfo->sessionWin; taosArrayPush(pMaxWins, pSeKey); if (pSeKey->groupId == 0) { return; @@ -2503,7 +2498,7 @@ void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins) { uint64_t preGpId = pSeKey->groupId; for (int32_t i = size - 2; i >= 0; i--) { pWinInfo = taosArrayGet(pAllWins, i); - pSeKey = pWinInfo->pStatePos->pKey; + pSeKey = &pWinInfo->sessionWin; if (preGpId != pSeKey->groupId) { taosArrayPush(pMaxWins, pSeKey); preGpId = pSeKey->groupId; diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index 723f04c499..06f7b6a268 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -310,7 +310,7 @@ int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStream int32_t size = taosArrayGetSize(pWinStates); if (pCur->buffIndex >= 0) { if (pCur->buffIndex >= size) { - pNewPos = insertNewSessionWindow(pFileState, pWinStates, pWinKey, size); + pNewPos = addNewSessionWindow(pFileState, pWinStates, pWinKey); goto _end; } pNewPos = insertNewSessionWindow(pFileState, pWinStates, pWinKey, pCur->buffIndex); @@ -332,7 +332,6 @@ int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStream } _end: - memcpy(pNewPos->pKey, pWinKey, sizeof(SSessionKey)); (*ppVal) = pNewPos; return TSDB_CODE_SUCCESS; } From 987e0b845fae8bdf4f80cc25e60c115ee28166b5 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Wed, 13 Mar 2024 17:16:42 +0800 Subject: [PATCH 08/24] feat: version2 modify tip msg --- tools/shell/inc/shellAuto.h | 3 +++ tools/shell/src/shellAuto.c | 17 ++++++++++++++++- tools/shell/src/shellEngine.c | 21 ++++++++------------- 3 files changed, 27 insertions(+), 14 deletions(-) diff --git a/tools/shell/inc/shellAuto.h b/tools/shell/inc/shellAuto.h index 6a317fe5c9..a4fca51ee7 100644 --- a/tools/shell/inc/shellAuto.h +++ b/tools/shell/inc/shellAuto.h @@ -41,6 +41,9 @@ void callbackAutoTab(char* sqlstr, TAOS* pSql, bool usedb); // introduction void printfIntroduction(); +// show enterprise AD at start or end +void showAD(bool end); + // show all commands help void showHelp(); diff --git a/tools/shell/src/shellAuto.c b/tools/shell/src/shellAuto.c index 847bbcf4be..5fdbbe1be7 100644 --- a/tools/shell/src/shellAuto.c +++ b/tools/shell/src/shellAuto.c @@ -400,7 +400,7 @@ SMatch* lastMatch = NULL; // save last match result int cntDel = 0; // delete byte count after next press tab // show auto tab introduction -void printfIntroduction() { +void printfIntroduction(bool community) { printf(" ******************************** Tab Completion ************************************\n"); char secondLine[160] = "\0"; sprintf(secondLine, " * The %s CLI supports tab completion for a variety of items, ", shell.info.cusName); @@ -420,9 +420,24 @@ void printfIntroduction() { printf(" * [ Ctrl + L ] ...... clear the entire screen *\n"); printf(" * [ Ctrl + K ] ...... clear the screen after the cursor *\n"); printf(" * [ Ctrl + U ] ...... clear the screen before the cursor *\n"); + if(community) { + printf(" * ----------------------------------------------------------------------------- *\n"); + printf(" * You are using TDengine OSS. To experience more advanced TDengine features and *\n"); + printf(" * receive professional technical support, try TDengine Enterprise: *\n"); + printf(" * http://www.tdengine.com *\n"); + } printf(" **************************************************************************************\n\n"); } +// show enterprise AD +void showAD(bool end) { + printf("\n"); + printf(" You are using TDengine OSS. To experience more advanced TDengine features and \n"); + printf(" receive professional technical support, try TDengine Enterprise: \n"); + printf(" http://www.tdengine.com \n"); + printf("\n"); +} + void showHelp() { printf("\nThe %s CLI supports the following commands:", shell.info.cusName); printf( diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 4b1fc3ba2a..512f188fba 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -1150,15 +1150,6 @@ void shellSourceFile(const char *file) { taosCloseFile(&pFile); } -// show enterprise AD -void showAD() { - fprintf(stdout, "\r\n\ - You are using the TDengine Community Edition. \r\n\ - If you want to experience more advanced TDengine features and have professional service,\r\n\ - please try the TDengine Enterprise Edition.\r\n\ - https://www.taosdata.com/tdengine-enterprise\r\n\r\n"); -} - bool shellGetGrantInfo() { bool community = true; char sinfo[1024] = {0}; @@ -1205,7 +1196,6 @@ bool shellGetGrantInfo() { if (strcmp(serverVersion, "community") == 0) { community = true; - showAD(); } else if (strcmp(expiretime, "unlimited") == 0) { community = false; fprintf(stdout, "Server is Enterprise %s Edition, %s and will never expire.\r\n", serverVersion, sinfo); @@ -1378,10 +1368,15 @@ int32_t shellExecute() { #ifdef WEBSOCKET if (!shell.args.restful && !shell.args.cloud) { #endif +bool community = shellGetGrantInfo(); #ifndef WINDOWS - printfIntroduction(); + printfIntroduction(community); +#else + if(community) { + showAD(false) + } #endif - bool community = shellGetGrantInfo(); + #ifdef WEBSOCKET } #endif @@ -1397,7 +1392,7 @@ int32_t shellExecute() { // commnuity if (community) { - showAD(); + showAD(true); } taosThreadJoin(spid, NULL); From ef32863f0754da1de15ad1ed81666f3768f00863 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Wed, 13 Mar 2024 19:44:59 +0800 Subject: [PATCH 09/24] stream event window mem leak --- source/libs/stream/src/streamSessionState.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index 06f7b6a268..295132a4f5 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -327,6 +327,7 @@ int32_t allocSessioncWinBuffByNextPosition(SStreamFileState* pFileState, SStream } } pNewPos = getNewRowPosForWrite(pFileState); + memcpy(pNewPos->pKey, pWinKey, sizeof(SSessionKey)); pNewPos->needFree = true; pNewPos->beFlushed = true; } From cca0bf6af037c9dc863f3f672db234855e7fd6d8 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Wed, 13 Mar 2024 20:36:43 +0800 Subject: [PATCH 10/24] fix: last modify information --- tools/shell/inc/shellAuto.h | 2 +- tools/shell/src/shellAuto.c | 17 ++++++++--------- tools/shell/src/shellEngine.c | 13 +++++++++---- 3 files changed, 18 insertions(+), 14 deletions(-) diff --git a/tools/shell/inc/shellAuto.h b/tools/shell/inc/shellAuto.h index a4fca51ee7..bcf500fefc 100644 --- a/tools/shell/inc/shellAuto.h +++ b/tools/shell/inc/shellAuto.h @@ -39,7 +39,7 @@ void shellAutoExit(); void callbackAutoTab(char* sqlstr, TAOS* pSql, bool usedb); // introduction -void printfIntroduction(); +void printfIntroduction(bool community); // show enterprise AD at start or end void showAD(bool end); diff --git a/tools/shell/src/shellAuto.c b/tools/shell/src/shellAuto.c index 5fdbbe1be7..c279b03b14 100644 --- a/tools/shell/src/shellAuto.c +++ b/tools/shell/src/shellAuto.c @@ -421,21 +421,20 @@ void printfIntroduction(bool community) { printf(" * [ Ctrl + K ] ...... clear the screen after the cursor *\n"); printf(" * [ Ctrl + U ] ...... clear the screen before the cursor *\n"); if(community) { - printf(" * ----------------------------------------------------------------------------- *\n"); - printf(" * You are using TDengine OSS. To experience more advanced TDengine features and *\n"); - printf(" * receive professional technical support, try TDengine Enterprise: *\n"); - printf(" * http://www.tdengine.com *\n"); + printf(" * ---------------------------------------------------------------------------------- *\n"); + printf(" * You are using TDengine OSS. To experience more advanced features and receive *\n"); + printf(" * professional technical support, try TDengine Enterprise or Cloud, *\n"); + printf(" * learn more at https://tdengine.com *\n"); } printf(" **************************************************************************************\n\n"); } // show enterprise AD void showAD(bool end) { - printf("\n"); - printf(" You are using TDengine OSS. To experience more advanced TDengine features and \n"); - printf(" receive professional technical support, try TDengine Enterprise: \n"); - printf(" http://www.tdengine.com \n"); - printf("\n"); + printf(" You are using TDengine OSS. To experience more advanced features and receive \n"); + printf(" professional technical support, try TDengine Enterprise or Cloud, \n"); + printf(" learn more at https://tdengine.com \n"); + printf(" \n"); } void showHelp() { diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 512f188fba..69625fe01f 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -1150,7 +1150,7 @@ void shellSourceFile(const char *file) { taosCloseFile(&pFile); } -bool shellGetGrantInfo() { +bool shellGetGrantInfo(char* buf) { bool community = true; char sinfo[1024] = {0}; tstrncpy(sinfo, taos_get_server_info(shell.conn), sizeof(sinfo)); @@ -1198,10 +1198,10 @@ bool shellGetGrantInfo() { community = true; } else if (strcmp(expiretime, "unlimited") == 0) { community = false; - fprintf(stdout, "Server is Enterprise %s Edition, %s and will never expire.\r\n", serverVersion, sinfo); + fprintf(buf, "Server is Enterprise %s Edition, %s and will never expire.\r\n", serverVersion, sinfo); } else { community = false; - fprintf(stdout, "Server is Enterprise %s Edition, %s and will expire at %s.\r\n", serverVersion, sinfo, + fprintf(buf, "Server is Enterprise %s Edition, %s and will expire at %s.\r\n", serverVersion, sinfo, expiretime); } @@ -1368,7 +1368,8 @@ int32_t shellExecute() { #ifdef WEBSOCKET if (!shell.args.restful && !shell.args.cloud) { #endif -bool community = shellGetGrantInfo(); +char buf[512] = ""; +bool community = shellGetGrantInfo(buf); #ifndef WINDOWS printfIntroduction(community); #else @@ -1376,6 +1377,10 @@ bool community = shellGetGrantInfo(); showAD(false) } #endif +// printf version +if(!community) { + printf(buf); +} #ifdef WEBSOCKET } From ef553bf2104d0544b3674f7b5cd16715d0e6f20a Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Wed, 13 Mar 2024 20:40:01 +0800 Subject: [PATCH 11/24] fix:build error --- tools/shell/src/shellEngine.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 69625fe01f..59013cb6ff 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -1198,10 +1198,10 @@ bool shellGetGrantInfo(char* buf) { community = true; } else if (strcmp(expiretime, "unlimited") == 0) { community = false; - fprintf(buf, "Server is Enterprise %s Edition, %s and will never expire.\r\n", serverVersion, sinfo); + sprintf(buf, "Server is Enterprise %s Edition, %s and will never expire.\r\n", serverVersion, sinfo); } else { community = false; - fprintf(buf, "Server is Enterprise %s Edition, %s and will expire at %s.\r\n", serverVersion, sinfo, + sprintf(buf, "Server is Enterprise %s Edition, %s and will expire at %s.\r\n", serverVersion, sinfo, expiretime); } @@ -1379,7 +1379,7 @@ bool community = shellGetGrantInfo(buf); #endif // printf version if(!community) { - printf(buf); + printf("%s", buf); } #ifdef WEBSOCKET From f90ba610af8b9f68c2f66ebed8c39b72d435271c Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Wed, 13 Mar 2024 20:43:03 +0800 Subject: [PATCH 12/24] fix:build error1 --- tools/shell/src/shellEngine.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 59013cb6ff..3b58d32d0a 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -1379,7 +1379,7 @@ bool community = shellGetGrantInfo(buf); #endif // printf version if(!community) { - printf("%s", buf); + printf("%s\n", buf); } #ifdef WEBSOCKET From 01a7052a6f156632d10e7c30629cff52c67cb6ed Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 13 Mar 2024 22:06:35 +0800 Subject: [PATCH 13/24] fix:set tsStreamAggCnt big enough to disable multi agg operator --- source/common/src/tglobal.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 271fa20c54..33ff3e1248 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -268,7 +268,7 @@ bool tsDisableStream = false; int64_t tsStreamBufferSize = 128 * 1024 * 1024; bool tsFilterScalarMode = false; int tsResolveFQDNRetryTime = 100; // seconds -int tsStreamAggCnt = 10; +int tsStreamAggCnt = 100000; char tsS3Endpoint[TSDB_FQDN_LEN] = ""; char tsS3AccessKey[TSDB_FQDN_LEN] = ""; From 2b7fd0d15c50075cb89c51e1410317791b86c2d3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 14 Mar 2024 10:04:14 +0800 Subject: [PATCH 14/24] fix(tsdb): set strict varchar length check. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 90a26d17dc..d740f9491c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -658,11 +658,12 @@ static int32_t doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int colDataSetNULL(pColInfoData, rowIndex); } else { varDataSetLen(pSup->buildBuf[colIndex], pColVal->value.nData); - if (pColVal->value.nData > pColInfoData->info.bytes) { + if ((pColVal->value.nData + VARSTR_HEADER_SIZE) > pColInfoData->info.bytes) { tsdbWarn("column cid:%d actual data len %d is bigger than schema len %d", pColVal->cid, pColVal->value.nData, pColInfoData->info.bytes); return TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER; } + if (pColVal->value.nData > 0) { // pData may be null, if nData is 0 memcpy(varDataVal(pSup->buildBuf[colIndex]), pColVal->value.pData, pColVal->value.nData); } From 190b02dd1aa7973cb7eb99ca2d6ccb99c2a86d09 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 14 Mar 2024 10:04:43 +0800 Subject: [PATCH 15/24] fix(stream):set correct version for sink/agg tasks. --- source/dnode/vnode/src/tq/tq.c | 2 + source/libs/stream/src/streamTask.c | 73 +++++++++++++++-------------- 2 files changed, 40 insertions(+), 35 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 47900d540c..f98a69097b 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -860,6 +860,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer, pTask->info.selfChildId, pTask->info.taskLevel, p, pNext, pTask->info.fillHistory, (int32_t)pTask->hTaskInfo.id.taskId, pTask->info.triggerParam, nextProcessVer); + + ASSERT(pChkInfo->checkpointVer <= pChkInfo->nextProcessVer); } return 0; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index cf606f2fc9..959d1382a3 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -458,12 +458,49 @@ void tFreeStreamTask(SStreamTask* pTask) { stDebug("s-task:0x%x free task completed", taskId); } +static void setInitialVersionInfo(SStreamTask* pTask, int64_t ver) { + SCheckpointInfo* pChkInfo = &pTask->chkInfo; + SDataRange* pRange = &pTask->dataRange; + + // only set the version info for stream tasks without fill-history task + if ((pTask->info.fillHistory == 0) && (!HAS_RELATED_FILLHISTORY_TASK(pTask))) { + pChkInfo->checkpointVer = ver - 1; // only update when generating checkpoint + pChkInfo->processedVer = ver - 1; // already processed version + pChkInfo->nextProcessVer = ver; // next processed version + + pRange->range.maxVer = ver; + pRange->range.minVer = ver; + } else { + // the initial value of processedVer/nextProcessVer/checkpointVer for stream task with related fill-history task + // is set at the mnode. + if (pTask->info.fillHistory == 1) { + pChkInfo->checkpointVer = pRange->range.maxVer; + pChkInfo->processedVer = pRange->range.maxVer; + pChkInfo->nextProcessVer = pRange->range.maxVer + 1; + } else { + pChkInfo->checkpointVer = pRange->range.minVer - 1; + pChkInfo->processedVer = pRange->range.minVer - 1; + pChkInfo->nextProcessVer = pRange->range.minVer; + + { // for compatible purpose, remove it later + if (pRange->range.minVer == 0) { + pChkInfo->checkpointVer = 0; + pChkInfo->processedVer = 0; + pChkInfo->nextProcessVer = 1; + stDebug("s-task:%s update the processedVer to 0 from -1 due to compatible purpose", pTask->id.idStr); + } + } + } + } +} + int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) { pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId); pTask->refCnt = 1; pTask->inputq.status = TASK_INPUT_STATUS__NORMAL; pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL; + pTask->inputq.queue = streamQueueOpen(512 << 10); pTask->outputq.queue = streamQueueOpen(512 << 10); if (pTask->inputq.queue == NULL || pTask->outputq.queue == NULL) { @@ -481,41 +518,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i } pTask->execInfo.created = taosGetTimestampMs(); - SCheckpointInfo* pChkInfo = &pTask->chkInfo; - SDataRange* pRange = &pTask->dataRange; - - // only set the version info for stream tasks without fill-history task - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - if ((pTask->info.fillHistory == 0) && (!HAS_RELATED_FILLHISTORY_TASK(pTask))) { - pChkInfo->checkpointVer = ver - 1; // only update when generating checkpoint - pChkInfo->processedVer = ver - 1; // already processed version - pChkInfo->nextProcessVer = ver; // next processed version - - pRange->range.maxVer = ver; - pRange->range.minVer = ver; - } else { - // the initial value of processedVer/nextProcessVer/checkpointVer for stream task with related fill-history task - // is set at the mnode. - if (pTask->info.fillHistory == 1) { - pChkInfo->checkpointVer = pRange->range.maxVer; - pChkInfo->processedVer = pRange->range.maxVer; - pChkInfo->nextProcessVer = pRange->range.maxVer + 1; - } else { - pChkInfo->checkpointVer = pRange->range.minVer - 1; - pChkInfo->processedVer = pRange->range.minVer - 1; - pChkInfo->nextProcessVer = pRange->range.minVer; - - { // for compatible purpose, remove it later - if (pRange->range.minVer == 0) { - pChkInfo->checkpointVer = 0; - pChkInfo->processedVer = 0; - pChkInfo->nextProcessVer = 1; - stDebug("s-task:%s update the processedVer to 0 from -1 due to compatible purpose", pTask->id.idStr); - } - } - } - } - } + setInitialVersionInfo(pTask, ver); pTask->pMeta = pMeta; pTask->pMsgCb = pMsgCb; From 61ce6602d8e2d333aba53e56e7eaddbeb36beeaf Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Thu, 14 Mar 2024 10:09:23 +0800 Subject: [PATCH 16/24] fix: version3 information --- tools/shell/src/shellAuto.c | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/tools/shell/src/shellAuto.c b/tools/shell/src/shellAuto.c index c279b03b14..0038866669 100644 --- a/tools/shell/src/shellAuto.c +++ b/tools/shell/src/shellAuto.c @@ -410,30 +410,30 @@ void printfIntroduction(bool community) { printf(" "); } printf("*\n"); - printf(" * including database names, table names, function names and keywords. *\n"); - printf(" * The full list of shortcut keys is as follows: *\n"); - printf(" * [ TAB ] ...... complete the current word *\n"); - printf(" * ...... if used on a blank line, display all supported commands *\n"); - printf(" * [ Ctrl + A ] ...... move cursor to the st[A]rt of the line *\n"); - printf(" * [ Ctrl + E ] ...... move cursor to the [E]nd of the line *\n"); - printf(" * [ Ctrl + W ] ...... move cursor to the middle of the line *\n"); - printf(" * [ Ctrl + L ] ...... clear the entire screen *\n"); - printf(" * [ Ctrl + K ] ...... clear the screen after the cursor *\n"); - printf(" * [ Ctrl + U ] ...... clear the screen before the cursor *\n"); + printf(" * including database names, table names, function names and keywords. *\n"); + printf(" * The full list of shortcut keys is as follows: *\n"); + printf(" * [ TAB ] ...... complete the current word *\n"); + printf(" * ...... if used on a blank line, display all supported commands *\n"); + printf(" * [ Ctrl + A ] ...... move cursor to the st[A]rt of the line *\n"); + printf(" * [ Ctrl + E ] ...... move cursor to the [E]nd of the line *\n"); + printf(" * [ Ctrl + W ] ...... move cursor to the middle of the line *\n"); + printf(" * [ Ctrl + L ] ...... clear the entire screen *\n"); + printf(" * [ Ctrl + K ] ...... clear the screen after the cursor *\n"); + printf(" * [ Ctrl + U ] ...... clear the screen before the cursor *\n"); if(community) { - printf(" * ---------------------------------------------------------------------------------- *\n"); - printf(" * You are using TDengine OSS. To experience more advanced features and receive *\n"); - printf(" * professional technical support, try TDengine Enterprise or Cloud, *\n"); - printf(" * learn more at https://tdengine.com *\n"); + printf(" * ----------------------------------------------------------------------------------- *\n"); + printf(" * You are using TDengine OSS. To experience advanced features, like backup/restore, *\n"); + printf(" * privilege control and more, or receive 7x24 technical support, try TDengine *\n"); + printf(" * Enterprise or Free Cloud Trial. Learn more at https://tdengine.com *\n"); } printf(" **************************************************************************************\n\n"); } // show enterprise AD void showAD(bool end) { - printf(" You are using TDengine OSS. To experience more advanced features and receive \n"); - printf(" professional technical support, try TDengine Enterprise or Cloud, \n"); - printf(" learn more at https://tdengine.com \n"); + printf(" You are using TDengine OSS. To experience advanced features, like backup/restore, \n"); + printf(" privilege control and more, or receive 7x24 technical support, try TDengine \n"); + printf(" Enterprise or Free Cloud Trial. Learn more at https://tdengine.com \n"); printf(" \n"); } From 16c277b7ec2f166b88246b699522f433ad30aa5c Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Thu, 14 Mar 2024 10:13:51 +0800 Subject: [PATCH 17/24] fix: version4 information --- tools/shell/src/shellAuto.c | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tools/shell/src/shellAuto.c b/tools/shell/src/shellAuto.c index 0038866669..79a4124fb7 100644 --- a/tools/shell/src/shellAuto.c +++ b/tools/shell/src/shellAuto.c @@ -401,12 +401,12 @@ int cntDel = 0; // delete byte count after next press tab // show auto tab introduction void printfIntroduction(bool community) { - printf(" ******************************** Tab Completion ************************************\n"); + printf(" ********************************* Tab Completion *************************************\n"); char secondLine[160] = "\0"; sprintf(secondLine, " * The %s CLI supports tab completion for a variety of items, ", shell.info.cusName); printf("%s", secondLine); int secondLineLen = strlen(secondLine); - while (87 - (secondLineLen++) > 0) { + while (89 - (secondLineLen++) > 0) { printf(" "); } printf("*\n"); @@ -422,18 +422,18 @@ void printfIntroduction(bool community) { printf(" * [ Ctrl + U ] ...... clear the screen before the cursor *\n"); if(community) { printf(" * ----------------------------------------------------------------------------------- *\n"); - printf(" * You are using TDengine OSS. To experience advanced features, like backup/restore, *\n"); + printf(" * You are using TDengine OSS. To experience advanced features, like backup/restore *\n"); printf(" * privilege control and more, or receive 7x24 technical support, try TDengine *\n"); printf(" * Enterprise or Free Cloud Trial. Learn more at https://tdengine.com *\n"); } - printf(" **************************************************************************************\n\n"); + printf(" ****************************************************************************************\n\n"); } // show enterprise AD void showAD(bool end) { - printf(" You are using TDengine OSS. To experience advanced features, like backup/restore, \n"); - printf(" privilege control and more, or receive 7x24 technical support, try TDengine \n"); - printf(" Enterprise or Free Cloud Trial. Learn more at https://tdengine.com \n"); + printf(" You are using TDengine OSS. To experience advanced features, like backup/restore \n"); + printf(" privilege control and more, or receive 7x24 technical support, try TDengine Enterprise \n"); + printf(" or Free Cloud Trial. Learn more at https://tdengine.com \n"); printf(" \n"); } From e38ff4e7bf43b28ba753f31dc6651bbe9ee8e1b7 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Thu, 14 Mar 2024 10:15:56 +0800 Subject: [PATCH 18/24] fix: version4 information --- tools/shell/src/shellAuto.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tools/shell/src/shellAuto.c b/tools/shell/src/shellAuto.c index 79a4124fb7..6ab94761f7 100644 --- a/tools/shell/src/shellAuto.c +++ b/tools/shell/src/shellAuto.c @@ -422,7 +422,7 @@ void printfIntroduction(bool community) { printf(" * [ Ctrl + U ] ...... clear the screen before the cursor *\n"); if(community) { printf(" * ----------------------------------------------------------------------------------- *\n"); - printf(" * You are using TDengine OSS. To experience advanced features, like backup/restore *\n"); + printf(" * You are using TDengine OSS. To experience advanced features, like backup/restore, *\n"); printf(" * privilege control and more, or receive 7x24 technical support, try TDengine *\n"); printf(" * Enterprise or Free Cloud Trial. Learn more at https://tdengine.com *\n"); } @@ -431,7 +431,7 @@ void printfIntroduction(bool community) { // show enterprise AD void showAD(bool end) { - printf(" You are using TDengine OSS. To experience advanced features, like backup/restore \n"); + printf(" You are using TDengine OSS. To experience advanced features, like backup/restore, \n"); printf(" privilege control and more, or receive 7x24 technical support, try TDengine Enterprise \n"); printf(" or Free Cloud Trial. Learn more at https://tdengine.com \n"); printf(" \n"); From 18a669219efa5d9b374f7a6be67b363b553076ce Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Thu, 14 Mar 2024 10:21:55 +0800 Subject: [PATCH 19/24] fix: version4 information --- tools/shell/src/shellAuto.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tools/shell/src/shellAuto.c b/tools/shell/src/shellAuto.c index 6ab94761f7..f39fbfbdf5 100644 --- a/tools/shell/src/shellAuto.c +++ b/tools/shell/src/shellAuto.c @@ -421,10 +421,10 @@ void printfIntroduction(bool community) { printf(" * [ Ctrl + K ] ...... clear the screen after the cursor *\n"); printf(" * [ Ctrl + U ] ...... clear the screen before the cursor *\n"); if(community) { - printf(" * ----------------------------------------------------------------------------------- *\n"); - printf(" * You are using TDengine OSS. To experience advanced features, like backup/restore, *\n"); - printf(" * privilege control and more, or receive 7x24 technical support, try TDengine *\n"); - printf(" * Enterprise or Free Cloud Trial. Learn more at https://tdengine.com *\n"); + printf(" * ------------------------------------------------------------------------------------ *\n"); + printf(" * You are using TDengine OSS. To experience advanced features, like backup/restore, *\n"); + printf(" * privilege control and more, or receive 7x24 technical support, try TDengine *\n"); + printf(" * Enterprise or Free Cloud Trial. Learn more at https://tdengine.com *\n"); } printf(" ****************************************************************************************\n\n"); } From 40c99f23553fcfd3d0eb433bcadd518ff4422c2f Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Thu, 14 Mar 2024 12:56:59 +0800 Subject: [PATCH 20/24] fix: websocket mode test --- tools/shell/src/shellEngine.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 3b58d32d0a..7dc2d05352 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -1373,9 +1373,11 @@ bool community = shellGetGrantInfo(buf); #ifndef WINDOWS printfIntroduction(community); #else +#ifndef WEBSOCKET if(community) { showAD(false) } +#endif #endif // printf version if(!community) { @@ -1394,11 +1396,12 @@ if(!community) { break; } } - +#ifndef WEBSOCKET // commnuity if (community) { showAD(true); } +#endif taosThreadJoin(spid, NULL); From 5ff836db247c71c186ac3350b6808c5fe6451da3 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Thu, 14 Mar 2024 14:09:27 +0800 Subject: [PATCH 21/24] fix: windows build --- tools/shell/src/shellEngine.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 7dc2d05352..5c71396ea9 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -1375,7 +1375,7 @@ bool community = shellGetGrantInfo(buf); #else #ifndef WEBSOCKET if(community) { - showAD(false) + showAD(false); } #endif #endif From 3b14017c9ad25140db7dcea20d19b58f2ffd9ebe Mon Sep 17 00:00:00 2001 From: liuyao <38781207+54liuyao@users.noreply.github.com> Date: Thu, 14 Mar 2024 14:11:54 +0800 Subject: [PATCH 22/24] Update 14-stream.md --- docs/zh/12-taos-sql/14-stream.md | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/docs/zh/12-taos-sql/14-stream.md b/docs/zh/12-taos-sql/14-stream.md index 8868b728f8..2ed3c9afae 100644 --- a/docs/zh/12-taos-sql/14-stream.md +++ b/docs/zh/12-taos-sql/14-stream.md @@ -30,7 +30,7 @@ subquery: SELECT select_list [window_clause] ``` -支持会话窗口、状态窗口与滑动窗口,其中,会话窗口与状态窗口搭配超级表时必须与partition by tbname一起使用 +支持会话窗口、状态窗口、滑动窗口、事件窗口和计数窗口,其中,状态窗口、事件窗口和计数窗口搭配超级表时必须与partition by tbname一起使用 stb_name 是保存计算结果的超级表的表名,如果该超级表不存在,会自动创建;如果已存在,则检查列的schema信息。详见 写入已存在的超级表 @@ -60,7 +60,11 @@ COUNT_WINDOW 是计数窗口,按固定的数据行数来划分窗口。 count_va 窗口的定义与时序数据特色查询中的定义完全相同,详见 [TDengine 特色查询](../distinguished) -例如,如下语句创建流式计算,同时自动创建名为 avg_vol 的超级表,此流计算以一分钟为时间窗口、30 秒为前向增量统计这些电表的平均电压,并将来自 meters 表的数据的计算结果写入 avg_vol 表,不同 partition 的数据会分别创建子表并写入不同子表。 +例如,如下语句创建流式计算。第一个流计算,自动创建名为 avg_vol 的超级表,以一分钟为时间窗口、30 秒为前向增量统计这些电表的平均电压,并将来自 meters 表的数据的计算结果写入 avg_vol 表,不同 partition 的数据会分别创建子表并写入不同子表。 + +第二个流计算,自动创建名为 streamt0 的超级表,将数据按时间戳的顺序,以 voltage < 0 作为窗口的开始条件,voltage > 9作为窗口的结束条件,划分窗口做聚合运算,并将来自 meters 表的数据的计算结果写入 streamt0 表,不同 partition 的数据会分别创建子表并写入不同子表。 + +第三个流计算,自动创建名为 streamt1 的超级表,将数据按时间戳的顺序,以10条数据为一组,划分窗口做聚合运算,并将来自 meters 表的数据的计算结果写入 streamt1 表,不同 partition 的数据会分别创建子表并写入不同子表。 ```sql CREATE STREAM avg_vol_s INTO avg_vol AS From bc015c0aa01f72ac89f8a76b24cedd2ea09b311a Mon Sep 17 00:00:00 2001 From: liuyao <38781207+54liuyao@users.noreply.github.com> Date: Thu, 14 Mar 2024 14:15:41 +0800 Subject: [PATCH 23/24] Update 06-stream.md --- docs/zh/07-develop/06-stream.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/07-develop/06-stream.md b/docs/zh/07-develop/06-stream.md index 4a5ebb96e8..a6c8b02772 100644 --- a/docs/zh/07-develop/06-stream.md +++ b/docs/zh/07-develop/06-stream.md @@ -8,7 +8,7 @@ title: 流式计算 TDengine 3.0 的流式计算引擎提供了实时处理写入的数据流的能力,使用 SQL 定义实时流变换,当数据被写入流的源表后,数据会被以定义的方式自动处理,并根据定义的触发模式向目的表推送结果。它提供了替代复杂流处理系统的轻量级解决方案,并能够在高吞吐的数据写入的情况下,提供毫秒级的计算结果延迟。 -流式计算可以包含数据过滤,标量函数计算(含UDF),以及窗口聚合(支持滑动窗口、会话窗口与状态窗口),可以以超级表、子表、普通表为源表,写入到目的超级表。在创建流时,目的超级表将被自动创建,随后新插入的数据会被流定义的方式处理并写入其中,通过 partition by 子句,可以以表名或标签划分 partition,不同的 partition 将写入到目的超级表的不同子表。 +流式计算可以包含数据过滤,标量函数计算(含UDF),以及窗口聚合(支持滑动窗口、会话窗口、状态窗口、事件窗口与计数窗口),可以以超级表、子表、普通表为源表,写入到目的超级表。在创建流时,目的超级表将被自动创建,随后新插入的数据会被流定义的方式处理并写入其中,通过 partition by 子句,可以以表名或标签划分 partition,不同的 partition 将写入到目的超级表的不同子表。 TDengine 的流式计算能够支持分布在多个 vnode 中的超级表聚合;还能够处理乱序数据的写入:它提供了 watermark 机制以度量容忍数据乱序的程度,并提供了 ignore expired 配置项以决定乱序数据的处理策略——丢弃或者重新计算。 From cc0b32b1f83ca1ebe895e03cb8b81077450f58a9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 14 Mar 2024 16:13:41 +0800 Subject: [PATCH 24/24] fix(stream): transfer state before do checkpoint, to avoid the retrieve and deadlock by using waiting . --- source/dnode/vnode/src/tq/tq.c | 11 ++++- source/libs/stream/inc/streamInt.h | 2 +- source/libs/stream/src/streamCheckpoint.c | 29 ++++++++---- source/libs/stream/src/streamDispatch.c | 4 +- source/libs/stream/src/streamExec.c | 57 ++++++++++++++--------- source/libs/stream/src/streamTask.c | 4 +- 6 files changed, 70 insertions(+), 37 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index f98a69097b..ffebd783ac 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1205,8 +1205,15 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) streamProcessCheckpointSourceReq(pTask, &req); taosThreadMutexUnlock(&pTask->lock); - qInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", transId:%d", pTask->id.idStr, - vgId, pTask->info.taskLevel, req.checkpointId, req.transId); + if (req.mndTrigger) { + qInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 ", transId:%d, ", pTask->id.idStr, + vgId, pTask->info.taskLevel, req.checkpointId, req.transId); + } else { + const char* pPrevStatus = streamTaskGetStatusStr(streamTaskGetPrevStatus(pTask)); + qInfo("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg chkpt:%" PRId64 + ", transId:%d after transfer-state, prev status:%s", + pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, req.transId, pPrevStatus); + } code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask, 1); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 87f63b48ed..830fbdcfff 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -130,7 +130,7 @@ int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem); void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size); const char* streamQueueItemGetTypeStr(int32_t type); SStreamQueueItem* streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem); -int32_t streamTransferStateToStreamTask(SStreamTask* pTask); +int32_t streamTransferStatePrepare(SStreamTask* pTask); SStreamQueue* streamQueueOpen(int64_t cap); void streamQueueClose(SStreamQueue* pQueue, int32_t taskId); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 607e31bfe6..f58c72eded 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -300,6 +300,8 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) { taosThreadMutexLock(&p->lock); SStreamTaskState* pStatus = streamTaskGetStatus(p); + ETaskStatus prevStatus = pStatus->state; + if (pStatus->state == TASK_STATUS__CK) { ASSERT(pCKInfo->checkpointId <= pCKInfo->checkpointingId && pCKInfo->checkpointingId == checkpointId && pCKInfo->checkpointVer <= pCKInfo->processedVer); @@ -325,8 +327,9 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) { } stDebug("vgId:%d s-task:%s level:%d open upstream inputQ, save status after checkpoint, checkpointId:%" PRId64 - ", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status: normal, prev:%s", - vgId, id, p->info.taskLevel, checkpointId, pCKInfo->checkpointVer, pCKInfo->nextProcessVer, pStatus->name); + ", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status: ready, prev:%s", + vgId, id, p->info.taskLevel, checkpointId, pCKInfo->checkpointVer, pCKInfo->nextProcessVer, + streamTaskGetStatusStr(prevStatus)); // save the task if not sink task if (p->info.taskLevel <= TASK_LEVEL__SINK) { @@ -437,9 +440,11 @@ int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) { if (type == UPLOAD_DISABLE) { return 0; } + if (pTask == NULL || pTask->pBackend == NULL) { return 0; } + SAsyncUploadArg* arg = taosMemoryCalloc(1, sizeof(SAsyncUploadArg)); arg->type = type; arg->taskId = taosStrdup(taskId); @@ -448,16 +453,19 @@ int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) { return streamMetaAsyncExec(pTask->pMeta, doUploadChkp, arg, NULL); } -int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { - int32_t code = TSDB_CODE_SUCCESS; - int64_t startTs = pTask->chkInfo.startTs; - int64_t ckId = pTask->chkInfo.checkpointingId; - const char* id = pTask->id.idStr; - bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT); - // sink task do not need to save the status, and generated the checkpoint +int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { + int32_t code = TSDB_CODE_SUCCESS; + int64_t startTs = pTask->chkInfo.startTs; + int64_t ckId = pTask->chkInfo.checkpointingId; + const char* id = pTask->id.idStr; + bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT); + SStreamMeta* pMeta = pTask->pMeta; + + // sink task does not need to save the status, and generated the checkpoint if (pTask->info.taskLevel != TASK_LEVEL__SINK) { stDebug("s-task:%s level:%d start gen checkpoint, checkpointId:%" PRId64, id, pTask->info.taskLevel, ckId); + code = streamBackendDoCheckpoint(pTask->pBackend, ckId); if (code != TSDB_CODE_SUCCESS) { stError("s-task:%s gen checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(terrno)); @@ -500,10 +508,11 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { SStreamTaskId hTaskId = {.streamId = pTask->hTaskInfo.id.streamId, .taskId = pTask->hTaskInfo.id.taskId}; stDebug("s-task:%s fill-history finish checkpoint done, drop related fill-history task:0x%x", id, hTaskId.taskId); - streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pTask->pMeta->vgId, &hTaskId, 1); + streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &hTaskId, 1); } else { stWarn("s-task:%s related fill-history task:0x%x is erased", id, (int32_t)pTask->hTaskInfo.id.taskId); } + taosThreadMutexUnlock(&pTask->lock); } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 78b914c3db..b76195f214 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1086,10 +1086,10 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i // transtate msg has been sent to downstream successfully. let's transfer the fill-history task state if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__TRANS_STATE) { - stDebug("s-task:%s dispatch transtate msgId:%d to downstream successfully, start to transfer state", id, msgId); + stDebug("s-task:%s dispatch transtate msgId:%d to downstream successfully, start to prepare transfer state", id, msgId); ASSERT(pTask->info.fillHistory == 1); - code = streamTransferStateToStreamTask(pTask); + code = streamTransferStatePrepare(pTask); if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 27cd98aac6..fa1b508e23 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -21,7 +21,7 @@ #define STREAM_RESULT_DUMP_SIZE_THRESHOLD (1048576 * 1) // 1MiB result data #define STREAM_SCAN_HISTORY_TIMESLICE 1000 // 1000 ms -static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask); +static int32_t streamTransferStateDoPrepare(SStreamTask* pTask); bool streamTaskShouldStop(const SStreamTask* pTask) { SStreamTaskState* pState = streamTaskGetStatus(pTask); @@ -316,7 +316,7 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) { } } -int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { +int32_t streamTransferStateDoPrepare(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta; const char* id = pTask->id.idStr; @@ -340,9 +340,9 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { } else { double el = (taosGetTimestampMs() - pTask->execInfo.step2Start) / 1000.; stDebug( - "s-task:%s fill-history task end, scan wal elapsed time:%.2fSec,update related stream task:%s info, transfer " - "exec state", - id, el, pStreamTask->id.idStr); + "s-task:%s fill-history task end, status:%s, scan wal elapsed time:%.2fSec, update related stream task:%s " + "info, prepare transfer exec state", + id, streamTaskGetStatus(pTask)->name, el, pStreamTask->id.idStr); } ETaskStatus status = streamTaskGetStatus(pStreamTask)->state; @@ -365,9 +365,6 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { } } - // wait for the stream task to handle all in the inputQ, and to be idle - waitForTaskIdle(pTask, pStreamTask); - // In case of sink tasks, no need to halt them. // In case of source tasks and agg tasks, we should HALT them, and wait for them to be idle. And then, it's safe to // start the task state transfer procedure. @@ -393,17 +390,14 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { stDebug("s-task:%s no need to update/reset filter time window for non-source tasks", pStreamTask->id.idStr); } - // 2. transfer the ownership of executor state - streamTaskReleaseState(pTask); - streamTaskReloadState(pStreamTask); - - // 3. send msg to mnode to launch a checkpoint to keep the state for current stream + // NOTE: transfer the ownership of executor state before handle the checkpoint block during stream exec + // 2. send msg to mnode to launch a checkpoint to keep the state for current stream streamTaskSendCheckpointReq(pStreamTask); - // 4. assign the status to the value that will be kept in disk + // 3. assign the status to the value that will be kept in disk pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask)->state; - // 5. open the inputQ for all upstream tasks + // 4. open the inputQ for all upstream tasks streamTaskOpenAllUpstreamInput(pStreamTask); streamMetaReleaseTask(pMeta, pStreamTask); @@ -416,7 +410,7 @@ static int32_t haltCallback(SStreamTask* pTask, void* param) { return TSDB_CODE_SUCCESS; } -int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { +int32_t streamTransferStatePrepare(SStreamTask* pTask) { int32_t code = TSDB_CODE_SUCCESS; SStreamMeta* pMeta = pTask->pMeta; @@ -424,7 +418,7 @@ int32_t streamTransferStateToStreamTask(SStreamTask* pTask) { int32_t level = pTask->info.taskLevel; if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) { // do transfer task operator states. - code = streamDoTransferStateToStreamTask(pTask); + code = streamTransferStateDoPrepare(pTask); } else { // no state transfer for sink tasks, and drop fill-history task, followed by opening inputQ of sink task. SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId); @@ -540,7 +534,7 @@ int32_t streamProcessTransstateBlock(SStreamTask* pTask, SStreamDataBlock* pBloc stDebug("s-task:%s non-dispatch task, level:%d start to transfer state directly", id, level); ASSERT(pTask->info.fillHistory == 1); - code = streamTransferStateToStreamTask(pTask); + code = streamTransferStatePrepare(pTask); if (code != TSDB_CODE_SUCCESS) { /*int8_t status = */ streamTaskSetSchedStatusInactive(pTask); } @@ -621,10 +615,31 @@ int32_t doStreamExecTask(SStreamTask* pTask) { } } - int64_t st = taosGetTimestampMs(); + if (type == STREAM_INPUT__CHECKPOINT) { + // transfer the state from fill-history to related stream task before generating the checkpoint. + bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT); + if (dropRelHTask) { + ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask)); - const SStreamQueueItem* pItem = pInput; - stDebug("s-task:%s start to process batch of blocks, num:%d, type:%d", id, numOfBlocks, pItem->type); + STaskId* pHTaskId = &pTask->hTaskInfo.id; + SStreamTask* pHTask = streamMetaAcquireTask(pTask->pMeta, pHTaskId->streamId, pHTaskId->taskId); + if (pHTask != NULL) { + // 2. transfer the ownership of executor state + streamTaskReleaseState(pHTask); + streamTaskReloadState(pTask); + stDebug("s-task:%s transfer state from fill-history task:%s, status:%s completed", id, pHTask->id.idStr, + streamTaskGetStatus(pHTask)->name); + + streamMetaReleaseTask(pTask->pMeta, pHTask); + } else { + stError("s-task:%s related fill-history task:0x%x failed to acquire, transfer state failed", id, + (int32_t)pHTaskId->taskId); + } + } + } + + int64_t st = taosGetTimestampMs(); + stDebug("s-task:%s start to process batch of blocks, num:%d, type:%s", id, numOfBlocks, streamQueueItemGetTypeStr(type)); int64_t ver = pTask->chkInfo.processedVer; doSetStreamInputBlock(pTask, pInput, &ver, id); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 959d1382a3..af11cc9ddb 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -791,8 +791,10 @@ int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt, bool CLEAR_RELATED_FILLHISTORY_TASK((*ppStreamTask)); if (resetRelHalt) { + stDebug("s-task:0x%" PRIx64 " set the persistent status attr to be ready, prev:%s, status in sm:%s", + sTaskId.taskId, streamTaskGetStatusStr((*ppStreamTask)->status.taskStatus), + streamTaskGetStatus(*ppStreamTask)->name); (*ppStreamTask)->status.taskStatus = TASK_STATUS__READY; - stDebug("s-task:0x%" PRIx64 " set the status to be ready", sTaskId.taskId); } streamMetaSaveTask(pMeta, *ppStreamTask);