fix: create stream does not support event_window
This commit is contained in:
parent
af53a3a226
commit
c4791d58a6
|
@ -183,16 +183,18 @@ static int32_t calcConstProject(SNode* pProject, bool dual, SNode** pNew) {
|
|||
} else {
|
||||
code = scalarCalculateConstants(pProject, pNew);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code && QUERY_NODE_VALUE == nodeType(*pNew) && NULL != pAssociation) {
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
strcpy(((SExprNode*)*pNew)->aliasName, aliasName);
|
||||
int32_t size = taosArrayGetSize(pAssociation);
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
SNode** pCol = taosArrayGetP(pAssociation, i);
|
||||
nodesDestroyNode(*pCol);
|
||||
*pCol = nodesCloneNode(*pNew);
|
||||
if (NULL == *pCol) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
break;
|
||||
if (QUERY_NODE_VALUE == nodeType(*pNew) && NULL != pAssociation) {
|
||||
int32_t size = taosArrayGetSize(pAssociation);
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
SNode** pCol = taosArrayGetP(pAssociation, i);
|
||||
nodesDestroyNode(*pCol);
|
||||
*pCol = nodesCloneNode(*pNew);
|
||||
if (NULL == *pCol) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -383,7 +385,8 @@ static int32_t calcConstSetOpProjections(SCalcConstContext* pCxt, SSetOperator*
|
|||
int32_t index = 0;
|
||||
SNode* pProj = NULL;
|
||||
WHERE_EACH(pProj, pSetOp->pProjectionList) {
|
||||
if (subquery && notRefByOrderBy((SColumnNode*)pProj, pSetOp->pOrderByList) && isSetUselessCol(pSetOp, index, (SExprNode*)pProj)) {
|
||||
if (subquery && notRefByOrderBy((SColumnNode*)pProj, pSetOp->pOrderByList) &&
|
||||
isSetUselessCol(pSetOp, index, (SExprNode*)pProj)) {
|
||||
ERASE_NODE(pSetOp->pProjectionList);
|
||||
eraseSetOpChildProjection(pSetOp, index);
|
||||
continue;
|
||||
|
|
|
@ -1342,8 +1342,8 @@ static bool isCountNotNullValue(SFunctionNode* pFunc) {
|
|||
// count(1) is rewritten as count(ts) for scannning optimization
|
||||
static int32_t rewriteCountNotNullValue(STranslateContext* pCxt, SFunctionNode* pCount) {
|
||||
SValueNode* pValue = (SValueNode*)nodesListGetNode(pCount->pParameterList, 0);
|
||||
STableNode* pTable = NULL;
|
||||
int32_t code = findTable(pCxt, NULL, &pTable);
|
||||
STableNode* pTable = NULL;
|
||||
int32_t code = findTable(pCxt, NULL, &pTable);
|
||||
if (TSDB_CODE_SUCCESS == code && QUERY_NODE_REAL_TABLE == nodeType(pTable)) {
|
||||
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
|
||||
if (NULL == pCol) {
|
||||
|
@ -5923,11 +5923,15 @@ static int32_t addSubtableInfoToCreateStreamQuery(STranslateContext* pCxt, STabl
|
|||
return code;
|
||||
}
|
||||
|
||||
static bool isEventWindowQuery(SSelectStmt* pSelect) {
|
||||
return NULL != pSelect->pWindow && QUERY_NODE_EVENT_WINDOW == nodeType(pSelect->pWindow);
|
||||
}
|
||||
|
||||
static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt) {
|
||||
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
|
||||
if (TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type ||
|
||||
!pSelect->isTimeLineResult || crossTableWithoutAggOper(pSelect) || NULL != pSelect->pOrderByList ||
|
||||
crossTableWithUdaf(pSelect)) {
|
||||
crossTableWithUdaf(pSelect) || isEventWindowQuery(pSelect)) {
|
||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query");
|
||||
}
|
||||
if (NULL != pSelect->pSubtable && TSDB_DATA_TYPE_VARCHAR != ((SExprNode*)pSelect->pSubtable)->resType.type) {
|
||||
|
|
Loading…
Reference in New Issue