feature/qnode
This commit is contained in:
parent
4366d24c2f
commit
77898958bb
|
@ -160,5 +160,6 @@ void mmInitMsgHandles(SMgmtWrapper *pWrapper) {
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)mmProcessReadMsg, MND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_CONTINUE, (NodeMsgFp)mmProcessReadMsg, MND_VGID);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)mmProcessReadMsg, MND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_FETCH, (NodeMsgFp)mmProcessReadMsg, MND_VGID);
|
||||||
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, (NodeMsgFp)mmProcessReadMsg, MND_VGID);
|
dndSetMsgHandle(pWrapper, TDMT_VND_DROP_TASK, (NodeMsgFp)mmProcessReadMsg, MND_VGID);
|
||||||
|
dndSetMsgHandle(pWrapper, TDMT_VND_QUERY_HEARTBEAT, (NodeMsgFp)mmProcessReadMsg, MND_VGID);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -92,8 +92,9 @@ int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) {
|
||||||
|
|
||||||
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
|
int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
|
||||||
SSingleWorkerCfg cfg = {.minNum = 0, .maxNum = 1, .name = "mnode-read", .fp = (FItem)mmProcessQueue, .param = pMgmt};
|
SSingleWorkerCfg cfg = {.minNum = 0, .maxNum = 1, .name = "mnode-read", .fp = (FItem)mmProcessQueue, .param = pMgmt};
|
||||||
|
SSingleWorkerCfg readCfg = {.minNum = 2, .maxNum = 2, .name = "mnode-read", .fp = (FItem)mmProcessQueue, .param = pMgmt};
|
||||||
|
|
||||||
if (tSingleWorkerInit(&pMgmt->readWorker, &cfg) != 0) {
|
if (tSingleWorkerInit(&pMgmt->readWorker, &readCfg) != 0) {
|
||||||
dError("failed to start mnode-read worker since %s", terrstr());
|
dError("failed to start mnode-read worker since %s", terrstr());
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1626,8 +1626,9 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId) {
|
||||||
SRpcCtx rpcCtx = {0};
|
SRpcCtx rpcCtx = {0};
|
||||||
SSchTrans trans = {0};
|
SSchTrans trans = {0};
|
||||||
int32_t msgType = TDMT_VND_QUERY_HEARTBEAT;
|
int32_t msgType = TDMT_VND_QUERY_HEARTBEAT;
|
||||||
|
|
||||||
|
req.header.vgId = htonl(nodeEpId->nodeId);
|
||||||
req.sId = schMgmt.sId;
|
req.sId = schMgmt.sId;
|
||||||
req.header.vgId = nodeEpId->nodeId;
|
|
||||||
memcpy(&req.epId, nodeEpId, sizeof(SQueryNodeEpId));
|
memcpy(&req.epId, nodeEpId, sizeof(SQueryNodeEpId));
|
||||||
|
|
||||||
SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, nodeEpId, sizeof(SQueryNodeEpId));
|
SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, nodeEpId, sizeof(SQueryNodeEpId));
|
||||||
|
|
Loading…
Reference in New Issue