Merge pull request #13711 from taosdata/fix/mnode
fix: save sync config in mnode
This commit is contained in:
commit
230b210266
|
@ -38,9 +38,13 @@ static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
|
||||||
SStatusRsp statusRsp = {0};
|
SStatusRsp statusRsp = {0};
|
||||||
if (pRsp->pCont != NULL && pRsp->contLen > 0 &&
|
if (pRsp->pCont != NULL && pRsp->contLen > 0 &&
|
||||||
tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) {
|
tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) {
|
||||||
pMgmt->pData->dnodeVer = statusRsp.dnodeVer;
|
dTrace("status msg received from mnode, dnodeVer:%" PRId64 " saved:%" PRId64, statusRsp.dnodeVer,
|
||||||
dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg);
|
pMgmt->pData->dnodeVer);
|
||||||
dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps);
|
if (pMgmt->pData->dnodeVer != statusRsp.dnodeVer) {
|
||||||
|
pMgmt->pData->dnodeVer = statusRsp.dnodeVer;
|
||||||
|
dmUpdateDnodeCfg(pMgmt, &statusRsp.dnodeCfg);
|
||||||
|
dmUpdateEps(pMgmt->pData, statusRsp.pDnodeEps);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
rpcFreeCont(pRsp->pCont);
|
rpcFreeCont(pRsp->pCont);
|
||||||
tFreeSStatusRsp(&statusRsp);
|
tFreeSStatusRsp(&statusRsp);
|
||||||
|
@ -89,7 +93,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
|
||||||
SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_STATUS, .info.ahandle = (void *)0x9527};
|
SRpcMsg rpcMsg = {.pCont = pHead, .contLen = contLen, .msgType = TDMT_MND_STATUS, .info.ahandle = (void *)0x9527};
|
||||||
SRpcMsg rpcRsp = {0};
|
SRpcMsg rpcRsp = {0};
|
||||||
|
|
||||||
dTrace("send status msg to mnode");
|
dTrace("send status msg to mnode, dnodeVer:%" PRId64, req.dnodeVer);
|
||||||
|
|
||||||
SEpSet epSet = {0};
|
SEpSet epSet = {0};
|
||||||
dmGetMnodeEpSet(pMgmt->pData, &epSet);
|
dmGetMnodeEpSet(pMgmt->pData, &epSet);
|
||||||
|
|
|
@ -81,6 +81,13 @@ int32_t dmReadEps(SDnodeData *pData) {
|
||||||
}
|
}
|
||||||
pData->dnodeId = dnodeId->valueint;
|
pData->dnodeId = dnodeId->valueint;
|
||||||
|
|
||||||
|
cJSON *dnodeVer = cJSON_GetObjectItem(root, "dnodeVer");
|
||||||
|
if (!dnodeVer || dnodeVer->type != cJSON_String) {
|
||||||
|
dError("failed to read %s since dnodeVer not found", file);
|
||||||
|
goto _OVER;
|
||||||
|
}
|
||||||
|
pData->dnodeVer = atoll(dnodeVer->valuestring);
|
||||||
|
|
||||||
cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId");
|
cJSON *clusterId = cJSON_GetObjectItem(root, "clusterId");
|
||||||
if (!clusterId || clusterId->type != cJSON_String) {
|
if (!clusterId || clusterId->type != cJSON_String) {
|
||||||
dError("failed to read %s since clusterId not found", file);
|
dError("failed to read %s since clusterId not found", file);
|
||||||
|
@ -193,6 +200,7 @@ int32_t dmWriteEps(SDnodeData *pData) {
|
||||||
|
|
||||||
len += snprintf(content + len, maxLen - len, "{\n");
|
len += snprintf(content + len, maxLen - len, "{\n");
|
||||||
len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", pData->dnodeId);
|
len += snprintf(content + len, maxLen - len, " \"dnodeId\": %d,\n", pData->dnodeId);
|
||||||
|
len += snprintf(content + len, maxLen - len, " \"dnodeVer\": \"%" PRId64 "\",\n", pData->dnodeVer);
|
||||||
len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", pData->clusterId);
|
len += snprintf(content + len, maxLen - len, " \"clusterId\": \"%" PRId64 "\",\n", pData->clusterId);
|
||||||
len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pData->dropped);
|
len += snprintf(content + len, maxLen - len, " \"dropped\": %d,\n", pData->dropped);
|
||||||
len += snprintf(content + len, maxLen - len, " \"dnodes\": [{\n");
|
len += snprintf(content + len, maxLen - len, " \"dnodes\": [{\n");
|
||||||
|
@ -224,30 +232,15 @@ int32_t dmWriteEps(SDnodeData *pData) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pData->updateTime = taosGetTimestampMs();
|
pData->updateTime = taosGetTimestampMs();
|
||||||
dDebug("successed to write %s", realfile);
|
dDebug("successed to write %s, dnodeVer:%" PRId64, realfile, pData->dnodeVer);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void dmUpdateEps(SDnodeData *pData, SArray *eps) {
|
void dmUpdateEps(SDnodeData *pData, SArray *eps) {
|
||||||
int32_t numOfEps = taosArrayGetSize(eps);
|
|
||||||
if (numOfEps <= 0) return;
|
|
||||||
|
|
||||||
taosThreadRwlockWrlock(&pData->lock);
|
taosThreadRwlockWrlock(&pData->lock);
|
||||||
|
dDebug("new dnode list get from mnode, dnodeVer:%" PRId64, pData->dnodeVer);
|
||||||
int32_t numOfEpsOld = (int32_t)taosArrayGetSize(pData->dnodeEps);
|
dmResetEps(pData, eps);
|
||||||
if (numOfEps != numOfEpsOld) {
|
dmWriteEps(pData);
|
||||||
dDebug("new dnode list get from mnode");
|
|
||||||
dmResetEps(pData, eps);
|
|
||||||
dmWriteEps(pData);
|
|
||||||
} else {
|
|
||||||
int32_t size = numOfEps * sizeof(SDnodeEp);
|
|
||||||
if (memcmp(pData->dnodeEps->pData, eps->pData, size) != 0) {
|
|
||||||
dDebug("new dnode list get from mnode");
|
|
||||||
dmResetEps(pData, eps);
|
|
||||||
dmWriteEps(pData);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
taosThreadRwlockUnlock(&pData->lock);
|
taosThreadRwlockUnlock(&pData->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -385,7 +385,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
|
||||||
int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
|
int64_t dnodeVer = sdbGetTableVer(pMnode->pSdb, SDB_DNODE) + sdbGetTableVer(pMnode->pSdb, SDB_MNODE);
|
||||||
int64_t curMs = taosGetTimestampMs();
|
int64_t curMs = taosGetTimestampMs();
|
||||||
bool online = mndIsDnodeOnline(pDnode, curMs);
|
bool online = mndIsDnodeOnline(pDnode, curMs);
|
||||||
bool dnodeChanged = (statusReq.dnodeVer != dnodeVer);
|
bool dnodeChanged = (statusReq.dnodeVer == 0) || (statusReq.dnodeVer != dnodeVer);
|
||||||
bool reboot = (pDnode->rebootTime != statusReq.rebootTime);
|
bool reboot = (pDnode->rebootTime != statusReq.rebootTime);
|
||||||
bool needCheck = !online || dnodeChanged || reboot;
|
bool needCheck = !online || dnodeChanged || reboot;
|
||||||
|
|
||||||
|
@ -427,7 +427,7 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
|
||||||
if (!online) {
|
if (!online) {
|
||||||
mInfo("dnode:%d, from offline to online", pDnode->id);
|
mInfo("dnode:%d, from offline to online", pDnode->id);
|
||||||
} else {
|
} else {
|
||||||
mDebug("dnode:%d, send dnode epset, online:%d dnode_ver:%" PRId64 ":%" PRId64 " reboot:%d", pDnode->id, online,
|
mDebug("dnode:%d, send dnode epset, online:%d dnodeVer:%" PRId64 ":%" PRId64 " reboot:%d", pDnode->id, online,
|
||||||
statusReq.dnodeVer, dnodeVer, reboot);
|
statusReq.dnodeVer, dnodeVer, reboot);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -200,14 +200,14 @@ int32_t mndInitSync(SMnode *pMnode) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
mDebug("mnode sync is opened, id:%" PRId64, pMgmt->sync);
|
mDebug("mnode-sync is opened, id:%" PRId64, pMgmt->sync);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void mndCleanupSync(SMnode *pMnode) {
|
void mndCleanupSync(SMnode *pMnode) {
|
||||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||||
syncStop(pMgmt->sync);
|
syncStop(pMgmt->sync);
|
||||||
mDebug("mnode sync is stopped, id:%" PRId64, pMgmt->sync);
|
mDebug("mnode-sync is stopped, id:%" PRId64, pMgmt->sync);
|
||||||
|
|
||||||
tsem_destroy(&pMgmt->syncSem);
|
tsem_destroy(&pMgmt->syncSem);
|
||||||
memset(pMgmt, 0, sizeof(SSyncMgmt));
|
memset(pMgmt, 0, sizeof(SSyncMgmt));
|
||||||
|
|
|
@ -297,7 +297,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
|
||||||
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
|
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
|
||||||
action.pRaw = taosMemoryMalloc(dataLen);
|
action.pRaw = taosMemoryMalloc(dataLen);
|
||||||
if (action.pRaw == NULL) goto _OVER;
|
if (action.pRaw == NULL) goto _OVER;
|
||||||
mTrace("raw:%p, is created", pData);
|
// mTrace("raw:%p, is created", pData);
|
||||||
SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER);
|
SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER);
|
||||||
if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER;
|
if (taosArrayPush(pTrans->redoActions, &action) == NULL) goto _OVER;
|
||||||
action.pRaw = NULL;
|
action.pRaw = NULL;
|
||||||
|
@ -330,7 +330,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
|
||||||
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
|
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
|
||||||
action.pRaw = taosMemoryMalloc(dataLen);
|
action.pRaw = taosMemoryMalloc(dataLen);
|
||||||
if (action.pRaw == NULL) goto _OVER;
|
if (action.pRaw == NULL) goto _OVER;
|
||||||
mTrace("raw:%p, is created", pData);
|
// mTrace("raw:%p, is created", action.pRaw);
|
||||||
SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER);
|
SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER);
|
||||||
if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto _OVER;
|
if (taosArrayPush(pTrans->undoActions, &action) == NULL) goto _OVER;
|
||||||
action.pRaw = NULL;
|
action.pRaw = NULL;
|
||||||
|
@ -363,7 +363,7 @@ static SSdbRow *mndTransActionDecode(SSdbRaw *pRaw) {
|
||||||
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
|
SDB_GET_INT32(pRaw, dataPos, &dataLen, _OVER)
|
||||||
action.pRaw = taosMemoryMalloc(dataLen);
|
action.pRaw = taosMemoryMalloc(dataLen);
|
||||||
if (action.pRaw == NULL) goto _OVER;
|
if (action.pRaw == NULL) goto _OVER;
|
||||||
mTrace("raw:%p, is created", action.pRaw);
|
// mTrace("raw:%p, is created", action.pRaw);
|
||||||
SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER);
|
SDB_GET_BINARY(pRaw, dataPos, (void *)action.pRaw, dataLen, _OVER);
|
||||||
if (taosArrayPush(pTrans->commitActions, &action) == NULL) goto _OVER;
|
if (taosArrayPush(pTrans->commitActions, &action) == NULL) goto _OVER;
|
||||||
action.pRaw = NULL;
|
action.pRaw = NULL;
|
||||||
|
|
|
@ -163,7 +163,12 @@ void sdbSetApplyIndex(SSdb *pSdb, int64_t index) { pSdb->curVer = index; }
|
||||||
|
|
||||||
void sdbSetApplyTerm(SSdb *pSdb, int64_t term) { pSdb->curTerm = term; }
|
void sdbSetApplyTerm(SSdb *pSdb, int64_t term) { pSdb->curTerm = term; }
|
||||||
|
|
||||||
void sdbSetCurConfig(SSdb *pSdb, int64_t config) { pSdb->curConfig = config; }
|
void sdbSetCurConfig(SSdb *pSdb, int64_t config) {
|
||||||
|
if (pSdb->curConfig != config) {
|
||||||
|
mDebug("mnode sync config set from %" PRId64 " to %" PRId64, pSdb->curConfig, config);
|
||||||
|
pSdb->curConfig = config;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int64_t sdbGetApplyIndex(SSdb *pSdb) { return pSdb->curVer; }
|
int64_t sdbGetApplyIndex(SSdb *pSdb) { return pSdb->curVer; }
|
||||||
|
|
||||||
|
|
|
@ -432,8 +432,8 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
|
||||||
} else {
|
} else {
|
||||||
pSdb->lastCommitVer = pSdb->curVer;
|
pSdb->lastCommitVer = pSdb->curVer;
|
||||||
pSdb->lastCommitTerm = pSdb->curTerm;
|
pSdb->lastCommitTerm = pSdb->curTerm;
|
||||||
mDebug("write sdb file successfully, ver:%" PRId64 " term:%" PRId64 " file:%s", pSdb->lastCommitVer,
|
mDebug("write sdb file successfully, index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s",
|
||||||
pSdb->lastCommitTerm, curfile);
|
pSdb->lastCommitVer, pSdb->lastCommitTerm, pSdb->curConfig, curfile);
|
||||||
}
|
}
|
||||||
|
|
||||||
terrno = code;
|
terrno = code;
|
||||||
|
|
Loading…
Reference in New Issue