fix in issue
This commit is contained in:
parent
308f6a5682
commit
1a078d5d56
|
@ -2500,6 +2500,7 @@ typedef struct {
|
||||||
int64_t sliding;
|
int64_t sliding;
|
||||||
int64_t dstTbUid;
|
int64_t dstTbUid;
|
||||||
int32_t dstVgId; // for stream
|
int32_t dstVgId; // for stream
|
||||||
|
SEpSet epSet;
|
||||||
char* expr;
|
char* expr;
|
||||||
} STableIndexInfo;
|
} STableIndexInfo;
|
||||||
|
|
||||||
|
|
|
@ -197,6 +197,7 @@ typedef struct SAggFunctionInfo {
|
||||||
struct SScalarParam {
|
struct SScalarParam {
|
||||||
SColumnInfoData *columnData;
|
SColumnInfoData *columnData;
|
||||||
SHashObj *pHashFilter;
|
SHashObj *pHashFilter;
|
||||||
|
int32_t hashValueType;
|
||||||
void *param; // other parameter, such as meta handle from vnode, to extract table name/tag value
|
void *param; // other parameter, such as meta handle from vnode, to extract table name/tag value
|
||||||
int32_t numOfRows;
|
int32_t numOfRows;
|
||||||
};
|
};
|
||||||
|
|
|
@ -894,6 +894,15 @@ static int32_t mndGetTableSma(SMnode *pMnode, STableIndexReq *indexReq, STableIn
|
||||||
info.sliding = pSma->sliding;
|
info.sliding = pSma->sliding;
|
||||||
info.dstTbUid = pSma->dstTbUid;
|
info.dstTbUid = pSma->dstTbUid;
|
||||||
info.dstVgId = pSma->dstVgId;
|
info.dstVgId = pSma->dstVgId;
|
||||||
|
|
||||||
|
SVgObj* pVg = mndAcquireVgroup(pMnode, pSma->dstVgId);
|
||||||
|
if (pVg == NULL) {
|
||||||
|
code = -1;
|
||||||
|
sdbRelease(pSdb, pSma);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
info.epSet = mndGetVgroupEpset(pMnode, pVg);
|
||||||
|
|
||||||
info.expr = taosMemoryMalloc(pSma->exprLen + 1);
|
info.expr = taosMemoryMalloc(pSma->exprLen + 1);
|
||||||
if (info.expr == NULL) {
|
if (info.expr == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
|
|
@ -22,11 +22,18 @@ extern "C" {
|
||||||
#include "thash.h"
|
#include "thash.h"
|
||||||
#include "query.h"
|
#include "query.h"
|
||||||
|
|
||||||
|
typedef struct SOperatorValueType {
|
||||||
|
int32_t opResType;
|
||||||
|
int32_t selfType;
|
||||||
|
int32_t peerType;
|
||||||
|
} SOperatorValueType;
|
||||||
|
|
||||||
typedef struct SScalarCtx {
|
typedef struct SScalarCtx {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
SArray *pBlockList; /* element is SSDataBlock* */
|
SArray *pBlockList; /* element is SSDataBlock* */
|
||||||
SHashObj *pRes; /* element is SScalarParam */
|
SHashObj *pRes; /* element is SScalarParam */
|
||||||
void *param; // additional parameter (meta actually) for acquire value such as tbname/tags values
|
void *param; // additional parameter (meta actually) for acquire value such as tbname/tags values
|
||||||
|
SOperatorValueType type;
|
||||||
} SScalarCtx;
|
} SScalarCtx;
|
||||||
|
|
||||||
|
|
||||||
|
@ -53,7 +60,7 @@ int32_t doConvertDataType(SValueNode* pValueNode, SScalarParam* out);
|
||||||
SColumnInfoData* createColumnInfoData(SDataType* pType, int32_t numOfRows);
|
SColumnInfoData* createColumnInfoData(SDataType* pType, int32_t numOfRows);
|
||||||
int32_t sclConvertToTsValueNode(int8_t precision, SValueNode* valueNode);
|
int32_t sclConvertToTsValueNode(int8_t precision, SValueNode* valueNode);
|
||||||
|
|
||||||
#define GET_PARAM_TYPE(_c) ((_c)->columnData->info.type)
|
#define GET_PARAM_TYPE(_c) ((_c)->columnData ? (_c)->columnData->info.type : (_c)->hashValueType)
|
||||||
#define GET_PARAM_BYTES(_c) ((_c)->columnData->info.bytes)
|
#define GET_PARAM_BYTES(_c) ((_c)->columnData->info.bytes)
|
||||||
#define GET_PARAM_PRECISON(_c) ((_c)->columnData->info.precision)
|
#define GET_PARAM_PRECISON(_c) ((_c)->columnData->info.precision)
|
||||||
|
|
||||||
|
|
|
@ -208,7 +208,13 @@ int32_t sclInitParam(SNode* node, SScalarParam *param, SScalarCtx *ctx, int32_t
|
||||||
SCL_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
SCL_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
SCL_ERR_RET(scalarGenerateSetFromList((void **)¶m->pHashFilter, node, nodeList->dataType.type));
|
int32_t type = vectorGetConvertType(ctx->type.selfType, ctx->type.peerType);
|
||||||
|
if (type == 0) {
|
||||||
|
type = nodeList->dataType.type;
|
||||||
|
}
|
||||||
|
|
||||||
|
SCL_ERR_RET(scalarGenerateSetFromList((void **)¶m->pHashFilter, node, type));
|
||||||
|
param->hashValueType = type;
|
||||||
if (taosHashPut(ctx->pRes, &node, POINTER_BYTES, param, sizeof(*param))) {
|
if (taosHashPut(ctx->pRes, &node, POINTER_BYTES, param, sizeof(*param))) {
|
||||||
taosHashCleanup(param->pHashFilter);
|
taosHashCleanup(param->pHashFilter);
|
||||||
sclError("taosHashPut nodeList failed, size:%d", (int32_t)sizeof(*param));
|
sclError("taosHashPut nodeList failed, size:%d", (int32_t)sizeof(*param));
|
||||||
|
@ -334,6 +340,46 @@ _return:
|
||||||
SCL_RET(code);
|
SCL_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t sclGetNodeType(SNode *pNode, SScalarCtx *ctx) {
|
||||||
|
if (NULL == pNode) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
switch (nodeType(pNode)) {
|
||||||
|
case QUERY_NODE_VALUE: {
|
||||||
|
SValueNode *valueNode = (SValueNode *)pNode;
|
||||||
|
return valueNode->node.resType.type;
|
||||||
|
}
|
||||||
|
case QUERY_NODE_NODE_LIST: {
|
||||||
|
SNodeListNode *nodeList = (SNodeListNode *)pNode;
|
||||||
|
return nodeList->dataType.type;
|
||||||
|
}
|
||||||
|
case QUERY_NODE_COLUMN: {
|
||||||
|
SColumnNode *colNode = (SColumnNode *)pNode;
|
||||||
|
return colNode->node.resType.type;
|
||||||
|
}
|
||||||
|
case QUERY_NODE_FUNCTION:
|
||||||
|
case QUERY_NODE_OPERATOR:
|
||||||
|
case QUERY_NODE_LOGIC_CONDITION: {
|
||||||
|
SScalarParam *res = (SScalarParam *)taosHashGet(ctx->pRes, &pNode, POINTER_BYTES);
|
||||||
|
if (NULL == res) {
|
||||||
|
sclError("no result for node, type:%d, node:%p", nodeType(pNode), pNode);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return res->columnData->info.type;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void sclSetOperatorValueType(SOperatorNode *node, SScalarCtx *ctx) {
|
||||||
|
ctx->type.opResType = node->node.resType.type;
|
||||||
|
ctx->type.selfType = sclGetNodeType(node->pLeft, ctx);
|
||||||
|
ctx->type.peerType = sclGetNodeType(node->pRight, ctx);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t sclInitOperatorParams(SScalarParam **pParams, SOperatorNode *node, SScalarCtx *ctx, int32_t *rowNum) {
|
int32_t sclInitOperatorParams(SScalarParam **pParams, SOperatorNode *node, SScalarCtx *ctx, int32_t *rowNum) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t paramNum = scalarGetOperatorParamNum(node->opType);
|
int32_t paramNum = scalarGetOperatorParamNum(node->opType);
|
||||||
|
@ -348,8 +394,11 @@ int32_t sclInitOperatorParams(SScalarParam **pParams, SOperatorNode *node, SScal
|
||||||
SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
SCL_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sclSetOperatorValueType(node, ctx);
|
||||||
|
|
||||||
SCL_ERR_JRET(sclInitParam(node->pLeft, ¶mList[0], ctx, rowNum));
|
SCL_ERR_JRET(sclInitParam(node->pLeft, ¶mList[0], ctx, rowNum));
|
||||||
if (paramNum > 1) {
|
if (paramNum > 1) {
|
||||||
|
TSWAP(ctx->type.selfType, ctx->type.peerType);
|
||||||
SCL_ERR_JRET(sclInitParam(node->pRight, ¶mList[1], ctx, rowNum));
|
SCL_ERR_JRET(sclInitParam(node->pRight, ¶mList[1], ctx, rowNum));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -627,6 +627,11 @@ int32_t vectorConvertImpl(const SScalarParam* pIn, SScalarParam* pOut) {
|
||||||
SColumnInfoData* pInputCol = pIn->columnData;
|
SColumnInfoData* pInputCol = pIn->columnData;
|
||||||
SColumnInfoData* pOutputCol = pOut->columnData;
|
SColumnInfoData* pOutputCol = pOut->columnData;
|
||||||
|
|
||||||
|
if (NULL == pInputCol) {
|
||||||
|
sclError("input column is NULL, hashFilter %p", pIn->pHashFilter);
|
||||||
|
return TSDB_CODE_APP_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
int16_t inType = pInputCol->info.type;
|
int16_t inType = pInputCol->info.type;
|
||||||
int16_t outType = pOutputCol->info.type;
|
int16_t outType = pOutputCol->info.type;
|
||||||
|
|
||||||
|
@ -826,11 +831,26 @@ int32_t vectorGetConvertType(int32_t type1, int32_t type2) {
|
||||||
return gConvertTypes[type2][type1];
|
return gConvertTypes[type2][type1];
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vectorConvert(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam* pLeftOut, SScalarParam* pRightOut) {
|
int32_t vectorConvertScalarParam(SScalarParam *input, SScalarParam *output, int32_t type) {
|
||||||
if (pLeft->pHashFilter != NULL || pRight->pHashFilter != NULL) {
|
int32_t code = 0;
|
||||||
|
SDataType t = {.type = type, .bytes = tDataTypes[type].bytes};
|
||||||
|
output->numOfRows = input->numOfRows;
|
||||||
|
|
||||||
|
output->columnData = createColumnInfoData(&t, input->numOfRows);
|
||||||
|
if (output->columnData == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = vectorConvertImpl(input, output);
|
||||||
|
if (code) {
|
||||||
|
// taosMemoryFreeClear(paramOut1->data);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t vectorConvert(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam* pLeftOut, SScalarParam* pRightOut) {
|
||||||
int32_t leftType = GET_PARAM_TYPE(pLeft);
|
int32_t leftType = GET_PARAM_TYPE(pLeft);
|
||||||
int32_t rightType = GET_PARAM_TYPE(pRight);
|
int32_t rightType = GET_PARAM_TYPE(pRight);
|
||||||
if (leftType == rightType) {
|
if (leftType == rightType) {
|
||||||
|
@ -859,34 +879,11 @@ int32_t vectorConvert(SScalarParam* pLeft, SScalarParam* pRight, SScalarParam* p
|
||||||
}
|
}
|
||||||
|
|
||||||
if (type != GET_PARAM_TYPE(param1)) {
|
if (type != GET_PARAM_TYPE(param1)) {
|
||||||
SDataType t = {.type = type, .bytes = tDataTypes[type].bytes};
|
return vectorConvertScalarParam(param1, paramOut1, type);
|
||||||
paramOut1->numOfRows = param1->numOfRows;
|
|
||||||
|
|
||||||
paramOut1->columnData = createColumnInfoData(&t, param1->numOfRows);
|
|
||||||
if (paramOut1->columnData == NULL) {
|
|
||||||
return terrno;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = vectorConvertImpl(param1, paramOut1);
|
|
||||||
if (code) {
|
|
||||||
// taosMemoryFreeClear(paramOut1->data);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (type != GET_PARAM_TYPE(param2)) {
|
if (type != GET_PARAM_TYPE(param2)) {
|
||||||
SDataType t = {.type = type, .bytes = tDataTypes[type].bytes};
|
return vectorConvertScalarParam(param2, paramOut2, type);
|
||||||
paramOut2->numOfRows = param2->numOfRows;
|
|
||||||
|
|
||||||
paramOut2->columnData = createColumnInfoData(&t, param2->numOfRows);
|
|
||||||
if (paramOut2->columnData == NULL) {
|
|
||||||
return terrno;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = vectorConvertImpl(param2, paramOut2);
|
|
||||||
if (code) {
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
Loading…
Reference in New Issue