Merge branch 'fix/TD-19325' into feature/tq
This commit is contained in:
commit
752e13f94c
|
@ -99,7 +99,6 @@ void vnodeSyncStart(SVnode* pVnode);
|
||||||
void vnodeSyncClose(SVnode* pVnode);
|
void vnodeSyncClose(SVnode* pVnode);
|
||||||
void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg);
|
void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg);
|
||||||
bool vnodeIsLeader(SVnode* pVnode);
|
bool vnodeIsLeader(SVnode* pVnode);
|
||||||
bool vnodeIsReadyForRead(SVnode* pVnode);
|
|
||||||
bool vnodeIsRoleLeader(SVnode* pVnode);
|
bool vnodeIsRoleLeader(SVnode* pVnode);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -289,7 +289,7 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
|
|
||||||
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
vTrace("message in vnode query queue is processing");
|
vTrace("message in vnode query queue is processing");
|
||||||
if ((pMsg->msgType == TDMT_SCH_QUERY) && !vnodeIsReadyForRead(pVnode)) {
|
if ((pMsg->msgType == TDMT_SCH_QUERY) && !vnodeIsLeader(pVnode)) {
|
||||||
vnodeRedirectRpcMsg(pVnode, pMsg);
|
vnodeRedirectRpcMsg(pVnode, pMsg);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -311,7 +311,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
||||||
vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg);
|
vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg);
|
||||||
if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
|
if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
|
||||||
pMsg->msgType == TDMT_VND_BATCH_META) &&
|
pMsg->msgType == TDMT_VND_BATCH_META) &&
|
||||||
!vnodeIsReadyForRead(pVnode)) {
|
!vnodeIsLeader(pVnode)) {
|
||||||
vnodeRedirectRpcMsg(pVnode, pMsg);
|
vnodeRedirectRpcMsg(pVnode, pMsg);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -796,22 +796,3 @@ bool vnodeIsLeader(SVnode *pVnode) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool vnodeIsReadyForRead(SVnode *pVnode) {
|
|
||||||
if (!pVnode->restored) {
|
|
||||||
vDebug("vgId:%d, vnode restore not finished", pVnode->config.vgId);
|
|
||||||
terrno = TSDB_CODE_APP_NOT_READY;
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (syncIsReady(pVnode->sync)) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (syncIsReadyForRead(pVnode->sync)) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
vDebug("vgId:%d, vnode not ready for read, state:%s, last:%ld, cmt:%ld", pVnode->config.vgId,
|
|
||||||
syncGetMyRoleStr(pVnode->sync), syncGetLastIndex(pVnode->sync), syncGetCommitIndex(pVnode->sync));
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
|
@ -396,29 +396,6 @@ bool syncIsReady(int64_t rid) {
|
||||||
return b;
|
return b;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool syncIsReadyForRead(int64_t rid) {
|
|
||||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
|
||||||
if (pSyncNode == NULL) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
ASSERT(rid == pSyncNode->rid);
|
|
||||||
|
|
||||||
// TODO: last not noop?
|
|
||||||
SyncIndex lastIndex = syncNodeGetLastIndex(pSyncNode);
|
|
||||||
bool b = (pSyncNode->state == TAOS_SYNC_STATE_LEADER) && (pSyncNode->commitIndex >= lastIndex - SYNC_MAX_READ_RANGE);
|
|
||||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
|
||||||
|
|
||||||
// if false, set error code
|
|
||||||
if (false == b) {
|
|
||||||
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
|
|
||||||
terrno = TSDB_CODE_SYN_NOT_LEADER;
|
|
||||||
} else {
|
|
||||||
terrno = TSDB_CODE_APP_NOT_READY;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return b;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool syncIsRestoreFinish(int64_t rid) {
|
bool syncIsRestoreFinish(int64_t rid) {
|
||||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||||
if (pSyncNode == NULL) {
|
if (pSyncNode == NULL) {
|
||||||
|
|
Loading…
Reference in New Issue