Merge pull request #16960 from taosdata/enh/sync-le-cache
enh(sync): log entry cache for sync
This commit is contained in:
commit
a7b728df3c
|
@ -22,6 +22,7 @@ extern "C" {
|
||||||
|
|
||||||
#include "cJSON.h"
|
#include "cJSON.h"
|
||||||
#include "tdef.h"
|
#include "tdef.h"
|
||||||
|
#include "tlrucache.h"
|
||||||
#include "tmsgcb.h"
|
#include "tmsgcb.h"
|
||||||
|
|
||||||
extern bool gRaftDetailLog;
|
extern bool gRaftDetailLog;
|
||||||
|
@ -153,7 +154,8 @@ typedef struct SSyncFSM {
|
||||||
// abstract definition of log store in raft
|
// abstract definition of log store in raft
|
||||||
// SWal implements it
|
// SWal implements it
|
||||||
typedef struct SSyncLogStore {
|
typedef struct SSyncLogStore {
|
||||||
void* data;
|
SLRUCache* pCache;
|
||||||
|
void* data;
|
||||||
|
|
||||||
// append one log entry
|
// append one log entry
|
||||||
int32_t (*appendEntry)(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry);
|
int32_t (*appendEntry)(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry);
|
||||||
|
|
|
@ -69,15 +69,26 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
|
||||||
|
|
||||||
if (agree) {
|
if (agree) {
|
||||||
// term
|
// term
|
||||||
SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, index);
|
SSyncRaftEntry* pEntry = NULL;
|
||||||
ASSERT(pEntry != NULL);
|
SLRUCache* pCache = pSyncNode->pLogStore->pCache;
|
||||||
|
LRUHandle* h = taosLRUCacheLookup(pCache, &index, sizeof(index));
|
||||||
|
if (h) {
|
||||||
|
pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
|
||||||
|
} else {
|
||||||
|
pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, index);
|
||||||
|
ASSERT(pEntry != NULL);
|
||||||
|
}
|
||||||
// cannot commit, even if quorum agree. need check term!
|
// cannot commit, even if quorum agree. need check term!
|
||||||
if (pEntry->term <= pSyncNode->pRaftStore->currentTerm) {
|
if (pEntry->term <= pSyncNode->pRaftStore->currentTerm) {
|
||||||
// update commit index
|
// update commit index
|
||||||
newCommitIndex = index;
|
newCommitIndex = index;
|
||||||
|
|
||||||
syncEntryDestory(pEntry);
|
if (h) {
|
||||||
|
taosLRUCacheRelease(pCache, h, false);
|
||||||
|
} else {
|
||||||
|
syncEntryDestory(pEntry);
|
||||||
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
do {
|
do {
|
||||||
|
@ -88,7 +99,11 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
|
||||||
} while (0);
|
} while (0);
|
||||||
}
|
}
|
||||||
|
|
||||||
syncEntryDestory(pEntry);
|
if (h) {
|
||||||
|
taosLRUCacheRelease(pCache, h, false);
|
||||||
|
} else {
|
||||||
|
syncEntryDestory(pEntry);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2581,6 +2581,20 @@ static int32_t syncNodeEqNoop(SSyncNode* ths) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void deleteCacheEntry(const void* key, size_t keyLen, void* value) { taosMemoryFree(value); }
|
||||||
|
|
||||||
|
static int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHandle** h) {
|
||||||
|
int code = 0;
|
||||||
|
int entryLen = sizeof(*pEntry) + pEntry->dataLen;
|
||||||
|
LRUStatus status = taosLRUCacheInsert(pLogStore->pCache, &pEntry->index, sizeof(pEntry->index), pEntry, entryLen,
|
||||||
|
deleteCacheEntry, h, TAOS_LRU_PRIORITY_LOW);
|
||||||
|
if (status != TAOS_LRU_STATUS_OK) {
|
||||||
|
code = -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
|
static int32_t syncNodeAppendNoop(SSyncNode* ths) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
|
||||||
|
@ -2589,13 +2603,21 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) {
|
||||||
SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
|
SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId);
|
||||||
ASSERT(pEntry != NULL);
|
ASSERT(pEntry != NULL);
|
||||||
|
|
||||||
|
LRUHandle* h = NULL;
|
||||||
|
syncCacheEntry(ths->pLogStore, pEntry, &h);
|
||||||
|
|
||||||
if (ths->state == TAOS_SYNC_STATE_LEADER) {
|
if (ths->state == TAOS_SYNC_STATE_LEADER) {
|
||||||
int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
|
int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
|
||||||
ASSERT(code == 0);
|
ASSERT(code == 0);
|
||||||
syncNodeReplicate(ths, false);
|
syncNodeReplicate(ths, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
syncEntryDestory(pEntry);
|
if (h) {
|
||||||
|
taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
|
||||||
|
} else {
|
||||||
|
syncEntryDestory(pEntry);
|
||||||
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2654,6 +2676,9 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI
|
||||||
SSyncRaftEntry* pEntry = syncEntryBuild2((SyncClientRequest*)pMsg, term, index);
|
SSyncRaftEntry* pEntry = syncEntryBuild2((SyncClientRequest*)pMsg, term, index);
|
||||||
ASSERT(pEntry != NULL);
|
ASSERT(pEntry != NULL);
|
||||||
|
|
||||||
|
LRUHandle* h = NULL;
|
||||||
|
syncCacheEntry(ths->pLogStore, pEntry, &h);
|
||||||
|
|
||||||
if (ths->state == TAOS_SYNC_STATE_LEADER) {
|
if (ths->state == TAOS_SYNC_STATE_LEADER) {
|
||||||
// append entry
|
// append entry
|
||||||
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
|
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
|
||||||
|
@ -2685,7 +2710,12 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
syncEntryDestory(pEntry);
|
if (h) {
|
||||||
|
taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
|
||||||
|
} else {
|
||||||
|
syncEntryDestory(pEntry);
|
||||||
|
}
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2973,9 +3003,15 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
|
||||||
for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
|
for (SyncIndex i = beginIndex; i <= endIndex; ++i) {
|
||||||
if (i != SYNC_INDEX_INVALID) {
|
if (i != SYNC_INDEX_INVALID) {
|
||||||
SSyncRaftEntry* pEntry;
|
SSyncRaftEntry* pEntry;
|
||||||
code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, i, &pEntry);
|
SLRUCache* pCache = ths->pLogStore->pCache;
|
||||||
ASSERT(code == 0);
|
LRUHandle* h = taosLRUCacheLookup(pCache, &i, sizeof(i));
|
||||||
ASSERT(pEntry != NULL);
|
if (h) {
|
||||||
|
pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
|
||||||
|
} else {
|
||||||
|
code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, i, &pEntry);
|
||||||
|
ASSERT(code == 0);
|
||||||
|
ASSERT(pEntry != NULL);
|
||||||
|
}
|
||||||
|
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
||||||
|
@ -3058,7 +3094,11 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
|
||||||
}
|
}
|
||||||
|
|
||||||
rpcFreeCont(rpcMsg.pCont);
|
rpcFreeCont(rpcMsg.pCont);
|
||||||
syncEntryDestory(pEntry);
|
if (h) {
|
||||||
|
taosLRUCacheRelease(pCache, h, false);
|
||||||
|
} else {
|
||||||
|
syncEntryDestory(pEntry);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -53,6 +53,15 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
|
||||||
SSyncLogStore* pLogStore = taosMemoryMalloc(sizeof(SSyncLogStore));
|
SSyncLogStore* pLogStore = taosMemoryMalloc(sizeof(SSyncLogStore));
|
||||||
ASSERT(pLogStore != NULL);
|
ASSERT(pLogStore != NULL);
|
||||||
|
|
||||||
|
pLogStore->pCache = taosLRUCacheInit(10 * 1024 * 1024, 1, .5);
|
||||||
|
if (pLogStore->pCache == NULL) {
|
||||||
|
terrno = TSDB_CODE_WAL_OUT_OF_MEMORY;
|
||||||
|
taosMemoryFree(pLogStore);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosLRUCacheSetStrictCapacity(pLogStore->pCache, false);
|
||||||
|
|
||||||
pLogStore->data = taosMemoryMalloc(sizeof(SSyncLogStoreData));
|
pLogStore->data = taosMemoryMalloc(sizeof(SSyncLogStoreData));
|
||||||
ASSERT(pLogStore->data != NULL);
|
ASSERT(pLogStore->data != NULL);
|
||||||
|
|
||||||
|
@ -102,6 +111,10 @@ void logStoreDestory(SSyncLogStore* pLogStore) {
|
||||||
taosThreadMutexDestroy(&(pData->mutex));
|
taosThreadMutexDestroy(&(pData->mutex));
|
||||||
|
|
||||||
taosMemoryFree(pLogStore->data);
|
taosMemoryFree(pLogStore->data);
|
||||||
|
|
||||||
|
taosLRUCacheEraseUnrefEntries(pLogStore->pCache);
|
||||||
|
taosLRUCacheCleanup(pLogStore->pCache);
|
||||||
|
|
||||||
taosMemoryFree(pLogStore);
|
taosMemoryFree(pLogStore);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -243,7 +256,7 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
|
||||||
static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry) {
|
static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry) {
|
||||||
SSyncLogStoreData* pData = pLogStore->data;
|
SSyncLogStoreData* pData = pLogStore->data;
|
||||||
SWal* pWal = pData->pWal;
|
SWal* pWal = pData->pWal;
|
||||||
int32_t code;
|
int32_t code = 0;
|
||||||
|
|
||||||
*ppEntry = NULL;
|
*ppEntry = NULL;
|
||||||
|
|
||||||
|
@ -257,6 +270,7 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
|
||||||
taosThreadMutexLock(&(pData->mutex));
|
taosThreadMutexLock(&(pData->mutex));
|
||||||
|
|
||||||
code = walReadVer(pWalHandle, index);
|
code = walReadVer(pWalHandle, index);
|
||||||
|
// code = walReadVerCached(pWalHandle, index);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
int32_t err = terrno;
|
int32_t err = terrno;
|
||||||
const char* errStr = tstrerror(err);
|
const char* errStr = tstrerror(err);
|
||||||
|
@ -412,6 +426,7 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
|
||||||
ASSERT(pWalHandle != NULL);
|
ASSERT(pWalHandle != NULL);
|
||||||
|
|
||||||
int32_t code = walReadVer(pWalHandle, index);
|
int32_t code = walReadVer(pWalHandle, index);
|
||||||
|
// int32_t code = walReadVerCached(pWalHandle, index);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
int32_t err = terrno;
|
int32_t err = terrno;
|
||||||
const char* errStr = tstrerror(err);
|
const char* errStr = tstrerror(err);
|
||||||
|
|
Loading…
Reference in New Issue