Merge pull request #10198 from taosdata/feature/mnode
collect vload infos
This commit is contained in:
commit
8b3a865134
|
@ -614,10 +614,11 @@ typedef struct {
|
|||
typedef struct {
|
||||
int32_t vgId;
|
||||
int8_t role;
|
||||
int64_t numOfTables;
|
||||
int64_t numOfTimeSeries;
|
||||
int64_t totalStorage;
|
||||
int64_t compStorage;
|
||||
int64_t pointsWritten;
|
||||
int64_t tablesNum;
|
||||
} SVnodeLoad;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -489,10 +489,11 @@ int32_t tSerializeSStatusReq(void **buf, SStatusReq *pReq) {
|
|||
SVnodeLoad *pload = taosArrayGet(pReq->pVloads, i);
|
||||
tlen += taosEncodeFixedI32(buf, pload->vgId);
|
||||
tlen += taosEncodeFixedI8(buf, pload->role);
|
||||
tlen += taosEncodeFixedI64(buf, pload->numOfTables);
|
||||
tlen += taosEncodeFixedI64(buf, pload->numOfTimeSeries);
|
||||
tlen += taosEncodeFixedI64(buf, pload->totalStorage);
|
||||
tlen += taosEncodeFixedI64(buf, pload->compStorage);
|
||||
tlen += taosEncodeFixedI64(buf, pload->pointsWritten);
|
||||
tlen += taosEncodeFixedI64(buf, pload->tablesNum);
|
||||
}
|
||||
|
||||
return tlen;
|
||||
|
@ -531,10 +532,11 @@ void *tDeserializeSStatusReq(void *buf, SStatusReq *pReq) {
|
|||
SVnodeLoad vload = {0};
|
||||
buf = taosDecodeFixedI32(buf, &vload.vgId);
|
||||
buf = taosDecodeFixedI8(buf, &vload.role);
|
||||
buf = taosDecodeFixedI64(buf, &vload.numOfTables);
|
||||
buf = taosDecodeFixedI64(buf, &vload.numOfTimeSeries);
|
||||
buf = taosDecodeFixedI64(buf, &vload.totalStorage);
|
||||
buf = taosDecodeFixedI64(buf, &vload.compStorage);
|
||||
buf = taosDecodeFixedI64(buf, &vload.pointsWritten);
|
||||
buf = taosDecodeFixedI64(buf, &vload.tablesNum);
|
||||
if (taosArrayPush(pReq->pVloads, &vload) == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
|
|
|
@ -283,8 +283,8 @@ typedef struct {
|
|||
uint32_t hashEnd;
|
||||
char dbName[TSDB_DB_FNAME_LEN];
|
||||
int64_t dbUid;
|
||||
int32_t numOfTables;
|
||||
int32_t numOfTimeSeries;
|
||||
int64_t numOfTables;
|
||||
int64_t numOfTimeSeries;
|
||||
int64_t totalStorage;
|
||||
int64_t compStorage;
|
||||
int64_t pointsWritten;
|
||||
|
|
|
@ -322,6 +322,27 @@ static int32_t mndProcessStatusReq(SMnodeMsg *pReq) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t numOfVloads = (int32_t)taosArrayGetSize(statusReq.pVloads);
|
||||
for (int32_t v = 0; v < numOfVloads; ++v) {
|
||||
SVnodeLoad *pVload = taosArrayGet(statusReq.pVloads, v);
|
||||
|
||||
SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVload->vgId);
|
||||
if (pVgroup != NULL) {
|
||||
if (pVload->role == TAOS_SYNC_STATE_LEADER) {
|
||||
pVgroup->numOfTables = pVload->numOfTables;
|
||||
pVgroup->numOfTimeSeries = pVload->numOfTimeSeries;
|
||||
pVgroup->totalStorage = pVload->totalStorage;
|
||||
pVgroup->compStorage = pVload->compStorage;
|
||||
pVgroup->pointsWritten = pVload->pointsWritten;
|
||||
}
|
||||
for (int32_t vg = 0; vg < pVgroup->replica; ++vg) {
|
||||
pVgroup->vnodeGid[vg].role = pVload->role;
|
||||
}
|
||||
}
|
||||
|
||||
mndReleaseVgroup(pMnode, pVgroup);
|
||||
}
|
||||
|
||||
int64_t curMs = taosGetTimestampMs();
|
||||
bool online = mndIsDnodeOnline(pMnode, pDnode, curMs);
|
||||
bool dnodeChanged = (statusReq.dver != sdbGetTableVer(pMnode->pSdb, SDB_DNODE));
|
||||
|
|
|
@ -462,6 +462,8 @@ void mndProcessMsg(SMnodeMsg *pMsg) {
|
|||
|
||||
PROCESS_RPC_END:
|
||||
if (isReq) {
|
||||
if (pMsg->rpcMsg.handle == NULL) return;
|
||||
|
||||
if (code == TSDB_CODE_APP_NOT_READY) {
|
||||
mndSendRedirectRsp(pMnode, &pMsg->rpcMsg);
|
||||
} else if (code != 0) {
|
||||
|
|
|
@ -28,6 +28,7 @@ target_link_libraries(
|
|||
PUBLIC scheduler
|
||||
PUBLIC executor
|
||||
PUBLIC qworker
|
||||
PUBLIC sync
|
||||
)
|
||||
|
||||
if(${BUILD_TEST})
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "vnd.h"
|
||||
#include "sync.h"
|
||||
// #include "vnodeInt.h"
|
||||
|
||||
int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg) { return 0; }
|
||||
|
@ -23,7 +24,16 @@ int32_t vnodeCompact(SVnode *pVnode) { return 0; }
|
|||
|
||||
int32_t vnodeSync(SVnode *pVnode) { return 0; }
|
||||
|
||||
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { return 0; }
|
||||
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
|
||||
pLoad->vgId = pVnode->vgId;
|
||||
pLoad->role = TAOS_SYNC_STATE_LEADER;
|
||||
pLoad->numOfTables = 500;
|
||||
pLoad->numOfTimeSeries = 400;
|
||||
pLoad->totalStorage = 300;
|
||||
pLoad->compStorage = 200;
|
||||
pLoad->pointsWritten = 100;
|
||||
return 0;
|
||||
}
|
||||
|
||||
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||
vInfo("sync message is processed");
|
||||
|
|
|
@ -419,7 +419,7 @@ void rpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t
|
|||
}
|
||||
|
||||
void rpcSendResponse(const SRpcMsg *pRsp) {
|
||||
if (pRsp->handle == NULL) return;
|
||||
ASSERT(pRsp->handle != NULL);
|
||||
|
||||
int msgLen = 0;
|
||||
SRpcConn *pConn = (SRpcConn *)pRsp->handle;
|
||||
|
|
Loading…
Reference in New Issue