add config table message
This commit is contained in:
parent
2f0d82cadb
commit
116a5b04a2
|
@ -168,6 +168,44 @@ static int dnodeRetrieveUserAuthInfo(char *user, char *spi, char *encrypt, char
|
||||||
return rpcRsp.code;
|
return rpcRsp.code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t sid) {
|
||||||
|
dTrace("vgId:%d, sid:%d send config table msg to mnode", vgId, sid);
|
||||||
|
|
||||||
|
int32_t contLen = sizeof(SDMConfigTableMsg);
|
||||||
|
SDMConfigTableMsg *pMsg = rpcMallocCont(contLen);
|
||||||
|
|
||||||
|
pMsg->dnodeId = htonl(dnodeGetDnodeId());
|
||||||
|
pMsg->vgId = htonl(vgId);
|
||||||
|
pMsg->sid = htonl(sid);
|
||||||
|
|
||||||
|
SRpcMsg rpcMsg = {0};
|
||||||
|
rpcMsg.pCont = pMsg;
|
||||||
|
rpcMsg.contLen = contLen;
|
||||||
|
rpcMsg.msgType = TSDB_MSG_TYPE_DM_CONFIG_TABLE;
|
||||||
|
|
||||||
|
SRpcMsg rpcRsp = {0};
|
||||||
|
dnodeSendMsgToDnodeRecv(&rpcMsg, &rpcRsp);
|
||||||
|
terrno = rpcRsp.code;
|
||||||
|
|
||||||
|
if (rpcRsp.code != 0) {
|
||||||
|
rpcFreeCont(rpcRsp.pCont);
|
||||||
|
dError("vgId:%d, sid:%d failed to config table from mnode", vgId, sid);
|
||||||
|
return NULL;
|
||||||
|
} else {
|
||||||
|
dPrint("vgId:%d, sid:%d config table msg is received", vgId, sid);
|
||||||
|
|
||||||
|
// delete this after debug finished
|
||||||
|
SMDCreateTableMsg *pTable = rpcRsp.pCont;
|
||||||
|
int16_t numOfColumns = htons(pTable->numOfColumns);
|
||||||
|
int16_t numOfTags = htons(pTable->numOfTags);
|
||||||
|
int32_t sid = htonl(pTable->sid);
|
||||||
|
uint64_t uid = htobe64(pTable->uid);
|
||||||
|
dPrint("table:%s, numOfColumns:%d numOfTags:%d sid:%d uid:%d", pTable->tableId, numOfColumns, numOfTags, sid, uid);
|
||||||
|
|
||||||
|
return rpcRsp.pCont;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
SDnodeStatisInfo dnodeGetStatisInfo() {
|
SDnodeStatisInfo dnodeGetStatisInfo() {
|
||||||
SDnodeStatisInfo info = {0};
|
SDnodeStatisInfo info = {0};
|
||||||
if (dnodeGetRunStatus() == TSDB_DNODE_RUN_STATUS_RUNING) {
|
if (dnodeGetRunStatus() == TSDB_DNODE_RUN_STATUS_RUNING) {
|
||||||
|
|
|
@ -47,6 +47,7 @@ int32_t dnodeGetDnodeId();
|
||||||
void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg));
|
void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg));
|
||||||
void dnodeSendMsgToDnode(SRpcIpSet *ipSet, SRpcMsg *rpcMsg);
|
void dnodeSendMsgToDnode(SRpcIpSet *ipSet, SRpcMsg *rpcMsg);
|
||||||
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp);
|
void dnodeSendMsgToDnodeRecv(SRpcMsg *rpcMsg, SRpcMsg *rpcRsp);
|
||||||
|
void *dnodeSendCfgTableToRecv(int32_t vgId, int32_t sid);
|
||||||
|
|
||||||
void *dnodeAllocateVnodeWqueue(void *pVnode);
|
void *dnodeAllocateVnodeWqueue(void *pVnode);
|
||||||
void dnodeFreeVnodeWqueue(void *queue);
|
void dnodeFreeVnodeWqueue(void *queue);
|
||||||
|
|
|
@ -676,8 +676,8 @@ typedef struct {
|
||||||
} SCMCreateDnodeMsg, SCMDropDnodeMsg;
|
} SCMCreateDnodeMsg, SCMDropDnodeMsg;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
uint32_t dnode;
|
int32_t dnodeId;
|
||||||
int32_t vnode;
|
int32_t vgId;
|
||||||
int32_t sid;
|
int32_t sid;
|
||||||
} SDMConfigTableMsg;
|
} SDMConfigTableMsg;
|
||||||
|
|
||||||
|
|
|
@ -1877,38 +1877,25 @@ static SChildTableObj* mnodeGetTableByPos(int32_t vnode, int32_t sid) {
|
||||||
|
|
||||||
static int32_t mnodeProcessTableCfgMsg(SMnodeMsg *pMsg) {
|
static int32_t mnodeProcessTableCfgMsg(SMnodeMsg *pMsg) {
|
||||||
SDMConfigTableMsg *pCfg = pMsg->rpcMsg.pCont;
|
SDMConfigTableMsg *pCfg = pMsg->rpcMsg.pCont;
|
||||||
pCfg->dnode = htonl(pCfg->dnode);
|
pCfg->dnodeId = htonl(pCfg->dnodeId);
|
||||||
pCfg->vnode = htonl(pCfg->vnode);
|
pCfg->vgId = htonl(pCfg->vgId);
|
||||||
pCfg->sid = htonl(pCfg->sid);
|
pCfg->sid = htonl(pCfg->sid);
|
||||||
mTrace("dnode:%s, vnode:%d, sid:%d, receive table config msg", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid);
|
mTrace("dnode:%d, vgId:%d sid:%d, receive table config msg", pCfg->dnodeId, pCfg->vgId, pCfg->sid);
|
||||||
|
|
||||||
SChildTableObj *pTable = mnodeGetTableByPos(pCfg->vnode, pCfg->sid);
|
SChildTableObj *pTable = mnodeGetTableByPos(pCfg->vgId, pCfg->sid);
|
||||||
if (pTable == NULL) {
|
if (pTable == NULL) {
|
||||||
mError("dnode:%s, vnode:%d, sid:%d, table not found", taosIpStr(pCfg->dnode), pCfg->vnode, pCfg->sid);
|
mError("dnode:%d, vgId:%d sid:%d, table not found", pCfg->dnodeId, pCfg->vgId, pCfg->sid);
|
||||||
return TSDB_CODE_NOT_ACTIVE_TABLE;
|
return TSDB_CODE_NOT_ACTIVE_TABLE;
|
||||||
}
|
}
|
||||||
|
|
||||||
SMDCreateTableMsg *pMDCreate = NULL;
|
SMDCreateTableMsg *pCreate = NULL;
|
||||||
pMDCreate = mnodeBuildCreateChildTableMsg(NULL, (SChildTableObj *)pTable);
|
pCreate = mnodeBuildCreateChildTableMsg(NULL, (SChildTableObj *)pTable);
|
||||||
if (pMDCreate == NULL) {
|
|
||||||
mnodeDecTableRef(pTable);
|
mnodeDecTableRef(pTable);
|
||||||
return terrno;
|
|
||||||
}
|
|
||||||
|
|
||||||
SDnodeObj *pDnode = mnodeGetDnode(pCfg->dnode);
|
if (pCreate == NULL) return terrno;
|
||||||
SRpcIpSet ipSet = mnodeGetIpSetFromIp(pDnode->dnodeEp);
|
|
||||||
SRpcMsg rpcRsp = {
|
|
||||||
.handle = NULL,
|
|
||||||
.pCont = pMDCreate,
|
|
||||||
.contLen = htonl(pMDCreate->contLen),
|
|
||||||
.code = 0,
|
|
||||||
.msgType = TSDB_MSG_TYPE_MD_CREATE_TABLE
|
|
||||||
};
|
|
||||||
dnodeSendMsgToDnode(&ipSet, &rpcRsp);
|
|
||||||
|
|
||||||
mnodeDecTableRef(pTable);
|
|
||||||
mnodeDecDnodeRef(pDnode);
|
|
||||||
|
|
||||||
|
pMsg->rpcRsp.rsp = pCreate;
|
||||||
|
pMsg->rpcRsp.len = htonl(pCreate->contLen);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue