diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index f407290c00..6e191e412d 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -140,8 +140,6 @@ typedef enum EStreamTaskEvent { TASK_EVENT_RESUME = 0x9, TASK_EVENT_HALT = 0xA, TASK_EVENT_DROPPING = 0xB, - TASK_EVENT_SCAN_TSDB = 0xC, - TASK_EVENT_SCAN_WAL = 0xD, } EStreamTaskEvent; typedef struct { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 6899a08602..a870f9d000 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1156,6 +1156,50 @@ static int32_t initStreamNodeList(SMnode* pMnode) { return taosArrayGetSize(execInfo.pNodeList); } +static bool taskNodeIsUpdated(SMnode* pMnode) { + // check if the node update happens or not + taosThreadMutexLock(&execInfo.lock); + int32_t numOfNodes = initStreamNodeList(pMnode); + + if (numOfNodes == 0) { + mDebug("stream task node change checking done, no vgroups exist, do nothing"); + execInfo.ts = taosGetTimestampSec(); + taosThreadMutexUnlock(&execInfo.lock); + return false; + } + + for (int32_t i = 0; i < numOfNodes; ++i) { + SNodeEntry *pNodeEntry = taosArrayGet(execInfo.pNodeList, i); + if (pNodeEntry->stageUpdated) { + mDebug("stream task not ready due to node update detected, checkpoint not issued"); + taosThreadMutexUnlock(&execInfo.lock); + return true; + } + } + + bool allReady = true; + SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allReady); + if (!allReady) { + mWarn("not all vnodes ready"); + taosArrayDestroy(pNodeSnapshot); + taosThreadMutexUnlock(&execInfo.lock); + return 0; + } + + SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot); + bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0); + taosArrayDestroy(changeInfo.pUpdateNodeList); + taosHashCleanup(changeInfo.pDBMap); + taosArrayDestroy(pNodeSnapshot); + + if (nodeUpdated) { + mDebug("stream task not ready due to node update"); + } + + taosThreadMutexUnlock(&execInfo.lock); + return nodeUpdated; +} + static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { SMnode * pMnode = pReq->info.node; SSdb * pSdb = pMnode->pSdb; @@ -1163,50 +1207,24 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { SStreamObj *pStream = NULL; int32_t code = 0; - { // check if the node update happens or not - int64_t ts = taosGetTimestampSec(); - - taosThreadMutexLock(&execInfo.lock); - int32_t numOfNodes = initStreamNodeList(pMnode); - taosThreadMutexUnlock(&execInfo.lock); - - if (numOfNodes == 0) { - mDebug("stream task node change checking done, no vgroups exist, do nothing"); - execInfo.ts = ts; - return 0; - } - - for(int32_t i = 0; i < numOfNodes; ++i) { - SNodeEntry* pNodeEntry = taosArrayGet(execInfo.pNodeList, i); - if (pNodeEntry->stageUpdated) { - mDebug("stream task not ready due to node update detected, checkpoint not issued"); - return 0; - } - } - - bool allReady = true; - SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode, &allReady); - if (!allReady) { - mWarn("not all vnodes are ready, ignore the checkpoint") taosArrayDestroy(pNodeSnapshot); - return 0; - } - - SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot); - bool nodeUpdated = (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0); - taosArrayDestroy(changeInfo.pUpdateNodeList); - taosHashCleanup(changeInfo.pDBMap); - taosArrayDestroy(pNodeSnapshot); - - if (nodeUpdated) { - mDebug("stream task not ready due to node update, checkpoint not issued"); - return 0; - } + // check if the node update happens or not + bool updated = taskNodeIsUpdated(pMnode); + if (updated) { + mWarn("checkpoint ignore, stream task nodes update detected"); + return -1; } { // check if all tasks are in TASK_STATUS__READY status bool ready = true; - taosThreadMutexLock(&execInfo.lock); + + // no streams exists, abort + int32_t numOfTasks = taosArrayGetSize(execInfo.pTaskList); + if (numOfTasks <= 0) { + taosThreadMutexUnlock(&execInfo.lock); + return 0; + } + for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) { STaskId * p = taosArrayGet(execInfo.pTaskList, i); STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p)); @@ -1762,6 +1780,12 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { return -1; } + bool updated = taskNodeIsUpdated(pMnode); + if (updated) { + mError("tasks are not ready for pause, node update detected"); + return -1; + } + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "pause-stream"); if (pTrans == NULL) { mError("stream:%s, failed to pause stream since %s", pauseReq.name, terrstr());