diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 4d4b2387ef..60eb501dd2 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -541,6 +541,8 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { break; } + int32_t rows = pRes->info.rows; + ASSERT(pRes->info.rows > 0); SInputData inputData = {.pData = pRes}; @@ -550,7 +552,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { QW_ERR_RET(code); } - QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", pRes->info.rows, qcontinue); + QW_TASK_DLOG("data put into sink, rows:%d, continueExecTask:%d", rows, qcontinue); if (!qcontinue) { break; diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp index 2d27884466..cc4233bd47 100644 --- a/source/libs/qworker/test/qworkerTests.cpp +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -249,6 +249,7 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) { if (0 == pRsp->code && 0 == rsp->completed) { qwtBuildFetchReqMsg(&qwtfetchMsg, &qwtfetchRpc); qwtPutReqToFetchQueue((void *)0x1, &qwtfetchRpc); + rpcFreeCont(rsp); return; }