refactor(sync): add syncIsReadyForRead
This commit is contained in:
parent
1fc79e289d
commit
a4b54c4f0e
|
@ -219,6 +219,7 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex);
|
|||
int32_t syncEndSnapshot(int64_t rid);
|
||||
int32_t syncLeaderTransfer(int64_t rid);
|
||||
int32_t syncStepDown(int64_t rid, SyncTerm newTerm);
|
||||
bool syncIsReadyForRead(int64_t rid);
|
||||
|
||||
SSyncState syncGetState(int64_t rid);
|
||||
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
|
||||
|
|
|
@ -334,7 +334,8 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
|
||||
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||
vTrace("message in vnode query queue is processing");
|
||||
if ((pMsg->msgType == TDMT_SCH_QUERY) && !vnodeIsLeader(pVnode)) {
|
||||
// if ((pMsg->msgType == TDMT_SCH_QUERY) && !vnodeIsLeader(pVnode)) {
|
||||
if ((pMsg->msgType == TDMT_SCH_QUERY) && !syncIsReadyForRead(pVnode->sync)) {
|
||||
vnodeRedirectRpcMsg(pVnode, pMsg);
|
||||
return 0;
|
||||
}
|
||||
|
@ -356,7 +357,8 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
|||
vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg);
|
||||
if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
|
||||
pMsg->msgType == TDMT_VND_BATCH_META) &&
|
||||
!vnodeIsLeader(pVnode)) {
|
||||
!syncIsReadyForRead(pVnode->sync)) {
|
||||
// !vnodeIsLeader(pVnode)) {
|
||||
vnodeRedirectRpcMsg(pVnode, pMsg);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -437,6 +437,39 @@ int32_t syncStepDown(int64_t rid, SyncTerm newTerm) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
bool syncIsReadyForRead(int64_t rid) {
|
||||
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
||||
if (pSyncNode == NULL) {
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
return false;
|
||||
}
|
||||
ASSERT(rid == pSyncNode->rid);
|
||||
|
||||
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER && pSyncNode->restoreFinish) {
|
||||
syncNodeRelease(pSyncNode);
|
||||
return true;
|
||||
}
|
||||
|
||||
bool ready = false;
|
||||
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER && !pSyncNode->restoreFinish) {
|
||||
if (!pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore)) {
|
||||
SSyncRaftEntry* pEntry = NULL;
|
||||
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(
|
||||
pSyncNode->pLogStore, pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore), &pEntry);
|
||||
if (code == 0 && pEntry != NULL) {
|
||||
if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == pSyncNode->pRaftStore->currentTerm) {
|
||||
ready = true;
|
||||
}
|
||||
|
||||
syncEntryDestory(pEntry);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
syncNodeRelease(pSyncNode);
|
||||
return ready;
|
||||
}
|
||||
|
||||
int32_t syncNodeLeaderTransfer(SSyncNode* pSyncNode) {
|
||||
if (pSyncNode->peersNum == 0) {
|
||||
sDebug("only one replica, cannot leader transfer");
|
||||
|
|
Loading…
Reference in New Issue