Merge pull request #14364 from taosdata/feature/3.0_mhli

refactor(sync): add SYNC_TERM_INVALID
This commit is contained in:
Li Minghao 2022-06-29 20:28:10 +08:00 committed by GitHub
commit 3a23fb71ec
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 107 additions and 168 deletions

View File

@ -324,6 +324,23 @@ void syncAppendEntriesPrint2(char* s, const SyncAppendEntries* pMsg);
void syncAppendEntriesLog(const SyncAppendEntries* pMsg); void syncAppendEntriesLog(const SyncAppendEntries* pMsg);
void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg); void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg);
// ---------------------------------------------
typedef struct SyncAppendEntriesBatch {
uint32_t bytes;
int32_t vgId;
uint32_t msgType;
SRaftId srcId;
SRaftId destId;
// private data
SyncTerm term;
SyncIndex prevLogIndex;
SyncTerm prevLogTerm;
SyncIndex commitIndex;
SyncTerm privateTerm;
uint32_t dataLen;
char data[];
} SyncAppendEntriesBatch;
// --------------------------------------------- // ---------------------------------------------
typedef struct SyncAppendEntriesReply { typedef struct SyncAppendEntriesReply {
uint32_t bytes; uint32_t bytes;

View File

@ -221,7 +221,6 @@ void syncNodeVoteForSelf(SSyncNode* pSyncNode);
// snapshot -------------- // snapshot --------------
bool syncNodeHasSnapshot(SSyncNode* pSyncNode); bool syncNodeHasSnapshot(SSyncNode* pSyncNode);
bool syncNodeIsIndexInSnapshot(SSyncNode* pSyncNode, SyncIndex index);
SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode); SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode);
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode); SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode);

View File

@ -860,7 +860,9 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
} }
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
ASSERT(code == 0); if (code != 0) {
return -1;
}
// pre commit // pre commit
code = syncNodePreCommit(ths, pAppendEntry); code = syncNodePreCommit(ths, pAppendEntry);
@ -971,7 +973,9 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
ASSERT(pAppendEntry != NULL); ASSERT(pAppendEntry != NULL);
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
ASSERT(code == 0); if (code != 0) {
return -1;
}
// pre commit // pre commit
code = syncNodePreCommit(ths, pAppendEntry); code = syncNodePreCommit(ths, pAppendEntry);

View File

@ -75,7 +75,11 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) {
if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) { if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) {
syncNodeFollower2Candidate(pSyncNode); syncNodeFollower2Candidate(pSyncNode);
} }
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE);
if (pSyncNode->state != TAOS_SYNC_STATE_CANDIDATE) {
syncNodeErrorLog(pSyncNode, "not candidate, can not elect");
return -1;
}
// start election // start election
raftStoreNextTerm(pSyncNode->pRaftStore); raftStoreNextTerm(pSyncNode->pRaftStore);

View File

