enh(stream): update the hb info.

This commit is contained in:
Haojun Liao 2023-07-28 14:23:53 +08:00
parent 74834aea11
commit 9f943213ed
5 changed files with 32 additions and 17 deletions

View File

@ -352,6 +352,11 @@ struct SStreamTask {
SSHashObj* pNameMap; SSHashObj* pNameMap;
}; };
typedef struct SMgmtInfo {
SEpSet epset;
int32_t mnodeId;
} SMgmtInfo;
// meta // meta
typedef struct SStreamMeta { typedef struct SStreamMeta {
char* path; char* path;
@ -371,6 +376,7 @@ typedef struct SStreamMeta {
SHashObj* pTaskBackendUnique; SHashObj* pTaskBackendUnique;
TdThreadMutex backendMutex; TdThreadMutex backendMutex;
tmr_h hbTmr; tmr_h hbTmr;
SMgmtInfo mgmtInfo;
int32_t chkptNotReadyTasks; int32_t chkptNotReadyTasks;
SArray* checkpointSaved; SArray* checkpointSaved;
@ -484,6 +490,7 @@ typedef struct {
int64_t checkpointId; int64_t checkpointId;
int32_t taskId; int32_t taskId;
int32_t nodeId; int32_t nodeId;
SEpSet mgmtEps;
int32_t mnodeId; int32_t mnodeId;
int64_t expireTime; int64_t expireTime;
} SStreamCheckpointSourceReq; } SStreamCheckpointSourceReq;

View File

@ -1668,6 +1668,11 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) {
// set the initial value for generating check point // set the initial value for generating check point
int32_t total = 0; int32_t total = 0;
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
// set the mgmt epset info according to the checkout source msg from mnode, todo opt perf
pMeta->mgmtInfo.epset = req.mgmtEps;
pMeta->mgmtInfo.mnodeId = req.mnodeId;
if (pMeta->chkptNotReadyTasks == 0) { if (pMeta->chkptNotReadyTasks == 0) {
pMeta->chkptNotReadyTasks = taosArrayGetSize(pMeta->pTaskList); pMeta->chkptNotReadyTasks = taosArrayGetSize(pMeta->pTaskList);
} }

View File

@ -21,6 +21,7 @@ int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckp
if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pReq->mgmtEps) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->mnodeId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->mnodeId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->expireTime) < 0) return -1; if (tEncodeI64(pEncoder, pReq->expireTime) < 0) return -1;
tEndEncode(pEncoder); tEndEncode(pEncoder);
@ -33,6 +34,7 @@ int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSo
if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pReq->mgmtEps) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->mnodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->mnodeId) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->expireTime) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->expireTime) < 0) return -1;
tEndDecode(pDecoder); tEndDecode(pDecoder);

View File

@ -16,6 +16,7 @@
#include "streamInt.h" #include "streamInt.h"
#include "trpc.h" #include "trpc.h"
#include "ttimer.h" #include "ttimer.h"
#include "tmisce.h"
#define MAX_BLOCK_NAME_NUM 1024 #define MAX_BLOCK_NAME_NUM 1024
#define DISPATCH_RETRY_INTERVAL_MS 300 #define DISPATCH_RETRY_INTERVAL_MS 300

View File

@ -542,21 +542,21 @@ void metaHbToMnode(void* param, void* tmrId) {
return; return;
} }
// ((SMsgHead*)buf)->vgId = htonl(nodeId); ((SMsgHead*)buf)->vgId = htonl(pMeta->mgmtInfo.mnodeId);
// void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); void* pBuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
//
// SEncoder encoder; SEncoder encoder;
// tEncoderInit(&encoder, pBuf, tlen); tEncoderInit(&encoder, pBuf, tlen);
// if ((code = tEncodeStreamHbMsg(&encoder, &hbMsg)) < 0) { if ((code = tEncodeStreamHbMsg(&encoder, &hbMsg)) < 0) {
// rpcFreeCont(buf); rpcFreeCont(buf);
// qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code)); qError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code));
// return; return;
// } }
// tEncoderClear(&encoder); tEncoderClear(&encoder);
//
// SRpcMsg msg = {0}; SRpcMsg msg = {0};
// initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen + sizeof(SMsgHead)); initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen + sizeof(SMsgHead));
// qDebug("vgId:%d, send hb to mnode", nodeId); qDebug("vgId:%d, send hb to mnode", pMeta->mgmtInfo.mnodeId);
//
// tmsgSendReq(pEpSet, &msg); tmsgSendReq(&pMeta->mgmtInfo.epset, &msg);
} }