Merge pull request #28001 from taosdata/fix/TD-31891-remove-void-sync2

fix/TD-31891-remove-void-sync2
This commit is contained in:
Hongze Cheng 2024-09-23 09:38:04 +08:00 committed by GitHub
commit f20bfd7143
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 83 additions and 46 deletions

View File

@ -179,7 +179,7 @@ _SEND_RESPONSE:
} }
// ack, i.e. send response // ack, i.e. send response
(void)syncNodeSendMsgById(&pReply->destId, ths, &rpcRsp); TAOS_CHECK_RETURN(syncNodeSendMsgById(&pReply->destId, ths, &rpcRsp));
// commit index, i.e. leader notice me // commit index, i.e. leader notice me
if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE && syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) { if (ths->fsmState != SYNC_FSM_STATE_INCOMPLETE && syncLogBufferCommit(ths->pLogBuf, ths, ths->commitIndex) < 0) {

View File

@ -87,7 +87,10 @@ int64_t syncNodeAdd(SSyncNode *pNode) {
void syncNodeRemove(int64_t rid) { void syncNodeRemove(int64_t rid) {
sDebug("sync node refId:%" PRId64 " is removed from rsetId:%d", rid, gNodeRefId); sDebug("sync node refId:%" PRId64 " is removed from rsetId:%d", rid, gNodeRefId);
if (rid > 0) { if (rid > 0) {
(void)taosRemoveRef(gNodeRefId, rid); int32_t code = 0;
if ((code = taosRemoveRef(gNodeRefId, rid)) != 0)
sError("failed to remove sync node from refId:%" PRId64 ", rsetId:%d, since %s", rid, gNodeRefId,
tstrerror(code));
} }
} }
@ -103,7 +106,10 @@ SSyncNode *syncNodeAcquire(int64_t rid) {
void syncNodeRelease(SSyncNode *pNode) { void syncNodeRelease(SSyncNode *pNode) {
if (pNode) { if (pNode) {
(void)taosReleaseRef(gNodeRefId, pNode->rid); int32_t code = 0;
if ((code = taosReleaseRef(gNodeRefId, pNode->rid)) != 0)
sError("failed to release sync node from refId:%" PRId64 ", rsetId:%d, since %s", pNode->rid, gNodeRefId,
tstrerror(code));
} }
} }
@ -118,7 +124,9 @@ int64_t syncHbTimerDataAdd(SSyncHbTimerData *pData) {
void syncHbTimerDataRemove(int64_t rid) { void syncHbTimerDataRemove(int64_t rid) {
if (rid > 0) { if (rid > 0) {
(void)taosRemoveRef(gHbDataRefId, rid); int32_t code = 0;
if ((code = taosRemoveRef(gHbDataRefId, rid)) != 0)
sError("failed to remove hbdata from refId:%" PRId64 ", rsetId:%d, since %s", rid, gHbDataRefId, tstrerror(code));
} }
} }
@ -134,6 +142,10 @@ SSyncHbTimerData *syncHbTimerDataAcquire(int64_t rid) {
void syncHbTimerDataRelease(SSyncHbTimerData *pData) { void syncHbTimerDataRelease(SSyncHbTimerData *pData) {
if (pData) { if (pData) {
(void)taosReleaseRef(gHbDataRefId, pData->rid); int32_t code = 0;
if ((code = taosReleaseRef(gHbDataRefId, pData->rid)) != 0) {
sError("failed to release hbdata from refId:%" PRId64 ", rsetId:%d, since %s", pData->rid, gHbDataRefId,
tstrerror(code));
}
} }
} }

View File

@ -3644,7 +3644,9 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn
if (code > 0) { if (code > 0) {
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info}; SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
(void)syncRespMgrGetAndDel(ths->pSyncRespMgr, pEntry->seqNum, &rsp.info); int32_t num = syncRespMgrGetAndDel(ths->pSyncRespMgr, pEntry->seqNum, &rsp.info);
sDebug("vgId:%d, get response stub for config change, seqNum:%" PRIu64 ", num:%d", ths->vgId, pEntry->seqNum,
num);
if (rsp.info.handle != NULL) { if (rsp.info.handle != NULL) {
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} }

View File

@ -450,14 +450,14 @@ int32_t syncLogBufferAccept(SSyncLogBuffer* pBuf, SSyncNode* pNode, SSyncRaftEnt
goto _out; goto _out;
} }
if (pEntry->term != pExist->term) { if (pEntry->term != pExist->term) {
(void)syncLogBufferRollback(pBuf, pNode, index); TAOS_CHECK_GOTO(syncLogBufferRollback(pBuf, pNode, index), NULL, _out);
} else { } else {
sTrace("vgId:%d, duplicate log entry received. index:%" PRId64 ", term:%" PRId64 ". log buffer: [%" PRId64 sTrace("vgId:%d, duplicate log entry received. index:%" PRId64 ", term:%" PRId64 ". log buffer: [%" PRId64
" %" PRId64 " %" PRId64 ", %" PRId64 ")", " %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pNode->vgId, pEntry->index, pEntry->term, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex,
pBuf->endIndex); pBuf->endIndex);
SyncTerm existPrevTerm = -1; SyncTerm existPrevTerm = -1;
(void)syncLogReplGetPrevLogTerm(NULL, pNode, index, &existPrevTerm); TAOS_CHECK_GOTO(syncLogReplGetPrevLogTerm(NULL, pNode, index, &existPrevTerm), NULL, _out);
if (!(pEntry->term == pExist->term && (pEntry->index > pBuf->matchIndex || prevTerm == existPrevTerm))) { if (!(pEntry->term == pExist->term && (pEntry->index > pBuf->matchIndex || prevTerm == existPrevTerm))) {
sError("vgId:%d, failed to accept, pEntry->term:%" PRId64 ", pExist->indexpExist->term:%" PRId64 sError("vgId:%d, failed to accept, pEntry->term:%" PRId64 ", pExist->indexpExist->term:%" PRId64
", pEntry->index:%" PRId64 ", pBuf->matchIndex:%" PRId64 ", prevTerm:%" PRId64 ", pEntry->index:%" PRId64 ", pBuf->matchIndex:%" PRId64 ", prevTerm:%" PRId64
@ -650,7 +650,10 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* p
} }
// replicate on demand // replicate on demand
(void)syncNodeReplicateWithoutLock(pNode); if ((code = syncNodeReplicateWithoutLock(pNode)) != 0) {
sError("vgId:%d, failed to replicate since %s. index:%" PRId64, pNode->vgId, tstrerror(code), pEntry->index);
goto _out;
}
if (pEntry->index != pBuf->matchIndex) { if (pEntry->index != pBuf->matchIndex) {
sError("vgId:%d, failed to proceed, pEntry->index:%" PRId64 ", pBuf->matchIndex:%" PRId64, pNode->vgId, sError("vgId:%d, failed to proceed, pEntry->index:%" PRId64 ", pBuf->matchIndex:%" PRId64, pNode->vgId,
@ -721,7 +724,8 @@ int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTe
cbMeta.currentTerm = term; cbMeta.currentTerm = term;
cbMeta.flag = -1; cbMeta.flag = -1;
(void)syncRespMgrGetAndDel(pNode->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info); int32_t num = syncRespMgrGetAndDel(pNode->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info);
sDebug("vgId:%d, get response info, seqNum:%" PRId64 ", num:%d", pNode->vgId, cbMeta.seqNum, num);
code = pFsm->FpCommitCb(pFsm, &rpcMsg, &cbMeta); code = pFsm->FpCommitCb(pFsm, &rpcMsg, &cbMeta);
retry = (code != 0) && (terrno == TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE); retry = (code != 0) && (terrno == TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE);
if (retry) { if (retry) {
@ -1033,7 +1037,7 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn
} }
} else { } else {
if (pMsg->lastSendIndex < pMgr->startIndex || pMsg->lastSendIndex >= pMgr->endIndex) { if (pMsg->lastSendIndex < pMgr->startIndex || pMsg->lastSendIndex >= pMgr->endIndex) {
(void)syncLogReplRetryOnNeed(pMgr, pNode); TAOS_CHECK_RETURN(syncLogReplRetryOnNeed(pMgr, pNode));
return 0; return 0;
} }
@ -1108,7 +1112,7 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn
} }
// attempt to replicate the raft log at index // attempt to replicate the raft log at index
(void)syncLogReplReset(pMgr); syncLogReplReset(pMgr);
return syncLogReplProbe(pMgr, pNode, index); return syncLogReplProbe(pMgr, pNode, index);
} }
@ -1136,9 +1140,9 @@ int32_t syncLogReplProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncApp
} }
if (pMgr->restored) { if (pMgr->restored) {
(void)syncLogReplContinue(pMgr, pNode, pMsg); TAOS_CHECK_RETURN(syncLogReplContinue(pMgr, pNode, pMsg));
} else { } else {
(void)syncLogReplRecover(pMgr, pNode, pMsg); TAOS_CHECK_RETURN(syncLogReplRecover(pMgr, pNode, pMsg));
} }
(void)taosThreadMutexUnlock(&pBuf->mutex); (void)taosThreadMutexUnlock(&pBuf->mutex);
return 0; return 0;
@ -1146,9 +1150,9 @@ int32_t syncLogReplProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncApp
int32_t syncLogReplStart(SSyncLogReplMgr* pMgr, SSyncNode* pNode) { int32_t syncLogReplStart(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
if (pMgr->restored) { if (pMgr->restored) {
(void)syncLogReplAttempt(pMgr, pNode); TAOS_CHECK_RETURN(syncLogReplAttempt(pMgr, pNode));
} else { } else {
(void)syncLogReplProbe(pMgr, pNode, pNode->pLogBuf->matchIndex); TAOS_CHECK_RETURN(syncLogReplProbe(pMgr, pNode, pNode->pLogBuf->matchIndex));
} }
return 0; return 0;
} }
@ -1164,7 +1168,7 @@ int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex inde
nowMs < pMgr->states[pMgr->startIndex % pMgr->size].timeMs + retryMaxWaitMs) { nowMs < pMgr->states[pMgr->startIndex % pMgr->size].timeMs + retryMaxWaitMs) {
return 0; return 0;
} }
(void)syncLogReplReset(pMgr); syncLogReplReset(pMgr);
SRaftId* pDestId = &pNode->replicasId[pMgr->peerId]; SRaftId* pDestId = &pNode->replicasId[pMgr->peerId];
bool barrier = false; bool barrier = false;
@ -1237,7 +1241,7 @@ int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
} }
} }
(void)syncLogReplRetryOnNeed(pMgr, pNode); TAOS_CHECK_RETURN(syncLogReplRetryOnNeed(pMgr, pNode));
SSyncLogBuffer* pBuf = pNode->pLogBuf; SSyncLogBuffer* pBuf = pNode->pLogBuf;
sTrace("vgId:%d, replicated %d msgs to peer:%" PRIx64 ". indexes:%" PRId64 "..., terms: ...%" PRId64 sTrace("vgId:%d, replicated %d msgs to peer:%" PRIx64 ". indexes:%" PRId64 "..., terms: ...%" PRId64
@ -1291,7 +1295,7 @@ void syncLogReplDestroy(SSyncLogReplMgr* pMgr) {
if (pMgr == NULL) { if (pMgr == NULL) {
return; return;
} }
(void)taosMemoryFree(pMgr); taosMemoryFree(pMgr);
return; return;
} }
@ -1374,7 +1378,7 @@ void syncLogBufferDestroy(SSyncLogBuffer* pBuf) {
syncLogBufferClear(pBuf); syncLogBufferClear(pBuf);
(void)taosThreadMutexDestroy(&pBuf->mutex); (void)taosThreadMutexDestroy(&pBuf->mutex);
(void)taosThreadMutexAttrDestroy(&pBuf->attr); (void)taosThreadMutexAttrDestroy(&pBuf->attr);
(void)taosMemoryFree(pBuf); taosMemoryFree(pBuf);
return; return;
} }
@ -1395,7 +1399,7 @@ int32_t syncLogBufferRollback(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncIndex
while (index >= toIndex) { while (index >= toIndex) {
SSyncRaftEntry* pEntry = pBuf->entries[index % pBuf->size].pItem; SSyncRaftEntry* pEntry = pBuf->entries[index % pBuf->size].pItem;
if (pEntry != NULL) { if (pEntry != NULL) {
(void)syncEntryDestroy(pEntry); syncEntryDestroy(pEntry);
pEntry = NULL; pEntry = NULL;
(void)memset(&pBuf->entries[index % pBuf->size], 0, sizeof(pBuf->entries[0])); (void)memset(&pBuf->entries[index % pBuf->size], 0, sizeof(pBuf->entries[0]));
} }
@ -1435,7 +1439,7 @@ int32_t syncLogBufferReset(SSyncLogBuffer* pBuf, SSyncNode* pNode) {
if (lastVer != pBuf->matchIndex) return TSDB_CODE_SYN_INTERNAL_ERROR; if (lastVer != pBuf->matchIndex) return TSDB_CODE_SYN_INTERNAL_ERROR;
SyncIndex index = pBuf->endIndex - 1; SyncIndex index = pBuf->endIndex - 1;
(void)syncLogBufferRollback(pBuf, pNode, pBuf->matchIndex + 1); TAOS_CHECK_RETURN(syncLogBufferRollback(pBuf, pNode, pBuf->matchIndex + 1));
sInfo("vgId:%d, reset sync log buffer. buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId, sInfo("vgId:%d, reset sync log buffer. buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pNode->vgId,
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex); pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
@ -1483,6 +1487,7 @@ int32_t syncLogReplSendTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex ind
SyncTerm prevLogTerm = -1; SyncTerm prevLogTerm = -1;
SSyncLogBuffer* pBuf = pNode->pLogBuf; SSyncLogBuffer* pBuf = pNode->pLogBuf;
int32_t code = 0; int32_t code = 0;
int32_t lino = 0;
code = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf, &pEntry); code = syncLogBufferGetOneEntry(pBuf, pNode, index, &inBuf, &pEntry);
if (pEntry == NULL) { if (pEntry == NULL) {
@ -1492,7 +1497,7 @@ int32_t syncLogReplSendTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex ind
if (pMgr) { if (pMgr) {
sInfo("vgId:%d, reset sync log repl of peer:%" PRIx64 " since %s. index:%" PRId64, pNode->vgId, pDestId->addr, sInfo("vgId:%d, reset sync log repl of peer:%" PRIx64 " since %s. index:%" PRId64, pNode->vgId, pDestId->addr,
tstrerror(code), index); tstrerror(code), index);
(void)syncLogReplReset(pMgr); syncLogReplReset(pMgr);
} }
} }
goto _err; goto _err;
@ -1512,7 +1517,7 @@ int32_t syncLogReplSendTo(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex ind
goto _err; goto _err;
} }
(void)syncNodeSendAppendEntries(pNode, pDestId, &msgOut); TAOS_CHECK_GOTO(syncNodeSendAppendEntries(pNode, pDestId, &msgOut), &lino, _err);
sTrace("vgId:%d, replicate one msg index:%" PRId64 " term:%" PRId64 " prevterm:%" PRId64 " to dest: 0x%016" PRIx64, sTrace("vgId:%d, replicate one msg index:%" PRId64 " term:%" PRId64 " prevterm:%" PRId64 " to dest: 0x%016" PRIx64,
pNode->vgId, pEntry->index, pEntry->term, prevLogTerm, pDestId->addr); pNode->vgId, pEntry->index, pEntry->term, prevLogTerm, pDestId->addr);

View File

@ -68,14 +68,16 @@ int32_t syncNodeReplicate(SSyncNode* pNode) {
int32_t syncNodeReplicateWithoutLock(SSyncNode* pNode) { int32_t syncNodeReplicateWithoutLock(SSyncNode* pNode) {
if ((pNode->state != TAOS_SYNC_STATE_LEADER && pNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) || if ((pNode->state != TAOS_SYNC_STATE_LEADER && pNode->state != TAOS_SYNC_STATE_ASSIGNED_LEADER) ||
pNode->raftCfg.cfg.totalReplicaNum == 1) { pNode->raftCfg.cfg.totalReplicaNum == 1) {
TAOS_RETURN(TSDB_CODE_FAILED); TAOS_RETURN(TSDB_CODE_SUCCESS);
} }
for (int32_t i = 0; i < pNode->totalReplicaNum; i++) { for (int32_t i = 0; i < pNode->totalReplicaNum; i++) {
if (syncUtilSameId(&pNode->replicasId[i], &pNode->myRaftId)) { if (syncUtilSameId(&pNode->replicasId[i], &pNode->myRaftId)) {
continue; continue;
} }
SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i]; SSyncLogReplMgr* pMgr = pNode->logReplMgrs[i];
(void)syncLogReplStart(pMgr, pNode); if (syncLogReplStart(pMgr, pNode) != 0) {
sError("vgId:%d, failed to start log replication to dnode:%d", pNode->vgId, DID(&(pNode->replicasId[i])));
}
} }
TAOS_RETURN(TSDB_CODE_SUCCESS); TAOS_RETURN(TSDB_CODE_SUCCESS);
@ -84,7 +86,7 @@ int32_t syncNodeReplicateWithoutLock(SSyncNode* pNode) {
int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg) { int32_t syncNodeSendAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, SRpcMsg* pRpcMsg) {
SyncAppendEntries* pMsg = pRpcMsg->pCont; SyncAppendEntries* pMsg = pRpcMsg->pCont;
pMsg->destId = *destRaftId; pMsg->destId = *destRaftId;
(void)syncNodeSendMsgById(destRaftId, pSyncNode, pRpcMsg); TAOS_CHECK_RETURN(syncNodeSendMsgById(destRaftId, pSyncNode, pRpcMsg));
TAOS_RETURN(TSDB_CODE_SUCCESS); TAOS_RETURN(TSDB_CODE_SUCCESS);
} }
@ -116,7 +118,9 @@ int32_t syncNodeHeartbeatPeers(SSyncNode* pSyncNode) {
STraceId* trace = &(rpcMsg.info.traceId); STraceId* trace = &(rpcMsg.info.traceId);
sGTrace("vgId:%d, send sync-heartbeat to dnode:%d", pSyncNode->vgId, DID(&(pSyncMsg->destId))); sGTrace("vgId:%d, send sync-heartbeat to dnode:%d", pSyncNode->vgId, DID(&(pSyncMsg->destId)));
syncLogSendHeartbeat(pSyncNode, pSyncMsg, true, 0, 0); syncLogSendHeartbeat(pSyncNode, pSyncMsg, true, 0, 0);
(void)syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg); if (syncNodeSendHeartbeat(pSyncNode, &pSyncMsg->destId, &rpcMsg) != 0) {
sError("vgId:%d, failed to send sync-heartbeat to dnode:%d", pSyncNode->vgId, DID(&(pSyncMsg->destId)));
}
} }
TAOS_RETURN(TSDB_CODE_SUCCESS); TAOS_RETURN(TSDB_CODE_SUCCESS);

View File

@ -110,7 +110,9 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t seq, SRpcHandleInfo *p
*pInfo = pStub->rpcMsg.info; *pInfo = pStub->rpcMsg.info;
sNTrace(pObj->data, "get-and-del message handle:%p, type:%s seq:%" PRIu64, pStub->rpcMsg.info.handle, sNTrace(pObj->data, "get-and-del message handle:%p, type:%s seq:%" PRIu64, pStub->rpcMsg.info.handle,
TMSG_INFO(pStub->rpcMsg.msgType), seq); TMSG_INFO(pStub->rpcMsg.msgType), seq);
(void)taosHashRemove(pObj->pRespHash, &seq, sizeof(uint64_t)); if (taosHashRemove(pObj->pRespHash, &seq, sizeof(uint64_t)) != 0) {
sError("failed to remove seq:%" PRIu64, seq);
}
(void)taosThreadMutexUnlock(&pObj->mutex); (void)taosThreadMutexUnlock(&pObj->mutex);
return 1; // get one object return 1; // get one object
@ -165,7 +167,9 @@ static int32_t syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
SRpcMsg rpcMsg = {.info = pStub->rpcMsg.info, .code = TSDB_CODE_SYN_TIMEOUT}; SRpcMsg rpcMsg = {.info = pStub->rpcMsg.info, .code = TSDB_CODE_SYN_TIMEOUT};
sInfo("vgId:%d, message handle:%p expired, type:%s ahandle:%p", pNode->vgId, rpcMsg.info.handle, sInfo("vgId:%d, message handle:%p expired, type:%s ahandle:%p", pNode->vgId, rpcMsg.info.handle,
TMSG_INFO(pStub->rpcMsg.msgType), rpcMsg.info.ahandle); TMSG_INFO(pStub->rpcMsg.msgType), rpcMsg.info.ahandle);
(void)rpcSendResponse(&rpcMsg); if (rpcSendResponse(&rpcMsg) != 0) {
sError("vgId:%d, failed to send response, handle:%p", pNode->vgId, rpcMsg.info.handle);
}
} }
pStub = taosHashIterate(pObj->pRespHash, pStub); pStub = taosHashIterate(pObj->pRespHash, pStub);
@ -176,7 +180,9 @@ static int32_t syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
for (int32_t i = 0; i < arraySize; ++i) { for (int32_t i = 0; i < arraySize; ++i) {
uint64_t *pSeqNum = taosArrayGet(delIndexArray, i); uint64_t *pSeqNum = taosArrayGet(delIndexArray, i);
(void)taosHashRemove(pObj->pRespHash, pSeqNum, sizeof(uint64_t)); if (taosHashRemove(pObj->pRespHash, pSeqNum, sizeof(uint64_t)) != 0) {
sError("vgId:%d, failed to remove seq:%" PRIu64, pNode->vgId, *pSeqNum);
}
sDebug("vgId:%d, resp manager clean by ttl, seq:%" PRId64, pNode->vgId, *pSeqNum); sDebug("vgId:%d, resp manager clean by ttl, seq:%" PRId64, pNode->vgId, *pSeqNum);
} }
taosArrayDestroy(delIndexArray); taosArrayDestroy(delIndexArray);
@ -191,7 +197,10 @@ void syncRespCleanRsp(SSyncRespMgr *pObj) {
sTrace("vgId:%d, clean all resp", pNode->vgId); sTrace("vgId:%d, clean all resp", pNode->vgId);
(void)taosThreadMutexLock(&pObj->mutex); (void)taosThreadMutexLock(&pObj->mutex);
(void)syncRespCleanByTTL(pObj, -1, true); int32_t code = 0;
if ((code = syncRespCleanByTTL(pObj, -1, true)) != 0) {
sError("vgId:%d, failed to clean all resp since %s", pNode->vgId, tstrerror(code));
}
(void)taosThreadMutexUnlock(&pObj->mutex); (void)taosThreadMutexUnlock(&pObj->mutex);
} }
@ -200,6 +209,9 @@ void syncRespClean(SSyncRespMgr *pObj) {
sTrace("vgId:%d, clean resp by ttl", pNode->vgId); sTrace("vgId:%d, clean resp by ttl", pNode->vgId);
(void)taosThreadMutexLock(&pObj->mutex); (void)taosThreadMutexLock(&pObj->mutex);
(void)syncRespCleanByTTL(pObj, pObj->ttl, false); int32_t code = 0;
if ((code = syncRespCleanByTTL(pObj, pObj->ttl, false)) != 0) {
sError("vgId:%d, failed to clean resp by ttl since %s", pNode->vgId, tstrerror(code));
}
(void)taosThreadMutexUnlock(&pObj->mutex); (void)taosThreadMutexUnlock(&pObj->mutex);
} }

