refactor: sync integrate into mnode
This commit is contained in:
parent
89c1e82337
commit
68b7f6946d
|
@ -312,6 +312,8 @@ int64_t sdbGetTableVer(SSdb *pSdb, ESdbType type);
|
||||||
*/
|
*/
|
||||||
void sdbSetApplyIndex(SSdb *pSdb, int64_t index);
|
void sdbSetApplyIndex(SSdb *pSdb, int64_t index);
|
||||||
int64_t sdbGetApplyIndex(SSdb *pSdb);
|
int64_t sdbGetApplyIndex(SSdb *pSdb);
|
||||||
|
void sdbSetApplyTerm(SSdb *pSdb, int64_t term);
|
||||||
|
int64_t sdbGetApplyTerm(SSdb *pSdb);
|
||||||
|
|
||||||
SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen);
|
SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen);
|
||||||
void sdbFreeRaw(SSdbRaw *pRaw);
|
void sdbFreeRaw(SSdbRaw *pRaw);
|
||||||
|
@ -340,6 +342,7 @@ typedef struct SSdb {
|
||||||
char *tmpDir;
|
char *tmpDir;
|
||||||
int64_t lastCommitVer;
|
int64_t lastCommitVer;
|
||||||
int64_t curVer;
|
int64_t curVer;
|
||||||
|
int64_t curTerm;
|
||||||
int64_t tableVer[SDB_MAX];
|
int64_t tableVer[SDB_MAX];
|
||||||
int64_t maxId[SDB_MAX];
|
int64_t maxId[SDB_MAX];
|
||||||
EKeyType keyTypes[SDB_MAX];
|
EKeyType keyTypes[SDB_MAX];
|
||||||
|
|
|
@ -53,6 +53,7 @@ static void mndSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c
|
||||||
static int32_t mndSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) {
|
static int32_t mndSyncGetSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) {
|
||||||
SMnode *pMnode = pFsm->data;
|
SMnode *pMnode = pFsm->data;
|
||||||
pSnapshot->lastApplyIndex = sdbGetApplyIndex(pMnode->pSdb);
|
pSnapshot->lastApplyIndex = sdbGetApplyIndex(pMnode->pSdb);
|
||||||
|
pSnapshot->lastApplyTerm = sdbGetApplyTerm(pMnode->pSdb);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -53,6 +53,7 @@ SSdb *sdbInit(SSdbOpt *pOption) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pSdb->curVer = -1;
|
pSdb->curVer = -1;
|
||||||
|
pSdb->curTerm = -1;
|
||||||
pSdb->lastCommitVer = -1;
|
pSdb->lastCommitVer = -1;
|
||||||
pSdb->pMnode = pOption->pMnode;
|
pSdb->pMnode = pOption->pMnode;
|
||||||
mDebug("sdb init successfully");
|
mDebug("sdb init successfully");
|
||||||
|
@ -159,3 +160,7 @@ static int32_t sdbCreateDir(SSdb *pSdb) {
|
||||||
void sdbSetApplyIndex(SSdb *pSdb, int64_t index) { pSdb->curVer = index; }
|
void sdbSetApplyIndex(SSdb *pSdb, int64_t index) { pSdb->curVer = index; }
|
||||||
|
|
||||||
int64_t sdbGetApplyIndex(SSdb *pSdb) { return pSdb->curVer; }
|
int64_t sdbGetApplyIndex(SSdb *pSdb) { return pSdb->curVer; }
|
||||||
|
|
||||||
|
void sdbSetApplyTerm(SSdb *pSdb, int64_t term) { pSdb->curTerm = term; }
|
||||||
|
|
||||||
|
int64_t sdbGetApplyTerm(SSdb *pSdb) { return pSdb->curTerm; }
|
||||||
|
|
|
@ -65,6 +65,16 @@ static int32_t sdbReadFileHead(SSdb *pSdb, TdFilePtr pFile) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ret = taosReadFile(pFile, &pSdb->curTerm, sizeof(int64_t));
|
||||||
|
if (ret < 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (ret != sizeof(int64_t)) {
|
||||||
|
terrno = TSDB_CODE_FILE_CORRUPTED;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
|
for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
|
||||||
int64_t maxId = 0;
|
int64_t maxId = 0;
|
||||||
ret = taosReadFile(pFile, &maxId, sizeof(int64_t));
|
ret = taosReadFile(pFile, &maxId, sizeof(int64_t));
|
||||||
|
@ -123,6 +133,11 @@ static int32_t sdbWriteFileHead(SSdb *pSdb, TdFilePtr pFile) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (taosWriteFile(pFile, &pSdb->curTerm, sizeof(int64_t)) != sizeof(int64_t)) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
|
for (int32_t i = 0; i < SDB_TABLE_SIZE; ++i) {
|
||||||
int64_t maxId = 0;
|
int64_t maxId = 0;
|
||||||
if (i < SDB_MAX) {
|
if (i < SDB_MAX) {
|
||||||
|
@ -182,6 +197,7 @@ int32_t sdbReadFile(SSdb *pSdb) {
|
||||||
if (sdbReadFileHead(pSdb, pFile) != 0) {
|
if (sdbReadFileHead(pSdb, pFile) != 0) {
|
||||||
mError("failed to read file:%s head since %s", file, terrstr());
|
mError("failed to read file:%s head since %s", file, terrstr());
|
||||||
pSdb->curVer = -1;
|
pSdb->curVer = -1;
|
||||||
|
pSdb->curTerm = -1;
|
||||||
taosMemoryFree(pRaw);
|
taosMemoryFree(pRaw);
|
||||||
taosCloseFile(&pFile);
|
taosCloseFile(&pFile);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -256,8 +272,8 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
|
||||||
char curfile[PATH_MAX] = {0};
|
char curfile[PATH_MAX] = {0};
|
||||||
snprintf(curfile, sizeof(curfile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
|
snprintf(curfile, sizeof(curfile), "%s%ssdb.data", pSdb->currDir, TD_DIRSEP);
|
||||||
|
|
||||||
mDebug("start to write file:%s, current ver:%" PRId64 ", commit ver:%" PRId64, curfile, pSdb->curVer,
|
mDebug("start to write file:%s, current ver:%" PRId64 " term:%" PRId64 ", commit ver:%" PRId64, curfile, pSdb->curVer,
|
||||||
pSdb->lastCommitVer);
|
pSdb->curTerm, pSdb->lastCommitVer);
|
||||||
|
|
||||||
TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
TdFilePtr pFile = taosOpenFile(tmpfile, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC);
|
||||||
if (pFile == NULL) {
|
if (pFile == NULL) {
|
||||||
|
@ -350,7 +366,7 @@ static int32_t sdbWriteFileImp(SSdb *pSdb) {
|
||||||
mError("failed to write file:%s since %s", curfile, tstrerror(code));
|
mError("failed to write file:%s since %s", curfile, tstrerror(code));
|
||||||
} else {
|
} else {
|
||||||
pSdb->lastCommitVer = pSdb->curVer;
|
pSdb->lastCommitVer = pSdb->curVer;
|
||||||
mDebug("write file:%s successfully, ver:%" PRId64, curfile, pSdb->lastCommitVer);
|
mDebug("write file:%s successfully, ver:%" PRId64 " term:%" PRId64, curfile, pSdb->lastCommitVer, pSdb->curTerm);
|
||||||
}
|
}
|
||||||
|
|
||||||
terrno = code;
|
terrno = code;
|
||||||
|
|
|
@ -294,6 +294,7 @@ void dumpTrans(SSdb *pSdb, SJson *json) {
|
||||||
void dumpHeader(SSdb *pSdb, SJson *json) {
|
void dumpHeader(SSdb *pSdb, SJson *json) {
|
||||||
tjsonAddIntegerToObject(json, "sver", 1);
|
tjsonAddIntegerToObject(json, "sver", 1);
|
||||||
tjsonAddStringToObject(json, "curVer", i642str(pSdb->curVer));
|
tjsonAddStringToObject(json, "curVer", i642str(pSdb->curVer));
|
||||||
|
tjsonAddStringToObject(json, "curTerm", i642str(pSdb->curTerm));
|
||||||
|
|
||||||
SJson *maxIdsJson = tjsonCreateObject();
|
SJson *maxIdsJson = tjsonCreateObject();
|
||||||
tjsonAddItemToObject(json, "maxIds", maxIdsJson);
|
tjsonAddItemToObject(json, "maxIds", maxIdsJson);
|
||||||
|
|
Loading…
Reference in New Issue