From b79313ecd79c2549601081cc2b7c7f92113603f8 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Tue, 25 Apr 2023 16:59:09 +0800 Subject: [PATCH] pause&resume --- source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 3 + source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 2 + source/dnode/mnode/impl/src/mndStream.c | 2 + source/dnode/vnode/src/tq/tq.c | 2 +- source/dnode/vnode/src/vnd/vnodeSvr.c | 4 +- tests/script/tsim/stream/pauseAndResume.sim | 270 ++++++++++++++++++++ 6 files changed, 280 insertions(+), 3 deletions(-) create mode 100644 tests/script/tsim/stream/pauseAndResume.sim diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index fbc17ced59..3bf748ea29 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -183,6 +183,9 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DEPLOY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index d61eb3ec03..cb7178dec2 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -538,6 +538,8 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 20f7bbd17f..e2e09cc0b4 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -67,6 +67,8 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_PAUSE_RSP, mndTransProcessRsp); + mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_RESUME_RSP, mndTransProcessRsp); // mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr); // mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 56fb47eaf1..c616f1eaa5 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1223,7 +1223,7 @@ int32_t tqProcessTaskResumeReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms tqDebug("vgId:%d s-task:%s set normal flag", pTq->pStreamMeta->vgId, pTask->id.idStr); streamSetStatusNormal(pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask); - tqStreamTasksScanWal(pTq); + tqStartStreamTasks(pTq); } return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index ec6d9dea48..842e14afa9 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -410,12 +410,12 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp } } break; case TDMT_STREAM_TASK_PAUSE: { - if (tqProcessTaskPauseReq(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) { + if (pVnode->restored && tqProcessTaskPauseReq(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) { goto _err; } } break; case TDMT_STREAM_TASK_RESUME: { - if (tqProcessTaskResumeReq(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) { + if (pVnode->restored && tqProcessTaskResumeReq(pVnode->pTq, version, pMsg->pCont, pMsg->contLen) < 0) { goto _err; } } break; diff --git a/tests/script/tsim/stream/pauseAndResume.sim b/tests/script/tsim/stream/pauseAndResume.sim new file mode 100644 index 0000000000..e1e2a67a22 --- /dev/null +++ b/tests/script/tsim/stream/pauseAndResume.sim @@ -0,0 +1,270 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 + +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print ===== step1 +sql drop stream if exists streams1; +sql drop database if exists test; +sql create database test vgroups 10; +sql use test; +sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int); +sql create table ts1 using st tags(1,1,1); +sql create table ts2 using st tags(2,2,2); +sql create table ts3 using st tags(3,2,2); +sql create table ts4 using st tags(4,2,2); +sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt1 as select _wstart, count(*) c1, sum(a) c3 from st interval(10s); + +sql pause stream streams1; + +sql insert into ts1 values(1648791213001,1,12,3,1.0); +sql insert into ts2 values(1648791213001,1,12,3,1.0); + +sql insert into ts3 values(1648791213001,1,12,3,1.0); +sql insert into ts4 values(1648791213001,1,12,3,1.0); + +sleep 1000 +print 1 select * from streamt1; +sql select * from streamt1; + +if $rows != 0 then + print =1====rows=$rows + print $data00 $data01 $data02 + print $data10 $data11 $data12 + print $data20 $data21 $data22 + return -1 +endi + +sql resume stream streams1; + +$loop_count = 0 +loop0: + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +sleep 500 + +print 2 select * from streamt1; +sql select * from streamt1; + +if $rows != 1 then + print =====rows=$rows + print $data00 $data01 $data02 + print $data10 $data11 $data12 + print $data20 $data21 $data22 + goto loop0 +endi + +if $data01 != 4 then + print =====data01=$data01 + goto loop0 +endi + +sql insert into ts1 values(1648791223002,2,2,3,1.1); +sql insert into ts2 values(1648791223002,3,2,3,2.1); +sql insert into ts3 values(1648791223002,4,2,43,73.1); +sql insert into ts4 values(1648791223002,24,22,23,4.1); + +$loop_count = 0 +loop1: + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +sleep 500 + +print 3 select * from streamt1; +sql select * from streamt1; + +if $rows != 2 then + print =====rows=$rows + print $data00 $data01 $data02 + print $data10 $data11 $data12 + print $data20 $data21 $data22 + goto loop1 +endi + +if $data01 != 4 then + print =====data01=$data01 + goto loop1 +endi + +if $data11 != 4 then + print =====data01=$data01 + goto loop1 +endi + +print ===== step 1 over + +print ===== step2 +sql drop stream if exists streams2; +sql drop database if exists test2; +sql create database test2 vgroups 1; +sql use test2; +sql create table t1(ts timestamp, a int, b int , c int, d double); + +sql create stream streams2 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt2 as select _wstart, count(*) c1, sum(a) c3 from t1 interval(10s); + +sql pause stream streams2; + +sql insert into t1 values(1648791213001,1,12,3,1.0); + +sleep 1000 +print 1 select * from streamt2; +sql select * from streamt2; + +if $rows != 0 then + print =1====rows=$rows + print $data00 $data01 $data02 + print $data10 $data11 $data12 + print $data20 $data21 $data22 + return -1 +endi + +sql resume stream streams2; + +$loop_count = 0 +loop10: + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +sleep 500 + +print 2 select * from streamt2; +sql select * from streamt2; + +if $rows != 1 then + print =====rows=$rows + print $data00 $data01 $data02 + print $data10 $data11 $data12 + print $data20 $data21 $data22 + goto loop10 +endi + +if $data01 != 1 then + print =====data01=$data01 + goto loop10 +endi + +sql insert into t1 values(1648791223002,2,2,3,1.1); + +$loop_count = 0 + +loop2: + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +sleep 500 + +print 3 select * from streamt2; +sql select * from streamt2; + +if $rows != 2 then + print =====rows=$rows + print $data00 $data01 $data02 + print $data10 $data11 $data12 + print $data20 $data21 $data22 + goto loop2 +endi + +if $data01 != 1 then + print =====data01=$data01 + goto loop2 +endi + +if $data11 != 1 then + print =====data01=$data01 + goto loop2 +endi + +print ===== step 2 over + + +print ===== step3 +sql drop stream if exists streams3; +sql drop database if exists test3; +sql create database test3 vgroups 10; +sql use test3; +sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int); +sql create table ts1 using st tags(1,1,1); +sql create table ts2 using st tags(2,2,2); +sql create table ts3 using st tags(3,2,2); +sql create table ts4 using st tags(4,2,2); +sql create stream streams3 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt3 as select _wstart, count(*) c1, sum(a) c3 from st interval(10s); +sql create stream streams4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt4 as select _wstart, count(*) c1, sum(a) c3 from st interval(10s); +sql create stream streams5 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt5 as select _wstart, count(*) c1, sum(a) c3 from ts1 interval(10s); + +sql pause stream streams3; + +sql insert into ts1 values(1648791213001,1,12,3,1.0); +sql insert into ts2 values(1648791213001,1,12,3,1.0); + +sql insert into ts3 values(1648791213001,1,12,3,1.0); +sql insert into ts4 values(1648791213001,1,12,3,1.0); + + +$loop_count = 0 +loop3: + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +sleep 500 + +print 1 select * from streamt4; +sql select * from streamt4; + +if $rows != 1 then + print =====rows=$rows + print $data00 $data01 $data02 + print $data10 $data11 $data12 + print $data20 $data21 $data22 + goto loop3 +endi + +print 2 select * from streamt5; +sql select * from streamt5; + +if $rows != 1 then + print =====rows=$rows + print $data00 $data01 $data02 + print $data10 $data11 $data12 + print $data20 $data21 $data22 + goto loop3 +endi + +print 3 select * from streamt3; +sql select * from streamt3; +if $rows != 0 then + print =====rows=$rows + return -1 +endi + +print ===== step 3 over + +print ===== step 4 + +sql_error pause stream streams3333333; +sql pause stream IF EXISTS streams44444; + +sql_error resume stream streams5555555; +sql resume stream IF EXISTS streams66666666; + +print ===== step 4 over + +system sh/stop_dnodes.sh