diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 06000e32b9..62e10255e9 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -549,6 +549,14 @@ typedef struct { int32_t tEncodeCompleteHistoryDataMsg(SEncoder* pEncoder, const SStreamCompleteHistoryMsg* pReq); int32_t tDecodeCompleteHistoryDataMsg(SDecoder* pDecoder, SStreamCompleteHistoryMsg* pReq); +typedef struct { + int32_t nodeId; + SEpSet epset; +} SStreamTaskUpdateInfo; + +int32_t tEncodeTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskUpdateInfo* pMsg); +int32_t tDecodeTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskUpdateInfo* pMsg); + typedef struct { int64_t streamId; int32_t downstreamTaskId; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 46675e4e26..2426e81e3f 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1776,7 +1776,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { int32_t nodeId = 0; SEpSet newEpSet = {0}; - { // check all streams that involved this vnode + { // check all streams that involved this vnode should update the epset info SStreamObj *pStream = NULL; void *pIter = NULL; while (1) { @@ -1796,7 +1796,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { for (int32_t k = 0; k < numOfTasks; ++k) { SStreamTask *pTask = taosArrayGetP(pLevel, k); if (pTask->info.nodeId == nodeId) { - //pTask->info.epSet = 0; set the new epset + pTask->info.epSet = newEpSet; continue; } @@ -1813,6 +1813,39 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) { } } + { // build trans to update the epset + + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, "stream-checkpoint"); + if (pTrans == NULL) { + mError("failed to trigger checkpoint, reason: %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY)); + return -1; + } +// mDebug("start to trigger checkpoint, checkpointId: %" PRId64 "", checkpointId); + + mndTransSetDbName(pTrans, "checkpoint", "checkpoint"); + if (mndTransCheckConflict(pMnode, pTrans) != 0) { + mError("failed to trigger checkpoint, checkpointId: %" PRId64 ", reason:%s", checkpointId, + tstrerror(TSDB_CODE_MND_TRANS_CONFLICT)); + mndTransDrop(pTrans); + return -1; + } + + void* pBuf = NULL; + int32_t len = 0; +// doBuildStreamTaskUpdateMsg(&pBuf, &len, nodeId, newEpSet); + + STransAction action = {0}; + action.epSet = /*mndGetVgroupEpset(pMnode, pVgObj)*/; + action.pCont = pBuf; + action.contLen = len; + action.msgType = TDMT_VND_STREAM_CHECK_POINT_SOURCE; + + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(pBuf); + taosWUnLockLatch(&pStream->lock); + return -1; + } + } taosWUnLockLatch(&pStream->lock); } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 7b5dc2fdf4..a5820ea8b0 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -958,3 +958,19 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) { num); return 0; } + +int32_t tEncodeTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskUpdateInfo* pMsg) { + if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI32(pEncoder, pMsg->nodeId) < 0) return -1; + if (tEncodeSEpSet(pEncoder, &pMsg->epset) < 0) return -1; + tEndEncode(pEncoder); + return pEncoder->pos; +} + +int32_t tDecodeTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskUpdateInfo* pMsg) { + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI32(pDecoder, &pMsg->nodeId) < 0) return -1; + if (tDecodeSEpSet(pDecoder, &pMsg->epset) < 0) return -1; + tEndDecode(pDecoder); + return 0; +} \ No newline at end of file