fix: window join issues

This commit is contained in:
dapan1121 2024-02-22 18:59:10 +08:00
parent 98ed2458ea
commit 807bba9215
17 changed files with 539 additions and 60 deletions

View File

@ -140,6 +140,7 @@ typedef struct SJoinLogicNode {
bool isSingleTableJoin; bool isSingleTableJoin;
bool hasSubQuery; bool hasSubQuery;
bool isLowLevelJoin; bool isLowLevelJoin;
bool seqWinGroup;
} SJoinLogicNode; } SJoinLogicNode;
typedef struct SAggLogicNode { typedef struct SAggLogicNode {
@ -499,6 +500,7 @@ typedef struct SSortMergeJoinPhysiNode {
SNode* pFullOnCond; SNode* pFullOnCond;
SNodeList* pTargets; SNodeList* pTargets;
SQueryStat inputStat[2]; SQueryStat inputStat[2];
bool seqWinGroup;
} SSortMergeJoinPhysiNode; } SSortMergeJoinPhysiNode;
typedef struct SHashJoinPhysiNode { typedef struct SHashJoinPhysiNode {

View File

@ -758,6 +758,8 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_COL_QUERY_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x266D) #define TSDB_CODE_PAR_COL_QUERY_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x266D)
#define TSDB_CODE_PAR_VIEW_CONFLICT_WITH_TABLE TAOS_DEF_ERROR_CODE(0, 0x266E) #define TSDB_CODE_PAR_VIEW_CONFLICT_WITH_TABLE TAOS_DEF_ERROR_CODE(0, 0x266E)
#define TSDB_CODE_PAR_ORDERBY_AMBIGUOUS TAOS_DEF_ERROR_CODE(0, 0x266F) #define TSDB_CODE_PAR_ORDERBY_AMBIGUOUS TAOS_DEF_ERROR_CODE(0, 0x266F)
#define TSDB_CODE_PAR_GRP_WINDOW_NOT_ALLOWED TAOS_DEF_ERROR_CODE(0, 0x2670)
#define TSDB_CODE_PAR_INVALID_WJOIN_HAVING_EXPR TAOS_DEF_ERROR_CODE(0, 0x2671)
#define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF) #define TSDB_CODE_PAR_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x26FF)
//planner //planner

View File

@ -167,6 +167,7 @@ typedef struct SMJoinGrpRows {
SSDataBlock* finBlk; \ SSDataBlock* finBlk; \
bool lastEqGrp; \ bool lastEqGrp; \
bool lastProbeGrp; \ bool lastProbeGrp; \
bool seqWinGrp; \
int32_t blkThreshold; \ int32_t blkThreshold; \
int64_t jLimit int64_t jLimit
@ -182,6 +183,7 @@ typedef struct SMJoinMergeCtx {
SSDataBlock* finBlk; SSDataBlock* finBlk;
bool lastEqGrp; bool lastEqGrp;
bool lastProbeGrp; bool lastProbeGrp;
bool seqWinGrp;
int32_t blkThreshold; int32_t blkThreshold;
int64_t jLimit; int64_t jLimit;
// KEEP IT FIRST // KEEP IT FIRST
@ -224,6 +226,7 @@ typedef struct SMJoinWindowCtx {
SSDataBlock* finBlk; SSDataBlock* finBlk;
bool lastEqGrp; bool lastEqGrp;
bool lastProbeGrp; bool lastProbeGrp;
bool seqWinGrp;
int32_t blkThreshold; int32_t blkThreshold;
int64_t jLimit; int64_t jLimit;
// KEEP IT FIRST // KEEP IT FIRST
@ -231,11 +234,11 @@ typedef struct SMJoinWindowCtx {
int32_t asofOpType; int32_t asofOpType;
int64_t winBeginOffset; int64_t winBeginOffset;
int64_t winEndOffset; int64_t winEndOffset;
bool winProjection;
bool lowerRowsAcq; bool lowerRowsAcq;
bool eqRowsAcq; bool eqRowsAcq;
bool greaterRowsAcq; bool greaterRowsAcq;
int64_t seqGrpId;
int64_t winBeginTs; int64_t winBeginTs;
int64_t winEndTs; int64_t winEndTs;
bool eqPostDone; bool eqPostDone;
@ -420,7 +423,7 @@ bool mJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, bool append
int32_t mJoinMergeGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pFirst, SMJoinGrpRows* pSecond); int32_t mJoinMergeGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pFirst, SMJoinGrpRows* pSecond);
int32_t mJoinHandleMidRemains(SMJoinMergeCtx* pCtx); int32_t mJoinHandleMidRemains(SMJoinMergeCtx* pCtx);
int32_t mJoinNonEqGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pGrp, bool probeGrp); int32_t mJoinNonEqGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pGrp, bool probeGrp);
int32_t mJoinNonEqCart(SMJoinCommonCtx* pCtx, SMJoinGrpRows* pGrp, bool probeGrp); int32_t mJoinNonEqCart(SMJoinCommonCtx* pCtx, SMJoinGrpRows* pGrp, bool probeGrp, bool singleProbeRow);
int32_t mJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMid, SSDataBlock** ppFin); int32_t mJoinCopyMergeMidBlk(SMJoinMergeCtx* pCtx, SSDataBlock** ppMid, SSDataBlock** ppFin);
int32_t mJoinFilterAndMarkRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SMJoinTableCtx* build, int32_t startGrpIdx, int32_t startRowIdx); int32_t mJoinFilterAndMarkRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SMJoinTableCtx* build, int32_t startGrpIdx, int32_t startRowIdx);
int32_t mJoinFilterAndMarkHashRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SMJoinTableCtx* build, int32_t startRowIdx); int32_t mJoinFilterAndMarkHashRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SMJoinTableCtx* build, int32_t startRowIdx);

View File

