[td-225]
This commit is contained in:
parent
b3165d4193
commit
40c7208cc2
|
@ -6229,6 +6229,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
|
||||||
pQInfo->dataReady = QUERY_RESULT_NOT_READY;
|
pQInfo->dataReady = QUERY_RESULT_NOT_READY;
|
||||||
pQInfo->rspContext = NULL;
|
pQInfo->rspContext = NULL;
|
||||||
pthread_mutex_init(&pQInfo->lock, NULL);
|
pthread_mutex_init(&pQInfo->lock, NULL);
|
||||||
|
tsem_init(&pQInfo->ready, 0, 0);
|
||||||
|
|
||||||
pQuery->pos = -1;
|
pQuery->pos = -1;
|
||||||
pQuery->window = pQueryMsg->window;
|
pQuery->window = pQueryMsg->window;
|
||||||
|
|
|
@ -272,6 +272,11 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
||||||
if (pRetrieve->free == 1) {
|
if (pRetrieve->free == 1) {
|
||||||
vWarn("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle);
|
vWarn("vgId:%d, QInfo:%p, retrieve msg received to kill query and free qhandle", pVnode->vgId, *handle);
|
||||||
qKillQuery(*handle);
|
qKillQuery(*handle);
|
||||||
|
#if !(_NON_BLOCKING_RETRIEVE)
|
||||||
|
void** p = handle;
|
||||||
|
qReleaseQInfo(pVnode->qMgmt, (void **)&p, false);
|
||||||
|
#endif
|
||||||
|
|
||||||
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
|
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
|
||||||
|
|
||||||
vnodeBuildNoResultQueryRsp(pRet);
|
vnodeBuildNoResultQueryRsp(pRet);
|
||||||
|
@ -285,6 +290,12 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
||||||
pReadMsg->rpcMsg.handle);
|
pReadMsg->rpcMsg.handle);
|
||||||
code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
code = TSDB_CODE_RPC_NETWORK_UNAVAIL;
|
||||||
qKillQuery(*handle);
|
qKillQuery(*handle);
|
||||||
|
|
||||||
|
#if !(_NON_BLOCKING_RETRIEVE)
|
||||||
|
void** p = handle;
|
||||||
|
qReleaseQInfo(pVnode->qMgmt, (void **)&p, false);
|
||||||
|
#endif
|
||||||
|
|
||||||
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
|
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -314,6 +325,11 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
||||||
code = vnodeDumpQueryResult(pRet, pVnode, handle, &freeHandle, pReadMsg->rpcMsg.ahandle);
|
code = vnodeDumpQueryResult(pRet, pVnode, handle, &freeHandle, pReadMsg->rpcMsg.ahandle);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if !(_NON_BLOCKING_RETRIEVE)
|
||||||
|
void** p = handle;
|
||||||
|
qReleaseQInfo(pVnode->qMgmt, (void **)&p, false);
|
||||||
|
#endif
|
||||||
|
|
||||||
// If qhandle is not added into vread queue, the query should be completed already or paused with error.
|
// If qhandle is not added into vread queue, the query should be completed already or paused with error.
|
||||||
// Here free qhandle immediately
|
// Here free qhandle immediately
|
||||||
if (freeHandle) {
|
if (freeHandle) {
|
||||||
|
|
Loading…
Reference in New Issue