Merge pull request #17931 from taosdata/feature/3.0_mhli
fix(sync): when apply queue not empty, can not read
This commit is contained in:
commit
b0f469adb1
|
@ -138,6 +138,7 @@ typedef struct SSyncFSM {
|
||||||
void (*FpRestoreFinishCb)(const struct SSyncFSM* pFsm);
|
void (*FpRestoreFinishCb)(const struct SSyncFSM* pFsm);
|
||||||
void (*FpReConfigCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SReConfigCbMeta* pMeta);
|
void (*FpReConfigCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SReConfigCbMeta* pMeta);
|
||||||
void (*FpLeaderTransferCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
|
void (*FpLeaderTransferCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SFsmCbMeta* pMeta);
|
||||||
|
bool (*FpApplyQueueEmptyCb)(const struct SSyncFSM* pFsm);
|
||||||
|
|
||||||
void (*FpBecomeLeaderCb)(const struct SSyncFSM* pFsm);
|
void (*FpBecomeLeaderCb)(const struct SSyncFSM* pFsm);
|
||||||
void (*FpBecomeFollowerCb)(const struct SSyncFSM* pFsm);
|
void (*FpBecomeFollowerCb)(const struct SSyncFSM* pFsm);
|
||||||
|
|
|
@ -202,6 +202,13 @@ static void mndBecomeLeader(const SSyncFSM *pFsm) {
|
||||||
SMnode *pMnode = pFsm->data;
|
SMnode *pMnode = pFsm->data;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool mndApplyQueueEmpty(const SSyncFSM *pFsm) {
|
||||||
|
SMnode *pMnode = pFsm->data;
|
||||||
|
|
||||||
|
int32_t itemSize = tmsgGetQueueSize(&pMnode->msgCb, 1, APPLY_QUEUE);
|
||||||
|
return (itemSize == 0);
|
||||||
|
}
|
||||||
|
|
||||||
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
|
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
|
||||||
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
|
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
|
||||||
pFsm->data = pMnode;
|
pFsm->data = pMnode;
|
||||||
|
@ -210,6 +217,7 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
|
||||||
pFsm->FpRollBackCb = NULL;
|
pFsm->FpRollBackCb = NULL;
|
||||||
pFsm->FpRestoreFinishCb = mndRestoreFinish;
|
pFsm->FpRestoreFinishCb = mndRestoreFinish;
|
||||||
pFsm->FpLeaderTransferCb = NULL;
|
pFsm->FpLeaderTransferCb = NULL;
|
||||||
|
pFsm->FpApplyQueueEmptyCb = mndApplyQueueEmpty;
|
||||||
pFsm->FpReConfigCb = NULL;
|
pFsm->FpReConfigCb = NULL;
|
||||||
pFsm->FpBecomeLeaderCb = mndBecomeLeader;
|
pFsm->FpBecomeLeaderCb = mndBecomeLeader;
|
||||||
pFsm->FpBecomeFollowerCb = mndBecomeFollower;
|
pFsm->FpBecomeFollowerCb = mndBecomeFollower;
|
||||||
|
|
|
@ -436,6 +436,12 @@ static void vnodeBecomeLeader(const SSyncFSM *pFsm) {
|
||||||
vDebug("vgId:%d, become leader", pVnode->config.vgId);
|
vDebug("vgId:%d, become leader", pVnode->config.vgId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool vnodeApplyQueueEmpty(const SSyncFSM *pFsm) {
|
||||||
|
SVnode *pVnode = pFsm->data;
|
||||||
|
int32_t itemSize = tmsgGetQueueSize(&pVnode->msgCb, pVnode->config.vgId, APPLY_QUEUE);
|
||||||
|
return (itemSize == 0);
|
||||||
|
}
|
||||||
|
|
||||||
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
|
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
|
||||||
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
|
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
|
||||||
pFsm->data = pVnode;
|
pFsm->data = pVnode;
|
||||||
|
@ -445,6 +451,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
|
||||||
pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshot;
|
pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshot;
|
||||||
pFsm->FpRestoreFinishCb = vnodeRestoreFinish;
|
pFsm->FpRestoreFinishCb = vnodeRestoreFinish;
|
||||||
pFsm->FpLeaderTransferCb = NULL;
|
pFsm->FpLeaderTransferCb = NULL;
|
||||||
|
pFsm->FpApplyQueueEmptyCb = vnodeApplyQueueEmpty;
|
||||||
pFsm->FpBecomeLeaderCb = vnodeBecomeLeader;
|
pFsm->FpBecomeLeaderCb = vnodeBecomeLeader;
|
||||||
pFsm->FpBecomeFollowerCb = vnodeBecomeFollower;
|
pFsm->FpBecomeFollowerCb = vnodeBecomeFollower;
|
||||||
pFsm->FpReConfigCb = NULL;
|
pFsm->FpReConfigCb = NULL;
|
||||||
|
|
|
@ -452,6 +452,11 @@ bool syncIsReadyForRead(int64_t rid) {
|
||||||
|
|
||||||
bool ready = false;
|
bool ready = false;
|
||||||
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER && !pSyncNode->restoreFinish) {
|
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER && !pSyncNode->restoreFinish) {
|
||||||
|
if (!pSyncNode->pFsm->FpApplyQueueEmptyCb(pSyncNode->pFsm)) {
|
||||||
|
// apply queue not empty
|
||||||
|
ready = false;
|
||||||
|
|
||||||
|
} else {
|
||||||
if (!pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore)) {
|
if (!pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore)) {
|
||||||
SSyncRaftEntry* pEntry = NULL;
|
SSyncRaftEntry* pEntry = NULL;
|
||||||
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(
|
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(
|
||||||
|
@ -465,6 +470,7 @@ bool syncIsReadyForRead(int64_t rid) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (!ready) {
|
if (!ready) {
|
||||||
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
|
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
|
||||||
|
|
Loading…
Reference in New Issue