refactor: sdb commit index
This commit is contained in:
parent
43f8f34b0f
commit
ad5f9555fb
|
@ -169,11 +169,12 @@ typedef struct SSdb {
|
|||
SWal *pWal;
|
||||
char *currDir;
|
||||
char *tmpDir;
|
||||
int64_t lastCommitVer;
|
||||
int64_t lastCommitTerm;
|
||||
int64_t curVer;
|
||||
int64_t curTerm;
|
||||
int64_t curConfig;
|
||||
int64_t commitIndex;
|
||||
int64_t commitTerm;
|
||||
int64_t commitConfig;
|
||||
int64_t applyIndex;
|
||||
int64_t applyTerm;
|
||||
int64_t applyConfig;
|
||||
int64_t tableVer[SDB_MAX];
|
||||
int64_t maxId[SDB_MAX];
|
||||
EKeyType keyTypes[SDB_MAX];
|
||||
|
|
|
@ -53,11 +53,12 @@ SSdb *sdbInit(SSdbOpt *pOption) {
|
|||
}
|
||||
|
||||
pSdb->pWal = pOption->pWal;
|
||||
pSdb->curVer = -1;
|
||||
pSdb->curTerm = -1;
|
||||
pSdb->lastCommitVer = -1;
|
||||
pSdb->lastCommitTerm = -1;
|
||||
pSdb->curConfig = -1;
|
||||
pSdb->applyIndex = -1;
|
||||
pSdb->applyTerm = -1;
|
||||
pSdb->applyConfig = -1;
|
||||
pSdb->commitIndex = -1;
|
||||
pSdb->commitTerm = -1;
|
||||
pSdb->commitConfig = -1;
|
||||
pSdb->pMnode = pOption->pMnode;
|
||||
taosThreadMutexInit(&pSdb->filelock, NULL);
|
||||
mDebug("sdb init successfully");
|
||||
|
@ -159,23 +160,23 @@ static int32_t sdbCreateDir(SSdb *pSdb) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
void sdbSetApplyIndex(SSdb *pSdb, int64_t index) { pSdb->curVer = index; }
|
||||
void sdbSetApplyIndex(SSdb *pSdb, int64_t index) { pSdb->applyIndex = index; }
|
||||
|
||||
void sdbSetApplyTerm(SSdb *pSdb, int64_t term) { pSdb->curTerm = term; }
|
||||
void sdbSetApplyTerm(SSdb *pSdb, int64_t term) { pSdb->applyTerm = term; }
|
||||
|
||||
void sdbSetCurConfig(SSdb *pSdb, int64_t config) {
|
||||
if (pSdb->curConfig != config) {
|
||||
mDebug("mnode sync config set from %" PRId64 " to %" PRId64, pSdb->curConfig, config);
|
||||
pSdb->curConfig = config;
|
||||
if (pSdb->applyConfig != config) {
|
||||
mDebug("mnode sync config set from %" PRId64 " to %" PRId64, pSdb->applyConfig, config);
|
||||
pSdb->applyConfig = config;
|
||||
}
|
||||
}
|
||||
|
||||
int64_t sdbGetApplyIndex(SSdb *pSdb) { return pSdb->curVer; }
|
||||
int64_t sdbGetApplyIndex(SSdb *pSdb) { return pSdb->applyIndex; }
|
||||
|
||||
int64_t sdbGetApplyTerm(SSdb *pSdb) { return pSdb->curTerm; }
|
||||
int64_t sdbGetApplyTerm(SSdb *pSdb) { return pSdb->applyTerm; }
|
||||
|
||||
int64_t sdbGetCommitIndex(SSdb *pSdb) { return pSdb->lastCommitVer; }
|
||||
int64_t sdbGetCommitIndex(SSdb *pSdb) { return pSdb->commitIndex; }
|
||||
|
||||
int64_t sdbGetCommitTerm(SSdb *pSdb) { return pSdb->lastCommitTerm; }
|
||||
int64_t sdbGetCommitTerm(SSdb *pSdb) { return pSdb->commitTerm; }
|
||||
|
||||
int64_t sdbGetCurConfig(SSdb *pSdb) { return pSdb->curConfig; }
|
||||
int64_t sdbGetCurConfig(SSdb *pSdb) { return pSdb->commitConfig; }
|
|
@ -67,10 +67,12 @@ static void sdbResetData(SSdb *pSdb) {
|
|||
mDebug("sdb:%s is reset", sdbTableName(i));
|
||||
}
|
||||
|
||||
pSdb->curVer = -1;
|
||||
pSdb->curTerm = -1;
|
||||
pSdb->lastCommitVer = -1;
|
||||
pSdb->lastCommitTerm = -1;
|
||||
pSdb->applyIndex = -1;
|
||||
pSdb->applyTerm = -1;
|
||||
pSdb->applyConfig = -1;
|
||||
pSdb->commitIndex = -1;
|
||||
pSdb->commitTerm = -1;
|
||||
pSdb->commitConfig = -1;
|
||||
mDebug("sdb reset successfully");
|
||||
}
|
||||
|
||||
|
@ -90,7 +92,7 @@ static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
ret = taosReadFile(pFile, &pSdb->curVer, sizeof(int64_t));
|
||||
ret = taosReadFile(pFile, &pSdb->applyIndex, sizeof(int64_t));
|
||||
if (ret < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
|
@ -100,7 +102,7 @@ static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
ret = taosReadFile(pFile, &pSdb->curTerm, sizeof(int64_t));
|
||||
ret = taosReadFile(pFile, &pSdb->applyTerm, sizeof(int64_t));
|
||||
if (ret < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
|
@ -110,7 +112,7 @@ static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
ret = taosReadFile(pFile, &pSdb->curConfig, sizeof(int64_t));
|
||||
ret = taosReadFile(pFile, &pSdb->applyConfig, sizeof(int64_t));
|
||||
if (ret < 0) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
|
@ -173,17 +175,17 @@ static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (taosWriteFile(pFile, &pSdb->curVer, sizeof(int64_t)) != sizeof(int64_t)) {
|
||||
if (taosWriteFile(pFile, &pSdb->applyIndex, sizeof(int64_t)) != sizeof(int64_t)) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (taosWriteFile(pFile, &pSdb->curTerm, sizeof(int64_t)) != sizeof(int64_t)) {
|
||||
if (taosWriteFile(pFile, &pSdb->applyTerm, sizeof(int64_t)) != sizeof(int64_t)) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (taosWriteFile(pFile, &pSdb->curConfig, sizeof(int64_t)) != sizeof(int64_t)) {
|
||||
if (taosWriteFile(pFile, &pSdb->applyConfig, sizeof(int64_t)) != sizeof(int64_t)) {
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
return -1;
|
||||
}
|
||||
|
@ -300,11 +302,12 @@ static int32_t sdbReadFileImp(SSdb *pSdb) {
|
|||
}
|
||||
|
||||
code = 0;
|
||||
pSdb->lastCommitVer = pSdb->curVer;
|
||||
pSdb->lastCommitTerm = pSdb->curTerm;
|
||||
pSdb->commitIndex = pSdb->applyIndex;
|
||||
pSdb->commitTerm = pSdb->applyTerm;
|
||||
pSdb->commitConfig = pSdb->applyConfig;
|
||||
memcpy(pSdb->tableVer, tableVer, sizeof(tableVer));
|
||||
mDebug("read sdb file:%s successfully, index:%" PRId64 " term:%" PRId64 " config:%" PRId64, file, pSdb->lastCommitVer,
|
||||
pSdb->lastCommitTerm, pSdb->curConfig);
|
||||
mDebug("read sdb file:%s successfully, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64, file,
|
||||
pSdb->commitIndex, pSdb->commitTerm, pSdb->commitConfig);
|
||||
|
||||
_OVER:
|
||||
taosCloseFile(&pFile);
|
||||
|
@ -336,9 +339,10 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
|
|||
char curfile[PATH_MAX] = {0};
|
||||
snprintf(curfile, sizeof(curfile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
|
||||
|
||||
mDebug("start to write sdb file, current ver:%" PRId64 " term:%" PRId64 ", commit ver:%" PRId64 " term:%" PRId64
|
||||
" file:%s",
|
||||
pSdb->curVer, pSdb->curTerm, pSdb->lastCommitVer, pSdb->lastCommitTerm, curfile);
|
||||
mDebug("start to write sdb file, apply index:%" PRId64 " term:%" PRId64 " config:%" PRId64 ", commit index:%" PRId64
|
||||
" term:%" PRId64 " config:%" PRId64 ", file:%s",
|
||||
pSdb->applyIndex, pSdb->applyTerm, pSdb->applyConfig, pSdb->commitIndex, pSdb->commitTerm, pSdb->commitConfig,
|
||||
curfile);
|
||||
|
||||
TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||
if (pFile == NULL) {
|
||||
|
@ -430,10 +434,11 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
|
|||
if (code != 0) {
|
||||
mError("failed to write sdb file:%s since %s", curfile, tstrerror(code));
|
||||
} else {
|
||||
pSdb->lastCommitVer = pSdb->curVer;
|
||||
pSdb->lastCommitTerm = pSdb->curTerm;
|
||||
mDebug("write sdb file successfully, index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s",
|
||||
pSdb->lastCommitVer, pSdb->lastCommitTerm, pSdb->curConfig, curfile);
|
||||
pSdb->commitIndex = pSdb->applyIndex;
|
||||
pSdb->commitTerm = pSdb->applyTerm;
|
||||
pSdb->commitConfig = pSdb->applyConfig;
|
||||
mDebug("write sdb file successfully, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s",
|
||||
pSdb->commitIndex, pSdb->commitTerm, pSdb->commitConfig, curfile);
|
||||
}
|
||||
|
||||
terrno = code;
|
||||
|
@ -442,13 +447,13 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
|
|||
|
||||
int32_t sdbWriteFile(SSdb *pSdb) {
|
||||
int32_t code = 0;
|
||||
if (pSdb->curVer == pSdb->lastCommitVer) {
|
||||
if (pSdb->applyIndex == pSdb->commitIndex) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
taosThreadMutexLock(&pSdb->filelock);
|
||||
if (pSdb->pWal != NULL) {
|
||||
code = walBeginSnapshot(pSdb->pWal, pSdb->curVer);
|
||||
code = walBeginSnapshot(pSdb->pWal, pSdb->applyIndex);
|
||||
}
|
||||
if (code == 0) {
|
||||
code = sdbWriteFileImp(pSdb);
|
||||
|
@ -522,9 +527,9 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter) {
|
|||
snprintf(datafile, sizeof(datafile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
|
||||
|
||||
taosThreadMutexLock(&pSdb->filelock);
|
||||
int64_t commitIndex = pSdb->lastCommitVer;
|
||||
int64_t commitTerm = pSdb->lastCommitTerm;
|
||||
int64_t curConfig = pSdb->curConfig;
|
||||
int64_t commitIndex = pSdb->commitIndex;
|
||||
int64_t commitTerm = pSdb->commitTerm;
|
||||
int64_t commitConfig = pSdb->commitConfig;
|
||||
if (taosCopyFile(datafile, pIter->name) < 0) {
|
||||
taosThreadMutexUnlock(&pSdb->filelock);
|
||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||
|
@ -543,8 +548,8 @@ int32_t sdbStartRead(SSdb *pSdb, SSdbIter **ppIter) {
|
|||
}
|
||||
|
||||
*ppIter = pIter;
|
||||
mInfo("sdbiter:%p, is created to read snapshot, index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s", pIter,
|
||||
commitIndex, commitTerm, curConfig, pIter->name);
|
||||
mInfo("sdbiter:%p, is created to read snapshot, commit index:%" PRId64 " term:%" PRId64 " config:%" PRId64 " file:%s",
|
||||
pIter, commitIndex, commitTerm, commitConfig, pIter->name);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -294,8 +294,9 @@ void dumpTrans(SSdb *pSdb, SJson *json) {
|
|||
|
||||
void dumpHeader(SSdb *pSdb, SJson *json) {
|
||||
tjsonAddIntegerToObject(json, "sver", 1);
|
||||
tjsonAddStringToObject(json, "curVer", i642str(pSdb->curVer));
|
||||
tjsonAddStringToObject(json, "curTerm", i642str(pSdb->curTerm));
|
||||
tjsonAddStringToObject(json, "applyIndex", i642str(pSdb->applyIndex));
|
||||
tjsonAddStringToObject(json, "applyTerm", i642str(pSdb->applyTerm));
|
||||
tjsonAddStringToObject(json, "applyConfig", i642str(pSdb->applyConfig));
|
||||
|
||||
SJson *maxIdsJson = tjsonCreateObject();
|
||||
tjsonAddItemToObject(json, "maxIds", maxIdsJson);
|
||||
|
|
Loading…
Reference in New Issue