diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 902674b9dd..d9a7d3a456 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -532,6 +532,7 @@ int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointRea typedef struct { int32_t vgId; + SEpSet epset; int32_t numOfTasks; } SStreamHbMsg; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 3488df8dc0..dfbc6e92be 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -27,6 +27,7 @@ #include "mndVgroup.h" #include "parser.h" #include "tname.h" +#include "tmisce.h" #define MND_STREAM_VER_NUMBER 3 #define MND_STREAM_RESERVE_SIZE 64 @@ -1783,12 +1784,13 @@ static int32_t createStreamUpdateTrans(SMnode* pMnode, SStreamObj* pStream, int3 return -1; } - mDebug("start to build stream:0x%"PRIx64" task DAG update", pStream->uid); + mDebug("start to build stream:0x%" PRIx64 " task DAG update", pStream->uid); ASSERT(0); -// mndTransSetDbName(pTrans, "stream-task-update", "checkpoint"); + // mndTransSetDbName(pTrans, "stream-task-update", "checkpoint"); if (mndTransCheckConflict(pMnode, pTrans) != 0) { - mError("failed to build stream:0x%"PRIx64" task DAG update, code:%s", pStream->uid, tstrerror(TSDB_CODE_MND_TRANS_CONFLICT)); + mError("failed to build stream:0x%" PRIx64 " task DAG update, code:%s", pStream->uid, + tstrerror(TSDB_CODE_MND_TRANS_CONFLICT)); mndTransDrop(pTrans); return -1; } @@ -1818,7 +1820,6 @@ static int32_t createStreamUpdateTrans(SMnode* pMnode, SStreamObj* pStream, int3 taosWUnLockLatch(&pStream->lock); return -1; } - } } @@ -1843,6 +1844,35 @@ static int32_t createStreamUpdateTrans(SMnode* pMnode, SStreamObj* pStream, int3 return TSDB_CODE_SUCCESS; } +static int32_t updateTaskEpInfo(SStreamObj* pStream, int32_t nodeId, SEpSet* pEpSet) { + int32_t numOfLevels = 0; + for (int32_t j = 0; j < numOfLevels; ++j) { + SArray *pLevel = taosArrayGetP(pStream->tasks, j); + + int32_t numOfTasks = taosArrayGetSize(pLevel); + for (int32_t k = 0; k < numOfTasks; ++k) { + SStreamTask *pTask = taosArrayGetP(pLevel, k); + if (pTask->info.nodeId == nodeId) { + pTask->info.epSet = *pEpSet; + continue; + } + + // check for the dispath info and the upstream task info + int32_t level = pTask->info.taskLevel; + if (level == TASK_LEVEL__SOURCE) { + streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet); + } else if (level == TASK_LEVEL__AGG) { + streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet); + streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet); + } else { // TASK_LEVEL__SINK + streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet); + } + } + } + return 0; + +} + // todo: handle the database drop/stream drop case int32_t mndProcessStreamHb(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; @@ -1862,7 +1892,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { int64_t now = taosGetTimestampSec(); - // only handle the vnode transfer case. + // timeout list + bool nodeChanged = false; SArray* pList = taosArrayInit(4, sizeof(int32_t)); // record the timeout node @@ -1876,13 +1907,25 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { continue; } + pEntry->hbTimestamp = now; + // check epset to identify whether the node has been transferred to other dnodes. - // 1. if the epset is changed - taosArrayPush(pList, &pEntry); + // 1. node the epset is changed, which means the node transfer has occurred for this node. + if (!isEpsetEqual(&pEntry->epset, &req.epset)) { + nodeChanged = true; + break; + } } - int32_t nodeId = 0; - SEpSet newEpSet = {0}; + // todo handle the node timeout case. + + // handle the node changed case + if (!nodeChanged) { + return TSDB_CODE_SUCCESS; + } + + int32_t nodeId = req.vgId; + SEpSet newEpSet = req.epset; { // check all streams that involved this vnode should update the epset info SStreamObj *pStream = NULL; @@ -1936,7 +1979,6 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { // } // } // mndTransDrop(pTrans); - return code; mTrace("receive stream-meta hb from vgId:%d, active numOfTasks:%d", req.vgId, req.numOfTasks); return TSDB_CODE_SUCCESS; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index d45eac58e9..8f58843c0a 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -553,6 +553,7 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) { if (tStartEncode(pEncoder) < 0) return -1; if (tEncodeI32(pEncoder, pReq->vgId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->numOfTasks) < 0) return -1; + if (tEncodeSEpSet(pEncoder, &pReq->epset) < 0) return -1; tEndEncode(pEncoder); return pEncoder->pos; } @@ -561,6 +562,7 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) { if (tStartDecode(pDecoder) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->vgId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->numOfTasks) < 0) return -1; + if (tDecodeSEpSet(pDecoder, &pReq->epset) < 0) return -1; tEndDecode(pDecoder); return 0; }