[td-225] do not execute query if the link is already broken.
This commit is contained in:
parent
6dec0284bc
commit
315fa74042
|
@ -70,7 +70,8 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo);
|
||||||
int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* contLen);
|
int32_t qDumpRetrieveResult(qinfo_t qinfo, SRetrieveTableRsp** pRsp, int32_t* contLen);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Decide if more results will be produced or not
|
* Decide if more results will be produced or not, NOTE: this function will increase the ref count of QInfo,
|
||||||
|
* so it can be only called once for each retrieve
|
||||||
*
|
*
|
||||||
* @param qinfo
|
* @param qinfo
|
||||||
* @return
|
* @return
|
||||||
|
|
|
@ -61,7 +61,7 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
|
||||||
|
|
||||||
// notify connection(handle) that current qhandle is created, if current connection from
|
// notify connection(handle) that current qhandle is created, if current connection from
|
||||||
// client is broken, the query needs to be killed immediately.
|
// client is broken, the query needs to be killed immediately.
|
||||||
static void vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId) {
|
static int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId) {
|
||||||
SRetrieveTableMsg* killQueryMsg = rpcMallocCont(sizeof(SRetrieveTableMsg));
|
SRetrieveTableMsg* killQueryMsg = rpcMallocCont(sizeof(SRetrieveTableMsg));
|
||||||
killQueryMsg->qhandle = htobe64((uint64_t) qhandle);
|
killQueryMsg->qhandle = htobe64((uint64_t) qhandle);
|
||||||
killQueryMsg->free = htons(1);
|
killQueryMsg->free = htons(1);
|
||||||
|
@ -69,7 +69,7 @@ static void vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId)
|
||||||
killQueryMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));
|
killQueryMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));
|
||||||
|
|
||||||
vTrace("QInfo:%p register qhandle to connect:%p", qhandle, handle);
|
vTrace("QInfo:%p register qhandle to connect:%p", qhandle, handle);
|
||||||
rpcReportProgress(handle, (char*) killQueryMsg, sizeof(SRetrieveTableMsg));
|
return rpcReportProgress(handle, (char*) killQueryMsg, sizeof(SRetrieveTableMsg));
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
||||||
|
@ -106,7 +106,17 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
||||||
pRet->len = sizeof(SQueryTableRsp);
|
pRet->len = sizeof(SQueryTableRsp);
|
||||||
pRet->rsp = pRsp;
|
pRet->rsp = pRsp;
|
||||||
|
|
||||||
vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, pQInfo, pVnode->vgId);
|
// current connect is broken
|
||||||
|
if (vnodeNotifyCurrentQhandle(pReadMsg->rpcMsg.handle, pQInfo, pVnode->vgId) != TSDB_CODE_SUCCESS) {
|
||||||
|
vError("vgId:%d, QInfo:%p, dnode query discarded since link is broken, %p", pVnode->vgId, pQInfo, pReadMsg->rpcMsg.handle);
|
||||||
|
pRsp->code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
||||||
|
|
||||||
|
//NOTE: there two refcount, needs to kill twice, todo refactor
|
||||||
|
qKillQuery(pQInfo);
|
||||||
|
qKillQuery(pQInfo);
|
||||||
|
|
||||||
|
return pRsp->code;
|
||||||
|
}
|
||||||
|
|
||||||
vTrace("vgId:%d, QInfo:%p, dnode query msg disposed", pVnode->vgId, pQInfo);
|
vTrace("vgId:%d, QInfo:%p, dnode query msg disposed", pVnode->vgId, pQInfo);
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in New Issue