@ -34,15 +34,20 @@ int32_t mWinJoinDumpGrpCache(SMJoinWindowCtx* pCtx) {
SMJoinWinCache* cache = &pCtx->cache; SMJoinWinCache* cache = &pCtx->cache;
int32_t buildGrpNum = taosArrayGetSize(cache->grps); int32_t buildGrpNum = taosArrayGetSize(cache->grps);
int64_t buildTotalRows = TMIN(cache->rowNum, pCtx->jLimit); int64_t buildTotalRows = TMIN(cache->rowNum, pCtx->jLimit);
pCtx->finBlk->info.id.groupId = pCtx->seqWinGrp ? pCtx->seqGrpId : 0;
if (buildGrpNum <= 0 || buildTotalRows <= 0) { if (buildGrpNum <= 0 || buildTotalRows <= 0) {
return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeGrp, true); MJ_ERR_RET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeGrp, true, pCtx->seqWinGrp));
pCtx->seqGrpId++;
return TSDB_CODE_SUCCESS;
} }
SMJoinGrpRows* probeGrp = &pCtx->probeGrp; SMJoinGrpRows* probeGrp = &pCtx->probeGrp;
int32_t probeRows = GRP_REMAIN_ROWS(probeGrp); int32_t probeRows = GRP_REMAIN_ROWS(probeGrp);
int32_t probeEndIdx = probeGrp->endIdx; int32_t probeEndIdx = probeGrp->endIdx;
if (0 == cache->grpIdx && probeRows * buildTotalRows <= rowsLeft) { if ((!pCtx->seqWinGrp) && 0 == cache->grpIdx && probeRows * buildTotalRows <= rowsLeft) {
SMJoinGrpRows* pFirstBuild = taosArrayGet(cache->grps, 0); SMJoinGrpRows* pFirstBuild = taosArrayGet(cache->grps, 0);
if (pFirstBuild->readIdx == pFirstBuild->beginIdx) { if (pFirstBuild->readIdx == pFirstBuild->beginIdx) {
for (; cache->grpIdx < buildGrpNum; ++cache->grpIdx) { for (; cache->grpIdx < buildGrpNum; ++cache->grpIdx) {
@ -82,6 +87,11 @@ int32_t mWinJoinDumpGrpCache(SMJoinWindowCtx* pCtx) {
if (cache->grpIdx >= buildGrpNum) { if (cache->grpIdx >= buildGrpNum) {
cache->grpIdx = 0; cache->grpIdx = 0;
++probeGrp->readIdx; ++probeGrp->readIdx;
pCtx->seqGrpId++;
if (pCtx->seqWinGrp) {
break;
}
} }
if (rowsLeft <= 0) { if (rowsLeft <= 0) {
@ -489,7 +499,7 @@ static FORCE_INLINE int32_t mLeftJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) {
return (pCtx->hashJoin) ? (*pCtx->hashCartFp)(pCtx) : (*pCtx->mergeCartFp)(pCtx); return (pCtx->hashJoin) ? (*pCtx->hashCartFp)(pCtx) : (*pCtx->mergeCartFp)(pCtx);
} }
return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true); return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true, false);
} }
SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) { SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) {
@ -575,7 +585,7 @@ SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator) {
pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows; pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows;
MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true)); MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true, false));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk; return pCtx->finBlk;
} }
@ -812,7 +822,7 @@ static FORCE_INLINE int32_t mFullJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) {
return (pCtx->hashJoin) ? (*pCtx->hashCartFp)(pCtx) : (*pCtx->mergeCartFp)(pCtx); return (pCtx->hashJoin) ? (*pCtx->hashCartFp)(pCtx) : (*pCtx->mergeCartFp)(pCtx);
} }
return pCtx->lastProbeGrp ? mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true) : mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->buildNEqGrp, false); return pCtx->lastProbeGrp ? mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true, false) : mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->buildNEqGrp, false, false);
} }
static bool mFullJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) { static bool mFullJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin) {
@ -1170,7 +1180,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) {
pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows; pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows;
MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true)); MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true, false));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk; return pCtx->finBlk;
} }
@ -1191,7 +1201,7 @@ SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator) {
pJoin->build->blkRowIdx = pJoin->build->blk->info.rows; pJoin->build->blkRowIdx = pJoin->build->blk->info.rows;
MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->buildNEqGrp, false)); MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->buildNEqGrp, false, false));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk; return pCtx->finBlk;
} }
@ -1512,7 +1522,7 @@ static FORCE_INLINE int32_t mAntiJoinHandleGrpRemains(SMJoinMergeCtx* pCtx) {
return (pCtx->hashJoin) ? (*pCtx->hashCartFp)(pCtx) : (*pCtx->mergeCartFp)(pCtx); return (pCtx->hashJoin) ? (*pCtx->hashCartFp)(pCtx) : (*pCtx->mergeCartFp)(pCtx);
} }
return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true); return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true, false);
} }
static int32_t mAntiJoinHashFullCart(SMJoinMergeCtx* pCtx) { static int32_t mAntiJoinHashFullCart(SMJoinMergeCtx* pCtx) {
@ -1766,7 +1776,7 @@ SSDataBlock* mAntiJoinDo(struct SOperatorInfo* pOperator) {
pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows; pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows;
MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true)); MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true, false));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk; return pCtx->finBlk;
} }
@ -1877,7 +1887,7 @@ int32_t mAsofLowerAddEqRowsToCache(struct SOperatorInfo* pOperator, SMJoinWindow
int32_t mAsofLowerDumpGrpCache(SMJoinWindowCtx* pCtx) { int32_t mAsofLowerDumpGrpCache(SMJoinWindowCtx* pCtx) {
if (NULL == pCtx->cache.outBlk || pCtx->cache.outBlk->info.rows <= 0) { if (NULL == pCtx->cache.outBlk || pCtx->cache.outBlk->info.rows <= 0) {
return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeGrp, true); return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeGrp, true, false);
} }
int32_t rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows; int32_t rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows;
@ -2523,7 +2533,7 @@ SSDataBlock* mAsofGreaterJoinDo(struct SOperatorInfo* pOperator) {
pCtx->probeGrp.readIdx = pCtx->probeGrp.beginIdx; pCtx->probeGrp.readIdx = pCtx->probeGrp.beginIdx;
pCtx->probeGrp.endIdx = pJoin->probe->blk->info.rows - 1; pCtx->probeGrp.endIdx = pJoin->probe->blk->info.rows - 1;
MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeGrp, true)); MJ_ERR_JRET(mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeGrp, true, false));
pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows; pJoin->probe->blkRowIdx = pJoin->probe->blk->info.rows;
@ -2862,10 +2872,6 @@ int32_t mWinJoinMoveFillWinCache(SMJoinWindowCtx* pCtx) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t mWinJoinDumpWinCache(SMJoinWindowCtx* pCtx) {
return pCtx->winProjection ? mWinJoinDumpGrpCache(pCtx) : TSDB_CODE_SUCCESS;
}
SSDataBlock* mWinJoinDo(struct SOperatorInfo* pOperator) { SSDataBlock* mWinJoinDo(struct SOperatorInfo* pOperator) {
SMJoinOperatorInfo* pJoin = pOperator->info; SMJoinOperatorInfo* pJoin = pOperator->info;
SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx; SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx;
@ -2876,8 +2882,8 @@ SSDataBlock* mWinJoinDo(struct SOperatorInfo* pOperator) {
blockDataCleanup(pCtx->finBlk); blockDataCleanup(pCtx->finBlk);
if (pCtx->grpRemains) { if (pCtx->grpRemains) {
MJ_ERR_JRET(mWinJoinDumpWinCache(pCtx)); MJ_ERR_JRET(mWinJoinDumpGrpCache(pCtx));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (pCtx->finBlk->info.rows >= pCtx->blkThreshold || (pCtx->finBlk->info.rows > 0 && pCtx->seqWinGrp)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
pCtx->grpRemains = false; pCtx->grpRemains = false;
@ -2902,9 +2908,9 @@ SSDataBlock* mWinJoinDo(struct SOperatorInfo* pOperator) {
MJ_ERR_JRET(mWinJoinMoveFillWinCache(pCtx)); MJ_ERR_JRET(mWinJoinMoveFillWinCache(pCtx));
} }
MJ_ERR_JRET(mWinJoinDumpWinCache(pCtx)); MJ_ERR_JRET(mWinJoinDumpGrpCache(pCtx));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { if (pCtx->finBlk->info.rows >= pCtx->blkThreshold || (pCtx->finBlk->info.rows > 0 && pCtx->seqWinGrp)) {
return pCtx->finBlk; return pCtx->finBlk;
} }
} }
@ -2947,6 +2953,8 @@ int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* p
pCtx->pJoin = pJoin; pCtx->pJoin = pJoin;
pCtx->lastTs = INT64_MIN; pCtx->lastTs = INT64_MIN;
pCtx->seqWinGrp = pJoinNode->seqWinGroup;
pCtx->seqGrpId = 1;
switch (pJoinNode->subType) { switch (pJoinNode->subType) {
case JOIN_STYPE_ASOF: case JOIN_STYPE_ASOF:
@ -2967,7 +2975,6 @@ int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* p
SValueNode* pWinBegin = (SValueNode*)pOffsetNode->pStartOffset; SValueNode* pWinBegin = (SValueNode*)pOffsetNode->pStartOffset;
SValueNode* pWinEnd = (SValueNode*)pOffsetNode->pEndOffset; SValueNode* pWinEnd = (SValueNode*)pOffsetNode->pEndOffset;
pCtx->jLimit = pJoinNode->pJLimit ? ((SLimitNode*)pJoinNode->pJLimit)->limit : INT64_MAX; pCtx->jLimit = pJoinNode->pJLimit ? ((SLimitNode*)pJoinNode->pJLimit)->limit : INT64_MAX;
pCtx->winProjection = true;
pCtx->winBeginOffset = pWinBegin->datum.i; pCtx->winBeginOffset = pWinBegin->datum.i;
pCtx->winEndOffset = pWinEnd->datum.i; pCtx->winEndOffset = pWinEnd->datum.i;
pCtx->eqRowsAcq = (pCtx->winBeginOffset <= 0 && pCtx->winEndOffset >= 0); pCtx->eqRowsAcq = (pCtx->winBeginOffset <= 0 && pCtx->winEndOffset >= 0);
@ -3007,6 +3014,7 @@ int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJ
pCtx->pJoin = pJoin; pCtx->pJoin = pJoin;
pCtx->lastEqTs = INT64_MIN; pCtx->lastEqTs = INT64_MIN;
pCtx->hashCan = pJoin->probe->keyNum > 0; pCtx->hashCan = pJoin->probe->keyNum > 0;
if (JOIN_STYPE_ASOF == pJoinNode->subType || JOIN_STYPE_WIN == pJoinNode->subType) { if (JOIN_STYPE_ASOF == pJoinNode->subType || JOIN_STYPE_WIN == pJoinNode->subType) {
pCtx->jLimit = pJoinNode->pJLimit ? ((SLimitNode*)pJoinNode->pJLimit)->limit : 1; pCtx->jLimit = pJoinNode->pJLimit ? ((SLimitNode*)pJoinNode->pJLimit)->limit : 1;
pJoin->subType = JOIN_STYPE_OUTER; pJoin->subType = JOIN_STYPE_OUTER;

View File

@ -451,7 +451,7 @@ int32_t mJoinNonEqGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool app
} }
int32_t mJoinNonEqCart(SMJoinCommonCtx* pCtx, SMJoinGrpRows* pGrp, bool probeGrp) { int32_t mJoinNonEqCart(SMJoinCommonCtx* pCtx, SMJoinGrpRows* pGrp, bool probeGrp, bool singleProbeRow) {
pCtx->lastEqGrp = false; pCtx->lastEqGrp = false;
pCtx->lastProbeGrp = probeGrp; pCtx->lastProbeGrp = probeGrp;
@ -461,6 +461,10 @@ int32_t mJoinNonEqCart(SMJoinCommonCtx* pCtx, SMJoinGrpRows* pGrp, bool probeGrp
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (probeGrp && singleProbeRow) {
rowsLeft = 1;
}
if (GRP_REMAIN_ROWS(pGrp) <= rowsLeft) { if (GRP_REMAIN_ROWS(pGrp) <= rowsLeft) {
MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, pGrp, probeGrp)); MJ_ERR_RET(mJoinNonEqGrpCart(pCtx->pJoin, pCtx->finBlk, true, pGrp, probeGrp));
pGrp->readIdx = pGrp->endIdx + 1; pGrp->readIdx = pGrp->endIdx + 1;
@ -636,7 +640,7 @@ int32_t mJoinProcessLowerGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnI
break; break;
} }
return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true); return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeNEqGrp, true, false);
} }
int32_t mJoinProcessGreaterGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs) { int32_t mJoinProcessGreaterGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs) {
@ -655,7 +659,7 @@ int32_t mJoinProcessGreaterGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColum
break; break;
} }
return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->buildNEqGrp, false); return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->buildNEqGrp, false, false);
} }
@ -844,7 +848,7 @@ static void mJoinSetBuildAndProbeTable(SMJoinOperatorInfo* pInfo, SSortMergeJoin
static int32_t mJoinInitCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) { static int32_t mJoinInitCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) {
if ((JOIN_STYPE_ASOF == pJoin->subType && (ASOF_LOWER_ROW_INCLUDED(pJoinNode->asofOpType) || ASOF_GREATER_ROW_INCLUDED(pJoinNode->asofOpType))) if ((JOIN_STYPE_ASOF == pJoin->subType && (ASOF_LOWER_ROW_INCLUDED(pJoinNode->asofOpType) || ASOF_GREATER_ROW_INCLUDED(pJoinNode->asofOpType)))
|| (JOIN_STYPE_WIN == pJoin->subType && !WIN_ONLY_EQ_ROW_INCLUDED(((SWindowOffsetNode*)pJoinNode->pWindowOffset)->pStartOffset, ((SWindowOffsetNode*)pJoinNode->pWindowOffset)->pEndOffset))) { || (JOIN_STYPE_WIN == pJoin->subType)) {
return mJoinInitWindowCtx(pJoin, pJoinNode); return mJoinInitWindowCtx(pJoin, pJoinNode);
} }

View File

@ -484,6 +484,7 @@ static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) {
CLONE_NODE_FIELD(pFullOnCond); CLONE_NODE_FIELD(pFullOnCond);
COPY_SCALAR_FIELD(isSingleTableJoin); COPY_SCALAR_FIELD(isSingleTableJoin);
COPY_SCALAR_FIELD(hasSubQuery); COPY_SCALAR_FIELD(hasSubQuery);
COPY_SCALAR_FIELD(seqWinGroup);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -2153,6 +2153,7 @@ static const char* jkJoinPhysiPlanLeftInputRowNum = "LeftInputRowNum";
static const char* jkJoinPhysiPlanRightInputRowNum = "RightInputRowNum"; static const char* jkJoinPhysiPlanRightInputRowNum = "RightInputRowNum";
static const char* jkJoinPhysiPlanLeftInputRowSize = "LeftInputRowSize"; static const char* jkJoinPhysiPlanLeftInputRowSize = "LeftInputRowSize";
static const char* jkJoinPhysiPlanRightInputRowSize = "RightInputRowSize"; static const char* jkJoinPhysiPlanRightInputRowSize = "RightInputRowSize";
static const char* jkJoinPhysiPlanSeqWinGroup = "seqWinGroup";
static int32_t physiMergeJoinNodeToJson(const void* pObj, SJson* pJson) { static int32_t physiMergeJoinNodeToJson(const void* pObj, SJson* pJson) {
const SSortMergeJoinPhysiNode* pNode = (const SSortMergeJoinPhysiNode*)pObj; const SSortMergeJoinPhysiNode* pNode = (const SSortMergeJoinPhysiNode*)pObj;
@ -2212,6 +2213,9 @@ static int32_t physiMergeJoinNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanRightInputRowSize, pNode->inputStat[1].inputRowSize); code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanRightInputRowSize, pNode->inputStat[1].inputRowSize);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkJoinPhysiPlanSeqWinGroup, pNode->seqWinGroup);
}
return code; return code;
} }
@ -2274,6 +2278,9 @@ static int32_t jsonToPhysiMergeJoinNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
tjsonGetNumberValue(pJson, jkJoinPhysiPlanRightInputRowSize, pNode->inputStat[1].inputRowSize, code); tjsonGetNumberValue(pJson, jkJoinPhysiPlanRightInputRowSize, pNode->inputStat[1].inputRowSize, code);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkJoinPhysiPlanSeqWinGroup, &pNode->seqWinGroup);
}
return code; return code;
} }

