diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index b6c389842e..1726d8696b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -1888,11 +1888,10 @@ _exit: } int32_t tsdbOpenCache(STsdb *pTsdb) { - int32_t code = 0, lino = 0; - SLRUCache *pCache = NULL; - size_t cfgCapacity = pTsdb->pVnode->config.cacheLastSize * 1024 * 1024; + int32_t code = 0, lino = 0; + size_t cfgCapacity = pTsdb->pVnode->config.cacheLastSize * 1024 * 1024; - pCache = taosLRUCacheInit(cfgCapacity, 0, .5); + SLRUCache *pCache = taosLRUCacheInit(cfgCapacity, 0, .5); if (pCache == NULL) { TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err); } diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 19c8837d83..57c70d8df1 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -51,7 +51,12 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { taosLRUCacheSetStrictCapacity(pLogStore->pCache, false); pLogStore->data = taosMemoryMalloc(sizeof(SSyncLogStoreData)); - ASSERT(pLogStore->data != NULL); + if (!pLogStore->data) { + taosMemoryFree(pLogStore); + taosLRUCacheCleanup(pLogStore->pCache); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } SSyncLogStoreData* pData = pLogStore->data; pData->pSyncNode = pSyncNode; @@ -60,7 +65,13 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { taosThreadMutexInit(&(pData->mutex), NULL); pData->pWalHandle = walOpenReader(pData->pWal, NULL, 0); - ASSERT(pData->pWalHandle != NULL); + if (!pData->pWalHandle) { + taosMemoryFree(pLogStore); + taosLRUCacheCleanup(pLogStore->pCache); + taosThreadMutexDestroy(&(pData->mutex)); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } pLogStore->syncLogUpdateCommitIndex = raftLogUpdateCommitIndex; pLogStore->syncLogCommitIndex = raftlogCommitIndex; @@ -110,7 +121,7 @@ static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncI SWal* pWal = pData->pWal; int32_t code = walRestoreFromSnapshot(pWal, snapshotIndex); if (code != 0) { - int32_t err = terrno; + int32_t err = code; const char* errStr = tstrerror(err); int32_t sysErr = errno; const char* sysErrStr = strerror(errno); @@ -118,10 +129,10 @@ static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncI sNError(pData->pSyncNode, "wal restore from snapshot error, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s", snapshotIndex, err, errStr, sysErr, sysErrStr); - return -1; + TAOS_RETURN(err); } - return 0; + TAOS_RETURN(TSDB_CODE_SUCCESS); } SyncIndex raftLogBeginIndex(struct SSyncLogStore* pLogStore) { @@ -224,7 +235,8 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr sNError(pData->pSyncNode, "wal write error, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s", pEntry->index, err, errStr, sysErr, sysErrStr); - return -1; + + TAOS_RETURN(err); } code = walFsync(pWal, forceSync); @@ -235,7 +247,7 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s, elapsed:%" PRId64, pEntry->index, TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType), tsElapsed); - return 0; + TAOS_RETURN(TSDB_CODE_SUCCESS); } // entry found, return 0 @@ -253,10 +265,10 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR SWalReader* pWalHandle = pData->pWalHandle; if (pWalHandle == NULL) { - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; sError("vgId:%d, wal handle is NULL", pData->pSyncNode->vgId); taosThreadMutexUnlock(&(pData->mutex)); - return -1; + + TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR); } int64_t ts2 = taosGetTimestampNs(); @@ -266,7 +278,7 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR // code = walReadVerCached(pWalHandle, index); if (code != 0) { - int32_t err = terrno; + int32_t err = code; const char* errStr = tstrerror(err); int32_t sysErr = errno; const char* sysErrStr = strerror(errno); @@ -286,7 +298,8 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR */ taosThreadMutexUnlock(&(pData->mutex)); - return code; + + TAOS_RETURN(code); } *ppEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen); @@ -319,7 +332,7 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR ", elapsed-build:%" PRId64, index, tsElapsed, tsElapsedLock, tsElapsedRead, tsElapsedBuild); - return code; + TAOS_RETURN(code); } // truncate semantic @@ -329,7 +342,7 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn int32_t code = walRollback(pWal, fromIndex); if (code != 0) { - int32_t err = terrno; + int32_t err = code; const char* errStr = tstrerror(err); int32_t sysErr = errno; const char* sysErrStr = strerror(errno); @@ -339,7 +352,8 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn // event log sNTrace(pData->pSyncNode, "log truncate, from-index:%" PRId64, fromIndex); - return code; + + TAOS_RETURN(code); } // entry found, return 0 @@ -352,16 +366,16 @@ static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** pp *ppLastEntry = NULL; if (walIsEmpty(pWal)) { - terrno = TSDB_CODE_WAL_LOG_NOT_EXIST; - return -1; + TAOS_RETURN(TSDB_CODE_WAL_LOG_NOT_EXIST); } else { SyncIndex lastIndex = raftLogLastIndex(pLogStore); ASSERT(lastIndex >= SYNC_INDEX_BEGIN); int32_t code = raftLogGetEntry(pLogStore, lastIndex, ppLastEntry); - return code; + + TAOS_RETURN(code); } - return -1; + TAOS_RETURN(TSDB_CODE_FAILED); } int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) { @@ -375,20 +389,22 @@ int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) { if (index < snapshotVer || index > wallastVer) { // ignore - return 0; + TAOS_RETURN(TSDB_CODE_SUCCESS); } int32_t code = walCommit(pWal, index); if (code != 0) { - int32_t err = terrno; + int32_t err = code; const char* errStr = tstrerror(err); int32_t sysErr = errno; const char* sysErrStr = strerror(errno); sError("vgId:%d, wal update commit index error, index:%" PRId64 ", err:0x%x, msg:%s, syserr:%d, sysmsg:%s", pData->pSyncNode->vgId, index, err, errStr, sysErr, sysErrStr); - return -1; + + TAOS_RETURN(code); } - return 0; + + TAOS_RETURN(TSDB_CODE_SUCCESS); } SyncIndex raftlogCommitIndex(SSyncLogStore* pLogStore) { @@ -405,5 +421,6 @@ SyncIndex logStoreFirstIndex(SSyncLogStore* pLogStore) { SyncIndex logStoreWalCommitVer(SSyncLogStore* pLogStore) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; + return walGetCommittedVer(pWal); } diff --git a/source/util/src/tlrucache.c b/source/util/src/tlrucache.c index 3de159797f..4f8e96c4c7 100644 --- a/source/util/src/tlrucache.c +++ b/source/util/src/tlrucache.c @@ -20,6 +20,7 @@ #include "tarray.h" #include "tdef.h" #include "tlog.h" +#include "tutil.h" typedef struct SLRUEntry SLRUEntry; typedef struct SLRUEntryTable SLRUEntryTable; @@ -114,13 +115,13 @@ static int taosLRUEntryTableInit(SLRUEntryTable *table, int maxUpperHashBits) { table->lengthBits = 4; table->list = taosMemoryCalloc(1 << table->lengthBits, sizeof(SLRUEntry *)); if (!table->list) { - return -1; + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); } table->elems = 0; table->maxLengthBits = maxUpperHashBits; - return 0; + TAOS_RETURN(TSDB_CODE_SUCCESS); } static void taosLRUEntryTableApply(SLRUEntryTable *table, _taos_lru_table_func_t func, uint32_t begin, uint32_t end) { @@ -349,9 +350,7 @@ static void taosLRUCacheShardSetCapacity(SLRUCacheShard *shard, size_t capacity) static int taosLRUCacheShardInit(SLRUCacheShard *shard, size_t capacity, bool strict, double highPriPoolRatio, int maxUpperHashBits) { - if (taosLRUEntryTableInit(&shard->table, maxUpperHashBits) < 0) { - return -1; - } + TAOS_CHECK_RETURN(taosLRUEntryTableInit(&shard->table, maxUpperHashBits)); taosThreadMutexInit(&shard->mutex, NULL); @@ -372,7 +371,7 @@ static int taosLRUCacheShardInit(SLRUCacheShard *shard, size_t capacity, bool st taosLRUCacheShardSetCapacity(shard, capacity); - return 0; + TAOS_RETURN(TSDB_CODE_SUCCESS); } static void taosLRUCacheShardCleanup(SLRUCacheShard *shard) { @@ -671,16 +670,13 @@ static int getDefaultCacheShardBits(size_t capacity) { SLRUCache *taosLRUCacheInit(size_t capacity, int numShardBits, double highPriPoolRatio) { if (numShardBits >= 20) { - terrno = TSDB_CODE_INVALID_PARA; return NULL; } if (highPriPoolRatio < 0.0 || highPriPoolRatio > 1.0) { - terrno = TSDB_CODE_INVALID_PARA; return NULL; } SLRUCache *cache = taosMemoryCalloc(1, sizeof(SLRUCache)); if (!cache) { - terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } @@ -692,14 +688,15 @@ SLRUCache *taosLRUCacheInit(size_t capacity, int numShardBits, double highPriPoo cache->shards = taosMemoryCalloc(numShards, sizeof(SLRUCacheShard)); if (!cache->shards) { taosMemoryFree(cache); - terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } bool strictCapacity = 1; size_t perShard = (capacity + (numShards - 1)) / numShards; for (int i = 0; i < numShards; ++i) { - taosLRUCacheShardInit(&cache->shards[i], perShard, strictCapacity, highPriPoolRatio, 32 - numShardBits); + if (TSDB_CODE_SUCCESS != + taosLRUCacheShardInit(&cache->shards[i], perShard, strictCapacity, highPriPoolRatio, 32 - numShardBits)) + return NULL; } cache->numShards = numShards;