diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index f00c4aef30..d1988c3904 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -308,7 +308,10 @@ void qwFreeTaskCtx(SQWTaskCtx *ctx) { qwFreeTaskHandle(&ctx->taskHandle); if (ctx->sinkHandle) { + taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession); dsDestroyDataSinker(ctx->sinkHandle); + taosDisableMemoryPoolUsage(); + ctx->sinkHandle = NULL; qDebug("sink handle destroyed"); } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 9dfad8c525..21bb6886b1 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -104,7 +104,9 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { } if (!ctx->needFetch) { + taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession); dsGetDataLength(ctx->sinkHandle, &ctx->affectedRows, NULL, NULL); + taosDisableMemoryPoolUsage(); } } @@ -152,7 +154,10 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { if (taskHandle) { qwDbgSimulateSleep(); + taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession); code = qExecTaskOpt(taskHandle, pResList, &useconds, &hasMore, &localFetch); + taosDisableMemoryPoolUsage(); + if (code) { if (code != TSDB_CODE_OPS_NOT_SUPPORT) { QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code)); @@ -170,7 +175,9 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { SSDataBlock *pRes = taosArrayGetP(pResList, j); SInputData inputData = {.pData = pRes}; + taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession); code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue); + taosDisableMemoryPoolUsage(); if (code) { QW_TASK_ELOG("dsPutDataBlock failed, code:%x - %s", code, tstrerror(code)); QW_ERR_JRET(code); @@ -198,7 +205,9 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) { ctx->queryExecDone = true; } + taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession); dsEndPut(sinkHandle, useconds); + taosDisableMemoryPoolUsage(); if (queryStop) { *queryStop = true; @@ -305,8 +314,10 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, *pRawDataLen = 0; while (true) { + taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession); dsGetDataLength(ctx->sinkHandle, &len, &rawLen, &queryEnd); - + taosDisableMemoryPoolUsage(); + if (len < 0) { QW_TASK_ELOG("invalid length from dsGetDataLength, length:%" PRId64 "", len); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); @@ -314,7 +325,10 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, if (len == 0) { if (queryEnd) { + taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession); code = dsGetDataBlock(ctx->sinkHandle, &output); + taosDisableMemoryPoolUsage(); + if (code) { QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code)); QW_ERR_RET(code); @@ -357,7 +371,10 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, ((int32_t*) output.pData)[1] = rawLen; output.pData += sizeof(int32_t) * 2; + taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession); code = dsGetDataBlock(ctx->sinkHandle, &output); + taosDisableMemoryPoolUsage(); + if (code) { QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code)); QW_ERR_RET(code); @@ -407,8 +424,10 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes int32_t code = 0; SOutputData output = {0}; + taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession); dsGetDataLength(ctx->sinkHandle, &len, &rawLen, &queryEnd); - + taosDisableMemoryPoolUsage(); + if (len <= 0 || len != sizeof(SDeleterRes)) { QW_TASK_ELOG("invalid length from dsGetDataLength, length:%" PRId64, len); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); @@ -419,7 +438,10 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes QW_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } + taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession); code = dsGetDataBlock(ctx->sinkHandle, &output); + taosDisableMemoryPoolUsage(); + if (code) { QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code)); taosMemoryFree(output.pData); @@ -495,7 +517,9 @@ int32_t qwStartDynamicTaskNewExec(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg ctx->queryEnd = false; #endif + taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession); dsReset(ctx->sinkHandle); + taosDisableMemoryPoolUsage() ; qUpdateOperatorParam(ctx->taskHandle, qwMsg->msg); @@ -753,7 +777,10 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) { QW_ERR_JRET(code); } + taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession); code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, qwMsg->msgInfo.compressMsg, sql, OPTR_EXEC_MODEL_BATCH); + taosDisableMemoryPoolUsage(); + sql = NULL; if (code) { QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));