diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 9407ce5efc..f47f895897 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -614,7 +614,6 @@ typedef struct { typedef struct { int32_t vgId; int8_t role; - int8_t align[3]; int64_t totalStorage; int64_t compStorage; int64_t pointsWritten; @@ -622,27 +621,22 @@ typedef struct { } SVnodeLoad; typedef struct { - int32_t num; - SVnodeLoad data[]; -} SVnodeLoads; - -typedef struct { - int32_t sver; + int32_t mver; // msg version + int32_t sver; // software version + int64_t dver; // dnode table version in sdb int32_t dnodeId; int64_t clusterId; - int64_t dver; int64_t rebootTime; int64_t updateTime; int32_t numOfCores; int32_t numOfSupportVnodes; char dnodeEp[TSDB_EP_LEN]; SClusterCfg clusterCfg; - SVnodeLoads vnodeLoads; + SArray* pVloads; // array of SVnodeLoad } SStatusReq; -typedef struct { - int32_t reserved; -} STransReq; +int32_t tSerializeSStatusReq(void** buf, SStatusReq* pReq); +void* tDeserializeSStatusReq(void* buf, SStatusReq* pReq); typedef struct { int32_t dnodeId; @@ -652,21 +646,23 @@ typedef struct { typedef struct { int32_t id; int8_t isMnode; - int8_t align; SEp ep; } SDnodeEp; typedef struct { - int32_t num; - SDnodeEp eps[]; -} SDnodeEps; - -typedef struct { + int32_t mver; int64_t dver; SDnodeCfg dnodeCfg; - SDnodeEps dnodeEps; + SArray* pDnodeEps; // Array of SDnodeEp } SStatusRsp; +int32_t tSerializeSStatusRsp(void** buf, SStatusRsp* pRsp); +void* tDeserializeSStatusRsp(void* buf, SStatusRsp* pRsp); + +typedef struct { + int32_t mver; +} STransReq; + typedef struct { int32_t id; uint16_t port; // node sync Port @@ -1207,8 +1203,6 @@ typedef struct { int32_t tSerializeSVCreateTbReq(void** buf, SVCreateTbReq* pReq); void* tDeserializeSVCreateTbReq(void* buf, SVCreateTbReq* pReq); -int32_t tSerializeSVCreateTbRsp(void** buf, SVCreateTbRsp* pRsp); -void* tDeserializeSVCreateTbRsp(void* buf, SVCreateTbRsp* pRsp); typedef struct { uint64_t ver; // use a general definition @@ -1220,8 +1214,6 @@ typedef struct { int32_t tSerializeSVCreateTbBatchReq(void** buf, SVCreateTbBatchReq* pReq); void* tDeserializeSVCreateTbBatchReq(void* buf, SVCreateTbBatchReq* pReq); -int32_t tSerializeSVCreateTbBatchRsp(void** buf, SVCreateTbBatchRsp* pRsp); -void* tDeserializeSVCreateTbBatchRsp(void* buf, SVCreateTbBatchRsp* pRsp); typedef struct { uint64_t ver; @@ -1235,8 +1227,6 @@ typedef struct { int32_t tSerializeSVDropTbReq(void** buf, SVDropTbReq* pReq); void* tDeserializeSVDropTbReq(void* buf, SVDropTbReq* pReq); -int32_t tSerializeSVDropTbRsp(void** buf, SVDropTbRsp* pRsp); -void* tDeserializeSVDropTbRsp(void* buf, SVDropTbRsp* pRsp); typedef struct { SMsgHead head; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index e4e9c62137..d9d3cca862 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -459,3 +459,145 @@ void *tDeserializeSMAlterStbReq(void *buf, SMAltertbReq *pReq) { return buf; } + +int32_t tSerializeSStatusReq(void **buf, SStatusReq *pReq) { + int32_t tlen = 0; + + // status + tlen += taosEncodeFixedI32(buf, pReq->mver); + tlen += taosEncodeFixedI32(buf, pReq->sver); + tlen += taosEncodeFixedI64(buf, pReq->dver); + tlen += taosEncodeFixedI32(buf, pReq->dnodeId); + tlen += taosEncodeFixedI64(buf, pReq->clusterId); + tlen += taosEncodeFixedI64(buf, pReq->rebootTime); + tlen += taosEncodeFixedI64(buf, pReq->updateTime); + tlen += taosEncodeFixedI32(buf, pReq->numOfCores); + tlen += taosEncodeFixedI32(buf, pReq->numOfSupportVnodes); + tlen += taosEncodeString(buf, pReq->dnodeEp); + + // cluster cfg + tlen += taosEncodeFixedI32(buf, pReq->clusterCfg.statusInterval); + tlen += taosEncodeFixedI64(buf, pReq->clusterCfg.checkTime); + tlen += taosEncodeString(buf, pReq->clusterCfg.timezone); + tlen += taosEncodeString(buf, pReq->clusterCfg.locale); + tlen += taosEncodeString(buf, pReq->clusterCfg.charset); + + // vnode loads + int32_t vlen = (int32_t)taosArrayGetSize(pReq->pVloads); + tlen += taosEncodeFixedI32(buf, vlen); + for (int32_t i = 0; i < vlen; ++i) { + SVnodeLoad *pload = taosArrayGet(pReq->pVloads, i); + tlen += taosEncodeFixedI32(buf, pload->vgId); + tlen += taosEncodeFixedI8(buf, pload->role); + tlen += taosEncodeFixedI64(buf, pload->totalStorage); + tlen += taosEncodeFixedI64(buf, pload->compStorage); + tlen += taosEncodeFixedI64(buf, pload->pointsWritten); + tlen += taosEncodeFixedI64(buf, pload->tablesNum); + } + + return tlen; +} + +void *tDeserializeSStatusReq(void *buf, SStatusReq *pReq) { + // status + buf = taosDecodeFixedI32(buf, &pReq->mver); + buf = taosDecodeFixedI32(buf, &pReq->sver); + buf = taosDecodeFixedI64(buf, &pReq->dver); + buf = taosDecodeFixedI32(buf, &pReq->dnodeId); + buf = taosDecodeFixedI64(buf, &pReq->clusterId); + buf = taosDecodeFixedI64(buf, &pReq->rebootTime); + buf = taosDecodeFixedI64(buf, &pReq->updateTime); + buf = taosDecodeFixedI32(buf, &pReq->numOfCores); + buf = taosDecodeFixedI32(buf, &pReq->numOfSupportVnodes); + buf = taosDecodeStringTo(buf, pReq->dnodeEp); + + // cluster cfg + buf = taosDecodeFixedI32(buf, &pReq->clusterCfg.statusInterval); + buf = taosDecodeFixedI64(buf, &pReq->clusterCfg.checkTime); + buf = taosDecodeStringTo(buf, pReq->clusterCfg.timezone); + buf = taosDecodeStringTo(buf, pReq->clusterCfg.locale); + buf = taosDecodeStringTo(buf, pReq->clusterCfg.charset); + + // vnode loads + int32_t vlen = 0; + buf = taosDecodeFixedI32(buf, &vlen); + pReq->pVloads = taosArrayInit(vlen, sizeof(SVnodeLoad)); + if (pReq->pVloads == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + for (int32_t i = 0; i < vlen; ++i) { + SVnodeLoad vload = {0}; + buf = taosDecodeFixedI32(buf, &vload.vgId); + buf = taosDecodeFixedI8(buf, &vload.role); + buf = taosDecodeFixedI64(buf, &vload.totalStorage); + buf = taosDecodeFixedI64(buf, &vload.compStorage); + buf = taosDecodeFixedI64(buf, &vload.pointsWritten); + buf = taosDecodeFixedI64(buf, &vload.tablesNum); + if (taosArrayPush(pReq->pVloads, &vload) == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + } + + return buf; +} + +int32_t tSerializeSStatusRsp(void **buf, SStatusRsp *pRsp) { + int32_t tlen = 0; + + // status + tlen += taosEncodeFixedI32(buf, pRsp->mver); + tlen += taosEncodeFixedI64(buf, pRsp->dver); + + // dnode cfg + tlen += taosEncodeFixedI32(buf, pRsp->dnodeCfg.dnodeId); + tlen += taosEncodeFixedI64(buf, pRsp->dnodeCfg.clusterId); + + // dnode eps + int32_t dlen = (int32_t)taosArrayGetSize(pRsp->pDnodeEps); + tlen += taosEncodeFixedI32(buf, dlen); + for (int32_t i = 0; i < dlen; ++i) { + SDnodeEp *pDnodeEp = taosArrayGet(pRsp->pDnodeEps, i); + tlen += taosEncodeFixedI32(buf, pDnodeEp->id); + tlen += taosEncodeFixedI8(buf, pDnodeEp->isMnode); + tlen += taosEncodeString(buf, pDnodeEp->ep.fqdn); + tlen += taosEncodeFixedU16(buf, pDnodeEp->ep.port); + } + + return tlen; +} + +void *tDeserializeSStatusRsp(void *buf, SStatusRsp *pRsp) { + // status + buf = taosDecodeFixedI32(buf, &pRsp->mver); + buf = taosDecodeFixedI64(buf, &pRsp->dver); + + // cluster cfg + buf = taosDecodeFixedI32(buf, &pRsp->dnodeCfg.dnodeId); + buf = taosDecodeFixedI64(buf, &pRsp->dnodeCfg.clusterId); + + // dnode eps + int32_t dlen = 0; + buf = taosDecodeFixedI32(buf, &dlen); + pRsp->pDnodeEps = taosArrayInit(dlen, sizeof(SDnodeEp)); + if (pRsp->pDnodeEps == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + for (int32_t i = 0; i < dlen; ++i) { + SDnodeEp dnodeEp = {0}; + buf = taosDecodeFixedI32(buf, &dnodeEp.id); + buf = taosDecodeFixedI8(buf, &dnodeEp.isMnode); + buf = taosDecodeStringTo(buf, dnodeEp.ep.fqdn); + buf = taosDecodeFixedU16(buf, &dnodeEp.ep.port); + if (taosArrayPush(pRsp->pDnodeEps, &dnodeEp) == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + } + + return buf; +} diff --git a/source/dnode/mgmt/impl/inc/dndEnv.h b/source/dnode/mgmt/impl/inc/dndEnv.h index 4214dca11d..9eff246323 100644 --- a/source/dnode/mgmt/impl/inc/dndEnv.h +++ b/source/dnode/mgmt/impl/inc/dndEnv.h @@ -55,7 +55,7 @@ typedef struct { SEpSet mnodeEpSet; char *file; SHashObj *dnodeHash; - SDnodeEps *dnodeEps; + SArray *pDnodeEps; pthread_t *threadId; SRWLatch latch; SDnodeWorker mgmtWorker; diff --git a/source/dnode/mgmt/impl/inc/dndVnodes.h b/source/dnode/mgmt/impl/inc/dndVnodes.h index f5177778ec..895e94060f 100644 --- a/source/dnode/mgmt/impl/inc/dndVnodes.h +++ b/source/dnode/mgmt/impl/inc/dndVnodes.h @@ -23,7 +23,7 @@ extern "C" { int32_t dndInitVnodes(SDnode *pDnode); void dndCleanupVnodes(SDnode *pDnode); -void dndGetVnodeLoads(SDnode *pDnode, SVnodeLoads *pVloads); +void dndGetVnodeLoads(SDnode *pDnode, SArray *pLoads); void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); diff --git a/source/dnode/mgmt/impl/src/dndMgmt.c b/source/dnode/mgmt/impl/src/dndMgmt.c index 0674d719b9..41dd54a0ff 100644 --- a/source/dnode/mgmt/impl/src/dndMgmt.c +++ b/source/dnode/mgmt/impl/src/dndMgmt.c @@ -113,35 +113,31 @@ static void dndUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) { static void dndPrintDnodes(SDnode *pDnode) { SDnodeMgmt *pMgmt = &pDnode->dmgmt; - dDebug("print dnode ep list, num:%d", pMgmt->dnodeEps->num); - for (int32_t i = 0; i < pMgmt->dnodeEps->num; i++) { - SDnodeEp *pEp = &pMgmt->dnodeEps->eps[i]; + int32_t numOfEps = (int32_t)taosArrayGetSize(pMgmt->pDnodeEps); + dDebug("print dnode ep list, num:%d", numOfEps); + for (int32_t i = 0; i < numOfEps; i++) { + SDnodeEp *pEp = taosArrayGet(pMgmt->pDnodeEps, i); dDebug("dnode:%d, fqdn:%s port:%u isMnode:%d", pEp->id, pEp->ep.fqdn, pEp->ep.port, pEp->isMnode); } } -static void dndResetDnodes(SDnode *pDnode, SDnodeEps *pDnodeEps) { +static void dndResetDnodes(SDnode *pDnode, SArray *pDnodeEps) { SDnodeMgmt *pMgmt = &pDnode->dmgmt; - int32_t size = sizeof(SDnodeEps) + pDnodeEps->num * sizeof(SDnodeEp); - if (pDnodeEps->num > pMgmt->dnodeEps->num) { - SDnodeEps *tmp = calloc(1, size); - if (tmp == NULL) return; - - tfree(pMgmt->dnodeEps); - pMgmt->dnodeEps = tmp; - } - - if (pMgmt->dnodeEps != pDnodeEps) { - memcpy(pMgmt->dnodeEps, pDnodeEps, size); + if (pMgmt->pDnodeEps != pDnodeEps) { + SArray *tmp = pMgmt->pDnodeEps; + pMgmt->pDnodeEps = taosArrayDup(pDnodeEps); + taosArrayDestroy(tmp); } pMgmt->mnodeEpSet.inUse = 0; pMgmt->mnodeEpSet.numOfEps = 0; int32_t mIndex = 0; - for (int32_t i = 0; i < pMgmt->dnodeEps->num; i++) { - SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i]; + int32_t numOfEps = (int32_t)taosArrayGetSize(pDnodeEps); + + for (int32_t i = 0; i < numOfEps; i++) { + SDnodeEp *pDnodeEp = taosArrayGet(pDnodeEps, i); if (!pDnodeEp->isMnode) continue; if (mIndex >= TSDB_MAX_REPLICA) continue; pMgmt->mnodeEpSet.numOfEps++; @@ -150,8 +146,8 @@ static void dndResetDnodes(SDnode *pDnode, SDnodeEps *pDnodeEps) { mIndex++; } - for (int32_t i = 0; i < pMgmt->dnodeEps->num; ++i) { - SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i]; + for (int32_t i = 0; i < numOfEps; i++) { + SDnodeEp *pDnodeEp = taosArrayGet(pDnodeEps, i); taosHashPut(pMgmt->dnodeHash, &pDnodeEp->id, sizeof(int32_t), pDnodeEp, sizeof(SDnodeEp)); } @@ -178,6 +174,12 @@ static bool dndIsEpChanged(SDnode *pDnode, int32_t dnodeId, char *pEp) { static int32_t dndReadDnodes(SDnode *pDnode) { SDnodeMgmt *pMgmt = &pDnode->dmgmt; + pMgmt->pDnodeEps = taosArrayInit(1, sizeof(SDnodeEp)); + if (pMgmt->pDnodeEps == NULL) { + dError("failed to calloc dnodeEp array since %s", strerror(errno)); + goto PRASE_DNODE_OVER; + } + int32_t code = TSDB_CODE_DND_DNODE_READ_FILE_ERROR; int32_t len = 0; int32_t maxLen = 256 * 1024; @@ -238,18 +240,11 @@ static int32_t dndReadDnodes(SDnode *pDnode) { goto PRASE_DNODE_OVER; } - pMgmt->dnodeEps = calloc(1, numOfDnodes * sizeof(SDnodeEp) + sizeof(SDnodeEps)); - if (pMgmt->dnodeEps == NULL) { - dError("failed to calloc dnodeEpList since %s", strerror(errno)); - goto PRASE_DNODE_OVER; - } - pMgmt->dnodeEps->num = numOfDnodes; - for (int32_t i = 0; i < numOfDnodes; ++i) { cJSON *node = cJSON_GetArrayItem(dnodes, i); if (node == NULL) break; - SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i]; + SDnodeEp dnodeEp = {0}; cJSON *did = cJSON_GetObjectItem(node, "id"); if (!did || did->type != cJSON_Number) { @@ -257,14 +252,14 @@ static int32_t dndReadDnodes(SDnode *pDnode) { goto PRASE_DNODE_OVER; } - pDnodeEp->id = dnodeId->valueint; + dnodeEp.id = dnodeId->valueint; cJSON *dnodeFqdn = cJSON_GetObjectItem(node, "fqdn"); if (!dnodeFqdn || dnodeFqdn->type != cJSON_String || dnodeFqdn->valuestring == NULL) { dError("failed to read %s since dnodeFqdn not found", pMgmt->file); goto PRASE_DNODE_OVER; } - tstrncpy(pDnodeEp->ep.fqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN); + tstrncpy(dnodeEp.ep.fqdn, dnodeFqdn->valuestring, TSDB_FQDN_LEN); cJSON *dnodePort = cJSON_GetObjectItem(node, "port"); if (!dnodePort || dnodePort->type != cJSON_Number) { @@ -272,14 +267,16 @@ static int32_t dndReadDnodes(SDnode *pDnode) { goto PRASE_DNODE_OVER; } - pDnodeEp->ep.port = dnodePort->valueint; + dnodeEp.ep.port = dnodePort->valueint; cJSON *isMnode = cJSON_GetObjectItem(node, "isMnode"); if (!isMnode || isMnode->type != cJSON_Number) { dError("failed to read %s since isMnode not found", pMgmt->file); goto PRASE_DNODE_OVER; } - pDnodeEp->isMnode = isMnode->valueint; + dnodeEp.isMnode = isMnode->valueint; + + taosArrayPush(pMgmt->pDnodeEps, &dnodeEp); } code = 0; @@ -296,15 +293,14 @@ PRASE_DNODE_OVER: return -1; } - if (pMgmt->dnodeEps == NULL) { - pMgmt->dnodeEps = calloc(1, sizeof(SDnodeEps) + sizeof(SDnodeEp)); - pMgmt->dnodeEps->num = 1; - pMgmt->dnodeEps->eps[0].isMnode = 1; - - taosGetFqdnPortFromEp(pDnode->cfg.firstEp, &(pMgmt->dnodeEps->eps[0].ep)); + if (taosArrayGetSize(pMgmt->pDnodeEps) == 0) { + SDnodeEp dnodeEp = {0}; + dnodeEp.isMnode = 1; + taosGetFqdnPortFromEp(pDnode->cfg.firstEp, &dnodeEp.ep); + taosArrayPush(pMgmt->pDnodeEps, &dnodeEp); } - dndResetDnodes(pDnode, pMgmt->dnodeEps); + dndResetDnodes(pDnode, pMgmt->pDnodeEps); terrno = 0; return 0; @@ -329,13 +325,15 @@ static int32_t dndWriteDnodes(SDnode *pDnode) { len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", pMgmt->clusterId); len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pMgmt->dropped); len += snprintf(content + len, maxLen - len, " \"dnodes\": [{\n"); - for (int32_t i = 0; i < pMgmt->dnodeEps->num; ++i) { - SDnodeEp *pDnodeEp = &pMgmt->dnodeEps->eps[i]; + + int32_t numOfEps = (int32_t)taosArrayGetSize(pMgmt->pDnodeEps); + for (int32_t i = 0; i < numOfEps; ++i) { + SDnodeEp *pDnodeEp = taosArrayGet(pMgmt->pDnodeEps, i); len += snprintf(content + len, maxLen - len, " \"id\": %d,\n", pDnodeEp->id); len += snprintf(content + len, maxLen - len, " \"fqdn\": \"%s\",\n", pDnodeEp->ep.fqdn); len += snprintf(content + len, maxLen - len, " \"port\": %u,\n", pDnodeEp->ep.port); len += snprintf(content + len, maxLen - len, " \"isMnode\": %d\n", pDnodeEp->isMnode); - if (i < pMgmt->dnodeEps->num - 1) { + if (i < numOfEps - 1) { len += snprintf(content + len, maxLen - len, " },{\n"); } else { len += snprintf(content + len, maxLen - len, " }]\n"); @@ -355,40 +353,39 @@ static int32_t dndWriteDnodes(SDnode *pDnode) { } void dndSendStatusReq(SDnode *pDnode) { - int32_t contLen = sizeof(SStatusReq) + TSDB_MAX_VNODES * sizeof(SVnodeLoad); - - SStatusReq *pStatus = rpcMallocCont(contLen); - if (pStatus == NULL) { - dError("failed to malloc status message"); - return; - } + SStatusReq req = {0}; SDnodeMgmt *pMgmt = &pDnode->dmgmt; taosRLockLatch(&pMgmt->latch); - pStatus->sver = htonl(pDnode->env.sver); - pStatus->dver = htobe64(pMgmt->dver); - pStatus->dnodeId = htonl(pMgmt->dnodeId); - pStatus->clusterId = htobe64(pMgmt->clusterId); - pStatus->rebootTime = htobe64(pMgmt->rebootTime); - pStatus->updateTime = htobe64(pMgmt->updateTime); - pStatus->numOfCores = htonl(pDnode->env.numOfCores); - pStatus->numOfSupportVnodes = htonl(pDnode->cfg.numOfSupportVnodes); - tstrncpy(pStatus->dnodeEp, pDnode->cfg.localEp, TSDB_EP_LEN); + req.sver = pDnode->env.sver; + req.dver = pMgmt->dver; + req.dnodeId = pMgmt->dnodeId; + req.clusterId = pMgmt->clusterId; + req.rebootTime = pMgmt->rebootTime; + req.updateTime = pMgmt->updateTime; + req.numOfCores = pDnode->env.numOfCores; + req.numOfSupportVnodes = pDnode->cfg.numOfSupportVnodes; + memcpy(req.dnodeEp, pDnode->cfg.localEp, TSDB_EP_LEN); - pStatus->clusterCfg.statusInterval = htonl(pDnode->cfg.statusInterval); - pStatus->clusterCfg.checkTime = 0; + req.clusterCfg.statusInterval = pDnode->cfg.statusInterval; + req.clusterCfg.checkTime = 0; char timestr[32] = "1970-01-01 00:00:00.00"; - (void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); - pStatus->clusterCfg.checkTime = htonl(pStatus->clusterCfg.checkTime); - tstrncpy(pStatus->clusterCfg.timezone, pDnode->env.timezone, TSDB_TIMEZONE_LEN); - tstrncpy(pStatus->clusterCfg.locale, pDnode->env.locale, TSDB_LOCALE_LEN); - tstrncpy(pStatus->clusterCfg.charset, pDnode->env.charset, TSDB_LOCALE_LEN); + (void)taosParseTime(timestr, &req.clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); + memcpy(req.clusterCfg.timezone, pDnode->env.timezone, TSDB_TIMEZONE_LEN); + memcpy(req.clusterCfg.locale, pDnode->env.locale, TSDB_LOCALE_LEN); + memcpy(req.clusterCfg.charset, pDnode->env.charset, TSDB_LOCALE_LEN); taosRUnLockLatch(&pMgmt->latch); - dndGetVnodeLoads(pDnode, &pStatus->vnodeLoads); - contLen = sizeof(SStatusReq) + pStatus->vnodeLoads.num * sizeof(SVnodeLoad); + req.pVloads = taosArrayInit(TSDB_MAX_VNODES, sizeof(SVnodeLoad)); + dndGetVnodeLoads(pDnode, req.pVloads); - SRpcMsg rpcMsg = {.pCont = pStatus, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = (void *)9527}; + int32_t contLen = tSerializeSStatusReq(NULL, &req); + void *pHead = rpcMallocCont(contLen); + void *pBuf = pHead; + tSerializeSStatusReq(&pBuf, &req); + taosArrayDestroy(req.pVloads); + + SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = (void *)9527}; pMgmt->statusSent = 1; dTrace("pDnode:%p, send status req to mnode", pDnode); @@ -407,18 +404,20 @@ static void dndUpdateDnodeCfg(SDnode *pDnode, SDnodeCfg *pCfg) { } } -static void dndUpdateDnodeEps(SDnode *pDnode, SDnodeEps *pDnodeEps) { - if (pDnodeEps == NULL || pDnodeEps->num <= 0) return; +static void dndUpdateDnodeEps(SDnode *pDnode, SArray *pDnodeEps) { + int32_t numOfEps = taosArrayGetSize(pDnodeEps); + if (numOfEps <= 0) return; SDnodeMgmt *pMgmt = &pDnode->dmgmt; taosWLockLatch(&pMgmt->latch); - if (pDnodeEps->num != pMgmt->dnodeEps->num) { + int32_t numOfEpsOld = (int32_t)taosArrayGetSize(pMgmt->pDnodeEps); + if (numOfEps != numOfEpsOld) { dndResetDnodes(pDnode, pDnodeEps); dndWriteDnodes(pDnode); } else { - int32_t size = pDnodeEps->num * sizeof(SDnodeEp) + sizeof(SDnodeEps); - if (memcmp(pMgmt->dnodeEps, pDnodeEps, size) != 0) { + int32_t size = numOfEps * sizeof(SDnodeEp); + if (memcmp(pMgmt->pDnodeEps->pData, pDnodeEps->pData, size) != 0) { dndResetDnodes(pDnode, pDnodeEps); dndWriteDnodes(pDnode); } @@ -431,33 +430,21 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp) { SDnodeMgmt *pMgmt = &pDnode->dmgmt; if (pRsp->code != TSDB_CODE_SUCCESS) { - pMgmt->statusSent = 0; if (pRsp->code == TSDB_CODE_MND_DNODE_NOT_EXIST && !pMgmt->dropped && pMgmt->dnodeId > 0) { dInfo("dnode:%d, set to dropped since not exist in mnode", pMgmt->dnodeId); pMgmt->dropped = 1; dndWriteDnodes(pDnode); } - return; - } - - if (pRsp->pCont != NULL && pRsp->contLen != 0) { - SStatusRsp *pStatus = pRsp->pCont; - pMgmt->dver = htobe64(pStatus->dver); - - SDnodeCfg *pCfg = &pStatus->dnodeCfg; - pCfg->dnodeId = htonl(pCfg->dnodeId); - pCfg->clusterId = htobe64(pCfg->clusterId); - dndUpdateDnodeCfg(pDnode, pCfg); - - SDnodeEps *pDnodeEps = &pStatus->dnodeEps; - pDnodeEps->num = htonl(pDnodeEps->num); - for (int32_t i = 0; i < pDnodeEps->num; ++i) { - pDnodeEps->eps[i].id = htonl(pDnodeEps->eps[i].id); - pDnodeEps->eps[i].ep.port = htons(pDnodeEps->eps[i].ep.port); + } else { + SStatusRsp statusRsp = {0}; + if (pRsp->pCont != NULL && pRsp->contLen != 0 && tDeserializeSStatusRsp(pRsp->pCont, &statusRsp) != NULL) { + pMgmt->dver = statusRsp.dver; + dndUpdateDnodeCfg(pDnode, &statusRsp.dnodeCfg); + dndUpdateDnodeEps(pDnode, statusRsp.pDnodeEps); } - - dndUpdateDnodeEps(pDnode, pDnodeEps); + taosArrayDestroy(statusRsp.pDnodeEps); } + pMgmt->statusSent = 0; } @@ -572,9 +559,9 @@ void dndCleanupMgmt(SDnode *pDnode) { SDnodeMgmt *pMgmt = &pDnode->dmgmt; taosWLockLatch(&pMgmt->latch); - if (pMgmt->dnodeEps != NULL) { - free(pMgmt->dnodeEps); - pMgmt->dnodeEps = NULL; + if (pMgmt->pDnodeEps != NULL) { + taosArrayDestroy(pMgmt->pDnodeEps); + pMgmt->pDnodeEps = NULL; } if (pMgmt->dnodeHash != NULL) { diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index c4d14ef697..78eab3151f 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -1008,27 +1008,21 @@ void dndCleanupVnodes(SDnode *pDnode) { dInfo("dnode-vnodes is cleaned up"); } -void dndGetVnodeLoads(SDnode *pDnode, SVnodeLoads *pLoads) { +void dndGetVnodeLoads(SDnode *pDnode, SArray *pLoads) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; taosRLockLatch(&pMgmt->latch); - pLoads->num = taosHashGetSize(pMgmt->hash); int32_t v = 0; - void * pIter = taosHashIterate(pMgmt->hash, NULL); + void *pIter = taosHashIterate(pMgmt->hash, NULL); while (pIter) { SVnodeObj **ppVnode = pIter; if (ppVnode == NULL || *ppVnode == NULL) continue; - SVnodeObj * pVnode = *ppVnode; - SVnodeLoad *pLoad = &pLoads->data[v++]; - - vnodeGetLoad(pVnode->pImpl, pLoad); - pLoad->vgId = htonl(pLoad->vgId); - pLoad->totalStorage = htobe64(pLoad->totalStorage); - pLoad->compStorage = htobe64(pLoad->compStorage); - pLoad->pointsWritten = htobe64(pLoad->pointsWritten); - pLoad->tablesNum = htobe64(pLoad->tablesNum); + SVnodeObj *pVnode = *ppVnode; + SVnodeLoad vload = {0}; + vnodeGetLoad(pVnode->pImpl, &vload); + taosArrayPush(pLoads, &vload); pIter = taosHashIterate(pMgmt->hash, pIter); } diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 3b43cd8081..c7a2724a44 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -244,7 +244,7 @@ bool mndIsDnodeOnline(SMnode *pMnode, SDnodeObj *pDnode, int64_t curMs) { return true; } -static void mndGetDnodeData(SMnode *pMnode, SDnodeEps *pEps, int32_t maxEps) { +static void mndGetDnodeData(SMnode *pMnode, SArray *pDnodeEps) { SSdb *pSdb = pMnode->pSdb; int32_t numOfEps = 0; @@ -253,25 +253,20 @@ static void mndGetDnodeData(SMnode *pMnode, SDnodeEps *pEps, int32_t maxEps) { SDnodeObj *pDnode = NULL; pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode); if (pIter == NULL) break; - if (numOfEps >= maxEps) { - sdbCancelFetch(pSdb, pIter); - sdbRelease(pSdb, pDnode); - break; - } - SDnodeEp *pEp = &pEps->eps[numOfEps]; - pEp->id = htonl(pDnode->id); - pEp->ep.port = htons(pDnode->port); - memcpy(pEp->ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN); - pEp->isMnode = 0; + SDnodeEp dnodeEp = {0}; + dnodeEp.id = pDnode->id; + dnodeEp.isMnode = 0; + dnodeEp.ep.port = pDnode->port; + memcpy(dnodeEp.ep.fqdn, pDnode->fqdn, TSDB_FQDN_LEN); + if (mndIsMnode(pMnode, pDnode->id)) { - pEp->isMnode = 1; + dnodeEp.isMnode = 1; } - numOfEps++; - sdbRelease(pSdb, pDnode); - } - pEps->num = htonl(numOfEps); + sdbRelease(pSdb, pDnode); + taosArrayPush(pDnodeEps, &dnodeEp); + } } static int32_t mndCheckClusterCfgPara(SMnode *pMnode, const SClusterCfg *pCfg) { @@ -299,50 +294,29 @@ static int32_t mndCheckClusterCfgPara(SMnode *pMnode, const SClusterCfg *pCfg) { return 0; } -static void mndParseStatusMsg(SStatusReq *pStatus) { - pStatus->sver = htonl(pStatus->sver); - pStatus->dver = htobe64(pStatus->dver); - pStatus->dnodeId = htonl(pStatus->dnodeId); - pStatus->clusterId = htobe64(pStatus->clusterId); - pStatus->rebootTime = htobe64(pStatus->rebootTime); - pStatus->updateTime = htobe64(pStatus->updateTime); - pStatus->numOfCores = htonl(pStatus->numOfCores); - pStatus->numOfSupportVnodes = htonl(pStatus->numOfSupportVnodes); - pStatus->clusterCfg.statusInterval = htonl(pStatus->clusterCfg.statusInterval); - pStatus->clusterCfg.checkTime = htobe64(pStatus->clusterCfg.checkTime); - for (int32_t v = 0; v < pStatus->vnodeLoads.num; ++v) { - SVnodeLoad *pVload = &pStatus->vnodeLoads.data[v]; - pVload->vgId = htonl(pVload->vgId); - pVload->totalStorage = htobe64(pVload->totalStorage); - pVload->compStorage = htobe64(pVload->compStorage); - pVload->pointsWritten = htobe64(pVload->pointsWritten); - pVload->tablesNum = htobe64(pVload->tablesNum); - } -} - static int32_t mndProcessStatusReq(SMnodeMsg *pReq) { - SMnode *pMnode = pReq->pMnode; - SStatusReq *pStatus = pReq->rpcMsg.pCont; - SDnodeObj *pDnode = NULL; - int32_t code = -1; + SMnode *pMnode = pReq->pMnode; + SStatusReq statusReq = {0}; + SDnodeObj *pDnode = NULL; + int32_t code = -1; - mndParseStatusMsg(pStatus); + if (tDeserializeSStatusReq(pReq->rpcMsg.pCont, &statusReq) == NULL) goto PROCESS_STATUS_MSG_OVER; - if (pStatus->dnodeId == 0) { - pDnode = mndAcquireDnodeByEp(pMnode, pStatus->dnodeEp); + if (statusReq.dnodeId == 0) { + pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp); if (pDnode == NULL) { - mDebug("dnode:%s, not created yet", pStatus->dnodeEp); + mDebug("dnode:%s, not created yet", statusReq.dnodeEp); terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; goto PROCESS_STATUS_MSG_OVER; } } else { - pDnode = mndAcquireDnode(pMnode, pStatus->dnodeId); + pDnode = mndAcquireDnode(pMnode, statusReq.dnodeId); if (pDnode == NULL) { - pDnode = mndAcquireDnodeByEp(pMnode, pStatus->dnodeEp); + pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp); if (pDnode != NULL) { pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH; } - mError("dnode:%d, %s not exist", pStatus->dnodeId, pStatus->dnodeEp); + mError("dnode:%d, %s not exist", statusReq.dnodeId, statusReq.dnodeEp); terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; goto PROCESS_STATUS_MSG_OVER; } @@ -350,28 +324,28 @@ static int32_t mndProcessStatusReq(SMnodeMsg *pReq) { int64_t curMs = taosGetTimestampMs(); bool online = mndIsDnodeOnline(pMnode, pDnode, curMs); - bool dnodeChanged = (pStatus->dver != sdbGetTableVer(pMnode->pSdb, SDB_DNODE)); - bool reboot = (pDnode->rebootTime != pStatus->rebootTime); + bool dnodeChanged = (statusReq.dver != sdbGetTableVer(pMnode->pSdb, SDB_DNODE)); + bool reboot = (pDnode->rebootTime != statusReq.rebootTime); bool needCheck = !online || dnodeChanged || reboot; if (needCheck) { - if (pStatus->sver != pMnode->cfg.sver) { + if (statusReq.sver != pMnode->cfg.sver) { if (pDnode != NULL) { pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH; } - mError("dnode:%d, status msg version:%d not match cluster:%d", pStatus->dnodeId, pStatus->sver, pMnode->cfg.sver); + mError("dnode:%d, status msg version:%d not match cluster:%d", statusReq.dnodeId, statusReq.sver, pMnode->cfg.sver); terrno = TSDB_CODE_MND_INVALID_MSG_VERSION; goto PROCESS_STATUS_MSG_OVER; } - if (pStatus->dnodeId == 0) { + if (statusReq.dnodeId == 0) { mDebug("dnode:%d, %s first access, set clusterId %" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId); } else { - if (pStatus->clusterId != pMnode->clusterId) { + if (statusReq.clusterId != pMnode->clusterId) { if (pDnode != NULL) { pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH; } - mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, pStatus->clusterId, + mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, statusReq.clusterId, pMnode->clusterId); terrno = TSDB_CODE_MND_INVALID_CLUSTER_ID; goto PROCESS_STATUS_MSG_OVER; @@ -382,7 +356,7 @@ static int32_t mndProcessStatusReq(SMnodeMsg *pReq) { } // Verify whether the cluster parameters are consistent when status change from offline to ready - int32_t ret = mndCheckClusterCfgPara(pMnode, &pStatus->clusterCfg); + int32_t ret = mndCheckClusterCfgPara(pMnode, &statusReq.clusterCfg); if (0 != ret) { pDnode->offlineReason = ret; mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[ret]); @@ -396,25 +370,30 @@ static int32_t mndProcessStatusReq(SMnodeMsg *pReq) { mDebug("dnode:%d, send dnode eps", pDnode->id); } - pDnode->rebootTime = pStatus->rebootTime; - pDnode->numOfCores = pStatus->numOfCores; - pDnode->numOfSupportVnodes = pStatus->numOfSupportVnodes; + pDnode->rebootTime = statusReq.rebootTime; + pDnode->numOfCores = statusReq.numOfCores; + pDnode->numOfSupportVnodes = statusReq.numOfSupportVnodes; - int32_t numOfEps = mndGetDnodeSize(pMnode); - int32_t contLen = sizeof(SStatusRsp) + numOfEps * sizeof(SDnodeEp); - SStatusRsp *pRsp = rpcMallocCont(contLen); - if (pRsp == NULL) { + SStatusRsp statusRsp = {0}; + statusRsp.dver = sdbGetTableVer(pMnode->pSdb, SDB_DNODE); + statusRsp.dnodeCfg.dnodeId = pDnode->id; + statusRsp.dnodeCfg.clusterId = pMnode->clusterId; + statusRsp.pDnodeEps = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(SDnodeEp)); + if (statusRsp.pDnodeEps == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto PROCESS_STATUS_MSG_OVER; } - pRsp->dver = htobe64(sdbGetTableVer(pMnode->pSdb, SDB_DNODE)); - pRsp->dnodeCfg.dnodeId = htonl(pDnode->id); - pRsp->dnodeCfg.clusterId = htobe64(pMnode->clusterId); - mndGetDnodeData(pMnode, &pRsp->dnodeEps, numOfEps); + mndGetDnodeData(pMnode, statusRsp.pDnodeEps); + + int32_t contLen = tSerializeSStatusRsp(NULL, &statusRsp); + void *pHead = rpcMallocCont(contLen); + void *pBuf = pHead; + tSerializeSStatusRsp(&pBuf, &statusRsp); + taosArrayDestroy(statusRsp.pDnodeEps); pReq->contLen = contLen; - pReq->pCont = pRsp; + pReq->pCont = pHead; } pDnode->lastAccessTime = curMs; @@ -422,6 +401,7 @@ static int32_t mndProcessStatusReq(SMnodeMsg *pReq) { PROCESS_STATUS_MSG_OVER: mndReleaseDnode(pMnode, pDnode); + taosArrayDestroy(statusReq.pVloads); return code; } diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index 3ddea2290e..2b61ae7176 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -592,8 +592,8 @@ CREATE_STB_OVER: mndReleaseStb(pMnode, pStb); mndReleaseStb(pMnode, pTopicStb); mndReleaseDb(pMnode, pDb); - taosArrayClear(createReq.pColumns); - taosArrayClear(createReq.pTags); + taosArrayDestroy(createReq.pColumns); + taosArrayDestroy(createReq.pTags); return code; } @@ -1049,7 +1049,7 @@ ALTER_STB_OVER: mndReleaseStb(pMnode, pStb); mndReleaseDb(pMnode, pDb); - taosArrayClear(alterReq.pFields); + taosArrayDestroy(alterReq.pFields); return code; } diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c index d8ef0462fb..f5e9153098 100644 --- a/source/libs/transport/src/rpcMain.c +++ b/source/libs/transport/src/rpcMain.c @@ -1383,7 +1383,7 @@ static void rpcSendMsgToPeer(SRpcConn *pConn, void *msg, int msgLen) { static void rpcProcessConnError(void *param, void *id) { SRpcReqContext *pContext = (SRpcReqContext *)param; SRpcInfo * pRpc = pContext->pRpc; - SRpcMsg rpcMsg; + SRpcMsg rpcMsg = {0}; if (pRpc == NULL) { return;