diff --git a/include/common/tmsg.h b/include/common/tmsg.h index b0ce6b42c9..7bc56daab0 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1127,6 +1127,7 @@ typedef struct { SQnodeLoad qload; SClusterCfg clusterCfg; SArray* pVloads; // array of SVnodeLoad + int32_t statusSeq; } SStatusReq; int32_t tSerializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq); @@ -1148,6 +1149,7 @@ typedef struct { int64_t dnodeVer; SDnodeCfg dnodeCfg; SArray* pDnodeEps; // Array of SDnodeEp + int32_t statusSeq; } SStatusRsp; int32_t tSerializeSStatusRsp(void* buf, int32_t bufLen, SStatusRsp* pRsp); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index ef6f9246a4..4eaa934676 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1020,6 +1020,7 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { if (tEncodeI64(&encoder, pReq->qload.timeInQueryQueue) < 0) return -1; if (tEncodeI64(&encoder, pReq->qload.timeInFetchQueue) < 0) return -1; + if (tEncodeI32(&encoder, pReq->statusSeq) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -1095,6 +1096,7 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) { if (tDecodeI64(&decoder, &pReq->qload.timeInQueryQueue) < 0) return -1; if (tDecodeI64(&decoder, &pReq->qload.timeInFetchQueue) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->statusSeq) < 0) return -1; tEndDecode(&decoder); tDecoderClear(&decoder); return 0; @@ -1126,6 +1128,7 @@ int32_t tSerializeSStatusRsp(void *buf, int32_t bufLen, SStatusRsp *pRsp) { if (tEncodeU16(&encoder, pDnodeEp->ep.port) < 0) return -1; } + if (tEncodeI32(&encoder, pRsp->statusSeq) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -1167,6 +1170,7 @@ int32_t tDeserializeSStatusRsp(void *buf, int32_t bufLen, SStatusRsp *pRsp) { } } + if (tDecodeI32(&decoder, &pRsp->statusSeq) < 0) return -1; tEndDecode(&decoder); tDecoderClear(&decoder); return 0; diff --git a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h index dc4412b77b..c776beb3f0 100644 --- a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h +++ b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h @@ -36,6 +36,7 @@ typedef struct SDnodeMgmt { GetVnodeLoadsFp getVnodeLoadsFp; GetMnodeLoadsFp getMnodeLoadsFp; GetQnodeLoadsFp getQnodeLoadsFp; + int32_t statusSeq; } SDnodeMgmt; // dmHandle.c diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index f12dce5149..85a09b79fd 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -32,9 +32,13 @@ static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) { } static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) { + const STraceId *trace = &pRsp->info.traceId; + dGTrace("status rsp received from mnode, statusSeq:%d code:0x%x", pMgmt->statusSeq, pRsp->code); + if (pRsp->code != 0) { if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->pData->dropped && pMgmt->pData->dnodeId > 0) { - dInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->pData->dnodeId); + dGInfo("dnode:%d, set to dropped since not exist in mnode, statusSeq:%d", pMgmt->pData->dnodeId, + pMgmt->statusSeq); pMgmt->pData->dropped = 1; dmWriteEps(pMgmt->pData); } @@ -42,9 +46,9 @@ static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) { SStatusRsp statusRsp = {0}; if (pRsp->pCont != NULL && pRsp->contLen > 0 && tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) { - dTrace("status msg received from mnode, dnodeVer:%" PRId64 " saved:%" PRId64, statusRsp.dnodeVer, - pMgmt->pData->dnodeVer); if (pMgmt->pData->dnodeVer != statusRsp.dnodeVer) { + dGInfo("status rsp received from mnode, statusSeq:%d:%d dnodeVer:%" PRId64 ":%" PRId64, pMgmt->statusSeq, + statusRsp.statusSeq, pMgmt->pData->dnodeVer, statusRsp.dnodeVer); pMgmt->pData->dnodeVer = statusRsp.dnodeVer; dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg); dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps); @@ -91,6 +95,9 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { (*pMgmt->getQnodeLoadsFp)(&req.qload); + pMgmt->statusSeq++; + req.statusSeq = pMgmt->statusSeq; + int32_t contLen = tSerializeSStatusReq(NULL, 0, &req); void *pHead = rpcMallocCont(contLen); tSerializeSStatusReq(pHead, contLen, &req); @@ -99,13 +106,13 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_STATUS, .info.ahandle = (void *)0x9527}; SRpcMsg rpcRsp = {0}; - dTrace("send status msg to mnode, dnodeVer:%" PRId64, req.dnodeVer); + dTrace("send status req to mnode, dnodeVer:%" PRId64 " statusSeq:%d", req.dnodeVer, req.statusSeq); SEpSet epSet = {0}; dmGetMnodeEpSet(pMgmt->pData, &epSet); rpcSendRecv(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, &rpcRsp); if (rpcRsp.code != 0) { - dError("failed to send status msg since %s, numOfEps:%d inUse:%d", tstrerror(rpcRsp.code), epSet.numOfEps, + dError("failed to send status req since %s, numOfEps:%d inUse:%d", tstrerror(rpcRsp.code), epSet.numOfEps, epSet.inUse); for (int32_t i = 0; i < epSet.numOfEps; ++i) { dDebug("index:%d, mnode ep:%s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port); diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 9d91420656..04f340b0ff 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -345,6 +345,19 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { } } + int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE); + int64_t curMs = taosGetTimestampMs(); + bool online = mndIsDnodeOnline(pDnode, curMs); + bool dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer); + bool reboot = (pDnode->rebootTime != statusReq.rebootTime); + bool needCheck = !online || dnodeChanged || reboot; + + pDnode->accessTimes++; + pDnode->lastAccessTime = curMs; + const STraceId *trace = &pReq->info.traceId; + mGTrace("dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d", pDnode->id, + pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq); + for (int32_t v = 0; v < taosArrayGetSize(statusReq.pVloads); ++v) { SVnodeLoad *pVload = taosArrayGet(statusReq.pVloads, v); @@ -396,18 +409,6 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { mndReleaseQnode(pMnode, pQnode); } - int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE); - int64_t curMs = taosGetTimestampMs(); - bool online = mndIsDnodeOnline(pDnode, curMs); - bool dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer); - bool reboot = (pDnode->rebootTime != statusReq.rebootTime); - bool needCheck = !online || dnodeChanged || reboot; - - pDnode->accessTimes++; - pDnode->lastAccessTime = curMs; - mTrace("dnode:%d, status received, access times:%d check:%d online:%d reboot:%d changed:%d", pDnode->id, - pDnode->accessTimes, needCheck, online, reboot, dnodeChanged); - if (needCheck) { if (statusReq.sver != tsVersion) { if (pDnode != NULL) { @@ -455,6 +456,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) { pDnode->memTotal = statusReq.memTotal; SStatusRsp statusRsp = {0}; + statusRsp.statusSeq++; statusRsp.dnodeVer = dnodeVer; statusRsp.dnodeCfg.dnodeId = pDnode->id; statusRsp.dnodeCfg.clusterId = pMnode->clusterId;