diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index d1ebc02655..2192418c50 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -707,13 +707,36 @@ int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg* pMsgArr, bool* pIsWe syncClientRequestBatch2RpcMsg(pSyncMsg, &rpcMsg); taosMemoryFree(pSyncMsg); // only free msg body, do not free rpc msg content - if (pSyncNode->FpEqMsg != NULL && (*pSyncNode->FpEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) { - // enqueue msg ok + if (pSyncNode->replicaNum == 1 && pSyncNode->vgId != 1) { + int32_t code = syncNodeOnClientRequestBatchCb(pSyncNode, pSyncMsg); + if (code == 0) { + // update rpc msg applyIndex + SRpcMsg* msgArr = syncClientRequestBatchRpcMsgArr(pSyncMsg); + ASSERT(arrSize == pSyncMsg->dataCount); + for (int i = 0; i < arrSize; ++i) { + pMsgArr[i].info.conn.applyIndex = msgArr[i].info.conn.applyIndex; + syncRespMgrDel(pSyncNode->pSyncRespMgr, raftArr[i].seqNum); + } + + rpcFreeCont(rpcMsg.pCont); + terrno = 0; + return 1; + + } else { + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; + } } else { - sError("vgId:%d, enqueue msg error, FpEqMsg is NULL", pSyncNode->vgId); - terrno = TSDB_CODE_SYN_INTERNAL_ERROR; - return -1; + if (pSyncNode->FpEqMsg != NULL && (*pSyncNode->FpEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) { + // enqueue msg ok + return 0; + + } else { + sError("vgId:%d, enqueue msg error, FpEqMsg is NULL", pSyncNode->vgId); + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; + return -1; + } } return 0; @@ -2490,6 +2513,9 @@ int32_t syncNodeOnClientRequestBatchCb(SSyncNode* ths, SyncClientRequestBatch* p ASSERT(0); return -1; } + + // update rpc msg conn apply.index + msgArr[i].info.conn.applyIndex = pEntry->index; } // fsync once @@ -2498,7 +2524,7 @@ int32_t syncNodeOnClientRequestBatchCb(SSyncNode* ths, SyncClientRequestBatch* p walFsync(pWal, true); if (ths->replicaNum > 1) { - // if mulit replica, start replicate right now + // if multi replica, start replicate right now syncNodeReplicate(ths); } else if (ths->replicaNum == 1) {