From b4419bda659baf9a66299546a89971fe49036db1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 20 Sep 2023 10:35:01 +0800 Subject: [PATCH] enh(stream): add node stage check. --- include/libs/stream/tstream.h | 2 + source/dnode/mnode/impl/src/mndStream.c | 63 ++++++++++++++++++------- source/libs/stream/src/streamMeta.c | 21 +++++---- 3 files changed, 61 insertions(+), 25 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 5329da2f17..1dd016526f 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -556,6 +556,8 @@ int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointRea typedef struct STaskStatusEntry { STaskId id; int32_t status; + int32_t stage; + int32_t nodeId; } STaskStatusEntry; typedef struct SStreamHbMsg { diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 3018326c74..c442faf69d 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -37,17 +37,18 @@ typedef struct SNodeEntry { int32_t nodeId; + bool stageUpdated; // the stage has been updated due to the leader/follower change or node reboot. SEpSet epset; // compare the epset to identify the vgroup tranferring between different dnodes. int64_t hbTimestamp; // second } SNodeEntry; -typedef struct SStreamVnodeRevertIndex { +typedef struct SStreamExecNodeInfo { SArray *pNodeEntryList; int64_t ts; // snapshot ts SHashObj *pTaskMap; SArray *pTaskList; TdThreadMutex lock; -} SStreamVnodeRevertIndex; +} SStreamExecNodeInfo; typedef struct SVgroupChangeInfo { SHashObj *pDBMap; @@ -55,7 +56,7 @@ typedef struct SVgroupChangeInfo { } SVgroupChangeInfo; static int32_t mndNodeCheckSentinel = 0; -static SStreamVnodeRevertIndex execNodeList; +static SStreamExecNodeInfo execNodeList; static int32_t mndStreamActionInsert(SSdb *pSdb, SStreamObj *pStream); static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream); @@ -75,7 +76,6 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in int64_t streamId, int32_t taskId); static int32_t mndProcessNodeCheck(SRpcMsg *pReq); static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg); -static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNode); static SArray *doExtractNodeListFromStream(SMnode *pMnode); static SArray *mndTakeVgroupSnapshot(SMnode *pMnode); @@ -83,8 +83,8 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans); static void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset); -static void removeStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* pExecNode); -static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNode); +static void removeStreamTasksInBuf(SStreamObj* pStream, SStreamExecNodeInfo * pExecNode); +static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecNodeInfo *pExecNode); int32_t mndInitStream(SMnode *pMnode) { SSdbTable table = { @@ -1158,12 +1158,19 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { } if (taosArrayGetSize(execNodeList.pNodeEntryList) == 0) { - mDebug("end to do stream task node change checking, no vgroup exists, do nothing"); + mDebug("stream task node change checking done, no vgroups exist, do nothing"); execNodeList.ts = ts; - atomic_store_32(&mndNodeCheckSentinel, 0); return 0; } + for(int32_t i = 0; i < taosArrayGetSize(execNodeList.pNodeEntryList); ++i) { + SNodeEntry* pNodeEntry = taosArrayGet(execNodeList.pNodeEntryList, i); + if (pNodeEntry->stageUpdated) { + mDebug("stream task not ready due to node update detected, checkpoint not issued"); + return 0; + } + } + SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode); SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot); @@ -1173,7 +1180,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { taosArrayDestroy(pNodeSnapshot); if (nodeUpdated) { - mDebug("stream task not ready due to node update, not generate checkpoint"); + mDebug("stream task not ready due to node update, checkpoint not issued"); return 0; } } @@ -1190,7 +1197,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { } if (pEntry->status != TASK_STATUS__NORMAL) { - mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, create checkpoint msg not issued", + mDebug("s-task:0x%" PRIx64 "-0x%x (nodeId:%d) status:%s not ready, checkpoint msg not issued", pEntry->id.streamId, (int32_t)pEntry->id.taskId, 0, streamGetTaskStatusStr(pEntry->status)); ready = false; break; @@ -2028,7 +2035,7 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP SNodeEntry *pCurrent = taosArrayGet(pNodeList, j); if (pCurrent->nodeId == pPrevEntry->nodeId) { - if (isNodeEpsetChanged(&pPrevEntry->epset, &pCurrent->epset)) { + if (pPrevEntry->stageUpdated || isNodeEpsetChanged(&pPrevEntry->epset, &pCurrent->epset)) { const SEp *pPrevEp = GET_ACTIVE_EP(&pPrevEntry->epset); char buf[256] = {0}; @@ -2202,6 +2209,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { SArray *pNodeSnapshot = mndTakeVgroupSnapshot(pMnode); + taosThreadMutexLock(&execNodeList.lock); SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot); if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) { code = mndProcessVgroupChange(pMnode, &changeInfo); @@ -2218,6 +2226,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) { taosArrayDestroy(pNodeSnapshot); } + taosThreadMutexUnlock(&execNodeList.lock); taosArrayDestroy(changeInfo.pUpdateNodeList); taosHashCleanup(changeInfo.pDBMap); @@ -2244,7 +2253,7 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) { return 0; } -void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNode) { +void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecNodeInfo *pExecNode) { int32_t level = taosArrayGetSize(pStream->tasks); for (int32_t i = 0; i < level; i++) { SArray *pLevel = taosArrayGetP(pStream->tasks, i); @@ -2256,8 +2265,11 @@ void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNod STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; void *p = taosHashGet(pExecNode->pTaskMap, &id, sizeof(id)); if (p == NULL) { - STaskStatusEntry entry = { - .id.streamId = pTask->id.streamId, .id.taskId = pTask->id.taskId, .status = TASK_STATUS__STOP}; + STaskStatusEntry entry = {.id.streamId = pTask->id.streamId, + .id.taskId = pTask->id.taskId, + .stage = -1, + .nodeId = pTask->info.nodeId, + .status = TASK_STATUS__STOP}; taosHashPut(pExecNode->pTaskMap, &id, sizeof(id), &entry, sizeof(entry)); taosArrayPush(pExecNode->pTaskList, &id); } @@ -2265,7 +2277,7 @@ void keepStreamTasksInBuf(SStreamObj *pStream, SStreamVnodeRevertIndex *pExecNod } } -void removeStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* pExecNode) { +void removeStreamTasksInBuf(SStreamObj* pStream, SStreamExecNodeInfo * pExecNode) { int32_t level = taosArrayGetSize(pStream->tasks); for (int32_t i = 0; i < level; i++) { SArray *pLevel = taosArrayGetP(pStream->tasks, i); @@ -2296,9 +2308,8 @@ void removeStreamTasksInBuf(SStreamObj* pStream, SStreamVnodeRevertIndex* pExecN // todo: this process should be executed by the write queue worker of the mnode int32_t mndProcessStreamHb(SRpcMsg *pReq) { - SMnode *pMnode = pReq->info.node; + SMnode *pMnode = pReq->info.node; SStreamHbMsg req = {0}; - int32_t code = TSDB_CODE_SUCCESS; SDecoder decoder = {0}; tDecoderInit(&decoder, pReq->pCont, pReq->contLen); @@ -2326,11 +2337,29 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { continue; } + if (p->stage != pEntry->stage && pEntry->stage != -1) { + int32_t numOfNodes = taosArrayGetSize(execNodeList.pNodeEntryList); + for(int32_t j = 0; j < numOfNodes; ++j) { + SNodeEntry* pNodeEntry = taosArrayGet(execNodeList.pNodeEntryList, j); + if (pNodeEntry->nodeId == pEntry->nodeId) { + mInfo("vgId:%d stage updated, from %d to %d, nodeUpdate should be trigger by s-task:0x%" PRIx64, + pEntry->nodeId, pEntry->stage, p->stage, pEntry->id.taskId); + + pNodeEntry->stageUpdated = true; + pEntry->stage = p->stage; + break; + } + } + } else { + pEntry->stage = p->stage; + } + pEntry->status = p->status; if (p->status != TASK_STATUS__NORMAL) { mDebug("received s-task:0x%"PRIx64" not in ready status:%s", p->id.taskId, streamGetTaskStatusStr(p->status)); } } + taosThreadMutexUnlock(&execNodeList.lock); taosArrayDestroy(req.pTaskStatus); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 93f4b7c4dd..6f1c2a88ce 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -766,6 +766,8 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { if (tEncodeI64(pEncoder, ps->id.streamId) < 0) return -1; if (tEncodeI32(pEncoder, ps->id.taskId) < 0) return -1; if (tEncodeI32(pEncoder, ps->status) < 0) return -1; + if (tEncodeI32(pEncoder, ps->stage) < 0) return -1; + if (tEncodeI32(pEncoder, ps->nodeId) < 0) return -1; } tEndEncode(pEncoder); return pEncoder->pos; @@ -778,15 +780,17 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { pReq->pTaskStatus = taosArrayInit(pReq->numOfTasks, sizeof(STaskStatusEntry)); for (int32_t i = 0; i < pReq->numOfTasks; ++i) { - STaskStatusEntry hb = {0}; - if (tDecodeI64(pDecoder, &hb.id.streamId) < 0) return -1; - int32_t taskId = 0; + int32_t taskId = 0; + STaskStatusEntry entry = {0}; + + if (tDecodeI64(pDecoder, &entry.id.streamId) < 0) return -1; if (tDecodeI32(pDecoder, &taskId) < 0) return -1; + if (tDecodeI32(pDecoder, &entry.status) < 0) return -1; + if (tDecodeI32(pDecoder, &entry.stage) < 0) return -1; + if (tDecodeI32(pDecoder, &entry.nodeId) < 0) return -1; - hb.id.taskId = taskId; - if (tDecodeI32(pDecoder, &hb.status) < 0) return -1; - - taosArrayPush(pReq->pTaskStatus, &hb); + entry.id.taskId = taskId; + taosArrayPush(pReq->pTaskStatus, &entry); } tEndDecode(pDecoder); @@ -855,7 +859,8 @@ void metaHbToMnode(void* param, void* tmrId) { continue; } - STaskStatusEntry entry = {.id = *pId, .status = (*pTask)->status.taskStatus}; + STaskStatusEntry entry = { + .id = *pId, .status = (*pTask)->status.taskStatus, .nodeId = pMeta->vgId, .stage = pMeta->stage}; taosArrayPush(hbMsg.pTaskStatus, &entry); if (!hasValEpset) {