refactor(sync): add raft log test

This commit is contained in:
Minghao Li 2022-06-08 16:45:40 +08:00
parent f6aed44076
commit 687caac2bf
5 changed files with 388 additions and 238 deletions

View File

@ -434,92 +434,43 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
return ret; return ret;
} }
static bool syncNodeOnAppendEntriesLogOK(SSyncNode* pSyncNode, SyncAppendEntries* pMsg) { static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
if (pMsg->prevLogIndex == SYNC_INDEX_INVALID) {
return true;
}
SyncIndex myLastIndex = syncNodeGetLastIndex(pSyncNode);
if (pMsg->prevLogIndex > myLastIndex) {
return false;
}
SyncTerm myPreLogTerm = syncNodeGetPreTerm(pSyncNode, pMsg->prevLogIndex + 1);
if (pMsg->prevLogIndex <= myLastIndex && pMsg->prevLogTerm == myPreLogTerm) {
return true;
}
return false;
}
static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg, SSyncRaftEntry** ppAppendEntry,
bool* pEntryAlreadyWritten) {
int32_t code; int32_t code;
*ppAppendEntry = NULL;
*pEntryAlreadyWritten = false;
// not conflict by default SyncIndex delBegin = pMsg->prevLogIndex + 1;
bool conflict = false; SyncIndex delEnd = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
SyncIndex extraIndex = pMsg->prevLogIndex + 1; // invert roll back!
SSyncRaftEntry* pExtraEntry; for (SyncIndex index = delEnd; index >= delBegin; --index) {
code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, extraIndex, &pExtraEntry); if (ths->pFsm->FpRollBackCb != NULL) {
ASSERT(pExtraEntry != NULL); SSyncRaftEntry* pRollBackEntry;
code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, index, &pRollBackEntry);
ASSERT(code == 0);
ASSERT(pRollBackEntry != NULL);
*ppAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); if (syncUtilUserRollback(pRollBackEntry->msgType)) {
ASSERT(*ppAppendEntry != NULL); SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);
ASSERT(extraIndex == (*ppAppendEntry)->index); SFsmCbMeta cbMeta;
if (pExtraEntry->term != (*ppAppendEntry)->term) { cbMeta.index = pRollBackEntry->index;
// log not match, conflict, need delete cbMeta.isWeak = pRollBackEntry->isWeak;
conflict = true; cbMeta.code = 0;
} else { cbMeta.state = ths->state;
// log match, already written cbMeta.seqNum = pRollBackEntry->seqNum;
ASSERT(extraIndex == (*ppAppendEntry)->index && pExtraEntry->term == (*ppAppendEntry)->term); ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, cbMeta);
*pEntryAlreadyWritten = true; rpcFreeCont(rpcMsg.pCont);
sInfo("entry already written, term:%lu, index:%ld", pExtraEntry->term, pExtraEntry->index);
}
syncEntryDestory(pExtraEntry);
if (conflict) {
// roll back
SyncIndex delBegin = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
SyncIndex delEnd = extraIndex;
sTrace("entry conflict:%d, delBegin:%ld, delEnd:%ld", conflict, delBegin, delEnd);
// notice! reverse roll back!
for (SyncIndex index = delEnd; index >= delBegin; --index) {
if (ths->pFsm->FpRollBackCb != NULL) {
SSyncRaftEntry* pRollBackEntry;
code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, index, &pRollBackEntry);
ASSERT(code == 0);
ASSERT(pRollBackEntry != NULL);
if (syncUtilUserRollback(pRollBackEntry->msgType)) {
SRpcMsg rpcMsg;
syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);
SFsmCbMeta cbMeta;
cbMeta.index = pRollBackEntry->index;
cbMeta.isWeak = pRollBackEntry->isWeak;
cbMeta.code = 0;
cbMeta.state = ths->state;
cbMeta.seqNum = pRollBackEntry->seqNum;
ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, cbMeta);
rpcFreeCont(rpcMsg.pCont);
}
syncEntryDestory(pRollBackEntry);
} }
}
// delete confict entries syncEntryDestory(pRollBackEntry);
code = ths->pLogStore->syncLogTruncate(ths->pLogStore, extraIndex); }
ASSERT(code == 0);
} }
return 0; // delete confict entries
code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin);
ASSERT(code == 0);
return code;
} }
static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) { static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) {
@ -540,6 +491,26 @@ static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) {
return 0; return 0;
} }
// really pre log match
// prevLogIndex == -1
static bool syncNodeOnAppendEntriesLogOK(SSyncNode* pSyncNode, SyncAppendEntries* pMsg) {
if (pMsg->prevLogIndex == SYNC_INDEX_INVALID) {
return true;
}
SyncIndex myLastIndex = syncNodeGetLastIndex(pSyncNode);
if (pMsg->prevLogIndex > myLastIndex) {
return false;
}
SyncTerm myPreLogTerm = syncNodeGetPreTerm(pSyncNode, pMsg->prevLogIndex + 1);
if (pMsg->prevLogIndex <= myLastIndex && pMsg->prevLogTerm == myPreLogTerm) {
return true;
}
return false;
}
int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMsg) { int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
int32_t ret = 0; int32_t ret = 0;
int32_t code = 0; int32_t code = 0;
@ -549,7 +520,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
snprintf(logBuf, sizeof(logBuf), "recv SyncAppendEntries, term:%lu", ths->pRaftStore->currentTerm); snprintf(logBuf, sizeof(logBuf), "recv SyncAppendEntries, term:%lu", ths->pRaftStore->currentTerm);
syncAppendEntriesLog2(logBuf, pMsg); syncAppendEntriesLog2(logBuf, pMsg);
// if I am standby, be added into a raft group, I should process SyncAppendEntries msg // if I am standby, to be added into a raft group, I should process SyncAppendEntries msg
/* /*
// if already drop replica, do not process // if already drop replica, do not process
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) { if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
@ -573,57 +544,55 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
bool logOK = syncNodeOnAppendEntriesLogOK(ths, pMsg); bool logOK = syncNodeOnAppendEntriesLogOK(ths, pMsg);
// case1, reject request // candidate to follower
if ((pMsg->term < ths->pRaftStore->currentTerm) || //
((pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK)) { // operation:
sTrace("recv SyncAppendEntries, reject, receive_term:%lu, current_term:%lu, ths->state:%d, logOK:%d", pMsg->term, // to follower
ths->pRaftStore->currentTerm, ths->state, logOK); do {
bool condition = pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE;
if (condition) {
syncNodeBecomeFollower(ths);
// do not reply?
return ret;
}
} while (0);
// send response // fake match
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); //
pReply->srcId = ths->myRaftId; // condition1:
pReply->destId = pMsg->srcId; // I have snapshot, no log, preIndex > myLastIndex
pReply->term = ths->pRaftStore->currentTerm; //
pReply->success = false; // condition2:
pReply->matchIndex = SYNC_INDEX_INVALID; // I have snapshot, have log, log <= snapshot, preIndex > myLastIndex
pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; //
// condition3:
SRpcMsg rpcMsg; // I have snapshot, preIndex <= snapshot.lastApplyIndex
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg); //
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg); // operation:
syncAppendEntriesReplyDestroy(pReply); // match snapshot.lastApplyIndex - 1;
// no operation on log
return ret; do {
} SyncIndex myLastIndex = syncNodeGetLastIndex(ths);
// case 2, return to follower state
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_CANDIDATE) {
sTrace("recv SyncAppendEntries, return to follower, receive_term:%lu, current_term:%lu, ths->state:%d, logOK:%d",
pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK);
syncNodeBecomeFollower(ths);
// ret or reply?
return ret;
}
// case 3, index in my snapshot
if (pMsg->term == ths->pRaftStore->currentTerm && syncNodeHasSnapshot(ths)) {
SSnapshot snapshot; SSnapshot snapshot;
ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot); ths->pFsm->FpGetSnapshot(ths->pFsm, &snapshot);
if (pMsg->prevLogIndex < snapshot.lastApplyIndex) {
sTrace(
"recv SyncAppendEntries, accept, in snapshot, receive_term:%lu, current_term:%lu, ths->state:%d, logOK:%d, "
"snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu",
pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
bool condition0 = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) &&
syncNodeHasSnapshot(ths);
bool condition1 =
condition0 && (ths->pLogStore->syncLogEntryCount(ths->pLogStore) == 0) && (pMsg->prevLogIndex > myLastIndex);
bool condition2 = condition0 && (ths->pLogStore->syncLogLastIndex(ths->pLogStore) <= snapshot.lastApplyIndex) &&
(pMsg->prevLogIndex > myLastIndex);
bool condition3 = condition0 && (pMsg->prevLogIndex <= snapshot.lastApplyIndex);
bool condition = condition1 || condition2 || condition3;
if (condition) {
// prepare response msg // prepare response msg
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
pReply->srcId = ths->myRaftId; pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId; pReply->destId = pMsg->srcId;
pReply->term = ths->pRaftStore->currentTerm; pReply->term = ths->pRaftStore->currentTerm;
pReply->success = true;
pReply->privateTerm = ths->pNewNodeReceiver->privateTerm; pReply->privateTerm = ths->pNewNodeReceiver->privateTerm;
pReply->success = true;
pReply->matchIndex = snapshot.lastApplyIndex - 1; pReply->matchIndex = snapshot.lastApplyIndex - 1;
// send response // send response
@ -634,105 +603,120 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
return ret; return ret;
} }
} } while (0);
// case 4, accept request // not match
if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_FOLLOWER && logOK) { //
// has extra entries (> preIndex) in local log // condition1:
SyncIndex myLastIndex = syncNodeGetLastIndex(ths); // term < myTerm
bool hasExtraEntries = myLastIndex > pMsg->prevLogIndex; //
// condition2:
// !logOK
//
// operation:
// not match
// no operation on log
do {
bool condition1 = pMsg->term < ths->pRaftStore->currentTerm;
bool condition2 =
(pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && !logOK;
bool condition = condition1 || condition2;
// has entries in SyncAppendEntries msg if (condition) {
bool hasAppendEntries = pMsg->dataLen > 0; // prepare response msg
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId;
pReply->term = ths->pRaftStore->currentTerm;
pReply->privateTerm = ths->pNewNodeReceiver->privateTerm;
pReply->success = false;
pReply->matchIndex = SYNC_INDEX_INVALID;
sTrace( // send response
"recv SyncAppendEntries, accept, receive_term:%lu, current_term:%lu, ths->state:%d, logOK:%d, " SRpcMsg rpcMsg;
"hasExtraEntries:%d, hasAppendEntries:%d", syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
pMsg->term, ths->pRaftStore->currentTerm, ths->state, logOK, hasExtraEntries, hasAppendEntries); syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
syncAppendEntriesReplyDestroy(pReply);
if (hasExtraEntries && hasAppendEntries) { return ret;
// make log same }
SSyncRaftEntry* pAppendEntry; } while (0);
bool entryAlreadyWritten;
code = syncNodeMakeLogSame(ths, pMsg, &pAppendEntry, &entryAlreadyWritten); // really match
ASSERT(code == 0); //
ASSERT(pAppendEntry != NULL); // condition:
// logOK
//
// operation:
// match
// make log same
do {
bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && logOK;
if (condition) {
// has extra entries (> preIndex) in local log
SyncIndex myLastIndex = syncNodeGetLastIndex(ths);
bool hasExtraEntries = myLastIndex > pMsg->prevLogIndex;
// has entries in SyncAppendEntries msg
bool hasAppendEntries = pMsg->dataLen > 0;
if (hasExtraEntries) {
// make log same, rollback deleted entries
code = syncNodeMakeLogSame(ths, pMsg);
ASSERT(code == 0);
}
if (hasAppendEntries) {
// append entry
SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen);
ASSERT(pAppendEntry != NULL);
if (!entryAlreadyWritten) {
// append new entries
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
ASSERT(code == 0); ASSERT(code == 0);
// pre commit // pre commit
code = syncNodePreCommit(ths, pAppendEntry); code = syncNodePreCommit(ths, pAppendEntry);
ASSERT(code == 0); ASSERT(code == 0);
syncEntryDestory(pAppendEntry);
} }
syncEntryDestory(pAppendEntry); // prepare response msg
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId;
pReply->term = ths->pRaftStore->currentTerm;
pReply->privateTerm = ths->pNewNodeReceiver->privateTerm;
pReply->success = true;
pReply->matchIndex = hasAppendEntries ? pMsg->prevLogIndex + 1 : pMsg->prevLogIndex;
} else if (hasExtraEntries && !hasAppendEntries) { // send response
// do nothing SRpcMsg rpcMsg;
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
syncAppendEntriesReplyDestroy(pReply);
} else if (!hasExtraEntries && hasAppendEntries) { // maybe update commit index, leader notice me
SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); if (pMsg->commitIndex > ths->commitIndex) {
ASSERT(pAppendEntry != NULL); // has commit entry in local
if (pMsg->commitIndex <= ths->pLogStore->syncLogLastIndex(ths->pLogStore)) {
SyncIndex beginIndex = ths->commitIndex + 1;
SyncIndex endIndex = pMsg->commitIndex;
// append new entries // update commit index
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); ths->commitIndex = pMsg->commitIndex;
ASSERT(code == 0);
// pre commit // call back Wal
code = syncNodePreCommit(ths, pAppendEntry); code = ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex);
ASSERT(code == 0); ASSERT(code == 0);
syncEntryDestory(pAppendEntry); code = syncNodeCommit(ths, beginIndex, endIndex, ths->state);
ASSERT(code == 0);
} else if (!hasExtraEntries && !hasAppendEntries) { }
// do nothing
} else {
ASSERT(0);
}
// prepare response msg
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);
pReply->srcId = ths->myRaftId;
pReply->destId = pMsg->srcId;
pReply->term = ths->pRaftStore->currentTerm;
pReply->success = true;
pReply->privateTerm = ths->pNewNodeReceiver->privateTerm;
if (hasAppendEntries) {
pReply->matchIndex = pMsg->prevLogIndex + 1;
} else {
pReply->matchIndex = pMsg->prevLogIndex;
}
// send response
SRpcMsg rpcMsg;
syncAppendEntriesReply2RpcMsg(pReply, &rpcMsg);
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
syncAppendEntriesReplyDestroy(pReply);
// maybe update commit index from leader
if (pMsg->commitIndex > ths->commitIndex) {
// has commit entry in local
if (pMsg->commitIndex <= ths->pLogStore->syncLogLastIndex(ths->pLogStore)) {
SyncIndex beginIndex = ths->commitIndex + 1;
SyncIndex endIndex = pMsg->commitIndex;
// update commit index
ths->commitIndex = pMsg->commitIndex;
// call back Wal
code = ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex);
ASSERT(code == 0);
code = syncNodeCommit(ths, beginIndex, endIndex, ths->state);
ASSERT(code == 0);
} }
return ret;
} }
} } while (0);
return ret; return ret;
} }

View File

@ -1283,7 +1283,6 @@ bool syncNodeIsIndexInSnapshot(SSyncNode* pSyncNode, SyncIndex index) {
SSnapshot snapshot; SSnapshot snapshot;
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
bool b = (index <= snapshot.lastApplyIndex); bool b = (index <= snapshot.lastApplyIndex);
return b; return b;
} }
@ -1307,17 +1306,10 @@ SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
} }
if (pSyncNode->pLogStore->syncLogEntryCount(pSyncNode->pLogStore) > 0) { SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
// has log if (logLastIndex > snapshot.lastApplyIndex) {
SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
if (logLastIndex > snapshot.lastApplyIndex) {
lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
} else {
lastTerm = snapshot.lastApplyTerm;
}
} else { } else {
// no log
lastTerm = snapshot.lastApplyTerm; lastTerm = snapshot.lastApplyTerm;
} }
@ -1346,22 +1338,7 @@ SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
SyncIndex syncStartIndex = syncNodeSyncStartIndex(pSyncNode); SyncIndex syncStartIndex = syncNodeSyncStartIndex(pSyncNode);
ASSERT(index <= syncStartIndex); ASSERT(index <= syncStartIndex);
SyncIndex preIndex; SyncIndex preIndex = index - 1;
if (syncNodeHasSnapshot(pSyncNode)) {
// has snapshot
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
if (pSyncNode->pFsm->FpGetSnapshot != NULL) {
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
}
// ASSERT(index > snapshot.lastApplyIndex);
preIndex = index - 1;
} else {
// no snapshot
preIndex = index - 1;
}
return preIndex; return preIndex;
} }
@ -1382,7 +1359,6 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot); pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
} }
// ASSERT(index > snapshot.lastApplyIndex);
if (index > snapshot.lastApplyIndex + 1) { if (index > snapshot.lastApplyIndex + 1) {
// should be log preTerm // should be log preTerm
SSyncRaftEntry* pPreEntry = NULL; SSyncRaftEntry* pPreEntry = NULL;
@ -1395,10 +1371,19 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) {
} else if (index == snapshot.lastApplyIndex + 1) { } else if (index == snapshot.lastApplyIndex + 1) {
preTerm = snapshot.lastApplyTerm; preTerm = snapshot.lastApplyTerm;
} else { } else {
// ASSERT(0);
// maybe snapshot change // maybe snapshot change
preTerm = snapshot.lastApplyTerm; sError("sync get pre term, bad scene. index:%ld", index);
logStoreLog2("sync get pre term, bad scene", pSyncNode->pLogStore);
SSyncRaftEntry* pPreEntry = NULL;
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index - 1, &pPreEntry);
ASSERT(code == 0);
ASSERT(pPreEntry != NULL);
preTerm = pPreEntry->term;
taosMemoryFree(pPreEntry);
} }
} else { } else {

View File

@ -437,10 +437,6 @@ cJSON* logStore2Json(SSyncLogStore* pLogStore) {
cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf); cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pData->pWal); snprintf(u64buf, sizeof(u64buf), "%p", pData->pWal);
cJSON_AddStringToObject(pRoot, "pWal", u64buf); cJSON_AddStringToObject(pRoot, "pWal", u64buf);
snprintf(u64buf, sizeof(u64buf), "%ld", raftLogLastIndex(pLogStore));
cJSON_AddStringToObject(pRoot, "LastIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", raftLogLastTerm(pLogStore));
cJSON_AddStringToObject(pRoot, "LastTerm", u64buf);
snprintf(u64buf, sizeof(u64buf), "%ld", pData->beginIndex); snprintf(u64buf, sizeof(u64buf), "%ld", pData->beginIndex);
cJSON_AddStringToObject(pRoot, "beginIndex", u64buf); cJSON_AddStringToObject(pRoot, "beginIndex", u64buf);
@ -452,6 +448,17 @@ cJSON* logStore2Json(SSyncLogStore* pLogStore) {
int32_t count = raftLogEntryCount(pLogStore); int32_t count = raftLogEntryCount(pLogStore);
cJSON_AddNumberToObject(pRoot, "entryCount", count); cJSON_AddNumberToObject(pRoot, "entryCount", count);
snprintf(u64buf, sizeof(u64buf), "%ld", raftLogWriteIndex(pLogStore));
cJSON_AddStringToObject(pRoot, "WriteIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%d", raftLogIsEmpty(pLogStore));
cJSON_AddStringToObject(pRoot, "IsEmpty", u64buf);
snprintf(u64buf, sizeof(u64buf), "%ld", raftLogLastIndex(pLogStore));
cJSON_AddStringToObject(pRoot, "LastIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", raftLogLastTerm(pLogStore));
cJSON_AddStringToObject(pRoot, "LastTerm", u64buf);
cJSON* pEntries = cJSON_CreateArray(); cJSON* pEntries = cJSON_CreateArray();
cJSON_AddItemToObject(pRoot, "pEntries", pEntries); cJSON_AddItemToObject(pRoot, "pEntries", pEntries);
@ -484,10 +491,6 @@ cJSON* logStoreSimple2Json(SSyncLogStore* pLogStore) {
cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf); cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pData->pWal); snprintf(u64buf, sizeof(u64buf), "%p", pData->pWal);
cJSON_AddStringToObject(pRoot, "pWal", u64buf); cJSON_AddStringToObject(pRoot, "pWal", u64buf);
snprintf(u64buf, sizeof(u64buf), "%ld", raftLogLastIndex(pLogStore));
cJSON_AddStringToObject(pRoot, "LastIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", raftLogLastTerm(pLogStore));
cJSON_AddStringToObject(pRoot, "LastTerm", u64buf);
snprintf(u64buf, sizeof(u64buf), "%ld", pData->beginIndex); snprintf(u64buf, sizeof(u64buf), "%ld", pData->beginIndex);
cJSON_AddStringToObject(pRoot, "beginIndex", u64buf); cJSON_AddStringToObject(pRoot, "beginIndex", u64buf);
@ -498,6 +501,17 @@ cJSON* logStoreSimple2Json(SSyncLogStore* pLogStore) {
int32_t count = raftLogEntryCount(pLogStore); int32_t count = raftLogEntryCount(pLogStore);
cJSON_AddNumberToObject(pRoot, "entryCount", count); cJSON_AddNumberToObject(pRoot, "entryCount", count);
snprintf(u64buf, sizeof(u64buf), "%ld", raftLogWriteIndex(pLogStore));
cJSON_AddStringToObject(pRoot, "WriteIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%d", raftLogIsEmpty(pLogStore));
cJSON_AddStringToObject(pRoot, "IsEmpty", u64buf);
snprintf(u64buf, sizeof(u64buf), "%ld", raftLogLastIndex(pLogStore));
cJSON_AddStringToObject(pRoot, "LastIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", raftLogLastTerm(pLogStore));
cJSON_AddStringToObject(pRoot, "LastTerm", u64buf);
} }
cJSON* pJson = cJSON_CreateObject(); cJSON* pJson = cJSON_CreateObject();

View File

@ -32,6 +32,8 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) {
return 0; return 0;
} }
bool gAssert = true;
void init() { void init() {
walInit(); walInit();
@ -68,6 +70,17 @@ void test1() {
pLogStore = logStoreCreate(pSyncNode); pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore); assert(pLogStore);
logStoreLog2((char*)"\n\n\ntest1 ----- ", pLogStore); logStoreLog2((char*)"\n\n\ntest1 ----- ", pLogStore);
if (gAssert) {
assert(pLogStore->syncLogBeginIndex(pLogStore) == 0);
assert(pLogStore->syncLogEndIndex(pLogStore) == -1);
assert(pLogStore->syncLogEntryCount(pLogStore) == 0);
assert(pLogStore->syncLogWriteIndex(pLogStore) == 0);
assert(pLogStore->syncLogIsEmpty(pLogStore) == 0);
assert(pLogStore->syncLogLastIndex(pLogStore) == -1);
assert(pLogStore->syncLogLastTerm(pLogStore) == 0);
}
logStoreDestory(pLogStore); logStoreDestory(pLogStore);
cleanup(); cleanup();
@ -76,6 +89,17 @@ void test1() {
pLogStore = logStoreCreate(pSyncNode); pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore); assert(pLogStore);
logStoreLog2((char*)"\n\n\ntest1 restart ----- ", pLogStore); logStoreLog2((char*)"\n\n\ntest1 restart ----- ", pLogStore);
if (gAssert) {
assert(pLogStore->syncLogBeginIndex(pLogStore) == 0);
assert(pLogStore->syncLogEndIndex(pLogStore) == -1);
assert(pLogStore->syncLogEntryCount(pLogStore) == 0);
assert(pLogStore->syncLogWriteIndex(pLogStore) == 0);
assert(pLogStore->syncLogIsEmpty(pLogStore) == 0);
assert(pLogStore->syncLogLastIndex(pLogStore) == -1);
assert(pLogStore->syncLogLastTerm(pLogStore) == 0);
}
logStoreDestory(pLogStore); logStoreDestory(pLogStore);
cleanup(); cleanup();
} }
@ -88,6 +112,17 @@ void test2() {
assert(pLogStore); assert(pLogStore);
pLogStore->syncLogSetBeginIndex(pLogStore, 5); pLogStore->syncLogSetBeginIndex(pLogStore, 5);
logStoreLog2((char*)"\n\n\ntest2 ----- ", pLogStore); logStoreLog2((char*)"\n\n\ntest2 ----- ", pLogStore);
if (gAssert) {
assert(pLogStore->syncLogBeginIndex(pLogStore) == 5);
assert(pLogStore->syncLogEndIndex(pLogStore) == -1);
assert(pLogStore->syncLogEntryCount(pLogStore) == 0);
assert(pLogStore->syncLogWriteIndex(pLogStore) == 5);
assert(pLogStore->syncLogIsEmpty(pLogStore) == 0);
assert(pLogStore->syncLogLastIndex(pLogStore) == -1);
assert(pLogStore->syncLogLastTerm(pLogStore) == 0);
}
logStoreDestory(pLogStore); logStoreDestory(pLogStore);
cleanup(); cleanup();
@ -96,6 +131,17 @@ void test2() {
pLogStore = logStoreCreate(pSyncNode); pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore); assert(pLogStore);
logStoreLog2((char*)"\n\n\ntest2 restart ----- ", pLogStore); logStoreLog2((char*)"\n\n\ntest2 restart ----- ", pLogStore);
if (gAssert) {
assert(pLogStore->syncLogBeginIndex(pLogStore) == 5);
assert(pLogStore->syncLogEndIndex(pLogStore) == -1);
assert(pLogStore->syncLogEntryCount(pLogStore) == 0);
assert(pLogStore->syncLogWriteIndex(pLogStore) == 5);
assert(pLogStore->syncLogIsEmpty(pLogStore) == 0);
assert(pLogStore->syncLogLastIndex(pLogStore) == -1);
assert(pLogStore->syncLogLastTerm(pLogStore) == 0);
}
logStoreDestory(pLogStore); logStoreDestory(pLogStore);
cleanup(); cleanup();
} }
@ -108,6 +154,16 @@ void test3() {
assert(pLogStore); assert(pLogStore);
logStoreLog2((char*)"\n\n\ntest3 ----- ", pLogStore); logStoreLog2((char*)"\n\n\ntest3 ----- ", pLogStore);
if (gAssert) {
assert(pLogStore->syncLogBeginIndex(pLogStore) == 0);
assert(pLogStore->syncLogEndIndex(pLogStore) == -1);
assert(pLogStore->syncLogEntryCount(pLogStore) == 0);
assert(pLogStore->syncLogWriteIndex(pLogStore) == 0);
assert(pLogStore->syncLogIsEmpty(pLogStore) == 0);
assert(pLogStore->syncLogLastIndex(pLogStore) == -1);
assert(pLogStore->syncLogLastTerm(pLogStore) == 0);
}
for (int i = 0; i <= 4; ++i) { for (int i = 0; i <= 4; ++i) {
int32_t dataLen = 10; int32_t dataLen = 10;
SSyncRaftEntry* pEntry = syncEntryBuild(dataLen); SSyncRaftEntry* pEntry = syncEntryBuild(dataLen);
@ -124,6 +180,17 @@ void test3() {
syncEntryDestory(pEntry); syncEntryDestory(pEntry);
} }
logStoreLog2((char*)"test3 after appendEntry", pLogStore); logStoreLog2((char*)"test3 after appendEntry", pLogStore);
if (gAssert) {
assert(pLogStore->syncLogBeginIndex(pLogStore) == 0);
assert(pLogStore->syncLogEndIndex(pLogStore) == 4);
assert(pLogStore->syncLogEntryCount(pLogStore) == 5);
assert(pLogStore->syncLogWriteIndex(pLogStore) == 5);
assert(pLogStore->syncLogIsEmpty(pLogStore) == 1);
assert(pLogStore->syncLogLastIndex(pLogStore) == 4);
assert(pLogStore->syncLogLastTerm(pLogStore) == 104);
}
logStoreDestory(pLogStore); logStoreDestory(pLogStore);
cleanup(); cleanup();
@ -132,6 +199,17 @@ void test3() {
pLogStore = logStoreCreate(pSyncNode); pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore); assert(pLogStore);
logStoreLog2((char*)"\n\n\ntest3 restart ----- ", pLogStore); logStoreLog2((char*)"\n\n\ntest3 restart ----- ", pLogStore);
if (gAssert) {
assert(pLogStore->syncLogBeginIndex(pLogStore) == 0);
assert(pLogStore->syncLogEndIndex(pLogStore) == 4);
assert(pLogStore->syncLogEntryCount(pLogStore) == 5);
assert(pLogStore->syncLogWriteIndex(pLogStore) == 5);
assert(pLogStore->syncLogIsEmpty(pLogStore) == 1);
assert(pLogStore->syncLogLastIndex(pLogStore) == 4);
assert(pLogStore->syncLogLastTerm(pLogStore) == 104);
}
logStoreDestory(pLogStore); logStoreDestory(pLogStore);
cleanup(); cleanup();
} }
@ -161,6 +239,17 @@ void test4() {
syncEntryDestory(pEntry); syncEntryDestory(pEntry);
} }
logStoreLog2((char*)"test4 after appendEntry", pLogStore); logStoreLog2((char*)"test4 after appendEntry", pLogStore);
if (gAssert) {
assert(pLogStore->syncLogBeginIndex(pLogStore) == 5);
assert(pLogStore->syncLogEndIndex(pLogStore) == 9);
assert(pLogStore->syncLogEntryCount(pLogStore) == 5);
assert(pLogStore->syncLogWriteIndex(pLogStore) == 10);
assert(pLogStore->syncLogIsEmpty(pLogStore) == 1);
assert(pLogStore->syncLogLastIndex(pLogStore) == 9);
assert(pLogStore->syncLogLastTerm(pLogStore) == 109);
}
logStoreDestory(pLogStore); logStoreDestory(pLogStore);
cleanup(); cleanup();
@ -169,6 +258,17 @@ void test4() {
pLogStore = logStoreCreate(pSyncNode); pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore); assert(pLogStore);
logStoreLog2((char*)"\n\n\ntest4 restart ----- ", pLogStore); logStoreLog2((char*)"\n\n\ntest4 restart ----- ", pLogStore);
if (gAssert) {
assert(pLogStore->syncLogBeginIndex(pLogStore) == 5);
assert(pLogStore->syncLogEndIndex(pLogStore) == 9);
assert(pLogStore->syncLogEntryCount(pLogStore) == 5);
assert(pLogStore->syncLogWriteIndex(pLogStore) == 10);
assert(pLogStore->syncLogIsEmpty(pLogStore) == 1);
assert(pLogStore->syncLogLastIndex(pLogStore) == 9);
assert(pLogStore->syncLogLastTerm(pLogStore) == 109);
}
logStoreDestory(pLogStore); logStoreDestory(pLogStore);
cleanup(); cleanup();
} }
@ -199,9 +299,29 @@ void test5() {
} }
logStoreLog2((char*)"test5 after appendEntry", pLogStore); logStoreLog2((char*)"test5 after appendEntry", pLogStore);
if (gAssert) {
assert(pLogStore->syncLogBeginIndex(pLogStore) == 5);
assert(pLogStore->syncLogEndIndex(pLogStore) == 9);
assert(pLogStore->syncLogEntryCount(pLogStore) == 5);
assert(pLogStore->syncLogWriteIndex(pLogStore) == 10);
assert(pLogStore->syncLogIsEmpty(pLogStore) == 1);
assert(pLogStore->syncLogLastIndex(pLogStore) == 9);
assert(pLogStore->syncLogLastTerm(pLogStore) == 109);
}
pLogStore->syncLogTruncate(pLogStore, 7); pLogStore->syncLogTruncate(pLogStore, 7);
logStoreLog2((char*)"after truncate 7", pLogStore); logStoreLog2((char*)"after truncate 7", pLogStore);
if (gAssert) {
assert(pLogStore->syncLogBeginIndex(pLogStore) == 5);
assert(pLogStore->syncLogEndIndex(pLogStore) == 6);
assert(pLogStore->syncLogEntryCount(pLogStore) == 2);
assert(pLogStore->syncLogWriteIndex(pLogStore) == 7);
assert(pLogStore->syncLogIsEmpty(pLogStore) == 1);
assert(pLogStore->syncLogLastIndex(pLogStore) == 6);
assert(pLogStore->syncLogLastTerm(pLogStore) == 106);
}
logStoreDestory(pLogStore); logStoreDestory(pLogStore);
cleanup(); cleanup();
@ -210,6 +330,17 @@ void test5() {
pLogStore = logStoreCreate(pSyncNode); pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore); assert(pLogStore);
logStoreLog2((char*)"\n\n\ntest5 restart ----- ", pLogStore); logStoreLog2((char*)"\n\n\ntest5 restart ----- ", pLogStore);
if (gAssert) {
assert(pLogStore->syncLogBeginIndex(pLogStore) == 5);
assert(pLogStore->syncLogEndIndex(pLogStore) == 6);
assert(pLogStore->syncLogEntryCount(pLogStore) == 2);
assert(pLogStore->syncLogWriteIndex(pLogStore) == 7);
assert(pLogStore->syncLogIsEmpty(pLogStore) == 1);
assert(pLogStore->syncLogLastIndex(pLogStore) == 6);
assert(pLogStore->syncLogLastTerm(pLogStore) == 106);
}
logStoreDestory(pLogStore); logStoreDestory(pLogStore);
cleanup(); cleanup();
} }
@ -240,9 +371,29 @@ void test6() {
} }
logStoreLog2((char*)"test6 after appendEntry", pLogStore); logStoreLog2((char*)"test6 after appendEntry", pLogStore);
if (gAssert) {
assert(pLogStore->syncLogBeginIndex(pLogStore) == 5);
assert(pLogStore->syncLogEndIndex(pLogStore) == 9);
assert(pLogStore->syncLogEntryCount(pLogStore) == 5);
assert(pLogStore->syncLogWriteIndex(pLogStore) == 10);
assert(pLogStore->syncLogIsEmpty(pLogStore) == 1);
assert(pLogStore->syncLogLastIndex(pLogStore) == 9);
assert(pLogStore->syncLogLastTerm(pLogStore) == 109);
}
pLogStore->syncLogTruncate(pLogStore, 5); pLogStore->syncLogTruncate(pLogStore, 5);
logStoreLog2((char*)"after truncate 5", pLogStore); logStoreLog2((char*)"after truncate 5", pLogStore);
if (gAssert) {
assert(pLogStore->syncLogBeginIndex(pLogStore) == 5);
assert(pLogStore->syncLogEndIndex(pLogStore) == -1);
assert(pLogStore->syncLogEntryCount(pLogStore) == 0);
assert(pLogStore->syncLogWriteIndex(pLogStore) == 5);
assert(pLogStore->syncLogIsEmpty(pLogStore) == 0);
assert(pLogStore->syncLogLastIndex(pLogStore) == -1);
assert(pLogStore->syncLogLastTerm(pLogStore) == 0);
}
logStoreDestory(pLogStore); logStoreDestory(pLogStore);
cleanup(); cleanup();
@ -251,6 +402,17 @@ void test6() {
pLogStore = logStoreCreate(pSyncNode); pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore); assert(pLogStore);
logStoreLog2((char*)"\n\n\ntest6 restart ----- ", pLogStore); logStoreLog2((char*)"\n\n\ntest6 restart ----- ", pLogStore);
if (gAssert) {
assert(pLogStore->syncLogBeginIndex(pLogStore) == 5);
assert(pLogStore->syncLogEndIndex(pLogStore) == -1);
assert(pLogStore->syncLogEntryCount(pLogStore) == 0);
assert(pLogStore->syncLogWriteIndex(pLogStore) == 5);
assert(pLogStore->syncLogIsEmpty(pLogStore) == 0);
assert(pLogStore->syncLogLastIndex(pLogStore) == -1);
assert(pLogStore->syncLogLastTerm(pLogStore) == 0);
}
logStoreDestory(pLogStore); logStoreDestory(pLogStore);
cleanup(); cleanup();
} }
@ -259,6 +421,11 @@ int main(int argc, char** argv) {
tsAsyncLog = 0; tsAsyncLog = 0;
sDebugFlag = DEBUG_TRACE + DEBUG_INFO + DEBUG_SCREEN + DEBUG_FILE; sDebugFlag = DEBUG_TRACE + DEBUG_INFO + DEBUG_SCREEN + DEBUG_FILE;
if (argc == 2) {
gAssert = atoi(argv[1]);
}
sTrace("gAssert : %d", gAssert);
test1(); test1();
test2(); test2();
test3(); test3();

View File

@ -80,7 +80,7 @@ void test5() {
void test6() { void test6() {
SyncTimeout *pMsg = createMsg(); SyncTimeout *pMsg = createMsg();
char * jsonStr = syncTimeout2Str(pMsg); char * jsonStr = syncTimeout2Str(pMsg);
sTrace("jsonStr: %s", jsonStr); sTrace("jsonStr: %s", jsonStr);
syncUtilJson2Line(jsonStr); syncUtilJson2Line(jsonStr);