View File

@ -916,7 +916,9 @@ static int32_t syncSnapBufferRecv(SSyncSnapshotReceiver *pReceiver, SyncSnapshot
} }
} }
pRcvBuf->start = seq + 1; pRcvBuf->start = seq + 1;
(void)syncSnapSendRsp(pReceiver, pRcvBuf->entries[seq % pRcvBuf->size], NULL, 0, 0, code); if (syncSnapSendRsp(pReceiver, pRcvBuf->entries[seq % pRcvBuf->size], NULL, 0, 0, code) != 0) {
sError("failed to send snap rsp");
}
pRcvBuf->entryDeleteCb(pRcvBuf->entries[seq % pRcvBuf->size]); pRcvBuf->entryDeleteCb(pRcvBuf->entries[seq % pRcvBuf->size]);
pRcvBuf->entries[seq % pRcvBuf->size] = NULL; pRcvBuf->entries[seq % pRcvBuf->size] = NULL;
if (code) goto _out; if (code) goto _out;
@ -1011,7 +1013,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
sRError(pReceiver, "reject snap replication with smaller term. msg term:%" PRId64 ", seq:%d", pMsg->term, sRError(pReceiver, "reject snap replication with smaller term. msg term:%" PRId64 ", seq:%d", pMsg->term,
pMsg->seq); pMsg->seq);
code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE; code = TSDB_CODE_SYN_MISMATCHED_SIGNATURE;
(void)syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code); if (syncSnapSendRsp(pReceiver, pMsg, NULL, 0, 0, code) != 0) sError("failed to send snap rsp");
TAOS_RETURN(code); TAOS_RETURN(code);
} }
@ -1298,13 +1300,13 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, SRpcMsg *pRpcMsg) {
if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) { if (pMsg->ack == SYNC_SNAPSHOT_SEQ_END) {
sSInfo(pSender, "process end rsp"); sSInfo(pSender, "process end rsp");
snapshotSenderStop(pSender, true); snapshotSenderStop(pSender, true);
(void)syncNodeReplicateReset(pSyncNode, &pMsg->srcId); TAOS_CHECK_GOTO(syncNodeReplicateReset(pSyncNode, &pMsg->srcId), NULL, _ERROR);
} }
return 0; return 0;
_ERROR: _ERROR:
snapshotSenderStop(pSender, false); snapshotSenderStop(pSender, false);
(void)syncNodeReplicateReset(pSyncNode, &pMsg->srcId); if (syncNodeReplicateReset(pSyncNode, &pMsg->srcId) != 0) sError("failed to reset replicate");
TAOS_RETURN(code); TAOS_RETURN(code);
} }

