Merge pull request #17984 from taosdata/feature/3.0_mhli
refactor(sync): delete assert, call FpCommitCb when multi replica
This commit is contained in:
commit
0656cd8fe4
|
@ -225,7 +225,7 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
|
||||||
sError("sync begin snapshot error");
|
sError("sync begin snapshot error");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
if (syncNodeIsMnode(pSyncNode)) {
|
if (syncNodeIsMnode(pSyncNode)) {
|
||||||
|
@ -390,7 +390,7 @@ bool syncIsReadyForRead(int64_t rid) {
|
||||||
if (!pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore)) {
|
if (!pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore)) {
|
||||||
SSyncRaftEntry* pEntry = NULL;
|
SSyncRaftEntry* pEntry = NULL;
|
||||||
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(
|
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(
|
||||||
pSyncNode->pLogStore, pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore), &pEntry);
|
pSyncNode->pLogStore, pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore), &pEntry);
|
||||||
if (code == 0 && pEntry != NULL) {
|
if (code == 0 && pEntry != NULL) {
|
||||||
if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == pSyncNode->pRaftStore->currentTerm) {
|
if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == pSyncNode->pRaftStore->currentTerm) {
|
||||||
ready = true;
|
ready = true;
|
||||||
|
@ -2462,11 +2462,36 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SyncClientRequest* pMsg, SyncInd
|
||||||
} else {
|
} else {
|
||||||
syncEntryDestory(pEntry);
|
syncEntryDestory(pEntry);
|
||||||
}
|
}
|
||||||
|
|
||||||
return -1;
|
return -1;
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
// del resp mgr, call FpCommitCb
|
// del resp mgr, call FpCommitCb
|
||||||
ASSERT(0);
|
|
||||||
|
SRpcMsg rpcMsg = {0};
|
||||||
|
syncClientRequest2RpcMsg(pMsg, &rpcMsg);
|
||||||
|
|
||||||
|
SFsmCbMeta cbMeta = {
|
||||||
|
.index = pEntry->index,
|
||||||
|
.lastConfigIndex = SYNC_INDEX_INVALID,
|
||||||
|
.isWeak = pEntry->isWeak,
|
||||||
|
.code = -1,
|
||||||
|
.state = ths->state,
|
||||||
|
.seqNum = pEntry->seqNum,
|
||||||
|
.term = pEntry->term,
|
||||||
|
.currentTerm = ths->pRaftStore->currentTerm,
|
||||||
|
.flag = 0,
|
||||||
|
};
|
||||||
|
|
||||||
|
syncRespMgrGetAndDel(ths->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info);
|
||||||
|
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, &cbMeta);
|
||||||
|
|
||||||
|
if (h) {
|
||||||
|
taosLRUCacheRelease(ths->pLogStore->pCache, h, false);
|
||||||
|
} else {
|
||||||
|
syncEntryDestory(pEntry);
|
||||||
|
}
|
||||||
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue