fix: not allow to read if sync is restoring
This commit is contained in:
parent
4353e70c89
commit
d51c3c36fb
|
@ -351,7 +351,6 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
|
|
||||||
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
vTrace("message in vnode query queue is processing");
|
vTrace("message in vnode query queue is processing");
|
||||||
// if ((pMsg->msgType == TDMT_SCH_QUERY) && !vnodeIsLeader(pVnode)) {
|
|
||||||
if ((pMsg->msgType == TDMT_SCH_QUERY) && !syncIsReadyForRead(pVnode->sync)) {
|
if ((pMsg->msgType == TDMT_SCH_QUERY) && !syncIsReadyForRead(pVnode->sync)) {
|
||||||
vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
|
vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -375,7 +374,6 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
||||||
if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
|
if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
|
||||||
pMsg->msgType == TDMT_VND_BATCH_META) &&
|
pMsg->msgType == TDMT_VND_BATCH_META) &&
|
||||||
!syncIsReadyForRead(pVnode->sync)) {
|
!syncIsReadyForRead(pVnode->sync)) {
|
||||||
// !vnodeIsLeader(pVnode)) {
|
|
||||||
vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
|
vnodeRedirectRpcMsg(pVnode, pMsg, terrno);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -436,55 +436,12 @@ bool syncNodeIsReadyForRead(SSyncNode* pSyncNode) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pSyncNode->restoreFinish) {
|
if (!pSyncNode->restoreFinish) {
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool ready = false;
|
|
||||||
if (!pSyncNode->pFsm->FpApplyQueueEmptyCb(pSyncNode->pFsm)) {
|
|
||||||
// apply queue not empty
|
|
||||||
ready = false;
|
|
||||||
|
|
||||||
} else {
|
|
||||||
if (!pSyncNode->pLogStore->syncLogIsEmpty(pSyncNode->pLogStore)) {
|
|
||||||
SyncIndex lastIndex = pSyncNode->pLogStore->syncLogLastIndex(pSyncNode->pLogStore);
|
|
||||||
SSyncRaftEntry* pEntry = NULL;
|
|
||||||
SLRUCache* pCache = pSyncNode->pLogStore->pCache;
|
|
||||||
LRUHandle* h = taosLRUCacheLookup(pCache, &lastIndex, sizeof(lastIndex));
|
|
||||||
int32_t code = 0;
|
|
||||||
if (h) {
|
|
||||||
pEntry = (SSyncRaftEntry*)taosLRUCacheValue(pCache, h);
|
|
||||||
code = 0;
|
|
||||||
|
|
||||||
pSyncNode->pLogStore->cacheHit++;
|
|
||||||
sNTrace(pSyncNode, "hit cache index:%" PRId64 ", bytes:%u, %p", lastIndex, pEntry->bytes, pEntry);
|
|
||||||
|
|
||||||
} else {
|
|
||||||
pSyncNode->pLogStore->cacheMiss++;
|
|
||||||
sNTrace(pSyncNode, "miss cache index:%" PRId64, lastIndex);
|
|
||||||
|
|
||||||
code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, lastIndex, &pEntry);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (code == 0 && pEntry != NULL) {
|
|
||||||
if (pEntry->originalRpcType == TDMT_SYNC_NOOP && pEntry->term == raftStoreGetTerm(pSyncNode)) {
|
|
||||||
ready = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (h) {
|
|
||||||
taosLRUCacheRelease(pCache, h, false);
|
|
||||||
} else {
|
|
||||||
syncEntryDestroy(pEntry);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!ready) {
|
|
||||||
terrno = TSDB_CODE_SYN_RESTORING;
|
terrno = TSDB_CODE_SYN_RESTORING;
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
return ready;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool syncIsReadyForRead(int64_t rid) {
|
bool syncIsReadyForRead(int64_t rid) {
|
||||||
|
|
Loading…
Reference in New Issue