fix(query): fix memory leak.
This commit is contained in:
parent
7adc21028a
commit
8205168428
|
@ -67,7 +67,7 @@ struct SExecTaskInfo {
|
||||||
};
|
};
|
||||||
|
|
||||||
void buildTaskId(uint64_t taskId, uint64_t queryId, char* dst);
|
void buildTaskId(uint64_t taskId, uint64_t queryId, char* dst);
|
||||||
SExecTaskInfo* doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, char* dbFName);
|
SExecTaskInfo* doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model);
|
||||||
void doDestroyTask(SExecTaskInfo* pTaskInfo);
|
void doDestroyTask(SExecTaskInfo* pTaskInfo);
|
||||||
bool isTaskKilled(SExecTaskInfo* pTaskInfo);
|
bool isTaskKilled(SExecTaskInfo* pTaskInfo);
|
||||||
void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode);
|
void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode);
|
||||||
|
|
|
@ -250,7 +250,7 @@ int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks,
|
||||||
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols,
|
qTaskInfo_t qCreateQueueExecTaskInfo(void* msg, SReadHandle* pReaderHandle, int32_t vgId, int32_t* numOfCols,
|
||||||
uint64_t id) {
|
uint64_t id) {
|
||||||
if (msg == NULL) { // create raw scan
|
if (msg == NULL) { // create raw scan
|
||||||
SExecTaskInfo* pTaskInfo = doCreateTask(0, id, vgId, OPTR_EXEC_MODEL_QUEUE, "");
|
SExecTaskInfo* pTaskInfo = doCreateTask(0, id, vgId, OPTR_EXEC_MODEL_QUEUE);
|
||||||
if (NULL == pTaskInfo) {
|
if (NULL == pTaskInfo) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
|
@ -35,8 +35,7 @@
|
||||||
|
|
||||||
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
|
#define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st)))
|
||||||
|
|
||||||
SExecTaskInfo* doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model,
|
SExecTaskInfo* doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model) {
|
||||||
char* dbFName) {
|
|
||||||
SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
|
SExecTaskInfo* pTaskInfo = taosMemoryCalloc(1, sizeof(SExecTaskInfo));
|
||||||
if (pTaskInfo == NULL) {
|
if (pTaskInfo == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -46,17 +45,17 @@ SExecTaskInfo* doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOP
|
||||||
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
|
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
|
||||||
pTaskInfo->cost.created = taosGetTimestampUs();
|
pTaskInfo->cost.created = taosGetTimestampUs();
|
||||||
|
|
||||||
pTaskInfo->schemaInfo.dbname = taosStrdup(dbFName);
|
|
||||||
pTaskInfo->execModel = model;
|
pTaskInfo->execModel = model;
|
||||||
pTaskInfo->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo));
|
pTaskInfo->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo));
|
||||||
pTaskInfo->pResultBlockList = taosArrayInit(128, POINTER_BYTES);
|
pTaskInfo->pResultBlockList = taosArrayInit(128, POINTER_BYTES);
|
||||||
|
|
||||||
taosInitRWLatch(&pTaskInfo->lock);
|
taosInitRWLatch(&pTaskInfo->lock);
|
||||||
|
|
||||||
pTaskInfo->id.vgId = vgId;
|
pTaskInfo->id.vgId = vgId;
|
||||||
pTaskInfo->id.queryId = queryId;
|
pTaskInfo->id.queryId = queryId;
|
||||||
|
|
||||||
pTaskInfo->id.str = taosMemoryMalloc(64);
|
pTaskInfo->id.str = taosMemoryMalloc(64);
|
||||||
buildTaskId(taskId, queryId, pTaskInfo->id.str);
|
buildTaskId(taskId, queryId, pTaskInfo->id.str);
|
||||||
|
|
||||||
return pTaskInfo;
|
return pTaskInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -79,7 +78,7 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) {
|
||||||
|
|
||||||
int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
|
int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
|
||||||
int32_t vgId, char* sql, EOPTR_EXEC_MODEL model) {
|
int32_t vgId, char* sql, EOPTR_EXEC_MODEL model) {
|
||||||
*pTaskInfo = doCreateTask(pPlan->id.queryId, taskId, vgId, model, pPlan->dbFName);
|
*pTaskInfo = doCreateTask(pPlan->id.queryId, taskId, vgId, model);
|
||||||
if (*pTaskInfo == NULL) {
|
if (*pTaskInfo == NULL) {
|
||||||
goto _complete;
|
goto _complete;
|
||||||
}
|
}
|
||||||
|
@ -90,8 +89,7 @@ int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHand
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
(*pTaskInfo)->sql = sql;
|
TSWAP((*pTaskInfo)->sql, sql);
|
||||||
sql = NULL;
|
|
||||||
|
|
||||||
(*pTaskInfo)->pSubplan = pPlan;
|
(*pTaskInfo)->pSubplan = pPlan;
|
||||||
(*pTaskInfo)->pRoot = createOperator(pPlan->pNode, *pTaskInfo, pHandle, pPlan->pTagCond, pPlan->pTagIndexCond,
|
(*pTaskInfo)->pRoot = createOperator(pPlan->pNode, *pTaskInfo, pHandle, pPlan->pTagCond, pPlan->pTagIndexCond,
|
||||||
|
|
Loading…
Reference in New Issue