From 8df1f09561420ffbb0d98a1ba5248d60d3081783 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Fri, 1 Mar 2024 11:27:33 +0800 Subject: [PATCH 1/9] optimze plan and add ci --- source/libs/parser/src/parTranslater.c | 42 +++++++++++------------- tests/script/tsim/query/query_count1.sim | 22 +++++++++++++ 2 files changed, 42 insertions(+), 22 deletions(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index cc522c9b5f..3a622eeea6 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -4009,6 +4009,26 @@ static int32_t translateEventWindow(STranslateContext* pCxt, SSelectStmt* pSelec } static int32_t translateCountWindow(STranslateContext* pCxt, SSelectStmt* pSelect) { + SCountWindowNode* pCountWin = (SCountWindowNode*)pSelect->pWindow; + if (pCountWin->windowCount <= 1) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, + "Size of Count window must exceed 1."); + } + + if (pCountWin->windowSliding <= 0) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, + "Size of Count window must exceed 0."); + } + + if (pCountWin->windowSliding > pCountWin->windowCount) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, + "sliding value no larger than the count value."); + } + + if (pCountWin->windowCount > INT32_MAX) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, + "Size of Count window must less than 2147483647(INT32_MAX)."); + } if (QUERY_NODE_TEMP_TABLE == nodeType(pSelect->pFromTable) && !isGlobalTimeLineQuery(((STempTableNode*)pSelect->pFromTable)->pSubquery)) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TIMELINE_QUERY, @@ -7828,29 +7848,7 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm if (pStmt->pOptions->ignoreExpired != 1) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Ignore expired data of Count window must be 1."); - } - - SCountWindowNode* pCountWin = (SCountWindowNode*)pSelect->pWindow; - if (pCountWin->windowCount <= 1) { - return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, - "Size of Count window must exceed 1."); } - - if (pCountWin->windowSliding <= 0) { - return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, - "Size of Count window must exceed 0."); - } - - if (pCountWin->windowSliding > pCountWin->windowCount) { - return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, - "sliding value no larger than the count value."); - } - - if (pCountWin->windowCount > INT32_MAX) { - return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, - "Size of Count window must less than 2147483647(INT32_MAX)."); - } - } return TSDB_CODE_SUCCESS; diff --git a/tests/script/tsim/query/query_count1.sim b/tests/script/tsim/query/query_count1.sim index 0694ab062a..043b604263 100644 --- a/tests/script/tsim/query/query_count1.sim +++ b/tests/script/tsim/query/query_count1.sim @@ -79,5 +79,27 @@ if $data22 != 4 then goto loop3 endi + +print step2 +print =============== create database +sql create database test1 vgroups 1; +sql use test1; + +sql create stable st(ts timestamp,a int,b int,c int) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +#2~INT32_MAX +sql_error select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(-1); +sql_error select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(0); +sql_error select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(1); +sql_error select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(2147483648); +sql_error select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(10, 0); +sql_error select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(10, -1); +sql_error select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(10, 11); + +sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(2); +sql select _wstart as s, count(*) c1, sum(b), max(c) from t1 count_window(2147483647); + print query_count0 end system sh/exec.sh -n dnode1 -s stop -x SIGINT From dfae8a9471e3bd3b9ed5697ea070759ae6488d89 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Fri, 1 Mar 2024 11:31:46 +0800 Subject: [PATCH 2/9] optimze plan and add ci --- source/libs/planner/src/planSpliter.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index d9a7475f59..9664be1da2 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -274,7 +274,7 @@ static bool stbSplNeedSplitWindow(bool streamQuery, SLogicNode* pNode) { } } - if (WINDOW_TYPE_STATE == pWindow->winType) { + if (WINDOW_TYPE_STATE == pWindow->winType || WINDOW_TYPE_COUNT == pWindow->winType) { if (!streamQuery) { return stbSplHasMultiTbScan(streamQuery, pNode); } else { From 5554362399a92249053b21b72efb8cace06945de Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 3 Mar 2024 16:41:16 +0800 Subject: [PATCH 3/9] fix(stream):remove unused def. --- include/libs/stream/tstream.h | 1 - 1 file changed, 1 deletion(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 64ce735843..cf980400d6 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -56,7 +56,6 @@ extern "C" { #define STREAM_EXEC_T_RESTART_ALL_TASKS (-4) #define STREAM_EXEC_T_STOP_ALL_TASKS (-5) #define STREAM_EXEC_T_RESUME_TASK (-6) -#define STREAM_EXEC_T_UPDATE_TASK_EPSET (-7) typedef struct SStreamTask SStreamTask; typedef struct SStreamQueue SStreamQueue; From 550d0fe5aef660aa50eab83d833e62e715a969b0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 3 Mar 2024 16:46:13 +0800 Subject: [PATCH 4/9] fix(stream): add an assert. --- source/libs/stream/src/streamDispatch.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 78b914c3db..9702096b60 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -740,6 +740,8 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) { int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) { int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); + ASSERT(dataStrLen > 0); + void* buf = taosMemoryCalloc(1, dataStrLen); if (buf == NULL) return -1; From 0a814e97b554eae97a2e7b39f9ab3cbc937dece2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 4 Mar 2024 09:26:54 +0800 Subject: [PATCH 5/9] enh(stream): reduce the threads requirements. --- source/common/src/tglobal.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index c5a26c5c10..55df80ca44 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -58,7 +58,7 @@ int32_t tsNumOfMnodeQueryThreads = 4; int32_t tsNumOfMnodeFetchThreads = 1; int32_t tsNumOfMnodeReadThreads = 1; int32_t tsNumOfVnodeQueryThreads = 4; -float tsRatioOfVnodeStreamThreads = 1.5F; +float tsRatioOfVnodeStreamThreads = 0.5F; int32_t tsNumOfVnodeFetchThreads = 4; int32_t tsNumOfVnodeRsmaThreads = 2; int32_t tsNumOfQnodeQueryThreads = 4; From 80fd3e044546a16221816829e768ceaf095671cb Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 4 Mar 2024 19:31:47 +0800 Subject: [PATCH 6/9] fix(stream): remove related fill-history if task in stop status. --- include/libs/stream/tstream.h | 2 +- source/dnode/vnode/src/tqCommon/tqCommon.c | 36 +++++++++++++++------- source/libs/stream/src/streamTask.c | 14 +-------- 3 files changed, 27 insertions(+), 25 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index cf980400d6..0df6b76cf7 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -782,7 +782,7 @@ bool streamTaskIsAllUpstreamClosed(SStreamTask* pTask); bool streamTaskSetSchedStatusWait(SStreamTask* pTask); int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask); int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask); -int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t clearRelHalt, bool metaLock); +int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t clearRelHalt); int32_t streamTaskHandleEvent(SStreamTaskSM* pSM, EStreamTaskEvent event); int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index a2d45062b9..8c66ecc6a9 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -142,8 +142,10 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id)); if (ppHTask == NULL || *ppHTask == NULL) { - tqError("vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already", - vgId, req.taskId); + tqError( + "vgId:%d failed to acquire fill-history task:0x%x when handling update, may have been dropped already, rel " + "stream task:0x%x", + vgId, (uint32_t)pTask->hTaskInfo.id.taskId, req.taskId); CLEAR_RELATED_FILLHISTORY_TASK(pTask); } else { tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr); @@ -612,23 +614,35 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) { SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; + int32_t vgId = pMeta->vgId; + STaskId hTaskId = {0}; - int32_t vgId = pMeta->vgId; tqDebug("vgId:%d receive msg to drop s-task:0x%x", vgId, pReq->taskId); - SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId); - if (pTask != NULL) { - // drop the related fill-history task firstly + streamMetaWLock(pMeta); + + STaskId id = {.streamId = pReq->streamId, .taskId = pReq->taskId}; + SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); + if ((ppTask != NULL) && ((*ppTask) != NULL)) { + streamMetaAcquireOneTask(*ppTask); + SStreamTask* pTask = *ppTask; + if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { - STaskId* pHTaskId = &pTask->hTaskInfo.id; - streamMetaUnregisterTask(pMeta, pHTaskId->streamId, pHTaskId->taskId); - tqDebug("s-task:0x%x vgId:%d drop fill-history task:0x%x firstly", pReq->taskId, vgId, - (int32_t)pHTaskId->taskId); + hTaskId.streamId = pTask->hTaskInfo.id.streamId; + hTaskId.taskId = pTask->hTaskInfo.id.taskId; + streamTaskClearHTaskAttr(pTask, pReq->resetRelHalt); } + streamMetaReleaseTask(pMeta, pTask); } - streamTaskClearHTaskAttr(pTask, pReq->resetRelHalt, true); + streamMetaWUnLock(pMeta); + + // drop the related fill-history task firstly + if (hTaskId.taskId != 0 && hTaskId.streamId != 0) { + streamMetaUnregisterTask(pMeta, hTaskId.streamId, hTaskId.taskId); + tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x firstly", pReq->taskId, vgId, (int32_t)hTaskId.taskId); + } // drop the stream task now streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 9639921c77..68b4ba2296 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -766,21 +766,13 @@ int8_t streamTaskSetSchedStatusInactive(SStreamTask* pTask) { return status; } -int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt, bool metaLock) { - if (pTask == NULL) { - return TSDB_CODE_SUCCESS; - } - +int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt) { SStreamMeta* pMeta = pTask->pMeta; STaskId sTaskId = {.streamId = pTask->streamTaskId.streamId, .taskId = pTask->streamTaskId.taskId}; if (pTask->info.fillHistory == 0) { return TSDB_CODE_SUCCESS; } - if (metaLock) { - streamMetaWLock(pMeta); - } - SStreamTask** ppStreamTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &sTaskId, sizeof(sTaskId)); if (ppStreamTask != NULL) { stDebug("s-task:%s clear the related stream task:0x%x attr to fill-history task", pTask->id.idStr, @@ -798,10 +790,6 @@ int32_t streamTaskClearHTaskAttr(SStreamTask* pTask, int32_t resetRelHalt, bool taosThreadMutexUnlock(&(*ppStreamTask)->lock); } - if (metaLock) { - streamMetaWUnLock(pMeta); - } - return TSDB_CODE_SUCCESS; } From ad1780dbdc8211a7fec7414ffecb904c6aa52a89 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 4 Mar 2024 19:32:22 +0800 Subject: [PATCH 7/9] fix(stream): reset the pData after transferring state. --- source/libs/stream/inc/streamInt.h | 1 + source/libs/stream/src/streamTask.c | 4 +--- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 87f63b48ed..d0055d5400 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -99,6 +99,7 @@ void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration); int32_t streamDispatchStreamBlock(SStreamTask* pTask); void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups); int32_t getNumOfDispatchBranch(SStreamTask* pTask); +void clearBufferedDispatchMsg(SStreamTask* pTask); int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBlock); SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 68b4ba2296..8be5b94096 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -412,9 +412,7 @@ void tFreeStreamTask(SStreamTask* pTask) { pTask->pReadyMsgList = taosArrayDestroy(pTask->pReadyMsgList); if (pTask->msgInfo.pData != NULL) { - destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask)); - pTask->msgInfo.pData = NULL; - pTask->msgInfo.dispatchMsgType = 0; + clearBufferedDispatchMsg(pTask); } if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { From a38161299b2f64db97459d8f38e572e4624fc4ad Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 4 Mar 2024 19:51:01 +0800 Subject: [PATCH 8/9] fix(stream): add clear msgData buf impl. --- source/libs/stream/src/streamDispatch.c | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 9702096b60..dc790b5b2d 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -315,6 +315,16 @@ int32_t getNumOfDispatchBranch(SStreamTask* pTask) { : taosArrayGetSize(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos); } +void clearBufferedDispatchMsg(SStreamTask* pTask) { + SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo; + if (pMsgInfo->pData != NULL) { + destroyDispatchMsg(pMsgInfo->pData, getNumOfDispatchBranch(pTask)); + } + + pMsgInfo->pData = NULL; + pMsgInfo->dispatchMsgType = 0; +} + static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pData) { int32_t code = 0; int32_t numOfBlocks = taosArrayGetSize(pData->blocks); @@ -678,8 +688,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { // todo deal with only partially success dispatch case atomic_store_32(&pTask->outputInfo.shuffleDispatcher.waitingRspCnt, 0); if (terrno == TSDB_CODE_APP_IS_STOPPING) { // in case of this error, do not retry anymore - destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask)); - pTask->msgInfo.pData = NULL; + clearBufferedDispatchMsg(pTask); return code; } @@ -938,15 +947,12 @@ void streamClearChkptReadyMsg(SStreamTask* pTask) { // this message has been sent successfully, let's try next one. static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) { stDebug("s-task:%s destroy dispatch msg:%p", pTask->id.idStr, pTask->msgInfo.pData); - destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask)); - bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER); if (delayDispatch) { pTask->chkInfo.dispatchCheckpointTrigger = true; } - pTask->msgInfo.pData = NULL; - pTask->msgInfo.dispatchMsgType = 0; + clearBufferedDispatchMsg(pTask); int64_t el = taosGetTimestampMs() - pTask->msgInfo.startTs; @@ -1086,7 +1092,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i } else { // this message has been sent successfully, let's try next one. pTask->msgInfo.retryCount = 0; - // transtate msg has been sent to downstream successfully. let's transfer the fill-history task state + // trans-state msg has been sent to downstream successfully. let's transfer the fill-history task state if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__TRANS_STATE) { stDebug("s-task:%s dispatch transtate msgId:%d to downstream successfully, start to transfer state", id, msgId); ASSERT(pTask->info.fillHistory == 1); @@ -1095,6 +1101,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens } + clearBufferedDispatchMsg(pTask); + // now ready for next data output atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL); } else { From 2ac803bf3c5975185f25be9fb62350dc945ce935 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 4 Mar 2024 22:16:41 +0800 Subject: [PATCH 9/9] fix(stream): clear and set task status. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 8c66ecc6a9..6a5bd444ef 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -630,9 +630,9 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { hTaskId.streamId = pTask->hTaskInfo.id.streamId; hTaskId.taskId = pTask->hTaskInfo.id.taskId; - streamTaskClearHTaskAttr(pTask, pReq->resetRelHalt); } + streamTaskClearHTaskAttr(pTask, pReq->resetRelHalt); streamMetaReleaseTask(pMeta, pTask); }