enh: support window join

This commit is contained in:
dapan1121 2024-02-01 14:58:08 +08:00
parent 50c420e6b9
commit ab87953362
16 changed files with 567 additions and 147 deletions

View File

@ -130,6 +130,7 @@ typedef struct SJoinLogicNode {
SNode* pWindowOffset;
SNode* pJLimit;
EJoinAlgorithm joinAlgo;
SNode* winPrimEqCond;
SNode* pPrimKeyEqCond;
SNode* pColEqCond;
SNode* pColOnCond;

View File

@ -208,6 +208,7 @@ typedef struct SViewNode {
#define IS_INNER_NONE_JOIN(_type, _stype) ((_type) == JOIN_TYPE_INNER && (_stype) == JOIN_STYPE_NONE)
#define IS_WINDOW_JOIN(_stype) ((_stype) == JOIN_STYPE_WIN)
#define IS_ASOF_JOIN(_stype) ((_stype) == JOIN_STYPE_ASOF)
typedef enum EJoinType {
JOIN_TYPE_INNER = 0,
@ -243,6 +244,7 @@ typedef struct SJoinTableNode {
EJoinSubType subType;
SNode* pWindowOffset;
SNode* pJLimit;
SNode* winPrimCond;
bool hasSubQuery;
bool isLowLevelJoin;
SNode* pParent;

View File

@ -636,6 +636,7 @@ int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc) {
int32_t blockDataMergeNRows(SSDataBlock* pDest, const SSDataBlock* pSrc, int32_t srcIdx, int32_t numOfRows) {
if (pDest->info.rows + numOfRows > pDest->info.capacity) {
ASSERT(0);
return TSDB_CODE_FAILED;
}

View File

@ -140,6 +140,13 @@ typedef struct SMJoinTableCtx {
SMJoinNMatchCtx nMatchCtx;
} SMJoinTableCtx;
typedef struct SMJoinMatchInfo {
int32_t rowBitmapOffset;
int32_t rowMatchNum;
bool allRowsNMatch;
bool allRowsMatch;
} SMJoinMatchInfo;
typedef struct SMJoinGrpRows {
SSDataBlock* blk;
int32_t beginIdx;
@ -150,6 +157,7 @@ typedef struct SMJoinGrpRows {
bool allRowsNMatch;
bool allRowsMatch;
bool readMatch;
bool clonedBlk;
} SMJoinGrpRows;
#define MJOIN_COMMON_CTX \
@ -223,6 +231,7 @@ typedef struct SMJoinWindowCtx {
int32_t asofOpType;
int64_t winBeginOffset;
int64_t winEndOffset;
bool winProjection;
bool lowerRowsAcq;
bool eqRowsAcq;
bool greaterRowsAcq;
@ -407,7 +416,7 @@ int32_t mJoinCreateFullBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pT
int32_t mJoinCreateBuildTbHash(SMJoinOperatorInfo* pJoin, SMJoinTableCtx* pTable);
int32_t mJoinSetKeyColsData(SSDataBlock* pBlock, SMJoinTableCtx* pTable);
int32_t mJoinProcessEqualGrp(SMJoinMergeCtx* pCtx, int64_t timestamp, bool lastBuildGrp);
bool mJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, bool append, SMJoinTableCtx* probe, SMJoinTableCtx* build);
bool mJoinHashGrpCart(SSDataBlock* pBlk, SMJoinGrpRows* probeGrp, bool append, SMJoinTableCtx* probe, SMJoinTableCtx* build);
int32_t mJoinMergeGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pFirst, SMJoinGrpRows* pSecond);
int32_t mJoinHandleMidRemains(SMJoinMergeCtx* pCtx);
int32_t mJoinNonEqGrpCart(SMJoinOperatorInfo* pJoin, SSDataBlock* pRes, bool append, SMJoinGrpRows* pGrp, bool probeGrp);
@ -420,6 +429,7 @@ int32_t mJoinProcessLowerGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnI
int32_t mJoinProcessGreaterGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColumnInfoData* pCol, int64_t* probeTs, int64_t* buildTs);
int32_t mJoinFilterAndKeepSingleRow(SSDataBlock* pBlock, SFilterInfo* pFilterInfo);
int32_t mJoinFilterAndNoKeepRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo);
int32_t mJoinBuildEqGrp(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, SMJoinGrpRows* pGrp);
#ifdef __cplusplus

View File

@ -27,6 +27,76 @@
#include "ttypes.h"
#include "mergejoin.h"
int32_t mWinJoinDumpGrpCache(SMJoinWindowCtx* pCtx) {
int64_t rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows;
SMJoinWinCache* cache = &pCtx->cache;
int32_t buildGrpNum = taosArrayGetSize(cache->grps);
int64_t buildTotalRows = TMIN(cache->rowNum, pCtx->jLimit);
if (buildGrpNum <= 0 || buildTotalRows <= 0) {
return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeGrp, true);
}
SMJoinGrpRows* probeGrp = &pCtx->probeGrp;
int32_t probeRows = GRP_REMAIN_ROWS(probeGrp);
int32_t probeEndIdx = probeGrp->endIdx;
if (0 == cache->grpIdx && probeRows * buildTotalRows <= rowsLeft) {
SMJoinGrpRows* pFirstBuild = taosArrayGet(cache->grps, 0);
if (pFirstBuild->readIdx == pFirstBuild->beginIdx) {
for (; cache->grpIdx < buildGrpNum; ++cache->grpIdx) {
SMJoinGrpRows* buildGrp = taosArrayGet(cache->grps, cache->grpIdx);
MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp));
buildGrp->readIdx = buildGrp->beginIdx;
}
cache->grpIdx = 0;
pCtx->grpRemains = false;
return TSDB_CODE_SUCCESS;
}
}
for (; !GRP_DONE(probeGrp); ) {
probeGrp->endIdx = probeGrp->readIdx;
for (; cache->grpIdx < buildGrpNum && rowsLeft > 0; ++cache->grpIdx) {
SMJoinGrpRows* buildGrp = taosArrayGet(cache->grps, cache->grpIdx);
if (rowsLeft >= GRP_REMAIN_ROWS(buildGrp)) {
MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp));
rowsLeft -= GRP_REMAIN_ROWS(buildGrp);
buildGrp->readIdx = buildGrp->beginIdx;
continue;
}
int32_t buildEndIdx = buildGrp->endIdx;
buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1;
mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp);
buildGrp->readIdx += rowsLeft;
buildGrp->endIdx = buildEndIdx;
rowsLeft = 0;
break;
}
probeGrp->endIdx = probeEndIdx;
if (cache->grpIdx >= buildGrpNum) {
cache->grpIdx = 0;
++probeGrp->readIdx;
}
if (rowsLeft <= 0) {
break;
}
}
probeGrp->endIdx = probeEndIdx;
pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;
return TSDB_CODE_SUCCESS;
}
static int32_t mOuterJoinHashFullCart(SMJoinMergeCtx* pCtx) {
SMJoinTableCtx* probe = pCtx->pJoin->probe;
SMJoinTableCtx* build = pCtx->pJoin->build;
@ -1714,41 +1784,6 @@ _return:
}
int32_t mAsofJoinBuildEqGrp(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, SMJoinGrpRows* pGrp) {
SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCol->srcSlot);
if (*(int64_t*)colDataGetNumData(pCol, pTable->blkRowIdx) != timestamp) {
return TSDB_CODE_SUCCESS;
}
pGrp->beginIdx = pTable->blkRowIdx;
pGrp->readIdx = pTable->blkRowIdx;
pTable->blkRowIdx++;
char* pEndVal = colDataGetNumData(pCol, pTable->blk->info.rows - 1);
if (timestamp != *(int64_t*)pEndVal) {
for (; pTable->blkRowIdx < pTable->blk->info.rows; ++pTable->blkRowIdx) {
char* pNextVal = colDataGetNumData(pCol, pTable->blkRowIdx);
if (timestamp == *(int64_t*)pNextVal) {
continue;
}
pGrp->endIdx = pTable->blkRowIdx - 1;
return TSDB_CODE_SUCCESS;
}
}
pGrp->endIdx = pTable->blk->info.rows - 1;
pTable->blkRowIdx = pTable->blk->info.rows;
if (wholeBlk) {
*wholeBlk = true;
}
return TSDB_CODE_SUCCESS;
}
int32_t mAsofLowerCalcRowNum(SMJoinWinCache* pCache, int64_t jLimit, int32_t newRows, int32_t* evictRows) {
if (pCache->outBlk->info.rows <= 0) {
*evictRows = 0;
@ -1821,7 +1856,7 @@ int32_t mAsofLowerAddEqRowsToCache(struct SOperatorInfo* pOperator, SMJoinWindow
eqRowsNum += grp.endIdx - grp.beginIdx + 1;
if (pTable->blkRowIdx == pTable->blk->info.rows) {
if (pTable->blkRowIdx == pTable->blk->info.rows && !pTable->dsFetchDone) {
pTable->blk = getNextBlockFromDownstreamRemain(pOperator, pTable->downStreamIdx);
qDebug("%s merge join %s table got block for same ts, rows:%" PRId64, GET_TASKID(pOperator->pTaskInfo), MJOIN_TBTYPE(pTable->type), pTable->blk ? pTable->blk->info.rows : 0);
@ -1948,7 +1983,7 @@ int32_t mAsofLowerProcessEqualGrp(SMJoinWindowCtx* pCtx, int64_t timestamp, bool
}
bool wholeBlk = false;
MJ_ERR_RET(mAsofJoinBuildEqGrp(pJoin->probe, timestamp, &wholeBlk, &pCtx->probeGrp));
MJ_ERR_RET(mJoinBuildEqGrp(pJoin->probe, timestamp, &wholeBlk, &pCtx->probeGrp));
MJ_ERR_RET(mAsofLowerDumpUpdateEqRows(pCtx, pJoin, lastBuildGrp, wholeBlk));
@ -2129,81 +2164,25 @@ _return:
return pCtx->finBlk;
}
int32_t mAsofGreaterDumpGrpCache(SMJoinWindowCtx* pCtx) {
int64_t rowsLeft = pCtx->finBlk->info.capacity - pCtx->finBlk->info.rows;
SMJoinWinCache* cache = &pCtx->cache;
int32_t buildGrpNum = taosArrayGetSize(cache->grps);
int64_t buildTotalRows = (cache->rowNum > pCtx->jLimit) ? pCtx->jLimit : cache->rowNum;
if (buildGrpNum <= 0 || buildTotalRows <= 0) {
return mJoinNonEqCart((SMJoinCommonCtx*)pCtx, &pCtx->probeGrp, true);
}
SMJoinGrpRows* probeGrp = &pCtx->probeGrp;
int32_t probeRows = GRP_REMAIN_ROWS(probeGrp);
int32_t probeEndIdx = probeGrp->endIdx;
if (0 == cache->grpIdx && probeRows * buildTotalRows <= rowsLeft) {
SMJoinGrpRows* pFirstBuild = taosArrayGet(cache->grps, 0);
if (pFirstBuild->readIdx == pFirstBuild->beginIdx) {
for (; cache->grpIdx < buildGrpNum; ++cache->grpIdx) {
SMJoinGrpRows* buildGrp = taosArrayGet(cache->grps, cache->grpIdx);
MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp));
buildGrp->readIdx = buildGrp->beginIdx;
}
cache->grpIdx = 0;
pCtx->grpRemains = false;
return TSDB_CODE_SUCCESS;
}
int32_t mAsofGreaterTrimCacheBlk(SMJoinWindowCtx* pCtx) {
SMJoinGrpRows* pGrp = taosArrayGet(pCtx->cache.grps, 0);
if (pGrp->blk == pCtx->cache.outBlk && pCtx->pJoin->build->blkRowIdx > 0) {
MJ_ERR_RET(blockDataTrimFirstRows(pGrp->blk, pCtx->pJoin->build->blkRowIdx));
pCtx->pJoin->build->blkRowIdx = 0;
ASSERT(pCtx->pJoin->build->blk == pGrp->blk);
MJOIN_SAVE_TB_BLK(&pCtx->cache, pCtx->pJoin->build);
}
for (; !GRP_DONE(probeGrp); ) {
probeGrp->endIdx = probeGrp->readIdx;
for (; cache->grpIdx < buildGrpNum && rowsLeft > 0; ++cache->grpIdx) {
SMJoinGrpRows* buildGrp = taosArrayGet(cache->grps, cache->grpIdx);
if (rowsLeft >= GRP_REMAIN_ROWS(buildGrp)) {
MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp));
rowsLeft -= GRP_REMAIN_ROWS(buildGrp);
buildGrp->readIdx = buildGrp->beginIdx;
continue;
}
int32_t buildEndIdx = buildGrp->endIdx;
buildGrp->endIdx = buildGrp->readIdx + rowsLeft - 1;
mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, buildGrp);
buildGrp->readIdx += rowsLeft;
buildGrp->endIdx = buildEndIdx;
rowsLeft = 0;
break;
}
probeGrp->endIdx = probeEndIdx;
if (cache->grpIdx >= buildGrpNum) {
cache->grpIdx = 0;
++probeGrp->readIdx;
}
if (rowsLeft <= 0) {
break;
}
}
probeGrp->endIdx = probeEndIdx;
pCtx->grpRemains = probeGrp->readIdx <= probeGrp->endIdx;
return TSDB_CODE_SUCCESS;
return TSDB_CODE_SUCCESS;
}
int32_t mAsofGreaterChkFillGrpCache(SMJoinWindowCtx* pCtx) {
if (pCtx->cache.rowNum >= pCtx->jLimit || pCtx->pJoin->build->dsFetchDone) {
return TSDB_CODE_SUCCESS;
}
MJ_ERR_RET(mAsofGreaterTrimCacheBlk(pCtx));
SMJoinTableCtx* build = pCtx->pJoin->build;
SMJoinWinCache* pCache = &pCtx->cache;
int32_t grpNum = taosArrayGetSize(pCache->grps);
@ -2289,7 +2268,7 @@ int32_t mAsofGreaterFillDumpGrpCache(SMJoinWindowCtx* pCtx, bool lastBuildGrp) {
mAsofGreaterUpdateBuildGrpEndIdx(pCtx);
return mAsofGreaterDumpGrpCache(pCtx);
return mWinJoinDumpGrpCache(pCtx);
}
int32_t mAsofGreaterSkipEqRows(SMJoinWindowCtx* pCtx, SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk) {
@ -2342,6 +2321,10 @@ int32_t mAsofGreaterSkipAllEqRows(SMJoinWindowCtx* pCtx, int64_t timestamp) {
ASSERT(pCtx->cache.rowNum == 0);
ASSERT(taosArrayGetSize(pCtx->cache.grps) == 0);
if (pTable->dsFetchDone) {
return TSDB_CODE_SUCCESS;
}
pTable->blk = getNextBlockFromDownstreamRemain(pCtx->pJoin->pOperator, pTable->downStreamIdx);
qDebug("%s merge join %s table got block to skip eq ts, rows:%" PRId64, GET_TASKID(pCtx->pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(pTable->type), pTable->blk ? pTable->blk->info.rows : 0);
@ -2374,7 +2357,7 @@ int32_t mAsofGreaterProcessEqualGrp(SMJoinWindowCtx* pCtx, int64_t timestamp, bo
pCtx->lastEqGrp = true;
MJ_ERR_RET(mAsofJoinBuildEqGrp(pJoin->probe, timestamp, NULL, &pCtx->probeGrp));
MJ_ERR_RET(mJoinBuildEqGrp(pJoin->probe, timestamp, NULL, &pCtx->probeGrp));
return mAsofGreaterUpdateDumpEqRows(pCtx, timestamp, lastBuildGrp);
}
@ -2483,7 +2466,7 @@ SSDataBlock* mAsofGreaterJoinDo(struct SOperatorInfo* pOperator) {
blockDataCleanup(pCtx->finBlk);
if (pCtx->grpRemains) {
MJ_ERR_JRET(mAsofGreaterDumpGrpCache(pCtx));
MJ_ERR_JRET(mWinJoinDumpGrpCache(pCtx));
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
@ -2560,12 +2543,40 @@ _return:
return pCtx->finBlk;
}
static int32_t mWinJoinCloneCacheBlk(SMJoinWindowCtx* pCtx) {
SMJoinWinCache* pCache = &pCtx->cache;
int32_t grpNum = taosArrayGetSize(pCache->grps);
if (grpNum <= 0) {
return TSDB_CODE_SUCCESS;
}
SMJoinGrpRows* pGrp = (SMJoinGrpRows*)taosArrayGetLast(pCache->grps);
if (!pGrp->clonedBlk) {
if (0 == pGrp->beginIdx) {
pGrp->blk = createOneDataBlock(pGrp->blk, true);
} else {
pGrp->blk = blockDataExtractBlock(pGrp->blk, pGrp->beginIdx, pGrp->blk->info.rows - pGrp->beginIdx);
pGrp->endIdx -= pGrp->beginIdx;
pGrp->beginIdx = 0;
pGrp->readIdx = 0;
}
pGrp->clonedBlk = true;
}
return TSDB_CODE_SUCCESS;
}
static bool mWinJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx) {
bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe);
bool buildGot = false;
do {
if (probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) {
if (NULL == pJoin->build->blk) {
mWinJoinCloneCacheBlk(pCtx);
}
buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build);
}
@ -2586,6 +2597,8 @@ static bool mWinJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin
break;
} while (true);
pCtx->probeGrp.blk = pJoin->probe->blk;
return true;
}
@ -2606,7 +2619,9 @@ int32_t mWinJoinAddWinBeginBlk(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache, SM
pGrp->endIdx = pGrp->beginIdx;
build->blk = NULL;
return TSDB_CODE_SUCCESS;
pCache->rowNum = 1;
} else {
pCache->rowNum = 0;
}
*winEnd = true;
@ -2614,6 +2629,8 @@ int32_t mWinJoinAddWinBeginBlk(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache, SM
}
}
pCache->rowNum = 0;
*winEnd = false;
return TSDB_CODE_SUCCESS;
}
@ -2622,35 +2639,51 @@ int32_t mWinJoinAddWinBeginBlk(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache, SM
int32_t mWinJoinAddWinEndBlk(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache, SMJoinTableCtx* build, bool* winEnd) {
SSDataBlock* pBlk = build->blk;
SColumnInfoData* pCol = taosArrayGet(pBlk->pDataBlock, build->primCol->srcSlot);
SMJoinGrpRows grp = {.blk = pBlk, .beginIdx = build->blkRowIdx};
if (*((int64_t*)pCol->pData + build->blkRowIdx) > pCtx->winEndTs) {
*winEnd = true;
return TSDB_CODE_SUCCESS;
}
if (*((int64_t*)pCol->pData + pBlk->info.rows - 1) <= pCtx->winEndTs) {
SMJoinGrpRows grp = {.blk = pBlk, .beginIdx = build->blkRowIdx};
SMJoinGrpRows* pGrp = taosArrayPush(pCache->grps, &grp);
pGrp->readIdx = pGrp->beginIdx;
pGrp->endIdx = pBlk->info.rows - 1;
pCache->rowNum += (pGrp->endIdx - pGrp->beginIdx + 1);
if (pCache->rowNum >= pCtx->jLimit) {
pGrp->endIdx = pBlk->info.rows - 1 + pCtx->jLimit - pCache->rowNum;
pCache->rowNum = pCtx->jLimit;
build->blk = NULL;
*winEnd = true;
return TSDB_CODE_SUCCESS;
}
build->blk = NULL;
*winEnd = false;
return TSDB_CODE_SUCCESS;
}
SMJoinGrpRows grp = {.blk = pBlk, .beginIdx = build->blkRowIdx};
for (; build->blkRowIdx < pBlk->info.rows; build->blkRowIdx++) {
for (; build->blkRowIdx < pBlk->info.rows && pCache->rowNum < pCtx->jLimit; build->blkRowIdx++) {
if (*((int64_t*)pCol->pData + build->blkRowIdx) <= pCtx->winEndTs) {
pCache->rowNum++;
continue;
}
SMJoinGrpRows* pGrp = taosArrayPush(pCache->grps, &grp);
pGrp->readIdx = pGrp->beginIdx;
pGrp->endIdx = build->blkRowIdx - 1;
build->blk = NULL;
*winEnd = true;
return TSDB_CODE_SUCCESS;
break;
}
SMJoinGrpRows* pGrp = taosArrayPush(pCache->grps, &grp);
pGrp->readIdx = pGrp->beginIdx;
pGrp->endIdx = build->blkRowIdx - 1;
build->blk = NULL;
*winEnd = true;
return TSDB_CODE_SUCCESS;
}
@ -2667,19 +2700,24 @@ int32_t mWinJoinMoveWinBegin(SMJoinWindowCtx* pCtx) {
taosArrayPopFrontBatch(pCache->grps, 1);
grpNum--;
i--;
pCache->rowNum -= (pGrp->blk->info.rows - pGrp->beginIdx);
continue;
}
int32_t startIdx = pGrp->beginIdx;
for (; pGrp->beginIdx < pGrp->blk->info.rows; pGrp->beginIdx++) {
if (*((int64_t*)pCol->pData + pGrp->beginIdx) < pCtx->winBeginTs) {
continue;
}
if (*((int64_t*)pCol->pData + pGrp->beginIdx) <= pCtx->winEndTs) {
pGrp->readIdx = pGrp->beginIdx;
pCache->rowNum -= (pGrp->beginIdx - startIdx);
return TSDB_CODE_SUCCESS;
}
pGrp->endIdx = pGrp->beginIdx;
pCache->rowNum = 0;
TSWAP(pCache->grps, pCache->grpsQueue);
return TSDB_CODE_SUCCESS;
}
@ -2687,6 +2725,7 @@ int32_t mWinJoinMoveWinBegin(SMJoinWindowCtx* pCtx) {
if (NULL != pCache->grpsQueue) {
pCache->grps = pCache->grpsQueue;
pCache->rowNum = 1;
pCache->grpsQueue = NULL;
continue;
}
@ -2703,6 +2742,10 @@ int32_t mWinJoinMoveWinBegin(SMJoinWindowCtx* pCtx) {
}
}
if (build->dsFetchDone) {
goto _return;
}
do {
build->blk = getNextBlockFromDownstreamRemain(pCtx->pJoin->pOperator, pCtx->pJoin->build->downStreamIdx);
qDebug("%s merge join %s table got block to start win, rows:%" PRId64, GET_TASKID(pCtx->pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(build->type), build->blk ? build->blk->info.rows : 0);
@ -2720,6 +2763,8 @@ int32_t mWinJoinMoveWinBegin(SMJoinWindowCtx* pCtx) {
}
} while (true);
_return:
return TSDB_CODE_SUCCESS;
}
@ -2734,16 +2779,30 @@ int32_t mWinJoinMoveWinEnd(SMJoinWindowCtx* pCtx) {
SMJoinGrpRows* pGrp = taosArrayGetLast(pCache->grps);
SColumnInfoData* pCol = taosArrayGet(pGrp->blk->pDataBlock, pCtx->pJoin->build->primCol->srcSlot);
if (*((int64_t*)pCol->pData + pGrp->blk->info.rows - 1) <= pCtx->winEndTs) {
pGrp->endIdx = pGrp->blk->info.rows - 1;
pCache->rowNum += pGrp->blk->info.rows - pGrp->endIdx - 1;
if (pCache->rowNum >= pCtx->jLimit) {
pGrp->endIdx = pGrp->blk->info.rows - 1 + pCtx->jLimit - pCache->rowNum;
pCache->rowNum = pCtx->jLimit;
return TSDB_CODE_SUCCESS;
} else {
pGrp->endIdx = pGrp->blk->info.rows - 1;
}
} else {
for (; pGrp->endIdx < pGrp->blk->info.rows; pGrp->endIdx++) {
int32_t startIdx = pGrp->endIdx;
for (; pGrp->endIdx < pGrp->blk->info.rows && pCache->rowNum < pCtx->jLimit; pGrp->endIdx++) {
if (*((int64_t*)pCol->pData + pGrp->endIdx) <= pCtx->winEndTs) {
pCache->rowNum++;
continue;
}
ASSERT(pGrp->endIdx > startIdx);
pGrp->endIdx--;
return TSDB_CODE_SUCCESS;
break;
}
return TSDB_CODE_SUCCESS;
}
SMJoinTableCtx* build = pCtx->pJoin->build;
@ -2755,7 +2814,13 @@ int32_t mWinJoinMoveWinEnd(SMJoinWindowCtx* pCtx) {
}
}
if (build->dsFetchDone) {
goto _return;
}
do {
MJ_ERR_RET(mWinJoinCloneCacheBlk(pCtx));
build->blk = getNextBlockFromDownstreamRemain(pCtx->pJoin->pOperator, pCtx->pJoin->build->downStreamIdx);
qDebug("%s merge join %s table got block to start win, rows:%" PRId64, GET_TASKID(pCtx->pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(build->type), build->blk ? build->blk->info.rows : 0);
@ -2772,6 +2837,8 @@ int32_t mWinJoinMoveWinEnd(SMJoinWindowCtx* pCtx) {
}
} while (true);
_return:
return TSDB_CODE_SUCCESS;
}
@ -2784,17 +2851,14 @@ int32_t mWinJoinMoveFillWinCache(SMJoinWindowCtx* pCtx) {
}
int32_t mWinJoinDumpWinCache(SMJoinWindowCtx* pCtx) {
return TSDB_CODE_SUCCESS;
return pCtx->winProjection ? mWinJoinDumpGrpCache(pCtx) : TSDB_CODE_SUCCESS;
}
SSDataBlock* mWinJoinDo(struct SOperatorInfo* pOperator) {
SMJoinOperatorInfo* pJoin = pOperator->info;
SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx;
int32_t code = TSDB_CODE_SUCCESS;
int64_t probeTs = 0;
int64_t buildTs = 0;
SColumnInfoData* pBuildCol = NULL;
SColumnInfoData* pProbeCol = NULL;
blockDataCleanup(pCtx->finBlk);
@ -2815,7 +2879,9 @@ SSDataBlock* mWinJoinDo(struct SOperatorInfo* pOperator) {
MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
while (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe)) {
MJ_ERR_JRET(mAsofJoinBuildEqGrp(pJoin->probe, probeTs, NULL, &pCtx->probeGrp));
MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe);
MJ_ERR_JRET(mJoinBuildEqGrp(pJoin->probe, probeTs, NULL, &pCtx->probeGrp));
if (probeTs != pCtx->lastTs) {
pCtx->lastTs = probeTs;
@ -2829,8 +2895,6 @@ SSDataBlock* mWinJoinDo(struct SOperatorInfo* pOperator) {
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
return pCtx->finBlk;
}
MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe);
}
} while (true);
@ -2870,11 +2934,12 @@ int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* p
SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx;
pCtx->pJoin = pJoin;
pCtx->jLimit = pJoinNode->pJLimit ? ((SLimitNode*)pJoinNode->pJLimit)->limit : 1;
pCtx->lastTs = INT64_MIN;
switch (pJoinNode->subType) {
case JOIN_STYPE_ASOF:
pCtx->asofOpType = pJoinNode->asofOpType;
pCtx->jLimit = pJoinNode->pJLimit ? ((SLimitNode*)pJoinNode->pJLimit)->limit : 1;
pCtx->eqRowsAcq = ASOF_EQ_ROW_INCLUDED(pCtx->asofOpType);
pCtx->lowerRowsAcq = (JOIN_TYPE_RIGHT != pJoin->joinType) ? ASOF_LOWER_ROW_INCLUDED(pCtx->asofOpType) : ASOF_GREATER_ROW_INCLUDED(pCtx->asofOpType);
pCtx->greaterRowsAcq = (JOIN_TYPE_RIGHT != pJoin->joinType) ? ASOF_GREATER_ROW_INCLUDED(pCtx->asofOpType) : ASOF_LOWER_ROW_INCLUDED(pCtx->asofOpType);
@ -2889,6 +2954,8 @@ int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* p
SWindowOffsetNode* pOffsetNode = (SWindowOffsetNode*)pJoinNode->pWindowOffset;
SValueNode* pWinBegin = (SValueNode*)pOffsetNode->pStartOffset;
SValueNode* pWinEnd = (SValueNode*)pOffsetNode->pEndOffset;
pCtx->jLimit = pJoinNode->pJLimit ? ((SLimitNode*)pJoinNode->pJLimit)->limit : INT64_MAX;
pCtx->winProjection = true;
pCtx->winBeginOffset = pWinBegin->datum.i;
pCtx->winEndOffset = pWinEnd->datum.i;
pCtx->eqRowsAcq = (pCtx->winBeginOffset <= 0 && pCtx->winEndOffset >= 0);

View File

@ -28,6 +28,38 @@
#include "mergejoin.h"
int32_t mJoinBuildEqGrp(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, SMJoinGrpRows* pGrp) {
SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCol->srcSlot);
pGrp->beginIdx = pTable->blkRowIdx;
pGrp->readIdx = pTable->blkRowIdx;
pTable->blkRowIdx++;
char* pEndVal = colDataGetNumData(pCol, pTable->blk->info.rows - 1);
if (timestamp != *(int64_t*)pEndVal) {
for (; pTable->blkRowIdx < pTable->blk->info.rows; ++pTable->blkRowIdx) {
char* pNextVal = colDataGetNumData(pCol, pTable->blkRowIdx);
if (timestamp == *(int64_t*)pNextVal) {
continue;
}
pGrp->endIdx = pTable->blkRowIdx - 1;
return TSDB_CODE_SUCCESS;
}
}
pGrp->endIdx = pTable->blk->info.rows - 1;
pTable->blkRowIdx = pTable->blk->info.rows;
if (wholeBlk) {
*wholeBlk = true;
}
return TSDB_CODE_SUCCESS;
}
void mJoinTrimKeepOneRow(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList) {
// int32_t totalRows = pBlock->info.rows;
int32_t bmLen = BitmapLen(totalRows);
@ -1013,7 +1045,7 @@ int32_t mJoinRetrieveEqGrpRows(SOperatorInfo* pOperator, SMJoinTableCtx* pTable,
mJoinBuildEqGroups(pTable, timestamp, &wholeBlk, true);
while (wholeBlk) {
while (wholeBlk && !pTable->dsFetchDone) {
pTable->blk = getNextBlockFromDownstreamRemain(pOperator, pTable->downStreamIdx);
qDebug("%s merge join %s table got block for same ts, rows:%" PRId64, GET_TASKID(pOperator->pTaskInfo), MJOIN_TBTYPE(pTable->type), pTable->blk ? pTable->blk->info.rows : 0);
@ -1374,7 +1406,7 @@ int32_t mJoinSetImplFp(SMJoinOperatorInfo* pJoin) {
pJoin->joinFp = mAntiJoinDo;
break;
case JOIN_STYPE_WIN:
//pJoin->joinFp = mWinJoinDo;
pJoin->joinFp = mWinJoinDo;
break;
default:
break;

View File

@ -3092,7 +3092,7 @@ TEST(leftAntiJoin, fullCondTest) {
#endif
#if 1
#if 1
#if 0
TEST(leftAsofJoin, noCondGreaterThanTest) {
SJoinTestParam param;
char* caseName = "leftAsofJoin:noCondGreaterThanTest";
@ -3120,7 +3120,7 @@ TEST(leftAsofJoin, noCondGreaterThanTest) {
}
#endif
#if 1
#if 0
TEST(leftAsofJoin, noCondGreaterEqTest) {
SJoinTestParam param;
char* caseName = "leftAsofJoin:noCondGreaterEqTest";
@ -3148,7 +3148,7 @@ TEST(leftAsofJoin, noCondGreaterEqTest) {
}
#endif
#if 1
#if 0
TEST(leftAsofJoin, noCondEqTest) {
SJoinTestParam param;
char* caseName = "leftAsofJoin:noCondEqTest";

View File

@ -468,6 +468,7 @@ static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) {
COPY_SCALAR_FIELD(joinAlgo);
CLONE_NODE_FIELD(pWindowOffset);
CLONE_NODE_FIELD(pJLimit);
CLONE_NODE_FIELD(winPrimEqCond);
CLONE_NODE_FIELD(pPrimKeyEqCond);
CLONE_NODE_FIELD(pColEqCond);
CLONE_NODE_FIELD(pColOnCond);

View File

@ -1850,6 +1850,41 @@ static int32_t msgToCaseWhenNode(STlvDecoder* pDecoder, void* pObj) {
return code;
}
enum { WINDOW_OFFSET_CODE_START_OFFSET = 1, WINDOW_OFFSET_CODE_END_OFFSET };
static int32_t windowOffsetNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SWindowOffsetNode* pNode = (const SWindowOffsetNode*)pObj;
int32_t code = tlvEncodeObj(pEncoder, WINDOW_OFFSET_CODE_START_OFFSET, nodeToMsg, pNode->pStartOffset);
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, WINDOW_OFFSET_CODE_END_OFFSET, nodeToMsg, pNode->pEndOffset);
}
return code;
}
static int32_t msgToWindowOffsetNode(STlvDecoder* pDecoder, void* pObj) {
SWindowOffsetNode* pNode = (SWindowOffsetNode*)pObj;
int32_t code = TSDB_CODE_SUCCESS;
STlv* pTlv = NULL;
tlvForEach(pDecoder, pTlv, code) {
switch (pTlv->type) {
case WINDOW_OFFSET_CODE_START_OFFSET:
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pStartOffset);
break;
case WINDOW_OFFSET_CODE_END_OFFSET:
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pEndOffset);
break;
default:
break;
}
}
return code;
}
enum {
PHY_NODE_CODE_OUTPUT_DESC = 1,
PHY_NODE_CODE_CONDITIONS,
@ -4214,6 +4249,9 @@ static int32_t specificNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
case QUERY_NODE_CASE_WHEN:
code = caseWhenNodeToMsg(pObj, pEncoder);
break;
case QUERY_NODE_WINDOW_OFFSET:
code = windowOffsetNodeToMsg(pObj, pEncoder);
break;
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
code = physiTagScanNodeToMsg(pObj, pEncoder);
break;
@ -4368,6 +4406,9 @@ static int32_t msgToSpecificNode(STlvDecoder* pDecoder, void* pObj) {
case QUERY_NODE_CASE_WHEN:
code = msgToCaseWhenNode(pDecoder, pObj);
break;
case QUERY_NODE_WINDOW_OFFSET:
code = msgToWindowOffsetNode(pDecoder, pObj);
break;
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
code = msgToPhysiTagScanNode(pDecoder, pObj);
break;

View File

@ -801,6 +801,9 @@ void nodesDestroyNode(SNode* pNode) {
break;
case QUERY_NODE_JOIN_TABLE: {
SJoinTableNode* pJoin = (SJoinTableNode*)pNode;
nodesDestroyNode(pJoin->pWindowOffset);
nodesDestroyNode(pJoin->pJLimit);
nodesDestroyNode(pJoin->winPrimCond);
nodesDestroyNode(pJoin->pLeft);
nodesDestroyNode(pJoin->pRight);
nodesDestroyNode(pJoin->pOnCond);
@ -1260,6 +1263,7 @@ void nodesDestroyNode(SNode* pNode) {
destroyLogicNode((SLogicNode*)pLogicNode);
nodesDestroyNode(pLogicNode->pWindowOffset);
nodesDestroyNode(pLogicNode->pJLimit);
nodesDestroyNode(pLogicNode->winPrimEqCond);
nodesDestroyNode(pLogicNode->pPrimKeyEqCond);
nodesDestroyNode(pLogicNode->pColEqCond);
nodesDestroyNode(pLogicNode->pColOnCond);

View File

@ -3158,6 +3158,55 @@ static int32_t replaceTbName(STranslateContext* pCxt, SSelectStmt* pSelect) {
return pRewriteCxt.errCode;
}
static int32_t addPrimEqCond(SNode** pCond, SRealTableNode* leftTable, SRealTableNode* rightTable) {
struct STableMeta* pLMeta = leftTable->pMeta;
struct STableMeta* pRMeta = rightTable->pMeta;
*pCond = nodesMakeNode(QUERY_NODE_OPERATOR);
if (NULL == *pCond) {
return TSDB_CODE_OUT_OF_MEMORY;
}
SOperatorNode* pOp = (SOperatorNode*)*pCond;
pOp->node.resType.type = TSDB_DATA_TYPE_BOOL;
pOp->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes;
pOp->opType = OP_TYPE_EQUAL;
SColumnNode* pLeft = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pLeft) {
nodesDestroyNode(*pCond);
return TSDB_CODE_OUT_OF_MEMORY;
}
pLeft->node.resType.type = pLMeta->schema[0].type;
pLeft->node.resType.bytes = pLMeta->schema[0].bytes;
pLeft->tableId = pLMeta->uid;
pLeft->colId = pLMeta->schema[0].colId;
pLeft->colType = COLUMN_TYPE_COLUMN;
strcpy(pLeft->tableName, leftTable->table.tableName);
strcpy(pLeft->tableAlias, leftTable->table.tableAlias);
strcpy(pLeft->colName, pLMeta->schema[0].name);
pOp->pLeft = (SNode*)pLeft;
SColumnNode* pRight = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pRight) {
nodesDestroyNode(*pCond);
return TSDB_CODE_OUT_OF_MEMORY;
}
pRight->node.resType.type = pRMeta->schema[0].type;
pRight->node.resType.bytes = pRMeta->schema[0].bytes;
pRight->tableId = pRMeta->uid;
pRight->colId = pRMeta->schema[0].colId;
pRight->colType = COLUMN_TYPE_COLUMN;
strcpy(pRight->tableName, rightTable->table.tableName);
strcpy(pRight->tableAlias, rightTable->table.tableAlias);
strcpy(pRight->colName, pRMeta->schema[0].name);
pOp->pRight = (SNode*)pRight;
return TSDB_CODE_SUCCESS;
}
static int32_t checkJoinTable(STranslateContext* pCxt, SJoinTableNode* pJoinTable) {
if ((QUERY_NODE_TEMP_TABLE == nodeType(pJoinTable->pLeft) &&
!isGlobalTimeLineQuery(((STempTableNode*)pJoinTable->pLeft)->pSubquery)) ||
@ -3166,6 +3215,28 @@ static int32_t checkJoinTable(STranslateContext* pCxt, SJoinTableNode* pJoinTabl
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_SUPPORT_JOIN,
"Join requires valid time series input");
}
if (JOIN_STYPE_WIN == pJoinTable->subType) {
if (QUERY_NODE_REAL_TABLE != nodeType(pJoinTable->pLeft) || QUERY_NODE_REAL_TABLE != nodeType(pJoinTable->pRight)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_SUPPORT_JOIN,
"Only support WINDOW join between tables");
}
SRealTableNode* pLeft = (SRealTableNode*)pJoinTable->pLeft;
if (TSDB_SUPER_TABLE != pLeft->pMeta->tableType && TSDB_CHILD_TABLE != pLeft->pMeta->tableType && TSDB_NORMAL_TABLE != pLeft->pMeta->tableType) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_SUPPORT_JOIN,
"Unsupported WINDOW join table type");
}
SRealTableNode* pRight = (SRealTableNode*)pJoinTable->pRight;
if (TSDB_SUPER_TABLE != pRight->pMeta->tableType && TSDB_CHILD_TABLE != pRight->pMeta->tableType && TSDB_NORMAL_TABLE != pRight->pMeta->tableType) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_SUPPORT_JOIN,
"Unsupported WINDOW join table type");
}
return addPrimEqCond(&pJoinTable->winPrimCond, pLeft, pRight);
}
return TSDB_CODE_SUCCESS;
}

View File

@ -277,21 +277,24 @@ static EScanType getScanType(SLogicPlanContext* pCxt, SNodeList* pScanPseudoCols
return SCAN_TYPE_TABLE;
}
static SNode* createFirstCol(uint64_t tableId, const SSchema* pSchema) {
static SNode* createFirstCol(SRealTableNode* pTable) {
SColumnNode* pCol = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pCol) {
return NULL;
}
SSchema* pSchema = pTable->pMeta->schema;
pCol->node.resType.type = pSchema->type;
pCol->node.resType.bytes = pSchema->bytes;
pCol->tableId = tableId;
pCol->tableId = pTable->pMeta->uid;
pCol->colId = pSchema->colId;
pCol->colType = COLUMN_TYPE_COLUMN;
strcpy(pCol->tableAlias, pTable->table.tableAlias);
strcpy(pCol->tableName, pTable->table.tableName);
strcpy(pCol->colName, pSchema->name);
return (SNode*)pCol;
}
static int32_t addPrimaryKeyCol(uint64_t tableId, const SSchema* pSchema, SNodeList** pCols) {
static int32_t addPrimaryKeyCol(SRealTableNode* pTable, SNodeList** pCols) {
bool found = false;
SNode* pCol = NULL;
FOREACH(pCol, *pCols) {
@ -302,23 +305,23 @@ static int32_t addPrimaryKeyCol(uint64_t tableId, const SSchema* pSchema, SNodeL
}
if (!found) {
return nodesListMakeStrictAppend(pCols, createFirstCol(tableId, pSchema));
return nodesListMakeStrictAppend(pCols, createFirstCol(pTable));
}
return TSDB_CODE_SUCCESS;
}
static int32_t addSystableFirstCol(uint64_t tableId, const SSchema* pSchema, SNodeList** pCols) {
static int32_t addSystableFirstCol(SRealTableNode* pTable, SNodeList** pCols) {
if (LIST_LENGTH(*pCols) > 0) {
return TSDB_CODE_SUCCESS;
}
return nodesListMakeStrictAppend(pCols, createFirstCol(tableId, pSchema));
return nodesListMakeStrictAppend(pCols, createFirstCol(pTable));
}
static int32_t addDefaultScanCol(const STableMeta* pMeta, SNodeList** pCols) {
if (TSDB_SYSTEM_TABLE == pMeta->tableType) {
return addSystableFirstCol(pMeta->uid, pMeta->schema, pCols);
static int32_t addDefaultScanCol(SRealTableNode* pTable, SNodeList** pCols) {
if (TSDB_SYSTEM_TABLE == pTable->pMeta->tableType) {
return addSystableFirstCol(pTable, pCols);
}
return addPrimaryKeyCol(pMeta->uid, pMeta->schema, pCols);
return addPrimaryKeyCol(pTable, pCols);
}
static int32_t makeScanLogicNode(SLogicPlanContext* pCxt, SRealTableNode* pRealTable, bool hasRepeatScanFuncs,
@ -458,7 +461,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
}
if (TSDB_CODE_SUCCESS == code && needScanDefaultCol(pScan->scanType)) {
code = addDefaultScanCol(pRealTable->pMeta, &pScan->pScanCols);
code = addDefaultScanCol(pRealTable, &pScan->pScanCols);
}
if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pTags && NULL == pSelect->pPartitionByList) {
@ -545,6 +548,7 @@ static int32_t createJoinLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
pJoin->isLowLevelJoin = pJoinTable->isLowLevelJoin;
pJoin->pWindowOffset = nodesCloneNode(pJoinTable->pWindowOffset);
pJoin->pJLimit = nodesCloneNode(pJoinTable->pJLimit);
pJoin->winPrimEqCond = nodesCloneNode(pJoinTable->winPrimCond);
pJoin->node.pChildren = nodesMakeList();
if (NULL == pJoin->node.pChildren) {
code = TSDB_CODE_OUT_OF_MEMORY;

View File

@ -1196,7 +1196,13 @@ static int32_t pdcJoinCheckAllCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin
if (errCond) {
return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_NOT_SUPPORT_JOIN_COND);
}
if (IS_WINDOW_JOIN(pJoin->subType)) {
return TSDB_CODE_SUCCESS;
}
return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_EXPECTED_TS_EQUAL);
} else if (IS_WINDOW_JOIN(pJoin->subType)) {
return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_NOT_SUPPORT_JOIN_COND);
}
return TSDB_CODE_SUCCESS;

View File

@ -914,6 +914,16 @@ static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChi
}
}
if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->winPrimEqCond) {
SNode* pPrimKeyCond = NULL;
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->winPrimEqCond,
&pPrimKeyCond);
if (TSDB_CODE_SUCCESS == code) {
code = setColEqCond(pPrimKeyCond, pJoin->subType, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoin);
}
nodesDestroyNode(pPrimKeyCond);
}
if (TSDB_CODE_SUCCESS == code) {
code = setListSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->node.pTargets,
&pJoin->pTargets);

View File

@ -67,6 +67,7 @@ run tsim/join/left_anti_join.sim
run tsim/join/right_anti_join.sim
run tsim/join/left_asof_join.sim
run tsim/join/right_asof_join.sim
run tsim/join/left_win_join.sim
print ================== restart server to commit data into disk
system sh/exec.sh -n dnode1 -s stop -x SIGINT
@ -83,5 +84,6 @@ run tsim/join/left_anti_join.sim
run tsim/join/right_anti_join.sim
run tsim/join/left_asof_join.sim
run tsim/join/right_asof_join.sim
run tsim/join/left_win_join.sim
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -0,0 +1,168 @@
sql connect
sql use test0;
sql_error select a.col1, b.col1 from sta a left window join sta b on a.ts = b.ts and a.ts < '2023-11-17 16:29:02' and b.ts < '2023-11-17 16:29:01' window_offset(-1s, 1s) order by a.col1, b.col1;
sql_error select a.col1, b.col1 from sta a left window join sta b on a.ts = b.ts order by a.col1, b.col1;
sql_error select a.col1, b.col1 from sta a left window join sta b on a.ts = b.ts window_offset(-1s, 1s) order by a.col1, b.col1;
sql select a.col1, b.col1 from sta a left window join sta b window_offset(-1s, 1s) order by a.col1, b.col1;
if $rows != 28 then
return -1
endi
sql select a.col1, b.col1 from sta a left window join sta b window_offset(-1s, 1s) jlimit 2 order by a.col1, b.col1;
if $rows != 16 then
return -1
endi
sql select a.col1, b.col1 from sta a left window join sta b window_offset(1s, 1s) order by a.col1, b.col1;
if $rows != 9 then
return -1
endi
sql select a.col1, b.col1 from sta a left window join sta b window_offset(-1s, 1s) where a.ts < '2023-11-17 16:29:02' and b.ts < '2023-11-17 16:29:01' order by a.col1, b.col1;
if $rows != 6 then
return -1
endi
if $data00 != 1 then
return -1
endi
if $data01 != 1 then
return -1
endi
if $data10 != 1 then
return -1
endi
if $data11 != 2 then
return -1
endi
if $data20 != 2 then
return -1
endi
if $data21 != 1 then
return -1
endi
if $data30 != 2 then
return -1
endi
if $data31 != 2 then
return -1
endi
if $data40 != 3 then
return -1
endi
if $data41 != 1 then
return -1
endi
if $data50 != 3 then
return -1
endi
if $data51 != 2 then
return -1
endi
sql select a.ts, b.ts from tba1 a left window join tba2 b window_offset(-1s, 1s)
if $rows != 7 then
return -1
endi
if $data00 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data01 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data10 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data11 != @23-11-17 16:29:01.000@ then
return -1
endi
if $data20 != @23-11-17 16:29:02.000@ then
return -1
endi
if $data21 != @23-11-17 16:29:01.000@ then
return -1
endi
if $data30 != @23-11-17 16:29:02.000@ then
return -1
endi
if $data31 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data40 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data41 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data50 != @23-11-17 16:29:04.000@ then
return -1
endi
if $data51 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data60 != @23-11-17 16:29:04.000@ then
return -1
endi
if $data61 != @23-11-17 16:29:05.000@ then
return -1
endi
sql select a.ts, b.ts from tba1 a left window join tba2 b window_offset(-1s, 1s) jlimit 1;
if $rows != 4 then
return -1
endi
if $data00 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data01 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data10 != @23-11-17 16:29:02.000@ then
return -1
endi
if $data11 != @23-11-17 16:29:01.000@ then
return -1
endi
if $data20 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data21 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data30 != @23-11-17 16:29:04.000@ then
return -1
endi
if $data31 != @23-11-17 16:29:03.000@ then
return -1
endi
sql select a.ts, b.ts from tba1 a left window join tba2 b window_offset(-1a, 1a) jlimit 1;
if $rows != 4 then
return -1
endi
if $data00 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data01 != @23-11-17 16:29:00.000@ then
return -1
endi
if $data10 != @23-11-17 16:29:02.000@ then
return -1
endi
if $data11 != NULL then
return -1
endi
if $data20 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data21 != @23-11-17 16:29:03.000@ then
return -1
endi
if $data30 != @23-11-17 16:29:04.000@ then
return -1
endi
if $data31 != NULL then
return -1
endi
sql select a.ts, b.ts from tba1 a left window join tba2 b window_offset(-1h, 1h);
if $rows != 16 then
return -1
endi