From 9f943213ed3d7439fbbeecc8bc66111d43257b36 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 28 Jul 2023 14:23:53 +0800 Subject: [PATCH] enh(stream): update the hb info. --- include/libs/stream/tstream.h | 7 +++++ source/dnode/vnode/src/tq/tq.c | 5 ++++ source/libs/stream/src/streamCheckpoint.c | 2 ++ source/libs/stream/src/streamDispatch.c | 1 + source/libs/stream/src/streamMeta.c | 34 +++++++++++------------ 5 files changed, 32 insertions(+), 17 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index e5f7093aef..c537c92ca3 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -352,6 +352,11 @@ struct SStreamTask { SSHashObj* pNameMap; }; +typedef struct SMgmtInfo { + SEpSet epset; + int32_t mnodeId; +} SMgmtInfo; + // meta typedef struct SStreamMeta { char* path; @@ -371,6 +376,7 @@ typedef struct SStreamMeta { SHashObj* pTaskBackendUnique; TdThreadMutex backendMutex; tmr_h hbTmr; + SMgmtInfo mgmtInfo; int32_t chkptNotReadyTasks; SArray* checkpointSaved; @@ -484,6 +490,7 @@ typedef struct { int64_t checkpointId; int32_t taskId; int32_t nodeId; + SEpSet mgmtEps; int32_t mnodeId; int64_t expireTime; } SStreamCheckpointSourceReq; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index d9355d26da..37acd397d4 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1668,6 +1668,11 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) { // set the initial value for generating check point int32_t total = 0; taosWLockLatch(&pMeta->lock); + + // set the mgmt epset info according to the checkout source msg from mnode, todo opt perf + pMeta->mgmtInfo.epset = req.mgmtEps; + pMeta->mgmtInfo.mnodeId = req.mnodeId; + if (pMeta->chkptNotReadyTasks == 0) { pMeta->chkptNotReadyTasks = taosArrayGetSize(pMeta->pTaskList); } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 352561bb43..45fce8b62c 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -21,6 +21,7 @@ int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckp if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1; + if (tEncodeSEpSet(pEncoder, &pReq->mgmtEps) < 0) return -1; if (tEncodeI32(pEncoder, pReq->mnodeId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->expireTime) < 0) return -1; tEndEncode(pEncoder); @@ -33,6 +34,7 @@ int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSo if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1; + if (tDecodeSEpSet(pDecoder, &pReq->mgmtEps) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->mnodeId) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->expireTime) < 0) return -1; tEndDecode(pDecoder); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 0792ed208b..7b5dc2fdf4 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -16,6 +16,7 @@ #include "streamInt.h" #include "trpc.h" #include "ttimer.h" +#include "tmisce.h" #define MAX_BLOCK_NAME_NUM 1024 #define DISPATCH_RETRY_INTERVAL_MS 300 diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 67b81bc291..8e2b21a88c 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -542,21 +542,21 @@ void metaHbToMnode(void* param, void* tmrId) { return; } -// ((SMsgHead*)buf)->vgId = htonl(nodeId); -// void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); -// -// SEncoder encoder; -// tEncoderInit(&encoder, pBuf, tlen); -// if ((code = tEncodeStreamHbMsg(&encoder, &hbMsg)) < 0) { -// rpcFreeCont(buf); -// qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code)); -// return; -// } -// tEncoderClear(&encoder); -// -// SRpcMsg msg = {0}; -// initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen + sizeof(SMsgHead)); -// qDebug("vgId:%d, send hb to mnode", nodeId); -// -// tmsgSendReq(pEpSet, &msg); + ((SMsgHead*)buf)->vgId = htonl(pMeta->mgmtInfo.mnodeId); + void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + + SEncoder encoder; + tEncoderInit(&encoder, pBuf, tlen); + if ((code = tEncodeStreamHbMsg(&encoder, &hbMsg)) < 0) { + rpcFreeCont(buf); + qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code)); + return; + } + tEncoderClear(&encoder); + + SRpcMsg msg = {0}; + initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen + sizeof(SMsgHead)); + qDebug("vgId:%d, send hb to mnode", pMeta->mgmtInfo.mnodeId); + + tmsgSendReq(&pMeta->mgmtInfo.epset, &msg); } \ No newline at end of file