fix: add _wstart when _wstart does not appear in select projection list
This commit is contained in:
parent
577aee289b
commit
ea5ab9f8fd
|
@ -126,6 +126,7 @@ SArray* extractPartitionColInfo(SNodeList* pNodeList);
|
||||||
SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols,
|
SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNodeList, int32_t* numOfOutputCols,
|
||||||
int32_t type);
|
int32_t type);
|
||||||
|
|
||||||
|
void createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode);
|
||||||
SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs);
|
SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs);
|
||||||
|
|
||||||
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowEntryInfoOffset);
|
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowEntryInfoOffset);
|
||||||
|
|
|
@ -1012,6 +1012,100 @@ static SColumn* createColumn(int32_t blockId, int32_t slotId, int32_t colId, SDa
|
||||||
return pCol;
|
return pCol;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void createExprFromTargetNode(SExprInfo* pExp, STargetNode* pTargetNode) {
|
||||||
|
pExp->pExpr = taosMemoryCalloc(1, sizeof(tExprNode));
|
||||||
|
pExp->pExpr->_function.num = 1;
|
||||||
|
pExp->pExpr->_function.functionId = -1;
|
||||||
|
|
||||||
|
int32_t type = nodeType(pTargetNode->pExpr);
|
||||||
|
// it is a project query, or group by column
|
||||||
|
if (type == QUERY_NODE_COLUMN) {
|
||||||
|
pExp->pExpr->nodeType = QUERY_NODE_COLUMN;
|
||||||
|
SColumnNode* pColNode = (SColumnNode*)pTargetNode->pExpr;
|
||||||
|
|
||||||
|
pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
|
||||||
|
pExp->base.numOfParams = 1;
|
||||||
|
|
||||||
|
SDataType* pType = &pColNode->node.resType;
|
||||||
|
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale,
|
||||||
|
pType->precision, pColNode->colName);
|
||||||
|
pExp->base.pParam[0].pCol =
|
||||||
|
createColumn(pColNode->dataBlockId, pColNode->slotId, pColNode->colId, pType, pColNode->colType);
|
||||||
|
pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN;
|
||||||
|
} else if (type == QUERY_NODE_VALUE) {
|
||||||
|
pExp->pExpr->nodeType = QUERY_NODE_VALUE;
|
||||||
|
SValueNode* pValNode = (SValueNode*)pTargetNode->pExpr;
|
||||||
|
|
||||||
|
pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
|
||||||
|
pExp->base.numOfParams = 1;
|
||||||
|
|
||||||
|
SDataType* pType = &pValNode->node.resType;
|
||||||
|
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale,
|
||||||
|
pType->precision, pValNode->node.aliasName);
|
||||||
|
pExp->base.pParam[0].type = FUNC_PARAM_TYPE_VALUE;
|
||||||
|
nodesValueNodeToVariant(pValNode, &pExp->base.pParam[0].param);
|
||||||
|
} else if (type == QUERY_NODE_FUNCTION) {
|
||||||
|
pExp->pExpr->nodeType = QUERY_NODE_FUNCTION;
|
||||||
|
SFunctionNode* pFuncNode = (SFunctionNode*)pTargetNode->pExpr;
|
||||||
|
|
||||||
|
SDataType* pType = &pFuncNode->node.resType;
|
||||||
|
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale,
|
||||||
|
pType->precision, pFuncNode->node.aliasName);
|
||||||
|
|
||||||
|
pExp->pExpr->_function.functionId = pFuncNode->funcId;
|
||||||
|
pExp->pExpr->_function.pFunctNode = pFuncNode;
|
||||||
|
|
||||||
|
strncpy(pExp->pExpr->_function.functionName, pFuncNode->functionName,
|
||||||
|
tListLen(pExp->pExpr->_function.functionName));
|
||||||
|
#if 1
|
||||||
|
// todo refactor: add the parameter for tbname function
|
||||||
|
if (!pFuncNode->pParameterList && (strcmp(pExp->pExpr->_function.functionName, "tbname") == 0)) {
|
||||||
|
pFuncNode->pParameterList = nodesMakeList();
|
||||||
|
ASSERT(LIST_LENGTH(pFuncNode->pParameterList) == 0);
|
||||||
|
SValueNode* res = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
|
||||||
|
if (NULL == res) { // todo handle error
|
||||||
|
} else {
|
||||||
|
res->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT};
|
||||||
|
nodesListAppend(pFuncNode->pParameterList, (SNode*)res);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList);
|
||||||
|
|
||||||
|
pExp->base.pParam = taosMemoryCalloc(numOfParam, sizeof(SFunctParam));
|
||||||
|
pExp->base.numOfParams = numOfParam;
|
||||||
|
|
||||||
|
for (int32_t j = 0; j < numOfParam; ++j) {
|
||||||
|
SNode* p1 = nodesListGetNode(pFuncNode->pParameterList, j);
|
||||||
|
if (p1->type == QUERY_NODE_COLUMN) {
|
||||||
|
SColumnNode* pcn = (SColumnNode*)p1;
|
||||||
|
|
||||||
|
pExp->base.pParam[j].type = FUNC_PARAM_TYPE_COLUMN;
|
||||||
|
pExp->base.pParam[j].pCol =
|
||||||
|
createColumn(pcn->dataBlockId, pcn->slotId, pcn->colId, &pcn->node.resType, pcn->colType);
|
||||||
|
} else if (p1->type == QUERY_NODE_VALUE) {
|
||||||
|
SValueNode* pvn = (SValueNode*)p1;
|
||||||
|
pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE;
|
||||||
|
nodesValueNodeToVariant(pvn, &pExp->base.pParam[j].param);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if (type == QUERY_NODE_OPERATOR) {
|
||||||
|
pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
|
||||||
|
SOperatorNode* pNode = (SOperatorNode*)pTargetNode->pExpr;
|
||||||
|
|
||||||
|
pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
|
||||||
|
pExp->base.numOfParams = 1;
|
||||||
|
|
||||||
|
SDataType* pType = &pNode->node.resType;
|
||||||
|
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale,
|
||||||
|
pType->precision, pNode->node.aliasName);
|
||||||
|
pExp->pExpr->_optrRoot.pRootNode = pTargetNode->pExpr;
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs) {
|
SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t* numOfExprs) {
|
||||||
int32_t numOfFuncs = LIST_LENGTH(pNodeList);
|
int32_t numOfFuncs = LIST_LENGTH(pNodeList);
|
||||||
int32_t numOfGroupKeys = 0;
|
int32_t numOfGroupKeys = 0;
|
||||||
|
@ -1035,98 +1129,7 @@ SExprInfo* createExprInfo(SNodeList* pNodeList, SNodeList* pGroupKeys, int32_t*
|
||||||
}
|
}
|
||||||
|
|
||||||
SExprInfo* pExp = &pExprs[i];
|
SExprInfo* pExp = &pExprs[i];
|
||||||
|
createExprFromTargetNode(pExp, pTargetNode);
|
||||||
pExp->pExpr = taosMemoryCalloc(1, sizeof(tExprNode));
|
|
||||||
pExp->pExpr->_function.num = 1;
|
|
||||||
pExp->pExpr->_function.functionId = -1;
|
|
||||||
|
|
||||||
int32_t type = nodeType(pTargetNode->pExpr);
|
|
||||||
// it is a project query, or group by column
|
|
||||||
if (type == QUERY_NODE_COLUMN) {
|
|
||||||
pExp->pExpr->nodeType = QUERY_NODE_COLUMN;
|
|
||||||
SColumnNode* pColNode = (SColumnNode*)pTargetNode->pExpr;
|
|
||||||
|
|
||||||
pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
|
|
||||||
pExp->base.numOfParams = 1;
|
|
||||||
|
|
||||||
SDataType* pType = &pColNode->node.resType;
|
|
||||||
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale,
|
|
||||||
pType->precision, pColNode->colName);
|
|
||||||
pExp->base.pParam[0].pCol =
|
|
||||||
createColumn(pColNode->dataBlockId, pColNode->slotId, pColNode->colId, pType, pColNode->colType);
|
|
||||||
pExp->base.pParam[0].type = FUNC_PARAM_TYPE_COLUMN;
|
|
||||||
} else if (type == QUERY_NODE_VALUE) {
|
|
||||||
pExp->pExpr->nodeType = QUERY_NODE_VALUE;
|
|
||||||
SValueNode* pValNode = (SValueNode*)pTargetNode->pExpr;
|
|
||||||
|
|
||||||
pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
|
|
||||||
pExp->base.numOfParams = 1;
|
|
||||||
|
|
||||||
SDataType* pType = &pValNode->node.resType;
|
|
||||||
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale,
|
|
||||||
pType->precision, pValNode->node.aliasName);
|
|
||||||
pExp->base.pParam[0].type = FUNC_PARAM_TYPE_VALUE;
|
|
||||||
nodesValueNodeToVariant(pValNode, &pExp->base.pParam[0].param);
|
|
||||||
} else if (type == QUERY_NODE_FUNCTION) {
|
|
||||||
pExp->pExpr->nodeType = QUERY_NODE_FUNCTION;
|
|
||||||
SFunctionNode* pFuncNode = (SFunctionNode*)pTargetNode->pExpr;
|
|
||||||
|
|
||||||
SDataType* pType = &pFuncNode->node.resType;
|
|
||||||
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale,
|
|
||||||
pType->precision, pFuncNode->node.aliasName);
|
|
||||||
|
|
||||||
pExp->pExpr->_function.functionId = pFuncNode->funcId;
|
|
||||||
pExp->pExpr->_function.pFunctNode = pFuncNode;
|
|
||||||
|
|
||||||
strncpy(pExp->pExpr->_function.functionName, pFuncNode->functionName,
|
|
||||||
tListLen(pExp->pExpr->_function.functionName));
|
|
||||||
#if 1
|
|
||||||
// todo refactor: add the parameter for tbname function
|
|
||||||
if (!pFuncNode->pParameterList && (strcmp(pExp->pExpr->_function.functionName, "tbname") == 0)) {
|
|
||||||
pFuncNode->pParameterList = nodesMakeList();
|
|
||||||
ASSERT(LIST_LENGTH(pFuncNode->pParameterList) == 0);
|
|
||||||
SValueNode* res = (SValueNode*)nodesMakeNode(QUERY_NODE_VALUE);
|
|
||||||
if (NULL == res) { // todo handle error
|
|
||||||
} else {
|
|
||||||
res->node.resType = (SDataType){.bytes = sizeof(int64_t), .type = TSDB_DATA_TYPE_BIGINT};
|
|
||||||
nodesListAppend(pFuncNode->pParameterList, (SNode*)res);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
int32_t numOfParam = LIST_LENGTH(pFuncNode->pParameterList);
|
|
||||||
|
|
||||||
pExp->base.pParam = taosMemoryCalloc(numOfParam, sizeof(SFunctParam));
|
|
||||||
pExp->base.numOfParams = numOfParam;
|
|
||||||
|
|
||||||
for (int32_t j = 0; j < numOfParam; ++j) {
|
|
||||||
SNode* p1 = nodesListGetNode(pFuncNode->pParameterList, j);
|
|
||||||
if (p1->type == QUERY_NODE_COLUMN) {
|
|
||||||
SColumnNode* pcn = (SColumnNode*)p1;
|
|
||||||
|
|
||||||
pExp->base.pParam[j].type = FUNC_PARAM_TYPE_COLUMN;
|
|
||||||
pExp->base.pParam[j].pCol =
|
|
||||||
createColumn(pcn->dataBlockId, pcn->slotId, pcn->colId, &pcn->node.resType, pcn->colType);
|
|
||||||
} else if (p1->type == QUERY_NODE_VALUE) {
|
|
||||||
SValueNode* pvn = (SValueNode*)p1;
|
|
||||||
pExp->base.pParam[j].type = FUNC_PARAM_TYPE_VALUE;
|
|
||||||
nodesValueNodeToVariant(pvn, &pExp->base.pParam[j].param);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else if (type == QUERY_NODE_OPERATOR) {
|
|
||||||
pExp->pExpr->nodeType = QUERY_NODE_OPERATOR;
|
|
||||||
SOperatorNode* pNode = (SOperatorNode*)pTargetNode->pExpr;
|
|
||||||
|
|
||||||
pExp->base.pParam = taosMemoryCalloc(1, sizeof(SFunctParam));
|
|
||||||
pExp->base.numOfParams = 1;
|
|
||||||
|
|
||||||
SDataType* pType = &pNode->node.resType;
|
|
||||||
pExp->base.resSchema = createResSchema(pType->type, pType->bytes, pTargetNode->slotId, pType->scale,
|
|
||||||
pType->precision, pNode->node.aliasName);
|
|
||||||
pExp->pExpr->_optrRoot.pRootNode = pTargetNode->pExpr;
|
|
||||||
} else {
|
|
||||||
ASSERT(0);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return pExprs;
|
return pExprs;
|
||||||
|
|
|
@ -3439,6 +3439,44 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool isWstartColumnExist(SFillOperatorInfo* pInfo) {
|
||||||
|
if (pInfo->numOfNotFillExpr == 0) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
for (int32_t i = 0; i < pInfo->numOfNotFillExpr; ++i) {
|
||||||
|
SExprInfo* exprInfo = pInfo->pNotFillExprInfo + i;
|
||||||
|
if (exprInfo->pExpr->nodeType == QUERY_NODE_COLUMN &&
|
||||||
|
exprInfo->base.numOfParams == 1 &&
|
||||||
|
exprInfo->base.pParam[0].pCol->colType == COLUMN_TYPE_WINDOW_START) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t createWStartTsAsNotFillExpr(SFillOperatorInfo* pInfo, SFillPhysiNode* pPhyFillNode) {
|
||||||
|
bool wstartExist = isWstartColumnExist(pInfo);
|
||||||
|
if (wstartExist == false) {
|
||||||
|
if (pPhyFillNode->pWStartTs->type != QUERY_NODE_TARGET) {
|
||||||
|
qError("pWStartTs of fill physical node is not a target node");
|
||||||
|
return TSDB_CODE_QRY_SYS_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
|
SExprInfo* notFillExprs = taosMemoryRealloc(pInfo->pNotFillExprInfo, (pInfo->numOfNotFillExpr + 1) * sizeof(SExprInfo));
|
||||||
|
if (notFillExprs == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
createExprFromTargetNode(notFillExprs + pInfo->numOfNotFillExpr, (STargetNode*)pPhyFillNode->pWStartTs);
|
||||||
|
|
||||||
|
++pInfo->numOfNotFillExpr;
|
||||||
|
pInfo->pNotFillExprInfo = notFillExprs;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode,
|
SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode,
|
||||||
SExecTaskInfo* pTaskInfo) {
|
SExecTaskInfo* pTaskInfo) {
|
||||||
SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo));
|
SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo));
|
||||||
|
@ -3450,7 +3488,10 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc);
|
SSDataBlock* pResBlock = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc);
|
||||||
SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pInfo->numOfExpr);
|
SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pInfo->numOfExpr);
|
||||||
pInfo->pNotFillExprInfo = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &pInfo->numOfNotFillExpr);
|
pInfo->pNotFillExprInfo = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &pInfo->numOfNotFillExpr);
|
||||||
|
int32_t code = createWStartTsAsNotFillExpr(pInfo, pPhyFillNode);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
SInterval* pInterval =
|
SInterval* pInterval =
|
||||||
QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType
|
QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType
|
||||||
? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval
|
? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval
|
||||||
|
@ -3471,7 +3512,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
|
||||||
SArray* pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc,
|
SArray* pColMatchColInfo = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc,
|
||||||
&numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
|
&numOfOutputCols, COL_MATCH_FROM_SLOT_ID);
|
||||||
|
|
||||||
int32_t code = initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pInfo->pNotFillExprInfo, pInfo->numOfNotFillExpr,
|
code = initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pInfo->pNotFillExprInfo, pInfo->numOfNotFillExpr,
|
||||||
(SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, pResultInfo->capacity,
|
(SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, pResultInfo->capacity,
|
||||||
pTaskInfo->id.str, pInterval, type, order);
|
pTaskInfo->id.str, pInterval, type, order);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
|
Loading…
Reference in New Issue