[td-3457]<fix>: fix the bug that cancel request failed to stop query at vnodes.
This commit is contained in:
parent
6cef55cc5e
commit
253ca7d025
|
@ -372,6 +372,7 @@ int32_t qKillQuery(qinfo_t qinfo) {
|
||||||
return TSDB_CODE_QRY_INVALID_QHANDLE;
|
return TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
qDebug("QInfo:%"PRIu64" query killed", pQInfo->qId);
|
||||||
setQueryKilled(pQInfo);
|
setQueryKilled(pQInfo);
|
||||||
|
|
||||||
// Wait for the query executing thread being stopped/
|
// Wait for the query executing thread being stopped/
|
||||||
|
|
|
@ -364,7 +364,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
|
||||||
|
|
||||||
// register the qhandle to connect to quit query immediate if connection is broken
|
// register the qhandle to connect to quit query immediate if connection is broken
|
||||||
if (vnodeNotifyCurrentQhandle(pRead->rpcHandle, pRetrieve->qId, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
|
if (vnodeNotifyCurrentQhandle(pRead->rpcHandle, pRetrieve->qId, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
|
||||||
vError("vgId:%d, QInfo:%"PRIu64 "-%p, retrieve discarded since link is broken, %p", pVnode->vgId, pRetrieve->qhandle, *handle, pRead->rpcHandle);
|
vError("vgId:%d, QInfo:%"PRIu64 "-%p, retrieve discarded since link is broken, conn:%p", pVnode->vgId, pRetrieve->qhandle, *handle, pRead->rpcHandle);
|
||||||
code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
||||||
qKillQuery(*handle);
|
qKillQuery(*handle);
|
||||||
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
|
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
|
||||||
|
@ -409,7 +409,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead) {
|
||||||
// client is broken, the query needs to be killed immediately.
|
// client is broken, the query needs to be killed immediately.
|
||||||
int32_t vnodeNotifyCurrentQhandle(void *handle, uint64_t qId, void *qhandle, int32_t vgId) {
|
int32_t vnodeNotifyCurrentQhandle(void *handle, uint64_t qId, void *qhandle, int32_t vgId) {
|
||||||
SRetrieveTableMsg *pMsg = rpcMallocCont(sizeof(SRetrieveTableMsg));
|
SRetrieveTableMsg *pMsg = rpcMallocCont(sizeof(SRetrieveTableMsg));
|
||||||
pMsg->qhandle = htobe64((uint64_t)qhandle);
|
pMsg->qId = htobe64(qId);
|
||||||
pMsg->header.vgId = htonl(vgId);
|
pMsg->header.vgId = htonl(vgId);
|
||||||
pMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));
|
pMsg->header.contLen = htonl(sizeof(SRetrieveTableMsg));
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue