Merge pull request #17656 from taosdata/fix/3.0_bugfix_wxy

fix: stream supports 'partition by tbname, column'
This commit is contained in:
Hongze Cheng 2022-10-26 11:16:51 +08:00 committed by GitHub
commit b2affd12b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 10 additions and 8 deletions

View File

@ -3036,12 +3036,14 @@ static EDealRes checkStateExpr(SNode* pNode, void* pContext) {
return DEAL_RES_CONTINUE; return DEAL_RES_CONTINUE;
} }
static bool isPartitionByTbname(SNodeList* pPartitionByList) { static bool hasPartitionByTbname(SNodeList* pPartitionByList) {
if (1 != LIST_LENGTH(pPartitionByList)) { SNode* pPartKey = NULL;
return false; FOREACH(pPartKey, pPartitionByList) {
if (QUERY_NODE_FUNCTION == nodeType(pPartKey) && FUNCTION_TYPE_TBNAME == ((SFunctionNode*)pPartKey)->funcType) {
return true;
}
} }
SNode* pPartKey = nodesListGetNode(pPartitionByList, 0); return false;
return QUERY_NODE_FUNCTION == nodeType(pPartKey) && FUNCTION_TYPE_TBNAME == ((SFunctionNode*)pPartKey)->funcType;
} }
static int32_t checkStateWindowForStream(STranslateContext* pCxt, SSelectStmt* pSelect) { static int32_t checkStateWindowForStream(STranslateContext* pCxt, SSelectStmt* pSelect) {
@ -3049,7 +3051,7 @@ static int32_t checkStateWindowForStream(STranslateContext* pCxt, SSelectStmt* p
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType && if (TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType &&
!isPartitionByTbname(pSelect->pPartitionByList)) { !hasPartitionByTbname(pSelect->pPartitionByList)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query"); return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query");
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -5343,12 +5345,12 @@ static int32_t translateKillTransaction(STranslateContext* pCxt, SKillStmt* pStm
static bool crossTableWithoutAggOper(SSelectStmt* pSelect) { static bool crossTableWithoutAggOper(SSelectStmt* pSelect) {
return NULL == pSelect->pWindow && !pSelect->hasAggFuncs && !pSelect->hasIndefiniteRowsFunc && return NULL == pSelect->pWindow && !pSelect->hasAggFuncs && !pSelect->hasIndefiniteRowsFunc &&
!pSelect->hasInterpFunc && TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType && !pSelect->hasInterpFunc && TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType &&
!isPartitionByTbname(pSelect->pPartitionByList); !hasPartitionByTbname(pSelect->pPartitionByList);
} }
static bool crossTableWithUdaf(SSelectStmt* pSelect) { static bool crossTableWithUdaf(SSelectStmt* pSelect) {
return pSelect->hasUdaf && TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType && return pSelect->hasUdaf && TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType &&
!isPartitionByTbname(pSelect->pPartitionByList); !hasPartitionByTbname(pSelect->pPartitionByList);
} }
static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pStmt) { static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pStmt) {