From 63a6428e341ffd0a3d012332daea125d16976b16 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Tue, 17 Dec 2024 17:59:53 +0800 Subject: [PATCH] fix(stream):adjust status of ins_streams --- source/dnode/mnode/impl/src/mndStream.c | 11 -- source/dnode/mnode/impl/src/mndStreamUtil.c | 48 +++++- tests/script/tsim/stream/pauseAndResume.sim | 162 ++++++++++++++++++++ 3 files changed, 207 insertions(+), 14 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 0a107518df..32446a6fd3 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1680,11 +1680,6 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { mInfo("stream:%s,%" PRId64 " start to pause stream", pauseReq.name, pStream->uid); - if (pStream->status == STREAM_STATUS__PAUSE) { - sdbRelease(pMnode->pSdb, pStream); - return 0; - } - if ((code = mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb)) != 0) { sdbRelease(pMnode->pSdb, pStream); return code; @@ -1773,7 +1768,6 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { // pause stream taosWLockLatch(&pStream->lock); - pStream->status = STREAM_STATUS__PAUSE; code = mndPersistTransLog(pStream, pTrans, SDB_STATUS_READY); if (code) { taosWUnLockLatch(&pStream->lock); @@ -1824,11 +1818,6 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { } } - if (pStream->status != STREAM_STATUS__PAUSE) { - sdbRelease(pMnode->pSdb, pStream); - return 0; - } - mInfo("stream:%s,%" PRId64 " start to resume stream from pause", resumeReq.name, pStream->uid); if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) { sdbRelease(pMnode->pSdb, pStream); diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index bb666eb6dd..ef0b3fa9cc 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -854,8 +854,7 @@ int32_t mndResetChkptReportInfo(SHashObj* pHash, int64_t streamId) { return TSDB_CODE_MND_STREAM_NOT_EXIST; } -static void mndShowStreamStatus(char *dst, SStreamObj *pStream) { - int8_t status = atomic_load_8(&pStream->status); +static void mndShowStreamStatus(char *dst, int8_t status) { if (status == STREAM_STATUS__NORMAL) { strcpy(dst, "ready"); } else if (status == STREAM_STATUS__STOP) { @@ -891,6 +890,41 @@ static void int64ToHexStr(int64_t id, char *pBuf, int32_t bufLen) { varDataSetLen(pBuf, len + 2); } +static int32_t isAllTaskPaused(SStreamObj *pStream, bool *pRes) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SStreamTaskIter *pIter = NULL; + bool isPaused = true; + + taosRLockLatch(&pStream->lock); + code = createStreamTaskIter(pStream, &pIter); + TSDB_CHECK_CODE(code, lino, _end); + + while (streamTaskIterNextTask(pIter)) { + SStreamTask *pTask = NULL; + code = streamTaskIterGetCurrent(pIter, &pTask); + TSDB_CHECK_CODE(code, lino, _end); + + STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; + STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); + if (pe == NULL) { + continue; + } + if (pe->status != TASK_STATUS__PAUSE) { + isPaused = false; + } + } + (*pRes) = isPaused; + +_end: + destroyStreamTaskIter(pIter); + taosRUnLockLatch(&pStream->lock); + if (code != TSDB_CODE_SUCCESS) { + mError("error happens when get stream status, lino:%d, code:%s", lino, tstrerror(code)); + } + return code; +} + int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_t numOfRows) { int32_t code = 0; int32_t cols = 0; @@ -939,7 +973,15 @@ int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_ char status[20 + VARSTR_HEADER_SIZE] = {0}; char status2[20] = {0}; - mndShowStreamStatus(status2, pStream); + bool isPaused = false; + code = isAllTaskPaused(pStream, &isPaused); + TSDB_CHECK_CODE(code, lino, _end); + + int8_t streamStatus = atomic_load_8(&pStream->status); + if (isPaused) { + streamStatus = STREAM_STATUS__PAUSE; + } + mndShowStreamStatus(status2, streamStatus); STR_WITH_MAXSIZE_TO_VARSTR(status, status2, sizeof(status)); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); TSDB_CHECK_NULL(pColInfo, code, lino, _end, terrno); diff --git a/tests/script/tsim/stream/pauseAndResume.sim b/tests/script/tsim/stream/pauseAndResume.sim index 1f4caf5c03..66d26dad49 100644 --- a/tests/script/tsim/stream/pauseAndResume.sim +++ b/tests/script/tsim/stream/pauseAndResume.sim @@ -398,4 +398,166 @@ endi print ===== step5 over +print ===== step6 +sql drop database if exists test6; +sql create database test7 vgroups 1; +sql use test7; +sql create stable st(ts timestamp, a int, b int , c int, d double) tags(ta int,tb int,tc int); +sql create table ts1 using st tags(1,1,1); + +sql create stream streams8 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt8 as select _wstart, count(*) c1 from st interval(10s); +sql create stream streams9 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt9 as select _wstart, count(*) c1 from st partition by tbname interval(10s); + +run tsim/stream/checkTaskStatus.sim + +$loop_count = 0 +loop7: + +$loop_count = $loop_count + 1 +if $loop_count == 40 then + return -1 +endi + +sleep 500 + +sql select status, * from information_schema.ins_streams where status != "ready"; + +if $rows != 0 then + print $data00 $data01 $data02 $data03 $data04 + print $data10 $data11 $data12 $data13 $data14 + print $data20 $data21 $data22 $data23 $data24 + print $data30 $data31 $data32 $data33 $data34 + print $data40 $data41 $data42 $data43 $data44 + print $data50 $data51 $data52 $data53 $data54 + goto loop7 +endi + +sql pause stream streams8; + +sql pause stream streams9; + +sql pause stream streams8; + +sql pause stream streams9; + +sleep 1000 + +sql pause stream streams8; + +sql pause stream streams9; + +sleep 1000 + +$loop_count = 0 +loop80: + +$loop_count = $loop_count + 1 +if $loop_count == 40 then + print pause stream failed + goto end_step_6 +endi + +sleep 1000 + +sql select status, * from information_schema.ins_stream_tasks where status != "paused"; + +if $rows != 2 then + print $data00 $data01 $data02 $data03 $data04 + print $data10 $data11 $data12 $data13 $data14 + print $data20 $data21 $data22 $data23 $data24 + print $data30 $data31 $data32 $data33 $data34 + print $data40 $data41 $data42 $data43 $data44 + print $data50 $data51 $data52 $data53 $data54 + goto loop80 +endi + +$loop_count = 0 +loop8: + +$loop_count = $loop_count + 1 +if $loop_count == 40 then + return -1 +endi + +sleep 1000 + +sql select status, * from information_schema.ins_streams where status == "paused"; + +if $rows != 2 then + print $data00 $data01 $data02 $data03 $data04 + print $data10 $data11 $data12 $data13 $data14 + print $data20 $data21 $data22 $data23 $data24 + print $data30 $data31 $data32 $data33 $data34 + print $data40 $data41 $data42 $data43 $data44 + print $data50 $data51 $data52 $data53 $data54 + goto loop8 +endi + + +sql resume stream streams8; + +sql resume stream streams9; + +sql resume stream streams8; + +sql resume stream streams9; + +sleep 1000 + +sql resume stream streams8; + +sql resume stream streams9; + +sleep 1000 + + +$loop_count = 0 +loop90: + +$loop_count = $loop_count + 1 +if $loop_count == 40 then + print pause stream failed + goto end_step_6 +endi + +sleep 1000 + +sql select status, * from information_schema.ins_stream_tasks where status == "paused"; + +if $rows != 0 then + print $data00 $data01 $data02 $data03 $data04 + print $data10 $data11 $data12 $data13 $data14 + print $data20 $data21 $data22 $data23 $data24 + print $data30 $data31 $data32 $data33 $data34 + print $data40 $data41 $data42 $data43 $data44 + print $data50 $data51 $data52 $data53 $data54 + goto loop90 +endi + +$loop_count = 0 +loop9: + +$loop_count = $loop_count + 1 +if $loop_count == 40 then + return -1 +endi + +sleep 1000 + +sql select status, * from information_schema.ins_streams where status != "paused"; + +if $rows != 2 then + print $data00 $data01 $data02 $data03 $data04 + print $data10 $data11 $data12 $data13 $data14 + print $data20 $data21 $data22 $data23 $data24 + print $data30 $data31 $data32 $data33 $data34 + print $data40 $data41 $data42 $data43 $data44 + print $data50 $data51 $data52 $data53 $data54 + goto loop9 +endi + +end_step_6: + +print ===== step6 over + system sh/stop_dnodes.sh