enh: support window join
This commit is contained in:
parent
4a516556a9
commit
50c420e6b9
|
@ -374,6 +374,7 @@
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
#define TK_NK_SPACE 600
|
#define TK_NK_SPACE 600
|
||||||
#define TK_NK_COMMENT 601
|
#define TK_NK_COMMENT 601
|
||||||
#define TK_NK_ILLEGAL 602
|
#define TK_NK_ILLEGAL 602
|
||||||
|
|
|
@ -207,6 +207,7 @@ typedef struct SViewNode {
|
||||||
} SViewNode;
|
} SViewNode;
|
||||||
|
|
||||||
#define IS_INNER_NONE_JOIN(_type, _stype) ((_type) == JOIN_TYPE_INNER && (_stype) == JOIN_STYPE_NONE)
|
#define IS_INNER_NONE_JOIN(_type, _stype) ((_type) == JOIN_TYPE_INNER && (_stype) == JOIN_STYPE_NONE)
|
||||||
|
#define IS_WINDOW_JOIN(_stype) ((_stype) == JOIN_STYPE_WIN)
|
||||||
|
|
||||||
typedef enum EJoinType {
|
typedef enum EJoinType {
|
||||||
JOIN_TYPE_INNER = 0,
|
JOIN_TYPE_INNER = 0,
|
||||||
|
|
|
@ -204,6 +204,7 @@ typedef struct SMJoinWinCache {
|
||||||
int32_t rowNum;
|
int32_t rowNum;
|
||||||
int8_t grpIdx;
|
int8_t grpIdx;
|
||||||
SArray* grps;
|
SArray* grps;
|
||||||
|
SArray* grpsQueue;
|
||||||
SSDataBlock* outBlk;
|
SSDataBlock* outBlk;
|
||||||
} SMJoinWinCache;
|
} SMJoinWinCache;
|
||||||
|
|
||||||
|
@ -220,10 +221,14 @@ typedef struct SMJoinWindowCtx {
|
||||||
// KEEP IT FIRST
|
// KEEP IT FIRST
|
||||||
|
|
||||||
int32_t asofOpType;
|
int32_t asofOpType;
|
||||||
|
int64_t winBeginOffset;
|
||||||
|
int64_t winEndOffset;
|
||||||
bool lowerRowsAcq;
|
bool lowerRowsAcq;
|
||||||
bool eqRowsAcq;
|
bool eqRowsAcq;
|
||||||
bool greaterRowsAcq;
|
bool greaterRowsAcq;
|
||||||
|
|
||||||
|
int64_t winBeginTs;
|
||||||
|
int64_t winEndTs;
|
||||||
bool eqPostDone;
|
bool eqPostDone;
|
||||||
int64_t lastTs;
|
int64_t lastTs;
|
||||||
bool rowRemains;
|
bool rowRemains;
|
||||||
|
@ -302,6 +307,7 @@ typedef struct SMJoinOperatorInfo {
|
||||||
#define ASOF_EQ_ROW_INCLUDED(_op) (OP_TYPE_GREATER_EQUAL == (_op) || OP_TYPE_LOWER_EQUAL == (_op) || OP_TYPE_EQUAL == (_op))
|
#define ASOF_EQ_ROW_INCLUDED(_op) (OP_TYPE_GREATER_EQUAL == (_op) || OP_TYPE_LOWER_EQUAL == (_op) || OP_TYPE_EQUAL == (_op))
|
||||||
#define ASOF_LOWER_ROW_INCLUDED(_op) (OP_TYPE_GREATER_EQUAL == (_op) || OP_TYPE_GREATER_THAN == (_op))
|
#define ASOF_LOWER_ROW_INCLUDED(_op) (OP_TYPE_GREATER_EQUAL == (_op) || OP_TYPE_GREATER_THAN == (_op))
|
||||||
#define ASOF_GREATER_ROW_INCLUDED(_op) (OP_TYPE_LOWER_EQUAL == (_op) || OP_TYPE_LOWER_THAN == (_op))
|
#define ASOF_GREATER_ROW_INCLUDED(_op) (OP_TYPE_LOWER_EQUAL == (_op) || OP_TYPE_LOWER_THAN == (_op))
|
||||||
|
#define WIN_ONLY_EQ_ROW_INCLUDED(_soff, _eoff) (0 == ((SValueNode*)(_soff))->datum.i && 0 == ((SValueNode*)(_eoff))->datum.i)
|
||||||
|
|
||||||
#define MJOIN_PUSH_BLK_TO_CACHE(_cache, _blk) \
|
#define MJOIN_PUSH_BLK_TO_CACHE(_cache, _blk) \
|
||||||
do { \
|
do { \
|
||||||
|
@ -391,6 +397,7 @@ SSDataBlock* mLeftJoinDo(struct SOperatorInfo* pOperator);
|
||||||
SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator);
|
SSDataBlock* mFullJoinDo(struct SOperatorInfo* pOperator);
|
||||||
SSDataBlock* mSemiJoinDo(struct SOperatorInfo* pOperator);
|
SSDataBlock* mSemiJoinDo(struct SOperatorInfo* pOperator);
|
||||||
SSDataBlock* mAntiJoinDo(struct SOperatorInfo* pOperator);
|
SSDataBlock* mAntiJoinDo(struct SOperatorInfo* pOperator);
|
||||||
|
SSDataBlock* mWinJoinDo(struct SOperatorInfo* pOperator);
|
||||||
bool mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBlock** ppBlk, SMJoinTableCtx* pTb);
|
bool mJoinRetrieveImpl(SMJoinOperatorInfo* pJoin, int32_t* pIdx, SSDataBlock** ppBlk, SMJoinTableCtx* pTb);
|
||||||
void mJoinSetDone(SOperatorInfo* pOperator);
|
void mJoinSetDone(SOperatorInfo* pOperator);
|
||||||
bool mJoinCopyKeyColsDataToBuf(SMJoinTableCtx* pTable, int32_t rowIdx, size_t *pBufLen);
|
bool mJoinCopyKeyColsDataToBuf(SMJoinTableCtx* pTable, int32_t rowIdx, size_t *pBufLen);
|
||||||
|
|
|
@ -2560,6 +2560,289 @@ _return:
|
||||||
return pCtx->finBlk;
|
return pCtx->finBlk;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool mWinJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx) {
|
||||||
|
bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe);
|
||||||
|
bool buildGot = false;
|
||||||
|
|
||||||
|
do {
|
||||||
|
if (probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) {
|
||||||
|
buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!probeGot) {
|
||||||
|
mJoinSetDone(pOperator);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (buildGot && !pCtx->lowerRowsAcq) {
|
||||||
|
SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCol->srcSlot);
|
||||||
|
SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCol->srcSlot);
|
||||||
|
if (*((int64_t*)pProbeCol->pData + pJoin->probe->blkRowIdx) > *((int64_t*)pBuildCol->pData + pJoin->build->blk->info.rows - 1)) {
|
||||||
|
pJoin->build->blkRowIdx = pJoin->build->blk->info.rows;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
} while (true);
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t mWinJoinAddWinBeginBlk(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache, SMJoinTableCtx* build, bool* winEnd) {
|
||||||
|
SSDataBlock* pBlk = build->blk;
|
||||||
|
SColumnInfoData* pCol = taosArrayGet(pBlk->pDataBlock, build->primCol->srcSlot);
|
||||||
|
if (*((int64_t*)pCol->pData + pBlk->info.rows - 1) >= pCtx->winBeginTs) {
|
||||||
|
for (; build->blkRowIdx < pBlk->info.rows; build->blkRowIdx++) {
|
||||||
|
if (*((int64_t*)pCol->pData + build->blkRowIdx) < pCtx->winBeginTs) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (*((int64_t*)pCol->pData + build->blkRowIdx) <= pCtx->winEndTs) {
|
||||||
|
SMJoinGrpRows grp = {.blk = pBlk, .beginIdx = build->blkRowIdx};
|
||||||
|
SMJoinGrpRows* pGrp = taosArrayPush(pCache->grps, &grp);
|
||||||
|
|
||||||
|
pGrp->readIdx = pGrp->beginIdx;
|
||||||
|
pGrp->endIdx = pGrp->beginIdx;
|
||||||
|
|
||||||
|
build->blk = NULL;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
*winEnd = true;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
*winEnd = false;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int32_t mWinJoinAddWinEndBlk(SMJoinWindowCtx* pCtx, SMJoinWinCache* pCache, SMJoinTableCtx* build, bool* winEnd) {
|
||||||
|
SSDataBlock* pBlk = build->blk;
|
||||||
|
SColumnInfoData* pCol = taosArrayGet(pBlk->pDataBlock, build->primCol->srcSlot);
|
||||||
|
if (*((int64_t*)pCol->pData + pBlk->info.rows - 1) <= pCtx->winEndTs) {
|
||||||
|
SMJoinGrpRows grp = {.blk = pBlk, .beginIdx = build->blkRowIdx};
|
||||||
|
SMJoinGrpRows* pGrp = taosArrayPush(pCache->grps, &grp);
|
||||||
|
|
||||||
|
pGrp->readIdx = pGrp->beginIdx;
|
||||||
|
pGrp->endIdx = pBlk->info.rows - 1;
|
||||||
|
|
||||||
|
build->blk = NULL;
|
||||||
|
*winEnd = false;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SMJoinGrpRows grp = {.blk = pBlk, .beginIdx = build->blkRowIdx};
|
||||||
|
|
||||||
|
for (; build->blkRowIdx < pBlk->info.rows; build->blkRowIdx++) {
|
||||||
|
if (*((int64_t*)pCol->pData + build->blkRowIdx) <= pCtx->winEndTs) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
SMJoinGrpRows* pGrp = taosArrayPush(pCache->grps, &grp);
|
||||||
|
|
||||||
|
pGrp->readIdx = pGrp->beginIdx;
|
||||||
|
pGrp->endIdx = build->blkRowIdx - 1;
|
||||||
|
|
||||||
|
build->blk = NULL;
|
||||||
|
*winEnd = true;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int32_t mWinJoinMoveWinBegin(SMJoinWindowCtx* pCtx) {
|
||||||
|
SMJoinWinCache* pCache = &pCtx->cache;
|
||||||
|
do {
|
||||||
|
int32_t grpNum = taosArrayGetSize(pCache->grps);
|
||||||
|
for (int32_t i = 0; i < grpNum; ++i) {
|
||||||
|
SMJoinGrpRows* pGrp = taosArrayGet(pCache->grps, i);
|
||||||
|
SColumnInfoData* pCol = taosArrayGet(pGrp->blk->pDataBlock, pCtx->pJoin->build->primCol->srcSlot);
|
||||||
|
if (*((int64_t*)pCol->pData + pGrp->blk->info.rows - 1) < pCtx->winBeginTs) {
|
||||||
|
blockDataDestroy(pGrp->blk);
|
||||||
|
taosArrayPopFrontBatch(pCache->grps, 1);
|
||||||
|
grpNum--;
|
||||||
|
i--;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (; pGrp->beginIdx < pGrp->blk->info.rows; pGrp->beginIdx++) {
|
||||||
|
if (*((int64_t*)pCol->pData + pGrp->beginIdx) < pCtx->winBeginTs) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (*((int64_t*)pCol->pData + pGrp->beginIdx) <= pCtx->winEndTs) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
pGrp->endIdx = pGrp->beginIdx;
|
||||||
|
TSWAP(pCache->grps, pCache->grpsQueue);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (NULL != pCache->grpsQueue) {
|
||||||
|
pCache->grps = pCache->grpsQueue;
|
||||||
|
pCache->grpsQueue = NULL;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
break;
|
||||||
|
} while (true);
|
||||||
|
|
||||||
|
SMJoinTableCtx* build = pCtx->pJoin->build;
|
||||||
|
bool winEnd = false;
|
||||||
|
if (NULL != build->blk) {
|
||||||
|
MJ_ERR_RET(mWinJoinAddWinBeginBlk(pCtx, &pCtx->cache, build, &winEnd));
|
||||||
|
if (winEnd || taosArrayGetSize(pCache->grps) > 0) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
do {
|
||||||
|
build->blk = getNextBlockFromDownstreamRemain(pCtx->pJoin->pOperator, pCtx->pJoin->build->downStreamIdx);
|
||||||
|
qDebug("%s merge join %s table got block to start win, rows:%" PRId64, GET_TASKID(pCtx->pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(build->type), build->blk ? build->blk->info.rows : 0);
|
||||||
|
|
||||||
|
build->blkRowIdx = 0;
|
||||||
|
|
||||||
|
if (NULL == build->blk) {
|
||||||
|
build->dsFetchDone = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
MJ_ERR_RET(mWinJoinAddWinBeginBlk(pCtx, &pCtx->cache, build, &winEnd));
|
||||||
|
if (winEnd || taosArrayGetSize(pCache->grps) > 0) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
} while (true);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int32_t mWinJoinMoveWinEnd(SMJoinWindowCtx* pCtx) {
|
||||||
|
SMJoinWinCache* pCache = &pCtx->cache;
|
||||||
|
int32_t grpNum = taosArrayGetSize(pCache->grps);
|
||||||
|
if (grpNum <= 0) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SMJoinGrpRows* pGrp = taosArrayGetLast(pCache->grps);
|
||||||
|
SColumnInfoData* pCol = taosArrayGet(pGrp->blk->pDataBlock, pCtx->pJoin->build->primCol->srcSlot);
|
||||||
|
if (*((int64_t*)pCol->pData + pGrp->blk->info.rows - 1) <= pCtx->winEndTs) {
|
||||||
|
pGrp->endIdx = pGrp->blk->info.rows - 1;
|
||||||
|
} else {
|
||||||
|
for (; pGrp->endIdx < pGrp->blk->info.rows; pGrp->endIdx++) {
|
||||||
|
if (*((int64_t*)pCol->pData + pGrp->endIdx) <= pCtx->winEndTs) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
pGrp->endIdx--;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
SMJoinTableCtx* build = pCtx->pJoin->build;
|
||||||
|
bool winEnd = false;
|
||||||
|
if (NULL != build->blk) {
|
||||||
|
MJ_ERR_RET(mWinJoinAddWinEndBlk(pCtx, &pCtx->cache, build, &winEnd));
|
||||||
|
if (winEnd) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
do {
|
||||||
|
build->blk = getNextBlockFromDownstreamRemain(pCtx->pJoin->pOperator, pCtx->pJoin->build->downStreamIdx);
|
||||||
|
qDebug("%s merge join %s table got block to start win, rows:%" PRId64, GET_TASKID(pCtx->pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(build->type), build->blk ? build->blk->info.rows : 0);
|
||||||
|
|
||||||
|
build->blkRowIdx = 0;
|
||||||
|
|
||||||
|
if (NULL == build->blk) {
|
||||||
|
build->dsFetchDone = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
MJ_ERR_RET(mWinJoinAddWinEndBlk(pCtx, &pCtx->cache, build, &winEnd));
|
||||||
|
if (winEnd) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
} while (true);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int32_t mWinJoinMoveFillWinCache(SMJoinWindowCtx* pCtx) {
|
||||||
|
MJ_ERR_RET(mWinJoinMoveWinBegin(pCtx));
|
||||||
|
MJ_ERR_RET(mWinJoinMoveWinEnd(pCtx));
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t mWinJoinDumpWinCache(SMJoinWindowCtx* pCtx) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
SSDataBlock* mWinJoinDo(struct SOperatorInfo* pOperator) {
|
||||||
|
SMJoinOperatorInfo* pJoin = pOperator->info;
|
||||||
|
SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx;
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
int64_t probeTs = 0;
|
||||||
|
int64_t buildTs = 0;
|
||||||
|
SColumnInfoData* pBuildCol = NULL;
|
||||||
|
SColumnInfoData* pProbeCol = NULL;
|
||||||
|
|
||||||
|
blockDataCleanup(pCtx->finBlk);
|
||||||
|
|
||||||
|
if (pCtx->grpRemains) {
|
||||||
|
MJ_ERR_JRET(mWinJoinDumpWinCache(pCtx));
|
||||||
|
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
|
||||||
|
return pCtx->finBlk;
|
||||||
|
}
|
||||||
|
pCtx->grpRemains = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
do {
|
||||||
|
if (!mWinJoinRetrieve(pOperator, pJoin, pCtx)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
MJOIN_GET_TB_COL_TS(pProbeCol, probeTs, pJoin->probe);
|
||||||
|
|
||||||
|
while (!MJOIN_PROBE_TB_ROWS_DONE(pJoin->probe)) {
|
||||||
|
MJ_ERR_JRET(mAsofJoinBuildEqGrp(pJoin->probe, probeTs, NULL, &pCtx->probeGrp));
|
||||||
|
|
||||||
|
if (probeTs != pCtx->lastTs) {
|
||||||
|
pCtx->lastTs = probeTs;
|
||||||
|
pCtx->winBeginTs = probeTs + pCtx->winBeginOffset;
|
||||||
|
pCtx->winEndTs = probeTs + pCtx->winEndOffset;
|
||||||
|
MJ_ERR_JRET(mWinJoinMoveFillWinCache(pCtx));
|
||||||
|
}
|
||||||
|
|
||||||
|
MJ_ERR_JRET(mWinJoinDumpWinCache(pCtx));
|
||||||
|
|
||||||
|
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
|
||||||
|
return pCtx->finBlk;
|
||||||
|
}
|
||||||
|
|
||||||
|
MJOIN_GET_TB_CUR_TS(pProbeCol, probeTs, pJoin->probe);
|
||||||
|
}
|
||||||
|
} while (true);
|
||||||
|
|
||||||
|
_return:
|
||||||
|
|
||||||
|
if (code) {
|
||||||
|
pJoin->errCode = code;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
return pCtx->finBlk;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t mJoinInitWindowCache(SMJoinWinCache* pCache, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx) {
|
int32_t mJoinInitWindowCache(SMJoinWinCache* pCache, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx) {
|
||||||
pCache->pageLimit = MJOIN_BLK_SIZE_LIMIT;
|
pCache->pageLimit = MJOIN_BLK_SIZE_LIMIT;
|
||||||
|
@ -2602,17 +2885,17 @@ int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* p
|
||||||
pJoin->joinFp = mAsofGreaterJoinDo;
|
pJoin->joinFp = mAsofGreaterJoinDo;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case JOIN_STYPE_WIN:
|
case JOIN_STYPE_WIN: {
|
||||||
pCtx->eqRowsAcq = ASOF_EQ_ROW_INCLUDED(pCtx->asofOpType);
|
SWindowOffsetNode* pOffsetNode = (SWindowOffsetNode*)pJoinNode->pWindowOffset;
|
||||||
pCtx->lowerRowsAcq = (JOIN_TYPE_RIGHT != pJoin->joinType) ? ASOF_LOWER_ROW_INCLUDED(pCtx->asofOpType) : ASOF_GREATER_ROW_INCLUDED(pCtx->asofOpType);
|
SValueNode* pWinBegin = (SValueNode*)pOffsetNode->pStartOffset;
|
||||||
pCtx->greaterRowsAcq = (JOIN_TYPE_RIGHT != pJoin->joinType) ? ASOF_GREATER_ROW_INCLUDED(pCtx->asofOpType) : ASOF_LOWER_ROW_INCLUDED(pCtx->asofOpType);
|
SValueNode* pWinEnd = (SValueNode*)pOffsetNode->pEndOffset;
|
||||||
|
pCtx->winBeginOffset = pWinBegin->datum.i;
|
||||||
if (pCtx->lowerRowsAcq) {
|
pCtx->winEndOffset = pWinEnd->datum.i;
|
||||||
pJoin->joinFp = mAsofLowerJoinDo;
|
pCtx->eqRowsAcq = (pCtx->winBeginOffset <= 0 && pCtx->winEndOffset >= 0);
|
||||||
} else if (pCtx->greaterRowsAcq) {
|
pCtx->lowerRowsAcq = pCtx->winBeginOffset < 0;
|
||||||
pJoin->joinFp = mAsofGreaterJoinDo;
|
pCtx->greaterRowsAcq = pCtx->winEndOffset > 0;
|
||||||
}
|
|
||||||
break;
|
break;
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -2645,7 +2928,7 @@ int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJ
|
||||||
pCtx->pJoin = pJoin;
|
pCtx->pJoin = pJoin;
|
||||||
pCtx->lastEqTs = INT64_MIN;
|
pCtx->lastEqTs = INT64_MIN;
|
||||||
pCtx->hashCan = pJoin->probe->keyNum > 0;
|
pCtx->hashCan = pJoin->probe->keyNum > 0;
|
||||||
if (JOIN_STYPE_ASOF == pJoinNode->subType) {
|
if (JOIN_STYPE_ASOF == pJoinNode->subType || JOIN_STYPE_WIN == pJoinNode->subType) {
|
||||||
pCtx->jLimit = pJoinNode->pJLimit ? ((SLimitNode*)pJoinNode->pJLimit)->limit : 1;
|
pCtx->jLimit = pJoinNode->pJLimit ? ((SLimitNode*)pJoinNode->pJLimit)->limit : 1;
|
||||||
pJoin->subType = JOIN_STYPE_OUTER;
|
pJoin->subType = JOIN_STYPE_OUTER;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -812,7 +812,7 @@ static void mJoinSetBuildAndProbeTable(SMJoinOperatorInfo* pInfo, SSortMergeJoin
|
||||||
|
|
||||||
static int32_t mJoinInitCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) {
|
static int32_t mJoinInitCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) {
|
||||||
if ((JOIN_STYPE_ASOF == pJoin->subType && (ASOF_LOWER_ROW_INCLUDED(pJoinNode->asofOpType) || ASOF_GREATER_ROW_INCLUDED(pJoinNode->asofOpType)))
|
if ((JOIN_STYPE_ASOF == pJoin->subType && (ASOF_LOWER_ROW_INCLUDED(pJoinNode->asofOpType) || ASOF_GREATER_ROW_INCLUDED(pJoinNode->asofOpType)))
|
||||||
|| JOIN_STYPE_WIN == pJoin->subType) {
|
|| (JOIN_STYPE_WIN == pJoin->subType && !WIN_ONLY_EQ_ROW_INCLUDED(((SWindowOffsetNode*)pJoinNode->pWindowOffset)->pStartOffset, ((SWindowOffsetNode*)pJoinNode->pWindowOffset)->pEndOffset))) {
|
||||||
return mJoinInitWindowCtx(pJoin, pJoinNode);
|
return mJoinInitWindowCtx(pJoin, pJoinNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -77,7 +77,7 @@ enum {
|
||||||
#define RIGHT_TABLE_COLS 0x2
|
#define RIGHT_TABLE_COLS 0x2
|
||||||
#define ALL_TABLE_COLS (LEFT_TABLE_COLS | RIGHT_TABLE_COLS)
|
#define ALL_TABLE_COLS (LEFT_TABLE_COLS | RIGHT_TABLE_COLS)
|
||||||
|
|
||||||
#define JT_MAX_JLIMIT 3
|
#define JT_MAX_JLIMIT 20
|
||||||
#define JT_KEY_SOLT_ID (MAX_SLOT_NUM - 1)
|
#define JT_KEY_SOLT_ID (MAX_SLOT_NUM - 1)
|
||||||
#define JT_PRIM_TS_SLOT_ID 0
|
#define JT_PRIM_TS_SLOT_ID 0
|
||||||
int32_t jtInputColType[MAX_SLOT_NUM] = {TSDB_DATA_TYPE_TIMESTAMP, TSDB_DATA_TYPE_INT, TSDB_DATA_TYPE_INT, TSDB_DATA_TYPE_BIGINT};
|
int32_t jtInputColType[MAX_SLOT_NUM] = {TSDB_DATA_TYPE_TIMESTAMP, TSDB_DATA_TYPE_INT, TSDB_DATA_TYPE_INT, TSDB_DATA_TYPE_BIGINT};
|
||||||
|
|
|
@ -1072,7 +1072,7 @@ parenthesized_joined_table(A) ::= NK_LP parenthesized_joined_table(B) NK_RP.
|
||||||
|
|
||||||
/************************************************ joined_table ********************************************************/
|
/************************************************ joined_table ********************************************************/
|
||||||
joined_table(A) ::=
|
joined_table(A) ::=
|
||||||
table_reference(B) join_type(C) join_subtype(D) JOIN table_reference(E) ON search_condition(F)
|
table_reference(B) join_type(C) join_subtype(D) JOIN table_reference(E) join_on_clause_opt(F)
|
||||||
window_offset_clause_opt(G) jlimit_clause_opt(H). {
|
window_offset_clause_opt(G) jlimit_clause_opt(H). {
|
||||||
A = createJoinTableNode(pCxt, C, D, B, E, F);
|
A = createJoinTableNode(pCxt, C, D, B, E, F);
|
||||||
A = addWindowOffsetClause(pCxt, A, G);
|
A = addWindowOffsetClause(pCxt, A, G);
|
||||||
|
@ -1096,6 +1096,9 @@ join_subtype(A) ::= ANTI.
|
||||||
join_subtype(A) ::= ASOF. { A = JOIN_STYPE_ASOF; }
|
join_subtype(A) ::= ASOF. { A = JOIN_STYPE_ASOF; }
|
||||||
join_subtype(A) ::= WINDOW. { A = JOIN_STYPE_WIN; }
|
join_subtype(A) ::= WINDOW. { A = JOIN_STYPE_WIN; }
|
||||||
|
|
||||||
|
join_on_clause_opt(A) ::= . { A = NULL; }
|
||||||
|
join_on_clause_opt(A) ::= ON search_condition(B). { A = B; }
|
||||||
|
|
||||||
window_offset_clause_opt(A) ::= . { A = NULL; }
|
window_offset_clause_opt(A) ::= . { A = NULL; }
|
||||||
window_offset_clause_opt(A) ::= WINDOW_OFFSET NK_LP window_offset_literal(B)
|
window_offset_clause_opt(A) ::= WINDOW_OFFSET NK_LP window_offset_literal(B)
|
||||||
NK_COMMA window_offset_literal(C) NK_RP. { A = createWindowOffsetNode(pCxt, releaseRawExprNode(pCxt, B), releaseRawExprNode(pCxt, C)); }
|
NK_COMMA window_offset_literal(C) NK_RP. { A = createWindowOffsetNode(pCxt, releaseRawExprNode(pCxt, B), releaseRawExprNode(pCxt, C)); }
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -1181,6 +1181,10 @@ static int32_t pdcJoinAddWhereFilterColsToTarget(SOptimizeContext* pCxt, SJoinLo
|
||||||
|
|
||||||
static int32_t pdcJoinCheckAllCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
|
static int32_t pdcJoinCheckAllCond(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
|
||||||
if (NULL == pJoin->pFullOnCond) {
|
if (NULL == pJoin->pFullOnCond) {
|
||||||
|
if (IS_WINDOW_JOIN(pJoin->subType)) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
if ((!IS_INNER_NONE_JOIN(pJoin->joinType, pJoin->subType)) || NULL == pJoin->node.pConditions) {
|
if ((!IS_INNER_NONE_JOIN(pJoin->joinType, pJoin->subType)) || NULL == pJoin->node.pConditions) {
|
||||||
return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_NOT_SUPPORT_CROSS_JOIN);
|
return generateUsageErrMsg(pCxt->pPlanCxt->pMsg, pCxt->pPlanCxt->msgLen, TSDB_CODE_PLAN_NOT_SUPPORT_CROSS_JOIN);
|
||||||
}
|
}
|
||||||
|
@ -1225,7 +1229,7 @@ static int32_t pdcDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code && NULL != pJoin->pFullOnCond) {
|
||||||
code = pdcJoinSplitCond(pJoin, &pJoin->pFullOnCond, NULL, &pLeftChildCond, &pRightChildCond, false);
|
code = pdcJoinSplitCond(pJoin, &pJoin->pFullOnCond, NULL, &pLeftChildCond, &pRightChildCond, false);
|
||||||
if (TSDB_CODE_SUCCESS == code && NULL != pLeftChildCond) {
|
if (TSDB_CODE_SUCCESS == code && NULL != pLeftChildCond) {
|
||||||
code = pdcPushDownCondToChild(pCxt, (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0), &pLeftChildCond);
|
code = pdcPushDownCondToChild(pCxt, (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0), &pLeftChildCond);
|
||||||
|
@ -1235,7 +1239,7 @@ static int32_t pdcDealJoin(SOptimizeContext* pCxt, SJoinLogicNode* pJoin) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code && NULL != pJoin->pFullOnCond) {
|
||||||
code = pdcJoinSplitPrimEqCond(pCxt, pJoin);
|
code = pdcJoinSplitPrimEqCond(pCxt, pJoin);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -906,7 +906,7 @@ static int32_t createMergeJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChi
|
||||||
code = getJoinDataBlockDescNode(pChildren, 1, &pRightDesc);
|
code = getJoinDataBlockDescNode(pChildren, 1, &pRightDesc);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code && NULL != pJoinLogicNode->pPrimKeyEqCond) {
|
||||||
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond,
|
code = setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pPrimKeyEqCond,
|
||||||
&pJoin->pPrimKeyCond);
|
&pJoin->pPrimKeyCond);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
|
Loading…
Reference in New Issue