fix: the message cannot be processed until the vnode recovery is complete
This commit is contained in:
parent
d726c2a982
commit
bc9a553db0
|
@ -268,6 +268,7 @@ struct SVnode {
|
||||||
tsem_t canCommit;
|
tsem_t canCommit;
|
||||||
int64_t sync;
|
int64_t sync;
|
||||||
int32_t blockCount;
|
int32_t blockCount;
|
||||||
|
bool restored;
|
||||||
tsem_t syncSem;
|
tsem_t syncSem;
|
||||||
SQHandle* pQuery;
|
SQHandle* pQuery;
|
||||||
};
|
};
|
||||||
|
|
|
@ -224,9 +224,19 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs)
|
||||||
vGTrace("vgId:%d, msg:%p get from vnode-write queue, weak:%d block:%d msg:%d:%d pos:%d, handle:%p", vgId, pMsg,
|
vGTrace("vgId:%d, msg:%p get from vnode-write queue, weak:%d block:%d msg:%d:%d pos:%d, handle:%p", vgId, pMsg,
|
||||||
isWeak, isBlock, msg, numOfMsgs, arrayPos, pMsg->info.handle);
|
isWeak, isBlock, msg, numOfMsgs, arrayPos, pMsg->info.handle);
|
||||||
|
|
||||||
|
if (!pVnode->restored) {
|
||||||
|
vGError("vgId:%d, msg:%p failed to process since not leader", vgId, pMsg);
|
||||||
|
terrno = TSDB_CODE_APP_NOT_READY;
|
||||||
|
vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_APP_NOT_READY);
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
taosFreeQitem(pMsg);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
if (pMsgArr == NULL || pIsWeakArr == NULL) {
|
if (pMsgArr == NULL || pIsWeakArr == NULL) {
|
||||||
vGError("vgId:%d, msg:%p failed to process since out of memory", vgId, pMsg);
|
vGError("vgId:%d, msg:%p failed to process since out of memory", vgId, pMsg);
|
||||||
vnodeHandleProposeError(pVnode, pMsg, TSDB_CODE_OUT_OF_MEMORY);
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
vnodeHandleProposeError(pVnode, pMsg, terrno);
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
taosFreeQitem(pMsg);
|
taosFreeQitem(pMsg);
|
||||||
continue;
|
continue;
|
||||||
|
@ -609,6 +619,12 @@ static void vnodeLeaderTransfer(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsm
|
||||||
SVnode *pVnode = pFsm->data;
|
SVnode *pVnode = pFsm->data;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void vnodeRestoreFinish(struct SSyncFSM *pFsm) {
|
||||||
|
SVnode *pVnode = pFsm->data;
|
||||||
|
pVnode->restored = true;
|
||||||
|
vDebug("vgId:%d, sync restore finished", pVnode->config.vgId);
|
||||||
|
}
|
||||||
|
|
||||||
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
|
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
|
||||||
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
|
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
|
||||||
pFsm->data = pVnode;
|
pFsm->data = pVnode;
|
||||||
|
@ -616,7 +632,7 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
|
||||||
pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
|
pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
|
||||||
pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
|
pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
|
||||||
pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshot;
|
pFsm->FpGetSnapshotInfo = vnodeSyncGetSnapshot;
|
||||||
pFsm->FpRestoreFinishCb = NULL;
|
pFsm->FpRestoreFinishCb = vnodeRestoreFinish;
|
||||||
pFsm->FpLeaderTransferCb = vnodeLeaderTransfer;
|
pFsm->FpLeaderTransferCb = vnodeLeaderTransfer;
|
||||||
pFsm->FpReConfigCb = vnodeSyncReconfig;
|
pFsm->FpReConfigCb = vnodeSyncReconfig;
|
||||||
pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead;
|
pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead;
|
||||||
|
@ -670,11 +686,10 @@ bool vnodeIsLeader(SVnode *pVnode) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo
|
if (!pVnode->restored) {
|
||||||
// if (!pVnode->restored) {
|
terrno = TSDB_CODE_APP_NOT_READY;
|
||||||
// terrno = TSDB_CODE_APP_NOT_READY;
|
return false;
|
||||||
// return false;
|
}
|
||||||
// }
|
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
Loading…
Reference in New Issue