diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 6b880582ce..e9047af7e3 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -614,7 +614,6 @@ typedef struct { typedef struct { int32_t vgId; int8_t role; - int8_t align[3]; int64_t totalStorage; int64_t compStorage; int64_t pointsWritten; @@ -622,24 +621,23 @@ typedef struct { } SVnodeLoad; typedef struct { - int32_t num; - SVnodeLoad data[]; -} SVnodeLoads; - -typedef struct { - int32_t sver; + int32_t mver; // msg version + int32_t sver; // software version + int64_t dver; // dnode table version in sdb int32_t dnodeId; int64_t clusterId; - int64_t dver; int64_t rebootTime; int64_t updateTime; int32_t numOfCores; int32_t numOfSupportVnodes; char dnodeEp[TSDB_EP_LEN]; SClusterCfg clusterCfg; - SVnodeLoads vnodeLoads; + SArray* pVloads; // array of SVnodeLoad } SStatusReq; +int32_t tSerializeSStatusReq(void** buf, SStatusReq* pReq); +void* tDeserializeSStatusReq(void* buf, SStatusReq* pReq); + typedef struct { int32_t reserved; } STransReq; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index e4e9c62137..562e08635d 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -459,3 +459,87 @@ void *tDeserializeSMAlterStbReq(void *buf, SMAltertbReq *pReq) { return buf; } + +int32_t tSerializeSStatusReq(void **buf, SStatusReq *pReq) { + int32_t tlen = 0; + + // status + tlen += taosEncodeFixedI32(buf, pReq->mver); + tlen += taosEncodeFixedI32(buf, pReq->sver); + tlen += taosEncodeFixedI64(buf, pReq->dver); + tlen += taosEncodeFixedI32(buf, pReq->dnodeId); + tlen += taosEncodeFixedI64(buf, pReq->clusterId); + tlen += taosEncodeFixedI64(buf, pReq->rebootTime); + tlen += taosEncodeFixedI64(buf, pReq->updateTime); + tlen += taosEncodeFixedI32(buf, pReq->numOfCores); + tlen += taosEncodeFixedI32(buf, pReq->numOfSupportVnodes); + tlen += taosEncodeString(buf, pReq->dnodeEp); + + // cluster cfg + tlen += taosEncodeFixedI32(buf, pReq->clusterCfg.statusInterval); + tlen += taosEncodeFixedI64(buf, pReq->clusterCfg.checkTime); + tlen += taosEncodeString(buf, pReq->clusterCfg.timezone); + tlen += taosEncodeString(buf, pReq->clusterCfg.locale); + tlen += taosEncodeString(buf, pReq->clusterCfg.charset); + + // vnode loads + int32_t vlen = (int32_t)taosArrayGetSize(pReq->pVloads); + tlen += taosEncodeFixedI32(buf, vlen); + for (int32_t i = 0; i < vlen; ++i) { + SVnodeLoad *pload = taosArrayGet(pReq->pVloads, i); + tlen += taosEncodeFixedI32(buf, pload->vgId); + tlen += taosEncodeFixedI8(buf, pload->role); + tlen += taosEncodeFixedI64(buf, pload->totalStorage); + tlen += taosEncodeFixedI64(buf, pload->compStorage); + tlen += taosEncodeFixedI64(buf, pload->pointsWritten); + tlen += taosEncodeFixedI64(buf, pload->tablesNum); + } + + return tlen; +} + +void *tDeserializeSStatusReq(void *buf, SStatusReq *pReq) { + // status + buf = taosDecodeFixedI32(buf, &pReq->mver); + buf = taosDecodeFixedI32(buf, &pReq->sver); + buf = taosDecodeFixedI64(buf, &pReq->dver); + buf = taosDecodeFixedI32(buf, &pReq->dnodeId); + buf = taosDecodeFixedI64(buf, &pReq->clusterId); + buf = taosDecodeFixedI64(buf, &pReq->rebootTime); + buf = taosDecodeFixedI64(buf, &pReq->updateTime); + buf = taosDecodeFixedI32(buf, &pReq->numOfCores); + buf = taosDecodeFixedI32(buf, &pReq->numOfSupportVnodes); + buf = taosDecodeStringTo(buf, pReq->dnodeEp); + + // cluster cfg + buf = taosDecodeFixedI32(buf, &pReq->clusterCfg.statusInterval); + buf = taosDecodeFixedI64(buf, &pReq->clusterCfg.checkTime); + buf = taosDecodeStringTo(buf, pReq->clusterCfg.timezone); + buf = taosDecodeStringTo(buf, pReq->clusterCfg.locale); + buf = taosDecodeStringTo(buf, pReq->clusterCfg.charset); + + // vnode loads + int32_t vlen = 0; + buf = taosDecodeFixedI32(buf, &vlen); + pReq->pVloads = taosArrayInit(vlen, sizeof(SVnodeLoad)); + if (pReq->pVloads == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + for (int32_t i = 0; i < vlen; ++i) { + SVnodeLoad vload = {0}; + buf = taosDecodeFixedI32(buf, &vload.vgId); + buf = taosDecodeFixedI8(buf, &vload.role); + buf = taosDecodeFixedI64(buf, &vload.totalStorage); + buf = taosDecodeFixedI64(buf, &vload.compStorage); + buf = taosDecodeFixedI64(buf, &vload.pointsWritten); + buf = taosDecodeFixedI64(buf, &vload.tablesNum); + if (taosArrayPush(pReq->pVloads, &vload) == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + } + + return buf; +} diff --git a/source/dnode/mgmt/impl/inc/dndVnodes.h b/source/dnode/mgmt/impl/inc/dndVnodes.h index f5177778ec..895e94060f 100644 --- a/source/dnode/mgmt/impl/inc/dndVnodes.h +++ b/source/dnode/mgmt/impl/inc/dndVnodes.h @@ -23,7 +23,7 @@ extern "C" { int32_t dndInitVnodes(SDnode *pDnode); void dndCleanupVnodes(SDnode *pDnode); -void dndGetVnodeLoads(SDnode *pDnode, SVnodeLoads *pVloads); +void dndGetVnodeLoads(SDnode *pDnode, SArray *pLoads); void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); diff --git a/source/dnode/mgmt/impl/src/dndMgmt.c b/source/dnode/mgmt/impl/src/dndMgmt.c index 0674d719b9..1c7ea224fa 100644 --- a/source/dnode/mgmt/impl/src/dndMgmt.c +++ b/source/dnode/mgmt/impl/src/dndMgmt.c @@ -355,40 +355,38 @@ static int32_t dndWriteDnodes(SDnode *pDnode) { } void dndSendStatusReq(SDnode *pDnode) { - int32_t contLen = sizeof(SStatusReq) + TSDB_MAX_VNODES * sizeof(SVnodeLoad); - - SStatusReq *pStatus = rpcMallocCont(contLen); - if (pStatus == NULL) { - dError("failed to malloc status message"); - return; - } + SStatusReq req = {0}; SDnodeMgmt *pMgmt = &pDnode->dmgmt; taosRLockLatch(&pMgmt->latch); - pStatus->sver = htonl(pDnode->env.sver); - pStatus->dver = htobe64(pMgmt->dver); - pStatus->dnodeId = htonl(pMgmt->dnodeId); - pStatus->clusterId = htobe64(pMgmt->clusterId); - pStatus->rebootTime = htobe64(pMgmt->rebootTime); - pStatus->updateTime = htobe64(pMgmt->updateTime); - pStatus->numOfCores = htonl(pDnode->env.numOfCores); - pStatus->numOfSupportVnodes = htonl(pDnode->cfg.numOfSupportVnodes); - tstrncpy(pStatus->dnodeEp, pDnode->cfg.localEp, TSDB_EP_LEN); + req.sver = pDnode->env.sver; + req.dver = pMgmt->dver; + req.dnodeId = pMgmt->dnodeId; + req.clusterId = pMgmt->clusterId; + req.rebootTime = pMgmt->rebootTime; + req.updateTime = pMgmt->updateTime; + req.numOfCores = pDnode->env.numOfCores; + req.numOfSupportVnodes = pDnode->cfg.numOfSupportVnodes; + memcpy(req.dnodeEp, pDnode->cfg.localEp, TSDB_EP_LEN); - pStatus->clusterCfg.statusInterval = htonl(pDnode->cfg.statusInterval); - pStatus->clusterCfg.checkTime = 0; + req.clusterCfg.statusInterval = pDnode->cfg.statusInterval; + req.clusterCfg.checkTime = 0; char timestr[32] = "1970-01-01 00:00:00.00"; - (void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); - pStatus->clusterCfg.checkTime = htonl(pStatus->clusterCfg.checkTime); - tstrncpy(pStatus->clusterCfg.timezone, pDnode->env.timezone, TSDB_TIMEZONE_LEN); - tstrncpy(pStatus->clusterCfg.locale, pDnode->env.locale, TSDB_LOCALE_LEN); - tstrncpy(pStatus->clusterCfg.charset, pDnode->env.charset, TSDB_LOCALE_LEN); + (void)taosParseTime(timestr, &req.clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); + memcpy(req.clusterCfg.timezone, pDnode->env.timezone, TSDB_TIMEZONE_LEN); + memcpy(req.clusterCfg.locale, pDnode->env.locale, TSDB_LOCALE_LEN); + memcpy(req.clusterCfg.charset, pDnode->env.charset, TSDB_LOCALE_LEN); taosRUnLockLatch(&pMgmt->latch); - dndGetVnodeLoads(pDnode, &pStatus->vnodeLoads); - contLen = sizeof(SStatusReq) + pStatus->vnodeLoads.num * sizeof(SVnodeLoad); + req.pVloads = taosArrayInit(TSDB_MAX_VNODES, sizeof(SVnodeLoad)); + dndGetVnodeLoads(pDnode, req.pVloads); - SRpcMsg rpcMsg = {.pCont = pStatus, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = (void *)9527}; + int32_t contLen = tSerializeSStatusReq(NULL, &req); + void *pHead = rpcMallocCont(contLen); + void *pBuf = pHead; + tSerializeSStatusReq(&pBuf, &req); + + SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = (void *)9527}; pMgmt->statusSent = 1; dTrace("pDnode:%p, send status req to mnode", pDnode); diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index c4d14ef697..78eab3151f 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -1008,27 +1008,21 @@ void dndCleanupVnodes(SDnode *pDnode) { dInfo("dnode-vnodes is cleaned up"); } -void dndGetVnodeLoads(SDnode *pDnode, SVnodeLoads *pLoads) { +void dndGetVnodeLoads(SDnode *pDnode, SArray *pLoads) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; taosRLockLatch(&pMgmt->latch); - pLoads->num = taosHashGetSize(pMgmt->hash); int32_t v = 0; - void * pIter = taosHashIterate(pMgmt->hash, NULL); + void *pIter = taosHashIterate(pMgmt->hash, NULL); while (pIter) { SVnodeObj **ppVnode = pIter; if (ppVnode == NULL || *ppVnode == NULL) continue; - SVnodeObj * pVnode = *ppVnode; - SVnodeLoad *pLoad = &pLoads->data[v++]; - - vnodeGetLoad(pVnode->pImpl, pLoad); - pLoad->vgId = htonl(pLoad->vgId); - pLoad->totalStorage = htobe64(pLoad->totalStorage); - pLoad->compStorage = htobe64(pLoad->compStorage); - pLoad->pointsWritten = htobe64(pLoad->pointsWritten); - pLoad->tablesNum = htobe64(pLoad->tablesNum); + SVnodeObj *pVnode = *ppVnode; + SVnodeLoad vload = {0}; + vnodeGetLoad(pVnode->pImpl, &vload); + taosArrayPush(pLoads, &vload); pIter = taosHashIterate(pMgmt->hash, pIter); } diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 3b43cd8081..811e22b92d 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -299,50 +299,29 @@ static int32_t mndCheckClusterCfgPara(SMnode *pMnode, const SClusterCfg *pCfg) { return 0; } -static void mndParseStatusMsg(SStatusReq *pStatus) { - pStatus->sver = htonl(pStatus->sver); - pStatus->dver = htobe64(pStatus->dver); - pStatus->dnodeId = htonl(pStatus->dnodeId); - pStatus->clusterId = htobe64(pStatus->clusterId); - pStatus->rebootTime = htobe64(pStatus->rebootTime); - pStatus->updateTime = htobe64(pStatus->updateTime); - pStatus->numOfCores = htonl(pStatus->numOfCores); - pStatus->numOfSupportVnodes = htonl(pStatus->numOfSupportVnodes); - pStatus->clusterCfg.statusInterval = htonl(pStatus->clusterCfg.statusInterval); - pStatus->clusterCfg.checkTime = htobe64(pStatus->clusterCfg.checkTime); - for (int32_t v = 0; v < pStatus->vnodeLoads.num; ++v) { - SVnodeLoad *pVload = &pStatus->vnodeLoads.data[v]; - pVload->vgId = htonl(pVload->vgId); - pVload->totalStorage = htobe64(pVload->totalStorage); - pVload->compStorage = htobe64(pVload->compStorage); - pVload->pointsWritten = htobe64(pVload->pointsWritten); - pVload->tablesNum = htobe64(pVload->tablesNum); - } -} - static int32_t mndProcessStatusReq(SMnodeMsg *pReq) { - SMnode *pMnode = pReq->pMnode; - SStatusReq *pStatus = pReq->rpcMsg.pCont; - SDnodeObj *pDnode = NULL; - int32_t code = -1; + SMnode *pMnode = pReq->pMnode; + SStatusReq statusReq = {0}; + SDnodeObj *pDnode = NULL; + int32_t code = -1; - mndParseStatusMsg(pStatus); + if (tDeserializeSStatusReq(pReq->rpcMsg.pCont, &statusReq) == NULL) goto PROCESS_STATUS_MSG_OVER; - if (pStatus->dnodeId == 0) { - pDnode = mndAcquireDnodeByEp(pMnode, pStatus->dnodeEp); + if (statusReq.dnodeId == 0) { + pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp); if (pDnode == NULL) { - mDebug("dnode:%s, not created yet", pStatus->dnodeEp); + mDebug("dnode:%s, not created yet", statusReq.dnodeEp); terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; goto PROCESS_STATUS_MSG_OVER; } } else { - pDnode = mndAcquireDnode(pMnode, pStatus->dnodeId); + pDnode = mndAcquireDnode(pMnode, statusReq.dnodeId); if (pDnode == NULL) { - pDnode = mndAcquireDnodeByEp(pMnode, pStatus->dnodeEp); + pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp); if (pDnode != NULL) { pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH; } - mError("dnode:%d, %s not exist", pStatus->dnodeId, pStatus->dnodeEp); + mError("dnode:%d, %s not exist", statusReq.dnodeId, statusReq.dnodeEp); terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; goto PROCESS_STATUS_MSG_OVER; } @@ -350,28 +329,28 @@ static int32_t mndProcessStatusReq(SMnodeMsg *pReq) { int64_t curMs = taosGetTimestampMs(); bool online = mndIsDnodeOnline(pMnode, pDnode, curMs); - bool dnodeChanged = (pStatus->dver != sdbGetTableVer(pMnode->pSdb, SDB_DNODE)); - bool reboot = (pDnode->rebootTime != pStatus->rebootTime); + bool dnodeChanged = (statusReq.dver != sdbGetTableVer(pMnode->pSdb, SDB_DNODE)); + bool reboot = (pDnode->rebootTime != statusReq.rebootTime); bool needCheck = !online || dnodeChanged || reboot; if (needCheck) { - if (pStatus->sver != pMnode->cfg.sver) { + if (statusReq.sver != pMnode->cfg.sver) { if (pDnode != NULL) { pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH; } - mError("dnode:%d, status msg version:%d not match cluster:%d", pStatus->dnodeId, pStatus->sver, pMnode->cfg.sver); + mError("dnode:%d, status msg version:%d not match cluster:%d", statusReq.dnodeId, statusReq.sver, pMnode->cfg.sver); terrno = TSDB_CODE_MND_INVALID_MSG_VERSION; goto PROCESS_STATUS_MSG_OVER; } - if (pStatus->dnodeId == 0) { + if (statusReq.dnodeId == 0) { mDebug("dnode:%d, %s first access, set clusterId %" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId); } else { - if (pStatus->clusterId != pMnode->clusterId) { + if (statusReq.clusterId != pMnode->clusterId) { if (pDnode != NULL) { pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH; } - mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, pStatus->clusterId, + mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, statusReq.clusterId, pMnode->clusterId); terrno = TSDB_CODE_MND_INVALID_CLUSTER_ID; goto PROCESS_STATUS_MSG_OVER; @@ -382,7 +361,7 @@ static int32_t mndProcessStatusReq(SMnodeMsg *pReq) { } // Verify whether the cluster parameters are consistent when status change from offline to ready - int32_t ret = mndCheckClusterCfgPara(pMnode, &pStatus->clusterCfg); + int32_t ret = mndCheckClusterCfgPara(pMnode, &statusReq.clusterCfg); if (0 != ret) { pDnode->offlineReason = ret; mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[ret]); @@ -396,9 +375,9 @@ static int32_t mndProcessStatusReq(SMnodeMsg *pReq) { mDebug("dnode:%d, send dnode eps", pDnode->id); } - pDnode->rebootTime = pStatus->rebootTime; - pDnode->numOfCores = pStatus->numOfCores; - pDnode->numOfSupportVnodes = pStatus->numOfSupportVnodes; + pDnode->rebootTime = statusReq.rebootTime; + pDnode->numOfCores = statusReq.numOfCores; + pDnode->numOfSupportVnodes = statusReq.numOfSupportVnodes; int32_t numOfEps = mndGetDnodeSize(pMnode); int32_t contLen = sizeof(SStatusRsp) + numOfEps * sizeof(SDnodeEp);