Merge pull request #14857 from taosdata/feature/3.0_mhli
refactor(sync): add skiplist entry cache
This commit is contained in:
commit
60703bdd05
|
@ -26,6 +26,8 @@ extern "C" {
|
||||||
|
|
||||||
extern bool gRaftDetailLog;
|
extern bool gRaftDetailLog;
|
||||||
|
|
||||||
|
#define SYNC_RESP_TTL_MS 5000
|
||||||
|
|
||||||
#define SYNC_MAX_BATCH_SIZE 500
|
#define SYNC_MAX_BATCH_SIZE 500
|
||||||
#define SYNC_INDEX_BEGIN 0
|
#define SYNC_INDEX_BEGIN 0
|
||||||
#define SYNC_INDEX_INVALID -1
|
#define SYNC_INDEX_INVALID -1
|
||||||
|
|
|
@ -427,6 +427,7 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c
|
||||||
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
|
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
|
||||||
syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType));
|
syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType));
|
||||||
|
|
||||||
|
if (cbMeta.code == 0) {
|
||||||
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
|
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
|
||||||
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
|
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
|
||||||
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
|
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
|
||||||
|
@ -434,6 +435,14 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c
|
||||||
rpcMsg.info.conn.applyIndex = cbMeta.index;
|
rpcMsg.info.conn.applyIndex = cbMeta.index;
|
||||||
rpcMsg.info.conn.applyTerm = cbMeta.term;
|
rpcMsg.info.conn.applyTerm = cbMeta.term;
|
||||||
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
|
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
|
||||||
|
} else {
|
||||||
|
SRpcMsg rsp = {.code = cbMeta.code, .info = pMsg->info};
|
||||||
|
vError("vgId:%d, sync commit error, msgtype:%d,%s, error:0x%X, errmsg:%s", syncGetVgId(pVnode->sync), pMsg->msgType,
|
||||||
|
TMSG_INFO(pMsg->msgType), cbMeta.code, tstrerror(cbMeta.code));
|
||||||
|
if (rsp.info.handle != NULL) {
|
||||||
|
tmsgSendRsp(&rsp);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||||
|
|
|
@ -26,6 +26,7 @@ extern "C" {
|
||||||
#include "syncInt.h"
|
#include "syncInt.h"
|
||||||
#include "syncMessage.h"
|
#include "syncMessage.h"
|
||||||
#include "taosdef.h"
|
#include "taosdef.h"
|
||||||
|
#include "tskiplist.h"
|
||||||
|
|
||||||
typedef struct SSyncRaftEntry {
|
typedef struct SSyncRaftEntry {
|
||||||
uint32_t bytes;
|
uint32_t bytes;
|
||||||
|
@ -58,29 +59,52 @@ void syncEntryLog(const SSyncRaftEntry* pObj);
|
||||||
void syncEntryLog2(char* s, const SSyncRaftEntry* pObj);
|
void syncEntryLog2(char* s, const SSyncRaftEntry* pObj);
|
||||||
|
|
||||||
//-----------------------------------
|
//-----------------------------------
|
||||||
typedef struct SRaftEntryCache {
|
typedef struct SRaftEntryHashCache {
|
||||||
SHashObj* pEntryHash;
|
SHashObj* pEntryHash;
|
||||||
int32_t maxCount;
|
int32_t maxCount;
|
||||||
int32_t currentCount;
|
int32_t currentCount;
|
||||||
TdThreadMutex mutex;
|
TdThreadMutex mutex;
|
||||||
SSyncNode* pSyncNode;
|
SSyncNode* pSyncNode;
|
||||||
|
} SRaftEntryHashCache;
|
||||||
|
|
||||||
|
SRaftEntryHashCache* raftCacheCreate(SSyncNode* pSyncNode, int32_t maxCount);
|
||||||
|
void raftCacheDestroy(SRaftEntryHashCache* pCache);
|
||||||
|
int32_t raftCachePutEntry(struct SRaftEntryHashCache* pCache, SSyncRaftEntry* pEntry);
|
||||||
|
int32_t raftCacheGetEntry(struct SRaftEntryHashCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry);
|
||||||
|
int32_t raftCacheGetEntryP(struct SRaftEntryHashCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry);
|
||||||
|
int32_t raftCacheDelEntry(struct SRaftEntryHashCache* pCache, SyncIndex index);
|
||||||
|
int32_t raftCacheGetAndDel(struct SRaftEntryHashCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry);
|
||||||
|
int32_t raftCacheClear(struct SRaftEntryHashCache* pCache);
|
||||||
|
|
||||||
|
cJSON* raftCache2Json(SRaftEntryHashCache* pObj);
|
||||||
|
char* raftCache2Str(SRaftEntryHashCache* pObj);
|
||||||
|
void raftCachePrint(SRaftEntryHashCache* pObj);
|
||||||
|
void raftCachePrint2(char* s, SRaftEntryHashCache* pObj);
|
||||||
|
void raftCacheLog(SRaftEntryHashCache* pObj);
|
||||||
|
void raftCacheLog2(char* s, SRaftEntryHashCache* pObj);
|
||||||
|
|
||||||
|
//-----------------------------------
|
||||||
|
typedef struct SRaftEntryCache {
|
||||||
|
SSkipList* pSkipList;
|
||||||
|
int32_t maxCount;
|
||||||
|
int32_t currentCount;
|
||||||
|
TdThreadMutex mutex;
|
||||||
|
SSyncNode* pSyncNode;
|
||||||
} SRaftEntryCache;
|
} SRaftEntryCache;
|
||||||
|
|
||||||
SRaftEntryCache* raftCacheCreate(SSyncNode* pSyncNode, int32_t maxCount);
|
SRaftEntryCache* raftEntryCacheCreate(SSyncNode* pSyncNode, int32_t maxCount);
|
||||||
void raftCacheDestroy(SRaftEntryCache* pCache);
|
void raftEntryCacheDestroy(SRaftEntryCache* pCache);
|
||||||
int32_t raftCachePutEntry(struct SRaftEntryCache* pCache, SSyncRaftEntry* pEntry);
|
int32_t raftEntryCachePutEntry(struct SRaftEntryCache* pCache, SSyncRaftEntry* pEntry);
|
||||||
int32_t raftCacheGetEntry(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry);
|
int32_t raftEntryCacheGetEntry(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry);
|
||||||
int32_t raftCacheGetEntryP(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry);
|
int32_t raftEntryCacheGetEntryP(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry);
|
||||||
int32_t raftCacheDelEntry(struct SRaftEntryCache* pCache, SyncIndex index);
|
int32_t raftEntryCacheClear(struct SRaftEntryCache* pCache, int32_t count);
|
||||||
int32_t raftCacheGetAndDel(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry);
|
|
||||||
int32_t raftCacheClear(struct SRaftEntryCache* pCache);
|
|
||||||
|
|
||||||
cJSON* raftCache2Json(SRaftEntryCache* pObj);
|
cJSON* raftEntryCache2Json(SRaftEntryCache* pObj);
|
||||||
char* raftCache2Str(SRaftEntryCache* pObj);
|
char* raftEntryCache2Str(SRaftEntryCache* pObj);
|
||||||
void raftCachePrint(SRaftEntryCache* pObj);
|
void raftEntryCachePrint(SRaftEntryCache* pObj);
|
||||||
void raftCachePrint2(char* s, SRaftEntryCache* pObj);
|
void raftEntryCachePrint2(char* s, SRaftEntryCache* pObj);
|
||||||
void raftCacheLog(SRaftEntryCache* pObj);
|
void raftEntryCacheLog(SRaftEntryCache* pObj);
|
||||||
void raftCacheLog2(char* s, SRaftEntryCache* pObj);
|
void raftEntryCacheLog2(char* s, SRaftEntryCache* pObj);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -1055,19 +1055,12 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// tools
|
// tools
|
||||||
pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, 0);
|
pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS);
|
||||||
ASSERT(pSyncNode->pSyncRespMgr != NULL);
|
ASSERT(pSyncNode->pSyncRespMgr != NULL);
|
||||||
|
|
||||||
// restore state
|
// restore state
|
||||||
pSyncNode->restoreFinish = false;
|
pSyncNode->restoreFinish = false;
|
||||||
|
|
||||||
// pSyncNode->pSnapshot = NULL;
|
|
||||||
// if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) {
|
|
||||||
// pSyncNode->pSnapshot = taosMemoryMalloc(sizeof(SSnapshot));
|
|
||||||
// pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, pSyncNode->pSnapshot);
|
|
||||||
// }
|
|
||||||
// tsem_init(&(pSyncNode->restoreSem), 0, 0);
|
|
||||||
|
|
||||||
// snapshot senders
|
// snapshot senders
|
||||||
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||||
SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, i);
|
SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, i);
|
||||||
|
|
|
@ -198,8 +198,8 @@ void syncEntryLog2(char* s, const SSyncRaftEntry* pObj) {
|
||||||
}
|
}
|
||||||
|
|
||||||
//-----------------------------------
|
//-----------------------------------
|
||||||
SRaftEntryCache* raftCacheCreate(SSyncNode* pSyncNode, int32_t maxCount) {
|
SRaftEntryHashCache* raftCacheCreate(SSyncNode* pSyncNode, int32_t maxCount) {
|
||||||
SRaftEntryCache* pCache = taosMemoryMalloc(sizeof(SRaftEntryCache));
|
SRaftEntryHashCache* pCache = taosMemoryMalloc(sizeof(SRaftEntryHashCache));
|
||||||
if (pCache == NULL) {
|
if (pCache == NULL) {
|
||||||
sError("vgId:%d raft cache create error", pSyncNode->vgId);
|
sError("vgId:%d raft cache create error", pSyncNode->vgId);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -220,7 +220,7 @@ SRaftEntryCache* raftCacheCreate(SSyncNode* pSyncNode, int32_t maxCount) {
|
||||||
return pCache;
|
return pCache;
|
||||||
}
|
}
|
||||||
|
|
||||||
void raftCacheDestroy(SRaftEntryCache* pCache) {
|
void raftCacheDestroy(SRaftEntryHashCache* pCache) {
|
||||||
if (pCache != NULL) {
|
if (pCache != NULL) {
|
||||||
taosThreadMutexLock(&(pCache->mutex));
|
taosThreadMutexLock(&(pCache->mutex));
|
||||||
taosHashCleanup(pCache->pEntryHash);
|
taosHashCleanup(pCache->pEntryHash);
|
||||||
|
@ -233,7 +233,7 @@ void raftCacheDestroy(SRaftEntryCache* pCache) {
|
||||||
// success, return 1
|
// success, return 1
|
||||||
// max count, return 0
|
// max count, return 0
|
||||||
// error, return -1
|
// error, return -1
|
||||||
int32_t raftCachePutEntry(struct SRaftEntryCache* pCache, SSyncRaftEntry* pEntry) {
|
int32_t raftCachePutEntry(struct SRaftEntryHashCache* pCache, SSyncRaftEntry* pEntry) {
|
||||||
taosThreadMutexLock(&(pCache->mutex));
|
taosThreadMutexLock(&(pCache->mutex));
|
||||||
|
|
||||||
if (pCache->currentCount >= pCache->maxCount) {
|
if (pCache->currentCount >= pCache->maxCount) {
|
||||||
|
@ -259,7 +259,7 @@ int32_t raftCachePutEntry(struct SRaftEntryCache* pCache, SSyncRaftEntry* pEntry
|
||||||
// success, return 0
|
// success, return 0
|
||||||
// error, return -1
|
// error, return -1
|
||||||
// not exist, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST
|
// not exist, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST
|
||||||
int32_t raftCacheGetEntry(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) {
|
int32_t raftCacheGetEntry(struct SRaftEntryHashCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) {
|
||||||
if (ppEntry == NULL) {
|
if (ppEntry == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -292,7 +292,7 @@ int32_t raftCacheGetEntry(struct SRaftEntryCache* pCache, SyncIndex index, SSync
|
||||||
// success, return 0
|
// success, return 0
|
||||||
// error, return -1
|
// error, return -1
|
||||||
// not exist, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST
|
// not exist, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST
|
||||||
int32_t raftCacheGetEntryP(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) {
|
int32_t raftCacheGetEntryP(struct SRaftEntryHashCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) {
|
||||||
if (ppEntry == NULL) {
|
if (ppEntry == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -321,7 +321,7 @@ int32_t raftCacheGetEntryP(struct SRaftEntryCache* pCache, SyncIndex index, SSyn
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t raftCacheDelEntry(struct SRaftEntryCache* pCache, SyncIndex index) {
|
int32_t raftCacheDelEntry(struct SRaftEntryHashCache* pCache, SyncIndex index) {
|
||||||
taosThreadMutexLock(&(pCache->mutex));
|
taosThreadMutexLock(&(pCache->mutex));
|
||||||
taosHashRemove(pCache->pEntryHash, &index, sizeof(index));
|
taosHashRemove(pCache->pEntryHash, &index, sizeof(index));
|
||||||
--(pCache->currentCount);
|
--(pCache->currentCount);
|
||||||
|
@ -329,7 +329,7 @@ int32_t raftCacheDelEntry(struct SRaftEntryCache* pCache, SyncIndex index) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t raftCacheGetAndDel(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) {
|
int32_t raftCacheGetAndDel(struct SRaftEntryHashCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) {
|
||||||
if (ppEntry == NULL) {
|
if (ppEntry == NULL) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -362,7 +362,7 @@ int32_t raftCacheGetAndDel(struct SRaftEntryCache* pCache, SyncIndex index, SSyn
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t raftCacheClear(struct SRaftEntryCache* pCache) {
|
int32_t raftCacheClear(struct SRaftEntryHashCache* pCache) {
|
||||||
taosThreadMutexLock(&(pCache->mutex));
|
taosThreadMutexLock(&(pCache->mutex));
|
||||||
taosHashClear(pCache->pEntryHash);
|
taosHashClear(pCache->pEntryHash);
|
||||||
pCache->currentCount = 0;
|
pCache->currentCount = 0;
|
||||||
|
@ -371,7 +371,7 @@ int32_t raftCacheClear(struct SRaftEntryCache* pCache) {
|
||||||
}
|
}
|
||||||
|
|
||||||
//-----------------------------------
|
//-----------------------------------
|
||||||
cJSON* raftCache2Json(SRaftEntryCache* pCache) {
|
cJSON* raftCache2Json(SRaftEntryHashCache* pCache) {
|
||||||
char u64buf[128] = {0};
|
char u64buf[128] = {0};
|
||||||
cJSON* pRoot = cJSON_CreateObject();
|
cJSON* pRoot = cJSON_CreateObject();
|
||||||
|
|
||||||
|
@ -402,41 +402,283 @@ cJSON* raftCache2Json(SRaftEntryCache* pCache) {
|
||||||
}
|
}
|
||||||
|
|
||||||
cJSON* pJson = cJSON_CreateObject();
|
cJSON* pJson = cJSON_CreateObject();
|
||||||
cJSON_AddItemToObject(pJson, "SRaftEntryCache", pRoot);
|
cJSON_AddItemToObject(pJson, "SRaftEntryHashCache", pRoot);
|
||||||
return pJson;
|
return pJson;
|
||||||
}
|
}
|
||||||
|
|
||||||
char* raftCache2Str(SRaftEntryCache* pCache) {
|
char* raftCache2Str(SRaftEntryHashCache* pCache) {
|
||||||
cJSON* pJson = raftCache2Json(pCache);
|
cJSON* pJson = raftCache2Json(pCache);
|
||||||
char* serialized = cJSON_Print(pJson);
|
char* serialized = cJSON_Print(pJson);
|
||||||
cJSON_Delete(pJson);
|
cJSON_Delete(pJson);
|
||||||
return serialized;
|
return serialized;
|
||||||
}
|
}
|
||||||
|
|
||||||
void raftCachePrint(SRaftEntryCache* pCache) {
|
void raftCachePrint(SRaftEntryHashCache* pCache) {
|
||||||
char* serialized = raftCache2Str(pCache);
|
char* serialized = raftCache2Str(pCache);
|
||||||
printf("raftCachePrint | len:%" PRIu64 " | %s \n", strlen(serialized), serialized);
|
printf("raftCachePrint | len:%" PRIu64 " | %s \n", strlen(serialized), serialized);
|
||||||
fflush(NULL);
|
fflush(NULL);
|
||||||
taosMemoryFree(serialized);
|
taosMemoryFree(serialized);
|
||||||
}
|
}
|
||||||
|
|
||||||
void raftCachePrint2(char* s, SRaftEntryCache* pCache) {
|
void raftCachePrint2(char* s, SRaftEntryHashCache* pCache) {
|
||||||
char* serialized = raftCache2Str(pCache);
|
char* serialized = raftCache2Str(pCache);
|
||||||
printf("raftCachePrint2 | len:%" PRIu64 " | %s | %s \n", strlen(serialized), s, serialized);
|
printf("raftCachePrint2 | len:%" PRIu64 " | %s | %s \n", strlen(serialized), s, serialized);
|
||||||
fflush(NULL);
|
fflush(NULL);
|
||||||
taosMemoryFree(serialized);
|
taosMemoryFree(serialized);
|
||||||
}
|
}
|
||||||
|
|
||||||
void raftCacheLog(SRaftEntryCache* pCache) {
|
void raftCacheLog(SRaftEntryHashCache* pCache) {
|
||||||
char* serialized = raftCache2Str(pCache);
|
char* serialized = raftCache2Str(pCache);
|
||||||
sTrace("raftCacheLog | len:%" PRIu64 " | %s", strlen(serialized), serialized);
|
sTrace("raftCacheLog | len:%" PRIu64 " | %s", strlen(serialized), serialized);
|
||||||
taosMemoryFree(serialized);
|
taosMemoryFree(serialized);
|
||||||
}
|
}
|
||||||
|
|
||||||
void raftCacheLog2(char* s, SRaftEntryCache* pCache) {
|
void raftCacheLog2(char* s, SRaftEntryHashCache* pCache) {
|
||||||
if (gRaftDetailLog) {
|
if (gRaftDetailLog) {
|
||||||
char* serialized = raftCache2Str(pCache);
|
char* serialized = raftCache2Str(pCache);
|
||||||
sTraceLong("raftCacheLog2 | len:%" PRIu64 " | %s | %s", strlen(serialized), s, serialized);
|
sTraceLong("raftCacheLog2 | len:%" PRIu64 " | %s | %s", strlen(serialized), s, serialized);
|
||||||
taosMemoryFree(serialized);
|
taosMemoryFree(serialized);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//-----------------------------------
|
||||||
|
static char* keyFn(const void* pData) {
|
||||||
|
SSyncRaftEntry* pEntry = (SSyncRaftEntry*)pData;
|
||||||
|
return (char*)(&(pEntry->index));
|
||||||
|
}
|
||||||
|
|
||||||
|
static int cmpFn(const void* p1, const void* p2) { return memcmp(p1, p2, sizeof(SyncIndex)); }
|
||||||
|
|
||||||
|
SRaftEntryCache* raftEntryCacheCreate(SSyncNode* pSyncNode, int32_t maxCount) {
|
||||||
|
SRaftEntryCache* pCache = taosMemoryMalloc(sizeof(SRaftEntryCache));
|
||||||
|
if (pCache == NULL) {
|
||||||
|
sError("vgId:%d raft cache create error", pSyncNode->vgId);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pCache->pSkipList =
|
||||||
|
tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_BINARY, sizeof(SyncIndex), cmpFn, SL_ALLOW_DUP_KEY, keyFn);
|
||||||
|
if (pCache->pSkipList == NULL) {
|
||||||
|
sError("vgId:%d raft cache create hash error", pSyncNode->vgId);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosThreadMutexInit(&(pCache->mutex), NULL);
|
||||||
|
pCache->maxCount = maxCount;
|
||||||
|
pCache->currentCount = 0;
|
||||||
|
pCache->pSyncNode = pSyncNode;
|
||||||
|
|
||||||
|
return pCache;
|
||||||
|
}
|
||||||
|
|
||||||
|
void raftEntryCacheDestroy(SRaftEntryCache* pCache) {
|
||||||
|
if (pCache != NULL) {
|
||||||
|
taosThreadMutexLock(&(pCache->mutex));
|
||||||
|
tSkipListDestroy(pCache->pSkipList);
|
||||||
|
taosThreadMutexUnlock(&(pCache->mutex));
|
||||||
|
taosThreadMutexDestroy(&(pCache->mutex));
|
||||||
|
taosMemoryFree(pCache);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// success, return 1
|
||||||
|
// max count, return 0
|
||||||
|
// error, return -1
|
||||||
|
int32_t raftEntryCachePutEntry(struct SRaftEntryCache* pCache, SSyncRaftEntry* pEntry) {
|
||||||
|
taosThreadMutexLock(&(pCache->mutex));
|
||||||
|
|
||||||
|
if (pCache->currentCount >= pCache->maxCount) {
|
||||||
|
taosThreadMutexUnlock(&(pCache->mutex));
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSkipListNode* pSkipListNode = tSkipListPut(pCache->pSkipList, pEntry);
|
||||||
|
ASSERT(pSkipListNode != NULL);
|
||||||
|
++(pCache->currentCount);
|
||||||
|
|
||||||
|
do {
|
||||||
|
char eventLog[128];
|
||||||
|
snprintf(eventLog, sizeof(eventLog), "raft cache add, type:%s,%d, type2:%s,%d, index:%" PRId64 ", bytes:%d",
|
||||||
|
TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType,
|
||||||
|
pEntry->index, pEntry->bytes);
|
||||||
|
syncNodeEventLog(pCache->pSyncNode, eventLog);
|
||||||
|
} while (0);
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&(pCache->mutex));
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// find one, return 1
|
||||||
|
// not found, return 0
|
||||||
|
// error, return -1
|
||||||
|
int32_t raftEntryCacheGetEntry(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) {
|
||||||
|
ASSERT(ppEntry != NULL);
|
||||||
|
SSyncRaftEntry* pEntry = NULL;
|
||||||
|
int32_t code = raftEntryCacheGetEntryP(pCache, index, &pEntry);
|
||||||
|
if (code == 1) {
|
||||||
|
*ppEntry = taosMemoryMalloc(pEntry->bytes);
|
||||||
|
memcpy(*ppEntry, pEntry, pEntry->bytes);
|
||||||
|
} else {
|
||||||
|
*ppEntry = NULL;
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
// find one, return 1
|
||||||
|
// not found, return 0
|
||||||
|
// error, return -1
|
||||||
|
int32_t raftEntryCacheGetEntryP(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) {
|
||||||
|
taosThreadMutexLock(&(pCache->mutex));
|
||||||
|
|
||||||
|
SyncIndex index2 = index;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
SArray* entryPArray = tSkipListGet(pCache->pSkipList, (char*)(&index2));
|
||||||
|
int32_t arraySize = taosArrayGetSize(entryPArray);
|
||||||
|
if (arraySize == 1) {
|
||||||
|
SSkipListNode** ppNode = (SSkipListNode**)taosArrayGet(entryPArray, 0);
|
||||||
|
ASSERT(*ppNode != NULL);
|
||||||
|
*ppEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(*ppNode);
|
||||||
|
code = 1;
|
||||||
|
|
||||||
|
} else if (arraySize == 0) {
|
||||||
|
code = 0;
|
||||||
|
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
|
|
||||||
|
code = -1;
|
||||||
|
}
|
||||||
|
taosArrayDestroy(entryPArray);
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&(pCache->mutex));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
// count = -1, clear all
|
||||||
|
// count >= 0, clear count
|
||||||
|
// return -1, error
|
||||||
|
// return delete count
|
||||||
|
int32_t raftEntryCacheClear(struct SRaftEntryCache* pCache, int32_t count) {
|
||||||
|
taosThreadMutexLock(&(pCache->mutex));
|
||||||
|
int32_t returnCnt = 0;
|
||||||
|
|
||||||
|
if (count == -1) {
|
||||||
|
// clear all
|
||||||
|
SSkipListIterator* pIter = tSkipListCreateIter(pCache->pSkipList);
|
||||||
|
while (tSkipListIterNext(pIter)) {
|
||||||
|
SSkipListNode* pNode = tSkipListIterGet(pIter);
|
||||||
|
ASSERT(pNode != NULL);
|
||||||
|
SSyncRaftEntry* pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(pNode);
|
||||||
|
syncEntryDestory(pEntry);
|
||||||
|
++returnCnt;
|
||||||
|
}
|
||||||
|
tSkipListDestroyIter(pIter);
|
||||||
|
|
||||||
|
tSkipListDestroy(pCache->pSkipList);
|
||||||
|
pCache->pSkipList =
|
||||||
|
tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_BINARY, sizeof(SyncIndex), cmpFn, SL_ALLOW_DUP_KEY, keyFn);
|
||||||
|
ASSERT(pCache->pSkipList != NULL);
|
||||||
|
|
||||||
|
} else {
|
||||||
|
// clear count
|
||||||
|
int i = 0;
|
||||||
|
SSkipListIterator* pIter = tSkipListCreateIter(pCache->pSkipList);
|
||||||
|
SArray* delNodeArray = taosArrayInit(0, sizeof(SSkipListNode*));
|
||||||
|
|
||||||
|
// free entry
|
||||||
|
while (tSkipListIterNext(pIter)) {
|
||||||
|
SSkipListNode* pNode = tSkipListIterGet(pIter);
|
||||||
|
ASSERT(pNode != NULL);
|
||||||
|
if (i++ >= count) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// sDebug("push pNode:%p", pNode);
|
||||||
|
taosArrayPush(delNodeArray, &pNode);
|
||||||
|
++returnCnt;
|
||||||
|
SSyncRaftEntry* pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(pNode);
|
||||||
|
syncEntryDestory(pEntry);
|
||||||
|
}
|
||||||
|
tSkipListDestroyIter(pIter);
|
||||||
|
|
||||||
|
// delete skiplist node
|
||||||
|
int32_t arraySize = taosArrayGetSize(delNodeArray);
|
||||||
|
for (int32_t i = 0; i < arraySize; ++i) {
|
||||||
|
SSkipListNode** ppNode = taosArrayGet(delNodeArray, i);
|
||||||
|
// sDebug("get pNode:%p", *ppNode);
|
||||||
|
tSkipListRemoveNode(pCache->pSkipList, *ppNode);
|
||||||
|
}
|
||||||
|
taosArrayDestroy(delNodeArray);
|
||||||
|
}
|
||||||
|
|
||||||
|
pCache->currentCount -= returnCnt;
|
||||||
|
taosThreadMutexUnlock(&(pCache->mutex));
|
||||||
|
return returnCnt;
|
||||||
|
}
|
||||||
|
|
||||||
|
cJSON* raftEntryCache2Json(SRaftEntryCache* pCache) {
|
||||||
|
char u64buf[128] = {0};
|
||||||
|
cJSON* pRoot = cJSON_CreateObject();
|
||||||
|
|
||||||
|
if (pCache != NULL) {
|
||||||
|
taosThreadMutexLock(&(pCache->mutex));
|
||||||
|
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%p", pCache->pSyncNode);
|
||||||
|
cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "currentCount", pCache->currentCount);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "maxCount", pCache->maxCount);
|
||||||
|
cJSON* pEntries = cJSON_CreateArray();
|
||||||
|
cJSON_AddItemToObject(pRoot, "entries", pEntries);
|
||||||
|
|
||||||
|
SSkipListIterator* pIter = tSkipListCreateIter(pCache->pSkipList);
|
||||||
|
while (tSkipListIterNext(pIter)) {
|
||||||
|
SSkipListNode* pNode = tSkipListIterGet(pIter);
|
||||||
|
ASSERT(pNode != NULL);
|
||||||
|
SSyncRaftEntry* pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(pNode);
|
||||||
|
cJSON_AddItemToArray(pEntries, syncEntry2Json(pEntry));
|
||||||
|
}
|
||||||
|
tSkipListDestroyIter(pIter);
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&(pCache->mutex));
|
||||||
|
}
|
||||||
|
|
||||||
|
cJSON* pJson = cJSON_CreateObject();
|
||||||
|
cJSON_AddItemToObject(pJson, "SRaftEntryCache", pRoot);
|
||||||
|
return pJson;
|
||||||
|
}
|
||||||
|
|
||||||
|
char* raftEntryCache2Str(SRaftEntryCache* pObj) {
|
||||||
|
cJSON* pJson = raftEntryCache2Json(pObj);
|
||||||
|
char* serialized = cJSON_Print(pJson);
|
||||||
|
cJSON_Delete(pJson);
|
||||||
|
return serialized;
|
||||||
|
}
|
||||||
|
|
||||||
|
void raftEntryCachePrint(SRaftEntryCache* pObj) {
|
||||||
|
char* serialized = raftEntryCache2Str(pObj);
|
||||||
|
printf("raftEntryCachePrint | len:%" PRIu64 " | %s \n", strlen(serialized), serialized);
|
||||||
|
fflush(NULL);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
void raftEntryCachePrint2(char* s, SRaftEntryCache* pObj) {
|
||||||
|
char* serialized = raftEntryCache2Str(pObj);
|
||||||
|
printf("raftEntryCachePrint2 | len:%" PRIu64 " | %s | %s \n", strlen(serialized), s, serialized);
|
||||||
|
fflush(NULL);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
void raftEntryCacheLog(SRaftEntryCache* pObj) {
|
||||||
|
char* serialized = raftEntryCache2Str(pObj);
|
||||||
|
sTrace("raftEntryCacheLog | len:%" PRIu64 " | %s", strlen(serialized), serialized);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
void raftEntryCacheLog2(char* s, SRaftEntryCache* pObj) {
|
||||||
|
if (gRaftDetailLog) {
|
||||||
|
char* serialized = raftEntryCache2Str(pObj);
|
||||||
|
sTraceLong("raftEntryCacheLog2 | len:%" PRIu64 " | %s | %s", strlen(serialized), s, serialized);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
}
|
|
@ -122,54 +122,45 @@ void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl) {
|
||||||
int cnt = 0;
|
int cnt = 0;
|
||||||
SSyncNode *pSyncNode = pObj->data;
|
SSyncNode *pSyncNode = pObj->data;
|
||||||
|
|
||||||
SArray *delIndexArray = taosArrayInit(0, sizeof(SyncIndex));
|
SArray *delIndexArray = taosArrayInit(0, sizeof(uint64_t));
|
||||||
ASSERT(delIndexArray != NULL);
|
ASSERT(delIndexArray != NULL);
|
||||||
|
|
||||||
while (pStub) {
|
while (pStub) {
|
||||||
size_t len;
|
size_t len;
|
||||||
void * key = taosHashGetKey(pStub, &len);
|
void * key = taosHashGetKey(pStub, &len);
|
||||||
SyncIndex *pIndex = (SyncIndex *)key;
|
uint64_t *pSeqNum = (uint64_t *)key;
|
||||||
|
|
||||||
int64_t nowMS = taosGetTimestampMs();
|
int64_t nowMS = taosGetTimestampMs();
|
||||||
if (nowMS - pStub->createTime > ttl) {
|
if (nowMS - pStub->createTime > ttl) {
|
||||||
taosArrayPush(delIndexArray, pIndex);
|
taosArrayPush(delIndexArray, pSeqNum);
|
||||||
cnt++;
|
cnt++;
|
||||||
|
|
||||||
SSyncRaftEntry *pEntry = NULL;
|
|
||||||
int32_t code = 0;
|
|
||||||
if (pSyncNode->pLogStore != NULL) {
|
|
||||||
code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, *pIndex, &pEntry);
|
|
||||||
if (code == 0 && pEntry != NULL) {
|
|
||||||
SFsmCbMeta cbMeta = {0};
|
SFsmCbMeta cbMeta = {0};
|
||||||
cbMeta.index = pEntry->index;
|
cbMeta.index = SYNC_INDEX_INVALID;
|
||||||
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(pSyncNode, cbMeta.index);
|
cbMeta.lastConfigIndex = SYNC_INDEX_INVALID;
|
||||||
cbMeta.isWeak = pEntry->isWeak;
|
cbMeta.isWeak = false;
|
||||||
cbMeta.code = TSDB_CODE_SYN_TIMEOUT;
|
cbMeta.code = TSDB_CODE_SYN_TIMEOUT;
|
||||||
cbMeta.state = pSyncNode->state;
|
cbMeta.state = pSyncNode->state;
|
||||||
cbMeta.seqNum = pEntry->seqNum;
|
cbMeta.seqNum = *pSeqNum;
|
||||||
cbMeta.term = pEntry->term;
|
cbMeta.term = SYNC_TERM_INVALID;
|
||||||
cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm;
|
cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm;
|
||||||
cbMeta.flag = 0;
|
cbMeta.flag = 0;
|
||||||
|
|
||||||
SRpcMsg rpcMsg = pStub->rpcMsg;
|
pStub->rpcMsg.pCont = NULL;
|
||||||
rpcMsg.pCont = rpcMallocCont(pEntry->dataLen);
|
pStub->rpcMsg.contLen = 0;
|
||||||
memcpy(rpcMsg.pCont, pEntry->data, pEntry->dataLen);
|
pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &(pStub->rpcMsg), cbMeta);
|
||||||
pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &rpcMsg, cbMeta);
|
|
||||||
|
|
||||||
syncEntryDestory(pEntry);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pStub = (SRespStub *)taosHashIterate(pObj->pRespHash, pStub);
|
pStub = (SRespStub *)taosHashIterate(pObj->pRespHash, pStub);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t arraySize = taosArrayGetSize(delIndexArray);
|
int32_t arraySize = taosArrayGetSize(delIndexArray);
|
||||||
sDebug("vgId:%d, resp clean by ttl, cnt:%d, array-size:%d", pSyncNode->vgId, cnt, arraySize);
|
sDebug("vgId:%d, resp mgr clean by ttl, cnt:%d, array-size:%d", pSyncNode->vgId, cnt, arraySize);
|
||||||
|
|
||||||
for (int32_t i = 0; i < arraySize; ++i) {
|
for (int32_t i = 0; i < arraySize; ++i) {
|
||||||
SyncIndex *pIndex = taosArrayGet(delIndexArray, i);
|
uint64_t *pSeqNum = taosArrayGet(delIndexArray, i);
|
||||||
taosHashRemove(pObj->pRespHash, pIndex, sizeof(SyncIndex));
|
taosHashRemove(pObj->pRespHash, pSeqNum, sizeof(uint64_t));
|
||||||
|
sDebug("vgId:%d, resp mgr clean by ttl, seq:%d", pSyncNode->vgId, *pSeqNum);
|
||||||
}
|
}
|
||||||
taosArrayDestroy(delIndexArray);
|
taosArrayDestroy(delIndexArray);
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,9 +16,14 @@
|
||||||
#include "syncTimeout.h"
|
#include "syncTimeout.h"
|
||||||
#include "syncElection.h"
|
#include "syncElection.h"
|
||||||
#include "syncReplication.h"
|
#include "syncReplication.h"
|
||||||
|
#include "syncRespMgr.h"
|
||||||
|
|
||||||
int32_t syncNodeTimerRoutine(SSyncNode* ths) {
|
int32_t syncNodeTimerRoutine(SSyncNode* ths) {
|
||||||
syncNodeEventLog(ths, "timer routines ... ");
|
syncNodeEventLog(ths, "timer routines ... ");
|
||||||
|
|
||||||
|
if (ths->vgId != 1) {
|
||||||
|
syncRespClean(ths->pSyncRespMgr);
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,7 @@ add_executable(syncIndexMgrTest "")
|
||||||
add_executable(syncLogStoreTest "")
|
add_executable(syncLogStoreTest "")
|
||||||
add_executable(syncEntryTest "")
|
add_executable(syncEntryTest "")
|
||||||
add_executable(syncEntryCacheTest "")
|
add_executable(syncEntryCacheTest "")
|
||||||
|
add_executable(syncHashCacheTest "")
|
||||||
add_executable(syncRequestVoteTest "")
|
add_executable(syncRequestVoteTest "")
|
||||||
add_executable(syncRequestVoteReplyTest "")
|
add_executable(syncRequestVoteReplyTest "")
|
||||||
add_executable(syncAppendEntriesTest "")
|
add_executable(syncAppendEntriesTest "")
|
||||||
|
@ -137,6 +138,10 @@ target_sources(syncEntryCacheTest
|
||||||
PRIVATE
|
PRIVATE
|
||||||
"syncEntryCacheTest.cpp"
|
"syncEntryCacheTest.cpp"
|
||||||
)
|
)
|
||||||
|
target_sources(syncHashCacheTest
|
||||||
|
PRIVATE
|
||||||
|
"syncHashCacheTest.cpp"
|
||||||
|
)
|
||||||
target_sources(syncRequestVoteTest
|
target_sources(syncRequestVoteTest
|
||||||
PRIVATE
|
PRIVATE
|
||||||
"syncRequestVoteTest.cpp"
|
"syncRequestVoteTest.cpp"
|
||||||
|
@ -387,6 +392,11 @@ target_include_directories(syncEntryCacheTest
|
||||||
"${TD_SOURCE_DIR}/include/libs/sync"
|
"${TD_SOURCE_DIR}/include/libs/sync"
|
||||||
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||||
)
|
)
|
||||||
|
target_include_directories(syncHashCacheTest
|
||||||
|
PUBLIC
|
||||||
|
"${TD_SOURCE_DIR}/include/libs/sync"
|
||||||
|
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||||
|
)
|
||||||
target_include_directories(syncRequestVoteTest
|
target_include_directories(syncRequestVoteTest
|
||||||
PUBLIC
|
PUBLIC
|
||||||
"${TD_SOURCE_DIR}/include/libs/sync"
|
"${TD_SOURCE_DIR}/include/libs/sync"
|
||||||
|
@ -654,6 +664,10 @@ target_link_libraries(syncEntryCacheTest
|
||||||
sync
|
sync
|
||||||
gtest_main
|
gtest_main
|
||||||
)
|
)
|
||||||
|
target_link_libraries(syncHashCacheTest
|
||||||
|
sync
|
||||||
|
gtest_main
|
||||||
|
)
|
||||||
target_link_libraries(syncRequestVoteTest
|
target_link_libraries(syncRequestVoteTest
|
||||||
sync
|
sync
|
||||||
gtest_main
|
gtest_main
|
||||||
|
|
|
@ -43,7 +43,7 @@ SRaftEntryCache* createCache(int maxCount) {
|
||||||
SSyncNode* pSyncNode = createFakeNode();
|
SSyncNode* pSyncNode = createFakeNode();
|
||||||
ASSERT(pSyncNode != NULL);
|
ASSERT(pSyncNode != NULL);
|
||||||
|
|
||||||
SRaftEntryCache* pCache = raftCacheCreate(pSyncNode, maxCount);
|
SRaftEntryCache* pCache = raftEntryCacheCreate(pSyncNode, maxCount);
|
||||||
ASSERT(pCache != NULL);
|
ASSERT(pCache != NULL);
|
||||||
|
|
||||||
return pCache;
|
return pCache;
|
||||||
|
@ -52,213 +52,73 @@ SRaftEntryCache* createCache(int maxCount) {
|
||||||
void test1() {
|
void test1() {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SRaftEntryCache* pCache = createCache(5);
|
SRaftEntryCache* pCache = createCache(5);
|
||||||
for (int i = 0; i < 5; ++i) {
|
for (int i = 0; i < 10; ++i) {
|
||||||
SSyncRaftEntry* pEntry = createEntry(i);
|
SSyncRaftEntry* pEntry = createEntry(i);
|
||||||
code = raftCachePutEntry(pCache, pEntry);
|
code = raftEntryCachePutEntry(pCache, pEntry);
|
||||||
ASSERT(code == 1);
|
sTrace("put entry code:%d, pEntry:%p", code, pEntry);
|
||||||
syncEntryDestory(pEntry);
|
|
||||||
}
|
}
|
||||||
raftCacheLog2((char*)"==test1 write 5 entries==", pCache);
|
raftEntryCacheLog2((char*)"==test1 write 5 entries==", pCache);
|
||||||
|
|
||||||
SyncIndex index;
|
raftEntryCacheClear(pCache, 3);
|
||||||
index = 1;
|
raftEntryCacheLog2((char*)"==test1 evict 3 entries==", pCache);
|
||||||
code = raftCacheDelEntry(pCache, index);
|
|
||||||
ASSERT(code == 0);
|
|
||||||
index = 3;
|
|
||||||
code = raftCacheDelEntry(pCache, index);
|
|
||||||
ASSERT(code == 0);
|
|
||||||
raftCacheLog2((char*)"==test1 delete 1,3==", pCache);
|
|
||||||
|
|
||||||
code = raftCacheClear(pCache);
|
raftEntryCacheClear(pCache, -1);
|
||||||
ASSERT(code == 0);
|
raftEntryCacheLog2((char*)"==test1 evict -1(all) entries==", pCache);
|
||||||
raftCacheLog2((char*)"==clear all==", pCache);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void test2() {
|
void test2() {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SRaftEntryCache* pCache = createCache(5);
|
SRaftEntryCache* pCache = createCache(5);
|
||||||
for (int i = 0; i < 5; ++i) {
|
for (int i = 0; i < 10; ++i) {
|
||||||
SSyncRaftEntry* pEntry = createEntry(i);
|
SSyncRaftEntry* pEntry = createEntry(i);
|
||||||
code = raftCachePutEntry(pCache, pEntry);
|
code = raftEntryCachePutEntry(pCache, pEntry);
|
||||||
ASSERT(code == 1);
|
sTrace("put entry code:%d, pEntry:%p", code, pEntry);
|
||||||
syncEntryDestory(pEntry);
|
|
||||||
}
|
}
|
||||||
raftCacheLog2((char*)"==test2 write 5 entries==", pCache);
|
raftEntryCacheLog2((char*)"==test1 write 5 entries==", pCache);
|
||||||
|
|
||||||
SyncIndex index;
|
SyncIndex index = 2;
|
||||||
index = 1;
|
SSyncRaftEntry* pEntry = NULL;
|
||||||
SSyncRaftEntry* pEntry;
|
|
||||||
code = raftCacheGetEntry(pCache, index, &pEntry);
|
|
||||||
ASSERT(code == 0);
|
|
||||||
syncEntryDestory(pEntry);
|
|
||||||
syncEntryLog2((char*)"==test2 get entry 1==", pEntry);
|
|
||||||
|
|
||||||
index = 2;
|
code = raftEntryCacheGetEntryP(pCache, index, &pEntry);
|
||||||
code = raftCacheGetEntryP(pCache, index, &pEntry);
|
ASSERT(code == 1 && index == pEntry->index);
|
||||||
ASSERT(code == 0);
|
sTrace("get entry:%p for %ld", pEntry, index);
|
||||||
syncEntryLog2((char*)"==test2 get entry pointer 2==", pEntry);
|
syncEntryLog2((char*)"==test2 get entry pointer 2==", pEntry);
|
||||||
|
|
||||||
|
code = raftEntryCacheGetEntry(pCache, index, &pEntry);
|
||||||
|
ASSERT(code == 1 && index == pEntry->index);
|
||||||
|
sTrace("get entry:%p for %ld", pEntry, index);
|
||||||
|
syncEntryLog2((char*)"==test2 get entry 2==", pEntry);
|
||||||
|
syncEntryDestory(pEntry);
|
||||||
|
|
||||||
// not found
|
// not found
|
||||||
index = 8;
|
index = 8;
|
||||||
code = raftCacheGetEntry(pCache, index, &pEntry);
|
code = raftEntryCacheGetEntry(pCache, index, &pEntry);
|
||||||
ASSERT(code == -1 && terrno == TSDB_CODE_WAL_LOG_NOT_EXIST);
|
ASSERT(code == 0);
|
||||||
|
sTrace("get entry:%p for %ld", pEntry, index);
|
||||||
sTrace("==test2 get entry 8 not found==");
|
sTrace("==test2 get entry 8 not found==");
|
||||||
|
|
||||||
// not found
|
// not found
|
||||||
index = 9;
|
index = 9;
|
||||||
code = raftCacheGetEntryP(pCache, index, &pEntry);
|
code = raftEntryCacheGetEntry(pCache, index, &pEntry);
|
||||||
ASSERT(code == -1 && terrno == TSDB_CODE_WAL_LOG_NOT_EXIST);
|
ASSERT(code == 0);
|
||||||
sTrace("==test2 get entry pointer 9 not found==");
|
sTrace("get entry:%p for %ld", pEntry, index);
|
||||||
|
sTrace("==test2 get entry 9 not found==");
|
||||||
}
|
}
|
||||||
|
|
||||||
void test3() {
|
void test3() {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SRaftEntryCache* pCache = createCache(5);
|
SRaftEntryCache* pCache = createCache(20);
|
||||||
for (int i = 0; i < 5; ++i) {
|
for (int i = 0; i <= 4; ++i) {
|
||||||
SSyncRaftEntry* pEntry = createEntry(i);
|
SSyncRaftEntry* pEntry = createEntry(i);
|
||||||
code = raftCachePutEntry(pCache, pEntry);
|
code = raftEntryCachePutEntry(pCache, pEntry);
|
||||||
ASSERT(code == 1);
|
sTrace("put entry code:%d, pEntry:%p", code, pEntry);
|
||||||
syncEntryDestory(pEntry);
|
|
||||||
}
|
}
|
||||||
for (int i = 6; i < 10; ++i) {
|
|
||||||
SSyncRaftEntry* pEntry = createEntry(i);
|
|
||||||
code = raftCachePutEntry(pCache, pEntry);
|
|
||||||
ASSERT(code == 0);
|
|
||||||
syncEntryDestory(pEntry);
|
|
||||||
}
|
|
||||||
raftCacheLog2((char*)"==test3 write 10 entries, max count is 5==", pCache);
|
|
||||||
}
|
|
||||||
|
|
||||||
void test4() {
|
|
||||||
int32_t code = 0;
|
|
||||||
SRaftEntryCache* pCache = createCache(5);
|
|
||||||
for (int i = 0; i < 5; ++i) {
|
|
||||||
SSyncRaftEntry* pEntry = createEntry(i);
|
|
||||||
code = raftCachePutEntry(pCache, pEntry);
|
|
||||||
ASSERT(code == 1);
|
|
||||||
syncEntryDestory(pEntry);
|
|
||||||
}
|
|
||||||
raftCacheLog2((char*)"==test4 write 5 entries==", pCache);
|
|
||||||
|
|
||||||
SyncIndex index;
|
|
||||||
index = 3;
|
|
||||||
SSyncRaftEntry* pEntry;
|
|
||||||
code = raftCacheGetAndDel(pCache, index, &pEntry);
|
|
||||||
ASSERT(code == 0);
|
|
||||||
syncEntryLog2((char*)"==test4 get-and-del entry 3==", pEntry);
|
|
||||||
raftCacheLog2((char*)"==test4 after get-and-del entry 3==", pCache);
|
|
||||||
}
|
|
||||||
|
|
||||||
static char* keyFn(const void* pData) {
|
|
||||||
SSyncRaftEntry* pEntry = (SSyncRaftEntry*)pData;
|
|
||||||
return (char*)(&(pEntry->index));
|
|
||||||
}
|
|
||||||
|
|
||||||
static int cmpFn(const void* p1, const void* p2) { return memcmp(p1, p2, sizeof(SyncIndex)); }
|
|
||||||
|
|
||||||
void printSkipList(SSkipList* pSkipList) {
|
|
||||||
ASSERT(pSkipList != NULL);
|
|
||||||
|
|
||||||
SSkipListIterator* pIter = tSkipListCreateIter(pSkipList);
|
|
||||||
while (tSkipListIterNext(pIter)) {
|
|
||||||
SSkipListNode* pNode = tSkipListIterGet(pIter);
|
|
||||||
ASSERT(pNode != NULL);
|
|
||||||
SSyncRaftEntry* pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(pNode);
|
|
||||||
syncEntryPrint2((char*)"", pEntry);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void delSkipListFirst(SSkipList* pSkipList, int n) {
|
|
||||||
ASSERT(pSkipList != NULL);
|
|
||||||
|
|
||||||
sTrace("delete first %d -------------", n);
|
|
||||||
SSkipListIterator* pIter = tSkipListCreateIter(pSkipList);
|
|
||||||
for (int i = 0; i < n; ++i) {
|
|
||||||
tSkipListIterNext(pIter);
|
|
||||||
SSkipListNode* pNode = tSkipListIterGet(pIter);
|
|
||||||
tSkipListRemoveNode(pSkipList, pNode);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
SSyncRaftEntry* getLogEntry2(SSkipList* pSkipList, SyncIndex index) {
|
|
||||||
SyncIndex index2 = index;
|
|
||||||
SSyncRaftEntry *pEntry = NULL;
|
|
||||||
int arraySize = 0;
|
|
||||||
|
|
||||||
SArray* entryPArray = tSkipListGet(pSkipList, (char*)(&index2));
|
|
||||||
arraySize = taosArrayGetSize(entryPArray);
|
|
||||||
if (arraySize > 0) {
|
|
||||||
SSkipListNode** ppNode = (SSkipListNode**)taosArrayGet(entryPArray, 0);
|
|
||||||
ASSERT(*ppNode != NULL);
|
|
||||||
pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(*ppNode);
|
|
||||||
}
|
|
||||||
taosArrayDestroy(entryPArray);
|
|
||||||
|
|
||||||
sTrace("get index2: %ld, arraySize:%d -------------", index, arraySize);
|
|
||||||
syncEntryLog2((char*)"getLogEntry2", pEntry);
|
|
||||||
return pEntry;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
SSyncRaftEntry* getLogEntry(SSkipList* pSkipList, SyncIndex index) {
|
|
||||||
sTrace("get index: %ld -------------", index);
|
|
||||||
SyncIndex index2 = index;
|
|
||||||
SSyncRaftEntry *pEntry = NULL;
|
|
||||||
SSkipListIterator* pIter = tSkipListCreateIterFromVal(pSkipList, (const char *)&index2, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
|
|
||||||
if (tSkipListIterNext(pIter)) {
|
|
||||||
SSkipListNode* pNode = tSkipListIterGet(pIter);
|
|
||||||
ASSERT(pNode != NULL);
|
|
||||||
pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(pNode);
|
|
||||||
}
|
|
||||||
|
|
||||||
syncEntryLog2((char*)"getLogEntry", pEntry);
|
|
||||||
return pEntry;
|
|
||||||
}
|
|
||||||
|
|
||||||
void test5() {
|
|
||||||
SSkipList* pSkipList =
|
|
||||||
tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_BINARY, sizeof(SyncIndex), cmpFn, SL_ALLOW_DUP_KEY, keyFn);
|
|
||||||
ASSERT(pSkipList != NULL);
|
|
||||||
|
|
||||||
sTrace("insert 9 - 5");
|
|
||||||
for (int i = 9; i >= 5; --i) {
|
for (int i = 9; i >= 5; --i) {
|
||||||
SSyncRaftEntry* pEntry = createEntry(i);
|
SSyncRaftEntry* pEntry = createEntry(i);
|
||||||
SSkipListNode* pSkipListNode = tSkipListPut(pSkipList, pEntry);
|
code = raftEntryCachePutEntry(pCache, pEntry);
|
||||||
|
sTrace("put entry code:%d, pEntry:%p", code, pEntry);
|
||||||
}
|
}
|
||||||
|
raftEntryCacheLog2((char*)"==test3 write 10 entries==", pCache);
|
||||||
sTrace("insert 0 - 4");
|
|
||||||
for (int i = 0; i <= 4; ++i) {
|
|
||||||
SSyncRaftEntry* pEntry = createEntry(i);
|
|
||||||
SSkipListNode* pSkipListNode = tSkipListPut(pSkipList, pEntry);
|
|
||||||
}
|
|
||||||
|
|
||||||
sTrace("insert 7 7 7 7 7");
|
|
||||||
for (int i = 0; i <= 4; ++i) {
|
|
||||||
SSyncRaftEntry* pEntry = createEntry(7);
|
|
||||||
SSkipListNode* pSkipListNode = tSkipListPut(pSkipList, pEntry);
|
|
||||||
}
|
|
||||||
|
|
||||||
sTrace("print: -------------");
|
|
||||||
printSkipList(pSkipList);
|
|
||||||
|
|
||||||
delSkipListFirst(pSkipList, 3);
|
|
||||||
|
|
||||||
sTrace("print: -------------");
|
|
||||||
printSkipList(pSkipList);
|
|
||||||
|
|
||||||
getLogEntry(pSkipList, 2);
|
|
||||||
getLogEntry(pSkipList, 5);
|
|
||||||
getLogEntry(pSkipList, 7);
|
|
||||||
getLogEntry(pSkipList, 7);
|
|
||||||
|
|
||||||
getLogEntry2(pSkipList, 2);
|
|
||||||
getLogEntry2(pSkipList, 5);
|
|
||||||
getLogEntry2(pSkipList, 7);
|
|
||||||
getLogEntry2(pSkipList, 7);
|
|
||||||
|
|
||||||
|
|
||||||
tSkipListDestroy(pSkipList);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int main(int argc, char** argv) {
|
int main(int argc, char** argv) {
|
||||||
|
@ -266,14 +126,9 @@ int main(int argc, char** argv) {
|
||||||
tsAsyncLog = 0;
|
tsAsyncLog = 0;
|
||||||
sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE + DEBUG_DEBUG;
|
sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE + DEBUG_DEBUG;
|
||||||
|
|
||||||
/*
|
|
||||||
test1();
|
test1();
|
||||||
test2();
|
test2();
|
||||||
test3();
|
test3();
|
||||||
test4();
|
|
||||||
*/
|
|
||||||
|
|
||||||
test5();
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,277 @@
|
||||||
|
#include <stdio.h>
|
||||||
|
#include "syncEnv.h"
|
||||||
|
#include "syncIO.h"
|
||||||
|
#include "syncInt.h"
|
||||||
|
#include "syncRaftLog.h"
|
||||||
|
#include "syncRaftStore.h"
|
||||||
|
#include "syncUtil.h"
|
||||||
|
#include "tskiplist.h"
|
||||||
|
|
||||||
|
void logTest() {
|
||||||
|
sTrace("--- sync log test: trace");
|
||||||
|
sDebug("--- sync log test: debug");
|
||||||
|
sInfo("--- sync log test: info");
|
||||||
|
sWarn("--- sync log test: warn");
|
||||||
|
sError("--- sync log test: error");
|
||||||
|
sFatal("--- sync log test: fatal");
|
||||||
|
}
|
||||||
|
|
||||||
|
SSyncRaftEntry* createEntry(int i) {
|
||||||
|
int32_t dataLen = 20;
|
||||||
|
SSyncRaftEntry* pEntry = syncEntryBuild(dataLen);
|
||||||
|
assert(pEntry != NULL);
|
||||||
|
pEntry->msgType = 88;
|
||||||
|
pEntry->originalRpcType = 99;
|
||||||
|
pEntry->seqNum = 3;
|
||||||
|
pEntry->isWeak = true;
|
||||||
|
pEntry->term = 100 + i;
|
||||||
|
pEntry->index = i;
|
||||||
|
snprintf(pEntry->data, dataLen, "value%d", i);
|
||||||
|
|
||||||
|
return pEntry;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSyncNode* createFakeNode() {
|
||||||
|
SSyncNode* pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode));
|
||||||
|
ASSERT(pSyncNode != NULL);
|
||||||
|
memset(pSyncNode, 0, sizeof(SSyncNode));
|
||||||
|
|
||||||
|
return pSyncNode;
|
||||||
|
}
|
||||||
|
|
||||||
|
SRaftEntryHashCache* createCache(int maxCount) {
|
||||||
|
SSyncNode* pSyncNode = createFakeNode();
|
||||||
|
ASSERT(pSyncNode != NULL);
|
||||||
|
|
||||||
|
SRaftEntryHashCache* pCache = raftCacheCreate(pSyncNode, maxCount);
|
||||||
|
ASSERT(pCache != NULL);
|
||||||
|
|
||||||
|
return pCache;
|
||||||
|
}
|
||||||
|
|
||||||
|
void test1() {
|
||||||
|
int32_t code = 0;
|
||||||
|
SRaftEntryHashCache* pCache = createCache(5);
|
||||||
|
for (int i = 0; i < 5; ++i) {
|
||||||
|
SSyncRaftEntry* pEntry = createEntry(i);
|
||||||
|
code = raftCachePutEntry(pCache, pEntry);
|
||||||
|
ASSERT(code == 1);
|
||||||
|
syncEntryDestory(pEntry);
|
||||||
|
}
|
||||||
|
raftCacheLog2((char*)"==test1 write 5 entries==", pCache);
|
||||||
|
|
||||||
|
SyncIndex index;
|
||||||
|
index = 1;
|
||||||
|
code = raftCacheDelEntry(pCache, index);
|
||||||
|
ASSERT(code == 0);
|
||||||
|
index = 3;
|
||||||
|
code = raftCacheDelEntry(pCache, index);
|
||||||
|
ASSERT(code == 0);
|
||||||
|
raftCacheLog2((char*)"==test1 delete 1,3==", pCache);
|
||||||
|
|
||||||
|
code = raftCacheClear(pCache);
|
||||||
|
ASSERT(code == 0);
|
||||||
|
raftCacheLog2((char*)"==clear all==", pCache);
|
||||||
|
}
|
||||||
|
|
||||||
|
void test2() {
|
||||||
|
int32_t code = 0;
|
||||||
|
SRaftEntryHashCache* pCache = createCache(5);
|
||||||
|
for (int i = 0; i < 5; ++i) {
|
||||||
|
SSyncRaftEntry* pEntry = createEntry(i);
|
||||||
|
code = raftCachePutEntry(pCache, pEntry);
|
||||||
|
ASSERT(code == 1);
|
||||||
|
syncEntryDestory(pEntry);
|
||||||
|
}
|
||||||
|
raftCacheLog2((char*)"==test2 write 5 entries==", pCache);
|
||||||
|
|
||||||
|
SyncIndex index;
|
||||||
|
index = 1;
|
||||||
|
SSyncRaftEntry* pEntry;
|
||||||
|
code = raftCacheGetEntry(pCache, index, &pEntry);
|
||||||
|
ASSERT(code == 0);
|
||||||
|
syncEntryDestory(pEntry);
|
||||||
|
syncEntryLog2((char*)"==test2 get entry 1==", pEntry);
|
||||||
|
|
||||||
|
index = 2;
|
||||||
|
code = raftCacheGetEntryP(pCache, index, &pEntry);
|
||||||
|
ASSERT(code == 0);
|
||||||
|
syncEntryLog2((char*)"==test2 get entry pointer 2==", pEntry);
|
||||||
|
|
||||||
|
// not found
|
||||||
|
index = 8;
|
||||||
|
code = raftCacheGetEntry(pCache, index, &pEntry);
|
||||||
|
ASSERT(code == -1 && terrno == TSDB_CODE_WAL_LOG_NOT_EXIST);
|
||||||
|
sTrace("==test2 get entry 8 not found==");
|
||||||
|
|
||||||
|
// not found
|
||||||
|
index = 9;
|
||||||
|
code = raftCacheGetEntryP(pCache, index, &pEntry);
|
||||||
|
ASSERT(code == -1 && terrno == TSDB_CODE_WAL_LOG_NOT_EXIST);
|
||||||
|
sTrace("==test2 get entry pointer 9 not found==");
|
||||||
|
}
|
||||||
|
|
||||||
|
void test3() {
|
||||||
|
int32_t code = 0;
|
||||||
|
SRaftEntryHashCache* pCache = createCache(5);
|
||||||
|
for (int i = 0; i < 5; ++i) {
|
||||||
|
SSyncRaftEntry* pEntry = createEntry(i);
|
||||||
|
code = raftCachePutEntry(pCache, pEntry);
|
||||||
|
ASSERT(code == 1);
|
||||||
|
syncEntryDestory(pEntry);
|
||||||
|
}
|
||||||
|
for (int i = 6; i < 10; ++i) {
|
||||||
|
SSyncRaftEntry* pEntry = createEntry(i);
|
||||||
|
code = raftCachePutEntry(pCache, pEntry);
|
||||||
|
ASSERT(code == 0);
|
||||||
|
syncEntryDestory(pEntry);
|
||||||
|
}
|
||||||
|
raftCacheLog2((char*)"==test3 write 10 entries, max count is 5==", pCache);
|
||||||
|
}
|
||||||
|
|
||||||
|
void test4() {
|
||||||
|
int32_t code = 0;
|
||||||
|
SRaftEntryHashCache* pCache = createCache(5);
|
||||||
|
for (int i = 0; i < 5; ++i) {
|
||||||
|
SSyncRaftEntry* pEntry = createEntry(i);
|
||||||
|
code = raftCachePutEntry(pCache, pEntry);
|
||||||
|
ASSERT(code == 1);
|
||||||
|
syncEntryDestory(pEntry);
|
||||||
|
}
|
||||||
|
raftCacheLog2((char*)"==test4 write 5 entries==", pCache);
|
||||||
|
|
||||||
|
SyncIndex index;
|
||||||
|
index = 3;
|
||||||
|
SSyncRaftEntry* pEntry;
|
||||||
|
code = raftCacheGetAndDel(pCache, index, &pEntry);
|
||||||
|
ASSERT(code == 0);
|
||||||
|
syncEntryLog2((char*)"==test4 get-and-del entry 3==", pEntry);
|
||||||
|
raftCacheLog2((char*)"==test4 after get-and-del entry 3==", pCache);
|
||||||
|
}
|
||||||
|
|
||||||
|
static char* keyFn(const void* pData) {
|
||||||
|
SSyncRaftEntry* pEntry = (SSyncRaftEntry*)pData;
|
||||||
|
return (char*)(&(pEntry->index));
|
||||||
|
}
|
||||||
|
|
||||||
|
static int cmpFn(const void* p1, const void* p2) { return memcmp(p1, p2, sizeof(SyncIndex)); }
|
||||||
|
|
||||||
|
void printSkipList(SSkipList* pSkipList) {
|
||||||
|
ASSERT(pSkipList != NULL);
|
||||||
|
|
||||||
|
SSkipListIterator* pIter = tSkipListCreateIter(pSkipList);
|
||||||
|
while (tSkipListIterNext(pIter)) {
|
||||||
|
SSkipListNode* pNode = tSkipListIterGet(pIter);
|
||||||
|
ASSERT(pNode != NULL);
|
||||||
|
SSyncRaftEntry* pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(pNode);
|
||||||
|
syncEntryPrint2((char*)"", pEntry);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void delSkipListFirst(SSkipList* pSkipList, int n) {
|
||||||
|
ASSERT(pSkipList != NULL);
|
||||||
|
|
||||||
|
sTrace("delete first %d -------------", n);
|
||||||
|
SSkipListIterator* pIter = tSkipListCreateIter(pSkipList);
|
||||||
|
for (int i = 0; i < n; ++i) {
|
||||||
|
tSkipListIterNext(pIter);
|
||||||
|
SSkipListNode* pNode = tSkipListIterGet(pIter);
|
||||||
|
tSkipListRemoveNode(pSkipList, pNode);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
SSyncRaftEntry* getLogEntry2(SSkipList* pSkipList, SyncIndex index) {
|
||||||
|
SyncIndex index2 = index;
|
||||||
|
SSyncRaftEntry* pEntry = NULL;
|
||||||
|
int arraySize = 0;
|
||||||
|
|
||||||
|
SArray* entryPArray = tSkipListGet(pSkipList, (char*)(&index2));
|
||||||
|
arraySize = taosArrayGetSize(entryPArray);
|
||||||
|
if (arraySize > 0) {
|
||||||
|
SSkipListNode** ppNode = (SSkipListNode**)taosArrayGet(entryPArray, 0);
|
||||||
|
ASSERT(*ppNode != NULL);
|
||||||
|
pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(*ppNode);
|
||||||
|
}
|
||||||
|
taosArrayDestroy(entryPArray);
|
||||||
|
|
||||||
|
sTrace("get index2: %ld, arraySize:%d -------------", index, arraySize);
|
||||||
|
syncEntryLog2((char*)"getLogEntry2", pEntry);
|
||||||
|
return pEntry;
|
||||||
|
}
|
||||||
|
|
||||||
|
SSyncRaftEntry* getLogEntry(SSkipList* pSkipList, SyncIndex index) {
|
||||||
|
sTrace("get index: %ld -------------", index);
|
||||||
|
SyncIndex index2 = index;
|
||||||
|
SSyncRaftEntry* pEntry = NULL;
|
||||||
|
SSkipListIterator* pIter =
|
||||||
|
tSkipListCreateIterFromVal(pSkipList, (const char*)&index2, TSDB_DATA_TYPE_BINARY, TSDB_ORDER_ASC);
|
||||||
|
if (tSkipListIterNext(pIter)) {
|
||||||
|
SSkipListNode* pNode = tSkipListIterGet(pIter);
|
||||||
|
ASSERT(pNode != NULL);
|
||||||
|
pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(pNode);
|
||||||
|
}
|
||||||
|
|
||||||
|
syncEntryLog2((char*)"getLogEntry", pEntry);
|
||||||
|
return pEntry;
|
||||||
|
}
|
||||||
|
|
||||||
|
void test5() {
|
||||||
|
SSkipList* pSkipList =
|
||||||
|
tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_BINARY, sizeof(SyncIndex), cmpFn, SL_ALLOW_DUP_KEY, keyFn);
|
||||||
|
ASSERT(pSkipList != NULL);
|
||||||
|
|
||||||
|
sTrace("insert 9 - 5");
|
||||||
|
for (int i = 9; i >= 5; --i) {
|
||||||
|
SSyncRaftEntry* pEntry = createEntry(i);
|
||||||
|
SSkipListNode* pSkipListNode = tSkipListPut(pSkipList, pEntry);
|
||||||
|
}
|
||||||
|
|
||||||
|
sTrace("insert 0 - 4");
|
||||||
|
for (int i = 0; i <= 4; ++i) {
|
||||||
|
SSyncRaftEntry* pEntry = createEntry(i);
|
||||||
|
SSkipListNode* pSkipListNode = tSkipListPut(pSkipList, pEntry);
|
||||||
|
}
|
||||||
|
|
||||||
|
sTrace("insert 7 7 7 7 7");
|
||||||
|
for (int i = 0; i <= 4; ++i) {
|
||||||
|
SSyncRaftEntry* pEntry = createEntry(7);
|
||||||
|
SSkipListNode* pSkipListNode = tSkipListPut(pSkipList, pEntry);
|
||||||
|
}
|
||||||
|
|
||||||
|
sTrace("print: -------------");
|
||||||
|
printSkipList(pSkipList);
|
||||||
|
|
||||||
|
delSkipListFirst(pSkipList, 3);
|
||||||
|
|
||||||
|
sTrace("print: -------------");
|
||||||
|
printSkipList(pSkipList);
|
||||||
|
|
||||||
|
getLogEntry(pSkipList, 2);
|
||||||
|
getLogEntry(pSkipList, 5);
|
||||||
|
getLogEntry(pSkipList, 7);
|
||||||
|
getLogEntry(pSkipList, 7);
|
||||||
|
|
||||||
|
getLogEntry2(pSkipList, 2);
|
||||||
|
getLogEntry2(pSkipList, 5);
|
||||||
|
getLogEntry2(pSkipList, 7);
|
||||||
|
getLogEntry2(pSkipList, 7);
|
||||||
|
|
||||||
|
tSkipListDestroy(pSkipList);
|
||||||
|
}
|
||||||
|
|
||||||
|
int main(int argc, char** argv) {
|
||||||
|
gRaftDetailLog = true;
|
||||||
|
tsAsyncLog = 0;
|
||||||
|
sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE + DEBUG_DEBUG;
|
||||||
|
|
||||||
|
/*
|
||||||
|
test1();
|
||||||
|
test2();
|
||||||
|
test3();
|
||||||
|
test4();
|
||||||
|
*/
|
||||||
|
|
||||||
|
test5();
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
Loading…
Reference in New Issue