feat: integrate with query

This commit is contained in:
dapan1121 2024-07-01 13:45:36 +08:00
parent 2927e1d6e4
commit 05917ea2a4
2 changed files with 32 additions and 2 deletions

View File

@ -308,7 +308,10 @@ void qwFreeTaskCtx(SQWTaskCtx *ctx) {
qwFreeTaskHandle(&ctx->taskHandle); qwFreeTaskHandle(&ctx->taskHandle);
if (ctx->sinkHandle) { if (ctx->sinkHandle) {
taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession);
dsDestroyDataSinker(ctx->sinkHandle); dsDestroyDataSinker(ctx->sinkHandle);
taosDisableMemoryPoolUsage();
ctx->sinkHandle = NULL; ctx->sinkHandle = NULL;
qDebug("sink handle destroyed"); qDebug("sink handle destroyed");
} }

View File

@ -104,7 +104,9 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
} }
if (!ctx->needFetch) { if (!ctx->needFetch) {
taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession);
dsGetDataLength(ctx->sinkHandle, &ctx->affectedRows, NULL, NULL); dsGetDataLength(ctx->sinkHandle, &ctx->affectedRows, NULL, NULL);
taosDisableMemoryPoolUsage();
} }
} }
@ -152,7 +154,10 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
if (taskHandle) { if (taskHandle) {
qwDbgSimulateSleep(); qwDbgSimulateSleep();
taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession);
code = qExecTaskOpt(taskHandle, pResList, &useconds, &hasMore, &localFetch); code = qExecTaskOpt(taskHandle, pResList, &useconds, &hasMore, &localFetch);
taosDisableMemoryPoolUsage();
if (code) { if (code) {
if (code != TSDB_CODE_OPS_NOT_SUPPORT) { if (code != TSDB_CODE_OPS_NOT_SUPPORT) {
QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code)); QW_TASK_ELOG("qExecTask failed, code:%x - %s", code, tstrerror(code));
@ -170,7 +175,9 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
SSDataBlock *pRes = taosArrayGetP(pResList, j); SSDataBlock *pRes = taosArrayGetP(pResList, j);
SInputData inputData = {.pData = pRes}; SInputData inputData = {.pData = pRes};
taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession);
code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue); code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue);
taosDisableMemoryPoolUsage();
if (code) { if (code) {
QW_TASK_ELOG("dsPutDataBlock failed, code:%x - %s", code, tstrerror(code)); QW_TASK_ELOG("dsPutDataBlock failed, code:%x - %s", code, tstrerror(code));
QW_ERR_JRET(code); QW_ERR_JRET(code);
@ -198,7 +205,9 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
ctx->queryExecDone = true; ctx->queryExecDone = true;
} }
taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession);
dsEndPut(sinkHandle, useconds); dsEndPut(sinkHandle, useconds);
taosDisableMemoryPoolUsage();
if (queryStop) { if (queryStop) {
*queryStop = true; *queryStop = true;
@ -305,8 +314,10 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
*pRawDataLen = 0; *pRawDataLen = 0;
while (true) { while (true) {
taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession);
dsGetDataLength(ctx->sinkHandle, &len, &rawLen, &queryEnd); dsGetDataLength(ctx->sinkHandle, &len, &rawLen, &queryEnd);
taosDisableMemoryPoolUsage();
if (len < 0) { if (len < 0) {
QW_TASK_ELOG("invalid length from dsGetDataLength, length:%" PRId64 "", len); QW_TASK_ELOG("invalid length from dsGetDataLength, length:%" PRId64 "", len);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
@ -314,7 +325,10 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
if (len == 0) { if (len == 0) {
if (queryEnd) { if (queryEnd) {
taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession);
code = dsGetDataBlock(ctx->sinkHandle, &output); code = dsGetDataBlock(ctx->sinkHandle, &output);
taosDisableMemoryPoolUsage();
if (code) { if (code) {
QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code)); QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
QW_ERR_RET(code); QW_ERR_RET(code);
@ -357,7 +371,10 @@ int32_t qwGetQueryResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen,
((int32_t*) output.pData)[1] = rawLen; ((int32_t*) output.pData)[1] = rawLen;
output.pData += sizeof(int32_t) * 2; output.pData += sizeof(int32_t) * 2;
taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession);
code = dsGetDataBlock(ctx->sinkHandle, &output); code = dsGetDataBlock(ctx->sinkHandle, &output);
taosDisableMemoryPoolUsage();
if (code) { if (code) {
QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code)); QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
QW_ERR_RET(code); QW_ERR_RET(code);
@ -407,8 +424,10 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes
int32_t code = 0; int32_t code = 0;
SOutputData output = {0}; SOutputData output = {0};
taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession);
dsGetDataLength(ctx->sinkHandle, &len, &rawLen, &queryEnd); dsGetDataLength(ctx->sinkHandle, &len, &rawLen, &queryEnd);
taosDisableMemoryPoolUsage();
if (len <= 0 || len != sizeof(SDeleterRes)) { if (len <= 0 || len != sizeof(SDeleterRes)) {
QW_TASK_ELOG("invalid length from dsGetDataLength, length:%" PRId64, len); QW_TASK_ELOG("invalid length from dsGetDataLength, length:%" PRId64, len);
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
@ -419,7 +438,10 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes
QW_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); QW_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession);
code = dsGetDataBlock(ctx->sinkHandle, &output); code = dsGetDataBlock(ctx->sinkHandle, &output);
taosDisableMemoryPoolUsage();
if (code) { if (code) {
QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code)); QW_TASK_ELOG("dsGetDataBlock failed, code:%x - %s", code, tstrerror(code));
taosMemoryFree(output.pData); taosMemoryFree(output.pData);
@ -495,7 +517,9 @@ int32_t qwStartDynamicTaskNewExec(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg
ctx->queryEnd = false; ctx->queryEnd = false;
#endif #endif
taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession);
dsReset(ctx->sinkHandle); dsReset(ctx->sinkHandle);
taosDisableMemoryPoolUsage() ;
qUpdateOperatorParam(ctx->taskHandle, qwMsg->msg); qUpdateOperatorParam(ctx->taskHandle, qwMsg->msg);
@ -753,7 +777,10 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, char *sql) {
QW_ERR_JRET(code); QW_ERR_JRET(code);
} }
taosEnableMemoryPoolUsage(gQueryPoolHandle, ctx->memPoolSession);
code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, qwMsg->msgInfo.compressMsg, sql, OPTR_EXEC_MODEL_BATCH); code = qCreateExecTask(qwMsg->node, mgmt->nodeId, tId, plan, &pTaskInfo, &sinkHandle, qwMsg->msgInfo.compressMsg, sql, OPTR_EXEC_MODEL_BATCH);
taosDisableMemoryPoolUsage();
sql = NULL; sql = NULL;
if (code) { if (code) {
QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code)); QW_TASK_ELOG("qCreateExecTask failed, code:%x - %s", code, tstrerror(code));