From 29ba8ffb2d0da91cb1632cff4f7cdebc91d4404a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 5 Oct 2024 16:12:59 +0800 Subject: [PATCH 1/6] refactor: init the local variable value. --- source/libs/stream/src/streamTask.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 71a2ed3e4a..53b262dff5 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -884,7 +884,7 @@ int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskI } int32_t streamSendChkptReportMsg(SStreamTask* pTask, SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask) { - int32_t code; + int32_t code = 0; int32_t tlen = 0; int32_t vgId = pTask->pMeta->vgId; const char* id = pTask->id.idStr; From 6170bb5c1cd550536c86c2bf3279dcb78a10ef58 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 9 Oct 2024 09:39:32 +0800 Subject: [PATCH 2/6] enh(stream): check the existence for snode. --- source/dnode/mnode/impl/inc/mndStream.h | 1 + source/dnode/mnode/impl/src/mndStream.c | 19 ++++++++++++---- source/dnode/mnode/impl/src/mndStreamUtil.c | 24 +++++++++++++++++++++ source/dnode/vnode/src/vnd/vnodeSvr.c | 2 +- source/libs/stream/src/streamTimer.c | 2 +- 5 files changed, 42 insertions(+), 6 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index b97eaf31d1..ebc2e50065 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -133,6 +133,7 @@ int32_t mndStreamSetUpdateEpsetAction(SMnode *pMnode, SStreamObj *pStream, SVgr int32_t mndGetStreamObj(SMnode *pMnode, int64_t streamId, SStreamObj** pStream); bool mndStreamNodeIsUpdated(SMnode *pMnode); +bool mndCheckForSnode(SMnode *pMnode, SDbObj *pSrcDb); int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId); int32_t mndProcessStreamHb(SRpcMsg *pReq); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 3ec99f6e44..d486570300 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -801,12 +801,23 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { } if (createReq.sql != NULL) { - sqlLen = strlen(createReq.sql); - sql = taosMemoryMalloc(sqlLen + 1); + sql = taosStrdup(createReq.sql); TSDB_CHECK_NULL(sql, code, lino, _OVER, terrno); + } - memset(sql, 0, sqlLen + 1); - memcpy(sql, createReq.sql, sqlLen); + SDbObj *pSourceDb = mndAcquireDb(pMnode, createReq.sourceDB); + if (pSourceDb == NULL) { + code = terrno; + mInfo("stream:%s failed to create, acquire source db %s failed, code:%s", createReq.name, createReq.sourceDB, + tstrerror(code)); + goto _OVER; + } + + bool snodeCheckSucc = mndCheckForSnode(pMnode, pSourceDb); + mndReleaseDb(pMnode, pSourceDb); + if (!snodeCheckSucc) { + code = TSDB_CODE_SNODE_NOT_DEPLOYED; + goto _OVER; } // build stream obj from request diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 6e48c58b30..206422b795 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -1497,6 +1497,30 @@ bool mndStreamNodeIsUpdated(SMnode *pMnode) { return updated; } +bool mndCheckForSnode(SMnode *pMnode, SDbObj *pSrcDb) { + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + SSnodeObj *pObj = NULL; + + if (pSrcDb->cfg.replications == 1) { + return true; + } else { + while (1) { + pIter = sdbFetch(pSdb, SDB_SNODE, pIter, (void **)&pObj); + if (pIter == NULL) { + break; + } + + sdbRelease(pSdb, pObj); + sdbCancelFetch(pSdb, pIter); + return true; + } + + mError("snode not existed when trying to create stream in db with multiple replica"); + return false; + } +} + uint32_t seed = 0; static SRpcMsg createRpcMsg(STransAction* pAction, int64_t traceId, int64_t signature) { SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen, .info.ahandle = (void *)signature}; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 9dbf16cb48..ab8eede0dd 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -629,7 +629,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg } break; case TDMT_STREAM_TASK_DEPLOY: { - int32_t code = tqProcessTaskDeployReq(pVnode->pTq, ver, pReq, len); + code = tqProcessTaskDeployReq(pVnode->pTq, ver, pReq, len); if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _err; diff --git a/source/libs/stream/src/streamTimer.c b/source/libs/stream/src/streamTimer.c index 8b77fe7cb1..0da9acfd1d 100644 --- a/source/libs/stream/src/streamTimer.c +++ b/source/libs/stream/src/streamTimer.c @@ -56,7 +56,7 @@ void streamTmrStart(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* pParam, void* } } - stDebug("vgId:%d start %s tmr succ", vgId, pMsg); + stTrace("vgId:%d start %s tmr succ", vgId, pMsg); } void streamTmrStop(tmr_h tmrId) { From 39495ec935a617aad3d7358dc6dc51cf41abe49a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 9 Oct 2024 10:07:08 +0800 Subject: [PATCH 3/6] refactor: do some internal refactor. --- source/dnode/mnode/impl/inc/mndStream.h | 2 +- source/dnode/mnode/impl/src/mndStream.c | 5 ++--- source/dnode/mnode/impl/src/mndStreamUtil.c | 8 ++++---- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index ebc2e50065..c9155f536c 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -133,7 +133,7 @@ int32_t mndStreamSetUpdateEpsetAction(SMnode *pMnode, SStreamObj *pStream, SVgr int32_t mndGetStreamObj(SMnode *pMnode, int64_t streamId, SStreamObj** pStream); bool mndStreamNodeIsUpdated(SMnode *pMnode); -bool mndCheckForSnode(SMnode *pMnode, SDbObj *pSrcDb); +int32_t mndCheckForSnode(SMnode *pMnode, SDbObj *pSrcDb); int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId); int32_t mndProcessStreamHb(SRpcMsg *pReq); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index d486570300..9c22543265 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -813,10 +813,9 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { goto _OVER; } - bool snodeCheckSucc = mndCheckForSnode(pMnode, pSourceDb); + code = mndCheckForSnode(pMnode, pSourceDb); mndReleaseDb(pMnode, pSourceDb); - if (!snodeCheckSucc) { - code = TSDB_CODE_SNODE_NOT_DEPLOYED; + if (code != 0) { goto _OVER; } diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 206422b795..423d9df4b6 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -1497,13 +1497,13 @@ bool mndStreamNodeIsUpdated(SMnode *pMnode) { return updated; } -bool mndCheckForSnode(SMnode *pMnode, SDbObj *pSrcDb) { +int32_t mndCheckForSnode(SMnode *pMnode, SDbObj *pSrcDb) { SSdb *pSdb = pMnode->pSdb; void *pIter = NULL; SSnodeObj *pObj = NULL; if (pSrcDb->cfg.replications == 1) { - return true; + return TSDB_CODE_SUCCESS; } else { while (1) { pIter = sdbFetch(pSdb, SDB_SNODE, pIter, (void **)&pObj); @@ -1513,11 +1513,11 @@ bool mndCheckForSnode(SMnode *pMnode, SDbObj *pSrcDb) { sdbRelease(pSdb, pObj); sdbCancelFetch(pSdb, pIter); - return true; + return TSDB_CODE_SUCCESS; } mError("snode not existed when trying to create stream in db with multiple replica"); - return false; + return TSDB_CODE_SNODE_NOT_DEPLOYED; } } From a5501f30461b19be30f15a47c3fa82f8f6f3cec9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 29 Oct 2024 10:07:08 +0800 Subject: [PATCH 4/6] test: add test cases. --- tests/script/tsim/stream/snodeCheck.sim | 64 +++++++++++++++++++++++++ 1 file changed, 64 insertions(+) create mode 100644 tests/script/tsim/stream/snodeCheck.sim diff --git a/tests/script/tsim/stream/snodeCheck.sim b/tests/script/tsim/stream/snodeCheck.sim new file mode 100644 index 0000000000..f4ab8c8124 --- /dev/null +++ b/tests/script/tsim/stream/snodeCheck.sim @@ -0,0 +1,64 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/deploy.sh -n dnode2 -i 2 +system sh/deploy.sh -n dnode3 -i 3 + +system sh/cfg.sh -n dnode1 -c supportVnodes -v 4 +system sh/cfg.sh -n dnode2 -c supportVnodes -v 4 +system sh/cfg.sh -n dnode3 -c supportVnodes -v 4 + +print ========== step1 +system sh/exec.sh -n dnode1 -s start +sql connect + +print ========== step2 +sql create dnode $hostname port 7200 +system sh/exec.sh -n dnode2 -s start + +sql create dnode $hostname port 7300 +system sh/exec.sh -n dnode3 -s start + +$x = 0 +step2: + $x = $x + 1 + sleep 1000 + if $x == 10 then + print ====> dnode not ready! + return -1 + endi +sql select * from information_schema.ins_dnodes +print ===> $data00 $data01 $data02 $data03 $data04 $data05 +print ===> $data10 $data11 $data12 $data13 $data14 $data15 +if $rows != 3 then + return -1 +endi +if $data(1)[4] != ready then + goto step2 +endi +if $data(2)[4] != ready then + goto step2 +endi + +print ========== step3 +sql drop database if exists test; +sql create database if not exists test vgroups 4 replica 3 precision "ms" ; +sql use test; + +sql create table test.test (ts timestamp, c1 int) tags (t1 int) ; + +print create stream without snode existing +sql_error create stream stream_t1 trigger at_once into str_dst as select count(*) from test interval(20s); + +print create snode +sql create snode on dnode 1; + +sql create stream stream_t1 trigger at_once into str_dst as select count(*) from test interval(20s); + +print drop snode and then create stream +sql drop snode on dnode 1; + +sql_error create stream stream_t2 trigger at_once into str_dst as select count(*) from test interval(20s); + +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode2 -s stop -x SIGINT +system sh/exec.sh -n dnode3 -s stop -x SIGINT From 18c1b76cb4a20205f21dfe3af6464853729135b8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 29 Oct 2024 10:09:07 +0800 Subject: [PATCH 5/6] test: add into test suite --- tests/parallel_test/cases.task | 1 + tests/script/tsim/testsuit.sim | 1 + 2 files changed, 2 insertions(+) diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 09216add82..2cf0ec7dc3 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -1317,6 +1317,7 @@ ,,y,script,./test.sh -f tsim/stream/basic2.sim ,,y,script,./test.sh -f tsim/stream/basic3.sim ,,y,script,./test.sh -f tsim/stream/basic4.sim +,,y,script,./test.sh -f tsim/stream/snodeCheck.sim ,,y,script,./test.sh -f tsim/stream/checkpointInterval0.sim ,,y,script,./test.sh -f tsim/stream/checkStreamSTable1.sim ,,y,script,./test.sh -f tsim/stream/checkStreamSTable.sim diff --git a/tests/script/tsim/testsuit.sim b/tests/script/tsim/testsuit.sim index c208a07488..ff3bb82aaf 100644 --- a/tests/script/tsim/testsuit.sim +++ b/tests/script/tsim/testsuit.sim @@ -102,6 +102,7 @@ run tsim/stream/triggerInterval0.sim run tsim/stream/triggerSession0.sim run tsim/stream/distributeIntervalRetrive0.sim run tsim/stream/basic0.sim +run tsim/stream/snodeCheck.sim run tsim/stream/session0.sim run tsim/stream/schedSnode.sim run tsim/stream/partitionby.sim From d70db52bb953ae2bf83c72af9ccf708d5e62e031 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 31 Oct 2024 09:29:05 +0800 Subject: [PATCH 6/6] Update 14-stream.md --- docs/zh/14-reference/03-taos-sql/14-stream.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/zh/14-reference/03-taos-sql/14-stream.md b/docs/zh/14-reference/03-taos-sql/14-stream.md index cd5c76a4ad..9567381f7d 100644 --- a/docs/zh/14-reference/03-taos-sql/14-stream.md +++ b/docs/zh/14-reference/03-taos-sql/14-stream.md @@ -291,3 +291,4 @@ RESUME STREAM [IF EXISTS] [IGNORE UNTREATED] stream_name; CREATE SNODE ON DNODE [id] ``` 其中的 id 是集群中的 dnode 的序号。请注意选择的dnode,流计算的中间状态将自动在其上进行备份。 +从 3.3.4.0 版本开始,在多副本环境中创建流会进行 snode 的**存在性检查**,要求首先创建 snode。如果 snode 不存在,无法创建流。