fix: memory leak

This commit is contained in:
Xiaoyu Wang 2022-11-15 09:39:17 +08:00
parent 901c9b8f77
commit 14b4adc6ce
2 changed files with 91 additions and 82 deletions

View File

@ -23,12 +23,13 @@ int32_t scalarGetOperatorParamNum(EOperatorType type) {
int32_t sclConvertToTsValueNode(int8_t precision, SValueNode *valueNode) {
char *timeStr = valueNode->datum.p;
int32_t code =
convertStringToTimestamp(valueNode->node.resType.type, valueNode->datum.p, precision, &valueNode->datum.i);
int64_t value = 0;
int32_t code = convertStringToTimestamp(valueNode->node.resType.type, valueNode->datum.p, precision, &value);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
taosMemoryFree(timeStr);
valueNode->datum.i = value;
valueNode->typeData = valueNode->datum.i;
valueNode->node.resType.type = TSDB_DATA_TYPE_TIMESTAMP;
@ -582,14 +583,17 @@ int32_t sclGetNodeRes(SNode* node, SScalarCtx *ctx, SScalarParam **res) {
return TSDB_CODE_SUCCESS;
}
int32_t sclWalkCaseWhenList(SScalarCtx *ctx, SNodeList* pList, struct SListCell* pCell, SScalarParam *pCase, SScalarParam *pElse, SScalarParam *pComp, SScalarParam *output, int32_t rowIdx, int32_t totalRows, bool *complete) {
int32_t sclWalkCaseWhenList(SScalarCtx *ctx, SNodeList *pList, struct SListCell *pCell, SScalarParam *pCase,
SScalarParam *pElse, SScalarParam *pComp, SScalarParam *output, int32_t rowIdx,
int32_t totalRows, bool *complete) {
SNode *node = NULL;
SWhenThenNode *pWhenThen = NULL;
SScalarParam *pWhen = NULL;
SScalarParam *pThen = NULL;
int32_t code = 0;
for (SListCell* cell = pCell; (NULL != cell ? (node = cell->pNode, true) : (node = NULL, false)); cell = cell->pNext) {
for (SListCell *cell = pCell; (NULL != cell ? (node = cell->pNode, true) : (node = NULL, false));
cell = cell->pNext) {
pWhenThen = (SWhenThenNode *)node;
SCL_ERR_RET(sclGetNodeRes(pWhenThen->pWhen, ctx, &pWhen));
@ -599,7 +603,8 @@ int32_t sclWalkCaseWhenList(SScalarCtx *ctx, SNodeList* pList, struct SListCell*
bool *equal = (bool *)colDataGetData(pComp->columnData, rowIdx);
if (*equal) {
colDataAppend(output->columnData, rowIdx, colDataGetData(pThen->columnData, (pThen->numOfRows > 1 ? rowIdx : 0)), colDataIsNull_s(pThen->columnData, (pThen->numOfRows > 1 ? rowIdx : 0)));
colDataAppend(output->columnData, rowIdx, colDataGetData(pThen->columnData, (pThen->numOfRows > 1 ? rowIdx : 0)),
colDataIsNull_s(pThen->columnData, (pThen->numOfRows > 1 ? rowIdx : 0)));
if (0 == rowIdx && 1 == pCase->numOfRows && 1 == pWhen->numOfRows && 1 == pThen->numOfRows && totalRows > 1) {
SCL_ERR_JRET(sclExtendResRows(output, output, ctx->pBlockList));
@ -611,7 +616,8 @@ int32_t sclWalkCaseWhenList(SScalarCtx *ctx, SNodeList* pList, struct SListCell*
}
if (pElse) {
colDataAppend(output->columnData, rowIdx, colDataGetData(pElse->columnData, (pElse->numOfRows > 1 ? rowIdx : 0)), colDataIsNull_s(pElse->columnData, (pElse->numOfRows > 1 ? rowIdx : 0)));
colDataAppend(output->columnData, rowIdx, colDataGetData(pElse->columnData, (pElse->numOfRows > 1 ? rowIdx : 0)),
colDataIsNull_s(pElse->columnData, (pElse->numOfRows > 1 ? rowIdx : 0)));
if (0 == rowIdx && 1 == pCase->numOfRows && 1 == pElse->numOfRows && totalRows > 1) {
SCL_ERR_JRET(sclExtendResRows(output, output, ctx->pBlockList));
@ -638,15 +644,16 @@ _return:
SCL_RET(code);
}
int32_t sclWalkWhenList(SScalarCtx *ctx, SNodeList* pList, struct SListCell* pCell, SScalarParam *pElse, SScalarParam *output,
int32_t rowIdx, int32_t totalRows, bool *complete, bool preSingle) {
int32_t sclWalkWhenList(SScalarCtx *ctx, SNodeList *pList, struct SListCell *pCell, SScalarParam *pElse,
SScalarParam *output, int32_t rowIdx, int32_t totalRows, bool *complete, bool preSingle) {
SNode *node = NULL;
SWhenThenNode *pWhenThen = NULL;
SScalarParam *pWhen = NULL;
SScalarParam *pThen = NULL;
int32_t code = 0;
for (SListCell* cell = pCell; (NULL != cell ? (node = cell->pNode, true) : (node = NULL, false)); cell = cell->pNext) {
for (SListCell *cell = pCell; (NULL != cell ? (node = cell->pNode, true) : (node = NULL, false));
cell = cell->pNext) {
pWhenThen = (SWhenThenNode *)node;
pWhen = NULL;
pThen = NULL;
@ -657,7 +664,8 @@ int32_t sclWalkWhenList(SScalarCtx *ctx, SNodeList* pList, struct SListCell* pCe
bool *whenValue = (bool *)colDataGetData(pWhen->columnData, (pWhen->numOfRows > 1 ? rowIdx : 0));
if (*whenValue) {
colDataAppend(output->columnData, rowIdx, colDataGetData(pThen->columnData, (pThen->numOfRows > 1 ? rowIdx : 0)), colDataIsNull_s(pThen->columnData, (pThen->numOfRows > 1 ? rowIdx : 0)));
colDataAppend(output->columnData, rowIdx, colDataGetData(pThen->columnData, (pThen->numOfRows > 1 ? rowIdx : 0)),
colDataIsNull_s(pThen->columnData, (pThen->numOfRows > 1 ? rowIdx : 0)));
if (preSingle && 0 == rowIdx && 1 == pWhen->numOfRows && 1 == pThen->numOfRows && totalRows > 1) {
SCL_ERR_JRET(sclExtendResRows(output, output, ctx->pBlockList));
@ -674,7 +682,8 @@ int32_t sclWalkWhenList(SScalarCtx *ctx, SNodeList* pList, struct SListCell* pCe
}
if (pElse) {
colDataAppend(output->columnData, rowIdx, colDataGetData(pElse->columnData, (pElse->numOfRows > 1 ? rowIdx : 0)), colDataIsNull_s(pElse->columnData, (pElse->numOfRows > 1 ? rowIdx : 0)));
colDataAppend(output->columnData, rowIdx, colDataGetData(pElse->columnData, (pElse->numOfRows > 1 ? rowIdx : 0)),
colDataIsNull_s(pElse->columnData, (pElse->numOfRows > 1 ? rowIdx : 0)));
if (preSingle && 0 == rowIdx && 1 == pElse->numOfRows && totalRows > 1) {
SCL_ERR_JRET(sclExtendResRows(output, output, ctx->pBlockList));
@ -907,13 +916,15 @@ int32_t sclExecCaseWhen(SCaseWhenNode *node, SScalarCtx *ctx, SScalarParam *outp
for (int32_t i = 0; i < rowNum; ++i) {
bool *equal = (bool *)colDataGetData(comp.columnData, (comp.numOfRows > 1 ? i : 0));
if (*equal) {
colDataAppend(output->columnData, i, colDataGetData(pThen->columnData, (pThen->numOfRows > 1 ? i : 0)), colDataIsNull_s(pThen->columnData, (pThen->numOfRows > 1 ? i : 0)));
colDataAppend(output->columnData, i, colDataGetData(pThen->columnData, (pThen->numOfRows > 1 ? i : 0)),
colDataIsNull_s(pThen->columnData, (pThen->numOfRows > 1 ? i : 0)));
if (0 == i && 1 == pCase->numOfRows && 1 == pWhen->numOfRows && 1 == pThen->numOfRows && rowNum > 1) {
SCL_ERR_JRET(sclExtendResRows(output, output, ctx->pBlockList));
break;
}
} else {
SCL_ERR_JRET(sclWalkCaseWhenList(ctx, node->pWhenThenList, node->pWhenThenList->pHead->pNext, pCase, pElse, &comp, output, i, rowNum, &complete));
SCL_ERR_JRET(sclWalkCaseWhenList(ctx, node->pWhenThenList, node->pWhenThenList->pHead->pNext, pCase, pElse,
&comp, output, i, rowNum, &complete));
if (complete) {
break;
}
@ -923,13 +934,15 @@ int32_t sclExecCaseWhen(SCaseWhenNode *node, SScalarCtx *ctx, SScalarParam *outp
for (int32_t i = 0; i < rowNum; ++i) {
bool *whenValue = (bool *)colDataGetData(pWhen->columnData, (pWhen->numOfRows > 1 ? i : 0));
if (*whenValue) {
colDataAppend(output->columnData, i, colDataGetData(pThen->columnData, (pThen->numOfRows > 1 ? i : 0)), colDataIsNull_s(pThen->columnData, (pThen->numOfRows > 1 ? i : 0)));
colDataAppend(output->columnData, i, colDataGetData(pThen->columnData, (pThen->numOfRows > 1 ? i : 0)),
colDataIsNull_s(pThen->columnData, (pThen->numOfRows > 1 ? i : 0)));
if (0 == i && 1 == pWhen->numOfRows && 1 == pThen->numOfRows && rowNum > 1) {
SCL_ERR_JRET(sclExtendResRows(output, output, ctx->pBlockList));
break;
}
} else {
SCL_ERR_JRET(sclWalkWhenList(ctx, node->pWhenThenList, node->pWhenThenList->pHead->pNext, pElse, output, i, rowNum, &complete, (pWhen->numOfRows == 1 && pThen->numOfRows == 1)));
SCL_ERR_JRET(sclWalkWhenList(ctx, node->pWhenThenList, node->pWhenThenList->pHead->pNext, pElse, output, i,
rowNum, &complete, (pWhen->numOfRows == 1 && pThen->numOfRows == 1)));
if (complete) {
break;
}
@ -965,7 +978,6 @@ _return:
SCL_RET(code);
}
EDealRes sclRewriteNullInOptr(SNode **pNode, SScalarCtx *ctx, EOperatorType opType) {
if (opType <= OP_TYPE_CALC_MAX) {
SValueNode *res = (SValueNode *)nodesMakeNode(QUERY_NODE_VALUE);
@ -1089,8 +1101,7 @@ EDealRes sclRewriteNonConstOperator(SNode **pNode, SScalarCtx *ctx) {
EDealRes sclRewriteFunction(SNode **pNode, SScalarCtx *ctx) {
SFunctionNode *node = (SFunctionNode *)*pNode;
SNode *tnode = NULL;
if ((!fmIsScalarFunc(node->funcId) && (!ctx->dual)) ||
fmIsUserDefinedFunc(node->funcId)) {
if ((!fmIsScalarFunc(node->funcId) && (!ctx->dual)) || fmIsUserDefinedFunc(node->funcId)) {
return DEAL_RES_CONTINUE;
}
@ -1281,7 +1292,6 @@ EDealRes sclRewriteCaseWhen(SNode** pNode, SScalarCtx *ctx) {
return DEAL_RES_CONTINUE;
}
EDealRes sclConstantsRewriter(SNode **pNode, void *pContext) {
SScalarCtx *ctx = (SScalarCtx *)pContext;
@ -1425,11 +1435,10 @@ EDealRes sclWalkCaseWhen(SNode* pNode, SScalarCtx *ctx) {
return DEAL_RES_CONTINUE;
}
EDealRes sclCalcWalker(SNode *pNode, void *pContext) {
if (QUERY_NODE_VALUE == nodeType(pNode) || QUERY_NODE_NODE_LIST == nodeType(pNode)
|| QUERY_NODE_COLUMN == nodeType(pNode) || QUERY_NODE_LEFT_VALUE == nodeType(pNode)
|| QUERY_NODE_WHEN_THEN == nodeType(pNode)) {
if (QUERY_NODE_VALUE == nodeType(pNode) || QUERY_NODE_NODE_LIST == nodeType(pNode) ||
QUERY_NODE_COLUMN == nodeType(pNode) || QUERY_NODE_LEFT_VALUE == nodeType(pNode) ||
QUERY_NODE_WHEN_THEN == nodeType(pNode)) {
return DEAL_RES_CONTINUE;
}