fix(sync): persist isStandby
This commit is contained in:
parent
9b051104ae
commit
8c5f6b6849
|
@ -146,6 +146,7 @@ typedef struct SSyncLogStore {
|
|||
} SSyncLogStore;
|
||||
|
||||
typedef struct SSyncInfo {
|
||||
bool isStandBy;
|
||||
SyncGroupId vgId;
|
||||
SSyncCfg syncCfg;
|
||||
char path[TSDB_FILENAME_LEN];
|
||||
|
@ -160,7 +161,6 @@ int32_t syncInit();
|
|||
void syncCleanUp();
|
||||
int64_t syncOpen(const SSyncInfo* pSyncInfo);
|
||||
void syncStart(int64_t rid);
|
||||
void syncStartStandBy(int64_t rid);
|
||||
void syncStop(int64_t rid);
|
||||
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg);
|
||||
ESyncState syncGetMyRole(int64_t rid);
|
||||
|
@ -173,6 +173,10 @@ bool syncEnvIsStart();
|
|||
const char* syncStr(ESyncState state);
|
||||
bool syncIsRestoreFinish(int64_t rid);
|
||||
|
||||
// to be moved to static
|
||||
void syncStartNormal(int64_t rid);
|
||||
void syncStartStandBy(int64_t rid);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -126,6 +126,7 @@ int32_t mndInitSync(SMnode *pMnode) {
|
|||
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s%ssync", pMnode->path, TD_DIRSEP);
|
||||
syncInfo.pWal = pMgmt->pWal;
|
||||
syncInfo.pFsm = mndSyncMakeFsm(pMnode);
|
||||
syncInfo.isStandBy = pMgmt->standby;
|
||||
|
||||
SSyncCfg *pCfg = &syncInfo.syncCfg;
|
||||
pCfg->replicaNum = pMnode->replica;
|
||||
|
@ -192,11 +193,17 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw) {
|
|||
void mndSyncStart(SMnode *pMnode) {
|
||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||
syncSetMsgCb(pMgmt->sync, &pMnode->msgCb);
|
||||
|
||||
syncStart(pMgmt->sync);
|
||||
|
||||
#if 0
|
||||
if (pMgmt->standby) {
|
||||
syncStartStandBy(pMgmt->sync);
|
||||
} else {
|
||||
syncStart(pMgmt->sync);
|
||||
}
|
||||
#endif
|
||||
|
||||
mDebug("sync:%" PRId64 " is started", pMgmt->sync);
|
||||
}
|
||||
|
||||
|
|
|
@ -31,6 +31,7 @@ typedef struct SRaftCfg {
|
|||
SSyncCfg cfg;
|
||||
TdFilePtr pFile;
|
||||
char path[TSDB_FILENAME_LEN * 2];
|
||||
int8_t isStandBy;
|
||||
} SRaftCfg;
|
||||
|
||||
SRaftCfg *raftCfgOpen(const char *path);
|
||||
|
@ -42,10 +43,12 @@ char * syncCfg2Str(SSyncCfg *pSyncCfg);
|
|||
int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg);
|
||||
int32_t syncCfgFromStr(const char *s, SSyncCfg *pSyncCfg);
|
||||
|
||||
cJSON *raftCfg2Json(SRaftCfg *pRaftCfg);
|
||||
cJSON * raftCfg2Json(SRaftCfg *pRaftCfg);
|
||||
char * raftCfg2Str(SRaftCfg *pRaftCfg);
|
||||
int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg);
|
||||
int32_t raftCfgFromStr(const char *s, SRaftCfg *pRaftCfg);
|
||||
|
||||
int32_t syncCfgCreateFile(SSyncCfg *pCfg, const char *path);
|
||||
int32_t raftCfgCreateFile(SSyncCfg *pCfg, int8_t isStandBy, const char *path);
|
||||
|
||||
// for debug ----------------------
|
||||
void syncCfgPrint(SSyncCfg *pCfg);
|
||||
|
|
|
@ -100,6 +100,21 @@ void syncStart(int64_t rid) {
|
|||
if (pSyncNode == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (pSyncNode->pRaftCfg->isStandBy) {
|
||||
syncNodeStartStandBy(pSyncNode);
|
||||
} else {
|
||||
syncNodeStart(pSyncNode);
|
||||
}
|
||||
|
||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||
}
|
||||
|
||||
void syncStartNormal(int64_t rid) {
|
||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||
if (pSyncNode == NULL) {
|
||||
return;
|
||||
}
|
||||
syncNodeStart(pSyncNode);
|
||||
|
||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||
|
@ -368,7 +383,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
|
|||
snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s/raft_config.json", pSyncInfo->path);
|
||||
if (!taosCheckExistFile(pSyncNode->configPath)) {
|
||||
// create raft config file
|
||||
ret = syncCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), pSyncNode->configPath);
|
||||
ret = raftCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), pSyncInfo->isStandBy, pSyncNode->configPath);
|
||||
assert(ret == 0);
|
||||
|
||||
} else {
|
||||
|
@ -979,6 +994,9 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig) {
|
|||
voteGrantedUpdate(pSyncNode->pVotesGranted, pSyncNode);
|
||||
votesRespondUpdate(pSyncNode->pVotesRespond, pSyncNode);
|
||||
|
||||
pSyncNode->pRaftCfg->isStandBy = 0;
|
||||
raftCfgPersist(pSyncNode->pRaftCfg);
|
||||
|
||||
syncNodeLog2("==syncNodeUpdateConfig==", pSyncNode);
|
||||
}
|
||||
|
||||
|
|
|
@ -32,7 +32,7 @@ SRaftCfg *raftCfgOpen(const char *path) {
|
|||
int len = taosReadFile(pCfg->pFile, buf, sizeof(buf));
|
||||
assert(len > 0);
|
||||
|
||||
int32_t ret = syncCfgFromStr(buf, &(pCfg->cfg));
|
||||
int32_t ret = raftCfgFromStr(buf, pCfg);
|
||||
assert(ret == 0);
|
||||
|
||||
return pCfg;
|
||||
|
@ -48,7 +48,7 @@ int32_t raftCfgClose(SRaftCfg *pRaftCfg) {
|
|||
int32_t raftCfgPersist(SRaftCfg *pRaftCfg) {
|
||||
assert(pRaftCfg != NULL);
|
||||
|
||||
char *s = syncCfg2Str(&(pRaftCfg->cfg));
|
||||
char *s = raftCfg2Str(pRaftCfg);
|
||||
taosLSeekFile(pRaftCfg->pFile, 0, SEEK_SET);
|
||||
int64_t ret = taosWriteFile(pRaftCfg->pFile, s, strlen(s) + 1);
|
||||
assert(ret == strlen(s) + 1);
|
||||
|
@ -76,9 +76,12 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) {
|
|||
}
|
||||
}
|
||||
|
||||
return pRoot;
|
||||
/*
|
||||
cJSON *pJson = cJSON_CreateObject();
|
||||
cJSON_AddItemToObject(pJson, "SSyncCfg", pRoot);
|
||||
return pJson;
|
||||
*/
|
||||
}
|
||||
|
||||
char *syncCfg2Str(SSyncCfg *pSyncCfg) {
|
||||
|
@ -90,7 +93,8 @@ char *syncCfg2Str(SSyncCfg *pSyncCfg) {
|
|||
|
||||
int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg) {
|
||||
memset(pSyncCfg, 0, sizeof(SSyncCfg));
|
||||
cJSON *pJson = cJSON_GetObjectItem(pRoot, "SSyncCfg");
|
||||
// cJSON *pJson = cJSON_GetObjectItem(pRoot, "SSyncCfg");
|
||||
const cJSON *pJson = pRoot;
|
||||
|
||||
cJSON *pReplicaNum = cJSON_GetObjectItem(pJson, "replicaNum");
|
||||
assert(cJSON_IsNumber(pReplicaNum));
|
||||
|
@ -133,22 +137,32 @@ int32_t syncCfgFromStr(const char *s, SSyncCfg *pSyncCfg) {
|
|||
}
|
||||
|
||||
cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
|
||||
cJSON *pJson = syncCfg2Json(&(pRaftCfg->cfg));
|
||||
cJSON *pRoot = cJSON_CreateObject();
|
||||
cJSON_AddItemToObject(pRoot, "SSyncCfg", syncCfg2Json(&(pRaftCfg->cfg)));
|
||||
cJSON_AddNumberToObject(pRoot, "isStandBy", pRaftCfg->isStandBy);
|
||||
|
||||
cJSON *pJson = cJSON_CreateObject();
|
||||
cJSON_AddItemToObject(pJson, "RaftCfg", pRoot);
|
||||
return pJson;
|
||||
}
|
||||
|
||||
char *raftCfg2Str(SRaftCfg *pRaftCfg) {
|
||||
char *s = syncCfg2Str(&(pRaftCfg->cfg));
|
||||
return s;
|
||||
cJSON *pJson = raftCfg2Json(pRaftCfg);
|
||||
char * serialized = cJSON_Print(pJson);
|
||||
cJSON_Delete(pJson);
|
||||
return serialized;
|
||||
}
|
||||
|
||||
int32_t syncCfgCreateFile(SSyncCfg *pCfg, const char *path) {
|
||||
int32_t raftCfgCreateFile(SSyncCfg *pCfg, int8_t isStandBy, const char *path) {
|
||||
assert(pCfg != NULL);
|
||||
|
||||
TdFilePtr pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE);
|
||||
assert(pFile != NULL);
|
||||
|
||||
char * s = syncCfg2Str(pCfg);
|
||||
SRaftCfg raftCfg;
|
||||
raftCfg.cfg = *pCfg;
|
||||
raftCfg.isStandBy = isStandBy;
|
||||
char * s = raftCfg2Str(&raftCfg);
|
||||
int64_t ret = taosWriteFile(pFile, s, strlen(s) + 1);
|
||||
assert(ret == strlen(s) + 1);
|
||||
|
||||
|
@ -157,6 +171,31 @@ int32_t syncCfgCreateFile(SSyncCfg *pCfg, const char *path) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) {
|
||||
// memset(pRaftCfg, 0, sizeof(SRaftCfg));
|
||||
cJSON *pJson = cJSON_GetObjectItem(pRoot, "RaftCfg");
|
||||
|
||||
cJSON *pJsonIsStandBy = cJSON_GetObjectItem(pJson, "isStandBy");
|
||||
pRaftCfg->isStandBy = cJSON_GetNumberValue(pJsonIsStandBy);
|
||||
|
||||
cJSON * pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
|
||||
int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg));
|
||||
ASSERT(code == 0);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t raftCfgFromStr(const char *s, SRaftCfg *pRaftCfg) {
|
||||
cJSON *pRoot = cJSON_Parse(s);
|
||||
assert(pRoot != NULL);
|
||||
|
||||
int32_t ret = raftCfgFromJson(pRoot, pRaftCfg);
|
||||
assert(ret == 0);
|
||||
|
||||
cJSON_Delete(pRoot);
|
||||
return 0;
|
||||
}
|
||||
|
||||
// for debug ----------------------
|
||||
void syncCfgPrint(SSyncCfg *pCfg) {
|
||||
char *serialized = syncCfg2Str(pCfg);
|
||||
|
|
|
@ -15,6 +15,21 @@ void logTest() {
|
|||
sFatal("--- sync log test: fatal");
|
||||
}
|
||||
|
||||
SRaftCfg* createRaftCfg() {
|
||||
SRaftCfg* pCfg = (SRaftCfg*)taosMemoryMalloc(sizeof(SRaftCfg));
|
||||
memset(pCfg, 0, sizeof(SRaftCfg));
|
||||
|
||||
pCfg->cfg.replicaNum = 3;
|
||||
pCfg->cfg.myIndex = 1;
|
||||
for (int i = 0; i < pCfg->cfg.replicaNum; ++i) {
|
||||
((pCfg->cfg.nodeInfo)[i]).nodePort = i * 100;
|
||||
snprintf(((pCfg->cfg.nodeInfo)[i]).nodeFqdn, sizeof(((pCfg->cfg.nodeInfo)[i]).nodeFqdn), "100.200.300.%d", i);
|
||||
}
|
||||
pCfg->isStandBy = taosGetTimestampSec() % 100;
|
||||
|
||||
return pCfg;
|
||||
}
|
||||
|
||||
SSyncCfg* createSyncCfg() {
|
||||
SSyncCfg* pCfg = (SSyncCfg*)taosMemoryMalloc(sizeof(SSyncCfg));
|
||||
memset(pCfg, 0, sizeof(SSyncCfg));
|
||||
|
@ -56,7 +71,7 @@ void test3() {
|
|||
if (taosCheckExistFile(s)) {
|
||||
printf("%s file: %s already exist! \n", (char*)__FUNCTION__, s);
|
||||
} else {
|
||||
syncCfgCreateFile(pCfg, s);
|
||||
raftCfgCreateFile(pCfg, 7, s);
|
||||
printf("%s create json file: %s \n", (char*)__FUNCTION__, s);
|
||||
}
|
||||
|
||||
|
@ -78,6 +93,7 @@ void test5() {
|
|||
assert(pCfg != NULL);
|
||||
|
||||
pCfg->cfg.myIndex = taosGetTimestampSec();
|
||||
pCfg->isStandBy += 2;
|
||||
raftCfgPersist(pCfg);
|
||||
|
||||
printf("%s update json file: %s myIndex->%d \n", (char*)__FUNCTION__, "./test3_raft_cfg.json", pCfg->cfg.myIndex);
|
||||
|
|
Loading…
Reference in New Issue