enh: support asof join
This commit is contained in:
parent
fb20dd51de
commit
3fecb387cc
|
@ -318,6 +318,7 @@ typedef struct SMJoinOperatorInfo {
|
|||
(_cache)->rowNum += (_blk)->info.rows; \
|
||||
pGrp->blk = (_blk); \
|
||||
pGrp->beginIdx = 0; \
|
||||
pGrp->endIdx = (_blk)->info.rows - 1; \
|
||||
} while (0)
|
||||
|
||||
#define MJOIN_RESTORE_TB_BLK(_cache, _tb) \
|
||||
|
@ -332,6 +333,15 @@ typedef struct SMJoinOperatorInfo {
|
|||
} \
|
||||
} while (0)
|
||||
|
||||
#define MJOIN_SAVE_TB_BLK(_cache, _tb) \
|
||||
do { \
|
||||
ASSERT(taosArrayGetSize((_cache)->grps) >= 1); \
|
||||
SMJoinGrpRows* pGrp = taosArrayGet((_cache)->grps, 0); \
|
||||
ASSERT(pGrp->blk == (_tb)->blk); \
|
||||
pGrp->beginIdx = (_tb)->blkRowIdx; \
|
||||
pGrp->readIdx = pGrp->beginIdx; \
|
||||
} while (0)
|
||||
|
||||
#define MJOIN_POP_TB_BLK(_cache) \
|
||||
do { \
|
||||
SMJoinGrpRows* pGrp = taosArrayGet((_cache)->grps, 0); \
|
||||
|
@ -378,6 +388,9 @@ typedef struct SMJoinOperatorInfo {
|
|||
} while (0)
|
||||
|
||||
|
||||
|
||||
void mJoinDestroyMergeCtx(SMJoinOperatorInfo* pJoin);
|
||||
void mJoinDestroyWindowCtx(SMJoinOperatorInfo* pJoin);
|
||||
int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode);
|
||||
int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode);
|
||||
SSDataBlock* mInnerJoinDo(struct SOperatorInfo* pOperator);
|
||||
|
@ -408,6 +421,7 @@ int32_t mJoinProcessGreaterGrp(SMJoinMergeCtx* pCtx, SMJoinTableCtx* pTb, SColum
|
|||
int32_t mJoinFilterAndKeepSingleRow(SSDataBlock* pBlock, SFilterInfo* pFilterInfo);
|
||||
int32_t mJoinFilterAndNoKeepRows(SSDataBlock* pBlock, SFilterInfo* pFilterInfo);
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -1714,30 +1714,42 @@ _return:
|
|||
}
|
||||
|
||||
|
||||
static bool mAsofJoinRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx) {
|
||||
bool probeGot = mJoinRetrieveImpl(pJoin, &pJoin->probe->blkRowIdx, &pJoin->probe->blk, pJoin->probe);
|
||||
bool buildGot = false;
|
||||
int32_t mAsofJoinBuildEqGrp(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, SMJoinGrpRows* pGrp) {
|
||||
SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCol->srcSlot);
|
||||
|
||||
do {
|
||||
if (probeGot || MJOIN_DS_NEED_INIT(pOperator, pJoin->build)) {
|
||||
buildGot = mJoinRetrieveImpl(pJoin, &pJoin->build->blkRowIdx, &pJoin->build->blk, pJoin->build);
|
||||
if (*(int64_t*)colDataGetNumData(pCol, pTable->blkRowIdx) != timestamp) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (!probeGot) {
|
||||
mJoinSetDone(pOperator);
|
||||
return false;
|
||||
pGrp->beginIdx = pTable->blkRowIdx;
|
||||
pGrp->readIdx = pTable->blkRowIdx;
|
||||
|
||||
pTable->blkRowIdx++;
|
||||
char* pEndVal = colDataGetNumData(pCol, pTable->blk->info.rows - 1);
|
||||
if (timestamp != *(int64_t*)pEndVal) {
|
||||
for (; pTable->blkRowIdx < pTable->blk->info.rows; ++pTable->blkRowIdx) {
|
||||
char* pNextVal = colDataGetNumData(pCol, pTable->blkRowIdx);
|
||||
if (timestamp == *(int64_t*)pNextVal) {
|
||||
continue;
|
||||
}
|
||||
|
||||
break;
|
||||
} while (true);
|
||||
|
||||
pCtx->probeGrp.blk = pJoin->probe->blk;
|
||||
pCtx->buildGrp.blk = pJoin->build->blk;
|
||||
|
||||
return true;
|
||||
pGrp->endIdx = pTable->blkRowIdx - 1;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t mAsofJoinCalcRowNum(SMJoinWinCache* pCache, int64_t jLimit, int32_t newRows, int32_t* evictRows) {
|
||||
pGrp->endIdx = pTable->blk->info.rows - 1;
|
||||
pTable->blkRowIdx = pTable->blk->info.rows;
|
||||
|
||||
if (wholeBlk) {
|
||||
*wholeBlk = true;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t mAsofLowerCalcRowNum(SMJoinWinCache* pCache, int64_t jLimit, int32_t newRows, int32_t* evictRows) {
|
||||
if (pCache->outBlk->info.rows <= 0) {
|
||||
*evictRows = 0;
|
||||
return TMIN(jLimit, newRows);
|
||||
|
@ -1757,10 +1769,10 @@ int32_t mAsofJoinCalcRowNum(SMJoinWinCache* pCache, int64_t jLimit, int32_t newR
|
|||
return newRows;
|
||||
}
|
||||
|
||||
int32_t mAsofJoinAddRowsToCache(SMJoinWindowCtx* pCtx, SMJoinGrpRows* pGrp, bool fromBegin) {
|
||||
int32_t mAsofLowerAddRowsToCache(SMJoinWindowCtx* pCtx, SMJoinGrpRows* pGrp, bool fromBegin) {
|
||||
int32_t evictRows = 0;
|
||||
SMJoinWinCache* pCache = &pCtx->cache;
|
||||
int32_t rows = mAsofJoinCalcRowNum(pCache, pCtx->jLimit, pGrp->endIdx - pGrp->beginIdx + 1, &evictRows);
|
||||
int32_t rows = mAsofLowerCalcRowNum(pCache, pCtx->jLimit, pGrp->endIdx - pGrp->beginIdx + 1, &evictRows);
|
||||
if (evictRows > 0) {
|
||||
MJ_ERR_RET(blockDataTrimFirstRows(pCache->outBlk, evictRows));
|
||||
}
|
||||
|
@ -1769,40 +1781,8 @@ int32_t mAsofJoinAddRowsToCache(SMJoinWindowCtx* pCtx, SMJoinGrpRows* pGrp, bool
|
|||
return blockDataMergeNRows(pCache->outBlk, pGrp->blk, startIdx, rows);
|
||||
}
|
||||
|
||||
int32_t mAsofJoinBuildEqGrp(SMJoinTableCtx* pTable, int64_t timestamp, bool* wholeBlk, SMJoinGrpRows* pGrp) {
|
||||
SColumnInfoData* pCol = taosArrayGet(pTable->blk->pDataBlock, pTable->primCol->srcSlot);
|
||||
|
||||
if (*(int64_t*)colDataGetNumData(pCol, pTable->blkRowIdx) != timestamp) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
pGrp->beginIdx = pTable->blkRowIdx;
|
||||
pGrp->readIdx = pTable->blkRowIdx;
|
||||
|
||||
char* pEndVal = colDataGetNumData(pCol, pTable->blk->info.rows - 1);
|
||||
if (timestamp != *(int64_t*)pEndVal) {
|
||||
for (; pTable->blkRowIdx < pTable->blk->info.rows; ++pTable->blkRowIdx) {
|
||||
char* pNextVal = colDataGetNumData(pCol, pTable->blkRowIdx);
|
||||
if (timestamp == *(int64_t*)pNextVal) {
|
||||
continue;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
||||
pGrp->endIdx = pTable->blk->info.rows - 1;
|
||||
pTable->blkRowIdx = pTable->blk->info.rows;
|
||||
|
||||
if (wholeBlk) {
|
||||
*wholeBlk = true;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t mAsofJoinAddEqRowsToCache(struct SOperatorInfo* pOperator, SMJoinWindowCtx* pCtx, SMJoinTableCtx* pTable, int64_t timestamp) {
|
||||
int32_t mAsofLowerAddEqRowsToCache(struct SOperatorInfo* pOperator, SMJoinWindowCtx* pCtx, SMJoinTableCtx* pTable, int64_t timestamp) {
|
||||
int64_t eqRowsNum = 0;
|
||||
SMJoinGrpRows grp = {.blk = pTable->blk};
|
||||
|
||||
|
@ -1833,7 +1813,7 @@ int32_t mAsofJoinAddEqRowsToCache(struct SOperatorInfo* pOperator, SMJoinWindowC
|
|||
}
|
||||
|
||||
if (eqRowsNum < pCtx->jLimit) {
|
||||
MJ_ERR_RET(mAsofJoinAddRowsToCache(pCtx, &grp, false));
|
||||
MJ_ERR_RET(mAsofLowerAddRowsToCache(pCtx, &grp, false));
|
||||
}
|
||||
|
||||
eqRowsNum += grp.endIdx - grp.beginIdx + 1;
|
||||
|
@ -1874,6 +1854,7 @@ int32_t mAsofLowerDumpGrpCache(SMJoinWindowCtx* pCtx) {
|
|||
MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, &buildGrp));
|
||||
|
||||
pCtx->grpRemains = false;
|
||||
pCtx->cache.outRowIdx = 0;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -1886,6 +1867,7 @@ int32_t mAsofLowerDumpGrpCache(SMJoinWindowCtx* pCtx) {
|
|||
}
|
||||
|
||||
pCtx->grpRemains = false;
|
||||
pCtx->cache.outRowIdx = 0;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -1897,8 +1879,8 @@ int32_t mAsofLowerDumpGrpCache(SMJoinWindowCtx* pCtx) {
|
|||
buildGrp.readIdx = 0;
|
||||
MJ_ERR_RET(mJoinMergeGrpCart(pCtx->pJoin, pCtx->finBlk, true, probeGrp, &buildGrp));
|
||||
rowsLeft -= grpNum * pCtx->cache.outBlk->info.rows;
|
||||
pCtx->cache.outRowIdx = 0;
|
||||
probeGrp->readIdx += grpNum;
|
||||
probeGrp->endIdx = probeEndIdx;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
@ -1935,7 +1917,7 @@ int32_t mAsofLowerDumpUpdateEqRows(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJ
|
|||
|
||||
if (!pCtx->eqPostDone && !lastBuildGrp) {
|
||||
pCtx->eqPostDone = true;
|
||||
return mAsofJoinAddEqRowsToCache(pJoin->pOperator, pCtx, pJoin->build, pCtx->lastTs);
|
||||
return mAsofLowerAddEqRowsToCache(pJoin->pOperator, pCtx, pJoin->build, pCtx->lastTs);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -1943,7 +1925,7 @@ int32_t mAsofLowerDumpUpdateEqRows(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* pJ
|
|||
|
||||
if (!pCtx->eqPostDone && !lastBuildGrp) {
|
||||
pCtx->eqPostDone = true;
|
||||
MJ_ERR_RET(mAsofJoinAddEqRowsToCache(pJoin->pOperator, pCtx, pJoin->build, pCtx->lastTs));
|
||||
MJ_ERR_RET(mAsofLowerAddEqRowsToCache(pJoin->pOperator, pCtx, pJoin->build, pCtx->lastTs));
|
||||
}
|
||||
|
||||
return mAsofLowerDumpGrpCache(pCtx);
|
||||
|
@ -1953,7 +1935,9 @@ int32_t mAsofLowerProcessEqualGrp(SMJoinWindowCtx* pCtx, int64_t timestamp, bool
|
|||
SMJoinOperatorInfo* pJoin = pCtx->pJoin;
|
||||
|
||||
pCtx->lastEqGrp = true;
|
||||
if (!lastBuildGrp) {
|
||||
pCtx->eqPostDone = false;
|
||||
}
|
||||
|
||||
MJ_ERR_RET(mAsofJoinBuildEqGrp(pJoin->probe, timestamp, NULL, &pCtx->probeGrp));
|
||||
|
||||
|
@ -2002,9 +1986,7 @@ int32_t mAsofLowerProcessGreaterGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* p
|
|||
pCtx->probeGrp.readIdx = pCtx->probeGrp.beginIdx;
|
||||
pCtx->probeGrp.endIdx = pCtx->probeGrp.beginIdx;
|
||||
|
||||
MJ_ERR_RET(mAsofJoinAddRowsToCache(pCtx, &pCtx->buildGrp, false));
|
||||
|
||||
return mAsofLowerDumpGrpCache(pCtx);
|
||||
return mAsofLowerAddRowsToCache(pCtx, &pCtx->buildGrp, false);
|
||||
}
|
||||
|
||||
int32_t mAsofLowerHandleGrpRemains(SMJoinWindowCtx* pCtx) {
|
||||
|
@ -2028,6 +2010,11 @@ static bool mAsofLowerRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* pJo
|
|||
break;
|
||||
} while (true);
|
||||
|
||||
if (buildGot && NULL == pCtx->cache.outBlk) {
|
||||
pCtx->cache.outBlk = createOneDataBlock(pJoin->build->blk, false);
|
||||
blockDataEnsureCapacity(pCtx->cache.outBlk, pCtx->jLimit);
|
||||
}
|
||||
|
||||
pCtx->probeGrp.blk = pJoin->probe->blk;
|
||||
pCtx->buildGrp.blk = pJoin->build->blk;
|
||||
|
||||
|
@ -2147,6 +2134,7 @@ int32_t mAsofGreaterDumpGrpCache(SMJoinWindowCtx* pCtx) {
|
|||
buildGrp->readIdx = buildGrp->beginIdx;
|
||||
}
|
||||
|
||||
cache->grpIdx = 0;
|
||||
pCtx->grpRemains = false;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -2203,18 +2191,26 @@ int32_t mAsofGreaterChkFillGrpCache(SMJoinWindowCtx* pCtx) {
|
|||
int32_t grpNum = taosArrayGetSize(pCache->grps);
|
||||
ASSERT(grpNum >= 1 && grpNum <= 2);
|
||||
|
||||
SSDataBlock* pBlk = NULL;
|
||||
SMJoinGrpRows* pGrp = taosArrayGet(pCache->grps, grpNum - 1);
|
||||
if (pGrp->blk != pCache->outBlk) {
|
||||
pBlk = pGrp->blk;
|
||||
int32_t beginIdx = (1 == grpNum) ? build->blkRowIdx : 0;
|
||||
MJ_ERR_RET(blockDataMergeNRows(pCache->outBlk, pGrp->blk, beginIdx, pGrp->blk->info.rows - beginIdx));
|
||||
if (1 == grpNum) {
|
||||
pGrp->blk = pCache->outBlk;
|
||||
pGrp->beginIdx = 0;
|
||||
pGrp->readIdx = 0;
|
||||
//pGrp->endIdx = pGrp->blk->info.rows - 1;
|
||||
} else {
|
||||
taosArrayPop(pCache->grps);
|
||||
pGrp = taosArrayGet(pCache->grps, 0);
|
||||
ASSERT(pGrp->blk == pCache->outBlk);
|
||||
//pGrp->endIdx = pGrp->blk->info.rows - pGrp->beginIdx;
|
||||
}
|
||||
|
||||
//ASSERT((pGrp->endIdx - pGrp->beginIdx + 1) == pCtx->cache.rowNum);
|
||||
}
|
||||
|
||||
do {
|
||||
if (NULL != pBlk) {
|
||||
MJ_ERR_RET(blockDataMergeNRows(pCache->outBlk, pBlk, build->blkRowIdx, pBlk->info.rows - build->blkRowIdx));
|
||||
}
|
||||
|
||||
build->blk = getNextBlockFromDownstreamRemain(pCtx->pJoin->pOperator, build->downStreamIdx);
|
||||
qDebug("%s merge join %s table got block to fill grp, rows:%" PRId64, GET_TASKID(pCtx->pJoin->pOperator->pTaskInfo), MJOIN_TBTYPE(build->type), build->blk ? build->blk->info.rows : 0);
|
||||
|
||||
|
@ -2225,8 +2221,14 @@ int32_t mAsofGreaterChkFillGrpCache(SMJoinWindowCtx* pCtx) {
|
|||
break;
|
||||
}
|
||||
|
||||
if ((pCache->rowNum + build->blk->info.rows) >= pCtx->jLimit) {
|
||||
MJOIN_PUSH_BLK_TO_CACHE(pCache, build->blk);
|
||||
pBlk = build->blk;
|
||||
break;
|
||||
}
|
||||
|
||||
MJ_ERR_RET(blockDataMergeNRows(pCache->outBlk, build->blk, 0, build->blk->info.rows));
|
||||
|
||||
//pGrp->endIdx = pGrp->blk->info.rows - pGrp->beginIdx;
|
||||
} while (pCache->rowNum < pCtx->jLimit);
|
||||
|
||||
MJOIN_RESTORE_TB_BLK(pCache, build);
|
||||
|
@ -2234,11 +2236,36 @@ int32_t mAsofGreaterChkFillGrpCache(SMJoinWindowCtx* pCtx) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void mAsofGreaterUpdateBuildGrpEndIdx(SMJoinWindowCtx* pCtx) {
|
||||
int32_t grpNum = taosArrayGetSize(pCtx->cache.grps);
|
||||
if (grpNum <= 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
SMJoinGrpRows* pGrp = taosArrayGet(pCtx->cache.grps, 0);
|
||||
if (1 == grpNum) {
|
||||
pGrp->endIdx = pGrp->beginIdx + TMIN(pGrp->blk->info.rows - pGrp->beginIdx, pCtx->jLimit) - 1;
|
||||
return;
|
||||
}
|
||||
|
||||
ASSERT(pCtx->jLimit > (pGrp->blk->info.rows - pGrp->beginIdx));
|
||||
pGrp->endIdx = pGrp->blk->info.rows - 1;
|
||||
|
||||
int64_t remainRows = pCtx->jLimit - (pGrp->endIdx - pGrp->beginIdx + 1);
|
||||
|
||||
pGrp = taosArrayGet(pCtx->cache.grps, 1);
|
||||
pGrp->endIdx = pGrp->beginIdx + TMIN(pGrp->blk->info.rows, remainRows) - 1;
|
||||
}
|
||||
|
||||
int32_t mAsofGreaterFillDumpGrpCache(SMJoinWindowCtx* pCtx, bool lastBuildGrp) {
|
||||
if (!lastBuildGrp) {
|
||||
MJ_ERR_RET(mAsofGreaterChkFillGrpCache(pCtx));
|
||||
}
|
||||
|
||||
MJOIN_SAVE_TB_BLK(&pCtx->cache, pCtx->pJoin->build);
|
||||
|
||||
mAsofGreaterUpdateBuildGrpEndIdx(pCtx);
|
||||
|
||||
return mAsofGreaterDumpGrpCache(pCtx);
|
||||
}
|
||||
|
||||
|
@ -2250,6 +2277,7 @@ int32_t mAsofGreaterSkipEqRows(SMJoinTableCtx* pTable, int64_t timestamp, bool*
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
pTable->blkRowIdx++;
|
||||
char* pEndVal = colDataGetNumData(pCol, pTable->blk->info.rows - 1);
|
||||
if (timestamp != *(int64_t*)pEndVal) {
|
||||
for (; pTable->blkRowIdx < pTable->blk->info.rows; ++pTable->blkRowIdx) {
|
||||
|
@ -2258,6 +2286,7 @@ int32_t mAsofGreaterSkipEqRows(SMJoinTableCtx* pTable, int64_t timestamp, bool*
|
|||
continue;
|
||||
}
|
||||
|
||||
*wholeBlk = false;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
|
@ -2269,7 +2298,6 @@ int32_t mAsofGreaterSkipEqRows(SMJoinTableCtx* pTable, int64_t timestamp, bool*
|
|||
|
||||
int32_t mAsofGreaterSkipAllEqRows(SMJoinWindowCtx* pCtx, int64_t timestamp) {
|
||||
SMJoinWinCache* cache = &pCtx->cache;
|
||||
int32_t grpNum = taosArrayGetSize(cache->grps);
|
||||
SMJoinTableCtx* pTable = pCtx->pJoin->build;
|
||||
bool wholeBlk = false;
|
||||
|
||||
|
@ -2314,7 +2342,6 @@ int32_t mAsofGreaterProcessEqualGrp(SMJoinWindowCtx* pCtx, int64_t timestamp, bo
|
|||
SMJoinOperatorInfo* pJoin = pCtx->pJoin;
|
||||
|
||||
pCtx->lastEqGrp = true;
|
||||
pCtx->cache.grpIdx = 0;
|
||||
|
||||
MJ_ERR_RET(mAsofJoinBuildEqGrp(pJoin->probe, timestamp, NULL, &pCtx->probeGrp));
|
||||
|
||||
|
@ -2338,8 +2365,6 @@ int32_t mAsofGreaterProcessLowerGrp(SMJoinWindowCtx* pCtx, SMJoinOperatorInfo* p
|
|||
break;
|
||||
}
|
||||
|
||||
pCtx->cache.grpIdx = 0;
|
||||
|
||||
return mAsofGreaterFillDumpGrpCache(pCtx, false);
|
||||
}
|
||||
|
||||
|
@ -2382,7 +2407,7 @@ static bool mAsofGreaterRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* p
|
|||
return false;
|
||||
}
|
||||
|
||||
if (buildGot && pCtx->asofGreaterRow) {
|
||||
if (buildGot) {
|
||||
SColumnInfoData* pProbeCol = taosArrayGet(pJoin->probe->blk->pDataBlock, pJoin->probe->primCol->srcSlot);
|
||||
SColumnInfoData* pBuildCol = taosArrayGet(pJoin->build->blk->pDataBlock, pJoin->build->primCol->srcSlot);
|
||||
if (*((int64_t*)pProbeCol->pData + pJoin->probe->blkRowIdx) > *((int64_t*)pBuildCol->pData + pJoin->build->blk->info.rows - 1)) {
|
||||
|
@ -2395,22 +2420,21 @@ static bool mAsofGreaterRetrieve(SOperatorInfo* pOperator, SMJoinOperatorInfo* p
|
|||
} while (true);
|
||||
|
||||
if (buildGot) {
|
||||
if (NULL == pCtx->cache.outBlk) {
|
||||
pCtx->cache.outBlk = createOneDataBlock(pJoin->build->blk, false);
|
||||
blockDataEnsureCapacity(pCtx->cache.outBlk, pCtx->jLimit);
|
||||
}
|
||||
|
||||
MJOIN_PUSH_BLK_TO_CACHE(&pCtx->cache, pJoin->build->blk);
|
||||
MJOIN_RESTORE_TB_BLK(&pCtx->cache, pJoin->build);
|
||||
}
|
||||
|
||||
pCtx->probeGrp.blk = pJoin->probe->blk;
|
||||
pCtx->buildGrp.blk = pJoin->build->blk;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
int32_t mAsofGreaterHandleGrpRemains(SMJoinWindowCtx* pCtx) {
|
||||
return mAsofGreaterDumpGrpCache(pCtx);
|
||||
}
|
||||
|
||||
|
||||
SSDataBlock* mAsofGreaterJoinDo(struct SOperatorInfo* pOperator) {
|
||||
SMJoinOperatorInfo* pJoin = pOperator->info;
|
||||
SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx;
|
||||
|
@ -2423,7 +2447,7 @@ SSDataBlock* mAsofGreaterJoinDo(struct SOperatorInfo* pOperator) {
|
|||
blockDataCleanup(pCtx->finBlk);
|
||||
|
||||
if (pCtx->grpRemains) {
|
||||
MJ_ERR_JRET(mAsofGreaterHandleGrpRemains(pCtx));
|
||||
MJ_ERR_JRET(mAsofGreaterDumpGrpCache(pCtx));
|
||||
if (pCtx->finBlk->info.rows >= pCtx->blkThreshold) {
|
||||
return pCtx->finBlk;
|
||||
}
|
||||
|
@ -2501,33 +2525,36 @@ _return:
|
|||
}
|
||||
|
||||
|
||||
int32_t mJoinInitWinCache(SMJoinWinCache* pCache, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx) {
|
||||
int32_t mJoinInitWindowCache(SMJoinWinCache* pCache, SMJoinOperatorInfo* pJoin, SMJoinWindowCtx* pCtx) {
|
||||
pCache->pageLimit = MJOIN_BLK_SIZE_LIMIT;
|
||||
|
||||
pCache->colNum = pJoin->build->finNum;
|
||||
pCache->outBlk = createOneDataBlock(pCtx->finBlk, false);
|
||||
if (NULL == pCache->outBlk) {
|
||||
|
||||
pCache->grps = taosArrayInit(2, sizeof(SMJoinGrpRows));
|
||||
if (NULL == pCache->grps) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pCache->outBlk->info.capacity = pCtx->jLimit;
|
||||
|
||||
SMJoinTableCtx* build = pJoin->build;
|
||||
for (int32_t i = 0; i < pCache->colNum; ++i) {
|
||||
SColumnInfoData* pCol = taosArrayGet(pCache->outBlk->pDataBlock, build->finCols[i].dstSlot);
|
||||
doEnsureCapacity(pCol, NULL, pCtx->jLimit, false);
|
||||
}
|
||||
//taosArrayReserve(pTable->eqGrps, 1);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void mJoinDestroyWindowCtx(SMJoinOperatorInfo* pJoin) {
|
||||
SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx;
|
||||
|
||||
pCtx->finBlk = blockDataDestroy(pCtx->finBlk);
|
||||
pCtx->cache.outBlk = blockDataDestroy(pCtx->cache.outBlk);
|
||||
|
||||
taosArrayDestroy(pCtx->cache.grps);
|
||||
}
|
||||
|
||||
int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) {
|
||||
SMJoinWindowCtx* pCtx = &pJoin->ctx.windowCtx;
|
||||
|
||||
pCtx->pJoin = pJoin;
|
||||
pCtx->asofOpType = pJoinNode->asofOpType;
|
||||
pCtx->asofEqRow = ASOF_EQ_ROW_INCLUDED(pCtx->asofOpType);
|
||||
pCtx->asofLowerRow = ASOF_LOWER_ROW_INCLUDED(pCtx->asofOpType);
|
||||
pCtx->asofGreaterRow = ASOF_GREATER_ROW_INCLUDED(pCtx->asofOpType);
|
||||
pCtx->asofLowerRow = (JOIN_TYPE_RIGHT != pJoin->joinType) ? ASOF_LOWER_ROW_INCLUDED(pCtx->asofOpType) : ASOF_GREATER_ROW_INCLUDED(pCtx->asofOpType);
|
||||
pCtx->asofGreaterRow = (JOIN_TYPE_RIGHT != pJoin->joinType) ? ASOF_GREATER_ROW_INCLUDED(pCtx->asofOpType) : ASOF_LOWER_ROW_INCLUDED(pCtx->asofOpType);
|
||||
pCtx->jLimit = pJoinNode->pJLimit ? ((SLimitNode*)pJoinNode->pJLimit)->limit : 1;
|
||||
|
||||
if (pCtx->asofLowerRow) {
|
||||
|
@ -2545,18 +2572,26 @@ int32_t mJoinInitWindowCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* p
|
|||
|
||||
pCtx->blkThreshold = pCtx->finBlk->info.capacity * 0.9;
|
||||
|
||||
MJ_ERR_RET(mJoinInitWinCache(&pCtx->cache, pJoin, pCtx));
|
||||
MJ_ERR_RET(mJoinInitWindowCache(&pCtx->cache, pJoin, pCtx));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void mJoinDestroyMergeCtx(SMJoinOperatorInfo* pJoin) {
|
||||
SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx;
|
||||
|
||||
pCtx->finBlk = blockDataDestroy(pCtx->finBlk);
|
||||
pCtx->midBlk = blockDataDestroy(pCtx->midBlk);
|
||||
}
|
||||
|
||||
|
||||
int32_t mJoinInitMergeCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode* pJoinNode) {
|
||||
SMJoinMergeCtx* pCtx = &pJoin->ctx.mergeCtx;
|
||||
|
||||
pCtx->pJoin = pJoin;
|
||||
pCtx->lastEqTs = INT64_MIN;
|
||||
pCtx->hashCan = pJoin->probe->keyNum > 0;
|
||||
if (JOIN_STYPE_ASOF == pJoin->subType) {
|
||||
if (JOIN_STYPE_ASOF == pJoinNode->subType) {
|
||||
pCtx->jLimit = pJoinNode->pJLimit ? ((SLimitNode*)pJoinNode->pJLimit)->limit : 1;
|
||||
pJoin->subType = JOIN_STYPE_OUTER;
|
||||
} else {
|
||||
|
|
|
@ -744,7 +744,7 @@ static int32_t mJoinInitTableInfo(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysi
|
|||
memcpy(&pTable->inputStat, pStat, sizeof(*pStat));
|
||||
|
||||
pTable->eqGrps = taosArrayInit(8, sizeof(SMJoinGrpRows));
|
||||
taosArrayReserve(pTable->eqGrps, 1);
|
||||
//taosArrayReserve(pTable->eqGrps, 1);
|
||||
|
||||
if (E_JOIN_TB_BUILD == pTable->type) {
|
||||
pTable->createdBlks = taosArrayInit(8, POINTER_BYTES);
|
||||
|
@ -765,7 +765,7 @@ static int32_t mJoinInitTableInfo(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysi
|
|||
pTable->noKeepEqGrpRows = (JOIN_STYPE_ANTI == pJoin->subType && NULL == pJoin->pFPreFilter);
|
||||
pTable->multiEqGrpRows = !((JOIN_STYPE_SEMI == pJoin->subType || JOIN_STYPE_ANTI == pJoin->subType) && NULL == pJoin->pFPreFilter);
|
||||
pTable->multiRowsGrp = !((JOIN_STYPE_SEMI == pJoin->subType || JOIN_STYPE_ANTI == pJoin->subType) && NULL == pJoin->pPreFilter);
|
||||
if (JOIN_STYPE_ASOF == pJoin->subType) {
|
||||
if (JOIN_STYPE_ASOF == pJoinNode->subType) {
|
||||
pTable->eqRowLimit = pJoinNode->pJLimit ? ((SLimitNode*)pJoinNode->pJLimit)->limit : 1;
|
||||
}
|
||||
} else {
|
||||
|
@ -819,6 +819,14 @@ static int32_t mJoinInitCtx(SMJoinOperatorInfo* pJoin, SSortMergeJoinPhysiNode*
|
|||
return mJoinInitMergeCtx(pJoin, pJoinNode);
|
||||
}
|
||||
|
||||
static void mJoinDestroyCtx(SMJoinOperatorInfo* pJoin) {
|
||||
if (JOIN_STYPE_ASOF == pJoin->subType || JOIN_STYPE_WIN == pJoin->subType) {
|
||||
return mJoinDestroyWindowCtx(pJoin);
|
||||
}
|
||||
|
||||
return mJoinDestroyMergeCtx(pJoin);
|
||||
}
|
||||
|
||||
void mJoinSetDone(SOperatorInfo* pOperator) {
|
||||
setOperatorCompleted(pOperator);
|
||||
if (pOperator->pDownstreamGetParams) {
|
||||
|
@ -1290,8 +1298,8 @@ void destroyMergeJoinTableCtx(SMJoinTableCtx* pTable) {
|
|||
|
||||
void destroyMergeJoinOperator(void* param) {
|
||||
SMJoinOperatorInfo* pJoin = (SMJoinOperatorInfo*)param;
|
||||
pJoin->ctx.mergeCtx.finBlk = blockDataDestroy(pJoin->ctx.mergeCtx.finBlk);
|
||||
pJoin->ctx.mergeCtx.midBlk = blockDataDestroy(pJoin->ctx.mergeCtx.midBlk);
|
||||
|
||||
mJoinDestroyCtx(pJoin);
|
||||
|
||||
if (pJoin->pFPreFilter != NULL) {
|
||||
filterFreeInfo(pJoin->pFPreFilter);
|
||||
|
|
|
@ -77,6 +77,7 @@ enum {
|
|||
#define RIGHT_TABLE_COLS 0x2
|
||||
#define ALL_TABLE_COLS (LEFT_TABLE_COLS | RIGHT_TABLE_COLS)
|
||||
|
||||
#define JT_MAX_JLIMIT 3
|
||||
#define JT_KEY_SOLT_ID (MAX_SLOT_NUM - 1)
|
||||
int32_t jtInputColType[MAX_SLOT_NUM] = {TSDB_DATA_TYPE_TIMESTAMP, TSDB_DATA_TYPE_INT, TSDB_DATA_TYPE_INT, TSDB_DATA_TYPE_BIGINT};
|
||||
|
||||
|
@ -112,6 +113,7 @@ typedef struct {
|
|||
int32_t colCond;
|
||||
int32_t joinType;
|
||||
int32_t subType;
|
||||
int64_t jLimit;
|
||||
|
||||
int32_t leftTotalRows;
|
||||
int32_t rightTotalRows;
|
||||
|
@ -175,6 +177,8 @@ typedef struct {
|
|||
typedef struct {
|
||||
EJoinType joinType;
|
||||
EJoinSubType subType;
|
||||
int32_t asofOp;
|
||||
int64_t jLimit;
|
||||
int32_t cond;
|
||||
bool filter;
|
||||
bool asc;
|
||||
|
@ -773,26 +777,33 @@ void createBlockDescNode(SDataBlockDescNode** ppNode) {
|
|||
*ppNode = pDesc;
|
||||
}
|
||||
|
||||
SSortMergeJoinPhysiNode* createDummySortMergeJoinPhysiNode(EJoinType type, EJoinSubType sub, int32_t cond, bool filter, bool asc) {
|
||||
SSortMergeJoinPhysiNode* createDummySortMergeJoinPhysiNode(SJoinTestParam* param) {
|
||||
SSortMergeJoinPhysiNode* p = (SSortMergeJoinPhysiNode*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN);
|
||||
p->joinType = type;
|
||||
p->subType = sub;
|
||||
p->joinType = param->joinType;
|
||||
p->subType = param->subType;
|
||||
p->asofOpType = param->asofOp;
|
||||
if (param->jLimit > 1 || taosRand() % 2) {
|
||||
SLimitNode* limitNode = (SLimitNode*)nodesMakeNode(QUERY_NODE_LIMIT);
|
||||
limitNode->limit = param->jLimit;
|
||||
p->pJLimit = (SNode*)limitNode;
|
||||
}
|
||||
p->leftPrimSlotId = 0;
|
||||
p->rightPrimSlotId = 0;
|
||||
p->node.inputTsOrder = asc ? ORDER_ASC : ORDER_DESC;
|
||||
p->node.inputTsOrder = param->asc ? ORDER_ASC : ORDER_DESC;
|
||||
|
||||
jtCtx.joinType = type;
|
||||
jtCtx.subType = sub;
|
||||
jtCtx.asc = asc;
|
||||
jtCtx.leftColOnly = (JOIN_TYPE_LEFT == type && JOIN_STYPE_SEMI == sub);
|
||||
jtCtx.rightColOnly = (JOIN_TYPE_RIGHT == type && JOIN_STYPE_SEMI == sub);
|
||||
jtCtx.joinType = param->joinType;
|
||||
jtCtx.subType = param->subType;
|
||||
jtCtx.asc = param->asc;
|
||||
jtCtx.jLimit = param->jLimit;
|
||||
jtCtx.leftColOnly = (JOIN_TYPE_LEFT == param->joinType && JOIN_STYPE_SEMI == param->subType);
|
||||
jtCtx.rightColOnly = (JOIN_TYPE_RIGHT == param->joinType && JOIN_STYPE_SEMI == param->subType);
|
||||
|
||||
createColCond(p, cond);
|
||||
createFilterStart(p, filter);
|
||||
createColCond(p, param->cond);
|
||||
createFilterStart(p, param->filter);
|
||||
createTargetSlotList(p);
|
||||
createColEqCondEnd(p);
|
||||
createColOnCondEnd(p);
|
||||
createFilterEnd(p, filter);
|
||||
createFilterEnd(p, param->filter);
|
||||
updateColRowInfo();
|
||||
createBlockDescNode(&p->node.pOutputDataBlockDesc);
|
||||
|
||||
|
@ -2197,7 +2208,7 @@ void handleTestDone() {
|
|||
void runSingleTest(char* caseName, SJoinTestParam* param) {
|
||||
bool contLoop = true;
|
||||
|
||||
SSortMergeJoinPhysiNode* pNode = createDummySortMergeJoinPhysiNode(param->joinType, param->subType, param->cond, param->filter, param->asc);
|
||||
SSortMergeJoinPhysiNode* pNode = createDummySortMergeJoinPhysiNode(param);
|
||||
createDummyBlkList(200, 200, 200, 200, 20);
|
||||
|
||||
while (contLoop) {
|
||||
|
@ -2744,6 +2755,62 @@ TEST(leftAntiJoin, fullCondTest) {
|
|||
#endif
|
||||
#endif
|
||||
|
||||
#if 1
|
||||
#if 1
|
||||
TEST(leftAsofJoin, noCondGreaterThanTest) {
|
||||
SJoinTestParam param;
|
||||
char* caseName = "leftAntiJoin:noCondGreaterThanTest";
|
||||
SExecTaskInfo* pTask = createDummyTaskInfo(caseName);
|
||||
|
||||
param.pTask = pTask;
|
||||
param.joinType = JOIN_TYPE_LEFT;
|
||||
param.subType = JOIN_STYPE_ASOF;
|
||||
param.cond = TEST_NO_COND;
|
||||
param.asofOp = OP_TYPE_GREATER_THAN;
|
||||
param.asc = true;
|
||||
|
||||
for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) {
|
||||
param.jLimit = taosRand() % 2 ? (1 + (taosRand() % JT_MAX_JLIMIT)) : 1;
|
||||
|
||||
param.filter = false;
|
||||
runSingleTest(caseName, ¶m);
|
||||
|
||||
param.filter = true;
|
||||
runSingleTest(caseName, ¶m);
|
||||
}
|
||||
|
||||
printStatInfo(caseName);
|
||||
taosMemoryFree(pTask);
|
||||
}
|
||||
#endif
|
||||
|
||||
#if 1
|
||||
TEST(leftAsofJoin, eqCondTest) {
|
||||
SJoinTestParam param;
|
||||
char* caseName = "leftAntiJoin:eqCondTest";
|
||||
SExecTaskInfo* pTask = createDummyTaskInfo(caseName);
|
||||
|
||||
param.pTask = pTask;
|
||||
param.joinType = JOIN_TYPE_LEFT;
|
||||
param.subType = JOIN_STYPE_ANTI;
|
||||
param.cond = TEST_EQ_COND;
|
||||
param.asc = true;
|
||||
|
||||
for (jtCtx.loopIdx = 0; jtCtx.loopIdx < JT_MAX_LOOP; ++jtCtx.loopIdx) {
|
||||
param.filter = false;
|
||||
runSingleTest(caseName, ¶m);
|
||||
|
||||
param.filter = true;
|
||||
runSingleTest(caseName, ¶m);
|
||||
}
|
||||
|
||||
printStatInfo(caseName);
|
||||
taosMemoryFree(pTask);
|
||||
handleCaseEnd();
|
||||
}
|
||||
#endif
|
||||
#endif
|
||||
|
||||
|
||||
|
||||
int main(int argc, char** argv) {
|
||||
|
|
|
@ -2119,6 +2119,9 @@ static const char* jkJoinPhysiPlanJoinType = "JoinType";
|
|||
static const char* jkJoinPhysiPlanSubType = "SubType";
|
||||
static const char* jkJoinPhysiPlanWinOffset = "WindowOffset";
|
||||
static const char* jkJoinPhysiPlanJoinLimit = "JoinLimit";
|
||||
static const char* jkJoinPhysiPlanAsofOp = "AsofOp";
|
||||
static const char* jkJoinPhysiPlanLeftPrimExpr = "LeftPrimExpr";
|
||||
static const char* jkJoinPhysiPlanRightPrimExpr = "RightPrimExpr";
|
||||
static const char* jkJoinPhysiPlanLeftPrimSlotId = "LeftPrimSlotId";
|
||||
static const char* jkJoinPhysiPlanRightPrimSlotId = "RightPrimSlotId";
|
||||
static const char* jkJoinPhysiPlanLeftEqCols = "LeftEqCols";
|
||||
|
@ -2151,6 +2154,15 @@ static int32_t physiMergeJoinNodeToJson(const void* pObj, SJson* pJson) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddObject(pJson, jkJoinPhysiPlanJoinLimit, nodeToJson, pNode->pJLimit);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanAsofOp, pNode->asofOpType);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddObject(pJson, jkJoinPhysiPlanLeftPrimExpr, nodeToJson, pNode->leftPrimExpr);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddObject(pJson, jkJoinPhysiPlanRightPrimExpr, nodeToJson, pNode->rightPrimExpr);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanLeftPrimSlotId, pNode->leftPrimSlotId);
|
||||
}
|
||||
|
@ -2204,6 +2216,15 @@ static int32_t jsonToPhysiMergeJoinNode(const SJson* pJson, void* pObj) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeObject(pJson, jkJoinPhysiPlanJoinLimit, &pNode->pJLimit);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
tjsonGetNumberValue(pJson, jkJoinPhysiPlanAsofOp, pNode->asofOpType, code);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeObject(pJson, jkJoinPhysiPlanLeftPrimExpr, &pNode->leftPrimExpr);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeObject(pJson, jkJoinPhysiPlanRightPrimExpr, &pNode->rightPrimExpr);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
tjsonGetNumberValue(pJson, jkJoinPhysiPlanLeftPrimSlotId, pNode->leftPrimSlotId, code);
|
||||
}
|
||||
|
|
|
@ -2427,6 +2427,9 @@ enum {
|
|||
PHY_SORT_MERGE_JOIN_CODE_SUB_TYPE,
|
||||
PHY_SORT_MERGE_JOIN_CODE_WINDOW_OFFSET,
|
||||
PHY_SORT_MERGE_JOIN_CODE_JOIN_LIMIT,
|
||||
PHY_SORT_MERGE_JOIN_CODE_ASOF_OP,
|
||||
PHY_SORT_MERGE_JOIN_CODE_LEFT_PRIM_EXPR,
|
||||
PHY_SORT_MERGE_JOIN_CODE_RIGHT_PRIM_EXPR,
|
||||
PHY_SORT_MERGE_JOIN_CODE_LEFT_PRIM_SLOT_ID,
|
||||
PHY_SORT_MERGE_JOIN_CODE_RIGHT_PRIM_SLOT_ID,
|
||||
PHY_SORT_MERGE_JOIN_CODE_LEFT_EQ_COLS,
|
||||
|
@ -2456,6 +2459,15 @@ static int32_t physiMergeJoinNodeToMsg(const void* pObj, STlvEncoder* pEncoder)
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_JOIN_LIMIT, nodeToMsg, pNode->pJLimit);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeI32(pEncoder, PHY_SORT_MERGE_JOIN_CODE_ASOF_OP, pNode->asofOpType);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_LEFT_PRIM_EXPR, nodeToMsg, pNode->leftPrimExpr);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeObj(pEncoder, PHY_SORT_MERGE_JOIN_CODE_RIGHT_PRIM_EXPR, nodeToMsg, pNode->rightPrimExpr);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeI32(pEncoder, PHY_SORT_MERGE_JOIN_CODE_LEFT_PRIM_SLOT_ID, pNode->leftPrimSlotId);
|
||||
}
|
||||
|
@ -2515,6 +2527,15 @@ static int32_t msgToPhysiMergeJoinNode(STlvDecoder* pDecoder, void* pObj) {
|
|||
case PHY_SORT_MERGE_JOIN_CODE_JOIN_LIMIT:
|
||||
code = msgToNodeFromTlv(pTlv, (void**)&pNode->pJLimit);
|
||||
break;
|
||||
case PHY_SORT_MERGE_JOIN_CODE_ASOF_OP:
|
||||
code = tlvDecodeI32(pTlv, &pNode->asofOpType);
|
||||
break;
|
||||
case PHY_SORT_MERGE_JOIN_CODE_LEFT_PRIM_EXPR:
|
||||
code = msgToNodeFromTlv(pTlv, (void**)&pNode->leftPrimExpr);
|
||||
break;
|
||||
case PHY_SORT_MERGE_JOIN_CODE_RIGHT_PRIM_EXPR:
|
||||
code = msgToNodeFromTlv(pTlv, (void**)&pNode->rightPrimExpr);
|
||||
break;
|
||||
case PHY_SORT_MERGE_JOIN_CODE_LEFT_PRIM_SLOT_ID:
|
||||
code = tlvDecodeI32(pTlv, &pNode->leftPrimSlotId);
|
||||
break;
|
||||
|
|
|
@ -727,7 +727,7 @@ static int32_t pdcPushDownCondToChild(SOptimizeContext* pCxt, SLogicNode* pChild
|
|||
}
|
||||
|
||||
static bool pdcJoinIsPrim(SNode* pNode, SSHashObj* pTables) {
|
||||
if (QUERY_NODE_COLUMN != nodeType(pNode) || QUERY_NODE_FUNCTION != nodeType(pNode)) {
|
||||
if (QUERY_NODE_COLUMN != nodeType(pNode) && QUERY_NODE_FUNCTION != nodeType(pNode)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
|
@ -9,6 +9,7 @@ if $data00 != 1 then
|
|||
return -1
|
||||
endi
|
||||
if $data01 != 1 then
|
||||
print $data01
|
||||
return -1
|
||||
endi
|
||||
if $data10 != 1 then
|
||||
|
|
|
@ -65,6 +65,7 @@ run tsim/join/left_semi_join.sim
|
|||
run tsim/join/right_semi_join.sim
|
||||
run tsim/join/left_anti_join.sim
|
||||
run tsim/join/right_anti_join.sim
|
||||
run tsim/join/left_asof_join.sim
|
||||
|
||||
print ================== restart server to commit data into disk
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
@ -79,5 +80,6 @@ run tsim/join/left_semi_join.sim
|
|||
run tsim/join/right_semi_join.sim
|
||||
run tsim/join/left_anti_join.sim
|
||||
run tsim/join/right_anti_join.sim
|
||||
run tsim/join/left_asof_join.sim
|
||||
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
|
|
@ -0,0 +1,488 @@
|
|||
sql connect
|
||||
sql use test0;
|
||||
|
||||
sql_error select a.col1, b.col1 from sta a left asof join sta b on a.ts = b.ts and a.ts < '2023-11-17 16:29:02' and b.ts < '2023-11-17 16:29:01' order by a.col1, b.col1;
|
||||
sql select a.col1, b.col1 from sta a left asof join sta b on a.ts = b.ts order by a.col1, b.col1;
|
||||
if $rows != 8 then
|
||||
return -1
|
||||
endi
|
||||
sql select a.col1, b.col1 from sta a left asof join sta b on a.ts >= b.ts order by a.col1, b.col1;
|
||||
if $rows != 8 then
|
||||
return -1
|
||||
endi
|
||||
sql select a.col1, b.col1 from sta a left asof join sta b on a.ts = b.ts where a.ts < '2023-11-17 16:29:02' and b.ts < '2023-11-17 16:29:01' order by a.col1, b.col1;
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data01 != 1 then
|
||||
return -1
|
||||
endi
|
||||
if $data10 != 2 then
|
||||
return -1
|
||||
endi
|
||||
if $data11 != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select a.ts, b.ts from tba1 a left asof join tba2 b on a.ts = b.ts;
|
||||
if $rows != 4 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != @23-11-17 16:29:00.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data01 != @23-11-17 16:29:00.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data10 != @23-11-17 16:29:02.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data11 != NULL then
|
||||
return -1
|
||||
endi
|
||||
if $data20 != @23-11-17 16:29:03.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data21 != @23-11-17 16:29:03.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data30 != @23-11-17 16:29:04.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data31 != NULL then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
||||
sql select a.ts, b.ts from tba1 a left asof join tba2 b on b.ts = a.ts;
|
||||
if $rows != 4 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != @23-11-17 16:29:00.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data01 != @23-11-17 16:29:00.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data10 != @23-11-17 16:29:02.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data11 != NULL then
|
||||
return -1
|
||||
endi
|
||||
if $data20 != @23-11-17 16:29:03.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data21 != @23-11-17 16:29:03.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data30 != @23-11-17 16:29:04.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data31 != NULL then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select a.ts, b.ts from tba1 a left asof join tba2 b on a.ts >= b.ts ;
|
||||
if $rows != 4 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != @23-11-17 16:29:00.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data01 != @23-11-17 16:29:00.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data10 != @23-11-17 16:29:02.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data11 != @23-11-17 16:29:01.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data20 != @23-11-17 16:29:03.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data21 != @23-11-17 16:29:03.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data30 != @23-11-17 16:29:04.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data31 != @23-11-17 16:29:03.000@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select a.ts, b.ts from tba1 a left asof join tba2 b on b.ts <= a.ts ;
|
||||
if $rows != 4 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != @23-11-17 16:29:00.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data01 != @23-11-17 16:29:00.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data10 != @23-11-17 16:29:02.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data11 != @23-11-17 16:29:01.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data20 != @23-11-17 16:29:03.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data21 != @23-11-17 16:29:03.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data30 != @23-11-17 16:29:04.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data31 != @23-11-17 16:29:03.000@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select a.ts, b.ts from tba1 a left asof join tba2 b on a.ts > b.ts ;
|
||||
if $rows != 4 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != @23-11-17 16:29:00.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data01 != NULL then
|
||||
return -1
|
||||
endi
|
||||
if $data10 != @23-11-17 16:29:02.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data11 != @23-11-17 16:29:01.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data20 != @23-11-17 16:29:03.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data21 != @23-11-17 16:29:01.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data30 != @23-11-17 16:29:04.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data31 != @23-11-17 16:29:03.000@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select a.ts, b.ts from tba1 a left asof join tba2 b on b.ts < a.ts ;
|
||||
if $rows != 4 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != @23-11-17 16:29:00.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data01 != NULL then
|
||||
return -1
|
||||
endi
|
||||
if $data10 != @23-11-17 16:29:02.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data11 != @23-11-17 16:29:01.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data20 != @23-11-17 16:29:03.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data21 != @23-11-17 16:29:01.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data30 != @23-11-17 16:29:04.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data31 != @23-11-17 16:29:03.000@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select a.ts, b.ts from tba1 a left asof join tba2 b on a.ts <= b.ts ;
|
||||
if $rows != 4 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != @23-11-17 16:29:00.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data01 != @23-11-17 16:29:00.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data10 != @23-11-17 16:29:02.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data11 != @23-11-17 16:29:03.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data20 != @23-11-17 16:29:03.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data21 != @23-11-17 16:29:03.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data30 != @23-11-17 16:29:04.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data31 != @23-11-17 16:29:05.000@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select a.ts, b.ts from tba1 a left asof join tba2 b on b.ts >= a.ts ;
|
||||
if $rows != 4 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != @23-11-17 16:29:00.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data01 != @23-11-17 16:29:00.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data10 != @23-11-17 16:29:02.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data11 != @23-11-17 16:29:03.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data20 != @23-11-17 16:29:03.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data21 != @23-11-17 16:29:03.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data30 != @23-11-17 16:29:04.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data31 != @23-11-17 16:29:05.000@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
||||
sql select a.ts, b.ts from tba1 a left asof join tba2 b on a.ts < b.ts ;
|
||||
if $rows != 4 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != @23-11-17 16:29:00.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data01 != @23-11-17 16:29:01.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data10 != @23-11-17 16:29:02.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data11 != @23-11-17 16:29:03.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data20 != @23-11-17 16:29:03.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data21 != @23-11-17 16:29:05.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data30 != @23-11-17 16:29:04.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data31 != @23-11-17 16:29:05.000@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
||||
sql select a.ts, b.ts from tba1 a left asof join tba2 b on b.ts > a.ts ;
|
||||
if $rows != 4 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != @23-11-17 16:29:00.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data01 != @23-11-17 16:29:01.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data10 != @23-11-17 16:29:02.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data11 != @23-11-17 16:29:03.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data20 != @23-11-17 16:29:03.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data21 != @23-11-17 16:29:05.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data30 != @23-11-17 16:29:04.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data31 != @23-11-17 16:29:05.000@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select a.ts, b.ts from tba2 a left asof join tba1 b on a.ts >= b.ts;
|
||||
if $rows != 4 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != @23-11-17 16:29:00.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data01 != @23-11-17 16:29:00.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data10 != @23-11-17 16:29:01.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data11 != @23-11-17 16:29:00.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data20 != @23-11-17 16:29:03.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data21 != @23-11-17 16:29:03.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data30 != @23-11-17 16:29:05.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data31 != @23-11-17 16:29:04.000@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select a.ts, b.ts from tba2 a left asof join tba1 b on a.ts > b.ts;
|
||||
if $rows != 4 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != @23-11-17 16:29:00.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data01 != NULL then
|
||||
return -1
|
||||
endi
|
||||
if $data10 != @23-11-17 16:29:01.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data11 != @23-11-17 16:29:00.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data20 != @23-11-17 16:29:03.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data21 != @23-11-17 16:29:02.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data30 != @23-11-17 16:29:05.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data31 != @23-11-17 16:29:04.000@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select a.ts, b.ts from tba2 a left asof join tba1 b on a.ts <= b.ts;
|
||||
if $rows != 4 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != @23-11-17 16:29:00.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data01 != @23-11-17 16:29:00.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data10 != @23-11-17 16:29:01.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data11 != @23-11-17 16:29:02.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data20 != @23-11-17 16:29:03.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data21 != @23-11-17 16:29:03.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data30 != @23-11-17 16:29:05.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data31 != NULL then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select a.ts, b.ts from tba2 a left asof join tba1 b on a.ts < b.ts;
|
||||
if $rows != 4 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != @23-11-17 16:29:00.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data01 != @23-11-17 16:29:02.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data10 != @23-11-17 16:29:01.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data11 != @23-11-17 16:29:02.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data20 != @23-11-17 16:29:03.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data21 != @23-11-17 16:29:04.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data30 != @23-11-17 16:29:05.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data31 != NULL then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select a.ts, b.ts from tba1 a left asof join tba2 b on a.ts > b.ts jlimit 2
|
||||
if $rows != 7 then
|
||||
return -1
|
||||
endi
|
||||
if $data00 != @23-11-17 16:29:00.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data01 != NULL then
|
||||
return -1
|
||||
endi
|
||||
if $data10 != @23-11-17 16:29:02.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data11 != @23-11-17 16:29:00.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data20 != @23-11-17 16:29:02.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data21 != @23-11-17 16:29:01.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data30 != @23-11-17 16:29:03.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data31 != @23-11-17 16:29:00.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data40 != @23-11-17 16:29:03.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data41 != @23-11-17 16:29:01.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data50 != @23-11-17 16:29:04.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data51 != @23-11-17 16:29:01.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data60 != @23-11-17 16:29:04.000@ then
|
||||
return -1
|
||||
endi
|
||||
if $data61 != @23-11-17 16:29:03.000@ then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
||||
|
||||
|
Loading…
Reference in New Issue