View File

@ -2475,7 +2475,8 @@ enum {
PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_NUM0, PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_NUM0,
PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_SIZE0, PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_SIZE0,
PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_NUM1, PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_NUM1,
PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_SIZE1 PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_SIZE1,
PHY_SORT_MERGE_JOIN_CODE_SEQ_WIN_GROUP
}; };
static int32_t physiMergeJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { static int32_t physiMergeJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
@ -2536,6 +2537,9 @@ static int32_t physiMergeJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder)
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI32(pEncoder, PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_SIZE1, pNode->inputStat[1].inputRowSize); code = tlvEncodeI32(pEncoder, PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_SIZE1, pNode->inputStat[1].inputRowSize);
} }
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, PHY_SORT_MERGE_JOIN_CODE_SEQ_WIN_GROUP, pNode->seqWinGroup);
}
return code; return code;
} }
@ -2604,6 +2608,9 @@ static int32_t msgToPhysiMergeJoinNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_SIZE1: case PHY_SORT_MERGE_JOIN_CODE_INPUT_ROW_SIZE1:
code = tlvDecodeI32(pTlv, &pNode->inputStat[1].inputRowSize); code = tlvDecodeI32(pTlv, &pNode->inputStat[1].inputRowSize);
break; break;
case PHY_SORT_MERGE_JOIN_CODE_SEQ_WIN_GROUP:
code = tlvDecodeBool(pTlv, &pNode->seqWinGroup);
break;
default: default:
break; break;
} }

