refactor(sync): add entries cache
This commit is contained in:
parent
486f268353
commit
49ff32cda5
|
@ -56,6 +56,31 @@ void syncEntryPrint2(char* s, const SSyncRaftEntry* pObj);
|
||||||
void syncEntryLog(const SSyncRaftEntry* pObj);
|
void syncEntryLog(const SSyncRaftEntry* pObj);
|
||||||
void syncEntryLog2(char* s, const SSyncRaftEntry* pObj);
|
void syncEntryLog2(char* s, const SSyncRaftEntry* pObj);
|
||||||
|
|
||||||
|
//-----------------------------------
|
||||||
|
typedef struct SRaftEntryCache {
|
||||||
|
SHashObj* pEntryHash;
|
||||||
|
int32_t maxCount;
|
||||||
|
int32_t currentCount;
|
||||||
|
TdThreadMutex mutex;
|
||||||
|
SSyncNode* pSyncNode;
|
||||||
|
} SRaftEntryCache;
|
||||||
|
|
||||||
|
SRaftEntryCache* raftCacheCreate(SSyncNode* pSyncNode, int32_t maxCount);
|
||||||
|
void raftCacheDestroy(SRaftEntryCache* pCache);
|
||||||
|
int32_t raftCachePutEntry(struct SRaftEntryCache* pCache, SSyncRaftEntry* pEntry);
|
||||||
|
int32_t raftCacheGetEntry(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry);
|
||||||
|
int32_t raftCacheGetEntryP(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry);
|
||||||
|
int32_t raftCacheDelEntry(struct SRaftEntryCache* pCache, SyncIndex index);
|
||||||
|
int32_t raftCacheGetAndDel(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry);
|
||||||
|
int32_t raftCacheClear(struct SRaftEntryCache* pCache);
|
||||||
|
|
||||||
|
cJSON* raftCache2Json(SRaftEntryCache* pObj);
|
||||||
|
char* raftCache2Str(SRaftEntryCache* pObj);
|
||||||
|
void raftCachePrint(SRaftEntryCache* pObj);
|
||||||
|
void raftCachePrint2(char* s, SRaftEntryCache* pObj);
|
||||||
|
void raftCacheLog(SRaftEntryCache* pObj);
|
||||||
|
void raftCacheLog2(char* s, SRaftEntryCache* pObj);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -1355,11 +1355,16 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) {
|
||||||
int32_t userStrLen = strlen(str);
|
int32_t userStrLen = strlen(str);
|
||||||
|
|
||||||
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
|
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
|
||||||
if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
|
if (pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
|
||||||
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot);
|
||||||
}
|
}
|
||||||
SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
|
|
||||||
SyncIndex logBeginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
|
SyncIndex logLastIndex = SYNC_INDEX_INVALID;
|
||||||
|
SyncIndex logBeginIndex = SYNC_INDEX_INVALID;
|
||||||
|
if (pSyncNode->pLogStore != NULL) {
|
||||||
|
logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
|
||||||
|
logBeginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore);
|
||||||
|
}
|
||||||
|
|
||||||
char* pCfgStr = syncCfg2SimpleStr(&(pSyncNode->pRaftCfg->cfg));
|
char* pCfgStr = syncCfg2SimpleStr(&(pSyncNode->pRaftCfg->cfg));
|
||||||
char* printStr = "";
|
char* printStr = "";
|
||||||
|
|
|
@ -180,3 +180,243 @@ void syncEntryLog2(char* s, const SSyncRaftEntry* pObj) {
|
||||||
sTrace("syncEntryLog2 | len:%zu | %s | %s", strlen(serialized), s, serialized);
|
sTrace("syncEntryLog2 | len:%zu | %s | %s", strlen(serialized), s, serialized);
|
||||||
taosMemoryFree(serialized);
|
taosMemoryFree(serialized);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//-----------------------------------
|
||||||
|
SRaftEntryCache* raftCacheCreate(SSyncNode* pSyncNode, int32_t maxCount) {
|
||||||
|
SRaftEntryCache* pCache = taosMemoryMalloc(sizeof(SRaftEntryCache));
|
||||||
|
if (pCache == NULL) {
|
||||||
|
sError("vgId:%d raft cache create error", pSyncNode->vgId);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pCache->pEntryHash =
|
||||||
|
taosHashInit(sizeof(SyncIndex), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||||
|
if (pCache->pEntryHash == NULL) {
|
||||||
|
sError("vgId:%d raft cache create hash error", pSyncNode->vgId);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosThreadMutexInit(&(pCache->mutex), NULL);
|
||||||
|
pCache->maxCount = maxCount;
|
||||||
|
pCache->currentCount = 0;
|
||||||
|
pCache->pSyncNode = pSyncNode;
|
||||||
|
|
||||||
|
return pCache;
|
||||||
|
}
|
||||||
|
|
||||||
|
void raftCacheDestroy(SRaftEntryCache* pCache) {
|
||||||
|
if (pCache != NULL) {
|
||||||
|
taosThreadMutexLock(&(pCache->mutex));
|
||||||
|
taosHashCleanup(pCache->pEntryHash);
|
||||||
|
taosThreadMutexUnlock(&(pCache->mutex));
|
||||||
|
taosThreadMutexDestroy(&(pCache->mutex));
|
||||||
|
taosMemoryFree(pCache);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// success, return 1
|
||||||
|
// max count, return 0
|
||||||
|
// error, return -1
|
||||||
|
int32_t raftCachePutEntry(struct SRaftEntryCache* pCache, SSyncRaftEntry* pEntry) {
|
||||||
|
taosThreadMutexLock(&(pCache->mutex));
|
||||||
|
|
||||||
|
if (pCache->currentCount >= pCache->maxCount) {
|
||||||
|
taosThreadMutexUnlock(&(pCache->mutex));
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosHashPut(pCache->pEntryHash, &(pEntry->index), sizeof(pEntry->index), pEntry, pEntry->bytes);
|
||||||
|
++(pCache->currentCount);
|
||||||
|
|
||||||
|
do {
|
||||||
|
char eventLog[128];
|
||||||
|
snprintf(eventLog, sizeof(eventLog), "raft cache add, type:%s,%d, type2:%s,%d, index:%ld, bytes:%d",
|
||||||
|
TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType,
|
||||||
|
pEntry->index, pEntry->bytes);
|
||||||
|
syncNodeEventLog(pCache->pSyncNode, eventLog);
|
||||||
|
} while (0);
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&(pCache->mutex));
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// success, return 0
|
||||||
|
// error, return -1
|
||||||
|
// not exist, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST
|
||||||
|
int32_t raftCacheGetEntry(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) {
|
||||||
|
if (ppEntry == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
*ppEntry = NULL;
|
||||||
|
|
||||||
|
taosThreadMutexLock(&(pCache->mutex));
|
||||||
|
void* pTmp = taosHashGet(pCache->pEntryHash, &index, sizeof(index));
|
||||||
|
if (pTmp != NULL) {
|
||||||
|
SSyncRaftEntry* pEntry = pTmp;
|
||||||
|
*ppEntry = taosMemoryMalloc(pEntry->bytes);
|
||||||
|
memcpy(*ppEntry, pTmp, pEntry->bytes);
|
||||||
|
|
||||||
|
do {
|
||||||
|
char eventLog[128];
|
||||||
|
snprintf(eventLog, sizeof(eventLog), "raft cache get, type:%s,%d, type2:%s,%d, index:%ld",
|
||||||
|
TMSG_INFO((*ppEntry)->msgType), (*ppEntry)->msgType, TMSG_INFO((*ppEntry)->originalRpcType),
|
||||||
|
(*ppEntry)->originalRpcType, (*ppEntry)->index);
|
||||||
|
syncNodeEventLog(pCache->pSyncNode, eventLog);
|
||||||
|
} while (0);
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&(pCache->mutex));
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&(pCache->mutex));
|
||||||
|
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// success, return 0
|
||||||
|
// error, return -1
|
||||||
|
// not exist, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST
|
||||||
|
int32_t raftCacheGetEntryP(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) {
|
||||||
|
if (ppEntry == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
*ppEntry = NULL;
|
||||||
|
|
||||||
|
taosThreadMutexLock(&(pCache->mutex));
|
||||||
|
void* pTmp = taosHashGet(pCache->pEntryHash, &index, sizeof(index));
|
||||||
|
if (pTmp != NULL) {
|
||||||
|
SSyncRaftEntry* pEntry = pTmp;
|
||||||
|
*ppEntry = pEntry;
|
||||||
|
|
||||||
|
do {
|
||||||
|
char eventLog[128];
|
||||||
|
snprintf(eventLog, sizeof(eventLog), "raft cache get, type:%s,%d, type2:%s,%d, index:%ld",
|
||||||
|
TMSG_INFO((*ppEntry)->msgType), (*ppEntry)->msgType, TMSG_INFO((*ppEntry)->originalRpcType),
|
||||||
|
(*ppEntry)->originalRpcType, (*ppEntry)->index);
|
||||||
|
syncNodeEventLog(pCache->pSyncNode, eventLog);
|
||||||
|
} while (0);
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&(pCache->mutex));
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&(pCache->mutex));
|
||||||
|
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t raftCacheDelEntry(struct SRaftEntryCache* pCache, SyncIndex index) {
|
||||||
|
taosThreadMutexLock(&(pCache->mutex));
|
||||||
|
taosHashRemove(pCache->pEntryHash, &index, sizeof(index));
|
||||||
|
taosThreadMutexUnlock(&(pCache->mutex));
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t raftCacheGetAndDel(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) {
|
||||||
|
if (ppEntry == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
*ppEntry = NULL;
|
||||||
|
|
||||||
|
taosThreadMutexLock(&(pCache->mutex));
|
||||||
|
void* pTmp = taosHashGet(pCache->pEntryHash, &index, sizeof(index));
|
||||||
|
if (pTmp != NULL) {
|
||||||
|
SSyncRaftEntry* pEntry = pTmp;
|
||||||
|
*ppEntry = taosMemoryMalloc(pEntry->bytes);
|
||||||
|
memcpy(*ppEntry, pTmp, pEntry->bytes);
|
||||||
|
|
||||||
|
do {
|
||||||
|
char eventLog[128];
|
||||||
|
snprintf(eventLog, sizeof(eventLog), "raft cache get-and-del, type:%s,%d, type2:%s,%d, index:%ld",
|
||||||
|
TMSG_INFO((*ppEntry)->msgType), (*ppEntry)->msgType, TMSG_INFO((*ppEntry)->originalRpcType),
|
||||||
|
(*ppEntry)->originalRpcType, (*ppEntry)->index);
|
||||||
|
syncNodeEventLog(pCache->pSyncNode, eventLog);
|
||||||
|
} while (0);
|
||||||
|
|
||||||
|
taosHashRemove(pCache->pEntryHash, &index, sizeof(index));
|
||||||
|
taosThreadMutexUnlock(&(pCache->mutex));
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&(pCache->mutex));
|
||||||
|
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t raftCacheClear(struct SRaftEntryCache* pCache) {
|
||||||
|
taosThreadMutexLock(&(pCache->mutex));
|
||||||
|
taosHashClear(pCache->pEntryHash);
|
||||||
|
taosThreadMutexUnlock(&(pCache->mutex));
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
//-----------------------------------
|
||||||
|
cJSON* raftCache2Json(SRaftEntryCache* pCache) {
|
||||||
|
char u64buf[128] = {0};
|
||||||
|
cJSON* pRoot = cJSON_CreateObject();
|
||||||
|
|
||||||
|
if (pCache != NULL) {
|
||||||
|
taosThreadMutexLock(&(pCache->mutex));
|
||||||
|
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%p", pCache->pSyncNode);
|
||||||
|
cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "currentCount", pCache->currentCount);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "maxCount", pCache->maxCount);
|
||||||
|
cJSON* pEntries = cJSON_CreateArray();
|
||||||
|
cJSON_AddItemToObject(pRoot, "entries", pEntries);
|
||||||
|
|
||||||
|
SSyncRaftEntry* pIter = (SSyncRaftEntry*)taosHashIterate(pCache->pEntryHash, NULL);
|
||||||
|
if (pIter != NULL) {
|
||||||
|
SSyncRaftEntry* pEntry = (SSyncRaftEntry*)pIter;
|
||||||
|
cJSON_AddItemToArray(pEntries, syncEntry2Json(pEntry));
|
||||||
|
}
|
||||||
|
while (pIter) {
|
||||||
|
pIter = taosHashIterate(pCache->pEntryHash, pIter);
|
||||||
|
if (pIter != NULL) {
|
||||||
|
SSyncRaftEntry* pEntry = (SSyncRaftEntry*)pIter;
|
||||||
|
cJSON_AddItemToArray(pEntries, syncEntry2Json(pEntry));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&(pCache->mutex));
|
||||||
|
}
|
||||||
|
|
||||||
|
cJSON* pJson = cJSON_CreateObject();
|
||||||
|
cJSON_AddItemToObject(pJson, "SRaftEntryCache", pRoot);
|
||||||
|
return pJson;
|
||||||
|
}
|
||||||
|
|
||||||
|
char* raftCache2Str(SRaftEntryCache* pCache) {
|
||||||
|
cJSON* pJson = raftCache2Json(pCache);
|
||||||
|
char* serialized = cJSON_Print(pJson);
|
||||||
|
cJSON_Delete(pJson);
|
||||||
|
return serialized;
|
||||||
|
}
|
||||||
|
|
||||||
|
void raftCachePrint(SRaftEntryCache* pCache) {
|
||||||
|
char* serialized = raftCache2Str(pCache);
|
||||||
|
printf("raftCachePrint | len:%lu | %s \n", strlen(serialized), serialized);
|
||||||
|
fflush(NULL);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
void raftCachePrint2(char* s, SRaftEntryCache* pCache) {
|
||||||
|
char* serialized = raftCache2Str(pCache);
|
||||||
|
printf("raftCachePrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
|
||||||
|
fflush(NULL);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
void raftCacheLog(SRaftEntryCache* pCache) {
|
||||||
|
char* serialized = raftCache2Str(pCache);
|
||||||
|
sTrace("raftCacheLog | len:%lu | %s", strlen(serialized), serialized);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
void raftCacheLog2(char* s, SRaftEntryCache* pCache) {
|
||||||
|
if (gRaftDetailLog) {
|
||||||
|
char* serialized = raftCache2Str(pCache);
|
||||||
|
sTraceLong("raftCacheLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
}
|
|
@ -32,11 +32,13 @@ SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncRespMgrDestroy(SSyncRespMgr *pObj) {
|
void syncRespMgrDestroy(SSyncRespMgr *pObj) {
|
||||||
taosThreadMutexLock(&(pObj->mutex));
|
if (pObj != NULL) {
|
||||||
taosHashCleanup(pObj->pRespHash);
|
taosThreadMutexLock(&(pObj->mutex));
|
||||||
taosThreadMutexUnlock(&(pObj->mutex));
|
taosHashCleanup(pObj->pRespHash);
|
||||||
taosThreadMutexDestroy(&(pObj->mutex));
|
taosThreadMutexUnlock(&(pObj->mutex));
|
||||||
taosMemoryFree(pObj);
|
taosThreadMutexDestroy(&(pObj->mutex));
|
||||||
|
taosMemoryFree(pObj);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub) {
|
int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub) {
|
||||||
|
|
|
@ -17,6 +17,7 @@ add_executable(syncVotesRespondTest "")
|
||||||
add_executable(syncIndexMgrTest "")
|
add_executable(syncIndexMgrTest "")
|
||||||
add_executable(syncLogStoreTest "")
|
add_executable(syncLogStoreTest "")
|
||||||
add_executable(syncEntryTest "")
|
add_executable(syncEntryTest "")
|
||||||
|
add_executable(syncEntryCacheTest "")
|
||||||
add_executable(syncRequestVoteTest "")
|
add_executable(syncRequestVoteTest "")
|
||||||
add_executable(syncRequestVoteReplyTest "")
|
add_executable(syncRequestVoteReplyTest "")
|
||||||
add_executable(syncAppendEntriesTest "")
|
add_executable(syncAppendEntriesTest "")
|
||||||
|
@ -129,6 +130,10 @@ target_sources(syncEntryTest
|
||||||
PRIVATE
|
PRIVATE
|
||||||
"syncEntryTest.cpp"
|
"syncEntryTest.cpp"
|
||||||
)
|
)
|
||||||
|
target_sources(syncEntryCacheTest
|
||||||
|
PRIVATE
|
||||||
|
"syncEntryCacheTest.cpp"
|
||||||
|
)
|
||||||
target_sources(syncRequestVoteTest
|
target_sources(syncRequestVoteTest
|
||||||
PRIVATE
|
PRIVATE
|
||||||
"syncRequestVoteTest.cpp"
|
"syncRequestVoteTest.cpp"
|
||||||
|
@ -362,6 +367,11 @@ target_include_directories(syncEntryTest
|
||||||
"${TD_SOURCE_DIR}/include/libs/sync"
|
"${TD_SOURCE_DIR}/include/libs/sync"
|
||||||
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||||
)
|
)
|
||||||
|
target_include_directories(syncEntryCacheTest
|
||||||
|
PUBLIC
|
||||||
|
"${TD_SOURCE_DIR}/include/libs/sync"
|
||||||
|
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||||
|
)
|
||||||
target_include_directories(syncRequestVoteTest
|
target_include_directories(syncRequestVoteTest
|
||||||
PUBLIC
|
PUBLIC
|
||||||
"${TD_SOURCE_DIR}/include/libs/sync"
|
"${TD_SOURCE_DIR}/include/libs/sync"
|
||||||
|
@ -610,6 +620,10 @@ target_link_libraries(syncEntryTest
|
||||||
sync
|
sync
|
||||||
gtest_main
|
gtest_main
|
||||||
)
|
)
|
||||||
|
target_link_libraries(syncEntryCacheTest
|
||||||
|
sync
|
||||||
|
gtest_main
|
||||||
|
)
|
||||||
target_link_libraries(syncRequestVoteTest
|
target_link_libraries(syncRequestVoteTest
|
||||||
sync
|
sync
|
||||||
gtest_main
|
gtest_main
|
||||||
|
|
|
@ -0,0 +1,162 @@
|
||||||
|
#include <stdio.h>
|
||||||
|
#include "syncEnv.h"
|
||||||
|
#include "syncIO.h"
|
||||||
|
#include "syncInt.h"
|
||||||
|
#include "syncRaftLog.h"
|
||||||
|
#include "syncRaftStore.h"
|
||||||
|
#include "syncUtil.h"
|
||||||
|
|
||||||
|
void logTest() {
|
||||||
|
sTrace("--- sync log test: trace");
|
||||||
|
sDebug("--- sync log test: debug");
|
||||||
|
sInfo("--- sync log test: info");
|
||||||
|
sWarn("--- sync log test: warn");
|
||||||
|
sError("--- sync log test: error");
|
||||||
|
sFatal("--- sync log test: fatal");
|
||||||
|
}
|
||||||
|
|
||||||
|
SSyncRaftEntry* createEntry(int i) {
|
||||||
|
int32_t dataLen = 20;
|
||||||
|
SSyncRaftEntry* pEntry = syncEntryBuild(dataLen);
|
||||||
|
assert(pEntry != NULL);
|
||||||
|
pEntry->msgType = 88;
|
||||||
|
pEntry->originalRpcType = 99;
|
||||||
|
pEntry->seqNum = 3;
|
||||||
|
pEntry->isWeak = true;
|
||||||
|
pEntry->term = 100 + i;
|
||||||
|
pEntry->index = i;
|
||||||
|
snprintf(pEntry->data, dataLen, "value%d", i);
|
||||||
|
|
||||||
|
return pEntry;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSyncNode* createFakeNode() {
|
||||||
|
SSyncNode* pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode));
|
||||||
|
ASSERT(pSyncNode != NULL);
|
||||||
|
memset(pSyncNode, 0, sizeof(SSyncNode));
|
||||||
|
|
||||||
|
return pSyncNode;
|
||||||
|
}
|
||||||
|
|
||||||
|
SRaftEntryCache* createCache(int maxCount) {
|
||||||
|
SSyncNode* pSyncNode = createFakeNode();
|
||||||
|
ASSERT(pSyncNode != NULL);
|
||||||
|
|
||||||
|
SRaftEntryCache* pCache = raftCacheCreate(pSyncNode, maxCount);
|
||||||
|
ASSERT(pCache != NULL);
|
||||||
|
|
||||||
|
return pCache;
|
||||||
|
}
|
||||||
|
|
||||||
|
void test1() {
|
||||||
|
int32_t code = 0;
|
||||||
|
SRaftEntryCache* pCache = createCache(5);
|
||||||
|
for (int i = 0; i < 5; ++i) {
|
||||||
|
SSyncRaftEntry* pEntry = createEntry(i);
|
||||||
|
code = raftCachePutEntry(pCache, pEntry);
|
||||||
|
ASSERT(code == 1);
|
||||||
|
syncEntryDestory(pEntry);
|
||||||
|
}
|
||||||
|
raftCacheLog2((char*)"==test1 write 5 entries==", pCache);
|
||||||
|
|
||||||
|
SyncIndex index;
|
||||||
|
index = 1;
|
||||||
|
code = raftCacheDelEntry(pCache, index);
|
||||||
|
ASSERT(code == 0);
|
||||||
|
index = 3;
|
||||||
|
code = raftCacheDelEntry(pCache, index);
|
||||||
|
ASSERT(code == 0);
|
||||||
|
raftCacheLog2((char*)"==test1 delete 1,3==", pCache);
|
||||||
|
|
||||||
|
code = raftCacheClear(pCache);
|
||||||
|
ASSERT(code == 0);
|
||||||
|
raftCacheLog2((char*)"==clear all==", pCache);
|
||||||
|
}
|
||||||
|
|
||||||
|
void test2() {
|
||||||
|
int32_t code = 0;
|
||||||
|
SRaftEntryCache* pCache = createCache(5);
|
||||||
|
for (int i = 0; i < 5; ++i) {
|
||||||
|
SSyncRaftEntry* pEntry = createEntry(i);
|
||||||
|
code = raftCachePutEntry(pCache, pEntry);
|
||||||
|
ASSERT(code == 1);
|
||||||
|
syncEntryDestory(pEntry);
|
||||||
|
}
|
||||||
|
raftCacheLog2((char*)"==test2 write 5 entries==", pCache);
|
||||||
|
|
||||||
|
SyncIndex index;
|
||||||
|
index = 1;
|
||||||
|
SSyncRaftEntry* pEntry;
|
||||||
|
code = raftCacheGetEntry(pCache, index, &pEntry);
|
||||||
|
ASSERT(code == 0);
|
||||||
|
syncEntryDestory(pEntry);
|
||||||
|
syncEntryLog2((char*)"==test2 get entry 1==", pEntry);
|
||||||
|
|
||||||
|
index = 2;
|
||||||
|
code = raftCacheGetEntryP(pCache, index, &pEntry);
|
||||||
|
ASSERT(code == 0);
|
||||||
|
syncEntryLog2((char*)"==test2 get entry pointer 2==", pEntry);
|
||||||
|
|
||||||
|
// not found
|
||||||
|
index = 8;
|
||||||
|
code = raftCacheGetEntry(pCache, index, &pEntry);
|
||||||
|
ASSERT(code == -1 && terrno == TSDB_CODE_WAL_LOG_NOT_EXIST);
|
||||||
|
sTrace("==test2 get entry 8 not found==");
|
||||||
|
|
||||||
|
// not found
|
||||||
|
index = 9;
|
||||||
|
code = raftCacheGetEntryP(pCache, index, &pEntry);
|
||||||
|
ASSERT(code == -1 && terrno == TSDB_CODE_WAL_LOG_NOT_EXIST);
|
||||||
|
sTrace("==test2 get entry pointer 9 not found==");
|
||||||
|
}
|
||||||
|
|
||||||
|
void test3() {
|
||||||
|
int32_t code = 0;
|
||||||
|
SRaftEntryCache* pCache = createCache(5);
|
||||||
|
for (int i = 0; i < 5; ++i) {
|
||||||
|
SSyncRaftEntry* pEntry = createEntry(i);
|
||||||
|
code = raftCachePutEntry(pCache, pEntry);
|
||||||
|
ASSERT(code == 1);
|
||||||
|
syncEntryDestory(pEntry);
|
||||||
|
}
|
||||||
|
for (int i = 6; i < 10; ++i) {
|
||||||
|
SSyncRaftEntry* pEntry = createEntry(i);
|
||||||
|
code = raftCachePutEntry(pCache, pEntry);
|
||||||
|
ASSERT(code == 0);
|
||||||
|
syncEntryDestory(pEntry);
|
||||||
|
}
|
||||||
|
raftCacheLog2((char*)"==test3 write 10 entries, max count is 5==", pCache);
|
||||||
|
}
|
||||||
|
|
||||||
|
void test4() {
|
||||||
|
int32_t code = 0;
|
||||||
|
SRaftEntryCache* pCache = createCache(5);
|
||||||
|
for (int i = 0; i < 5; ++i) {
|
||||||
|
SSyncRaftEntry* pEntry = createEntry(i);
|
||||||
|
code = raftCachePutEntry(pCache, pEntry);
|
||||||
|
ASSERT(code == 1);
|
||||||
|
syncEntryDestory(pEntry);
|
||||||
|
}
|
||||||
|
raftCacheLog2((char*)"==test4 write 5 entries==", pCache);
|
||||||
|
|
||||||
|
SyncIndex index;
|
||||||
|
index = 3;
|
||||||
|
SSyncRaftEntry* pEntry;
|
||||||
|
code = raftCacheGetAndDel(pCache, index, &pEntry);
|
||||||
|
ASSERT(code == 0);
|
||||||
|
syncEntryLog2((char*)"==test4 get-and-del entry 3==", pEntry);
|
||||||
|
raftCacheLog2((char*)"==test4 after get-and-del entry 3==", pCache);
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int argc, char** argv) {
|
||||||
|
gRaftDetailLog = true;
|
||||||
|
tsAsyncLog = 0;
|
||||||
|
sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE + DEBUG_DEBUG;
|
||||||
|
|
||||||
|
test1();
|
||||||
|
test2();
|
||||||
|
test3();
|
||||||
|
test4();
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
Loading…
Reference in New Issue