diff --git a/source/libs/scalar/src/filter.c b/source/libs/scalar/src/filter.c index b551851232..8d5a925898 100644 --- a/source/libs/scalar/src/filter.c +++ b/source/libs/scalar/src/filter.c @@ -1077,10 +1077,17 @@ int32_t filterAddField(SFilterInfo *info, void *desc, void **data, int32_t type, if (info->pctx.valHash == NULL) { info->pctx.valHash = taosHashInit(FILTER_DEFAULT_GROUP_SIZE * FILTER_DEFAULT_VALUE_SIZE, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, false); + if (NULL == info->pctx.valHash) { + fltError("taosHashInit failed, size:%d", FILTER_DEFAULT_GROUP_SIZE * FILTER_DEFAULT_VALUE_SIZE); + FLT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } } SFilterDataInfo dInfo = {idx, *data}; - taosHashPut(info->pctx.valHash, *data, dataLen, &dInfo, sizeof(dInfo)); + if (taosHashPut(info->pctx.valHash, *data, dataLen, &dInfo, sizeof(dInfo))) { + fltError("taosHashPut to set failed"); + FLT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } if (srcFlag) { FILTER_SET_FLAG(*srcFlag, FLD_DATA_NO_FREE); } @@ -1141,6 +1148,10 @@ int32_t filterAddUnitImpl(SFilterInfo *info, uint8_t optr, SFilterFieldId *left, if (info->pctx.unitHash == NULL) { info->pctx.unitHash = taosHashInit(FILTER_DEFAULT_GROUP_SIZE * FILTER_DEFAULT_UNIT_SIZE, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, false); + if (NULL == info->pctx.unitHash) { + fltError("taosHashInit failed, size:%d", FILTER_DEFAULT_GROUP_SIZE * FILTER_DEFAULT_UNIT_SIZE); + FLT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } } else { char v[14] = {0}; FLT_PACKAGE_UNIT_HASH_KEY(&v, optr, optr2, left->idx, (right ? right->idx : -1), (right2 ? right2->idx : -1)); @@ -1198,7 +1209,11 @@ int32_t filterAddUnitImpl(SFilterInfo *info, uint8_t optr, SFilterFieldId *left, if (FILTER_GET_FLAG(info->options, FLT_OPTION_NEED_UNIQE)) { char v[14] = {0}; FLT_PACKAGE_UNIT_HASH_KEY(&v, optr, optr2, left->idx, (right ? right->idx : -1), (right2 ? right2->idx : -1)); - taosHashPut(info->pctx.unitHash, v, sizeof(v), uidx, sizeof(*uidx)); + if (taosHashPut(info->pctx.unitHash, v, sizeof(v), uidx, sizeof(*uidx))) { + fltError("taosHashPut to set failed"); + FLT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + } ++info->unitNum; diff --git a/source/libs/scalar/src/scalar.c b/source/libs/scalar/src/scalar.c index 389f8b01a3..191ce16da1 100644 --- a/source/libs/scalar/src/scalar.c +++ b/source/libs/scalar/src/scalar.c @@ -72,9 +72,16 @@ int32_t sclConvertValueToSclParam(SValueNode *pValueNode, SScalarParam *out, int return code; } - colDataSetVal(in.columnData, 0, nodesGetValueFromNode(pValueNode), false); + code = colDataSetVal(in.columnData, 0, nodesGetValueFromNode(pValueNode), false); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + code = colInfoDataEnsureCapacity(out->columnData, 1, true); + if (code != TSDB_CODE_SUCCESS) { + return code; + } - colInfoDataEnsureCapacity(out->columnData, 1, true); code = vectorConvertSingleColImpl(&in, out, overflow, -1, -1); sclFreeParam(&in); @@ -84,6 +91,7 @@ int32_t sclConvertValueToSclParam(SValueNode *pValueNode, SScalarParam *out, int int32_t sclExtendResRows(SScalarParam *pDst, SScalarParam *pSrc, SArray *pBlockList) { SSDataBlock *pb = taosArrayGetP(pBlockList, 0); SScalarParam *pLeft = taosMemoryCalloc(1, sizeof(SScalarParam)); + int32_t code = TSDB_CODE_SUCCESS; if (NULL == pLeft) { sclError("calloc %d failed", (int32_t)sizeof(SScalarParam)); SCL_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); @@ -92,15 +100,16 @@ int32_t sclExtendResRows(SScalarParam *pDst, SScalarParam *pSrc, SArray *pBlockL pLeft->numOfRows = pb->info.rows; if (pDst->numOfRows < pb->info.rows) { - colInfoDataEnsureCapacity(pDst->columnData, pb->info.rows, true); + SCL_ERR_JRET(colInfoDataEnsureCapacity(pDst->columnData, pb->info.rows, true)); } _bin_scalar_fn_t OperatorFn = getBinScalarOperatorFn(OP_TYPE_ASSIGN); - OperatorFn(pLeft, pSrc, pDst, TSDB_ORDER_ASC); + SCL_ERR_JRET(OperatorFn(pLeft, pSrc, pDst, TSDB_ORDER_ASC)); +_return: taosMemoryFree(pLeft); - return TSDB_CODE_SUCCESS; + SCL_RET(code); } int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type) { @@ -345,7 +354,7 @@ int32_t sclInitParam(SNode *node, SScalarParam *param, SScalarCtx *ctx, int32_t if (TSDB_DATA_TYPE_NULL == valueNode->node.resType.type || valueNode->isNull) { colDataSetNULL(param->columnData, 0); } else { - colDataSetVal(param->columnData, 0, nodesGetValueFromNode(valueNode), false); + SCL_ERR_RET(colDataSetVal(param->columnData, 0, nodesGetValueFromNode(valueNode), false)); } break; } @@ -504,23 +513,27 @@ _return: SCL_RET(code); } -int32_t sclGetNodeType(SNode *pNode, SScalarCtx *ctx) { +int32_t sclGetNodeType(SNode *pNode, SScalarCtx *ctx, int32_t *type) { if (NULL == pNode) { - return -1; + *type = -1; + return TSDB_CODE_SUCCESS; } switch ((int)nodeType(pNode)) { case QUERY_NODE_VALUE: { SValueNode *valueNode = (SValueNode *)pNode; - return valueNode->node.resType.type; + *type = valueNode->node.resType.type; + return TSDB_CODE_SUCCESS; } case QUERY_NODE_NODE_LIST: { SNodeListNode *nodeList = (SNodeListNode *)pNode; - return nodeList->node.resType.type; + *type = nodeList->node.resType.type; + return TSDB_CODE_SUCCESS; } case QUERY_NODE_COLUMN: { SColumnNode *colNode = (SColumnNode *)pNode; - return colNode->node.resType.type; + *type = colNode->node.resType.type; + return TSDB_CODE_SUCCESS; } case QUERY_NODE_FUNCTION: case QUERY_NODE_OPERATOR: @@ -528,19 +541,22 @@ int32_t sclGetNodeType(SNode *pNode, SScalarCtx *ctx) { SScalarParam *res = (SScalarParam *)taosHashGet(ctx->pRes, &pNode, POINTER_BYTES); if (NULL == res) { sclError("no result for node, type:%d, node:%p", nodeType(pNode), pNode); - return -1; + SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - return res->columnData->info.type; + *type = (int32_t)(res->columnData->info.type); + return TSDB_CODE_SUCCESS; } } - return -1; + *type = -1; + return TSDB_CODE_SUCCESS; } -void sclSetOperatorValueType(SOperatorNode *node, SScalarCtx *ctx) { +int32_t sclSetOperatorValueType(SOperatorNode *node, SScalarCtx *ctx) { ctx->type.opResType = node->node.resType.type; - ctx->type.selfType = sclGetNodeType(node->pLeft, ctx); - ctx->type.peerType = sclGetNodeType(node->pRight, ctx); + SCL_ERR_RET(sclGetNodeType(node->pLeft, ctx, &(ctx->type.selfType))); + SCL_ERR_RET(sclGetNodeType(node->pRight, ctx, &(ctx->type.peerType))); + SCL_RET(TSDB_CODE_SUCCESS); } int32_t sclInitOperatorParams(SScalarParam **pParams, SOperatorNode *node, SScalarCtx *ctx, int32_t *rowNum) { @@ -557,7 +573,7 @@ int32_t sclInitOperatorParams(SScalarParam **pParams, SOperatorNode *node, SScal SCL_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - sclSetOperatorValueType(node, ctx); + SCL_ERR_JRET(sclSetOperatorValueType(node, ctx)); SCL_ERR_JRET(sclInitParam(node->pLeft, ¶mList[0], ctx, rowNum)); if (paramNum > 1) { @@ -829,7 +845,7 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o } if (complete) { - colDataSetVal(output->columnData, i, (char *)&value, false); + SCL_ERR_JRET(colDataSetVal(output->columnData, i, (char *)&value, false)); if (value) { numOfQualified++; } @@ -877,9 +893,7 @@ int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *outp SScalarParam *pLeft = ¶ms[0]; SScalarParam *pRight = paramNum > 1 ? ¶ms[1] : NULL; - terrno = TSDB_CODE_SUCCESS; - OperatorFn(pLeft, pRight, output, TSDB_ORDER_ASC); - code = terrno; + SCL_ERR_JRET(OperatorFn(pLeft, pRight, output, TSDB_ORDER_ASC)); _return: @@ -930,13 +944,14 @@ int32_t sclExecCaseWhen(SCaseWhenNode *node, SScalarCtx *ctx, SScalarParam *outp } if (pCase) { - vectorCompare(pCase, pWhen, &comp, TSDB_ORDER_ASC, OP_TYPE_EQUAL); + SCL_ERR_JRET(vectorCompare(pCase, pWhen, &comp, TSDB_ORDER_ASC, OP_TYPE_EQUAL)); for (int32_t i = 0; i < rowNum; ++i) { bool *equal = (bool *)colDataGetData(comp.columnData, (comp.numOfRows > 1 ? i : 0)); if (*equal) { - colDataSetVal(output->columnData, i, colDataGetData(pThen->columnData, (pThen->numOfRows > 1 ? i : 0)), - colDataIsNull_s(pThen->columnData, (pThen->numOfRows > 1 ? i : 0))); + SCL_ERR_JRET(colDataSetVal(output->columnData, i, + colDataGetData(pThen->columnData, (pThen->numOfRows > 1 ? i : 0)), + colDataIsNull_s(pThen->columnData, (pThen->numOfRows > 1 ? i : 0)))); if (0 == i && 1 == pCase->numOfRows && 1 == pWhen->numOfRows && 1 == pThen->numOfRows && rowNum > 1) { SCL_ERR_JRET(sclExtendResRows(output, output, ctx->pBlockList)); break; @@ -953,8 +968,9 @@ int32_t sclExecCaseWhen(SCaseWhenNode *node, SScalarCtx *ctx, SScalarParam *outp for (int32_t i = 0; i < rowNum; ++i) { bool *whenValue = (bool *)colDataGetData(pWhen->columnData, (pWhen->numOfRows > 1 ? i : 0)); if (*whenValue) { - colDataSetVal(output->columnData, i, colDataGetData(pThen->columnData, (pThen->numOfRows > 1 ? i : 0)), - colDataIsNull_s(pThen->columnData, (pThen->numOfRows > 1 ? i : 0))); + SCL_ERR_JRET(colDataSetVal(output->columnData, i, + colDataGetData(pThen->columnData, (pThen->numOfRows > 1 ? i : 0)), + colDataIsNull_s(pThen->columnData, (pThen->numOfRows > 1 ? i : 0)))); if (0 == i && 1 == pWhen->numOfRows && 1 == pThen->numOfRows && rowNum > 1) { SCL_ERR_JRET(sclExtendResRows(output, output, ctx->pBlockList)); break; @@ -1065,42 +1081,34 @@ static uint8_t sclGetOpValueNodeTsPrecision(SNode *pLeft, SNode *pRight) { return 0; } -int32_t sclConvertOpValueNodeTs(SOperatorNode *node, SScalarCtx *ctx) { - int32_t code = 0; +int32_t sclConvertOpValueNodeTs(SOperatorNode *node) { if (node->pLeft && SCL_IS_VAR_VALUE_NODE(node->pLeft)) { if (node->pRight && (TSDB_DATA_TYPE_TIMESTAMP == ((SExprNode *)node->pRight)->resType.type)) { - SCL_ERR_JRET( + SCL_ERR_RET( sclConvertToTsValueNode(sclGetOpValueNodeTsPrecision(node->pLeft, node->pRight), (SValueNode *)node->pLeft)); } } else if (node->pRight && SCL_IS_NOTNULL_CONST_NODE(node->pRight)) { if (node->pLeft && (TSDB_DATA_TYPE_TIMESTAMP == ((SExprNode *)node->pLeft)->resType.type)) { if (SCL_IS_VAR_VALUE_NODE(node->pRight)) { - SCL_ERR_JRET(sclConvertToTsValueNode(sclGetOpValueNodeTsPrecision(node->pLeft, node->pRight), + SCL_ERR_RET(sclConvertToTsValueNode(sclGetOpValueNodeTsPrecision(node->pLeft, node->pRight), (SValueNode *)node->pRight)); } else if (QUERY_NODE_NODE_LIST == node->pRight->type) { SNode *pNode; FOREACH(pNode, ((SNodeListNode *)node->pRight)->pNodeList) { if (SCL_IS_VAR_VALUE_NODE(pNode)) { - SCL_ERR_JRET( + SCL_ERR_RET( sclConvertToTsValueNode(sclGetOpValueNodeTsPrecision(node->pLeft, pNode), (SValueNode *)pNode)); } } } } } - return TSDB_CODE_SUCCESS; - -_return: - - ctx->code = code; - return DEAL_RES_ERROR; } -int32_t sclConvertCaseWhenValueNodeTs(SCaseWhenNode *node, SScalarCtx *ctx) { - int32_t code = 0; +int32_t sclConvertCaseWhenValueNodeTs(SCaseWhenNode *node) { if (NULL == node->pCase) { return TSDB_CODE_SUCCESS; } @@ -1110,7 +1118,7 @@ int32_t sclConvertCaseWhenValueNodeTs(SCaseWhenNode *node, SScalarCtx *ctx) { FOREACH(pNode, node->pWhenThenList) { SExprNode *pExpr = (SExprNode *)((SWhenThenNode *)pNode)->pWhen; if (TSDB_DATA_TYPE_TIMESTAMP == pExpr->resType.type) { - SCL_ERR_JRET(sclConvertToTsValueNode(pExpr->resType.precision, (SValueNode *)node->pCase)); + SCL_ERR_RET(sclConvertToTsValueNode(pExpr->resType.precision, (SValueNode *)node->pCase)); break; } } @@ -1118,18 +1126,13 @@ int32_t sclConvertCaseWhenValueNodeTs(SCaseWhenNode *node, SScalarCtx *ctx) { SNode *pNode; FOREACH(pNode, node->pWhenThenList) { if (SCL_IS_VAR_VALUE_NODE(((SWhenThenNode *)pNode)->pWhen)) { - SCL_ERR_JRET(sclConvertToTsValueNode(((SExprNode *)node->pCase)->resType.precision, + SCL_ERR_RET(sclConvertToTsValueNode(((SExprNode *)node->pCase)->resType.precision, (SValueNode *)((SWhenThenNode *)pNode)->pWhen)); } } } return TSDB_CODE_SUCCESS; - -_return: - - ctx->code = code; - return DEAL_RES_ERROR; } EDealRes sclRewriteNonConstOperator(SNode **pNode, SScalarCtx *ctx) { @@ -1227,14 +1230,30 @@ EDealRes sclRewriteFunction(SNode **pNode, SScalarCtx *ctx) { if (type == TSDB_DATA_TYPE_JSON) { int32_t len = getJsonValueLen(output.columnData->pData); res->datum.p = taosMemoryCalloc(len, 1); + if (NULL == res->datum.p) { + sclError("calloc %d failed", len); + sclFreeParam(&output); + ctx->code = TSDB_CODE_OUT_OF_MEMORY; + return DEAL_RES_ERROR; + } memcpy(res->datum.p, output.columnData->pData, len); } else if (IS_VAR_DATA_TYPE(type)) { // res->datum.p = taosMemoryCalloc(res->node.resType.bytes + VARSTR_HEADER_SIZE + 1, 1); res->datum.p = taosMemoryCalloc(varDataTLen(output.columnData->pData) + 1, 1); + if (NULL == res->datum.p) { + sclError("calloc %d failed", (int)(varDataTLen(output.columnData->pData) + 1)); + sclFreeParam(&output); + ctx->code = TSDB_CODE_OUT_OF_MEMORY; + return DEAL_RES_ERROR; + } res->node.resType.bytes = varDataTLen(output.columnData->pData); memcpy(res->datum.p, output.columnData->pData, varDataTLen(output.columnData->pData)); } else { - nodesSetValueNodeValue(res, output.columnData->pData); + ctx->code = nodesSetValueNodeValue(res, output.columnData->pData); + if (ctx->code) { + sclFreeParam(&output); + return DEAL_RES_ERROR; + } } } @@ -1271,13 +1290,17 @@ EDealRes sclRewriteLogic(SNode **pNode, SScalarCtx *ctx) { res->node.resType = node->node.resType; res->translate = true; - strcpy(res->node.aliasName, node->node.aliasName); + (void)strcpy(res->node.aliasName, node->node.aliasName); int32_t type = output.columnData->info.type; if (IS_VAR_DATA_TYPE(type)) { res->datum.p = output.columnData->pData; output.columnData->pData = NULL; } else { - nodesSetValueNodeValue(res, output.columnData->pData); + ctx->code = nodesSetValueNodeValue(res, output.columnData->pData); + if (ctx->code) { + sclFreeParam(&output); + return DEAL_RES_ERROR; + } } nodesDestroyNode(*pNode); @@ -1290,7 +1313,10 @@ EDealRes sclRewriteLogic(SNode **pNode, SScalarCtx *ctx) { EDealRes sclRewriteOperator(SNode **pNode, SScalarCtx *ctx) { SOperatorNode *node = (SOperatorNode *)*pNode; - SCL_ERR_RET(sclConvertOpValueNodeTs(node, ctx)); + ctx->code = sclConvertOpValueNodeTs(node); + if (ctx->code) { + return DEAL_RES_ERROR; + } if ((!SCL_IS_CONST_NODE(node->pLeft)) || (!SCL_IS_CONST_NODE(node->pRight))) { return sclRewriteNonConstOperator(pNode, ctx); @@ -1324,7 +1350,11 @@ EDealRes sclRewriteOperator(SNode **pNode, SScalarCtx *ctx) { res->datum.p = output.columnData->pData; output.columnData->pData = NULL; } else { - nodesSetValueNodeValue(res, output.columnData->pData); + ctx->code = nodesSetValueNodeValue(res, output.columnData->pData); + if (ctx->code) { + sclFreeParam(&output); + return DEAL_RES_ERROR; + } } } @@ -1338,7 +1368,10 @@ EDealRes sclRewriteOperator(SNode **pNode, SScalarCtx *ctx) { EDealRes sclRewriteCaseWhen(SNode **pNode, SScalarCtx *ctx) { SCaseWhenNode *node = (SCaseWhenNode *)*pNode; - SCL_ERR_RET(sclConvertCaseWhenValueNodeTs(node, ctx)); + ctx->code = sclConvertCaseWhenValueNodeTs(node); + if (ctx->code) { + return DEAL_RES_ERROR; + } if ((!SCL_IS_CONST_NODE(node->pCase)) || (!SCL_IS_CONST_NODE(node->pElse))) { return DEAL_RES_CONTINUE; @@ -1378,9 +1411,19 @@ EDealRes sclRewriteCaseWhen(SNode **pNode, SScalarCtx *ctx) { int32_t type = output.columnData->info.type; if (IS_VAR_DATA_TYPE(type)) { // todo refactor res->datum.p = taosMemoryCalloc(varDataTLen(output.columnData->pData) + 1, sizeof(char)); // add \0 to the end for print json value + if (NULL == res->datum.p) { + sclError("calloc %d failed", (int)(varDataTLen(output.columnData->pData) + 1)); + sclFreeParam(&output); + ctx->code = TSDB_CODE_OUT_OF_MEMORY; + return DEAL_RES_ERROR; + } memcpy(res->datum.p, output.columnData->pData, varDataTLen(output.columnData->pData)); } else { - nodesSetValueNodeValue(res, output.columnData->pData); + ctx->code = nodesSetValueNodeValue(res, output.columnData->pData); + if (ctx->code) { + sclFreeParam(&output); + return DEAL_RES_ERROR; + } } } @@ -1515,7 +1558,10 @@ EDealRes sclWalkTarget(SNode *pNode, SScalarCtx *ctx) { return DEAL_RES_ERROR; } - colDataAssign(col, res->columnData, res->numOfRows, NULL); + ctx->code = colDataAssign(col, res->columnData, res->numOfRows, NULL); + if (ctx->code) { + return DEAL_RES_ERROR; + } block->info.rows = res->numOfRows; sclFreeParam(res); @@ -1735,8 +1781,8 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) { if (1 == res->numOfRows && pb->info.rows > 0) { SCL_ERR_JRET(sclExtendResRows(pDst, res, pBlockList)); } else { - colInfoDataEnsureCapacity(pDst->columnData, res->numOfRows, true); - colDataAssign(pDst->columnData, res->columnData, res->numOfRows, NULL); + SCL_ERR_JRET(colInfoDataEnsureCapacity(pDst->columnData, res->numOfRows, true)); + SCL_ERR_JRET(colDataAssign(pDst->columnData, res->columnData, res->numOfRows, NULL)); pDst->numOfRows = res->numOfRows; pDst->numOfQualified = res->numOfQualified; }