View File

@ -314,6 +314,10 @@ static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMet
static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery); static int32_t setQuery(STranslateContext* pCxt, SQuery* pQuery);
static int32_t setRefreshMeta(STranslateContext* pCxt, SQuery* pQuery); static int32_t setRefreshMeta(STranslateContext* pCxt, SQuery* pQuery);
static bool isWindowJoinStmt(SSelectStmt* pSelect) {
return (QUERY_NODE_JOIN_TABLE == nodeType(pSelect->pFromTable)) && IS_WINDOW_JOIN(((SJoinTableNode*)pSelect->pFromTable)->subType);
}
static int32_t replacePsedudoColumnFuncWithColumn(STranslateContext* pCxt, SNode** ppNode); static int32_t replacePsedudoColumnFuncWithColumn(STranslateContext* pCxt, SNode** ppNode);
static bool afterGroupBy(ESqlClause clause) { return clause > SQL_CLAUSE_GROUP_BY; } static bool afterGroupBy(ESqlClause clause) { return clause > SQL_CLAUSE_GROUP_BY; }
@ -2706,6 +2710,32 @@ static EDealRes rewriteExprToGroupKeyFunc(STranslateContext* pCxt, SNode** pNode
return (TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR); return (TSDB_CODE_SUCCESS == pCxt->errCode ? DEAL_RES_IGNORE_CHILD : DEAL_RES_ERROR);
} }
static bool isWindowJoinProbeTablePrimCol(SSelectStmt* pSelect, SNode* pNode) {
if (QUERY_NODE_COLUMN != nodeType(pNode)) {
return false;
}
SColumnNode* pCol = (SColumnNode*)pNode;
SJoinTableNode* pJoinTable = (SJoinTableNode*)pSelect->pFromTable;
SRealTableNode* pProbeTable = NULL;
switch (pJoinTable->joinType) {
case JOIN_TYPE_LEFT:
pProbeTable = (SRealTableNode*)pJoinTable->pLeft;
break;
case JOIN_TYPE_RIGHT:
pProbeTable = (SRealTableNode*)pJoinTable->pRight;
break;
default:
return false;
}
if (pCol->colId == PRIMARYKEY_TIMESTAMP_COL_ID && 0 == strcmp(pCol->dbName, pProbeTable->table.dbName) && 0 == strcmp(pCol->tableAlias, pProbeTable->table.tableAlias)) {
return true;
}
return false;
}
static EDealRes doCheckExprForGroupBy(SNode** pNode, void* pContext) { static EDealRes doCheckExprForGroupBy(SNode** pNode, void* pContext) {
STranslateContext* pCxt = (STranslateContext*)pContext; STranslateContext* pCxt = (STranslateContext*)pContext;
SSelectStmt* pSelect = (SSelectStmt*)pCxt->pCurrStmt; SSelectStmt* pSelect = (SSelectStmt*)pCxt->pCurrStmt;
@ -2733,6 +2763,11 @@ static EDealRes doCheckExprForGroupBy(SNode** pNode, void* pContext) {
return rewriteExprToGroupKeyFunc(pCxt, pNode); return rewriteExprToGroupKeyFunc(pCxt, pNode);
} }
} }
if (isWindowJoinStmt(pSelect)) {
if (isWindowJoinProbeTablePrimCol(pSelect, *pNode)) {
return rewriteExprToGroupKeyFunc(pCxt, pNode);
}
}
if (isScanPseudoColumnFunc(*pNode) || QUERY_NODE_COLUMN == nodeType(*pNode)) { if (isScanPseudoColumnFunc(*pNode) || QUERY_NODE_COLUMN == nodeType(*pNode)) {
if (pSelect->selectFuncNum > 1 || pSelect->hasOtherVectorFunc || !pSelect->hasSelectFunc || if (pSelect->selectFuncNum > 1 || pSelect->hasOtherVectorFunc || !pSelect->hasSelectFunc ||
(isDistinctOrderBy(pCxt) && pCxt->currClause == SQL_CLAUSE_ORDER_BY)) { (isDistinctOrderBy(pCxt) && pCxt->currClause == SQL_CLAUSE_ORDER_BY)) {
@ -2753,7 +2788,7 @@ static int32_t checkExprForGroupBy(STranslateContext* pCxt, SNode** pNode) {
} }
static int32_t checkExprListForGroupBy(STranslateContext* pCxt, SSelectStmt* pSelect, SNodeList* pList) { static int32_t checkExprListForGroupBy(STranslateContext* pCxt, SSelectStmt* pSelect, SNodeList* pList) {
if (NULL == getGroupByList(pCxt) && NULL == pSelect->pWindow) { if (NULL == getGroupByList(pCxt) && NULL == pSelect->pWindow && (!isWindowJoinStmt(pSelect) || !pSelect->hasAggFuncs)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
nodesRewriteExprs(pList, doCheckExprForGroupBy, pCxt); nodesRewriteExprs(pList, doCheckExprForGroupBy, pCxt);
@ -2826,7 +2861,7 @@ static int32_t resetSelectFuncNumWithoutDup(SSelectStmt* pSelect) {
} }
static int32_t checkAggColCoexist(STranslateContext* pCxt, SSelectStmt* pSelect) { static int32_t checkAggColCoexist(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (NULL != pSelect->pGroupByList || NULL != pSelect->pWindow || if (NULL != pSelect->pGroupByList || NULL != pSelect->pWindow || isWindowJoinStmt(pSelect) ||
(!pSelect->hasAggFuncs && !pSelect->hasIndefiniteRowsFunc && !pSelect->hasInterpFunc)) { (!pSelect->hasAggFuncs && !pSelect->hasIndefiniteRowsFunc && !pSelect->hasInterpFunc)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -2850,7 +2885,7 @@ static int32_t checkAggColCoexist(STranslateContext* pCxt, SSelectStmt* pSelect)
static int32_t checkHavingGroupBy(STranslateContext* pCxt, SSelectStmt* pSelect) { static int32_t checkHavingGroupBy(STranslateContext* pCxt, SSelectStmt* pSelect) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (NULL == getGroupByList(pCxt) && NULL == pSelect->pPartitionByList && NULL == pSelect->pWindow) { if (NULL == getGroupByList(pCxt) && NULL == pSelect->pPartitionByList && NULL == pSelect->pWindow && !isWindowJoinStmt(pSelect)) {
return code; return code;
} }
if (NULL != pSelect->pHaving) { if (NULL != pSelect->pHaving) {
@ -2867,13 +2902,39 @@ static int32_t checkHavingGroupBy(STranslateContext* pCxt, SSelectStmt* pSelect)
return code; return code;
} }
static int32_t checkWindowFuncCoexist(STranslateContext* pCxt, SSelectStmt* pSelect) { static EDealRes searchAggFuncNode(SNode* pNode, void* pContext) {
if (NULL == pSelect->pWindow) { if (QUERY_NODE_FUNCTION == nodeType(pNode)) {
return TSDB_CODE_SUCCESS; SFunctionNode* pFunc = (SFunctionNode*)pNode;
if (fmIsAggFunc(pFunc->funcId)) {
*(bool*)pContext = true;
return DEAL_RES_END;
}
} }
return DEAL_RES_CONTINUE;
}
static int32_t checkWindowGrpFuncCoexist(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (NULL != pSelect->pWindow && !pSelect->hasAggFuncs && !pSelect->hasStateKey) { if (NULL != pSelect->pWindow && !pSelect->hasAggFuncs && !pSelect->hasStateKey) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_NO_VALID_FUNC_IN_WIN); return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_NO_VALID_FUNC_IN_WIN);
} }
if (isWindowJoinStmt(pSelect)) {
if (!pSelect->hasAggFuncs && NULL != pSelect->pHaving) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_WJOIN_HAVING_EXPR);
}
/*
if (NULL != pSelect->pHaving) {
bool hasFunc = false;
nodesWalkExpr(pSelect->pHaving, searchAggFuncNode, &hasFunc);
if (!hasFunc) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_WJOIN_HAVING_EXPR);
}
}
*/
if (pSelect->hasAggFuncs) {
return checkExprListForGroupBy(pCxt, pSelect, pSelect->pProjectionList);
}
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -3911,10 +3972,19 @@ static int32_t translateSelectList(STranslateContext* pCxt, SSelectStmt* pSelect
} }
static int32_t translateHaving(STranslateContext* pCxt, SSelectStmt* pSelect) { static int32_t translateHaving(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (NULL == pSelect->pGroupByList && NULL == pSelect->pPartitionByList && NULL == pSelect->pWindow && if (NULL == pSelect->pGroupByList && NULL == pSelect->pPartitionByList && NULL == pSelect->pWindow && !isWindowJoinStmt(pSelect) &&
NULL != pSelect->pHaving) { NULL != pSelect->pHaving) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_GROUPBY_LACK_EXPRESSION); return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_GROUPBY_LACK_EXPRESSION);
} }
if (isWindowJoinStmt(pSelect)) {
if (NULL != pSelect->pHaving) {
bool hasFunc = false;
nodesWalkExpr(pSelect->pHaving, searchAggFuncNode, &hasFunc);
if (!hasFunc) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_WJOIN_HAVING_EXPR);
}
}
}
pCxt->currClause = SQL_CLAUSE_HAVING; pCxt->currClause = SQL_CLAUSE_HAVING;
int32_t code = translateExpr(pCxt, &pSelect->pHaving); int32_t code = translateExpr(pCxt, &pSelect->pHaving);
return code; return code;
@ -4781,21 +4851,53 @@ static int32_t createPrimaryKeyColByTable(STranslateContext* pCxt, STableNode* p
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t createPrimaryKeyCol(STranslateContext* pCxt, SNode** pPrimaryKey) { static int32_t tranCreatePrimaryKeyCol(STranslateContext* pCxt, const char* tableAlias, SNode** pPrimaryKey) {
STableNode* pTable = NULL; STableNode* pTable = NULL;
int32_t code = findTable(pCxt, NULL, &pTable); int32_t code = findTable(pCxt, tableAlias, &pTable);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = createPrimaryKeyColByTable(pCxt, pTable, pPrimaryKey); code = createPrimaryKeyColByTable(pCxt, pTable, pPrimaryKey);
} }
return code; return code;
} }
static EDealRes collectTableAlias(SNode* pNode, void* pContext) {
if (QUERY_NODE_COLUMN != nodeType(pNode)) {
return DEAL_RES_CONTINUE;
}
SColumnNode* pCol = (SColumnNode*)pNode;
if (NULL == *(void**)pContext) {
SSHashObj* pHash = tSimpleHashInit(3, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY));
if (NULL == pHash) {
return DEAL_RES_ERROR;
}
*(SSHashObj**)pContext = pHash;
}
tSimpleHashPut(*(SSHashObj**)pContext, pCol->tableAlias, strlen(pCol->tableAlias), pCol->tableAlias, sizeof(pCol->tableAlias));
return DEAL_RES_CONTINUE;
}
static EDealRes appendTsForImplicitTsFuncImpl(SNode* pNode, void* pContext) { static EDealRes appendTsForImplicitTsFuncImpl(SNode* pNode, void* pContext) {
STranslateContext* pCxt = pContext; STranslateContext* pCxt = pContext;
if (isImplicitTsFunc(pNode)) { if (isImplicitTsFunc(pNode)) {
SFunctionNode* pFunc = (SFunctionNode*)pNode; SFunctionNode* pFunc = (SFunctionNode*)pNode;
SNode* pPrimaryKey = NULL; SNode* pPrimaryKey = NULL;
pCxt->errCode = createPrimaryKeyCol(pCxt, &pPrimaryKey); SSHashObj* pTableAlias = NULL;
nodesWalkExprs(pFunc->pParameterList, collectTableAlias, &pTableAlias);
if (NULL == pTableAlias) {
pCxt->errCode = tranCreatePrimaryKeyCol(pCxt, NULL, &pPrimaryKey);
} else {
if (tSimpleHashGetSize(pTableAlias) > 1) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TIMELINE_FUNC);
}
char* tableAlias = NULL;
int32_t iter = 0;
tableAlias = tSimpleHashIterate(pTableAlias, tableAlias, &iter);
pCxt->errCode = tranCreatePrimaryKeyCol(pCxt, tableAlias, &pPrimaryKey);
tSimpleHashCleanup(pTableAlias);
}
if (TSDB_CODE_SUCCESS == pCxt->errCode) { if (TSDB_CODE_SUCCESS == pCxt->errCode) {
pCxt->errCode = nodesListMakeStrictAppend(&pFunc->pParameterList, pPrimaryKey); pCxt->errCode = nodesListMakeStrictAppend(&pFunc->pParameterList, pPrimaryKey);
} }
@ -4899,6 +5001,69 @@ static int32_t translateSelectWithoutFrom(STranslateContext* pCxt, SSelectStmt*
return translateExprList(pCxt, pSelect->pProjectionList); return translateExprList(pCxt, pSelect->pProjectionList);
} }
static int32_t translateWinJoin(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (QUERY_NODE_JOIN_TABLE != nodeType(pSelect->pFromTable) || !pSelect->hasAggFuncs) {
return TSDB_CODE_SUCCESS;
}
SJoinTableNode* pJoinTable = (SJoinTableNode*)pSelect->pFromTable;
if (JOIN_STYPE_WIN != pJoinTable->subType) {
return TSDB_CODE_SUCCESS;
}
if (pSelect->pGroupByList || pSelect->pPartitionByList || pSelect->pWindow) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_GRP_WINDOW_NOT_ALLOWED);
}
/*
SRealTableNode* pProbeTable = NULL;
switch (pJoinTable->joinType) {
case JOIN_TYPE_LEFT:
pProbeTable = (SRealTableNode*)pJoinTable->pLeft;
break;
case JOIN_TYPE_RIGHT:
pProbeTable = (SRealTableNode*)pJoinTable->pRight;
break;
default:
return TSDB_CODE_PAR_INTERNAL_ERROR;
}
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pCol) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SSchema* pColSchema = &pProbeTable->pMeta->schema[0];
strcpy(pCol->dbName, pProbeTable->table.dbName);
strcpy(pCol->tableAlias, pProbeTable->table.tableAlias);
strcpy(pCol->tableName, pProbeTable->table.tableName);
strcpy(pCol->colName, pColSchema->name);
strcpy(pCol->node.aliasName, pColSchema->name);
strcpy(pCol->node.userAlias, pColSchema->name);
pCol->tableId = pProbeTable->pMeta->uid;
pCol->tableType = pProbeTable->pMeta->tableType;
pCol->colId = pColSchema->colId;
pCol->colType = COLUMN_TYPE_COLUMN;
pCol->hasIndex = (pColSchema != NULL && IS_IDX_ON(pColSchema));
pCol->node.resType.type = pColSchema->type;
pCol->node.resType.bytes = pColSchema->bytes;
pCol->node.resType.precision = pProbeTable->pMeta->tableInfo.precision;
SGroupingSetNode* groupingSet = (SGroupingSetNode*)nodesMakeNode(QUERY_NODE_GROUPING_SET);
if (NULL == groupingSet) {
nodesDestroyNode((SNode*)pCol);
return TSDB_CODE_OUT_OF_MEMORY;
}
groupingSet->groupingSetType = GP_TYPE_NORMAL;
groupingSet->pParameterList = nodesMakeList();
nodesListAppend(groupingSet->pParameterList, (SNode*)pCol);
pSelect->pGroupByList = nodesMakeList();
nodesListAppend(pSelect->pGroupByList, (SNode*)groupingSet);
*/
return TSDB_CODE_SUCCESS;
}
static int32_t translateSelectFrom(STranslateContext* pCxt, SSelectStmt* pSelect) { static int32_t translateSelectFrom(STranslateContext* pCxt, SSelectStmt* pSelect) {
pCxt->pCurrStmt = (SNode*)pSelect; pCxt->pCurrStmt = (SNode*)pSelect;
int32_t code = translateFrom(pCxt, &pSelect->pFromTable); int32_t code = translateFrom(pCxt, &pSelect->pFromTable);
@ -4933,12 +5098,15 @@ static int32_t translateSelectFrom(STranslateContext* pCxt, SSelectStmt* pSelect
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = checkIsEmptyResult(pCxt, pSelect); code = checkIsEmptyResult(pCxt, pSelect);
} }
if (TSDB_CODE_SUCCESS == code) {
code = translateWinJoin(pCxt, pSelect);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
resetSelectFuncNumWithoutDup(pSelect); resetSelectFuncNumWithoutDup(pSelect);
code = checkAggColCoexist(pCxt, pSelect); code = checkAggColCoexist(pCxt, pSelect);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = checkWindowFuncCoexist(pCxt, pSelect); code = checkWindowGrpFuncCoexist(pCxt, pSelect);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = checkLimit(pCxt, pSelect); code = checkLimit(pCxt, pSelect);

View File

@ -192,6 +192,8 @@ static char* getSyntaxErrFormat(int32_t errCode) {
return "Out of memory"; return "Out of memory";
case TSDB_CODE_PAR_ORDERBY_AMBIGUOUS: case TSDB_CODE_PAR_ORDERBY_AMBIGUOUS:
return "ORDER BY \"%s\" is ambiguous"; return "ORDER BY \"%s\" is ambiguous";
case TSDB_CODE_PAR_INVALID_WJOIN_HAVING_EXPR:
return "Not supported window join having expr";
default: default:
return "Unknown error"; return "Unknown error";
} }

View File

@ -55,6 +55,7 @@ bool isPartTagAgg(SAggLogicNode* pAgg);
bool isPartTableWinodw(SWindowLogicNode* pWindow); bool isPartTableWinodw(SWindowLogicNode* pWindow);
bool keysHasCol(SNodeList* pKeys); bool keysHasCol(SNodeList* pKeys);
bool keysHasTbname(SNodeList* pKeys); bool keysHasTbname(SNodeList* pKeys);
SFunctionNode* createGroupKeyAggFunc(SColumnNode* pGroupCol);
#define CLONE_LIMIT 1 #define CLONE_LIMIT 1
#define CLONE_SLIMIT 1 << 1 #define CLONE_SLIMIT 1 << 1

View File

@ -550,6 +550,7 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
pJoin->pJLimit = nodesCloneNode(pJoinTable->pJLimit); pJoin->pJLimit = nodesCloneNode(pJoinTable->pJLimit);
pJoin->winPrimEqCond = nodesCloneNode(pJoinTable->winPrimCond); pJoin->winPrimEqCond = nodesCloneNode(pJoinTable->winPrimCond);
pJoin->node.pChildren = nodesMakeList(); pJoin->node.pChildren = nodesMakeList();
pJoin->seqWinGroup = (JOIN_STYPE_WIN == pJoinTable->subType) && pSelect->hasAggFuncs;
if (NULL == pJoin->node.pChildren) { if (NULL == pJoin->node.pChildren) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
} }
@ -713,6 +714,10 @@ static EGroupAction getDistinctGroupAction(SLogicPlanContext* pCxt, SSelectStmt*
: GROUP_ACTION_NONE; : GROUP_ACTION_NONE;
} }
static bool isWindowJoinStmt(SSelectStmt * pSelect) {
return (QUERY_NODE_JOIN_TABLE == nodeType(pSelect->pFromTable) && IS_WINDOW_JOIN(((SJoinTableNode*)pSelect->pFromTable)->subType));
}
static EGroupAction getGroupAction(SLogicPlanContext* pCxt, SSelectStmt* pSelect) { static EGroupAction getGroupAction(SLogicPlanContext* pCxt, SSelectStmt* pSelect) {
return ((pCxt->pPlanCxt->streamQuery || NULL != pSelect->pLimit || NULL != pSelect->pSlimit) && !pSelect->isDistinct) ? GROUP_ACTION_KEEP return ((pCxt->pPlanCxt->streamQuery || NULL != pSelect->pLimit || NULL != pSelect->pSlimit) && !pSelect->isDistinct) ? GROUP_ACTION_KEEP
: GROUP_ACTION_NONE; : GROUP_ACTION_NONE;
@ -723,6 +728,49 @@ static EDataOrderLevel getRequireDataOrder(bool needTimeline, SSelectStmt* pSele
: DATA_ORDER_LEVEL_NONE; : DATA_ORDER_LEVEL_NONE;
} }
static int32_t addWinJoinPrimKeyToAggFuncs(SSelectStmt* pSelect, SNodeList** pList) {
SNodeList* pTargets = (NULL == *pList) ? nodesMakeList() : *pList;
SJoinTableNode* pJoinTable = (SJoinTableNode*)pSelect->pFromTable;
SRealTableNode* pProbeTable = NULL;
switch (pJoinTable->joinType) {
case JOIN_TYPE_LEFT:
pProbeTable = (SRealTableNode*)pJoinTable->pLeft;
break;
case JOIN_TYPE_RIGHT:
pProbeTable = (SRealTableNode*)pJoinTable->pRight;
break;
default:
return TSDB_CODE_PLAN_INTERNAL_ERROR;
}
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pCol) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SSchema* pColSchema = &pProbeTable->pMeta->schema[0];
strcpy(pCol->dbName, pProbeTable->table.dbName);
strcpy(pCol->tableAlias, pProbeTable->table.tableAlias);
strcpy(pCol->tableName, pProbeTable->table.tableName);
strcpy(pCol->colName, pColSchema->name);
strcpy(pCol->node.aliasName, pColSchema->name);
strcpy(pCol->node.userAlias, pColSchema->name);
pCol->tableId = pProbeTable->pMeta->uid;
pCol->tableType = pProbeTable->pMeta->tableType;
pCol->colId = pColSchema->colId;
pCol->colType = COLUMN_TYPE_COLUMN;
pCol->hasIndex = (pColSchema != NULL && IS_IDX_ON(pColSchema));
pCol->node.resType.type = pColSchema->type;
pCol->node.resType.bytes = pColSchema->bytes;
pCol->node.resType.precision = pProbeTable->pMeta->tableInfo.precision;
SNode* pFunc = (SNode*)createGroupKeyAggFunc(pCol);
nodesListAppend(pTargets, pFunc);
return TSDB_CODE_SUCCESS;
}
static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) { static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect, SLogicNode** pLogicNode) {
if (!pSelect->hasAggFuncs && NULL == pSelect->pGroupByList) { if (!pSelect->hasAggFuncs && NULL == pSelect->pGroupByList) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -733,14 +781,16 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect,
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
bool winJoin = isWindowJoinStmt(pSelect);
pAgg->hasLastRow = pSelect->hasLastRowFunc; pAgg->hasLastRow = pSelect->hasLastRowFunc;
pAgg->hasLast = pSelect->hasLastFunc; pAgg->hasLast = pSelect->hasLastFunc;
pAgg->hasTimeLineFunc = pSelect->hasTimeLineFunc; pAgg->hasTimeLineFunc = pSelect->hasTimeLineFunc;
pAgg->hasGroupKeyOptimized = false; pAgg->hasGroupKeyOptimized = false;
pAgg->onlyHasKeepOrderFunc = pSelect->onlyHasKeepOrderFunc; pAgg->onlyHasKeepOrderFunc = pSelect->onlyHasKeepOrderFunc;
pAgg->node.groupAction = getGroupAction(pCxt, pSelect); pAgg->node.groupAction = winJoin ? GROUP_ACTION_NONE : getGroupAction(pCxt, pSelect);
pAgg->node.requireDataOrder = getRequireDataOrder(pAgg->hasTimeLineFunc, pSelect); pAgg->node.requireDataOrder = getRequireDataOrder(pAgg->hasTimeLineFunc, pSelect);
pAgg->node.resultDataOrder = pAgg->onlyHasKeepOrderFunc ? pAgg->node.requireDataOrder : DATA_ORDER_LEVEL_NONE; pAgg->node.resultDataOrder = pAgg->onlyHasKeepOrderFunc ? pAgg->node.requireDataOrder : DATA_ORDER_LEVEL_NONE;
pAgg->node.forceCreateNonBlockingOptr = winJoin ? true : false;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;

View File

@ -898,6 +898,7 @@ static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChi
pJoin->pWindowOffset = nodesCloneNode(pJoinLogicNode->pWindowOffset); pJoin->pWindowOffset = nodesCloneNode(pJoinLogicNode->pWindowOffset);
pJoin->pJLimit = nodesCloneNode(pJoinLogicNode->pJLimit); pJoin->pJLimit = nodesCloneNode(pJoinLogicNode->pJLimit);
pJoin->node.inputTsOrder = pJoinLogicNode->node.inputTsOrder; pJoin->node.inputTsOrder = pJoinLogicNode->node.inputTsOrder;
pJoin->seqWinGroup = pJoinLogicNode->seqWinGroup;
SDataBlockDescNode* pLeftDesc = NULL; SDataBlockDescNode* pLeftDesc = NULL;
SDataBlockDescNode* pRightDesc = NULL; SDataBlockDescNode* pRightDesc = NULL;

View File

@ -1008,27 +1008,6 @@ static int32_t stbSplSplitAggNodeForPartTable(SSplitContext* pCxt, SStableSplitI
return code; return code;
} }
static SFunctionNode* createGroupKeyAggFunc(SColumnNode* pGroupCol) {
SFunctionNode* pFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
if (pFunc) {
strcpy(pFunc->functionName, "_group_key");
strcpy(pFunc->node.aliasName, pGroupCol->node.aliasName);
strcpy(pFunc->node.userAlias, pGroupCol->node.userAlias);
int32_t code = nodesListMakeStrictAppend(&pFunc->pParameterList, nodesCloneNode((SNode*)pGroupCol));
if (code == TSDB_CODE_SUCCESS) {
code = fmGetFuncInfo(pFunc, NULL, 0);
}
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyNode((SNode*)pFunc);
pFunc = NULL;
}
char name[TSDB_FUNC_NAME_LEN + TSDB_NAME_DELIMITER_LEN + TSDB_POINTER_PRINT_BYTES + 1] = {0};
int32_t len = snprintf(name, sizeof(name) - 1, "%s.%p", pFunc->functionName, pFunc);
taosCreateMD5Hash(name, len);
strncpy(pFunc->node.aliasName, name, TSDB_COL_NAME_LEN - 1);
}
return pFunc;
}
/** /**
* @brief For pipelined agg node, add a SortMergeNode to merge result from vnodes. * @brief For pipelined agg node, add a SortMergeNode to merge result from vnodes.

View File

@ -542,3 +542,27 @@ bool keysHasCol(SNodeList* pKeys) {
nodesWalkExprs(pKeys, partTagsOptHasColImpl, &hasCol); nodesWalkExprs(pKeys, partTagsOptHasColImpl, &hasCol);
return hasCol; return hasCol;
} }
SFunctionNode* createGroupKeyAggFunc(SColumnNode* pGroupCol) {
SFunctionNode* pFunc = (SFunctionNode*)nodesMakeNode(QUERY_NODE_FUNCTION);
if (pFunc) {
strcpy(pFunc->functionName, "_group_key");
strcpy(pFunc->node.aliasName, pGroupCol->node.aliasName);
strcpy(pFunc->node.userAlias, pGroupCol->node.userAlias);
int32_t code = nodesListMakeStrictAppend(&pFunc->pParameterList, nodesCloneNode((SNode*)pGroupCol));
if (code == TSDB_CODE_SUCCESS) {
code = fmGetFuncInfo(pFunc, NULL, 0);
}
if (TSDB_CODE_SUCCESS != code) {
nodesDestroyNode((SNode*)pFunc);
pFunc = NULL;
}
char name[TSDB_FUNC_NAME_LEN + TSDB_NAME_DELIMITER_LEN + TSDB_POINTER_PRINT_BYTES + 1] = {0};
int32_t len = snprintf(name, sizeof(name) - 1, "%s.%p", pFunc->functionName, pFunc);
taosCreateMD5Hash(name, len);
strncpy(pFunc->node.aliasName, name, TSDB_COL_NAME_LEN - 1);
}
return pFunc;
}

View File

@ -621,6 +621,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Invalid stream quer
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_VIEW_QUERY, "Invalid view query type") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_VIEW_QUERY, "Invalid view query type")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_COL_QUERY_MISMATCH, "Columns number mismatch with query result") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_COL_QUERY_MISMATCH, "Columns number mismatch with query result")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_VIEW_CONFLICT_WITH_TABLE, "View name is conflict with table") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_VIEW_CONFLICT_WITH_TABLE, "View name is conflict with table")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_WJOIN_HAVING_EXPR, "Invalid window join having expr")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_GRP_WINDOW_NOT_ALLOWED, "GROUP BY/PARTITION BY/WINDOW-clause can't be used in WINDOW join")
//planner //planner
TAOS_DEFINE_ERROR(TSDB_CODE_PLAN_INTERNAL_ERROR, "Planner internal error") TAOS_DEFINE_ERROR(TSDB_CODE_PLAN_INTERNAL_ERROR, "Planner internal error")

View File

@ -175,3 +175,220 @@ sql select a.ts, b.ts from tba1 a left window join tba2 b window_offset(1a, -1h)
if $rows != 9 then if $rows != 9 then
return -1 return -1
endi endi
sql select count(*) from sta a left window join sta b window_offset(-1s, 1s);
if $rows != 8 then
return -1
endi
if $data00 != 3 then
return -1
endi
if $data10 != 3 then
return -1
endi
sql select a.ts, count(*) from sta a left window join sta b window_offset(-1s, 1s);
if $rows != 8 then
return -1
endi
if $data00 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data01 != 3 then
return -1
endi
if $data20 != @23-11-17 16:29:01.000@ then
return -1
endi
if $data21 != 4 then
return -1
endi
sql select a.ts, count(*) from sta a left window join sta b window_offset(-1s, 1s) where b.col1 between 2 and 4;
if $rows != 7 then
return -1
endi
if $data00 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data01 != 2 then
return -1
endi
if $data20 != @23-11-17 16:29:01.000@ then
return -1
endi
if $data21 != 3 then
return -1
endi
sql select a.ts, count(*) from sta a left window join sta b window_offset(-1s, 1s) where b.col1 between 2 and 4 having(count(*) != 2);
if $rows != 3 then
return -1
endi
if $data00 != @23-11-17 16:29:01.000@ then
return -1
endi
if $data01 != 3 then
return -1
endi
if $data10 != @23-11-17 16:29:02.000@ then
return -1
endi
if $data11 != 3 then
return -1
endi
if $data20 != @23-11-17 16:29:04.000@ then
return -1
endi
if $data21 != 1 then
return -1
endi
sql select a.ts, count(*) from sta a left window join sta b window_offset(-1s, 1s) where b.col1 between 2 and 4 having(count(*) != 2) order by count(*);
if $rows != 3 then
return -1
endi
if $data00 != @23-11-17 16:29:04.000@ then
return -1
endi
if $data01 != 1 then
return -1
endi
if $data11 != 3 then
return -1
endi
if $data21 != 3 then
return -1
endi
sql select a.ts from sta a left window join sta b window_offset(-1s, 1s) where b.col1 between 2 and 4 order by count(*);
if $rows != 7 then
return -1
endi
sql select a.ts from sta a left window join sta b window_offset(-1s, 1s) where b.col1 between 2 and 4 order by count(*), a.ts;
if $rows != 7 then
return -1
endi
if $data00 != @23-11-17 16:29:04.000@ then
return -1
endi
if $data10 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data20 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data30 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data40 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data50 != @23-11-17 16:29:01.000@ then
return -1
endi
if $data60 != @23-11-17 16:29:02.000@ then
return -1
endi
sql select a.ts, count(*),last(b.ts) from sta a left window join sta b window_offset(-1s, 1s);
if $rows != 8 then
return -1
endi
if $data00 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data02 != @23-11-17 16:29:01.000@ then
return -1
endi
if $data10 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data12 != @23-11-17 16:29:01.000@ then
return -1
endi
if $data20 != @23-11-17 16:29:01.000@ then
return -1
endi
if $data22 != @23-11-17 16:29:02.000@ then
return -1
endi
if $data30 != @23-11-17 16:29:02.000@ then
return -1
endi
if $data32 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data40 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data42 != @23-11-17 16:29:04.000@ then
return -1
endi
if $data50 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data52 != @23-11-17 16:29:04.000@ then
return -1
endi
if $data60 != @23-11-17 16:29:04.000@ then
return -1
endi
if $data62 != @23-11-17 16:29:05.000@ then
return -1
endi
if $data70 != @23-11-17 16:29:05.000@ then
return -1
endi
if $data72 != @23-11-17 16:29:05.000@ then
return -1
endi
sql select a.ts, count(*),last(b.ts) from sta a left window join sta b window_offset(-1s, 1s) limit 1;
if $rows != 1 then
return -1
endi
sql select timetruncate(a.ts, 1m), count(*) from sta a left window join sta b window_offset(-1s, 1s);
if $rows != 8 then
return -1
endi
if $data00 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data01 != 3 then
return -1
endi
if $data20 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data21 != 4 then
return -1
endi
sql select a.ts+1s, count(*) from sta a left window join sta b window_offset(-1s, 1s);
if $rows != 8 then
return -1
endi
if $data00 != @23-11-17 16:29:01.000@ then
return -1
endi
if $data01 != 3 then
return -1
endi
if $data20 != @23-11-17 16:29:02.000@ then
return -1
endi
if $data21 != 4 then
return -1
endi
sql_error select a.col1, count(*) from sta a left window join sta b window_offset(-1s, 1s);
sql_error select b.ts, count(*) from sta a left window join sta b window_offset(-1s, 1s);
sql_error select a.ts, b.ts from sta a left window join sta b window_offset(-1s, 1s) having(b.ts > 0);
sql_error select a.ts, b.ts from sta a left window join sta b window_offset(-1s, 1s) having(a.col1 > 0);
sql_error select a.ts, b.ts from sta a left window join sta b window_offset(-1s, 1s) having(a.col1 > 0);
sql_error select a.ts, b.ts from sta a left window join sta b window_offset(-1s, 1s) having(a.ts > "2023-11-17 16:29:00.000");
sql_error select a.ts from sta a left window join sta b window_offset(-1s, 1s) where b.col1 between 2 and 4 having(a.ts > 0) order by count(*);
sql_error select a.ts, count(*),last(b.ts) from sta a left window join sta b window_offset(-1s, 1s) slimit 1;