fix(query): fix memory leak.

Haojun Liao 2022-07-08 17:28:37 +08:00
parent 91eaba87ef
commit 3e81daf578
7 changed files with 76 additions and 44 deletions
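
The heart of the change is a new ownership contract for scalar parameters: sclCreateColumnInfoData now reports failure through an int32_t code, stores the freshly allocated column in the caller's SScalarParam, and tags the param with CREATED_COLDATA so later cleanup knows the column is owned rather than borrowed from an input block. Below is a minimal standalone sketch of that contract; the structs and helpers are simplified stand-ins, not the TDengine definitions.

#include <stdlib.h>

/* Simplified stand-ins for the structs touched by this commit. */
typedef struct { void *pData; int bytes; } ColData;
typedef enum { CREATED_COLDATA = 0x1, INPUT_COLDATA = 0x2 } ColDataOwnership;
typedef struct { ColDataOwnership type; ColData *columnData; int numOfRows; } ScalarParam;

enum { OK = 0, ERR_OUT_OF_MEMORY = 1 };

/* New-style creator: the status goes back through the return value, and the
 * column is handed to the caller's param, tagged as "owned by this param". */
static int createColumnData(int bytes, int numOfRows, ScalarParam *pParam) {
  ColData *col = calloc(1, sizeof(ColData));
  if (col == NULL) return ERR_OUT_OF_MEMORY;
  col->pData = calloc((size_t)numOfRows, (size_t)bytes);
  if (col->pData == NULL) {
    free(col);
    return ERR_OUT_OF_MEMORY;
  }
  col->bytes = bytes;
  pParam->columnData = col;
  pParam->type = CREATED_COLDATA;  /* whoever owns pParam must release the column */
  pParam->numOfRows = numOfRows;
  return OK;
}

int main(void) {
  ScalarParam out = {0};
  if (createColumnData(8, 4, &out) != OK) return 1;  /* callers now check a code, not a NULL pointer */
  free(out.columnData->pData);                       /* out owns the column, so out's owner frees it */
  free(out.columnData);
  return 0;
}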

@@ -172,7 +172,13 @@ typedef struct tExprNode {
 void tExprTreeDestroy(tExprNode *pNode, void (*fp)(void *));
+typedef enum {
+  CREATED_COLDATA = 0x1,  // the newly created column data needs to be destroyed.
+  INPUT_COLDATA = 0x2,    // input column data should not be released.
+} SCOLDATA_TYPE_E;
 struct SScalarParam {
+  SCOLDATA_TYPE_E  type;
   SColumnInfoData *columnData;
   SHashObj        *pHashFilter;
   int32_t          hashValueType;

@@ -2181,12 +2181,21 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* ret
   return VND_TSDB(pVnode);
 }
-static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) {
+SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) {
+  int64_t startVer = (pCond->startVersion == -1)? 0:pCond->startVersion;
   if (VND_IS_RSMA(pVnode)) {
-    return (SVersionRange){.minVer = pCond->startVersion, .maxVer = tdRSmaGetMaxSubmitVer(pVnode->pSma, level)};
+    return (SVersionRange){.minVer = startVer, .maxVer = tdRSmaGetMaxSubmitVer(pVnode->pSma, level)};
   }
-  return (SVersionRange){.minVer = pCond->startVersion, .maxVer = pVnode->state.applied};
+  int64_t endVer = 0;
+  if (pCond->endVersion == -1) { // user not specified end version, set current maximum version of vnode as the endVersion
+    endVer = pVnode->state.applied;
+  } else {
+    endVer = (pCond->endVersion > pVnode->state.applied)? pVnode->state.applied:pCond->endVersion;
+  }
+  return (SVersionRange){.minVer = startVer, .maxVer = endVer};
 }
 // // todo not unref yet, since it is not support multi-group interpolation query
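
The reworked getQueryVerRange treats -1 on either bound as "not specified" (matching the defaults a later hunk sets in initQueryTableDataCond) and never lets the end version run past the vnode's last applied version. Below is a small standalone sketch of that clamping, with a plain appliedVer argument standing in for pVnode->state.applied:

#include <assert.h>
#include <stdint.h>

typedef struct { int64_t minVer, maxVer; } VersionRange;  /* stand-in for SVersionRange */

/* A start/end version of -1 means "no bound"; the end version is additionally
 * clamped to the last version the vnode has applied. */
static VersionRange getVerRange(int64_t startVersion, int64_t endVersion, int64_t appliedVer) {
  int64_t startVer = (startVersion == -1) ? 0 : startVersion;
  int64_t endVer = (endVersion == -1) ? appliedVer
                                      : (endVersion > appliedVer ? appliedVer : endVersion);
  return (VersionRange){.minVer = startVer, .maxVer = endVer};
}

int main(void) {
  VersionRange r1 = getVerRange(-1, -1, 100);   /* defaults: scan everything up to the applied version */
  assert(r1.minVer == 0 && r1.maxVer == 100);

  VersionRange r2 = getVerRange(10, 500, 100);  /* requested end is clamped to the applied version */
  assert(r2.minVer == 10 && r2.maxVer == 100);
  return 0;
}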

@@ -748,11 +748,12 @@ SInterval extractIntervalInfo(const STableScanPhysiNode* pTableScanNode) {
 SColumn extractColumnFromColumnNode(SColumnNode* pColNode) {
   SColumn c = {0};
-  c.slotId = pColNode->slotId;
-  c.colId = pColNode->colId;
-  c.type = pColNode->node.resType.type;
-  c.bytes = pColNode->node.resType.bytes;
-  c.scale = pColNode->node.resType.scale;
+  c.slotId    = pColNode->slotId;
+  c.colId     = pColNode->colId;
+  c.type      = pColNode->node.resType.type;
+  c.bytes     = pColNode->node.resType.bytes;
+  c.scale     = pColNode->node.resType.scale;
+  c.precision = pColNode->node.resType.precision;
   return c;
 }
@@ -774,6 +775,8 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
   pCond->suid = pTableScanNode->scan.suid;
   pCond->type = BLOCK_LOAD_OFFSET_ORDER;
+  pCond->startVersion = -1;
+  pCond->endVersion = -1;
   // pCond->type = pTableScanNode->scanFlag;
   int32_t j = 0;

@@ -57,7 +57,7 @@ typedef struct SScalarCtx {
 #define SCL_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { terrno = code; goto _return; } } while (0)
 int32_t doConvertDataType(SValueNode* pValueNode, SScalarParam* out);
-SColumnInfoData* sclCreateColumnInfoData(SDataType* pType, int32_t numOfRows);
+int32_t sclCreateColumnInfoData(SDataType* pType, int32_t numOfRows, SScalarParam* pParam);
 int32_t sclConvertToTsValueNode(int8_t precision, SValueNode* valueNode);
 #define GET_PARAM_TYPE(_c) ((_c)->columnData ? (_c)->columnData->info.type : (_c)->hashValueType)

@@ -3827,13 +3827,20 @@ bool filterExecute(SFilterInfo *info, SSDataBlock *pSrc, int8_t** p, SColumnData
   SScalarParam output = {0};
   SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)};
-  output.columnData = sclCreateColumnInfoData(&type, pSrc->info.rows);
+  int32_t code = sclCreateColumnInfoData(&type, pSrc->info.rows, &output);
+  if (code != TSDB_CODE_SUCCESS) {
+    return code;
+  }
   SArray *pList = taosArrayInit(1, POINTER_BYTES);
   taosArrayPush(pList, &pSrc);
   FLT_ERR_RET(scalarCalculate(info->sclCtx.node, pList, &output));
-  *p = (int8_t *)output.columnData->pData;
+  *p = taosMemoryMalloc(output.numOfRows * sizeof(bool));
+  memcpy(*p, output.columnData->pData, output.numOfRows);
+  colDataDestroy(output.columnData);
+  taosMemoryFree(output.columnData);
   taosArrayDestroy(pList);
   return false;
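
filterExecute used to return a pointer that aliased the scratch column's buffer, which meant the temporary column could never be torn down without invalidating the result, so it leaked. The fix copies the per-row flags into a buffer the caller owns and then destroys the scratch column. Below is a standalone sketch of that copy-then-destroy pattern (simplified types, not the filter module's API):

#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

typedef struct { void *pData; int numOfRows; } ScratchCol;  /* stand-in for the temporary result column */

/* Copy the computed flags into a caller-owned buffer, then release the scratch
 * column; the result no longer keeps the temporary allocation alive. */
static int extractFilterResult(ScratchCol *scratch, int8_t **p) {
  *p = malloc((size_t)scratch->numOfRows * sizeof(bool));
  if (*p == NULL) return 1;
  memcpy(*p, scratch->pData, (size_t)scratch->numOfRows * sizeof(bool));
  free(scratch->pData);  /* colDataDestroy + taosMemoryFree in the real code */
  free(scratch);
  return 0;
}

int main(void) {
  ScratchCol *scratch = calloc(1, sizeof(ScratchCol));
  if (scratch == NULL) return 1;
  scratch->numOfRows = 3;
  scratch->pData = calloc(3, sizeof(bool));
  if (scratch->pData == NULL) return 1;
  ((bool *)scratch->pData)[1] = true;

  int8_t *p = NULL;
  if (extractFilterResult(scratch, &p) != 0) return 1;
  free(p);  /* the caller owns the copied result */
  return 0;
}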

@@ -35,12 +35,11 @@ int32_t sclConvertToTsValueNode(int8_t precision, SValueNode* valueNode) {
   return TSDB_CODE_SUCCESS;
 }
-SColumnInfoData* sclCreateColumnInfoData(SDataType* pType, int32_t numOfRows) {
+int32_t sclCreateColumnInfoData(SDataType* pType, int32_t numOfRows, SScalarParam* pParam) {
   SColumnInfoData* pColumnData = taosMemoryCalloc(1, sizeof(SColumnInfoData));
   if (pColumnData == NULL) {
     terrno = TSDB_CODE_OUT_OF_MEMORY;
-    return NULL;
+    return terrno;
   }
   pColumnData->info.type = pType->type;
@@ -52,19 +51,25 @@ SColumnInfoData* sclCreateColumnInfoData(SDataType* pType, int32_t numOfRows) {
   if (code != TSDB_CODE_SUCCESS) {
     terrno = TSDB_CODE_OUT_OF_MEMORY;
     taosMemoryFree(pColumnData);
-    return NULL;
-  } else {
-    return pColumnData;
+    return terrno;
   }
+  pParam->columnData = pColumnData;
+  pParam->type = CREATED_COLDATA;
+  return TSDB_CODE_SUCCESS;
 }
 int32_t doConvertDataType(SValueNode* pValueNode, SScalarParam* out) {
   SScalarParam in = {.numOfRows = 1};
-  in.columnData = sclCreateColumnInfoData(&pValueNode->node.resType, 1);
+  int32_t code = sclCreateColumnInfoData(&pValueNode->node.resType, 1, &in);
+  if (code != TSDB_CODE_SUCCESS) {
+    return code;
+  }
   colDataAppend(in.columnData, 0, nodesGetValueFromNode(pValueNode), false);
   colInfoDataEnsureCapacity(out->columnData, 1);
-  int32_t code = vectorConvertImpl(&in, out);
+  code = vectorConvertImpl(&in, out);
   sclFreeParam(&in);
   return code;
@@ -190,8 +195,9 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t
     case QUERY_NODE_VALUE: {
       SValueNode *valueNode = (SValueNode *)node;
+      ASSERT(param->columnData == NULL);
       param->numOfRows = 1;
-      param->columnData = sclCreateColumnInfoData(&valueNode->node.resType, 1);
+      /*int32_t code = */sclCreateColumnInfoData(&valueNode->node.resType, 1, param);
       if (TSDB_DATA_TYPE_NULL == valueNode->node.resType.type || valueNode->isNull) {
         colDataAppendNULL(param->columnData, 0);
       } else {
@@ -429,10 +435,9 @@ int32_t sclExecFunction(SFunctionNode *node, SScalarCtx *ctx, SScalarParam *outp
     SCL_ERR_JRET(code);
   }
-  output->columnData = sclCreateColumnInfoData(&node->node.resType, rowNum);
-  if (output->columnData == NULL) {
-    sclError("calloc %d failed", (int32_t)(rowNum * output->columnData->info.bytes));
-    SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
+  code = sclCreateColumnInfoData(&node->node.resType, rowNum, output);
+  if (code != TSDB_CODE_SUCCESS) {
+    SCL_ERR_JRET(code);
   }
   code = (*ffpSet.process)(params, paramNum, output);
@@ -482,10 +487,9 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o
   output->numOfRows = rowNum;
   SDataType t = {.type = type, .bytes = tDataTypes[type].bytes};
-  output->columnData = sclCreateColumnInfoData(&t, rowNum);
-  if (output->columnData == NULL) {
-    sclError("calloc %d failed", (int32_t)(rowNum * sizeof(bool)));
-    SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
+  code = sclCreateColumnInfoData(&t, rowNum, output);
+  if (code != TSDB_CODE_SUCCESS) {
+    SCL_ERR_JRET(code);
   }
   bool value = false;
@@ -537,18 +541,19 @@ int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *outp
   int32_t code = 0;
   // json not support in in operator
-  if(nodeType(node->pLeft) == QUERY_NODE_VALUE){
+  if (nodeType(node->pLeft) == QUERY_NODE_VALUE) {
     SValueNode *valueNode = (SValueNode *)node->pLeft;
-    if(valueNode->node.resType.type == TSDB_DATA_TYPE_JSON && (node->opType == OP_TYPE_IN || node->opType == OP_TYPE_NOT_IN)){
+    if (valueNode->node.resType.type == TSDB_DATA_TYPE_JSON && (node->opType == OP_TYPE_IN || node->opType == OP_TYPE_NOT_IN)) {
       SCL_RET(TSDB_CODE_QRY_JSON_IN_ERROR);
     }
   }
   SCL_ERR_RET(sclInitOperatorParams(&params, node, ctx, &rowNum));
-  output->columnData = sclCreateColumnInfoData(&node->node.resType, rowNum);
   if (output->columnData == NULL) {
-    sclError("calloc failed, size:%d", (int32_t)rowNum * node->node.resType.bytes);
-    SCL_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
+    code = sclCreateColumnInfoData(&node->node.resType, rowNum, output);
+    if (code != TSDB_CODE_SUCCESS) {
+      SCL_ERR_JRET(code);
+    }
   }
   _bin_scalar_fn_t OperatorFn = getBinScalarOperatorFn(node->opType);
@@ -563,7 +568,10 @@ int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *outp
 _return:
   for (int32_t i = 0; i < paramNum; ++i) {
     // sclFreeParam(&params[i]);
+    if (params[i].type == CREATED_COLDATA) {
+      colDataDestroy(params[i].columnData);
+      taosMemoryFree(params[i].columnData);
+    }
   }
   taosMemoryFreeClear(params);
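
The cleanup loop in the _return path above releases only the columns this operator allocated itself; params tagged INPUT_COLDATA wrap columns that belong to the caller's data block, and destroying them here would free memory twice. Below is a standalone sketch of that selective cleanup, using the same simplified stand-in types as the earlier sketch:

#include <stdlib.h>

typedef struct { void *pData; } ColData;  /* stand-in, not SColumnInfoData */
typedef enum { CREATED_COLDATA = 0x1, INPUT_COLDATA = 0x2 } ColDataOwnership;
typedef struct { ColDataOwnership type; ColData *columnData; } ScalarParam;

/* Free only the columns created for intermediate results; columns borrowed
 * from the input block are left for their real owner. */
static void cleanupParams(ScalarParam *params, int num) {
  for (int i = 0; i < num; ++i) {
    if (params[i].type == CREATED_COLDATA && params[i].columnData != NULL) {
      free(params[i].columnData->pData);
      free(params[i].columnData);
      params[i].columnData = NULL;
    }
  }
}

int main(void) {
  ColData borrowed = {0};                      /* owned by the "input block" */
  ColData *owned = calloc(1, sizeof(ColData));
  if (owned == NULL) return 1;
  owned->pData = malloc(16);

  ScalarParam params[2] = {
      {.type = INPUT_COLDATA, .columnData = &borrowed},  /* must not be freed here */
      {.type = CREATED_COLDATA, .columnData = owned},    /* released by cleanupParams */
  };
  cleanupParams(params, 2);
  return 0;
}
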
@@ -766,7 +774,7 @@ EDealRes sclRewriteOperator(SNode** pNode, SScalarCtx *ctx) {
     return sclRewriteNonConstOperator(pNode, ctx);
   }
-  SScalarParam output = {.columnData = taosMemoryCalloc(1, sizeof(SColumnInfoData))};
+  SScalarParam output = {0};
   ctx->code = sclExecOperator(node, ctx, &output);
   if (ctx->code) {
     return DEAL_RES_ERROR;
@@ -1026,7 +1034,8 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
       colDataAssign(pDst->columnData, res->columnData, res->numOfRows, NULL);
       pDst->numOfRows = res->numOfRows;
     }
+    sclFreeParam(res);
     taosHashRemove(ctx.pRes, (void *)&pNode, POINTER_BYTES);
   }

@@ -865,12 +865,11 @@ int32_t vectorGetConvertType(int32_t type1, int32_t type2) {
 }
 int32_t vectorConvertScalarParam(SScalarParam *input, SScalarParam *output, int32_t type) {
-  int32_t code = 0;
   SDataType t = {.type = type, .bytes = tDataTypes[type].bytes};
   output->numOfRows = input->numOfRows;
-  output->columnData = sclCreateColumnInfoData(&t, input->numOfRows);
-  if (output->columnData == NULL) {
+  int32_t code = sclCreateColumnInfoData(&t, input->numOfRows, output);
+  if (code != TSDB_CODE_SUCCESS) {
     return TSDB_CODE_OUT_OF_MEMORY;
   }
@@ -940,13 +939,12 @@ static int32_t doConvertHelper(SScalarParam* pDest, int32_t* convert, const SSca
     pDest->numOfRows = pParam->numOfRows;
     SDataType t = {.type = type, .bytes = tDataTypes[type].bytes};
-    pDest->columnData = sclCreateColumnInfoData(&t, pParam->numOfRows);
-    if (pDest->columnData == NULL) {
-      sclError("malloc %d failed", (int32_t)(pParam->numOfRows * sizeof(double)));
-      return TSDB_CODE_OUT_OF_MEMORY;
+    int32_t code = sclCreateColumnInfoData(&t, pParam->numOfRows, pDest);
+    if (code != TSDB_CODE_SUCCESS) {
+      return code;
     }
-    int32_t code = vectorConvertImpl(pParam, pDest);
+    code = vectorConvertImpl(pParam, pDest);
     if (code != TSDB_CODE_SUCCESS) {
       return code;
     }