Merge pull request #9998 from taosdata/feature/3.0_liaohj
[td-11818] Fix memory leak in query.
This commit is contained in:
commit
24136f0104
|
@ -74,6 +74,7 @@ void columnListCopy(SArray* dst, const SArray* src, uint64_t uid);
|
|||
void columnListDestroy(SArray* pColumnList);
|
||||
|
||||
void dropAllExprInfo(SArray** pExprInfo, int32_t numOfLevel);
|
||||
void dropOneLevelExprInfo(SArray* pExprInfo);
|
||||
|
||||
typedef struct SSourceParam {
|
||||
SArray *pExprNodeList; //Array<struct tExprNode*>
|
||||
|
|
|
@ -218,6 +218,7 @@ int32_t getPlan(SRequestObj* pRequest, SQueryNode* pQueryNode, SQueryDag** pDag,
|
|||
|
||||
if (pQueryNode->type == TSDB_SQL_SELECT) {
|
||||
setResSchemaInfo(&pRequest->body.resInfo, pSchema, numOfCols);
|
||||
tfree(pSchema);
|
||||
pRequest->type = TDMT_VND_QUERY;
|
||||
} else {
|
||||
tfree(pSchema);
|
||||
|
|
|
@ -171,6 +171,8 @@ int32_t boundIdxCompar(const void *lhs, const void *rhs);
|
|||
void setBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, int32_t numOfCols);
|
||||
void destroyBoundColumnInfo(SParsedDataColInfo* pColList);
|
||||
void destroyBlockArrayList(SArray* pDataBlockList);
|
||||
void destroyBlockHashmap(SHashObj* pDataBlockHash);
|
||||
|
||||
int32_t initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint32_t nCols, uint32_t nBoundCols, int32_t allNullLen);
|
||||
int32_t allocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows);
|
||||
int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize,
|
||||
|
|
|
@ -213,10 +213,11 @@ SQueryStmtInfo *createQueryInfo() {
|
|||
|
||||
pQueryInfo->slimit.limit = -1;
|
||||
pQueryInfo->slimit.offset = 0;
|
||||
pQueryInfo->pDownstream = taosArrayInit(4, POINTER_BYTES);
|
||||
pQueryInfo->pDownstream = taosArrayInit(4, POINTER_BYTES);
|
||||
pQueryInfo->window = TSWINDOW_INITIALIZER;
|
||||
|
||||
pQueryInfo->exprList = calloc(10, POINTER_BYTES);
|
||||
|
||||
for(int32_t i = 0; i < 10; ++i) {
|
||||
pQueryInfo->exprList[i] = taosArrayInit(4, POINTER_BYTES);
|
||||
}
|
||||
|
@ -232,7 +233,8 @@ static void destroyQueryInfoImpl(SQueryStmtInfo* pQueryInfo) {
|
|||
cleanupFieldInfo(&pQueryInfo->fieldsInfo);
|
||||
|
||||
dropAllExprInfo(pQueryInfo->exprList, 10);
|
||||
pQueryInfo->exprList = NULL;
|
||||
|
||||
tfree(pQueryInfo->exprList);
|
||||
|
||||
columnListDestroy(pQueryInfo->colList);
|
||||
pQueryInfo->colList = NULL;
|
||||
|
@ -258,10 +260,10 @@ void destroyQueryInfo(SQueryStmtInfo* pQueryInfo) {
|
|||
|
||||
size_t numOfUpstream = taosArrayGetSize(pQueryInfo->pDownstream);
|
||||
for (int32_t i = 0; i < numOfUpstream; ++i) {
|
||||
SQueryStmtInfo* pUpQueryInfo = taosArrayGetP(pQueryInfo->pDownstream, i);
|
||||
destroyQueryInfoImpl(pUpQueryInfo);
|
||||
clearAllTableMetaInfo(pUpQueryInfo, false, 0);
|
||||
tfree(pUpQueryInfo);
|
||||
SQueryStmtInfo* pDownstream = taosArrayGetP(pQueryInfo->pDownstream, i);
|
||||
destroyQueryInfoImpl(pDownstream);
|
||||
clearAllTableMetaInfo(pDownstream, false, 0);
|
||||
tfree(pDownstream);
|
||||
}
|
||||
|
||||
destroyQueryInfoImpl(pQueryInfo);
|
||||
|
@ -1395,6 +1397,13 @@ int32_t validateFillNode(SQueryStmtInfo *pQueryInfo, SSqlNode* pSqlNode, SMsgBuf
|
|||
static void pushDownAggFuncExprInfo(SQueryStmtInfo* pQueryInfo);
|
||||
static void addColumnNodeFromLowerLevel(SQueryStmtInfo* pQueryInfo);
|
||||
|
||||
static void freeItemHelper(void* pItem) {
|
||||
void** p = pItem;
|
||||
if (*p != NULL) {
|
||||
tfree(*p);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t validateSqlNode(SSqlNode* pSqlNode, SQueryStmtInfo* pQueryInfo, SMsgBuf* pMsgBuf) {
|
||||
assert(pSqlNode != NULL && (pSqlNode->from == NULL || taosArrayGetSize(pSqlNode->from->list) > 0));
|
||||
|
||||
|
@ -1590,7 +1599,10 @@ int32_t validateSqlNode(SSqlNode* pSqlNode, SQueryStmtInfo* pQueryInfo, SMsgBuf*
|
|||
SArray* functionList = extractFunctionList(pQueryInfo->exprList[i]);
|
||||
extractFunctionDesc(functionList, &pQueryInfo->info);
|
||||
|
||||
if ((code = checkForInvalidExpr(pQueryInfo, pMsgBuf)) != TSDB_CODE_SUCCESS) {
|
||||
code = checkForInvalidExpr(pQueryInfo, pMsgBuf);
|
||||
taosArrayDestroyEx(functionList, freeItemHelper);
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
@ -2902,6 +2914,8 @@ int32_t doAddOneProjectCol(SQueryStmtInfo* pQueryInfo, int32_t outputColIndex, S
|
|||
}
|
||||
|
||||
pQueryInfo->info.projectionQuery = true;
|
||||
|
||||
taosArrayDestroy(pColumnList);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -3983,5 +3997,9 @@ int32_t qParserValidateSqlNode(SParseContext *pCtx, SSqlInfo* pInfo, SQueryStmtI
|
|||
validateSqlNode(p, pQueryInfo, &buf);
|
||||
}
|
||||
|
||||
taosArrayDestroy(data.pTableMeta);
|
||||
taosArrayDestroy(req.pUdf);
|
||||
taosArrayDestroy(req.pTableName);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -249,7 +249,7 @@ static FORCE_INLINE void convertSMemRow(SMemRow dest, SMemRow src, STableDataBlo
|
|||
}
|
||||
}
|
||||
|
||||
void destroyDataBlock(STableDataBlocks* pDataBlock) {
|
||||
static void destroyDataBlock(STableDataBlocks* pDataBlock) {
|
||||
if (pDataBlock == NULL) {
|
||||
return;
|
||||
}
|
||||
|
@ -273,12 +273,29 @@ void destroyBlockArrayList(SArray* pDataBlockList) {
|
|||
|
||||
size_t size = taosArrayGetSize(pDataBlockList);
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
destroyDataBlock(taosArrayGetP(pDataBlockList, i));
|
||||
void* p = taosArrayGetP(pDataBlockList, i);
|
||||
destroyDataBlock(p);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pDataBlockList);
|
||||
}
|
||||
|
||||
void destroyBlockHashmap(SHashObj* pDataBlockHash) {
|
||||
if (pDataBlockHash == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
void** p1 = taosHashIterate(pDataBlockHash, NULL);
|
||||
while (p1) {
|
||||
STableDataBlocks* pBlocks = *p1;
|
||||
destroyDataBlock(pBlocks);
|
||||
|
||||
p1 = taosHashIterate(pDataBlockHash, p1);
|
||||
}
|
||||
|
||||
taosHashCleanup(pDataBlockHash);
|
||||
}
|
||||
|
||||
// data block is disordered, sort it in ascending order
|
||||
void sortRemoveDataBlockDupRowsRaw(STableDataBlocks *dataBuf) {
|
||||
SSubmitBlk *pBlocks = (SSubmitBlk *)dataBuf->pData;
|
||||
|
@ -490,7 +507,7 @@ int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t
|
|||
}
|
||||
|
||||
// the maximum expanded size in byte when a row-wise data is converted to SDataRow format
|
||||
int32_t expandSize = isRawPayload ? getRowExpandSize(pOneTableBlock->pTableMeta) : 0;
|
||||
int32_t expandSize = isRawPayload ? getRowExpandSize(pOneTableBlock->pTableMeta) : 0;
|
||||
int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize +
|
||||
sizeof(STColumn) * getNumOfColumns(pOneTableBlock->pTableMeta);
|
||||
|
||||
|
|
|
@ -525,10 +525,28 @@ static void destroyInsertParseContextForTable(SInsertParseContext* pCxt) {
|
|||
tdDestroyKVRowBuilder(&pCxt->tagsBuilder);
|
||||
}
|
||||
|
||||
static void destroyDataBlock(STableDataBlocks* pDataBlock) {
|
||||
if (pDataBlock == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
tfree(pDataBlock->pData);
|
||||
if (!pDataBlock->cloned) {
|
||||
// free the refcount for metermeta
|
||||
if (pDataBlock->pTableMeta != NULL) {
|
||||
tfree(pDataBlock->pTableMeta);
|
||||
}
|
||||
|
||||
destroyBoundColumnInfo(&pDataBlock->boundColumnInfo);
|
||||
}
|
||||
tfree(pDataBlock);
|
||||
}
|
||||
|
||||
static void destroyInsertParseContext(SInsertParseContext* pCxt) {
|
||||
destroyInsertParseContextForTable(pCxt);
|
||||
taosHashCleanup(pCxt->pVgroupsHashObj);
|
||||
taosHashCleanup(pCxt->pTableBlockHashObj);
|
||||
|
||||
destroyBlockHashmap(pCxt->pTableBlockHashObj);
|
||||
destroyBlockArrayList(pCxt->pTableDataBlocks);
|
||||
destroyBlockArrayList(pCxt->pVgDataBlocks);
|
||||
}
|
||||
|
|
|
@ -248,10 +248,15 @@ void qDestroyQuery(SQueryNode* pQueryNode) {
|
|||
if (NULL == pQueryNode) {
|
||||
return;
|
||||
}
|
||||
if (nodeType(pQueryNode) == TSDB_SQL_INSERT || nodeType(pQueryNode) == TSDB_SQL_CREATE_TABLE) {
|
||||
|
||||
int32_t type = nodeType(pQueryNode);
|
||||
if (type == TSDB_SQL_INSERT || type == TSDB_SQL_CREATE_TABLE) {
|
||||
SVnodeModifOpStmtInfo* pModifInfo = (SVnodeModifOpStmtInfo*)pQueryNode;
|
||||
taosArrayDestroy(pModifInfo->pDataBlocks);
|
||||
}
|
||||
|
||||
tfree(pQueryNode);
|
||||
tfree(pQueryNode);
|
||||
} else if (type == TSDB_SQL_SELECT) {
|
||||
SQueryStmtInfo* pQueryStmtInfo = (SQueryStmtInfo*) pQueryNode;
|
||||
destroyQueryInfo(pQueryStmtInfo);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -732,18 +732,8 @@ void cleanupFieldInfo(SFieldInfo* pFieldInfo) {
|
|||
return;
|
||||
}
|
||||
|
||||
if (pFieldInfo->internalField != NULL) {
|
||||
size_t num = taosArrayGetSize(pFieldInfo->internalField);
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
// SInternalField* pfield = taosArrayGet(pFieldInfo->internalField, i);
|
||||
// if (pfield->pExpr != NULL && pfield->pExpr->pExpr != NULL) {
|
||||
// sqlExprDestroy(pfield->pExpr);
|
||||
// }
|
||||
}
|
||||
}
|
||||
|
||||
taosArrayDestroy(pFieldInfo->internalField);
|
||||
// tfree(pFieldInfo->final);
|
||||
tfree(pFieldInfo->final);
|
||||
|
||||
memset(pFieldInfo, 0, sizeof(SFieldInfo));
|
||||
}
|
||||
|
|
|
@ -191,10 +191,12 @@ void destroyExprInfo(SExprInfo* pExprInfo) {
|
|||
for(int32_t i = 0; i < pExprInfo->base.numOfParams; ++i) {
|
||||
taosVariantDestroy(&pExprInfo->base.param[i]);
|
||||
}
|
||||
|
||||
tfree(pExprInfo->base.pColumns);
|
||||
tfree(pExprInfo);
|
||||
}
|
||||
|
||||
static void dropOneLevelExprInfo(SArray* pExprInfo) {
|
||||
void dropOneLevelExprInfo(SArray* pExprInfo) {
|
||||
size_t size = taosArrayGetSize(pExprInfo);
|
||||
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
|
@ -239,6 +241,9 @@ void assignExprInfo(SExprInfo* dst, const SExprInfo* src) {
|
|||
#endif
|
||||
|
||||
dst->pExpr = exprdup(src->pExpr);
|
||||
dst->base.pColumns = calloc(src->base.numOfCols, sizeof(SColumn));
|
||||
memcpy(dst->base.pColumns, src->base.pColumns, sizeof(SColumn) * src->base.numOfCols);
|
||||
|
||||
memset(dst->base.param, 0, sizeof(SVariant) * tListLen(dst->base.param));
|
||||
for (int32_t j = 0; j < src->base.numOfParams; ++j) {
|
||||
taosVariantAssign(&dst->base.param[j], &src->base.param[j]);
|
||||
|
|
|
@ -265,7 +265,6 @@ static SQueryPlanNode* doCreateQueryPlanForSingleTableImpl(const SQueryStmtInfo*
|
|||
} else {
|
||||
// here we can push down the projection to tablescan operator.
|
||||
pNode->numOfExpr = num;
|
||||
pNode->pExpr = taosArrayInit(num, POINTER_BYTES);
|
||||
taosArrayAddAll(pNode->pExpr, p);
|
||||
}
|
||||
}
|
||||
|
@ -357,7 +356,6 @@ SArray* createQueryPlanImpl(const SQueryStmtInfo* pQueryInfo) {
|
|||
SArray* exprList = taosArrayInit(4, POINTER_BYTES);
|
||||
if (copyExprInfoList(exprList, pQueryInfo->exprList[0], uid, true) != 0) {
|
||||
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
// dropAllExprInfo(exprList);
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
|
@ -373,7 +371,6 @@ SArray* createQueryPlanImpl(const SQueryStmtInfo* pQueryInfo) {
|
|||
// 4. add the projection query node
|
||||
SQueryPlanNode* pNode = doAddTableColumnNode(pQueryInfo, &info, exprList, tableColumnList);
|
||||
columnListDestroy(tableColumnList);
|
||||
// dropAllExprInfo(exprList);
|
||||
taosArrayPush(pDownstream, &pNode);
|
||||
}
|
||||
|
||||
|
@ -398,7 +395,8 @@ SArray* createQueryPlanImpl(const SQueryStmtInfo* pQueryInfo) {
|
|||
}
|
||||
|
||||
static void doDestroyQueryNode(SQueryPlanNode* pQueryNode) {
|
||||
if (pQueryNode->info.type == QNODE_MODIFY) {
|
||||
int32_t type = nodeType(pQueryNode);
|
||||
if (type == QNODE_MODIFY) {
|
||||
SDataPayloadInfo* pInfo = pQueryNode->pExtInfo;
|
||||
|
||||
size_t size = taosArrayGetSize(pInfo->payload);
|
||||
|
@ -410,10 +408,17 @@ static void doDestroyQueryNode(SQueryPlanNode* pQueryNode) {
|
|||
taosArrayDestroy(pInfo->payload);
|
||||
}
|
||||
|
||||
if (type == QNODE_STREAMSCAN || type == QNODE_TABLESCAN) {
|
||||
SQueryTableInfo* pQueryTableInfo = pQueryNode->pExtInfo;
|
||||
tfree(pQueryTableInfo->tableName);
|
||||
}
|
||||
|
||||
printf("----------->Free:%p\n", pQueryNode->pExpr);
|
||||
taosArrayDestroy(pQueryNode->pExpr);
|
||||
|
||||
tfree(pQueryNode->pExtInfo);
|
||||
tfree(pQueryNode->pSchema);
|
||||
tfree(pQueryNode->info.name);
|
||||
// dropAllExprInfo(pQueryNode->pExpr);
|
||||
|
||||
if (pQueryNode->pChildren != NULL) {
|
||||
int32_t size = (int32_t) taosArrayGetSize(pQueryNode->pChildren);
|
||||
|
|
|
@ -155,6 +155,16 @@ static SPhyNode* initPhyNode(SQueryPlanNode* pPlanNode, int32_t type, int32_t si
|
|||
return node;
|
||||
}
|
||||
|
||||
static void cleanupPhyNode(SPhyNode* pPhyNode) {
|
||||
if (pPhyNode == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
dropOneLevelExprInfo(pPhyNode->pTargets);
|
||||
tfree(pPhyNode->targetSchema.pSchema);
|
||||
tfree(pPhyNode);
|
||||
}
|
||||
|
||||
static SPhyNode* initScanNode(SQueryPlanNode* pPlanNode, SQueryTableInfo* pTable, int32_t type, int32_t size) {
|
||||
SScanPhyNode* node = (SScanPhyNode*) initPhyNode(pPlanNode, type, size);
|
||||
|
||||
|
@ -445,3 +455,29 @@ void setExchangSourceNode(uint64_t templateId, SDownstreamSource *pSource, SPhyN
|
|||
void setSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SDownstreamSource* pSource) {
|
||||
setExchangSourceNode(templateId, pSource, subplan->pNode);
|
||||
}
|
||||
|
||||
static void destroyDataSinkNode(SDataSink* pSinkNode) {
|
||||
if (pSinkNode == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (nodeType(pSinkNode) == DSINK_Dispatch) {
|
||||
SDataDispatcher* pDdSink = (SDataDispatcher*)pSinkNode;
|
||||
tfree(pDdSink->sink.schema.pSchema);
|
||||
}
|
||||
|
||||
tfree(pSinkNode);
|
||||
}
|
||||
|
||||
void qDestroySubplan(SSubplan* pSubplan) {
|
||||
if (pSubplan == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
taosArrayDestroy(pSubplan->pChildren);
|
||||
taosArrayDestroy(pSubplan->pParents);
|
||||
destroyDataSinkNode(pSubplan->pDataSink);
|
||||
cleanupPhyNode(pSubplan->pNode);
|
||||
|
||||
tfree(pSubplan);
|
||||
}
|
||||
|
|
|
@ -1121,8 +1121,10 @@ int32_t subPlanToString(const SSubplan* subplan, char** str, int32_t* len) {
|
|||
}
|
||||
|
||||
*str = cJSON_Print(json);
|
||||
// printf("====Physical plan:====\n");
|
||||
// printf("%s\n", *str);
|
||||
cJSON_Delete(json);
|
||||
|
||||
printf("====Physical plan:====\n");
|
||||
printf("%s\n", *str);
|
||||
*len = strlen(*str) + 1;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -18,25 +18,6 @@
|
|||
|
||||
static void extractResSchema(struct SQueryDag* const* pDag, SSchema** pResSchema, int32_t* numOfCols);
|
||||
|
||||
static void destroyDataSinkNode(SDataSink* pSinkNode) {
|
||||
if (pSinkNode == NULL) {
|
||||
return;
|
||||
}
|
||||
tfree(pSinkNode);
|
||||
}
|
||||
|
||||
void qDestroySubplan(SSubplan* pSubplan) {
|
||||
if (pSubplan == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
taosArrayDestroy(pSubplan->pChildren);
|
||||
taosArrayDestroy(pSubplan->pParents);
|
||||
destroyDataSinkNode(pSubplan->pDataSink);
|
||||
// todo destroy pNode
|
||||
tfree(pSubplan);
|
||||
}
|
||||
|
||||
void qDestroyQueryDag(struct SQueryDag* pDag) {
|
||||
if (pDag == NULL) {
|
||||
return;
|
||||
|
@ -51,6 +32,7 @@ void qDestroyQueryDag(struct SQueryDag* pDag) {
|
|||
SSubplan* pSubplan = taosArrayGetP(pa, j);
|
||||
qDestroySubplan(pSubplan);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pa);
|
||||
}
|
||||
|
||||
|
|
|
@ -1512,9 +1512,7 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) {
|
|||
info = NULL;
|
||||
|
||||
_return:
|
||||
|
||||
schedulerFreeTaskList(info);
|
||||
|
||||
SCH_RET(code);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue