seralize status request

This commit is contained in:
Shengliang Guan 2022-02-10 11:40:16 +08:00
parent 15c09f8905
commit 17be3f19aa
6 changed files with 144 additions and 91 deletions

View File

@ -614,7 +614,6 @@ typedef struct {
typedef struct { typedef struct {
int32_t vgId; int32_t vgId;
int8_t role; int8_t role;
int8_t align[3];
int64_t totalStorage; int64_t totalStorage;
int64_t compStorage; int64_t compStorage;
int64_t pointsWritten; int64_t pointsWritten;
@ -622,24 +621,23 @@ typedef struct {
} SVnodeLoad; } SVnodeLoad;
typedef struct { typedef struct {
int32_t num; int32_t mver; // msg version
SVnodeLoad data[]; int32_t sver; // software version
} SVnodeLoads; int64_t dver; // dnode table version in sdb
typedef struct {
int32_t sver;
int32_t dnodeId; int32_t dnodeId;
int64_t clusterId; int64_t clusterId;
int64_t dver;
int64_t rebootTime; int64_t rebootTime;
int64_t updateTime; int64_t updateTime;
int32_t numOfCores; int32_t numOfCores;
int32_t numOfSupportVnodes; int32_t numOfSupportVnodes;
char dnodeEp[TSDB_EP_LEN]; char dnodeEp[TSDB_EP_LEN];
SClusterCfg clusterCfg; SClusterCfg clusterCfg;
SVnodeLoads vnodeLoads; SArray* pVloads; // array of SVnodeLoad
} SStatusReq; } SStatusReq;
int32_t tSerializeSStatusReq(void** buf, SStatusReq* pReq);
void* tDeserializeSStatusReq(void* buf, SStatusReq* pReq);
typedef struct { typedef struct {
int32_t reserved; int32_t reserved;
} STransReq; } STransReq;

View File

@ -459,3 +459,87 @@ void *tDeserializeSMAlterStbReq(void *buf, SMAltertbReq *pReq) {
return buf; return buf;
} }
int32_t tSerializeSStatusReq(void **buf, SStatusReq *pReq) {
int32_t tlen = 0;
// status
tlen += taosEncodeFixedI32(buf, pReq->mver);
tlen += taosEncodeFixedI32(buf, pReq->sver);
tlen += taosEncodeFixedI64(buf, pReq->dver);
tlen += taosEncodeFixedI32(buf, pReq->dnodeId);
tlen += taosEncodeFixedI64(buf, pReq->clusterId);
tlen += taosEncodeFixedI64(buf, pReq->rebootTime);
tlen += taosEncodeFixedI64(buf, pReq->updateTime);
tlen += taosEncodeFixedI32(buf, pReq->numOfCores);
tlen += taosEncodeFixedI32(buf, pReq->numOfSupportVnodes);
tlen += taosEncodeString(buf, pReq->dnodeEp);
// cluster cfg
tlen += taosEncodeFixedI32(buf, pReq->clusterCfg.statusInterval);
tlen += taosEncodeFixedI64(buf, pReq->clusterCfg.checkTime);
tlen += taosEncodeString(buf, pReq->clusterCfg.timezone);
tlen += taosEncodeString(buf, pReq->clusterCfg.locale);
tlen += taosEncodeString(buf, pReq->clusterCfg.charset);
// vnode loads
int32_t vlen = (int32_t)taosArrayGetSize(pReq->pVloads);
tlen += taosEncodeFixedI32(buf, vlen);
for (int32_t i = 0; i < vlen; ++i) {
SVnodeLoad *pload = taosArrayGet(pReq->pVloads, i);
tlen += taosEncodeFixedI32(buf, pload->vgId);
tlen += taosEncodeFixedI8(buf, pload->role);
tlen += taosEncodeFixedI64(buf, pload->totalStorage);
tlen += taosEncodeFixedI64(buf, pload->compStorage);
tlen += taosEncodeFixedI64(buf, pload->pointsWritten);
tlen += taosEncodeFixedI64(buf, pload->tablesNum);
}
return tlen;
}
void *tDeserializeSStatusReq(void *buf, SStatusReq *pReq) {
// status
buf = taosDecodeFixedI32(buf, &pReq->mver);
buf = taosDecodeFixedI32(buf, &pReq->sver);
buf = taosDecodeFixedI64(buf, &pReq->dver);
buf = taosDecodeFixedI32(buf, &pReq->dnodeId);
buf = taosDecodeFixedI64(buf, &pReq->clusterId);
buf = taosDecodeFixedI64(buf, &pReq->rebootTime);
buf = taosDecodeFixedI64(buf, &pReq->updateTime);
buf = taosDecodeFixedI32(buf, &pReq->numOfCores);
buf = taosDecodeFixedI32(buf, &pReq->numOfSupportVnodes);
buf = taosDecodeStringTo(buf, pReq->dnodeEp);
// cluster cfg
buf = taosDecodeFixedI32(buf, &pReq->clusterCfg.statusInterval);
buf = taosDecodeFixedI64(buf, &pReq->clusterCfg.checkTime);
buf = taosDecodeStringTo(buf, pReq->clusterCfg.timezone);
buf = taosDecodeStringTo(buf, pReq->clusterCfg.locale);
buf = taosDecodeStringTo(buf, pReq->clusterCfg.charset);
// vnode loads
int32_t vlen = 0;
buf = taosDecodeFixedI32(buf, &vlen);
pReq->pVloads = taosArrayInit(vlen, sizeof(SVnodeLoad));
if (pReq->pVloads == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
for (int32_t i = 0; i < vlen; ++i) {
SVnodeLoad vload = {0};
buf = taosDecodeFixedI32(buf, &vload.vgId);
buf = taosDecodeFixedI8(buf, &vload.role);
buf = taosDecodeFixedI64(buf, &vload.totalStorage);
buf = taosDecodeFixedI64(buf, &vload.compStorage);
buf = taosDecodeFixedI64(buf, &vload.pointsWritten);
buf = taosDecodeFixedI64(buf, &vload.tablesNum);
if (taosArrayPush(pReq->pVloads, &vload) == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
}
return buf;
}

View File

@ -23,7 +23,7 @@ extern "C" {
int32_t dndInitVnodes(SDnode *pDnode); int32_t dndInitVnodes(SDnode *pDnode);
void dndCleanupVnodes(SDnode *pDnode); void dndCleanupVnodes(SDnode *pDnode);
void dndGetVnodeLoads(SDnode *pDnode, SVnodeLoads *pVloads); void dndGetVnodeLoads(SDnode *pDnode, SArray *pLoads);
void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);
void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet);

View File

@ -355,40 +355,38 @@ static int32_t dndWriteDnodes(SDnode *pDnode) {
} }
void dndSendStatusReq(SDnode *pDnode) { void dndSendStatusReq(SDnode *pDnode) {
int32_t contLen = sizeof(SStatusReq) + TSDB_MAX_VNODES * sizeof(SVnodeLoad); SStatusReq req = {0};
SStatusReq *pStatus = rpcMallocCont(contLen);
if (pStatus == NULL) {
dError("failed to malloc status message");
return;
}
SDnodeMgmt *pMgmt = &pDnode->dmgmt; SDnodeMgmt *pMgmt = &pDnode->dmgmt;
taosRLockLatch(&pMgmt->latch); taosRLockLatch(&pMgmt->latch);
pStatus->sver = htonl(pDnode->env.sver); req.sver = pDnode->env.sver;
pStatus->dver = htobe64(pMgmt->dver); req.dver = pMgmt->dver;
pStatus->dnodeId = htonl(pMgmt->dnodeId); req.dnodeId = pMgmt->dnodeId;
pStatus->clusterId = htobe64(pMgmt->clusterId); req.clusterId = pMgmt->clusterId;
pStatus->rebootTime = htobe64(pMgmt->rebootTime); req.rebootTime = pMgmt->rebootTime;
pStatus->updateTime = htobe64(pMgmt->updateTime); req.updateTime = pMgmt->updateTime;
pStatus->numOfCores = htonl(pDnode->env.numOfCores); req.numOfCores = pDnode->env.numOfCores;
pStatus->numOfSupportVnodes = htonl(pDnode->cfg.numOfSupportVnodes); req.numOfSupportVnodes = pDnode->cfg.numOfSupportVnodes;
tstrncpy(pStatus->dnodeEp, pDnode->cfg.localEp, TSDB_EP_LEN); memcpy(req.dnodeEp, pDnode->cfg.localEp, TSDB_EP_LEN);
pStatus->clusterCfg.statusInterval = htonl(pDnode->cfg.statusInterval); req.clusterCfg.statusInterval = pDnode->cfg.statusInterval;
pStatus->clusterCfg.checkTime = 0; req.clusterCfg.checkTime = 0;
char timestr[32] = "1970-01-01 00:00:00.00"; char timestr[32] = "1970-01-01 00:00:00.00";
(void)taosParseTime(timestr, &pStatus->clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0); (void)taosParseTime(timestr, &req.clusterCfg.checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
pStatus->clusterCfg.checkTime = htonl(pStatus->clusterCfg.checkTime); memcpy(req.clusterCfg.timezone, pDnode->env.timezone, TSDB_TIMEZONE_LEN);
tstrncpy(pStatus->clusterCfg.timezone, pDnode->env.timezone, TSDB_TIMEZONE_LEN); memcpy(req.clusterCfg.locale, pDnode->env.locale, TSDB_LOCALE_LEN);
tstrncpy(pStatus->clusterCfg.locale, pDnode->env.locale, TSDB_LOCALE_LEN); memcpy(req.clusterCfg.charset, pDnode->env.charset, TSDB_LOCALE_LEN);
tstrncpy(pStatus->clusterCfg.charset, pDnode->env.charset, TSDB_LOCALE_LEN);
taosRUnLockLatch(&pMgmt->latch); taosRUnLockLatch(&pMgmt->latch);
dndGetVnodeLoads(pDnode, &pStatus->vnodeLoads); req.pVloads = taosArrayInit(TSDB_MAX_VNODES, sizeof(SVnodeLoad));
contLen = sizeof(SStatusReq) + pStatus->vnodeLoads.num * sizeof(SVnodeLoad); dndGetVnodeLoads(pDnode, req.pVloads);
SRpcMsg rpcMsg = {.pCont = pStatus, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = (void *)9527}; int32_t contLen = tSerializeSStatusReq(NULL, &req);
void *pHead = rpcMallocCont(contLen);
void *pBuf = pHead;
tSerializeSStatusReq(&pBuf, &req);
SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_STATUS, .ahandle = (void *)9527};
pMgmt->statusSent = 1; pMgmt->statusSent = 1;
dTrace("pDnode:%p, send status req to mnode", pDnode); dTrace("pDnode:%p, send status req to mnode", pDnode);

View File

@ -1008,11 +1008,10 @@ void dndCleanupVnodes(SDnode *pDnode) {
dInfo("dnode-vnodes is cleaned up"); dInfo("dnode-vnodes is cleaned up");
} }
void dndGetVnodeLoads(SDnode *pDnode, SVnodeLoads *pLoads) { void dndGetVnodeLoads(SDnode *pDnode, SArray *pLoads) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt; SVnodesMgmt *pMgmt = &pDnode->vmgmt;
taosRLockLatch(&pMgmt->latch); taosRLockLatch(&pMgmt->latch);
pLoads->num = taosHashGetSize(pMgmt->hash);
int32_t v = 0; int32_t v = 0;
void *pIter = taosHashIterate(pMgmt->hash, NULL); void *pIter = taosHashIterate(pMgmt->hash, NULL);
@ -1021,14 +1020,9 @@ void dndGetVnodeLoads(SDnode *pDnode, SVnodeLoads *pLoads) {
if (ppVnode == NULL || *ppVnode == NULL) continue; if (ppVnode == NULL || *ppVnode == NULL) continue;
SVnodeObj *pVnode = *ppVnode; SVnodeObj *pVnode = *ppVnode;
SVnodeLoad *pLoad = &pLoads->data[v++]; SVnodeLoad vload = {0};
vnodeGetLoad(pVnode->pImpl, &vload);
vnodeGetLoad(pVnode->pImpl, pLoad); taosArrayPush(pLoads, &vload);
pLoad->vgId = htonl(pLoad->vgId);
pLoad->totalStorage = htobe64(pLoad->totalStorage);
pLoad->compStorage = htobe64(pLoad->compStorage);
pLoad->pointsWritten = htobe64(pLoad->pointsWritten);
pLoad->tablesNum = htobe64(pLoad->tablesNum);
pIter = taosHashIterate(pMgmt->hash, pIter); pIter = taosHashIterate(pMgmt->hash, pIter);
} }

View File

@ -299,50 +299,29 @@ static int32_t mndCheckClusterCfgPara(SMnode *pMnode, const SClusterCfg *pCfg) {
return 0; return 0;
} }
static void mndParseStatusMsg(SStatusReq *pStatus) {
pStatus->sver = htonl(pStatus->sver);
pStatus->dver = htobe64(pStatus->dver);
pStatus->dnodeId = htonl(pStatus->dnodeId);
pStatus->clusterId = htobe64(pStatus->clusterId);
pStatus->rebootTime = htobe64(pStatus->rebootTime);
pStatus->updateTime = htobe64(pStatus->updateTime);
pStatus->numOfCores = htonl(pStatus->numOfCores);
pStatus->numOfSupportVnodes = htonl(pStatus->numOfSupportVnodes);
pStatus->clusterCfg.statusInterval = htonl(pStatus->clusterCfg.statusInterval);
pStatus->clusterCfg.checkTime = htobe64(pStatus->clusterCfg.checkTime);
for (int32_t v = 0; v < pStatus->vnodeLoads.num; ++v) {
SVnodeLoad *pVload = &pStatus->vnodeLoads.data[v];
pVload->vgId = htonl(pVload->vgId);
pVload->totalStorage = htobe64(pVload->totalStorage);
pVload->compStorage = htobe64(pVload->compStorage);
pVload->pointsWritten = htobe64(pVload->pointsWritten);
pVload->tablesNum = htobe64(pVload->tablesNum);
}
}
static int32_t mndProcessStatusReq(SMnodeMsg *pReq) { static int32_t mndProcessStatusReq(SMnodeMsg *pReq) {
SMnode *pMnode = pReq->pMnode; SMnode *pMnode = pReq->pMnode;
SStatusReq *pStatus = pReq->rpcMsg.pCont; SStatusReq statusReq = {0};
SDnodeObj *pDnode = NULL; SDnodeObj *pDnode = NULL;
int32_t code = -1; int32_t code = -1;
mndParseStatusMsg(pStatus); if (tDeserializeSStatusReq(pReq->rpcMsg.pCont, &statusReq) == NULL) goto PROCESS_STATUS_MSG_OVER;
if (pStatus->dnodeId == 0) { if (statusReq.dnodeId == 0) {
pDnode = mndAcquireDnodeByEp(pMnode, pStatus->dnodeEp); pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
if (pDnode == NULL) { if (pDnode == NULL) {
mDebug("dnode:%s, not created yet", pStatus->dnodeEp); mDebug("dnode:%s, not created yet", statusReq.dnodeEp);
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
goto PROCESS_STATUS_MSG_OVER; goto PROCESS_STATUS_MSG_OVER;
} }
} else { } else {
pDnode = mndAcquireDnode(pMnode, pStatus->dnodeId); pDnode = mndAcquireDnode(pMnode, statusReq.dnodeId);
if (pDnode == NULL) { if (pDnode == NULL) {
pDnode = mndAcquireDnodeByEp(pMnode, pStatus->dnodeEp); pDnode = mndAcquireDnodeByEp(pMnode, statusReq.dnodeEp);
if (pDnode != NULL) { if (pDnode != NULL) {
pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH; pDnode->offlineReason = DND_REASON_DNODE_ID_NOT_MATCH;
} }
mError("dnode:%d, %s not exist", pStatus->dnodeId, pStatus->dnodeEp); mError("dnode:%d, %s not exist", statusReq.dnodeId, statusReq.dnodeEp);
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
goto PROCESS_STATUS_MSG_OVER; goto PROCESS_STATUS_MSG_OVER;
} }
@ -350,28 +329,28 @@ static int32_t mndProcessStatusReq(SMnodeMsg *pReq) {
int64_t curMs = taosGetTimestampMs(); int64_t curMs = taosGetTimestampMs();
bool online = mndIsDnodeOnline(pMnode, pDnode, curMs); bool online = mndIsDnodeOnline(pMnode, pDnode, curMs);
bool dnodeChanged = (pStatus->dver != sdbGetTableVer(pMnode->pSdb, SDB_DNODE)); bool dnodeChanged = (statusReq.dver != sdbGetTableVer(pMnode->pSdb, SDB_DNODE));
bool reboot = (pDnode->rebootTime != pStatus->rebootTime); bool reboot = (pDnode->rebootTime != statusReq.rebootTime);
bool needCheck = !online || dnodeChanged || reboot; bool needCheck = !online || dnodeChanged || reboot;
if (needCheck) { if (needCheck) {
if (pStatus->sver != pMnode->cfg.sver) { if (statusReq.sver != pMnode->cfg.sver) {
if (pDnode != NULL) { if (pDnode != NULL) {
pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH; pDnode->offlineReason = DND_REASON_VERSION_NOT_MATCH;
} }
mError("dnode:%d, status msg version:%d not match cluster:%d", pStatus->dnodeId, pStatus->sver, pMnode->cfg.sver); mError("dnode:%d, status msg version:%d not match cluster:%d", statusReq.dnodeId, statusReq.sver, pMnode->cfg.sver);
terrno = TSDB_CODE_MND_INVALID_MSG_VERSION; terrno = TSDB_CODE_MND_INVALID_MSG_VERSION;
goto PROCESS_STATUS_MSG_OVER; goto PROCESS_STATUS_MSG_OVER;
} }
if (pStatus->dnodeId == 0) { if (statusReq.dnodeId == 0) {
mDebug("dnode:%d, %s first access, set clusterId %" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId); mDebug("dnode:%d, %s first access, set clusterId %" PRId64, pDnode->id, pDnode->ep, pMnode->clusterId);
} else { } else {
if (pStatus->clusterId != pMnode->clusterId) { if (statusReq.clusterId != pMnode->clusterId) {
if (pDnode != NULL) { if (pDnode != NULL) {
pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH; pDnode->offlineReason = DND_REASON_CLUSTER_ID_NOT_MATCH;
} }
mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, pStatus->clusterId, mError("dnode:%d, clusterId %" PRId64 " not match exist %" PRId64, pDnode->id, statusReq.clusterId,
pMnode->clusterId); pMnode->clusterId);
terrno = TSDB_CODE_MND_INVALID_CLUSTER_ID; terrno = TSDB_CODE_MND_INVALID_CLUSTER_ID;
goto PROCESS_STATUS_MSG_OVER; goto PROCESS_STATUS_MSG_OVER;
@ -382,7 +361,7 @@ static int32_t mndProcessStatusReq(SMnodeMsg *pReq) {
} }
// Verify whether the cluster parameters are consistent when status change from offline to ready // Verify whether the cluster parameters are consistent when status change from offline to ready
int32_t ret = mndCheckClusterCfgPara(pMnode, &pStatus->clusterCfg); int32_t ret = mndCheckClusterCfgPara(pMnode, &statusReq.clusterCfg);
if (0 != ret) { if (0 != ret) {
pDnode->offlineReason = ret; pDnode->offlineReason = ret;
mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[ret]); mError("dnode:%d, cluster cfg inconsistent since:%s", pDnode->id, offlineReason[ret]);
@ -396,9 +375,9 @@ static int32_t mndProcessStatusReq(SMnodeMsg *pReq) {
mDebug("dnode:%d, send dnode eps", pDnode->id); mDebug("dnode:%d, send dnode eps", pDnode->id);
} }
pDnode->rebootTime = pStatus->rebootTime; pDnode->rebootTime = statusReq.rebootTime;
pDnode->numOfCores = pStatus->numOfCores; pDnode->numOfCores = statusReq.numOfCores;
pDnode->numOfSupportVnodes = pStatus->numOfSupportVnodes; pDnode->numOfSupportVnodes = statusReq.numOfSupportVnodes;
int32_t numOfEps = mndGetDnodeSize(pMnode); int32_t numOfEps = mndGetDnodeSize(pMnode);
int32_t contLen = sizeof(SStatusRsp) + numOfEps * sizeof(SDnodeEp); int32_t contLen = sizeof(SStatusRsp) + numOfEps * sizeof(SDnodeEp);