From 73bb3cef130fa8e19d5ba07b4987d64173872f1e Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 25 Jul 2024 19:07:25 +0800 Subject: [PATCH] enh: refactor return code --- source/libs/sync/inc/syncReplication.h | 2 +- source/libs/sync/inc/syncSnapshot.h | 22 +- source/libs/sync/src/syncMain.c | 13 +- source/libs/sync/src/syncPipeline.c | 21 +- source/libs/sync/src/syncSnapshot.c | 348 +++++++++--------- .../sync/test/syncSnapshotReceiverTest.cpp | 3 +- .../libs/sync/test/syncSnapshotSenderTest.cpp | 3 +- 7 files changed, 200 insertions(+), 212 deletions(-) diff --git a/source/libs/sync/inc/syncReplication.h b/source/libs/sync/inc/syncReplication.h index ecd2b5163e..36c4599527 100644 --- a/source/libs/sync/inc/syncReplication.h +++ b/source/libs/sync/inc/syncReplication.h @@ -58,7 +58,7 @@ int32_t syncNodeSendAppendEntries(SSyncNode* pNode, const SRaftId* destRaftId, S int32_t syncSnapSendMsg(SSyncSnapshotSender* pSender, int32_t seq, void* pBlock, int32_t len, int32_t typ); int32_t syncSnapSendRsp(SSyncSnapshotReceiver* pReceiver, SyncSnapshotSend* pMsg, void* pBlock, int32_t len, - int32_t typ, int32_t code); + int32_t typ, int32_t rspCode); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 66d8edfdfc..540255c200 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -74,12 +74,12 @@ typedef struct SSyncSnapshotSender { int32_t replicaIndex; } SSyncSnapshotSender; -SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex); -void snapshotSenderDestroy(SSyncSnapshotSender *pSender); -bool snapshotSenderIsStart(SSyncSnapshotSender *pSender); -int32_t snapshotSenderStart(SSyncSnapshotSender *pSender); -void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish); -int32_t snapshotReSend(SSyncSnapshotSender *pSender); +int32_t snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex, SSyncSnapshotSender **ppSender); +void snapshotSenderDestroy(SSyncSnapshotSender *pSender); +bool snapshotSenderIsStart(SSyncSnapshotSender *pSender); +int32_t snapshotSenderStart(SSyncSnapshotSender *pSender); +void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish); +int32_t snapshotReSend(SSyncSnapshotSender *pSender); typedef struct SSyncSnapshotReceiver { // update when prep snapshot @@ -101,11 +101,11 @@ typedef struct SSyncSnapshotReceiver { SSyncNode *pSyncNode; } SSyncSnapshotReceiver; -SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId); -void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver); -void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg); -void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver); -bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver); +int32_t snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId, SSyncSnapshotReceiver **ppReceiver); +void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver); +void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg); +void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver); +bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver); // on message // int32_t syncNodeOnSnapshot(SSyncNode *ths, const SRpcMsg *pMsg); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 3033c50984..d6a1efc05f 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -949,6 +949,7 @@ int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) { // open/close -------------- SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { + int32_t code = 0; SSyncNode* pSyncNode = taosMemoryCalloc(1, sizeof(SSyncNode)); if (pSyncNode == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -1229,7 +1230,8 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { // snapshot senders for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) { - SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, i); + SSyncSnapshotSender* pSender = NULL; + code = snapshotSenderCreate(pSyncNode, i, &pSender); if (pSender == NULL) return NULL; pSyncNode->senders[i] = pSender; @@ -1237,7 +1239,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { } // snapshot receivers - pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID); + code = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID, &pSyncNode->pNewNodeReceiver); if (pSyncNode->pNewNodeReceiver == NULL) return NULL; sRDebug(pSyncNode->pNewNodeReceiver, "snapshot receiver create while open sync node, data:%p", pSyncNode->pNewNodeReceiver); @@ -1810,7 +1812,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde // create new for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) { if (pSyncNode->senders[i] == NULL) { - pSyncNode->senders[i] = snapshotSenderCreate(pSyncNode, i); + snapshotSenderCreate(pSyncNode, i, &pSyncNode->senders[i]); if (pSyncNode->senders[i] == NULL) { // will be created later while send snapshot sSError(pSyncNode->senders[i], "snapshot sender create failed while reconfig"); @@ -2840,8 +2842,9 @@ int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum } for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) { - SSyncSnapshotSender* pSender = snapshotSenderCreate(ths, i); - if (pSender == NULL) return -1; + SSyncSnapshotSender* pSender = NULL; + int32_t code = snapshotSenderCreate(ths, i, &pSender); + if (pSender == NULL) return terrno = code; ths->senders[i] = pSender; sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender); diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 33174ff85e..93a12f3d16 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -1133,13 +1133,10 @@ void syncNodeLogReplDestroy(SSyncNode* pNode) { } int32_t syncLogBufferCreate(SSyncLogBuffer** ppBuf) { - int32_t code = 0; - - *ppBuf = NULL; - + int32_t code = 0; SSyncLogBuffer* pBuf = taosMemoryCalloc(1, sizeof(SSyncLogBuffer)); if (pBuf == NULL) { - TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _exit); } pBuf->size = sizeof(pBuf->entries) / sizeof(pBuf->entries[0]); @@ -1149,25 +1146,25 @@ int32_t syncLogBufferCreate(SSyncLogBuffer** ppBuf) { if (taosThreadMutexAttrInit(&pBuf->attr) < 0) { code = TAOS_SYSTEM_ERROR(errno); sError("failed to init log buffer mutexattr due to %s", tstrerror(code)); - goto _err; + goto _exit; } if (taosThreadMutexAttrSetType(&pBuf->attr, PTHREAD_MUTEX_RECURSIVE) < 0) { code = TAOS_SYSTEM_ERROR(errno); sError("failed to set log buffer mutexattr type due to %s", tstrerror(code)); - goto _err; + goto _exit; } if (taosThreadMutexInit(&pBuf->mutex, &pBuf->attr) < 0) { code = TAOS_SYSTEM_ERROR(errno); sError("failed to init log buffer mutex due to %s", tstrerror(code)); - goto _err; + goto _exit; + } +_exit: + if (code != 0) { + taosMemoryFreeClear(pBuf); } - *ppBuf = pBuf; - -_err: - taosMemoryFree(pBuf); TAOS_RETURN(code); } diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 8dab694975..0446630717 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -50,27 +50,31 @@ static void syncSnapBufferDestroy(SSyncSnapBuffer **ppBuf) { return; } -static SSyncSnapBuffer *syncSnapBufferCreate() { +static int32_t syncSnapBufferCreate(SSyncSnapBuffer **ppBuf) { SSyncSnapBuffer *pBuf = taosMemoryCalloc(1, sizeof(SSyncSnapBuffer)); if (pBuf == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + *ppBuf = NULL; + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); } pBuf->size = sizeof(pBuf->entries) / sizeof(void *); ASSERT(pBuf->size == TSDB_SYNC_SNAP_BUFFER_SIZE); - taosThreadMutexInit(&pBuf->mutex, NULL); - return pBuf; + (void)taosThreadMutexInit(&pBuf->mutex, NULL); + *ppBuf = pBuf; + TAOS_RETURN(0); } -SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex) { +int32_t snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex, SSyncSnapshotSender **ppSender) { + int32_t code = 0; + *ppSender = NULL; bool condition = (pSyncNode->pFsm->FpSnapshotStartRead != NULL) && (pSyncNode->pFsm->FpSnapshotStopRead != NULL) && (pSyncNode->pFsm->FpSnapshotDoRead != NULL); - if (!condition) return NULL; + if (!condition) { + TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR); + } SSyncSnapshotSender *pSender = taosMemoryCalloc(1, sizeof(SSyncSnapshotSender)); if (pSender == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); } pSender->start = false; @@ -85,17 +89,19 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot); pSender->finish = false; - SSyncSnapBuffer *pSndBuf = syncSnapBufferCreate(); + SSyncSnapBuffer *pSndBuf = NULL; + code = syncSnapBufferCreate(&pSndBuf); if (pSndBuf == NULL) { taosMemoryFree(pSender); pSender = NULL; - return NULL; + TAOS_RETURN(code); } pSndBuf->entryDeleteCb = syncSnapBlockDestroy; pSender->pSndBuf = pSndBuf; syncSnapBufferReset(pSender->pSndBuf); - return pSender; + *ppSender = pSender; + TAOS_RETURN(code); } void syncSnapBlockDestroy(void *ptr) { @@ -144,7 +150,7 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) { bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) { return atomic_load_8(&pSender->start); } int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { - int32_t code = -1; + int32_t code = 0; int8_t started = atomic_val_compare_exchange_8(&pSender->start, false, true); if (started) return 0; @@ -160,7 +166,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { pSender->snapshot.lastApplyTerm = SYNC_TERM_INVALID; pSender->snapshot.lastConfigIndex = SYNC_INDEX_INVALID; - memset(&pSender->lastConfig, 0, sizeof(pSender->lastConfig)); + (void)memset(&pSender->lastConfig, 0, sizeof(pSender->lastConfig)); pSender->sendingMS = 0; pSender->term = raftStoreGetTerm(pSender->pSyncNode); pSender->startTime = taosGetMonoTimestampMs(); @@ -170,8 +176,8 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { // Get snapshot info SSyncNode *pSyncNode = pSender->pSyncNode; SSnapshot snapInfo = {.type = TDMT_SYNC_PREP_SNAPSHOT}; - if (pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapInfo) != 0) { - sSError(pSender, "snapshot get info failure since %s", terrstr()); + if ((code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapInfo)) != 0) { + sSError(pSender, "snapshot get info failure since %s", tstrerror(code)); goto _out; } @@ -182,25 +188,24 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { SSyncTLV *datHead = pData; if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT) { sSError(pSender, "unexpected data typ in data of snapshot info. typ: %d", datHead->typ); - terrno = TSDB_CODE_INVALID_DATA_FMT; + code = TSDB_CODE_INVALID_DATA_FMT; goto _out; } dataLen = sizeof(SSyncTLV) + datHead->len; } - if (syncSnapSendMsg(pSender, pSender->seq, pData, dataLen, type) != 0) { + if ((code = syncSnapSendMsg(pSender, pSender->seq, pData, dataLen, type)) != 0) { goto _out; } SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; sSInfo(pSender, "snapshot sender start, to dnode:%d.", DID(&destId)); - code = 0; _out: if (snapInfo.data) { taosMemoryFree(snapInfo.data); snapInfo.data = NULL; } - return code; + TAOS_RETURN(code); } void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) { @@ -230,11 +235,11 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) { } int32_t syncSnapSendMsg(SSyncSnapshotSender *pSender, int32_t seq, void *pBlock, int32_t blockLen, int32_t typ) { - int32_t code = -1; + int32_t code = 0; SRpcMsg rpcMsg = {0}; - if (syncBuildSnapshotSend(&rpcMsg, blockLen, pSender->pSyncNode->vgId) != 0) { - sSError(pSender, "failed to build snap replication msg since %s", terrstr()); + if ((code = syncBuildSnapshotSend(&rpcMsg, blockLen, pSender->pSyncNode->vgId)) != 0) { + sSError(pSender, "failed to build snap replication msg since %s", tstrerror(code)); goto _OUT; } @@ -256,20 +261,19 @@ int32_t syncSnapSendMsg(SSyncSnapshotSender *pSender, int32_t seq, void *pBlock, pMsg->payloadType = typ; // send msg - if (syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg) != 0) { - sSError(pSender, "failed to send snap replication msg since %s. seq:%d", terrstr(), seq); + if ((code = syncNodeSendMsgById(&pMsg->destId, pSender->pSyncNode, &rpcMsg)) != 0) { + sSError(pSender, "failed to send snap replication msg since %s. seq:%d", tstrerror(code), seq); goto _OUT; } - code = 0; _OUT: - return code; + TAOS_RETURN(code); } // when sender receive ack, call this function to send msg from seq // seq = ack + 1, already updated static int32_t snapshotSend(SSyncSnapshotSender *pSender) { - int32_t code = -1; + int32_t code = 0; SyncSnapBlock *pBlk = NULL; if (pSender->seq < SYNC_SNAPSHOT_SEQ_END) { @@ -278,7 +282,7 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) { if (pSender->seq > SYNC_SNAPSHOT_SEQ_BEGIN) { pBlk = taosMemoryCalloc(1, sizeof(SyncSnapBlock)); if (pBlk == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; goto _OUT; } @@ -288,7 +292,6 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) { code = pSender->pSyncNode->pFsm->FpSnapshotDoRead(pSender->pSyncNode->pFsm, pSender->pReader, &pBlk->pBlock, &pBlk->blockLen); if (code != 0) { - terrno = code; sSError(pSender, "snapshot sender read failed since %s", tstrerror(code)); goto _OUT; } @@ -300,7 +303,6 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) { // read finish, update seq to end pSender->seq = SYNC_SNAPSHOT_SEQ_END; sSInfo(pSender, "snapshot sender read to the end"); - code = 0; goto _OUT; } } @@ -311,7 +313,7 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) { // send msg int32_t blockLen = (pBlk) ? pBlk->blockLen : 0; void *pBlock = (pBlk) ? pBlk->pBlock : NULL; - if (syncSnapSendMsg(pSender, pSender->seq, pBlock, blockLen, 0) != 0) { + if ((code = syncSnapSendMsg(pSender, pSender->seq, pBlock, blockLen, 0)) != 0) { goto _OUT; } @@ -325,22 +327,22 @@ static int32_t snapshotSend(SSyncSnapshotSender *pSender) { pSender->pSndBuf->end = TMAX(pSender->seq + 1, pSender->pSndBuf->end); } pSender->lastSendTime = nowMs; - code = 0; _OUT:; if (pBlk != NULL) { syncSnapBlockDestroy(pBlk); pBlk = NULL; } - return code; + TAOS_RETURN(code); } // send snapshot data from cache int32_t snapshotReSend(SSyncSnapshotSender *pSender) { SSyncSnapBuffer *pSndBuf = pSender->pSndBuf; - int32_t code = -1; + int32_t code = 0; taosThreadMutexLock(&pSndBuf->mutex); if (pSender->pReader == NULL || pSender->finish || !snapshotSenderIsStart(pSender)) { + code = TSDB_CODE_SYN_INTERNAL_ERROR; goto _out; } @@ -351,7 +353,7 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { if (pBlk->acked || nowMs < pBlk->sendTimeMs + SYNC_SNAP_RESEND_MS) { continue; } - if (syncSnapSendMsg(pSender, pBlk->seq, pBlk->pBlock, pBlk->blockLen, 0) != 0) { + if ((code = syncSnapSendMsg(pSender, pBlk->seq, pBlk->pBlock, pBlk->blockLen, 0)) != 0) { goto _out; } pBlk->sendTimeMs = nowMs; @@ -364,21 +366,20 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { } if (pSender->seq == SYNC_SNAPSHOT_SEQ_END && pSndBuf->end <= pSndBuf->start) { - if (syncSnapSendMsg(pSender, pSender->seq, NULL, 0, 0) != 0) { + if ((code = syncSnapSendMsg(pSender, pSender->seq, NULL, 0, 0)) != 0) { goto _out; } } - code = 0; _out:; taosThreadMutexUnlock(&pSndBuf->mutex); - return code; + TAOS_RETURN(code); } int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) { SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId); if (pSender == NULL) { sNError(pSyncNode, "snapshot sender start error since get failed"); - return -1; + TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR); } if (snapshotSenderIsStart(pSender)) { @@ -390,23 +391,26 @@ int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) { int32_t code = snapshotSenderStart(pSender); if (code != 0) { - sSError(pSender, "snapshot sender start error since %s", terrstr()); - return -1; + sSError(pSender, "snapshot sender start error since %s", tstrerror(code)); + TAOS_RETURN(code); } return 0; } // receiver -SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId) { +int32_t snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId, SSyncSnapshotReceiver **ppReceiver) { + int32_t code = 0; + *ppReceiver = NULL; bool condition = (pSyncNode->pFsm->FpSnapshotStartWrite != NULL) && (pSyncNode->pFsm->FpSnapshotStopWrite != NULL) && (pSyncNode->pFsm->FpSnapshotDoWrite != NULL); - if (!condition) return NULL; + if (!condition) { + TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR); + } SSyncSnapshotReceiver *pReceiver = taosMemoryCalloc(1, sizeof(SSyncSnapshotReceiver)); if (pReceiver == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); } pReceiver->start = false; @@ -421,17 +425,19 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from pReceiver->snapshot.lastApplyTerm = 0; pReceiver->snapshot.lastConfigIndex = SYNC_INDEX_INVALID; - SSyncSnapBuffer *pRcvBuf = syncSnapBufferCreate(); + SSyncSnapBuffer *pRcvBuf = NULL; + code = syncSnapBufferCreate(&pRcvBuf); if (pRcvBuf == NULL) { taosMemoryFree(pReceiver); pReceiver = NULL; - return NULL; + TAOS_RETURN(code); } pRcvBuf->entryDeleteCb = rpcFreeCont; pReceiver->pRcvBuf = pRcvBuf; syncSnapBufferReset(pReceiver->pRcvBuf); - return pReceiver; + *ppReceiver = pReceiver; + TAOS_RETURN(code); } static int32_t snapshotReceiverClearInfoData(SSyncSnapshotReceiver *pReceiver) { @@ -452,10 +458,11 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) { // close writer if (pReceiver->pWriter != NULL) { - int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false, - &pReceiver->snapshot); - if (ret != 0) { - sError("vgId:%d, snapshot receiver stop failed while destroy since %s", pReceiver->pSyncNode->vgId, terrstr()); + int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, + false, &pReceiver->snapshot); + if (code != 0) { + sError("vgId:%d, snapshot receiver stop failed while destroy since %s", pReceiver->pSyncNode->vgId, + tstrerror(code)); } pReceiver->pWriter = NULL; } @@ -486,8 +493,7 @@ static int32_t snapshotReceiverSignatureCmp(SSyncSnapshotReceiver *pReceiver, Sy static int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pBeginMsg) { if (pReceiver->pWriter != NULL) { sRError(pReceiver, "vgId:%d, snapshot receiver writer is not null", pReceiver->pSyncNode->vgId); - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; - return -1; + TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR); } // update ack @@ -501,11 +507,11 @@ static int32_t snapshotReceiverStartWriter(SSyncSnapshotReceiver *pReceiver, Syn pReceiver->snapshotParam.end = pBeginMsg->lastIndex; // start writer - int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &pReceiver->snapshotParam, - &pReceiver->pWriter); - if (ret != 0) { - sRError(pReceiver, "snapshot receiver start write failed since %s", terrstr()); - return -1; + int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &pReceiver->snapshotParam, + &pReceiver->pWriter); + if (code != 0) { + sRError(pReceiver, "snapshot receiver start write failed since %s", tstrerror(code)); + TAOS_RETURN(code); } // event log @@ -541,10 +547,10 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) { taosThreadMutexLock(&pReceiver->pRcvBuf->mutex); { if (pReceiver->pWriter != NULL) { - int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, - false, &pReceiver->snapshot); - if (ret != 0) { - sRError(pReceiver, "snapshot receiver stop write failed since %s", terrstr()); + int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, + false, &pReceiver->snapshot); + if (code != 0) { + sRError(pReceiver, "snapshot receiver stop write failed since %s", tstrerror(code)); } pReceiver->pWriter = NULL; } else { @@ -567,8 +573,8 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen); if (code != 0) { - sRError(pReceiver, "failed to finish snapshot receiver write since %s", terrstr()); - return -1; + sRError(pReceiver, "failed to finish snapshot receiver write since %s", tstrerror(code)); + TAOS_RETURN(code); } } @@ -586,8 +592,8 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap code = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, true, &pReceiver->snapshot); if (code != 0) { - sRError(pReceiver, "snapshot receiver apply failed since %s", terrstr()); - return -1; + sRError(pReceiver, "snapshot receiver apply failed since %s", tstrerror(code)); + TAOS_RETURN(code); } pReceiver->pWriter = NULL; sRInfo(pReceiver, "snapshot receiver write stopped"); @@ -604,13 +610,14 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap code = pReceiver->pSyncNode->pLogStore->syncLogRestoreFromSnapshot(pReceiver->pSyncNode->pLogStore, pMsg->lastIndex); if (code != 0) { - sRError(pReceiver, "failed to snapshot receiver log restore since %s", terrstr()); - return -1; + sRError(pReceiver, "failed to snapshot receiver log restore since %s", tstrerror(code)); + TAOS_RETURN(code); } sRInfo(pReceiver, "wal log restored from snapshot"); } else { + code = TSDB_CODE_SYN_INTERNAL_ERROR; sRError(pReceiver, "snapshot receiver finish error since writer is null"); - return -1; + TAOS_RETURN(code); } return 0; @@ -619,14 +626,12 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap static int32_t snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) { if (pMsg->seq != pReceiver->ack + 1) { sRError(pReceiver, "snapshot receiver invalid seq, ack:%d seq:%d", pReceiver->ack, pMsg->seq); - terrno = TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG; - return -1; + TAOS_RETURN(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG); } if (pReceiver->pWriter == NULL) { sRError(pReceiver, "snapshot receiver failed to write data since writer is null"); - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; - return -1; + TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR); } sRDebug(pReceiver, "snapshot receiver continue to write, blockLen:%d seq:%d", pMsg->dataLen, pMsg->seq); @@ -636,8 +641,8 @@ static int32_t snapshotReceiverGotData(SSyncSnapshotReceiver *pReceiver, SyncSna int32_t code = pReceiver->pSyncNode->pFsm->FpSnapshotDoWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, pMsg->data, pMsg->dataLen); if (code != 0) { - sRError(pReceiver, "snapshot receiver continue write failed since %s", terrstr()); - return -1; + sRError(pReceiver, "snapshot receiver continue write failed since %s", tstrerror(code)); + TAOS_RETURN(code); } } @@ -671,29 +676,27 @@ SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths) { static int32_t syncSnapReceiverExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg, SSnapshot *pInfo) { ASSERT(pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT); - int32_t code = 0; + int32_t code = 0, lino = 0; // copy snap info from leader void *data = taosMemoryCalloc(1, pMsg->dataLen); if (data == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - code = terrno; - goto _out; + TAOS_CHECK_EXIT(TSDB_CODE_OUT_OF_MEMORY); } pInfo->data = data; data = NULL; memcpy(pInfo->data, pMsg->data, pMsg->dataLen); // exchange snap info - if (pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, pInfo) != 0) { + if ((code = pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, pInfo)) != 0) { sRError(pReceiver, "failed to get snapshot info. type: %d", pMsg->payloadType); - goto _out; + goto _exit; } SSyncTLV *datHead = pInfo->data; if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) { sRError(pReceiver, "unexpected data typ in data of snapshot info. typ: %d", datHead->typ); code = TSDB_CODE_INVALID_DATA_FMT; - goto _out; + goto _exit; } int32_t dataLen = sizeof(SSyncTLV) + datHead->len; @@ -701,18 +704,17 @@ static int32_t syncSnapReceiverExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshot SSnapshotParam *pParam = &pReceiver->snapshotParam; data = taosMemoryRealloc(pParam->data, dataLen); if (data == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; sError("vgId:%d, failed to realloc memory for snapshot prep due to %s. dataLen:%d", pSyncNode->vgId, - strerror(errno), dataLen); - terrno = TSDB_CODE_OUT_OF_MEMORY; - code = terrno; - goto _out; + tstrerror(code), dataLen); + goto _exit; } pParam->data = data; data = NULL; memcpy(pParam->data, pInfo->data, dataLen); -_out: - return code; +_exit: + TAOS_RETURN(code); } static int32_t syncNodeOnSnapshotPrep(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { @@ -741,8 +743,7 @@ static int32_t syncNodeOnSnapshotPrep(SSyncNode *pSyncNode, SyncSnapshotSend *pM "received a stale snapshot preparation. ignore." " msg signature:(%" PRId64 ", %" PRId64 ")", pMsg->term, pMsg->startTime); - terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; - code = terrno; + code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; goto _SEND_REPLY; } } else { @@ -764,7 +765,7 @@ _SEND_REPLY:; SSnapshot snapInfo = {.type = TDMT_SYNC_PREP_SNAPSHOT_REPLY}; int32_t dataLen = 0; if (pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT) { - if (syncSnapReceiverExchgSnapInfo(pSyncNode, pReceiver, pMsg, &snapInfo) != 0) { + if ((code = syncSnapReceiverExchgSnapInfo(pSyncNode, pReceiver, pMsg, &snapInfo)) != 0) { goto _out; } SSyncTLV *datHead = snapInfo.data; @@ -773,8 +774,7 @@ _SEND_REPLY:; // send response int32_t type = (snapInfo.data) ? snapInfo.type : 0; - if (syncSnapSendRsp(pReceiver, pMsg, snapInfo.data, dataLen, type, code) != 0) { - code = terrno; + if ((code = syncSnapSendRsp(pReceiver, pMsg, snapInfo.data, dataLen, type, code)) != 0) { goto _out; } @@ -783,7 +783,7 @@ _out: taosMemoryFree(snapInfo.data); snapInfo.data = NULL; } - return code; + TAOS_RETURN(code); } static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { @@ -797,14 +797,14 @@ static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *p } if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) { - terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; - sRError(pReceiver, "failed to begin snapshot receiver since %s", terrstr()); + code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; + sRError(pReceiver, "failed to begin snapshot receiver since %s", tstrerror(code)); goto _SEND_REPLY; } // start writer - if (snapshotReceiverStartWriter(pReceiver, pMsg) != 0) { - sRError(pReceiver, "failed to start snapshot writer since %s", terrstr()); + if ((code = snapshotReceiverStartWriter(pReceiver, pMsg)) != 0) { + sRError(pReceiver, "failed to start snapshot writer since %s", tstrerror(code)); goto _SEND_REPLY; } @@ -817,26 +817,22 @@ static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *p code = 0; _SEND_REPLY: - if (code != 0 && terrno != 0) { - code = terrno; - } // send response - if (syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code) != 0) { - return -1; - } + TAOS_CHECK_RETURN(syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code)); - return code; + TAOS_RETURN(code); } int32_t syncSnapSendRsp(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg, void *pBlock, int32_t blockLen, - int32_t type, int32_t code) { + int32_t type, int32_t rspCode) { + int32_t code = 0; SSyncNode *pSyncNode = pReceiver->pSyncNode; // build msg SRpcMsg rpcMsg = {0}; - if (syncBuildSnapshotSendRsp(&rpcMsg, blockLen, pSyncNode->vgId)) { - sRError(pReceiver, "failed to build snapshot receiver resp since %s", terrstr()); - return -1; + if ((code = syncBuildSnapshotSendRsp(&rpcMsg, blockLen, pSyncNode->vgId)) != 0) { + sRError(pReceiver, "failed to build snapshot receiver resp since %s", tstrerror(code)); + TAOS_RETURN(code); } SyncSnapshotRsp *pRspMsg = rpcMsg.pCont; @@ -847,7 +843,7 @@ int32_t syncSnapSendRsp(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg pRspMsg->lastTerm = pMsg->lastTerm; pRspMsg->startTime = pMsg->startTime; pRspMsg->ack = pMsg->seq; - pRspMsg->code = code; + pRspMsg->code = rspCode; pRspMsg->snapBeginIndex = pReceiver->snapshotParam.start; pRspMsg->payloadType = type; @@ -856,9 +852,9 @@ int32_t syncSnapSendRsp(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg } // send msg - if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) { - sRError(pReceiver, "failed to send snapshot receiver resp since %s", terrstr()); - return -1; + if ((code = syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg)) != 0) { + sRError(pReceiver, "failed to send snapshot receiver resp since %s", tstrerror(code)); + TAOS_RETURN(code); } return 0; } @@ -867,13 +863,11 @@ static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshot int32_t code = 0; SSyncSnapBuffer *pRcvBuf = pReceiver->pRcvBuf; SyncSnapshotSend *pMsg = ppMsg[0]; - terrno = TSDB_CODE_SUCCESS; taosThreadMutexLock(&pRcvBuf->mutex); if (pMsg->seq - pRcvBuf->start >= pRcvBuf->size) { - terrno = TSDB_CODE_SYN_BUFFER_FULL; - code = terrno; + code = TSDB_CODE_SYN_BUFFER_FULL; goto _out; } @@ -887,7 +881,7 @@ static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshot ppMsg[0] = NULL; pRcvBuf->end = TMAX(pMsg->seq + 1, pRcvBuf->end); } else if (pMsg->seq < pRcvBuf->start) { - syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code); + code = syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code); goto _out; } @@ -900,8 +894,7 @@ static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshot } for (int64_t seq = pRcvBuf->start; seq <= pRcvBuf->cursor; ++seq) { - if (snapshotReceiverGotData(pReceiver, pRcvBuf->entries[seq % pRcvBuf->size]) != 0) { - code = terrno; + if ((code = snapshotReceiverGotData(pReceiver, pRcvBuf->entries[seq % pRcvBuf->size])) != 0) { if (code >= SYNC_SNAPSHOT_SEQ_INVALID) { code = TSDB_CODE_SYN_INTERNAL_ERROR; } @@ -915,7 +908,7 @@ static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshot _out: taosThreadMutexUnlock(&pRcvBuf->mutex); - return code; + TAOS_RETURN(code); } static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend **ppMsg) { @@ -928,9 +921,9 @@ static int32_t syncNodeOnSnapshotReceive(SSyncNode *pSyncNode, SyncSnapshotSend int32_t code = 0; if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) { - terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; - sRError(pReceiver, "failed to receive snapshot data since %s.", terrstr()); - return syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, terrno); + code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; + sRError(pReceiver, "failed to receive snapshot data since %s.", tstrerror(code)); + return syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code); } return syncSnapBufferRecv(pReceiver, ppMsg); @@ -940,14 +933,13 @@ static int32_t syncNodeOnSnapshotEnd(SSyncNode *pSyncNode, SyncSnapshotSend *pMs // condition 2 // end, finish FSM SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver; - int64_t timeNow = taosGetTimestampMs(); + int64_t timeNow = taosGetTimestampMs(); int32_t code = 0; if (snapshotReceiverSignatureCmp(pReceiver, pMsg) != 0) { sRError(pReceiver, "snapshot end failed since startTime:%" PRId64 " not equal to msg startTime:%" PRId64, pReceiver->startTime, pMsg->startTime); - terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; - code = terrno; + code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; goto _SEND_REPLY; } @@ -960,9 +952,9 @@ _SEND_REPLY:; // build msg SRpcMsg rpcMsg = {0}; - if (syncBuildSnapshotSendRsp(&rpcMsg, 0, pSyncNode->vgId) != 0) { - sRError(pReceiver, "snapshot receiver build rsp failed since %s", terrstr()); - return -1; + if ((code = syncBuildSnapshotSendRsp(&rpcMsg, 0, pSyncNode->vgId)) != 0) { + sRError(pReceiver, "snapshot receiver build rsp failed since %s", tstrerror(code)); + TAOS_RETURN(code); } SyncSnapshotRsp *pRspMsg = rpcMsg.pCont; @@ -978,16 +970,16 @@ _SEND_REPLY:; // send msg syncLogSendSyncSnapshotRsp(pSyncNode, pRspMsg, "snapshot receiver end"); - if (syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg) != 0) { - sRError(pReceiver, "snapshot receiver send rsp failed since %s", terrstr()); - return -1; + if ((code = syncNodeSendMsgById(&pRspMsg->destId, pSyncNode, &rpcMsg)) != 0) { + sRError(pReceiver, "snapshot receiver send rsp failed since %s", tstrerror(code)); + TAOS_RETURN(code); } - return code; + TAOS_RETURN(code); } int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { - SyncSnapshotSend **ppMsg = (SyncSnapshotSend **)&pRpcMsg->pCont; + SyncSnapshotSend **ppMsg = (SyncSnapshotSend **)&pRpcMsg->pCont; SyncSnapshotSend *pMsg = ppMsg[0]; SSyncSnapshotReceiver *pReceiver = pSyncNode->pNewNodeReceiver; int32_t code = 0; @@ -995,35 +987,36 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { // if already drop replica, do not process if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) { syncLogRecvSyncSnapshotSend(pSyncNode, pMsg, "not in my config"); - terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; - return -1; + code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; + TAOS_RETURN(code); } if (pMsg->term < raftStoreGetTerm(pSyncNode)) { sRError(pReceiver, "reject snap replication with smaller term. msg term:%" PRId64 ", seq:%d", pMsg->term, pMsg->seq); - terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; - syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, terrno); - return -1; + code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; + syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code); + TAOS_RETURN(code); } - if(pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole != TAOS_SYNC_ROLE_LEARNER){ + if (pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole != TAOS_SYNC_ROLE_LEARNER) { if (pMsg->term > raftStoreGetTerm(pSyncNode)) { syncNodeStepDown(pSyncNode, pMsg->term); } - } - else{ + } else { syncNodeUpdateTermWithoutStepDown(pSyncNode, pMsg->term); } if (pSyncNode->state != TAOS_SYNC_STATE_FOLLOWER && pSyncNode->state != TAOS_SYNC_STATE_LEARNER) { sRError(pReceiver, "snapshot receiver not a follower or learner"); - return -1; + code = TSDB_CODE_SYN_INTERNAL_ERROR; + TAOS_RETURN(code); } if (pMsg->seq < SYNC_SNAPSHOT_SEQ_PREP || pMsg->seq > SYNC_SNAPSHOT_SEQ_END) { sRError(pReceiver, "snap replication msg with invalid seq:%d", pMsg->seq); - return -1; + code = TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG; + TAOS_RETURN(code); } // prepare @@ -1060,14 +1053,14 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { code = syncLogBufferReInit(pSyncNode->pLogBuf, pSyncNode); if (code != 0) { - sRError(pReceiver, "failed to reinit log buffer since %s", terrstr()); + sRError(pReceiver, "failed to reinit log buffer since %s", tstrerror(code)); } goto _out; } _out:; syncNodeResetElectTimer(pSyncNode); - return code; + TAOS_RETURN(code); } static int32_t syncSnapSenderExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) { @@ -1076,16 +1069,14 @@ static int32_t syncSnapSenderExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotSe SSyncTLV *datHead = (void *)pMsg->data; if (datHead->typ != pMsg->payloadType) { sSError(pSender, "unexpected data type in data of SyncSnapshotRsp. typ: %d", datHead->typ); - terrno = TSDB_CODE_INVALID_DATA_FMT; - return -1; + TAOS_RETURN(TSDB_CODE_INVALID_DATA_FMT); } int32_t dataLen = sizeof(SSyncTLV) + datHead->len; SSnapshotParam *pParam = &pSender->snapshotParam; void *data = taosMemoryRealloc(pParam->data, dataLen); if (data == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY); } memcpy(data, pMsg->data, dataLen); @@ -1097,19 +1088,18 @@ static int32_t syncSnapSenderExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshotSe // sender static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) { - int32_t code = -1; + int32_t code = 0; SSnapshot snapshot = {0}; if (pMsg->snapBeginIndex > pSyncNode->commitIndex) { sSError(pSender, "snapshot begin index is greater than commit index. snapBeginIndex:%" PRId64 ", commitIndex:%" PRId64, pMsg->snapBeginIndex, pSyncNode->commitIndex); - terrno = TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG; - return -1; + TAOS_RETURN(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG); } taosThreadMutexLock(&pSender->pSndBuf->mutex); - pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); + TAOS_CHECK_GOTO(pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot), NULL, _out); // prepare pSender->snapshotParam.start = pMsg->snapBeginIndex; @@ -1123,14 +1113,12 @@ static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSend // start reader if (pMsg->payloadType == TDMT_SYNC_PREP_SNAPSHOT_REPLY) { - if (syncSnapSenderExchgSnapInfo(pSyncNode, pSender, pMsg) != 0) { - goto _out; - } + TAOS_CHECK_GOTO(syncSnapSenderExchgSnapInfo(pSyncNode, pSender, pMsg), NULL, _out); } code = pSyncNode->pFsm->FpSnapshotStartRead(pSyncNode->pFsm, &pSender->snapshotParam, &pSender->pReader); if (code != 0) { - sSError(pSender, "prepare snapshot failed since %s", terrstr()); + sSError(pSender, "prepare snapshot failed since %s", tstrerror(code)); goto _out; } @@ -1141,7 +1129,7 @@ static int32_t syncNodeOnSnapshotPrepRsp(SSyncNode *pSyncNode, SSyncSnapshotSend _out: taosThreadMutexUnlock(&pSender->pSndBuf->mutex); - return code; + TAOS_RETURN(code); } static int32_t snapshotSenderSignatureCmp(SSyncSnapshotSender *pSender, SyncSnapshotRsp *pMsg) { @@ -1159,17 +1147,17 @@ static int32_t syncSnapBufferSend(SSyncSnapshotSender *pSender, SyncSnapshotRsp taosThreadMutexLock(&pSndBuf->mutex); if (snapshotSenderSignatureCmp(pSender, pMsg) != 0) { - code = terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; + code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; goto _out; } if (pSender->pReader == NULL || pSender->finish || !snapshotSenderIsStart(pSender)) { - code = terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + code = TSDB_CODE_SYN_INTERNAL_ERROR; goto _out; } if (pMsg->ack - pSndBuf->start >= pSndBuf->size) { - code = terrno = TSDB_CODE_SYN_BUFFER_FULL; + code = TSDB_CODE_SYN_BUFFER_FULL; goto _out; } @@ -1209,49 +1197,47 @@ static int32_t syncSnapBufferSend(SSyncSnapshotSender *pSender, SyncSnapshotRsp } _out: taosThreadMutexUnlock(&pSndBuf->mutex); - return code; + TAOS_RETURN(code); } int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { SyncSnapshotRsp **ppMsg = (SyncSnapshotRsp **)&pRpcMsg->pCont; SyncSnapshotRsp *pMsg = ppMsg[0]; + int32_t code = 0; // if already drop replica, do not process if (!syncNodeInRaftGroup(pSyncNode, &pMsg->srcId)) { syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "maybe replica already dropped"); - terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; - return -1; + TAOS_RETURN(TSDB_CODE_SYN_MISMATCHED_SIGNATURE); } // get sender SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &pMsg->srcId); if (pSender == NULL) { syncLogRecvSyncSnapshotRsp(pSyncNode, pMsg, "sender is null"); - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; - return -1; + TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR); } if (!snapshotSenderIsStart(pSender)) { sSError(pSender, "snapshot sender stopped. sender startTime:%" PRId64 ", msg startTime:%" PRId64, pSender->startTime, pMsg->startTime); - return -1; + TAOS_RETURN(TSDB_CODE_SYN_INTERNAL_ERROR); } // check signature int32_t order = 0; if ((order = snapshotSenderSignatureCmp(pSender, pMsg)) > 0) { sSWarn(pSender, "ignore a stale snap rsp, msg signature:(%" PRId64 ", %" PRId64 ").", pMsg->term, pMsg->startTime); - terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; - return -1; + TAOS_RETURN(TSDB_CODE_SYN_MISMATCHED_SIGNATURE); } else if (order < 0) { sSError(pSender, "snapshot sender is stale. stop"); - terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; + code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; goto _ERROR; } if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) { sSError(pSender, "snapshot sender not leader"); - terrno = TSDB_CODE_SYN_NOT_LEADER; + code = TSDB_CODE_SYN_NOT_LEADER; goto _ERROR; } @@ -1259,29 +1245,29 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { if (pMsg->term != currentTerm) { sSError(pSender, "snapshot sender term mismatch, msg term:%" PRId64 " currentTerm:%" PRId64, pMsg->term, currentTerm); - terrno = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; + code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; goto _ERROR; } if (pMsg->code != 0) { sSError(pSender, "snapshot sender receive error:%s 0x%x and stop sender", tstrerror(pMsg->code), pMsg->code); - terrno = pMsg->code; + code = pMsg->code; goto _ERROR; } // send begin if (pMsg->ack == SYNC_SNAPSHOT_SEQ_PREP) { sSInfo(pSender, "process prepare rsp"); - if (syncNodeOnSnapshotPrepRsp(pSyncNode, pSender, pMsg) != 0) { + if ((code = syncNodeOnSnapshotPrepRsp(pSyncNode, pSender, pMsg)) != 0) { goto _ERROR; } } // send msg of data or end if (pMsg->ack >= SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->ack < SYNC_SNAPSHOT_SEQ_END) { - if (syncSnapBufferSend(pSender, ppMsg) != 0) { - sSError(pSender, "failed to replicate snap since %s. seq:%d, pReader:%p, finish:%d", terrstr(), pSender->seq, - pSender->pReader, pSender->finish); + if ((code = syncSnapBufferSend(pSender, ppMsg)) != 0) { + sSError(pSender, "failed to replicate snap since %s. seq:%d, pReader:%p, finish:%d", tstrerror(code), + pSender->seq, pSender->pReader, pSender->finish); goto _ERROR; } } @@ -1298,5 +1284,5 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) { _ERROR: snapshotSenderStop(pSender, false); syncNodeReplicateReset(pSyncNode, &pMsg->srcId); - return -1; + TAOS_RETURN(code); } diff --git a/source/libs/sync/test/syncSnapshotReceiverTest.cpp b/source/libs/sync/test/syncSnapshotReceiverTest.cpp index 1fca04a1ad..112c145a45 100644 --- a/source/libs/sync/test/syncSnapshotReceiverTest.cpp +++ b/source/libs/sync/test/syncSnapshotReceiverTest.cpp @@ -42,7 +42,8 @@ SSyncSnapshotReceiver* createReceiver() { id.addr = syncUtilAddr2U64("1.2.3.4", 99); id.vgId = 100; - SSyncSnapshotReceiver* pReceiver = snapshotReceiverCreate(pSyncNode, id); + SSyncSnapshotReceiver* pReceiver = NULL; + (void)snapshotReceiverCreate(pSyncNode, id, &pReceiver); pReceiver->start = true; pReceiver->ack = 20; pReceiver->pWriter = (void*)0x11; diff --git a/source/libs/sync/test/syncSnapshotSenderTest.cpp b/source/libs/sync/test/syncSnapshotSenderTest.cpp index a1768c2ce5..be31df30e9 100644 --- a/source/libs/sync/test/syncSnapshotSenderTest.cpp +++ b/source/libs/sync/test/syncSnapshotSenderTest.cpp @@ -39,7 +39,8 @@ SSyncSnapshotSender* createSender() { pSyncNode->pFsm->FpGetSnapshotInfo = GetSnapshot; #endif - SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, 2); + SSyncSnapshotSender* pSender = NULL; + (void)snapshotSenderCreate(pSyncNode, 2, &pSender); pSender->start = true; pSender->seq = 10; pSender->ack = 20;