diff --git a/docs/zh/14-reference/03-taos-sql/14-stream.md b/docs/zh/14-reference/03-taos-sql/14-stream.md index 6baad76e23..dc0f404fcf 100644 --- a/docs/zh/14-reference/03-taos-sql/14-stream.md +++ b/docs/zh/14-reference/03-taos-sql/14-stream.md @@ -291,3 +291,4 @@ RESUME STREAM [IF EXISTS] [IGNORE UNTREATED] stream_name; CREATE SNODE ON DNODE [id] ``` 其中的 id 是集群中的 dnode 的序号。请注意选择的dnode,流计算的中间状态将自动在其上进行备份。 +从 3.3.4.0 版本开始,在多副本环境中创建流会进行 snode 的**存在性检查**,要求首先创建 snode。如果 snode 不存在,无法创建流。 diff --git a/source/dnode/mnode/impl/inc/mndStream.h b/source/dnode/mnode/impl/inc/mndStream.h index b97eaf31d1..c9155f536c 100644 --- a/source/dnode/mnode/impl/inc/mndStream.h +++ b/source/dnode/mnode/impl/inc/mndStream.h @@ -133,6 +133,7 @@ int32_t mndStreamSetUpdateEpsetAction(SMnode *pMnode, SStreamObj *pStream, SVgr int32_t mndGetStreamObj(SMnode *pMnode, int64_t streamId, SStreamObj** pStream); bool mndStreamNodeIsUpdated(SMnode *pMnode); +int32_t mndCheckForSnode(SMnode *pMnode, SDbObj *pSrcDb); int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId); int32_t mndProcessStreamHb(SRpcMsg *pReq); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index a4327b777f..e3f9adf033 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -795,12 +795,22 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { } if (createReq.sql != NULL) { - sqlLen = strlen(createReq.sql); - sql = taosMemoryMalloc(sqlLen + 1); + sql = taosStrdup(createReq.sql); TSDB_CHECK_NULL(sql, code, lino, _OVER, terrno); + } - memset(sql, 0, sqlLen + 1); - memcpy(sql, createReq.sql, sqlLen); + SDbObj *pSourceDb = mndAcquireDb(pMnode, createReq.sourceDB); + if (pSourceDb == NULL) { + code = terrno; + mInfo("stream:%s failed to create, acquire source db %s failed, code:%s", createReq.name, createReq.sourceDB, + tstrerror(code)); + goto _OVER; + } + + code = mndCheckForSnode(pMnode, pSourceDb); + mndReleaseDb(pMnode, pSourceDb); + if (code != 0) { + goto _OVER; } // build stream obj from request diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 6e48c58b30..423d9df4b6 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -1497,6 +1497,30 @@ bool mndStreamNodeIsUpdated(SMnode *pMnode) { return updated; } +int32_t mndCheckForSnode(SMnode *pMnode, SDbObj *pSrcDb) { + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + SSnodeObj *pObj = NULL; + + if (pSrcDb->cfg.replications == 1) { + return TSDB_CODE_SUCCESS; + } else { + while (1) { + pIter = sdbFetch(pSdb, SDB_SNODE, pIter, (void **)&pObj); + if (pIter == NULL) { + break; + } + + sdbRelease(pSdb, pObj); + sdbCancelFetch(pSdb, pIter); + return TSDB_CODE_SUCCESS; + } + + mError("snode not existed when trying to create stream in db with multiple replica"); + return TSDB_CODE_SNODE_NOT_DEPLOYED; + } +} + uint32_t seed = 0; static SRpcMsg createRpcMsg(STransAction* pAction, int64_t traceId, int64_t signature) { SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen, .info.ahandle = (void *)signature}; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index b359cdfc81..f88cfb9f59 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -887,7 +887,7 @@ int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskI } int32_t streamSendChkptReportMsg(SStreamTask* pTask, SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask) { - int32_t code; + int32_t code = 0; int32_t tlen = 0; int32_t vgId = pTask->pMeta->vgId; const char* id = pTask->id.idStr; diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 817d9f049a..aeaa2567dd 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -1317,6 +1317,7 @@ ,,y,script,./test.sh -f tsim/stream/basic2.sim ,,y,script,./test.sh -f tsim/stream/basic3.sim ,,y,script,./test.sh -f tsim/stream/basic4.sim +,,y,script,./test.sh -f tsim/stream/snodeCheck.sim ,,y,script,./test.sh -f tsim/stream/checkpointInterval0.sim ,,y,script,./test.sh -f tsim/stream/checkStreamSTable1.sim ,,y,script,./test.sh -f tsim/stream/checkStreamSTable.sim diff --git a/tests/script/tsim/stream/snodeCheck.sim b/tests/script/tsim/stream/snodeCheck.sim new file mode 100644 index 0000000000..f4ab8c8124 --- /dev/null +++ b/tests/script/tsim/stream/snodeCheck.sim @@ -0,0 +1,64 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/deploy.sh -n dnode2 -i 2 +system sh/deploy.sh -n dnode3 -i 3 + +system sh/cfg.sh -n dnode1 -c supportVnodes -v 4 +system sh/cfg.sh -n dnode2 -c supportVnodes -v 4 +system sh/cfg.sh -n dnode3 -c supportVnodes -v 4 + +print ========== step1 +system sh/exec.sh -n dnode1 -s start +sql connect + +print ========== step2 +sql create dnode $hostname port 7200 +system sh/exec.sh -n dnode2 -s start + +sql create dnode $hostname port 7300 +system sh/exec.sh -n dnode3 -s start + +$x = 0 +step2: + $x = $x + 1 + sleep 1000 + if $x == 10 then + print ====> dnode not ready! + return -1 + endi +sql select * from information_schema.ins_dnodes +print ===> $data00 $data01 $data02 $data03 $data04 $data05 +print ===> $data10 $data11 $data12 $data13 $data14 $data15 +if $rows != 3 then + return -1 +endi +if $data(1)[4] != ready then + goto step2 +endi +if $data(2)[4] != ready then + goto step2 +endi + +print ========== step3 +sql drop database if exists test; +sql create database if not exists test vgroups 4 replica 3 precision "ms" ; +sql use test; + +sql create table test.test (ts timestamp, c1 int) tags (t1 int) ; + +print create stream without snode existing +sql_error create stream stream_t1 trigger at_once into str_dst as select count(*) from test interval(20s); + +print create snode +sql create snode on dnode 1; + +sql create stream stream_t1 trigger at_once into str_dst as select count(*) from test interval(20s); + +print drop snode and then create stream +sql drop snode on dnode 1; + +sql_error create stream stream_t2 trigger at_once into str_dst as select count(*) from test interval(20s); + +system sh/exec.sh -n dnode1 -s stop -x SIGINT +system sh/exec.sh -n dnode2 -s stop -x SIGINT +system sh/exec.sh -n dnode3 -s stop -x SIGINT diff --git a/tests/script/tsim/testsuit.sim b/tests/script/tsim/testsuit.sim index c208a07488..ff3bb82aaf 100644 --- a/tests/script/tsim/testsuit.sim +++ b/tests/script/tsim/testsuit.sim @@ -102,6 +102,7 @@ run tsim/stream/triggerInterval0.sim run tsim/stream/triggerSession0.sim run tsim/stream/distributeIntervalRetrive0.sim run tsim/stream/basic0.sim +run tsim/stream/snodeCheck.sim run tsim/stream/session0.sim run tsim/stream/schedSnode.sim run tsim/stream/partitionby.sim