[td-171] fix memory leaks in super table query.
This commit is contained in:
parent
a2f054014d
commit
7493384d66
|
@ -108,7 +108,7 @@ typedef struct STableQueryInfo {
|
||||||
SWindowResInfo windowResInfo;
|
SWindowResInfo windowResInfo;
|
||||||
} STableQueryInfo;
|
} STableQueryInfo;
|
||||||
|
|
||||||
typedef struct STableDataInfo {
|
typedef struct STableDataInfo { // todo merge with the STableQueryInfo struct
|
||||||
int32_t tableIndex;
|
int32_t tableIndex;
|
||||||
int32_t groupIdx; // group id in table list
|
int32_t groupIdx; // group id in table list
|
||||||
STableQueryInfo* pTableQInfo;
|
STableQueryInfo* pTableQInfo;
|
||||||
|
|
|
@ -111,7 +111,7 @@ static void getNextTimeWindow(SQuery *pQuery, STimeWindow *pTimeWindow);
|
||||||
static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void *inputData, TSKEY *tsCol, int32_t size,
|
static void setExecParams(SQuery *pQuery, SQLFunctionCtx *pCtx, void *inputData, TSKEY *tsCol, int32_t size,
|
||||||
int32_t functionId, SDataStatis *pStatis, bool hasNull, void *param, int32_t scanFlag);
|
int32_t functionId, SDataStatis *pStatis, bool hasNull, void *param, int32_t scanFlag);
|
||||||
static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv);
|
static void initCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv);
|
||||||
static void destroyMeterQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols);
|
static void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols);
|
||||||
static void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv);
|
static void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv);
|
||||||
static bool hasMainOutput(SQuery *pQuery);
|
static bool hasMainOutput(SQuery *pQuery);
|
||||||
static void createTableDataInfo(SQInfo *pQInfo);
|
static void createTableDataInfo(SQInfo *pQInfo);
|
||||||
|
@ -3478,7 +3478,7 @@ STableQueryInfo *createTableQueryInfo(SQueryRuntimeEnv *pRuntimeEnv, int32_t tid
|
||||||
return pTableQueryInfo;
|
return pTableQueryInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
UNUSED_FUNC void destroyMeterQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols) {
|
void destroyTableQueryInfo(STableQueryInfo *pTableQueryInfo, int32_t numOfCols) {
|
||||||
if (pTableQueryInfo == NULL) {
|
if (pTableQueryInfo == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -4187,7 +4187,10 @@ int32_t doInitQInfo(SQInfo *pQInfo, void *param, void *tsdb, bool isSTableQuery)
|
||||||
.numOfCols = pQuery->numOfCols,
|
.numOfCols = pQuery->numOfCols,
|
||||||
};
|
};
|
||||||
|
|
||||||
pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->groupInfo);
|
if (!isSTableQuery || isIntervalQuery(pQuery) || isFixedOutputQuery(pQuery)) {
|
||||||
|
pRuntimeEnv->pQueryHandle = tsdbQueryTables(tsdb, &cond, &pQInfo->groupInfo);
|
||||||
|
}
|
||||||
|
|
||||||
pQInfo->tsdb = tsdb;
|
pQInfo->tsdb = tsdb;
|
||||||
|
|
||||||
pRuntimeEnv->pQuery = pQuery;
|
pRuntimeEnv->pQuery = pQuery;
|
||||||
|
@ -4410,7 +4413,14 @@ static bool multiTableMultioutputHelper(SQInfo *pQInfo, int32_t index) {
|
||||||
STableGroupInfo gp = {.numOfTables = 1, .pGroupList = g1};
|
STableGroupInfo gp = {.numOfTables = 1, .pGroupList = g1};
|
||||||
|
|
||||||
// include only current table
|
// include only current table
|
||||||
|
if (pRuntimeEnv->pQueryHandle != NULL) {
|
||||||
|
tsdbCleanupQueryHandle(pRuntimeEnv->pQueryHandle);
|
||||||
|
pRuntimeEnv->pQueryHandle = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &gp);
|
pRuntimeEnv->pQueryHandle = tsdbQueryTables(pQInfo->tsdb, &cond, &gp);
|
||||||
|
taosArrayDestroy(tx);
|
||||||
|
taosArrayDestroy(g1);
|
||||||
|
|
||||||
if (pRuntimeEnv->pTSBuf != NULL) {
|
if (pRuntimeEnv->pTSBuf != NULL) {
|
||||||
if (pRuntimeEnv->cur.vnodeIndex == -1) {
|
if (pRuntimeEnv->cur.vnodeIndex == -1) {
|
||||||
|
@ -5760,14 +5770,7 @@ static SQInfo *createQInfoImpl(SQueryTableMsg *pQueryMsg, SSqlGroupbyExpr *pGrou
|
||||||
pColInfo->filters = tscFilterInfoClone(pQueryMsg->colList[i].filters, pColInfo->numOfFilters);
|
pColInfo->filters = tscFilterInfoClone(pQueryMsg->colList[i].filters, pColInfo->numOfFilters);
|
||||||
}
|
}
|
||||||
|
|
||||||
pQuery->tagColList = calloc(pQueryMsg->numOfTags, sizeof(SColumnInfo));
|
pQuery->tagColList = pTagCols;
|
||||||
if (pQuery->tagColList == NULL) {
|
|
||||||
goto _cleanup;
|
|
||||||
}
|
|
||||||
|
|
||||||
for(int16_t i = 0; i < pQuery->numOfTags; ++i) {
|
|
||||||
pQuery->tagColList[i] = pTagCols[i];
|
|
||||||
}
|
|
||||||
|
|
||||||
// calculate the result row size
|
// calculate the result row size
|
||||||
for (int16_t col = 0; col < numOfOutput; ++col) {
|
for (int16_t col = 0; col < numOfOutput; ++col) {
|
||||||
|
@ -5930,10 +5933,6 @@ static void freeQInfo(SQInfo *pQInfo) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tfree(pQuery->pFilterInfo);
|
|
||||||
tfree(pQuery->colList);
|
|
||||||
tfree(pQuery->sdata);
|
|
||||||
|
|
||||||
if (pQuery->pSelectExpr != NULL) {
|
if (pQuery->pSelectExpr != NULL) {
|
||||||
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
for (int32_t i = 0; i < pQuery->numOfOutput; ++i) {
|
||||||
SExprInfo *pBinExprInfo = &pQuery->pSelectExpr[i].binExprInfo;
|
SExprInfo *pBinExprInfo = &pQuery->pSelectExpr[i].binExprInfo;
|
||||||
|
@ -5951,16 +5950,34 @@ static void freeQInfo(SQInfo *pQInfo) {
|
||||||
tfree(pQuery->defaultVal);
|
tfree(pQuery->defaultVal);
|
||||||
}
|
}
|
||||||
|
|
||||||
tfree(pQuery->pGroupbyExpr);
|
// todo refactor, extract method to destroytableDataInfo
|
||||||
tfree(pQuery);
|
|
||||||
|
|
||||||
int32_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList);
|
int32_t numOfGroups = taosArrayGetSize(pQInfo->groupInfo.pGroupList);
|
||||||
for (int32_t i = 0; i < numOfGroups; ++i) {
|
for (int32_t i = 0; i < numOfGroups; ++i) {
|
||||||
SArray *p = taosArrayGetP(pQInfo->groupInfo.pGroupList, i);
|
SArray *p = taosArrayGetP(pQInfo->groupInfo.pGroupList, i);
|
||||||
|
|
||||||
|
size_t num = taosArrayGetSize(p);
|
||||||
|
for(int32_t j = 0; j < num; ++j) {
|
||||||
|
SPair* pair = taosArrayGet(p, j);
|
||||||
|
if (pair->sec != NULL) {
|
||||||
|
destroyTableQueryInfo(((STableDataInfo*)pair->sec)->pTableQInfo, pQuery->numOfOutput);
|
||||||
|
tfree(pair->sec);
|
||||||
|
}
|
||||||
|
}
|
||||||
taosArrayDestroy(p);
|
taosArrayDestroy(p);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pQuery->pGroupbyExpr != NULL) {
|
||||||
|
taosArrayDestroy(pQuery->pGroupbyExpr->columnInfo);
|
||||||
|
tfree(pQuery->pGroupbyExpr);
|
||||||
|
}
|
||||||
|
|
||||||
|
tfree(pQuery->tagColList);
|
||||||
|
tfree(pQuery->pFilterInfo);
|
||||||
|
tfree(pQuery->colList);
|
||||||
|
tfree(pQuery->sdata);
|
||||||
|
|
||||||
taosArrayDestroy(pQInfo->groupInfo.pGroupList);
|
taosArrayDestroy(pQInfo->groupInfo.pGroupList);
|
||||||
|
tfree(pQuery);
|
||||||
|
|
||||||
qTrace("QInfo:%p QInfo is freed", pQInfo);
|
qTrace("QInfo:%p QInfo is freed", pQInfo);
|
||||||
|
|
||||||
|
@ -6036,8 +6053,8 @@ int32_t qCreateQueryInfo(void *tsdb, SQueryTableMsg *pQueryMsg, qinfo_t *pQInfo)
|
||||||
SColIndex * pGroupColIndex = NULL;
|
SColIndex * pGroupColIndex = NULL;
|
||||||
SColumnInfo* pTagColumnInfo = NULL;
|
SColumnInfo* pTagColumnInfo = NULL;
|
||||||
|
|
||||||
if ((code = convertQueryMsg(pQueryMsg, &pTableIdList, &pExprMsg, &tagCond,
|
if ((code = convertQueryMsg(pQueryMsg, &pTableIdList, &pExprMsg, &tagCond, &pGroupColIndex, &pTagColumnInfo)) !=
|
||||||
&pGroupColIndex, &pTagColumnInfo)) != TSDB_CODE_SUCCESS) {
|
TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1133,6 +1133,7 @@ static int32_t getAllTableIdList(STsdbRepo* tsdb, int64_t uid, SArray* list) {
|
||||||
taosArrayPush(list, &t);
|
taosArrayPush(list, &t);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tSkipListDestroyIter(iter);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1445,6 +1446,7 @@ int32_t tsdbQueryByTagsCond(TsdbRepoT* tsdb, int64_t uid, const char* pTagCond,
|
||||||
|
|
||||||
pGroupInfo->numOfTables = taosArrayGetSize(res);
|
pGroupInfo->numOfTables = taosArrayGetSize(res);
|
||||||
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols);
|
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols);
|
||||||
|
taosArrayDestroy(res);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1463,6 +1465,7 @@ int32_t tsdbQueryByTagsCond(TsdbRepoT* tsdb, int64_t uid, const char* pTagCond,
|
||||||
pGroupInfo->numOfTables = taosArrayGetSize(res);
|
pGroupInfo->numOfTables = taosArrayGetSize(res);
|
||||||
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols);
|
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols);
|
||||||
|
|
||||||
|
taosArrayDestroy(res);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1483,6 +1486,7 @@ int32_t tsdbGetOneTableGroup(TsdbRepoT* tsdb, int64_t uid, STableGroupInfo* pGro
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
|
void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
|
||||||
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*)queryHandle;
|
STsdbQueryHandle* pQueryHandle = (STsdbQueryHandle*)queryHandle;
|
||||||
if (pQueryHandle == NULL) {
|
if (pQueryHandle == NULL) {
|
||||||
|
@ -1516,5 +1520,6 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
|
||||||
|
|
||||||
tfree(pQueryHandle->pDataBlockInfo);
|
tfree(pQueryHandle->pDataBlockInfo);
|
||||||
tsdbDestroyHelper(&pQueryHandle->rhelper);
|
tsdbDestroyHelper(&pQueryHandle->rhelper);
|
||||||
|
|
||||||
tfree(pQueryHandle);
|
tfree(pQueryHandle);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue