diff --git a/source/libs/qworker/src/qwMem.c b/source/libs/qworker/src/qwMem.c index d625bb113a..69d4093221 100644 --- a/source/libs/qworker/src/qwMem.c +++ b/source/libs/qworker/src/qwMem.c @@ -153,6 +153,7 @@ int32_t qwRetrieveJobInfo(QW_FPARAMS_DEF, SQWJobInfo** ppJob) { if (atomic_load_8(&pJob->destroyed)) { QW_UNLOCK(QW_READ, &pJob->lock); + taosHashRelease(gQueryMgmt.pJobInfo, pJob); continue; } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 641fa03f7a..fdbe93965a 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -449,51 +449,64 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes return TSDB_CODE_SUCCESS; } -int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg, int32_t code) { - if (QUERY_RSP_POLICY_QUICK == tsQueryRspPolicy && ctx != NULL) { - if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { - void *rsp = NULL; - int32_t dataLen = 0; - int32_t rawLen = 0; - SOutputData sOutput = {0}; - if (TSDB_CODE_SUCCESS == code) { - code = qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rawLen, &rsp, &sOutput); +int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWMsg *qwMsg, int32_t code) { + if (QUERY_RSP_POLICY_QUICK != tsQueryRspPolicy) { + return TSDB_CODE_SUCCESS; + } + + SQWTaskCtx *ctx = NULL; + QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx)); + + if (!QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { + goto _return; + } + + void *rsp = NULL; + int32_t dataLen = 0; + int32_t rawLen = 0; + SOutputData sOutput = {0}; + if (TSDB_CODE_SUCCESS == code) { + code = qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rawLen, &rsp, &sOutput); + } + + if (code) { + qwFreeFetchRsp(rsp); + rsp = NULL; + dataLen = 0; + } + + if (NULL == rsp && TSDB_CODE_SUCCESS == code) { + goto _return; + } + + if (NULL != rsp) { + bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd); + + qwBuildFetchRsp(rsp, &sOutput, dataLen, rawLen, qComplete); + if (qComplete) { + atomic_store_8((int8_t *)&ctx->queryEnd, true); + if (!ctx->dynamicTask) { + qwFreeSinkHandle(ctx); } - - if (code) { - qwFreeFetchRsp(rsp); - rsp = NULL; - dataLen = 0; - } - - if (NULL == rsp && TSDB_CODE_SUCCESS == code) { - return TSDB_CODE_SUCCESS; - } - - if (NULL != rsp) { - bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd); - - qwBuildFetchRsp(rsp, &sOutput, dataLen, rawLen, qComplete); - if (qComplete) { - atomic_store_8((int8_t *)&ctx->queryEnd, true); - if (!ctx->dynamicTask) { - qwFreeSinkHandle(ctx); - } - } - } - - qwMsg->connInfo = ctx->dataConnInfo; - QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); - - QW_ERR_RET(qwBuildAndSendFetchRsp(ctx, ctx->fetchMsgType + 1, &qwMsg->connInfo, rsp, dataLen, code)); - rsp = NULL; - - QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), - dataLen); } } - return TSDB_CODE_SUCCESS; + qwMsg->connInfo = ctx->dataConnInfo; + QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); + + QW_ERR_JRET(qwBuildAndSendFetchRsp(ctx, ctx->fetchMsgType + 1, &qwMsg->connInfo, rsp, dataLen, code)); + rsp = NULL; + + QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code), + dataLen); + +_return: + + if (ctx) { + qwReleaseTaskCtx(mgmt, ctx); + } + + return code; } int32_t qwStartDynamicTaskNewExec(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg) { @@ -748,6 +761,8 @@ int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx)); + QW_LOCK(QW_WRITE, &ctx->lock); + ctx->ctrlConnInfo = qwMsg->connInfo; ctx->sId = sId; ctx->phase = -1; @@ -767,7 +782,8 @@ _return: if (code) { (void)qwDropTask(QW_FPARAMS()); } - + + QW_UNLOCK(QW_WRITE, &ctx->lock); qwReleaseTaskCtx(mgmt, ctx); } @@ -848,7 +864,7 @@ _return: input.msgType = qwMsg->msgType; code = qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_QUERY, &input, NULL); - QW_ERR_RET(qwQuickRspFetchReq(QW_FPARAMS(), ctx, qwMsg, code)); + QW_ERR_RET(qwQuickRspFetchReq(QW_FPARAMS(), qwMsg, code)); QW_RET(TSDB_CODE_SUCCESS); }