enh:[TD-30998] Handling return value in scalar.c

This commit is contained in:
sima 2024-07-16 19:26:37 +08:00
parent 434fd97615
commit 2893a93303
2 changed files with 120 additions and 59 deletions

View File

@ -1077,10 +1077,17 @@ int32_t filterAddField(SFilterInfo *info, void *desc, void **data, int32_t type,
if (info->pctx.valHash == NULL) {
info->pctx.valHash = taosHashInit(FILTER_DEFAULT_GROUP_SIZE * FILTER_DEFAULT_VALUE_SIZE,
taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, false);
if (NULL == info->pctx.valHash) {
fltError("taosHashInit failed, size:%d", FILTER_DEFAULT_GROUP_SIZE * FILTER_DEFAULT_VALUE_SIZE);
FLT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
}
SFilterDataInfo dInfo = {idx, *data};
taosHashPut(info->pctx.valHash, *data, dataLen, &dInfo, sizeof(dInfo));
if (taosHashPut(info->pctx.valHash, *data, dataLen, &dInfo, sizeof(dInfo))) {
fltError("taosHashPut to set failed");
FLT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
if (srcFlag) {
FILTER_SET_FLAG(*srcFlag, FLD_DATA_NO_FREE);
}
@ -1141,6 +1148,10 @@ int32_t filterAddUnitImpl(SFilterInfo *info, uint8_t optr, SFilterFieldId *left,
if (info->pctx.unitHash == NULL) {
info->pctx.unitHash = taosHashInit(FILTER_DEFAULT_GROUP_SIZE * FILTER_DEFAULT_UNIT_SIZE,
taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, false);
if (NULL == info->pctx.unitHash) {
fltError("taosHashInit failed, size:%d", FILTER_DEFAULT_GROUP_SIZE * FILTER_DEFAULT_UNIT_SIZE);
FLT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
} else {
char v[14] = {0};
FLT_PACKAGE_UNIT_HASH_KEY(&v, optr, optr2, left->idx, (right ? right->idx : -1), (right2 ? right2->idx : -1));
@ -1198,7 +1209,11 @@ int32_t filterAddUnitImpl(SFilterInfo *info, uint8_t optr, SFilterFieldId *left,
if (FILTER_GET_FLAG(info->options, FLT_OPTION_NEED_UNIQE)) {
char v[14] = {0};
FLT_PACKAGE_UNIT_HASH_KEY(&v, optr, optr2, left->idx, (right ? right->idx : -1), (right2 ? right2->idx : -1));
taosHashPut(info->pctx.unitHash, v, sizeof(v), uidx, sizeof(*uidx));
if (taosHashPut(info->pctx.unitHash, v, sizeof(v), uidx, sizeof(*uidx))) {
fltError("taosHashPut to set failed");
FLT_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
}
++info->unitNum;

View File

@ -72,9 +72,16 @@ int32_t sclConvertValueToSclParam(SValueNode *pValueNode, SScalarParam *out, int
return code;
}
colDataSetVal(in.columnData, 0, nodesGetValueFromNode(pValueNode), false);
code = colDataSetVal(in.columnData, 0, nodesGetValueFromNode(pValueNode), false);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = colInfoDataEnsureCapacity(out->columnData, 1, true);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
colInfoDataEnsureCapacity(out->columnData, 1, true);
code = vectorConvertSingleColImpl(&in, out, overflow, -1, -1);
sclFreeParam(&in);
@ -84,6 +91,7 @@ int32_t sclConvertValueToSclParam(SValueNode *pValueNode, SScalarParam *out, int
int32_t sclExtendResRows(SScalarParam *pDst, SScalarParam *pSrc, SArray *pBlockList) {
SSDataBlock *pb = taosArrayGetP(pBlockList, 0);
SScalarParam *pLeft = taosMemoryCalloc(1, sizeof(SScalarParam));
int32_t code = TSDB_CODE_SUCCESS;
if (NULL == pLeft) {
sclError("calloc %d failed", (int32_t)sizeof(SScalarParam));
SCL_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
@ -92,15 +100,16 @@ int32_t sclExtendResRows(SScalarParam *pDst, SScalarParam *pSrc, SArray *pBlockL
pLeft->numOfRows = pb->info.rows;
if (pDst->numOfRows < pb->info.rows) {
colInfoDataEnsureCapacity(pDst->columnData, pb->info.rows, true);
SCL_ERR_JRET(colInfoDataEnsureCapacity(pDst->columnData, pb->info.rows, true));
}
_bin_scalar_fn_t OperatorFn = getBinScalarOperatorFn(OP_TYPE_ASSIGN);
OperatorFn(pLeft, pSrc, pDst, TSDB_ORDER_ASC);
SCL_ERR_JRET(OperatorFn(pLeft, pSrc, pDst, TSDB_ORDER_ASC));
_return:
taosMemoryFree(pLeft);
return TSDB_CODE_SUCCESS;
SCL_RET(code);
}
int32_t scalarGenerateSetFromList(void **data, void *pNode, uint32_t type) {
@ -345,7 +354,7 @@ int32_t sclInitParam(SNode *node, SScalarParam *param, SScalarCtx *ctx, int32_t
if (TSDB_DATA_TYPE_NULL == valueNode->node.resType.type || valueNode->isNull) {
colDataSetNULL(param->columnData, 0);
} else {
colDataSetVal(param->columnData, 0, nodesGetValueFromNode(valueNode), false);
SCL_ERR_RET(colDataSetVal(param->columnData, 0, nodesGetValueFromNode(valueNode), false));
}
break;
}
@ -504,23 +513,27 @@ _return:
SCL_RET(code);
}
int32_t sclGetNodeType(SNode *pNode, SScalarCtx *ctx) {
int32_t sclGetNodeType(SNode *pNode, SScalarCtx *ctx, int32_t *type) {
if (NULL == pNode) {
return -1;
*type = -1;
return TSDB_CODE_SUCCESS;
}
switch ((int)nodeType(pNode)) {
case QUERY_NODE_VALUE: {
SValueNode *valueNode = (SValueNode *)pNode;
return valueNode->node.resType.type;
*type = valueNode->node.resType.type;
return TSDB_CODE_SUCCESS;
}
case QUERY_NODE_NODE_LIST: {
SNodeListNode *nodeList = (SNodeListNode *)pNode;
return nodeList->node.resType.type;
*type = nodeList->node.resType.type;
return TSDB_CODE_SUCCESS;
}
case QUERY_NODE_COLUMN: {
SColumnNode *colNode = (SColumnNode *)pNode;
return colNode->node.resType.type;
*type = colNode->node.resType.type;
return TSDB_CODE_SUCCESS;
}
case QUERY_NODE_FUNCTION:
case QUERY_NODE_OPERATOR:
@ -528,19 +541,22 @@ int32_t sclGetNodeType(SNode *pNode, SScalarCtx *ctx) {
SScalarParam *res = (SScalarParam *)taosHashGet(ctx->pRes, &pNode, POINTER_BYTES);
if (NULL == res) {
sclError("no result for node, type:%d, node:%p", nodeType(pNode), pNode);
return -1;
SCL_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
return res->columnData->info.type;
*type = (int32_t)(res->columnData->info.type);
return TSDB_CODE_SUCCESS;
}
}
return -1;
*type = -1;
return TSDB_CODE_SUCCESS;
}
void sclSetOperatorValueType(SOperatorNode *node, SScalarCtx *ctx) {
int32_t sclSetOperatorValueType(SOperatorNode *node, SScalarCtx *ctx) {
ctx->type.opResType = node->node.resType.type;
ctx->type.selfType = sclGetNodeType(node->pLeft, ctx);
ctx->type.peerType = sclGetNodeType(node->pRight, ctx);
SCL_ERR_RET(sclGetNodeType(node->pLeft, ctx, &(ctx->type.selfType)));
SCL_ERR_RET(sclGetNodeType(node->pRight, ctx, &(ctx->type.peerType)));
SCL_RET(TSDB_CODE_SUCCESS);
}
int32_t sclInitOperatorParams(SScalarParam **pParams, SOperatorNode *node, SScalarCtx *ctx, int32_t *rowNum) {
@ -557,7 +573,7 @@ int32_t sclInitOperatorParams(SScalarParam **pParams, SOperatorNode *node, SScal
SCL_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
sclSetOperatorValueType(node, ctx);
SCL_ERR_JRET(sclSetOperatorValueType(node, ctx));
SCL_ERR_JRET(sclInitParam(node->pLeft, &paramList[0], ctx, rowNum));
if (paramNum > 1) {
@ -829,7 +845,7 @@ int32_t sclExecLogic(SLogicConditionNode *node, SScalarCtx *ctx, SScalarParam *o
}
if (complete) {
colDataSetVal(output->columnData, i, (char *)&value, false);
SCL_ERR_JRET(colDataSetVal(output->columnData, i, (char *)&value, false));
if (value) {
numOfQualified++;
}
@ -877,9 +893,7 @@ int32_t sclExecOperator(SOperatorNode *node, SScalarCtx *ctx, SScalarParam *outp
SScalarParam *pLeft = &params[0];
SScalarParam *pRight = paramNum > 1 ? &params[1] : NULL;
terrno = TSDB_CODE_SUCCESS;
OperatorFn(pLeft, pRight, output, TSDB_ORDER_ASC);
code = terrno;
SCL_ERR_JRET(OperatorFn(pLeft, pRight, output, TSDB_ORDER_ASC));
_return:
@ -930,13 +944,14 @@ int32_t sclExecCaseWhen(SCaseWhenNode *node, SScalarCtx *ctx, SScalarParam *outp
}
if (pCase) {
vectorCompare(pCase, pWhen, &comp, TSDB_ORDER_ASC, OP_TYPE_EQUAL);
SCL_ERR_JRET(vectorCompare(pCase, pWhen, &comp, TSDB_ORDER_ASC, OP_TYPE_EQUAL));
for (int32_t i = 0; i < rowNum; ++i) {
bool *equal = (bool *)colDataGetData(comp.columnData, (comp.numOfRows > 1 ? i : 0));
if (*equal) {
colDataSetVal(output->columnData, i, colDataGetData(pThen->columnData, (pThen->numOfRows > 1 ? i : 0)),
colDataIsNull_s(pThen->columnData, (pThen->numOfRows > 1 ? i : 0)));
SCL_ERR_JRET(colDataSetVal(output->columnData, i,
colDataGetData(pThen->columnData, (pThen->numOfRows > 1 ? i : 0)),
colDataIsNull_s(pThen->columnData, (pThen->numOfRows > 1 ? i : 0))));
if (0 == i && 1 == pCase->numOfRows && 1 == pWhen->numOfRows && 1 == pThen->numOfRows && rowNum > 1) {
SCL_ERR_JRET(sclExtendResRows(output, output, ctx->pBlockList));
break;
@ -953,8 +968,9 @@ int32_t sclExecCaseWhen(SCaseWhenNode *node, SScalarCtx *ctx, SScalarParam *outp
for (int32_t i = 0; i < rowNum; ++i) {
bool *whenValue = (bool *)colDataGetData(pWhen->columnData, (pWhen->numOfRows > 1 ? i : 0));
if (*whenValue) {
colDataSetVal(output->columnData, i, colDataGetData(pThen->columnData, (pThen->numOfRows > 1 ? i : 0)),
colDataIsNull_s(pThen->columnData, (pThen->numOfRows > 1 ? i : 0)));
SCL_ERR_JRET(colDataSetVal(output->columnData, i,
colDataGetData(pThen->columnData, (pThen->numOfRows > 1 ? i : 0)),
colDataIsNull_s(pThen->columnData, (pThen->numOfRows > 1 ? i : 0))));
if (0 == i && 1 == pWhen->numOfRows && 1 == pThen->numOfRows && rowNum > 1) {
SCL_ERR_JRET(sclExtendResRows(output, output, ctx->pBlockList));
break;
@ -1065,42 +1081,34 @@ static uint8_t sclGetOpValueNodeTsPrecision(SNode *pLeft, SNode *pRight) {
return 0;
}
int32_t sclConvertOpValueNodeTs(SOperatorNode *node, SScalarCtx *ctx) {
int32_t code = 0;
int32_t sclConvertOpValueNodeTs(SOperatorNode *node) {
if (node->pLeft && SCL_IS_VAR_VALUE_NODE(node->pLeft)) {
if (node->pRight && (TSDB_DATA_TYPE_TIMESTAMP == ((SExprNode *)node->pRight)->resType.type)) {
SCL_ERR_JRET(
SCL_ERR_RET(
sclConvertToTsValueNode(sclGetOpValueNodeTsPrecision(node->pLeft, node->pRight), (SValueNode *)node->pLeft));
}
} else if (node->pRight && SCL_IS_NOTNULL_CONST_NODE(node->pRight)) {
if (node->pLeft && (TSDB_DATA_TYPE_TIMESTAMP == ((SExprNode *)node->pLeft)->resType.type)) {
if (SCL_IS_VAR_VALUE_NODE(node->pRight)) {
SCL_ERR_JRET(sclConvertToTsValueNode(sclGetOpValueNodeTsPrecision(node->pLeft, node->pRight),
SCL_ERR_RET(sclConvertToTsValueNode(sclGetOpValueNodeTsPrecision(node->pLeft, node->pRight),
(SValueNode *)node->pRight));
} else if (QUERY_NODE_NODE_LIST == node->pRight->type) {
SNode *pNode;
FOREACH(pNode, ((SNodeListNode *)node->pRight)->pNodeList) {
if (SCL_IS_VAR_VALUE_NODE(pNode)) {
SCL_ERR_JRET(
SCL_ERR_RET(
sclConvertToTsValueNode(sclGetOpValueNodeTsPrecision(node->pLeft, pNode), (SValueNode *)pNode));
}
}
}
}
}
return TSDB_CODE_SUCCESS;
_return:
ctx->code = code;
return DEAL_RES_ERROR;
}
int32_t sclConvertCaseWhenValueNodeTs(SCaseWhenNode *node, SScalarCtx *ctx) {
int32_t code = 0;
int32_t sclConvertCaseWhenValueNodeTs(SCaseWhenNode *node) {
if (NULL == node->pCase) {
return TSDB_CODE_SUCCESS;
}
@ -1110,7 +1118,7 @@ int32_t sclConvertCaseWhenValueNodeTs(SCaseWhenNode *node, SScalarCtx *ctx) {
FOREACH(pNode, node->pWhenThenList) {
SExprNode *pExpr = (SExprNode *)((SWhenThenNode *)pNode)->pWhen;
if (TSDB_DATA_TYPE_TIMESTAMP == pExpr->resType.type) {
SCL_ERR_JRET(sclConvertToTsValueNode(pExpr->resType.precision, (SValueNode *)node->pCase));
SCL_ERR_RET(sclConvertToTsValueNode(pExpr->resType.precision, (SValueNode *)node->pCase));
break;
}
}
@ -1118,18 +1126,13 @@ int32_t sclConvertCaseWhenValueNodeTs(SCaseWhenNode *node, SScalarCtx *ctx) {
SNode *pNode;
FOREACH(pNode, node->pWhenThenList) {
if (SCL_IS_VAR_VALUE_NODE(((SWhenThenNode *)pNode)->pWhen)) {
SCL_ERR_JRET(sclConvertToTsValueNode(((SExprNode *)node->pCase)->resType.precision,
SCL_ERR_RET(sclConvertToTsValueNode(((SExprNode *)node->pCase)->resType.precision,
(SValueNode *)((SWhenThenNode *)pNode)->pWhen));
}
}
}
return TSDB_CODE_SUCCESS;
_return:
ctx->code = code;
return DEAL_RES_ERROR;
}
EDealRes sclRewriteNonConstOperator(SNode **pNode, SScalarCtx *ctx) {
@ -1227,14 +1230,30 @@ EDealRes sclRewriteFunction(SNode **pNode, SScalarCtx *ctx) {
if (type == TSDB_DATA_TYPE_JSON) {
int32_t len = getJsonValueLen(output.columnData->pData);
res->datum.p = taosMemoryCalloc(len, 1);
if (NULL == res->datum.p) {
sclError("calloc %d failed", len);
sclFreeParam(&output);
ctx->code = TSDB_CODE_OUT_OF_MEMORY;
return DEAL_RES_ERROR;
}
memcpy(res->datum.p, output.columnData->pData, len);
} else if (IS_VAR_DATA_TYPE(type)) {
// res->datum.p = taosMemoryCalloc(res->node.resType.bytes + VARSTR_HEADER_SIZE + 1, 1);
res->datum.p = taosMemoryCalloc(varDataTLen(output.columnData->pData) + 1, 1);
if (NULL == res->datum.p) {
sclError("calloc %d failed", (int)(varDataTLen(output.columnData->pData) + 1));
sclFreeParam(&output);
ctx->code = TSDB_CODE_OUT_OF_MEMORY;
return DEAL_RES_ERROR;
}
res->node.resType.bytes = varDataTLen(output.columnData->pData);
memcpy(res->datum.p, output.columnData->pData, varDataTLen(output.columnData->pData));
} else {
nodesSetValueNodeValue(res, output.columnData->pData);
ctx->code = nodesSetValueNodeValue(res, output.columnData->pData);
if (ctx->code) {
sclFreeParam(&output);
return DEAL_RES_ERROR;
}
}
}
@ -1271,13 +1290,17 @@ EDealRes sclRewriteLogic(SNode **pNode, SScalarCtx *ctx) {
res->node.resType = node->node.resType;
res->translate = true;
strcpy(res->node.aliasName, node->node.aliasName);
(void)strcpy(res->node.aliasName, node->node.aliasName);
int32_t type = output.columnData->info.type;
if (IS_VAR_DATA_TYPE(type)) {
res->datum.p = output.columnData->pData;
output.columnData->pData = NULL;
} else {
nodesSetValueNodeValue(res, output.columnData->pData);
ctx->code = nodesSetValueNodeValue(res, output.columnData->pData);
if (ctx->code) {
sclFreeParam(&output);
return DEAL_RES_ERROR;
}
}
nodesDestroyNode(*pNode);
@ -1290,7 +1313,10 @@ EDealRes sclRewriteLogic(SNode **pNode, SScalarCtx *ctx) {
EDealRes sclRewriteOperator(SNode **pNode, SScalarCtx *ctx) {
SOperatorNode *node = (SOperatorNode *)*pNode;
SCL_ERR_RET(sclConvertOpValueNodeTs(node, ctx));
ctx->code = sclConvertOpValueNodeTs(node);
if (ctx->code) {
return DEAL_RES_ERROR;
}
if ((!SCL_IS_CONST_NODE(node->pLeft)) || (!SCL_IS_CONST_NODE(node->pRight))) {
return sclRewriteNonConstOperator(pNode, ctx);
@ -1324,7 +1350,11 @@ EDealRes sclRewriteOperator(SNode **pNode, SScalarCtx *ctx) {
res->datum.p = output.columnData->pData;
output.columnData->pData = NULL;
} else {
nodesSetValueNodeValue(res, output.columnData->pData);
ctx->code = nodesSetValueNodeValue(res, output.columnData->pData);
if (ctx->code) {
sclFreeParam(&output);
return DEAL_RES_ERROR;
}
}
}
@ -1338,7 +1368,10 @@ EDealRes sclRewriteOperator(SNode **pNode, SScalarCtx *ctx) {
EDealRes sclRewriteCaseWhen(SNode **pNode, SScalarCtx *ctx) {
SCaseWhenNode *node = (SCaseWhenNode *)*pNode;
SCL_ERR_RET(sclConvertCaseWhenValueNodeTs(node, ctx));
ctx->code = sclConvertCaseWhenValueNodeTs(node);
if (ctx->code) {
return DEAL_RES_ERROR;
}
if ((!SCL_IS_CONST_NODE(node->pCase)) || (!SCL_IS_CONST_NODE(node->pElse))) {
return DEAL_RES_CONTINUE;
@ -1378,9 +1411,19 @@ EDealRes sclRewriteCaseWhen(SNode **pNode, SScalarCtx *ctx) {
int32_t type = output.columnData->info.type;
if (IS_VAR_DATA_TYPE(type)) { // todo refactor
res->datum.p = taosMemoryCalloc(varDataTLen(output.columnData->pData) + 1, sizeof(char)); // add \0 to the end for print json value
if (NULL == res->datum.p) {
sclError("calloc %d failed", (int)(varDataTLen(output.columnData->pData) + 1));
sclFreeParam(&output);
ctx->code = TSDB_CODE_OUT_OF_MEMORY;
return DEAL_RES_ERROR;
}
memcpy(res->datum.p, output.columnData->pData, varDataTLen(output.columnData->pData));
} else {
nodesSetValueNodeValue(res, output.columnData->pData);
ctx->code = nodesSetValueNodeValue(res, output.columnData->pData);
if (ctx->code) {
sclFreeParam(&output);
return DEAL_RES_ERROR;
}
}
}
@ -1515,7 +1558,10 @@ EDealRes sclWalkTarget(SNode *pNode, SScalarCtx *ctx) {
return DEAL_RES_ERROR;
}
colDataAssign(col, res->columnData, res->numOfRows, NULL);
ctx->code = colDataAssign(col, res->columnData, res->numOfRows, NULL);
if (ctx->code) {
return DEAL_RES_ERROR;
}
block->info.rows = res->numOfRows;
sclFreeParam(res);
@ -1735,8 +1781,8 @@ int32_t scalarCalculate(SNode *pNode, SArray *pBlockList, SScalarParam *pDst) {
if (1 == res->numOfRows && pb->info.rows > 0) {
SCL_ERR_JRET(sclExtendResRows(pDst, res, pBlockList));
} else {
colInfoDataEnsureCapacity(pDst->columnData, res->numOfRows, true);
colDataAssign(pDst->columnData, res->columnData, res->numOfRows, NULL);
SCL_ERR_JRET(colInfoDataEnsureCapacity(pDst->columnData, res->numOfRows, true));
SCL_ERR_JRET(colDataAssign(pDst->columnData, res->columnData, res->numOfRows, NULL));
pDst->numOfRows = res->numOfRows;
pDst->numOfQualified = res->numOfQualified;
}