[TD-549] fix crash while vread return TSDB_CODE_NOT_READY errno
This commit is contained in:
parent
353dc70f4b
commit
fa2a96e520
|
@ -192,13 +192,14 @@ void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) {
|
||||||
if (code == TSDB_CODE_VND_ACTION_IN_PROGRESS) return;
|
if (code == TSDB_CODE_VND_ACTION_IN_PROGRESS) return;
|
||||||
if (code == TSDB_CODE_VND_ACTION_NEED_REPROCESSED) {
|
if (code == TSDB_CODE_VND_ACTION_NEED_REPROCESSED) {
|
||||||
dnodeContinueExecuteQuery(pVnode, pRead->rspRet.qhandle, pRead);
|
dnodeContinueExecuteQuery(pVnode, pRead->rspRet.qhandle, pRead);
|
||||||
|
code = TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
SRpcMsg rpcRsp = {
|
SRpcMsg rpcRsp = {
|
||||||
.handle = pRead->rpcMsg.handle,
|
.handle = pRead->rpcMsg.handle,
|
||||||
.pCont = pRead->rspRet.rsp,
|
.pCont = pRead->rspRet.rsp,
|
||||||
.contLen = pRead->rspRet.len,
|
.contLen = pRead->rspRet.len,
|
||||||
.code = pRead->rspRet.code,
|
.code = code,
|
||||||
};
|
};
|
||||||
|
|
||||||
rpcSendResponse(&rpcRsp);
|
rpcSendResponse(&rpcRsp);
|
||||||
|
@ -216,7 +217,7 @@ static void *dnodeProcessReadQueue(void *param) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
dTrace("%p, msg:%s will be processed", pReadMsg->rpcMsg.ahandle, taosMsg[pReadMsg->rpcMsg.msgType]);
|
dTrace("%p, msg:%s will be processed in vread queue", pReadMsg->rpcMsg.ahandle, taosMsg[pReadMsg->rpcMsg.msgType]);
|
||||||
int32_t code = vnodeProcessRead(pVnode, pReadMsg->rpcMsg.msgType, pReadMsg->pCont, pReadMsg->contLen, &pReadMsg->rspRet);
|
int32_t code = vnodeProcessRead(pVnode, pReadMsg->rpcMsg.msgType, pReadMsg->pCont, pReadMsg->contLen, &pReadMsg->rspRet);
|
||||||
dnodeSendRpcReadRsp(pVnode, pReadMsg, code);
|
dnodeSendRpcReadRsp(pVnode, pReadMsg, code);
|
||||||
taosFreeQitem(pReadMsg);
|
taosFreeQitem(pReadMsg);
|
||||||
|
|
|
@ -216,7 +216,7 @@ static void *dnodeProcessWriteQueue(void *param) {
|
||||||
pHead->msgType = pWrite->rpcMsg.msgType;
|
pHead->msgType = pWrite->rpcMsg.msgType;
|
||||||
pHead->version = 0;
|
pHead->version = 0;
|
||||||
pHead->len = pWrite->contLen;
|
pHead->len = pWrite->contLen;
|
||||||
dTrace("%p, msg:%s will be processed", pWrite->rpcMsg.ahandle, taosMsg[pWrite->rpcMsg.msgType]);
|
dTrace("%p, msg:%s will be processed in vwrite queue", pWrite->rpcMsg.ahandle, taosMsg[pWrite->rpcMsg.msgType]);
|
||||||
} else {
|
} else {
|
||||||
pHead = (SWalHead *)item;
|
pHead = (SWalHead *)item;
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,7 +30,6 @@ typedef enum _VN_STATUS {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int len;
|
int len;
|
||||||
int code;
|
|
||||||
void *rsp;
|
void *rsp;
|
||||||
void *qhandle; //used by query and retrieve msg
|
void *qhandle; //used by query and retrieve msg
|
||||||
} SRspRet;
|
} SRspRet;
|
||||||
|
|
|
@ -39,15 +39,21 @@ void vnodeInitReadFp(void) {
|
||||||
int32_t vnodeProcessRead(void *param, int msgType, void *pCont, int32_t contLen, SRspRet *ret) {
|
int32_t vnodeProcessRead(void *param, int msgType, void *pCont, int32_t contLen, SRspRet *ret) {
|
||||||
SVnodeObj *pVnode = (SVnodeObj *)param;
|
SVnodeObj *pVnode = (SVnodeObj *)param;
|
||||||
|
|
||||||
if (vnodeProcessReadMsgFp[msgType] == NULL)
|
if (vnodeProcessReadMsgFp[msgType] == NULL) {
|
||||||
|
vTrace("vgId:%d, msgType:%s not processed, no handle", pVnode->vgId, taosMsg[msgType]);
|
||||||
return TSDB_CODE_VND_MSG_NOT_PROCESSED;
|
return TSDB_CODE_VND_MSG_NOT_PROCESSED;
|
||||||
|
}
|
||||||
|
|
||||||
if (pVnode->status == TAOS_VN_STATUS_DELETING || pVnode->status == TAOS_VN_STATUS_CLOSING)
|
if (pVnode->status == TAOS_VN_STATUS_DELETING || pVnode->status == TAOS_VN_STATUS_CLOSING) {
|
||||||
|
vTrace("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[msgType], pVnode->status);
|
||||||
return TSDB_CODE_VND_INVALID_VGROUP_ID;
|
return TSDB_CODE_VND_INVALID_VGROUP_ID;
|
||||||
|
}
|
||||||
|
|
||||||
// TODO: Later, let slave to support query
|
// TODO: Later, let slave to support query
|
||||||
if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER)
|
if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
|
||||||
|
vTrace("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[msgType], pVnode->syncCfg.replica, pVnode->role);
|
||||||
return TSDB_CODE_RPC_NOT_READY;
|
return TSDB_CODE_RPC_NOT_READY;
|
||||||
|
}
|
||||||
|
|
||||||
return (*vnodeProcessReadMsgFp[msgType])(pVnode, pCont, contLen, ret);
|
return (*vnodeProcessReadMsgFp[msgType])(pVnode, pCont, contLen, ret);
|
||||||
}
|
}
|
||||||
|
@ -60,11 +66,11 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t cont
|
||||||
|
|
||||||
qinfo_t pQInfo = NULL;
|
qinfo_t pQInfo = NULL;
|
||||||
if (contLen != 0) {
|
if (contLen != 0) {
|
||||||
pRet->code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo);
|
code = qCreateQueryInfo(pVnode->tsdb, pVnode->vgId, pQueryTableMsg, &pQInfo);
|
||||||
|
|
||||||
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
|
SQueryTableRsp *pRsp = (SQueryTableRsp *) rpcMallocCont(sizeof(SQueryTableRsp));
|
||||||
pRsp->qhandle = htobe64((uint64_t) (pQInfo));
|
pRsp->qhandle = htobe64((uint64_t) (pQInfo));
|
||||||
pRsp->code = pRet->code;
|
pRsp->code = code;
|
||||||
|
|
||||||
pRet->len = sizeof(SQueryTableRsp);
|
pRet->len = sizeof(SQueryTableRsp);
|
||||||
pRet->rsp = pRsp;
|
pRet->rsp = pRsp;
|
||||||
|
@ -74,9 +80,11 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t cont
|
||||||
assert(pCont != NULL);
|
assert(pCont != NULL);
|
||||||
pQInfo = pCont;
|
pQInfo = pCont;
|
||||||
code = TSDB_CODE_VND_ACTION_IN_PROGRESS;
|
code = TSDB_CODE_VND_ACTION_IN_PROGRESS;
|
||||||
|
vTrace("vgId:%d, QInfo:%p, dnode query msg in progress", pVnode->vgId, pQInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pQInfo != NULL) {
|
if (pQInfo != NULL) {
|
||||||
|
vTrace("vgId:%d, QInfo:%p, do qTableQuery", pVnode->vgId, pQInfo);
|
||||||
qTableQuery(pQInfo); // do execute query
|
qTableQuery(pQInfo); // do execute query
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -88,18 +96,16 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, void *pCont, int32_t cont
|
||||||
void *pQInfo = (void*) htobe64(pRetrieve->qhandle);
|
void *pQInfo = (void*) htobe64(pRetrieve->qhandle);
|
||||||
memset(pRet, 0, sizeof(SRspRet));
|
memset(pRet, 0, sizeof(SRspRet));
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
|
||||||
|
|
||||||
vTrace("vgId:%d, QInfo:%p, retrieve msg is received", pVnode->vgId, pQInfo);
|
vTrace("vgId:%d, QInfo:%p, retrieve msg is received", pVnode->vgId, pQInfo);
|
||||||
|
|
||||||
pRet->code = qRetrieveQueryResultInfo(pQInfo);
|
int32_t code = qRetrieveQueryResultInfo(pQInfo);
|
||||||
if (pRet->code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
//TODO
|
//TODO
|
||||||
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
|
pRet->rsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
|
||||||
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
|
memset(pRet->rsp, 0, sizeof(SRetrieveTableRsp));
|
||||||
} else {
|
} else {
|
||||||
// todo check code and handle error in build result set
|
// todo check code and handle error in build result set
|
||||||
pRet->code = qDumpRetrieveResult(pQInfo, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len);
|
code = qDumpRetrieveResult(pQInfo, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len);
|
||||||
|
|
||||||
if (qHasMoreResultsToRetrieve(pQInfo)) {
|
if (qHasMoreResultsToRetrieve(pQInfo)) {
|
||||||
pRet->qhandle = pQInfo;
|
pRet->qhandle = pQInfo;
|
||||||
|
|
Loading…
Reference in New Issue