enh(sync): add log store2
This commit is contained in:
parent
ba8adb73cb
commit
5908631ec3
|
@ -141,6 +141,21 @@ typedef struct SSyncLogStore {
|
||||||
// return commit index of log
|
// return commit index of log
|
||||||
SyncIndex (*getCommitIndex)(struct SSyncLogStore* pLogStore);
|
SyncIndex (*getCommitIndex)(struct SSyncLogStore* pLogStore);
|
||||||
|
|
||||||
|
// refactor, log[0 .. n] ==> log[m .. n]
|
||||||
|
int32_t (*syncLogSetBeginIndex)(struct SSyncLogStore* pLogStore, SyncIndex beginIndex);
|
||||||
|
SyncIndex (*syncLogBeginIndex)(struct SSyncLogStore* pLogStore);
|
||||||
|
SyncIndex (*syncLogEndIndex)(struct SSyncLogStore* pLogStore);
|
||||||
|
bool (*syncLogIsEmpty)(struct SSyncLogStore* pLogStore);
|
||||||
|
int32_t (*syncLogEntryCount)(struct SSyncLogStore* pLogStore);
|
||||||
|
bool (*syncLogInRange)(struct SSyncLogStore* pLogStore, SyncIndex index);
|
||||||
|
|
||||||
|
SyncIndex (*syncLogLastIndex)(struct SSyncLogStore* pLogStore);
|
||||||
|
SyncTerm (*syncLogLastTerm)(struct SSyncLogStore* pLogStore);
|
||||||
|
|
||||||
|
int32_t (*syncLogAppendEntry)(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry);
|
||||||
|
int32_t (*syncLogGetEntry)(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry);
|
||||||
|
int32_t (*syncLogTruncate)(struct SSyncLogStore* pLogStore, SyncIndex fromIndex);
|
||||||
|
|
||||||
} SSyncLogStore;
|
} SSyncLogStore;
|
||||||
|
|
||||||
typedef struct SSyncInfo {
|
typedef struct SSyncInfo {
|
||||||
|
|
|
@ -212,9 +212,14 @@ void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId);
|
||||||
void syncNodeVoteForSelf(SSyncNode* pSyncNode);
|
void syncNodeVoteForSelf(SSyncNode* pSyncNode);
|
||||||
|
|
||||||
// snapshot --------------
|
// snapshot --------------
|
||||||
bool syncNodeIsIndexInSnapshot(SSyncNode* pSyncNode, SyncIndex index);
|
bool syncNodeHasSnapshot(SSyncNode* pSyncNode);
|
||||||
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm);
|
bool syncNodeIsIndexInSnapshot(SSyncNode* pSyncNode, SyncIndex index);
|
||||||
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm);
|
SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode);
|
||||||
|
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode);
|
||||||
|
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm);
|
||||||
|
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index);
|
||||||
|
SyncTerm syncNodeGetPreITerm(SSyncNode* pSyncNode, SyncIndex index);
|
||||||
|
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm);
|
||||||
|
|
||||||
// for debug --------------
|
// for debug --------------
|
||||||
void syncNodePrint(SSyncNode* pObj);
|
void syncNodePrint(SSyncNode* pObj);
|
||||||
|
|
|
@ -30,6 +30,7 @@ extern "C" {
|
||||||
typedef struct SSyncLogStoreData {
|
typedef struct SSyncLogStoreData {
|
||||||
SSyncNode* pSyncNode;
|
SSyncNode* pSyncNode;
|
||||||
SWal* pWal;
|
SWal* pWal;
|
||||||
|
SyncIndex beginIndex; // valid begin index, default 0, may be set beginIndex > 0
|
||||||
} SSyncLogStoreData;
|
} SSyncLogStoreData;
|
||||||
|
|
||||||
SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode);
|
SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode);
|
||||||
|
@ -41,15 +42,6 @@ char* logStoreSimple2Str(SSyncLogStore* pLogStore);
|
||||||
|
|
||||||
SyncIndex logStoreFirstIndex(SSyncLogStore* pLogStore);
|
SyncIndex logStoreFirstIndex(SSyncLogStore* pLogStore);
|
||||||
|
|
||||||
// SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore);
|
|
||||||
// SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore);
|
|
||||||
// SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore);
|
|
||||||
// SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index);
|
|
||||||
// int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry);
|
|
||||||
// int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex);
|
|
||||||
// int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index);
|
|
||||||
// SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore);
|
|
||||||
|
|
||||||
// for debug
|
// for debug
|
||||||
void logStorePrint(SSyncLogStore* pLogStore);
|
void logStorePrint(SSyncLogStore* pLogStore);
|
||||||
void logStorePrint2(char* s, SSyncLogStore* pLogStore);
|
void logStorePrint2(char* s, SSyncLogStore* pLogStore);
|
||||||
|
|
|
@ -30,7 +30,7 @@ static int32_t syncIODestroy(SSyncIO *io);
|
||||||
static int32_t syncIOStartInternal(SSyncIO *io);
|
static int32_t syncIOStartInternal(SSyncIO *io);
|
||||||
static int32_t syncIOStopInternal(SSyncIO *io);
|
static int32_t syncIOStopInternal(SSyncIO *io);
|
||||||
|
|
||||||
static void *syncIOConsumerFunc(void *param);
|
static void * syncIOConsumerFunc(void *param);
|
||||||
static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||||
static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||||
static int32_t syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey);
|
static int32_t syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey);
|
||||||
|
@ -242,9 +242,9 @@ static int32_t syncIOStopInternal(SSyncIO *io) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void *syncIOConsumerFunc(void *param) {
|
static void *syncIOConsumerFunc(void *param) {
|
||||||
SSyncIO *io = param;
|
SSyncIO * io = param;
|
||||||
STaosQall *qall;
|
STaosQall *qall;
|
||||||
SRpcMsg *pRpcMsg, rpcMsg;
|
SRpcMsg * pRpcMsg, rpcMsg;
|
||||||
qall = taosAllocateQall();
|
qall = taosAllocateQall();
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
|
|
|
@ -1272,6 +1272,18 @@ void syncNodeVoteForSelf(SSyncNode* pSyncNode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// snapshot --------------
|
// snapshot --------------
|
||||||
|
bool syncNodeHasSnapshot(SSyncNode* pSyncNode) {
|
||||||
|
bool ret = false;
|
||||||
|
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
|
||||||
|
if (pSyncNode->pFsm->FpGetSnapshot != NULL) {
|
||||||
|
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
|
||||||
|
if (snapshot.lastApplyIndex >= SYNC_INDEX_BEGIN) {
|
||||||
|
ret = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
bool syncNodeIsIndexInSnapshot(SSyncNode* pSyncNode, SyncIndex index) {
|
bool syncNodeIsIndexInSnapshot(SSyncNode* pSyncNode, SyncIndex index) {
|
||||||
SSnapshot snapshot;
|
SSnapshot snapshot;
|
||||||
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
|
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
|
||||||
|
@ -1279,54 +1291,73 @@ bool syncNodeIsIndexInSnapshot(SSyncNode* pSyncNode, SyncIndex index) {
|
||||||
return b;
|
return b;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode) {
|
||||||
|
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
|
||||||
|
if (pSyncNode->pFsm->FpGetSnapshot != NULL) {
|
||||||
|
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
|
||||||
|
}
|
||||||
|
SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
|
||||||
|
|
||||||
|
SyncIndex lastIndex = logLastIndex > snapshot.lastApplyIndex ? logLastIndex : snapshot.lastApplyIndex;
|
||||||
|
return lastIndex;
|
||||||
|
}
|
||||||
|
|
||||||
|
SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode) {
|
||||||
|
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
|
||||||
|
if (pSyncNode->pFsm->FpGetSnapshot != NULL) {
|
||||||
|
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
|
||||||
|
}
|
||||||
|
SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
|
||||||
|
|
||||||
|
SyncTerm lastTerm = 0;
|
||||||
|
if (logLastIndex >= snapshot.lastApplyIndex) {
|
||||||
|
lastTerm = pSyncNode->pLogStore->syncLogLastTerm(pSyncNode->pLogStore);
|
||||||
|
} else {
|
||||||
|
lastTerm = snapshot.lastApplyTerm;
|
||||||
|
}
|
||||||
|
return lastTerm;
|
||||||
|
}
|
||||||
|
|
||||||
// get last index and term along with snapshot
|
// get last index and term along with snapshot
|
||||||
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
|
int32_t syncNodeGetLastIndexTerm(SSyncNode* pSyncNode, SyncIndex* pLastIndex, SyncTerm* pLastTerm) {
|
||||||
SyncIndex logLastIndex = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore);
|
*pLastIndex = syncNodeGetLastIndex(pSyncNode);
|
||||||
SSnapshot snapshot;
|
*pLastTerm = syncNodeGetLastTerm(pSyncNode);
|
||||||
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
|
|
||||||
SyncIndex snapshotLastIndex = snapshot.lastApplyIndex;
|
|
||||||
|
|
||||||
if (logLastIndex > snapshotLastIndex) {
|
|
||||||
*pLastIndex = logLastIndex;
|
|
||||||
*pLastTerm = pSyncNode->pLogStore->getLastTerm(pSyncNode->pLogStore);
|
|
||||||
} else if (logLastIndex == snapshotLastIndex) {
|
|
||||||
*pLastIndex = snapshotLastIndex;
|
|
||||||
*pLastTerm = snapshot.lastApplyTerm;
|
|
||||||
} else if (logLastIndex < snapshotLastIndex) {
|
|
||||||
// maybe wal is deleted
|
|
||||||
*pLastIndex = snapshotLastIndex;
|
|
||||||
*pLastTerm = snapshot.lastApplyTerm;
|
|
||||||
} else {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) {
|
||||||
|
ASSERT(index >= SYNC_INDEX_BEGIN);
|
||||||
|
SyncIndex preIndex = index - 1;
|
||||||
|
return preIndex;
|
||||||
|
}
|
||||||
|
|
||||||
|
SyncTerm syncNodeGetPreITerm(SSyncNode* pSyncNode, SyncIndex index) {
|
||||||
|
SyncTerm preTerm = 0;
|
||||||
|
SyncIndex preIndex = syncNodeGetPreIndex(pSyncNode, index);
|
||||||
|
|
||||||
|
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
|
||||||
|
if (pSyncNode->pFsm->FpGetSnapshot != NULL) {
|
||||||
|
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (syncNodeIsIndexInSnapshot(pSyncNode, preIndex) && preIndex == snapshot.lastApplyIndex) {
|
||||||
|
preTerm = snapshot.lastApplyTerm;
|
||||||
|
} else {
|
||||||
|
SSyncRaftEntry* pPreEntry = NULL;
|
||||||
|
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry);
|
||||||
|
ASSERT(code == 0);
|
||||||
|
if (pPreEntry != NULL) {
|
||||||
|
preTerm = pPreEntry->term;
|
||||||
|
taosMemoryFree(pPreEntry);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return preTerm;
|
||||||
|
}
|
||||||
|
|
||||||
// get pre index and term of "index"
|
// get pre index and term of "index"
|
||||||
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
|
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) {
|
||||||
ASSERT(index >= SYNC_INDEX_BEGIN);
|
*pPreIndex = syncNodeGetPreIndex(pSyncNode, index);
|
||||||
// ASSERT(!syncNodeIsIndexInSnapshot(pSyncNode, index));
|
*pPreTerm = syncNodeGetPreITerm(pSyncNode, index);
|
||||||
int ret = 0;
|
|
||||||
|
|
||||||
SyncIndex preIndex = index - 1;
|
|
||||||
if (syncNodeIsIndexInSnapshot(pSyncNode, preIndex)) {
|
|
||||||
SSnapshot snapshot;
|
|
||||||
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
|
|
||||||
ASSERT(preIndex == snapshot.lastApplyIndex);
|
|
||||||
|
|
||||||
*pPreIndex = snapshot.lastApplyIndex;
|
|
||||||
*pPreTerm = snapshot.lastApplyTerm;
|
|
||||||
} else {
|
|
||||||
SSyncRaftEntry* pPreEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, preIndex);
|
|
||||||
if (pPreEntry != NULL) {
|
|
||||||
*pPreIndex = pPreEntry->index;
|
|
||||||
*pPreTerm = pPreEntry->term;
|
|
||||||
} else {
|
|
||||||
*pPreIndex = SYNC_INDEX_INVALID;
|
|
||||||
*pPreTerm = 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,6 +16,22 @@
|
||||||
#include "syncRaftLog.h"
|
#include "syncRaftLog.h"
|
||||||
#include "wal.h"
|
#include "wal.h"
|
||||||
|
|
||||||
|
// refactor, log[0 .. n] ==> log[m .. n]
|
||||||
|
static int32_t raftLogSetBeginIndex(struct SSyncLogStore* pLogStore, SyncIndex beginIndex);
|
||||||
|
static SyncIndex raftLogBeginIndex(struct SSyncLogStore* pLogStore);
|
||||||
|
static SyncIndex raftLogEndIndex(struct SSyncLogStore* pLogStore);
|
||||||
|
static bool raftLogIsEmpty(struct SSyncLogStore* pLogStore);
|
||||||
|
static int32_t raftLogEntryCount(struct SSyncLogStore* pLogStore);
|
||||||
|
static bool raftLogInRange(struct SSyncLogStore* pLogStore, SyncIndex index);
|
||||||
|
static SyncIndex raftLogLastIndex(struct SSyncLogStore* pLogStore);
|
||||||
|
static SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore);
|
||||||
|
static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry);
|
||||||
|
static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry);
|
||||||
|
static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex);
|
||||||
|
|
||||||
|
static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry);
|
||||||
|
|
||||||
|
//-------------------------------
|
||||||
static SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore);
|
static SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore);
|
||||||
static SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore);
|
static SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore);
|
||||||
static SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore);
|
static SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore);
|
||||||
|
@ -25,6 +41,168 @@ static int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex from
|
||||||
static int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index);
|
static int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index);
|
||||||
static SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore);
|
static SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore);
|
||||||
|
|
||||||
|
// refactor, log[0 .. n] ==> log[m .. n]
|
||||||
|
static int32_t raftLogSetBeginIndex(struct SSyncLogStore* pLogStore, SyncIndex beginIndex) {
|
||||||
|
// if beginIndex == 0, donot need call this funciton
|
||||||
|
ASSERT(beginIndex > 0);
|
||||||
|
|
||||||
|
SSyncLogStoreData* pData = pLogStore->data;
|
||||||
|
SWal* pWal = pData->pWal;
|
||||||
|
pData->beginIndex = beginIndex;
|
||||||
|
walRestoreFromSnapshot(pWal, beginIndex - 1);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static SyncIndex raftLogBeginIndex(struct SSyncLogStore* pLogStore) {
|
||||||
|
SSyncLogStoreData* pData = pLogStore->data;
|
||||||
|
SWal* pWal = pData->pWal;
|
||||||
|
return pData->beginIndex;
|
||||||
|
}
|
||||||
|
|
||||||
|
static SyncIndex raftLogEndIndex(struct SSyncLogStore* pLogStore) { return raftLogLastIndex(pLogStore); }
|
||||||
|
|
||||||
|
static bool raftLogIsEmpty(struct SSyncLogStore* pLogStore) {
|
||||||
|
SyncIndex beginIndex = raftLogBeginIndex(pLogStore);
|
||||||
|
SyncIndex endIndex = raftLogEndIndex(pLogStore);
|
||||||
|
return (endIndex >= beginIndex);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t raftLogEntryCount(struct SSyncLogStore* pLogStore) {
|
||||||
|
SyncIndex beginIndex = raftLogBeginIndex(pLogStore);
|
||||||
|
SyncIndex endIndex = raftLogEndIndex(pLogStore);
|
||||||
|
int32_t count = endIndex - beginIndex;
|
||||||
|
return count > 0 ? count : 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static bool raftLogInRange(struct SSyncLogStore* pLogStore, SyncIndex index) {
|
||||||
|
SyncIndex beginIndex = raftLogBeginIndex(pLogStore);
|
||||||
|
SyncIndex endIndex = raftLogEndIndex(pLogStore);
|
||||||
|
if (index >= beginIndex && index <= endIndex) {
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static SyncIndex raftLogLastIndex(struct SSyncLogStore* pLogStore) {
|
||||||
|
SSyncLogStoreData* pData = pLogStore->data;
|
||||||
|
SWal* pWal = pData->pWal;
|
||||||
|
SyncIndex lastVer = walGetLastVer(pWal);
|
||||||
|
return lastVer;
|
||||||
|
}
|
||||||
|
|
||||||
|
static SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) {
|
||||||
|
SyncTerm lastTerm = 0;
|
||||||
|
if (raftLogEntryCount == 0) {
|
||||||
|
lastTerm = 0;
|
||||||
|
} else {
|
||||||
|
SSyncRaftEntry* pLastEntry;
|
||||||
|
int32_t code = raftLogGetLastEntry(pLogStore, &pLastEntry);
|
||||||
|
ASSERT(code == 0);
|
||||||
|
lastTerm = pLastEntry->term;
|
||||||
|
taosMemoryFree(pLastEntry);
|
||||||
|
}
|
||||||
|
return lastTerm;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
|
||||||
|
SSyncLogStoreData* pData = pLogStore->data;
|
||||||
|
SWal* pWal = pData->pWal;
|
||||||
|
|
||||||
|
SyncIndex lastIndex = raftLogLastIndex(pLogStore);
|
||||||
|
ASSERT(pEntry->index == lastIndex + 1);
|
||||||
|
|
||||||
|
int code = 0;
|
||||||
|
SSyncLogMeta syncMeta;
|
||||||
|
syncMeta.isWeek = pEntry->isWeak;
|
||||||
|
syncMeta.seqNum = pEntry->seqNum;
|
||||||
|
syncMeta.term = pEntry->term;
|
||||||
|
code = walWriteWithSyncInfo(pWal, pEntry->index, pEntry->originalRpcType, syncMeta, pEntry->data, pEntry->dataLen);
|
||||||
|
if (code != 0) {
|
||||||
|
int32_t err = terrno;
|
||||||
|
const char* errStr = tstrerror(err);
|
||||||
|
int32_t linuxErr = errno;
|
||||||
|
const char* linuxErrMsg = strerror(errno);
|
||||||
|
sError("raftLogAppendEntry error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr,
|
||||||
|
linuxErrMsg);
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
walFsync(pWal, true);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry) {
|
||||||
|
SSyncLogStoreData* pData = pLogStore->data;
|
||||||
|
SWal* pWal = pData->pWal;
|
||||||
|
int32_t code;
|
||||||
|
|
||||||
|
*ppEntry = NULL;
|
||||||
|
if (raftLogInRange(pLogStore, index)) {
|
||||||
|
SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
|
||||||
|
ASSERT(pWalHandle != NULL);
|
||||||
|
|
||||||
|
code = walReadWithHandle(pWalHandle, index);
|
||||||
|
if (code != 0) {
|
||||||
|
int32_t err = terrno;
|
||||||
|
const char* errStr = tstrerror(err);
|
||||||
|
int32_t linuxErr = errno;
|
||||||
|
const char* linuxErrMsg = strerror(errno);
|
||||||
|
sError("raftLogGetEntry error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr,
|
||||||
|
linuxErrMsg);
|
||||||
|
walCloseReadHandle(pWalHandle);
|
||||||
|
ASSERT(0);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
*ppEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen);
|
||||||
|
ASSERT(*ppEntry != NULL);
|
||||||
|
(*ppEntry)->msgType = TDMT_VND_SYNC_CLIENT_REQUEST;
|
||||||
|
(*ppEntry)->originalRpcType = pWalHandle->pHead->head.msgType;
|
||||||
|
(*ppEntry)->seqNum = pWalHandle->pHead->head.syncMeta.seqNum;
|
||||||
|
(*ppEntry)->isWeak = pWalHandle->pHead->head.syncMeta.isWeek;
|
||||||
|
(*ppEntry)->term = pWalHandle->pHead->head.syncMeta.term;
|
||||||
|
(*ppEntry)->index = index;
|
||||||
|
ASSERT((*ppEntry)->dataLen == pWalHandle->pHead->head.bodyLen);
|
||||||
|
memcpy((*ppEntry)->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen);
|
||||||
|
|
||||||
|
// need to hold, do not new every time!!
|
||||||
|
walCloseReadHandle(pWalHandle);
|
||||||
|
|
||||||
|
} else {
|
||||||
|
// index not in range
|
||||||
|
code = -2;
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex) {
|
||||||
|
SSyncLogStoreData* pData = pLogStore->data;
|
||||||
|
SWal* pWal = pData->pWal;
|
||||||
|
int32_t code = walRollback(pWal, fromIndex);
|
||||||
|
if (code != 0) {
|
||||||
|
int32_t err = terrno;
|
||||||
|
const char* errStr = tstrerror(err);
|
||||||
|
int32_t linuxErr = errno;
|
||||||
|
const char* linuxErrMsg = strerror(errno);
|
||||||
|
sError("raftLogTruncate error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr,
|
||||||
|
linuxErrMsg);
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry) {
|
||||||
|
if (raftLogEntryCount(pLogStore) == 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
SyncIndex lastIndex = raftLogLastIndex(pLogStore);
|
||||||
|
int32_t code = raftLogGetEntry(pLogStore, lastIndex, ppLastEntry);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
//-------------------------------
|
||||||
SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
|
SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
|
||||||
SSyncLogStore* pLogStore = taosMemoryMalloc(sizeof(SSyncLogStore));
|
SSyncLogStore* pLogStore = taosMemoryMalloc(sizeof(SSyncLogStore));
|
||||||
assert(pLogStore != NULL);
|
assert(pLogStore != NULL);
|
||||||
|
@ -36,6 +214,16 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
|
||||||
pData->pSyncNode = pSyncNode;
|
pData->pSyncNode = pSyncNode;
|
||||||
pData->pWal = pSyncNode->pWal;
|
pData->pWal = pSyncNode->pWal;
|
||||||
|
|
||||||
|
SyncIndex firstVer = walGetFirstVer(pData->pWal);
|
||||||
|
SyncIndex lastVer = walGetLastVer(pData->pWal);
|
||||||
|
if (firstVer >= 0) {
|
||||||
|
pData->beginIndex = firstVer;
|
||||||
|
} else if (firstVer == -1) {
|
||||||
|
pData->beginIndex = lastVer + 1;
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
|
||||||
pLogStore->appendEntry = logStoreAppendEntry;
|
pLogStore->appendEntry = logStoreAppendEntry;
|
||||||
pLogStore->getEntry = logStoreGetEntry;
|
pLogStore->getEntry = logStoreGetEntry;
|
||||||
pLogStore->truncate = logStoreTruncate;
|
pLogStore->truncate = logStoreTruncate;
|
||||||
|
@ -43,6 +231,19 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
|
||||||
pLogStore->getLastTerm = logStoreLastTerm;
|
pLogStore->getLastTerm = logStoreLastTerm;
|
||||||
pLogStore->updateCommitIndex = logStoreUpdateCommitIndex;
|
pLogStore->updateCommitIndex = logStoreUpdateCommitIndex;
|
||||||
pLogStore->getCommitIndex = logStoreGetCommitIndex;
|
pLogStore->getCommitIndex = logStoreGetCommitIndex;
|
||||||
|
|
||||||
|
pLogStore->syncLogSetBeginIndex = raftLogSetBeginIndex;
|
||||||
|
pLogStore->syncLogBeginIndex = raftLogBeginIndex;
|
||||||
|
pLogStore->syncLogEndIndex = raftLogEndIndex;
|
||||||
|
pLogStore->syncLogIsEmpty = raftLogIsEmpty;
|
||||||
|
pLogStore->syncLogEntryCount = raftLogEntryCount;
|
||||||
|
pLogStore->syncLogInRange = raftLogInRange;
|
||||||
|
pLogStore->syncLogLastIndex = raftLogLastIndex;
|
||||||
|
pLogStore->syncLogLastTerm = raftLogLastTerm;
|
||||||
|
pLogStore->syncLogAppendEntry = raftLogAppendEntry;
|
||||||
|
pLogStore->syncLogGetEntry = raftLogGetEntry;
|
||||||
|
pLogStore->syncLogTruncate = raftLogTruncate;
|
||||||
|
|
||||||
return pLogStore;
|
return pLogStore;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -53,6 +254,7 @@ void logStoreDestory(SSyncLogStore* pLogStore) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//-------------------------------
|
||||||
int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
|
int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
|
||||||
SSyncLogStoreData* pData = pLogStore->data;
|
SSyncLogStoreData* pData = pLogStore->data;
|
||||||
SWal* pWal = pData->pWal;
|
SWal* pWal = pData->pWal;
|
||||||
|
@ -136,7 +338,7 @@ int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) {
|
||||||
linuxErrMsg);
|
linuxErrMsg);
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
return 0; // to avoid compiler error
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore) {
|
SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore) {
|
||||||
|
@ -169,7 +371,7 @@ int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
|
||||||
sError("walCommit error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, linuxErrMsg);
|
sError("walCommit error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr, linuxErrMsg);
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
return 0; // to avoid compiler error
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore) {
|
SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore) {
|
||||||
|
@ -204,9 +406,20 @@ cJSON* logStore2Json(SSyncLogStore* pLogStore) {
|
||||||
snprintf(u64buf, sizeof(u64buf), "%lu", logStoreLastTerm(pLogStore));
|
snprintf(u64buf, sizeof(u64buf), "%lu", logStoreLastTerm(pLogStore));
|
||||||
cJSON_AddStringToObject(pRoot, "LastTerm", u64buf);
|
cJSON_AddStringToObject(pRoot, "LastTerm", u64buf);
|
||||||
|
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%ld", pData->beginIndex);
|
||||||
|
cJSON_AddStringToObject(pRoot, "beginIndex", u64buf);
|
||||||
|
|
||||||
cJSON* pEntries = cJSON_CreateArray();
|
cJSON* pEntries = cJSON_CreateArray();
|
||||||
cJSON_AddItemToObject(pRoot, "pEntries", pEntries);
|
cJSON_AddItemToObject(pRoot, "pEntries", pEntries);
|
||||||
SyncIndex lastIndex = logStoreLastIndex(pLogStore);
|
SyncIndex lastIndex = logStoreLastIndex(pLogStore);
|
||||||
|
|
||||||
|
for (SyncIndex i = pData->beginIndex; i <= lastIndex; ++i) {
|
||||||
|
SSyncRaftEntry* pEntry = logStoreGetEntry(pLogStore, i);
|
||||||
|
cJSON_AddItemToArray(pEntries, syncEntry2Json(pEntry));
|
||||||
|
syncEntryDestory(pEntry);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
for (SyncIndex i = 0; i <= lastIndex; ++i) {
|
for (SyncIndex i = 0; i <= lastIndex; ++i) {
|
||||||
SyncIndex walFirstVer = walGetFirstVer(pData->pWal);
|
SyncIndex walFirstVer = walGetFirstVer(pData->pWal);
|
||||||
|
|
||||||
|
@ -216,6 +429,7 @@ cJSON* logStore2Json(SSyncLogStore* pLogStore) {
|
||||||
syncEntryDestory(pEntry);
|
syncEntryDestory(pEntry);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
}
|
}
|
||||||
|
|
||||||
cJSON* pJson = cJSON_CreateObject();
|
cJSON* pJson = cJSON_CreateObject();
|
||||||
|
@ -244,6 +458,9 @@ cJSON* logStoreSimple2Json(SSyncLogStore* pLogStore) {
|
||||||
cJSON_AddStringToObject(pRoot, "LastIndex", u64buf);
|
cJSON_AddStringToObject(pRoot, "LastIndex", u64buf);
|
||||||
snprintf(u64buf, sizeof(u64buf), "%lu", logStoreLastTerm(pLogStore));
|
snprintf(u64buf, sizeof(u64buf), "%lu", logStoreLastTerm(pLogStore));
|
||||||
cJSON_AddStringToObject(pRoot, "LastTerm", u64buf);
|
cJSON_AddStringToObject(pRoot, "LastTerm", u64buf);
|
||||||
|
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%ld", pData->beginIndex);
|
||||||
|
cJSON_AddStringToObject(pRoot, "beginIndex", u64buf);
|
||||||
}
|
}
|
||||||
|
|
||||||
cJSON* pJson = cJSON_CreateObject();
|
cJSON* pJson = cJSON_CreateObject();
|
||||||
|
|
|
@ -44,6 +44,9 @@ add_executable(syncSnapshotRspTest "")
|
||||||
add_executable(syncSnapshotSenderTest "")
|
add_executable(syncSnapshotSenderTest "")
|
||||||
add_executable(syncSnapshotReceiverTest "")
|
add_executable(syncSnapshotReceiverTest "")
|
||||||
add_executable(syncTestTool "")
|
add_executable(syncTestTool "")
|
||||||
|
add_executable(syncRaftLogTest "")
|
||||||
|
add_executable(syncRaftLogTest2 "")
|
||||||
|
add_executable(syncRaftLogTest3 "")
|
||||||
|
|
||||||
|
|
||||||
target_sources(syncTest
|
target_sources(syncTest
|
||||||
|
@ -230,6 +233,18 @@ target_sources(syncTestTool
|
||||||
PRIVATE
|
PRIVATE
|
||||||
"syncTestTool.cpp"
|
"syncTestTool.cpp"
|
||||||
)
|
)
|
||||||
|
target_sources(syncRaftLogTest
|
||||||
|
PRIVATE
|
||||||
|
"syncRaftLogTest.cpp"
|
||||||
|
)
|
||||||
|
target_sources(syncRaftLogTest2
|
||||||
|
PRIVATE
|
||||||
|
"syncRaftLogTest2.cpp"
|
||||||
|
)
|
||||||
|
target_sources(syncRaftLogTest3
|
||||||
|
PRIVATE
|
||||||
|
"syncRaftLogTest3.cpp"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
target_include_directories(syncTest
|
target_include_directories(syncTest
|
||||||
|
@ -462,6 +477,21 @@ target_include_directories(syncTestTool
|
||||||
"${TD_SOURCE_DIR}/include/libs/sync"
|
"${TD_SOURCE_DIR}/include/libs/sync"
|
||||||
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||||
)
|
)
|
||||||
|
target_include_directories(syncRaftLogTest
|
||||||
|
PUBLIC
|
||||||
|
"${TD_SOURCE_DIR}/include/libs/sync"
|
||||||
|
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||||
|
)
|
||||||
|
target_include_directories(syncRaftLogTest2
|
||||||
|
PUBLIC
|
||||||
|
"${TD_SOURCE_DIR}/include/libs/sync"
|
||||||
|
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||||
|
)
|
||||||
|
target_include_directories(syncRaftLogTest3
|
||||||
|
PUBLIC
|
||||||
|
"${TD_SOURCE_DIR}/include/libs/sync"
|
||||||
|
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
target_link_libraries(syncTest
|
target_link_libraries(syncTest
|
||||||
|
@ -648,6 +678,18 @@ target_link_libraries(syncTestTool
|
||||||
sync
|
sync
|
||||||
gtest_main
|
gtest_main
|
||||||
)
|
)
|
||||||
|
target_link_libraries(syncRaftLogTest
|
||||||
|
sync
|
||||||
|
gtest_main
|
||||||
|
)
|
||||||
|
target_link_libraries(syncRaftLogTest2
|
||||||
|
sync
|
||||||
|
gtest_main
|
||||||
|
)
|
||||||
|
target_link_libraries(syncRaftLogTest3
|
||||||
|
sync
|
||||||
|
gtest_main
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
enable_testing()
|
enable_testing()
|
||||||
|
|
|
@ -0,0 +1,172 @@
|
||||||
|
#include "syncRaftLog.h"
|
||||||
|
//#include <gtest/gtest.h>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include "syncEnv.h"
|
||||||
|
#include "syncIO.h"
|
||||||
|
#include "syncInt.h"
|
||||||
|
#include "syncRaftStore.h"
|
||||||
|
#include "syncUtil.h"
|
||||||
|
#include "wal.h"
|
||||||
|
|
||||||
|
void logTest() {
|
||||||
|
sTrace("--- sync log test: trace");
|
||||||
|
sDebug("--- sync log test: debug");
|
||||||
|
sInfo("--- sync log test: info");
|
||||||
|
sWarn("--- sync log test: warn");
|
||||||
|
sError("--- sync log test: error");
|
||||||
|
sFatal("--- sync log test: fatal");
|
||||||
|
}
|
||||||
|
|
||||||
|
const char *gWalPath = "./syncLogStoreTest_wal";
|
||||||
|
|
||||||
|
void init() { walInit(); }
|
||||||
|
|
||||||
|
void test1() {
|
||||||
|
taosRemoveDir(gWalPath);
|
||||||
|
|
||||||
|
SWalCfg walCfg;
|
||||||
|
memset(&walCfg, 0, sizeof(SWalCfg));
|
||||||
|
walCfg.vgId = 1000;
|
||||||
|
walCfg.fsyncPeriod = 1000;
|
||||||
|
walCfg.retentionPeriod = 1000;
|
||||||
|
walCfg.rollPeriod = 1000;
|
||||||
|
walCfg.retentionSize = 1000;
|
||||||
|
walCfg.segSize = 1000;
|
||||||
|
walCfg.level = TAOS_WAL_FSYNC;
|
||||||
|
SWal *pWal = walOpen(gWalPath, &walCfg);
|
||||||
|
assert(pWal != NULL);
|
||||||
|
|
||||||
|
int64_t firstVer = walGetFirstVer(pWal);
|
||||||
|
int64_t lastVer = walGetLastVer(pWal);
|
||||||
|
printf("firstVer:%ld lastVer:%ld \n", firstVer, lastVer);
|
||||||
|
|
||||||
|
walClose(pWal);
|
||||||
|
}
|
||||||
|
|
||||||
|
void test2() {
|
||||||
|
taosRemoveDir(gWalPath);
|
||||||
|
|
||||||
|
SWalCfg walCfg;
|
||||||
|
memset(&walCfg, 0, sizeof(SWalCfg));
|
||||||
|
walCfg.vgId = 1000;
|
||||||
|
walCfg.fsyncPeriod = 1000;
|
||||||
|
walCfg.retentionPeriod = 1000;
|
||||||
|
walCfg.rollPeriod = 1000;
|
||||||
|
walCfg.retentionSize = 1000;
|
||||||
|
walCfg.segSize = 1000;
|
||||||
|
walCfg.level = TAOS_WAL_FSYNC;
|
||||||
|
SWal *pWal = walOpen(gWalPath, &walCfg);
|
||||||
|
assert(pWal != NULL);
|
||||||
|
|
||||||
|
for (int i = 0; i < 5; ++i) {
|
||||||
|
int code = walWrite(pWal, i, 100, "aa", 3);
|
||||||
|
if (code != 0) {
|
||||||
|
printf("code:%d terror:%d msg:%s i:%d \n", code, terrno, tstrerror(terrno), i);
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t firstVer = walGetFirstVer(pWal);
|
||||||
|
int64_t lastVer = walGetLastVer(pWal);
|
||||||
|
printf("firstVer:%ld lastVer:%ld \n", firstVer, lastVer);
|
||||||
|
|
||||||
|
walClose(pWal);
|
||||||
|
}
|
||||||
|
|
||||||
|
void test3() {
|
||||||
|
taosRemoveDir(gWalPath);
|
||||||
|
|
||||||
|
SWalCfg walCfg;
|
||||||
|
memset(&walCfg, 0, sizeof(SWalCfg));
|
||||||
|
walCfg.vgId = 1000;
|
||||||
|
walCfg.fsyncPeriod = 1000;
|
||||||
|
walCfg.retentionPeriod = 1000;
|
||||||
|
walCfg.rollPeriod = 1000;
|
||||||
|
walCfg.retentionSize = 1000;
|
||||||
|
walCfg.segSize = 1000;
|
||||||
|
walCfg.level = TAOS_WAL_FSYNC;
|
||||||
|
SWal *pWal = walOpen(gWalPath, &walCfg);
|
||||||
|
assert(pWal != NULL);
|
||||||
|
|
||||||
|
walRestoreFromSnapshot(pWal, 5);
|
||||||
|
|
||||||
|
int64_t firstVer = walGetFirstVer(pWal);
|
||||||
|
int64_t lastVer = walGetLastVer(pWal);
|
||||||
|
printf("firstVer:%ld lastVer:%ld \n", firstVer, lastVer);
|
||||||
|
|
||||||
|
walClose(pWal);
|
||||||
|
}
|
||||||
|
|
||||||
|
void test4() {
|
||||||
|
taosRemoveDir(gWalPath);
|
||||||
|
|
||||||
|
SWalCfg walCfg;
|
||||||
|
memset(&walCfg, 0, sizeof(SWalCfg));
|
||||||
|
walCfg.vgId = 1000;
|
||||||
|
walCfg.fsyncPeriod = 1000;
|
||||||
|
walCfg.retentionPeriod = 1000;
|
||||||
|
walCfg.rollPeriod = 1000;
|
||||||
|
walCfg.retentionSize = 1000;
|
||||||
|
walCfg.segSize = 1000;
|
||||||
|
walCfg.level = TAOS_WAL_FSYNC;
|
||||||
|
SWal *pWal = walOpen(gWalPath, &walCfg);
|
||||||
|
assert(pWal != NULL);
|
||||||
|
|
||||||
|
walRestoreFromSnapshot(pWal, 5);
|
||||||
|
|
||||||
|
for (int i = 6; i < 10; ++i) {
|
||||||
|
int code = walWrite(pWal, i, 100, "aa", 3);
|
||||||
|
if (code != 0) {
|
||||||
|
printf("code:%d terror:%d msg:%s i:%d \n", code, terrno, tstrerror(terrno), i);
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t firstVer = walGetFirstVer(pWal);
|
||||||
|
int64_t lastVer = walGetLastVer(pWal);
|
||||||
|
printf("firstVer:%ld lastVer:%ld \n", firstVer, lastVer);
|
||||||
|
|
||||||
|
walClose(pWal);
|
||||||
|
}
|
||||||
|
|
||||||
|
void test5() {
|
||||||
|
taosRemoveDir(gWalPath);
|
||||||
|
|
||||||
|
SWalCfg walCfg;
|
||||||
|
memset(&walCfg, 0, sizeof(SWalCfg));
|
||||||
|
walCfg.vgId = 1000;
|
||||||
|
walCfg.fsyncPeriod = 1000;
|
||||||
|
walCfg.retentionPeriod = 1000;
|
||||||
|
walCfg.rollPeriod = 1000;
|
||||||
|
walCfg.retentionSize = 1000;
|
||||||
|
walCfg.segSize = 1000;
|
||||||
|
walCfg.level = TAOS_WAL_FSYNC;
|
||||||
|
SWal *pWal = walOpen(gWalPath, &walCfg);
|
||||||
|
assert(pWal != NULL);
|
||||||
|
|
||||||
|
walRestoreFromSnapshot(pWal, 5);
|
||||||
|
walRestoreFromSnapshot(pWal, 7);
|
||||||
|
|
||||||
|
int64_t firstVer = walGetFirstVer(pWal);
|
||||||
|
int64_t lastVer = walGetLastVer(pWal);
|
||||||
|
printf("firstVer:%ld lastVer:%ld \n", firstVer, lastVer);
|
||||||
|
|
||||||
|
walClose(pWal);
|
||||||
|
}
|
||||||
|
|
||||||
|
void cleanup() { walCleanUp(); }
|
||||||
|
|
||||||
|
int main(int argc, char **argv) {
|
||||||
|
tsAsyncLog = 0;
|
||||||
|
sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE;
|
||||||
|
init();
|
||||||
|
|
||||||
|
test1();
|
||||||
|
test2();
|
||||||
|
test3();
|
||||||
|
test4();
|
||||||
|
test5();
|
||||||
|
|
||||||
|
cleanup();
|
||||||
|
return 0;
|
||||||
|
}
|
|
@ -0,0 +1,98 @@
|
||||||
|
#include <gtest/gtest.h>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include "syncEnv.h"
|
||||||
|
#include "syncIO.h"
|
||||||
|
#include "syncInt.h"
|
||||||
|
#include "syncRaftLog.h"
|
||||||
|
#include "syncRaftStore.h"
|
||||||
|
#include "syncUtil.h"
|
||||||
|
#include "wal.h"
|
||||||
|
|
||||||
|
void logTest() {
|
||||||
|
sTrace("--- sync log test: trace");
|
||||||
|
sDebug("--- sync log test: debug");
|
||||||
|
sInfo("--- sync log test: info");
|
||||||
|
sWarn("--- sync log test: warn");
|
||||||
|
sError("--- sync log test: error");
|
||||||
|
sFatal("--- sync log test: fatal");
|
||||||
|
}
|
||||||
|
|
||||||
|
SSyncNode* pSyncNode;
|
||||||
|
SWal* pWal;
|
||||||
|
SSyncLogStore* pLogStore;
|
||||||
|
const char* pWalPath = "./syncLogStoreTest_wal";
|
||||||
|
|
||||||
|
void init() {
|
||||||
|
walInit();
|
||||||
|
taosRemoveDir(pWalPath);
|
||||||
|
|
||||||
|
SWalCfg walCfg;
|
||||||
|
memset(&walCfg, 0, sizeof(SWalCfg));
|
||||||
|
walCfg.vgId = 1000;
|
||||||
|
walCfg.fsyncPeriod = 1000;
|
||||||
|
walCfg.retentionPeriod = 1000;
|
||||||
|
walCfg.rollPeriod = 1000;
|
||||||
|
walCfg.retentionSize = 1000;
|
||||||
|
walCfg.segSize = 1000;
|
||||||
|
walCfg.level = TAOS_WAL_FSYNC;
|
||||||
|
pWal = walOpen(pWalPath, &walCfg);
|
||||||
|
assert(pWal != NULL);
|
||||||
|
|
||||||
|
pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode));
|
||||||
|
memset(pSyncNode, 0, sizeof(SSyncNode));
|
||||||
|
pSyncNode->pWal = pWal;
|
||||||
|
}
|
||||||
|
|
||||||
|
void cleanup() {
|
||||||
|
walClose(pWal);
|
||||||
|
walCleanUp();
|
||||||
|
taosMemoryFree(pSyncNode);
|
||||||
|
}
|
||||||
|
|
||||||
|
void logStoreTest() {
|
||||||
|
pLogStore = logStoreCreate(pSyncNode);
|
||||||
|
assert(pLogStore);
|
||||||
|
assert(pLogStore->getLastIndex(pLogStore) == SYNC_INDEX_INVALID);
|
||||||
|
|
||||||
|
logStoreLog2((char*)"logStoreTest", pLogStore);
|
||||||
|
|
||||||
|
for (int i = 0; i < 5; ++i) {
|
||||||
|
int32_t dataLen = 10;
|
||||||
|
SSyncRaftEntry* pEntry = syncEntryBuild(dataLen);
|
||||||
|
assert(pEntry != NULL);
|
||||||
|
pEntry->msgType = 1;
|
||||||
|
pEntry->originalRpcType = 2;
|
||||||
|
pEntry->seqNum = 3;
|
||||||
|
pEntry->isWeak = true;
|
||||||
|
pEntry->term = 100 + i;
|
||||||
|
pEntry->index = pLogStore->getLastIndex(pLogStore) + 1;
|
||||||
|
snprintf(pEntry->data, dataLen, "value%d", i);
|
||||||
|
|
||||||
|
syncEntryLog2((char*)"==write entry== :", pEntry);
|
||||||
|
pLogStore->appendEntry(pLogStore, pEntry);
|
||||||
|
syncEntryDestory(pEntry);
|
||||||
|
|
||||||
|
if (i == 0) {
|
||||||
|
assert(pLogStore->getLastIndex(pLogStore) == SYNC_INDEX_BEGIN);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
logStoreLog2((char*)"after appendEntry", pLogStore);
|
||||||
|
|
||||||
|
pLogStore->truncate(pLogStore, 3);
|
||||||
|
logStoreLog2((char*)"after truncate 3", pLogStore);
|
||||||
|
|
||||||
|
logStoreDestory(pLogStore);
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int argc, char** argv) {
|
||||||
|
tsAsyncLog = 0;
|
||||||
|
sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE;
|
||||||
|
|
||||||
|
init();
|
||||||
|
logStoreTest();
|
||||||
|
|
||||||
|
taosMsleep(2000);
|
||||||
|
cleanup();
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
|
@ -0,0 +1,98 @@
|
||||||
|
#include <gtest/gtest.h>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include "syncEnv.h"
|
||||||
|
#include "syncIO.h"
|
||||||
|
#include "syncInt.h"
|
||||||
|
#include "syncRaftLog.h"
|
||||||
|
#include "syncRaftStore.h"
|
||||||
|
#include "syncUtil.h"
|
||||||
|
#include "wal.h"
|
||||||
|
|
||||||
|
void logTest() {
|
||||||
|
sTrace("--- sync log test: trace");
|
||||||
|
sDebug("--- sync log test: debug");
|
||||||
|
sInfo("--- sync log test: info");
|
||||||
|
sWarn("--- sync log test: warn");
|
||||||
|
sError("--- sync log test: error");
|
||||||
|
sFatal("--- sync log test: fatal");
|
||||||
|
}
|
||||||
|
|
||||||
|
SSyncNode* pSyncNode;
|
||||||
|
SWal* pWal;
|
||||||
|
SSyncLogStore* pLogStore;
|
||||||
|
const char* pWalPath = "./syncLogStoreTest_wal";
|
||||||
|
|
||||||
|
void init() {
|
||||||
|
walInit();
|
||||||
|
taosRemoveDir(pWalPath);
|
||||||
|
|
||||||
|
SWalCfg walCfg;
|
||||||
|
memset(&walCfg, 0, sizeof(SWalCfg));
|
||||||
|
walCfg.vgId = 1000;
|
||||||
|
walCfg.fsyncPeriod = 1000;
|
||||||
|
walCfg.retentionPeriod = 1000;
|
||||||
|
walCfg.rollPeriod = 1000;
|
||||||
|
walCfg.retentionSize = 1000;
|
||||||
|
walCfg.segSize = 1000;
|
||||||
|
walCfg.level = TAOS_WAL_FSYNC;
|
||||||
|
pWal = walOpen(pWalPath, &walCfg);
|
||||||
|
assert(pWal != NULL);
|
||||||
|
|
||||||
|
pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode));
|
||||||
|
memset(pSyncNode, 0, sizeof(SSyncNode));
|
||||||
|
pSyncNode->pWal = pWal;
|
||||||
|
}
|
||||||
|
|
||||||
|
void cleanup() {
|
||||||
|
walClose(pWal);
|
||||||
|
walCleanUp();
|
||||||
|
taosMemoryFree(pSyncNode);
|
||||||
|
}
|
||||||
|
|
||||||
|
void logStoreTest() {
|
||||||
|
pLogStore = logStoreCreate(pSyncNode);
|
||||||
|
assert(pLogStore);
|
||||||
|
assert(pLogStore->getLastIndex(pLogStore) == SYNC_INDEX_INVALID);
|
||||||
|
|
||||||
|
logStoreLog2((char*)"logStoreTest", pLogStore);
|
||||||
|
|
||||||
|
for (int i = 0; i < 5; ++i) {
|
||||||
|
int32_t dataLen = 10;
|
||||||
|
SSyncRaftEntry* pEntry = syncEntryBuild(dataLen);
|
||||||
|
assert(pEntry != NULL);
|
||||||
|
pEntry->msgType = 1;
|
||||||
|
pEntry->originalRpcType = 2;
|
||||||
|
pEntry->seqNum = 3;
|
||||||
|
pEntry->isWeak = true;
|
||||||
|
pEntry->term = 100 + i;
|
||||||
|
pEntry->index = pLogStore->getLastIndex(pLogStore) + 1;
|
||||||
|
snprintf(pEntry->data, dataLen, "value%d", i);
|
||||||
|
|
||||||
|
syncEntryLog2((char*)"==write entry== :", pEntry);
|
||||||
|
pLogStore->appendEntry(pLogStore, pEntry);
|
||||||
|
syncEntryDestory(pEntry);
|
||||||
|
|
||||||
|
if (i == 0) {
|
||||||
|
assert(pLogStore->getLastIndex(pLogStore) == SYNC_INDEX_BEGIN);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
logStoreLog2((char*)"after appendEntry", pLogStore);
|
||||||
|
|
||||||
|
pLogStore->truncate(pLogStore, 3);
|
||||||
|
logStoreLog2((char*)"after truncate 3", pLogStore);
|
||||||
|
|
||||||
|
logStoreDestory(pLogStore);
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int argc, char** argv) {
|
||||||
|
tsAsyncLog = 0;
|
||||||
|
sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE;
|
||||||
|
|
||||||
|
init();
|
||||||
|
logStoreTest();
|
||||||
|
|
||||||
|
taosMsleep(2000);
|
||||||
|
cleanup();
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
|
@ -184,7 +184,8 @@ SWal* createWal(char* path, int32_t vgId) {
|
||||||
return pWal;
|
return pWal;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* pWal, char* path, bool isStandBy, bool enableSnapshot) {
|
int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* pWal, char* path, bool isStandBy,
|
||||||
|
bool enableSnapshot) {
|
||||||
SSyncInfo syncInfo;
|
SSyncInfo syncInfo;
|
||||||
syncInfo.vgId = vgId;
|
syncInfo.vgId = vgId;
|
||||||
syncInfo.msgcb = &gSyncIO->msgcb;
|
syncInfo.msgcb = &gSyncIO->msgcb;
|
||||||
|
|
Loading…
Reference in New Issue