TD-1661
This commit is contained in:
parent
6137a2b377
commit
438aef99f7
|
@ -38,8 +38,7 @@ void vnodeInitReadFp(void) {
|
|||
vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg;
|
||||
}
|
||||
|
||||
int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
|
||||
SVnodeObj *pVnode = (SVnodeObj *)param;
|
||||
static int32_t vnodeProcessReadImp(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
||||
int msgType = pReadMsg->rpcMsg.msgType;
|
||||
|
||||
if (vnodeProcessReadMsgFp[msgType] == NULL) {
|
||||
|
@ -48,16 +47,23 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
|
|||
}
|
||||
|
||||
if (pVnode->status != TAOS_VN_STATUS_READY) {
|
||||
vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[msgType], pVnode->status);
|
||||
vDebug("vgId:%d, msgType:%s not processed, vnode status is %s", pVnode->vgId, taosMsg[msgType],
|
||||
vnodeStatus[pVnode->status]);
|
||||
return TSDB_CODE_APP_NOT_READY;
|
||||
}
|
||||
|
||||
// tsdb may be in reset state
|
||||
if (pVnode->tsdb == NULL) return TSDB_CODE_APP_NOT_READY;
|
||||
if (pVnode->status == TAOS_VN_STATUS_CLOSING) return TSDB_CODE_APP_NOT_READY;
|
||||
if (pVnode->tsdb == NULL) {
|
||||
vDebug("vgId:%d, msgType:%s not processed, tsdb is null", pVnode->vgId, taosMsg[msgType]);
|
||||
return TSDB_CODE_APP_NOT_READY;
|
||||
}
|
||||
|
||||
if (pVnode->status == TAOS_VN_STATUS_CLOSING) {
|
||||
vDebug("vgId:%d, msgType:%s not processed, vstatus is %s", pVnode->vgId, taosMsg[msgType],
|
||||
vnodeStatus[pVnode->status]);
|
||||
return TSDB_CODE_APP_NOT_READY;
|
||||
}
|
||||
|
||||
// TODO: Later, let slave to support query
|
||||
// if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
|
||||
if (pVnode->role != TAOS_SYNC_ROLE_SLAVE && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
|
||||
vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%s", pVnode->vgId, taosMsg[msgType],
|
||||
pVnode->syncCfg.replica, syncRole[pVnode->role]);
|
||||
|
@ -67,6 +73,25 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
|
|||
return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg);
|
||||
}
|
||||
|
||||
int32_t vnodeProcessRead(void *param, SReadMsg *pRead) {
|
||||
SVnodeObj *pVnode = (SVnodeObj *)param;
|
||||
int32_t code = vnodeProcessReadImp(pVnode, pRead);
|
||||
|
||||
if (code == TSDB_CODE_APP_NOT_READY && pRead->rpcMsg.msgType == TSDB_MSG_TYPE_QUERY) {
|
||||
// After the fetch request enters the vnode queue
|
||||
// If the vnode cannot provide services, the following operations are still required
|
||||
// Or, there will be a deadlock
|
||||
void **qhandle = (void **)pRead->pCont;
|
||||
vError("QInfo:%p msg:%p will be killed for vstatus is %s", *qhandle, pRead, vnodeStatus[pVnode->status]);
|
||||
|
||||
// qKillQuery(*qhandle);
|
||||
// qReleaseQInfo(pVnode->qMgmt, (void **)&qhandle, true);
|
||||
return TSDB_CODE_APP_NOT_READY;
|
||||
} else {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle) {
|
||||
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
|
||||
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
|
||||
|
|
Loading…
Reference in New Issue