diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 3929e897e6..283d3fde38 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1119,36 +1119,30 @@ static const char *mndGetStreamDB(SMnode *pMnode) { return p; } -static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; - SSdb *pSdb = pMnode->pSdb; - void *pIter = NULL; - SStreamObj *pStream = NULL; - int32_t code = 0; +static int32_t mndCheckNodeStatus(SMnode *pMnode) { + bool ready = true; + // check if the node update happens or not + int64_t ts = taosGetTimestampSec(); - { // check if the node update happens or not - int64_t ts = taosGetTimestampSec(); - - if (execNodeList.pNodeEntryList == NULL || (taosArrayGetSize(execNodeList.pNodeEntryList) == 0)) { - if (execNodeList.pNodeEntryList != NULL) { - execNodeList.pNodeEntryList = taosArrayDestroy(execNodeList.pNodeEntryList); - } - - execNodeList.pNodeEntryList = extractNodeListFromStream(pMnode); + if (execNodeList.pNodeEntryList == NULL || (taosArrayGetSize(execNodeList.pNodeEntryList) == 0)) { + if (execNodeList.pNodeEntryList != NULL) { + execNodeList.pNodeEntryList = taosArrayDestroy(execNodeList.pNodeEntryList); } - if (taosArrayGetSize(execNodeList.pNodeEntryList) == 0) { - mDebug("stream task node change checking done, no vgroups exist, do nothing"); - execNodeList.ts = ts; - return 0; - } + execNodeList.pNodeEntryList = extractNodeListFromStream(pMnode); + } - for (int32_t i = 0; i < taosArrayGetSize(execNodeList.pNodeEntryList); ++i) { - SNodeEntry *pNodeEntry = taosArrayGet(execNodeList.pNodeEntryList, i); - if (pNodeEntry->stageUpdated) { - mDebug("stream task not ready due to node update detected, checkpoint not issued"); - return 0; - } + if (taosArrayGetSize(execNodeList.pNodeEntryList) == 0) { + mDebug("stream task node change checking done, no vgroups exist, do nothing"); + execNodeList.ts = ts; + return -1; + } + + for (int32_t i = 0; i < taosArrayGetSize(execNodeList.pNodeEntryList); ++i) { + SNodeEntry *pNodeEntry = taosArrayGet(execNodeList.pNodeEntryList, i); + if (pNodeEntry->stageUpdated) { + mDebug("stream task not ready due to node update detected, checkpoint not issued"); + return -1; } SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode); @@ -1161,34 +1155,39 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { if (nodeUpdated) { mDebug("stream task not ready due to node update, checkpoint not issued"); - return 0; + return -1; } } - { // check if all tasks are in TASK_STATUS__NORMAL status - bool ready = true; + // check if all tasks are in TASK_STATUS__NORMAL status - taosThreadMutexLock(&execNodeList.lock); - for (int32_t i = 0; i < taosArrayGetSize(execNodeList.pTaskList); ++i) { - STaskId *p = taosArrayGet(execNodeList.pTaskList, i); - STaskStatusEntry *pEntry = taosHashGet(execNodeList.pTaskMap, p, sizeof(*p)); - if (pEntry == NULL) { - continue; - } - - if (pEntry->status != TASK_STATUS__NORMAL) { - mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, checkpoint msg not issued", - pEntry->id.streamId, (int32_t)pEntry->id.taskId, 0, streamGetTaskStatusStr(pEntry->status)); - ready = false; - break; - } + taosThreadMutexLock(&execNodeList.lock); + for (int32_t i = 0; i < taosArrayGetSize(execNodeList.pTaskList); ++i) { + STaskId *p = taosArrayGet(execNodeList.pTaskList, i); + STaskStatusEntry *pEntry = taosHashGet(execNodeList.pTaskMap, p, sizeof(*p)); + if (pEntry == NULL) { + continue; } - taosThreadMutexUnlock(&execNodeList.lock); - if (!ready) { - return 0; + if (pEntry->status != TASK_STATUS__NORMAL) { + mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, checkpoint msg not issued", + pEntry->id.streamId, (int32_t)pEntry->id.taskId, 0, streamGetTaskStatusStr(pEntry->status)); + ready = false; + break; } } + taosThreadMutexUnlock(&execNodeList.lock); + return ready == true ? 0 : -1; +} +static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + SStreamObj *pStream = NULL; + int32_t code = 0; + if ((code = mndCheckNodeStatus(pMnode)) != 0) { + return code; + } SMStreamDoCheckpointMsg *pMsg = (SMStreamDoCheckpointMsg *)pReq->pCont; int64_t checkpointId = pMsg->checkpointId;