Merge pull request #20112 from taosdata/FIX/TD-21669-3.0
fix: resolve coverity scan issues in sync and wal
This commit is contained in:
commit
51b3e50e87
|
@ -68,7 +68,7 @@ void syncNodeLogReplMgrDestroy(SSyncNode* pNode);
|
|||
|
||||
// access
|
||||
static FORCE_INLINE int64_t syncLogGetRetryBackoffTimeMs(SSyncLogReplMgr* pMgr) {
|
||||
return (1 << pMgr->retryBackoff) * SYNC_LOG_REPL_RETRY_WAIT_MS;
|
||||
return ((int64_t)1 << pMgr->retryBackoff) * SYNC_LOG_REPL_RETRY_WAIT_MS;
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t syncLogGetNextRetryBackoff(SSyncLogReplMgr* pMgr) {
|
||||
|
|
|
@ -49,39 +49,6 @@ static FORCE_INLINE bool syncLogIsReplicationBarrier(SSyncRaftEntry* pEntry) {
|
|||
return pEntry->originalRpcType == TDMT_SYNC_NOOP;
|
||||
}
|
||||
|
||||
typedef struct SRaftEntryHashCache {
|
||||
SHashObj* pEntryHash;
|
||||
int32_t maxCount;
|
||||
int32_t currentCount;
|
||||
TdThreadMutex mutex;
|
||||
SSyncNode* pSyncNode;
|
||||
} SRaftEntryHashCache;
|
||||
|
||||
SRaftEntryHashCache* raftCacheCreate(SSyncNode* pSyncNode, int32_t maxCount);
|
||||
void raftCacheDestroy(SRaftEntryHashCache* pCache);
|
||||
int32_t raftCachePutEntry(struct SRaftEntryHashCache* pCache, SSyncRaftEntry* pEntry);
|
||||
int32_t raftCacheGetEntry(struct SRaftEntryHashCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry);
|
||||
int32_t raftCacheGetEntryP(struct SRaftEntryHashCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry);
|
||||
int32_t raftCacheDelEntry(struct SRaftEntryHashCache* pCache, SyncIndex index);
|
||||
int32_t raftCacheGetAndDel(struct SRaftEntryHashCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry);
|
||||
int32_t raftCacheClear(struct SRaftEntryHashCache* pCache);
|
||||
|
||||
typedef struct SRaftEntryCache {
|
||||
SSkipList* pSkipList;
|
||||
int32_t maxCount;
|
||||
int32_t currentCount;
|
||||
int32_t refMgr;
|
||||
TdThreadMutex mutex;
|
||||
SSyncNode* pSyncNode;
|
||||
} SRaftEntryCache;
|
||||
|
||||
SRaftEntryCache* raftEntryCacheCreate(SSyncNode* pSyncNode, int32_t maxCount);
|
||||
void raftEntryCacheDestroy(SRaftEntryCache* pCache);
|
||||
int32_t raftEntryCachePutEntry(struct SRaftEntryCache* pCache, SSyncRaftEntry* pEntry);
|
||||
int32_t raftEntryCacheGetEntry(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry);
|
||||
int32_t raftEntryCacheGetEntryP(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry);
|
||||
int32_t raftEntryCacheClear(struct SRaftEntryCache* pCache, int32_t count);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -104,6 +104,8 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
SyncAppendEntries* pMsg = pRpcMsg->pCont;
|
||||
SRpcMsg rpcRsp = {0};
|
||||
bool accepted = false;
|
||||
SSyncRaftEntry* pEntry = NULL;
|
||||
|
||||
// if already drop replica, do not process
|
||||
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId))) {
|
||||
syncLogRecvAppendEntries(ths, pMsg, "not in my config");
|
||||
|
@ -137,14 +139,13 @@ int32_t syncNodeOnAppendEntries(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
|||
syncNodeStepDown(ths, pMsg->term);
|
||||
syncNodeResetElectTimer(ths);
|
||||
|
||||
if (pMsg->dataLen < (int32_t)sizeof(SSyncRaftEntry)) {
|
||||
if (pMsg->dataLen < sizeof(SSyncRaftEntry)) {
|
||||
sError("vgId:%d, incomplete append entries received. prev index:%" PRId64 ", term:%" PRId64 ", datalen:%d",
|
||||
ths->vgId, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
|
||||
goto _IGNORE;
|
||||
}
|
||||
|
||||
SSyncRaftEntry* pEntry = syncBuildRaftEntryFromAppendEntries(pMsg);
|
||||
|
||||
pEntry = syncBuildRaftEntryFromAppendEntries(pMsg);
|
||||
if (pEntry == NULL) {
|
||||
sError("vgId:%d, failed to get raft entry from append entries since %s", ths->vgId, terrstr());
|
||||
goto _IGNORE;
|
||||
|
@ -191,5 +192,6 @@ _out:
|
|||
|
||||
_IGNORE:
|
||||
rpcFreeCont(rpcRsp.pCont);
|
||||
syncEntryDestroy(pEntry);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -40,7 +40,7 @@
|
|||
//
|
||||
|
||||
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) {
|
||||
SyncAppendEntriesReply* pMsg = pRpcMsg->pCont;
|
||||
SyncAppendEntriesReply* pMsg = (SyncAppendEntriesReply*)pRpcMsg->pCont;
|
||||
int32_t ret = 0;
|
||||
|
||||
// if already drop replica, do not process
|
||||
|
|
|
@ -1126,29 +1126,18 @@ int32_t syncNodeStartStandBy(SSyncNode* pSyncNode) {
|
|||
}
|
||||
|
||||
void syncNodePreClose(SSyncNode* pSyncNode) {
|
||||
if (pSyncNode != NULL && pSyncNode->pFsm != NULL && pSyncNode->pFsm->FpApplyQueueItems != NULL) {
|
||||
while (1) {
|
||||
int32_t aqItems = pSyncNode->pFsm->FpApplyQueueItems(pSyncNode->pFsm);
|
||||
sTrace("vgId:%d, pre close, %d items in apply queue", pSyncNode->vgId, aqItems);
|
||||
if (aqItems == 0 || aqItems == -1) {
|
||||
break;
|
||||
}
|
||||
taosMsleep(20);
|
||||
}
|
||||
}
|
||||
ASSERT(pSyncNode != NULL);
|
||||
ASSERT(pSyncNode->pFsm != NULL);
|
||||
ASSERT(pSyncNode->pFsm->FpApplyQueueItems != NULL);
|
||||
|
||||
#if 0
|
||||
if (pSyncNode->pNewNodeReceiver != NULL) {
|
||||
if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
|
||||
snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
|
||||
while (1) {
|
||||
int32_t aqItems = pSyncNode->pFsm->FpApplyQueueItems(pSyncNode->pFsm);
|
||||
sTrace("vgId:%d, pre close, %d items in apply queue", pSyncNode->vgId, aqItems);
|
||||
if (aqItems == 0 || aqItems == -1) {
|
||||
break;
|
||||
}
|
||||
|
||||
sDebug("vgId:%d, snapshot receiver destroy while preclose sync node, data:%p", pSyncNode->vgId,
|
||||
pSyncNode->pNewNodeReceiver);
|
||||
snapshotReceiverDestroy(pSyncNode->pNewNodeReceiver);
|
||||
pSyncNode->pNewNodeReceiver = NULL;
|
||||
taosMsleep(20);
|
||||
}
|
||||
#endif
|
||||
|
||||
// stop elect timer
|
||||
syncNodeStopElectTimer(pSyncNode);
|
||||
|
@ -1461,7 +1450,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde
|
|||
}
|
||||
|
||||
// log begin config change
|
||||
sNInfo(pSyncNode, "begin do config change, from %d to %d", pSyncNode->vgId, oldConfig.replicaNum,
|
||||
sNInfo(pSyncNode, "begin do config change, from %d to %d, replicas:%d", pSyncNode->vgId, oldConfig.replicaNum,
|
||||
pNewConfig->replicaNum);
|
||||
|
||||
if (IamInNew) {
|
||||
|
@ -1742,8 +1731,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) {
|
|||
#endif
|
||||
|
||||
// close receiver
|
||||
if (pSyncNode != NULL && pSyncNode->pNewNodeReceiver != NULL &&
|
||||
snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
|
||||
if (snapshotReceiverIsStart(pSyncNode->pNewNodeReceiver)) {
|
||||
snapshotReceiverStop(pSyncNode->pNewNodeReceiver);
|
||||
}
|
||||
|
||||
|
|
|
@ -901,7 +901,7 @@ int32_t syncLogReplMgrProcessReplyAsNormal(SSyncLogReplMgr* pMgr, SSyncNode* pNo
|
|||
int64_t firstSentMs = pMgr->states[pMgr->startIndex % pMgr->size].timeMs;
|
||||
int64_t lastSentMs = pMgr->states[(pMgr->endIndex - 1) % pMgr->size].timeMs;
|
||||
int64_t timeDiffMs = lastSentMs - firstSentMs;
|
||||
if (timeDiffMs > 0 && timeDiffMs < (SYNC_LOG_REPL_RETRY_WAIT_MS << (pMgr->retryBackoff - 1))) {
|
||||
if (timeDiffMs > 0 && timeDiffMs < ((int64_t)SYNC_LOG_REPL_RETRY_WAIT_MS << (pMgr->retryBackoff - 1))) {
|
||||
pMgr->retryBackoff -= 1;
|
||||
}
|
||||
}
|
||||
|
@ -928,10 +928,6 @@ SSyncLogReplMgr* syncLogReplMgrCreate() {
|
|||
ASSERT(pMgr->size == TSDB_SYNC_LOG_BUFFER_SIZE);
|
||||
|
||||
return pMgr;
|
||||
|
||||
_err:
|
||||
taosMemoryFree(pMgr);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void syncLogReplMgrDestroy(SSyncLogReplMgr* pMgr) {
|
||||
|
|
|
@ -224,7 +224,7 @@ _OVER:
|
|||
|
||||
int32_t syncAddCfgIndex(SSyncNode *pNode, SyncIndex cfgIndex) {
|
||||
SRaftCfg *pCfg = &pNode->raftCfg;
|
||||
if (pCfg->configIndexCount <= MAX_CONFIG_INDEX_COUNT) {
|
||||
if (pCfg->configIndexCount < MAX_CONFIG_INDEX_COUNT) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
|
|
@ -102,344 +102,3 @@ void syncEntry2OriginalRpc(const SSyncRaftEntry* pEntry, SRpcMsg* pRpcMsg) {
|
|||
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
||||
memcpy(pRpcMsg->pCont, pEntry->data, pRpcMsg->contLen);
|
||||
}
|
||||
|
||||
SRaftEntryHashCache* raftCacheCreate(SSyncNode* pSyncNode, int32_t maxCount) {
|
||||
SRaftEntryHashCache* pCache = taosMemoryMalloc(sizeof(SRaftEntryHashCache));
|
||||
if (pCache == NULL) {
|
||||
sError("vgId:%d, raft cache create error", pSyncNode->vgId);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pCache->pEntryHash =
|
||||
taosHashInit(sizeof(SyncIndex), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK);
|
||||
if (pCache->pEntryHash == NULL) {
|
||||
sError("vgId:%d, raft cache create hash error", pSyncNode->vgId);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
taosThreadMutexInit(&(pCache->mutex), NULL);
|
||||
pCache->maxCount = maxCount;
|
||||
pCache->currentCount = 0;
|
||||
pCache->pSyncNode = pSyncNode;
|
||||
|
||||
return pCache;
|
||||
}
|
||||
|
||||
void raftCacheDestroy(SRaftEntryHashCache* pCache) {
|
||||
if (pCache != NULL) {
|
||||
taosThreadMutexLock(&pCache->mutex);
|
||||
taosHashCleanup(pCache->pEntryHash);
|
||||
taosThreadMutexUnlock(&pCache->mutex);
|
||||
taosThreadMutexDestroy(&(pCache->mutex));
|
||||
taosMemoryFree(pCache);
|
||||
}
|
||||
}
|
||||
|
||||
// success, return 1
|
||||
// max count, return 0
|
||||
// error, return -1
|
||||
int32_t raftCachePutEntry(struct SRaftEntryHashCache* pCache, SSyncRaftEntry* pEntry) {
|
||||
taosThreadMutexLock(&pCache->mutex);
|
||||
|
||||
if (pCache->currentCount >= pCache->maxCount) {
|
||||
taosThreadMutexUnlock(&pCache->mutex);
|
||||
return 0;
|
||||
}
|
||||
|
||||
taosHashPut(pCache->pEntryHash, &(pEntry->index), sizeof(pEntry->index), pEntry, pEntry->bytes);
|
||||
++(pCache->currentCount);
|
||||
|
||||
sNTrace(pCache->pSyncNode, "raft cache add, type:%s,%d, type2:%s,%d, index:%" PRId64 ", bytes:%d",
|
||||
TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType,
|
||||
pEntry->index, pEntry->bytes);
|
||||
taosThreadMutexUnlock(&pCache->mutex);
|
||||
return 1;
|
||||
}
|
||||
|
||||
// success, return 0
|
||||
// error, return -1
|
||||
// not exist, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST
|
||||
int32_t raftCacheGetEntry(struct SRaftEntryHashCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) {
|
||||
if (ppEntry == NULL) {
|
||||
return -1;
|
||||
}
|
||||
*ppEntry = NULL;
|
||||
|
||||
taosThreadMutexLock(&pCache->mutex);
|
||||
void* pTmp = taosHashGet(pCache->pEntryHash, &index, sizeof(index));
|
||||
if (pTmp != NULL) {
|
||||
SSyncRaftEntry* pEntry = pTmp;
|
||||
*ppEntry = taosMemoryMalloc(pEntry->bytes);
|
||||
memcpy(*ppEntry, pTmp, pEntry->bytes);
|
||||
|
||||
sNTrace(pCache->pSyncNode, "raft cache get, type:%s,%d, type2:%s,%d, index:%" PRId64,
|
||||
TMSG_INFO((*ppEntry)->msgType), (*ppEntry)->msgType, TMSG_INFO((*ppEntry)->originalRpcType),
|
||||
(*ppEntry)->originalRpcType, (*ppEntry)->index);
|
||||
taosThreadMutexUnlock(&pCache->mutex);
|
||||
return 0;
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&pCache->mutex);
|
||||
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
|
||||
return -1;
|
||||
}
|
||||
|
||||
// success, return 0
|
||||
// error, return -1
|
||||
// not exist, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST
|
||||
int32_t raftCacheGetEntryP(struct SRaftEntryHashCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) {
|
||||
if (ppEntry == NULL) {
|
||||
return -1;
|
||||
}
|
||||
*ppEntry = NULL;
|
||||
|
||||
taosThreadMutexLock(&pCache->mutex);
|
||||
void* pTmp = taosHashGet(pCache->pEntryHash, &index, sizeof(index));
|
||||
if (pTmp != NULL) {
|
||||
SSyncRaftEntry* pEntry = pTmp;
|
||||
*ppEntry = pEntry;
|
||||
|
||||
sNTrace(pCache->pSyncNode, "raft cache get, type:%s,%d, type2:%s,%d, index:%" PRId64,
|
||||
TMSG_INFO((*ppEntry)->msgType), (*ppEntry)->msgType, TMSG_INFO((*ppEntry)->originalRpcType),
|
||||
(*ppEntry)->originalRpcType, (*ppEntry)->index);
|
||||
taosThreadMutexUnlock(&pCache->mutex);
|
||||
return 0;
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&pCache->mutex);
|
||||
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t raftCacheDelEntry(struct SRaftEntryHashCache* pCache, SyncIndex index) {
|
||||
taosThreadMutexLock(&pCache->mutex);
|
||||
taosHashRemove(pCache->pEntryHash, &index, sizeof(index));
|
||||
--(pCache->currentCount);
|
||||
taosThreadMutexUnlock(&pCache->mutex);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t raftCacheGetAndDel(struct SRaftEntryHashCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) {
|
||||
if (ppEntry == NULL) {
|
||||
return -1;
|
||||
}
|
||||
*ppEntry = NULL;
|
||||
|
||||
taosThreadMutexLock(&pCache->mutex);
|
||||
void* pTmp = taosHashGet(pCache->pEntryHash, &index, sizeof(index));
|
||||
if (pTmp != NULL) {
|
||||
SSyncRaftEntry* pEntry = pTmp;
|
||||
*ppEntry = taosMemoryMalloc(pEntry->bytes);
|
||||
memcpy(*ppEntry, pTmp, pEntry->bytes);
|
||||
|
||||
sNTrace(pCache->pSyncNode, "raft cache get-and-del, type:%s,%d, type2:%s,%d, index:%" PRId64,
|
||||
TMSG_INFO((*ppEntry)->msgType), (*ppEntry)->msgType, TMSG_INFO((*ppEntry)->originalRpcType),
|
||||
(*ppEntry)->originalRpcType, (*ppEntry)->index);
|
||||
|
||||
taosHashRemove(pCache->pEntryHash, &index, sizeof(index));
|
||||
--(pCache->currentCount);
|
||||
|
||||
taosThreadMutexUnlock(&pCache->mutex);
|
||||
return 0;
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&pCache->mutex);
|
||||
terrno = TSDB_CODE_WAL_LOG_NOT_EXIST;
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t raftCacheClear(struct SRaftEntryHashCache* pCache) {
|
||||
taosThreadMutexLock(&pCache->mutex);
|
||||
taosHashClear(pCache->pEntryHash);
|
||||
pCache->currentCount = 0;
|
||||
taosThreadMutexUnlock(&pCache->mutex);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static char* keyFn(const void* pData) {
|
||||
SSyncRaftEntry* pEntry = (SSyncRaftEntry*)pData;
|
||||
return (char*)(&(pEntry->index));
|
||||
}
|
||||
|
||||
static int cmpFn(const void* p1, const void* p2) { return memcmp(p1, p2, sizeof(SyncIndex)); }
|
||||
|
||||
static void freeRaftEntry(void* param) {
|
||||
SSyncRaftEntry* pEntry = (SSyncRaftEntry*)param;
|
||||
syncEntryDestroy(pEntry);
|
||||
}
|
||||
|
||||
SRaftEntryCache* raftEntryCacheCreate(SSyncNode* pSyncNode, int32_t maxCount) {
|
||||
SRaftEntryCache* pCache = taosMemoryMalloc(sizeof(SRaftEntryCache));
|
||||
if (pCache == NULL) {
|
||||
sError("vgId:%d, raft cache create error", pSyncNode->vgId);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pCache->pSkipList =
|
||||
tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_BINARY, sizeof(SyncIndex), cmpFn, SL_ALLOW_DUP_KEY, keyFn);
|
||||
if (pCache->pSkipList == NULL) {
|
||||
sError("vgId:%d, raft cache create hash error", pSyncNode->vgId);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
taosThreadMutexInit(&(pCache->mutex), NULL);
|
||||
pCache->refMgr = taosOpenRef(10, freeRaftEntry);
|
||||
pCache->maxCount = maxCount;
|
||||
pCache->currentCount = 0;
|
||||
pCache->pSyncNode = pSyncNode;
|
||||
|
||||
return pCache;
|
||||
}
|
||||
|
||||
void raftEntryCacheDestroy(SRaftEntryCache* pCache) {
|
||||
if (pCache != NULL) {
|
||||
taosThreadMutexLock(&pCache->mutex);
|
||||
tSkipListDestroy(pCache->pSkipList);
|
||||
if (pCache->refMgr != -1) {
|
||||
taosCloseRef(pCache->refMgr);
|
||||
pCache->refMgr = -1;
|
||||
}
|
||||
taosThreadMutexUnlock(&pCache->mutex);
|
||||
taosThreadMutexDestroy(&(pCache->mutex));
|
||||
taosMemoryFree(pCache);
|
||||
}
|
||||
}
|
||||
|
||||
// success, return 1
|
||||
// max count, return 0
|
||||
// error, return -1
|
||||
int32_t raftEntryCachePutEntry(struct SRaftEntryCache* pCache, SSyncRaftEntry* pEntry) {
|
||||
taosThreadMutexLock(&pCache->mutex);
|
||||
|
||||
if (pCache->currentCount >= pCache->maxCount) {
|
||||
taosThreadMutexUnlock(&pCache->mutex);
|
||||
return 0;
|
||||
}
|
||||
|
||||
SSkipListNode* pSkipListNode = tSkipListPut(pCache->pSkipList, pEntry);
|
||||
ASSERT(pSkipListNode != NULL);
|
||||
++(pCache->currentCount);
|
||||
|
||||
pEntry->rid = taosAddRef(pCache->refMgr, pEntry);
|
||||
ASSERT(pEntry->rid >= 0);
|
||||
|
||||
sNTrace(pCache->pSyncNode, "raft cache add, type:%s,%d, type2:%s,%d, index:%" PRId64 ", bytes:%d",
|
||||
TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType,
|
||||
pEntry->index, pEntry->bytes);
|
||||
taosThreadMutexUnlock(&pCache->mutex);
|
||||
return 1;
|
||||
}
|
||||
|
||||
// find one, return 1
|
||||
// not found, return 0
|
||||
// error, return -1
|
||||
int32_t raftEntryCacheGetEntry(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) {
|
||||
ASSERT(ppEntry != NULL);
|
||||
SSyncRaftEntry* pEntry = NULL;
|
||||
int32_t code = raftEntryCacheGetEntryP(pCache, index, &pEntry);
|
||||
if (code == 1) {
|
||||
int32_t bytes = (int32_t)pEntry->bytes;
|
||||
*ppEntry = taosMemoryMalloc((int64_t)bytes);
|
||||
memcpy(*ppEntry, pEntry, pEntry->bytes);
|
||||
(*ppEntry)->rid = -1;
|
||||
} else {
|
||||
*ppEntry = NULL;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
// find one, return 1
|
||||
// not found, return 0
|
||||
// error, return -1
|
||||
int32_t raftEntryCacheGetEntryP(struct SRaftEntryCache* pCache, SyncIndex index, SSyncRaftEntry** ppEntry) {
|
||||
taosThreadMutexLock(&pCache->mutex);
|
||||
|
||||
SyncIndex index2 = index;
|
||||
int32_t code = 0;
|
||||
|
||||
SArray* entryPArray = tSkipListGet(pCache->pSkipList, (char*)(&index2));
|
||||
int32_t arraySize = taosArrayGetSize(entryPArray);
|
||||
if (arraySize == 1) {
|
||||
SSkipListNode** ppNode = (SSkipListNode**)taosArrayGet(entryPArray, 0);
|
||||
ASSERT(*ppNode != NULL);
|
||||
*ppEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(*ppNode);
|
||||
taosAcquireRef(pCache->refMgr, (*ppEntry)->rid);
|
||||
code = 1;
|
||||
|
||||
} else if (arraySize == 0) {
|
||||
code = 0;
|
||||
|
||||
} else {
|
||||
ASSERT(0);
|
||||
|
||||
code = -1;
|
||||
}
|
||||
taosArrayDestroy(entryPArray);
|
||||
|
||||
taosThreadMutexUnlock(&pCache->mutex);
|
||||
return code;
|
||||
}
|
||||
|
||||
// count = -1, clear all
|
||||
// count >= 0, clear count
|
||||
// return -1, error
|
||||
// return delete count
|
||||
int32_t raftEntryCacheClear(struct SRaftEntryCache* pCache, int32_t count) {
|
||||
taosThreadMutexLock(&pCache->mutex);
|
||||
int32_t returnCnt = 0;
|
||||
|
||||
if (count == -1) {
|
||||
// clear all
|
||||
SSkipListIterator* pIter = tSkipListCreateIter(pCache->pSkipList);
|
||||
while (tSkipListIterNext(pIter)) {
|
||||
SSkipListNode* pNode = tSkipListIterGet(pIter);
|
||||
ASSERT(pNode != NULL);
|
||||
SSyncRaftEntry* pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(pNode);
|
||||
syncEntryDestroy(pEntry);
|
||||
++returnCnt;
|
||||
}
|
||||
tSkipListDestroyIter(pIter);
|
||||
|
||||
tSkipListDestroy(pCache->pSkipList);
|
||||
pCache->pSkipList =
|
||||
tSkipListCreate(MAX_SKIP_LIST_LEVEL, TSDB_DATA_TYPE_BINARY, sizeof(SyncIndex), cmpFn, SL_ALLOW_DUP_KEY, keyFn);
|
||||
ASSERT(pCache->pSkipList != NULL);
|
||||
|
||||
} else {
|
||||
// clear count
|
||||
int i = 0;
|
||||
SSkipListIterator* pIter = tSkipListCreateIter(pCache->pSkipList);
|
||||
SArray* delNodeArray = taosArrayInit(0, sizeof(SSkipListNode*));
|
||||
|
||||
// free entry
|
||||
while (tSkipListIterNext(pIter)) {
|
||||
SSkipListNode* pNode = tSkipListIterGet(pIter);
|
||||
ASSERT(pNode != NULL);
|
||||
if (i++ >= count) {
|
||||
break;
|
||||
}
|
||||
|
||||
// sDebug("push pNode:%p", pNode);
|
||||
taosArrayPush(delNodeArray, &pNode);
|
||||
++returnCnt;
|
||||
SSyncRaftEntry* pEntry = (SSyncRaftEntry*)SL_GET_NODE_DATA(pNode);
|
||||
|
||||
// syncEntryDestroy(pEntry);
|
||||
taosRemoveRef(pCache->refMgr, pEntry->rid);
|
||||
}
|
||||
tSkipListDestroyIter(pIter);
|
||||
|
||||
// delete skiplist node
|
||||
int32_t arraySize = taosArrayGetSize(delNodeArray);
|
||||
for (int32_t i = 0; i < arraySize; ++i) {
|
||||
SSkipListNode** ppNode = taosArrayGet(delNodeArray, i);
|
||||
// sDebug("get pNode:%p", *ppNode);
|
||||
tSkipListRemoveNode(pCache->pSkipList, *ppNode);
|
||||
}
|
||||
taosArrayDestroy(delNodeArray);
|
||||
}
|
||||
|
||||
pCache->currentCount -= returnCnt;
|
||||
taosThreadMutexUnlock(&pCache->mutex);
|
||||
return returnCnt;
|
||||
}
|
||||
|
|
|
@ -168,17 +168,19 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) {
|
|||
|
||||
if (pSender->blockLen > 0) {
|
||||
// has read data
|
||||
sSDebug(pSender, "snapshot sender continue to read, blockLen:%d seq:%d", pSender->blockLen, pSender->seq);
|
||||
sSDebug(pSender, "vgId:%d, snapshot sender continue to read, blockLen:%d seq:%d", pSender->pSyncNode->vgId,
|
||||
pSender->blockLen, pSender->seq);
|
||||
} else {
|
||||
// read finish, update seq to end
|
||||
pSender->seq = SYNC_SNAPSHOT_SEQ_END;
|
||||
sSInfo(pSender, "snapshot sender read to the end, blockLen:%d seq:%d", pSender->blockLen, pSender->seq);
|
||||
sSInfo(pSender, "vgId:%d, snapshot sender read to the end, blockLen:%d seq:%d", pSender->pSyncNode->vgId,
|
||||
pSender->blockLen, pSender->seq);
|
||||
}
|
||||
|
||||
// build msg
|
||||
SRpcMsg rpcMsg = {0};
|
||||
if (syncBuildSnapshotSend(&rpcMsg, pSender->blockLen, pSender->pSyncNode->vgId) != 0) {
|
||||
sSError(pSender, "snapshot sender build msg failed since %s", pSender->pSyncNode->vgId, terrstr());
|
||||
sSError(pSender, "vgId:%d, snapshot sender build msg failed since %s", pSender->pSyncNode->vgId, terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -340,11 +342,13 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {
|
|||
taosMemoryFree(pReceiver);
|
||||
}
|
||||
|
||||
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) { return pReceiver->start; }
|
||||
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver) {
|
||||
return (pReceiver != NULL ? pReceiver->start : false);
|
||||
}
|
||||
|
||||
static int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) {
|
||||
if (pReceiver->pWriter != NULL) {
|
||||
sRError(pReceiver, "vgId:%d, snapshot receiver writer is not null");
|
||||
sRError(pReceiver, "vgId:%d, snapshot receiver writer is not null", pReceiver->pSyncNode->vgId);
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
return -1;
|
||||
}
|
||||
|
@ -851,8 +855,8 @@ static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSend
|
|||
pMsg->snapBeginIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm);
|
||||
|
||||
if (pMsg->snapBeginIndex > snapshot.lastApplyIndex) {
|
||||
sSError(pSender, "prepare snapshot failed since beginIndex:%d larger than applyIndex:%d", pMsg->snapBeginIndex,
|
||||
snapshot.lastApplyIndex);
|
||||
sSError(pSender, "prepare snapshot failed since beginIndex:%" PRId64 " larger than applyIndex:%" PRId64,
|
||||
pMsg->snapBeginIndex, snapshot.lastApplyIndex);
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
return -1;
|
||||
}
|
||||
|
@ -966,7 +970,8 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
|
|||
|
||||
if (pSender->pReader == NULL || pSender->finish) {
|
||||
syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "snapshot sender invalid");
|
||||
sSError(pSender, "snapshot sender invalid, pReader:%p finish:%d", pMsg->code, pSender->pReader, pSender->finish);
|
||||
sSError(pSender, "snapshot sender invalid error:%s 0x%x, pReader:%p finish:%d", tstrerror(pMsg->code), pMsg->code,
|
||||
pSender->pReader, pSender->finish);
|
||||
terrno = pMsg->code;
|
||||
goto _ERROR;
|
||||
}
|
||||
|
|
|
@ -913,7 +913,7 @@ int walLoadMeta(SWal* pWal) {
|
|||
int64_t fileSize = 0;
|
||||
taosStatFile(fnameStr, &fileSize, NULL);
|
||||
if (fileSize == 0) {
|
||||
taosRemoveFile(fnameStr);
|
||||
(void)taosRemoveFile(fnameStr);
|
||||
wDebug("vgId:%d, wal find empty meta ver %d", pWal->cfg.vgId, metaVer);
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -63,7 +63,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
|
|||
wInfo("vgId:%d, restore from snapshot, remove file %s", pWal->cfg.vgId, fnameStr);
|
||||
}
|
||||
}
|
||||
walRemoveMeta(pWal);
|
||||
(void)walRemoveMeta(pWal);
|
||||
|
||||
pWal->writeCur = -1;
|
||||
pWal->totSize = 0;
|
||||
|
|
Loading…
Reference in New Issue