@ -1923,6 +1923,8 @@ void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
} }
// snapshot -------------- // snapshot --------------
// return if has a snapshot
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) { bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
bool ret = false; bool ret = false;
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1}; SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
@ -1935,21 +1937,10 @@ bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
return ret; return ret;
} }
#if 0 // return max(logLastIndex, snapshotLastIndex)
bool syncNodeIsIndexInSnapshot(SSyncNode* pSyncNode, SyncIndex index) { // if no snapshot and log, return -1
ASSERT(syncNodeHasSnapshot(pSyncNode));
ASSERT(pSyncNode->pFsm->FpGetSnapshotInfo != NULL);
ASSERT(index >= SYNC_INDEX_BEGIN);
SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
bool b = (index <= snapshot.lastApplyIndex);
return b;
}
#endif
SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode) { SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode) {
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
} }
@ -1959,6 +1950,8 @@ SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode) {
return lastIndex; return lastIndex;
} }
// return the last term of snapshot and log
// if error, return SYNC_TERM_INVALID (by syncLogLastTerm)
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) { SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
SyncTerm lastTerm = 0; SyncTerm lastTerm = 0;
if (syncNodeHasSnapshot(pSyncNode)) { if (syncNodeHasSnapshot(pSyncNode)) {
@ -1990,11 +1983,14 @@ int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, Sy
return 0; return 0;
} }
// return append-entries first try index
SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) { SyncIndex syncNodeSyncStartIndex(SSyncNode* pSyncNode) {
SyncIndex syncStartIndex = syncNodeGetLastIndex(pSyncNode) + 1; SyncIndex syncStartIndex = syncNodeGetLastIndex(pSyncNode) + 1;
return syncStartIndex; return syncStartIndex;
} }
// if index > 0, return index - 1
// else, return -1
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) { SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
SyncIndex preIndex = index - 1; SyncIndex preIndex = index - 1;
if (preIndex < SYNC_INDEX_INVALID) { if (preIndex < SYNC_INDEX_INVALID) {
@ -2004,21 +2000,10 @@ SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
return preIndex; return preIndex;
} }
/* // if index < 0, return SYNC_TERM_INVALID
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) { // if index == 0, return 0
ASSERT(index >= SYNC_INDEX_BEGIN); // if index > 0, return preTerm
// if error, return SYNC_TERM_INVALID
SyncIndex syncStartIndex = syncNodeSyncStartIndex(pSyncNode);
if (index > syncStartIndex) {
syncNodeLog3("syncNodeGetPreIndex", pSyncNode);
ASSERT(0);
}
SyncIndex preIndex = index - 1;
return preIndex;
}
*/
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) { SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
if (index < SYNC_INDEX_BEGIN) { if (index < SYNC_INDEX_BEGIN) {
return SYNC_TERM_INVALID; return SYNC_TERM_INVALID;
@ -2056,112 +2041,6 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
return SYNC_TERM_INVALID; return SYNC_TERM_INVALID;
} }
#if 0
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
ASSERT(index >= SYNC_INDEX_BEGIN);
SyncIndex syncStartIndex = syncNodeSyncStartIndex(pSyncNode);
if (index > syncStartIndex) {
syncNodeLog3("syncNodeGetPreTerm", pSyncNode);
ASSERT(0);
}
if (index == SYNC_INDEX_BEGIN) {
return 0;
}
SyncTerm preTerm = 0;
SyncIndex preIndex = index - 1;
SSyncRaftEntry* pPreEntry = NULL;
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
if (code == 0) {
ASSERT(pPreEntry != NULL);
preTerm = pPreEntry->term;
taosMemoryFree(pPreEntry);
return preTerm;
} else {
if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) {
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
if (snapshot.lastApplyIndex == preIndex) {
return snapshot.lastApplyTerm;
}
}
}
}
ASSERT(0);
return -1;
}
#endif
#if 0
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
ASSERT(index >= SYNC_INDEX_BEGIN);
SyncIndex syncStartIndex = syncNodeSyncStartIndex(pSyncNode);
if (index > syncStartIndex) {
syncNodeLog3("syncNodeGetPreTerm", pSyncNode);
ASSERT(0);
}
if (index == SYNC_INDEX_BEGIN) {
return 0;
}
SyncTerm preTerm = 0;
if (syncNodeHasSnapshot(pSyncNode)) {
// has snapshot
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1};
if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
}
if (index > snapshot.lastApplyIndex + 1) {
// should be log preTerm
SSyncRaftEntry* pPreEntry = NULL;
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index - 1, &pPreEntry);
ASSERT(code == 0);
ASSERT(pPreEntry != NULL);
preTerm = pPreEntry->term;
taosMemoryFree(pPreEntry);
} else if (index == snapshot.lastApplyIndex + 1) {
preTerm = snapshot.lastApplyTerm;
} else {
// maybe snapshot change
sError("sync get pre term, bad scene. index:%ld", index);
logStoreLog2("sync get pre term, bad scene", pSyncNode->pLogStore);
SSyncRaftEntry* pPreEntry = NULL;
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index - 1, &pPreEntry);
ASSERT(code == 0);
ASSERT(pPreEntry != NULL);
preTerm = pPreEntry->term;
taosMemoryFree(pPreEntry);
}
} else {
// no snapshot
ASSERT(index > SYNC_INDEX_BEGIN);
SSyncRaftEntry* pPreEntry = NULL;
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index - 1, &pPreEntry);
ASSERT(code == 0);
ASSERT(pPreEntry != NULL);
preTerm = pPreEntry->term;
taosMemoryFree(pPreEntry);
}
return preTerm;
}
#endif
// get pre index and term of "index" // get pre index and term of "index"
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) { int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
*pPreIndex = syncNodeGetPreIndex(pSyncNode, index); *pPreIndex = syncNodeGetPreIndex(pSyncNode, index);
@ -2351,8 +2230,8 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) {
ASSERT(pEntry != NULL); ASSERT(pEntry != NULL);
if (ths->state == TAOS_SYNC_STATE_LEADER) { if (ths->state == TAOS_SYNC_STATE_LEADER) {
// ths->pLogStore->appendEntry(ths->pLogStore, pEntry); int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry); ASSERT(code == 0);
syncNodeReplicate(ths); syncNodeReplicate(ths);
} }
@ -2406,6 +2285,7 @@ int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
// //
int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncIndex* pRetIndex) { int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncIndex* pRetIndex) {
int32_t ret = 0; int32_t ret = 0;
int32_t code = 0;
syncClientRequestLog2("==syncNodeOnClientRequestCb==", pMsg); syncClientRequestLog2("==syncNodeOnClientRequestCb==", pMsg);
SyncIndex index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore); SyncIndex index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
@ -2414,18 +2294,24 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI
ASSERT(pEntry != NULL); ASSERT(pEntry != NULL);
if (ths->state == TAOS_SYNC_STATE_LEADER) { if (ths->state == TAOS_SYNC_STATE_LEADER) {
// ths->pLogStore->appendEntry(ths->pLogStore, pEntry); // append entry
ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry); code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
if (code != 0) {
// del resp mgr, call FpCommitCb
ASSERT(0);
return -1;
}
// start replicate right now! // if mulit replica, start replicate right now
syncNodeReplicate(ths); if (ths->replicaNum > 1) {
syncNodeReplicate(ths);
}
// pre commit // pre commit
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pEntry, &rpcMsg); syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (ths->pFsm != NULL) { if (ths->pFsm != NULL) {
// if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_SYNC_NOOP) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) { if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
SFsmCbMeta cbMeta = {0}; SFsmCbMeta cbMeta = {0};
cbMeta.index = pEntry->index; cbMeta.index = pEntry->index;
@ -2439,8 +2325,10 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI
} }
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
// only myself, maybe commit // if only myself, maybe commit right now
syncMaybeAdvanceCommitIndex(ths); if (ths->replicaNum == 1) {
syncMaybeAdvanceCommitIndex(ths);
}
} else { } else {
// pre commit // pre commit
@ -2448,7 +2336,6 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI
syncEntry2OriginalRpc(pEntry, &rpcMsg); syncEntry2OriginalRpc(pEntry, &rpcMsg);
if (ths->pFsm != NULL) { if (ths->pFsm != NULL) {
// if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_SYNC_NOOP) {
if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) { if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) {
SFsmCbMeta cbMeta = {0}; SFsmCbMeta cbMeta = {0};
cbMeta.index = pEntry->index; cbMeta.index = pEntry->index;

View File

@ -101,7 +101,7 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) {
char *syncCfg2Str(SSyncCfg *pSyncCfg) { char *syncCfg2Str(SSyncCfg *pSyncCfg) {
cJSON *pJson = syncCfg2Json(pSyncCfg); cJSON *pJson = syncCfg2Json(pSyncCfg);
char * serialized = cJSON_Print(pJson); char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }
@ -109,7 +109,7 @@ char *syncCfg2Str(SSyncCfg *pSyncCfg) {
char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg) { char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg) {
if (pSyncCfg != NULL) { if (pSyncCfg != NULL) {
int32_t len = 512; int32_t len = 512;
char * s = taosMemoryMalloc(len); char *s = taosMemoryMalloc(len);
memset(s, 0, len); memset(s, 0, len);
snprintf(s, len, "{replica-num:%d, my-index:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex); snprintf(s, len, "{replica-num:%d, my-index:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex);
@ -205,7 +205,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
char *raftCfg2Str(SRaftCfg *pRaftCfg) { char *raftCfg2Str(SRaftCfg *pRaftCfg) {
cJSON *pJson = raftCfg2Json(pRaftCfg); cJSON *pJson = raftCfg2Json(pRaftCfg);
char * serialized = cJSON_Print(pJson); char *serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }
@ -214,7 +214,16 @@ int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path) {
ASSERT(pCfg != NULL); ASSERT(pCfg != NULL);
TdFilePtr pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE); TdFilePtr pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE);
ASSERT(pFile != NULL); if (pFile == NULL) {
int32_t err = terrno;
const char *errStr = tstrerror(err);
int32_t sysErr = errno;
const char *sysErrStr = strerror(errno);
sError("create raft cfg file error, err:%d %X, msg:%s, syserr:%d, sysmsg:%s", err, err, errStr, sysErr, sysErrStr);
ASSERT(0);
return -1;
}
SRaftCfg raftCfg; SRaftCfg raftCfg;
raftCfg.cfg = *pCfg; raftCfg.cfg = *pCfg;
@ -271,7 +280,7 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) {
(pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring); (pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring);
} }
cJSON * pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg"); cJSON *pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg)); int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg));
ASSERT(code == 0); ASSERT(code == 0);

View File

@ -168,6 +168,9 @@ static SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore) {
return lastVer + 1; return lastVer + 1;
} }
// if success, return last term
// if not log, return 0
// if error, return SYNC_TERM_INVALID
static SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) { static SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) {
SSyncLogStoreData* pData = pLogStore->data; SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal; SWal* pWal = pData->pWal;
@ -176,15 +179,17 @@ static SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) {
} else { } else {
SSyncRaftEntry* pLastEntry; SSyncRaftEntry* pLastEntry;
int32_t code = raftLogGetLastEntry(pLogStore, &pLastEntry); int32_t code = raftLogGetLastEntry(pLogStore, &pLastEntry);
ASSERT(code == 0); if (code == 0 && pLastEntry != NULL) {
ASSERT(pLastEntry != NULL); SyncTerm lastTerm = pLastEntry->term;
taosMemoryFree(pLastEntry);
SyncTerm lastTerm = pLastEntry->term; return lastTerm;
taosMemoryFree(pLastEntry); } else {
return lastTerm; return SYNC_TERM_INVALID;
}
} }
return 0; // can not be here!
return SYNC_TERM_INVALID;
} }
static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
@ -218,16 +223,21 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
ASSERT(0); ASSERT(0);
} }
walFsync(pWal, true); // walFsync(pWal, true);
char eventLog[128]; do {
snprintf(eventLog, sizeof(eventLog), "write index:%ld, type:%s,%d, type2:%s,%d", pEntry->index, char eventLog[128];
TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType); snprintf(eventLog, sizeof(eventLog), "write index:%ld, type:%s,%d, type2:%s,%d", pEntry->index,
syncNodeEventLog(pData->pSyncNode, eventLog); TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType);
syncNodeEventLog(pData->pSyncNode, eventLog);
} while (0);
return code; return code;
} }
// entry found, return 0
// entry not found, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST
// other error, return -1
static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry) { static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry) {
SSyncLogStoreData* pData = pLogStore->data; SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal; SWal* pWal = pData->pWal;
@ -238,6 +248,7 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
// SWalReadHandle* pWalHandle = walOpenReadHandle(pWal); // SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
SWalReadHandle* pWalHandle = pData->pWalHandle; SWalReadHandle* pWalHandle = pData->pWalHandle;
if (pWalHandle == NULL) { if (pWalHandle == NULL) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
return -1; return -1;
} }
@ -309,6 +320,9 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn
return code; return code;
} }
// entry found, return 0
// entry not found, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST
// other error, return -1
static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry) { static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry) {
SSyncLogStoreData* pData = pLogStore->data; SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal; SWal* pWal = pData->pWal;
@ -320,7 +334,8 @@ static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** pp
return -1; return -1;
} else { } else {
SyncIndex lastIndex = raftLogLastIndex(pLogStore); SyncIndex lastIndex = raftLogLastIndex(pLogStore);
int32_t code = raftLogGetEntry(pLogStore, lastIndex, ppLastEntry); ASSERT(lastIndex >= SYNC_INDEX_BEGIN);
int32_t code = raftLogGetEntry(pLogStore, lastIndex, ppLastEntry);
return code; return code;
} }
@ -356,7 +371,7 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
ASSERT(0); ASSERT(0);
} }
walFsync(pWal, true); // walFsync(pWal, true);
char eventLog[128]; char eventLog[128];
snprintf(eventLog, sizeof(eventLog), "old write index:%ld, type:%s,%d, type2:%s,%d", pEntry->index, snprintf(eventLog, sizeof(eventLog), "old write index:%ld, type:%s,%d, type2:%s,%d", pEntry->index,

View File

@ -156,6 +156,10 @@ static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pM
SyncTerm myLastTerm = syncNodeGetLastTerm(pSyncNode); SyncTerm myLastTerm = syncNodeGetLastTerm(pSyncNode);
SyncIndex myLastIndex = syncNodeGetLastIndex(pSyncNode); SyncIndex myLastIndex = syncNodeGetLastIndex(pSyncNode);
if (myLastTerm == SYNC_TERM_INVALID) {
return false;
}
if (pMsg->lastLogTerm > myLastTerm) { if (pMsg->lastLogTerm > myLastTerm) {
return true; return true;
} }