From ecc43b669704c7bb04060baf63524e3f3e3e45d1 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 21 Jun 2022 16:02:36 +0800 Subject: [PATCH] refactor(sync): delete some assert --- source/libs/sync/inc/syncInt.h | 1 + source/libs/sync/src/syncAppendEntries.c | 43 +++-- source/libs/sync/src/syncAppendEntriesReply.c | 2 +- source/libs/sync/src/syncCommit.c | 2 +- source/libs/sync/src/syncElection.c | 10 +- source/libs/sync/src/syncEnv.c | 8 +- source/libs/sync/src/syncIO.c | 48 ++--- source/libs/sync/src/syncIndexMgr.c | 8 +- source/libs/sync/src/syncMain.c | 134 +++++++++----- source/libs/sync/src/syncMessage.c | 170 +++++++++--------- source/libs/sync/src/syncRaftCfg.c | 48 ++--- source/libs/sync/src/syncRaftEntry.c | 16 +- source/libs/sync/src/syncRaftLog.c | 16 +- source/libs/sync/src/syncRaftStore.c | 38 ++-- source/libs/sync/src/syncReplication.c | 14 +- source/libs/sync/src/syncRequestVote.c | 2 +- source/libs/sync/src/syncRequestVoteReply.c | 6 +- source/libs/sync/src/syncRespMgr.c | 2 +- source/libs/sync/src/syncUtil.c | 8 +- source/libs/sync/src/syncVoteMgr.c | 26 +-- 20 files changed, 332 insertions(+), 270 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 999147eda4..1bd0671fb5 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -197,6 +197,7 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S cJSON* syncNode2Json(const SSyncNode* pSyncNode); char* syncNode2Str(const SSyncNode* pSyncNode); void syncNodeEventLog(const SSyncNode* pSyncNode, char* str); +void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str); char* syncNode2SimpleStr(const SSyncNode* pSyncNode); bool syncNodeInConfig(SSyncNode* pSyncNode, const SSyncCfg* config); void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex lastConfigChangeIndex); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index d2726201cc..c51cbc0513 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -99,19 +99,25 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { if (pMsg->term > ths->pRaftStore->currentTerm) { syncNodeUpdateTerm(ths, pMsg->term); } - assert(pMsg->term <= ths->pRaftStore->currentTerm); + ASSERT(pMsg->term <= ths->pRaftStore->currentTerm); // reset elect timer if (pMsg->term == ths->pRaftStore->currentTerm) { ths->leaderCache = pMsg->srcId; syncNodeResetElectTimer(ths); } - assert(pMsg->dataLen >= 0); + ASSERT(pMsg->dataLen >= 0); SyncTerm localPreLogTerm = 0; if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) { SSyncRaftEntry* pEntry = ths->pLogStore->getEntry(ths->pLogStore, pMsg->prevLogIndex); - assert(pEntry != NULL); + if (pEntry == NULL) { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "getEntry error, index:%ld, since %s", pMsg->prevLogIndex, terrstr()); + syncNodeErrorLog(ths, logBuf); + return -1; + } + localPreLogTerm = pEntry->term; syncEntryDestory(pEntry); } @@ -160,7 +166,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { // accept request if (pMsg->term == ths->pRaftStore->currentTerm && ths->state == TAOS_SYNC_STATE_FOLLOWER && logOK) { // preIndex = -1, or has preIndex entry in local log - assert(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)); + ASSERT(pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)); // has extra entries (> preIndex) in local log bool hasExtraEntries = pMsg->prevLogIndex < ths->pLogStore->getLastIndex(ths->pLogStore); @@ -179,13 +185,21 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SyncIndex extraIndex = pMsg->prevLogIndex + 1; SSyncRaftEntry* pExtraEntry = ths->pLogStore->getEntry(ths->pLogStore, extraIndex); - assert(pExtraEntry != NULL); + if (pExtraEntry == NULL) { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "getEntry error2, index:%ld, since %s", extraIndex, terrstr()); + syncNodeErrorLog(ths, logBuf); + return -1; + } SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); - assert(pAppendEntry != NULL); + if (pAppendEntry == NULL) { + syncNodeErrorLog(ths, "syncEntryDeserialize pAppendEntry error"); + return -1; + } // log not match, conflict - assert(extraIndex == pAppendEntry->index); + ASSERT(extraIndex == pAppendEntry->index); if (pExtraEntry->term != pAppendEntry->term) { conflict = true; } @@ -201,7 +215,12 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { for (SyncIndex index = delEnd; index >= delBegin; --index) { if (ths->pFsm->FpRollBackCb != NULL) { SSyncRaftEntry* pRollBackEntry = ths->pLogStore->getEntry(ths->pLogStore, index); - assert(pRollBackEntry != NULL); + if (pRollBackEntry == NULL) { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "getEntry error3, index:%ld, since %s", index, terrstr()); + syncNodeErrorLog(ths, logBuf); + return -1; + } // if (pRollBackEntry->msgType != TDMT_SYNC_NOOP) { if (syncUtilUserRollback(pRollBackEntry->msgType)) { @@ -257,7 +276,10 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { } else if (!hasExtraEntries && hasAppendEntries) { SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); - assert(pAppendEntry != NULL); + if (pAppendEntry == NULL) { + syncNodeErrorLog(ths, "syncEntryDeserialize pAppendEntry2 error"); + return -1; + } // append new entries ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry); @@ -287,7 +309,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { // do nothing } else { - assert(0); + syncNodeLog3("", ths); + ASSERT(0); } SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 7b342cdcff..f13a3604da 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -67,7 +67,7 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p return ret; } - assert(pMsg->term == ths->pRaftStore->currentTerm); + ASSERT(pMsg->term == ths->pRaftStore->currentTerm); if (pMsg->success) { // nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1] diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index ec3e1ab2ba..c92edae381 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -75,7 +75,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { if (agree) { // term SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, index); - assert(pEntry != NULL); + ASSERT(pEntry != NULL); // cannot commit, even if quorum agree. need check term! if (pEntry->term == pSyncNode->pRaftStore->currentTerm) { diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index fdebbe3990..e0f2b0fed2 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -32,7 +32,7 @@ // /\ UNCHANGED <> // int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) { - assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); + ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); int32_t ret = 0; for (int i = 0; i < pSyncNode->peersNum; ++i) { @@ -44,7 +44,7 @@ int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode) { pMsg->lastLogTerm = pSyncNode->pLogStore->getLastTerm(pSyncNode->pLogStore); ret = syncNodeRequestVote(pSyncNode, &pSyncNode->peersId[i], pMsg); - assert(ret == 0); + ASSERT(ret == 0); syncRequestVoteDestroy(pMsg); } return ret; @@ -75,7 +75,7 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) { if (pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER) { syncNodeFollower2Candidate(pSyncNode); } - assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); + ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); // start election raftStoreNextTerm(pSyncNode->pRaftStore); @@ -86,7 +86,7 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) { syncNodeVoteForSelf(pSyncNode); if (voteGrantedMajority(pSyncNode->pVotesGranted)) { // only myself, to leader - assert(!pSyncNode->pVotesGranted->toLeader); + ASSERT(!pSyncNode->pVotesGranted->toLeader); syncNodeCandidate2Leader(pSyncNode); pSyncNode->pVotesGranted->toLeader = true; return ret; @@ -98,7 +98,7 @@ int32_t syncNodeElect(SSyncNode* pSyncNode) { ret = syncNodeRequestVotePeers(pSyncNode); } - assert(ret == 0); + ASSERT(ret == 0); syncNodeResetElectTimer(pSyncNode); return ret; diff --git a/source/libs/sync/src/syncEnv.c b/source/libs/sync/src/syncEnv.c index c92af0dd42..fb0bfb8bef 100644 --- a/source/libs/sync/src/syncEnv.c +++ b/source/libs/sync/src/syncEnv.c @@ -14,7 +14,7 @@ */ #include "syncEnv.h" -// #include +// #include SSyncEnv *gSyncEnv = NULL; @@ -40,7 +40,7 @@ int32_t syncEnvStart() { taosSeedRand(seed); // gSyncEnv = doSyncEnvStart(gSyncEnv); gSyncEnv = doSyncEnvStart(); - assert(gSyncEnv != NULL); + ASSERT(gSyncEnv != NULL); sTrace("sync env start ok"); return ret; } @@ -86,7 +86,7 @@ static void syncEnvTick(void *param, void *tmrId) { static SSyncEnv *doSyncEnvStart() { SSyncEnv *pSyncEnv = (SSyncEnv *)taosMemoryMalloc(sizeof(SSyncEnv)); - assert(pSyncEnv != NULL); + ASSERT(pSyncEnv != NULL); memset(pSyncEnv, 0, sizeof(SSyncEnv)); pSyncEnv->envTickTimerCounter = 0; @@ -103,7 +103,7 @@ static SSyncEnv *doSyncEnvStart() { } static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv) { - assert(pSyncEnv == gSyncEnv); + ASSERT(pSyncEnv == gSyncEnv); if (pSyncEnv != NULL) { atomic_store_8(&(pSyncEnv->isStart), 0); taosTmrCleanUp(pSyncEnv->pTimerManager); diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index 0b5a9685c0..038c36c417 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -30,7 +30,7 @@ static int32_t syncIODestroy(SSyncIO *io); static int32_t syncIOStartInternal(SSyncIO *io); static int32_t syncIOStopInternal(SSyncIO *io); -static void * syncIOConsumerFunc(void *param); +static void *syncIOConsumerFunc(void *param); static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); static int32_t syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey); @@ -47,11 +47,11 @@ static void syncIOTickPing(void *param, void *tmrId); int32_t syncIOStart(char *host, uint16_t port) { int32_t ret = 0; gSyncIO = syncIOCreate(host, port); - assert(gSyncIO != NULL); + ASSERT(gSyncIO != NULL); taosSeedRand(taosGetTimestampSec()); ret = syncIOStartInternal(gSyncIO); - assert(ret == 0); + ASSERT(ret == 0); sTrace("syncIOStart ok, gSyncIO:%p", gSyncIO); return ret; @@ -59,16 +59,16 @@ int32_t syncIOStart(char *host, uint16_t port) { int32_t syncIOStop() { int32_t ret = syncIOStopInternal(gSyncIO); - assert(ret == 0); + ASSERT(ret == 0); ret = syncIODestroy(gSyncIO); - assert(ret == 0); + ASSERT(ret == 0); return ret; } int32_t syncIOSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { - assert(pEpSet->inUse == 0); - assert(pEpSet->numOfEps == 1); + ASSERT(pEpSet->inUse == 0); + ASSERT(pEpSet->numOfEps == 1); int32_t ret = 0; { @@ -107,25 +107,25 @@ int32_t syncIOEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { int32_t syncIOQTimerStart() { int32_t ret = syncIOStartQ(gSyncIO); - assert(ret == 0); + ASSERT(ret == 0); return ret; } int32_t syncIOQTimerStop() { int32_t ret = syncIOStopQ(gSyncIO); - assert(ret == 0); + ASSERT(ret == 0); return ret; } int32_t syncIOPingTimerStart() { int32_t ret = syncIOStartPing(gSyncIO); - assert(ret == 0); + ASSERT(ret == 0); return ret; } int32_t syncIOPingTimerStop() { int32_t ret = syncIOStopPing(gSyncIO); - assert(ret == 0); + ASSERT(ret == 0); return ret; } @@ -151,7 +151,7 @@ static SSyncIO *syncIOCreate(char *host, uint16_t port) { static int32_t syncIODestroy(SSyncIO *io) { int32_t ret = 0; int8_t start = atomic_load_8(&io->isStart); - assert(start == 0); + ASSERT(start == 0); if (io->serverRpc != NULL) { rpcClose(io->serverRpc); @@ -242,9 +242,9 @@ static int32_t syncIOStopInternal(SSyncIO *io) { } static void *syncIOConsumerFunc(void *param) { - SSyncIO * io = param; + SSyncIO *io = param; STaosQall *qall; - SRpcMsg * pRpcMsg, rpcMsg; + SRpcMsg *pRpcMsg, rpcMsg; qall = taosAllocateQall(); while (1) { @@ -264,7 +264,7 @@ static void *syncIOConsumerFunc(void *param) { if (pRpcMsg->msgType == TDMT_SYNC_PING) { if (io->FpOnSyncPing != NULL) { SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg); - assert(pSyncMsg != NULL); + ASSERT(pSyncMsg != NULL); io->FpOnSyncPing(io->pSyncNode, pSyncMsg); syncPingDestroy(pSyncMsg); } @@ -272,7 +272,7 @@ static void *syncIOConsumerFunc(void *param) { } else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) { if (io->FpOnSyncPingReply != NULL) { SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg); - assert(pSyncMsg != NULL); + ASSERT(pSyncMsg != NULL); io->FpOnSyncPingReply(io->pSyncNode, pSyncMsg); syncPingReplyDestroy(pSyncMsg); } @@ -280,7 +280,7 @@ static void *syncIOConsumerFunc(void *param) { } else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { if (io->FpOnSyncClientRequest != NULL) { SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg); - assert(pSyncMsg != NULL); + ASSERT(pSyncMsg != NULL); io->FpOnSyncClientRequest(io->pSyncNode, pSyncMsg); syncClientRequestDestroy(pSyncMsg); } @@ -288,7 +288,7 @@ static void *syncIOConsumerFunc(void *param) { } else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { if (io->FpOnSyncRequestVote != NULL) { SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg); - assert(pSyncMsg != NULL); + ASSERT(pSyncMsg != NULL); io->FpOnSyncRequestVote(io->pSyncNode, pSyncMsg); syncRequestVoteDestroy(pSyncMsg); } @@ -296,7 +296,7 @@ static void *syncIOConsumerFunc(void *param) { } else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { if (io->FpOnSyncRequestVoteReply != NULL) { SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg); - assert(pSyncMsg != NULL); + ASSERT(pSyncMsg != NULL); io->FpOnSyncRequestVoteReply(io->pSyncNode, pSyncMsg); syncRequestVoteReplyDestroy(pSyncMsg); } @@ -304,7 +304,7 @@ static void *syncIOConsumerFunc(void *param) { } else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) { if (io->FpOnSyncAppendEntries != NULL) { SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg); - assert(pSyncMsg != NULL); + ASSERT(pSyncMsg != NULL); io->FpOnSyncAppendEntries(io->pSyncNode, pSyncMsg); syncAppendEntriesDestroy(pSyncMsg); } @@ -312,7 +312,7 @@ static void *syncIOConsumerFunc(void *param) { } else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) { if (io->FpOnSyncAppendEntriesReply != NULL) { SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg); - assert(pSyncMsg != NULL); + ASSERT(pSyncMsg != NULL); io->FpOnSyncAppendEntriesReply(io->pSyncNode, pSyncMsg); syncAppendEntriesReplyDestroy(pSyncMsg); } @@ -320,7 +320,7 @@ static void *syncIOConsumerFunc(void *param) { } else if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) { if (io->FpOnSyncTimeout != NULL) { SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg); - assert(pSyncMsg != NULL); + ASSERT(pSyncMsg != NULL); io->FpOnSyncTimeout(io->pSyncNode, pSyncMsg); syncTimeoutDestroy(pSyncMsg); } @@ -328,7 +328,7 @@ static void *syncIOConsumerFunc(void *param) { } else if (pRpcMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) { if (io->FpOnSyncSnapshotSend != NULL) { SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pRpcMsg); - assert(pSyncMsg != NULL); + ASSERT(pSyncMsg != NULL); io->FpOnSyncSnapshotSend(io->pSyncNode, pSyncMsg); syncSnapshotSendDestroy(pSyncMsg); } @@ -336,7 +336,7 @@ static void *syncIOConsumerFunc(void *param) { } else if (pRpcMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) { if (io->FpOnSyncSnapshotRsp != NULL) { SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pRpcMsg); - assert(pSyncMsg != NULL); + ASSERT(pSyncMsg != NULL); io->FpOnSyncSnapshotRsp(io->pSyncNode, pSyncMsg); syncSnapshotRspDestroy(pSyncMsg); } diff --git a/source/libs/sync/src/syncIndexMgr.c b/source/libs/sync/src/syncIndexMgr.c index 2827fcc12f..5b432aeec4 100644 --- a/source/libs/sync/src/syncIndexMgr.c +++ b/source/libs/sync/src/syncIndexMgr.c @@ -20,7 +20,7 @@ SSyncIndexMgr *syncIndexMgrCreate(SSyncNode *pSyncNode) { SSyncIndexMgr *pSyncIndexMgr = taosMemoryMalloc(sizeof(SSyncIndexMgr)); - assert(pSyncIndexMgr != NULL); + ASSERT(pSyncIndexMgr != NULL); memset(pSyncIndexMgr, 0, sizeof(SSyncIndexMgr)); pSyncIndexMgr->replicas = &(pSyncNode->replicasId); @@ -63,7 +63,7 @@ void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, } // maybe config change - // assert(0); + // ASSERT(0); char host[128]; uint16_t port; @@ -169,7 +169,7 @@ void syncIndexMgrSetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, S } // maybe config change - // assert(0); + // ASSERT(0); char host[128]; uint16_t port; syncUtilU642Addr(pRaftId->addr, host, sizeof(host), &port); @@ -183,5 +183,5 @@ SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftI return term; } } - assert(0); + ASSERT(0); } \ No newline at end of file diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index b520629e15..e573c0a1af 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -75,7 +75,7 @@ int32_t syncInit() { void syncCleanUp() { int32_t ret = syncEnvStop(); - assert(ret == 0); + ASSERT(ret == 0); if (tsNodeRefId != -1) { taosCloseRef(tsNodeRefId); @@ -85,7 +85,7 @@ void syncCleanUp() { int64_t syncOpen(const SSyncInfo* pSyncInfo) { SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo); - assert(pSyncNode != NULL); + ASSERT(pSyncNode != NULL); if (gRaftDetailLog) { syncNodeLog2("syncNodeOpen open success", pSyncNode); @@ -318,7 +318,7 @@ bool syncCanLeaderTransfer(int64_t rid) { if (pSyncNode == NULL) { return false; } - assert(rid == pSyncNode->rid); + ASSERT(rid == pSyncNode->rid); if (pSyncNode->replicaNum == 1) { taosReleaseRef(tsNodeRefId, pSyncNode->rid); @@ -355,7 +355,7 @@ ESyncState syncGetMyRole(int64_t rid) { if (pSyncNode == NULL) { return TAOS_SYNC_STATE_ERROR; } - assert(rid == pSyncNode->rid); + ASSERT(rid == pSyncNode->rid); ESyncState state = pSyncNode->state; taosReleaseRef(tsNodeRefId, pSyncNode->rid); @@ -367,7 +367,7 @@ bool syncIsReady(int64_t rid) { if (pSyncNode == NULL) { return false; } - assert(rid == pSyncNode->rid); + ASSERT(rid == pSyncNode->rid); bool b = (pSyncNode->state == TAOS_SYNC_STATE_LEADER) && pSyncNode->restoreFinish; taosReleaseRef(tsNodeRefId, pSyncNode->rid); return b; @@ -378,7 +378,7 @@ bool syncIsRestoreFinish(int64_t rid) { if (pSyncNode == NULL) { return false; } - assert(rid == pSyncNode->rid); + ASSERT(rid == pSyncNode->rid); bool b = pSyncNode->restoreFinish; taosReleaseRef(tsNodeRefId, pSyncNode->rid); @@ -390,7 +390,7 @@ int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) { if (pSyncNode == NULL) { return -1; } - assert(rid == pSyncNode->rid); + ASSERT(rid == pSyncNode->rid); sMeta->lastConfigIndex = pSyncNode->pRaftCfg->lastConfigIndex; sTrace("vgId:%d, get snapshot meta, lastConfigIndex:%" PRId64, pSyncNode->vgId, pSyncNode->pRaftCfg->lastConfigIndex); @@ -404,7 +404,7 @@ int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct if (pSyncNode == NULL) { return -1; } - assert(rid == pSyncNode->rid); + ASSERT(rid == pSyncNode->rid); ASSERT(pSyncNode->pRaftCfg->configIndexCount >= 1); SyncIndex lastIndex = (pSyncNode->pRaftCfg->configIndexArr)[0]; @@ -448,7 +448,7 @@ SyncTerm syncGetMyTerm(int64_t rid) { if (pSyncNode == NULL) { return TAOS_SYNC_STATE_ERROR; } - assert(rid == pSyncNode->rid); + ASSERT(rid == pSyncNode->rid); SyncTerm term = pSyncNode->pRaftStore->currentTerm; taosReleaseRef(tsNodeRefId, pSyncNode->rid); @@ -460,7 +460,7 @@ SyncGroupId syncGetVgId(int64_t rid) { if (pSyncNode == NULL) { return TAOS_SYNC_STATE_ERROR; } - assert(rid == pSyncNode->rid); + ASSERT(rid == pSyncNode->rid); SyncGroupId vgId = pSyncNode->vgId; taosReleaseRef(tsNodeRefId, pSyncNode->rid); @@ -473,7 +473,7 @@ void syncGetEpSet(int64_t rid, SEpSet* pEpSet) { memset(pEpSet, 0, sizeof(*pEpSet)); return; } - assert(rid == pSyncNode->rid); + ASSERT(rid == pSyncNode->rid); pEpSet->numOfEps = 0; for (int i = 0; i < pSyncNode->pRaftCfg->cfg.replicaNum; ++i) { snprintf(pEpSet->eps[i].fqdn, sizeof(pEpSet->eps[i].fqdn), "%s", (pSyncNode->pRaftCfg->cfg.nodeInfo)[i].nodeFqdn); @@ -494,7 +494,7 @@ int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) { if (pSyncNode == NULL) { return TAOS_SYNC_STATE_ERROR; } - assert(rid == pSyncNode->rid); + ASSERT(rid == pSyncNode->rid); SRespStub stub; int32_t ret = syncRespMgrGet(pSyncNode->pSyncRespMgr, index, &stub); @@ -511,7 +511,7 @@ int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcHandleInfo* pInfo) if (pSyncNode == NULL) { return TAOS_SYNC_STATE_ERROR; } - assert(rid == pSyncNode->rid); + ASSERT(rid == pSyncNode->rid); SRespStub stub; int32_t ret = syncRespMgrGetAndDel(pSyncNode->pSyncRespMgr, index, &stub); @@ -530,7 +530,7 @@ void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb) { sTrace("syncSetQ get pSyncNode is NULL, rid:%ld", rid); return; } - assert(rid == pSyncNode->rid); + ASSERT(rid == pSyncNode->rid); pSyncNode->msgcb = msgcb; taosReleaseRef(tsNodeRefId, pSyncNode->rid); @@ -542,7 +542,7 @@ char* sync2SimpleStr(int64_t rid) { sTrace("syncSetRpc get pSyncNode is NULL, rid:%ld", rid); return NULL; } - assert(rid == pSyncNode->rid); + ASSERT(rid == pSyncNode->rid); char* s = syncNode2SimpleStr(pSyncNode); taosReleaseRef(tsNodeRefId, pSyncNode->rid); @@ -554,7 +554,7 @@ void setPingTimerMS(int64_t rid, int32_t pingTimerMS) { if (pSyncNode == NULL) { return; } - assert(rid == pSyncNode->rid); + ASSERT(rid == pSyncNode->rid); pSyncNode->pingBaseLine = pingTimerMS; pSyncNode->pingTimerMS = pingTimerMS; @@ -566,7 +566,7 @@ void setElectTimerMS(int64_t rid, int32_t electTimerMS) { if (pSyncNode == NULL) { return; } - assert(rid == pSyncNode->rid); + ASSERT(rid == pSyncNode->rid); pSyncNode->electBaseLine = electTimerMS; taosReleaseRef(tsNodeRefId, pSyncNode->rid); @@ -577,7 +577,7 @@ void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) { if (pSyncNode == NULL) { return; } - assert(rid == pSyncNode->rid); + ASSERT(rid == pSyncNode->rid); pSyncNode->hbBaseLine = hbTimerMS; pSyncNode->heartbeatTimerMS = hbTimerMS; @@ -592,7 +592,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; } - assert(rid == pSyncNode->rid); + ASSERT(rid == pSyncNode->rid); ret = syncNodePropose(pSyncNode, pMsg, isWeak); taosReleaseRef(tsNodeRefId, pSyncNode->rid); @@ -662,7 +662,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { SSyncInfo* pSyncInfo = (SSyncInfo*)pOldSyncInfo; SSyncNode* pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode)); - assert(pSyncNode != NULL); + ASSERT(pSyncNode != NULL); memset(pSyncNode, 0, sizeof(SSyncNode)); int32_t ret = 0; @@ -682,12 +682,12 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { meta.snapshotEnable = pSyncInfo->snapshotEnable; meta.lastConfigIndex = SYNC_INDEX_INVALID; ret = raftCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), meta, pSyncNode->configPath); - assert(ret == 0); + ASSERT(ret == 0); } else { // update syncCfg by raft_config.json pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath); - assert(pSyncNode->pRaftCfg != NULL); + ASSERT(pSyncNode->pRaftCfg != NULL); pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg; if (gRaftDetailLog) { @@ -712,7 +712,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { // init raft config pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath); - assert(pSyncNode->pRaftCfg != NULL); + ASSERT(pSyncNode->pRaftCfg != NULL); // init internal pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex]; @@ -771,23 +771,23 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { // init TLA+ server vars pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath); - assert(pSyncNode->pRaftStore != NULL); + ASSERT(pSyncNode->pRaftStore != NULL); // init TLA+ candidate vars pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode); - assert(pSyncNode->pVotesGranted != NULL); + ASSERT(pSyncNode->pVotesGranted != NULL); pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode); - assert(pSyncNode->pVotesRespond != NULL); + ASSERT(pSyncNode->pVotesRespond != NULL); // init TLA+ leader vars pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode); - assert(pSyncNode->pNextIndex != NULL); + ASSERT(pSyncNode->pNextIndex != NULL); pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode); - assert(pSyncNode->pMatchIndex != NULL); + ASSERT(pSyncNode->pMatchIndex != NULL); // init TLA+ log vars pSyncNode->pLogStore = logStoreCreate(pSyncNode); - assert(pSyncNode->pLogStore != NULL); + ASSERT(pSyncNode->pLogStore != NULL); pSyncNode->commitIndex = SYNC_INDEX_INVALID; // timer ms init @@ -845,7 +845,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { // tools pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, 0); - assert(pSyncNode->pSyncRespMgr != NULL); + ASSERT(pSyncNode->pSyncRespMgr != NULL); // restore state pSyncNode->restoreFinish = false; @@ -893,7 +893,7 @@ void syncNodeStart(SSyncNode* pSyncNode) { // int32_t ret = 0; // ret = syncNodeStartPingTimer(pSyncNode); - // assert(ret == 0); + // ASSERT(ret == 0); if (gRaftDetailLog) { syncNodeLog2("==state change become leader immediately==", pSyncNode); @@ -915,10 +915,10 @@ void syncNodeClose(SSyncNode* pSyncNode) { syncNodeEventLog(pSyncNode, "sync close"); int32_t ret; - assert(pSyncNode != NULL); + ASSERT(pSyncNode != NULL); ret = raftStoreClose(pSyncNode->pRaftStore); - assert(ret == 0); + ASSERT(ret == 0); syncRespMgrDestroy(pSyncNode->pSyncRespMgr); voteGrantedDestroy(pSyncNode->pVotesGranted); @@ -980,7 +980,7 @@ int32_t syncNodePingSelf(SSyncNode* pSyncNode) { int32_t ret = 0; SyncPing* pMsg = syncPingBuild3(&pSyncNode->myRaftId, &pSyncNode->myRaftId, pSyncNode->vgId); ret = syncNodePing(pSyncNode, &pMsg->destId, pMsg); - assert(ret == 0); + ASSERT(ret == 0); syncPingDestroy(pMsg); return ret; @@ -992,7 +992,7 @@ int32_t syncNodePingPeers(SSyncNode* pSyncNode) { SRaftId* destId = &(pSyncNode->peersId[i]); SyncPing* pMsg = syncPingBuild3(&pSyncNode->myRaftId, destId, pSyncNode->vgId); ret = syncNodePing(pSyncNode, destId, pMsg); - assert(ret == 0); + ASSERT(ret == 0); syncPingDestroy(pMsg); } return ret; @@ -1004,7 +1004,7 @@ int32_t syncNodePingAll(SSyncNode* pSyncNode) { SRaftId* destId = &(pSyncNode->replicasId[i]); SyncPing* pMsg = syncPingBuild3(&pSyncNode->myRaftId, destId, pSyncNode->vgId); ret = syncNodePing(pSyncNode, destId, pMsg); - assert(ret == 0); + ASSERT(ret == 0); syncPingDestroy(pMsg); } return ret; @@ -1337,6 +1337,44 @@ void syncNodeEventLog(const SSyncNode* pSyncNode, char* str) { } } +void syncNodeErrorLog(const SSyncNode* pSyncNode, char* str) { + int32_t userStrLen = strlen(str); + + SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; + if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { + pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); + } + SyncIndex logLastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); + SyncIndex logBeginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore); + + if (userStrLen < 256) { + char logBuf[128 + 256]; + snprintf(logBuf, sizeof(logBuf), + "vgId:%d, sync %s %s, term:%lu, commit:%ld, beginlog:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, " + "replica-num:%d, " + "lconfig:%ld, changing:%d", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, + pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, + pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, + pSyncNode->changing); + sError("%s", logBuf); + + } else { + int len = 128 + userStrLen; + char* s = (char*)taosMemoryMalloc(len); + snprintf(s, len, + "vgId:%d, sync %s %s, term:%lu, commit:%ld, beginlog:%ld, lastlog:%ld, lastsnapshot:%ld, standby:%d, " + "replica-num:%d, " + "lconfig:%ld, changing:%d", + pSyncNode->vgId, syncUtilState2String(pSyncNode->state), str, pSyncNode->pRaftStore->currentTerm, + pSyncNode->commitIndex, logBeginIndex, logLastIndex, snapshot.lastApplyIndex, + pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, pSyncNode->pRaftCfg->lastConfigIndex, + pSyncNode->changing); + sError("%s", s); + taosMemoryFree(s); + } +} + char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { int len = 256; char* s = (char*)taosMemoryMalloc(len); @@ -1702,8 +1740,8 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { } void syncNodeCandidate2Leader(SSyncNode* pSyncNode) { - assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); - assert(voteGrantedMajority(pSyncNode->pVotesGranted)); + ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); + ASSERT(voteGrantedMajority(pSyncNode->pVotesGranted)); syncNodeBecomeLeader(pSyncNode, "candidate to leader"); syncNodeLog2("==state change syncNodeCandidate2Leader==", pSyncNode); @@ -1715,21 +1753,21 @@ void syncNodeCandidate2Leader(SSyncNode* pSyncNode) { } void syncNodeFollower2Candidate(SSyncNode* pSyncNode) { - assert(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER); + ASSERT(pSyncNode->state == TAOS_SYNC_STATE_FOLLOWER); pSyncNode->state = TAOS_SYNC_STATE_CANDIDATE; syncNodeLog2("==state change syncNodeFollower2Candidate==", pSyncNode); } void syncNodeLeader2Follower(SSyncNode* pSyncNode) { - assert(pSyncNode->state == TAOS_SYNC_STATE_LEADER); + ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER); syncNodeBecomeFollower(pSyncNode, "leader to follower"); syncNodeLog2("==state change syncNodeLeader2Follower==", pSyncNode); } void syncNodeCandidate2Follower(SSyncNode* pSyncNode) { - assert(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); + ASSERT(pSyncNode->state == TAOS_SYNC_STATE_CANDIDATE); syncNodeBecomeFollower(pSyncNode, "candidate to follower"); syncNodeLog2("==state change syncNodeCandidate2Follower==", pSyncNode); @@ -1740,8 +1778,8 @@ void syncNodeCandidate2Follower(SSyncNode* pSyncNode) { // just called by syncNodeVoteForSelf // need assert void syncNodeVoteForTerm(SSyncNode* pSyncNode, SyncTerm term, SRaftId* pRaftId) { - assert(term == pSyncNode->pRaftStore->currentTerm); - assert(!raftStoreHasVoted(pSyncNode->pRaftStore)); + ASSERT(term == pSyncNode->pRaftStore->currentTerm); + ASSERT(!raftStoreHasVoted(pSyncNode->pRaftStore)); raftStoreVote(pSyncNode->pRaftStore, pRaftId); } @@ -2062,17 +2100,17 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { static int32_t syncNodeEqNoop(SSyncNode* ths) { int32_t ret = 0; - assert(ths->state == TAOS_SYNC_STATE_LEADER); + ASSERT(ths->state == TAOS_SYNC_STATE_LEADER); SyncIndex index = ths->pLogStore->getLastIndex(ths->pLogStore) + 1; SyncTerm term = ths->pRaftStore->currentTerm; SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId); - assert(pEntry != NULL); + ASSERT(pEntry != NULL); uint32_t entryLen; char* serialized = syncEntrySerialize(pEntry, &entryLen); SyncClientRequest* pSyncMsg = syncClientRequestBuild(entryLen); - assert(pSyncMsg->dataLen == entryLen); + ASSERT(pSyncMsg->dataLen == entryLen); memcpy(pSyncMsg->data, serialized, entryLen); SRpcMsg rpcMsg = {0}; @@ -2095,7 +2133,7 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) { SyncIndex index = ths->pLogStore->getLastIndex(ths->pLogStore) + 1; SyncTerm term = ths->pRaftStore->currentTerm; SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId); - assert(pEntry != NULL); + ASSERT(pEntry != NULL); if (ths->state == TAOS_SYNC_STATE_LEADER) { // ths->pLogStore->appendEntry(ths->pLogStore, pEntry); @@ -2158,7 +2196,7 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) { SyncIndex index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore); SyncTerm term = ths->pRaftStore->currentTerm; SSyncRaftEntry* pEntry = syncEntryBuild2((SyncClientRequest*)pMsg, term, index); - assert(pEntry != NULL); + ASSERT(pEntry != NULL); if (ths->state == TAOS_SYNC_STATE_LEADER) { // ths->pLogStore->appendEntry(ths->pLogStore, pEntry); diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 454609009c..119a178893 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -186,18 +186,18 @@ void syncTimeoutDestroy(SyncTimeout* pMsg) { } void syncTimeoutSerialize(const SyncTimeout* pMsg, char* buf, uint32_t bufLen) { - assert(pMsg->bytes <= bufLen); + ASSERT(pMsg->bytes <= bufLen); memcpy(buf, pMsg, pMsg->bytes); } void syncTimeoutDeserialize(const char* buf, uint32_t len, SyncTimeout* pMsg) { memcpy(pMsg, buf, len); - assert(len == pMsg->bytes); + ASSERT(len == pMsg->bytes); } char* syncTimeoutSerialize2(const SyncTimeout* pMsg, uint32_t* len) { char* buf = taosMemoryMalloc(pMsg->bytes); - assert(buf != NULL); + ASSERT(buf != NULL); syncTimeoutSerialize(pMsg, buf, pMsg->bytes); if (len != NULL) { *len = pMsg->bytes; @@ -208,9 +208,9 @@ char* syncTimeoutSerialize2(const SyncTimeout* pMsg, uint32_t* len) { SyncTimeout* syncTimeoutDeserialize2(const char* buf, uint32_t len) { uint32_t bytes = *((uint32_t*)buf); SyncTimeout* pMsg = taosMemoryMalloc(bytes); - assert(pMsg != NULL); + ASSERT(pMsg != NULL); syncTimeoutDeserialize(buf, len, pMsg); - assert(len == pMsg->bytes); + ASSERT(len == pMsg->bytes); return pMsg; } @@ -228,7 +228,7 @@ void syncTimeoutFromRpcMsg(const SRpcMsg* pRpcMsg, SyncTimeout* pMsg) { SyncTimeout* syncTimeoutFromRpcMsg2(const SRpcMsg* pRpcMsg) { SyncTimeout* pMsg = syncTimeoutDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); - assert(pMsg != NULL); + ASSERT(pMsg != NULL); return pMsg; } @@ -322,19 +322,19 @@ void syncPingDestroy(SyncPing* pMsg) { } void syncPingSerialize(const SyncPing* pMsg, char* buf, uint32_t bufLen) { - assert(pMsg->bytes <= bufLen); + ASSERT(pMsg->bytes <= bufLen); memcpy(buf, pMsg, pMsg->bytes); } void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg) { memcpy(pMsg, buf, len); - assert(len == pMsg->bytes); - assert(pMsg->bytes == sizeof(SyncPing) + pMsg->dataLen); + ASSERT(len == pMsg->bytes); + ASSERT(pMsg->bytes == sizeof(SyncPing) + pMsg->dataLen); } char* syncPingSerialize2(const SyncPing* pMsg, uint32_t* len) { char* buf = taosMemoryMalloc(pMsg->bytes); - assert(buf != NULL); + ASSERT(buf != NULL); syncPingSerialize(pMsg, buf, pMsg->bytes); if (len != NULL) { *len = pMsg->bytes; @@ -345,9 +345,9 @@ char* syncPingSerialize2(const SyncPing* pMsg, uint32_t* len) { SyncPing* syncPingDeserialize2(const char* buf, uint32_t len) { uint32_t bytes = *((uint32_t*)buf); SyncPing* pMsg = taosMemoryMalloc(bytes); - assert(pMsg != NULL); + ASSERT(pMsg != NULL); syncPingDeserialize(buf, len, pMsg); - assert(len == pMsg->bytes); + ASSERT(len == pMsg->bytes); return pMsg; } @@ -406,7 +406,7 @@ SyncPing* syncPingDeserialize3(void* buf, int32_t bufLen) { } pMsg = taosMemoryMalloc(bytes); - assert(pMsg != NULL); + ASSERT(pMsg != NULL); pMsg->bytes = bytes; if (tDecodeI32(&decoder, &pMsg->vgId) < 0) { @@ -435,7 +435,7 @@ SyncPing* syncPingDeserialize3(void* buf, int32_t bufLen) { if (tDecodeBinary(&decoder, (uint8_t**)(&data), &len) < 0) { return NULL; } - assert(len = pMsg->dataLen); + ASSERT(len = pMsg->dataLen); memcpy(pMsg->data, data, len); tEndDecode(&decoder); @@ -457,7 +457,7 @@ void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg) { SyncPing* syncPingFromRpcMsg2(const SRpcMsg* pRpcMsg) { SyncPing* pMsg = syncPingDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); - assert(pMsg != NULL); + ASSERT(pMsg != NULL); return pMsg; } @@ -584,19 +584,19 @@ void syncPingReplyDestroy(SyncPingReply* pMsg) { } void syncPingReplySerialize(const SyncPingReply* pMsg, char* buf, uint32_t bufLen) { - assert(pMsg->bytes <= bufLen); + ASSERT(pMsg->bytes <= bufLen); memcpy(buf, pMsg, pMsg->bytes); } void syncPingReplyDeserialize(const char* buf, uint32_t len, SyncPingReply* pMsg) { memcpy(pMsg, buf, len); - assert(len == pMsg->bytes); - assert(pMsg->bytes == sizeof(SyncPing) + pMsg->dataLen); + ASSERT(len == pMsg->bytes); + ASSERT(pMsg->bytes == sizeof(SyncPing) + pMsg->dataLen); } char* syncPingReplySerialize2(const SyncPingReply* pMsg, uint32_t* len) { char* buf = taosMemoryMalloc(pMsg->bytes); - assert(buf != NULL); + ASSERT(buf != NULL); syncPingReplySerialize(pMsg, buf, pMsg->bytes); if (len != NULL) { *len = pMsg->bytes; @@ -607,9 +607,9 @@ char* syncPingReplySerialize2(const SyncPingReply* pMsg, uint32_t* len) { SyncPingReply* syncPingReplyDeserialize2(const char* buf, uint32_t len) { uint32_t bytes = *((uint32_t*)buf); SyncPingReply* pMsg = taosMemoryMalloc(bytes); - assert(pMsg != NULL); + ASSERT(pMsg != NULL); syncPingReplyDeserialize(buf, len, pMsg); - assert(len == pMsg->bytes); + ASSERT(len == pMsg->bytes); return pMsg; } @@ -668,7 +668,7 @@ SyncPingReply* syncPingReplyDeserialize3(void* buf, int32_t bufLen) { } pMsg = taosMemoryMalloc(bytes); - assert(pMsg != NULL); + ASSERT(pMsg != NULL); pMsg->bytes = bytes; if (tDecodeI32(&decoder, &pMsg->vgId) < 0) { @@ -697,7 +697,7 @@ SyncPingReply* syncPingReplyDeserialize3(void* buf, int32_t bufLen) { if (tDecodeBinary(&decoder, (uint8_t**)(&data), &len) < 0) { return NULL; } - assert(len = pMsg->dataLen); + ASSERT(len = pMsg->dataLen); memcpy(pMsg->data, data, len); tEndDecode(&decoder); @@ -719,7 +719,7 @@ void syncPingReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPingReply* pMsg) { SyncPingReply* syncPingReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) { SyncPingReply* pMsg = syncPingReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); - assert(pMsg != NULL); + ASSERT(pMsg != NULL); return pMsg; } @@ -844,18 +844,18 @@ void syncClientRequestDestroy(SyncClientRequest* pMsg) { } void syncClientRequestSerialize(const SyncClientRequest* pMsg, char* buf, uint32_t bufLen) { - assert(pMsg->bytes <= bufLen); + ASSERT(pMsg->bytes <= bufLen); memcpy(buf, pMsg, pMsg->bytes); } void syncClientRequestDeserialize(const char* buf, uint32_t len, SyncClientRequest* pMsg) { memcpy(pMsg, buf, len); - assert(len == pMsg->bytes); + ASSERT(len == pMsg->bytes); } char* syncClientRequestSerialize2(const SyncClientRequest* pMsg, uint32_t* len) { char* buf = taosMemoryMalloc(pMsg->bytes); - assert(buf != NULL); + ASSERT(buf != NULL); syncClientRequestSerialize(pMsg, buf, pMsg->bytes); if (len != NULL) { *len = pMsg->bytes; @@ -866,9 +866,9 @@ char* syncClientRequestSerialize2(const SyncClientRequest* pMsg, uint32_t* len) SyncClientRequest* syncClientRequestDeserialize2(const char* buf, uint32_t len) { uint32_t bytes = *((uint32_t*)buf); SyncClientRequest* pMsg = taosMemoryMalloc(bytes); - assert(pMsg != NULL); + ASSERT(pMsg != NULL); syncClientRequestDeserialize(buf, len, pMsg); - assert(len == pMsg->bytes); + ASSERT(len == pMsg->bytes); return pMsg; } @@ -888,7 +888,7 @@ void syncClientRequestFromRpcMsg(const SRpcMsg* pRpcMsg, SyncClientRequest* pMsg // step 3. RpcMsg => SyncClientRequest, from queue SyncClientRequest* syncClientRequestFromRpcMsg2(const SRpcMsg* pRpcMsg) { SyncClientRequest* pMsg = syncClientRequestDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); - assert(pMsg != NULL); + ASSERT(pMsg != NULL); return pMsg; } @@ -974,18 +974,18 @@ void syncRequestVoteDestroy(SyncRequestVote* pMsg) { } void syncRequestVoteSerialize(const SyncRequestVote* pMsg, char* buf, uint32_t bufLen) { - assert(pMsg->bytes <= bufLen); + ASSERT(pMsg->bytes <= bufLen); memcpy(buf, pMsg, pMsg->bytes); } void syncRequestVoteDeserialize(const char* buf, uint32_t len, SyncRequestVote* pMsg) { memcpy(pMsg, buf, len); - assert(len == pMsg->bytes); + ASSERT(len == pMsg->bytes); } char* syncRequestVoteSerialize2(const SyncRequestVote* pMsg, uint32_t* len) { char* buf = taosMemoryMalloc(pMsg->bytes); - assert(buf != NULL); + ASSERT(buf != NULL); syncRequestVoteSerialize(pMsg, buf, pMsg->bytes); if (len != NULL) { *len = pMsg->bytes; @@ -996,9 +996,9 @@ char* syncRequestVoteSerialize2(const SyncRequestVote* pMsg, uint32_t* len) { SyncRequestVote* syncRequestVoteDeserialize2(const char* buf, uint32_t len) { uint32_t bytes = *((uint32_t*)buf); SyncRequestVote* pMsg = taosMemoryMalloc(bytes); - assert(pMsg != NULL); + ASSERT(pMsg != NULL); syncRequestVoteDeserialize(buf, len, pMsg); - assert(len == pMsg->bytes); + ASSERT(len == pMsg->bytes); return pMsg; } @@ -1016,7 +1016,7 @@ void syncRequestVoteFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVote* pMsg) { SyncRequestVote* syncRequestVoteFromRpcMsg2(const SRpcMsg* pRpcMsg) { SyncRequestVote* pMsg = syncRequestVoteDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); - assert(pMsg != NULL); + ASSERT(pMsg != NULL); return pMsg; } @@ -1125,18 +1125,18 @@ void syncRequestVoteReplyDestroy(SyncRequestVoteReply* pMsg) { } void syncRequestVoteReplySerialize(const SyncRequestVoteReply* pMsg, char* buf, uint32_t bufLen) { - assert(pMsg->bytes <= bufLen); + ASSERT(pMsg->bytes <= bufLen); memcpy(buf, pMsg, pMsg->bytes); } void syncRequestVoteReplyDeserialize(const char* buf, uint32_t len, SyncRequestVoteReply* pMsg) { memcpy(pMsg, buf, len); - assert(len == pMsg->bytes); + ASSERT(len == pMsg->bytes); } char* syncRequestVoteReplySerialize2(const SyncRequestVoteReply* pMsg, uint32_t* len) { char* buf = taosMemoryMalloc(pMsg->bytes); - assert(buf != NULL); + ASSERT(buf != NULL); syncRequestVoteReplySerialize(pMsg, buf, pMsg->bytes); if (len != NULL) { *len = pMsg->bytes; @@ -1147,9 +1147,9 @@ char* syncRequestVoteReplySerialize2(const SyncRequestVoteReply* pMsg, uint32_t* SyncRequestVoteReply* syncRequestVoteReplyDeserialize2(const char* buf, uint32_t len) { uint32_t bytes = *((uint32_t*)buf); SyncRequestVoteReply* pMsg = taosMemoryMalloc(bytes); - assert(pMsg != NULL); + ASSERT(pMsg != NULL); syncRequestVoteReplyDeserialize(buf, len, pMsg); - assert(len == pMsg->bytes); + ASSERT(len == pMsg->bytes); return pMsg; } @@ -1167,7 +1167,7 @@ void syncRequestVoteReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVoteReply SyncRequestVoteReply* syncRequestVoteReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) { SyncRequestVoteReply* pMsg = syncRequestVoteReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); - assert(pMsg != NULL); + ASSERT(pMsg != NULL); return pMsg; } @@ -1274,19 +1274,19 @@ void syncAppendEntriesDestroy(SyncAppendEntries* pMsg) { } void syncAppendEntriesSerialize(const SyncAppendEntries* pMsg, char* buf, uint32_t bufLen) { - assert(pMsg->bytes <= bufLen); + ASSERT(pMsg->bytes <= bufLen); memcpy(buf, pMsg, pMsg->bytes); } void syncAppendEntriesDeserialize(const char* buf, uint32_t len, SyncAppendEntries* pMsg) { memcpy(pMsg, buf, len); - assert(len == pMsg->bytes); - assert(pMsg->bytes == sizeof(SyncAppendEntries) + pMsg->dataLen); + ASSERT(len == pMsg->bytes); + ASSERT(pMsg->bytes == sizeof(SyncAppendEntries) + pMsg->dataLen); } char* syncAppendEntriesSerialize2(const SyncAppendEntries* pMsg, uint32_t* len) { char* buf = taosMemoryMalloc(pMsg->bytes); - assert(buf != NULL); + ASSERT(buf != NULL); syncAppendEntriesSerialize(pMsg, buf, pMsg->bytes); if (len != NULL) { *len = pMsg->bytes; @@ -1297,9 +1297,9 @@ char* syncAppendEntriesSerialize2(const SyncAppendEntries* pMsg, uint32_t* len) SyncAppendEntries* syncAppendEntriesDeserialize2(const char* buf, uint32_t len) { uint32_t bytes = *((uint32_t*)buf); SyncAppendEntries* pMsg = taosMemoryMalloc(bytes); - assert(pMsg != NULL); + ASSERT(pMsg != NULL); syncAppendEntriesDeserialize(buf, len, pMsg); - assert(len == pMsg->bytes); + ASSERT(len == pMsg->bytes); return pMsg; } @@ -1317,7 +1317,7 @@ void syncAppendEntriesFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntries* pMsg SyncAppendEntries* syncAppendEntriesFromRpcMsg2(const SRpcMsg* pRpcMsg) { SyncAppendEntries* pMsg = syncAppendEntriesDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); - assert(pMsg != NULL); + ASSERT(pMsg != NULL); return pMsg; } @@ -1444,18 +1444,18 @@ void syncAppendEntriesReplyDestroy(SyncAppendEntriesReply* pMsg) { } void syncAppendEntriesReplySerialize(const SyncAppendEntriesReply* pMsg, char* buf, uint32_t bufLen) { - assert(pMsg->bytes <= bufLen); + ASSERT(pMsg->bytes <= bufLen); memcpy(buf, pMsg, pMsg->bytes); } void syncAppendEntriesReplyDeserialize(const char* buf, uint32_t len, SyncAppendEntriesReply* pMsg) { memcpy(pMsg, buf, len); - assert(len == pMsg->bytes); + ASSERT(len == pMsg->bytes); } char* syncAppendEntriesReplySerialize2(const SyncAppendEntriesReply* pMsg, uint32_t* len) { char* buf = taosMemoryMalloc(pMsg->bytes); - assert(buf != NULL); + ASSERT(buf != NULL); syncAppendEntriesReplySerialize(pMsg, buf, pMsg->bytes); if (len != NULL) { *len = pMsg->bytes; @@ -1466,9 +1466,9 @@ char* syncAppendEntriesReplySerialize2(const SyncAppendEntriesReply* pMsg, uint3 SyncAppendEntriesReply* syncAppendEntriesReplyDeserialize2(const char* buf, uint32_t len) { uint32_t bytes = *((uint32_t*)buf); SyncAppendEntriesReply* pMsg = taosMemoryMalloc(bytes); - assert(pMsg != NULL); + ASSERT(pMsg != NULL); syncAppendEntriesReplyDeserialize(buf, len, pMsg); - assert(len == pMsg->bytes); + ASSERT(len == pMsg->bytes); return pMsg; } @@ -1486,7 +1486,7 @@ void syncAppendEntriesReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesR SyncAppendEntriesReply* syncAppendEntriesReplyFromRpcMsg2(const SRpcMsg* pRpcMsg) { SyncAppendEntriesReply* pMsg = syncAppendEntriesReplyDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); - assert(pMsg != NULL); + ASSERT(pMsg != NULL); return pMsg; } @@ -1607,18 +1607,18 @@ void syncApplyMsgDestroy(SyncApplyMsg* pMsg) { } void syncApplyMsgSerialize(const SyncApplyMsg* pMsg, char* buf, uint32_t bufLen) { - assert(pMsg->bytes <= bufLen); + ASSERT(pMsg->bytes <= bufLen); memcpy(buf, pMsg, pMsg->bytes); } void syncApplyMsgDeserialize(const char* buf, uint32_t len, SyncApplyMsg* pMsg) { memcpy(pMsg, buf, len); - assert(len == pMsg->bytes); + ASSERT(len == pMsg->bytes); } char* syncApplyMsgSerialize2(const SyncApplyMsg* pMsg, uint32_t* len) { char* buf = taosMemoryMalloc(pMsg->bytes); - assert(buf != NULL); + ASSERT(buf != NULL); syncApplyMsgSerialize(pMsg, buf, pMsg->bytes); if (len != NULL) { *len = pMsg->bytes; @@ -1629,9 +1629,9 @@ char* syncApplyMsgSerialize2(const SyncApplyMsg* pMsg, uint32_t* len) { SyncApplyMsg* syncApplyMsgDeserialize2(const char* buf, uint32_t len) { uint32_t bytes = *((uint32_t*)buf); SyncApplyMsg* pMsg = taosMemoryMalloc(bytes); - assert(pMsg != NULL); + ASSERT(pMsg != NULL); syncApplyMsgDeserialize(buf, len, pMsg); - assert(len == pMsg->bytes); + ASSERT(len == pMsg->bytes); return pMsg; } @@ -1752,19 +1752,19 @@ void syncSnapshotSendDestroy(SyncSnapshotSend* pMsg) { } void syncSnapshotSendSerialize(const SyncSnapshotSend* pMsg, char* buf, uint32_t bufLen) { - assert(pMsg->bytes <= bufLen); + ASSERT(pMsg->bytes <= bufLen); memcpy(buf, pMsg, pMsg->bytes); } void syncSnapshotSendDeserialize(const char* buf, uint32_t len, SyncSnapshotSend* pMsg) { memcpy(pMsg, buf, len); - assert(len == pMsg->bytes); - assert(pMsg->bytes == sizeof(SyncSnapshotSend) + pMsg->dataLen); + ASSERT(len == pMsg->bytes); + ASSERT(pMsg->bytes == sizeof(SyncSnapshotSend) + pMsg->dataLen); } char* syncSnapshotSendSerialize2(const SyncSnapshotSend* pMsg, uint32_t* len) { char* buf = taosMemoryMalloc(pMsg->bytes); - assert(buf != NULL); + ASSERT(buf != NULL); syncSnapshotSendSerialize(pMsg, buf, pMsg->bytes); if (len != NULL) { *len = pMsg->bytes; @@ -1775,9 +1775,9 @@ char* syncSnapshotSendSerialize2(const SyncSnapshotSend* pMsg, uint32_t* len) { SyncSnapshotSend* syncSnapshotSendDeserialize2(const char* buf, uint32_t len) { uint32_t bytes = *((uint32_t*)buf); SyncSnapshotSend* pMsg = taosMemoryMalloc(bytes); - assert(pMsg != NULL); + ASSERT(pMsg != NULL); syncSnapshotSendDeserialize(buf, len, pMsg); - assert(len == pMsg->bytes); + ASSERT(len == pMsg->bytes); return pMsg; } @@ -1795,7 +1795,7 @@ void syncSnapshotSendFromRpcMsg(const SRpcMsg* pRpcMsg, SyncSnapshotSend* pMsg) SyncSnapshotSend* syncSnapshotSendFromRpcMsg2(const SRpcMsg* pRpcMsg) { SyncSnapshotSend* pMsg = syncSnapshotSendDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); - assert(pMsg != NULL); + ASSERT(pMsg != NULL); return pMsg; } @@ -1925,18 +1925,18 @@ void syncSnapshotRspDestroy(SyncSnapshotRsp* pMsg) { } void syncSnapshotRspSerialize(const SyncSnapshotRsp* pMsg, char* buf, uint32_t bufLen) { - assert(pMsg->bytes <= bufLen); + ASSERT(pMsg->bytes <= bufLen); memcpy(buf, pMsg, pMsg->bytes); } void syncSnapshotRspDeserialize(const char* buf, uint32_t len, SyncSnapshotRsp* pMsg) { memcpy(pMsg, buf, len); - assert(len == pMsg->bytes); + ASSERT(len == pMsg->bytes); } char* syncSnapshotRspSerialize2(const SyncSnapshotRsp* pMsg, uint32_t* len) { char* buf = taosMemoryMalloc(pMsg->bytes); - assert(buf != NULL); + ASSERT(buf != NULL); syncSnapshotRspSerialize(pMsg, buf, pMsg->bytes); if (len != NULL) { *len = pMsg->bytes; @@ -1947,9 +1947,9 @@ char* syncSnapshotRspSerialize2(const SyncSnapshotRsp* pMsg, uint32_t* len) { SyncSnapshotRsp* syncSnapshotRspDeserialize2(const char* buf, uint32_t len) { uint32_t bytes = *((uint32_t*)buf); SyncSnapshotRsp* pMsg = taosMemoryMalloc(bytes); - assert(pMsg != NULL); + ASSERT(pMsg != NULL); syncSnapshotRspDeserialize(buf, len, pMsg); - assert(len == pMsg->bytes); + ASSERT(len == pMsg->bytes); return pMsg; } @@ -1967,7 +1967,7 @@ void syncSnapshotRspFromRpcMsg(const SRpcMsg* pRpcMsg, SyncSnapshotRsp* pMsg) { SyncSnapshotRsp* syncSnapshotRspFromRpcMsg2(const SRpcMsg* pRpcMsg) { SyncSnapshotRsp* pMsg = syncSnapshotRspDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); - assert(pMsg != NULL); + ASSERT(pMsg != NULL); return pMsg; } @@ -2085,18 +2085,18 @@ void syncLeaderTransferDestroy(SyncLeaderTransfer* pMsg) { } void syncLeaderTransferSerialize(const SyncLeaderTransfer* pMsg, char* buf, uint32_t bufLen) { - assert(pMsg->bytes <= bufLen); + ASSERT(pMsg->bytes <= bufLen); memcpy(buf, pMsg, pMsg->bytes); } void syncLeaderTransferDeserialize(const char* buf, uint32_t len, SyncLeaderTransfer* pMsg) { memcpy(pMsg, buf, len); - assert(len == pMsg->bytes); + ASSERT(len == pMsg->bytes); } char* syncLeaderTransferSerialize2(const SyncLeaderTransfer* pMsg, uint32_t* len) { char* buf = taosMemoryMalloc(pMsg->bytes); - assert(buf != NULL); + ASSERT(buf != NULL); syncLeaderTransferSerialize(pMsg, buf, pMsg->bytes); if (len != NULL) { *len = pMsg->bytes; @@ -2107,9 +2107,9 @@ char* syncLeaderTransferSerialize2(const SyncLeaderTransfer* pMsg, uint32_t* len SyncLeaderTransfer* syncLeaderTransferDeserialize2(const char* buf, uint32_t len) { uint32_t bytes = *((uint32_t*)buf); SyncLeaderTransfer* pMsg = taosMemoryMalloc(bytes); - assert(pMsg != NULL); + ASSERT(pMsg != NULL); syncLeaderTransferDeserialize(buf, len, pMsg); - assert(len == pMsg->bytes); + ASSERT(len == pMsg->bytes); return pMsg; } @@ -2127,7 +2127,7 @@ void syncLeaderTransferFromRpcMsg(const SRpcMsg* pRpcMsg, SyncLeaderTransfer* pM SyncLeaderTransfer* syncLeaderTransferFromRpcMsg2(const SRpcMsg* pRpcMsg) { SyncLeaderTransfer* pMsg = syncLeaderTransferDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); - assert(pMsg != NULL); + ASSERT(pMsg != NULL); return pMsg; } @@ -2247,18 +2247,18 @@ void syncReconfigFinishDestroy(SyncReconfigFinish* pMsg) { } void syncReconfigFinishSerialize(const SyncReconfigFinish* pMsg, char* buf, uint32_t bufLen) { - assert(pMsg->bytes <= bufLen); + ASSERT(pMsg->bytes <= bufLen); memcpy(buf, pMsg, pMsg->bytes); } void syncReconfigFinishDeserialize(const char* buf, uint32_t len, SyncReconfigFinish* pMsg) { memcpy(pMsg, buf, len); - assert(len == pMsg->bytes); + ASSERT(len == pMsg->bytes); } char* syncReconfigFinishSerialize2(const SyncReconfigFinish* pMsg, uint32_t* len) { char* buf = taosMemoryMalloc(pMsg->bytes); - assert(buf != NULL); + ASSERT(buf != NULL); syncReconfigFinishSerialize(pMsg, buf, pMsg->bytes); if (len != NULL) { *len = pMsg->bytes; @@ -2269,9 +2269,9 @@ char* syncReconfigFinishSerialize2(const SyncReconfigFinish* pMsg, uint32_t* len SyncReconfigFinish* syncReconfigFinishDeserialize2(const char* buf, uint32_t len) { uint32_t bytes = *((uint32_t*)buf); SyncReconfigFinish* pMsg = taosMemoryMalloc(bytes); - assert(pMsg != NULL); + ASSERT(pMsg != NULL); syncReconfigFinishDeserialize(buf, len, pMsg); - assert(len == pMsg->bytes); + ASSERT(len == pMsg->bytes); return pMsg; } @@ -2289,7 +2289,7 @@ void syncReconfigFinishFromRpcMsg(const SRpcMsg* pRpcMsg, SyncReconfigFinish* pM SyncReconfigFinish* syncReconfigFinishFromRpcMsg2(const SRpcMsg* pRpcMsg) { SyncReconfigFinish* pMsg = syncReconfigFinishDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen); - assert(pMsg != NULL); + ASSERT(pMsg != NULL); return pMsg; } diff --git a/source/libs/sync/src/syncRaftCfg.c b/source/libs/sync/src/syncRaftCfg.c index 7d020f6892..2cc1eb0239 100644 --- a/source/libs/sync/src/syncRaftCfg.c +++ b/source/libs/sync/src/syncRaftCfg.c @@ -24,29 +24,29 @@ SRaftCfg *raftCfgOpen(const char *path) { snprintf(pCfg->path, sizeof(pCfg->path), "%s", path); pCfg->pFile = taosOpenFile(pCfg->path, TD_FILE_READ | TD_FILE_WRITE); - assert(pCfg->pFile != NULL); + ASSERT(pCfg->pFile != NULL); taosLSeekFile(pCfg->pFile, 0, SEEK_SET); char buf[1024] = {0}; int len = taosReadFile(pCfg->pFile, buf, sizeof(buf)); - assert(len > 0); + ASSERT(len > 0); int32_t ret = raftCfgFromStr(buf, pCfg); - assert(ret == 0); + ASSERT(ret == 0); return pCfg; } int32_t raftCfgClose(SRaftCfg *pRaftCfg) { int64_t ret = taosCloseFile(&(pRaftCfg->pFile)); - assert(ret == 0); + ASSERT(ret == 0); taosMemoryFree(pRaftCfg); return 0; } int32_t raftCfgPersist(SRaftCfg *pRaftCfg) { - assert(pRaftCfg != NULL); + ASSERT(pRaftCfg != NULL); char *s = raftCfg2Str(pRaftCfg); taosLSeekFile(pRaftCfg->pFile, 0, SEEK_SET); @@ -61,10 +61,10 @@ int32_t raftCfgPersist(SRaftCfg *pRaftCfg) { snprintf(buf, sizeof(buf), "%s", s); int64_t ret = taosWriteFile(pRaftCfg->pFile, buf, sizeof(buf)); - assert(ret == sizeof(buf)); + ASSERT(ret == sizeof(buf)); // int64_t ret = taosWriteFile(pRaftCfg->pFile, s, strlen(s) + 1); - // assert(ret == strlen(s) + 1); + // ASSERT(ret == strlen(s) + 1); taosMemoryFree(s); taosFsyncFile(pRaftCfg->pFile); @@ -135,27 +135,27 @@ int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg) { const cJSON *pJson = pRoot; cJSON *pReplicaNum = cJSON_GetObjectItem(pJson, "replicaNum"); - assert(cJSON_IsNumber(pReplicaNum)); + ASSERT(cJSON_IsNumber(pReplicaNum)); pSyncCfg->replicaNum = cJSON_GetNumberValue(pReplicaNum); cJSON *pMyIndex = cJSON_GetObjectItem(pJson, "myIndex"); - assert(cJSON_IsNumber(pMyIndex)); + ASSERT(cJSON_IsNumber(pMyIndex)); pSyncCfg->myIndex = cJSON_GetNumberValue(pMyIndex); cJSON *pNodeInfoArr = cJSON_GetObjectItem(pJson, "nodeInfo"); int arraySize = cJSON_GetArraySize(pNodeInfoArr); - assert(arraySize == pSyncCfg->replicaNum); + ASSERT(arraySize == pSyncCfg->replicaNum); for (int i = 0; i < arraySize; ++i) { cJSON *pNodeInfo = cJSON_GetArrayItem(pNodeInfoArr, i); - assert(pNodeInfo != NULL); + ASSERT(pNodeInfo != NULL); cJSON *pNodePort = cJSON_GetObjectItem(pNodeInfo, "nodePort"); - assert(cJSON_IsNumber(pNodePort)); + ASSERT(cJSON_IsNumber(pNodePort)); ((pSyncCfg->nodeInfo)[i]).nodePort = cJSON_GetNumberValue(pNodePort); cJSON *pNodeFqdn = cJSON_GetObjectItem(pNodeInfo, "nodeFqdn"); - assert(cJSON_IsString(pNodeFqdn)); + ASSERT(cJSON_IsString(pNodeFqdn)); snprintf(((pSyncCfg->nodeInfo)[i]).nodeFqdn, sizeof(((pSyncCfg->nodeInfo)[i]).nodeFqdn), "%s", pNodeFqdn->valuestring); } @@ -165,10 +165,10 @@ int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg) { int32_t syncCfgFromStr(const char *s, SSyncCfg *pSyncCfg) { cJSON *pRoot = cJSON_Parse(s); - assert(pRoot != NULL); + ASSERT(pRoot != NULL); int32_t ret = syncCfgFromJson(pRoot, pSyncCfg); - assert(ret == 0); + ASSERT(ret == 0); cJSON_Delete(pRoot); return 0; @@ -207,10 +207,10 @@ char *raftCfg2Str(SRaftCfg *pRaftCfg) { } int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path) { - assert(pCfg != NULL); + ASSERT(pCfg != NULL); TdFilePtr pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE); - assert(pFile != NULL); + ASSERT(pFile != NULL); SRaftCfg raftCfg; raftCfg.cfg = *pCfg; @@ -227,10 +227,10 @@ int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path) { ASSERT(strlen(s) + 1 <= CONFIG_FILE_LEN); snprintf(buf, sizeof(buf), "%s", s); int64_t ret = taosWriteFile(pFile, buf, sizeof(buf)); - assert(ret == sizeof(buf)); + ASSERT(ret == sizeof(buf)); // int64_t ret = taosWriteFile(pFile, s, strlen(s) + 1); - // assert(ret == strlen(s) + 1); + // ASSERT(ret == strlen(s) + 1); taosMemoryFree(s); taosCloseFile(&pFile); @@ -255,15 +255,15 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) { cJSON *pIndexArr = cJSON_GetObjectItem(pJson, "configIndexArr"); int arraySize = cJSON_GetArraySize(pIndexArr); - assert(arraySize == pRaftCfg->configIndexCount); + ASSERT(arraySize == pRaftCfg->configIndexCount); memset(pRaftCfg->configIndexArr, 0, sizeof(pRaftCfg->configIndexArr)); for (int i = 0; i < arraySize; ++i) { cJSON *pIndexObj = cJSON_GetArrayItem(pIndexArr, i); - assert(pIndexObj != NULL); + ASSERT(pIndexObj != NULL); cJSON *pIndex = cJSON_GetObjectItem(pIndexObj, "index"); - assert(cJSON_IsString(pIndex)); + ASSERT(cJSON_IsString(pIndex)); (pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring); } @@ -276,10 +276,10 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) { int32_t raftCfgFromStr(const char *s, SRaftCfg *pRaftCfg) { cJSON *pRoot = cJSON_Parse(s); - assert(pRoot != NULL); + ASSERT(pRoot != NULL); int32_t ret = raftCfgFromJson(pRoot, pRaftCfg); - assert(ret == 0); + ASSERT(ret == 0); cJSON_Delete(pRoot); return 0; diff --git a/source/libs/sync/src/syncRaftEntry.c b/source/libs/sync/src/syncRaftEntry.c index 05a2dbaa3f..ff334a76bb 100644 --- a/source/libs/sync/src/syncRaftEntry.c +++ b/source/libs/sync/src/syncRaftEntry.c @@ -19,7 +19,7 @@ SSyncRaftEntry* syncEntryBuild(uint32_t dataLen) { uint32_t bytes = sizeof(SSyncRaftEntry) + dataLen; SSyncRaftEntry* pEntry = taosMemoryMalloc(bytes); - assert(pEntry != NULL); + ASSERT(pEntry != NULL); memset(pEntry, 0, bytes); pEntry->bytes = bytes; pEntry->dataLen = dataLen; @@ -29,14 +29,14 @@ SSyncRaftEntry* syncEntryBuild(uint32_t dataLen) { // step 4. SyncClientRequest => SSyncRaftEntry, add term, index SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index) { SSyncRaftEntry* pEntry = syncEntryBuild3(pMsg, term, index); - assert(pEntry != NULL); + ASSERT(pEntry != NULL); return pEntry; } SSyncRaftEntry* syncEntryBuild3(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index) { SSyncRaftEntry* pEntry = syncEntryBuild(pMsg->dataLen); - assert(pEntry != NULL); + ASSERT(pEntry != NULL); pEntry->msgType = pMsg->msgType; pEntry->originalRpcType = pMsg->originalRpcType; @@ -63,7 +63,7 @@ SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId) memcpy(rpcMsg.pCont, &head, sizeof(head)); SSyncRaftEntry* pEntry = syncEntryBuild(rpcMsg.contLen); - assert(pEntry != NULL); + ASSERT(pEntry != NULL); pEntry->msgType = TDMT_SYNC_CLIENT_REQUEST; pEntry->originalRpcType = TDMT_SYNC_NOOP; @@ -72,7 +72,7 @@ SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId) pEntry->term = term; pEntry->index = index; - assert(pEntry->dataLen == rpcMsg.contLen); + ASSERT(pEntry->dataLen == rpcMsg.contLen); memcpy(pEntry->data, rpcMsg.pCont, rpcMsg.contLen); rpcFreeCont(rpcMsg.pCont); @@ -88,7 +88,7 @@ void syncEntryDestory(SSyncRaftEntry* pEntry) { // step 5. SSyncRaftEntry => bin, to raft log char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len) { char* buf = taosMemoryMalloc(pEntry->bytes); - assert(buf != NULL); + ASSERT(buf != NULL); memcpy(buf, pEntry, pEntry->bytes); if (len != NULL) { *len = pEntry->bytes; @@ -100,9 +100,9 @@ char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len) { SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len) { uint32_t bytes = *((uint32_t*)buf); SSyncRaftEntry* pEntry = taosMemoryMalloc(bytes); - assert(pEntry != NULL); + ASSERT(pEntry != NULL); memcpy(pEntry, buf, len); - assert(len == pEntry->bytes); + ASSERT(len == pEntry->bytes); return pEntry; } diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 79d9b329c1..a169ea0221 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -245,10 +245,10 @@ static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** pp //------------------------------- SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { SSyncLogStore* pLogStore = taosMemoryMalloc(sizeof(SSyncLogStore)); - assert(pLogStore != NULL); + ASSERT(pLogStore != NULL); pLogStore->data = taosMemoryMalloc(sizeof(SSyncLogStoreData)); - assert(pLogStore->data != NULL); + ASSERT(pLogStore->data != NULL); SSyncLogStoreData* pData = pLogStore->data; pData->pSyncNode = pSyncNode; @@ -301,7 +301,7 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { SWal* pWal = pData->pWal; SyncIndex lastIndex = logStoreLastIndex(pLogStore); - assert(pEntry->index == lastIndex + 1); + ASSERT(pEntry->index == lastIndex + 1); int code = 0; SSyncLogMeta syncMeta; @@ -347,10 +347,10 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { linuxErrMsg); ASSERT(0); } - // assert(walReadWithHandle(pWalHandle, index) == 0); + // ASSERT(walReadWithHandle(pWalHandle, index) == 0); SSyncRaftEntry* pEntry = syncEntryBuild(pWalHandle->pHead->head.bodyLen); - assert(pEntry != NULL); + ASSERT(pEntry != NULL); pEntry->msgType = TDMT_SYNC_CLIENT_REQUEST; pEntry->originalRpcType = pWalHandle->pHead->head.msgType; @@ -358,7 +358,7 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { pEntry->isWeak = pWalHandle->pHead->head.syncMeta.isWeek; pEntry->term = pWalHandle->pHead->head.syncMeta.term; pEntry->index = index; - assert(pEntry->dataLen == pWalHandle->pHead->head.bodyLen); + ASSERT(pEntry->dataLen == pWalHandle->pHead->head.bodyLen); memcpy(pEntry->data, pWalHandle->pHead->head.body, pWalHandle->pHead->head.bodyLen); // need to hold, do not new every time!! @@ -373,7 +373,7 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; - // assert(walRollback(pWal, fromIndex) == 0); + // ASSERT(walRollback(pWal, fromIndex) == 0); int32_t code = walRollback(pWal, fromIndex); if (code != 0) { int32_t err = terrno; @@ -407,7 +407,7 @@ SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) { int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; - // assert(walCommit(pWal, index) == 0); + // ASSERT(walCommit(pWal, index) == 0); int32_t code = walCommit(pWal, index); if (code != 0) { int32_t err = terrno; diff --git a/source/libs/sync/src/syncRaftStore.c b/source/libs/sync/src/syncRaftStore.c index 52e8152926..a1ab95c00f 100644 --- a/source/libs/sync/src/syncRaftStore.c +++ b/source/libs/sync/src/syncRaftStore.c @@ -39,40 +39,40 @@ SRaftStore *raftStoreOpen(const char *path) { if (!raftStoreFileExist(pRaftStore->path)) { ret = raftStoreInit(pRaftStore); - assert(ret == 0); + ASSERT(ret == 0); } pRaftStore->pFile = taosOpenFile(path, TD_FILE_READ | TD_FILE_WRITE); - assert(pRaftStore->pFile != NULL); + ASSERT(pRaftStore->pFile != NULL); int len = taosReadFile(pRaftStore->pFile, storeBuf, RAFT_STORE_BLOCK_SIZE); - assert(len == RAFT_STORE_BLOCK_SIZE); + ASSERT(len == RAFT_STORE_BLOCK_SIZE); ret = raftStoreDeserialize(pRaftStore, storeBuf, len); - assert(ret == 0); + ASSERT(ret == 0); return pRaftStore; } static int32_t raftStoreInit(SRaftStore *pRaftStore) { - assert(pRaftStore != NULL); + ASSERT(pRaftStore != NULL); pRaftStore->pFile = taosOpenFile(pRaftStore->path, TD_FILE_CREATE | TD_FILE_WRITE); - assert(pRaftStore->pFile != NULL); + ASSERT(pRaftStore->pFile != NULL); pRaftStore->currentTerm = 0; pRaftStore->voteFor.addr = 0; pRaftStore->voteFor.vgId = 0; int32_t ret = raftStorePersist(pRaftStore); - assert(ret == 0); + ASSERT(ret == 0); taosCloseFile(&pRaftStore->pFile); return 0; } int32_t raftStoreClose(SRaftStore *pRaftStore) { - assert(pRaftStore != NULL); + ASSERT(pRaftStore != NULL); taosCloseFile(&pRaftStore->pFile); taosMemoryFree(pRaftStore); @@ -81,17 +81,17 @@ int32_t raftStoreClose(SRaftStore *pRaftStore) { } int32_t raftStorePersist(SRaftStore *pRaftStore) { - assert(pRaftStore != NULL); + ASSERT(pRaftStore != NULL); int32_t ret; char storeBuf[RAFT_STORE_BLOCK_SIZE] = {0}; ret = raftStoreSerialize(pRaftStore, storeBuf, sizeof(storeBuf)); - assert(ret == 0); + ASSERT(ret == 0); taosLSeekFile(pRaftStore->pFile, 0, SEEK_SET); ret = taosWriteFile(pRaftStore->pFile, storeBuf, sizeof(storeBuf)); - assert(ret == RAFT_STORE_BLOCK_SIZE); + ASSERT(ret == RAFT_STORE_BLOCK_SIZE); taosFsyncFile(pRaftStore->pFile); return 0; @@ -103,7 +103,7 @@ static bool raftStoreFileExist(char *path) { } int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) { - assert(pRaftStore != NULL); + ASSERT(pRaftStore != NULL); cJSON *pRoot = cJSON_CreateObject(); @@ -125,7 +125,7 @@ int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) { char *serialized = cJSON_Print(pRoot); int len2 = strlen(serialized); - assert(len2 < len); + ASSERT(len2 < len); memset(buf, 0, len); snprintf(buf, len, "%s", serialized); taosMemoryFree(serialized); @@ -135,17 +135,17 @@ int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) { } int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) { - assert(pRaftStore != NULL); + ASSERT(pRaftStore != NULL); - assert(len > 0 && len <= RAFT_STORE_BLOCK_SIZE); + ASSERT(len > 0 && len <= RAFT_STORE_BLOCK_SIZE); cJSON *pRoot = cJSON_Parse(buf); cJSON *pCurrentTerm = cJSON_GetObjectItem(pRoot, "current_term"); - assert(cJSON_IsString(pCurrentTerm)); + ASSERT(cJSON_IsString(pCurrentTerm)); sscanf(pCurrentTerm->valuestring, "%lu", &(pRaftStore->currentTerm)); cJSON *pVoteForAddr = cJSON_GetObjectItem(pRoot, "vote_for_addr"); - assert(cJSON_IsString(pVoteForAddr)); + ASSERT(cJSON_IsString(pVoteForAddr)); sscanf(pVoteForAddr->valuestring, "%lu", &(pRaftStore->voteFor.addr)); cJSON *pVoteForVgid = cJSON_GetObjectItem(pRoot, "vote_for_vgid"); @@ -161,7 +161,7 @@ bool raftStoreHasVoted(SRaftStore *pRaftStore) { } void raftStoreVote(SRaftStore *pRaftStore, SRaftId *pRaftId) { - assert(!syncUtilEmptyId(pRaftId)); + ASSERT(!syncUtilEmptyId(pRaftId)); pRaftStore->voteFor = *pRaftId; raftStorePersist(pRaftStore); } @@ -216,7 +216,7 @@ cJSON *raftStore2Json(SRaftStore *pRaftStore) { char *raftStore2Str(SRaftStore *pRaftStore) { cJSON *pJson = raftStore2Json(pRaftStore); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index f044ae5733..06a74e8eee 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -49,7 +49,7 @@ // /\ UNCHANGED <> // int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) { - assert(pSyncNode->state == TAOS_SYNC_STATE_LEADER); + ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER); syncIndexMgrLog2("==syncNodeAppendEntriesPeers== pNextIndex", pSyncNode->pNextIndex); syncIndexMgrLog2("==syncNodeAppendEntriesPeers== pMatchIndex", pSyncNode->pMatchIndex); @@ -68,7 +68,7 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) { SyncTerm preLogTerm = 0; if (preLogIndex >= SYNC_INDEX_BEGIN) { SSyncRaftEntry* pPreEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, preLogIndex); - assert(pPreEntry != NULL); + ASSERT(pPreEntry != NULL); preLogTerm = pPreEntry->term; syncEntryDestory(pPreEntry); @@ -81,12 +81,12 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) { SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, nextIndex); if (pEntry != NULL) { pMsg = syncAppendEntriesBuild(pEntry->bytes, pSyncNode->vgId); - assert(pMsg != NULL); + ASSERT(pMsg != NULL); // add pEntry into msg uint32_t len; char* serialized = syncEntrySerialize(pEntry, &len); - assert(len == pEntry->bytes); + ASSERT(len == pEntry->bytes); memcpy(pMsg->data, serialized, len); taosMemoryFree(serialized); @@ -95,10 +95,10 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) { } else { // maybe overflow, send empty record pMsg = syncAppendEntriesBuild(0, pSyncNode->vgId); - assert(pMsg != NULL); + ASSERT(pMsg != NULL); } - assert(pMsg != NULL); + ASSERT(pMsg != NULL); pMsg->srcId = pSyncNode->myRaftId; pMsg->destId = *pDestId; pMsg->term = pSyncNode->pRaftStore->currentTerm; @@ -157,7 +157,7 @@ int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) { // add pEntry into msg uint32_t len; char* serialized = syncEntrySerialize(pEntry, &len); - assert(len == pEntry->bytes); + ASSERT(len == pEntry->bytes); memcpy(pMsg->data, serialized, len); taosMemoryFree(serialized); diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index 9ed7f00982..def5c321ad 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -52,7 +52,7 @@ int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { if (pMsg->term > ths->pRaftStore->currentTerm) { syncNodeUpdateTerm(ths, pMsg->term); } - assert(pMsg->term <= ths->pRaftStore->currentTerm); + ASSERT(pMsg->term <= ths->pRaftStore->currentTerm); bool logOK = (pMsg->lastLogTerm > ths->pLogStore->getLastTerm(ths->pLogStore)) || ((pMsg->lastLogTerm == ths->pLogStore->getLastTerm(ths->pLogStore)) && diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index 5d041cefcd..60963d0dd3 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -50,7 +50,7 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) return ret; } - // assert(!(pMsg->term > ths->pRaftStore->currentTerm)); + // ASSERT(!(pMsg->term > ths->pRaftStore->currentTerm)); // no need this code, because if I receive reply.term, then I must have sent for that term. // if (pMsg->term > ths->pRaftStore->currentTerm) { // syncNodeUpdateTerm(ths, pMsg->term); @@ -65,7 +65,7 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) return ret; } - assert(pMsg->term == ths->pRaftStore->currentTerm); + ASSERT(pMsg->term == ths->pRaftStore->currentTerm); // This tallies votes even when the current state is not Candidate, // but they won't be looked at, so it doesn't matter. @@ -115,7 +115,7 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl return ret; } - // assert(!(pMsg->term > ths->pRaftStore->currentTerm)); + // ASSERT(!(pMsg->term > ths->pRaftStore->currentTerm)); // no need this code, because if I receive reply.term, then I must have sent for that term. // if (pMsg->term > ths->pRaftStore->currentTerm) { // syncNodeUpdateTerm(ths, pMsg->term); diff --git a/source/libs/sync/src/syncRespMgr.c b/source/libs/sync/src/syncRespMgr.c index 354575d29e..48c1b70a04 100644 --- a/source/libs/sync/src/syncRespMgr.c +++ b/source/libs/sync/src/syncRespMgr.c @@ -22,7 +22,7 @@ SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) { pObj->pRespHash = taosHashInit(sizeof(uint64_t), taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); - assert(pObj->pRespHash != NULL); + ASSERT(pObj->pRespHash != NULL); pObj->ttl = ttl; pObj->data = data; pObj->seqNum = 0; diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 1d1ff7ae53..5b73d980c4 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -86,7 +86,7 @@ void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet) { void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId) { uint32_t ipv4 = taosGetIpv4FromFqdn(pNodeInfo->nodeFqdn); - assert(ipv4 != 0xFFFFFFFF); + ASSERT(ipv4 != 0xFFFFFFFF); char ipbuf[128] = {0}; tinet_ntoa(ipbuf, ipv4); raftId->addr = syncUtilAddr2U64(ipbuf, pNodeInfo->nodePort); @@ -124,7 +124,7 @@ void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest) { int32_t syncUtilRand(int32_t max) { return taosRand() % max; } int32_t syncUtilElectRandomMS(int32_t min, int32_t max) { - assert(min > 0 && max > 0 && max >= min); + ASSERT(min > 0 && max > 0 && max >= min); return min + syncUtilRand(max - min); } @@ -201,7 +201,7 @@ bool syncUtilCanPrint(char c) { char* syncUtilprintBin(char* ptr, uint32_t len) { char* s = taosMemoryMalloc(len + 1); - assert(s != NULL); + ASSERT(s != NULL); memset(s, 0, len + 1); memcpy(s, ptr, len); @@ -216,7 +216,7 @@ char* syncUtilprintBin(char* ptr, uint32_t len) { char* syncUtilprintBin2(char* ptr, uint32_t len) { uint32_t len2 = len * 4 + 1; char* s = taosMemoryMalloc(len2); - assert(s != NULL); + ASSERT(s != NULL); memset(s, 0, len2); char* p = s; diff --git a/source/libs/sync/src/syncVoteMgr.c b/source/libs/sync/src/syncVoteMgr.c index 528c2f26c8..d6c2cbd34e 100644 --- a/source/libs/sync/src/syncVoteMgr.c +++ b/source/libs/sync/src/syncVoteMgr.c @@ -24,7 +24,7 @@ static void voteGrantedClearVotes(SVotesGranted *pVotesGranted) { SVotesGranted *voteGrantedCreate(SSyncNode *pSyncNode) { SVotesGranted *pVotesGranted = taosMemoryMalloc(sizeof(SVotesGranted)); - assert(pVotesGranted != NULL); + ASSERT(pVotesGranted != NULL); memset(pVotesGranted, 0, sizeof(SVotesGranted)); pVotesGranted->replicas = &(pSyncNode->replicasId); @@ -62,9 +62,9 @@ bool voteGrantedMajority(SVotesGranted *pVotesGranted) { } void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg) { - assert(pMsg->voteGranted == true); - assert(pMsg->term == pVotesGranted->term); - assert(syncUtilSameId(&pVotesGranted->pSyncNode->myRaftId, &pMsg->destId)); + ASSERT(pMsg->voteGranted == true); + ASSERT(pMsg->term == pVotesGranted->term); + ASSERT(syncUtilSameId(&pVotesGranted->pSyncNode->myRaftId, &pMsg->destId)); int j = -1; for (int i = 0; i < pVotesGranted->replicaNum; ++i) { @@ -73,14 +73,14 @@ void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg) { break; } } - assert(j != -1); - assert(j >= 0 && j < pVotesGranted->replicaNum); + ASSERT(j != -1); + ASSERT(j >= 0 && j < pVotesGranted->replicaNum); if (pVotesGranted->isGranted[j] != true) { ++(pVotesGranted->votes); pVotesGranted->isGranted[j] = true; } - assert(pVotesGranted->votes <= pVotesGranted->replicaNum); + ASSERT(pVotesGranted->votes <= pVotesGranted->replicaNum); } void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term) { @@ -127,7 +127,7 @@ cJSON *voteGranted2Json(SVotesGranted *pVotesGranted) { char *voteGranted2Str(SVotesGranted *pVotesGranted) { cJSON *pJson = voteGranted2Json(pVotesGranted); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } @@ -162,7 +162,7 @@ void voteGrantedLog2(char *s, SVotesGranted *pObj) { // SVotesRespond ----------------------------- SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode) { SVotesRespond *pVotesRespond = taosMemoryMalloc(sizeof(SVotesRespond)); - assert(pVotesRespond != NULL); + ASSERT(pVotesRespond != NULL); memset(pVotesRespond, 0, sizeof(SVotesRespond)); pVotesRespond->replicas = &(pSyncNode->replicasId); @@ -198,15 +198,15 @@ bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId) { } void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg) { - assert(pVotesRespond->term == pMsg->term); + ASSERT(pVotesRespond->term == pMsg->term); for (int i = 0; i < pVotesRespond->replicaNum; ++i) { if (syncUtilSameId(&((*(pVotesRespond->replicas))[i]), &pMsg->srcId)) { - // assert(pVotesRespond->isRespond[i] == false); + // ASSERT(pVotesRespond->isRespond[i] == false); pVotesRespond->isRespond[i] = true; return; } } - assert(0); + ASSERT(0); } void votesRespondReset(SVotesRespond *pVotesRespond, SyncTerm term) { @@ -256,7 +256,7 @@ cJSON *votesRespond2Json(SVotesRespond *pVotesRespond) { char *votesRespond2Str(SVotesRespond *pVotesRespond) { cJSON *pJson = votesRespond2Json(pVotesRespond); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; }