change super table vgroup
This commit is contained in:
parent
d7d51ad10b
commit
de76b5bd30
|
@ -88,6 +88,7 @@ typedef struct _dnode_obj {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t dnodeId;
|
int32_t dnodeId;
|
||||||
|
uint16_t port;
|
||||||
uint32_t privateIp;
|
uint32_t privateIp;
|
||||||
uint32_t publicIp;
|
uint32_t publicIp;
|
||||||
} SVnodeGid;
|
} SVnodeGid;
|
||||||
|
|
|
@ -634,14 +634,13 @@ typedef struct SCMSTableVgroupMsg {
|
||||||
} SCMSTableVgroupMsg;
|
} SCMSTableVgroupMsg;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SIpAddr ipAddr;
|
int32_t vgId;
|
||||||
int32_t numOfVgroups;
|
SIpAddr ipAddr[TSDB_REPLICA_MAX_NUM];
|
||||||
int32_t vgId[];
|
} SCMVgroupInfo;
|
||||||
} STableDnodeVgroupInfo;
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t numOfDnodes;
|
int32_t numOfVgroups;
|
||||||
STableDnodeVgroupInfo dnodeVgroups[];
|
SCMVgroupInfo vgroups[];
|
||||||
} SCMSTableVgroupRspMsg;
|
} SCMSTableVgroupRspMsg;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -1166,40 +1166,48 @@ static void mgmtGetSuperTableMeta(SQueuedMsg *pMsg) {
|
||||||
|
|
||||||
static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *pMsg) {
|
static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *pMsg) {
|
||||||
SCMSTableVgroupMsg *pInfo = pMsg->pCont;
|
SCMSTableVgroupMsg *pInfo = pMsg->pCont;
|
||||||
pMsg->pTable = mgmtGetSuperTable(pInfo->tableId);
|
SSuperTableObj *pTable = mgmtGetSuperTable(pInfo->tableId);
|
||||||
|
|
||||||
|
pMsg->pTable = pTable;
|
||||||
if (pMsg->pTable == NULL) {
|
if (pMsg->pTable == NULL) {
|
||||||
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE);
|
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
SCMSTableVgroupRspMsg *pRsp = rpcMallocCont(sizeof(SCMSTableVgroupRspMsg) + sizeof(uint32_t) * clusterGetDnodesNum());
|
int32_t contLen = sizeof(SCMSTableVgroupRspMsg) + sizeof(SCMVgroupInfo) * pTable->vgLen;
|
||||||
|
SCMSTableVgroupRspMsg *pRsp = rpcMallocCont(contLen);
|
||||||
if (pRsp == NULL) {
|
if (pRsp == NULL) {
|
||||||
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE);
|
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SERV_OUT_OF_MEMORY);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfVgroups = 1;
|
int32_t vg = 0;
|
||||||
int32_t numOfDnodes = 1;
|
for (; vg < pTable->vgLen; ++vg) {
|
||||||
|
int32_t vgId = pTable->vgList[vg];
|
||||||
pRsp->numOfDnodes = htonl(numOfDnodes);
|
if (vgId == 0) break;
|
||||||
STableDnodeVgroupInfo* pVgroupInfo = pRsp->dnodeVgroups;
|
|
||||||
pVgroupInfo->ipAddr.ip = htonl(inet_addr(tsPrivateIp));
|
SVgObj *pVgroup = mgmtGetVgroup(vgId);
|
||||||
|
if (pVgroup == NULL) break;
|
||||||
pVgroupInfo->ipAddr.port = htons(0); // todo fix it
|
|
||||||
pVgroupInfo->numOfVgroups = htonl(numOfVgroups); // todo fix it
|
pRsp->vgroups[vg].vgId = htonl(vgId);
|
||||||
int32_t* vgIdList = pVgroupInfo->vgId;
|
for (int32_t vn = 0; vn < pVgroup->numOfVnodes; ++vn) {
|
||||||
|
SDnodeObj *pDnode = clusterGetDnode(pVgroup->vnodeGid[vn].dnodeId);
|
||||||
for(int32_t i = 0; i < numOfVgroups; ++i) {
|
if (pDnode == NULL) break;
|
||||||
vgIdList[i] = htonl(2); // todo fix it
|
|
||||||
|
pRsp->vgroups[vg].ipAddr[vn].ip = htonl(pDnode->privateIp);
|
||||||
|
pRsp->vgroups[vg].ipAddr[vn].port = htons(tsDnodeShellPort);
|
||||||
|
|
||||||
|
clusterReleaseDnode(pDnode);
|
||||||
|
}
|
||||||
|
|
||||||
|
mgmtReleaseVgroup(pVgroup);
|
||||||
}
|
}
|
||||||
|
pRsp->numOfVgroups = htonl(vg);
|
||||||
assert(numOfDnodes == 1); // this size is valid only when numOfDnodes equals 1
|
|
||||||
int32_t msgLen = sizeof(SCMSTableVgroupRspMsg) + sizeof(STableDnodeVgroupInfo) + numOfVgroups * sizeof(int32_t);
|
|
||||||
|
|
||||||
SRpcMsg rpcRsp = {0};
|
SRpcMsg rpcRsp = {0};
|
||||||
rpcRsp.handle = pMsg->thandle;
|
rpcRsp.handle = pMsg->thandle;
|
||||||
rpcRsp.pCont = pRsp;
|
rpcRsp.pCont = pRsp;
|
||||||
rpcRsp.contLen = msgLen;
|
rpcRsp.contLen = sizeof(SCMSTableVgroupRspMsg) + sizeof(SCMVgroupInfo) * vg;
|
||||||
rpcSendResponse(&rpcRsp);
|
rpcSendResponse(&rpcRsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue