// Match-state record declared next to SMJoinGrpRows, which carries flags with the
// same names (allRowsNMatch, allRowsMatch).
// NOTE(review): no reader or writer of this struct is visible in this change; the
// per-field notes below are presumed from the field names only — confirm.
typedef struct SMJoinMatchInfo {
  int32_t rowBitmapOffset;  // presumably an offset into a per-row bitmap — TODO confirm
  int32_t rowMatchNum;      // presumably the number of matched rows — TODO confirm
  bool    allRowsNMatch;    // presumably "no row matched" — TODO confirm
  bool    allRowsMatch;     // presumably "every row matched" — TODO confirm
} SMJoinMatchInfo;
4fa901f579..b6fc8c344d 100755 --- a/source/libs/executor/src/mergejoin.c +++ b/source/libs/executor/src/mergejoin.c @@ -27,6 +27,76 @@ #include "ttypes.h" #include "mergejoin.h" + + +int32_t mWinJoinDumpGrpCache(SMJoinWindowCtx* pCtx) { + int64_t rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows; + SMJoinWinCache* cache = &pCtx->cache; + int32_t buildGrpNum = taosArrayGetSize(cache->grps); + int64_t buildTotalRows = TMIN(cache->rowNum, pCtx->jLimit); + if (buildGrpNum <= 0 || buildTotalRows <= 0) { + return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeGrp, true); + } + + SMJoinGrpRows* probeGrp = &pCtx->probeGrp; + int32_t probeRows = GRP_REMAIN_ROWS(probeGrp); + int32_t probeEndIdx = probeGrp->endIdx; + + if (0 == cache->grpIdx && probeRows * buildTotalRows <= rowsLeft) { + SMJoinGrpRows* pFirstBuild = taosArrayGet(cache->grps, 0); + if (pFirstBuild->readIdx == pFirstBuild->beginIdx) { + for (; cache->grpIdx < buildGrpNum; ++cache->grpIdx) { + SMJoinGrpRows* buildGrp = taosArrayGet(cache->grps, cache->grpIdx); + MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp)); + buildGrp->readIdx = buildGrp->beginIdx; + } + + cache->grpIdx = 0; + pCtx->grpRemains = false; + return TSDB_CODE_SUCCESS; + } + } + + for (; !GRP_DONE(probeGrp); ) { + probeGrp->endIdx = probeGrp->readIdx; + for (; cache->grpIdx < buildGrpNum && rowsLeft > 0; ++cache->grpIdx) { + SMJoinGrpRows* buildGrp = taosArrayGet(cache->grps, cache->grpIdx); + + if (rowsLeft >= GRP_REMAIN_ROWS(buildGrp)) { + MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp)); + rowsLeft -= GRP_REMAIN_ROWS(buildGrp); + buildGrp->readIdx = buildGrp->beginIdx; + continue; + } + + int32_t buildEndIdx = buildGrp->endIdx; + buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1; + mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp); + buildGrp->readIdx += rowsLeft; + buildGrp->endIdx = buildEndIdx; + rowsLeft = 0; + break; + } + 
probeGrp->endIdx = probeEndIdx; + + if (cache->grpIdx >= buildGrpNum) { + cache->grpIdx = 0; + ++probeGrp->readIdx; + } + + if (rowsLeft <= 0) { + break; + } + } + + probeGrp->endIdx = probeEndIdx; + + pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx; + + return TSDB_CODE_SUCCESS; +} + + static int32_t mOuterJoinHashFullCart(SMJoinMergeCtx* pCtx) { SMJoinTableCtx* probe = pCtx->pJoin->probe; SMJoinTableCtx* build = pCtx->pJoin->build; @@ -1714,41 +1784,6 @@ _return: } -int32_t mAsofJoinBuildEqGrp(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, SMJoinGrpRows* pGrp) { - SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCol->srcSlot); - - if (*(int64_t*)colDataGetNumData(pCol, pTable->blkRowIdx) != timestamp) { - return TSDB_CODE_SUCCESS; - } - - pGrp->beginIdx = pTable->blkRowIdx; - pGrp->readIdx = pTable->blkRowIdx; - - pTable->blkRowIdx++; - char* pEndVal = colDataGetNumData(pCol, pTable->blk->info.rows - 1); - if (timestamp != *(int64_t*)pEndVal) { - for (; pTable->blkRowIdx < pTable->blk->info.rows; ++pTable->blkRowIdx) { - char* pNextVal = colDataGetNumData(pCol, pTable->blkRowIdx); - if (timestamp == *(int64_t*)pNextVal) { - continue; - } - - pGrp->endIdx = pTable->blkRowIdx - 1; - return TSDB_CODE_SUCCESS; - } - } - - pGrp->endIdx = pTable->blk->info.rows - 1; - pTable->blkRowIdx = pTable->blk->info.rows; - - if (wholeBlk) { - *wholeBlk = true; - } - - return TSDB_CODE_SUCCESS; -} - - int32_t mAsofLowerCalcRowNum(SMJoinWinCache* pCache, int64_t jLimit, int32_t newRows, int32_t* evictRows) { if (pCache->outBlk->info.rows <= 0) { *evictRows = 0; @@ -1821,7 +1856,7 @@ int32_t mAsofLowerAddEqRowsToCache(struct SOperatorInfo* pOperator, SMJoinWindow eqRowsNum += grp.endIdx - grp.beginIdx + 1; - if (pTable->blkRowIdx == pTable->blk->info.rows) { + if (pTable->blkRowIdx == pTable->blk->info.rows && !pTable->dsFetchDone) { pTable->blk = getNextBlockFromDownstreamRemain(pOperator, pTable->downStreamIdx); qDebug("%s merge join 
%s table got block for same ts, rows:%" PRId64, GET_TASKID(pOperator->pTaskInfo), MJOIN_TBTYPE(pTable->type), pTable->blk ? pTable->blk->info.rows : 0); @@ -1948,7 +1983,7 @@ int32_t mAsofLowerProcessEqualGrp(SMJoinWindowCtx* pCtx, int64_t timestamp, bool } bool wholeBlk = false; - MJ_ERR_RET(mAsofJoinBuildEqGrp(pJoin->probe, timestamp, &wholeBlk, &pCtx->probeGrp)); + MJ_ERR_RET(mJoinBuildEqGrp(pJoin->probe, timestamp, &wholeBlk, &pCtx->probeGrp)); MJ_ERR_RET(mAsofLowerDumpUpdateEqRows(pCtx, pJoin, lastBuildGrp, wholeBlk)); @@ -2129,81 +2164,25 @@ _return: return pCtx->finBlk; } - -int32_t mAsofGreaterDumpGrpCache(SMJoinWindowCtx* pCtx) { - int64_t rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows; - SMJoinWinCache* cache = &pCtx->cache; - int32_t buildGrpNum = taosArrayGetSize(cache->grps); - int64_t buildTotalRows = (cache->rowNum > pCtx->jLimit) ? pCtx->jLimit : cache->rowNum; - if (buildGrpNum <= 0 || buildTotalRows <= 0) { - return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeGrp, true); - } - - SMJoinGrpRows* probeGrp = &pCtx->probeGrp; - int32_t probeRows = GRP_REMAIN_ROWS(probeGrp); - int32_t probeEndIdx = probeGrp->endIdx; - - if (0 == cache->grpIdx && probeRows * buildTotalRows <= rowsLeft) { - SMJoinGrpRows* pFirstBuild = taosArrayGet(cache->grps, 0); - if (pFirstBuild->readIdx == pFirstBuild->beginIdx) { - for (; cache->grpIdx < buildGrpNum; ++cache->grpIdx) { - SMJoinGrpRows* buildGrp = taosArrayGet(cache->grps, cache->grpIdx); - MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp)); - buildGrp->readIdx = buildGrp->beginIdx; - } - - cache->grpIdx = 0; - pCtx->grpRemains = false; - return TSDB_CODE_SUCCESS; - } +int32_t mAsofGreaterTrimCacheBlk(SMJoinWindowCtx* pCtx) { + SMJoinGrpRows* pGrp = taosArrayGet(pCtx->cache.grps, 0); + if (pGrp->blk == pCtx->cache.outBlk && pCtx->pJoin->build->blkRowIdx > 0) { + MJ_ERR_RET(blockDataTrimFirstRows(pGrp->blk, pCtx->pJoin->build->blkRowIdx)); + 
pCtx->pJoin->build->blkRowIdx = 0; + ASSERT(pCtx->pJoin->build->blk == pGrp->blk); + MJOIN_SAVE_TB_BLK(&pCtx->cache, pCtx->pJoin->build); } - for (; !GRP_DONE(probeGrp); ) { - probeGrp->endIdx = probeGrp->readIdx; - for (; cache->grpIdx < buildGrpNum && rowsLeft > 0; ++cache->grpIdx) { - SMJoinGrpRows* buildGrp = taosArrayGet(cache->grps, cache->grpIdx); - - if (rowsLeft >= GRP_REMAIN_ROWS(buildGrp)) { - MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp)); - rowsLeft -= GRP_REMAIN_ROWS(buildGrp); - buildGrp->readIdx = buildGrp->beginIdx; - continue; - } - - int32_t buildEndIdx = buildGrp->endIdx; - buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1; - mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp); - buildGrp->readIdx += rowsLeft; - buildGrp->endIdx = buildEndIdx; - rowsLeft = 0; - break; - } - probeGrp->endIdx = probeEndIdx; - - if (cache->grpIdx >= buildGrpNum) { - cache->grpIdx = 0; - ++probeGrp->readIdx; - } - - if (rowsLeft <= 0) { - break; - } - } - - probeGrp->endIdx = probeEndIdx; - - pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx; - - return TSDB_CODE_SUCCESS; + return TSDB_CODE_SUCCESS; } - - int32_t mAsofGreaterChkFillGrpCache(SMJoinWindowCtx* pCtx) { if (pCtx->cache.rowNum >= pCtx->jLimit || pCtx->pJoin->build->dsFetchDone) { return TSDB_CODE_SUCCESS; } + MJ_ERR_RET(mAsofGreaterTrimCacheBlk(pCtx)); + SMJoinTableCtx* build = pCtx->pJoin->build; SMJoinWinCache* pCache = &pCtx->cache; int32_t grpNum = taosArrayGetSize(pCache->grps); @@ -2289,7 +2268,7 @@ int32_t mAsofGreaterFillDumpGrpCache(SMJoinWindowCtx* pCtx, bool lastBuildGrp) { mAsofGreaterUpdateBuildGrpEndIdx(pCtx); - return mAsofGreaterDumpGrpCache(pCtx); + return mWinJoinDumpGrpCache(pCtx); } int32_t mAsofGreaterSkipEqRows(SMJoinWindowCtx* pCtx, SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk) { @@ -2342,6 +2321,10 @@ int32_t mAsofGreaterSkipAllEqRows(SMJoinWindowCtx* pCtx, int64_t timestamp) { ASSERT(pCtx->cache.rowNum == 
0); ASSERT(taosArrayGetSize(pCtx->cache.grps) == 0); + + if (pTable->dsFetchDone) { + return TSDB_CODE_SUCCESS; + } pTable->blk = getNextBlockFromDownstreamRemain(pCtx->pJoin->pOperator, pTable->downStreamIdx); qDebug("%s merge join %s table got block to skip eq ts, rows:%" PRId64, GET_TASKID(pCtx->pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(pTable->type), pTable->blk ? pTable->blk->info.rows : 0); @@ -2374,7 +2357,7 @@ int32_t mAsofGreaterProcessEqualGrp(SMJoinWindowCtx* pCtx, int64_t timestamp, bo pCtx->lastEqGrp = true; - MJ_ERR_RET(mAsofJoinBuildEqGrp(pJoin->probe, timestamp, NULL, &pCtx->probeGrp)); + MJ_ERR_RET(mJoinBuildEqGrp(pJoin->probe, timestamp, NULL, &pCtx->probeGrp)); return mAsofGreaterUpdateDumpEqRows(pCtx, timestamp, lastBuildGrp); } @@ -2483,7 +2466,7 @@ SSDataBlock* mAsofGreaterJoinDo(struct SOperatorInfo* pOperator) { blockDataCleanup(pCtx->finBlk); if (pCtx->grpRemains) { - MJ_ERR_JRET(mAsofGreaterDumpGrpCache(pCtx)); + MJ_ERR_JRET(mWinJoinDumpGrpCache(pCtx)); if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { return pCtx->finBlk; } @@ -2560,12 +2543,40 @@ _return: return pCtx->finBlk; } +static int32_t mWinJoinCloneCacheBlk(SMJoinWindowCtx* pCtx) { + SMJoinWinCache* pCache = &pCtx->cache; + int32_t grpNum = taosArrayGetSize(pCache->grps); + if (grpNum <= 0) { + return TSDB_CODE_SUCCESS; + } + + SMJoinGrpRows* pGrp = (SMJoinGrpRows*)taosArrayGetLast(pCache->grps); + if (!pGrp->clonedBlk) { + if (0 == pGrp->beginIdx) { + pGrp->blk = createOneDataBlock(pGrp->blk, true); + } else { + pGrp->blk = blockDataExtractBlock(pGrp->blk, pGrp->beginIdx, pGrp->blk->info.rows - pGrp->beginIdx); + pGrp->endIdx -= pGrp->beginIdx; + pGrp->beginIdx = 0; + pGrp->readIdx = 0; + } + + pGrp->clonedBlk = true; + } + + return TSDB_CODE_SUCCESS; +} + static bool mWinJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx) { bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe); bool buildGot 
= false; do { if (probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) { + if (NULL == pJoin->build->blk) { + mWinJoinCloneCacheBlk(pCtx); + } + buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build); } @@ -2586,6 +2597,8 @@ static bool mWinJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin break; } while (true); + pCtx->probeGrp.blk = pJoin->probe->blk; + return true; } @@ -2606,7 +2619,9 @@ int32_t mWinJoinAddWinBeginBlk(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache, SM pGrp->endIdx = pGrp->beginIdx; build->blk = NULL; - return TSDB_CODE_SUCCESS; + pCache->rowNum = 1; + } else { + pCache->rowNum = 0; } *winEnd = true; @@ -2614,6 +2629,8 @@ int32_t mWinJoinAddWinBeginBlk(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache, SM } } + pCache->rowNum = 0; + *winEnd = false; return TSDB_CODE_SUCCESS; } @@ -2622,35 +2639,51 @@ int32_t mWinJoinAddWinBeginBlk(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache, SM int32_t mWinJoinAddWinEndBlk(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache, SMJoinTableCtx* build, bool* winEnd) { SSDataBlock* pBlk = build->blk; SColumnInfoData* pCol = taosArrayGet(pBlk->pDataBlock, build->primCol->srcSlot); + SMJoinGrpRows grp = {.blk = pBlk, .beginIdx = build->blkRowIdx}; + + if (*((int64_t*)pCol->pData + build->blkRowIdx) > pCtx->winEndTs) { + *winEnd = true; + return TSDB_CODE_SUCCESS; + } + if (*((int64_t*)pCol->pData + pBlk->info.rows - 1) <= pCtx->winEndTs) { - SMJoinGrpRows grp = {.blk = pBlk, .beginIdx = build->blkRowIdx}; SMJoinGrpRows* pGrp = taosArrayPush(pCache->grps, &grp); pGrp->readIdx = pGrp->beginIdx; pGrp->endIdx = pBlk->info.rows - 1; + + pCache->rowNum += (pGrp->endIdx - pGrp->beginIdx + 1); + if (pCache->rowNum >= pCtx->jLimit) { + pGrp->endIdx = pBlk->info.rows - 1 + pCtx->jLimit - pCache->rowNum; + pCache->rowNum = pCtx->jLimit; + + build->blk = NULL; + *winEnd = true; + return TSDB_CODE_SUCCESS; + } build->blk = NULL; *winEnd = false; return TSDB_CODE_SUCCESS; } - 
SMJoinGrpRows grp = {.blk = pBlk, .beginIdx = build->blkRowIdx}; - - for (; build->blkRowIdx < pBlk->info.rows; build->blkRowIdx++) { + for (; build->blkRowIdx < pBlk->info.rows && pCache->rowNum < pCtx->jLimit; build->blkRowIdx++) { if (*((int64_t*)pCol->pData + build->blkRowIdx) <= pCtx->winEndTs) { + pCache->rowNum++; continue; } - SMJoinGrpRows* pGrp = taosArrayPush(pCache->grps, &grp); - - pGrp->readIdx = pGrp->beginIdx; - pGrp->endIdx = build->blkRowIdx - 1; - - build->blk = NULL; - *winEnd = true; - return TSDB_CODE_SUCCESS; + break; } + SMJoinGrpRows* pGrp = taosArrayPush(pCache->grps, &grp); + + pGrp->readIdx = pGrp->beginIdx; + pGrp->endIdx = build->blkRowIdx - 1; + + build->blk = NULL; + *winEnd = true; + return TSDB_CODE_SUCCESS; } @@ -2667,19 +2700,24 @@ int32_t mWinJoinMoveWinBegin(SMJoinWindowCtx* pCtx) { taosArrayPopFrontBatch(pCache->grps, 1); grpNum--; i--; + pCache->rowNum -= (pGrp->blk->info.rows - pGrp->beginIdx); continue; } + int32_t startIdx = pGrp->beginIdx; for (; pGrp->beginIdx < pGrp->blk->info.rows; pGrp->beginIdx++) { if (*((int64_t*)pCol->pData + pGrp->beginIdx) < pCtx->winBeginTs) { continue; } if (*((int64_t*)pCol->pData + pGrp->beginIdx) <= pCtx->winEndTs) { + pGrp->readIdx = pGrp->beginIdx; + pCache->rowNum -= (pGrp->beginIdx - startIdx); return TSDB_CODE_SUCCESS; } pGrp->endIdx = pGrp->beginIdx; + pCache->rowNum = 0; TSWAP(pCache->grps, pCache->grpsQueue); return TSDB_CODE_SUCCESS; } @@ -2687,6 +2725,7 @@ int32_t mWinJoinMoveWinBegin(SMJoinWindowCtx* pCtx) { if (NULL != pCache->grpsQueue) { pCache->grps = pCache->grpsQueue; + pCache->rowNum = 1; pCache->grpsQueue = NULL; continue; } @@ -2703,6 +2742,10 @@ int32_t mWinJoinMoveWinBegin(SMJoinWindowCtx* pCtx) { } } + if (build->dsFetchDone) { + goto _return; + } + do { build->blk = getNextBlockFromDownstreamRemain(pCtx->pJoin->pOperator, pCtx->pJoin->build->downStreamIdx); qDebug("%s merge join %s table got block to start win, rows:%" PRId64, 
GET_TASKID(pCtx->pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(build->type), build->blk ? build->blk->info.rows : 0); @@ -2720,6 +2763,8 @@ int32_t mWinJoinMoveWinBegin(SMJoinWindowCtx* pCtx) { } } while (true); +_return: + return TSDB_CODE_SUCCESS; } @@ -2734,16 +2779,30 @@ int32_t mWinJoinMoveWinEnd(SMJoinWindowCtx* pCtx) { SMJoinGrpRows* pGrp = taosArrayGetLast(pCache->grps); SColumnInfoData* pCol = taosArrayGet(pGrp->blk->pDataBlock, pCtx->pJoin->build->primCol->srcSlot); if (*((int64_t*)pCol->pData + pGrp->blk->info.rows - 1) <= pCtx->winEndTs) { - pGrp->endIdx = pGrp->blk->info.rows - 1; + pCache->rowNum += pGrp->blk->info.rows - pGrp->endIdx - 1; + if (pCache->rowNum >= pCtx->jLimit) { + pGrp->endIdx = pGrp->blk->info.rows - 1 + pCtx->jLimit - pCache->rowNum; + pCache->rowNum = pCtx->jLimit; + + return TSDB_CODE_SUCCESS; + } else { + pGrp->endIdx = pGrp->blk->info.rows - 1; + } } else { - for (; pGrp->endIdx < pGrp->blk->info.rows; pGrp->endIdx++) { + int32_t startIdx = pGrp->endIdx; + for (; pGrp->endIdx < pGrp->blk->info.rows && pCache->rowNum < pCtx->jLimit; pGrp->endIdx++) { if (*((int64_t*)pCol->pData + pGrp->endIdx) <= pCtx->winEndTs) { + pCache->rowNum++; continue; } + ASSERT(pGrp->endIdx > startIdx); + pGrp->endIdx--; - return TSDB_CODE_SUCCESS; + break; } + + return TSDB_CODE_SUCCESS; } SMJoinTableCtx* build = pCtx->pJoin->build; @@ -2755,7 +2814,13 @@ int32_t mWinJoinMoveWinEnd(SMJoinWindowCtx* pCtx) { } } + if (build->dsFetchDone) { + goto _return; + } + do { + MJ_ERR_RET(mWinJoinCloneCacheBlk(pCtx)); + build->blk = getNextBlockFromDownstreamRemain(pCtx->pJoin->pOperator, pCtx->pJoin->build->downStreamIdx); qDebug("%s merge join %s table got block to start win, rows:%" PRId64, GET_TASKID(pCtx->pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(build->type), build->blk ? 
build->blk->info.rows : 0); @@ -2772,6 +2837,8 @@ int32_t mWinJoinMoveWinEnd(SMJoinWindowCtx* pCtx) { } } while (true); +_return: + return TSDB_CODE_SUCCESS; } @@ -2784,17 +2851,14 @@ int32_t mWinJoinMoveFillWinCache(SMJoinWindowCtx* pCtx) { } int32_t mWinJoinDumpWinCache(SMJoinWindowCtx* pCtx) { - return TSDB_CODE_SUCCESS; + return pCtx->winProjection ? mWinJoinDumpGrpCache(pCtx) : TSDB_CODE_SUCCESS; } - SSDataBlock* mWinJoinDo(struct SOperatorInfo* pOperator) { SMJoinOperatorInfo* pJoin = pOperator->info; SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx; int32_t code = TSDB_CODE_SUCCESS; int64_t probeTs = 0; - int64_t buildTs = 0; - SColumnInfoData* pBuildCol = NULL; SColumnInfoData* pProbeCol = NULL; blockDataCleanup(pCtx->finBlk); @@ -2815,7 +2879,9 @@ SSDataBlock* mWinJoinDo(struct SOperatorInfo* pOperator) { MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe); while (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe)) { - MJ_ERR_JRET(mAsofJoinBuildEqGrp(pJoin->probe, probeTs, NULL, &pCtx->probeGrp)); + MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe); + + MJ_ERR_JRET(mJoinBuildEqGrp(pJoin->probe, probeTs, NULL, &pCtx->probeGrp)); if (probeTs != pCtx->lastTs) { pCtx->lastTs = probeTs; @@ -2829,8 +2895,6 @@ SSDataBlock* mWinJoinDo(struct SOperatorInfo* pOperator) { if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) { return pCtx->finBlk; } - - MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe); } } while (true); @@ -2870,11 +2934,12 @@ int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* p SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx; pCtx->pJoin = pJoin; - pCtx->jLimit = pJoinNode->pJLimit ? ((SLimitNode*)pJoinNode->pJLimit)->limit : 1; + pCtx->lastTs = INT64_MIN; switch (pJoinNode->subType) { case JOIN_STYPE_ASOF: pCtx->asofOpType = pJoinNode->asofOpType; + pCtx->jLimit = pJoinNode->pJLimit ? 
((SLimitNode*)pJoinNode->pJLimit)->limit : 1; pCtx->eqRowsAcq = ASOF_EQ_ROW_INCLUDED(pCtx->asofOpType); pCtx->lowerRowsAcq = (JOIN_TYPE_RIGHT != pJoin->joinType) ? ASOF_LOWER_ROW_INCLUDED(pCtx->asofOpType) : ASOF_GREATER_ROW_INCLUDED(pCtx->asofOpType); pCtx->greaterRowsAcq = (JOIN_TYPE_RIGHT != pJoin->joinType) ? ASOF_GREATER_ROW_INCLUDED(pCtx->asofOpType) : ASOF_LOWER_ROW_INCLUDED(pCtx->asofOpType); @@ -2889,6 +2954,8 @@ int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* p SWindowOffsetNode* pOffsetNode = (SWindowOffsetNode*)pJoinNode->pWindowOffset; SValueNode* pWinBegin = (SValueNode*)pOffsetNode->pStartOffset; SValueNode* pWinEnd = (SValueNode*)pOffsetNode->pEndOffset; + pCtx->jLimit = pJoinNode->pJLimit ? ((SLimitNode*)pJoinNode->pJLimit)->limit : INT64_MAX; + pCtx->winProjection = true; pCtx->winBeginOffset = pWinBegin->datum.i; pCtx->winEndOffset = pWinEnd->datum.i; pCtx->eqRowsAcq = (pCtx->winBeginOffset <= 0 && pCtx->winEndOffset >= 0); diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 25f3724b26..9aa305c64e 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -28,6 +28,38 @@ #include "mergejoin.h" +int32_t mJoinBuildEqGrp(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, SMJoinGrpRows* pGrp) { + SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCol->srcSlot); + + pGrp->beginIdx = pTable->blkRowIdx; + pGrp->readIdx = pTable->blkRowIdx; + + pTable->blkRowIdx++; + char* pEndVal = colDataGetNumData(pCol, pTable->blk->info.rows - 1); + if (timestamp != *(int64_t*)pEndVal) { + for (; pTable->blkRowIdx < pTable->blk->info.rows; ++pTable->blkRowIdx) { + char* pNextVal = colDataGetNumData(pCol, pTable->blkRowIdx); + if (timestamp == *(int64_t*)pNextVal) { + continue; + } + + pGrp->endIdx = pTable->blkRowIdx - 1; + return TSDB_CODE_SUCCESS; + } + } + + pGrp->endIdx = 
pTable->blk->info.rows - 1; + pTable->blkRowIdx = pTable->blk->info.rows; + + if (wholeBlk) { + *wholeBlk = true; + } + + return TSDB_CODE_SUCCESS; +} + + + void mJoinTrimKeepOneRow(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList) { // int32_t totalRows = pBlock->info.rows; int32_t bmLen = BitmapLen(totalRows); @@ -1013,7 +1045,7 @@ int32_t mJoinRetrieveEqGrpRows(SOperatorInfo* pOperator, SMJoinTableCtx* pTable, mJoinBuildEqGroups(pTable, timestamp, &wholeBlk, true); - while (wholeBlk) { + while (wholeBlk && !pTable->dsFetchDone) { pTable->blk = getNextBlockFromDownstreamRemain(pOperator, pTable->downStreamIdx); qDebug("%s merge join %s table got block for same ts, rows:%" PRId64, GET_TASKID(pOperator->pTaskInfo), MJOIN_TBTYPE(pTable->type), pTable->blk ? pTable->blk->info.rows : 0); @@ -1374,7 +1406,7 @@ int32_t mJoinSetImplFp(SMJoinOperatorInfo* pJoin) { pJoin->joinFp = mAntiJoinDo; break; case JOIN_STYPE_WIN: - //pJoin->joinFp = mWinJoinDo; + pJoin->joinFp = mWinJoinDo; break; default: break; diff --git a/source/libs/executor/test/joinTests.cpp b/source/libs/executor/test/joinTests.cpp index 2623b98ae1..f8263fc12c 100755 --- a/source/libs/executor/test/joinTests.cpp +++ b/source/libs/executor/test/joinTests.cpp @@ -3092,7 +3092,7 @@ TEST(leftAntiJoin, fullCondTest) { #endif #if 1 -#if 1 +#if 0 TEST(leftAsofJoin, noCondGreaterThanTest) { SJoinTestParam param; char* caseName = "leftAsofJoin:noCondGreaterThanTest"; @@ -3120,7 +3120,7 @@ TEST(leftAsofJoin, noCondGreaterThanTest) { } #endif -#if 1 +#if 0 TEST(leftAsofJoin, noCondGreaterEqTest) { SJoinTestParam param; char* caseName = "leftAsofJoin:noCondGreaterEqTest"; @@ -3148,7 +3148,7 @@ TEST(leftAsofJoin, noCondGreaterEqTest) { } #endif -#if 1 +#if 0 TEST(leftAsofJoin, noCondEqTest) { SJoinTestParam param; char* caseName = "leftAsofJoin:noCondEqTest"; diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 888fbfb7ad..6a772c2718 100644 --- 
a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -468,6 +468,7 @@ static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) { COPY_SCALAR_FIELD(joinAlgo); CLONE_NODE_FIELD(pWindowOffset); CLONE_NODE_FIELD(pJLimit); + CLONE_NODE_FIELD(winPrimEqCond); CLONE_NODE_FIELD(pPrimKeyEqCond); CLONE_NODE_FIELD(pColEqCond); CLONE_NODE_FIELD(pColOnCond); diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index 0f6783eab4..a4e8e74b34 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -1850,6 +1850,41 @@ static int32_t msgToCaseWhenNode(STlvDecoder* pDecoder, void* pObj) { return code; } +enum { WINDOW_OFFSET_CODE_START_OFFSET = 1, WINDOW_OFFSET_CODE_END_OFFSET }; + +static int32_t windowOffsetNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { + const SWindowOffsetNode* pNode = (const SWindowOffsetNode*)pObj; + + int32_t code = tlvEncodeObj(pEncoder, WINDOW_OFFSET_CODE_START_OFFSET, nodeToMsg, pNode->pStartOffset); + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeObj(pEncoder, WINDOW_OFFSET_CODE_END_OFFSET, nodeToMsg, pNode->pEndOffset); + } + + return code; +} + +static int32_t msgToWindowOffsetNode(STlvDecoder* pDecoder, void* pObj) { + SWindowOffsetNode* pNode = (SWindowOffsetNode*)pObj; + + int32_t code = TSDB_CODE_SUCCESS; + STlv* pTlv = NULL; + tlvForEach(pDecoder, pTlv, code) { + switch (pTlv->type) { + case WINDOW_OFFSET_CODE_START_OFFSET: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pStartOffset); + break; + case WINDOW_OFFSET_CODE_END_OFFSET: + code = msgToNodeFromTlv(pTlv, (void**)&pNode->pEndOffset); + break; + default: + break; + } + } + + return code; +} + + enum { PHY_NODE_CODE_OUTPUT_DESC = 1, PHY_NODE_CODE_CONDITIONS, @@ -4214,6 +4249,9 @@ static int32_t specificNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { case QUERY_NODE_CASE_WHEN: code = caseWhenNodeToMsg(pObj, pEncoder); break; + case 
QUERY_NODE_WINDOW_OFFSET: + code = windowOffsetNodeToMsg(pObj, pEncoder); + break; case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: code = physiTagScanNodeToMsg(pObj, pEncoder); break; @@ -4368,6 +4406,9 @@ static int32_t msgToSpecificNode(STlvDecoder* pDecoder, void* pObj) { case QUERY_NODE_CASE_WHEN: code = msgToCaseWhenNode(pDecoder, pObj); break; + case QUERY_NODE_WINDOW_OFFSET: + code = msgToWindowOffsetNode(pDecoder, pObj); + break; case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: code = msgToPhysiTagScanNode(pDecoder, pObj); break; diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index d5222f9c23..007ebbeb5b 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -801,6 +801,9 @@ void nodesDestroyNode(SNode* pNode) { break; case QUERY_NODE_JOIN_TABLE: { SJoinTableNode* pJoin = (SJoinTableNode*)pNode; + nodesDestroyNode(pJoin->pWindowOffset); + nodesDestroyNode(pJoin->pJLimit); + nodesDestroyNode(pJoin->winPrimCond); nodesDestroyNode(pJoin->pLeft); nodesDestroyNode(pJoin->pRight); nodesDestroyNode(pJoin->pOnCond); @@ -1260,6 +1263,7 @@ void nodesDestroyNode(SNode* pNode) { destroyLogicNode((SLogicNode*)pLogicNode); nodesDestroyNode(pLogicNode->pWindowOffset); nodesDestroyNode(pLogicNode->pJLimit); + nodesDestroyNode(pLogicNode->winPrimEqCond); nodesDestroyNode(pLogicNode->pPrimKeyEqCond); nodesDestroyNode(pLogicNode->pColEqCond); nodesDestroyNode(pLogicNode->pColOnCond); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 312bc60ea9..06f827c985 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -3158,6 +3158,55 @@ static int32_t replaceTbName(STranslateContext* pCxt, SSelectStmt* pSelect) { return pRewriteCxt.errCode; } +static int32_t addPrimEqCond(SNode** pCond, SRealTableNode* leftTable, SRealTableNode* rightTable) { + struct STableMeta* pLMeta = leftTable->pMeta; + struct STableMeta* pRMeta = 
rightTable->pMeta; + + *pCond = nodesMakeNode(QUERY_NODE_OPERATOR); + if (NULL == *pCond) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + SOperatorNode* pOp = (SOperatorNode*)*pCond; + pOp->node.resType.type = TSDB_DATA_TYPE_BOOL; + pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes; + pOp->opType = OP_TYPE_EQUAL; + + SColumnNode* pLeft = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); + if (NULL == pLeft) { + nodesDestroyNode(*pCond); + return TSDB_CODE_OUT_OF_MEMORY; + } + pLeft->node.resType.type = pLMeta->schema[0].type; + pLeft->node.resType.bytes = pLMeta->schema[0].bytes; + pLeft->tableId = pLMeta->uid; + pLeft->colId = pLMeta->schema[0].colId; + pLeft->colType = COLUMN_TYPE_COLUMN; + strcpy(pLeft->tableName, leftTable->table.tableName); + strcpy(pLeft->tableAlias, leftTable->table.tableAlias); + strcpy(pLeft->colName, pLMeta->schema[0].name); + + pOp->pLeft = (SNode*)pLeft; + + SColumnNode* pRight = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); + if (NULL == pRight) { + nodesDestroyNode(*pCond); + return TSDB_CODE_OUT_OF_MEMORY; + } + pRight->node.resType.type = pRMeta->schema[0].type; + pRight->node.resType.bytes = pRMeta->schema[0].bytes; + pRight->tableId = pRMeta->uid; + pRight->colId = pRMeta->schema[0].colId; + pRight->colType = COLUMN_TYPE_COLUMN; + strcpy(pRight->tableName, rightTable->table.tableName); + strcpy(pRight->tableAlias, rightTable->table.tableAlias); + strcpy(pRight->colName, pRMeta->schema[0].name); + + pOp->pRight = (SNode*)pRight; + + return TSDB_CODE_SUCCESS; +} + static int32_t checkJoinTable(STranslateContext* pCxt, SJoinTableNode* pJoinTable) { if ((QUERY_NODE_TEMP_TABLE == nodeType(pJoinTable->pLeft) && !isGlobalTimeLineQuery(((STempTableNode*)pJoinTable->pLeft)->pSubquery)) || @@ -3166,6 +3215,28 @@ static int32_t checkJoinTable(STranslateContext* pCxt, SJoinTableNode* pJoinTabl return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_SUPPORT_JOIN, "Join requires valid time series input"); } + + if 
(JOIN_STYPE_WIN == pJoinTable->subType) { + if (QUERY_NODE_REAL_TABLE != nodeType(pJoinTable->pLeft) || QUERY_NODE_REAL_TABLE != nodeType(pJoinTable->pRight)) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_SUPPORT_JOIN, + "Only support WINDOW join between tables"); + } + + SRealTableNode* pLeft = (SRealTableNode*)pJoinTable->pLeft; + if (TSDB_SUPER_TABLE != pLeft->pMeta->tableType && TSDB_CHILD_TABLE != pLeft->pMeta->tableType && TSDB_NORMAL_TABLE != pLeft->pMeta->tableType) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_SUPPORT_JOIN, + "Unsupported WINDOW join table type"); + } + + SRealTableNode* pRight = (SRealTableNode*)pJoinTable->pRight; + if (TSDB_SUPER_TABLE != pRight->pMeta->tableType && TSDB_CHILD_TABLE != pRight->pMeta->tableType && TSDB_NORMAL_TABLE != pRight->pMeta->tableType) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_SUPPORT_JOIN, + "Unsupported WINDOW join table type"); + } + + return addPrimEqCond(&pJoinTable->winPrimCond, pLeft, pRight); + } + return TSDB_CODE_SUCCESS; } diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 3f36be2e0a..125ddb3c1c 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -277,21 +277,24 @@ static EScanType getScanType(SLogicPlanContext* pCxt, SNodeList* pScanPseudoCols return SCAN_TYPE_TABLE; } -static SNode* createFirstCol(uint64_t tableId, const SSchema* pSchema) { +static SNode* createFirstCol(SRealTableNode* pTable) { SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN); if (NULL == pCol) { return NULL; } + SSchema* pSchema = pTable->pMeta->schema; pCol->node.resType.type = pSchema->type; pCol->node.resType.bytes = pSchema->bytes; - pCol->tableId = tableId; + pCol->tableId = pTable->pMeta->uid; pCol->colId = pSchema->colId; pCol->colType = COLUMN_TYPE_COLUMN; + strcpy(pCol->tableAlias, pTable->table.tableAlias); + 
strcpy(pCol->tableName, pTable->table.tableName); strcpy(pCol->colName, pSchema->name); return (SNode*)pCol; } -static int32_t addPrimaryKeyCol(uint64_t tableId, const SSchema* pSchema, SNodeList** pCols) { +static int32_t addPrimaryKeyCol(SRealTableNode* pTable, SNodeList** pCols) { bool found = false; SNode* pCol = NULL; FOREACH(pCol, *pCols) { @@ -302,23 +305,23 @@ static int32_t addPrimaryKeyCol(uint64_t tableId, const SSchema* pSchema, SNodeL } if (!found) { - return nodesListMakeStrictAppend(pCols, createFirstCol(tableId, pSchema)); + return nodesListMakeStrictAppend(pCols, createFirstCol(pTable)); } return TSDB_CODE_SUCCESS; } -static int32_t addSystableFirstCol(uint64_t tableId, const SSchema* pSchema, SNodeList** pCols) { +static int32_t addSystableFirstCol(SRealTableNode* pTable, SNodeList** pCols) { if (LIST_LENGTH(*pCols) > 0) { return TSDB_CODE_SUCCESS; } - return nodesListMakeStrictAppend(pCols, createFirstCol(tableId, pSchema)); + return nodesListMakeStrictAppend(pCols, createFirstCol(pTable)); } -static int32_t addDefaultScanCol(const STableMeta* pMeta, SNodeList** pCols) { - if (TSDB_SYSTEM_TABLE == pMeta->tableType) { - return addSystableFirstCol(pMeta->uid, pMeta->schema, pCols); +static int32_t addDefaultScanCol(SRealTableNode* pTable, SNodeList** pCols) { + if (TSDB_SYSTEM_TABLE == pTable->pMeta->tableType) { + return addSystableFirstCol(pTable, pCols); } - return addPrimaryKeyCol(pMeta->uid, pMeta->schema, pCols); + return addPrimaryKeyCol(pTable, pCols); } static int32_t makeScanLogicNode(SLogicPlanContext* pCxt, SRealTableNode* pRealTable, bool hasRepeatScanFuncs, @@ -458,7 +461,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect } if (TSDB_CODE_SUCCESS == code && needScanDefaultCol(pScan->scanType)) { - code = addDefaultScanCol(pRealTable->pMeta, &pScan->pScanCols); + code = addDefaultScanCol(pRealTable, &pScan->pScanCols); } if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pTags && NULL == 
pSelect->pPartitionByList) { @@ -545,6 +548,7 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect pJoin->isLowLevelJoin = pJoinTable->isLowLevelJoin; pJoin->pWindowOffset = nodesCloneNode(pJoinTable->pWindowOffset); pJoin->pJLimit = nodesCloneNode(pJoinTable->pJLimit); + pJoin->winPrimEqCond = nodesCloneNode(pJoinTable->winPrimCond); pJoin->node.pChildren = nodesMakeList(); if (NULL == pJoin->node.pChildren) { code = TSDB_CODE_OUT_OF_MEMORY; diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index b268189865..de889f9948 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -1196,7 +1196,13 @@ static int32_t pdcJoinCheckAllCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin if (errCond) { return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_NOT_SUPPORT_JOIN_COND); } + if (IS_WINDOW_JOIN(pJoin->subType)) { + return TSDB_CODE_SUCCESS; + } + return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_EXPECTED_TS_EQUAL); + } else if (IS_WINDOW_JOIN(pJoin->subType)) { + return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_NOT_SUPPORT_JOIN_COND); } return TSDB_CODE_SUCCESS; diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 74661f7e3e..d19aaa31de 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -914,6 +914,16 @@ static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChi } } + if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->winPrimEqCond) { + SNode* pPrimKeyCond = NULL; + code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->winPrimEqCond, + &pPrimKeyCond); + if (TSDB_CODE_SUCCESS == code) { + code = setColEqCond(pPrimKeyCond, pJoin->subType, pLeftDesc->dataBlockId, 
pRightDesc->dataBlockId, pJoin); + } + nodesDestroyNode(pPrimKeyCond); + } + if (TSDB_CODE_SUCCESS == code) { code = setListSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->node.pTargets, &pJoin->pTargets); diff --git a/tests/script/tsim/join/join.sim b/tests/script/tsim/join/join.sim index 4eef250433..5a883f4612 100644 --- a/tests/script/tsim/join/join.sim +++ b/tests/script/tsim/join/join.sim @@ -67,6 +67,7 @@ run tsim/join/left_anti_join.sim run tsim/join/right_anti_join.sim run tsim/join/left_asof_join.sim run tsim/join/right_asof_join.sim +run tsim/join/left_win_join.sim print ================== restart server to commit data into disk system sh/exec.sh -n dnode1 -s stop -x SIGINT @@ -83,5 +84,6 @@ run tsim/join/left_anti_join.sim run tsim/join/right_anti_join.sim run tsim/join/left_asof_join.sim run tsim/join/right_asof_join.sim +run tsim/join/left_win_join.sim system sh/exec.sh -n dnode1 -s stop -x SIGINT diff --git a/tests/script/tsim/join/left_win_join.sim b/tests/script/tsim/join/left_win_join.sim new file mode 100644 index 0000000000..0ce5592d55 --- /dev/null +++ b/tests/script/tsim/join/left_win_join.sim @@ -0,0 +1,168 @@ +sql connect +sql use test0; + +sql_error select a.col1, b.col1 from sta a left window join sta b on a.ts = b.ts and a.ts < '2023-11-17 16:29:02' and b.ts < '2023-11-17 16:29:01' window_offset(-1s, 1s) order by a.col1, b.col1; +sql_error select a.col1, b.col1 from sta a left window join sta b on a.ts = b.ts order by a.col1, b.col1; +sql_error select a.col1, b.col1 from sta a left window join sta b on a.ts = b.ts window_offset(-1s, 1s) order by a.col1, b.col1; +sql select a.col1, b.col1 from sta a left window join sta b window_offset(-1s, 1s) order by a.col1, b.col1; +if $rows != 28 then + return -1 +endi +sql select a.col1, b.col1 from sta a left window join sta b window_offset(-1s, 1s) jlimit 2 order by a.col1, b.col1; +if $rows != 16 then + return -1 +endi +sql select a.col1, b.col1 from sta a left window 
join sta b window_offset(1s, 1s) order by a.col1, b.col1; +if $rows != 9 then + return -1 +endi +sql select a.col1, b.col1 from sta a left window join sta b window_offset(-1s, 1s) where a.ts < '2023-11-17 16:29:02' and b.ts < '2023-11-17 16:29:01' order by a.col1, b.col1; +if $rows != 6 then + return -1 +endi +if $data00 != 1 then + return -1 +endi +if $data01 != 1 then + return -1 +endi +if $data10 != 1 then + return -1 +endi +if $data11 != 2 then + return -1 +endi +if $data20 != 2 then + return -1 +endi +if $data21 != 1 then + return -1 +endi +if $data30 != 2 then + return -1 +endi +if $data31 != 2 then + return -1 +endi +if $data40 != 3 then + return -1 +endi +if $data41 != 1 then + return -1 +endi +if $data50 != 3 then + return -1 +endi +if $data51 != 2 then + return -1 +endi + +sql select a.ts, b.ts from tba1 a left window join tba2 b window_offset(-1s, 1s) +if $rows != 7 then + return -1 +endi +if $data00 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data20 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data21 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data30 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data31 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data40 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data41 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data50 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data51 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data60 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data61 != @23-11-17 16:29:05.000@ then + return -1 +endi + +sql select a.ts, b.ts from tba1 a left window join tba2 b window_offset(-1s, 1s) jlimit 1; +if $rows != 4 then + return -1 +endi +if $data00 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data01 != @23-11-17 
16:29:00.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data11 != @23-11-17 16:29:01.000@ then + return -1 +endi +if $data20 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data21 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data30 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data31 != @23-11-17 16:29:03.000@ then + return -1 +endi + +sql select a.ts, b.ts from tba1 a left window join tba2 b window_offset(-1a, 1a) jlimit 1; +if $rows != 4 then + return -1 +endi +if $data00 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data01 != @23-11-17 16:29:00.000@ then + return -1 +endi +if $data10 != @23-11-17 16:29:02.000@ then + return -1 +endi +if $data11 != NULL then + return -1 +endi +if $data20 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data21 != @23-11-17 16:29:03.000@ then + return -1 +endi +if $data30 != @23-11-17 16:29:04.000@ then + return -1 +endi +if $data31 != NULL then + return -1 +endi + +sql select a.ts, b.ts from tba1 a left window join tba2 b window_offset(-1h, 1h); +if $rows != 16 then + return -1 +endi