[td-225] prevent qinfo from being released while it is in queue.
This commit is contained in:
parent
28f2f102af
commit
d08e9972b6
|
@ -6408,6 +6408,7 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex
|
||||||
return TSDB_CODE_QRY_INVALID_QHANDLE;
|
return TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
*buildRes = false;
|
||||||
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
SQuery *pQuery = pQInfo->runtimeEnv.pQuery;
|
||||||
if (IS_QUERY_KILLED(pQInfo)) {
|
if (IS_QUERY_KILLED(pQInfo)) {
|
||||||
qDebug("QInfo:%p query is killed, code:%d", pQInfo, pQInfo->code);
|
qDebug("QInfo:%p query is killed, code:%d", pQInfo, pQInfo->code);
|
||||||
|
@ -6751,7 +6752,7 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
const int32_t DEFAULT_QHANDLE_LIFE_SPAN = tsShellActivityTimer * 2;
|
const int32_t DEFAULT_QHANDLE_LIFE_SPAN = tsShellActivityTimer * 2 * 1000;
|
||||||
|
|
||||||
SQueryMgmt *pQueryMgmt = pMgmt;
|
SQueryMgmt *pQueryMgmt = pMgmt;
|
||||||
if (pQueryMgmt->qinfoPool == NULL) {
|
if (pQueryMgmt->qinfoPool == NULL) {
|
||||||
|
|
|
@ -159,7 +159,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
||||||
// current connect is broken
|
// current connect is broken
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t) pQInfo);
|
handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t) pQInfo);
|
||||||
if (handle == NULL) { // failed to register qhandle
|
if (handle == NULL) { // failed to register qhandle, todo add error test case
|
||||||
vError("vgId:%d QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void *)pQInfo,
|
vError("vgId:%d QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void *)pQInfo,
|
||||||
tstrerror(pRsp->code));
|
tstrerror(pRsp->code));
|
||||||
pRsp->code = TSDB_CODE_QRY_INVALID_QHANDLE;
|
pRsp->code = TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||||
|
@ -180,12 +180,9 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (handle != NULL) {
|
if (handle != NULL) {
|
||||||
vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, register qhandle and return to app", vgId, *handle);
|
vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, create qhandle and returns to app", vgId, *handle);
|
||||||
|
|
||||||
vnodePutItemIntoReadQueue(pVnode, *handle);
|
vnodePutItemIntoReadQueue(pVnode, *handle);
|
||||||
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
assert(pCont != NULL);
|
assert(pCont != NULL);
|
||||||
|
|
||||||
|
@ -194,18 +191,19 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
||||||
vWarn("QInfo:%p invalid qhandle in continuing exec query, conn:%p", (void*) pCont, pReadMsg->rpcMsg.handle);
|
vWarn("QInfo:%p invalid qhandle in continuing exec query, conn:%p", (void*) pCont, pReadMsg->rpcMsg.handle);
|
||||||
code = TSDB_CODE_QRY_INVALID_QHANDLE;
|
code = TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||||
} else {
|
} else {
|
||||||
vDebug("vgId:%d, QInfo:%p, dnode continue exec query", pVnode->vgId, (void*) pCont);
|
vDebug("vgId:%d, QInfo:%p, dnode continues to exec query", pVnode->vgId, (void*) pCont);
|
||||||
|
|
||||||
bool freehandle = false;
|
bool freehandle = false;
|
||||||
bool buildRes = qTableQuery(*handle); // do execute query
|
bool buildRes = qTableQuery(*handle); // do execute query
|
||||||
|
|
||||||
// build query rsp
|
// build query rsp, the retrieve request has reached here already
|
||||||
if (buildRes) {
|
if (buildRes) {
|
||||||
// update the connection info according to the retrieve connection
|
// update the connection info according to the retrieve connection
|
||||||
pReadMsg->rpcMsg.handle = qGetResultRetrieveMsg(*handle);
|
pReadMsg->rpcMsg.handle = qGetResultRetrieveMsg(*handle);
|
||||||
assert(pReadMsg->rpcMsg.handle != NULL);
|
assert(pReadMsg->rpcMsg.handle != NULL);
|
||||||
|
|
||||||
vDebug("vgId:%d, QInfo:%p, start to build result rsp after query paused, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle);
|
vDebug("vgId:%d, QInfo:%p, start to build retrieval rsp after query paused, %p", pVnode->vgId, *handle,
|
||||||
|
pReadMsg->rpcMsg.handle);
|
||||||
code = vnodeDumpQueryResult(&pReadMsg->rspRet, pVnode, *handle, &freehandle);
|
code = vnodeDumpQueryResult(&pReadMsg->rspRet, pVnode, *handle, &freehandle);
|
||||||
|
|
||||||
// todo test the error code case
|
// todo test the error code case
|
||||||
|
@ -214,9 +212,19 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// If retrieval request has not arrived, release the qhandle and decrease the reference count to allow
|
||||||
|
// the queryMgmt to free it when expired
|
||||||
|
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
|
||||||
|
|
||||||
|
// NOTE:
|
||||||
|
// if the qhandle is put into query vread queue and wait to be executed by worker in read queue,
|
||||||
|
// the reference count of qhandle can not be decreased. Otherwise, qhandle may be released before or in the
|
||||||
|
// procedure of query execution
|
||||||
|
if (freehandle) {
|
||||||
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, freehandle);
|
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, freehandle);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue