update sync raft store
This commit is contained in:
parent
aa921c9870
commit
a5cd3444d1
|
@ -25,17 +25,17 @@ static int32_t raftStoreDecode(const SJson *pJson, SRaftStore *pStore) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
tjsonGetNumberValue(pJson, "current_term", pStore->currentTerm, code);
|
tjsonGetNumberValue(pJson, "current_term", pStore->currentTerm, code);
|
||||||
if (code < 0) return -1;
|
if (code < 0) TAOS_RETURN(TSDB_CODE_FAILED);
|
||||||
tjsonGetNumberValue(pJson, "vote_for_addr", pStore->voteFor.addr, code);
|
tjsonGetNumberValue(pJson, "vote_for_addr", pStore->voteFor.addr, code);
|
||||||
if (code < 0) return -1;
|
if (code < 0) TAOS_RETURN(TSDB_CODE_FAILED);
|
||||||
tjsonGetInt32ValueFromDouble(pJson, "vote_for_vgid", pStore->voteFor.vgId, code);
|
tjsonGetInt32ValueFromDouble(pJson, "vote_for_vgid", pStore->voteFor.vgId, code);
|
||||||
if (code < 0) return -1;
|
if (code < 0) TAOS_RETURN(TSDB_CODE_FAILED);
|
||||||
|
|
||||||
return 0;
|
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t raftStoreReadFile(SSyncNode *pNode) {
|
int32_t raftStoreReadFile(SSyncNode *pNode) {
|
||||||
int32_t code = -1;
|
int32_t code = -1, lino = 0;
|
||||||
TdFilePtr pFile = NULL;
|
TdFilePtr pFile = NULL;
|
||||||
char *pData = NULL;
|
char *pData = NULL;
|
||||||
SJson *pJson = NULL;
|
SJson *pJson = NULL;
|
||||||
|
@ -52,41 +52,38 @@ int32_t raftStoreReadFile(SSyncNode *pNode) {
|
||||||
|
|
||||||
pFile = taosOpenFile(file, TD_FILE_READ);
|
pFile = taosOpenFile(file, TD_FILE_READ);
|
||||||
if (pFile == NULL) {
|
if (pFile == NULL) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
sError("vgId:%d, failed to open raft store file:%s since %s", pNode->vgId, file, terrstr());
|
sError("vgId:%d, failed to open raft store file:%s since %s", pNode->vgId, file, terrstr());
|
||||||
goto _OVER;
|
|
||||||
|
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _OVER);
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t size = 0;
|
int64_t size = 0;
|
||||||
if (taosFStatFile(pFile, &size, NULL) < 0) {
|
if (taosFStatFile(pFile, &size, NULL) < 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
sError("vgId:%d, failed to fstat raft store file:%s since %s", pNode->vgId, file, terrstr());
|
sError("vgId:%d, failed to fstat raft store file:%s since %s", pNode->vgId, file, terrstr());
|
||||||
goto _OVER;
|
|
||||||
|
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _OVER);
|
||||||
}
|
}
|
||||||
|
|
||||||
pData = taosMemoryMalloc(size + 1);
|
pData = taosMemoryMalloc(size + 1);
|
||||||
if (pData == NULL) {
|
if (pData == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _OVER);
|
||||||
goto _OVER;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosReadFile(pFile, pData, size) != size) {
|
if (taosReadFile(pFile, pData, size) != size) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
sError("vgId:%d, failed to read raft store file:%s since %s", pNode->vgId, file, terrstr());
|
sError("vgId:%d, failed to read raft store file:%s since %s", pNode->vgId, file, terrstr());
|
||||||
goto _OVER;
|
|
||||||
|
TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _OVER);
|
||||||
}
|
}
|
||||||
|
|
||||||
pData[size] = '\0';
|
pData[size] = '\0';
|
||||||
|
|
||||||
pJson = tjsonParse(pData);
|
pJson = tjsonParse(pData);
|
||||||
if (pJson == NULL) {
|
if (pJson == NULL) {
|
||||||
terrno = TSDB_CODE_INVALID_JSON_FORMAT;
|
TAOS_CHECK_GOTO(TSDB_CODE_INVALID_JSON_FORMAT, &lino, _OVER);
|
||||||
goto _OVER;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (raftStoreDecode(pJson, pStore) < 0) {
|
if (raftStoreDecode(pJson, pStore) < 0) {
|
||||||
terrno = TSDB_CODE_INVALID_JSON_FORMAT;
|
TAOS_CHECK_GOTO(TSDB_CODE_INVALID_JSON_FORMAT, &lino, _OVER);
|
||||||
goto _OVER;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
|
@ -100,18 +97,20 @@ _OVER:
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
sError("vgId:%d, failed to read raft store file:%s since %s", pNode->vgId, file, terrstr());
|
sError("vgId:%d, failed to read raft store file:%s since %s", pNode->vgId, file, terrstr());
|
||||||
}
|
}
|
||||||
return code;
|
|
||||||
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t raftStoreEncode(SJson *pJson, SRaftStore *pStore) {
|
static int32_t raftStoreEncode(SJson *pJson, SRaftStore *pStore) {
|
||||||
if (tjsonAddIntegerToObject(pJson, "current_term", pStore->currentTerm) < 0) return -1;
|
if (tjsonAddIntegerToObject(pJson, "current_term", pStore->currentTerm) < 0) TAOS_RETURN(TSDB_CODE_FAILED);
|
||||||
if (tjsonAddIntegerToObject(pJson, "vote_for_addr", pStore->voteFor.addr) < 0) return -1;
|
if (tjsonAddIntegerToObject(pJson, "vote_for_addr", pStore->voteFor.addr) < 0) TAOS_RETURN(TSDB_CODE_FAILED);
|
||||||
if (tjsonAddDoubleToObject(pJson, "vote_for_vgid", pStore->voteFor.vgId) < 0) return -1;
|
if (tjsonAddDoubleToObject(pJson, "vote_for_vgid", pStore->voteFor.vgId) < 0) TAOS_RETURN(TSDB_CODE_FAILED);
|
||||||
return 0;
|
|
||||||
|
TAOS_RETURN(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t raftStoreWriteFile(SSyncNode *pNode) {
|
int32_t raftStoreWriteFile(SSyncNode *pNode) {
|
||||||
int32_t code = -1;
|
int32_t code = -1, lino = 0;
|
||||||
char *buffer = NULL;
|
char *buffer = NULL;
|
||||||
SJson *pJson = NULL;
|
SJson *pJson = NULL;
|
||||||
TdFilePtr pFile = NULL;
|
TdFilePtr pFile = NULL;
|
||||||
|
@ -120,23 +119,23 @@ int32_t raftStoreWriteFile(SSyncNode *pNode) {
|
||||||
char file[PATH_MAX] = {0};
|
char file[PATH_MAX] = {0};
|
||||||
snprintf(file, sizeof(file), "%s.bak", realfile);
|
snprintf(file, sizeof(file), "%s.bak", realfile);
|
||||||
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
pJson = tjsonCreateObject();
|
pJson = tjsonCreateObject();
|
||||||
if (pJson == NULL) goto _OVER;
|
if (pJson == NULL) TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _OVER);
|
||||||
if (raftStoreEncode(pJson, pStore) != 0) goto _OVER;
|
if (raftStoreEncode(pJson, pStore) != 0) TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _OVER);
|
||||||
|
|
||||||
buffer = tjsonToString(pJson);
|
buffer = tjsonToString(pJson);
|
||||||
if (buffer == NULL) goto _OVER;
|
if (buffer == NULL) TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _OVER);
|
||||||
terrno = 0;
|
|
||||||
|
|
||||||
pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
|
pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
|
||||||
if (pFile == NULL) goto _OVER;
|
if (pFile == NULL) TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _OVER);
|
||||||
|
|
||||||
int32_t len = strlen(buffer);
|
int32_t len = strlen(buffer);
|
||||||
if (taosWriteFile(pFile, buffer, len) <= 0) goto _OVER;
|
if (taosWriteFile(pFile, buffer, len) <= 0) TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _OVER);
|
||||||
if (taosFsyncFile(pFile) < 0) goto _OVER;
|
|
||||||
|
if (taosFsyncFile(pFile) < 0) TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _OVER);
|
||||||
|
|
||||||
taosCloseFile(&pFile);
|
taosCloseFile(&pFile);
|
||||||
if (taosRenameFile(file, realfile) != 0) goto _OVER;
|
if (taosRenameFile(file, realfile) != 0) TAOS_CHECK_GOTO(TAOS_SYSTEM_ERROR(errno), &lino, _OVER);
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
sInfo("vgId:%d, succeed to write raft store file:%s, term:%" PRId64, pNode->vgId, realfile, pStore->currentTerm);
|
sInfo("vgId:%d, succeed to write raft store file:%s, term:%" PRId64, pNode->vgId, realfile, pStore->currentTerm);
|
||||||
|
@ -147,7 +146,6 @@ _OVER:
|
||||||
if (pFile != NULL) taosCloseFile(&pFile);
|
if (pFile != NULL) taosCloseFile(&pFile);
|
||||||
|
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
if (terrno == 0) terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
sError("vgId:%d, failed to write raft store file:%s since %s", pNode->vgId, realfile, terrstr());
|
sError("vgId:%d, failed to write raft store file:%s since %s", pNode->vgId, realfile, terrstr());
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
|
|
Loading…
Reference in New Issue