From 807bba9215f5bb639ec7b9633f20f5f8bf1c053d Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 22 Feb 2024 18:59:10 +0800 Subject: [PATCH] fix: window join issues --- include/libs/nodes/plannodes.h | 2 + include/util/taoserror.h | 2 + source/libs/executor/inc/mergejoin.h | 7 +- source/libs/executor/src/mergejoin.c | 48 ++-- source/libs/executor/src/mergejoinoperator.c | 12 +- source/libs/nodes/src/nodesCloneFuncs.c | 1 + source/libs/nodes/src/nodesCodeFuncs.c | 7 + source/libs/nodes/src/nodesMsgFuncs.c | 9 +- source/libs/parser/src/parTranslater.c | 190 +++++++++++++++- source/libs/parser/src/parUtil.c | 2 + source/libs/planner/inc/planInt.h | 1 + source/libs/planner/src/planLogicCreater.c | 52 ++++- source/libs/planner/src/planPhysiCreater.c | 1 + source/libs/planner/src/planSpliter.c | 21 -- source/libs/planner/src/planUtil.c | 24 ++ source/util/src/terror.c | 3 + tests/script/tsim/join/left_win_join.sim | 217 +++++++++++++++++++ 17 files changed, 539 insertions(+), 60 deletions(-) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 90d58868dc..9b9903242b 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -140,6 +140,7 @@ typedef struct SJoinLogicNode { bool isSingleTableJoin; bool hasSubQuery; bool isLowLevelJoin; + bool seqWinGroup; } SJoinLogicNode; typedef struct SAggLogicNode { @@ -499,6 +500,7 @@ typedef struct SSortMergeJoinPhysiNode { SNode* pFullOnCond; SNodeList* pTargets; SQueryStat inputStat[2]; + bool seqWinGroup; } SSortMergeJoinPhysiNode; typedef struct SHashJoinPhysiNode { diff --git a/include/util/taoserror.h b/include/util/taoserror.h index da44481dd5..ec05fb7783 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -758,6 +758,8 @@ int32_t* taosGetErrno(); #define TSDB_CODE_PAR_COL_QUERY_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x266D) #define TSDB_CODE_PAR_VIEW_CONFLICT_WITH_TABLE TAOS_DEF_ERROR_CODE(0, 0x266E) #define TSDB_CODE_PAR_ORDERBY_AMBIGUOUS TAOS_DEF_ERROR_CODE(0, 0x266F) +#define TSDB_CODE_PAR_GRP_WINDOW_NOT_ALLOWED TAOS_DEF_ERROR_CODE(0, 0x2670) +#define TSDB_CODE_PAR_INVALID_WJOIN_HAVING_EXPR TAOS_DEF_ERROR_CODE(0, 0x2671) #define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF) //planner diff --git a/source/libs/executor/inc/mergejoin.h b/source/libs/executor/inc/mergejoin.h index 13660fd268..b8f1ea45a0 100755 --- a/source/libs/executor/inc/mergejoin.h +++ b/source/libs/executor/inc/mergejoin.h @@ -167,6 +167,7 @@ typedef struct SMJoinGrpRows { SSDataBlock* finBlk; \ bool lastEqGrp; \ bool lastProbeGrp; \ + bool seqWinGrp; \ int32_t blkThreshold; \ int64_t jLimit @@ -182,6 +183,7 @@ typedef struct SMJoinMergeCtx { SSDataBlock* finBlk; bool lastEqGrp; bool lastProbeGrp; + bool seqWinGrp; int32_t blkThreshold; int64_t jLimit; // KEEP IT FIRST @@ -224,6 +226,7 @@ typedef struct SMJoinWindowCtx { SSDataBlock* finBlk; bool lastEqGrp; bool lastProbeGrp; + bool seqWinGrp; int32_t blkThreshold; int64_t jLimit; // KEEP IT FIRST @@ -231,11 +234,11 @@ typedef struct SMJoinWindowCtx { int32_t asofOpType; int64_t winBeginOffset; int64_t winEndOffset; - bool winProjection; bool lowerRowsAcq; bool eqRowsAcq; bool greaterRowsAcq; + int64_t seqGrpId; int64_t winBeginTs; int64_t winEndTs; bool eqPostDone; @@ -420,7 +423,7 @@ bool mJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, bool append int32_t mJoinMergeGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pFirst, SMJoinGrpRows* pSecond); int32_t mJoinHandleMidRemains(SMJoinMergeCtx* pCtx); int32_t mJoinNonEqGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pGrp, bool probeGrp); -int32_t mJoinNonEqCart(SMJoinCommonCtx* pCtx, SMJoinGrpRows* pGrp, bool probeGrp); +int32_t mJoinNonEqCart(SMJoinCommonCtx* pCtx, SMJoinGrpRows* pGrp, bool probeGrp, bool singleProbeRow); int32_t mJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMid, SSDataBlock** ppFin); int32_t mJoinFilterAndMarkRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SMJoinTableCtx* build, int32_t startGrpIdx, int32_t startRowIdx); int32_t mJoinFilterAndMarkHashRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SMJoinTableCtx* build, int32_t startRowIdx); diff --git a/source/libs/executor/src/mergejoin.c b/source/libs/executor/src/mergejoin.c index c33f4e4a2d..e9234d96ba 100755 --- a/source/libs/executor/src/mergejoin.c +++ b/source/libs/executor/src/mergejoin.c @@ -34,15 +34,20 @@ int32_t mWinJoinDumpGrpCache(SMJoinWindowCtx* pCtx) { SMJoinWinCache* cache = &pCtx->cache; int32_t buildGrpNum = taosArrayGetSize(cache->grps); int64_t buildTotalRows = TMIN(cache->rowNum, pCtx->jLimit); + + pCtx->finBlk->info.id.groupId = pCtx->seqWinGrp ? pCtx->seqGrpId : 0; + if (buildGrpNum <= 0 || buildTotalRows <= 0) { - return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeGrp, true); + MJ_ERR_RET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeGrp, true, pCtx->seqWinGrp)); + pCtx->seqGrpId++; + return TSDB_CODE_SUCCESS; } SMJoinGrpRows* probeGrp = &pCtx->probeGrp; int32_t probeRows = GRP_REMAIN_ROWS(probeGrp); int32_t probeEndIdx = probeGrp->endIdx; - if (0 == cache->grpIdx && probeRows * buildTotalRows <= rowsLeft) { + if ((!pCtx->seqWinGrp) && 0 == cache->grpIdx && probeRows * buildTotalRows <= rowsLeft) { SMJoinGrpRows* pFirstBuild = taosArrayGet(cache->grps, 0); if (pFirstBuild->readIdx == pFirstBuild->beginIdx) { for (; cache->grpIdx < buildGrpNum; ++cache->grpIdx) { @@ -82,6 +87,11 @@ int32_t mWinJoinDumpGrpCache(SMJoinWindowCtx* pCtx) { if (cache->grpIdx >= buildGrpNum) { cache->grpIdx = 0; ++probeGrp->readIdx; + pCtx->seqGrpId++; + + if (pCtx->seqWinGrp) { + break; + } } if (rowsLeft <= 0) { @@ -489,7 +499,7 @@ static FORCE_INLINE int32_t mLeftJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) { return (pCtx->hashJoin) ? (*pCtx->hashCartFp)(pCtx) : (*pCtx->mergeCartFp)(pCtx); } - return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true); + return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true, false); } SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) { @@ -575,7 +585,7 @@ SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) { pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows; - MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true)); + MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true, false)); if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { return pCtx->finBlk; } @@ -812,7 +822,7 @@ static FORCE_INLINE int32_t mFullJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) { return (pCtx->hashJoin) ? (*pCtx->hashCartFp)(pCtx) : (*pCtx->mergeCartFp)(pCtx); } - return pCtx->lastProbeGrp ? mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true) : mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->buildNEqGrp, false); + return pCtx->lastProbeGrp ? mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true, false) : mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->buildNEqGrp, false, false); } static bool mFullJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) { @@ -1170,7 +1180,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) { pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows; - MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true)); + MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true, false)); if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { return pCtx->finBlk; } @@ -1191,7 +1201,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) { pJoin->build->blkRowIdx = pJoin->build->blk->info.rows; - MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->buildNEqGrp, false)); + MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->buildNEqGrp, false, false)); if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { return pCtx->finBlk; } @@ -1512,7 +1522,7 @@ static FORCE_INLINE int32_t mAntiJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) { return (pCtx->hashJoin) ? (*pCtx->hashCartFp)(pCtx) : (*pCtx->mergeCartFp)(pCtx); } - return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true); + return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true, false); } static int32_t mAntiJoinHashFullCart(SMJoinMergeCtx* pCtx) { @@ -1766,7 +1776,7 @@ SSDataBlock* mAntiJoinDo(struct SOperatorInfo* pOperator) { pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows; - MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true)); + MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true, false)); if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { return pCtx->finBlk; } @@ -1877,7 +1887,7 @@ int32_t mAsofLowerAddEqRowsToCache(struct SOperatorInfo* pOperator, SMJoinWindow int32_t mAsofLowerDumpGrpCache(SMJoinWindowCtx* pCtx) { if (NULL == pCtx->cache.outBlk || pCtx->cache.outBlk->info.rows <= 0) { - return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeGrp, true); + return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeGrp, true, false); } int32_t rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows; @@ -2523,7 +2533,7 @@ SSDataBlock* mAsofGreaterJoinDo(struct SOperatorInfo* pOperator) { pCtx->probeGrp.readIdx = pCtx->probeGrp.beginIdx; pCtx->probeGrp.endIdx = pJoin->probe->blk->info.rows - 1; - MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeGrp, true)); + MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeGrp, true, false)); pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows; @@ -2862,10 +2872,6 @@ int32_t mWinJoinMoveFillWinCache(SMJoinWindowCtx* pCtx) { return TSDB_CODE_SUCCESS; } -int32_t mWinJoinDumpWinCache(SMJoinWindowCtx* pCtx) { - return pCtx->winProjection ? mWinJoinDumpGrpCache(pCtx) : TSDB_CODE_SUCCESS; -} - SSDataBlock* mWinJoinDo(struct SOperatorInfo* pOperator) { SMJoinOperatorInfo* pJoin = pOperator->info; SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx; @@ -2876,8 +2882,8 @@ SSDataBlock* mWinJoinDo(struct SOperatorInfo* pOperator) { blockDataCleanup(pCtx->finBlk); if (pCtx->grpRemains) { - MJ_ERR_JRET(mWinJoinDumpWinCache(pCtx)); - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + MJ_ERR_JRET(mWinJoinDumpGrpCache(pCtx)); + if (pCtx->finBlk->info.rows >= pCtx->blkThreshold || (pCtx->finBlk->info.rows > 0 && pCtx->seqWinGrp)) { return pCtx->finBlk; } pCtx->grpRemains = false; @@ -2902,9 +2908,9 @@ SSDataBlock* mWinJoinDo(struct SOperatorInfo* pOperator) { MJ_ERR_JRET(mWinJoinMoveFillWinCache(pCtx)); } - MJ_ERR_JRET(mWinJoinDumpWinCache(pCtx)); + MJ_ERR_JRET(mWinJoinDumpGrpCache(pCtx)); - if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { + if (pCtx->finBlk->info.rows >= pCtx->blkThreshold || (pCtx->finBlk->info.rows > 0 && pCtx->seqWinGrp)) { return pCtx->finBlk; } } @@ -2947,6 +2953,8 @@ int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* p pCtx->pJoin = pJoin; pCtx->lastTs = INT64_MIN; + pCtx->seqWinGrp = pJoinNode->seqWinGroup; + pCtx->seqGrpId = 1; switch (pJoinNode->subType) { case JOIN_STYPE_ASOF: @@ -2967,7 +2975,6 @@ int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* p SValueNode* pWinBegin = (SValueNode*)pOffsetNode->pStartOffset; SValueNode* pWinEnd = (SValueNode*)pOffsetNode->pEndOffset; pCtx->jLimit = pJoinNode->pJLimit ? ((SLimitNode*)pJoinNode->pJLimit)->limit : INT64_MAX; - pCtx->winProjection = true; pCtx->winBeginOffset = pWinBegin->datum.i; pCtx->winEndOffset = pWinEnd->datum.i; pCtx->eqRowsAcq = (pCtx->winBeginOffset <= 0 && pCtx->winEndOffset >= 0); @@ -3007,6 +3014,7 @@ int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJ pCtx->pJoin = pJoin; pCtx->lastEqTs = INT64_MIN; pCtx->hashCan = pJoin->probe->keyNum > 0; + if (JOIN_STYPE_ASOF == pJoinNode->subType || JOIN_STYPE_WIN == pJoinNode->subType) { pCtx->jLimit = pJoinNode->pJLimit ? ((SLimitNode*)pJoinNode->pJLimit)->limit : 1; pJoin->subType = JOIN_STYPE_OUTER; diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 9aa305c64e..afb82021d7 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -451,7 +451,7 @@ int32_t mJoinNonEqGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool app } -int32_t mJoinNonEqCart(SMJoinCommonCtx* pCtx, SMJoinGrpRows* pGrp, bool probeGrp) { +int32_t mJoinNonEqCart(SMJoinCommonCtx* pCtx, SMJoinGrpRows* pGrp, bool probeGrp, bool singleProbeRow) { pCtx->lastEqGrp = false; pCtx->lastProbeGrp = probeGrp; @@ -461,6 +461,10 @@ int32_t mJoinNonEqCart(SMJoinCommonCtx* pCtx, SMJoinGrpRows* pGrp, bool probeGrp return TSDB_CODE_SUCCESS; } + if (probeGrp && singleProbeRow) { + rowsLeft = 1; + } + if (GRP_REMAIN_ROWS(pGrp) <= rowsLeft) { MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, pGrp, probeGrp)); pGrp->readIdx = pGrp->endIdx + 1; @@ -636,7 +640,7 @@ int32_t mJoinProcessLowerGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnI break; } - return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true); + return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true, false); } int32_t mJoinProcessGreaterGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs) { @@ -655,7 +659,7 @@ int32_t mJoinProcessGreaterGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColum break; } - return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->buildNEqGrp, false); + return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->buildNEqGrp, false, false); } @@ -844,7 +848,7 @@ static void mJoinSetBuildAndProbeTable(SMJoinOperatorInfo* pInfo, SSortMergeJoin static int32_t mJoinInitCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) { if ((JOIN_STYPE_ASOF == pJoin->subType && (ASOF_LOWER_ROW_INCLUDED(pJoinNode->asofOpType) || ASOF_GREATER_ROW_INCLUDED(pJoinNode->asofOpType))) - || (JOIN_STYPE_WIN == pJoin->subType && !WIN_ONLY_EQ_ROW_INCLUDED(((SWindowOffsetNode*)pJoinNode->pWindowOffset)->pStartOffset, ((SWindowOffsetNode*)pJoinNode->pWindowOffset)->pEndOffset))) { + || (JOIN_STYPE_WIN == pJoin->subType)) { return mJoinInitWindowCtx(pJoin, pJoinNode); } diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 654c8f8336..b49290f330 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -484,6 +484,7 @@ static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) { CLONE_NODE_FIELD(pFullOnCond); COPY_SCALAR_FIELD(isSingleTableJoin); COPY_SCALAR_FIELD(hasSubQuery); + COPY_SCALAR_FIELD(seqWinGroup); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 854101a726..fdf1cdf149 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -2153,6 +2153,7 @@ static const char* jkJoinPhysiPlanLeftInputRowNum = "LeftInputRowNum"; static const char* jkJoinPhysiPlanRightInputRowNum = "RightInputRowNum"; static const char* jkJoinPhysiPlanLeftInputRowSize = "LeftInputRowSize"; static const char* jkJoinPhysiPlanRightInputRowSize = "RightInputRowSize"; +static const char* jkJoinPhysiPlanSeqWinGroup = "seqWinGroup"; static int32_t physiMergeJoinNodeToJson(const void* pObj, SJson* pJson) { const SSortMergeJoinPhysiNode* pNode = (const SSortMergeJoinPhysiNode*)pObj; @@ -2212,6 +2213,9 @@ static int32_t physiMergeJoinNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanRightInputRowSize, pNode->inputStat[1].inputRowSize); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkJoinPhysiPlanSeqWinGroup, pNode->seqWinGroup); + } return code; } @@ -2274,6 +2278,9 @@ static int32_t jsonToPhysiMergeJoinNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { tjsonGetNumberValue(pJson, jkJoinPhysiPlanRightInputRowSize, pNode->inputStat[1].inputRowSize, code); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkJoinPhysiPlanSeqWinGroup, &pNode->seqWinGroup); + } return code; } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index db014f67d2..befbb6690f 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -2475,7 +2475,8 @@ enum { PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_NUM0, PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_SIZE0, PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_NUM1, - PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_SIZE1 + PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_SIZE1, + PHY_SORT_MERGE_JOIN_CODE_SEQ_WIN_GROUP }; static int32_t physiMergeJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { @@ -2536,6 +2537,9 @@ static int32_t physiMergeJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder) if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeI32(pEncoder, PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_SIZE1, pNode->inputStat[1].inputRowSize); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeBool(pEncoder, PHY_SORT_MERGE_JOIN_CODE_SEQ_WIN_GROUP, pNode->seqWinGroup); + } return code; } @@ -2604,6 +2608,9 @@ static int32_t msgToPhysiMergeJoinNode(STlvDecoder* pDecoder, void* pObj) { case PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_SIZE1: code = tlvDecodeI32(pTlv, &pNode->inputStat[1].inputRowSize); break; + case PHY_SORT_MERGE_JOIN_CODE_SEQ_WIN_GROUP: + code = tlvDecodeBool(pTlv, &pNode->seqWinGroup); + break; default: break; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index f026b29d04..f51f297491 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -314,6 +314,10 @@ static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMet static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery); static int32_t setRefreshMeta(STranslateContext* pCxt, SQuery* pQuery); +static bool isWindowJoinStmt(SSelectStmt* pSelect) { + return (QUERY_NODE_JOIN_TABLE == nodeType(pSelect->pFromTable)) && IS_WINDOW_JOIN(((SJoinTableNode*)pSelect->pFromTable)->subType); +} + static int32_t replacePsedudoColumnFuncWithColumn(STranslateContext* pCxt, SNode** ppNode); static bool afterGroupBy(ESqlClause clause) { return clause > SQL_CLAUSE_GROUP_BY; } @@ -2706,6 +2710,32 @@ static EDealRes rewriteExprToGroupKeyFunc(STranslateContext* pCxt, SNode** pNode return (TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR); } +static bool isWindowJoinProbeTablePrimCol(SSelectStmt* pSelect, SNode* pNode) { + if (QUERY_NODE_COLUMN != nodeType(pNode)) { + return false; + } + + SColumnNode* pCol = (SColumnNode*)pNode; + SJoinTableNode* pJoinTable = (SJoinTableNode*)pSelect->pFromTable; + SRealTableNode* pProbeTable = NULL; + switch (pJoinTable->joinType) { + case JOIN_TYPE_LEFT: + pProbeTable = (SRealTableNode*)pJoinTable->pLeft; + break; + case JOIN_TYPE_RIGHT: + pProbeTable = (SRealTableNode*)pJoinTable->pRight; + break; + default: + return false; + } + + if (pCol->colId == PRIMARYKEY_TIMESTAMP_COL_ID && 0 == strcmp(pCol->dbName, pProbeTable->table.dbName) && 0 == strcmp(pCol->tableAlias, pProbeTable->table.tableAlias)) { + return true; + } + + return false; +} + static EDealRes doCheckExprForGroupBy(SNode** pNode, void* pContext) { STranslateContext* pCxt = (STranslateContext*)pContext; SSelectStmt* pSelect = (SSelectStmt*)pCxt->pCurrStmt; @@ -2733,6 +2763,11 @@ static EDealRes doCheckExprForGroupBy(SNode** pNode, void* pContext) { return rewriteExprToGroupKeyFunc(pCxt, pNode); } } + if (isWindowJoinStmt(pSelect)) { + if (isWindowJoinProbeTablePrimCol(pSelect, *pNode)) { + return rewriteExprToGroupKeyFunc(pCxt, pNode); + } + } if (isScanPseudoColumnFunc(*pNode) || QUERY_NODE_COLUMN == nodeType(*pNode)) { if (pSelect->selectFuncNum > 1 || pSelect->hasOtherVectorFunc || !pSelect->hasSelectFunc || (isDistinctOrderBy(pCxt) && pCxt->currClause == SQL_CLAUSE_ORDER_BY)) { @@ -2753,7 +2788,7 @@ static int32_t checkExprForGroupBy(STranslateContext* pCxt, SNode** pNode) { } static int32_t checkExprListForGroupBy(STranslateContext* pCxt, SSelectStmt* pSelect, SNodeList* pList) { - if (NULL == getGroupByList(pCxt) && NULL == pSelect->pWindow) { + if (NULL == getGroupByList(pCxt) && NULL == pSelect->pWindow && (!isWindowJoinStmt(pSelect) || !pSelect->hasAggFuncs)) { return TSDB_CODE_SUCCESS; } nodesRewriteExprs(pList, doCheckExprForGroupBy, pCxt); @@ -2826,7 +2861,7 @@ static int32_t resetSelectFuncNumWithoutDup(SSelectStmt* pSelect) { } static int32_t checkAggColCoexist(STranslateContext* pCxt, SSelectStmt* pSelect) { - if (NULL != pSelect->pGroupByList || NULL != pSelect->pWindow || + if (NULL != pSelect->pGroupByList || NULL != pSelect->pWindow || isWindowJoinStmt(pSelect) || (!pSelect->hasAggFuncs && !pSelect->hasIndefiniteRowsFunc && !pSelect->hasInterpFunc)) { return TSDB_CODE_SUCCESS; } @@ -2850,7 +2885,7 @@ static int32_t checkAggColCoexist(STranslateContext* pCxt, SSelectStmt* pSelect) static int32_t checkHavingGroupBy(STranslateContext* pCxt, SSelectStmt* pSelect) { int32_t code = TSDB_CODE_SUCCESS; - if (NULL == getGroupByList(pCxt) && NULL == pSelect->pPartitionByList && NULL == pSelect->pWindow) { + if (NULL == getGroupByList(pCxt) && NULL == pSelect->pPartitionByList && NULL == pSelect->pWindow && !isWindowJoinStmt(pSelect)) { return code; } if (NULL != pSelect->pHaving) { @@ -2867,13 +2902,39 @@ static int32_t checkHavingGroupBy(STranslateContext* pCxt, SSelectStmt* pSelect) return code; } -static int32_t checkWindowFuncCoexist(STranslateContext* pCxt, SSelectStmt* pSelect) { - if (NULL == pSelect->pWindow) { - return TSDB_CODE_SUCCESS; +static EDealRes searchAggFuncNode(SNode* pNode, void* pContext) { + if (QUERY_NODE_FUNCTION == nodeType(pNode)) { + SFunctionNode* pFunc = (SFunctionNode*)pNode; + if (fmIsAggFunc(pFunc->funcId)) { + *(bool*)pContext = true; + return DEAL_RES_END; + } } + return DEAL_RES_CONTINUE; +} + + +static int32_t checkWindowGrpFuncCoexist(STranslateContext* pCxt, SSelectStmt* pSelect) { if (NULL != pSelect->pWindow && !pSelect->hasAggFuncs && !pSelect->hasStateKey) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_NO_VALID_FUNC_IN_WIN); } + if (isWindowJoinStmt(pSelect)) { + if (!pSelect->hasAggFuncs && NULL != pSelect->pHaving) { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_WJOIN_HAVING_EXPR); + } +/* + if (NULL != pSelect->pHaving) { + bool hasFunc = false; + nodesWalkExpr(pSelect->pHaving, searchAggFuncNode, &hasFunc); + if (!hasFunc) { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_WJOIN_HAVING_EXPR); + } + } +*/ + if (pSelect->hasAggFuncs) { + return checkExprListForGroupBy(pCxt, pSelect, pSelect->pProjectionList); + } + } return TSDB_CODE_SUCCESS; } @@ -3911,10 +3972,19 @@ static int32_t translateSelectList(STranslateContext* pCxt, SSelectStmt* pSelect } static int32_t translateHaving(STranslateContext* pCxt, SSelectStmt* pSelect) { - if (NULL == pSelect->pGroupByList && NULL == pSelect->pPartitionByList && NULL == pSelect->pWindow && + if (NULL == pSelect->pGroupByList && NULL == pSelect->pPartitionByList && NULL == pSelect->pWindow && !isWindowJoinStmt(pSelect) && NULL != pSelect->pHaving) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_GROUPBY_LACK_EXPRESSION); } + if (isWindowJoinStmt(pSelect)) { + if (NULL != pSelect->pHaving) { + bool hasFunc = false; + nodesWalkExpr(pSelect->pHaving, searchAggFuncNode, &hasFunc); + if (!hasFunc) { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_WJOIN_HAVING_EXPR); + } + } + } pCxt->currClause = SQL_CLAUSE_HAVING; int32_t code = translateExpr(pCxt, &pSelect->pHaving); return code; @@ -4781,21 +4851,53 @@ static int32_t createPrimaryKeyColByTable(STranslateContext* pCxt, STableNode* p return TSDB_CODE_SUCCESS; } -static int32_t createPrimaryKeyCol(STranslateContext* pCxt, SNode** pPrimaryKey) { +static int32_t tranCreatePrimaryKeyCol(STranslateContext* pCxt, const char* tableAlias, SNode** pPrimaryKey) { STableNode* pTable = NULL; - int32_t code = findTable(pCxt, NULL, &pTable); + int32_t code = findTable(pCxt, tableAlias, &pTable); if (TSDB_CODE_SUCCESS == code) { code = createPrimaryKeyColByTable(pCxt, pTable, pPrimaryKey); } return code; } +static EDealRes collectTableAlias(SNode* pNode, void* pContext) { + if (QUERY_NODE_COLUMN != nodeType(pNode)) { + return DEAL_RES_CONTINUE; + } + + SColumnNode* pCol = (SColumnNode*)pNode; + if (NULL == *(void**)pContext) { + SSHashObj* pHash = tSimpleHashInit(3, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY)); + if (NULL == pHash) { + return DEAL_RES_ERROR; + } + *(SSHashObj**)pContext = pHash; + } + + tSimpleHashPut(*(SSHashObj**)pContext, pCol->tableAlias, strlen(pCol->tableAlias), pCol->tableAlias, sizeof(pCol->tableAlias)); + + return DEAL_RES_CONTINUE; +} + static EDealRes appendTsForImplicitTsFuncImpl(SNode* pNode, void* pContext) { STranslateContext* pCxt = pContext; if (isImplicitTsFunc(pNode)) { SFunctionNode* pFunc = (SFunctionNode*)pNode; SNode* pPrimaryKey = NULL; - pCxt->errCode = createPrimaryKeyCol(pCxt, &pPrimaryKey); + SSHashObj* pTableAlias = NULL; + nodesWalkExprs(pFunc->pParameterList, collectTableAlias, &pTableAlias); + if (NULL == pTableAlias) { + pCxt->errCode = tranCreatePrimaryKeyCol(pCxt, NULL, &pPrimaryKey); + } else { + if (tSimpleHashGetSize(pTableAlias) > 1) { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TIMELINE_FUNC); + } + char* tableAlias = NULL; + int32_t iter = 0; + tableAlias = tSimpleHashIterate(pTableAlias, tableAlias, &iter); + pCxt->errCode = tranCreatePrimaryKeyCol(pCxt, tableAlias, &pPrimaryKey); + tSimpleHashCleanup(pTableAlias); + } if (TSDB_CODE_SUCCESS == pCxt->errCode) { pCxt->errCode = nodesListMakeStrictAppend(&pFunc->pParameterList, pPrimaryKey); } @@ -4899,6 +5001,69 @@ static int32_t translateSelectWithoutFrom(STranslateContext* pCxt, SSelectStmt* return translateExprList(pCxt, pSelect->pProjectionList); } +static int32_t translateWinJoin(STranslateContext* pCxt, SSelectStmt* pSelect) { + if (QUERY_NODE_JOIN_TABLE != nodeType(pSelect->pFromTable) || !pSelect->hasAggFuncs) { + return TSDB_CODE_SUCCESS; + } + + SJoinTableNode* pJoinTable = (SJoinTableNode*)pSelect->pFromTable; + if (JOIN_STYPE_WIN != pJoinTable->subType) { + return TSDB_CODE_SUCCESS; + } + + if (pSelect->pGroupByList || pSelect->pPartitionByList || pSelect->pWindow) { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_GRP_WINDOW_NOT_ALLOWED); + } + +/* + SRealTableNode* pProbeTable = NULL; + switch (pJoinTable->joinType) { + case JOIN_TYPE_LEFT: + pProbeTable = (SRealTableNode*)pJoinTable->pLeft; + break; + case JOIN_TYPE_RIGHT: + pProbeTable = (SRealTableNode*)pJoinTable->pRight; + break; + default: + return TSDB_CODE_PAR_INTERNAL_ERROR; + } + + SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); + if (NULL == pCol) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + SSchema* pColSchema = &pProbeTable->pMeta->schema[0]; + strcpy(pCol->dbName, pProbeTable->table.dbName); + strcpy(pCol->tableAlias, pProbeTable->table.tableAlias); + strcpy(pCol->tableName, pProbeTable->table.tableName); + strcpy(pCol->colName, pColSchema->name); + strcpy(pCol->node.aliasName, pColSchema->name); + strcpy(pCol->node.userAlias, pColSchema->name); + pCol->tableId = pProbeTable->pMeta->uid; + pCol->tableType = pProbeTable->pMeta->tableType; + pCol->colId = pColSchema->colId; + pCol->colType = COLUMN_TYPE_COLUMN; + pCol->hasIndex = (pColSchema != NULL && IS_IDX_ON(pColSchema)); + pCol->node.resType.type = pColSchema->type; + pCol->node.resType.bytes = pColSchema->bytes; + pCol->node.resType.precision = pProbeTable->pMeta->tableInfo.precision; + + SGroupingSetNode* groupingSet = (SGroupingSetNode*)nodesMakeNode(QUERY_NODE_GROUPING_SET); + if (NULL == groupingSet) { + nodesDestroyNode((SNode*)pCol); + return TSDB_CODE_OUT_OF_MEMORY; + } + groupingSet->groupingSetType = GP_TYPE_NORMAL; + groupingSet->pParameterList = nodesMakeList(); + nodesListAppend(groupingSet->pParameterList, (SNode*)pCol); + + pSelect->pGroupByList = nodesMakeList(); + nodesListAppend(pSelect->pGroupByList, (SNode*)groupingSet); +*/ + return TSDB_CODE_SUCCESS; +} + static int32_t translateSelectFrom(STranslateContext* pCxt, SSelectStmt* pSelect) { pCxt->pCurrStmt = (SNode*)pSelect; int32_t code = translateFrom(pCxt, &pSelect->pFromTable); @@ -4933,12 +5098,15 @@ static int32_t translateSelectFrom(STranslateContext* pCxt, SSelectStmt* pSelect if (TSDB_CODE_SUCCESS == code) { code = checkIsEmptyResult(pCxt, pSelect); } + if (TSDB_CODE_SUCCESS == code) { + code = translateWinJoin(pCxt, pSelect); + } if (TSDB_CODE_SUCCESS == code) { resetSelectFuncNumWithoutDup(pSelect); code = checkAggColCoexist(pCxt, pSelect); } if (TSDB_CODE_SUCCESS == code) { - code = checkWindowFuncCoexist(pCxt, pSelect); + code = checkWindowGrpFuncCoexist(pCxt, pSelect); } if (TSDB_CODE_SUCCESS == code) { code = checkLimit(pCxt, pSelect); diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index 7bf904a37b..35543ba1ad 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -192,6 +192,8 @@ static char* getSyntaxErrFormat(int32_t errCode) { return "Out of memory"; case TSDB_CODE_PAR_ORDERBY_AMBIGUOUS: return "ORDER BY \"%s\" is ambiguous"; + case TSDB_CODE_PAR_INVALID_WJOIN_HAVING_EXPR: + return "Not supported window join having expr"; default: return "Unknown error"; } diff --git a/source/libs/planner/inc/planInt.h b/source/libs/planner/inc/planInt.h index d4074e1373..8ea23d8b7e 100644 --- a/source/libs/planner/inc/planInt.h +++ b/source/libs/planner/inc/planInt.h @@ -55,6 +55,7 @@ bool isPartTagAgg(SAggLogicNode* pAgg); bool isPartTableWinodw(SWindowLogicNode* pWindow); bool keysHasCol(SNodeList* pKeys); bool keysHasTbname(SNodeList* pKeys); +SFunctionNode* createGroupKeyAggFunc(SColumnNode* pGroupCol); #define CLONE_LIMIT 1 #define CLONE_SLIMIT 1 << 1 diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index bc48012d01..5ed6d03106 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -550,6 +550,7 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect pJoin->pJLimit = nodesCloneNode(pJoinTable->pJLimit); pJoin->winPrimEqCond = nodesCloneNode(pJoinTable->winPrimCond); pJoin->node.pChildren = nodesMakeList(); + pJoin->seqWinGroup = (JOIN_STYPE_WIN == pJoinTable->subType) && pSelect->hasAggFuncs; if (NULL == pJoin->node.pChildren) { code = TSDB_CODE_OUT_OF_MEMORY; } @@ -713,6 +714,10 @@ static EGroupAction getDistinctGroupAction(SLogicPlanContext* pCxt, SSelectStmt* : GROUP_ACTION_NONE; } +static bool isWindowJoinStmt(SSelectStmt * pSelect) { + return (QUERY_NODE_JOIN_TABLE == nodeType(pSelect->pFromTable) && IS_WINDOW_JOIN(((SJoinTableNode*)pSelect->pFromTable)->subType)); +} + static EGroupAction getGroupAction(SLogicPlanContext* pCxt, SSelectStmt* pSelect) { return ((pCxt->pPlanCxt->streamQuery || NULL != pSelect->pLimit || NULL != pSelect->pSlimit) && !pSelect->isDistinct) ? GROUP_ACTION_KEEP : GROUP_ACTION_NONE; @@ -723,6 +728,49 @@ static EDataOrderLevel getRequireDataOrder(bool needTimeline, SSelectStmt* pSele : DATA_ORDER_LEVEL_NONE; } +static int32_t addWinJoinPrimKeyToAggFuncs(SSelectStmt* pSelect, SNodeList** pList) { + SNodeList* pTargets = (NULL == *pList) ? nodesMakeList() : *pList; + SJoinTableNode* pJoinTable = (SJoinTableNode*)pSelect->pFromTable; + SRealTableNode* pProbeTable = NULL; + switch (pJoinTable->joinType) { + case JOIN_TYPE_LEFT: + pProbeTable = (SRealTableNode*)pJoinTable->pLeft; + break; + case JOIN_TYPE_RIGHT: + pProbeTable = (SRealTableNode*)pJoinTable->pRight; + break; + default: + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } + + SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); + if (NULL == pCol) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + SSchema* pColSchema = &pProbeTable->pMeta->schema[0]; + strcpy(pCol->dbName, pProbeTable->table.dbName); + strcpy(pCol->tableAlias, pProbeTable->table.tableAlias); + strcpy(pCol->tableName, pProbeTable->table.tableName); + strcpy(pCol->colName, pColSchema->name); + strcpy(pCol->node.aliasName, pColSchema->name); + strcpy(pCol->node.userAlias, pColSchema->name); + pCol->tableId = pProbeTable->pMeta->uid; + pCol->tableType = pProbeTable->pMeta->tableType; + pCol->colId = pColSchema->colId; + pCol->colType = COLUMN_TYPE_COLUMN; + pCol->hasIndex = (pColSchema != NULL && IS_IDX_ON(pColSchema)); + pCol->node.resType.type = pColSchema->type; + pCol->node.resType.bytes = pColSchema->bytes; + pCol->node.resType.precision = pProbeTable->pMeta->tableInfo.precision; + + SNode* pFunc = (SNode*)createGroupKeyAggFunc(pCol); + + nodesListAppend(pTargets, pFunc); + + return TSDB_CODE_SUCCESS; +} + static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) { if (!pSelect->hasAggFuncs && NULL == pSelect->pGroupByList) { return TSDB_CODE_SUCCESS; @@ -733,14 +781,16 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, return TSDB_CODE_OUT_OF_MEMORY; } + bool winJoin = isWindowJoinStmt(pSelect); pAgg->hasLastRow = pSelect->hasLastRowFunc; pAgg->hasLast = pSelect->hasLastFunc; pAgg->hasTimeLineFunc = pSelect->hasTimeLineFunc; pAgg->hasGroupKeyOptimized = false; pAgg->onlyHasKeepOrderFunc = pSelect->onlyHasKeepOrderFunc; - pAgg->node.groupAction = getGroupAction(pCxt, pSelect); + pAgg->node.groupAction = winJoin ? GROUP_ACTION_NONE : getGroupAction(pCxt, pSelect); pAgg->node.requireDataOrder = getRequireDataOrder(pAgg->hasTimeLineFunc, pSelect); pAgg->node.resultDataOrder = pAgg->onlyHasKeepOrderFunc ? pAgg->node.requireDataOrder : DATA_ORDER_LEVEL_NONE; + pAgg->node.forceCreateNonBlockingOptr = winJoin ? true : false; int32_t code = TSDB_CODE_SUCCESS; diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 90cf0590ee..eb27a46aa2 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -898,6 +898,7 @@ static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChi pJoin->pWindowOffset = nodesCloneNode(pJoinLogicNode->pWindowOffset); pJoin->pJLimit = nodesCloneNode(pJoinLogicNode->pJLimit); pJoin->node.inputTsOrder = pJoinLogicNode->node.inputTsOrder; + pJoin->seqWinGroup = pJoinLogicNode->seqWinGroup; SDataBlockDescNode* pLeftDesc = NULL; SDataBlockDescNode* pRightDesc = NULL; diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index d9a7475f59..6746829f7e 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -1008,27 +1008,6 @@ static int32_t stbSplSplitAggNodeForPartTable(SSplitContext* pCxt, SStableSplitI return code; } -static SFunctionNode* createGroupKeyAggFunc(SColumnNode* pGroupCol) { - SFunctionNode* pFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION); - if (pFunc) { - strcpy(pFunc->functionName, "_group_key"); - strcpy(pFunc->node.aliasName, pGroupCol->node.aliasName); - strcpy(pFunc->node.userAlias, pGroupCol->node.userAlias); - int32_t code = nodesListMakeStrictAppend(&pFunc->pParameterList, nodesCloneNode((SNode*)pGroupCol)); - if (code == TSDB_CODE_SUCCESS) { - code = fmGetFuncInfo(pFunc, NULL, 0); - } - if (TSDB_CODE_SUCCESS != code) { - nodesDestroyNode((SNode*)pFunc); - pFunc = NULL; - } - char name[TSDB_FUNC_NAME_LEN + TSDB_NAME_DELIMITER_LEN + TSDB_POINTER_PRINT_BYTES + 1] = {0}; - int32_t len = snprintf(name, sizeof(name) - 1, "%s.%p", pFunc->functionName, pFunc); - taosCreateMD5Hash(name, len); - strncpy(pFunc->node.aliasName, name, TSDB_COL_NAME_LEN - 1); - } - return pFunc; -} /** * @brief For pipelined agg node, add a SortMergeNode to merge result from vnodes. diff --git a/source/libs/planner/src/planUtil.c b/source/libs/planner/src/planUtil.c index 74b325a298..5e6341c770 100644 --- a/source/libs/planner/src/planUtil.c +++ b/source/libs/planner/src/planUtil.c @@ -542,3 +542,27 @@ bool keysHasCol(SNodeList* pKeys) { nodesWalkExprs(pKeys, partTagsOptHasColImpl, &hasCol); return hasCol; } + +SFunctionNode* createGroupKeyAggFunc(SColumnNode* pGroupCol) { + SFunctionNode* pFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION); + if (pFunc) { + strcpy(pFunc->functionName, "_group_key"); + strcpy(pFunc->node.aliasName, pGroupCol->node.aliasName); + strcpy(pFunc->node.userAlias, pGroupCol->node.userAlias); + int32_t code = nodesListMakeStrictAppend(&pFunc->pParameterList, nodesCloneNode((SNode*)pGroupCol)); + if (code == TSDB_CODE_SUCCESS) { + code = fmGetFuncInfo(pFunc, NULL, 0); + } + if (TSDB_CODE_SUCCESS != code) { + nodesDestroyNode((SNode*)pFunc); + pFunc = NULL; + } + char name[TSDB_FUNC_NAME_LEN + TSDB_NAME_DELIMITER_LEN + TSDB_POINTER_PRINT_BYTES + 1] = {0}; + int32_t len = snprintf(name, sizeof(name) - 1, "%s.%p", pFunc->functionName, pFunc); + taosCreateMD5Hash(name, len); + strncpy(pFunc->node.aliasName, name, TSDB_COL_NAME_LEN - 1); + } + return pFunc; +} + + diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 8625c2ea60..edb902b504 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -621,6 +621,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Invalid stream quer TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_VIEW_QUERY, "Invalid view query type") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_COL_QUERY_MISMATCH, "Columns number mismatch with query result") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_VIEW_CONFLICT_WITH_TABLE, "View name is conflict with table") +TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_WJOIN_HAVING_EXPR, "Invalid window join having expr") +TAOS_DEFINE_ERROR(TSDB_CODE_PAR_GRP_WINDOW_NOT_ALLOWED, "GROUP BY/PARTITION BY/WINDOW-clause can't be used in WINDOW join") + //planner TAOS_DEFINE_ERROR(TSDB_CODE_PLAN_INTERNAL_ERROR, "Planner internal error") diff --git a/tests/script/tsim/join/left_win_join.sim b/tests/script/tsim/join/left_win_join.sim index 934a860f7e..8ddc7b681e 100644 --- a/tests/script/tsim/join/left_win_join.sim +++ b/tests/script/tsim/join/left_win_join.sim @@ -175,3 +175,220 @@ sql select a.ts, b.ts from tba1 a left window join tba2 b window_offset(1a, -1h) if $rows != 9 then return -1 endi + +sql select count(*) from sta a left window join sta b window_offset(-1s, 1s); +if $rows != 8 then + return -1 +endi +if $data00 != 3 then + return -1 +endi +if $data10 != 3 then + return -1 +endi + +sql select a.ts, count(*) from sta a left window join sta b window_offset(-1s, 1s); +if $rows != 8 then + return -1 +endi +if $data00 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data01 != 3 then + return -1 +endi +if $data20 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data21 != 4 then + return -1 +endi + +sql select a.ts, count(*) from sta a left window join sta b window_offset(-1s, 1s) where b.col1 between 2 and 4; +if $rows != 7 then + return -1 +endi +if $data00 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data01 != 2 then + return -1 +endi +if $data20 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data21 != 3 then + return -1 +endi + +sql select a.ts, count(*) from sta a left window join sta b window_offset(-1s, 1s) where b.col1 between 2 and 4 having(count(*) != 2); +if $rows != 3 then + return -1 +endi +if $data00 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data01 != 3 then + return -1 +endi +if $data10 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data11 != 3 then + return -1 +endi +if $data20 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data21 != 1 then + return -1 +endi + +sql select a.ts, count(*) from sta a left window join sta b window_offset(-1s, 1s) where b.col1 between 2 and 4 having(count(*) != 2) order by count(*); +if $rows != 3 then + return -1 +endi +if $data00 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data01 != 1 then + return -1 +endi +if $data11 != 3 then + return -1 +endi +if $data21 != 3 then + return -1 +endi + +sql select a.ts from sta a left window join sta b window_offset(-1s, 1s) where b.col1 between 2 and 4 order by count(*); +if $rows != 7 then + return -1 +endi +sql select a.ts from sta a left window join sta b window_offset(-1s, 1s) where b.col1 between 2 and 4 order by count(*), a.ts; +if $rows != 7 then + return -1 +endi +if $data00 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data20 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data30 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data40 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data50 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data60 != @23-11-17 16:29:02.000@ then + return -1 +endi + +sql select a.ts, count(*),last(b.ts) from sta a left window join sta b window_offset(-1s, 1s); +if $rows != 8 then + return -1 +endi +if $data00 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data02 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data12 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data20 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data22 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data30 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data32 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data40 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data42 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data50 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data52 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data60 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data62 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data70 != @23-11-17 16:29:05.000@ then + return -1 +endi +if $data72 != @23-11-17 16:29:05.000@ then + return -1 +endi + +sql select a.ts, count(*),last(b.ts) from sta a left window join sta b window_offset(-1s, 1s) limit 1; +if $rows != 1 then + return -1 +endi + + +sql select timetruncate(a.ts, 1m), count(*) from sta a left window join sta b window_offset(-1s, 1s); +if $rows != 8 then + return -1 +endi +if $data00 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data01 != 3 then + return -1 +endi +if $data20 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data21 != 4 then + return -1 +endi + +sql select a.ts+1s, count(*) from sta a left window join sta b window_offset(-1s, 1s); +if $rows != 8 then + return -1 +endi +if $data00 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data01 != 3 then + return -1 +endi +if $data20 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data21 != 4 then + return -1 +endi + +sql_error select a.col1, count(*) from sta a left window join sta b window_offset(-1s, 1s); +sql_error select b.ts, count(*) from sta a left window join sta b window_offset(-1s, 1s); +sql_error select a.ts, b.ts from sta a left window join sta b window_offset(-1s, 1s) having(b.ts > 0); +sql_error select a.ts, b.ts from sta a left window join sta b window_offset(-1s, 1s) having(a.col1 > 0); +sql_error select a.ts, b.ts from sta a left window join sta b window_offset(-1s, 1s) having(a.col1 > 0); +sql_error select a.ts, b.ts from sta a left window join sta b window_offset(-1s, 1s) having(a.ts > "2023-11-17 16:29:00.000"); +sql_error select a.ts from sta a left window join sta b window_offset(-1s, 1s) where b.col1 between 2 and 4 having(a.ts > 0) order by count(*); +sql_error select a.ts, count(*),last(b.ts) from sta a left window join sta b window_offset(-1s, 1s) slimit 1;