fix(query): fix memory leak.
This commit is contained in:
parent
18924f9a7d
commit
79b0d1fcbd
|
@ -480,6 +480,7 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds) {
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
pTaskInfo->code = ret;
|
pTaskInfo->code = ret;
|
||||||
cleanUpUdfs();
|
cleanUpUdfs();
|
||||||
|
|
||||||
qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
|
qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
|
||||||
atomic_store_64(&pTaskInfo->owner, 0);
|
atomic_store_64(&pTaskInfo->owner, 0);
|
||||||
|
|
||||||
|
@ -512,8 +513,8 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds) {
|
||||||
}
|
}
|
||||||
|
|
||||||
cleanUpUdfs();
|
cleanUpUdfs();
|
||||||
uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
|
|
||||||
|
|
||||||
|
uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;
|
||||||
qDebug("%s task suspended, %d rows in %d blocks returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
|
qDebug("%s task suspended, %d rows in %d blocks returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
|
||||||
GET_TASKID(pTaskInfo), current, (int32_t)taosArrayGetSize(pResList), total, 0, el / 1000.0);
|
GET_TASKID(pTaskInfo), current, (int32_t)taosArrayGetSize(pResList), total, 0, el / 1000.0);
|
||||||
|
|
||||||
|
|
|
@ -9,6 +9,7 @@
|
||||||
#include "tcommon.h"
|
#include "tcommon.h"
|
||||||
#include "tmsg.h"
|
#include "tmsg.h"
|
||||||
#include "tname.h"
|
#include "tname.h"
|
||||||
|
#include "tdatablock.h"
|
||||||
|
|
||||||
SQWorkerMgmt gQwMgmt = {
|
SQWorkerMgmt gQwMgmt = {
|
||||||
.lock = 0,
|
.lock = 0,
|
||||||
|
@ -16,6 +17,11 @@ SQWorkerMgmt gQwMgmt = {
|
||||||
.qwNum = 0,
|
.qwNum = 0,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
static void freeBlock(void* param) {
|
||||||
|
SSDataBlock* pBlock = *(SSDataBlock**)param;
|
||||||
|
blockDataDestroy(pBlock);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
|
int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SSchedulerHbRsp rsp = {0};
|
SSchedulerHbRsp rsp = {0};
|
||||||
|
@ -71,7 +77,7 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
\
|
||||||
int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
|
int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
bool qcontinue = true;
|
bool qcontinue = true;
|
||||||
|
@ -88,6 +94,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
|
||||||
// if *taskHandle is NULL, it's killed right now
|
// if *taskHandle is NULL, it's killed right now
|
||||||
if (taskHandle) {
|
if (taskHandle) {
|
||||||
qwDbgSimulateSleep();
|
qwDbgSimulateSleep();
|
||||||
|
|
||||||
code = qExecTaskOpt(taskHandle, pResList, &useconds);
|
code = qExecTaskOpt(taskHandle, pResList, &useconds);
|
||||||
if (code) {
|
if (code) {
|
||||||
if (code != TSDB_CODE_OPS_NOT_SUPPORT) {
|
if (code != TSDB_CODE_OPS_NOT_SUPPORT) {
|
||||||
|
@ -150,8 +157,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryStop) {
|
||||||
}
|
}
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
taosArrayDestroyEx(pResList, freeBlock);
|
||||||
taosArrayDestroy(pResList);
|
|
||||||
QW_RET(code);
|
QW_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -915,13 +921,13 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
|
||||||
|
|
||||||
void *pIter = taosHashIterate(mgmt->schHash, NULL);
|
void *pIter = taosHashIterate(mgmt->schHash, NULL);
|
||||||
while (pIter) {
|
while (pIter) {
|
||||||
SQWSchStatus *sch = (SQWSchStatus *)pIter;
|
SQWSchStatus *sch1 = (SQWSchStatus *)pIter;
|
||||||
if (NULL == sch->hbConnInfo.handle) {
|
if (NULL == sch1->hbConnInfo.handle) {
|
||||||
uint64_t *sId = taosHashGetKey(pIter, NULL);
|
uint64_t *sId = taosHashGetKey(pIter, NULL);
|
||||||
QW_TLOG("cancel send hb to sch %" PRIx64 " cause of no connection handle", *sId);
|
QW_TLOG("cancel send hb to sch %" PRIx64 " cause of no connection handle", *sId);
|
||||||
|
|
||||||
if (sch->hbBrokenTs > 0 && ((currentMs - sch->hbBrokenTs) > QW_SCH_TIMEOUT_MSEC) &&
|
if (sch1->hbBrokenTs > 0 && ((currentMs - sch1->hbBrokenTs) > QW_SCH_TIMEOUT_MSEC) &&
|
||||||
taosHashGetSize(sch->tasksHash) <= 0) {
|
taosHashGetSize(sch1->tasksHash) <= 0) {
|
||||||
taosArrayPush(pExpiredSch, sId);
|
taosArrayPush(pExpiredSch, sId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue