TD-1671
This commit is contained in:
parent
f7656ec7f7
commit
1b59ce04e9
|
@ -459,18 +459,24 @@ static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) {
|
|||
|
||||
static int32_t dnodeProcessCreateMnodeMsg(SRpcMsg *pMsg) {
|
||||
SMDCreateMnodeMsg *pCfg = pMsg->pCont;
|
||||
pCfg->dnodeId = htonl(pCfg->dnodeId);
|
||||
if (pCfg->dnodeId != dnodeGetDnodeId()) {
|
||||
dError("dnodeId:%d in create mnode msg is not equal with saved dnodeId:%d", pCfg->dnodeId, dnodeGetDnodeId());
|
||||
dError("dnodeId:%d, in create mnode msg is not equal with saved dnodeId:%d", pCfg->dnodeId, dnodeGetDnodeId());
|
||||
return TSDB_CODE_MND_DNODE_ID_NOT_CONFIGURED;
|
||||
}
|
||||
|
||||
if (strcmp(pCfg->dnodeEp, tsLocalEp) != 0) {
|
||||
dError("dnodeEp:%s in create mnode msg is not equal with saved dnodeEp:%s", pCfg->dnodeEp, tsLocalEp);
|
||||
dError("dnodeEp:%s, in create mnode msg is not equal with saved dnodeEp:%s", pCfg->dnodeEp, tsLocalEp);
|
||||
return TSDB_CODE_MND_DNODE_EP_NOT_CONFIGURED;
|
||||
}
|
||||
|
||||
dDebug("dnodeId:%d, create mnode msg is received", pCfg->dnodeId);
|
||||
dnodeStartMnode();
|
||||
dDebug("dnodeId:%d, create mnode msg is received from mnodes, numOfMnodes:%d", pCfg->dnodeId, pCfg->mnodes.nodeNum);
|
||||
for (int i = 0; i < pCfg->mnodes.nodeNum; ++i) {
|
||||
pCfg->mnodes.nodeInfos[i].nodeId = htonl(pCfg->mnodes.nodeInfos[i].nodeId);
|
||||
dDebug("mnode index:%d, mnode:%d:%s", i, pCfg->mnodes.nodeInfos[i].nodeId, pCfg->mnodes.nodeInfos[i].nodeEp);
|
||||
}
|
||||
|
||||
dnodeStartMnode(&pCfg->mnodes);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -485,31 +491,6 @@ void dnodeUpdateMnodeEpSetForPeer(SRpcEpSet *pEpSet) {
|
|||
for (int i = 0; i < pEpSet->numOfEps; ++i) {
|
||||
pEpSet->port[i] -= TSDB_PORT_DNODEDNODE;
|
||||
dInfo("mnode index:%d %s:%u", i, pEpSet->fqdn[i], pEpSet->port[i]);
|
||||
|
||||
#if 0
|
||||
if (!mnodeIsRunning()) {
|
||||
if (strcmp(pEpSet->fqdn[i], tsLocalFqdn) == 0 && pEpSet->port[i] == tsServerPort) {
|
||||
dInfo("mnode index:%d %s:%u self should work as mnode", i, pEpSet->fqdn[i], pEpSet->port[i]);
|
||||
bool find = false;
|
||||
for (int i = 0; i < tsDMnodeInfos.nodeNum; ++i) {
|
||||
if (tsDMnodeInfos.nodeInfos[i].nodeId == dnodeGetDnodeId()) {
|
||||
dInfo("localEp found in mnode infos");
|
||||
find = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (!find) {
|
||||
dInfo("localEp not found in mnode infos, will set into mnode infos");
|
||||
tstrncpy(tsDMnodeInfos.nodeInfos[tsDMnodeInfos.nodeNum].nodeEp, tsLocalEp, TSDB_EP_LEN);
|
||||
tsDMnodeInfos.nodeInfos[tsDMnodeInfos.nodeNum].nodeId = dnodeGetDnodeId();
|
||||
tsDMnodeInfos.nodeNum++;
|
||||
}
|
||||
|
||||
dnodeStartMnode();
|
||||
}
|
||||
}
|
||||
#endif
|
||||
}
|
||||
|
||||
tsDMnodeEpSet = *pEpSet;
|
||||
|
@ -598,7 +579,6 @@ static void dnodeUpdateMnodeInfos(SDMMnodeInfos *pMnodes) {
|
|||
}
|
||||
|
||||
dnodeSaveMnodeInfos();
|
||||
sdbUpdateSync();
|
||||
}
|
||||
|
||||
static bool dnodeReadMnodeInfos() {
|
||||
|
|
|
@ -146,7 +146,9 @@ void dnodeProcessModuleStatus(uint32_t moduleStatus) {
|
|||
}
|
||||
}
|
||||
|
||||
bool dnodeStartMnode() {
|
||||
bool dnodeStartMnode(void *pMnodes) {
|
||||
SDMMnodeInfos *mnodes = pMnodes;
|
||||
|
||||
if (tsModuleStatus & (1 << TSDB_MOD_MNODE)) {
|
||||
dDebug("mnode module is already started, module status:%d", tsModuleStatus);
|
||||
return false;
|
||||
|
@ -156,6 +158,7 @@ bool dnodeStartMnode() {
|
|||
dInfo("start mnode module, module status:%d, new status:%d", tsModuleStatus, moduleStatus);
|
||||
dnodeProcessModuleStatus(moduleStatus);
|
||||
|
||||
sdbUpdateSync();
|
||||
sdbUpdateSync(mnodes);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -43,7 +43,7 @@ void dnodeGetMnodeEpSetForPeer(void *epSet);
|
|||
void dnodeGetMnodeEpSetForShell(void *epSet);
|
||||
void * dnodeGetMnodeInfos();
|
||||
int32_t dnodeGetDnodeId();
|
||||
bool dnodeStartMnode();
|
||||
bool dnodeStartMnode(void *pModes);
|
||||
|
||||
void dnodeAddClientRspHandle(uint8_t msgType, void (*fp)(SRpcMsg *rpcMsg));
|
||||
void dnodeSendMsgToDnode(SRpcEpSet *epSet, SRpcMsg *rpcMsg);
|
||||
|
|
|
@ -60,7 +60,7 @@ int32_t mnodeInitSystem();
|
|||
int32_t mnodeStartSystem();
|
||||
void mnodeCleanupSystem();
|
||||
void mnodeStopSystem();
|
||||
void sdbUpdateSync();
|
||||
void sdbUpdateSync(void *pMnodes);
|
||||
bool mnodeIsRunning();
|
||||
int32_t mnodeProcessRead(SMnodeMsg *pMsg);
|
||||
int32_t mnodeProcessWrite(SMnodeMsg *pMsg);
|
||||
|
|
|
@ -722,6 +722,7 @@ typedef struct {
|
|||
typedef struct {
|
||||
int32_t dnodeId;
|
||||
char dnodeEp[TSDB_EP_LEN]; // end point, hostname:port
|
||||
SDMMnodeInfos mnodes;
|
||||
} SMDCreateMnodeMsg;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -109,7 +109,7 @@ int32_t mnodeStartSystem() {
|
|||
|
||||
mInfo("mnode is initialized successfully");
|
||||
|
||||
sdbUpdateSync();
|
||||
sdbUpdateSync(NULL);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -276,8 +276,20 @@ static int32_t mnodeSendCreateMnodeMsg(int32_t dnodeId, char *dnodeEp) {
|
|||
if (pCreate == NULL) {
|
||||
return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
} else {
|
||||
pCreate->dnodeId = dnodeId;
|
||||
pCreate->dnodeId = htonl(dnodeId);
|
||||
tstrncpy(pCreate->dnodeEp, dnodeEp, sizeof(pCreate->dnodeEp));
|
||||
pCreate->mnodes = tsMnodeInfos;
|
||||
bool found = false;
|
||||
for (int i = 0; i < pCreate->mnodes.nodeNum; ++i) {
|
||||
if (pCreate->mnodes.nodeInfos[i].nodeId == htonl(dnodeId)) {
|
||||
found = true;
|
||||
}
|
||||
}
|
||||
if (!found) {
|
||||
pCreate->mnodes.nodeInfos[pCreate->mnodes.nodeNum].nodeId = htonl(dnodeId);
|
||||
tstrncpy(pCreate->mnodes.nodeInfos[pCreate->mnodes.nodeNum].nodeEp, dnodeEp, sizeof(pCreate->dnodeEp));
|
||||
pCreate->mnodes.nodeNum++;
|
||||
}
|
||||
}
|
||||
|
||||
SRpcMsg rpcMsg = {0};
|
||||
|
@ -291,6 +303,8 @@ static int32_t mnodeSendCreateMnodeMsg(int32_t dnodeId, char *dnodeEp) {
|
|||
|
||||
if (rpcRsp.code != TSDB_CODE_SUCCESS) {
|
||||
mError("dnode:%d, failed to send create mnode msg, ep:%s reason:%s", dnodeId, dnodeEp, tstrerror(rpcRsp.code));
|
||||
} else {
|
||||
mDebug("dnode:%d, create mnode msg is disposed, mnode is created in dnode", dnodeId);
|
||||
}
|
||||
|
||||
rpcFreeCont(rpcRsp.pCont);
|
||||
|
@ -301,8 +315,9 @@ static int32_t mnodeCreateMnodeCb(SMnodeMsg *pMsg, int32_t code) {
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
mError("failed to create mnode, reason:%s", tstrerror(code));
|
||||
} else {
|
||||
mDebug("mnode is created");
|
||||
mDebug("mnode is created successfully");
|
||||
mnodeUpdateMnodeEpSet();
|
||||
sdbUpdateSync(NULL);
|
||||
}
|
||||
|
||||
return code;
|
||||
|
@ -314,9 +329,9 @@ void mnodeCreateMnode(int32_t dnodeId, char *dnodeEp, bool needConfirm) {
|
|||
pMnode->createdTime = taosGetTimestampMs();
|
||||
|
||||
SSdbOper oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
.table = tsMnodeSdb,
|
||||
.pObj = pMnode,
|
||||
.pObj = pMnode,
|
||||
.writeCb = mnodeCreateMnodeCb
|
||||
};
|
||||
|
||||
|
@ -346,6 +361,7 @@ void mnodeDropMnodeLocal(int32_t dnodeId) {
|
|||
}
|
||||
|
||||
mnodeUpdateMnodeEpSet();
|
||||
sdbUpdateSync(NULL);
|
||||
}
|
||||
|
||||
int32_t mnodeDropMnode(int32_t dnodeId) {
|
||||
|
@ -365,6 +381,7 @@ int32_t mnodeDropMnode(int32_t dnodeId) {
|
|||
sdbDecRef(tsMnodeSdb, pMnode);
|
||||
|
||||
mnodeUpdateMnodeEpSet();
|
||||
sdbUpdateSync(NULL);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -297,27 +297,19 @@ static void sdbConfirmForward(void *ahandle, void *param, int32_t code) {
|
|||
taosFreeQitem(pOper);
|
||||
}
|
||||
|
||||
void sdbUpdateSync() {
|
||||
void sdbUpdateSync(void *pMnodes) {
|
||||
SDMMnodeInfos *mnodes = pMnodes;
|
||||
if (!mnodeIsRunning()) {
|
||||
mDebug("mnode not start yet, update sync info later");
|
||||
mDebug("mnode not start yet, update sync config later");
|
||||
return;
|
||||
}
|
||||
|
||||
mDebug("update sync info in sdb");
|
||||
mDebug("update sync config in sync module, mnodes:%p", pMnodes);
|
||||
|
||||
SSyncCfg syncCfg = {0};
|
||||
int32_t index = 0;
|
||||
|
||||
SDMMnodeInfos *mnodes = dnodeGetMnodeInfos();
|
||||
for (int32_t i = 0; i < mnodes->nodeNum; ++i) {
|
||||
SDMMnodeInfo *node = &mnodes->nodeInfos[i];
|
||||
syncCfg.nodeInfo[i].nodeId = node->nodeId;
|
||||
taosGetFqdnPortFromEp(node->nodeEp, syncCfg.nodeInfo[i].nodeFqdn, &syncCfg.nodeInfo[i].nodePort);
|
||||
syncCfg.nodeInfo[i].nodePort += TSDB_PORT_SYNC;
|
||||
index++;
|
||||
}
|
||||
|
||||
if (index == 0) {
|
||||
if (mnodes == NULL) {
|
||||
void *pIter = NULL;
|
||||
while (1) {
|
||||
SMnodeObj *pMnode = NULL;
|
||||
|
@ -337,9 +329,19 @@ void sdbUpdateSync() {
|
|||
mnodeDecMnodeRef(pMnode);
|
||||
}
|
||||
sdbFreeIter(pIter);
|
||||
syncCfg.replica = index;
|
||||
mDebug("mnodes info not input, use infos in sdb, numOfMnodes:%d", syncCfg.replica);
|
||||
} else {
|
||||
for (index = 0; index < mnodes->nodeNum; ++index) {
|
||||
SDMMnodeInfo *node = &mnodes->nodeInfos[index];
|
||||
syncCfg.nodeInfo[index].nodeId = node->nodeId;
|
||||
taosGetFqdnPortFromEp(node->nodeEp, syncCfg.nodeInfo[index].nodeFqdn, &syncCfg.nodeInfo[index].nodePort);
|
||||
syncCfg.nodeInfo[index].nodePort += TSDB_PORT_SYNC;
|
||||
}
|
||||
syncCfg.replica = index;
|
||||
mDebug("mnodes info input, numOfMnodes:%d", syncCfg.replica);
|
||||
}
|
||||
|
||||
syncCfg.replica = index;
|
||||
syncCfg.quorum = (syncCfg.replica == 1) ? 1 : 2;
|
||||
|
||||
bool hasThisDnode = false;
|
||||
|
@ -350,8 +352,15 @@ void sdbUpdateSync() {
|
|||
}
|
||||
}
|
||||
|
||||
if (!hasThisDnode) return;
|
||||
if (memcmp(&syncCfg, &tsSdbObj.cfg, sizeof(SSyncCfg)) == 0) return;
|
||||
if (!hasThisDnode) {
|
||||
sdbError("update sync config, this dnode not exist");
|
||||
return;
|
||||
}
|
||||
|
||||
if (memcmp(&syncCfg, &tsSdbObj.cfg, sizeof(SSyncCfg)) == 0) {
|
||||
sdbDebug("update sync config, info not changed");
|
||||
return;
|
||||
}
|
||||
|
||||
sdbInfo("work as mnode, replica:%d", syncCfg.replica);
|
||||
for (int32_t i = 0; i < syncCfg.replica; ++i) {
|
||||
|
|
|
@ -25,7 +25,7 @@ sql create dnode $hostname2
|
|||
$x = 0
|
||||
show2:
|
||||
$x = $x + 1
|
||||
sleep 4000
|
||||
sleep 2000
|
||||
if $x == 5 then
|
||||
return -1
|
||||
endi
|
||||
|
|
Loading…
Reference in New Issue