fix TD-20751 mem leak

This commit is contained in:
yihaoDeng 2022-11-25 19:38:06 +08:00
parent c757b26d15
commit ef8c68a098
2 changed files with 106 additions and 101 deletions

View File

@ -1210,6 +1210,7 @@ EDealRes sclRewriteOperator(SNode **pNode, SScalarCtx *ctx) {
SScalarParam output = {0}; SScalarParam output = {0};
ctx->code = sclExecOperator(node, ctx, &output); ctx->code = sclExecOperator(node, ctx, &output);
if (ctx->code) { if (ctx->code) {
sclFreeParam(&output);
return DEAL_RES_ERROR; return DEAL_RES_ERROR;
} }

View File

@ -344,8 +344,8 @@ static FORCE_INLINE void varToNchar(char *buf, SScalarParam *pOut, int32_t rowIn
int32_t outputMaxLen = (inputLen + 1) * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE; int32_t outputMaxLen = (inputLen + 1) * TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE;
char *t = taosMemoryCalloc(1, outputMaxLen); char *t = taosMemoryCalloc(1, outputMaxLen);
int32_t ret = taosMbsToUcs4(varDataVal(buf), inputLen, (TdUcs4 *)varDataVal(t), int32_t ret =
outputMaxLen - VARSTR_HEADER_SIZE, &len); taosMbsToUcs4(varDataVal(buf), inputLen, (TdUcs4 *)varDataVal(t), outputMaxLen - VARSTR_HEADER_SIZE, &len);
if (!ret) { if (!ret) {
sclError("failed to convert to NCHAR"); sclError("failed to convert to NCHAR");
} }
@ -370,8 +370,8 @@ static FORCE_INLINE void ncharToVar(char *buf, SScalarParam *pOut, int32_t rowIn
taosMemoryFree(t); taosMemoryFree(t);
} }
//TODO opt performance, tmp is not needed. // TODO opt performance, tmp is not needed.
int32_t vectorConvertFromVarData(SSclVectorConvCtx *pCtx, int32_t* overflow) { int32_t vectorConvertFromVarData(SSclVectorConvCtx *pCtx, int32_t *overflow) {
bool vton = false; bool vton = false;
_bufConverteFunc func = NULL; _bufConverteFunc func = NULL;
@ -405,10 +405,10 @@ int32_t vectorConvertFromVarData(SSclVectorConvCtx *pCtx, int32_t* overflow) {
continue; continue;
} }
char* data = colDataGetVarData(pCtx->pIn->columnData, i); char *data = colDataGetVarData(pCtx->pIn->columnData, i);
int32_t convertType = pCtx->inType; int32_t convertType = pCtx->inType;
if(pCtx->inType == TSDB_DATA_TYPE_JSON){ if (pCtx->inType == TSDB_DATA_TYPE_JSON) {
if(*data == TSDB_DATA_TYPE_NULL) { if (*data == TSDB_DATA_TYPE_NULL) {
ASSERT(0); ASSERT(0);
} else if (*data == TSDB_DATA_TYPE_NCHAR) { } else if (*data == TSDB_DATA_TYPE_NCHAR) {
data += CHAR_BYTES; data += CHAR_BYTES;
@ -417,13 +417,13 @@ int32_t vectorConvertFromVarData(SSclVectorConvCtx *pCtx, int32_t* overflow) {
terrno = TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR; terrno = TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR;
return terrno; return terrno;
} else { } else {
convertNumberToNumber(data+CHAR_BYTES, colDataGetNumData(pCtx->pOut->columnData, i), *data, pCtx->outType); convertNumberToNumber(data + CHAR_BYTES, colDataGetNumData(pCtx->pOut->columnData, i), *data, pCtx->outType);
continue; continue;
} }
} }
int32_t bufSize = pCtx->pIn->columnData->info.bytes; int32_t bufSize = pCtx->pIn->columnData->info.bytes;
char *tmp = taosMemoryMalloc(varDataTLen(data)); char *tmp = taosMemoryMalloc(varDataTLen(data));
if(!tmp){ if (!tmp) {
sclError("out of memory in vectorConvertFromVarData"); sclError("out of memory in vectorConvertFromVarData");
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
@ -584,11 +584,12 @@ bool convertJsonValue(__compar_fn_t *fp, int32_t optr, int8_t typeLeft, int8_t t
} }
int32_t vectorConvertToVarData(SSclVectorConvCtx *pCtx) { int32_t vectorConvertToVarData(SSclVectorConvCtx *pCtx) {
SColumnInfoData* pInputCol = pCtx->pIn->columnData; SColumnInfoData *pInputCol = pCtx->pIn->columnData;
SColumnInfoData* pOutputCol = pCtx->pOut->columnData; SColumnInfoData *pOutputCol = pCtx->pOut->columnData;
char tmp[128] = {0}; char tmp[128] = {0};
if (IS_SIGNED_NUMERIC_TYPE(pCtx->inType) || pCtx->inType == TSDB_DATA_TYPE_BOOL || pCtx->inType == TSDB_DATA_TYPE_TIMESTAMP) { if (IS_SIGNED_NUMERIC_TYPE(pCtx->inType) || pCtx->inType == TSDB_DATA_TYPE_BOOL ||
pCtx->inType == TSDB_DATA_TYPE_TIMESTAMP) {
for (int32_t i = pCtx->startIndex; i <= pCtx->endIndex; ++i) { for (int32_t i = pCtx->startIndex; i <= pCtx->endIndex; ++i) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
colDataAppendNULL(pOutputCol, i); colDataAppendNULL(pOutputCol, i);
@ -648,9 +649,10 @@ int32_t vectorConvertToVarData(SSclVectorConvCtx *pCtx) {
} }
// TODO opt performance // TODO opt performance
int32_t vectorConvertSingleColImpl(const SScalarParam* pIn, SScalarParam* pOut, int32_t* overflow, int32_t startIndex, int32_t numOfRows) { int32_t vectorConvertSingleColImpl(const SScalarParam *pIn, SScalarParam *pOut, int32_t *overflow, int32_t startIndex,
SColumnInfoData* pInputCol = pIn->columnData; int32_t numOfRows) {
SColumnInfoData* pOutputCol = pOut->columnData; SColumnInfoData *pInputCol = pIn->columnData;
SColumnInfoData *pOutputCol = pOut->columnData;
if (NULL == pInputCol) { if (NULL == pInputCol) {
sclError("input column is NULL, hashFilter %p", pIn->pHashFilter); sclError("input column is NULL, hashFilter %p", pIn->pHashFilter);
@ -733,7 +735,7 @@ int32_t vectorConvertSingleColImpl(const SScalarParam* pIn, SScalarParam* pOut,
} }
break; break;
} }
case TSDB_DATA_TYPE_SMALLINT:{ case TSDB_DATA_TYPE_SMALLINT: {
for (int32_t i = cCtx.startIndex; i <= cCtx.endIndex; ++i) { for (int32_t i = cCtx.startIndex; i <= cCtx.endIndex; ++i) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
colDataAppendNULL(pOutputCol, i); colDataAppendNULL(pOutputCol, i);
@ -746,7 +748,7 @@ int32_t vectorConvertSingleColImpl(const SScalarParam* pIn, SScalarParam* pOut,
} }
break; break;
} }
case TSDB_DATA_TYPE_INT:{ case TSDB_DATA_TYPE_INT: {
for (int32_t i = cCtx.startIndex; i <= cCtx.endIndex; ++i) { for (int32_t i = cCtx.startIndex; i <= cCtx.endIndex; ++i) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
colDataAppendNULL(pOutputCol, i); colDataAppendNULL(pOutputCol, i);
@ -773,7 +775,7 @@ int32_t vectorConvertSingleColImpl(const SScalarParam* pIn, SScalarParam* pOut,
} }
break; break;
} }
case TSDB_DATA_TYPE_UTINYINT:{ case TSDB_DATA_TYPE_UTINYINT: {
for (int32_t i = cCtx.startIndex; i <= cCtx.endIndex; ++i) { for (int32_t i = cCtx.startIndex; i <= cCtx.endIndex; ++i) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
colDataAppendNULL(pOutputCol, i); colDataAppendNULL(pOutputCol, i);
@ -786,7 +788,7 @@ int32_t vectorConvertSingleColImpl(const SScalarParam* pIn, SScalarParam* pOut,
} }
break; break;
} }
case TSDB_DATA_TYPE_USMALLINT:{ case TSDB_DATA_TYPE_USMALLINT: {
for (int32_t i = cCtx.startIndex; i <= cCtx.endIndex; ++i) { for (int32_t i = cCtx.startIndex; i <= cCtx.endIndex; ++i) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
colDataAppendNULL(pOutputCol, i); colDataAppendNULL(pOutputCol, i);
@ -799,7 +801,7 @@ int32_t vectorConvertSingleColImpl(const SScalarParam* pIn, SScalarParam* pOut,
} }
break; break;
} }
case TSDB_DATA_TYPE_UINT:{ case TSDB_DATA_TYPE_UINT: {
for (int32_t i = cCtx.startIndex; i <= cCtx.endIndex; ++i) { for (int32_t i = cCtx.startIndex; i <= cCtx.endIndex; ++i) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
colDataAppendNULL(pOutputCol, i); colDataAppendNULL(pOutputCol, i);
@ -821,11 +823,11 @@ int32_t vectorConvertSingleColImpl(const SScalarParam* pIn, SScalarParam* pOut,
uint64_t value = 0; uint64_t value = 0;
GET_TYPED_DATA(value, uint64_t, cCtx.inType, colDataGetData(pInputCol, i)); GET_TYPED_DATA(value, uint64_t, cCtx.inType, colDataGetData(pInputCol, i));
colDataAppendInt64(pOutputCol, i, (int64_t*)&value); colDataAppendInt64(pOutputCol, i, (int64_t *)&value);
} }
break; break;
} }
case TSDB_DATA_TYPE_FLOAT:{ case TSDB_DATA_TYPE_FLOAT: {
for (int32_t i = cCtx.startIndex; i <= cCtx.endIndex; ++i) { for (int32_t i = cCtx.startIndex; i <= cCtx.endIndex; ++i) {
if (colDataIsNull_f(pInputCol->nullbitmap, i)) { if (colDataIsNull_f(pInputCol->nullbitmap, i)) {
colDataAppendNULL(pOutputCol, i); colDataAppendNULL(pOutputCol, i);
@ -834,7 +836,7 @@ int32_t vectorConvertSingleColImpl(const SScalarParam* pIn, SScalarParam* pOut,
float value = 0; float value = 0;
GET_TYPED_DATA(value, float, cCtx.inType, colDataGetData(pInputCol, i)); GET_TYPED_DATA(value, float, cCtx.inType, colDataGetData(pInputCol, i));
colDataAppendFloat(pOutputCol, i, (float*)&value); colDataAppendFloat(pOutputCol, i, (float *)&value);
} }
break; break;
} }
@ -847,7 +849,7 @@ int32_t vectorConvertSingleColImpl(const SScalarParam* pIn, SScalarParam* pOut,
double value = 0; double value = 0;
GET_TYPED_DATA(value, double, cCtx.inType, colDataGetData(pInputCol, i)); GET_TYPED_DATA(value, double, cCtx.inType, colDataGetData(pInputCol, i));
colDataAppendDouble(pOutputCol, i, (double*)&value); colDataAppendDouble(pOutputCol, i, (double *)&value);
} }
break; break;
} }
@ -897,7 +899,8 @@ int32_t vectorGetConvertType(int32_t type1, int32_t type2) {
return gConvertTypes[type2][type1]; return gConvertTypes[type2][type1];
} }
int32_t vectorConvertSingleCol(SScalarParam *input, SScalarParam *output, int32_t type, int32_t startIndex, int32_t numOfRows) { int32_t vectorConvertSingleCol(SScalarParam *input, SScalarParam *output, int32_t type, int32_t startIndex,
int32_t numOfRows) {
SDataType t = {.type = type, .bytes = tDataTypes[type].bytes}; SDataType t = {.type = type, .bytes = tDataTypes[type].bytes};
output->numOfRows = input->numOfRows; output->numOfRows = input->numOfRows;
@ -914,7 +917,8 @@ int32_t vectorConvertSingleCol(SScalarParam *input, SScalarParam *output, int32_
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t vectorConvertCols(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam* pLeftOut, SScalarParam* pRightOut, int32_t startIndex, int32_t numOfRows) { int32_t vectorConvertCols(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pLeftOut, SScalarParam *pRightOut,
int32_t startIndex, int32_t numOfRows) {
int32_t leftType = GET_PARAM_TYPE(pLeft); int32_t leftType = GET_PARAM_TYPE(pLeft);
int32_t rightType = GET_PARAM_TYPE(pRight); int32_t rightType = GET_PARAM_TYPE(pRight);
if (leftType == rightType) { if (leftType == rightType) {
@ -1007,9 +1011,9 @@ static void vectorMathTsAddHelper(SColumnInfoData *pLeftCol, SColumnInfoData *pR
} }
} }
static SColumnInfoData* vectorConvertVarToDouble(SScalarParam* pInput, int32_t* converted) { static SColumnInfoData *vectorConvertVarToDouble(SScalarParam *pInput, int32_t *converted) {
SScalarParam output = {0}; SScalarParam output = {0};
SColumnInfoData* pCol = pInput->columnData; SColumnInfoData *pCol = pInput->columnData;
if (IS_VAR_DATA_TYPE(pCol->info.type) && pCol->info.type != TSDB_DATA_TYPE_JSON) { if (IS_VAR_DATA_TYPE(pCol->info.type) && pCol->info.type != TSDB_DATA_TYPE_JSON) {
int32_t code = vectorConvertSingleCol(pInput, &output, TSDB_DATA_TYPE_DOUBLE, -1, -1); int32_t code = vectorConvertSingleCol(pInput, &output, TSDB_DATA_TYPE_DOUBLE, -1, -1);
@ -1536,8 +1540,8 @@ void vectorBitOr(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pOut,
doReleaseVec(pRightCol, rightConvert); doReleaseVec(pRightCol, rightConvert);
} }
int32_t doVectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pOut, int32_t startIndex, int32_t numOfRows, int32_t doVectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pOut, int32_t startIndex,
int32_t step, __compar_fn_t fp, int32_t optr) { int32_t numOfRows, int32_t step, __compar_fn_t fp, int32_t optr) {
int32_t num = 0; int32_t num = 0;
for (int32_t i = startIndex; i < numOfRows && i >= 0; i += step) { for (int32_t i = startIndex; i < numOfRows && i >= 0; i += step) {
@ -1590,8 +1594,8 @@ int32_t doVectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarPa
return num; return num;
} }
void doVectorCompare(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t startIndex, int32_t numOfRows, void doVectorCompare(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pOut, int32_t startIndex,
int32_t _ord, int32_t optr) { int32_t numOfRows, int32_t _ord, int32_t optr) {
int32_t i = 0; int32_t i = 0;
int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1; int32_t step = ((_ord) == TSDB_ORDER_ASC) ? 1 : -1;
int32_t lType = GET_PARAM_TYPE(pLeft); int32_t lType = GET_PARAM_TYPE(pLeft);
@ -1634,8 +1638,8 @@ void doVectorCompare(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pO
} }
} }
void vectorCompareImpl(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t startIndex, int32_t numOfRows, void vectorCompareImpl(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pOut, int32_t startIndex,
int32_t _ord, int32_t optr) { int32_t numOfRows, int32_t _ord, int32_t optr) {
SScalarParam pLeftOut = {0}; SScalarParam pLeftOut = {0};
SScalarParam pRightOut = {0}; SScalarParam pRightOut = {0};
SScalarParam *param1 = NULL; SScalarParam *param1 = NULL;
@ -1666,11 +1670,11 @@ void vectorCompareImpl(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *
sclFreeParam(&pRightOut); sclFreeParam(&pRightOut);
} }
void vectorCompare(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t _ord, int32_t optr) { void vectorCompare(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pOut, int32_t _ord, int32_t optr) {
vectorCompareImpl(pLeft, pRight, pOut, -1, -1, _ord, optr); vectorCompareImpl(pLeft, pRight, pOut, -1, -1, _ord, optr);
} }
void vectorGreater(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t _ord) { void vectorGreater(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pOut, int32_t _ord) {
vectorCompare(pLeft, pRight, pOut, _ord, OP_TYPE_GREATER_THAN); vectorCompare(pLeft, pRight, pOut, _ord, OP_TYPE_GREATER_THAN);
} }
@ -1734,10 +1738,10 @@ void vectorNotNull(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pOut
pOut->numOfRows = pLeft->numOfRows; pOut->numOfRows = pLeft->numOfRows;
} }
void vectorIsTrue(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut, int32_t _ord) { void vectorIsTrue(SScalarParam *pLeft, SScalarParam *pRight, SScalarParam *pOut, int32_t _ord) {
vectorConvertSingleColImpl(pLeft, pOut, NULL, -1, -1); vectorConvertSingleColImpl(pLeft, pOut, NULL, -1, -1);
for(int32_t i = 0; i < pOut->numOfRows; ++i) { for (int32_t i = 0; i < pOut->numOfRows; ++i) {
if(colDataIsNull_s(pOut->columnData, i)) { if (colDataIsNull_s(pOut->columnData, i)) {
int8_t v = 0; int8_t v = 0;
colDataAppendInt8(pOut->columnData, i, &v); colDataAppendInt8(pOut->columnData, i, &v);
colDataSetNotNull_f(pOut->columnData->nullbitmap, i); colDataSetNotNull_f(pOut->columnData->nullbitmap, i);
@ -1748,7 +1752,7 @@ void vectorIsTrue(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam *pOut,
STagVal getJsonValue(char *json, char *key, bool *isExist) { STagVal getJsonValue(char *json, char *key, bool *isExist) {
STagVal val = {.pKey = key}; STagVal val = {.pKey = key};
if (tTagIsJson((const STag *)json) == false) { if (json == NULL || tTagIsJson((const STag *)json) == false) {
terrno = TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR; terrno = TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR;
if (isExist) { if (isExist) {
*isExist = false; *isExist = false;