remove un necessary vload
This commit is contained in:
parent
8e444435d1
commit
d099ab73b9
|
@ -51,7 +51,6 @@ typedef struct SDnodeObj {
|
||||||
int8_t reserved[15];
|
int8_t reserved[15];
|
||||||
int8_t updateEnd[1];
|
int8_t updateEnd[1];
|
||||||
int32_t refCount;
|
int32_t refCount;
|
||||||
SVnodeLoad vload[TSDB_MAX_VNODES];
|
|
||||||
uint32_t moduleStatus;
|
uint32_t moduleStatus;
|
||||||
uint32_t lastReboot; // time stamp for last reboot
|
uint32_t lastReboot; // time stamp for last reboot
|
||||||
float score; // calc in balance function
|
float score; // calc in balance function
|
||||||
|
@ -72,13 +71,6 @@ typedef struct SMnodeObj {
|
||||||
SDnodeObj *pDnode;
|
SDnodeObj *pDnode;
|
||||||
} SMnodeObj;
|
} SMnodeObj;
|
||||||
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int32_t dnodeId;
|
|
||||||
uint32_t privateIp;
|
|
||||||
uint32_t publicIp;
|
|
||||||
} SVnodeGid;
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char tableId[TSDB_TABLE_ID_LEN + 1];
|
char tableId[TSDB_TABLE_ID_LEN + 1];
|
||||||
int8_t type;
|
int8_t type;
|
||||||
|
@ -120,24 +112,34 @@ typedef struct {
|
||||||
SSuperTableObj *superTable;
|
SSuperTableObj *superTable;
|
||||||
} SChildTableObj;
|
} SChildTableObj;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t dnodeId;
|
||||||
|
int8_t role;
|
||||||
|
int8_t reserved[3];
|
||||||
|
SDnodeObj* pDnode;
|
||||||
|
} SVnodeGid;
|
||||||
|
|
||||||
typedef struct SVgObj {
|
typedef struct SVgObj {
|
||||||
uint32_t vgId;
|
uint32_t vgId;
|
||||||
char dbName[TSDB_DB_NAME_LEN + 1];
|
char dbName[TSDB_DB_NAME_LEN + 1];
|
||||||
int64_t createdTime;
|
int64_t createdTime;
|
||||||
SVnodeGid vnodeGid[TSDB_VNODES_SUPPORT];
|
SVnodeGid vnodeGid[TSDB_VNODES_SUPPORT];
|
||||||
int32_t numOfVnodes;
|
int32_t numOfVnodes;
|
||||||
int32_t lbDnodeId;
|
int32_t lbDnodeId;
|
||||||
int32_t lbTime;
|
int32_t lbTime;
|
||||||
int8_t status;
|
int8_t status;
|
||||||
int8_t inUse;
|
int8_t inUse;
|
||||||
int8_t reserved[13];
|
int8_t reserved[13];
|
||||||
int8_t updateEnd[1];
|
int8_t updateEnd[1];
|
||||||
int32_t refCount;
|
int32_t refCount;
|
||||||
struct SVgObj *prev, *next;
|
struct SVgObj *prev, *next;
|
||||||
struct SDbObj *pDb;
|
struct SDbObj *pDb;
|
||||||
int32_t numOfTables;
|
int32_t numOfTables;
|
||||||
void * idPool;
|
int64_t totalStorage;
|
||||||
SChildTableObj ** tableList;
|
int64_t compStorage;
|
||||||
|
int64_t pointsWritten;
|
||||||
|
void * idPool;
|
||||||
|
SChildTableObj **tableList;
|
||||||
} SVgObj;
|
} SVgObj;
|
||||||
|
|
||||||
typedef struct SDbObj {
|
typedef struct SDbObj {
|
||||||
|
|
|
@ -35,7 +35,8 @@ void mgmtMonitorDnodeModule();
|
||||||
|
|
||||||
int32_t mgmtGetDnodesNum();
|
int32_t mgmtGetDnodesNum();
|
||||||
void * mgmtGetNextDnode(void *pNode, SDnodeObj **pDnode);
|
void * mgmtGetNextDnode(void *pNode, SDnodeObj **pDnode);
|
||||||
void mgmtReleaseDnode(SDnodeObj *pDnode);
|
void mgmtIncDnodeRef(SDnodeObj *pDnode);
|
||||||
|
void mgmtDecDnodeRef(SDnodeObj *pDnode);
|
||||||
void * mgmtGetDnode(int32_t dnodeId);
|
void * mgmtGetDnode(int32_t dnodeId);
|
||||||
void * mgmtGetDnodeByIp(uint32_t ip);
|
void * mgmtGetDnodeByIp(uint32_t ip);
|
||||||
void mgmtUpdateDnode(SDnodeObj *pDnode);
|
void mgmtUpdateDnode(SDnodeObj *pDnode);
|
||||||
|
|
|
@ -24,7 +24,8 @@ extern "C" {
|
||||||
int32_t mgmtInitUsers();
|
int32_t mgmtInitUsers();
|
||||||
void mgmtCleanUpUsers();
|
void mgmtCleanUpUsers();
|
||||||
SUserObj *mgmtGetUser(char *name);
|
SUserObj *mgmtGetUser(char *name);
|
||||||
void mgmtReleaseUser(SUserObj *pUser);
|
void mgmtIncUserRef(SUserObj *pUser);
|
||||||
|
void mgmtDecUserRef(SUserObj *pUser);
|
||||||
SUserObj *mgmtGetUserFromConn(void *pConn, bool *usePublicIp);
|
SUserObj *mgmtGetUserFromConn(void *pConn, bool *usePublicIp);
|
||||||
int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass);
|
int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass);
|
||||||
void mgmtDropAllUsers(SAcctObj *pAcct);
|
void mgmtDropAllUsers(SAcctObj *pAcct);
|
||||||
|
|
|
@ -30,12 +30,13 @@ enum _TSDB_VG_STATUS {
|
||||||
int32_t mgmtInitVgroups();
|
int32_t mgmtInitVgroups();
|
||||||
void mgmtCleanUpVgroups();
|
void mgmtCleanUpVgroups();
|
||||||
SVgObj *mgmtGetVgroup(int32_t vgId);
|
SVgObj *mgmtGetVgroup(int32_t vgId);
|
||||||
void mgmtReleaseVgroup(SVgObj *pVgroup);
|
void mgmtIncVgroupRef(SVgObj *pVgroup);
|
||||||
|
void mgmtDecVgroupRef(SVgObj *pVgroup);
|
||||||
void mgmtDropAllVgroups(SDbObj *pDropDb);
|
void mgmtDropAllVgroups(SDbObj *pDropDb);
|
||||||
|
|
||||||
void * mgmtGetNextVgroup(void *pNode, SVgObj **pVgroup);
|
void * mgmtGetNextVgroup(void *pNode, SVgObj **pVgroup);
|
||||||
void mgmtUpdateVgroup(SVgObj *pVgroup);
|
void mgmtUpdateVgroup(SVgObj *pVgroup);
|
||||||
void mgmtUpdateVgroupStatus(SVgObj *pVgroup, int32_t dnodeId, SVnodeLoad *pVload);
|
void mgmtUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *dnodeId, SVnodeLoad *pVload);
|
||||||
|
|
||||||
void mgmtCreateVgroup(SQueuedMsg *pMsg, SDbObj *pDb);
|
void mgmtCreateVgroup(SQueuedMsg *pMsg, SDbObj *pDb);
|
||||||
void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle);
|
void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle);
|
||||||
|
|
|
@ -47,7 +47,7 @@ int32_t balanceAllocVnodes(SVgObj *pVgroup) {
|
||||||
vnodeUsage = usage;
|
vnodeUsage = usage;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
mgmtReleaseDnode(pDnode);
|
mgmtDecDnodeRef(pDnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pSelDnode == NULL) {
|
if (pSelDnode == NULL) {
|
||||||
|
@ -56,8 +56,7 @@ int32_t balanceAllocVnodes(SVgObj *pVgroup) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pVgroup->vnodeGid[0].dnodeId = pSelDnode->dnodeId;
|
pVgroup->vnodeGid[0].dnodeId = pSelDnode->dnodeId;
|
||||||
pVgroup->vnodeGid[0].privateIp = pSelDnode->privateIp;
|
pVgroup->vnodeGid[0].pDnode = pSelDnode;
|
||||||
pVgroup->vnodeGid[0].publicIp = pSelDnode->publicIp;
|
|
||||||
|
|
||||||
mTrace("dnode:%d, alloc one vnode to vgroup, openVnodes:%d", pSelDnode->dnodeId, pSelDnode->openVnodes);
|
mTrace("dnode:%d, alloc one vnode to vgroup, openVnodes:%d", pSelDnode->dnodeId, pSelDnode->openVnodes);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -527,7 +527,7 @@ static int32_t mgmtGetDbMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn)
|
||||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||||
pShow->numOfRows = pUser->pAcct->acctInfo.numOfDbs;
|
pShow->numOfRows = pUser->pAcct->acctInfo.numOfDbs;
|
||||||
|
|
||||||
mgmtReleaseUser(pUser);
|
mgmtDecUserRef(pUser);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -647,7 +647,7 @@ static int32_t mgmtRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void *
|
||||||
}
|
}
|
||||||
|
|
||||||
pShow->numOfReads += numOfRows;
|
pShow->numOfReads += numOfRows;
|
||||||
mgmtReleaseUser(pUser);
|
mgmtDecUserRef(pUser);
|
||||||
return numOfRows;
|
return numOfRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -23,6 +23,7 @@
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
#include "tsocket.h"
|
#include "tsocket.h"
|
||||||
#include "tbalance.h"
|
#include "tbalance.h"
|
||||||
|
#include "tsync.h"
|
||||||
#include "dnode.h"
|
#include "dnode.h"
|
||||||
#include "mgmtDef.h"
|
#include "mgmtDef.h"
|
||||||
#include "mgmtLog.h"
|
#include "mgmtLog.h"
|
||||||
|
@ -139,7 +140,7 @@ static int32_t mgmtDnodeActionRestored() {
|
||||||
mgmtCreateDnode(ip);
|
mgmtCreateDnode(ip);
|
||||||
SDnodeObj *pDnode = mgmtGetDnodeByIp(ip);
|
SDnodeObj *pDnode = mgmtGetDnodeByIp(ip);
|
||||||
mgmtAddMnode(pDnode->dnodeId);
|
mgmtAddMnode(pDnode->dnodeId);
|
||||||
mgmtReleaseDnode(pDnode);
|
mgmtDecDnodeRef(pDnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -215,13 +216,17 @@ void *mgmtGetDnodeByIp(uint32_t ip) {
|
||||||
if (ip == pDnode->privateIp) {
|
if (ip == pDnode->privateIp) {
|
||||||
return pDnode;
|
return pDnode;
|
||||||
}
|
}
|
||||||
mgmtReleaseDnode(pDnode);
|
mgmtDecDnodeRef(pDnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
void mgmtReleaseDnode(SDnodeObj *pDnode) {
|
void mgmtIncDnodeRef(SDnodeObj *pDnode) {
|
||||||
|
sdbIncRef(tsDnodeSdb, pDnode);
|
||||||
|
}
|
||||||
|
|
||||||
|
void mgmtDecDnodeRef(SDnodeObj *pDnode) {
|
||||||
sdbDecRef(tsDnodeSdb, pDnode);
|
sdbDecRef(tsDnodeSdb, pDnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -326,19 +331,21 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
|
||||||
int32_t openVnodes = htons(pStatus->openVnodes);
|
int32_t openVnodes = htons(pStatus->openVnodes);
|
||||||
for (int32_t j = 0; j < openVnodes; ++j) {
|
for (int32_t j = 0; j < openVnodes; ++j) {
|
||||||
SVnodeLoad *pVload = &pStatus->load[j];
|
SVnodeLoad *pVload = &pStatus->load[j];
|
||||||
pDnode->vload[j].vgId = htonl(pVload->vgId);
|
pVload->vgId = htonl(pVload->vgId);
|
||||||
pDnode->vload[j].totalStorage = htobe64(pVload->totalStorage);
|
|
||||||
pDnode->vload[j].compStorage = htobe64(pVload->compStorage);
|
SVgObj *pVgroup = mgmtGetVgroup(pVload->vgId);
|
||||||
pDnode->vload[j].pointsWritten = htobe64(pVload->pointsWritten);
|
|
||||||
|
|
||||||
SVgObj *pVgroup = mgmtGetVgroup(pDnode->vload[j].vgId);
|
|
||||||
if (pVgroup == NULL) {
|
if (pVgroup == NULL) {
|
||||||
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pDnode->privateIp);
|
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pDnode->privateIp);
|
||||||
mPrint("dnode:%d, vgroup:%d not exist in mnode, drop it", pDnode->dnodeId, pDnode->vload[j].vgId);
|
mPrint("dnode:%d, vgroup:%d not exist in mnode, drop it", pDnode->dnodeId, pVload->vgId);
|
||||||
mgmtSendDropVnodeMsg(pDnode->vload[j].vgId, &ipSet, NULL);
|
mgmtSendDropVnodeMsg(pVload->vgId, &ipSet, NULL);
|
||||||
} else {
|
} else {
|
||||||
mgmtUpdateVgroupStatus(pVgroup, pDnode->dnodeId, pVload);
|
mgmtUpdateVgroupStatus(pVgroup, pDnode, pVload);
|
||||||
mgmtReleaseVgroup(pVgroup);
|
if (pVload->role == TAOS_SYNC_ROLE_MASTER) {
|
||||||
|
pVgroup->totalStorage = htobe64(pVload->totalStorage);
|
||||||
|
pVgroup->compStorage = htobe64(pVload->compStorage);
|
||||||
|
pVgroup->pointsWritten = htobe64(pVload->pointsWritten);
|
||||||
|
}
|
||||||
|
mgmtDecVgroupRef(pVgroup);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -348,7 +355,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
|
||||||
balanceNotify();
|
balanceNotify();
|
||||||
}
|
}
|
||||||
|
|
||||||
mgmtReleaseDnode(pDnode);
|
mgmtDecDnodeRef(pDnode);
|
||||||
|
|
||||||
int32_t contLen = sizeof(SDMStatusRsp) + TSDB_MAX_VNODES * sizeof(SDMVgroupAccess);
|
int32_t contLen = sizeof(SDMStatusRsp) + TSDB_MAX_VNODES * sizeof(SDMVgroupAccess);
|
||||||
SDMStatusRsp *pRsp = rpcMallocCont(contLen);
|
SDMStatusRsp *pRsp = rpcMallocCont(contLen);
|
||||||
|
@ -554,7 +561,7 @@ static int32_t mgmtGetDnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
|
||||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||||
pShow->pNode = NULL;
|
pShow->pNode = NULL;
|
||||||
|
|
||||||
mgmtReleaseUser(pUser);
|
mgmtDecUserRef(pUser);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -604,7 +611,7 @@ static int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, voi
|
||||||
|
|
||||||
|
|
||||||
numOfRows++;
|
numOfRows++;
|
||||||
mgmtReleaseDnode(pDnode);
|
mgmtDecDnodeRef(pDnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
pShow->numOfReads += numOfRows;
|
pShow->numOfReads += numOfRows;
|
||||||
|
@ -661,7 +668,7 @@ static int32_t mgmtGetModuleMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC
|
||||||
pShow->numOfRows = mgmtGetDnodesNum() * TSDB_MOD_MAX;
|
pShow->numOfRows = mgmtGetDnodesNum() * TSDB_MOD_MAX;
|
||||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||||
pShow->pNode = NULL;
|
pShow->pNode = NULL;
|
||||||
mgmtReleaseUser(pUser);
|
mgmtDecUserRef(pUser);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -712,7 +719,7 @@ int32_t mgmtRetrieveModules(SShowObj *pShow, char *data, int32_t rows, void *pCo
|
||||||
numOfRows++;
|
numOfRows++;
|
||||||
}
|
}
|
||||||
|
|
||||||
mgmtReleaseDnode(pDnode);
|
mgmtDecDnodeRef(pDnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
pShow->numOfReads += numOfRows;
|
pShow->numOfReads += numOfRows;
|
||||||
|
@ -762,7 +769,7 @@ static int32_t mgmtGetConfigMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pC
|
||||||
|
|
||||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||||
pShow->pNode = NULL;
|
pShow->pNode = NULL;
|
||||||
mgmtReleaseUser(pUser);
|
mgmtDecUserRef(pUser);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -840,35 +847,18 @@ static int32_t mgmtGetVnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
|
||||||
if (pShow->payloadLen > 0 ) {
|
if (pShow->payloadLen > 0 ) {
|
||||||
uint32_t ip = ip2uint(pShow->payload);
|
uint32_t ip = ip2uint(pShow->payload);
|
||||||
pDnode = mgmtGetDnodeByIp(ip);
|
pDnode = mgmtGetDnodeByIp(ip);
|
||||||
if (NULL == pDnode) {
|
|
||||||
return TSDB_CODE_NODE_OFFLINE;
|
|
||||||
}
|
|
||||||
|
|
||||||
SVnodeLoad* pVnode;
|
|
||||||
pShow->numOfRows = 0;
|
|
||||||
for (int32_t i = 0 ; i < TSDB_MAX_VNODES; i++) {
|
|
||||||
pVnode = &pDnode->vload[i];
|
|
||||||
if (0 != pVnode->vgId) {
|
|
||||||
pShow->numOfRows++;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pShow->pNode = pDnode;
|
|
||||||
} else {
|
} else {
|
||||||
while (true) {
|
mgmtGetNextDnode(NULL, (SDnodeObj **)&pDnode);
|
||||||
pShow->pNode = mgmtGetNextDnode(pShow->pNode, (SDnodeObj **)&pDnode);
|
}
|
||||||
if (pDnode == NULL) break;
|
|
||||||
pShow->numOfRows += pDnode->openVnodes;
|
|
||||||
|
|
||||||
if (0 == pShow->numOfRows) return TSDB_CODE_NODE_OFFLINE;
|
if (pDnode != NULL) {
|
||||||
}
|
pShow->numOfRows += pDnode->openVnodes;
|
||||||
|
mgmtDecDnodeRef(pDnode);
|
||||||
pShow->pNode = NULL;
|
}
|
||||||
}
|
|
||||||
|
|
||||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||||
mgmtReleaseDnode(pDnode);
|
pShow->pNode = pDnode;
|
||||||
mgmtReleaseUser(pUser);
|
mgmtDecUserRef(pUser);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -881,35 +871,35 @@ static int32_t mgmtRetrieveVnodes(SShowObj *pShow, char *data, int32_t rows, voi
|
||||||
|
|
||||||
if (0 == rows) return 0;
|
if (0 == rows) return 0;
|
||||||
|
|
||||||
if (pShow->payloadLen) {
|
pDnode = (SDnodeObj *)(pShow->pNode);
|
||||||
// output the vnodes info of the designated dnode. And output all vnodes of this dnode, instead of rows (max 100)
|
if (pDnode != NULL) {
|
||||||
pDnode = (SDnodeObj *)(pShow->pNode);
|
void *pNode = NULL;
|
||||||
if (pDnode != NULL) {
|
SVgObj *pVgroup;
|
||||||
SVnodeLoad* pVnode;
|
while (1) {
|
||||||
for (int32_t i = 0 ; i < TSDB_MAX_VNODES; i++) {
|
pNode = mgmtGetNextVgroup(pNode, &pVgroup);
|
||||||
pVnode = &pDnode->vload[i];
|
if (pVgroup == NULL) break;
|
||||||
if (0 == pVnode->vgId) {
|
|
||||||
continue;
|
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
|
||||||
|
SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
|
||||||
|
if (pVgid->pDnode == pDnode) {
|
||||||
|
cols = 0;
|
||||||
|
|
||||||
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
|
*(uint32_t *)pWrite = pVgroup->vgId;
|
||||||
|
cols++;
|
||||||
|
|
||||||
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
|
strcpy(pWrite, mgmtGetMnodeRoleStr(pVgid->role));
|
||||||
|
cols++;
|
||||||
}
|
}
|
||||||
|
|
||||||
cols = 0;
|
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
|
||||||
*(uint32_t *)pWrite = pVnode->vgId;
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
|
||||||
strcpy(pWrite, pVnode->status ? "ready" : "offline");
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
numOfRows++;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mgmtDecVgroupRef(pVgroup);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// TODO: output all vnodes of all dnodes
|
|
||||||
numOfRows = 0;
|
numOfRows = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
pShow->numOfReads += numOfRows;
|
pShow->numOfReads += numOfRows;
|
||||||
return numOfRows;
|
return numOfRows;
|
||||||
}
|
}
|
||||||
|
|
|
@ -47,7 +47,7 @@ static int32_t mgmtMnodeActionInsert(SSdbOper *pOper) {
|
||||||
|
|
||||||
pMnode->pDnode = pDnode;
|
pMnode->pDnode = pDnode;
|
||||||
pDnode->isMgmt = true;
|
pDnode->isMgmt = true;
|
||||||
mgmtReleaseDnode(pDnode);
|
mgmtDecDnodeRef(pDnode);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -58,7 +58,7 @@ static int32_t mgmtMnodeActionDelete(SSdbOper *pOper) {
|
||||||
SDnodeObj *pDnode = mgmtGetDnode(pMnode->mnodeId);
|
SDnodeObj *pDnode = mgmtGetDnode(pMnode->mnodeId);
|
||||||
if (pDnode == NULL) return TSDB_CODE_DNODE_NOT_EXIST;
|
if (pDnode == NULL) return TSDB_CODE_DNODE_NOT_EXIST;
|
||||||
pDnode->isMgmt = false;
|
pDnode->isMgmt = false;
|
||||||
mgmtReleaseDnode(pDnode);
|
mgmtDecDnodeRef(pDnode);
|
||||||
|
|
||||||
mTrace("mnode:%d, is dropped from sdb", pMnode->mnodeId);
|
mTrace("mnode:%d, is dropped from sdb", pMnode->mnodeId);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -314,7 +314,7 @@ static int32_t mgmtGetMnodeMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCo
|
||||||
pShow->numOfRows = mgmtGetMnodesNum();
|
pShow->numOfRows = mgmtGetMnodesNum();
|
||||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||||
pShow->pNode = NULL;
|
pShow->pNode = NULL;
|
||||||
mgmtReleaseUser(pUser);
|
mgmtDecUserRef(pUser);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -790,12 +790,12 @@ void *mgmtMallocQueuedMsg(SRpcMsg *rpcMsg) {
|
||||||
void mgmtFreeQueuedMsg(SQueuedMsg *pMsg) {
|
void mgmtFreeQueuedMsg(SQueuedMsg *pMsg) {
|
||||||
if (pMsg != NULL) {
|
if (pMsg != NULL) {
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
if (pMsg->pUser) mgmtReleaseUser(pMsg->pUser);
|
if (pMsg->pUser) mgmtDecUserRef(pMsg->pUser);
|
||||||
if (pMsg->pDb) mgmtDecDbRef(pMsg->pDb);
|
if (pMsg->pDb) mgmtDecDbRef(pMsg->pDb);
|
||||||
if (pMsg->pVgroup) mgmtReleaseVgroup(pMsg->pVgroup);
|
if (pMsg->pVgroup) mgmtDecVgroupRef(pMsg->pVgroup);
|
||||||
if (pMsg->pTable) mgmtDecTableRef(pMsg->pTable);
|
if (pMsg->pTable) mgmtDecTableRef(pMsg->pTable);
|
||||||
if (pMsg->pAcct) mgmtDecAcctRef(pMsg->pAcct);
|
if (pMsg->pAcct) mgmtDecAcctRef(pMsg->pAcct);
|
||||||
if (pMsg->pDnode) mgmtReleaseDnode(pMsg->pDnode);
|
if (pMsg->pDnode) mgmtDecDnodeRef(pMsg->pDnode);
|
||||||
free(pMsg);
|
free(pMsg);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -371,11 +371,11 @@ static int mgmtShellRetriveAuth(char *user, char *spi, char *encrypt, char *secr
|
||||||
SUserObj *pUser = mgmtGetUser(user);
|
SUserObj *pUser = mgmtGetUser(user);
|
||||||
if (pUser == NULL) {
|
if (pUser == NULL) {
|
||||||
*secret = 0;
|
*secret = 0;
|
||||||
mgmtReleaseUser(pUser);
|
mgmtDecUserRef(pUser);
|
||||||
return TSDB_CODE_INVALID_USER;
|
return TSDB_CODE_INVALID_USER;
|
||||||
} else {
|
} else {
|
||||||
memcpy(secret, pUser->pass, TSDB_KEY_LEN);
|
memcpy(secret, pUser->pass, TSDB_KEY_LEN);
|
||||||
mgmtReleaseUser(pUser);
|
mgmtDecUserRef(pUser);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -97,7 +97,7 @@ static int32_t mgmtChildTableActionInsert(SSdbOper *pOper) {
|
||||||
mError("ctable:%s, not in vgroup:%d", pTable->info.tableId, pTable->vgId);
|
mError("ctable:%s, not in vgroup:%d", pTable->info.tableId, pTable->vgId);
|
||||||
return TSDB_CODE_INVALID_VGROUP_ID;
|
return TSDB_CODE_INVALID_VGROUP_ID;
|
||||||
}
|
}
|
||||||
mgmtReleaseVgroup(pVgroup);
|
mgmtDecVgroupRef(pVgroup);
|
||||||
|
|
||||||
SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
|
SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
|
||||||
if (pDb == NULL) {
|
if (pDb == NULL) {
|
||||||
|
@ -139,7 +139,7 @@ static int32_t mgmtChildTableActionDelete(SSdbOper *pOper) {
|
||||||
if (pVgroup == NULL) {
|
if (pVgroup == NULL) {
|
||||||
return TSDB_CODE_INVALID_VGROUP_ID;
|
return TSDB_CODE_INVALID_VGROUP_ID;
|
||||||
}
|
}
|
||||||
mgmtReleaseVgroup(pVgroup);
|
mgmtDecVgroupRef(pVgroup);
|
||||||
|
|
||||||
SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
|
SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
|
||||||
if (pDb == NULL) {
|
if (pDb == NULL) {
|
||||||
|
@ -275,7 +275,7 @@ static int32_t mgmtChildTableActionRestored() {
|
||||||
pNode = pLastNode;
|
pNode = pLastNode;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
mgmtReleaseVgroup(pVgroup);
|
mgmtDecVgroupRef(pVgroup);
|
||||||
|
|
||||||
if (strcmp(pVgroup->dbName, pDb->name) != 0) {
|
if (strcmp(pVgroup->dbName, pDb->name) != 0) {
|
||||||
mError("ctable:%s, db:%s not match with vgroup:%d db:%s sid:%d, discard it",
|
mError("ctable:%s, db:%s not match with vgroup:%d db:%s sid:%d, discard it",
|
||||||
|
@ -1194,17 +1194,15 @@ static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *pMsg) {
|
||||||
|
|
||||||
pRsp->vgroups[vg].vgId = htonl(vgId);
|
pRsp->vgroups[vg].vgId = htonl(vgId);
|
||||||
for (int32_t vn = 0; vn < pVgroup->numOfVnodes; ++vn) {
|
for (int32_t vn = 0; vn < pVgroup->numOfVnodes; ++vn) {
|
||||||
SDnodeObj *pDnode = mgmtGetDnode(pVgroup->vnodeGid[vn].dnodeId);
|
SDnodeObj *pDnode = pVgroup->vnodeGid[vn].pDnode;
|
||||||
if (pDnode == NULL) break;
|
if (pDnode == NULL) break;
|
||||||
|
|
||||||
pRsp->vgroups[vg].ipAddr[vn].ip = htonl(pDnode->privateIp);
|
pRsp->vgroups[vg].ipAddr[vn].ip = htonl(pDnode->privateIp);
|
||||||
pRsp->vgroups[vg].ipAddr[vn].port = htons(tsDnodeShellPort);
|
pRsp->vgroups[vg].ipAddr[vn].port = htons(tsDnodeShellPort);
|
||||||
pRsp->vgroups[vg].numOfIps++;
|
pRsp->vgroups[vg].numOfIps++;
|
||||||
|
|
||||||
mgmtReleaseDnode(pDnode);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
mgmtReleaseVgroup(pVgroup);
|
mgmtDecVgroupRef(pVgroup);
|
||||||
}
|
}
|
||||||
pRsp->numOfVgroups = htonl(vg);
|
pRsp->numOfVgroups = htonl(vg);
|
||||||
|
|
||||||
|
@ -1613,7 +1611,7 @@ static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) {
|
||||||
pMeta->vgroup.ipAddr[i].port = htonl(tsDnodeShellPort);
|
pMeta->vgroup.ipAddr[i].port = htonl(tsDnodeShellPort);
|
||||||
}
|
}
|
||||||
pMeta->vgroup.numOfIps++;
|
pMeta->vgroup.numOfIps++;
|
||||||
mgmtReleaseDnode(pDnode);
|
mgmtDecDnodeRef(pDnode);
|
||||||
}
|
}
|
||||||
pMeta->vgroup.vgId = htonl(pVgroup->vgId);
|
pMeta->vgroup.vgId = htonl(pVgroup->vgId);
|
||||||
|
|
||||||
|
@ -1742,7 +1740,7 @@ static SChildTableObj* mgmtGetTableByPos(uint32_t dnodeId, int32_t vnode, int32_
|
||||||
|
|
||||||
SChildTableObj *pTable = pVgroup->tableList[sid];
|
SChildTableObj *pTable = pVgroup->tableList[sid];
|
||||||
mgmtIncTableRef((STableObj *)pTable);
|
mgmtIncTableRef((STableObj *)pTable);
|
||||||
mgmtReleaseVgroup(pVgroup);
|
mgmtDecVgroupRef(pVgroup);
|
||||||
return pTable;
|
return pTable;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -150,7 +150,11 @@ SUserObj *mgmtGetUser(char *name) {
|
||||||
return (SUserObj *)sdbGetRow(tsUserSdb, name);
|
return (SUserObj *)sdbGetRow(tsUserSdb, name);
|
||||||
}
|
}
|
||||||
|
|
||||||
void mgmtReleaseUser(SUserObj *pUser) {
|
void mgmtIncUserRef(SUserObj *pUser) {
|
||||||
|
return sdbIncRef(tsUserSdb, pUser);
|
||||||
|
}
|
||||||
|
|
||||||
|
void mgmtDecUserRef(SUserObj *pUser) {
|
||||||
return sdbDecRef(tsUserSdb, pUser);
|
return sdbDecRef(tsUserSdb, pUser);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -183,7 +187,7 @@ int32_t mgmtCreateUser(SAcctObj *pAcct, char *name, char *pass) {
|
||||||
SUserObj *pUser = mgmtGetUser(name);
|
SUserObj *pUser = mgmtGetUser(name);
|
||||||
if (pUser != NULL) {
|
if (pUser != NULL) {
|
||||||
mTrace("user:%s is already there", name);
|
mTrace("user:%s is already there", name);
|
||||||
mgmtReleaseUser(pUser);
|
mgmtDecUserRef(pUser);
|
||||||
return TSDB_CODE_USER_ALREADY_EXIST;
|
return TSDB_CODE_USER_ALREADY_EXIST;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -273,7 +277,7 @@ static int32_t mgmtGetUserMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pCon
|
||||||
pShow->numOfRows = pUser->pAcct->acctInfo.numOfUsers;
|
pShow->numOfRows = pUser->pAcct->acctInfo.numOfUsers;
|
||||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||||
|
|
||||||
mgmtReleaseUser(pUser);
|
mgmtDecUserRef(pUser);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -308,7 +312,7 @@ static int32_t mgmtRetrieveUsers(SShowObj *pShow, char *data, int32_t rows, void
|
||||||
cols++;
|
cols++;
|
||||||
|
|
||||||
numOfRows++;
|
numOfRows++;
|
||||||
mgmtReleaseUser(pUser);
|
mgmtDecUserRef(pUser);
|
||||||
}
|
}
|
||||||
pShow->numOfReads += numOfRows;
|
pShow->numOfReads += numOfRows;
|
||||||
return numOfRows;
|
return numOfRows;
|
||||||
|
@ -356,7 +360,7 @@ static void mgmtProcessAlterUserMsg(SQueuedMsg *pMsg) {
|
||||||
|
|
||||||
if (strcmp(pUser->user, "monitor") == 0 || (strcmp(pUser->user + 1, pUser->acct) == 0 && pUser->user[0] == '_')) {
|
if (strcmp(pUser->user, "monitor") == 0 || (strcmp(pUser->user + 1, pUser->acct) == 0 && pUser->user[0] == '_')) {
|
||||||
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS);
|
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS);
|
||||||
mgmtReleaseUser(pUser);
|
mgmtDecUserRef(pUser);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -432,7 +436,7 @@ static void mgmtProcessAlterUserMsg(SQueuedMsg *pMsg) {
|
||||||
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS);
|
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS);
|
||||||
}
|
}
|
||||||
|
|
||||||
mgmtReleaseUser(pUser);
|
mgmtDecUserRef(pUser);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mgmtProcessDropUserMsg(SQueuedMsg *pMsg) {
|
static void mgmtProcessDropUserMsg(SQueuedMsg *pMsg) {
|
||||||
|
@ -449,7 +453,7 @@ static void mgmtProcessDropUserMsg(SQueuedMsg *pMsg) {
|
||||||
if (strcmp(pUser->user, "monitor") == 0 || strcmp(pUser->user, pUser->acct) == 0 ||
|
if (strcmp(pUser->user, "monitor") == 0 || strcmp(pUser->user, pUser->acct) == 0 ||
|
||||||
(strcmp(pUser->user + 1, pUser->acct) == 0 && pUser->user[0] == '_')) {
|
(strcmp(pUser->user + 1, pUser->acct) == 0 && pUser->user[0] == '_')) {
|
||||||
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS);
|
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_NO_RIGHTS);
|
||||||
mgmtReleaseUser(pUser);
|
mgmtDecUserRef(pUser);
|
||||||
return ;
|
return ;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -478,7 +482,7 @@ static void mgmtProcessDropUserMsg(SQueuedMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
mgmtSendSimpleResp(pMsg->thandle, code);
|
mgmtSendSimpleResp(pMsg->thandle, code);
|
||||||
mgmtReleaseUser(pUser);
|
mgmtDecUserRef(pUser);
|
||||||
}
|
}
|
||||||
|
|
||||||
void mgmtDropAllUsers(SAcctObj *pAcct) {
|
void mgmtDropAllUsers(SAcctObj *pAcct) {
|
||||||
|
@ -504,7 +508,7 @@ void mgmtDropAllUsers(SAcctObj *pAcct) {
|
||||||
numOfUsers++;
|
numOfUsers++;
|
||||||
}
|
}
|
||||||
|
|
||||||
mgmtReleaseUser(pUser);
|
mgmtDecUserRef(pUser);
|
||||||
}
|
}
|
||||||
|
|
||||||
mTrace("acct:%s, all users:%d is dropped from sdb", pAcct->user, numOfUsers);
|
mTrace("acct:%s, all users:%d is dropped from sdb", pAcct->user, numOfUsers);
|
||||||
|
|
|
@ -68,7 +68,6 @@ static int32_t mgmtVgroupActionInsert(SSdbOper *pOper) {
|
||||||
if (pDb == NULL) {
|
if (pDb == NULL) {
|
||||||
return TSDB_CODE_INVALID_DB;
|
return TSDB_CODE_INVALID_DB;
|
||||||
}
|
}
|
||||||
mgmtDecDbRef(pDb);
|
|
||||||
|
|
||||||
pVgroup->pDb = pDb;
|
pVgroup->pDb = pDb;
|
||||||
pVgroup->prev = NULL;
|
pVgroup->prev = NULL;
|
||||||
|
@ -91,15 +90,13 @@ static int32_t mgmtVgroupActionInsert(SSdbOper *pOper) {
|
||||||
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
|
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
|
||||||
SDnodeObj *pDnode = mgmtGetDnode(pVgroup->vnodeGid[i].dnodeId);
|
SDnodeObj *pDnode = mgmtGetDnode(pVgroup->vnodeGid[i].dnodeId);
|
||||||
if (pDnode != NULL) {
|
if (pDnode != NULL) {
|
||||||
pVgroup->vnodeGid[i].privateIp = pDnode->privateIp;
|
pVgroup->vnodeGid[i].pDnode = pDnode;
|
||||||
pVgroup->vnodeGid[i].publicIp = pDnode->publicIp;
|
atomic_add_fetch_32(&pDnode->openVnodes, 1);
|
||||||
atomic_add_fetch_32(&pDnode->openVnodes, 1);
|
mgmtDecDnodeRef(pDnode);
|
||||||
mgmtReleaseDnode(pDnode);
|
}
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
mgmtAddVgroupIntoDb(pVgroup);
|
mgmtAddVgroupIntoDb(pVgroup);
|
||||||
mgmtIncDbRef(pVgroup->pDb);
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -115,10 +112,10 @@ static int32_t mgmtVgroupActionDelete(SSdbOper *pOper) {
|
||||||
|
|
||||||
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
|
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
|
||||||
SDnodeObj *pDnode = mgmtGetDnode(pVgroup->vnodeGid[i].dnodeId);
|
SDnodeObj *pDnode = mgmtGetDnode(pVgroup->vnodeGid[i].dnodeId);
|
||||||
if (pDnode) {
|
if (pDnode != NULL) {
|
||||||
atomic_sub_fetch_32(&pDnode->openVnodes, 1);
|
atomic_sub_fetch_32(&pDnode->openVnodes, 1);
|
||||||
}
|
}
|
||||||
mgmtReleaseDnode(pDnode);
|
mgmtDecDnodeRef(pDnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -150,6 +147,12 @@ static int32_t mgmtVgroupActionUpdate(SSdbOper *pOper) {
|
||||||
static int32_t mgmtVgroupActionEncode(SSdbOper *pOper) {
|
static int32_t mgmtVgroupActionEncode(SSdbOper *pOper) {
|
||||||
SVgObj *pVgroup = pOper->pObj;
|
SVgObj *pVgroup = pOper->pObj;
|
||||||
memcpy(pOper->rowData, pVgroup, tsVgUpdateSize);
|
memcpy(pOper->rowData, pVgroup, tsVgUpdateSize);
|
||||||
|
SVgObj *pTmpVgroup = pOper->rowData;
|
||||||
|
for (int32_t i = 0; i < TSDB_VNODES_SUPPORT; ++i) {
|
||||||
|
pTmpVgroup->vnodeGid[i].pDnode = NULL;
|
||||||
|
pTmpVgroup->vnodeGid[i].role = 0;
|
||||||
|
}
|
||||||
|
|
||||||
pOper->rowSize = tsVgUpdateSize;
|
pOper->rowSize = tsVgUpdateSize;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -204,7 +207,11 @@ int32_t mgmtInitVgroups() {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void mgmtReleaseVgroup(SVgObj *pVgroup) {
|
void mgmtIncVgroupRef(SVgObj *pVgroup) {
|
||||||
|
return sdbIncRef(tsVgroupSdb, pVgroup);
|
||||||
|
}
|
||||||
|
|
||||||
|
void mgmtDecVgroupRef(SVgObj *pVgroup) {
|
||||||
return sdbDecRef(tsVgroupSdb, pVgroup);
|
return sdbDecRef(tsVgroupSdb, pVgroup);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -224,14 +231,15 @@ void mgmtUpdateVgroup(SVgObj *pVgroup) {
|
||||||
mgmtSendCreateVgroupMsg(pVgroup, NULL);
|
mgmtSendCreateVgroupMsg(pVgroup, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
void mgmtUpdateVgroupStatus(SVgObj *pVgroup, int32_t dnodeId, SVnodeLoad *pVload) {
|
void mgmtUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVload) {
|
||||||
if (pVload->role == TAOS_SYNC_ROLE_MASTER) {
|
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
|
||||||
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
|
SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
|
||||||
SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
|
if (pVgid->pDnode == pDnode) {
|
||||||
if (pVgid->dnodeId == dnodeId) {
|
pVgid->role = pVload->role;
|
||||||
|
if (pVload->role == TAOS_SYNC_ROLE_MASTER) {
|
||||||
pVgroup->inUse = i;
|
pVgroup->inUse = i;
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -340,7 +348,7 @@ int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
|
||||||
mgmtDecTableRef(pTable);
|
mgmtDecTableRef(pTable);
|
||||||
pVgroup = mgmtGetVgroup(((SChildTableObj*)pTable)->vgId);
|
pVgroup = mgmtGetVgroup(((SChildTableObj*)pTable)->vgId);
|
||||||
if (NULL == pVgroup) return TSDB_CODE_INVALID_TABLE_ID;
|
if (NULL == pVgroup) return TSDB_CODE_INVALID_TABLE_ID;
|
||||||
mgmtReleaseVgroup(pVgroup);
|
mgmtDecVgroupRef(pVgroup);
|
||||||
maxReplica = pVgroup->numOfVnodes > maxReplica ? pVgroup->numOfVnodes : maxReplica;
|
maxReplica = pVgroup->numOfVnodes > maxReplica ? pVgroup->numOfVnodes : maxReplica;
|
||||||
} else {
|
} else {
|
||||||
SVgObj *pVgroup = pDb->pHead;
|
SVgObj *pVgroup = pDb->pHead;
|
||||||
|
@ -391,27 +399,6 @@ int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
char *mgmtGetVnodeStatus(SVgObj *pVgroup, SVnodeGid *pVnode) {
|
|
||||||
SDnodeObj *pDnode = mgmtGetDnode(pVnode->dnodeId);
|
|
||||||
if (pDnode == NULL) {
|
|
||||||
mError("vgroup:%d, not exist in dnode:%d", pVgroup->vgId, pDnode->dnodeId);
|
|
||||||
return "null";
|
|
||||||
}
|
|
||||||
mgmtReleaseDnode(pDnode);
|
|
||||||
|
|
||||||
if (pDnode->status == TAOS_DN_STATUS_OFFLINE) {
|
|
||||||
return "offline";
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int i = 0; i < pDnode->openVnodes; ++i) {
|
|
||||||
if (pDnode->vload[i].vgId == pVgroup->vgId) {
|
|
||||||
return pDnode->vload[i].status ? "ready" : "offline";
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return "null";
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
|
int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn) {
|
||||||
int32_t numOfRows = 0;
|
int32_t numOfRows = 0;
|
||||||
SVgObj *pVgroup = NULL;
|
SVgObj *pVgroup = NULL;
|
||||||
|
@ -453,19 +440,24 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo
|
||||||
*(int16_t *) pWrite = pVgroup->vnodeGid[i].dnodeId;
|
*(int16_t *) pWrite = pVgroup->vnodeGid[i].dnodeId;
|
||||||
cols++;
|
cols++;
|
||||||
|
|
||||||
tinet_ntoa(ipstr, pVgroup->vnodeGid[i].privateIp);
|
SDnodeObj *pDnode = pVgroup->vnodeGid[i].pDnode;
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
|
||||||
strcpy(pWrite, ipstr);
|
|
||||||
cols++;
|
|
||||||
|
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
if (pDnode != NULL) {
|
||||||
if (pVgroup->vnodeGid[i].dnodeId != 0) {
|
tinet_ntoa(ipstr, pDnode->privateIp);
|
||||||
char *vnodeStatus = mgmtGetVnodeStatus(pVgroup, pVgroup->vnodeGid + i);
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
strcpy(pWrite, vnodeStatus);
|
strcpy(pWrite, ipstr);
|
||||||
|
cols++;
|
||||||
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
|
strcpy(pWrite, mgmtGetMnodeRoleStr(pVgroup->vnodeGid[i].role));
|
||||||
|
cols++;
|
||||||
} else {
|
} else {
|
||||||
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
strcpy(pWrite, "null");
|
strcpy(pWrite, "null");
|
||||||
|
cols++;
|
||||||
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
|
strcpy(pWrite, "null");
|
||||||
|
cols++;
|
||||||
}
|
}
|
||||||
cols++;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
numOfRows++;
|
numOfRows++;
|
||||||
|
@ -526,16 +518,17 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup) {
|
||||||
pCfg->commitLog = pDb->cfg.commitLog;
|
pCfg->commitLog = pDb->cfg.commitLog;
|
||||||
pCfg->replications = (int8_t) pVgroup->numOfVnodes;
|
pCfg->replications = (int8_t) pVgroup->numOfVnodes;
|
||||||
pCfg->quorum = 1;
|
pCfg->quorum = 1;
|
||||||
pCfg->arbitratorIp = htonl(pVgroup->vnodeGid[0].privateIp);
|
|
||||||
|
|
||||||
SMDVnodeDesc *pNodes = pVnode->nodes;
|
SMDVnodeDesc *pNodes = pVnode->nodes;
|
||||||
for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) {
|
for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) {
|
||||||
SDnodeObj *pDnode = mgmtGetDnode(pVgroup->vnodeGid[j].dnodeId);
|
SDnodeObj *pDnode = pVgroup->vnodeGid[0].pDnode;
|
||||||
if (pDnode != NULL) {
|
if (pDnode != NULL) {
|
||||||
pNodes[j].nodeId = htonl(pDnode->dnodeId);
|
pNodes[j].nodeId = htonl(pDnode->dnodeId);
|
||||||
pNodes[j].nodeIp = htonl(pDnode->privateIp);
|
pNodes[j].nodeIp = htonl(pDnode->privateIp);
|
||||||
strcpy(pNodes[j].nodeName, pDnode->dnodeName);
|
strcpy(pNodes[j].nodeName, pDnode->dnodeName);
|
||||||
mgmtReleaseDnode(pDnode);
|
if (j == 0) {
|
||||||
|
pCfg->arbitratorIp = htonl(pDnode->privateIp);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -549,7 +542,7 @@ SRpcIpSet mgmtGetIpSetFromVgroup(SVgObj *pVgroup) {
|
||||||
.port = tsDnodeMnodePort
|
.port = tsDnodeMnodePort
|
||||||
};
|
};
|
||||||
for (int i = 0; i < pVgroup->numOfVnodes; ++i) {
|
for (int i = 0; i < pVgroup->numOfVnodes; ++i) {
|
||||||
ipSet.ip[i] = pVgroup->vnodeGid[i].privateIp;
|
ipSet.ip[i] = pVgroup->vnodeGid[i].pDnode->privateIp;
|
||||||
}
|
}
|
||||||
return ipSet;
|
return ipSet;
|
||||||
}
|
}
|
||||||
|
@ -580,7 +573,7 @@ void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle) {
|
||||||
void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) {
|
void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) {
|
||||||
mTrace("vgroup:%d, send create all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle);
|
mTrace("vgroup:%d, send create all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle);
|
||||||
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
|
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
|
||||||
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].privateIp);
|
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].pDnode->privateIp);
|
||||||
mgmtSendCreateVnodeMsg(pVgroup, &ipSet, ahandle);
|
mgmtSendCreateVnodeMsg(pVgroup, &ipSet, ahandle);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -646,7 +639,7 @@ void mgmtSendDropVnodeMsg(int32_t vgId, SRpcIpSet *ipSet, void *ahandle) {
|
||||||
static void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) {
|
static void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) {
|
||||||
mTrace("vgroup:%d, send drop all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle);
|
mTrace("vgroup:%d, send drop all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle);
|
||||||
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
|
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
|
||||||
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].privateIp);
|
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].pDnode->privateIp);
|
||||||
mgmtSendDropVnodeMsg(pVgroup->vgId, &ipSet, ahandle);
|
mgmtSendDropVnodeMsg(pVgroup->vgId, &ipSet, ahandle);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -697,7 +690,7 @@ static void mgmtProcessVnodeCfgMsg(SRpcMsg *rpcMsg) {
|
||||||
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_NOT_ACTIVE_VNODE);
|
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_NOT_ACTIVE_VNODE);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
mgmtReleaseDnode(pDnode);
|
mgmtDecDnodeRef(pDnode);
|
||||||
|
|
||||||
SVgObj *pVgroup = mgmtGetVgroup(pCfg->vgId);
|
SVgObj *pVgroup = mgmtGetVgroup(pCfg->vgId);
|
||||||
if (pVgroup == NULL) {
|
if (pVgroup == NULL) {
|
||||||
|
@ -705,7 +698,7 @@ static void mgmtProcessVnodeCfgMsg(SRpcMsg *rpcMsg) {
|
||||||
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_NOT_ACTIVE_VNODE);
|
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_NOT_ACTIVE_VNODE);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
mgmtReleaseVgroup(pVgroup);
|
mgmtDecVgroupRef(pVgroup);
|
||||||
|
|
||||||
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_SUCCESS);
|
mgmtSendSimpleResp(rpcMsg->handle, TSDB_CODE_SUCCESS);
|
||||||
|
|
||||||
|
@ -721,7 +714,7 @@ void mgmtDropAllVgroups(SDbObj *pDropDb) {
|
||||||
SVgObj *pVgroup = NULL;
|
SVgObj *pVgroup = NULL;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
mgmtReleaseVgroup(pVgroup);
|
mgmtDecVgroupRef(pVgroup);
|
||||||
pNode = sdbFetchRow(tsVgroupSdb, pNode, (void **)&pVgroup);
|
pNode = sdbFetchRow(tsVgroupSdb, pNode, (void **)&pVgroup);
|
||||||
if (pVgroup == NULL) break;
|
if (pVgroup == NULL) break;
|
||||||
|
|
||||||
|
|
|
@ -408,82 +408,82 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
|
||||||
if (len <= 0) {
|
if (len <= 0) {
|
||||||
free(content);
|
free(content);
|
||||||
fclose(fp);
|
fclose(fp);
|
||||||
dError("pVnode:%p vgId:%d, failed to vnode cfg, content is null", pVnode, pVnode->vgId);
|
dError("pVnode:%p vgId:%d, failed to read vnode cfg, content is null", pVnode, pVnode->vgId);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
cJSON *root = cJSON_Parse(content);
|
cJSON *root = cJSON_Parse(content);
|
||||||
if (root == NULL) {
|
if (root == NULL) {
|
||||||
dError("pVnode:%p vgId:%d, failed to vnode cfg, invalid json format", pVnode, pVnode->vgId);
|
dError("pVnode:%p vgId:%d, failed to read vnode cfg, invalid json format", pVnode, pVnode->vgId);
|
||||||
goto PARSE_OVER;
|
goto PARSE_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
cJSON *precision = cJSON_GetObjectItem(root, "precision");
|
cJSON *precision = cJSON_GetObjectItem(root, "precision");
|
||||||
if (!precision || precision->type != cJSON_Number) {
|
if (!precision || precision->type != cJSON_Number) {
|
||||||
dError("pVnode:%p vgId:%d, failed to vnode cfg, precision not found", pVnode, pVnode->vgId);
|
dError("pVnode:%p vgId:%d, failed to read vnode cfg, precision not found", pVnode, pVnode->vgId);
|
||||||
goto PARSE_OVER;
|
goto PARSE_OVER;
|
||||||
}
|
}
|
||||||
pVnode->tsdbCfg.precision = (int8_t)precision->valueint;
|
pVnode->tsdbCfg.precision = (int8_t)precision->valueint;
|
||||||
|
|
||||||
cJSON *compression = cJSON_GetObjectItem(root, "compression");
|
cJSON *compression = cJSON_GetObjectItem(root, "compression");
|
||||||
if (!compression || compression->type != cJSON_Number) {
|
if (!compression || compression->type != cJSON_Number) {
|
||||||
dError("pVnode:%p vgId:%d, failed to vnode cfg, compression not found", pVnode, pVnode->vgId);
|
dError("pVnode:%p vgId:%d, failed to read vnode cfg, compression not found", pVnode, pVnode->vgId);
|
||||||
goto PARSE_OVER;
|
goto PARSE_OVER;
|
||||||
}
|
}
|
||||||
pVnode->tsdbCfg.compression = (int8_t)compression->valueint;
|
pVnode->tsdbCfg.compression = (int8_t)compression->valueint;
|
||||||
|
|
||||||
cJSON *maxTables = cJSON_GetObjectItem(root, "maxTables");
|
cJSON *maxTables = cJSON_GetObjectItem(root, "maxTables");
|
||||||
if (!maxTables || maxTables->type != cJSON_Number) {
|
if (!maxTables || maxTables->type != cJSON_Number) {
|
||||||
dError("pVnode:%p vgId:%d, failed to vnode cfg, maxTables not found", pVnode, pVnode->vgId);
|
dError("pVnode:%p vgId:%d, failed to read vnode cfg, maxTables not found", pVnode, pVnode->vgId);
|
||||||
goto PARSE_OVER;
|
goto PARSE_OVER;
|
||||||
}
|
}
|
||||||
pVnode->tsdbCfg.maxTables = maxTables->valueint;
|
pVnode->tsdbCfg.maxTables = maxTables->valueint;
|
||||||
|
|
||||||
cJSON *daysPerFile = cJSON_GetObjectItem(root, "daysPerFile");
|
cJSON *daysPerFile = cJSON_GetObjectItem(root, "daysPerFile");
|
||||||
if (!daysPerFile || daysPerFile->type != cJSON_Number) {
|
if (!daysPerFile || daysPerFile->type != cJSON_Number) {
|
||||||
dError("pVnode:%p vgId:%d, failed to vnode cfg, daysPerFile not found", pVnode, pVnode->vgId);
|
dError("pVnode:%p vgId:%d, failed to read vnode cfg, daysPerFile not found", pVnode, pVnode->vgId);
|
||||||
goto PARSE_OVER;
|
goto PARSE_OVER;
|
||||||
}
|
}
|
||||||
pVnode->tsdbCfg.daysPerFile = daysPerFile->valueint;
|
pVnode->tsdbCfg.daysPerFile = daysPerFile->valueint;
|
||||||
|
|
||||||
cJSON *minRowsPerFileBlock = cJSON_GetObjectItem(root, "minRowsPerFileBlock");
|
cJSON *minRowsPerFileBlock = cJSON_GetObjectItem(root, "minRowsPerFileBlock");
|
||||||
if (!minRowsPerFileBlock || minRowsPerFileBlock->type != cJSON_Number) {
|
if (!minRowsPerFileBlock || minRowsPerFileBlock->type != cJSON_Number) {
|
||||||
dError("pVnode:%p vgId:%d, failed to vnode cfg, minRowsPerFileBlock not found", pVnode, pVnode->vgId);
|
dError("pVnode:%p vgId:%d, failed to read vnode cfg, minRowsPerFileBlock not found", pVnode, pVnode->vgId);
|
||||||
goto PARSE_OVER;
|
goto PARSE_OVER;
|
||||||
}
|
}
|
||||||
pVnode->tsdbCfg.minRowsPerFileBlock = minRowsPerFileBlock->valueint;
|
pVnode->tsdbCfg.minRowsPerFileBlock = minRowsPerFileBlock->valueint;
|
||||||
|
|
||||||
cJSON *maxRowsPerFileBlock = cJSON_GetObjectItem(root, "maxRowsPerFileBlock");
|
cJSON *maxRowsPerFileBlock = cJSON_GetObjectItem(root, "maxRowsPerFileBlock");
|
||||||
if (!maxRowsPerFileBlock || maxRowsPerFileBlock->type != cJSON_Number) {
|
if (!maxRowsPerFileBlock || maxRowsPerFileBlock->type != cJSON_Number) {
|
||||||
dError("pVnode:%p vgId:%d, failed to vnode cfg, maxRowsPerFileBlock not found", pVnode, pVnode->vgId);
|
dError("pVnode:%p vgId:%d, failed to read vnode cfg, maxRowsPerFileBlock not found", pVnode, pVnode->vgId);
|
||||||
goto PARSE_OVER;
|
goto PARSE_OVER;
|
||||||
}
|
}
|
||||||
pVnode->tsdbCfg.maxRowsPerFileBlock = maxRowsPerFileBlock->valueint;
|
pVnode->tsdbCfg.maxRowsPerFileBlock = maxRowsPerFileBlock->valueint;
|
||||||
|
|
||||||
cJSON *keep = cJSON_GetObjectItem(root, "keep");
|
cJSON *daysToKeep = cJSON_GetObjectItem(root, "daysToKeep");
|
||||||
if (!keep || keep->type != cJSON_Number) {
|
if (!daysToKeep || daysToKeep->type != cJSON_Number) {
|
||||||
dError("pVnode:%p vgId:%d, failed to vnode cfg, keep not found", pVnode, pVnode->vgId);
|
dError("pVnode:%p vgId:%d, failed to read vnode cfg, daysToKeep not found", pVnode, pVnode->vgId);
|
||||||
goto PARSE_OVER;
|
goto PARSE_OVER;
|
||||||
}
|
}
|
||||||
pVnode->tsdbCfg.keep = keep->valueint;
|
pVnode->tsdbCfg.keep = daysToKeep->valueint;
|
||||||
|
|
||||||
cJSON *maxCacheSize = cJSON_GetObjectItem(root, "maxCacheSize");
|
cJSON *maxCacheSize = cJSON_GetObjectItem(root, "maxCacheSize");
|
||||||
if (!maxCacheSize || maxCacheSize->type != cJSON_Number) {
|
if (!maxCacheSize || maxCacheSize->type != cJSON_Number) {
|
||||||
dError("pVnode:%p vgId:%d, failed to vnode cfg, maxCacheSize not found", pVnode, pVnode->vgId);
|
dError("pVnode:%p vgId:%d, failed to read vnode cfg, maxCacheSize not found", pVnode, pVnode->vgId);
|
||||||
goto PARSE_OVER;
|
goto PARSE_OVER;
|
||||||
}
|
}
|
||||||
pVnode->tsdbCfg.maxCacheSize = maxCacheSize->valueint;
|
pVnode->tsdbCfg.maxCacheSize = maxCacheSize->valueint;
|
||||||
|
|
||||||
cJSON *commitLog = cJSON_GetObjectItem(root, "commitLog");
|
cJSON *commitLog = cJSON_GetObjectItem(root, "commitLog");
|
||||||
if (!commitLog || commitLog->type != cJSON_Number) {
|
if (!commitLog || commitLog->type != cJSON_Number) {
|
||||||
dError("pVnode:%p vgId:%d, failed to vnode cfg, commitLog not found", pVnode, pVnode->vgId);
|
dError("pVnode:%p vgId:%d, failed to read vnode cfg, commitLog not found", pVnode, pVnode->vgId);
|
||||||
goto PARSE_OVER;
|
goto PARSE_OVER;
|
||||||
}
|
}
|
||||||
pVnode->walCfg.commitLog = (int8_t)commitLog->valueint;
|
pVnode->walCfg.commitLog = (int8_t)commitLog->valueint;
|
||||||
|
|
||||||
cJSON *wals = cJSON_GetObjectItem(root, "wals");
|
cJSON *wals = cJSON_GetObjectItem(root, "wals");
|
||||||
if (!wals || wals->type != cJSON_Number) {
|
if (!wals || wals->type != cJSON_Number) {
|
||||||
dError("pVnode:%p vgId:%d, failed to vnode cfg, wals not found", pVnode, pVnode->vgId);
|
dError("pVnode:%p vgId:%d, failed to read vnode cfg, wals not found", pVnode, pVnode->vgId);
|
||||||
goto PARSE_OVER;
|
goto PARSE_OVER;
|
||||||
}
|
}
|
||||||
pVnode->walCfg.wals = (int8_t)wals->valueint;
|
pVnode->walCfg.wals = (int8_t)wals->valueint;
|
||||||
|
@ -491,34 +491,34 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
|
||||||
|
|
||||||
cJSON *arbitratorIp = cJSON_GetObjectItem(root, "arbitratorIp");
|
cJSON *arbitratorIp = cJSON_GetObjectItem(root, "arbitratorIp");
|
||||||
if (!arbitratorIp || arbitratorIp->type != cJSON_String || arbitratorIp->valuestring == NULL) {
|
if (!arbitratorIp || arbitratorIp->type != cJSON_String || arbitratorIp->valuestring == NULL) {
|
||||||
dError("pVnode:%p vgId:%d, failed to vnode cfg, arbitratorIp not found", pVnode, pVnode->vgId);
|
dError("pVnode:%p vgId:%d, failed to read vnode cfg, arbitratorIp not found", pVnode, pVnode->vgId);
|
||||||
goto PARSE_OVER;
|
goto PARSE_OVER;
|
||||||
}
|
}
|
||||||
pVnode->syncCfg.arbitratorIp = inet_addr(arbitratorIp->valuestring);
|
pVnode->syncCfg.arbitratorIp = inet_addr(arbitratorIp->valuestring);
|
||||||
|
|
||||||
cJSON *quorum = cJSON_GetObjectItem(root, "quorum");
|
cJSON *quorum = cJSON_GetObjectItem(root, "quorum");
|
||||||
if (!quorum || quorum->type != cJSON_Number) {
|
if (!quorum || quorum->type != cJSON_Number) {
|
||||||
dError("failed to vnode cfg, quorum not found", pVnode, pVnode->vgId);
|
dError("failed to read vnode cfg, quorum not found", pVnode, pVnode->vgId);
|
||||||
goto PARSE_OVER;
|
goto PARSE_OVER;
|
||||||
}
|
}
|
||||||
pVnode->syncCfg.quorum = (int8_t)quorum->valueint;
|
pVnode->syncCfg.quorum = (int8_t)quorum->valueint;
|
||||||
|
|
||||||
cJSON *replica = cJSON_GetObjectItem(root, "replica");
|
cJSON *replica = cJSON_GetObjectItem(root, "replica");
|
||||||
if (!replica || replica->type != cJSON_Number) {
|
if (!replica || replica->type != cJSON_Number) {
|
||||||
dError("pVnode:%p vgId:%d, failed to vnode cfg, replica not found", pVnode, pVnode->vgId);
|
dError("pVnode:%p vgId:%d, failed to read vnode cfg, replica not found", pVnode, pVnode->vgId);
|
||||||
goto PARSE_OVER;
|
goto PARSE_OVER;
|
||||||
}
|
}
|
||||||
pVnode->syncCfg.replica = (int8_t)replica->valueint;
|
pVnode->syncCfg.replica = (int8_t)replica->valueint;
|
||||||
|
|
||||||
cJSON *nodeInfos = cJSON_GetObjectItem(root, "nodeInfos");
|
cJSON *nodeInfos = cJSON_GetObjectItem(root, "nodeInfos");
|
||||||
if (!nodeInfos || nodeInfos->type != cJSON_Array) {
|
if (!nodeInfos || nodeInfos->type != cJSON_Array) {
|
||||||
dError("pVnode:%p vgId:%d, failed to vnode cfg, nodeInfos not found", pVnode, pVnode->vgId);
|
dError("pVnode:%p vgId:%d, failed to read vnode cfg, nodeInfos not found", pVnode, pVnode->vgId);
|
||||||
goto PARSE_OVER;
|
goto PARSE_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
int size = cJSON_GetArraySize(nodeInfos);
|
int size = cJSON_GetArraySize(nodeInfos);
|
||||||
if (size != pVnode->syncCfg.replica) {
|
if (size != pVnode->syncCfg.replica) {
|
||||||
dError("pVnode:%p vgId:%d, failed to vnode cfg, nodeInfos size not matched", pVnode, pVnode->vgId);
|
dError("pVnode:%p vgId:%d, failed to read vnode cfg, nodeInfos size not matched", pVnode, pVnode->vgId);
|
||||||
goto PARSE_OVER;
|
goto PARSE_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -528,21 +528,21 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
|
||||||
|
|
||||||
cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId");
|
cJSON *nodeId = cJSON_GetObjectItem(nodeInfo, "nodeId");
|
||||||
if (!nodeId || nodeId->type != cJSON_Number) {
|
if (!nodeId || nodeId->type != cJSON_Number) {
|
||||||
dError("pVnode:%p vgId:%d, failed to vnode cfg, nodeId not found", pVnode, pVnode->vgId);
|
dError("pVnode:%p vgId:%d, failed to read vnode cfg, nodeId not found", pVnode, pVnode->vgId);
|
||||||
goto PARSE_OVER;
|
goto PARSE_OVER;
|
||||||
}
|
}
|
||||||
pVnode->syncCfg.nodeInfo[i].nodeId = nodeId->valueint;
|
pVnode->syncCfg.nodeInfo[i].nodeId = nodeId->valueint;
|
||||||
|
|
||||||
cJSON *nodeIp = cJSON_GetObjectItem(nodeInfo, "nodeIp");
|
cJSON *nodeIp = cJSON_GetObjectItem(nodeInfo, "nodeIp");
|
||||||
if (!nodeIp || nodeIp->type != cJSON_String || nodeIp->valuestring == NULL) {
|
if (!nodeIp || nodeIp->type != cJSON_String || nodeIp->valuestring == NULL) {
|
||||||
dError("pVnode:%p vgId:%d, failed to vnode cfg, nodeIp not found", pVnode, pVnode->vgId);
|
dError("pVnode:%p vgId:%d, failed to read vnode cfg, nodeIp not found", pVnode, pVnode->vgId);
|
||||||
goto PARSE_OVER;
|
goto PARSE_OVER;
|
||||||
}
|
}
|
||||||
pVnode->syncCfg.nodeInfo[i].nodeIp = inet_addr(nodeIp->valuestring);
|
pVnode->syncCfg.nodeInfo[i].nodeIp = inet_addr(nodeIp->valuestring);
|
||||||
|
|
||||||
cJSON *nodeName = cJSON_GetObjectItem(nodeInfo, "nodeName");
|
cJSON *nodeName = cJSON_GetObjectItem(nodeInfo, "nodeName");
|
||||||
if (!nodeName || nodeName->type != cJSON_String || nodeName->valuestring == NULL) {
|
if (!nodeName || nodeName->type != cJSON_String || nodeName->valuestring == NULL) {
|
||||||
dError("pVnode:%p vgId:%d, failed to vnode cfg, nodeName not found", pVnode, pVnode->vgId);
|
dError("pVnode:%p vgId:%d, failed to read vnode cfg, nodeName not found", pVnode, pVnode->vgId);
|
||||||
goto PARSE_OVER;
|
goto PARSE_OVER;
|
||||||
}
|
}
|
||||||
strncpy(pVnode->syncCfg.nodeInfo[i].name, nodeName->valuestring, TSDB_NODE_NAME_LEN);
|
strncpy(pVnode->syncCfg.nodeInfo[i].name, nodeName->valuestring, TSDB_NODE_NAME_LEN);
|
||||||
|
|
Loading…
Reference in New Issue