Merge pull request #28253 from taosdata/fix/3_liaohj
enh(stream): check the existence for snode.
This commit is contained in:
commit
a31027359f
|
@ -291,3 +291,4 @@ RESUME STREAM [IF EXISTS] [IGNORE UNTREATED] stream_name;
|
|||
CREATE SNODE ON DNODE [id]
|
||||
```
|
||||
其中的 id 是集群中的 dnode 的序号。请注意选择的dnode,流计算的中间状态将自动在其上进行备份。
|
||||
从 3.3.4.0 版本开始,在多副本环境中创建流会进行 snode 的**存在性检查**,要求首先创建 snode。如果 snode 不存在,无法创建流。
|
||||
|
|
|
@ -133,6 +133,7 @@ int32_t mndStreamSetUpdateEpsetAction(SMnode *pMnode, SStreamObj *pStream, SVgr
|
|||
|
||||
int32_t mndGetStreamObj(SMnode *pMnode, int64_t streamId, SStreamObj** pStream);
|
||||
bool mndStreamNodeIsUpdated(SMnode *pMnode);
|
||||
int32_t mndCheckForSnode(SMnode *pMnode, SDbObj *pSrcDb);
|
||||
|
||||
int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool *hasEpset, int32_t taskId, int32_t nodeId);
|
||||
int32_t mndProcessStreamHb(SRpcMsg *pReq);
|
||||
|
|
|
@ -795,12 +795,22 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
if (createReq.sql != NULL) {
|
||||
sqlLen = strlen(createReq.sql);
|
||||
sql = taosMemoryMalloc(sqlLen + 1);
|
||||
sql = taosStrdup(createReq.sql);
|
||||
TSDB_CHECK_NULL(sql, code, lino, _OVER, terrno);
|
||||
}
|
||||
|
||||
memset(sql, 0, sqlLen + 1);
|
||||
memcpy(sql, createReq.sql, sqlLen);
|
||||
SDbObj *pSourceDb = mndAcquireDb(pMnode, createReq.sourceDB);
|
||||
if (pSourceDb == NULL) {
|
||||
code = terrno;
|
||||
mInfo("stream:%s failed to create, acquire source db %s failed, code:%s", createReq.name, createReq.sourceDB,
|
||||
tstrerror(code));
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
code = mndCheckForSnode(pMnode, pSourceDb);
|
||||
mndReleaseDb(pMnode, pSourceDb);
|
||||
if (code != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
// build stream obj from request
|
||||
|
|
|
@ -1497,6 +1497,30 @@ bool mndStreamNodeIsUpdated(SMnode *pMnode) {
|
|||
return updated;
|
||||
}
|
||||
|
||||
int32_t mndCheckForSnode(SMnode *pMnode, SDbObj *pSrcDb) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
SSnodeObj *pObj = NULL;
|
||||
|
||||
if (pSrcDb->cfg.replications == 1) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_SNODE, pIter, (void **)&pObj);
|
||||
if (pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
sdbRelease(pSdb, pObj);
|
||||
sdbCancelFetch(pSdb, pIter);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
mError("snode not existed when trying to create stream in db with multiple replica");
|
||||
return TSDB_CODE_SNODE_NOT_DEPLOYED;
|
||||
}
|
||||
}
|
||||
|
||||
uint32_t seed = 0;
|
||||
static SRpcMsg createRpcMsg(STransAction* pAction, int64_t traceId, int64_t signature) {
|
||||
SRpcMsg rpcMsg = {.msgType = pAction->msgType, .contLen = pAction->contLen, .info.ahandle = (void *)signature};
|
||||
|
|
|
@ -887,7 +887,7 @@ int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskI
|
|||
}
|
||||
|
||||
int32_t streamSendChkptReportMsg(SStreamTask* pTask, SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask) {
|
||||
int32_t code;
|
||||
int32_t code = 0;
|
||||
int32_t tlen = 0;
|
||||
int32_t vgId = pTask->pMeta->vgId;
|
||||
const char* id = pTask->id.idStr;
|
||||
|
|
|
@ -1317,6 +1317,7 @@
|
|||
,,y,script,./test.sh -f tsim/stream/basic2.sim
|
||||
,,y,script,./test.sh -f tsim/stream/basic3.sim
|
||||
,,y,script,./test.sh -f tsim/stream/basic4.sim
|
||||
,,y,script,./test.sh -f tsim/stream/snodeCheck.sim
|
||||
,,y,script,./test.sh -f tsim/stream/checkpointInterval0.sim
|
||||
,,y,script,./test.sh -f tsim/stream/checkStreamSTable1.sim
|
||||
,,y,script,./test.sh -f tsim/stream/checkStreamSTable.sim
|
||||
|
|
|
@ -0,0 +1,64 @@
|
|||
system sh/stop_dnodes.sh
|
||||
system sh/deploy.sh -n dnode1 -i 1
|
||||
system sh/deploy.sh -n dnode2 -i 2
|
||||
system sh/deploy.sh -n dnode3 -i 3
|
||||
|
||||
system sh/cfg.sh -n dnode1 -c supportVnodes -v 4
|
||||
system sh/cfg.sh -n dnode2 -c supportVnodes -v 4
|
||||
system sh/cfg.sh -n dnode3 -c supportVnodes -v 4
|
||||
|
||||
print ========== step1
|
||||
system sh/exec.sh -n dnode1 -s start
|
||||
sql connect
|
||||
|
||||
print ========== step2
|
||||
sql create dnode $hostname port 7200
|
||||
system sh/exec.sh -n dnode2 -s start
|
||||
|
||||
sql create dnode $hostname port 7300
|
||||
system sh/exec.sh -n dnode3 -s start
|
||||
|
||||
$x = 0
|
||||
step2:
|
||||
$x = $x + 1
|
||||
sleep 1000
|
||||
if $x == 10 then
|
||||
print ====> dnode not ready!
|
||||
return -1
|
||||
endi
|
||||
sql select * from information_schema.ins_dnodes
|
||||
print ===> $data00 $data01 $data02 $data03 $data04 $data05
|
||||
print ===> $data10 $data11 $data12 $data13 $data14 $data15
|
||||
if $rows != 3 then
|
||||
return -1
|
||||
endi
|
||||
if $data(1)[4] != ready then
|
||||
goto step2
|
||||
endi
|
||||
if $data(2)[4] != ready then
|
||||
goto step2
|
||||
endi
|
||||
|
||||
print ========== step3
|
||||
sql drop database if exists test;
|
||||
sql create database if not exists test vgroups 4 replica 3 precision "ms" ;
|
||||
sql use test;
|
||||
|
||||
sql create table test.test (ts timestamp, c1 int) tags (t1 int) ;
|
||||
|
||||
print create stream without snode existing
|
||||
sql_error create stream stream_t1 trigger at_once into str_dst as select count(*) from test interval(20s);
|
||||
|
||||
print create snode
|
||||
sql create snode on dnode 1;
|
||||
|
||||
sql create stream stream_t1 trigger at_once into str_dst as select count(*) from test interval(20s);
|
||||
|
||||
print drop snode and then create stream
|
||||
sql drop snode on dnode 1;
|
||||
|
||||
sql_error create stream stream_t2 trigger at_once into str_dst as select count(*) from test interval(20s);
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
system sh/exec.sh -n dnode2 -s stop -x SIGINT
|
||||
system sh/exec.sh -n dnode3 -s stop -x SIGINT
|
|
@ -102,6 +102,7 @@ run tsim/stream/triggerInterval0.sim
|
|||
run tsim/stream/triggerSession0.sim
|
||||
run tsim/stream/distributeIntervalRetrive0.sim
|
||||
run tsim/stream/basic0.sim
|
||||
run tsim/stream/snodeCheck.sim
|
||||
run tsim/stream/session0.sim
|
||||
run tsim/stream/schedSnode.sim
|
||||
run tsim/stream/partitionby.sim
|
||||
|
|
Loading…
Reference in New Issue