From d08e9972b6f05f7b6e36d9d138ef5d4d6664f580 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 5 Aug 2020 00:48:19 +0800 Subject: [PATCH] [td-225] prevent qinfo from being released while it is in queue. --- src/query/src/qExecutor.c | 3 ++- src/vnode/src/vnodeRead.c | 26 +++++++++++++++++--------- 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/src/query/src/qExecutor.c b/src/query/src/qExecutor.c index ac90ca9595..b5fc67e4f1 100644 --- a/src/query/src/qExecutor.c +++ b/src/query/src/qExecutor.c @@ -6408,6 +6408,7 @@ int32_t qRetrieveQueryResultInfo(qinfo_t qinfo, bool* buildRes, void* pRspContex return TSDB_CODE_QRY_INVALID_QHANDLE; } + *buildRes = false; SQuery *pQuery = pQInfo->runtimeEnv.pQuery; if (IS_QUERY_KILLED(pQInfo)) { qDebug("QInfo:%p query is killed, code:%d", pQInfo, pQInfo->code); @@ -6751,7 +6752,7 @@ void** qRegisterQInfo(void* pMgmt, uint64_t qInfo) { return NULL; } - const int32_t DEFAULT_QHANDLE_LIFE_SPAN = tsShellActivityTimer * 2; + const int32_t DEFAULT_QHANDLE_LIFE_SPAN = tsShellActivityTimer * 2 * 1000; SQueryMgmt *pQueryMgmt = pMgmt; if (pQueryMgmt->qinfoPool == NULL) { diff --git a/src/vnode/src/vnodeRead.c b/src/vnode/src/vnodeRead.c index 3cf5c1382f..896b27e03f 100644 --- a/src/vnode/src/vnodeRead.c +++ b/src/vnode/src/vnodeRead.c @@ -159,7 +159,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { // current connect is broken if (code == TSDB_CODE_SUCCESS) { handle = qRegisterQInfo(pVnode->qMgmt, (uint64_t) pQInfo); - if (handle == NULL) { // failed to register qhandle + if (handle == NULL) { // failed to register qhandle, todo add error test case vError("vgId:%d QInfo:%p register qhandle failed, return to app, code:%s", pVnode->vgId, (void *)pQInfo, tstrerror(pRsp->code)); pRsp->code = TSDB_CODE_QRY_INVALID_QHANDLE; @@ -180,12 +180,9 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { } if (handle != NULL) { - vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, register qhandle and return to app", vgId, *handle); - + vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, create qhandle and returns to app", vgId, *handle); vnodePutItemIntoReadQueue(pVnode, *handle); - qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false); } - } else { assert(pCont != NULL); @@ -194,18 +191,19 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { vWarn("QInfo:%p invalid qhandle in continuing exec query, conn:%p", (void*) pCont, pReadMsg->rpcMsg.handle); code = TSDB_CODE_QRY_INVALID_QHANDLE; } else { - vDebug("vgId:%d, QInfo:%p, dnode continue exec query", pVnode->vgId, (void*) pCont); + vDebug("vgId:%d, QInfo:%p, dnode continues to exec query", pVnode->vgId, (void*) pCont); bool freehandle = false; bool buildRes = qTableQuery(*handle); // do execute query - // build query rsp + // build query rsp, the retrieve request has reached here already if (buildRes) { // update the connection info according to the retrieve connection pReadMsg->rpcMsg.handle = qGetResultRetrieveMsg(*handle); assert(pReadMsg->rpcMsg.handle != NULL); - vDebug("vgId:%d, QInfo:%p, start to build result rsp after query paused, %p", pVnode->vgId, *handle, pReadMsg->rpcMsg.handle); + vDebug("vgId:%d, QInfo:%p, start to build retrieval rsp after query paused, %p", pVnode->vgId, *handle, + pReadMsg->rpcMsg.handle); code = vnodeDumpQueryResult(&pReadMsg->rspRet, pVnode, *handle, &freehandle); // todo test the error code case @@ -214,7 +212,17 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { } } - qReleaseQInfo(pVnode->qMgmt, (void**) &handle, freehandle); + // If retrieval request has not arrived, release the qhandle and decrease the reference count to allow + // the queryMgmt to free it when expired + qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false); + + // NOTE: + // if the qhandle is put into query vread queue and wait to be executed by worker in read queue, + // the reference count of qhandle can not be decreased. Otherwise, qhandle may be released before or in the + // procedure of query execution + if (freehandle) { + qReleaseQInfo(pVnode->qMgmt, (void **)&handle, freehandle); + } } }