diff --git a/include/common/tmsg.h b/include/common/tmsg.h index f47f895897..257d157d5e 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -614,10 +614,11 @@ typedef struct { typedef struct { int32_t vgId; int8_t role; + int64_t numOfTables; + int64_t numOfTimeSeries; int64_t totalStorage; int64_t compStorage; int64_t pointsWritten; - int64_t tablesNum; } SVnodeLoad; typedef struct { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index d9d3cca862..a114068936 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -489,10 +489,11 @@ int32_t tSerializeSStatusReq(void **buf, SStatusReq *pReq) { SVnodeLoad *pload = taosArrayGet(pReq->pVloads, i); tlen += taosEncodeFixedI32(buf, pload->vgId); tlen += taosEncodeFixedI8(buf, pload->role); + tlen += taosEncodeFixedI64(buf, pload->numOfTables); + tlen += taosEncodeFixedI64(buf, pload->numOfTimeSeries); tlen += taosEncodeFixedI64(buf, pload->totalStorage); tlen += taosEncodeFixedI64(buf, pload->compStorage); tlen += taosEncodeFixedI64(buf, pload->pointsWritten); - tlen += taosEncodeFixedI64(buf, pload->tablesNum); } return tlen; @@ -531,10 +532,11 @@ void *tDeserializeSStatusReq(void *buf, SStatusReq *pReq) { SVnodeLoad vload = {0}; buf = taosDecodeFixedI32(buf, &vload.vgId); buf = taosDecodeFixedI8(buf, &vload.role); + buf = taosDecodeFixedI64(buf, &vload.numOfTables); + buf = taosDecodeFixedI64(buf, &vload.numOfTimeSeries); buf = taosDecodeFixedI64(buf, &vload.totalStorage); buf = taosDecodeFixedI64(buf, &vload.compStorage); buf = taosDecodeFixedI64(buf, &vload.pointsWritten); - buf = taosDecodeFixedI64(buf, &vload.tablesNum); if (taosArrayPush(pReq->pVloads, &vload) == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index d26ff1c32f..bf21c4ea7b 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -283,8 +283,8 @@ typedef struct { uint32_t hashEnd; char dbName[TSDB_DB_FNAME_LEN]; int64_t dbUid; - int32_t numOfTables; - int32_t numOfTimeSeries; + int64_t numOfTables; + int64_t numOfTimeSeries; int64_t totalStorage; int64_t compStorage; int64_t pointsWritten; diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index c7a2724a44..93c6b492bd 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -322,6 +322,27 @@ static int32_t mndProcessStatusReq(SMnodeMsg *pReq) { } } + int32_t numOfVloads = (int32_t)taosArrayGetSize(statusReq.pVloads); + for (int32_t v = 0; v < numOfVloads; ++v) { + SVnodeLoad *pVload = taosArrayGet(statusReq.pVloads, v); + + SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId); + if (pVgroup != NULL) { + if (pVload->role == TAOS_SYNC_STATE_LEADER) { + pVgroup->numOfTables = pVload->numOfTables; + pVgroup->numOfTimeSeries = pVload->numOfTimeSeries; + pVgroup->totalStorage = pVload->totalStorage; + pVgroup->compStorage = pVload->compStorage; + pVgroup->pointsWritten = pVload->pointsWritten; + } + for (int32_t vg = 0; vg < pVgroup->replica; ++vg) { + pVgroup->vnodeGid[vg].role = pVload->role; + } + } + + mndReleaseVgroup(pMnode, pVgroup); + } + int64_t curMs = taosGetTimestampMs(); bool online = mndIsDnodeOnline(pMnode, pDnode, curMs); bool dnodeChanged = (statusReq.dver != sdbGetTableVer(pMnode->pSdb, SDB_DNODE)); diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index aef58b7acc..ca851748d6 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -462,6 +462,8 @@ void mndProcessMsg(SMnodeMsg *pMsg) { PROCESS_RPC_END: if (isReq) { + if (pMsg->rpcMsg.handle == NULL) return; + if (code == TSDB_CODE_APP_NOT_READY) { mndSendRedirectRsp(pMnode, &pMsg->rpcMsg); } else if (code != 0) { diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index e625c56db1..e15a2ee883 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -28,6 +28,7 @@ target_link_libraries( PUBLIC scheduler PUBLIC executor PUBLIC qworker + PUBLIC sync ) if(${BUILD_TEST}) diff --git a/source/dnode/vnode/src/vnd/vnodeInt.c b/source/dnode/vnode/src/vnd/vnodeInt.c index 3d23784e13..6d3fa5f7f3 100644 --- a/source/dnode/vnode/src/vnd/vnodeInt.c +++ b/source/dnode/vnode/src/vnd/vnodeInt.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "vnd.h" +#include "sync.h" // #include "vnodeInt.h" int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg) { return 0; } @@ -23,7 +24,16 @@ int32_t vnodeCompact(SVnode *pVnode) { return 0; } int32_t vnodeSync(SVnode *pVnode) { return 0; } -int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { return 0; } +int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { + pLoad->vgId = pVnode->vgId; + pLoad->role = TAOS_SYNC_STATE_LEADER; + pLoad->numOfTables = 500; + pLoad->numOfTimeSeries = 400; + pLoad->totalStorage = 300; + pLoad->compStorage = 200; + pLoad->pointsWritten = 100; + return 0; +} int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { vInfo("sync message is processed"); diff --git a/source/libs/transport/src/rpcMain.c b/source/libs/transport/src/rpcMain.c index f5e9153098..3bb7d103d7 100644 --- a/source/libs/transport/src/rpcMain.c +++ b/source/libs/transport/src/rpcMain.c @@ -419,7 +419,7 @@ void rpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t } void rpcSendResponse(const SRpcMsg *pRsp) { - if (pRsp->handle == NULL) return; + ASSERT(pRsp->handle != NULL); int msgLen = 0; SRpcConn *pConn = (SRpcConn *)pRsp->handle;