fix: query worker fetch response memory leak issue

This commit is contained in:
dapan1121 2024-10-22 14:27:24 +08:00
parent 0e595fa768
commit 0fd275db66
1 changed files with 14 additions and 10 deletions

View File

@ -329,7 +329,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
if (len < 0) {
QW_TASK_ELOG("invalid length from dsGetDataLength, length:%" PRId64 "", len);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
QW_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT);
}
if (len == 0) {
@ -337,18 +337,18 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
code = dsGetDataBlock(ctx->sinkHandle, &output);
if (code) {
QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
QW_ERR_RET(code);
QW_ERR_JRET(code);
}
QW_TASK_DLOG("no more data in sink and query end, fetched blocks %d rows %" PRId64, pOutput->numOfBlocks,
pOutput->numOfRows);
if (!ctx->dynamicTask) {
QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask));
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask));
}
if (NULL == pRsp) {
QW_ERR_RET(qwMallocFetchRsp(!ctx->localExec, len, &pRsp));
QW_ERR_JRET(qwMallocFetchRsp(!ctx->localExec, len, &pRsp));
*pOutput = output;
} else {
pOutput->queryEnd = output.queryEnd;
@ -368,7 +368,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
*dataLen += len + PAYLOAD_PREFIX_LEN;
*pRawDataLen += rawLen + PAYLOAD_PREFIX_LEN;
QW_ERR_RET(qwMallocFetchRsp(!ctx->localExec, *dataLen, &pRsp));
QW_ERR_JRET(qwMallocFetchRsp(!ctx->localExec, *dataLen, &pRsp));
// set the serialize start position
output.pData = pRsp->data + *dataLen - (len + PAYLOAD_PREFIX_LEN);
@ -380,7 +380,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
code = dsGetDataBlock(ctx->sinkHandle, &output);
if (code) {
QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
QW_ERR_RET(code);
QW_ERR_JRET(code);
}
pOutput->queryEnd = output.queryEnd;
@ -399,7 +399,7 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) {
QW_TASK_DLOG("task all data fetched and done, fetched blocks %d rows %" PRId64, pOutput->numOfBlocks,
pOutput->numOfRows);
QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask));
QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCC, ctx->dynamicTask));
break;
}
@ -416,8 +416,11 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
}
}
_return:
*rspMsg = pRsp;
return TSDB_CODE_SUCCESS;
return code;
}
int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes) {
@ -877,10 +880,11 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
break;
}
qwFreeFetchRsp(rsp);
rsp = NULL;
if (code && QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) {
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
qwFreeFetchRsp(rsp);
rsp = NULL;
qwMsg->connInfo = ctx->dataConnInfo;
code = qwBuildAndSendFetchRsp(ctx->fetchMsgType + 1, &qwMsg->connInfo, NULL, 0, code);