enh(stream): add function handler for hb message.
This commit is contained in:
parent
af3a87560a
commit
a0dcec5890
|
@ -54,6 +54,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
|
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq);
|
static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq);
|
static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq);
|
||||||
|
static int32_t mndProcessStreamHb(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq);
|
static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessStreamMetaReq(SRpcMsg *pReq);
|
static int32_t mndProcessStreamMetaReq(SRpcMsg *pReq);
|
||||||
static int32_t mndGetStreamMeta(SRpcMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
|
static int32_t mndGetStreamMeta(SRpcMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
|
||||||
|
@ -92,6 +93,7 @@ int32_t mndInitStream(SMnode *pMnode) {
|
||||||
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp);
|
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr);
|
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHECKPOINT_TIMER, mndProcessStreamCheckpointTmr);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint);
|
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamDoCheckpoint);
|
||||||
|
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
|
||||||
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp);
|
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp);
|
||||||
|
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq);
|
||||||
|
|
|
@ -93,7 +93,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
||||||
pMeta->expandFunc = expandFunc;
|
pMeta->expandFunc = expandFunc;
|
||||||
|
|
||||||
// send heartbeat every 20sec.
|
// send heartbeat every 20sec.
|
||||||
// pMeta->hbTmr = taosTmrStart(metaHbToMnode, 20000, pMeta, streamEnv.timer);
|
// pMeta->hbTmr = taosTmrStart(metaHbToMnode, 20000, ahandle, streamEnv.timer);
|
||||||
|
|
||||||
pMeta->pTaskBackendUnique =
|
pMeta->pTaskBackendUnique =
|
||||||
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
||||||
|
@ -549,9 +549,17 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void metaHbToMnode(void* param, void* tmrId) {
|
void metaHbToMnode(void* param, void* tmrId) {
|
||||||
SStreamMeta* pMeta = param;
|
STQ* pMeta = param;
|
||||||
SStreamHbMsg hbMsg = {0};
|
SStreamHbMsg hbMsg = {0};
|
||||||
|
|
||||||
|
taosRLockLatch(&pMeta->lock);
|
||||||
|
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
||||||
|
taosRUnLockLatch(&pMeta->lock);
|
||||||
|
|
||||||
|
hbMsg.numOfTasks = numOfTasks;
|
||||||
|
hbMsg.vgId = pMeta->vgId;
|
||||||
|
hbMsg.epset = ;
|
||||||
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
|
|
||||||
|
@ -581,7 +589,7 @@ void metaHbToMnode(void* param, void* tmrId) {
|
||||||
|
|
||||||
SRpcMsg msg = {0};
|
SRpcMsg msg = {0};
|
||||||
initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen + sizeof(SMsgHead));
|
initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen + sizeof(SMsgHead));
|
||||||
qDebug("vgId:%d, send hb to mnode", pMeta->mgmtInfo.mnodeId);
|
qDebug("vgId:%d, build and send hb to mnode", pMeta->mgmtInfo.mnodeId);
|
||||||
|
|
||||||
tmsgSendReq(&pMeta->mgmtInfo.epset, &msg);
|
tmsgSendReq(&pMeta->mgmtInfo.epset, &msg);
|
||||||
}
|
}
|
Loading…
Reference in New Issue