diff --git a/include/util/taoserror.h b/include/util/taoserror.h index a52e87bbb3..5b3d021141 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -669,6 +669,13 @@ int32_t taosGetErrSize(); #define TSDB_CODE_SYN_BUFFER_FULL TAOS_DEF_ERROR_CODE(0, 0x0916) #define TSDB_CODE_SYN_WRITE_STALL TAOS_DEF_ERROR_CODE(0, 0x0917) #define TSDB_CODE_SYN_NEGOTIATION_WIN_FULL TAOS_DEF_ERROR_CODE(0, 0x0918) +#define TSDB_CODE_SYN_WRONG_TERM TAOS_DEF_ERROR_CODE(0, 0x0919) +#define TSDB_CODE_SYN_WRONG_FSM_STATE TAOS_DEF_ERROR_CODE(0, 0x091A) +#define TSDB_CODE_SYN_WRONG_SYNC_STATE TAOS_DEF_ERROR_CODE(0, 0x091B) +#define TSDB_CODE_SYN_WRONG_REF TAOS_DEF_ERROR_CODE(0, 0x091C) +#define TSDB_CODE_SYN_INVALID_ID TAOS_DEF_ERROR_CODE(0, 0x091D) +#define TSDB_CODE_SYN_RETURN_VALUE_NULL TAOS_DEF_ERROR_CODE(0, 0x091E) +#define TSDB_CODE_SYN_WRONG_ROLE TAOS_DEF_ERROR_CODE(0, 0x091F) #define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF) // tq diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index d6a1efc05f..7b9b2ee276 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -83,36 +83,42 @@ int64_t syncOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { } int32_t syncStart(int64_t rid) { + int32_t code = 0; SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { + code = TSDB_CODE_SYN_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid); - return -1; + TAOS_RETURN(code); } - if (syncNodeRestore(pSyncNode) < 0) { - sError("vgId:%d, failed to restore sync log buffer since %s", pSyncNode->vgId, terrstr()); + if ((code = syncNodeRestore(pSyncNode)) < 0) { + sError("vgId:%d, failed to restore sync log buffer since %s", pSyncNode->vgId, tstrerror(code)); goto _err; } - if (syncNodeStart(pSyncNode) < 0) { - sError("vgId:%d, failed to start sync node since %s", pSyncNode->vgId, terrstr()); + if ((code = syncNodeStart(pSyncNode)) < 0) { + sError("vgId:%d, failed to start sync node since %s", pSyncNode->vgId, tstrerror(code)); goto _err; } syncNodeRelease(pSyncNode); - return 0; + TAOS_RETURN(code); _err: syncNodeRelease(pSyncNode); - return -1; + TAOS_RETURN(code); } int32_t syncNodeGetConfig(int64_t rid, SSyncCfg* cfg) { + int32_t code = 0; SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { + code = TSDB_CODE_SYN_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; sError("failed to acquire rid:%" PRId64 " of tsNodeReftId for pSyncNode", rid); - return terrno; + TAOS_RETURN(code); } *cfg = pSyncNode->raftCfg.cfg; @@ -153,8 +159,13 @@ static bool syncNodeCheckNewConfig(SSyncNode* pSyncNode, const SSyncCfg* pCfg) { } int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) { + int32_t code = 0; SSyncNode* pSyncNode = syncNodeAcquire(rid); - if (pSyncNode == NULL) return -1; + if (pSyncNode == NULL) { + code = TSDB_CODE_SYN_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); + } if (pSyncNode->raftCfg.lastConfigIndex >= pNewCfg->lastIndex) { syncNodeRelease(pSyncNode); @@ -165,9 +176,9 @@ int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) { if (!syncNodeCheckNewConfig(pSyncNode, pNewCfg)) { syncNodeRelease(pSyncNode); - terrno = TSDB_CODE_SYN_NEW_CONFIG_ERROR; + code = TSDB_CODE_SYN_NEW_CONFIG_ERROR; sError("vgId:%d, failed to reconfig since invalid new config", pSyncNode->vgId); - return -1; + TAOS_RETURN(code); } syncNodeUpdateNewConfigIndex(pSyncNode, pNewCfg); @@ -185,15 +196,23 @@ int32_t syncReconfig(int64_t rid, SSyncCfg* pNewCfg) { } syncNodeRelease(pSyncNode); - return 0; + TAOS_RETURN(code); } int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) { int32_t code = -1; - if (!syncIsInit()) return code; + if (!syncIsInit()) { + code = TSDB_CODE_SYN_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); + } SSyncNode* pSyncNode = syncNodeAcquire(rid); - if (pSyncNode == NULL) return code; + if (pSyncNode == NULL) { + code = TSDB_CODE_SYN_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); + } switch (pMsg->msgType) { case TDMT_SYNC_HEARTBEAT: @@ -239,21 +258,25 @@ int32_t syncProcessMsg(int64_t rid, SRpcMsg* pMsg) { code = syncBecomeAssignedLeader(pSyncNode, pMsg); break; default: - terrno = TSDB_CODE_MSG_NOT_PROCESSED; - code = -1; + code = TSDB_CODE_MSG_NOT_PROCESSED; } syncNodeRelease(pSyncNode); if (code != 0) { sDebug("vgId:%d, failed to process sync msg:%p type:%s since 0x%x", pSyncNode->vgId, pMsg, TMSG_INFO(pMsg->msgType), - terrno); + code); } - return code; + TAOS_RETURN(code); } int32_t syncLeaderTransfer(int64_t rid) { + int32_t code = 0; SSyncNode* pSyncNode = syncNodeAcquire(rid); - if (pSyncNode == NULL) return -1; + if (pSyncNode == NULL) { + code = TSDB_CODE_SYN_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); + } int32_t ret = syncNodeLeaderTransfer(pSyncNode); syncNodeRelease(pSyncNode); @@ -275,16 +298,14 @@ int32_t syncForceBecomeFollower(SSyncNode* ths, const SRpcMsg* pRpcMsg) { } int32_t syncBecomeAssignedLeader(SSyncNode* ths, SRpcMsg* pRpcMsg) { - int32_t ret = -1; - int32_t errcode = TSDB_CODE_MND_ARB_TOKEN_MISMATCH; + int32_t code = TSDB_CODE_MND_ARB_TOKEN_MISMATCH; void* pHead = NULL; int32_t contLen = 0; SVArbSetAssignedLeaderReq req = {0}; if (tDeserializeSVArbSetAssignedLeaderReq((char*)pRpcMsg->pCont + sizeof(SMsgHead), pRpcMsg->contLen, &req) != 0) { sError("vgId:%d, failed to deserialize SVArbSetAssignedLeaderReq", ths->vgId); - terrno = TSDB_CODE_INVALID_MSG; - errcode = terrno; + code = TSDB_CODE_INVALID_MSG; goto _OVER; } @@ -303,17 +324,17 @@ int32_t syncBecomeAssignedLeader(SSyncNode* ths, SRpcMsg* pRpcMsg) { } if (ths->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) { - terrno = TSDB_CODE_SUCCESS; + code = TSDB_CODE_SUCCESS; raftStoreNextTerm(ths); if (terrno != TSDB_CODE_SUCCESS) { - sError("vgId:%d, failed to set next term since:%s", ths->vgId, terrstr()); - errcode = terrno; + code = terrno; + sError("vgId:%d, failed to set next term since:%s", ths->vgId, tstrerror(code)); goto _OVER; } syncNodeBecomeAssignedLeader(ths); - if (syncNodeAppendNoop(ths) < 0) { - sError("vgId:%d, assigned leader failed to append noop entry since %s", ths->vgId, terrstr()); + if ((code = syncNodeAppendNoop(ths)) < 0) { + sError("vgId:%d, assigned leader failed to append noop entry since %s", ths->vgId, tstrerror(code)); } } @@ -324,32 +345,28 @@ int32_t syncBecomeAssignedLeader(SSyncNode* ths, SRpcMsg* pRpcMsg) { contLen = tSerializeSVArbSetAssignedLeaderRsp(NULL, 0, &rsp); if (contLen <= 0) { + code = TSDB_CODE_OUT_OF_MEMORY; sError("vgId:%d, failed to serialize SVArbSetAssignedLeaderRsp", ths->vgId); - terrno = TSDB_CODE_OUT_OF_MEMORY; - errcode = terrno; goto _OVER; } pHead = rpcMallocCont(contLen); if (!pHead) { + code = TSDB_CODE_OUT_OF_MEMORY; sError("vgId:%d, failed to malloc memory for SVArbSetAssignedLeaderRsp", ths->vgId); - terrno = TSDB_CODE_OUT_OF_MEMORY; - errcode = terrno; goto _OVER; } if (tSerializeSVArbSetAssignedLeaderRsp(pHead, contLen, &rsp) <= 0) { + code = TSDB_CODE_OUT_OF_MEMORY; sError("vgId:%d, failed to serialize SVArbSetAssignedLeaderRsp", ths->vgId); - terrno = TSDB_CODE_OUT_OF_MEMORY; - errcode = terrno; rpcFreeCont(pHead); goto _OVER; } - errcode = TSDB_CODE_SUCCESS; - ret = 0; + code = TSDB_CODE_SUCCESS; _OVER:; SRpcMsg rspMsg = { - .code = errcode, + .code = code, .pCont = pHead, .contLen = contLen, .info = pRpcMsg->info, @@ -358,12 +375,17 @@ _OVER:; tmsgSendRsp(&rspMsg); tFreeSVArbSetAssignedLeaderReq(&req); - return ret; + TAOS_RETURN(code); } int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) { + int32_t code = 0; SSyncNode* pNode = syncNodeAcquire(rid); - if (pNode == NULL) return -1; + if (pNode == NULL) { + code = TSDB_CODE_SYN_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); + } SRpcMsg rpcMsg = {0}; int32_t ret = syncRespMgrGetAndDel(pNode->pSyncRespMgr, seq, &rpcMsg.info); @@ -376,7 +398,7 @@ int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) { return 0; } else { sError("no message handle to send timeout response, seq:%" PRId64, seq); - return -1; + return TSDB_CODE_SYN_INTERNAL_ERROR; } } @@ -402,9 +424,12 @@ static SyncIndex syncLogRetentionIndex(SSyncNode* pSyncNode, int64_t bytes) { int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { SSyncNode* pSyncNode = syncNodeAcquire(rid); + int32_t code = 0; if (pSyncNode == NULL) { + code = TSDB_CODE_SYN_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; sError("sync begin snapshot error"); - return -1; + TAOS_RETURN(code); } SyncIndex beginIndex = pSyncNode->pLogStore->syncLogBeginIndex(pSyncNode->pLogStore); @@ -417,7 +442,6 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { return 0; } - int32_t code = 0; int64_t logRetention = 0; if (syncNodeIsMnode(pSyncNode)) { @@ -475,24 +499,26 @@ _DEL_WAL: } while (0); syncNodeRelease(pSyncNode); - return code; + TAOS_RETURN(code); } int32_t syncEndSnapshot(int64_t rid) { + int32_t code = 0; SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { + code = TSDB_CODE_SYN_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; sError("sync end snapshot error"); - return -1; + TAOS_RETURN(code); } - int32_t code = 0; if (atomic_load_64(&pSyncNode->snapshottingIndex) != SYNC_INDEX_INVALID) { SSyncLogStoreData* pData = pSyncNode->pLogStore->data; code = walEndSnapshot(pData->pWal); if (code != 0) { - sNError(pSyncNode, "wal snapshot end error since:%s", terrstr()); + sNError(pSyncNode, "wal snapshot end error since:%s", tstrerror(code)); syncNodeRelease(pSyncNode); - return -1; + TAOS_RETURN(code); } else { sNTrace(pSyncNode, "wal snapshot end, index:%" PRId64, atomic_load_64(&pSyncNode->snapshottingIndex)); atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID); @@ -500,7 +526,7 @@ int32_t syncEndSnapshot(int64_t rid) { } syncNodeRelease(pSyncNode); - return code; + TAOS_RETURN(code); } bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) { @@ -585,13 +611,13 @@ int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) { int32_t syncNodeLeaderTransferTo(SSyncNode* pSyncNode, SNodeInfo newLeader) { if (pSyncNode->replicaNum == 1) { sDebug("vgId:%d, only one replica, cannot leader transfer", pSyncNode->vgId); - return -1; + return TSDB_CODE_SYN_INTERNAL_ERROR; } sNTrace(pSyncNode, "begin leader transfer to %s:%u", newLeader.nodeFqdn, newLeader.nodePort); SRpcMsg rpcMsg = {0}; - (void)syncBuildLeaderTransfer(&rpcMsg, pSyncNode->vgId); + TAOS_CHECK_RETURN(syncBuildLeaderTransfer(&rpcMsg, pSyncNode->vgId)); SyncLeaderTransfer* pMsg = rpcMsg.pCont; pMsg->newLeaderId.addr = SYNC_ADDR(&newLeader); @@ -639,10 +665,12 @@ SSyncState syncGetState(int64_t rid) { } int32_t syncGetArbToken(int64_t rid, char* outToken) { + int32_t code = 0; SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { - terrno = TSDB_CODE_NOT_FOUND; - return -1; + code = TSDB_CODE_SYN_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); } memset(outToken, 0, TSDB_ARB_TOKEN_SIZE); @@ -650,41 +678,44 @@ int32_t syncGetArbToken(int64_t rid, char* outToken) { strncpy(outToken, pSyncNode->arbToken, TSDB_ARB_TOKEN_SIZE); taosThreadMutexUnlock(&pSyncNode->arbTokenMutex); - terrno = TSDB_CODE_SUCCESS; syncNodeRelease(pSyncNode); - return 0; + TAOS_RETURN(code); } int32_t syncGetAssignedLogSynced(int64_t rid) { + int32_t code = 0; SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { - terrno = TSDB_CODE_NOT_FOUND; - return -1; + code = TSDB_CODE_SYN_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); } if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) { - terrno = TSDB_CODE_VND_ARB_NOT_SYNCED; + code = TSDB_CODE_VND_ARB_NOT_SYNCED; syncNodeRelease(pSyncNode); - return 0; + TAOS_RETURN(code); } bool isSync = pSyncNode->commitIndex >= pSyncNode->assignedCommitIndex; - terrno = (isSync ? TSDB_CODE_SUCCESS : TSDB_CODE_VND_ARB_NOT_SYNCED); + code = (isSync ? TSDB_CODE_SUCCESS : TSDB_CODE_VND_ARB_NOT_SYNCED); syncNodeRelease(pSyncNode); - return 0; + TAOS_RETURN(code); } int32_t syncUpdateArbTerm(int64_t rid, SyncTerm arbTerm) { + int32_t code = 0; SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { - terrno = TSDB_CODE_NOT_FOUND; - return -1; + code = TSDB_CODE_SYN_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); } pSyncNode->arbTerm = TMAX(arbTerm, pSyncNode->arbTerm); syncNodeRelease(pSyncNode); - return 0; + TAOS_RETURN(code); } SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex) { @@ -730,10 +761,13 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) { } int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq) { + int32_t code = 0; SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { + code = TSDB_CODE_SYN_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; sError("sync propose error"); - return -1; + TAOS_RETURN(code); } int32_t ret = syncNodePropose(pSyncNode, pMsg, isWeak, seq); @@ -742,24 +776,30 @@ int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak, int64_t* seq) { } int32_t syncCheckMember(int64_t rid) { + int32_t code = 0; SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { + code = TSDB_CODE_SYN_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; sError("sync propose error"); - return -1; + TAOS_RETURN(code); } if (pSyncNode->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER) { - return -1; + return TSDB_CODE_SYN_WRONG_ROLE; } return 0; } int32_t syncIsCatchUp(int64_t rid) { + int32_t code = 0; SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { + code = TSDB_CODE_SYN_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; sError("sync Node Acquire error since %d", errno); - return -1; + TAOS_RETURN(code); } int32_t isCatchUp = 0; @@ -781,10 +821,13 @@ int32_t syncIsCatchUp(int64_t rid) { } ESyncRole syncGetRole(int64_t rid) { + int32_t code = 0; SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { + code = TSDB_CODE_SYN_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; sError("sync Node Acquire error since %d", errno); - return -1; + TAOS_RETURN(code); } ESyncRole role = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex].nodeRole; @@ -794,10 +837,13 @@ ESyncRole syncGetRole(int64_t rid) { } int64_t syncGetTerm(int64_t rid) { + int32_t code = 0; SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { + code = TSDB_CODE_SYN_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; sError("sync Node Acquire error since %d", errno); - return -1; + TAOS_RETURN(code); } int64_t term = raftStoreGetTerm(pSyncNode); @@ -807,25 +853,26 @@ int64_t syncGetTerm(int64_t rid) { } int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_t* seq) { + int32_t code = 0; if (pSyncNode->state != TAOS_SYNC_STATE_LEADER && pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) { - terrno = TSDB_CODE_SYN_NOT_LEADER; + code = TSDB_CODE_SYN_NOT_LEADER; sNWarn(pSyncNode, "sync propose not leader, type:%s", TMSG_INFO(pMsg->msgType)); - return -1; + TAOS_RETURN(code); } if (!pSyncNode->restoreFinish) { - terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY; + code = TSDB_CODE_SYN_PROPOSE_NOT_READY; sNWarn(pSyncNode, "failed to sync propose since not ready, type:%s, last:%" PRId64 ", cmt:%" PRId64, TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex); - return -1; + TAOS_RETURN(code); } // heartbeat timeout if (pSyncNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER && syncNodeHeartbeatReplyTimeout(pSyncNode)) { - terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY; + code = TSDB_CODE_SYN_PROPOSE_NOT_READY; sNError(pSyncNode, "failed to sync propose since heartbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64, TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex); - return -1; + TAOS_RETURN(code); } // optimized one replica @@ -850,10 +897,10 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_ return 0; } } else { - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + code = TSDB_CODE_SYN_INTERNAL_ERROR; sError("vgId:%d, failed to propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex, TMSG_INFO(pMsg->msgType)); - return -1; + TAOS_RETURN(code); } } else { SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg}; @@ -863,7 +910,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_ if (code != 0) { sError("vgId:%d, failed to propose msg while serialize since %s", pSyncNode->vgId, terrstr()); (void)syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum); - return -1; + TAOS_RETURN(code); } sNTrace(pSyncNode, "propose msg, type:%s", TMSG_INFO(pMsg->msgType)); @@ -874,7 +921,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_ } if (seq != NULL) *seq = seqNum; - return code; + TAOS_RETURN(code); } } @@ -912,6 +959,7 @@ static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS / HEARTBEAT_TICK_NUM, (void*)(pData->rid), syncEnv()->pTimerManager, &pSyncTimer->pTimer); } else { + ret = TSDB_CODE_SYN_INTERNAL_ERROR; sError("vgId:%d, start ctrl hb timer error, sync env is stop", pSyncNode->vgId); } return ret; @@ -920,7 +968,9 @@ static int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { int32_t ret = 0; atomic_add_fetch_64(&pSyncTimer->logicClock, 1); - taosTmrStop(pSyncTimer->pTimer); + if (!taosTmrStop(pSyncTimer->pTimer)) { + return TSDB_CODE_SYN_INTERNAL_ERROR; + } pSyncTimer->pTimer = NULL; syncHbTimerDataRemove(pSyncTimer->hbDataRid); pSyncTimer->hbDataRid = -1; @@ -928,6 +978,7 @@ static int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { } int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) { + int32_t code = 0; ASSERTS(pNode->pLogStore != NULL, "log store not created"); ASSERTS(pNode->pFsm != NULL, "pFsm not registered"); ASSERTS(pNode->pFsm->FpGetSnapshotInfo != NULL, "FpGetSnapshotInfo not registered"); @@ -938,13 +989,13 @@ int32_t syncNodeLogStoreRestoreOnNeed(SSyncNode* pNode) { SyncIndex firstVer = pNode->pLogStore->syncLogBeginIndex(pNode->pLogStore); SyncIndex lastVer = pNode->pLogStore->syncLogLastIndex(pNode->pLogStore); if ((lastVer < commitIndex || firstVer > commitIndex + 1) || pNode->fsmState == SYNC_FSM_STATE_INCOMPLETE) { - if (pNode->pLogStore->syncLogRestoreFromSnapshot(pNode->pLogStore, commitIndex)) { + if ((code = pNode->pLogStore->syncLogRestoreFromSnapshot(pNode->pLogStore, commitIndex)) != 0) { sError("vgId:%d, failed to restore log store from snapshot since %s. lastVer:%" PRId64 ", snapshotVer:%" PRId64, pNode->vgId, terrstr(), lastVer, commitIndex); - return -1; + TAOS_RETURN(code); } } - return 0; + TAOS_RETURN(code); } // open/close -------------- @@ -981,13 +1032,15 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { pSyncNode->raftCfg.configIndexCount = 1; pSyncNode->raftCfg.configIndexArr[0] = -1; - if (syncWriteCfgFile(pSyncNode) != 0) { + if ((code = syncWriteCfgFile(pSyncNode)) != 0) { + terrno = code; sError("vgId:%d, failed to create sync cfg file", pSyncNode->vgId); goto _error; } } else { // update syncCfg by raft_config.json - if (syncReadCfgFile(pSyncNode) != 0) { + if ((code = syncReadCfgFile(pSyncNode)) != 0) { + terrno = code; sError("vgId:%d, failed to read sync cfg file", pSyncNode->vgId); goto _error; } @@ -996,7 +1049,8 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { if (pSyncInfo->syncCfg.totalReplicaNum > 0 && syncIsConfigChanged(&pSyncNode->raftCfg.cfg, &pSyncInfo->syncCfg)) { sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId); pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg; - if (syncWriteCfgFile(pSyncNode) != 0) { + if ((code = syncWriteCfgFile(pSyncNode)) != 0) { + terrno = code; sError("vgId:%d, failed to write sync cfg file", pSyncNode->vgId); goto _error; } @@ -1028,7 +1082,8 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { if (vnodeVersion > pSyncInfo->syncCfg.changeVersion) { if (updated) { sInfo("vgId:%d, save config info since dnode info changed", pSyncNode->vgId); - if (syncWriteCfgFile(pSyncNode) != 0) { + if ((code = syncWriteCfgFile(pSyncNode)) != 0) { + terrno = code; sError("vgId:%d, failed to write sync cfg file on dnode info updated", pSyncNode->vgId); goto _error; } @@ -1042,7 +1097,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { pSyncNode->syncEqCtrlMsg = pSyncInfo->syncEqCtrlMsg; // create raft log ring buffer - (void)syncLogBufferCreate(&pSyncNode->pLogBuf); // TODO: check return value + pSyncNode->pLogBuf = syncLogBufferCreate(); if (pSyncNode->pLogBuf == NULL) { sError("failed to init sync log buffer since %s. vgId:%d", terrstr(), pSyncNode->vgId); goto _error; @@ -1051,6 +1106,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { // init internal pSyncNode->myNodeInfo = pSyncNode->raftCfg.cfg.nodeInfo[pSyncNode->raftCfg.cfg.myIndex]; if (!syncUtilNodeInfo2RaftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId)) { + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; sError("vgId:%d, failed to determine my raft member id", pSyncNode->vgId); goto _error; } @@ -1072,6 +1128,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { } for (int32_t i = 0; i < pSyncNode->peersNum; ++i) { if (!syncUtilNodeInfo2RaftId(&pSyncNode->peersNodeInfo[i], pSyncNode->vgId, &pSyncNode->peersId[i])) { + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; sError("vgId:%d, failed to determine raft member id, peer:%d", pSyncNode->vgId, i); goto _error; } @@ -1082,6 +1139,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { pSyncNode->totalReplicaNum = pSyncNode->raftCfg.cfg.totalReplicaNum; for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) { if (!syncUtilNodeInfo2RaftId(&pSyncNode->raftCfg.cfg.nodeInfo[i], pSyncNode->vgId, &pSyncNode->replicasId[i])) { + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; sError("vgId:%d, failed to determine raft member id, replica:%d", pSyncNode->vgId, i); goto _error; } @@ -1122,7 +1180,8 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { // init TLA+ server vars pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; pSyncNode->roleTimeMs = taosGetTimestampMs(); - if (raftStoreOpen(pSyncNode) != 0) { + if ((code = raftStoreOpen(pSyncNode)) != 0) { + terrno = code; sError("vgId:%d, failed to open raft store at path %s", pSyncNode->vgId, pSyncNode->raftStorePath); goto _error; } @@ -1170,6 +1229,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { if (pSyncNode->fsmState == SYNC_FSM_STATE_INCOMPLETE) { sError("vgId:%d, fsm state is incomplete.", pSyncNode->vgId); if (pSyncNode->replicaNum == 1) { + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; goto _error; } } @@ -1178,7 +1238,8 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { sInfo("vgId:%d, sync node commitIndex initialized as %" PRId64, pSyncNode->vgId, pSyncNode->commitIndex); // restore log store on need - if (syncNodeLogStoreRestoreOnNeed(pSyncNode) < 0) { + if ((code = syncNodeLogStoreRestoreOnNeed(pSyncNode)) < 0) { + terrno = code; sError("vgId:%d, failed to restore log store since %s.", pSyncNode->vgId, terrstr()); goto _error; } @@ -1215,11 +1276,14 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { // init peer heartbeat timer for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) { - syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i]); + if (code = syncHbTimerInit(pSyncNode, &(pSyncNode->peerHeartbeatTimerArr[i]), (pSyncNode->replicasId)[i]) != 0) { + errno = code; + goto _error; + } } // tools - (void)syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS, &pSyncNode->pSyncRespMgr); // TODO: check return value + pSyncNode->pSyncRespMgr = syncRespMgrCreate(pSyncNode, SYNC_RESP_TTL_MS); if (pSyncNode->pSyncRespMgr == NULL) { sError("vgId:%d, failed to create SyncRespMgr", pSyncNode->vgId); goto _error; @@ -1230,8 +1294,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { // snapshot senders for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) { - SSyncSnapshotSender* pSender = NULL; - code = snapshotSenderCreate(pSyncNode, i, &pSender); + SSyncSnapshotSender* pSender = snapshotSenderCreate(pSyncNode, i); if (pSender == NULL) return NULL; pSyncNode->senders[i] = pSender; @@ -1239,7 +1302,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { } // snapshot receivers - code = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID, &pSyncNode->pNewNodeReceiver); + pSyncNode->pNewNodeReceiver = snapshotReceiverCreate(pSyncNode, EMPTY_RAFT_ID); if (pSyncNode->pNewNodeReceiver == NULL) return NULL; sRDebug(pSyncNode->pNewNodeReceiver, "snapshot receiver create while open sync node, data:%p", pSyncNode->pNewNodeReceiver); @@ -1248,13 +1311,15 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { pSyncNode->changing = false; // replication mgr - if (syncNodeLogReplInit(pSyncNode) < 0) { + if ((code = syncNodeLogReplInit(pSyncNode)) < 0) { + terrno = code; sError("vgId:%d, failed to init repl mgr since %s.", pSyncNode->vgId, terrstr()); goto _error; } // peer state - if (syncNodePeerStateInit(pSyncNode) < 0) { + if ((code = syncNodePeerStateInit(pSyncNode)) < 0) { + terrno = code; sError("vgId:%d, failed to init peer stat since %s.", pSyncNode->vgId, terrstr()); goto _error; } @@ -1274,7 +1339,8 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) { atomic_store_64(&pSyncNode->snapshottingIndex, SYNC_INDEX_INVALID); // init log buffer - if (syncLogBufferInit(pSyncNode->pLogBuf, pSyncNode) < 0) { + if ((code = syncLogBufferInit(pSyncNode->pLogBuf, pSyncNode)) < 0) { + terrno = code; sError("vgId:%d, failed to init sync log buffer since %s", pSyncNode->vgId, terrstr()); goto _error; } @@ -1315,6 +1381,7 @@ void syncNodeMaybeUpdateCommitBySnapshot(SSyncNode* pSyncNode) { #endif int32_t syncNodeRestore(SSyncNode* pSyncNode) { + int32_t code = 0; ASSERTS(pSyncNode->pLogStore != NULL, "log store not created"); ASSERTS(pSyncNode->pLogBuf != NULL, "ring log buffer not created"); @@ -1325,22 +1392,25 @@ int32_t syncNodeRestore(SSyncNode* pSyncNode) { taosThreadMutexUnlock(&pSyncNode->pLogBuf->mutex); if (lastVer != -1 && endIndex != lastVer + 1) { - terrno = TSDB_CODE_WAL_LOG_INCOMPLETE; + code = TSDB_CODE_WAL_LOG_INCOMPLETE; sError("vgId:%d, failed to restore sync node since %s. expected lastLogIndex:%" PRId64 ", lastVer:%" PRId64 "", pSyncNode->vgId, terrstr(), endIndex - 1, lastVer); - return -1; + TAOS_RETURN(code); } ASSERT(endIndex == lastVer + 1); pSyncNode->commitIndex = TMAX(pSyncNode->commitIndex, commitIndex); sInfo("vgId:%d, restore sync until commitIndex:%" PRId64, pSyncNode->vgId, pSyncNode->commitIndex); - if (pSyncNode->fsmState != SYNC_FSM_STATE_INCOMPLETE && - syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, pSyncNode->commitIndex) < 0) { - return -1; + if (pSyncNode->fsmState != SYNC_FSM_STATE_INCOMPLETE) { + return TSDB_CODE_SYN_WRONG_FSM_STATE; } - return 0; + if ((code = syncLogBufferCommit(pSyncNode->pLogBuf, pSyncNode, pSyncNode->commitIndex)) < 0) { + TAOS_RETURN(code); + } + + TAOS_RETURN(code); } int32_t syncNodeStart(SSyncNode* pSyncNode) { @@ -1353,7 +1423,7 @@ int32_t syncNodeStart(SSyncNode* pSyncNode) { syncNodeBecomeLeader(pSyncNode, "one replica start"); // Raft 3.6.2 Committing entries from previous terms - syncNodeAppendNoop(pSyncNode); + TAOS_CHECK_RETURN(syncNodeAppendNoop(pSyncNode)); } else { syncNodeBecomeFollower(pSyncNode, "first start"); } @@ -1362,7 +1432,7 @@ int32_t syncNodeStart(SSyncNode* pSyncNode) { int32_t ret = 0; ret = syncNodeStartPingTimer(pSyncNode); if (ret != 0) { - sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, terrstr()); + sError("vgId:%d, failed to start ping timer since %s", pSyncNode->vgId, tstrerror(ret)); } return ret; } @@ -1518,8 +1588,10 @@ int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode, int32_t ms) { pSyncNode->electTimerParam.pSyncNode = pSyncNode; pSyncNode->electTimerParam.pData = NULL; - taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, (void*)(pSyncNode->rid), syncEnv()->pTimerManager, - &pSyncNode->pElectTimer); + if (!taosTmrReset(pSyncNode->FpElectTimerCB, pSyncNode->electTimerMS, (void*)(pSyncNode->rid), + syncEnv()->pTimerManager, &pSyncNode->pElectTimer)) { + ret = TSDB_CODE_SYN_INTERNAL_ERROR; + } } else { sError("vgId:%d, start elect timer error, sync env is stop", pSyncNode->vgId); @@ -1538,12 +1610,13 @@ int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) { int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) { int32_t ret = 0; - syncNodeStopElectTimer(pSyncNode); - syncNodeStartElectTimer(pSyncNode, ms); + TAOS_CHECK_RETURN(syncNodeStopElectTimer(pSyncNode)); + TAOS_CHECK_RETURN(syncNodeStartElectTimer(pSyncNode, ms)); return ret; } void syncNodeResetElectTimer(SSyncNode* pSyncNode) { + int32_t code = 0; int32_t electMS; if (pSyncNode->raftCfg.isStandBy) { @@ -1552,7 +1625,9 @@ void syncNodeResetElectTimer(SSyncNode* pSyncNode) { electMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine); } - (void)syncNodeRestartElectTimer(pSyncNode, electMS); + if ((code = syncNodeRestartElectTimer(pSyncNode, electMS)) != 0) { + sError("failed to restart elect timer since %s", tstrerror(code)); + } sNTrace(pSyncNode, "reset elect timer, min:%d, max:%d, ms:%d", pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine, electMS); @@ -1585,7 +1660,7 @@ int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) { for (int32_t i = 0; i < pSyncNode->peersNum; ++i) { SSyncTimer* pSyncTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[i])); if (pSyncTimer != NULL) { - syncHbTimerStart(pSyncNode, pSyncTimer); + TAOS_CHECK_RETURN(syncHbTimerStart(pSyncNode, pSyncTimer)); } } @@ -1604,7 +1679,7 @@ int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) { for (int32_t i = 0; i < pSyncNode->peersNum; ++i) { SSyncTimer* pSyncTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[i])); if (pSyncTimer != NULL) { - syncHbTimerStop(pSyncNode, pSyncTimer); + TAOS_CHECK_RETURN(syncHbTimerStop(pSyncNode, pSyncTimer)); } } @@ -1636,12 +1711,12 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pNode, SRpcMsg } if (code < 0) { - sError("vgId:%d, failed to send sync msg since %s. epset:%p dnode:%d addr:%" PRId64, pNode->vgId, terrstr(), epSet, - DID(destRaftId), destRaftId->addr); + sError("vgId:%d, failed to send sync msg since %s. epset:%p dnode:%d addr:%" PRId64, pNode->vgId, tstrerror(code), + epSet, DID(destRaftId), destRaftId->addr); rpcFreeCont(pMsg->pCont); } - return code; + TAOS_RETURN(code); } inline bool syncNodeInConfig(SSyncNode* pNode, const SSyncCfg* pCfg) { @@ -1812,7 +1887,7 @@ void syncNodeDoConfigChange(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncInde // create new for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) { if (pSyncNode->senders[i] == NULL) { - snapshotSenderCreate(pSyncNode, i, &pSyncNode->senders[i]); + pSyncNode->senders[i] = snapshotSenderCreate(pSyncNode, i); if (pSyncNode->senders[i] == NULL) { // will be created later while send snapshot sSError(pSyncNode->senders[i], "snapshot sender create failed while reconfig"); @@ -2145,7 +2220,7 @@ int32_t syncNodeAssignedLeader2Leader(SSyncNode* pSyncNode) { int32_t ret = syncNodeAppendNoop(pSyncNode); if (ret < 0) { - sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, terrstr()); + sError("vgId:%d, failed to append noop entry since %s", pSyncNode->vgId, tstrerror(ret)); } SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore); @@ -2594,8 +2669,9 @@ void syncBuildConfigFromReq(SAlterVnodeReplicaReq* pReq, SSyncCfg* cfg) { // TO } int32_t syncNodeCheckChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry) { + int32_t code = 0; if (pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE) { - return -1; + return TSDB_CODE_SYN_INTERNAL_ERROR; } SMsgHead* head = (SMsgHead*)pEntry->data; @@ -2603,8 +2679,8 @@ int32_t syncNodeCheckChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry) { SAlterVnodeTypeReq req = {0}; if (tDeserializeSAlterVnodeReplicaReq(pReq, head->contLen, &req) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; + code = TSDB_CODE_INVALID_MSG; + TAOS_RETURN(code); } SSyncCfg cfg = {0}; @@ -2755,6 +2831,7 @@ void syncNodeChangePeerAndCfgToVoter(SSyncNode* ths, SSyncCfg* cfg) { } int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum) { + int32_t code = 0; // 1.rebuild replicasId, remove deleted one SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA]; memcpy(oldReplicasId, ths->replicasId, sizeof(oldReplicasId)); @@ -2762,13 +2839,19 @@ int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum ths->replicaNum = ths->raftCfg.cfg.replicaNum; ths->totalReplicaNum = ths->raftCfg.cfg.totalReplicaNum; for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) { - syncUtilNodeInfo2RaftId(&ths->raftCfg.cfg.nodeInfo[i], ths->vgId, &ths->replicasId[i]); + if (!syncUtilNodeInfo2RaftId(&ths->raftCfg.cfg.nodeInfo[i], ths->vgId, &ths->replicasId[i])) + return TSDB_CODE_SYN_INTERNAL_ERROR; } // 2.rebuild MatchIndex, remove deleted one SSyncIndexMgr* oldIndex = ths->pMatchIndex; ths->pMatchIndex = syncIndexMgrCreate(ths); + if (ths->pMatchIndex == NULL) { + code = TSDB_CODE_SYN_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); + } syncIndexMgrCopyIfExist(ths->pMatchIndex, oldIndex, oldReplicasId); @@ -2778,6 +2861,11 @@ int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum SSyncIndexMgr* oldNextIndex = ths->pNextIndex; ths->pNextIndex = syncIndexMgrCreate(ths); + if (ths->pNextIndex == NULL) { + code = TSDB_CODE_SYN_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + TAOS_RETURN(code); + } syncIndexMgrCopyIfExist(ths->pNextIndex, oldNextIndex, oldReplicasId); @@ -2797,7 +2885,7 @@ int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum SSyncLogReplMgr* oldLogReplMgrs = NULL; int64_t length = sizeof(SSyncLogReplMgr) * (TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA); oldLogReplMgrs = taosMemoryMalloc(length); - if (NULL == oldLogReplMgrs) return -1; + if (NULL == oldLogReplMgrs) return TSDB_CODE_OUT_OF_MEMORY; memset(oldLogReplMgrs, 0, length); for (int i = 0; i < oldtotalReplicaNum; i++) { @@ -2805,7 +2893,10 @@ int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum } syncNodeLogReplDestroy(ths); - syncNodeLogReplInit(ths); + if ((code = syncNodeLogReplInit(ths)) != 0) { + taosMemoryFree(oldLogReplMgrs); + TAOS_RETURN(code); + } for (int i = 0; i < ths->totalReplicaNum; ++i) { for (int j = 0; j < oldtotalReplicaNum; j++) { @@ -2842,9 +2933,13 @@ int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum } for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) { - SSyncSnapshotSender* pSender = NULL; - int32_t code = snapshotSenderCreate(ths, i, &pSender); - if (pSender == NULL) return terrno = code; + SSyncSnapshotSender* pSender = snapshotSenderCreate(ths, i); + if (pSender == NULL) { + code = TSDB_CODE_SYN_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; + taosMemoryFree(oldLogReplMgrs); + TAOS_RETURN(code); + } ths->senders[i] = pSender; sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender); @@ -2856,13 +2951,22 @@ int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum } // 7.rebuild synctimer - syncNodeStopHeartbeatTimer(ths); - - for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) { - syncHbTimerInit(ths, &ths->peerHeartbeatTimerArr[i], ths->replicasId[i]); + if ((code = syncNodeStopHeartbeatTimer(ths)) != 0) { + taosMemoryFree(oldLogReplMgrs); + TAOS_RETURN(code); } - syncNodeStartHeartbeatTimer(ths); + for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) { + if ((code = syncHbTimerInit(ths, &ths->peerHeartbeatTimerArr[i], ths->replicasId[i])) != 0) { + taosMemoryFree(oldLogReplMgrs); + TAOS_RETURN(code); + } + } + + if ((code = syncNodeStartHeartbeatTimer(ths)) != 0) { + taosMemoryFree(oldLogReplMgrs); + TAOS_RETURN(code); + } // 8.rebuild peerStates SPeerState oldState[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA] = {0}; @@ -2920,8 +3024,9 @@ void syncNodeResetPeerAndCfg(SSyncNode* ths) { } int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str) { + int32_t code = 0; if (pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE) { - return -1; + return TSDB_CODE_SYN_INTERNAL_ERROR; } SMsgHead* head = (SMsgHead*)pEntry->data; @@ -2929,8 +3034,8 @@ int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str) SAlterVnodeTypeReq req = {0}; if (tDeserializeSAlterVnodeReplicaReq(pReq, head->contLen, &req) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; + code = TSDB_CODE_INVALID_MSG; + TAOS_RETURN(code); } SSyncCfg cfg = {0}; @@ -2994,12 +3099,12 @@ int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str) // no need to change myNodeInfo - if (syncNodeRebuildPeerAndCfg(ths, &cfg) != 0) { - return -1; + if ((code = syncNodeRebuildPeerAndCfg(ths, &cfg)) != 0) { + TAOS_RETURN(code); }; - if (syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum) != 0) { - return -1; + if ((code = syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum)) != 0) { + TAOS_RETURN(code); }; } else { // remove myself // no need to do anything actually, to change the following to reduce distruptive server chance @@ -3016,8 +3121,8 @@ int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str) ths->raftCfg.cfg.totalReplicaNum = 1; // change other - if (syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum) != 0) { - return -1; + if ((code = syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum)) != 0) { + TAOS_RETURN(code); } // change state @@ -3059,13 +3164,13 @@ int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str) // no need to change myNodeInfo // change peer and cfg - if (syncNodeRebuildPeerAndCfg(ths, &cfg) != 0) { - return -1; + if ((code = syncNodeRebuildPeerAndCfg(ths, &cfg)) != 0) { + TAOS_RETURN(code); }; // change other - if (syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum) != 0) { - return -1; + if ((code = syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum)) != 0) { + TAOS_RETURN(code); }; // no need to change state @@ -3084,17 +3189,18 @@ int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str) syncNodeLogConfigInfo(ths, &cfg, "after config change"); - if (syncWriteCfgFile(ths) != 0) { + if ((code = syncWriteCfgFile(ths)) != 0) { sError("vgId:%d, failed to create sync cfg file", ths->vgId); - return -1; + TAOS_RETURN(code); }; - return 0; + TAOS_RETURN(code); } int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) { int32_t code = -1; if (pEntry->dataLen < sizeof(SMsgHead)) { + code = TSDB_CODE_SYN_INTERNAL_ERROR; sError("vgId:%d, cannot append an invalid client request with no msg head. type:%s, dataLen:%d", ths->vgId, TMSG_INFO(pEntry->originalRpcType), pEntry->dataLen); syncEntryDestroy(pEntry); @@ -3102,7 +3208,7 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) { } // append to log buffer - if (syncLogBufferAppend(ths->pLogBuf, ths, pEntry) < 0) { + if ((code = syncLogBufferAppend(ths->pLogBuf, ths, pEntry)) < 0) { sError("vgId:%d, failed to enqueue sync log buffer, index:%" PRId64, ths->vgId, pEntry->index); ASSERT(terrno != 0); (void)syncFsmExecute(ths, ths->pFsm, ths->state, raftStoreGetTerm(ths), pEntry, terrno, false); @@ -3126,24 +3232,24 @@ _out:; if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE && syncLogBufferCommit(ths->pLogBuf, ths, ths->assignedCommitIndex) < 0) { sError("vgId:%d, failed to commit until commitIndex:%" PRId64 "", ths->vgId, ths->commitIndex); - code = -1; + code = TSDB_CODE_SYN_INTERNAL_ERROR; } } // multi replica if (ths->replicaNum > 1) { - return code; + TAOS_RETURN(code); } // single replica (void)syncNodeUpdateCommitIndex(ths, matchIndex); - if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE && syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) { + if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE && + (code = syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex)) < 0) { sError("vgId:%d, failed to commit until commitIndex:%" PRId64 "", ths->vgId, ths->commitIndex); - code = -1; } - return code; + TAOS_RETURN(code); } bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) { @@ -3192,17 +3298,18 @@ bool syncNodeSnapshotRecving(SSyncNode* pSyncNode) { } static int32_t syncNodeAppendNoop(SSyncNode* ths) { + int32_t code = 0; SyncIndex index = syncLogBufferGetEndIndex(ths->pLogBuf); SyncTerm term = raftStoreGetTerm(ths); SSyncRaftEntry* pEntry = syncEntryBuildNoop(term, index, ths->vgId); if (pEntry == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + code = TSDB_CODE_OUT_OF_MEMORY; + TAOS_RETURN(code); } - int32_t ret = syncNodeAppend(ths, pEntry); - return 0; + code = syncNodeAppend(ths, pEntry); + TAOS_RETURN(code); } #ifdef BUILD_NO_CALL @@ -3328,13 +3435,14 @@ int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pRpcMsg) { } // reply - syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg); + TAOS_CHECK_RETURN(syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg)); if (resetElect) syncNodeResetElectTimer(ths); return 0; } int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { + int32_t code = 0; const STraceId* trace = &pRpcMsg->info.traceId; char tbuf[40] = {0}; TRACE_TO_STR(trace, tbuf); @@ -3342,8 +3450,10 @@ int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pRpcMsg) { SyncHeartbeatReply* pMsg = pRpcMsg->pCont; SSyncLogReplMgr* pMgr = syncNodeGetLogReplMgr(ths, &pMsg->srcId); if (pMgr == NULL) { + code = TSDB_CODE_SYN_RETURN_VALUE_NULL; + if (terrno != 0) code = terrno; sError("vgId:%d, failed to get log repl mgr for the peer at addr 0x016%" PRIx64 "", ths->vgId, pMsg->srcId.addr); - return -1; + TAOS_RETURN(code); } int64_t tsMs = taosGetTimestampMs(); @@ -3424,6 +3534,11 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn pEntry = syncEntryBuildFromRpcMsg(pMsg, term, index); } + if (pEntry == NULL) { + sError("vgId:%d, failed to process client request since %s.", ths->vgId, terrstr()); + return TSDB_CODE_SYN_INTERNAL_ERROR; + } + // 1->2, config change is add in write thread, and will continue in sync thread // need save message for it if (pMsg->msgType == TDMT_SYNC_CONFIG_CHANGE) { @@ -3432,11 +3547,6 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn pEntry->seqNum = seqNum; } - if (pEntry == NULL) { - sError("vgId:%d, failed to process client request since %s.", ths->vgId, terrstr()); - return -1; - } - if (ths->state == TAOS_SYNC_STATE_LEADER || ths->state == TAOS_SYNC_STATE_ASSIGNED_LEADER) { if (pRetIndex) { (*pRetIndex) = index; @@ -3448,7 +3558,7 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn sError("vgId:%d, failed to check change config since %s.", ths->vgId, terrstr()); syncEntryDestroy(pEntry); pEntry = NULL; - return -1; + TAOS_RETURN(code); } if (code > 0) { @@ -3459,7 +3569,7 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn } syncEntryDestroy(pEntry); pEntry = NULL; - return -1; + TAOS_RETURN(code); } } @@ -3468,7 +3578,7 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn } else { syncEntryDestroy(pEntry); pEntry = NULL; - return -1; + return TSDB_CODE_SYN_INTERNAL_ERROR; } } @@ -3506,7 +3616,7 @@ int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg) { } } - return -1; + return TSDB_CODE_SYN_INTERNAL_ERROR; } bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) { diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 61672478c9..e8a12fadfc 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -528,6 +528,13 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG, "Sync invalid snapshot TAOS_DEFINE_ERROR(TSDB_CODE_SYN_BUFFER_FULL, "Sync buffer is full") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_WRITE_STALL, "Sync write stall") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NEGOTIATION_WIN_FULL, "Sync negotiation win is full") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_WRONG_TERM, "Sync got a wrong term") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_WRONG_FSM_STATE, "Sync got a wrong fsm state") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_WRONG_SYNC_STATE, "Sync got a wrong state") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_WRONG_REF, "Sync got a wrong ref") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_ID, "Sync invalid id") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_RETURN_VALUE_NULL, "Sync got a null return") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_WRONG_ROLE, "Sync got a wrong role") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error") //tq