update sync raft log

This commit is contained in:
Minglei Jin 2024-07-25 11:13:04 +08:00
parent ea7eebc8a9
commit aa921c9870
3 changed files with 50 additions and 37 deletions

View File

@ -1889,10 +1889,9 @@ _exit:
int32_t tsdbOpenCache(STsdb *pTsdb) { int32_t tsdbOpenCache(STsdb *pTsdb) {
int32_t code = 0, lino = 0; int32_t code = 0, lino = 0;
SLRUCache *pCache = NULL;
size_t cfgCapacity = pTsdb->pVnode->config.cacheLastSize * 1024 * 1024; size_t cfgCapacity = pTsdb->pVnode->config.cacheLastSize * 1024 * 1024;
pCache = taosLRUCacheInit(cfgCapacity, 0, .5); SLRUCache *pCache = taosLRUCacheInit(cfgCapacity, 0, .5);
if (pCache == NULL) { if (pCache == NULL) {
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err); TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err);
} }

View File

@ -51,7 +51,12 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
taosLRUCacheSetStrictCapacity(pLogStore->pCache, false); taosLRUCacheSetStrictCapacity(pLogStore->pCache, false);
pLogStore->data = taosMemoryMalloc(sizeof(SSyncLogStoreData)); pLogStore->data = taosMemoryMalloc(sizeof(SSyncLogStoreData));
ASSERT(pLogStore->data != NULL); if (!pLogStore->data) {
taosMemoryFree(pLogStore);
taosLRUCacheCleanup(pLogStore->pCache);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
SSyncLogStoreData* pData = pLogStore->data; SSyncLogStoreData* pData = pLogStore->data;
pData->pSyncNode = pSyncNode; pData->pSyncNode = pSyncNode;
@ -60,7 +65,13 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
taosThreadMutexInit(&(pData->mutex), NULL); taosThreadMutexInit(&(pData->mutex), NULL);
pData->pWalHandle = walOpenReader(pData->pWal, NULL, 0); pData->pWalHandle = walOpenReader(pData->pWal, NULL, 0);
ASSERT(pData->pWalHandle != NULL); if (!pData->pWalHandle) {
taosMemoryFree(pLogStore);
taosLRUCacheCleanup(pLogStore->pCache);
taosThreadMutexDestroy(&(pData->mutex));
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pLogStore->syncLogUpdateCommitIndex = raftLogUpdateCommitIndex; pLogStore->syncLogUpdateCommitIndex = raftLogUpdateCommitIndex;
pLogStore->syncLogCommitIndex = raftlogCommitIndex; pLogStore->syncLogCommitIndex = raftlogCommitIndex;
@ -110,7 +121,7 @@ static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncI
SWal* pWal = pData->pWal; SWal* pWal = pData->pWal;
int32_t code = walRestoreFromSnapshot(pWal, snapshotIndex); int32_t code = walRestoreFromSnapshot(pWal, snapshotIndex);
if (code != 0) { if (code != 0) {
int32_t err = terrno; int32_t err = code;
const char* errStr = tstrerror(err); const char* errStr = tstrerror(err);
int32_t sysErr = errno; int32_t sysErr = errno;
const char* sysErrStr = strerror(errno); const char* sysErrStr = strerror(errno);
@ -118,10 +129,10 @@ static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncI
sNError(pData->pSyncNode, sNError(pData->pSyncNode,
"wal restore from snapshot error, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s", snapshotIndex, "wal restore from snapshot error, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s", snapshotIndex,
err, errStr, sysErr, sysErrStr); err, errStr, sysErr, sysErrStr);
return -1; TAOS_RETURN(err);
} }
return 0; TAOS_RETURN(TSDB_CODE_SUCCESS);
} }
SyncIndex raftLogBeginIndex(struct SSyncLogStore* pLogStore) { SyncIndex raftLogBeginIndex(struct SSyncLogStore* pLogStore) {
@ -224,7 +235,8 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
sNError(pData->pSyncNode, "wal write error, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s", sNError(pData->pSyncNode, "wal write error, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s",
pEntry->index, err, errStr, sysErr, sysErrStr); pEntry->index, err, errStr, sysErr, sysErrStr);
return -1;
TAOS_RETURN(err);
} }
code = walFsync(pWal, forceSync); code = walFsync(pWal, forceSync);
@ -235,7 +247,7 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s, elapsed:%" PRId64, pEntry->index, sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s, elapsed:%" PRId64, pEntry->index,
TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType), tsElapsed); TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType), tsElapsed);
return 0; TAOS_RETURN(TSDB_CODE_SUCCESS);
} }
// entry found, return 0 // entry found, return 0
@ -253,10 +265,10 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR
SWalReader* pWalHandle = pData->pWalHandle; SWalReader* pWalHandle = pData->pWalHandle;
if (pWalHandle == NULL) { if (pWalHandle == NULL) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
sError("vgId:%d, wal handle is NULL", pData->pSyncNode->vgId); sError("vgId:%d, wal handle is NULL", pData->pSyncNode->vgId);
taosThreadMutexUnlock(&(pData->mutex)); taosThreadMutexUnlock(&(pData->mutex));
return -1;
TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR);
} }
int64_t ts2 = taosGetTimestampNs(); int64_t ts2 = taosGetTimestampNs();
@ -266,7 +278,7 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR
// code = walReadVerCached(pWalHandle, index); // code = walReadVerCached(pWalHandle, index);
if (code != 0) { if (code != 0) {
int32_t err = terrno; int32_t err = code;
const char* errStr = tstrerror(err); const char* errStr = tstrerror(err);
int32_t sysErr = errno; int32_t sysErr = errno;
const char* sysErrStr = strerror(errno); const char* sysErrStr = strerror(errno);
@ -286,7 +298,8 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR
*/ */
taosThreadMutexUnlock(&(pData->mutex)); taosThreadMutexUnlock(&(pData->mutex));
return code;
TAOS_RETURN(code);
} }
*ppEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen); *ppEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen);
@ -319,7 +332,7 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR
", elapsed-build:%" PRId64, ", elapsed-build:%" PRId64,
index, tsElapsed, tsElapsedLock, tsElapsedRead, tsElapsedBuild); index, tsElapsed, tsElapsedLock, tsElapsedRead, tsElapsedBuild);
return code; TAOS_RETURN(code);
} }
// truncate semantic // truncate semantic
@ -329,7 +342,7 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn
int32_t code = walRollback(pWal, fromIndex); int32_t code = walRollback(pWal, fromIndex);
if (code != 0) { if (code != 0) {
int32_t err = terrno; int32_t err = code;
const char* errStr = tstrerror(err); const char* errStr = tstrerror(err);
int32_t sysErr = errno; int32_t sysErr = errno;
const char* sysErrStr = strerror(errno); const char* sysErrStr = strerror(errno);
@ -339,7 +352,8 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn
// event log // event log
sNTrace(pData->pSyncNode, "log truncate, from-index:%" PRId64, fromIndex); sNTrace(pData->pSyncNode, "log truncate, from-index:%" PRId64, fromIndex);
return code;
TAOS_RETURN(code);
} }
// entry found, return 0 // entry found, return 0
@ -352,16 +366,16 @@ static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** pp
*ppLastEntry = NULL; *ppLastEntry = NULL;
if (walIsEmpty(pWal)) { if (walIsEmpty(pWal)) {
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; TAOS_RETURN(TSDB_CODE_WAL_LOG_NOT_EXIST);
return -1;
} else { } else {
SyncIndex lastIndex = raftLogLastIndex(pLogStore); SyncIndex lastIndex = raftLogLastIndex(pLogStore);
ASSERT(lastIndex >= SYNC_INDEX_BEGIN); ASSERT(lastIndex >= SYNC_INDEX_BEGIN);
int32_t code = raftLogGetEntry(pLogStore, lastIndex, ppLastEntry); int32_t code = raftLogGetEntry(pLogStore, lastIndex, ppLastEntry);
return code;
TAOS_RETURN(code);
} }
return -1; TAOS_RETURN(TSDB_CODE_FAILED);
} }
int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) { int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
@ -375,20 +389,22 @@ int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
if (index < snapshotVer || index > wallastVer) { if (index < snapshotVer || index > wallastVer) {
// ignore // ignore
return 0; TAOS_RETURN(TSDB_CODE_SUCCESS);
} }
int32_t code = walCommit(pWal, index); int32_t code = walCommit(pWal, index);
if (code != 0) { if (code != 0) {
int32_t err = terrno; int32_t err = code;
const char* errStr = tstrerror(err); const char* errStr = tstrerror(err);
int32_t sysErr = errno; int32_t sysErr = errno;
const char* sysErrStr = strerror(errno); const char* sysErrStr = strerror(errno);
sError("vgId:%d, wal update commit index error, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s", sError("vgId:%d, wal update commit index error, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s",
pData->pSyncNode->vgId, index, err, errStr, sysErr, sysErrStr); pData->pSyncNode->vgId, index, err, errStr, sysErr, sysErrStr);
return -1;
TAOS_RETURN(code);
} }
return 0;
TAOS_RETURN(TSDB_CODE_SUCCESS);
} }
SyncIndex raftlogCommitIndex(SSyncLogStore* pLogStore) { SyncIndex raftlogCommitIndex(SSyncLogStore* pLogStore) {
@ -405,5 +421,6 @@ SyncIndex logStoreFirstIndex(SSyncLogStore* pLogStore) {
SyncIndex logStoreWalCommitVer(SSyncLogStore* pLogStore) { SyncIndex logStoreWalCommitVer(SSyncLogStore* pLogStore) {
SSyncLogStoreData* pData = pLogStore->data; SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal; SWal* pWal = pData->pWal;
return walGetCommittedVer(pWal); return walGetCommittedVer(pWal);
} }

View File

@ -20,6 +20,7 @@
#include "tarray.h" #include "tarray.h"
#include "tdef.h" #include "tdef.h"
#include "tlog.h" #include "tlog.h"
#include "tutil.h"
typedef struct SLRUEntry SLRUEntry; typedef struct SLRUEntry SLRUEntry;
typedef struct SLRUEntryTable SLRUEntryTable; typedef struct SLRUEntryTable SLRUEntryTable;
@ -114,13 +115,13 @@ static int taosLRUEntryTableInit(SLRUEntryTable *table, int maxUpperHashBits) {
table->lengthBits = 4; table->lengthBits = 4;
table->list = taosMemoryCalloc(1 << table->lengthBits, sizeof(SLRUEntry *)); table->list = taosMemoryCalloc(1 << table->lengthBits, sizeof(SLRUEntry *));
if (!table->list) { if (!table->list) {
return -1; TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
} }
table->elems = 0; table->elems = 0;
table->maxLengthBits = maxUpperHashBits; table->maxLengthBits = maxUpperHashBits;
return 0; TAOS_RETURN(TSDB_CODE_SUCCESS);
} }
static void taosLRUEntryTableApply(SLRUEntryTable *table, _taos_lru_table_func_t func, uint32_t begin, uint32_t end) { static void taosLRUEntryTableApply(SLRUEntryTable *table, _taos_lru_table_func_t func, uint32_t begin, uint32_t end) {
@ -349,9 +350,7 @@ static void taosLRUCacheShardSetCapacity(SLRUCacheShard *shard, size_t capacity)
static int taosLRUCacheShardInit(SLRUCacheShard *shard, size_t capacity, bool strict, double highPriPoolRatio, static int taosLRUCacheShardInit(SLRUCacheShard *shard, size_t capacity, bool strict, double highPriPoolRatio,
int maxUpperHashBits) { int maxUpperHashBits) {
if (taosLRUEntryTableInit(&shard->table, maxUpperHashBits) < 0) { TAOS_CHECK_RETURN(taosLRUEntryTableInit(&shard->table, maxUpperHashBits));
return -1;
}
taosThreadMutexInit(&shard->mutex, NULL); taosThreadMutexInit(&shard->mutex, NULL);
@ -372,7 +371,7 @@ static int taosLRUCacheShardInit(SLRUCacheShard *shard, size_t capacity, bool st
taosLRUCacheShardSetCapacity(shard, capacity); taosLRUCacheShardSetCapacity(shard, capacity);
return 0; TAOS_RETURN(TSDB_CODE_SUCCESS);
} }
static void taosLRUCacheShardCleanup(SLRUCacheShard *shard) { static void taosLRUCacheShardCleanup(SLRUCacheShard *shard) {
@ -671,16 +670,13 @@ static int getDefaultCacheShardBits(size_t capacity) {
SLRUCache *taosLRUCacheInit(size_t capacity, int numShardBits, double highPriPoolRatio) { SLRUCache *taosLRUCacheInit(size_t capacity, int numShardBits, double highPriPoolRatio) {
if (numShardBits >= 20) { if (numShardBits >= 20) {
terrno = TSDB_CODE_INVALID_PARA;
return NULL; return NULL;
} }
if (highPriPoolRatio < 0.0 || highPriPoolRatio > 1.0) { if (highPriPoolRatio < 0.0 || highPriPoolRatio > 1.0) {
terrno = TSDB_CODE_INVALID_PARA;
return NULL; return NULL;
} }
SLRUCache *cache = taosMemoryCalloc(1, sizeof(SLRUCache)); SLRUCache *cache = taosMemoryCalloc(1, sizeof(SLRUCache));
if (!cache) { if (!cache) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
@ -692,14 +688,15 @@ SLRUCache *taosLRUCacheInit(size_t capacity, int numShardBits, double highPriPoo
cache->shards = taosMemoryCalloc(numShards, sizeof(SLRUCacheShard)); cache->shards = taosMemoryCalloc(numShards, sizeof(SLRUCacheShard));
if (!cache->shards) { if (!cache->shards) {
taosMemoryFree(cache); taosMemoryFree(cache);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
bool strictCapacity = 1; bool strictCapacity = 1;
size_t perShard = (capacity + (numShards - 1)) / numShards; size_t perShard = (capacity + (numShards - 1)) / numShards;
for (int i = 0; i < numShards; ++i) { for (int i = 0; i < numShards; ++i) {
taosLRUCacheShardInit(&cache->shards[i], perShard, strictCapacity, highPriPoolRatio, 32 - numShardBits); if (TSDB_CODE_SUCCESS !=
taosLRUCacheShardInit(&cache->shards[i], perShard, strictCapacity, highPriPoolRatio, 32 - numShardBits))
return NULL;
} }
cache->numShards = numShards; cache->numShards = numShards;