View File

@ -65,7 +65,7 @@ static int32_t syncNodeTimerRoutine(SSyncNode* ths) {
} }
// timer replicate // timer replicate
(void)syncNodeReplicate(ths); TAOS_CHECK_RETURN(syncNodeReplicate(ths));
// clean mnode index // clean mnode index
if (syncNodeIsMnode(ths)) { if (syncNodeIsMnode(ths)) {
@ -89,7 +89,7 @@ static int32_t syncNodeTimerRoutine(SSyncNode* ths) {
snapshotSenderStop(pSender, false); snapshotSenderStop(pSender, false);
} else { } else {
sSWarn(pSender, "snap replication resend."); sSWarn(pSender, "snap replication resend.");
(void)snapshotReSend(pSender); TAOS_CHECK_RETURN(snapshotReSend(pSender));
} }
} }
} }
@ -112,14 +112,14 @@ int32_t syncNodeOnTimeout(SSyncNode* ths, const SRpcMsg* pRpc) {
if (atomic_load_64(&ths->pingTimerLogicClockUser) <= pMsg->logicClock) { if (atomic_load_64(&ths->pingTimerLogicClockUser) <= pMsg->logicClock) {
++(ths->pingTimerCounter); ++(ths->pingTimerCounter);
(void)syncNodeTimerRoutine(ths); TAOS_CHECK_RETURN(syncNodeTimerRoutine(ths));
} }
} else if (pMsg->timeoutType == SYNC_TIMEOUT_ELECTION) { } else if (pMsg->timeoutType == SYNC_TIMEOUT_ELECTION) {
if (atomic_load_64(&ths->electTimerLogicClock) <= pMsg->logicClock) { if (atomic_load_64(&ths->electTimerLogicClock) <= pMsg->logicClock) {
++(ths->electTimerCounter); ++(ths->electTimerCounter);
(void)syncNodeElect(ths); TAOS_CHECK_RETURN(syncNodeElect(ths));
} }
} else if (pMsg->timeoutType == SYNC_TIMEOUT_HEARTBEAT) { } else if (pMsg->timeoutType == SYNC_TIMEOUT_HEARTBEAT) {

View File

@ -77,7 +77,7 @@ SSyncNode *createSyncNode() {
void test1() { void test1() {
printf("------- test1 ---------\n"); printf("------- test1 ---------\n");
(void)syncRespMgrCreate(createSyncNode(), 0, &pMgr); assert(syncRespMgrCreate(createSyncNode(), 0, &pMgr) == 0);
assert(pMgr != NULL); assert(pMgr != NULL);
syncRespMgrInsert(10); syncRespMgrInsert(10);
@ -102,7 +102,7 @@ void test1() {
void test2() { void test2() {
printf("------- test2 ---------\n"); printf("------- test2 ---------\n");
(void)syncRespMgrCreate(createSyncNode(), 0, &pMgr); assert(syncRespMgrCreate(createSyncNode(), 0, &pMgr) == 0);
assert(pMgr != NULL); assert(pMgr != NULL);
syncRespMgrInsert(10); syncRespMgrInsert(10);
@ -119,7 +119,7 @@ void test2() {
void test3() { void test3() {
printf("------- test3 ---------\n"); printf("------- test3 ---------\n");
(void)syncRespMgrCreate(createSyncNode(), 0, &pMgr); assert(syncRespMgrCreate(createSyncNode(), 0, &pMgr) == 0);
assert(pMgr != NULL); assert(pMgr != NULL);
syncRespMgrInsert(10); syncRespMgrInsert(10);
@ -136,7 +136,7 @@ void test3() {
void test4() { void test4() {
printf("------- test4 ---------\n"); printf("------- test4 ---------\n");
(void)syncRespMgrCreate(createSyncNode(), 2, &pMgr); assert(syncRespMgrCreate(createSyncNode(), 2, &pMgr) == 0);
assert(pMgr != NULL); assert(pMgr != NULL);
syncRespMgrInsert(5); syncRespMgrInsert(5);

View File

@ -40,7 +40,7 @@ SSyncSnapshotSender* createSender() {
#endif #endif
SSyncSnapshotSender* pSender = NULL; SSyncSnapshotSender* pSender = NULL;
(void)snapshotSenderCreate(pSyncNode, 2, &pSender); assert(snapshotSenderCreate(pSyncNode, 2, &pSender) == 0);
pSender->start = true; pSender->start = true;
pSender->seq = 10; pSender->seq = 10;
pSender->ack = 20; pSender->ack = 20;